This is the 18th day of my participation in the More text Challenge. For more details, see more text Challenge

>>>> 😜😜😜 Making: 👉 github.com/black-ant

A. The preface

The Seata Client request process is a simple process. The Seata Client request process is a simple process. The Seata Client request process is a simple process.

Each Seata global operation creates a Session and inserts transaction data into the table.

2. Global_table table

Take a look at the table structure of global_TABLE

CREATE TABLE `global_table` (
  `xid` varchar(128) NOT NULL,
  `transaction_id` bigint(20) DEFAULT NULL,
  `status` tinyint(4) NOT NULL,
  `application_id` varchar(32) DEFAULT NULL,
  `transaction_service_group` varchar(32) DEFAULT NULL,
  `transaction_name` varchar(128) DEFAULT NULL,
  `timeout` int(11) DEFAULT NULL,
  `begin_time` bigint(20) DEFAULT NULL,
  `application_data` varchar(2000) DEFAULT NULL,
  `gmt_create` datetime DEFAULT NULL,
  `gmt_modified` datetime DEFAULT NULL,
  PRIMARY KEY (`xid`),
  KEY `idx_gmt_modified_status` (`gmt_modified`,`status`),
  KEY `idx_transaction_id` (`transaction_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;


CREATE TABLE `branch_table` (
  `branch_id` bigint(20) NOT NULL,
  `xid` varchar(128) NOT NULL,
  `transaction_id` bigint(20) DEFAULT NULL,
  `resource_group_id` varchar(32) DEFAULT NULL,
  `resource_id` varchar(256) DEFAULT NULL,
  `branch_type` varchar(8) DEFAULT NULL,
  `status` tinyint(4) DEFAULT NULL,
  `client_id` varchar(64) DEFAULT NULL,
  `application_data` varchar(2000) DEFAULT NULL,
  `gmt_create` datetime(6) DEFAULT NULL,
  `gmt_modified` datetime(6) DEFAULT NULL,
  PRIMARY KEY (`branch_id`),
  KEY `idx_xid` (`xid`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
Copy the code

Server Session processing overview

We configure the request STORE_MODE with the boot parameter -m: seata-server.bat -m db

The entire Session will be processed for two operations, one for global_table and the other for branch_table.

Pro 1: Function of global_table Global_table is used to persist global transactions and can be configured using store.db.global.table

Pro 2: Branch_table function Branch_table is used to identify branch transactions and can be configured using store.db.branch

The data structure

# C-Insert global_table LogStoreDataBaseDAO # insertGlobalTransactionDO:INSERT INTO `seata`.`global_table` 
    ( `xid`, 
    `transaction_id`, 
    `status`, 
    `application_id`, 
    `transaction_service_group`, 
    `transaction_name`, 
    `timeout`, 
    `begin_time`, 
    `application_data`, 
    `gmt_create`, 
    `gmt_modified` )
VALUES
    ( '192.168.181.2:8091:8466916507467911205'.8466916507467911205.1.'business-seata-example'.'business-service-seata-service-group'.'dubbo-gts-seata-example'.300000.1624863673423.NULL.'the 2021-06-28 15:01:28'.'the 2021-06-28 15:01:28' );



# C- LogStoreDataBaseDAO # insertBranchTransactionDO  
INSERT INTO `seata`.`branch_table`
    (`branch_id`, 
    `xid`, 
    `transaction_id`, 
    `resource_group_id`,
    `resource_id`, 
    `branch_type`, 
    `status`, 
    `client_id`, 
    `application_data`,
    `gmt_create`, 
    `gmt_modified`) 
VALUES
    (8466916507467911829.'192.168.181.2:8091:8466916507467911205'.8466916507467911205.NULL.'the JDBC: mysql: / / 127.0.0.1:3306 / seata'.'AT'.0.'storage - seata - example: 192.168.181.2:51964'.NULL.'the 2021-06-28 15:35:18. 534107'.'the 2021-06-28 15:35:18. 534107');

Copy the code

3.1 Global_table Processing Process

After configuration STORE_MODE for db, can use DataBaseSessionManager and DataBaseTransactionStoreManager business processing

// The call entry created (ignore the pre-logic here, but start with the Session creation)C- AbstractSessionManager # onBegin C- DataBaseSessionManager # addGlobalSession C- DataBaseTransactionStoreManager # WriteSession (the type here is GLOBAL_ADD((byte)1))

Copy the code

As you can see from Step 1, when you add, you call writeSession. This is an important method, and almost all edit session operations go through this class. You can Debug this section

/ * * * DataBaseTransactionStoreManager this method in the global transaction and branch transaction management * * /
public boolean writeSession(LogOperation logOperation, SessionStorable session) {
    if (LogOperation.GLOBAL_ADD.equals(logOperation)) {
        // Insert global transaction
        return logStore.insertGlobalTransactionDO(SessionConverter.convertGlobalTransactionDO(session));
    } else if (LogOperation.GLOBAL_UPDATE.equals(logOperation)) {
        // Update the global transaction
        return logStore.updateGlobalTransactionDO(SessionConverter.convertGlobalTransactionDO(session));
    } else if (LogOperation.GLOBAL_REMOVE.equals(logOperation)) {
        // Delete the global transaction
        return logStore.deleteGlobalTransactionDO(SessionConverter.convertGlobalTransactionDO(session));
    } else if (LogOperation.BRANCH_ADD.equals(logOperation)) {
        // Insert branch transaction
        return logStore.insertBranchTransactionDO(SessionConverter.convertBranchTransactionDO(session));
    } else if (LogOperation.BRANCH_UPDATE.equals(logOperation)) {
        // Update the branch transaction
        return logStore.updateBranchTransactionDO(SessionConverter.convertBranchTransactionDO(session));
    } else if (LogOperation.BRANCH_REMOVE.equals(logOperation)) {
        // Delete the branch transaction
        return logStore.deleteBranchTransactionDO(SessionConverter.convertBranchTransactionDO(session));
    } else {
            throw new StoreException("Unknown LogOperation:"+ logOperation.name()); }}Copy the code

[Pro31001] : Function and source of logOperation

LogOperation role:

LogOperation is an enumeration class that represents the type of operation

enum LogOperation {
    GLOBAL_ADD((byte)1),
    GLOBAL_UPDATE((byte)2),
    GLOBAL_REMOVE((byte)3),
    
    BRANCH_ADD((byte)4),
    BRANCH_UPDATE((byte)5),
    BRANCH_REMOVE((byte)6);

    private byte code;
}
Copy the code

LogOperation source:

When the process is called, the corresponding logoperation.code is passed in. For example, DataBaseSessionManager operation


C- DataBaseSessionManager
    M- addGlobalSession
        - transactionStoreManager.writeSession(LogOperation.GLOBAL_UPDATE, session);
    M- updateGlobalSessionStatus
        - transactionStoreManager.writeSession(LogOperation.GLOBAL_UPDATE, session);
    M- removeGlobalSession
        - transactionStoreManager.writeSession(LogOperation.GLOBAL_REMOVE, session)

Copy the code

3.2 Processing logic of branch_TABLE

//======== Here is the Beanch logicC - DataBaseTransactionStoreManager # writeSession (type of BRANCH_ADD ((byte)4))

// The final call has the following method
C- LogStoreDataBaseDAO 
    M- insertBranchTransactionDO  
    M- updateBranchTransactionDO
    M- deleteBranchTransactionDO
Copy the code

4. Session initialization process

4.1 Session Initialization

Step 1: Start the portal Server# main, where the Session is enabled

// In the server # main startup method, the following statement is called
SessionHolder.init(parameterParser.getStoreMode());
Copy the code

Step 2: SessionHolder init process

C- SessionHolder # init

  • Store. mode (DB, FILE, REDIS
  • Load the SessionManager using EnhancedServiceLoader#load
public static void init(String mode) {
    
    // Get the store.mode attribute in the configuration file
	if (StringUtils.isBlank(mode)) {
		mode = CONFIG.getConfig(ConfigurationKeys.STORE_MODE);
	}
    
    / / build StoreMode
	StoreMode storeMode = StoreMode.get(mode);
	if (StoreMode.DB.equals(storeMode)) {
        	// Basic session manager
            ROOT_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class, StoreMode.DB.getName());
        	// Asynchronous session manager
            ASYNC_COMMITTING_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class, StoreMode.DB.getName(),
                new Object[] {ASYNC_COMMITTING_SESSION_MANAGER_NAME});
    		// Retry submitting the session manager
            RETRY_COMMITTING_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class, StoreMode.DB.getName(),
                new Object[] {RETRY_COMMITTING_SESSION_MANAGER_NAME});
		   // Roll back the session manager again
            RETRY_ROLLBACKING_SESSION_MANAGER = EnhancedServiceLoader.load(SessionManager.class, StoreMode.DB.getName(),
                new Object[] {RETRY_ROLLBACKING_SESSION_MANAGER_NAME});
	} else if (StoreMode.FILE.equals(storeMode)) {
          / /... omit
	} else if (StoreMode.REDIS.equals(storeMode)) {
          / /... omit
	} else {
		// unknown store
		throw new IllegalArgumentException("unknown store mode:" + mode);
	}
    // See the refresh operation at the end
	reload(storeMode);
}
Copy the code

Step 3: InnerEnhancedServiceLoader loading way

public static <S> S load(Class<S> service, String activateName) throws EnhancedServiceNotFoundException {
    / / SPI core: here is a simple call, lower here simply InnerEnhancedServiceLoader
	return InnerEnhancedServiceLoader.getServiceLoader(service).load(activateName, findClassLoader());
}


// getServiceLoader Obtains the ServiceLoader
private static <S> InnerEnhancedServiceLoader<S> getServiceLoader(Class<S> type) {
// The main thing to do is to get the entire collection from SERVICE_LOADERS. As you can see, the collection is created and cached every time it is empty
return (InnerEnhancedServiceLoader<S>)CollectionUtils.computeIfAbsent(SERVICE_LOADERS, type,
	key -> new InnerEnhancedServiceLoader<>(type));
}


Copy the code

[PRO:] InnerEnhancedServiceLoader 的作用 ?

InnerEnhancedServiceLoader is EnhancedServiceLoader inner class:// Pro: EnhancedServiceLoader functionEnhancedServiceLoader is the core Seata SPI implementation class. Seata implements Seata extensions through the SPI mechanism to make it compatible with multiple registries: M- load(Class<S> service, ClassLoader) : M-load (Class<S> service, String activateName) : M-load (Class<S> service, String activateName, String activateName, String activateName, String activateName, String activateName) M- load(Class<S> service, String activateName, Object[] args)// Load also provides the ability to load a set of services
	M- loadAll(Class<S> service)
	M- loadAll(Class<S> service, Class[] argsType, Object[] args)
        
// Pro: where SPI Server is storedSeata's Service class and Spring Factories are basically always in meta-inf.service, which provides the following configuration -> PIC30001// Pro: subclass of EnhancedServiceLoaderEnhancedServiceLoader has an inner class: C - InnerEnhancedServiceLoader, main effect to avoid multiple loads appeared unnecessary load InnerEnhancedServiceLoader provides the following parameters:/ / class corresponding InnerEnhancedServiceLoader collectionConcurrentMap<Class<? >, InnerEnhancedServiceLoader<? >> SERVICE_LOADERS =new ConcurrentHashMap<>();
// The Holder has a volatile argument inside to hold the object, ensuring that multiple threads can see it
Holder<List<ExtensionDefinition>> definitionsHolder = new Holder<>();
/ / ExtensionDefinition collection
ConcurrentMap<ExtensionDefinition, Holder<Object>> definitionToInstanceMap = new ConcurrentHashMap<>();
// Set of extensionDefinitions corresponding to name
ConcurrentMap<String, List<ExtensionDefinition>> nameToDefinitionsMap = new ConcurrentHashMap<>();
// ExtensionDefinition Indicates the ExtensionDefinition corresponding to the class typeConcurrentMap<Class<? >, ExtensionDefinition> classToDefinitionMap =new ConcurrentHashMap<>();

        
Copy the code

PIC30001: meta-INF. Service data

Step 4: Specify a classLoader to load the Server Provider

This unit is the main processing flow for

C- EnhancedServiceLoader
private S loadExtension(String activateName, ClassLoader loader, Class[] argTypes, Object[] args) {
	
    // activateName checks null, throwing an IllegalArgumentException for null
    try {
        // 1. Load all Extension objects from the configuration file (meta-INF)
        loadAllExtensionClass(loader);
        // 2. Get the ExtensionDefinition class data from the activation name
        ExtensionDefinition cachedExtensionDefinition = getCachedExtensionDefinition(activateName);
        // 3. Get the instance
        return getExtensionInstance(cachedExtensionDefinition, loader, argTypes, args);
    } catch (Throwable e) {
        / /... Exception handling omission}}Copy the code

Step 5: loadAllExtensionClass obtains all extensions from the configuration file

C- EnhancedServiceLoader
// 1. Determine and initiate loading
private List<Class> loadAllExtensionClass(ClassLoader loader) {
	List<ExtensionDefinition> definitions = definitionsHolder.get();
	if (definitions == null) {
		synchronized (definitionsHolder) {
			definitions = definitionsHolder.get();
			if (definitions == null) {
                  // Query all extensionDefinitions after locking to avoid thread conflictsdefinitions = findAllExtensionDefinition(loader); definitionsHolder.set(definitions); }}}return definitions.stream().map(def -> def.getServiceClass()).collect(Collectors.toList());
}

// 2. Load the process
private List<ExtensionDefinition> findAllExtensionDefinition(ClassLoader loader) {
    
    // Get the configuration from meta-inf. Service and meta-INF. Seata
	List<ExtensionDefinition> extensionDefinitions = new ArrayList<>();
	try {
		loadFile(SERVICES_DIRECTORY, loader, extensionDefinitions);
		loadFile(SEATA_DIRECTORY, loader, extensionDefinitions);
	} catch (IOException e) {
		throw new EnhancedServiceNotFoundException(e);
	}

	// Order the cache in order after loading all extensions -> nameToDefinitionsMap
	if(! nameToDefinitionsMap.isEmpty()) {for (List<ExtensionDefinition> definitions : nameToDefinitionsMap.values()) {
			Collections.sort(definitions, (def1, def2) -> {
				int o1 = def1.getOrder();
				int o2 = def2.getOrder();
				returnInteger.compare(o1, o2); }); }}// Sort the loaded extensionDefinitions
	if(! extensionDefinitions.isEmpty()) { Collections.sort(extensionDefinitions, (definition1, definition2) -> {int o1 = definition1.getOrder();
			int o2 = definition2.getOrder();
			return Integer.compare(o1, o2);
		});
	}

	return extensionDefinitions;
}


Copy the code

Step 6: getCachedExtensionDefinition BeanDefinition basic information

List
      
       > nameToDefinitionsMap; // Make sure you have the same definitoDefinitionSMap
      
private ExtensionDefinition getCachedExtensionDefinition(String activateName) {
	List<ExtensionDefinition> definitions = nameToDefinitionsMap.get(activateName);
	return CollectionUtils.getLast(definitions);
}
Copy the code

Step 7: Reflection performs initialization

// Start the process:
loadExtension -> getExtensionInstance -> createNewExtension

// The getExtensionInstance logic is simple. It determines whether it is a singleton and creates a singleton pattern

// createNewExtension Creates an instance
private S createNewExtension(ExtensionDefinition definition, ClassLoader loader, Class[] argTypes, Object[] args){ Class<? > clazz = definition.getServiceClass();try {
        S newInstance = initInstance(clazz, argTypes, args);
        return newInstance;
    } catch (Throwable t) {
        throw new IllegalStateException(....);
    }
}

// initInstance Initializes the instance
private S initInstance(Class implClazz, Class[] argTypes, Object[] args)
        throws IllegalAccessException, InstantiationException, NoSuchMethodException, InvocationTargetException {
    S s = null;
    if(argTypes ! =null&& args ! =null) {
        // Get the constructor parameters
        Constructor<S> constructor = implClazz.getDeclaredConstructor(argTypes);
        // If there are parameters, create the instance with them
        s = type.cast(constructor.newInstance(args));
    } else {
        // Create using the default constructor (no arguments)
        s = type.cast(implClazz.newInstance());
    }
    if (s instanceof Initialize) {
        // Core 7-1 instance init initialization
        ((Initialize)s).init();
    }
    return s;
}

Copy the code

Step 7-1: Init the DataBaseSessionManager

DataBaseSessionManager (DataBaseSessionManager);

public void init(a) {
    / / initialize DataBaseTransactionStoreManager
    transactionStoreManager = DataBaseTransactionStoreManager.getInstance();
}

// PS: All Initialize classes need to implement init methods
public interface Initialize {
    void init(a);
}


Copy the code

Step 7-2: Constructor operations



C- DataBaseTransactionStoreManager
    P- int DEFAULT_LOG_QUERY_LIMIT = 100;
    
private DataBaseTransactionStoreManager(a) {
        logQueryLimit = CONFIG.getInt(ConfigurationKeys.STORE_DB_LOG_QUERY_LIMIT, DEFAULT_LOG_QUERY_LIMIT);
        // Get the Datasource type
        String datasourceType = CONFIG.getConfig(ConfigurationKeys.STORE_DB_DATASOURCE_TYPE);
        // Initialize the dataSource
        DataSource logStoreDataSource = EnhancedServiceLoader.load(DataSourceProvider.class, datasourceType).provide();
        / / build LogStoreDataBaseDAO
        logStore = new LogStoreDataBaseDAO(logStoreDataSource);
}


// [Pro] : Specifies the parameters for ConfigurationKeys
String STORE_DB_LOG_QUERY_LIMIT = STORE_DB_PREFIX + "queryLimit";

// [Pro] : DataSourceProvider implementation class
io.seata.server.store.DbcpDataSourceProvider
io.seata.server.store.DruidDataSourceProvider
io.seata.server.store.HikariDataSourceProvider


// Here we build a DataSource
@LoadLevel(name = "hikari")
public class HikariDataSourceProvider extends AbstractDataSourceProvider {

    @Override
    public DataSource generate(a) {
        Properties properties = new Properties();
        properties.setProperty("dataSource.cachePrepStmts"."true");
        properties.setProperty("dataSource.prepStmtCacheSize"."250");
        properties.setProperty("dataSource.prepStmtCacheSqlLimit"."2048");
        properties.setProperty("dataSource.useServerPrepStmts"."true");
        properties.setProperty("dataSource.useLocalSessionState"."true");
        properties.setProperty("dataSource.rewriteBatchedStatements"."true");
        properties.setProperty("dataSource.cacheResultSetMetadata"."true");
        properties.setProperty("dataSource.cacheServerConfiguration"."true");
        properties.setProperty("dataSource.elideSetAutoCommits"."true");
        properties.setProperty("dataSource.maintainTimeStats"."false");

        HikariConfig config = new HikariConfig(properties);
        config.setDriverClassName(getDriverClassName());
        config.setJdbcUrl(getUrl());
        config.setUsername(getUser());
        config.setPassword(getPassword());
        config.setMaximumPoolSize(getMaxConn());
        config.setMinimumIdle(getMinConn());
        config.setAutoCommit(true);
        config.setConnectionTimeout(getMaxWait());
        config.setInitializationFailTimeout(-1);
        return newHikariDataSource(config); }}Copy the code

Step 8 : reload 刷新 SessionManager

Not completely after executing the above logic, note that there is also a Reload operation at the end of Step 2, which will continue processing the DataBaseSessionManager

c- SessionHolder 

// This is the same as the previous attribute
private static SessionManager ROOT_SESSION_MANAGER;
private static SessionManager ASYNC_COMMITTING_SESSION_MANAGER;
private static SessionManager RETRY_COMMITTING_SESSION_MANAGER;
private static SessionManager RETRY_ROLLBACKING_SESSION_MANAGER;

protected static void reload(StoreMode storeMode) {
        if (ROOT_SESSION_MANAGER instanceof Reloadable) {
            ((Reloadable) ROOT_SESSION_MANAGER).reload();
        }
        
        // 
        Collection<GlobalSession> allSessions = ROOT_SESSION_MANAGER.allSessions();
        if (CollectionUtils.isNotEmpty(allSessions)) {
            List<GlobalSession> removeGlobalSessions = new ArrayList<>();
            Iterator<GlobalSession> iterator = allSessions.iterator();
            while (iterator.hasNext()) {
                GlobalSession globalSession = iterator.next();
                GlobalStatus globalStatus = globalSession.getStatus();
                // Use the attribute to determine the processing mode
                switch (globalStatus) {
                    case UnKnown:
                    case Committed:
                    case CommitFailed:
                    case Rollbacked:
                    case RollbackFailed:
                    case TimeoutRollbacked:
                    case TimeoutRollbackFailed:
                    case Finished:
                        removeGlobalSessions.add(globalSession);
                        break;
                    case AsyncCommitting:
                        if (storeMode == StoreMode.FILE) {
                            queueToAsyncCommitting(globalSession);
                        }
                        break;
                    default: {
                        // TODO: the principle here is the same as the Lock logic
                        if (storeMode == StoreMode.FILE) {
                            lockBranchSessions(globalSession.getSortedBranches());
                            // If none of the above, the branch transaction needs to be processed first
                            switch (globalStatus) {
                                case Committing:
                                case CommitRetrying:
                                    queueToRetryCommit(globalSession);
                                    break;
                                case Rollbacking:
                                case RollbackRetrying:
                                case TimeoutRollbacking:
                                case TimeoutRollbackRetrying:
                                    queueToRetryRollback(globalSession);
                                    break;
                                case Begin:
                                    globalSession.setActive(true);
                                    break;
                                default:
                                    throw new ShouldNeverHappenException("NOT properly handled "+ globalStatus); }}break; }}}for(GlobalSession globalSession : removeGlobalSessions) { removeInErrorState(globalSession); }}} C- If the Database is used as an example, run the following command: String ROOT_SESSION_MANAGER_NAME ="root.data";
String ASYNC_COMMITTING_SESSION_MANAGER_NAME = "async.commit.data";
String RETRY_COMMITTING_SESSION_MANAGER_NAME = "retry.commit.data";
String RETRY_ROLLBACKING_SESSION_MANAGER_NAME = "retry.rollback.data";

public Collection<GlobalSession> allSessions(a) {
        // get by taskName
    if (SessionHolder.ASYNC_COMMITTING_SESSION_MANAGER_NAME.equalsIgnoreCase(taskName)) {
        return findGlobalSessions(new SessionCondition(GlobalStatus.AsyncCommitting));
    } else if (SessionHolder.RETRY_COMMITTING_SESSION_MANAGER_NAME.equalsIgnoreCase(taskName)) {
        return findGlobalSessions(new SessionCondition(new GlobalStatus[] {GlobalStatus.CommitRetrying, GlobalStatus.Committing}));
    } else if (SessionHolder.RETRY_ROLLBACKING_SESSION_MANAGER_NAME.equalsIgnoreCase(taskName)) {
        return findGlobalSessions(new SessionCondition(new GlobalStatus[] {GlobalStatus.RollbackRetrying,
                GlobalStatus.Rollbacking, GlobalStatus.TimeoutRollbacking, GlobalStatus.TimeoutRollbackRetrying}));
    } else {
        // all data
        return findGlobalSessions(new SessionCondition(newGlobalStatus[] { GlobalStatus.UnKnown, GlobalStatus.Begin, GlobalStatus.Committing, GlobalStatus.CommitRetrying, GlobalStatus.Rollbacking, GlobalStatus.RollbackRetrying, GlobalStatus.TimeoutRollbacking, GlobalStatus.TimeoutRollbackRetrying, GlobalStatus.AsyncCommitting})); }}Copy the code

[Pro] : Reloadable object system

public interface Reloadable {
    void reload(a);
}

// The implementation class is FileSessionManager

Copy the code

5. Expand your knowledge

5.1 Functions of LoadLevel

@LoadLevel(name = "file", scope = Scope.PROTOTYPE)

The LoadLevel annotation provides three parameters:@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE, ElementType.METHOD})
public @interface LoadLevel {

    String name(a);
    
    // The Provider can be sorted in a process similar to a chained call
    int order(a) default 0;

    Scope scope(a) default Scope.SINGLETON;
}

// Scope scope
public enum Scope {

    SINGLETON,
    PROTOTYPE

}


Copy the code

conclusion

The real use of LoadLevel and MATA-INF is to extend different databases, and we’ll see how to customize them after seATA is sorted out.

The initialization of the Session handler class is complete. Let’s take a look at the process of Session invocation and database processing