Introduction to MQ

  1. MQ: Message Queue. A producer creates a message and stores it in a queue; a consumer listens to the queue and consumes the message

  2. Advantages: Messages in MQ are produced and consumed asynchronously, so the producer and consumer are loosely coupled and do not intrude on each other's code

  3. Common MQ frameworks

    # ActiveMQ
    Produced by Apache. A long-established message bus that complies with the JMS specification and provides a rich API.
    # Kafka
    An open source publish-subscribe messaging system from Apache. It favors high throughput over strict control of errors and message loss (transaction support was only added in later versions), which suits data collection services that handle large volumes of data.
    # RocketMQ
    Alibaba's open source messaging middleware, developed in pure Java. It offers high throughput and high availability and suits large-scale distributed applications. Its design was inspired by Kafka.
    # RabbitMQ
    An open source message queue system based on the AMQP protocol. It is oriented around messages, queues, and routing, with an emphasis on reliability and security. It suits scenarios that require high data consistency and stability.
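
The asynchronous, loosely coupled hand-off described in points 1 and 2 can be illustrated without any broker. The following is only a minimal in-memory sketch using the JDK's BlockingQueue; the class name InMemoryQueueDemo, the STOP marker, and the sample messages are assumptions made for illustration, and a real MQ adds networking, persistence, and acknowledgement on top of this idea.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class InMemoryQueueDemo {
    // Hypothetical end-of-stream marker so the consumer thread can stop; not an MQ concept.
    private static final String STOP = "__STOP__";

    // Runs one producer and one consumer against a shared in-memory queue and
    // returns the messages in the order the consumer handled them.
    public static List<String> run(List<String> messages) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        List<String> consumed = new ArrayList<>();

        // The consumer knows only the queue, never the producer (low coupling).
        Thread consumer = new Thread(() -> {
            try {
                while (true) {
                    String message = queue.take(); // blocks until a message is available
                    if (STOP.equals(message)) {
                        return;
                    }
                    consumed.add(message);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();

        // The producer stores each message and moves on without waiting for it to be handled.
        for (String message : messages) {
            queue.add(message);
        }
        queue.add(STOP);

        try {
            consumer.join(); // wait here only so the result can be read safely
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return consumed;
    }

    public static void main(String[] args) {
        // Prints [order created, order paid]
        System.out.println(run(Arrays.asList("order created", "order paid")));
    }
}
```

The producer loop finishes regardless of how long the consumer takes per message, which is the asynchronous behavior the section describes.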

Introduction to RabbitMQ

  1. RabbitMQ website: www.rabbitmq.com/

  2. Advantages of RabbitMQ

    - Developed in Erlang, a language well suited to socket programming
    - Based on the AMQP protocol, so it is cross-platform
    - Easy to integrate with Spring Boot
    - Very friendly to data consistency

  3. RabbitMQ related concepts

    # AMQP protocol
    Proposed in 2003, AMQP is an advanced message queuing protocol. It does not constrain the API layer but directly defines the format of the data exchanged over the network, which makes it naturally cross-platform.
    # Virtual host
    A virtual host owns a set of exchanges, queues, and bindings, and is used to partition a RabbitMQ server. Generally, different services are given different virtual hosts. User permissions are controlled at the granularity of virtual hosts. Each RabbitMQ server has a default virtual host '/'.
    # Exchange
    Forwards messages to queues.
    # Channel
    A channel object is created from the Connection object, and data is transferred through the channel. A channel can be regarded as a virtual connection, which avoids the overhead of frequently creating real connections.

  4. Installing RabbitMQ

    • On Windows, download the RabbitMQ installer from the official website; it needs an Erlang environment to run

    • On Linux, install RabbitMQ through the package manager

    • On Linux, alternatively use Docker to pull the RabbitMQ image and start it

  5. Managing RabbitMQ

    The RabbitMQ management plugin provides a web management page, on port 15672 by default. You can log in to this page to manage the RabbitMQ service configuration

    For example, you can add a virtual host

    When you add a user, you can set the user's access to virtual hosts

Using RabbitMQ

The examples below use RabbitMQ 3.8.23

Create a channel

// Requires ConnectionFactory, Connection, and Channel from com.rabbitmq.client
// Create the factory that produces MQ connections
ConnectionFactory connectionFactory = new ConnectionFactory();
// Set the connection IP
connectionFactory.setHost("127.0.0.1");
// Set the port number
connectionFactory.setPort(5672);
// Set the virtual host
connectionFactory.setVirtualHost("demoMQ");
// Set the user name and password
connectionFactory.setUsername("demoUser");
connectionFactory.setPassword("xxxxxxx");
// Get the connection object
Connection connection = connectionFactory.newConnection();
// Create a channel object
Channel channel = connection.createChannel();

Create a queue

channel.queueDeclare("demoQueue", false, false, false, null);
/**
 * Parameter 1 queue: name of the message queue
 * Parameter 2 durable: whether the queue is persistent (this does not cover the messages in the queue)
 * Parameter 3 exclusive: whether the queue is exclusive to the current connection
 * Parameter 4 autoDelete: whether the queue is deleted automatically after its last consumer unsubscribes or disconnects
 * Parameter 5 arguments: additional arguments
 */

Work queue model

  1. Work queue model

    A single message queue has one or more consumers, and each message is delivered to only one of them, so every consumer receives different messages

  2. Message production

    channel.basicPublish("", "demoQueue", null, "demo queue".getBytes());
    /**
     * Parameter 1 exchange: exchange name
     * Parameter 2 routingKey: routing key
     * Parameter 3 props: additional message properties
     * Parameter 4 body: byte array of the message
     */

    If the exchange name is an empty string, the default exchange is used

    Every queue is automatically bound to the default exchange with a routing key equal to the queue name

    The channel and connection must be closed after use

  3. Message consumption

    // Declare the same queue as the producer
    channel.queueDeclare("demoQueue", false, false, false, null);
    // Start listening for messages
    /**
     * Parameter 1 queue: queue name
     * Parameter 2 autoAck: whether to acknowledge automatically
     * Parameter 3 callback: callback interface object
     */
    channel.basicConsume("demoQueue", true, new DefaultConsumer(channel) {
        // body is the message content
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope,
                                   BasicProperties properties,
                                   byte[] body) throws IOException {
            System.out.println(new String(body));
        }
    });

  4. Message distribution

    In the work queue model, messages are distributed evenly across consumers by default

    With manual acknowledgement and a limit of one unacknowledged message per channel, a consumer receives its next message only after it acknowledges the current one

    This way, the number of messages each consumer receives depends on how fast it processes them

    Manual message acknowledgement

    // Limit the channel to one unacknowledged message at a time
    // The channel is not given a new message until it acknowledges the current one
    channel.basicQos(1);
    // autoAck is false
    channel.basicConsume("demoQueue", false, new DefaultConsumer(channel) {
        @SneakyThrows
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope,
                                   BasicProperties properties,
                                   byte[] body) {
            System.out.println(new String(body));
            Thread.sleep(3000);
            // Parameter 1: the delivery tag of the message, obtained through envelope.getDeliveryTag()
            // Parameter 2: whether to acknowledge multiple messages at once
            channel.basicAck(envelope.getDeliveryTag(), false);
        }
    });
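
The difference between the default even distribution and the "one unacknowledged message at a time" setting can be seen in a small simulation. This is only a simplified sketch that runs without a broker: the class DispatchSimulation, its method names, and the processing times are assumptions for illustration, and it assumes all messages are already waiting in the queue.

```java
import java.util.Arrays;

public class DispatchSimulation {
    // Even (round-robin) distribution: message i goes to consumer i % n,
    // regardless of how fast each consumer is.
    public static int[] roundRobin(int messageCount, int consumerCount) {
        int[] handled = new int[consumerCount];
        for (int i = 0; i < messageCount; i++) {
            handled[i % consumerCount]++;
        }
        return handled;
    }

    // Prefetch of 1 with manual acknowledgement: a consumer is given its next
    // message only after it has finished (acknowledged) the previous one.
    // processingTime[c] is the hypothetical time consumer c needs per message.
    public static int[] prefetchOne(int messageCount, int[] processingTime) {
        int n = processingTime.length;
        int[] handled = new int[n];
        long[] freeAt = new long[n]; // time at which each consumer acknowledges its current message
        for (int i = 0; i < messageCount; i++) {
            int next = 0;
            for (int c = 1; c < n; c++) {
                if (freeAt[c] < freeAt[next]) {
                    next = c; // the consumer that is free earliest wins; ties go to the lower index
                }
            }
            freeAt[next] += processingTime[next];
            handled[next]++;
        }
        return handled;
    }

    public static void main(String[] args) {
        int[] processingTime = {1, 3}; // consumer 0 is three times faster than consumer 1
        System.out.println(Arrays.toString(roundRobin(8, 2)));              // prints [4, 4]
        System.out.println(Arrays.toString(prefetchOne(8, processingTime))); // prints [6, 2]
    }
}
```

With even distribution both consumers handle four messages each; with the prefetch limit the faster consumer ends up handling six of the eight.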

Publish and subscribe model

  1. Broadcast model

    Each consumer has its own queue, every queue is bound to the exchange, and the producer sends messages to the exchange for distribution

  2. Message production

    // Declare an exchange
    // Parameter 1: the exchange name
    // Parameter 2: the exchange type; fanout means broadcast
    channel.exchangeDeclare("demoExchange", "fanout");
    // Send a message
    channel.basicPublish("demoExchange", "", null, "demo exchange".getBytes());

  3. Message consumption

    // Declare an exchange
    channel.exchangeDeclare("demoExchange", "fanout");
    // Declare a temporary queue and get its name
    String tempQueueName = channel.queueDeclare().getQueue();
    // Bind the queue to the exchange
    channel.queueBind(tempQueueName, "demoExchange", "");
    // Start listening for messages
    channel.basicConsume(tempQueueName, false, new DefaultConsumer(channel) {
        @SneakyThrows
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope,
                                   BasicProperties properties,
                                   byte[] body) {
            System.out.println(new String(body));
            channel.basicAck(envelope.getDeliveryTag(), false);
        }
    });

    In the publish-subscribe model, producers only need to declare the exchange; consumers declare temporary queues and bind them to the exchange. All routing keys are empty strings, and once the exchange receives a message, it forwards it to every bound queue
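
The forwarding behavior of a fanout exchange can be modeled in a few lines. The sketch below is an in-memory illustration only, not the RabbitMQ client API; the class FanoutModel, its methods, and the queue names are hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class FanoutModel {
    // Queue name -> messages waiting in that queue
    private final Map<String, List<String>> boundQueues = new LinkedHashMap<>();

    // Models binding a queue to the exchange; a fanout exchange ignores the routing key.
    public void bind(String queueName) {
        boundQueues.putIfAbsent(queueName, new ArrayList<>());
    }

    // Models publishing to the exchange: every bound queue receives its own copy.
    // If no queue is bound yet, the message is simply dropped.
    public void publish(String message) {
        for (List<String> queue : boundQueues.values()) {
            queue.add(message);
        }
    }

    public List<String> messagesIn(String queueName) {
        return boundQueues.getOrDefault(queueName, new ArrayList<>());
    }

    public static void main(String[] args) {
        FanoutModel exchange = new FanoutModel();
        exchange.bind("tempQueueA");
        exchange.bind("tempQueueB");
        exchange.publish("demo exchange");
        System.out.println(exchange.messagesIn("tempQueueA")); // prints [demo exchange]
        System.out.println(exchange.messagesIn("tempQueueB")); // prints [demo exchange]
    }
}
```

Because the exchange itself stores nothing, a consumer whose queue is bound after a message was published does not see that message, which is why the consumers should be started before the producer in this model.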

Routing topic model

  1. Routing topic model

    In the publish-subscribe model, every consumer receives the same messages

    In the routing topic model, the exchange no longer forwards a message to every queue; it forwards it only to the queues whose bindings match the message's routing key or topic

  2. Message production

    // Declare an exchange. The type is direct
    channel.exchangeDeclare("routeExchange", "direct");
    // Send messages
    // Publish a message with the routing key info
    channel.basicPublish("routeExchange", "info", null, "demo route info".getBytes());
    // Publish a message with the routing key error
    channel.basicPublish("routeExchange", "error", null, "demo route error".getBytes());

  3. Message consumption

    // Declare an exchange. The type is direct
    channel.exchangeDeclare("routeExchange", "direct");
    // Declare a temporary queue and get its name
    String tempQueueName = channel.queueDeclare().getQueue();
    // Bind the queue to the exchange with a routing key
    channel.queueBind(tempQueueName, "routeExchange", "info");
    // Start listening for messages
    channel.basicConsume(tempQueueName, false, new DefaultConsumer(channel) {
        @SneakyThrows
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope,
                                   BasicProperties properties,
                                   byte[] body) {
            System.out.println(new String(body));
            channel.basicAck(envelope.getDeliveryTag(), false);
        }
    });

    channel.queueBind can be called multiple times to bind multiple routing keys

  4. Dynamic routing

    Dynamic routing allows wildcards in the routing key of a binding. A routing key is a list of words separated by dots: # matches zero or more words and * matches exactly one word

    Producer publishes messages

    // Declare an exchange. The type is topic
    channel.exchangeDeclare("topicExchange", "topic");
    // Send several messages with different routing keys
    channel.basicPublish("topicExchange", "log.error", null, "log.error".getBytes());
    channel.basicPublish("topicExchange", "log.error.file", null, "log.error.file".getBytes());
    channel.basicPublish("topicExchange", "log.info", null, "log.info".getBytes());
    channel.basicPublish("topicExchange", "user.info", null, "user.info".getBytes());

    Consumer binds a queue

    // Declare the exchange. The type is topic
    channel.exchangeDeclare("topicExchange", "topic");
    // Declare a temporary queue and get its name
    String tempQueueName = channel.queueDeclare().getQueue();
    // Bind the queue with a routing key pattern
    channel.queueBind(tempQueueName, "topicExchange", "log.*");
    /**
     * log.* matches log.error and log.info
     * log.# matches log.error, log.info, and log.error.file
     * *.info matches log.info and user.info
     */

    Summary of the three models:

    Work queue model: no explicit exchange, a single queue; consumers compete for messages

    Publish-subscribe model: an exchange with empty routing keys; every consumer gets the same message

    Routing topic model: an exchange with routing keys; messages are delivered by route
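
The wildcard rules used by topic exchanges in the dynamic routing part can be checked with a small matcher. This is a sketch of the matching rule only, written for illustration; the class TopicMatcher and its method names are hypothetical and it is not how the broker implements routing internally.

```java
public class TopicMatcher {
    // Returns true if routingKey matches bindingKey, where words are separated
    // by dots, '*' stands for exactly one word and '#' for zero or more words.
    public static boolean matches(String bindingKey, String routingKey) {
        return match(bindingKey.split("\\.", -1), 0, routingKey.split("\\.", -1), 0);
    }

    private static boolean match(String[] pattern, int p, String[] words, int w) {
        if (p == pattern.length) {
            return w == words.length; // pattern used up: match only if no words remain
        }
        if (pattern[p].equals("#")) {
            // '#' may absorb zero or more words, so try every possible amount
            for (int next = w; next <= words.length; next++) {
                if (match(pattern, p + 1, words, next)) {
                    return true;
                }
            }
            return false;
        }
        if (w == words.length) {
            return false; // a literal word or '*' needs a word, but none is left
        }
        if (pattern[p].equals("*") || pattern[p].equals(words[w])) {
            return match(pattern, p + 1, words, w + 1);
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(matches("log.*", "log.error"));      // prints true
        System.out.println(matches("log.*", "log.error.file")); // prints false
        System.out.println(matches("log.#", "log.error.file")); // prints true
        System.out.println(matches("*.info", "user.info"));     // prints true
    }
}
```

Note that log.* does not match log.error.file because * covers exactly one word, while log.# does.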

Integrating with Spring Boot

  1. Add the dependency

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>

  2. Configure application.yaml

    spring:
      rabbitmq:
        host: 127.0.0.1
        port: 5672
        username: demoUser
        password: xxxxx
        virtual-host: demoMQ

    Once configured, a RabbitTemplate bean is available for injection. RabbitTemplate encapsulates a set of RabbitMQ operations

  3. Work queue model

    Message production

    @SpringBootTest
    public class RabbitProviderTest {
        @Autowired
        RabbitTemplate rabbit;
        // Mock the RabbitConsumer bean so that the real consumer is not
        // instantiated at test startup and does not consume the test messages
        @MockBean
        public RabbitConsumer consumer;

        @Test
        public void queueProvide() {
            // Send a message to the queue
            rabbit.convertAndSend("demoQueue", "hello demo queue");
        }
    }

    Message consumption

    @Component
    // @Queue can specify parameters such as durable and exclusive
    // The default is equivalent to channel.queueDeclare("demoQueue", true, false, false, null);
    @RabbitListener(queuesToDeclare = @Queue(name = "demoQueue"))
    public class RabbitConsumer {
        // The message handler can receive the message body directly
        @RabbitHandler
        public void queueConsume(String message) {
            System.out.println(message);
        }
    }
    /**
     * @RabbitListener can also be placed on a method to mark it as a message handler
     */

    In Spring AMQP, the work queue model uses fair consumption by default

  4. Publish and subscribe model

    Message production

    @SpringBootTest
    public class RabbitProviderTest {
        @Autowired
        RabbitTemplate rabbit;
        @MockBean
        public RabbitConsumer consumer;

        @Test
        public void subscribeProvide() {
            // Send a message to the fanout exchange; the routing key is empty
            rabbit.convertAndSend("demoExchange", "", "hello demo exchange");
        }
    }

    Message consumption

    @Component
    public class RabbitConsumer {
        // The first consumer
        // @Queue without arguments creates a temporary queue
        @RabbitListener(bindings = @QueueBinding(value = @Queue, key = "", exchange = @Exchange(name = "demoExchange", type = "fanout")))
        public void subscribeConsumer1(String message) {
            System.out.println("SubscribeConsume1 receives:" + message);
        }
        // The second consumer
        @RabbitListener(bindings = @QueueBinding(value = @Queue, key = "", exchange = @Exchange(name = "demoExchange", type = "fanout")))
        public void subscribeConsumer2(String message) {
            System.out.println("SubscribeConsume2 receives:" + message);
        }
    }

  5. Routing topic model

    Message production

    @SpringBootTest
    public class RabbitProviderTest {
        @Autowired
        RabbitTemplate rabbit;
        @MockBean
        public RabbitConsumer consumer;

        @Test
        public void routeProvide() {
            // Arguments: exchange name, routing key, message
            rabbit.convertAndSend("routeExchange", "log.error", "log.error");
            rabbit.convertAndSend("routeExchange", "log.info", "log.info");
            rabbit.convertAndSend("routeExchange", "log.error.file", "log.error.file");
            rabbit.convertAndSend("routeExchange", "user.info", "user.info");
        }
    }

    Message consumption

    @Component
    public class RabbitConsumer {

        @RabbitListener(bindings = @QueueBinding(value = @Queue, key = "log.*", exchange = @Exchange(name = "routeExchange", type = "topic")))
        public void routeConsumer1(String message) {
            System.out.println("RouteConsumer1 receives:" + message);
        }

        @RabbitListener(bindings = @QueueBinding(value = @Queue, key = "log.#", exchange = @Exchange(name = "routeExchange", type = "topic")))
        public void routeConsumer2(String message) {
            System.out.println("RouteConsumer2 receives:" + message);
        }

        @RabbitListener(bindings = @QueueBinding(value = @Queue, key = "*.info", exchange = @Exchange(name = "routeExchange", type = "topic")))
        public void routeConsumer3(String message) {
            System.out.println("RouteConsumer3 receives:" + message);
        }
    }
    /**
     * To use wildcards for dynamic routing, set type to topic
     */

RabbitMQ cluster

  1. Basic steps

    Prepare the RabbitMQ services by deploying two or more RabbitMQ nodes

    Configure the .erlang.cookie file. The .erlang.cookie is the shared key for joining the cluster and must be identical on every node in the cluster

    Run the cluster join command on the secondary node

    rabbitmqctl join_cluster --ram rabbit@<hostname>
    # --ram makes this a RAM node. Without --ram, the node is a disk node by default

    Set up mirrored queues from any node

    # Set a policy
    rabbitmqctl set_policy <policy name> "<queue name pattern>" '{"ha-mode":"<mirror mode>"}'
    # Parameter 1: the policy name
    # Parameter 2: the matching rule for queue names, a regular expression
    # Parameter 3: the mirrored queue definition, a JSON string with three attributes: ha-mode/ha-params/ha-sync-mode
    # ha-mode: the mirroring mode, one of all/exactly/nodes; all mirrors the queue to every node
    # --vhost sets the virtual host

  2. Docker RabbitMQ cluster

    Pull the RabbitMQ image

    docker pull rabbitmq:3.8.23-management

    Prepare the rabbitmq.conf configuration file and configure default accounts and virtual hosts

    loopback_users.guest = false
    listeners.tcp.default = 5672
    management.tcp.port = 15672
    default_user = cluster
    default_pass = xxxxx
    default_vhost = clusterMQ

    Start three RabbitMQ containers that mount the same .erlang.cookie file

    # 1
    docker run \
      --name rabbitmq1 \
      -h rabbitmq1 \
      -p 15673:15672 \
      -p 5673:5672 \
      -v /var/docker/rabbitmq_cluster/rabbitmq.conf:/etc/rabbitmq/rabbitmq.conf \
      -v /var/docker/rabbitmq_cluster/data1:/var/lib/rabbitmq \
      -v /var/docker/rabbitmq_cluster/.erlang.cookie:/var/lib/rabbitmq/.erlang.cookie \
      -d rabbitmq:3.8.23-management
    # 2
    docker run \
      --name rabbitmq2 \
      -h rabbitmq2 \
      -p 15674:15672 \
      -p 5674:5672 \
      -v /var/docker/rabbitmq_cluster/rabbitmq.conf:/etc/rabbitmq/rabbitmq.conf \
      -v /var/docker/rabbitmq_cluster/data2:/var/lib/rabbitmq \
      -v /var/docker/rabbitmq_cluster/.erlang.cookie:/var/lib/rabbitmq/.erlang.cookie \
      --link rabbitmq1:rabbitmq1 \
      -d rabbitmq:3.8.23-management
    # 3
    docker run \
      --name rabbitmq3 \
      -h rabbitmq3 \
      -p 15675:15672 \
      -p 5675:5672 \
      -v /var/docker/rabbitmq_cluster/rabbitmq.conf:/etc/rabbitmq/rabbitmq.conf \
      -v /var/docker/rabbitmq_cluster/data3:/var/lib/rabbitmq \
      -v /var/docker/rabbitmq_cluster/.erlang.cookie:/var/lib/rabbitmq/.erlang.cookie \
      --link rabbitmq1:rabbitmq1 --link rabbitmq2:rabbitmq2 \
      -d rabbitmq:3.8.23-management

    Join the cluster

    # Enter the container
    docker exec -it rabbitmq2 bash
    # Stop the RabbitMQ application
    rabbitmqctl stop_app
    # Join node 1. The part after rabbit@ must be the host name, not the IP address
    # The host name is the value passed to docker run -h
    rabbitmqctl join_cluster --ram rabbit@rabbitmq1
    # Start the application again so the node runs as a cluster member
    rabbitmqctl start_app

    Configure a policy, specifying clusterMQ as the virtual host

    rabbitmqctl set_policy --vhost clusterMQ demoPolicy "^" '{"ha-mode":"all"}'
    # The pattern "^" matches every queue name, so all queues are mirrored
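
To see why the pattern "^" selects every queue while a narrower pattern does not, the matching can be tried out with a regular expression in plain Java. This is only an illustrative sketch: RabbitMQ evaluates policy patterns with its own regular expression engine, and the class PolicyPatternDemo, the pattern "^demo", and the queue name orderQueue are assumptions made for the example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

public class PolicyPatternDemo {
    // Returns the queue names a policy pattern would select, treating the
    // pattern as a regular expression that may match anywhere in the name.
    public static List<String> matchingQueues(String policyPattern, List<String> queueNames) {
        Pattern pattern = Pattern.compile(policyPattern);
        List<String> matched = new ArrayList<>();
        for (String name : queueNames) {
            if (pattern.matcher(name).find()) {
                matched.add(name);
            }
        }
        return matched;
    }

    public static void main(String[] args) {
        List<String> queues = Arrays.asList("demoQueue", "orderQueue");
        // "^" only anchors the start of the name, so it matches every queue
        System.out.println(matchingQueues("^", queues));     // prints [demoQueue, orderQueue]
        // "^demo" restricts the policy to queues whose names start with "demo"
        System.out.println(matchingQueues("^demo", queues)); // prints [demoQueue]
    }
}
```

A narrower pattern such as the hypothetical "^demo" is a common way to mirror only selected queues instead of all of them.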