The introduction

According to the previous knowledge (understanding the RabbitMQ work principle and simple to use, several working mode is introduced and the practice of Rabbit) as we know, if you want to guarantee the reliability of the news, the need to persistent message processing, message persistent besides need code set, however, there is another important step is crucial, This is to ensure that your messages go smoothly to the Broker, as shown in the figure below:

Normally, messages can be persisted if they are queued through the exchange, but is there any way to solve the problem if messages are accidentally lost before they reach the broker?

RabbitMQ has two ways to solve this problem:

  1. Through the transaction mechanism provided by AMQP;
  2. Using the sender confirmation pattern;

1. Transaction usage

Transaction implementation is mainly to set the Channel (Channel), there are three main methods:

  1. Channel.txselect () claims to start transaction mode;

  2. Channel.txcomment () commits the transaction;

  3. Channel.txrollback () rolls back the transaction;

Can be seen from the above transaction is based on the tx beginning, tx should be the transaction the extend extension module (affairs), if there is a precise explanation under the blog comments welcome.

Let’s look at the code implementation:

// Create a connection
ConnectionFactory factory = new ConnectionFactory();
factory.setUsername(config.UserName);
factory.setPassword(config.Password);
factory.setVirtualHost(config.VHost);
factory.setHost(config.Host);
factory.setPort(config.Port);	
Connection conn = factory.newConnection();
// Create a channel
Channel channel = conn.createChannel();
// Declare the queue
channel.queueDeclare(_queueName, true.false.false.null);
String message = String.format("Time => %s".new Date().getTime());
try {
	channel.txSelect(); // Declare the transaction
	// Send the message
	channel.basicPublish("", _queueName, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("UTF-8"));
	channel.txCommit(); // Commit the transaction
} catch (Exception e) {
	channel.txRollback();
} finally {
	channel.close();
	conn.close();
}
Copy the code

Note: Users need to configure config.xx as their own Rabbit information.

From the above code, we can see that the code before sending the message is the same as before, except that before sending the message, you need to declare the channel as transaction mode, commit or roll back the transaction.

Let’s use Wireshark to capture a package, as shown in the figure below:

Enter IP.addr ==rabbitip && AMqp to view the communication between the client and Rabbit. You can see the interaction flow:

  • Client sends to server Tx.Select(enable transaction mode)
  • Return Tx.Select -OK on server
  • Push message
  • The client sends the transaction to commit Tx.Com MIT
  • The server returns Tx.Com mit-ok

This completes the interaction flow of the transaction. If any part of the transaction is faulty, an IoException is thrown and the user can intercept the exception to roll back the transaction or decide whether to repeat the message.

So, since there are already transactions, there is no need to use the sender validation mode, because transaction performance is very poor. Transaction performance testing:

Transaction mode, the result is as follows:

  • Transaction mode, 1W data sent, execution time: 14197s
  • Transaction mode, send 1W data, execution time: 13597s
  • Transaction mode, 1W data sent, execution time: 14216s

In non-transactional mode, the results are as follows:

  • Non-transactional mode, 1W data sent, execution time: 101s
  • Non-transactional mode, 1W data sent, execution time: 77s
  • Non-transactional mode, 1W data sent, execution time: 106s

As can be seen from the above, the performance of non-transaction mode is 149 times higher than that of transaction mode. This is the result of my computer test. Different computer configurations are slightly different, but the conclusion is the same, the performance of transaction mode is much worse. That is the Confirm sender confirmation mode.

Expand the knowledge

We know that consumers can use messages to send automatically or manually to confirm consumption messages, so what happens if we use transactions in consumer mode (and of course no transactions at all if we use manual confirmation messages)?

The consumer pattern uses transactions

Assuming that transactions are used in consumer mode and are rolled back after message validation, what will happen to RabbitMQ?

The results were divided into two conditions:

  1. AutoAck = false manual to be support transactions, that is to say, even if you’ve had a manual confirmation message has been received, but in the confirmation message will be etc, after returning to solve in making a decision is a confirmation message or put back in the queue, if you manually after confirmation now, and roll back the transaction, the transaction rollback is given priority to, The message is put back on the queue;
  2. AutoAck =true If the autoAck is true and transactions are not supported, it is useless to roll back the transaction even after receiving the message, the queue has removed the message;

Confirm Sender confirmation mode

The Confirm sender acknowledgement mode uses a Channel to Confirm the sender, similar to transactions.

Confirm is implemented in three ways:

Method 1: channel.waitforguarantees () Common sender confirmation mode;

Method 2: channel. WaitForConfirmsOrDie () batch confirm mode;

Three: channel. AddConfirmListener (asynchronous) to monitor the sender to confirm mode;

Method 1: Confirm mode

// Create a connection
ConnectionFactory factory = new ConnectionFactory();
factory.setUsername(config.UserName);
factory.setPassword(config.Password);
factory.setVirtualHost(config.VHost);
factory.setHost(config.Host);
factory.setPort(config.Port);
Connection conn = factory.newConnection();
// Create a channel
Channel channel = conn.createChannel();
// Declare the queue
channel.queueDeclare(config.QueueName, false.false.false.null);
// Enable the sender acknowledgement mode
channel.confirmSelect();
String message = String.format("Time => %s".new Date().getTime());
channel.basicPublish("", config.QueueName, null, message.getBytes("UTF-8"));
if (channel.waitForConfirms()) {
	System.out.println("Message sent successfully" );
}
Copy the code

ConfirmSelect () declare to open the sender confirmation mode before sending messages, and then use channel.waitforconfirms () to wait for messages to be confirmed by the server.

Method 2: Batch Confirm mode

// Create a connection
ConnectionFactory factory = new ConnectionFactory();
factory.setUsername(config.UserName);
factory.setPassword(config.Password);
factory.setVirtualHost(config.VHost);
factory.setHost(config.Host);
factory.setPort(config.Port);
Connection conn = factory.newConnection();
// Create a channel
Channel channel = conn.createChannel();
// Declare the queue
channel.queueDeclare(config.QueueName, false.false.false.null);
// Enable the sender acknowledgement mode
channel.confirmSelect();
for (int i = 0; i < 10; i++) {
	String message = String.format("Time => %s".new Date().getTime());
	channel.basicPublish("", config.QueueName, null, message.getBytes("UTF-8"));
}
channel.waitForConfirmsOrDie(); // Until all information is published, there will be an IOException if one is not confirmed
System.out.println("Complete execution");
Copy the code

The above code can be seen channel. WaitForConfirmsOrDie (), use synchronous mode after all of the messages are sent to execute that code, as long as there is a message has not been confirmed will be thrown IOException exception.

Mode 3: Asynchronous Confirm mode

// Create a connection
ConnectionFactory factory = new ConnectionFactory();
factory.setUsername(config.UserName);
factory.setPassword(config.Password);
factory.setVirtualHost(config.VHost);
factory.setHost(config.Host);
factory.setPort(config.Port);
Connection conn = factory.newConnection();
// Create a channel
Channel channel = conn.createChannel();
// Declare the queue
channel.queueDeclare(config.QueueName, false.false.false.null);
// Enable the sender acknowledgement mode
channel.confirmSelect();
for (int i = 0; i < 10; i++) {
	String message = String.format("Time => %s".new Date().getTime());
	channel.basicPublish("", config.QueueName, null, message.getBytes("UTF-8"));
}
// Listen asynchronously for acknowledged and unacknowledged messages
channel.addConfirmListener(new ConfirmListener() {
	@Override
	public void handleNack(long deliveryTag, boolean multiple) throws IOException {
		System.out.println("Unconfirmed message, identified:" + deliveryTag);
	}
	@Override
	public void handleAck(long deliveryTag, boolean multiple) throws IOException {
		System.out.println(String.format("Confirmed messages, id: %d, multiple messages: %b", deliveryTag, multiple)); }});Copy the code

The advantage of asynchronous mode is that the execution efficiency is high. You do not need to wait for the message to finish, but only need to listen for the message. The information returned by asynchronous mode is as follows:

As you can see, the code is executed asynchronously, and it is possible that the message is batched. Whether or not the message is batched depends on the multiple parameter returned, which is a bool. If true, all messages before the deliveryTag is batched, and if false, a single acknowledgement.

Confirm performance test

Test premise: As with transactions, we send 1w messages.

Method 1: Confirm Normal mode

  • Execution time: 2253s
  • Execution time: 2018s
  • Execution time: 2043s

Method 2: Confirm batch mode

  • Execution time: 1576s
  • Execution time: 1400s
  • Execution time: 1374s

Mode 3: Confirm Asynchronous listening

  • Execution time: 1498s
  • Execution time: 1368s
  • Execution time: 1363s

conclusion

Overall, the performance of Confirm batch confirmations and Confirm asynchronous confirmations was about 10 times faster than transactions.

Long press the TWO-DIMENSIONAL code to follow my technical public number