define

Backdating consumption is when a Consumer has successfully consumed a message. To support this function, the message needs to be retained after the Broker has delivered a successful message to the Consumer because of business requirements for re-consumption. Reconsumption is usually done in time. For example, if the Consumer system fails and needs to be reconsumed one hour ago, the Broker should provide a mechanism to reverse the consumption progress in time. RocketMQ supports backtracking of consumption in time, down to the millisecond.

Demo

We still use the programming model of Spring Cloud Stream + Spring Cloud Alibaba RocketMQ for implementation.

The theory of

In the consumer, can set a field ConsumeFromWhere (source locations in: org.apache.rocketmq.com mon. Consumer. ConsumeFromWhere), where to begin. Optional argument, remove Deprecated, and what is left is

public enum ConsumeFromWhere {
  CONSUME_FROM_LAST_OFFSET,
  CONSUME_FROM_FIRST_OFFSET,
  CONSUME_FROM_TIMESTAMP,
}
Copy the code
  • CONSUME_FROM_LAST_OFFSET: consumes the value from the last offset
  • CONSUME_FROM_FIRST_OFFSET: consume from the minimum offset
  • CONSUME_FROM_TIMESTAMP: consumption started at a certain time

We need to set consumption to start at a certain time, that is, CONSUME_FROM_TIMESTAMP and set the specific time point.

implementation

First, take a look at the configuration file

server:
  port: 8080
  servlet:
    context-path: /mq-example

spring:
 
  application:
    name: mq-example
  cloud:
    stream:
      bindings:

        input-backtracking:
          content-type: application/json
          destination: test-topic3
          group: backtracking-consumer-group

        Binding production with name = output
        output-order:
          content-type: application/json
          destination: test-topic3

      rocketmq:
        # RocketMQ Binder configuration items, corresponding RocketMQBinderConfigurationProperties class
        binder:
          Configure rocketMQ nameserver address
          name-server: 127.0. 01.: 9876
          group: rocketmq-group
        bindings:
          output-order:
            # RocketMQ Producer configuration items, corresponding RocketMQProducerProperties class
            producer:
              #group: producer-group
              sync: true # Whether to send messages synchronously. Default: false Asynchronously.
          input-backtracking: Backtrace message configuration
            # com.alibaba.cloud.stream.binder.rocketmq.properties.RocketMQConsumerProperties
            consumer:
              consumeFromWhere: CONSUME_FROM_TIMESTAMP
              consumeTimestamp: 20220117110148
              enabled: true # Whether to enable consumption. Default is true
              broadcasting: false # Whether to use broadcast consumption. Default: false

Copy the code

Here we still use the previous Ouput-order as the producer, producing the message.

The message maker configuration is mainly concerned with the attribute configuration in the input-Backtracking node:

  • ConsumeFromWhere do we start spending from, where do we specify a time to spend
  • ConsumeTimestamp is the specified time point

Program entry:

@SpringBootApplication
@EnableBinding({ MySource.class})
public class MqBootstrapApplication {
    public static void main(String[] args) { SpringApplication.run(MqBootstrapApplication.class); }}Copy the code

Add @ EnableBinding

MySource:

public interface MySource {

    @Output("output-order")
    MessageChannel output4Order(a);

    @Input("input-backtracking")
    MessageChannel inputBackTracking(a);

}

Copy the code

Controller Production news:

@GetMapping("/produce")
    public void produceMsg(a) {

        Map<String, Object> headers = Maps.newHashMapWithExpectedSize(16);
        headers.put(MessageConst.PROPERTY_DELAY_TIME_LEVEL, 2);
        headers.put(MessageConst.PROPERTY_TAGS, "test03");

        Order order = Order.builder().id(1L).desc("test").build();
        Message message = MessageBuilder.createMessage(order, new MessageHeaders(headers));
        mySource.output4Order().send(message);

    }
Copy the code

ReceiveService consumption message:


@Service
@Log4j2
public class ReceiveService {

    @StreamListener("input-backtracking")
    public void receiveBackTrackingInput(String receiveMsg, GenericMessage message, @Headers Map headers) {

        log.info("Backtrace message received: {}", receiveMsg); }}Copy the code

test

You can call the Controller to produce the message first, or you can produce the message without the producer in the Demo, find a topic that has sent the message before, look at its message trace, and find the storage time

Remember to change the topic name in the configuration file if you are using a topic that you have sent messages to before:

Confirm that the found message has been consumed (at least twice because of backtracking), and set the time of consumeTimestamp after the stored time.

Start the project and observe the ReceiveService output:

Backtracking message received: {"id":1,"desc":"test"}Copy the code

Prove message backtracking consumption is successful.

reference

  • Github.com/alibaba/spr…
  • www.niewenjun.com/2020/05/09/…