With distributed transactions enabled, the next step is to execute the target method.

1. Executing the target method

1.1 Seata proxies the data source

One of the prerequisites for Seata to implement distributed transactions is that we hand the data source over to Seata for management.

"Seata AT mode source code (1)", in its section on Seata's Spring Boot integration, mentioned that Seata injects a SeataAutoDataSourceProxyCreator into the container, and that this bean is what proxies the data source.

SeataAutoDataSourceProxyCreator is a bean of type BeanPostProcessor. It extends AbstractAutoProxyCreator and overrides the shouldSkip method (which decides which beans are proxied) and the getAdvicesAndAdvisorsForBean method (which supplies the proxy logic, that is, the interceptors to add to the interceptor chain).

    public class SeataAutoDataSourceProxyCreator extends AbstractAutoProxyCreator {
        private static final Logger LOGGER = LoggerFactory.getLogger(SeataAutoDataSourceProxyCreator.class);
        private final String[] excludes;

        /**
         * The advisor that carries the proxy logic
         */
        private final Advisor advisor = new DefaultIntroductionAdvisor(new SeataAutoDataSourceProxyAdvice());

        public SeataAutoDataSourceProxyCreator(boolean useJdkProxy, String[] excludes) {
            this.excludes = excludes;
            setProxyTargetClass(!useJdkProxy);
        }

        // The interceptors that need to be added to the interceptor chain
        @Override
        protected Object[] getAdvicesAndAdvisorsForBean(Class<?> beanClass, String beanName,
                                                        TargetSource customTargetSource) throws BeansException {
            if (LOGGER.isInfoEnabled()) {
                LOGGER.info("Auto proxy of [{}]", beanName);
            }
            return new Object[]{advisor};
        }

        /**
         * Which beans are skipped (not proxied)
         */
        @Override
        protected boolean shouldSkip(Class<?> beanClass, String beanName) {
            // Beans of type DataSource are not skipped (they are proxied)
            return SeataProxy.class.isAssignableFrom(beanClass)
                || !DataSource.class.isAssignableFrom(beanClass)
                || Arrays.asList(excludes).contains(beanClass.getName());
        }
    }

As you can see, SeataAutoDataSourceProxyCreator proxies beans of type DataSource, and the proxy logic lives in the SeataAutoDataSourceProxyAdvice interceptor.

    public class SeataAutoDataSourceProxyAdvice implements MethodInterceptor, IntroductionInfo {

        @Override
        public Object invoke(MethodInvocation invocation) throws Throwable {
            /**
             * Put the mapping between the dataSource and its DataSourceProxy into a map.
             * (When a DataSourceProxy is created for a dataSource, a scheduled task is
             * initialized that refreshes the table metadata. Not expanded on here.)
             */
            DataSourceProxy dataSourceProxy = DataSourceProxyHolder.get().putDataSource((DataSource) invocation.getThis());
            Method method = invocation.getMethod();
            Object[] args = invocation.getArguments();
            Method m = BeanUtils.findDeclaredMethod(DataSourceProxy.class, method.getName(), method.getParameterTypes());
            if (null != m) {
                // DataSourceProxy declares the same method: call it on the proxy
                return m.invoke(dataSourceProxy, args);
            } else {
                return invocation.proceed();
            }
        }

        @Override
        public Class<?>[] getInterfaces() {
            return new Class[]{SeataProxy.class};
        }
    }
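The delegation rule in SeataAutoDataSourceProxyAdvice#invoke (call the same-named method on the wrapper if it declares one, otherwise fall through to the original object) can be tried out without Spring. The following is a minimal sketch using a JDK dynamic proxy, not Seata's code: SimpleDataSource, PlainDataSource, WrappingDataSource and wrap are hypothetical names that stand in for DataSource, the original bean, DataSourceProxy and the auto-proxy creator.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;

class DelegatingProxySketch {

    /** Hypothetical stand-in for javax.sql.DataSource. */
    interface SimpleDataSource {
        String getConnection();
        String describe();
    }

    /** Hypothetical stand-in for the original data source bean. */
    static class PlainDataSource implements SimpleDataSource {
        public String getConnection() { return "connection"; }
        public String describe() { return "plain"; }
    }

    /** Hypothetical stand-in for DataSourceProxy: it only redefines getConnection(). */
    static class WrappingDataSource {
        private final SimpleDataSource target;
        WrappingDataSource(SimpleDataSource target) { this.target = target; }
        public String getConnection() { return "proxied(" + target.getConnection() + ")"; }
    }

    /** Returns a proxy that prefers methods declared on the wrapper over the target's. */
    static SimpleDataSource wrap(SimpleDataSource target) {
        WrappingDataSource wrapper = new WrappingDataSource(target);
        InvocationHandler handler = (proxy, method, args) -> {
            Method m = findDeclared(WrappingDataSource.class, method);
            // Same decision as in the advice: wrapper method if present, else the original
            return m != null ? m.invoke(wrapper, args) : method.invoke(target, args);
        };
        return (SimpleDataSource) Proxy.newProxyInstance(
                SimpleDataSource.class.getClassLoader(),
                new Class<?>[] {SimpleDataSource.class},
                handler);
    }

    /** Looks up a method with the same name and parameter types, or returns null. */
    private static Method findDeclared(Class<?> type, Method method) {
        try {
            return type.getDeclaredMethod(method.getName(), method.getParameterTypes());
        } catch (NoSuchMethodException e) {
            return null;
        }
    }

    public static void main(String[] args) {
        SimpleDataSource ds = wrap(new PlainDataSource());
        System.out.println(ds.getConnection()); // handled by the wrapper
        System.out.println(ds.describe());      // falls through to the original object
    }
}
```

Only getConnection() is intercepted here because the wrapper declares nothing else, which mirrors how calls that DataSourceProxy does not declare simply proceed to the original bean.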

Note: The proxy logic of the data source is entered only if the target method operates on the database. Normally, when a distributed transaction is enabled, there are bound to be database operations.

2 Data source proxy logic

Recall that all JDBC operations on a database go through a Statement, and that the PreparedStatement we are more familiar with is a subinterface of Statement. Both are created through a Connection. Seata likewise provides proxies for Connection (ConnectionProxy) and PreparedStatement (PreparedStatementProxy).

    public class DataSourceProxy extends AbstractDataSourceProxy implements Resource {

        /**
         * The target data source
         */
        DataSource targetDataSource;

        /**
         * Seata wraps the Connection object: the Connection that the persistence layer
         * obtains is actually a ConnectionProxy.
         * Its superclass AbstractConnectionProxy in turn wraps PreparedStatement
         * in a PreparedStatementProxy.
         */
        @Override
        public ConnectionProxy getConnection() throws SQLException {
            Connection targetConnection = targetDataSource.getConnection();
            return new ConnectionProxy(this, targetConnection);
        }
    }

In ConnectionProxy’s parent class, AbstractConnectionProxy, methods are provided to create StatementProxy and PreparedStatementProxy objects.

    public abstract class AbstractConnectionProxy implements Connection {

        public Statement createStatement() throws SQLException {
            Statement targetStatement = getTargetConnection().createStatement();
            return new StatementProxy(this, targetStatement);
        }

        /**
         * Creates a PreparedStatementProxy
         */
        @Override
        public PreparedStatement prepareStatement(String sql) throws SQLException {
            String dbType = getDbType();
            // support oracle 10.2+
            PreparedStatement targetPreparedStatement = null;
            if (RootContext.inGlobalTransaction()) {
                List<SQLRecognizer> sqlRecognizers = SQLVisitorFactory.get(sql, dbType);
                if (sqlRecognizers != null && sqlRecognizers.size() == 1) {
                    SQLRecognizer sqlRecognizer = sqlRecognizers.get(0);
                    if (sqlRecognizer != null && sqlRecognizer.getSQLType() == SQLType.INSERT) {
                        String tableName = ColumnUtils.delEscape(sqlRecognizer.getTableName(), dbType);
                        TableMeta tableMeta = TableMetaCacheFactory.getTableMetaCache(dbType).getTableMeta(getTargetConnection(),
                            tableName, getDataSourceProxy().getResourceId());
                        targetPreparedStatement = getTargetConnection().prepareStatement(sql, new String[]{tableMeta.getPkName()});
                    }
                }
            }
            if (targetPreparedStatement == null) {
                targetPreparedStatement = getTargetConnection().prepareStatement(sql);
            }
            return new PreparedStatementProxy(this, targetPreparedStatement, sql);
        }
    }

Next, look at the proxy logic in PreparedStatementProxy.

    public class PreparedStatementProxy extends AbstractPreparedStatementProxy
        implements PreparedStatement, ParametersHolder {

        /**
         * Seata wraps the PreparedStatement object, so executing SQL
         * actually calls PreparedStatementProxy#execute()
         */
        @Override
        public boolean execute() throws SQLException {
            // Directly invoke the execution template
            return ExecuteTemplate.execute(this, (statement, args) -> statement.execute());
        }
    }

ExecuteTemplate.execute

    public class ExecuteTemplate {

        // This is the method that PreparedStatementProxy.execute calls
        public static <T, S extends Statement> T execute(StatementProxy<S> statementProxy,
                                                         StatementCallback<T, S> statementCallback,
                                                         Object... args) throws SQLException {
            return execute(null, statementProxy, statementCallback, args);
        }

        public static <T, S extends Statement> T execute(List<SQLRecognizer> sqlRecognizers,
                                                         StatementProxy<S> statementProxy,
                                                         StatementCallback<T, S> statementCallback,
                                                         Object... args) throws SQLException {
            /**
             * RootContext.inGlobalTransaction(): whether this is an operation inside a
             * distributed transaction, determined through the xid
             * RootContext.requireGlobalLock(): whether the @GlobalLock annotation was added
             */
            if (!RootContext.inGlobalTransaction() && !RootContext.requireGlobalLock()) {
                // Just work as original statement
                return statementCallback.execute(statementProxy.getTargetStatement(), args);
            }

            if (sqlRecognizers == null) {
                sqlRecognizers = SQLVisitorFactory.get(
                    statementProxy.getTargetSQL(),
                    statementProxy.getConnectionProxy().getDbType());
            }
            Executor<T> executor;
            if (CollectionUtils.isEmpty(sqlRecognizers)) {
                executor = new PlainExecutor<>(statementProxy, statementCallback);
            } else {
                if (sqlRecognizers.size() == 1) {
                    SQLRecognizer sqlRecognizer = sqlRecognizers.get(0);
                    // Choose the executor by SQL type
                    switch (sqlRecognizer.getSQLType()) {
                        case INSERT:
                            executor = new InsertExecutor<>(statementProxy, statementCallback, sqlRecognizer);
                            break;
                        case UPDATE:
                            executor = new UpdateExecutor<>(statementProxy, statementCallback, sqlRecognizer);
                            break;
                        case DELETE:
                            executor = new DeleteExecutor<>(statementProxy, statementCallback, sqlRecognizer);
                            break;
                        case SELECT_FOR_UPDATE:
                            executor = new SelectForUpdateExecutor<>(statementProxy, statementCallback, sqlRecognizer);
                            break;
                        default:
                            executor = new PlainExecutor<>(statementProxy, statementCallback);
                            break;
                    }
                } else {
                    executor = new MultiExecutor<>(statementProxy, statementCallback, sqlRecognizers);
                }
            }
            T rs;
            try {
                /**
                 * select without for update: PlainExecutor#execute()
                 * insert, update, delete, select for update: BaseTransactionalExecutor#execute()
                 */
                rs = executor.execute(args);
            } catch (Throwable ex) {
                if (!(ex instanceof SQLException)) {
                    // Turn other exception into SQLException
                    ex = new SQLException(ex);
                }
                throw (SQLException) ex;
            }
            return rs;
        }
    }
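The branching in ExecuteTemplate.execute boils down to a small decision table. The sketch below restates it as a pure function so the routing can be checked in isolation. It is an illustration under assumptions, not Seata code: SqlType here is a reduced, hypothetical enum, and route returns executor names as strings instead of constructing executors.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

class ExecutorRoutingSketch {

    /** Hypothetical, reduced set of the SQL types the template distinguishes. */
    enum SqlType { INSERT, UPDATE, DELETE, SELECT, SELECT_FOR_UPDATE }

    /** Returns the name of the component that would handle the statement. */
    static String route(boolean inGlobalTransaction, boolean requireGlobalLock, List<SqlType> recognized) {
        if (!inGlobalTransaction && !requireGlobalLock) {
            // Neither a global transaction nor @GlobalLock: behave like plain JDBC
            return "target statement";
        }
        if (recognized.isEmpty()) {
            return "PlainExecutor";
        }
        if (recognized.size() > 1) {
            return "MultiExecutor";
        }
        switch (recognized.get(0)) {
            case INSERT:            return "InsertExecutor";
            case UPDATE:            return "UpdateExecutor";
            case DELETE:            return "DeleteExecutor";
            case SELECT_FOR_UPDATE: return "SelectForUpdateExecutor";
            default:                return "PlainExecutor";
        }
    }

    public static void main(String[] args) {
        System.out.println(route(false, false, Collections.singletonList(SqlType.UPDATE)));
        System.out.println(route(true, false, Collections.singletonList(SqlType.UPDATE)));
        System.out.println(route(true, false, Collections.singletonList(SqlType.SELECT)));
        System.out.println(route(true, false, Arrays.asList(SqlType.UPDATE, SqlType.DELETE)));
    }
}
```

Note that a plain select inside a global transaction still ends up in PlainExecutor, so it never touches the global lock.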

2.1 Handling SQL that takes an exclusive lock

insert, update, delete, select for update -> BaseTransactionalExecutor#execute

    public abstract class BaseTransactionalExecutor<T, S extends Statement> implements Executor<T> {

        public T execute(Object... args) throws Throwable {
            // Bind the xid to the connection
            if (RootContext.inGlobalTransaction()) {
                String xid = RootContext.getXID();
                statementProxy.getConnectionProxy().bind(xid);
            }
            // RootContext.requireGlobalLock() is false the first time
            statementProxy.getConnectionProxy().setGlobalLockRequire(RootContext.requireGlobalLock());
            // insert, update, delete ==> AbstractDMLBaseExecutor.doExecute
            // select for update      ==> SelectForUpdateExecutor.doExecute
            return doExecute(args);
        }
    }

2.1.1 Update Operations

Insert, update, delete -> AbstractDMLBaseExecutor#doExecute

    public abstract class AbstractDMLBaseExecutor<T, S extends Statement> extends BaseTransactionalExecutor<T, S> {

        public T doExecute(Object... args) throws Throwable {
            AbstractConnectionProxy connectionProxy = statementProxy.getConnectionProxy();
            // Auto-commit is the default
            if (connectionProxy.getAutoCommit()) {
                return executeAutoCommitTrue(args);
            } else {
                return executeAutoCommitFalse(args);
            }
        }

        protected T executeAutoCommitTrue(Object[] args) throws Throwable {
            ConnectionProxy connectionProxy = statementProxy.getConnectionProxy();
            try {
                // Switch to manual commit
                connectionProxy.setAutoCommit(false);
                // execute(): AbstractDMLBaseExecutor.LockRetryPolicy#execute
                return new LockRetryPolicy(connectionProxy.getTargetConnection()).execute(() -> {
                    /**
                     * Take the before image
                     * Execute the real business SQL
                     * Take the after image
                     * Prepare the undo log object
                     */
                    T result = executeAutoCommitFalse(args);
                    /**
                     * commit
                     * 1. register the branch transaction
                     * 2. commit the business SQL together with the undo log
                     */
                    connectionProxy.commit();
                    return result;
                });
            } catch (Exception e) {
                // when exception occur in finally,this exception will lost, so just print it here
                LOGGER.error("execute executeAutoCommitTrue error:{}", e.getMessage(), e);
                if (!LockRetryPolicy.isLockRetryPolicyBranchRollbackOnConflict()) {
                    connectionProxy.getTargetConnection().rollback();
                }
                throw e;
            } finally {
                connectionProxy.getContext().reset();
                connectionProxy.setAutoCommit(true);
            }
        }

        protected T executeAutoCommitFalse(Object[] args) throws Exception {
            /**
             * insert: no before image
             * update: has a before image
             * delete: has a before image
             */
            // Take the before image
            TableRecords beforeImage = beforeImage();
            // Execute the business SQL
            T result = statementCallback.execute(statementProxy.getTargetStatement(), args);
            /**
             * insert: has an after image
             * delete: no after image
             * update: has an after image
             */
            // Take the after image
            TableRecords afterImage = afterImage(beforeImage);
            // Prepare the undo log object
            prepareUndoLog(beforeImage, afterImage);
            // Return the result; nothing has been committed yet
            return result;
        }
    }

AbstractDMLBaseExecutor.LockRetryPolicy#execute

    private static class LockRetryPolicy extends ConnectionProxy.LockRetryPolicy {

        public <T> T execute(Callable<T> callable) throws Exception {
            // Read from the configuration file; the default value is true
            if (LOCK_RETRY_POLICY_BRANCH_ROLLBACK_ON_CONFLICT) {
                // Call the retry method of the parent class
                return doRetryOnLockConflict(callable);
            } else {
                // Call the callable directly
                return callable.call();
            }
        }
    }

ConnectionProxy.LockRetryPolicy

    public static class LockRetryPolicy {
        protected static final boolean LOCK_RETRY_POLICY_BRANCH_ROLLBACK_ON_CONFLICT = ConfigurationFactory
            .getInstance().getBoolean(ConfigurationKeys.CLIENT_LOCK_RETRY_POLICY_BRANCH_ROLLBACK_ON_CONFLICT,
                DEFAULT_CLIENT_LOCK_RETRY_POLICY_BRANCH_ROLLBACK_ON_CONFLICT);

        // TODO: the subclass overrides this method; compare the logic of the two versions
        public <T> T execute(Callable<T> callable) throws Exception {
            if (LOCK_RETRY_POLICY_BRANCH_ROLLBACK_ON_CONFLICT) {
                // Call the lambda directly
                return callable.call();
            } else {
                return doRetryOnLockConflict(callable);
            }
        }

        protected <T> T doRetryOnLockConflict(Callable<T> callable) throws Exception {
            LockRetryController lockRetryController = new LockRetryController();
            while (true) {
                try {
                    // Actually calls the lambda expression
                    return callable.call();
                } catch (LockConflictException lockConflict) {
                    onException(lockConflict);
                    /**
                     * Retry mechanism: sleep m ms each time;
                     * after n sleeps an exception is thrown
                     */
                    lockRetryController.sleep(lockConflict);
                } catch (Exception e) {
                    onException(e);
                    throw e;
                }
            }
        }
    }

Note that the two policies branch in opposite directions on the same flag: when it is true, the executor-level policy (AbstractDMLBaseExecutor.LockRetryPolicy) retries and the connection-level policy calls the lambda directly; when it is false, the roles are swapped.

Here callable.call() actually invokes the lambda expression defined in the AbstractDMLBaseExecutor#executeAutoCommitTrue method:
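The retry loop in doRetryOnLockConflict can be sketched in isolation as follows. This is a simplified illustration, not Seata's implementation: the two exception classes are hypothetical stand-ins defined locally, and the retry count and sleep interval are passed as parameters rather than read from configuration.

```java
import java.util.concurrent.Callable;

class LockRetrySketch {

    /** Hypothetical stand-in for the exception raised on a global lock conflict. */
    static class LockConflictException extends Exception {
        LockConflictException(String message) { super(message); }
    }

    /** Hypothetical exception raised once the retries are used up. */
    static class LockWaitTimeoutException extends Exception {
        LockWaitTimeoutException(String message, Throwable cause) { super(message, cause); }
    }

    /** Calls the action again after each lock conflict, sleeping in between. */
    static <T> T retryOnLockConflict(Callable<T> action, int maxRetries, long sleepMillis) throws Exception {
        int remaining = maxRetries;
        while (true) {
            try {
                return action.call();
            } catch (LockConflictException conflict) {
                if (--remaining < 0) {
                    throw new LockWaitTimeoutException("global lock wait timeout", conflict);
                }
                Thread.sleep(sleepMillis);
            }
            // Any other exception propagates to the caller unchanged
        }
    }

    /** Simulates an action that hits a lock conflict twice before succeeding. */
    static String demo() {
        int[] attempts = {0};
        try {
            String result = retryOnLockConflict(() -> {
                if (++attempts[0] < 3) {
                    throw new LockConflictException("row is locked by another global transaction");
                }
                return "committed";
            }, 5, 10L);
            return result + " after " + attempts[0] + " attempts";
        } catch (Exception e) {
            return "failed: " + e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints "committed after 3 attempts"
    }
}
```

Only the lock conflict is retried; every other failure ends the loop immediately, which matches the two catch branches in the excerpt above.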

    /**
     * Take the before image
     * Execute the real business SQL
     * Take the after image
     * Prepare the undo log object
     */
    T result = executeAutoCommitFalse(args);
    /**
     * commit
     * 1. register the branch transaction
     * 2. commit the business SQL together with the undo log
     */
    connectionProxy.commit();
    return result;

AbstractDMLBaseExecutor#executeAutoCommitFalse

    public abstract class AbstractDMLBaseExecutor<T, S extends Statement> extends BaseTransactionalExecutor<T, S> {

        protected T executeAutoCommitFalse(Object[] args) throws Exception {
            /**
             * insert: no before image     -> InsertExecutor#beforeImage()
             * update: has a before image  -> UpdateExecutor#beforeImage()
             * delete: has a before image  -> DeleteExecutor#beforeImage()
             */
            // Take the before image
            TableRecords beforeImage = beforeImage();
            // Execute the business SQL
            T result = statementCallback.execute(statementProxy.getTargetStatement(), args);
            /**
             * insert: has an after image  -> InsertExecutor#afterImage()
             * update: has an after image  -> UpdateExecutor#afterImage()
             * delete: no after image      -> DeleteExecutor#afterImage() returns an empty record set
             */
            // Take the after image
            TableRecords afterImage = afterImage(beforeImage);
            // Prepare the undo log object
            prepareUndoLog(beforeImage, afterImage);
            // Return the result; nothing has been committed yet
            return result;
        }

        protected void prepareUndoLog(TableRecords beforeImage, TableRecords afterImage) throws SQLException {
            if (beforeImage.getRows().isEmpty() && afterImage.getRows().isEmpty()) {
                return;
            }
            ConnectionProxy connectionProxy = statementProxy.getConnectionProxy();
            TableRecords lockKeyRecords = sqlRecognizer.getSQLType() == SQLType.DELETE ? beforeImage : afterImage;
            // Build the lock key
            String lockKeys = buildLockKey(lockKeyRecords);
            connectionProxy.appendLockKey(lockKeys);
            // Build the undo log item
            SQLUndoLog sqlUndoLog = buildUndoItem(beforeImage, afterImage);
            // Buffer the undo log on the connection for now
            connectionProxy.appendUndoLog(sqlUndoLog);
        }
    }
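To make the before image / business SQL / after image sequence concrete, here is a toy sketch that uses an in-memory map in place of a database table. Everything in it is hypothetical (UndoRecord, updateWithImages, rollback); it only illustrates why both images are captured around the statement, and the rollback method reflects the general purpose of an undo log rather than any code shown in this article.

```java
import java.util.HashMap;
import java.util.Map;

class UndoImageSketch {

    /** Hypothetical undo record: row values before and after one UPDATE. */
    static class UndoRecord {
        final Map<Integer, String> beforeImage;
        final Map<Integer, String> afterImage;

        UndoRecord(Map<Integer, String> beforeImage, Map<Integer, String> afterImage) {
            this.beforeImage = beforeImage;
            this.afterImage = afterImage;
        }
    }

    /** Copies the current value of one row, keyed by primary key. */
    static Map<Integer, String> snapshot(Map<Integer, String> table, int id) {
        Map<Integer, String> image = new HashMap<>();
        if (table.containsKey(id)) {
            image.put(id, table.get(id));
        }
        return image;
    }

    /** Updates one row and returns the record that would be buffered as undo log. */
    static UndoRecord updateWithImages(Map<Integer, String> table, int id, String newValue) {
        Map<Integer, String> before = snapshot(table, id); // before image
        table.put(id, newValue);                            // the business statement
        Map<Integer, String> after = snapshot(table, id);  // after image
        return new UndoRecord(before, after);
    }

    /** Restores the rows captured in the before image. */
    static void rollback(Map<Integer, String> table, UndoRecord undo) {
        table.putAll(undo.beforeImage);
    }

    public static void main(String[] args) {
        Map<Integer, String> table = new HashMap<>();
        table.put(1, "stock=10");

        UndoRecord undo = updateWithImages(table, 1, "stock=9");
        System.out.println(table.get(1)); // stock=9

        rollback(table, undo);
        System.out.println(table.get(1)); // stock=10
    }
}
```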

ConnectionProxy#commit

    public class ConnectionProxy extends AbstractConnectionProxy {
        private final static LockRetryPolicy LOCK_RETRY_POLICY = new LockRetryPolicy();

        public void commit() throws SQLException {
            try {
                // ConnectionProxy.LockRetryPolicy.execute
                LOCK_RETRY_POLICY.execute(() -> {
                    // Commit
                    doCommit();
                    return null;
                });
            } catch (SQLException e) {
                throw e;
            } catch (Exception e) {
                throw new SQLException(e);
            }
        }

        private void doCommit() throws SQLException {
            if (context.inGlobalTransaction()) {
                // Inside a global transaction
                processGlobalTransactionCommit();
            } else if (context.isGlobalLockRequire()) {
                // @GlobalLock
                processLocalCommitWithGlobalLocks();
            } else {
                targetConnection.commit();
            }
        }

        private void processGlobalTransactionCommit() throws SQLException {
            try {
                // Register the branch transaction
                register();
            } catch (TransactionException e) {
                /**
                 * A global lock conflict raises an exception here; if a SQLException
                 * is thrown, the transaction is rolled back (TransactionalTemplate#execute)
                 */
                recognizeLockKeyConflictException(e, context.buildLockKeys());
            }
            try {
                // Insert the buffered undo logs into the undo_log table
                UndoLogManagerFactory.getUndoLogManager(this.getDbType()).flushUndoLogs(this);
                // Commit the local transaction: business data together with the undo log rows
                targetConnection.commit();
            } catch (Throwable ex) {
                LOGGER.error("process connectionProxy commit error: {}", ex.getMessage(), ex);
                // Tell the Seata server that the local transaction failed to commit
                report(false);
                throw new SQLException(ex);
            }
            if (IS_REPORT_SUCCESS_ENABLE) {
                // Report the phase-one result to the Seata server
                report(true);
            }
            context.reset();
        }

        private void register() throws TransactionException {
            if (!context.hasUndoLog() || context.getLockKeysBuffer().isEmpty()) {
                return;
            }
            // Register the branch transaction with the Seata server, passing the resource
            // and the lock keys; seata-server uses them to detect lock conflicts
            Long branchId = DefaultResourceManager.get().branchRegister(BranchType.AT, getDataSourceProxy().getResourceId(),
                null, context.getXid(), null, context.buildLockKeys());
            context.setBranchId(branchId);
        }

        // Recognize a global lock conflict
        private void recognizeLockKeyConflictException(TransactionException te, String lockKeys) throws SQLException {
            // If it is a global lock conflict
            if (te.getCode() == TransactionExceptionCode.LockKeyConflict) {
                StringBuilder reasonBuilder = new StringBuilder("get global lock fail, xid:");
                reasonBuilder.append(context.getXid());
                if (StringUtils.isNotBlank(lockKeys)) {
                    reasonBuilder.append(", lockKeys:").append(lockKeys);
                }
                throw new LockConflictException(reasonBuilder.toString());
            } else {
                throw new SQLException(te);
            }
        }

        private void report(boolean commitDone) throws SQLException {
            if (context.getBranchId() == null) {
                return;
            }
            int retry = REPORT_RETRY_COUNT;
            // Retry mechanism
            while (retry > 0) {
                try {
                    // Report whether the phase-one commit succeeded or failed
                    DefaultResourceManager.get().branchReport(BranchType.AT, context.getXid(), context.getBranchId(),
                        commitDone ? BranchStatus.PhaseOne_Done : BranchStatus.PhaseOne_Failed, null);
                    return;
                } catch (Throwable ex) {
                    LOGGER.error("Failed to report [" + context.getBranchId() + "/" + context.getXid() + "] commit done ["
                        + commitDone + "] Retry Countdown: " + retry);
                    retry--;

                    if (retry == 0) {
                        throw new SQLException("Failed to report branch status " + commitDone, ex);
                    }
                }
            }
        }
    }
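The lock keys passed to branchRegister identify the rows the branch has touched. The sketch below shows one plausible way such keys are assembled. The exact format used here (table name, a colon, comma-separated primary key values, with several tables joined by semicolons) is an assumption made for illustration and is not confirmed by the excerpts in this article; buildLockKey and buildLockKeys in this sketch are hypothetical helpers that merely borrow the names of the Seata methods.

```java
import java.util.Arrays;
import java.util.List;

class LockKeySketch {

    /** Builds the key for one table, e.g. "t_order:1,2" (assumed format). */
    static String buildLockKey(String tableName, List<String> primaryKeys) {
        if (primaryKeys.isEmpty()) {
            return null; // no rows affected, nothing to lock
        }
        return tableName + ":" + String.join(",", primaryKeys);
    }

    /** Joins the keys buffered during the branch with ';' (assumed separator). */
    static String buildLockKeys(List<String> bufferedKeys) {
        return bufferedKeys.isEmpty() ? null : String.join(";", bufferedKeys);
    }

    public static void main(String[] args) {
        String orderKey = buildLockKey("t_order", Arrays.asList("1", "2"));
        String stockKey = buildLockKey("t_stock", Arrays.asList("7"));
        System.out.println(buildLockKeys(Arrays.asList(orderKey, stockKey)));
    }
}
```

Because the keys are derived from primary keys, two branches conflict only when they touch the same row of the same table, which is the check the server performs at registration.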

2.1.2 Query Operations

select for update -> SelectForUpdateExecutor.doExecute

    public class SelectForUpdateExecutor<T, S extends Statement> extends BaseTransactionalExecutor<T, S> {

        public T doExecute(Object... args) throws Throwable {
            Connection conn = statementProxy.getConnection();
            // Database metadata
            DatabaseMetaData dbmd = conn.getMetaData();
            T rs;
            Savepoint sp = null;
            LockRetryController lockRetryController = new LockRetryController();
            boolean originalAutoCommit = conn.getAutoCommit();
            ArrayList<List<Object>> paramAppenderList = new ArrayList<>();
            // e.g. select id from t_order where username = 'ZWJ' for update
            String selectPKSQL = buildSelectSQL(paramAppenderList);
            try {
                if (originalAutoCommit) {
                    /*
                     * In order to hold the local db lock during global lock checking
                     * set auto commit value to false first if original auto commit was true
                     */
                    conn.setAutoCommit(false);
                } else if (dbmd.supportsSavepoints()) {
                    /*
                     * In order to release the local db lock when global lock conflict
                     * create a save point if original auto commit was false, then use the save point here to release db
                     * lock during global lock checking if necessary
                     */
                    sp = conn.setSavepoint();
                } else {
                    throw new SQLException("not support savepoint. please check your db version");
                }

                while (true) {
                    try {
                        // #870
                        // execute return Boolean
                        // executeQuery return ResultSet
                        // Execute the target SQL
                        rs = statementCallback.execute(statementProxy.getTargetStatement(), args);

                        // Try to get global lock of those rows selected
                        TableRecords selectPKRows = buildTableRecords(getTableMeta(), selectPKSQL, paramAppenderList);
                        String lockKeys = buildLockKey(selectPKRows);
                        if (StringUtils.isNullOrEmpty(lockKeys)) {
                            break;
                        }

                        if (RootContext.inGlobalTransaction()) {
                            // The current method runs inside a global transaction
                            // do as usual
                            // Send a request to the Seata server to check whether the lock is already taken
                            statementProxy.getConnectionProxy().checkLock(lockKeys);
                        } else if (RootContext.requireGlobalLock()) {
                            // check lock key before commit just like DML to avoid reentrant lock problem
                            // (no xid thus can not reentrant)
                            statementProxy.getConnectionProxy().appendLockKey(lockKeys);
                        } else {
                            throw new RuntimeException("Unknown situation!");
                        }
                        break;
                    } catch (LockConflictException lce) {
                        if (sp != null) {
                            conn.rollback(sp);
                        } else {
                            conn.rollback();
                        }
                        // Sleep for a while, then retry
                        lockRetryController.sleep(lce);
                    }
                }
            } finally {
                if (sp != null) {
                    try {
                        conn.releaseSavepoint(sp);
                    } catch (SQLException e) {
                        if (LOGGER.isWarnEnabled()) {
                            LOGGER.warn("{} does not support release save point, but this is not a error.", getDbType());
                        }
                    }
                }
                if (originalAutoCommit) {
                    conn.setAutoCommit(true);
                }
            }
            return rs;
        }

        private String buildSelectSQL(ArrayList<List<Object>> paramAppenderList) {
            SQLSelectRecognizer recognizer = (SQLSelectRecognizer)sqlRecognizer;
            // select
            StringBuilder selectSQLAppender = new StringBuilder("SELECT ");
            // select id (the primary key)
            selectSQLAppender.append(getColumnNameInSQL(getTableMeta().getPkName()));
            // select id from t_order
            selectSQLAppender.append(" FROM ").append(getFromTableInSQL());
            String whereCondition = buildWhereCondition(recognizer, paramAppenderList);
            if (StringUtils.isNotBlank(whereCondition)) {
                // select id from t_order where username = 'zwj'
                selectSQLAppender.append(" WHERE ").append(whereCondition);
            }
            // select id from t_order where username = 'zwj' for update
            selectSQLAppender.append(" FOR UPDATE");
            return selectSQLAppender.toString();
        }
    }
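The string assembly in buildSelectSQL is easy to try on its own. The following sketch reproduces only that assembly: buildSelectPkSql is a hypothetical helper that takes the primary key column, the table name, and the where condition as plain strings, whereas the real method derives them from table metadata and the SQL recognizer.

```java
class SelectPkSqlSketch {

    /** Builds "SELECT <pk> FROM <table> [WHERE <condition>] FOR UPDATE". */
    static String buildSelectPkSql(String pkColumn, String table, String whereCondition) {
        StringBuilder sql = new StringBuilder("SELECT ")
                .append(pkColumn)
                .append(" FROM ")
                .append(table);
        if (whereCondition != null && !whereCondition.trim().isEmpty()) {
            sql.append(" WHERE ").append(whereCondition);
        }
        return sql.append(" FOR UPDATE").toString();
    }

    public static void main(String[] args) {
        // prints "SELECT id FROM t_order WHERE username = ? FOR UPDATE"
        System.out.println(buildSelectPkSql("id", "t_order", "username = ?"));
        // prints "SELECT id FROM t_order FOR UPDATE"
        System.out.println(buildSelectPkSql("id", "t_order", null));
    }
}
```

Selecting only the primary key is enough here, because the result is used solely to build the lock keys for the rows that the original select for update returned.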

2.2 Handling SQL that takes no lock

select without for update -> PlainExecutor#execute

    public class PlainExecutor<T, S extends Statement> implements Executor<T> {
        private StatementProxy<S> statementProxy;
        private StatementCallback<T, S> statementCallback;

        public T execute(Object... args) throws Throwable {
            // Execute directly on the target statement
            return statementCallback.execute(statementProxy.getTargetStatement(), args);
        }
    }

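To summarize the flow for executing the target method, the data source proxy logic is:

1. Create the before image.
2. Execute the business SQL.
3. Create the after image.
   - insert: there is no before image, but there is an after image.
   - update: there are both a before image and an after image.
   - delete: there is a before image, but no after image.
4. Prepare the undo log and the lock keys.
5. Register the branch transaction. seata-server checks for global lock conflicts.
6. Insert the generated undo log into the undo_log table.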