1 Introduction

The previous article introduced messaging middleware; this article focuses on one implementation, RabbitMQ.

Message middleware details

RabbitMQ is an open-source message queue system written in Erlang. It implements AMQP and serves as an application-to-application communication method. Applications communicate by writing messages to queues and reading messages from them, without needing a dedicated connection between them. Messaging means that applications exchange data in messages instead of calling each other directly, which is the approach typically taken by remote procedure calls (RPC).

2 Core Components

  • Server: also called the Broker. It accepts connections from clients and implements the AMQP entities; this is the rabbitmq-server process you install.
  • Connection: the network connection between an application and the Broker, over TCP/IP (opened with a three-way handshake and closed with a four-way handshake).
  • Channel: the network channel in which almost all operations are performed. A channel is the path for reading and writing messages. A client can open multiple channels, each representing one session task.
  • Message: the data transmitted between the server and an application. It consists of Properties, which decorate the message with advanced features such as priority and delay, and a Body, which is the message content.
  • Virtual Host: a virtual address used for logical isolation. It is the top level of message routing. A virtual host can contain multiple exchanges and queues.
  • Exchange: receives messages and routes them to the bound queues according to the routing key. An exchange does not store messages.
  • Binding: the virtual link between an exchange and a queue. A binding can carry a routing key, and the same queue can be bound with several keys.
  • Routing key: a routing rule that the exchange uses to decide how to route a particular message.
  • Queue: also called a message queue. It stores messages and forwards them to consumers.

3 RabbitMQ message modes

3.1 Simple mode

Simple mode is the most basic mode: one producer, one queue, and one consumer. The producer sends a message through an exchange into the queue, and the consumer takes the message out of the queue and processes it. No exchange is declared explicitly here; when none is specified, the default exchange is used.

A Java demo of this mode:

Producer

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

public class Send {
    private final static String QUEUE_NAME = "queue1";

    public static void main(String[] args) {
        // 1. Create a connection factory
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.96.109");
        factory.setVirtualHost("/");

        Connection connection = null;
        Channel channel = null;

        try {
            // 2. Create a connection and a channel
            connection = factory.newConnection();
            channel = connection.createChannel();
            // 3. Declare the queue (durable, exclusive, autoDelete, arguments)
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            // Message content
            String message = "Hello world";
            // 4. Send the message to the queue through the default exchange ("")
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8));
            System.out.println(" [x] Sent '" + message + "'");
        } catch (TimeoutException | IOException e) {
            e.printStackTrace();
        } finally {
            // Close the channel
            if (channel != null && channel.isOpen()) {
                try {
                    channel.close();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
            // Close the connection
            if (connection != null && connection.isOpen()) {
                try {
                    connection.close();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

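Consumer

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

public class Recv {
    private final static String QUEUE_NAME = "queue1";

    public static void main(String[] args) throws IOException, TimeoutException {
        // 1. Create a connection factory
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.96.109");
        factory.setVirtualHost("/");

        // 2. Get a connection and a channel
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        // 3. Declare the queue with the same arguments as the producer
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

        // 4. Called for every message delivered to this consumer
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
            System.out.println(" [x] Received '" + message + "'");
        };
        // autoAck = true: a message counts as acknowledged as soon as it is delivered
        channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
    }
}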

In the management UI you can see that the message is first written to the queue and then consumed by the consumer.

3.2 Fanout mode

Fanout: publish/subscribe, a broadcast mechanism.

This mode has one producer, one exchange, multiple queues, and multiple consumers. The producer sends the message to the exchange. The exchange does not store the message itself; it routes a copy to every bound queue, and the consumers retrieve the message from those queues. If a producer sends a message to an exchange that has no queue bound to it, the message is lost.
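The routing rule can be illustrated without a broker. The sketch below is an in-memory model only, not the RabbitMQ client API; the class `FanoutSketch` and its methods are hypothetical names chosen for illustration. It shows the two properties described above: every bound queue gets a copy, and a message published while nothing is bound is dropped.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** In-memory model of a fanout exchange (illustrative, not the RabbitMQ client). */
class FanoutSketch {
    // Queues currently bound to the exchange. The exchange itself stores no messages.
    private final List<Deque<String>> boundQueues = new ArrayList<>();

    void bind(Deque<String> queue) {
        boundQueues.add(queue);
    }

    /** Copies the message into every bound queue and returns how many queues received it. */
    int publish(String message) {
        for (Deque<String> queue : boundQueues) {
            queue.addLast(message);
        }
        return boundQueues.size(); // 0 means nothing was bound and the message is gone
    }

    public static void main(String[] args) {
        FanoutSketch exchange = new FanoutSketch();
        System.out.println("queues reached: " + exchange.publish("published too early"));

        Deque<String> queue1 = new ArrayDeque<>();
        Deque<String> queue2 = new ArrayDeque<>();
        exchange.bind(queue1);
        exchange.bind(queue2);
        System.out.println("queues reached: " + exchange.publish("hello fanout mode"));
        System.out.println(queue1 + " " + queue2);
    }
}
```

The routing key plays no part in this model, which is why the demo can pass an empty string for it.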

A Java demo of this mode:

Producer

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Productor {
    private static final String EXCHANGE_NAME = "fanout_exchange";

    public static void main(String[] args) {
        // 1. Create a connection factory
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.96.109");
        factory.setUsername("admin");
        factory.setPassword("admin");
        factory.setVirtualHost("/");

        Connection connection = null;
        Channel channel = null;
        try {
            // 2. Get the connection and channel
            connection = factory.newConnection();
            channel = connection.createChannel();
            // Message content
            String message = "hello fanout mode";
            // Routing key: ignored by a fanout exchange, so it is left empty
            String routeKey = "";
            String type = "fanout";
            // 3. Declare the exchange
            channel.exchangeDeclare(EXCHANGE_NAME, type);
            // 4. Declare the queues (durable = true).
            // If queue1 still exists from 3.1 as non-durable, delete it first:
            // redeclaring a queue with different arguments fails.
            channel.queueDeclare("queue1", true, false, false, null);
            channel.queueDeclare("queue2", true, false, false, null);
            channel.queueDeclare("queue3", true, false, false, null);
            channel.queueDeclare("queue4", true, false, false, null);
            // 5. Bind the queues to the exchange
            channel.queueBind("queue1", EXCHANGE_NAME, routeKey);
            channel.queueBind("queue2", EXCHANGE_NAME, routeKey);
            channel.queueBind("queue3", EXCHANGE_NAME, routeKey);
            channel.queueBind("queue4", EXCHANGE_NAME, routeKey);
            // 6. Publish the message to the exchange
            channel.basicPublish(EXCHANGE_NAME, routeKey, null, message.getBytes("UTF-8"));
            System.out.println("Message sent successfully!");
        } catch (IOException | TimeoutException e) {
            e.printStackTrace();
            System.out.println("Message sending exception");
        } finally {
            // Close the channel and connection......
        }
    }
}

Consumer

import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.client.Delivery;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Customer {
    private static Runnable runnable = new Runnable() {
        @Override
        public void run() {
            // Create a connection factory
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("192.168.96.109");
            factory.setUsername("admin");
            factory.setPassword("admin");
            factory.setVirtualHost("/");

            // Each thread is named after the queue it consumes from
            final String queueName = Thread.currentThread().getName();
            Connection connection = null;
            Channel channel = null;
            try {
                // Get the connection and channel
                connection = factory.newConnection();
                channel = connection.createChannel();

                Channel finalChannel = channel;
                // autoAck = true: messages are acknowledged on delivery
                finalChannel.basicConsume(queueName, true, new DeliverCallback() {
                    @Override
                    public void handle(String consumerTag, Delivery delivery) throws IOException {
                        System.out.println(delivery.getEnvelope().getDeliveryTag());
                        System.out.println(queueName + ": Received the message:" + new String(delivery.getBody(), "UTF-8"));
                    }
                }, new CancelCallback() {
                    @Override
                    public void handle(String consumerTag) throws IOException {
                    }
                });
                System.out.println(queueName + ": Start receiving messages");
            } catch (IOException | TimeoutException e) {
                e.printStackTrace();
            } finally {
                // Close the channel and connection......
            }
        }
    };

    public static void main(String[] args) throws IOException, TimeoutException {
        // Start one thread per queue to fetch messages from each of the four queues
        new Thread(runnable, "queue1").start();
        new Thread(runnable, "queue2").start();
        new Thread(runnable, "queue3").start();
        new Thread(runnable, "queue4").start();
    }
}

After Productor runs, each of the four queues holds one message. After Customer runs, the messages in all four queues have been consumed.

3.3 Direct mode

Direct mode adds routing keys on top of Fanout mode. In Fanout (publish/subscribe) mode, the exchange routes each message to all bound queues. Direct mode adds a filter: the exchange routes the message only to the queues whose binding matches the message's routing key.

For example, take an exchange bound to two queues, where queue Q1 is bound with the routing key "orange" and queue Q2 is bound with the routing keys "black" and "green". In this setup, a message published with routing key "orange" is routed to Q1, and a message published with routing key "black" or "green" is routed to Q2.

In RabbitMQ the routing key is given when the queue is bound to the exchange. A direct exchange delivers a message to a queue only when the message's routing key is exactly equal to the key used in the binding.
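To make the exact-match rule concrete, here is a small in-memory sketch of the Q1/Q2 setup described above. It is a model for illustration, not the RabbitMQ client API: the class `DirectSketch` and its `bind` and `route` methods are hypothetical, and the key "red" is added only to show a message that matches no binding.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** In-memory model of direct-exchange routing (illustrative, not the RabbitMQ client). */
class DirectSketch {
    // binding key -> names of the queues bound with that key
    private final Map<String, List<String>> bindings = new HashMap<>();

    void bind(String queue, String bindingKey) {
        bindings.computeIfAbsent(bindingKey, k -> new ArrayList<>()).add(queue);
    }

    /** Returns the queues whose binding key equals the routing key exactly. */
    List<String> route(String routingKey) {
        return bindings.getOrDefault(routingKey, Collections.<String>emptyList());
    }

    public static void main(String[] args) {
        DirectSketch exchange = new DirectSketch();
        exchange.bind("Q1", "orange");
        exchange.bind("Q2", "black");
        exchange.bind("Q2", "green");
        for (String key : new String[] {"orange", "black", "green", "red"}) {
            System.out.println(key + " -> " + exchange.route(key));
        }
    }
}
```

A key with no binding routes to an empty list, which corresponds to the message being discarded.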

A Java demo of this mode:

Producer

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Productor {
    private static final String EXCHANGE_NAME = "direct_exchange";

    public static void main(String[] args) {
        // 1. Create a connection factory
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.96.109");
        factory.setUsername("admin");
        factory.setPassword("admin");
        factory.setVirtualHost("/");

        Connection connection = null;
        Channel channel = null;
        try {
            // 2. Get the connection and channel
            connection = factory.newConnection();
            channel = connection.createChannel();
            // Message content
            String message = "hello direct mode";
            // Routing key of the message
            String routeKey = "email";
            String type = "direct";
            // 3. Declare the exchange
            channel.exchangeDeclare(EXCHANGE_NAME, type);
            // 4. Declare the queues
            channel.queueDeclare("queue1", true, false, false, null);
            channel.queueDeclare("queue2", true, false, false, null);
            channel.queueDeclare("queue3", true, false, false, null);
            // 5. Bind each queue to the exchange with its own routing key
            channel.queueBind("queue1", EXCHANGE_NAME, "email");
            channel.queueBind("queue2", EXCHANGE_NAME, "sms");
            channel.queueBind("queue3", EXCHANGE_NAME, "vx");
            // 6. Publish the message to the exchange
            channel.basicPublish(EXCHANGE_NAME, routeKey, null, message.getBytes("UTF-8"));
            System.out.println("Message sent successfully!");
        } catch (IOException | TimeoutException e) {
            e.printStackTrace();
            System.out.println("Message sending exception");
        } finally {
            // Close the channel and connection......
        }
    }
}

The routing key bound to each queue is visible in the management UI.

Since the message is published with the routing key "email", only queue1 receives it.

The consumer is the same as in the fanout example above.

3.4 Topic mode

In Topic mode, when the producer sends a message to the exchange, the exchange compares the message's routing key with the wildcard pattern of each binding. If the pattern matches, the message is stored in that queue. If the routing key matches the patterns of several queues, the message is sent to all of them. If no queue matches, the message is lost.

The routing key must be a list of words separated by dots. In a binding pattern, * and # mean:

  • * : exactly one word
  • # : zero or more words
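The two wildcards can be expressed as a short recursive matcher over the dot-separated words. This is a sketch of the matching rule only, written for illustration; it is not how RabbitMQ implements topic routing internally, and the class name `TopicMatcher` is hypothetical.

```java
/** Matches a dot-separated routing key against a topic pattern with * and #. */
class TopicMatcher {
    static boolean matches(String pattern, String routingKey) {
        return match(pattern.split("\\."), 0, routingKey.split("\\."), 0);
    }

    // p = pattern words, i = position in pattern; k = key words, j = position in key
    private static boolean match(String[] p, int i, String[] k, int j) {
        if (i == p.length) {
            return j == k.length; // pattern used up: the key must be used up too
        }
        if (p[i].equals("#")) {
            // '#' absorbs zero words (move on in the pattern)
            // or one more word (stay on '#', move on in the key)
            return match(p, i + 1, k, j) || (j < k.length && match(p, i, k, j + 1));
        }
        if (j == k.length) {
            return false; // pattern still needs a word but the key has none left
        }
        // '*' accepts any single word; a literal must be equal
        return (p[i].equals("*") || p[i].equals(k[j])) && match(p, i + 1, k, j + 1);
    }

    public static void main(String[] args) {
        String key = "com.order.test.xxx";
        System.out.println(matches("*.order.#", key));
        System.out.println(matches("#.test.*", key));
        System.out.println(matches("*.order.#", "order.test"));
    }
}
```

With the key "com.order.test.xxx", both "*.order.#" and "#.test.*" match, while "*.order.#" rejects "order.test" because * needs one word before "order".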

A Java demo of this mode:

Producer

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Productor {
    private static final String EXCHANGE_NAME = "topic_exchange";

    public static void main(String[] args) {
        // 1. Create a connection factory
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.96.109");
        factory.setUsername("admin");
        factory.setPassword("admin");
        factory.setVirtualHost("/");

        Connection connection = null;
        Channel channel = null;
        try {
            // 2. Get the connection and channel
            connection = factory.newConnection();
            channel = connection.createChannel();
            // Message content
            String message = "hello topic mode";
            // Routing key of the message
            String routeKey = "com.order.test.xxx";
            String type = "topic";
            // 3. Declare the exchange
            channel.exchangeDeclare(EXCHANGE_NAME, type);
            // 4. Declare the queues
            channel.queueDeclare("queue5", true, false, false, null);
            channel.queueDeclare("queue6", true, false, false, null);
            // 5. Bind each queue to the exchange with a wildcard pattern
            channel.queueBind("queue5", EXCHANGE_NAME, "*.order.#");
            channel.queueBind("queue6", EXCHANGE_NAME, "#.test.*");
            // 6. Publish the message to the exchange
            channel.basicPublish(EXCHANGE_NAME, routeKey, null, message.getBytes("UTF-8"));
            System.out.println("Message sent successfully!");
        } catch (IOException | TimeoutException e) {
            e.printStackTrace();
            System.out.println("Message sending exception");
        } finally {
            // Close the channel and connection......
        }
    }
}

After Productor runs, the pattern bound to each queue is visible in the management UI.

Since the routing key in the example is "com.order.test.xxx", it matches both "*.order.#" and "#.test.*", so queue5 and queue6 both receive the message.

The consumer is the same as in the example above. After Customer runs, checking the queues again shows that the messages in queue5 and queue6 have been consumed.

3.5 Work mode

When several consumers read from the same queue, there are two main ways to balance how many messages each consumer gets:

  • Polling (round-robin) distribution: messages are handed out in turn, so each consumer gets the same number of messages
  • Fair distribution: messages are handed out according to each consumer's capacity; fast consumers process more, slow consumers process less

3.5.1 Polling Distribution

In this mode RabbitMQ assigns tasks to the consumers in turn. If the tasks that land on one consumer are heavy and those that land on another are light, some consumers stay busy while others sit idle. RabbitMQ is not aware of this; it keeps dispatching blindly, without looking at how many unacknowledged messages a consumer already has.
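The imbalance is easy to see with a few lines of arithmetic. The sketch below is a model, not RabbitMQ code: the class `RoundRobinSketch` is hypothetical, and the message costs (2000 ms for even-numbered messages, 100 ms for odd-numbered ones) are assumed values chosen to make the effect visible.

```java
import java.util.Arrays;

/** Models round-robin dispatch: message i always goes to consumer i % consumers. */
class RoundRobinSketch {
    /** Returns the total processing time (ms) that ends up on each consumer. */
    static long[] dispatch(long[] costsMs, int consumers) {
        long[] busyMs = new long[consumers];
        for (int i = 0; i < costsMs.length; i++) {
            busyMs[i % consumers] += costsMs[i]; // the cost of the task is never considered
        }
        return busyMs;
    }

    public static void main(String[] args) {
        long[] costs = new long[20];
        for (int i = 0; i < costs.length; i++) {
            costs[i] = (i % 2 == 0) ? 2000 : 100; // assumed: heavy and light tasks alternate
        }
        // Both consumers receive 10 messages, yet their workloads differ by a factor of 20
        System.out.println(Arrays.toString(dispatch(costs, 2))); // prints [20000, 1000]
    }
}
```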

A Java demo of this mode:

Producer

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

public class Productor {
    public static void main(String[] args) {
        // 1. Create a connection factory
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.96.109");
        factory.setUsername("admin");
        factory.setPassword("admin");
        factory.setVirtualHost("/");

        Connection connection = null;
        Channel channel = null;
        try {
            // 2. Get the connection and channel
            connection = factory.newConnection();
            channel = connection.createChannel();

            // 3. Publish 20 messages to queue1 through the default exchange
            for (int i = 0; i < 20; i++) {
                String msg = "feiyangyang: " + i;
                channel.basicPublish("", "queue1", null, msg.getBytes(StandardCharsets.UTF_8));
            }
            System.out.println("Message sent successfully!");
        } catch (IOException | TimeoutException e) {
            e.printStackTrace();
            System.out.println("Message sending exception");
        } finally {
            // Close the channel and connection......
        }
    }
}

Worker1

import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.client.Delivery;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Worker1 {
    public static void main(String[] args) {
        // Create a connection factory
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.96.109");
        factory.setUsername("admin");
        factory.setPassword("admin");
        factory.setVirtualHost("/");

        Connection connection = null;
        Channel channel = null;
        try {
            // Get the connection and channel
            connection = factory.newConnection();
            channel = connection.createChannel();
            Channel finalChannel = channel;
            // autoAck = true: messages are acknowledged on delivery
            finalChannel.basicConsume("queue1", true, new DeliverCallback() {
                @Override
                public void handle(String consumerTag, Delivery delivery) throws IOException {
                    System.out.println("Worker1" + ": Received the message:" + new String(delivery.getBody(), "UTF-8"));
                    try {
                        // Simulate a task that takes 2 seconds
                        Thread.sleep(2000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }, new CancelCallback() {
                @Override
                public void handle(String consumerTag) throws IOException {
                }
            });
            System.out.println("Worker1 starts receiving messages");
            // Block the main thread so the consumer keeps running
            System.in.read();
        } catch (IOException | TimeoutException e) {
            e.printStackTrace();
        } finally {
            // Close the channel and connection......
        }
    }
}

Worker2 is the same as Worker1.

The message distribution result:

Worker1: Received the message:feiyangyang: 0
Worker1: Received the message:feiyangyang: 2
Worker1: Received the message:feiyangyang: 4
Worker1: Received the message:feiyangyang: 6
Worker1: Received the message:feiyangyang: 8
Worker1: Received the message:feiyangyang: 10
Worker1: Received the message:feiyangyang: 12
Worker1: Received the message:feiyangyang: 14
Worker1: Received the message:feiyangyang: 16
Worker1: Received the message:feiyangyang: 18

Worker2 starts receiving messages
Worker2: Received the message:feiyangyang: 1
Worker2: Received the message:feiyangyang: 3
Worker2: Received the message:feiyangyang: 5
Worker2: Received the message:feiyangyang: 7
Worker2: Received the message:feiyangyang: 9
Worker2: Received the message:feiyangyang: 11
Worker2: Received the message:feiyangyang: 13
Worker2: Received the message:feiyangyang: 15
Worker2: Received the message:feiyangyang: 17
Worker2: Received the message:feiyangyang: 19

As you can see, polling distribution hands the messages out evenly: each consumer receives 10 of the 20 messages.


3.5.2 Fair distribution

To fix the imbalance of polling distribution, call the basicQos method with prefetchCount = 1. RabbitMQ then sends no new message to a consumer until that consumer has processed and acknowledged the previous one; the message goes to another, idle consumer instead.

The Productor code is the same as in the polling example above; only the workers need small changes.

Worker1

// Enable QoS on the channel: at most one unacknowledged message at a time
finalChannel.basicQos(1);
// autoAck = false: switch to manual acknowledgement
finalChannel.basicConsume("queue1", false, new DeliverCallback() {
    @Override
    public void handle(String consumerTag, Delivery delivery) throws IOException {
        System.out.println("Worker1" + ": Received the message:" + new String(delivery.getBody(), "UTF-8"));
        try {
            Thread.sleep(1000);
            // Acknowledge manually once the task is done
            finalChannel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}, new CancelCallback() {
    @Override
    public void handle(String consumerTag) throws IOException {
    }
});

Compared with the polling example, this one enables the QoS mechanism with a value of 1, which means a consumer holds at most one unacknowledged message at a time and receives the next one only after acknowledging it. Worker1 sleeps 1 s per message and Worker2 sleeps 2 s. The message distribution result:

Worker1: Received the message:feiyangyang: 0
Worker1: Received the message:feiyangyang: 2
Worker1: Received the message:feiyangyang: 4
Worker1: Received the message:feiyangyang: 5
Worker1: Received the message:feiyangyang: 7
Worker1: Received the message:feiyangyang: 8
Worker1: Received the message:feiyangyang: 10
Worker1: Received the message:feiyangyang: 11
Worker1: Received the message:feiyangyang: 13
Worker1: Received the message:feiyangyang: 14
Worker1: Received the message:feiyangyang: 16
Worker1: Received the message:feiyangyang: 17
Worker1: Received the message:feiyangyang: 19

Worker2 starts receiving messages
Worker2: Received the message:feiyangyang: 1
Worker2: Received the message:feiyangyang: 3
Worker2: Received the message:feiyangyang: 6
Worker2: Received the message:feiyangyang: 9
Worker2: Received the message:feiyangyang: 12
Worker2: Received the message:feiyangyang: 15
Worker2: Received the message:feiyangyang: 18

The faster Worker1 handled 13 messages and the slower Worker2 handled 7. To use fair distribution in Work mode, set the consumers to manual acknowledgement and enable QoS.
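The effect of prefetchCount = 1 can be approximated with a small simulation: the next message always goes to whichever worker becomes free first. This is a simplified model, not RabbitMQ's dispatcher; the class `FairDispatchSketch` is hypothetical, all messages are assumed to be queued before the workers start, and ties go to the lower-numbered worker.

```java
import java.util.Arrays;

/** Models dispatch with a prefetch of 1: a worker gets a message only when it is free. */
class FairDispatchSketch {
    /** Returns how many messages each worker handles, given its per-message cost in ms. */
    static int[] dispatch(int messages, long[] costMs) {
        long[] freeAt = new long[costMs.length]; // time at which each worker has acked its last message
        int[] handled = new int[costMs.length];
        for (int m = 0; m < messages; m++) {
            int next = 0;
            for (int w = 1; w < costMs.length; w++) {
                if (freeAt[w] < freeAt[next]) {
                    next = w; // strictly earlier wins, so ties stay with the lower index
                }
            }
            freeAt[next] += costMs[next];
            handled[next]++;
        }
        return handled;
    }

    public static void main(String[] args) {
        // 20 messages; Worker1 needs 1000 ms per message, Worker2 needs 2000 ms
        System.out.println(Arrays.toString(dispatch(20, new long[] {1000, 2000}))); // prints [13, 7]
    }
}
```

The model gives the same 13 to 7 split as the run above, although which worker gets which individual message depends on timing and on how ties are broken.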

4 Message loss prevention mechanism

4.1 Message Confirmation

A task may take a consumer several seconds to complete. Suppose a consumer starts a long task and dies after completing only part of it. If autoAck is set to true, RabbitMQ marks a message for deletion as soon as it has been delivered to the consumer, so in this case we lose every message that was dispatched to that consumer but not yet processed.

What we want instead is that, when a consumer goes down, RabbitMQ hands its messages to the other consumers. To ensure that messages are not lost, RabbitMQ supports message acknowledgement: the consumer sends back an acknowledgement telling RabbitMQ that the message has been received and processed, and only then does RabbitMQ delete it.

If the consumer goes down without sending an ack, RabbitMQ understands that the message was not fully processed and re-queues it. If other consumers are online, it is quickly redelivered to one of them, so the message is not lost.
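The bookkeeping behind this can be sketched in memory. The model below is illustrative only: `AckSketch`, its methods, and the consumer names are hypothetical, and a real broker tracks deliveries per channel with delivery tags rather than by message text.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** In-memory model of manual acknowledgement and redelivery (not the RabbitMQ client). */
class AckSketch {
    private final Deque<String> ready = new ArrayDeque<>();             // waiting for a consumer
    private final Map<String, List<String>> unacked = new HashMap<>();  // delivered, not yet acked

    void publish(String message) {
        ready.addLast(message);
    }

    /** Hands the next ready message to a consumer and remembers it as unacknowledged. */
    String deliver(String consumer) {
        String message = ready.pollFirst();
        if (message != null) {
            unacked.computeIfAbsent(consumer, c -> new ArrayList<>()).add(message);
        }
        return message;
    }

    /** The consumer finished the task: only now is the message forgotten. */
    void ack(String consumer, String message) {
        List<String> pending = unacked.get(consumer);
        if (pending != null) {
            pending.remove(message);
        }
    }

    /** The consumer died: everything it had not acked goes back to the queue. */
    void consumerDied(String consumer) {
        List<String> pending = unacked.remove(consumer);
        if (pending != null) {
            for (int i = pending.size() - 1; i >= 0; i--) {
                ready.addFirst(pending.get(i));
            }
        }
    }

    int readyCount() {
        return ready.size();
    }

    public static void main(String[] args) {
        AckSketch broker = new AckSketch();
        broker.publish("task-1");
        broker.publish("task-2");
        broker.ack("worker1", broker.deliver("worker1")); // task-1 processed and acked
        broker.deliver("worker1");                        // task-2 delivered, never acked
        broker.consumerDied("worker1");
        System.out.println("redelivered: " + broker.deliver("worker2"));
    }
}
```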

Manual message acknowledgement is RabbitMQ's default, that is, autoAck is false unless you say otherwise. The earlier examples that passed true to basicConsume switched it off. To use it, pass autoAck = false and acknowledge each message once its task is complete:

channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);


4.2 Persistence

The acknowledgement mechanism protects messages when a consumer fails, but if the RabbitMQ server itself stops, our tasks are still lost.

When RabbitMQ exits or crashes, queues and messages disappear unless they are persisted. Two things are needed to make sure messages survive: mark both the queue and the messages as persistent.

  1. Set queue persistence
boolean durable = true;
channel.queueDeclare("task_queue", durable, false, false, null);
  2. Set message persistence
channel.basicPublish("", "task_queue", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());

Marking a message as persistent does not fully guarantee that it will not be lost. There is still a short window in which RabbitMQ has accepted the message but has not yet saved it to disk. If a stronger guarantee is required, use the publisher confirmation mechanism.

5 Application Scenarios

  • Decoupling, peak clipping, asynchrony

Decoupling

In a microservice architecture, suppose microservice A needs to communicate with microservice B. Traditionally A calls B's interface directly. If system B is unreachable or the connection times out, system A has to wait until B responds, so the two systems are tightly coupled. If a message queue is introduced between A and B, the flow looks like this:

  • System A stores the message in the message queue and returns success
  • System B obtains the message from the queue and processes it

System A only puts messages in the queue and does not care about anything else, such as whether system B manages to fetch them. The two systems are thereby decoupled.

Usage Scenarios:

  • Notification by SMS or email

Peak clipping

Suppose system A normally receives about 100 requests per second and runs stably, and can process at most 1,000 requests per second. During a flash sale ("seckill") the number of concurrent requests can reach 10,000 per second, which would bring the server down. Introducing MQ solves this problem. User requests are first stored in the queue, and system A pulls at most 1,000 requests per second from it, which is what it can handle, so it keeps running stably. During the flash sale, requests pour into the queue and back up in MQ while the system keeps processing 1,000 per second. This temporary backlog is fine: once the peak is over, the number of incoming requests drops quickly while the system still takes 1,000 per second from the queue, so the backlog is soon consumed.
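The numbers above can be worked through second by second. The sketch below uses the rates from the text (10,000 requests per second at peak, 100 per second normally, 1,000 per second capacity); the peak length of 10 seconds is an assumed value for illustration, and `PeakClippingSketch` is a hypothetical name.

```java
/** Worked example: how a queue backlog grows during a peak and drains afterwards. */
class PeakClippingSketch {
    /** Returns the backlog left in the queue at the end of the given second. */
    static long backlogAt(int second, int peakSeconds, int peakRate, int normalRate, int capacity) {
        long backlog = 0;
        for (int t = 1; t <= second; t++) {
            backlog += (t <= peakSeconds) ? peakRate : normalRate; // requests arriving this second
            backlog -= Math.min(backlog, capacity);                // requests the system pulls this second
        }
        return backlog;
    }

    public static void main(String[] args) {
        int peakSeconds = 10; // assumed duration of the flash-sale peak
        // During the peak the backlog grows by 10000 - 1000 = 9000 per second
        System.out.println(backlogAt(10, peakSeconds, 10_000, 100, 1_000));  // prints 90000
        // Afterwards it shrinks by 1000 - 100 = 900 per second, so it is gone 100 seconds later
        System.out.println(backlogAt(110, peakSeconds, 10_000, 100, 1_000)); // prints 0
    }
}
```

Under these assumptions the system never processes more than 1,000 requests in any second, and the cost of the peak is a delay of up to about 100 seconds for the requests at the back of the queue.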

Usage Scenarios:

  • Flash sales ("seckill" events)
  • Mass of musical activity

Asynchrony

When a user registers, the system needs to send a registration email and a registration SMS. There are two traditional approaches, serial and parallel, plus the MQ approach:

  • Serial mode: write the registration information to the database (50ms), send the email (50ms), then send the SMS (50ms), and return to the client once all three are done. Total time: 150ms
  • Parallel mode: write the registration information to the database (50ms), then start child threads that send the email and the SMS at the same time (50ms), and return to the client. Total time: 100ms
  • MQ mode: write the registration information to the database (50ms), write the email and SMS tasks to the queue (5ms), and return to the client. When the consumer takes the messages from the queue and processes them is of no concern to the caller. Total time: 55ms
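The three totals follow directly from the step times. The sketch below only restates that arithmetic; the class and method names are hypothetical, and the queue write is taken as 5 ms, which is what the 55 ms total implies.

```java
/** Response time seen by the client for the three registration flows. */
class RegistrationLatency {
    /** Serial: every step waits for the previous one. */
    static int serial(int dbMs, int emailMs, int smsMs) {
        return dbMs + emailMs + smsMs;
    }

    /** Parallel: email and SMS run at the same time, so only the slower one counts. */
    static int parallel(int dbMs, int emailMs, int smsMs) {
        return dbMs + Math.max(emailMs, smsMs);
    }

    /** MQ: the client waits only for the database write and the enqueue. */
    static int viaQueue(int dbMs, int enqueueMs) {
        return dbMs + enqueueMs;
    }

    public static void main(String[] args) {
        System.out.println(serial(50, 50, 50));   // prints 150
        System.out.println(parallel(50, 50, 50)); // prints 100
        System.out.println(viaQueue(50, 5));      // prints 55
    }
}
```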

Usage Scenarios:

  • Asynchronously process business logic that does not have to wait for the result of the response