1. Previous episodes

RabbitMQ installation and Configuration and Hello World example

RabbitMQ User Management, role management and permission Settings

How to ensure that 99.99% of messages are sent successfully?

RabbitMQ: How to persist 99.99% of messages without losing them?

So far we can guarantee that the producer delivers messages to the RabbitMQ server successfully and that they are not lost when the RabbitMQ server fails (restart, downtime, and so on). Does that make messages safe? Not quite. Keep reading.

2. Summary of this article

There is one more scenario to consider: if a consumer receives a message and crashes before it finishes the business logic, is the message lost? For example, when a user places an order, the order center sends a message to a queue in RabbitMQ. The points center receives the message and is about to add 20 points to the user who placed the order, but it crashes first, so the points are never added and the data becomes inconsistent.

So how do we solve this problem?

RabbitMQ provides a message acknowledgement mechanism to make sure that messages are successfully consumed. This article describes how to use that mechanism so that messages are not lost when a consumer goes down unexpectedly.

3. Enable explicit Ack mode

In the RabbitMQ installation and configuration and Hello World example, we started the consumer with code like this:

// Create a queue consumer
com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel) {
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope,
                               AMQP.BasicProperties properties, byte[] body) throws IOException {
        String message = new String(body, "UTF-8");
        System.out.println("Received Message '" + message + "'");
    }
};
channel.basicConsume(QUEUE_NAME, true, consumer);

The key line here is channel.basicConsume(QUEUE_NAME, true, consumer). To understand the second argument of the basicConsume() method, let's first look at the source code:

public String basicConsume(String queue, boolean autoAck, Consumer callback) throws IOException {
    return this.basicConsume(queue, autoAck, "", callback);
}

The autoAck parameter controls whether messages are acknowledged automatically. If it is true, RabbitMQ treats a message as acknowledged as soon as it has been sent and deletes it from memory (or disk), regardless of whether the consumer actually received or processed it. If it is false, RabbitMQ waits for an explicit acknowledgement from the consumer before removing the message from memory (or disk).

It is recommended to set autoAck to false so that the consumer can take as long as it needs to process a message without the message being lost if the consumer goes down.
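To make the difference concrete, here is a minimal sketch of the two modes. It is a toy in-memory model, not RabbitMQ's implementation and not the client API; the class name AutoAckSketch and its method are hypothetical and exist only for this illustration.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Toy in-memory model of one queue; NOT the RabbitMQ client API (hypothetical names). */
class AutoAckSketch {

    /**
     * Delivers a single message and returns how many messages the toy broker
     * still holds afterwards.
     */
    static int messagesHeldAfterDelivery(boolean autoAck, boolean consumerFails) {
        Deque<String> queue = new ArrayDeque<>();
        queue.add("order-created");

        if (autoAck) {
            // Automatic acknowledgement: the message is dropped as soon as it
            // is sent, whatever happens to the consumer afterwards.
            queue.remove();
            return queue.size();
        }
        if (!consumerFails) {
            // Explicit acknowledgement: the message is dropped only after the
            // consumer has finished processing and confirmed it.
            queue.remove();
        }
        return queue.size();
    }

    public static void main(String[] args) {
        System.out.println(messagesHeldAfterDelivery(true, true));   // prints 0: the message is lost
        System.out.println(messagesHeldAfterDelivery(false, true));  // prints 1: still held by the broker
        System.out.println(messagesHeldAfterDelivery(false, false)); // prints 0: removed after the ack
    }
}
```

Only the first case loses data: the consumer failed, yet the broker no longer holds the message.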

With autoAck set to false, the messages in the queue fall into two groups:

  1. Messages waiting to be delivered to consumers (the Ready part in the figure below)
  2. Messages that have been delivered to a consumer but not yet acknowledged by it (the Unacked part in the figure below)

If the consumer disconnects before RabbitMQ has received its acknowledgement, RabbitMQ re-queues the message and delivers it to the next consumer, which may be the same consumer.

RabbitMQ does not set an expiration time on unacknowledged messages. The only thing it uses to decide whether a message needs to be redelivered is whether the consumer's connection has been closed. This is because RabbitMQ allows a consumer to take a long time to process a message.
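The Ready/Unacked bookkeeping and the requeue-on-disconnect rule described above can be sketched with a toy in-memory queue. This is a simplified illustration under stated assumptions, not how the broker is implemented: ToyQueue and all of its methods are hypothetical names, a single consumer is assumed, and requeued messages are simply put back at the head of the Ready list.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.TreeMap;

/** Toy model of the Ready/Unacked states; NOT the RabbitMQ API (hypothetical names). */
class ToyQueue {
    private final Deque<String> ready = new ArrayDeque<>();
    private final TreeMap<Long, String> unacked = new TreeMap<>();
    private long nextTag = 1;

    void publish(String message) {
        ready.addLast(message);
    }

    /** Hands the next Ready message to the consumer and returns its delivery tag. */
    long deliver() {
        String message = ready.removeFirst(); // throws if nothing is Ready
        long tag = nextTag++;
        unacked.put(tag, message);            // kept until it is acknowledged
        return tag;
    }

    /** The consumer confirmed the message: only now is it discarded. */
    void ack(long tag) {
        unacked.remove(tag);
    }

    /** The consumer's connection was lost: every unacknowledged message becomes Ready again. */
    void consumerDisconnected() {
        // Walk the tags from newest to oldest so the original order is restored at the head.
        for (String message : unacked.descendingMap().values()) {
            ready.addFirst(message);
        }
        unacked.clear();
    }

    int readyCount() { return ready.size(); }
    int unackedCount() { return unacked.size(); }

    public static void main(String[] args) {
        ToyQueue queue = new ToyQueue();
        queue.publish("a");
        queue.publish("b");

        queue.deliver();
        System.out.println("Ready=" + queue.readyCount() + " Unacked=" + queue.unackedCount()); // Ready=1 Unacked=1

        queue.consumerDisconnected();
        System.out.println("Ready=" + queue.readyCount() + " Unacked=" + queue.unackedCount()); // Ready=2 Unacked=0

        long tag = queue.deliver();
        queue.ack(tag);
        System.out.println("Ready=" + queue.readyCount() + " Unacked=" + queue.unackedCount()); // Ready=1 Unacked=0
    }
}
```

Note that nothing in this model ever times out: a message leaves the Unacked state only through an acknowledgement or a disconnect, which mirrors the behavior the text describes.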

To make this easier to understand, let's walk through an example, using the DurableProducer class as the producer:

package com.zwwhnly.springbootaction.rabbitmq.durable;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class DurableProducer {
    private final static String EXCHANGE_NAME = "durable-exchange";
    private final static String QUEUE_NAME = "durable-queue";

    public static void main(String[] args) throws IOException, TimeoutException {
        // Create a connection
        ConnectionFactory factory = new ConnectionFactory();
        // Set the hostname of RabbitMQ
        factory.setHost("localhost");
        // Create a connection
        Connection connection = factory.newConnection();
        // Create a channel
        Channel channel = connection.createChannel();
        // Create an Exchange
        channel.exchangeDeclare(EXCHANGE_NAME, "direct", true);
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");

        // Send a message
        String message = "durable exchange test";
        // deliveryMode 2 marks the message as persistent
        AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().deliveryMode(2).build();
        channel.basicPublish(EXCHANGE_NAME, "", props, message.getBytes());

        // Close the channel and the connection
        channel.close();
        connection.close();
    }
}

Then create a new consumer class, AckConsumer:

package com.zwwhnly.springbootaction.rabbitmq.ack;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class AckConsumer {
    private final static String QUEUE_NAME = "durable-queue";

    public static void main(String[] args) throws IOException, TimeoutException {
        // Create a connection
        ConnectionFactory factory = new ConnectionFactory();
        // Set the hostname of RabbitMQ
        factory.setHost("localhost");
        // Create a connection
        Connection connection = factory.newConnection();
        // Create a channel
        Channel channel = connection.createChannel();
        // Create a queue consumer
        com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                int result = 1 / 0;
                System.out.println("Received Message '" + message + "'");
            }
        };
        channel.basicConsume(QUEUE_NAME, true, consumer);
    }
}

Here the autoAck parameter is set to true (automatic acknowledgement), and we deliberately throw an exception while consuming the message (the division by zero). Run the producer to write a message to the queue, then run the consumer: the message was never processed successfully, yet it has disappeared from the queue.

Then we set autoAck to false:

channel.basicConsume(QUEUE_NAME, false, consumer);

Run the producer again to write a message to the queue, then run the consumer. This time the message is still in the queue, even though the consumer code still throws the exception.

Next, remove the exception from the consumer code and restart the consumer. The message is now consumed successfully, but it is never acknowledged and stays in the Unacked state.

Stop the consumer manually, and the message goes back to Ready, waiting to be redelivered.

The message is consumed but never acknowledged because our code does not send an explicit Ack. Add one after the message has been processed:

String message = new String(body, "UTF-8");
//int result = 1 / 0;
System.out.println("Received Message '" + message + "'");

long deliveryTag = envelope.getDeliveryTag();
channel.basicAck(deliveryTag, false);

The deliveryTag can be thought of as the message's number: it is a 64-bit long integer.
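The second argument in channel.basicAck(deliveryTag, false) is the multiple flag of the RabbitMQ Java client: false acknowledges only the given tag, while true acknowledges every outstanding delivery on the channel up to and including that tag. The rule can be sketched as follows; DeliveryTagSketch is a hypothetical toy class written for this illustration, with no broker or client library involved.

```java
import java.util.TreeSet;

/** Toy model of outstanding delivery tags on one channel (hypothetical names). */
class DeliveryTagSketch {
    private final TreeSet<Long> outstanding = new TreeSet<>();
    private long nextTag = 1; // in this toy, tags count up from 1

    /** Simulates one delivery and returns the tag assigned to it. */
    long deliver() {
        long tag = nextTag++;
        outstanding.add(tag);
        return tag;
    }

    /** Mirrors the meaning of basicAck(deliveryTag, multiple). */
    void ack(long deliveryTag, boolean multiple) {
        if (multiple) {
            // Acknowledge every outstanding tag <= deliveryTag.
            outstanding.headSet(deliveryTag, true).clear();
        } else {
            // Acknowledge exactly one delivery.
            outstanding.remove(deliveryTag);
        }
    }

    int outstandingCount() {
        return outstanding.size();
    }

    public static void main(String[] args) {
        DeliveryTagSketch channel = new DeliveryTagSketch();
        channel.deliver(); // tag 1
        channel.deliver(); // tag 2
        channel.deliver(); // tag 3

        channel.ack(2, false);
        System.out.println(channel.outstandingCount()); // prints 2 (tags 1 and 3 remain)

        channel.ack(3, true);
        System.out.println(channel.outstandingCount()); // prints 0
    }
}
```

Passing false, as the article's snippet does, is the simplest choice when each message is acknowledged right after it is processed.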

Run the consumer again: the message is consumed successfully and removed from the queue.
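The snippet above acknowledges only after processing has succeeded. One possible way to also handle a failure in the business logic is to reject the message explicitly so that the broker can requeue it; the following is a sketch under assumptions, not the article's code. AckChannel is a hypothetical stand-in interface that copies the signatures of the Java client's Channel.basicAck and Channel.basicNack so the example runs without a broker, and addPoints and RecordingChannel are likewise invented for the illustration.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in exposing only the two Channel methods used below. */
interface AckChannel {
    void basicAck(long deliveryTag, boolean multiple) throws IOException;
    void basicNack(long deliveryTag, boolean multiple, boolean requeue) throws IOException;
}

class AckOnSuccessSketch {

    /** Hypothetical business logic: fails on an empty payload. */
    static void addPoints(String message) {
        if (message.isEmpty()) {
            throw new IllegalArgumentException("empty message");
        }
    }

    /** Acknowledges after successful processing, otherwise asks for a requeue. */
    static void handle(AckChannel channel, long deliveryTag, String message) throws IOException {
        try {
            addPoints(message);
            channel.basicAck(deliveryTag, false);
        } catch (RuntimeException e) {
            // requeue = true puts the message back in the queue for redelivery
            channel.basicNack(deliveryTag, false, true);
        }
    }

    /** Records the calls instead of talking to a broker. */
    static class RecordingChannel implements AckChannel {
        final List<String> calls = new ArrayList<>();

        @Override
        public void basicAck(long deliveryTag, boolean multiple) {
            calls.add("ack:" + deliveryTag);
        }

        @Override
        public void basicNack(long deliveryTag, boolean multiple, boolean requeue) {
            calls.add("nack:" + deliveryTag + ":requeue=" + requeue);
        }
    }

    public static void main(String[] args) throws IOException {
        RecordingChannel channel = new RecordingChannel();
        handle(channel, 1, "order-1001");
        handle(channel, 2, "");
        System.out.println(channel.calls); // prints [ack:1, nack:2:requeue=true]
    }
}
```

Requeueing a message that fails every time makes it loop indefinitely, so whether to requeue, discard, or route the message elsewhere is a design decision for the application.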

4. Source code and reference

Source code address: github.com/zwwhnly/spr… Welcome to download.

RabbitMQ Combat Guide by Zhu Zhonghua