As a data distribution hub, the message queue carries more and more scenarios and data, from OLTP at the beginning to OLAP, and even to the Internet of Things, artificial intelligence, and machine learning, leaving plenty of room for imagination. In terms of capability, message queues now own the data and have the computing power to go from hosting the data to understanding it.


Ant Financial is also thinking about adding algorithmic capabilities to message queues, so that algorithms can move into the message queue and take it to the next stage: insight into the data. Integrating these capabilities creates an intelligent transmission and computing service platform.


The good news is that, as a core part of SOFA (Scalable Open Financial Architecture), message queues will continue to embrace open source and the community.


This article shares the story of Ant Financial’s message queue development and the architectural thinking along the way.

Key requirements for messaging systems in financial scenarios

At Ant Financial, message queues have been around for more than a decade.

In 2007 and 2008, we adopted an ESB to implement messaging mechanisms.

At that time, the biggest headache was message loss; troubleshooting and repair were very painful.

In 2009, a new message queue system was jointly built and launched with Taobao, which greatly alleviated the message loss problem.

Ant’s business has financial attributes. From this perspective, what are the key requirements? There are four:

  1. High reliability

    For example, suppose a bill is generated from a message. What happens if the message channel is unreliable and the message is lost? Customers get confused when they pay for something but don’t see it in their bill or expense record. So high reliability means that messages cannot be lost.

  2. Strong consistency

    Strong consistency is critical in financial business. Suppose you make a transfer and, for any of several reasons such as network jitter, the transfer fails. If consistency is weak, you may still receive a transfer notification, and at that point the system’s data is inconsistent.

  3. Continuous availability

    Continuous availability means that the system’s service must be available whenever it is expected to be used. For example, on Singles’ Day an online order needs to be paid, and we expect the payment to go through smoothly. Likewise, offline payment is very popular now: when you go to the supermarket, you also want to scan the code and pay smoothly. These are all availability requirements.

  4. Extremely high performance

    At Ant Financial, hundreds of billions of messages circulate every day, with peak TPS reaching tens of millions. At such volume, the performance requirements are very high. From the perspectives of cost and user experience, performance is also a very important concern.

Compared with a classical messaging system, what mechanisms need to be put in place to meet the four key requirements of financial scenarios described above?

1. How to achieve high reliability?

  • The ACK mechanism. Borrowing ideas from TCP, ACKs in the sending stage, the persistence stage, and the delivery stage ensure the reliability of the message at every link along its path.

  • The retry mechanism ensures that a message can be consumed again if the consumer fails to process it.

  • The reliability of message data is guaranteed by the persistence mechanism and reliability mechanism of the storage layer.
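
The three mechanisms above can be illustrated with a toy sketch (all class and function names here are hypothetical, not Ant Financial's actual API): the broker persists a message before acknowledging the send, and only removes it after the consumer acknowledges delivery, retrying on failure.

```python
class Broker:
    """Toy broker: send-stage ACK after persistence, delivery-stage ACK
    from the consumer, and retries in between. Illustrative only."""

    def __init__(self, max_retries=3):
        self.store = {}              # persisted messages: id -> payload
        self.max_retries = max_retries

    def send(self, msg_id, payload):
        self.store[msg_id] = payload  # persist before acknowledging
        return "ACK"                  # send-stage ACK back to the producer

    def deliver(self, consumer):
        """Redeliver each message until the consumer ACKs (returns True).
        A message that never ACKs simply stays persisted for later retry."""
        for msg_id, payload in list(self.store.items()):
            for _ in range(self.max_retries):
                if consumer(msg_id, payload):
                    del self.store[msg_id]   # only delete after consumer ACK
                    break
```

A consumer that fails its first attempt still receives the message again and the broker only forgets it once consumption succeeds.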

2. Use a two-phase transaction messaging mechanism to ensure strong consistency

In the first phase, sending the message and the business’s own operation are placed in a local transaction, and the message is sent in an uncommitted state. In the second phase, the outcome of the local transaction determines whether the message sent in the first phase is committed or rolled back. If rolled back, the message is deleted; if committed, the message’s status is updated to committed and it is then delivered.


If the transaction status notification is lost in the second phase, the message server proactively checks the transaction status with the message sender until it gets a definite commit or rollback result.
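
The two phases plus the check-back can be sketched as follows (names are hypothetical; this mirrors the described protocol, not the production implementation): a "prepared" message is invisible to consumers, the local transaction's outcome commits or rolls it back, and a check-back callback resolves any message whose phase-2 notification was lost.

```python
PREPARED, COMMITTED = "prepared", "committed"

class TxnBroker:
    """Toy two-phase transactional message broker."""

    def __init__(self):
        self.messages = {}    # msg_id -> [payload, status]
        self.delivered = []   # messages made visible to consumers

    # Phase 1: persist the message, but keep it invisible to consumers.
    def prepare(self, msg_id, payload):
        self.messages[msg_id] = [payload, PREPARED]

    # Phase 2: the local transaction's outcome decides the message's fate.
    def commit(self, msg_id):
        self.messages[msg_id][1] = COMMITTED
        self.delivered.append(self.messages[msg_id][0])

    def rollback(self, msg_id):
        del self.messages[msg_id]

    # Check-back: if the phase-2 notification was lost, ask the sender
    # for a definite commit/rollback answer.
    def resolve_pending(self, check_status):
        for msg_id, (payload, status) in list(self.messages.items()):
            if status == PREPARED:
                (self.commit if check_status(msg_id) else self.rollback)(msg_id)
```

A message whose notification is lost is never left dangling: `resolve_pending` eventually commits or deletes it based on the sender's answer.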

3. Implementation of continuous availability

In the single-data-center era, we did many things to improve availability: thread pool isolation, rate limiting, and circuit breaking at the application level; various horizontal scaling capabilities at the architecture level; and single-point isolation and isolated cluster deployment at the failure isolation level. These measures improved the availability of the system.

However, due to the limitations of the single-data-center deployment architecture, when a machine-room-level failure occurs, the measures above are willing but unable to help.


In the same-city active-active architecture, service traffic can be switched between the two data centers, or a data-layer switchover can be performed, which effectively solves the machine-room-level single point of failure.

However, as the business grew, the capacity and disaster recovery capabilities of the same-city dual-data-center mode gradually failed to meet development requirements.

Facing the problems that same-city active-active could not solve, Ant Financial developed the multi-site active-active LDC (Logical Data Center) architecture:

What are the requirements for message queues under the LDC architecture?

Take a transfer as an example. In the multi-site active-active architecture, the two sides of the transfer may be in the same logical zone, in different zones, or even in different cities. As a result, one of the most important and core requirements is that message queues need very flexible and powerful routing capabilities, supporting routing within a zone, across zones, and across cities.


In terms of implementation, when a message needs to cross zones, it is routed and forwarded through the message server. When a message needs to cross cities, it is converged onto the cross-city link by a system called MQ-Router, which encapsulates the city-level deployment logic. All logic that needs to cross cities is closed into this one system, supporting the multi-site active-active architecture in a unified and flexible way.
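
The routing decision just described can be condensed into a few lines (the function and labels are hypothetical, only illustrating the decision order): cross-city traffic always funnels through the MQ-Router, cross-zone traffic is forwarded by the message server, and same-zone traffic is delivered directly.

```python
def route(src_zone, src_city, dst_zone, dst_city):
    """Toy routing decision mirroring the LDC description:
    cross-city  -> converge onto the MQ-Router link;
    cross-zone  -> forward through the message server;
    same zone   -> deliver directly."""
    if src_city != dst_city:
        return "mq-router"        # all cross-city traffic funnels here
    if src_zone != dst_zone:
        return "server-forward"
    return "direct"
```

Checking the city first is what lets all cross-city logic stay closed inside one system, as the text describes.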

An interesting application scenario for messages under the LDC architecture

There is a class of membership data with several distinctive characteristics:

  1. Its traffic is very high, so it is placed in a cache to reduce the pressure on the back-end database.

  2. This data may be accessed many times within a single business request, so it is sensitive to access latency. If such data had to be accessed across cities, the cross-city delay would be unacceptable to the business. So this data must be accessible locally, and every city needs a full copy of it.

  3. Such data is sensitive to the timeliness of changes: updates need to be perceived very quickly. Relying on database-level replication introduces a delay of about a second.

Therefore, we designed a message-based scheme to implement a city-level cache update mechanism, the key being a broadcast capability added to the MQ-Router. When such data changes, the change is sent as a message and broadcast to all cities via MQ-Router, achieving real-time updates of the caches in multiple cities.
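
A minimal sketch of the broadcast idea, under the assumption (made up for illustration) that each city holds one local cache: a change event fans out to every city, so reads stay local while updates propagate in near real time.

```python
class MQRouterBroadcast:
    """Toy model of MQ-Router's broadcast capability: every city keeps a
    full local copy of the hot membership data."""

    def __init__(self, cities):
        self.caches = {city: {} for city in cities}   # one cache per city

    def broadcast(self, key, value):
        # Fan the change event out to every city's cache so that
        # subsequent reads in any city see the update without a
        # cross-city round trip.
        for cache in self.caches.values():
            cache[key] = value
```

This trades extra write fan-out for purely local reads, which is the right trade for the read-heavy, latency-sensitive data described above.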

4. Performance is a constant work in progress

The message queue is implemented based on the SEDA (staged event-driven architecture) model, and SOFABolt, a high-performance network communication framework we developed, was introduced to improve communication performance. Beyond traditional optimizations, we are also investigating more advanced approaches, such as combining hardware with technologies like DPDK and RDMA, to pursue more extreme performance.
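
The SEDA idea, sketched in miniature (the `Stage` class is illustrative, not SOFA code): each processing stage owns an event queue and its own thread, so stages can be sized and back-pressured independently.

```python
import queue
import threading

class Stage:
    """Minimal SEDA stage: an event queue plus a handler running in its
    own thread; stages chain by forwarding output to a downstream stage."""

    SENTINEL = object()   # marks end of stream

    def __init__(self, handler, downstream=None):
        self.inbox = queue.Queue()
        self.downstream = downstream
        self.thread = threading.Thread(
            target=self._run, args=(handler,), daemon=True)
        self.thread.start()

    def _run(self, handler):
        while True:
            event = self.inbox.get()
            if event is Stage.SENTINEL:
                if self.downstream:                    # propagate shutdown
                    self.downstream.inbox.put(Stage.SENTINEL)
                return
            out = handler(event)
            if self.downstream:
                self.downstream.inbox.put(out)

    def put(self, event):
        self.inbox.put(event)

    def close(self):
        self.inbox.put(Stage.SENTINEL)
        self.thread.join()
```

Because each queue is explicit, a slow stage backs up visibly at its inbox instead of stalling the whole pipeline's threads.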

Embracing the era of big data: a message queue in pull mode

For a long time, message queue R&D revolved around OLTP services such as transactions, payments, and accounting, so we kept polishing message queue functionality for OLTP scenarios. For example, message reliability was guaranteed by database-backed storage, and real-time delivery was improved through push mode. As business scenarios expanded, especially with the advent of the big data era, more and more OLAP scenarios appeared. At this point the previous practices, especially the push model, ran into many difficulties.


At this stage, we built a pull-mode message queue based on log semantics. Pull-mode messages are deployed on physical machines and written sequentially to local disks to implement pull semantics. It effectively supported the requirements of OLAP-style scenarios for quite some time.
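
Log semantics with pull can be captured in a toy model (illustrative names; the list stands in for a sequential file on disk): the broker only appends, and each consumer tracks its own offset and pulls batches at its own pace, which is what makes the model friendly to OLAP consumers.

```python
class Log:
    """Toy append-only log with pull semantics."""

    def __init__(self):
        self.records = []     # stands in for sequential writes to local disk

    def append(self, record):
        self.records.append(record)
        return len(self.records) - 1       # offset of the new record

    def pull(self, offset, max_records):
        """Consumer-driven read: return a batch starting at `offset`
        plus the next offset to resume from."""
        batch = self.records[offset:offset + max_records]
        return batch, offset + len(batch)
```

Unlike push mode, a slow consumer here simply falls behind in offset; it never forces the broker to buffer per-consumer state.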

Moving toward a compute-storage-separated architecture, from mounted-disk mode to API mode

As pull mode spread, many OLTP scenarios gradually adopted it, raising many new requirements. For example, OLTP has high data reliability requirements and is concerned about the reliability of local file storage.

Because the deployment was based on physical machines, operations also ran into many difficulties, such as cost and hardware models. In particular, physical machine models vary a lot, and each purchase may differ, making standardization very difficult. Capacity planning, scaling down, and scaling up were all hard.


Messaging is a heavily IO-bound workload, and on physical machines this shows up as an obvious resource imbalance: the disk is often exhausted while the CPU sits idle. Operations on physical machines are also very complex, with prominent problems such as low resource utilization, poor capacity planning, and difficult scaling up and down.


At this stage, we started with a lighter approach, which we called the mounted-disk model: a distributed file system is mounted into the message queue application. The advantage is that the application itself requires little modification. Message data is transparently written to the distributed file system, and its reliability is guaranteed by the three-replica high reliability the file system provides.


Another thing done in this phase was standardizing the messaging application specifications, for example 8C, 16G, and 1T. With a standard specification, you can estimate more accurately how many TPS one specification can handle, which makes capacity planning easier. After this model went live, it carried some business and passed the test of Double Eleven.
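
Once each specification's sustainable TPS is measured, capacity planning reduces to arithmetic. A back-of-the-envelope sketch (all numbers and the headroom factor are made up for illustration):

```python
import math

def required_nodes(peak_tps, tps_per_spec, headroom=0.3):
    """Estimate node count for a standardized spec, reserving `headroom`
    (e.g. 30%) of each node's measured capacity for bursts and failover."""
    usable_tps = tps_per_spec * (1 - headroom)
    return math.ceil(peak_tps / usable_tps)
```

For a hypothetical peak of 1,000,000 TPS on a spec benchmarked at 20,000 TPS, the plan would call for 72 nodes; without standard specs, no single benchmark number like `tps_per_spec` exists to plug in.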


We then started the second phase of compute-storage separation: API mode, which brought a significant performance improvement. In this mode the message server had to change substantially, and we took the opportunity to make many functional enhancements. For example, support for globally fixed partitions was added, along with idempotent sending and strictly ordered delivery. The on-disk layout also changed: all message data is appended to a single commit log and then dispatched into queues, which allows more fine-grained configuration and control at the queue level.
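
The commit-log-plus-queues layout can be sketched as follows (class and method names are hypothetical): every message body goes through one sequential append path, while each queue stores only offsets into the shared log, keeping per-queue state, and therefore per-queue configuration, cheap.

```python
class CommitLogBroker:
    """Toy layout mirroring the description: one shared commit log holds
    all message bodies; each queue holds only offsets into that log."""

    def __init__(self):
        self.commit_log = []   # single sequential write path for all data
        self.queues = {}       # queue name -> list of commit-log offsets

    def publish(self, queue_name, payload):
        offset = len(self.commit_log)
        self.commit_log.append(payload)
        self.queues.setdefault(queue_name, []).append(offset)

    def read(self, queue_name, index):
        # A queue read is an offset lookup followed by one log access.
        return self.commit_log[self.queues[queue_name][index]]
```

Funneling all writes into one log keeps disk access sequential regardless of how many queues exist, which is the usual motivation for this design.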


As a whole, this is a relatively complete compute-storage-separated architecture, with a lot of extensible design at the application level as well.


Overall, the compute-storage separation mode lays a good foundation for the message queue and adapts well to Ant Financial’s site-wide operation and maintenance model.

Bringing computation into the message queue, giving it computing power

Message queues carry more and more message data, with huge volumes flowing in and out. It is said that in the era of big data, data is gold; yet so much data flows through the message queue without any gold being mined from it.

Thinking about this, the key is that message queues had been viewed in a traditional way: as a channel for messages. Messages flow in and out, and the mission is over. With this mindset, the focus was on building transmission capacity, storage capacity, reliability, and so on, while ignoring a capability that is crucial in the big data era: computation.

Looking at industry developments with this problem in mind brought many new ideas; Kafka in particular was a great source of inspiration.

So we decided to bring computation into the message queue, adding stream processing as a new capability. We implemented a lightweight, decentralized computing framework that can be embedded in both the client and the message server, performs lightweight computation, and supports common lightweight operators and windows with various semantics.
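
As a flavor of what such a lightweight operator might look like (this function is purely illustrative, not the framework's actual API), here is a tiny tumbling-window count of the kind that could be embedded in a client:

```python
from collections import defaultdict

def tumbling_count(events, window_ms):
    """Toy tumbling-window operator: `events` are (timestamp_ms, key)
    pairs; returns a count per (window_start, key)."""
    counts = defaultdict(int)
    for ts, key in events:
        window_start = ts - ts % window_ms   # align to the window boundary
        counts[(window_start, key)] += 1
    return dict(counts)
```

Because the state is just a small dictionary, an operator like this is cheap enough to run inline where the message already flows, instead of shipping the data to a separate compute cluster.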

At this point, message queues are capable of transmission, storage, and computation.


Based on these capabilities, we are building the message queue into a larger data transmission and computing platform: enriching message queue capabilities, expanding to richer and richer data sources, capturing more diverse data, and delivering messages to more destinations. Messages are computed on in transit to extract more value.


From the review above, we can see that message queues serve as a data hub, hosting more and more scenarios and data.


In terms of capability, message queues now own the data and have the computing power, and are on a path from hosting the data to understanding it. Next, we are also thinking about adding algorithms to message queues, bringing algorithms into the queue so that we can take the next step: data insight. Combining these capabilities, we can build an intelligent transmission and computing service platform. In this way, message data not only flows through the message queue but also undergoes more computation and processing, delivering greater value faster and in real time.


This article was compiled and edited from the talk “Evolution of Financial Message Queue” shared by Jiang Tao at GIAC 2018.

You are welcome to help build SOFAStack together: https://github.com/alipay