This is day 15 of my participation in the August More Text Challenge. For details, see: August More Text Challenge




Preface

  • This is the last article on the basics of MQ! The articles that follow integrate Spring Boot for hands-on practice.

  • A dead letter, as the name suggests, is a message that cannot be consumed. A dead letter queue is the queue that collects such messages.

  • Why can a message not be consumed?

    • The TTL of the message has expired.
    • The queue has reached its maximum length, so it cannot hold any more messages.
    • The message is rejected (basic.reject or basic.nack) and is not put back on the original queue (requeue=false).
  • Application scenarios

    • To ensure that the message data of the order service is not lost, use RabbitMQ’s dead letter queue mechanism, which moves a message into the dead letter queue when consuming it fails.
    • After a user places an order in the mall and clicks to pay, the order automatically expires if payment is not made within the specified time.
    • If you buy a train ticket, the order becomes invalid if you don’t pay within half an hour.
  • Logical architecture diagram

1. Dead letter queues

① The TTL of the message has expired

  • Producer

  • import com.rabbitmq.client.AMQP;
    import com.rabbitmq.client.BuiltinExchangeType;
    import com.rabbitmq.client.Channel;

    /**
     * Test producer.
     * @author DingYongJun
     * @date 2021/8/6
     */
    public class DyProducerTest_dead {

        private static final String EXCHANGE_NAME = "normal_exchange";

        /**
         * For convenience, we test from the main method.
         */
        public static void main(String[] args) throws Exception {
            publishMessageIndividually();
        }

        public static void publishMessageIndividually() throws Exception {
            // Use the utility class to create the channel
            Channel channel = RabbitMqUtils.getChannel();
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
            // Set the per-message expiration time (unit: ms)
            AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder().expiration("10000").build();
            for (int i = 0; i < 5; i++) {
                String msg = i + " message";
                // Publish the message to the normal exchange
                channel.basicPublish(EXCHANGE_NAME, "zhangsan", properties, msg.getBytes("UTF-8"));
                System.out.println("I'm the producer, I sent: " + msg);
            }
        }
    }
  • Consumer

  • import com.rabbitmq.client.BuiltinExchangeType;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.DeliverCallback;

    import java.util.HashMap;
    import java.util.Map;

    /**
     * Test consumer for the normal queue.
     * @author DingYongJun
     * @date 2021/8/6
     */
    public class DyConsumerTest_dead01 {

        // Normal exchange
        private static final String EXCHANGE_NAME = "normal_exchange";
        // Dead letter exchange, which routes messages to the dead letter queue
        private static final String DEAD_NAME = "dead_exchange";

        public static void main(String[] args) throws Exception {
            Channel channel = RabbitMqUtils.getChannel();
            // Declare the two exchanges first
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); // normal
            channel.exchangeDeclare(DEAD_NAME, BuiltinExchangeType.DIRECT); // dead letter
            // Declare the dead letter queue
            String deadQueueName = "dead_queue";
            channel.queueDeclare(deadQueueName, false, false, false, null);
            // Bind the dead letter queue to the dead letter exchange
            channel.queueBind(deadQueueName, DEAD_NAME, "lisi");
            // Arguments that link the normal queue to the dead letter exchange
            Map<String, Object> params = new HashMap<>();
            // Dead letter exchange for the normal queue; the argument key is a fixed value
            params.put("x-dead-letter-exchange", DEAD_NAME);
            // Dead letter routing key; the argument key is a fixed value
            params.put("x-dead-letter-routing-key", "lisi");

            String normalQueue = "normal-queue";
            channel.queueDeclare(normalQueue, false, false, false, params);
            channel.queueBind(normalQueue, EXCHANGE_NAME, "zhangsan");
            System.out.println("Waiting for messages.....");
            DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                String message = new String(delivery.getBody(), "UTF-8");
                System.out.println("Consumer01 received a message: " + message);
            };
            channel.basicConsume(normalQueue, true, deliverCallback, consumerTag -> { });
        }
    }
  • Start the consumer first so that the exchanges and queues are declared, then stop it. Otherwise the messages would be consumed immediately; with the consumer stopped, the messages can expire after ten seconds.

  • On the management page, you can see that both the normal queue and the dead letter queue were created successfully

  • Now start the producer, which sends five messages

  • As you can see, the normal queue received all five messages.

  • Wait 10 seconds. Because the consumer is stopped, the messages expire, and you can see that all five are now in the dead letter queue.

  • Why are there 10 dead letters? Because an earlier test run had already left 5 messages there.

  • Now we write a consumer dedicated to consuming the dead letter queue

  • import com.rabbitmq.client.BuiltinExchangeType;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.DeliverCallback;

    /**
     * Test consumer for the dead letter queue.
     * @author DingYongJun
     * @date 2021/8/1
     */
    public class DyConsumerTest_dead02 {
        private static final String DEAD_EXCHANGE = "dead_exchange";

        public static void main(String[] argv) throws Exception {
            Channel channel = RabbitMqUtils.getChannel();
            channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
            String deadQueue = "dead_queue";
            channel.queueDeclare(deadQueue, false, false, false, null);
            channel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi");
            System.out.println("Waiting to receive dead letter queue messages.....");
            DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                String message = new String(delivery.getBody(), "UTF-8");
                System.out.println("Consumer02 received a message from the dead letter queue: " + message);
            };
            channel.basicConsume(deadQueue, true, deliverCallback, consumerTag -> { });
        }
    }
  • Start the consumer to see the results

  • You can see that the messages were consumed successfully.

② The queue exceeds its maximum length

  • In the producer, comment out the expiration time parameter

  • In the normal-queue consumer, add a queue length limit of 3

  • Note that you must delete the original queue first, because its parameters have changed; if you don’t, startup fails with an error.

  • Start the normal-queue consumer and the producer

  • The normal queue holds only three messages, while the dead letter queue has seven. This shows that messages exceeding the queue’s capacity also enter the dead letter queue.
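
The length limit is just one more entry in the normal queue's arguments map. Here is a minimal sketch, assuming the same exchange and routing key names used by the consumer in this article. The class and method names (`NormalQueueArguments`, `withMaxLength`) are hypothetical and exist only for illustration:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper (not part of the RabbitMQ client): builds the
// arguments map for a normal queue that dead-letters overflow messages.
class NormalQueueArguments {

    static Map<String, Object> withMaxLength(String deadExchange, String deadRoutingKey, int maxLength) {
        Map<String, Object> params = new HashMap<>();
        // Where dead letters are republished
        params.put("x-dead-letter-exchange", deadExchange);
        params.put("x-dead-letter-routing-key", deadRoutingKey);
        // Maximum number of messages the queue may hold
        params.put("x-max-length", maxLength);
        return params;
    }

    public static void main(String[] args) {
        Map<String, Object> params = withMaxLength("dead_exchange", "lisi", 3);
        System.out.println(params);
    }
}
```

The resulting map would be passed as the last argument of `channel.queueDeclare(normalQueue, false, false, false, params)`, in place of the map built inline in the consumer.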

③ The message is rejected

  • The producer code doesn’t change

  • The consumer simulates rejecting a message it has received

  • import com.rabbitmq.client.BuiltinExchangeType;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.DeliverCallback;

    import java.util.HashMap;
    import java.util.Map;

    /**
     * Test consumer for the normal queue that rejects one message.
     * @author DingYongJun
     * @date 2021/8/6
     */
    public class DyConsumerTest_dead01 {

        // Normal exchange
        private static final String EXCHANGE_NAME = "normal_exchange";
        // Dead letter exchange, which routes messages to the dead letter queue
        private static final String DEAD_NAME = "dead_exchange";

        public static void main(String[] args) throws Exception {
            Channel channel = RabbitMqUtils.getChannel();
            // Declare the two exchanges first
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); // normal
            channel.exchangeDeclare(DEAD_NAME, BuiltinExchangeType.DIRECT); // dead letter

            // Declare the dead letter queue
            String deadQueueName = "dead_queue";
            channel.queueDeclare(deadQueueName, false, false, false, null);
            // Bind the dead letter queue to the dead letter exchange
            channel.queueBind(deadQueueName, DEAD_NAME, "lisi");

            // Arguments that link the normal queue to the dead letter exchange
            Map<String, Object> params = new HashMap<>();
            // Dead letter exchange for the normal queue; the argument key is a fixed value
            params.put("x-dead-letter-exchange", DEAD_NAME);
            // Dead letter routing key; the argument key is a fixed value
            params.put("x-dead-letter-routing-key", "lisi");
            // Set the queue length limit to 3
            // params.put("x-max-length", 3);

            String normalQueue = "normal-queue";
            channel.queueDeclare(normalQueue, false, false, false, params);
            channel.queueBind(normalQueue, EXCHANGE_NAME, "zhangsan");
            System.out.println("Waiting for messages.....");
            DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                String message = new String(delivery.getBody(), "UTF-8");
                if (message.equals("3 message")) {
                    System.out.println("Received this message but I don't want it: " + message);
                    // requeue=false: the message is not put back on the queue. If the queue has a
                    // dead letter exchange configured, the message is routed to the dead letter queue.
                    // requeue=true would put it back on the original queue.
                    channel.basicReject(delivery.getEnvelope().getDeliveryTag(), false);
                } else {
                    // Other messages are acknowledged normally
                    System.out.println("Consumer01 received a message: " + message);
                    channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
                }
            };
            // autoAck must be false, because we acknowledge or reject manually above
            channel.basicConsume(normalQueue, false, deliverCallback, consumerTag -> { });
        }
    }
  • After the producer is executed

  • After starting the consumer
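
To tell the three cases above apart on the consuming side, the dead letter consumer can look at the `x-death` header that RabbitMQ adds to dead-lettered messages. Below is a minimal sketch, assuming the header arrives as a list of maps with the most recent event first, and that each entry has a `reason` field such as `expired`, `maxlen` or `rejected`. The class name `DeathReason` is hypothetical, and the headers in `main` are hand-built stand-ins for a real delivery:

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper: extracts the dead-lettering reason from message headers.
class DeathReason {

    private static final String UNKNOWN = "unknown";

    static String of(Map<String, Object> headers) {
        if (headers == null) {
            return UNKNOWN;
        }
        Object xDeath = headers.get("x-death");
        if (!(xDeath instanceof List)) {
            return UNKNOWN;
        }
        List<?> events = (List<?>) xDeath;
        if (events.isEmpty() || !(events.get(0) instanceof Map)) {
            return UNKNOWN;
        }
        // Header values may not be plain Strings, so convert with toString()
        Object reason = ((Map<?, ?>) events.get(0)).get("reason");
        return reason == null ? UNKNOWN : reason.toString();
    }

    public static void main(String[] args) {
        // Simulated headers of a message whose TTL expired in normal-queue
        Map<String, Object> event = new HashMap<>();
        event.put("reason", "expired");
        event.put("queue", "normal-queue");
        Map<String, Object> headers = new HashMap<>();
        headers.put("x-death", Collections.singletonList(event));

        System.out.println(of(headers)); // prints "expired"
        System.out.println(of(null));    // prints "unknown"
    }
}
```

Inside the `DeliverCallback` of the dead letter consumer, this would be called as `DeathReason.of(delivery.getProperties().getHeaders())`.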

2. Delay queue

  • A delay queue is ordered internally. Its most important feature is the delay attribute: the elements in a delay queue are meant to be retrieved and processed after or before a specified time.

  • In simple terms, a delay queue is a queue that holds elements that need to be processed at a specified time.

  • Usage scenarios

    • The order will be automatically cancelled if it is not paid within ten minutes
    • After a meeting is scheduled, inform all participants to attend the meeting ten minutes before the scheduled time
  • These scenarios share one feature: a task needs to be completed at a specified point in time after or before an event

  • Set the TTL

  • Methods

    • Set the expiration time for each message.
    • Set a message TTL on the queue, which applies to every message in it.
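
The two methods take their values in different forms, which is easy to get wrong. The per-message `expiration` property is a string of milliseconds, as in the producer's `expiration("10000")`, while the queue-level TTL is the numeric queue argument `x-message-ttl`. A minimal sketch of both conversions follows; the class and method names (`TtlSettings`, `perMessageExpiration`, `queueTtlArguments`) are hypothetical:

```java
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper showing the two value formats for a message TTL.
class TtlSettings {

    // Method 1: value for the per-message "expiration" property (a string, in ms)
    static String perMessageExpiration(Duration ttl) {
        return Long.toString(ttl.toMillis());
    }

    // Method 2: queue arguments applying one TTL to every message in the queue
    static Map<String, Object> queueTtlArguments(Duration ttl) {
        Map<String, Object> params = new HashMap<>();
        // Numeric value in ms; toIntExact throws if the duration does not fit an int
        params.put("x-message-ttl", Math.toIntExact(ttl.toMillis()));
        return params;
    }

    public static void main(String[] args) {
        Duration tenSeconds = Duration.ofSeconds(10);
        System.out.println(perMessageExpiration(tenSeconds)); // prints "10000"
        System.out.println(queueTtlArguments(tenSeconds));    // prints "{x-message-ttl=10000}"
    }
}
```

The string goes into `AMQP.BasicProperties.Builder().expiration(...)` when publishing, and the map goes into `channel.queueDeclare(...)` when declaring the queue.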
  • There is no full code demo here; it will be covered in the upcoming hands-on Spring Boot integration article!

  • Just to give you an idea

    • We have a normal queue.
    • And a dead letter queue.
    • Set an expiration time for messages in the normal queue.
    • When a message expires, it enters the dead letter queue.
    • A dedicated consumer then consumes the messages in the dead letter queue.
    • Isn’t this equivalent to a delay queue?
  • Why not go into detail here? Because this is a queue, which is first in, first out, and the effect of implementing it without the MQ plugin is best seen in practice. The hands-on article will compare the approaches and explain them in detail!
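
To see why first in, first out matters here, consider a simplified model (not broker code). It assumes the behaviour described in RabbitMQ's TTL documentation, where a message with a per-message TTL is only dead-lettered once it reaches the head of the queue. The class `HeadOfLineDemo` and its method are hypothetical, and all messages are assumed to be published at time 0 with no consumer on the normal queue:

```java
import java.util.ArrayList;
import java.util.List;

// Simplified simulation: expiry is only acted on at the head of a FIFO queue.
class HeadOfLineDemo {

    // ttlsMs[i] is the TTL of the i-th published message; returns the time (ms)
    // at which each message is dead-lettered.
    static List<Long> deadLetterTimes(long[] ttlsMs) {
        List<Long> times = new ArrayList<>();
        long headFreeAt = 0;
        for (long ttl : ttlsMs) {
            // A message cannot leave before the messages ahead of it have left
            headFreeAt = Math.max(headFreeAt, ttl);
            times.add(headFreeAt);
        }
        return times;
    }

    public static void main(String[] args) {
        // A 20 s message published before a 2 s message blocks the shorter one
        System.out.println(deadLetterTimes(new long[] {20000, 2000})); // prints "[20000, 20000]"
        // In the opposite order both expire on time
        System.out.println(deadLetterTimes(new long[] {2000, 20000})); // prints "[2000, 20000]"
    }
}
```

In this model, mixing different per-message delays in one queue makes a short delay wait behind a longer one, which is the kind of effect the comparison with the plugin is meant to show.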


I see no ending, but I will search high and low

If you think this blogger writes well, please like, follow, and comment to encourage me! Writing is not easy ~hahah