
>>>> 😜😜😜 Making: 👉 github.com/black-ant

1. Preface

This article looks at what beginTransaction does when a transaction starts.

2. Process overview

The process can be divided into several parts:

  • Properties: TransactionInfo + GlobalLockConfig
  • Transaction part: beginTransaction + commitTransaction
  • Logical part: Execute + TransactionalExecutor

2.1 TransactionInfo details

> PS: M52_02_01 What does the TransactionInfo object contain?

```java
// C53- TransactionInfo
//     F53_01- int timeOut: timeout
//     F53_02- String name: transaction name
//     F53_03- Set<RollbackRule> rollbackRules: rollback rules
//     F53_04- Propagation propagation: propagation mode
//     F53_05- int lockRetryInternal: lock retry interval
//     F53_06- int lockRetryTimes: lock retry times
//     M53_01- rollbackOn
//     M53_02- getPropagation()
public final class TransactionInfo implements Serializable {

    public boolean rollbackOn(Throwable ex) {

        RollbackRule winner = null;
        int deepest = Integer.MAX_VALUE;

        if (CollectionUtils.isNotEmpty(rollbackRules)) {
            winner = NoRollbackRule.DEFAULT_NO_ROLLBACK_RULE;
            for (RollbackRule rule : this.rollbackRules) {
                int depth = rule.getDepth(ex);
                // The rule closest to the thrown exception wins
                if (depth >= 0 && depth < deepest) {
                    deepest = depth;
                    winner = rule;
                }
            }
        }
        return !(winner instanceof NoRollbackRule);
    }

    public Propagation getPropagation() {
        if (this.propagation != null) {
            return this.propagation;
        }
        // default propagation
        return Propagation.REQUIRED;
    }
}
```
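To make the winner selection in rollbackOn more concrete, here is a minimal, self-contained sketch. The `Rule` class and its `depth` method are hypothetical stand-ins for RollbackRule and NoRollbackRule, and the sketch assumes that the depth is the number of superclass levels between the thrown exception and the type named by the rule (-1 when there is no match). It is an illustration of the selection logic only, not the library code.

```java
import java.util.List;

class RollbackRuleSketch {

    // Hypothetical stand-in for RollbackRule / NoRollbackRule.
    static final class Rule {
        final Class<? extends Throwable> type;
        final boolean rollback; // false plays the role of NoRollbackRule

        Rule(Class<? extends Throwable> type, boolean rollback) {
            this.type = type;
            this.rollback = rollback;
        }

        // Superclass levels between ex and the rule's type, or -1 if ex is not an instance of it.
        int depth(Throwable ex) {
            int depth = 0;
            for (Class<?> c = ex.getClass(); c != null; c = c.getSuperclass(), depth++) {
                if (c.equals(type)) {
                    return depth;
                }
            }
            return -1;
        }
    }

    // Same shape as rollbackOn: the closest matching rule wins.
    static boolean rollbackOn(List<Rule> rules, Throwable ex) {
        if (rules.isEmpty()) {
            return true; // no rules configured: roll back
        }
        boolean rollback = false; // the default winner is the "no rollback" rule
        int deepest = Integer.MAX_VALUE;
        for (Rule rule : rules) {
            int depth = rule.depth(ex);
            if (depth >= 0 && depth < deepest) {
                deepest = depth;
                rollback = rule.rollback;
            }
        }
        return rollback;
    }
}
```

With a rollback rule on RuntimeException and a no-rollback rule on IllegalArgumentException, an IllegalArgumentException matches both rules, but the no-rollback rule is closer (depth 0 against depth 1), so it wins.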

What is the role of Propagation?

Propagation is an enumeration that indicates the propagation mode of a transaction. The modes are as follows:

  • REQUIRED: If a transaction exists, execute within it; otherwise execute with a new transaction
  • REQUIRES_NEW: If a transaction exists, suspend it and execute the business with a new transaction
  • NOT_SUPPORTED: If a transaction exists, suspend it and execute the business without a transaction
  • SUPPORTS: If no transaction exists, execute the business without a global transaction; otherwise execute within the current transaction
  • NEVER: If a transaction exists, throw an exception; otherwise execute the business without a transaction
  • MANDATORY: If no transaction exists, throw an exception; otherwise execute the business within the current transaction
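The six modes above can be read as a small decision table. The following sketch spells that table out; the class name, the `decide` method and the returned description strings are all hypothetical, chosen only to illustrate the rules listed above.

```java
class PropagationSketch {

    enum Propagation { REQUIRED, REQUIRES_NEW, NOT_SUPPORTED, SUPPORTS, NEVER, MANDATORY }

    // Describes what happens for each mode, given whether a transaction already exists.
    static String decide(Propagation propagation, boolean txExists) {
        switch (propagation) {
            case REQUIRED:
                return txExists ? "join current" : "begin new";
            case REQUIRES_NEW:
                return txExists ? "suspend current, begin new" : "begin new";
            case NOT_SUPPORTED:
                return txExists ? "suspend current, run without tx" : "run without tx";
            case SUPPORTS:
                return txExists ? "join current" : "run without tx";
            case NEVER:
                if (txExists) {
                    throw new IllegalStateException("existing transaction found");
                }
                return "run without tx";
            case MANDATORY:
                if (!txExists) {
                    throw new IllegalStateException("no existing transaction");
                }
                return "join current";
            default:
                throw new IllegalArgumentException("unknown propagation: " + propagation);
        }
    }
}
```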

2.2 GlobalLockConfig details

Object properties:

// Let's review the objects we've seen before
public class GlobalLockConfig {
    
    // Lock retry interval
    private int lockRetryInternal;
    // Lock retry times
    private int lockRetryTimes;
}

Logic processing:


// Let's look at the configuration again
private GlobalLockConfig replaceGlobalLockConfig(TransactionInfo info) {

    GlobalLockConfig myConfig = new GlobalLockConfig();
    myConfig.setLockRetryInternal(info.getLockRetryInternal());
    myConfig.setLockRetryTimes(info.getLockRetryTimes());
    // Store the new config and hand back the one that was active before
    return GlobalLockConfigHolder.setAndReturnPrevious(myConfig);
}


public class GlobalLockConfigHolder {

    // Key 1: thread storage
    private static ThreadLocal<GlobalLockConfig> holder = new ThreadLocal<>();

    // Key 2: the interesting part: it returns the previous config and sets the current one
    public static GlobalLockConfig setAndReturnPrevious(GlobalLockConfig config) {
        GlobalLockConfig previous = holder.get();
        holder.set(config);
        return previous;
    }
}

[Pro]: Why does key point 2 return the previous config?

The previous GlobalLockConfig is obtained primarily so that it can be restored (rolled back to) once the transaction finishes:

GlobalLockConfig previousConfig = replaceGlobalLockConfig(txInfo);
try {
    // ... the transaction
} finally {
    resumeGlobalLockConfig(previousConfig);
    triggerAfterCompletion();
    cleanUp();
}


So the question is: we have already moved on to the next operation, so why fetch the previous global lock configuration?

When the first transaction holds the global lock, other local transactions must also acquire the global lock before they can execute, so the next transaction needs to take the previous global lock configuration into account.

If the previous global lock has not been released, the next transaction will not actually acquire a global lock.

private void resumeGlobalLockConfig(GlobalLockConfig config) {
    if (config != null) {
        GlobalLockConfigHolder.setAndReturnPrevious(config);
    } else {
        GlobalLockConfigHolder.remove();
    }
}

Because the configuration is stored in a ThreadLocal and read from the current thread, the global lock configuration is unique within a thread.
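The set-then-restore pattern described in this section can be sketched in isolation. In the following example, `Config`, `runWith` and `current` are hypothetical names introduced for illustration; only the idea of "store the new value in a ThreadLocal, remember the previous one, restore it in finally" is taken from the code above.

```java
class LockConfigHolderSketch {

    // Hypothetical stand-in for GlobalLockConfig.
    static final class Config {
        final int retryInterval;
        final int retryTimes;

        Config(int retryInterval, int retryTimes) {
            this.retryInterval = retryInterval;
            this.retryTimes = retryTimes;
        }
    }

    // One config per thread.
    private static final ThreadLocal<Config> HOLDER = new ThreadLocal<>();

    static Config setAndReturnPrevious(Config config) {
        Config previous = HOLDER.get();
        HOLDER.set(config);
        return previous;
    }

    static Config current() {
        return HOLDER.get();
    }

    // Puts the previous config back, or clears the slot if there was none.
    static void resume(Config previous) {
        if (previous != null) {
            HOLDER.set(previous);
        } else {
            HOLDER.remove();
        }
    }

    // Runs body with the given config, then restores whatever was active before.
    static void runWith(Config config, Runnable body) {
        Config previous = setAndReturnPrevious(config);
        try {
            body.run();
        } finally {
            resume(previous);
        }
    }
}
```

Nesting two `runWith` calls on the same thread shows why the previous value matters: when the inner call finishes, the outer configuration becomes active again instead of being lost.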

TODO: We’ll review this later when we look at global locking

2.3 beginTransaction: starting the transaction

Having looked at the configuration information above, let’s take a look at transaction initiation

// There are three main steps >>>
private void beginTransaction(TransactionInfo txInfo, GlobalTransaction tx) throws TransactionalExecutor.ExecutionException {
    try {
        triggerBeforeBegin();
        tx.begin(txInfo.getTimeOut(), txInfo.getName());
        triggerAfterBegin();
    } catch (TransactionException txe) {
        throw new TransactionalExecutor.ExecutionException(tx, txe, TransactionalExecutor.Code.BeginFailure);
    }
}

2.3.1 triggerBeforeBegin()

// Start by looking at the main logic of the triggers
M51_03- triggerBeforeBegin
    FOR- loop over all TransactionHooks: getCurrentHooks
        - hook.beforeBegin()
M51_04- triggerAfterBegin
    FOR- loop over all TransactionHooks: getCurrentHooks
        - hook.afterBegin()

// The core of both is to call the corresponding TransactionHook method, which raises two questions:
// - What is TransactionHook?
// - How are TransactionHooks managed?

// [Pro1]: What is TransactionHook?
// TransactionHook is an interface that lets extra processing be attached at fixed slots in the flow.
// Its main implementation class is TransactionHookAdapter.
public interface TransactionHook {

    /** before tx begin */
    void beforeBegin();

    /** after tx begin */
    void afterBegin();

    /** before tx commit */
    void beforeCommit();

    /** after tx commit */
    void afterCommit();

    /** before tx rollback */
    void beforeRollback();

    /** after tx rollback */
    void afterRollback();

    /** after tx all Completed */
    void afterCompletion();
}

// Configure a hook manually.
public void testTransactionCommitHook() throws Throwable {
    TransactionHook transactionHook = Mockito.mock(TransactionHook.class);

    TransactionHookManager.registerHook(transactionHook);
    TransactionalTemplate template = new TransactionalTemplate();
    template.execute(transactionalExecutor);
}



// [Pro2]: How are TransactionHooks managed?
private List<TransactionHook> getCurrentHooks() {
    // Manage transactionHooks through TransactionHookManager
    return TransactionHookManager.getHooks();
}

public final class TransactionHookManager {
    // Similarly, it is managed internally by a ThreadLocal
    private static final ThreadLocal<List<TransactionHook>> LOCAL_HOOKS = new ThreadLocal<>();
}
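As a rough illustration of how a per-thread hook list and the before/after triggers fit together, here is a minimal sketch. The `Hook` interface is a reduced, hypothetical version of TransactionHook with only the two begin callbacks, and `registerHook`, `getHooks`, `clear` and `begin` are written for this example; the real manager may differ in detail.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

class HookSketch {

    // Reduced, hypothetical hook interface: only the two begin callbacks.
    interface Hook {
        void beforeBegin();
        void afterBegin();
    }

    // Per-thread hook list, following the ThreadLocal idea described above.
    private static final ThreadLocal<List<Hook>> LOCAL_HOOKS = new ThreadLocal<>();

    static void registerHook(Hook hook) {
        List<Hook> hooks = LOCAL_HOOKS.get();
        if (hooks == null) {
            hooks = new ArrayList<>();
            LOCAL_HOOKS.set(hooks);
        }
        hooks.add(hook);
    }

    static List<Hook> getHooks() {
        List<Hook> hooks = LOCAL_HOOKS.get();
        if (hooks == null) {
            return Collections.<Hook>emptyList();
        }
        return Collections.unmodifiableList(hooks);
    }

    static void clear() {
        LOCAL_HOOKS.remove();
    }

    // Same three-step shape as beginTransaction: before hooks, begin, after hooks.
    static void begin(Runnable doBegin) {
        for (Hook hook : getHooks()) {
            hook.beforeBegin();
        }
        doBegin.run();
        for (Hook hook : getHooks()) {
            hook.afterBegin();
        }
    }
}
```

Because the list lives in a ThreadLocal, a hook registered on one thread is invisible to transactions started on another, and it should be cleared once the transaction completes.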






2.3.2 DefaultGlobalTransaction#begin processing

Now for the core of the three steps: tx.begin(txInfo.getTimeOut(), txInfo.getName())

C52- DefaultGlobalTransaction
    M52_01- begin(int timeout, String name)
        - RootContext.getXID(): get currentXid
        - transactionManager.begin(null, null, name, timeout): the transactionManager starts the transaction
        - GlobalStatus.Begin: update the status
        - RootContext.bind(xid): bind the transaction ID
        
public void begin(int timeout, String name) throws TransactionException {
    if (role != GlobalTransactionRole.Launcher) {
        assertXIDNotNull();
        return;
    }
    assertXIDNull();
    // Step 1: Obtain the ID of the current transaction
    String currentXid = RootContext.getXID();
    if (currentXid != null) {
        throw new IllegalStateException("Global transaction already exists," +
                " can't begin a new global transaction, currentXid = " + currentXid);
    }
    // Step 2: call transactionManager.begin, then record the status and bind the XID
    xid = transactionManager.begin(null, null, name, timeout);
    status = GlobalStatus.Begin;
    RootContext.bind(xid);
}

PS: What is RootContext?

RootContext is the root context; it manages the XID bound to the current thread.

C- RootContext
    F- ContextCore CONTEXT_HOLDER = ContextCoreLoader.load();
    F- BranchType DEFAULT_BRANCH_TYPE;
    M- bind(@Nonnull String xid)
        - CONTEXT_HOLDER.put(KEY_XID, xid)
        ?- CONTEXT_HOLDER here is a FastThreadLocalContextCore

C- FastThreadLocalContextCore
private FastThreadLocal<Map<String, Object>> fastThreadLocal = new FastThreadLocal<Map<String, Object>>() {
    @Override
    protected Map<String, Object> initialValue() {
        return new HashMap<>();
    }
};
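The idea of a per-thread context map holding the XID can be sketched with the JDK alone. This example swaps in a plain `java.lang.ThreadLocal` where the code above uses FastThreadLocal, and the class name, the `KEY_XID` value and the `unbind` method are assumptions made for illustration.

```java
import java.util.HashMap;
import java.util.Map;

class XidContextSketch {

    // Hypothetical key name; the real constant value is not shown in this article.
    static final String KEY_XID = "TX_XID";

    // Each thread lazily gets its own map, like the initialValue() override above.
    private static final ThreadLocal<Map<String, Object>> CONTEXT =
            ThreadLocal.withInitial(HashMap::new);

    // Binds the XID to the current thread.
    static void bind(String xid) {
        CONTEXT.get().put(KEY_XID, xid);
    }

    // Returns the XID bound to the current thread, or null if none is bound.
    static String getXID() {
        return (String) CONTEXT.get().get(KEY_XID);
    }

    // Removes the binding and returns the XID that was bound, or null.
    static String unbind() {
        return (String) CONTEXT.get().remove(KEY_XID);
    }
}
```

This is why begin can check `RootContext.getXID()` to detect an already running global transaction: the lookup only ever sees what the current thread bound earlier.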

2.3.3 TransactionManager details

// TransactionManager is finally used for management
C53- DefaultTransactionManager
    M53_01- begin(String applicationId, String transactionServiceGroup, String name, int timeout)
        1- Build a new GlobalBeginRequest
        2- setTransactionName + setTimeout
        3- Call syncCall to start the transaction and receive the result as a GlobalBeginResponse

public String begin(String applicationId, String transactionServiceGroup, String name, int timeout)
    throws TransactionException {
    // Step 1: Build the Request
    GlobalBeginRequest request = new GlobalBeginRequest();
    request.setTransactionName(name);
    request.setTimeout(timeout);
    // Step 2: Initiate a request
    GlobalBeginResponse response = (GlobalBeginResponse) syncCall(request);
    if (response.getResultCode() == ResultCode.Failed) {
        throw new TmTransactionException(TransactionExceptionCode.BeginFailed, response.getMsg());
    }
    // Step 3: Get Response
    return response.getXid();
}

    

GlobalBeginRequest and GlobalBeginResponse details

2.3.4 Remote call

The remote call is divided into two steps:

  • Step 1: syncCall, the main logic that initiates the remote call
  • Step 2: sendSyncRequest(Object msg), which actually sends the request

Step 1: syncCall, the main logic that initiates the remote call

private AbstractTransactionResponse syncCall(AbstractTransactionRequest request) throws TransactionException {
    try {
        // Make a Netty remote call via TmNettyRemotingClient
        return (AbstractTransactionResponse) TmNettyRemotingClient.getInstance().sendSyncRequest(request);
    } catch (TimeoutException toe) {
        throw new TmTransactionException(TransactionExceptionCode.IO, "RPC timeout", toe);
    }
}

Step 2: sendSyncRequest(Object msg), which actually sends the request

public Object sendSyncRequest(Object msg) throws TimeoutException {
    String serverAddress = loadBalance(getTransactionServiceGroup(), msg);
    int timeoutMillis = NettyClientConfig.getRpcRequestTimeout();
    RpcMessage rpcMessage = buildRequestMessage(msg, ProtocolConstants.MSGTYPE_RESQUEST_SYNC);

    // send messages in batches to basketMap
    if (NettyClientConfig.isEnableClientBatchSendRequest()) {

        // A batched message is still a synchronous request, so create a MessageFuture and put it into futures
        MessageFuture messageFuture = new MessageFuture();
        messageFuture.setRequestMessage(rpcMessage);
        messageFuture.setTimeout(timeoutMillis);
        futures.put(rpcMessage.getId(), messageFuture);

        // put the message into basketMap
        BlockingQueue<RpcMessage> basket = CollectionUtils.computeIfAbsent(basketMap, serverAddress,
            key -> new LinkedBlockingQueue<>());
        basket.offer(rpcMessage);
        if (!isSending) {
            synchronized (mergeLock) {
                mergeLock.notifyAll();
            }
        }

        try {
            // Block until the response arrives or the timeout expires
            return messageFuture.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (Exception exx) {
            if (exx instanceof TimeoutException) {
                throw (TimeoutException) exx;
            } else {
                throw new RuntimeException(exx);
            }
        }
    } else {
        Channel channel = clientChannelManager.acquireChannel(serverAddress);
        return super.sendSync(channel, rpcMessage, timeoutMillis);
    }
}
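The batch branch above turns an asynchronous send into a synchronous call: it registers a future under the message ID, hands the message off, and then blocks on the future. A minimal sketch of that pattern follows, using `CompletableFuture` and `ConcurrentHashMap` from the JDK in place of MessageFuture and the client's own maps. The `transport` callback and the `onResponse` method are hypothetical and stand in for the network layer.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.BiConsumer;

class SyncOverAsyncSketch {

    // Pending requests keyed by message id.
    private final ConcurrentHashMap<Integer, CompletableFuture<Object>> futures =
            new ConcurrentHashMap<>();

    // Hypothetical transport: receives (id, message); the reply comes back via onResponse.
    private final BiConsumer<Integer, Object> transport;

    SyncOverAsyncSketch(BiConsumer<Integer, Object> transport) {
        this.transport = transport;
    }

    // Registers a future, sends the message, then blocks until a reply or the timeout.
    Object sendSync(int id, Object msg, long timeoutMillis) throws TimeoutException {
        CompletableFuture<Object> future = new CompletableFuture<>();
        futures.put(id, future);
        try {
            transport.accept(id, msg);
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        } catch (ExecutionException e) {
            throw new RuntimeException(e.getCause());
        } finally {
            futures.remove(id); // never leave a stale entry behind
        }
    }

    // Called by the receiving side when the response for the given id arrives.
    void onResponse(int id, Object response) {
        CompletableFuture<Object> future = futures.get(id);
        if (future != null) {
            future.complete(response);
        }
    }
}
```

If no response is delivered for an ID, `sendSync` surfaces a `TimeoutException`, which matches how syncCall converts a timeout into a TmTransactionException.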

2.4 commitTransaction: committing the transaction

private void commitTransaction(GlobalTransaction tx) throws TransactionalExecutor.ExecutionException {
    try {
        triggerBeforeCommit();
        tx.commit();
        triggerAfterCommit();
    } catch (TransactionException txe) {
        // 4.1 Failed to commit
        throw new TransactionalExecutor.ExecutionException(tx, txe, TransactionalExecutor.Code.CommitFailure);
    }
}

// With logging removed, the core call is transactionManager.commit(xid)
public void commit() throws TransactionException {
    if (role == GlobalTransactionRole.Participant) {
        return;
    }
    assertXIDNotNull();
    int retry = COMMIT_RETRY_COUNT <= 0 ? DEFAULT_TM_COMMIT_RETRY_COUNT : COMMIT_RETRY_COUNT;
    try {
        while (retry > 0) {
            try {
                status = transactionManager.commit(xid);
                break;
            } catch (Throwable ex) {
                retry--;
                if (retry == 0) {
                    throw new TransactionException("Failed to report global commit", ex);
                }
            }
        }
    } finally {
        if (xid.equals(RootContext.getXID())) {
            suspend();
        }
    }
}

@Override
public GlobalStatus commit(String xid) throws TransactionException {
    GlobalCommitRequest globalCommit = new GlobalCommitRequest();
    globalCommit.setXid(xid);
    // Commit also goes through syncCall
    GlobalCommitResponse response = (GlobalCommitResponse) syncCall(globalCommit);
    return response.getGlobalStatus();
}
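The retry loop inside commit() is a bounded retry: try up to N times, and only fail once the last attempt has failed. The following sketch isolates that loop. `callWithRetry`, `DEFAULT_ATTEMPTS` and the use of `IllegalStateException` are hypothetical choices for this example and are not taken from the library.

```java
import java.util.function.Supplier;

class RetrySketch {

    // Hypothetical fallback used when the configured attempt count is not positive.
    static final int DEFAULT_ATTEMPTS = 1;

    // Calls action until it succeeds or the attempts are used up.
    static <T> T callWithRetry(Supplier<T> action, int configuredAttempts) {
        int retry = configuredAttempts <= 0 ? DEFAULT_ATTEMPTS : configuredAttempts;
        while (true) {
            try {
                return action.get();
            } catch (RuntimeException ex) {
                retry--;
                if (retry == 0) {
                    // Last attempt failed: give up and keep the cause.
                    throw new IllegalStateException("Failed after retries", ex);
                }
            }
        }
    }
}
```

A call that fails twice and then succeeds returns normally when three attempts are allowed, but fails when only two are allowed.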


2.5 The execute process

rs = business.execute();

As we know from the previous article, business is an anonymous TransactionalExecutor object built in the interceptor (GlobalTransactionalInterceptor).

C52- TransactionalExecutor
    M52_01- execute
    M52_02- getTransactionInfo: obtain the TransactionInfo object -> PS: M52_02_01

// Step 1: Initiate the call
C- GlobalTransactionalInterceptor
return transactionalTemplate.execute(new TransactionalExecutor() {
    @Override
    public Object execute() throws Throwable {
        // Core statement: proceed with the proxied method
        return methodInvocation.proceed();
    }
});

// Step 2: AOP interception
As you can see, this is consistent with the usual AOP logic: the method proxy is ultimately implemented through CglibAopProxy.
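Putting the pieces together, the template wraps the business callback between begin and commit. The sketch below shows that shape under simplifying assumptions: the `Tx` interface is hypothetical, and rolling back on any RuntimeException is a simplification of this example, whereas the real flow consults the rollback rules in TransactionInfo.

```java
import java.util.function.Supplier;

class TemplateSketch {

    // Hypothetical minimal transaction handle.
    interface Tx {
        void begin();
        void commit();
        void rollback();
    }

    // begin -> business -> commit, with rollback when the business callback fails.
    static <T> T execute(Tx tx, Supplier<T> business) {
        tx.begin();
        T result;
        try {
            result = business.get();
        } catch (RuntimeException ex) {
            // Simplification: every RuntimeException triggers a rollback.
            tx.rollback();
            throw ex;
        }
        tx.commit();
        return result;
    }
}
```

The business callback plays the role of the anonymous TransactionalExecutor: the template does not know what the method does, it only decides when the transaction begins and how it ends.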

The methodInvocation details

Conclusion

Rollback is handled on the server side of the transaction.