preface

Recently, I took over a project in a supply chain system and came into contact with the internal distributed transaction framework, so I used the Spring Festival holiday to get a proper understanding of distributed transactions. This article introduces the concept of transactions and their business application scenarios, as well as several mainstream design schemes for distributed transactions and their implementation principles.

1. Basic concepts of transactions

1.1 The concept of transactions

A transaction is a logical group of operations: a series of operations performed as a single logical unit. A transaction has the four ACID properties and is usually driven by a client interacting with a database. As shown in the left figure below, a session starts a transaction, performs N data operations, and then commits or rolls back the transaction.

Take the MySQL database as an example: transactions are implemented mainly with the undo and redo logs, and the execution flow is shown in the figure on the right above. Such local transactions are used constantly in business development. MySQL has supported XA transactions since version 5.0.3, but only the InnoDB engine supports them. Essentially, an XA transaction is a distributed transaction based on two-phase commit: the multiple branch transactions involved either all commit successfully or all fail.

The XA transaction model consists of a transaction manager, one or more resource managers, and an application.

  • In the Prepare phase, the transaction manager sends a prepare command to every resource manager participating in the global transaction. On receiving the command, each resource manager executes the data changes and writes its log records, but does not commit; it then reports its result back to the transaction manager.
  • In the Commit phase, the transaction manager collects the results returned by all resource managers. If one or more resource managers returned a failure or timed out, the transaction manager sends a rollback instruction to the resource managers that prepared successfully; otherwise it sends a commit instruction to all of them.

Docker makes it easy to set up an experimental environment with two MySQL instances. The specific commands are as follows.

docker run -d -p 3306:3306 -e MYSQL_ROOT_PASSWORD='123456' --name mysql1 mysql:latest
docker run -d -p 3307:3306 -e MYSQL_ROOT_PASSWORD='123456' --name mysql2 mysql:latest
docker exec -it [containerId] /bin/bash
mysql -u root -p
alter user 'root'@'%' identified with mysql_native_password by '123456';

A MySQL XA transaction demo can be written in Java. Here JDBC connects directly to the two databases; in real business development, JDBC is rarely used to drive XA transactions directly, and a third-party framework is used instead.

public class XaDemo {
    public static MysqlXADataSource getDataSource(String connStr, String user, String pwd) {
        try {
            MysqlXADataSource ds = new MysqlXADataSource();
            ds.setUrl(connStr);
            ds.setUser(user);
            ds.setPassword(pwd);
            return ds;
        } catch (Exception e) {
            e.printStackTrace();
        }
        return null;
    }

    public static void main(String[] args) {
        String connStr1 = "jdbc:mysql://127.0.0.1:3306/test";
        String connStr2 = "jdbc:mysql://127.0.0.1:3307/test";
        MysqlXADataSource ds1 = getDataSource(connStr1, "root", "123456");
        MysqlXADataSource ds2 = getDataSource(connStr2, "root", "123456");
        try {
            XAConnection xaConnection1 = ds1.getXAConnection();
            XAResource xaResource1 = xaConnection1.getXAResource();
            Connection connection1 = xaConnection1.getConnection();
            Statement statement1 = connection1.createStatement();

            XAConnection xaConnection2 = ds2.getXAConnection();
            XAResource xaResource2 = xaConnection2.getXAResource();
            Connection connection2 = xaConnection2.getConnection();
            Statement statement2 = connection2.createStatement();

            Xid xid1 = new MysqlXid(new byte[] { 0x01 }, new byte[] { 0x02 }, 100);
            Xid xid2 = new MysqlXid(new byte[] { 0x011 }, new byte[] { 0x012 }, 100);

            // Execute the branch transaction on the first database
            xaResource1.start(xid1, XAResource.TMNOFLAGS);
            int update1Result = statement1.executeUpdate(
                "update account_from set money = money - 50 where id = 1");
            xaResource1.end(xid1, XAResource.TMSUCCESS);

            // Execute the branch transaction on the second database
            xaResource2.start(xid2, XAResource.TMNOFLAGS);
            int update2Result = statement2.executeUpdate(
                "update account_to set money = money + 50 where id = 1");
            xaResource2.end(xid2, XAResource.TMSUCCESS);

            // Phase 1: prepare both branches
            int ret1 = xaResource1.prepare(xid1);
            int ret2 = xaResource2.prepare(xid2);

            // Phase 2: commit only if both branches prepared successfully
            if (XAResource.XA_OK == ret1 && XAResource.XA_OK == ret2) {
                xaResource1.commit(xid1, false);
                xaResource2.commit(xid2, false);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

1.2 Evolution of Distributed Systems

In the beginning, the company’s website had little traffic, and a single application packaging all the functional code into one service and deploying it on a server was enough to support the business. As the business grew, a single node could no longer meet demand, so the original monolith was split vertically into several unrelated applications that could scale out. To further improve overall system performance, as applications multiplied, repeated business code was gradually extracted into shared abstract services called by other system modules. Eventually, the system may be further broken down into a microservices architecture, in which each microservice has its own database.

1.3 Distributed transaction scenario

After a large system is split into multiple independently deployable application services, each service needs special handling to complete transactional operations. The following scenarios are common.

  • Cross-JVM process: Taking an e-commerce system as an example, the order microservice has its own order database and deducts the corresponding quantity of goods by calling the inventory service when updating an order; the inventory service likewise has its own inventory database. The two microservices run in two processes, resulting in a cross-JVM transaction.
  • Cross-database instance: It is common for a single system to access multiple database instances, and cross-data-source access also produces distributed transactions. For example, order data and inventory data are stored in different database instances; when a user initiates a refund, the user’s order data and transaction data are operated on at the same time.
  • Multiple services accessing one database: It is also possible for the order and inventory microservices to access the same database concurrently. They operate on the database through different database sessions, and a distributed transaction arises here as well.


1.4 Data consistency

In distributed scenarios, an exception or fault in the network, servers, or system software can leave databases inconsistent with each other and data inconsistent across cache nodes. Common cases include the following.

  • Call timeout scenario: Service A invokes service B synchronously or asynchronously. Data on service A and service B are inconsistent due to network or service exceptions.
  • Inconsistent between the cache and the database: In a high concurrency scenario, some hot data is cached in Redis or other cache components. If the data in the cache is not updated immediately when the database is added, modified, or deleted, the data in the cache will be inconsistent with the data in the database.
  • Inconsistent data across cache nodes: The cache content differs between nodes. For example, in a Redis cluster, a split-brain caused by network exceptions can leave the data on multiple cache nodes inconsistent.
  • Multiple data copies: The database or cache middleware is deployed in active/standby mode or cluster mode for better performance. If the network, server, or system software is faulty, some copies may be written successfully and some copies may fail to be written. As a result, different data copies are inconsistent.
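The cache/database mismatch in the second bullet is commonly mitigated with the cache-aside pattern: write the database first, then invalidate (rather than update) the cached copy. Below is a minimal sketch of that idea; plain in-memory maps stand in for the real database and for Redis, and all names are illustrative.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Cache-aside sketch: update the database first, then invalidate the cache,
// so the next read repopulates the cache from the source of truth.
public class CacheAsideDemo {
    static final Map<String, String> database = new ConcurrentHashMap<>();
    static final Map<String, String> cache = new ConcurrentHashMap<>();

    static void update(String key, String value) {
        database.put(key, value);  // 1. write the database
        cache.remove(key);         // 2. invalidate, rather than update, the cached copy
    }

    static String read(String key) {
        String v = cache.get(key);
        if (v == null) {           // cache miss: load from the database and backfill
            v = database.get(key);
            if (v != null) cache.put(key, v);
        }
        return v;
    }
}
```

Invalidation is preferred over updating the cache in place because two concurrent writers updating the cache in different orders than the database can otherwise leave stale data cached indefinitely.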

2. Strong consistency solutions

A strongly consistent distributed transaction requires that the data of all nodes participating in the global transaction be consistent at any moment. Typical solutions include the DTP, 2PC, and 3PC models. These solutions offer high data consistency and suit businesses such as bank transfers, with the advantage that the latest written data can be queried at any time; however, they are technically complex to implement, sacrifice availability, and perform poorly in high-concurrency scenarios.

2.1 DTP transaction model

The DTP model defines the specification and APIs for implementing distributed transactions; concrete frameworks from different vendors vary within that specification. In general, the model consists of three parts: the application program, the resource manager, and the transaction manager. A resource manager can be understood as a database management system or a message service manager; the transaction manager is responsible for coordinating and managing the transactions in the model and for providing programming interfaces to the application.

2.2 2PC transaction model

In the Prepare phase, the transaction manager sends a Prepare message to each resource manager participating in the global transaction. Each resource manager either returns a failure message or executes the corresponding transaction locally, writing the redo and undo log files but not committing. In the Commit phase, if the transaction manager received a failure message from any participating resource manager, it sends a rollback message to the resource managers whose Prepare phase succeeded; otherwise it sends a Commit message. Each resource manager then rolls back or commits its transaction according to the instruction it receives, and releases the lock resources used during transaction processing.

In the 2PC model, all nodes participating in a transaction lock the shared resources they occupy, which blocks other processes or threads accessing those resources. If the transaction manager fails, the resource managers block until timeout, so the model is prone to a single point of failure.
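The two-phase flow described above can be sketched as a small in-memory coordinator. The `Participant` interface and all names below are illustrative, not part of any real framework; a real coordinator would also persist its decision log and handle timeouts.

```java
import java.util.List;

// Minimal in-memory sketch of the 2PC protocol: phase 1 collects votes,
// phase 2 commits everywhere only if every participant voted yes.
public class TwoPhaseCommitDemo {

    interface Participant {
        boolean prepare(); // phase 1: execute locally, write logs, but do not commit
        void commit();     // phase 2: make the local change durable
        void rollback();   // phase 2: undo the prepared change
    }

    // The coordinator (transaction manager) commits only on a unanimous yes vote.
    static boolean runGlobalTransaction(List<Participant> participants) {
        boolean allPrepared = true;
        for (Participant p : participants) {
            if (!p.prepare()) {      // any failure or timeout counts as a "no" vote
                allPrepared = false;
                break;
            }
        }
        for (Participant p : participants) {
            if (allPrepared) {
                p.commit();
            } else {
                p.rollback();        // roll back every branch, including prepared ones
            }
        }
        return allPrepared;
    }
}
```

Note how a single `prepare()` failure forces a rollback instruction to every participant, which is exactly why prepared participants hold their locks until the coordinator's decision arrives.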

2.3 3PC transaction model

The 3PC model is the three-phase commit model, divided into CanCommit, PreCommit, and DoCommit/DoRollback phases. The overall flow is similar to the 2PC model, but it mainly addresses the single point of failure and reduces blocking during concurrent transaction execution: if a resource manager cannot receive a message from the transaction manager in time, it commits the transaction on its own rather than holding the transaction’s resources and blocking. This mechanism, however, can lead to transaction inconsistency.

3. Final consistent solution

Eventual consistency, unlike strong consistency, does not require every node participating in the transaction to hold consistent data at all times, nor that a query on any node return the latest data (a guarantee that hurts system performance in high-concurrency scenarios). An eventually consistent distributed transaction solution allows intermediate states, as long as the data reaches a consistent state after some period of time.

3.1 TCC

TCC is a typical solution to the distributed transaction problem, mainly aimed at cross-service invocation, and it is widely used in distributed transaction scenarios. It suits business scenarios with strong isolation and strict consistency requirements, as well as services with short execution times. The core idea is to divide a complete transaction into three phases: Try, Confirm, and Cancel.

  • Try phase: no business logic is executed; only the business consistency check is performed and the corresponding resources are reserved, which maintains isolation between operations.
  • Confirm phase: begins once all branch transactions in the Try phase have succeeded; if the Try phase succeeds, Confirm generally succeeds as well. If an error occurs during execution, a retry mechanism or manual intervention is introduced.
  • Cancel phase: in case of an exception or error, the transaction is rolled back, the branch transactions are cancelled, and the resources reserved in the Try phase are released. If the Cancel phase itself fails, a retry mechanism or manual intervention is likewise introduced.

The TCC model implements the specific logic at the application layer and locks resources at a fine granularity rather than locking everything, which improves system performance. The Confirm and Cancel phases are idempotent, which guarantees data consistency after the distributed transaction completes. When using this pattern, attention must be paid to the empty-rollback, idempotence, and suspension problems.
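The three phases can be sketched with an in-memory inventory participant. The class and method names below are illustrative; a real system would persist the frozen state and expose these operations as service endpoints. Note how Confirm and Cancel are idempotent, and how Cancel tolerates an empty rollback (a Cancel arriving before any Try).

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// In-memory sketch of a TCC inventory participant:
// Try reserves stock, Confirm consumes the reservation, Cancel releases it.
public class TccInventoryDemo {
    private int available;                                  // sellable stock
    private final ConcurrentMap<String, Integer> frozen =
        new ConcurrentHashMap<>();                          // reserved stock per transaction id

    public TccInventoryDemo(int initial) { this.available = initial; }

    // Try: check and reserve resources only; nothing is deducted permanently yet.
    public synchronized boolean tryReserve(String xid, int count) {
        if (available < count || frozen.containsKey(xid)) return false;
        available -= count;
        frozen.put(xid, count);
        return true;
    }

    // Confirm: consume the reservation. Idempotent: a repeated call is a no-op.
    public synchronized void confirm(String xid) {
        frozen.remove(xid);      // frozen stock is now permanently deducted
    }

    // Cancel: release the reservation back to available stock. Also idempotent,
    // and tolerant of an empty rollback (Cancel with no preceding Try).
    public synchronized void cancel(String xid) {
        Integer count = frozen.remove(xid);
        if (count != null) available += count;
    }

    public synchronized int available() { return available; }
}
```

Only the stock actually reserved by a transaction is locked, which is the fine-grained locking that gives TCC its performance advantage over 2PC.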

3.2 Distributed transaction scheme for reliable message ultimate Consistency

The basic principle of reliable-message eventual consistency is that the transaction initiator (message sender) sends a message after successfully executing its local transaction, and the transaction participant (message consumer) receives that message and successfully executes its own local transaction, so that the data of the initiator and the participant eventually reach a consistent state.

  • Based on a local message table: tasks processed across the distributed system, such as data synchronization, are executed asynchronously in the form of messages or logs. These messages can be stored in a local file, in a local database table, or in message middleware, and retried according to certain rules; the detailed flow is shown on the left. This scheme is relatively mature and widely used, with no obvious shortcomings, but it cannot guarantee strong consistency between service nodes: at some moments the committed data may not yet be queryable. The message table is also coupled into the business database and requires extra manual maintenance of the message data, which hinders extension; and since the message service is specific to each business system and cannot be made common, every new distributed transaction requires repeated development.
  • Based on an independent message service: the message service is separated from the local message table, and the message data is moved out of the local data table into a separate message database; the detailed flow is shown in the figure on the right. In this scheme the message service is independently deployed, developed, and maintained, decoupled from the business, with better extensibility and maintainability, reducing the cost of repeated development and weakening the dependence on message middleware.
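The local-message-table pattern in the first bullet can be sketched as follows. All names are illustrative: the business write and its outgoing message record are stored together (a synchronized block stands in for one database transaction here), and a relay job later publishes pending messages with retry, marking rows once the broker accepts them.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Sketch of the local-message-table pattern: the order row and its message row
// are written atomically, and a relay publishes pending messages until they succeed.
public class LocalMessageTableDemo {
    static class Msg {
        final String payload;
        boolean sent;
        int attempts;
        Msg(String payload) { this.payload = payload; }
    }

    final List<String> orders = new ArrayList<>();      // business table
    final List<Msg> messageTable = new ArrayList<>();   // message table in the same database

    // Business operation: insert the order and its outgoing message together,
    // standing in for a single local database transaction.
    synchronized void placeOrder(String orderId) {
        orders.add(orderId);
        messageTable.add(new Msg("order-created:" + orderId));
    }

    // Relay job: try to publish each pending message; a rejected message simply
    // stays pending for the next pass. `publish` stands in for the MQ client.
    synchronized void relay(Predicate<String> publish) {
        for (Msg m : messageTable) {
            if (!m.sent) {
                m.attempts++;
                if (publish.test(m.payload)) m.sent = true;  // mark delivered
            }
        }
    }
}
```

Because the message row is committed in the same transaction as the business data, a crash between the business write and the publish cannot lose the message; at worst it is published more than once, which is why consumers must be idempotent.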

Alibaba’s RocketMQ message middleware provides transactional messages, which users access through the transaction message interfaces it exposes. The multiple states of a transactional message are the core element for completing the transaction. Compared with a normal producer, TransactionMQProducer can carry the message’s state, and the transaction callback and state-check interfaces can be set on it.

public interface RocketMQLocalTransactionListener {
  RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg);
  RocketMQLocalTransactionState checkLocalTransaction(Message msg);
}

TransactionMQProducer producer = new TransactionMQProducer("ProducerGroup");
producer.setNamesrvAddr("xxxx");
producer.setTransactionListener(transactionListener);
producer.start();
SendResult result = producer.sendMessageInTransaction(msg, null);

This scheme relies heavily on message middleware, so several aspects deserve special attention when using it.

  • Message sending consistency: the transaction initiator must execute its local transaction and send the message as a consistent whole; that is, the message is sent out if and only if the initiator’s transaction succeeds. This requires a message-sending confirmation mechanism, and both the initiator and the middleware need confirmation and retry mechanisms.
  • Message receiving consistency: a maximum redelivery count must be set. The message middleware repeatedly delivers messages to transaction participants, so the receiving interface must be idempotent, and a confirmation mechanism is needed between the participant and the middleware. After repeated delivery failures, the message enters a dead-letter queue, which requires monitoring and manual intervention.
  • Message reliability: covers the three stages of sending, storage, and consumption. To guarantee sending reliability, a callback mechanism is introduced: the transaction initiator provides a callback interface, and when a message is abnormal, the message service can call back the initiator to obtain the transaction execution status and the message data, ensuring the message service receives the data successfully. If the initiator does not receive confirmation from the message service within a certain time, a retry mechanism resends the message according to certain rules. Storage reliability mainly relies on persistence and redundant multi-copy replication. Consumption reliability mainly relies on the participant’s retry and idempotence mechanisms, fetching data from the middleware according to certain rules to ensure the message is eventually consumed successfully.
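The consumption-side guarantees above — a bounded redelivery count, an idempotence guard keyed by message id, and a dead-letter queue for messages that exhaust their retries — can be sketched as follows. The class name and the retry limit are illustrative assumptions, not any real middleware's API.

```java
import java.util.ArrayDeque;
import java.util.HashSet;
import java.util.Queue;
import java.util.Set;
import java.util.function.Predicate;

// Sketch of bounded redelivery with a dead-letter queue and idempotent consumption.
public class ReliableConsumeDemo {
    static final int MAX_ATTEMPTS = 3;            // illustrative redelivery limit

    final Queue<String> deadLetter = new ArrayDeque<>();  // parked for manual intervention
    final Set<String> processed = new HashSet<>();        // idempotence guard by message id

    // Deliver a message to `handler`, retrying up to MAX_ATTEMPTS before
    // parking it in the dead-letter queue. Returns true if it was consumed.
    boolean deliver(String messageId, Predicate<String> handler) {
        if (processed.contains(messageId)) return true;   // duplicate: already handled, skip
        for (int attempt = 1; attempt <= MAX_ATTEMPTS; attempt++) {
            if (handler.test(messageId)) {
                processed.add(messageId);                 // record so redelivery is a no-op
                return true;
            }
        }
        deadLetter.add(messageId);     // give up; monitoring/manual processing takes over
        return false;
    }
}
```

The idempotence guard is what makes at-least-once delivery safe: the middleware may redeliver the same message many times, but the business effect happens exactly once.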

3.3 Maximum effort notification distributed transaction scheme

The maximum-effort notification solution is applicable to scenarios where the data can eventually reach consistency and sensitivity to time is low. It is mostly used for transaction consistency across enterprises and systems, such as notifying a merchant after a payment succeeds. The following conditions must be met to use it:

  • The business pattern has queryable, idempotent operations and low sensitivity to when final consistency is reached; anywhere from seconds or minutes to days is acceptable.
  • The processing result of the passive party does not affect the processing result of the active party.
  • After completing its business processing, the active party sends notification messages to the passive party, and message loss is allowed.
  • The active party can set stepped notification rules according to certain policies; after a notification fails, it repeats the notification according to those rules until the number of notifications reaches the configured maximum.
  • The active party provides a query interface that the passive party can call on demand to reconcile and recover lost business messages.
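The stepped notification rule in the fourth bullet can be sketched as a widening retry schedule. The interval values and all names below are illustrative assumptions; a real implementation would run the attempts off a timer or delayed queue and would also expose the reconciliation query interface from the last bullet.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Sketch of a best-effort notification schedule: retry at widening intervals
// and give up after the last step, relying on the query interface afterwards.
public class BestEffortNotifyDemo {
    // Notification delays in seconds; 0 means "notify immediately". Illustrative values.
    static final long[] SCHEDULE_SECONDS = {0, 30, 60, 300, 1800, 3600};

    // Returns the delays (in seconds) at which notification was attempted,
    // stopping early once `notifier` acknowledges receipt.
    static List<Long> notifyWithSchedule(String payload, Predicate<String> notifier) {
        List<Long> attempts = new ArrayList<>();
        for (long delay : SCHEDULE_SECONDS) {
            attempts.add(delay);                 // a real system would sleep/schedule here
            if (notifier.test(payload)) break;   // acknowledged: stop notifying
        }
        return attempts;                         // after the last step, stop; reconciliation covers the rest
    }
}
```

Because notifications may be delivered more than once before the acknowledgement arrives, the passive party's handler must be idempotent, matching the first condition above.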


References

  • “XA Transaction” — jianshu.com/p/a7d1c4f2542c
  • In-depth Understanding of Distributed Transaction Principles and Practices
  • “MySQL Two-phase Commit Protocol” — www.cnblogs.com/zengkefu/p/…