With the rapid development of Internet technology, we have entered an era of explosive data growth. The ability to process and analyze massive data sets effectively has become a key way for major technology vendors to stand out in fierce competition; a company that cannot process and analyze its data quickly risks being eliminated by the market. There are many approaches to processing data at this scale. In distributed computing, Hadoop provides the MapReduce parallel computing framework, which is aimed mainly at offline data mining and analysis. Storm, a distributed framework for real-time stream processing, meets the needs of real-time data analysis. Finally, there is Spring Batch, a batch-oriented framework that can be applied to large-scale enterprise data processing.

I will not go into tutorials on how to deploy and develop with these frameworks here. Instead, I would like to go a step further: can we borrow the ideas behind these open source frameworks and build a custom batch processing framework that fits the data processing needs of our own company or of the enterprises we serve?

First of all, I would like to describe the user data processing problem that the company I work for currently faces. A provincial mobile operator has tens of millions of users, and each province divides its user data by city. We store this data in a traditional relational database (Oracle, partitioned by city). Based on the balance of each user’s account, the operator’s billing and settlement system notifies the business processing system in real time to suspend or resume the user’s service. On receiving such a request, the business processing system sends the user data to the relevant switch network elements (NEs) and issues the corresponding switch instructions, which can simply be called Hlr suspend/resume instructions (hereinafter referred to as Hlr instructions). Under everyday load, the traditional C++ multi-process background programs can barely handle these requests in “quasi-real time”. At the beginning of each month, however, the volume of data surges and the C++ programs cannot keep up. That is when the complaints arrive: “I have paid my bill, so why can I still not make calls?” Conversely, some users are clearly overdue but are not suspended in time. Both outcomes reduce customer satisfaction with the operator, and the operator may end up losing those customers.

I analyzed the situation carefully and found two bottlenecks behind these problems.

  1. All user data for a province sits in a single table in one database. Even a database server with top-end minicomputer hardware may not meet the performance requirements: during the start-of-month surge, the heavy read and write I/O on that single server drags down the performance of the whole server.
  2. Hlr instructions are sent to the switch’s physical devices synchronously. When the switch is slow to process an instruction or fails to process it, the sender can only block and wait, which causes a backlog of the data queued behind it.

In view of the above problems, I have thought of several optimization schemes.

  1. Split the table horizontally by the user’s home city, so that the load is spread across several servers. Each database server then handles data requests for one or a few cities only, which reduces I/O overhead.
  2. Since sending Hlr instructions to the switch blocks, change it to asynchronous processing: tasks in the processing queue are first dispatched to the switch, and the switch then reports the outcome of each task back to the processing module through an asynchronous callback. The processing module no longer actively polls and waits for each task; it waits for asynchronous notifications of the results instead. It can then concentrate on dispatching data, and a few slow tasks no longer hold up the rest of the batch.
  3. Once the table has been split horizontally, load the resulting tables in parallel. This saves much of the time that serial loading would take.
  4. Feed the data loaded in parallel into a batch processing framework whose configuration parameters can be tuned to the workload to meet the real-time requirements. For example, at the beginning of the month the processing parameters can be raised to increase throughput; at normal times they can be lowered to reduce CPU and I/O overhead.
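The asynchronous callback idea in point 2 can be sketched with the JDK alone. This is a minimal illustration, not the framework's code: it assumes Java 8+ and uses CompletableFuture in place of a real switch callback, and `sendToSwitch`, `dispatchAll`, and the report format are hypothetical names introduced only for this example.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class AsyncDispatchSketch {
    // Hypothetical stand-in for sending one Hlr instruction: 0 = success, 1 = failure.
    static int sendToSwitch(int userId) {
        return (userId % 2 == 0) ? 0 : 1;
    }

    // Dispatches every user without blocking on any single one;
    // each result is reported through a callback when it becomes available.
    static List<String> dispatchAll(List<Integer> userIds) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        ConcurrentLinkedQueue<String> reports = new ConcurrentLinkedQueue<>();
        List<CompletableFuture<Void>> pending = new ArrayList<>();
        for (Integer userId : userIds) {
            pending.add(CompletableFuture
                    .supplyAsync(() -> sendToSwitch(userId), pool)
                    .thenAccept(code -> reports.add(userId + ":" + (code == 0 ? "ok" : "failed"))));
        }
        // Wait for the whole batch only once, after everything has been dispatched.
        CompletableFuture.allOf(pending.toArray(new CompletableFuture[0])).join();
        pool.shutdown();
        List<String> sorted = new ArrayList<>(reports);
        Collections.sort(sorted); // callbacks may arrive in any order
        return sorted;
    }

    public static void main(String[] args) {
        System.out.println(dispatchAll(java.util.Arrays.asList(1, 2, 3, 4)));
    }
}
```

The dispatcher loop never waits on an individual instruction, so one slow task delays only its own report rather than the tasks behind it.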

Based on these considerations, I arrived at the design shown in the component diagram below:

  

Here is a detailed description of how the key modules work together.

  1. Asynchronous parallel query loading module BatchQueryLoader: accepts multiple data source objects and uses ListenableFuture, the extension of the Future interface in Google’s Guava library, to load batch query data in parallel. The Future interface represents the result of an asynchronous computation; once the computation completes, the result can only be obtained through one of the get() methods, one of which accepts a timeout. The parallel loading module loads the tables of the multiple data sources in batches and in parallel, and finally returns the combined result set. The time taken by the two approaches can be summarized simply: the total time for serial loading is the sum of the time taken by each data source, whereas the total time for parallel loading is determined by the slowest data source. (Note: a collection program gathers the users to be suspended or resumed each day into the notify_users table of each horizontally split database.)
  2. Parallel asynchronous batch reactor BatchTaskReactor: implemented internally with a thread pool. The data loaded by BatchQueryLoader is submitted to the thread pool for asynchronous task dispatch, and the HlrBusinessEventTask module then delivers each Hlr instruction as a single asynchronous task. The pool’s worker threads continually take tasks from the blocking queue and execute them. The reactor receives the execution feedback of each HlrBusinessEventTask asynchronously through the Future interface.
  3. Thread pool parameter loader BatchTaskConfigurationLoader: loads the thread pool’s runtime parameters and passes them to the BatchTaskReactor. The batchtask-configuration.xml configuration file is shown below.
    <?xml version="1.0" encoding="GBK"?>
    <batchtask>
        <!-- Batch asynchronous thread pool parameter configuration -->
        <jobpool name="newlandframework_batchtask">
            <attribute name="corePoolSize" value="15" />
            <attribute name="maxPoolSize" value="30" />
            <attribute name="keepAliveTime" value="1000" />
            <attribute name="workQueueSize" value="200" />
        </jobpool>
    </batchtask>

    corePoolSize is the number of threads kept in the pool, workQueueSize is the capacity of the blocking queue, maxPoolSize is the maximum number of threads in the pool, and keepAliveTime is how long an idle thread waits before it is terminated. ThreadPoolExecutor also takes a parameter called unit, an enumeration (TimeUnit) that gives the time unit of keepAliveTime. How do these parameters relate to one another? Take an example. When a task arrives, ThreadPoolExecutor first hands it to one of up to corePoolSize threads. If all of those threads are busy, it places the task in the blocking queue of size workQueueSize. If the queue is also full, it creates additional threads, up to maxPoolSize - corePoolSize of them, so that the pool holds at most maxPoolSize threads. What if even that is not enough? For that case ThreadPoolExecutor offers four different rejection policies.

  4. Now let’s talk about those policies. The first is ThreadPoolExecutor.AbortPolicy: the handler rejects the task and throws the runtime exception RejectedExecutionException. The second is ThreadPoolExecutor.CallerRunsPolicy: the thread that called execute runs the task itself. This policy provides a simple feedback control mechanism that slows down the rate at which new tasks are submitted. The third is ThreadPoolExecutor.DiscardPolicy: a task that cannot be executed is simply dropped. The last is ThreadPoolExecutor.DiscardOldestPolicy: if the executor has not been shut down, the task at the head of the work queue is dropped and execution is retried (if it fails again, the process repeats). When there are fewer tasks to process, ThreadPoolExecutor reclaims the surplus “temporary threads” once they have been idle for keepAliveTime. You can think of keepAliveTime as applying to the maxPoolSize - corePoolSize “temporary threads”.

  5. Setting the thread pool parameters. How should the parameters normally be set? In my view, workQueueSize, the capacity of the blocking queue, should be at least corePoolSize. maxPoolSize must be greater than or equal to corePoolSize. corePoolSize is the number of threads you expect to handle the normal load, and it should be greater than 1; otherwise your thread pool processes tasks on a single thread and loses the point of having a pool at all.

  6. Batch task monitoring module BatchTaskMonitor: uses JMX (Java Management Extensions) to monitor in real time the execution status (success or failure) of tasks in the BatchTaskReactor thread pool.
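The interplay of corePoolSize, the work queue, maxPoolSize, and the rejection policy described above can be observed directly. The sketch below deliberately uses tiny values (1 core thread, 2 threads maximum, a queue of 1) rather than the values from the configuration file, and `PoolSizingSketch` and `observe` are names made up for this illustration.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class PoolSizingSketch {
    // Submits four blocking tasks and returns {poolSize, queuedTasks, rejectedCount}.
    static int[] observe() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 1, TimeUnit.SECONDS,
                new ArrayBlockingQueue<Runnable>(1),
                new ThreadPoolExecutor.AbortPolicy());
        final CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await(); // keep the worker busy until released
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        int rejected = 0;
        pool.execute(blocker); // 1st task: starts the core thread
        pool.execute(blocker); // 2nd task: core thread busy, goes into the queue
        pool.execute(blocker); // 3rd task: queue full, starts a temporary thread
        try {
            pool.execute(blocker); // 4th task: queue full and pool at maximum
        } catch (RejectedExecutionException e) {
            rejected++; // AbortPolicy rejects by throwing
        }
        int[] snapshot = { pool.getPoolSize(), pool.getQueue().size(), rejected };
        release.countDown();
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return snapshot;
    }

    public static void main(String[] args) {
        int[] s = observe();
        System.out.println("poolSize=" + s[0] + " queued=" + s[1] + " rejected=" + s[2]);
    }
}
```

Swapping AbortPolicy for CallerRunsPolicy would make the fourth call run the task on the submitting thread instead of throwing, which is the throttling effect described in item 4.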

After introducing the main functions of several core modules, the detailed design ideas of the main modules will be introduced in turn.

  1. A collection program gathers the users to be suspended or resumed each day into the notify_users table. We first define the structure of the notified-user data object, a JavaBean that corresponds to the horizontally split table notify_users. For demonstration purposes the table is kept simple (Oracle):

    create table notify_users (
        home_city number(3),   /* home city code of the mobile user */
        msisdn    number(15),  /* mobile phone number */
        user_id   number(15)   /* mobile user id */
    );

    The corresponding JavaBean entity class NotifyUsers is defined as follows:

    /**
     * @filename:NotifyUsers.java
     *
     * Newland Co. Ltd. All rights reserved.
     *
     * @Description: user object to be notified in batch
     * @author tangjie
     * @version 1.0
     *
     */
    package newlandframework.batchtask.model;

    import org.apache.commons.lang.builder.ToStringBuilder;
    import org.apache.commons.lang.builder.ToStringStyle;

    // Note: Integer is kept for the demo; real NUMBER(15) values need Long.
    public class NotifyUsers {

        public NotifyUsers() {
        }

        // User's home city code
        private Integer homeCity;
        // User's mobile number
        private Integer msisdn;
        // User id
        private Integer userId;

        public Integer getHomeCity() {
            return homeCity;
        }

        public void setHomeCity(Integer homeCity) {
            this.homeCity = homeCity;
        }

        public Integer getMsisdn() {
            return msisdn;
        }

        public void setMsisdn(Integer msisdn) {
            this.msisdn = msisdn;
        }

        public Integer getUserId() {
            return userId;
        }

        public void setUserId(Integer userId) {
            this.userId = userId;
        }

        public String toString() {
            return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE)
                    .append("homeCity", homeCity).append("userId", userId)
                    .append("msisdn", msisdn).toString();
        }
    }
  2. Class structure of the asynchronous parallel query loading module BatchQueryLoader. The parallel query loading module BatchQueryLoader calls the asynchronous parallel query executor BatchQueryExecutor to load the query result sets of different data sources in parallel. StatementWrapper encapsulates a JDBC Statement. The code is as follows:

    /**
     * @filename:StatementWrapper.java
     *
     * Newland Co. Ltd. All rights reserved.
     *
     * @Description: Statement wrapper class
     * @author tangjie
     * @version 1.0
     *
     */
    package newlandframework.batchtask.parallel;

    import java.sql.Connection;
    import java.sql.Statement;

    public class StatementWrapper {
        private final String sql;
        private final Statement statement;
        private final Connection con;

        public StatementWrapper(String sql, Statement statement, Connection con) {
            this.sql = sql;
            this.statement = statement;
            this.con = con;
        }

        public String getSql() {
            return sql;
        }

        public Statement getStatement() {
            return statement;
        }

        public Connection getCon() {
            return con;
        }
    }

    Next, define two exception classes for parallel loading: BatchQueryInterruptedException and BatchQueryExecutionException.

    /**
     * @filename:BatchQueryInterruptedException.java
     *
     * Newland Co. Ltd. All rights reserved.
     *
     * @Description: InterruptedException wrapper for parallel query loading
     * @author tangjie
     * @version 1.0
     *
     */
    package newlandframework.batchtask.parallel;

    public class BatchQueryInterruptedException extends RuntimeException {
        public BatchQueryInterruptedException(final String errorMessage, final Object... args) {
            super(String.format(errorMessage, args));
        }

        public BatchQueryInterruptedException(final Exception cause) {
            super(cause);
        }
    }

    /**
     * @filename:BatchQueryExecutionException.java
     *
     * Newland Co. Ltd. All rights reserved.
     *
     * @Description: ExecutionException wrapper for parallel query loading
     * @author tangjie
     * @version 1.0
     *
     */
    package newlandframework.batchtask.parallel;

    public class BatchQueryExecutionException extends RuntimeException {
        public BatchQueryExecutionException(final String errorMessage, final Object... args) {
            super(String.format(errorMessage, args));
        }

        public BatchQueryExecutionException(final Exception cause) {
            super(cause);
        }
    }

    Next, abstract a batch query interface so that batch loading can later be extended to different databases. The interface BatchQuery is defined as follows:

    /**
     * @filename:BatchQuery.java
     *
     * Newland Co. Ltd. All rights reserved.
     *
     * @Description: asynchronous query interface definition
     * @author tangjie
     * @version 1.0
     *
     */
    package newlandframework.batchtask.parallel;

    public interface BatchQuery<IN, OUT> {
        OUT query(IN input) throws Exception;
    }

    Ok, now encapsulate an asynchronous parallel query executor, BatchQueryExecutor

    /**
     * @filename:BatchQueryExecutor.java
     *
     * Newland Co. Ltd. All rights reserved.
     *
     * @Description: asynchronous parallel query executor
     * @author tangjie
     * @version 1.0
     *
     */
    package newlandframework.batchtask.parallel;

    import com.google.common.util.concurrent.ListenableFuture;
    import com.google.common.util.concurrent.ListeningExecutorService;
    import com.google.common.util.concurrent.FutureCallback;
    import com.google.common.util.concurrent.Futures;
    import com.google.common.util.concurrent.MoreExecutors;
    import org.apache.commons.collections.Closure;
    import org.apache.commons.collections.CollectionUtils;
    import org.apache.commons.collections.functors.ForClosure;

    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.Executors;
    import java.util.Collection;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Set;

    public class BatchQueryExecutor {
        private final static int FUTUREQUERYNUMBER = 1;

        public BatchQueryExecutor() {
        }

        // Submit one query per input, register a completion callback, then wait for all results
        public <IN, OUT> List<OUT> executeQuery(final Collection<IN> inputs,
                final BatchQuery<IN, OUT> executeUnit) {
            ListenableFuture<List<OUT>> futures = submitBatchTaskFutures(inputs, executeUnit);
            delegateAsynTask(futures);
            return getAsynResults(futures);
        }

        private <IN, OUT> ListenableFuture<List<OUT>> submitBatchTaskFutures(
                final Collection<IN> inputs, final BatchQuery<IN, OUT> executeUnit) {
            final Set<ListenableFuture<OUT>> result = new HashSet<ListenableFuture<OUT>>(inputs.size());
            // One thread per input, so that all queries run at the same time
            final ListeningExecutorService service = MoreExecutors
                    .listeningDecorator(Executors.newFixedThreadPool(inputs.size()));

            Closure futureQuery = new Closure() {
                @SuppressWarnings("unchecked")
                public void execute(Object input) {
                    final IN p = (IN) input;
                    result.add(service.submit(new Callable<OUT>() {
                        @Override
                        public OUT call() throws Exception {
                            return executeUnit.query(p);
                        }
                    }));
                }
            };

            Closure parallelTask = new ForClosure(FUTUREQUERYNUMBER, futureQuery);
            CollectionUtils.forAllDo(inputs, parallelTask);

            // Already submitted tasks still run to completion after shutdown()
            service.shutdown();
            return Futures.allAsList(result);
        }

        private <OUT> OUT getAsynResults(final ListenableFuture<OUT> futures) {
            try {
                return futures.get();
            } catch (InterruptedException ex) {
                throw new BatchQueryInterruptedException(ex);
            } catch (ExecutionException ex) {
                throw new BatchQueryExecutionException(ex);
            }
        }

        private <TYPE> void delegateAsynTask(final ListenableFuture<TYPE> allFutures) {
            // Recent Guava versions require an explicit executor for the callback
            Futures.addCallback(allFutures, new FutureCallback<TYPE>() {
                @Override
                public void onSuccess(final TYPE result) {
                    System.out.println("Parallel load query executed successfully");
                }

                @Override
                public void onFailure(final Throwable thrown) {
                    System.out.println("Parallel load query failed");
                }
            }, MoreExecutors.directExecutor());
        }
    }

    Finally, the BatchQueryLoader directly calls the above asynchronous parallel query executor BatchQueryExecutor to complete the parallel asynchronous loading of data from different data sources. The code is as follows

    /**
     * @filename:BatchQueryLoader.java
     *
     * Newland Co. Ltd. All rights reserved.
     *
     * @Description: parallel query loading module
     * @author tangjie
     * @version 1.0
     *
     */
    package newlandframework.batchtask.parallel;

    import java.sql.Connection;
    import java.sql.ResultSet;
    import java.sql.SQLException;
    import java.sql.Statement;
    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.Collection;
    import java.util.Iterator;
    import java.util.List;

    public class BatchQueryLoader {
        private final Collection<StatementWrapper> statements = new ArrayList<StatementWrapper>();

        // Register one data source: its SQL, Statement and Connection
        public void attachLoadEnv(final String sql, final Statement statement,
                final Connection con) {
            statements.add(new StatementWrapper(sql, statement, con));
        }

        public Collection<StatementWrapper> getStatements() {
            return statements;
        }

        public void close() throws SQLException {
            Iterator<StatementWrapper> iter = statements.iterator();
            while (iter.hasNext()) {
                iter.next().getCon().close();
            }
        }

        public List<ResultSet> executeQuery() throws SQLException {
            List<ResultSet> result;
            if (1 == statements.size()) {
                // A single data source needs no thread pool
                StatementWrapper entity = statements.iterator().next();
                result = Arrays.asList(entity.getStatement().executeQuery(entity.getSql()));
                return result;
            } else {
                BatchQueryExecutor query = new BatchQueryExecutor();
                result = query.executeQuery(statements,
                        new BatchQuery<StatementWrapper, ResultSet>() {
                            @Override
                            public ResultSet query(final StatementWrapper input) throws Exception {
                                return input.getStatement().executeQuery(input.getSql());
                            }
                        });
                return result;
            }
        }
    }
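The effect of parallel loading on total time can be illustrated without a database. The sketch below is an assumption-laden stand-in, not the BatchQueryExecutor itself: it uses the JDK's `ExecutorService.invokeAll` instead of Guava, simulates each city's query with a sleep, and `source` and `loadAll` are hypothetical helper names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class ParallelLoadSketch {
    // Hypothetical stand-in for querying one city's notify_users table.
    static Callable<String> source(final String city, final long millis) {
        return () -> {
            Thread.sleep(millis); // simulated query latency
            return "rows-of-" + city;
        };
    }

    // Runs every source on its own thread; invokeAll returns futures in submission order.
    static List<String> loadAll(List<Callable<String>> sources) {
        ExecutorService pool = Executors.newFixedThreadPool(sources.size());
        try {
            List<String> rows = new ArrayList<>();
            for (Future<String> f : pool.invokeAll(sources)) {
                rows.add(f.get());
            }
            return rows;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        List<Callable<String>> sources = new ArrayList<>();
        sources.add(source("cityA", 100));
        sources.add(source("cityB", 200));
        sources.add(source("cityC", 300));
        long start = System.nanoTime();
        List<String> rows = loadAll(sources);
        long elapsedMs = (System.nanoTime() - start) / 1_000_000;
        // Serial loading would need roughly 600 ms; here the slowest source dominates.
        System.out.println(rows + " loaded in about " + elapsedMs + " ms");
    }
}
```

One difference worth noting: `invokeAll` keeps results in submission order, whereas collecting futures in a HashSet, as the executor above does, gives no ordering guarantee between data sources.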
  4. The thread pool parameter loader BatchTaskConfigurationLoader is mainly responsible for loading the thread pool’s runtime parameters from batchtask-configuration.xml. The JavaBean BatchTaskConfiguration holds the runtime parameters of one batch thread pool:
    /**
     * @filename:BatchTaskConfiguration.java
     *
     * Newland Co. Ltd. All rights reserved.
     *
     * @Description: batch thread pool parameter configuration
     * @author tangjie
     * @version 1.0
     *
     */
    package newlandframework.batchtask.parallel;

    import org.apache.commons.lang.builder.EqualsBuilder;
    import org.apache.commons.lang.builder.HashCodeBuilder;
    import org.apache.commons.lang.builder.ToStringBuilder;
    import org.apache.commons.lang.builder.ToStringStyle;

    public class BatchTaskConfiguration {
        private String name;
        private int corePoolSize;
        private int maxPoolSize;
        private int keepAliveTime;
        private int workQueueSize;

        public void setName(String name) {
            this.name = name;
        }

        public String getName() {
            return this.name;
        }

        public int getCorePoolSize() {
            return corePoolSize;
        }

        public void setCorePoolSize(int corePoolSize) {
            this.corePoolSize = corePoolSize;
        }

        public int getMaxPoolSize() {
            return maxPoolSize;
        }

        public void setMaxPoolSize(int maxPoolSize) {
            this.maxPoolSize = maxPoolSize;
        }

        public int getKeepAliveTime() {
            return keepAliveTime;
        }

        public void setKeepAliveTime(int keepAliveTime) {
            this.keepAliveTime = keepAliveTime;
        }

        public int getWorkQueueSize() {
            return workQueueSize;
        }

        public void setWorkQueueSize(int workQueueSize) {
            this.workQueueSize = workQueueSize;
        }

        // Two configurations are the same pool if they share a name
        public int hashCode() {
            return new HashCodeBuilder(1, 31).append(name).toHashCode();
        }

        public String toString() {
            return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE)
                    .append("name", name).append("corePoolSize", corePoolSize)
                    .append("maxPoolSize", maxPoolSize)
                    .append("keepAliveTime", keepAliveTime)
                    .append("workQueueSize", workQueueSize).toString();
        }

        public boolean equals(Object o) {
            boolean res = false;
            if (o != null && BatchTaskConfiguration.class.isAssignableFrom(o.getClass())) {
                BatchTaskConfiguration s = (BatchTaskConfiguration) o;
                res = new EqualsBuilder().append(name, s.getName()).isEquals();
            }
            return res;
        }
    }

    Of course, the configuration may define several thread pools, so we also design a batch thread pool factory configuration class, BatchTaskThreadFactoryConfiguration, which stores the configuration parameters of each thread pool in turn:

    /**
     * @filename:BatchTaskThreadFactoryConfiguration.java
     *
     * Newland Co. Ltd. All rights reserved.
     *
     * @Description: thread pool parameter configuration factory
     * @author tangjie
     * @version 1.0
     *
     */
    package newlandframework.batchtask.parallel;

    import java.util.Map;
    import java.util.HashMap;

    public class BatchTaskThreadFactoryConfiguration {
        // Batch thread pool configuration parameters, keyed by pool name
        private Map<String, BatchTaskConfiguration> batchTaskMap = new HashMap<String, BatchTaskConfiguration>();

        public BatchTaskThreadFactoryConfiguration() {
        }

        // The first configuration registered under a name wins
        public void joinBatchTaskConfiguration(BatchTaskConfiguration batchTaskConfiguration) {
            if (batchTaskMap.containsKey(batchTaskConfiguration.getName())) {
                return;
            } else {
                batchTaskMap.put(batchTaskConfiguration.getName(), batchTaskConfiguration);
            }
        }

        public Map<String, BatchTaskConfiguration> getBatchTaskMap() {
            return batchTaskMap;
        }
    }

    What remains is the runtime parameter loader itself, BatchTaskConfigurationLoader:

    /**
     * @filename:BatchTaskConfigurationLoader.java
     *
     * Newland Co. Ltd. All rights reserved.
     *
     * @Description: thread pool configuration parameter loader
     * @author tangjie
     * @version 1.0
     *
     */
    package newlandframework.batchtask.parallel;

    import java.io.InputStream;
    import org.apache.commons.digester.Digester;

    public final class BatchTaskConfigurationLoader {
        private static final String BATCHTASK_THREADPOOL_CONFIG = "./newlandframework/batchtask/parallel/batchtask-configuration.xml";

        // volatile is needed for double-checked locking to be safe
        private static volatile BatchTaskThreadFactoryConfiguration config = null;

        private BatchTaskConfigurationLoader() {
        }

        // Singleton; synchronized so that concurrent callers load the file only once
        public static BatchTaskThreadFactoryConfiguration getConfig() {
            if (config == null) {
                synchronized (BatchTaskConfigurationLoader.class) {
                    if (config == null) {
                        try {
                            InputStream is = getInputStream();
                            config = (BatchTaskThreadFactoryConfiguration) getDigester().parse(is);
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                    }
                }
            }
            return config;
        }

        private static InputStream getInputStream() {
            return BatchTaskConfigurationLoader.class.getClassLoader()
                    .getResourceAsStream(BATCHTASK_THREADPOOL_CONFIG);
        }

        private static Digester getDigester() {
            Digester digester = new Digester();
            digester.setValidating(false);
            digester.addObjectCreate("batchtask", BatchTaskThreadFactoryConfiguration.class);

            // Load the parameter configuration of each batch asynchronous thread pool
            digester.addObjectCreate("*/jobpool", BatchTaskConfiguration.class);
            digester.addSetProperties("*/jobpool");
            digester.addSetProperty("*/jobpool/attribute", "name", "value");
            digester.addSetNext("*/jobpool", "joinBatchTaskConfiguration");

            return digester;
        }
    }

    The above modules are designed to allow the running parameters of the thread pool to be adjusted.
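To make the mapping from the XML file to pool parameters concrete, here is a minimal sketch that reads the same jobpool/attribute structure with the JDK's built-in DOM parser instead of Commons Digester. It is an illustration under that substitution only; `PoolConfigSketch` and `parse` are hypothetical names, and the sample XML omits the encoding declaration for simplicity.

```java
import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import javax.xml.parsers.DocumentBuilderFactory;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.NodeList;

class PoolConfigSketch {
    // Parses <jobpool name="..."><attribute name="k" value="v"/></jobpool>
    // into a map of pool name -> (parameter name -> integer value).
    static Map<String, Map<String, Integer>> parse(String xml) {
        try {
            Document doc = DocumentBuilderFactory.newInstance().newDocumentBuilder()
                    .parse(new ByteArrayInputStream(xml.getBytes(StandardCharsets.UTF_8)));
            Map<String, Map<String, Integer>> pools = new HashMap<>();
            NodeList poolNodes = doc.getElementsByTagName("jobpool");
            for (int i = 0; i < poolNodes.getLength(); i++) {
                Element pool = (Element) poolNodes.item(i);
                Map<String, Integer> params = new HashMap<>();
                NodeList attrs = pool.getElementsByTagName("attribute");
                for (int j = 0; j < attrs.getLength(); j++) {
                    Element attr = (Element) attrs.item(j);
                    params.put(attr.getAttribute("name"),
                            Integer.valueOf(attr.getAttribute("value")));
                }
                pools.put(pool.getAttribute("name"), params);
            }
            return pools;
        } catch (Exception e) {
            throw new IllegalArgumentException("invalid pool configuration", e);
        }
    }

    public static void main(String[] args) {
        String xml = "<batchtask><jobpool name=\"newlandframework_batchtask\">"
                + "<attribute name=\"corePoolSize\" value=\"15\"/>"
                + "<attribute name=\"maxPoolSize\" value=\"30\"/>"
                + "</jobpool></batchtask>";
        System.out.println(parse(xml));
    }
}
```

Digester achieves the same result declaratively: its rules create one BatchTaskConfiguration per jobpool element and set a bean property for each attribute element.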

  5. Main class structure of the BatchTaskReactor. The BatchTaskRunner interface defines the actions by which the batch framework initializes and releases its resources.
    /**
     * @filename:BatchTaskRunner.java
     *
     * Newland Co. Ltd. All rights reserved.
     *
     * @Description: batch resource management interface definition
     * @author tangjie
     * @version 1.0
     *
     */
    package newlandframework.batchtask.parallel;

    import java.io.Closeable;

    public interface BatchTaskRunner extends Closeable {
        public void initialize();

        public void close();
    }

    We also implement our own thread factory class, BatchTaskThreadFactory, to manage the threads in our thread pool. The threads of a pool can be placed in a thread group for unified management. For example, to monitor the health of the pool’s threads, you can create a separate monitor thread that tracks the state of the threads in the group. You can also wrap the pool in a JMX (Java Management Extensions) MBean and monitor it through JMX. At the end of this article there is a JMX-based implementation that monitors task completion in the batch thread pool, which can serve as a reference for thread health monitoring; the JMX code for monitoring thread state is not shown here. Without further ado, the thread factory class BatchTaskThreadFactory is implemented as follows:

    /**
     * @filename:BatchTaskThreadFactory.java
     *
     * Newland Co. Ltd. All rights reserved.
     *
     * @Description: thread pool factory
     * @author tangjie
     * @version 1.0
     *
     */
    package newlandframework.batchtask.parallel;

    import java.util.concurrent.atomic.AtomicInteger;
    import java.util.concurrent.ThreadFactory;

    public class BatchTaskThreadFactory implements ThreadFactory {
        final private static String BATCHTASKFACTORYNAME = "batchtask-pool";
        final private String name;
        final private ThreadGroup threadGroup;
        final private AtomicInteger threadNumber = new AtomicInteger(0);

        public BatchTaskThreadFactory() {
            this(BATCHTASKFACTORYNAME);
        }

        public BatchTaskThreadFactory(String name) {
            this.name = name;
            SecurityManager security = System.getSecurityManager();
            threadGroup = (security != null) ? security.getThreadGroup()
                    : Thread.currentThread().getThreadGroup();
        }

        @Override
        public Thread newThread(Runnable runnable) {
            Thread thread = new Thread(threadGroup, runnable);

            // Build the name once so that the counter advances only once per thread
            String threadName = String.format("BatchTask[%s-%d]",
                    threadGroup.getName(), threadNumber.incrementAndGet());
            thread.setName(threadName);
            System.out.println(threadName);

            if (thread.isDaemon()) {
                thread.setDaemon(false);
            }

            if (thread.getPriority() != Thread.NORM_PRIORITY) {
                thread.setPriority(Thread.NORM_PRIORITY);
            }

            return thread;
        }
    }

    The BatchTaskReactor is the parallel asynchronous batch reactor. It encapsulates ThreadPoolExecutor and uses a bounded blocking queue, ArrayBlockingQueue, so that a producer submitting requests without limit cannot exhaust memory; this keeps memory usage under control.

    /**
     * @filename:BatchTaskReactor.java
     *
     * Newland Co. Ltd. All rights reserved.
     *
     * @Description: batch parallel asynchronous thread pool processing module
     * @author tangjie
     * @version 1.0
     *
     */
    package newlandframework.batchtask.parallel;

    import java.util.Set;
    import java.util.Map;
    import java.util.Map.Entry;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.locks.Lock;
    import java.util.concurrent.locks.ReentrantLock;

    public final class BatchTaskReactor implements BatchTaskRunner {
        private Map<String, ExecutorService> threadPools = new ConcurrentHashMap<String, ExecutorService>();

        // volatile is needed for double-checked locking to be safe
        private static volatile BatchTaskReactor context;

        private static Lock REACTORLOCK = new ReentrantLock();

        public static final String BATCHTASK_THREADPOOL_NAME = "newlandframework_batchtask";

        private BatchTaskReactor() {
            initialize();
        }

        // Singleton accessor
        public static BatchTaskReactor getReactor() {
            if (context == null) {
                REACTORLOCK.lock();
                try {
                    if (context == null) {
                        context = new BatchTaskReactor();
                    }
                } finally {
                    REACTORLOCK.unlock();
                }
            }
            return context;
        }

        public ExecutorService getBatchTaskThreadPoolName() {
            return getBatchTaskThreadPool(BATCHTASK_THREADPOOL_NAME);
        }

        public ExecutorService getBatchTaskThreadPool(String poolName) {
            if (!threadPools.containsKey(poolName)) {
                throw new IllegalArgumentException(String.format(
                        "Batch thread pool name:[%s] parameter configuration does not exist", poolName));
            }
            return threadPools.get(poolName);
        }

        public Set<String> getBatchTaskThreadPoolNames() {
            return threadPools.keySet();
        }

        // Shut down every batch thread pool
        public void close() {
            for (Entry<String, ExecutorService> entry : threadPools.entrySet()) {
                entry.getValue().shutdown();
                System.out.println(String.format("Close batch thread pool:[%s] succeeded", entry.getKey()));
            }
            threadPools.clear();
        }

        // Initialize the batch thread pools
        public void initialize() {
            BatchTaskThreadFactoryConfiguration poolFactoryConfig = BatchTaskConfigurationLoader.getConfig();
            if (poolFactoryConfig != null) {
                initThreadPool(poolFactoryConfig);
            }
        }

        private void initThreadPool(BatchTaskThreadFactoryConfiguration poolFactoryConfig) {
            for (Entry<String, BatchTaskConfiguration> entry : poolFactoryConfig.getBatchTaskMap().entrySet()) {
                BatchTaskConfiguration config = entry.getValue();

                // Use a bounded queue so that memory usage stays under control
                BlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>(config.getWorkQueueSize());

                ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
                        config.getCorePoolSize(), config.getMaxPoolSize(),
                        config.getKeepAliveTime(), TimeUnit.SECONDS, queue,
                        new BatchTaskThreadFactory(entry.getKey()),
                        new ThreadPoolExecutor.CallerRunsPolicy());

                threadPools.put(entry.getKey(), threadPool);
                System.out.println(String.format("Batch thread pool:[%s] created successfully", config.toString()));
            }
        }
    }
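How a caller might consume such a pool can be sketched as follows. This is a self-contained approximation rather than a call into BatchTaskReactor: it builds its own small ThreadPoolExecutor with a bounded queue and CallerRunsPolicy, and `sendInstruction` and `runBatch` are hypothetical names standing in for the real task and dispatcher.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class ReactorUsageSketch {
    static final int TASKSUCC = 0;
    static final int TASKFAIL = 1;

    // Hypothetical stand-in for an Hlr instruction task: even ids succeed, odd ids fail.
    static int sendInstruction(int userId) {
        return (userId % 2 == 0) ? TASKSUCC : TASKFAIL;
    }

    // Submits one task per user and returns {successCount, failureCount}.
    static int[] runBatch(int userCount) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 4, 1, TimeUnit.SECONDS,
                new ArrayBlockingQueue<Runnable>(8),
                new ThreadPoolExecutor.CallerRunsPolicy());
        List<Future<Integer>> futures = new ArrayList<>();
        for (int i = 0; i < userCount; i++) {
            final int userId = i;
            Callable<Integer> task = () -> sendInstruction(userId);
            // If the pool and queue are full, CallerRunsPolicy runs the task on this thread,
            // which slows the producer down instead of dropping or rejecting work.
            futures.add(pool.submit(task));
        }
        int success = 0;
        int failure = 0;
        try {
            for (Future<Integer> f : futures) {
                if (f.get() == TASKSUCC) {
                    success++;
                } else {
                    failure++;
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            pool.shutdown();
        }
        return new int[] { success, failure };
    }

    public static void main(String[] args) {
        int[] counts = runBatch(100);
        System.out.println("success=" + counts[0] + " failure=" + counts[1]);
    }
}
```

Because the queue is bounded and the caller absorbs the overflow, no task is lost even when far more tasks are submitted than the pool and queue can hold at once.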
  6. Next comes the design of the switch Hlr instruction task module. Other kinds of instruction tasks may well be needed as the business develops, so, following the open/closed principle, we abstract an interface, BusinessEvent:

    /**
     * @filename:BusinessEvent.java
     *
     * Newland Co. Ltd. All rights reserved.
     *
     * @Description: business event task interface definition
     * @author tangjie
     * @version 1.0
     *
     */
    package newlandframework.batchtask.model;

    public interface BusinessEvent {
        // Execute a specific batch task
        public int execute(Integer userId);
    }

    The concrete Hlr instruction task module, HlrBusinessEvent, implements this interface and dispatches the Hlr instructions that suspend or resume a user's service. In this article it only simulates the dispatch with a random delay. The code is as follows:

    /**
     * @filename:HlrBusinessEvent.java
     *
     * Newland Co., Ltd. All rights reserved.
     *
     * @Description: Hlr instruction dispatch task definition
     * @author tangjie
     * @version 1.0
     *
     */
    package newlandframework.batchtask.model;

    import org.apache.commons.lang.math.RandomUtils;

    public class HlrBusinessEvent implements BusinessEvent {
        // Return code: the instruction was dispatched successfully
        public final static int TASKSUCC = 0;
        // Return code: the instruction dispatch failed
        public final static int TASKFAIL = 1;
        // Upper bound (ms) of the simulated dispatch time
        private final static int ELAPSETIME = 1000;

        @Override
        public int execute(Integer userId) {
            try {
                // Simulate the time spent dispatching the instruction
                int millis = RandomUtils.nextInt(ELAPSETIME);
                Thread.sleep(millis);
                String strContent = String.format("Thread id [%s] userId :[%d] time spent executing switch order :[%d] ms",
                        Thread.currentThread().getName(), userId, millis);
                System.out.println(strContent);
                // Simulate the outcome: an even delay counts as success, an odd one as failure
                return (millis % 2 == 0) ? TASKSUCC : TASKFAIL;
            } catch (InterruptedException e) {
                e.printStackTrace();
                // Restore the interrupt flag so the pool can see the interruption
                Thread.currentThread().interrupt();
                return TASKFAIL;
            }
        }
    }

    In production we may also need to monitor how long each instruction takes to send, so we add an interceptor for the HlrBusinessEvent task module: the proxy class HlrBusinessEventAdvisor, which measures the time spent dispatching an Hlr instruction. The code is as follows:

    /**
     * @filename:HlrBusinessEventAdvisor.java
     *
     * Newland Co., Ltd. All rights reserved.
     *
     * @Description: proxy class that measures the Hlr instruction dispatch time
     * @author tangjie
     * @version 1.0
     *
     */
    package newlandframework.batchtask.model;

    import org.aopalliance.intercept.MethodInterceptor;
    import org.aopalliance.intercept.MethodInvocation;
    import org.apache.commons.lang.time.StopWatch;

    public class HlrBusinessEventAdvisor implements MethodInterceptor {
        public HlrBusinessEventAdvisor() {
        }

        @Override
        public Object invoke(MethodInvocation invocation) throws Throwable {
            // Measure how long the instruction dispatch takes
            StopWatch sw = new StopWatch();
            sw.start();
            Object obj = invocation.proceed();
            sw.stop();
            System.out.println("Time to execute switch order: [" + sw.getTime() + "] ms");
            return obj;
        }
    }

    Finally, since we want to run the tasks asynchronously and in parallel and still obtain their results, we design the batch Hlr task execution module HlrBusinessEventTask. It implements the call method of the java.util.concurrent.Callable interface, which returns the result of the asynchronous task.

    /**
     * @filename:HlrBusinessEventTask.java
     *
     * Newland Co., Ltd. All rights reserved.
     *
     * @Description: Hlr instruction dispatch task execution class
     * @author tangjie
     * @version 1.0
     *
     */
    package newlandframework.batchtask.model;

    import java.util.concurrent.Callable;
    import org.springframework.aop.framework.ProxyFactory;
    import org.springframework.aop.support.NameMatchMethodPointcutAdvisor;

    public class HlrBusinessEventTask implements Callable<Integer> {
        private NotifyUsers user = null;
        // Name of the method that the timing interceptor is applied to
        private final static String MAPPERMETHODNAME = "execute";

        public HlrBusinessEventTask(NotifyUsers user) {
            this.user = user;
        }

        @Override
        public Integer call() throws Exception {
            synchronized (this) {
                // Weave the timing interceptor around HlrBusinessEvent.execute
                ProxyFactory weaver = new ProxyFactory(new HlrBusinessEvent());
                NameMatchMethodPointcutAdvisor advisor = new NameMatchMethodPointcutAdvisor();
                advisor.setMappedName(MAPPERMETHODNAME);
                advisor.setAdvice(new HlrBusinessEventAdvisor());
                weaver.addAdvisor(advisor);

                BusinessEvent proxyObject = (BusinessEvent) weaver.getProxy();
                Integer result = Integer.valueOf(proxyObject.execute(user.getUserId()));
                return result;
            }
        }
    }
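To make the interception idea easier to see in isolation, here is a minimal sketch of the same timing pattern built on the JDK's own `java.lang.reflect.Proxy` rather than Spring AOP. It is not the code used by the framework: the names `TimingProxySketch`, `Event` and `timed` are hypothetical stand-ins, and the elapsed time is accumulated in a counter instead of being printed.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.concurrent.atomic.AtomicLong;

class TimingProxySketch {
    // Hypothetical stand-in for the article's BusinessEvent interface.
    public interface Event {
        int execute(Integer userId);
    }

    // Returns a proxy that forwards every call to target and adds the elapsed
    // nanoseconds of each execute() call to totalNanos.
    public static Event timed(final Event target, final AtomicLong totalNanos) {
        InvocationHandler handler = new InvocationHandler() {
            @Override
            public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
                boolean isExecute = "execute".equals(method.getName());
                long start = System.nanoTime();
                try {
                    return method.invoke(target, args);
                } catch (InvocationTargetException e) {
                    // Rethrow the target's own exception, not the reflection wrapper.
                    throw e.getCause();
                } finally {
                    if (isExecute) {
                        totalNanos.addAndGet(System.nanoTime() - start);
                    }
                }
            }
        };
        return (Event) Proxy.newProxyInstance(
                Event.class.getClassLoader(), new Class<?>[] { Event.class }, handler);
    }

    public static void main(String[] args) {
        AtomicLong totalNanos = new AtomicLong();
        Event event = timed(userId -> userId % 2, totalNanos);
        int result = event.execute(7);
        System.out.println("result=" + result + ", elapsed ns=" + totalNanos.get());
    }
}
```

The caller only ever sees the `Event` interface, so the timing logic can be added or removed without touching the business class, which is the same benefit the Spring `ProxyFactory` gives HlrBusinessEventTask.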
  7. Next, we need to combine the parallel asynchronous query loading with the parallel asynchronous task execution, so we wrap both in a batch task management class for notifying users: NotifyUsersBatchTask. It loads, in parallel and asynchronously, the mobile users whose service is to be suspended or resumed, and then hands them to the thread pool for asynchronous processing. Afterwards it prints the total number of tasks in the batch together with the number of successes and failures (this article also provides JMX-based monitoring, described later). Successes and failures are counted by the inner classes NotifyTaskSuccCounter and NotifyTaskFailCounter. The code is as follows:
    /**
     * @filename:NotifyUsersBatchTask.java
     *
     * Newland Co., Ltd. All rights reserved.
     *
     * @Description: batch task management class for notifying users
     * @author tangjie
     * @version 1.0
     *
     */
    package newlandframework.batchtask;

    import java.sql.Connection;
    import java.sql.ResultSet;
    import java.sql.SQLException;
    import java.sql.Statement;
    import javax.sql.DataSource;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.ExecutorService;
    import org.apache.commons.collections.Closure;
    import org.apache.commons.collections.CollectionUtils;
    import org.apache.commons.collections.functors.IfClosure;
    import org.apache.commons.lang.StringUtils;
    import newlandframework.batchtask.jmx.BatchTaskMonitor;
    import newlandframework.batchtask.model.NotifyUsers;
    import newlandframework.batchtask.parallel.BatchQueryLoader;
    import newlandframework.batchtask.parallel.BatchTaskReactor;

    public class NotifyUsersBatchTask {
        public NotifyUsersBatchTask() {
        }

        private ArrayList<DataSource> dataSource;

        // JMX-based task completion monitoring counter
        private BatchTaskMonitor monitor = new BatchTaskMonitor(BatchTaskReactor.BATCHTASK_THREADPOOL_NAME);

        public NotifyUsersBatchTask(ArrayList<DataSource> dataSource) {
            this.dataSource = dataSource;
        }

        // Batch task success counter
        class NotifyTaskSuccCounter implements Closure {
            public static final String NOTIFYTASKSUCCCOUNTER = "TASKSUCCCOUNTER";
            private int numberSucc = 0;

            public void execute(Object input) {
                monitor.increaseBatchTaskCounter(NOTIFYTASKSUCCCOUNTER);
                numberSucc++;
            }

            public int getSuccNumber() {
                return numberSucc;
            }
        }

        // Batch task failure counter
        class NotifyTaskFailCounter implements Closure {
            public static final String NOTIFYTASKFAILCOUNTER = "TASKFAILCOUNTER";
            private int numberFail = 0;

            public void execute(Object input) {
                monitor.increaseBatchTaskCounter(NOTIFYTASKFAILCOUNTER);
                numberFail++;
            }

            public int getFailNumber() {
                return numberFail;
            }
        }

        // Load the users to be notified from every data source in parallel
        public List<NotifyUsers> query() throws SQLException {
            BatchQueryLoader loader = new BatchQueryLoader();
            String strSQL = "select home_city, msisdn, user_id from notify_users";

            for (int i = 0; i < dataSource.size(); i++) {
                Connection con = dataSource.get(i).getConnection();
                Statement st = con.createStatement();
                loader.attachLoadEnv(strSQL, st, con);
            }

            List<ResultSet> list = loader.executeQuery();
            System.out.println("Number of result sets loaded: " + list.size());

            final List<NotifyUsers> listNotifyUsers = new ArrayList<NotifyUsers>();
            for (int i = 0; i < list.size(); i++) {
                ResultSet rs = list.get(i);
                while (rs.next()) {
                    NotifyUsers users = new NotifyUsers();
                    users.setHomeCity(rs.getInt(1));
                    users.setMsisdn(rs.getInt(2));
                    users.setUserId(rs.getInt(3));
                    listNotifyUsers.add(users);
                }
            }

            // Release the connection resources
            loader.close();
            return listNotifyUsers;
        }

        // Process the loaded users on the thread pool and print a summary
        public void batchNotify(List<NotifyUsers> list, final ExecutorService excutor) {
            System.out.println("Processing record count :" + list.size());
            System.out.println(StringUtils.center("record details as follows", 40, "-"));

            NotifyTaskSuccCounter cntSucc = new NotifyTaskSuccCounter();
            NotifyTaskFailCounter cntFail = new NotifyTaskFailCounter();
            BatchTaskPredicate predicate = new BatchTaskPredicate(excutor);

            // Route each record to the success or failure counter based on the predicate
            Closure batchAction = new IfClosure(predicate, cntSucc, cntFail);
            CollectionUtils.forAllDo(list, batchAction);

            System.out.println("batch total :" + list.size() + " records, succeeded :" + cntSucc.getSuccNumber()
                    + " records, failed :" + cntFail.getFailNumber() + " records");
        }
    }

    The BatchTaskPredicate module submits each task to the thread pool for asynchronous processing and reports the outcome of the asynchronous task as the predicate result: success or failure. The code is as follows:

    /**
     * @filename:BatchTaskPredicate.java
     *
     * Newland Co., Ltd. All rights reserved.
     *
     * @Description: module that submits batch asynchronous tasks for execution
     * @author tangjie
     * @version 1.0
     *
     */
    package newlandframework.batchtask;

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Future;
    import java.util.concurrent.TimeUnit;
    import org.apache.commons.collections.Predicate;
    import newlandframework.batchtask.model.HlrBusinessEvent;
    import newlandframework.batchtask.model.HlrBusinessEventTask;
    import newlandframework.batchtask.model.NotifyUsers;

    public class BatchTaskPredicate implements Predicate {
        private ExecutorService excutor = null;

        public BatchTaskPredicate(ExecutorService excutor) {
            this.excutor = excutor;
        }

        public boolean evaluate(Object object) {
            if (object instanceof NotifyUsers) {
                NotifyUsers users = (NotifyUsers) object;
                Future<Integer> future = excutor.submit(new HlrBusinessEventTask(users));
                try {
                    // Wait at most 5 seconds for the result
                    Integer result = future.get(5, TimeUnit.SECONDS);
                    return result.intValue() == HlrBusinessEvent.TASKSUCC;
                } catch (Exception e) {
                    // If the task fails or times out, try to cancel it
                    future.cancel(true);
                    e.printStackTrace();
                    return false;
                }
            } else {
                return false;
            }
        }
    }
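One thing worth noting about `evaluate`: it calls `future.get` immediately after `submit`, so when it is driven one record at a time from a single thread, at most one task is in flight at any moment. The following is a sketch of one possible variation, not the code used in this article, that hands every task to the pool first and only then collects the results. It uses `ExecutorService.invokeAll` from the JDK; the class name `SubmitThenCollectSketch`, the method `runAll` and the single overall timeout are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

class SubmitThenCollectSketch {
    // Same return-code convention as HlrBusinessEvent in the article.
    public static final int TASKSUCC = 0;
    public static final int TASKFAIL = 1;

    // Runs all tasks on the pool concurrently and returns {successes, failures}.
    // Tasks that throw, or that are cancelled because the overall timeout
    // expired, are counted as failures.
    public static int[] runAll(ExecutorService pool, List<Callable<Integer>> tasks, long timeoutSeconds) {
        int succ = 0;
        int fail = 0;
        try {
            // invokeAll returns once every task has completed or been cancelled.
            List<Future<Integer>> futures = pool.invokeAll(tasks, timeoutSeconds, TimeUnit.SECONDS);
            for (Future<Integer> future : futures) {
                try {
                    if (future.get() == TASKSUCC) {
                        succ++;
                    } else {
                        fail++;
                    }
                } catch (ExecutionException | CancellationException e) {
                    fail++;
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new int[] { succ, fail };
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        List<Callable<Integer>> tasks = new ArrayList<>();
        for (int userId = 1; userId <= 8; userId++) {
            final int id = userId;
            // Simulated outcome: even user ids succeed, odd ones fail.
            tasks.add(() -> (id % 2 == 0) ? TASKSUCC : TASKFAIL);
        }
        int[] counts = runAll(pool, tasks, 5);
        pool.shutdown();
        System.out.println("succeeded=" + counts[0] + ", failed=" + counts[1]);
    }
}
```

The trade-off is that the timeout here applies to the whole batch rather than to each task, so whether this shape fits depends on how the per-task 5 second limit is meant to be enforced.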

    Finally, the batch task management class NotifyUsersBatchTask is constructed with a list of database connection pools, so that it can load data objects from several data sources in one batch. Here we assume that the notify_users tables of the CMS and CCS data sources are loaded in parallel. The Spring configuration, batchtask-multidb.xml, is as follows:

    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
           xsi:schemaLocation="http://www.springframework.org/schema/beans
           http://www.springframework.org/schema/beans/spring-beans.xsd">
        <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
            <property name="locations">
                <list>
                    <value>newlandframework/batchtask/jdbc-cms.properties</value>
                    <value>newlandframework/batchtask/jdbc-ccs.properties</value>
                </list>
            </property>
        </bean>
        <bean id="dtSource-cms" destroy-method="close" class="org.apache.commons.dbcp.BasicDataSource">
            <property name="driverClassName" value="${jdbc.cms.driverClassName}"/>
            <property name="url" value="${jdbc.cms.url}"/>
            <property name="username" value="${jdbc.cms.username}"/>
            <property name="password" value="${jdbc.cms.password}"/>
        </bean>
        <bean id="dtSource-ccs" destroy-method="close" class="org.apache.commons.dbcp.BasicDataSource">
            <property name="driverClassName" value="${jdbc.ccs.driverClassName}"/>
            <property name="url" value="${jdbc.ccs.url}"/>
            <property name="username" value="${jdbc.ccs.username}"/>
            <property name="password" value="${jdbc.ccs.password}"/>
        </bean>
        <bean id="notifyUsers" class="newlandframework.batchtask.NotifyUsersBatchTask">
            <constructor-arg name="dataSource">
                <list>
                    <ref bean="dtSource-ccs"/>
                    <ref bean="dtSource-cms"/>
                </list>
            </constructor-arg>
        </bean>
    </beans>
  8. Now let's use JMX to implement a module that monitors how the thread pool's batch tasks are completing. We start by defining an MBean interface that returns a count given the name of a counter.
    /**
     * @filename:BatchTaskMonitorMBean.java
     *
     * Newland Co., Ltd. All rights reserved.
     *
     * @Description: JMX monitoring interface for batch tasks
     * @author tangjie
     * @version 1.0
     *
     */
    package newlandframework.batchtask.jmx;

    public interface BatchTaskMonitorMBean {
        public int getBatchTaskCounter(String taskName);
    }

    Next we implement this interface in the BatchTaskMonitor module:

    /**
     * @filename:BatchTaskMonitor.java
     *
     * Newland Co., Ltd. All rights reserved.
     *
     * @Description: JMX batch task monitoring module
     * @author tangjie
     * @version 1.0
     *
     */
    package newlandframework.batchtask.jmx;

    import javax.management.AttributeChangeNotification;
    import javax.management.NotificationBroadcasterSupport;
    import javax.management.ObjectName;
    import javax.management.InstanceAlreadyExistsException;
    import javax.management.MBeanRegistrationException;
    import javax.management.MalformedObjectNameException;
    import javax.management.NotCompliantMBeanException;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.atomic.AtomicInteger;
    import java.lang.management.ManagementFactory;
    import java.text.MessageFormat;
    import java.util.Map;

    public class BatchTaskMonitor extends NotificationBroadcasterSupport implements BatchTaskMonitorMBean {
        private static final String TASKMONITOR_NAME = "newlandframework.batchtask.jmx.{0}:type=BatchTaskMonitor";
        Map<String, AtomicInteger> batchTaskCounter;
        private int sequenceTaskNumber = 0;

        // Create the monitor and register it with the platform MBean server
        public BatchTaskMonitor(String taskName) {
            // ConcurrentHashMap, because JMX clients read the counters from another thread
            batchTaskCounter = new ConcurrentHashMap<String, AtomicInteger>();
            try {
                registerMBean(taskName);
            } catch (InstanceAlreadyExistsException e) {
                System.out.println("InstanceAlreadyExistsException BatchTaskMonitor Register Fail");
            } catch (MBeanRegistrationException e) {
                System.out.println("MBeanRegistrationException BatchTaskMonitor Register Fail");
            } catch (NotCompliantMBeanException e) {
                System.out.println("NotCompliantMBeanException BatchTaskMonitor Register Fail");
            } catch (MalformedObjectNameException e) {
                System.out.println("MalformedObjectNameException BatchTaskMonitor Register Fail");
            }
        }

        private void registerMBean(String taskName) throws InstanceAlreadyExistsException,
                MBeanRegistrationException, NotCompliantMBeanException, MalformedObjectNameException {
            String strObjectName = MessageFormat.format(TASKMONITOR_NAME, taskName);
            ManagementFactory.getPlatformMBeanServer().registerMBean(this, new ObjectName(strObjectName));
        }

        // Increment the named batch task counter
        public void increaseBatchTaskCounter(String taskName) {
            if (batchTaskCounter.containsKey(taskName)) {
                notifyMessage(taskName, batchTaskCounter.get(taskName).incrementAndGet());
            } else {
                batchTaskCounter.put(taskName, new AtomicInteger(1));
            }
        }

        private void notifyMessage(String taskName, int batchNewTaskCounter) {
            sendNotification(new AttributeChangeNotification(this,
                    sequenceTaskNumber++, System.currentTimeMillis(),
                    "batchTaskCounter \"" + taskName + "\" incremented",
                    "batchTaskCounter", "int",
                    batchNewTaskCounter - 1,
                    batchNewTaskCounter));
        }

        // Return the current value of the named counter
        public int getBatchTaskCounter(String taskName) {
            if (batchTaskCounter.containsKey(taskName)) {
                return batchTaskCounter.get(taskName).intValue();
            } else {
                return 0;
            }
        }
    }

    The counter names are specified in the NotifyUsersBatchTask module: the batch task success counter is NOTIFYTASKSUCCCOUNTER = "TASKSUCCCOUNTER" and the batch task failure counter is NOTIFYTASKFAILCOUNTER = "TASKFAILCOUNTER". With these names we can monitor the execution of thread pool tasks through JConsole.
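For readers who have not used JMX before, here is a minimal, self-contained sketch of the standard MBean mechanism that the monitor relies on: an interface whose name is the implementation class name plus `MBean`, registration with the platform MBean server, and an operation invoked by name, which is essentially what JConsole does when you click the operation. The names `JmxCounterSketch`, `Counter`, `getCount` and the object name used in `main` are hypothetical and only serve the illustration.

```java
import java.lang.management.ManagementFactory;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import javax.management.JMException;
import javax.management.MBeanServer;
import javax.management.ObjectName;

class JmxCounterSketch {
    // Standard MBean rule: the management interface is named <ImplClass>MBean
    // and must be public.
    public interface CounterMBean {
        int getCount(String name);
    }

    public static class Counter implements CounterMBean {
        private final ConcurrentMap<String, AtomicInteger> counts =
                new ConcurrentHashMap<String, AtomicInteger>();

        public void increase(String name) {
            counts.computeIfAbsent(name, k -> new AtomicInteger()).incrementAndGet();
        }

        @Override
        public int getCount(String name) {
            AtomicInteger count = counts.get(name);
            return count == null ? 0 : count.get();
        }
    }

    // Registers a Counter, increments one named counter, then reads it back
    // through the MBean server by operation name and parameter signature.
    public static int registerAndQuery(String objectName, String counterName, int increments) {
        MBeanServer server = ManagementFactory.getPlatformMBeanServer();
        try {
            ObjectName name = new ObjectName(objectName);
            Counter counter = new Counter();
            server.registerMBean(counter, name);
            try {
                for (int i = 0; i < increments; i++) {
                    counter.increase(counterName);
                }
                Object result = server.invoke(name, "getCount",
                        new Object[] { counterName },
                        new String[] { String.class.getName() });
                return (Integer) result;
            } finally {
                // Unregister so the sketch can be run repeatedly in one JVM.
                server.unregisterMBean(name);
            }
        } catch (JMException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        int failed = registerAndQuery("sketch.jmx:type=Counter", "TASKFAILCOUNTER", 3);
        System.out.println("TASKFAILCOUNTER=" + failed);
    }
}
```

Because `getCount` takes a parameter, JMX exposes it as an operation rather than as an attribute, which is why a tool like JConsole shows an input box for the counter name.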

  9. Finally, we need to "assemble" all of the modules above. The reference code for the client call is shown in the following example:
    try {
        // Initialize the parallel asynchronous task reactor
        BatchTaskReactor reactor = BatchTaskReactor.getReactor();
        final ExecutorService excutor = reactor.getBatchTaskThreadPool(BatchTaskReactor.BATCHTASK_THREADPOOL_NAME);

        List<NotifyUsers> listNotifyUsers = null;
        // context: the Spring ApplicationContext (assumed here to be loaded from batchtask-multidb.xml)
        NotifyUsersBatchTask notifyTask = (NotifyUsersBatchTask) context.getBean("notifyUsers");

        // Load the result sets from the databases in parallel and asynchronously
        listNotifyUsers = notifyTask.query();

        StopWatch sw = new StopWatch();
        sw.start();

        // Process the batch tasks in parallel and asynchronously
        notifyTask.batchNotify(listNotifyUsers, excutor);

        sw.stop();
        reactor.close();

        String strContent = String.format("=========end of batch parallel task execution, time-consuming [%d] milliseconds=========", sw.getTime());
        System.out.println(strContent);
    } catch (SQLException e) {
        e.printStackTrace();
    }

Let's run it and see what happens. First insert the users to be notified for Fuzhou (591) and Xiamen (592) into the database (you can insert more; the more records there are, the more clearly the value of this asynchronous parallel batch framework shows). A screenshot of the run follows:

     

Just as we expected. Very good.

Now let’s take a look at monitoring the completion of parallel batch asynchronous thread pool tasks using JMX technology. Let’s first connect to our MBean object, BatchTaskMonitor.

     

It exposes an operation called getBatchTaskCounter, which returns the count for a given counter name. In the area circled in red above, enter the counter name TASKFAILCOUNTER and click OK. The result is as follows:

     

The counter shows that 196 batch tasks have failed so far. This is exactly the kind of visual, real-time monitoring we were hoping for.

Closing remarks

To sum up, we have designed a simplified batch processing framework from two building blocks: parallel asynchronous loading and a thread pool. There is not much code, and the application scenario is quite specific, but as the saying goes, small as the sparrow is, it has all the vital organs. I believe it can still serve as a useful reference for fellow developers. Moreover, today's servers are multi-core, multi-CPU machines, and we should make good use of that hardware. IO-intensive applications in particular can be improved along the lines described above, and I believe the results will be worthwhile.

That turned out to be a lot more content and code than I had planned to write. Preparing, coding, debugging and writing this article took a good deal of time and energy. Still, I am happy with it: a blog lets me put some of my ideas on record, which may be a useful reference for others and is a form of study and summary for myself. "The way ahead is so long without ending, yet high and low I'll search with my will unbending." I offer this article as a brick thrown out to attract jade. If I have got anything wrong, I hope readers will point it out, and I welcome any criticism and advice.