Origin of the problem

In e-commerce and other businesses, a system is generally composed of multiple independent services. How do we ensure data consistency across distributed invocations?

The specific scenario is as follows: a business operation invokes services A, B, and C at the same time, and either all of them succeed or all of them fail. A, B, and C may be remote services developed by different departments and deployed on different servers.

In distributed systems, if we refuse to sacrifice consistency, CAP theory tells us we must give up availability, which is obviously unacceptable. To facilitate the discussion, the basic theory of data consistency is briefly introduced first.

Strong consistency

 

Once an update operation completes, any subsequent access by any process or thread returns the latest updated value. This is the friendliest model for users, who are guaranteed to read what they wrote on the next access. According to CAP theory, this implementation requires sacrificing availability.

Weak consistency

There is no guarantee that subsequent accesses by processes or threads will return the latest updated value. After data is written successfully, the system promises neither that the latest value can be read immediately nor any bound on when it becomes readable.

Eventual consistency

A particular form of weak consistency: the system guarantees that, if there are no subsequent updates, accesses will eventually return the value of the last update. Assuming no failures occur, the size of the inconsistency window is determined mainly by communication delay, system load, and the number of replicas. DNS is a classic eventually consistent system.

In engineering practice, to preserve availability, most Internet systems relax strong consistency requirements to eventual consistency, and guarantee it through idempotent system implementations. However, in e-commerce and similar scenarios, the solutions to data consistency differ from those of common Internet systems (such as MySQL master-slave replication). The discussion among group members covered the following six solutions.

1. Avoid distributed transactions — business consolidation

The business consolidation scheme mainly integrates the interfaces into local execution. Taking the problem scenario as an example, services A, B, and C can be integrated into a service D, which turns the operation into a local transaction. For example, service D contains a local service and service E, where service E is the integration of services A through C.

Advantages: Solves (avoids) distributed transactions.

 

Disadvantages: obviously, services that were meant to be split end up coupled together, and service responsibilities become unclear, which hurts maintainability.

This method is generally not recommended because of its obvious drawbacks.

2. Classic solution — eBay model

The core of this solution is to execute tasks that require distributed processing asynchronously through message logging. Message logs can be stored in local text files, in a database, or in a message queue, and retries are then initiated automatically or manually according to business rules. Manual retry is common in payment scenarios, where a reconciliation system handles problems after the fact.

The core of the message logging scheme is to ensure idempotency of the service interface.

Considering network communication failures, packet loss, and similar problems, if the interface cannot guarantee idempotency, data uniqueness is difficult to guarantee.

The main idea of the eBay approach is as follows.

BASE: An ACID Alternative

The solution comes from an article written by eBay architect Dan Pritchett for ACM Queue in 2008, a classic piece explaining the BASE principle, that is, eventual consistency. The paper discusses the fundamental difference between how BASE and ACID ensure data consistency.

If ACID provides a consistent option for partitioned databases, how can availability be achieved? The answer is

BASE (basically available, soft state, eventually consistent)

BASE achieves availability by tolerating partial failures rather than total system failure. A simple example: if users are partitioned across five database servers, the BASE design encourages crafting operations so that the failure of one user database affects only the 20% of users on that particular host. There is no magic involved, but it does lead to higher perceived system availability.

One of the most common scenarios described in the article is a purchase: a record is added to the transaction table and the amount in the user table is changed. Since these two tables belong to different remote services, the problem of distributed transaction consistency arises.

The classic solution proposed in the paper is to place the main modification operation and a message describing the pending user-table update in one local transaction. To avoid repeated consumption of user-table messages, that is, to make retries idempotent, an update log table updates_applied records the messages that have already been processed.

The pseudo-code for the execution of the system is shown below
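The pattern the paper describes can be sketched roughly as follows. This is a hypothetical sketch, not the paper's actual pseudo-code: table and column names are illustrative, and an in-memory SQLite database stands in for the real stores.

```python
import sqlite3

# One local transaction writes the business record AND the message log;
# a later step applies the message to the user table exactly once.
db = sqlite3.connect(":memory:")
db.executescript("""
    CREATE TABLE txn (id INTEGER PRIMARY KEY, user_id INT, amount INT);
    CREATE TABLE message_queue (msg_id INTEGER PRIMARY KEY, user_id INT, amount INT);
    CREATE TABLE user (user_id INTEGER PRIMARY KEY, balance INT);
    CREATE TABLE updates_applied (msg_id INTEGER PRIMARY KEY);
    INSERT INTO user VALUES (1, 100);
""")

# Phase 1: transaction record and message inserted atomically.
with db:
    db.execute("INSERT INTO txn VALUES (1, 1, 30)")
    db.execute("INSERT INTO message_queue VALUES (1, 1, 30)")

def apply_message(db, msg_id, user_id, amount):
    """Apply one user-table update; updates_applied makes retries idempotent."""
    with db:
        if db.execute("SELECT 1 FROM updates_applied WHERE msg_id = ?",
                      (msg_id,)).fetchone():
            return  # duplicate delivery: already applied, do nothing
        db.execute("UPDATE user SET balance = balance + ? WHERE user_id = ?",
                   (amount, user_id))
        db.execute("INSERT INTO updates_applied VALUES (?)", (msg_id,))

# Phase 2: read the message (without deleting it yet) and apply it;
# redelivering the same message is harmless.
for msg_id, user_id, amount in db.execute(
        "SELECT msg_id, user_id, amount FROM message_queue").fetchall():
    apply_message(db, msg_id, user_id, amount)
    apply_message(db, msg_id, user_id, amount)  # simulated redelivery
    with db:  # delete the message only after the update has been applied
        db.execute("DELETE FROM message_queue WHERE msg_id = ?", (msg_id,))

balance = db.execute("SELECT balance FROM user WHERE user_id = 1").fetchone()[0]
```

Despite the redelivery, the balance ends at 130, not 160: updates_applied filters the duplicate.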

 

With this method, the first phase writes the transaction table and enqueues the message under the guarantee of a single local database transaction.

In the second phase, messages are read from the queue (but not deleted), and already-processed records are detected by checking the update record table updates_applied. For unprocessed records, the user table is modified and an operation record is added to updates_applied; the message is deleted from the queue only after this transaction executes successfully.

Through these steps, eventual consistency across the distributed system is achieved. For more details on eBay's scheme, see the link at the end of this article.

3. Qunar's distributed transaction scheme

As business scale grows, e-commerce sites generally go down the path of splitting: breaking a monolithic application into subsystems with distinct responsibilities. For example, functions for users, customers, and operations that used to live in one system are divided into an order center, agent management, an operations system, a quotation center, inventory management, and other subsystems.

What’s the first thing to do?

In the original monolithic application, everything lived together, including storage. For example, when operations wanted to cancel an order, it was enough to update the order table status and then update the inventory table; since it was a single application with a single database, both updates could run in one transaction, with the relational database guaranteeing consistency.

But after the split, different subsystems have their own storage. The order center manages only its own order database, and inventory management has its own database. So when the operations system cancels an order, it calls the order center and inventory management through their service interfaces rather than operating their databases directly. This introduces the "distributed transaction" problem.

Distributed transactions can be resolved in two ways

1. Use asynchronous messages preferentially.

As mentioned earlier, asynchronous messaging requires an idempotent implementation on the consumer side.

There are two ways to achieve idempotency. One is for the business logic itself to guarantee it. For example, after receiving a "payment success" message, the order status changes to "payment completed". If another "payment success" message arrives while the status is already "payment completed", the message is recognized as a duplicate and simply acknowledged as handled.
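This state-based check can be sketched minimally as follows; the status names and handler shape are illustrative, not actual production code.

```python
def on_payment_success(order):
    """Idempotent handler: a repeated 'payment success' message is a no-op."""
    if order["status"] == "payment completed":
        return "duplicate"              # already handled; acknowledge as success
    order["status"] = "payment completed"
    return "processed"

order = {"id": 1001, "status": "pending payment"}
first = on_payment_success(order)
second = on_payment_success(order)      # the repeated message changes nothing
```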

Alternatively, if the business logic cannot guarantee idempotency, add a de-duplication table or a similar mechanism. On the producer side, a message table is placed in the same database instance as the business data, so the message and the business operation share one local transaction. The message is not sent immediately; instead, a record is inserted into the message table, and the message is sent asynchronously after the transaction commits. If the send succeeds, the record is deleted from the message table; if the message-queue service is down or the network fails, the unsent message stays there, and a separate service continuously scans the table and resends such messages.
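The producer side of this pattern might look roughly like the following in-memory sketch; a real implementation would use database tables in the business instance and a real broker, and all names here are illustrative.

```python
import itertools

class Producer:
    """Business write plus pending-message row in one 'local transaction';
    a scanning service resends anything the broker did not accept."""
    def __init__(self, broker_send):
        self.orders = {}                  # business table
        self.pending = {}                 # message table in the same instance
        self.broker_send = broker_send
        self.ids = itertools.count(1)

    def create_order(self, payload):
        msg_id = next(self.ids)
        # same local transaction: business row and message row together
        self.orders[msg_id] = payload
        self.pending[msg_id] = payload
        try:                              # after commit, try to send
            self.broker_send(msg_id, payload)
            del self.pending[msg_id]      # sent: remove from message table
        except ConnectionError:
            pass                          # keep it; the scanner will retry

    def scan_and_resend(self):
        """The separate service that scans unsent messages and resends them."""
        for msg_id, payload in list(self.pending.items()):
            try:
                self.broker_send(msg_id, payload)
                del self.pending[msg_id]
            except ConnectionError:
                pass

delivered = []
fail_once = {"flag": True}
def flaky_send(msg_id, payload):
    if fail_once.pop("flag", None):       # first send attempt fails
        raise ConnectionError("broker unreachable")
    delivered.append(msg_id)

p = Producer(flaky_send)
p.create_order({"sku": "A1"})   # send fails; the message stays in the table
p.scan_and_resend()             # the scanner resends it successfully
```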

2. Some services are not suitable for asynchronous messaging: all participants of the transaction need the result synchronously. This case is implemented in a similar way, with a transaction record table placed in the same database instance as each participant's local business data.

Let's say A calls B and C synchronously. When its local transaction succeeds, A updates the status of its local transaction record, and B and C do the same. If A's call to B fails, it may be that B actually failed, or that the call timed out while B actually succeeded. A central service compares the transaction records of the three parties and makes the final decision. Suppose the records now show A success, B failure, and C success. Then the final decision can be made in one of two ways, depending on the specific scenario:

  1. Retry B until B succeeds. The transaction record table records the call parameters and other information.
  2. Perform the compensation operations of A and C (one possible compensation method is rollback).

A special note on option 2: suppose B is the inventory service, and it failed on the first invocation for some reason, but by the time of the retry the inventory has dropped to 0, so the retry can no longer succeed. In that case, only A and C can be rolled back.
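The central service's final decision can be sketched like this; the service names and record format are illustrative.

```python
def decide(records, retriable):
    """Compare the parties' transaction records and choose an action.
    `records` maps service name to recorded outcome; `retriable` says
    whether the failed call can still succeed (e.g. stock not exhausted)."""
    failed = [svc for svc, state in records.items() if state != "success"]
    if not failed:
        return ("done", [])
    if retriable:
        return ("retry", failed)        # replay with the recorded call parameters
    succeeded = [svc for svc, state in records.items() if state == "success"]
    return ("compensate", succeeded)    # e.g. roll back A and C

records = {"A": "success", "B": "failure", "C": "success"}
action, targets = decide(records, retriable=False)
```

With `retriable=False` (the out-of-stock case), the decision is to compensate A and C; with `retriable=True`, it is to keep retrying B.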

Is it a reasonable design to place a message table or transaction record table in the same instance as the business database? Isn't that intrusive to the business, and who should maintain this library?

In fact, operations can shoulder the burden and keep the intrusion into development minimal. Our approach is to have DBAs pre-initialize this library on all MySQL instances in the company and to operate on it transparently at the framework layer (the message client or the transactional RPC framework). Business developers only need to care about their own business logic and never access this library directly.

In summary, the basic principle of both approaches is the same: transform a distributed transaction into multiple local transactions, and then rely on retries to achieve eventual consistency.

4. Mogujie's distributed consistency scheme in the transaction creation process

General process for transaction creation

We abstract the transaction creation process into a set of extensible function points, each of which can have multiple implementations (concrete implementations may be combined or mutually exclusive). Creating a transaction means connecting the function points according to a certain flow.

Problems faced

The implementation of each function point may depend on external services. How do we ensure data consistency across these services? For example, if the call to the coupon-locking service times out, we do not know whether the coupon was actually locked; how should that be handled? Or the coupon is locked successfully but the inventory deduction fails; what then?

Scheme selection

Too many service dependencies increase management complexity and stability risk. Imagine that we synchronously depended on 10 services: if 9 succeeded and the last one failed, would the first 9 have to be rolled back? The cost would be very high.

Therefore, after splitting the big process into multiple small local transactions, for related business writes that are neither real-time nor strongly consistent, we choose to send a message notification after the local transaction commits and execute the associated transactions asynchronously.

Notification delivery is not 100% reliable, and even when a message is delivered, it is unknown whether the receiving service processes it successfully. The former problem can be solved by retries; the latter can be guaranteed by transactional messages.

However, a transactional-message framework is itself intrusive and adds complexity to business code, so we instead decouple systems by publishing DB change events to MQ. The ACK mechanism when subscribers consume MQ messages ensures each message is eventually consumed successfully, achieving eventual consistency. Since messages may be redelivered, subscribers must make their business logic idempotent.
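An idempotent subscriber of such DB-change messages can be sketched as follows; the dedup store (a set here, a table in practice) and the ACK convention are illustrative.

```python
class Subscriber:
    """Consumes DB-change messages from MQ. The handler acks only after
    the business logic succeeds, and a processed-id record (a dedup table
    in a real system) makes redeliveries no-ops."""
    def __init__(self):
        self.processed = set()
        self.applied = []

    def on_message(self, msg):
        if msg["id"] in self.processed:
            return True                      # duplicate: ack without reprocessing
        self.applied.append(msg["change"])   # the actual business logic
        self.processed.add(msg["id"])
        return True                          # True means ACK, so MQ stops resending

sub = Subscriber()
msg = {"id": "evt-42", "change": "user.nickname"}
acked = sub.on_message(msg) and sub.on_message(msg)  # second call is a redelivery
```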

That leaves only the business scenarios requiring real-time synchronization and strong consistency. Coupon locking and inventory deduction are the two typical scenarios in the transaction creation process.

At first glance, a distributed transaction framework seems necessary to keep data consistent across multiple systems, but introducing a heavyweight mechanism such as two-phase commit sharply increases complexity. In e-commerce, absolute strong consistency is too idealistic; quasi-real-time eventual consistency is enough.

In the transaction creation process, we first create an invisible order, then call the coupon-locking and stock-deduction services synchronously. On an invocation exception (failure or timeout), we publish a cancellation message to MQ; if publishing fails, it is retried locally and asynchronously at stepped time intervals. On receiving the message, the coupon system and inventory system decide whether their part needs to be rolled back, which achieves quasi-real-time eventual consistency across multiple local transactions.
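The time-stepped local retry of the rollback message can be sketched like this; the step intervals and function names are illustrative.

```python
import time

def send_with_stepped_retry(send, msg, steps=(0, 10, 60)):
    """Try to publish the rollback message, retrying at increasing
    intervals; the default step values (seconds) are illustrative."""
    for delay in steps:
        time.sleep(delay)
        try:
            send(msg)
            return True
        except ConnectionError:
            continue
    return False  # give up; an offline job or an alert would take over

attempts = []
def flaky_send(msg):
    attempts.append(msg)
    if len(attempts) < 3:
        raise ConnectionError("MQ unreachable")

# zero delays here so the demo runs instantly
ok = send_with_stepped_retry(flaky_send, "cancel order 7", steps=(0, 0, 0))
```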

5. Distributed service DTS scheme of Alipay and Ant Financial Cloud

Another common industry solution is Alipay's XTS scheme, Alipay's improvement on 2PC. The main ideas are as follows; most of the material is quoted from the official website.

 

Introduction to Distributed transaction services

Distributed Transaction Service (DTS) is a distributed transaction framework used to guarantee eventual transaction consistency in a large-scale distributed environment. Architecturally, DTS is divided into XTS-Client and XTS-Server: the former is a JAR package embedded in client applications, mainly responsible for writing and processing transaction data; the latter is an independent system responsible for recovering abnormal transactions.

Core features

The transaction model of a traditional relational database must follow the ACID principle. In single-database mode, the ACID model effectively guarantees data integrity, but in a large-scale distributed environment a single business operation often spans multiple databases, and keeping data consistent among them requires other strategies. The JavaEE specification uses 2PC (two-phase commit) to handle transactions that span databases, but 2PC scales poorly: during a transaction, participants must hold their resources until the entire distributed transaction ends. When business volume reaches tens of millions of transactions or more, the limitations of 2PC become increasingly obvious and system scalability degrades. For this reason, we applied the BASE idea to implement a distributed transaction scheme similar to 2PC, namely DTS. DTS guarantees high availability and high reliability in a distributed environment while also meeting data-consistency requirements; its defining characteristic is that data becomes eventually consistent.

To put it simply, the DTS framework has the following features:

  • Eventual consistency: during a transaction, there may be temporary inconsistency, but through recovery, the transaction's data eventually reaches consistency.
  • Simple protocol: DTS defines a standard two-phase interface similar to 2PC. A business system only needs to implement the corresponding interfaces to use DTS's transaction capability.
  • Independent of the RPC service protocol: in an SOA architecture, one or more DB operations are often wrapped as services that communicate via RPC. The DTS framework is built on top of the SOA architecture and is independent of the underlying protocol.
  • Independent of the underlying transaction implementation: DTS is an abstraction at the service layer and has nothing to do with the underlying transaction implementation. Whether it is a relational database such as MySQL or Oracle, a KV store such as Memcache, or an HBase database, as long as the operation on it is wrapped as a DTS participant, it can join the scope of a DTS transaction.

Implementation

The flow of the distributed transaction framework is as follows:

  1. A complete business activity consists of one master business service and several slave business services.
  2. The master business service is responsible for initiating and completing the entire business activity.
  3. The slave business services provide TCC-style business operations.
  4. The business activity manager controls the consistency of the business activity: it registers the operations within the activity, invokes the confirm operation of every two-phase transaction when the activity commits, and invokes the cancel operation of every two-phase transaction when the activity is canceled.
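The TCC-style two-phase interface and the master service's confirm/cancel control flow can be sketched as follows; the class and method names are illustrative, not the actual DTS API.

```python
class CouponParticipant:
    """A participant implementing a TCC-style two-phase interface:
    'try' reserves the resource, 'confirm' commits it, 'cancel' releases it.
    (Names and the resource model are illustrative.)"""
    def __init__(self):
        self.available, self.reserved, self.used = 1, 0, 0

    def try_lock(self):
        if self.available < 1:
            return False
        self.available -= 1
        self.reserved += 1   # held until confirm or cancel
        return True

    def confirm(self):
        self.reserved -= 1
        self.used += 1

    def cancel(self):
        self.reserved -= 1
        self.available += 1  # release the reservation

def run_activity(participants):
    """Master business service: try every participant, then confirm all,
    or cancel the ones that had already succeeded."""
    locked = []
    for p in participants:
        if p.try_lock():
            locked.append(p)
        else:
            for q in locked:
                q.cancel()   # roll back the successful reservations
            return False
    for p in locked:
        p.confirm()
    return True

coupon = CouponParticipant()
ok = run_activity([coupon])     # the coupon ends up consumed
retry = run_activity([coupon])  # no coupon left, so the activity fails cleanly
```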

Comparison with the 2PC protocol

  1. There is no separate prepare phase, which reduces protocol overhead.
  2. The system tolerates faults well and recovery is simple.

6. Data consistency scheme of Rural Credit Network

1. E-commerce business

The company's payment department provides payment services to the business departments by integrating third-party payment systems. The payment service is an RPC service based on Dubbo.

For the business department, paying an e-commerce order requires calling

  1. the payment interface of the payment platform to process the order, and
  2. the interface of the points platform to add points to the user according to business rules.

By business rule, real-time behavior and consistency of the business data must be guaranteed: a successful payment must result in points being added.

Our approach is synchronous invocation, handling the local transaction first. Since the points business is relatively simple and its business impact is lower than that of payment, the points platform provides both an add interface and a rollback interface.

The specific flow is to first call the points platform to add the user's points, and then call the payment platform to process the payment. If payment processing fails, the catch block calls the points platform's rollback interface to withdraw the points order created in this flow.
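This add-then-pay flow with a compensating rollback can be sketched as follows; the service classes and their interfaces are illustrative stand-ins for the real RPC services.

```python
class PointsPlatform:
    """Points service offering both an add interface and a rollback
    interface (names illustrative)."""
    def __init__(self):
        self.orders = {}     # points_order_id -> (user, points)
        self.balances = {}
        self._seq = 0

    def add(self, user, points):
        self._seq += 1
        self.orders[self._seq] = (user, points)
        self.balances[user] = self.balances.get(user, 0) + points
        return self._seq

    def rollback(self, points_order_id):
        user, points = self.orders.pop(points_order_id)
        self.balances[user] -= points

class DecliningPayment:
    """Stand-in for the payment platform; here it always fails."""
    def pay(self, order_id, amount):
        raise RuntimeError("third-party payment declined")

def pay_order(points, payment, order):
    """Add points first, then pay; on payment failure, the catch block
    withdraws the points order created in this flow."""
    points_order_id = points.add(order["user"], order["points"])
    try:
        payment.pay(order["id"], order["amount"])
        return True
    except Exception:
        points.rollback(points_order_id)
        return False

points = PointsPlatform()
ok = pay_order(points, DecliningPayment(),
               {"id": 7, "user": "u1", "points": 10, "amount": 99})
```

Since payment fails, the points order is withdrawn and the user's balance returns to 0.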


2. User information changes

The company's user information is maintained by the user center, and changes to it must be synchronized to each business subsystem, which then processes its own business according to the change. The user center, acting as the MQ producer, publishes change notifications to MQ; the APP server, for example, subscribes to the messages, synchronizes local data, and handles related business such as logging the user out of the APP.

We adopt an asynchronous message notification mechanism, currently based mainly on ActiveMQ, with Virtual Topic subscriptions ensuring that each business cluster consumes a given message exactly once.

Conclusion

Distributed services place greater demands on supporting systems, especially for eventual consistency schemes such as ours that are based on messages and logs, which must account for message backlog, consumption lag, monitoring, and alerting.

References

  • BASE: An ACID Alternative (the eBay solution)

In partitioned databases, trading some consistency for availability can lead to dramatic improvements in scalability.

queue.acm.org/detail.cfm?… (English version)

article.yeeyan.org/view/167444… (Chinese version)

  • Distributed Transaction Services (DTS)

www.cloud.alipay.com/docs/middle…