preface

Perhaps due to some business requirements, our system sometimes tends to connect to multiple databases, which creates the problem of multiple data sources.

Transactional annotation Transactional does not work and distributed transaction issues are involved when switching between multiple data sources automatically.

I’ve seen some examples of multi-source solutions on the web, but most of them are wrong, don’t work at all, or don’t work with transactions.

Today, we will analyze the causes of these problems and the corresponding solutions bit by bit.

1. Multiple data sources

For the story to run smoothly, we simulated the business of creating orders and reducing inventory.

So, let’s start by creating an order table and an inventory table. Notice, put them in two separate databases.

CREATE TABLE `t_storage` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `commodity_code` varchar(255) DEFAULT NULL,
  `count` int(11) DEFAULT '0',
  PRIMARY KEY (`id`),
  UNIQUE KEY `commodity_code` (`commodity_code`)
) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8;

CREATE TABLE `t_order` (
  `id` bigint(16) NOT NULL,
  `commodity_code` varchar(255) DEFAULT NULL,
  `count` int(11) DEFAULT '0', ` amount ` double (14, 2) DEFAULT'0.00',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
Copy the code

1. Database connection

Configure both databases using the YML file.

Spring: a datasource: ds1: jdbc_url: JDBC: mysql: / / 127.0.0.1:3306 / db1 username: root password: root ds2: jdbc_url: JDBC: mysql: / / 127.0.0.1:3306 / db2 username: root password: rootCopy the code

2. Configure DataSource

Mybatis requires a Connection to execute a SQL statement. At this point, it’s up to the Spring manager to fetch the connection from the DataSource.

A DataSource with routing functions in the Spring, it can call a different data sources by looking for key, this is AbstractRoutingDataSource.

Public abstract class AbstractRoutingDataSource {/ / a collection of data source @ Nullable private Map < Object, the Object > targetDataSources; @nullable private Object defaulttarDatasource; / / returns the current routing key, according to the different data source of the value returned @ Nullable protected the abstract Object determineCurrentLookupKey (); // Specify a protected DataSourcedetermineTargetDataSource() {/ / abstract methods Returns a routing key Object lookupKey = determineCurrentLookupKey (); DataSource dataSource = this.targetDataSources.get(lookupKey);returndataSource; }}Copy the code

As you can see, the core of this abstract class is to set up multiple data sources into the Map collection and then retrieve different data sources based on the Key.

So, we can rewrite the determineCurrentLookupKey method, it returns the name of a data source.

public class DynamicDataSource extends AbstractRoutingDataSource {
    @Override
    protected Object determineCurrentLookupKey() {
        DataSourceType.DataBaseType dataBaseType = DataSourceType.getDataBaseType();
        returndataBaseType; }}Copy the code

Then you need a utility class that holds the data source type for the current thread.

public class DataSourceType { public enum DataBaseType { ds1, Private static final ThreadLocal<DataBaseType> TYPE = new ThreadLocal<DataBaseType>(); // Set the data source type to the current thread public static voidsetDataBaseType(DataBaseType dataBaseType) {
        if(dataBaseType == null) { throw new NullPointerException(); } TYPE.set(dataBaseType); } // Get the data source type public static DataBaseTypegetDataBaseType() {
        DataBaseType dataBaseType = TYPE.get() == null ? DataBaseType.ds1 : TYPE.get();
        returndataBaseType; }}Copy the code

With that done, we need to configure the DataSource into the Spring container. The following configuration class does the following:

  • Create DataSource, DS1, DS2
  • Add DS1 and DS2 data sources to the DynamicDataSource;
  • Inject DynamicDataSource into SqlSessionFactory.
@configuration public class DataSourceConfig {/** * create multiple data sources ds1 and DS2 * Primary, set the priority of a Bean * @return
     */
    @Primary
    @Bean(name = "ds1")
    @ConfigurationProperties(prefix = "spring.datasource.ds1")
    public DataSource getDateSource1() {
        return DataSourceBuilder.create().build();
    }
    @Bean(name = "ds2")
    @ConfigurationProperties(prefix = "spring.datasource.ds2")
    public DataSource getDateSource2() {
        returnDataSourceBuilder.create().build(); } /** * Inject multiple data sources into DynamicDataSource * @param dataSource1 * @param dataSource2 * @return
     */
    @Bean(name = "dynamicDataSource")
    public DynamicDataSource DataSource(@Qualifier("ds1") DataSource dataSource1,
                                        @Qualifier("ds2") DataSource dataSource2) {
        Map<Object, Object> targetDataSource = new HashMap<>();
        targetDataSource.put(DataSourceType.DataBaseType.ds1, dataSource1);
        targetDataSource.put(DataSourceType.DataBaseType.ds2, dataSource2);
        DynamicDataSource dataSource = new DynamicDataSource();
        dataSource.setTargetDataSources(targetDataSource);
        dataSource.setDefaultTargetDataSource(dataSource1);
        returndataSource; } /** * inject the dynamicDataSource into SqlSessionFactory * @param dynamicDataSource * @return
     * @throws Exception
     */
    @Bean(name = "SqlSessionFactory")
    public SqlSessionFactory getSqlSessionFactory(@Qualifier("dynamicDataSource") DataSource dynamicDataSource)
            throws Exception {
        SqlSessionFactoryBean bean = new SqlSessionFactoryBean();
        bean.setDataSource(dynamicDataSource);
        bean.setMapperLocations(
                new PathMatchingResourcePatternResolver().getResources("classpath*:mapping/*.xml"));
        bean.setTypeAliasesPackage("cn.youyouxunyin.multipledb2.entity");
        returnbean.getObject(); }}Copy the code

3. Set routing keys

After the above configuration is complete, we also need to find a way to dynamically change the data source key value, which is related to the business of the system.

In this case, for example, we have two Mapper interfaces to create orders and subtract inventory.

public interface OrderMapper {
    void createOrder(Order order);
}
public interface StorageMapper {
    void decreaseStorage(Order order);
}
Copy the code

So, we can have a slice that cuts to data source DS1 when we perform an order operation and to data source DS2 when we perform an inventory operation.

@Component
@Aspect
public class DataSourceAop {
    @Before("execution(* cn.youyouxunyin.multipledb2.mapper.OrderMapper.*(..) )")
    public void setDataSource1() {
        DataSourceType.setDataBaseType(DataSourceType.DataBaseType.ds1);
    }
    @Before("execution(* cn.youyouxunyin.multipledb2.mapper.StorageMapper.*(..) )")
    public void setDataSource2() { DataSourceType.setDataBaseType(DataSourceType.DataBaseType.ds2); }}Copy the code

4, test,

Now you can write a Service method and test it against the REST interface.

public class OrderServiceImpl implements OrderService {
    @Override
    public void createOrder(Order order) {
        storageMapper.decreaseStorage(order);
        logger.info("Inventory reduced, item code :{}, purchase quantity :{}. Create order...",order.getCommodityCode(),order.getCount()); orderMapper.createOrder(order); }}Copy the code

Not surprisingly, the tables in both databases have changed since the execution of the business.

But at this point, we’re reminded that these two operations need to be atomic. Therefore, we need to rely on transactions and annotate Transactional on the Service method.

If we add the Transactional annotation to the createOrder method and then run the code, an exception will be thrown.

### Cause: java.sql.SQLSyntaxErrorException: Table 'db2.t_order' doesn't exist
; bad SQL grammar []; nested exception is java.sql.SQLSyntaxErrorException: 
    Table 'db2.t_order' doesn't exist] with root cause
Copy the code

This means that if Spring transactions are added, our data source cannot be switched over. How did this happen?

Second, transaction mode, why can’t switch data sources

To understand why, we need to analyze what Spring transactions are doing if they are added.

As we know, Spring’s automatic transaction implementation is AOP based. When a method that contains a transaction is called, an interceptor is entered.

Public Class TransactionInterceptor{public Object invoke(MethodInvocation) throws Throwable {// Get the target class Class<? > targetClass = AopUtils.getTargetClass(invocation.getThis()); // transaction invocationreturninvokeWithinTransaction(invocation.getMethod(), targetClass, invocation::proceed); }}Copy the code

1. Create transaction

In this case, the first step is to start creating a transaction.

protected Object doGetTransaction() {/ / DataSource transaction object DataSourceTransactionObject txObject = new DataSourceTransactionObject (); / / Settings are stored in the transaction txObject. SetSavepointAllowed (isNestedTransactionAllowed ()); ConnectionHolder ConnectionHolder conHolder = TransactionSynchronizationManager.getResource(obtainDataSource()); txObject.setConnectionHolder(conHolder,false);
    return txObject;
}
Copy the code

In this step, the important thing is that the transaction object is set to the ConnectionHolder property, which is still null.

2. Start transactions

The next step is to start a transaction that binds resources to the current transaction object using ThreadLocal, and then sets some transaction state.

protected void doBegin(Object txObject, TransactionDefinition definition) { Connection con = null; Connection newCon = obtainDataSource().getConnection(); / / reset the connectionHolder transaction object, already cited a connection txObject. SetConnectionHolder (new connectionHolder (newCon),true); / / the connectionHolder synchronous txObject marked and affairs. GetConnectionHolder () setSynchronizedWithTransaction (true);
    con = txObject.getConnectionHolder().getConnection();
    con.setAutoCommit(false); / / activate the transaction activity state txObject getConnectionHolder () setTransactionActive (true); // Bind the Connection holder to the current thread, via threadLocalif(txObject.isNewConnectionHolder()) { TransactionSynchronizationManager.bindResource(obtainDataSource(), txObject.getConnectionHolder()); } / / transaction manager, activate the transaction synchronization state TransactionSynchronizationManager. InitSynchronization (); }Copy the code

3. Run the Mapper interface

Once the transaction is started, the target class real method is executed. At this point, the proxy object for Mybatis will be entered. Ha ha, the framework, all kinds of agents.

Mybatis needs to obtain the SqlSession object before executing SQL.

public static SqlSession getSqlSession(SqlSessionFactory sessionFactory, ExecutorType executorType, Extracted from PersistenceExceptionTranslator exceptionTranslator) {/ / the ThreadLocal SqlSessionHolder, For the first time to get less than empty SqlSessionHolder holder = TransactionSynchronizationManager. GetResource (sessionFactory); // If SqlSessionHolder is empty, SqlSession cannot be obtained; SqlSession session = sessionHolder(executorType, holder);if(session ! = null) {returnsession; } / / create a new SqlSession session. = sessionFactory openSession (executorType); // If the transaction of the current thread is active, Bind SqlSessionHolder to ThreadLocal registerSessionHolder(sessionFactory, executorType, exceptionTranslator, session);return session;
}
Copy the code

Get SqlSession, start to call Mybatis executor, ready to execute SQL statement. Before you can execute the SQL, of course, you need to get the Connection.

Public Connection getConnection() throws SQLException {// Obtain the Connection through the data source // For example, if we configure multiple data sources, the switch will be normalif (this.connection == null) {
        openConnection();
    }
    return this.connection;
}
Copy the code

Let’s look at the openConnection method, which gets a Connection from the data source. If we configure multiple data sources, we can switch normally at this point. If a transaction is added, the data source is not switched because, on the second call, this.connection! = null, returns the same connection as the previous one.

This is because, on the second SqlSession fetch, the current thread is fetched from ThreadLocal, so it does not reacquire the Connection.

At this point, we should understand why we cannot dynamically switch data sources if Spring transactions are added in the case of multiple data sources.

Here, the author interrupts an interview question:

  • How does Spring guarantee transactions?

That is, multiple business operations are committed or rolled back together in the same database connection.

  • How do you do that, all in one connection?

This is where ThreadlLocal is used to try to bind database resources to the current transaction.

Third, transaction mode, how to support switching data sources

Now that we’ve figured it out, let’s look at how to support it to dynamically switch data sources.

All else being the same, we need to create two different SQlsessionFactories.

@Bean(name = "sqlSessionFactory1")
public SqlSessionFactory sqlSessionFactory1(@Qualifier("ds1") DataSource dataSource){
    return createSqlSessionFactory(dataSource);
}

@Bean(name = "sqlSessionFactory2")
public SqlSessionFactory sqlSessionFactory2(@Qualifier("ds2") DataSource dataSource){
    return createSqlSessionFactory(dataSource);
}
Copy the code

Create a CustomSqlSessionTemplate to replace Mybatis’ sqlSessionTemplate with the two sqlsessionfactories defined above.

@Bean(name = "sqlSessionTemplate")
public CustomSqlSessionTemplate sqlSessionTemplate(){
    Map<Object,SqlSessionFactory> sqlSessionFactoryMap = new HashMap<>();
    sqlSessionFactoryMap.put("ds1",factory1);
    sqlSessionFactoryMap.put("ds2",factory2);
    CustomSqlSessionTemplate customSqlSessionTemplate = new CustomSqlSessionTemplate(factory1);
    customSqlSessionTemplate.setTargetSqlSessionFactorys(sqlSessionFactoryMap);
    customSqlSessionTemplate.setDefaultTargetSqlSessionFactory(factory1);
    return customSqlSessionTemplate;
}
Copy the code

In the CustomSqlSessionTemplate definition, everything else is the same, depending on how to get the SqlSessionFactory.

public class CustomSqlSessionTemplate extends SqlSessionTemplate {
    @Override
    public SqlSessionFactory getSqlSessionFactory() {/ / the name of the current data source String currentDsName = DataSourceType. GetDataBaseType (). The name (); SqlSessionFactory targetSqlSessionFactory = targetSqlSessionFactorys.get(currentDsName);if(targetSqlSessionFactory ! = null) {return targetSqlSessionFactory;
        } else if(defaultTargetSqlSessionFactory ! = null) {return defaultTargetSqlSessionFactory;
        }
        returnthis.sqlSessionFactory; }}Copy the code

The point here is that we can get different SQlsessionFactories based on different data sources. If the SqlSessionFactory is not the same, it will not be retrieved from ThreadLocal when SqlSession is fetched, so it will be a new SqlSession object each time.

Since SqlSession is also different, the Connection is fetched from the dynamic data source every time.

That’s how it works. Let’s do it.

After modifying the configuration, we annotate the Service method with transaction annotations, and the data can be updated normally.

@Transactional
@Override
public void createOrder(Order order) {
    storageMapper.decreaseStorage(order);
    orderMapper.createOrder(order);
}
Copy the code

Being able to switch data sources is just the first step; we need guarantees that transactions can be guaranteed. If, in the code above, the inventory deduction completes but the order creation fails, the inventory is not rolled back. Because they belong to different data sources, they are not the same connection at all.

4. XA distributed transaction

To solve the above problem, we can only consider the XA protocol.

I don’t have much to say about what XA protocol is. All we need to know is that the MySQL InnoDB storage engine supports XA transactions.

The implementation of the XA protocol, then, is called Java Transaction Manager, or JTA, in Java.

How do you implement JTA? We use the Atomikos framework to introduce its dependencies.

<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-jta-atomikos</artifactId> < version > 2.2.7. RELEASE < / version > < / dependency >Copy the code

Then, simply change the DataSource object to the AtomikosDataSourceBean.

public DataSource getDataSource(Environment env, String prefix, String dataSourceName){
    Properties prop = build(env,prefix);
    AtomikosDataSourceBean ds = new AtomikosDataSourceBean();
    ds.setXaDataSourceClassName(MysqlXADataSource.class.getName());
    ds.setUniqueResourceName(dataSourceName);
    ds.setXaProperties(prop);
    return ds;
}
Copy the code

When you get a Connection from MysqlXAConnection, you get the MysqlXAConnection object. When committing or rolling back, the XA protocol of MySQL is used.

Public void commit(Xid Xid, Boolean onePhase) throws XAException {// Encapsulates the XA Commit request StringBuildercommandBuf = new StringBuilder(300);
    commandBuf.append("XA COMMIT ");
    appendXid(commandBuf, xid); // execute XA transaction dispatchCommand(commandBuf.toString());
    } finally {
        this.underlyingConnection.setInGlobalTx(false); }}Copy the code

By introducing Atomikos and modifying the DataSource, in the case of multiple data sources, multiple databases can be rolled back even if an error occurs in the middle of a business operation.

Another question, should I use XA?

The XA protocol may seem relatively simple, but it also has some disadvantages. Such as:

  • Performance problems: all participants are blocked synchronously in the transaction submission stage, occupying system resources and easily leading to performance bottlenecks, which cannot meet high concurrency scenarios;
  • If the coordinator has a single point of failure, if the coordinator fails, the participant will remain locked;
  • Master/slave replication may result in inconsistent transaction state.

Some XA restrictions are listed in the MySQL documentation:

https://dev.mysql.com/doc/refman/8.0/en/xa-restrictions.html

In addition, the author in the actual project, in fact, has not used such a way to solve the problem of distributed transactions, this example is only to explore the feasibility of the scheme.

conclusion

By introducing SpringBoot+Mybatis multi-data source scenario, this paper analyzes the following problems:

  • Configuration and implementation of multiple data sources;
  • Spring transaction mode, the reasons and solutions of multi-data source failure;
  • Multi – data source, distributed transaction implementation based on XA protocol.

Due to space constraints, this example does not include all the code. If you need it, please go to GitHub and pick it up.

https://github.com/taoxun/multipledb2.git

The original is not easy, the guest officers point a praise and then go, this will be the author’s motivation to continue writing ~