What is Seata

Seata is Alibaba's recently open-sourced distributed transaction framework (address: github.com/seata/seata). It incorporates TXC from Alibaba Group (the cloud version is called GTS) and TCC from Ant Financial. It gathered nearly 10,000 stars on GitHub in just a few months, and at present it is the only distributed transaction solution backed by a major company.

TXC is called AT mode in Seata: the framework generates the compensation logic automatically and hides it completely from users, so a distributed transaction can be used much like a local one. The drawbacks are that only relational databases are supported (currently MySQL), that each service using AT mode must create a local table to store rollback_info, and that the default isolation level is read uncommitted (RU), which limits the scenarios it suits.

TCC is not a new concept. Users define three methods, try, confirm and cancel, to simulate a two-phase commit at the application level. Unlike 2PC done purely in the storage layer, the try method in TCC must itself operate on the database to lock (reserve) resources, and the framework then automatically calls one of the two follow-up methods, confirm to commit the resources or cancel to roll them back. Ant Financial contributed its TCC implementation to Seata; it is said to have evolved for more than a decade and is widely used in finance, trading, warehousing and other fields.
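
To make the three phases concrete, here is a minimal in-memory sketch of a TCC participant. The class AccountTccSketch and all of its methods are hypothetical and are not part of Seata; the try phase is named tryDebit because try is a reserved word in Java. It only illustrates the idea of reserving a resource first and committing or releasing it later.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory account illustrating try/confirm/cancel; not Seata code.
class AccountTccSketch {
    private long balance;                                     // spendable funds
    private final Map<String, Long> frozen = new HashMap<>(); // txId -> reserved amount

    AccountTccSketch(long initialBalance) {
        this.balance = initialBalance;
    }

    // Try: check the business rule and reserve (lock) the amount.
    synchronized boolean tryDebit(String txId, long amount) {
        if (amount <= 0 || balance < amount || frozen.containsKey(txId)) {
            return false;
        }
        balance -= amount;
        frozen.put(txId, amount);
        return true;
    }

    // Confirm: consume the reservation; calling it again is harmless.
    synchronized boolean confirm(String txId) {
        frozen.remove(txId);
        return true;
    }

    // Cancel: give the reserved amount back; also safe to repeat.
    synchronized boolean cancel(String txId) {
        Long amount = frozen.remove(txId);
        if (amount != null) {
            balance += amount;
        }
        return true;
    }

    synchronized long balance() {
        return balance;
    }

    synchronized long frozenTotal() {
        long sum = 0;
        for (long v : frozen.values()) {
            sum += v;
        }
        return sum;
    }
}
```

Making confirm and cancel safe to repeat is a deliberate choice in this sketch, since a coordinator may retry a callback whose result it did not receive.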

Background of distributed transactions

Early applications were monolithic. For example, the account, amount and order systems involved in a payment business were all handled by a single application that accessed one database instance, so transactional operations were naturally local transactions and could easily be implemented with Spring. As the business grew, the single application was split by responsibility into three separate services that call each other through RPC, and their data now lives in different database instances. When one business operation modifies data in several databases, local transactions can no longer be relied on, and a distributed transaction framework is needed.

TCC is one solution for distributed transactions and belongs to the flexible, compensation-based type. Its advantages are that it is easy to understand and that locking resources in the try phase gives good concurrency. Its disadvantage is the cost of changing the business code.

What is TCC? The concept of TCC itself is not complicated

Using Seata TCC

Before analyzing the source code, let's briefly look at how Seata's TCC mode is used, which helps in understanding the overall TCC flow.

Seata TCC participants

TCC mode in Seata requires the participant of a TCC service to annotate its interface with @TwoPhaseBusinessAction, specifying the name of the TCC interface (globally unique) and the names of its confirm and cancel methods, so that the framework can later invoke them by reflection. Here is an example of a TCC interface:

public interface TccAction {
    @TwoPhaseBusinessAction(name = "yourTccActionName", commitMethod = "confirm", rollbackMethod = "cancel")
    // The try phase; named prepare here because try is a reserved word in Java
    public boolean prepare(BusinessActionContext businessActionContext, int a, int b);
    public boolean confirm(BusinessActionContext businessActionContext);
    public boolean cancel(BusinessActionContext businessActionContext);
}

An implementation class Impl is then defined to implement this interface and provide concrete implementations of the three methods. Finally, the participant service is published and registered remotely, mainly so that the Seata framework can call the participant's confirm or cancel method to complete the whole TCC transaction.

Seata TCC initiator

Seata TCC initiators are similar to the Payment Service in the figure above. An initiator adds the @GlobalTransactional annotation to its business method to enable the aspect that registers the global transaction. The business method calls the try methods of the TCC participants. Once the business method returns, the Seata framework notifies the TC, which calls back the participants' confirm methods if the business method succeeded, or their cancel methods if it threw an exception.
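
The initiator flow described above can be sketched without any framework. The names ParticipantSketch and InitiatorSketch are hypothetical stand-ins, not Seata types: InitiatorSketch plays the role of the aspect plus the TC callback, and everything runs in one process instead of over RPC.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical participant contract; not the Seata API.
interface ParticipantSketch {
    boolean prepare(); // the try phase
    void confirm();
    void cancel();
}

// Hypothetical stand-in for the initiator aspect and the TC callbacks.
class InitiatorSketch {
    private final List<ParticipantSketch> enlisted = new ArrayList<>();

    // Called from business code: enlist the participant, then run its try phase.
    void call(ParticipantSketch participant) {
        enlisted.add(participant);
        if (!participant.prepare()) {
            throw new IllegalStateException("try phase failed");
        }
    }

    // Run the business method, then confirm all participants on success
    // or cancel all of them on failure. Returns true if confirmed.
    boolean run(Runnable business) {
        try {
            business.run();
        } catch (RuntimeException ex) {
            for (ParticipantSketch p : enlisted) {
                p.cancel();
            }
            return false;
        }
        for (ParticipantSketch p : enlisted) {
            p.confirm();
        }
        return true;
    }
}
```

In this sketch a participant is enlisted before its try phase runs, so its cancel method may be called even when the try did nothing; cancel implementations therefore need to tolerate that case.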

Source code analysis

The source code for TCC mode in Seata is not complex and is concentrated in the following classes:

Module | Class | Function
seata-spring | GlobalTransactionalInterceptor.class | Global transaction aspect logic, including registering the global transaction and obtaining the XID
seata-spring | TccActionInterceptor.class | TCC participant aspect logic
seata-tcc | TCCResourceManager.class | Parses the TCC bean and saves the TCC Resource for subsequent callbacks
seata-tcc | ActionInterceptorHandler.class | Implementation of TCC branch transaction registration
seata-server | DefaultCoordinator.class, FileTransactionStoreManager.class | Mainly the TC implementation, transaction storage and related implementations

Registering TCC Resources

A TCC interface in Seata is called a TCC Resource and has the following structure:

public class TCCResource implements Resource {

    private String resourceGroupId = "DEFAULT";

    private String appName;

    private String actionName; // TCC interface name

    private Object targetBean; // TCC Bean

    private Method prepareMethod; // try method

    private String commitMethodName;

    private Method commitMethod; // confirm method

    private String rollbackMethodName;

    private Method rollbackMethod; // cancel method

    // ... omitted
}

When Seata parses the application's beans and finds a TCC bean, the parserRemotingServiceInfo method generates a TCCResource object and then calls the registerResource method of the TCCResourceManager class, which saves the TCCResource object in the local tccResourceCache, a ConcurrentHashMap. At the same time, information such as the resourceId and address of the TCCResource is registered with the server through RmRpcClient, so that the TC can later call back to the correct address through RPC.

// Part of the code that parses a TCCResource
Class<?> interfaceClass = remotingBeanDesc.getInterfaceClass();
Method[] methods = interfaceClass.getMethods();
if (isService(bean, beanName)) {
    try {
        // If it is a TCC service bean, parse and register the resource
        Object targetBean = remotingBeanDesc.getTargetBean();
        for (Method m : methods) {
            TwoPhaseBusinessAction twoPhaseBusinessAction = m.getAnnotation(TwoPhaseBusinessAction.class);
            if (twoPhaseBusinessAction != null) {
                // The method carries the TCC participant annotation, so define a TCCResource
                TCCResource tccResource = new TCCResource();
                tccResource.setActionName(twoPhaseBusinessAction.name());
                // TCC bean
                tccResource.setTargetBean(targetBean);
                // try method
                tccResource.setPrepareMethod(m);
                // confirm method name
                tccResource.setCommitMethodName(twoPhaseBusinessAction.commitMethod());
                // confirm method object
                tccResource.setCommitMethod(ReflectionUtil.getMethod(interfaceClass, twoPhaseBusinessAction.commitMethod(), new Class[]{BusinessActionContext.class}));
                // cancel method name
                tccResource.setRollbackMethodName(twoPhaseBusinessAction.rollbackMethod());
                // cancel method object
                tccResource.setRollbackMethod(ReflectionUtil.getMethod(interfaceClass, twoPhaseBusinessAction.rollbackMethod(), new Class[]{BusinessActionContext.class}));
                // Call the registerResource method of TCCResourceManager
                DefaultResourceManager.get().registerResource(tccResource);
            }
        }
    } catch (Throwable t) {
        throw new FrameworkException(t, "parser remting service error");
    }
}
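
The same scan-and-cache idea can be tried out with plain Java reflection. The sketch below is self-contained and does not use Seata: the annotation TwoPhaseSketch, the classes ResourceSketch and ResourceScannerSketch, and the interface TransferActionSketch are all hypothetical names, and the second-phase methods take a String instead of a BusinessActionContext to keep the example small.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for @TwoPhaseBusinessAction.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface TwoPhaseSketch {
    String name();
    String commitMethod() default "confirm";
    String rollbackMethod() default "cancel";
}

// Hypothetical stand-in for TCCResource.
class ResourceSketch {
    final String actionName;
    final Object targetBean;
    final Method prepareMethod;
    final Method commitMethod;
    final Method rollbackMethod;

    ResourceSketch(String actionName, Object targetBean,
                   Method prepareMethod, Method commitMethod, Method rollbackMethod) {
        this.actionName = actionName;
        this.targetBean = targetBean;
        this.prepareMethod = prepareMethod;
        this.commitMethod = commitMethod;
        this.rollbackMethod = rollbackMethod;
    }
}

// Scans an interface for annotated try methods and caches one resource per action name.
class ResourceScannerSketch {
    final Map<String, ResourceSketch> cache = new ConcurrentHashMap<>();

    void scan(Class<?> interfaceClass, Object targetBean) {
        for (Method m : interfaceClass.getMethods()) {
            TwoPhaseSketch action = m.getAnnotation(TwoPhaseSketch.class);
            if (action == null) {
                continue; // not a try method
            }
            try {
                Method commit = interfaceClass.getMethod(action.commitMethod(), String.class);
                Method rollback = interfaceClass.getMethod(action.rollbackMethod(), String.class);
                cache.put(action.name(),
                        new ResourceSketch(action.name(), targetBean, m, commit, rollback));
            } catch (NoSuchMethodException e) {
                throw new IllegalStateException("second-phase method not found", e);
            }
        }
    }
}

// Hypothetical TCC-style interface used to exercise the scanner.
interface TransferActionSketch {
    @TwoPhaseSketch(name = "transferAction")
    boolean prepare(String txId, int amount);
    boolean confirm(String txId);
    boolean cancel(String txId);
}
```

Resolving the Method objects once at scan time means the later callback only needs a map lookup and a reflective call.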

Let’s look at the implementation of TCCResourceManager’s registerResource method:

// The mapping between the resourceId stored in memory and TCCResource
private Map<String, Resource> tccResourceCache = new ConcurrentHashMap<String, Resource>();

@Override
public void registerResource(Resource resource) {
    TCCResource tccResource = (TCCResource) resource;
    tccResourceCache.put(tccResource.getResourceId(), tccResource);
    // Call methods of the parent class registered with the remote through RPC
    super.registerResource(tccResource);
}

Let’s take a look at how TCCResource is registered with the server:

public void registerResource(Resource resource) {
    // Get the RmRpcClient instance and call its registerResource method
    RmRpcClient.getInstance().registerResource(resource.getResourceGroupId(), resource.getResourceId());
}

public void registerResource(String resourceGroupId, String resourceId) {
    if (LOGGER.isInfoEnabled()) {
        LOGGER.info("register to RM resourceId:" + resourceId);
    }
    synchronized (channels) {
        for (Map.Entry<String, Channel> entry : channels.entrySet()) {
            String serverAddress = entry.getKey();
            Channel rmChannel = entry.getValue();
            if (LOGGER.isInfoEnabled()) {
                LOGGER.info("register resource, resourceId:" + resourceId);
            }
            // Register the resourceId; the server resolves it into an RpcContext and keeps it in memory
            sendRegisterMessage(serverAddress, rmChannel, resourceId);
        }
    }
}

@GlobalTransactional registers the global transaction

The @GlobalTransactional annotation is the entry point of a global transaction, and its aspect logic is implemented in the GlobalTransactionalInterceptor class. If the intercepted method is annotated with @GlobalTransactional, the handleGlobalTransaction method is called to enter the aspect logic, where the key method is the execute method of transactionalTemplate.

public Object execute(TransactionalExecutor business) throws Throwable {
    
    // If an XID has been passed from upstream, this is a downstream participant and simply joins the existing global transaction
    // If no XID has been passed, this is the initiator and starts a new global transaction with the Launcher role
    GlobalTransaction tx = GlobalTransactionContext.getCurrentOrCreate();

    // ... omitted

    try {

        // Enable global transactions
        beginTransaction(txInfo, tx);

        Object rs = null;
        try {

            // Call the business method
            rs = business.execute();

        } catch (Throwable ex) {

            // If an exception is thrown, tell the TC to roll back the global transaction
            completeTransactionAfterThrowing(txInfo,tx,ex);
            throw ex;
        }

        // If no exception is thrown, inform TC to commit global transaction
        commitTransaction(tx);

        return rs;
    } 

    // ... omitted
}

The beginTransaction method calls the begin method of transactionManager:

// Client side
@Override
public String begin(String applicationId, String transactionServiceGroup, String name, int timeout)
    throws TransactionException {
    GlobalBeginRequest request = new GlobalBeginRequest();
    request.setTransactionName(name);
    request.setTimeout(timeout);
    // Send the RPC to obtain the XID from the TC
    GlobalBeginResponse response = (GlobalBeginResponse)syncCall(request);
    return response.getXid();
}

// Server side
@Override
public String begin(String applicationId, String transactionServiceGroup, String name, int timeout)
    throws TransactionException {
    // Global transactions are represented by GlobalSession
    GlobalSession session = GlobalSession.createGlobalSession(
        applicationId, transactionServiceGroup, name, timeout);
    session.addSessionLifecycleListener(SessionHolder.getRootSessionManager());
    // Write GlobalSession to the file store
    session.begin();
    // Return the UUID as the global transaction ID
    return XID.generateXID(session.getTransactionId());
}
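
As a rough illustration of the server-side begin step, the sketch below keeps sessions in memory and hands back an identifier that embeds the transaction ID. XidSketch and BeginSketch are hypothetical classes, and the "serverAddress:transactionId" layout of the identifier is an assumption made for this example; the real server also persists the session, which is left out here.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical helper: builds and parses an identifier of the assumed form "serverAddress:transactionId".
class XidSketch {
    static String generate(String serverAddress, long transactionId) {
        return serverAddress + ":" + transactionId;
    }

    static long transactionId(String xid) {
        return Long.parseLong(xid.substring(xid.lastIndexOf(':') + 1));
    }
}

// Hypothetical in-memory coordinator that only handles "begin".
class BeginSketch {
    private final String serverAddress;
    private final AtomicLong nextTransactionId = new AtomicLong(1);
    // transactionId -> transaction name, standing in for the global session store
    private final Map<Long, String> sessions = new ConcurrentHashMap<>();

    BeginSketch(String serverAddress) {
        this.serverAddress = serverAddress;
    }

    // Create a session for the new global transaction and return its identifier.
    String begin(String transactionName) {
        long id = nextTransactionId.getAndIncrement();
        sessions.put(id, transactionName);
        return XidSketch.generate(serverAddress, id);
    }

    // Look a session up again from the identifier a client sends back.
    String nameOf(String xid) {
        return sessions.get(XidSketch.transactionId(xid));
    }
}
```

Embedding the transaction ID in the identifier lets the server find the session again from nothing but the string the client passes along.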

@TwoPhaseBusinessAction registers branch transactions

When the global transaction calls a business method, execution enters the TCC participant's aspect logic, which is mainly implemented in the TccActionInterceptor class. The key method is the proceed method of actionInterceptorHandler.

public Map<String, Object> proceed(Method method, Object[] arguments, TwoPhaseBusinessAction businessAction, Callback<Object> targetCallback) throws Throwable {
    
    // ... omitted

    // Create a branch transaction
    String branchId = doTccActionLogStore(method, arguments, businessAction, actionContext);
    actionContext.setBranchId(branchId);

    // Pass the action context in as the BusinessActionContext argument of the method
    Class<?>[] types = method.getParameterTypes();
    int argIndex = 0;
    for (Class<?> cls : types) {
        if (cls.getName().equals(BusinessActionContext.class.getName())) {
            arguments[argIndex] = actionContext;
            break;
        }
        argIndex++;
    }
    
    // ... omitted
}

The doTccActionLogStore method is responsible for registering branch transactions:

// Client side
protected String doTccActionLogStore(Method method, Object[] arguments, TwoPhaseBusinessAction businessAction, BusinessActionContext actionContext) {
    String actionName = actionContext.getActionName();
    // Get the global transaction ID
    String xid = actionContext.getXid();

    // ... omitted

    try {
        // The resource manager registers the branch transaction with the TC through RPC
        Long branchId = DefaultResourceManager.get().branchRegister(BranchType.TCC, actionName, null, xid, applicationContextStr, null);
        // Get the branch transaction ID returned by the TC
        return String.valueOf(branchId);
    }

    // ... omitted
}

// Server side
@Override
public Long branchRegister(BranchType branchType, String resourceId, String clientId, String xid, String applicationData, String lockKeys) throws TransactionException {
    GlobalSession globalSession = assertGlobalSession(XID.getTransactionId(xid), GlobalStatus.Begin);
    // Create a new BranchSession
    BranchSession branchSession = SessionHelper.newBranchByGlobal(globalSession, branchType, resourceId,
        applicationData, lockKeys, clientId);

    if (!branchSession.lock()) {
        throw new TransactionException(LockKeyConflict);
    }
    try {
        // Add the branch transaction to the global transaction
        globalSession.addBranch(branchSession);
    } catch (RuntimeException ex) {
        throw new TransactionException(FailedToAddBranch);
    }
    // Returns the branch transaction ID
    return branchSession.getBranchId();
}
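
The server-side rule that a branch can only join a global transaction that is still open can be shown with a few lines of plain Java. BranchRegistrySketch is a hypothetical class written for this article; it keeps the branches of one global transaction in memory and leaves out locking, persistence and RPC.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical in-memory view of the branches of one global transaction.
class BranchRegistrySketch {
    private boolean acceptingBranches = true; // true while the global transaction is still open
    private long nextBranchId = 1;
    private final Map<Long, String> branches = new LinkedHashMap<>(); // branchId -> resourceId

    // Register a branch for the given resource and return its new branch ID.
    synchronized long branchRegister(String resourceId) {
        if (!acceptingBranches) {
            throw new IllegalStateException("global transaction no longer accepts branches");
        }
        long branchId = nextBranchId++;
        branches.put(branchId, resourceId);
        return branchId;
    }

    // Called when the second phase starts: later registrations are rejected.
    synchronized void close() {
        acceptingBranches = false;
    }

    synchronized String resourceOf(long branchId) {
        return branches.get(branchId);
    }
}
```

The branch ID returned here is what the client stores in its action context, so that the second-phase callback can be matched to the right branch.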

TC calls back the participants' compensation methods

After the branch transactions are registered and the business method has been invoked successfully, the TC is notified to commit the global transaction.

@Override
public void commit() throws TransactionException {
    // A participant does not need to initiate the commit request
    if (role == GlobalTransactionRole.Participant) {
        return;
    }
    // A request to commit a global transaction is made by TM to TC
    status = transactionManager.commit(xid);
}

After the TC receives the commit request from the TM:

@Override
public GlobalStatus commit(String xid) throws TransactionException {
    // Find GlobalSession by xID
    GlobalSession globalSession = SessionHolder.findGlobalSession(XID.getTransactionId(xid));
    if (globalSession == null) {
        return GlobalStatus.Finished;
    }
    GlobalStatus status = globalSession.getStatus();

    // Close GlobalSession to prevent subsequent branch transactions from being registered
    globalSession.closeAndClean(); 

    if (status == GlobalStatus.Begin) {
        // Change the status to Commit in progress
        globalSession.changeStatus(GlobalStatus.Committing);
        // The TCC branch can also commit asynchronously, if high performance is required
        if (globalSession.canBeCommittedAsync()) {
            asyncCommit(globalSession);
        } else {
            doGlobalCommit(globalSession, false);
        }
    }
    return globalSession.getStatus();
}
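
The status handling in the commit path can be reduced to a small state machine. GlobalCommitSketch and its Status enum are hypothetical and only loosely modelled on the excerpt above: the Predicate stands in for the RPC that commits one branch, and a failed branch simply leaves the transaction in a retrying state.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical state machine for the second phase of one global transaction.
class GlobalCommitSketch {
    enum Status { BEGIN, COMMITTING, COMMITTED, COMMIT_RETRYING }

    private Status status = Status.BEGIN;
    private final List<String> branchResourceIds;

    GlobalCommitSketch(List<String> branchResourceIds) {
        this.branchResourceIds = new ArrayList<>(branchResourceIds);
    }

    // branchCommit stands in for the RPC that asks a participant to confirm one branch.
    synchronized Status commit(Predicate<String> branchCommit) {
        if (status != Status.BEGIN) {
            return status; // only a transaction still in BEGIN starts committing
        }
        status = Status.COMMITTING;
        for (String resourceId : branchResourceIds) {
            if (!branchCommit.test(resourceId)) {
                status = Status.COMMIT_RETRYING; // leave it for a later retry
                return status;
            }
        }
        status = Status.COMMITTED;
        return status;
    }

    synchronized Status status() {
        return status;
    }
}
```

Checking the status before doing any work makes a repeated commit request harmless, because the branches are not called a second time.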

doGlobalCommit is the key method to focus on; secondary logic is ignored here:

@Override
public void doGlobalCommit(GlobalSession globalSession, boolean retrying) throws TransactionException {
    for (BranchSession branchSession : globalSession.getSortedBranches()) {
        
        // ... omitted

        try {
            // Call the branchCommit method of DefaultCoordinator to commit the branch
            // The branch transaction ID and resourceId are used to find the corresponding TCCResource and the compensation method parameters
            BranchStatus branchStatus = resourceManagerInbound.branchCommit(branchSession.getBranchType(),
                XID.generateXID(branchSession.getTransactionId()), branchSession.getBranchId(),
                branchSession.getResourceId(), branchSession.getApplicationData());
        }
    }
    // ... omitted
}

The branchCommit method in the DefaultCoordinator class on the server side issues an RPC request to invoke the corresponding TCCResource provider:

@Override
public BranchStatus branchCommit(BranchType branchType, String xid, long branchId, String resourceId,
                                    String applicationData)
    throws TransactionException {
    
    // ... omitted
    // Get the global and branch sessions
    GlobalSession globalSession = SessionHolder.findGlobalSession(XID.getTransactionId(xid));
    BranchSession branchSession = globalSession.getBranch(branchId);
    // Find the corresponding channel and RpcContext based on the resourceId
    BranchCommitResponse response = (BranchCommitResponse)messageSender.sendSyncRequest(resourceId,
        branchSession.getClientId(), request);
    // Return the commit status of the branch transaction
    return response.getBranchStatus();

    // ... omitted
}

The client receives the branch commit RPC request, looks up the TCCResource it parsed and cached earlier, and invokes the compensation method by reflection. The key steps are excerpted below.

@Override
public BranchStatus branchCommit(BranchType branchType, String xid, long branchId, String resourceId, String applicationData) throws TransactionException {
    // Look up the cached TCCResource object by resourceId
    TCCResource tccResource = (TCCResource) tccResourceCache.get(resourceId);
    if (tccResource == null) {
        throw new ShouldNeverHappenException("TCC resource is not exist, resourceId:" + resourceId);
    }
    // Get the target bean and the corresponding Method object
    Object targetTCCBean = tccResource.getTargetBean();
    Method commitMethod = tccResource.getCommitMethod();
    try {
        boolean result = false;
        // Rebuild the parameter of the compensation method
        BusinessActionContext businessActionContext = getBusinessActionContext(xid, branchId, resourceId, applicationData);
        // Invoke the compensation method by reflection
        Object ret = commitMethod.invoke(targetTCCBean, businessActionContext);
        // Derive the result from the return value (simplified here; the excerpt elides this step)
        result = Boolean.TRUE.equals(ret);
        // Return the branch status
        return result ? BranchStatus.PhaseTwo_Committed : BranchStatus.PhaseTwo_CommitFailed_Retryable;
    }
    // ... omitted
}
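
The lookup-then-reflect pattern used for the callback can be exercised in isolation. In the sketch below, CallbackDispatcherSketch and OrderActionSketch are hypothetical classes, the confirm method takes a plain String instead of a BusinessActionContext, and a failed invocation is simply reported as false so that the caller could retry.

```java
import java.lang.reflect.Method;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical client-side dispatcher: resourceId -> (bean, commit method).
class CallbackDispatcherSketch {
    private static final class Entry {
        final Object bean;
        final Method commitMethod;

        Entry(Object bean, Method commitMethod) {
            this.bean = bean;
            this.commitMethod = commitMethod;
        }
    }

    private final Map<String, Entry> cache = new ConcurrentHashMap<>();

    // Resolve the commit method once and cache it under the resource ID.
    void register(String resourceId, Object bean, String commitMethodName) {
        try {
            Method m = bean.getClass().getMethod(commitMethodName, String.class);
            cache.put(resourceId, new Entry(bean, m));
        } catch (NoSuchMethodException e) {
            throw new IllegalArgumentException("no such commit method: " + commitMethodName, e);
        }
    }

    // Handle a branch commit request: find the cached entry and invoke it by reflection.
    boolean branchCommit(String resourceId, String xid) {
        Entry entry = cache.get(resourceId);
        if (entry == null) {
            throw new IllegalStateException("unknown resource: " + resourceId);
        }
        try {
            Object ret = entry.commitMethod.invoke(entry.bean, xid);
            return Boolean.TRUE.equals(ret);
        } catch (ReflectiveOperationException e) {
            return false; // reported as a failure so the caller can retry
        }
    }
}

// Hypothetical participant bean used to exercise the dispatcher.
class OrderActionSketch {
    String lastConfirmedXid;

    public boolean confirm(String xid) {
        lastConfirmedXid = xid;
        return true;
    }
}
```

A typical use is to call register once at startup for each participant bean and then route every incoming branch commit request through branchCommit with the resource ID it carries.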

Transaction store

As for how the Seata TC module stores transactions, articles elsewhere already describe it in detail, for example "In-depth analysis of the one-stop distributed transaction solution Seata-Server", so it is not repeated here.

It should be mentioned that the TC may become the performance bottleneck of the whole distributed transaction service, so achieving high performance and high availability is very important. The current storage mode is File, and the code also contains a TODO item for a DB store mode. Compared with DB, File performs better but offers worse availability. How availability will be guaranteed remains to be seen in the later HA cluster release.

Conclusion

The source code of the TCC part of the Seata framework is not complicated. This article shows only some key code from the classes involved and leaves out some conditional logic and exception handling. In the author's view, Seata TCC's exception encapsulation and custom handling, as well as the design of its various user extension points, are also worth reading.

Ant SOFA Channel previously gave a talk on Seata TCC and pointed out that the difficulty of a TCC framework lies not in the framework itself but in how to write a TCC interface. If you are interested in this part, you can follow the link to learn more about it.
