Author | Shao Shu

Review & proofreading: years, jia Jia

Editing & Typesetting: Wen Yan

Preface

Message queues are essential infrastructure in distributed Internet architectures and play an important role in the following scenarios:

  • Application decoupling
  • Peak shaving
  • Asynchronous notification
  • Distributed transactions
  • Big data processing

They are also used in interactive live streaming, mobile Internet and the Internet of Things, real-time IM communication, cache synchronization, log monitoring, and other fields.

This article compares the commercial version of message queue RocketMQ with the open source version, and uses several real-world scenarios to demonstrate best practices for moving large distributed applications to the cloud.

Core competencies

The commercial version of message queue RocketMQ has several advantages over the open source version and competing products:

  1. Out of the box, rich in features
  2. High performance, unlimited expansion capability
  3. Observability and O&M capabilities
  4. High SLA and stability assurance

Out of the box, rich in features

Message queue RocketMQ supports multiple message types, including scheduled, transactional, and ordered messages, and both broadcast and cluster consumption modes. At the protocol level it supports both TCP and HTTP, and it provides TAG and SQL attribute filtering, which greatly broadens the scenarios in which it can be used.

High performance, unlimited expansion capability

Message queue RocketMQ has stood the test of Alibaba’s core e-commerce business on November 11 (Double 11), sending and receiving messages at tens of millions of TPS and holding backlogs of hundreds of millions of messages. It also guarantees millisecond-level end-to-end latency and provides tiered storage so that massive volumes of messages can be kept for an arbitrary length of time.

Observability and O&M capabilities

Message queue RocketMQ provides an observability dashboard with fine-grained data views, end-to-end tracing and querying of the message life cycle, and monitoring and alerting for each metric. In addition, it provides message backtracking and dead-letter queues, so that users’ messages can be traced back and consumed again at any time.

High SLA and stability assurance

Stability is an area of message queue RocketMQ in which we invest continuously. It provides high-availability deployment and multi-replica writes; in addition, it supports same-city multi-AZ disaster recovery and cross-region active-active disaster recovery.

Product profile

Next, we pick several of the core competencies above and introduce them in more detail through specific scenarios and practices.

Multiple message type support

Highly available ordered messages

The commercial version of message queue RocketMQ uses what we call highly available ordered messages. Before introducing them, let’s take a quick look at ordered messages in the open source version of RocketMQ.

There are two types of ordered messages: globally ordered messages and partitionally ordered messages.

  • Globally ordered messages: Only one partition is allocated in the RocketMQ storage tier, so the availability of a globally ordered topic is tied to the availability of a single replica, and the topic cannot scale out.
  • Partitionally ordered messages: All messages are partitioned by Sharding Key. Messages within the same partition are published and consumed in strict FIFO order. The Sharding Key is the key field used to assign ordered messages to partitions.
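As a rough illustration of how a Sharding Key pins related messages to one partition, the sketch below hashes the key and takes the result modulo the partition count. The function names select_partition and route, and the choice of CRC32 as the hash, are assumptions made for this example; the selection logic actually used by RocketMQ may differ.

```python
import zlib


def select_partition(sharding_key: str, partition_count: int) -> int:
    """Map a sharding key to a partition index with a stable hash."""
    if partition_count <= 0:
        raise ValueError("partition_count must be positive")
    # crc32 gives the same value in every process, unlike Python's built-in hash().
    return zlib.crc32(sharding_key.encode("utf-8")) % partition_count


def route(messages, partition_count):
    """Append each (sharding_key, body) pair to its partition in send order."""
    partitions = [[] for _ in range(partition_count)]
    for key, body in messages:
        partitions[select_partition(key, partition_count)].append((key, body))
    return partitions
```

Because every message with the same order ID lands in the same list and lists are append-only, the events of one order keep their send order even when other orders are interleaved with them.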

The following figure shows an application scenario for partitionally ordered messages, where the order ID is the Sharding Key.

As you can see, both globally ordered and partitionally ordered messages rely on the natural FIFO behavior of a single partition, so ordering is guaranteed only within one partition. When the replica hosting that partition becomes unavailable, ordered messages cannot be retried on another replica, and the message order becomes difficult to guarantee.

To solve this problem, we designed and implemented highly available ordered messages.

Highly available ordered messages have the following characteristics:

  • A logical ordered partition (PartitionGroup) contains multiple physical partitions.
  • As long as any one of the physical partitions is writable, the entire logical partition is writable and ordered.
  • We designed a sorting algorithm based on partition offsets, following the happened-before principle.
  • Using this algorithm, a consumer that consumes a logical partition pulls messages from each of its physical partitions and merge-sorts them to obtain the correct message sequence.
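The article does not spell out the sorting algorithm, so the following is only a sketch of the general idea of merging several physical partitions into one ordered stream. It assumes, purely for illustration, that every stored message carries a logical_seq stamp that increases in happened-before order and that each physical partition is already sorted by it; logical_seq and StoredMessage are hypothetical names, not RocketMQ fields.

```python
import heapq
from typing import NamedTuple


class StoredMessage(NamedTuple):
    logical_seq: int  # hypothetical ordering stamp, assumed for this sketch
    body: str


def merge_physical_partitions(physical_partitions):
    """K-way merge of streams that are each already sorted by logical_seq."""
    return list(heapq.merge(*physical_partitions, key=lambda m: m.logical_seq))
```

For example, merging a partition holding stamps 1 and 4 with one holding stamps 2, 3, and 5 yields the messages in the order 1 to 5, regardless of which physical partition each write happened to land on.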

With this design, highly available ordered messages solve the following problems:

  • Availability: Highly available ordered messages have the same availability as normal messages; if one replica is unavailable, sending can quickly be retried on another replica.
  • Scalability: Ordinary ordered messages, especially globally ordered ones, scale poorly because they are pinned to a specific replica. The logical ordered partition of highly available ordered messages can spread its physical partitions across multiple replicas.
  • Hot spots: Ordinary ordered messages hash all messages with the same key to the same partition, so a hot key creates a hot partition. Highly available ordered messages can scale horizontally by adding physical partitions to a logical ordered partition, which eliminates the hot spot.
  • Single point of failure: Ordinary globally ordered messages have only one partition and are therefore prone to single points of failure. Highly available ordered messages eliminate this single point of failure.

The hot spot problem deserves particular attention. During a major sales promotion for one of Alibaba’s internal e-commerce businesses, too many messages with one specific Sharding Key were sent to an ordered topic, so one replica in the cluster received a large share of them and exceeded its load limit. Messages were delayed and piled up, which affected the business to some extent. After the switch to highly available ordered messages, load is balanced across multiple physical partitions, which raises the cluster’s capacity for ordered messages and avoids the hot spot.

Second-level precision scheduled messages

Scheduled messages are messages that the client sends now but expects to be delivered at a certain time in the future. They are widely used in scheduling systems and business systems. For example, when an order is submitted for payment, a payment message is generated, and the system usually needs to process that message after a certain period of time to check whether the user has paid successfully and then act accordingly.

The open source version of RocketMQ supports only a few fixed delay levels and does not support scheduled messages with second-level precision. Because of the diverse demands within the Group and on the cloud, the open source scheduled messages could not meet our needs, so we launched second-level precision scheduled messages.

As shown in the figure below, we designed and implemented second-level precision scheduled messages based on a time wheel. They support arbitrary delivery times and have the following characteristics:

  • Arbitrary delivery time
  • Very long delays
  • Massive numbers of scheduled messages
  • Deletion of scheduled messages
  • High availability
  • High performance
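To make the time wheel idea concrete, here is a minimal in-memory sketch, not the product's implementation. It assumes a single-level wheel with one-second slots; the class name TimingWheel and its methods are hypothetical. A message is placed in the slot given by its delivery second modulo the wheel size, and entries that share a slot but belong to a later revolution are told apart by their stored due time.

```python
class TimingWheel:
    """Single-level hashed timing wheel with one-second slots."""

    def __init__(self, slots: int = 60, start: int = 0):
        self.slots = slots
        self.now = start  # current time in whole seconds
        self.buckets = [[] for _ in range(slots)]

    def schedule(self, deliver_at: int, message: str) -> None:
        """Register a message for delivery at an absolute second."""
        if deliver_at <= self.now:
            raise ValueError("deliver_at must be in the future")
        self.buckets[deliver_at % self.slots].append((deliver_at, message))

    def tick(self):
        """Advance one second and return the messages that are now due."""
        self.now += 1
        bucket = self.buckets[self.now % self.slots]
        due = [m for t, m in bucket if t <= self.now]
        # Keep entries that belong to a later revolution of the wheel.
        bucket[:] = [(t, m) for t, m in bucket if t > self.now]
        return due
```

Each tick inspects only one slot, which is why the structure copes with large numbers of pending messages; supporting very long delays and deletion, as listed above, would need additional machinery that this sketch leaves out.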

An internal user needed to process a scheduled request at a specific point 30 seconds in the future. The scheduled messages of the open source version could not meet this requirement, whereas second-level precision scheduled messages met it while ensuring high availability and high performance.

Distributed transaction messages

As shown in the figure below, in traditional transaction processing the interactions among multiple systems are coupled into a single transaction. This leads to a long overall response time and a complex rollback process, which can affect system availability. RocketMQ provides distributed transaction messages, which implement distributed transactions while keeping systems loosely coupled and ensuring eventual data consistency.

Message queue RocketMQ processes a transaction message in the following steps:

  • The sender sends a half (semi-transactional) message to the message queue RocketMQ server.
  • After the server persists the message successfully, it returns an Ack to the sender to confirm that the message was sent. At this point the message is a half message and is not yet delivered to subscribers.
  • The sender starts executing the local transaction logic.
  • Based on the result of the local transaction, the sender submits a secondary confirmation (Commit or Rollback) to the server. If the server receives Commit, it marks the half message as deliverable and the subscriber eventually receives it. If the server receives Rollback, it deletes the half message and the subscriber never receives it.
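The four steps above can be sketched with a small in-memory simulation. This is not the RocketMQ client API: Broker, TxState, and send_in_transaction are hypothetical names, and the sketch leaves out networking, persistence, and the case where the secondary confirmation is lost.

```python
from enum import Enum


class TxState(Enum):
    COMMIT = "commit"
    ROLLBACK = "rollback"


class Broker:
    """In-memory stand-in for the server side of the transaction flow."""

    def __init__(self):
        self.half_messages = {}  # persisted but invisible to subscribers
        self.deliverable = []    # visible to subscribers
        self._next_id = 0

    def send_half(self, body):
        # Steps 1-2: store the half message and acknowledge with its id.
        self._next_id += 1
        self.half_messages[self._next_id] = body
        return self._next_id

    def end_transaction(self, msg_id, state):
        # Step 4: Commit makes the message deliverable, Rollback discards it.
        body = self.half_messages.pop(msg_id)
        if state is TxState.COMMIT:
            self.deliverable.append(body)


def send_in_transaction(broker, body, local_transaction):
    msg_id = broker.send_half(body)
    try:
        # Step 3: run the local transaction only after the half message is stored.
        ok = local_transaction()
    except Exception:
        ok = False
    state = TxState.COMMIT if ok else TxState.ROLLBACK
    broker.end_transaction(msg_id, state)
    return state
```

Calling send_in_transaction with a local transaction that returns True leaves the message in deliverable; one that returns False or raises leaves nothing behind for subscribers.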

With this mechanism, we implement distributed transactions through messages: the outcome of the local transaction determines whether the subscriber receives the message.

Distributed transaction messages of message queue RocketMQ are widely used in Alibaba’s core transaction link. They keep the transaction unit as small as possible: a transaction is formed only between the transaction system and the message queue, while downstream systems (shopping cart, points, and others) are isolated from one another and process messages in parallel.

Tiered storage

Background

As the number of customers on the cloud grows, storage has become a major bottleneck for RocketMQ operations, including but not limited to the following:

  • Memory is limited, so the server cannot cache all user data in memory. In a multi-tenant scenario, when one user pulls cold data, the heavy disk I/O affects other users in the shared cluster. Hot and cold data therefore need to be separated.
  • On the cloud, individual tenants need to customize how long their messages are stored. However, a RocketMQ Broker stores the messages of all users in one contiguous file, so the storage duration cannot be customized for a single user; the existing storage structure cannot meet this requirement.
  • A cheaper way to store massive amounts of data would significantly reduce the disk storage cost of RocketMQ on the cloud.

To address these issues, we developed the tiered storage solution.

Architecture

The overall architecture of tiered storage is as follows:

  1. The Connector node synchronizes messages from the Broker to OSS in real time.
  2. The historyNode node forwards user pull requests for cold data to OSS.
  3. In OSS, files are organized at Queue granularity: each Queue is stored in its own file, which makes it possible to define the message storage duration per tenant.
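To show why per-Queue files make per-tenant retention possible, here is a toy sketch of a cold tier that keeps one message list per queue and expires each list with its own retention period. ColdStore, its methods, and the queue names are all hypothetical; this does not model OSS or the actual file layout.

```python
from collections import defaultdict


class ColdStore:
    """Toy cold tier that keeps one message list per queue."""

    def __init__(self, default_retention_s: int):
        self.default_retention_s = default_retention_s
        self.retention_s = {}           # queue -> custom retention in seconds
        self.files = defaultdict(list)  # queue -> [(stored_at, body), ...]

    def set_retention(self, queue: str, seconds: int) -> None:
        self.retention_s[queue] = seconds

    def append(self, queue: str, stored_at: int, body: str) -> None:
        self.files[queue].append((stored_at, body))

    def expire(self, now: int) -> None:
        """Drop messages older than their own queue's retention."""
        for queue, entries in self.files.items():
            keep_for = self.retention_s.get(queue, self.default_retention_s)
            entries[:] = [(t, b) for t, b in entries if now - t <= keep_for]

    def read(self, queue: str):
        return [b for _, b in self.files[queue]]
```

Because expiry walks each queue separately, extending one tenant's retention never forces other tenants' data to be kept, which a single shared contiguous file cannot offer.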

Through this design, we achieve hot and cold separation of message data.

Usage scenarios

Based on tiered storage, we further expand the usage scenarios of users:

  1. Custom storage duration: After hot and cold message data are separated, cold data is stored in a storage system such as OSS, which allows user-defined storage durations.
  2. Message audit: Once message retention can be extended from a few days to a custom duration, messages change from temporary in-transit data into data assets of the user, and the messaging system changes from a data hub into a data warehouse. Users can build a wide variety of auditing, analysis, and processing functions on top of this data warehouse.
  3. Message replay: Message replay is very important in stream computing. With longer message retention, stream computing can support richer computation and analysis scenarios.

Stability

Stability is an area of message queue RocketMQ in which we invest continuously. Before introducing our latest work on stability, let’s walk through the evolution of the RocketMQ high availability architecture.

Highly available architecture evolution

In 2012, RocketMQ was launched as Alibaba’s new-generation messaging engine and was subsequently open-sourced, which produced the first generation of the RocketMQ high availability architecture. As shown in the following figure, the first-generation architecture adopted the then-popular Master-Slave design. Write traffic went through the Master node; read traffic also went through the Master node, and consumption records were synchronized to the Slave node. When the Master node was unavailable, the entire replica group was readable but not writable.

In 2016, RocketMQ was commercialized as a cloud product. In the cloud era, single points of failure are frequent, and cloud products need to be designed for failure. RocketMQ therefore launched its second-generation multi-replica architecture, which relies on ZooKeeper’s distributed locks and notification mechanism. A Controller component was introduced to monitor Broker status and drive the active/standby state machine, so that when the active Broker becomes unavailable, the standby Broker is automatically promoted. The second-generation architecture was the core high availability architecture while the cloud product scaled up, and it contributed greatly to that growth.

In 2018, the RocketMQ community showed strong interest in introducing distributed consensus protocols such as Paxos and Raft. The RocketMQ development team released DLedger, a storage engine based on the Raft protocol, in the open source community, giving RocketMQ native support for Raft-based multi-replica storage.

The RocketMQ high availability architecture has thus gone through three generations. In practice, across the diverse scenarios of the Group, the public cloud, and proprietary clouds, we found that all three architectures have drawbacks:

  • The first-generation active/standby architecture provides only cold backup, and the active/standby switchover requires manual intervention, which wastes significant resources and O&M effort at large scale.
  • The second-generation architecture introduces ZooKeeper and Controller nodes, which makes the architecture more complex. The active/standby switchover is automated, but failover is slow: electing a new master usually takes about 10 seconds.
  • The third-generation Raft architecture has not yet been applied at large scale on the cloud or within Alibaba Group. The Raft protocol requires a leader election, and the new leader must then be discovered by clients through routing, so the overall failover time is still long. In addition, the strongly consistent Raft version does not support flexible degradation strategies, so availability and reliability cannot be traded off flexibly.

To cope with the growing business scale on the cloud, stricter SLA requirements, and complex and changeable proprietary cloud deployment environments, the messaging system needs a solution that is simple in architecture, simple to operate and maintain, and reachable from the current architecture. We call it the multi-replica architecture with second-level RTO.

New-generation multi-replica architecture with second-level RTO

The multi-replica architecture with second-level RTO is a new-generation high availability architecture designed and implemented by the messaging middleware team. It covers the replica composition mechanism, the failover mechanism, and intrusive modifications to existing components.

A replica group has the following characteristics:

  • Strong Leader, no election: The Leader is determined at deployment time and does not switch during its life cycle, although it can be replaced if it fails.
  • Only the Leader accepts writes: In each replica group, only the Leader accepts message writes. When the Leader is unavailable, the entire replica group is not writable.
  • All replicas support reads: The Leader holds the full set of messages and Followers may hold differing amounts, but every replica supports message reading.
  • Flexible number of replicas: The number of replicas in a group can be chosen freely based on reliability, availability, and cost.
  • Flexible quorum: All messages are eventually synchronized to the entire replica group, but the minimum number of replicas that must be written successfully can be configured flexibly within the group. For example, in 2-3 mode a write succeeds once two of the three replicas have written it. If a replica becomes unavailable, the quorum can also degrade dynamically.
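The flexible quorum rule can be illustrated with a few lines of arithmetic. The sketch below assumes that degradation means lowering the required acknowledgement count to the number of reachable replicas, never below one; the names required_acks, write_succeeds, and allow_degrade are hypothetical, and the real degradation policy may be more nuanced.

```python
def required_acks(min_in_sync: int, reachable: int, allow_degrade: bool = True) -> int:
    """Number of replica acknowledgements a write needs to be considered successful."""
    if allow_degrade:
        # Never demand more acks than there are reachable replicas, but at least one.
        return max(1, min(min_in_sync, reachable))
    return min_in_sync


def write_succeeds(acks: int, min_in_sync: int, reachable: int,
                   allow_degrade: bool = True) -> bool:
    return acks >= required_acks(min_in_sync, reachable, allow_degrade)
```

In 2-3 mode with all three replicas reachable, two acknowledgements are needed. If only the Leader remains reachable, the degraded quorum accepts a single acknowledgement, trading some reliability for availability; with allow_degrade set to False the same write would be rejected.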

With the replica group concept, failover can reuse the mechanisms of the current RocketMQ client, as shown below:

  1. When the Leader of a replica group is unavailable, the Producer quickly switches to another replica group.
  2. When one replica is unavailable, the Consumer quickly switches to another replica in the same replica group to continue consuming messages.
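A minimal sketch of these two client-side decisions follows. The dictionary layout and the function names pick_group_for_send and pick_replica_for_read are assumptions for illustration only and do not mirror the RocketMQ client's internal data structures.

```python
def pick_group_for_send(groups, start=0):
    """Return the first replica group, scanning round-robin from start, whose Leader is up."""
    names = list(groups)
    for i in range(len(names)):
        name = names[(start + i) % len(names)]
        if groups[name]["leader_up"]:
            return name
    raise RuntimeError("no writable replica group")


def pick_replica_for_read(group):
    """Prefer the Leader; otherwise fall back to a reachable Follower in the same group."""
    if group["leader_up"]:
        return "leader"
    for follower, up in group["followers"].items():
        if up:
            return follower
    raise RuntimeError("no readable replica in this group")
```

The producer moves across groups because only a Leader accepts writes, while the consumer stays inside the group because every replica there can serve reads.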

Observability

Health dashboard

We have also done a lot of work on observability, providing users with an observability and health dashboard for the messaging system. As shown in the following figure, users can clearly see monitoring data at the instance, topic, and group levels, which lets them monitor and diagnose problems comprehensively.

Message link tracing

In addition, we provide message tracing based on message traces. As shown in the figure below, users can see the complete life cycle of a message in the console: the entire link, from sending and storage to consumption, is fully recorded.

Application scenarios

Customer pain point: When consumption backlogs occur, users have to sample message traces and analyze them comprehensively just to roughly determine the cause, which makes troubleshooting difficult.

Core value: The health dashboard improves the efficiency of troubleshooting online issues and the accuracy of locating them. Users can directly find the Topics and Groups with the highest risk in the health dashboard and quickly locate the cause from the changes in each indicator. For example, if message processing takes too long, you can scale out the consumer machines or optimize the consumption logic; if the failure rate is too high, you can quickly check the logs to troubleshoot.

Event-driven

You are probably familiar with Gartner. In a 2018 report, Gartner listed the event-driven model as one of the top 10 strategic technology trends and made two predictions:

  • By 2022, more than 60% of new digital business solutions will use an event-notification software model.
  • By 2022, more than 50% of business organizations will participate in the EDA ecosystem.

In the same year, the CNCF introduced CloudEvents, which aims to standardize the event communication protocol between different cloud services. To date, CloudEvents has also released several binding specifications for messaging middleware.

Event-driven architecture is clearly an important trend for future business systems, and messages are naturally close to events, so message queue RocketMQ firmly embraces it.

Speaking of messages and events, here is a quick note: messages and events are two different abstractions that serve different scenarios:

  • Messages: Messages are a more general abstraction than events and are often used for asynchronous decoupling between microservice calls; when the capacities of two services are mismatched, messages make the call between them asynchronous. The content of a message is usually tied closely to the business, and the sender has clear expectations about how the message will be processed.
  • Events: Events are more concrete than messages and represent the occurrence of things, conditions, and state changes. Event sources come from different organizations and environments, so an event bus naturally has to span organizations. Event sources have no expectations about how events will be handled, so event-based application architectures are more thoroughly decoupled, more scalable, and more flexible.

In 2020, Alibaba Cloud released EventBridge, an event bus product whose mission is to serve as the hub for cloud events: it connects cloud products and cloud applications through the standardized CloudEvents 1.0 protocol and provides centralized event governance and event-driven capabilities, helping users easily build loosely coupled, distributed event-driven architectures. In addition, there are many vertical SaaS services in the cloud marketplace beyond Alibaba Cloud itself. With its ability to integrate, and be integrated, across products, organizations, and clouds, EventBridge will help customers build a complete, event-driven, efficient, and controllable new interface to the cloud.

With the event source feature of EventBridge, we can connect messages to events, so that message queue RocketMQ gains event-driven capabilities and joins the whole event ecosystem. The following example, shown in the figure below, demonstrates this feature.

Create a message queue RocketMQ topic

Create the target service

We quickly create an event-driven service on Container Service. The YAML for the compute workload Deployment is as follows; the service responds to events and prints the results to standard output.

    apiVersion: apps/v1 # for versions before 1.8.0 use apps/v1beta1
    kind: Deployment
    metadata:
      name: eventbridge-http-target-deployment
      labels:
        app: eventbridge-http-target
    spec:
      replicas: 2
      selector:
        matchLabels:
          app: eventbridge-http-target
      template:
        metadata:
          labels:
            app: eventbridge-http-target
        spec:
          containers:
          - name: eb-http-target
            # The following image exposes the HTTP address (/cloudevents) used to receive CloudEvents.
            # Source: https://github.com/aliyuneventbridge/simple-http-target
            image: registry.cn-hangzhou.aliyuncs.com/eventbridge-public/simple-http-target:latest
            ports:
            - containerPort: 8080

Go to the Container Service console, open the Services page under Services and Routing, create a Service with private network access, and configure the port mapping.

Create the EventBridge custom bus

Let’s go to the EventBridge console and create a custom bus named demo-with-k8s.

Create a rule for the EventBridge custom bus

We create a rule for the demo-with-k8s bus, select HTTP as the event target, select the private network type, select the corresponding VPC, VSwitch, and security group, and specify the target URL, as shown in the following figure:

Create the EventBridge event source

We add a custom event source of the message queue RocketMQ type to this custom event bus.

Send a RocketMQ message

Next, we go back to the message queue RocketMQ console and use its quick-experience message production feature to send a message with the content Hello EventBridge to the corresponding topic.

We can then see that the RocketMQ message is delivered to the service as a CloudEvent, which completes the link from message to event. Combined with the tiered storage capability described above, message queue RocketMQ becomes a data warehouse that can supply a continuous stream of events, opening up broader scenarios for the whole event ecosystem.
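For readers who have not seen one, the sketch below shows roughly what wrapping a message body in a CloudEvents 1.0 envelope looks like. The attributes specversion, id, source, and type are the ones the CloudEvents 1.0 specification requires; the concrete source and type values, the function name to_cloudevent, and the choice to put the topic in subject are assumptions for illustration and are not the values EventBridge actually emits.

```python
import json


def to_cloudevent(message_id: str, topic: str, body: str) -> dict:
    """Wrap a message body in a CloudEvents 1.0 structured-mode envelope."""
    return {
        "specversion": "1.0",
        "id": message_id,
        "source": "urn:example:rocketmq",    # hypothetical source identifier
        "type": "example.rocketmq.message",  # hypothetical event type
        "subject": topic,
        "datacontenttype": "text/plain",
        "data": body,
    }


def to_json(event: dict) -> str:
    """Serialize the envelope for an HTTP body (application/cloudevents+json)."""
    return json.dumps(event)
```

An HTTP target such as the service deployed earlier would receive a JSON document of this shape and could read the original message text from the data field.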

Event-driven architecture is an important trend for future business organizations and business systems, and message queue RocketMQ firmly embraces this trend by bringing messages into the event ecosystem.

Conclusion

We selected several product features of message queue RocketMQ, from multiple message types and tiered storage to stability, observability, and the future-oriented event-driven direction. By comparing them with open source RocketMQ and analyzing specific application scenarios, we presented cloud best practices for large distributed applications based on message queue RocketMQ.