Summary: When we decided to introduce MQ a few years ago, there were already a number of mature solutions on the market, such as RabbitMQ, ActiveMQ, NSQ, Kafka, etc. Considering stability, maintenance costs, and the company’s technology stack, we chose RocketMQ.

background

Why RocketMQ

When we decided to introduce MQ a few years ago, there were already a number of mature solutions on the market, such as RabbitMQ, ActiveMQ, NSQ, Kafka, etc. Considering stability, maintenance cost, company technology stack, etc., we chose RocketMQ:

  • Pure Java development, no dependence, easy to use, problems can hold;
  • After Ali Double Eleven test, performance, stability can be guaranteed;
  • Function practical, sending end: synchronous, asynchronous, unilateral, delay sending; Consumer side: message reset, retry queue, dead letter queue;
  • The community is active, can communicate and solve problems in time.

usage

  • Mainly used for peak clipping, decoupling and asynchronous processing;
  • Has been widely used in train tickets, air tickets, hotels and other core businesses, to withstand the huge WeChat entrance flow;
  • It is widely used in core processes such as payment, order, invoice, data synchronization, etc.
  • 100 + billion messages a day.

The following diagram is the MQ access frame diagram

Due to the company’s technical stack, we provide the Java SDK for Client SDK. For other languages, converge to HTTP Proxy to shield language details and save maintenance costs. According to the major lines of business, the back-end storage nodes are isolated and do not affect each other.



MQ double center reconstruction

Before the single room appeared network failure, a greater impact on the business. In order to ensure high availability of services, the transformation of double centers in the same city has been put on the agenda.

Why do dual centers

  • Single room fault service available;
  • Ensure data reliability: if all the data are in the same machine room, in case of machine room failure, there is a risk of data loss;
  • Horizontal expansion: single room capacity is limited, multiple rooms can share the flow.

Dual center scheme

Before doing double center, the city double center program has made some research, mainly cold (hot) backup, double live two kinds. (At that time, the Community Dledger version was not available, and the Dledger version was fully available as a dual-center option.)

1) In-city cold (hot) backup

Two separate MQ clusters, user traffic written to a primary cluster, and data synchronized to the standby cluster in real time. The community has a mature RocketMQ Replicator solution that periodically synchronize metadata such as topics, consumption groups, consumption schedules, etc.



2) Double activities in the same city

Two independent MQ clusters. User traffic is written to MQ clusters in respective computer rooms, and the data is out of sync with each other.

Usually the business is written to the MQ cluster of the respective machine room. If one machine room is down, all the user request traffic can be cut to another machine room, and the messages will also be produced to another machine room.





For dual live scenarios, you need to address the MQ cluster domain name.

1) If two clusters use one domain name, the domain name can be dynamically resolved to their respective rooms. This method requires production and consumption must be in the same machine room. If the production is in IDC1 and the consumption is in IDC2, then the production and consumption are connected to a cluster respectively, and the data cannot be consumed.

2) If one cluster has one domain name, the business side will make a big change. The cluster we served before was deployed in a single center, and the business side has been connected in large numbers, so the promotion of this scheme is difficult.

In order to reduce the changes of the business side as much as possible, the domain name can only continue to use the previous domain name. Finally, we adopted a Global MQ cluster, spanning two machine rooms. It doesn’t matter whether the business is deployed in a single center or a dual center. And just upgrade the client without changing any code.

Dual center appeal

  • Proximity principle: the producer is in A room, and the messages produced are stored in A room broker; The consumer is in A room, and the consumption message comes from A room broker.
  • Single room failure: normal production, message not lost.
  • Broker Primary Node Failure: Automatic primary selection.

Nearby principle

In a nutshell, it’s about determining two things:

  • Nodes (client node, server node) how to determine which IDC they are in;
  • How does the client node determine which IDC the server node is in?

How to judge oneself in which IDC?

1) IP query

When a node is started, it can obtain its own IP and query the computer room it is in through the internal components of the company.

2) Environmental awareness

It is necessary to cooperate with the operation and maintenance students to write some of their own metadata, such as computer room information, into the local configuration file when the node is installed, and then directly read and write the configuration file at startup.

We used the second scenario, which had no component dependencies, and the value of logicIdcUK in the configuration file was the machine room flag.

How can a client node identify a server node in the same machine room?

The client node can get the IP of the server node and the broker name, so:

  • IP query: query IP room information through internal components of the company;
  • Add room information to the broker name: In the configuration file, add room information to the broker name.
  • Machine room identification is added in the protocol layer: when the server node registers with the metadata system, it will register its own machine room information together.

Compared to the former two, the implementation is a little more complicated, changed the protocol layer, we used the second and the third combination.

Go to the production

Based on the above analysis, the idea of nearby production is very clear, and the default priority is the nearby production in the machine room;

If the service node of this machine room is not available, we can try to expand the machine room for production, and the business can be configured according to the actual needs.

The nearest consumption

Priority to the machine room consumption, and by default to ensure that all messages can be consumed.

The queue allocation algorithm uses the queue allocation according to the machine room

Each computer room message is equally distributed to the consumer end of the computer room; This machine room has no consumer end, equally divided to other machine room consumer end.

The pseudocode is as follows:

Map<String, Set> mqs = classifyMQByIdc(mqAll); Map<String, Set> cids = classifyCidByIdc(cidAll); Set<> result = new HashSet<>; for(element in mqs){ result.add(allocateMQAveragely(element, cids, cid)); // CID is the current client}

The consumption scenario mainly consists of unilateral and bilateral deployment on the consumption side.

Unilateral deployment, the consumer will pull all messages for each machine room by default.



Bilateral deployment, the consumer end will only consume the message of their own room, we should pay attention to the actual production of each room and the number of consumer end, to prevent the emergence of a computer room consumption end is too little.

Single room failure

  • Each group of brokers is configured

One master and two slaves, one master and one slave in one machine room, one slave in another machine room; When a slave synchronizes a message, the message is sent successfully.

  • Single room failure

Message production across the machine room; The unconsumed message continues to be consumed in another machine.

Fault cut the main

In case of failure of a group of broker master nodes, in order to ensure the availability of the whole cluster, the master needs to be selected in the slave and switched. To do this, you first need to have a broker primary fault arbitration system called the Nameserver (NS) metadata system (similar to the sentinel in Redis).

The nodes in the NS metadata system are located in three computer rooms (there is a third-party cloud computer room, and NS nodes are deployed on the cloud, so the amount of metadata is small and the delay is acceptable). The NS nodes in the three computer rooms choose a leader through RAFT protocol. The broker node synchronizes metadata to the leader, who in turn synchronizes metadata to the follower.

When metadata is retrieved from the client node, the data can be read from both the leader and follower.



Cut the main process

  • If the Nameserver leader detects an abnormality on the broker primary node and asks the other followers to confirm it; If half of the followers believe that the broker nodes are abnormal, the leader notifies the broker to select the master among the slave nodes, and the slave nodes with large synchronization progress are selected as the main one.
  • The newly elected broker master performs the switch and registers with the metadata system.
  • The production side could not send a message to the old broker master node.

The flow chart is as follows



Cutting center drill

The user requests the load to the dual center. The following operation first cuts the traffic to the second center — returns to the dual center — cuts the traffic to the first center. Ensure that each center can handle the full volume of user requests.

First, all user traffic is cut to the second center



Flow returns to dual centers and cuts to one center



review

  • Global Global Cluster
  • Nearby principle
  • One master and two slave, write half the message and write successfully
  • Metadata system RAFT master
  • Broker master node failure, automatic selection of master

MQ platform governance

Even if the system is of high performance and high availability, it will bring a variety of problems and increase the unnecessary maintenance cost if it is casually used or not standardized. Therefore, the necessary governance means are indispensable.

purpose

It makes the system more stable

  • Timely warning
  • Quick positioning and stop loss

What aspects of governance

Subject/consumer group governance

  • Apply for to use

Production environment MQ cluster, we have turned off the automatic creation of theme and consumption group, before using the need to apply for and record the theme and consumption group project identification and users. As soon as something goes wrong, we can immediately go to the person in charge of the subject and the consumer group and find out what’s going on. If there are multiple sets of environments such as test, grayscale and production, you can apply for multiple clusters at the same time to avoid the trouble of applying for clusters one by one.

  • Production speed

In order to prevent the business from inadvertently sending a large number of useless messages, it is necessary to flow control the topic production rate on the server side to avoid this topic crowding out the processing resources of other topics.

  • Message backlog

If the consumer group is sensitive to message accumulation, the user can set the threshold value of message accumulation quantity and the alarm mode. If the threshold value is exceeded, the user will be notified immediately. You can also set the threshold value of message accumulation time, after a period of time has not been consumed, immediately notify the user.

  • The consumer node went offline

If the consumer node goes offline or has no response for a period of time, it needs to be notified to the user.

Client Governance

  • Sending and consuming time – consuming detection

Monitor the time it takes to send/consume a message, detect applications that are performing poorly, and inform users to take action to improve performance; At the same time, the message body size is monitored. For the project whose message body size is more than 10 KB on average, the project is pushed to enable compression or message refactoring to control the message body within 10 KB.

  • Message link tracing

By which IP and at which point in time a message is sent, by which IP and at which point in time it is consumed, together with the statistical message reception of the server and the information pushed by the message, it forms a simple message link tracking and connects the life cycle of the message. Users can query MSGID or pre-set key to view messages and troubleshoot problems.

  • Low or hidden version detection

As functionality continues to iterate, SDK versions will be upgraded and may introduce risks. Report the SDK version regularly, and push users to upgrade the problematic or low version.

Server-side governance

  • Cluster health inspection

How do you tell if a cluster is healthy? Timely detect the number of nodes in the cluster, cluster write TPS, consumption TPS, and simulate user production and consumption messages.

  • Cluster performance patrol

Performance metrics are ultimately reflected in the time it takes to process message production and consumption. The server side statistics the processing time of each production and consumption request. If a certain proportion of message processing time is too long within a statistical cycle, the node is considered to have a performance problem. The main cause of performance problems is the physical bottleneck of the system, such as high utilization rate of disk IO UTIL and high CPU load. These hardware indicators are automatically alerted by the nightjar monitoring system.

  • Cluster high availability

High availability mainly refers to the situation where the master node in the broker fails to work normally due to hardware and software failure, and the slave node is automatically switched to the master, which is suitable for the scenario where message order and cluster integrity are required.

Part of the background operation display

Subject with consumer group application

Real-time statistics of production, consumption and accumulation





The cluster monitoring

The pit of tread

The community of MQ system has experienced a long time of improvement and precipitation, we have also come to some problems in the process of use, require us to in-depth understanding of the source code, so that problems do not panic, quick stop loss.

  • When the old and new consumers coexist, the queue allocation algorithm we implement is not compatible, so it is enough to be compatible.
  • The number of themes and consumption groups is large, and the registration time is too long. The memory is OOM, and the registration time is shortened by compression. The community has been fixed.
  • [Fixed] Inconsistent topic length judgment caused restart message loss, community has fixed it;
  • CentOS 6.6 – The broker process is suspended and the OS version is upgraded.

The future of MQ

At present, the message retention time is short, which is not convenient for problem checking and data prediction. Next, we will archive the historical message and make data prediction based on it.

  • Historical data archiving
  • The underlying storage is stripped, and the computation and storage are separated
  • Complete more data prediction based on historical data
  • The server upgrades to Dledger to ensure strict consistency of messages

This article is the original content of Aliyun, shall not be reproduced without permission.