1. Background

This is a temporary file cleaning service built on the producer-consumer model.

Cleaning is done in segments: because the amount of data to be cleaned is large, it is split into segments to make fault-tolerant processing easier.

The cleaning service model is shown as follows:

A consumer instance takes roughly 15 minutes to process one cleaning segment.

Producers produce messages much faster than consumers can consume them.
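To make the model concrete, here is a minimal sketch of what the consumer side of such a service might look like. The group name, topic and cleanSegment() helper are made up for illustration; this is not the actual service code:

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;

public class CleanSegmentConsumer {
    public static void main(String[] args) throws Exception {
        // Hypothetical group/topic names; one message describes one cleaning segment
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("tmp_file_clean_group");
        consumer.setNamesrvAddr("127.0.0.1:9876"); // placeholder
        consumer.subscribe("TMP_FILE_CLEAN_SEGMENT", "*");
        consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
            for (MessageExt msg : msgs) {
                cleanSegment(msg.getBody()); // each segment takes roughly 15 minutes
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        });
        consumer.start();
    }

    private static void cleanSegment(byte[] segmentDescriptor) {
        // the actual cleaning of the temporary files described by this segment
    }
}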

2. Problems arise

The cleaning job started in the early hours of the morning, and within a few minutes alarms started firing like crazy:

From the logs, the JVM is doing frantic GC but reclaiming almost nothing, and the heap is out of memory.

Let’s take a look at JVM monitoring:

The heap really is exhausted, but why?

3. Locating and fixing the problem

1) Heap memory dump

With -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/app/logs configured, the JVM dumps the heap to disk when an OOM occurs.

So let's analyze the heap dump.

Tool: JVisualVM

2) Problematic threads

The OOM was thrown while copying a byte array.

So let's look at which instances in the heap are taking up the most memory.

3) Heap analysis

Sort by size first:

Byte arrays take up about 93% of the heap, so that's pretty much the problem.

Click in to see what references these instances:

Going from largest to smallest, we see MessageClientExt. You can basically guess it already: is this caused by too many message bodies?

Let's dig into the source code to see exactly how a message body comes into existence on the consumer side and when it disappears.

4) RocketMQ consumer source code analysis (rocketmq-client 4.3.2)

Before we do that, let’s look at some of RocketMQ’s key model diagrams:

Relationship between Topic and Queue:

Each Topic can have a different number of queues on different brokers, and consumers need to connect to all of them.

The Queue is selected by the producer according to its policy, which you can Google but I won’t go into here.

We currently have 10 brokers on line, each with 16 queues.

Relationship between Queue and consumer instance:

Here, for simplicity, all brokers are merged into one.

According to the RebalanceService policy, each broker-side queue is mapped to a ProcessQueue that lives locally in a consumer instance.
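A rough back-of-the-envelope, just for a sense of scale. Only the broker and queue counts come from our setup; the consumer instance count below is a guess for illustration:

// Back-of-the-envelope only; the instance count is an assumption
int brokers = 10, queuesPerBroker = 16;
int totalQueues = brokers * queuesPerBroker;       // 160 MessageQueues across the cluster
int consumerInstances = 3;                         // hypothetical
int queuesPerInstance = (totalQueues + consumerInstances - 1) / consumerInstances; // ≈ 54
// Each assigned MessageQueue gets its own local ProcessQueue on the consumer instance,
// which is in the same ballpark as the ~55 ProcessQueue instances seen later in the heap dump.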

Logic of consumption

The focus here is on when messages are pulled locally, ignoring how offsets are updated, sequential consumption, consumption failures, and so on.

Source code analysis of message body flow

1. Where is the message stored after it is pulled?

org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#pullMessage

// Do some flow-control checks first; focus on defaultMQPushConsumer.pullThresholdForQueue and defaultMQPushConsumer.pullThresholdSizeForQueue
// Get the number of messages currently in the local message queue
long cachedMessageCount = processQueue.getMsgCount().get();
// Get the total size of the local message queue body (in MB)
long cachedMessageSizeInMiB = processQueue.getMsgSize().get() / (1024 * 1024);

// Check whether the number of messages exceeds the limit (1000 by default). If the number exceeds the limit, no message is pulled this time and the delayed task request is submitted
if (cachedMessageCount > this.defaultMQPushConsumer.getPullThresholdForQueue()) {
    this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
    if ((queueFlowControlTimes++ % 1000) == 0) {
        log.warn(
            "the cached message count exceeds the threshold {}, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}",
            this.defaultMQPushConsumer.getPullThresholdForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);
    }
    return;
}
// Determine whether the size of the message exceeds the limit (default value: 100M). If the size exceeds the limit, the message is not pulled this time and the delayed task request is submitted
if (cachedMessageSizeInMiB > this.defaultMQPushConsumer.getPullThresholdSizeForQueue()) {
    ...
    return;
}
...
// The onSuccess() method in PullCallback is what matters after the pull succeeds:
// Commit pulled messages to the local message processing queue
boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList());
// Submit the consumption request
DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
    pullResult.getMsgFoundList(),
    processQueue,
    pullRequest.getMessageQueue(),
    dispatchToConsume);
// Submit the next pull message request
if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) {
    DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest,
        DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval());
} else {
    DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
}

After the message is pulled, the processQueue.putMessage() method is called. Let’s look at this method:

// Add the message to the msgTreeMap in processQueue
public boolean putMessage(final List<MessageExt> msgs) {
    boolean dispatchToConsume = false;
    try {
        this.lockTreeMap.writeLock().lockInterruptibly();
        try {
            int validMsgCnt = 0;
            // Loop to add messages
            for (MessageExt msg : msgs) {
                MessageExt old = msgTreeMap.put(msg.getQueueOffset(), msg);
                if (null == old) {
                    validMsgCnt++;
                    this.queueOffsetMax = msg.getQueueOffset();
                    // Update the running total of message body sizes
                    msgSize.addAndGet(msg.getBody().length);
                }
            }
            // Atomic counter of how many new messages were added, used by the flow-control check above
            msgCount.addAndGet(validMsgCnt);
            ...
        } finally {
            this.lockTreeMap.writeLock().unlock();
        }
    } catch (InterruptedException e) {
        log.error("putMessage exception", e);
    }

    return dispatchToConsume;
}

Key point: after a message body is pulled, it is put into the msgTreeMap inside processQueue.

2. Then how is it consumed and removed?

Going back to the onSuccess() method in the PullCallback, it says:

DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest

This submits a task to a thread pool, which then runs the actual message consumption logic.

ConsumeMessageConcurrentlyService holds a thread pool, consumeExecutor, dedicated to consuming messages.

Look at the constructor of this class:

public ConsumeMessageConcurrentlyService(DefaultMQPushConsumerImpl defaultMQPushConsumerImpl, MessageListenerConcurrently messageListener) {
    this.defaultMQPushConsumerImpl = defaultMQPushConsumerImpl;
    this.messageListener = messageListener;

    this.defaultMQPushConsumer = this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer();
    this.consumerGroup = this.defaultMQPushConsumer.getConsumerGroup();
    this.consumeRequestQueue = new LinkedBlockingQueue<Runnable>();

    // This is where the thread pool is initialized; the core and maximum pool sizes come from consumeThreadMin and consumeThreadMax
    this.consumeExecutor = new ThreadPoolExecutor(
        this.defaultMQPushConsumer.getConsumeThreadMin(),
        this.defaultMQPushConsumer.getConsumeThreadMax(),
        1000 * 60,
        TimeUnit.MILLISECONDS,
        this.consumeRequestQueue,
        new ThreadFactoryImpl("ConsumeMessageThread_"));

    this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("ConsumeMessageScheduledThread_"));
    this.cleanExpireMsgExecutors = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("CleanExpireMsgScheduledThread_"));
}

Let’s look at this method for submitting tasks:

public void submitConsumeRequest(
    final List<MessageExt> msgs,
    final ProcessQueue processQueue,
    final MessageQueue messageQueue,
    final boolean dispatchToConsume) {
    final int consumeBatchSize = this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();
    if (msgs.size() <= consumeBatchSize) { // If the number of commit messages is smaller than the number of batch messages, submit the consumption request directly
        ConsumeRequest consumeRequest = new ConsumeRequest(msgs, processQueue, messageQueue);
        try {
            this.consumeExecutor.submit(consumeRequest);
        } catch (RejectedExecutionException e) {
            // Thread pool reject exception will be submitted later
            this.submitConsumeRequestLater(consumeRequest);
        }
    } else { // More messages than the batch size: split them into multiple consume requests
        for (int total = 0; total < msgs.size(); ) {
            List<MessageExt> msgThis = new ArrayList<MessageExt>(consumeBatchSize);
            for (int i = 0; i < consumeBatchSize; i++, total++) {
                if (total < msgs.size()) {
                    msgThis.add(msgs.get(total));
                } else {
                    break;
                }
            }

            ConsumeRequest consumeRequest = new ConsumeRequest(msgThis, processQueue, messageQueue);
            try {
                this.consumeExecutor.submit(consumeRequest);
            } catch (RejectedExecutionException e) {
                for (; total < msgs.size(); total++) {
                    msgThis.add(msgs.get(total));
                }

                this.submitConsumeRequestLater(consumeRequest);
            }
        }
    }
}

Now that we've come this far, we might as well look at ConsumeRequest's run() method.

Let's focus on how our own consumption logic is invoked, and on what happens after message consumption completes.

public void run() {
    if (this.processQueue.isDropped()) {
        log.info("the message queue not be able to consume, because it's dropped. group={} {}", ConsumeMessageConcurrentlyService.this.consumerGroup, this.messageQueue);
        return;
    }
    // Get our custom consumption logic here
    MessageListenerConcurrently listener = ConsumeMessageConcurrentlyService.this.messageListener;
    ConsumeConcurrentlyContext context = new ConsumeConcurrentlyContext(messageQueue);
    ConsumeConcurrentlyStatus status = null;
    ...
    long beginTimestamp = System.currentTimeMillis();
    boolean hasException = false;
    ConsumeReturnType returnType = ConsumeReturnType.SUCCESS;
    try {
        ConsumeMessageConcurrentlyService.this.resetRetryTopic(msgs);
        if (msgs != null && !msgs.isEmpty()) {
            for (MessageExt msg : msgs) {
                MessageAccessor.setConsumeStartTimeStamp(msg, String.valueOf(System.currentTimeMillis()));
            }
        }
        // Our consumption logic executes here and returns a status (success or failure)
        status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);
    } catch (Throwable e) {
        log.warn("consumeMessage exception: {} Group: {} Msgs: {} MQ: {}",
            RemotingHelper.exceptionSimpleDesc(e),
            ConsumeMessageConcurrentlyService.this.consumerGroup,
            msgs,
            messageQueue);
        hasException = true;
    }
    ...
    // Process the consumption result
    if (!processQueue.isDropped()) {
        // processConsumeResult() calls processQueue.removeMessage() to remove the messages whose consumption has completed
        ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this);
    } else {
        log.warn("processQueue is dropped without process consume result. messageQueue={}, msgs={}", messageQueue, msgs); }}Copy the code

This code:

ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this);

is where the message body finally gets removed.
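What processConsumeResult() does is roughly the following. This is a heavily simplified sketch of the rocketmq-client 4.3.2 logic, not a verbatim copy; the retry/sendMessageBack handling is omitted:

public void processConsumeResult(ConsumeConcurrentlyStatus status,
                                 ConsumeConcurrentlyContext context,
                                 ConsumeRequest consumeRequest) {
    // ... decide which messages succeeded and send the failed ones back to the broker
    // for retry (omitted in this sketch) ...

    // Remove the consumed messages from the ProcessQueue. This is what actually releases
    // the message bodies held in msgTreeMap and decrements msgCount / msgSize.
    long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs());

    // Persist the new consume offset, as long as the queue has not been dropped by a rebalance
    if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {
        this.defaultMQPushConsumerImpl.getOffsetStore()
            .updateOffset(consumeRequest.getMessageQueue(), offset, true);
    }
}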

5) Consumer source code summary

Before reading the source code we wanted to know how a message body flows through the consumer side; now, with the flow chart and the source code, we have a reasonably clear picture.

We also now know exactly where the parameters we set, pullThresholdForQueue, pullThresholdSizeForQueue, consumeThreadMin and consumeThreadMax, come into play.

6) OOM problem solved

The message body is stored in the msgTreeMap of processQueue. The message body is MessageExt, which has a subclass called MessageClientExt.

First, is this a memory leak? After some analysis, probably not. A leak would mean that message bodies keep getting pulled down but are never removed after consumption completes. However, our other online applications have been running on RocketMQ for a very long time; if this were a leak, those services would have gone OOM one after another, and it would be a major MQ bug.

Going back to the data from the heap analysis, let's do a quick calculation:

There are 55 ProcessQueue instances in memory, that is, this consumer instance is assigned 55 MessageQueues.

The parameters we set for the consumer instance look like this:

A rough calculation: 55 (ProcessQueue instances) × 100 (queue capacity, pullThresholdForQueue) × 428,311 bytes (average message body size) ≈ 2246 MB

That is, if all the message bodies are pulled down as we are doing now, 2246MB of memory is required.

Take a look at the number of MessageClientExt instances currently in memory:

Obviously, it’s easy to run out of memory.

So the problem is easy to fix: adjust the parameters according to this calculation.
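For reference, a minimal sketch of where these knobs are set on the push consumer. The values and group name below are placeholders, not necessarily what we settled on:

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;

public class TunedConsumerConfig {
    public static DefaultMQPushConsumer build() {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("tmp_file_clean_group"); // hypothetical group
        consumer.setPullThresholdForQueue(40);       // max cached messages per local queue
        consumer.setPullThresholdSizeForQueue(30);   // max cached message body size per local queue, in MiB
        consumer.setConsumeThreadMin(20);            // core size of consumeExecutor
        consumer.setConsumeThreadMax(20);            // max size of consumeExecutor
        return consumer;
    }
}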

Addendum: the parameter setting did not take effect

While locating the problem we noticed something odd: pullThresholdForQueue kept ending up as 100, even though we had set it to 40.

So we tracked the change of this value:

With a breakpoint in front of the property, Debug starts:

The first time the breakpoint hits, the value is set to 0. Fine, that's initialization.

The second time, it is set to 40, our configured value. So far so good.

Then it hits a third time and the value is set to 100. Unbelievable.

Digging through the code for what else sets this value, and sure enough:

There it is: a bug, a small bug in one of the company's internally packaged components. Hard to swallow.

Problem summary

Without digging through the source code, it's easy to miss that pullThresholdForQueue is a per-local-queue limit, and a consumer instance never holds just one local queue.

In a setup like this, where the producer is fast, the consumer is slow, and the message bodies are large, an OOM becomes much more likely.
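One way to keep this in check is to do the worst-case budget up front. A rough sketch, assuming every local queue fills right up to its flow-control limits; the numbers are just the ones from the calculation above:

public class ConsumerMemoryBudget {
    public static void main(String[] args) {
        long queues = 55;                        // local ProcessQueue instances on one consumer
        long pullThresholdForQueue = 100;        // max cached messages per queue
        long pullThresholdSizeForQueueMiB = 100; // max cached bytes per queue, in MiB (default 100)
        long avgBodyBytes = 428_311;             // average message body size seen in the heap dump

        // Both the count-based cap and the size-based cap apply; the effective per-queue
        // cap is whichever limit is hit first.
        long byCountMiB = queues * pullThresholdForQueue * avgBodyBytes / (1024 * 1024); // ≈ 2246 MiB
        long bySizeMiB = queues * pullThresholdSizeForQueueMiB;                          // 5500 MiB
        System.out.println("worst-case cached bodies ≈ " + Math.min(byCountMiB, bySizeMiB) + " MiB");
    }
}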

Small questions and guesses

Set pullThresholdForQueue = 10.

Pick up any ProcessQueue instance and observe that the msgCount value is not strictly less than 10, but rather a bit larger.

Why is that?

Here’s a half-baked guess:

Each pull returns a batch of messages, and processQueue.putMessage() increments the counters without checking whether the limit has been exceeded. The check only happens before the next pull request is issued: if the limit is already exceeded, that pull is skipped.

As a result, msgCount can overshoot the threshold; it is a soft limit rather than a hard one.
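A small illustration of why the count can overshoot. Treat the pullBatchSize default of 32 as an assumption about the client's defaults:

// The flow-control check is "cachedMessageCount > pullThresholdForQueue" *before* a pull
// is issued; the pull that then runs can still add up to pullBatchSize messages at once.
int pullThresholdForQueue = 10; // our setting
int pullBatchSize = 32;         // assumed client default for messages per pull
int roughPeak = pullThresholdForQueue + pullBatchSize; // msgCount can briefly reach ~42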