RabbitMQ is a message queue used to asynchronize and decouple applications, as well as to buffer and distribute messages.

Messaging-oriented middleware is becoming more and more popular in Internet companies. I just read that Alibaba is donating RocketMQ to Apache, but of course RabbitMQ is the main topic today. The main function of message middleware is decoupling. The most standard usage of middleware is that producers produce messages and send them to queues, and consumers take messages from queues and process them. Producers do not care about who consumes them, and consumers do not care about who produces messages, so as to achieve the purpose of decoupling. In distributed systems, message queues are also used for many other purposes, such as support for distributed transactions, RPC calls, and so on.

ActiveMQ has been used in the past, but there are some small problems in the actual production and use. After looking up a lot of information on the network, we decided to try to use RabbitMQ to replace ActiveMQ. The characteristics of RabbitMQ such as high availability, high performance and flexibility attracted us, so we compiled this article by looking up some information.

Introduce the RabbitMQ

RabbitMQ is a type of messaging middleware that implements AMQP (Advanced Message Queuing Protocol). It originated in the financial system and is used to store and forward messages in distributed systems. RabbitMQ has excellent usability, scalability, and high availability. RabbitMQ is primarily implemented for bidirectional decoupling between systems. When producers churn out data and consumers can’t consume it quickly, you need an intermediate layer. Save this data.

AMQP (Advanced Message Queuing Protocol) is an open standard of application-layer protocols designed for message-oriented middleware. Message-oriented middleware is primarily used for decoupling between components so that the sender of a message does not need to be aware of the message consumer and vice versa. The main characteristics of AMQP are message orientation, queue, routing (including point-to-point and publish/subscribe), reliability, and security.

RabbitMQ is an open source IMPLEMENTATION of AMQP. The server is written in Erlang and supports a variety of clients, such as Python, Ruby,.NET, Java, JMS, C, PHP, ActionScript, XMPP, STOMP, and AJAX. It is used to store and forward messages in distributed systems, and has good performance in ease of use, scalability, and high availability.

Relevant concepts

When we talk about queued services, we usually have three concepts: sender, queue and receiver. RabbitMQ abstracts this basic concept further by adding an Exchange between the sender and queue. In this way, there is no direct contact between the sender and the queue. Instead, the sender sends the message to the exchange, which in turn sends the message to the queue according to the scheduling policy.

  • The P on the left represents the producer, the program that sends messages to RabbitMQ.
  • In the middle is RabbitMQ,This includes switches and queues.
  • The C on the right represents the consumer, the program that takes the message to RabbitMQ.

There are four important concepts: virtual host, switch, queue, and binding.

  • Virtual host: A virtual host holds a set of switches, queues, and bindings. Why multiple virtual hosts? Very simple, RabbitMQ,Users can only control permissions at the granularity of the virtual host.Therefore, if you want to deny group A access to group B’s switches/queues/bindings, you must create A virtual host for both group A and group B. Each RabbitMQ server has a default virtual host /.
  • Switch:Exchange is used to forward messages, but it does not store themIf there is no Queue bind to the Exchange, it simply dismisses messages sent by the Producer.

    Here’s an important concept:The routing key. When the message arrives at the switch, the interaction will forward it to the corresponding queue, which queue will be forwarded according to the route key.
  • Binding: That is, the switch needs to be bound to queues, which, as shown in the figure above, is many-to-many.

Switches (Exchange)

The switch receives messages and forwards them to the bound queue. The switch does not store messages. If the ACK mode is enabled, the switch returns an error if it cannot find the queue. There are four types of switches: Direct, Topic, Headers and Fanout

  • Direct: The behavior of the Direct type is “match first, then deliver “. When a routing_key is set at binding time, messages that match the routing_key will be sent to the binding queue by the exchange.
  • Topic: Forwarding messages according to rules (most flexible)
  • Headers: switch for which the header attribute type is set
  • Fanout: Forwards messages to all bound queues

Direct Exchange Direct Exchange is the default switch mode for RabbitMQ. It is also the simplest mode.

The first X-Q1 has a binding key named orange; X-q2 has two binding keys named black and green. When the routing key in the message corresponds to the binding key, then we know which queue the message is going to.

Ps: Why are there two binding keys for X to Q2: black and green? – This is mainly because there may be Q3 again, and Q3 accepts only black’s information, while Q2 accepts not only black’s information but also Green’s information.

Topic Exchange

Topic Exchanges forward messages primarily based on wildcards. On this switch, the binding of queues and switches defines a routing pattern, and wildcards must match the routing pattern and the routing key before the switch can forward messages.

In this switch mode:

  • The routing key must be a string of characters, terminated by a period (.)..), such as inspiration.us, or inspiration.eu. Stockholm, etc.
  • Routing mode must contain an asterisk (*), a word that matches the location specified by the routing key. For example, a routing pattern looks like this: agreements.. B.*, then only the following routing keys are matched: the first word is agreements, and the fourth word is b. A hashtag (#) is used to indicate the equivalent of one or more words. For example, if the matching pattern is entrops.eu.berlin.#, then routes starting with entrops.eu.berlin are acceptable.

The code is sent the same way: the first parameter represents the switch, the second parameter represents the routing key, and the third parameter is the message. As follows:

rabbitTemplate.convertAndSend("testTopicExchange"."key1.a.c.key2"." this is RabbitMQ!");Copy the code

Topic is similar to direct, except that it matches “patterns”. In the routing_key form of “dots “, you can use two wildcards:

  • *Means a word.
  • #Means zero or more words.

Headers Exchange

Headers also matches by rule, and headers is a type of custom matching rule, as opposed to direct and topic, which use a routing_key. When a queue is bound to an exchange, a set of key-value pair rules are set, and a message contains a set of key-value pairs (headers attributes). When one or all of these key-value pairs match, the message is posted to the corresponding queue.

Fanout Exchange

Fanout Exchange message broadcast mode, regardless of routing key or routing mode, sends messages to all queues bound to it, and is ignored if routing_key is configured.

The RabbitMQ springboot integration

Springboot integration with RabbitMQ is very simple and provides a variety of spring-boot-starter-AMQP project support for messages, if only for simple use with minimal configuration.

Simple to use

1, configure poM package, mainly add spring-boot-starter-AMQP support

<dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>Copy the code

2. Configuration files

Configure the rabbitMQ installation address, port, and account information

Spring. The application. The name = spirng - the boot - the rabbitmq spring. The rabbitmq. Host = 192.168.0.86 spring. The rabbitmq. Port = 5672 spring.rabbitmq.username=admin spring.rabbitmq.password=123456Copy the code

3. Queue configuration

@Configuration
public class RabbitConfig {

    @Bean
    public Queue Queue(a) {
        return new Queue("hello"); }}Copy the code

3. Sender

RabbitTemplate is the default implementation provided by SpringBoot

public class HelloSender {

    @Autowired
    private AmqpTemplate rabbitTemplate;

    public void send(a) {
        String context = "hello " + new Date();
        System.out.println("Sender : " + context);
        this.rabbitTemplate.convertAndSend("hello", context); }}Copy the code

4. Receiver

@Component
@RabbitListener(queues = "hello")
public class HelloReceiver {

    @RabbitHandler
    public void process(String hello) {
        System.out.println("Receiver : "+ hello); }}Copy the code

5, test,

@RunWith(SpringRunner.class)
@SpringBootTest
public class RabbitMqHelloTest {

    @Autowired
    private HelloSender helloSender;

    @Test
    public void hello(a) throws Exception { helloSender.send(); }}Copy the code

Note that the queue names of the sender and receiver must be the same, otherwise they cannot receive

Many-to-many

What happens to one sender, N receivers or N senders and N receivers?

Receiver1 and Receiver2 are registered on the Receiver side. The sender side adds the parameter count, and the Receiver side prints the received parameter. The following is the test code, sending 100 messages, to observe the execution effect of the two receivers

@Test
public void oneToMany(a) throws Exception {
    for (int i=0; i<100;i++){
        neoSender.send(i);
    }
}Copy the code

The results are as follows:

Receiver 1: spirng boot neo queue ****** 11
Receiver 2: spirng boot neo queue ****** 12
Receiver 2: spirng boot neo queue ****** 14
Receiver 1: spirng boot neo queue ****** 13
Receiver 2: spirng boot neo queue ****** 15
Receiver 1: spirng boot neo queue ****** 16
Receiver 1: spirng boot neo queue ****** 18
Receiver 2: spirng boot neo queue ****** 17
Receiver 2: spirng boot neo queue ****** 19
Receiver 1: spirng boot neo queue ****** 20Copy the code

According to the returned results, the following conclusions are drawn

One sender, N receivers, is tested to evenly send messages to N receivers

Send many-to-many

Made a copy of the sender, tagged it, and sent it alternately in a hundred loops

@Test
    public void manyToMany(a) throws Exception {
        for (int i=0; i<100;i++){
            neoSender.send(i);
            neoSender2.send(i);
        }
}Copy the code

The results are as follows:

Receiver 1: spirng boot neo queue ****** 20
Receiver 2: spirng boot neo queue ****** 20
Receiver 1: spirng boot neo queue ****** 21
Receiver 2: spirng boot neo queue ****** 21
Receiver 1: spirng boot neo queue ****** 22
Receiver 2: spirng boot neo queue ****** 22
Receiver 1: spirng boot neo queue ****** 23
Receiver 2: spirng boot neo queue ****** 23
Receiver 1: spirng boot neo queue ****** 24
Receiver 2: spirng boot neo queue ****** 24
Receiver 1: spirng boot neo queue ****** 25
Receiver 2: spirng boot neo queue ****** 25Copy the code

Conclusion: As with one-to-many, the receiver still receives the message evenly

Use advanced

Object support

Springboot and perfect support for sending and receiving objects require no special configuration.

/ / the sender
public void send(User user) {
    System.out.println("Sender object: " + user.toString());
    this.rabbitTemplate.convertAndSend("object", user); }.../ / the recipient
@RabbitHandler
public void process(User user) {
    System.out.println("Receiver object : " + user);
}Copy the code

The results are as follows:

Sender object: User{name='neo', pass='123456'}
Receiver object : User{name='neo', pass='123456'}Copy the code

Topic Exchange

Topic is one of the most flexible forms of RabbitMQ, freely binding different queues according to the routing_key

First configure the Topic rules, which are tested using two queues

@Configuration
public class TopicRabbitConfig {

    final static String message = "topic.message";
    final static String messages = "topic.messages";

    @Bean
    public Queue queueMessage(a) {
        return new Queue(TopicRabbitConfig.message);
    }

    @Bean
    public Queue queueMessages(a) {
        return new Queue(TopicRabbitConfig.messages);
    }

    @Bean
    TopicExchange exchange(a) {
        return new TopicExchange("exchange");
    }

    @Bean
    Binding bindingExchangeMessage(Queue queueMessage, TopicExchange exchange) {
        return BindingBuilder.bind(queueMessage).to(exchange).with("topic.message");
    }

    @Bean
    Binding bindingExchangeMessages(Queue queueMessages, TopicExchange exchange) {
        return BindingBuilder.bind(queueMessages).to(exchange).with("topic.#"); }}Copy the code

Use queueMessages to match both queues. QueueMessages only match “topic.message” queues

public void send1(a) {
    String context = "hi, i am message 1";
    System.out.println("Sender : " + context);
    this.rabbitTemplate.convertAndSend("exchange"."topic.message", context);
}

public void send2(a) {
    String context = "hi, i am messages 2";
    System.out.println("Sender : " + context);
    this.rabbitTemplate.convertAndSend("exchange"."topic.messages", context);
}Copy the code

Sending send1 matches both topic.# and topic.message, while sending send2 matches only topic

Fanout Exchange

Fanout is the familiar broadcast or subscription mode in which a message is sent to a Fanout switch and received by all queues bound to the switch.

Fanout configuration

@Configuration
public class FanoutRabbitConfig {

    @Bean
    public Queue AMessage(a) {
        return new Queue("fanout.A");
    }

    @Bean
    public Queue BMessage(a) {
        return new Queue("fanout.B");
    }

    @Bean
    public Queue CMessage(a) {
        return new Queue("fanout.C");
    }

    @Bean
    FanoutExchange fanoutExchange(a) {
        return new FanoutExchange("fanoutExchange");
    }

    @Bean
    Binding bindingExchangeA(Queue AMessage,FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(AMessage).to(fanoutExchange);
    }

    @Bean
    Binding bindingExchangeB(Queue BMessage, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(BMessage).to(fanoutExchange);
    }

    @Bean
    Binding bindingExchangeC(Queue CMessage, FanoutExchange fanoutExchange) {
        returnBindingBuilder.bind(CMessage).to(fanoutExchange); }}Copy the code

Queues A, B, and C are bound to the Fanout switch, and any characters written to the routing_key on the sending end are ignored:

public void send(a) {
        String context = "hi, fanout msg ";
        System.out.println("Sender : " + context);
        this.rabbitTemplate.convertAndSend("fanoutExchange"."", context);
}Copy the code

The results are as follows:

Sender : hi, fanout msg 
...
fanout Receiver B: hi, fanout msg 
fanout Receiver A  : hi, fanout msg 
fanout Receiver C: hi, fanout msgCopy the code

It turns out that the queues bound to the FANout switch all received messages

The sample code

reference

RabbitMQ Usage Reference

RabbitMQ: Spring integrates RabbitMQ with its concepts, message persistence, and ACK mechanisms


Like my article, please follow my official account