1. What is a message queue?

We can think of a message queue as a container that stores messages until a consumer takes them out for processing.
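The "container" idea can be sketched with a plain in-memory queue from the JDK. This is only an illustration, not how RabbitMQ is implemented; the class name `InMemoryQueueSketch` and the method `storeThenConsume` are hypothetical names chosen for this example.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical illustration: an in-memory queue standing in for a message queue.
class InMemoryQueueSketch {

    // Stores every message in the queue, then takes them out again in FIFO order.
    static String storeThenConsume(String... messages) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        for (String message : messages) {
            queue.offer(message);                    // producer side: put the message in the container
        }
        StringBuilder consumed = new StringBuilder();
        String next;
        while ((next = queue.poll()) != null) {      // consumer side: take messages out when needed
            consumed.append(next).append(';');
        }
        return consumed.toString();
    }

    public static void main(String[] args) {
        // prints "order-1;order-2;order-3;"
        System.out.println(storeThenConsume("order-1", "order-2", "order-3"));
    }
}
```

Messages come out in the order they went in, which is the property the rest of the article relies on.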

                          

2. Why message queues emerged

In today's Internet environment, user traffic is growing rapidly. Traditional applications still handle calls between system interfaces and service-processing modules in a tightly coupled, synchronous way, so blocked threads lengthen the overall response time of an interface. The interface appears to hang, which leads to a poor user experience.

A message queue can decouple multiple modules, enable asynchronous communication, and reduce the overall response time of the system. A message queue can also perform peak shaving and rate limiting on interfaces that receive highly concurrent requests.
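A minimal sketch of the decoupling idea, assuming an in-memory JDK queue in place of a real broker: the interface layer only enqueues the request and returns at once, and the service layer processes pending requests later. The names `AsyncHandoffSketch`, `handleRequest` and `processPending` are hypothetical; in a real system `processPending` would typically run on a separate worker thread or in another service.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical illustration of asynchronous handoff through a queue.
class AsyncHandoffSketch {
    private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();
    private final List<String> processed = new ArrayList<>();

    // Interface layer: enqueue the request and return immediately,
    // without waiting for the slow business logic.
    String handleRequest(String orderId) {
        queue.offer(orderId);
        return "accepted:" + orderId;
    }

    // Service layer: drains whatever is pending and processes it.
    // Returns the number of requests handled in this call.
    int processPending() {
        List<String> batch = new ArrayList<>();
        queue.drainTo(batch);
        processed.addAll(batch);
        return batch.size();
    }

    List<String> processed() {
        return processed;
    }

    public static void main(String[] args) {
        AsyncHandoffSketch sketch = new AsyncHandoffSketch();
        System.out.println(sketch.handleRequest("order-1")); // returns before any processing happens
        System.out.println("processed later: " + sketch.processPending());
    }
}
```

The caller's response time depends only on the enqueue step, not on how long the downstream processing takes.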

Here’s an example:

Take a flash sale (seckill) on an e-commerce platform. In the traditional flow, the moment the flash sale begins, a huge number of purchase requests are generated and reach the backend system interface almost simultaneously. The backend comes under too much pressure, which causes a large number of failures or even a crash. In other words, the pressure of user requests falls directly on the business system.

After a message queue is introduced, the highly concurrent purchase requests from the front end no longer rush at the backend interface like headless flies. Instead, much like crowd control at a subway station during the daily commute, the requests enter the MQ queue in order according to its rules, which achieves peak shaving and rate limiting for the flash-sale interface.
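The peak-shaving effect can be sketched with a bounded in-memory queue. This is a simplified model under stated assumptions, not RabbitMQ itself: the class `PeakShavingSketch`, its capacity values and the notion of a "tick" of backend work are all hypothetical. Requests beyond the queue capacity are rejected (rate limiting), and the backend drains the backlog at its own fixed pace (peak shaving).

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical model of a flash-sale burst hitting a bounded queue.
class PeakShavingSketch {
    private final BlockingQueue<Integer> queue;
    private final int backendCapacityPerTick;

    PeakShavingSketch(int queueCapacity, int backendCapacityPerTick) {
        this.queue = new ArrayBlockingQueue<>(queueCapacity);
        this.backendCapacityPerTick = backendCapacityPerTick;
    }

    // Front end: a burst of requests arrives at once.
    // Returns how many were accepted; the rest are rejected because the queue is full.
    int acceptBurst(int requestCount) {
        int accepted = 0;
        for (int i = 0; i < requestCount; i++) {
            if (queue.offer(i)) {
                accepted++;
            }
        }
        return accepted;
    }

    // Backend: handles at most backendCapacityPerTick requests per tick.
    int tick() {
        int handled = 0;
        while (handled < backendCapacityPerTick && queue.poll() != null) {
            handled++;
        }
        return handled;
    }

    int backlog() {
        return queue.size();
    }

    public static void main(String[] args) {
        PeakShavingSketch sale = new PeakShavingSketch(100, 10);
        System.out.println("accepted: " + sale.acceptBurst(1000)); // accepted: 100
        System.out.println("handled:  " + sale.tick());            // handled:  10
        System.out.println("backlog:  " + sale.backlog());         // backlog:  90
    }
}
```

However large the burst is, the backend never sees more than its per-tick capacity.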

3. Common message queues and their application scenarios

There are several common message queues, such as ActiveMQ, RabbitMQ, RocketMQ and Kafka. Their differences are shown in the following figure:

As you can see, RabbitMQ is full-featured and has an active community that can be relied on when bugs come up during development, which makes it a good fit for small and medium-sized software companies. Given RabbitMQ's popularity, the rest of this article introduces its core concepts and then integrates it with Spring Boot to send and receive messages.

4. RabbitMQ core concepts, by analogy with sending and picking up packages

The core concepts of RabbitMQ: producer, message model, consumer.

The message model is made up of exchanges, routing keys and queues.

To understand the concept of RabbitMQ, let’s combine a real world delivery and pickup scenario.

Xiao Qin lives in Wuhan and needs to send an express package to Xiao Wang in Shenzhen. The sending and receiving go as follows:

Xiao Qin gives the package to the courier, who then sends it from Wuhan to Shenzhen by some means of transport. Xiao Wang picks up the package from the delivery point in Shenzhen.

By analogy, the corresponding relation can be obtained as follows:

Xiao Qin: producer

Express package: message

Courier: exchange

Some form of transport: routing

Wuhan to Shenzhen: queue name

Xiao Wang: consumer

Therefore, the real-life scenario of sending and receiving a package can be viewed as the following message transmission process:

The producer sends the message to the exchange, and the exchange then routes the message to the "Wuhan to Shenzhen" queue. The consumer takes the message out of the queue and processes it. The overall flow is producer, message, message model (exchange, routing, queue), consumer.
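The flow above can be modelled in a few lines of plain Java. This is a toy model for intuition only, not the RabbitMQ client API: `DirectExchangeSketch` and its methods are hypothetical names, and it simplifies a direct exchange by letting each routing key point to exactly one queue.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;

// Hypothetical toy model: exchange + routing key + queue.
class DirectExchangeSketch {
    // routing key -> queue name (simplification: one queue per routing key)
    private final Map<String, String> bindings = new HashMap<>();
    // queue name -> stored messages
    private final Map<String, Queue<String>> queues = new HashMap<>();

    void bind(String routingKey, String queueName) {
        queues.putIfAbsent(queueName, new ArrayDeque<>());
        bindings.put(routingKey, queueName);
    }

    // Producer side: the exchange looks up the queue bound to the routing key.
    // Returns false when no queue is bound, i.e. the message cannot be routed.
    boolean publish(String routingKey, String message) {
        String queueName = bindings.get(routingKey);
        if (queueName == null) {
            return false;
        }
        queues.get(queueName).add(message);
        return true;
    }

    // Consumer side: takes the next message from the queue, or null if none is waiting.
    String consume(String queueName) {
        Queue<String> queue = queues.get(queueName);
        return queue == null ? null : queue.poll();
    }

    public static void main(String[] args) {
        DirectExchangeSketch courier = new DirectExchangeSketch();
        courier.bind("by-air", "wuhan-to-shenzhen");
        courier.publish("by-air", "express package from Xiao Qin");
        System.out.println(courier.consume("wuhan-to-shenzhen"));
    }
}
```

In the analogy, `bind` is the courier company deciding which route serves which destination, `publish` is Xiao Qin handing over the package, and `consume` is Xiao Wang picking it up.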

5. Integrating RabbitMQ with Spring Boot to send and receive messages

5.1. Add the RabbitMQ starter dependency to the Spring Boot project's POM file

    <!-- Starter dependency for RabbitMQ, packaged into the jar together with Spring Boot -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
        <version>1.5.6.RELEASE</version>
    </dependency>

5.2. Configure host and port information for RabbitMQ in the configuration file

Add the host address, port number, username, password and virtual host, as shown below:

    spring.rabbitmq.virtual-host=/
    # Host, port, username and password
    spring.rabbitmq.host=127.0.0.1
    spring.rabbitmq.port=5672
    spring.rabbitmq.username=guest
    spring.rabbitmq.password=guest

Then configure the queue, exchange and routing key names:

    # Basic message model: string message
    mq.basic.info.queue.name=${mq.env}.middleware.mq.basic.info.queue.demo1
    mq.basic.info.exchange.name=${mq.env}.middleware.mq.basic.info.exchange.demo1
    mq.basic.info.routing.key.name=${mq.env}.middleware.mq.basic.info.routing.key.demo1

5.3. Define and inject the related Bean components

RabbitMQ is easy to misconfigure or misuse on a real project, which leads to all kinds of headaches. Interviewers often ask: how do you prevent message loss? How do you make sure a message is not consumed more than once?

I once ran into a message loss problem, and solving it cost me a handful of hair.

To solve it, I worked through RabbitMQ tutorials and videos. RabbitMQ has three guidelines for ensuring high availability and confirmed consumption of messages.

In other words, to ensure high availability and confirmed consumption of messages, we need to follow these three guidelines:

(1) The producer's send confirmation mechanism;

(2) Persistence set when creating queues, exchanges and messages;

(3) The consumer's Ack mechanism for confirming consumption.

(1) The producer's send confirmation mechanism

Located in RabbitmqConfig#rabbitTemplate()

    @Bean(name = "rabbitMQTemplate")
    public RabbitTemplate rabbitTemplate() {
        // Enable publisher confirms: the producer is told whether the broker received the message
        connectionFactory.setPublisherConfirms(true);
        // Enable publisher returns: messages that cannot be routed are returned to the producer
        connectionFactory.setPublisherReturns(true);
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMandatory(true);
        // Confirm callback: log the confirmation result (check the ack flag; false means the broker did not accept the message)
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                log.info("Message sent successfully: correlationData({}), ack({}), cause({})", correlationData, ack, cause);
            }
        });
        // Return callback: log the message that could not be delivered to a queue
        rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
            @Override
            public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
                log.info("Message lost: exchange({}), route({}), replyCode({}), replyText({}), message: {}", exchange, routingKey, replyCode, replyText, message);
            }
        });
        return rabbitTemplate;
    }
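The difference between the two callbacks is easy to mix up, so here is a toy model of when each one fires. It is a sketch of the behaviour as I understand it, not broker or Spring code: a confirm with `ack=false` when the target exchange does not exist, and a return followed by a positive confirm when the exchange exists but no queue matches the routing key (with `mandatory` set). The class `PublisherFeedbackSketch` and the event strings are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical toy model of publisher confirms and returns (mandatory = true).
class PublisherFeedbackSketch {
    private final Set<String> exchanges = new HashSet<>();
    private final Set<String> bindings = new HashSet<>(); // entries of the form "exchange|routingKey"

    void bind(String exchange, String routingKey) {
        exchanges.add(exchange);
        bindings.add(exchange + "|" + routingKey);
    }

    // Returns the feedback events the producer would observe for one publish, in order.
    List<String> publish(String exchange, String routingKey, String message) {
        List<String> events = new ArrayList<>();
        if (!exchanges.contains(exchange)) {
            // The message never reached an exchange: negative confirm.
            events.add("confirm ack=false");
            return events;
        }
        if (!bindings.contains(exchange + "|" + routingKey)) {
            // The exchange accepted the message but could not route it: it is returned.
            events.add("returned " + message);
        }
        // The exchange took responsibility for the message: positive confirm.
        events.add("confirm ack=true");
        return events;
    }

    public static void main(String[] args) {
        PublisherFeedbackSketch broker = new PublisherFeedbackSketch();
        broker.bind("demo.exchange", "demo.key");
        System.out.println(broker.publish("demo.exchange", "demo.key", "hello"));
        System.out.println(broker.publish("demo.exchange", "wrong.key", "hello"));
        System.out.println(broker.publish("missing.exchange", "demo.key", "hello"));
    }
}
```

Under this model a positive confirm alone does not prove the message reached a queue, which is why the confirm callback and the return callback are configured together.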

(2) Create queues, exchanges and messages, and set persistence

Located in RabbitmqConfig#basicQueue(), RabbitmqConfig#basicExchange() (which returns a DirectExchange) and BasicPublisher#sendMsg()

1) Create the queue and make it durable

    @Bean(name = "basicQueue")
    public Queue basicQueue() {
        // The second argument (true) marks the queue as durable
        return new Queue(env.getProperty("mq.basic.info.queue.name"), true);
    }

2) Create the exchange and make it durable

3) Create the message and set its persistence mode

    // Set the message delivery mode to persistent
    Message message = MessageBuilder.withBody(messageStr.getBytes("utf-8"))
            .setDeliveryMode(MessageDeliveryMode.PERSISTENT)
            .build();
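Why both the queue and the message need the persistence setting can be shown with a small model of a broker restart. This is a simplified sketch, not RabbitMQ code: it assumes that non-durable queues disappear on restart and that, inside a durable queue, only persistent messages survive. `DurabilitySketch` and its methods are hypothetical names.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of what survives a broker restart.
class DurabilitySketch {
    private static final class QueueState {
        final boolean durable;
        // One entry per stored message: true if the message was published as persistent.
        final List<Boolean> persistentFlags = new ArrayList<>();

        QueueState(boolean durable) {
            this.durable = durable;
        }
    }

    private final Map<String, QueueState> queues = new HashMap<>();

    void declareQueue(String name, boolean durable) {
        queues.put(name, new QueueState(durable));
    }

    void publish(String queueName, boolean persistent) {
        queues.get(queueName).persistentFlags.add(persistent);
    }

    // Simulated restart: drop non-durable queues, then drop non-persistent messages.
    void restart() {
        queues.values().removeIf(queue -> !queue.durable);
        for (QueueState queue : queues.values()) {
            queue.persistentFlags.removeIf(persistent -> !persistent);
        }
    }

    // Number of messages in the queue, or -1 if the queue no longer exists.
    int messageCount(String queueName) {
        QueueState queue = queues.get(queueName);
        return queue == null ? -1 : queue.persistentFlags.size();
    }

    public static void main(String[] args) {
        DurabilitySketch broker = new DurabilitySketch();
        broker.declareQueue("durable.queue", true);
        broker.publish("durable.queue", true);   // persistent message
        broker.publish("durable.queue", false);  // transient message
        broker.restart();
        System.out.println(broker.messageCount("durable.queue")); // prints 1
    }
}
```

A persistent message in a non-durable queue is lost together with its queue, and a transient message in a durable queue is lost on its own, so the two settings only protect a message when used together.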

(3) The consumer's Ack mechanism for confirming consumption

There are three acknowledgement modes for consumers: NONE, AUTO and MANUAL.

NONE: no acknowledgement is sent; that is, the consumer does not give the MQ server any feedback.

AUTO: consumption is confirmed automatically. After the consumer processes the message, an ACK is sent to the MQ server automatically, after which the message is removed from the queue. Under the hood, a built-in component sends the acknowledgement, so no acknowledgement code is needed in the listener.

MANUAL: consumption is confirmed manually. After the consumer processes the message, it must send an ACK to the MQ server explicitly in code.
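The practical difference between the three modes shows up when processing fails. The sketch below is a toy model, not Spring AMQP code: `AckModeSketch`, its `Mode` enum and the convention that the handler returns `true` when it acknowledged the message are all assumptions made for this example, and "requeue" is simplified to putting the message back at the head of the queue.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.Predicate;

// Hypothetical toy model of the three acknowledgement modes.
class AckModeSketch {
    enum Mode { NONE, AUTO, MANUAL }

    private final Deque<String> queue = new ArrayDeque<>();

    void publish(String message) {
        queue.addLast(message);
    }

    int size() {
        return queue.size();
    }

    // Delivers the head message to the handler.
    // The handler returns true if it explicitly acknowledged the message
    // (only meaningful in MANUAL mode) and may throw to signal a processing failure.
    void deliver(Mode mode, Predicate<String> handler) {
        String message = queue.pollFirst();
        if (message == null) {
            return;
        }
        boolean acknowledged;
        try {
            boolean manualAck = handler.test(message);
            // NONE and AUTO: a normal return counts as done; MANUAL: only an explicit ack counts.
            acknowledged = (mode != Mode.MANUAL) || manualAck;
        } catch (RuntimeException e) {
            // NONE: the message was already considered delivered, so it is lost on failure.
            // AUTO and MANUAL: the message was not acknowledged.
            acknowledged = (mode == Mode.NONE);
        }
        if (!acknowledged) {
            queue.addFirst(message); // simplified requeue
        }
    }

    public static void main(String[] args) {
        AckModeSketch broker = new AckModeSketch();
        broker.publish("order-1");
        broker.deliver(Mode.AUTO, m -> { throw new IllegalStateException("processing failed"); });
        System.out.println("left in queue after failed AUTO delivery: " + broker.size());
    }
}
```

In this model NONE is the only mode that can silently lose a message when the consumer fails, which is why AUTO or MANUAL is preferred when messages must not be lost.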

RabbitmqConfig#listenerContainer()

    /**
     * (3) Consumer Ack mechanism for confirming consumption.
     * Single consumer, acknowledge mode AUTO.
     *
     * @return the listener container factory
     */
    @Bean(name = "singleListenerContainer")
    public SimpleRabbitListenerContainerFactory listenerContainerAuto() {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setMessageConverter(new Jackson2JsonMessageConverter());
        factory.setConcurrentConsumers(1);
        factory.setMaxConcurrentConsumers(1);
        factory.setPrefetchCount(1);
        // Set the acknowledge mode to automatic confirmation: AUTO
        factory.setAcknowledgeMode(AcknowledgeMode.AUTO);
        return factory;
    }

5.4. Sending and receiving with RabbitMQ in practice

(1) Define the producer

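    import java.io.UnsupportedEncodingException;

    import com.google.common.base.Strings;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.core.MessageBuilder;
    import org.springframework.amqp.core.MessageDeliveryMode;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.core.env.Environment;
    import org.springframework.stereotype.Component;

    @Component
    @Slf4j
    public class BasicPublisher {
        @Autowired
        private RabbitTemplate rabbitTemplate;
        @Autowired
        Environment env;

        public void sendMsg(String messageStr) {
            if (!Strings.isNullOrEmpty(messageStr)) {
                try {
                    rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
                    rabbitTemplate.setExchange(env.getProperty("mq.basic.info.exchange.name"));
                    rabbitTemplate.setRoutingKey(env.getProperty("mq.basic.info.routing.key.name"));
                    // Set the message delivery mode to persistent
                    Message message = MessageBuilder.withBody(messageStr.getBytes("utf-8"))
                            .setDeliveryMode(MessageDeliveryMode.PERSISTENT)
                            .build();
                    rabbitTemplate.convertAndSend(message);
                    log.info("Basic message model - producer - sent message: {}", messageStr);
                } catch (UnsupportedEncodingException e) {
                    log.error("Basic message model - producer - exception while sending message: {}", messageStr, e.fillInStackTrace());
                }
            }
        }
    }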

(2) Define the consumer

    import java.io.UnsupportedEncodingException;

    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.messaging.handler.annotation.Payload;
    import org.springframework.stereotype.Component;

    @Component
    @Slf4j
    public class BasicConsumer {
        /**
         * Listen for and consume messages from the queue.
         */
        @RabbitListener(queues = "${mq.basic.info.queue.name}", containerFactory = "singleListenerContainer")
        public void consumerMsg(@Payload byte[] msg) {
            try {
                String messageStr = new String(msg, "utf-8");
                log.info("Basic message model - consumer - received and consumed message: {}", messageStr);
            } catch (UnsupportedEncodingException e) {
                log.error("Basic message model - consumer - exception occurred: ", e.fillInStackTrace());
            }
        }
    }

(3) Write a unit test that sends a string message to the message queue

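    import lombok.extern.slf4j.Slf4j;
    import org.junit.Test;
    import org.junit.runner.RunWith;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.test.context.SpringBootTest;
    import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

    @Slf4j
    @RunWith(SpringJUnit4ClassRunner.class)
    @SpringBootTest
    public class RabbitMQTest {
        @Autowired
        private BasicPublisher basicPublisher;

        @Test
        public void testBasicMessageModel() {
            String msgStr = "~~~ this is a string message ~~~~";
            basicPublisher.sendMsg(msgStr);
        }
    }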

(4) Run the unit test

The results of running the unit test are shown below:

As you can see from the figure, the producer sends the message "~~~ this is a string message ~~~~" to the message queue. The consumer listens for the message, processes it, and prints its content.

This completes sending and receiving RabbitMQ messages, with RabbitMQ configured for high availability and confirmed consumption. Remember the three guidelines:

  • The producer's send confirmation mechanism;

  • Persistence set when creating queues, exchanges and messages;

  • The consumer's Ack confirmation mechanism.

The complete project for this RabbitMQ send-and-receive exercise is available in the author's Gitee repository:

Gitee.com/qinstudy/sp…

That wraps up the RabbitMQ send-and-receive exercise. I will continue with other RabbitMQ topics, such as dead-letter queues.