I. Background

The Stream type was introduced in Redis 5. In this article, we use Spring Boot Data Redis to consume data from a Redis Stream, covering both independent consumption and consumer group consumption.

II. Integration steps

1. Introduce jar packages

<dependencies>
  <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
  </dependency>
  <dependency>
    <groupId>org.apache.commons</groupId>
    <artifactId>commons-pool2</artifactId>
    <version>2.11.1</version>
  </dependency>
</dependencies>

These are the main dependencies; unrelated ones are omitted here.

2. Configure the RedisTemplate dependency

@Configuration
public class RedisConfig {
    
    @Bean
    public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory connectionFactory) {
        RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();
        redisTemplate.setConnectionFactory(connectionFactory);
        redisTemplate.setKeySerializer(new StringRedisSerializer());
        redisTemplate.setValueSerializer(new StringRedisSerializer());
        redisTemplate.setHashKeySerializer(new StringRedisSerializer());
        // Do not use JSON serialization here. If you transfer objects with ObjectRecord, it may fail with
        // java.lang.IllegalArgumentException: Value must not be null!
        redisTemplate.setHashValueSerializer(RedisSerializer.string());
        return redisTemplate;
    }
}

Note:

Note the serialization method of setHashValueSerializer. Details will be discussed later.

3. Prepare an entity object

This entity object is the object that needs to be sent to the Stream.

@Getter
@Setter
@ToString
public class Book {
    private String title;
    private String author;
    
    public static Book create() {
        com.github.javafaker.Book fakerBook = Faker.instance().book();
        Book book = new Book();
        book.setTitle(fakerBook.title());
        book.setAuthor(fakerBook.author());
        return book;
    }
}

Each call to the create method generates a Book object populated with mock data from Javafaker.

4. Write a constant class and configure the Stream name

/** Constants */
public class Cosntants {
    
    public static final String STREAM_KEY_001 = "stream-001";
    
}

5. Write a producer that produces data into the Stream

1. Write a producer that sends ObjectRecord data to the Stream

/** * Message producer */
@Component
@RequiredArgsConstructor
@Slf4j
public class StreamProducer {
    
    private final RedisTemplate<String, Object> redisTemplate;
    
    public void sendRecord(String streamKey) {
        Book book = Book.create();
        log.info("Generate a book of information :[{}]", book);
        
        ObjectRecord<String, Book> record = StreamRecords.newRecord()
                .in(streamKey)
                .ofObject(book)
                .withId(RecordId.autoGenerate());
        
        RecordId recordId = redisTemplate.opsForStream()
                .add(record);
        
        log.info("Record-id returned :[{}]", recordId);
    }
}

2. Produce data to Stream every 5s

/** * periodically generates messages to the stream */
@Component
@AllArgsConstructor
public class CycleGeneratorStreamMessageRunner implements ApplicationRunner {
    
    private final StreamProducer streamProducer;
    
    @Override
    public void run(ApplicationArguments args) {
        Executors.newSingleThreadScheduledExecutor()
                .scheduleAtFixedRate(() -> streamProducer.sendRecord(STREAM_KEY_001),
                        0, 5, TimeUnit.SECONDS);
    }
}

III. Independent consumption

Independent consumption means consuming messages from a Stream directly, outside any consumer group, using the XREAD command. Reading does not delete data: the messages remain in the Stream, so if several programs read with XREAD at the same time, each of them can read every message.
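The non-destructive behaviour described above can be pictured with a small in-memory model. This is only a sketch in plain Java, not Redis or Spring code: the `AppendOnlyLog` class and its methods are hypothetical names introduced for illustration, and the 1-based position stands in for a real record ID.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical in-memory stand-in for a Stream: reads never remove entries. */
final class AppendOnlyLog {
    private final List<String> entries = new ArrayList<>();

    /** Appends a value and returns its 1-based position, which plays the role of a record ID. */
    long add(String value) {
        entries.add(value);
        return entries.size();
    }

    /** Returns up to count entries whose position is greater than afterId; nothing is deleted. */
    List<String> read(long afterId, int count) {
        int from = (int) Math.min(Math.max(afterId, 0), entries.size());
        int to = Math.min(from + count, entries.size());
        return new ArrayList<>(entries.subList(from, to));
    }

    long size() {
        return entries.size();
    }

    public static void main(String[] args) {
        AppendOnlyLog log = new AppendOnlyLog();
        log.add("book-1");
        log.add("book-2");
        // Two independent readers start from the beginning and see the same entries.
        System.out.println(log.read(0, 10));
        System.out.println(log.read(0, 10));
        // The log still holds both entries after the reads.
        System.out.println(log.size());
    }
}
```

Each reader keeps its own position, which is why independent readers do not interfere with one another.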

1. Consume from the beginning with XREAD

The implementation here consumes from the first message in the Stream.

package com.huan.study.redis.stream.consumer.xread;

import com.huan.study.redis.constan.Cosntants;
import com.huan.study.redis.entity.Book;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.data.redis.connection.stream.ObjectRecord;
import org.springframework.data.redis.connection.stream.ReadOffset;
import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.connection.stream.StreamReadOptions;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;

import javax.annotation.Resource;
import java.time.Duration;
import java.util.List;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/** Consume Stream data directly, outside a consumer group, reading all messages in the Stream */
@Component
@Slf4j
public class XreadNonBlockConsumer01 implements InitializingBean, DisposableBean {
    
    private ThreadPoolExecutor threadPoolExecutor;
    @Resource
    private RedisTemplate<String, Object> redisTemplate;
    
    private volatile boolean stop = false;
    
    @Override
    public void afterPropertiesSet() {
        
        // Initialize the thread pool
        threadPoolExecutor = new ThreadPoolExecutor(1, 1, 0, TimeUnit.SECONDS,
                new LinkedBlockingDeque<>(), r -> {
            Thread thread = new Thread(r);
            thread.setDaemon(true);
            thread.setName("xread-nonblock-01");
            return thread;
        });
        
        StreamReadOptions streamReadOptions = StreamReadOptions.empty()
                // Block for up to 1s when there is no data; this must be shorter than 'spring.redis.timeout'
                .block(Duration.ofMillis(1000))
                // Blocks until data arrives; a timeout exception may be reported
                // .block(Duration.ofMillis(0))
                // Fetch at most 10 records per read
                .count(10);
        
        StringBuilder readOffset = new StringBuilder("0-0");
        threadPoolExecutor.execute(() -> {
            while (!stop) {
                // When reading with XREAD, record the ID of the last record read and use it as the
                // offset of the next read; otherwise messages are missed or read again
                List<ObjectRecord<String, Book>> objectRecords = redisTemplate.opsForStream()
                        .read(Book.class, streamReadOptions, StreamOffset.create(Cosntants.STREAM_KEY_001, ReadOffset.from(readOffset.toString())));
                if (CollectionUtils.isEmpty(objectRecords)) {
                    log.warn("No data was obtained.");
                    continue;
                }
                for (ObjectRecord<String, Book> objectRecord : objectRecords) {
                    log.info("Id :[{}] book:[{}]", objectRecord.getId(), objectRecord.getValue());
                    readOffset.setLength(0);
                    readOffset.append(objectRecord.getId());
                }
            }
        });
    }
    
    @Override
    public void destroy() throws Exception {
        stop = true;
        threadPoolExecutor.shutdown();
        threadPoolExecutor.awaitTermination(3, TimeUnit.SECONDS);
    }
}

Note:

On each subsequent read, the offset must be the ID of the last record obtained by the previous read; otherwise data may be missed.
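Stream record IDs have the form `<millisecondsTime>-<sequenceNumber>`, so "the last ID" means the largest ID in numeric order, not in string order. The sketch below shows one way to compare IDs and pick the next offset. `StreamIds`, `compare` and `nextOffset` are hypothetical helper names, not part of Spring Data Redis.

```java
import java.util.List;

/** Hypothetical helper for Stream record IDs of the form "<millis>-<sequence>". */
final class StreamIds {
    private StreamIds() {
    }

    /** Compares two IDs numerically: first by the millisecond part, then by the sequence part. */
    static int compare(String left, String right) {
        long[] l = parse(left);
        long[] r = parse(right);
        int byTime = Long.compareUnsigned(l[0], r[0]);
        return byTime != 0 ? byTime : Long.compareUnsigned(l[1], r[1]);
    }

    /** Returns the offset for the next read: the largest ID seen, or the current offset for an empty batch. */
    static String nextOffset(String currentOffset, List<String> batchIds) {
        String next = currentOffset;
        for (String id : batchIds) {
            if (compare(id, next) > 0) {
                next = id;
            }
        }
        return next;
    }

    /** Splits an ID into its two numeric parts; an ID without a sequence part gets sequence 0. */
    private static long[] parse(String id) {
        int dash = id.indexOf('-');
        String millis = dash < 0 ? id : id.substring(0, dash);
        String sequence = dash < 0 ? "0" : id.substring(dash + 1);
        return new long[] {Long.parseUnsignedLong(millis), Long.parseUnsignedLong(sequence)};
    }

    public static void main(String[] args) {
        String offset = "0-0";
        offset = nextOffset(offset, List.of("1636600000000-0", "1636600000000-1"));
        System.out.println(offset);
    }
}
```

Comparing the parts as numbers matters because a plain string comparison would order "10-0" before "9-7".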

2. Independent consumption with StreamMessageListenerContainer

See the code for consumer group consumption below

IV. Consumer group consumption

1. Implement StreamListener interface

This interface is used to consume data from the Stream. Pay attention to whether the listener is registered with streamMessageListenerContainer.receiveAutoAck() or streamMessageListenerContainer.receive(). With the latter, messages must be acknowledged manually, for example: redisTemplate.opsForStream().acknowledge("key", "group", "recordId");

/**
 * Consume messages asynchronously via a listener
 *
 * @author huan.fu 2021/11/10 - 5:51 PM
 */
@Slf4j
@Getter
@Setter
public class AsyncConsumeStreamListener implements StreamListener<String, ObjectRecord<String, Book>> {
    /** * Types of consumers: independent consumption, consumer group consumption */
    private String consumerType;
    /** * Consumer group */
    private String group;
    /** * A consumer in the consumer group */
    private String consumerName;
    
    public AsyncConsumeStreamListener(String consumerType, String group, String consumerName) {
        this.consumerType = consumerType;
        this.group = group;
        this.consumerName = consumerName;
    }
    
    private RedisTemplate<String, Object> redisTemplate;
    
    @Override
    public void onMessage(ObjectRecord<String, Book> message) {
        String stream = message.getStream();
        RecordId id = message.getId();
        Book value = message.getValue();
        if (StringUtils.isBlank(group)) {
            log.info("[{}]: received a message stream:[{}], ID :[{}],value:[{}]", consumerType, stream, id, value);
        } else {
            log.info("[{}] group:[{}] consumerName:[{}] received a message stream:[{}], id:[{}], value:[{}]", consumerType,
                    group, consumerName, stream, id, value);
        }
        
        // If auto-ack is not used, acknowledge manually here
        // redisTemplate.opsForStream()
        // .acknowledge("key","group","recordId");
    }
}
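The difference between receive() and receiveAutoAck() mentioned at the start of this step can be pictured as follows: a record delivered without auto-ack stays in the group's pending list until it is acknowledged, while a record delivered with auto-ack never enters that list. The model below is a plain-Java sketch of that bookkeeping only. `PendingListModel` and its methods are hypothetical names; no Redis or Spring API is involved.

```java
import java.util.LinkedHashSet;
import java.util.Set;

/** Hypothetical model of a consumer group's pending list; not Redis or Spring code. */
final class PendingListModel {
    private final Set<String> pending = new LinkedHashSet<>();

    /** Delivers a record to a consumer; with autoAck the record never becomes pending. */
    void deliver(String recordId, boolean autoAck) {
        if (!autoAck) {
            pending.add(recordId);
        }
    }

    /** Acknowledges a record and reports whether it was pending. */
    boolean acknowledge(String recordId) {
        return pending.remove(recordId);
    }

    /** Returns a snapshot of the records that still await acknowledgement. */
    Set<String> pending() {
        return Set.copyOf(pending);
    }

    public static void main(String[] args) {
        PendingListModel group = new PendingListModel();
        group.deliver("1-0", false); // manual ack: stays pending
        group.deliver("2-0", true);  // auto ack: never pending
        System.out.println(group.pending());
        group.acknowledge("1-0");
        System.out.println(group.pending());
    }
}
```

If a listener registered with receive() never acknowledges, the pending list keeps growing, which is why the commented-out acknowledge call in the listener matters.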

2. Handle errors raised while fetching or consuming messages

/**
 * Handles exceptions raised by the StreamPollTask while fetching messages or by the listener while consuming them
 *
 * @author huan.fu 2021/11/11 - 3:44 PM
 */
@Slf4j
public class CustomErrorHandler implements ErrorHandler {
    @Override
    public void handleError(Throwable t) {
        log.error("Something abnormal has occurred.", t);
    }
}

3. Consumer group configuration

/**
 * Redis Stream consumer group configuration
 *
 * @author huan.fu 2021/11/11 - 12:22 PM
 */
@Configuration
public class RedisStreamConfiguration {
    
    @Resource
    private RedisConnectionFactory redisConnectionFactory;
    
    /**
     * Supports both independent consumption and consumer group consumption
     * <p>
     * Supports adding and removing consumers dynamically
     * <p>
     * Consumer groups need to be created in advance
     *
     * @return StreamMessageListenerContainer
     */
    @Bean(initMethod = "start", destroyMethod = "stop")
    public StreamMessageListenerContainer<String, ObjectRecord<String, Book>> streamMessageListenerContainer() {
        AtomicInteger index = new AtomicInteger(1);
        int processors = Runtime.getRuntime().availableProcessors();
        ThreadPoolExecutor executor = new ThreadPoolExecutor(processors, processors, 0, TimeUnit.SECONDS,
                new LinkedBlockingDeque<>(), r -> {
            Thread thread = new Thread(r);
            thread.setName("async-stream-consumer-" + index.getAndIncrement());
            thread.setDaemon(true);
            return thread;
        });
        
        StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, ObjectRecord<String, Book>> options =
                StreamMessageListenerContainer.StreamMessageListenerContainerOptions
                        .builder()
                        // Maximum number of messages fetched per poll
                        .batchSize(10)
                        // Executor that runs the Stream poll tasks
                        .executor(executor)
                        // Serializer for the Stream key
                        .keySerializer(RedisSerializer.string())
                        // Serializer for the field names of each Stream entry
                        .hashKeySerializer(RedisSerializer.string())
                        // Serializer for the field values of each Stream entry
                        .hashValueSerializer(RedisSerializer.string())
                        // How long to block when the Stream has no messages; must be shorter than 'spring.redis.timeout'
                        .pollTimeout(Duration.ofSeconds(1))
                        // ObjectRecord: converts the fields and values of an object into a Map, e.g. a Book object into a Map
                        .objectMapper(new ObjectHashMapper())
                        // Handles exceptions raised while fetching messages or while handing them to the listener
                        .errorHandler(new CustomErrorHandler())
                        // Convert records read from the Stream into ObjectRecord with the value type specified here
                        .targetType(Book.class)
                        .build();
        
        StreamMessageListenerContainer<String, ObjectRecord<String, Book>> streamMessageListenerContainer =
                StreamMessageListenerContainer.create(redisConnectionFactory, options);
        
        // Independent consumption
        String streamKey = Cosntants.STREAM_KEY_001;
        streamMessageListenerContainer.receive(StreamOffset.fromStart(streamKey),
                new AsyncConsumeStreamListener("Independent consumption", null, null));
        
        // Consumer group A, no automatic ack
        // Start from messages that have not been delivered to any consumer in the group
        streamMessageListenerContainer.receive(Consumer.from("group-a", "consumer-a"),
                StreamOffset.create(streamKey, ReadOffset.lastConsumed()), new AsyncConsumeStreamListener("Consumer group consumption", "group-a", "consumer-a"));
        // Start from messages that have not been delivered to any consumer in the group
        streamMessageListenerContainer.receive(Consumer.from("group-a", "consumer-b"),
                StreamOffset.create(streamKey, ReadOffset.lastConsumed()), new AsyncConsumeStreamListener("Consumer group consumption A", "group-a", "consumer-b"));
        
        // Consumer group B, automatic ack
        streamMessageListenerContainer.receiveAutoAck(Consumer.from("group-b", "consumer-a"),
                StreamOffset.create(streamKey, ReadOffset.lastConsumed()), new AsyncConsumeStreamListener("Consumer group consumption B", "group-b", "consumer-bb"));
        
        // To customize an individual consumer, pass a StreamReadRequest object to the register method
        
        return streamMessageListenerContainer;
    }
}

Note:

Create the consumer groups in advance:

127.0.0.1:6379> xgroup create stream-001 group-a $
OK
127.0.0.1:6379> xgroup create stream-001 group-b $
OK

1. Independent consumption configuration

streamMessageListenerContainer.receive(StreamOffset.fromStart(streamKey), new AsyncConsumeStreamListener("Independent consumption", null, null));

Do not pass a Consumer.

2. Configure consumer groups – do not automatically ack messages

streamMessageListenerContainer.receive(Consumer.from("group-a", "consumer-b"), StreamOffset.create(streamKey, ReadOffset.lastConsumed()), new AsyncConsumeStreamListener("Consumer group consumption A", "group-a", "consumer-b"));

1. Note the value of ReadOffset.

2. Note that the group needs to be created in advance.

3. Configure consumer groups – automatically ack messages

streamMessageListenerContainer.receiveAutoAck()

V. Serialization strategy

Stream Property   Serializer            Description
key               keySerializer         used for Record#getStream()
field             hashKeySerializer     used for each map key in the payload
value             hashValueSerializer   used for each map value in the payload

VI. ReadOffset strategy

The Read Offset policy for consuming messages

Read offset           Standalone                                    Consumer Group
Latest                Read latest message                           Read latest message
Specific Message Id   Use last seen message as the next MessageId   Use last seen message as the next MessageId
Last Consumed         Use last seen message as the next MessageId   Last consumed message as per consumer group

"Use last seen message as the next MessageId" means the read returns messages whose ID is greater than the specified message ID. "Last consumed message as per consumer group" means the read returns messages that have not yet been assigned to any consumer in the consumer group.
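The "Last Consumed" behaviour for consumer groups can be pictured with an in-memory model: each group remembers how far it has delivered, consumers of the same group share that position, and different groups progress independently. This is a plain-Java sketch under those assumptions. `GroupOffsetModel` and its methods are hypothetical names, not Redis or Spring API.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical in-memory model of "last consumed" reads by consumer groups; not Redis code. */
final class GroupOffsetModel {
    private final List<String> entries = new ArrayList<>();
    // Index of the next entry that has not been delivered to any consumer of the group.
    private final Map<String, Integer> nextIndexByGroup = new HashMap<>();

    void add(String value) {
        entries.add(value);
    }

    /** Creates a group positioned at the end of the stream, like "$" in XGROUP CREATE. */
    void createGroupAtEnd(String group) {
        nextIndexByGroup.put(group, entries.size());
    }

    /** Delivers up to count undelivered entries to one consumer and advances the group's position. */
    List<String> readLastConsumed(String group, int count) {
        Integer from = nextIndexByGroup.get(group);
        if (from == null) {
            // Mirrors the requirement that the group must be created in advance.
            throw new IllegalStateException("group does not exist: " + group);
        }
        int to = Math.min(from + count, entries.size());
        nextIndexByGroup.put(group, to);
        return new ArrayList<>(entries.subList(from, to));
    }

    public static void main(String[] args) {
        GroupOffsetModel stream = new GroupOffsetModel();
        stream.createGroupAtEnd("group-a");
        stream.createGroupAtEnd("group-b");
        stream.add("book-1");
        stream.add("book-2");
        // Two consumers of group-a split the messages between them.
        System.out.println(stream.readLastConsumed("group-a", 1));
        System.out.println(stream.readLastConsumed("group-a", 1));
        // group-b has its own position and still receives both messages.
        System.out.println(stream.readLastConsumed("group-b", 10));
    }
}
```

Within one group each message goes to a single consumer, whereas every group receives the full sequence.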

VII. Notes

1. Timeout for reading messages

When using StreamReadOptions.empty().block(Duration.ofMillis(1000)), the blocking time configured here must be shorter than spring.redis.timeout. Otherwise, a timeout exception may be reported.
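One way to catch a misconfiguration early is to compare the two durations at startup. The check below is a minimal sketch in plain Java. `BlockTimeoutCheck` is a hypothetical helper, and how the command timeout value is obtained (for example from spring.redis.timeout) is left to the application.

```java
import java.time.Duration;

/** Hypothetical startup check: the blocking read must be shorter than the client command timeout. */
final class BlockTimeoutCheck {
    private BlockTimeoutCheck() {
    }

    /** Returns true only for a positive block duration that is strictly shorter than the timeout. */
    static boolean isSafe(Duration block, Duration commandTimeout) {
        // A zero block duration means "block until data arrives", so it can outlast any timeout.
        return !block.isZero() && !block.isNegative() && block.compareTo(commandTimeout) < 0;
    }

    /** Returns the block duration unchanged, or fails fast when it is not safe. */
    static Duration requireSafe(Duration block, Duration commandTimeout) {
        if (!isSafe(block, commandTimeout)) {
            throw new IllegalArgumentException(
                    "block duration " + block + " must be positive and shorter than timeout " + commandTimeout);
        }
        return block;
    }

    public static void main(String[] args) {
        Duration timeout = Duration.ofSeconds(2); // assumed value of the client command timeout
        System.out.println(isSafe(Duration.ofMillis(1000), timeout));
        System.out.println(isSafe(Duration.ofSeconds(3), timeout));
    }
}
```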

2. ObjectRecord deserialization error

If the following exception occurs when reading messages, troubleshoot as follows:

java.lang.IllegalArgumentException: Value must not be null!
    at org.springframework.util.Assert.notNull(Assert.java:201)
    at org.springframework.data.redis.connection.stream.Record.of(Record.java:81)
    at org.springframework.data.redis.connection.stream.MapRecord.toObjectRecord(MapRecord.java:147)
    at org.springframework.data.redis.core.StreamObjectMapper.toObjectRecord(StreamObjectMapper.java:138)
    at org.springframework.data.redis.core.StreamObjectMapper.toObjectRecords(StreamObjectMapper.java:164)
    at org.springframework.data.redis.core.StreamOperations.map(StreamOperations.java:594)
    at org.springframework.data.redis.core.StreamOperations.read(StreamOperations.java:413)
    at com.huan.study.redis.stream.consumer.xread.XreadNonBlockConsumer02.lambda$afterPropertiesSet$1(XreadNonBlockConsumer02.java:61)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

1. Check the HashValueSerializer of the RedisTemplate. RedisSerializer.string() is the recommended choice.

2. Check the HashMapper configured in redisTemplate.opsForStream(). The default is ObjectHashMapper, which serializes the fields and values of the object into byte[] format.
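To make the role of a hash mapper concrete, the sketch below flattens an object into a field-to-value map using reflection. It only illustrates the idea of "one map entry per field" and is not how ObjectHashMapper is implemented (which, as noted above, produces byte[] values). `SimpleFieldMapper` and its nested `Book` class are hypothetical names used for the example.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical illustration of flattening an object into a field/value map. */
final class SimpleFieldMapper {
    private SimpleFieldMapper() {
    }

    /** Copies every non-static, non-null field of the source object into a map of strings. */
    static Map<String, String> toHash(Object source) {
        Map<String, String> hash = new LinkedHashMap<>();
        for (Field field : source.getClass().getDeclaredFields()) {
            if (Modifier.isStatic(field.getModifiers()) || field.isSynthetic()) {
                continue;
            }
            field.setAccessible(true);
            try {
                Object value = field.get(source);
                if (value != null) {
                    hash.put(field.getName(), String.valueOf(value));
                }
            } catch (IllegalAccessException e) {
                throw new IllegalStateException("cannot read field " + field.getName(), e);
            }
        }
        return hash;
    }

    /** Minimal stand-in for the article's Book entity. */
    static final class Book {
        private final String title;
        private final String author;

        Book(String title, String author) {
            this.title = title;
            this.author = author;
        }
    }

    public static void main(String[] args) {
        System.out.println(toHash(new Book("Dune", "Frank Herbert")));
    }
}
```

Each map entry then becomes one field of the Stream entry, and the hash key and hash value serializers decide how the names and values are written as bytes.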

Provide an available configuration

// Use string serialization for the RedisTemplate hash values
redisTemplate.setHashValueSerializer(RedisSerializer.string());
// Calling opsForStream() without arguments uses the default ObjectHashMapper
redisTemplate.opsForStream()

I submitted an issue about the error above to the official Spring Data Redis repository. The maintainers replied that it is a bug and will be fixed later.

3. Missing data when reading sequentially with XREAD

If we read data with XREAD and find that some written data is missing, check whether the StreamOffset passed on each subsequent read is correct. It should be the ID of the last record returned by the previous read.

For example:

1. The StreamOffset passes $, which means only the latest data is read.

2. While the consumer is still processing the data read in the previous step, another producer inserts more data into the Stream.

3. The consumer reads the Stream again, still passing $, which again means only the latest data is read. It will never see the data that was written to the Stream during the previous step, because it only reads data that arrives after the read starts.
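The three steps above can be replayed against a small in-memory model. This is a plain-Java sketch, not Redis code: `LatestOffsetPitfall` is a hypothetical class, positions stand in for record IDs, and `latest()` plays the role of `$` resolved at the moment a read starts.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model showing why re-reading with "$" skips messages; not Redis code. */
final class LatestOffsetPitfall {
    private final List<String> entries = new ArrayList<>();

    void add(String value) {
        entries.add(value);
    }

    /** Position of the newest entry right now; a read with "$" only sees entries after it. */
    int latest() {
        return entries.size();
    }

    /** Returns every entry appended after the given position. */
    List<String> readAfter(int position) {
        return new ArrayList<>(entries.subList(position, entries.size()));
    }

    public static void main(String[] args) {
        LatestOffsetPitfall stream = new LatestOffsetPitfall();

        int firstRead = stream.latest();      // step 1: read with "$"
        stream.add("m1");
        System.out.println(stream.readAfter(firstRead));

        stream.add("m2");                     // step 2: arrives while m1 is being processed

        int secondRead = stream.latest();     // step 3: read with "$" again
        stream.add("m3");
        System.out.println(stream.readAfter(secondRead));    // m2 is skipped

        // Tracking the last position that was read instead of "$" recovers m2.
        System.out.println(stream.readAfter(firstRead + 1));
    }
}
```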

4. Using StreamMessageListenerContainer

1. Consumers can be added and removed dynamically.

2. Consumer group consumption is supported.

3. Independent consumption is supported directly.

4. When transferring ObjectRecord, pay attention to the serialization method. Refer to the code above.

VIII. Complete code

Gitee.com/huan1993/sp…

IX. Reference documents

1. docs.spring.io/spring-data…

2. github.com/spring-proj…