Introduction to the

RocketMQ characteristics

RocketMQ is a distributed messaging middleware open-source by Alibaba in 2012. RocketMQ has been donated to the Apache Software Foundation and became an Apache Top Level project on September 25, 2017. As a domestic middleware that has experienced the baptism of “super project” of Alibaba Double 11 for many times and has stable and excellent performance, it has been used by more and more domestic enterprises in recent years with its characteristics of high performance, low latency and high reliability. Its main characteristics are:

1. Flexibility and scalability

RocketMQ naturally supports clustering, and each of its four core components (Name Server, Broker, Producer, and Consumer) can scale horizontally without a single point of failure.

2. Ability to accumulate massive information

RocketMQ uses the zero-copy principle to accumulate very large messages. It is said that a single machine can accumulate hundreds of millions of messages while maintaining low write latency.

3. Support sequential messages

Message consumers are guaranteed to consume messages in the order they were sent. Sequential messages are classified into global ordering and local ordering. It is recommended to use local ordering, that is, producers send a certain type of message to the same queue in sequence.

4. Multiple message filtering methods

Message filtering is divided into server – side filtering and consumer – side filtering. The server side can filter according to the requirements of message consumers. The advantage is to reduce unnecessary message transmission, but the disadvantage is to increase the burden of message server, and the implementation is relatively complicated. On the consumer side, filtering is completely application-specific. This approach is more flexible and has the disadvantage that many useless messages are transmitted to message consumers.

5. Support transaction messages

RocketMQ supports transaction messages in addition to normal and sequential messages, which provides another solution for distributed transactions.

6. Backtrack

Backtracking is a message that has been successfully consumed by the consumer. Due to business requirements, it needs to be re-consumed. RocketMQ supports backtracking of consumption in milliseconds, either forward or backward.

The basic concept

The following is a map of RocketMQ’s deployment structure, which describes the four core components of RocketMQ: Name Server, Broker, Producer, and Consumer. Each component can be deployed in a cluster mode for horizontal scaling.

producers

The Producer is responsible for producing messages, and the Producer sends messages to the message server that are generated by the business application system. RocketMQ provides three ways to send messages: synchronous, asynchronous, and one-way.

The synchronous

Synchronous sending means that the sender sends the data and sends the next data packet only after receiving the response from the receiver. It is used for important notification messages, such as important notification emails and marketing SMS messages.

Asynchronous send

Asynchronous sending means that the sender sends data and then sends the next data packet without waiting for the response from the receiver. It is generally used in service scenarios where the link takes a long time and the response time is sensitive, for example, the transcoding service is enabled after a user uploads a video.

One way to send

Unidirectional sending refers to sending messages without waiting for a response from the server and without triggering a callback function. It applies to scenarios that require very short time but do not require high reliability, such as log collection.

The producer group

A Producer Group is a Group of a class of producers who are usually sending one kind of message and sending it logically. From the deployment structure, producers mark themselves as a cluster by the name of Producer Group.

consumers

The Consumer is responsible for consuming the messages, and the Consumer pulls the information from the message server and enters it into the user application. From the perspective of user application, there are two types of consumers: pull consumers and push consumers.

Pull consumers

A Pull Consumer takes the initiative to Pull information from the message server. As long as the messages are pulled in batches, the user application will start the consumption process, so Pull is called active consumption.

Push consumer

The Push Consumer encapsulates the pull, consumption schedule, and other internal maintenance of the message, leaving the callback interface performed when the message arrives to the user application. So a Push is called a passive consumption type, but from an implementation point of view it still pulls messages from the message server. Different from a Pull, Push first registers a consumption listener and starts consuming messages only after the listener is triggered.

Consumer groups

Consumer Group Is the collection name of a class of consumers that are grouped together because they usually consume the same kind of messages and consume logically. Consumer groups are similar to producer groups in that they are grouped together and named for the same roles. Grouping is a clever concept, and RocketMQ uses this grouping mechanism to achieve natural message load balancing. When consuming messages, the Consumer Group implements the distribution of messages to multiple Consumer server instances. For example, if a Topic has nine messages and one Consumer Group has three instances (three processes or three machines), each instance will share three messages equally. This also means that we can easily scale horizontally by adding machines.

Message server

A message Broker is a message storage center that receives messages from producers and stores them. Consumers get messages from the Broker. It also stores message-related metadata, including user groups, consumption progress offsets, queue information, and so on. As can be seen from the deployment structure diagram, brokers are classified into Master and Slave. The Master can write and read, while the Slave can only read. Physically, there are four cluster deployment modes of the Broker: single-master, multi-master, multi-master, multi-slave (synchronous flushing), and multi-master, multi-slave (asynchronous flushing).

A single Master

If the Broker restarts or goes down, the entire service may become unavailable. This method is risky, so it is not recommended for online environments.

More than the Master

All message servers are masters, not slaves. This method has the advantages of simple configuration, and has no impact on applications when a Master fails or restarts for maintenance. The disadvantage is that during the outage of a single machine, messages that are not consumed on the machine cannot be subscribed until the machine is restored, which affects the real-time performance of messages.

Multi-master multi-Slave (Asynchronous replication)

Each Master is configured with a Slave. Therefore, there are multiple pairs of master-slaves. Messages are asynchronously replicated and messages are delayed in milliseconds between the Master and Slave. The advantages of this method are that the message loss is very small, and the real-time performance of the message is not affected. After the Master is down, consumers can continue to consume from the Slave. The intermediate process is transparent to the user application, without manual intervention, and the performance is almost the same as that of the multi-master method. The disadvantage is that when Master is down, very few messages are lost in the event of disk corruption.

Multi-master multi-Slave (Synchronous double-write)

Each Master is configured with a Slave. Therefore, there are multiple master-slave pairs. Messages are written in synchronous dual-write mode. The advantages of this approach are that there are no single points of data and services, no latency for Master downtime messages, and very high availability of services and data. The disadvantage is that the performance is lower than that of asynchronous replication, and the delay of sending messages is higher.

Name server

The NameServer is used to hold Broker meta-information and find Broker information for producers and consumers. NameServer is designed to be almost stateless and can scale horizontally, with no communication between nodes, marking itself as a pseudo-cluster by deploying multiple machines. Each Broker registers with the NameServer when it is started. The Producer gets the routing information of the Broker from Topic to NameServer before sending messages. The Consumer also gets the routing information of the Topic periodically. So in terms of functionality it should be similar to ZooKeeper, and it is said that early versions of RocketMQ did use ZooKeeper, and then changed to NameServer for their own implementation.

The message

A Message, “Message,” is a Message to be transmitted. A message must have a Topic, which you can think of as the address to which your letter will be mailed. A message can also have an optional Tag and key-value pair at the value, which can be used to set a business key and look up the message on the Broker to find problems during development.

The theme

A Topic can be thought of as a generic class of messages. It is the first level type of messages. For example, an e-commerce system can be divided into transaction messages, logistics messages, etc., and a message must have a Topic. Topics have very loose relationships with producers and consumers. A Topic can have zero, one, or more producers sending messages to it, and a producer can send messages to different topics at the same time. A Topic can also be subscribed by zero, one, or more consumers.

The label

A Tag, which can be thought of as a subtopic, is a second-level type of message that provides additional flexibility for the user. Using tags, messages for different purposes within the same business module can be identified with different tags for the same Topic. For example, the transaction message can be divided into: transaction creation message, transaction completion message, and so on. A message can have no Tag. Tags help keep your code clean and consistent, and they also help with the query system RocketMQ provides.

The message queue

Message queues, in which topics are divided into one or more subtopics, are Message queues. Multiple message queues can be set up under a Topic, and RocketMQ polls all queues under that Topic to send the message. Broker internal messages:

Message consumption pattern

There are two patterns of news consumption: Clustering and Broadcasting. The default mode is cluster consumption. In this mode, a consumer cluster jointly consumes multiple queues of a topic, and a queue is consumed by only one consumer. If a consumer fails, other consumers in the group will continue to consume after the consumer fails. The broadcast consumption message will be sent to each consumer in the consumer group for consumption.

Message order

Message Order has two types: Orderly consumption and concurrent consumption. Sequential consumption means that messages are consumed in the same order as the producer sends them for each message queue, so if you are dealing with a scenario where global ordering is mandatory, you need to ensure that only one message queue is used for the topic. Parallel consumption no longer guarantees message order, and the maximum amount of parallel consumption is limited by the thread pool specified by each consumer client.

An engineering example

Java accesses a RocketMQ instance

RocketMQ currently supports Java, C++, and Go. Using Java as an example, RocketMQ is used to send and receive messages.

Introduction of depend on

< the dependency > < groupId > org. Apache. Rocketmq < / groupId > < artifactId > rocketmq - client < / artifactId > < version > 4.2.0 < / version > </dependency>Copy the code

Add RocketMQ client access support. The RocketMQ client version is the same as the RocketMQ version installed.

Message producer

package org.study.mq.rocketMQ.java; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.remoting.common.RemotingHelper; Public class Producer {public static void main(String[] args) throws Exception {// Create a Producer, DefaultMQProducer = new DefaultMQProducer("niwei_producer_group"); // Specify the address of producer.setNamesrvaddr ("localhost:9876"); Producer.start (); // Initialize producer.start().for(int i = 0; i < 100; I++) {// create a Message object with the subject, label, and content of the Message."topic_example_java"/* Message subject name */,"TagA"/* Message tag */, ("Hello Java demo RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* 消息内容 */
            );

            //发送消息并返回结果
            SendResult sendResult = producer.send(msg);

            System.out.printf("%s%n", sendResult); } // once a producer instance is no longer in use, it is shutdown, including cleaning up resources and shutting down network connections. }}Copy the code

The example uses the DefaultMQProducer class to create a message producer. Typically, each application creates a DefaultMQProducer object, so the application maintains the producer object, which can be set to a global object or a singleton. The producerGroup constructor parameter is the name of the message producerGroup. Both producers and consumers must give the GroupName and ensure that the name is unique. ProducerGroup is not useful for sending ordinary messages, which will be used later in distributed transaction messages.

Next, specify the NameServer address and call the start method for initialization, which only needs to be called once during the entire application life cycle.

After initialization, the send method is called to send messages. In this example, 100 identical messages are simply constructed to send. Actually, a Producer object can send messages with multiple topics and labels, and the labels of the message object can be empty. The send method is called synchronously and indicates success as long as no exception is thrown.

Finally, when the application exits, the shutdown method is called to clean up resources, close the network connection, and unregister itself from the server. It is generally recommended that the application call shutdown in the JBOSS, Tomcat, and other containers exit hook.

Message consumer

package org.study.mq.rocketMQ.java; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.common.consumer.ConsumeFromWhere; import org.apache.rocketmq.common.message.MessageExt; import java.io.UnsupportedEncodingException; import java.util.Date; import java.util.List; Public class Consumer {public static void main(String[] args) throws Exception {// Create a message Consumer, DefaultMQPushConsumer = new DefaultMQPushConsumer("niwei_consumer_group"); // Specify NameServer address consumer.setNamesrvaddr ("localhost:9876"); / / set the Consumer first started from the queue head start consumption or queue tail began to Consumer. SetConsumeFromWhere (ConsumeFromWhere. CONSUME_FROM_FIRST_OFFSET); // Subscribe to all messages in a Topic"topic_example_java"."*"); / / register message listener consumer. RegisterMessageListener (newMessageListenerConcurrently() { public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext context) {/ / by default only a message in the list, you can set parameters to batch receives the messageif(list ! = null) {for (MessageExt ext : list) {
                        try {
                            System.out.println(new Date() + new String(ext.getBody(), "UTF-8")); } catch (UnsupportedEncodingException e) { e.printStackTrace(); }}}returnConsumeConcurrentlyStatus.CONSUME_SUCCESS; }}); // The consumer object must call start to initialize consumer.start() before it can be used; System.out.println("Message consumer started"); }}Copy the code

The example uses the DefaultMQPushConsumer class to create a message consumer. Like a producer, an application creates a DefaultMQPushConsumer object, which is typically maintained by the application and can be set to a global object or a singleton. The class constructor takes the parameter consumerGroup, which is the name of the message’s consumerGroup and needs to be unique.

Next, specify the NameServer address and set whether consumer consumption starts at the head of the queue or at the tail of the queue when the consumer application first starts.

The subscribe method is then called to subscribe to the consumer object for messages under the specified topic. The first argument is the topic name and the second erase is the tag name. The example represents messages subscribed to all tags under the topic name topic_example_java.

The most important thing is to register the message listener to consume messages. The example uses the method of Consumer Push, which is to set the listener callback to consume messages. By default, there is only one message in the List of listener callback methods, and you can set parameters to receive messages in batches.

Finally, the start method is called for initialization, which only needs to be called once during the entire application life cycle.

Start the Name Server

nohup sh bin/mqnamesrv &
tail -f ~/logs/rocketmqlogs/namesrv.log
Copy the code

The Name Server and Broker of the four core components of RocketMQ are provided by the RocketMQ installation package, so both applications need to be started to provide messaging services. Start Name Server first, make sure you have RocketMQ matching JDK installed on your machine and set the environment variable JAVA_HOME, then execute mqNAMesrv in bin directory of RocketMQ installation directory, By default, the command execution status is output to the nohup.out file in the current directory. Finally, the actual running status of the Name Server is displayed by tracing the log file.

Start the Broker

nohup sh bin/mqbroker -n localhost:9876 &
tail -f ~/logs/rocketmqlogs/broker.log
Copy the code

Also make sure you have the RocketMQ matching JDK installed on your machine, set the environment variable JAVA_HOME, and then execute the MQBroker in the bin directory under the RocketMQ installation directory. By default, the execution of this command is output to the nohup.out file in the current directory. Finally, the log file is traced to see the actual operation of the Broker.

Run the Consumer

Run the Consumer class first so that when a producer sends a message, the record of the message can be seen in the Consumer back end. If the configuration is ok, you will see a message printed on the console that the consumer is started

Run the Producer

Finally, run the Producer class, and you can see the received messages in the Consumer console

Spring integration RocketMQ

Unlike RabbitMQ, ActiveMQ, Kafka and other messaging middleware, the Spring community has provided integration of these middleware products in a variety of ways, For example, ActiveMQ via Spring-JMS, RabbitMQ via Spring-Rabbit under spring AMQP, kafka via Spring-kafka, They make it easier to use their apis in Spring projects. There are currently three ways to integrate RocketMQ into the Spring framework. One is to define message producers and consumers as bean objects to be managed by the Spring container. The second is to use external project RocketMQ RocketMQ community – JMS (https://github.com/apache/rocketmq-externals/tree/master/rocketmq-jms) and then through the spring – JMS Third, if your application is spring-boot based, External projects for RocketMQ can be used Rocketmq – spring – the boot – starter (https://github.com/apache/rocketmq-externals/tree/master/rocketmq-spring-boot-starter) is more convenient Send and receive messages.

In general, the RocketMQ-JMS project implements parts of the JMS 1.1 specification and currently supports the publish/subscribe model for sending and receiving messages in JMS. The RocketMQ-spring-boot-starter project currently supports synchronous sending, asynchronous sending, one-way sending, sequential consumption, parallel consumption, cluster consumption, broadcast consumption, and other features. You can use this project if you prefer a quick development framework like Spring Boot and your existing features meet your business requirements. Of course, the first approach is the most flexible in terms of API usage, so let’s take a quick look at how Spring integrates RocketMQ as an example.

Message producer

package org.study.mq.rocketMQ.spring;

import org.apache.log4j.Logger;
import org.apache.rocketmq.client.producer.DefaultMQProducer;

public class SpringProducer {

    private Logger logger = Logger.getLogger(getClass());

    private String producerGroupName;

    private String nameServerAddr;

    private DefaultMQProducer producer;

    public SpringProducer(String producerGroupName, String nameServerAddr) {
        this.producerGroupName = producerGroupName;
        this.nameServerAddr = nameServerAddr;
    }

    public void init() throws Exception {
        logger.info("Start message producer service..."); // Create a producer and set a producer group. Producer = new DefaultMQProducer(producerGroupName); // Specify the NameServer address producer.setNamesrvaddr (nameServerAddr); // Initialize SpringProducer. Producer.start () only needs to be initialized once during the entire application lifecycle; logger.info("Message producer service started successfully.");
    }

    public void destroy() {
        logger.info("Start shutting down message producer service...");

        producer.shutdown();

        logger.info("Message producer service is down.");
    }

    public DefaultMQProducer getProducer() {
        returnproducer; }}Copy the code

Message producers divide the life cycle of producer DefaultMQProducer objects into constructors, init, and destroy. Constructors include the producer group name and NameServer address as variables provided by the Spring container during configuration. Init instantiates the DefaultMQProducer object, sets the NameServer address, initializes the producer object, and destroy cleans up resources when the producer object is destroyed.

Message consumer

package org.study.mq.rocketMQ.spring;

import org.apache.log4j.Logger;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;

public class SpringConsumer {

    private Logger logger = Logger.getLogger(getClass());

    private String consumerGroupName;

    private String nameServerAddr;

    private String topicName;

    private DefaultMQPushConsumer consumer;

    private MessageListenerConcurrently messageListener;

    public SpringConsumer(String consumerGroupName, String nameServerAddr, String topicName, MessageListenerConcurrently messageListener) {
        this.consumerGroupName = consumerGroupName;
        this.nameServerAddr = nameServerAddr;
        this.topicName = topicName;
        this.messageListener = messageListener;
    }


    public void init() throws Exception {
        logger.info("Start messaging consumer service..."); // Create a message consumer and set a message consumer group consumer = new DefaultMQPushConsumer(consumerGroupName); // Specify the NameServer address consumer.setNamesrvaddr (nameServerAddr); / / set the Consumer first start from the queue head start consumption or queue tail began to Consumer. SetConsumeFromWhere (ConsumeFromWhere. CONSUME_FROM_FIRST_OFFSET); // Subscribe to all messages in a Topic. Subscribe (topicName,"*"); / / register message listener consumer. RegisterMessageListener (messageListener); // The consumer object must call start to initialize consumer.start() before it can be used; logger.info("Message consumer service started successfully.");
    }

    public void destroy(){
        logger.info("Begin shutting down message Consumer services...");

        consumer.shutdown();

        logger.info("Message consumer service is closed.");
    }

    public DefaultMQPushConsumer getConsumer() {
        returnconsumer; }}Copy the code

Similar to the message producer, the message consumer divides the life cycle of the producer DefaultMQPushConsumer object into constructor, init, and destroy methods, as described in the introduction to Java accessing a RocketMQ instance. Of course, having a consumer object also requires the message listener to perform specific processing logic upon receiving the message.

package org.study.mq.rocketMQ.spring;

import org.apache.log4j.Logger;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;

import java.io.UnsupportedEncodingException;
import java.util.List;

public class MessageListener implements MessageListenerConcurrently {

    private Logger logger = Logger.getLogger(getClass());

    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
        if(list ! = null) {for (MessageExt ext : list) {
                try {
                    logger.info("Monitored message:" + new String(ext.getBody(), "UTF-8")); } catch (UnsupportedEncodingException e) { e.printStackTrace(); }}}returnConsumeConcurrentlyStatus.CONSUME_SUCCESS; }}Copy the code

The message listener class simply extracts the anonymous inner class code declared when registering the message listener in the Previous Java example and defines it as a single class.

Spring configuration file

Since only the Spring framework is used for integration, there is no need to add dependencies beyond the Sping framework core JAR package. In this example, the message producer and message consumer are split into two profiles to better demonstrate the effect of sending and receiving messages.

<? xml version="1.0" encoding="UTF-8"? > <beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="Http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.3.xsd">

    <bean id="producer" class="org.study.mq.rocketMQ.spring.SpringProducer" init-method="init" destroy-method="destroy">
        <constructor-arg name="nameServerAddr" value="localhost:9876"/>
        <constructor-arg name="producerGroupName" value="spring_producer_group"/>
    </bean>

</beans>
Copy the code

Message producer configuration is simple, defining a message producer object that calls init when initialized and destroys before the object is destroyed, with the Name Server address and producer group configured.

<? xml version="1.0" encoding="UTF-8"? > <beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="Http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.3.xsd">

    <bean id="messageListener" class="org.study.mq.rocketMQ.spring.MessageListener" />

    <bean id="consumer" class="org.study.mq.rocketMQ.spring.SpringConsumer" init-method="init" destroy-method="destroy">
        <constructor-arg name="nameServerAddr" value="localhost:9876"/>
        <constructor-arg name="consumerGroupName" value="spring_consumer_group"/>
        <constructor-arg name="topicName" value="spring-rocketMQ-topic" />
        <constructor-arg name="messageListener" ref="messageListener" />
    </bean>

</beans>
Copy the code

The message consumer is similar to the message producer configuration, with the addition of a message listener object definition and binding.

Run instance program

Start the Name Server and Broker as described above, and then run the message producer and message consumer programs. For simplicity we simulate these two programs with two unit test classes:

package org.study.mq.rocketMQ.spring;

import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.junit.Before;
import org.junit.Test;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

public class SpringProducerTest {

    private ApplicationContext container;

    @Before
    public void setup() {
        container = new ClassPathXmlApplicationContext("classpath:spring-producer.xml");
    }

    @Test
    public void sendMessage() throws Exception {
        SpringProducer producer = container.getBean(SpringProducer.class);

        for(int i = 0; i < 20; I++) {// create a Message object with the subject, label, and content of the Message."spring-rocketMQ-topic",
                    null,
                    ("Spring RocketMQ demo "+ I).getBytes(remotingHelper.default_charset) /* Message content */); SendResult SendResult = producer.getproducer ().send(MSG); System.out.printf("%s%n", sendResult); }}}Copy the code

The SpringProducerTest class simulates message producers sending messages.

package org.study.mq.rocketMQ.spring;

import org.junit.Before;
import org.junit.Test;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

public class SpringConsumerTest {

    private ApplicationContext container;

    @Before
    public void setup() {
        container = new ClassPathXmlApplicationContext("classpath:spring-consumer.xml"); } @Test public void consume() throws Exception { SpringConsumer consumer = container.getBean(SpringConsumer.class); Thread.sleep(200 * 1000); consumer.destroy(); }}Copy the code

The SpringConsumerTest class simulates the message consumer receiving the message, and the current thread needs to be put to sleep for a while before the Consume method returns, keeping the consumer program alive to listen to the message sent by the producer.

Run the SpringProducerTest and SpringConsumerTest classes separately, and you can see the received message in the SpringConsumerTest console:

If you start two SpringConsumerTest class processes, since they belong to the same consumer group, you can see in the SpringConsumerTest console that they share the message equally: