1. Confirmation mechanism of the sender

– Why is the sender confirmation mechanism needed?

After the message is sent, the sender does not know whether RabbitMQ has actually received the messageCopy the code

– What is the sender confirmation mechanism?

After the RabbitMQ message is sent, if RabbitMQ receives the message, it sends a reply to the sender to confirm that the message is successfully sentCopy the code

– Three confirmation mechanisms

  • 1. Confirm a single synchronization

    • 1.1 Configuring a Channel To enable the confirm mode

      channel.confirmSelect();

    • 1.2 After sending a message, call waitForConfirms() and wait for confirmation. Return true to confirm that RabbitMQ has received the message

      channel.waitForConfirms();

  • 2. Confirm multiple synchronization

    • 2.1 Configuring a Channel and enabling the confirm mode

      channel.confirmSelect();

    • 2.2 After sending many guarantees, call waitForConfirms() and wait for confirmation. Returning true means that RabbitMQ has received all guarantees, but returning false does not mean that all guarantees have failed. It is possible that some guarantees have failed.

  • 3. Confirm asynchronously

    • 3.1 Configuring a Channel and enabling the confirm mode

      channel.confirmSelect();

    • 3.2 Add addConfirmListener to a channel, and when a message is sent, this method is called back to confirm whether it was sent successfully, but RabbitMQ asynchronously confirms either single or multiple messages

Asynchronous acknowledgment code

ConnectionFactory connectionFactory = new ConnectionFactory(); 192.168.10.233 connectionFactory. SetHost (" "); try (Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel()) { channel.confirmSelect(); ConfirmListener confirmListener = new ConfirmListener() { @Override public void handleAck(long deliveryTag, Boolean multiple) throws IOException {// Confirm success // deliveryTag: Number of messages before confirm // Multiple :true } @override public void handleNack(long deliveryTag, Boolean multiple) throws IOException {// Confirm failure}}; channel.addConfirmListener(confirmListener); String msgToSend = "Message content "; BasicPublish ("exchange.test.msg", "key.msg", null, msgTosend.getBytes (standardCharsets.utf_8)); }Copy the code

:: Single synchronization confirmation is recommended ::


2. Message return mechanism

– Why do you need a message return mechanism?

After the sending end sends a message, it is not known whether the message is correctly routedCopy the code

– What is the message return mechanism

After the sender sends a message, RabbitMQ routes the message and, if no target queue is found, returns a ReturnListener to notify the senderCopy the code

– Enable the message return mechanism

Set Mandatory to true, or false, RabbitMQ discards messages that cannot be routedCopy the code

BasicPublish has a method with three different parameters:

public void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) throws IOException {
    this.delegate.basicPublish(exchange, routingKey, props, body);
}

public void basicPublish(String exchange, String routingKey, boolean mandatory, BasicProperties props, byte[] body) throws IOException {
    this.delegate.basicPublish(exchange, routingKey, mandatory, props, body);
}

public void basicPublish(String exchange, String routingKey, boolean mandatory, boolean immediate, BasicProperties props, byte[] body) throws IOException {
    this.delegate.basicPublish(exchange, routingKey, mandatory, immediate, props, body);
}
Copy the code

Boolean mandatory (true) ¶ Boolean mandatory (true) ¶

First, new ReturnListener()

channel.addReturnListener(new ReturnListener() { @Override public void handleReturn(int replayCode, String replayText, String exchange, String routingKey, AMQP.BasicProperties basicProperties, Byte [] body) throws IOException {// Message cannot be routed}}); String msgToSend = objectMapper.writeValueAsString(orderMessage); / / message channel. BasicPublish (" exchange. Order. Restaurant ", "key. The restaurant", true, null, msgToSend.getBytes(StandardCharsets.UTF_8));Copy the code

New ReturnCallback()

channel.addReturnListener(new ReturnCallback() {
    @Override
    public void handle(Return returnMessage) {

    }
});
Copy the code

There is no difference between the two methods, except that new ReturnCallback() encapsulates the returned data in the Return class and returns the same data.

The data returned are replayCode, replayText, Exchange, routingKey, basicProperties, and Body


3. Confirmation mechanism at the consumer end

By default, when the consumer receives a message, it automatically acknowledges it (ACK)

– ACK type of the consumer

    1. Automatic ACK: The consumer automatically signs the message after receiving it
    1. Manual ACK: The message is not automatically signed for and needs to be signed for explicitly in the code

– Manual ACK type

    1. Single manual ACK: multiple=false
    1. Multiple manual ACK: multiple=true

A single manual ACK is recommended

DeliverCallback deliverCallback = (consumerTag, message) -> {
    String messageBody = new String(message.getBody());
    channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
};
Copy the code

4. Consumption end traffic limiting mechanism

– Why do you need to limit traffic

During service peak hours, too many messages are sent to the receiving end and the service crashes. Enable the traffic limiting mechanism to limit the message push speed to ensure service stabilityCopy the code

– QoS (Quality of Service assurance)

The QoS function ensures that no new messages are consumed until a certain number of messages are acknowledgedCopy the code

QoS prerequisite: Automatic confirmation is not used

Specific parameter Settings

PrefetchCount: indicates the maximum number of unacknowledged messages to be pushed by a consumer. Global: true: indicates the traffic limit for the entire consumer. False: specific to the current channel. PrefetchSize: Indicates the maximum size of a single message, usually 0Copy the code

RabbitMQ does not currently implement global and prefetchSize

// Enable channel limiting channel.basicqos (10); channel.basicConsume("queue.msg", false, deliverCallback, consumerTag -> {});Copy the code

5. Message expiration mechanism

By default, messages are queued and remain in the queue until consumed

The expiration Time of RabbitMQ is called TTL (Time to Live).

It is divided into message TTL (expiration time of a single message) and queue TTL (expiration time of all messages in a queue).

Using TTL alone is not recommended because it results in deleting the message directly

How do I set the appropriate TTL

The TTL should be longer than the average service restart time and longer than the peak service timeCopy the code

Method of use

  • Example Set the TTL of a single message
// expiration 单位为毫秒
AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("15000").build();
channel.basicPublish("exchange.test.msg", "key.msg", properties, messageToSend.getBytes());
Copy the code
  • Setting the QUEUE TTL
Map<String, Object> args = new HashMap<>(16);
args.put("x-message-ttl", 15000);
channel.queueDeclare("queue.test",
        true, false, false, args);
Copy the code

If the current queue already exists, you need to delete it and create it again before setting the expiration time. Otherwise, an error occurs.

6. Dead-letter queues

A message with an expiration time set is discarded directly after the expiration date.

The purpose of a dead letter queue is to collect expired messages

What is a dead letter queue

Queue with DLX attribute set (dead-letter-exchange)Copy the code

Production messages -> Exchange -> Queue -> DL Exchange -> DL Queue -> Exception listening processing

When does it become a dead letter

  • Message rejected and Requeue =false(not requeued)
  • Message expiration
  • The queue length reaches the maximum. Procedure

Set the method

  • 1. Configure the switch and queue for forwarding and receiving dead letters
    • Exchange:dlx.exchange
    • Queue:dlx.queue
    • RoutingKey:#
  • 2. Set parameters for the dead-letter queue
    • x-dead-letter-exchange=dlx.exchange
/ / create receive dead-letter switches channel. ExchangeDeclare (" exchange. The DLX, "BuiltinExchangeType TOPIC, true, false, null); // Create a queue to receive dead letters channel.queueDeclare("queue.dlx", true, false, false, null); Channel.queuebind ("queue.dlx", "exchange.dlx", "*"); // Set dead-letter queue Map<String, Object> args = new HashMap<>(16); args.put("x-dead-letter-exchange", "exchange.dlx"); channel.queueDeclare("queue.restaurant", true, false, false, args);Copy the code