One, the scenario

Querying a payment result is a typical case. Because payment systems are complex, the funds may not arrive in real time after the customer pays, so a delayed task is needed to poll for the payment result.

In similar scenarios where a result is not available in real time, a delayed task can be used to query the result status later.

Two, the plan

Normal exchange + dead-letter exchange. Set the message expiration time to the required delay. When the message expires, it is routed to the dead-letter queue, and a consumer listening on that queue carries out the delayed task.

Three, concepts

To understand how a normal exchange and a dead-letter exchange implement a delayed task, you need to understand RabbitMQ's message time to live (TTL) and the concept of a dead-letter exchange.

1. Time to Live (TTL)

RabbitMQ lets you set a TTL on queues and on messages. A queue TTL determines how long a queue may stay unused (for example, with no consumers) before it is deleted. A message TTL can be set for all messages in a queue or for each message individually; if both are set, the smaller value applies. When a message has stayed in a queue longer than the configured TTL, the message is said to be dead.

Note that a message routed to several queues can expire at different times in each of them, or never expire in some, because this depends on the settings of the queue the message sits in. A dead message in one queue does not affect the life cycle of its copies in other queues.

I'll focus on the TTL of a single message, because that is the key to implementing delayed tasks. You can set it through the message's expiration property:

    rabbitTemplate.convertAndSend(delayExchangeName, delayRoutingKeyName, reqDTO, message -> {
        // Set the expiration time to the expected delay; once it elapses, the message
        // enters the dead-letter queue, which implements the delayed task
        message.getMessageProperties().setExpiration("5000");
        return message;
    });

The message above has an expiration time of 5000 ms, so if it is not consumed within 5 seconds it becomes dead, that is, a "dead letter". Dead letters alone are not enough to implement a delayed task; a dead-letter exchange is also required.
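The rule that the smaller of the queue-level and message-level TTL applies can be written down as a small plain-Java model. This is only a sketch: `EffectiveTtl` and `effectiveTtlMillis` are hypothetical names introduced for illustration, and nothing here calls RabbitMQ.

```java
import java.util.OptionalLong;

/** Plain-Java model of the "smaller TTL wins" rule. Not the RabbitMQ API. */
final class EffectiveTtl {

    /**
     * Returns the TTL (ms) that applies to a message, or empty if neither
     * the queue nor the message sets one (the message never expires).
     */
    static OptionalLong effectiveTtlMillis(OptionalLong queueTtl, OptionalLong messageTtl) {
        if (queueTtl.isPresent() && messageTtl.isPresent()) {
            // Both are set: the smaller value applies.
            return OptionalLong.of(Math.min(queueTtl.getAsLong(), messageTtl.getAsLong()));
        }
        // At most one is set: use whichever is present.
        return queueTtl.isPresent() ? queueTtl : messageTtl;
    }

    public static void main(String[] args) {
        // Queue TTL of 10 s and message TTL of 5 s: the message TTL is smaller.
        System.out.println(effectiveTtlMillis(OptionalLong.of(10_000), OptionalLong.of(5_000)));
    }
}
```

In the delayed-task setup of this article only the per-message value is set, so the delay is exactly the expiration passed by the producer.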

2. Dead Letter Exchanges (DLX)

A message in a queue is dead-lettered, that is, republished to an exchange, when any of the following events occurs:

  • The consumer negatively acknowledges the message using basic.reject or basic.nack with the requeue parameter set to false
  • The message expires because of its per-message TTL
  • The message is dropped because the queue exceeded its length limit

Note that the expiration of a queue does not dead-letter the messages in it. Dead-letter exchanges (DLXs) are normal exchanges: they can be of any type and are declared in the usual way. To configure a dead-letter exchange for a queue, set the optional argument "x-dead-letter-exchange" when declaring the queue. The value must be the name of an exchange on the same virtual host as the queue. The dead-letter exchange does not have to be declared when the queue is declared, but it must exist by the time a message needs to be dead-lettered; otherwise the message is dropped. You can also specify a routing key to use for dead-lettering with "x-dead-letter-routing-key"; if none is set, the message's own routing key is used.

    /**
     * Delay queue
     * @return the delay queue, configured with a dead-letter exchange and routing key
     */
    @Bean
    public Queue delayQueue() {
        Map<String, Object> params = new HashMap<>();
        params.put("x-dead-letter-exchange", exchangeName);
        params.put("x-dead-letter-routing-key", routingKeyName);
        // Arguments: name, durable, exclusive, autoDelete, arguments
        return new Queue(delayQueueName, true, false, false, params);
    }

The code above uses two optional arguments to set a dead-letter exchange and a dead-letter routing key on the queue delayQueueName. When a message in delayQueueName is dead-lettered, it is forwarded to the destination queue according to the configured dead-letter exchange and routing key. The consumer listens on that destination queue, which realizes the delayed task.
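To follow the whole mechanism without a broker, the plain-Java sketch below models a delay queue whose expired messages are dead-lettered to a processing queue. It is not the RabbitMQ API: `DeadLetterDelayModel` and all of its methods are hypothetical, time is a logical clock in milliseconds, and the model assumes every message carries the same TTL, so the message at the head of the queue is always the first to expire.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** In-memory model of a delay queue with a dead-letter exchange. Not the RabbitMQ API. */
final class DeadLetterDelayModel {

    /** A queued message plus the logical time (ms) at which its TTL runs out. */
    private static final class Pending {
        final String body;
        final String routingKey;
        final long expiresAt;

        Pending(String body, String routingKey, long expiresAt) {
            this.body = body;
            this.routingKey = routingKey;
            this.expiresAt = expiresAt;
        }
    }

    private final Deque<Pending> delayQueue = new ArrayDeque<>();
    // Dead-letter exchange bindings: routing key -> bodies delivered to the bound queue.
    private final Map<String, List<String>> bindings = new HashMap<>();
    // Models x-dead-letter-routing-key; null means "keep the message's own key".
    private final String deadLetterRoutingKey;

    DeadLetterDelayModel(String deadLetterRoutingKey) {
        this.deadLetterRoutingKey = deadLetterRoutingKey;
    }

    /** Binds a processing queue to the dead-letter exchange under the given key. */
    void bindProcessQueue(String routingKey) {
        bindings.put(routingKey, new ArrayList<>());
    }

    /** Publishes to the delay queue, which has no consumer, with a per-message TTL. */
    void publish(String body, String routingKey, long nowMillis, long ttlMillis) {
        delayQueue.addLast(new Pending(body, routingKey, nowMillis + ttlMillis));
    }

    /** Moves the clock forward and dead-letters every expired message at the head. */
    void advanceTo(long nowMillis) {
        // Assumption: all messages share one TTL, so the head always expires first.
        while (!delayQueue.isEmpty() && delayQueue.peekFirst().expiresAt <= nowMillis) {
            Pending dead = delayQueue.pollFirst();
            String key = deadLetterRoutingKey != null ? deadLetterRoutingKey : dead.routingKey;
            List<String> target = bindings.get(key);
            if (target != null) {
                target.add(dead.body); // routed to the processing queue
            }
            // With no matching binding the message is dropped.
        }
    }

    /** Returns the bodies delivered so far to the queue bound under the key. */
    List<String> delivered(String routingKey) {
        return bindings.getOrDefault(routingKey, new ArrayList<>());
    }

    public static void main(String[] args) {
        DeadLetterDelayModel model = new DeadLetterDelayModel("process.key");
        model.bindProcessQueue("process.key");
        model.publish("wake-up", "delay.key", 0, 5_000);
        model.advanceTo(4_999);
        System.out.println(model.delivered("process.key")); // nothing delivered yet
        model.advanceTo(5_000);
        System.out.println(model.delivered("process.key")); // the wake-up message arrived
    }
}
```

The point of the model is that nobody consumes from the delay queue: the only way out is expiry, so the consumer of the processing queue sees each message exactly one TTL after it was published.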

Four, code implementation

  1. This sample code implements a delayed wake-up. The producer sends a wake-up message, and the consumer listens for it on the dead-letter queue. When the check count reaches 4, the consumer is awakened and stops resending the message. Otherwise, as long as the check count is less than 6, the consumer sends the wake-up message again and waits for the next wake-up.
  2. Code structure:
  • SendWakeUpMsgReqDTO: parameter for sending messages
  • RabbitMqConfig: specifies the MQ configuration
  • RabbitMqProviderImpl: Message producer
  • RabbitMqConsumerImpl: Message consumer
  • AsyncTaskServiceImpl: Sends messages asynchronously
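The wake-up flow described in item 1 can be traced without a broker. The sketch below is a plain-Java model, not part of the sample project: `WakeUpTrace` and `deliveryTimes` are hypothetical names, and the initial delay is an assumption, since in the sample the producer chooses it through delayTime. Each resend uses the 5000 ms delay from the consumer code.

```java
import java.util.ArrayList;
import java.util.List;

/** Traces when the consumer would be invoked in the wake-up sample. Illustration only. */
final class WakeUpTrace {
    static final int WAKE_AT = 4;            // consumer is awakened at this check count
    static final int MAX_CHECKS = 6;         // no resend once the count reaches this bound
    static final long RESEND_DELAY = 5_000;  // delay (ms) set by the consumer on each resend

    /** Returns the logical time (ms) of every delivery to the consumer. */
    static List<Long> deliveryTimes(int initialCheckTimes, long initialDelayMillis) {
        List<Long> times = new ArrayList<>();
        long now = initialDelayMillis; // the first message arrives after its own TTL
        int checkTimes = initialCheckTimes;
        while (true) {
            times.add(now);
            if (checkTimes == WAKE_AT) {
                break; // awakened: return without resending
            }
            if (checkTimes >= MAX_CHECKS) {
                break; // bound reached: give up without resending
            }
            checkTimes++;          // count the resend
            now += RESEND_DELAY;   // the resent message arrives one delay later
        }
        return times;
    }

    public static void main(String[] args) {
        // Starting at 0 with a 5 s initial delay: five deliveries, 5 s apart.
        System.out.println(deliveryTimes(0, 5_000));
    }
}
```

Starting from a check count of 0, the consumer is invoked five times (counts 0 through 4) and wakes up on the fifth, so the bound of 6 only matters when the initial count is already above 4.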
SendWakeUpMsgReqDTO:

    import lombok.Data;

    import java.io.Serializable;

    /**
     * @Author Nemo Wang
     * @Date 2021/6/17 19:46
     * @Description Parameters of the wake-up message
     */
    @Data
    public class SendWakeUpMsgReqDTO implements Serializable {
        private static final long serialVersionUID = 6298050708365621926L;

        /** Source */
        private String sourceName;

        /** Number of times the message has been sent */
        private int checkTimes;

        /** Delay time in milliseconds, as a string (used as the message expiration) */
        private String delayTime;

    }

RabbitMqConfig:

    import org.springframework.amqp.core.Binding;
    import org.springframework.amqp.core.BindingBuilder;
    import org.springframework.amqp.core.DirectExchange;
    import org.springframework.amqp.core.Queue;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;

    import java.util.HashMap;
    import java.util.Map;

    /**
     * @Author Nemo Wang
     * @Date 2021/6/17
     * @Description RabbitMQ configuration class
     */
    @Configuration
    public class RabbitMqConfig {

        @Value("${mq.delayQueueName}")
        private String delayQueueName;
        @Value("${mq.delayExchangeName}")
        private String delayExchangeName;
        @Value("${mq.delayRoutingKeyName}")
        private String delayRoutingKeyName;

        @Value("${mq.queueName}")
        private String queueName;
        @Value("${mq.exchangeName}")
        private String exchangeName;
        @Value("${mq.routingKeyName}")
        private String routingKeyName;

        /**
         * Delay queue: expired messages are dead-lettered to the normal exchange
         * @return the delay queue
         */
        @Bean
        public Queue delayQueue() {
            Map<String, Object> params = new HashMap<>();
            params.put("x-dead-letter-exchange", exchangeName);
            params.put("x-dead-letter-routing-key", routingKeyName);
            // Arguments: name, durable, exclusive, autoDelete, arguments
            return new Queue(delayQueueName, true, false, false, params);
        }

        /**
         * Normal execution queue
         * @return the queue the consumer listens on
         */
        @Bean
        public Queue processQueue() {
            return new Queue(queueName, true);
        }

        /**
         * Delay exchange
         * @return a durable, non-auto-delete direct exchange
         */
        @Bean
        public DirectExchange delayExchange() {
            return new DirectExchange(delayExchangeName, true, false);
        }

        /**
         * Normal execution exchange
         * @return a durable, non-auto-delete direct exchange
         */
        @Bean
        public DirectExchange processExchange() {
            return new DirectExchange(exchangeName, true, false);
        }

        /**
         * Binds the delay queue to the delay exchange with the key delayRoutingKeyName
         * @return the binding
         */
        @Bean
        public Binding delayBinding() {
            return BindingBuilder.bind(delayQueue())
                    .to(delayExchange())
                    .with(delayRoutingKeyName);
        }

        /**
         * Binds the normal queue to the normal exchange with the key routingKeyName
         * @return the binding
         */
        @Bean
        public Binding processBinding() {
            return BindingBuilder.bind(processQueue())
                    .to(processExchange())
                    .with(routingKeyName);
        }
    }

RabbitMqProviderImpl:

    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.stereotype.Component;

    /**
     * @Author Nemo Wang
     * @Date 2021/6/17 19:52
     * @Description Message producer
     */
    @Slf4j
    @Component
    public class RabbitMqProviderImpl implements RabbitMqProvider {

        @Value("${mq.delayExchangeName}")
        private String delayExchangeName;
        @Value("${mq.delayRoutingKeyName}")
        private String delayRoutingKeyName;

        @Autowired
        private RabbitTemplate rabbitTemplate;

        /**
         * Sends a wake-up message
         * @param reqDTO message parameters, including the delay time
         */
        @Override
        public void sendWakeUpMsg(SendWakeUpMsgReqDTO reqDTO) {
            log.info("Enter RabbitMqProviderImpl.sendWakeUpMsg reqDTO={}", reqDTO);
            rabbitTemplate.convertAndSend(delayExchangeName, delayRoutingKeyName, reqDTO, message -> {
                // Set the expiration time to the expected delay; once it elapses, the message
                // enters the dead-letter queue, which implements the delayed task
                message.getMessageProperties().setExpiration(reqDTO.getDelayTime());
                return message;
            });
        }
    }

RabbitMqConsumerImpl:

    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;

    /**
     * @Author Nemo Wang
     * @Date 2021/6/17
     * @Description Message consumer
     */
    @Slf4j
    @Component
    public class RabbitMqConsumerImpl implements RabbitMqConsumer {

        @Autowired
        private AsyncTaskService asyncTaskService;

        /**
         * Listens for messages in the dead-letter queue.
         * A message in the delay queue expires and is routed to the dead-letter queue;
         * this method is invoked for each such message. If checkTimes is 4, the consumer
         * wakes up and returns; otherwise the message re-enters the MQ queue to wait for
         * the next wake-up.
         * @param reqDTO the wake-up message
         */
        @RabbitListener(queues = "${mq.queueName}")
        @Override
        public void waitingForWakeUp(SendWakeUpMsgReqDTO reqDTO) {
            log.info("Enter RabbitMqConsumerImpl.waitingForWakeUp reqDTO={}", reqDTO);

            log.info("The consumer is sleeping: [{}]. Waiting to be awakened", reqDTO.getCheckTimes());

            if (4 == reqDTO.getCheckTimes()) {
                log.info("Consumer has been awakened.");
                return;
            }

            if (reqDTO.getCheckTimes() < 6) {
                // Re-enter the MQ queue; at most 6 attempts
                sendMsg(reqDTO);
            }
        }

        private void sendMsg(SendWakeUpMsgReqDTO reqDTO) {
            // Count this entry into the MQ queue
            reqDTO.setCheckTimes(reqDTO.getCheckTimes() + 1);
            // Set the delay to 5 seconds
            reqDTO.setDelayTime("5000");
            log.info("RabbitMqConsumerImpl.waitingForWakeUp.sendMsg re-entering the MQ queue, reqDTO.getCheckTimes()={}", reqDTO.getCheckTimes());
            asyncTaskService.sendAsyncMqWakeup(reqDTO);
        }
    }

AsyncTaskServiceImpl:

    import lombok.extern.slf4j.Slf4j;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.scheduling.annotation.Async;
    import org.springframework.stereotype.Component;

    /**
     * @Author Nemo Wang
     * @Date 2021/6/19
     * @Description Sends messages asynchronously
     */
    @Slf4j
    @Component
    public class AsyncTaskServiceImpl implements AsyncTaskService {

        @Autowired
        private RabbitMqProvider rabbitMqProvider;

        @Async("asyncThreadPoolTaskExecutor")
        @Override
        public void sendAsyncMqWakeup(SendWakeUpMsgReqDTO reqDTO) {
            log.info("AsyncTaskServiceImpl.sendAsyncMqWakeup reqDTO={}", reqDTO);
            rabbitMqProvider.sendWakeUpMsg(reqDTO);
        }
    }
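
The classes above read six mq.* properties through @Value. A configuration along the following lines would supply them. The keys are taken from the annotations in the sample; every value is a placeholder invented for illustration, so substitute your own names.

```properties
# Hypothetical values; only the keys come from the @Value annotations in the sample
mq.delayQueueName=wakeup.delay.queue
mq.delayExchangeName=wakeup.delay.exchange
mq.delayRoutingKeyName=wakeup.delay.key
mq.queueName=wakeup.process.queue
mq.exchangeName=wakeup.process.exchange
mq.routingKeyName=wakeup.process.key
```

Keeping the delay and processing names visibly different makes it harder to bind the consumer to the delay queue by mistake, which would consume messages before they expire.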