Original: www.iocoder.cn/Sharding-JD… (“Taro source”). Reprints are welcome; please keep this summary. Thank you!

This article is based on the official Sharding-JDBC 1.5.0 release.

  • 1. Overview
  • 2. ExecutorEngine
    • 2.1 ListeningExecutorService
    • 2.2 Shutdown
    • 2.3 Executing SQL Tasks
  • 3. Executor
    • 3.1 StatementExecutor
    • 3.2 PreparedStatementExecutor
    • 3.3 BatchPreparedStatementExecutor
  • 4. ExecutionEvent
    • 4.1 EventBus
    • 4.2 BestEffortsDeliveryListener
  • 666. Easter egg

1. Overview

After a long journey (SQL parsing, SQL routing, SQL rewriting), we have finally arrived at SQL execution. Exciting, isn't it?

This article covers the SQL execution process only, not the merging (aggregation) of results. Result merging will be covered in a separate, later article.

The green boxes in the original diagram mark the main flow of SQL execution.


Sharding-JDBC is collecting a list of companies that use it (see the portal). 🙂 Your registration will encourage more people to participate in and use Sharding-JDBC, which in turn lets Sharding-JDBC cover more business scenarios. Please sign up!

2. ExecutorEngine

ExecutorEngine is the SQL execution engine.

After sharding across databases and tables, a single logical SQL statement may turn into multiple physical SQL statements. There are two ways to execute them:

  • Serial execution of SQL
  • Parallel execution of SQL

The former is easy to code but performs poorly: the total time is the sum of all SQL execution times. The latter is more complex to code but performs better: the total time is roughly that of the slowest SQL statement.

👼 ExecutorEngine of course uses the latter, executing SQL in parallel.
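
The trade-off between the two approaches can be sketched with plain JDK classes. This is a minimal illustration, not Sharding-JDBC code: the class name SerialVsParallel and the fakeSql() helper (which simply sleeps to simulate a routed SQL statement) are assumptions made for the example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class SerialVsParallel {

    /** Hypothetical stand-in for one routed SQL statement: sleeps, then returns a label. */
    static Callable<String> fakeSql(final String label, final long millis) {
        return () -> {
            Thread.sleep(millis);
            return label;
        };
    }

    /** Serial: total time is roughly the sum of all task times. */
    static List<String> runSerially(List<Callable<String>> tasks) throws Exception {
        List<String> results = new ArrayList<>();
        for (Callable<String> task : tasks) {
            results.add(task.call());
        }
        return results;
    }

    /** Parallel: total time is roughly the longest task time, given enough threads. */
    static List<String> runInParallel(List<Callable<String>> tasks) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, tasks.size()));
        try {
            List<String> results = new ArrayList<>();
            // invokeAll returns the futures in the same order as the tasks.
            for (Future<String> future : pool.invokeAll(tasks)) {
                results.add(future.get());
            }
            return results;
        } finally {
            pool.shutdown();
        }
    }

    /** Runs two simulated statements either way and returns their results. */
    public static List<String> demo(boolean parallel) {
        List<Callable<String>> tasks = Arrays.asList(fakeSql("t_order_0", 50), fakeSql("t_order_1", 100));
        try {
            return parallel ? runInParallel(tasks) : runSerially(tasks);
        } catch (Exception ex) {
            throw new IllegalStateException(ex);
        }
    }

    public static void main(String[] args) {
        long start = System.nanoTime();
        demo(false);
        long serialMillis = (System.nanoTime() - start) / 1_000_000;
        start = System.nanoTime();
        demo(true);
        long parallelMillis = (System.nanoTime() - start) / 1_000_000;
        System.out.println("serial ~" + serialMillis + " ms, parallel ~" + parallelMillis + " ms");
    }
}
```

Both variants return the same results in the same order; only the elapsed time differs.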

2.1 ListeningExecutorService

Guava (a Java utility library) provides ListeningExecutorService, an interface that extends ExecutorService and creates ListenableFuture instances. ListenableFuture extends the Future interface and has the following benefits:

We strongly recommend that you use ListenableFuture instead of JDK futures in your code because:

  • Most Futures methods require it.
  • It is easier than switching to ListenableFuture later.
  • Providers of utility methods won't need to provide both Future and ListenableFuture variants of their methods.

A traditional JDK Future represents the result of an asynchronous computation: a computation that may or may not have finished producing a result yet. A Future is a handle to an in-progress computation, a promise from a service to supply a result.

A ListenableFuture allows you to register callbacks to be executed once the computation is complete, or immediately if the computation is already complete. This simple addition makes it possible to efficiently support many operations that the basic JDK Future interface cannot support.

The passage above is quoted from the article “Google Guava package ListenableFuture Parsing”, which is well worth reading. Below you'll see how Sharding-JDBC uses ListenableFuture to simplify concurrent programming.
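
The callback behavior described in the quote can be tried without Guava. The sketch below uses the JDK's CompletableFuture as a stand-in for ListenableFuture (an assumption made so the example has no external dependency; Sharding-JDBC itself uses Guava). The class name CallbackOnCompletion is hypothetical.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class CallbackOnCompletion {

    /** Registers one callback before completion and one after, and records the order of calls. */
    public static List<String> demo() {
        final List<String> log = Collections.synchronizedList(new ArrayList<String>());
        CompletableFuture<Integer> future = new CompletableFuture<>();
        // Registered before completion: runs once the result becomes available.
        future.thenAccept(rows -> log.add("before:" + rows));
        // Completing the future triggers the callback registered above.
        future.complete(3);
        // Registered after completion: runs immediately on the registering thread.
        future.thenAccept(rows -> log.add("after:" + rows));
        return log;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

A plain JDK Future offers no equivalent: the caller can only block on get() or poll isDone().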

How does ExecutorEngine initialize its ListeningExecutorService?

// ShardingDataSource.java
public ShardingDataSource(final ShardingRule shardingRule, final Properties props) {
    // ...
    shardingProperties = new ShardingProperties(props);
    int executorSize = shardingProperties.getValue(ShardingPropertiesConstant.EXECUTOR_SIZE);
    executorEngine = new ExecutorEngine(executorSize);
    // ...
}

// ExecutorEngine.java
public ExecutorEngine(final int executorSize) {
    executorService = MoreExecutors.listeningDecorator(new ThreadPoolExecutor(
            executorSize, executorSize, 0, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(),
            new ThreadFactoryBuilder().setDaemon(true).setNameFormat("ShardingJDBC-%d").build()));
    MoreExecutors.addDelayedShutdownHook(executorService, 60, TimeUnit.SECONDS);
}
  • Each ShardingDataSource has its own, exclusive SQL execution engine.
  • MoreExecutors#listeningDecorator() creates a ListeningExecutorService, so that #submit() and #invokeAll() return ListenableFuture.
  • By default, the thread pool size is 8. You can adjust it through ShardingProperties according to actual business requirements.
  • #setNameFormat(): in concurrent programming, always give threads meaningful names so that troubleshooting is easier.
  • MoreExecutors#addDelayedShutdownHook(): when the application shuts down, wait for all tasks to complete before shutting down. The waiting time here is 60 seconds. It is advisable to make the waiting time configurable.
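
For readers without Guava at hand, the thread-naming and daemon settings from the bullets above can be approximated with the JDK alone. This is a sketch under that assumption: namedDaemonFactory() is a hypothetical replacement for Guava's ThreadFactoryBuilder, and the class name NamedDaemonPool is made up for the example.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class NamedDaemonPool {

    /** Creates daemon threads named prefix-0, prefix-1, and so on. */
    static ThreadFactory namedDaemonFactory(final String prefix) {
        final AtomicInteger counter = new AtomicInteger();
        return runnable -> {
            Thread thread = new Thread(runnable, prefix + "-" + counter.getAndIncrement());
            thread.setDaemon(true);
            return thread;
        };
    }

    /** Fixed-size pool with an unbounded queue, mirroring the constructor arguments shown above. */
    static ExecutorService newPool(int size) {
        return new ThreadPoolExecutor(size, size, 0, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>(), namedDaemonFactory("ShardingJDBC"));
    }

    /** Returns the name of the worker thread that runs the first submitted task. */
    public static String workerName() {
        ExecutorService pool = newPool(2);
        try {
            Callable<String> task = () -> Thread.currentThread().getName();
            return pool.submit(task).get();
        } catch (Exception ex) {
            throw new IllegalStateException(ex);
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(workerName());
    }
}
```

With named threads, a thread dump immediately shows which threads belong to the SQL execution engine.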

2.2 Shutdown

When the data source is closed, it calls ExecutorEngine to close as well.

// ShardingDataSource.java
@Override
public void close() {
   executorEngine.close();
}
// ExecutorEngine
@Override
public void close() {
   executorService.shutdownNow();
   try {
       executorService.awaitTermination(5, TimeUnit.SECONDS);
   } catch (final InterruptedException ignored) {
   }
   if (!executorService.isTerminated()) {
       throw new ShardingJdbcException("ExecutorEngine can not been terminated");
   }
}
  • #shutdownNow() tries to interrupt running tasks via Thread.interrupt() and discards tasks that have not started. It is advisable to log which tasks were not executed, because their SQL has not run and the data may not have been persisted.
  • #awaitTermination(): the interruption triggered by #shutdownNow() does not take effect immediately but needs some time, so the code waits here for up to 5 seconds.
  • If the thread pool still has not terminated after 5 seconds, an exception is thrown to the caller. It is advisable to log this occurrence.
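
The logging suggestion above can be sketched as follows. ExecutorService#shutdownNow() already returns the tasks that never started, so they only need to be reported. The class name ShutdownReport and the method closeAndCountUnexecuted() are hypothetical; this is not how ExecutorEngine is implemented in 1.5.0.

```java
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ShutdownReport {

    /** Interrupts running tasks, logs queued tasks that never started, and returns their count. */
    public static int closeAndCountUnexecuted(ExecutorService pool) {
        List<Runnable> neverStarted = pool.shutdownNow();
        for (Runnable each : neverStarted) {
            System.err.println("Task not executed before shutdown: " + each);
        }
        try {
            if (!pool.awaitTermination(5, TimeUnit.SECONDS)) {
                System.err.println("Thread pool did not terminate within 5 seconds");
            }
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
        }
        return neverStarted.size();
    }

    /** One running task plus two queued tasks on a single-thread pool. */
    public static int demo() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        final CountDownLatch started = new CountDownLatch(1);
        // The first task occupies the only worker until it is interrupted.
        pool.execute(() -> {
            started.countDown();
            try {
                Thread.sleep(60_000);
            } catch (InterruptedException ignored) {
                // Interrupted by shutdownNow(): simply finish.
            }
        });
        // These two stay in the queue because the single worker is busy.
        pool.execute(() -> { });
        pool.execute(() -> { });
        try {
            started.await();
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
        }
        return closeAndCountUnexecuted(pool);
    }

    public static void main(String[] args) {
        System.out.println("Unexecuted tasks: " + demo());
    }
}
```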

2.3 Executing SQL Tasks

#executeStatement(), #executePreparedStatement(), #executeBatch()

ExecutorEngine provides these three methods, which are called by StatementExecutor, PreparedStatementExecutor, and BatchPreparedStatementExecutor respectively. All three internally call the private #execute() method.

// ExecutorEngine.java
/**
 * Execute Statement.
 *
 * @param sqlType SQL type
 * @param statementUnits collection of statement object execution units
 * @param executeCallback execute callback function
 * @param <T> return value type
 * @return execution result
 */
public <T> List<T> executeStatement(final SQLType sqlType, final Collection<StatementUnit> statementUnits, final ExecuteCallback<T> executeCallback) {
    return execute(sqlType, statementUnits, Collections.<List<Object>>emptyList(), executeCallback);
}

/**
 * Execute PreparedStatement.
 *
 * @param sqlType SQL type
 * @param preparedStatementUnits collection of statement object execution units
 * @param parameters parameter list
 * @param executeCallback execute callback function
 * @param <T> return value type
 * @return execution result
 */
public <T> List<T> executePreparedStatement(
        final SQLType sqlType, final Collection<PreparedStatementUnit> preparedStatementUnits, final List<Object> parameters, final ExecuteCallback<T> executeCallback) {
    return execute(sqlType, preparedStatementUnits, Collections.singletonList(parameters), executeCallback);
}

/**
 * Execute Batch.
 *
 * @param sqlType SQL type
 * @param batchPreparedStatementUnits collection of statement object execution units
 * @param parameterSets set of parameter lists
 * @param executeCallback execute callback function
 * @return execution result
 */
public List<int[]> executeBatch(
        final SQLType sqlType, final Collection<BatchPreparedStatementUnit> batchPreparedStatementUnits, final List<List<Object>> parameterSets, final ExecuteCallback<int[]> executeCallback) {
    return execute(sqlType, batchPreparedStatementUnits, parameterSets, executeCallback);
}

#execute()

/**
 * Execute.
 *
 * @param sqlType SQL type
 * @param baseStatementUnits collection of statement object execution units
 * @param parameterSets set of parameter lists
 * @param executeCallback execute callback function
 * @param <T> return value type
 * @return execution result
 */
private <T> List<T> execute(
        final SQLType sqlType, final Collection<? extends BaseStatementUnit> baseStatementUnits, final List<List<Object>> parameterSets, final ExecuteCallback<T> executeCallback) {
    if (baseStatementUnits.isEmpty()) {
        return Collections.emptyList();
    }
    Iterator<? extends BaseStatementUnit> iterator = baseStatementUnits.iterator();
    BaseStatementUnit firstInput = iterator.next();
    // From the second task onward: submit to the thread pool and execute [asynchronously]
    ListenableFuture<List<T>> restFutures = asyncExecute(sqlType, Lists.newArrayList(iterator), parameterSets, executeCallback);
    T firstOutput;
    List<T> restOutputs;
    try {
        // The first task: execute [synchronously]
        firstOutput = syncExecute(sqlType, firstInput, parameterSets, executeCallback);
        // Wait for all tasks from the second onward to complete
        restOutputs = restFutures.get();
        //CHECKSTYLE:OFF
    } catch (final Exception ex) {
        //CHECKSTYLE:ON
        ExecutorExceptionHandler.handleException(ex);
        return null;
    }
    // Return the results
    List<T> result = Lists.newLinkedList(restOutputs);
    result.add(0, firstOutput);
    return result;
}
  • The first task is executed synchronously: #executeInternal() is called on the calling thread.
private <T> T syncExecute(final SQLType sqlType, final BaseStatementUnit baseStatementUnit, final List<List<Object>> parameterSets, final ExecuteCallback<T> executeCallback) throws Exception {
    // Execute the task [synchronously]
    return executeInternal(sqlType, baseStatementUnit, parameterSets, executeCallback, ExecutorExceptionHandler.isExceptionThrown(), ExecutorDataMap.getDataMap());
}
  • From the second task onward, tasks are submitted to the thread pool, which calls #executeInternal() asynchronously.
private <T> ListenableFuture<List<T>> asyncExecute(
        final SQLType sqlType, final Collection<BaseStatementUnit> baseStatementUnits, final List<List<Object>> parameterSets, final ExecuteCallback<T> executeCallback) {
    List<ListenableFuture<T>> result = new ArrayList<>(baseStatementUnits.size());
    final boolean isExceptionThrown = ExecutorExceptionHandler.isExceptionThrown();
    final Map<String, Object> dataMap = ExecutorDataMap.getDataMap();
    for (final BaseStatementUnit each : baseStatementUnits) {
        // Submit to the thread pool and execute the task [asynchronously]
        result.add(executorService.submit(new Callable<T>() {

            @Override
            public T call() throws Exception {
                return executeInternal(sqlType, each, parameterSets, executeCallback, isExceptionThrown, dataMap);
            }
        }));
    }
    // Return a single ListenableFuture that combines all of them
    return Futures.allAsList(result);
}
  • Note Futures.allAsList(result); and restOutputs = restFutures.get();. This is where Guava's simplification of concurrent programming pays off. When all tasks succeed, ListenableFuture#get() returns the results of all tasks. When any task fails, it throws an exception immediately, without waiting for the other tasks to complete.
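
The "first task on the calling thread, the rest on the pool" pattern can be sketched with plain JDK futures. This is a simplified illustration under stated assumptions: it uses java.util.concurrent.Future rather than Guava, so collecting results one by one does not fail fast the way Futures.allAsList() is described to above. The class name FirstSyncRestAsync is hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class FirstSyncRestAsync {

    /** Runs the first task on the caller's thread and the remaining tasks on the pool. */
    static <T> List<T> execute(ExecutorService pool, List<Callable<T>> tasks) throws Exception {
        List<T> result = new ArrayList<>();
        if (tasks.isEmpty()) {
            return result;
        }
        Iterator<Callable<T>> iterator = tasks.iterator();
        Callable<T> first = iterator.next();
        // Submit tasks 2..n first so they run while the caller works on task 1.
        List<Future<T>> restFutures = new ArrayList<>();
        while (iterator.hasNext()) {
            restFutures.add(pool.submit(iterator.next()));
        }
        // Task 1 runs synchronously on the calling thread.
        result.add(first.call());
        // Then wait for the remaining tasks, preserving the input order.
        for (Future<T> each : restFutures) {
            result.add(each.get());
        }
        return result;
    }

    /** Three simulated tasks, each returning the name of a hypothetical data source. */
    public static List<String> demo() {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            List<Callable<String>> tasks = new ArrayList<>();
            for (final String each : Arrays.asList("ds_0", "ds_1", "ds_2")) {
                tasks.add(() -> each);
            }
            return execute(pool, tasks);
        } catch (Exception ex) {
            throw new IllegalStateException(ex);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

With a single task, nothing is submitted to the pool at all, which matches the single-table guess discussed in this section.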

  • Why mix synchronous and asynchronous execution? A guess: when the SQL targets a single table, there is only the first task, and calling it synchronously on the current thread performs better. This will be updated after the reason is confirmed with Zhang Liang.
// ExecutorEngine.java
private <T> T executeInternal(final SQLType sqlType, final BaseStatementUnit baseStatementUnit, final List<List<Object>> parameterSets, final ExecuteCallback<T> executeCallback,
                      final boolean isExceptionThrown, final Map<String, Object> dataMap) throws Exception {
    synchronized (baseStatementUnit.getStatement().getConnection()) {
        T result;
        ExecutorExceptionHandler.setExceptionThrown(isExceptionThrown);
        ExecutorDataMap.setDataMap(dataMap);
        List<AbstractExecutionEvent> events = new LinkedList<>();
        // Generate events
        if (parameterSets.isEmpty()) {
            events.add(getExecutionEvent(sqlType, baseStatementUnit, Collections.emptyList()));
        } else {
            for (List<Object> each : parameterSets) {
                events.add(getExecutionEvent(sqlType, baseStatementUnit, each));
            }
        }
        // Publish to EventBus: EventExecutionType.BEFORE_EXECUTE
        for (AbstractExecutionEvent event : events) {
            EventBusInstance.getInstance().post(event);
        }
        try {
            // Execute the callback function
            result = executeCallback.execute(baseStatementUnit);
        } catch (final SQLException ex) {
            // Publish to EventBus: EventExecutionType.EXECUTE_FAILURE
            for (AbstractExecutionEvent each : events) {
                each.setEventExecutionType(EventExecutionType.EXECUTE_FAILURE);
                each.setException(Optional.of(ex));
                EventBusInstance.getInstance().post(each);
                ExecutorExceptionHandler.handleException(ex);
            }
            return null;
        }
        // Publish to EventBus: EventExecutionType.EXECUTE_SUCCESS
        for (AbstractExecutionEvent each : events) {
            each.setEventExecutionType(EventExecutionType.EXECUTE_SUCCESS);
            EventBusInstance.getInstance().post(each);
        }
        return result;
    }
}
  • result = executeCallback.execute(baseStatementUnit); executes the callback function. StatementExecutor, PreparedStatementExecutor, and BatchPreparedStatementExecutor each pass an ExecuteCallback to ExecutorEngine, which runs it in parallel.
public interface ExecuteCallback<T> {

    /**
     * Execute the task.
     *
     * @param baseStatementUnit statement object execution unit
     * @return processing result
     * @throws Exception exception during execution
     */
    T execute(BaseStatementUnit baseStatementUnit) throws Exception;
}
  • synchronized (baseStatementUnit.getStatement().getConnection()): my first assumption was that Connection is not thread-safe and therefore needs synchronization. However, after reading the article “Database connection pools: why set up multiple connections”, it turns out that Connection is thread-safe. This will be updated after the reason is confirmed with Zhang Liang.

    • Answer: The Connection implementations of MySQL and Oracle are thread-safe. A Connection provided by a database connection pool is not necessarily thread-safe; for example, Druid's pooled Connection is not thread-safe.

      From github.com/dangdangdot…: Druid's stat filter does not consider thread safety when the same connection is used concurrently, so multiple threads can corrupt the filter's state. The fix relies on the fact that the MySQL driver itself synchronizes statement execution on the same connection; that is, statements within the same database session are executed serially. SJDBC's executor therefore also synchronizes multi-threaded execution at the connection level, and this does not degrade SJDBC's performance. In addition, synchronized in JDK 1.7 uses lock escalation, so its overhead is very low when contention is low.

  • ExecutionEvent is not explained here; it is covered in section 4 of this article (EventBus).

  • ExecutorExceptionHandler, ExecutorDataMap, and AbstractSoftTransaction are covered in the article Flexible Transactions.
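
The effect of synchronizing on the connection can be illustrated in isolation. In this sketch, FakeConnection is a hypothetical stand-in for a pooled connection whose internal state is not thread-safe; the class name PerConnectionLock is likewise made up. Several threads share one connection, and the synchronized block serializes their access.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class PerConnectionLock {

    /** Hypothetical connection with deliberately unsynchronized internal state. */
    static final class FakeConnection {
        private int executedStatements;
    }

    /** Lets several threads "execute statements" on one shared connection and returns the final count. */
    public static int runConcurrently(int threads, final int statementsPerThread) {
        final FakeConnection connection = new FakeConnection();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<Future<?>> futures = new ArrayList<>();
            for (int i = 0; i < threads; i++) {
                futures.add(pool.submit(() -> {
                    for (int j = 0; j < statementsPerThread; j++) {
                        // Same idea as synchronized (statement.getConnection()) in executeInternal().
                        synchronized (connection) {
                            connection.executedStatements++;
                        }
                    }
                }));
            }
            // Waiting on the futures also makes the workers' writes visible to this thread.
            for (Future<?> each : futures) {
                each.get();
            }
            return connection.executedStatements;
        } catch (Exception ex) {
            throw new IllegalStateException(ex);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(runConcurrently(4, 1000));
    }
}
```

Tasks that use different connections lock different objects, so they still run in parallel.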

3. Executor

There are currently three executors. Each executor corresponds to a different kind of execution unit.

Executor class | Description | Execution unit
StatementExecutor | Executor for static statement object requests | StatementUnit
PreparedStatementExecutor | Executor for precompiled statement object requests | PreparedStatementUnit
BatchPreparedStatementExecutor | Executor for batch precompiled statement object requests | BatchPreparedStatementUnit
  • The executors expose different methods, so they share no common interface or abstract class.
  • The execution units all inherit from BaseStatementUnit.

3.1 StatementExecutor

StatementExecutor is the multi-threaded executor for static statement object requests. It has three kinds of methods:

  • #executeQuery()
// StatementExecutor.java
/**
 * Execute SQL query.
 *
 * @return list of ResultSet
 */
public List<ResultSet> executeQuery() {
    Context context = MetricsContext.start("ShardingStatement-executeQuery");
    List<ResultSet> result;
    try {
        result = executorEngine.executeStatement(sqlType, statementUnits, new ExecuteCallback<ResultSet>() {

            @Override
            public ResultSet execute(final BaseStatementUnit baseStatementUnit) throws Exception {
                return baseStatementUnit.getStatement().executeQuery(baseStatementUnit.getSqlExecutionUnit().getSql());
            }
        });
    } finally {
        MetricsContext.stop(context);
    }
    return result;
}
  • #executeUpdate(): because there are four overloaded variants of #executeUpdate(), the Updater interface is abstracted so that the logic can be reused.
// StatementExecutor.java
/**
 * Execute SQL update.
 *
 * @return number of rows updated
 */
public int executeUpdate() {
    return executeUpdate(new Updater() {

        @Override
        public int executeUpdate(final Statement statement, final String sql) throws SQLException {
            return statement.executeUpdate(sql);
        }
    });
}

private int executeUpdate(final Updater updater) {
    Context context = MetricsContext.start("ShardingStatement-executeUpdate");
    try {
        List<Integer> results = executorEngine.executeStatement(sqlType, statementUnits, new ExecuteCallback<Integer>() {

            @Override
            public Integer execute(final BaseStatementUnit baseStatementUnit) throws Exception {
                return updater.executeUpdate(baseStatementUnit.getStatement(), baseStatementUnit.getSqlExecutionUnit().getSql());
            }
        });
        return accumulate(results);
    } finally {
        MetricsContext.stop(context);
    }
}

/**
 * Accumulate the number of rows updated.
 */
private int accumulate(final List<Integer> results) {
    int result = 0;
    for (Integer each : results) {
        result += null == each ? 0 : each;
    }
    return result;
}
  • #execute(): because there are four overloaded variants of #execute(), the Executor interface is abstracted so that the logic can be reused.
// StatementExecutor.java
/**
 * Execute SQL.
 *
 * @return true if a DQL statement was executed, false if a DML statement was executed
 */
public boolean execute() {
    return execute(new Executor() {

        @Override
        public boolean execute(final Statement statement, final String sql) throws SQLException {
            return statement.execute(sql);
        }
    });
}

private boolean execute(final Executor executor) {
    Context context = MetricsContext.start("ShardingStatement-execute");
    try {
        List<Boolean> result = executorEngine.executeStatement(sqlType, statementUnits, new ExecuteCallback<Boolean>() {

            @Override
            public Boolean execute(final BaseStatementUnit baseStatementUnit) throws Exception {
                return executor.execute(baseStatementUnit.getStatement(), baseStatementUnit.getSqlExecutionUnit().getSql());
            }
        });
        if (null == result || result.isEmpty() || null == result.get(0)) {
            return false;
        }
        return result.get(0);
    } finally {
        MetricsContext.stop(context);
    }
}
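
The reuse idea behind Updater can be reduced to a small standalone sketch: the varying part (how one unit performs its update) is passed in, while the loop and the accumulation are shared. The names UpdaterSketch and the simplified Updater signature taking a data source name are assumptions for illustration; the real interface takes a java.sql.Statement.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class UpdaterSketch {

    /** The part that varies between overloads: how one unit performs its update. */
    public interface Updater {
        Integer executeUpdate(String dataSource, String sql);
    }

    /** The shared part: run the updater for every unit, then sum the row counts. */
    public static int executeUpdate(List<String> dataSources, String sql, Updater updater) {
        List<Integer> results = new ArrayList<>();
        for (String each : dataSources) {
            results.add(updater.executeUpdate(each, sql));
        }
        return accumulate(results);
    }

    /** A null entry (for example, a unit whose SQLException was handled without rethrowing) counts as 0. */
    public static int accumulate(List<Integer> results) {
        int result = 0;
        for (Integer each : results) {
            result += null == each ? 0 : each;
        }
        return result;
    }

    public static void main(String[] args) {
        // Each simulated data source reports 2 updated rows.
        int total = executeUpdate(Arrays.asList("ds_0", "ds_1"), "UPDATE t_order SET nickname = 'x'", (ds, sql) -> 2);
        System.out.println("Total rows updated: " + total);
    }
}
```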

3.2 PreparedStatementExecutor

PreparedStatementExecutor is the multi-threaded executor for precompiled statement object requests. Compared with StatementExecutor, it only adds the parameters argument.

3.3 BatchPreparedStatementExecutor

BatchPreparedStatementExecutor is the multi-threaded executor for batch precompiled statement object requests.

// BatchPreparedStatementExecutor.java
/**
 * Execute batch SQL.
 *
 * @return execution results
 */
public int[] executeBatch() {
    Context context = MetricsContext.start("ShardingPreparedStatement-executeBatch");
    try {
        return accumulate(executorEngine.executeBatch(sqlType, batchPreparedStatementUnits, parameterSets, new ExecuteCallback<int[]>() {

            @Override
            public int[] execute(final BaseStatementUnit baseStatementUnit) throws Exception {
                return baseStatementUnit.getStatement().executeBatch();
            }
        }));
    } finally {
        MetricsContext.stop(context);
    }
}

/**
 * Accumulate the number of affected rows for each batch entry.
 */
private int[] accumulate(final List<int[]> results) {
    int[] result = new int[parameterSets.size()];
    int count = 0;
    // For each statement unit in order, read the affected row counts of its SQL and accumulate them
    for (BatchPreparedStatementUnit each : batchPreparedStatementUnits) {
        for (Map.Entry<Integer, Integer> entry : each.getJdbcAndActualAddBatchCallTimesMap().entrySet()) {
            result[entry.getKey()] += null == results.get(count) ? 0 : results.get(count)[entry.getValue()];
        }
        count++;
    }
    return result;
}
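
The batch accumulation is easier to follow with concrete numbers. The sketch below assumes, based on the method name getJdbcAndActualAddBatchCallTimesMap(), that each unit's map goes from the index of the JDBC-level addBatch() call to the index of that entry within the unit's own batch. The class name BatchAccumulate and the sample data are hypothetical.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class BatchAccumulate {

    /**
     * @param addBatchCalls number of JDBC-level addBatch() calls
     * @param unitIndexMaps per unit: JDBC-level addBatch index mapped to the index inside that unit's batch
     * @param results per unit: the int[] returned by that unit's executeBatch(), or null
     */
    public static int[] accumulate(int addBatchCalls, List<Map<Integer, Integer>> unitIndexMaps, List<int[]> results) {
        int[] result = new int[addBatchCalls];
        int count = 0;
        for (Map<Integer, Integer> each : unitIndexMaps) {
            for (Map.Entry<Integer, Integer> entry : each.entrySet()) {
                result[entry.getKey()] += null == results.get(count) ? 0 : results.get(count)[entry.getValue()];
            }
            count++;
        }
        return result;
    }

    /** Three addBatch() calls; call 2 is routed to both units, calls 0 and 1 to one unit each. */
    public static int[] demo() {
        Map<Integer, Integer> unitA = new LinkedHashMap<>();
        unitA.put(0, 0);
        unitA.put(2, 1);
        Map<Integer, Integer> unitB = new LinkedHashMap<>();
        unitB.put(1, 0);
        unitB.put(2, 1);
        // Unit A affected 1 and 5 rows; unit B affected 2 and 3 rows.
        return accumulate(3, Arrays.asList(unitA, unitB), Arrays.asList(new int[] {1, 5}, new int[] {2, 3}));
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(demo()));
    }
}
```

In this sample, the third addBatch() call touches both units, so its row counts (5 and 3) are summed.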

Eagle-eyed readers may have noticed: there is a BatchPreparedStatementExecutor, so why is there no BatchStatementExecutor? Currently, Sharding-JDBC does not support batch operations on Statement; only batch operations on PreparedStatement are supported.

// Supported
PreparedStatement ps = conn.prepareStatement(sql);
ps.addBatch();
ps.addBatch();

// Not supported
ps.addBatch(sql); // Error thrown at com.dangdang.ddframe.rdb.sharding.jdbc.unsupported.AbstractUnsupportedOperationStatement.addBatch

4. ExecutionEvent

AbstractExecutionEvent is the abstract base class for SQL execution events.

public abstract class AbstractExecutionEvent {
    /**
     * Event ID
     */
    private final String id;
    /**
     * Data source
     */
    private final String dataSource;
    /**
     * SQL
     */
    private final String sql;
    /**
     * Parameters
     */
    private final List<Object> parameters;
    /**
     * Event type
     */
    private EventExecutionType eventExecutionType;
    /**
     * Exception
     */
    private Optional<SQLException> exception;
}

AbstractExecutionEvent has two subclasses:

  • DMLExecutionEvent: execution event for DML SQL
  • DQLExecutionEvent: execution event for DQL SQL

EventExecutionType is the event trigger type:

  • BEFORE_EXECUTE: before execution
  • EXECUTE_SUCCESS: execution succeeded
  • EXECUTE_FAILURE: execution failed

4.1 EventBus

So what are these events used for? Sharding-JDBC uses Guava's (yes, Guava again) EventBus to publish and subscribe to events. Events are published from ExecutorEngine#executeInternal():

  • Before SQL execution: publishes an event of type BEFORE_EXECUTE
  • SQL executed successfully: publishes an event of type EXECUTE_SUCCESS
  • SQL execution failed: publishes an event of type EXECUTE_FAILURE
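
The three publishing points can be condensed into a small sketch. It is not the real implementation: the publisher here is a plain java.util.function.Consumer instead of Guava's EventBus, and SqlTask is a hypothetical stand-in for ExecuteCallback. The class name ExecutionEventFlow is made up for the example.

```java
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class ExecutionEventFlow {

    public enum EventExecutionType { BEFORE_EXECUTE, EXECUTE_SUCCESS, EXECUTE_FAILURE }

    /** Hypothetical SQL task; the real code calls ExecuteCallback#execute(). */
    public interface SqlTask {
        int run() throws SQLException;
    }

    /** Publishes BEFORE_EXECUTE, runs the task, then publishes EXECUTE_SUCCESS or EXECUTE_FAILURE. */
    public static Integer execute(SqlTask task, Consumer<EventExecutionType> publisher) {
        publisher.accept(EventExecutionType.BEFORE_EXECUTE);
        int result;
        try {
            result = task.run();
        } catch (SQLException ex) {
            publisher.accept(EventExecutionType.EXECUTE_FAILURE);
            return null;
        }
        publisher.accept(EventExecutionType.EXECUTE_SUCCESS);
        return result;
    }

    /** Runs the task and returns the event types that were published, in order. */
    public static List<EventExecutionType> trace(SqlTask task) {
        List<EventExecutionType> seen = new ArrayList<>();
        execute(task, seen::add);
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(trace(() -> 1));
        System.out.println(trace(() -> {
            throw new SQLException("simulated failure");
        }));
    }
}
```

Every execution therefore produces exactly two events: one before, and one for the outcome.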

How do I subscribe to events? Very simple, here’s an example:

EventBusInstance.getInstance().register(new Runnable() {

    @Override
    public void run() {
    }

    @Subscribe // Subscribe
    @AllowConcurrentEvents // Whether concurrent execution is allowed, i.e. whether the method is thread-safe
    public void listen(final DMLExecutionEvent event) { // DMLExecutionEvent
        System.out.println("DMLExecutionEvent: " + event.getSql() + "\t" + event.getEventExecutionType());
    }

    @Subscribe // Subscribe
    @AllowConcurrentEvents // Whether concurrent execution is allowed, i.e. whether the method is thread-safe
    public void listen2(final DQLExecutionEvent event) { // DQLExecutionEvent
        System.out.println("DQLExecutionEvent: " + event.getSql() + "\t" + event.getEventExecutionType());
    }
});
  • #register(): any object will do; it doesn't have to be a Runnable. The example uses one purely for convenience.
  • @Subscribe: placed on a method, it subscribes that method to events.
  • @AllowConcurrentEvents: placed on a method, it declares the method thread-safe, so it may be invoked concurrently.
  • The class of the method's parameter is the event being subscribed to. For example, #listen() subscribes to DMLExecutionEvent.
  • EventBus#post() publishes an event and invokes the subscriber logic synchronously.

  • Recommended reading: Guava Study Notes: EventBus
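
To make "the class of the parameter is the subscribed event" concrete, here is a toy event bus written with JDK reflection only. It is a teaching sketch, not Guava's implementation: the nested @Subscribe annotation, MiniEventBus, DmlEvent, DqlEvent, and Recorder are all hypothetical names defined inside the example.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;

public class MiniEventBus {

    /** Toy marker annotation, defined here; it is not Guava's @Subscribe. */
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    public @interface Subscribe { }

    public static class DmlEvent { }

    public static class DqlEvent { }

    private final List<Object> listeners = new ArrayList<>();

    public void register(Object listener) {
        listeners.add(listener);
    }

    /** Synchronously invokes every @Subscribe method whose single parameter accepts the event. */
    public void post(Object event) {
        for (Object listener : listeners) {
            for (Method method : listener.getClass().getDeclaredMethods()) {
                if (method.isAnnotationPresent(Subscribe.class)
                        && method.getParameterCount() == 1
                        && method.getParameterTypes()[0].isInstance(event)) {
                    try {
                        method.setAccessible(true);
                        method.invoke(listener, event);
                    } catch (ReflectiveOperationException ex) {
                        throw new IllegalStateException(ex);
                    }
                }
            }
        }
    }

    /** Listener with one subscriber method per event type. */
    public static class Recorder {
        final List<String> calls = new ArrayList<>();

        @Subscribe
        public void onDml(DmlEvent event) {
            calls.add("dml");
        }

        @Subscribe
        public void onDql(DqlEvent event) {
            calls.add("dql");
        }
    }

    /** Posts a DQL event and then a DML event, returning which subscriber methods ran. */
    public static List<String> demo() {
        MiniEventBus bus = new MiniEventBus();
        Recorder recorder = new Recorder();
        bus.register(recorder);
        bus.post(new DqlEvent());
        bus.post(new DmlEvent());
        return recorder.calls;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Because post() invokes the subscribers directly, a slow subscriber delays the publisher, which is worth keeping in mind for listeners on the SQL execution path.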

4.2 BestEffortsDeliveryListener

BestEffortsDeliveryListener is the listener for best-efforts-delivery transactions.

This article does not analyze its implementation for now; it serves only as another subscriber example. It will be covered in the article Flexible Transactions.

public final class BestEffortsDeliveryListener {

    @Subscribe
    @AllowConcurrentEvents
    public void listen(final DMLExecutionEvent event) {
        if (!isProcessContinuously()) {
            return;
        }
        SoftTransactionConfiguration transactionConfig = SoftTransactionManager.getCurrentTransactionConfiguration().get();
        TransactionLogStorage transactionLogStorage = TransactionLogStorageFactory.createTransactionLogStorage(transactionConfig.buildTransactionLogDataSource());
        BEDSoftTransaction bedSoftTransaction = (BEDSoftTransaction) SoftTransactionManager.getCurrentTransaction().get();
        switch (event.getEventExecutionType()) {
            case BEFORE_EXECUTE:
                // TODO for batch execution of SQL, the parameters need to be parsed into a two-level list
                transactionLogStorage.add(new TransactionLog(event.getId(), bedSoftTransaction.getTransactionId(), bedSoftTransaction.getTransactionType(),
                        event.getDataSource(), event.getSql(), event.getParameters(), System.currentTimeMillis(), 0));
                return;
            case EXECUTE_SUCCESS:
                transactionLogStorage.remove(event.getId());
                return;
            case EXECUTE_FAILURE:
                boolean deliverySuccess = false;
                for (int i = 0; i < transactionConfig.getSyncMaxDeliveryTryTimes(); i++) {
                    if (deliverySuccess) {
                        return;
                    }
                    boolean isNewConnection = false;
                    Connection conn = null;
                    PreparedStatement preparedStatement = null;
                    try {
                        conn = bedSoftTransaction.getConnection().getConnection(event.getDataSource(), SQLType.UPDATE);
                        if (!isValidConnection(conn)) {
                            bedSoftTransaction.getConnection().release(conn);
                            conn = bedSoftTransaction.getConnection().getConnection(event.getDataSource(), SQLType.UPDATE);
                            isNewConnection = true;
                        }
                        preparedStatement = conn.prepareStatement(event.getSql());
                        for (int parameterIndex = 0; parameterIndex < event.getParameters().size(); parameterIndex++) {
                            preparedStatement.setObject(parameterIndex + 1, event.getParameters().get(parameterIndex));
                        }
                        preparedStatement.executeUpdate();
                        deliverySuccess = true;
                        transactionLogStorage.remove(event.getId());
                    } catch (final SQLException ex) {
                        log.error(String.format("Delivery times %s error, max try times is %s", i + 1, transactionConfig.getSyncMaxDeliveryTryTimes()), ex);
                    } finally {
                        close(isNewConnection, conn, preparedStatement);
                    }
                }
                return;
            default:
                throw new UnsupportedOperationException(event.getEventExecutionType().toString());
        }
    }
}
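
The EXECUTE_FAILURE branch above is essentially a bounded retry loop. Stripped of connections and transaction logs, the idea can be sketched as follows; Delivery, deliverWithRetry(), and failingTimes() are hypothetical names for illustration, and the simplified loop returns directly on success instead of using a deliverySuccess flag.

```java
import java.sql.SQLException;
import java.util.concurrent.atomic.AtomicInteger;

public class BoundedRetry {

    /** Hypothetical stand-in for re-executing the failed SQL. */
    public interface Delivery {
        void deliver() throws SQLException;
    }

    /** Tries the delivery up to maxTries times; returns true as soon as one attempt succeeds. */
    public static boolean deliverWithRetry(Delivery delivery, int maxTries) {
        for (int i = 0; i < maxTries; i++) {
            try {
                delivery.deliver();
                return true;
            } catch (SQLException ex) {
                System.err.println(String.format("Delivery times %s error, max try times is %s", i + 1, maxTries));
            }
        }
        return false;
    }

    /** Simulated delivery that fails a fixed number of times before succeeding. */
    public static Delivery failingTimes(final int failures) {
        final AtomicInteger attempts = new AtomicInteger();
        return () -> {
            if (attempts.incrementAndGet() <= failures) {
                throw new SQLException("simulated failure");
            }
        };
    }

    public static void main(String[] args) {
        System.out.println("Delivered: " + deliverWithRetry(failingTimes(2), 3));
    }
}
```

If every attempt fails, the transaction log entry is never removed, which is what allows a later process to pick the failed SQL up again.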

666. Easter egg

This article ends here, but the story is not over.

Consider the problem of cross-shard transactions. For example:

UPDATE t_order SET nickname = ? WHERE user_id = ?

Suppose connection.commit() has run on one shard and the application suddenly crashes, before connection.commit() has been executed on the other shard. What then? Let's look for the answer in Flexible Transactions.
