Passing data and parameters in Spring Batch

1 Introduction

This is the ninth article in the Spring Batch series.

  • Batch processing artifact – Spring Batch (1) Introduction and usage scenarios
  • Quick start with the component – Spring Batch (2) Hello World
  • Quick use of the component – Spring Batch (3) Reading file data into a database
  • Mastering the database – Spring Batch (4) Database to database
  • Convenient data read and write – Spring Batch (5) Data read and write with BeetlSQL
  • Incremental synchronization – Spring Batch (6) Dynamic parameter binding and incremental synchronization
  • Scheduling and monitoring – Spring Batch (7) Batch processing with XXL-Job
  • Mongo synchronization – Spring Batch (8) MongoDB read and write components

The previous articles in this series walked through Spring Batch processing with detailed examples, so you should now be familiar with the read, process, and write flow for strings, files, relational databases, and NoSQL databases. A task flow contains multiple steps: from Job startup to Step execution, each Step includes a read component, a process component, and a write component. What should be done if custom data needs to be passed along this flow? This article describes how Spring Batch transfers data, again using code examples. It covers the following:

  • Multi-data-source database access based on Mybatis-Plus
  • Sharing data with ExecutionContext
  • Dynamic parameter binding with StepScope

2 Development Environment

  • JDK: 1.8
  • Spring Boot: 2.1.4.RELEASE
  • Spring Batch: 4.1.2.RELEASE
  • IDE: IntelliJ IDEA
  • Build tool: Maven 3.3.9
  • Log component: Logback 1.2.3
  • Lombok: 1.18.6
  • MySQL: 5.6.26
  • Mybatis-Plus: 3.4.0

The source code for this example is on GitHub: https://github.com/mianshenglee/spring-batch-example/tree/master/spring-batch-param. Please read this article together with the example code.

3 Database access with multiple data sources based on Mybatis-Plus

This example reuses the functionality of the earlier examples: read user data from a source database, process it, and write it to a target database, passing parameters both at Job startup and within the job steps. I have already described how to configure multiple data sources with BeetlSQL ([Convenient data read and write – Spring Batch (5) with BeetlSQL][5]) to achieve batch processing. Since many readers use Mybatis or Mybatis-Plus for database access, it is worth showing how Spring Batch can be configured for multi-data-source operation with them. This example uses Mybatis-Plus.

The sql directory in the sample project contains the corresponding database scripts: the source database script mytest.sql creates a test_user table with test data, the target database script my_test1.sql has the same table structure as mytest.sql, and spring-batch-mysql.sql is the metadata script provided by Spring Batch itself.

3.1 Adding Mybatis-Plus to the POM file

<!-- mybatis-plus -->
<dependency>
    <groupId>com.baomidou</groupId>
    <artifactId>mybatis-plus-boot-starter</artifactId>
    <version>3.4.0</version>
</dependency>


3.2 Configuring and Using Multiple Data sources

This example involves three databases: Spring Batch's own metadata database, the source database for the batch processing, and the target database for the batch processing. Multiple data sources therefore need to be handled, and with a multi-source strategy this is straightforward. In brief, the steps are:

  • Configure the connection information for each data source
  • Separate the mapper package, entity package, and mapper.xml location by data source
  • Configure an independent SqlSessionFactory for each data source
  • Inject the appropriate mapper for each application scenario

For details on configuring a multi-source strategy for multiple data sources, please refer to my article "SpringBoot Multiple Data Sources (1): Multiple source policies".
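As a rough sketch of the steps above (the class name, property prefix, package path, and mapper locations below are illustrative assumptions, not taken from the example project), each data source gets its own SqlSessionFactory, built here with Mybatis-Plus's MybatisSqlSessionFactoryBean:

```java
import javax.sql.DataSource;
import org.apache.ibatis.session.SqlSessionFactory;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.jdbc.DataSourceBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.support.PathMatchingResourcePatternResolver;
import com.baomidou.mybatisplus.extension.spring.MybatisSqlSessionFactoryBean;

// Sketch only: one such configuration class per data source, each scanning
// its own mapper package and bound to its own SqlSessionFactory.
@Configuration
@MapperScan(basePackages = "com.example.source.mapper",
            sqlSessionFactoryRef = "sourceSqlSessionFactory")
public class SourceDataSourceConfig {

    @Bean
    @ConfigurationProperties(prefix = "spring.datasource.source")
    public DataSource sourceDataSource() {
        // connection properties come from the matching prefix in application.yml
        return DataSourceBuilder.create().build();
    }

    @Bean
    public SqlSessionFactory sourceSqlSessionFactory(
            @Qualifier("sourceDataSource") DataSource dataSource) throws Exception {
        // Mybatis-Plus's drop-in replacement for SqlSessionFactoryBean
        MybatisSqlSessionFactoryBean factoryBean = new MybatisSqlSessionFactoryBean();
        factoryBean.setDataSource(dataSource);
        factoryBean.setMapperLocations(new PathMatchingResourcePatternResolver()
                .getResources("classpath:mapper/source/*.xml"));
        return factoryBean.getObject();
    }
}
```

A second configuration class with a different prefix, mapper package, and factory bean name covers the target database the same way.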

4 Passing parameters with ExecutionContext

Previous articles described how Spring Batch is configured to read data (ItemReader), process data (ItemProcessor), and write data (ItemWriter). It is important to remember that when a Job is started, Spring Batch uses the Job name together with the JobParameters as the unique identifier of the job. A Job can have multiple Steps, and each Step has its own operation logic (read, process, write). To understand how data is passed between Job and Step, you need to understand the concept of ExecutionContext.

4.1 The ExecutionContext concept

Spring Batch provides ExecutionContext to persist data while a Job or Step is running. It can be used to share business data, such as static data and the state needed for a restart.

The ExecutionContext is essentially a Map<String, Object>: a set of key/value pairs that the Spring Batch framework persists and controls, allowing developers to save state that must survive a Step or Job run. There are two kinds of context: the Job execution context (persisted in BATCH_JOB_EXECUTION_CONTEXT) and the Step execution context (persisted in BATCH_STEP_EXECUTION_CONTEXT). Their relationship is as follows: a Job runs within one Job execution context, each StepExecution has its own Step execution context, and all StepExecutions within the same Job share the Job execution context. In other words, their scopes differ. Therefore, if data needs to be shared between different Steps of a Job, it can be placed in the Job execution context; this shared nature of ExecutionContext is what makes it possible to pass data between Steps.
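These scoping rules can be illustrated with a few plain-Java maps standing in for the persisted contexts (a sketch only; the real contexts are ExecutionContext objects persisted by the framework):

```java
import java.util.HashMap;
import java.util.Map;

// One job-level context shared by every step, one private context per step.
public class ContextScopeDemo {
    public static final Map<String, Object> jobContext = new HashMap<>();
    public static final Map<String, Object> step1Context = new HashMap<>();
    public static final Map<String, Object> step2Context = new HashMap<>();

    // step 1 writes to its own context and to the shared job context
    public static void runStep1() {
        step1Context.put("privateNote", "only step1 sees this");
        jobContext.put("readNum", 100);
    }

    // step 2 can see the job context but not step 1's private context
    public static Object step2ReadsShared()  { return jobContext.get("readNum"); }
    public static Object step2ReadsPrivate() { return step2Context.get("privateNote"); }

    public static void main(String[] args) {
        runStep1();
        System.out.println(step2ReadsShared());  // 100
        System.out.println(step2ReadsPrivate()); // null
    }
}
```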

4.2 Passing data with ExecutionContext

After a Job is started, a JobExecution is generated to store and record the Job's execution information. Similarly, when a Step starts, a StepExecution is created. As mentioned above, both JobExecution and StepExecution carry an ExecutionContext that stores their context. The idea behind data passing is therefore: determine the scope of the data, put the data into the appropriate ExecutionContext, and then read it anywhere within that scope. In this example, the ItemReader records the number of items read and the ItemWriter accumulates the number of items written, passing both counts through the ExecutionContext. In fact, Spring Batch counts reads and writes automatically; this example is intended only to demonstrate data sharing.

So how do you obtain these Execution objects? Spring Batch provides the JobExecutionListener and StepExecutionListener listener interfaces. By implementing them, you can capture the JobExecution before (beforeJob) and after (afterJob) a Job runs, and the StepExecution before (beforeStep) and after (afterStep) a Step runs.

4.2.1 Implement listener interface

In the custom UserItemReader and UserItemWriter, implement the StepExecutionListener interface, keep the StepExecution as a member field, and obtain it in beforeStep. As follows:

public class UserItemWriter implements ItemWriter<TargetUser>, StepExecutionListener {
    private StepExecution stepExecution;
    // ... omitted

    @Override
    public void beforeStep(StepExecution stepExecution) {
        this.stepExecution = stepExecution;
    }
}

The reader component (UserItemReader) works the same way. To access the parameters when the Job finishes, you can extend JobExecutionListenerSupport, override the method of interest, obtain the JobExecution from the method parameter, and then read the parameters from it.

public class ParamJobEndListener extends JobExecutionListenerSupport {
    @Override
    public void afterJob(JobExecution jobExecution) {
    }
}

4.2.2 Setting the data to be passed

Since the data needs to be shared across Steps, it is placed in the Job-level ExecutionContext, which is obtained from the JobExecution. The following is done in the read component:

ExecutionContext executionContext = stepExecution.getJobExecution().getExecutionContext();
executionContext.put(SyncConstants.PASS_PARAM_READ_NUM, items.size());

Similarly, in the write component, once you have the ExecutionContext you can work with the parameters. In this example, the total is obtained by accumulating the number of items in each chunk passed in from the ItemReader.

@Override
public void write(List<? extends TargetUser> items) {
    ExecutionContext executionContext = stepExecution.getJobExecution().getExecutionContext();
    Object currentWriteNum = executionContext.get(SyncConstants.PASS_PARAM_WRITE_NUM);
    if (Objects.nonNull(currentWriteNum)) {
        log.info("currentWriteNum:{}", currentWriteNum);
        executionContext.put(SyncConstants.PASS_PARAM_WRITE_NUM, items.size() + (Integer) currentWriteNum);
    } else {
        executionContext.put(SyncConstants.PASS_PARAM_WRITE_NUM, items.size());
    }
}

Finally, when the Job ends, output the parameter in the afterJob method of the JobExecutionListenerSupport implementation.

public class ParamJobEndListener extends JobExecutionListenerSupport {
    @Override
    public void afterJob(JobExecution jobExecution) {
        ExecutionContext executionContext = jobExecution.getExecutionContext();
        Integer writeNum = (Integer) executionContext.get(SyncConstants.PASS_PARAM_WRITE_NUM);
        log.info(LogConstants.LOG_TAG + "writeNum:{}", writeNum);
    }
}
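The counting logic in the reader and writer boils down to a get-then-accumulate against a shared map. A minimal plain-Java sketch of that pattern (SyncConstants and the Spring Batch types are replaced with stand-ins):

```java
import java.util.HashMap;
import java.util.Map;

public class WriteCountDemo {
    public static final String PASS_PARAM_WRITE_NUM = "writeNum";

    // mirrors the accumulation in the write component: add each chunk's size
    // to the running total in the shared context, initialising it on the first chunk
    public static void accumulate(Map<String, Object> context, int chunkSize) {
        Object current = context.get(PASS_PARAM_WRITE_NUM);
        if (current != null) {
            context.put(PASS_PARAM_WRITE_NUM, chunkSize + (Integer) current);
        } else {
            context.put(PASS_PARAM_WRITE_NUM, chunkSize);
        }
    }

    // convenience: run a sequence of chunks against a fresh context
    public static int totalOf(int... chunks) {
        Map<String, Object> jobContext = new HashMap<>(); // stands in for the job-level ExecutionContext
        for (int chunk : chunks) {
            accumulate(jobContext, chunk);
        }
        return (Integer) jobContext.get(PASS_PARAM_WRITE_NUM);
    }

    public static void main(String[] args) {
        // two full chunks of 10 and a final partial chunk of 5
        System.out.println(totalOf(10, 10, 5)); // 25
    }
}
```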

5 Passing parameters with StepScope dynamic binding

5.1 StepScope and late binding

ExecutionContext is for sharing data within the scope of a Job or Step. But sometimes parameters must be set before the Job starts, and the input changes on every run (for example, in incremental synchronization, the query depends on the time or ID of the last synchronization). In that case, the operation components (ItemReader, ItemWriter, and so on) must be created from the parameters on every run. Since Spring IoC loads beans as singletons by default, these beans would normally be created once and reused; instead, they need to be created anew for each run and destroyed afterwards. This is where StepScope and late binding come in.

We have seen StepScope in an earlier example; it provides a scope tied to the Step. A Spring bean annotated with StepScope is initialized when the Step starts and destroyed when the Step ends; that is, the bean's lifecycle matches the Step's lifecycle. Spring Batch then uses late binding of properties: attribute values are resolved at runtime using SpEL expressions. Within a StepScope bean, the framework makes JobParameters, JobExecutionContext, and StepExecutionContext available for binding, along with beans from the Spring container and the JobExecution and StepExecution themselves.

5.2 Passing Job parameters and obtaining the StepExecution dynamically

A Job is uniquely identified by its Job name and JobParameters; Spring Batch starts a new Job only when the name or parameters differ. If they are the same, it is the same Job: if that Job has not yet been executed, it is executed; if it was executed and ended in the FAILED state, Spring Batch tries to run it again; if it was executed and ended in the COMPLETED state, an error is reported.
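That identity rule can be sketched in plain Java: a job instance is keyed by the job name plus its parameters, so a repeat launch with identical parameters maps back to the same instance (the key format below is illustrative, not Spring Batch's actual implementation):

```java
import java.util.Collections;
import java.util.Map;
import java.util.TreeMap;

public class JobIdentityDemo {
    // a job instance is identified by job name plus its parameters;
    // sorting the parameters gives a stable key regardless of insertion order
    public static String instanceKey(String jobName, Map<String, ?> params) {
        return jobName + ":" + new TreeMap<>(params);
    }

    public static void main(String[] args) {
        String run1 = instanceKey("syncUserJob", Collections.singletonMap("time", 1000L));
        String run2 = instanceKey("syncUserJob", Collections.singletonMap("time", 1000L));
        String run3 = instanceKey("syncUserJob", Collections.singletonMap("time", 2000L));

        System.out.println(run1.equals(run2)); // true  -> same instance (restart or error)
        System.out.println(run1.equals(run3)); // false -> new instance, job runs
    }
}
```

This is why the example below passes the current time as a parameter: each launch gets fresh JobParameters and therefore a fresh job instance.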

In this example, the Job is started with a time parameter. The ItemReader uses the StepScope annotation, and both the time parameter and the StepExecution are bound into it, so that the ItemReader can work with them.

5.2.1 Setting Time Parameters

To start a Job with JobLauncher, you need to pass a JobParameters object, so create one and set the parameters:

JobParameters jobParameters = new JobParametersBuilder()
        .addLong("time", timeMillis)
        .toJobParameters();

5.2.2 Dynamically Binding Parameters

When configuring the Step, you need to create the ItemReader bean. To use dynamic parameters, the ItemReader holds a Map for the parameters, and keeps the StepExecution as a member for later use of the ExecutionContext.

public class UserItemReader implements ItemReader<User> {
    protected Map<String, Object> params;
    private StepExecution stepExecution;

    public void setStepExecution(StepExecution stepExecution) {
        this.stepExecution = stepExecution;
    }
}

Use StepScope to configure:

@Bean
@StepScope
public ItemReader paramItemReader(@Value("#{stepExecution}") StepExecution stepExecution,
                                  @Value("#{jobParameters['time']}") Long timeParam) {
    UserItemReader userItemReader = new UserItemReader();
    // Set parameters
    Map<String, Object> params = CollUtil.newHashMap();
    Date datetime = new Date(timeParam);
    params.put(SyncConstants.PASS_PARAM_DATETIME, datetime);
    userItemReader.setParams(params);
    userItemReader.setStepExecution(stepExecution);

    return userItemReader;
}

Note: since the ItemReader is now created through dynamic binding, it can no longer obtain the StepExecution by implementing StepExecutionListener as it did earlier; the listener callbacks would not reach the step-scoped bean the same way. Instead, bind the stepExecution bean through late binding, as in the configuration above, and assign it that way.

5.2.3 Setting and transferring parameters

The ItemReader obtains the StepExecution, then the ExecutionContext from it, and then passes the data through the ExecutionContext as described above. As follows:

ExecutionContext executionContext = stepExecution.getJobExecution().getExecutionContext();
// readNum parameter
executionContext.put(SyncConstants.PASS_PARAM_READ_NUM, items.size());
// datetime parameter
executionContext.put(SyncConstants.PASS_PARAM_DATETIME, params.get(SyncConstants.PASS_PARAM_DATETIME));

6 Summary

ExecutionContext can be used to share data across the different scopes of Job and Step. In this article, we used Mybatis-Plus for data access and, based on ExecutionContext combined with StepScope and late binding, passed parameters in at Job startup, then shared and transferred data through the ItemReader, the ItemProcessor, the ItemWriter, and Job completion. If you need to share and pass data with Spring Batch, try this approach.

Related articles

  • Still manually generating database documents? Take a look at the three steps of automatic completion
  • Python processes Excel files
  • Distributed deployment of MinIO
  • Use MinIO to easily build static resource services
  • SpringBoot Multiple data sources (3) : Parameterize change sources
  • SpringBoot Multiple Data Sources (2) : dynamic data sources
  • SpringBoot Multiple Data Sources (1) : Multiple source policies
  • Java development must learn: dynamic proxy
  • Good books to read in 2019

If this article helped you, feel free to share it.

Follow my official account (search "Mason technical records") for more technical notes.