Table of contents

    • 1. The phenomenon
    • 2. Interpretation of principles
      • 2.1 Overview of RocketMQ Network processing mechanism
      • 2.2 pair.getObject1().rejectRequest()
        • 2.2.1 isOSPageCacheBusy()
        • 2.2.2 isTransientStorePoolDeficient()
      • 2.3 transientStorePoolEnable
        • 2.3.1 MappedFile
        • 2.3.2 TransientStorePool Initialization
    • 3. Solution to the phenomenon
      • 3.1 [REJECTREQUEST] system busy
      • 3.2 too many requests and system thread pool busy, RejectedExecutionException
      • 3.3 [PC_SYNCHRONIZED] broker busy
      • 3.4 Broker busy, period in queue: %sms, size of queue: %d
    • 4. Practical suggestions
      • 4.1 open transientStorePoolEnable
      • 4.2 Expanding Broker Servers

1. The phenomenon

Recently I have received feedback from a number of RocketMQ users who, in production environments, occasionally get one of the following four error messages when sending messages:

  1. [REJECTREQUEST]system busy, start flow control for a while
  2. too many requests and system thread pool busy, RejectedExecutionException
  3. [PC_SYNCHRONIZED] broker busy
  4. [PCBUSY_CLEAN_QUEUE] Broker busy, start flow control for a while, period in queue: %sms, size of queue: %d

2. Interpretation of principles

When selecting message-oriented middleware, if the candidates all meet the business needs in both functionality and performance, it is worth taking the implementation language into account as well. After all, middleware written in a language you are proficient in is easier to keep under control. When an exception occurs, we can, from experience, pick a keyword out of the error message (here, system busy), search the RocketMQ source code for it directly, and find the code that throws the error, as follows:



The entry point of this code is org.apache.rocketmq.remoting.netty.NettyRemotingAbstract#processRequestCommand. As the code shows, the error is thrown for one of two key reasons: pair.getObject1().rejectRequest() returns true, or a RejectedExecutionException is thrown.

Note: In this article the source code serves only as key evidence for the analysis, so only the key code is pointed out and the entire implementation is not traced in detail. If you want to learn more about the implementation, see the author’s book RocketMQ Technology Insider.

2.1 Overview of RocketMQ Network processing mechanism

RocketMQ’s network design is worth learning from. First, a different request code is defined for each kind of client request. The server classifies client requests, and for each command, or each class of commands, it defines a processor (NettyRequestProcessor). Each NettyRequestProcessor is then bound to its own thread pool for command processing. Different types of requests are therefore processed by different thread pools, which achieves thread isolation.

To make the following description easier, NettyRequestProcessor, Pair, and RequestCode are introduced first. The key points are as follows:

  1. NettyRequestProcessor: the request processor on the RocketMQ server. For example, SendMessageProcessor handles message sending and PullMessageProcessor handles message pulling.
  2. RequestCode: the request type. For example, SEND_MESSAGE indicates a request to send a message, and PULL_MESSAGE indicates a request to pull messages.
  3. Pair: encapsulates the binding between a NettyRequestProcessor and an ExecutorService. In RocketMQ’s network processing model, each NettyRequestProcessor is bound to a specific thread pool, and all the processing logic of that NettyRequestProcessor runs in that pool.
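The binding described above can be sketched in plain Java. This is a minimal illustration, not RocketMQ's code: Processor and Pair are hypothetical stand-ins for NettyRequestProcessor and RocketMQ's Pair, the request code values are illustrative, and the thread names are made up. It only shows how a table keyed by request code routes each request type to its own thread pool.

```java
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

final class ProcessorTableSketch {
    /** Hypothetical stand-in for NettyRequestProcessor. */
    interface Processor {
        String process(String body);
    }

    /** Hypothetical stand-in for Pair: a processor bound to its own thread pool. */
    static final class Pair {
        final Processor processor;
        final ExecutorService executor;

        Pair(Processor processor, ExecutorService executor) {
            this.processor = processor;
            this.executor = executor;
        }
    }

    // Illustrative request codes; the real values live in RocketMQ's RequestCode.
    static final int SEND_MESSAGE = 10;
    static final int PULL_MESSAGE = 11;

    private final Map<Integer, Pair> processorTable = new ConcurrentHashMap<>();

    void register(int requestCode, Processor processor, ExecutorService executor) {
        processorTable.put(requestCode, new Pair(processor, executor));
    }

    /** Looks up the pair by request code and runs the processor on its bound pool. */
    Future<String> dispatch(int requestCode, String body) {
        Pair pair = processorTable.get(requestCode);
        if (pair == null) {
            throw new IllegalArgumentException("request code " + requestCode + " not supported");
        }
        Callable<String> task = () -> pair.processor.process(body);
        return pair.executor.submit(task);
    }

    /** Registers one processor per request code and returns the thread each request ran on. */
    static String[] demo() {
        ExecutorService sendPool = Executors.newSingleThreadExecutor(r -> new Thread(r, "send-pool"));
        ExecutorService pullPool = Executors.newSingleThreadExecutor(r -> new Thread(r, "pull-pool"));
        try {
            ProcessorTableSketch server = new ProcessorTableSketch();
            Processor reportThread = body -> Thread.currentThread().getName();
            server.register(SEND_MESSAGE, reportThread, sendPool);
            server.register(PULL_MESSAGE, reportThread, pullPool);
            return new String[] {
                server.dispatch(SEND_MESSAGE, "msg").get(),
                server.dispatch(PULL_MESSAGE, "msg").get()
            };
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            sendPool.shutdown();
            pullPool.shutdown();
        }
    }

    public static void main(String[] args) {
        String[] threads = demo();
        System.out.println(threads[0] + " / " + threads[1]);
    }
}
```

Because each request code has its own pool, a backlog of send requests cannot starve pull requests of threads, which is the thread isolation the section describes.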

2.2 pair.getObject1().rejectRequest()

This article focuses on the SendMessageProcessor#rejectRequest method, because the problems raised by readers all occur while sending messages.

SendMessageProcessor#rejectRequest

public boolean rejectRequest() {
    return this.brokerController.getMessageStore().isOSPageCacheBusy() ||               // @1
        this.brokerController.getMessageStore().isTransientStorePoolDeficient();        // @2
}

There are two conditions for rejecting a request, and true is returned if either of them is met.

Code @1: check whether the OS PageCache is busy; if it is, return true. You are probably as curious as I was: how does RocketMQ tell whether the PageCache is busy? This is analyzed in detail below.

Code @2: check whether the TransientStorePool has run out of buffers.

2.2.1 isOSPageCacheBusy()

DefaultMessageStore#isOSPageCacheBusy()

public boolean isOSPageCacheBusy() {
    long begin = this.getCommitLog().getBeginTimeInLock();  // @1 start
    long diff = this.systemClock.now() - begin;             // @1 end

    return diff < 10000000
                && diff > this.messageStoreConfig.getOsPageCacheBusyTimeOutMills();     // @2
}

Code @1: First, the meaning of the two local variables begin and diff:

  • begin: the time at which the lock was acquired to write a message to the commitlog file. To be precise, it is the time at which the message body starts being appended to off-heap memory (DirectByteBuffer) or to the PageCache (a FileChannel#map mapping). For details, refer to CommitLog#putMessage.
  • diff: how long the lock has been held by the append in progress, that is, the time spent so far appending the message to off-heap memory or the PageCache.

Code @2: if appending a message has taken longer than osPageCacheBusyTimeOutMills, as set in the Broker configuration file, the PageCache is considered busy. The default value of osPageCacheBusyTimeOutMills is 1000, that is, 1 s.
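The check can be tried out in isolation. The sketch below reproduces only the comparison from the method above, with the clock and the lock timestamp passed in as parameters. It assumes that beginTimeInLock is reset to 0 when no append is in progress, which would explain the diff < 10000000 guard; the class name and the fixed timestamp are illustrative.

```java
final class PageCacheBusySketch {
    // Default of osPageCacheBusyTimeOutMills, as stated in the article.
    static final long OS_PAGE_CACHE_BUSY_TIMEOUT_MILLS = 1000;

    /**
     * Same comparison as DefaultMessageStore#isOSPageCacheBusy, with the inputs made explicit.
     * Assumption: beginTimeInLock is 0 when no append is in progress, so diff becomes
     * a huge number and the "diff < 10000000" guard filters that case out.
     */
    static boolean isOSPageCacheBusy(long now, long beginTimeInLock, long timeoutMillis) {
        long diff = now - beginTimeInLock;
        return diff < 10000000 && diff > timeoutMillis;
    }

    public static void main(String[] args) {
        long now = 1_700_000_000_000L; // fixed timestamp so the output is deterministic
        // Lock held for 200 ms: not busy.
        System.out.println(isOSPageCacheBusy(now, now - 200, OS_PAGE_CACHE_BUSY_TIMEOUT_MILLS));
        // Lock held for 1500 ms: busy.
        System.out.println(isOSPageCacheBusy(now, now - 1500, OS_PAGE_CACHE_BUSY_TIMEOUT_MILLS));
        // No append in progress (begin == 0): not busy.
        System.out.println(isOSPageCacheBusy(now, 0, OS_PAGE_CACHE_BUSY_TIMEOUT_MILLS));
    }
}
```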

2.2.2 isTransientStorePoolDeficient()

DefaultMessageStore#isTransientStorePoolDeficient

public boolean isTransientStorePoolDeficient() {
    return remainTransientStoreBufferNumbs() == 0;
}
public int remainTransientStoreBufferNumbs() {
    return this.transientStorePool.remainBufferNumbs();
}

The TransientStorePool#remainBufferNumbs method is finally called.

public int remainBufferNumbs() {
    if (storeConfig.isTransientStorePoolEnable()) {
        return availableBuffers.size();
    }
    return Integer.MAX_VALUE;
}

If the transientStorePoolEnable mechanism is enabled, the method returns the number of ByteBuffers currently available; otherwise it returns Integer.MAX_VALUE, so the pool is never reported as deficient. The whole purpose of isTransientStorePoolDeficient is therefore to check whether any ByteBuffer is still available; if none is, the PageCache is considered busy. So what is the transientStorePoolEnable mechanism?

2.3 transientStorePoolEnable

Java NIO’s memory-mapping mechanism maps a file into memory, turning operations on the file into operations on memory addresses, which greatly improves I/O performance. However, this memory is not resident: it can be swapped out to swap space (virtual memory). To improve message-sending performance, RocketMQ introduces a memory-locking mechanism: it maps the commitlog files into memory and locks that memory, so that these files always stay in memory. The parameter that controls this mechanism is transientStorePoolEnable.

2.3.1 MappedFile

Focus on how MappedFile initializes its two fields ByteBuffer writeBuffer and MappedByteBuffer mappedByteBuffer, because these two fields are the data structures that write and lookup operations act on directly.



The two key points are:

  • ByteBuffer writeBuffer: if transientStorePoolEnable is enabled, it is off-heap memory created with ByteBuffer.allocateDirect(fileSize) (java.nio). If it is not enabled, it is null.
  • MappedByteBuffer mappedByteBuffer: created with the FileChannel#map method; it is in effect the PageCache.

When a message is written:

MappedFile#appendMessagesInner



If writeBuffer is not null, the transientStorePoolEnable mechanism is enabled and the message is written to writeBuffer first; if writeBuffer is null, the message is written to mappedByteBuffer.

When a message is pulled (read):

MappedFile#selectMappedBuffer



When a message is read, it is read from the mappedByteBuffer (pageCache).

In short, with transientStorePoolEnable enabled, messages are written to writeBuffer (off-heap memory) and read from mappedByteBuffer (PageCache).
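A minimal sketch of this read/write split using only java.nio follows. It is not MappedFile itself: the class name, the 4 KB file size, and the temporary file are illustrative. The message is appended to an off-heap buffer, committed to the file through the FileChannel, and then read back through the mapping. Note that the Java documentation leaves it platform-dependent whether channel writes become visible through an existing mapping; on Linux both go through the same PageCache.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

final class ReadWriteSplitSketch {
    static final int FILE_SIZE = 4096; // tiny stand-in for a commitlog file

    /** Appends to off-heap memory, commits to the file, then reads back via the mapping. */
    static String writeThenRead(String message) {
        try {
            Path file = Files.createTempFile("commitlog-sketch", ".bin");
            file.toFile().deleteOnExit();
            try (FileChannel channel = FileChannel.open(
                    file, StandardOpenOption.READ, StandardOpenOption.WRITE)) {
                // Read side: the file mapped into memory, backed by the PageCache.
                MappedByteBuffer mappedByteBuffer =
                        channel.map(FileChannel.MapMode.READ_WRITE, 0, FILE_SIZE);
                // Write side: off-heap memory, as when transientStorePoolEnable is on.
                ByteBuffer writeBuffer = ByteBuffer.allocateDirect(FILE_SIZE);

                // Step 1: append the message to off-heap memory only.
                byte[] body = message.getBytes(StandardCharsets.UTF_8);
                writeBuffer.put(body);

                // Step 2: commit the written bytes to the file channel (PageCache).
                ByteBuffer committed = writeBuffer.duplicate();
                committed.flip();
                long offset = 0;
                while (committed.hasRemaining()) {
                    offset += channel.write(committed, offset);
                }

                // Step 3: a reader never touches writeBuffer; it reads the mapping.
                byte[] read = new byte[body.length];
                ByteBuffer view = mappedByteBuffer.duplicate();
                view.position(0);
                view.get(read);
                return new String(read, StandardCharsets.UTF_8);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(writeThenRead("hello"));
    }
}
```

Until step 2 runs, the message exists only in off-heap memory, which is also why uncommitted messages can be lost if the process dies.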

To clarify why transientStorePoolEnable was introduced, here are the insights of RocketMQ community contributor Zongtang Hu on this question.

There are two ways to read and write:

  1. The first is Mmap+PageCache: both reads and writes of messages go through the PageCache. Reading and writing the same PageCache inevitably involves lock contention, and under concurrent reads and writes there will be page faults, memory locking, and dirty-page writeback.
  2. The second is a two-tier DirectByteBuffer (off-heap memory) + PageCache architecture, which separates reads from writes: messages are written to the DirectByteBuffer and read from the PageCache. (For the DirectByteBuffer, flushing takes two steps: first a commit to the PageCache, then a flush to the disk file.) This avoids many of the memory operations that tend to block, such as page faults, memory locking, and dirty-page writeback, and so reduces latency.

Tips: If you would like to exchange ideas with Hu Zongtang, you can follow his GitHub account: github.com/zongtanghu

With transientStorePoolEnable and its memory locking enabled, will memory run out as the commitlog files keep growing?

2.3.2 TransientStorePool Initialization



By default, TransientStorePool initializes five DirectByteBuffers (off-heap memory) and locks their memory, so this memory will not be swapped out. The number of buffers can be changed with the transientStorePoolSize parameter.

When a message is written, a DirectByteBuffer is first obtained from the pool and the message is appended to it. What happens when all five DirectByteBuffers have been filled with messages? By design, RocketMQ writes to only one commitlog file at a time, sequentially, and creates a new commitlog file once the current one is full. The idea behind TransientStorePool is therefore to recycle these five DirectByteBuffers: once the content written to a DirectByteBuffer has been committed to the PageCache, the buffer can be reused. The corresponding code is TransientStorePool#returnBuffer:

public void returnBuffer(ByteBuffer byteBuffer) {
    byteBuffer.position(0);
    byteBuffer.limit(fileSize);
    this.availableBuffers.offerFirst(byteBuffer);
}

The call stack is as follows:



From the above analysis, there is no memory overflow as messages are written.
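The recycling idea can be sketched as a fixed-size pool. This is a simplified model, not TransientStorePool itself: it leaves out the memory locking, the class name and the small buffer sizes are illustrative, and only returnBuffer and remainBufferNumbs follow the code quoted in this article.

```java
import java.nio.ByteBuffer;
import java.util.Deque;
import java.util.concurrent.ConcurrentLinkedDeque;

final class TransientPoolSketch {
    private final int fileSize;
    private final boolean enabled;
    private final Deque<ByteBuffer> availableBuffers = new ConcurrentLinkedDeque<>();

    TransientPoolSketch(int poolSize, int fileSize, boolean enabled) {
        this.fileSize = fileSize;
        this.enabled = enabled;
        if (enabled) {
            // All off-heap memory is allocated once, up front; the real pool also locks it.
            for (int i = 0; i < poolSize; i++) {
                availableBuffers.offer(ByteBuffer.allocateDirect(fileSize));
            }
        }
    }

    /** Takes a buffer for a new commitlog file; returns null when the pool is exhausted. */
    ByteBuffer borrowBuffer() {
        return availableBuffers.pollFirst();
    }

    /** Resets a buffer whose content has been committed and puts it back in the pool. */
    void returnBuffer(ByteBuffer byteBuffer) {
        byteBuffer.position(0);
        byteBuffer.limit(fileSize);
        availableBuffers.offerFirst(byteBuffer);
    }

    int remainBufferNumbs() {
        return enabled ? availableBuffers.size() : Integer.MAX_VALUE;
    }

    boolean isDeficient() {
        return remainBufferNumbs() == 0;
    }

    public static void main(String[] args) {
        TransientPoolSketch pool = new TransientPoolSketch(5, 1024, true);
        ByteBuffer first = pool.borrowBuffer();
        for (int i = 0; i < 4; i++) {
            pool.borrowBuffer();
        }
        System.out.println("deficient after 5 borrows: " + pool.isDeficient());
        pool.returnBuffer(first); // content committed, buffer goes back to the pool
        System.out.println("deficient after 1 return: " + pool.isDeficient());
    }
}
```

The pool never allocates after construction, so its off-heap footprint stays at poolSize × fileSize no matter how many commitlog files are written.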

3. Solution to the phenomenon

3.1 [REJECTREQUEST] system busy



The source entry point that throws it is NettyRemotingAbstract#processRequestCommand. How it works has been described in detail in the principle analysis above; a summary follows.

When the transientStorePoolEnable mechanism is disabled, this error is thrown if the Broker’s PageCache is busy; the PageCache is judged busy when the lock has been held for more than 1 s while appending a message to the PageCache. When the transientStorePoolEnable mechanism is enabled, the criterion is whether TransientStorePool still has off-heap memory available; if it does not, this error is thrown.

3.2 too many requests and system thread pool busy, RejectedExecutionException



The source entry point is again NettyRemotingAbstract#processRequestCommand, immediately after the check described in 3.1. The error is thrown when the task is submitted to the thread pool and the thread pool rejects it.

BrokerController#registerProcessor



The queue length of this thread pool defaults to 10000; it can be changed with the sendThreadPoolQueueCapacity parameter.
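The rejection itself is standard java.util.concurrent behavior and can be reproduced without RocketMQ. The sketch below is an illustration with made-up sizes (one thread, a queue of 2) rather than the Broker's actual pool: once the single worker is occupied and the bounded queue is full, further submissions raise RejectedExecutionException.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class BoundedPoolSketch {
    /** Submits blocking tasks to a one-thread pool and counts how many are rejected. */
    static int rejectedCount(int queueCapacity, int tasks) {
        CountDownLatch gate = new CountDownLatch(1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 60, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(queueCapacity)); // bounded, like the send queue
        int rejected = 0;
        try {
            for (int i = 0; i < tasks; i++) {
                try {
                    // Each task blocks until the gate opens, simulating a slow append.
                    pool.execute(() -> {
                        try {
                            gate.await();
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        }
                    });
                } catch (RejectedExecutionException e) {
                    rejected++; // the Broker would answer "too many requests ..." here
                }
            }
        } finally {
            gate.countDown(); // let the accepted tasks finish
            pool.shutdown();
        }
        return rejected;
    }

    public static void main(String[] args) {
        // 1 task running + 2 queued are accepted; the remaining 2 of 5 are rejected.
        System.out.println("rejected: " + rejectedCount(2, 5));
    }
}
```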

3.3 [PC_SYNCHRONIZED] broker busy



The source entry point is DefaultMessageStore#putMessage: if the PageCache is found to be busy when the message is appended, the above error is thrown.

3.4 Broker busy, period in queue: %sms, size of queue: %d



The source entry point is BrokerFastFailure#cleanExpiredRequest. This method runs periodically (the RocketMQ source schedules it every 10 milliseconds), on the precondition that fast failure is enabled on the Broker. Fast failure is enabled by default and is controlled by the brokerFastFailureEnable parameter. The key point of the method is that on every run it checks the PageCache: if the PageCache is found to be busy and there are still tasks queued in the send queue, those tasks stop waiting and a busy error is returned directly, so that the queued requests fail fast instead of continuing to wait.
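The drain logic can be sketched as follows. This is a simplified model under stated assumptions, not the BrokerFastFailure source: QueuedRequest is a hypothetical stand-in for a queued send task, the busy check is passed in as a BooleanSupplier, and responses are collected in a list instead of being written back to clients.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.BooleanSupplier;

final class FastFailSketch {
    /** Hypothetical stand-in for a send task waiting in the send queue. */
    static final class QueuedRequest {
        final long enqueuedAt;

        QueuedRequest(long enqueuedAt) {
            this.enqueuedAt = enqueuedAt;
        }
    }

    /** While the PageCache is busy, fails queued requests instead of letting them wait. */
    static List<String> cleanWhileBusy(BlockingQueue<QueuedRequest> sendQueue,
                                       BooleanSupplier pageCacheBusy,
                                       long now) {
        List<String> responses = new ArrayList<>();
        while (pageCacheBusy.getAsBoolean()) {
            QueuedRequest request = sendQueue.poll();
            if (request == null) {
                break; // nothing left to fail
            }
            responses.add(String.format(
                    "[PCBUSY_CLEAN_QUEUE] Broker busy, start flow control for a while, "
                            + "period in queue: %sms, size of queue: %d",
                    now - request.enqueuedAt, sendQueue.size()));
        }
        return responses;
    }

    public static void main(String[] args) {
        BlockingQueue<QueuedRequest> queue = new LinkedBlockingQueue<>();
        queue.add(new QueuedRequest(1_000));
        queue.add(new QueuedRequest(1_500));
        // Pretend the PageCache stays busy for the whole run.
        for (String response : cleanWhileBusy(queue, () -> true, 2_000)) {
            System.out.println(response);
        }
    }
}
```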

4. Practical suggestions

From the principles and symptoms analyzed above, system busy and broker busy are both thrown during message sending because the PageCache is busy. Can these errors be avoided by adjusting some of the parameters mentioned above? For example:

  • osPageCacheBusyTimeOutMills: the timeout used to judge the PageCache busy. The default is 1000, that is, 1 s. Could we raise this value, for example to 2000 or 3000? Author’s opinion: very inadvisable.
  • sendThreadPoolQueueCapacity: the capacity of the Broker’s send-request queue. The default is 10000; once 10000 requests are backlogged in the queue, RejectedExecutionException is thrown. Author’s opinion: not advisable.
  • brokerFastFailureEnable: whether fast failure is enabled. The default is true, which means that if the Broker’s PageCache is busy and the sendThreadPoolQueue is not empty, that is, send requests are queued waiting to be executed, their wait is ended and broker busy is returned. Disabling fast failure would also avoid this error. Author’s opinion: very inadvisable.

None of these parameter changes is advisable. The essence of system busy and broker busy is that the system’s PageCache is busy; put plainly, appending a single message to the PageCache is taking more than 1 s. If clients keep sending messages to that Broker and waiting, its TPS simply cannot keep up, and it could hardly be called high-performance messaging middleware. This is why a fast-failure mechanism is used to return an error directly to the message sender, which by default retries twice and sends the message to another Broker, ensuring high availability.
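The sender-side effect of fast failure can be illustrated with a deliberately simplified sketch. It is not the RocketMQ client's selection logic: the broker names are made up, brokers are simply tried in list order, and only the default of two retries comes from the text above.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

final class SendRetrySketch {
    // Default number of retries after a failed send, as stated in the article.
    static final int RETRY_TIMES_WHEN_SEND_FAILED = 2;

    /**
     * Tries up to 1 + retries brokers in list order.
     * Returns the broker that accepted the message, or null if every attempt failed fast.
     */
    static String send(List<String> brokers, Predicate<String> isBusy) {
        if (brokers.isEmpty()) {
            return null;
        }
        int attempts = 1 + RETRY_TIMES_WHEN_SEND_FAILED;
        for (int i = 0; i < attempts; i++) {
            String broker = brokers.get(i % brokers.size());
            if (!isBusy.test(broker)) {
                return broker; // accepted without waiting on a busy PageCache
            }
            // The busy broker answered immediately, so the retry costs little time.
        }
        return null;
    }

    public static void main(String[] args) {
        List<String> brokers = Arrays.asList("broker-a", "broker-b");
        // broker-a is busy and fails fast; the retry lands on broker-b.
        System.out.println(send(brokers, broker -> broker.equals("broker-a")));
    }
}
```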

The following solutions are proposed according to personal opinions:

4.1 open transientStorePoolEnable

Set transientStorePoolEnable=true in the Broker configuration file (broker.conf).
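As a sketch, the relevant fragment of the Broker configuration file might look like the following. transientStorePoolEnable and transientStorePoolSize are the parameters named in this article; the flushDiskType line reflects my assumption that the off-heap pool is only used together with asynchronous flushing, so check it against the RocketMQ version you run.

```properties
# Enable the off-heap write buffer pool (read/write separation)
transientStorePoolEnable=true
# Number of off-heap buffers in the pool; 5 is the default mentioned in section 2.3.2
transientStorePoolSize=5
# Assumption: the pool takes effect only with asynchronous flush
flushDiskType=ASYNC_FLUSH
```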

  • Basis of the solution: with read/write separation enabled, when a message is sent it is first appended to a DirectByteBuffer (off-heap memory). Then, under the asynchronous flush mechanism, the content of the DirectByteBuffer is committed to the PageCache and afterwards flushed to disk. Message pulls read directly from the PageCache. This separates reads from writes and relieves the pressure on the PageCache, which addresses the problem at its root.
  • Disadvantages: it increases the possibility of data loss. If the Broker JVM process exits abnormally, messages already committed to the PageCache will not be lost, but messages still in off-heap memory (DirectByteBuffer) that have not yet been committed to the PageCache will be. In general, though, the RocketMQ process is unlikely to exit abnormally.

4.2 Expanding Broker Servers

Plan basis:

When a Broker server is busy, requests fail fast and the sender avoids that Broker for a period of time, which gives the Broker time to recover. The Broker architecture itself supports distributed horizontal scaling: by adding Broker servers and increasing the number of queues of a Topic, the load on each single Broker server is reduced, which avoids a busy PageCache.

Tips: When scaling out Brokers, you can copy ${ROCKETMQ_HOME}/store/config/topics.json from any Broker in the cluster to the corresponding directory on the new Broker server. This avoids having to create the queues on the new Broker; message senders and message consumers will then dynamically pick up the Topic’s routing information.

It is also possible to upgrade the existing Brokers, for example by adding memory or replacing mechanical disks with SSDs. However, this usually requires restarting the Broker server and is less convenient than scaling out.

That is all for this article. If you found it useful, please like and share it. Thank you. Dear readers, do you have a better solution? You are welcome to leave a comment and discuss it with the author.


I am Wei Ge, keen on systematic analysis of mainstream Java middleware. Follow the public account “middleware interest circle”; reply “column” to get a systematic column navigation, or reply “information” to get the author’s learning mind maps.