Preface

A few days ago I experimented at home with Seata, Alibaba’s open-source distributed transaction middleware, and wrote up the process in an earlier article:

SpringBoot+Dubbo+Seata distributed transactions in practice

But hands-on practice alone is not enough. We should also understand the principles; otherwise, when something goes wrong, we will not know how to fix it, which would be embarrassing.

One, Principle

First, imagine a traditional monolithic application that completes a business operation through three modules, all updating data in the same data source.

Naturally, data consistency across business processes is guaranteed by local transactions.

As business requirements and architectures change, monolithic applications are broken down into microservices. The original three modules become three separate services, each with its own data source.

The business process is now completed through RPC calls between these services.

At this point, data consistency within each service is still guaranteed by local transactions.

How do we ensure global data consistency and integrity at the business level? This is the typical distributed transaction requirement in a microservices architecture.

1. Principle and design

Seata models a distributed transaction as a global transaction made up of several branch transactions. The global transaction coordinates the branch transactions under it so that they either all commit successfully or all roll back. A branch transaction is usually itself a local transaction that satisfies ACID.

Seata defines three components that together handle distributed transactions:

  • Transaction Coordinator (TC): maintains the running state of global transactions and coordinates and drives their global commit or rollback.
  • Transaction Manager (TM): defines the boundaries of a global transaction; it starts the global transaction and ultimately initiates the global commit or rollback decision.
  • Resource Manager (RM): manages branch transactions; it registers branches, reports their status, receives instructions from the TC, and drives the commit or rollback of the branch (local) transactions.

A typical distributed transaction process:

  1. TM applies to TC for starting a global transaction. The global transaction is successfully created and a globally unique XID is generated.
  2. The XID is propagated in the context of the microservice invocation link.
  3. RM registers branch transactions with the TC, bringing them under the global transaction identified by the XID.
  4. TM initiates a global commit or rollback resolution against the XID to TC.
  5. TC schedules all branch transactions under XID to complete commit or rollback requests.
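The five steps above can be sketched as an in-memory simulation. This is a minimal sketch, not Seata’s API: `Coordinator`, `Branch`, and `RecordingBranch` are hypothetical names, the XID format is only imitated, and the TC, TM and RM call each other directly instead of talking over RPC.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

public class GlobalTxSketch {

    // Hypothetical branch: an RM-side local transaction whose outcome the TC decides later
    interface Branch {
        void commit();
        void rollback();
    }

    // Hypothetical TC: tracks global transactions and the branches registered under each XID
    static class Coordinator {
        private final AtomicLong ids = new AtomicLong(1);
        private final Map<String, List<Branch>> branches = new HashMap<>();

        // Step 1: the TM asks to begin; the TC returns a globally unique XID
        String begin() {
            String xid = "127.0.0.1:8091:" + ids.getAndIncrement();
            branches.put(xid, new ArrayList<>());
            return xid;
        }

        // Step 3: an RM registers a branch under the XID it received (step 2, propagation)
        void registerBranch(String xid, Branch branch) {
            List<Branch> list = branches.get(xid);
            if (list == null) throw new IllegalStateException("unknown xid " + xid);
            list.add(branch);
        }

        // Steps 4-5: the TM reports its decision; the TC drives every branch to the same outcome
        void end(String xid, boolean commit) {
            List<Branch> list = branches.remove(xid);
            if (list == null) throw new IllegalStateException("unknown xid " + xid);
            for (Branch b : list) {
                if (commit) b.commit(); else b.rollback();
            }
        }
    }

    // Branch that simply records its final state
    static class RecordingBranch implements Branch {
        String state = "REGISTERED";
        public void commit() { state = "COMMITTED"; }
        public void rollback() { state = "ROLLED_BACK"; }
    }

    public static void main(String[] args) {
        Coordinator tc = new Coordinator();
        String xid = tc.begin();
        RecordingBranch order = new RecordingBranch();
        RecordingBranch stock = new RecordingBranch();
        tc.registerBranch(xid, order);
        tc.registerBranch(xid, stock);
        tc.end(xid, false); // e.g. the stock service threw an exception
        System.out.println(order.state + " " + stock.state); // ROLLED_BACK ROLLED_BACK
    }
}
```

The point of the design is that no branch decides on its own: every branch waits for the single decision the TM reports for its XID.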

2. AT mode

Seata has four distributed transaction solutions: AT mode, TCC mode, Saga mode and XA mode (at the time of writing, the last two were still on the official roadmap).

Our example project uses AT mode. In AT mode, users only need to care about their own “business SQL”, which forms phase one; the Seata framework automatically generates the phase-two commit and rollback operations.

  • Phase 1:

In phase one, Seata intercepts the “business SQL” and parses its semantics to find the business data it will update. Before the data is updated, Seata saves it as the “before image”; it then executes the “business SQL” to update the data, saves the updated data as the “after image”, and generates a row lock. All of this is done within a single local database transaction, which guarantees the atomicity of phase one.

  • Phase 2 commit:

Because the “business SQL” was already committed to the database in phase one, the Seata framework only needs to delete the snapshot data and row locks saved in phase one to finish the cleanup.

  • Phase 2 rollback:

In a phase-two rollback, Seata must undo the “business SQL” executed in phase one to restore the business data. It does this by restoring the data from the “before image”.
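A toy version of the three phases described above, assuming a single in-memory table keyed by primary key. All names are hypothetical; there is no SQL parsing, no row lock, and no check that the row was left untouched by others before rolling back, all of which a real implementation needs.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class AtModeSketch {

    // One undo record: the row's value before and after the business SQL (null = row absent)
    static final class UndoRecord {
        final int pk;
        final String beforeImage;
        final String afterImage;

        UndoRecord(int pk, String beforeImage, String afterImage) {
            this.pk = pk;
            this.beforeImage = beforeImage;
            this.afterImage = afterImage;
        }
    }

    // Toy "table": primary key -> row value
    final Map<Integer, String> table = new HashMap<>();
    // Toy "undo_log"; in AT mode it is written in the same local transaction as the business data
    final List<UndoRecord> undoLog = new ArrayList<>();

    // Phase 1: capture before image, run the update, capture after image, record undo info
    void phaseOneUpdate(int pk, String newValue) {
        String before = table.get(pk);
        table.put(pk, newValue);               // the "business SQL"
        String after = table.get(pk);
        undoLog.add(new UndoRecord(pk, before, after));
        // a real local transaction would commit here, making data and undo log durable together
    }

    // Phase 2 commit: the data is already committed, so only the undo records are cleaned up
    void phaseTwoCommit() {
        undoLog.clear();
    }

    // Phase 2 rollback: restore each row from its before image, newest change first
    void phaseTwoRollback() {
        for (int i = undoLog.size() - 1; i >= 0; i--) {
            UndoRecord r = undoLog.get(i);
            if (r.beforeImage == null) table.remove(r.pk);   // row did not exist before (INSERT)
            else table.put(r.pk, r.beforeImage);
        }
        undoLog.clear();
    }

    public static void main(String[] args) {
        AtModeSketch db = new AtModeSketch();
        db.table.put(1, "stock=100");
        db.phaseOneUpdate(1, "stock=90");
        db.phaseOneUpdate(2, "order#2");
        db.phaseTwoRollback();
        System.out.println(db.table); // {1=stock=100}
    }
}
```

Replaying the undo records in reverse order matters when the same row is changed twice in one branch: the oldest before image must win.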

Let’s take a look at how the whole process is strung together from the source code.

Two, Local environment setup

To make reading the source easier, we first set up a local environment for debugging.

Seata source: github.com/seata/seata.

The current version is 0.7.0-SNAPSHOT; package the project locally with mvn install.

Our SpringBoot+Seata test project can then depend on this local build:

<dependency>
    <groupId>io.seata</groupId>
    <artifactId>seata-all</artifactId>
    <version>0.7.0-SNAPSHOT</version>
</dependency>

Why do this? Communication between Seata’s components is done over Netty, and during debugging the connection often drops because of timeouts.

With a local build, we can lengthen the heartbeat interval or remove heartbeat detection altogether.

1. Start the server

io.seata.server.Server

We said that Seata defines three components, one of which is the transaction coordinator (TC). This server is the TC.

Let’s see what it does.

public class Server {
    public static void main(String[] args) throws IOException {
        // Initialize the parameter parser
        ParameterParser parameterParser = new ParameterParser(args);
        RpcServer rpcServer = new RpcServer(WORKING_THREADS);
        rpcServer.setHost(parameterParser.getHost());
        rpcServer.setListenPort(parameterParser.getPort());
        UUIDGenerator.init(1);
        // Load sessions from a file or database
        SessionHolder.init(parameterParser.getStoreMode());
        // Initialize the DefaultCoordinator
        DefaultCoordinator coordinator = new DefaultCoordinator(rpcServer);
        coordinator.init();
        rpcServer.setHandler(coordinator);
        // Register a shutdown hook to clean up coordinator resources
        ShutdownHook.getInstance().addDisposable(coordinator);
        // 127.0.0.1 and 0.0.0.0 are not valid here
        if (NetUtil.isValidIp(parameterParser.getHost(), false)) {
            XID.setIpAddress(parameterParser.getHost());
        } else {
            XID.setIpAddress(NetUtil.getLocalIp());
        }
        XID.setPort(rpcServer.getListenPort());
        // Start the RPC service
        rpcServer.init();
        System.exit(0);
    }
}

RpcServer is a Netty-based RPC server that receives and processes messages from the TM and RM. This article focuses on the client side rather than the server, so a general idea is enough here.
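Before starting, the server sets its IP and port on XID. Judging from the undo log record shown later in this article (`192.168.216.1:8091:2016493467`), an XID has the form `ip:port:transactionId`. A minimal sketch of composing and parsing that shape; `XidSketch` is a hypothetical class, not Seata’s XID.

```java
public class XidSketch {

    // Compose an XID as "ip:port:transactionId"
    static String generateXid(String ip, int port, long transactionId) {
        return ip + ":" + port + ":" + transactionId;
    }

    // Extract the numeric transaction id (the part after the last ':')
    static long transactionIdOf(String xid) {
        return Long.parseLong(xid.substring(xid.lastIndexOf(':') + 1));
    }

    public static void main(String[] args) {
        String xid = generateXid("192.168.216.1", 8091, 2016493467L);
        System.out.println(xid);                  // 192.168.216.1:8091:2016493467
        System.out.println(transactionIdOf(xid)); // 2016493467
    }
}
```

Embedding the coordinator’s address in the XID means that any service receiving the XID can, in principle, tell which TC instance owns the transaction.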

Three, Client configuration

In our project we wrote a SeataConfiguration class that mainly configures the global transaction scanner and the data source proxy. Let’s look at why they are needed and what they do.

1. Transaction scanner

@Bean
public GlobalTransactionScanner globalTransactionScanner() {
    return new GlobalTransactionScanner("springboot-order", "my_test_tx_group");
}

As usual, when we look at a class we start with its structure: who is its parent, where does it come from, and what is it for?

public class GlobalTransactionScanner extends AbstractAutoProxyCreator
    implements InitializingBean, ApplicationContextAware,DisposableBean {
}

Here we see that it extends AbstractAutoProxyCreator and implements the InitializingBean interface.

Both come from the Spring family: AbstractAutoProxyCreator is what Spring AOP uses to generate proxies, and InitializingBean lets a bean run its own initialization logic.

  • InitializingBean

There are three ways to initialize a bean, invoked in this order: @PostConstruct, afterPropertiesSet, and init-method.

Here, its initialization method essentially does three things:

private void initClient() {
    if (LOGGER.isInfoEnabled()) {
        LOGGER.info("Initializing Global Transaction Clients ... ");
    }
    // init TM: initialize the transaction manager
    TMClient.init(applicationId, txServiceGroup);
    // init RM: initialize the resource manager
    RMClient.init(applicationId, txServiceGroup);
    // Register a shutdown hook that cleans up TM and RM resources
    registerSpringShutdownHook();
}

So far, all three components defined by Seata have surfaced.

TMClient.init initializes the transaction manager client, establishes a connection to the RPC server, and registers with the transaction coordinator.

RMClient.init follows the same procedure: it initializes the resource manager, establishes a connection to the RPC server, and registers with the transaction coordinator.

Both also maintain their connections with a scheduled task, so they reconnect automatically after a disconnection:

timerExecutor.scheduleAtFixedRate(new Runnable() {
	@Override
	public void run() {
	    clientChannelManager.reconnect(getTransactionServiceGroup());
	}
}, 5, 5, TimeUnit.SECONDS);
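The same fixed-rate reconnect idea can be reproduced with the JDK’s ScheduledExecutorService. A sketch under assumptions: `FakeChannel` is a hypothetical stand-in for the client’s channel manager, and the period is a parameter so the demo does not need to wait Seata’s 5 seconds.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

public class ReconnectSketch {

    // Hypothetical connection to the TC
    static class FakeChannel {
        final AtomicBoolean connected = new AtomicBoolean(false);
        final AtomicInteger connectAttempts = new AtomicInteger();

        // Idempotent: only dials when the channel is down
        void reconnect() {
            if (!connected.get()) {
                connectAttempts.incrementAndGet();
                connected.set(true);
            }
        }
    }

    // Check the channel every period and reconnect if the link dropped
    static ScheduledExecutorService startReconnectTask(FakeChannel channel, long periodMillis) {
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        timer.scheduleAtFixedRate(channel::reconnect, 0, periodMillis, TimeUnit.MILLISECONDS);
        return timer;
    }

    public static void main(String[] args) throws InterruptedException {
        FakeChannel channel = new FakeChannel();
        ScheduledExecutorService timer = startReconnectTask(channel, 50);
        Thread.sleep(100);
        channel.connected.set(false);   // simulate a dropped connection
        Thread.sleep(200);
        timer.shutdownNow();
        System.out.println("connected=" + channel.connected.get()
                + ", attempts=" + channel.connectAttempts.get());
    }
}
```

Because reconnect() does nothing while the channel is up, running it on a timer is harmless when the connection is healthy.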

Finally, it registers a shutdown hook that cleans up the resources of both components.

  • AbstractAutoProxyCreator

It is essentially a BeanPostProcessor: after a bean is initialized, its postProcessAfterInitialization method is called.

public Object postProcessAfterInitialization(@Nullable Object bean, String beanName) {
    if (bean != null) {
        Object cacheKey = this.getCacheKey(bean.getClass(), beanName);
        if (this.earlyProxyReferences.remove(cacheKey) != bean) {
            return this.wrapIfNecessary(bean, beanName, cacheKey);
        }
    }
    return bean;
}

So what does GlobalTransactionScanner.wrapIfNecessary() do?

It checks whether the bean’s methods carry the @GlobalTransactional or @GlobalLock annotation and, if so, generates a proxy class.

protected Object wrapIfNecessary(Object bean, String beanName, Object cacheKey) {
    if (disableGlobalTransaction) {
        return bean;
    }
    // A proxy has already been generated: return it directly
    if (PROXYED_SET.contains(beanName)) {
        return bean;
    }
    interceptor = null;
    // Check whether this is a TCC proxy
    if (TCCBeanParserUtils.isTccAutoProxy(bean, beanName, applicationContext)) {
        // TCC interceptor: proxy bean of sofa:reference/dubbo:reference, and LocalTCC
        interceptor = new TccActionInterceptor(TCCBeanParserUtils.getRemotingDesc(beanName));
    } else {
        Class<?> serviceInterface = SpringProxyUtils.findTargetClass(bean);
        Class<?>[] interfacesIfJdk = SpringProxyUtils.findInterfaces(bean);
        // Check whether the class's methods carry @GlobalTransactional or @GlobalLock
        if (!existsAnnotation(new Class[] {serviceInterface})
            && !existsAnnotation(interfacesIfJdk)) {
            return bean;
        }
        // Create the interceptor
        if (interceptor == null) {
            interceptor = new GlobalTransactionalInterceptor(failureHandlerHook);
        }
    }
    // If the bean is not an AOP proxy yet, create one; if it already is, add the interceptor to its advisors
    if (!AopUtils.isAopProxy(bean)) {
        bean = super.wrapIfNecessary(bean, beanName, cacheKey);
    } else {
        AdvisedSupport advised = SpringProxyUtils.getAdvisedSupport(bean);
        Advisor[] advisor = buildAdvisors(beanName, getAdvicesAndAdvisorsForBean(null, null, null));
        for (Advisor avr : advisor) {
            advised.addAdvisor(0, avr);
        }
    }
    PROXYED_SET.add(beanName);
    return bean;
}

At this point one thing is certain: for our ServiceImpl classes whose methods carry @GlobalTransactional, a proxy class is generated.

When such a method is called, what actually runs is the invoke() method of the proxy’s interceptor.

public class GlobalTransactionalInterceptor implements MethodInterceptor {
    @Override
    public Object invoke(final MethodInvocation methodInvocation) throws Throwable {
        // Get the target class
        Class<?> targetClass = AopUtils.getTargetClass(methodInvocation.getThis());
        // Get the method being called
        Method specificMethod = ClassUtils.getMostSpecificMethod(methodInvocation.getMethod(), targetClass);
        final Method method = BridgeMethodResolver.findBridgedMethod(specificMethod);
        // Get the annotations on the method
        final GlobalTransactional globalTransactionalAnnotation = getAnnotation(method, GlobalTransactional.class);
        final GlobalLock globalLockAnnotation = getAnnotation(method, GlobalLock.class);
        // Handle the global transaction
        if (globalTransactionalAnnotation != null) {
            return handleGlobalTransaction(methodInvocation, globalTransactionalAnnotation);
        } else if (globalLockAnnotation != null) {
            return handleGlobalLock(methodInvocation);
        } else {
            return methodInvocation.proceed();
        }
    }
}

As you can see, this is where global transaction processing starts. We will not go into it yet; let’s move on.
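Taken together, wrapIfNecessary() and invoke() follow one idea: proxy only beans whose methods carry the annotation, then route annotated calls through begin/commit/rollback while passing everything else straight through. The sketch below shows that idea with a plain JDK dynamic proxy rather than Spring AOP. `@GlobalTx`, `OrderService`, and the `LOG` list are hypothetical. For simplicity it reads the annotation from the interface method instead of resolving the most specific method on the target class as the interceptor above does.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ProxySketch {

    // Hypothetical stand-in for @GlobalTransactional
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    @interface GlobalTx {}

    interface OrderService {
        @GlobalTx
        void createOrder(String item);
        String ping();
    }

    static final List<String> LOG = new ArrayList<>();

    static class OrderServiceImpl implements OrderService {
        public void createOrder(String item) { LOG.add("business:" + item); }
        public String ping() { return "pong"; }
    }

    // Like wrapIfNecessary: return the bean untouched unless some interface method is annotated
    @SuppressWarnings("unchecked")
    static <T> T wrapIfNecessary(T bean, Class<T> iface) {
        boolean annotated = Arrays.stream(iface.getMethods())
                .anyMatch(m -> m.isAnnotationPresent(GlobalTx.class));
        if (!annotated) return bean;
        return (T) Proxy.newProxyInstance(iface.getClassLoader(), new Class<?>[] {iface},
                (proxy, method, callArgs) -> invoke(bean, method, callArgs));
    }

    // Like the interceptor's invoke(): annotated methods get begin/commit/rollback around them
    static Object invoke(Object target, Method method, Object[] callArgs) throws Throwable {
        if (!method.isAnnotationPresent(GlobalTx.class)) {
            return callTarget(target, method, callArgs);   // plain pass-through
        }
        LOG.add("begin");
        try {
            Object result = callTarget(target, method, callArgs);
            LOG.add("commit");
            return result;
        } catch (Throwable t) {
            LOG.add("rollback");
            throw t;
        }
    }

    // Unwrap reflection's wrapper so callers see the original exception
    static Object callTarget(Object target, Method method, Object[] callArgs) throws Throwable {
        try {
            return method.invoke(target, callArgs);
        } catch (InvocationTargetException e) {
            throw e.getCause();
        }
    }

    public static void main(String[] args) {
        OrderService service = wrapIfNecessary(new OrderServiceImpl(), OrderService.class);
        service.createOrder("HYD5620");
        service.ping();
        System.out.println(LOG); // [begin, business:HYD5620, commit]
    }
}
```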

2. Data source proxy

Besides the method proxies created above, the configuration also creates a proxy for the data source and sets that proxy on the SqlSessionFactory.

@Bean
public DataSourceProxy dataSourceProxy(DataSource dataSource) {
    return new DataSourceProxy(dataSource);
}

@Bean
public SqlSessionFactory sqlSessionFactoryBean(DataSourceProxy dataSourceProxy) throws Exception {
    SqlSessionFactoryBean sqlSessionFactoryBean = new SqlSessionFactoryBean();
    sqlSessionFactoryBean.setDataSource(dataSourceProxy);
    sqlSessionFactoryBean.setTransactionFactory(new JdbcTransactionFactory());
    return sqlSessionFactoryBean.getObject();
}

The DataSourceProxy is created and set on MyBatis’s SqlSessionFactory.

As we know, when MyBatis executes a mapper method, it eventually creates a PreparedStatement object and calls ps.execute() to get the SQL result.

There are two things to note here:

  • The creation of a PreparedStatement

The PreparedStatement object is created from the Connection object, as we’ve probably all written:

PreparedStatement pstmt = conn.prepareStatement("insert ........");

  • The creation of the Connection

Where does the Connection come from? No need to hesitate: from the data source, of course.

But we have replaced the DataSource object with Seata’s DataSourceProxy.

As a result, both the Connection and the PreparedStatement are created as Seata proxy objects.

Take a look:

public class DataSourceProxy extends AbstractDataSourceProxy implements Resource {
    public ConnectionProxy getConnection() throws SQLException {
        Connection targetConnection = targetDataSource.getConnection();
        return new ConnectionProxy(this, targetConnection);
    }
}

The ConnectionProxy, through its parent class AbstractConnectionProxy, then creates the PreparedStatement:

public abstract class AbstractConnectionProxy implements Connection {

    @Override
    public PreparedStatement prepareStatement(String sql) throws SQLException {
        PreparedStatement targetPreparedStatement = getTargetConnection().prepareStatement(sql);
        return new PreparedStatementProxy(this, targetPreparedStatement, sql);
    }
}

From this, one thing should be clear:

when ps.execute() runs, it actually calls PreparedStatementProxy.execute().
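The delegation chain can be reduced to a few lines. In this sketch `DataSource`, `Conn`, and `Statement` are hypothetical miniatures of the JDBC interfaces (the real ones have many more methods), and the log lines only mark where Seata’s proxies take the before and after images.

```java
import java.util.ArrayList;
import java.util.List;

public class JdbcProxySketch {

    // Miniature stand-ins for the JDBC interfaces
    interface Statement { boolean execute(); }
    interface Conn { Statement prepareStatement(String sql); }
    interface DataSource { Conn getConnection(); }

    static final List<String> LOG = new ArrayList<>();

    // The "real" driver objects
    static class RealDataSource implements DataSource {
        public Conn getConnection() {
            return sql -> () -> { LOG.add("db executes: " + sql); return true; };
        }
    }

    // Proxy layer 1: hands out wrapped connections
    static class DataSourceProxy implements DataSource {
        private final DataSource target;
        DataSourceProxy(DataSource target) { this.target = target; }
        public Conn getConnection() { return new ConnectionProxy(target.getConnection()); }
    }

    // Proxy layer 2: hands out wrapped statements
    static class ConnectionProxy implements Conn {
        private final Conn target;
        ConnectionProxy(Conn target) { this.target = target; }
        public Statement prepareStatement(String sql) {
            return new StatementProxy(target.prepareStatement(sql), sql);
        }
    }

    // Proxy layer 3: the interception point where images would be taken
    static class StatementProxy implements Statement {
        private final Statement target;
        private final String sql;
        StatementProxy(Statement target, String sql) { this.target = target; this.sql = sql; }
        public boolean execute() {
            LOG.add("before image for: " + sql);
            boolean result = target.execute();
            LOG.add("after image for: " + sql);
            return result;
        }
    }

    public static void main(String[] args) {
        DataSource ds = new DataSourceProxy(new RealDataSource());
        ds.getConnection().prepareStatement("INSERT INTO t_order ...").execute();
        LOG.forEach(System.out::println);
    }
}
```

The caller only ever touches the outer DataSourceProxy, yet every object it receives afterwards is a proxy, which is why replacing a single bean is enough.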

Once you’ve sorted out the logic behind the configuration file, you’ll probably have a sense of where to start when you look at the code.

Four, Method execution

As mentioned above, the ServiceImpl is already a proxy, so we go straight to GlobalTransactionalInterceptor.invoke().

It calls TransactionalTemplate.execute(); TransactionalTemplate ties the business logic together with the global transaction.

public class TransactionalTemplate {
    public Object execute(TransactionalExecutor business) throws Throwable {
        // 1. Get or create the global transaction
        GlobalTransaction tx = GlobalTransactionContext.getCurrentOrCreate();
        // 1.1 Get the transaction attributes, such as timeout and transaction name
        TransactionInfo txInfo = business.getTransactionInfo();
        if (txInfo == null) {
            throw new ShouldNeverHappenException("transactionInfo does not exist");
        }
        try {
            // 2. Begin the transaction
            beginTransaction(txInfo, tx);
            Object rs = null;
            try {
                // Execute the business logic
                rs = business.execute();
            } catch (Throwable ex) {
                // 3. Roll back
                completeTransactionAfterThrowing(txInfo, tx, ex);
                throw ex;
            }
            // 4. Commit
            commitTransaction(tx);
            return rs;
        } finally {
            // 5. Clean up resources
            triggerAfterCompletion();
            cleanUp();
        }
    }
}

The code is straightforward, and so is the transaction flow:

  1. Create a global transaction and set transaction properties
  2. Start a transaction
  3. Execute business logic
  4. If an exception occurs, roll back the transaction; otherwise, commit it
  5. Clean up resources
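These steps follow the template-method pattern. A compact sketch assuming a hypothetical `GlobalTx` interface; step 1 (creating the transaction) is left to the caller, and unlike Seata’s TransactionalTemplate it ignores rollback rules, timeouts, and hooks.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;

public class TemplateSketch {

    // Hypothetical global transaction handle
    interface GlobalTx {
        void begin();
        void commit();
        void rollback();
    }

    static Object execute(GlobalTx tx, Callable<Object> business, Runnable cleanUp) throws Exception {
        try {
            tx.begin();                     // 2. start the global transaction
            Object rs;
            try {
                rs = business.call();       // 3. run the business logic
            } catch (Exception ex) {
                tx.rollback();              // 4a. failure: global rollback, then rethrow
                throw ex;
            }
            tx.commit();                    // 4b. success: global commit
            return rs;
        } finally {
            cleanUp.run();                  // 5. always release resources
        }
    }

    // Records the calls so the flow can be observed
    static class RecordingTx implements GlobalTx {
        final List<String> events = new ArrayList<>();
        public void begin() { events.add("begin"); }
        public void commit() { events.add("commit"); }
        public void rollback() { events.add("rollback"); }
    }

    public static void main(String[] args) throws Exception {
        RecordingTx ok = new RecordingTx();
        execute(ok, () -> "done", () -> ok.events.add("cleanUp"));
        System.out.println(ok.events); // [begin, commit, cleanUp]

        RecordingTx failing = new RecordingTx();
        try {
            execute(failing, () -> { throw new IllegalStateException("boom"); },
                    () -> failing.events.add("cleanUp"));
        } catch (IllegalStateException expected) {
            System.out.println(failing.events); // [begin, rollback, cleanUp]
        }
    }
}
```

Rethrowing after the rollback keeps the business exception visible to the caller, and the finally block guarantees cleanup on both paths.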

Let’s look at each step.

1. Begin the transaction

From the client’s point of view, beginning a transaction means telling the server: “I want to start a global transaction; Mr. TC, please allocate a global transaction ID for me.”

Mr. TC creates a global session based on the application name, transaction group, transaction name, and so on, and generates a global transaction XID.

The client then records the current transaction state as Begin and binds the XID to the current thread.
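A common way to bind a value to the current thread is a ThreadLocal. A minimal sketch of such a holder; `TxContext` is a hypothetical class whose method names are modeled on the RootContext calls that appear later in this article, not a copy of Seata’s implementation.

```java
public class TxContext {

    private static final ThreadLocal<String> XID = new ThreadLocal<>();

    static void bind(String xid) { XID.set(xid); }

    static String getXid() { return XID.get(); }

    // Returns the previously bound XID and clears the slot (important with pooled threads)
    static String unbind() {
        String xid = XID.get();
        XID.remove();
        return xid;
    }

    static boolean inGlobalTransaction() { return XID.get() != null; }

    public static void main(String[] args) throws InterruptedException {
        bind("192.168.216.1:8091:2016493467");
        Thread other = new Thread(() ->
                System.out.println("other thread sees: " + getXid())); // null: not inherited
        other.start();
        other.join();
        System.out.println("main thread sees: " + getXid());
        unbind();
        System.out.println("after unbind: " + inGlobalTransaction()); // false
    }
}
```

The value is visible only on the thread that bound it, so it does not cross an RPC boundary by itself; that gap is what the Dubbo filter described later closes.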

2. Execute business logic

After the transaction begins, our own business logic runs, and this is where the database operations come in. As we said, Seata has already proxied the PreparedStatement, so execution reaches PreparedStatementProxy.execute().

public class PreparedStatementProxy {

    public boolean execute() throws SQLException {
        return ExecuteTemplate.execute(this, new StatementCallback<Boolean, PreparedStatement>() {
            @Override
            public Boolean execute(PreparedStatement statement, Object... args) throws SQLException {
                return statement.execute();
            }
        });
    }
}

Here it first creates a different executor depending on the SQL type; for example, an INSERT INTO statement gets an InsertExecutor.
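Picking an executor by SQL type is a simple dispatch. In this sketch the executor names are modeled on Seata’s executor classes, but the classification is a deliberately crude keyword check; Seata parses the statement with a real SQL parser instead.

```java
import java.util.Locale;

public class ExecutorDispatchSketch {

    enum SqlType { INSERT, UPDATE, DELETE, SELECT_FOR_UPDATE, PLAIN }

    // Very rough classification by leading keyword; a real implementation uses a SQL parser
    static SqlType classify(String sql) {
        String s = sql.trim().toUpperCase(Locale.ROOT);
        if (s.startsWith("INSERT")) return SqlType.INSERT;
        if (s.startsWith("UPDATE")) return SqlType.UPDATE;
        if (s.startsWith("DELETE")) return SqlType.DELETE;
        if (s.startsWith("SELECT") && s.endsWith("FOR UPDATE")) return SqlType.SELECT_FOR_UPDATE;
        return SqlType.PLAIN;
    }

    // Map each type to the name of the executor that would handle it
    static String executorFor(String sql) {
        switch (classify(sql)) {
            case INSERT: return "InsertExecutor";
            case UPDATE: return "UpdateExecutor";
            case DELETE: return "DeleteExecutor";
            case SELECT_FOR_UPDATE: return "SelectForUpdateExecutor";
            default: return "PlainExecutor";
        }
    }

    public static void main(String[] args) {
        System.out.println(executorFor("INSERT INTO t_order(id, name) VALUES (?, ?)")); // InsertExecutor
        System.out.println(executorFor("select * from t_order where id = ? for update")); // SelectForUpdateExecutor
        System.out.println(executorFor("select * from t_order"));                          // PlainExecutor
    }
}
```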

It then checks whether the connection is in auto-commit mode and calls the corresponding method; here we follow executeAutoCommitFalse():

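public abstract class AbstractDMLBaseExecutor<T, S extends Statement> {

    protected T executeAutoCommitFalse(Object[] args) throws Throwable {
        // Snapshot the affected rows before the business SQL runs
        TableRecords beforeImage = beforeImage();
        // Execute the original business SQL
        T result = statementCallback.execute(statementProxy.getTargetStatement(), args);
        // Snapshot the rows after the change
        TableRecords afterImage = afterImage(beforeImage);
        // Build the undo log from both images
        prepareUndoLog(beforeImage, afterImage);
        return result;
    }
}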

This is exactly phase one of AT mode: it intercepts the business SQL and saves the data as a beforeImage before it changes, then executes the business SQL and saves the updated data as an afterImage. All of this happens within one local transaction, which guarantees the atomicity of phase one.

Let’s take INSERT INTO as an example to see how it does this.

  • beforeImage

Since it is a new operation, this record does not exist before execution, and beforeImage is just an empty table record.

  • Business SQL

Execute the original SQL statement, such as INSERT INTO ORDER(ID, NAME) VALUES (?, ?).

  • afterImage

All it has to do is read the newly inserted record back from the database.

protected TableRecords afterImage(TableRecords beforeImage) throws SQLException {
    // Find the primary key values
    List<Object> pkValues = containsPK() ? getPkValuesByColumn() : getPkValuesByAuto();
    // Query the records by primary key
    TableRecords afterImage = getTableRecords(pkValues);
    return afterImage;
}
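Reading the inserted rows back by primary key amounts to a SELECT with one placeholder per key. A hypothetical helper that builds such a query; the exact SQL Seata generates may differ.

```java
import java.util.Collections;
import java.util.List;

public class AfterImageQuerySketch {

    // Build the query used to re-read freshly inserted rows by primary key
    static String buildAfterImageSql(String table, String pkColumn, int pkCount) {
        if (pkCount <= 0) throw new IllegalArgumentException("need at least one primary key");
        String placeholders = String.join(", ", Collections.nCopies(pkCount, "?"));
        return "SELECT * FROM " + table + " WHERE " + pkColumn + " IN (" + placeholders + ")";
    }

    public static void main(String[] args) {
        List<Object> pkValues = List.of(116);   // e.g. the id found in this article's undo log record
        System.out.println(buildAfterImageSql("t_order", "id", pkValues.size()));
        // SELECT * FROM t_order WHERE id IN (?)
    }
}
```

Binding the key values as parameters rather than concatenating them keeps the query safe and lets the database reuse the statement.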

The beforeImage and afterImage are then built into an UndoLog object and saved to the database. Importantly, all of these operations are performed within the same local transaction, as its sqlList shows.

Finally, let’s see what UndoLog’s record in the database looks like:

{
	"@class": "io.seata.rm.datasource.undo.BranchUndoLog",
	"xid": "192.168.216.1:8091:2016493467",
	"branchId": 2016493468,
	"sqlUndoLogs": ["java.util.ArrayList", [{
		"@class": "io.seata.rm.datasource.undo.SQLUndoLog",
		"sqlType": "INSERT",
		"tableName": "t_order",
		"beforeImage": {
			"@class": "io.seata.rm.datasource.sql.struct.TableRecords$EmptyTableRecords",
			"tableName": "t_order",
			"rows": ["java.util.ArrayList", []]
		},
		"afterImage": {
			"@class": "io.seata.rm.datasource.sql.struct.TableRecords",
			"tableName": "t_order",
			"rows": ["java.util.ArrayList", [{
				"@class": "io.seata.rm.datasource.sql.struct.Row",
				"fields": ["java.util.ArrayList", [{
					"@class": "io.seata.rm.datasource.sql.struct.Field",
					"name": "id",
					"keyType": "PrimaryKey",
					"type": 4,
					"value": 116
				}, {
					"@class": "io.seata.rm.datasource.sql.struct.Field",
					"name": "order_no",
					"keyType": "NULL",
					"type": 12,
					"value": "c233d8fb-5e71-4fc1-bc95-6f3d86312db6"
				}, {
					"@class": "io.seata.rm.datasource.sql.struct.Field",
					"name": "user_id",
					"keyType": "NULL",
					"type": 12,
					"value": "200548"
				}, {
					"@class": "io.seata.rm.datasource.sql.struct.Field",
					"name": "commodity_code",
					"keyType": "NULL",
					"type": 12,
					"value": "HYD5620"
				}, {
					"@class": "io.seata.rm.datasource.sql.struct.Field",
					"name": "count",
					"keyType": "NULL",
					"type": 4,
					"value": 10
				}, {
					"@class": "io.seata.rm.datasource.sql.struct.Field",
					"name": "amount",
					"keyType": "NULL",
					"type": 8,
					"value": 5000.0
				}]]
			}]]
		}
	}]]
}

3. Commit

If the business logic completes normally, the phase-two commit begins. The client sends a commit request to the server and unbinds the XID.

After the server confirms the commit, the local UndoLog data is deleted.

The key here is AsyncWorker.init(), which starts a scheduled task that runs doBranchCommits to delete the undo log data.
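The asynchronous branch commit can be pictured as a queue drained by a timer. A sketch with hypothetical names: `BranchCommit` and the map standing in for the undo_log table are assumptions, and the real AsyncWorker deletes rows from the database rather than from a map.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentLinkedQueue;

public class AsyncCommitSketch {

    // One phase-two commit request from the TC
    static final class BranchCommit {
        final String xid;
        final long branchId;
        BranchCommit(String xid, long branchId) { this.xid = xid; this.branchId = branchId; }
    }

    final ConcurrentLinkedQueue<BranchCommit> pending = new ConcurrentLinkedQueue<>();
    // Stand-in for the undo_log table, keyed by "xid/branchId"
    final Map<String, String> undoLog = new HashMap<>();

    // Called when the TC says "commit": enqueue and acknowledge immediately
    boolean branchCommit(String xid, long branchId) {
        pending.add(new BranchCommit(xid, branchId));
        return true;
    }

    // Run periodically (e.g. by a ScheduledExecutorService): drain the queue, delete undo records
    int doBranchCommits() {
        int cleaned = 0;
        BranchCommit c;
        while ((c = pending.poll()) != null) {
            if (undoLog.remove(c.xid + "/" + c.branchId) != null) cleaned++;
        }
        return cleaned;
    }

    public static void main(String[] args) {
        AsyncCommitSketch rm = new AsyncCommitSketch();
        rm.undoLog.put("192.168.216.1:8091:2016493467/2016493468", "{...}");
        rm.branchCommit("192.168.216.1:8091:2016493467", 2016493468L);
        System.out.println(rm.doBranchCommits() + " undo record(s) deleted"); // 1 undo record(s) deleted
    }
}
```

Since phase one already committed the business data, acknowledging the commit first and cleaning up later keeps the commit path short.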

4. Rollback

If an exception occurs, a two-phase rollback is performed.

It first finds the UndoLog record by xid and branchId, then parses its data to generate reverse SQL that undoes the earlier change.

This code is fairly long; see UndoLogManager.undo() and AbstractUndoExecutor.executeOn() for the details.
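The reverse SQL follows directly from the images: an INSERT is undone by deleting the inserted row, an UPDATE by writing the before image back, and a DELETE by re-inserting the before image. A single-row sketch that only builds the statement text; `buildUndoSql` and its parameters are hypothetical, and the real undo executors also bind the values and handle multiple rows.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

public class UndoSqlSketch {

    enum SqlType { INSERT, UPDATE, DELETE }

    // Build the compensating statement for one row; values are bound later as '?' placeholders
    static String buildUndoSql(SqlType type, String table, String pk, Map<String, Object> row) {
        switch (type) {
            case INSERT:
                // The row did not exist before: delete it (the key value comes from the after image)
                return "DELETE FROM " + table + " WHERE " + pk + " = ?";
            case UPDATE: {
                // Write back every non-key column from the before image
                String sets = row.keySet().stream()
                        .filter(c -> !c.equals(pk))
                        .map(c -> c + " = ?")
                        .collect(Collectors.joining(", "));
                return "UPDATE " + table + " SET " + sets + " WHERE " + pk + " = ?";
            }
            case DELETE: {
                // The row was removed: re-insert the before image
                String cols = String.join(", ", row.keySet());
                String marks = row.keySet().stream().map(c -> "?").collect(Collectors.joining(", "));
                return "INSERT INTO " + table + " (" + cols + ") VALUES (" + marks + ")";
            }
            default:
                throw new IllegalArgumentException("unsupported: " + type);
        }
    }

    public static void main(String[] args) {
        Map<String, Object> row = new LinkedHashMap<>();
        row.put("id", 116);
        row.put("count", 10);
        System.out.println(buildUndoSql(SqlType.INSERT, "t_order", "id", row));
        // DELETE FROM t_order WHERE id = ?
        System.out.println(buildUndoSql(SqlType.UPDATE, "t_order", "id", row));
        // UPDATE t_order SET count = ? WHERE id = ?
        System.out.println(buildUndoSql(SqlType.DELETE, "t_order", "id", row));
        // INSERT INTO t_order (id, count) VALUES (?, ?)
    }
}
```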

5. How Dubbo is tied in

Only the transaction manager (TM) can start a global transaction, so how are the other participating services automatically enlisted in it?

First, Seata provides a Dubbo filter called TransactionPropagationFilter.

It sets the XID in the Dubbo RPC context so that the XID can be retrieved in other services.

Second, as we know, Seata proxies the PreparedStatement, and when it performs a data operation it makes a check:

if (!RootContext.inGlobalTransaction() && !RootContext.requireGlobalLock()) {
    // No XID bound: execute the original method
    return statementCallback.execute(statementProxy.getTargetStatement(), args);
}

In other words, if the current thread has no XID, the original method is executed; if it has one, execution continues into the transactional logic.
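The whole hand-off can be simulated with a map standing in for the RPC attachments. Everything here is hypothetical (the `TX_XID` key, the method names), and Dubbo’s actual Filter and RpcContext APIs are not used. It shows only the idea: the caller copies its XID into the request, the callee binds it for the duration of the call, and the data layer picks the plain or the AT path depending on whether an XID is bound.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

public class XidPropagationSketch {

    static final String KEY_XID = "TX_XID";               // assumed attachment key
    static final ThreadLocal<String> CONTEXT = new ThreadLocal<>();

    // Consumer side: copy the caller's XID into the outgoing request attachments
    static Map<String, String> buildAttachments() {
        Map<String, String> attachments = new HashMap<>();
        String xid = CONTEXT.get();
        if (xid != null) attachments.put(KEY_XID, xid);
        return attachments;
    }

    // Provider side: bind the incoming XID around the service call, then clear it
    // (simplification: assumes the provider thread had no XID of its own)
    static <T> T handle(Map<String, String> attachments, Supplier<T> service) {
        String xid = attachments.get(KEY_XID);
        if (xid != null) CONTEXT.set(xid);
        try {
            return service.get();
        } finally {
            if (xid != null) CONTEXT.remove();
        }
    }

    // Data-layer check: without an XID run the plain statement, with one take the AT path
    static String executeSql(String sql) {
        if (CONTEXT.get() == null) return "plain: " + sql;
        return "AT mode under " + CONTEXT.get() + ": " + sql;
    }

    public static void main(String[] args) {
        CONTEXT.set("192.168.216.1:8091:2016493467");     // the TM side has begun a global tx
        Map<String, String> attachments = buildAttachments();
        CONTEXT.remove();                                   // pretend we are now on the provider's thread
        System.out.println(handle(attachments, () -> executeSql("UPDATE t_stock ...")));
        System.out.println(executeSql("UPDATE t_stock ..."));
    }
}
```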

Five, Summary

This article outlines how the Seata client works in AT mode. Part of Seata’s server-side logic is not covered in depth here.

The reason is that I have not fully understood that part yet and cannot explain it clearly; I will fill it in later.

If anything in this article is inaccurate, please don’t hesitate to leave a comment. Thank you.