Seata TM services start with proxied methods annotated with @GlobalTransactional

Start the global transaction

1.1 TM initiates a global transaction request

Entry method: DefaultTransactionManager#begin. This method just builds the GlobalBeginRequest and sends it to seata-server through a synchronous Netty RPC call. Alibaba's encapsulation of Netty RPC is analyzed in a separate topic.

@Override
public String begin(String applicationId, String transactionServiceGroup, String name, int timeout)
    throws TransactionException {
    GlobalBeginRequest request = new GlobalBeginRequest();
    request.setTransactionName(name);
    request.setTimeout(timeout);
    GlobalBeginResponse response = (GlobalBeginResponse) syncCall(request);
    if (response.getResultCode() == ResultCode.Failed) {
        throw new TmTransactionException(TransactionExceptionCode.BeginFailed, response.getMsg());
    }
    return response.getXid();
}
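
The synchronous call used by begin can be pictured as request/response correlation on top of an asynchronous transport. The following is a minimal sketch under that assumption, not Seata's Netty code: SyncCallSketch, its transport callback and onResponse are hypothetical names, and plain strings stand in for the request and response objects.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiConsumer;

// Hypothetical sketch: none of these names come from Seata.
class SyncCallSketch {
    private final AtomicLong nextId = new AtomicLong();
    // Requests that were sent but not answered yet, keyed by message id.
    private final Map<Long, CompletableFuture<String>> pending = new ConcurrentHashMap<>();
    // Stand-in for the asynchronous network write (message id, payload).
    private final BiConsumer<Long, String> transport;

    SyncCallSketch(BiConsumer<Long, String> transport) {
        this.transport = transport;
    }

    // Sends the request and blocks the caller until the matching response arrives.
    String syncCall(String request, long timeoutMillis) {
        long id = nextId.incrementAndGet();
        CompletableFuture<String> future = new CompletableFuture<>();
        pending.put(id, future);
        try {
            transport.accept(id, request);
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            throw new IllegalStateException("no response within " + timeoutMillis + " ms", e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("call failed", e.getCause());
        } finally {
            // Always forget the request, whether it succeeded or not.
            pending.remove(id);
        }
    }

    // Called by the I/O thread when a response with the given id is received.
    void onResponse(long id, String response) {
        CompletableFuture<String> future = pending.get(id);
        if (future != null) {
            future.complete(response);
        }
    }
}
```

The caller only sees a blocking method, while the transport stays free to deliver responses on another thread; a response that arrives after the timeout is simply dropped because its id is no longer pending.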

1.2 The TC processes the request and returns the XID

Entry: io.seata.server.coordinator.DefaultCore#begin

@Override
public String begin(String applicationId, String transactionServiceGroup, String name, int timeout)
    throws TransactionException {
    GlobalSession session = GlobalSession.createGlobalSession(applicationId, transactionServiceGroup, name,
        timeout);
    session.addSessionLifecycleListener(SessionHolder.getRootSessionManager());
    session.begin();
    // transaction start event
    eventBus.post(new GlobalTransactionEvent(session.getTransactionId(), GlobalTransactionEvent.ROLE_TC,
        session.getTransactionName(), session.getBeginTime(), null, session.getStatus()));
    return session.getXid();
}
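
In the begin method above, the root session manager is registered as a lifecycle listener before session.begin() is called. A plausible reading is that the manager learns about the new session through that callback. The sketch below illustrates this listener pattern only; SessionLifecycleSketch and everything inside it are hypothetical and much simpler than Seata's classes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of the listener pattern; not Seata's implementation.
class SessionLifecycleSketch {
    interface LifecycleListener {
        void onBegin(Session session);
    }

    static class Session {
        final String xid;
        String status = "UnKnown";
        private final List<LifecycleListener> listeners = new ArrayList<>();

        Session(String xid) {
            this.xid = xid;
        }

        void addListener(LifecycleListener listener) {
            listeners.add(listener);
        }

        // Changes the status first, then tells every registered listener.
        void begin() {
            status = "Begin";
            for (LifecycleListener listener : listeners) {
                listener.onBegin(this);
            }
        }
    }

    // Stores sessions by XID once it is told that they have begun.
    static class SessionManager implements LifecycleListener {
        private final Map<String, Session> sessions = new ConcurrentHashMap<>();

        @Override
        public void onBegin(Session session) {
            sessions.put(session.xid, session);
        }

        Session find(String xid) {
            return sessions.get(xid);
        }
    }
}
```

With this shape, a session that begins without a registered manager is never stored, which is why the listener has to be added before begin() runs.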

Commit the global transaction

2.1 TM entry method: DefaultTransactionManager#commit builds a GlobalCommitRequest and sends it to seata-server

@Override
public GlobalStatus commit(String xid) throws TransactionException {
    GlobalCommitRequest globalCommit = new GlobalCommitRequest();
    globalCommit.setXid(xid);
    GlobalCommitResponse response = (GlobalCommitResponse) syncCall(globalCommit);
    return response.getGlobalStatus();
}

2.2 The TC processes the global commit request

@Override
public GlobalCommitResponse handle(GlobalCommitRequest request, final RpcContext rpcContext) {
    GlobalCommitResponse response = new GlobalCommitResponse();
    response.setGlobalStatus(GlobalStatus.Committing);
    exceptionHandleTemplate(new AbstractCallback<GlobalCommitRequest, GlobalCommitResponse>() {
        @Override
        public void execute(GlobalCommitRequest request, GlobalCommitResponse response)
            throws TransactionException {
            try {
                doGlobalCommit(request, response, rpcContext); // @1
            } catch (StoreException e) {
                throw new TransactionException(TransactionExceptionCode.FailedStore,
                    String.format("global commit request failed. xid=%s, msg=%s", request.getXid(), e.getMessage()),
                    e);
            }
        }

        @Override
        public void onTransactionException(GlobalCommitRequest request, GlobalCommitResponse response,
                                           TransactionException tex) {
            super.onTransactionException(request, response, tex);
            checkTransactionStatus(request, response);
        }

        @Override
        public void onException(GlobalCommitRequest request, GlobalCommitResponse response, Exception rex) {
            super.onException(request, response, rex);
            checkTransactionStatus(request, response);
        }
    }, request, response);
    return response;
}

Code @1: doGlobalCommit is where the commit is actually performed; the work is done by the commit method shown next

@Override
public GlobalStatus commit(String xid) throws TransactionException {
    GlobalSession globalSession = SessionHolder.findGlobalSession(xid);
    if (globalSession == null) {
        return GlobalStatus.Finished;
    }
    globalSession.addSessionLifecycleListener(SessionHolder.getRootSessionManager());
    // just lock changeStatus
    boolean shouldCommit = SessionHolder.lockAndExecute(globalSession, () -> {
        // Highlight: Firstly, close the session, then no more branch can be registered.
        globalSession.closeAndClean(); // @1
        if (globalSession.getStatus() == GlobalStatus.Begin) {
            if (globalSession.canBeCommittedAsync()) { // @2
                // Asynchronous commit: the TM has initiated the global commit, so it is
                // enough to delete undo_log asynchronously. The
                // io.seata.server.coordinator.DefaultCoordinator#handleAsyncCommitting thread
                // keeps picking up sessions in the AsyncCommitting state and notifies
                // the RMs that they can commit.
                globalSession.asyncCommit(); // @3
                return false;
            } else {
                // @4 Synchronous commit: return true so that the shouldCommit branch
                // below commits the global session synchronously.
                globalSession.changeStatus(GlobalStatus.Committing);
                return true;
            }
        }
        return false;
    });
    if (shouldCommit) {
        boolean success = doGlobalCommit(globalSession, false);
        if (success && !globalSession.getBranchSessions().isEmpty()) {
            globalSession.asyncCommit();
            return GlobalStatus.Committed;
        } else {
            return globalSession.getStatus();
        }
    } else {
        return globalSession.getStatus() == GlobalStatus.AsyncCommitting
            ? GlobalStatus.Committed : globalSession.getStatus();
    }
}

Code @1: release the branch locks and execute SessionLifecycleListener.onClose()

Code @2: check whether an asynchronous commit is possible. XA and TCC do not support it. AT mode does, because all branch transactions have already committed locally, so the global transaction can simply be committed

Code @3: perform the asynchronous commit. Since the TM initiated the global commit, the io.seata.server.coordinator.DefaultCoordinator#handleAsyncCommitting thread picks up the sessions in the AsyncCommitting state and notifies the RMs that they can commit the transaction

public void asyncCommit() throws TransactionException {
    this.addSessionLifecycleListener(SessionHolder.getAsyncCommittingSessionManager()); // @1
    SessionHolder.getAsyncCommittingSessionManager().addGlobalSession(this); // @2
    this.changeStatus(GlobalStatus.AsyncCommitting); // @3
}

Code @1: Add a listener

Code @2: Add the session to the async-committing session manager

Code @3: Set the status to AsyncCommitting; DefaultCoordinator#handleAsyncCommitting will then pick the session up
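
The decision made inside commit, and the status that is finally reported back to the TM, can be summarised in a small sketch. It is a simplification under stated assumptions: CommitDecisionSketch and its enums are hypothetical, the "only AT branches can be committed asynchronously" rule is taken from the notes above, and the synchronous doGlobalCommit path is left out.

```java
import java.util.List;

// Hypothetical sketch of the decision logic; not Seata's classes.
class CommitDecisionSketch {
    enum BranchType { AT, TCC, XA }

    enum Status { BEGIN, COMMITTING, ASYNC_COMMITTING, COMMITTED }

    // Per the notes above: only AT branches may be committed asynchronously.
    static boolean canBeCommittedAsync(List<BranchType> branches) {
        for (BranchType type : branches) {
            if (type != BranchType.AT) {
                return false;
            }
        }
        return true;
    }

    // Status the session moves to when the TC receives the commit request.
    static Status nextStatus(Status current, List<BranchType> branches) {
        if (current != Status.BEGIN) {
            return current; // only a session that has just begun is moved
        }
        return canBeCommittedAsync(branches) ? Status.ASYNC_COMMITTING : Status.COMMITTING;
    }

    // An asynchronously committing session is reported to the TM as committed.
    static Status statusReportedToTm(Status sessionStatus) {
        return sessionStatus == Status.ASYNC_COMMITTING ? Status.COMMITTED : sessionStatus;
    }
}
```

The point of the last method is that the TM does not wait for phase two: once every branch is known to be committed locally, the remaining cleanup can happen in the background.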

After the main method of the Server class constructs the DefaultCoordinator, it starts several scheduled tasks, including the one that handles asynchronous commits

asyncCommitting.scheduleAtFixedRate(() -> {
    try {
        handleAsyncCommitting();
    } catch (Exception e) {
        LOGGER.info("Exception async committing ... ", e);
    }
}, 0, ASYNC_COMMITTING_RETRY_PERIOD, TimeUnit.MILLISECONDS);

protected void handleAsyncCommitting() {
    Collection<GlobalSession> asyncCommittingSessions = SessionHolder.getAsyncCommittingSessionManager()
        .allSessions();
    if (CollectionUtils.isEmpty(asyncCommittingSessions)) {
        return;
    }
    for (GlobalSession asyncCommittingSession : asyncCommittingSessions) {
        try {
            // Instruction reordering in DefaultCore#asyncCommit may cause this situation
            if (GlobalStatus.AsyncCommitting != asyncCommittingSession.getStatus()) { // @1
                continue;
            }
            asyncCommittingSession.addSessionLifecycleListener(SessionHolder.getRootSessionManager());
            core.doGlobalCommit(asyncCommittingSession, true); // @2
        } catch (TransactionException ex) {
            LOGGER.error("Failed to async committing [{}] {} {}", asyncCommittingSession.getXid(), ex.getCode(),
                ex.getMessage(), ex);
        }
    }
}

The above code is fairly straightforward

Code @1: Only process GlobalSessions whose status is AsyncCommitting

Code @2: Commit each session through DefaultCore#doGlobalCommit, which iterates over every branch of the global session and sends the RPC notification via io.seata.server.coordinator.AbstractCore#branchCommit
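
The scheduled polling described above can be reduced to the following sketch. It is an illustration only: AsyncCommitPollerSketch, its in-memory session map and the notified list are hypothetical stand-ins for the session manager and the branch RPC. The try/catch around the task body matters because scheduleAtFixedRate suppresses all later runs once one execution throws.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of a status-filtered polling task; not Seata's code.
class AsyncCommitPollerSketch {
    enum Status { ASYNC_COMMITTING, COMMITTED, ROLLBACKING }

    // XID -> status; stands in for the async-committing session manager.
    final Map<String, Status> sessions = new ConcurrentHashMap<>();
    // XIDs whose branches were told to commit; stands in for the RPC.
    final List<String> notified = new CopyOnWriteArrayList<>();

    // One polling round: skip every session that is not AsyncCommitting.
    void handleAsyncCommitting() {
        for (String xid : new ArrayList<>(sessions.keySet())) {
            if (sessions.get(xid) != Status.ASYNC_COMMITTING) {
                continue;
            }
            notified.add(xid);
            sessions.put(xid, Status.COMMITTED);
        }
    }

    // Runs the polling round at a fixed rate on a single background thread.
    ScheduledExecutorService start(long periodMillis) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        scheduler.scheduleAtFixedRate(() -> {
            try {
                handleAsyncCommitting();
            } catch (RuntimeException e) {
                // Swallow and log so that the next round still runs.
                System.err.println("async committing round failed: " + e);
            }
        }, 0, periodMillis, TimeUnit.MILLISECONDS);
        return scheduler;
    }
}
```

A caller would keep the returned scheduler and call shutdown() on it when the server stops.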

2.3 RM Performs Phase-2 Commit

As shown in the figure above, once the RM receives the phase-two commit request, it only needs to do one thing: delete the undo log

@Override
public BranchStatus branchCommit(BranchType branchType, String xid, long branchId, String resourceId,
                                 String applicationData) throws TransactionException {
    if (!ASYNC_COMMIT_BUFFER.offer(new Phase2Context(branchType, xid, branchId, resourceId, applicationData))) { // @1
        LOGGER.warn("Async commit buffer is FULL. Rejected branch [{}/{}] will be handled by housekeeping later.",
            branchId, xid);
    }
    return BranchStatus.PhaseTwo_Committed;
}

Code @1: this is a producer-consumer model. The request is offered to a queue and processed asynchronously by a scheduled task in io.seata.rm.datasource.AsyncWorker#doBranchCommits. Because that method is long, only its core part is shown here

private void doBranchCommits() {
    if (ASYNC_COMMIT_BUFFER.isEmpty()) {
        return;
    }
    Map<String, List<Phase2Context>> mappedContexts = new HashMap<>(DEFAULT_RESOURCE_SIZE);
    while (!ASYNC_COMMIT_BUFFER.isEmpty()) { // @1
        Phase2Context commitContext = ASYNC_COMMIT_BUFFER.poll();
        List<Phase2Context> contextsGroupedByResourceId =
            mappedContexts.computeIfAbsent(commitContext.resourceId, k -> new ArrayList<>());
        contextsGroupedByResourceId.add(commitContext);
    }
    for (Map.Entry<String, List<Phase2Context>> entry : mappedContexts.entrySet()) { // @2
        List<Phase2Context> contextsGroupedByResourceId = entry.getValue();
        Set<String> xids = new LinkedHashSet<>(UNDOLOG_DELETE_LIMIT_SIZE);
        Set<Long> branchIds = new LinkedHashSet<>(UNDOLOG_DELETE_LIMIT_SIZE);
        for (Phase2Context commitContext : contextsGroupedByResourceId) {
            xids.add(commitContext.xid);
            branchIds.add(commitContext.branchId);
            int maxSize = Math.max(xids.size(), branchIds.size());
            if (maxSize == UNDOLOG_DELETE_LIMIT_SIZE) {
                UndoLogManagerFactory.getUndoLogManager(dataSourceProxy.getDbType())
                    .batchDeleteUndoLog(xids, branchIds, conn); // @3
            }
        }
    }
    // ... (the rest of the method is omitted in this excerpt)
}

Code @1: Take all the data out of the queue and group it by resourceId (drainTo could be used instead)

Code @2: Iterate over the grouped data

Code @3: Simply delete the undo log records in a batch
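
The three steps above (buffer, group by resourceId, delete in batches) can be tried out with the sketch below. It is not the AsyncWorker code: AsyncWorkerSketch, Context and deletedBatches are hypothetical, the batch list stands in for the SQL delete, drainTo is used as suggested in the note on @1, and the final partial batch is flushed immediately, which the excerpt above does not show.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical producer-consumer sketch; not Seata's AsyncWorker.
class AsyncWorkerSketch {
    static final class Context {
        final String resourceId;
        final String xid;
        final long branchId;

        Context(String resourceId, String xid, long branchId) {
            this.resourceId = resourceId;
            this.xid = xid;
            this.branchId = branchId;
        }
    }

    private final BlockingQueue<Context> buffer;
    private final int batchSize;
    // Each inner list represents one batched undo log delete.
    final List<List<Context>> deletedBatches = new ArrayList<>();

    AsyncWorkerSketch(int capacity, int batchSize) {
        this.buffer = new LinkedBlockingQueue<>(capacity);
        this.batchSize = batchSize;
    }

    // Producer side: never blocks, returns false when the buffer is full.
    boolean branchCommit(Context context) {
        return buffer.offer(context);
    }

    // Consumer side: drain the buffer, group by resource, delete in batches.
    void doBranchCommits() {
        List<Context> drained = new ArrayList<>();
        buffer.drainTo(drained);
        Map<String, List<Context>> byResource = new LinkedHashMap<>();
        for (Context context : drained) {
            byResource.computeIfAbsent(context.resourceId, k -> new ArrayList<>()).add(context);
        }
        for (List<Context> group : byResource.values()) {
            for (int from = 0; from < group.size(); from += batchSize) {
                int to = Math.min(from + batchSize, group.size());
                deletedBatches.add(new ArrayList<>(group.subList(from, to)));
            }
        }
    }
}
```

Grouping by resourceId keeps each batch on a single data source, and the non-blocking offer means a full buffer never delays the reply to the TC.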

At this point the two-phase commit is complete