@ (toc) variety of timing tasks, common tasks such as log backups regularly, we may at 3 a.m. every day to backup, this kind of fixed time timing task we usually use cron expression can be easily implemented, and some special tasks regularly, to see the time bomb in the movie, 3 minutes after the blast, This kind of timed task is not very well described by Cron, because the start time is uncertain, and we sometimes encounter similar requirements during development, for example:

  • In the e-commerce project, when we place an order, we generally need to pay within 20 minutes or 30 minutes. Otherwise, the order will enter the exception processing logic and be cancelled. Then entering the exception processing logic can be regarded as a delay queue.
  • I bought a smart casserole, can be used to cook porridge, before going to work to put the material into the pot, and then set a few minutes to start cooking porridge, so after work you can drink delicious porridge, then the porridge instruction can also be regarded as a delay task, in a delay queue, time to execute again.
  • The company’s conference reservation system will notify all users who have scheduled a meeting half an hour before the meeting starts.
  • If the safety work order is not processed within 24 hours, it will automatically pull up the wechat group of the enterprise to remind the relevant responsible person.
  • After placing an order for takeout, the user will remind the delivery boy that the time limit is about to expire 10 minutes later.
  • .

There are many scenarios where we need delay queues.

This article uses RabbitMQ as an example to talk about delayed queuing gameplay.

In general, there are two ways to implement scheduled tasks on RabbitMQ:

  • RabbitMQ uses the message expiration and private message queue mechanisms to implement scheduled tasks.
  • The rabbitMQ_delayed_message_exchange plugin for RabbitMQ is simpler.

Let’s look at the two ways.

1. With the plug-in

1.1 Installing Plug-ins

The rabbitmq_delayed_message_exchange plugin is available on GitHub and can be downloaded directly:

  • Github.com/rabbitmq/ra…

Choose the version that suits you, in this case the latest version 3.9.0.

After the download is complete, run the following command on the command line to copy the downloaded file to the Docker container:

Docker cp. / rabbitmq_delayed_message_exchange - 3.9.0. Ez some - rabbit: / pluginsCopy the code

The first argument is the location of the file on the host, and the second argument is the location to copy to the container.

Then run the following command to enter the RabbitMQ container:

docker exec -it some-rabbit /bin/bash
Copy the code

After entering the container, execute the following command to enable the plug-in:

rabbitmq-plugins enable rabbitmq_delayed_message_exchange
Copy the code

After successfully enabling, you can also view all the installed plug-ins by running the following command to see if there are any plug-ins we just installed, as follows:

rabbitmq-plugins list
Copy the code

The following figure shows the complete execution process of the command:

OK, now that the configuration is complete, exit the RabbitMQ container. Then start coding.

1.2 Sending and Receiving Messages

Next, start sending and receiving messages.

First we create a Spring Boot project to introduce Web and RabbitMQ dependencies as follows:

After the RabbitMQ project is successfully created, configure the following basic information in application.properties:

spring.rabbitmq.host=localhost
spring.rabbitmq.password=guest
spring.rabbitmq.username=guest
spring.rabbitmq.virtual-host=/
Copy the code

A configuration class for RabbitMQ is provided:

@Configuration
public class RabbitConfig {
    public static final String QUEUE_NAME = "javaboy_delay_queue";
    public static final String EXCHANGE_NAME = "javaboy_delay_exchange";
    public static final String EXCHANGE_TYPE = "x-delayed-message";

    @Bean
    Queue queue(a) {
        return new Queue(QUEUE_NAME, true.false.false);
    }

    @Bean
    CustomExchange customExchange(a) {
        Map<String, Object> args = new HashMap<>();
        args.put("x-delayed-type"."direct");
        return new CustomExchange(EXCHANGE_NAME, EXCHANGE_TYPE, true.false,args);
    }
    
    @Bean
    Binding binding(a) {
        returnBindingBuilder.bind(queue()) .to(customExchange()).with(QUEUE_NAME).noargs(); }}Copy the code

Note that the definitions of switches are different.

The switch we are using here is CustomExchange, which is a switch provided in Spring. There are five parameters to create CustomExchange, with the following meanings:

  • Switch name.
  • Switch type, this place is fixed.
  • Whether the switch is persistent.
  • If no queue is bound to the switch, the switch is deleted.
  • Other parameters.

The last arGS parameter specifies the switch message distribution type, known as direct, FANout, Topic, and header, which will be used in the future.

Let’s create another message consumer:

@Component
public class MsgReceiver {
    private static final Logger logger = LoggerFactory.getLogger(MsgReceiver.class);
    @RabbitListener(queues = RabbitConfig.QUEUE_NAME)
    public void handleMsg(String msg) {
        logger.info("handleMsg,{}",msg); }}Copy the code

Print the message content.

Write another unit test method to send the message:

@SpringBootTest
class MqDelayedMsgDemoApplicationTests {

    @Autowired
    RabbitTemplate rabbitTemplate;
    @Test
    void contextLoads(a) throws UnsupportedEncodingException {
        Message msg = MessageBuilder.withBody(("Hello jiangnan a little rain"+new Date()).getBytes("UTF-8")).setHeader("x-delay".3000).build(); rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_NAME, RabbitConfig.QUEUE_NAME, msg); }}Copy the code

Set the delay time for the message in the message header.

Now, start the Spring Boot project and run the unit test method to send the message. The final console logs are as follows:

You can see from the log that message delay has been implemented.

2. DLX implements delay queue

2.1 Implementation idea of delay queue

The idea of delay queue implementation is also very simple, that is, the last article we said DLX (dead-letter switch) +TTL (message timeout time).

We can think of a dead letter queue as a delay queue.

To be specific:

If a message needs to be executed 30 minutes later, we set the message to be valid for 30 minutes and configure a dead-letter switch and dead-letter for the messagerouting_key, and do not set up consumers for the message queue, then 30 minutes later, the message is not consumed by consumers and entered the dead letter queue, at this time we have a consumer “squatting” in the dead letter queue, the message entered the dead letter queue, immediately consumed.

This is the idea of implementing delay queue, isn’t it very simple?

2.2 case

Next Songge through a simple case, and we demonstrate the specific implementation of the delay queue.

Start with a RabbitMQ startup.

Then we create a Spring Boot project to introduce RabbitMQ dependencies:

Then configure RabbitMQ’s basic connection information in application.properties:

spring.rabbitmq.host=localhost
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.port=5672
Copy the code

Next we configure two message queues: a normal queue and a dead letter queue:

@Configuration
public class QueueConfig {
    public static final String JAVABOY_QUEUE_NAME = "javaboy_queue_name";
    public static final String JAVABOY_EXCHANGE_NAME = "javaboy_exchange_name";
    public static final String JAVABOY_ROUTING_KEY = "javaboy_routing_key";
    public static final String DLX_QUEUE_NAME = "dlx_queue_name";
    public static final String DLX_EXCHANGE_NAME = "dlx_exchange_name";
    public static final String DLX_ROUTING_KEY = "dlx_routing_key";

    /** * dead letter queue *@return* /
    @Bean
    Queue dlxQueue(a) {
        return new Queue(DLX_QUEUE_NAME, true.false.false);
    }

    /** * dead letter switch *@return* /
    @Bean
    DirectExchange dlxExchange(a) {
        return new DirectExchange(DLX_EXCHANGE_NAME, true.false);
    }

    /** * Bind a dead letter queue to a dead letter switch *@return* /
    @Bean
    Binding dlxBinding(a) {
        return BindingBuilder.bind(dlxQueue()).to(dlxExchange())
                .with(DLX_ROUTING_KEY);
    }

    /** * Normal message queue *@return* /
    @Bean
    Queue javaboyQueue(a) {
        Map<String, Object> args = new HashMap<>();
        // Set the message expiration time
        args.put("x-message-ttl".1000*10);
        // Set a dead-letter switch
        args.put("x-dead-letter-exchange", DLX_EXCHANGE_NAME);
        // Set a dead letter routing_key
        args.put("x-dead-letter-routing-key", DLX_ROUTING_KEY);
        return new Queue(JAVABOY_QUEUE_NAME, true.false.false, args);
    }

    /** * Common switch *@return* /
    @Bean
    DirectExchange javaboyExchange(a) {
        return new DirectExchange(JAVABOY_EXCHANGE_NAME, true.false);
    }

    /** * Bind the normal queue to the corresponding switch *@return* /
    @Bean
    Binding javaboyBinding(a) {
        returnBindingBuilder.bind(javaboyQueue()) .to(javaboyExchange()) .with(JAVABOY_ROUTING_KEY); }}Copy the code

This configuration code is a bit longer, but the principle is simple.

  • The configuration can be divided into two groups. The first group is configured with a dead letter queue, and the second group is configured with a normal queue. Each group consists of message queues, message switches, and bindings.
  • When configuring a message queue, specify a dead letter queue for the message queue. Portal: Do messages in RabbitMQ expire? .
  • The default time unit for setting the expiration time of messages in a queue is milliseconds.

Next we configure a consumer for the dead-letter queue as follows:

@Component
public class DlxConsumer {
    private static final Logger logger = LoggerFactory.getLogger(DlxConsumer.class);

    @RabbitListener(queues = QueueConfig.DLX_QUEUE_NAME)
    public void handle(String msg) { logger.info(msg); }}Copy the code

Print out the message when you receive it.

That’s it.

Start the project.

Finally, we send a message in the unit test:

@SpringBootTest
class DelayQueueApplicationTests {

    @Autowired
    RabbitTemplate rabbitTemplate;

    @Test
    void contextLoads(a) {
        System.out.println(new Date());
        rabbitTemplate.convertAndSend(QueueConfig.JAVABOY_EXCHANGE_NAME, QueueConfig.JAVABOY_ROUTING_KEY, "hello javaboy!"); }}Copy the code

There is nothing more to be said for this, just a normal message is sent and 10 seconds later the message is printed out in the dead-letter queue of consumers.

3. Summary

These are the two ways we can use RabbitMQ for delay queuing