Tags: springbatch


1. Introduction

In the previous article, “Convenient data read and write – Spring Batch (5): using BeetlSql for data read and write”, Spring Batch and BeetlSql were used to build database read and write components that synchronize one database to another, which was in fact full synchronization. The problem with full synchronization is that the entire table must be read on every run: if the table is large, resource consumption is high, and updating existing data is difficult. Therefore, incremental synchronization is used more often in practice: based on certain conditions, new data is inserted, changed data is updated, and removed data is deleted (in general, data is not physically deleted but only logically deleted, so deletion also becomes an update operation).

Incremental updates usually depend on the state saved from the last update (a timestamp, an auto-increment id, a position, and so on), with the next update based on that state. Therefore, the state of each update needs to be preserved as a variable parameter and passed in as a dynamic parameter the next time. Spring Batch supports dynamic parameters at task runtime, and combining the two enables incremental synchronization of data.

2. Development environment

  • JDK: 1.8
  • Spring Boot: 2.1.4.RELEASE
  • Spring Batch: 4.1.2.RELEASE
  • Development IDE: IntelliJ IDEA
  • Build tool: Maven 3.3.9
  • Log component: Logback 1.2.3
  • Lombok: 1.18.6

3. Overview of incremental synchronization

Incremental synchronization is relative to full synchronization: each run synchronizes only the parts of the source database that have changed, which improves synchronization efficiency. It is currently the most common way to synchronize data. Extracting changed data is also known as Change Data Capture (CDC). CDC is explained in some detail in Pentaho Kettle Solutions: Building Open Source ETL Solutions with PDI, which lists four ways to implement incremental synchronization: source-data-based CDC, trigger-based CDC, snapshot-based CDC, and log-based CDC.

3.1 Source-data-based CDC

CDC based on source data requires the source data to have relevant attribute columns, which can be used to identify the incremental data. The most common attribute columns are:

  • Timestamps identify data based on time. At least one time column is required, preferably two: one for creation and one for update. Therefore, when designing the database, we add sys_create_time and sys_update_time as default fields, defaulting to the current time and updating automatically on modification.

  • Increment sequences use the auto-increment field of a database table (typically the primary key) to identify newly inserted data. In practice this is used less often.

This approach requires a temporary table to hold the state of the last update (in practice, the table may be created in a separate schema). The next update is then compared against the previous time or sequence value. This is a common approach, and it is the one used for incremental synchronization in this article.

3.2 Trigger-based CDC

Triggers are written in the database; when the database executes INSERT, UPDATE, DELETE, and so on, the triggers are activated. A trigger can save the changed data into an intermediate temporary table, from which the data can then be fetched and synchronized to the target database. Of course, this approach is the most invasive, and in most production databases adding triggers is not allowed (they affect performance).
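As a rough sketch of the idea (the change table name, its columns, and the trigger name below are all hypothetical, not part of this article's example), a MySQL trigger capturing updates on test_user into a change table might look like:

```sql
-- Hypothetical sketch: capture updates on test_user into a change table.
-- test_user_changes and all of its columns are assumed names for illustration.
CREATE TABLE test_user_changes (
  change_id  BIGINT AUTO_INCREMENT PRIMARY KEY,
  user_id    BIGINT NOT NULL,
  op         CHAR(1) NOT NULL,                          -- 'I', 'U' or 'D'
  changed_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP
);

DELIMITER $$
CREATE TRIGGER trg_test_user_update
AFTER UPDATE ON test_user
FOR EACH ROW
BEGIN
  INSERT INTO test_user_changes (user_id, op) VALUES (NEW.id, 'U');
END$$
DELIMITER ;
```

A synchronization job would then read test_user_changes instead of scanning test_user, and clear the rows it has processed.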

3.3 Snapshot-based CDC

This method extracts all current data into a buffer at once and uses it as a snapshot. At the next synchronization, the data read from the source is compared with the snapshot to find the changed rows. Simply put, it is a full-table read and compare. Because it requires a full table scan, performance is a problem, so this method is not usually used.

3.4 Log-based CDC

The most advanced and least invasive approach is the log-based one: databases log inserts, updates, and deletes. MySQL, for example, has the binlog. Incremental synchronization reads the log files, converts the binary format into an understandable form, and then replays the operations in order. However, this method generally only works between databases of the same type and is difficult to implement for heterogeneous databases.

3.5 The incremental synchronization method used in this example

In this example, incremental synchronization is still based on the test_user table, which has the fields sys_create_time and sys_update_time to identify when data was created and updated. (If in practice there is only one time column, synchronization can be based on that time alone; it just makes it harder to tell whether a row was updated or inserted.) The incremental synchronization process is as follows:

Description:

  • For each synchronization, the temporary table is read first to obtain the data time saved by the last synchronization.
  • If this is the first synchronization, all data is synchronized; otherwise, the saved time is used as a parameter of the query statement.
  • After the data is read based on that time, it is inserted into the target table.
  • The data time in the temporary table is updated for the next synchronization.
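The parameter-building decision in the steps above can be sketched in plain Java. This is an illustrative sketch, not the article's actual classes: the class and the "lastUpdateTime" key mirror the cdc_temp column of the same name, and null is assumed to mean the table has never been synchronized.

```java
import java.util.Date;
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch of the incremental-sync parameter logic described above.
// A null lastUpdateTime means "never synchronized", so a full sync is performed.
public class IncrementalParams {

    /** Build the query parameter map for the next synchronization run. */
    public static Map<String, Object> buildParams(Date lastUpdateTime) {
        Map<String, Object> params = new HashMap<>();
        if (lastUpdateTime != null) {
            // Incremental sync: only rows created or updated since the last run
            params.put("lastUpdateTime", lastUpdateTime);
        }
        // An empty map means full sync (the @if in the SQL template skips the filter)
        return params;
    }
}
```

With an empty map, the beetl template described later renders plain `select * from test_user where 1=1`; with the key set, the time filter is appended.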

4. Spring Batch dynamic parameter binding

According to the incremental synchronization process above, the key point is to save the data time into a temporary table so that it can be compared when data is read. Spring Batch supports dynamic parameter binding; you only need to use the @StepScope annotation. Combined with BeetlSql, incremental synchronization can be implemented quickly. This example builds on the example in the previous article, and you can download the source code to see the complete example.

4.1 Keeping the original database and multi-data-source configuration

  • Source database: mytest
  • Target database: my_test1
  • Spring Batch database: my_spring_batch
  • Synchronized data table: test_user

4.2 Creating temporary Tables

Using sql/initCDCtemptable.sql from the example, create the temporary table cdc_temp in the my_spring_batch database and insert a record with id 1, which identifies the synchronization of the test_user table. Here we only need to focus on last_update_time and current_update_time: the former records the latest data time after the last synchronization, and the latter records the system time of the last synchronization run.
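The actual DDL lives in sql/initCDCtemptable.sql; as a hedged sketch based only on the columns this article references (id, last_update_time, current_update_time, status, plus some way to name the synchronized table), cdc_temp might look roughly like:

```sql
-- Hypothetical reconstruction; see sql/initCDCtemptable.sql for the real DDL.
CREATE TABLE cdc_temp (
  id                  INT PRIMARY KEY,
  table_name          VARCHAR(64) NOT NULL,       -- table being synchronized
  last_update_time    DATETIME    DEFAULT NULL,   -- latest data time after last sync
  current_update_time DATETIME    DEFAULT NULL,   -- system time of the last sync run
  status              VARCHAR(20) DEFAULT 'INIT'  -- status of the last job
);

INSERT INTO cdc_temp (id, table_name) VALUES (1, 'test_user');
```

The column names last_update_time, current_update_time, and status come from the article; table_name and the exact types are assumptions.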

4.3 Adding or Modifying a DAO

4.3.1 Adding DAO and service classes for the temporary table

  • Add the class CdcTempRepository

According to the configuration, cdc_temp is in my_spring_batch and its reads and writes belong in the dao.local package, so we add the dao.local package and then the class CdcTempRepository, as follows:

@Repository
public interface CdcTempRepository extends BaseMapper<CdcTemp> {
}
  • Add the class CdcTempService for reading and updating the cdc_temp table. It mainly consists of two functions: one gets the current cdc_temp record by id to obtain the time of the last synchronization, and the other updates the cdc_temp data after synchronization completes. As follows:
/**
 * Get the cdc_temp record by id
 * @param id record id
 * @return {@link CdcTemp}
 */
public CdcTemp getCurrentCdcTemp(int id) {
    return cdcTempRepository.getSQLManager().single(CdcTemp.class, id);
}

/**
 * Update cdc_temp after the job finishes
 * @param cdcTempId cdc_temp record id
 * @param status job status
 * @param lastUpdateTime last update time
 */
public void updateCdcTempAfterJob(int cdcTempId, BatchStatus status, Date lastUpdateTime) {
    // Obtain the current record
    CdcTemp cdcTemp = cdcTempRepository.getSQLManager().single(CdcTemp.class, cdcTempId);
    cdcTemp.setCurrentUpdateTime(DateUtil.date());
    // The data time is updated only upon normal completion
    if (status == BatchStatus.COMPLETED) {
        cdcTemp.setLastUpdateTime(lastUpdateTime);
    } else {
        log.info(LogConstants.LOG_TAG + "Synchronization status abnormal:" + status.toString());
    }
    // Set the synchronization status
    cdcTemp.setStatus(status.name());
    cdcTempRepository.updateById(cdcTemp);
}

4.3.2 Modifying the Source Data DAO

Add the getOriginIncreUser function to the source data DAO class OriginUserRepository; it corresponds to the SQL statement of the same name in user.md.

4.3.3 Modifying the Target Data DAO

Add selectMaxUpdateTime to the target data DAO class TargetUserRepository to query the latest time at which data was synchronized. Because this method's SQL is simple, you can use the @Sql annotation directly, as follows:

@Sql(value="select max(sys_update_time) from test_user")
Date selectMaxUpdateTime();

4.4 Modifying the SQL statements in user.md

4.4.1 Adding SQL for Incremental Read Data

Add an SQL statement for incremental data reading to user.md, as follows:

getOriginIncreUser
===

select * from test_user where 1=1
@if(!isEmpty(lastUpdateTime)){
AND (sys_create_time >= #lastUpdateTime# OR sys_update_time >= #lastUpdateTime#)
@}

Description:

  • Lines beginning with @ use beetl syntax, which can read variables and apply logic. Here it means: if the variable lastUpdateTime is not empty, read data according to this condition.
  • The lastUpdateTime variable is passed in at call time (as a Map).
  • For the specific beetl syntax, see the official documentation.

4.4.2 Writing incremental insert SQL statements

The incremental insert uses MySQL's insert into ... on duplicate key update ... syntax: if the data already exists, it is updated; if it does not exist, it is inserted. In the user.md file, add the following statement:

insertIncreUser
===

insert into test_user(id,name,phone,title,email,gender,date_of_birth,sys_create_time,sys_create_user,sys_update_time,sys_update_user)
values (#id#,#name#,#phone#,#title#,#email#,#gender#,#dateOfBirth#,#sysCreateTime#,#sysCreateUser#,#sysUpdateTime#,#sysUpdateUser#)
ON DUPLICATE KEY UPDATE 
id = VALUES(id),
name = VALUES(name),
phone = VALUES(phone),
title = VALUES(title),
email = VALUES(email),
gender = VALUES(gender),
date_of_birth = VALUES(date_of_birth),
sys_create_time = VALUES(sys_create_time),
sys_create_user = VALUES(sys_create_user),
sys_update_time = VALUES(sys_update_time),
sys_update_user = VALUES(sys_update_user)

4.5 Writing Spring Batch Components

The Spring Batch file structure is as follows:

4.5.1 ItemReader

Same as before; just change the getOriginUser function call to getOriginIncreUser.

4.5.2 ItemWriter

Same as before; change the SQL id from user.insertUser to user.insertIncreUser.

4.5.3 Adding IncrementJobEndListener

After data synchronization completes, the last step is to update the last-update time in the temporary table. As follows:

@Slf4j
public class IncrementJobEndListener extends JobExecutionListenerSupport {

    @Autowired
    private CdcTempService cdcTempService;
    @Autowired
    private TargetUserRepository targetUserRepository;

    @Override
    public void afterJob(JobExecution jobExecution) {
        BatchStatus status = jobExecution.getStatus();
        // Query the latest update time in the target database
        Date latestDate = targetUserRepository.selectMaxUpdateTime();
        cdcTempService.updateCdcTempAfterJob(SyncConstants.CDC_TEMP_ID_USER, status, latestDate);
    }
}

Description:

  • Query the latest data time in the target database (selectMaxUpdateTime).
  • Update last_update_time in the intermediate table cdc_temp.

4.5.4 Adding Parameter initialization during task startup

In the first step of data synchronization, you need to initialize the last update time of data in the temporary table. Therefore, before starting a task, you need to set task parameters so that the time parameters can be transferred to the task for use during task execution. As follows:

public JobParameters initJobParam() {
    CdcTemp currentCdcTemp = cdcTempService.getCurrentCdcTemp(getCdcTempId());
    // If not yet initialized (or the last run failed), query the latest time in the target database
    if (SyncConstants.STR_STATUS_INIT.equals(currentCdcTemp.getStatus())
            || SyncConstants.STR_STATUS_FAILED.equals(currentCdcTemp.getStatus())) {
        Date maxUpdateTime = selectMaxUpdateTime();
        // If there is no data yet, keep the initial time
        if (Objects.nonNull(maxUpdateTime)) {
            currentCdcTemp.setLastUpdateTime(maxUpdateTime);
        }
    }
    return JobUtil.makeJobParameters(currentCdcTemp);
}

4.5.5 Complete task assembly

Finally, an IncrementBatchConfig configuration class is required to assemble the reader, processor, writer, and listener components. It is worth noting that because dynamic parameters are needed, the reader is configured with the @StepScope annotation, and SpEL is used in the method parameters to obtain the parameter value, as shown below:

@Bean
@StepScope
public ItemReader incrementItemReader(@Value("#{jobParameters['lastUpdateTime']}") String lastUpdateTime) {
    IncrementUserItemReader userItemReader = new IncrementUserItemReader();
    Map<String, Object> params = CollUtil.newHashMap();
    params.put(SyncConstants.STR_LAST_UPDATE_TIME, lastUpdateTime);
    userItemReader.setParams(params);
    return userItemReader;
}

4.5.6 Testing

Referring to BeetlsqlJobTest from the previous article, write the test file IncrementJobTest. Because incremental synchronization needs to be tested, the test process is as follows:

  • Before the test, the source and target tables already contain data. Execute sql/user-data-new.sql to add a new user to the source table. Note that sys_create_time and sys_update_time are defined as follows:
`sys_create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP,
`sys_update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,

This way, the time is generated automatically when data is inserted and updated automatically when data is modified.

  • Run the test: run incrementJob with a unit test.

  • After the run completes, the following information is displayed:

After incremental synchronization, the data is as follows:

5. Summary

This article first gave a brief introduction to incremental synchronization and listed the current methods of implementing it, then used Spring Batch and BeetlSql to implement timestamp-based incremental synchronization. The example is practical, and I hope it is helpful to developers doing data synchronization or related batch processing.