1. Confirmation mechanism of the sender

– Why is the sender confirmation mechanism needed?

– What is the sender confirmation mechanism?

– Three confirmation mechanisms

  • 1. Synchronous confirmation of a single message

    • 1.1 Configure the Channel to enable confirm mode

    • 1.2 After sending a message, call waitForConfirms() and wait for the confirmation. A return value of true means that RabbitMQ has received the message

  • 2. Synchronous confirmation of multiple messages

    • 2.1 Configure the Channel to enable confirm mode

    • 2.2 After sending a batch of messages, call waitForConfirms() and wait for the confirmation. A return value of true means that RabbitMQ has received every message in the batch. A return value of false does not mean that every message failed; it may be that only some of them did.

  • 3. Asynchronous confirmation

    • 3.1 Configure the Channel to enable confirm mode

    • 3.2 Register a listener on the channel with addConfirmListener. After a message is sent, the listener is called back to report whether the send succeeded. Note that RabbitMQ may confirm a single message or several messages in one callback

Asynchronous acknowledgment code

ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(" ");
try (Connection connection = connectionFactory.newConnection();
     Channel channel = connection.createChannel()) {
    // Put the channel into confirm mode
    channel.confirmSelect();
    ConfirmListener confirmListener = new ConfirmListener() {
        @Override
        public void handleAck(long deliveryTag, boolean multiple) throws IOException {
            // Confirmed successfully
            // deliveryTag: sequence number of the confirmed message
            // multiple: true means every message up to and including deliveryTag is confirmed
        }

        @Override
        public void handleNack(long deliveryTag, boolean multiple) throws IOException {
            // Confirmation failed
        }
    };
    channel.addConfirmListener(confirmListener);
    String msgToSend = "Message content";
    channel.basicPublish("exchange.test.msg", "key.msg", null, msgToSend.getBytes(StandardCharsets.UTF_8));
}
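The multiple flag is the part of asynchronous confirmation that is easiest to get wrong: when it is true, the broker is confirming every message up to and including deliveryTag, not just one. The following is a minimal sketch of the bookkeeping this requires, assuming each message is recorded under its publish sequence number (in the Java client this can be read with channel.getNextPublishSeqNo() just before basicPublish). The class OutstandingConfirms and its method names are hypothetical and use only the JDK; they are not part of the RabbitMQ client.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;

/** Hypothetical helper: tracks messages that were published but not yet confirmed. */
class OutstandingConfirms {
    // Sorted by sequence number, so "everything up to tag N" is a head view of the map.
    private final ConcurrentNavigableMap<Long, String> pending = new ConcurrentSkipListMap<>();

    /** Record a message under the sequence number it is published with. */
    void published(long seqNo, String body) {
        pending.put(seqNo, body);
    }

    /** Intended to be called from handleAck: forget the confirmed message(s). */
    void acked(long deliveryTag, boolean multiple) {
        remove(deliveryTag, multiple);
    }

    /** Intended to be called from handleNack: returns the bodies to re-send or log. */
    List<String> nacked(long deliveryTag, boolean multiple) {
        return remove(deliveryTag, multiple);
    }

    /** Number of messages still waiting for a confirmation. */
    int size() {
        return pending.size();
    }

    private List<String> remove(long deliveryTag, boolean multiple) {
        List<String> removed = new ArrayList<>();
        if (multiple) {
            // multiple=true covers every sequence number <= deliveryTag.
            ConcurrentNavigableMap<Long, String> head = pending.headMap(deliveryTag, true);
            removed.addAll(head.values());
            head.clear();
        } else {
            String body = pending.remove(deliveryTag);
            if (body != null) {
                removed.add(body);
            }
        }
        return removed;
    }
}
```

With a helper like this, handleAck would call acked(deliveryTag, multiple) and handleNack would call nacked(deliveryTag, multiple) and decide what to do with the returned bodies. A sorted concurrent map is used because the callbacks run on a different thread from the publisher.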

:: Synchronous confirmation of a single message is recommended ::

2. Message return mechanism

– Why do you need a message return mechanism?

– What is the message return mechanism

– Enable the message return mechanism

basicPublish has three overloads with different parameter lists:

public void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) throws IOException {
    this.delegate.basicPublish(exchange, routingKey, props, body);
}

public void basicPublish(String exchange, String routingKey, boolean mandatory, BasicProperties props, byte[] body) throws IOException {
    this.delegate.basicPublish(exchange, routingKey, mandatory, props, body);
}

public void basicPublish(String exchange, String routingKey, boolean mandatory, boolean immediate, BasicProperties props, byte[] body) throws IOException {
    this.delegate.basicPublish(exchange, routingKey, mandatory, immediate, props, body);
}

Passing true for the boolean mandatory parameter enables the message return mechanism.

Option 1: new ReturnListener()

channel.addReturnListener(new ReturnListener() {
    @Override
    public void handleReturn(int replyCode, String replyText, String exchange, String routingKey,
                             AMQP.BasicProperties basicProperties, byte[] body) throws IOException {
        // The message could not be routed
    }
});
String msgToSend = objectMapper.writeValueAsString(orderMessage);
// Publish with mandatory=true so that an unroutable message is returned
channel.basicPublish("exchange.order.restaurant", "key.restaurant", true, null, msgToSend.getBytes(StandardCharsets.UTF_8));

Option 2: new ReturnCallback()

channel.addReturnListener(new ReturnCallback() {
    public void handle(Return returnMessage) { /* the message could not be routed */ }
});

The two approaches are equivalent. The only difference is that new ReturnCallback() receives the same data wrapped in a Return object.

The returned data are replyCode, replyText, exchange, routingKey, basicProperties, and body.

3. Confirmation mechanism at the consumer end

By default, when the consumer receives a message, it automatically acknowledges it (ACK)

– ACK type of the consumer

    1. Automatic ACK: the consumer acknowledges the message automatically as soon as it receives it
    2. Manual ACK: the message is not acknowledged automatically and must be acknowledged explicitly in code

– Manual ACK type

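    1. Single manual ACK: multiple=false (acknowledges only the given delivery)
    2. Multiple manual ACK: multiple=true (acknowledges every unacknowledged delivery up to and including the given delivery tag)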

A single manual ACK is recommended

DeliverCallback deliverCallback = (consumerTag, message) -> {
    String messageBody = new String(message.getBody());
    channel.basicAck(message.getEnvelope().getDeliveryTag(), false); // single manual ACK
};

4. Traffic limiting mechanism at the consumer end

– Why do you need to limit traffic

– QoS (Quality of Service assurance)

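Prerequisite for QoS: automatic acknowledgment must be turned off

Specific parameter settings

RabbitMQ does not implement prefetchSize; only prefetchCount takes effect, and the global flag has RabbitMQ-specific semantics (false applies the limit to each consumer, true shares it across the channel)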

// Enable rate limiting: at most 10 unacknowledged messages at a time
channel.basicQos(10);
channel.basicConsume("queue.msg", false, deliverCallback, consumerTag -> {});
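Why QoS depends on manual acknowledgment can be seen in a small model: the broker keeps delivering only while the number of unacknowledged messages is below the prefetch count, and with automatic acknowledgment that number never rises. The sketch below is a simplified, hypothetical simulation written with the JDK only. PrefetchWindow is not a RabbitMQ class, and it ignores everything except the counting rule (a prefetch count of 0 is treated as unlimited, as in RabbitMQ).

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Hypothetical model of a prefetch window; this is not RabbitMQ client code. */
class PrefetchWindow {
    private final int prefetchCount;   // 0 means "no limit"
    private final boolean autoAck;
    private final Deque<String> queue = new ArrayDeque<>();
    private int unacked = 0;

    PrefetchWindow(int prefetchCount, boolean autoAck) {
        this.prefetchCount = prefetchCount;
        this.autoAck = autoAck;
    }

    /** A producer puts a message into the queue. */
    void enqueue(String message) {
        queue.addLast(message);
    }

    /** Delivers as many queued messages as the window allows and returns that number. */
    int deliver() {
        int delivered = 0;
        while (!queue.isEmpty() && windowOpen()) {
            queue.removeFirst();
            delivered++;
            if (!autoAck) {
                unacked++; // stays counted until the consumer acknowledges it
            }
        }
        return delivered;
    }

    /** The consumer acknowledges one delivered message. */
    void ack() {
        if (unacked > 0) {
            unacked--;
        }
    }

    /** Number of messages still waiting in the queue. */
    int queued() {
        return queue.size();
    }

    private boolean windowOpen() {
        // With autoAck nothing is ever counted as unacknowledged, so the limit never applies.
        return autoAck || prefetchCount == 0 || unacked < prefetchCount;
    }
}
```

In this model a window of 2 with manual acknowledgment hands out two messages and then waits for ack(), while the same window with automatic acknowledgment drains the whole queue at once.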

5. Message expiration mechanism

By default, messages are queued and remain in the queue until consumed

In RabbitMQ, the expiration time is called TTL (Time to Live).

It is divided into message TTL (expiration time of a single message) and queue TTL (expiration time of all messages in a queue).

Using TTL alone is not recommended, because an expired message is simply deleted

How do I set the appropriate TTL

Usage

  • Setting the TTL of a single message
// expiration is given in milliseconds
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder().expiration("15000").build();
channel.basicPublish("exchange.test.msg", "key.msg", properties, messageToSend.getBytes());
  • Setting the queue TTL
Map<String, Object> args = new HashMap<>(16);
args.put("x-message-ttl", 15000);
// queueName is a placeholder for the queue being declared
channel.queueDeclare(queueName,
        true, false, false, args);
If the queue already exists, it must be deleted and declared again before the TTL can be set. Otherwise the declaration fails, because RabbitMQ rejects redeclaring an existing queue with different arguments (PRECONDITION_FAILED).
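When a message carries its own expiration and also sits in a queue with x-message-ttl, RabbitMQ applies the lower of the two values. The following sketch only illustrates that rule; EffectiveTtl is a hypothetical helper, not a client API, and it assumes the per-message value arrives as the string form used by the expiration property.

```java
import java.util.OptionalLong;

/** Hypothetical helper that shows which TTL wins when both are set. */
class EffectiveTtl {
    /**
     * @param expiration per-message expiration in milliseconds as a string, or null if unset
     * @param queueTtlMs value of x-message-ttl in milliseconds, or null if unset
     * @return the TTL that applies, or empty if the message never expires
     */
    static OptionalLong of(String expiration, Long queueTtlMs) {
        Long messageTtlMs = null;
        if (expiration != null) {
            messageTtlMs = Long.parseLong(expiration);
        }
        if (messageTtlMs == null && queueTtlMs == null) {
            return OptionalLong.empty();
        }
        if (messageTtlMs == null) {
            return OptionalLong.of(queueTtlMs);
        }
        if (queueTtlMs == null) {
            return OptionalLong.of(messageTtlMs);
        }
        // Both are set: the lower value is the one that takes effect.
        return OptionalLong.of(Math.min(messageTtlMs, queueTtlMs));
    }
}
```

For example, a message published with expiration "15000" into a queue declared with x-message-ttl of 60000 would expire after 15 seconds under this rule.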

6. Dead-letter queues

A message with an expiration time set is discarded directly once it expires.

The purpose of a dead letter queue is to collect expired messages

What is a dead letter queue

Produce message -> Exchange -> Queue -> DL Exchange -> DL Queue -> Exception listener handles it

When does a message become a dead letter

  • The message is rejected with requeue=false (not requeued)
  • The message expires
  • The queue length reaches its maximum

How to set it up

  • 1. Configure the exchange and queue that forward and receive dead letters
    • Exchange: exchange.dlx
    • Queue: queue.dlx
    • RoutingKey: #
  • 2. Set the dead-letter parameter on the queue
    • x-dead-letter-exchange=exchange.dlx
// Declare the exchange that receives dead letters
channel.exchangeDeclare("exchange.dlx", BuiltinExchangeType.TOPIC, true, false, null);
// Declare the queue that receives dead letters
channel.queueDeclare("queue.dlx", true, false, false, null);
// Bind with "#" so that dead letters with any routing key reach the queue
channel.queueBind("queue.dlx", "exchange.dlx", "#");
// Point the business queue at the dead-letter exchange
Map<String, Object> args = new HashMap<>(16);
args.put("x-dead-letter-exchange", "exchange.dlx");
channel.queueDeclare("queue.restaurant", true, false, false, args);
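The binding key on the dead-letter queue matters because a dead-lettered message keeps its original routing key unless x-dead-letter-routing-key is set on the source queue. On a topic exchange "#" matches zero or more words and "*" matches exactly one word, so a two-word key such as key.restaurant only reaches the dead-letter queue through a "#" binding. The sketch below illustrates those two wildcard rules in plain Java. TopicMatch is a hypothetical helper rather than the broker's implementation, and edge cases such as empty keys are not considered.

```java
/** Hypothetical illustration of topic-exchange wildcard matching. */
class TopicMatch {
    /** Returns true if routingKey would match bindingKey on a topic exchange. */
    static boolean matches(String bindingKey, String routingKey) {
        String[] pattern = bindingKey.split("\\.", -1);
        String[] words = routingKey.split("\\.", -1);
        return match(pattern, 0, words, 0);
    }

    private static boolean match(String[] pattern, int i, String[] words, int j) {
        if (i == pattern.length) {
            // Pattern exhausted: it is a match only if every word was consumed too.
            return j == words.length;
        }
        if (pattern[i].equals("#")) {
            // "#" may absorb zero or more words; try every possible split point.
            for (int k = j; k <= words.length; k++) {
                if (match(pattern, i + 1, words, k)) {
                    return true;
                }
            }
            return false;
        }
        if (j == words.length) {
            return false; // pattern still expects a word, but none is left
        }
        if (pattern[i].equals("*") || pattern[i].equals(words[j])) {
            // "*" or a literal word consumes exactly one word.
            return match(pattern, i + 1, words, j + 1);
        }
        return false;
    }
}
```

Under these rules TopicMatch.matches("#", "key.restaurant") is true while TopicMatch.matches("*", "key.restaurant") is false, which is why the catch-all binding uses "#".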