Follow the public account “JAVA Front” for more articles on source code analysis, practical applications, architecture, workplace topics, and product thinking. You are also welcome to add my WeChat “JAVA_front” to exchange ideas and learn together.

0 Article Overview

Message backlog is a problem we must face when using message-oriented middleware, whether it is caused by producers sending messages too quickly or by consumers lacking the capacity to keep up. In this article, we use RocketMQ as an example to walk through common approaches to handling a message backlog.


1 Leave the backlog alone

Must a message backlog always be dealt with? I think that as long as the business is not affected, the backlog can be left alone and gradually digested by consumers, because a backlog is essentially a form of protection for consumers. Let’s review the three roles of message-oriented middleware: decoupling, asynchronous processing, and peak clipping.


1.1 Decoupling

Suppose a user makes a purchase in an e-commerce system. How should the payment system inform the logistics system after the payment succeeds?

The first way is for the payment system to call the logistics system directly. The problem is that the payment system then depends strongly on the logistics system: when the logistics system has problems, the user's transaction flow is directly affected and the payment fails.

The second way is for the payment system to push a payment success message to the message-oriented middleware, at which point the transaction flow ends. The logistics system subscribes to this message and handles the follow-up processing. Even if the logistics system has problems, the transaction flow is not affected.
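The second approach can be sketched in plain Java, with an in-memory queue standing in for the middleware. Everything here (the class DecouplingSketch and the methods pay, deliverToLogistics and pending) is a hypothetical illustration, not a RocketMQ API:

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.function.Consumer;

// Hypothetical in-memory stand-in for the middleware; not a RocketMQ API.
final class DecouplingSketch {
    private final Queue<String> topic = new ArrayDeque<>();

    // Payment side: publish the event and finish the transaction immediately.
    boolean pay(String orderId) {
        topic.add("PAID:" + orderId);
        return true; // the result does not depend on the logistics system
    }

    // Logistics side: drain pending messages. A failure leaves the message
    // in the queue so that it can be retried later.
    int deliverToLogistics(Consumer<String> logistics) {
        int delivered = 0;
        while (!topic.isEmpty()) {
            try {
                logistics.accept(topic.peek());
            } catch (RuntimeException e) {
                break; // logistics is down; the payment has already succeeded
            }
            topic.remove();
            delivered++;
        }
        return delivered;
    }

    int pending() {
        return topic.size();
    }
}
```

In this sketch a failing logistics handler only leaves the message pending; the earlier call to pay has already returned successfully.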


1.2 Asynchronous processing

Suppose the logistics system takes 100 milliseconds to process a request. If the payment system calls the logistics system directly, the response time of the whole link increases by 100 milliseconds.

If the payment system instead pushes the payment success message to the message-oriented middleware, it can return immediately, and the link no longer pays the extra 100 milliseconds. This is the performance improvement that asynchronous processing brings.


1.3 Peak clipping

Suppose a merchant flash sale during the “Double Eleven” shopping festival generates a large number of payment orders. If the payment system calls the logistics system directly, the traffic pressure on the payment system is passed straight on to the logistics system, which is completely unnecessary.

If the payment system pushes payment success messages to the message-oriented middleware, the logistics system can pull and process them at a steady rate that matches its own capacity, which flattens the traffic peak.

Message accumulation in the middleware is therefore essentially a protection for the logistics system: the traffic pressure is released to the logistics system at a steady rate, so in this case we do not need to deal with the backlog.
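A small simulation can make the peak-clipping effect concrete. The sketch below assumes a consumer that processes at most a fixed number of messages per time unit ("tick"); the class name and the numbers are illustrative assumptions only:

```java
// Hypothetical simulation of peak clipping; the numbers are illustrative only.
final class PeakClippingSketch {
    // Returns how many ticks pass until every message has been consumed,
    // given the arrivals in each tick and a fixed consumer capacity per tick.
    static int ticksToDrain(int[] arrivalsPerTick, int capacityPerTick) {
        if (capacityPerTick <= 0) {
            throw new IllegalArgumentException("capacityPerTick must be positive");
        }
        int backlog = 0;
        int tick = 0;
        while (tick < arrivalsPerTick.length || backlog > 0) {
            if (tick < arrivalsPerTick.length) {
                backlog += arrivalsPerTick[tick]; // the burst lands in the queue
            }
            backlog -= Math.min(backlog, capacityPerTick); // steady consumption
            tick++;
        }
        return tick;
    }
}
```

For example, a burst of 1000 messages in one tick with a capacity of 300 per tick is drained in 4 ticks, and the consumer never sees more than 300 messages per tick.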


2 Handle the backlog

If the business requires real-time consumption, so that every message must be processed within a bounded time, a backlog does affect the business and we must take action. First, let’s take a look at the RocketMQ network deployment diagram:



A Producer produces messages, a Consumer consumes them, and a Broker stores and forwards them. The NameServer acts as a registry: Brokers register with it, and producers and consumers query it for routing information. The message backlog problem can be considered from three dimensions: producer, Broker, and consumer.


2.1 Producers

Producers can ease a backlog by sending less data, which can be considered along two dimensions. The first is to reduce the number of messages. Messages that downstream systems clearly do not need should not be sent at all, and for business data that changes frequently, the message can be sent once the state has stabilized rather than on every change.

The second is to reduce the size of each message. For example, if the consumer needs only 5 fields, the producer has no need to send all 10, especially when some of them are large fields.
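Both ideas can be sketched without any messaging library. The helper below is a hypothetical illustration: record and flush keep only the latest state per business key so that intermediate changes are never sent, and slim drops the fields a consumer does not read. The class, method and key names are assumptions made for this example:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical producer-side helpers; names and payload layout are assumptions.
final class ProducerReductionSketch {
    // Latest state per business key; intermediate states are overwritten.
    private final Map<String, String> latestByKey = new LinkedHashMap<>();

    void record(String businessKey, String state) {
        latestByKey.put(businessKey, state);
    }

    // Called once the states are considered stable, for example by a timer.
    // Returns one payload per business key instead of one per change.
    List<String> flush() {
        List<String> toSend = new ArrayList<>();
        latestByKey.forEach((key, state) -> toSend.add(key + ":" + state));
        latestByKey.clear();
        return toSend;
    }

    // Keeps only the fields the consumer actually reads.
    static Map<String, String> slim(Map<String, String> full, Set<String> needed) {
        Map<String, String> slimmed = new LinkedHashMap<>(full);
        slimmed.keySet().retainAll(needed);
        return slimmed;
    }
}
```

How long to wait before flushing is a business decision: a longer window sends fewer messages but delays downstream processing.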


2.2 Broker

Messages that consumers do not care about can be filtered out on the Broker side, which reduces the number of messages delivered to consumers and improves throughput. Using RocketMQ as an example, we look at three filtering methods: Tag, SQL expression, and Filter Server.


(1) Tag

A producer sets a Tag on each message, and a consumer specifies the Tags it cares about when it subscribes. This way the Broker can filter in the ConsumeQueue and read only the messages that match the Tag from the CommitLog:

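// Imports for both classes; each public class goes in its own source file.
import java.util.List;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.common.RemotingHelper;

public class TagFilterProducer {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("producerGroup");
        producer.start();
        String[] tags = new String[] { "TagA", "TagB", "TagC" };
        for (int i = 0; i < 10; i++) {
            // Set a Tag for each message
            Message msg = new Message("MyTopic", tags[i % tags.length], "Hello".getBytes(RemotingHelper.DEFAULT_CHARSET));
            SendResult sendResult = producer.send(msg);
            System.out.println("sendResult=" + sendResult);
        }
        producer.shutdown();
    }
}

public class TagFilterConsumer {
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumerGroup");
        // Subscribe only to messages whose Tag is TagA or TagC
        consumer.subscribe("MyTopic", "TagA || TagC");
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                System.out.println("ThreadName=" + Thread.currentThread().getName() + ",messages=" + msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
    }
}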
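The reason Tag filtering is cheap is that each ConsumeQueue entry stores the hash code of the Tag next to the CommitLog offset and message size, so the Broker can compare hashes without reading the message body. Because different Tags can share a hash code, the consumer compares the real Tag string again after receiving the message. The following is a simplified sketch of that two-step check; the class TagFilterSketch and its methods are hypothetical and do not mirror RocketMQ's internal code:

```java
import java.util.HashSet;
import java.util.Set;

// Simplified sketch of two-step Tag filtering; the real Broker and client
// implementations differ in detail.
final class TagFilterSketch {
    private final Set<String> tags = new HashSet<>();
    private final Set<Integer> tagHashes = new HashSet<>();

    // Parses a subscription expression such as "TagA || TagC".
    TagFilterSketch(String expression) {
        for (String tag : expression.split("\\|\\|")) {
            String trimmed = tag.trim();
            if (!trimmed.isEmpty()) {
                tags.add(trimmed);
                tagHashes.add(trimmed.hashCode());
            }
        }
    }

    // Broker side: only the hash stored in the ConsumeQueue entry is available.
    boolean brokerMatches(int tagHashInConsumeQueue) {
        return tagHashes.contains(tagHashInConsumeQueue);
    }

    // Consumer side: compare the real Tag string to rule out hash collisions.
    boolean consumerMatches(String tag) {
        return tags.contains(tag);
    }
}
```

The strings "Aa" and "BB" have the same Java hash code, so a subscription to "Aa" would let a "BB" message through the hash check, and only the string comparison on the consumer side rejects it.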


(2) SQL expressions

Tags support only simple filtering logic. For more complex filtering, SQL expressions are needed. The official website describes them as follows:

Supported syntax:
Numeric comparison, like >, >=, <, <=, BETWEEN, =
Character comparison, like =, <>, IN
IS NULL or IS NOT NULL
Logical AND, OR, NOT

Supported constant types:
Numeric, like 123, 3.1415
Character, like 'abc', must be made with single quotes
NULL, special constant
Boolean, TRUE or FALSE

The producer sets custom properties with putUserProperty, and the consumer filters on them with an expression:

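// Imports for both classes; each public class goes in its own source file.
import java.util.List;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.MessageSelector;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.common.RemotingHelper;

public class SqlFilterProducer {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("producerGroup");
        producer.start();
        String[] tags = new String[] { "TagA", "TagB", "TagC" };
        for (int i = 0; i < 10; i++) {
            // Set a Tag for each message
            Message msg = new Message("MyTopic", tags[i % tags.length], ("Hello" + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
            // Set a custom property for each message
            msg.putUserProperty("a", String.valueOf(i));
            SendResult sendResult = producer.send(msg);
            System.out.println("sendResult=" + sendResult);
        }
        producer.shutdown();
    }
}

public class SqlFilterConsumer {
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumerGroup");
        // SQL expression; note the space before "and" when joining the two parts
        consumer.subscribe("MyTopic", MessageSelector.bySql("(TAGS is not null and TAGS in ('TagA', 'TagB'))" + " and (a is not null and a between 0 and 3)"));
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                System.out.println("ThreadName=" + Thread.currentThread().getName() + ",messages=" + msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
    }
}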
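To see which of the ten messages pass the expression, the condition can be restated in plain Java and applied to the same loop the producer uses. This is only a walkthrough of the logic under the assumption that BETWEEN is inclusive on both ends, as in standard SQL; the class SqlFilterWalkthrough is hypothetical, and in RocketMQ the expression is evaluated on the Broker:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java restatement of the filter expression, for illustration only.
final class SqlFilterWalkthrough {
    static boolean matches(String tag, Integer a) {
        boolean tagOk = tag != null && Arrays.asList("TagA", "TagB").contains(tag);
        boolean aOk = a != null && a >= 0 && a <= 3; // BETWEEN 0 AND 3, inclusive
        return tagOk && aOk;
    }

    // Returns the loop indexes i of the ten produced messages that pass the filter.
    static List<Integer> matchingIndexes() {
        String[] tags = { "TagA", "TagB", "TagC" };
        List<Integer> matched = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            if (matches(tags[i % tags.length], i)) {
                matched.add(i);
            }
        }
        return matched;
    }
}
```

Messages 0 (TagA), 1 (TagB) and 3 (TagA) match. Message 2 carries TagC, and messages 4 to 9 fail the range check on the property a.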


(3) Filter Server

Filter Server lets the Broker side run user-defined Java functions to filter messages. Be careful not to write functions that consume a lot of memory or create threads, since either could bring the Broker down:

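// Imports for the producer and consumer; each public class goes in its own source file.
import java.util.List;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.common.RemotingHelper;

public class Producer {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("producerGroup");
        producer.start();
        String[] tags = new String[] { "TagA", "TagB", "TagC" };
        for (int i = 0; i < 10; i++) {
            Message msg = new Message("MyTopic", tags[i % tags.length], ("Hello" + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
            msg.putUserProperty("a", String.valueOf(i));
            SendResult sendResult = producer.send(msg);
            System.out.println("sendResult=" + sendResult);
        }
        producer.shutdown();
    }
}

public class Consumer {
    // Declared as "throws Exception" because reading the filter source can fail with an I/O error
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumerGroup");
        // Source code of the filter class, uploaded to the Filter Server on subscribe
        String codeScript = MixAll.file2String("/home/admin/filters/MyMessageFilterImpl.java");
        consumer.subscribe("MyTopic", "com.java.front.rocketmq.test.filter.MyMessageFilterImpl", codeScript);
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                for (MessageExt msg : msgs) {
                    System.out.println("ThreadName=" + Thread.currentThread().getName() + ",message=" + msg);
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
    }
}

// MyMessageFilterImpl.java: the package must match the class name passed to subscribe
package com.java.front.rocketmq.test.filter;

import org.apache.rocketmq.common.filter.MessageFilter;
import org.apache.rocketmq.common.message.MessageExt;

public class MyMessageFilterImpl implements MessageFilter {

    // Some RocketMQ releases declare match(MessageExt, FilterContext);
    // adjust the signature to the version in use.
    @Override
    public boolean match(MessageExt msg) {
        String property = msg.getUserProperty("a");
        if (property == null || property.isEmpty()) {
            return false;
        }
        // Keep only messages whose property a is at least 3
        return Integer.parseInt(property) >= 3;
    }
}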


2.3 Consumers

Consumers need to speed up consumption so that the backlog is cleared as quickly as possible. Note that if a consumer has downstream dependencies, such as writing to a database or calling a downstream application after receiving a message, the processing capacity of those dependencies must also be taken into account when increasing the consumption speed.


(1) Optimize consumption logic

If the consumption logic suffers from problems such as slow SQL or slow service calls, consumption slows down and messages pile up. We can use an open source diagnostic tool such as Arthas to measure the response time of each method along the consumption path and optimize any slow method we find.


(2) Increase consumption threads

Increasing the number of consumption threads can also increase consumption speed. RocketMQ provides two methods to set the number of threads:

setConsumeThreadMin: minimum number of consumption threads
setConsumeThreadMax: maximum number of consumption threads
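Before changing the thread count, a rough estimate helps to judge whether more threads can clear the backlog in time. The sketch below is a back-of-the-envelope calculation, not a RocketMQ API: it assumes every thread is always busy and that neither the threads nor the downstream systems slow each other down, which real systems rarely satisfy. The class and parameter names are hypothetical:

```java
// Idealized estimate; real throughput is usually lower because of contention
// and downstream limits.
final class DrainEstimate {
    // Messages per second one consumer instance can process.
    static double consumeRate(int threads, double avgMillisPerMessage) {
        return threads * 1000.0 / avgMillisPerMessage;
    }

    // Seconds until the backlog is gone, or infinity if consumption
    // cannot outpace production.
    static double secondsToDrain(long backlog, double produceRate, double consumeRate) {
        double net = consumeRate - produceRate;
        return net <= 0 ? Double.POSITIVE_INFINITY : backlog / net;
    }
}
```

For example, 20 threads at 20 milliseconds per message give about 1000 messages per second. With producers sending 500 messages per second, a backlog of 1,000,000 messages would take about 2000 seconds to clear under these assumptions.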


(3) Increase the consumption step size

Fetching more messages per pull and per listener call can also increase consumption speed. RocketMQ provides two methods to set the consumption step size:

setPullBatchSize: maximum number of messages pulled from a MessageQueue at a time
setConsumeMessageBatchMaxSize: maximum number of messages passed to the listener per invocation (the maximum length of the List<MessageExt> msgs parameter)
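The relationship between the two settings can be illustrated with a small sketch: messages obtained in one pull are handed to the listener in chunks no larger than the consume batch size. The code below is a simplified model under that assumption, and the class BatchSplitSketch is hypothetical rather than RocketMQ's own implementation:

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of how one pulled batch is split into listener calls.
final class BatchSplitSketch {
    static <T> List<List<T>> split(List<T> pulled, int consumeMessageBatchMaxSize) {
        if (consumeMessageBatchMaxSize <= 0) {
            throw new IllegalArgumentException("batch size must be positive");
        }
        List<List<T>> batches = new ArrayList<>();
        for (int from = 0; from < pulled.size(); from += consumeMessageBatchMaxSize) {
            int to = Math.min(from + consumeMessageBatchMaxSize, pulled.size());
            // Each sublist corresponds to one invocation of the listener
            batches.add(new ArrayList<>(pulled.subList(from, to)));
        }
        return batches;
    }
}
```

With 32 pulled messages and a consume batch size of 10, the listener would be invoked 4 times with 10, 10, 10 and 2 messages. A larger batch only helps if the listener can process a batch more efficiently than single messages, for example with one batched database write.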


(4) Increase consumption nodes

Because the processing capacity of a single machine is limited, once consumption threads and the consumption step size have been raised to their limits, we can consider adding consumer nodes to the cluster. There are two caveats. First, do not run more consumer nodes than there are message queues, because the extra nodes would have no queue to consume from. Second, ordered consumption keeps concurrency low even after scaling out.
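The first caveat follows from how queues are shared among the consumers of a group: each queue is assigned to one consumer at a time. The sketch below spreads queues evenly over consumers to show the effect. It is a simplified illustration in the spirit of an averaging allocation strategy, and the class QueueAllocationSketch is hypothetical:

```java
import java.util.ArrayList;
import java.util.List;

// Simplified even allocation of queue indexes to consumers in one group.
final class QueueAllocationSketch {
    // Returns the queue indexes owned by the consumer at consumerIndex.
    static List<Integer> assign(int queueCount, int consumerCount, int consumerIndex) {
        if (consumerCount <= 0 || consumerIndex < 0 || consumerIndex >= consumerCount) {
            throw new IllegalArgumentException("invalid consumer count or index");
        }
        int base = queueCount / consumerCount;   // queues every consumer gets
        int extra = queueCount % consumerCount;  // first 'extra' consumers get one more
        int start = consumerIndex * base + Math.min(consumerIndex, extra);
        int size = base + (consumerIndex < extra ? 1 : 0);
        List<Integer> mine = new ArrayList<>();
        for (int q = start; q < start + size; q++) {
            mine.add(q);
        }
        return mine;
    }
}
```

With 4 queues and 6 consumers, the consumers at index 4 and 5 receive no queue and stay idle, so in that situation the number of queues has to be increased before more nodes can help.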


(5) Switch from ordered to unordered consumption

RocketMQ messages fall into three types by ordering guarantee. Normal messages have the best concurrency but no ordering guarantee. Globally ordered messages have the strictest ordering but the worst concurrency. Partitioned ordered messages guarantee local order for messages with the same business ID while keeping some concurrency, though that concurrency is limited by the number of queues.

(a) Normal messages

Normal messages are also known as concurrent messages. When produced, a message may be written to any queue, and consumers can start multiple threads to consume in parallel. Although the messaging layer gives no ordering guarantee, the business side can still enforce business order with a state machine.
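One possible shape for such a state machine is sketched below. It assumes a hypothetical order lifecycle of CREATED, PAID, SHIPPED and DELIVERED, and it simply ignores any message that would move the order backwards. The states and the class name are assumptions made for this example:

```java
// One possible way to tolerate out-of-order messages; the states are hypothetical.
final class OrderStateMachine {
    enum State { CREATED, PAID, SHIPPED, DELIVERED }

    private State current = State.CREATED;

    // Applies an incoming state only if it moves the order forward.
    // Stale or duplicate messages are ignored and reported as false.
    boolean apply(State incoming) {
        if (incoming.ordinal() <= current.ordinal()) {
            return false;
        }
        current = incoming;
        return true;
    }

    State current() {
        return current;
    }
}
```

This variant lets a late PAID message be dropped once SHIPPED has been applied. If the business must not skip states, the handler could instead reject the early message and let it be retried later.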

(b) Partitioned ordered messages

Ordered messaging requires cooperation between producers and consumers. Producers must send messages with the same business ID to the same MessageQueue, and a single MessageQueue cannot be processed concurrently during consumption, which limits consumption concurrency to some extent:

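// Imports for both classes; each public class goes in its own source file.
import java.util.List;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.common.RemotingHelper;

public class Producer {
    public static void main(String[] args) throws Exception {
        MQProducer producer = new DefaultMQProducer("producerGroup");
        producer.start();
        for (int i = 0; i < 10; i++) {
            int orderId = i;
            Message msg = new Message("MyTopic", ("Hello" + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
            SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
                @Override
                public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                    Integer id = (Integer) arg;
                    int index = id % mqs.size(); // Calculate the queue index from the business ID
                    return mqs.get(index);
                }
            }, orderId);
        }
        producer.shutdown();
    }
}

public class Consumer {
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumerGroup");
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        consumer.subscribe("MyTopic", "*");
        consumer.registerMessageListener(new MessageListenerOrderly() { // Ordered consumption
            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
                context.setAutoCommit(true);
                System.out.println("ThreadName=" + Thread.currentThread().getName() + ",messages=" + msgs);
                return ConsumeOrderlyStatus.SUCCESS;
            }
        });
        consumer.start();
    }
}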
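The selector above works because the same business ID always yields the same queue index. When the business ID is not a small non-negative integer, for example a string or a value whose hash code may be negative, the plain % operator can return a negative index. A minimal sketch of a safer mapping follows; the helper QueueIndexSketch is hypothetical and not part of RocketMQ:

```java
// Hypothetical helper that maps any business ID to a stable queue index.
final class QueueIndexSketch {
    static int queueIndex(Object businessId, int queueCount) {
        // floorMod keeps the index in [0, queueCount) even for negative hash codes
        return Math.floorMod(businessId.hashCode(), queueCount);
    }
}
```

Note that the mapping changes when the number of queues changes, so messages for one business ID may land in a different queue after the topic is resized.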

(c) Globally ordered messages

Global ordering is a special case of partitioned ordering: if a topic has only one message queue, global ordering is achieved, at the cost of the worst concurrency.


3 Article Summary

In this article, we analyzed how to approach the message backlog problem. The options fall into two categories: leaving the backlog alone and handling it. Leaving it alone means that, as long as the business is not affected, the peak clipping feature of the messaging system is used to protect consumers. When the backlog must be handled, we can work along three dimensions: producers reduce what they send, the Broker filters messages, and consumers increase their consumption speed. I hope this article is helpful.
