1 Program entry class Engine

The entry class for task execution is Engine. Its main method is shown below:

public static void main(String[] args) throws Exception {
        int exitCode = 0;
        try {
            // Call the internal entry method
            Engine.entry(args);
        } catch (Throwable e) {
            // ... other code omitted
        }
        System.exit(exitCode);
    }

The entry method inside Engine provides the following functions:

  1. Parse the command line parameters -mode, -jobid, and -job to obtain the execution mode, the job id, and the path of the job configuration file, respectively.
  2. Parse the JSON job file edited by the user and package it into a Configuration object (how the parsing works is covered later), then create a new Engine instance and call its start method.
    public static void entry(final String[] args) throws Throwable {
        // Define the command line arguments -mode, -jobid, -job
        Options options = new Options();
        options.addOption("job".true."Job config.");
        options.addOption("jobid".true."Job unique id.");
        options.addOption("mode".true."Job runtime mode.");

        BasicParser parser = new BasicParser();
        CommandLine cl = parser.parse(options, args);// Parse command line arguments

        String jobPath = cl.getOptionValue("job");

        // If the user does not specify jobid explicitly, datax.py specifies jobid with a default value of -1
        String jobIdString = cl.getOptionValue("jobid");
        RUNTIME_MODE = cl.getOptionValue("mode");
        // Specify the Job Configuration path. ConfigParser parses all Job, Plugin, and Core information and returns it in Configuration
        Configuration configuration = ConfigParser.parse(jobPath);// Configuration contains all information about the configuration file throughout the datax program
        
        // ... other code omitted
        
        Engine engine = new Engine();
        engine.start(configuration);// Start the engine
    }

How the ConfigParser.parse method works is omitted for now; it can simply be treated as returning a fully populated Configuration.
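For reference, here is a minimal sketch of the kind of job JSON the user edits (the plug-in names and parameter values are illustrative; each reader/writer defines its own parameter block). ConfigParser.parse merges this file with the core and plug-in configurations, and values are then looked up by path, e.g. job.content[0].reader.name:

{
  "job": {
    "setting": {
      "speed": { "channel": 3 }
    },
    "content": [{
      "reader": {
        "name": "mysqlreader",
        "parameter": {
          "username": "root",
          "password": "****",
          "column": ["id", "name"],
          "splitPk": "id",
          "connection": [{
            "table": ["user"],
            "jdbcUrl": ["jdbc:mysql://127.0.0.1:3306/test"]
          }]
        }
      },
      "writer": {
        "name": "mysqlwriter",
        "parameter": { }
      }
    }]
  }
}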

Engine’s start method does the following:

  1. First, it binds some additional information into the configuration, such as the ColumnCast conversion information.
  2. Initialize the PluginLoader to retrieve various plug-in configurations (in preparation for subsequent hot loading).
  3. Create JobContainer and start it. JobContainer will be the running container for a data synchronization job.
    /* check job model (job/task) first */
    public void start(Configuration allConf) {

        // Bind column to transform information
        ColumnCast.bind(allConf);

        /**
         * Initialize PluginLoader to get various plug-in configurations
         */
        LoadUtil.bind(allConf);

        boolean isJob = !("taskGroup".equalsIgnoreCase(allConf
                .getString(CoreConstant.DATAX_CORE_CONTAINER_MODEL)));
        // JobContainer will set and adjust the value after schedule
        int channelNumber = 0;
        AbstractContainer container;
        long instanceId;
        int taskGroupId = -1;
        // Basically job mode
        if (isJob) {
            allConf.set(CoreConstant.DATAX_CORE_CONTAINER_JOB_MODE, RUNTIME_MODE);
            //JobContainer initializes, passing in global configuration parameters
            container = new JobContainer(allConf);
            instanceId = allConf.getLong(
                    CoreConstant.DATAX_CORE_CONTAINER_JOB_ID, 0);

        } else {
            // ... omitted; this branch is rarely taken
        }

        // PerfTrace is enabled by default
        boolean traceEnable = allConf.getBool(CoreConstant.DATAX_CORE_CONTAINER_TRACE_ENABLE, true);
        boolean perfReportEnable = allConf.getBool(CoreConstant.DATAX_CORE_REPORT_DATAX_PERFLOG, true);

        // DataX shell tasks in standalone mode are not reported
        if(instanceId == -1){
            perfReportEnable = false;
        }
        // Priority is taken from an environment variable (its exact purpose is unclear)
        int priority = 0;
        try {
            priority = Integer.parseInt(System.getenv("SKYNET_PRIORITY"));
        }catch (NumberFormatException e){
            LOG.warn("prioriy set to 0, because NumberFormatException, the value is: "+System.getProperty("PROIORY"));
        }
        // Extract job-related configurations from the total configuration file
        Configuration jobInfoConfig = allConf.getConfiguration(CoreConstant.DATAX_JOB_JOBINFO);
        // Initialize PerfTrace
        PerfTrace perfTrace = PerfTrace.getInstance(isJob, instanceId, taskGroupId, priority, traceEnable);
        perfTrace.setJobInfo(jobInfoConfig,perfReportEnable,channelNumber);
        // Start the JobContainer (covered in the next section)
        container.start();

    }

2 JobContainer

The Job instance runs in jobContainer. It is the master of all tasks and is responsible for initialization, splitting, scheduling, running, recycling, monitoring, and reporting. However, it does not perform actual data synchronization.

The start method

  1. JobContainer drives all the phases in start(), including init, prepare, split, and schedule
  2. The init method initializes and loads the reader and writer plug-ins
  3. The prepare method does some preparatory work
  4. The split method splits the job into multiple tasks based on the configured concurrency parameters
  5. The schedule method performs the actual scheduling and running of tasks.
    @Override
    public void start() {
        LOG.info("DataX jobContainer starts job.");
        boolean hasException = false;
        boolean isDryRun = false;
        try {
            this.startTimeStamp = System.currentTimeMillis();
            isDryRun = configuration.getBool(CoreConstant.DATAX_JOB_SETTING_DRYRUN, false);
            if(isDryRun) {
                // ... omitted; rarely needed
            } else {
                // Clone a copy of the configuration, because changes need to be made
                userConf = configuration.clone();
                // preprocessing
                this.preHandle();
                // Initialize the read and write plug-ins
                this.init();
                // Plug-in preparation work; some plug-ins leave it empty, e.g. mysqlReader
                this.prepare();
                // Split tasks to prepare for concurrency
                this.totalStage = this.split();
                // Start the task
                this.schedule();
                // Task post-processing
                this.post();
                // Post handler
                this.postHandle();
                // Invoke hooks (an extension point)
                this.invokeHooks();
            }
        } catch (Throwable e) {
            // ... other code omitted
        } finally {
            // ... other code omitted
        }
    }

The init method initializes the reader and writer plug-ins: it loads the specified plug-ins through the class loader and assigns the contents of the configuration file to the plug-ins' internal variables for subsequent calls. During this step the tables, columns, and so on in the configuration file are validated. After initialization, the reader and writer variables in the container hold the concrete plug-ins.

private void init() {
        // Get the jobId from the configuration
        this.jobId = this.configuration.getLong(
                CoreConstant.DATAX_CORE_CONTAINER_JOB_ID, -1);

        if (this.jobId < 0) {
            LOG.info("Set jobId = 0");
            this.jobId = 0;
            // Add jobId information to the configuration information
            this.configuration.set(CoreConstant.DATAX_CORE_CONTAINER_JOB_ID,
                    this.jobId);
        }

        Thread.currentThread().setName("job-" + this.jobId);
        // Create the collector used to gather job-level statistics from the plug-ins
        JobPluginCollector jobPluginCollector = new DefaultJobPluginCollector(
                this.getContainerCommunicator());
        // Reader must come first, since Writer depends on Reader
        this.jobReader = this.initJobReader(jobPluginCollector);
        this.jobWriter = this.initJobWriter(jobPluginCollector);
    }

The initJobReader method uses a URLClassLoader to load the plug-in's classes, which are located in the plug-in's installation directory. Once loaded, the plug-in's own init method is called for plug-in-specific initialization.

private Reader.Job initJobReader( JobPluginCollector jobPluginCollector) {
    // Get the read plug-in name
    this.readerPluginName = this.configuration.getString(
            CoreConstant.DATAX_JOB_CONTENT_READER_NAME);    //job.content[0].reader.name
    // The lib package that loads the plug-in is loaded into the JVM based on the read plug-in class name
    classLoaderSwapper.setCurrentThreadClassLoader(LoadUtil.getJarLoader(
            PluginType.READER, this.readerPluginName)); // Reset the plugin jar classLoader
    // Create a read object
    Reader.Job jobReader = (Reader.Job) LoadUtil.loadJobPlugin(
            PluginType.READER, this.readerPluginName);
    // Set jobConfig for reader
    jobReader.setPluginJobConf(this.configuration.getConfiguration(
            CoreConstant.DATAX_JOB_CONTENT_READER_PARAMETER));
    // Set the writer's parameter on the reader as its peer config
    jobReader.setPeerPluginJobConf(this.configuration.getConfiguration(
            CoreConstant.DATAX_JOB_CONTENT_WRITER_PARAMETER));
    jobReader.setJobPluginCollector(jobPluginCollector);
    jobReader.init();   // After loading a specific plug-in, perform the corresponding operation
    // reset the original classLoader
    classLoaderSwapper.restoreCurrentThreadClassLoader();
    return jobReader;
}
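For context on what LoadUtil.getJarLoader does behind the scenes: it boils down to a URLClassLoader built over the jars in the plug-in's directory. A minimal sketch (assuming a simple directory layout; this is not the exact DataX JarLoader source):

import java.io.File;
import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.ArrayList;
import java.util.List;

// Simplified sketch of a plug-in jar loader built on URLClassLoader
public class PluginJarLoader extends URLClassLoader {
    public PluginJarLoader(String pluginDir) {
        super(jarUrlsIn(pluginDir), Thread.currentThread().getContextClassLoader());
    }

    private static URL[] jarUrlsIn(String pluginDir) {
        // Collect every *.jar under the plug-in directory
        File[] jars = new File(pluginDir)
                .listFiles((dir, name) -> name.endsWith(".jar"));
        List<URL> urls = new ArrayList<>();
        if (jars != null) {
            for (File jar : jars) {
                try {
                    urls.add(jar.toURI().toURL());
                } catch (MalformedURLException e) {
                    throw new IllegalArgumentException("Bad jar path: " + jar, e);
                }
            }
        }
        return urls.toArray(new URL[0]);
    }
}

Swapping such a loader onto the current thread (what classLoaderSwapper does above) is what lets each plug-in resolve its own dependencies without clashing with other plug-ins.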

Take the initialization of the MysqlReader plug-in as an example:

@Override
        public void init() {
            this.originalConfig = super.getPluginJobConf();

            Integer userConfigedFetchSize = this.originalConfig.getInt(Constant.FETCH_SIZE);
            if (userConfigedFetchSize != null) {
                LOG.warn("There is no need to configure fetchSize for mysqlReader, mysqlReader will ignore this configuration. If you don't want to see this warning again, remove the fetchSize configuration.");
            }

            this.originalConfig.set(Constant.FETCH_SIZE, Integer.MIN_VALUE);

            this.commonRdbmsReaderJob = new CommonRdbmsReader.Job(DATABASE_TYPE);
            this.commonRdbmsReaderJob.init(this.originalConfig);
        }

The prepare method is omitted here. It is not required by all plug-ins and is empty in many plug-ins.
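A plug-in that needs no preparation simply leaves the body empty; a sketch of what such an implementation looks like:

        @Override
        public void prepare() {
            // Nothing to prepare for this plug-in
        }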

JobContainer Task splitting method: split

After init and prepare, the most important step before task execution is task partitioning.

  1. Split means splitting the reader and writer based on needChannelNumber. Each reader and writer plug-in has its own split method.
  2. The jobReader that was initialized in JobContainer splits its Configuration based on the configuration contents and its own conditions. The Configuration held by JobContainer contains all information about the data to be synchronized.
  3. After splitting, a List of Configurations is returned; each Configuration represents one slice of the data described by the original total configuration. The list is written back into the total configuration to support subsequent calls.
  4. Note that the reader must be split first, because the writer is split according to the number of reader shards.
    private int split() {
        this.adjustChannelNumber();
        // Set the number of pipes
        if (this.needChannelNumber <= 0) {
            this.needChannelNumber = 1;
        }
        // Split the read plug-in; returns a list of reader configurations, one per shard, each backing a later task
        List<Configuration> readerTaskConfigs = this
                .doReaderSplit(this.needChannelNumber);
        // The number of reader shards
        int taskNumber = readerTaskConfigs.size();
        // Split write plugins according to the number of shards of read plugins, return a list of write configurations containing each shard
        List<Configuration> writerTaskConfigs = this
                .doWriterSplit(taskNumber);
        // Get the job.content[0].transformer configuration
        List<Configuration> transformerList = this.configuration.getListConfiguration(CoreConstant.DATAX_JOB_CONTENT_TRANSFORMER);

        LOG.debug("transformer configuration: "+ JSON.toJSONString(transformerList));
        // Merge read task configuration, write task configuration, transformer configuration
        List<Configuration> contentConfig = mergeReaderAndWriterTaskConfigs(
                readerTaskConfigs, writerTaskConfigs, transformerList);
        // Assign the configured list to the general configuration file this.configuration for subsequent calls.
        this.configuration.set(CoreConstant.DATAX_JOB_CONTENT, contentConfig);

        return contentConfig.size();
    }

The mysql reader's split is used as an example:

@Override
        public List<Configuration> split(int adviceNumber) {
            return this.commonRdbmsReaderJob.split(this.originalConfig, adviceNumber);
        }

MysqlReader actually delegates to the split method of CommonRdbmsReader.Job, which the framework provides for RDBMS readers in general. See the code comments below for details.

    public static List<Configuration> doSplit(
            Configuration originalSliceConfig, int adviceNumber) {
        boolean isTableMode = originalSliceConfig.getBool(Constant.IS_TABLE_MODE).booleanValue();
        int eachTableShouldSplittedNumber = -1;
        if (isTableMode) {
            // adviceNumber indicates the number of channels, i.e., the number of concurrent datax tasks
            // eachTableShouldSplittedNumber is the number of slices each single table should be split into, rounded up; it no longer has a fixed ratio to adviceNumber
            eachTableShouldSplittedNumber = calculateEachTableShouldSplittedNumber(
                    adviceNumber, originalSliceConfig.getInt(Constant.TABLE_NUMBER_MARK));
        }
        // Get the column information in the configuration file
        String column = originalSliceConfig.getString(Key.COLUMN);
        // Get the configuration of where in the configuration file
        String where = originalSliceConfig.getString(Key.WHERE, null);
        // Get all connections in the configuration file
        List<Object> conns = originalSliceConfig.getList(Constant.CONN_MARK, Object.class);

        List<Configuration> splittedConfigs = new ArrayList<Configuration>();
        // Iterate over all connections
        for (int i = 0, len = conns.size(); i < len; i++) {
            Configuration sliceConfig = originalSliceConfig.clone();
            // Get the configuration of the corresponding connection
            Configuration connConf = Configuration.from(conns.get(i).toString());
            String jdbcUrl = connConf.getString(Key.JDBC_URL);
            sliceConfig.set(Key.JDBC_URL, jdbcUrl);

            // Extract IP /port from jdbcUrl for resource usage marking to provide meaningful shuffle operations for core
            sliceConfig.set(CommonConstant.LOAD_BALANCE_RESOURCE_MARK, DataBaseType.parseIpFromJdbcUrl(jdbcUrl));

            sliceConfig.remove(Constant.CONN_MARK);

            Configuration tempSlice;

            // The table mode is configured
            if (isTableMode) {
                // It has been extended and processed previously, and can be used directly
                List<String> tables = connConf.getList(Key.TABLE, String.class);

                Validate.isTrue(null != tables && !tables.isEmpty(),
                        "You read the database table configuration error.");

                String splitPk = originalSliceConfig.getString(Key.SPLIT_PK, null);

                // The final number of slices does not necessarily equal eachTableShouldSplittedNumber
                boolean needSplitTable = eachTableShouldSplittedNumber > 1
                        && StringUtils.isNotBlank(splitPk);
                if (needSplitTable) {
                    if (tables.size() == 1) {
                        // Originally: when splitting a single table by primary key, num = num * 2 + 1
                        // (the extra slice covers rows where splitPk is null)
                        //eachTableShouldSplittedNumber = eachTableShouldSplittedNumber * 2 + 1; // Adding 1 would cause a long tail
                        
                        // Consider other ratio figures? (splitPk is null, ignore the long tail)
                        //eachTableShouldSplittedNumber = eachTableShouldSplittedNumber * 5;

                        // To avoid importing small Hive files, the default cardinality is 5. Use splitFactor to set the cardinality
                        // The final task count is (channel/tableNum) rounded up *splitFactor
                        Integer splitFactor = originalSliceConfig.getInt(Key.SPLIT_FACTOR, Constant.SPLIT_FACTOR);
                        eachTableShouldSplittedNumber = eachTableShouldSplittedNumber * splitFactor;
                    }
                    // Try to split each table into eachTableShouldSplittedNumber slices
                    for (String table : tables) {
                        tempSlice = sliceConfig.clone();
                        tempSlice.set(Key.TABLE, table);
                        // If splitPk is not configured, splitSingleTable will not split the table further
                        List<Configuration> splittedSlices = SingleTableSplitUtil
                                .splitSingleTable(tempSlice, eachTableShouldSplittedNumber);
                        splittedConfigs.addAll(splittedSlices);
                    }
                } else {
                    // No single-table splitting: one slice per table
                    for (String table : tables) {
                        tempSlice = sliceConfig.clone();
                        tempSlice.set(Key.TABLE, table);
                        String queryColumn = HintUtil.buildQueryColumn(jdbcUrl, table, column);
                        tempSlice.set(Key.QUERY_SQL,
                                SingleTableSplitUtil.buildQuerySql(queryColumn, table, where));
                        splittedConfigs.add(tempSlice);
                    }
                }
            } else {
                // querySql mode: relatively simple, one slice per configured SQL statement
                List<String> sqls = connConf.getList(Key.QUERY_SQL, String.class);

                // TODO check is configured as multiple statements?
                for (String querySql : sqls) {
                    tempSlice = sliceConfig.clone();
                    tempSlice.set(Key.QUERY_SQL, querySql);
                    splittedConfigs.add(tempSlice);
                }
            }
        }
        return splittedConfigs;
    }

Working down from the code above, it is not hard to see from the comments that the split method decides whether a single table needs to be split internally: when the required concurrency is high and the splitPk (split primary key) parameter is configured, a single table is split further, using the slice count computed earlier; otherwise one task is opened per table. Below is the single-table split source code. It combines the primary key, table name, column names, and WHERE condition into a SQL statement, divides the primary key range by appending extra WHERE conditions, and then assigns each resulting SQL to its own Configuration, forming a list that serves as the basis for each task's configuration.

public static List<Configuration> splitSingleTable(
            Configuration configuration, int adviceNum) {
        List<Configuration> pluginParams = new ArrayList<Configuration>();
        List<String> rangeList;
        String splitPkName = configuration.getString(Key.SPLIT_PK);
        String column = configuration.getString(Key.COLUMN);
        String table = configuration.getString(Key.TABLE);
        String where = configuration.getString(Key.WHERE, null);
        boolean hasWhere = StringUtils.isNotBlank(where);
        
        if (DATABASE_TYPE == DataBaseType.Oracle) {
            rangeList = genSplitSqlForOracle(splitPkName, table, where,
                    configuration, adviceNum);
        } else {
            Pair<Object, Object> minMaxPK = getPkRange(configuration);
            if (null == minMaxPK) {
                throw DataXException.asDataXException(DBUtilErrorCode.ILLEGAL_SPLIT_PK,
                        "Shard table by shard primary key failed. DataX supports only one shard primary key and the type is integer or string. Try using another shard primary key or contact your DBA.);
            }

            configuration.set(Key.QUERY_SQL, buildQuerySql(column, table, where));
            if (null == minMaxPK.getLeft() || null == minMaxPK.getRight()) {
                // Start /end is Null
                pluginParams.add(configuration);
                return pluginParams;
            }

            boolean isStringType = Constant.PK_TYPE_STRING.equals(configuration
                    .getString(Constant.PK_TYPE));
            boolean isLongType = Constant.PK_TYPE_LONG.equals(configuration
                    .getString(Constant.PK_TYPE));

            // Split the primary key range into sub-ranges to find the boundary values
            if (isStringType) {
                rangeList = RdbmsRangeSplitWrap.splitAndWrap(
                        String.valueOf(minMaxPK.getLeft()),
                        String.valueOf(minMaxPK.getRight()), adviceNum,
                        splitPkName, "'", DATABASE_TYPE);
            } else if (isLongType) {
                rangeList = RdbmsRangeSplitWrap.splitAndWrap(
                        new BigInteger(minMaxPK.getLeft().toString()),
                        new BigInteger(minMaxPK.getRight().toString()),
                        adviceNum, splitPkName);
            } else {
                throw DataXException.asDataXException(DBUtilErrorCode.ILLEGAL_SPLIT_PK,
                        "The splitPk type DataX that you configured is not supported. DataX supports only one shard primary key and the type is integer or string. Try using another shard primary key or contact your DBA.);
            }
        }
        String tempQuerySql;
        // Collect all the generated SQL statements in a list
        List<String> allQuerySql = new ArrayList<String>();

        if (null != rangeList && !rangeList.isEmpty()) {
            for (String range : rangeList) {
                Configuration tempConfig = configuration.clone();
                // The primary key range slice is appended to the WHERE condition to form a new SQL
                tempQuerySql = buildQuerySql(column, table, where)
                        + (hasWhere ? " and " : " where ") + range;
                allQuerySql.add(tempQuerySql);
                tempConfig.set(Key.QUERY_SQL, tempQuerySql);
                pluginParams.add(tempConfig);
            }
        } else {
            Configuration tempConfig = configuration.clone();
            tempQuerySql = buildQuerySql(column, table, where)
                    + (hasWhere ? " and " : " where ")
                    + String.format(" %s IS NOT NULL", splitPkName);
            // add to SQL collection
            allQuerySql.add(tempQuerySql);
            tempConfig.set(Key.QUERY_SQL, tempQuerySql);
            pluginParams.add(tempConfig);
        }
        Configuration tempConfig = configuration.clone();
        tempQuerySql = buildQuerySql(column, table, where)
                + (hasWhere ? " and " : " where ")
                + String.format(" %s IS NULL", splitPkName);
        // add to SQL collection
        allQuerySql.add(tempQuerySql);

        tempConfig.set(Key.QUERY_SQL, tempQuerySql);
        pluginParams.add(tempConfig);
        
        return pluginParams;
    }
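To make the splitting concrete, suppose splitPk is id, the columns are id,name, the table is user, min(id) = 1, max(id) = 100, and the advised slice count is 2. The generated query slices would then look roughly like this (boundary values are illustrative; the exact ranges come from RdbmsRangeSplitWrap):

select id,name from user  where  (1 <= id AND id < 50)
select id,name from user  where  (50 <= id AND id <= 100)
select id,name from user  where  id IS NULL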

Conclusion:

  1. Table mode: when splitPk is not configured, the number of tasks equals the number of tables. For example, if two tables (table1 and table2) are configured, at least two tasks are started, one for table1 and one for table2.
  2. Table mode with splitPk: splitPk works together with channel. Slices per table = ceil(number of channels / number of tables). When that count is greater than 1, each table is split again; the final task count = slice count x splitFactor (default 5), plus 1 for the splitPk IS NULL slice. The configured splitPk is folded into the querySql of each Configuration: for example, with splitPk id, conditions such as id > 1 and id < 5 are appended to the querySql to achieve the split.
  3. QuerySql mode: as many task configurations are generated as there are querySql statements.
  4. The writer has only the table mode. If there is only one table, its configuration is replicated so that the number of writer tasks equals the number of reader tasks; if there are multiple tables, the number of tables must equal the number of reader tasks.
  5. Note: this sharding policy applies to mysql and other databases that support SQL; it does not represent the sharding policy of every data source.

3 JobContainer Task scheduling method: schedule

The schedule method first computes the number of channels each task group runs and the total number of tasks, and caps the required channel count at the task count:

        // The number of channels each task group runs (default 5)
        int channelsPerTaskGroup = this.configuration.getInt(
                CoreConstant.DATAX_CORE_CONTAINER_TASKGROUP_CHANNEL, 5);
        // The total number of tasks
        int taskNumber = this.configuration.getList(
                CoreConstant.DATAX_JOB_CONTENT).size();
        // The needed channel count cannot exceed the task count
        this.needChannelNumber = Math.min(this.needChannelNumber, taskNumber);
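As a worked example with illustrative numbers: if needChannelNumber is 10 but split produced only 4 tasks, the channel count is capped at min(10, 4) = 4; with channelsPerTaskGroup = 5, the subsequent assignment then needs roughly ceil(4 / 5) = 1 task group.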

Then, it determines from the configuration which tasks each taskGroup needs to run. After the counts are determined, tasks are allocated evenly to specific task groups, the scheduler is created, and the tasks are executed.

        // Assign specific tasks to specific taskgroups equally.
        List<Configuration> taskGroupConfigs = JobAssignUtil.assignFairly(this.configuration,
                this.needChannelNumber, channelsPerTaskGroup);

        LOG.info("Scheduler starts [{}] taskGroups.", taskGroupConfigs.size());

        ExecuteMode executeMode = null;
        AbstractScheduler scheduler;
        try {
            // Create a standalone scheduler
        	executeMode = ExecuteMode.STANDALONE;
            scheduler = initStandaloneScheduler(this.configuration);

            // Set executeMode
            for (Configuration taskGroupConfig : taskGroupConfigs) {
                taskGroupConfig.set(CoreConstant.DATAX_CORE_CONTAINER_JOB_MODE, executeMode.getValue());
            }
            
            // ... other code omitted
            
            // Start the task
            scheduler.schedule(taskGroupConfigs);
        } catch (Exception e) {
            // ... other code omitted
        }

The scheduler.schedule method then, after setting some parameters and performing sanity checks, calls the startAllTaskGroup method of its AbstractScheduler parent class to start all task groups.

    public void startAllTaskGroup(List<Configuration> configurations) {
        // Start a thread pool with the number of taskgroups
        this.taskGroupContainerExecutorService = Executors
                .newFixedThreadPool(configurations.size());

        for (Configuration taskGroupConfiguration : configurations) {
            // Create a TaskGroupContainerRunner (a Runnable wrapping a TaskGroupContainer)
            TaskGroupContainerRunner taskGroupContainerRunner = newTaskGroupContainerRunner(taskGroupConfiguration);
            // Enable the taskgroup to run
            this.taskGroupContainerExecutorService.execute(taskGroupContainerRunner);
        }

        this.taskGroupContainerExecutorService.shutdown();
    }
Copy the code

When each thread runs, it launches its TaskGroupContainer to run all the tasks in that task group:

	@Override
	public void run() {
		try {
		    // Set the thread name
            Thread.currentThread().setName(
                    String.format("taskGroup-%d", this.taskGroupContainer.getTaskGroupId()));
            // Start the TaskGroupContainer
            this.taskGroupContainer.start();
			this.state = State.SUCCEEDED;
		} catch (Throwable e) {
			this.state = State.FAILED;
			throw DataXException.asDataXException(
					FrameworkErrorCode.RUNTIME_ERROR, e);
		}
	}

4 TaskGroupContainer

Then the TaskGroupContainer starts. Its start method consists of two parts:

  1. Initialize task execution state, including the taskId-to-Configuration map, the queue of tasks waiting to run, the taskFailedExecutorMap of failed tasks, and the runTasks list of running tasks
  2. Enter a loop that judges the execution state of every task:
  • Check whether any task has failed. If so, add it to taskFailedExecutorMap; if the task supports rerun and failover, put it back into the execution queue. Tasks that did not fail are marked successful and removed from the status-polling map
  • If a failed (or killed) task is found, report the state to the container and throw an exception
  • Check the length of the current execution queue; if there are free channels, build TaskExecutors, add them to the running list, and remove them from the waiting queue
  • Check the execution queue and the state of all tasks; if every task executed successfully, report the taskGroup state and exit the loop
  • Check whether the current time exceeds the report interval; if so, report the current state upward immediately
  • After all tasks succeed, report the final task state upward.

See the source code notes below.

public class TaskGroupContainer extends AbstractContainer {
    private static final Logger LOG = LoggerFactory
            .getLogger(TaskGroupContainer.class);
    // The current jobId of the taskGroup
    private long jobId;
    // The current taskGroupId
    private int taskGroupId;
    // The channel class to use
    private String channelClazz;
    // Class used by the Task collector
    private String taskCollectorClass;

    private TaskMonitor taskMonitor = TaskMonitor.getInstance();

    public TaskGroupContainer(Configuration configuration) {
        super(configuration);
        initCommunicator(configuration);    // Initializes communicator
        this.jobId = this.configuration.getLong(
                CoreConstant.DATAX_CORE_CONTAINER_JOB_ID);
        // core.container.taskGroup.id: the task group id
        this.taskGroupId = this.configuration.getInt(
                CoreConstant.DATAX_CORE_CONTAINER_TASKGROUP_ID);
        // Channel implementation class: core.transport.channel.class
        this.channelClazz = this.configuration.getString(
                CoreConstant.DATAX_CORE_TRANSPORT_CHANNEL_CLASS);
        // Task collector class: core.statistics.collector.plugin.taskClass
        this.taskCollectorClass = this.configuration.getString(
                CoreConstant.DATAX_CORE_STATISTICS_COLLECTOR_PLUGIN_TASKCLASS);
    }
    // ...
    @Override
    public void start() {
        try {
            /**
             * Status check interval; kept short so tasks can be dispatched to their channels promptly.
             * core.container.taskGroup.sleepInterval
             */
            int sleepIntervalInMillSec = this.configuration.getInt(
                    CoreConstant.DATAX_CORE_CONTAINER_TASKGROUP_SLEEPINTERVAL, 100);
            /**
             * Status report interval; kept longer to avoid excessive reporting.
             * core.container.taskGroup.reportInterval
             */
            long reportIntervalInMillSec = this.configuration.getLong(
                    CoreConstant.DATAX_CORE_CONTAINER_TASKGROUP_REPORTINTERVAL,
                    10000);
            /**
             * Performance statistics are reported once every 2 minutes
             */
            //core.container.taskGroup.channel
            // Get the number of channels
            int channelNumber = this.configuration.getInt(
                    CoreConstant.DATAX_CORE_CONTAINER_TASKGROUP_CHANNEL);
            // Maximum retry count: core.container.task.failOver.maxRetryTimes, default 1
            int taskMaxRetryTimes = this.configuration.getInt(
                    CoreConstant.DATAX_CORE_CONTAINER_TASK_FAILOVER_MAXRETRYTIMES, 1);
            // Task retry interval: core.container.task.failOver.retryIntervalInMsec
            long taskRetryIntervalInMsec = this.configuration.getLong(
                    CoreConstant.DATAX_CORE_CONTAINER_TASK_FAILOVER_RETRYINTERVALINMSEC, 10000);
            //core.container.task.failOver.maxWaitInMsec
            long taskMaxWaitInMsec = this.configuration.getLong(CoreConstant.DATAX_CORE_CONTAINER_TASK_FAILOVER_MAXWAITINMSEC, 60000);
            // Get all task configurations of the current task group
            List<Configuration> taskConfigs = this.configuration
                    .getListConfiguration(CoreConstant.DATAX_JOB_CONTENT);
            int taskCountInThisTaskGroup = taskConfigs.size();
            LOG.info(String.format(
                    "taskGroupId=[%d] start [%d] channels for [%d] tasks.".this.taskGroupId, channelNumber, taskCountInThisTaskGroup));
            // Task group registers communicator
            this.containerCommunicator.registerCommunication(taskConfigs);
            //taskId and task configuration
            Map<Integer, Configuration> taskConfigMap = buildTaskConfigMap(taskConfigs);
            List<Configuration> taskQueue = buildRemainTasks(taskConfigs); // List of tasks to run
            Map<Integer, TaskExecutor> taskFailedExecutorMap = new HashMap<Integer, TaskExecutor>();            //taskId and the last failed instance
            List<TaskExecutor> runTasks = new ArrayList<TaskExecutor>(channelNumber); // The task is running
            Map<Integer, Long> taskStartTimeMap = new HashMap<Integer, Long>(); // Task start time
            long lastReportTimeStamp = 0;
            Communication lastTaskGroupContainerCommunication = new Communication();
           // Start the loop
            while (true) {
               //1. Check the task status
               boolean failedOrKilled = false;
               Map<Integer, Communication> communicationMap = containerCommunicator.getCommunicationMap();  // Task id corresponds to communicator, which is used to collect task operation information
               for(Map.Entry<Integer, Communication> entry : communicationMap.entrySet()){
                  Integer taskId = entry.getKey();
                  Communication taskCommunication = entry.getValue();
                    if (!taskCommunication.isFinished()) {
                        continue;   // The current task is not finished
                    }
                    // The finished task is removed from the running task collection
                    TaskExecutor taskExecutor = removeTask(runTasks, taskId);
                    // It is removed from runTasks, so it is removed from monitor
                    taskMonitor.removeTask(taskId);
                    // If it failed, check whether the task supports failover and the retry count has not exceeded the limit
                  if(taskCommunication.getState() == State.FAILED){
                        taskFailedExecutorMap.put(taskId, taskExecutor);
                     if(taskExecutor.supportFailOver() && taskExecutor.getAttemptCount() < taskMaxRetryTimes){
                            taskExecutor.shutdown(); // Close the old executor
                            containerCommunicator.resetCommunication(taskId); // Reset the state of the task
                        Configuration taskConfig = taskConfigMap.get(taskId);
                        taskQueue.add(taskConfig); // Rejoin the task list
                     } else {
                         failedOrKilled = true;
                         break;
                     }
                  } else if (taskCommunication.getState() == State.KILLED) {
                     failedOrKilled = true;
                     break;
                  } else if (taskCommunication.getState() == State.SUCCEEDED) {
                        Long taskStartTime = taskStartTimeMap.get(taskId);
                        if (taskStartTime != null) {
                            Long usedTime = System.currentTimeMillis() - taskStartTime;
                            LOG.info("taskGroup[{}] taskId[{}] is successed, used[{}]ms",
                                    this.taskGroupId, taskId, usedTime);
                            // usedTime*1000*1000 converts ms to the ns unit PerfRecord uses; this mainly registers the record so the longest task can be printed, hence the dedicated static method
                            PerfRecord.addPerfRecord(taskGroupId, taskId, PerfRecord.PHASE.TASK_TOTAL,
                                    taskStartTime, usedTime * 1000L * 1000L);
                            taskStartTimeMap.remove(taskId);
                            taskConfigMap.remove(taskId);
                        }
                    }
                }

                // 2. If any task in the taskGroup failed or was killed, report and throw
                if (failedOrKilled) {
                    lastTaskGroupContainerCommunication = reportTaskGroupCommunication(
                            lastTaskGroupContainerCommunication, taskCountInThisTaskGroup);
                    throw DataXException.asDataXException(
                            FrameworkErrorCode.PLUGIN_RUNTIME_ERROR, lastTaskGroupContainerCommunication.getThrowable());
                }
                
                // 3. Tasks are waiting to run and the number of running tasks is below the channel limit
                Iterator<Configuration> iterator = taskQueue.iterator();
                while(iterator.hasNext() && runTasks.size() < channelNumber){
                    Configuration taskConfig = iterator.next();
                    Integer taskId = taskConfig.getInt(CoreConstant.TASK_ID);
                    int attemptCount = 1;
                    TaskExecutor lastExecutor = taskFailedExecutorMap.get(taskId);
                    if (lastExecutor != null) {
                        attemptCount = lastExecutor.getAttemptCount() + 1;
                        long now = System.currentTimeMillis();
                        long failedTime = lastExecutor.getTimeStamp();
                        if(now - failedTime < taskRetryIntervalInMsec){  // The waiting time is not reached
                            continue;
                        }
                        if (!lastExecutor.isShutdown()) { // The task that failed last time has still not finished
                            if(now - failedTime > taskMaxWaitInMsec){
                                markCommunicationFailed(taskId);
                                reportTaskGroupCommunication(lastTaskGroupContainerCommunication, taskCountInThisTaskGroup);
                                throw DataXException.asDataXException(CommonErrorCode.WAIT_TIME_EXCEED, "Task Failover wait timeout");
                            }else{
                                lastExecutor.shutdown(); // Try closing again
                                continue;
                            }
                        } else {
                            LOG.info("taskGroup[{}] taskId[{}] attemptCount[{}] has already shutdown",
                                    this.taskGroupId, taskId, lastExecutor.getAttemptCount());
                        }
                    }
                    Configuration taskConfigForRun = taskMaxRetryTimes > 1 ? taskConfig.clone() : taskConfig;
                   TaskExecutor taskExecutor = new TaskExecutor(taskConfigForRun, attemptCount);
                    taskStartTimeMap.put(taskId, System.currentTimeMillis());
                   taskExecutor.doStart();
                    iterator.remove();
                    runTasks.add(taskExecutor); // Continue to add to the collection of running tasks
                    // Above, add task to runTasks list, so register with monitor.
                    taskMonitor.registerTask(taskId, this.containerCommunicator.getCommunication(taskId));
                  // The task id is removed from the failed map
                    taskFailedExecutorMap.remove(taskId);
                    LOG.info("taskGroup[{}] taskId[{}] attemptCount[{}] is started".this.taskGroupId, taskId, attemptCount);
                }
                //4. The task list is empty, executor is finished, and the collection status is SUCCESS --> Success
                if (taskQueue.isEmpty() && isAllTaskDone(runTasks) && containerCommunicator.collectState() == State.SUCCEEDED) {
                   // In case of success, report again. Otherwise, the information collected will be inaccurate if the task ends very quickly
                    lastTaskGroupContainerCommunication = reportTaskGroupCommunication(
                            lastTaskGroupContainerCommunication, taskCountInThisTaskGroup);
                    LOG.info("taskGroup[{}] completed it's tasks.".this.taskGroupId);
                    break;
                }
                // 5. If the current time has exceeded the interval of the reporting time, we need to report it immediately
                long now = System.currentTimeMillis();
                if (now - lastReportTimeStamp > reportIntervalInMillSec) {
                    lastTaskGroupContainerCommunication = reportTaskGroupCommunication(
                            lastTaskGroupContainerCommunication, taskCountInThisTaskGroup);
                    lastReportTimeStamp = now;
                    //taskMonitor checks running tasks per reportIntervalInMillSec
                    for (TaskExecutor taskExecutor : runTasks) {
                        taskMonitor.report(taskExecutor.getTaskId(),
                                this.containerCommunicator.getCommunication(taskExecutor.getTaskId()));
                    }

                }
                Thread.sleep(sleepIntervalInMillSec);
            }

            // 6. One final report
            reportTaskGroupCommunication(lastTaskGroupContainerCommunication, taskCountInThisTaskGroup);
        } catch (Throwable e) {
            Communication nowTaskGroupContainerCommunication = this.containerCommunicator.collect();
            if (nowTaskGroupContainerCommunication.getThrowable() == null) {
                nowTaskGroupContainerCommunication.setThrowable(e);
            }
            nowTaskGroupContainerCommunication.setState(State.FAILED);
            this.containerCommunicator.report(nowTaskGroupContainerCommunication);
            throw DataXException.asDataXException(
                    FrameworkErrorCode.RUNTIME_ERROR, e);
        }finally {
            if (!PerfTrace.getInstance().isJob()) {
                // Finally print the average CPU consumption and GC statistics
                VMInfo vmInfo = VMInfo.getVmInfo();
                if (vmInfo != null) {
                    vmInfo.getDelta(false);
                    LOG.info(vmInfo.totalString());
                }
                LOG.info(PerfTrace.getInstance().summarizeNoException());
            }
        }
    }
}

5 TaskExecutor

TaskExecutor is an inner class of TaskGroupContainer that manages the execution of a single task.

  1. Initialize state, such as the reader and writer threads, the channel that buffers the data being read, and the transformer parameters.
  2. After initialization, the reader and writer threads are started, and a single task (one slice of the data synchronization job) begins running.
  3. The ReaderRunner uses JDBC to read rows from the database, wraps each row in a Record, and pushes it into the Channel. When all data has been read, it writes a TerminateRecord marker at the end.
  4. The WriterRunner continuously pulls Records from the Channel and writes them to the target database, until the TerminateRecord marker indicates that all data has been consumed.
class TaskExecutor {
    private Configuration taskConfig;   // Current task configuration item
    private Channel channel;    // The channel that buffers the read data
    private Thread readerThread;    // Reader thread
    private Thread writerThread;    // Writer thread
    private ReaderRunner readerRunner;
    private WriterRunner writerRunner;
    /**
     * This taskCommunication is used in several places:
     * 1. channel
     * 2. readerRunner and writerRunner
     * 3. the taskPluginCollector of the reader and writer
     */
    private Communication taskCommunication;
    public TaskExecutor(Configuration taskConf, int attemptCount) {
        // Get the taskExecutor configuration
        this.taskConfig = taskConf;
        // ...
        /**
         * Get the taskExecutor's Communication by taskId; it is passed to readerRunner and
         * writerRunner, as well as to the channel, for statistics
         */
        this.taskCommunication = containerCommunicator
                .getCommunication(taskId);
        // Instantiate the pipe that stores read data
        this.channel = ClassUtil.instantiate(channelClazz,
                Channel.class, configuration);
        this.channel.setCommunication(this.taskCommunication);
        /**
         * Get the transformer parameters
         */
        List<TransformerExecution> transformerInfoExecs = TransformerUtil.buildTransformerInfo(taskConfig);
        /**
         * Generate the writerThread
         */
        writerRunner = (WriterRunner) generateRunner(PluginType.WRITER);
        this.writerThread = new Thread(writerRunner,
                String.format("%d-%d-%d-writer",
                        jobId, taskGroupId, this.taskId));
        // Setting the thread's contextClassLoader lets the plug-in be loaded by a class loader different from the main program's
        this.writerThread.setContextClassLoader(LoadUtil.getJarLoader(
                PluginType.WRITER, this.taskConfig.getString(
                        CoreConstant.JOB_WRITER_NAME)));
        /**
         * Generate the readerThread
         */
        readerRunner = (ReaderRunner) generateRunner(PluginType.READER,transformerInfoExecs);
        this.readerThread = new Thread(readerRunner,
                String.format("%d-%d-%d-reader",
                        jobId, taskGroupId, this.taskId));
        // Likewise, set the reader thread's contextClassLoader to the reader plug-in's own class loader
        this.readerThread.setContextClassLoader(LoadUtil.getJarLoader(
                PluginType.READER, this.taskConfig.getString(
                        CoreConstant.JOB_READER_NAME)));
    }
    public void doStart() {
        this.writerThread.start();
        // The reader has not started yet, so the writer cannot have finished normally
        if (!this.writerThread.isAlive() || this.taskCommunication.getState() == State.FAILED) {
            throw DataXException.asDataXException(
                    FrameworkErrorCode.RUNTIME_ERROR,
                    this.taskCommunication.getThrowable());
        }
        this.readerThread.start();
        // Here reader may end soon
        if (!this.readerThread.isAlive() && this.taskCommunication.getState() == State.FAILED) {
            // There is a possibility that the Reader will start and hang, for which an exception must be thrown immediately
            throw DataXException.asDataXException(
                    FrameworkErrorCode.RUNTIME_ERROR,
                    this.taskCommunication.getThrowable());
        }
    }
}

ReaderRunner (similar to WriterRunner)

  1. ReaderRunner is created by TaskExecutor's generateRunner.
  2. ReaderRunner drives the Task inner class of the corresponding plug-in, calling its init, prepare, and startRead methods to start reading data from the database.
public void run() {
        assert null != this.recordSender;

        Reader.Task taskReader = (Reader.Task) this.getPlugin();

        // Count waitWriterTime and end in finally.
        PerfRecord channelWaitWrite = new PerfRecord(getTaskGroupId(), getTaskId(), PerfRecord.PHASE.WAIT_WRITE_TIME);
        try {
            channelWaitWrite.start();

            LOG.debug("task reader starts to do init ...");
            PerfRecord initPerfRecord = new PerfRecord(getTaskGroupId(), getTaskId(), PerfRecord.PHASE.READ_TASK_INIT);
            initPerfRecord.start();
            taskReader.init();
            initPerfRecord.end();

            LOG.debug("task reader starts to do prepare ...");
            PerfRecord preparePerfRecord = new PerfRecord(getTaskGroupId(), getTaskId(), PerfRecord.PHASE.READ_TASK_PREPARE);
            preparePerfRecord.start();
            taskReader.prepare();
            preparePerfRecord.end();

            LOG.debug("task reader starts to read ...");
            PerfRecord dataPerfRecord = new PerfRecord(getTaskGroupId(), getTaskId(), PerfRecord.PHASE.READ_TASK_DATA);
            dataPerfRecord.start();
            taskReader.startRead(recordSender);
            recordSender.terminate();

            dataPerfRecord.addCount(CommunicationTool.getTotalReadRecords(super.getRunnerCommunication()));
            dataPerfRecord.addSize(CommunicationTool.getTotalReadBytes(super.getRunnerCommunication()));
            dataPerfRecord.end();

            LOG.debug("task reader starts to do post ...");
            PerfRecord postPerfRecord = new PerfRecord(getTaskGroupId(), getTaskId(), PerfRecord.PHASE.READ_TASK_POST);
            postPerfRecord.start();
            taskReader.post();
            postPerfRecord.end();
            // automatic flush
            // super.markSuccess(); This cannot be marked as a success, as this is marked by writerRunner.
        } catch (Throwable e) {
            LOG.error("Reader runner Received Exceptions:", e);
            super.markFail(e);
        } finally {
            LOG.debug("task reader starts to do destroy ...");
            PerfRecord desPerfRecord = new PerfRecord(getTaskGroupId(), getTaskId(), PerfRecord.PHASE.READ_TASK_DESTROY);
            desPerfRecord.start();
            super.destroy();
            desPerfRecord.end();

            channelWaitWrite.end(super.getRunnerCommunication().getLongCounter(CommunicationTool.WAIT_WRITER_TIME));

            long transformerUsedTime = super.getRunnerCommunication().getLongCounter(CommunicationTool.TRANSFORMER_USED_TIME);
            if (transformerUsedTime > 0) {
                PerfRecord transformerRecord = new PerfRecord(getTaskGroupId(), getTaskId(), PerfRecord.PHASE.TRANSFORMER_TIME);
                transformerRecord.start();
                transformerRecord.end(transformerUsedTime);
            }
        }
    }

Take mysql as an example: MysqlReader reads data via JDBC and, through recordSender, forwards each row as a Record into the Channel for the corresponding writer. The code is as follows:

public void startRead(Configuration readerSliceConfig, RecordSender recordSender, TaskPluginCollector taskPluginCollector, int fetchSize) {
            String querySql = readerSliceConfig.getString("querySql");
            String table = readerSliceConfig.getString("table");
            PerfTrace.getInstance().addTaskDetails(this.taskId, table + "," + this.basicMsg);
            LOG.info("Begin to read record by Sql: [{}\n] {}.", querySql, this.basicMsg);
            PerfRecord queryPerfRecord = new PerfRecord(this.taskGroupId, this.taskId, PHASE.SQL_QUERY);
            queryPerfRecord.start();
            Connection conn = DBUtil.getConnection(this.dataBaseType, this.jdbcUrl, this.username, this.password);
            DBUtil.dealWithSessionConfig(conn, readerSliceConfig, this.dataBaseType, this.basicMsg);
            ResultSet rs = null;

            try {
                rs = DBUtil.query(conn, querySql, fetchSize);
                queryPerfRecord.end();
                ResultSetMetaData metaData = rs.getMetaData();
                int columnNumber = metaData.getColumnCount();
                PerfRecord allResultPerfRecord = new PerfRecord(this.taskGroupId, this.taskId, PHASE.RESULT_NEXT_ALL);
                allResultPerfRecord.start();
                long rsNextUsedTime = 0L;

                for(long lastTime = System.nanoTime(); rs.next(); lastTime = System.nanoTime()) {
                    rsNextUsedTime += System.nanoTime() - lastTime;
                    // Transfer the record to channel via recordSender
                    this.transportOneRecord(recordSender, rs, metaData, columnNumber, this.mandatoryEncoding, taskPluginCollector);
                }

                allResultPerfRecord.end(rsNextUsedTime);
                LOG.info("Finished read record by Sql: [{}\n] {}.", querySql, this.basicMsg);
            } catch (Exception var20) {
                throw RdbmsException.asQueryException(this.dataBaseType, var20, querySql, table, this.username);
            } finally {
                DBUtil.closeDBResources((Statement) null, conn);
            }
        }

Writer retrieves the Record from a Channel and stores it in the target database.
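The consumption loop on the writer side follows a simple pattern. A minimal sketch (not the exact DataX source; connection, batchSize, and doBatchInsert are hypothetical stand-ins for what CommonRdbmsWriter actually does):

public void startWrite(RecordReceiver recordReceiver) {
    List<Record> writeBuffer = new ArrayList<Record>(batchSize);
    Record record;
    // getFromReader() returns null once the TerminateRecord has been consumed
    while ((record = recordReceiver.getFromReader()) != null) {
        writeBuffer.add(record);
        if (writeBuffer.size() >= batchSize) {
            doBatchInsert(connection, writeBuffer); // hypothetical batch-insert helper
            writeBuffer.clear();
        }
    }
    if (!writeBuffer.isEmpty()) {
        doBatchInsert(connection, writeBuffer); // flush the remainder
    }
}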

6 Data structure during data transmission

This will be discussed in a separate article.