RocketMQ: Hello, RocketMQ!

Concept

MQ (Message Queue): a message queue holds data in transit so that messages can be passed between systems through the queue. The producer puts messages into the queue, and the consumer fetches messages from a specified queue and processes them.
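
The producer/consumer relationship described above can be sketched with an in-memory queue from the JDK. This is only an illustration of the concept, not RocketMQ: the class name `InMemoryQueueDemo`, the `produce`/`consume` helpers, and the sample message names are all hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical in-memory stand-in for a message queue (not RocketMQ).
final class InMemoryQueueDemo {

    // Producer side: put every message into the queue.
    // The queue used here is unbounded, so add() never rejects a message.
    static void produce(BlockingQueue<String> queue, List<String> messages) {
        for (String message : messages) {
            queue.add(message);
        }
    }

    // Consumer side: take `count` messages from the queue and process them.
    // take() blocks until a message is available.
    static List<String> consume(BlockingQueue<String> queue, int count) {
        List<String> processed = new ArrayList<>();
        try {
            for (int i = 0; i < count; i++) {
                processed.add("processed:" + queue.take());
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while consuming", e);
        }
        return processed;
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        // The producer runs on its own thread; the two sides only share the queue.
        Thread producer = new Thread(
                () -> produce(queue, Arrays.asList("order-1", "order-2", "order-3")));
        producer.start();
        System.out.println(consume(queue, 3));
        producer.join();
    }
}
```

The producer and consumer never call each other directly; the queue is the only thing they share, which is the property the rest of this article builds on.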

Features

Peak clipping and valley filling: when traffic surges and the write pressure exceeds what the system can normally process, the system may crash and messages may be lost. The queue absorbs the burst so that the system can process it at its own pace.

System decoupling: a system no longer depends directly on every service module it talks to, which would otherwise make it bloated and hard to maintain.

Improved system performance: once the systems are decoupled, a system that needs to call several other systems can send a message to the message queue and let the queue notify the systems to be called, instead of calling each of them itself.

Backlog for load testing: some production links are hard to load-test directly. You can let a certain number of messages accumulate in the message queue and then start consuming them to generate the load.
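
A rough way to see peak clipping is to track how many messages are waiting in the queue each second while the consumer drains at a fixed rate. The sketch below is a simplified model under assumed numbers (the arrival figures and the capacity of 300 messages per second are invented for illustration); `PeakShavingDemo` and `backlogPerSecond` are hypothetical names.

```java
import java.util.Arrays;

// Simplified model of peak clipping: the queue absorbs bursts,
// the consumer drains at a fixed capacity.
final class PeakShavingDemo {

    // Returns the number of messages still waiting in the queue
    // at the end of each second.
    static int[] backlogPerSecond(int[] arrivalsPerSecond, int capacityPerSecond) {
        int[] backlog = new int[arrivalsPerSecond.length];
        int waiting = 0;
        for (int t = 0; t < arrivalsPerSecond.length; t++) {
            waiting += arrivalsPerSecond[t];                 // the burst lands in the queue
            waiting -= Math.min(waiting, capacityPerSecond); // the consumer drains at its own pace
            backlog[t] = waiting;
        }
        return backlog;
    }

    public static void main(String[] args) {
        // Assumed traffic: a spike of 900 messages/s against a capacity of 300 messages/s.
        int[] arrivals = {100, 500, 900, 100, 50, 0, 0};
        System.out.println(Arrays.toString(backlogPerSecond(arrivals, 300)));
        // prints [0, 200, 800, 600, 350, 50, 0]
    }
}
```

The downstream system never sees more than 300 messages per second; the spike shows up only as a temporary backlog that is worked off during the quiet seconds that follow.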

RocketMQ features:

  • Support for transactional messages: message sending and DB operations stay eventually consistent. RabbitMQ and Kafka do not provide this kind of transactional message
  • Support for eventual consistency of data across multiple systems that integrate RocketMQ (multi-party transactions, for which two-party transactions are the prerequisite)
  • Delayed message support (RabbitMQ does not support it natively, although a dead-letter queue can be used to delay messages; Kafka does not support it)
  • Support for retrying failed messages a specified number of times at specified intervals (Kafka does not support this; RabbitMQ requires manual acknowledgement)
  • Support for tag filtering for consumers, which reduces unnecessary network traffic (not supported by RabbitMQ and Kafka)
  • Support for consuming messages again after they have been consumed (not supported by RabbitMQ; Kafka supports it by resetting consumer offsets)
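
To make the tag filtering item more concrete, here is a minimal sketch of how a subscription expression could be matched against a message tag. It borrows the `TagA || TagB` and `*` expression forms that RocketMQ subscriptions use, but the matching code itself is a simplified illustration and not RocketMQ's implementation; `TagFilterDemo`, `matches`, and `filter` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Simplified tag matching; illustrates the idea only.
final class TagFilterDemo {

    // Returns true if the message tag is accepted by the subscription expression.
    // "*" (or an empty expression) accepts everything; "A || B" accepts tag A or tag B.
    static boolean matches(String subExpression, String messageTag) {
        if (subExpression == null || subExpression.trim().isEmpty()
                || "*".equals(subExpression.trim())) {
            return true;
        }
        if (messageTag == null) {
            return false;
        }
        for (String tag : subExpression.split("\\|\\|")) {
            if (tag.trim().equals(messageTag)) {
                return true;
            }
        }
        return false;
    }

    // Keeps only the tags that the subscription accepts.
    static List<String> filter(String subExpression, List<String> messageTags) {
        List<String> accepted = new ArrayList<>();
        for (String tag : messageTags) {
            if (matches(subExpression, tag)) {
                accepted.add(tag);
            }
        }
        return accepted;
    }

    public static void main(String[] args) {
        List<String> tags = Arrays.asList("pay", "refund", "audit", "pay");
        System.out.println(filter("pay || refund", tags)); // prints [pay, refund, pay]
    }
}
```

Messages whose tag does not match never need to reach the consumer, which is where the saving in network traffic comes from.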

Content

Cluster structure

Name Server

  • The Name Server is a stateless node that can be clustered and does not synchronize any information between nodes

Broker

  • Brokers are divided into Masters and Slaves. A Master can have multiple Slaves, but a Slave belongs to only one Master. A Master and its Slaves are associated by sharing the same BrokerName and using different BrokerId values. BrokerId 0 denotes the Master, and a non-zero BrokerId denotes a Slave
  • Each Broker establishes long-lived connections to all nodes in the Name Server cluster and periodically registers its Topic information with every Name Server (every 30s by default). The Name Server scans all live Broker connections every 10 seconds. If the Name Server does not receive a heartbeat from a Broker within 2 minutes, it closes the connection to that Broker
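
The heartbeat bookkeeping described above (record the time of each heartbeat, scan periodically, drop anything silent for more than 2 minutes) can be sketched as follows. This is an assumption-laden illustration rather than the Name Server's real code: `HeartbeatTableDemo` and its methods are hypothetical, and timestamps are passed in explicitly so the logic is easy to follow.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Illustrative heartbeat table; not the Name Server's actual implementation.
final class HeartbeatTableDemo {

    // 2 minutes, the expiry window given in the text.
    static final long EXPIRE_MILLIS = 2 * 60 * 1000L;

    // Broker address -> time of the last heartbeat, in milliseconds.
    private final Map<String, Long> lastHeartbeat = new ConcurrentHashMap<>();

    // Called whenever a Broker registers or sends a heartbeat.
    void onHeartbeat(String brokerAddr, long nowMillis) {
        lastHeartbeat.put(brokerAddr, nowMillis);
    }

    // Intended to run on a fixed schedule (every 10 seconds in the text).
    // Removes Brokers that have been silent for longer than EXPIRE_MILLIS
    // and returns their addresses.
    List<String> scanExpired(long nowMillis) {
        List<String> removed = new ArrayList<>();
        Iterator<Map.Entry<String, Long>> it = lastHeartbeat.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            if (nowMillis - entry.getValue() > EXPIRE_MILLIS) {
                removed.add(entry.getKey());
                it.remove();
            }
        }
        return removed;
    }

    boolean isAlive(String brokerAddr) {
        return lastHeartbeat.containsKey(brokerAddr);
    }

    public static void main(String[] args) {
        HeartbeatTableDemo table = new HeartbeatTableDemo();
        table.onHeartbeat("broker-a:10911", 0L);       // last seen at t = 0 s
        table.onHeartbeat("broker-b:10911", 100_000L); // last seen at t = 100 s
        // A scan at t = 130 s: broker-a has been silent for 130 s (> 120 s), broker-b for 30 s.
        System.out.println(table.scanExpired(130_000L)); // prints [broker-a:10911]
    }
}
```

In a running service the scan would typically be driven by a scheduled executor; the same pattern applies to the Broker's own scans of Producer and Consumer connections.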

Producer

  • The Producer establishes a long-lived connection with one randomly selected node in the Name Server cluster, periodically obtains Topic routing information from the Name Server, establishes a long-lived connection with the Master that serves the Topic, and periodically sends heartbeats to the Master. The Producer is stateless and can be deployed as a cluster
  • Producer retrieves all topic queues from the NameServer every 30 seconds (which can be configured by ClientConfig’s pollNameServerInterval)
  • The Broker scans all live connections every 10 seconds. If the Broker does not receive a heartbeat within 2 minutes, it closes the connection to Producer

Consumer

  • The Consumer establishes a long-lived connection with one randomly selected node in the Name Server cluster, periodically obtains Topic routing information from the Name Server, establishes long-lived connections with the Master and Slave that serve the Topic, and periodically sends heartbeats to the Master and Slave.
  • The Consumer can subscribe to messages from the Master or Slave, and the subscription rules are specified by the Broker
  • The Consumer gets the latest status of the topic from the Name Server every 30 seconds, meaning that it takes up to 30 seconds for the Consumer to know when the Broker is unavailable
  • The Consumer sends a heartbeat to all associated Brokers every 30 seconds. The Broker scans all live connections every 10 seconds and closes any connection that has not sent heartbeat data within 2 minutes. It then notifies all consumers in the Consumer Group, and the consumers in the group reallocate the queues and continue consuming. When the Consumer learns that the Master is down, it switches to the Slave. The Slave cannot guarantee that 100% of the Master's messages have been synchronized, so a small number of messages will be missing for the time being. Once the Master recovers, the unsynchronized messages will eventually be consumed.
  • A consumption queue is created once a consumer connects (or has connected before). We extended the native consumer identity from {IP}@{consumer group} to {IP}@{consumer group}{topic}{tag} (for example: XXX.XXX.XXX.XXX@mqtest_producergroup_2m2sTest_tag-test). If any element differs, it is treated as a different consumer, and each consumer has its own consumption queue (default: number of queues per Broker * number of Brokers).
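
The queue reallocation that happens when a consumer leaves the group can be illustrated with a simple averaging strategy: every consumer computes its own contiguous share of the queues from the current member list. This is a simplified sketch; RocketMQ ships its own allocation strategies, whose exact behavior may differ, and `QueueRebalanceDemo`, `allocate`, and the consumer names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Simplified "average" queue allocation within a consumer group.
final class QueueRebalanceDemo {

    // Returns the queue indexes assigned to `self`, given the total number of
    // queues and the (consistently ordered) list of live consumers in the group.
    static List<Integer> allocate(int queueCount, List<String> consumerIds, String self) {
        List<Integer> mine = new ArrayList<>();
        int index = consumerIds.indexOf(self);
        if (index < 0) {
            return mine; // not a member of the group: nothing assigned
        }
        int consumers = consumerIds.size();
        int base = queueCount / consumers;      // every consumer gets at least this many
        int remainder = queueCount % consumers; // the first `remainder` consumers get one extra
        int size = base + (index < remainder ? 1 : 0);
        int start = index * base + Math.min(index, remainder);
        for (int q = start; q < start + size; q++) {
            mine.add(q);
        }
        return mine;
    }

    public static void main(String[] args) {
        int queues = 8; // e.g. 4 queues per Broker * 2 Brokers
        List<String> before = Arrays.asList("c1", "c2", "c3");
        System.out.println(allocate(queues, before, "c1")); // prints [0, 1, 2]
        System.out.println(allocate(queues, before, "c3")); // prints [6, 7]

        // c2's connection is closed after missing heartbeats; the rest reallocate.
        List<String> after = Arrays.asList("c1", "c3");
        System.out.println(allocate(queues, after, "c1")); // prints [0, 1, 2, 3]
        System.out.println(allocate(queues, after, "c3")); // prints [4, 5, 6, 7]
    }
}
```

Because each consumer derives its share from the same inputs, no central coordinator has to hand out queues; the members only need to agree on the current membership list.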

Install RocketMQ on CentOS

Download

Download address: rocketmq.apache.org/release_not…

Upload to the specified Linux directory

  • Unzip

unzip rocketmq-all-4.7.1-bin-release.zip

  • Start the NameServer

nohup ./bin/mqnamesrv &

  • Check whether the startup is successful

netstat -an | grep 9876

Before starting the Broker, edit the startup scripts and change the JVM memory settings. The default memory is 4GB (lower it if there are problems during startup).

  • Modify the configuration

cd bin

vim runserver.sh

vim runbroker.sh

  • Start the Broker

nohup ./mqbroker -n localhost:9876 &

Check whether the Broker is started

tail -f ~/logs/rocketmqlogs/broker.log

Test

  • Send messages

cd bin

export NAMESRV_ADDR=localhost:9876

./tools.sh org.apache.rocketmq.example.quickstart.Producer

  • Receive messages

cd bin

export NAMESRV_ADDR=localhost:9876

./tools.sh org.apache.rocketmq.example.quickstart.Consumer

  • Shut down RocketMQ

cd bin

./mqshutdown broker 

./mqshutdown namesrv

Install the RocketMQ console

  • Download github.com/apache/rock… ses

  • Unzip, modify configuration, package (actually a front-end application)

  • Packaging command

mvn clean package -Dmaven.test.skip=true

  • Go to target and start the JAR

java -jar rocketmq-console-ng-1.0.0.jar

  • If an error is reported on startup

(Reason: the JAXB JAR packages are not available by default in JDK 9 and above and need to be added manually to the project POM)

<dependency>
    <groupId>javax.xml.bind</groupId>
    <artifactId>jaxb-api</artifactId>
    <version>2.3.0</version>
</dependency>
<dependency>
    <groupId>com.sun.xml.bind</groupId>
    <artifactId>jaxb-impl</artifactId>
    <version>2.3.0</version>
</dependency>
<dependency>
    <groupId>com.sun.xml.bind</groupId>
    <artifactId>jaxb-core</artifactId>
    <version>2.3.0</version>
</dependency>
<dependency>
    <groupId>javax.activation</groupId>
    <artifactId>activation</artifactId>
    <version>1.1.1</version>
</dependency>

  • Rebuild with Maven

// Errors from the tests can be ignored

mvn clean install

Open localhost:9877 in a browser. If an error is reported:

The ports that RocketMQ uses were not opened on the Linux host during installation. Ports 10909 and 9876 need to be opened

firewall-cmd --zone=public --add-port=10909/tcp --permanent

firewall-cmd --zone=public --add-port=9876/tcp --permanent

systemctl restart firewalld.service

firewall-cmd --reload

  • Restart the console project

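Test sending messages from a project

  • Add the dependency to pom.xml

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.1.0</version>
</dependency>

  • Produce a message

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;

// The class name is arbitrary
public class ProducerDemo {

    public static void main(String[] args) throws Exception {
        // Create the producer and give it a producer group name
        DefaultMQProducer producer = new DefaultMQProducer("myproducer-group");
        // Set the NameServer address
        producer.setNamesrvAddr("192.168.248.129:9876");
        // Start the producer
        producer.start();
        // Build the message: topic, tag, body
        Message message = new Message("myTopic", "myTag", ("TestMQ").getBytes());
        // Send the message with a 1000 ms timeout
        SendResult result = producer.send(message, 1000);
        System.out.println(result);
        // Close the producer
        producer.shutdown();
    }
}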

If sending fails with a "sendDefaultImpl call timeout" error, you can manually disable the Linux firewall

Disabling the firewall

systemctl stop firewalld

# Check the firewall status

firewall-cmd --state

// Alternatively, open port 10911

firewall-cmd --zone=public --add-port=10911/tcp --permanent

systemctl restart firewalld.service

firewall-cmd --reload
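
Before changing firewall settings, it can help to check from the client machine which of the ports mentioned in this article (9876, 10909, 10911) actually accept TCP connections. The following is a small sketch using only the JDK; `PortCheckDemo` and `isReachable` are hypothetical names, and the host defaults to localhost unless one is passed as the first argument.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Small TCP reachability check for the ports used in this article.
final class PortCheckDemo {

    // Returns true if a TCP connection to host:port succeeds within the timeout.
    static boolean isReachable(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Refused, timed out, or filtered by a firewall
            return false;
        }
    }

    public static void main(String[] args) {
        String host = args.length > 0 ? args[0] : "localhost";
        for (int port : new int[] {9876, 10909, 10911}) {
            System.out.println(host + ":" + port + " reachable = " + isReachable(host, port, 1000));
        }
    }
}
```

A port that shows as unreachable from the client while the service is listening on the server usually points to a firewall rule, which narrows down whether opening a single port is enough.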

Messages can be viewed from the RocketMQ console

  • The consumer side consumes messages
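
A minimal consumer sketch that pairs with the producer above might look like the following. It uses the push consumer from the RocketMQ client library that the starter dependency brings in; the consumer group name `myconsumer-group` and the class name `ConsumerSketch` are assumptions made for illustration, while the NameServer address, topic, and tag are taken from the producer example. It needs a running NameServer and Broker to do anything.

```java
import java.nio.charset.StandardCharsets;
import java.util.List;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;

// Sketch of a consumer for the messages sent by the producer example.
public class ConsumerSketch {

    public static void main(String[] args) throws Exception {
        // "myconsumer-group" is an assumed group name
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("myconsumer-group");
        // Same NameServer address as the producer
        consumer.setNamesrvAddr("192.168.248.129:9876");
        // Subscribe to the producer's topic, filtered by its tag
        consumer.subscribe("myTopic", "myTag");
        // Register a listener that is called for each batch of messages
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                            ConsumeConcurrentlyContext context) {
                for (MessageExt msg : msgs) {
                    System.out.println("Received: " + new String(msg.getBody(), StandardCharsets.UTF_8));
                }
                // Tell the Broker the batch was consumed successfully
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        // Start the consumer; it keeps running and receives messages as they arrive
        consumer.start();
    }
}
```

Unlike the producer, the consumer is not shut down at the end of main, since it is expected to stay connected and keep receiving messages.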

Well, that's all for today's article. I hope it helps those of you in front of the screen who were feeling lost.