
Introduction

Seata is Alibaba's open-source distributed transaction middleware, and it is minimally intrusive to business code. Seata's overall transaction logic is based on the two-phase commit model, and its core concept involves three roles:

  • TM (Transaction Manager): the transaction initiator. It tells the TC to begin, commit, or roll back a global transaction.
  • RM (Resource Manager): a transaction resource. Each RM registers with the TC as a branch transaction.
  • TC (Transaction Coordinator): a standalone seata-server that receives transaction registration, commit, and rollback requests.

Seata operates in two modes, AT and MT. Other modes exist as well, such as SAGA, but they are not covered here.

AT (Auto Transaction) mode

This mode requires the participating modules to be written in Java and the database to support local transactions. A typical distributed transaction flow:

  • The TM asks the TC to begin a global transaction; the TC creates the global transaction and generates a globally unique XID.
  • The XID is propagated along the microservice invocation chain (see the propagation sketch after this list).
  • Each RM registers its branch transaction with the TC and places it under the jurisdiction of the XID's global transaction.
  • The TM issues a global commit or rollback decision for the XID to the TC.
  • The TC drives all branch transactions under the XID to complete their commit or rollback.
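
How the XID actually crosses service boundaries depends on the RPC framework in use; Seata ships integrations (Dubbo, Spring Cloud, HTTP, and so on) that do this automatically. As a minimal hand-rolled sketch (the header plumbing and class below are illustrative, not Seata integration code), propagation boils down to reading RootContext.getXID() on the caller side and calling RootContext.bind(xid) on the callee side:

import java.util.Map;

import io.seata.core.context.RootContext;

public final class XidPropagationSketch {

    // Caller side: attach the current XID (if any) to the outgoing request headers.
    // "TX_XID" is the header key commonly used by Seata's HTTP integrations.
    public static void attachXid(Map<String, String> requestHeaders) {
        String xid = RootContext.getXID();
        if (xid != null) {
            requestHeaders.put("TX_XID", xid);
        }
    }

    // Callee side: bind the incoming XID so that local branches join the same global transaction.
    public static void bindXid(Map<String, String> requestHeaders) {
        String xid = requestHeaders.get("TX_XID");
        if (xid != null) {
            RootContext.bind(xid);
        }
    }
}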

MT (Manual Transaction) mode

This mode is suitable for other scenarios: the underlying storage may not support transactions at all, so the prepare, commit, and rollback logic has to be implemented by hand.

Source code analysis

Reference: juejin.im/post/684490…

Initialization

The global two-phase commit is actually implemented by proxying the data source; in Seata this is a proxy layer wrapped around the Druid data source.
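
For context, a minimal sketch of wiring this proxy in a Spring configuration is shown below (the bean names and connection settings are illustrative; Seata's Spring Boot starter can also register the proxy automatically):

import javax.sql.DataSource;

import com.alibaba.druid.pool.DruidDataSource;
import io.seata.rm.datasource.DataSourceProxy;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;

@Configuration
public class DataSourceConfig {

    @Bean
    public DruidDataSource druidDataSource() {
        DruidDataSource ds = new DruidDataSource();
        ds.setUrl("jdbc:mysql://localhost:3306/demo"); // illustrative connection settings
        ds.setUsername("demo");
        ds.setPassword("demo");
        return ds;
    }

    // Wrap the real data source. Every Connection and Statement handed out by this proxy
    // is also proxied, which is what makes the SQL parsing and undolog generation described
    // below possible.
    @Bean
    @Primary
    public DataSource dataSource(DruidDataSource druidDataSource) {
        return new DataSourceProxy(druidDataSource);
    }
}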

Two-phase commit

A method that should run inside a global transaction is annotated with @GlobalTransactional.
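
As a minimal usage sketch (the service and method names below are made up for illustration):

import io.seata.spring.annotation.GlobalTransactional;
import org.springframework.stereotype.Service;

@Service
public class OrderService {

    // The TM side: Seata begins a global transaction before this method runs,
    // commits it if the method returns normally, and rolls it back on the configured exceptions.
    @GlobalTransactional(name = "create-order", timeoutMills = 60000, rollbackFor = Exception.class)
    public void createOrder(String userId, String productId, int count) {
        // call the storage service, the account service, the local order repository, ...
    }
}

Seata intercepts such annotated methods with the GlobalTransactionalInterceptor: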

@Override
public Object invoke(final MethodInvocation methodInvocation) throws Throwable {
    Class<?> targetClass = methodInvocation.getThis() != null
        ? AopUtils.getTargetClass(methodInvocation.getThis()) : null;
    Method specificMethod = ClassUtils.getMostSpecificMethod(methodInvocation.getMethod(), targetClass);
    final Method method = BridgeMethodResolver.findBridgedMethod(specificMethod);
    final GlobalTransactional globalTransactionalAnnotation = getAnnotation(method, targetClass, GlobalTransactional.class);
    final GlobalLock globalLockAnnotation = getAnnotation(method, targetClass, GlobalLock.class);
    if (!disable && globalTransactionalAnnotation != null) {
        return handleGlobalTransaction(methodInvocation, globalTransactionalAnnotation);
    } else if (!disable && globalLockAnnotation != null) {
        return handleGlobalLock(methodInvocation);
    } else {
        return methodInvocation.proceed();
    }
}

If the annotation is present, the handleGlobalTransaction method is called to open the global transaction; otherwise the method proceeds the normal way. The handleGlobalTransaction method:

private Object handleGlobalTransaction(final MethodInvocation methodInvocation, final GlobalTransactional globalTrxAnno) throws Throwable {
    try {
        return transactionalTemplate.execute(new TransactionalExecutor() {
            @Override
            public Object execute() throws Throwable {
                return methodInvocation.proceed();
            }

            public String name() {
                String name = globalTrxAnno.name();
                if (!StringUtils.isNullOrEmpty(name)) {
                    return name;
                }
                return formatMethod(methodInvocation.getMethod());
            }

            @Override
            public TransactionInfo getTransactionInfo() {
                TransactionInfo transactionInfo = new TransactionInfo();
                transactionInfo.setTimeOut(globalTrxAnno.timeoutMills());
                transactionInfo.setName(name());
                transactionInfo.setPropagation(globalTrxAnno.propagation());
                Set<RollbackRule> rollbackRules = new LinkedHashSet<>();
                for (Class<?> rbRule : globalTrxAnno.rollbackFor()) {
                    rollbackRules.add(new RollbackRule(rbRule));
                }
                for (String rbRule : globalTrxAnno.rollbackForClassName()) {
                    rollbackRules.add(new RollbackRule(rbRule));
                }
                for (Class<?> rbRule : globalTrxAnno.noRollbackFor()) {
                    rollbackRules.add(new NoRollbackRule(rbRule));
                }
                for (String rbRule : globalTrxAnno.noRollbackForClassName()) {
                    rollbackRules.add(new NoRollbackRule(rbRule));
                }
                transactionInfo.setRollbackRules(rollbackRules);
                return transactionInfo;
            }
        });
    } catch (TransactionalExecutor.ExecutionException e) {
        TransactionalExecutor.Code code = e.getCode();
        switch (code) {
            case RollbackDone:
                throw e.getOriginalException();
            case BeginFailure:
                failureHandler.onBeginFailure(e.getTransaction(), e.getCause());
                throw e.getCause();
            case CommitFailure:
                failureHandler.onCommitFailure(e.getTransaction(), e.getCause());
                throw e.getCause();
            case RollbackFailure:
                failureHandler.onRollbackFailure(e.getTransaction(), e.getCause());
                throw e.getCause();
            case RollbackRetrying:
                failureHandler.onRollbackRetrying(e.getTransaction(), e.getCause());
                throw e.getCause();
            default:
                throw new ShouldNeverHappenException(String.format("Unknown TransactionalExecutor.Code: %s", code));

        }
    }
}

Inside it, the execute method of TransactionalTemplate is called:

public Object execute(TransactionalExecutor business) throws Throwable {
    // 1 get transactionInfo
    TransactionInfo txInfo = business.getTransactionInfo();
    if (txInfo == null) {
        throw new ShouldNeverHappenException("transactionInfo does not exist");
    }
    // 1.1 get or create a transaction
    GlobalTransaction tx = GlobalTransactionContext.getCurrentOrCreate();

    // 2 Handle the transaction propagation and the branchType
    Propagation propagation = txInfo.getPropagation();
    SuspendedResourcesHolder suspendedResourcesHolder = null;
    try {
        switch (propagation) {
            case NOT_SUPPORTED:
                suspendedResourcesHolder = tx.suspend(true);
                return business.execute();
            case REQUIRES_NEW:
                suspendedResourcesHolder = tx.suspend(true);
                break;
            case SUPPORTS:
                if (!existingTransaction()) {
                    return business.execute();
                }
                break;
            case REQUIRED:
                break;
            case NEVER:
                if (existingTransaction()) {
                    throw new TransactionException(
                        String.format("Existing transaction found for transaction marked with propagation 'never',xid = %s", RootContext.getXID()));
                } else {
                    return business.execute();
                }
            case MANDATORY:
                if (!existingTransaction()) {
                    throw new TransactionException("No existing transaction found for transaction marked with propagation 'mandatory'");
                }
                break;
            default:
                throw new TransactionException("Not Supported Propagation:" + propagation);
        }

        try {
            // 2. begin transaction
            beginTransaction(txInfo, tx);

            Object rs = null;
            try {
                // Do Your Business
                rs = business.execute();
            } catch (Throwable ex) {
                // 3. the needed business exception to rollback.
                completeTransactionAfterThrowing(txInfo, tx, ex);
                throw ex;
            }

            // 4. everything is fine, commit.
            commitTransaction(tx);

            return rs;
        } finally {
            // 5. clear
            triggerAfterCompletion();
            cleanUp();
        }
    } finally {
        tx.resume(suspendedResourcesHolder);
    }
}

The main steps in this method are as follows:

  1. Get the transaction information;
  2. Open the global transaction;
  3. Execute the business method;
  4. Commit the transaction (if no exception is thrown);
  5. Roll back (if an exception is thrown).
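
The propagation handling at the top of execute is analogous to Spring's local-transaction propagation semantics, driven by the propagation attribute read in getTransactionInfo above. As a minimal usage sketch (the service and method names are made up for illustration):

import io.seata.spring.annotation.GlobalTransactional;
import io.seata.tm.api.transaction.Propagation;
import org.springframework.stereotype.Service;

@Service
public class AuditService {

    // REQUIRES_NEW suspends any existing global transaction and starts a fresh one,
    // matching the REQUIRES_NEW branch in TransactionalTemplate.execute above.
    @GlobalTransactional(name = "write-audit-log", propagation = Propagation.REQUIRES_NEW)
    public void writeAuditLog(String message) {
        // ...
    }
}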

beginTransaction eventually calls the begin method of DefaultGlobalTransaction:

@Override
public void begin(int timeout, String name) throws TransactionException {
    if (role != GlobalTransactionRole.Launcher) {
        assertXIDNotNull();
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("Ignore Begin(): just involved in global transaction [{}]", xid);
        }
        return;
    }
    assertXIDNull();
    if (RootContext.getXID() != null) {
        throw new IllegalStateException();
    }
    xid = transactionManager.begin(null, null, name, timeout);
    status = GlobalStatus.Begin;
    RootContext.bind(xid);
    if (LOGGER.isInfoEnabled()) {
        LOGGER.info("Begin new global transaction [{}]", xid);
    }
}

The transactionManager.begin() method communicates with the server through TmRpcClient and generates an XID, which is then bound to the RootContext. Once the global transaction interception succeeds, the original business method is still executed, but because Seata proxies the data source, SQL parsing and undolog generation happen inside the proxied data source. Seata proxies not only the data source but also wraps Connection and Statement. SQL parsing happens in StatementProxy:

@Override
public ResultSet executeQuery(String sql) throws SQLException {
    this.targetSQL = sql;
    return ExecuteTemplate.execute(this, (statement, args) -> statement.executeQuery((String) args[0]), sql);
}

The execute method of the ExecuteTemplate class is finally executed:

public static <T, S extends Statement> T execute(List<SQLRecognizer> sqlRecognizers,
                                                 StatementProxy<S> statementProxy,
                                                 StatementCallback<T, S> statementCallback,
                                                 Object... args) throws SQLException {
    if (!shouldExecuteInATMode()) {
        // Just work as original statement
        return statementCallback.execute(statementProxy.getTargetStatement(), args);
    }

    if (sqlRecognizers == null) {
        sqlRecognizers = SQLVisitorFactory.get(
            statementProxy.getTargetSQL(),
            statementProxy.getConnectionProxy().getDbType());
    }
    Executor<T> executor;
    if (CollectionUtils.isEmpty(sqlRecognizers)) {
        executor = new PlainExecutor<>(statementProxy, statementCallback);
    } else {
        if (sqlRecognizers.size() == 1) {
            SQLRecognizer sqlRecognizer = sqlRecognizers.get(0);
            switch (sqlRecognizer.getSQLType()) {
                case INSERT:
                    executor = new InsertExecutor<>(statementProxy, statementCallback, sqlRecognizer);
                    break;
                case UPDATE:
                    executor = new UpdateExecutor<>(statementProxy, statementCallback, sqlRecognizer);
                    break;
                case DELETE:
                    executor = new DeleteExecutor<>(statementProxy, statementCallback, sqlRecognizer);
                    break;
                case SELECT_FOR_UPDATE:
                    executor = new SelectForUpdateExecutor<>(statementProxy, statementCallback, sqlRecognizer);
                    break;
                default:
                    executor = new PlainExecutor<>(statementProxy, statementCallback);
                    break;
            }
        } else {
            executor = new MultiExecutor<>(statementProxy, statementCallback, sqlRecognizers);
        }
    }
    T rs;
    try {
        rs = executor.execute(args);
    } catch (Throwable ex) {
        if (!(ex instanceof SQLException)) {
            // Turn other exception into SQLException
            ex = new SQLException(ex);
        }
        throw (SQLException) ex;
    }
    return rs;
}

It first checks whether a global transaction is active: if not, the statement executes the ordinary way; if so, the SQL is parsed and a dedicated executor is chosen for each kind of DML statement before execution. The specific flow is:

  1. Check whether a global transaction is active. If not, skip the proxy logic and do not parse the SQL.
  2. Call SQLVisitorFactory to parse the target SQL.
  3. For specific SQL types (INSERT, UPDATE, DELETE, SELECT ... FOR UPDATE), choose the corresponding specialized executor.
  4. Execute the SQL and return the result.

The key point is the execute method of these specialized executors (taking InsertExecutor's execute as an example), which calls the execute method of the superclass BaseTransactionalExecutor:

@Override
public T execute(Object... args) throws Throwable {
    if (RootContext.inGlobalTransaction()) {
        String xid = RootContext.getXID();
        statementProxy.getConnectionProxy().bind(xid);
    }
    statementProxy.getConnectionProxy().setGlobalLockRequire(RootContext.requireGlobalLock());
    return doExecute(args);
}

Here the XID is bound to the ConnectionProxy, and doExecute is called, which dispatches to the doExecute implementation in the subclass AbstractDMLBaseExecutor:

@Override
public T doExecute(Object... args) throws Throwable {
    AbstractConnectionProxy connectionProxy = statementProxy.getConnectionProxy();
    if (connectionProxy.getAutoCommit()) {
        return executeAutoCommitTrue(args);
    } else {
        return executeAutoCommitFalse(args);
    }
}

The executeAutoCommitTrue method first sets autoCommit to false so that the SQL can be parsed and the undolog generated before the commit, preventing the change from being persisted before its undolog exists.

protected T executeAutoCommitTrue(Object[] args) throws Throwable {
    ConnectionProxy connectionProxy = statementProxy.getConnectionProxy();
    try {
        connectionProxy.setAutoCommit(false);
        return new LockRetryPolicy(connectionProxy).execute(() -> {
            T result = executeAutoCommitFalse(args);
            connectionProxy.commit();
            return result;
        });
    } catch (Exception e) {
        // when exception occur in finally,this exception will lost, so just print it here
        LOGGER.error("execute executeAutoCommitTrue error:{}", e.getMessage(), e);
        if (!LockRetryPolicy.isLockRetryPolicyBranchRollbackOnConflict()) {
            connectionProxy.getTargetConnection().rollback();
        }
        throw e;
    } finally {
        connectionProxy.getContext().reset();
        connectionProxy.setAutoCommit(true);
    }
}

After autoCommit has been set to false, executeAutoCommitFalse(args) in AbstractDMLBaseExecutor carries on:

  protected T executeAutoCommitFalse(Object[] args) throws Exception {
        TableRecords beforeImage = beforeImage();
        T result = statementCallback.execute(statementProxy.getTargetStatement(), args);
        TableRecords afterImage = afterImage(beforeImage);
        prepareUndoLog(beforeImage, afterImage);
        return result;
    }

Note that this is a key method. executeAutoCommitFalse runs in four main steps:

  1. Take a snapshot of the affected records before the SQL executes (beforeImage);
  2. Execute the SQL;
  3. Take the afterImage snapshot after the SQL executes;
  4. Generate an undolog record from beforeImage and afterImage and append it to the context of the ConnectionProxy.
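
To make the role of the two snapshots concrete, here is a purely conceptual illustration (not Seata source code; the table and values are made up):

// Business SQL:           UPDATE account SET balance = 900 WHERE id = 1
//
// beforeImage snapshot:   table=account, rows=[{id=1, balance=1000}]
// afterImage  snapshot:   table=account, rows=[{id=1, balance=900}]
//
// On global rollback, Seata first verifies that the current row still matches the
// afterImage (no dirty write), then restores the beforeImage, roughly:
//   UPDATE account SET balance = 1000 WHERE id = 1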

Generating the undolog means recording the lockKey first and then recording both the beforeImage and the afterImage:

protected void prepareUndoLog(TableRecords beforeImage, TableRecords afterImage) throws SQLException {
    if (!beforeImage.getRows().isEmpty() || !afterImage.getRows().isEmpty()) {
        ConnectionProxy connectionProxy = this.statementProxy.getConnectionProxy();

        TableRecords lockKeyRecords = this.sqlRecognizer.getSQLType() == SQLType.DELETE ? beforeImage : afterImage;
        String lockKeys = this.buildLockKey(lockKeyRecords);
        connectionProxy.appendLockKey(lockKeys);

        SQLUndoLog sqlUndoLog = this.buildUndoItem(beforeImage, afterImage);
        connectionProxy.appendUndoLog(sqlUndoLog);
    }
}

protected SQLUndoLog buildUndoItem(TableRecords beforeImage, TableRecords afterImage) {
    SQLType sqlType = this.sqlRecognizer.getSQLType();
    String tableName = this.sqlRecognizer.getTableName();

    SQLUndoLog sqlUndoLog = new SQLUndoLog();
    sqlUndoLog.setSqlType(sqlType);
    sqlUndoLog.setTableName(tableName);
    sqlUndoLog.setBeforeImage(beforeImage);
    sqlUndoLog.setAfterImage(afterImage);
    return sqlUndoLog;
}

During the second phase the undolog records are either replayed (rollback) or deleted (commit) through the UndoLogManager. The rollback path iterates over the undolog entries and runs the corresponding undo executor:

try {
    // put serializer name to local
    setCurrentSerializer(parser.getName());
    List<SQLUndoLog> sqlUndoLogs = branchUndoLog.getSqlUndoLogs();
    if (sqlUndoLogs.size() > 1) {
        Collections.reverse(sqlUndoLogs);
    }
    for (SQLUndoLog sqlUndoLog : sqlUndoLogs) {
        TableMeta tableMeta = TableMetaCacheFactory.getTableMetaCache(dataSourceProxy.getDbType()).getTableMeta(
            conn, sqlUndoLog.getTableName(), dataSourceProxy.getResourceId());
        sqlUndoLog.setTableMeta(tableMeta);
        AbstractUndoExecutor undoExecutor = UndoExecutorFactory.getUndoExecutor(
            dataSourceProxy.getDbType(), sqlUndoLog);
        undoExecutor.executeOn(conn);
    }
} finally {
    // remove serializer name
    removeCurrentSerializer();
}

Before that happens, though, the current records in the database are compared with the afterImage; they must match for the undo to proceed:

/**
 * Data validation.
 *
 * @param conn the conn
 * @return return true if data validation is ok and need continue undo, and return false if no need continue undo.
 * @throws SQLException the sql exception such as has dirty data
 */
protected boolean dataValidationAndGoOn(Connection conn) throws SQLException {
    TableRecords beforeRecords = sqlUndoLog.getBeforeImage();
    TableRecords afterRecords = sqlUndoLog.getAfterImage();

    // Compare current data with before data
    // No need undo if the before data snapshot is equivalent to the after data snapshot.
    Result<Boolean> beforeEqualsAfterResult = DataCompareUtils.isRecordsEquals(beforeRecords, afterRecords);
    if (beforeEqualsAfterResult.getResult()) {
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("Stop rollback because there is no data change " +
                "between the before data snapshot and the after data snapshot.");
        }
        // no need continue undo.
        return false;
    }

    // Validate if data is dirty.
    TableRecords currentRecords = queryCurrentRecords(conn);
    // compare with current data and after image.
    Result<Boolean> afterEqualsCurrentResult = DataCompareUtils.isRecordsEquals(afterRecords, currentRecords);
    if (!afterEqualsCurrentResult.getResult()) {
        // If current data is not equivalent to the after data, then compare the current data with the before
        // data, too. No need continue to undo if current data is equivalent to the before data snapshot
        Result<Boolean> beforeEqualsCurrentResult = DataCompareUtils.isRecordsEquals(beforeRecords, currentRecords);
        if (beforeEqualsCurrentResult.getResult()) {
            if (LOGGER.isInfoEnabled()) {
                LOGGER.info("Stop rollback because there is no data change " +
                    "between the before data snapshot and the current data snapshot.");
            }
            // no need continue undo.
            return false;
        } else {
            if (LOGGER.isInfoEnabled()) {
                if (StringUtils.isNotBlank(afterEqualsCurrentResult.getErrMsg())) {
                    LOGGER.info(afterEqualsCurrentResult.getErrMsg(), afterEqualsCurrentResult.getErrMsgParams());
                }
            }
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("check dirty datas failed, old and new data are not equal," +
                    "tableName:[" + sqlUndoLog.getTableName() + "]," +
                    "oldRows:[" + JSON.toJSONString(afterRecords.getRows()) + "]," +
                    "newRows:[" + JSON.toJSONString(currentRecords.getRows()) + "].");
            }
            throw new SQLException("Has dirty records when undo.");
        }
    }
    return true;
}

This shows that Seata's distributed transactions are effectively serialized globally, which can cause performance problems when hot resources are involved.

Branch transaction registration and transaction commit

After the business SQL and the undolog have been executed, the commit is performed on the ConnectionProxy connection:

@Override
public void commit() throws SQLException {
    try {
        LOCK_RETRY_POLICY.execute(() -> {
            doCommit();
            return null;
        });
    } catch (SQLException e) {
        throw e;
    } catch (Exception e) {
        throw new SQLException(e);
    }
}

private void doCommit() throws SQLException {
    if (context.inGlobalTransaction()) {
        processGlobalTransactionCommit();
    } else if (context.isGlobalLockRequire()) {
        processLocalCommitWithGlobalLocks();
    } else {
        targetConnection.commit();
    }
}

If the connection is inside a global transaction, processGlobalTransactionCommit is called to perform the global-transaction commit; if the global lock annotation was used, processLocalCommitWithGlobalLocks acquires the global lock and commits; in all other cases the transaction is committed directly.

private void processGlobalTransactionCommit() throws SQLException {
    try {
        register();
    } catch (TransactionException e) {
        recognizeLockKeyConflictException(e, context.buildLockKeys());
    }
    try {
        UndoLogManagerFactory.getUndoLogManager(this.getDbType()).flushUndoLogs(this);
        targetConnection.commit();
    } catch (Throwable ex) {
        LOGGER.error("process connectionProxy commit error: {}", ex.getMessage(), ex);
        report(false);
        throw new SQLException(ex);
    }
    if (IS_REPORT_SUCCESS_ENABLE) {
        report(true);
    }
    context.reset();
}

The processGlobalTransactionCommit method has the following steps:

  1. Register the branch transaction and bind the returned branchId to the context.
  2. If there is undolog, flush the undolog previously accumulated in the context to the database.
  3. Commit the local transaction.
  4. If this fails, report() sends the TC a first-phase failure message from the RM; if it succeeds (and success reporting is enabled), report() sends a first-phase success message.

private void report(boolean commitDone) throws SQLException {
    if (context.getBranchId() == null) {
        return;
    }
    int retry = REPORT_RETRY_COUNT;
    while (retry > 0) {
        try {
            DefaultResourceManager.get().branchReport(BranchType.AT, context.getXid(), context.getBranchId(),
                commitDone ? BranchStatus.PhaseOne_Done : BranchStatus.PhaseOne_Failed, null);
            return;
        } catch (Throwable ex) {
            LOGGER.error("Failed to report [" + context.getBranchId() + "/" + context.getXid() + "] commit done ["
                + commitDone + "] Retry Countdown: " + retry);
            retry--;

            if (retry == 0) {
                throw new SQLException("Failed to report branch status " + commitDone, ex);
            }
        }
    }
}

Because the undolog insert and the business SQL go through the same connection and the same local transaction, the business SQL and its undolog are guaranteed to be written together or not at all.

Conclusion

Seata's AT mode implements a traditional distributed transaction: it achieves a two-phase commit across the resource nodes by automatically generating undolog. The benefit of this solution is that existing MySQL, PostgreSQL, and Oracle based applications can adopt distributed transactions quickly without significant changes to existing code. The downsides are that conflicting commits are globally serialized, so hotspot-resource performance issues are not solved, and undolog processing adds extra overhead. Seata adds an undo_log table to each participating module and uses the node database's single local transaction to guarantee the atomicity of the subtransaction and its compensation information, an idea worth borrowing in distributed transaction design.

Other notes: distributed transaction patterns include 2PC, TCC, SAGA, and so on. SAGA comes in two forms, centralized orchestration and choreography. Besides Seata, other distributed transaction frameworks include Axon, ServiceComb, and so on.