Why use MQ

  • The application of decoupling
  1. The more coupled the system, the less fault tolerant it is. Taking e-commerce applications as an example, after users create orders, if they call the inventory system, logistics system and payment system coupled with each other, any subsystem fails or becomes temporarily unavailable due to upgrading or other reasons, the ordering operation will be abnormal and the user experience will be affected.
  2. Use message queues to decouple. For example, when the logistics system breaks down, it takes several minutes to repair. During this time, the data to be processed by the logistics system is cached in the message queue, and the user’s order operation is completed normally. When the logistics system replies, the order messages in the message queue can be processed, and the terminal system cannot perceive that the logistics system has broken down for several minutes.

  • Traffic peak clipping

  1. An application system may be overwhelmed by a sudden surge in system traffic. Message queues can cache a large number of requests and spread them over a long period of time, which greatly improves system stability and user experience.

  • Data distribution

  1. Message queues allow data to flow more across multiple systems. The producer of the data doesn’t need to care who uses it, just sends it to the message queue, where the consumer gets it directly.

RocketMQ installation

  1. Download the binary versionBinary: rocketmq - all - 4.7.0 - bin - the zip (PGP) [SHA512]
  2. Unpack the
  3. catalogue
    • Bin: startup scripts, including shell scripts and CMD scripts
    • Conf: instance configuration files, including broker configuration files and logback configuration files
    • Lib: Depends on jar packages, including Netty, Commons-lang, FastJSON, etc
  4. Start MQ, log file in/root/logs/rocketmqlogs
  • Modifying the JVM configuration
# edit runbroker.sh and runserver.sh to change the default JVM size, both to 1GB
vi runbroker.sh
vi runserver.sh
Copy the code
  • Start the NameServer
# 1. Start NameServer
 ./mqnamesrv &
 # 2. View the startup log
tail -f /root/logs/rocketmqlogs/namesrv.log
Copy the code
  • Start the Broker
# 1. Start Broker
./mqbroker -n localhost:9876 &
# 2. View the startup log
tail -f /root/logs/rocketmqlogs/broker.log 
Copy the code
  • Close the RocketMQ
# 1. Close NameServer
sh bin/mqshutdown namesrv
# 2. Close the Broker
sh bin/mqshutdown broker
Copy the code
  • Check whether the startup is successful
[root@localhost bin]# jps
2981 Jps
2858 NamesrvStartup
2909 BrokerStartup
Copy the code

RocketMQ role introduction

  1. Producer: message producer, responsible for producing messages, generally by the business system.
  2. Consumer: Message Consumer, responsible for consuming messages, usually the backend system responsible for asynchronous consumption. The following is an overview of different RocketMQ consumers.
    • Push Consumer: Registers a listener with the Consumer object to listen for pushed messages and consume them.
    • Pull Consumer: Actively requests the Broker to pull messages and then consume them.
  3. Producer Group: a collection of producers, typically used to send the same kind of message.
  4. Consumer Group: consumer collection, typically used to receive a type of message for consumption.
  5. Broker: MQ message server for message storage and forwarding.
  6. NameServer: Broker coordinator, similar to Zookeeper.
  7. Topic: Topic of a message. The sender can send messages to multiple topics, and consumers can subscribe messages to multiple topics.
  8. Message QueueIn:TopicAn internal partition of a small class that can be used for send and receive processing.

RocketMQ cluster mode

  1. A single Master model
  • This is risky because if the Broker restarts or goes down, the entire service may become unavailable. This is not recommended for online environments, but can be used for local testing.
  1. More than the Master model
  • There are no slaves in a cluster but only masters, for example, two or three masters. The advantages and disadvantages of this mode are as follows:
  • The configuration is simple, and the maintenance of a Master disk has no impact on applications. When the Master disk is configured as RAID10, the RAID10 disk is highly reliable and does not lose messages even when the machine is down and cannot be recovered. (a few messages are lost when the Master disk is flushed asynchronously, but none is lost when the Master disk is flushed synchronously.)
  • Disadvantages: During a single machine outage, unconsumed messages on the machine cannot be subscribed until the machine is restored, affecting the real-time performance of messages.
  1. Multi-master, Multi-Slave mode (Asynchronous)
  • Each Master is configured with a Slave, and there are multiple pairs of master-slaves. HA adopts asynchronous replication, and the Master has a short message delay (in milliseconds). Advantages and disadvantages of this mode are as follows:
  • Advantages: Even if the disk is damaged, the message loss is very small, and the real-time performance of the message is not affected. In addition, when the Master is down, consumers can still consume from the Slave. This process is transparent to the application, without manual intervention, and the performance is almost the same as that of the multi-master mode.
  • Disadvantages: Master down, loss of small messages in case of disk corruption.
  1. Multi-master, Multi-Slave mode (Synchronous)
  • Each Master is configured with a Slave. There are multiple master-slave pairs. HA adopts the synchronous dual-write mode.
  • Advantages: No single point of failure for data and services, no delay for messages in the case of Master outage, high service availability and data availability;
  • Disadvantages: Lower performance than asynchronous replication (about 10% lower), higher RT for sending a single message, and the standby node cannot automatically switch to the host when the active node is down.

Dual-active/Secondary Cluster construction (Synchronous dual-write mode)

RocketMQ cluster. PNG

Cluster workflow

  1. Start NameServer. The NameServer listens to the port and waits for brokers, Producers, and consumers to connect to it, acting as a routing control center.
  2. The Broker starts, maintains long connections to all Nameservers, and periodically sends heartbeat packets. The heartbeat package contains the current Broker information (IP+ port, etc.) and stores all Topic information. After registration, there is a mapping between topics and brokers in the NameServer cluster.
  3. Before sending or receiving a message, a Topic is created. When creating a Topic, you need to specify which brokers the Topic will be stored on, or you can automatically create a Topic when sending a message.
  4. The Producer sends a message and establishes a long connection with one of the NameServer clusters when starting the message. The Producer obtains from the NameServer which brokers the currently sent Topic exists. The polling selects a queue from the queue list. It then establishes a long connection with the Broker on which the queue resides to send messages to the Broker.
  5. Similar to Producer, a Consumer establishes a long connection with one of the Nameservers, obtains information about which brokers are currently subscribed to, and then directly establishes a connection channel with the brokers to consume messages.

Cluster construction – Environment preparation

  1. Disable the firewall.

Configure HOST information for easy access.

  1. vim /etc/hosts
# nameserver192.168.162.130 rocketmq - nameserver1 192.168.162.131 rocketmq - nameserver2# broker
192.168.162.130 rocketmq-master1
192.168.162.131 rocketmq-slave1
192.168.162.132 rocketmq-master2
192.168.162.133 rocketmq-slave2
Copy the code
  1. After the configuration is complete, restart the NICsystemctl restart network

Modify the message storage path

mkdir /usr/local/rocketmq/store
mkdir /usr/local/rocketmq/store/commitlog
mkdir /usr/local/rocketmq/store/consumequeue
mkdir /usr/local/rocketmq/store/index
Copy the code

Cluster setup -Broker configuration

master1

  1. vi /usr/soft/rocketmq/conf/2m-2s-sync/broker-a.properties
  2. Modify the configuration as follows:
# Name of the owning cluster
brokerClusterName=rocketmq-cluster
Note that different configuration files are filled in differently here
brokerName=broker-a
#0 indicates Master, >0 indicates Slave
brokerId=0
#nameServer address, semicolon splitnamesrvAddr=rocketmq-nameserver1:9876; rocketmq-nameserver2:9876# when sending a message, automatically create a topic that does not exist on the server
defaultTopicQueueNums=4
Whether to allow the Broker to create topics automatically
autoCreateTopicEnable=true
Whether to enable the Broker to automatically create subscription groups
autoCreateSubscriptionGroup=true
The port on which the Broker listens for external services
listenPort=10911
The default time for deleting files is 4 a.m
deleteWhen=04
The default retention time is 48 hours
fileReservedTime=120
CommitLog The default file size is 1 gb
mapedFileSizeCommitLog=1073741824
ConsumeQueue By default, each file holds 30W bytes, which can be adjusted according to service conditions
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
Check physical file disk space
diskMaxUsedSpaceRatio=88
# storage path
storePathRootDir=/usr/local/rocketmq/store
CommitLog storage path
storePathCommitLog=/usr/local/rocketmq/store/commitlog
Consume queue storage path Storage path
storePathConsumeQueue=/usr/local/rocketmq/store/consumequeue
Message index storage path
storePathIndex=/usr/local/rocketmq/store/index
#checkpoint File storage path
storeCheckpoint=/usr/local/rocketmq/store/checkpoint
#abort file storage path
abortFile=/usr/local/rocketmq/store/abort
# limit the message size
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000
The role of the Broker
# -async_master Asynchronous replication Master
# -sync_master Synchronizes the double write Master
#- SLAVE
brokerRole=SYNC_MASTER
# Brush plate mode
# -async_flush Asynchronously flush disks
# -sync_flush Synchronously flush disks
flushDiskType=SYNC_FLUSH
#checkTransactionMessageEnable=false
# Number of sending thread pools
#sendMessageThreadPoolNums=128
# pull message thread pool number
#pullMessageThreadPoolNums=128
Copy the code

slave1

  1. Modifying a Configuration Filevi /usr/local/rocketmq/conf/2m-2s-sync/broker-a-s.properties
# Name of the owning cluster
brokerClusterName=rocketmq-cluster
Note that different configuration files are filled in differently here
brokerName=broker-a
#0 indicates Master, >0 indicates Slave
brokerId=1
#nameServer address, semicolon splitnamesrvAddr=rocketmq-nameserver1:9876; rocketmq-nameserver2:9876# when sending a message, automatically create a topic that does not exist on the server
defaultTopicQueueNums=4
Whether to allow the Broker to create topics automatically
autoCreateTopicEnable=true
Whether to enable the Broker to automatically create subscription groups
autoCreateSubscriptionGroup=true
The port on which the Broker listens for external services
listenPort=11011
The default time for deleting files is 4 a.m
deleteWhen=04
The default retention time is 48 hours
fileReservedTime=120
CommitLog The default file size is 1 gb
mapedFileSizeCommitLog=1073741824
ConsumeQueue By default, each file holds 30W bytes, which can be adjusted according to service conditions
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
Check physical file disk space
diskMaxUsedSpaceRatio=88
# storage path
storePathRootDir=/usr/local/rocketmq/store
CommitLog storage path
storePathCommitLog=/usr/local/rocketmq/store/commitlog
Consume queue storage path Storage path
storePathConsumeQueue=/usr/local/rocketmq/store/consumequeue
Message index storage path
storePathIndex=/usr/local/rocketmq/store/index
#checkpoint File storage path
storeCheckpoint=/usr/local/rocketmq/store/checkpoint
#abort file storage path
abortFile=/usr/local/rocketmq/store/abort
# limit the message size
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000
The role of the Broker
# -async_master Asynchronous replication Master
# -sync_master Synchronizes the double write Master
#- SLAVE
brokerRole=SLAVE
# Brush plate mode
# -async_flush Asynchronously flush disks
# -sync_flush Synchronously flush disks
flushDiskType=ASYNC_FLUSH
#checkTransactionMessageEnable=false
# Number of sending thread pools
#sendMessageThreadPoolNums=128
# pull message thread pool number
#pullMessageThreadPoolNums=128
Copy the code

master2

  1. Modifying a Configuration Filevi /usr/local/rocketmq/conf/2m-2s-sync/broker-b.properties
# Name of the owning cluster
brokerClusterName=rocketmq-cluster
Note that different configuration files are filled in differently here
brokerName=broker-b
#0 indicates Master, >0 indicates Slave
brokerId=0
#nameServer address, semicolon splitnamesrvAddr=rocketmq-nameserver1:9876; rocketmq-nameserver2:9876# when sending a message, automatically create a topic that does not exist on the server
defaultTopicQueueNums=4
Whether to allow the Broker to create topics automatically
autoCreateTopicEnable=true
Whether to enable the Broker to automatically create subscription groups
autoCreateSubscriptionGroup=true
The port on which the Broker listens for external services
listenPort=10911
The default time for deleting files is 4 a.m
deleteWhen=04
The default retention time is 48 hours
fileReservedTime=120
CommitLog The default file size is 1 gb
mapedFileSizeCommitLog=1073741824
ConsumeQueue By default, each file holds 30W bytes, which can be adjusted according to service conditions
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
Check physical file disk space
diskMaxUsedSpaceRatio=88
# storage path
storePathRootDir=/usr/local/rocketmq/store
CommitLog storage path
storePathCommitLog=/usr/local/rocketmq/store/commitlog
Consume queue storage path Storage path
storePathConsumeQueue=/usr/local/rocketmq/store/consumequeue
Message index storage path
storePathIndex=/usr/local/rocketmq/store/index
#checkpoint File storage path
storeCheckpoint=/usr/local/rocketmq/store/checkpoint
#abort file storage path
abortFile=/usr/local/rocketmq/store/abort
# limit the message size
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000
The role of the Broker
# -async_master Asynchronous replication Master
# -sync_master Synchronizes the double write Master
#- SLAVE
brokerRole=SYNC_MASTER
# Brush plate mode
# -async_flush Asynchronously flush disks
# -sync_flush Synchronously flush disks
flushDiskType=SYNC_FLUSH
#checkTransactionMessageEnable=false
# Number of sending thread pools
#sendMessageThreadPoolNums=128
# pull message thread pool number
#pullMessageThreadPoolNums=128
Copy the code

slave2

  1. Modifying a Configuration Filevi /usr/local/rocketmq/conf/2m-2s-sync/broker-b-s.properties
# Name of the owning cluster
brokerClusterName=rocketmq-cluster
Note that different configuration files are filled in differently here
brokerName=broker-b
#0 indicates Master, >0 indicates Slave
brokerId=1
#nameServer address, semicolon splitnamesrvAddr=rocketmq-nameserver1:9876; rocketmq-nameserver2:9876# when sending a message, automatically create a topic that does not exist on the server
defaultTopicQueueNums=4
Whether to allow the Broker to create topics automatically
autoCreateTopicEnable=true
Whether to enable the Broker to automatically create subscription groups
autoCreateSubscriptionGroup=true
The port on which the Broker listens for external services
listenPort=11011
The default time for deleting files is 4 a.m
deleteWhen=04
The default retention time is 48 hours
fileReservedTime=120
CommitLog The default file size is 1 gb
mapedFileSizeCommitLog=1073741824
ConsumeQueue By default, each file holds 30W bytes, which can be adjusted according to service conditions
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
Check physical file disk space
diskMaxUsedSpaceRatio=88
# storage path
storePathRootDir=/usr/local/rocketmq/store
CommitLog storage path
storePathCommitLog=/usr/local/rocketmq/store/commitlog
Consume queue storage path Storage path
storePathConsumeQueue=/usr/local/rocketmq/store/consumequeue
Message index storage path
storePathIndex=/usr/local/rocketmq/store/index
#checkpoint File storage path
storeCheckpoint=/usr/local/rocketmq/store/checkpoint
#abort file storage path
abortFile=/usr/local/rocketmq/store/abort
# limit the message size
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000
The role of the Broker
# -async_master Asynchronous replication Master
# -sync_master Synchronizes the double write Master
#- SLAVE
brokerRole=SLAVE
# Brush plate mode
# -async_flush Asynchronously flush disks
# -sync_flush Synchronously flush disks
flushDiskType=ASYNC_FLUSH
#checkTransactionMessageEnable=false
# Number of sending thread pools
#sendMessageThreadPoolNums=128
# pull message thread pool number
#pullMessageThreadPoolNums=128
Copy the code

Start the cluster

1) Start the NameServe cluster

cd /usr/local/rocketmq/bin
./mqnamesrv &
Copy the code

2) Start the Broker cluster

  1. Master1:
cd /usr/local/rocketmq/bin
./mqbroker -c /usr/local/rocketmq/conf/2m-2s-sync/broker-a.properties &
Copy the code
  1. slave1
cd /usr/local/rocketmq/bin
./mqbroker -c /usr/local/rocketmq/conf/2m-2s-sync/broker-a-s.properties &
Copy the code
  1. master2
cd /usr/local/rocketmq/bin
./mqbroker -c /usr/local/rocketmq/conf/2m-2s-sync/broker-b.properties &
Copy the code
  1. slave2
cd /usr/local/rocketmq/bin
./mqbroker -c /usr/local/rocketmq/conf/2m-2s-sync/broker-b-s.properties &
Copy the code

The cluster monitoring platform was built

  1. Summary:RocketMQThere is an open source project that extends itincubator-rocketmq-externalsThe project has a submodule calledrocketmq-consoleThis is the administrative console projectincubator-rocketmq-externalsPull to local, because we need to do it ourselvesrocketmq-consoleCompile and package to run.
  2. Before packagingrocketmq-consoleIn the configurationnamesrvCluster address:
Rocketmq. Config. NamesrvAddr = 192.168.25.135:9876; 192.168.25.138:9876Copy the code
  1. Compile and package
mvn clean package -Dmaven.test.skip=true
Copy the code
  1. Start the rocketmq – the console:
Java jar rocketmq - the console - ng - 1.0.0. JarCopy the code
  1. accesshttp://localhost:8080