RabbitMQ implements delayed queues

preface

When we were designing a large shopping website, there was such a scene that if we did not pay for a long time after placing an order, the order would be cancelled over time and the inventory would be restored. This is what we often say that the inventory will be carried out after the order expires.

The design of distributed seckill system can refer to my blog: Design of distributed seckill system

Use RabbitMQ in SpringBoot projects see my blog: Use RabbitMQ in SpringBoot projects

The body of the

Delay queue

The scenario of returning inventory after order expiration:

  • Taobao seven day automatic confirmation of receipt: After we sign for the receipt of goods, the logistics system will delay seven days to send a message to the payment system, notifies the payment system to remit the money to the merchant.
  • 12306 Ticket payment confirmation page: We have internal countdown before selecting the ticket and completing the payment. If the order is not confirmed within 30 minutes, the order will be automatically cancelled.

Potential solutions for restocking expired orders:

  • Use background threads to constantly scan the database, performance is very low, deprecated
  • Store order dataRedisAnd set the expiration time, considering to retain the order information, discard
  • usedelayedQueueDelay queue, set the expiration time, in which the object can only be removed from the queue when the expiration, expiration operation, but does not support distributed, abandoned
  • Distributed scheduled tasks are used to process related orders, but the time interval is difficult to set and the data volume is huge, so use with caution
  • useRabbitMQDelay queue to implement, and use a scheduled task to deal with possible message loss caused by the inventory cannot be released, adopted

RabbitMQ delay queue

Implementation of RabbitMQ delay queue

  • inThe RabbitMQ 3.6 xWe used to do thatDead letter queue +TTL expiration timeTo implement delay queuing.
  • inThe RabbitMQ 3.6 xTo start,RabbitMQThe official delay queue plug-in can be downloaded and placed intoRabbitMQRoot directorypluginsUnder.

This section uses RabbitMQ 3.6.6 as an example to describe how to install the delay plug-in

Step 1: Download the RabbitMQ delay queue plugin

Step 2: Place the corresponding plugins in the directory

cd /rabbitmq/lib/rabbitmq_server-3.66./plugins
Copy the code

Step 3: Start the plug-in

whereis rabbitmq
cd /usr/local/rabbitmq/bin
sh rabbitmq-plugins enable rabbitmq_delayed_message_exchange
Copy the code

Step 4: Restart the RabbitMQ server for the plug-in to take effect

Service the rabbitmq server restart or/usr/local/rabbitmq/bin/rabbitmqctl stop/usr/local/rabbitmq/bin/the rabbitmq server. - -detachedCopy the code

Step 5: Check whether the plug-in is installed properly

sh rabbitmq-plugins list
Copy the code

SpringBoot uses RabbitMQ to implement delay queues

Rely on

<! -- RabbitMQ message queue --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>Copy the code

The properties, application of springboot configuration class

# configure rabbitmq spring.rabbitmq.host=127.0. 01.
spring.rabbitmq.port=5672Spring. The rabbitmq. Username = guest spring. The rabbitmq. Password = # guest custom parameters: concurrent consumers initialization value spring. The rabbitmq. Listener. Concurrency =10# customization parameters: the maximum concurrent consumers spring. The rabbitmq. Listener. Max - concurrency =20# customization parameters: each customer every time listening to take the number of messages processed spring korah. The rabbitmq. Listener. Prefetch =5Name =string.queue. Name2 string.exchange.name =string.exchange.name2 string.routing.key.name=string.routing.key.name2Copy the code

RabbitmqConfig. Class: the rabbitmq configuration class

  • directExchange.setDelayed(true): Switch enable supports delay
/** * Rabbitmq configuration class */
@Configuration
public class RabbitmqConfig {

    @Autowired
    private Environment env;


    @Autowired
    private CachingConnectionFactory connectionFactory;


    @Autowired
    private SimpleRabbitListenerContainerFactoryConfigurer factoryConfigurer;


    /** * single consumer, i.e. queue message **@return* /
    @Bean(name = "singleListenerContainer")
    public SimpleRabbitListenerContainerFactory listenerContainer(a) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setMessageConverter(new Jackson2JsonMessageConverter());
        factory.setConcurrentConsumers(1);
        factory.setMaxConcurrentConsumers(1);
        factory.setPrefetchCount(1);
        factory.setTxSize(1);
        factory.setAcknowledgeMode(AcknowledgeMode.AUTO);
        return factory;
    }

    /** * RabbitTemplate tools * RabbitTemplate will use channels * channels are multiplexed two-way data channels to reduce TCP connections **@return* /
    @Bean
    public RabbitTemplate rabbitTemplate(a) {
        connectionFactory.setPublisherConfirms(true);
        connectionFactory.setPublisherReturns(true);
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMandatory(true);
        return rabbitTemplate;
    }


    /** Generate rabbitMQ component * queue: data structure used to store messages, located on disk or in memory. Switch: component that receives messages sent to RabbitMQ and decides to post them to that queue; * Binding: A set of rules that tell the exchange which queue messages should be stored in. * * * * /


    /** * character message **/
    // Define the queue
    @Bean(name = "stringQueue")
    public Queue stringQueue(a) {
        return new Queue(env.getProperty("string.queue.name"), true);
    }


    // Define switch DirectExchange as a queued message exchange
    @Bean
    public DirectExchange stringExchange(a) {
        DirectExchange directExchange =new DirectExchange(env.getProperty("string.exchange.name"), true.false);
        directExchange.setDelayed(true);
        return directExchange;
    }


    // Define the binding
    @Bean
    public Binding stringBinging(a) {
        return BindingBuilder.bind(stringQueue()).to(stringExchange()).with(env.getProperty("string.routing.key.name")); }}Copy the code

CommonMqListener. Class: the RabbitMQ consumers

/** * here is the consumer of the message queue */
@Component
@Slf4j
public class CommonMqListener {

    @Autowired
    RedisTemplate redisTemplate;

    /** * listen for consumption messages **@param message
     */
    @RabbitListener(queues = "${string.queue.name}", containerFactory = "singleListenerContainer")
    public void consumeStringQueue(@Payload byte[] message) {
        try {
            log.info("Listen for consumption, current time."+ DateTimeUtils.getSystemTime() +"Listen on message: {}".new String(message, "UTF-8"));
        } catch (Exception e) {
            log.error("Listen to consume order message message exception {},{}",e.getStackTrace(), e); }}}Copy the code

AppForTest. Class: test class

  • message.getMessageProperties().setDelay(60000): Sets the delay to 1 minute. The default unit is ms
@RunWith(SpringJUnit4ClassRunner.class)
/ / start of Spring
@SpringBootTest(classes = App.class)
public class AppForTest {

   @Autowired
   private RabbitTemplate rabbitTemplate;

   @Autowired
   private Environment env;

   @Test
    public void ceshi(a){
      rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
      rabbitTemplate.setExchange(env.getProperty("string.exchange.name"));
      rabbitTemplate.setRoutingKey(env.getProperty("string.routing.key.name"));
      try {
         String msg="Send message: current time"+ DateTimeUtils.getSystemTime();
         Message message = MessageBuilder.withBody(msg.getBytes("UTF-8")).build();
         // Set the request encoding format
         message.getMessageProperties().setHeader(AbstractJavaTypeMapper.DEFAULT_CONTENT_CLASSID_FIELD_NAME, MessageProperties.CONTENT_TYPE_JSON);
        // Set a delay of 60 seconds
         message.getMessageProperties().setDelay(60000);
         rabbitTemplate.convertAndSend(message);
      } catch (Exception e) {
         throw new BusinessException(CouponTypeEnum.OPERATE_ERROR,"String message producer failed to send message :"+ e.getMessage()); }}}Copy the code

Final demo result:

  • After the producer sends a message, it is received and consumed by the consumer one minute later