I. Project background

RocketMQ (MQ) is widely used as messaging middleware for transaction management, asynchronous decoupling, peak shaving, and data synchronization. When a business system performs a grayscale release, Dubbo and HTTP calls are already handled on our microservice governance and gateway platforms using the industry's general grayscale approach. However, existing MQ grayscale schemes cannot fully solve message isolation and the switchover between the grayscale and normal states. Therefore, we added an extended implementation of MQ grayscale functionality to the Luban MQ platform, which includes extensions for root cause analysis, resource management, subscription validation, delay optimization, and so on.

II. RocketMQ technology features

Why has MQ’s grayscale scheme not been implemented? Let’s start by reviewing some of the core technical points of RocketMQ.

2.1 Brief introduction to the storage model

(Figure 2.1 Storage model for MQ)

CommitLog: where message bodies are actually stored. Every business message that is sent ends up in a CommitLog. MQ is deployed as a cluster (for simplicity, the master/slave part is not covered here), and a given business message lands on only one Broker node in the cluster. The CommitLog on that Broker stores the messages routed to it for all Topics, and a new CommitLog file is created when the current one reaches 1 GB.

Topic: the message Topic, a logical collection of one class of messages. Each message belongs to exactly one Topic, and a Topic contains many messages; it is the basic unit of message subscription in MQ. The Topic is the first-level message type and is closely tied to business logic design.

Tag: the message Tag, a second-level message type. Each message can optionally carry a Tag, which distinguishes message types within the same Topic. For example, in an order Topic, Tag=TEL can mark mobile phone orders and Tag=iot can mark smart device orders. A producer can assign a Tag when sending a message, and a consumer can subscribe only to the Tags it is interested in rather than to every message in the Topic.

Queue: a Topic is really a logical concept for users. At the source-code level, a Topic is distributed across multiple Brokers in the form of Queues, and a Topic usually contains several Queues. A Topic that carries globally ordered messages has only one Queue, which is how global ordering is guaranteed. Queues map onto the CommitLog and can be understood as an index of messages: a message can be located only by specifying a particular Queue of a Topic. For readers familiar with Kafka, a Queue is analogous to a Partition.

Consumer group and its ID: a group represents a class of Producers or Consumers that typically produce or consume messages in the same application domain with consistent production or consumption logic. Each consumer group is identified by a globally unique GroupID. Different consumer groups are isolated from each other during consumption and do not affect each other's consumption offset calculation.

2.2 Message sending and consumption

(Figure 2.2 Message sending and pulling model)

2.2.1 ClientID

Each MQ client instance running in a producer or consumer cluster is guaranteed a unique ClientID. Note: an application instance that acts as both producer and consumer uses the same ClientID for both roles.

2.2.2 Sending Messages

When a message is sent to a Topic, the sender first obtains the Topic's metadata, including how many Queues it has and which Broker each Queue belongs to. By default, the sender picks one Queue for the current message using a round-robin algorithm, so the next message goes to the next Queue. MQ also lets the sender specify a Queue explicitly or apply custom Queue selection logic; the latter requires implementing the MessageQueueSelector interface.
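As a rough illustration of the default selection and the custom selection hook described above, here is a minimal Python sketch. It only models the idea and is not the RocketMQ client API; the `Producer` class, the `by_order_id` selector, and the broker names are hypothetical.

```python
from itertools import count


class Producer:
    """Hypothetical stand-in for an MQ producer; not the RocketMQ client."""

    def __init__(self, queues):
        self.queues = list(queues)  # (broker name, queue id) pairs from Topic metadata
        self._sent = count()        # number of default-routed sends so far

    def select_queue(self, selector=None, arg=None):
        if selector is not None:
            # Custom selection, in the spirit of MessageQueueSelector.
            return selector(self.queues, arg)
        # Default: rotate through the queues one message at a time.
        return self.queues[next(self._sent) % len(self.queues)]


def by_order_id(queues, order_id):
    """Custom selector: all messages of one order go to the same queue."""
    return queues[order_id % len(queues)]


queues = [("broker-a", 0), ("broker-a", 1), ("broker-b", 0), ("broker-b", 1)]
producer = Producer(queues)
picked = [producer.select_queue() for _ in range(5)]
same_queue = producer.select_queue(by_order_id, 10)
```

The fifth default send wraps around to the first queue, while the custom selector always maps order 10 to the same queue.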

2.2.3 Message consumption

When consuming messages, consumers in the same consumer group (GroupID) subscribe to a Topic. The consumers first get the metadata of the Topic, that is, information about all Queues of that Topic. These Queues are then allocated to the clients (ClientIDs) according to the allocation rules. Based on its allocated Queues, each client calculates the offset of the messages to pull and generates a PullRequest. After the messages are pulled and consumed, the client generates an ACK and updates the consumption progress.

As shown in Figure 2.3, if messages 1 and 5 in a batch have not been consumed while the rest have, the updated offset is still 1. If the consumer goes down and restarts, it consumes again from message 1, so messages 2, 3, and 4 are consumed a second time.

(Figure 2.3 Consumption progress update diagram)

So RocketMQ guarantees only that messages are not lost; it does not guarantee that they are not consumed more than once, and idempotent consumption must be implemented by the business itself.
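The progress rule and the resulting need for idempotency can be sketched as follows. This is an illustrative Python model under the assumption that the committed offset is the smallest offset not yet consumed in the batch; the function names and the in-memory `processed_ids` set are hypothetical.

```python
def committed_offset(pulled, consumed):
    """Offset to persist: the smallest pulled offset that is still unconsumed."""
    pending = [o for o in pulled if o not in consumed]
    return min(pending) if pending else max(pulled) + 1


pulled = [1, 2, 3, 4, 5]
offset_partial = committed_offset(pulled, {2, 3, 4})      # 1 and 5 still pending
offset_done = committed_offset(pulled, {1, 2, 3, 4, 5})   # whole batch consumed

# After a restart, consumption resumes from the committed offset,
# so 2, 3 and 4 are delivered a second time.
redelivered = [o for o in pulled if o >= offset_partial]

processed_ids = set()  # hypothetical deduplication store kept by the business


def handle_once(message_id):
    """Idempotent handler: returns False when the message was already processed."""
    if message_id in processed_ids:
        return False
    processed_ids.add(message_id)
    return True


first_pass = [handle_once(o) for o in (2, 3, 4)]
second_pass = [handle_once(o) for o in redelivered]
```

In a real system the deduplication store would need to be durable, for example a database keyed by a business identifier.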

In addition, a consumer can choose to consume only messages with certain Tags. When the Broker handles a PullRequest, it filters quickly using the Tag hash value stored in the Queue index of the storage model and returns the matching messages to the consumer, which then filters the returned messages again by exact match.
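The two-stage Tag filtering can be illustrated with a small Python sketch. It assumes that the index stores a Java-style string hash of the Tag; the tags "Aa" and "BB" are hypothetical and chosen only because they collide under that hash, which shows why the consumer-side exact filter is still needed.

```python
def java_string_hash(s):
    """Mimics Java's String.hashCode for ASCII text (32-bit signed result)."""
    h = 0
    for ch in s:
        h = (31 * h + ord(ch)) & 0xFFFFFFFF
    return h - (1 << 32) if h >= (1 << 31) else h


# offset -> (tag, body); stands in for messages stored in the CommitLog
commit_log = {0: ("Aa", "m0"), 1: ("BB", "m1"), 2: ("iot", "m2")}
# Queue index entries carry only the offset and the tag hash
queue_index = [(off, java_string_hash(tag)) for off, (tag, _) in commit_log.items()]


def broker_pull(subscribed_tags):
    """Coarse filter on the Broker: compare hash values only."""
    wanted = {java_string_hash(t) for t in subscribed_tags}
    return [commit_log[off] for off, h in queue_index if h in wanted]


def consumer_filter(messages, subscribed_tags):
    """Exact filter on the consumer: compare the Tag strings themselves."""
    return [m for m in messages if m[0] in subscribed_tags]


from_broker = broker_pull({"Aa"})
delivered = consumer_filter(from_broker, {"Aa"})
```

The Broker returns both the "Aa" and the colliding "BB" message; only the exact comparison on the consumer removes the latter.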

2.3 Subscription consistency

On the consumer side, the MQ clients of all application instances in the same consumer group (same GroupID, which is the premise of everything in this section) must keep their subscription relationships consistent. Consistency means that every client in the group subscribes to exactly the same Topics and Tags. If the subscriptions within a group are inconsistent, the message consumption logic becomes confused and messages may even be lost.

2.3.1 Maintenance of subscription relationships

The MQ client of each application instance has its own ClientID. Subscription maintenance works briefly as follows:

  • Each MQ consumer client sends heartbeat packets, carrying its ClientID and the Topics & Tags it subscribes to, to all Brokers. The registerConsumer method stores clients grouped by consumer group, so the ClientIDs of one consumer group live in the same ConsumerGroupInfo;

  • After the Broker receives a heartbeat packet from any consumer client, it stores the information of each consumer group in the consumerTable ConcurrentMap of the ConsumerManager class, keyed by the consumer group name (GroupID). Each time a heartbeat packet arrives, the subscription information of that consumer group is updated with the Topic & Tag information in the packet. In other words, the Broker keeps only the subscriptions from the latest heartbeat packet, regardless of which ClientID in the group sent it. (Because a Tag is an attribute that depends on a Topic, the Broker handles inconsistent Topic subscriptions and inconsistent Tag subscriptions differently; see the updateSubscription method.)
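The "latest heartbeat wins" behaviour described above can be modelled in a few lines of Python. This is a deliberate simplification: the subscription table is replaced wholesale on every heartbeat, whereas the real Broker distinguishes Topic and Tag differences. The class and function below only borrow the names from the text and are not the Broker's actual code.

```python
class ConsumerGroupInfo:
    """Simplified model of the per-group state kept by the Broker."""

    def __init__(self):
        self.clients = set()
        self.subscription_table = {}  # topic -> set of tags

    def on_heartbeat(self, client_id, subscriptions):
        self.clients.add(client_id)
        # Simplification: only the subscriptions of the latest heartbeat survive.
        self.subscription_table = {t: set(tags) for t, tags in subscriptions.items()}


consumer_table = {}  # GroupID -> ConsumerGroupInfo


def register_consumer(group_id, client_id, subscriptions):
    info = consumer_table.setdefault(group_id, ConsumerGroupInfo())
    info.on_heartbeat(client_id, subscriptions)
    return info


register_consumer("GID_ORDER", "clientA", {"TOPIC_A": {"TEL"}})
group = register_consumer("GID_ORDER", "clientB", {"TOPIC_A": {"iot"}})
```

After clientB's heartbeat, the group still contains both clients, but the Broker only remembers the subscription to Tag "iot".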

2.3.2 Impact of Inconsistent Subscription

An example illustrates some of the problems caused by inconsistent subscriptions. Suppose that, within the same consumer group, clientA subscribes to TOPIC_A and clientB subscribes to TOPIC_B. TOPIC_A and TOPIC_B both have four queues. The queue allocation result is as follows:

(Table 2.1 Queue allocation results of message consumers)

Since clientB does not subscribe to TOPIC_A and clientA does not subscribe to TOPIC_B, messages in queue-2 and queue-3 of TOPIC_A and in queue-0 and queue-1 of TOPIC_B cannot be consumed. In practice, clientA cannot consume TOPIC_B messages normally, the client reports a large number of exceptions, and messages in the affected queues accumulate without being consumed.
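The orphaned queues in this example can be reproduced with a small Python sketch. It assumes an average allocation strategy in which every client divides a Topic's queues among all ClientIDs of the group, while each client only rebalances the Topics it subscribes to itself; the function name is hypothetical.

```python
def allocate_averagely(queues, client_ids, me):
    """Give `me` a contiguous, near-equal share of `queues` among all group clients."""
    cids = sorted(client_ids)
    index = cids.index(me)
    base, extra = divmod(len(queues), len(cids))
    start = index * base + min(index, extra)
    return queues[start:start + base + (1 if index < extra else 0)]


subscriptions = {"clientA": ["TOPIC_A"], "clientB": ["TOPIC_B"]}
topic_queues = {"TOPIC_A": [0, 1, 2, 3], "TOPIC_B": [0, 1, 2, 3]}

consumed = {topic: set() for topic in topic_queues}
for client, topics in subscriptions.items():
    for topic in topics:  # a client only rebalances Topics it subscribes to
        share = allocate_averagely(topic_queues[topic], subscriptions.keys(), client)
        consumed[topic].update(share)

orphaned = {t: sorted(set(qs) - consumed[t]) for t, qs in topic_queues.items()}
```

Each client takes only its own half of the Topic it knows about, so the other half is assigned to a client that never pulls from it.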

In addition, when different clients in the same group subscribe to different Tags, the messages that are pulled may not match the messages the client subscribed to.

III. Industry MQ grayscale schemes

(Figure 3.1 Grayscale call diagram)

Generally, business grayscale strictly guarantees only the calls between RPC services, and some loss or misrouting of grayscale message traffic is tolerated. As shown in Figure 3.1, grayscale messages produced by V_BFF are consumed at random by both the normal and grayscale versions of V_TRADE, so part of the grayscale traffic does not reach the expected environment, although RPC calls as a whole still keep the grayscale and non-grayscale environments isolated. MQ grayscale becomes necessary when the business changes its message consumption logic or does not want grayscale messages to affect production data.

Because of the subscription consistency constraint, the MQ grayscale schemes currently used in the industry give the normal version and the grayscale version different GroupIDs. All of the following schemes use different GroupIDs.

3.1 Shadow Topic scheme

Create a new set of topics to handle gray-isolated messages. For example, for TOPIC_ORDER, TOPIC_ORDER_GRAY would be created for use in a grayscale environment.

The sender writes grayscale messages to the shadow Topic as it sends. When consuming, the grayscale environment only uses grayscale groups to subscribe to grayscale topics.

3.2 Tag scheme

When sending, the sender adds grayscale identification to the Tag of the message generated in the grayscale environment. The consumer only subscribes to the grayscale Tag message when the grayscale version is published, while the normal version subscribes to the non-grayscale Tag.

3.3 UserProperty scheme

When sending, the sender adds a grayscale identifier to the UserProperty of the message generated in the grayscale environment. The consumer’s client needs to be rewritten to filter according to UserProperty. The normal version skips such messages and the grayscale version processes grayscale messages.
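A minimal Python sketch of this client-side filtering follows. The property key `gray`, the message layout, and the function names are assumptions made for illustration and are not part of any RocketMQ API.

```python
GRAY_KEY = "gray"  # hypothetical user property name


def make_message(body, gray):
    """The sender marks messages produced in the grayscale environment."""
    properties = {GRAY_KEY: "true"} if gray else {}
    return {"body": body, "properties": properties}


def should_process(message, consumer_is_gray):
    """Each consumer version keeps only the messages that match its own state."""
    is_gray_message = message["properties"].get(GRAY_KEY) == "true"
    return is_gray_message == consumer_is_gray


messages = [make_message("order-1", False), make_message("order-2", True)]
normal_view = [m["body"] for m in messages if should_process(m, False)]
gray_view = [m["body"] for m in messages if should_process(m, True)]
```

Because the two versions use different GroupIDs in this scheme, both groups receive every message and each discards the half it does not own.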

3.4 Defects of the current scheme

The relative merits of the three schemes are not compared here. They share the following defects (there are other defects and feature requests too, but none of them is fatal): none of the schemes truly handles messages without loss when the grayscale state switches back to the normal state, so the whole grayscale scheme tends to be abandoned soon after it is adopted:

  • Because different consumer groups are used, how can consumption be correctly spliced back onto the consumption offset of the original normal consumer group after the grayscale version is verified, efficiently and without losing messages?

  • How can grayscale messages be consumed precisely, so that each message reaches the consumer that matches its grayscale identifier and is processed efficiently?

  • When grayscale is turned on, from which offset should grayscale consumption start? How are the state details managed?

IV. Luban MQ platform grayscale scheme

In essence, the core of the MQ grayscale problem is to separate grayscale and non-grayscale messages effectively so that each consumer obtains exactly the messages of its own version, and, when grayscale ends, to splice the message offsets back together correctly so that no message that must be processed is lost. In other words, it is a matter of managing state details. To achieve this, we made the following modifications.

The code involved in this scheme is test code intended mainly to illustrate the scheme; the actual code is more refined.

4.1 Use of Queue isolation

(Figure 4.1 Differentiated use of queues)

As noted above, the Queue is the actual execution unit of a Topic. Our idea is to use Queues to separate V1 (normal) messages from V2 (grayscale) messages. We reserve the first and the last Queue (two in total, configurable) for sending and receiving grayscale messages and use the remaining Queues for normal online messages. We use the same consumer group (unlike the general industry schemes, we keep the same GroupID), so that grayscale consumers take part in rebalancing the grayscale Queues and non-grayscale consumers take part in rebalancing the non-grayscale Queues.

Here we address the storage isolation of messages.
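One possible way to derive the grayscale and normal Queue sets is sketched below in Python. How the configured size maps onto head and tail Queues is an assumption here: the sketch treats `gray_queue_size` as the total number of grayscale Queues and takes them alternately from the head and the tail. The function name is hypothetical.

```python
def split_queues(queue_ids, gray_queue_size):
    """Return (gray, normal) queue ids; gray queues come from both ends."""
    ordered = sorted(queue_ids)
    if not 0 <= gray_queue_size < len(ordered):
        raise ValueError("gray_queue_size must leave at least one normal queue")
    head = (gray_queue_size + 1) // 2  # queues taken from the front
    tail = gray_queue_size - head      # queues taken from the back
    gray = ordered[:head] + ordered[len(ordered) - tail:]
    normal = ordered[head:len(ordered) - tail]
    return gray, normal


gray, normal = split_queues(range(6), 2)
wider_gray, wider_normal = split_queues(range(8), 4)
```

With six Queues and two grayscale Queues, this yields Queue 0 and Queue 5 for grayscale and the middle four for normal traffic.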

4.2 Broker subscription transformation

The grayscale version often needs to change its Topics or Tags. Since we do not add a separate grayscale consumer group, a Topic/Tag change in the grayscale version makes the subscription relationships within the consumer group inconsistent. The principle of subscription consistency was explained in section 2.3, so the Broker must be modified to tolerate inconsistent subscriptions between grayscale and non-grayscale clients.

The subscription information of a consumer group is maintained in the subscriptionTable of ConsumerGroupInfo. We add a graySubscriptionTable to ConsumerGroupInfo to store the subscription information of the grayscale version. The heartbeat packets that clients send to the Broker are modified to carry a grayFlag, which determines whether the subscription is stored in subscriptionTable or in graySubscriptionTable. When pulling messages, the client also passes grayFlag to the Broker so that the Broker reads the subscription information from the matching table.

Here we solve the problem of consumption subscription consistency.
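The two-table idea can be modelled as follows. This Python sketch only mirrors the names subscriptionTable, graySubscriptionTable, and grayFlag from the text; the method names and the wholesale replacement of a table on each heartbeat are simplifying assumptions.

```python
class ConsumerGroupInfo:
    """Simplified model: one subscription table per grayscale state."""

    def __init__(self):
        self.subscription_table = {}       # normal clients: topic -> tags
        self.gray_subscription_table = {}  # grayscale clients: topic -> tags

    def _table(self, gray_flag):
        return self.gray_subscription_table if gray_flag else self.subscription_table

    def on_heartbeat(self, gray_flag, subscriptions):
        """The grayFlag in the heartbeat selects which table is refreshed."""
        table = self._table(gray_flag)
        table.clear()
        table.update({t: set(tags) for t, tags in subscriptions.items()})

    def find_subscription(self, gray_flag, topic):
        """The grayFlag in the pull request selects which table is consulted."""
        return self._table(gray_flag).get(topic)


group = ConsumerGroupInfo()
group.on_heartbeat(False, {"TOPIC_V_ORDER": {"v1"}})
group.on_heartbeat(True, {"TOPIC_V_ORDER": {"v2"}, "TOPIC_V_PAYMENT": {"*"}})
```

A heartbeat from a grayscale client no longer overwrites what the normal clients registered, even though both share one GroupID.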

4.3 Producer transformation

The producer change is relatively simple: it determines whether the message being sent is a grayscale message and, by implementing the MessageQueueSelector interface, delivers grayscale messages to the designated grayscale Queues. We define grayQueueSize, the number of grayscale Queues, in the configuration center. Currently the common practice is to designate specific Queue numbers on the Broker as grayscale Queues.

There are 6 queues in TOPIC_V_ORDER, as shown in Figure 4.2. Grayscale messages are sent only to Queue 0 and Queue 5, the first and last queues, while non-grayscale messages are sent to the remaining 4 queues.

(Figure 4.2 Sending results)

Here we solve the problem of correct delivery by the producer.
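The sending behaviour in this example can be sketched in Python. The class below is a hypothetical model of a grayscale-aware selector, not the actual MessageQueueSelector implementation; rotating within each pool is an assumption.

```python
from itertools import count


class GrayAwareSelector:
    """Routes grayscale messages to gray queues and the rest to normal queues."""

    def __init__(self, gray_queue_ids):
        self.gray_queue_ids = set(gray_queue_ids)
        self._gray_sent = count()
        self._normal_sent = count()

    def select(self, queue_ids, is_gray):
        if is_gray:
            pool = [q for q in queue_ids if q in self.gray_queue_ids]
            counter = self._gray_sent
        else:
            pool = [q for q in queue_ids if q not in self.gray_queue_ids]
            counter = self._normal_sent
        return pool[next(counter) % len(pool)]  # rotate inside the chosen pool


selector = GrayAwareSelector({0, 5})
all_queues = list(range(6))  # the six queues of TOPIC_V_ORDER
gray_targets = [selector.select(all_queues, True) for _ in range(4)]
normal_targets = [selector.select(all_queues, False) for _ in range(4)]
```

Grayscale messages alternate between Queue 0 and Queue 5, and normal messages cycle through Queues 1 to 4.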

4.4 Consumer transformation

The consumer changes mainly concern the allocation strategy for grayscale and non-grayscale Queues and the updating and synchronization of each client's grayFlag.

The core of the grayscale rebalancing strategy is to treat grayscale and non-grayscale Queues separately: grayscale Queues are allocated to grayscale ClientIDs, and non-grayscale Queues to non-grayscale ClientIDs. Therefore, before rebalancing, the client obtains from Namesrv the latest grayFlag of every ClientID in the same group (that is, the state is recorded in Namesrv).

When the grayscale version needs to become the online version, each client synchronizes its grayFlag to Namesrv. To avoid leaving grayscale messages unconsumed, the client first checks whether the grayscale Queues still hold unconsumed messages; grayFlag is updated only once all grayscale messages have been consumed.
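The "drain before switching" check can be sketched as follows. This Python model assumes that a Queue is fully consumed when its committed consumption offset has reached the Broker's maximum offset; the function name and the data layout are hypothetical.

```python
def try_switch_to_normal(gray_queues, gray_flags, client_id):
    """Flip grayFlag to False only when every gray queue is fully consumed.

    gray_queues maps queue id -> (max offset on the Broker, committed consume offset).
    """
    backlog = any(committed < max_offset
                  for max_offset, committed in gray_queues.values())
    if backlog:
        return False              # keep consuming grayscale messages first
    gray_flags[client_id] = False  # in the scheme above this state lives in Namesrv
    return True


flags = {"v2-client-1": True}
blocked = try_switch_to_normal({0: (10, 10), 5: (8, 6)}, flags, "v2-client-1")
flag_while_blocked = flags["v2-client-1"]
switched = try_switch_to_normal({0: (10, 10), 5: (8, 8)}, flags, "v2-client-1")
```

The first attempt is refused because Queue 5 still has two unconsumed messages; the second succeeds once both Queues are drained.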

Consumers use AllocateMessageQueueGray as the rebalancing strategy and pass in the number of grayscale Queues. A grayscale consumer sets grayFlag to true via setGrayFlag and therefore consumes only messages from Queue 0 and Queue 5, the first and last queues. A non-grayscale consumer sets grayFlag to false and consumes only messages from the middle four queues. The queue allocation is also clearly visible on the console: V2 clients with grayFlag set to true are allocated the first and last queues, and V1 clients with grayFlag set to false are allocated the middle four queues.
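The allocation rule can be illustrated with a short Python sketch. It is not the AllocateMessageQueueGray source; it assumes an average split inside each class of clients, and the client names are hypothetical.

```python
def allocate_gray(queue_ids, gray_queue_ids, gray_flags, me):
    """Allocate queues of my class (gray or normal) among clients of my class.

    gray_flags maps every ClientID in the group to its grayFlag.
    """
    my_flag = gray_flags[me]
    peers = sorted(c for c, flag in gray_flags.items() if flag == my_flag)
    pool = sorted(q for q in queue_ids if (q in gray_queue_ids) == my_flag)
    index = peers.index(me)
    base, extra = divmod(len(pool), len(peers))
    start = index * base + min(index, extra)
    return pool[start:start + base + (1 if index < extra else 0)]


flags = {"v1-a": False, "v1-b": False, "v2-a": True, "v2-b": True}
during_gray = {c: allocate_gray(range(6), {0, 5}, flags, c) for c in flags}

# Once the grayscale clients switch to normal, all four share the middle queues.
flags_after = {c: False for c in flags}
after_switch = {c: allocate_gray(range(6), {0, 5}, flags_after, c) for c in flags_after}
```

During grayscale the V2 clients own Queue 0 and Queue 5; after the switch every client holds one of the four middle Queues and the two grayscale Queues are left unallocated.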

(Figure 4.3 Consumption and subscription results)

When the grayscale version needs to switch to the online version, it simply calls updateClientGrayFlag to update the state. After updateClientGrayFlag is called, the two original V2 grayscale clients finish consuming the messages in the grayscale Queues, their grayFlag becomes false (saved in Namesrv), and they join the rebalancing of the four non-grayscale queues in the middle. The two original grayscale queues are then no longer subscribed to by any consumer.

(Figure 4.4 grayFlag update)

Here we solve the problem of controlling the details of state switching.

4.5 Namesrv modification

As mentioned above, consumers need to obtain the grayFlag of every client in the group during rebalancing. We therefore need a place, accessible to every consumer, where grayFlag is stored persistently. We chose Namesrv to store this information for the following reasons:

  • Namesrv is relatively light and has good stability.

  • The consumer itself will establish a long connection with the Namesrv. If the Namesrv fails, the consumer will automatically connect to the next Namesrv until there is a connection available.

  • The Broker is the place where the messages are actually stored and has a relatively high operating pressure. Synchronizing gray data will increase the pressure on the Broker to some extent.

However, Namesrv is a stateless node with no information synchronization between nodes, so the consistency of grayscale data must be guaranteed by a database. All Namesrv nodes access the same database, which persists the grayscale information. Every update of the V1 or V2 grayscale state goes through Namesrv to modify the database, and before each rebalance the client pulls the grayscale state of all instances in the consumer group through Namesrv.

(Figure 4.5 Namesrv grayscale data storage diagram)

Here we solve the problem of state storage and synchronization.
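The storage arrangement can be sketched with Python's built-in sqlite3 module standing in for the shared database. The table layout, class name, and method names are assumptions for illustration; two store objects sharing one connection play the role of two stateless Namesrv nodes.

```python
import sqlite3


class GrayStateStore:
    """Stands in for one stateless Namesrv node backed by the shared database."""

    def __init__(self, conn):
        self.conn = conn
        conn.execute(
            "CREATE TABLE IF NOT EXISTS gray_state ("
            "group_id TEXT, client_id TEXT, gray_flag INTEGER, "
            "PRIMARY KEY (group_id, client_id))"
        )

    def update(self, group_id, client_id, gray_flag):
        self.conn.execute(
            "INSERT OR REPLACE INTO gray_state VALUES (?, ?, ?)",
            (group_id, client_id, int(gray_flag)),
        )
        self.conn.commit()

    def flags_for_group(self, group_id):
        rows = self.conn.execute(
            "SELECT client_id, gray_flag FROM gray_state WHERE group_id = ?",
            (group_id,),
        )
        return {client_id: bool(flag) for client_id, flag in rows}


db = sqlite3.connect(":memory:")  # shared database (in-memory for the demo)
namesrv_1, namesrv_2 = GrayStateStore(db), GrayStateStore(db)
namesrv_1.update("GID_ORDER", "v2-a", True)
namesrv_2.update("GID_ORDER", "v1-a", False)
seen_by_1 = namesrv_1.flags_for_group("GID_ORDER")
seen_by_2 = namesrv_2.flags_for_group("GID_ORDER")
```

Whichever node a consumer happens to be connected to, it reads the same grayscale state because the state lives in the database rather than in the node.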

V. Grayscale scenario verification

Testing is how the feasibility of a scheme is verified. The following simple demos verify the Luban platform's MQ grayscale scheme.

5.1 Grayscale version Topic & Tag remain unchanged

This scenario was verified in 4.3 and 4.4 and will not be described again.

5.2 Grayscale version adds a Topic

Assume the subscription information of V1 and V2 is as shown in Table 5.1; the resulting Topic subscriptions are shown in Figure 5.1. TOPIC_V_ORDER is subscribed to by both V1 and V2: the first and last queues (two in total) are allocated to the grayscale V2 clients, and the middle four queues to the non-grayscale V1 clients. TOPIC_V_PAYMENT is subscribed to by V2 only, so only its first and last queues are allocated to V2 clients, and the remaining four queues are not subscribed to by any client. We send 4 non-grayscale messages and 4 grayscale messages to TOPIC_V_ORDER, and 4 grayscale messages to TOPIC_V_PAYMENT. As Figure 5.2 shows, the non-grayscale messages in TOPIC_V_ORDER are successfully consumed by the two V1 clients, and the grayscale messages of TOPIC_V_ORDER and TOPIC_V_PAYMENT are successfully consumed by the two V2 clients.

(Table 5.1 Subscription Information Table)

(Figure 5.1 Subscription results)

(Figure 5.2 Consumption Results)

5.3 Grayscale version removes a Topic

Assume the subscription information of V1 and V2 is as shown in Table 5.2; the resulting Topic subscriptions are shown in Figure 5.3. TOPIC_V_ORDER is subscribed to by both V1 and V2: the first and last queues (two in total) are allocated to the grayscale V2 clients, and the middle four queues to the non-grayscale V1 clients. TOPIC_V_PAYMENT is subscribed to only by the non-grayscale version V1, so only its middle four queues are allocated to V1 clients, and the first and last queues are not subscribed to by any client. We send 4 non-grayscale messages and 4 grayscale messages to TOPIC_V_ORDER, and 4 non-grayscale messages to TOPIC_V_PAYMENT. As Figure 5.4 shows, the non-grayscale messages of TOPIC_V_ORDER and TOPIC_V_PAYMENT are successfully consumed by the two V1 clients, and the grayscale messages in TOPIC_V_ORDER are successfully consumed by the two V2 clients.

(Table 5.2 Subscription Information Table)

(Figure 5.3 Subscription results)

(Figure 5.4 Consumption Results)

5.4 Grayscale version changes a Tag

Assume the subscription information of V1 and V2 is as shown in Table 5.3; the resulting Topic subscriptions are shown in Figure 5.5. TOPIC_V_ORDER is subscribed to by both V1 and V2: the first and last queues (two in total) are allocated to the grayscale V2 clients, and the middle four queues to the non-grayscale V1 clients. We send TOPIC_V_ORDER 4 non-grayscale messages with Tag=v1 and 4 grayscale messages with Tag=v2. As Figure 5.6 shows, the non-grayscale messages with Tag=v1 are successfully consumed by the two V1 clients, while the grayscale messages with Tag=v2 are successfully consumed by the two V2 clients.

(Table 5.3 Subscription Information Table)

(Figure 5.5 Subscription results)

(Figure 5.6 Consumption Results)

5.5 Grayscale version changes both Topic and Tag

Assume the subscription information of V1 and V2 is as shown in Table 5.4; the resulting Topic subscriptions are shown in Figure 5.7 and are the same as in 5.2. We send 4 non-grayscale messages with Tag=v1 and 4 grayscale messages with Tag=v2 to TOPIC_V_ORDER, and 4 grayscale messages to TOPIC_V_PAYMENT. The consumption results are shown in Figure 5.8. The two V2 clients successfully consume the grayscale messages in TOPIC_V_PAYMENT and the Tag=v2 grayscale messages in TOPIC_V_ORDER, while the two V1 clients consume only the non-grayscale messages with Tag=v1 in TOPIC_V_ORDER.

(Table 5.4 Subscription Information Table)

(Figure 5.7 Subscription results)

(Figure 5.8 Consumption Results)

VI. Conclusion

In the production version of MQ grayscale, we encapsulate the MQ sender and consumer uniformly, so the business side only needs to configure graySwitch and grayFlag. grayFlag takes effect only when graySwitch is enabled, and it marks whether the current client is a grayscale client.

In multi-system interaction, a business system can use graySwitch to control whether it consumes the grayscale and non-grayscale messages of other systems in full, and grayFlag to control whether it consumes only grayscale or only non-grayscale messages. Both parameters can be placed in the configuration center and take effect from there. When grayscale traffic needs to be switched, a script can change grayFlag uniformly to achieve lossless switching of grayscale traffic across the whole link.
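One reading of how the two parameters combine is sketched below in Python. It assumes that a client with graySwitch disabled consumes every Queue, and that with graySwitch enabled grayFlag selects the grayscale or the non-grayscale Queues; the function name is hypothetical.

```python
def visible_queues(queue_ids, gray_queue_ids, gray_switch, gray_flag):
    """Queues a client may consume under the given graySwitch/grayFlag settings."""
    if not gray_switch:
        # Grayscale handling is off: consume grayscale and normal messages in full.
        return sorted(queue_ids)
    # Grayscale handling is on: grayFlag picks exactly one class of queues.
    return sorted(q for q in queue_ids if (q in gray_queue_ids) == gray_flag)


queues = range(6)
gray_ids = {0, 5}
switch_off = visible_queues(queues, gray_ids, gray_switch=False, gray_flag=True)
gray_client = visible_queues(queues, gray_ids, gray_switch=True, gray_flag=True)
normal_client = visible_queues(queues, gray_ids, gray_switch=True, gray_flag=False)
```

With the switch off, grayFlag is ignored; with it on, the grayscale client sees Queue 0 and Queue 5 and the normal client sees the middle four.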

In addition, we use Namesrv to control the switching state in enough detail to ensure that unconsumed messages are consumed before the switch actually takes place.

Thanks to Alibaba’s RocketMQ messaging middleware!

Author: Ou Erli, Xiong Huanxin, Vivo Process IT Team