Brief introduction: Kuaishou needs to build a messaging system, mainly for online business, as a complement to Kafka. RocketMQ, a distributed messaging middleware with low latency, high concurrency, high availability and high reliability, is exactly what we need.

Author: Huang Li

Huang Li has over 10 years of software development and architecture experience. He is passionate about code and performance optimization and has developed or contributed to a number of open source projects. He worked as a business architect at Taobao for many years and is currently responsible for building the online messaging system at Kuaishou.

Why build an online messaging system

Before RocketMQ was introduced, Kafka was already used extensively at Kuaishou. But Kafka is not always the best choice, for example in the following scenarios:

  • The business wants to be able to retry individual consumption failures without blocking subsequent consumption of other messages.
  • The business expects messages to be delivered with a delay.
  • The business needs database operations and message sending to be consistent (i.e., transactional sending).
  • To troubleshoot problems, the business sometimes needs to query a single message.

To cope with these scenarios, we needed to build a messaging system, primarily for online business, to complement Kafka. Among the messaging middleware we evaluated, RocketMQ matched the business requirements well, has a simple deployment structure, and is used by a large number of companies, so we adopted it.

Deployment model and rollout strategy

There are usually two ways to introduce open source software into an existing system:

The first is to modify the open source software deeply, which makes it easy to build the customized functions the company needs, but makes it hard to upgrade to newer community releases later.

The second is to avoid modifying the community version as far as possible (or at least to minimize incompatible changes), and to implement the company's customizations in a layer wrapped around or built on top of it.

Note: The picture of approach 1 is drawn rather starkly. In practice, many companies combine approach 1 and approach 2.

We chose the second approach. Initially we were using version 4.5.2. Because the community 4.7 release significantly reduced the latency of synchronous replication, and because we had not diverged from the community version, we were able to upgrade to the 4.7 series easily and enjoy the benefits of the new release.

When deploying a cluster, there are many deployment policy choices:

• Large clusters vs. small clusters

• Number of replicas

• Synchronous versus asynchronous flush

• Synchronous versus asynchronous replication

• SSD vs mechanical hard disk

Large clusters offer better performance elasticity, while small clusters offer better isolation. In addition, a small cluster does not need to be deployed across availability zones/IDCs, so it is more robust. We value stability highly, so we chose small clusters. Within a cluster, we prefer synchronous replication and asynchronous flush on SSDs.

Client encapsulation strategy

As mentioned above, we did not make deep changes to RocketMQ, so we need an SDK to provide the customizations needed within the company. The SDK looks something like this:

Only the most basic API is exposed externally, and all access must go through the interface we provide. A clean API is like the tip of an iceberg: apart from the simple external interface, everything beneath it can be upgraded or replaced without breaking compatibility.

Business development is also simple: with just a Topic (globally unique) and a Group, a service can produce and consume without supplying the environment, Name Server address, and so on. Internally, the SDK resolves the Name Server address of the cluster from the Topic and then connects to that cluster. The production and test environments resolve to different addresses, which provides isolation.
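The Topic-based resolution described above can be sketched as follows. This is only an illustration under stated assumptions: the class name TopicRouteResolver, its methods, and the sample addresses are hypothetical and are not the actual SDK.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch: resolves a Name Server address from environment + Topic. */
class TopicRouteResolver {
    // Routing table: environment -> (topic -> Name Server address list).
    private final Map<String, Map<String, String>> routes = new HashMap<>();
    private final String environment;

    TopicRouteResolver(String environment) {
        this.environment = environment;
    }

    /** Registers (or replaces) the Name Server address of a topic in one environment. */
    void register(String env, String topic, String nameServerAddr) {
        routes.computeIfAbsent(env, k -> new HashMap<>()).put(topic, nameServerAddr);
    }

    /** Looks up the Name Server address for a topic in this client's own environment. */
    String resolve(String topic) {
        Map<String, String> byTopic = routes.get(environment);
        if (byTopic == null || !byTopic.containsKey(topic)) {
            throw new IllegalArgumentException("Unknown topic in " + environment + ": " + topic);
        }
        return byTopic.get(topic);
    }
}
```

Because the caller passes only the Topic, the same business code resolves to different clusters depending on the environment the resolver was created for.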

The figure above is divided into three layers. The second layer is generic and the third layer corresponds to a specific MQ implementation, so it is theoretically possible to switch to another messaging middleware without changing the client program.
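A minimal sketch of this layering, assuming hypothetical names (MessageBackend, InMemoryBackend, SimpleProducer) that do not come from the real SDK: business code depends only on the top layer, so the backend can in principle be swapped.

```java
import java.util.ArrayList;
import java.util.List;

/** Layer 2 (generic): what any messaging backend must provide. Hypothetical name. */
interface MessageBackend {
    String send(String topic, String body);
}

/** Layer 3 (specific): a stand-in backend that keeps messages in memory. */
class InMemoryBackend implements MessageBackend {
    final List<String> stored = new ArrayList<>();

    @Override
    public String send(String topic, String body) {
        stored.add(topic + ":" + body);
        return "mem-" + stored.size(); // fake message id
    }
}

/** Layer 1 (public API): the only class business code would touch. */
class SimpleProducer {
    private final MessageBackend backend;
    private final String topic;

    SimpleProducer(String topic, MessageBackend backend) {
        this.topic = topic;
        this.backend = backend;
    }

    String send(String body) {
        return backend.send(topic, body);
    }
}
```

A RocketMQ-backed implementation of MessageBackend would sit where InMemoryBackend is here, and SimpleProducer would not change.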

The SDK integrates a hot-update mechanism that allows configuration to change dynamically without restarting the client, for example pushing routing policies (changing the cluster Name Server address or connecting to another cluster), client thread counts, and timeouts. Maven’s forced-update mechanism ensures that the SDK versions used by the business are mostly up to date.

Cluster load balancing & data center disaster recovery

All topics are allocated to two availability zones by default, and producers and consumers are connected to at least two separate clusters (distributed in different availability zones) at the same time, as shown below:

The producer connects to two clusters at the same time. If availability zone A fails, traffic is automatically switched to cluster 2 in availability zone B. We developed a small component for adaptive cluster load balancing, which includes the following capabilities:

• Throughput on the order of 10 million operations per second

• Flexible weight adjustment strategies

• Health check support/event notification

• Concurrency control (automatically reduces the number of requests to slow servers)

• Resource priority (similar to Envoy: prefer the local data center, or select a subset when there are many servers to call)

• Automatic priority management

• Incremental hot updates

In fact, it is not just for message producers. It is a generic client-side load balancing library, available on GitHub:

https://github.com/PhantomThief/simple-failover-java

The core SimpleFailover interface and PriorityFailover class have no third-party dependencies and are very easy to integrate.
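To illustrate the idea of weight-based failover between clusters, here is a minimal sketch. It is not the simple-failover-java API: the class WeightedClusterPicker, its methods, and the halving/doubling policy are assumptions made for illustration only.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Random;

/** Hypothetical sketch of weight-based failover; not the simple-failover-java API. */
class WeightedClusterPicker {
    private final Map<String, Integer> maxWeights = new LinkedHashMap<>();
    private final Map<String, Integer> currentWeights = new LinkedHashMap<>();
    private final Random random;

    WeightedClusterPicker(Random random) {
        this.random = random;
    }

    void addCluster(String name, int maxWeight) {
        maxWeights.put(name, maxWeight);
        currentWeights.put(name, maxWeight);
    }

    /** Picks a cluster with probability proportional to its current weight. */
    String pick() {
        int total = 0;
        for (int w : currentWeights.values()) {
            total += w;
        }
        if (total == 0) {
            throw new IllegalStateException("no healthy cluster");
        }
        int r = random.nextInt(total);
        for (Map.Entry<String, Integer> e : currentWeights.entrySet()) {
            r -= e.getValue();
            if (r < 0) {
                return e.getKey(); // clusters with weight 0 are never returned
            }
        }
        throw new AssertionError("unreachable");
    }

    /** Halves the weight on failure so traffic drains away from an unhealthy cluster. */
    void onFailure(String name) {
        currentWeights.computeIfPresent(name, (k, w) -> w / 2);
    }

    /** Doubles the weight on success (at least 1), capped at the configured maximum. */
    void onSuccess(String name) {
        currentWeights.computeIfPresent(name,
                (k, w) -> Math.min(maxWeights.get(k), Math.max(1, w * 2)));
    }

    int weightOf(String name) {
        return currentWeights.get(name);
    }
}
```

Once a cluster's weight reaches 0 it receives no traffic, so in this sketch onSuccess would have to be driven by a separate health check rather than by regular sends.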

Diverse messaging features

Delayed messages

Delayed messages are a very important business function, but RocketMQ’s built-in delayed messages can only support a few fixed Delay levels, so we developed a separate Delay Server to schedule delayed messages:

In the structure above, a delayed message is not sent directly to the Delay Server. Instead, the SDK changes its Topic and stores it in RocketMQ. The benefit is that the existing message sending interface (and all the extensions built on it) can be reused. For the business, the only difference is one additional delay-time field when constructing the message. The rest of the usage is unchanged.
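A minimal sketch of the Topic rewrite, under stated assumptions: the delay Topic name, the property keys, and the classes SketchMessage and DelayRewriter are all hypothetical, and the Delay Server is assumed to restore the original Topic when the message becomes due.

```java
import java.util.HashMap;
import java.util.Map;

/** Minimal message holder for this sketch (not RocketMQ's Message class). */
class SketchMessage {
    String topic;
    final String body;
    final Map<String, String> properties = new HashMap<>();

    SketchMessage(String topic, String body) {
        this.topic = topic;
        this.body = body;
    }
}

class DelayRewriter {
    // Hypothetical names, chosen for illustration only.
    static final String DELAY_TOPIC = "SYS_DELAY_TOPIC";
    static final String PROP_REAL_TOPIC = "realTopic";
    static final String PROP_DELIVER_AT = "deliverAtMillis";

    /** Producer side: redirect to the delay topic and remember the real target. */
    static SketchMessage wrap(SketchMessage msg, long delayMillis, long nowMillis) {
        if (delayMillis <= 0) {
            return msg; // not a delayed message, send as usual
        }
        msg.properties.put(PROP_REAL_TOPIC, msg.topic);
        msg.properties.put(PROP_DELIVER_AT, Long.toString(nowMillis + delayMillis));
        msg.topic = DELAY_TOPIC;
        return msg;
    }

    /** Delay Server side: true once the delivery time has been reached. */
    static boolean isDue(SketchMessage msg, long nowMillis) {
        return nowMillis >= Long.parseLong(msg.properties.get(PROP_DELIVER_AT));
    }

    /** Delay Server side: restore the original topic before re-sending. */
    static SketchMessage unwrap(SketchMessage msg) {
        msg.topic = msg.properties.remove(PROP_REAL_TOPIC);
        msg.properties.remove(PROP_DELIVER_AT);
        return msg;
    }
}
```

Because the rewrite happens before the normal send path, everything layered on that path (routing, load balancing, monitoring) applies to delayed messages as well.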

Transactional messages

Since version 4.3, RocketMQ has supported transactional messages, which ensure that the local transaction and the message send either both succeed or both fail, and which are useful in some business scenarios. The usage and principles of transactional messages are well documented and will not be covered here. However, there is little practical material online about running them in production, so we offer some suggestions.

First, transactional messaging is still being improved, so use the latest version, at minimum one released after 4.6.1, to avoid many problems.

Second, transactional messages do not perform as well as normal messages. Each one actually generates 3 messages internally (1 in phase 1, 2 in phase 2), so throughput is about 1/3 of that of normal messages. If the transactional message volume is large, plan capacity accordingly. There is also only one check-back scheduler thread, so do not put it under extreme load.

Finally, there are some parameters to watch. In the broker configuration:

  • transientStorePoolEnable must stay at its default value of false, otherwise there will be serious problems.
  • endTransactionThreadPoolNums is the size of the thread pool that handles phase two of transactional messages, and sendMessageThreadPoolNums is the size of the thread pool that handles phase one. If phase-two processing cannot keep up with phase one, many messages will be missing their phase-two result and will trigger a large number of check-backs. endTransactionThreadPoolNums should therefore be larger than sendMessageThreadPoolNums, and we suggest at least 4 times as large.
  • useReentrantLockWhenPutMessage should be set to true (the default is false) to avoid severely unfair thread locking, which can leave the phase-two processing threads unable to acquire the lock for a long time.
  • The default transactionTimeOut of 6 seconds is too short and may result in message loss if the transaction takes longer than 6 seconds to execute. We recommend changing it to about one minute.
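The four recommendations above can be turned into a small configuration check. The sketch below is an assumption-laden illustration: the class TransactionBrokerConfigCheck is hypothetical, the input is assumed to be broker.conf content in key=value form, and transactionTimeOut is assumed to be expressed in milliseconds (so the 6-second default is 6000).

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

/** Hypothetical helper: checks broker.conf text against the recommendations above. */
class TransactionBrokerConfigCheck {

    /** Returns warnings; an empty list means all four recommendations hold. */
    static List<String> check(String brokerConf) {
        Properties p = new Properties();
        try {
            p.load(new StringReader(brokerConf));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        List<String> warnings = new ArrayList<>();

        if (Boolean.parseBoolean(p.getProperty("transientStorePoolEnable", "false").trim())) {
            warnings.add("transientStorePoolEnable should be false");
        }

        // Unset pool sizes cannot be compared, so they are reported explicitly.
        String send = p.getProperty("sendMessageThreadPoolNums");
        String end = p.getProperty("endTransactionThreadPoolNums");
        if (send == null || end == null) {
            warnings.add("set sendMessageThreadPoolNums and endTransactionThreadPoolNums explicitly");
        } else if (Integer.parseInt(end.trim()) < 4 * Integer.parseInt(send.trim())) {
            warnings.add("endTransactionThreadPoolNums should be at least 4x sendMessageThreadPoolNums");
        }

        if (!Boolean.parseBoolean(p.getProperty("useReentrantLockWhenPutMessage", "false").trim())) {
            warnings.add("useReentrantLockWhenPutMessage should be true");
        }

        // Assumed unit: milliseconds; 60000 corresponds to "about one minute".
        long timeoutMs = Long.parseLong(p.getProperty("transactionTimeOut", "6000").trim());
        if (timeoutMs < 60_000) {
            warnings.add("transactionTimeOut should be about 60000 ms");
        }
        return warnings;
    }
}
```

For example, a file containing sendMessageThreadPoolNums=4, endTransactionThreadPoolNums=16, useReentrantLockWhenPutMessage=true and transactionTimeOut=60000 produces no warnings. The values 4 and 16 are illustrative, not recommendations from the article.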

There is also a note for the producer client: if you have multiple broker groups, each with 2 replicas (one Slave), enable retryAnotherBrokerWhenNotStoreOK so that a large number of sends do not fail after a Slave goes down.

Distributed reconciliation monitoring

In addition to conventional monitoring, we developed a monitoring program that performs distributed reconciliation. It detects anomalies both in our clusters and in the SDK we provide.

This is done by creating a Topic dedicated to monitoring on each Broker. A monitoring producer connects to the clusters through our own SDK (just as business users do) and sends a small number of messages to each cluster, then checks whether each send succeeded:

The producer only records these results as metrics and does not judge whether they are normal. Alarm rules can be configured separately for each monitoring (or inspection) scenario.

After receiving a message, the consumer sends an Ack to the producer over a separate TCP channel. The producer then performs distributed reconciliation and reports the result:

  • Message received
  • Message lost (or not received before the timeout)
  • Duplicate message
  • Latency between message production and final consumption
  • Failure to Ack the producer (handled by the consumer)

Similarly, the monitoring program is only responsible for recording metrics, and the alarm rules are configured separately.
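The reconciliation itself can be sketched as a small in-memory ledger on the producer side. This is an illustration only: the class Reconciler, its counters, and the choice to track the maximum latency are assumptions, and the Ack transport is left out.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical producer-side ledger for reconciling sent messages against Acks. */
class Reconciler {
    private final Map<String, Long> sentAt = new HashMap<>();      // msgId -> send time
    private final Map<String, Integer> ackCount = new HashMap<>(); // msgId -> Acks seen
    private final long timeoutMillis;

    int received;
    int lost;
    int duplicated;
    long maxLatencyMillis;

    Reconciler(long timeoutMillis) {
        this.timeoutMillis = timeoutMillis;
    }

    void onSend(String msgId, long nowMillis) {
        sentAt.put(msgId, nowMillis);
    }

    void onAck(String msgId, long nowMillis) {
        Long sent = sentAt.get(msgId);
        if (sent == null) {
            return; // unknown id, or already reconciled and forgotten
        }
        int n = ackCount.merge(msgId, 1, Integer::sum);
        if (n == 1) {
            // Only the first Ack counts toward production-to-consumption latency.
            maxLatencyMillis = Math.max(maxLatencyMillis, nowMillis - sent);
        }
    }

    /** Checks every message whose timeout has elapsed, then forgets it. */
    void reconcile(long nowMillis) {
        sentAt.entrySet().removeIf(e -> {
            if (nowMillis - e.getValue() < timeoutMillis) {
                return false; // not due for checking yet
            }
            Integer acks = ackCount.remove(e.getKey());
            if (acks == null) {
                lost++;
            } else {
                received++;
                if (acks > 1) {
                    duplicated++;
                }
            }
            return true;
        });
    }
}
```

The counters would be exported as metrics, with alarm thresholds defined elsewhere, in line with the split between recording and alerting described above.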

This mechanism can also be used for distributed performance testing and fault testing. During a stress test, Acking every message puts heavy pressure on the producer’s memory, because each sent message must stay in memory for a period of time (until its reconciliation check is due), and consumer Acks, including repeated Acks, must be recorded during that time. We therefore implemented proportional sampling for reconciliation, so that only the sampled messages are kept in memory.
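One way to sample proportionally is to decide from a hash of the message id, as in the sketch below. The class ReconcileSampler and the per-mille granularity are assumptions, and the actual sampling method used by the monitoring program is not described in this article.

```java
/** Hypothetical sampler: tracks roughly permille/1000 of all message ids. */
class ReconcileSampler {
    private final int permille; // e.g. 10 means about 1% of messages are tracked

    ReconcileSampler(int permille) {
        if (permille < 0 || permille > 1000) {
            throw new IllegalArgumentException("permille must be in [0, 1000]");
        }
        this.permille = permille;
    }

    /** Deterministic: the same message id always yields the same decision. */
    boolean shouldTrack(String msgId) {
        // floorMod keeps the bucket in [0, 999] even for negative hash codes.
        return Math.floorMod(msgId.hashCode(), 1000) < permille;
    }
}
```

A deterministic decision means a repeated Ack for the same id is treated consistently, and only ids for which shouldTrack returns true need to be held in memory until their check is due.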

By the way, our stress-test criteria are that asynchronous production never fails, consumption never lags, and no message is lost. This ensures that the test yields accurate performance numbers that the online system can rely on, rather than a large number obtained under idealized conditions. We evaluate with asynchronous production because it is more fragile than synchronous production and closer to the real production environment: with synchronous production, the client blocks when the broker jitters, which lowers the pressure on the broker and produces fewer failures, although the rate visibly fluctuates.

Performance optimization

The default Broker parameters are not optimal for our scenarios (SSD, synchronous replication, asynchronous flush), and some parameters may not be optimal for most scenarios. Here are some important parameters for your reference:

To summarize:

  • Thanks to a simple, almost zero-dependency deployment model, we can deploy small clusters very cheaply.
  • We make no invasive changes to the community version, so we can upgrade promptly.
  • A unified SDK entry point makes cluster maintenance and feature upgrades easier.
  • Multi-data-center active-active deployment is achieved by combining small clusters with automatic load balancing.
  • We take full advantage of RocketMQ’s capabilities, such as transactional messages and (enhanced) delayed messages, to meet diverse business needs.
  • Automated distributed reconciliation monitors the correctness of every Broker and of our SDK.

This article also shares some performance parameters, but only briefly: it says how to tune them, not why. We will write another article later to explain the details. RocketMQ is now used by most of the company’s lines of business, and we look forward to further growth in the future!

This article is original content of Alibaba Cloud and may not be reproduced without permission.