Integrating RabbitMQ with Spring Boot

Introduction to RabbitMQ

RabbitMQ is an open source implementation of the Advanced Message Queuing Protocol (AMQP), written in Erlang.

Core concepts

Message

A message is unnamed and consists of a header and a body. The body is opaque. The header is a set of optional attributes, including routing-key, priority (relative to other messages), delivery-mode (whether the message requires persistent storage), and so on.

Publisher

The producer of messages: a client application that publishes messages to an exchange.

Exchange

An exchange receives messages sent by producers and routes them to queues in the server.

There are four types of exchange: direct (the default), fanout, topic, and headers. Each type applies a different policy when forwarding messages.

Queue

A message queue holds messages until they are delivered to consumers. It is both the container and the destination of a message. A message can be placed in one or more queues, and it stays in the queue until a consumer connects to that queue and picks it up.

Binding

A binding associates a message queue with an exchange. It is a routing rule that connects an exchange to a message queue based on a routing key, so an exchange can be thought of as a routing table made up of bindings.

Bindings between exchanges and queues can be many-to-many.

Connection

A network connection, such as a TCP connection.

Channel

A channel is an independent, bidirectional data stream multiplexed over a connection. It is a virtual connection established inside a real TCP connection, and all AMQP commands are sent through it: publishing messages, subscribing to queues, and receiving messages all happen on a channel. Because establishing and tearing down TCP connections is expensive for the operating system, channels were introduced so that a single TCP connection can be reused.

Consumer

The consumer of messages: a client application that retrieves messages from a message queue.

Virtual Host

A virtual host (vhost) groups a set of exchanges, message queues, and related objects. It is a separate server domain whose objects share the same authentication and encryption environment. Each vhost is essentially a mini RabbitMQ server with its own queues, exchanges, bindings, and permission mechanism. The vhost is a fundamental AMQP concept and must be specified at connection time. The default vhost for RabbitMQ is /.
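Because the vhost is chosen at connection time, it usually appears in the connection settings. One common place is the path of an AMQP URI, where the default vhost / has to be percent-encoded as %2F. The following is a minimal sketch of extracting the vhost from such a URI using only the JDK. The class name VhostUriSketch, the method vhostOf, and the example URIs are hypothetical and are not part of RabbitMQ or Spring.

```java
import java.net.URI;

/** Hypothetical helper for illustration; not a RabbitMQ or Spring API. */
final class VhostUriSketch {

    /**
     * Extracts the vhost from an AMQP URI.
     * Returns null when the URI has no path, meaning no vhost was specified.
     */
    static String vhostOf(String amqpUri) {
        // getPath() returns the percent-decoded path, e.g. "//" for "/%2F"
        String path = URI.create(amqpUri).getPath();
        if (path == null || path.isEmpty()) {
            return null;
        }
        // Drop the leading "/" that separates the authority from the vhost
        return path.substring(1);
    }

    public static void main(String[] args) {
        System.out.println(vhostOf("amqp://guest:guest@localhost:5672/%2F"));    // prints "/"
        System.out.println(vhostOf("amqp://guest:guest@localhost:5672/orders")); // prints "orders"
        System.out.println(vhostOf("amqp://localhost"));                         // prints "null"
    }
}
```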

Broker

The message queue server itself.

Operation mechanism

The producer publishes a message to an exchange. The exchange type and the bindings then determine which queues the message is routed to.

Exchange type

direct

If the routing key of the message exactly matches the binding key of a binding, the message is sent to the corresponding queue.

fanout

The message is broadcast to all bound queues, regardless of the routing key.
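The direct and fanout rules can be sketched as lookups against an exchange's binding table. This is only an in-memory model for illustration, not how the broker is implemented. The class RoutingSketch, its nested Binding class, and the queue names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical in-memory model of an exchange's binding table; not a RabbitMQ API. */
final class RoutingSketch {

    /** One row of the routing table: a queue bound with a binding key. */
    static final class Binding {
        final String queue;
        final String bindingKey;

        Binding(String queue, String bindingKey) {
            this.queue = queue;
            this.bindingKey = bindingKey;
        }
    }

    /** Direct: deliver only to queues whose binding key equals the routing key. */
    static List<String> routeDirect(List<Binding> bindings, String routingKey) {
        List<String> targets = new ArrayList<>();
        for (Binding b : bindings) {
            if (b.bindingKey.equals(routingKey)) {
                targets.add(b.queue);
            }
        }
        return targets;
    }

    /** Fanout: the routing key is ignored and every bound queue receives the message. */
    static List<String> routeFanout(List<Binding> bindings, String routingKey) {
        List<String> targets = new ArrayList<>();
        for (Binding b : bindings) {
            targets.add(b.queue);
        }
        return targets;
    }

    public static void main(String[] args) {
        List<Binding> bindings = List.of(
                new Binding("fall", "fall"),
                new Binding("fall.msg", "fall.msg"),
                new Binding("learn.msg", "learn.msg"));

        System.out.println(routeDirect(bindings, "fall"));  // prints [fall]
        System.out.println(routeFanout(bindings, "fall"));  // prints [fall, fall.msg, learn.msg]
    }
}
```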

topic

The exchange matches the routing key of a message against a pattern, so queues must be bound with a pattern. Routing keys and binding keys are split into words separated by dots. Two wildcards are recognized: "#" and "*". # matches zero or more words, and * matches exactly one word.

For example, if two queues are bound with the patterns dog.# and *.msg, a message sent to the exchange with the routing key dog.msg is delivered to both queues.
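The wildcard rules can be made concrete with a small matcher. This is a minimal sketch of the matching rule as described above, written as a simple recursive comparison. It is not RabbitMQ's actual implementation, and the class name TopicMatcher is hypothetical.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical helper for illustration; not part of RabbitMQ or Spring AMQP. */
final class TopicMatcher {

    /** Returns true if the routing key matches the binding pattern. */
    static boolean matches(String pattern, String routingKey) {
        return matches(words(pattern), 0, words(routingKey), 0);
    }

    /** Splits a key into its dot-separated words. */
    private static List<String> words(String key) {
        if (key.isEmpty()) {
            return List.of();
        }
        return Arrays.asList(key.split("\\.", -1));
    }

    private static boolean matches(List<String> p, int i, List<String> k, int j) {
        if (i == p.size()) {
            return j == k.size();   // pattern exhausted: the key must be exhausted too
        }
        String word = p.get(i);
        if (word.equals("#")) {
            // "#" may absorb zero or more words: try every possible length
            for (int next = j; next <= k.size(); next++) {
                if (matches(p, i + 1, k, next)) {
                    return true;
                }
            }
            return false;
        }
        if (j == k.size()) {
            return false;           // key exhausted but the pattern still needs a word
        }
        if (word.equals("*") || word.equals(k.get(j))) {
            return matches(p, i + 1, k, j + 1);
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(matches("dog.#", "dog.msg"));         // prints true
        System.out.println(matches("*.msg", "dog.msg"));         // prints true
        System.out.println(matches("*.msg", "fall.other.msg"));  // prints false
    }
}
```

With the bindings used in the test code later in this article, a routing key of fall.xx matches fall.# but neither *.msg nor fall.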

headers

A headers exchange matches on the headers of an AMQP message instead of the routing key. Its matching is exact, like that of a direct exchange, but it performs much worse and is rarely used today.
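As a rough illustration of exact header matching, the sketch below checks that every key/value pair in a binding's arguments is present in the message headers. It models only the case where all pairs must match (RabbitMQ selects the mode with an x-match binding argument, which is left out here for brevity). The class name HeadersMatchSketch and the header names are hypothetical.

```java
import java.util.Map;
import java.util.Objects;

/** Hypothetical helper for illustration; not a RabbitMQ or Spring AMQP API. */
final class HeadersMatchSketch {

    /** Returns true if every binding argument appears in the headers with an equal value. */
    static boolean matchesAll(Map<String, ?> bindingArgs, Map<String, ?> headers) {
        for (Map.Entry<String, ?> arg : bindingArgs.entrySet()) {
            if (!headers.containsKey(arg.getKey())
                    || !Objects.equals(arg.getValue(), headers.get(arg.getKey()))) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        Map<String, Object> bindingArgs = Map.of("format", "pdf", "type", "report");

        // Extra headers on the message do not prevent a match
        System.out.println(matchesAll(bindingArgs,
                Map.of("format", "pdf", "type", "report", "lang", "en")));  // prints true
        // A missing header does
        System.out.println(matchesAll(bindingArgs, Map.of("format", "pdf")));  // prints false
    }
}
```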

Integrating RabbitMQ

Adding the dependency

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

RabbitMQ server connection

(application.yml) The default username and password for RabbitMQ are guest/guest. Note that by default the guest account can only connect from localhost.

spring:
  rabbitmq:
    host: 192.168.37.100
    port: 5672
    username: guest
    password: guest

Testing

Start with RabbitTemplate and RabbitAdmin

package org.fall;

import org.fall.entity.Person;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

@SpringBootTest
class SpringbootAmqpApplicationTests {

    // Component for sending and receiving MQ messages
    @Autowired
    RabbitTemplate rabbitTemplate;

    // The management component of MQ
    @Autowired
    RabbitAdmin rabbitAdmin;


    /** Use rabbitAdmin to initialize exchanges, queues, bindings, etc. */
    @Test
    void contextLoads() {
        // Create the exchanges
        // Pass in the name of the exchange to create
        rabbitAdmin.declareExchange(new DirectExchange("exchange.direct"));
        rabbitAdmin.declareExchange(new FanoutExchange("exchange.fanout"));
        rabbitAdmin.declareExchange(new TopicExchange("exchange.topic"));

        // Create the queues fall, fall.other, fall.msg and learn.msg
        // The Queue constructor takes the name of the queue to create

        rabbitAdmin.declareQueue(new Queue("fall"));
        rabbitAdmin.declareQueue(new Queue("fall.other"));
        rabbitAdmin.declareQueue(new Queue("fall.msg"));
        rabbitAdmin.declareQueue(new Queue("learn.msg"));

        // Bind queues and exchanges
        /*
         * Binding constructor:
         * public Binding(
         *     String destination,                      // name of the destination (queue or exchange)
         *     DestinationType destinationType,         // type of the destination (QUEUE/EXCHANGE)
         *     String exchange,                         // name of the exchange to bind to
         *     String routingKey,                       // the routing key
         *     @Nullable Map<String, Object> arguments) // may be null
         */
        rabbitAdmin.declareBinding(new Binding("fall", Binding.DestinationType.QUEUE, "exchange.direct", "fall", null));
        rabbitAdmin.declareBinding(new Binding("fall.other", Binding.DestinationType.QUEUE, "exchange.direct", "fall.other", null));
        rabbitAdmin.declareBinding(new Binding("fall.msg", Binding.DestinationType.QUEUE, "exchange.direct", "fall.msg", null));
        rabbitAdmin.declareBinding(new Binding("learn.msg", Binding.DestinationType.QUEUE, "exchange.direct", "learn.msg", null));

        rabbitAdmin.declareBinding(new Binding("fall", Binding.DestinationType.QUEUE, "exchange.fanout", "fall", null));
        rabbitAdmin.declareBinding(new Binding("fall.other", Binding.DestinationType.QUEUE, "exchange.fanout", "fall.other", null));
        rabbitAdmin.declareBinding(new Binding("fall.msg", Binding.DestinationType.QUEUE, "exchange.fanout", "fall.msg", null));
        rabbitAdmin.declareBinding(new Binding("learn.msg", Binding.DestinationType.QUEUE, "exchange.fanout", "learn.msg", null));

        rabbitAdmin.declareBinding(new Binding("fall", Binding.DestinationType.QUEUE, "exchange.topic", "fall", null));
        rabbitAdmin.declareBinding(new Binding("fall.other", Binding.DestinationType.QUEUE, "exchange.topic", "fall.#", null));
        rabbitAdmin.declareBinding(new Binding("fall.msg", Binding.DestinationType.QUEUE, "exchange.topic", "fall.#", null));
        rabbitAdmin.declareBinding(new Binding("fall.msg", Binding.DestinationType.QUEUE, "exchange.topic", "*.msg", null));
        rabbitAdmin.declareBinding(new Binding("learn.msg", Binding.DestinationType.QUEUE, "exchange.topic", "*.msg", null));

    }

    /** Test using RabbitTemplate to send simple data (such as strings). */
    @Test
    public void testRabbitTemplate() {
        // There are several ways to send messages to RabbitMQ:
        // 1. Build a Message yourself and pass it to send
        //rabbitTemplate.send(exchange, routingKey, message);

        // 2. Use convertAndSend: the object passed in is automatically serialized and sent to MQ
        rabbitTemplate.convertAndSend("exchange.direct", "fall", "hello fall~");
    }

    /**
     * Test sending object data.
     * Method used: public void convertAndSend(String exchange, String routingKey, Object object)
     *   exchange:   the name of the destination exchange to which the message is sent
     *   routingKey: the routing key of the message
     *   object:     the content of the message to be sent
     * With the default converter, Person must implement java.io.Serializable.
     */
    @Test
    public void testRabbit02() {
        rabbitTemplate.convertAndSend("exchange.topic", "fall.xx", new Person(1, "lis"));
        System.out.println("finish");
    }

    /** Get a message from the specified queue. */
    @Test
    public void testRabbit03() {
        Object o = rabbitTemplate.receiveAndConvert("fall.msg");    // Pass in the queue name
        if (o == null) {    // receiveAndConvert returns null when no message is available
            System.out.println("no message in queue");
            return;
        }
        System.out.println("class: " + o.getClass());
        System.out.println("toString: " + o);
    }
}

Use custom MessageConverter

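import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class MyRabbitMQConfig {

    // Register a custom MessageConverter in place of the default so that objects are converted to JSON when sent to the queue
    @Bean
    public MessageConverter messageConverter() {
        return new Jackson2JsonMessageConverter();
    }
}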

Once the custom MessageConverter is registered in the IoC container, it replaces the default converter, and objects are serialized to JSON when they are placed on the message queue.

Basic use of @RabbitListener

To use @RabbitListener, enable it with @EnableRabbit on the main application class:

@SpringBootApplication
@EnableRabbit       // Enable annotation-driven RabbitMQ listeners
public class SpringbootAmqpApplication {
    public static void main(String[] args) {
        SpringApplication.run(SpringbootAmqpApplication.class, args);
    }
}

The @RabbitListener annotation can be placed on a method in business code. The annotated method is invoked whenever a message arrives on the corresponding queue.

When the method fires, the message is also taken out of the queue rather than remaining in it.

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;

@Service
public class MyService {

    // A method annotated with @RabbitListener listens to a queue and is triggered when a message enters the queue
    @RabbitListener(queues = "fall.msg")
    public void listener() {
        System.out.println("fall.msg: new message coming in...");
    }

    // Spring can also inject the Message object into the listener method, so the method can inspect the message
    @RabbitListener(queues = "fall.other")
    public void listenerMessage(Message message) {
        MessageProperties messageProperties = message.getMessageProperties();
        System.out.println(messageProperties);
        // getBody() returns the message body as a byte array; new String(byte[] bytes) turns it into text
        // (JSON text when the JSON converter is configured)
        byte[] bytes = message.getBody();
        System.out.println(new String(bytes));
    }
}
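One detail worth noting about the listener above: new String(bytes) without a charset uses the platform default encoding. The sketch below decodes the body with an explicit charset instead, assuming the producer encoded the JSON as UTF-8. The class name BodyDecodeSketch and the JSON field names id and name are hypothetical, since the Person class is not shown in this article.

```java
import java.nio.charset.StandardCharsets;

/** Hypothetical helper for illustration; assumes the message body is UTF-8 encoded text. */
final class BodyDecodeSketch {

    /** Decodes a message body to text using an explicit charset. */
    static String decode(byte[] body) {
        return new String(body, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // Stand-in for message.getBody(); the field names are made up for this example
        byte[] body = "{\"id\":1,\"name\":\"lis\"}".getBytes(StandardCharsets.UTF_8);
        System.out.println(decode(body));  // prints {"id":1,"name":"lis"}
    }
}
```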