Introduction to Seata Flexible Transactions

Seata is a distributed transaction framework jointly developed by Alibaba Group and Ant Financial. The goal of its AT transaction mode is to provide non-intrusive ACID transaction semantics under a microservices architecture, allowing developers to use distributed transactions as easily as they use local transactions. This core concept is consistent with that of Apache ShardingSphere.

The Seata AT transaction model consists of a TM (Transaction Manager), an RM (Resource Manager) and a TC (Transaction Coordinator). The TC is deployed as an independent service, while the TM and RM are deployed with business applications in the form of JAR packages. They establish long connections with the TC and maintain remote communication throughout the transaction lifecycle. The TM is the initiator of global transactions and is responsible for beginning, committing and rolling them back. The RM is a participant in global transactions, responsible for reporting the execution results of branch transactions and for committing or rolling back branch transactions under the coordination of the TC.

A typical lifecycle for distributed transactions managed by Seata:

  1. The TM asks the TC to start a brand new global transaction. The TC generates an XID representing the global transaction.
  2. The XID is propagated through the entire invocation chain of the microservices.
  3. Each RM registers its local transaction with the TC as a branch of the global transaction identified by the XID.
  4. The TM asks the TC to commit or roll back the global transaction corresponding to the XID.
  5. The TC drives all branch transactions under the global transaction corresponding to the XID to commit or roll back.
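The five steps above can be sketched with an in-memory toy coordinator. All names here (ToyTC, registerBranch, and so on) are hypothetical illustrations of the protocol, not Seata APIs:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical in-memory TC: issues XIDs and tracks branch registrations.
class ToyTC {
    private final Map<String, List<String>> branches = new ConcurrentHashMap<>();

    // Step 1: the TM asks the TC for a new global transaction; the TC returns an XID.
    String begin() {
        String xid = UUID.randomUUID().toString();
        branches.put(xid, new ArrayList<>());
        return xid;
    }

    // Step 3: an RM registers its local transaction as a branch under the XID.
    void registerBranch(String xid, String resourceId) {
        branches.get(xid).add(resourceId);
    }

    // Steps 4-5: the TM asks the TC to commit; the TC drives every branch to commit.
    List<String> commit(String xid) {
        return branches.remove(xid);
    }
}

public class LifecycleSketch {
    public static void main(String[] args) {
        ToyTC tc = new ToyTC();
        String xid = tc.begin();                 // TM side
        tc.registerBranch(xid, "order-db");      // RM side; the XID travels with the call (step 2)
        tc.registerBranch(xid, "stock-db");
        List<String> committed = tc.commit(xid); // TM side
        System.out.println(committed.size());    // prints 2: both branches driven to commit
    }
}
```

The real protocol additionally reports branch execution status and takes global locks, but the XID-centric shape is the same.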

Seata Flexible Transactions and ShardingSphere

When integrating Seata AT transactions, the TM, RM and TC models need to be fitted into Apache ShardingSphere's distributed transaction ecosystem. On the database resource side, Seata hooks into the DataSource interface so that JDBC operations can communicate with the TC remotely. Apache ShardingSphere likewise works against the DataSource interface, aggregating the user-configured data sources. Therefore, by wrapping each DataSource in a Seata-aware DataSource proxy, Seata AT transactions can be integrated into Apache ShardingSphere's sharding ecosystem.

SeataTransactionManager Comparison

Let's start by comparing the Seata transaction manager with the XA one:

public final class SeataATShardingTransactionManager implements ShardingTransactionManager {
    
    private final Map<String, DataSource> dataSourceMap = new HashMap<>();
    
    private final String applicationId;
    
    private final String transactionServiceGroup;
    
    private final boolean enableSeataAT;
    
    public SeataATShardingTransactionManager() {
        FileConfiguration config = new FileConfiguration("seata.conf");
        enableSeataAT = config.getBoolean("sharding.transaction.seata.at.enable", true);
        applicationId = config.getConfig("client.application.id");
        transactionServiceGroup = config.getConfig("client.transaction.service.group", "default");
    }
    
    @Override
    public void init(final DatabaseType databaseType, final Collection<ResourceDataSource> resourceDataSources, final String transactionMangerType) {
        if (enableSeataAT) {
            initSeataRPCClient();
            for (ResourceDataSource each : resourceDataSources) {
                dataSourceMap.put(each.getOriginalName(), new DataSourceProxy(each.getDataSource()));
            }
        }
    }
    
    private void initSeataRPCClient() {
        Preconditions.checkNotNull(applicationId, "please config application id within seata.conf file.");
        TMClient.init(applicationId, transactionServiceGroup);
        RMClient.init(applicationId, transactionServiceGroup);
    }
    
    @Override
    public TransactionType getTransactionType() {
        return TransactionType.BASE;
    }
    
    @Override
    public boolean isInTransaction() {
        Preconditions.checkState(enableSeataAT, "sharding seata-at transaction has been disabled.");
        return null != RootContext.getXID();
    }
}

You can see that, apart from the methods inherited from the common interface, the biggest difference is initSeataRPCClient:

private void initSeataRPCClient() {
    Preconditions.checkNotNull(applicationId, "please config application id within seata.conf file.");
    TMClient.init(applicationId, transactionServiceGroup);
    RMClient.init(applicationId, transactionServiceGroup);
}

TMClient initialization

The TmRpcClient.getInstance() method fetches the TM client instance. During this process, a Netty client configuration object is created, along with a messageExecutor thread pool for handling the various message interactions with the server. When the TmRpcClient instance is created, a ClientBootstrap is built to manage starting and stopping the Netty service, together with a ClientChannelManager dedicated to managing the Netty client object pool, which is what ties Seata into its Netty layer.

public final class TmRpcClient extends AbstractRpcRemotingClient {
    
    private TmRpcClient(NettyClientConfig nettyClientConfig, EventExecutorGroup eventExecutorGroup, ThreadPoolExecutor messageExecutor) {
        super(nettyClientConfig, eventExecutorGroup, messageExecutor, TransactionRole.TMROLE);
    }
    
    public static TmRpcClient getInstance(String applicationId, String transactionServiceGroup) {
        TmRpcClient tmRpcClient = getInstance();
        tmRpcClient.setApplicationId(applicationId);
        tmRpcClient.setTransactionServiceGroup(transactionServiceGroup);
        return tmRpcClient;
    }
    
    public static TmRpcClient getInstance() {
        if (null == instance) {
            synchronized (TmRpcClient.class) {
                if (null == instance) {
                    NettyClientConfig nettyClientConfig = new NettyClientConfig();
                    ThreadPoolExecutor messageExecutor = new ThreadPoolExecutor(
                            nettyClientConfig.getClientWorkerThreads(), nettyClientConfig.getClientWorkerThreads(),
                            2147483647L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(2000),
                            new NamedThreadFactory(nettyClientConfig.getTmDispatchThreadPrefix(), nettyClientConfig.getClientWorkerThreads()),
                            RejectedPolicies.runsOldestTaskPolicy());
                    instance = new TmRpcClient(nettyClientConfig, null, messageExecutor);
                }
            }
        }
        return instance;
    }
}
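getInstance() above uses the classic double-checked locking idiom. A minimal, self-contained version of the idiom looks like this (class name is illustrative, not Seata's; note the volatile field, which the idiom requires for safe publication even though the decompiled snippet does not show it):

```java
// Minimal double-checked locking singleton; LazySingleton is an illustrative name.
public final class LazySingleton {
    // volatile is essential: without it, another thread could observe a
    // partially constructed instance through the unsynchronized first check.
    private static volatile LazySingleton instance;

    private LazySingleton() {
    }

    public static LazySingleton getInstance() {
        if (null == instance) {                    // first check, no lock: fast path
            synchronized (LazySingleton.class) {
                if (null == instance) {            // second check, under the lock
                    instance = new LazySingleton();
                }
            }
        }
        return instance;
    }
}
```

TmRpcClient follows the same shape, building its Netty config and message thread pool inside the locked region so they are constructed exactly once.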

The TM client's init() method is then called. It eventually starts the Netty client (not really started at this point; it only starts when the object pool is first used), and it starts a scheduled task that periodically resends RegisterTMRequest (the RM client sends RegisterRMRequest instead) to try to connect to the server. Concretely, the channel is cached in the channels field of NettyClientChannelManager; if no cached channel exists or it has expired, the client tries to connect to the server again to acquire a channel and caches it in channels. Finally, a separate thread is started for asynchronous request sending.

protected Function<String, NettyPoolKey> getPoolKeyFunction() {
    return (serverAddress) -> {
        RegisterTMRequest message = new RegisterTMRequest(this.applicationId, this.transactionServiceGroup);
        return new NettyPoolKey(TransactionRole.TMROLE, serverAddress, message);
    };
}
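The periodic re-registration described above can be sketched with a plain ScheduledExecutorService. The task body and names here are hypothetical stand-ins for Seata's reconnect logic:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of the periodic "re-register with the TC" task.
public class ReconnectSketch {

    // Fires the (pretend) RegisterTMRequest every periodMillis until it has
    // run `times` times, then shuts the timer down and reports the count.
    static int reRegister(int times, long periodMillis) throws InterruptedException {
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger attempts = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(times);
        timer.scheduleAtFixedRate(() -> {
            attempts.incrementAndGet();   // the real task sends RegisterTMRequest here
            done.countDown();
        }, 0, periodMillis, TimeUnit.MILLISECONDS);
        done.await();                     // wait until the task has fired `times` times
        timer.shutdownNow();
        return attempts.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(reRegister(3, 50) >= 3);  // prints true
    }
}
```

Seata's real task never stops on its own; it keeps firing for the life of the client so that a restarted TC is rediscovered automatically.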

RMClient initialization

Next comes RmRpcClient.getInstance, whose processing logic is roughly the same as the TM's; one difference is the ResourceManager:

public void setResourceManager(ResourceManager resourceManager) {
    this.resourceManager = resourceManager;
}

ResourceManager is RM's resource manager: it registers, commits, reports on and rolls back branch transactions, and queries global locks. DefaultResourceManager holds all ResourceManager instances and dispatches calls to them in a unified way. Its get() method loads the current resource managers through an SPI-like mechanism, so they can be plugged in flexibly.

public class DefaultResourceManager implements ResourceManager {
    
    protected static Map<BranchType, ResourceManager> resourceManagers = new ConcurrentHashMap<>();
    
    private DefaultResourceManager() {
        initResourceManagers();
    }
    
    public static DefaultResourceManager get() {
        return DefaultResourceManager.SingletonHolder.INSTANCE;
    }
    
    public static void mockResourceManager(BranchType branchType, ResourceManager rm) {
        resourceManagers.put(branchType, rm);
    }
    
    protected void initResourceManagers() {
        List<ResourceManager> allResourceManagers = EnhancedServiceLoader.loadAll(ResourceManager.class);
        if (CollectionUtils.isNotEmpty(allResourceManagers)) {
            for (ResourceManager rm : allResourceManagers) {
                resourceManagers.put(rm.getBranchType(), rm);
            }
        }
    }
}

EnhancedServiceLoader.load(ExtConfigurationProvider.class).provide(configuration); — EnhancedServiceLoader is the core class of Seata's SPI implementation. This line of code loads the class names listed in files under the META-INF/services/ and META-INF/seata/ directories.
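Seata's EnhancedServiceLoader plays the same role as the JDK's built-in java.util.ServiceLoader, which reads provider class names from META-INF/services/ files on the classpath. A minimal sketch of that standard mechanism (using java.sql.Driver, for which a plain JDK usually has no providers registered):

```java
import java.sql.Driver;
import java.util.ServiceLoader;

public class SpiSketch {
    public static void main(String[] args) {
        // The JDK ServiceLoader scans META-INF/services/java.sql.Driver on the
        // classpath and instantiates each listed class lazily on iteration.
        // EnhancedServiceLoader additionally scans META-INF/seata/ and supports
        // provider ordering and overriding.
        ServiceLoader<Driver> loader = ServiceLoader.load(Driver.class);
        int count = 0;
        for (Driver d : loader) {
            count++;  // each iteration step yields one instantiated provider
        }
        System.out.println("providers found: " + count);
    }
}
```

The count depends on what JDBC drivers happen to be on the classpath; the point is the discovery convention, which Seata extends rather than replaces.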

NettyClientChannelManager integrates the connection pooling mechanism with the logic for establishing new connections, acting as a connection manager. Whenever Seata's internal logic needs to send a message to the server, it obtains a connection through NettyClientChannelManager. Although NettyClientChannelManager integrates a connection pool, it also maintains its own channel cache internally:

private final ConcurrentMap<String, Channel> channels = new ConcurrentHashMap();
Copy the code

When it is asked for a connection, it first looks in its own cache and verifies that the cached channel is still valid; if so, that channel is returned. If the channel is invalid, or nothing is cached, it delegates to the connection pool to establish a new connection. The details are in its acquireChannel method:

Channel acquireChannel(String serverAddress) {
    Channel channelToServer = channels.get(serverAddress);
    if (channelToServer != null) {
        channelToServer = getExistAliveChannel(channelToServer, serverAddress);
        if (null != channelToServer) {
            return channelToServer;
        }
    }
    if (LOGGER.isInfoEnabled()) {
        LOGGER.info("will connect to " + serverAddress);
    }
    channelLocks.putIfAbsent(serverAddress, new Object());
    synchronized (channelLocks.get(serverAddress)) {
        return doConnect(serverAddress);
    }
}
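The check-cache-then-lock-per-address shape above can be compressed in modern Java with ConcurrentHashMap.computeIfAbsent, which provides per-key locking for free. This is a toy stand-in (a String plays the role of a Channel; all names are illustrative, not Seata's):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Toy channel cache: one "connection" per server address, created lazily.
public class ChannelCacheSketch {
    private final Map<String, String> channels = new ConcurrentHashMap<>();
    private final Function<String, String> connector;

    ChannelCacheSketch(Function<String, String> connector) {
        this.connector = connector;
    }

    // computeIfAbsent locks only the bucket holding this key, so two threads
    // asking for the same address contend on one lock while requests for
    // other addresses proceed -- the same effect as Seata's channelLocks map.
    String acquireChannel(String serverAddress) {
        return channels.computeIfAbsent(serverAddress, connector);
    }

    public static void main(String[] args) {
        ChannelCacheSketch cache = new ChannelCacheSketch(addr -> "channel->" + addr);
        String c1 = cache.acquireChannel("127.0.0.1:8091");
        String c2 = cache.acquireChannel("127.0.0.1:8091");
        System.out.println(c1 == c2);  // prints true: the cached channel is reused
    }
}
```

The real acquireChannel also has to validate liveness and evict dead channels, which computeIfAbsent alone does not cover; that is why Seata combines the cache with an explicit lock map and an alive check.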

Transactions

After TMClient and RMClient have been initialized, Seata's transaction manager GlobalTransaction is ready, and the program moves on to begin() and commit().


/**
 * Seata transaction holder.
 */
@NoArgsConstructor(access = AccessLevel.PRIVATE)
final class SeataTransactionHolder {
    
    private static final ThreadLocal<GlobalTransaction> CONTEXT = new ThreadLocal<>();
    
    /**
     * Set seata global transaction.
     *
     * @param transaction global transaction context
     */
    static void set(final GlobalTransaction transaction) {
        CONTEXT.set(transaction);
    }
    
    /**
     * Get seata global transaction.
     *
     * @return global transaction
     */
    static GlobalTransaction get() {
        return CONTEXT.get();
    }
    
    /**
     * Clear global transaction.
     */
    static void clear() {
        CONTEXT.remove();
    }
}
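SeataTransactionHolder binds the GlobalTransaction to the current thread via a ThreadLocal. The thread confinement this gives can be demonstrated with a self-contained stand-in (holding a String where the real holder keeps a GlobalTransaction; names are illustrative):

```java
// Stand-in for SeataTransactionHolder, holding a String instead of a GlobalTransaction.
final class ToyHolder {
    private static final ThreadLocal<String> CONTEXT = new ThreadLocal<>();

    static void set(String tx) {
        CONTEXT.set(tx);
    }

    static String get() {
        return CONTEXT.get();
    }

    static void clear() {
        CONTEXT.remove();
    }
}

public class HolderSketch {
    public static void main(String[] args) throws InterruptedException {
        ToyHolder.set("xid-1");
        String[] seenByOtherThread = new String[1];
        Thread t = new Thread(() -> seenByOtherThread[0] = ToyHolder.get());
        t.start();
        t.join();
        // The binding is per-thread: visible here, invisible on the other thread.
        System.out.println(ToyHolder.get());        // prints xid-1
        System.out.println(seenByOtherThread[0]);   // prints null
        ToyHolder.clear();                          // clear to avoid leaks in pooled threads
    }
}
```

The clear() call matters in practice: application servers reuse threads, so a transaction left in the ThreadLocal would leak into the next request handled by that thread.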