RabbitMQ provides a solution for ensuring message reliability at the application level, which is just as important as the RabbitMQ node ensuring message reliability.

I. Confirmation of Consumer message

When a Consumer subscrires to RabbitMQ messages with autoAck=true, the message may be lost due to network problems, Consuerm may fail to process the message, or the server may be down. When autoAck=true, RabbitMQ automatically sets the sent messages to acknowledgement and deletes them from memory (or disk), regardless of whether the consumer actually consumes them. To avoid this, RabbitMQ provides consumer confirmation for processing messages, which requires autoAck=false

autoAck = false;
channel.basicConsume(queueName, autoAck, deliverCallback, consumerTag -> {});
Copy the code

When set to autoAck=false, the RabbitMQ node will wait for an acknowledgement from the consumer and will delete the message when RabbitMQ receives it. If RabbitMQ does not receive an acknowledgement from the consumer and confirms that the consumer is disconnected, RabbitMQ will insert the message back into the head of the message queue and forward it to the surviving consumer for consumption.

RabbitMQ has not received any confirmation from the consumer, probably because this is a long task and does not mean that the consumer has been hung up, so it will reforward unconfirmed messages only when RabbitMQ confirms that the Channel has actually been lost.

The consumer needs to confirm that the message has been consumed and tell RabbitMQ. How does RabbitMQ identify each message? The answer is deliveryTag.

When a RabbitMQ message has been subscribed to RabbitMQ, RabbitMQ will call Basic. deliver to push the message to the consumer, with a monotonically increasing positive integer deliveryTag that uniquely identifies the delivery of the message on the channel. The scope of deliveryTag is Channel, so the message must be confirmed in the same channel; otherwise, an unknown Delivery tag will be abnormal. Of course I think it’s really rare.

RabbitMQ provides three ways to confirm or reject messages on the consumer side:

  • com.rabbitmq.client.Channel#basicAck()

    /** * @param deliveryTag * @param multipletrue: will ack all messages smaller than deliveryTag on this channel at one time. */ void basicAck(long deliveryTag, boolean multiple) throws IOException;Copy the code
  • com.rabbitmq.client.Channel#basicNack()

    /** * @param deliveryTag * @param multipletrue: reject all messages smaller than deliveryTag on the channel. */ void basicNack(Long deliveryTag, Boolean multiple, Boolean Requeue) throws IOException;Copy the code
  • com.rabbitmq.client.Channel#basicReject()

    // Channel. basicNack differs from channel.basicReject in that basicNack can reject multiple messages, BasicReject (Long deliveryTag, Boolean Requeue) throws IOException. BasicReject (Long deliveryTag, Boolean Requeue) throws IOException.Copy the code

How does the consumer confirm the message?

  • Consumer
public static void main(String[] args) throws Exception {
      Connection connection = ConnectionUtil.getConnection();
      Channel channel = connection.createChannel();
      channel.exchangeDeclare(ExchangeCostant.EXCHANGE_CONSUMER_ACK,
              BuiltinExchangeType.DIRECT);
      boolean durable = true;
      channel.queueDeclare(QueueCostant.CONSUMER_ACK, durable, false.false, null);
      channel.queueBind(QueueCostant.CONSUMER_ACK, ExchangeCostant.EXCHANGE_CONSUMER_ACK, "consumer.ack");
      DeliverCallback deliverCallback = (consumerTag, delivery) -> {
          String message = new String(delivery.getBody(), "UTF-8");
          System.out.println("ConsumerTag is [" + consumerTag + "]," +
                  " [x] Received '" + message + "'," +
                  " DeliveryTag is [" + delivery.getEnvelope().getDeliveryTag() + "]," +
                  " Thread is [" + Thread.currentThread().getName() + "]"); Try {timeunit.seconds.sleep (10); } catch (InterruptedException e) { e.printStackTrace(); } // Return message acknowledgement channel.basicack (delivery.getenvelope ().getdeliveryTag (),false);
          System.out.println(" [x] Done");

      };
      boolean autoAck = false;
      channel.basicConsume(QueueCostant.CONSUMER_ACK, autoAck, deliverCallback, consumerTag -> {
      });
  }
Copy the code
  • Pubisher
public static void main(String[] args) throws Exception {
      Connection connection = ConnectionUtil.getConnection();
      Channel channel = connection.createChannel();
      channel.exchangeDeclare(ExchangeCostant.EXCHANGE_CONSUMER_ACK,
              BuiltinExchangeType.DIRECT);
      for(int i = 1; i <= 10; I++) {// push persistent message String message ="Send message " + i;
          channel.basicPublish(ExchangeCostant.EXCHANGE_CONSUMER_ACK, "consumer.ack".false, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
          System.out.println(" [x] sent ' " + message + "'"); }}Copy the code

We start by starting the producer to send 10 messages to the consumer before the consumer is started (the switch queue has already been declared).

At this point, 10 messages are stored in the queue, waiting to be consumed by the consumer. We then start the consumer, all the messages in the message queue are sent to the consumer, who consumes three messages and returns an acknowledgement to RabbitMQ

Then we simulate the consumer STOP situation in which the consumer hangs. At this time, the 7 unacknowledged messages are resumed in the message queue, waiting to be sent to the surviving consumers.

Next, what if an exception occurs when a consumer processes a message? RabbitMQ determines whether a message should be redelivered only if the consumer has broken the channel, and should the message be reprocessed at that point? This should be determined by the specific usage scenario and it is not always necessary for RabbitMQ to redeliver messages. But Rabbit gives us a way to reject the message and return to the queue.

public static void main(String[] args) throws Exception {
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        channel.exchangeDeclare(ExchangeCostant.EXCHANGE_CONSUMER_ACK,
                BuiltinExchangeType.DIRECT);
        boolean durable = true;
        channel.queueDeclare(QueueCostant.CONSUMER_ACK, durable, false.false, null);
        channel.queueBind(QueueCostant.CONSUMER_ACK, ExchangeCostant.EXCHANGE_CONSUMER_ACK, "consumer.ack");
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            try {
                String message = new String(delivery.getBody(), "UTF-8");

                if (2 == delivery.getEnvelope().getDeliveryTag()) {
                    throw new IllegalStateException("Message cannot be processed properly, DeliveryTag is [" + delivery.getEnvelope().getDeliveryTag() + "]");
                }

                System.out.println("ConsumerTag is [" + consumerTag + "]," +
                        " [x] Received '" + message + "'," +
                        " DeliveryTag is [" + delivery.getEnvelope().getDeliveryTag() + "]," +
                        " Thread is [" + Thread.currentThread().getName() + "]"); Try {timeunit.seconds.sleep (10); } catch (InterruptedException e) { e.printStackTrace(); } // Return message acknowledgement channel.basicack (delivery.getenvelope ().getdeliveryTag (),false);
                System.out.println(" [x] Done");
            } catch (Exception e) {
                channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false.false); }}; boolean autoAck =false;
        channel.basicConsume(QueueCostant.CONSUMER_ACK, autoAck, deliverCallback, consumerTag -> {
        });
    }
Copy the code

However, it is generally not recommended to use the re-queue function, because rejected messages will be inserted into the head of the message queue, which can easily lead to the application into an infinite loop. We can use the dead-letter queue instead of re-queue function.

Ii. Publiser news confirmation

When a producer sends a message to a RabbitMQ switch, there is no way to guarantee that the message will reach the switch. To ensure reliable delivery of a message, RabbitMQ provides transaction messages and message acknowledgements, which are rarely used because transaction messages can seriously degrade RabbitMQ performance. We can use asynchronous message acknowledgement to ensure that a message is sent to RabbitMQ.

static SortedSet<Long> confirmSet = Collections.synchronizedSortedSet(new TreeSet<Long>()); public static void main(String[] args) throws Exception { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(ExchangeCostant.EXCHANGE_CONSUMER_ACK, BuiltinExchangeType.DIRECT); Channel.confirmselect (); // Confirm mode channel.confirmSelect(); channel.addConfirmListener( (deliveryTag, multiple) -> { System.out.println("Ack Callback DeliveryTag is [" + deliveryTag + "] multiple is " + multiple);
                    if (multiple) {
                        confirmSet.headSet(deliveryTag + 1).clear();
                    } else {
                        confirmSet.remove(deliveryTag);
                    }
                },

                (deliveryTag, multiple) -> {
                }
        );
        channel.addReturnListener(returnMessage -> {
            String message = new String(returnMessage.getBody());
            System.out.println("No routing message " + message);
        });

        boolean mandatory = true;
        for(int i = 1; i <= 100; I++) {/ / push persistent message long nextSeqNo = channel. GetNextPublishSeqNo (); String message ="Send message " + nextSeqNo;
            channel.basicPublish(ExchangeCostant.EXCHANGE_CONSUMER_ACK, "consumer.ack", mandatory, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
            confirmSet.add(nextSeqNo);
            System.out.println(" [x] sent ' " + message + "'"); }}Copy the code

When a message arrives at RabbitMQ and is persisted, it is returned with deliveryTag, asynchronous listening acknowledgments can be batch or single, and we can maintain an ordered collection of unacknowledged messages to resend to RabbitMQ.

Another problem is that messages can be lost if the switch has no matching queue or if the queue has not yet been declared. In this case, you can set Mandatory =true and add returnListener to handle messages that have not been routed. You can also set up a backup switch to store these unrouted messages by setting the alternate-exchange parameter on the switch to reduce the complexity of the sender’s program.

    static SortedSet<Long> confirmSet = Collections.synchronizedSortedSet(new TreeSet<Long>());
    public static void main(String[] args) throws Exception {
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(ExchangeCostant.EXCHANGE_CONSUMER_ACK,
                BuiltinExchangeType.DIRECT, true.false, createBackUpExchange(channel)); Channel.confirmselect (); // Confirm mode channel.confirmSelect(); channel.addConfirmListener( (deliveryTag, multiple) -> { System.out.println("Ack Callback DeliveryTag is [" + deliveryTag + "] multiple is " + multiple);
                    if (multiple) {
                        confirmSet.headSet(deliveryTag + 1).clear();
                    } else {
                        confirmSet.remove(deliveryTag);
                    }
                },

                (deliveryTag, multiple) -> {
                }
        );
        boolean mandatory = false;
        for(int i = 1; i <= 100; I++) {/ / push persistent message long nextSeqNo = channel. GetNextPublishSeqNo (); String message ="Send message " + nextSeqNo;
            channel.basicPublish(ExchangeCostant.EXCHANGE_CONSUMER_ACK, "consumer.ack", mandatory, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
            confirmSet.add(nextSeqNo);
            System.out.println(" [x] sent ' " + message + "'");
        }
    }

    public static Map<String, Object> createBackUpExchange(Channel channel) throws Exception {
        Map<String, Object> arguments = new HashMap<>();
        arguments.put("alternate-exchange", ExchangeCostant.EXCHANGE_BACKUP); / / declare a radio channel. The types of switches exchangeDeclare (ExchangeCostant. EXCHANGE_BACKUP, BuiltinExchangeType FANOUT,true.false, null);
        channel.queueDeclare(QueueCostant.BACK_UP, true.false.false, null);
        channel.queueBind(QueueCostant.BACK_UP, ExchangeCostant.EXCHANGE_BACKUP, "");

        return arguments;
    }
Copy the code

3. RabbitMQ nodes

Ensuring the reliability of messages at the application level has greatly improved the security of applications, but messages can still be lost when RabbitMQ nodes are restarted or down, so we need to set the persistence of queues and messages to ensure that messages can be recovered after a node is down or restarted.

If the machine cannot recover due to the failure of a single machine, messages will still be lost, so we need to set up mirror queues and clusters for key messages to ensure the high availability of message services.

Refer to the article: blog.csdn.net/u013256816/…