The more concise, the clearer

Brief description

This article focuses on two aspects of DataX:

  1. Scheduling process
  2. Data transfer process

Scheduling refers to the order and priority in which DataX executes the tasks of a job. Data transfer refers to how readers and writers interact with each other and how DataX features such as rate control and parallel operation are implemented.

The source entry

Scheduling process

The entry point of the scheduling process is the schedule() method of JobContainer.java.

We first get the global number of channels, the number of channels for each TaskGroup, and calculate the required number of channels.

    int channelsPerTaskGroup = this.configuration.getInt(
            CoreConstant.DATAX_CORE_CONTAINER_TASKGROUP_CHANNEL, 5);
    int taskNumber = this.configuration.getList(
            CoreConstant.DATAX_JOB_CONTENT).size();

    this.needChannelNumber = Math.min(this.needChannelNumber, taskNumber);
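
The channel arithmetic above can be sketched in isolation. This is a minimal illustration, not DataX code: the class and method names are hypothetical, and rounding the number of task groups up is an assumption about how the channels are spread over groups.

```java
final class ChannelPlanSketch {
    /** Channels actually needed: never more than there are tasks. */
    static int neededChannels(int configuredChannels, int taskNumber) {
        return Math.min(configuredChannels, taskNumber);
    }

    /** Task groups needed to host the channels, rounding up (assumed). */
    static int taskGroupCount(int neededChannels, int channelsPerTaskGroup) {
        return (neededChannels + channelsPerTaskGroup - 1) / channelsPerTaskGroup;
    }

    public static void main(String[] args) {
        int needed = neededChannels(20, 12);   // 12 tasks cap the 20 configured channels
        int groups = taskGroupCount(needed, 5);
        System.out.println(needed + " channels in " + groups + " task groups");
    }
}
```

With 20 configured channels, 12 tasks and 5 channels per group, this gives 12 channels spread over 3 task groups.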

JobAssignUtil is then called to allocate appropriate channels to each TaskGroup.

    List<Configuration> taskGroupConfigs = JobAssignUtil.assignFairly(this.configuration,
            this.needChannelNumber, channelsPerTaskGroup);      // Fair distribution
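
To give a feel for what a fair distribution means, here is a minimal sketch that deals task ids out to groups in round-robin order. It is an assumption for illustration only: the real assignFairly may weigh other factors, and FairAssignSketch and assignRoundRobin are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

final class FairAssignSketch {
    /** Deals task ids 0..taskCount-1 out to groupCount groups in round-robin order. */
    static List<List<Integer>> assignRoundRobin(int taskCount, int groupCount) {
        if (groupCount <= 0) {
            throw new IllegalArgumentException("groupCount must be positive");
        }
        List<List<Integer>> groups = new ArrayList<>();
        for (int g = 0; g < groupCount; g++) {
            groups.add(new ArrayList<>());
        }
        for (int taskId = 0; taskId < taskCount; taskId++) {
            groups.get(taskId % groupCount).add(taskId);
        }
        return groups;
    }

    public static void main(String[] args) {
        // Group sizes differ by at most one task.
        System.out.println(assignRoundRobin(7, 3));
    }
}
```

For 7 tasks and 3 groups the groups receive 3, 2 and 2 tasks.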

AbstractScheduler is then instantiated for scheduling.

    private void schedule() {
        // ...
        scheduler.schedule(taskGroupConfigs);
        // ...
    }

There are two main things that the scheduling method does:

  1. Register monitoring information (covered in a separate article below)
  2. Start all tasks

The code to start the task is

    int totalTasks = calculateTaskCount(configurations);  // Count the total number of tasks
    startAllTaskGroup(configurations);      // Start all tasks

ProcessInnerScheduler#startAllTaskGroup

    public void startAllTaskGroup(List<Configuration> configurations) {
        this.taskGroupContainerExecutorService = Executors
                .newFixedThreadPool(configurations.size());

        for (Configuration taskGroupConfiguration : configurations) {
            // TaskGroupContainerRunner -> TaskGroupContainer
            TaskGroupContainerRunner taskGroupContainerRunner = newTaskGroupContainerRunner(taskGroupConfiguration);
            this.taskGroupContainerExecutorService.execute(taskGroupContainerRunner);
        }

        this.taskGroupContainerExecutorService.shutdown();
    }

DataX wraps each Configuration in a TaskGroupContainer and hands it to a TaskGroupContainerRunner, whose state attribute represents the state of its TaskGroupContainer. The runner is then submitted to the thread pool for execution. Next is the code that starts the task.

    try {
        Thread.currentThread().setName(
                String.format("taskGroup-%d", this.taskGroupContainer.getTaskGroupId()));
        this.taskGroupContainer.start();
        this.state = State.SUCCEEDED;
    } catch (Throwable e) {
        this.state = State.FAILED;
        throw DataXException.asDataXException(
                FrameworkErrorCode.RUNTIME_ERROR, e);
    }
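
The pattern of one runner per task group on a fixed-size pool can be tried out with plain JDK classes. The sketch below is a simplified stand-in, not the DataX implementation: Runner, State and runAll are hypothetical, and unlike the real runner this one only records a failure instead of rethrowing it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

final class TaskGroupRunnerSketch {
    enum State { RUNNING, SUCCEEDED, FAILED }

    /** Hypothetical stand-in for a runner: wraps some work and records its outcome. */
    static final class Runner implements Runnable {
        private final int groupId;
        private final Runnable work;
        private volatile State state = State.RUNNING;

        Runner(int groupId, Runnable work) {
            this.groupId = groupId;
            this.work = work;
        }

        @Override
        public void run() {
            try {
                Thread.currentThread().setName(String.format("taskGroup-%d", groupId));
                work.run();
                state = State.SUCCEEDED;
            } catch (Throwable e) {
                state = State.FAILED; // sketch only: record the failure, do not rethrow
            }
        }

        State getState() {
            return state;
        }
    }

    /** Runs one runner per entry on a pool sized to the number of entries (must be non-empty). */
    static List<State> runAll(List<Runnable> groupWork) {
        ExecutorService pool = Executors.newFixedThreadPool(groupWork.size());
        List<Runner> runners = new ArrayList<>();
        for (int i = 0; i < groupWork.size(); i++) {
            Runner runner = new Runner(i, groupWork.get(i));
            runners.add(runner);
            pool.execute(runner);
        }
        pool.shutdown(); // accept no new work; submitted runners still finish
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        List<State> states = new ArrayList<>();
        for (Runner runner : runners) {
            states.add(runner.getState());
        }
        return states;
    }

    public static void main(String[] args) {
        Runnable ok = () -> { };
        Runnable bad = () -> { throw new IllegalStateException("boom"); };
        System.out.println(runAll(List.of(ok, bad)));
    }
}
```

Calling shutdown() right after submitting everything mirrors the scheduler code: the pool stops accepting work but lets the submitted runners complete.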

At this point, the scheduling process ends and the data transfer process follows.

Data transfer process

Initialization

The data transfer process is implemented in TaskGroupContainer, so control metrics and retry policies are also handled there. DataX uses a producer-consumer model to decouple readers from writers, improve parallelism, and control the rate at which data is processed. In TaskGroupContainer, readers and writers are linked through channels. A channel can be in-memory or persistent, and the plug-in does not need to care which. The plug-in writes data to the channel through the RecordSender and reads data from the channel through the RecordReceiver.
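
The producer-consumer idea can be illustrated with a bounded queue from the JDK. This is a minimal sketch under stated assumptions, not how DataX implements its channel: ChannelSketch and transfer are hypothetical names, records are plain strings, and an empty Optional plays the role of an end-of-data marker.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

final class ChannelSketch {
    /** Moves every record from a reader thread to a writer thread through a bounded queue. */
    static List<String> transfer(List<String> source, int capacity) {
        BlockingQueue<Optional<String>> channel = new ArrayBlockingQueue<>(capacity);
        List<String> sink = new ArrayList<>();

        Thread writer = new Thread(() -> {
            try {
                Optional<String> item;
                // An empty Optional marks the end of the data (assumed convention).
                while ((item = channel.take()).isPresent()) {
                    sink.add(item.get());
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "writer");

        Thread reader = new Thread(() -> {
            try {
                for (String record : source) {
                    channel.put(Optional.of(record)); // blocks while the channel is full
                }
                channel.put(Optional.empty());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "reader");

        writer.start();
        reader.start();
        try {
            reader.join();
            writer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return sink;
    }

    public static void main(String[] args) {
        System.out.println(transfer(List.of("a", "b", "c"), 2));
    }
}
```

Because the queue is bounded, a fast reader is slowed down to the pace of the writer, which is the simplest form of the rate control that a channel makes possible.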

The code is too long to reproduce in full here. TaskGroupContainer#start is the entry point and represents the start of a TaskGroup. The main steps of this method are:

  1. Get the parameters used to configure monitoring
  2. Wrap each task configuration in an executable task, run it, and register it with the monitor
  3. Monitor the task status and retry failed tasks

We focus here on task execution. TaskExecutor is an inner class of TaskGroupContainer.

    class TaskExecutor {
        private Configuration taskConfig;

        private int taskId;

        private int attemptCount;

        private Channel channel;

        private Thread readerThread;

        private Thread writerThread;

        private ReaderRunner readerRunner;

        private WriterRunner writerRunner;

        private Communication taskCommunication;

        // method area
    }

TaskGroupContainer wraps each shard configuration in a TaskExecutor and starts it:

    TaskExecutor taskExecutor = new TaskExecutor(taskConfigForRun, attemptCount);
    taskStartTimeMap.put(taskId, System.currentTimeMillis());
    taskExecutor.doStart();

The code to start is the following

    public void doStart() {
        this.writerThread.start(); // ①
        // ②
        if (!this.writerThread.isAlive() || this.taskCommunication.getState() == State.FAILED) {
            // throw exception
        }
        this.readerThread.start(); // ③

        // ④
        if (!this.readerThread.isAlive() && this.taskCommunication.getState() == State.FAILED) {
            // throw exception
        }
    }

Code ① and code ③ start two threads, the writer thread and the reader thread respectively. Both threads are created when TaskExecutor is instantiated. First, let's look at the Runnable that the writer thread executes. It is built mainly in the generateRunner method.

    // ①
    newRunner = LoadUtil.loadPluginRunner(pluginType,
            this.taskConfig.getString(CoreConstant.JOB_WRITER_NAME));
    // ②
    newRunner.setJobConf(this.taskConfig
            .getConfiguration(CoreConstant.JOB_WRITER_PARAMETER));
    // ③
    pluginCollector = ClassUtil.instantiate(
            taskCollectorClass, AbstractTaskPluginCollector.class,
            configuration, this.taskCommunication,
            PluginType.WRITER);
    ((WriterRunner) newRunner).setRecordReceiver(new BufferedRecordExchanger(
            this.channel, pluginCollector));
    // ④
    newRunner.setTaskPluginCollector(pluginCollector);

Code ① loads the WriterRunner. Code ② passes in the writer configuration as an argument. Codes ③ and ④ create and attach the task plugin collector, which handles dirty data and job/task communication data. Code ③ also gives the WriterRunner a BufferedRecordExchanger as its RecordReceiver.

And then we look at the read task, which is also in the generateRunner method

    // ①
    newRunner = LoadUtil.loadPluginRunner(pluginType,
            this.taskConfig.getString(CoreConstant.JOB_READER_NAME));
    // ②
    newRunner.setJobConf(this.taskConfig.getConfiguration(
            CoreConstant.JOB_READER_PARAMETER));
    // ③
    pluginCollector = ClassUtil.instantiate(
            taskCollectorClass, AbstractTaskPluginCollector.class,
            configuration, this.taskCommunication,
            PluginType.READER);
    // ④
    RecordSender recordSender;
    if (transformerInfoExecs != null && transformerInfoExecs.size() > 0) {
        recordSender = new BufferedRecordTransformerExchanger(taskGroupId, this.taskId, this.channel,
                this.taskCommunication, pluginCollector, transformerInfoExecs);
    } else {
        recordSender = new BufferedRecordExchanger(this.channel, pluginCollector);
    }

    ((ReaderRunner) newRunner).setRecordSender(recordSender);
    // ⑤
    newRunner.setTaskPluginCollector(pluginCollector);

Code ① loads the ReaderRunner. Code ② passes in the reader configuration as an argument. Codes ③ and ⑤ create and attach the task plugin collector, which handles dirty data and job/task communication data. Code ④ is worth noting: it decides which exchanger to use. An exchanger can be understood as the intermediary through which a plug-in interacts with a channel. BufferedRecordTransformerExchanger is chosen when transformers are configured, so that records can be converted to a specific format on the way through. BufferedRecordExchanger transfers records in DataX format only.
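
The choice between a plain and a transforming sender can be sketched with a tiny interface. Everything here is hypothetical and reduced for illustration: records are strings, transformers are UnaryOperator values, and the channel is just a list.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.UnaryOperator;

final class SenderSelectionSketch {
    /** Hypothetical, reduced version of a record sender. */
    interface Sender {
        void send(String record);
    }

    /** Passes records through unchanged. */
    static final class PlainSender implements Sender {
        private final List<String> channel;

        PlainSender(List<String> channel) {
            this.channel = channel;
        }

        @Override
        public void send(String record) {
            channel.add(record);
        }
    }

    /** Applies every transformer in order before handing the record on. */
    static final class TransformingSender implements Sender {
        private final List<String> channel;
        private final List<UnaryOperator<String>> transformers;

        TransformingSender(List<String> channel, List<UnaryOperator<String>> transformers) {
            this.channel = channel;
            this.transformers = transformers;
        }

        @Override
        public void send(String record) {
            String current = record;
            for (UnaryOperator<String> transformer : transformers) {
                current = transformer.apply(current);
            }
            channel.add(current);
        }
    }

    /** Same shape as the null-and-size check in the reader setup. */
    static Sender choose(List<String> channel, List<UnaryOperator<String>> transformers) {
        if (transformers != null && !transformers.isEmpty()) {
            return new TransformingSender(channel, transformers);
        }
        return new PlainSender(channel);
    }

    public static void main(String[] args) {
        List<String> channel = new ArrayList<>();
        UnaryOperator<String> trim = String::trim;
        UnaryOperator<String> upper = String::toUpperCase;
        choose(channel, null).send("abc");
        choose(channel, List.of(trim, upper)).send(" abc ");
        System.out.println(channel);
    }
}
```

The reader plug-in only ever sees the Sender interface, so adding transformers does not require any change on the plug-in side.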

Task read/write starts (MySQL as an example)

Once the read and write tasks are initialized, the next step is their start methods. We begin with the read task. It is defined in ReaderRunner, which also uses the plug-in mechanism for flexibility. To extend it, just extend Reader.Task.

    Reader.Task taskReader = (Reader.Task) this.getPlugin();

Then focus on the call to the startRead method of Reader.Task, which takes the exchanger as its argument.

    taskReader.startRead(recordSender);
    recordSender.terminate();

The exchanger is passed to startRead, and the concrete data source implementation takes over from there. The implementation class here is CommonRdbmsReader.

It first gets QUERY_SQL, the username, the password, and other parameters, and opens a connection.

    String querySql = readerSliceConfig.getString(Key.QUERY_SQL);
    String table = readerSliceConfig.getString(Key.TABLE);
    Connection conn = DBUtil.getConnection(this.dataBaseType, jdbcUrl, username, password);

Then execute QUERY_SQL to retrieve the data

    rs = DBUtil.query(conn, querySql, fetchSize);

Then loop through the query results and start writing

    while (rs.next()) {
        this.transportOneRecord(recordSender, rs,
         metaData, columnNumber, mandatoryEncoding, taskPluginCollector);
    }

transportOneRecord is where the exchanger is actually used: it builds a record from the current row and sends it to the writer through the RecordSender.

    protected Record transportOneRecord(RecordSender recordSender, ResultSet rs, 
        ResultSetMetaData metaData, int columnNumber, String mandatoryEncoding, 
        TaskPluginCollector taskPluginCollector) {
        Record record = buildRecord(recordSender,rs,metaData,columnNumber,mandatoryEncoding,taskPluginCollector); 
        recordSender.sendToWriter(record);
        return record;
    }

Next comes the write task, which is implemented in CommonRdbmsWriter.Task#startWrite. The write task first gets a connection, then applies the session configuration to that connection, and finally writes the data.

    Connection connection = DBUtil.getConnection(this.dataBaseType,
                    this.jdbcUrl, username, password);
    DBUtil.dealWithSessionConfig(connection, writerSliceConfig,
                    this.dataBaseType, BASIC_MESSAGE);

Then the write begins. The data to write comes from the exchanger: the task fetches records from the RecordReceiver in a loop and checks whether each record has the correct number of columns.

    // Buffer the records; batchSize controls how many are sent per batch
    List<Record> writeBuffer = new ArrayList<Record>(this.batchSize);

    while ((record = recordReceiver.getFromReader()) != null) {
        if (record.getColumnNumber() != this.columnNumber) {
            // throw error...
        }
    }

If the record passes the check, it is added to the buffer. The buffered records are then inserted into the database as a batch, and the buffer is cleared.

    doBatchInsert(connection, writeBuffer);
    writeBuffer.clear();
    bufferBytes = 0;
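
The buffer-then-batch pattern can be shown without a database. In this minimal sketch the batch insert is replaced by a Consumer, so BatchWriterSketch, writeAll and batchSink are hypothetical names, and flushing whenever the buffer reaches batchSize is an assumption about the trigger.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

final class BatchWriterSketch {
    /** Buffers records and hands them to batchSink in groups of at most batchSize. */
    static void writeAll(Iterable<String> records, int batchSize,
                         Consumer<List<String>> batchSink) {
        List<String> writeBuffer = new ArrayList<>(batchSize);
        for (String record : records) {
            writeBuffer.add(record);
            if (writeBuffer.size() >= batchSize) {
                batchSink.accept(new ArrayList<>(writeBuffer)); // copy: the buffer is reused
                writeBuffer.clear();
            }
        }
        if (!writeBuffer.isEmpty()) {
            batchSink.accept(new ArrayList<>(writeBuffer)); // final partial batch
            writeBuffer.clear();
        }
    }

    public static void main(String[] args) {
        List<List<String>> batches = new ArrayList<>();
        writeAll(List.of("r1", "r2", "r3", "r4", "r5"), 2, batches::add);
        System.out.println(batches);
    }
}
```

Five records with a batch size of 2 arrive at the sink as two full batches followed by one partial batch.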

Exchanger interaction

The exchanger implements both RecordSender and RecordReceiver. It has two implementation classes: BufferedRecordExchanger and BufferedRecordTransformerExchanger.

RecordSender.java

    public interface RecordSender {
        public Record createRecord();
        public void sendToWriter(Record record);
        public void flush();
        public void terminate();
        public void shutdown();
    }

RecordReceiver.java

    public interface RecordReceiver {
        public Record getFromReader();
        public void shutdown();
    }

Let's take BufferedRecordExchanger as an example and look at a few of its methods.

Method name      Role
sendToWriter     Write data to a channel
flush            Flush the buffer, that is, write the buffered data to a channel
terminate        Flush the buffered data into the channel immediately and push a terminate record
getFromReader    Get data from a channel

The sendToWriter method is responsible for putting records into a channel. It first checks whether the record size exceeds the limit. An oversized record is collected as dirty data and skipped.

    if (record.getMemorySize() > this.byteCapacity) {
        this.pluginCollector.collectDirtyRecord(record, new Exception(String.format(
                "Single record exceeds size limit, current limit is: %s", this.byteCapacity)));
        return;
    }

Then it checks whether the buffer is full, either by record count or by memory size. If it is, the buffer is flushed to the channel.

    boolean isFull = (this.bufferIndex >= this.bufferSize || this.memoryBytes.get() + record.getMemorySize() > this.byteCapacity);
    if (isFull) {
        flush();
    }

The record is then added to the buffer

    this.buffer.add(record);
    this.bufferIndex++;
    memoryBytes.addAndGet(record.getMemorySize());

The flush method is also simple: it pushes the whole buffer into the channel and clears the buffer

    this.channel.pushAll(this.buffer);
    this.buffer.clear();

The terminate method flushes the buffered data into the channel immediately and then pushes a terminate record to mark the end of the data

    flush();
    this.channel.pushTerminate(TerminateRecord.get());
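
Put together, the sending side behaves roughly like the sketch below. It is an illustration under stated assumptions, not the DataX class: records are strings whose length stands in for their memory size, the channel is a list of pushed batches, an empty batch plays the role of the terminate record, and the byte counter is reset on flush.

```java
import java.util.ArrayList;
import java.util.List;

final class BufferedSenderSketch {
    private final List<String> buffer = new ArrayList<>();
    private final List<List<String>> channel; // hypothetical channel: list of pushed batches
    private final int bufferSize;             // maximum records per batch
    private final int byteCapacity;           // maximum total size per batch
    private int memoryBytes = 0;
    final List<String> dirtyRecords = new ArrayList<>();

    BufferedSenderSketch(List<List<String>> channel, int bufferSize, int byteCapacity) {
        this.channel = channel;
        this.bufferSize = bufferSize;
        this.byteCapacity = byteCapacity;
    }

    void sendToWriter(String record) {
        int size = record.length(); // stand-in for the record's memory size
        if (size > byteCapacity) {
            dirtyRecords.add(record); // a single oversized record is rejected
            return;
        }
        boolean isFull = buffer.size() >= bufferSize || memoryBytes + size > byteCapacity;
        if (isFull) {
            flush();
        }
        buffer.add(record);
        memoryBytes += size;
    }

    void flush() {
        if (buffer.isEmpty()) {
            return;
        }
        channel.add(new ArrayList<>(buffer));
        buffer.clear();
        memoryBytes = 0; // assumption: the byte counter restarts with each batch
    }

    void terminate() {
        flush();
        channel.add(List.of()); // assumption: an empty batch marks the end of the data
    }

    public static void main(String[] args) {
        List<List<String>> channel = new ArrayList<>();
        BufferedSenderSketch sender = new BufferedSenderSketch(channel, 2, 10);
        sender.sendToWriter("aa");
        sender.sendToWriter("bb");
        sender.sendToWriter("cc");          // count limit reached: flushes [aa, bb]
        sender.sendToWriter("12345678901"); // 11 > 10: collected as dirty
        sender.sendToWriter("123456789");   // 2 + 9 > 10: flushes [cc]
        sender.terminate();
        System.out.println(channel);
        System.out.println(sender.dirtyRecords);
    }
}
```

Note that a record is always added after the fullness check, so a flush makes room for the incoming record rather than replacing the add.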

The getFromReader method is used by the writer to fetch data from the channel for further processing. It first checks whether the buffer has been used up. If so, receive() is called to pull a batch of data from the channel into the writer's buffer

    boolean isEmpty = (this.bufferIndex >= this.buffer.size());
    if (isEmpty) {
        receive();
    }

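It then reads the record at the current index from the buffer and returns it. A TerminateRecord is returned as null, which tells the writer that the data has ended.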

    Record record = this.buffer.get(this.bufferIndex++);
    if (record instanceof TerminateRecord) {
        record = null;
    }
    return record;
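
The receiving side can be sketched the same way. This is a simplified, single-threaded illustration with hypothetical names: the channel is a plain queue of strings, TERMINATE is an assumed marker value, and an exhausted channel is treated as terminated instead of blocking as a real channel would.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

final class BufferedReceiverSketch {
    static final String TERMINATE = "<terminate>"; // hypothetical end-of-data marker

    private final Queue<String> channel;
    private final int bufferSize;
    private final List<String> buffer = new ArrayList<>();
    private int bufferIndex = 0;

    BufferedReceiverSketch(Queue<String> channel, int bufferSize) {
        this.channel = channel;
        this.bufferSize = bufferSize;
    }

    /** Returns the next record, or null once the terminate marker is reached. */
    String getFromReader() {
        boolean isEmpty = bufferIndex >= buffer.size();
        if (isEmpty) {
            receive();
        }
        String record = buffer.get(bufferIndex++);
        return TERMINATE.equals(record) ? null : record;
    }

    /** Refills the local buffer with up to bufferSize records from the channel. */
    private void receive() {
        buffer.clear();
        bufferIndex = 0;
        while (buffer.size() < bufferSize && !channel.isEmpty()) {
            buffer.add(channel.poll());
        }
        if (buffer.isEmpty()) {
            buffer.add(TERMINATE); // sketch only: an exhausted channel counts as terminated
        }
    }

    public static void main(String[] args) {
        Queue<String> channel = new ArrayDeque<>(List.of("a", "b", "c", TERMINATE));
        BufferedReceiverSketch receiver = new BufferedReceiverSketch(channel, 2);
        List<String> received = new ArrayList<>();
        String record;
        while ((record = receiver.getFromReader()) != null) {
            received.add(record);
        }
        System.out.println(received);
    }
}
```

Returning null for the marker gives the writer a simple loop condition, the same shape as the `while ((record = recordReceiver.getFromReader()) != null)` loop in the write task.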

Summary

In general, DataX's read and write tasks follow this process. Its flexibility comes from the many data source implementations it provides, and from the fact that readers and writers are plug-ins, which makes it easier to extend and adapt to different services.
