Write it up front

It has been a long time since I posted this article last time. In fact, I have not stopped writing during this period. I am just busy looking for a job and finishing school courses

  • This article is a bit long, so it is divided into two parts
  • PS: The Github Java q&A article is still being written and will be updated in the near future

Article Contents:

  • RabbitMQ from shallow to deep
  • RabbitMQ from shallow to deep

1. Come in a shallow way

1.1 What is Middleware?

The definition of IDC: Middleware is an independent system software service program. Distributed application software uses this software to share resources between different technologies. The middleware is located on the operating system of client server and manages computing resources and network communication.

First, middleware is a general term for a certain kind of software, not a specific kind of software. It is a kind of platform (hardware) operating system and application between the universal service, it blocked the complexity of the underlying operating system, easing the burden on developers technology, at the same time, its design is not for a specific goal, but has the universal characteristics of function module services, these services have standard program interfaces and protocols, Depending on the platform, there can be different implementations.

Popular examples (for reference only, not entirely consistent) :

  • I run A coffee shop, and THERE are n suppliers of coffee beans, such as A, B and C, around me. However, I will definitely choose beans with affordable prices and good quality. However, the market is subject to the fluctuations of various factors, and my current choice may not be the best one after A period of time. So I specially found a market agent and asked him to help me worry about this business. I only tell you the price and quality requirements, and you can go to find it. I don’t worry about the process at all. The concept of mediation is similar to that of middleware

1.1.1 Distributed Concepts (Supplement)

This paragraph is from my introduction to Dubbo article

The definitions in Baidu and Wiki are relatively professional and obscure. Most blogs and tutorials often use the definition in Distributed System Principles and Paradigms, that is, “DISTRIBUTED system is a collection of several independent computers, which are like a single related system to users”.

Below we use some space to explain popularly what is called distributed

1.1.1.1 What is a centralized System

Referring to distributed, must mention is the “centralized system”, this concept is best understood, it is the function, procedures and so on installed on the same device, by this host device to provide services

Here’s the simplest example: You take a PC host, modify it into a simple server, configure all kinds of content, you install MySQL, Web server, FTP, Nginx, etc., all in it, package and deploy the project, then you can provide services to the outside, but once the machine has software or hardware problems, The whole system will suffer serious implicating errors, eggs in one basket, if you want to hit all

1.1.12 What is a Distributed System

Now that the centralized system, there is a extremely important distributed one role, then, nature is to solve this problem, known as the definition, a distributed system in the user’s sensory experience, just like the traditional single system, some changes are to be carried out within the system itself, and nothing is too big for the users

Such as: Taobao, jingdong this large electric business platform, they are tens of thousands of the host, otherwise can’t deal with large amounts of data and the request, what specific including division, as well as the operation, we would say to the below, but for our users, we don’t need to also don’t want to care about these, we can still think, What we face is “Taobao” this “host”

So a relatively professional term for distributed is like this (process granularity) two or more programs, respectively running on different host processes, they coordinate with each other to complete the common function, then the system composed of several programs can be called distributed system

  • They are all the same program – distributed
  • These are all different programs — clusters

Message middleware, just as its name implies is used to process the message middleware services, it provides a system of communication interaction between channels, such as the sender only needs to want to transfer the information to the message middleware, and send the agreement, manner, appeared in the process of sending the network, the fault and so on, are handled by the middleware, It is therefore responsible for ensuring the reliable transmission of information.

Therefore, message middleware is a kind of technology used to accept data, store data and send data. It provides a variety of functions, which can realize the high availability and reliability of messages, and also provides a good fault tolerance mechanism. Can program on the occupation of system resources, as well as the improvement of transmission efficiency has a great help.

  • Different MQ has different characteristics and directions in which it is better, not better, but better.

1.2.1 Application Scenarios of Message Queues

According to the needs of the business, it can be applied in a variety of scenarios, such as decoupling, peak load cutting, broadcasting, etc. Let’s take two scenarios to comb through the simple process

1.2.1.1 Service decoupling

Recently, I am considering buying several books to read. For example, when I click buy, there may be such a string of business logic execution, ① minus the inventory capacity, ② generate an order, ③ pay, ④ update the order status, ⑤ send the message of successful purchase, ⑥ update the status of goods express delivery. In the early stage, we can make these services synchronous execution, but in the later stage, in order to improve efficiency, we can separate the tasks that need to be executed immediately and tasks that can be executed slightly slowly, such as ⑤ sending the message of successful purchase ⑥ updating the status of goods express delivery, and different execution can be considered. After the execution of the main process, these slower-running transactions can be determined to have executed by sending a message to MQ, ensuring that the flow ends first. Other business can then be performed asynchronously by pulling MQ messages or MQ active push.

1.2.1.2 Peak cutting and valley filling

For example, to send an announcement message with a read and unread identifier, you need to write such a message for each user, such as storing it in MongoDB. Even MongoDB cannot support instantaneous writing of millions or tens of millions of records, so you can consider using message queues. For example, we can use asynchronous multithreading on a Java backend system to send messages to message queue MQ so that the Web system can publish bulletin messages without the normal CRUD operations of the database. The system messages are stored in the message queue, which we use for peak clipping, and the system messages are ultimately stored in the database. So we can design this, when the user logs in the system, using asynchronous threads from message queue MQ, receive the user’s system message, and then store the system message in the database, and finally the message in message queue MQ automatically deleted. The task of writing messages to the database becomes an off-peak write because of off-peak logins by the user.

1.3 What is RabbitMQ

RabbitMQ is an open source message queue system written in Erlang and compliant with the AMQP protocol. It supports multiple clients (languages) and is used to store and forward messages in distributed systems. RabbitMQ is highly available, scalable, and easy to use.

For more details, please visit the official website:

  • www.rabbitmq.com/

Anyway, this is a common message queue, and its features will be explained in detail, starting with the getting started download and install section, and then using it.

2. Download and install

Generally speaking, there are manual installation and Docker installation. In most scenarios, Docker installation will be used. But as a learning stage, if you are not in a hurry, it is not a bad thing to learn manual installation.

Note: Both cloud servers and virtual machines can be used. The Linux version in the demo is CentOS 7.9

2.1 Manual Installation

2.1.1 Downloading the installation process

Note: you can install the file directly through yum in Linux. Here you choose to download the file on your Own Windows host first, and then upload it to Linux through FTP. You can avoid some of the virtual machine download problems caused by the network.

  1. First open the download directory on the official website, and then select the version based on your Linux version.

    • Address: www.rabbitmq.com/download.ht…
  1. Since RabbitMQ is written in Erlang, you will need to provide an Erlang environment and then download Erlang.

    • Address:www.erlang-solutions.com/downloads
      • A: This site is very slow to access, please be patient, or need to hang up A ladder
      • B: The Erlang version needs to match RabbitMQ (as shown, with minimum and maximum version restrictions)
        • Version view address: www.rabbitmq.com/which-erlan…
        • RabbitMQ 3.8.14 and Erlang 23.2.3 have been selected
[root@centos7 rabbitmq]# ls esl-erlang_23.2.3-1_centos_7_amd64. RPM rabbitmq-server-3.8.14-1. El7.noarch. RPM [root@centos7  rabbitmq]# pwd /usr/local/bin/rabbitmqCopy the code
  1. Install Erlang, Socat and RabbitMQ
    • Erlang and Socat are both dependent on RabbitMQ
#Install Erlang. After installation, run erl -v to display the version numberThe RPM - the ivh esl - erlang_23. 2.3 1 _centos_7_amd64. RPM
#Instead of downloading the source file, you can install Socat online via yum because it is not large
yum install -y socat

#Install the RabbitMQThe RPM - the ivh the rabbitmq server - 3.8.14-1. El7. Noarch. RPMCopy the code
  1. After the installation is complete, start the service and check whether RabbitMQ can be started successfully
#Start the service
systemctl start rabbitmq-server
#Boot from the rev.
systemctl enable rabbitmq-server
#Stop the service
systemctl stop rabbitmq-server
#Viewing Service Status
systemctl status rabbitmq-server.service
Copy the code

As shown in the figure, the installation is successful

If the installation is incorrect, troubleshoot the fault as follows:

  • Linux RabbitMQ installation problem handling
  • Rabbitmq ERROR: epMD ERROR for host deb: address (cannot connect to host/port

2.1.2 Configuring Web UI Management

The above installation is actually over, but RabbitMQ provides a web-based management interface, which is not available by default and needs to be installed.

  1. Install the Web administration plug-in, and restart the service
#Install command
rabbitmq-plugins enable rabbitmq_management

#Restart the service
systemctl restart rabbitmq-server
Copy the code
  1. Make sure port 15672 of the Linux firewall is open, otherwise it will not be accessible. You can even query commands to turn the firewall off during the learning phase
    • The corresponding server (Ali cloud, Tencent cloud, etc.) is to open 15672 ports in the security group
    • Access Linux IP:15672, for examplehttp://192.168.122.1:15672
#Check whether 15672 is enabled. The default value is No
firewall-cmd --query-port=15672/tcp
#Open the specified port 15672 
firewall-cmd --add-port=15672/tcp --permanent
#Reload the
firewall-cmd --reload
#Query again, and the result is yes
firewall-cmd --query-port=15672/tcp
Copy the code
  1. Example Add a remote login account

    • RabbitMQ has a default username and password guest that can only be accessed under localhost
#New user The user name and password are admin
rabbitmqctl add_user admin admin
Copy the code
  1. Add permissions for remote login accounts
    • Administrator: Log in to the console and view all information, operation users, and operation policies
    • Monitoring: Log in to the console and view all information
    • Policymaker: Log in to the console and specify policies
    • Managment (common administrator) : Log in to the console
#Set the assignment permission of the admin user to administrator
rabbitmqctl set_user_tags admin administrator
Copy the code
  1. Add resource permissions for users
    • Because admin is already the super administrator, so in fact, you can not assign resource permissions, will do by default.
#The command format is: set_permissions [-p <vhostpath>] <user> <conf> <write> <read>
#The configuration file and read/write permissions are enabled for the admin user
rabbitmqctl set_permissions -p / admin ".*"".*"".*"
Copy the code
  1. Access Linux IP:15672, for examplehttp://192.168.122.1:15672Enter the user name and password admin
    • As shown in the figure, the access is successful

2.1.2.1 Command Summary

  1. Adding a user:rabbitmqctl add_user <username> <password>
  2. Change password:rabbitmqctl change_password <username> <newpass>
  3. Delete a user:rabbitmqctl delete_user <username>
  4. User List:rabbitmqctl list_users
  5. Set user roles:rabbitmqctl set_user_tags <username> <tag1,tag2>
  6. Delete all roles of a user:rabbitmqctl set_user_tags <username>
  7. Add resource permissions to users:set_permissions [-p <vhostpath>] <user> <conf> <write> <read>

Use: Enter rabbitmqctl to prompt for possible commands, then use rabbitmqctl hepl < command > to view the specific command usage and parameters.

2.1.3 This section describes Web UI management

  • Connections: This is used to manage producers and consumers once they are connected to RabbitMQ
  • Channels: After a connection is established, Channels are formed, and message delivery and acquisition depend on Channels.
  • Exchanges: Used to route messages
  • Queues: Queues where messages are stored, waiting to be consumed and then removed from the queue.
  • Admin: used to set management users and their permissions, as shown in the following figure

Tags are used to specify the role of the user

  • Administrator: Log in to the console and view all information, operation users, and operation policies
  • Monitoring: Log in to the console and view all information
  • Policymaker: Log in to the console and specify policies
  • Managment (common administrator) : Log in to the console

2.2 Docker installation

Install RabbitMQ in Docker without having to worry about version, environment, etc. It is very convenient. The vm I showed you was a CentOS 7.9 bare machine, so we updated yum, Install Docker and RabbitMQ

2.2.1 configuration yum

  1. Update yum to the latest version
#Update the yum!
yum update

#Yum-utils provides yum-config-manager functionality, the latter two being used by Devicemapper
yum install -y yum-utils device-mapper-persistent-data lvm2
Copy the code
  1. Set the yum source to Ali Cloud
yum-config-manager --add-repo http://mirrors.aliyun.com/docker-ce/linux/centos/docker-ce.repo
Copy the code

2.2.2 installation docker

2.2.2.1 steps

  1. Install Docker with yum
    • Docker-ce stands for Community edition, EE stands for enterprise edition
yum install docker-ce -y
Copy the code
  1. Check whether the installation is successful by viewing the version
docker -v
Copy the code
  1. Docker image acceleration (< your ID > = your own)
sudo mkdir -p /etc/docker sudo tee /etc/docker/daemon.json <<-'EOF' { "registry-mirrors": ["https://< your ID>.mirror.aliyuncs.com"]} EOF sudo systemctl daemon-reload sudo systemctl restart dockerCopy the code
  • Sometimes it is difficult to pull the image from DockerHub in China, so you can configure the image accelerator. Docker official and many domestic cloud service providers provide domestic accelerator services, such as:

    • Mirror: at hkust docker.mirrors.ustc.edu.cn/
    • Netease: hub-mirror.c.163.com/
    • Aliyun: https://< your ID>.mirror.aliyuncs.com
    • Seven niuyun accelerator: reg-mirror.qiniu.com

    After an accelerator address is configured, if the mirror cannot be drawn, switch to another accelerator address. All major cloud service providers in China provide Docker image acceleration service. It is recommended to select the corresponding image acceleration service according to the cloud platform running Docker.

    Address for obtaining Aliyun image: cr.console.aliyun.com/cn-hangzhou…

2.2.2.2 Common Docker Commands

2.2.2.2.1 Managing Commands

  • Start, stop, restart these simple commands can also use service, systemctl is a little more powerful
#Start the docker
systemctl docker start
#Stop the docker
systemctl docker stop
#Restart the docker
systemctl docker restart
#Check the docker status
systemctl status docker
#Boot from the rev.
systemctl enable docker
systemctl unenable docker
Copy the code

2.2.2.2.2 Mirroring Commands

#Importing an Image File
docker load < xxx.tar.gz
#View the installed image
docker images
#Remove the mirrorDocker rmI image nameCopy the code

2.2.3 Installing RabbitMQ (Optional)

Note: it would be better to use 2.2.3.2 for installation

2.2.3.1 Step by step installation

  1. Get the RabbitMQ image
docker pull rabbitmq:management
Copy the code
  1. Create and run the container (the parameters are described in 3)
Docker run-id --name Specifies the container name. -p 15672:15672 -p 5672:5672 RabbitMQ :managementCopy the code

2.2.3.2 One-sentence Installation

There will be a problem with creating the RabbitMQ image because we will install management, which is the web management for the RabbitMQ image. If we do not install the RabbitMQ image, there will be no users. The Docker Hub has given us an example of how to configure the user name and password using RABBITMQ_DEFAULT_USER and RABBITMQ_DEFAULT_PASS

For more information, please refer to the section of Setting Default User and password in the official Docker Hub example

registry.hub.docker.com/_/rabbitmq/

  1. Perform the installation
docker run -di --name myrabbitmq -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin -p 15672:15672 -p 5672:5672 -p 25672:25672 -p 61613:61613 -p 1883:1883 rabbitmq:management
Copy the code
  1. You can view the container status to check whether the container is running successfully
#View the container running status
docker ps -a
#Start theDocker start Specifies the container name#stopDocker stop container name#Exit the command line without stopping
exit
#Enter the Node container (if -t is enabled)Docker exec -it container name bashCopy the code

2.2.3.2.1 Parameter Description

The following are the descriptions of these parameters:

  • -i: running a container.

  • -t: indicates that the interaction mode (command line) is reserved for the container, that is, assigning a dummy terminal. So you’ll often see it.

  • –name: Name the container.

  • -v: indicates directory mapping (the former is a host directory, and the latter is a directory mapped to the host). You can use multiple -v to map multiple directories or files. Note: It is recommended to do directory mapping, make changes on the host, and then share to the container.

  • -d: indicates that a daemon container is created and run in the background (in this way, the container will not be automatically logged in after the container is created. If only two parameters of -i -t are added, the container will be automatically logged in after the container is created), that is, the back-end suspended operation.

  • -p: indicates port mapping. The former is a host port, and the latter is a mapped port in the container. Multiple -p can be used to perform multiple port mappings. Only port mappings can be performed to access external devices.

To give you an example:

#Create container, map container port 3000 to host port 3000, map /demo to host /demo face is a ready-made image I downloaded
docker run -d -it -p 3000:3000 -v /demo:/demo --name node face

#For example, if you have a Python program that needs to be executed in the image named Node, you can run the following command to go to the command line you just assigned to execute the program
docker exec -it node bash
Copy the code
  • Because -t is used, it is possible to assign a dummy terminal to the command line through the docker exec-it container name bash

  • After the -v directory is mapped, when you enter the container, you also have an identical demo folder where you can execute Python programs, for example

2.2.3.2.1 Ports Description

4369: Erlang found port

5672: Client communication port

15672: Management interface UI port

25672: Internal communication port between servers

61613: STOMP client without TLS and with TLS

1883: MQTT client for TLS is disabled and enabled

The key ones are 5672 and 15672

More details about the port can be found in the official documentation

  • www.rabbitmq.com/networking….

Note: If you want to access the remote connection, such as port 15672 for accessing the Web management page, or port 5672 for connecting the Java client, you must perform an open operation, otherwise the connection cannot be made.

  • The following is an example of opening port 15672 based on CentOS 7.9
#Check whether 15672 is enabled. The default value is No
firewall-cmd --query-port=15672/tcp
#Open the specified port 15672 
firewall-cmd --add-port=15672/tcp --permanent
#Reload the
firewall-cmd --reload
#Query again, and the result is yes
firewall-cmd --query-port=15672/tcp
Copy the code
  • Here are the commands to turn off the firewall
systemctl disable firewalld
systemctl stop firewalld   
Copy the code

3. RabbitMQ protocol and model

Once installed, it is time to move on to the various ways to implement RabbitMQ using Java or Springboot code, but understanding the protocol and architectural model is necessary to understand these routing and switching methods.

3.1 protocol

3.1.1 What is a Protocol?

Protocol, short for network protocol, is a set of conventions that both sides of a communicating computer must follow. How to establish a connection, how to recognize each other, etc. Only by following this convention can computers communicate with each other. Its three elements are: syntax, semantics and timing.

In order for data to travel from source to destination over the network, participants in the network communication must follow the same rules, called protocols, which are ultimately reflected in the format of packets transmitted over the network.

3.1.1.1 Three elements of network protocol

  1. Syntax: The structure and format of data and control information, and the order in which the data appears.
  2. Semantics: Explains the meaning of each part of the control message and specifies what control message needs to be sent and what response to the completed action.
  3. Sequence: A detailed description of the order in which events occur.

These three elements are graphically described as: what to do, how to do it, and in what order.

Take the HTTP protocol

Syntax: HTTP defines the format semantics of request messages and response messages. When a client initiates a request, it is called a request, and the server then returns data, which is called response timing. Each request corresponds to a response, and there is a request before a response

3.1.1.1.1 Interview question: Why does message-oriented middleware not use HTTP directly

For a message middleware, its main responsibility is responsible for data transfer, storage, distribution, high performance and simple is what we are after, the HTTP request message and reply message header is more complex, containing the Cookie, data encryption, windowsill, additional functions such as the response code, we don’t need so complicated function.

At the same time, HTTP is mostly short links. In the actual interaction process, a request to the response is likely to be interrupted. After the interruption, persistence will not be implemented, which will cause the loss of the request. This is not conducive to the business scenario of message-oriented middleware, because message-oriented middleware may be a long-term process of obtaining information, and data or messages need to be persisted when problems or failures occur, so as to ensure the highly reliable and robust operation of messages and data

3.1.2 AMQP protocol for RabbitMQ

RabbitMQ uses the Advanced Message Queuing Protocol (AMQP). It was first proposed in 2003 to solve the problem of messaging between different financial platforms.

AMQP is more accurately described as a binary wire-level protocol. This is an essential difference from JMS. AMQP does not qualify from the API layer, but directly defines the data format for network exchange. This makes providers (Producer) that implement AMQP naturally cross-platform.

Compared with other message protocols, its characteristics are:

  1. Distributed transaction support
  2. Persistent support for messages
  3. High performance and high reliability message processing benefits

3.1.3 Architectural model

The model diagram must be understood in order to learn the specific sending modes of the following messages, because these modes represent varying degrees of selection and reduction of the model

  • Connection: Network Connection between an application and a Broker.

  • Channel: A Channel is a Channel for information transmission. Multiple channels can be established, each representing a session task.

    • A channel is a virtual connection established within a TCP connection, through which the read and write of information is transmitted. Because it is very expensive for the operating system to establish and destroy TCP, the concept of a channel is introduced to reuse a TCP connection.
  • Broker(Server) : Identifies the message queue Server entity, for example, RabbitMQ Server.

  • Virtual Host: A Virtual Host. Multiple Virtual hosts can be set in a Broker to be used as permission isolation for different users.

    • A Broker can be understood as an entire database service, and a Virtual Host is the feel of each database. Different projects can correspond to different databases, with business tables to which the project belongs, and so on.
    • Each Virtual Host can have several exchanges and queues.
  • Exchange: a switch that receives messages sent by producers and then sends them to queues based on routing keys.

  • Binding: Virtual link between exchanges and queues. Binding can contain multiple Routing keys.

  • Routing key: A Routing rule that the virtual machine uses to determine how to route a particular message.

  • Queue: A message Queue. It is a container for messages. Each message can be sent to one or more queues to be consumed by consumers.

  • Consumer: The Consumer of the message (the program that receives the message).

4. Java implements RabbitMQ

4.1 Environment Construction

Website to introduce several kinds of models: www.rabbitmq.com/getstarted….

Up to now, the official website has provided a total of 7 models. We mainly introduce the first five basic modes. Some people classify Direct and Topic modes into Routing modes, which can also be regarded as four types.

4.1.1 Creating a Java Project

Start by creating a Maven project without a skeleton, then introduce RabbitMQ dependencies and unit test dependencies

<dependency>
   <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.10.0</version>
</dependency>
<dependency>
    <groupId>junit</groupId>
    <artifactId>junit</artifactId>
    <version>4.11</version>
</dependency>
Copy the code

4.1.2 (Optional) Creating a Virtual Host

Here, we create a new Virtual Hosts to serve the Java project. You can also create a new user and enable access to this Virtual Hosts (that is, bind the Virtual host to the user). We’ll use admin (the admin user I created earlier) for the demo.

4.1.3 Creating a Connection Utility Class

Since we are going to show multiple examples later, and every time we acquire a connection, release a connection, close a resource, etc., the code is the same, to prevent code redundancy, optimize the code, make it easier to understand, extract a utility class, so that you can focus on the comparison of different implementations.

  • RabbitMqUtil tools
public class RabbitMqUtil {
    /** * The host name is the Linux IP address */
    private static String host = "";
    /** * The default port number for client access is 5672 */
    private static int port = 0;
    /** * Virtual host can be set to default/or create a specified virtual host */
    private static String virtualHost = "";
    /** * User name */
    private static String username = "";
    /** * Password */
    private static String password = "";

    // Assign a value to the Properties object using a static code block
    static {
        try {
            // instantiate the object
            Properties properties = new Properties();
            // Get the stream object of the properties file
            InputStream in = RabbitMqUtil.class.getClassLoader().getResourceAsStream("rabbitmq.properties");
            properties.load(in);
            // Get values, respectively
            host = properties.getProperty("host");
            port = Integer.parseInt(properties.getProperty("port"));
            virtualHost = properties.getProperty("virtualHost");
            username = properties.getProperty("username");
            password = properties.getProperty("password");

        } catch(Exception e) { e.printStackTrace(); }}/** * get connection **@returnConnect * /
    public static Connection getConnection(a) {
        try {
            // Create a connection factory
            ConnectionFactory connectionFactory = new ConnectionFactory();
            // Set up the connection to the RabbitMQ host
            connectionFactory.setHost(host);
            // Set the port number
            connectionFactory.setPort(port);
            // Set up the connected virtual host (database feel)
            connectionFactory.setVirtualHost(virtualHost);
            // Set the user name and password for accessing the virtual host
            connectionFactory.setUsername(username);
            connectionFactory.setPassword(password);
            // Return a new connection
            return connectionFactory.newConnection();
        } catch (Exception e) {
            e.printStackTrace();
        }
        return null;
    }

    /** * Close channels and release connections **@param channel    channel
     * @param connection connection
     */
    public static void close(Channel channel, Connection connection) {
        try {
            if(channel ! =null) {
                channel.close();
            }
            if(connection ! =null) { connection.close(); }}catch(Exception e) { e.printStackTrace(); }}}Copy the code
  • properties
host=192.168.122.1
port=5672
virtualHost=/rabbitmq_maven_01
username=admin
password=admin
Copy the code

4.2 Five implementation modes

Description:

  • Queue name, message and so on string content, it is recommended to define variables in the input, I am writing directly in the parameter, this magic value writing method, is not very beautiful.
  • Junit unit tests are used in producers, but written in the main function in consumers, because we expect consumers to be in a state of constant running wait, which would result in the program ending after execution.
    • In addition to writing in main, consider using sleep wait or while(true) to keep the program from terminating.

4.2.1 Simple Queue Mode (Hello Word)

  • Queue: message queue, understood as a container to which producers send messages, which are stored for consumption by consumers.
  • Consumer: consumer of the message (the program that receives the message)

4.2.1.1 What Can I Understand

As shown in the diagram, the simple queue pattern, one producer, passes through one queue, corresponding to one consumer. It can be regarded as a point-to-point transmission mode. Compared with the model diagram in 3.1.3, the most important feature is that Exchange (switch) and Routekey (routing key) cannot be seen. Just because this mode is simple, it does not involve complex conditional distribution, etc. Therefore, there is no need for users to explicitly consider the switch and routing keys.

  • Note, however, that instead of the producer directly connecting to the queue, the default switch is used. The default switch will send messages to the queue with the same name as Routekey, which is why we put the queue name in the routekey position in the following code

4.2.1.2 Code implementation

4.2.1.2.1 Producer code

public class Producer {
    @Test
    public void sendMessage(a) throws IOException, TimeoutException {
        // Get the connection from the utility class
        Connection connection = RabbitMqUtil.getConnection();
        // Get the connection channel
        Channel channel = connection.createChannel();
        // Channels bind message queues
        channel.queueDeclare("queue1".false.false.false.null);
        // Publish the message
        channel.basicPublish(""."queue1".null."This is rabbitmq message 001 !".getBytes());
        // Close the channel and release the connection through the toolRabbitMqUtil.close(channel,connection); }}Copy the code
  1. Get the connection through the utility class
  2. Access to the connection channel: According to the model diagram in 3.1.3, the producer needs to obtain the channel after obtaining the connection to access the subsequent switch queue, etc.
  3. Channels bind message queues: Before binding queues, you should bind switches. However, in this mode, the concept of switches is hidden and the default switch is used. Therefore, queues are directly bound.
    • QueueDeclare method explanation
      • Parameter 1: Queue (queue name). If the queue does not exist, it will be created automatically.
      • Parameter 2: durable (Whether the queue is durable). Persistence ensures that the queue persists after the server restarts.
      • Parameter 3: EXCLUSIVE specifies whether the queue is exclusive. If true, the queue is visible only to the first connection that claims it and is automatically deleted when the connection is disconnected.
      • Parameter 4: autoDelete, which automatically deletes the queue after the last consumer has consumed the message.
      • Parameter 5: Arguments (with additional attributes).
  4. news: You can specify the sending method and content of the message queue. Because this mode is relatively simple, it does not involve all parameters. The following mode will explain in detail
    • BasicPublish method explanation
      • Parameter 1: Exchange (switch name).
      • Parameter 2: routingKey, where the queue name is specified. The message is sent to a queue with the same name as routekey.
      • Parameter 3: props (control state of the message), where you can control the persistence of the message.
        • Parameters as follows: MessageProperties PERSISTENT_TEXT_PLAIN
      • Parameter 4: body, whose type is an array of bytes to be typed.
  5. Close a channel and release a connection with a tool: close the channel, then release the connection.

4.2.1.2.2 Consumer code

public class Consumer {
    public static void main(String[] args) throws IOException, TimeoutException{
        // Get the connection from the utility class
        Connection connection = RabbitMqUtil.getConnection();
        // Get the connection channel
        Channel channel = connection.createChannel();
        // Channels bind message queues
        channel.queueDeclare("queue1".false.false.false.null);
        // Consume messages
        channel.basicConsume("queue1".true.new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("new String(body): " + newString(body)); }}); }}Copy the code
  1. Get the connection through the utility class

  2. Get connection channel

  3. Channels bind message queues

  4. Consume messages: This is used to specify which queue messages are consumed, as well as some mechanisms and callbacks

    • BasicConsume method explained
      • Parameter 1: Queue (queue name), which queue messages are consumed.
      • Parameter 2: autoAck The automatic acknowledgement mechanism that starts a message and removes it from the queue as soon as it is consumed.
      • Parameter 3: callback (Consumer callback interface). Callback is of type Consumer. DefaultConsumer is an implementation class of Consumer. Rewrite the handleDelivery method to obtain the data content of consumption. The body is mainly used here, that is, to view the message body. The other three parameters are not available for the time being, so you can print them out if you are interested, so that you can have a general understanding.

4.2.2 Work Queue Mode

  • Producer: Producers of messages (programs that send messages).

  • Queue: Message Queue, understood as a container to which producers send messages, which are stored for consumption by consumers.

  • Consumer: The Consumer of the message (the program that receives the message).

    • Here, we assume that Consumer1, Consumer2, and Consumer3 are consumers with different task speeds, respectively, which leads to an important problem with this pattern.

4.2.2.1 What Can I Understand

As can be seen from the figure, the working mode is to add multiple consumers on the basis of the simple queue mode, that is, to bind multiple consumers to the same queue and consume together, which can solve the message accumulation phenomenon in the simple queue mode if the production speed is much faster than the consumption speed.

  • Because messages disappear after they are consumed, you don’t have to worry about repeating the task.

4.2.2.2 Code implementation

Note: There are two work queue modes

  1. Polling mode: Each consumer shares the message equally
  2. Equitable distribution model: distribution by capacity, distribution of fast processing more, less distribution of slow processing

We first demonstrate the polling pattern, which, based on its shortcomings, leads to the fair distribution pattern

The following describes only the parts that differ from the above. In simple mode, these basic methods are covered

4.2.2.2.1 Polling mode – Producer code

public class Producer {
    @Test
    public void sendMessage(a) throws IOException, TimeoutException {
        // Get the connection from the utility class
        Connection connection = RabbitMqUtil.getConnection();
        // Get the connection channel
        Channel channel = connection.createChannel();
        // Channels bind message queues
        channel.queueDeclare("work".true.false.false.null);
        for (int i = 1; i <= 20; i++) {
            // Publish the message
            channel.basicPublish(""."work".null, (i + "Message").getBytes());
        }
        // Close the channel and release the connection through the toolRabbitMqUtil.close(channel, connection); }}Copy the code

The process is basically the same as the simple queue pattern, with a few minor changes. In the producers, the main thing is to add a layer loop, because there are multiple consumers, so more messages are sent, and you can see some characteristics and problems.

4.2.2.2.2 Polling mode – consumer code

  • Consumer 1
public class Consumer1 {
    public static void main(String[] args) throws IOException {
        // Get the connection from the utility class
        Connection connection = RabbitMqUtil.getConnection();
        // Get the connection channel
        final Channel channel = connection.createChannel();
        // Channels bind message queues
        channel.queueDeclare("work".true.false.false.null);
        // Consume messages
        channel.basicConsume("work".true.new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("Consumer 1: Consumer -" + newString(body)); }}); }}Copy the code
  • Consumer 2
public class Consumer2 {
    public static void main(String[] args) throws IOException {
        // Get the connection from the utility class
        Connection connection = RabbitMqUtil.getConnection();
        // Get the connection channel
        final Channel channel = connection.createChannel();
        // Channels bind message queues
        channel.queueDeclare("work".true.false.false.null);
        // Consume messages
        channel.basicConsume("work".true.new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("Consumer 2: Consumer -" + newString(body)); }}); }Copy the code

Both consumers above start automatic Ack responses in basicConsume, which will be detailed below. Meanwhile, statements of sleep 2S are added in Consumer 1 to simulate the scenario where consumer 1 processes messages slowly and consumer 2 processes messages quickly.

Running results:

  • Consumer1
consumers1No. : Consumption -1Digital message consumer1No. : Consumption -3Digital message consumer1No. : Consumption -5Digital message consumer1No. : Consumption -7Digital message consumer1No. : Consumption -9Digital message consumer1No. : Consumption -11Digital message consumer1No. : Consumption -13Digital message consumer1No. : Consumption -15Digital message consumer1No. : Consumption -17Digital message consumer1No. : Consumption -19No messageCopy the code
  • Consumer2
consumers2No. : Consumption -2Digital message consumer2No. : Consumption -4Digital message consumer2No. : Consumption -6Digital message consumer2No. : Consumption -8Digital message consumer2No. : Consumption -10Digital message consumer2No. : Consumption -12Digital message consumer2No. : Consumption -14Digital message consumer2No. : Consumption -16Digital message consumer2No. : Consumption -18Digital message consumer2No. : Consumption -20No messageCopy the code

Observe the execution: Found two consumers while everyone respective dealt with half of the last message, and it is a distribution according to one person, but consumers processing speed is fast, 2 immediately after all done, but consumer 1, every process need 2 s, so can only slow processing, and consumers. 2 is in the midst of an idle waste.

How do you switch to fair distribution?

This is related to the second parameter in basicConsume, to enable auto-confirmation consume, which defaults to true, meaning that as soon as I receive a message to the consumer in the queue, I will automatically return an acknowledgement consume, and the queue will automatically delete the message in the queue.

  • However, there is a very important problem in this way, which is to hand over the risk to the consumer. For example, the consumer receives 10 messages that he needs to process, consumes 4 messages, breaks down, hangs up, and loses the last 6 messages.

If you want to change to a merit-based allocation, there are two main points

  1. Set the channel to consume only one message at a time

  2. Disable automatic confirmation of messages and confirm messages manually

4.2.2.2.3 Fair distribution mode – Producer code

public class Producer {
    @Test
    public void sendMessage(a) throws IOException, TimeoutException {
        // Get the connection from the utility class
        Connection connection = RabbitMqUtil.getConnection();
        // Get the connection channel
        Channel channel = connection.createChannel();
        // Send only one message at a time
        channel.basicQos(1);
        // Channels bind message queues
        channel.queueDeclare("work".true.false.false.null);
        for (int i = 1; i <= 20; i++) {
            // Publish the message
            channel.basicPublish(""."work".null, (i + "Message").getBytes());
        }
        // Close the channel and release the connection through the tool
        RabbitMqUtil.close(channel, connection);
    }
Copy the code

4.2.2.2.4 Fair distribution model – consumer code

  • Consumer 1
public class Consumer1 {
    public static void main(String[] args) throws IOException {
        // Get the connection from the utility class
        Connection connection = RabbitMqUtil.getConnection();
        // Get the connection channel
        final Channel channel = connection.createChannel();
        Accept only one unconfirmed message at a time
        channel.basicQos(1);
        // Channels bind message queues
        channel.queueDeclare("work".true.false.false.null);
        // Consume messages
        channel.basicConsume("work".false.new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("Consumer 1: Consumer -" + new String(body));
                // Returning deliveryTag means the queue can remove the message
                channel.basicAck(envelope.getDeliveryTag(), false); }}); }}Copy the code
  • Consumer 2
public class Consumer2 {
    public static void main(String[] args) throws IOException {
        // Get the connection from the utility class
        Connection connection = RabbitMqUtil.getConnection();
        // Get the connection channel
        final Channel channel = connection.createChannel();
        // Step 1: Accept only one unconfirmed message at a time
        channel.basicQos(1);
        // Channels bind message queues
        channel.queueDeclare("work".true.false.false.null);
        // Consume messages
        channel.basicConsume("work".false.new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("Consumer 2: Consumer -" + new String(body));
                channel.basicAck(envelope.getDeliveryTag(), false); }}); }Copy the code

Running results:

  • Consumer1
consumers1No. : Consumption -1No messageCopy the code
  • Consumer2
consumers2No. : Consumption -2Digital message consumer2No. : Consumption -3Digital message consumer2No. : Consumption -4Digital message consumer2No. : Consumption -5Digital message consumer2No. : Consumption -6Digital message consumer2No. : Consumption -7Digital message consumer2No. : Consumption -8Digital message consumer2No. : Consumption -9Digital message consumer2No. : Consumption -10Digital message consumer2No. : Consumption -11Digital message consumer2No. : Consumption -12Digital message consumer2No. : Consumption -13Digital message consumer2No. : Consumption -14Digital message consumer2No. : Consumption -15Digital message consumer2No. : Consumption -16Digital message consumer2No. : Consumption -17Digital message consumer2No. : Consumption -18Digital message consumer2No. : Consumption -19Digital message consumer2No. : Consumption -20No messageCopy the code

4.2.3 Publish and Subscribe Mode (Fanout Broadcast)

  • Producer: The producer of the message (the program that sends the message).
  • Exchange: switch that sends messages to a specified queue.
  • Queue: message queue, understood as a container to which producers send messages, which are stored for consumption by consumers.
  • Consumer: consumer of the message (the program that receives the message)

4.2.3.1 How to Understand

Fanout literal translation for “fan out” but everyone more will call it a broadcast or publish and subscribe, it is a kind of no routing key mode, the producer sends a message to exchange, exchange opportunities to copy all messages to all with its binding on a queue, and each queue can have only one consumer get the message, If multiple channels are created within a consumer connection, the result is a scramble for messages.

4.2.3.2 Code implementation

Note: The following describes only the parts that differ from the above. In simple mode, these basic methods are covered

4.2.3.2.1 Producer code

public class Producer {
    @Test
    public void sendMessage(a) throws IOException, TimeoutException {
        // Get the connection from the utility class
        Connection connection = RabbitMqUtil.getConnection();
        // Get the connection channel
        final Channel channel = connection.createChannel();
        // Declare a switch
        channel.exchangeDeclare("order"."fanout");
        for (int i = 1; i <= 20; i++) {
            // Publish the message
            channel.basicPublish("order"."".null."fanout!".getBytes());
        }
        // Close the channel and release the connection through the toolRabbitMqUtil.close(channel, connection); }}Copy the code
  1. Declaring a switch

    • ExchangeDeclare Method description
      • Parameter 1: Exchange (switch name). If a switch does not exist, it will be created automatically
      • Parameter 2: type, select fanout mode
  2. Publish message: Enter the name of the switch defined above in the first argument of the basicPublish method, and the routing key is empty in the second argument

    • The loop of 20 is to demonstrate the consumer

4.2.3.2.2 Consumer code

  • Consumer 1
public class Consumer1 {
    public static void main(String[] args) throws IOException {
        // Get the connection from the utility class
        Connection connection = RabbitMqUtil.getConnection();
        Channel channel = connection.createChannel();
        // Declare a switch
        channel.exchangeDeclare("order"."fanout");
        // Create temporary queues
        String queue = channel.queueDeclare().getQueue();
        // Bind temporary queues and switches
        channel.queueBind(queue, "order"."");
        // Consume messages
        channel.basicConsume(queue, true.new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("Consumer 1: Consumer -" + newString(body)); }}); }}Copy the code
  1. Declaring a switch
  2. Create temporary queues
  3. Bind temporary queues and switches
    • QueueBind method explanation
      • Parameter 1: Queue (temporary queue)
      • Parameter 2: Exchange
      • Parameter 3: routingKey
  • Consumer 2: Demonstrates multiple channels in a single connection
public class Consumer2 {
    public static void main(String[] args) throws IOException {
       // Get the connection from the utility class
        Connection connection = RabbitMqUtil.getConnection();
        
        // Get the connection channel
        Channel channel = connection.createChannel();
        Channel channel2 = connection.createChannel();
        
        // Declare a switch
        channel.exchangeDeclare("order"."fanout");
        channel2.exchangeDeclare("order"."fanout");
        
        // Create temporary queues
        String queue = channel.queueDeclare().getQueue();
        System.out.println(queue);
        
        // Bind temporary queues and switches
        channel.queueBind(queue, "order"."");
        channel2.queueBind(queue, "order"."");
        
        // Consume messages
        channel.basicConsume(queue, true.new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("Consumer 2: Consumer -" + newString(body)); }});// Consume messages
        channel2.basicConsume(queue, true.new DefaultConsumer(channel2) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("Consumer 2-2: Consumer -" + newString(body)); }}); }}Copy the code

Running results:

consumers2No. : Consumption-fanout! consumers2No. : Consumption-fanout! consumers2-2No. : Consumption-fanout! consumers2No. : Consumption-fanout! consumers2No. : Consumption-fanout! consumers2No. : Consumption-fanout! consumers2No. : Consumption-fanout! consumers2No. : Consumption-fanout! consumers2No. : Consumption-fanout! consumers2No. : Consumption-fanout! consumers2No. : Consumption-fanout! consumers2-2No. : Consumption-fanout! consumers2-2No. : Consumption-fanout! consumers2-2No. : Consumption-fanout! consumers2-2No. : Consumption-fanout! consumers2-2No. : Consumption-fanout! consumers2-2No. : Consumption-fanout! consumers2-2No. : Consumption-fanout! consumers2-2No. : Consumption-fanout! consumers2-2No. : Consumption-fanout!Copy the code

4.2.3.2.3 Why Are Switches Also declared among Consumers?

As can be seen from the above code, both Producer and Conusmer declare switches respectively. However, as can be seen from the figure, consumers do not have direct contact with switches. Why do consumers also declare switches?

This is to ensure that when a Producer or Producer executes, it never fails because the switch has not yet been declared. For example, if you only declare the switch in the Producer, you must start the Producer first. If you execute Conusmer directly, the switch does not yet exist and an error is reported. A full write declaration guarantees that whoever is started first will be declared to the switch.

4.2.4 Routing Mode (Routing/Direct)

  • Producer: The producer of the message (the program that sends the message).
  • Exchange: switch that sends messages to a specified queue.
  • routingKey: Routing keys (key1, key2, etc.) add another layer of restriction between the switch and the queue
  • Queue: message queue, understood as a container to which producers send messages, which are stored for consumption by consumers.
  • Consumer: consumer of the message (the program that receives the message)

4.2.4.1 What Can I Understand

The switch type in routing mode is Direct. Compared with fanout mode, routing keys are used. The producer sends a message to the switch carrying the specified routingKey, which the switch takes to find the queue bound to the routingKey, and then sends it to the queue. A queue can bind multiple Routingkeys.

4.2.4.2 Code implementation

4.2.4.2.1 Producer code

public class Producer {
    @Test
    public void sendMessage(a) throws IOException, TimeoutException {
        // Get the connection from the utility class
        Connection connection = RabbitMqUtil.getConnection();
        // Get the connection channel
        Channel channel = connection.createChannel();
        // Declare a switch
        channel.exchangeDeclare("order_direct"."direct");
        / / specified routingKey
        String key = "info";
        // Publish the message
        channel.basicPublish("order_direct", key, null, ("Send to specified route" + key + "The news").getBytes());
        // Close the channel and release the connection through the toolRabbitMqUtil.close(channel, connection); }}Copy the code
  1. Specify the routingKey, that is, in the second argument to the basicPublish method, the value of the key

4.2.4.2.2 Consumer code

  • Consumer 1
public class Consumer1 {
    public static void main(String[] args) throws IOException {
        // Get the connection from the utility class
        Connection connection = RabbitMqUtil.getConnection();
        Channel channel = connection.createChannel();
        // Declare a switch
        channel.exchangeDeclare("order_direct"."direct");
        // Get the temporary queue
        String queue = channel.queueDeclare().getQueue();
        // Bind temporary queues and switches
        channel.queueBind(queue, "order_direct"."info");
        channel.queueBind(queue, "order_direct"."error");
        channel.queueBind(queue, "order_direct"."warn");
        // Consume messages
        channel.basicConsume(queue, true.new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("Consumer 1: consumption -" + newString(body)); }}); }}Copy the code
  1. The key value is added when the queue is bound to the switch
  • Consumer 2
public class Consumer2 {
    public static void main(String[] args) throws IOException {
         // Get the connection from the utility class
        Connection connection = RabbitMqUtil.getConnection();
        Channel channel = connection.createChannel();
        // Declare a switch
        channel.exchangeDeclare("order_direct"."direct");
        // Get the temporary queue
        String queue = channel.queueDeclare().getQueue();
        // Bind temporary queues and switches
        channel.queueBind(queue, "order_direct"."error");
        // Consume messages
        channel.basicConsume(queue, true.new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("Consumer 2: Consumption -" + newString(body)); }}); }}Copy the code

Run result: Only consumer 1 received the message

Consumer 1: consumption - the message sent to the specified route INFOCopy the code

4.2.5 WildCard Matching Mode (Topic)

  • Producer: The producer of the message (the program that sends the message).
  • Exchange: switch that sends messages to a specified queue.
  • routingKey: Routing keys (key1, key2, etc.) add another layer of restriction between the switch and the queue
    • However, the key in Topic is in the form of wildcard characters, which can greatly improve efficiency
  • Queue: message queue, understood as a container to which producers send messages, which are stored for consumption by consumers.
  • Consumer: consumer of the message (the program that receives the message)

4.2.5.1 What Can I Understand

The wildcard matching mode is topic, which is similar to the Direct mode and is sometimes referred to as the routing mode. The difference between the two modes is that the routingKey of the Direct mode is a specified value. A Topic pattern routingKey can use wildcards, and generally consists of one or more words with a “between”. Split, for example ideal. Insert.

  • *: Matches exactly one word, for example:order.*Can be matched to order.insert
  • #: Matches one or more words, for example:order.#It matches order.insert.mon
    • #It’s like a multi-layered concept, and*It’s just a single layer concept

4.2.5.2 Code implementation

4.2.5.2.1 Producer code

public class Producer {
    @Test
    public void sendMessage(a) throws IOException, TimeoutException {
        // Get the connection from the utility class
        Connection connection = RabbitMqUtil.getConnection();
        // Get the connection channel
        Channel channel = connection.createChannel();
        channel.exchangeDeclare("order_topic"."topic");
        // Declare a switch
        String key = "user.query.all";
        // Publish the message
        channel.basicPublish("order_topic", key, null, ("Send to specified route" + key + "The news").getBytes()); RabbitMqUtil.close(channel, connection); }}Copy the code

4.2.5.2.2 Consumer code

  • Consumer 1
public class Consumer1 {
    public static void main(String[] args) throws IOException {
        // Get the connection from the utility class
        Connection connection = RabbitMqUtil.getConnection();
        // Get the connection channel
        Channel channel = connection.createChannel();
        // Declare a switch
        channel.exchangeDeclare("order_topic"."topic");
        // Get the temporary queue
        String queue = channel.queueDeclare().getQueue();
        // Specify the route key
        String key = "user.*";
        channel.queueBind(queue, "order_topic", key);
        // Publish the message
        channel.basicConsume(queue, true.new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("Consumer 1: consumption -" + newString(body)); }}); }}Copy the code
  • Consumer 2
public class Consumer2 {
    public static void main(String[] args) throws IOException {
        // Get the connection from the utility class
        Connection connection = RabbitMqUtil.getConnection();
        // Get the connection channel
        Channel channel = connection.createChannel();
        // Declare a switch
        channel.exchangeDeclare("order_topic"."topic");
        // Get the temporary queue
        String queue = channel.queueDeclare().getQueue();
        // Specify the route key
        String key = "user.#";
        channel.queueBind(queue, "order_topic", key);
        channel.basicConsume(queue, true.new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("Consumer 2: Consumption -" + newString(body)); }}); }}Copy the code

Result: Only consumer 2 received the message, because the message is a multi-tier structure and only user.# can match it

Consumer 2: consumption - the message sent to the specified route user.query.allCopy the code

5. Springboot implements RabbitMQ

SpringBoot provides the Spring For RabbitMQ launcher, along with a series of annotations and a RabbitTemplate that greatly simplifies the process of developing RabbitMQ. The following are examples of [5.1 based on pure annotations] and [5.2 based on annotations + configuration class], which are used in much the same way, except for the location of declaration and binding of queue switches, etc. Generally speaking, the latter is better to maintain and manage, so choose one of them.

Environment Preparation:

  1. Start by creating a SprinBoot project, then select the RabbitMQ initiator and basic initiators such as unit tests
  2. Write the YML configuration file and write the data needed to connect to RabbitMQ

The RabbitMQ rely on

<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
Copy the code

Yml configuration file

spring:
  rabbitmq:
    host: 192.168122.1. # server address
    port: 5672 # TCP port
    username: admin # username
    password: admin # user password
    virtual-host: /rabbitmq_springboot_01 # Virtual host
Copy the code

5.1 Based on pure annotations

Note: Instead of creating configuration classes to manage queues, switch declarations, bindings, etc., all are written directly to the consumer via annotations

5.1.1 Simple queue mode

All of the code that produces the message is put into Test

  • producers
@SpringBootTest(classes = RabbitmqSpringbootApplication.class)
@RunWith(SpringRunner.class)
public class RabbitMqTest {
    /** * Inject RabbitTemplate */
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void testSimpleSendMessage(a) {
        rabbitTemplate.convertAndSend("simple_queue"."This is a message !"); }}Copy the code
  1. The first step is to inject the RabbitTemplate provided by SpringBoot
  2. The RabbitTemplate convertAndSend method is used to send messages. It can be overloaded in many ways, and will use 2 and 3 arguments respectively today
    • ConvertAndSend method (two parameters)
      • Parameter 1: routingKey
      • Parameter 2: object (body of sent message)
    • ConvertAndSend method
      • Parameter 1: Exchange
      • Parameter 2: routingKey
      • Parameter 3: object (body of sent message)
  • consumers
// Inject the container
@Component
/ / to monitor the RabbitMQ
@RabbitListener(queuesToDeclare = @Queue(value = "simple_queue", durable = "true", exclusive = "false", autoDelete = "false"))
public class SimpleConsumer {
    // Automatic callback
    @RabbitHandler
    public void receiveMessage(String message) {
        System.out.println("Consumer:"+ message); }}Copy the code
  1. Into the container

  2. Listen on RabbitMQ, in the @rabbitListener annotation, to implement the queue declaration and subsequent switch and queue binding etc

    • @queue can have four arguments, and since it has default values, given a value, it will be created by default in a persistent, non-exclusive, non-auto-delete fashion
      • Parameter 1: value (queue name)
      • Parameter 2: durable (Durable message queue) The queue remains after the RabbitMQ restart. The default value is true
      • Parameter 3: EXCLUSIVE indicates whether the message queue is exclusive to the current Connection. The default is false
      • Parameter 4: auto-delete indicates that the message queue will be automatically deleted when it is not in use. The default is false
  3. By adding @Rabbithandler annotations to the method, we can implement automatic callbacks so that we can get messages from the producer

    • Note: receiveMessage the parameter type of this method depends on what type of data you have sent in the producer

5.1.2 Work queue mode

5.1.2.1 Polling Mode

  • Producer: Nothing to say, because working mode has multiple consumers, so send more messages
@SpringBootTest(classes = RabbitmqSpringbootApplication.class)
@RunWith(SpringRunner.class)
public class RabbitMqTest {
    /** * Inject RabbitTemplate */
    @Autowired

    @Test
    public void testWorkSendMessage(a) {
        for (int i = 0; i < 20; i++) {
            rabbitTemplate.convertAndSend("work_queue"."This is a message ! , serial number:"+ i); }}}Copy the code
  • consumers
@Component
public class WorkConsumer {  
    / / to monitor the RabbitMQ
    @RabbitListener(queuesToDeclare = @Queue("work_queue"))
    // Consumer 1
    public void receiveMessage1(String message) {
        System.out.println("Consumer 1:" + message);
   
    / / to monitor the RabbitMQ
    @rabbitListener (queuesToDeclare = @queue ("work_queue") // Consumer 2 public void receiveMessage2(String message) { System.out.println(" consumer 2: "+ message); }}Copy the code
  1. The @rabbitListener annotation can be placed on either a class or a method, as in the code above, which is used to refer to different consumers.
    • However, adding the @RabbitListener annotation to the class and the @Rabbithandler annotation to the following two methods will cause an error and require the creation of a separate class for each consumer

5.1.2.2 Fair Mode (Based on Ability)

5.1.2.2.1 How to Modify the Configuration File

  • Producer invariant

  • Example Modify the configuration file yML/properties

spring:
  rabbitmq:
    host: 192.168122.1. # server address
    port: 5672 # TCP port
    username: admin # username
    password: admin # user password
    virtual-host: /rabbitmq_springboot_01 # Virtual host
	# New section
    listener:
      simple:
        acknowledge-mode: manual # Enable ack manual reply
        prefetch: 1 You can only consume 1 message at a time
Copy the code
  1. Acknowledge -mode Describes the option
    • Auto: indicates automatic confirmation, which is the default option
    • Manual: manual confirmation (set to manual confirmation based on capacity allocation)
    • None: indicates no confirmation. The device is automatically discarded after being sent
  • consumers
@Component
public class WorkConsumer {
	/ / to monitor the RabbitMQ
    @RabbitListener(queuesToDeclare = @Queue("work_queue"))
    // Consumer 1
    public void receiveMessage(String body, Message message, Channel channel) throws IOException {
        try {
            // Prints the message subject
            System.out.println("Consumer 1:" + body);
            // Returning deliveryTag means the queue can remove the message
            channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
        } catch (IOException e) {
            e.printStackTrace();
            // The consumer tells the queue that information consumption failed
            channel.basicNack(message.getMessageProperties().getDeliveryTag(), false.true); }}/ / to monitor the RabbitMQ
    @RabbitListener(queuesToDeclare = @Queue("work_queue"))
    // Consumer 2
    public void receiveMessage2(String body, Message message, Channel channel) throws IOException{
        try {
            // Delay 2s indicates slow processing
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        try {
			// Prints the message subject
            System.out.println("Consumer 2:" + body);
            // Returning deliveryTag means the queue can remove the message
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        } catch (IOException e) {
            e.printStackTrace();
            // The consumer tells the queue that information consumption failed
            channel.basicNack(message.getMessageProperties().getDeliveryTag(), false.true); }}}Copy the code
  1. Because manual validation is enabled in the YML configuration, you need to return confirmation messages after success and failure

  2. BasicAck method explanation

    • Parameter 1: deliveryTag (the index of the message), which confirms receipt of the message and can be deleted by the queue
    • Parameter 2: mutiple Select true to reject all messages smaller than deliveryTag at one time
  3. BasicNack method explanation

    • The above parameters 1 | 2
    • Parameter 3: Requeue (whether rejected items are re-queued)

Running results:

consumers1: This is a message! The serial number:2consumers1: This is a message! The serial number:3consumers1: This is a message! The serial number:4consumers1: This is a message! The serial number:5consumers1: This is a message! The serial number:6consumers1: This is a message! The serial number:7consumers1: This is a message! The serial number:8consumers1: This is a message! The serial number:9consumers1: This is a message! The serial number:10consumers1: This is a message! The serial number:11consumers1: This is a message! The serial number:12consumers1: This is a message! The serial number:13consumers1: This is a message! The serial number:14consumers1: This is a message! The serial number:15consumers1: This is a message! The serial number:16consumers1: This is a message! The serial number:17consumers1: This is a message! The serial number:18consumers1: This is a message! The serial number:19consumers1: This is a message! The serial number:20consumers2: This is a message! The serial number:1
Copy the code

Up to now, we have implemented the way of modifying the configuration file to realize the allocation according to the ability. We have added several configuration contents, we only used part of the above, the rest is for your reference, you can choose yML and properties by yourself

# send confirmation
spring.rabbitmq.publisher-confirm-type=correlated
# spring. The rabbitmq. Publisher - confirms = true (old)
Send a callback
spring.rabbitmq.publisher-returns=true
Manual confirmation of consumption
spring.rabbitmq.listener.direct.acknowledge-mode=manual
spring.rabbitmq.listener.simple.acknowledge-mode=manual
The concurrent consumer initializes the value
spring.rabbitmq.listener.simple.concurrency=1
# Maximum number of concurrent consumers
spring.rabbitmq.listener.simple.max-concurrency=10
# Number of messages that can be pulled and processed per consumer per listen
The number of messages processed in a single request should be greater than or equal to the number of transactions (the maximum number of unacks)
spring.rabbitmq.listener.simple.prefetch=1
Whether to support retry
spring.rabbitmq.listener.simple.retry.enabled=true
Copy the code

5.1.2.2.1 Factory Configuration Mode

/** * Set up consumer confirmation mechanism, and achieve the effect of versatile **@paramConnectionFactory connectionFactory *@return* /
@Bean("workListenerFactory")
public RabbitListenerContainerFactory myFactory(ConnectionFactory connectionFactory) {
    SimpleRabbitListenerContainerFactory containerFactory =
        new SimpleRabbitListenerContainerFactory();
    containerFactory.setConnectionFactory(connectionFactory);
    // Change to manual confirmation
    containerFactory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
    // Reject policy,true back to queue false discard, default is true
    containerFactory.setDefaultRequeueRejected(true);
    // Change the default PrefetchCount from 250 to 1
    containerFactory.setPrefetchCount(1);

    return containerFactory;
}
Copy the code
  • Consumer modification
@RabbitListener(queuesToDeclare = @Queue("work_queue"))
// Add the above listener to the containerFactory property, and pass in the configured factory
@RabbitListener(queuesToDeclare = @Queue("work_queue"), containerFactory = "workListenerFactory")
Copy the code

5.1.3 Publish and subscribe Mode

  • producers
@SpringBootTest(classes = RabbitmqSpringbootApplication.class)
@RunWith(SpringRunner.class)
public class RabbitMqTest {
    /** * Inject RabbitTemplate */
    @Autowired

    @Test
    public void testFanoutSendMessage(a) {
        rabbitTemplate.convertAndSend("order_exchange".""."This is a message !"); }}Copy the code
  1. Since the switch is involved from this pattern onwards, the three-parameter approach is used
  • consumers
@Component
public class FanoutConsumer {
    // Bind temporary queues and switches
    @queuebinding (bindings = {@queuebinding (value = @queue ()), // Type = "fanout") // Switch type)})
    public void receiveMessage1(String message) {
        System.out.println("Consumer 1:" + message);
    }

    // Bind temporary queues and switches
    @queuebinding (bindings = {@queuebinding (value = @queue ()), // Type = "fanout") // Switch type)})
    public void receiveMessage2(String message) {
        System.out.println("Consumer 2:"+ message); }}Copy the code

5.1.4 Routing Mode (Direct)

  • producers
@SpringBootTest(classes = RabbitmqSpringbootApplication.class)
@RunWith(SpringRunner.class)
public class RabbitMqTest {
    /** * Inject RabbitTemplate */
    @Autowired

    @Test
    public void testDirectSendMessage(a) {
        rabbitTemplate.convertAndSend("direct_exchange"."info"."This is a message !"); }}Copy the code
  • consumers
@Component
public class DirectConsumer {
    // Bind temporary queues and switches
    @rabbitlistener (bindings = {@queuebinding (value = @queue ()), // Type = "direct"), / / switches and type key = {" info ", "warn", "error"}}) / / routing key)
    public void receiveMessage1(String message) {
        System.out.println("Consumer 1:" + message);
    }

     // Bind temporary queues and switches
    @rabbitlistener (bindings = {@queuebinding (value = @queue ()), // Type = "direct"), / / switches and type key = {" info ", "warn", "error"}}) / / routing key)
    public void receiveMessage2(String message) {
        System.out.println("Consumer 2:"+ message); }}Copy the code

5.1.5 Theme mode

  • producers
@SpringBootTest(classes = RabbitmqSpringbootApplication.class)
@RunWith(SpringRunner.class)
public class RabbitMqTest {
    /** * Inject RabbitTemplate */
    @Autowired

    @Test
    public void testTopicSendMessage(a) {
        rabbitTemplate.convertAndSend("topic_exchange"."order.insert.common"."This is a message !"); }}Copy the code
  • consumers
@Component
public class TopicConsumer {
    // Bind temporary queues and switches
    @rabbitListener (bindings = {@queuebinding (value = @queue ()), Type = "topic"), / / switches and type key = {} "order. *" / / wildcard routing key)})
    public void receiveMessage1(String message) {
        System.out.println("Consumer 1:" + message);
    }

    // Bind temporary queues and switches
    @rabbitListener (bindings = {@queuebinding (value = @queue ()), Type = "topic"), / / switches and type key = {} "order. *" / / wildcard routing key)})
    public void receiveMessage2(String message) {
        System.out.println("Consumer 2:"+ message); }}Copy the code

5.2 Based on annotations + configuration classes

In fact, this way, is to switch, queue statement and binding in the configuration class, one is the consumer the annotations in the concise, then there is unified management, more organized, and producers and consumers to reference when more convenient, also change in the future, also do not need to modify every place.

Because the space is too long, here is a demonstration of the most complex Topic approach, the others are also easy to pick up.

  • The configuration class
@Configuration
public class RabbitMqConfiguration {
    
    public static final String TOPIC_EXCHANGE = "topic_order_exchange";
    public static final String TOPIC_QUEUE_NAME_1 = "test_topic_queue_1";
    public static final String TOPIC_QUEUE_NAME_2 = "test_topic_queue_2";
    public static final String TOPIC_ROUTINGKEY_1 = "test.*";
    public static final String TOPIC_ROUTINGKEY_2 = "test.#";

    @Bean
    public TopicExchange topicExchange(a) {
        return new TopicExchange(TOPIC_EXCHANGE);
    }

    @Bean
    public Queue topicQueue1(a) {
        return new Queue(TOPIC_QUEUE_NAME_1);
    }

    @Bean
    public Queue topicQueue2(a) {
        return new Queue(TOPIC_QUEUE_NAME_2);
    }

    @Bean
    public Binding bindingTopic1(a){
        return BindingBuilder.bind(topicQueue1())
                .to(topicExchange())
                .with(TOPIC_ROUTINGKEY_1);
    }
    @Bean
    public Binding bindingTopic2(a){
        returnBindingBuilder.bind(topicQueue2()) .to(topicExchange()) .with(TOPIC_ROUTINGKEY_2); }}Copy the code
  1. Add the @Configuration annotation to indicate that this is a Configuration class

  2. Define constants: Switch names, queue names, route keys, etc can be created as constants, which can be easily called, managed and modified. You can also create a special RabbitMQ constant class.

  3. Define the switch: Since this example is Topic, choose the TopicExchange type

  4. Define the queue: pass the queue name constant, because there is a default value for persistence, you can also customize the persistent, exclusive and other parameters

  5. Bind the switch to the queue: Bind the queue using BindingBuilder’s bind method, to bind to the specified switch, and with pass in the routing key

  • producers
@SpringBootTest(classes = RabbitmqSpringbootApplication.class)
@RunWith(SpringRunner.class)
public class RabbitMqTest {
    /** * Inject RabbitTemplate */
    @Autowired

    @Test
    public void testTopicSendMessage(a) {
        rabbitTemplate.convertAndSend(RabbitMqConfiguration.TOPIC_EXCHANGE, "test.order.insert"."This is a message !"); }}Copy the code
  • consumers
@Component
public class TopicConsumer {
	// Bind the queue
    @RabbitListener(queues = {RabbitMqConfiguration.TOPIC_QUEUE_NAME_1})
    public void receiveMessage1(String message) {
        System.out.println("Consumer 1:" + message);
    }
	
    // Bind the queue
    @RabbitListener(queues = {RabbitMqConfiguration.TOPIC_QUEUE_NAME_2})
    public void receiveMessage2(String message) {
        System.out.println("Consumer 2:"+ message); }}Copy the code