Overview

Note: A business queue is bound to a dead-letter exchange through arguments (x-dead-letter-exchange and x-dead-letter-routing-key) that are supplied when the business queue is declared.

The concept of dead letters

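"Dead letter" is a message-handling mechanism in RabbitMQ. A message that cannot be consumed normally becomes a dead letter, and RabbitMQ treats it specially: if the queue it came from has a dead-letter exchange configured, the message is republished to that exchange and routed to the dead-letter queue bound to it; if not, the message is discarded.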

When a message becomes a dead letter

  1. The message is rejected (using channel.basicNack or channel.basicReject) with the requeue flag set to false.
  2. The message has stayed in the queue longer than its TTL (time to live).
  3. The queue has reached its maximum length.
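Conditions 2 and 3 are driven by optional queue arguments. The sketch below only builds such an argument map in plain Java. The keys `x-message-ttl`, `x-max-length`, `x-dead-letter-exchange` and `x-dead-letter-routing-key` are RabbitMQ's argument names; the class name, method name and the values used in `main` are assumptions made for illustration. The resulting map would be passed in when the queue is declared, for example through `QueueBuilder.withArguments`.

```java
import java.util.HashMap;
import java.util.Map;

/** Illustrative helper (hypothetical name): arguments for a queue that dead-letters expired or overflowing messages. */
final class DeadLetterConditions {

    static Map<String, Object> queueArguments(String deadLetterExchange,
                                              String deadLetterRoutingKey,
                                              int ttlMillis,
                                              int maxLength) {
        Map<String, Object> args = new HashMap<>();
        // Where dead letters from this queue are republished.
        args.put("x-dead-letter-exchange", deadLetterExchange);
        args.put("x-dead-letter-routing-key", deadLetterRoutingKey);
        // Condition 2: messages older than this many milliseconds expire.
        args.put("x-message-ttl", ttlMillis);
        // Condition 3: the queue holds at most this many messages.
        args.put("x-max-length", maxLength);
        return args;
    }

    public static void main(String[] argv) {
        // Example values only; choose limits that fit your workload.
        Map<String, Object> args =
                queueArguments("deadletter.exchange", "dl_queue_a.routingkey", 10_000, 1_000);
        args.forEach((key, value) -> System.out.println(key + " = " + value));
    }
}
```

With the default overflow behaviour (drop-head), it is the oldest messages at the head of the queue that are dead-lettered once the length limit is reached.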

How to configure a dead-letter queue

  1. Declare the business queues and bind them to the business exchange.
  2. Configure a dead-letter exchange and a dead-letter routing key on each business queue.
  3. Declare a dead-letter queue and bind it to the dead-letter exchange.

Note: You do not declare one shared dead-letter queue that automatically receives every dead letter. Instead, each business queue that needs dead-lettering is configured with a dead-letter exchange. Business queues in the same project can share one dead-letter exchange, with a separate routing key assigned to each business queue.
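To make the routing concrete, here is a toy in-memory model of that arrangement. It is not the RabbitMQ client API: the class and method names are invented for this sketch, and it simplifies a direct exchange to one queue per routing key. It only shows how one shared dead-letter exchange uses the per-queue routing key to pick the dead-letter queue.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy model (hypothetical names): one shared direct dead-letter exchange, one routing key per business queue. */
final class SharedDeadLetterExchangeModel {
    // Bindings of the dead-letter exchange: routing key -> dead-letter queue.
    private final Map<String, String> bindings = new HashMap<>();
    // Per business queue: the dead-letter routing key it was declared with.
    private final Map<String, String> deadLetterKeys = new HashMap<>();
    // Contents of each dead-letter queue.
    private final Map<String, List<String>> deadLetterQueues = new HashMap<>();

    void bindDeadLetterQueue(String deadLetterQueue, String routingKey) {
        bindings.put(routingKey, deadLetterQueue);
        deadLetterQueues.putIfAbsent(deadLetterQueue, new ArrayList<>());
    }

    void configureBusinessQueue(String businessQueue, String deadLetterRoutingKey) {
        deadLetterKeys.put(businessQueue, deadLetterRoutingKey);
    }

    /** Returns true if the dead letter was routed, false if it was dropped. */
    boolean deadLetter(String businessQueue, String message) {
        String routingKey = deadLetterKeys.get(businessQueue);
        String target = routingKey == null ? null : bindings.get(routingKey);
        if (target == null) {
            return false; // no dead-letter configuration or no matching binding
        }
        deadLetterQueues.get(target).add(message);
        return true;
    }

    List<String> messagesIn(String deadLetterQueue) {
        return Collections.unmodifiableList(
                deadLetterQueues.getOrDefault(deadLetterQueue, new ArrayList<>()));
    }

    public static void main(String[] args) {
        SharedDeadLetterExchangeModel model = new SharedDeadLetterExchangeModel();
        model.bindDeadLetterQueue("queue_a", "dl_queue_a.routingkey");
        model.bindDeadLetterQueue("queue_b", "dl_queue_b.routingkey");
        model.configureBusinessQueue("business.queue_a", "dl_queue_a.routingkey");
        model.configureBusinessQueue("business.queue_b", "dl_queue_b.routingkey");

        model.deadLetter("business.queue_a", "rejected order");
        System.out.println("queue_a: " + model.messagesIn("queue_a"));
        System.out.println("queue_b: " + model.messagesIn("queue_b"));
    }
}
```

A business queue that was never given a dead-letter routing key, or whose key matches no binding, loses its dead letters in this model, which mirrors why every business queue needs its own configuration.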

Example

  1. Configuration information
import java.util.HashMap;
import java.util.Map;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMQConfig {
    public static final String BUSINESS_EXCHANGE_NAME = "business.exchange";
    public static final String BUSINESS_QUEUE_A_NAME = "business.queue_a";
    public static final String BUSINESS_QUEUE_B_NAME = "business.queue_b";
    public static final String DEAD_LETTER_EXCHANGE = "deadletter.exchange";
    public static final String DEAD_LETTER_QUEUE_A_ROUTING_KEY = "dl_queue_a.routingkey";
    public static final String DEAD_LETTER_QUEUE_B_ROUTING_KEY = "dl_queue_b.routingkey";
    public static final String DEAD_LETTER_QUEUE_A_NAME = "queue_a";
    public static final String DEAD_LETTER_QUEUE_B_NAME = "queue_b";

    // Declare the business exchange.
    @Bean("businessExchange")
    public FanoutExchange businessExchange() {
        return new FanoutExchange(BUSINESS_EXCHANGE_NAME);
    }

    // Declare the dead-letter exchange.
    @Bean("deadLetterExchange")
    public DirectExchange deadLetterExchange() {
        return new DirectExchange(DEAD_LETTER_EXCHANGE);
    }

    // Declare business queue A.
    @Bean("businessQueueA")
    public Queue businessQueueA() {
        Map<String, Object> args = new HashMap<>(2);
        // Dead-letter exchange for this queue.
        args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE);
        // Dead-letter routing key for this queue.
        args.put("x-dead-letter-routing-key", DEAD_LETTER_QUEUE_A_ROUTING_KEY);
        return QueueBuilder.durable(BUSINESS_QUEUE_A_NAME).withArguments(args).build();
    }

    // Declare business queue B.
    @Bean("businessQueueB")
    public Queue businessQueueB() {
        Map<String, Object> args = new HashMap<>(2);
        // Dead-letter exchange for this queue.
        args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE);
        // Dead-letter routing key for this queue.
        args.put("x-dead-letter-routing-key", DEAD_LETTER_QUEUE_B_ROUTING_KEY);
        return QueueBuilder.durable(BUSINESS_QUEUE_B_NAME).withArguments(args).build();
    }

    // Declare dead-letter queue A.
    @Bean("deadLetterQueueA")
    public Queue deadLetterQueueA() {
        return new Queue(DEAD_LETTER_QUEUE_A_NAME);
    }

    // Declare dead-letter queue B.
    @Bean("deadLetterQueueB")
    public Queue deadLetterQueueB() {
        return new Queue(DEAD_LETTER_QUEUE_B_NAME);
    }

    // Bind business queue A to the business exchange.
    @Bean
    public Binding businessBindingA(@Qualifier("businessQueueA") Queue queue,
                                    @Qualifier("businessExchange") FanoutExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange);
    }

    // Bind business queue B to the business exchange.
    @Bean
    public Binding businessBindingB(@Qualifier("businessQueueB") Queue queue,
                                    @Qualifier("businessExchange") FanoutExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange);
    }

    // Bind dead-letter queue A to the dead-letter exchange.
    @Bean
    public Binding deadLetterBindingA(@Qualifier("deadLetterQueueA") Queue queue,
                                      @Qualifier("deadLetterExchange") DirectExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with(DEAD_LETTER_QUEUE_A_ROUTING_KEY);
    }

    // Bind dead-letter queue B to the dead-letter exchange.
    @Bean
    public Binding deadLetterBindingB(@Qualifier("deadLetterQueueB") Queue queue,
                                      @Qualifier("deadLetterExchange") DirectExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with(DEAD_LETTER_QUEUE_B_ROUTING_KEY);
    }
}
  2. Consumers of the business queues
import java.io.IOException;

import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

// Manual acknowledgement must be enabled for the listener container,
// e.g. spring.rabbitmq.listener.simple.acknowledge-mode=manual.
@Slf4j
@Component
public class BusinessMessageReceiver {
    // Listener for business queue A (BUSINESS_QUEUE_A_NAME).
    @RabbitListener(queues = RabbitMQConfig.BUSINESS_QUEUE_A_NAME)
    public void receiveA(Message message, Channel channel) throws IOException {
        String msg = new String(message.getBody());
        log.info("Received business message A: {}", msg);
        boolean ack = true;
        Exception exception = null;
        try {
            // If the message contains "deadletter", throw an exception so that the
            // message is nacked and ends up in the dead-letter queue.
            if (msg.contains("deadletter")) {
                throw new RuntimeException("dead letter exception");
            }
        } catch (Exception e) {
            ack = false;
            exception = e;
        }
        if (!ack) {
            log.error("Message consumption error, msg: {}", exception.getMessage(), exception);
            // On failure: nack without requeue, so the message is dead-lettered.
            channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
        } else {
            // On success: ack.
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        }
    }

    // Listener for business queue B (BUSINESS_QUEUE_B_NAME).
    @RabbitListener(queues = RabbitMQConfig.BUSINESS_QUEUE_B_NAME)
    public void receiveB(Message message, Channel channel) throws IOException {
        System.out.println("Received business message B: " + new String(message.getBody()));
        // On success: ack.
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }
}
  3. Consumers of the dead-letter queues
import java.io.IOException;

import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class DeadLetterMessageReceiver {
    // Listener for dead-letter queue A (DEAD_LETTER_QUEUE_A_NAME).
    @RabbitListener(queues = RabbitMQConfig.DEAD_LETTER_QUEUE_A_NAME)
    public void receiveA(Message message, Channel channel) throws IOException {
        System.out.println("Received dead letter A: " + new String(message.getBody()));
        // On success: ack.
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }

    // Listener for dead-letter queue B (DEAD_LETTER_QUEUE_B_NAME).
    @RabbitListener(queues = RabbitMQConfig.DEAD_LETTER_QUEUE_B_NAME)
    public void receiveB(Message message, Channel channel) throws IOException {
        System.out.println("Received dead letter B: " + new String(message.getBody()));
        // On success: ack.
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }
}
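  4. Producers
// RabbitMQMsgController.java
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RequestMapping("rabbitmq")
@RestController
public class RabbitMQMsgController {
    @Autowired
    private BusinessMessageSender sender;

    @RequestMapping("sendmsg")
    public void sendMsg(String msg) {
        sender.sendMsg(msg);
    }
}

// BusinessMessageSender.java
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class BusinessMessageSender {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void sendMsg(String msg) {
        // Fire-and-forget publish; the fanout exchange ignores the routing key.
        rabbitTemplate.convertAndSend(RabbitMQConfig.BUSINESS_EXCHANGE_NAME, "", msg);
    }
}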

Application scenarios of dead letter queues

Dead-letter queues are generally used on important business queues to make sure that messages that were not consumed correctly are not simply discarded. Common causes of failed consumption include:

  1. An error in the message itself causes an exception.
  2. Parameter validation fails during processing.
  3. A query fails because of network instability or a similar transient problem.

By configuring a dead-letter queue, you can park incorrectly processed messages in a separate queue. Once the cause has been found, you can write dedicated code to reprocess the dead letters, which is far better than recovering the data by hand.
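When writing that processing code, it helps to know why a message was dead-lettered. RabbitMQ adds headers such as `x-first-death-reason` and `x-first-death-queue` to dead-lettered messages, with reason values including `rejected`, `expired` and `maxlen`. The sketch below reads them from a plain header map; in a Spring listener the map would presumably come from `message.getMessageProperties().getHeaders()`. The class name, method name and wording of the descriptions are assumptions for illustration.

```java
import java.util.HashMap;
import java.util.Map;

/** Illustrative helper (hypothetical name): describes why and where a message was first dead-lettered. */
final class DeadLetterInspector {

    static String describe(Map<String, Object> headers) {
        Object reason = headers.get("x-first-death-reason");
        Object queue = headers.get("x-first-death-queue");
        if (reason == null || queue == null) {
            return "not a dead letter (no x-first-death-* headers)";
        }
        // toString() is used because header values may not be plain String instances.
        switch (reason.toString()) {
            case "rejected":
                return "rejected by a consumer of " + queue;
            case "expired":
                return "TTL expired in " + queue;
            case "maxlen":
                return "dropped from " + queue + " because its length limit was exceeded";
            default:
                return "dead-lettered from " + queue + " (" + reason + ")";
        }
    }

    public static void main(String[] args) {
        // Hand-built headers standing in for those of a real dead-lettered message.
        Map<String, Object> headers = new HashMap<>();
        headers.put("x-first-death-reason", "rejected");
        headers.put("x-first-death-queue", "business.queue_a");
        System.out.println(describe(headers));
    }
}
```

Logging this description in the dead-letter consumers makes it easier to tell a rejected message from one that expired or overflowed before deciding how to reprocess it.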