preface

Through a brief review of the history of Aliware messaging engine, this paper begins with the challenge of low latency faced by the Double eleven messaging engine. Through the classic application scenario, it describes the problems that may be faced — slow response, avalanche, poor user experience, and subsequent trading decline. In order to deal with these uncontrollable flood peak data, the middleware team, through a lot of research and practice, launched a low latency and high availability solution, which has certain universality in the field of distributed storage. On this basis, through the planning of the existing limited resources, it also launched a hierarchical capacity guarantee strategy, which can effectively guarantee the high throughput of key businesses through flow limiting, downgrade, and even circuit breaker technology. The successful support group, including overseas businesses, smoothly and comfortably passed the Double 11 peak. At the same time, in some highly reliable and highly available scenarios, the middleware team focused on the introduction of high-availability solutions based on multi-copy mechanism, which can dynamically identify disaster scenarios such as machine downtime and machine room disconnection, and automatically realize the master/standby switchover. The whole switching process is transparent to users and requires no intervention from operation and maintenance developers, greatly improving the reliability of message storage and the high availability of the entire cluster.

1. Family history of messaging engines

Ali middleware message engine has gone through three generations of evolution until today. The first generation, push mode, data storage using relational databases. In this mode, the message has a very low latency, especially in the high-frequency trading scenario of Ali Taobao, which is very widely used. Second generation, pull mode, proprietary message storage of your own development. It can match the throughput performance of Kafka, but considering the application scenarios of Taobao, especially its high reliability scenarios such as transaction links, the message engine does not pursue throughput, but puts stability and reliability in the first place. Because of the use of long – link pull mode, the real-time message is not inferior to push mode. After years of online matchmaking for the first two generations, the middleware team developed RocketMQ in 2011, a high-performance, low-latency messaging engine that features pull mode and push mode. In 2012, it was open source, and experienced 6 years of Double 11 core transaction link test, which became stronger as time went by. It has been donated to the Apache Foundation (ASF) and is expected to become the third heavyweight distributed messaging engine in the Apache community after ActiveMQ and Kafka. Up to now, RocketMQ has served thousands of applications of Alibaba Group well. On The day of Double 11, there was even an incredible trillion-dollar message flow, playing a pivotal role in the stability of the group.

2. Low latency usability exploration

The wind is blowing fast, and the sails are flying. Thousands of miles in the Russian ares, three rivers sit super suddenly. – meng haoran

2.1 Low latency and availability

As the Java language ecosystem improves and JVM performance improves, C and C++ are no longer the only options for low-latency scenarios. This section highlights some of RocketMQ’s explorations in low-latency availability. Application performance metrics are generally measured in terms of throughput and latency. Throughput refers to the number of requests a program can handle over time. Latency is the end-to-end response time. Low latency can be defined differently in different environments. For example, low latency can be defined as within 200ms in a chat application and within 10ms in a transaction system. Latency is influenced by many factors as opposed to throughput, such as CPU, network, memory, operating system, etc. According to Little’s Law, when the latency becomes high, the requests residing in the distributed system will increase dramatically, resulting in the unavailability of some nodes, and even the unavailability state will spread to other nodes, resulting in the loss of service capacity of the whole system, which is also known as avalanche. So building low-latency applications is a great way to improve the usability of the overall distributed system.

2.2 Low latency exploration path

RocketMQ, as a messaging engine, is best used for asynchronous decoupling and peak filling. On the one hand, distributed applications use RocketMQ for asynchronous decoupling, allowing applications to expand and shrink at will. On the other hand, when the flood peak data arrives, a large number of messages need to be piled into RocketMQ, and the back-end application can read the data at its own consumption rate. It is therefore critical to ensure low latency for the RocketMQ write message link. During this year’s Double 11, Tmall released a new game called Hongbao Volcano. The game is very latency sensitive and can only tolerate delays of up to 50ms, and RocketMQ wrote messages with a significant delay of 50 to 500ms in the early stages of the pressure test, resulting in a large number of failures at the peak of the red envelope eruption that severely impacted the front-end business. The graph below shows the statistics of the delay of writing messages in the compression red packet cluster.

As a pure Java messaging engine, RocketMQ’s indigenous storage components rely on Page Cache for acceleration and heap, which means performance is affected by factors such as the JVM, GC, kernel, Linux memory management mechanisms, file IO, and so on. As shown in the figure below, there is a risk of delay at every step from sending a message from the client to persisting. Based on observations of online data, the RocketMQ write message link had occasional delays of up to several seconds.

2.2.1 the JVM pauses

JVM (Java virtual machine) can produce many pauses during running. Common examples are GC, JIT, RevokeBias, RedefineClasses, etc. The biggest application impact is GC pauses. RocketMQ tries to avoid Full GC, but pauses caused by Minor GC are inevitable. Tuning for GC is a very Galileo problem and requires extensive testing to help applications tune GC parameters, such as by adjusting heap size, timing of GC, optimizing data structures, and so on. For other JVM pauses, can pass – XX: + PrintGCApplicationStoppedTime the JVM pauses output to the GC log. + PrintSafepointStatistics – by – XX: XX: PrintSafepointStatisticsCount = 1 output specific pause, and targeted optimization. For example, in RocketMQ it was found that RevokeBias generated a lot of pauses and the bias locking feature was turned off with -xx: -usebiasedlocking. In addition, the output of GC logs can cause file IO, sometimes causing unnecessary pauses. You can export GC logs to TMPFS (memory file system), but TMPFS consumes memory. To avoid wasting memory, you can use -xx :+UseGCLogFileRotation to roll GC logs. In addition to the FILE IO generated by GC logs, the JVM will output some statistics required by the jstat command to the/TMP (hsperfData) directory. You can disable this feature by using -xx :+PerfDisableSharedMem and use JMX instead of Jstat.

2.2.2 Lock — The “benefit” of synchronization

As a critical section protection mechanism, locks are widely used in the development of multithreaded applications. However, locks are a double-edged sword. Excessive or incorrect use of locks will lead to performance degradation of multi-threaded applications. In Java, unfair lock is adopted by default. The lock is directly obtained without considering the queuing problem. If the lock fails to be obtained, the lock is automatically queued. Unfair locks cause threads to wait too long and delay too much. If fair lock is adopted, it will bring greater performance loss to the application. Synchronization, on the other hand, causes context switching, which incurs some overhead. Context switching is generally microsecond level, but when there are too many threads and the race pressure is high, tens of milliseconds of overhead can be incurred. Locksupport. park can be used to simulate a context switch for testing. To avoid the latency associated with locking, the CAS primitive was used to make RocketMQ core links lock-free, significantly improving throughput while reducing latency.

2.2.3 Memory – not so fast

Due to the memory management mechanism of Linux, applications can sometimes experience high latency when accessing memory. There are two main types of memory in Linux: anonymous memory and Page Cache. Linux uses as much memory as possible for caching, and in most cases, the server has less memory available. When the available memory is small, the application application or access to new memory pages will cause memory reclamation. When the speed of background memory reclamation is not as fast as the speed of allocated memory, the application will enter Direct Reclaim, and the application will spin to wait for the completion of memory reclamation, resulting in a huge delay, as shown in the following figure.

On the other hand, the kernel also reclaims anonymous memory pages, and the next access after the anonymous memory page is paged out will generate file IO, resulting in latency, as shown in the figure below.

The delays in both cases can be avoided by tuning kernel parameters (vm.extra_free_kbytes and vm.swappiness). Linux generally manages memory on a per-page basis. A page is usually 4k in size. When there is read and write contention on the same page of memory, there will be latency.

2.2.4 Page Cache — Pros and cons

Page Cache is a Cache of files that is used to speed up reading and writing to files, and it provides RocketMQ with more heap power. RocketMQ maps data files to memory, writes messages to the Page Cache first, and persists messages in an asynchronous flush mode (which also supports synchronous flush). Messages can be read directly from the Page Cache. This is the pattern commonly used by distributed storage products in the industry, as shown in the following figure:

In most cases, the read/write speed of this mode is relatively fast. However, when the operating system performs dirty page write back, memory reclamation, and memory swap out, a large read/write delay is generated, resulting in high latency of the storage engine occasionally. RocketMQ addresses this by using optimization techniques such as memory preallocation, file preheating, Mlock system calls, and read/write separation to ensure that the Page Cache benefits while eliminating latency.

2.3 Optimization Results

RocketMQ successfully eliminated the high latency of writing messages by optimizing the above situation and passed the test of This year’s Double 11. The optimized message write time thermal diagram is shown in the following figure.

Optimized RocketMQ write message latency is 99.995% within 1ms and 100% within 100ms, as shown in the figure below.

3 Capacity guarantee three magic weapons

He did what he did, and the wind blew over the hills. He let him cross, the moon shines on the river. — Jiuyang true Heart method

The guarantee of low-latency optimization does not mean that the messaging engine can rest easy. To give applications a silky, smooth experience, messaging engines must be flexible in capacity planning. How to make the system laugh in the face of surging flood peaks? Downgrade, limit current, fuse three magic weapons have come into use. Discarding the pawn guarantees the vehicle, at the cost of degrading, suspending edge services, components to protect the resources of core services, the system is not destroyed by sudden traffic as the first priority. As is called, he is strong, he is strong, the wind blows the hills. He let him cross, the moon shines on the river! From an architectural stability point of view, with limited resources, there is a limit to what can be provided per unit of time. If the capacity is exceeded, the whole service may stop, the application Crash, and then the risk may be passed to the service caller, resulting in the loss of the service capacity of the whole system, and then trigger an avalanche. In addition, according to the queuing theory, the average response time of services with latency will increase rapidly as the number of requests increases. In order to ensure the SLA of services, it is necessary to control the number of requests per unit time. That’s why limiting the flow is increasingly important. The concept of Traffic Shaping is also called Traffic Shaping in academic circles. Originating in the field of network communications, the leaky bucket algorithm and token bucket algorithm are typical.

The basic idea of the leaky bucket algorithm is that there is a bucket (which leaks), water drips out at a constant rate, and water droplets from above (request) enter the bucket. If the rate at which the top drops enter exceeds the rate at which the water drops out, the bucket overflows, i.e., request overload. The basic idea of token bucket algorithm is that there is also a bucket, tokens are put into the bucket at a constant rate, the number of tokens in the bucket has an upper limit, each request will acquire a token, if a request comes and there are no tokens in the bucket, then the request is overloaded. Obviously, token buckets have the problem of sudden surges in requests.

Leaky buckets, token buckets, or any other variant of the algorithm can be regarded as speed limiting, as can RateLimiter in Guava, or TrafficShaping in Netty. In addition, there are limiting modes that control concurrency, such as Semaphore in the OPERATING system and Semaphore in the JDK. Asynchronous decoupling, peak-cutting, and valley filling are the capabilities of message engines. Try your Best itself is the original intention of its design (in RPC, application gateway, container and other scenarios, speed control should be the first choice for flow control). But even so, some necessary flow control needs to be considered. Unlike the previous ones, RocketMQ doesn’t have Guava, Netty and other out-of-the-box speed flow control components built into it. Instead, the slow requests are fault-tolerant by referring to queuing theory. Slow requests refer to those whose queuing time and service time exceed a certain threshold. In offline application scenarios, fault tolerance is to use the sliding window mechanism to slowly shrink the window to slow down the frequency and size of messages pulled from the server and reduce the impact on the server. In high frequency trading and data replication scenarios, a fast failure policy is adopted to prevent application avalanches caused by resource exhaustion of interlocking applications, reduce server pressure effectively, and ensure reliable end-to-end latency. Service degradation is a typical lose pawn security car, 20-80 principle practice. And the means of degradation is no more than closing, offline and other “simple and rough” operation. The choice of degraded target comes more from the definition of service QoS. The early processing of message engine for degradation mainly comes from two aspects, on the one hand, the collection of user data, and on the other hand, the service QoS setting of engine components. For the former, the application QoS data is pushed by the O&M management and control system. The following table is generally output. The service QoS of engine components, such as the link trace component for message problem tracing, is relatively low rated for core functions and can be turned off before the flood peak arrives.

When it comes to fuse, we have to mention the fuse in the classical power system. When the load is too large, or the circuit is faulty or abnormal, the current will continue to rise. In order to prevent the rising current may damage some important or valuable components in the circuit, burn the circuit or even cause a fire. The fuse will fuse and cut off the current when the current rises to a certain height and heat, thus protecting the safe operation of the circuit. Similarly, in a distributed system, if a remote service or resource is called that is unavailable for some reason, without this overload protection, the requested resource can be blocked waiting on the server, depleting the system or server resources. Most of the time, there may be a partial or small-scale fault in the system at the beginning. However, due to various reasons, the scope of the impact of the fault becomes larger and larger, resulting in global consequences. This overload protection is known as a Circuit Breaker. Netflix has opened source their circuit-breaker solution, Hystrix, to solve this problem.

The three diagrams above describe the system from its initial health state to a scenario where a critical dependent component is blocked downstream in a high-concurrency scenario. This situation can easily trigger an avalanche effect. By introducing Hystrix’s circuit breaker mechanism, apps fail quickly, thus avoiding the worst.

Borrowed from Hystrix, the middleware team developed a message engine circuit breaker mechanism. During the pressure test preparation period, there was a service failure due to machine hardware. With normal fault tolerance, there is a 30-second wait for unusable machines to be removed from the list. But with this circuit-breaker mechanism, abnormal services can be identified and isolated within milliseconds. Further improved engine availability.

4. High availability solutions

Those who were good at fighting in the past must be invincible before the enemy can win. You cannot overcome yourself, but you can overcome the enemy. Therefore, the good fighter can make the invincible, but cannot make the enemy invincible. Thus said: win, but not for. – sun tzu

Although there are three magic weapons of capacity guarantee as the basis, but with the continuous rise of the size of the message engine cluster, the possibility of machine failure in the cluster increases after reaching a certain degree, which seriously reduces the reliability of the message and the availability of the system. At the same time, the cluster mode based on multi-machine room deployment will also cause machine room disconnection and further reduce the availability of messaging system. To this end, Aliware focuses on the introduction of high-availability solutions based on multiple copies, dynamic recognition of machine failures, machine room disconnection and other disaster scenarios, to achieve automatic fault recovery; The recovery process is transparent to users without intervention of O&M personnel, which greatly improves the reliability of message storage and ensures the high availability of the entire cluster. High availability is an important feature that must be considered in the design of almost every distributed system. Consistency, availability and fault tolerance of partitions cannot be met simultaneously in distributed systems, and only two of them can be satisfied at most). Some common high availability solutions for distributed systems have been proposed, as shown in the following figure:

Where, rows represent common high availability solutions in distributed systems, including cold backup, Master/Slave, Master/Master, two-phase commit, and paXOS-based solutions. Columns represent the metrics of concern to distributed systems, including data consistency, transaction support, data latency, system throughput, data loss probability, and automatic recovery from failures. As can be seen from the figure, different solutions have different support levels for each indicator. Based on the CAP principle, it is difficult to design a high availability solution that can simultaneously meet the optimal values of all indicators. Taking Master/Slave as an example, it generally meets the following characteristics: 1) Slave is a backup of Master, and the number of slaves can be set according to the importance of data. The data write request matches the Master, and the data read request matches the Master or Slave. 2) After a write request matches the Master, data can be synchronized or asynchronously copied from the Master to the Slave. In synchronous replication mode, data is reported to the client only after data is written successfully to both the Master and Slave. In asynchronous replication mode, the Master writes data successfully to the client. Data is copied from the Master to the Slave synchronously or asynchronously. Therefore, the Master/Slave structure ensures data consistency. In asynchronous replication mode, data written to the Master is reported to the client successfully. Therefore, the system has low latency and high throughput, but the Master may lose data due to faults. If data is not lost when the Master fails in asynchronous replication mode, the Slave can Only wait for the Master’s recovery in read-only mode, which prolongs the system recovery time. On the contrary, synchronous replication in the Master/Slave structure increases the data writing delay and reduces the throughput of the system to ensure that data is not lost when a machine fails and the system fault recovery time is reduced.

RocketMQ high availability architecture

RocketMQ designs and implements a Master/Slave high availability architecture based on the original multi-room cluster deployment using distributed locking and notification mechanisms and the Controller component, as shown in the following figure:

As A distributed scheduling framework, Zookeeper must be deployed in at least three machine rooms A, B, and C to ensure its high availability. Zookeeper provides the following functions for the RocketMQ high availability architecture: 1) Maintaining PERSISTENT nodes and storing active and standby state machines; 2) Maintain the EPHEMERAL node to save the current state of RocketMQ; 3) Notify the observer when the status of the active and standby state machines and servers changes. RocketMQ uses the Master/Slave structure to implement peer-to-peer deployment of multiple equipment rooms. Write requests match the Master and are then copied to the Slave for persistent storage synchronously or asynchronously. The read requests of messages preferentially match the Master. If the disk pressure is heavy due to message accumulation, the read requests are transferred to the Slave. RocketMQ interacts directly with Zookeeper by: 1) Reporting the current status to Zookeeper as a temporary node; 2) Monitor the change of the active and standby state machines on Zookeeper as an observer. If the active and standby state machines change, change the current state based on the latest state machine. The RocketMQ HA Controller is A stateless component that reduces the system fault recovery time in the messaging engine high availability architecture. It is distributed in three machine rooms A, B, and C. Its main responsibilities are as follows: 1) Monitor RocketMQ status changes on Zookeeper. 2) Control the switchover between active and standby state machines based on the cluster status and report the latest active and standby state machines to Zookeeper. Due to the complexity of the system and the adaptation of the messaging engine to CAP principles, the RocketMQ high availability architecture uses a Master/Slave structure to provide low latency and high throughput messaging services, and uses a synchronous replication of Master and Slave to avoid message loss in the event of a failure. During data synchronization, an increasing globally unique SequenceID is maintained to ensure strong data consistency. In addition, an automatic fault recovery mechanism is introduced to reduce the fault recovery time and improve system availability.

5.1 Usability evaluation

System Availability is used by the information industry to measure the ability of an information system to provide continuous service. It refers to the probability that the system or a system capability can work normally in a specific environment within a given time interval. Simply put, availability is the sum of mean time between failures (MTBF) divided by mean time between failures (MTBF) and mean time to repair (MTTR), i.e. :

Generally, N nines are used to represent system availability in the industry. For example, 99.9% represents the availability of three nines, which means that the annual unavailable time is less than 8.76 hours. 99.999% represents the availability of five nines, which means that the time of unavailability must be guaranteed within 5.26 minutes throughout the year. A system without automatic fault recovery mechanism will be difficult to achieve the high availability of five nines.

5.2 RocketMQ high availability guarantee

According to the availability calculation formula, in order to improve the availability of the system, it is necessary to ensure the robustness of the system to prolong the average failure time, and further strengthen the automatic fault recovery ability of the system to shorten the average fault recovery time. The RocketMQ high availability architecture designs and implements the Controller component, which transitions from a finite state machine of single master state, asynchronous replication state, semi-synchronous state, and finally synchronous replication state. In the final synchronous replication state, if any of the Master and Slave nodes fails, the other nodes can switch to the single Master state within seconds. Compared with manual restart to restore services, the RokcetMQ high availability architecture enables automatic fault recovery, which greatly reduces the average fault recovery time and improves system availability.

The following diagram depicts the transition of finite state machines in the RocketMQ high availability architecture:

1) After the first node is started, the Controller state machine switches to the single Master state and notifies the starting node to provide services as Master. 2) After the second node is started, the Controller controls the state machine to switch to the asynchronous replication state. The Master asynchronously replicates data to the Slave. 3) When the Slave data is about to catch up with the Master, the Controller controls the state machine to switch to semi-synchronous state, and any write request that hits the Master will be held until the Master asynchronously copies all the differential data to the Slave. 4) When the Slave completely catches up with the Master in the semi-synchronous state, the Controller switches the state machine into synchronous replication mode, and Mater starts to synchronize data to the Slave. In this state, if any node fails, other nodes can switch to the single active state within seconds to continue providing services. The Controller component controls RocketMQ to switch state machines in the order of single master, asynchronous replication, semi-synchronous, and synchronous replication. The residence time of the intermediate state is related to the data difference between the master and the slave and the network bandwidth. However, the intermediate state stabilizes in the synchronous replication state.

Looking forward to

Although ali middleware message engine has experienced so many years of harsh tests of online comparable conditions, there is still room for optimization. For example, the team is trying to further reduce message low-latency storage by optimizing storage algorithms, cross-language calls and other strategies. In the face of the mobile Internet of things, big data, VR emerging scenarios, such as in front of the open and commercial ecological worldwide, team set out to build 4th generation messaging engine, QoS, multi-stage agreement across a network, across different terminals, language support, online application oriented lower response time, higher throughput for offline applications, adhere to the open source, back to the idea of open source, Believe RocektMQ is moving towards a healthier ecology.

reference

[1] Ryan Barrett. Snarfed.org/transaction… [2] www.slideshare.net/vimal25792/… [3] systemdesigns.blogspot.com/2015/12/rat… [4]Little J D C, Graves S C. Little’s Law [M]//Building Intuition. Springer US, 2008: The 81-100. [5] access.redhat.com/documentati… [6] highscalability.com/blog/2012/3… [7] www.azul.com/files/Enabl…