SpringBoot integrates RabbitMQ to implement delayed messaging

Like is the best encouragement to create!

The environment that

Erlang version: 21.3

RabbitMQ version: 3.7.14

SpringBoot version: 2.3.3.release

Rabbitmq_delayed_message_exchange plug-in

Download address: www.rabbitmq.com/community-p…

Rabbitmq-delayed – message-Exchange v3.8 is suitable for RabbitMQ3.7.x. The plugin must be compatible with the RabbitMQ version, otherwise it will encounter incompatibility issues when using delayed messages.

Put the plugins in the RabbitMQ installation directory and run the following command to start the plugin:

rabbitmq-plugins enable rabbitmq_delayed_message_exchange
Copy the code
The following plugins have been configured:
	rabbitmq_delayed_message_exchange
Copy the code

After starting the plugin, restart RabbitMQ for the plugin to take effect

Restart the RabbitMQ service using two commands:

rabbitmqctl Stop: stop the RabbitMQ
rabbitmq-server Restart: restart the RabbitMQ
    
Note: There is no restart command for rabbitmqctl, so you need to run the above commands to restart RabbitMQ
Copy the code

Integrated the RabbitMQ

Add a RabbitMQ dependency to the pom.xml file

<! -- Integrate message queues -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
Copy the code

Configure RabbitMQ connection information

spring:
  rabbitmq:
    host: 127.0. 01.
    port: 5672
    username: guest
    password: guest
    virtual-host: /
Copy the code

Define ConnectionFactory and RabbitTemplate

package com.ozx.rabbitmqconsumer.config;

import lombok.Data;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/ * * *@ClassName: RabbitMqConfig
 * @Description: RabbitMQ configuration *@Author Gxin
 * @Date 2021/6/24 16:06
 * @Version: 1.0 * * /
@Data
@Configuration
@ConfigurationProperties(prefix = "spring.rabbitmq")
public class RabbitMqConfig {
    private String host;
    private int port;
    private String userName;
    private String password;

    @Bean
    public ConnectionFactory connectionFactory(a) {
        CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory(host,port);
        cachingConnectionFactory.setUsername(userName);
        cachingConnectionFactory.setPassword(password);
        cachingConnectionFactory.setVirtualHost("/");
        cachingConnectionFactory.setPublisherConfirms(true);
        return cachingConnectionFactory;
    }

    @Bean
    public RabbitTemplate rabbitTemplate(a) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory());
        returnrabbitTemplate; }}Copy the code

Configure Queue, switch, and routing keys

package com.ozx.rabbitmqconsumer.config;

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

/ * * *@ClassName: QueueConfig
 * @Description: Define queue, routing key, switch, routing key bound to switch, switch dispatch message corresponding queue *@Author Gxin
 * @Date2021/6/23 now *@Version: 2.0 * * /
@Configuration
public class QueueConfig {
    /** * Dispatch the common message switch */
    @Bean
    public TopicExchange topicExchange(a){
       return  new TopicExchange("ordinary_exchange".true.false);
    }

    @Bean
    public Queue queue(a) {
        Queue queue = new Queue("ordinary_queue".true);
        return queue;
    }

    @Bean
    public Binding binding(a) {
        return BindingBuilder.bind(queue()).to(topicExchange()).with("ordinary_queue");
    }

    /** * Dispatch delay message switch */
    @Bean
    public CustomExchange delayExchange(a){
        Map<String, Object> paramMap = new HashMap<String, Object>();
        paramMap.put("x-delayed-type"."direct");
        return new CustomExchange("delay_exchange"."x-delayed-message".true.false,paramMap);
    }

    @Bean
    public Queue delayQueue(a){
        Queue delayQueue = new Queue("delay_queue".true);
        return delayQueue;
    }

    @Bean
    public Binding delayMessagebinding(a){
        return BindingBuilder.bind(delayQueue()).to(delayExchange()).with("delay_queue").noargs(); }}Copy the code

Pay attention to

Deferred messages use CustomExchange instead of DirectExchange, TopicExchange, and the CustomExchange must be of type X-delayed – Message

Implementing delayed messages

package com.ozx.rabbitmqconsumer.service.impl;

import com.ozx.rabbitmqconsumer.service.MessageService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.text.SimpleDateFormat;
import java.util.Date;

/ * * *@ClassName: MessageServiceImpl
 * @Description: Producer production message *@Author Gxin
 * @Date 2021/6/23 17:01
 * @Version: 2.0 * * /
@Service
@Slf4j
public class MessageServiceImpl implements MessageService {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Override
    public void sendMsg(String queueName,String msg) {
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        System.out.println("Message sent time :"+sdf.format(new Date()));
        rabbitTemplate.convertAndSend("ordinary_exchange", queueName, msg);
    }

    /** * Implement delayed messages *@param queueName
     * @param message
     */
    @Override
    public void sendDelayMessage(String queueName, String message) {
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        log.debug("Message sending time: {}",sdf.format(new Date()));
        rabbitTemplate.convertAndSend("delay_exchange", queueName, message, new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                message.getMessageProperties().setHeader("x-delay".3000);
                returnmessage; }}); }}Copy the code

Pay attention to

To send a message, add a Header request Header. X-delay sets the delay time to 3s

The informant consumes the message

package com.ozx.rabbitmqconsumer.consumer;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.text.SimpleDateFormat;
import java.util.Date;

/ * * *@ClassName: MessageReceiver
 * @Description: Consumer receives and processes messages *@Author Gxin
 * @Date2021/6/23 now *@Version: 2.0 * * /
@Component
@Slf4j
public class MessageReceiver {
    @RabbitListener(queues = "ordinary_queue")
    public void receive(String msg) {
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        log.debug("Message received time :{}, received message :{}",sdf.format(new Date()),msg);
    }

    @RabbitListener(queues = "delay_queue")
    public void receiveDelayMessage(String message) {
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        log.debug("Message received time :{}, received message :{}",sdf.format(newDate()),message); }}Copy the code

The Controller layer

package com.ozx.rabbitmqconsumer.controller;

import com.ozx.rabbitmqconsumer.common.ApiRest;
import com.ozx.rabbitmqconsumer.common.BaseController;
import com.ozx.rabbitmqconsumer.service.MessageService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

/ * * *@ClassName: MessageController
 * @Description: Integrated with RabbitMQ for delayed messaging *@Author Gxin
 * @Date2021/6/23 * roar,@Version: 2.0 * * /
@RestController
public class MessageController extends BaseController {

    @Autowired
    private MessageService messageService;

    @GetMapping("send")
    public ApiRest sendMessage(String queueName, String msg){
        messageService.sendMsg(queueName,msg);
        return this.success();
    }

    @GetMapping("delaySend")
    public ApiRest sendDelayMessage(String queueName,String message){
        messageService.sendDelayMessage(queueName, message);
        return this.success(); }}Copy the code

Using Postman to debug the sending message interface, the results are as follows

The console output logs are as follows:

The results of implementing delayed messages are as follows:

After a delay of 3S, the message is received and processed by the sender