1. Previous episodes

RabbitMQ installation and Configuration and Hello World example

RabbitMQ User Management, role management and permission Settings

How to ensure that 99.99% of messages are sent successfully?

In our last blog we explained how to ensure that messages are sent to the RabbitMQ server as successfully as possible using RabbitMQ’s producer confirmation mechanism, which only reduces the chance of message loss at source and does not really solve the problem mentioned earlier: How do I ensure that queues and messages are not lost in RabbitMQ exceptions (manual restart, abnormal outage, etc.)?

2. Summary of this article

To solve this problem, RabbitMQ uses the concept of persistence, which means that RabbitMQ will store data in memory (exchanges, queues, messages) to disk in case something goes wrong and the data is lost.

RabbitMQ persistence is divided into three parts:

  1. Persistence of exchanges
  2. Persistence of queues
  3. Persistence of messages

3. Persistence of exchanges

In the last blog post, we stated that the code for Exchange looks like this:

private final static String EXCHANGE_NAME = "normal-confirm-exchange";

// Create an Exchange
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
Copy the code

The Exchange declared in this case is non-persistent and will be lost if RabbitMQ fails (restart, downtime), affecting subsequent messages to be written to it. The answer is to set the durable parameters.

Durable: Whether to be durable. Durable True: Durable. Otherwise, it is non-persistent.

Persistence saves the switch to disk without losing information when the server restarts.

Set Exchange persistence:

channel.exchangeDeclare(EXCHANGE_NAME, "direct".true);
Copy the code

The overloaded method called is:

public DeclareOk exchangeDeclare(String exchange, String type, boolean durable) throws IOException {
    return this.exchangeDeclare(exchange, (String)type, durable, false, (Map)null);
}
Copy the code

For better understanding, let’s create a new production class as follows:

package com.zwwhnly.springbootaction.rabbitmq.durable;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class DurableProducer {
    private final static String EXCHANGE_NAME = "durable-exchange";
    private final static String QUEUE_NAME = "durable-queue";

    public static void main(String[] args) throws IOException, TimeoutException {
        // Create a connection
        ConnectionFactory factory = new ConnectionFactory();
        // Set the hostname of RabbitMQ
        factory.setHost("localhost");
        // Create a connection
        Connection connection = factory.newConnection();
        // Create a channel
        Channel channel = connection.createChannel();
        // Create an Exchange
        channel.exchangeDeclare(EXCHANGE_NAME, "direct");
        channel.queueDeclare(QUEUE_NAME, false.false.false.null);
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");

        // Send a message
        String message = "durable exchange test";
        channel.basicPublish(EXCHANGE_NAME, "".null, message.getBytes());

        // Close channels and connectionschannel.close(); connection.close(); }}Copy the code

The durable Exchange and Queue are successfully created and the messages’ Exchange Test ‘(durable Exchange test) are properly posted to the Queue.

If you restart the RabbitMQ service, Exchange is lost:

The durable parameter is set to true.

// Create an Exchange
channel.exchangeDeclare(EXCHANGE_NAME, "direct".true);
Copy the code

Run the code and restart the RabbitMQ service. Exchange is no longer lost:

4. Persistence of queues

If you restart the RabbitMQ service, Exchange will not be lost, but queues and messages will be lost. The answer is to set the durable parameter.

Durable: Whether to be durable. True sets the queue to persistent.

Persistent queues are saved to ensure that information is not lost when the server restarts.

The durable Queue parameter is set to true:

channel.queueDeclare(QUEUE_NAME, true.false.false.null);
Copy the code

The overloaded method is called as follows:

public com.rabbitmq.client.impl.AMQImpl.Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments) throws IOException {
    validateQueueNameLength(queue);
    return (com.rabbitmq.client.impl.AMQImpl.Queue.DeclareOk)this.exnWrappingRpc((newcom.rabbitmq.client.AMQP.Queue.Declare.Builder()).queue(queue).durable(durable).exclusive(exclusive).autoDelete(autoDele te).arguments(arguments).build()).getMethod(); }Copy the code

Run the code and restart the RabbitMQ service to see that the queue is now not lost:

5. Message persistence

Although Exchange and Queue are not lost after RabbitMQ is restarted, messages stored in the Queue are still lost, so how can messages not be lost? The answer is to set the message delivery mode to 2, which stands for persistence.

Change the code for sending messages to:

// Send a message
String message = "durable exchange test";
AMQP.BasicProperties props = new AMQP.BasicProperties().builder().deliveryMode(2).build();
channel.basicPublish(EXCHANGE_NAME, "", props, message.getBytes());
Copy the code

The overloaded method called is:

public void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) throws IOException {
    this.basicPublish(exchange, routingKey, false, props, body);
}
Copy the code

Run the code and restart the RabbitMQ service to find that exchanges, queues and messages are not lost:

This has solved the problem of message loss after RabbitMQ restarts.

The final code is as follows, or you can download all the source code for this article from the source link at the end of this article:

package com.zwwhnly.springbootaction.rabbitmq.durable;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class DurableProducer {
    private final static String EXCHANGE_NAME = "durable-exchange";
    private final static String QUEUE_NAME = "durable-queue";

    public static void main(String[] args) throws IOException, TimeoutException {
        // Create a connection
        ConnectionFactory factory = new ConnectionFactory();
        // Set the hostname of RabbitMQ
        factory.setHost("localhost");
        // Create a connection
        Connection connection = factory.newConnection();
        // Create a channel
        Channel channel = connection.createChannel();
        // Create an Exchange
        channel.exchangeDeclare(EXCHANGE_NAME, "direct".true);
        channel.queueDeclare(QUEUE_NAME, true.false.false.null);
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");

        // Send a message
        String message = "durable exchange test";
        AMQP.BasicProperties props = new AMQP.BasicProperties().builder().deliveryMode(2).build();
        channel.basicPublish(EXCHANGE_NAME, "", props, message.getBytes());

        // Close channels and connectionschannel.close(); connection.close(); }}Copy the code

6. Precautions

1) It is theoretically possible to set all messages to persist, but this severely affects RabbitMQ performance. Because writing to disk is more than a little slower than writing to memory. Messages that are less reliable can be dispensed with persistence to improve overall throughput. When choosing whether to persist messages, there is a trade-off between reliability and throughput.

2) Persisting exchanges, queues, and messages is not a 100% guarantee against data loss, as it takes a short but significant amount of time for persistent messages to be stored in disk once they are properly stored in RabbitMQ. If the RabbitMQ service node breaks down or restarts during this period, the RabbitMQ messages will be lost before they are dropped.

3) Only set the queue persistence, the message will be lost after the restart; Simply set the message to persist, and after a restart the queue disappears and the message is lost. It makes no sense to set message persistence without queue persistence.

7. Source code and reference

Source code address: github.com/zwwhnly/spr… Welcome to download.

RabbitMQ Combat Guide by Zhu Zhonghua