Overview


I once interviewed at NetEase, and the interviewer asked me this question:

After a user places an order, how do you cancel the order if the user does not pay?

My answer was to scan the DB table with a scheduled task. The interviewer was not satisfied and replied:

A scheduled task cannot give near-real-time notification. Is there any other way?

My answer was:

You can use a queue. After the order is placed, send a message to the queue and specify an expiration time. When the time is up, execute the callback interface.

After hearing my answer, the interviewer stopped asking. I was in fact on the right track, but my wording was not very professional. The technical term for this is delayed messages.

Scheduled tasks do have problems. The business system wants an order that has not been paid within 10 minutes to be canceled immediately and its inventory released. However, once the amount of data is large, fetching the unpaid orders takes longer, so some orders are canceled not at 10 minutes but after 15 minutes, 20 minutes and so on. In that case the inventory cannot be released in time, which also affects the number of orders that can be placed. With delayed messages, it is theoretically possible to cancel each order at the configured time.
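To make the idea of a delayed message concrete, here is a minimal in-process sketch using the JDK's DelayQueue. It is only an analogy for the semantics (a message becomes visible to the consumer once its delay has elapsed), not RabbitMQ itself. The class name DelayedCancelDemo, the CancelOrder message and the order id are hypothetical, and 300 ms stands in for the 10-minute timeout.

```java
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

public class DelayedCancelDemo {

    /** Hypothetical message: "cancel this order if it is still unpaid". */
    static final class CancelOrder implements Delayed {
        final String orderId;
        final long dueAtNanos; // absolute deadline on the System.nanoTime() clock

        CancelOrder(String orderId, long delayMillis) {
            this.orderId = orderId;
            this.dueAtNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(delayMillis);
        }

        @Override
        public long getDelay(TimeUnit unit) {
            // Remaining time until the message is due; zero or negative means "deliver now".
            return unit.convert(dueAtNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
        }

        @Override
        public int compareTo(Delayed other) {
            return Long.compare(getDelay(TimeUnit.NANOSECONDS), other.getDelay(TimeUnit.NANOSECONDS));
        }
    }

    /** Enqueues one message, blocks until it is due, and returns how long the wait was in ms. */
    static long sendAndAwait(String orderId, long delayMillis) {
        DelayQueue<CancelOrder> queue = new DelayQueue<>();
        long start = System.nanoTime();
        queue.put(new CancelOrder(orderId, delayMillis));
        try {
            CancelOrder due = queue.take(); // returns only once the delay has elapsed
            long waitedMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
            System.out.println("cancel " + due.orderId + " after " + waitedMillis + " ms");
            return waitedMillis;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
    }

    public static void main(String[] args) {
        sendAndAwait("order-1", 300);
    }
}
```

Unlike the table scan, the consumer here does no polling over all orders: each message carries its own deadline. An in-memory queue is lost on restart, which is one reason to hand the delay to a broker instead.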

Most articles on implementing delayed messages with RabbitMQ use RabbitMQ's dead-letter queue. That implementation looks quite complicated, and the native RabbitMQ Client API is even more verbose.

Spring Boot wraps the RabbitMQ Client API and makes it much simpler to use. Here is how to use the rabbitmq_delayed_message_exchange plug-in and Spring Boot to implement delayed messages.
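For context, the dead-letter approach usually relies on a "waiting" queue that nobody consumes: messages sit there until their TTL expires and are then dead-lettered to a second exchange whose queue has the real consumer. The sketch below only builds the queue arguments for such a waiting queue. The argument keys are RabbitMQ's standard queue arguments; the class name and the names order.dlx and order.cancel are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

public class DeadLetterDelayArgs {

    /**
     * Arguments for a waiting queue: a message stays in the queue until the TTL
     * expires, then RabbitMQ republishes it to the dead-letter exchange.
     */
    static Map<String, Object> waitingQueueArgs(long ttlMillis,
                                                String deadLetterExchange,
                                                String deadLetterRoutingKey) {
        Map<String, Object> args = new HashMap<>();
        args.put("x-message-ttl", ttlMillis);                        // how long a message waits
        args.put("x-dead-letter-exchange", deadLetterExchange);      // where expired messages go
        args.put("x-dead-letter-routing-key", deadLetterRoutingKey); // routing key used on expiry
        return args;
    }

    public static void main(String[] a) {
        // 10 minutes, matching the order timeout discussed above (names are made up)
        System.out.println(waitingQueueArgs(600_000L, "order.dlx", "order.cancel"));
    }
}
```

Such a map would be passed as the arguments when declaring the queue. On top of it you still need the dead-letter exchange, the real queue and a binding, which is where the extra complexity comes from.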


Software to prepare


Erlang

Please refer to installing Erlang in Windows 10

The version used in this article is:

Erlang 20.3


RabbitMQ

For details, see Installing RabbitMQ in Windows 10

This article uses the Windows version of RabbitMQ:

3.7.4


rabbitmq_delayed_message_exchange plug-in

Plug-in download address:

www.rabbitmq.com/community-p…

After opening the URL, press CTRL + F and search for rabbitmq_delayed_message_exchange.

Be sure to select the right version number. I am using RabbitMQ 3.7.4, so the corresponding rabbitmq_delayed_message_exchange plug-in must be 3.7.x.

If you don't choose the right version, you can run into all sorts of weird problems when using delayed messages, and there are no solutions to be found online. I wrestled with this problem for a whole night. Remember to choose the right version of the plug-in.

Once you have downloaded the plug-in, place it in the plugins directory of the RabbitMQ installation directory and enable it with the following command:

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

If the plug-in is enabled successfully, the following information is displayed:

The following plugins have been enabled: rabbitmq_delayed_message_exchange

After enabling the plug-in, remember to restart RabbitMQ for it to take effect.


Integrating RabbitMQ


This is as simple as adding the following dependency to the Maven project's pom.xml file:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

The Spring Boot version I used is 2.0.1.RELEASE.

Next, add the RabbitMQ configuration to the application.properties file:

spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest

Define ConnectionFactory and RabbitTemplate


This is also very simple. The code is as follows:

package com.mq.rabbitmq;

import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
@ConfigurationProperties(prefix = "spring.rabbitmq")
public class RabbitMqConfig {

    private String host;
    private int port;
    private String userName;
    private String password;

    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory(host, port);
        cachingConnectionFactory.setUsername(userName);
        cachingConnectionFactory.setPassword(password);
        cachingConnectionFactory.setVirtualHost("/");
        cachingConnectionFactory.setPublisherConfirms(true);
        return cachingConnectionFactory;
    }

    @Bean
    public RabbitTemplate rabbitTemplate() {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory());
        return rabbitTemplate;
    }

    public String getHost() {
        return host;
    }

    public void setHost(String host) {
        this.host = host;
    }

    public int getPort() {
        return port;
    }

    public void setPort(int port) {
        this.port = port;
    }

    public String getUserName() {
        return userName;
    }

    public void setUserName(String userName) {
        this.userName = userName;
    }

    public String getPassword() {
        return password;
    }

    public void setPassword(String password) {
        this.password = password;
    }
}

Exchange and Queue configurations


package com.mq.rabbitmq;

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class QueueConfig {

    @Bean
    public CustomExchange delayExchange() {
        Map<String, Object> args = new HashMap<>();
        args.put("x-delayed-type", "direct");
        return new CustomExchange("test_exchange", "x-delayed-message", true, false, args);
    }

    @Bean
    public Queue queue() {
        Queue queue = new Queue("test_queue_1", true);
        return queue;
    }

    @Bean
    public Binding binding() {
        return BindingBuilder.bind(queue()).to(delayExchange()).with("test_queue_1").noargs();
    }
}

It is important to note that this uses CustomExchange, not DirectExchange, and that the type of the CustomExchange must be x-delayed-message.


Implement message sending


package com.mq.rabbitmq;

import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.text.SimpleDateFormat;
import java.util.Date;

@Service
public class MessageServiceImpl {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void sendMsg(String queueName, String msg) {
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        System.out.println("Message sent time: " + sdf.format(new Date()));
        rabbitTemplate.convertAndSend("test_exchange", queueName, msg, new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                // Delay in milliseconds, read by the delayed-message exchange
                message.getMessageProperties().setHeader("x-delay", 3000);
                return message;
            }
        });
    }
}

Note that you must set the x-delay header on the message. Its value is the delay in milliseconds; in this case, I set the delay to 3 seconds.
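Since the header value is a plain number of milliseconds, it can help to derive it from a Duration and check the range before sending. This is a minimal sketch with hypothetical names (DelayHeader, toDelayMillis). The upper bound of (2^32) - 1 milliseconds is what the plug-in's README states as far as I know, so treat it as an assumption and check it against your plug-in version. Rejecting non-positive delays is my own choice here, not something the plug-in requires.

```java
import java.time.Duration;

public class DelayHeader {

    // Assumed upper bound for x-delay: (2^32) - 1 milliseconds (roughly 49 days).
    static final long MAX_DELAY_MILLIS = (1L << 32) - 1;

    /** Converts a timeout into a millisecond value for the x-delay header. */
    static long toDelayMillis(Duration timeout) {
        long millis = timeout.toMillis();
        if (millis <= 0 || millis > MAX_DELAY_MILLIS) {
            throw new IllegalArgumentException("delay out of range: " + millis + " ms");
        }
        return millis;
    }

    public static void main(String[] args) {
        System.out.println(toDelayMillis(Duration.ofSeconds(3)));  // 3000, the value used in this article
        System.out.println(toDelayMillis(Duration.ofMinutes(10))); // 600000, the 10-minute order timeout
    }
}
```

The returned value would then be passed as the value of the x-delay header in place of the hard-coded 3000.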


Message consumer


package com.mq.rabbitmq;

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.text.SimpleDateFormat;
import java.util.Date;

@Component
public class MessageReceiver {

    @RabbitListener(queues = "test_queue_1")
    public void receive(String msg) {
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        System.out.println("Message received time: " + sdf.format(new Date()));
        System.out.println("Received message: " + msg);
    }
}
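For the order-cancellation case from the beginning of this article, the consumer should probably cancel only orders that are still unpaid, because the user may have paid while the message was waiting, and the same message might be delivered more than once. The following is a rough sketch under that assumption. OrderCancelHandler, its Status enum and the in-memory map are all hypothetical stand-ins for a real order table.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class OrderCancelHandler {

    enum Status { UNPAID, PAID, CANCELLED }

    // In-memory stand-in for the order table; a real system would use the database.
    private final Map<String, Status> orders = new ConcurrentHashMap<>();

    void place(String orderId) {
        orders.put(orderId, Status.UNPAID);
    }

    void pay(String orderId) {
        // Only an unpaid order can become paid
        orders.replace(orderId, Status.UNPAID, Status.PAID);
    }

    /** Called when the delayed message arrives; returns true only if the order was cancelled now. */
    boolean onCancelMessage(String orderId) {
        // Atomic compare-and-set: a paid, already cancelled or unknown order is left untouched
        return orders.replace(orderId, Status.UNPAID, Status.CANCELLED);
    }

    public static void main(String[] args) {
        OrderCancelHandler handler = new OrderCancelHandler();
        handler.place("A");
        handler.place("B");
        handler.pay("A");
        System.out.println(handler.onCancelMessage("A")); // false: already paid
        System.out.println(handler.onCancelMessage("B")); // true: cancelled
        System.out.println(handler.onCancelMessage("B")); // false: duplicate message
    }
}
```

In a listener like the one above, the message body would carry the order id and the method body would call something like onCancelMessage, releasing the inventory only when it returns true.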

Run the Spring Boot program and send messages


Start the Spring Boot application directly from its main method. Spring Boot will automatically detect and register the MessageReceiver class.

Now you just need to call the message-sending method from a JUnit test.

package com.mq.rabbitmq;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

@RunWith(SpringRunner.class)
@SpringBootTest
public class RabbitmqApplicationTests {

    @Autowired
    private MessageServiceImpl messageService;

    @Test
    public void send() {
        messageService.sendMsg("test_queue_1", "hello i am delay msg");
    }
}

After running, you can see the following information:

Message sent time: 2018-05-03 12:44:53

After 3 seconds, the Spring Boot console will print:

Received message: hello i am delay msg