preface

The subsidiary, with a small number of users, uses the framework of the parent company to provide services. Because it is engaged in overseas projects and has changed many third-party services, it does not want to be deeply involved with ali Cloud products. Therefore, it plans to transfer some simple businesses from RocketMQ to Redis Stream. I’ll test the waters first.

Lettuce version

Because the jedis 2.9 version used in the company project does not support stream at the moment, and there are a lot of internal logic, so it is difficult to upgrade the library at the moment, SO I will try spring-boot-starter-data-redis first.

Xread and xreadgroup

Since the business is not complicated and there is only one service to do this, I was going to use Xread to try it out. Then I thought, what if a machine is suddenly added? I started two test projects to try it out. Using Xread is actually like PubSub, every linked client can read it, which is not suitable for our scenario, so WE used XreadGroup directly

Create consumer groups

The $represents starting at queue 0; > indicates that the group mode reads from the value last recorded by the consumer group

// the redis command XGROUP [CREATE key groupname id-or-$] [SETID key groupname id-or-$] [DESTROY key groupname] [DELCONSUMER key  groupname consumername]Copy the code

The Java code is as follows

// test: stream key, readoffset.lastest: $, consumer: group key
redisTemplate.opsForStream().createGroup("test",ReadOffset.latest(),"consumer");
Copy the code

If the stream key is read in group mode, the stream key is not equipped with a group exception. So we can set it up in the program

// redis XINFO GROUPS streamkeyCopy the code

The Java code is as follows

StreamInfo.XInfoGroups info = redisTemplate.opsForStream().groups("test"); if(info! =null){ info.stream( x-> ??) // Check whether the required consumption group exists.Copy the code

Read data from the queue

StreamReadOptions options = streamReadOptions.empty (); Options = options.block(duration.ofhours (1)); // Set the number of read options = options.count(1); // set automatic submission //options = options.autoacknowledge (); Consumer = Consumer. From (" Consumer ","consumer2"); Consumer = Consumer. // Define what stream to read from and where in the stream to read from, where $is latest(),> is lastConsumerd() and is used only for group reads. StreamOffset<String> offset = StreamOffset.create("test",ReadOffset.lastConsumed()); while(true) { List<MapRecord<String, Object, Object>> list = redisTemplate.opsForStream().read(consumer, options,offset); if(list! =null){handle(list)}}Copy the code

Add data to stream

Map<String,String> map =new HashMap(1); Map. Put ("xx"," like Zhang Shuo "); redisTemplate.opsForStream().add("test",map);Copy the code

The effect

Redis configuration

package com.zxs.redisconsumersecond.config; import lombok.extern.slf4j.Slf4j; import org.apache.commons.pool2.impl.GenericObjectPoolConfig; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.data.redis.connection.RedisStandaloneConfiguration; import org.springframework.data.redis.connection.lettuce.LettuceClientConfiguration; import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory; import org.springframework.data.redis.connection.lettuce.LettucePoolingClientConfiguration; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.serializer.RedisSerializer; import org.springframework.data.redis.serializer.StringRedisSerializer; import java.time.Duration; /** * @author zh * @date 2020/12/1 16:49 */ @Slf4j @Configuration public class RedisConfig { @Bean @ConfigurationProperties(prefix = "spring.redis.test") public RedisStandaloneConfiguration redisStandaloneConfiguration() { return new RedisStandaloneConfiguration(); } @Bean public LettuceClientConfiguration lettuceClientConfiguration() { return LettucePoolingClientConfiguration .builder().poolConfig(new GenericObjectPoolConfig<>()) // ** CommandTimeout (duration.zero).build(); } @Bean public RedisTemplate<String, String> redisTemplate() { RedisTemplate<String, String> template = getRedisTemplate(); LettuceConnectionFactory factory = new LettuceConnectionFactory(redisStandaloneConfiguration(), lettuceClientConfiguration()); // This is very important, because if not set it will result in a null pointer exception when Redis gets nativeConnection. Because the factory. The afterPropertiesSet (); template.setConnectionFactory(factory); return template; } private RedisTemplate<String, String> getRedisTemplate() { RedisTemplate<String, String> template = new RedisTemplate<>(); template.setKeySerializer(keySerializer()); template.setValueSerializer(keySerializer()); template.setHashKeySerializer(keySerializer()); template.setHashValueSerializer(valueSerializer()); return template; } private RedisSerializer<? > keySerializer() { return new StringRedisSerializer(); } private RedisSerializer<? > valueSerializer() { return new StringRedisSerializer(); }}Copy the code

Explain afterPropertiesSet() above; Need an object LettuceConnectionProvider, redis connection for abnormal is not set to an empty words will quoteThe source code for the afterPropertiesSet() method is as follows

public void afterPropertiesSet() { this.client = this.createClient(); this.connectionProvider = new LettuceConnectionFactory.ExceptionTranslatingConnectionProvider(this.createConnectionProvider(this.client, LettuceConnection.CODEC)); this.reactiveConnectionProvider = new LettuceConnectionFactory.ExceptionTranslatingConnectionProvider(this.createConnectionProvider(this.client, LettuceReactiveRedisConnection.CODEC)); if (this.isClusterAware()) { this.clusterCommandExecutor = new ClusterCommandExecutor(new LettuceClusterTopologyProvider((RedisClusterClient)this.client), new LettuceClusterNodeResourceProvider(this.connectionProvider), EXCEPTION_TRANSLATION); } if (this.getEagerInitialization() && this.getShareNativeConnection()) { this.initConnection(); }}Copy the code

pom.xml

<dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <optional>true</optional> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <groupId>org.apache.commons</groupId> The < artifactId > Commons - pool2 < / artifactId > < version > 2.9.0 < / version > < / dependency > < / dependencies >Copy the code