preface

Recently when I looked at SpringBoot source code, I suddenly remembered the previous transaction mechanism and its transmission, so I seriously looked at the relevant source code, especially to share with you, and manual implementation of a simple version, the main sightseeing process is as follows.

The basic idea

The mechanism for SpringBoot transaction propagation is the binding of connections to transaction managers and the need to suspend, or unbind, previous transactions as new transactions occur. A new ConnectionHolder is created for REQUIRES_NEW. The ConnectionHolder is an owner of the current Connection and is used to retrieve the current Connection.

The source code parsing

First let’s take a look at the interceptor entry

public Object invoke(MethodInvocation invocation) throws Throwable {
	// Load the target classClass<? > targetClass = invocation.getThis() ! =null ? AopUtils.getTargetClass(invocation.getThis()) : null;
        // Load the method of the target class
        Method var10001 = invocation.getMethod();
        invocation.getClass();
        // Do some AOP processing of transactions (core)
        return this.invokeWithinTransaction(var10001, targetClass, invocation::proceed);
    }
Copy the code

Then we go to the invokeWithinTransaction method, where we don’t go if (txAttr! = null && tm instanceof CallbackPreferringPlatformTransactionManager) {this condition, Because our transaction management is not CallbackPreferringPlatformTransactionManager, so we entered the else

If a transaction is created, return the new transaction, otherwise the old transaction
TransactionAspectSupport.TransactionInfo txInfo = this.createTransactionIfNecessary(tm, txAttr, joinpointIdentification);

            try {
            	// Intercepts the target method
                result = invocation.proceedWithInvocation();
            } catch (Throwable var17) {
            	// Handle exceptions after they occur, such as rollback
                this.completeTransactionAfterThrowing(txInfo, var17);
                throw var17;
            } finally {
            	// Clear the information about the current transaction and add the previous transaction
                this.cleanupTransactionInfo(txInfo);
            }
			// Commit the current transaction
            this.commitTransactionAfterReturning(txInfo);
            return result;
Copy the code

Here we see createTransactionIfNecessary mainly

if(txAttr ! =null && ((TransactionAttribute)txAttr).getName() == null) {
  	    // If the current transaction information is empty, the default is given
            txAttr = new DelegatingTransactionAttribute((TransactionAttribute)txAttr) {
                public String getName(a) {
                    returnjoinpointIdentification; }}; } TransactionStatus status =null;
        if(txAttr ! =null) {
            if(tm ! =null) {
                // Get the transaction object
                status = tm.getTransaction((TransactionDefinition)txAttr);
            } else if (this.logger.isDebugEnabled()) {
                this.logger.debug("Skipping transactional joinpoint [" + joinpointIdentification + "] because no transaction manager has been configured"); }}return this.prepareTransactionInfo(tm, (TransactionAttribute)txAttr, joinpointIdentification, status);
Copy the code

Let’s take a look at the getTransaction methods, since they are usually REQUIRED and REQUIRED_NEW, and the others are similar.

 // Determine if there is a transaction before, and if there is some processing
 if (this.isExistingTransaction(transaction)) {
            return this.handleExistingTransaction((TransactionDefinition)definition, transaction, debugEnabled);
Copy the code
REQUIRES_NEW = REQUIRES_NEW = REQUIRES_NEW
if (definition.getPropagationBehavior() == 3) {
                if (debugEnabled) {
                    this.logger.debug("Suspending current transaction, creating new transaction with name [" + definition.getName() + "]");
                }
				// Suspend the current transaction
                suspendedResources = this.suspend(transaction);

                try {
                    newSynchronization = this.getTransactionSynchronization() ! =2;
                    // Create a new transaction state
                    DefaultTransactionStatus status = this.newTransactionStatus(definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
                    // Start transaction (core)
                    this.doBegin(transaction, definition);
                    // Do some property Settings and synchronization Settings for transaction management
                    this.prepareSynchronization(status, definition);
                    return status;
                } catch (Error | RuntimeException var7) {
                	// Resume pending transactions
                    this.resumeAfterBeginException(transaction, suspendedResources, var7);
                    throw var7;
                }
Copy the code

Let’s look at how the transaction pending, in to suspend method, click enter into doSuspend again, you can see DataSourceTransactionManager

protected Object doSuspend(Object transaction) {
		// Get the transaction object of the current thread
        DataSourceTransactionManager.DataSourceTransactionObject txObject = (DataSourceTransactionManager.DataSourceTransactionObject)transaction;
        // Clear the connection of the current object because the transaction is suspended and the next transaction is dropped
        txObject.setConnectionHolder((ConnectionHolder)null);
        // Unbind the previous transaction
        return TransactionSynchronizationManager.unbindResource(this.obtainDataSource());
    }
Copy the code

The previous transaction is suspended, and the new transaction, connection and binding process proceed.

// Check whether the current thread already has a connection, because this is empty, that is, unbind there, here again verify
if(! txObject.hasConnectionHolder() || txObject.getConnectionHolder().isSynchronizedWithTransaction()) {// Get a new connection
                Connection newCon = this.obtainDataSource().getConnection();
                if (this.logger.isDebugEnabled()) {
                    this.logger.debug("Acquired Connection [" + newCon + "] for JDBC transaction");
                }
		// Bind the new connection to the transaction object of the current thread
                txObject.setConnectionHolder(new ConnectionHolder(newCon), true);
            }

            txObject.getConnectionHolder().setSynchronizedWithTransaction(true);
            // Get the connection Holder
            con = txObject.getConnectionHolder().getConnection();
            Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition);
            // Sets whether the current connection is readable
            txObject.setPreviousIsolationLevel(previousIsolationLevel);
            // Turn off auto commit
            if (con.getAutoCommit()) {
                txObject.setMustRestoreAutoCommit(true);
                if (this.logger.isDebugEnabled()) {
                    this.logger.debug("Switching JDBC Connection [" + con + "] to manual commit");
                }

                con.setAutoCommit(false);
            }
	    // Set the transaction to read-only
            this.prepareTransactionalConnection(con, definition);
            // Set transaction to take effect
            txObject.getConnectionHolder().setTransactionActive(true);
            int timeout = this.determineTimeout(definition);
            if(timeout ! = -1) {
                txObject.getConnectionHolder().setTimeoutInSeconds(timeout);
            }
	    // If the connection is new, it will be bound, because if it is not new, it is already bound, and an error will be reported if it is bound again
            if (txObject.isNewConnectionHolder()) {
                TransactionSynchronizationManager.bindResource(this.obtainDataSource(), txObject.getConnectionHolder());
            }
Copy the code

The main reason why the REQUIRE_NEW rollback does not affect the parent method is because they are two different connections. In general, the whole idea is that if a new transaction needs to be created, the REQUIRE_NEW state, the new connection should bind the current transaction management. In this case, the child method is in the REQUIRE_NEW state. If an exception occurs and we roll back the parent method, it will not affect the parent method because we are binding the connection of the current child method.

Write one by hand

Connect related operation tool classes

/ * * *@author chujun
 * @version 1.0
 * @dateThe 2021-03-06 "* /
@Component
public class ConnectionHolderUtil {

    private static DataSource dataSource;


    private static final Logger  log =  LoggerFactory.getLogger(ConnectionHolderUtil.class);

    @Autowired
    public  void setDataSource(DataSource dataSource) {
        ConnectionHolderUtil.dataSource = dataSource;
    }

    private static ThreadLocal<ConnectionHolder> connectionHolderThreadLocal = new ThreadLocal<>();

    /** * get database connection *@return Connection
     */
    public static ConnectionHolder getConnectionHolder(boolean isNew){
        ConnectionHolder connectionHolder = connectionHolderThreadLocal.get();
        // There is no need to generate a new direct return if there is a connection
        if(connectionHolder ! =null && !isNew){
            return connectionHolder;
        }
        try {
            // Get a new connection
            Connection connection = dataSource.getConnection();

            // Turn off auto commit
            connection.setAutoCommit(false);
            connectionHolder = new ConnectionHolder(connection);
            connectionHolderThreadLocal.set(connectionHolder);

            // Bind the connection
            TransactionSynchronizationManager.bindResource(dataSource,connectionHolder);
            return connectionHolder;
        } catch (SQLException e) {
            log.error("Database connection acquisition failed",e);
            return null; }}/** * Commit transaction */
    public static void commit(a){
        ConnectionHolder connectionHolder = connectionHolderThreadLocal.get();
        if(connectionHolder == null) {return;
        }
        try {
            connectionHolder.getConnection().commit();
        } catch (SQLException e) {
            log.error("Submission failed",e); }}/** * transaction rollback */
    public static void rollback(a){
        ConnectionHolder connectionHolder = connectionHolderThreadLocal.get();
        if(connectionHolder == null) {return;
        }
        try {
            connectionHolder.getConnection().rollback();
        } catch (SQLException e) {
            log.error("Rollback failed",e); }}/** * close the connection */
    public static void close(a){
        ConnectionHolder connectionHolder = connectionHolderThreadLocal.get();
        if(connectionHolder == null) {return;
        }
        Connection connection = connectionHolder.getConnection();
        try {
            connection.close();
        } catch (SQLException e) {
            log.error("Failed to close database connection",e); }}/** * Resuming pending transactions */
    public static void resume(Object susPend){
        TransactionSynchronizationManager.unbindResource(dataSource);
        TransactionSynchronizationManager.bindResource(dataSource,susPend);
        connectionHolderThreadLocal.set((ConnectionHolder) susPend);
    }

    /** * suspend the current transaction */
    public static Object hangTrasaction(a){
        return TransactionSynchronizationManager.unbindResource(dataSource);
    }

    /** * Check whether the current connection is closed *@return* /
    public static boolean isClose(a){
        if(connectionHolderThreadLocal.get() == null) {return true;
        }
        try {
            return connectionHolderThreadLocal.get().getConnection().isClosed();
        } catch (SQLException e) {
            log.error("Failed to get connection status");
        }
        return true; }}Copy the code

AOP aspect interception

/ * * *@author chujun
 * @version 1.0
 * @dateThe 2021-02-28 protest * /
@Aspect
@Component
public class TransactionAspect{

    private static final Logger log = LoggerFactory.getLogger(TransactionAspect.class);

    @Pointcut("@annotation(com.cj.annotion.Transactional)")
    public void point(a){}

    @Around("point()")
    public Object transactionHandle(ProceedingJoinPoint joinPoint) throws Throwable {
        MethodSignature methodSignature = (MethodSignature) joinPoint.getSignature();
        Transactional transactional = methodSignature.getMethod().getAnnotation(Transactional.class);

        // Method execution result
        Object result = null;
        // The pending transaction
        Object supSend = null;

        boolean isNew = false;

        // If the transaction type is require require_new
        if(transactional.propagation() == Propagation.REQUIRED || transactional.propagation() == Propagation.REQUIRES_NEW)
        {
            // If the transaction type is require
            if(transactional.propagation() == Propagation.REQUIRED){
                // If there is a transaction, use the original transaction
                 ConnectionHolderUtil.getConnectionHolder(false);
            }
            // If the transaction type is require_new
            else {
                // Suspend the current transaction
                // Create a new connection transaction and bind to the transaction synchronization manager
                // Bind a new connection
                supSend = ConnectionHolderUtil.hangTrasaction();
                // Get a new connection
                isNew = true;
                ConnectionHolderUtil.getConnectionHolder(true);
            }
            try {
                // Execute the intercepting method
                result = joinPoint.proceed();
                // Commit the transaction
                if(!ConnectionHolderUtil.isClose()) {
                    ConnectionHolderUtil.commit();
                }
            }catch (Exception e){
                // Rollback the transaction
                if(!ConnectionHolderUtil.isClose()) {
                    ConnectionHolderUtil.rollback();
                }
            }finally {
                // Close the connection
                if(isNew) {
                    ConnectionHolderUtil.close();
                }
                 // Resume pending transactions
                if(supSend ! =null) { ConnectionHolderUtil.resume(supSend); }}}/ / otherwise
        else {
            log.info("Transaction type not currently supported");
        }
        log.info("Section execution completed");
        returnresult; }}Copy the code

Transaction annotations

@Documented
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface Transactional {

    Propagation propagation(a) default Propagation.REQUIRED;

    Class <? extends Throwable>[] rollBackFor() default {};
}
Copy the code

mode of transmission

public enum Propagation {
    REQUIRED(0),
    SUPPORTS(1),
    MANDATORY(2),
    REQUIRES_NEW(3),
    NOT_SUPPORTED(4),
    NEVER(5),
    NESTED(6);

    private final int value;

    private Propagation(int value) {
        this.value = value;
    }

    public int value(a) {
        return this.value; }}Copy the code

Call the client

@Service
public class BlogService {

    @Autowired
    private BlogDao blogDao;

    @Transactional(propagation = Propagation.REQUIRED)
    public void testMyTransaction(Blog blog){
        blogDao.insertBlog(blog);
        BlogService blogService  = (BlogService) AopContext.currentProxy();
        try {
          blogService.testMyTransaction2(blog);
        }catch(Exception e){ e.printStackTrace(); }}@Transactional(propagation = Propagation.REQUIRES_NEW)
    public void testMyTransaction2(Blog blog){
        blogDao.insertBlog(blog);
        int a = 1/0; }}Copy the code

As shown above, if testMyTransaction2 fails at this point, testMyTransaction will not be affected because they are different transactions.

conclusion

Spring transaction management is also used, I have not been in touch with this for a long time, but I can only use it before, after these days of interview, I found that learning things must be in-depth to understand, can only use can not. ConnectionHolder: ThreadLocal: ThreadLocal: ThreadLocal: ThreadLocal: ThreadLocal: ThreadLocal: ThreadLocal: ThreadLocal: ThreadLocal: ThreadLocal: ThreadLocal: ThreadLocal: ThreadLocal: ThreadLocal: ThreadLocal: ThreadLocal: ThreadLocal: ThreadLocal: ThreadLocal: ThreadLocal: ThreadLocal Last but not least, thank you for browsing and feel free to point out any mistakes.