1. Introduction

RabbitMQ is a message queue, you may have heard of it, for asynchronous and decoupled application services, as well as for peak load, load, and message distribution.

One of the main functions of message queues is the decoupling of application services. Messages are transferred from message producers to message queues, and consumers get messages from message queues and consume them. Producers do not need to care about who consumes messages, and consumers do not need to care about who produces messages. In distributed systems, message queues can also be used in other places, such as distributed transaction support, represented by alibaba’s open source RocketMQ.

Of course, this article will focus on RabbitMQ.

2. The RabbitMQ is introduced

RabbitMQ is a type of messaging middleware that implements AMQP (Advanced Message Queuing Protocol). It originated in the financial system and is used to store and forward messages in distributed systems. RabbitMQ has excellent usability, scalability, and high availability. RabbitMQ is primarily implemented for bidirectional decoupling between systems. When producers churn out data and consumers can’t consume it quickly, you need an intermediate layer. Save this data.

AMQP (Advanced Message Queuing Protocol) is an open standard of application-layer protocols designed for message-oriented middleware. Message-oriented middleware is primarily used for decoupling between components so that the sender of a message does not need to be aware of the message consumer and vice versa. The main characteristics of AMQP are message orientation, queue, routing (including point-to-point and publish/subscribe), reliability, and security.

RabbitMQ is an open source IMPLEMENTATION of AMQP. The server is written in Erlang and supports a variety of clients, such as Python, Ruby,.NET, Java, JMS, C, PHP, ActionScript, XMPP, STOMP, and AJAX. It is used to store and forward messages in distributed systems, and has good performance in ease of use, scalability, and high availability.

3. Concept introduction

In the design of a normal message queue, there are several concepts: producer, consumer, and our queue. In RabbitMQ, however, a layer is added, called the Exchange, so that messages are not sent to the queue by the producer. Instead, messages are sent directly to the switch, which decides which queue to send the message to according to the switch’s scheduling policy. As shown in figure:

  • The P on the left represents the producer of the message
  • The purple X represents the switch
  • The red one on the right represents the queue

4. Exchange

So why do we need Exchange instead of sending messages directly to queues?

The core idea of THE AMQP protocol is the decoupling of producers and consumers. Producers never send messages directly to queues. Producers usually do not know whether a message will be sent to the queue, but simply send the message to a switch. It is received by Exchange and then forwarded by Exchange to Queue for storage according to a specific policy.

When Exchange receives a message, how does it know which Queue to send it to? This is where you need to understand the concepts of Binding and RoutingKey:

A Binding represents the relationship between an Exchange and a Queue, or we can simply assume that the Queue is interested in the messages on the switch, and the Binding can come with an additional parameter, RoutingKey. Exchange uses this RoutingKey to match the Binding of all the current Exchange bindings. If a match is met, a message is sent to the Exchange bound Queue. It can be distributed to different queues. The meaning of a RoutingKey depends on the type of switch.

Let’s take a look at the three main types of Exchange: Fanout, Direct, and Topic.

4.1 Direct Exchange

Direct Exchange is the default Exchange for RabbitMQ and routes messages entirely based on a RoutingKey. A RoutingKey (usually Queue Name) must be specified when Binding an Exchange and Queue, and the same RoutingKey must be specified when sending messages. The messages will be routed to the corresponding Queue.

4.2 Topic Exchange

Topic exchanges, like Direct Exchanges, need a RoutingKey to route messages. The difference is that Direct Exchange matches the RoutingKey exactly. Topic Exchange supports fuzzy matching. The wildcard characters * and # are supported respectively. * indicates that one word is matched, and # indicates that no or more words are matched.

4.3 Headers Exchange

The Headers Exchange ignores the RoutingKey and decides which queues to route to based on the Headers in the message and the Arguments specified when creating the binding.

The Performance of Headers Exchange is poor, and Direct Exchange can completely replace it, so it is not recommended.

4.4 the Default Exchange

Default Exchange is a special Direct Exchange. When you manually create a queue, the background automatically binds the queue to a Direct Exchange with an empty name. The binding RoutingKey is the same as the queue name. With this default switch and binding, we only care about the queue layer, which is suitable for some simple applications.

5. Spring Boot integrates RabbitMQ

Spring Boot RabbitMQ integration is very simple, if only for simple use with very little configuration, Spring Boot provides spring-boot-starter-AMQP project support for various messages.

5.1 Simple Use

Introduction of depend on

Code listing: spring-boot-rabbitmq/ pop.xml ***

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>Copy the code

The configuration file application.yml is as follows:

Code listing: spring – the boot – the rabbitmq/SRC/main/resources/application. The yml * * *

server:
  port: 8080
spring:
  application:
    name: spring-boot-rabbitmq
  rabbitmq:
    host: localhost
    port: 5672
    username: admin
    password: adminCopy the code

The queue configuration

Code listing: spring – the boot – the rabbitmq/SRC/main/Java/com/springboot/springbootrabbitmq/config/QueueConfig Java * * *

@Configuration public class QueueConfig { @Bean public Queue simpleQueue() { return new Queue("simple"); } @Bean public Queue simpleOneToMany() { return new Queue("simpleOneToMany"); }}Copy the code

Message provider

Code listing: spring – the boot – the rabbitmq/SRC/main/Java/com/springboot/springbootrabbitmq/simple/SimpleSend Java * * *

@Component public class SimpleSend { private final Logger logger = LoggerFactory.getLogger(this.getClass()); @Autowired private AmqpTemplate amqpTemplate; public void send() { SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); String message = "Hello Spring Boot " + simpleDateFormat.format(new Date()); amqpTemplate.convertAndSend("simple", message); Logger. info(" Message push succeeded!" ); }}Copy the code

Message consumer

Code listing: spring – the boot – the rabbitmq/SRC/main/Java/com/springboot/springbootrabbitmq/simple/SimpleReceive Java * * *

@Component @RabbitListener(queues = "simple") public class SimpleReceive { private final Logger logger = LoggerFactory.getLogger(this.getClass()); @RabbitHandler public void process(String message) { logger.info("Receive :{}", message); }}Copy the code

test

Code listing: spring – the boot – the rabbitmq/SRC/test/Java/com/springboot/springbootrabbitmq/DemoApplicationTests Java * * *

@RunWith(SpringRunner.class) @SpringBootTest public class DemoApplicationTests { @Autowired SimpleSend simpleSend; @Test public void simpleSend() { simpleSend.send(); }}Copy the code

5.2 One-to-many Usage

What happens if I have one producer of messages, and I have N consumers of messages?

Make a slight change to the above code to add a message consumer.

The test code is as follows:

@Test public void simpleOneSend() { for (int i = 0; i < 100; i ++) { simpleManySend.send(i); }}Copy the code

The test results show that the two consumers consume the messages produced by the producer on average.

5.3 Many-to-many

Let’s add another producer of the message. The test code is as follows:

@Test public void simpleManySend() { for (int i = 0; i < 100; i ++) { simpleManySend.send(i); simpleManySend1.send(i); }}Copy the code

The test results show that two consumers consume messages produced by two producers on average.

5.4 Topic Exchange

Configure Topic first, with the following configuration code:

Code listing: spring – the boot – the rabbitmq/SRC/main/Java/com/springboot/springbootrabbitmq/config/TopicConfig Java * * *

@Configuration public class TopicConfig { private final String message = "topic.message"; private final String messages = "topic.messages"; @Bean public Queue queueMessage() { return new Queue(this.message); } @Bean public Queue queueMessages() { return new Queue(this.messages); } @Bean TopicExchange exchange() { return new TopicExchange("topicExchange"); } @Bean Binding bindingExchangeMessage(Queue queueMessage, TopicExchange exchange) { return BindingBuilder.bind(queueMessage).to(exchange).with("topic.message"); } @Bean Binding bindingExchangeMessages(Queue queueMessages, TopicExchange exchange) { return BindingBuilder.bind(queueMessages).to(exchange).with("topic.#"); }}Copy the code

QueueMessages can match both route_keys, whereas queueMessages can only match topic.message.

The producer code for the message is as follows:

Code listing: spring – the boot – the rabbitmq/SRC/main/Java/com/springboot/springbootrabbitmq/topic/TopicSend Java * * *

@Component public class TopicSend { private final Logger logger = LoggerFactory.getLogger(this.getClass()); @Autowired private AmqpTemplate rabbitTemplate; public void send1() { String message = "message 1"; logger.info("send:{}", message); rabbitTemplate.convertAndSend("topicExchange", "topic.message", message); } public void send2() { String message = "message 2"; logger.info("send:{}", message); rabbitTemplate.convertAndSend("topicExchange", "topic.messages", message); }}Copy the code

A call to Send1 () message is forwarded by Exchange to both queues, while a call to send2() is forwarded only to Receive2.

5.5 the Fanout Exchange

Fanout is the familiar broadcast or subscription mode in which a message is sent to a Fanout switch and received by all queues bound to the switch.

Fanout configuration is as follows:

Code listing: spring – the boot – the rabbitmq/SRC/main/Java/com/springboot/springbootrabbitmq/config/FanoutConfig Java * * *

@Configuration public class FanoutConfig { @Bean public Queue MessageA() { return new Queue("fanout.A"); } @Bean public Queue MessageB() { return new Queue("fanout.B"); } @Bean public Queue MessageC() { return new Queue("fanout.C"); } @Bean FanoutExchange fanoutExchange() { return new FanoutExchange("fanoutExchange"); } @Bean Binding bindingExchangeA(Queue MessageA, FanoutExchange fanoutExchange) { return BindingBuilder.bind(MessageA).to(fanoutExchange); } @Bean Binding bindingExchangeB(Queue MessageB, FanoutExchange fanoutExchange) { return BindingBuilder.bind(MessageB).to(fanoutExchange); } @Bean Binding bindingExchangeC(Queue MessageC, FanoutExchange fanoutExchange) { return BindingBuilder.bind(MessageC).to(fanoutExchange); }}Copy the code

The message producer code is as follows:

Code listing: spring – the boot – the rabbitmq/SRC/main/Java/com/springboot/springbootrabbitmq/fanout/FanoutSend Java * * *

@Component public class FanoutSend { private final Logger logger = LoggerFactory.getLogger(this.getClass()); @Autowired private AmqpTemplate rabbitTemplate; public void send() { String message = "Hello FanoutSend."; logger.info("send:{}", message); this.rabbitTemplate.convertAndSend("fanoutExchange","", message); }}Copy the code

The test code is as follows:

Code listing: spring – the boot – the rabbitmq/SRC/test/Java/com/springboot/springbootrabbitmq/DemoApplicationTests Java * * *

@Test
public void fanoutSend() {
    fanoutSend.send();
}Copy the code

The test result was that all queues bound to the FANout switch received messages.

6. sample code

Example code -Github

Example code -Gitee

Reference 7.

http://www.ityouknow.com/springboot/2016/11/30/spring-boot-rabbitMQ.html

https://blog.csdn.net/y4x5M0nivSrJaY3X92c/article/details/80416996