The introduction
RabbitMQ messages can be consumed in two authentication modes: automatic and manual
Automatic confirmation: The Broker (RabbitMQ server) removes a message from the queue after it has been sent to the consumer, regardless of whether the consumer has successfully consumed it. Messages can be lost if the business code fails while the consumer is consuming or if the system goes down while the consumer is still consuming.
Manual acknowledgement: A consumer manually sends an acknowledgement to the Broker, which removes the message from the queue after receiving the acknowledgement.
Due to the defects of automatic confirmation, manual confirmation is generally used to ensure the reliability of message services for some important messages.
So in the case of manual validation, how to ensure the reliability of messages in consumer business? Here we introduce a method that uses retry + manual confirmation + dead-letter queue to ensure that the consumption information is not lost.
Dead-letter queue
A Dead Letter is a message that cannot be properly processed by a consumer. There are two main reasons why a business program throws an exception when a consumer consumes it. First, the message itself is problematic (the main reason), such as the bank card number transmitted in the payment message does not exist. 2. Due to network fluctuation and other reasons, the third party service that consumers rely on is invoked abnormally, for example, the third party interface fails to be invoked, the database fails to access due to the failure to obtain connection, etc. These messages are typically placed in RabbitMQ’s dead-letter queue, processed by a dedicated consumer, or manually compensated.
The source of the dead-letter queue
BasicNack or channel.basicReject is used when the message is rejected and the Requeue attribute is set to false.
2. The time of the message in the queue exceeded the time to live (TTL).
The number of messages exceeded the capacity limit of the queue.
When the messages in a queue meet any of the above three conditions, the changed messages are moved from the original queue to the dead letter queue. If the changed queue is not bound to the dead letter queue, the message is discarded.
Tips: A dead letter queue is exactly like a normal business queue, except that a business creates a queue to store messages for processing failures. Therefore, it works in the same way as a service queue. A dead letter still needs to be forwarded by the switch to reach a dead letter queue.
Configuration of dead letter queues
1. Bind a dead letter switch to a business queue (that is, the switch used to forward dead letters in the queue).
2. Bind the dead letter queue to the dead letter switch.
Code examples:
@Configuration public class RabbitMQConfig { public static final String BUSINESS_EXCHANGE_NAME = "business-exchange"; public static final String DEAD_LETTER_EXCHANGE_NAME = "dead-letter-exchange"; public static final String BUSINESS_QUEUE_NAME = "business-queue"; public static final String DEAD_LETTER_QUEUE_NAME = "dead-letter-queue"; public static final String ROUTING_KEY = "routing-key"; @bean public DirectExchange businessExchange(){return new DirectExchange(BUSINESS_EXCHANGE_NAME); @bean public DirectExchange deadLetterExchange(){return new DirectExchange(DEAD_LETTER_EXCHANGE_NAME); } @bean public Queue businessQueue(){Map<String, Object> args = new HashMap<>(2); Args. Put ("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE_NAME); return QueueBuilder.durable(BUSINESS_QUEUE_NAME).withArguments(args).build(); } @bean public Queue deadLetterQueue(){return new Queue(DEAD_LETTER_QUEUE_NAME); } @bean public Binding bindBusinessQueue(){return BindingBuilder.bind(businessQueue()).to(businessExchange()).with(ROUTING_KEY); } @bean public Binding bindDeadLetterQueue(){return BindingBuilder.bind(deadLetterQueue()).to(deadLetterExchange()).with(ROUTING_KEY); }}Copy the code
In the configuration example above, we declared a business queue, a business switch, a dead letter queue, and a dead letter switch. The x-dead-letter-exchange parameter when declaring a service queue specifies the queued dead-letter switch. If the message is deemed dead, the Broker will automatically forward it to the configured dead-letter switch.
Producer code
We define a Controller interface to test message sending, the body of which is passed in as interface parameters.
@RestController
@RequestMapping("/rabbitmq")
public class RabbitController {
@Autowired
private RabbitTemplate rabbitTemplate;
@PostMapping("/send")
public void send(@RequestParam String msg){ rabbitTemplate.convertAndSend(RabbitMQConfig.BUSINESS_EXCHANGE_NAME, RabbitMQConfig.ROUTING_KEY, msg); }}Copy the code
Call the producer interface with postman:
http://localhost:5006/rabbitmq/send?msg=normal meaage
Copy the code
The RabbitMQ console shows that the declared switch queues have been created and messages sent to the service queues.
Consumer manual confirmation + retry
Consumer configuration
Host =192.168.44.104 spring.rabbitmq.port=5672 spring.rabbitmq.username=admin Spring. The rabbitmq. Manual confirmation password = 123456 # open consumers spring. The rabbitmq. Listener. Simple. Acknowledge - mode = manualCopy the code
@Service
@Slf4j
@RabbitListener(queues = "business-queue")
public class RabbitConsumer {
/** * specifies the queue to consume */
@RabbitHandler
public void consume(String msg, Message message, Channel channel){
boolean success = false;
int retryCount = 3;
while(! success && retryCount-- >0) {try {
// Process the message
log.info("Received message: {}, deliveryTag = {}", msg, message.getMessageProperties().getDeliveryTag());
if(message.equals("dead-letter")) {throw new RuntimeException("Received a dead letter.");
}
// Confirm manually after normal processing is complete
success = true;
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}catch (Exception e){
log.error("Program exception: {}", e.getMessage()); }}// Failed to consume after the maximum number of retries was reached
if(! success){// Manually delete and move to the dead-letter queue
try {
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false.false);
} catch(IOException e) { e.printStackTrace(); }}}}Copy the code
The consumer is started, the message is manually acknowledged after normal consumption, and the message is deleted from the queue
We are going to send a dead letter
http://localhost:5006/rabbitmq/send?msg=dead-letter
Copy the code
From the figure above, we can see that the consuming code retries three times before negating the message. The Broker determines that the message is dead letter, sends it to the dead letter switch, and finally forwards it to the dead letter queue.
The figure shows that the dead letter was indeed forwarded to the dead letter queue. Depending on the actual business situation, we can create special dead-letter consumers to handle the dead-letter, or make manual compensation.
Tips: Database transactions are not covered in the code examples. If a consuming program uses the declarative transaction @Transactional, it manually rolls back the transaction after catching an exception. The diagram below: