My project recently needed batch processing. While searching online I came across a good article on the subject, which I am reposting here to share with you.

Source: https://blog.csdn.net/william_jm/article/details/78964538

1.1 Overview

In the era of big data, the collection, processing, storage, analysis, mining, retrieval, and presentation of data are all interlinked. Data processing is a typical batch scenario. It means regularly formatting massive amounts of data, validating it against business rules, and applying complex business logic. The results are then loaded into the system's own database within transactions. All of this has to run efficiently and without human intervention, and Spring Batch was created to meet exactly this need.

Spring Batch is a lightweight, comprehensive batch framework suitable for enterprise-scale data processing systems. It provides reusable functions that are essential for processing large volumes of records, including:

- logging/tracing
- transaction management
- job processing statistics
- job restart and skip
- resource management

It also offers more advanced services and features that enable very high-volume, high-performance batch jobs through optimization and partitioning techniques.

Note that Spring Batch does not provide scheduling. Scheduling is handled by schedulers such as Quartz, Tivoli, and Control-M. Spring Batch is meant to work together with a scheduler, not to replace one.

1.2 Background

The open source community has paid a great deal of attention to web and microservices architectures, while Java-based batch frameworks have been largely overlooked. Enterprises have always needed batch processing. Without a standard, reusable batch framework, however, projects and products ended up with a large amount of write-once, use-once code and many different ad hoc solutions.

SpringSource and Accenture collaborated to change this. Accenture brought rich industry experience in implementing batch architectures, and SpringSource brought deep technical expertise. Backed by the programming model of the Spring Framework, this combination produced a high-quality enterprise Java solution: Spring Batch. The project draws on Accenture's decades of experience building batch architectures on successive generations of platforms: COBOL/Mainframe, C++/Unix, and now Java. Going forward, Spring Batch's development, enhancements, and roadmap will be driven by open source community committers. The goal of the Accenture and SpringSource collaboration is to promote standardized approaches, frameworks, and tools for batch processing.

1.3 Scenarios

A typical batch program follows a three-step architecture: read, process, write. It reads a large number of records from a database, file, or queue and processes them according to business rules. It then writes them out in the required form, for example to a database or a file. Spring Batch usually runs offline, automating basic batch iteration with transaction-like processing and no user interaction. Typical scenarios include:

- Concurrent batch processing: executing jobs in parallel
- Staged, enterprise message-driven processing
- Massively parallel batch processing
- Manual or scheduled restart after failure
- Sequential processing of dependent steps (with extensions to workflow-driven batches)
- Partial processing: skipping records (for example, on rollback)
- Whole-batch transactions: for cases with a small batch size or existing stored procedures/scripts

The framework lets developers focus on business logic while it takes care of the underlying infrastructure. Its design aims to provide:

- A clear separation between the batch infrastructure, the execution environment, and the batch application
- Common core execution services exposed as interfaces
- Simple default implementations that are easy to configure, customize, and extend
- Core services that are easy to replace or extend

Spring Batch's architecture is designed with extensibility and a diverse range of end users in mind.
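The "skipping records" scenario above boils down to a skip limit: a bad record is set aside until too many have failed, and then the whole run fails. The following is a JDK-only sketch of that rule, not Spring Batch's fault-tolerant step. The class `SkipPolicyDemo`, the method `parseWithSkips`, and the choice of "unparseable integer" as the failure are all hypothetical. The local `SkipLimitExceededException` only borrows its name from Spring Batch's exception.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical illustration of skip-limit semantics; not Spring Batch's fault-tolerant step. */
final class SkipPolicyDemo {
    /** Local stand-in, named after Spring Batch's exception of the same name. */
    static final class SkipLimitExceededException extends RuntimeException {
        SkipLimitExceededException(int limit, Throwable cause) {
            super("Skip limit of " + limit + " exceeded", cause);
        }
    }

    private SkipPolicyDemo() {
    }

    /**
     * Parses each record as an integer. Malformed records are collected in {@code skipped};
     * once more than {@code skipLimit} records have failed, the whole run fails.
     */
    static List<Integer> parseWithSkips(List<String> records, int skipLimit, List<String> skipped) {
        List<Integer> out = new ArrayList<>();
        for (String record : records) {
            try {
                out.add(Integer.parseInt(record.trim()));
            } catch (NumberFormatException e) {
                skipped.add(record);
                if (skipped.size() > skipLimit) {
                    throw new SkipLimitExceededException(skipLimit, e);
                }
            }
        }
        return out;
    }
}
```

With a skip limit of 1, input `"1", "x", "3"` yields `[1, 3]` and skips `"x"`. Two bad records exceed the limit and abort the run.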
Figure 2.1.1 shows the layered architecture of Spring Batch.

The Spring Batch architecture consists of three kinds of high-level components: Application, Core, and Infrastructure.

- Application: all batch jobs and custom code written by developers.
- Core: the core runtime classes needed to launch and control batch jobs, including the implementations of JobLauncher, Job, and Step.
- Infrastructure: both the Application and Core layers are built on top of this layer. It contains common readers (ItemReader) and writers (ItemWriter), as well as services such as the retry module (RetryTemplate), which both application developers and the core framework use.

2.2 Domain terms

- Step: one complete business-logic step within a Job. A Job consists of one or more steps.
- StepExecution: a single attempt to execute a Step. A StepExecution is created only when the step actually starts running.
- Job: the entity that encapsulates an entire batch process. A simple job is configured with a job name, an ordered sequence of steps, and whether it is restartable.
- JobInstance: a logical run of a Job. A JobInstance has no bearing on which data is loaded; that is entirely up to the ItemReader implementation. Reusing a JobInstance does determine whether the state (ExecutionContext) saved by previous executions is used. A new JobInstance means starting from the beginning, while an existing one generally means resuming where the last execution left off.
- JobParameters: the set of parameters used to start a batch job. They also identify a JobInstance, so JobInstance = Job + JobParameters.
- JobExecution: a single attempt to run a job.

Figure 2.2.1 shows how these fit together. A Job acts as a container for one or more business-logic steps. A JobInstance organizes the runs of that Job and makes restart possible, and a JobExecution records the state of each run. Likewise, each step execution records its data, such as commitCount, rollbackCount, startTime, and endTime, in a StepExecution.

Figure 2.2.1 - Batch framework
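The rule JobInstance = Job + JobParameters, with one instance having possibly many executions, can be illustrated with a small in-memory model. This is a simplified JDK-only sketch, not Spring Batch's JobRepository. The names `InstanceModel`, `launch`, and `Status` are hypothetical. The real restart rules are reduced here to one: an instance whose last execution completed cannot be launched again.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical, simplified model of JobInstance / JobExecution bookkeeping. */
final class InstanceModel {
    enum Status { COMPLETED, FAILED }

    // JobInstance key -> outcomes of its JobExecutions, in launch order.
    private final Map<String, List<Status>> executionsByInstance = new HashMap<>();

    /** A JobInstance is identified by the job name plus its parameters. */
    static String instanceKey(String jobName, Map<String, String> params) {
        // TreeMap gives a stable key regardless of the order parameters were added in.
        return jobName + new TreeMap<>(params);
    }

    /** Records one execution attempt; refuses to rerun an instance that already completed. */
    Status launch(String jobName, Map<String, String> params, Status outcome) {
        String key = instanceKey(jobName, params);
        List<Status> executions = executionsByInstance.get(key);
        if (executions == null) {
            // New parameters: a new JobInstance, starting from the beginning.
            executions = new ArrayList<>();
            executionsByInstance.put(key, executions);
        } else if (executions.get(executions.size() - 1) == Status.COMPLETED) {
            throw new IllegalStateException("Instance already complete: " + key);
        }
        // Same parameters after a failure: another JobExecution of the same instance (a restart).
        executions.add(outcome);
        return outcome;
    }

    int instanceCount() {
        return executionsByInstance.size();
    }

    int executionCount(String jobName, Map<String, String> params) {
        List<Status> executions = executionsByInstance.get(instanceKey(jobName, params));
        return executions == null ? 0 : executions.size();
    }
}
```

A failed run followed by a successful rerun with the same parameters gives one instance with two executions. Only new parameter values produce a new instance. This is also why a job that is launched repeatedly needs a parameter that changes between runs.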

Runtime model

- JobLauncher: the interface for launching a Job. Given a Job and its JobParameters, the JobLauncher is called from a Java program, the command line, or another scheduling framework (such as Quartz) to run the job.
- JobRepository: stores job execution metadata (JobInstance, JobExecution, JobParameters, StepExecution, and ExecutionContext). Two implementations are provided out of the box: in-memory and database-backed. With the metadata in a database, the status of batch jobs can be monitored at any time, including whether each run succeeded or failed, and failed jobs can be restarted.
- ItemReader: an abstraction of a step's input. It reads one record at a time and returns null once all records have been read.
- ItemProcessor: an abstraction that applies business logic to each record.
- ItemWriter: an abstraction of a step's output. It writes one batch, or chunk, of items at a time.

Figure 2.2.2 shows the complete Spring Batch domain model. A JobLauncher starts a Job, and a Job can have multiple Steps. Each Step typically has one ItemReader, one ItemProcessor, and one ItemWriter. The JobRepository records the job's execution information.

Figure 2.2.2 - Spring Batch domain concept model
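The reader/processor/writer contract can be made concrete with a minimal, JDK-only sketch of a chunk-oriented step loop. The nested interfaces only mirror the shape of Spring Batch's ItemReader, ItemProcessor, and ItemWriter; they are simplified stand-ins, not the real API. `read()` returns null at the end of input, a processor may return null to filter an item, and the writer receives a whole chunk. The class name `ChunkLoop` is hypothetical. The real framework also wraps each chunk in a transaction, which is omitted here.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/** Simplified stand-ins for the Spring Batch item interfaces (not the real API). */
final class ChunkLoop {
    interface Reader<T> { T read(); }                 // null signals end of input
    interface Processor<I, O> { O process(I item); }  // null filters the item out
    interface Writer<T> { void write(List<? extends T> chunk); }

    private ChunkLoop() {
    }

    /** Runs read-process-write in chunks of commitInterval items read; returns the number of chunks. */
    static <I, O> int run(Reader<I> reader, Processor<I, O> processor, Writer<O> writer, int commitInterval) {
        int chunks = 0;
        while (true) {
            // 1. Read up to commitInterval items; the chunk boundary counts items read.
            List<I> inputs = new ArrayList<>();
            I item;
            while (inputs.size() < commitInterval && (item = reader.read()) != null) {
                inputs.add(item);
            }
            if (inputs.isEmpty()) {
                return chunks;                  // reader exhausted
            }
            // 2. Process each item; null results are filtered out of the chunk.
            List<O> outputs = new ArrayList<>();
            for (I in : inputs) {
                O out = processor.process(in);
                if (out != null) {
                    outputs.add(out);
                }
            }
            // 3. Hand the whole chunk to the writer (in Spring Batch the transaction commits here).
            writer.write(outputs);
            chunks++;
        }
    }

    /** Adapts an Iterator to the Reader contract. */
    static <T> Reader<T> fromIterator(final Iterator<T> it) {
        return new Reader<T>() {
            @Override
            public T read() {
                return it.hasNext() ? it.next() : null;
            }
        };
    }
}
```

For example, reading 1 to 7 with a commit interval of 3 and a processor that keeps only even numbers produces three chunks: `[20]`, `[40, 60]`, and an empty one for the final item 7. This is the same shape as `chunk(10)` in the batch configuration of section 3.3.2.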

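3 Hands-on practice

Talk is cheap, so in this chapter we'll put the ideas into practice.

3.1 What I'll build

Every day at 1 AM, the job aggregates the data in the TEST_TASK_PROPERTY and DQP_TEST_FILE tables according to business requirements. It writes the result into DQP_REPORT_A, rolling the result tables up into a statistics table.

3.2 What you'll need

- Eclipse
- JDK 1.7 or later
- Maven 3.0

3.3 Set up the project

The project is built with Maven. It uses Spring Boot to simplify dependency configuration and deployment, Quartz as the job scheduler, Spring Batch as the batch framework, and JPA for data persistence.

3.3.1 pom.xml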

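```xml
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.william.lab.springboot.springbatch</groupId>
    <artifactId>springbatch</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>springbatch</name>
    <description>Test project for Spring Boot + Spring Batch + Quartz</description>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>1.5.6.RELEASE</version>
    </parent>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.7</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-batch</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-jpa</artifactId>
            <exclusions>
                <exclusion>
                    <artifactId>slf4j-api</artifactId>
                    <groupId>org.slf4j</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>jboss-logging</artifactId>
                    <groupId>org.jboss.logging</groupId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
            <exclusions>
                <exclusion>
                    <artifactId>log4j-over-slf4j</artifactId>
                    <groupId>org.slf4j</groupId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-context-support</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-tx</artifactId>
        </dependency>
        <dependency>
            <groupId>org.quartz-scheduler</groupId>
            <artifactId>quartz</artifactId>
            <version>2.2.1</version>
            <exclusions>
                <exclusion>
                    <artifactId>slf4j-api</artifactId>
                    <groupId>org.slf4j</groupId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.quartz-scheduler</groupId>
            <artifactId>quartz-jobs</artifactId>
            <version>2.2.1</version>
        </dependency>
        <dependency>
            <groupId>commons-lang</groupId>
            <artifactId>commons-lang</artifactId>
            <version>2.6</version>
        </dependency>
        <dependency>
            <groupId>com.jcraft</groupId>
            <artifactId>jsch</artifactId>
            <version>0.1.54</version>
        </dependency>
        <dependency>
            <groupId>commons-io</groupId>
            <artifactId>commons-io</artifactId>
            <version>2.4</version>
        </dependency>
        <dependency>
            <groupId>commons-net</groupId>
            <artifactId>commons-net</artifactId>
            <version>3.1</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>
```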

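3.3.2 Batch job configuration

```java
import java.util.HashMap;
import java.util.Map;

import javax.persistence.EntityManagerFactory;
import javax.persistence.PersistenceUnit;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.item.database.JpaItemWriter;
import org.springframework.batch.item.database.JpaPagingItemReader;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
@EnableBatchProcessing
public class BatchConfiguration {
    @Autowired
    private JobBuilderFactory jobBuilderFactory;
    @Autowired
    private StepBuilderFactory stepBuilderFactory;
    @PersistenceUnit
    private EntityManagerFactory emf;

    // @StepScope only takes effect on a @Bean method. A new reader is created for each
    // step execution, so the time window below is recomputed on every run.
    @Bean
    @StepScope
    public JpaPagingItemReader<TestReport> reader() {
        JpaPagingItemReader<TestReport> reader = new JpaPagingItemReader<>();
        // Aggregate today's TestFile / TestTaskProperty rows into TestReport objects.
        reader.setQueryString("select new TestReport(ttp.taskId, tra.fileId, ttp.ruleId, sum(tra.count))"
                + " from TestFile tra, TestTaskProperty ttp"
                + " where ttp.taskId = tra.taskId and ttp.beginTime between :startTime and :endTime"
                + " group by ttp.taskId, tra.fileId, ttp.ruleId");
        // JpaPagingItemReader binds each map entry with query.setParameter(name, value),
        // so named parameters are used here.
        Map<String, Object> parameterValues = new HashMap<>();
        parameterValues.put("startTime", CommonUtils.getTimeSection(0, 0, 0));
        parameterValues.put("endTime", CommonUtils.getTimeSection(23, 59, 59));
        reader.setParameterValues(parameterValues);
        reader.setEntityManagerFactory(emf);
        // Integer.MAX_VALUE loads the whole result as a single page; a smaller value
        // (for example the chunk size) makes the reader fetch page by page.
        reader.setPageSize(Integer.MAX_VALUE);
        return reader;
    }

    @Bean
    public TestFileProcessor processor() {
        return new TestFileProcessor();
    }

    @Bean
    public JpaItemWriter<TestReport> writer() {
        JpaItemWriter<TestReport> writer = new JpaItemWriter<>();
        writer.setEntityManagerFactory(emf);
        return writer;
    }

    // One chunk-oriented step: read, process, and write 10 items per transaction.
    @Bean
    public Step step() {
        return stepBuilderFactory.get("step").<TestReport, TestReport>chunk(10)
                .reader(reader())
                .processor(processor())
                .writer(writer())
                .build();
    }

    @Bean
    public Job importUserJob(JobRepository jobRepository) {
        return jobBuilderFactory.get("importUserJob")
                .incrementer(new RunIdIncrementer())
                .repository(jobRepository)
                .flow(step())
                .end()
                .build();
    }
}
```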
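The reader and processor both call a CommonUtils.getTimeSection(hour, minute, second) helper that the article does not show. It is called with (0, 0, 0) and (23, 59, 59) to bound today's beginTime. That suggests it returns today's date at the given time of day. The following is a hypothetical sketch under that assumption. It returns java.util.Date, which a JPA temporal parameter accepts, but the real helper may differ, for example in its return type or time zone handling.

```java
import java.util.Calendar;
import java.util.Date;

/** Hypothetical reconstruction of the CommonUtils helper used by the batch job (not from the article). */
final class CommonUtils {
    private CommonUtils() {
    }

    /** Returns today's date at hour:minute:second with milliseconds zeroed, in the JVM's default time zone. */
    public static Date getTimeSection(int hour, int minute, int second) {
        return getTimeSection(Calendar.getInstance(), hour, minute, second);
    }

    /** Same as above, but relative to an explicit "now" so the logic can be tested. */
    public static Date getTimeSection(Calendar now, int hour, int minute, int second) {
        Calendar c = (Calendar) now.clone();
        c.set(Calendar.HOUR_OF_DAY, hour);
        c.set(Calendar.MINUTE, minute);
        c.set(Calendar.SECOND, second);
        c.set(Calendar.MILLISECOND, 0);
        return c.getTime();
    }
}
```

Under this assumption, getTimeSection(0, 0, 0) and getTimeSection(23, 59, 59) bracket the current calendar day. Rows whose beginTime falls in the last second of the day, after 23:59:59.000, would fall outside the window.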

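In Spring, @EnableBatchProcessing works like the other @Enable* annotations: it provides the base configuration for building batch jobs. Besides creating a StepScope instance, this base configuration makes the following beans available for autowiring:

- JobRepository (bean name "jobRepository")
- JobLauncher (bean name "jobLauncher")
- JobRegistry (bean name "jobRegistry")
- PlatformTransactionManager (bean name "transactionManager")
- JobBuilderFactory (bean name "jobBuilders")
- StepBuilderFactory (bean name "stepBuilders")

The core interface behind this configuration is BatchConfigurer. It provides default implementations of the beans listed above. It requires a DataSource bean in the context, and the JobRepository uses that DataSource's connection pool.

Note that only one configuration class needs the @EnableBatchProcessing annotation. Once any class carries it, all of the beans above are available.

3.3.2.1 Job and Step

The step() method assembles the step for a specific business requirement. As described in the previous chapter, a step consists of a reader, a processor, and a writer. The importUserJob() method assembles the job from its steps, and the injected JobRepository persists the job's execution metadata.

3.3.2.2 Processing units: reader, writer, and processor

The reader() method reads the data; here it creates a JpaPagingItemReader. A JpaPagingItemReader lets you declare a JPQL statement and pass in an EntityManagerFactory. Like any other ItemReader, each call to its read() method returns one item, and paging happens automatically whenever more entities are needed. With the page size set to Integer.MAX_VALUE, as above, the entire result set is fetched as a single page.

The writer() method persists the processed results to the database. JpaItemWriter is JPA EntityManager aware. For each chunk, it obtains the transactional EntityManager, merges each item that is not already managed, and flushes at the end of the chunk. This way the writes take part in the step's transaction.

The processor() method holds the business processing logic. The following processor implements some simple business logic:

```java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.item.ItemProcessor;

public class TestFileProcessor implements ItemProcessor<TestReport, TestReport> {
    private static final Logger log = LoggerFactory.getLogger(TestFileProcessor.class);

    @Override
    public TestReport process(final TestReport testReport) throws Exception {
        // Stamp each aggregated report with the start of today's time window.
        testReport.setTimeSection(CommonUtils.getTimeSection(0, 0, 0));
        log.info("StatisticResult [{}]", testReport);
        // Returning null here would filter the item out of the chunk.
        return testReport;
    }
}
```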
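The automatic paging that JpaPagingItemReader performs behind read() can be pictured with a small JDK-only sketch. Items are handed out one at a time, and a new page is fetched only when the current one is used up. A hypothetical PageSource interface stands in for running the JPQL query with setFirstResult/setMaxResults, and the class name PagingReader is likewise hypothetical. This illustrates the idea only; it is not Spring Batch's implementation. With a page size of Integer.MAX_VALUE, as configured in reader(), the whole result arrives in the first fetch.

```java
import java.util.Collections;
import java.util.List;

/** Illustrative paging reader; PageSource is a hypothetical stand-in for a paged JPQL query. */
final class PagingReader<T> {
    interface PageSource<T> {
        List<T> fetch(int firstResult, int maxResults);
    }

    private final PageSource<T> source;
    private final int pageSize;
    private List<T> page = Collections.emptyList();
    private int indexInPage = 0;
    private int offset = 0;          // rows fetched so far = firstResult of the next page
    private int pagesFetched = 0;
    private boolean exhausted = false;

    PagingReader(PageSource<T> source, int pageSize) {
        this.source = source;
        this.pageSize = pageSize;
    }

    /** Returns the next item, or null once the source has no more rows. */
    T read() {
        if (indexInPage == page.size()) {
            if (exhausted) {
                return null;
            }
            // Current page used up: query the next one.
            page = source.fetch(offset, pageSize);
            offset += page.size();
            pagesFetched++;
            indexInPage = 0;
            // A short page means there is nothing left after it.
            exhausted = page.size() < pageSize;
            if (page.isEmpty()) {
                return null;
            }
        }
        return page.get(indexInPage++);
    }

    int pagesFetched() {
        return pagesFetched;
    }
}
```

Reading 25 rows with a page size of 10 takes three queries. A page size of Integer.MAX_VALUE takes one query that holds the entire result in memory.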

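3.3.3 Quartz scheduling configuration

3.3.3.1 Trigger

```java
import java.util.Date;

import org.quartz.CronScheduleBuilder;
import org.quartz.CronTrigger;
import org.quartz.JobBuilder;
import org.quartz.JobDetail;
import org.quartz.JobKey;
import org.quartz.Scheduler;
import org.quartz.SchedulerException;
import org.quartz.TriggerBuilder;
import org.quartz.TriggerKey;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.quartz.SchedulerFactoryBean;
import org.springframework.stereotype.Component;

@Component("cronTriggerFactoryBean")
public class CronTriggerFactoryBean {
    @Autowired
    private SchedulerFactoryBean schedulerFactoryBean;

    /**
     * Adds a scheduled task, or updates its schedule if the task already exists.
     */
    public void createNewTask(String expression, int taskId) throws SchedulerException {
        TriggerKey triggerKey = TriggerKey.triggerKey("TASK-" + taskId, "JOB-" + taskId);
        // Cron schedule builder for the given expression
        CronScheduleBuilder cronScheduleBuilder = CronScheduleBuilder.cronSchedule(expression);
        // Build a new trigger from the cron expression
        CronTrigger trigger = TriggerBuilder.newTrigger()
                .withIdentity(triggerKey)
                .startAt(new Date())
                .withSchedule(cronScheduleBuilder)
                .build();

        Scheduler scheduler = schedulerFactoryBean.getScheduler();
        if (scheduler.checkExists(triggerKey)) {
            // The task already exists: replace its trigger with the new schedule.
            scheduler.rescheduleJob(triggerKey, trigger);
            return;
        }
        // The task does not exist yet: create the job and add it to the scheduler.
        JobKey jobKey = JobKey.jobKey("TASK-" + taskId, "JOB-" + taskId);
        JobDetail jobDetail = JobBuilder.newJob(SpringQuartzJob.class).withIdentity(jobKey).build();
        // Basic information about the audit task
        jobDetail.getJobDataMap().put("taskId", taskId);
        scheduler.scheduleJob(jobDetail, trigger);
    }
}
```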

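This simple class builds periodic task triggers. The task-configuration endpoint passes in the execution schedule (a cron expression) and the task ID. The class then creates a CronTrigger that schedules the Quartz job class.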
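Quartz cron expressions differ from classic Unix crontab entries. They have six or seven space-separated fields, starting with seconds: second, minute, hour, day-of-month, month, day-of-week, and an optional year. One of day-of-month/day-of-week must be "?". The hypothetical helper below only splits an expression into named fields and checks the field count. That is enough to see that "0 0 1 * * ?" means 01:00:00 every day, the schedule from section 3.1, and that a five-field crontab string is rejected. It does not validate field values; Quartz's own CronExpression.isValidExpression does that.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper: names the fields of a Quartz cron expression (no value validation). */
final class QuartzCronFields {
    private static final String[] NAMES = {
        "seconds", "minutes", "hours", "dayOfMonth", "month", "dayOfWeek", "year"
    };

    private QuartzCronFields() {
    }

    static Map<String, String> describe(String expression) {
        String[] parts = expression.trim().split("\\s+");
        // Quartz requires 6 fields, plus an optional 7th (year).
        if (parts.length < 6 || parts.length > 7) {
            throw new IllegalArgumentException(
                    "Expected 6 or 7 fields but got " + parts.length + ": " + expression);
        }
        Map<String, String> fields = new LinkedHashMap<>();
        for (int i = 0; i < parts.length; i++) {
            fields.put(NAMES[i], parts[i]);
        }
        return fields;
    }
}
```

For instance, describe("0 0/1 * * * ?") maps minutes to "0/1", i.e. every minute starting at minute 0. The Unix-style "0 1 * * *" has only five fields and is rejected.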

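3.3.3.2 Quartz job class

```java
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.scheduling.quartz.QuartzJobBean;
import org.springframework.stereotype.Component;

@Component("springQuartzJob")
public class SpringQuartzJob extends QuartzJobBean {
    private static final Logger log = LoggerFactory.getLogger(SpringQuartzJob.class);

    // Quartz instantiates this class itself, so these fields are not autowired;
    // init() looks them up from the Spring context instead.
    private Job importUserJob;
    private JobLauncher jobLauncher;

    @Override
    protected void executeInternal(final JobExecutionContext context) throws JobExecutionException {
        log.info("TestJob start, thread {}", Thread.currentThread().getId());
        try {
            init();
            // A fresh "time" value gives every run a new JobInstance.
            JobParameters jobParameters = new JobParametersBuilder()
                    .addLong("time", System.currentTimeMillis())
                    .toJobParameters();
            JobExecution result = jobLauncher.run(importUserJob, jobParameters);
            log.info("Job finished with status {}", result.getStatus());
        } catch (Exception e) {
            log.error("Failed to run importUserJob", e);
        }
    }

    public void init() {
        importUserJob = (Job) MyApplicationContextUtil.getBeanObj("importUserJob");
        jobLauncher = (JobLauncher) MyApplicationContextUtil.getBeanObj("jobLauncher", JobLauncher.class);
    }
}
```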

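JobParameters play a role similar to Quartz's JobDataMap: they pass the data a job needs. jobLauncher.run() uses the Job together with its JobParameters to identify a job instance in the job repository, then runs it. SpringQuartzJob adds a "time" parameter set to the current timestamp, so every trigger firing creates a new JobInstance. Relaunching an instance that has already completed with identical parameters would be rejected with a JobInstanceAlreadyCompleteException. Note also that jobLauncher.run() does not apply the RunIdIncrementer configured on the job. The incrementer is only consulted by launchers that start the "next" instance, such as Spring Boot's JobLauncherCommandLineRunner.

3.3.3.3 ApplicationContextAware

```java
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;

public class MyApplicationContextUtil implements ApplicationContextAware {
    private static ApplicationContext context;

    public static void setContext(ApplicationContext context) {
        MyApplicationContextUtil.context = context;
    }

    // Called by Spring once this bean is created. It stores the context in a static field
    // so that objects not managed by Spring (such as Quartz jobs) can look up beans.
    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        MyApplicationContextUtil.context = applicationContext;
    }

    public static ApplicationContext getContext() {
        return context;
    }

    public static final Object getBeanObj(String beanName) {
        return context.getBean(beanName);
    }

    public static final Object getBeanObj(String beanName, Class<?> requiredType) {
        return context.getBean(beanName, requiredType);
    }
}
```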

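MyApplicationContextUtil implements the ApplicationContextAware interface. Spring calls its setApplicationContext(ApplicationContext context) method with the application context. The static getBeanObj(String beanName) method then uses that context to look up beans by name. This is what lets SpringQuartzJob reach Spring beans even though Quartz, not Spring, creates its instances.

3.3.4 SpringbatchApplication bootstrap class

```java
import java.util.Properties;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.PropertySource;
import org.springframework.core.io.ClassPathResource;
import org.springframework.core.io.support.PropertiesLoaderUtils;
import org.springframework.scheduling.quartz.SchedulerFactoryBean;

@SpringBootApplication
@PropertySource(value = { "./application.properties" })
public class SpringbatchApplication {
    public static ConfigurableApplicationContext ctx;

    public static void main(String[] args) {
        ctx = SpringApplication.run(new Object[] { QuartzResource.class }, args);
    }

    @Bean
    public SchedulerFactoryBean schedulerFactoryBean() throws Exception {
        SchedulerFactoryBean schedulerFactoryBean = new SchedulerFactoryBean();
        // Load quartz.properties from the classpath (src/main/resources). This also works
        // when the application runs from a packaged jar, and the stream is closed for us.
        Properties quartzProperties = PropertiesLoaderUtils.loadProperties(new ClassPathResource("quartz.properties"));
        schedulerFactoryBean.setQuartzProperties(quartzProperties);
        return schedulerFactoryBean;
    }

    @Bean
    public MyApplicationContextUtil myApplicationContextUtil() {
        return new MyApplicationContextUtil();
    }
}
```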

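The schedulerFactoryBean() method initializes Quartz from the settings in quartz.properties.

3.3.5 A web endpoint for creating scheduled tasks

```java
import org.quartz.SchedulerException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;

@RestController
@ComponentScan(basePackages = { "com.william.lab.springboot.springbatch.springbatch" })
@RequestMapping("/quartz")
public class QuartzResource {
    private static final Logger LOGGER = LoggerFactory.getLogger(QuartzResource.class);

    @Autowired
    private CronTriggerFactoryBean cronTriggerFactoryBean;

    // Accepts one task ID or a comma-separated list, e.g. /quartz/get/1,2
    @RequestMapping(value = "/get/{taskId}", method = RequestMethod.GET)
    public void createTask(@PathVariable("taskId") String taskId) throws SchedulerException {
        String[] ids = taskId.split(",");
        for (String id : ids) {
            int taskIdx = Integer.parseInt(id.trim());
            // Quartz cron fields: second minute hour day-of-month month day-of-week.
            // "0 0/1 * * * ?" fires every minute for testing; "0 0 1 * * ?" would fire daily at 1 AM.
            cronTriggerFactoryBean.createNewTask("0 0/1 * * * ?", taskIdx);
            LOGGER.info("Scheduled task {}", taskIdx);
        }
    }
}
```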

This simple endpoint lets users define Quartz-scheduled batch tasks. For example, with the port configured below, a GET request to http://localhost:18080/quartz/get/1,2 schedules tasks 1 and 2.

3.3.6 Configuration files: application.properties and quartz.properties

3.3.6.1 application.properties

```properties
# Tomcat port
server.port=18080
# Spring Batch
spring.batch.job.enabled=false
# MySQL DB
spring.datasource.url=jdbc:mysql://localhost:3306/william_lab?useUnicode=true&characterEncoding=UTF-8
spring.datasource.username=root
spring.datasource.password=123456
spring.datasource.driver-class-name=com.mysql.jdbc.Driver
# log config
logging.config=file:./src/main/resources/logback-spring.xml
# database pool
spring.datasource.tomcat.max-idle=15
spring.datasource.tomcat.max-wait=1000
spring.datasource.tomcat.max-active=50
spring.datasource.tomcat.min-idle=5
spring.datasource.tomcat.initial-size=10
spring.datasource.tomcat.validation-query=SELECT 1
spring.datasource.tomcat.test-on-borrow=false
spring.datasource.tomcat.test-while-idle=true
spring.datasource.tomcat.time-between-eviction-runs-millis=18800
spring.datasource.tomcat.jdbc-interceptors=ConnectionState;SlowQueryReport(threshold=0)
```

Note: spring.batch.job.enabled defaults to true. When it is true, whether set explicitly or left unset, Spring Boot creates a JobLauncherCommandLineRunner bean that automatically runs the configured batch jobs at startup. We hand job scheduling over to Quartz, so we set it to false. At startup the application then only initializes the batch job configuration without running any job.

3.3.6.2 quartz.properties

```properties
# Configure Main Scheduler Properties
org.quartz.scheduler.instanceName:DQPScheduler
org.quartz.scheduler.instanceId:AUTO
org.quartz.scheduler.skipUpdateCheck:false
# Configure ThreadPool
org.quartz.threadPool.class:org.quartz.simpl.SimpleThreadPool
org.quartz.threadPool.threadCount:1000
org.quartz.threadPool.threadPriority:5
# Configure JobStore
org.quartz.jobStore.misfireThreshold:60000
org.quartz.jobStore.class:org.quartz.impl.jdbcjobstore.JobStoreTX
org.quartz.jobStore.driverDelegateClass:org.quartz.impl.jdbcjobstore.StdJDBCDelegate
org.quartz.jobStore.useProperties:false
org.quartz.jobStore.dataSource:dqpDS
org.quartz.jobStore.tablePrefix:dqp_qrtz_
org.quartz.jobStore.isClustered:false
# Configure Datasources
org.quartz.dataSource.dqpDS.driver:com.mysql.jdbc.Driver
org.quartz.dataSource.dqpDS.URL:jdbc:mysql://localhost:3306/william_lab?useUnicode=true&characterEncoding=UTF-8&useSSL=false&autoReconnect=true
org.quartz.dataSource.dqpDS.user:root
org.quartz.dataSource.dqpDS.password:123456
org.quartz.dataSource.dqpDS.maxConnections:100
org.quartz.dataSource.dqpDS.validationQuery=select 1
org.quartz.dataSource.dqpDS.idleConnectionValidationSeconds=60
org.quartz.dataSource.dqpDS.validateOnCheckout=true
org.quartz.dataSource.dqpDS.discardIdleConnectionsSeconds=60
```

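Note: the last four lines make Quartz validate the connections in its database connection pool and discard stale or broken ones.

4 Summary

Spring Batch divides a batch job into three basic phases: reading data, processing it according to business rules, and writing out the results. It ships with many readers (file, JPA, JDBC, MongoDB, etc.) and an equally rich set of writers (file, JPA, JDBC, MongoDB, etc.). It also provides features such as logging, monitoring, job restart, and skipping. Developers only need to think about transaction granularity, logging and monitoring, execution mode, and resource management, and keep reading, processing, and writing decoupled. However, Spring Batch does not schedule batch jobs, so you have to arrange periodic execution yourself. In Java, Quartz is a good choice; alternatively, you can handle it with scripts.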