Mind mapping

1. Analyze the cause of data loss

To analyze RabbitMQ message loss, take a look at the process of sending a message from producer to consumer:

As you can see, a message goes through the network twice: from the producer to the RabbitMQ server and from the RabbitMQ server to the consumer.

It is stored in a Queue before the consumer consumes it.

As you can see, there are three scenarios where message loss can occur:

  • Stored in a queue, if the queue does not persist messages, the RabbitMQ server will lose data when it is down and restarted.
  • The producer sends a message to the RabbitMQ server. If the RabbitMQ server goes down and stops serving, the message is lost.
  • The consumer gets the data consumption stored in the queue from the RabbitMQ server, but the consumer program fails or crashes and does not consume the data correctly, resulting in data loss.

RabbitMQ provides three solutions for these scenarios: message persistence, confirm, and ACK transactions.

Second, message persistence

RabbitMQ supports message persistence. Message persistence needs to be set to Exchange and Queue so that messages will persist when sent to the RabbitMQ server.

First look at the Exchange switch class diagram:

If you want to create an instance of a subclass, you need to call the AbstractExchange constructor. If you want to create an instance of a subclass, you need to call the AbstractExchange constructor.

The durable parameter indicates whether it is persistent or not. The default is persistence (true). To create a persistent Exchange, write:

	@Bean
    public DirectExchange rabbitmqDemoDirectExchange(a) {
        / / Direct switches
        return new DirectExchange(RabbitMQConfig.RABBITMQ_DEMO_DIRECT_EXCHANGE, true.false);
    }
Copy the code

Then Queue, let’s look at the constructor of Queue:

Durable Specifies whether the durable parameter is durable. The default value is true. Therefore, you can create it without specifying:

	@Bean
    public Queue fanoutExchangeQueueA(a) {
    	// You only need to specify the name, which is persistent by default
        return new Queue(RabbitMQConfig.FANOUT_EXCHANGE_QUEUE_TOPIC_A);
    }
Copy the code

Now that message persistence is set up, start the project and send a few messages, as we can see:

You can actually find the corresponding file:Find the directory on the corresponding disk: Message persistence prevents messages from being lost in RabbitMQ Server due to downtime and restart.

3. Message confirmation mechanism

3.1 confirm mechanism

The producer sending to RabbitMQ Server may fail due to network problems and thus lose data. We can use confirm mode to prevent data loss. Here’s how it works:From the figure above, you can see that notifications are made through two callback functions ** Confirm () and returnedMessage()**.

A message sent from the producer to RabbitMQ is first sent to Exchange, corresponding to the callback confirm(). The second step is allocated from the Exchange route to the Queue, with the corresponding callback function returnedMessage().

How to implement the code, please see the demo:

First add the following configuration to the application.yml configuration file:

spring:
  rabbitmq:
    publisher-confirms: true
# publisher-returns: true
    template:
      mandatory: true
# publisher- Confirms: when set to true. When a message is posted to Exchange, the Confirm () method is called back to notify the producer
# publisher-returns: when set to true When a message matches the Queue and fails, the message is returned via the callback returnedMessage() method
# spring. The rabbitmq. Template. Mandatory: when set to true. Specifies that messages will be returned by the callback returnedMessage() method if not received by the queue.
Copy the code

There’s a little detail,Publisher-returns and Mandatory if both are set, the priority is MANDATORY. You can see the source code:Next we need to define the callback method:

@Component
public class RabbitmqConfirmCallback implements RabbitTemplate.ConfirmCallback.RabbitTemplate.ReturnCallback {
    private Logger logger = LoggerFactory.getLogger(RabbitmqConfirmCallback.class);

    /** * listen for messages to reach Exchange **@paramCorrelationData Object * that contains the unique identifier of the message@paramAck True indicates ACK, and false indicates Nack *@paramCause Cause of nack delivery failure */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        if (ack) {
            logger.info("Message delivered successfully ~ message Id: {}", correlationData.getId());
        } else {
            logger.error("Message delivery failed, Id: {}, error: {}", correlationData.getId(), cause); }}@Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        logger.info("Message not routed to queue, get returned message");
        Map map = byteToObject(message.getBody(), Map.class);
        logger.info("message body: {}", map == null ? "" : map.toString());
        logger.info("replyCode: {}", replyCode);
        logger.info("replyText: {}", replyText);
        logger.info("exchange: {}", exchange);
        logger.info("routingKey: {}", exchange);
        logger.info("------------> end <------------");
    }

    @SuppressWarnings("unchecked")
    private <T> T byteToObject(byte[] bytes, Class<T> clazz) {
        T t;
        try (ByteArrayInputStream bis = new ByteArrayInputStream(bytes);
             ObjectInputStream ois = new ObjectInputStream(bis)) {
            t = (T) ois.readObject();
        } catch (Exception e) {
            e.printStackTrace();
            return null;
        }
        returnt; }}Copy the code

I’ll simply print the message returned by the callback method here, and in a real project, the returned message could be stored in a log table for further processing using a scheduled task.

I’m using RabbitTemplate for sending, so the RabbitTemplate in the Service layer will need to be set:

@Service
public class RabbitMQServiceImpl implements RabbitMQService {
	@Resource
    private RabbitmqConfirmCallback rabbitmqConfirmCallback;

    @Resource
    private RabbitTemplate rabbitTemplate;

    @PostConstruct
    public void init(a) {
        / / specified ConfirmCallback
        rabbitTemplate.setConfirmCallback(rabbitmqConfirmCallback);
        / / specified ReturnCallback
        rabbitTemplate.setReturnCallback(rabbitmqConfirmCallback);
    }
    
    @Override
    public String sendMsg(String msg) throws Exception {
        Map<String, Object> message = getMessage(msg);
        try {
            CorrelationData correlationData = (CorrelationData) message.remove("correlationData");
            rabbitTemplate.convertAndSend(RabbitMQConfig.RABBITMQ_DEMO_DIRECT_EXCHANGE, RabbitMQConfig.RABBITMQ_DEMO_DIRECT_ROUTING, message, correlationData);
            return "ok";
        } catch (Exception e) {
            e.printStackTrace();
            return "error"; }}private Map<String, Object> getMessage(String msg) {
        String msgId = UUID.randomUUID().toString().replace("-"."").substring(0.32);
        CorrelationData correlationData = new CorrelationData(msgId);
        String sendTime = sdf.format(new Date());
        Map<String, Object> map = new HashMap<>();
        map.put("msgId", msgId);
        map.put("sendTime", sendTime);
        map.put("msg", msg);
        map.put("correlationData", correlationData);
        returnmap; }}Copy the code

And you’re done! Next we test by sending a message that we can console:If a message is sent and no route matches the queue, the following information can be seen:This is confirm mode. What it does isTo protect producers from message loss when sending messages to RabbitMQ.

3.2 Transaction Mechanism (ACK)

As the original graph shows, the consumer receives the message from the queue and directly confirms the receipt. If the consumer is down or the application is abnormal and the data is not consumed normally, data loss will occur in this case.

So the key is to change the automatic signing to manual signing, normal consumption is returned to confirm signing, if abnormal, return to refuse to sign back to the queue.How to implement the code, please see the demo:

First set the transaction commit to manual mode in the consumer’s application.yml file:

spring:
  rabbitmq:
    listener:
      simple:
		acknowledge-mode: manual # Manual ACK mode
        concurrency: 1 # Minimum number of consumers
        max-concurrency: 10 # Maximum number of consumers
Copy the code

Then write the consumer’s listener:

@Component
public class RabbitDemoConsumer {

    enum Action {
        // The processing succeeded
        SUCCESS,
        // Error can be retried, message back to queue
        RETRY,
        // Error without retry, reject message, and remove from queue
        REJECT
    }

    @RabbitHandler
    @RabbitListener(queuesToDeclare = @Queue(RabbitMQConfig.RABBITMQ_DEMO_TOPIC))
    public void process(String msg, Message message, Channel channel) {
        long tag = message.getMessageProperties().getDeliveryTag();
        Action action = Action.SUCCESS;
        try {
            System.out.println("Consumer RabbitDemoConsumer consumes messages from the RabbitMQ server:" + msg);
            if ("bad".equals(msg)) {
                throw new IllegalArgumentException("Test: Throw requeueable exception");
            }
            if ("error".equals(msg)) {
                throw new Exception("Test: Throw an exception that does not need to be requeued."); }}catch (IllegalArgumentException e1) {
            e1.printStackTrace();
            // Depending on the type of exception, set the action to be retried or not to be retried
            action = Action.RETRY;
        } catch (Exception e2) {
            // Print exception
            e2.printStackTrace();
            // Depending on the type of exception, set the action to be retried or not to be retried
            action = Action.REJECT;
        } finally {
            try {
                if (action == Action.SUCCESS) {
                    //multiple indicates whether batch processing is possible. True: Batch ACK processes all messages smaller than tag. False processes the current message
                    channel.basicAck(tag, false);
                } else if (action == Action.RETRY) {
                    //Nack, reject policy, message back to queue
                    channel.basicNack(tag, false.true);
                } else {
                    //Nack, reject the policy and remove it from the queue
                    channel.basicNack(tag, false.false);
                }
                channel.close();
            } catch(Exception e) { e.printStackTrace(); }}}}Copy the code

Explain the above code, if there is no exception, manually confirm the reply to the RabbitMQ server basicAck.

If some exception is thrown that can be re-queued, we reply to basicNack and set re-queued.

If an exception is thrown that cannot be requeued, reply basicNack and set it to be removed from RabbitMQ’s queue.

To test this, send a plain “hello” message:Explain the meaning of the three methods returned by ACK.

① Successful confirmation

void basicAck(long deliveryTag, boolean multiple) throws IOException;
Copy the code

This method is called to validate the message after successful processing by the consumer.

  • DeliveryTag: Index of the message
  • Multiple: specifies whether to batch. True: Will ack all messages smaller than deliveryTag at one time.

② Failure confirmation

void basicNack(long deliveryTag, boolean multiple, boolean requeue) throws IOException;
Copy the code
  • DeliveryTag: Index of the message.
  • Multiple: Specifies whether to batch. True: All messages smaller than deliveryTag will be rejected at once.
  • Requeue: Indicates whether the rejected item is re-enqueued.

③ Failure confirmation

void basicReject(long deliveryTag, boolean requeue) throws IOException;
Copy the code
  • DeliveryTag: Index of the message.
  • Requeue: Indicates whether the rejected item is re-enqueued.

The difference between basicNack() and basicReject() is that basicNack() can reject a batch, while basicReject() can reject only one message at a time.

Four, encountered pit

4.1 Endless Loops Caused by NACK Enabled

I intentionally wrote a bug in the above code. The test sends a “bad” and then throws an exception back to the queue. Here’s the problem: consumers return to the queue and then spend again, which throws an exception and returns to the queue, creating an endless cycle.So how do you avoid that?

Since nack can cause an infinite loop, one idea I have is to drop the message thrown into a table instead of basicNack() and record the exception thrown, the message body, and the message Id. Deal with it through scheduled tasks.

If you have any good solutions, please leave a comment

4.2 double ack

Sometimes careless, accidentally opened the automatic Ack mode, and manual reply Ack. This error will be reported:

Consumer RabbitDemoConsumer consumes messages from the RabbitMQ server: Java technology enthusiast2020- 08 -02 22:52:42.148 ERROR 4880-- -- -127.0. 01.:5672] o.s.a.r.c.CachingConnectionFactory       : Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - unknown delivery tag 1.class-id=60, method-id=80)
2020- 08 -02 22:52:43.102  INFO 4880 --- [cTaskExecutor-1] o.s.a.r.l.SimpleMessageListenerContainer : Restarting Consumer@f4a3a8d: tags=[{amq.ctag-8MJeQ7el_PNbVJxGOOw7Rw=rabbitmq.demo.topic}], channel=Cached Rabbit Channel: AMQChannel(amqp://[email protected]:5672/,5), conn: Proxy@782a1679 Shared Rabbit Connection: SimpleConnection @ 67 c5b175 [delegate = it: / / [email protected]:5672 /, localPort = 56938]. acknowledgeMode=AUTO local queue size=0
Copy the code

If this error occurs, check to see if the following configuration has been added to the YML file:

spring:
  rabbitmq:
    listener:
      simple:
        acknowledge-mode: manual
        concurrency: 1
        max-concurrency: 10
Copy the code

If the above Configuration have been added, or an error, could you use @ SimpleRabbitListenerContainerFactory is configured on the Configuration, according to the characteristics of SpringBoot, code is better than that of the Configuration, Configuration of code covered the yml Configuration, And forgot to set manual mode:

@Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        // Set the manual ACK mode
        factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        return factory;
    }
Copy the code

If you still get an error, it’s probably written in the wrong place, in the producer’s project. The above configuration should be configured in the consumer’s project. Because the ACK pattern is for consumers. I just write wrong, write in the producer, toss over a few hours, tears eye ~

4.3 Performance Problems

In fact, manual ACK is certainly much slower than automatic ACK. I checked some information on the Internet, and the performance difference is about 10 times. Therefore, manual ACK mode is not recommended in practical applications. However, it is not absolutely impossible to open, specific case specific analysis, see the amount of concurrency, and the importance of data and so on.

Therefore, in a real project, it is necessary to weigh the importance of concurrency and data before deciding on a specific solution.

4.4 If the manual ACK mode is enabled, queue exceptions may occur if no reply is received in time

If manual ACK mode is turned on and the RabbitMQ server does not reply because the code is buggy, the message will sit in the Unacked message until the consumer disconnects. If you don’t disconnect, you’re going to get more and more Unacked messages, you’re going to get more and more of them, and then you’re going to get an anomaly.

I can’t demonstrate this problem on my computer, it’s too jammed.

Five, the summary

RabbitMQ can prevent data loss in three ways:

  • Message persistence
  • Producer message confirmation mechanism (Confirm mode)
  • Consumer Message Acknowledgement pattern (ACK pattern)

The code for all of the above examples is uploaded to Github:

Github.com/yehongzhi/m…

If you find this article useful, give it a thumbs up

Your praise is the biggest motivation for my creation ~

If you want to see my updated article for the first time, you can search the official account on wechat.Java technology enthusiast“,Refusing to be a salt fish, I’m a programmer trying to be remembered. See you next time!!

Ability is limited, if there is any mistake or improper place, please criticize and correct, study together!