RocketMQ has made a number of improvements to its messaging system in version 4.9.1.

According to the change log of RocketMQ 4.9.1, we extracted [Issues:2883], which concerns message-sending performance optimization; the detailed link is as follows:

First, a brief introduction to the optimization points listed there:

  • Optimize the lock on WaitNotifyObject (Item 2)
  • Remove the lock from HAService (Item 3)
  • Remove the lock from GroupCommitService (Item 4)
  • Eliminate unnecessary array copies in HA (Item 5)
  • Adjust the default values of several message-sending parameters (Item 7)
    • sendMessageThreadPoolNums
    • useReentrantLockWhenPutMessage
    • flushCommitLogTimed
    • endTransactionThreadPoolNums
  • Reduce the scope of locks (Items 8-12)

Reading through these changes, the optimizations boil down to three main techniques:

  • Remove unnecessary locks
  • Reduce lock granularity (range)
  • Modify message sending parameters

Next, combining the source code, we will select representative functions for detailed analysis and appreciate the elegance of Java high-concurrency programming.

1. Remove unnecessary locks

This performance optimization focuses on the RocketMQ synchronous replication scenario.

Let’s start with a brief introduction to the programming techniques of RocketMQ master-slave synchronization.

After the RocketMQ master node writes a message to memory, if synchronous replication is used, it must wait until the slave node has successfully written the message before returning a result to the client. The code here is written quite skillfully, as shown in the following figure:

Tip: Message sending was optimized in RocketMQ 4.7; the synchronous sending model introduced the JDK's CompletableFuture to implement asynchronous message sending.

Interpretation of core steps:

  1. The message-sending thread calls CommitLog's asyncPutMessage method to write the message.
  2. CommitLog calls the submitReplicaRequest method, submits the task to GroupTransferService, and obtains a CompletableFuture for asynchronous programming. Note that the wait for the slave node to finish writing does not block the sending thread; completion is driven through the CompletableFuture mechanism, backed by an internal thread pool (the ForkJoinPool common pool by default).
  3. GroupTransferService checks the submitted tasks one by one to determine whether each request has been synchronized to the slave node.
  4. Once a request has been replicated to the slave node, the corresponding Future is completed, waking the waiter and returning the result to the message sender.
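The hand-off between the sending thread and the replication-check thread in steps 2-4 can be sketched roughly as follows. This is a minimal illustration, not RocketMQ's actual code; the class and method names here are ours:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical sketch: the sender obtains a future for its request; a
// separate checker thread completes the future once the slave's
// acknowledged offset has caught up with the request's expected offset.
public class ReplicaWaitSketch {

    static class Request {
        final long expectedOffset;
        final CompletableFuture<Boolean> future = new CompletableFuture<>();
        Request(long expectedOffset) { this.expectedOffset = expectedOffset; }
    }

    // Sender side: submit a request, then wait on the future. The checker
    // thread (simulated here) plays the role of GroupTransferService.
    static boolean replicateAndWait(long expectedOffset, long slaveAckedOffset)
            throws ExecutionException, InterruptedException, TimeoutException {
        Request req = new Request(expectedOffset);
        Thread checker = new Thread(() ->
            req.future.complete(slaveAckedOffset >= req.expectedOffset));
        checker.start();
        // get() is used only for the demo; real callers can chain callbacks.
        return req.future.get(3, TimeUnit.SECONDS);
    }

    public static void main(String[] args) throws Exception {
        System.out.println("replicated=" + replicateAndWait(100L, 100L));
    }
}
```

The key point is that the sending thread never polls the slave itself; it simply holds a future whose completion is someone else's job.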

The GroupTransferService code is shown below:

To make the following optimization points easier to understand, let us first summarize the design idea behind GroupTransferService:

  • Two lists are used in combination: a read list and a write list, named requestRead and requestWrite.
  • External callers of GroupTransferService's putRequest store their requests in requestWrite.
  • GroupTransferService's run method takes tasks from requestRead and determines whether the data corresponding to each task has been successfully written to the slave node.
  • Whenever requestRead has no data left to read, the two lists are swapped; this separates reads from writes and reduces lock contention.
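The read/write list separation described above can be sketched as follows. This is a simplified illustration of the pattern, not RocketMQ's actual implementation; the names follow the article:

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of the read/write list separation: producers only touch the
// write list; the single service thread drains the read list and swaps
// the two lists when the read list runs dry.
public class ReadWriteSwapSketch {
    private volatile List<String> requestsWrite = new ArrayList<>();
    private volatile List<String> requestsRead = new ArrayList<>();

    // Called by many sender threads; only the write list is touched here.
    public synchronized void putRequest(String request) {
        requestsWrite.add(request);
    }

    // Called by the service thread when the read list is empty: swap the
    // references so new writes flow into the (now empty) old read list.
    public synchronized void swapRequests() {
        List<String> tmp = requestsWrite;
        requestsWrite = requestsRead;
        requestsRead = tmp;
    }

    // The service thread drains the read list without contending with
    // writers, since writers never touch requestsRead between swaps.
    public List<String> drainRead() {
        List<String> batch = new ArrayList<>(requestsRead);
        requestsRead.clear();
        return batch;
    }
}
```

Because writers and the reader work on different lists between swaps, lock contention is limited to the brief putRequest and swapRequests calls.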

The optimization points of the new version mainly include:

  • Change the lock type used by putRequest, replacing synchronized with a spin lock
  • Remove the unnecessary lock from the doWaitTransfer method

1.1 Replacing synchronized with a Spin Lock

As shown in the following figure, GroupTransferService provides the putRequest interface to accept external synchronization tasks. The ArrayList must be protected by a lock, but adding an element to an ArrayList is a pure memory operation that takes very little time.

Therefore, synchronized is not necessary here; a spin lock will do. The spin-lock implementation is very lightweight, as shown in the figure below:

The entire lock implementation needs only an AtomicBoolean; locking and unlocking are based on CAS operations, which is very lightweight, and a spin lock does not incur thread context switches.
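A minimal spin lock along these lines can be sketched as below. This is our own illustrative version of the idea (in the spirit of the lock described above), not a copy of RocketMQ's source:

```java
import java.util.concurrent.atomic.AtomicBoolean;

// A minimal spin lock built on a single AtomicBoolean: lock and unlock
// are plain CAS operations, so a waiting thread spins (busy-waits)
// instead of being descheduled by the OS.
public class SpinLock {
    // true = unlocked, false = locked
    private final AtomicBoolean available = new AtomicBoolean(true);

    public void lock() {
        // Busy-wait until we flip the flag from free (true) to held (false).
        while (!available.compareAndSet(true, false)) {
            // spin; only suitable for very short critical sections
        }
    }

    public void unlock() {
        available.compareAndSet(false, true);
    }
}
```

Because a blocked thread never parks, there is no context-switch cost, which is exactly why this fits a critical section as short as an ArrayList.add.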

1.2 Removing Unnecessary Locks

The abuse of locks is a very common phenomenon. Multithreaded programming involves complicated interactions, and while writing code we often cannot predict whether a given piece of code will be executed concurrently by multiple threads; to be prudent, we simply slap a lock on it, which naturally costs performance. To remove a lock, we must examine the class's call chain and determine whether locking is actually needed.

GroupTransferService as a whole runs in a multithreaded environment, and what mainly needs protection are the requestRead and requestWrite lists; the locks were introduced to ensure these two lists can be accessed safely from multiple threads. So let us first sort out the operation flow of GroupTransferService's core methods:

The doWaitTransfer method mainly operates on the requestRead list, and this method is called only by the GroupTransferService thread. requestRead is also modified in swapRequests, but the two methods execute serially on the same thread, so there is no need for a lock there and it can be removed.

However, even with that lock removed, swapRequests still needs to be locked, because requestWrite is accessed by multiple threads. The optimized code looks like this:

Seen from this angle, the change is mainly a replacement of the lock type, from synchronized to the more lightweight spin lock.

2. Reduce the scope of locks

The code block wrapped by a lock executes serially, that is, without concurrency. When locking cannot be avoided, shrinking the locked code block effectively improves concurrency, as shown in the diagram below:

Suppose multiple threads call the two methods. With lock1, doSomething1 and doSomething2 must both execute serially. With lock2, when multiple threads call the method at the same time, doSomething1 can run on multiple threads simultaneously and only doSomething2 executes serially, so the overall concurrency of lock2 is clearly better. This leads to a best practice for lock usage: keep the code block wrapped inside the lock as small as possible.
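The contrast between the two locking styles can be sketched as follows (a generic illustration of the diagram, with placeholder method names):

```java
// Contrast of the two locking styles: in style1 both calls are
// serialized under the lock; in style2 only doSomething2 is serialized,
// so doSomething1 can run on many threads at once.
public class LockScope {
    private final Object lock = new Object();
    private int counter;

    void style1() {
        synchronized (lock) {   // lock1: the whole method is serialized
            doSomething1();
            doSomething2();
        }
    }

    void style2() {
        doSomething1();         // independent work: runs concurrently
        synchronized (lock) {   // lock2: only the shared-state update
            doSomething2();
        }
    }

    void doSomething1() { /* thread-safe work, e.g. building an id string */ }

    void doSomething2() { counter++; }  // touches shared state

    int getCounter() { return counter; }
}
```

If doSomething1 is the expensive part, style2's throughput scales with the number of threads while style1's does not.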

In older versions, the code block inside the message-writing lock was larger; some actions that could have been executed concurrently were wrapped inside the lock, such as generating the offsetMsgId.

The new version applies an idea from functional programming: it only defines the method for obtaining the msgId, which is not executed while the message is being written. This reduces the lock granularity and lets offsetMsgId generation happen in parallel. Such clever programming is well worth learning from.
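The deferral idea can be sketched with a Supplier. Note that buildMsgId below is a hypothetical stand-in for the real id encoding (which in RocketMQ also involves the broker's host and port), used only to show the lazy-evaluation pattern:

```java
import java.nio.ByteBuffer;
import java.util.function.Supplier;

// Sketch of the functional-style change: instead of computing the
// offsetMsgId eagerly inside the locked append, capture a Supplier that
// computes it only when someone actually asks for the id.
public class LazyMsgId {

    // Hypothetical stand-in for the real encoding: hex of the offset.
    static String buildMsgId(long commitLogOffset) {
        ByteBuffer buf = ByteBuffer.allocate(Long.BYTES);
        buf.putLong(commitLogOffset).flip();
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < buf.limit(); i++) {
            sb.append(String.format("%02X", buf.get(i)));
        }
        return sb.toString();
    }

    // Inside the lock we only capture the offset; the string formatting
    // happens later, outside the lock, when get() is first called.
    static Supplier<String> deferMsgId(long commitLogOffset) {
        return () -> buildMsgId(commitLogOffset);
    }

    public static void main(String[] args) {
        Supplier<String> msgId = deferMsgId(1024L);
        // No formatting work has happened yet; it runs on get().
        System.out.println(msgId.get());
    }
}
```

The critical section now only captures a long, and every consumer that needs the id pays the formatting cost on its own thread.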

3. Adjust parameters related to sending messages

  1. sendMessageThreadPoolNums

    The default value was 1 prior to version 4.9.0; it is now adjusted to the number of CPU cores of the operating system, and not less than 4. This change has both advantages and disadvantages: it improves message-sending concurrency, but messages may then be written out of order. For ordinary synchronous sending there is no ordering problem, so in that case you can modify it with confidence.

    Changing this parameter is not advised in sequential-consumption scenarios. In practice, RocketMQ clusters should be governed so that sequential-consumption scenarios use a dedicated cluster.

  2. useReentrantLockWhenPutMessage

    Controls the type of lock used when messages are written to memory. In lower versions the default was false, meaning a spin lock was used; the new version defaults to ReentrantLock. The main advantage of spinning is that it avoids thread-switching costs, but it easily wastes CPU while spinning. Memory writes are fast in most cases, but RocketMQ relies on the page cache, and if cache jitter occurs, the CPU wasted on spinning is not worth it. Moreover, once sendMessageThreadPoolNums is set greater than 1, using ReentrantLock is more stable.

  3. flushCommitLogTimed

    flushCommitLogTimed defaults to false in lower versions, where CountDownLatch is used for the timed wait; higher versions use Thread.sleep directly. The presumed reason is that the flush thread is independent and does not need to interact with other threads, so there is no need to bring in CountDownLatch, a tool designed specifically for thread cooperation.

  4. endTransactionThreadPoolNums

    Mainly used to set the size of the transaction-message thread pool. In the new version, the number of threads handling transaction messages can be adjusted dynamically, so it can be tuned based on load-testing results.
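Putting the four parameters together, a broker configuration fragment might look like this. The property names come from the discussion above; the values are illustrative examples, not recommendations:

```properties
# Illustrative broker.conf fragment; values are examples only.

# Was 1 before 4.9.x; the new default follows the CPU core count (min 4).
sendMessageThreadPoolNums=8

# Use ReentrantLock rather than a spin lock when writing messages.
useReentrantLockWhenPutMessage=true

# Let the flush thread sleep a fixed interval instead of waiting on a latch.
flushCommitLogTimed=true

# Size of the thread pool handling transaction end requests.
endTransactionThreadPoolNums=8
```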


One last word (a little attention, please)

If this article is helpful or inspiring, please give it a thumbs up.

Mastering one or two mainstream Java middleware projects is a necessary skill for getting into major tech companies such as BAT. This series gives you a learning route for Java middleware and helps you advance in your career.

A Java advancement ladder: a growth path and learning materials to help you break into the middleware field.