Troubleshooting a RocketMQ sequential consumption delay problem

Problem background and phenomenon

Last night we received an application alert: a business message consumed online was delayed by more than 54s (measured from the time the message was sent to MQ until it was consumed):

2021-06-30T23:12:46.756 message processing is incredibly delayed! (Current delay time: 54725, incredible delay count in 10 seconds: 5677) 

The RocketMQ monitoring confirms that a relatively large message backlog did occur:

Viewing the Topic's consumers from rocketmq-console:

The business requires ordering for this Topic, so the producer specifies a business key (hashKey) when sending, and the consumer uses the sequential (orderly) consumption mode.

We use a RocketMQ cluster with three brokers; each broker has eight ReadQueues and eight WriteQueues for this Topic. A quick note on what ReadQueue and WriteQueue mean:

In RocketMQ, the WriteQueue count determines the routing information returned when a message is sent, and the ReadQueue count determines the routing information returned when a message is consumed. At the physical file level, only WriteQueues create files. For example, with writeQueueNums = 8 and readQueueNums = 4, eight folders are created, representing queues 0 through 7, but the consumer-side routing information only returns four queues, so queues 0, 1, 2, 3 are consumed and queues 4, 5, 6, 7 are never consumed at all. Conversely, with writeQueueNums = 4 and readQueueNums = 8, messages are produced only to queues 0, 1, 2, 3, while consumption is spread over queues 0 through 7, with queues 4, 5, 6, 7 holding no messages. Suppose consumption is in clustering mode with two consumers in the group: in effect only the first consumer (assigned queues 0, 1, 2, 3) actually consumes messages, and the second consumer (assigned queues 4, 5, 6, 7) consumes nothing. Generally we set the two values to be equal, and only set them differently when we need to shrink the number of queues for a Topic.
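
For illustration, here is a minimal sketch (not from the original article) of how these two values could be set programmatically through the RocketMQ admin tooling; the name server address, broker address and topic name below are hypothetical:

import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;

public class TopicQueueConfigDemo {
    public static void main(String[] args) throws Exception {
        DefaultMQAdminExt admin = new DefaultMQAdminExt();
        admin.setNamesrvAddr("127.0.0.1:9876");   // hypothetical name server address
        admin.start();

        TopicConfig topicConfig = new TopicConfig();
        topicConfig.setTopicName("DEMO_ORDERED_TOPIC");   // hypothetical topic name
        topicConfig.setWriteQueueNums(8);
        topicConfig.setReadQueueNums(8);   // usually kept equal to writeQueueNums

        // apply to one broker master address; repeat for each broker in the cluster
        admin.createAndUpdateTopicConfig("127.0.0.1:10911", topicConfig);
        admin.shutdown();
    }
}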

Problem analysis

The first thing to think about is, is the consumer thread stuck? Threads generally get stuck because:

  1. Stop-the-world happened:

    1. GC
    2. Other SafePoint reasons (e.g. jstack, periodic SafePoint tasks, etc.; see my article on JVM SafePoint and Stop The World)
  2. The thread takes too long to process a message, for example because it cannot acquire a lock or is stuck on some IO

Collecting the JFR from that time window (for JFR, please refer to my other series on JFR), we find:

  1. No long stop-the-world GC or other SafePoint events occurred during this period:

  2. During this period the consuming threads were parked, and the stack shows they were parked because there were no messages to consume:

Since there is nothing wrong with the application itself, let's look at RocketMQ. For RocketMQ Broker logs, we generally care about:

  1. If an exception occurs here, we need to tune the Java mmap related parameters. Please refer to:
  2. If message persistence fails, check storeerr.log
  3. If locking is abnormal, check lock.log

Which broker's logs should we look at? As mentioned earlier, messages for this Topic are sent with a hashKey, so we can use the message's hashKey to locate the queue (and therefore the broker):

int hashCode = "our hashKey".hashCode();
// 24 = 3 brokers * 8 queues = total number of queues for this Topic
log.info("{}", Math.abs(hashCode % 24));

Plugging in the message's hashKey, the result is 20, i.e. queue 20. From the earlier description, each broker has eight queues, and queue 20 maps to broker-2, queueId = 5. So let's look at the logs on broker-2 to locate the problem.
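
For context, this is roughly the selection logic the producer applies when a hash-based selector such as the built-in SelectMessageQueueByHash is given a hashKey; the simplified sketch below (my own, not the library source) shows why taking the hash modulo 24 (3 brokers x 8 write queues) identifies the target queue:

import java.util.List;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;

// simplified hash-based selector, same idea as org.apache.rocketmq.client.producer.selector.SelectMessageQueueByHash
public class HashKeySelector implements MessageQueueSelector {
    @Override
    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
        // arg is the business hashKey passed to producer.send(msg, selector, hashKey)
        int index = Math.abs(arg.hashCode() % mqs.size());   // mqs.size() == 24 in our case
        return mqs.get(index);
    }
}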

We found warnings in lock.log, as shown below. There are many similar entries, spanning about 54s, which matches both the thread park time and the message delay:

2021-07-01 07:11:47 WARN AdminBrokerThread_10 - tryLockBatch, message queue locked by other client. Group: Consumer group OtherClientId: 10.238.18.6@29 NewClientId: 10.238.18.122@29 MessageQueue [topic= Message topic, brokerName=broker-2, queueId=5]

The instance 10.238.18.122@29 failed to lock queueId = 5 because 10.238.18.6@29 was still holding the lock. So why did this happen?

The principle of sequential consumption across multiple RocketMQ queues

To achieve sequential consumption with multiple queues, RocketMQ first requires the producer to specify a hashKey; messages with the same hashKey are routed to the same queue. If the consumer specifies orderly consumption, each queue is consumed by a single thread at a time, which guarantees ordering within a queue. A minimal usage sketch follows.
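
As an illustration (my own example, with hypothetical topic, group and key names), sending with a hash-based queue selector and consuming with MessageListenerOrderly looks roughly like this:

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.selector.SelectMessageQueueByHash;
import org.apache.rocketmq.common.message.Message;

public class OrderedDemo {
    public static void main(String[] args) throws Exception {
        // producer side: all messages with the same business key land in the same queue
        DefaultMQProducer producer = new DefaultMQProducer("demo_producer_group");
        producer.setNamesrvAddr("127.0.0.1:9876");
        producer.start();
        String businessKey = "order-20210630-0001";   // hypothetical hashKey
        Message msg = new Message("DEMO_ORDERED_TOPIC", "TagA", "payload".getBytes());
        producer.send(msg, new SelectMessageQueueByHash(), businessKey);

        // consumer side: MessageListenerOrderly means each queue is consumed by one thread at a time
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("demo_consumer_group");
        consumer.setNamesrvAddr("127.0.0.1:9876");
        consumer.subscribe("DEMO_ORDERED_TOPIC", "*");
        consumer.registerMessageListener((MessageListenerOrderly) (msgs, context) -> {
            msgs.forEach(m -> System.out.println(new String(m.getBody())));
            return ConsumeOrderlyStatus.SUCCESS;
        });
        consumer.start();
    }
}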

So how is it guaranteed that each queue is consumed by a single thread? Each Broker maintains a lock table:

private final ConcurrentMap<String/* group */, ConcurrentHashMap<MessageQueue, LockEntry>> mqLockTable =
        new ConcurrentHashMap<String, ConcurrentHashMap<MessageQueue, LockEntry>>(1024);

This is a ConcurrentMap<consumer group name, ConcurrentHashMap<message queue, lock entry>>. The lock entry LockEntry looks like this:

RebalanceLockManager.java:

// read from the rocketmq.broker.rebalance.lockMaxLiveTime system property, default 60s
private final static long REBALANCE_LOCK_MAX_LIVE_TIME = Long.parseLong(System.getProperty(
    "rocketmq.broker.rebalance.lockMaxLiveTime", "60000"));

static class LockEntry {
    // RocketMQ clientId
    private String clientId;
    private volatile long lastUpdateTimestamp = System.currentTimeMillis();

    public boolean isLocked(final String clientId) {
        boolean eq = this.clientId.equals(clientId);
        return eq && !this.isExpired();
    }

    public boolean isExpired() {
        // the lock expires REBALANCE_LOCK_MAX_LIVE_TIME ms after the last update
        boolean expired = (System.currentTimeMillis() - this.lastUpdateTimestamp) > REBALANCE_LOCK_MAX_LIVE_TIME;
        return expired;
    }
}

The RocketMQ client sends a LOCK_BATCH_MQ request to the Broker. The Broker encapsulates the request as a LockEntry and attempts to update the Map. If the update succeeds, the lock is acquired; if it fails, the lock is not acquired. The update logic of the Broker is as follows:

public boolean tryLock(final String group, final MessageQueue mq, final String clientId) {
    // fast path: this client already holds a valid lock
    if (!this.isLocked(group, mq, clientId)) {
        try {
            // this lock is per broker instance, since each broker maintains its own queue lock table and does not share it
            this.lock.lockInterruptibly();
            try {
                ConcurrentHashMap<MessageQueue, LockEntry> groupValue = this.mqLockTable.get(group);
                if (null == groupValue) {
                    groupValue = new ConcurrentHashMap<>(32);
                    this.mqLockTable.put(group, groupValue);
                }

                LockEntry lockEntry = groupValue.get(mq);
                if (null == lockEntry) {
                    lockEntry = new LockEntry();
                    lockEntry.setClientId(clientId);
                    groupValue.put(mq, lockEntry);
                    log.info("tryLock, message queue not locked, I got it. Group: {} NewClientId: {} {}",
                        group, clientId, mq);
                }

                if (lockEntry.isLocked(clientId)) {
                    lockEntry.setLastUpdateTimestamp(System.currentTimeMillis());
                    return true;
                }

                String oldClientId = lockEntry.getClientId();

                // the previous owner's lock has expired, so take it over
                if (lockEntry.isExpired()) {
                    lockEntry.setClientId(clientId);
                    lockEntry.setLastUpdateTimestamp(System.currentTimeMillis());
                    log.warn("tryLock, message queue lock expired, I got it. Group: {} OldClientId: {} NewClientId: {} {}",
                        group, oldClientId, clientId, mq);
                    return true;
                }

                // another client still holds a valid lock: this is the warning we saw in lock.log
                log.warn("tryLock, message queue locked by other client. Group: {} OtherClientId: {} NewClientId: {} {}",
                    group, oldClientId, clientId, mq);
                return false;
            } finally {
                this.lock.unlock();
            }
        } catch (InterruptedException e) {
            log.error("putMessage exception", e);
        }
    }

    return true;
}

private boolean isLocked(final String group, final MessageQueue mq, final String clientId) {
    ConcurrentHashMap<MessageQueue, LockEntry> groupValue = this.mqLockTable.get(group);
    if (groupValue != null) {
        LockEntry lockEntry = groupValue.get(mq);
        if (lockEntry != null) {
            boolean locked = lockEntry.isLocked(clientId);
            if (locked) {
                lockEntry.setLastUpdateTimestamp(System.currentTimeMillis());
            }
            return locked;
        }
    }
    return false;
}

Each MQ client periodically sends LOCK_BATCH_MQ requests and locally keeps track of all the queues whose locks it holds:

ProcessQueue.java:

// LOCK_BATCH_MQ request interval, read from the rocketmq.client.rebalance.lockInterval system property, default 20s
public final static long REBALANCE_LOCK_INTERVAL = Long.parseLong(System.getProperty("rocketmq.client.rebalance.lockInterval", "20000"));

ConsumeMessageOrderlyService.java:

if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())) {
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            ConsumeMessageOrderlyService.this.lockMQPeriodically();
        }
    }, 1000 * 1, ProcessQueue.REBALANCE_LOCK_INTERVAL, TimeUnit.MILLISECONDS);
}

The flowchart is as follows:

When ConsumeMessageOrderlyService shuts down, it unlocks all queues:

public void shutdown() {
    this.stopped = true;
    this.scheduledExecutorService.shutdown();
    this.consumeExecutor.shutdown();
    if (MessageModel.CLUSTERING.equals(this.defaultMQPushConsumerImpl.messageModel())) {
        this.unlockAllMQ();
    }
}

Cause of the problem

By default, the client sends LOCK_BATCH_MQ every 20s, and the Broker expires a lock 60s after its last renewal.

Our cluster orchestration uses Kubernetes, which has instance migration capabilities. When the cluster is under heavy load, new nodes (VMs) are added automatically and new service instances are created on them. When some services are under light load, their instances are scaled in and some nodes are reclaimed. If a reclaimed node still hosts service instances that cannot simply be removed, those instances have to be migrated to other nodes. That is exactly what happened to our business instance here.

When the problem occurred, such a migration took place: the old instance was shut down without waiting for ConsumeMessageOrderlyService#shutdown to run, so the queue locks were never actively released. The new instance therefore had to wait about 60s for the locks to expire before it could acquire them and start consuming.

Problem solving

  1. In the next release, add graceful shutdown logic for the RocketMQ clients (see the sketch after this list)
  2. On all service instances (RocketMQ clients), set rocketmq.client.rebalance.lockInterval to shorten the lock heartbeat interval (e.g. 5s), and on the RocketMQ Brokers set rocketmq.broker.rebalance.lockMaxLiveTime to shorten the lock expiration time (e.g. 15s), keeping the expiration time at 3 times the heartbeat interval (3x is a common rule of thumb in cluster design)
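
A minimal sketch of both fixes (my own example, not the exact production code): register a shutdown hook so the consumer releases its queue locks on termination, and set the two system properties whose names appear in the code shown earlier:

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;

public class GracefulMqShutdown {

    // When the container sends SIGTERM during a migration, this hook lets
    // DefaultMQPushConsumer#shutdown run, which in turn triggers
    // ConsumeMessageOrderlyService#shutdown and unlocks the queues on the broker,
    // so the new instance does not have to wait ~60s for the locks to expire.
    public static void register(DefaultMQPushConsumer consumer) {
        Runtime.getRuntime().addShutdownHook(new Thread(consumer::shutdown, "mq-consumer-shutdown"));
    }

    // Tuned lock timing (property names taken from the RocketMQ code shown earlier):
    //   client JVM option:  -Drocketmq.client.rebalance.lockInterval=5000
    //   broker JVM option:  -Drocketmq.broker.rebalance.lockMaxLiveTime=15000
}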
