We continue the previous TCC transaction model, today Xiao Ge “play” Seata TCC model core design principle code landing scheme, whether it is tuning or interview is very helpful.

Seata TCC model implementation architecture diagram

The core code of TCC transaction model architecture in the figure above is described as follows (the following code corresponds to the structure in the figure) :

TM core architecture code

TMClient client implementation

public final class TmRpcClient extends AbstractRpcRemotingClient {
 
    /** * Get transaction management client instance */
    public static TmRpcClient getInstance(String applicationId, String transactionServiceGroup) {
        TmRpcClient tmRpcClient = getInstance();
        // Set the application number
        tmRpcClient.setApplicationId(applicationId);
        // Set the TC cluster key
        tmRpcClient.setTransactionServiceGroup(transactionServiceGroup);
        return tmRpcClient;
    }

    /** * Get transaction management client instance */
    public static TmRpcClient getInstance(a) {
        if (null == instance) {
            synchronized (TmRpcClient.class) {
                if (null == instance) {
                    NettyClientConfig nettyClientConfig = new NettyClientConfig();
                    final ThreadPoolExecutor messageExecutor = new ThreadPoolExecutor(
                        nettyClientConfig.getClientWorkerThreads(), nettyClientConfig.getClientWorkerThreads(),
                        KEEP_ALIVE_TIME, TimeUnit.SECONDS,
                        new LinkedBlockingQueue<>(MAX_QUEUE_SIZE),
                        new NamedThreadFactory(nettyClientConfig.getTmDispatchThreadPrefix(),
                            nettyClientConfig.getClientWorkerThreads()),
                        RejectedPolicies.runsOldestTaskPolicy());
                    instance = new TmRpcClient(nettyClientConfig, null, messageExecutor); }}}returninstance; }}Copy the code

TransactionManager TransactionManager contract

public interface TransactionManager {

    /** * start a new global transaction */
    String begin(String applicationId, String transactionServiceGroup, String name, int timeout)
        throws TransactionException;

    /** * Commit global transaction */
    GlobalStatus commit(String xid) throws TransactionException;

    /** * rollback global transaction */
    GlobalStatus rollback(String xid) throws TransactionException;

    /** * gets the specified global transaction state */
    GlobalStatus getStatus(String xid) throws TransactionException;

    /** * Global transaction report */
    GlobalStatus globalReport(String xid, GlobalStatus globalStatus) throws TransactionException;
}

Copy the code

DefaultTransactionManager manager to achieve things

Pseudocode that provides core implementation points

public class DefaultTransactionManager implements TransactionManager {

    // Start global things
    public String begin(String applicationId, String transactionServiceGroup, String name, int timeout)
        throws TransactionException {
        // Build the global transaction request
        GlobalBeginRequest request = new GlobalBeginRequest();
        request.setTransactionName(name);
        request.setTimeout(timeout);
        // Synchronous send global transaction enabled
        GlobalBeginResponse response = (GlobalBeginResponse)syncCall(request);
        if (response.getResultCode() == ResultCode.Failed) {
            throw new TmTransactionException(TransactionExceptionCode.BeginFailed, response.getMsg());
        }
        return response.getXid();
    }
  
    private AbstractTransactionResponse syncCall(AbstractTransactionRequest request) throws TransactionException {
        try {
            // Send protocol requests to the TC through the client
            return (AbstractTransactionResponse)TmRpcClient.getInstance().sendMsgWithResponse(request);
        } catch (TimeoutException toe) {
            throw new TmTransactionException(TransactionExceptionCode.IO, "RPC timeout", toe); }}}Copy the code

GlobalTransaction GlobalTransaction contract

public interface GlobalTransaction {

    /** * start global transaction */
    void begin(a) throws TransactionException;

    /** * start global transaction, specify timeout */
    void begin(int timeout) throws TransactionException;

    /** * starts the global transaction, specifying the transaction name */
    void begin(int timeout, String name) throws TransactionException;

    /** * Global transaction commit */
    void commit(a) throws TransactionException;

    /** * Global transaction rollback */
    void rollback(a) throws TransactionException;

    /** * get global transaction status */
    GlobalStatus getStatus(a) throws TransactionException;

    /** * global transaction number */
    String getXid(a);

    /** * Reports global transaction status */
    void globalReport(GlobalStatus globalStatus) throws TransactionException;

}
Copy the code

DefaultGlobalTransaction Global transaction implementation

public class DefaultGlobalTransaction implements GlobalTransaction {

    // Start the global transaction
    public void begin(a) throws TransactionException {
        begin(DEFAULT_GLOBAL_TX_TIMEOUT);
    }

    // Start the global transaction
    public void begin(int timeout) throws TransactionException {
        begin(timeout, DEFAULT_GLOBAL_TX_NAME);
    }

    // Start the global transaction
    public void begin(int timeout, String name) throws TransactionException {
        if(role ! = GlobalTransactionRole.Launcher) { check();return;
        }
        if(xid ! =null) {
            throw new IllegalStateException();
        }
        if(RootContext.getXID() ! =null) {
            throw new IllegalStateException();
        }
        // Start a global transaction through the transaction manager
        xid = transactionManager.begin(null.null, name, timeout);
        status = GlobalStatus.Begin;
        // The global context binds the global transaction number
        RootContext.bind(xid);

    }


    // Commit the global transaction
    public void commit(a) throws TransactionException {
        if (role == GlobalTransactionRole.Participant) {
            return;
        }
        if (xid == null) {
            throw new IllegalStateException();
        }
        int retry = COMMIT_RETRY_COUNT;
        try {
            // Maximum number of failed retries
            while (retry > 0) {
                try {
                    // Commit the transaction
                    status = transactionManager.commit(xid);
                    break;
                } catch (Throwable ex) {
                    retry--;
                    if (retry == 0) {
                        throw new TransactionException("Failed to report global commit", ex); }}}}finally {
            if(RootContext.getXID() ! =null&& xid.equals(RootContext.getXID())) { RootContext.unbind(); }}}... The other methods are basically the same, you can see the source code}Copy the code

The interceptor GlobalTransactionalInterceptor global transaction method

public class GlobalTransactionalInterceptor implements ConfigurationChangeListener.MethodInterceptor {
    
    // @globalTransaction annotation method interception
    public Object invoke(final MethodInvocation methodInvocation) throws Throwable {
        // Get the class object of the target methodClass<? > targetClass = methodInvocation.getThis() ! =null ? AopUtils.getTargetClass(methodInvocation.getThis())
            : null;
        // Get the method object
        Method specificMethod = ClassUtils.getMostSpecificMethod(methodInvocation.getMethod(), targetClass);
        // Get the specific method
        final Method method = BridgeMethodResolver.findBridgedMethod(specificMethod);
        // Get the GlobalTransactional annotation
        final GlobalTransactional globalTransactionalAnnotation = getAnnotation(method, GlobalTransactional.class);
        // Get the GlobalLock annotation
        final GlobalLock globalLockAnnotation = getAnnotation(method, GlobalLock.class);
        // Global transactions are enabled and methods are marked with global transaction annotations
        if(! disable && globalTransactionalAnnotation ! =null) {
            // Make the global transaction call process
            return handleGlobalTransaction(methodInvocation, globalTransactionalAnnotation);
        } else if(! disable && globalLockAnnotation ! =null) { // Enable global transactions and the global lock annotation is not null
            // Run the global lock process
            return handleGlobalLock(methodInvocation);
        } else {
            // Plain method calls
            returnmethodInvocation.proceed(); }}Copy the code

RM Core architecture code

RMClient client core code

public class RMClient {

    /** * Resource management client initialization */
    public static void init(String applicationId, String transactionServiceGroup) {
        // Create a resource management client.
        RmRpcClient rmRpcClient = RmRpcClient.getInstance(applicationId, transactionServiceGroup);
        // Set resource management
        rmRpcClient.setResourceManager(DefaultResourceManager.get());
        // Set the client message listener
        rmRpcClient.setClientMessageListener(new RmMessageListener(DefaultRMHandler.get(), rmRpcClient));
        // The resource client is initializedrmRpcClient.init(); }}Copy the code

ResourceManagerInbound Contract for receiving resource management messages

public interface ResourceManagerInbound {

    /** * Receive local transaction commit */ from TC
    BranchStatus branchCommit(BranchType branchType, String xid, long branchId, String resourceId, String applicationData) throws TransactionException;

    /** * Rollback of local transactions sent by TC */
    BranchStatus branchRollback(BranchType branchType, String xid, long branchId, String resourceId, String applicationData) throws TransactionException;
}

Copy the code

ResourceManager Resource management contract

public interface ResourceManager extends ResourceManagerInbound.ResourceManagerOutbound {

    /** * Register resources */
    void registerResource(Resource resource);

    /** * Delete resource */
    void unregisterResource(Resource resource);

    /** * get all resources */
    Map<String, Resource> getManagedResources(a);

    /** * get transaction model type */
    BranchType getBranchType(a);
}
Copy the code

TCCResourceManager TCC transaction manager

public class TCCResourceManager extends AbstractResourceManager {
// Branch commit
public BranchStatus branchCommit(BranchType branchType, String xid, long branchId, String resourceId,
                                     String applicationData) throws TransactionException {
                                     
        // Get the TCC resource entity
        TCCResource tccResource = (TCCResource)tccResourceCache.get(resourceId);
        if (tccResource == null) {
            throw new ShouldNeverHappenException(String.format("TCC resource is not exist, resourceId: %s", resourceId));
        }
        // Get the target object
        Object targetTCCBean = tccResource.getTargetBean();
        // Get the commit method
        Method commitMethod = tccResource.getCommitMethod();
 
        try {
            boolean result = false;
            // Get the BusinessActionContext object
            BusinessActionContext businessActionContext = getBusinessActionContext(xid, branchId, resourceId,
                applicationData);
            // Commit the method call
            Object ret = commitMethod.invoke(targetTCCBean, businessActionContext);
            // Get the result of the commit method
            if(ret ! =null) {
                if (ret instanceof TwoPhaseResult) {
                    result = ((TwoPhaseResult)ret).isSuccess();
                } else {
                    result = (boolean)ret; }}// Returns the value of the two-phase commit result
            return result ? BranchStatus.PhaseTwo_Committed : BranchStatus.PhaseTwo_CommitFailed_Retryable;
        } catch (Throwable t) {
            // Fail to throw exception
            throw newFrameworkException(t, msg); }}//TCC branch type
    public BranchType getBranchType(a) {
        returnBranchType.TCC; }}Copy the code

TC core architecture code

RpcServer Transaction Coordinator (server side)


public class RpcServer extends AbstractRpcRemotingServer {
 
    
    /** * Synchronously sends the response */
    @Override
    public void sendResponse(RpcMessage request, Channel channel, Object msg) {
        Channel clientChannel = channel;
        // Non-heartbeat packets
        if(! (msginstanceof HeartbeatMessage)) {
            // Get the client channel
            clientChannel = ChannelManager.getSameClientChannel(channel);
        }
        if(clientChannel ! =null) {
            // If the client channel is not empty, send the response result
            super.defaultSendResponse(request, clientChannel, msg);
        } else {
            throw new RuntimeException("channel is error. channel:"+ clientChannel); }}/** * synchronously sends the request with the response body */
    @Override
    public Object sendSyncRequest(String resourceId, String clientId, Object message,
                                  long timeout) throws TimeoutException {
        Channel clientChannel = ChannelManager.getChannel(resourceId, clientId);
        if (clientChannel == null) {
            throw new RuntimeException("rm client is not connected. dbkey:" + resourceId
                + ",clientId:" + clientId);

        }
        return sendAsyncRequestWithResponse(null, clientChannel, message, timeout);
    }

    /** * synchronously sends requests with response objects */
    @Override
    public Object sendSyncRequest(Channel clientChannel, Object message) throws TimeoutException {
        return sendSyncRequest(clientChannel, message, NettyServerConfig.getRpcRequestTimeout());
    }

    /** * synchronously sends requests with response objects */
    @Override
    public Object sendSyncRequest(Channel clientChannel, Object message, long timeout) throws TimeoutException {
        if (clientChannel == null) {
            throw new RuntimeException("rm client is not connected");

        }
        return sendAsyncRequestWithResponse(null, clientChannel, message, timeout);
    }

    /** * asynchronously sends a request with a response object */
    @Override
    public Object sendSyncRequest(String resourceId, String clientId, Object message)
        throws TimeoutException {
        return sendSyncRequest(resourceId, clientId, message, NettyServerConfig.getRpcRequestTimeout());
    }

    /** * asynchronously sends a request with a response object */
    @Override
    public Object sendASyncRequest(Channel channel, Object message) throws TimeoutException {
        returnsendAsyncRequestWithoutResponse(channel, message); }}Copy the code

TransactionMessageHandler transaction coordinator receives the message processing contract

public interface TransactionMessageHandler {

    /** * receive the request message */
    AbstractResultMessage onRequest(AbstractMessage request, RpcContext context);

    /** * Receive the response message */
    void onResponse(AbstractResultMessage response, RpcContext context);

}

Copy the code

Your “like” is the best support and motivation for me.

Pay attention to xiao Ge play structure, follow-up efforts to launch high-quality content.

Chat 🏆 technology project stage v | distributed those things…