This article is also published on my personal blog, www.chengxy-nds.top, where I share technical resources so we can make progress together.

Recently, our department asked us to hold more tech-sharing sessions to liven up the company's technical atmosphere. Having long since seen through everything, I know this is mostly about padding KPIs. Still, it is a good thing: technical discussions do more for personal growth than boring meetings.

So I volunteered to give a talk. Ahem, really not for the KPI, I just want to learn together with everyone!


This time I will share how to implement the message confirmation mechanism with SpringBoot + RabbitMQ, along with a few pitfalls I ran into in real development. The content is fairly simple overall, but oddly enough, the simpler something is, the easier it is to get wrong.

Once RabbitMQ is introduced, our service call chain becomes noticeably longer. The systems are decoupled, but there are more places where a message can be lost, such as:

  • Message producer -> RabbitMQ server (message sending fails)

  • The RabbitMQ server itself fails, causing message loss

  • Message consumer -> RabbitMQ server (message consumption fails)

So if you can do without middleware, do without it; using it for its own sake only adds trouble. Enabling message confirmation goes a long way toward ensuring that messages are delivered, but the frequent confirmation round trips lower RabbitMQ's overall efficiency and throughput noticeably, so it is really not recommended for messages that are not very important.


Let's first implement the message confirmation mechanism with SpringBoot + RabbitMQ, and then analyze the problems encountered.

1. Prepare the environment

1. Import the RabbitMQ dependency

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

2. Modify the application.properties configuration

Message confirmation must be enabled on both the sender and the consumer.

spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest

# Enable the confirm mechanism on the sender
spring.rabbitmq.publisher-confirms=true
# Enable the return mechanism on the sender
spring.rabbitmq.publisher-returns=true

# Set the consumer to manual ACK
spring.rabbitmq.listener.simple.acknowledge-mode=manual
# Whether to support retry
spring.rabbitmq.listener.simple.retry.enabled=true

3. Define exchanges and queues

Define the exchange confirmTestExchange and the queue confirm_test_queue, and bind the queue to the exchange.

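import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class QueueConfig {

    // Durable, non-exclusive, non-auto-delete queue
    @Bean(name = "confirmTestQueue")
    public Queue confirmTestQueue() {
        return new Queue("confirm_test_queue", true, false, false);
    }

    @Bean(name = "confirmTestExchange")
    public FanoutExchange confirmTestExchange() {
        return new FanoutExchange("confirmTestExchange");
    }

    // Bind the queue to the fanout exchange
    @Bean
    public Binding confirmTestFanoutExchangeAndQueue(
            @Qualifier("confirmTestExchange") FanoutExchange confirmTestExchange,
            @Qualifier("confirmTestQueue") Queue confirmTestQueue) {
        return BindingBuilder.bind(confirmTestQueue).to(confirmTestExchange);
    }
}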

RabbitMQ message confirmation has two parts: confirmation of sending and confirmation of receipt.


2. Message sending confirmation

Send confirmation is used to confirm whether a message was delivered successfully on two legs: from the producer to the broker, and from the exchange on the broker to the queue.

The leg from the producer to the RabbitMQ broker is covered by the confirmCallback confirm mode.

A failed delivery from the exchange to the queue is covered by the returnCallback return mode.

We can use these two callbacks to make sure messages are delivered all the way to the queue.

1. ConfirmCallback confirm mode

The confirmCallback callback is triggered whenever a message is received by the RabbitMQ broker.

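import lombok.extern.slf4j.Slf4j;
// Spring AMQP 2.1+; older versions use org.springframework.amqp.rabbit.support.CorrelationData
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class ConfirmCallbackService implements RabbitTemplate.ConfirmCallback {

    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        // correlationData is null when the message was sent without one
        String id = correlationData != null ? correlationData.getId() : null;
        if (!ack) {
            log.error("Message sending exception! correlationData={}, cause={}", id, cause);
        } else {
            log.info("The sender has received the confirmation, correlationData={}, ack={}, cause={}", id, ack, cause);
        }
    }
}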

Implement the ConfirmCallback interface and override the confirm() method, which takes three parameters: correlationData, ack, and cause.

  • correlationData: the object has only one property, id, which uniquely identifies the current message.
  • ack: the status of delivery to the broker; true means success.
  • cause: the reason the delivery failed.
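One way to put these three parameters to work is to keep every outgoing message in a map keyed by the correlationData id until the broker confirms it. The following is only a minimal plain-Java sketch of that bookkeeping: it does not talk to RabbitMQ, and the class PendingConfirms and its methods are hypothetical names made up for illustration.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical in-memory model; not part of Spring AMQP or the RabbitMQ client.
class PendingConfirms {
    // Messages that were sent but not yet confirmed, keyed by correlation id.
    private final Map<String, String> pending = new ConcurrentHashMap<>();

    // Call just before sending: remember the payload under its correlation id.
    void track(String correlationId, String payload) {
        pending.put(correlationId, payload);
    }

    // Mirrors the shape of confirm(correlationData, ack, cause).
    // Returns the payload that should be resent, or empty if nothing needs doing.
    Optional<String> onConfirm(String correlationId, boolean ack, String cause) {
        if (correlationId == null) {
            return Optional.empty(); // sent without correlation data, nothing to look up
        }
        if (ack) {
            pending.remove(correlationId); // broker accepted it, stop tracking
            return Optional.empty();
        }
        // Broker refused the message: keep tracking it and hand it back for a resend.
        return Optional.ofNullable(pending.get(correlationId));
    }

    int size() {
        return pending.size();
    }

    public static void main(String[] args) {
        PendingConfirms confirms = new PendingConfirms();
        confirms.track("id-1", "order created");
        confirms.track("id-2", "order paid");
        confirms.onConfirm("id-1", true, null);
        System.out.println(confirms.onConfirm("id-2", false, "simulated failure"));
        System.out.println("still pending: " + confirms.size());
    }
}
```

In a real project the map would more likely be a database table or a Redis hash, so that pending messages survive a restart of the sender.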

However, a message received by the broker only indicates that it has reached the MQ server and does not guarantee that the message will be delivered to the target queue. So the next step is to use the returnCallback.

2. ReturnCallback return mode

If a message cannot be delivered to the target queue, the returnCallback is triggered. In that case, it is best to record the detailed delivery data of the message for later resending or compensation.

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class ReturnCallbackService implements RabbitTemplate.ReturnCallback {

    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        log.info("returnedMessage ===> replyCode={}, replyText={}, exchange={}, routingKey={}", replyCode, replyText, exchange, routingKey);
    }
}

Implement the ReturnCallback interface and override the returnedMessage() method, which takes five parameters: message (the message body), replyCode (response code), replyText (response content), exchange, and routingKey.
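To make the difference between the two callbacks concrete, here is a toy model of the broker side in plain Java. It is a sketch under simplifying assumptions, not RabbitMQ's real behavior in every detail: ToyBroker is a hypothetical class, it models a direct-style exchange (exact routing key match) rather than the fanout exchange used in this article, and it treats a publish to an unknown exchange simply as a confirm with ack=false.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical toy broker used only to illustrate confirm vs. return.
class ToyBroker {
    // exchange name -> (routing key -> queue name)
    private final Map<String, Map<String, String>> bindings = new HashMap<>();
    // Callback events in the order the sender would observe them.
    final List<String> events = new ArrayList<>();

    void bind(String exchange, String routingKey, String queue) {
        bindings.computeIfAbsent(exchange, k -> new HashMap<>()).put(routingKey, queue);
    }

    // Models publishing a message with the mandatory flag set.
    void publish(String exchange, String routingKey, String body) {
        Map<String, String> routes = bindings.get(exchange);
        if (routes == null) {
            // The message never reached an exchange: negative confirm.
            events.add("confirm ack=false");
            return;
        }
        if (!routes.containsKey(routingKey)) {
            // The exchange got the message but no queue matched: it is returned.
            events.add("return " + body);
        }
        // The broker took responsibility for the message either way.
        events.add("confirm ack=true");
    }

    public static void main(String[] args) {
        ToyBroker broker = new ToyBroker();
        broker.bind("orders", "created", "order_queue");
        broker.publish("orders", "created", "m1"); // routed normally
        broker.publish("orders", "unknown", "m2"); // unroutable
        broker.publish("missing", "created", "m3"); // no such exchange
        broker.events.forEach(System.out::println);
    }
}
```

The second publish is the interesting case: the sender sees ack=true in the confirm callback even though the message never reached a queue, and only the return event reveals the problem.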

To send a message, set the confirm and return callbacks on the rabbitTemplate, make the message persistent with setDeliveryMode(), and attach a CorrelationData object that carries a unique ID. For a quick test a fixed ID such as 10000000000 is enough; the code below generates a random UUID instead.

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Autowired
    private ConfirmCallbackService confirmCallbackService;

    @Autowired
    private ReturnCallbackService returnCallbackService;

    public void sendMessage(String exchange, String routingKey, Object msg) {

        /**
         * Make sure a message that cannot be routed to a queue is returned to the sender.
         * Note: this requires publisher-returns=true in the configuration.
         */
        rabbitTemplate.setMandatory(true);

        /**
         * Callback invoked when the broker confirms (or refuses) the message
         */
        rabbitTemplate.setConfirmCallback(confirmCallbackService);

        /**
         * Callback invoked when the message fails to be delivered to a queue
         */
        rabbitTemplate.setReturnCallback(returnCallbackService);

        /**
         * Send the message as persistent, with a unique correlation ID
         */
        rabbitTemplate.convertAndSend(exchange, routingKey, msg,
                message -> {
                    message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
                    return message;
                },
                new CorrelationData(UUID.randomUUID().toString()));
    }

3. Confirmation of message reception

Receive confirmation is a little simpler than send confirmation, because there is only one step: the message receipt (ACK). The method annotated with @RabbitHandler takes two extra parameters, channel and message.

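import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.io.IOException;

@Slf4j
@Component
@RabbitListener(queues = "confirm_test_queue")
public class ReceiverMessage1 {

    @RabbitHandler
    public void processHandler(String msg, Channel channel, Message message) throws IOException {

        try {
            log.info("Received message: {}", msg);

            // TODO specific business logic

            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);

        } catch (Exception e) {

            if (message.getMessageProperties().getRedelivered()) {
                // Already redelivered once: reject without requeueing
                log.error("Message has failed to be processed twice, refusing to receive it again...");
                channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
            } else {
                // First failure: put the message back into the queue
                log.error("Message is about to return to the queue for processing again...");
                channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
            }
        }
    }
}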

There are three ways to acknowledge a message when consuming it; let's look at what each one means.

1. basicAck

basicAck: confirms success. After this method is called, the RabbitMQ broker deletes the message.

void basicAck(long deliveryTag, boolean multiple) 

deliveryTag: the delivery number of the message. Each time a message is delivered or redelivered on a channel, the deliveryTag increases. In manual confirmation mode, we can ack, nack, or reject the message with a given deliveryTag.

multiple: whether to confirm in batches. If true, all unconfirmed messages with a deliveryTag less than or equal to the current one are acked at once.

For example, suppose three messages with deliveryTags 5, 6, and 7 have been delivered and none of them has been confirmed. When the fourth message arrives with deliveryTag 8 and is acked with multiple set to true, messages 5, 6, 7, and 8 are all confirmed.
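The effect of the multiple flag can be reproduced with a sorted set of unconfirmed delivery tags. This is only an in-memory sketch of the bookkeeping, not the RabbitMQ client itself; UnackedTags is a hypothetical class name.

```java
import java.util.TreeSet;

// Hypothetical model of the unconfirmed delivery tags on one channel.
class UnackedTags {
    private final TreeSet<Long> unacked = new TreeSet<>();

    // A message with this tag was delivered to the consumer.
    void delivered(long deliveryTag) {
        unacked.add(deliveryTag);
    }

    // Models the semantics of basicAck(deliveryTag, multiple).
    void ack(long deliveryTag, boolean multiple) {
        if (multiple) {
            // Confirm every outstanding tag up to and including deliveryTag.
            unacked.headSet(deliveryTag, true).clear();
        } else {
            // Confirm only this one tag.
            unacked.remove(deliveryTag);
        }
    }

    // Copy of the tags that are still waiting for confirmation.
    TreeSet<Long> remaining() {
        return new TreeSet<>(unacked);
    }

    public static void main(String[] args) {
        UnackedTags tags = new UnackedTags();
        for (long tag = 5; tag <= 8; tag++) {
            tags.delivered(tag);
        }
        tags.ack(8, true); // one call confirms 5, 6, 7 and 8
        System.out.println("remaining: " + tags.remaining());
    }
}
```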

2. basicNack

basicNack: a negative acknowledgement. It is used when the business processing of a message fails, and the message can be put back into the queue.

void basicNack(long deliveryTag, boolean multiple, boolean requeue)

deliveryTag: the delivery number of the message.

multiple: whether to handle messages in batches.

requeue: if true, the message is put back into the queue.

3. basicReject

basicReject: rejects a message. It differs from basicNack in that it cannot handle messages in batches.

void basicReject(long deliveryTag, boolean requeue)

deliveryTag: the delivery number of the message.

requeue: if true, the message is put back into the queue.
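Putting the three calls together, the consumer shown earlier picks one of them from two facts: whether the business logic succeeded and whether the message had already been redelivered. That choice can be written as a small pure function, sketched below; the class AckDecision and its enum are hypothetical names, and the function only returns a decision without calling the channel.

```java
// Hypothetical helper that mirrors the branching of the consumer above.
class AckDecision {
    enum Action {
        ACK,            // channel.basicAck(tag, false)
        NACK_REQUEUE,   // channel.basicNack(tag, false, true)
        REJECT_DISCARD  // channel.basicReject(tag, false)
    }

    static Action decide(boolean processedOk, boolean redelivered) {
        if (processedOk) {
            return Action.ACK;
        }
        // A first failure gets one more chance; a failed redelivery is dropped.
        return redelivered ? Action.REJECT_DISCARD : Action.NACK_REQUEUE;
    }

    public static void main(String[] args) {
        System.out.println(decide(true, false));
        System.out.println(decide(false, false));
        System.out.println(decide(false, true));
    }
}
```

Keeping the decision separate from the channel calls makes the retry policy easy to unit test without a running broker.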

4. Test

Send a message to test whether the message confirmation mechanism takes effect. The execution result shows that the sender receives the confirm callback after sending the message, and the consumer consumes the message successfully. Capturing the traffic with Wireshark also shows the ack step in RabbitMQ's AMQP protocol exchange.

5. Pitfalls

1. No message confirmation

It is a very low-tech pitfall, but one that is very easy to fall into.

With message confirmation turned on, don't forget to call channel.basicAck after consuming a message. Otherwise the message stays in the queue and gets consumed repeatedly.

2. Infinite redelivery of messages

When I first came into contact with the message confirmation mechanism, my consumer code looked like the following. The idea was simple: acknowledge the message after the business logic finishes, and when an exception occurs (simulated here with int a = 1 / 0), put the message back into the queue.

    @RabbitHandler
    public void processHandler(String msg, Channel channel, Message message) throws IOException {

        try {
            log.info("Consumer 2 received: {}", msg);

            // Simulate a business exception
            int a = 1 / 0;

            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);

        } catch (Exception e) {
            // Put the message back into the queue
            channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
        }
    }

The problem is that once business code has a bug, 99.9% of the time it will not fix itself. The message is put back into the queue endlessly and the consumer processes it endlessly, resulting in an infinite loop.


My local CPU was instantly maxed out; you can imagine how panicked I was when the service crashed in production.

Meanwhile, the RabbitMQ management console showed only one unconfirmed message.


Testing and analysis showed that when a message is requeued, it does not go to the tail of the queue but stays at the head.

The consumer immediately consumes the message again, the business logic throws an exception, the message is requeued again, and so on. The queue is blocked, so the normal messages behind it cannot be processed.

Our solution at the time was to ack the message first so that the queue deletes it, and then publish the same message to the queue again. The failed message then goes to the tail of the queue, which ensures that the message is not lost while normal business continues.

channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
// Resend the message to the end of the queue
channel.basicPublish(message.getMessageProperties().getReceivedExchange(),
                    message.getMessageProperties().getReceivedRoutingKey(), MessageProperties.PERSISTENT_TEXT_PLAIN,
                    JSON.toJSONBytes(msg));
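To see why republishing helps, here is a small in-memory simulation of the behavior described above. It is only a model built on a plain Deque, with no RabbitMQ involved; RequeueSimulation and its parameters are hypothetical, and maxDeliveries exists only so that the demo terminates.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

// Hypothetical simulation: one message always fails, the others succeed.
class RequeueSimulation {

    // Performs at most maxDeliveries deliveries and returns the messages
    // that were processed successfully, in order.
    static List<String> run(List<String> messages, String poison,
                            boolean requeueAtHead, int maxDeliveries) {
        Deque<String> queue = new ArrayDeque<>(messages);
        List<String> processed = new ArrayList<>();
        for (int i = 0; i < maxDeliveries && !queue.isEmpty(); i++) {
            String msg = queue.pollFirst();
            if (msg.equals(poison)) {
                if (requeueAtHead) {
                    queue.addFirst(msg); // nack with requeue: back to the head
                } else {
                    queue.addLast(msg);  // ack, then publish again: goes to the tail
                }
            } else {
                processed.add(msg);
            }
        }
        return processed;
    }

    public static void main(String[] args) {
        List<String> messages = Arrays.asList("bad", "a", "b");
        System.out.println("requeue at head:   " + run(messages, "bad", true, 10));
        System.out.println("republish to tail: " + run(messages, "bad", false, 10));
    }
}
```

With the head strategy the failing message uses up every delivery and nothing else is processed; with the tail strategy the normal messages get through, although the failing message still circulates forever.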

However, this approach does not solve the root problem, and the errors still showed up from time to time. We later optimized it by setting a limit on the number of retries. Once the limit is reached, the message is manually acked and removed from the queue, persisted to MySQL, and an alert is pushed; manual handling and scheduled tasks then take care of compensation.
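The retry limit can be sketched as follows. This is a simplified in-memory model rather than our production code: RetryLimitedConsumer is a hypothetical class, messages are identified by their string value (an assumption made for brevity), and the failedStore list stands in for the MySQL table and the alert.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

// Hypothetical consumer that gives up on a message after maxRetries failures.
class RetryLimitedConsumer {
    private final int maxRetries;
    // Number of failed attempts per message.
    private final Map<String, Integer> attempts = new HashMap<>();
    // Stand-in for "persist to MySQL and push an alert".
    final List<String> failedStore = new ArrayList<>();

    RetryLimitedConsumer(int maxRetries) {
        this.maxRetries = maxRetries;
    }

    // Drains the queue; the handler returns true on success and may throw.
    void drain(Deque<String> queue, Predicate<String> handler) {
        while (!queue.isEmpty()) {
            String msg = queue.pollFirst();
            boolean ok;
            try {
                ok = handler.test(msg);
            } catch (RuntimeException e) {
                ok = false;
            }
            if (ok) {
                attempts.remove(msg); // done, forget the counter
                continue;
            }
            int failures = attempts.merge(msg, 1, Integer::sum);
            if (failures >= maxRetries) {
                failedStore.add(msg); // limit reached: store for compensation
                attempts.remove(msg);
            } else {
                queue.addLast(msg);   // retry later, behind the other messages
            }
        }
    }

    public static void main(String[] args) {
        Deque<String> queue = new ArrayDeque<>(Arrays.asList("m1", "bad", "m2"));
        RetryLimitedConsumer consumer = new RetryLimitedConsumer(3);
        consumer.drain(queue, msg -> !msg.equals("bad"));
        System.out.println("needs compensation: " + consumer.failedStore);
    }
}
```

In a real consumer the counter would have to live somewhere shared, for example in a message header or in Redis, because several consumer instances may see the same message.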

3. Repeated consumption

To deal with repeated consumption, make the MQ consumer idempotent in whatever way suits the business. Messages can be persisted with MySQL or Redis, and duplicates can then be checked against a unique property of the message.
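A minimal sketch of such an idempotency check is shown below. It is an illustration under assumptions rather than the demo project's code: IdempotentConsumer is a hypothetical class, each message is assumed to carry a unique messageId, and the in-memory set stands in for a Redis key or a MySQL unique index.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

// Hypothetical consumer wrapper that runs the business logic once per message id.
class IdempotentConsumer {
    // Stand-in for Redis or a MySQL table with a unique index on the message id.
    private final Set<String> seen = ConcurrentHashMap.newKeySet();

    // Returns true if the business logic ran, false if the message was a duplicate.
    boolean handle(String messageId, String body, Consumer<String> business) {
        if (!seen.add(messageId)) {
            return false; // already handled: skip it (the caller can still ack)
        }
        try {
            business.accept(body);
            return true;
        } catch (RuntimeException e) {
            seen.remove(messageId); // processing failed: allow a later retry
            throw e;
        }
    }

    public static void main(String[] args) {
        IdempotentConsumer consumer = new IdempotentConsumer();
        Consumer<String> business = body -> System.out.println("processing " + body);
        System.out.println(consumer.handle("msg-1", "pay order 42", business));
        System.out.println(consumer.handle("msg-1", "pay order 42", business));
    }
}
```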

Demo GitHub address: https://github.com/chengxy-nds/Springboot-Notebook/tree/master/springboot-rabbitmq-confirm

