This is the first in a three-part series on RocketMQ.

1. Introduction to MQ

1.1 Why use MQ

A message queue is a first-in, first-out (FIFO) data structure.

The application scenarios are as follows:

  • Application decoupling

The more tightly coupled a system is, the lower its fault tolerance. Take an e-commerce application as an example: after a user creates an order, if the inventory, logistics, and payment systems are tightly coupled with the ordering flow, a failure or temporary unavailability (for example, during an upgrade) of any one of these subsystems will make the ordering operation fail and hurt the user experience.

With message-queue decoupling, the coupling between systems is reduced. For example, if the logistics system fails and takes a few minutes to repair, the data it needs to process is cached in the message queue during that time and the user's ordering operation still completes normally. When the logistics system recovers, it processes the order messages waiting in the queue, and the end user never perceives the few minutes of downtime.

  • Traffic peak clipping

If your application is experiencing a sudden surge of request traffic, it may overwhelm the system. With message queues, large numbers of requests can be cached and processed over long periods of time, which can greatly improve system stability and user experience.

In general, to keep the system stable, user requests are rejected once the load exceeds a threshold, which hurts the user experience. Using a message queue to buffer the requests and notifying users once their orders have been processed is a much better experience than not being able to place an order at all.

Second, there is an economic consideration: if the business system's QPS is 1,000 in normal times but peaks at 10,000, it is clearly not cost-effective to provision high-performance servers for the peak. In this case a message queue can be used to shave the peak traffic.

  • Data distribution

Message queues allow data to flow more widely across multiple systems. The producer of the data does not need to care about who uses the data, but simply sends the data to the message queue, where the consumer gets the data directly from the message queue.

1.2 The advantages and disadvantages of MQ

Advantages: decoupling, peak shaving, and data distribution. Disadvantages include the following:

  • Reduced system availability

The more external dependencies a system introduces, the lower its stability. If MQ goes down, the business is affected.

How to ensure high availability of MQ?

  • System complexity increases

Adding MQ significantly increases system complexity: what used to be synchronous remote calls between systems now become asynchronous calls through MQ.

How do you ensure that messages are not consumed more than once? How do you handle message loss? How do you guarantee the ordering of message delivery?

  • Consistency problem

After processing its business, system A sends message data to systems B, C, and D through MQ. If systems B and C process the data successfully but system D fails, the data becomes inconsistent.

How to ensure the consistency of message data processing?

1.3 Comparison of various MQ products

Common MQ products include Kafka, ActiveMQ, RabbitMQ, and RocketMQ.

Feature | ActiveMQ | RabbitMQ | RocketMQ | Kafka
Development language | Java | Erlang | Java | Scala
Single-machine throughput | 10,000-level | 10,000-level | 100,000-level | 100,000-level
Latency | millisecond level | microsecond level | millisecond level | within milliseconds
Availability | High (master/slave architecture) | High (master/slave architecture) | Very high (distributed architecture) | Very high (distributed architecture)
Functionality | Mature product used in many companies; plenty of documentation; good support for various protocols | Developed in Erlang, so concurrency is very strong, performance is excellent, latency is very low, and the management UI is rich | Complete MQ feature set and good extensibility | Only the core MQ functions are supported; features such as message query and message tracing are not provided; it is designed for, and widely used in, the big-data field

2. A Quick start to RocketMQ

RocketMQ is an MQ middleware from Alibaba, donated to the Apache Software Foundation in 2016 and written in Java. Inside Alibaba, RocketMQ carries the message traffic of high-concurrency scenarios such as the "Double 11" shopping festival and can handle messages at the trillion level.

2.1 Preparations

2.1.1 Downloading RocketMQ

The website address

Version: this tutorial uses RocketMQ 4.7.1.

Download:

wget https://archive.apache.org/dist/rocketmq/4.7.1/rocketmq-all-4.7.1-bin-release.zip

2.1.2 Environment Requirements

  • A 64-bit operating system (Linux/Unix/macOS recommended)
  • JDK 1.8 (64-bit)
  • Maven 3.2.x (only required when building from source)

2.2 Installing RocketMQ

2.2.1 Installation Procedure

This tutorial installs RocketMQ from the binary release package.

  1. Decompress the installation package
# Install unzip if it is not already present
yum install -y unzip
# Decompress
unzip rocketmq-all-4.7.1-bin-release.zip
# The following steps are optional; adjust them to your own preference
# Rename
mv rocketmq-all-4.7.1-bin-release rocketmq-all-4.7.1
# Create a soft link
ln -s /soft/rocketmq-all-4.7.1 /soft/rocketmq
  2. Go to the installation directory
cd /soft/rocketmq

2.2.2 Introduction to Directories

  • Bin: startup scripts, including shell scripts and CMD scripts
  • Conf: instance configuration files, including broker configuration files and logback configuration files
  • Lib: relies on JAR packages, including Netty, Commons-lang, and FastJSON

2.3 Starting RocketMQ

By default the NameServer JVM is configured with 4 GB of heap and the Broker JVM with 8 GB, so for a learning environment it is best to lower the default JVM heap parameters.

#Check the memory usage first, and configure the JVM heap according to the machine's available memory
free -h

The default RocketMQ JVM settings request a lot of memory. If the NameServer or Broker fails to start because of insufficient memory, edit the following two scripts to reduce the JVM heap size.

#Edit runbroker.sh and runserver.sh to change the default JVM size
cd /soft/rocketmq/bin
vi runbroker.sh
vi runserver.sh

Reference Settings:

JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"

  1. Start the NameServer
#1. Start the NameServer
cd /soft/rocketmq
nohup sh bin/mqnamesrv &
#2. Check the startup log (or tail -f nohup.out) to confirm the NameServer started normally
tail -f ~/logs/rocketmqlogs/namesrv.log
#3. Check that port 9876 is listening (yum install -y lsof first, or use netstat -nlp | grep 9876)
lsof -i:9876
  2. Start the Broker
#1. Start the Broker
cd /soft/rocketmq
nohup sh bin/mqbroker -n localhost:9876 &
#2. View the startup log
tail -f ~/logs/rocketmqlogs/broker.log

2.4 Testing RocketMQ

2.4.1 Sending messages

#1. Set environment variables
export NAMESRV_ADDR=localhost:9876
#2. Use the demo in the installation package to send messages
cd /soft/rocketmq
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer

2.4.2 Receiving messages

#1. Set environment variables
export NAMESRV_ADDR=localhost:9876
#2. Receive the messages
cd /soft/rocketmq
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer

2.5 Stopping RocketMQ

#1. Shut down the NameServer
cd /soft/rocketmq
sh bin/mqshutdown namesrv
#2. Shut down the Broker
cd /soft/rocketmq
sh bin/mqshutdown broker

3. Set up the RocketMQ cluster

3.1 Roles

  • Producer: the message sender; analogy: the person who mails a letter
  • Consumer: the message receiver; analogy: the recipient of the letter
  • Broker: stores and forwards messages; analogy: the post office
  • NameServer: manages the Brokers; analogy: the administration that manages the post offices
  • Topic: distinguishes message types; a sender can send messages to one or more Topics, and a receiver can subscribe to messages of one or more Topics
  • Message Queue: a partition of a Topic, used to send and receive messages in parallel

3.2 Cluster Setup Mode

3.2.1 Cluster Features

  • NameServer is an almost stateless node that can be deployed as a cluster; no information is synchronized between nodes.
  • Brokers are divided into Master and Slave. A Master can have multiple Slaves, but a Slave corresponds to only one Master. The Master/Slave relationship is defined by using the same BrokerName with different BrokerIds: BrokerId 0 means Master and a non-zero BrokerId means Slave. Multiple Masters can also be deployed. Each Broker establishes a persistent connection to every node in the NameServer cluster and periodically registers its Topic information with every NameServer.
  • A Producer establishes a persistent connection to one node of the NameServer cluster (chosen at random), periodically fetches Topic routing information from the NameServer, establishes persistent connections to the Masters that serve the Topic, and periodically sends heartbeats to those Masters. Producers are stateless and can be deployed as a cluster.
  • A Consumer establishes a persistent connection to one node of the NameServer cluster (chosen at random), periodically fetches Topic routing information from the NameServer, establishes persistent connections to the Masters and Slaves that serve the Topic, and periodically sends heartbeats to them. A Consumer can subscribe to messages from either a Master or a Slave; the subscription rule is determined by the Broker configuration.

3.2.2 Cluster Modes

1) Single Master mode

This approach is risky, as the entire service may become unavailable if the Broker is restarted or down. Not recommended for online environments, but can be used for local testing.

2) Multi-master mode

For example, there are two or three masters in a cluster. The advantages and disadvantages of this mode are as follows:

  • Advantages: simple configuration, no impact on applications when a single Master breaks down or is restarted for maintenance. When disks are configured as RAID10, the RAID10 disks are reliable, and messages are not lost even when the machine breaks down and cannot be recovered. (a small number of messages are lost during asynchronous disk flushing, but no messages are lost during synchronous disk flushing.)
  • Disadvantages: When a single machine is down, messages that are not consumed on that machine cannot be subscribed until the machine is restored. Message real-time is affected.
3) Multi-master and Multi-Slave mode (asynchronous)

Each Master is configured with a Slave, and there are multiple master-slave pairs. HA uses asynchronous replication, and the Master has short message latency (milliseconds). The advantages and disadvantages of this mode are as follows:

  • Advantages: Even if the disk is damaged, the message loss is very small and the real-time performance of the message will not be affected. At the same time, after the Master breaks down, consumers can still consume from the Slave, and this process is transparent to the application, without manual intervention, and the performance is almost the same as the multi-master mode.
  • Disadvantages: A small number of messages will be lost if the Master machine breaks down and the disk is damaged.
4) Multi-master and Multi-Slave mode (synchronous)

Each Master has a Slave and multiple master-slave pairs are configured. HA uses synchronous dual-write mode. That is, HA returns a success message to the application only when both the Master and Slave servers are successfully written.

  • Advantages: There is no single point of failure for data and services. When the Master is down, there is no message delay, and service availability and data availability are very high.
  • Disadvantages: performance is slightly lower than the asynchronous-replication mode (about 10% lower), the RT for sending a single message is slightly higher, and in the current version (exact version to be confirmed), after the Master goes down the Slave cannot automatically take over as Master.

3.3 Two-node Cluster Configuration

3.3.1 Overall Architecture

The message high availability mode is 2M-2S (synchronous dual-write)

3.3.2 Cluster workflow

  1. Start the NameServer. The NameServer listens on its port and waits for Brokers, Producers, and Consumers to connect; it acts as the routing control center.
  2. The Broker starts, keeps persistent connections to all NameServers, and sends heartbeat packets periodically. A heartbeat packet contains the current Broker's information (IP, port, etc.) and all of its Topic information. After successful registration, the NameServer cluster holds the mapping between Topics and Brokers.
  3. Before sending and receiving messages, create a Topic. Creating a Topic requires specifying which Brokers it will be stored on; a Topic can also be created automatically when a message is sent (see the example command after this list).
  4. The Producer sends messages: it establishes a persistent connection to one node of the NameServer cluster, obtains from the NameServer which Brokers serve the Topic, selects a queue from the queue list by polling, and then establishes a persistent connection to the Broker that owns the queue and sends the message to it.
  5. The Consumer is similar to the Producer: it establishes a persistent connection to one of the NameServers, obtains which Brokers serve the Topic it subscribes to, and then connects to those Brokers directly to consume messages.
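
For step 3, a Topic can also be created in advance with the mqadmin tool. A minimal example, assuming the NameServer addresses and cluster name configured later in this tutorial (the topic name is illustrative):

# Create or update a topic on the rocketmq-cluster cluster
sh /soft/rocketmq/bin/mqadmin updateTopic -n 's201:9876;s202:9876' -c rocketmq-cluster -t TestTopic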

3.3.3 Server Environment

Two Nameservers are deployed on S201 and S202 respectively.

Four Brokers are deployed across s201 and s202: two Masters (one per host) and two Slaves (one per host), cross-deployed so that each host runs one Master and the Slave of the other Master.

  • s201 hosts master broker a and slave broker b-s
  • s202 hosts master broker b and slave broker a-s

No. | Host (IP) | Roles | Architecture roles
1 | s201 (192.168.85.201) | NameServer, Broker | Master1, Slave2
2 | s202 (192.168.85.202) | NameServer, Broker | Master2, Slave1

3.3.4 Adding Host Information

vim /etc/hosts

The configuration is as follows:

############ rocketmq start ############

# nameserver
192.168.85.201 s201
192.168.85.202 s202

# broker
192.168.85.201 rocketmq-master1
192.168.85.201 rocketmq-slave2
192.168.85.202 rocketmq-master2
192.168.85.202 rocketmq-slave1

############ rocketmq end ############

After the configuration, restart the NIC

#The centos7 command is as follows. The command varies according to the Linux version.
systemctl restart network

3.3.5 Firewall Configuration

The host machine needs to remotely access rocketMQ service and Web service of the virtual machine, and the related port number needs to be opened. The simple and crude way is to directly disable the firewall.

#Disabling the firewall
systemctl stop firewalld.service 
#Check the firewall status
firewall-cmd --state 
#Disable the firewall startup
systemctl disable firewalld.service

Or, for security, open only certain port numbers. RocketMQ uses three ports by default: 9876, 10911, and 11011. If the firewall is not turned off, then the firewall must open these ports:

  • The NameServer uses port 9876 by default
  • The master Broker uses port 10911 by default
  • The slave Broker uses port 11011 by default

Run the following command:

#Open the NameServer default port
firewall-cmd --add-port=9876/tcp --permanent
#Open the master default port
firewall-cmd --add-port=10911/tcp --permanent
#Open the slave default port (optional in the current cluster mode)
firewall-cmd --add-port=11011/tcp --permanent
#Reload the firewall rules
firewall-cmd --reload

3.3.6 Environment Variable configuration

vim /etc/profile

Add the following command to the end of the profile
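A typical configuration for this setup (a sketch only, assuming the installation path and NameServer addresses used in this tutorial) is:

# RocketMQ environment variables (adjust the path and addresses to your own installation)
export ROCKETMQ_HOME=/soft/rocketmq
export PATH=$PATH:$ROCKETMQ_HOME/bin
export NAMESRV_ADDR='s201:9876;s202:9876'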

Type :wq! (or Shift+ZZ) to save and exit, then make the configuration take effect immediately:

source /etc/profile

3.3.7 Creating a Message Storage Path

Note: the master and slave storage directories on the same machine cannot be the same and must be kept separate, otherwise the two brokers will conflict (this pitfall may be version-related).

mkdir /soft/rocketmq/store
mkdir /soft/rocketmq/store/commitlog
mkdir /soft/rocketmq/store/consumequeue
mkdir /soft/rocketmq/store/index
mkdir /soft/rocketmq/store-s
mkdir /soft/rocketmq/store-s/commitlog
mkdir /soft/rocketmq/store-s/consumequeue
mkdir /soft/rocketmq/store-s/index

3.3.8 Broker Configuration Files

The 2m-2s-sync (two masters, two slaves, synchronous dual-write) architecture is used. The configuration files live under the following paths:

1) master1

Server: 192.168.85.201 (S201)

vim /soft/rocketmq/conf/2m-2s-sync/broker-a.properties

Modify the configuration as follows:

#Cluster Name
brokerClusterName=rocketmq-cluster
#Broker name. Note that the broker name is different for different configuration files
brokerName=broker-a
#0 indicates Master, and >0 indicates Slave
brokerId=0
#NameServer addresses, separated by semicolons
namesrvAddr=s201:9876;s202:9876
#Number of queues created by default when a topic that does not exist on the server is automatically created at send time
defaultTopicQueueNums=4
#Whether to allow the Broker to automatically create topics. It is recommended that the Topic be enabled offline or disabled online
autoCreateTopicEnable=true
#Whether to allow the Broker to automatically create subscription groups. You are advised to enable offline and disable online subscription groups
autoCreateSubscriptionGroup=true
#The listening port of the Broker for external services
listenPort=10911
#The time when files are deleted is 4 am by default
deleteWhen=04
#File retention time, 48 hours by default
fileReservedTime=120
#CommitLog The default size of each file is 1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue each file stores 30W entries by default. The value can be adjusted according to service conditions
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#Check the physical file disk space
diskMaxUsedSpaceRatio=88
#The store path
storePathRootDir=/soft/rocketmq/store
#CommitLog Storage path
storePathCommitLog=/soft/rocketmq/store/commitlog
#Consumption queue storage path Storage path
storePathConsumeQueue=/soft/rocketmq/store/consumequeue
#Message index storage path
storePathIndex=/soft/rocketmq/store/index
#Checkpoint File storage path
storeCheckpoint=/soft/rocketmq/store/checkpoint
#Abort File storage path
abortFile=/soft/rocketmq/store/abort
#Limit the message size
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000
#The role of the Broker
#- ASYNC_MASTER Master of asynchronous replication
#- SYNC_MASTER Synchronous dual-write Master
#- SLAVE
brokerRole=SYNC_MASTER
#Disk flush mode
#- ASYNC_FLUSH Flushes disks asynchronously
#- SYNC_FLUSH Flushes disks synchronously
flushDiskType=SYNC_FLUSH
#checkTransactionMessageEnable=false
#Number of message sending thread pools
#sendMessageThreadPoolNums=128
#Number of pull message thread pools
#pullMessageThreadPoolNums=128
2) slave2

Server: 192.168.85.201 (S201)

vim /soft/rocketmq/conf/2m-2s-sync/broker-b-s.properties

Modify the configuration as follows:

#Cluster Name
brokerClusterName=rocketmq-cluster
#Broker name. Note that the broker name is different for different configuration files
brokerName=broker-b
#0 indicates Master, and >0 indicates Slave
brokerId=1
#NameServer addresses, separated by semicolons
namesrvAddr=s201:9876;s202:9876
#Number of queues created by default when a topic that does not exist on the server is automatically created at send time
defaultTopicQueueNums=4
#Whether to allow the Broker to automatically create topics. It is recommended that the Topic be enabled offline or disabled online
autoCreateTopicEnable=true
#Whether to allow the Broker to automatically create subscription groups. You are advised to enable offline and disable online subscription groups
autoCreateSubscriptionGroup=true
#The listening port of the Broker for external services
listenPort=11011
#The time when files are deleted is 4 am by default
deleteWhen=04
#File retention time, 48 hours by default
fileReservedTime=120
#CommitLog The default size of each file is 1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue each file stores 30W entries by default. The value can be adjusted according to service conditions
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#Check the physical file disk space
diskMaxUsedSpaceRatio=88
#The store path
storePathRootDir=/soft/rocketmq/store-s
#CommitLog Storage path
storePathCommitLog=/soft/rocketmq/store-s/commitlog
#Consumption queue storage path Storage path
storePathConsumeQueue=/soft/rocketmq/store-s/consumequeue
#Message index storage path
storePathIndex=/soft/rocketmq/store-s/index
#Checkpoint File storage path
storeCheckpoint=/soft/rocketmq/store-s/checkpoint
#Abort File storage path
abortFile=/soft/rocketmq/store-s/abort
#Limit the message size
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000
#The role of the Broker
#- ASYNC_MASTER Master of asynchronous replication
#- SYNC_MASTER Synchronous dual-write Master
#- SLAVE
brokerRole=SLAVE
#Disk flush mode
#- ASYNC_FLUSH Flushes disks asynchronously
#- SYNC_FLUSH Flushes disks synchronously
flushDiskType=ASYNC_FLUSH
#checkTransactionMessageEnable=false
#Number of message sending thread pools
#sendMessageThreadPoolNums=128
#Number of pull message thread pools
#pullMessageThreadPoolNums=128
3) master2

Server: 192.168.85.202 (S202)

vim /soft/rocketmq/conf/2m-2s-sync/broker-b.properties

Modify the configuration as follows:

#Cluster Name
brokerClusterName=rocketmq-cluster
#Broker name. Note that the broker name is different for different configuration files
brokerName=broker-b
#0 indicates Master, and >0 indicates Slave
brokerId=0
#NameServer addresses, separated by semicolons
namesrvAddr=s201:9876;s202:9876
#Number of queues created by default when a topic that does not exist on the server is automatically created at send time
defaultTopicQueueNums=4
#Whether to allow the Broker to automatically create topics. It is recommended that the Topic be enabled offline or disabled online
autoCreateTopicEnable=true
#Whether to allow the Broker to automatically create subscription groups. You are advised to enable offline and disable online subscription groups
autoCreateSubscriptionGroup=true
#The listening port of the Broker for external services
listenPort=10911
#The time when files are deleted is 4 am by default
deleteWhen=04
#File retention time, 48 hours by default
fileReservedTime=120
#CommitLog The default size of each file is 1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue each file stores 30W entries by default. The value can be adjusted according to service conditions
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#Check the physical file disk space
diskMaxUsedSpaceRatio=88
#The store path
storePathRootDir=/soft/rocketmq/store
#CommitLog Storage path
storePathCommitLog=/soft/rocketmq/store/commitlog
#Consumption queue storage path Storage path
storePathConsumeQueue=/soft/rocketmq/store/consumequeue
#Message index storage path
storePathIndex=/soft/rocketmq/store/index
#Checkpoint File storage path
storeCheckpoint=/soft/rocketmq/store/checkpoint
#Abort File storage path
abortFile=/soft/rocketmq/store/abort
#Limit the message size
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000
#The role of the Broker
#- ASYNC_MASTER Master of asynchronous replication
#- SYNC_MASTER Synchronous dual-write Master
#- SLAVE
brokerRole=SYNC_MASTER
#Disk flush mode
#- ASYNC_FLUSH Flushes disks asynchronously
#- SYNC_FLUSH Flushes disks synchronously
flushDiskType=SYNC_FLUSH
#checkTransactionMessageEnable=false
#Number of message sending thread pools
#sendMessageThreadPoolNums=128
#Number of pull message thread pools
#pullMessageThreadPoolNums=128
4) slave1

Server: 192.168.85.202 (S202)

vim /soft/rocketmq/conf/2m-2s-sync/broker-a-s.properties

Modify the configuration as follows:

#Cluster Name
brokerClusterName=rocketmq-cluster
#Broker name. Note that the broker name is different for different configuration files
brokerName=broker-a
#0 indicates Master, and >0 indicates Slave
brokerId=1
#NameServer addresses, separated by semicolons
namesrvAddr=s201:9876;s202:9876
#Number of queues created by default when a topic that does not exist on the server is automatically created at send time
defaultTopicQueueNums=4
#Whether to allow the Broker to automatically create topics. It is recommended that the Topic be enabled offline or disabled online
autoCreateTopicEnable=true
#Whether to allow the Broker to automatically create subscription groups. You are advised to enable offline and disable online subscription groups
autoCreateSubscriptionGroup=true
#The listening port of the Broker for external services
listenPort=11011
#The time when files are deleted is 4 am by default
deleteWhen=04
#File retention time, 48 hours by default
fileReservedTime=120
#CommitLog The default size of each file is 1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue each file stores 30W entries by default. The value can be adjusted according to service conditions
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#Check the physical file disk space
diskMaxUsedSpaceRatio=88
#The store path
storePathRootDir=/soft/rocketmq/store-s
#CommitLog Storage path
storePathCommitLog=/soft/rocketmq/store-s/commitlog
#Consumption queue storage path Storage path
storePathConsumeQueue=/soft/rocketmq/store-s/consumequeue
#Message index storage path
storePathIndex=/soft/rocketmq/store-s/index
#Checkpoint File storage path
storeCheckpoint=/soft/rocketmq/store-s/checkpoint
#Abort File storage path
abortFile=/soft/rocketmq/store-s/abort
#Limit the message size
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000
#The role of the Broker
#- ASYNC_MASTER Master of asynchronous replication
#- SYNC_MASTER Synchronous dual-write Master
#- SLAVE
brokerRole=SLAVE
#Disk flush mode
#- ASYNC_FLUSH Flushes disks asynchronously
#- SYNC_FLUSH Flushes disks synchronously
flushDiskType=ASYNC_FLUSH
#checkTransactionMessageEnable=false
#Number of message sending thread pools
#sendMessageThreadPoolNums=128
#Number of pull message thread pools
#pullMessageThreadPoolNums=128

3.3.9 RocketMQ data synchronization

Alternatively, you can configure the four files on the S201 and synchronize them to the S202.

scp -r /soft/rocketmq-all-4.7.1 root@s202:/soft/
ssh s202 ln -s /soft/rocketmq-all-4.7.1 /soft/rocketmq
scp -r /etc/profile root@s202:/etc/
ssh s202 source /etc/profile
scp -r /etc/hosts root@s202:/etc/

3.3.10 Modifying the Startup Scripts

If it has been modified before, skip it.

1) runbroker.sh
vi /soft/rocketmq/bin/runbroker.sh

JVM parameters need to be adjusted appropriately for memory size:

#===========================================================
#JVM configuration for the development environment
JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m"
2) runserver.sh
vim /soft/rocketmq/bin/runserver.sh
JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"

3.3.11 Starting the Service

1) Start the NameServer cluster

Start NameServer at 192.168.85.201(S201) and 192.168.85.202(S202)

cd /soft/rocketmq/bin
nohup sh mqnamesrv &
2) Start the Broker cluster
  • Start master1 and slave2 on 192.168.85.201(s201)

Master1:

cd /soft/rocketmq/bin
nohup sh mqbroker -c /soft/rocketmq/conf/2m-2s-sync/broker-a.properties &

Slave2:

cd /soft/rocketmq/bin
nohup sh mqbroker -c /soft/rocketmq/conf/2m-2s-sync/broker-b-s.properties &
  • Start master2 and slave1 on 192.168.85.202(s202)

master2

cd /soft/rocketmq/bin
nohup sh mqbroker -c /soft/rocketmq/conf/2m-2s-sync/broker-b.properties &

slave1

cd /soft/rocketmq/bin
nohup sh mqbroker -c /soft/rocketmq/conf/2m-2s-sync/broker-a-s.properties &

3.3.12 Viewing the Process Status

After startup, check the running processes with jps.
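For example (the PIDs below are placeholders; with this tutorial's layout, each host should show one NameServer process and two Broker processes):

jps
# Typical entries:
# 2657 NamesrvStartup
# 2712 BrokerStartup
# 2745 BrokerStartup
# 2801 Jps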

3.3.13 Querying Logs

#View nameServer logs
tail -500f ~/logs/rocketmqlogs/namesrv.log
#Viewing broker Logs
tail -500f ~/logs/rocketmqlogs/broker.log

3.4 The mqadmin Management Tool

3.4.1 Usage Mode

./mqadmin {command} {args}
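
For example, to check the cluster status against this tutorial's NameServers (addresses as configured above):

cd /soft/rocketmq/bin
sh mqadmin clusterList -n 's201:9876;s202:9876'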

3.4.2 Command Introduction

1) Topic related
Name Meaning Command option Description
updateTopic Create the update Topic configuration -b Broker address: indicates the Broker in which the topic resides. Only one Broker is supported. The address is IP :port
-c Cluster name, which indicates the cluster where the topic resides. (You can query the cluster by clusterList.)
-h- Print the help
-n NameServer service address in the format of IP :port
-p Read/write permission of the new topic (W=2 | R=4 | WR=6)
-r Number of readable queues (default: 8)
-w Number of writable queues (default: 8)
-t Topic name (must match ^[a-zA-Z0-9_-]+$)
deleteTopic Delete the Topic -c Cluster name: deleting a topic from a cluster (You can query a cluster by clusterList)
-h Print the help
-n NameServer service address in the format of IP :port
-t Topic name (must match ^[a-zA-Z0-9_-]+$)
topicList View the Topic list -h Print the help
-c If -c is not configured, only the topic list is returned. If -c is added, information about clusterName, topic, and consumerGroup is returned. That is, the cluster and subscription relationship of a topic are not specified
-n NameServer service address in the format of IP :port
topicRoute View Topic routing information -t The name of the topic
-h Print the help
-n NameServer service address in the format of IP :port
topicStatus View the Topic message queue offset -t The name of the topic
-h Print the help
-n NameServer service address in the format of IP :port
topicClusterList View the Topic cluster list -t The name of the topic
-h Print the help
-n NameServer service address in the format of IP :port
updateTopicPerm Update Topic read and write permissions -t The name of the topic
-h Print the help
-n NameServer service address in the format of IP :port
-b Broker address: indicates the Broker in which the topic resides. Only one Broker is supported. The address is IP :port
-p Read/write permission of the new topic (W=2 | R=4 | WR=6)
-c Cluster name: indicates the cluster where the topic resides (you can query the cluster through clusterList). -b is preferred. If -b is not available, commands are executed for all brokers in the cluster
updateOrderConf Creating, deleting, and obtaining namespace-specific kV configurations from NameServer is not currently enabled -h Print the help
-n NameServer service address in the format of IP :port
-t The topic, the key
-v OrderConf, value
-m Method can be GET, PUT, or DELETE
allocateMQ The load results of the consumer list load message queue are calculated using the load averaging algorithm -t The name of the topic
-h Print the help
-n NameServer service address in the format of IP :port
-i The ipList, separated by commas, calculates the message queues for these IP unloaded topics
statsAll Print the Topic subscription relationship, TPS, accumulative amount, 24h read and write total amount, and other information -h Print the help
-n NameServer service address in the format of IP :port
-a Whether to print only active topics
-t Specify the topic
2) Cluster related
Name Meaning Command option Description
clusterList View cluster information, such as cluster, BrokerName, BrokerId, and TPS -m #InTotalYest, #OutTotalYest, #InTotalToday,#OutTotalToday
-h Print the help
-n NameServer service address in the format of IP :port
-i Print interval, in seconds
clusterRT Send messages to detect the cluster Broker RT. Message is sent to ${BrokerName} Topic. -a Amount, total number of probes per probe, RT = total time/amount
-s Message size, unit B
-c Which cluster to probe
-p Print formatted logs with fields separated by |; not printed by default
-h Print the help
-m Belongs to the equipment room, printing use
-i Send interval, in seconds
-n NameServer service address in the format of IP :port
3) Broker related
Name Meaning Command option Description
updateBrokerConfig Updating the Broker configuration file modifies the Broker -b Broker address in the format of IP :port
-c The name of the cluster
-k The key value
-v The value value
-h Print the help
-n NameServer service address in the format of IP :port
brokerStatus View Broker statistics, health status (almost everything you want is there) -b Broker address: IP :port
-h Print the help
-n NameServer service address in the format of IP :port
brokerConsumeStats Consumption of each consumer in the Broker. Consume Offset, Broker Offset, Diff, TImestamp and other information are returned according to Message Queue dimension -b Broker address: IP :port
-t Request timeout
-l Diff Indicates the threshold. The value is printed only when the threshold is exceeded
-o Whether it is an order topic, usually false
-h Print the help
-n NameServer service address in the format of IP :port
getBrokerConfig Getting Broker configuration -b Broker address: IP :port
-n NameServer service address in the format of IP :port
wipeWritePerm Clear Broker write permission from NameServer -b Broker address: IP :port
-n NameServer service address in the format of IP :port
-h Print the help
cleanExpiredCQ Clean up expired Consume Queues on the Broker. Manually reducing the number of queues may leave expired queues behind -n NameServer service address in the format of IP :port
-h Print the help
-b Broker address: IP :port
-c The name of the cluster
cleanUnusedTopic Clean up the unused topics on the Broker and free the Consume Queue from memory. Manually deleting a Topic would produce unused topics -n NameServer service address in the format of IP :port
-h Print the help
-b Broker address: IP :port
-c The name of the cluster
sendMsgStatus Sends a message to the Broker, returning the send status and RT -n NameServer service address in the format of IP :port
-h Print the help
-b BrokerName, note that it is different from the Broker address
-s Message size, unit B
-c Send the number
4) Message correlation
Name Meaning Command option Description
queryMsgById If you are using an open source console, you should use offsetMsgId. This command has other parameters. For details, see QueryMsgByIdSubCommand. -i msgId
-h Print the help
-n NameServer service address in the format of IP :port
queryMsgByKey Query messages based on message keys -k msgKey
-t The name of the Topic
-h Print the help
-n NameServer service address in the format of IP :port
queryMsgByOffset Query the message by Offset -b Name of the Broker (please note that this is the name of the Broker, not the address of the Broker. The name of the Broker can be found in the clusterList)
-i Query queue id
-o Offset value
-t The name of the topic
-h Print the help
-n NameServer service address in the format of IP :port
queryMsgByUniqueKey According to the msgId query, the msgId is different from the offsetMsgId. For details, see Common Operation and maintenance issues. -g and -d are used together to try to get a specific consumer to consume the message and return the consumption result -h Print the help
-n NameServer service address in the format of IP :port
-i uniqe msg id
-g consumerGroup
-d clientId
-t The name of the topic
checkMsgSendRT Detects the RT that sends messages to a topic. The function is similar to clusterRT -h Print the help
-n NameServer service address in the format of IP :port
-t The name of the topic
-a Number of detection
-s Message size
sendMessage Send a Message, either to a specific Message Queue or to a general Message Queue, depending on the configuration. -h Print the help
-n NameServer service address in the format of IP :port
-t The name of the topic
-p Body, message body
-k keys
-c tags
-b BrokerName
-i queueId
consumeMessage Consume messages. You can perform different consumption logic with different configurations based on offset, start and end timestamps, and message queue consumption. See ConsumeMessageCommand. -h Print the help
-n NameServer service address in the format of IP :port
-t The name of the topic
-b BrokerName
-o The consumption starts at offset
-i queueId
-g Consumer Group
-s Start time stamp. See -h for the format
-d End timestamp
-c How many messages to consume
printMsg Consume messages from the Broker and print them, optionally for a period of time -h Print the help
-n NameServer service address in the format of IP :port
-t The name of the topic
-c Character set, such as UTF-8
-s SubExpress, filter expressions
-b Start timestamp. See -h for the format
-e End timestamp
-d Whether to print the message body
printMsgByQueue Similar to printMsg, but specifying a Message Queue -h Print the help
-n NameServer service address in the format of IP :port
-t The name of the topic
-i queueId
-a BrokerName
-c Character set, such as UTF-8
-s SubExpress, filter expressions
-b Start timestamp. See -h for the format
-e End timestamp
-p Print message or not
-d Whether to print the message body
-f Whether to count the number of tags and print them
resetOffsetByTime To reset the offset with the timestamp, both the Broker and the consumer are reset -h Print the help
-n NameServer service address in the format of IP :port
-g Consumer Group
-t The name of the topic
-s Reset the offset corresponding to this timestamp
-f Whether to force reset. If false, only backtracking offset is supported. If true, the timestamp corresponds to offset and consumeOffset regardless
-c Whether to reset the c++ client offset
5) Related to consumers and consumer groups
Name Meaning Command option Description
consumerProgress By viewing the subscription group consumption status, you can view the message accumulation volume of specific client IP addresses -g Name of the group to which the consumer belongs
-s Whether to print the CLIENT IP address
-h Print the help
-n NameServer service address in the format of IP :port
consumerStatus Check the consumer status, including whether there are all the same subscriptions in the same group, analyze whether the Process Queue is stacked, and return the consumer jStack. See ConsumerStatusSubCommand for more -h Print the help
-n NameServer service address in the format of IP :port
-g consumer group
-i clientId
-s Whether to execute jstack
getConsumerStatus Get the Consumer’s progress -g Name of the group to which the consumer belongs
-t Query subject
-i Consumer IP address of the client
-n NameServer service address in the format of IP :port
-h Print the help
updateSubGroup Update or create a subscription relationship -n NameServer service address in the format of IP :port
-h Print the help
-b The Broker address
-c The name of the cluster
-g Consumer Group Name
-s Whether the group is allowed to consume
-m Whether to start consumption from the minimum offset
-d Broadcast mode or not
-q Retry queue number
-r Maximum retry times
-i When slaveReadEnable is enabled and slave consumption is not reached, BrokerId is recommended for slave consumption. You can configure the slave machine ID to actively slave slave consumption
-w If the Broker recommends consuming from a slave, the configuration determines which slave to consume from. BrokerId is configured, such as 1
-a Whether to notify other consumer load balancers when the number of consumers changes
deleteSubGroup Delete the subscription relationship from the Broker -n NameServer service address in the format of IP :port
-h Print the help
-b The Broker address
-c The name of the cluster
-g Consumer Group Name
cloneGroupOffset The offset of the source group is used in the target group -n NameServer service address in the format of IP :port
-h Print the help
-s Source Consumer Group
-d Target Consumer Group
-t The name of the topic
-o Temporary unused
6) Connection correlation
Name Meaning Command option Description
consumerConnection Query a Consumer's network connections -g Name of the group to which the consumer belongs
-n NameServer service address in the format of IP :port
-h Print the help
producerConnection Query a Producer's network connections -g Name of the group to which the producer belongs
-t The topic name
-n NameServer service address in the format of IP :port
-h Print the help
7) NameServer related
Name Meaning Command option Description
updateKvConfig Update NameServer kv configuration, currently not in use -s The namespace
-k key
-v value
-n NameServer service address in the format of IP :port
-h Print the help
deleteKvConfig Example Delete the kV configuration of NameServer -s The namespace
-k key
-n NameServer service address in the format of IP :port
-h Print the help
getNamesrvConfig Obtain the NameServer configuration -n NameServer service address in the format of IP :port
-h Print the help
updateNamesrvConfig Example Modify the NameServer configuration -n NameServer service address in the format of IP :port
-h Print the help
-k key
-v value
8) other
Name Meaning Command option Description
startMonitoring Start the monitoring process to monitor the number of deleted messages and messages in the retry queue -n NameServer service address in the format of IP :port
-h Print the help

3.4.3 Precautions

  • -n indicates the NameServer address in the format of IP :port
  • You can get help for almost any command by using -h
  • If both the Broker address (-b) and clusterName (-c) configuration items are available, the Broker address is preferred. If no Broker address is configured, run this command on all hosts in the cluster
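
For example, to look at a single broker's statistics with the options above (the broker address is the master on s201 from this tutorial's setup; substitute your own):

sh mqadmin brokerStatus -n 's201:9876;s202:9876' -b 192.168.85.201:10911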

3.5 Building a Cluster Monitoring Platform

3.5.1 Overview

RocketMQ has an open-source extension project, rocketmq-externals, which contains a submodule called rocketmq-console: the web management console. Clone rocketmq-externals locally first, because we need to compile and package rocketmq-console ourselves.

3.5.2 Download and compile the package

git clone https://github.com/apache/rocketmq-externals
cd rocketmq-externals/rocketmq-console
mvn clean package -Dmaven.test.skip=true

Note: configure the NameServer cluster address in rocketmq-console's application.properties before packaging, or pass it on the command line when running the Spring Boot JAR.

rocketmq.config.namesrvAddr=192.168.85.201:9876;192.168.85.202:9876

Start rocketmq-console:

java -jar rocketmq-console-ng-1.0.0.jar
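
If you prefer not to repackage, Spring Boot also lets you override the same rocketmq.config.namesrvAddr property on the command line, for example:

java -jar rocketmq-console-ng-1.0.0.jar --rocketmq.config.namesrvAddr='192.168.85.201:9876;192.168.85.202:9876'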

After a successful startup, open http://localhost:8080 in a browser to access the console interface:

Cluster status:

4. Example of sending messages

  • Import MQ client dependencies
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>4.7.1</version>
</dependency>
  • Message sender step analysis
1. Create a producer and specify the producer group name
2. Specify the NameServer address
3. Start the producer
4. Create a message object, specifying the Topic, Tag, and message body
5. Send the message
6. Shut down the producer
  • Message consumer step analysis
1. Create a consumer and specify the consumer group name
2. Specify the NameServer address
3. Subscribe to a Topic and Tag
4. Register a callback function to process messages
5. Start the consumer

4.1 Basic Examples

4.1.1 Sending messages

1) Sending synchronous messages

This reliable synchronous sending method is widely used, such as: important message notification, SMS notification.

public static void main(String[] args) throws Exception {
    // Instantiate the message Producer
    DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
    // Set the NameServer address
    producer.setNamesrvAddr("s201:9876; s202:9876");
    // Start the Producer instance
    producer.start();
    for (int i = 0; i < 100; i++) {
        // Create the message and specify the Topic, Tag, and message body
        Message msg = new Message("TopicTest" /* Topic */."TagA" /* Tag */,
                ("Hello RocketMQ moe " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
        );
        // Send a message to a Broker
        SendResult sendResult = producer.send(msg);
        // Returns whether the message arrived successfully via sendResult
        System.out.printf("%s%n", sendResult);
    }
    // If no more messages are sent, close the Producer instance.
    producer.shutdown();
}
2) Sending asynchronous messages

Asynchronous sending is typically used in business scenarios that are sensitive to response time, where the sender cannot afford to wait long for the Broker's response. (Reference: Blog.csdn.net/jessDL/arti…)

public static void main(String[] args) throws Exception {
    // Instantiate the message Producer
    DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
    // Set the NameServer address
    producer.setNamesrvAddr("s201:9876; s202:9876");
    // Start the Producer instance
    producer.start();
    producer.setRetryTimesWhenSendAsyncFailed(0);
    int messageCount = 100;// Number of messages sent
    final CountDownLatch countDownLatch = new CountDownLatch(messageCount);
    for (int i = 0; i < messageCount; i++) {
        final int index = i;
        // Create the message and specify the Topic, Tag, and message body
        Message msg = new Message("TopicTest"."TagA"."OrderID188"."Hello world zoe".getBytes(RemotingHelper.DEFAULT_CHARSET));
        // SendCallback Receives the result callback asynchronously
        producer.send(msg, new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                countDownLatch.countDown();
                System.out.printf("%-10d OK %s %n", index,
                        sendResult.getMsgId());
            }

            @Override
            public void onException(Throwable e) {
                countDownLatch.countDown();
                System.out.printf("%-10d Exception %s %n", index, e); e.printStackTrace(); }}); }// RocketMQ async message cannot be found TOPIC No route info of this TOPIC
    // Thread.sleep(3000); // Delay the execution of the main thread to ensure that the topic is created successfully!
    countDownLatch.await(5, TimeUnit.SECONDS);// Close production at 100 (sender)
    // If no more messages are sent, close the Producer instance.
    producer.shutdown();
}
3) Sending one-way messages

This approach is mainly used in scenarios where the result of sending is not particularly concerned, such as log sending.

public static void main(String[] args) throws Exception {
    // Instantiate the message Producer
    DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
    // Set the NameServer address
    producer.setNamesrvAddr("s201:9876; s202:9876");
    // Start the Producer instance
    producer.start();
    for (int i = 0; i < 100; i++) {
        // Create the message and specify the Topic, Tag, and message body
        Message msg = new Message("TopicTest" /* Topic */."TagA" /* Tag */,
                ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
        );
        // Send a one-way message with no results returned
        producer.sendOneway(msg);

    }
    // If no more messages are sent, close the Producer instance.
    producer.shutdown();
}

4.1.2 Consuming messages

1) Load balancing mode

Consumers consume messages in load balancing mode. Multiple consumers consume queue messages together, and each consumer processes different messages.

public static void main(String[] args) throws Exception {
    // Instantiate the message consumer and specify the consumer group name
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
    // Specify the Namesrv address.
    consumer.setNamesrvAddr("s201:9876; s202:9876");
    / / subscribe to the Topic
    consumer.subscribe("TopicTest"."*");
    // Load balancing mode consumption
    consumer.setMessageModel(MessageModel.CLUSTERING);
    // Register the callback function to process the message
    consumer.registerMessageListener(new MessageListenerConcurrently() {
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
            System.out.printf("%s Receive New Messages: %s %n",
                    Thread.currentThread().getName(), msgs);
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    });
    // Start the consumer
    consumer.start();
    System.out.printf("Consumer Started.%n");
}
2) Broadcast mode

Consumers consume messages by broadcasting, and each consumer consumes the same message.

public static void main(String[] args) throws Exception {
    // Instantiate the message consumer and specify the consumer group name
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
    // Specify the Namesrv address.
    consumer.setNamesrvAddr("s201:9876; s202:9876");
    / / subscribe to the Topic
    consumer.subscribe("TopicTest"."*");
    // Broadcast mode consumption
    consumer.setMessageModel(MessageModel.BROADCASTING);
    // Register the callback function to process the message
    consumer.registerMessageListener(new MessageListenerConcurrently() {
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
            System.out.printf("%s Receive New Messages: %s %n",
                    Thread.currentThread().getName(), msgs);
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    });
    // Start the consumer
    consumer.start();
    System.out.printf("Consumer Started.%n");
}

4.2 Sequential Message

Message order means that messages can be consumed in the order in which they were sent (FIFO). RocketMQ can strictly ensure that messages are ordered, which can be divided into partition order or global order.

By default, messages are sent in Round Robin mode to different queues (partitioned queues). When consuming a message, it is pulled from multiple queues, in which case there is no guarantee of the order in which the message is sent and consumed. But if the sequential messages that control sending are only sent to the same queue, and are only pulled from that queue when consumed, order is guaranteed. If there is only one queue for sending and consuming, it is globally ordered. If more than one queue is involved, the partition is ordered, that is, messages are ordered relative to each queue.

Here is an example of partition ordering with an order. The sequential process of an order is: create, pay, push, complete. Messages with the same order number will be sent to the same queue successively. When consumed, the messages with the same OrderId must be obtained from the same queue.

4.2.1 Sequential message production

/**
 * Example of the simulated order data built below (each order goes through create / payment / push / complete), e.g.
 * OrderStep{orderId=15103111039, desc='create'}
 * OrderStep{orderId=15103111039, desc='payment'}
 * OrderStep{orderId=15103111065, desc='create'}
 * OrderStep{orderId=15103111065, desc='payment'}
 * OrderStep{orderId=15103117235, desc='create'}
 * OrderStep{orderId=15103117235, desc='payment'}
 */
public static void main(String[] args) throws Exception {
    // List<OrderStep> orderSteps = buildOrders();
    // Collections.sort(orderSteps, Comparator.comparingLong(o -> o.orderId));
    // for (OrderStep orderStep : orderSteps) {
    // System.out.println(orderStep);
    // }

    DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");

    producer.setNamesrvAddr("s201:9876; s202:9876");

    producer.start();

    String[] tags = new String[]{"TagA"."TagC"."TagD"};

    // Order list
    List<OrderStep> orderList = buildOrders();

    Date date = new Date();
    SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    String dateStr = sdf.format(date);
    for (int i = 0; i < 10; i++) {
        // Add a time prefix
        String body = dateStr + " Hello RocketMQ " + orderList.get(i);
        Message msg = new Message("TopicTest", tags[i % tags.length], "KEY" + i, body.getBytes());

        SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
            @Override
            public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                Long id = (Long) arg;  // Select a queue based on the order ID
                long index = id % mqs.size();
                return mqs.get((int) index);
            }
        }, orderList.get(i).getOrderId());/ / order id

        System.out.println(String.format("SendResult status:%s, queueId:%d, body:%s",
                sendResult.getSendStatus(),
                sendResult.getMessageQueue().getQueueId(),
                body));
    }

    producer.shutdown();
}

/** * Order steps */
private static class OrderStep {
    private long orderId;
    private String desc;

    public long getOrderId() {
        return orderId;
    }

    public void setOrderId(long orderId) {
        this.orderId = orderId;
    }

    public String getDesc() {
        return desc;
    }

    public void setDesc(String desc) {
        this.desc = desc;
    }

    @Override
    public String toString() {
        return "OrderStep{" +
                "orderId=" + orderId +
                ", desc='" + desc + '\' ' +
                '} '; }}/** * generate simulated order data */
private static List<OrderStep> buildOrders() {
    List<OrderStep> orderList = new ArrayList<OrderStep>();

    OrderStep orderDemo = new OrderStep();
    orderDemo.setOrderId(15103111039L);
    orderDemo.setDesc("Create");
    orderList.add(orderDemo);

    orderDemo = new OrderStep();
    orderDemo.setOrderId(15103111065L);
    orderDemo.setDesc("Create");
    orderList.add(orderDemo);

    orderDemo = new OrderStep();
    orderDemo.setOrderId(15103111039L);
    orderDemo.setDesc("Payment");
    orderList.add(orderDemo);

    orderDemo = new OrderStep();
    orderDemo.setOrderId(15103117235L);
    orderDemo.setDesc("Create");
    orderList.add(orderDemo);

    orderDemo = new OrderStep();
    orderDemo.setOrderId(15103111065L);
    orderDemo.setDesc("Payment");
    orderList.add(orderDemo);

    orderDemo = new OrderStep();
    orderDemo.setOrderId(15103117235L);
    orderDemo.setDesc("Payment");
    orderList.add(orderDemo);

    orderDemo = new OrderStep();
    orderDemo.setOrderId(15103111065L);
    orderDemo.setDesc("Complete");
    orderList.add(orderDemo);

    orderDemo = new OrderStep();
    orderDemo.setOrderId(15103111039L);
    orderDemo.setDesc("Push");
    orderList.add(orderDemo);

    orderDemo = new OrderStep();
    orderDemo.setOrderId(15103117235L);
    orderDemo.setDesc("Complete");
    orderList.add(orderDemo);

    orderDemo = new OrderStep();
    orderDemo.setOrderId(15103111039L);
    orderDemo.setDesc("Complete");
    orderList.add(orderDemo);

    return orderList;
}

4.2.2 Sequential consumption of messages

public static void main(String[] args) throws Exception {
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_3");
    consumer.setNamesrvAddr("s201:9876; s202:9876");
    /** * set whether the Consumer starts to consume at the head of the queue or at the end of the queue for the first time 

* If not for the first time, then continue to consume at the same position as the last time */
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); consumer.subscribe("TopicTest"."TagA || TagC || TagD"); consumer.registerMessageListener(new MessageListenerOrderly() { Random random = new Random(); @Override public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) { context.setAutoCommit(true); for (MessageExt msg : msgs) { // We can see that each queue has a unique consume thread to consume, and the order is ordered for each queue(partition) System.out.println("consumeThread=" + Thread.currentThread().getName() + ", queueId=" + msg.getQueueId() + ", content:" + new String(msg.getBody())); } try { // Simulate business logic processing... TimeUnit.SECONDS.sleep(random.nextInt(10)); } catch (Exception e) { e.printStackTrace(); } returnConsumeOrderlyStatus.SUCCESS; }}); consumer.start(); System.out.println("Consumer Started."); } Copy the code

4.3 Delayed Message

For example, in e-commerce, you can send a delayed message after submitting an order, check the status of the order one hour later, and cancel the order to release the inventory if there is still no payment.

4.3.1 Starting the Message Consumer

public static void main(String[] args) throws Exception {
    // Instantiate the consumer
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ExampleConsumer");
    // Set the NameServer address
    consumer.setNamesrvAddr("s201:9876; s202:9876");
    / / subscribe switchable viewer
    consumer.subscribe("TestTopic"."*");
    // Register a message listener
    consumer.registerMessageListener(new MessageListenerConcurrently() {
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messages, ConsumeConcurrentlyContext context) {
            for (MessageExt message : messages) {
                // Print approximate delay time period
                System.out.println("Receive message[msgId=" + message.getMsgId() + "]" + (System.currentTimeMillis() - message.getStoreTimestamp()) + "ms later");
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    });
    // Start the consumer
    consumer.start();
}

4.3.2 Sending delayed Messages

public static void main(String[] args) throws Exception {
    // Instantiate a producer to generate delayed messages
    DefaultMQProducer producer = new DefaultMQProducer("ExampleProducerGroup");
    // Set the NameServer address
    producer.setNamesrvAddr("s201:9876; s202:9876");
    // Start the producer
    producer.start();
    int totalMessagesToSend = 100;
    for (int i = 0; i < totalMessagesToSend; i++) {
        Message message = new Message("TestTopic", ("Hello scheduled message " + i).getBytes());
        // Set delay level 3; this message will be delivered after 10s. (Only fixed delay levels are supported; see delayTimeLevel)
        message.setDelayTimeLevel(3);
        // Send the message
        producer.send(message);
    }
    // Shut down the producer
    producer.shutdown();
}

4.3.3 validation

You will see that each message is consumed about 10 seconds after it was stored.

4.3.4 Restrictions

// org/apache/rocketmq/store/config/MessageStoreConfig.java
private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

At present, RocketMQ does not support arbitrary delay times. You have to pick one of the fixed delay levels above: levels 1 through 18 correspond to 1s through 2h.
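
Tying this back to the order-timeout example in 4.3, the one-hour check would use level 17 from the default table above. A rough sketch follows; the topic name and payload are made up for illustration, and producer is assumed to be the started producer from 4.3.2:

// Minimal sketch: send an "order timeout check" message with a one-hour delay
Message checkMsg = new Message("OrderTimeoutCheckTopic", ("orderId=" + 15103111039L).getBytes());
checkMsg.setDelayTimeLevel(17); // level 17 -> 1h with the default messageDelayLevel table
producer.send(checkMsg);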

4.4 Batch Message

Batching messages can significantly improve the performance of delivering small messages. The limitations are that these batch messages should have the same topic, the same waitStoreMsgOK, and cannot be delayed messages. In addition, the total size of this batch of messages should not exceed 4MB.

4.4.1 Sending Batch Messages

If each batch you send is no larger than 4MB, using batch sending is straightforward, for example:

String topic = "BatchTest";
List<Message> messages = new ArrayList<>();
messages.add(new Message(topic, "TagA", "OrderID001", "Hello world 0".getBytes()));
messages.add(new Message(topic, "TagA", "OrderID002", "Hello world 1".getBytes()));
messages.add(new Message(topic, "TagA", "OrderID003", "Hello world 2".getBytes()));
try {
   producer.send(messages);
} catch (Exception e) {
   e.printStackTrace();
   // Handle the error
}

If the total size of the batch may exceed 4MB, it is best to split the list into smaller batches:

public class ListSplitter implements Iterator<List<Message>> {
   private final int SIZE_LIMIT = 1024 * 1024 * 4;
   private final List<Message> messages;
   private int currIndex;
   public ListSplitter(List<Message> messages) {
           this.messages = messages;
   }
    @Override 
    public boolean hasNext() {
       return currIndex < messages.size();
   }
    @Override
    public List<Message> next() {
       int nextIndex = currIndex;
       int totalSize = 0;
       for (; nextIndex < messages.size(); nextIndex++) {
           Message message = messages.get(nextIndex);
           int tmpSize = message.getTopic().length() + message.getBody().length;
           Map<String, String> properties = message.getProperties();
           for (Map.Entry<String, String> entry : properties.entrySet()) {
               tmpSize += entry.getKey().length() + entry.getValue().length();
           }
           tmpSize = tmpSize + 20; // Adds 20 bytes to the log overhead
           if (tmpSize > SIZE_LIMIT) {
               // A single message exceeds the maximum limit
               // Ignore, otherwise the split process will be blocked
               if (nextIndex - currIndex == 0) {
                  // If the next sublist has no elements, add the sublist and exit the loop, otherwise just exit the loop
                  nextIndex++;
               }
               break;
           }
           if (tmpSize + totalSize > SIZE_LIMIT) {
               break;
           } else {
               totalSize += tmpSize;
           }

       }
       List<Message> subList = messages.subList(currIndex, nextIndex);
       currIndex = nextIndex;
       return subList;
    }
}

// Split a large message list into smaller batches
ListSplitter splitter = new ListSplitter(messages);
while (splitter.hasNext()) {
  try {
      List<Message>  listItem = splitter.next();
      producer.send(listItem);
  } catch (Exception e) {
      e.printStackTrace();
      // Handle the error
  }
}

4.5 Filtering Messages

In most cases, tags are a simple and useful way to select the messages you want. For example:

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_EXAMPLE");
consumer.subscribe("TOPIC"."TAGA || TAGB || TAGC");

The consumer will receive messages that carry TAGA, TAGB, or TAGC. The limitation is that a message can carry only one tag, which may not be enough for complex scenarios. In that case, you can filter messages with SQL expressions, which are evaluated against the properties set when the message was sent. Some simple logic can be expressed with the syntax defined by RocketMQ. Here is an example:

------------
| message  |
|----------|   a > 5 AND b = 'abc'
| a = 10   |  ----------------------->  Gotten
| b = 'abc'|
| c = true |
------------

------------
| message  |
|----------|   a > 5 AND b = 'abc'
| a = 1    |  ----------------------->  Missed
| b = 'abc'|
| c = true |
------------

4.5.1 Basic SQL syntax

RocketMQ defines only some basic syntax to support this feature. You can also easily extend it.

  • Numerical comparison, for example: >, >=, <, <=, BETWEEN, =;
  • Character comparisons, such as: =, <>, IN;
  • IS NULL or IS NOT NULL;
  • Logic symbols AND, OR, NOT;

The supported types of constants are:

  • Numeric values, such as 123 or 3.1415;
  • Strings, such as 'abc', which must be enclosed in single quotes;
  • NULL, a special constant;
  • Boolean values, TRUE or FALSE.

Only consumers in push mode can filter messages with SQL92 expressions. The subscription interface is shown below, followed by a combined example of the syntax above:

public void subscribe(final String topic, final MessageSelector messageSelector)
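
As a rough sketch of the syntax above, a subscription combining numeric, string, and boolean conditions might look like the following; the property names a, b, and c match the earlier diagram, and note that the broker typically needs enablePropertyFilter=true in its configuration for SQL92 filtering to be accepted (verify against your broker version):

// Hypothetical compound filter; properties a, b, c are set by the producer via putUserProperty
consumer.subscribe("TopicTest",
        MessageSelector.bySql("a > 5 AND b = 'abc' AND c = TRUE"));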

4.5.2 Message Producer

When sending a message, you can set its properties with putUserProperty:

DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
producer.setNamesrvAddr("s201:9876;s202:9876");
producer.start();

for (int i = 0; i < 10; i++) {
    Message msg = new Message("TopicTest",
        "TagA",
        ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
    );
    // Set a user property "a" that the consumer-side SQL filter can evaluate
    msg.putUserProperty("a", String.valueOf(i));
    SendResult sendResult = producer.send(msg);
}

producer.shutdown();

4.5.3 Message consumer

Use MessageSelector.bySql to filter messages with SQL:

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");
// Only subscribe to messages whose property a satisfies: a >= 0 and a <= 3
consumer.subscribe("TopicTest", MessageSelector.bySql("a between 0 and 3"));
consumer.registerMessageListener(new MessageListenerConcurrently() {
   @Override
   public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
       return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
   }
});
consumer.start();

4.6 Transaction Message

4.6.1 Process analysis

The figure above illustrates the general scheme of the transaction message, which is divided into two processes: the sending and committing of the normal transaction message, and the compensation process of the transaction message.

1) Sending and submitting transaction messages

(1) Send the half message.

(2) The server responds with the write result of the half message.

(3) Execute the local transaction according to the send result (if the write fails, the half message is invisible to the business and the local transaction is not executed).

(4) Commit or Rollback according to the local transaction state (a Commit generates the message index and makes the message visible to consumers).

2) Transaction compensation

(1) For pending transaction messages that have received no Commit/Rollback, the server initiates a check-back to the producer.

(2) After receiving the check-back request, the producer checks the status of the local transaction corresponding to that message.

(3) Commit or Rollback based on the local transaction status

The compensation phase is used to resolve situations where the Commit or Rollback messages time out or fail.

3) Status of the transaction message

A transaction message has three states: commit state, rollback state, and intermediate state:

  • TransactionStatus.CommitTransaction: commit state, which allows the consumer to consume this message.
  • TransactionStatus.RollbackTransaction: rollback state, which means the message will be deleted and must not be consumed.
  • TransactionStatus.Unknown: intermediate state, which means the message queue needs to check back to determine the final state.
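
In the open-source Java client used in the examples below, these three states correspond to the LocalTransactionState enum rather than TransactionStatus; a quick mapping (note the client's spelling of the last constant):

// Mapping of the three states onto the Java client enum used in the following code
LocalTransactionState commit   = LocalTransactionState.COMMIT_MESSAGE;   // commit state
LocalTransactionState rollback = LocalTransactionState.ROLLBACK_MESSAGE; // rollback state
LocalTransactionState unknown  = LocalTransactionState.UNKNOW;           // intermediate state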

4.6.2 Sending a transaction message

1) Create transactional producers

By creating a producer with the TransactionMQProducer class and specifying a unique producer group, you can also set up a custom thread pool to handle check-back requests (a sketch of wiring one in follows the code below). After the local transaction has been executed, you need to reply to the message queue with the execution result; the transaction states that can be returned are described in the previous section.

public static void main(String[] args) throws MQClientException, InterruptedException {
    // Create transaction listener
    TransactionListener transactionListener = new TransactionListenerImpl();
    // Create a message producer
    TransactionMQProducer producer = new TransactionMQProducer("group6");
    producer.setNamesrvAddr("s201:9876; s202:9876");
    // The producer is the listener
    producer.setTransactionListener(transactionListener);
    // Start the message producer
    producer.start();
    String[] tags = new String[]{"TagA", "TagB", "TagC"};
    for (int i = 0; i < 3; i++) {
        try {
            Message msg = new Message("TransactionTopic", tags[i % tags.length], "KEY" + i,
                    ("Hello RocketMQ Transaction Msg " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
            SendResult sendResult = producer.sendMessageInTransaction(msg, null);
            System.out.printf("%s%n", sendResult);
            TimeUnit.SECONDS.sleep(1);
        } catch (MQClientException | UnsupportedEncodingException e) {
            e.printStackTrace();
        }
    }
    //producer.shutdown();
}
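
The example above relies on the client's default check thread pool. A minimal sketch of the custom thread pool mentioned earlier, registered on the producer before producer.start(), might look like this (the pool sizes and thread name are arbitrary illustration values):

// Hypothetical custom thread pool for handling transaction check-back requests
ExecutorService checkExecutor = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS,
        new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
    @Override
    public Thread newThread(Runnable r) {
        Thread thread = new Thread(r);
        thread.setName("client-transaction-msg-check-thread");
        return thread;
    }
});
// Register the pool on the TransactionMQProducer before starting it
producer.setExecutorService(checkExecutor);
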
2) Implement the transaction listening interface

When the half message is sent successfully, the executeLocalTransaction method is used to execute the local transaction. It returns one of the three transaction states mentioned in the previous section. The checkLocalTransaction method is used to check the local transaction status and respond to the message queue's check-back requests; it also returns one of the three transaction states.

public static class TransactionListenerImpl implements TransactionListener {

    @Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        System.out.println("Perform local transactions");
        if (StringUtils.equals("TagA", msg.getTags())) {
            return LocalTransactionState.COMMIT_MESSAGE;
        } else if (StringUtils.equals("TagB", msg.getTags())) {
            return LocalTransactionState.ROLLBACK_MESSAGE;
        } else {
            return LocalTransactionState.UNKNOW;
        }
    }

    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {
        System.out.println("MQ check message Tag [" + msg.getTags() + Local transaction execution result of "]");
        return LocalTransactionState.COMMIT_MESSAGE;
    }
}

4.6.3 Restrictions

  1. Delayed and batch messages are not supported for transactional messages.
  2. To prevent a single message from being checked too many times and causing the half-message queue to accumulate, the number of checks for a single message is limited to 15 by default; users can change this limit with the transactionCheckMax parameter in the Broker configuration. If a message has been checked more than N times (N = transactionCheckMax), the Broker discards the message and, by default, prints an error log. Users can override the AbstractTransactionCheckListener class to change this behavior.
  3. A transaction message is checked back only after the period defined by the transactionMsgTimeout parameter in the Broker configuration. When sending a transaction message, the user can also change this period per message by setting the user property CHECK_IMMUNITY_TIME_IN_SECONDS, which takes precedence over the transactionMsgTimeout parameter (see the sketch after this list).
  4. A transactional message may be checked or consumed more than once.
  5. Committing the message to the user's target topic may fail; currently this depends on the log records. High availability is guaranteed by RocketMQ's own HA mechanism. If you need to ensure that transaction messages are not lost and transactional integrity is preserved, a synchronous double-write mechanism is recommended.
  6. The producer ID of a transaction message cannot be shared with producer IDs of other types of messages. Unlike other types of messages, transaction messages allow reverse lookup: the MQ server can query the producer clients by their producer ID.
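
As referenced in item 3, a minimal sketch of setting the per-message check delay follows; the property key is written out literally here and should be verified against your client version, and the 30-second value is just an illustration. It assumes the TransactionMQProducer from 4.6.2:

// Hypothetical: delay the first transaction check-back for this message by 30 seconds
Message msg = new Message("TransactionTopic", "TagA", "KEY0",
        ("Hello RocketMQ Transaction Msg").getBytes(RemotingHelper.DEFAULT_CHARSET));
msg.putUserProperty("CHECK_IMMUNITY_TIME_IN_SECONDS", "30");
SendResult sendResult = producer.sendMessageInTransaction(msg, null);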