What is message queue MQ?

Message queues and RocketMQ

Message Queue MQ

Message Queue (MQ for short) is a cross-process communication mechanism commonly used for asynchronous transfer of data between applications. MQ products are also commonly referred to as “messaging middleware” in architectures. Its main responsibility is to ensure reliable data transmission between services and realize the decoupling between services.

This is too academic. Let’s take a look at a practical case of a project. Assuming that the municipal tax system reports the annual tax summary data to the provincial tax system, according to the previous design, as a data producer, the municipal tax system needs to know the IP, port, interface and other details of the provincial tax system. Then, the data is sent to the provincial tax system synchronously through RPC or RESTful. The provincial tax system, as the consumer of the data, responds that “the data has been received”.

While there is no logical problem, there are three new technical problems:

  • If the provincial tax system is being upgraded and maintained at the time of reporting, the municipal tax system must design additional retransmission mechanisms to ensure data integrity;
  • If the provincial tax system takes 1 minute to receive data and the municipal tax system adopts synchronous communication, the transmission thread of the municipal tax system will be blocked for 1 minute. In high concurrency scenario, such a long time of blocking is easy to cause the system collapse.
  • If there is any change in the invocation mode, interface, IP and port of the provincial tax system interface, the municipal tax system must be notified immediately for adjustment, otherwise there will be communication failure.

Can be seen from the above three questions, the * * * * system at the provincial level changes directly affect the municipal taxation system, both the strong coupling, if the problem on the Internet micro service architecture, dozens of service invocation in series, each service if have similar between strong coupling, the system will be difficult to maintain.

It can be seen that after the introduction of message queues, both producers and consumers only process data for message queues. Data producers do not need to know the information of specific consumers at all, as long as they put the data in the specified queue according to the prior agreement. Similarly, the consumer listens to the message queue, and if new data is generated in the queue, MQ will “PUSH” or “PULL” the new data to the consumer for subsequent processing.

As can be seen from the diagram, as long as the message queue product is stable and reliable, the process of message communication is guaranteed. In the architecture space, many vendors have developed their own MQ products. The most representative open source products are:

  • Kafka
  • ActiveMQ
  • ZeroMQ
  • RabbitMQ
  • RocketMQ

Each product has its own different design and implementation principles, but the underlying goal is the same: to provide a reliable asynchronous transport mechanism for interprocess communication. RocketMQ, as an Ali product, is naturally integrated into the Spring Cloud Alibaba ecosystem. After many tests of Double 11, RocketMQ is excellent in performance, reliability and ease of use. Let’s take a look at RocketMQ.

RocketMQ

RocketMQ is a distributed message queue middleware. RocketMQ was originally designed to meet the needs of Alibaba’s own business for asynchronous messaging. After version 3.x, RocketMQ was officially open source and donated to Apache. It is also one of the most widely and widely used MQ products in China.

RocketMQ has many excellent features. In terms of availability, RocketMQ emphasizes no single point of cluster, high availability at any point, load balancing capability of clients, and easy horizontal scaling. In terms of performance, the 100-million-level message processing behind Tmall’s Double 11 promotion is guaranteed by RocketMQ. In terms of API, it provides rich functions, which can realize asynchronous message, synchronous message, sequential message, transaction message and other rich functions, which can meet most application scenarios. In terms of reliability, the system provides message persistence, failure retry mechanism, and message query tracing functions to further guarantee reliability.

After understanding RocketMQ’s many features, let’s understand some important concepts of RocketMQ:

  • Message: A Message is broadly defined as business data passed between processes. In the narrow sense, different MQ products attach additional attributes to messages such as Topic, Tags, and so on.
  • Producer: Refers to the role of producing data. In the previous case, the municipal tax system acted as the Producer of messages.
  • Message Consumer: Refers to the role of using data. The provincial tax system in the previous example is message Consumer.
  • MQ message service Broker: MQ message server, used for message storage and message forwarding.
  • RocketMQ groups producers that send the same type of messages into Producer groups.
  • Consumer Groups: RocketMQ groups consumers who consume the same type of message into Consumer groups.

After understanding these basic concepts, we will formally enter the deployment and use of RocketMQ, through the case code to understand the implementation process of RocketMQ. For RocketMQ, there are two phases: cluster RocketMQ servers and application access to the RocketMQ queue. First, let’s deploy the RocketMQ cluster.

Deploy the RocketMQ cluster

RocketMQ naturally adopts the cluster mode. The common RocketMQ cluster has three forms: ** multi-master mode, multi-master multi-slave-asynchronous replication mode, and multi-master multi-slave-synchronous double-write mode. The advantages and disadvantages of these three modes are listed below.

  • The multi-master mode is the easiest to configure and the most used. The advantage is that the maintenance of a single Master has no impact on the application. When the RAID10 disk is configured as RAID10, the performance is the highest even when the RAID10 disk is down and cannot be recovered because the RAID10 disk is very reliable and the synchronous flush message is not lost. The disadvantage is that during a single machine outage, messages that have not been consumed by the machine cannot be subscribed until the machine is restored, and the real-time performance of messages will be affected.
  • Multi-master, multi-slave Asynchronous replication mode. Each Master is configured with a Slave. There are multiple pairs of master-slaves. HA adopts asynchronous replication. At the same time, if the Master is down, consumers can still consume from the Slave. This process is transparent to the application and does not require manual intervention. The performance is almost the same as that of the multi-master mode. The disadvantage is that the Master is down and a small number of messages are lost in the case of disk corruption.
  • In the multi-master, multi-Slave synchronous dual-write mode, the HA adopts the synchronous dual-write mode, that is, the data is successfully written to the application only when the data is successfully written to both the Master and Slave. In this mode, data and services have no single point of failure. When the Master is down, messages are not delayed, and the service and data availability are very high. The performance is 10% lower than that of the asynchronous replication mode, and the execution time of sending a single message is slightly longer. In the current version, when the active node is down, the standby node cannot automatically switch to the host.

Set up a spatial Master server cluster, first look at the deployment architecture diagram:

In the dual Master architecture, there is a new role for NameServer, RocketMQ’s own lightweight routing registry that enables brokers to initiate dynamic registration and discovery. When the Broker is started, a heartbeat report is automatically sent to NameServer notifying the Broker to come online. When the Provider obtains routing information from NameServer, it establishes a long connection to the specified Broker to complete the data transmission.

To avoid single-node bottlenecks, NameServer typically deploys more than two nodes as high availability redundancy. NameServer itself is stateless and does not communicate with each other. Therefore, all NameServer nodes must be configured during Broker cluster configuration to ensure state synchronization.

Deploying a RocketMQ cluster involves two steps: deploying NameServer and deploying a Broker cluster.

First, deploy the NameServer cluster.

Create two CentOS7 VMS with IP addresses 192.168.31.200 and 192.168.31.201 respectively. The two VMS are required to have more than 2 GB of memory, and install 64-bit JDK1.8.

Then visit the Apache RocketMQ download page:

www.apache.org/dyn/closer….

RocketMQ: rocketmq-all-4.8.0-bin-release.zip: Rocketmq-all-4.8.0-bin-release /bin/runserver.sh: rocketmq-all-4.8.0-bin-release/bin/runserver.sh: rocketmq-all-4.8.0-bin-release Because RocketMQ is server software, it is configured with 8GB of memory by default, which is too much for PCS and/or laptops, so reduce the JVM to 1GB near line 82 for demonstration purposes.

Modify before:

JAVA_OPT="${JAVA_OPT} -server -Xms8g -Xmx8g -Xmn4g -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
Copy the code

Revised:

CD/usr/local/rocketmq - all - 4.8.0 - bin - release/bin/sh mqnamesrvCopy the code

The NameServer boot success. SerializeType =JSON indicates that NameServer has been successfully started. NameServer will use port 9876 to provide services, do not forget to set the firewall on the pass. Then deploy NameServer on another 201 device to form a NameServer cluster.

Second, deploy the Broker cluster.

Create two CentOS7 VMS with IP addresses of 192.168.31.210 and 192.168.31.211. Also, the two VMS must have more than 2 GB memory and install 64-bit JDK1.8.

Open the Rocketmq-all-4.8.0-bin-release directory, edit the /bin/runbrock. sh file, and also reduce the default memory footprint for starting brokers from 8GB to 1G. Change 64 lines to the following:

JAVA_OPT="${JAVA_OPT} -server -Xms1g -Xmx1g -Xmn512m"
Copy the code

In the conf directory, RocketMQ has provided us with three sets of cluster configuration templates:

  • 2m-2s-async indicates the dual-master dual-slave asynchronous replication mode.
  • 2m-2s-sync indicates the dual-primary, dual-secondary synchronization and dual-write mode.
  • 2m-noslave indicates the dual-master mode.

We append the NameServer cluster address to the end of broker-a. perties and broker-B. perties in the 2m-noslave dual mode directory. I also annotate the meaning of each item in the template for easy understanding. First, the full contents of broker-a. perties are as follows:

BrokerClusterName =DefaultCluster # Broker name brokerName=broker-a #brokerId=0 brokerClusterName= Broker - A #brokerId=0 Greater than zero means log file time removed from node brokerId=0 # default 4am deleteWhen=04 # Log file retention time, Default 48 hours fileReservedTime=48 #Broker role #- ASYNC_MASTER Asynchronous copy Master #- SYNC_MASTER Synchronous double write Master brokerRole=ASYNC_MASTER FlushDiskType =ASYNC_FLUSH # NameServer node list Use semicolons to split namesrvAddr=192.168.31.200:9876; 192.168.31.201:9876Copy the code

Broker-b. perties only differs from brokerName as follows:

brokerClusterName=DefaultCluster brokerName=broker-b brokerId=0 deleteWhen=04 fileReservedTime=48 BrokerRole =ASYNC_MASTER flushDiskType=ASYNC_FLUSH # End Add, NameServer node list, split with a semi-colon namesrvAddr=192.168.31.200:9876; 192.168.31.201:9876Copy the code

Upload the rocketmq-all-4.8.0-bin-release directory to the /usr/local directory and run the following command to start broker node A.

CD /usr/local/rocketmq-all-4.8.0-bin-release/ sh bin/mqbroker -c./conf/2m-noslave/broker-a. pertiesCopy the code

Add the C parameter after the MQBroker command is launched to specify which Broker configuration file to load.

The Broker will use port 10911 to service the Broker. Please enable firewall access.

The broker[broker-a, 192.168.31.210:10911] boot Success. SerializeType =JSON and name server is 192.168.31.200:9876; 192.168.31.201:9876Copy the code

Similarly, execute the following command on the other Master to start and load the broker-b configuration file.

CD /usr/local/rocketmq-all-4.8.0-bin-release/ sh bin/mqbroker -c./conf/2m-noslave/broker -b.pertiesCopy the code

Now that the NameServer cluster and Broker cluster are deployed, execute two commands to verify.

First, use the mqadmin command to check the cluster status.

The mqadmin command exists in the bin directory to manage the RocketMQ cluster. You can use clusterList to view the cluster nodes as follows:

Sh mqadmin clusterList -n 192.168.31.200:9876Copy the code

Query the registration information on NameServer, and the following result is displayed.

You can see that there are two brokers in the DefaultCluster cluster because the BID number is 0, indicating that they are both Master nodes.

Second, use RocketMQ’s tools.sh tool to test the actual operation of MQ by generating demo data. Use the following command in the bin directory.

Export NAMESRV_ADDR = 192.168.31.200:9876 sh tools. Sh org. Apache. Rocketmq. Example. The quickstart. ProducerCopy the code

You will see the screen output log:

SendResult [sendStatus=SEND_OK, msgId=7F0000010B664DC639969F28CF540000, offsetMsgId=C0A81FD200002A9F00000000000413B6, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=1], queueOffset=0]
SendResult [sendStatus=SEND_OK, msgId=7F0000010B664DC639969F28CF9B0001, offsetMsgId=C0A81FD200002A9F000000000004147F, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=2], queueOffset=0]
SendResult [sendStatus=SEND_OK, msgId=7F0000010B664DC639969F28CFA30002, offsetMsgId=C0A81FD200002A9F0000000000041548, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=3], queueOffset=0]
SendResult [sendStatus=SEND_OK, msgId=7F0000010B664DC639969F28CFA70003, offsetMsgId=C0A81FD300002A9F0000000000033C56, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-b, queueId=0], queueOffset=0]
SendResult [sendStatus=SEND_OK, msgId=7F0000010B664DC639969F28CFD60004, offsetMsgId=C0A81FD300002A9F0000000000033D1F, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-b, queueId=1], queueOffset=0]
SendResult [sendStatus=SEND_OK, msgId=7F0000010B664DC639969F28CFDB0005, offsetMsgId=C0A81FD300002A9F0000000000033DE8, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-b, queueId=2], queueOffset=0]
...
Copy the code

The alternate occurrence of broker-A and broker-b indicates that the cluster is in effect.

We tested the service provider, now we test the consumer, run the following command:

Export NAMESRV_ADDR = 192.168.31.200:9876 sh tools. Sh org. Apache. Rocketmq. Example. The quickstart. ConsumerCopy the code

The RocketMQ double Master cluster setup is complete. The multi-master, multi-slave configuration is also similar. If you look at the official documentation, you will be able to get started very quickly.

ConsumeMessageThread_11 Receive New Messages: [MessageExt [brokerName=broker-b, queueId=2, storeSize=203, queueOffset=157, sysFlag=0, bornTimestamp=1612100880154, BornHost = / 192.168.31.210:54104, storeTimestamp = 1612100880159, storeHost = / 192.168.31.211:10911, msgId=C0A81FD300002A9F0000000000053509, commitLogOffset=341257, bodyCRC=1116443590, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=158, CONSUME_START_TIME=1612100880161, UNIQ_KEY=7F0000010DA64DC639969F2C4B1A0314, CLUSTER=DefaultCluster, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 55, 56, 56], transactionId='null'}]] ConsumeMessageThread_12 Receive New Messages: [MessageExt [brokerName=broker-b, queueId=3, storeSize=203, queueOffset=157, sysFlag=0, bornTimestamp=1612100880161, BornHost = / 192.168.31.210:54104, storeTimestamp = 1612100880162, storeHost = / 192.168.31.211:10911, msgId=C0A81FD300002A9F00000000000535D4, commitLogOffset=341460, bodyCRC=898409296, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=158, CONSUME_START_TIME=1612100880164, UNIQ_KEY=7F0000010DA64DC639969F2C4B210315, CLUSTER=DefaultCluster, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 55, 56, 57], transactionId='null'}]]Copy the code

With the cluster deployed, how do you use RocketMQ for messaging? Let’s look at the Spring Boot code.

The application connects to the RocketMQ cluster

As an example, we used Spring Boot to integrate MQ client to achieve message sending and receiving. First, we simulated Producer Producer.

The Producer Producer sends messages

The first step is to create the RocketMQ-Provider project using the Spring Initializr wizard, ensuring that POM.xml introduces the following dependencies.

<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <! -- RocketMQ client, Rocketmq </groupId> <artifactId> RocketMQ-client </artifactId> The < version > 4.8.0 < / version > < / dependency >Copy the code

Step 2, configure the application application.yml.

Rocketmq-client communicates primarily through encoding, so no additional configuration is required in application.yml.

server:
  port: 8000
spring:
  application:
    name: rocketmq-producer
Copy the code

Step 3, create the Controller, and the producer sends the message.

@RestController public class ProviderController { Logger logger = LoggerFactory.getLogger(ProviderController.class); @getMapping (value = "/send_s1_tax") public String send1() throws MQClientException {// Creates the DefaultMQProducer message producer object DefaultMQProducer producer = new DefaultMQProducer("producer-group"); SetNamesrvAddr ("192.168.31.200:9876; 192.168.31.201:9876 "); // Establish a long connection with NameServer For (int I = 0; i< 100 ; I++) {/ / data text String data = "{\" title \ ": \" X city tax in the first quarter of 2021 annual summary data \ "} "; Tax-data-topic = tax-data-topic = tax-data-topic = tax-data-topic = tax-data-topic = tax-data-topic = tax-data-topic = tax-data-topic = tax-data-topic */ Message Message = new Message("tax-data-topic", "2021S1", data.getbytes ()); SendResult result = producer.send(message); // Print the result object on the console logger.info(" Message sent: MsgId:" + result.getmsgid () + ", sending status :" + result.getsendStatus ()); } } catch (RemotingException e) { e.printStackTrace(); } catch (MQBrokerException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } finally { producer.shutdown(); } return "success"; }}Copy the code

After the program is running, go to http://localhost:8000/send_s1_tax, the console will see the following output shows that the data has been the Broker receives, the Broker receives the Producer after the end of the task has been completed.

Message has been sent: MsgId: 7 f00000144e018b4aac29f3b7b280062, delivery status: SEND_OK message has been sent: MsgId: 7 f00000144e018b4aac29f3b7b2a0063, delivery status: SEND_OKCopy the code

Now let’s develop the Consumer.

The Consumer Consumer receives the message

As a first step, use the Spring Initializr wizard to create the RocketMq-Consumer project, ensuring that POM.xml introduces the following dependencies.

<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <! -- RocketMQ client, Rocketmq </groupId> <artifactId> RocketMQ-client </artifactId> The < version > 4.8.0 < / version > < / dependency >Copy the code

The second step, application.yml, also requires no additional Settings.

server:
  port: 9000
spring:
  application:
    name: rocketmq-consumer
Copy the code

The third step, RocketmqConsumerApplication increase consumer to monitor the entrance of the application startup code, the key code comments has been completed.

@SpringBootApplication public class RocketmqConsumerApplication { private static Logger logger = LoggerFactory.getLogger(RocketmqConsumerApplication.class); public static void main(String[] args) throws MQClientException { SpringApplication.run(RocketmqConsumerApplication.class, args); DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer-group"); // Set consumer.setNamesrvaddr ("192.168.31.200:9876; 192.168.31.201:9876 "); Subscribe contains two arguments: topic: specifies which topic the consumer subscribes to from the Broker, consistent with the Provider. SubExpression: subExpression is used to filter tags. The same topic can contain many different tags, subExpression is used to filter the qualified tags for acceptance. For example, if this parameter is set to *, all tags data is received. For example, if set to 2020S1, only tags=2020S1 messages will be received from the Broker, and 2020S2 will be excluded. */ consumer.subscribe("tax-data-topic", "*"); // Create a listener to catch and process new messages as they come in. consumer.registerMessageListener(new MessageListenerConcurrently() { public ConsumeConcurrentlyStatus consumeMessage( A List < MessageExt > MSGS, ConsumeConcurrentlyContext context) {/ / batch data processing for (MessageExt MSG: MSGS) {logger.info(" consumer data :"+new String(msg.getBody())); } / / return data has been receiving logo return ConsumeConcurrentlyStatus. CONSUME_SUCCESS; }}); // Start the consumer, establish a long connection to the Broker, and start listening. consumer.start(); }}Copy the code

When the application starts, the Provider generates new messages, which are immediately consumed by the Consumer and generated by the console.

The 2021-01-31 22:25:14. 17328-212 the INFO/MessageThread_3 C.L.R.R ocketmqConsumerApplication: Consumer Consumption Data :{"title":"X City tax Summary for q1 2021 "} 2021-01-31 22:25:14.217 INFO 17328 -- [MessageThread_2] C.L.R.R ocketmqConsumerApplication: consumer spending data: {" title ":" in the first quarter of 2021 X city tax summary data "}Copy the code

This is how Spring Boot connects to a RocketMQ cluster. In the current case, we control the sending and receiving of messages through code. The Spring Cloud Stream module is also provided in the Spring Cloud ecosystem, allowing programmers to access MQ more easily in a “declarative” development approach. But Spring Cloud Stream itself is too packaged, and many RocketMQ details are hidden, which is not a good thing to get started with. You’ll understand Spring Cloud Stream better after you’ve mastered RocketMQ.