This is the 27th day of my participation in the More Text Challenge. For more details, see more text Challenge

Little drops of water wear through a stone 😄

preface

In my last post, I looked at how RabbitMQ can ensure that data is not lost, but there is also the problem of whether the message reaches the RabbitMQ server correctly once it is sent by the producer. By default, sending a message does not return any information to the producer if it is not configured. By default, the producer does not know whether the message has reached the RabbitMQ server. If the message is lost before it reaches the server, persistence will not solve the problem because the message never reaches the server.

RabbitMQ provides two solutions to this problem:

  • Through the transaction mechanism

  • Through the sender acknowledgement mechanism

Transaction mechanism

Note: The transaction mechanism is to confirm whether the producer has successfully sent a message to the switch

There are three methods related to the transaction mechanism in RabbitMQ clients: channel.txSelect, channel.txCommit, and channel.txRollback.

Channel. txSelect is used to start transactions;

Channel. txCommit is used to commit transactions;

Channel. txRollback is used to roll back transactions.

After the transaction is started using the channel.txSelect method, we can send the message to RabbitMQ. If the transaction is committed, the message must reach RabbitMQ. At this point we can capture it and roll back the transaction by executing the channel.txRollback method.

Commit the transaction

public static void main(String[] args) throws IOException {
        Connection conn = RabbitMQUtil.createConn();
        Channel channel = conn.createChannel();
        String exchange = "exchange-1";
        String key = "key-1";

        // Create a switch
        channel.exchangeDeclare(exchange, BuiltinExchangeType.TOPIC,true);
        // Start the transaction
        channel.txSelect();
        try{
            // Send a message to the switch
            channel.basicPublish(exchange,key,null."Send a message with route key = key-1".getBytes());
            // Commit the transaction
            channel.txCommit();
            System.out.println("Sent successfully");
        }catch (Exception e){
            System.out.println("Send failed, log");
            // Roll back the transactionchannel.txRollback(); }}Copy the code

runmainMethod, outputSend a success. This is because the switch already exists.According to the figure above, there are four additional steps to enable and disable transactions (direct sending) :

  • 1. The client sends tx. Select to set the channel to transaction mode.
  • 2. The Broker replies to tx. select-OK, confirming that the channel has been set to transactional mode.
  • 3. After sending the message, the client sends Tx.Com MIT to commit the transaction.
  • 4. The Broker replies to Tx.Commit.Ok to confirm that the transaction has been committed.

Transaction rollback

Let’s take a look at the transaction rollback code. Change the value of Exchange to exchange-122, and a code comment for the switch is created.

public static void main(String[] args) throws IOException {
        Connection conn = RabbitMQUtil.createConn();
        Channel channel = conn.createChannel();
        String exchange = "exchange-122";
        String key = "key-1";

        // Create a switch
       //channel.exchangeDeclare(exchange, BuiltinExchangeType.TOPIC,true);
        // Start the transaction
        channel.txSelect();
        try{
            // Send a message to the switch
            channel.basicPublish(exchange,key,null."Send a message with route key = key-1".getBytes());
            // Commit the transaction
            channel.txCommit();
            System.out.println("Sent successfully");
        }catch (Exception e){
            System.out.println("Send failed, log");
            // Roll back the transactionchannel.txRollback(); }}Copy the code

runmainMethod, the output result:The sending failed and logs were generated. Procedure.The process steps are:

  • 1. The client sends tx. Select to set the channel to transaction mode.
  • 2. The Broker replies to tx. select-OK, confirming that the channel has been set to transactional mode.
  • 3. After sending the message, an exception is found and the client sends tx. Rollback to Rollback the transaction.
  • 4. The Broker replies to tx.rollback. Ok, confirming the transaction Rollback.

Batch transaction

To send multiple messages, wrap channel.basicPublish, channel.txCommit and other methods in the loop.

Example:

Sending a message to an exchange-1 switch that already exists but has an exception after sending the message also enters a rollback transaction operation.

 String exchange = "exchange-1";
 // Start the transaction
        channel.txSelect();
        for (int a = 0; a < 10; a++) {
            try{
                channel.basicPublish(exchange,key,null."Send a message with route key = key-1".getBytes());
                int i = 1/0;
                // Commit the transaction
                channel.txCommit();
                System.out.println("Sent successfully");
            }catch (Exception e){
                System.out.println("Send failed, log");
                // Roll back the transactionchannel.txRollback(); }}Copy the code

Transactions do resolve the problem of message acknowledgement between the sender and RabbitMQ. The transaction can only be committed if the message is successfully received by RabbitMQ, otherwise the transaction can be rolled back after the exception is caught and the message can be resent at the same time. However, using transactions can degrade RabbitMQ’s performance, so is there a better way to ensure that the sender of the message acknowledges that the message has been delivered without incurring a substantial performance penalty? RabbitMQ provides another method: the sender acknowledgement mechanism.

Sender acknowledgement mechanism

Note: The sender acknowledgement mechanism is to confirm whether the producer has successfully sent a message to the switch

The principle of

The producer sets the channel to confirm mode by calling channel.confirmSelect. Once the channel enters confirm mode, all messages published on the channel are assigned a unique ID(starting with 1). Once the message has been delivered to the matched queue, the broker sends an acknowledgement (basic.ACK) to the producer (containing the message’s unique ID). This lets the producer know that the message has arrived at the correct destination queue. If the message and queue are persistent, the acknowledgement will be sent after the message is written to disk. The delivery-tag in the acknowledgment messages that the broker sends back to the producer contains the serial number of the acknowledgement message. The broker can also set multiple arguments in basic.ack to indicate that all messages have been processed up to this serial number.

After a message is sent, the transaction mechanism blocks the sender, waiting for a response from RabbitMQ before sending the next message. The main benefit of the confirm mode is that it is asynchronous. Once a message is published, the producer application can continue to send the next message while waiting for the channel to return an acknowledgement. When the acknowledgement is finally received, the producer application can process the acknowledgement using the callback method. If RabbitMQ loses a message due to an internal error, it sends a nack(basic.nack) command, which the producer application can also process in the callback method.

After a channel is set to confirm mode, all subsequent messages sent will be ack or nack once. , a message is not ack and nACK, and RabbitMQ does not guarantee how quickly a message will be confirmed.

The native API

Ordinary confirm

Channe. waitforguarantees will be invoked every time a message is sent, waiting for acknowledgement from the server. This is actually a serial synchronous wait mode. Same as the transaction mechanism. Slow.

public static void main(String[] args) throws Exception {
    Connection conn = RabbitMQUtil.createConn();
    Channel channel = conn.createChannel();
    String quequ = "queue-2";
    String exchange = "exchange-2";
    String key = "key-2";
    // Create a switch
    channel.exchangeDeclare(exchange, 
    BuiltinExchangeType.TOPIC, true);
    // Create a queue
    channel.queueDeclare(quequ, true.false.false.null);
    // Bind the queue to the switch
    channel.queueBind(quequ, exchange, key);
    // Set the channel to Publisher confirm mode
    channel.confirmSelect();
    String message = "Send route key is ="+ key + "The news of";
    channel.basicPublish(exchange,key,null,
    message.getBytes());
    boolean b = channel.waitForConfirms();
    System.out.println("Sent successfully ="+ b); } Result: Sent successfully =true
Copy the code

Change the route key to key-22121, create a switch, create a queue, and bind the queue to the switch. Check whether the result is successful.

public static void main(String[] args) throws Exception {
    Connection conn = RabbitMQUtil.createConn();
    Channel channel = conn.createChannel();
    String quequ = "queue-2";
    String exchange = "exchange-2";
    String key = "key-22121";
    // Create a switch
    //channel.exchangeDeclare(exchange, BuiltinExchangeType.TOPIC, true);
    // Create a queue
   // channel.queueDeclare(quequ, true, false, false, null);
    // Bind the queue to the switch
   // channel.queueBind(quequ, exchange, key);
    // Set the channel to Publisher confirm mode
    channel.confirmSelect();
    String message = "Send route key is ="+ key + "The news of";
    channel.basicPublish(exchange,key,null,message.getBytes());
    boolean b = channel.waitForConfirms();
    System.out.println("Sent successfully ="+ b); } Result: Sent successfully =true
Copy the code

You can see that the sending result is successful. Then modify the code again, change the value of exchange to exchange-2222, and leave the rest of the code unchanged.

Boot direct error!

If multiple messages are sent, only wrap channel.basicPublish and channel. waitforguarantees in the loop. However, the channe. waitforguarantees method is invoked every time a message is sent, waiting for confirmation from the server.

channel.confirmSelect();
for (int i = 1; i < 10; i++) {
    String message = "Send route key is ="+ key + "The news of";
    channel.basicPublish(exchange,key,null,message.getBytes());
    boolean b = channel.waitForConfirms();
    System.out.println("Sent successfully" + b);
}
Copy the code

Batch confirm

After sending a batch of messages, call channel. waitforacknowledge method, wait for the server to acknowledge return (also synchronous, only send multiple messages at once, and then uniform).

channel.confirmSelect();
for (int i = 1; i < 10; i++) {
    String message = "Send route key is ="+ key + "The news of";
    channel.basicPublish(exchange,key,null,message.getBytes());
}
// Batch confirm message, if there is a failed message, do not know which failed
boolean b = channel.waitForConfirms();
System.out.println("Sent successfully" + b);
Copy the code

Asynchronous confirm

The asynchronous confirm method is the most complex and efficient to implement programmatically. The ConfirmListener callback interface is added to the addConfirmListener method provided in the client Channel interface. The ConfirmListener interface contains two methods: The handleAck and handleNack are used to handle basic. Ack and basic. Nack sent back by RabbitMQ, respectively. In both methods, there are two parameters: deliveryTag (the unique order number that marks the message) and Multiple (whether to batch confirm true for yes)

String quequ = "queue-2";
String exchange = "exchange-2";
String key = "key-2";
// Create a switch
channel.exchangeDeclare(exchange, BuiltinExchangeType.TOPIC, true);
// Create a queue
channel.queueDeclare(quequ, true.false.false.null);
// Bind the queue to the switch
channel.queueBind(quequ, exchange, key);
channel.confirmSelect();
final ConcurrentSkipListMap<Long,String> map = new ConcurrentSkipListMap();
// Add an asynchronous acknowledgement listener
channel.addConfirmListener(new ConfirmListener() {
    // Parameter 1: deliveryTag: the number of the message
    // Parameter two: Multiple: batch confirm True Yes
    public void handleAck(long deliveryTag, boolean multiple) throws IOException {
        System.out.println("Map data:" + map.size());
        if (multiple) {
            // If batch validation returns a message with a sequence number less than or equal to the current sequence number, it is a map
            ConcurrentNavigableMap<Long, String> confirmed =
                    map.headMap(deliveryTag, true);
            // Clear the part of unacknowledged messages
            confirmed.clear();
            System.out.println("Confirm map data in batches:" + map.size());
        }else{
            // Clear only messages with the current sequence number
            map.remove(deliveryTag);
            System.out.println("Clear only message map data for the current sequence number:" + map.size());
        }
        System.out.println("Message sent to switch successfully,deliveryTag:" + deliveryTag + ", multiple: " + multiple);
    }

    public void handleNack(long deliveryTag, boolean multiple) throws IOException {
        System.out.println("Message sent to switch failed, deliveryTag:" + deliveryTag + ", multiple: " + multiple);
        String message = map.get(deliveryTag);
        System.out.println("Message sent to switch failed, published message:"+message+"Not identified, serial number:"+deliveryTag);
        // If you get unconfirmed information, you can do other logic, such as add processing message resend}});for (int i = 1; i < 6; i++) {
    String message = "Send route key is ="+ key + "The news of";
    / / channel. GetNextPublishSeqNo () to obtain the serial number of the next message
    map.put(channel.getNextPublishSeqNo(),message);
    channel.basicPublish(exchange,key,null,message.getBytes());

}
System.out.println("Other logic");
Copy the code

To test if the switch does not exist, change the name of Exchange to Exchange-9527 and comment the code that creates the switch.

Connection conn = RabbitMQUtil.createConn();
Channel channel = conn.createChannel();
String quequ = "queue-2";
String exchange = "exchange-9527";
String key = "key-2";
// Create a switch
// channel.exchangeDeclare(exchange, BuiltinExchangeType.TOPIC, true);
// Create a queue
// channel.queueDeclare(quequ, true, false, false, null);
// Bind the queue to the switch
// channel.queueBind(quequ, exchange, key);
channel.confirmSelect();
final ConcurrentSkipListMap<Long,String> map = new ConcurrentSkipListMap();
// Add an asynchronous acknowledgement listener
channel.addConfirmListener(new ConfirmListener() {
    // Parameter 1: deliveryTag: the number of the message
    // Parameter two: Multiple: batch confirm True Yes
    public void handleAck(long deliveryTag, boolean multiple) throws IOException {
        System.out.println("Map data:" + map.size());
        if (multiple) {
            // If batch validation returns a message with a sequence number less than or equal to the current sequence number, it is a map
            ConcurrentNavigableMap<Long, String> confirmed =
                    map.headMap(deliveryTag, true);
            // Clear the part of unacknowledged messages
            confirmed.clear();
            System.out.println("Confirm map data in batches:" + map.size());
        }else{
            // Clear only messages with the current sequence number
            map.remove(deliveryTag);
            System.out.println("Clear only message map data for the current sequence number:" + map.size());
        }
        System.out.println("Message sent to switch successfully,deliveryTag:" + deliveryTag + ", multiple: " + multiple);
    }

    public void handleNack(long deliveryTag, boolean multiple) throws IOException {
        System.out.println("Message sent to switch failed, deliveryTag:" + deliveryTag + ", multiple: " + multiple);
        String message = map.get(deliveryTag);
        System.out.println("Message sent to switch failed, published message:"+message+"Not identified, serial number:"+deliveryTag);
        // If you get unconfirmed information, you can do other logic, such as add processing message resend}});for (int i = 1; i < 6; i++) {
    String message = "Send route key is ="+ key + "The news of";
    / / channel. GetNextPublishSeqNo () to obtain the serial number of the next message
    map.put(channel.getNextPublishSeqNo(),message);
    channel.basicPublish(exchange,key,null,message.getBytes());

}
System.out.println("Other logic");
Copy the code

As you can see, none of the code in the listener is executing. That is, no switch received the message.

conclusion

Normal Confirm: Waits for confirmation synchronously, simple, but with very limited throughput.

Batch confirm: Batch synchronous waiting for confirmation, simple, reasonable throughput, once a problem but difficult to infer which message is the problem.

Asynchronous Confirm: Optimal performance and resource usage, well controlled in case of error, but slightly cumbersome to implement.

The Boot mode

Configure whether message confirmation is required in YML

spring:
  application:
    name: info-config-boot
  rabbitmq:
    host: 47.105. *
    port: 5672
    virtual-host: /test-1
    username: *
    password: *
    # Enable message confirmation
    publisher-confirm-type: correlated
Copy the code

Publisher-confirm-type has three options:

  • NONE: disables the publication confirmation mode. It is the default value
  • CORRELATED: Callback method is triggered when a message is successfully published to the switch
  • SIMPLE: There are two effects tested, one effect andCORRELATEDValue also triggers the callback method, which is used after the message is successfully publishedrabbitTemplatecallwaitForConfirmsorwaitForConfirmsOrDieMethod waits for the broker node to return the send result and determines the logic of the next step based on the return resultwaitForConfirmsOrDieMethod if returnfalseWill be closedchannel, the following message cannot be sent tobroker.

coding

Implement ConfirmCallback

@Component
public class InfoConfirm implements RabbitTemplate.ConfirmCallback {

   Logger logger = LoggerFactory.getLogger(InfoConfirm.class);

   @Autowired
   private RabbitTemplate rabbitTemplate;

    /** * the ConfirmCallback method is null */
   @PostConstruct
   public void init(a){
       rabbitTemplate.setConfirmCallback(this);
   }

    /** * This method is used to listen for messages sent to the switch *@param correlationData
     * @param ack
     * @param cause
     */
   @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        if(ack){
            logger.info("Message sent to switch successfully");
            logger.info("id = {} ",correlationData.getId());
            if(correlationData.getReturnedMessage() == null){
                logger.info("Message confirmed.");
            }else{
                byte[] body = correlationData.getReturnedMessage().getBody();
                logger.info("message = {}".newString(body)); }}else {
            logger.info("Message sent to switch failed");
            logger.info("cause = {}",cause);
            logger.info("id = {} ",correlationData.getId());
            if(correlationData.getReturnedMessage() == null){
                logger.info("Message exception");
            }else{
                byte[] body = correlationData.getReturnedMessage().getBody();
                logger.info("message = {}".newString(body)); }}}}Copy the code

Implement interface ConfirmCallback and override its confirm() method, which has three parameters correlationData, ACK and cause.

  • CorrelationData: There is only one ID attribute in the object, which represents the uniqueness of the current message.
  • Ack: Status of the message delivered to the broker. True indicates success.
  • Cause: Indicates the cause of the delivery failure.

Provide sending methods

   @GetMapping("/send")
    public void send(a){
        CorrelationData correlation = new CorrelationData("Settings:" + UUID.randomUUID().toString());
        // The exchange for exchange-1 already exists
        rabbitTemplate.convertAndSend("exchange-1"."key-55"."Send a message",correlation);
    }
Copy the code

Call interface: http://localhost:8080/send

Send another message that the switch does not exist and change the switch value toexchange-12222.

Call interface:http://localhost:8080/send You can see that even if the switch does not exist,confirmMethod can also be listened to. Smarter than all of the above.Boot YYDS.

  • If you have any questions or errors in this article, please feel free to comment. If you find this article helpful, please click like and follow.