RabbitMq is a message queue that we often use during development. Today we are going to investigate the use of rabbitMq.

RabbitMq’s official website: rabbitmq.com/

The rabbitMq installation will be skipped here, as I have tried several times and failed, and I will post a detailed article when it is successfully installed. The current debugging is done using the company’s environment.

1. Some concepts

RabbitMQ is an open source message broker and queue server that enables data sharing between application services (cross-platform, cross-language). RabbitMQ is written in the Erlang language and implemented over the AMQP protocol.

Abstractly, all message queue product models are similar processes. A producer creates a message and then publishes it to a message queue for consumption by consumers.

RabbitMQ is similar, with producer and consumer roles. Its internal structure is shown in the figure below.

So let’s take a look at these concepts in RabbitMQ.

1. Message:

Messages, which are the information we need to pass and share, consist of a list of optional attributes, including routing keys, priority, persistence, and so on

2. Publisher

The message producer is also a client application that publishes messages to the switch.

3. Exchange:

Switch, a very important concept in RabbitMQ, in RabbitMQ, messages generated by producers are not sent directly to queues, but to the switch, which is bound to queues and routed to queues in the peer server.

4. Binding:

Binding for the association between switches and message queues. A binding is a routing rule that connects a switch to a message queue based on a routing-key. So you can think of a switch as a routing table with bindings.

5. Queue:

Message queues, used to hold messages until they are sent to consumers. It is the container and destination of the message. A message can be placed in one or more queues. Messages remain in the queue waiting for consumers to connect to the queue to consume them.

6. Connection:

A network connection, such as a TCP connection.

7. Channel

Channel: an independent two-way data channel in a multiplexing connection. A channel is a virtual connection within a real TCP connection. The AMQP command is sent through the channel, whether it is to publish a message, subscribe to a queue, or receive a message, all these actions are done through the channel. Because it was too expensive for the operating system to establish and destroy TCP, the concept of channels was introduced to reuse a TCP connection.

8. Consumer

Message consumer, representing a client application that retrieves a message from a message queue.

9. Virtual Host

A virtual host that identifies a batch of switches, message queues, and related objects. A virtual host is a separate server domain with the same authentication and encryption environment. Each Vhost is essentially a mini rabbitMQ server with its own queue, switch, binding and permission mechanism. Vhost is the basis of the AMQP concept and must be specified at connection time. The default vhost for RabbitMQ is /.

10. Broker

Identifies the message queue server entity.

2. Exchange types

Exchange distributes messages according to different distribution policies. Currently, the Exchange distributes messages of the following types: Direct, FANout, Topic, and headers. Headers matches the header of an AMQP message rather than the routing key. In addition, the Headers and Direct switches do not perform as well as the direct switches, so they are almost useless.

2.1 Direct Switch

If the routing key in the message matches the Bing key in Binding, the switch sends the message to the queue. Routing keys must match exactly and propagate individually.

2.2 the fanout

Every message sent to a FANout switch is sent to all bound queues. Fanout switches do not deal with routing keys, but simply bind queues to the switch, and every message sent to the switch is forwarded to all queues bound to the switch. Much like subnet broadcasting, each host in a subnet gets a copy of the message. Fanout type forwarding messages is the fastest.

2.3 the topic

Topic switches use pattern matching to match routing keys to a pattern. In this case, queues must be bound to a pattern. It splits the strings of the routing and binding keys into words that are used between them. Separated. It also recognizes two wildcards: # and *. # matches zero or more words, * matches one word

3. SpringBoot integrates RabbitMQ

SpringBoot is relatively easy to integrate with rabbitMQ because SpringBoot uses RabbitTemplate to encapsulate common operations.

Let’s look at the integration process. First import the dependencies.

<! -- No need to add parent config file -->
<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
Copy the code

Then configure the rabbitMQ connection information in the springBoot configuration file application.yml

server:
  port: 7890

spring:
  rabbitmq:
    host: 172.1533.52.
    port: 5672
    username: root
    password: 123456
Copy the code

Next we will demonstrate three types of switches.

3.1 direct

The first is the configuration class, where we need to declare switches, queues, and bindings.

@Configuration
public class DirectExchangeConfig {

    public static final String DIRECT_QUEUE = "directQueue";
    public static final String DIRECT_QUEUE2 = "directQueue2";
    public static final String DIRECT_EXCHANGE = "directExchange";
    public static final String DIRECT_ROUTING_KEY = "direct";

    @Bean
    public Queue directQueue(a) {
        return new Queue(DIRECT_QUEUE, true);
    }

    @Bean
    public Queue directQueue2(a) {
        return new Queue(DIRECT_QUEUE2, true);
    }

    @Bean
    public DirectExchange directExchange(a) {
        return new DirectExchange(DIRECT_EXCHANGE, true.false);
    }

    @Bean
    public Binding bindingDirectExchange(Queue directQueue, DirectExchange directExchange) {
        return BindingBuilder.bind(directQueue).to(directExchange).with(DIRECT_ROUTING_KEY);
    }

    @Bean
    public Binding bindingDirectExchange2(Queue directQueue2, DirectExchange directExchange) {
        returnBindingBuilder.bind(directQueue2).to(directExchange).with(DIRECT_ROUTING_KEY); }}Copy the code

Here we create a switch called directExchange, bind directQueue and directQueue2 with the routing key direct.

The producer of the message, which we simulate with a Controller, refers directly to the rabbitTemplate

@RestController
@Slf4j
@RequestMapping("/direct")
public class DirectController {

    private final RabbitTemplate rabbitTemplate;

    public DirectController(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
    }

    /** * Direct Switches deliver messages to the corresponding queue according to the routing key carried by messages@return* /
    @GetMapping("send")
    public Object sendMsg(a) {
        rabbitTemplate.convertAndSend(DirectExchangeConfig.DIRECT_EXCHANGE, DirectExchangeConfig.DIRECT_ROUTING_KEY, "Send a test message: direct");
        return "Direct message sent successfully!!";
    }
Copy the code

When I access the connection in the browser, I will produce a message to the directExchange switch with the route key: direct and the message content: Send a test message: direct

Next, let’s look at consumers of news.

package com.lsqingfeng.action.rabbitmq.direct;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/ * * *@className: DirectQueueListener
 * @description: Directly connected switch listener *@author: sh.Liu
 * @date: the 2021-08-23 16:03 * /
@Slf4j
@Component
public class DirectQueueListener {

    /** * DirectReceiver consumer receives a message 1: sends a test message: Direct * DirectReceiver consumer receives a message 2: Send a test message: Direct * DirectReceiver Consumer receives a message 1: send a test message: Direct * DirectReceiver Consumer receives a message 2: send a test message: Direct * * A switch can be bound to multiple queues. If multiple queues can be matched through the routing key, only one queue can be consumed *@param testMessage
     */
    @RabbitHandler
    @RabbitListener(queues = DirectExchangeConfig.DIRECT_QUEUE)
    public void process(String testMessage) {
        System.out.println("DirectReceiver Consumer receives message 1:" + testMessage);
    }

    @RabbitHandler
    @RabbitListener(queues = DirectExchangeConfig.DIRECT_QUEUE)
    public void process2(String testMessage) {
        System.out.println("DirectReceiver Consumer receives message 2:" + testMessage);
    }

    @RabbitHandler
    @RabbitListener(queues = DirectExchangeConfig.DIRECT_QUEUE2)
    public void process3(String testMessage) {
        System.out.println("DirectReceiver Consumer receives message 3:"+ testMessage); }}Copy the code

When we access the browser production message, watch the console result:

DirectReceiver The consumer receives a test message. DirectReceiver The consumer receives a test message. DirectReceiver The consumer receives a test message

In sending once:

DirectReceiver Consumer receives message 3: sends a test message: Direct DirectReceiver Consumer receives message 2: sends a test message: Direct

Since we have two queues bound to the switch and the routeKey is the same, we print two. Note that direct can only be consumed if the routeKey matches exactly, and messages in each queue can only be consumed once.

3.2 the fanout

The configuration class:

@Configuration
public class FanoutExchangeConfig {

    public static final String FANOUT_QUEUE = "fanoutQueue";
    public static final String FANOUT_QUEUE2 = "fanoutQueue2";
    public static final String FANOUT_QUEUE3 = "fanoutQueue3";
    public static final String FANOUT_EXCHANGE = "fanoutExchange";
    public static final String FANOUT_ROUTING_KEY = "fanout";

    @Bean
    public Queue fanoutQueue(a) {
        return new Queue(FANOUT_QUEUE, true);
    }

    @Bean
    public Queue fanoutQueue2(a) {
        return new Queue(FANOUT_QUEUE2, true);
    }

    @Bean
    public FanoutExchange fanoutExchange(a) {
        return new FanoutExchange(FANOUT_EXCHANGE, true.false);
    }

    @Bean
    public Binding bindingFanoutExchange(Queue fanoutQueue, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(fanoutQueue).to(fanoutExchange);
    }

    @Bean
    public Binding bindingFanoutExchange2(Queue fanoutQueue2, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);
    }
Copy the code

Note that there is no need to specify a routing-key in this mode because all the bound queues will receive messages.

The producer code is as follows:

@RestController
@Slf4j
@RequestMapping("/fanout")
public class FanoutController {

    private final RabbitTemplate rabbitTemplate;

    public FanoutController(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
    }

    /** * Messages are sent to all bound queues. *@return* /
    @GetMapping("send")
    public Object sendMsg(a) {
        rabbitTemplate.convertAndSend(FanoutExchangeConfig.FANOUT_EXCHANGE, null."Send a test message: fanout");
        return "Fanout message sent successfully!!"; }}Copy the code

Consumers of information:

@Slf4j
@Component
public class FanoutQueueListener {

    /** * Fanout switch: this switch does not have the concept of routing keys, even if you bind routing keys is ignored. When the switch receives a message, it forwards it directly to all queues bound to it * the same queue listens multiple times and consumes only once. * Multiple queues bound to the switch can receive messages *@param testMessage
     */
    @RabbitHandler
    @RabbitListener(queues = FanoutExchangeConfig.FANOUT_QUEUE)
    public void process(String testMessage) {
        System.out.println("FanoutReceiver Consumer receives message 1:" + testMessage);
    }

    @RabbitHandler
    @RabbitListener(queues = FanoutExchangeConfig.FANOUT_QUEUE)
    public void process2(String testMessage) {
        System.out.println("FanoutReceiver Consumer receives message 2:" + testMessage);
    }

    @RabbitHandler
    @RabbitListener(queues = FanoutExchangeConfig.FANOUT_QUEUE2)
    public void process3(String testMessage) {
        System.out.println("FanoutReceiver Consumer receives message 3:"+ testMessage); }}Copy the code

Print result:

FanoutReceiver The consumer receives a test message: FANout. The consumer receives a test message: FANout. The consumer receives a test message: FANout

Because methods 1 and 2 listen on the same queue, only one can consume successfully. Execute multiple times, alternating between the two methods.

3.3 the topic

The topic switch sends messages to the queue that meets the routing-key matching rules.

The configuration class:

/ * * *@className: TopicExchangeConfig
 * @description: * * (asterisk) is used to indicate a word (must appear) * # (hash sign) is used to indicate any number (zero or more) words *@author: sh.Liu
 * @date: the 2021-08-23 15:49 * /
@Configuration
public class TopicExchangeConfig {

    public static final String TOPIC_QUEUE = "topicQueue";
    public static final String TOPIC_QUEUE2 = "topicQueue2";
    public static final String TOPIC_QUEUE3 = "topicQueue3";
    public static final String TOPIC_EXCHANGE = "topicExchange";
    public static final String TOPIC_ROUTING_KEY = "topic*";

    @Bean
    public Queue topicQueue(a) {
        return new Queue(TOPIC_QUEUE, true);
    }

    @Bean
    public Queue topicQueue2(a) {
        return new Queue(TOPIC_QUEUE2, true);
    }

    @Bean
    public Queue topicQueue3(a) {
        return new Queue(TOPIC_QUEUE3, true);
    }

    @Bean
    public TopicExchange topicExchange(a) {
        return new TopicExchange(TOPIC_EXCHANGE, true.false);
    }

    @Bean
    public Binding bindingTopicExchange(Queue topicQueue, TopicExchange topicExchange) {
        return BindingBuilder.bind(topicQueue).to(topicExchange).with("topic.#");
    }

    @Bean
    public Binding bindingTopicExchange2(Queue topicQueue2, TopicExchange topicExchange) {
        return BindingBuilder.bind(topicQueue2).to(topicExchange).with("test.#");
    }

    @Bean
    public Binding bindingTopicExchange3(Queue topicQueue3, TopicExchange topicExchange) {
        return BindingBuilder.bind(topicQueue3).to(topicExchange).with("#"); }}Copy the code

Note our bind pipe relationship here. #, test.*, #

#: represents all, * represents one and only.

The sender of the message, we’ll take the routingKey argument so we can see the effect:

@RestController
@Slf4j
@RequestMapping("/topic")
public class TopicController {

    private final RabbitTemplate rabbitTemplate;

    public TopicController(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
    }

    @GetMapping("send")
    public Object sendMsg(String routingKey) {
        rabbitTemplate.convertAndSend(TopicExchangeConfig.TOPIC_EXCHANGE, routingKey, "Send a test message: topic");
        return "Topic message sent successfully!!";
    }
Copy the code

Consumers of information:

/ * * *@className: TopicQueueListener
 * @description: Topic switch listener *@author: sh.Liu
 * @date: the 2021-08-23 16:03 * /
@Slf4j
@Component
public class TopicQueueListener {

    /** * topic: topic switch *@param testMessage
     */
    @RabbitHandler
    @RabbitListener(queues = TopicExchangeConfig.TOPIC_QUEUE)
    public void process(String testMessage) {
        System.out.println("TopicReceiver Consumer receives message 1:" + testMessage);
    }

    @RabbitHandler
    @RabbitListener(queues = TopicExchangeConfig.TOPIC_QUEUE)
    public void process2(String testMessage) {
        System.out.println("TopicReceiver Consumer receives message 2:" + testMessage);
    }

    @RabbitHandler
    @RabbitListener(queues = TopicExchangeConfig.TOPIC_QUEUE2)
    public void process3(String testMessage) {
        System.out.println("TopicReceiver Consumer receives message 3:" + testMessage);
    }

    @RabbitHandler
    @RabbitListener(queues = TopicExchangeConfig.TOPIC_QUEUE3)
    public void process4(String testMessage) {
        System.out.println("TopicReceiver Consumer receives message 4:"+ testMessage); }}Copy the code

Request: http://localhost:7890/topic/send? routingKey=test.a

Results:

TopicReceiver Consumer receives a message 3: sends a test message: Topic TopicReceiver consumer receives a message 4: sends a test message: topic

Indicates: test.* and # match the route key successfully

Request: http://localhost:7890/topic/send? routingKey=topic.123

TopicReceiver Consumer receives message 1: sends a test message: Topic TopicReceiver consumer receives message 4: sends a test message: topic

Topic.# and # matched successfully

Request: http://localhost:7890/topic/send? routingKey=test

TopicReceiver The consumer receives message 4: sends a test message: Topic

Test.* must have a word after it

Request: http://localhost:7890/topic/send? routingKey=test.aaa

TopicReceiver Consumer receives a message 4: sends a test message: Topic TopicReceiver consumer receives a message 3: sends a test message: topic

Test.* and # matched successfully

Request: http://localhost:7890/topic/send? routingKey=test.aaa.b

TopicReceiver The consumer receives message 4: sends a test message: Topic

Only # matches because test.* matches only one word, and aaa. B represents two