Duplicate Messages

Why does message duplication occur? There are two causes: (1) duplication during production and (2) duplication during consumption.

1.1 Message Duplication during Production

The producer sends a message to MQ and MQ receives it, but a network fluctuation occurs while MQ is returning its confirmation, so the producer never receives the acknowledgement. The producer then resends the message, and MQ ends up holding two copies.
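To make this failure mode concrete, here is a small simulation in plain Java. It does not use a real MQ client: `ToyBroker` and `NaiveProducer` are hypothetical names invented for this sketch, and the "network" is just a counter of acknowledgements to drop.

```java
import java.util.ArrayList;
import java.util.List;

// Toy broker (hypothetical, not a real MQ client): stores every message it receives.
class ToyBroker {
    final List<String> stored = new ArrayList<>();
    private int acksToDrop;

    ToyBroker(int acksToDrop) {
        this.acksToDrop = acksToDrop;
    }

    // Returns true if the acknowledgement reaches the producer.
    // The message is stored either way, which is the root of the problem.
    boolean publish(String message) {
        stored.add(message);
        if (acksToDrop > 0) {
            acksToDrop--;
            return false; // the ack is lost on the way back
        }
        return true;
    }
}

class NaiveProducer {
    // Resends until an ack arrives or maxAttempts is reached; returns the attempts used.
    static int sendWithRetry(ToyBroker broker, String message, int maxAttempts) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            if (broker.publish(message)) {
                return attempt;
            }
        }
        return maxAttempts;
    }
}
```

With `new ToyBroker(1)`, a single call to `NaiveProducer.sendWithRetry(broker, "order-42", 3)` takes two attempts, and `broker.stored` contains the same message twice even though the producer meant to send it once.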

If a message is not acknowledged, or the acknowledgement reports a failure, the producer can record it in Redis or a database and use a scheduled task to redeliver it:

@Component
@Slf4j
public class SendMessage {
    @Autowired
    private MessageService messageService;

    @Autowired
    private RabbitTemplate rabbitTemplate;

    // Maximum number of deliveries
    private static final int MAX_TRY_COUNT = 3;

    /** Every 30 seconds, fetch the messages whose delivery failed and redeliver them. */
    @Scheduled(cron = "0/30 * * * * ?")
    public void resend() {
        log.info("Start scheduled task (redeliver message)");

        List<MsgLog> msgLogs = messageService.selectTimeoutMsg();
        msgLogs.forEach(msgLog -> {
            String msgId = msgLog.getMsgId();
            if (msgLog.getTryCount() >= MAX_TRY_COUNT) {
                messageService.updateStatus(msgId, Constant.MsgLogStatus.DELIVER_FAIL);
                log.info("Message delivery failed when maximum retry times exceeded, msgId: {}", msgId);
            } else {
                // Delivery count +1
                messageService.updateTryCount(msgId, msgLog.getNextTryTime());

                CorrelationData correlationData = new CorrelationData(msgId);
                // Redeliver the message
                rabbitTemplate.convertAndSend(msgLog.getExchange(), msgLog.getRoutingKey(),
                        MessageHelper.objToMsg(msgLog.getMsg()), correlationData);

                log.info("Redelivery attempt {} for message, msgId: {}", msgLog.getTryCount() + 1, msgId);
            }
        });
        log.info("Scheduled task finished (redeliver messages)");
    }
}

1.2 Message Duplication during Consumption

After the consumer has successfully processed a message, a network fluctuation occurs while it sends its confirmation to MQ. MQ never receives the confirmation and, to guarantee that the message is consumed, delivers the same message again. The consumer therefore receives two identical messages.

Modify the consumer to simulate an exception:

@RabbitListener(queuesToDeclare = @Queue(value = "javatrip", durable = "true"))
public void receive(String message, @Headers Map<String,Object> headers, Channel channel) throws Exception{

    System.out.println("Retry at " + System.currentTimeMillis());
    System.out.println(message);
    // Deliberately throw an ArithmeticException so that the listener retries
    int i = 1 / 0;
}

Configure the retry policy in the YML file:

spring:
  rabbitmq:
    listener:
      simple:
        retry:
          enabled: true # Enable consumer retry
          max-attempts: 5 # Maximum number of delivery attempts
          initial-interval: 3000 # Retry interval in milliseconds
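The effect of these settings can be approximated in plain Java. The sketch below is not Spring's implementation; `RetrySketch` and `runWithRetry` are hypothetical names, and it assumes a fixed interval between attempts (no backoff multiplier).

```java
class RetrySketch {
    // Calls the handler up to maxAttempts times, sleeping intervalMillis between attempts.
    // Returns the number of attempts made; rethrows the last failure if every attempt fails.
    static int runWithRetry(Runnable handler, int maxAttempts, long intervalMillis) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        RuntimeException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                handler.run();
                return attempt; // the handler succeeded, stop retrying
            } catch (RuntimeException e) {
                last = e;
                if (attempt < maxAttempts) {
                    try {
                        Thread.sleep(intervalMillis); // wait before the next attempt
                    } catch (InterruptedException ie) {
                        Thread.currentThread().interrupt();
                        throw new IllegalStateException("Interrupted while waiting to retry", ie);
                    }
                }
            }
        }
        throw last; // all attempts failed
    }
}
```

A handler that always throws, like the consumer with `1 / 0`, is invoked `maxAttempts` times before the failure is propagated. Every one of those invocations sees the same message, which is why the handler has to tolerate repeats.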

Because duplicates are caused by network failures, they cannot be avoided entirely. What we can do is make message consumption idempotent, so that processing the same message more than once has the same effect as processing it once.

How to ensure message idempotency

Give each message a globally unique ID so that the consumer can recognise duplicates and process each message only once. The consumption flow is as follows:

  1. After receiving a message, the consumer first checks whether its ID already exists in Redis/DB.
  2. If it does not exist, the message is consumed normally, and the ID is written to Redis/DB afterwards.
  3. If it exists, the message is a duplicate and is discarded.
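The three steps above can be sketched in plain Java. This is a minimal illustration, not the article's implementation: an in-memory set stands in for Redis/DB, and `IdempotentHandler` is a hypothetical name.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

class IdempotentHandler {
    // Stand-in for Redis/DB: the IDs of messages that have already been consumed.
    private final Set<String> processedIds = ConcurrentHashMap.newKeySet();
    private final Consumer<String> businessLogic;

    IdempotentHandler(Consumer<String> businessLogic) {
        this.businessLogic = businessLogic;
    }

    // Returns true if the message was consumed, false if it was a duplicate and discarded.
    boolean handle(String messageId, String body) {
        // Step 1: look the ID up in the store.
        if (processedIds.contains(messageId)) {
            return false; // Step 3: already seen, discard the message.
        }
        businessLogic.accept(body);  // Step 2: consume normally...
        processedIds.add(messageId); // ...then record the ID.
        return true;
    }
}
```

Calling `handle("id-1", ...)` twice runs the business logic only once. Note that the check and the write are two separate operations here, so two consumers handling the same ID at the same moment could both pass step 1; closing that gap would need an atomic "insert if absent" operation in the store.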

Producer

@PostMapping("/send")
public void sendMessage() {

    JSONObject jsonObject = new JSONObject();
    jsonObject.put("message", "Java journey");
    String json = jsonObject.toJSONString();
    // Attach a globally unique message ID so that the consumer can detect duplicates
    Message message = MessageBuilder.withBody(json.getBytes(StandardCharsets.UTF_8))
            .setContentType(MessageProperties.CONTENT_TYPE_JSON)
            .setContentEncoding("UTF-8")
            .setMessageId(UUID.randomUUID().toString())
            .build();
    amqpTemplate.convertAndSend("javatrip", message);
}

Consumer

@Component
@RabbitListener(queuesToDeclare = @Queue(value = "javatrip", durable = "true"))
public class Consumer {

    @RabbitHandler
    public void receiveMessage(Message message) throws Exception {

        String messageId = message.getMessageProperties().getMessageId();
        String msg = new String(message.getBody(), "UTF-8");
        System.out.println("Received message: " + msg + ", message ID: " + messageId);

        // Jedis is Closeable, so try-with-resources releases the connection
        try (Jedis jedis = new Jedis("localhost", 6379)) {
            // Use one key per message ID so that every consumed message is remembered
            String key = "messageId:" + messageId;
            if (jedis.exists(key)) {
                // Already consumed: discard the duplicate
                return;
            }
            JSONObject jsonObject = JSONObject.parseObject(msg);
            String email = jsonObject.getString("message");
            // Record the ID only after the message has been consumed
            jedis.set(key, "1");
        }
    }
}

If you store the record in a database instead, you can use this ID as the primary key of the message table. When a duplicate message arrives, the insert violates the uniqueness of the primary key and the database throws an exception, which the consumer can catch in order to skip the duplicate.
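A rough sketch of this "insert first, treat a duplicate key as already consumed" pattern follows. To stay self-contained it uses an in-memory map in place of a real table; `MessageTable` and `DbBackedConsumer` are hypothetical names. It assumes a JDBC driver that reports primary-key violations as `java.sql.SQLIntegrityConstraintViolationException`, which varies by driver.

```java
import java.sql.SQLIntegrityConstraintViolationException;
import java.util.HashMap;
import java.util.Map;

// In-memory stand-in for a table whose primary key is the message ID.
class MessageTable {
    private final Map<String, String> rows = new HashMap<>();

    // Mimics an INSERT: rejects a second row with the same primary key.
    void insert(String messageId, String body) throws SQLIntegrityConstraintViolationException {
        if (rows.containsKey(messageId)) {
            throw new SQLIntegrityConstraintViolationException("Duplicate primary key: " + messageId);
        }
        rows.put(messageId, body);
    }

    int size() {
        return rows.size();
    }
}

class DbBackedConsumer {
    private final MessageTable table;

    DbBackedConsumer(MessageTable table) {
        this.table = table;
    }

    // Returns true if the message is new, false if the insert hit a duplicate key.
    boolean consume(String messageId, String body) {
        try {
            table.insert(messageId, body);
            return true;
        } catch (SQLIntegrityConstraintViolationException e) {
            return false; // duplicate delivery: skip processing
        }
    }
}
```

Because the uniqueness check and the write happen in a single insert, there is no window between "check" and "record" in which a concurrent duplicate could slip through.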