1. Introduction to MQ

1.1 Why use MQ

A message queue is a first-in, first-out (FIFO) data structure.

Its application scenarios fall into the following three categories:

1) Application decoupling

The higher the coupling of the system, the lower the fault tolerance. Take the e-commerce application as an example. After a user creates an order, if the inventory system, logistics system and payment system are coupled, and any one of the subsystems is faulty or temporarily unavailable due to upgrade or other reasons, the ordering operation will be abnormal and the user experience will be affected.

A message queue reduces the coupling between systems. For example, suppose the logistics system fails and takes several minutes to repair. During that time, the data destined for the logistics system is buffered in the message queue and the user's order still completes normally. Once the logistics system recovers, it processes the order messages waiting in the queue, and the client-facing system never notices that the logistics system was down for those few minutes.
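The buffering described above can be sketched with nothing but the JDK. This is a minimal illustration, not RocketMQ code: the in-memory LinkedBlockingQueue standing in for the broker, the class name DecouplingSketch, and the order IDs are all assumptions made for the example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/** Hypothetical sketch: an in-memory queue stands in for the MQ broker. */
class DecouplingSketch {
    // Stand-in for the message queue between the order and logistics systems.
    static final BlockingQueue<String> logisticsQueue = new LinkedBlockingQueue<>();

    /** The order system only enqueues; it never calls the logistics system directly. */
    static boolean placeOrder(String orderId) {
        return logisticsQueue.offer(orderId); // succeeds even while logistics is down
    }

    /** Called once the logistics system is back; drains everything that piled up. */
    static List<String> recoverAndProcess() {
        List<String> processed = new ArrayList<>();
        logisticsQueue.drainTo(processed); // preserves FIFO order
        return processed;
    }

    public static void main(String[] args) {
        // Logistics is "down": orders still complete.
        placeOrder("order-1");
        placeOrder("order-2");
        System.out.println("Backlog while logistics is down: " + logisticsQueue.size());
        // Logistics recovers and catches up on the backlog.
        System.out.println("Processed after recovery: " + recoverAndProcess());
    }
}
```

The point of the design is that placeOrder has no dependency on the consumer being alive; only the queue has to be available.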

2) Traffic peak shaving

An application may be overwhelmed by a sudden surge in request traffic. With a message queue, a large number of requests can be buffered and processed gradually over a longer period, which greatly improves system stability and user experience.

In general, to keep a system stable, requests are rejected once the load exceeds a threshold, which hurts the user experience. If a message queue buffers the requests instead and the system notifies the user once the order has been processed, the experience is better than not being able to place an order at all.

There is also an economic argument:

If the service handles 1,000 QPS during normal hours but 10,000 QPS at peak, provisioning high-performance servers just for the peak is not cost-effective. In this case, a message queue can shave the peak instead.
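To make the trade-off concrete, here is a small back-of-the-envelope calculation. The 1,000 and 10,000 QPS figures come from the text; the processing capacity of 2,000 QPS and the 10-second burst are assumptions chosen only for illustration, as are the class and method names.

```java
/** Hypothetical arithmetic sketch for peak shaving; not part of RocketMQ. */
class PeakShavingSketch {
    /** Messages left in the queue after a burst that exceeds processing capacity. */
    static long backlogAfterBurst(long peakQps, long capacityQps, long burstSeconds) {
        return Math.max(0, (peakQps - capacityQps) * burstSeconds);
    }

    /** Seconds needed to drain the backlog once traffic is back to normalQps. */
    static long drainSeconds(long backlog, long capacityQps, long normalQps) {
        long spare = capacityQps - normalQps;
        if (spare <= 0) {
            throw new IllegalArgumentException("capacity must exceed normal traffic");
        }
        return (backlog + spare - 1) / spare; // ceiling division
    }

    public static void main(String[] args) {
        long backlog = backlogAfterBurst(10_000, 2_000, 10); // assumed capacity and burst length
        System.out.println("Backlog after burst: " + backlog);
        System.out.println("Seconds to drain: " + drainSeconds(backlog, 2_000, 1_000));
    }
}
```

Under these assumptions, a system sized at twice its normal load absorbs a tenfold burst by letting the queue hold the difference and catching up afterwards.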

3) Data distribution

Message queues allow data to flow more widely across multiple systems. The producer of the data does not need to care about who uses the data, but simply sends the data to the message queue, where the consumer gets the data directly from the message queue
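A minimal sketch of this fan-out, assuming a hypothetical in-memory FanOutSketch class rather than a real broker: the producer calls publish without knowing who is subscribed, and each subscriber reads from its own queue.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical in-memory fan-out; real brokers persist and replicate the data. */
class FanOutSketch {
    private final Map<String, Deque<String>> queuesBySubscriber = new LinkedHashMap<>();

    /** Registers a downstream system; it only sees data published after this call. */
    void subscribe(String subscriber) {
        queuesBySubscriber.putIfAbsent(subscriber, new ArrayDeque<>());
    }

    /** The producer does not know or care which systems are subscribed. */
    void publish(String data) {
        for (Deque<String> queue : queuesBySubscriber.values()) {
            queue.addLast(data);
        }
    }

    /** Returns the next item for this subscriber, or null if there is none. */
    String poll(String subscriber) {
        Deque<String> queue = queuesBySubscriber.get(subscriber);
        return queue == null ? null : queue.pollFirst();
    }

    public static void main(String[] args) {
        FanOutSketch mq = new FanOutSketch();
        mq.subscribe("inventory");
        mq.subscribe("logistics");
        mq.publish("order-created:1001");
        System.out.println(mq.poll("inventory"));
        System.out.println(mq.poll("logistics"));
    }
}
```

Adding a new downstream system is a subscribe call on the consumer side; the producer code does not change.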

1.2 The advantages and disadvantages of MQ

Advantages: decoupling, peak shaving, data distribution

Disadvantages include the following:

  • The system availability decreases

    The more external dependencies introduced into the system, the worse the stability of the system. When MQ goes down, there is an impact on business.

    How to ensure high availability of MQ?

  • System complexity increases

    Adding MQ greatly increases system complexity: systems that used to make synchronous remote calls to each other now communicate asynchronously through MQ.

    How do you ensure that messages are not consumed more than once? How do you handle message loss? How do you guarantee the order of message delivery?

  • Consistency problem

    After processing its business logic, system A sends message data to systems B, C, and D through MQ. Systems B and C process the data successfully, but system D fails.

    How to ensure the consistency of message data processing?
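Of the questions raised above, repeated consumption is often handled by making the consumer idempotent. The sketch below shows one common approach, assuming each message carries a unique business key; the class name and the in-memory set are hypothetical, and a production system would typically keep the processed keys in a database or cache instead.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical idempotent consumer: a message with a known key is skipped. */
class IdempotentConsumerSketch {
    // In-memory stand-in for a durable "already processed" store.
    private final Set<String> processedKeys = ConcurrentHashMap.newKeySet();
    private int appliedCount = 0;

    /** Returns true if the message was applied, false if it was a duplicate. */
    synchronized boolean handle(String businessKey) {
        if (!processedKeys.add(businessKey)) {
            return false; // seen before: do nothing
        }
        appliedCount++; // the real business logic would run here
        return true;
    }

    synchronized int appliedCount() {
        return appliedCount;
    }

    public static void main(String[] args) {
        IdempotentConsumerSketch consumer = new IdempotentConsumerSketch();
        System.out.println(consumer.handle("order-1001")); // first delivery
        System.out.println(consumer.handle("order-1001")); // redelivery of the same message
        System.out.println("Applied: " + consumer.appliedCount());
    }
}
```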

1.3 Comparison of various MQ products

Common MQ products include Kafka, ActiveMQ, RabbitMQ, and RocketMQ.

2. RocketMQ Quick Start

RocketMQ is MQ middleware from Alibaba, written in Java; Alibaba donated it to the Apache Software Foundation in 2016. Inside Alibaba, RocketMQ carries the message traffic of high-concurrency scenarios such as “Double 11” and can handle messages at the trillion scale.

2.1 Preparations

2.1.1 Downloading RocketMQ

RocketMQ version 4.6.0 is used here.

Download address: Download address

The official document: rocketmq.apache.org/docs/quick-…

2.1.2 Environment Requirements

  • A 64-bit Linux system

  • JDK 1.8 (64-bit)

2.2 Installing RocketMQ

2.2.1 Installation Procedure

Here I install it from the binary package:

  1. Decompress the installation package
  2. Go to the installation directory

2.2.2 Introduction to Directories

  • bin: startup scripts, including shell scripts and CMD scripts
  • conf: configuration files, including the broker configuration file, the logback configuration file, and so on
  • lib: dependency JAR packages, including Netty, commons-lang, and FastJSON

2.3 Starting RocketMQ

  1. RocketMQ's default JVM memory settings are large, so starting the Broker or NameServer may fail on a machine with insufficient memory. If that happens, edit the following two scripts to reduce the JVM memory size:

    # Edit runbroker.sh and runserver.sh to change the default JVM memory size
    $ vi bin/runbroker.sh
    	# Reference settings
    	JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m"
    
    $ vi bin/runserver.sh
    	# Reference settings
    	JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
  2. Start the NameServer

    #1. Start the NameServer
    nohup sh bin/mqnamesrv &
    #2. View startup logs
    tail -f ~/logs/rocketmqlogs/namesrv.log
  3. Start the Broker

    #1. Start the Broker
    nohup sh bin/mqbroker -n localhost:9876 &
    #2. View startup logs
    tail -f ~/logs/rocketmqlogs/broker.log 

Some optional parameters for bin/mqbroker:

  • -c: Specifies the path of the configuration file
  • -n: Indicates the address of NameServer

2.4 Testing RocketMQ

2.4.1 Sending messages

#1. Set environment variables
export NAMESRV_ADDR=localhost:9876
#2. Use the Demo of the installation package to send messages
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer

2.4.2 Receiving messages

#1. Set environment variables
export NAMESRV_ADDR=localhost:9876
#2. Receive the message
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer

2.5 Shutting down RocketMQ

#1. Close the NameServer
sh bin/mqshutdown namesrv
#2. Close the Broker
sh bin/mqshutdown broker

2.6 Roles

  • Producer: sends messages; analogy: the sender
  • Consumer: receives messages; analogy: the recipient
  • Consumer Group: every consumer instance belongs to a consumer group, and each message is consumed by only one consumer instance within a group (different consumer groups can each consume the same message)
  • Broker: stores and forwards messages; analogy: the express delivery company
  • NameServer: manages Brokers; analogy: the management body of the express delivery companies
  • Topic: distinguishes types of messages; a sender can send messages to one or more Topics, and a receiver can subscribe to one or more Topics
  • Message Queue: a partition of a Topic, used to send and receive messages in parallel
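The consumer-group rule in the list above (one receiver per group, every group gets its own copy) can be modelled in a few lines. This is a simplified sketch under stated assumptions: the class is hypothetical, and it picks the receiving instance per message in round-robin fashion, whereas RocketMQ balances load by assigning message queues to the instances of a group.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical model of consumer-group delivery; not the RocketMQ rebalance algorithm. */
class ConsumerGroupSketch {
    private final Map<String, List<String>> instancesByGroup = new LinkedHashMap<>();
    private final Map<String, Integer> deliveriesByGroup = new HashMap<>();

    void join(String group, String instance) {
        instancesByGroup.computeIfAbsent(group, g -> new ArrayList<>()).add(instance);
    }

    /** Returns, for each group, the single instance that receives this message. */
    Map<String, String> deliver(String message) {
        Map<String, String> receiverByGroup = new LinkedHashMap<>();
        for (Map.Entry<String, List<String>> entry : instancesByGroup.entrySet()) {
            List<String> instances = entry.getValue();
            // Round-robin within the group: exactly one instance gets the message.
            int index = deliveriesByGroup.merge(entry.getKey(), 1, Integer::sum) - 1;
            receiverByGroup.put(entry.getKey(), instances.get(index % instances.size()));
        }
        return receiverByGroup;
    }

    public static void main(String[] args) {
        ConsumerGroupSketch topic = new ConsumerGroupSketch();
        topic.join("billing", "billing-1");
        topic.join("billing", "billing-2");
        topic.join("audit", "audit-1");
        System.out.println(topic.deliver("msg-1"));
        System.out.println(topic.deliver("msg-2"));
    }
}
```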

2.7 Broker configuration file in detail

The default broker configuration file is located at conf/broker.conf.

# Name of the cluster
brokerClusterName=rocketmq-cluster
# Broker name; note that it differs between configuration files
brokerName=broker-a
# 0 means Master, >0 means Slave
brokerId=0
# NameServer addresses, separated by semicolons
namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876
# Default number of queues for a topic that is created automatically when a message is sent to a topic that does not exist on the server
defaultTopicQueueNums=4
# Whether the Broker may create topics automatically; recommended on for development, off for production
autoCreateTopicEnable=true
# Whether the Broker may create subscription groups automatically; recommended on for development, off for production
autoCreateSubscriptionGroup=true
# Port on which the Broker listens for external services
listenPort=10911
# Time of day at which expired files are deleted (default: 4:00 a.m.)
deleteWhen=04
# File retention time, in hours (set to 120 here)
fileReservedTime=120
# Size of each CommitLog file (default: 1 GB)
mapedFileSizeCommitLog=1073741824
# Number of entries per ConsumeQueue file (default: 300,000); adjust to your workload
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
# Disk usage threshold (percent) used when checking the disk space of the physical files
diskMaxUsedSpaceRatio=88
# Storage root path
storePathRootDir=/usr/local/rocketmq/store
# CommitLog storage path
storePathCommitLog=/usr/local/rocketmq/store/commitlog
# Consume queue storage path
storePathConsumeQueue=/usr/local/rocketmq/store/consumequeue
# Message index storage path
storePathIndex=/usr/local/rocketmq/store/index
# Checkpoint file storage path
storeCheckpoint=/usr/local/rocketmq/store/checkpoint
# Abort file storage path
abortFile=/usr/local/rocketmq/store/abort
# Maximum message size, in bytes
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000
# Role of the Broker
# - ASYNC_MASTER: Master with asynchronous replication
# - SYNC_MASTER: Master with synchronous double write
# - SLAVE
brokerRole=SYNC_MASTER
# Disk flush mode
# - ASYNC_FLUSH: flush to disk asynchronously
# - SYNC_FLUSH: flush to disk synchronously
flushDiskType=SYNC_FLUSH
#checkTransactionMessageEnable=false
# Number of threads in the send-message thread pool
#sendMessageThreadPoolNums=128
# Number of threads in the pull-message thread pool
#pullMessageThreadPoolNums=128
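Because the file uses the key=value format with # comments, it can be sanity-checked with the JDK's java.util.Properties before a broker is started. The sketch below is only an illustration: the class and helper names are hypothetical, and the sample text repeats a few keys from the listing above.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

/** Hypothetical helper that reads a few broker settings; not part of RocketMQ. */
class BrokerConfigSketch {
    static Properties parse(String text) {
        Properties properties = new Properties();
        try {
            properties.load(new StringReader(text)); // lines starting with # are ignored
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return properties;
    }

    /** brokerId 0 means Master, anything greater means Slave. */
    static boolean isMaster(Properties properties) {
        return Integer.parseInt(properties.getProperty("brokerId", "0").trim()) == 0;
    }

    /** Splits the semicolon-separated NameServer list. */
    static String[] nameServers(Properties properties) {
        String raw = properties.getProperty("namesrvAddr", "").trim();
        return raw.isEmpty() ? new String[0] : raw.split("\\s*;\\s*");
    }

    public static void main(String[] args) {
        String sample = "# Name of the cluster\n"
            + "brokerClusterName=rocketmq-cluster\n"
            + "brokerId=0\n"
            + "namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876\n";
        Properties properties = parse(sample);
        System.out.println("Master: " + isMaster(properties));
        System.out.println("NameServers: " + nameServers(properties).length);
    }
}
```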

2.8 Building a visual monitoring platform

2.8.1 Overview

RocketMQ has an open-source extension project, incubator-rocketmq-externals, which contains a submodule called rocketmq-console. This is the management console project. First pull incubator-rocketmq-externals to your local machine, because rocketmq-console has to be compiled and packaged by hand.

2.8.2 Download and compile the package

  1. Clone the project:

    git clone https://github.com/apache/rocketmq-externals
  2. In rocketmq-console, configure the namesrv cluster address:

    $ cd rocketmq-console
    $ vim src/main/resources/application.properties
    rocketmq.config.namesrvAddr=10.211.55.4:9876
  3. After configuring, compile and package:

    mvn clean package -Dmaven.test.skip=true
  4. Start rocketmq-console:

    nohup java -jar rocketmq-console-ng-2.0.0.jar > tmp.log &

After a successful start, open http://<IP address>:8080 in a browser to enter the console interface, as shown below:

3. Message Sending and Consumption Examples (Maven)

  • Import MQ client dependencies

    Note: the rocketmq-client version must match the RocketMQ server version.

    <dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-client</artifactId>
        <version>4.6.0</version>
    </dependency>
  • Steps for the message sender:

    1. Create a message producer (producer) and specify the producer group name
    2. Specify the NameServer address
    3. Start the producer
    4. Create the message object, specifying the Topic, Tag, and message body
    5. Send the message
    6. Shut down the producer
  • Steps for the message consumer:

    1. Create a consumer (consumer) and specify the consumer group name
    2. Specify the NameServer address
    3. Subscribe to the Topic and Tag
    4. Set the callback function that processes messages
    5. Start the consumer

3.1 Basic Examples

3.1.1 Sending messages

1) Sending synchronous messages

This reliable, synchronous way of sending is widely used, for example for important notifications and SMS notifications.

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;

public class SyncProducer {
    public static void main(String[] args) throws Exception {
        // Instantiate the message producer with a producer group name
        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
        // Set the NameServer address
        producer.setNamesrvAddr("localhost:9876");
        // Set the number of retries when a synchronous send fails (default: 2)
        producer.setRetryTimesWhenSendFailed(2);
        // Set the send timeout (default: 3000 ms)
        producer.setSendMsgTimeout(3000);
        // Start the producer instance
        producer.start();
        for (int i = 0; i < 100; i++) {
            // Create the message, specifying the Topic, Tag, and message body
            Message msg = new Message("TopicTest" /* Topic */, "TagA" /* Tag */,
                ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
            );
            // Send the message to a Broker and block until the result comes back
            SendResult sendResult = producer.send(msg);
            // sendResult reports whether the message was delivered successfully
            System.out.printf("%s%n", sendResult);
        }
        // Once no more messages will be sent, shut down the producer instance
        producer.shutdown();
    }
}

2) Sending asynchronous messages

Asynchronous sending is typically used in response-time-sensitive scenarios where the sender cannot afford to wait long for the Broker's response.

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;

public class AsyncProducer {
    public static void main(String[] args) throws Exception {
        // Instantiate the message producer with a producer group name
        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
        // Set the NameServer address
        producer.setNamesrvAddr("localhost:9876");
        // Set the number of retries when an asynchronous send fails (default: 2)
        producer.setRetryTimesWhenSendAsyncFailed(2);
        // Set the send timeout (default: 3000 ms)
        producer.setSendMsgTimeout(3000);
        // Start the producer instance
        producer.start();
        int messageCount = 100;
        // Tracks outstanding callbacks so that shutdown does not race with in-flight sends
        final CountDownLatch latch = new CountDownLatch(messageCount);
        for (int i = 0; i < messageCount; i++) {
            final int index = i;
            // Create the message, specifying the Topic, Tag, key, and message body
            Message msg = new Message("TopicTest", "TagA", "OrderID188",
                "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
            // SendCallback receives the send result asynchronously
            producer.send(msg, new SendCallback() {
                @Override
                public void onSuccess(SendResult sendResult) {
                    latch.countDown();
                    System.out.printf("%-10d OK %s %n", index, sendResult.getMsgId());
                }
                @Override
                public void onException(Throwable e) {
                    latch.countDown();
                    System.out.printf("%-10d Exception %s %n", index, e);
                    e.printStackTrace();
                }
            });
        }
        // Give the callbacks a few seconds to finish before shutting down
        latch.await(5, TimeUnit.SECONDS);
        // Once no more messages will be sent, shut down the producer instance
        producer.shutdown();
    }
}

3) Sending one-way messages

This approach is mainly used when the send result does not matter much, for example when sending logs.

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;

public class OnewayProducer {
    public static void main(String[] args) throws Exception {
        // Instantiate the message producer with a producer group name
        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
        // Set the NameServer address
        producer.setNamesrvAddr("localhost:9876");
        // Start the producer instance
        producer.start();
        for (int i = 0; i < 100; i++) {
            // Create the message, specifying the Topic, Tag, and message body
            Message msg = new Message("TopicTest" /* Topic */, "TagA" /* Tag */,
                ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
            );
            // Send a one-way message; no result is returned
            producer.sendOneway(msg);
        }
        // Once no more messages will be sent, shut down the producer instance
        producer.shutdown();
    }
}

3.1.2 Consuming messages

1) Cluster mode (load balancing)

Consumers consume messages in cluster (load-balancing) mode: each message is consumed by only one consumer in the same consumer group.

public static void main(String[] args) throws Exception {
    // Instantiate the message consumer and specify the consumer group name
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
    // Specify the NameServer address
    consumer.setNamesrvAddr("localhost:9876");
    // Subscribe to the topic
    consumer.subscribe("Test", "*");
    // Consume in load-balancing (cluster) mode
    consumer.setMessageModel(MessageModel.CLUSTERING);
    // Register the callback that processes messages
    consumer.registerMessageListener(new MessageListenerConcurrently() {
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                        ConsumeConcurrentlyContext context) {
            System.out.printf("%s Receive New Messages: %s %n",
                              Thread.currentThread().getName(), msgs);
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    });
    // Start the consumer
    consumer.start();
    System.out.printf("Consumer Started.%n");
}

2) Broadcast mode

Consumers consume messages in broadcast mode: each message is consumed by every consumer in the same consumer group.

public static void main(String[] args) throws Exception {
    // Instantiate the message consumer and specify the consumer group name
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
    // Specify the NameServer address
    consumer.setNamesrvAddr("localhost:9876");
    // Subscribe to the topic
    consumer.subscribe("Test", "*");
    // Consume in broadcast mode
    consumer.setMessageModel(MessageModel.BROADCASTING);
    // Register the callback that processes messages
    consumer.registerMessageListener(new MessageListenerConcurrently() {
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                        ConsumeConcurrentlyContext context) {
            System.out.printf("%s Receive New Messages: %s %n",
                              Thread.currentThread().getName(), msgs);
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    });
    // Start the consumer
    consumer.start();
    System.out.printf("Consumer Started.%n");
}

3.2 Sequential Messages

Message ordering means that messages are consumed in the order in which they were sent (FIFO). RocketMQ can strictly guarantee message order, either per partition (partition order) or globally (global order).

By default, messages are sent round-robin to different queues (partitions), and consumers pull from multiple queues, so the order of sending and the order of consumption are not guaranteed to match. If, however, the messages that must stay ordered are all sent to the same queue and consumed only from that queue, order is guaranteed. When sending and consuming involve a single queue, the order is global; when several queues are involved, the order is per partition, that is, messages are ordered within each queue.

The following example demonstrates partition ordering with orders. An order moves through these steps in sequence: create, pay, push, complete. Messages with the same order ID are sent to the same queue one after another, so a consumer reading that queue receives all messages for a given OrderId in order.
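The routing rule itself is small enough to try in isolation. The sketch below assumes a topic with four queues (the defaultTopicQueueNums value shown in section 2.7) and uses order IDs from the producer example; the class name and helper are hypothetical, and no broker is involved.

```java
/** Hypothetical stand-alone version of the "order ID modulo queue count" rule. */
class QueueSelectionSketch {
    /** Maps an order ID to a queue index so that the same order always lands on the same queue. */
    static int selectQueue(long orderId, int queueCount) {
        return (int) Math.floorMod(orderId, (long) queueCount);
    }

    public static void main(String[] args) {
        long[] orderIds = {15103111039L, 15103111065L, 15103111039L, 15103117235L};
        for (long orderId : orderIds) {
            System.out.println(orderId + " -> queue " + selectQueue(orderId, 4));
        }
    }
}
```

Every step of one order maps to the same index, which is what keeps that order's messages in sequence, while different orders may share a queue or spread across queues.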

3.2.1 Sequential message production

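import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;

/**
 * Send sequential messages
 */
public class Producer {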

   public static void main(String[] args) throws Exception {
       DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");

       producer.setNamesrvAddr("127.0.0.1:9876");

       producer.start();

       String[] tags = new String[]{"TagA", "TagC", "TagD"};

       // Order list
       List<OrderStep> orderList = new Producer().buildOrders();

       Date date = new Date();
       SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
       String dateStr = sdf.format(date);
       for (int i = 0; i < 10; i++) {
           // Add a time prefix
           String body = dateStr + " Hello RocketMQ " + orderList.get(i);
           Message msg = new Message("TopicTest", tags[i % tags.length], "KEY" + i, body.getBytes());

           SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
               @Override
               public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                   Long id = (Long) arg;  // Select a queue based on the order ID
                   long index = id % mqs.size();
                   return mqs.get((int) index);
               }
           }, orderList.get(i).getOrderId()); // order ID

           System.out.println(String.format("SendResult status:%s, queueId:%d, body:%s",
               sendResult.getSendStatus(),
               sendResult.getMessageQueue().getQueueId(),
               body));
       }

       producer.shutdown();
   }

   /** * Order steps */
   private static class OrderStep {
       private long orderId;
       private String desc;

       public long getOrderId() {
           return orderId;
       }

       public void setOrderId(long orderId) {
           this.orderId = orderId;
       }

       public String getDesc() {
           return desc;
       }

       public void setDesc(String desc) {
           this.desc = desc;
       }

       @Override
       public String toString() {
           return "OrderStep{" +
               "orderId=" + orderId +
               ", desc='" + desc + '\'' +
               '}';
       }
   }

   /**
    * Generate simulated order data
    */
   private List<OrderStep> buildOrders() {
       List<OrderStep> orderList = new ArrayList<OrderStep>();

       OrderStep orderDemo = new OrderStep();
       orderDemo.setOrderId(15103111039L);
       orderDemo.setDesc("Create");
       orderList.add(orderDemo);

       orderDemo = new OrderStep();
       orderDemo.setOrderId(15103111065L);
       orderDemo.setDesc("Create");
       orderList.add(orderDemo);

       orderDemo = new OrderStep();
       orderDemo.setOrderId(15103111039L);
       orderDemo.setDesc("Payment");
       orderList.add(orderDemo);

       orderDemo = new OrderStep();
       orderDemo.setOrderId(15103117235L);
       orderDemo.setDesc("Create");
       orderList.add(orderDemo);

       orderDemo = new OrderStep();
       orderDemo.setOrderId(15103111065L);
       orderDemo.setDesc("Payment");
       orderList.add(orderDemo);

       orderDemo = new OrderStep();
       orderDemo.setOrderId(15103117235L);
       orderDemo.setDesc("Payment");
       orderList.add(orderDemo);

       orderDemo = new OrderStep();
       orderDemo.setOrderId(15103111065L);
       orderDemo.setDesc("Complete");
       orderList.add(orderDemo);

       orderDemo = new OrderStep();
       orderDemo.setOrderId(15103111039L);
       orderDemo.setDesc("Push");
       orderList.add(orderDemo);

       orderDemo = new OrderStep();
       orderDemo.setOrderId(15103117235L);
       orderDemo.setDesc("Complete");
       orderList.add(orderDemo);

       orderDemo = new OrderStep();
       orderDemo.setOrderId(15103111039L);
       orderDemo.setDesc("Complete");
       orderList.add(orderDemo);

       return orderList;
   }
}

3.2.2 Sequential consumption of messages

import java.util.List;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;

/**
 * Sequential message consumption, with transaction mode
 * (the application can control when the offset is committed)
 */
public class ConsumerInOrder {

   public static void main(String[] args) throws Exception {
       DefaultMQPushConsumer consumer = new
           DefaultMQPushConsumer("please_rename_unique_group_name_3");
       consumer.setNamesrvAddr("127.0.0.1:9876");
       /**
        * Set whether the consumer starts from the head or the tail of the queue on its first start.
        * If this is not the first start, it continues from the position it last consumed.
        */
       consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

       consumer.subscribe("TopicTest", "TagA || TagC || TagD");

       consumer.registerMessageListener(new MessageListenerOrderly() {

           Random random = new Random();

           @Override
           public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
               context.setAutoCommit(true);
               for (MessageExt msg : msgs) {
                   // Each queue is consumed by a single thread, so order holds within each queue (partition)
                   System.out.println("consumeThread=" + Thread.currentThread().getName() + "queueId=" + msg.getQueueId() + ", content:" + new String(msg.getBody()));
               }

               try {
                   // Simulate business logic processing...
                   TimeUnit.SECONDS.sleep(random.nextInt(10));
               } catch (Exception e) {
                   e.printStackTrace();
               }
               return ConsumeOrderlyStatus.SUCCESS;
           }
       });

       consumer.start();

       System.out.println("Consumer Started.");
   }
}

3.3 Delayed Messages

For example, in e-commerce, you can send a delayed message after submitting an order, check the status of the order one hour later, and cancel the order to release the inventory if there is still no payment.

3.3.1 Starting the message Consumer

import java.util.List;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;

public class ScheduledMessageConsumer {
   public static void main(String[] args) throws Exception {
      // Instantiate the consumer
      DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ExampleConsumer");
      // Subscribe to the topic
      consumer.subscribe("TestTopic", "*");
      // Register a message listener
      consumer.registerMessageListener(new MessageListenerConcurrently() {
          @Override
          public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messages, ConsumeConcurrentlyContext context) {
              for (MessageExt message : messages) {
                  // Print the approximate delay between storage and consumption
                  System.out.println("Receive message[msgId=" + message.getMsgId() + "] " + (System.currentTimeMillis() - message.getStoreTimestamp()) + "ms later");
              }
              return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
          }
      });
      // Start the consumer
      consumer.start();
   }
}

3.3.2 Sending delayed Messages

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;

public class ScheduledMessageProducer {
   public static void main(String[] args) throws Exception {
      // Instantiate a producer that sends delayed messages
      DefaultMQProducer producer = new DefaultMQProducer("ExampleProducerGroup");
      // Start the producer
      producer.start();
      int totalMessagesToSend = 100;
      for (int i = 0; i < totalMessagesToSend; i++) {
          Message message = new Message("TestTopic", ("Hello scheduled message " + i).getBytes());
          // Set delay level 3: the message is delivered to consumers after 10s
          // (only fixed delays are supported, see delayTimeLevel)
          message.setDelayTimeLevel(3);
          // Send the message
          producer.send(message);
      }
      // Shut down the producer
      producer.shutdown();
   }
}

3.3.3 Verification

You will see that each message is consumed about 10 seconds after it was stored.

3.3.4 Restrictions

// org/apache/rocketmq/store/config/MessageStoreConfig.java
private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

RocketMQ does not currently support arbitrary delay times. Delays are chosen from a fixed set of levels: levels 1 through 18 correspond to 1s through 2h.
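To see which level corresponds to which delay, the level string can be parsed directly. The following is a minimal sketch, not RocketMQ's own parsing code: the class and method names are hypothetical, and only the level string is taken from the configuration quoted above.

```java
/** Hypothetical helper that maps delay levels to milliseconds. */
class DelayLevelSketch {
    // Copied from the MessageStoreConfig default quoted above.
    static final String MESSAGE_DELAY_LEVEL =
        "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

    /** Returns the delay in milliseconds for a 1-based delay level. */
    static long delayMillis(int level) {
        String[] tokens = MESSAGE_DELAY_LEVEL.split(" ");
        if (level < 1 || level > tokens.length) {
            throw new IllegalArgumentException("level must be between 1 and " + tokens.length);
        }
        String token = tokens[level - 1];
        long amount = Long.parseLong(token.substring(0, token.length() - 1));
        char unit = token.charAt(token.length() - 1);
        switch (unit) {
            case 's': return amount * 1_000L;
            case 'm': return amount * 60_000L;
            case 'h': return amount * 3_600_000L;
            default: throw new IllegalStateException("Unknown unit: " + unit);
        }
    }

    /** Returns the smallest level whose delay is at least wantedMillis, or -1 if none is long enough. */
    static int smallestLevelAtLeast(long wantedMillis) {
        int levelCount = MESSAGE_DELAY_LEVEL.split(" ").length;
        for (int level = 1; level <= levelCount; level++) {
            if (delayMillis(level) >= wantedMillis) {
                return level;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        System.out.println("Level 3 delay in ms: " + delayMillis(3));
        System.out.println("Level for a one-hour delay: " + smallestLevelAtLeast(3_600_000L));
    }
}
```

For the order-timeout scenario from the start of this section (checking the order one hour later), this lookup picks the level to pass to setDelayTimeLevel.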

3.4 Batch Messages

Batching messages can significantly improve the performance of delivering small messages. The limitations are that these batch messages should have the same topic, the same waitStoreMsgOK, and cannot be delayed messages. In addition, the total size of this batch of messages should not exceed 4MB.

3.4.1 Sending Batch Messages

If you only send messages of less than 4MB at a time, it is easy to use batch processing, such as the following:

String topic = "BatchTest";
List<Message> messages = new ArrayList<>();
messages.add(new Message(topic, "TagA", "OrderID001", "Hello world 0".getBytes()));
messages.add(new Message(topic, "TagA", "OrderID002", "Hello world 1".getBytes()));
messages.add(new Message(topic, "TagA", "OrderID003", "Hello world 2".getBytes()));
try {
   producer.send(messages);
} catch (Exception e) {
   e.printStackTrace();
   // Handle the error
}

If the total length of the message is likely to be greater than 4MB, it is best to split the message

import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.rocketmq.common.message.Message;

public class ListSplitter implements Iterator<List<Message>> {
   private static final int SIZE_LIMIT = 1024 * 1024 * 4;
   private final List<Message> messages;
   private int currIndex;

   public ListSplitter(List<Message> messages) {
       this.messages = messages;
   }

   @Override
   public boolean hasNext() {
       return currIndex < messages.size();
   }

   @Override
   public List<Message> next() {
       int nextIndex = currIndex;
       int totalSize = 0;
       for (; nextIndex < messages.size(); nextIndex++) {
           Message message = messages.get(nextIndex);
           int tmpSize = message.getTopic().length() + message.getBody().length;
           Map<String, String> properties = message.getProperties();
           for (Map.Entry<String, String> entry : properties.entrySet()) {
               tmpSize += entry.getKey().length() + entry.getValue().length();
           }
           tmpSize = tmpSize + 20; // Add 20 bytes for log overhead
           if (tmpSize > SIZE_LIMIT) {
               // A single message exceeds the size limit.
               // Let it through, otherwise the splitting process would be blocked.
               if (nextIndex - currIndex == 0) {
                   // If the sublist is still empty, put this message in it alone and stop;
                   // otherwise just stop and leave the message for the next sublist
                   nextIndex++;
               }
               break;
           }
           if (tmpSize + totalSize > SIZE_LIMIT) {
               break;
           } else {
               totalSize += tmpSize;
           }
       }
       List<Message> subList = messages.subList(currIndex, nextIndex);
       currIndex = nextIndex;
       return subList;
   }
}

// Split a large batch into smaller batches and send each one
ListSplitter splitter = new ListSplitter(messages);
while (splitter.hasNext()) {
   try {
       List<Message> listItem = splitter.next();
       producer.send(listItem);
   } catch (Exception e) {
       e.printStackTrace();
       // Handle the error
   }
}

3.5 Filtering Messages

In most cases, a tag is a simple and useful way to select the messages you want. For example:

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_EXAMPLE");
consumer.subscribe("TOPIC", "TAGA || TAGB || TAGC");

The consumer receives messages tagged TAGA, TAGB, or TAGC. The limitation is that a message can carry only one tag, which may not be enough for complex scenarios. In that case, you can filter messages with SQL expressions, which are evaluated against the properties set on the message when it was sent. Some simple logic can be expressed within the syntax that RocketMQ defines. Here’s an example:

------------
| message  |
|----------|  a > 5 AND b = 'abc'
| a = 10   |  --------------------> Gotten
| b = 'abc'|
| c = true |
------------
------------
| message  |
|----------|  a > 5 AND b = 'abc'
| a = 1    |  --------------------> Missed
| b = 'abc'|
| c = true |
------------
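The same decision can be written as a plain Java predicate over the message properties, which makes the two outcomes in the diagram easy to reproduce. This is only an illustration of what the expression means; in RocketMQ the expression is passed as a string when subscribing, and the class and helper names below are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Predicate;

/** Hypothetical Java equivalent of the filter expression: a > 5 AND b = 'abc'. */
class SqlFilterSketch {
    static final Predicate<Map<String, String>> FILTER = properties -> {
        String a = properties.get("a");
        String b = properties.get("b");
        // A missing property never matches a comparison.
        return a != null && Integer.parseInt(a) > 5 && "abc".equals(b);
    };

    /** Builds the property map of a message like the ones in the diagram. */
    static Map<String, String> message(int a, String b, boolean c) {
        Map<String, String> properties = new HashMap<>();
        properties.put("a", String.valueOf(a));
        properties.put("b", b);
        properties.put("c", String.valueOf(c));
        return properties;
    }

    public static void main(String[] args) {
        System.out.println(FILTER.test(message(10, "abc", true)) ? "Gotten" : "Missed");
        System.out.println(FILTER.test(message(1, "abc", true)) ? "Gotten" : "Missed");
    }
}
```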

3.5.1 Basic SQL syntax

RocketMQ defines only some basic syntax to support this feature. You can also easily extend it.

  • Numeric comparison, such as >, >=, <, <=, BETWEEN, =
  • Character comparison, such as =, <>, IN
  • IS NULL or IS NOT NULL
  • Logical operators AND, OR, NOT

The supported types of constants are:

  • Numeric values, such as 123, 3.1415
  • Characters, such as 'abc'; they must be wrapped in single quotation marks
  • NULL, a special constant
  • Boolean values, TRUE or FALSE

Only push-mode consumers can filter with SQL92 expressions. The interface is as follows:

public void subscribe(final String topic, final MessageSelector messageSelector)

3.5.2 Message Producer

When you send a message, you can set the properties of the message with putUserProperty

DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
producer.start();
Message msg = new Message("TopicTest",
   tag,
   ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
);
// Set some properties
msg.putUserProperty("a", String.valueOf(i));
SendResult sendResult = producer.send(msg);

producer.shutdown();

3.5.3 Message Consumer

Use MessageSelector.bySql to filter messages with SQL:

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");
// Subscribe only to messages that have property a, with a >= 0 and a <= 3
consumer.subscribe("TopicTest", MessageSelector.bySql("a between 0 and 3"));
consumer.registerMessageListener(new MessageListenerConcurrently() {
   @Override
   public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
       return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
   }
});
consumer.start();

3.6 Transaction Message

3.6.1 Process Analysis

The figure above illustrates the general scheme of the transaction message, which is divided into two processes: the sending and committing of the normal transaction message, and the compensation process of the transaction message.

1) Sending and committing transaction messages

(1) The producer sends a half message.

(2) The server responds with the write result of the message.

(3) The producer executes the local transaction according to the send result (if the write failed, the half message is not visible to the business and the local logic is not executed).

(4) The producer commits or rolls back based on the local transaction state (the Commit operation generates the message index and makes the message visible to consumers).

2) Transaction compensation

(1) For pending transaction messages that have received neither Commit nor Rollback, the server initiates a check-back.

(2) After receiving the check-back, the Producer checks the status of the local transaction that corresponds to the message.

(3) The Producer commits or rolls back again based on the local transaction status.

The compensation phase is used to resolve situations where the Commit or Rollback messages time out or fail.

3) Status of the transaction message

A transaction message has three states: committed, rolled back, and intermediate:

  • TransactionStatus.CommitTransaction: commits the transaction; consumers are allowed to consume the message.
  • TransactionStatus.RollbackTransaction: rolls back the transaction; the message is deleted and cannot be consumed.
  • TransactionStatus.Unknown: intermediate state; the message queue needs to check back to determine the state.

3.6.2 Sending a Transaction Message

1) Create transactional producers

Create the producer with the TransactionMQProducer class and specify a unique ProducerGroup. You can set a custom thread pool to handle check-back requests. After the local transaction is executed, the producer must reply to the message queue based on the execution result. Refer to the previous section for the transaction states that can be returned.

public class Producer {
    public static void main(String[] args) throws MQClientException, InterruptedException {
        // Create transaction listener
        TransactionListener transactionListener = new TransactionListenerImpl();
        // Create a message producer
        TransactionMQProducer producer = new TransactionMQProducer("group6");
        producer.setNamesrvAddr("192.168.25.135:9876;192.168.25.138:9876");
        // Register the transaction listener with the producer
        producer.setTransactionListener(transactionListener);
        // Start the message producer
        producer.start();
        String[] tags = new String[]{"TagA", "TagB", "TagC"};
        for (int i = 0; i < 3; i++) {
            try {
                Message msg = new Message("TransactionTopic", tags[i % tags.length], "KEY" + i,
                        ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
                SendResult sendResult = producer.sendMessageInTransaction(msg, null);
                System.out.printf("%s%n", sendResult);
                TimeUnit.SECONDS.sleep(1);
            } catch (MQClientException | UnsupportedEncodingException e) {
                e.printStackTrace();
            }
        }
        // producer.shutdown();
    }
}

2) Implement the transaction listening interface

Once the half message has been sent successfully, the executeLocalTransaction method executes the local transaction. It returns one of the three transaction states mentioned in the previous section. The checkLocalTransaction method checks the status of the local transaction in response to a check-back request from the message queue. It also returns one of the three transaction states mentioned in the previous section.

public class TransactionListenerImpl implements TransactionListener {

    @Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        System.out.println("Perform local transactions");
        if (StringUtils.equals("TagA", msg.getTags())) {
            return LocalTransactionState.COMMIT_MESSAGE;
        } else if (StringUtils.equals("TagB", msg.getTags())) {
            return LocalTransactionState.ROLLBACK_MESSAGE;
        } else {
            return LocalTransactionState.UNKNOW;
        }
    }

    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {
        System.out.println("MQ checks the local transaction result for message Tag [" + msg.getTags() + "]");
        return LocalTransactionState.COMMIT_MESSAGE;
    }
}

3.6.3 Restrictions

  1. Transactional messages do not support delayed or batch messages.
  2. To prevent a single message from being checked too many times and causing half messages to accumulate in the queue, the number of checks for a single message is limited to 15 by default. Users can change this limit with the transactionCheckMax parameter in the Broker configuration file. If a message has been checked more than N times (N = transactionCheckMax), the Broker discards the message and, by default, prints an error log. Users can change this behavior by overriding the AbstractTransactionCheckListener class.
  3. Transactional messages are checked after a specific length of time, set by the transactionMsgTimeout parameter in the Broker configuration file. Users can also change this limit by setting the user property CHECK_IMMUNITY_TIME_IN_SECONDS when sending a transactional message; this property takes precedence over the transactionMsgTimeout parameter.
  4. Transactional messages may be checked or consumed more than once.
  5. Writing a committed message to the user's target topic may fail; at present this case is handled through the log records. High availability is provided by RocketMQ's own high availability mechanism. If you want to ensure that transactional messages are not lost and that transactional integrity is guaranteed, a synchronous dual-write mechanism is recommended.
  6. The producer ID of a transactional message cannot be shared with the producer ID of another type of message. Unlike other types of messages, transactional messages allow a reverse lookup, in which the MQ server finds the producer client by its producer ID.
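The interaction between the three transaction states and the check-back limit in restriction 2 can be sketched as a small state machine. This is an illustrative model only, not broker code: the class TransactionCheckSketch, the Checker interface, and the Outcome names are all hypothetical, and maxChecks stands in for transactionCheckMax.

```java
final class TransactionCheckSketch {
    enum State { COMMIT, ROLLBACK, UNKNOWN }
    enum Outcome { DELIVERED, DELETED, DISCARDED }

    // Hypothetical stand-in for the producer's checkLocalTransaction callback.
    interface Checker {
        State check(int attempt);
    }

    // Resolve a half message: start from the state returned by the local transaction,
    // and while it is UNKNOWN, check back with the producer at most maxChecks times.
    static Outcome resolve(State initial, Checker checker, int maxChecks) {
        State state = initial;
        int checks = 0;
        while (state == State.UNKNOWN && checks < maxChecks) {
            checks++;
            state = checker.check(checks);
        }
        switch (state) {
            case COMMIT:
                return Outcome.DELIVERED;  // message becomes visible to consumers
            case ROLLBACK:
                return Outcome.DELETED;    // message is removed and never consumed
            default:
                return Outcome.DISCARDED;  // still unknown after the limit: give up
        }
    }

    public static void main(String[] args) {
        // The producer answers UNKNOWN twice, then COMMIT on the third check.
        Outcome outcome = resolve(State.UNKNOWN,
                attempt -> attempt == 3 ? State.COMMIT : State.UNKNOWN, 15);
        System.out.println(outcome);
    }
}
```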

3.7 Configuring AK and Secret when connecting to RocketMQ on Aliyun

If you are calling RocketMQ on Aliyun, you also need to specify the AK and Secret.

3.7.1 Producer

Setting the AK and Secret works the same way for every kind of Producer: you only need to specify them when creating the Producer. This example sends a normal message:

public class SyncAKProducer {
    private static RPCHook getAclRPCHook() {
        return new AclClientRPCHook(new SessionCredentials("Set your own ACCESS_KEY", "Set your own SECRET_KEY"));
    }

    public static void main(String[] args) throws Exception {
        /*
         * Create a Producer with message trace enabled.
         * If you do not want to enable message trace, create it as follows:
         * DefaultMQProducer producer = new DefaultMQProducer("Set your own GroupName (unique)", getAclRPCHook());
         */
        DefaultMQProducer producer = new DefaultMQProducer("Set your own GroupName (unique)", getAclRPCHook(), true, null);

        /*
         * Set the access channel to Aliyun. This is required when using the message trace
         * function on the cloud. If message trace is not enabled, do not set it.
         */
        producer.setAccessChannel(AccessChannel.CLOUD);
        // Set the NameServer address
        producer.setNamesrvAddr("localhost:9876");
        // Set the number of retries when a synchronous send fails. The default is 2
        producer.setRetryTimesWhenSendFailed(2);
        // Set the timeout for sending messages. The default is 3000 ms
        producer.setSendMsgTimeout(3000);
        // Start the Producer instance
        producer.start();
        for (int i = 0; i < 100; i++) {
            // Create the message and specify the Topic, Tag, and message body
            Message msg = new Message("TopicTest" /* Topic */, "TagA" /* Tag */,
                    ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
            );
            // Send the message to a Broker
            SendResult sendResult = producer.send(msg);
            // sendResult reports whether the message was delivered successfully
            System.out.printf("%s%n", sendResult);
        }
        // If no more messages will be sent, shut down the Producer instance
        producer.shutdown();
    }
}

3.7.2 Consumer

Setting the AK and Secret works the same way for every kind of Consumer: you only need to specify them when creating the Consumer. This example receives normal messages:

private static RPCHook getAclRPCHook() {
    return new AclClientRPCHook(new SessionCredentials("Set your own ACCESS_KEY", "Set your own SECRET_KEY"));
}

public static void main(String[] args) throws Exception {
    /*
     * Create a Consumer with message trace enabled.
     * If you do not want to enable message trace, create it as follows:
     * DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(MqConfig.GROUP_ID, getAclRPCHook(), null);
     */
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1", getAclRPCHook(), new AllocateMessageQueueAveragely(), true, null);

    /*
     * Set the access channel to Aliyun. This is required when using the message trace
     * function on the cloud. If message trace is not enabled, do not set it.
     */
    consumer.setAccessChannel(AccessChannel.CLOUD);

    // Specify the NameServer address
    consumer.setNamesrvAddr("localhost:9876");
    // Subscribe to the Topic
    consumer.subscribe("Test", "*");
    // Consume in load-balancing (cluster) mode
    consumer.setMessageModel(MessageModel.CLUSTERING);
    // Register the callback that processes messages
    consumer.registerMessageListener(new MessageListenerConcurrently() {
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
            System.out.printf("%s Receive New Messages: %s %n",
                              Thread.currentThread().getName(), msgs);
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    });
    // Start the consumer
    consumer.start();
    System.out.printf("Consumer Started.%n");
}

4. Message Sending and Consumption Example (Spring Boot)

4.1 Importing Dependencies

   <dependency>
       <groupId>org.apache.rocketmq</groupId>
       <artifactId>rocketmq-spring-boot-starter</artifactId>
       <version>2.1.1</version>
   </dependency>

4.2 Producer

4.2.1 application.yaml configuration file

# application.yaml
rocketmq:
  name-server: 10.124.128.200:9876
  producer:
    group: test-group
    # Number of retries when sending a synchronous message fails. Default is 2
    retry-times-when-send-failed: 2
    # Number of retries when sending an asynchronous message fails. Default is 2
    retry-times-when-send-async-failed: 2
    # Timeout for sending a message, in milliseconds. The default is 3 s
    send-message-timeout: 3000

    # When connecting to RocketMQ on Aliyun, you need to configure AK and SK
    access-key: 
    secret-key: 

4.2.2 Producer

@RestController
@RequestMapping("/test")
public class ProducerTest {

    // Automatic injection
    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    @PostMapping("/sendSyncMessage")
    public void sendSyncMessage(@RequestBody Map<String, Object> msgMap) throws Exception {
        // Build the message: topic, tag, key, and body.
        // The key is derived from the payload here purely for illustration.
        String key = String.valueOf(msgMap.hashCode());
        Message message = new Message("TopicName", "Tag", key, JSON.toJSONBytes(msgMap));

        // Send a synchronous message
        // Method 1: same as in Chapter 3. getProducer() returns the DefaultMQProducer object, which is then used as in Chapter 3.
        SendResult sendResult = rocketMQTemplate.getProducer().send(message);

        // Method 2: use the send methods wrapped by RocketMQTemplate
        // The first argument specifies the Topic and Tag in the format 'topicName:tags'
        // The second argument is the message payload
        sendResult = rocketMQTemplate.syncSend("TopicName:Tag", msgMap);
    }
}

4.3 Consumer

4.3.1 application.yaml configuration file

rocketmq:
  name-server: 10.124.128.200:9876

  # The following settings are needed only when using RocketMQ on Aliyun
  consumer:
    access-key: 
    secret-key: 
  access-channel: CLOUD

4.3.2 Consumer message listener

@Slf4j
@Component
@RocketMQMessageListener(topic = "springboot-mq", consumerGroup = "springboot-mq-consumer-1", selectorExpression = "*")
public class Consumer implements RocketMQListener<String> {

    @Override
    public void onMessage(String message) {
        log.info("Received message: " + message);

        // If consumption fails, throw a RuntimeException and RocketMQ will retry automatically
        // You can throw the RuntimeException manually or use Lombok's @SneakyThrows annotation
        throw new RuntimeException("Consumption failure");
    }
}

Common configuration parameters of the @RocketMQMessageListener annotation:

| Parameter | Type | Default value | Description |
| --- | --- | --- | --- |
| consumerGroup | String | | Consumer group |
| topic | String | | Topic |
| selectorType | SelectorType | SelectorType.TAG | Select messages using either TAG or SQL92. The default is TAG |
| selectorExpression | String | "*" | Controls which messages can be selected |
| consumeMode | ConsumeMode | ConsumeMode.CONCURRENTLY | Consumption mode, concurrent or ordered. The default is concurrent |
| messageModel | MessageModel | MessageModel.CLUSTERING | Message model, broadcast or cluster. The default is cluster |
| consumeThreadMax | int | 64 | Maximum number of consuming threads |
| consumeTimeout | long | 15L | Consumption timeout: the maximum time, in minutes, that a message may block a consuming thread |
| nameServer | String | ${rocketmq.name-server:} | NameServer address |
| accessKey | String | ${rocketmq.consumer.access-key:} | AK |
| secretKey | String | ${rocketmq.consumer.secret-key:} | SK |
| accessChannel | String | ${rocketmq.access-channel:} | |

5. Message storage

Because distributed queues demand high reliability, data must be stored persistently.

  1. The message producer sends a message
  2. MQ receives the message, persists it, and adds a new record to the store
  3. MQ returns an ACK to the producer
  4. MQ pushes the message to the corresponding consumer and waits for the consumer to return an ACK
  5. If the consumer returns an ACK within the specified time, MQ considers the consumption successful and deletes the message from the store, that is, step 6 is executed. If MQ does not receive an ACK within the specified time, it considers the consumption failed and pushes the message again, repeating steps 4, 5, and 6
  6. MQ deletes the message
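The six steps above can be sketched in a few lines. This is a toy model under stated assumptions: an in-memory map stands in for the persistent store, the consumer's ACK is modeled as a boolean return value, and the class StoreAndAckSketch with its maxAttempts parameter is hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Predicate;

final class StoreAndAckSketch {
    // Stands in for the persistent store; a real broker writes to disk.
    private final Map<Long, String> store = new LinkedHashMap<>();
    private long nextId = 0;

    // Steps 1-3: receive the message, persist it, and ACK the producer by returning its id.
    long send(String body) {
        long id = nextId++;
        store.put(id, body);
        return id;
    }

    // Steps 4-6: push the message until the consumer ACKs (returns true), then delete it.
    // Returns the number of delivery attempts made. If no attempt is ACKed within
    // maxAttempts, the message stays in the store.
    int deliver(long id, Predicate<String> consumer, int maxAttempts) {
        String body = store.get(id);
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            if (consumer.test(body)) {
                store.remove(id);
                return attempt;
            }
        }
        return maxAttempts;
    }

    boolean isStored(long id) {
        return store.containsKey(id);
    }

    public static void main(String[] args) {
        StoreAndAckSketch mq = new StoreAndAckSketch();
        long id = mq.send("order-1");
        int[] calls = {0};
        // The consumer fails twice, then ACKs on the third push.
        int attempts = mq.deliver(id, body -> ++calls[0] >= 3, 5);
        System.out.println(attempts + " attempts, still stored: " + mq.isStored(id));
    }
}
```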

5.1 Storage Media

Several commonly used products in the industry, such as RocketMQ, Kafka, and RabbitMQ, persist messages by flushing them to the file system of the VM or physical machine they are deployed on (flushing generally comes in asynchronous and synchronous modes).

Message flushing gives message storage an efficient, reliable, high-performance way to persist data. Unless the machine that MQ is deployed on fails, or its local disk fails, persistence failures are usually not a concern.

5.2 Message storage structure

RocketMQ message storage is made up of the ConsumeQueue and the CommitLog. The physical file that actually stores messages is the CommitLog. The ConsumeQueue is the logical queue of messages and acts as an index into the CommitLog. Each Message Queue under each Topic has a corresponding ConsumeQueue file.

  • CommitLog: stores the message bodies and their metadata
  • ConsumeQueue: stores the index of messages in the CommitLog
  • IndexFile: provides a way to query messages by key or by time interval; looking up messages through the IndexFile does not affect the main flow of sending and consuming messages

5.3 Sequential write

RocketMQ writes messages sequentially, which keeps message storage fast.

Used properly, a disk can keep up with the data transfer speed of the network. The sequential write speed of current high-performance disks can reach 600 MB/s, which exceeds the transmission speed of ordinary network adapters. Random disk writes, however, reach only about 100 KB/s, which is 6000 times slower than sequential writes (600 MB/s is 600,000 KB/s, and 600,000 / 100 = 6000). Because of this huge speed difference, a well-designed message queuing system can be orders of magnitude faster than an ordinary one.

5.4 Disk Flushing Mechanism

RocketMQ stores messages on disk, both so that they can be recovered after a power failure and so that the number of stored messages can exceed memory limits. To improve performance, RocketMQ writes to disk sequentially as much as possible. When a Producer writes a message to RocketMQ, there are two disk-write modes: synchronous flushing and asynchronous flushing.

1) Synchronous disk flushing

When the write success status is returned, the message has already been written to disk. The process is as follows: the message is written to the PAGECACHE in memory, the flush thread is notified immediately, and the writing thread waits for the flush to finish. When the flush completes, the flush thread wakes up the waiting thread, which then returns the write success status.

2) Asynchronous disk flushing

When the write success status is returned, the message may only have been written to the PAGECACHE in memory. The write returns quickly and throughput is high. When the messages in memory accumulate to a certain amount, a single disk write is triggered to flush them in a batch.

3) Configuration

Both synchronous and asynchronous flushing are selected with the flushDiskType parameter in the Broker configuration file, which is set to either SYNC_FLUSH or ASYNC_FLUSH.
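The difference between the two modes comes down to when the data is forced from the page cache to the disk. The following sketch illustrates that with plain Java NIO; it is not RocketMQ's implementation, and the class FlushSketch and its method names are hypothetical. FileChannel.force is the standard call that asks the operating system to write cached file data to the storage device.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

final class FlushSketch {
    private static void writeFully(FileChannel channel, String message) throws IOException {
        ByteBuffer buffer = ByteBuffer.wrap(message.getBytes(StandardCharsets.UTF_8));
        while (buffer.hasRemaining()) {
            channel.write(buffer); // lands in the OS page cache first
        }
    }

    // Synchronous flush: do not return (report success) until the data is forced to disk.
    static void writeSync(FileChannel channel, String message) throws IOException {
        writeFully(channel, message);
        channel.force(false);
    }

    // Asynchronous flush: return as soon as the data is in the page cache.
    static void writeAsync(FileChannel channel, String message) throws IOException {
        writeFully(channel, message);
    }

    // Called later (for example by a background thread) to flush accumulated writes in one go.
    static void flush(FileChannel channel) throws IOException {
        channel.force(false);
    }

    public static void main(String[] args) throws IOException {
        Path file = Files.createTempFile("commitlog-sketch", ".log");
        try (FileChannel channel = FileChannel.open(file, StandardOpenOption.WRITE)) {
            writeSync(channel, "m1\n");   // durable when this call returns
            writeAsync(channel, "m2\n");  // fast, but only in the page cache
            writeAsync(channel, "m3\n");
            flush(channel);               // m2 and m3 become durable together
        }
        System.out.print(new String(Files.readAllBytes(file), StandardCharsets.UTF_8));
        Files.delete(file);
    }
}
```

The trade-off mirrors the text: writeSync pays the cost of a disk flush on every message, while writeAsync returns quickly and risks losing whatever has not yet been flushed if the machine loses power.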

5.5 Zero copy

The Linux operating system is divided into user mode and kernel mode. File operations and network operations need to switch between the two modes, which inevitably leads to data being copied.

A server sends the contents of a local disk file to a client in two steps:

  1. Read: reads the contents of the local file.

  2. Write: sends the content that was read over the network.

These two seemingly simple operations actually copy the data four times, as follows:

  1. Copy the data from disk to kernel-mode memory;
  2. Copy it from kernel-mode memory to user-mode memory;
  3. Copy it from user-mode memory to the kernel-mode memory of the network driver;
  4. Finally, copy it from the kernel-mode memory of the network driver to the network adapter for transmission.

Using mmap removes the copy into user-mode memory and improves speed. In Java this mechanism is available through MappedByteBuffer.

RocketMQ takes advantage of this feature, known as "zero copy" technology, to increase the speed at which messages are saved to disk and sent over the network.

Note that MappedByteBuffer has several limitations. One of them is that only about 1.5 to 2 GB of a file can be mapped into user-mode virtual memory at a time. This is why RocketMQ sets a single CommitLog data file to 1 GB by default.
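As a minimal illustration of the MappedByteBuffer mechanism mentioned above, the sketch below maps a file into memory, writes through the mapping, and reads the bytes back. It only shows the standard Java API; the class MmapSketch, its method name, and the mappedSize parameter are hypothetical, and RocketMQ's own storage code is considerably more involved.

```java
import java.io.IOException;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

final class MmapSketch {
    // Maps the first mappedSize bytes of the file, writes the message through the
    // mapping, and reads it back. The message must fit within mappedSize bytes.
    static String writeAndReadBack(Path file, String message, int mappedSize) throws IOException {
        byte[] payload = message.getBytes(StandardCharsets.UTF_8);
        try (FileChannel channel = FileChannel.open(file,
                StandardOpenOption.READ, StandardOpenOption.WRITE)) {
            MappedByteBuffer buffer = channel.map(FileChannel.MapMode.READ_WRITE, 0, mappedSize);
            buffer.put(payload);  // the write goes straight to the mapped pages
            buffer.force();       // ask the OS to flush the mapped pages to the file
            byte[] readBack = new byte[payload.length];
            buffer.position(0);
            buffer.get(readBack);
            return new String(readBack, StandardCharsets.UTF_8);
        }
    }

    public static void main(String[] args) throws IOException {
        Path file = Files.createTempFile("mmap-sketch", ".dat");
        file.toFile().deleteOnExit();
        System.out.println(writeAndReadBack(file, "Hello RocketMQ", 1024));
    }
}
```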

6. High availability mechanism

The RocketMQ distributed cluster achieves high availability through the combination of Master and Slave.

The difference between Master and Slave: in the Broker configuration file, a brokerId of 0 indicates that the Broker is a Master and a value greater than 0 indicates that it is a Slave. The brokerRole parameter also indicates whether the Broker is a Master or a Slave.

The Master Broker can read and write messages. The Slave Broker can only read messages. That is, the Producer can only connect to the Master Broker and write messages. A Consumer can connect to either a Master Broker or a Slave Broker to read messages.

6.1 Message consumption High availability

In the Consumer configuration file, there is no need to set whether to read from the Master or Slave. When the Master is unavailable or busy, the Consumer is automatically switched to the Slave. With the automatic Consumer switching mechanism, if a Master machine fails, the Consumer can still read messages from the Slave without affecting the Consumer program. This is high availability on the consumer side.

6.2 Message sending high availability

When creating a Topic, create its Message Queues on multiple Broker groups (machines with the same Broker name and different brokerIds form one Broker group). That way, when the Master of one Broker group is unavailable, the Masters of the other groups are still available and producers can still send messages. RocketMQ does not currently support automatic conversion of a Slave to a Master. If you need to convert a Slave to a Master, manually stop the Slave Broker, change the configuration file, and start the Broker with the new configuration file.

6.3 Master/Slave replication

If a Broker group has a Master and a Slave, messages need to be replicated from the Master to the Slave, either synchronously or asynchronously.

1) Synchronous replication

In synchronous replication mode, the write success status is reported to the client after both Master and Slave are written successfully.

In synchronous replication mode, if the Master fails, the Slave has all backup data, which is easy to recover. However, synchronous replication increases the data write delay and reduces the system throughput.

2) Asynchronous replication

In asynchronous replication, the write success status is reported to the client as long as the Master is successfully written.

In asynchronous replication, the system has low latency and high throughput, but if the Master fails, some data may be lost because it is not written to the Slave.

3) Configuration

Synchronous and asynchronous replication are selected with the brokerRole parameter in the Broker configuration file, which takes one of the following values:

  • ASYNC_MASTER: Master node with asynchronous replication
  • SYNC_MASTER: Master node with synchronous replication
  • SLAVE: Slave node

4) Summary

In practice, choose the flush mode and the Master/Slave replication mode based on the business scenario. SYNC_FLUSH (synchronous flushing) in particular reduces performance significantly, because it triggers disk writes frequently.

In general, a good choice is to configure ASYNC_FLUSH on both Master and Slave and SYNC_MASTER replication between them. That way, data is preserved even if one machine fails.

7. Load balancing

7.1 Producer Load Balancing

On the Producer side, each instance by default sends messages to all message queues in round-robin order, so that messages are spread evenly across the queues. Since the queues can be scattered across different Brokers, messages are sent to different Brokers, as shown below:

The labels on the arrow lines in the figure represent the order, with publishers sending the first message to Queue 0, then the second message to Queue 1, and so on.
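The round-robin selection described above can be sketched with a shared counter. This is an illustration of the idea, not the producer's actual selection code; the class RoundRobinSketch and the queue names are hypothetical.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

final class RoundRobinSketch {
    private final AtomicInteger counter = new AtomicInteger();

    // Picks the next queue in turn; floorMod keeps the index valid even if the counter overflows.
    <T> T select(List<T> queues) {
        int index = Math.floorMod(counter.getAndIncrement(), queues.size());
        return queues.get(index);
    }

    public static void main(String[] args) {
        RoundRobinSketch selector = new RoundRobinSketch();
        List<String> queues = List.of("Queue 0", "Queue 1", "Queue 2", "Queue 3");
        for (int i = 1; i <= 5; i++) {
            System.out.println("message " + i + " -> " + selector.select(queues));
        }
    }
}
```

With four queues, the fifth message wraps around to Queue 0 again.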

7.2 Consumer Load Balancing

1) Cluster mode

In cluster consumption, every consumer group that subscribes to a topic receives each message, and within a consumer group each message is consumed by only one instance. RocketMQ consumers actively pull messages, and each pull must specify which message queue to pull from.

Every time the number of instances changes, load balancing is triggered and the queues are distributed evenly among the instances according to the number of queues and the number of instances.

The default allocation algorithm is AllocateMessageQueueAveragely, as shown in the following figure:

Another averaging algorithm, AllocateMessageQueueAveragelyByCircle, also distributes the queues evenly, but deals them out to the instances one at a time in circular order, as shown in the following figure:

In cluster mode, a queue can be allocated to only one instance. If multiple instances consumed from the same queue at the same time, the same message would be consumed multiple times by different instances, because each pull is controlled by the instance itself. So the algorithm assigns each queue to exactly one consumer instance, while one consumer instance can be assigned several queues at the same time.

Adding consumer instances spreads the queues over more instances, which scales consumption capacity horizontally. When an instance goes offline, load balancing is triggered again and the queues that were allocated to it are reallocated to other instances.

However, if the number of consumer instances is greater than the total number of message queues, the extra instances are not assigned any queue and consume no messages, so they do not share the load. You therefore need to make sure that the total number of queues is greater than or equal to the number of consumers.
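The two allocation strategies can be sketched as follows. This is a simplified reimplementation for illustration, working on queue and consumer indexes only; it is not the source of AllocateMessageQueueAveragely or AllocateMessageQueueAveragelyByCircle, and the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

final class AllocateSketch {
    // Contiguous blocks: each consumer gets queueCount / consumerCount queues,
    // and the first (queueCount % consumerCount) consumers get one extra.
    static List<Integer> averagely(int queueCount, int consumerCount, int consumerIndex) {
        int base = queueCount / consumerCount;
        int extra = queueCount % consumerCount;
        int size = base + (consumerIndex < extra ? 1 : 0);
        int start = consumerIndex * base + Math.min(consumerIndex, extra);
        List<Integer> result = new ArrayList<>();
        for (int queue = start; queue < start + size; queue++) {
            result.add(queue);
        }
        return result;
    }

    // Circular dealing: queue q goes to consumer (q % consumerCount).
    static List<Integer> byCircle(int queueCount, int consumerCount, int consumerIndex) {
        List<Integer> result = new ArrayList<>();
        for (int queue = consumerIndex; queue < queueCount; queue += consumerCount) {
            result.add(queue);
        }
        return result;
    }

    public static void main(String[] args) {
        for (int consumer = 0; consumer < 3; consumer++) {
            System.out.println("consumer " + consumer
                    + " averagely=" + averagely(8, 3, consumer)
                    + " byCircle=" + byCircle(8, 3, consumer));
        }
        // More consumers than queues: the last consumer gets nothing.
        System.out.println("2 queues, consumer 2 of 3: " + averagely(2, 3, 2));
    }
}
```

With 8 queues and 3 consumers, the block strategy yields sizes 3, 3, 2 in contiguous runs, while the circular strategy yields the same sizes with interleaved queue numbers. The last line shows the idle consumer that the text warns about.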

2) Broadcast mode

Since broadcast mode requires that every message be delivered to all consumer instances in a consumer group, there is no notion of splitting messages among instances.

One difference in the implementation is that when queues are assigned, every consumer is assigned all queues.

8. Message retry

8.1 Sequential message retry

For sequential messages, when the consumer fails to consume a message, RocketMQ automatically retries it repeatedly (at an interval of 1 second), and during that time consumption by the application is blocked. Therefore, when using sequential messages, make sure the application can monitor and handle consumption failures promptly to avoid blocking.

8.2 Unordered Message retry

For unordered messages (normal, scheduled, delayed, transactional), you can trigger a retry by setting the return status when the consumer fails to consume the message.

The retry of unordered messages takes effect only for cluster consumption. The broadcast mode does not provide the failure retry feature. That is, after a failure is consumed, the failed message is not retried and new messages are consumed.

1) Retry times

RocketMQ allows a maximum of 16 retries per message by default, with the following interval before each retry:

| Retry | Interval since the last attempt | Retry | Interval since the last attempt |
| --- | --- | --- | --- |
| 1 | 10 seconds | 9 | 7 minutes |
| 2 | 30 seconds | 10 | 8 minutes |
| 3 | 1 minute | 11 | 9 minutes |
| 4 | 2 minutes | 12 | 10 minutes |
| 5 | 3 minutes | 13 | 20 minutes |
| 6 | 4 minutes | 14 | 30 minutes |
| 7 | 5 minutes | 15 | 1 hour |
| 8 | 6 minutes | 16 | 2 hours |

If a message keeps failing, the 16 retries are spread over the following 4 hours and 46 minutes. If it still fails after the 16th retry, it is not delivered again.

Note: No matter how many times a Message is retried, the Message ID of those retried messages does not change.
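The retry schedule can be checked with a short calculation. The sketch below encodes the intervals from the table and sums them; it also clamps any retry beyond the 16th to 2 hours, which is the rule this article gives for custom maximums above 16. The class RetryScheduleSketch and its method names are hypothetical.

```java
import java.time.Duration;

final class RetryScheduleSketch {
    // Intervals for retries 1 through 16, in seconds, taken from the table.
    private static final long[] INTERVAL_SECONDS = {
        10, 30, 60, 120, 180, 240, 300, 360,
        420, 480, 540, 600, 1200, 1800, 3600, 7200
    };

    // Interval before the given retry (1-based); retries beyond the 16th use 2 hours.
    static Duration intervalFor(int retry) {
        if (retry < 1) {
            throw new IllegalArgumentException("retry must be >= 1");
        }
        int index = Math.min(retry, INTERVAL_SECONDS.length) - 1;
        return Duration.ofSeconds(INTERVAL_SECONDS[index]);
    }

    // Total waiting time if every retry up to maxRetries is used.
    static Duration totalFor(int maxRetries) {
        Duration total = Duration.ZERO;
        for (int retry = 1; retry <= maxRetries; retry++) {
            total = total.plus(intervalFor(retry));
        }
        return total;
    }

    public static void main(String[] args) {
        System.out.println(totalFor(16)); // PT4H45M40S
    }
}
```

The sum for 16 retries is 4 hours 45 minutes 40 seconds, which the text rounds to 4 hours and 46 minutes. It counts only the waiting intervals, not the time spent on each consumption attempt.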

2) Configuration mode

Configuring retry after a consumption failure

In cluster consumption mode, if you want a message to be retried after consumption fails, the message listener implementation must explicitly do one of the following:

  • Return Action.ReconsumeLater (recommended)
  • Return null
  • Throw an exception

public class MessageListenerImpl implements MessageListener {
    @Override
    public Action consume(Message message, ConsumeContext context) {
        // Process the message
        doConsumeMessage(message);
        // Method 1: return Action.ReconsumeLater and the message will be retried
        return Action.ReconsumeLater;
        // Method 2: return null and the message will be retried
        // return null;
        // Method 3: throw an exception and the message will be retried
        // throw new RuntimeException("Consumer Message exception");
    }
}

Configuring no retry after a consumption failure

In cluster consumption mode, if you do not want a message to be retried after consumption fails, catch every exception that the consumption logic may throw and return Action.CommitMessage in the end. The message will then not be retried.

public class MessageListenerImpl implements MessageListener {
    @Override
    public Action consume(Message message, ConsumeContext context) {
        try {
            doConsumeMessage(message);
        } catch (Throwable e) {
            // Catch all exceptions in the consumption logic and return Action.CommitMessage
            return Action.CommitMessage;
        }
        // Message processed normally: return Action.CommitMessage
        return Action.CommitMessage;
    }
}

Customizing the maximum number of message retries

RocketMQ allows a Consumer to set a maximum number of retries when it starts. The retry intervals are then as follows:

  • If the maximum number of retries is less than or equal to 16, the retry intervals follow the table above.
  • If the maximum number of retries is greater than 16, every retry beyond the 16th uses a 2-hour interval.

Properties properties = new Properties();
// Set the maximum number of message retries for the corresponding Group ID to 20
properties.put(PropertyKeyConst.MaxReconsumeTimes, "20");
Consumer consumer = ONSFactory.createConsumer(properties);

Note:

  • The maximum number of message retries is set for all Consumer instances with the same Group ID.
  • If MaxReconsumeTimes is set for only one of two Consumer instances with the same Group ID, the configuration takes effect for both Consumer instances.
  • The configuration takes effect in overwrite mode, that is, the last started Consumer instance overwrites the configuration of the previous started instance

Getting the number of message retries

After receiving a message, the consumer can obtain its retry count as follows:

public class MessageListenerImpl implements MessageListener {
    @Override
    public Action consume(Message message, ConsumeContext context) {
        // Get the number of retries for the message
        System.out.println(message.getReconsumeTimes());
        return Action.CommitMessage;
    }
}

3) Retry for multiple consumer groups

Suppose there are two consumer groups, A and B, both listening to the same topic. Both A and B receive the same message, but A's consumption fails and returns Action.ReconsumeLater, while B consumes the message successfully. On retry, RocketMQ redelivers the message only to consumer group A, not to consumer group B.

9. Dead letter queues

When an initial consumption of a message fails, the message queue RocketMQ automatically retries the message. If the consumption fails after the maximum number of retries is reached, it indicates that the consumer was not able to consume the message correctly under normal circumstances. In this case, the message queue RocketMQ does not immediately discard the message, but instead sends it to a special queue for the consumer.

In Message queuing RocketMQ, such messages that can’t normally be consumed are called dead-letter messages, and the special queues that store dead-letter messages are called dead-letter queues.

9.1 Dead letter features

Dead-letter messages have the following features:

  • They are no longer consumed normally by consumers.
  • Their validity period is the same as that of normal messages, which is 3 days. After 3 days they are deleted automatically, so process a dead-letter message within 3 days of its generation.

Dead letter queues have the following features:

  • A dead letter queue corresponds to a Group ID, not to a single consumer instance.
  • If a Group ID has not generated any dead-letter messages, RocketMQ does not create a dead letter queue for it.
  • A dead letter queue contains all the dead letter messages generated for the corresponding Group ID, regardless of the Topic to which the message belongs.
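The queue features above can be illustrated with a small in-memory model. This is only a sketch under stated assumptions: the class DeadLetterSketch, its methods, and the way messages are stored are hypothetical and are not part of the RocketMQ API. It shows a message moving to a per-Group-ID dead letter queue once the initial attempt and all retries have failed.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

/** Toy in-memory model of retry plus dead-lettering; hypothetical, not the RocketMQ API. */
final class DeadLetterSketch {
    // One dead letter queue per Group ID, created lazily on the first dead letter.
    private final Map<String, List<String>> deadLetterQueues = new HashMap<>();
    // Hypothetical setting for this sketch: number of retries after the first attempt.
    private final int maxReconsumeTimes;

    DeadLetterSketch(int maxReconsumeTimes) {
        this.maxReconsumeTimes = maxReconsumeTimes;
    }

    /** Makes the initial attempt plus up to maxReconsumeTimes retries; returns true if consumed. */
    boolean deliver(String groupId, String topic, String body, Predicate<String> listener) {
        for (int attempt = 0; attempt <= maxReconsumeTimes; attempt++) {
            if (listener.test(body)) {
                return true;
            }
        }
        // Every attempt failed: store the message in the group's queue, whatever its topic.
        deadLetterQueues.computeIfAbsent(groupId, g -> new ArrayList<>()).add(topic + ":" + body);
        return false;
    }

    /** Returns the dead letters of a group, or an empty list if no queue was ever created. */
    List<String> deadLetters(String groupId) {
        return deadLetterQueues.getOrDefault(groupId, List.of());
    }

    /** True only if at least one dead letter has been generated for the group. */
    boolean hasQueue(String groupId) {
        return deadLetterQueues.containsKey(groupId);
    }

    public static void main(String[] args) {
        DeadLetterSketch broker = new DeadLetterSketch(2);
        broker.deliver("GID_order", "TopicA", "m1", body -> false); // always fails
        broker.deliver("GID_order", "TopicB", "m2", body -> false); // always fails
        broker.deliver("GID_pay", "TopicA", "m3", body -> true);    // succeeds
        System.out.println(broker.deadLetters("GID_order"));
        System.out.println(broker.hasQueue("GID_pay"));
    }
}
```

In this model the group GID_order ends up with one queue holding messages from two different topics, while GID_pay never gets a queue because it produced no dead letters.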

9.2 Viewing Dead Letter Information

  1. In the console, look up the topic for which dead-letter messages were generated

  2. On the message page, query the dead-letter messages by topic

  3. Choose to resend the message

    When a message enters a dead letter queue, something is preventing the consumer from consuming it properly, so it usually needs special handling. Once the cause has been identified and the problem resolved, the message can be resent from the RocketMQ console so that the consumer can consume it again.

Idempotent consumption

After a RocketMQ consumer receives a message, it needs to process the message idempotently, based on a key that is unique to the business operation.

10.1 Necessity of idempotent consumption

In Internet applications, especially when the network is unstable, RocketMQ messages may be delivered more than once. The causes can be summarized as follows:

  • Duplication when the message is sent

    A message has been successfully sent to the server and persisted, but the response from the server fails to reach the client because of an intermittent network disconnection or a client crash. If the producer then concludes that the send failed and sends the message again, the consumer will receive two messages with the same content and the same Message ID.

  • Duplication when the message is delivered

    The message has been delivered to the consumer and the business processing is complete, but the network is intermittently disconnected while the client is sending its acknowledgment to the server. To guarantee that the message is consumed at least once, the RocketMQ server delivers the already processed message again after the network recovers, and the consumer receives two messages with the same content and the same Message ID.

  • Duplication during load balancing (triggered by, but not limited to, network jitter, Broker restart, and subscriber application restart)

    A rebalance is triggered when a RocketMQ Broker or client restarts, scales out, or scales in, and consumers may receive duplicate messages at that point.

10.2 Processing Methods

Because Message IDs can conflict (be duplicated), a Message ID is not recommended as the basis for reliable idempotent processing. The better approach is to use a unique business identifier as the key for idempotent processing. It can be set as the message Key:

Message message = new Message();
message.setKey("ORDERID_100");
SendResult sendResult = producer.send(message);

When a subscriber receives a message, it can process the message idempotently based on the message Key:

consumer.subscribe("ons_test", "*", new MessageListener() {
    public Action consume(Message message, ConsumeContext context) {
        String key = message.getKey();
        // Perform idempotent processing based on the key that uniquely identifies the business operation
        return Action.CommitMessage;
    }
});
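What idempotent processing by key might look like is sketched below. This is a minimal sketch, not the RocketMQ API: the class IdempotentHandler and its method handleOnce are hypothetical names, and the in-memory set is an assumption made to keep the example self-contained. A real system would normally record processed keys in a durable store, for example behind a database unique index, so that the record survives restarts.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

/** Minimal idempotency guard keyed by a business identifier; hypothetical, not the RocketMQ API. */
final class IdempotentHandler {
    // Keys that have already been processed (in memory only, for illustration).
    private final Set<String> processedKeys = ConcurrentHashMap.newKeySet();

    /** Runs the action only the first time a key is seen; returns true if it ran. */
    boolean handleOnce(String businessKey, Consumer<String> action) {
        // add() is atomic and returns false when the key is already present.
        if (!processedKeys.add(businessKey)) {
            return false; // duplicate delivery: skip the business logic
        }
        try {
            action.accept(businessKey);
            return true;
        } catch (RuntimeException e) {
            // Processing failed, so forget the key and let a later retry run the action.
            processedKeys.remove(businessKey);
            throw e;
        }
    }

    public static void main(String[] args) {
        IdempotentHandler handler = new IdempotentHandler();
        // The same key arrives twice, as it would after a duplicate delivery.
        System.out.println(handler.handleOnce("ORDERID_100", k -> System.out.println("processing " + k)));
        System.out.println(handler.handleOnce("ORDERID_100", k -> System.out.println("processing " + k)));
    }
}
```

Inside a listener, the value of message.getKey() would be passed as businessKey, and the listener could acknowledge the message in both cases, since a skipped duplicate has already been handled.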

11. Precautions for RocketMQ

  1. Within the same consumer group, all consumers should use the same consumption logic (they must listen to the same topic and the same tags)
  2. By default, messages are shared between different consumer groups (all consumer groups get the same message), and consumers within a consumer group are load balanced (only one consumer gets the message).
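The second point can be sketched with a toy dispatcher. The class GroupDispatchSketch and its methods are hypothetical, and the round-robin choice is an assumption that stands in for RocketMQ's actual queue allocation. The sketch only shows the observable behavior: every group receives each message, and exactly one member of each group handles it.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy model: each group sees every message, one member per group handles it. Hypothetical. */
final class GroupDispatchSketch {
    private final Map<String, List<String>> groups = new LinkedHashMap<>();
    private final Map<String, Integer> dispatchCount = new LinkedHashMap<>();

    /** Registers a consumer instance under a consumer group. */
    void join(String groupId, String consumerName) {
        groups.computeIfAbsent(groupId, g -> new ArrayList<>()).add(consumerName);
    }

    /** For the next message, returns the single consumer chosen in each group. */
    Map<String, String> nextAssignment() {
        Map<String, String> chosen = new LinkedHashMap<>();
        for (Map.Entry<String, List<String>> entry : groups.entrySet()) {
            List<String> members = entry.getValue();
            // Round-robin inside the group (an assumption of this sketch): index 0, 1, 2, ...
            int index = dispatchCount.merge(entry.getKey(), 1, Integer::sum) - 1;
            chosen.put(entry.getKey(), members.get(index % members.size()));
        }
        return chosen;
    }

    public static void main(String[] args) {
        GroupDispatchSketch broker = new GroupDispatchSketch();
        broker.join("GROUP_A", "a1");
        broker.join("GROUP_A", "a2");
        broker.join("GROUP_B", "b1");
        System.out.println(broker.nextAssignment()); // first message
        System.out.println(broker.nextAssignment()); // second message
    }
}
```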