Chapter 3 How RocketMQ works

7. Consumption idempotence

1 What is consumption idempotence

When a consumer consumes the same message repeatedly, and the result of the repeated consumption is identical to the result of consuming it once, so that the repetition has no negative impact on the business system, the consumption process is idempotent.

Idempotent: an operation is idempotent if executing it multiple times has the same effect on the system as executing it once.

In Internet applications, messages may be sent or consumed repeatedly, especially when the network is unstable. If duplicate messages would interfere with business processing, the messages should be handled idempotently.

2 Analysis of message repetition scenarios

When might messages be re-consumed? The most common cases are the following three:

Duplicate messages when sending

A message is successfully sent to the Broker and persisted, but a transient network failure (network flash) prevents the Broker's response from reaching the Producer. The Producer, believing the send failed, retries, so the Broker ends up with two messages that have the same content and the same Message ID, and the Consumer will later consume the message twice.

Message repetition during consumption

A message is delivered to the Consumer and the business processing completes, but the network drops while the Consumer is acknowledging the Broker, so the Broker never receives the consumption-success response. To guarantee that every message is consumed at least once, the Broker redelivers the already-processed message after the network recovers, and the Consumer receives the same message with the same Message ID again.

Duplicate messages during Rebalance

When the number of Consumers in a Consumer Group changes, or the number of Queues of a Topic it subscribes to changes, a Rebalance is triggered and a Consumer may receive messages it has already consumed.

3 Generic solutions

The two elements

Two elements are involved in designing an idempotent solution: the idempotent token and uniqueness processing. By making full use of these two elements, a good idempotent solution can be designed.

  • Idempotent token: a convention agreed between producers and consumers, usually a string with a unique business meaning, such as an order number or a serial number. The Producer usually sends it along with the message.
  • Uniqueness processing: the server uses certain algorithms or strategies to guarantee that the same piece of business logic does not execute successfully more than once. For example, multiple payments for the same order will only succeed once.

The solution

For a typical system, a generic idempotent solution works as follows:

  1. The first step is deduplication against the cache. If the idempotent token already exists in the cache, the operation is a repeat. If there is no hit in the cache, go to the next step.

  2. Before uniqueness processing, query the database for a record indexed by the idempotent token. If such a record exists, the operation is a repeat. If not, go to the next step.

  3. Perform three operations in the same transaction: complete the uniqueness processing, write the idempotent token to the cache, and write the record with the idempotent token as its unique index to the DB.

    Repetition was already checked in step 1, so why check again in step 2? If step 2 is reached, doesn't that already mean the operation is not a repeat?

    Not necessarily. Data in the cache usually has an expiration time. Once a cached token expires, the check in step 1 misses and the request goes straight through to the DBMS, so the DB check in step 2 is still needed.

Solution Examples

Take the payment scenario as an example (a code sketch follows the steps below):

  1. When a payment request arrives, first look up the Redis cache using the payment serial number as the key. If the value is not empty, the payment is a repeat, and the business system directly returns a duplicate-payment flag to the caller. If the value is empty, go to the next step.
  2. Query the DBMS by the payment serial number for a corresponding record. If one exists, the payment is a repeat, and the business system directly returns a duplicate-payment flag to the caller. If not, this is the first operation; go to the next step to complete the uniqueness processing.
  3. Perform three operations in a distributed transaction:
  • Complete the payment task
  • Write the current payment serial number as the key, with an arbitrary string as the value, into the Redis cache via set(key, value, expireTime)
  • Write the current payment serial number as the primary key, together with other related data, into the DBMS
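
The three steps above can be sketched roughly as follows. This is only a minimal illustration: Jedis is used as the Redis client, while PayRequest, PayResult, paymentDao and completePayment are hypothetical application classes and helpers, and the actual transaction coordination across the DB and the cache depends on your own infrastructure.

// A minimal sketch of the three-step idempotent payment processing described above.
public PayResult pay(String paySerialNo, PayRequest request) {
    String cacheKey = "PAY:" + paySerialNo;            // idempotent token as the cache key

    // Step 1: deduplicate against the Redis cache
    if (jedis.get(cacheKey) != null) {
        return PayResult.duplicated();                 // repeated payment
    }

    // Step 2: deduplicate against the DB (covers the case of an expired cache entry)
    if (paymentDao.existsBySerialNo(paySerialNo)) {
        return PayResult.duplicated();
    }

    // Step 3: three operations in one (distributed) transaction
    completePayment(request);                          // uniqueness processing: do the payment
    jedis.setex(cacheKey, 24 * 3600, "1");             // write the token to the cache with a TTL
    paymentDao.insert(paySerialNo, request);           // serial number as the unique index in the DB
    return PayResult.success();
}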

Implementing consumption idempotence

The solution for consumption idempotence is straightforward: give each distinct message a unique identifier. Because Message IDs can be duplicated, they are not recommended as the basis for truly safe idempotent processing. The best approach is to use a unique business identifier, which can be set through the message Key, as the basis for idempotent processing.

In the payment scenario, for example, the message Key can be set to the order number and used as the basis for idempotent processing. Example code:

Message message = new Message();
message.setTopic("TopicPay");              // illustrative topic name
message.setBody("order data".getBytes());  // message payload
message.setKeys("ORDERID_100");            // business unique identifier (order number) as the Key
SendResult sendResult = producer.send(message);

When the Consumer receives the message, it can implement consumption idempotence based on the message Key, i.e. the order number:

consumer.registerMessageListener(new MessageListenerConcurrently() {
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                    ConsumeConcurrentlyContext context) {
        for (MessageExt msg : msgs) {
            String key = msg.getKeys();
            // Perform idempotent processing based on the business unique identifier Key
            // ...
        }
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
});
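
One possible way to fill in the idempotent-processing placeholder above is to insert the business Key into a deduplication table whose key column carries a unique index and treat a duplicate-key violation as "already consumed". This is only a sketch: dedupDao and handleOrder are hypothetical helpers, and DuplicateKeyException is Spring's translation of a unique-constraint violation (use whatever exception your data-access layer actually throws).

try {
    dedupDao.insertKey(key);        // e.g. INSERT INTO consume_record (biz_key) VALUES (?)
    handleOrder(msg);               // first consumption: run the real business logic
} catch (DuplicateKeyException e) {
    // the key already exists, so this message has been consumed before; skip it
}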

RocketMQ guarantees that messages are not lost, but it does not guarantee that they are never duplicated.

8. Message accumulation and consumption delay

1 The concept

During message processing, if the Consumer cannot keep up with the Producer's sending speed, the number of unconsumed messages in MQ grows. This backlog is called message accumulation, and accumulation in turn causes consumption delay. Pay particular attention to message accumulation and consumption delay in the following scenarios:

  • The processing capacity of the upstream and downstream business systems does not match, causing continuous accumulation from which the system cannot recover on its own.

  • The business requires real-time consumption of messages, so even the consumption delay caused by temporary accumulation is unacceptable.

2 Cause analysis

When a Consumer consumes messages in long-polling Pull mode, consumption proceeds in two phases:

Message pull

The Consumer obtains messages from the server in long-polling Pull mode and caches the pulled messages in a local buffer queue. Pull-based consumption has high throughput in an Intranet environment, so this phase is generally not the bottleneck for message accumulation.

A single thread on a single partition of a low-spec host (Consumer, 4C8G) can reach tens of thousands of TPS; with multiple partitions and multiple threads, several hundred thousand TPS can easily be reached.

Message consumption

The Consumer submits locally cached messages to the consuming thread pool, where the business consuming logic processes them and produces results; this is the real message consumption process. At this point the Consumer's consumption capacity depends entirely on the consumption time per message and the consumption concurrency. If complex business logic makes processing a single message slow, overall throughput stays low, and once the Consumer's local buffer queue reaches its upper limit, the Consumer stops pulling messages from the server.
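
For reference, the local buffer queue and the consuming thread pool mentioned above can be tuned on DefaultMQPushConsumer. The group name and the numbers below are purely illustrative, not recommendations:

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CG_DEMO");
consumer.setConsumeThreadMin(20);          // lower bound of the consuming thread pool
consumer.setConsumeThreadMax(20);          // upper bound of the consuming thread pool
consumer.setPullThresholdForQueue(1000);   // cap on locally cached messages per queue;
                                           // pulling pauses once the cache exceeds this limit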

Conclusion

The main bottleneck of message accumulation is the client's consumption capacity, which is determined by consumption time and consumption concurrency. Note that consumption time should be addressed before consumption concurrency: only once the consumption time per message is reasonable should concurrency be tuned.

3 Consumption time

The main factor affecting message processing time is the code logic, which falls into two main categories: internal CPU computation code and external I/O operation code.

Usually, unless the code contains complex recursion or loops, the internal computation time is almost negligible compared with the external I/O operations, so external I/O code is the main factor affecting message processing time.

External IO operation code examples:

  • Read and write external databases, such as access to remote MySQL
  • Read and write external cache systems, such as access to remote Redis
  • Calls to downstream systems, such as Dubbo RPC calls or Spring Cloud HTTP calls to downstream services

It is necessary to sort out the downstream call logic in advance and know the expected time of each call, so that you can judge whether the I/O time in the consumption logic is reasonable. Typically, messages pile up because a downstream system has a service exception or the DBMS has reached its capacity limit, which increases the consumption time.

Service exceptions are not only explicit errors such as HTTP 500 responses in the system; they also include more insidious problems, such as insufficient network bandwidth.

Reaching the DBMS capacity limit also leads to an increase in message consumption time.
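
To know the expected time of each call operation, a simple approach is to time the external calls inside the consuming logic and log the slow ones. A rough sketch; orderServiceClient, the SLF4J logger and the 200 ms threshold are assumptions:

long start = System.currentTimeMillis();
Order order = orderServiceClient.queryOrder(orderId);   // external I/O: downstream RPC / DB access
long cost = System.currentTimeMillis() - start;
if (cost > 200) {                                        // arbitrary threshold for this sketch
    log.warn("queryOrder is slow: {} ms, orderId={}", cost, orderId);
}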

4 Consumption concurrency

Generally, consumption concurrency on the consumer side is determined jointly by the number of threads per node and the number of nodes; its value equals the number of threads per node × the number of nodes. Usually the number of threads per node is tuned first, and only when the hardware resources of a single node reach their limit is the cluster scaled horizontally to increase consumption concurrency.

The number of threads per node is the number of consuming threads contained in a single Consumer.

The number of nodes is the number of Consumers contained in the Consumer Group.

For ordinary messages, delayed messages, and transaction messages, the concurrency is the number of threads per node × the number of nodes. Sequential messages are different: their consumption concurrency equals the number of Queue partitions of the Topic (a worked example follows the two cases below).

1) Globally ordered messages: the Topic of this type has only one Queue partition, which guarantees that all messages of the Topic are consumed in order. To preserve this global order, only one thread of one Consumer in the Consumer Group can consume at any given time, so the concurrency is 1.

2) Partition-ordered messages: the Topic of this type has multiple Queue partitions. Order is only guaranteed within each Queue partition, not across the whole Topic. To preserve this order, the messages in each Queue partition can only be consumed by one thread of one Consumer at a time; at most, multiple Queue partitions are consumed in parallel by multiple threads across multiple Consumers. So the concurrency is the number of Queue partitions of the Topic.
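
A small worked example of the two formulas, with made-up numbers:

// Concurrency calculation with made-up numbers.
int nodes = 3;                  // Consumers in the Consumer Group
int threadsPerNode = 20;        // consuming threads per Consumer
int queues = 8;                 // Queue partitions of the Topic

int ordinaryConcurrency = nodes * threadsPerNode;   // ordinary/delayed/transaction messages: 60
int partitionOrderedConcurrency = queues;           // partition-ordered messages: 8
int globalOrderedConcurrency = 1;                   // globally ordered messages: single Queue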

5 Number of single-machine threads

Be careful when setting the number of threads in a node's consuming thread pool, and do not blindly increase it: too many threads cause heavy thread-switching overhead. Under ideal conditions, the optimal thread count for a single node can be modeled as C * (T1 + T2) / T1.

  • C: the number of CPU cores

  • T1: the internal CPU computation time

  • T2: the time spent on external I/O operations

The optimal number of threads = C * (T1 + T2) /T1 = C * T1/T1 + C * T2/T1 = C + C * T2/T1

Note that this value is theoretical, derived under ideal conditions, and is not recommended for direct use in production. Instead, start with a value smaller than the calculated one, run load tests and observe the results, then gradually increase the number of threads until you find the best-performing value for that environment.
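
A worked example of the model, with made-up numbers:

// Optimal thread count under the ideal model, with made-up numbers.
int c = 4;           // CPU cores
double t1 = 5.0;     // internal CPU computation time per message, in ms
double t2 = 45.0;    // external I/O time per message, in ms

double optimalThreads = c * (t1 + t2) / t1;   // 4 * 50 / 5 = 40
// Treat 40 as a theoretical ceiling: start below it, load-test, then increase step by step.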

6 How to avoid it

To avoid unexpected message accumulation and consumption delay, the whole business logic needs to be thoroughly reviewed and sorted out at the design stage. The most important tasks are sorting out the message consumption time and setting the message consumption concurrency.

Sort out the message consumption time

Measure the message consumption time through load testing and analyze the code logic of the time-consuming operations. When sorting out the consumption time, pay attention to the following:

  • Whether the message consumption logic is computationally too complex, and whether the code has defects such as infinite loops or unbounded recursion.

  • Whether the I/O operations in the consumption logic are all necessary, and whether some of them can be avoided with a local cache or similar means.

  • Whether complex, time-consuming operations in the consumption logic can be made asynchronous, and if so, whether that would cause logical inconsistencies.

Set the consumption concurrency

The concurrency of message consumption can be calculated in the following two steps:

  • Gradually increase the number of threads on a single Consumer node while observing the node's system metrics, to find the optimal number of consuming threads and the message throughput of a single node.

  • Calculate the number of nodes to deploy based on the peak traffic of the upstream and downstream links (a worked example follows the formula below):

    Number of nodes = Peak traffic/Message throughput of a single node
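
For example, with made-up numbers: if the upstream peak is 1,000 messages per second and a single tuned node sustains 300 messages per second, at least four nodes are needed:

int peakTps = 1000;            // peak upstream traffic, messages per second
int singleNodeTps = 300;       // measured throughput of one tuned Consumer node
int nodes = (int) Math.ceil((double) peakTps / singleNodeTps);   // ceil(3.33) = 4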

9. Message cleanup

Is a message deleted as soon as it has been consumed? No, it is not.

Messages are stored sequentially in commitlog files, and messages vary in size, so deletion is done not message by message but commitlog file by commitlog file. Otherwise cleanup efficiency would drop sharply and the deletion logic would become very complex.

A commitlog file has an expiration time, which defaults to 72 hours, i.e. three days. Apart from manual cleanup by the user, an expired file is cleaned up automatically in the following cases, regardless of whether the messages in it have been consumed:

  • When the file-clearing time is reached (4 am by default), the system automatically deletes expired files.

  • When a file has expired and the disk usage reaches the cleanup threshold (75% by default), the system automatically deletes expired files even if the scheduled clearing time has not been reached.

  • When the disk usage reaches the forced-cleanup threshold (85% by default), the system starts deleting files according to the preset rules regardless of whether they have expired; by default, the cleanup starts with the oldest files.

  • When the disk usage reaches the system warning threshold (90% by default), the Broker refuses to write new messages.

    Note the following points:

    1) Deleting a 1GB file is an I/O-intensive operation for RocketMQ, and system performance drops sharply while the deletion is in progress. That is why the default clearing time is 4 am, when traffic is lowest. We should also keep sufficient free disk space so that the system is not forced to delete commitlog files at other points in time.

    2) The official recommendation for the Linux file system serving RocketMQ is ext4, because ext4 performs better than ext3 when deleting files.
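
For reference, the defaults described above correspond to the Broker's store configuration. The sketch below sets them through MessageStoreConfig with the default values; in a real deployment they are normally configured as broker.conf properties of the same names (fileReservedTime, deleteWhen, diskMaxUsedSpaceRatio).

MessageStoreConfig storeConfig = new MessageStoreConfig();
storeConfig.setFileReservedTime(72);         // commitlog expiration time, in hours (default 72)
storeConfig.setDeleteWhen("04");             // daily file-clearing time, 4 am by default
storeConfig.setDiskMaxUsedSpaceRatio(75);    // disk usage threshold for deleting expired files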