The previous article introduced how executors register with XXL-JOB and how the scheduling center manages that registration information. This part describes how the scheduling center schedules tasks and how it calls back the task interface of the executor.

1. The scheduling center schedules tasks

1.1 Task Triggering Type

How does the scheduling center trigger a task? XXL-JOB defines several trigger sources:

  • Manual trigger: the task is triggered by hand on the management page
  • Cron trigger: the task is triggered periodically in the background by its configured cron rule; for example, 0 */1 * * * ? runs the task once every minute
  • Retry trigger: the task is retried when a scheduling attempt fails
  • Parent trigger: if a task has child tasks associated with it, the parent task triggers the child tasks. This is set up on the management page by configuring the child task IDs on the parent task
  • API trigger: the task is triggered by calling the executor's API
    Address format: {executor embedded service address}/run
    Header: XXL-JOB-ACCESS-TOKEN: {request token}
    The request data is JSON, placed in the request body:

    {
        "jobId": 1,                                 // Task ID
        "executorHandler": "demoJobHandler",        // Task identifier
        "executorParams": "demoJobHandler",         // Task parameters
        "executorBlockStrategy": "COVER_EARLY",     // Task blocking strategy; for the allowed values see com.xxl.job.core.enums.ExecutorBlockStrategyEnum
        "executorTimeout": 0,                       // Task timeout in seconds; takes effect when greater than zero
        "logId": 1,                                 // ID of this scheduling log
        "logDateTime": 1586629003729,               // Log time of this scheduling
        "glueType": "BEAN",                         // Task mode; for the allowed values see com.xxl.job.core.glue.GlueTypeEnum
        "glueSource": "xxx",                        // GLUE script code
        "glueUpdatetime": 1586629003727,            // GLUE script update time, used to decide whether the script has changed and needs to be reloaded
        "broadcastIndex": 0,                        // Sharding parameter: index of the current shard
        "broadcastTotal": 0                         // Sharding parameter: total number of shards
    }

    Response data format:

    {
        "code": 200,      // 200 means success; any other value means failure
        "msg": null       // Error message
    }
  • Misfire trigger: compensation for a task whose scheduled trigger time was missed (the misfire case in 1.3)
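
The API trigger request described above can be assembled with the JDK's own HTTP client. The following is only a minimal sketch, assuming Java 11 or later: the class name RunRequestSketch, the example address, and the token value are hypothetical, and the request is built but never sent.

```java
import java.net.URI;
import java.net.http.HttpRequest;

final class RunRequestSketch {

    /** Builds (but does not send) a POST to {executorAddress}/run. */
    static HttpRequest build(String executorAddress, String accessToken, String jsonBody) {
        // Make sure the base address ends with exactly one "/" before appending "run".
        String base = executorAddress.endsWith("/") ? executorAddress : executorAddress + "/";
        return HttpRequest.newBuilder()
                .uri(URI.create(base + "run"))
                .header("Content-Type", "application/json;charset=UTF-8")
                .header("XXL-JOB-ACCESS-TOKEN", accessToken) // token header from the description above
                .POST(HttpRequest.BodyPublishers.ofString(jsonBody))
                .build();
    }

    public static void main(String[] args) {
        // Hypothetical address and token, for illustration only.
        HttpRequest request = build("http://127.0.0.1:9999", "my-token",
                "{\"jobId\":1,\"executorHandler\":\"demoJobHandler\"}");
        System.out.println(request.method() + " " + request.uri());
    }
}
```

Sending the request would be a separate step with java.net.http.HttpClient; it is left out here so the sketch does not depend on a running executor.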

1.2 Time wheel

To study how triggering works, it is enough to follow the normal cron trigger. The other trigger sources cover exceptional or on-demand cases, so they are not covered in detail here.

Older versions of XXL-JOB implemented task triggering on top of Quartz; from v2.1 on, Quartz was replaced by a time wheel. The change had two goals: to simplify the system by removing redundant complexity, and to improve its stability.

So how does the time wheel algorithm work, and why use it instead of Quartz? With these two questions in mind, let's first look at the data structure of the time wheel:

(Figure: time wheel structure, reproduced from "XXL-JOB task trigger process")

In Java, the time wheel is a Map<Integer, List<Integer>>, where the key is a second of the minute (0 to 59) and the value is the list of IDs of the tasks to run at that second.

On each scan, a task is placed into the list of the slot that matches the second of its next trigger time. When the clock reaches that slot, the task IDs are taken out of the list and triggered. Each trigger is handed to a thread pool, which isolates the tasks from one another.

The idea behind the time wheel is to read the tasks that are about to run from MySQL ahead of time and keep them in memory, then take them out of memory and trigger them when their time arrives. This keeps scheduling from being delayed when there are many tasks.

  • Entering the wheel: a scanned task is put into the wheel in either of these cases

    1. The task has just been triggered and its next trigger time falls within the next 5 seconds. For example, if the task was triggered at 10:55:01 and its next trigger time is 10:55:02, it is added to the task list of slot 2.

    2. The trigger time of the task has not yet been reached

  • Leaving the wheel: take the current second, remove the task ID lists of the current slot and of the slot before it from the wheel, and trigger those tasks. Checking one slot back guards against a slot being skipped when processing takes too long.

    1. For example, at 10:55:05 the task lists of slots 05 and 04 are removed and triggered
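
The enter and leave rules above can be sketched in a few lines. This is a simplified illustration rather than the XXL-JOB implementation: the class TimeWheelSketch and its method names are hypothetical, and a thread-safe list is used per slot as an assumption of this sketch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;

final class TimeWheelSketch {

    // key: second of the minute (0..59); value: IDs of the jobs due at that second
    private final Map<Integer, List<Integer>> ring = new ConcurrentHashMap<>();

    /** Slot of a trigger time: its second within the minute. */
    static int slotOf(long triggerTimeMillis) {
        return (int) ((triggerTimeMillis / 1000) % 60);
    }

    /** Entering the wheel: file the job under the slot of its next trigger time. */
    void push(long triggerTimeMillis, int jobId) {
        ring.computeIfAbsent(slotOf(triggerTimeMillis), k -> new CopyOnWriteArrayList<>())
            .add(jobId);
    }

    /** Leaving the wheel: remove and return the jobs of the current slot and the slot before it. */
    List<Integer> poll(int nowSecond) {
        List<Integer> due = new ArrayList<>();
        for (int i = 0; i < 2; i++) {
            // +60 keeps the index non-negative when nowSecond is 0
            List<Integer> slot = ring.remove((nowSecond + 60 - i) % 60);
            if (slot != null) {
                due.addAll(slot);
            }
        }
        return due;
    }
}
```

Because poll removes the slot lists it returns, a job that was picked up on one tick is not returned again on the next tick, even though each tick looks one slot back.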

1.3 JobScheduleHelper

JobScheduleHelper drives scheduling with two threads, scheduleThread and ringThread, and holds the storage container of the time wheel, Map<Integer, List<Integer>> ringData. Each task stores TriggerNextTime and TriggerLastTime, which are updated each time the task is triggered.

scheduleThread, the scheduling thread, performs the following operations:

  1. Get a database connection and turn off auto-commit, so the transaction has to be committed manually
  2. Acquire a pessimistic database lock with select xxx for update, so that only one scheduling center instance is scheduling at any moment, which keeps the data consistent
  3. Read from the database the tasks whose trigger time is earlier than now + 5s. These tasks fall into three intervals:

(1) TriggerNextTime + 5s < now: the task missed its trigger time by more than 5 seconds, for example because scheduling failed, no thread was available, or tasks were executed serially. This scenario is called a misfire.

(2) TriggerNextTime < now <= TriggerNextTime + 5s: the task is overdue by no more than 5 seconds and is triggered immediately

(3) now <= TriggerNextTime < now + 5s: the task is due soon. It is not triggered immediately; its slot is computed from the next trigger time and it is put into the time wheel.

  4. Traverse the tasks in turn and handle each one according to which of the three intervals its next trigger time falls into
  5. Update TriggerNextTime for each task

/**
 * @author xuxueli 2019-05-21
 */
public class JobScheduleHelper {
    private static Logger logger = LoggerFactory.getLogger(JobScheduleHelper.class);

    private static JobScheduleHelper instance = new JobScheduleHelper();
    public static JobScheduleHelper getInstance(){
        return instance;
    }

    public static final long PRE_READ_MS = 5000;    // pre read

    // Schedule threads
    private Thread scheduleThread;
    // Time wheel thread
    private Thread ringThread;
    private volatile boolean scheduleThreadToStop = false;
    private volatile boolean ringThreadToStop = false;
    // The data structure of the time wheel
    private volatile static Map<Integer, List<Integer>> ringData = new ConcurrentHashMap<>();

    public void start(){

        // schedule thread
        scheduleThread = new Thread(new Runnable() {
            @Override
            public void run() {

                try {
                    TimeUnit.MILLISECONDS.sleep(5000 - System.currentTimeMillis()%1000 );
                } catch (InterruptedException e) {
                    if (!scheduleThreadToStop) {
                        logger.error(e.getMessage(), e);
                    }
                }
                logger.info(">>>>>>>>> init xxl-job admin scheduler success.");

                // Pre-read count = (fast pool max + slow pool max) * trigger QPS
                // pre-read count: threadpool-size * trigger-qps (each trigger cost 50ms, qps = 1000/50 = 20)
                int preReadCount = (XxlJobAdminConfig.getAdminConfig().getTriggerPoolFastMax() + XxlJobAdminConfig.getAdminConfig().getTriggerPoolSlowMax()) * 20;

                while (!scheduleThreadToStop) {
                    // Scan Job
                    long start = System.currentTimeMillis();

                    Connection conn = null;
                    Boolean connAutoCommit = null;
                    PreparedStatement preparedStatement = null;

                    boolean preReadSuc = true;
                    try {

                        // 1. Obtain the database connection
                        conn = XxlJobAdminConfig.getAdminConfig().getDataSource().getConnection();
                        connAutoCommit = conn.getAutoCommit();
                        // 2. Disable automatic commit and commit transactions manually
                        conn.setAutoCommit(false);

                        // 3. Acquire a pessimistic lock with select ... for update
                        preparedStatement = conn.prepareStatement(  "select * from xxl_job_lock where lock_name = 'schedule_lock' for update" );
                        preparedStatement.execute();

                        // 4. Start transaction
                        // tx start

                        // 5. Read from the database for preReadCount tasks that will be executed within 5 seconds
                        long nowTime = System.currentTimeMillis();
                        List<XxlJobInfo> scheduleList = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleJobQuery(nowTime + PRE_READ_MS, preReadCount);
                        if (scheduleList != null && scheduleList.size() > 0) {
                            // 6. Time wheel in wheel operation
                            for (XxlJobInfo jobInfo: scheduleList) {

                                // 6.1 If the trigger time is less than now-5s, the task missed the previous schedule and needs to be processed by misfire policy
                                // time-ring jump
                                if (nowTime > jobInfo.getTriggerNextTime() + PRE_READ_MS) {
                                    // 2.1, trigger-expire > 5s: pass && make next-trigger-time
                                    logger.warn(">>>>>>>>>>> xxl-job, schedule misfire, jobId = " + jobInfo.getId());

                                    // 1, misfire match
                                    MisfireStrategyEnum misfireStrategyEnum = MisfireStrategyEnum.match(jobInfo.getMisfireStrategy(), MisfireStrategyEnum.DO_NOTHING);
                                    if (MisfireStrategyEnum.FIRE_ONCE_NOW == misfireStrategyEnum) {
                                        // FIRE_ONCE_NOW: trigger
                                        JobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.MISFIRE, -1, null, null, null);
                                        logger.debug(">>>>>>>>>>> xxl-job, schedule push trigger : jobId = " + jobInfo.getId() );
                                    }

                                    // 2, fresh next
                                    refreshNextValidTime(jobInfo, new Date());

                                // 6.2 If the task is overdue by less than 5s, trigger it immediately. If its next trigger time again falls within 5s, push it into the time wheel slot for that time
                                } else if (nowTime > jobInfo.getTriggerNextTime()) {
                                    // 2.2, trigger-expire < 5s: direct-trigger && make next-trigger-time

                                    // 1, trigger
                                    JobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.CRON, -1, null, null, null);
                                    logger.debug(">>>>>>>>>>> xxl-job, schedule push trigger : jobId = " + jobInfo.getId() );

                                    // 2, fresh next
                                    refreshNextValidTime(jobInfo, new Date());

                                    // next-trigger-time in 5s, pre-read again
                                    if (jobInfo.getTriggerStatus()==1 && nowTime + PRE_READ_MS > jobInfo.getTriggerNextTime()) {

                                        // make ring second
                                        int ringSecond = (int)((jobInfo.getTriggerNextTime()/1000) %60);

                                        // 2, push time ring
                                        pushTimeRing(ringSecond, jobInfo.getId());

                                        // 3
                                        refreshNextValidTime(jobInfo, new Date(jobInfo.getTriggerNextTime()));

                                    }
                                // 6.3 If the trigger time is not reached, place the task in the time wheel
                                } else {
                                    // 2.3, trigger-pre-read: time-ring trigger && make next-trigger-time

                                    // make ring second
                                    int ringSecond = (int)((jobInfo.getTriggerNextTime()/1000) %60);

                                    // 2, push time ring
                                    pushTimeRing(ringSecond, jobInfo.getId());

                                    // 3
                                    refreshNextValidTime(jobInfo, new Date(jobInfo.getTriggerNextTime()));

                                }
                            }

                            // 7. Update the next trigger time for each task
                            // update trigger info
                            for (XxlJobInfo jobInfo: scheduleList) {
                                XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleUpdate(jobInfo);
                            }

                        } else {
                            preReadSuc = false;
                        }

                        // tx stop


                    } catch (Exception e) {
                        if (!scheduleThreadToStop) {
                            logger.error(">>>>>>>>>>> xxl-job, JobScheduleHelper#scheduleThread error:{}", e);
                        }
                    } finally {

                        // commit
                        if (conn != null) {
                            try {
                                conn.commit();
                            } catch (SQLException e) {
                                if (!scheduleThreadToStop) {
                                    logger.error(e.getMessage(), e);
                                }
                            }
                            try {
                                conn.setAutoCommit(connAutoCommit);
                            } catch (SQLException e) {
                                if (!scheduleThreadToStop) {
                                    logger.error(e.getMessage(), e);
                                }
                            }
                            try {
                                conn.close();
                            } catch (SQLException e) {
                                if (!scheduleThreadToStop) {
                                    logger.error(e.getMessage(), e);
                                }
                            }
                        }

                        // close PreparedStatement
                        if (null != preparedStatement) {
                            try {
                                preparedStatement.close();
                            } catch (SQLException e) {
                                if (!scheduleThreadToStop) {
                                    logger.error(e.getMessage(), e);
                                }
                            }
                        }
                    }
                    long cost = System.currentTimeMillis()-start;


                    // Wait seconds, align second
                    if (cost < 1000) {  // scan-overtime, not wait
                        try {
                            // pre-read period: success > scan each second; fail > skip this period;
                            TimeUnit.MILLISECONDS.sleep((preReadSuc?1000:PRE_READ_MS) - System.currentTimeMillis()%1000);
                        } catch (InterruptedException e) {
                            if (!scheduleThreadToStop) {
                                logger.error(e.getMessage(), e);
                            }
                        }
                    }

                }

                logger.info(">>>>>>>>>>> xxl-job, JobScheduleHelper#scheduleThread stop");
            }
        });
        scheduleThread.setDaemon(true);
        scheduleThread.setName("xxl-job, admin JobScheduleHelper#scheduleThread");
        scheduleThread.start();


        // ring thread
        ringThread = new Thread(new Runnable() {
            @Override
            public void run() {

                while (!ringThreadToStop) {

                    // align second
                    try {
                        TimeUnit.MILLISECONDS.sleep(1000 - System.currentTimeMillis() % 1000);
                    } catch (InterruptedException e) {
                        if (!ringThreadToStop) {
                            logger.error(e.getMessage(), e);
                        }
                    }

                    try {
                        // second data
                        List<Integer> ringItemData = new ArrayList<>();
                        int nowSecond = Calendar.getInstance().get(Calendar.SECOND);   // To avoid too long processing time, step over the scale, check a scale forward;
                        for (int i = 0; i < 2; i++) {
                            List<Integer> tmpData = ringData.remove( (nowSecond+60-i)%60 );
                            if (tmpData != null) {
                                ringItemData.addAll(tmpData);
                            }
                        }

                        // ring trigger
                        logger.debug(">>>>>>>>>>> xxl-job, time-ring beat : " + nowSecond + "=" + Arrays.asList(ringItemData) );
                        if (ringItemData.size() > 0) {
                            // do trigger
                            for (int jobId: ringItemData) {
                                // do trigger
                                JobTriggerPoolHelper.trigger(jobId, TriggerTypeEnum.CRON, -1, null, null, null);
                            }
                            // clear
                            ringItemData.clear();
                        }
                    } catch (Exception e) {
                        if (!ringThreadToStop) {
                            logger.error(">>>>>>>>>>> xxl-job, JobScheduleHelper#ringThread error:{}", e);
                        }
                    }
                }
                logger.info(">>>>>>>>>>> xxl-job, JobScheduleHelper#ringThread stop");
            }
        });
        ringThread.setDaemon(true);
        ringThread.setName("xxl-job, admin JobScheduleHelper#ringThread");
        ringThread.start();
    }

    private void refreshNextValidTime(XxlJobInfo jobInfo, Date fromTime) throws Exception {
        Date nextValidTime = generateNextValidTime(jobInfo, fromTime);
        if (nextValidTime != null) {
            jobInfo.setTriggerLastTime(jobInfo.getTriggerNextTime());
            jobInfo.setTriggerNextTime(nextValidTime.getTime());
        } else {
            jobInfo.setTriggerStatus(0);
            jobInfo.setTriggerLastTime(0);
            jobInfo.setTriggerNextTime(0);
            logger.warn(">>>>>>>>>>> xxl-job, refreshNextValidTime fail for job: jobId={}, scheduleType={}, scheduleConf={}", jobInfo.getId(), jobInfo.getScheduleType(), jobInfo.getScheduleConf());
        }
    }

    private void pushTimeRing(int ringSecond, int jobId){
        // push async ring
        List<Integer> ringItemData = ringData.get(ringSecond);
        if (ringItemData == null) {
            ringItemData = new ArrayList<Integer>();
            ringData.put(ringSecond, ringItemData);
        }
        ringItemData.add(jobId);

        logger.debug(">>>>>>>>>>> xxl-job, schedule push time-ring : " + ringSecond + "=" + Arrays.asList(ringItemData) );
    }

    public void toStop(){

        // 1
        scheduleThreadToStop = true;
        try {
            TimeUnit.SECONDS.sleep(1);  // wait
        } catch (InterruptedException e) {
            logger.error(e.getMessage(), e);
        }
        if (scheduleThread.getState() != Thread.State.TERMINATED){
            // interrupt and wait
            scheduleThread.interrupt();
            try {
                scheduleThread.join();
            } catch (InterruptedException e) {
                logger.error(e.getMessage(), e);
            }
        }

        // if has ring data
        boolean hasRingData = false;
        if (!ringData.isEmpty()) {
            for (int second : ringData.keySet()) {
                List<Integer> tmpData = ringData.get(second);
                if (tmpData != null && tmpData.size() > 0) {
                    hasRingData = true;
                    break;
                }
            }
        }
        if (hasRingData) {
            try {
                TimeUnit.SECONDS.sleep(8);
            } catch (InterruptedException e) {
                logger.error(e.getMessage(), e);
            }
        }

        // stop ring (wait job-in-memory stop)
        ringThreadToStop = true;
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            logger.error(e.getMessage(), e);
        }
        if (ringThread.getState() != Thread.State.TERMINATED){
            // interrupt and wait
            ringThread.interrupt();
            try {
                ringThread.join();
            } catch (InterruptedException e) {
                logger.error(e.getMessage(), e);
            }
        }

        logger.info(">>>>>>>>>>> xxl-job, JobScheduleHelper stop");
    }


    // ---------------------- tools ----------------------
    public static Date generateNextValidTime(XxlJobInfo jobInfo, Date fromTime) throws Exception {
        ScheduleTypeEnum scheduleTypeEnum = ScheduleTypeEnum.match(jobInfo.getScheduleType(), null);
        if (ScheduleTypeEnum.CRON == scheduleTypeEnum) {
            Date nextValidTime = new CronExpression(jobInfo.getScheduleConf()).getNextValidTimeAfter(fromTime);
            return nextValidTime;
        } else if (ScheduleTypeEnum.FIX_RATE == scheduleTypeEnum /*|| ScheduleTypeEnum.FIX_DELAY == scheduleTypeEnum*/) {
            return new Date(fromTime.getTime() + Integer.valueOf(jobInfo.getScheduleConf())*1000 );
        }
        return null;
    }

}
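
The three-way branch inside scheduleThread is easier to see when it is pulled out into a pure function. The sketch below mirrors the comparisons in the listing above; the class TriggerWindowSketch and the Action enum are hypothetical names introduced only for illustration.

```java
final class TriggerWindowSketch {

    enum Action { MISFIRE, TRIGGER_NOW, PUSH_TO_RING }

    static final long PRE_READ_MS = 5000; // same pre-read window as in the listing

    /** Decides how a pre-read job is handled, given the current time and its next trigger time. */
    static Action classify(long nowMs, long triggerNextTimeMs) {
        if (nowMs > triggerNextTimeMs + PRE_READ_MS) {
            return Action.MISFIRE;        // overdue by more than 5s
        } else if (nowMs > triggerNextTimeMs) {
            return Action.TRIGGER_NOW;    // overdue by at most 5s
        } else {
            return Action.PUSH_TO_RING;   // not yet due: wait in the time wheel
        }
    }

    public static void main(String[] args) {
        long now = 100_000L;
        System.out.println(classify(now, 94_000L));
        System.out.println(classify(now, 97_000L));
        System.out.println(classify(now, 102_000L));
    }
}
```

Note the boundaries: a job that is overdue by exactly 5 seconds is still triggered immediately, and a job whose trigger time equals the current time goes into the time wheel.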

1.4 Comparison of time wheel and Quartz

Quartz, a leading open source job scheduler, is often the first choice for job scheduling. In a clustered environment, however, Quartz manages tasks through its API, which brings the following problems:

Problem 1: Operating tasks by calling an API is not user-friendly.

Problem 2: The business QuartzJobBean has to be persisted into the underlying data tables, which is quite intrusive.

Problem 3: Scheduling logic and the QuartzJobBean are coupled in the same project. As the number of scheduled tasks grows and their logic becomes heavier, the performance of the scheduling system is increasingly limited by the business code.

Problem 4: Quartz's underlying layer acquires a DB lock by "preemption", and the node that wins the lock runs the task, which can leave nodes with very uneven loads. XXL-JOB instead distributes tasks cooperatively through its executors, which makes full use of the cluster and balances the load across nodes.

1.5 Summary

  • Task information is stored in the xxl_job_info table. Each task stores two trigger times: TriggerNextTime, the next time the task will be triggered, and TriggerLastTime, the last time it was triggered
  • Task trigger sources include manual trigger, cron trigger, failure retry, and parent task trigger
  • Task scheduling is based on the time wheel algorithm. The time wheel is a ring with 60 slots, each with its own task list; a Map<Integer, List<Integer>> serves as the data structure of the time wheel.
  • The scheduling center starts a scheduling thread, scheduleThread, which continuously scans for tasks whose next trigger time is earlier than the current time + 5s. The tasks are handled in three ways according to the interval they fall into.

2. The scheduling center calls back the executor

The previous section showed how the scheduling center schedules tasks at the right time. This section shows how the scheduling center calls back the jobHandler interface provided by the executor. The diagram below, reproduced from another author, shows the flow clearly.

(Figure: XXL-JOB task trigger process)

2.1 JobTriggerPoolHelper triggers the thread pool

JobTriggerPoolHelper defines two thread pools: fastTriggerPool for fast tasks and slowTriggerPool for slow tasks.

The addTrigger method of JobTriggerPoolHelper does the following:

  1. To keep tasks with different execution costs from starving each other of resources, two thread pools handle fast and slow tasks separately

  2. If triggering a task takes longer than 500 ms more than 10 times within one minute, the task is considered slow and is sent to the slow thread pool

  3. Hand the task over to XxlJobTrigger for processing

/**
 * add trigger
 */
public void addTrigger(final int jobId,
                       final TriggerTypeEnum triggerType,
                       final int failRetryCount,
                       final String executorShardingParam,
                       final String executorParam,
                       final String addressList) {

    // 1. In order to avoid uneven resource allocation due to different execution of multiple tasks, two thread pools are used to process fast tasks and slow tasks respectively
    // If a task times out more than 10 times in a minute, the task is considered slow and will be assigned to the slow thread pool
    // choose thread pool
    ThreadPoolExecutor triggerPool_ = fastTriggerPool;
    AtomicInteger jobTimeoutCount = jobTimeoutCountMap.get(jobId);
    if (jobTimeoutCount != null && jobTimeoutCount.get() > 10) {      // job-timeout 10 times in 1 min
        triggerPool_ = slowTriggerPool;
    }

    // trigger
    triggerPool_.execute(new Runnable() {
        @Override
        public void run() {

            long start = System.currentTimeMillis();

            try {
                // do trigger
                XxlJobTrigger.trigger(jobId, triggerType, failRetryCount, executorShardingParam, executorParam, addressList);
            } catch (Exception e) {
                logger.error(e.getMessage(), e);
            } finally {

                // check timeout-count-map
                long minTim_now = System.currentTimeMillis()/60000;
                if (minTim != minTim_now) {
                    minTim = minTim_now;
                    jobTimeoutCountMap.clear();
                }

                // incr timeout-count-map
                long cost = System.currentTimeMillis()-start;
                if (cost > 500) {       // job-timeout threshold 500ms
                    AtomicInteger timeoutCount = jobTimeoutCountMap.putIfAbsent(jobId, new AtomicInteger(1));
                    if (timeoutCount != null) {
                        timeoutCount.incrementAndGet();
                    }
                }

            }

        }
    });
}
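
The slow-task bookkeeping in addTrigger can be tried out in isolation. The following sketch reproduces only the counting rule (more than 10 triggers slower than 500 ms within the same minute); SlowJobTrackerSketch and its methods are hypothetical, and the current time is passed in as a parameter so the behaviour can be checked without waiting.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

final class SlowJobTrackerSketch {

    private final ConcurrentMap<Integer, AtomicInteger> timeoutCounts = new ConcurrentHashMap<>();
    private volatile long currentMinute = -1;

    /** Records how long one trigger of the job took. */
    void record(int jobId, long costMs, long nowMs) {
        long minute = nowMs / 60000;
        if (minute != currentMinute) {
            // A new minute has started: forget the counts of the previous one.
            currentMinute = minute;
            timeoutCounts.clear();
        }
        if (costMs > 500) {
            timeoutCounts.computeIfAbsent(jobId, k -> new AtomicInteger()).incrementAndGet();
        }
    }

    /** True when the job should be routed to the slow pool. */
    boolean isSlow(int jobId) {
        AtomicInteger count = timeoutCounts.get(jobId);
        return count != null && count.get() > 10;
    }
}
```

Resetting the whole map at a minute boundary is coarse but cheap: a job is moved back to the fast pool as soon as a new minute begins, and has to earn its way into the slow pool again.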

2.2 XxlJobTrigger triggers a task

XxlJobTrigger holds the logic that actually triggers a task:

  1. Query the task information from the database
  2. Set the failure retry count and the executor address list, using the values passed in when they are provided
  3. Handle the sharding parameters
  4. If it is a sharding broadcast task, trigger every executor in turn
/**
 * trigger job
 *
 * @param jobId
 * @param triggerType
 * @param failRetryCount
 *           >=0: use this param
 *           <0: use param from job info config
 * @param executorShardingParam
 * @param executorParam
 *          null: use job param
 *          not null: cover job param
 * @param addressList
 *          null: use executor addressList
 *          not null: cover
 */
public static void trigger(int jobId,
                           TriggerTypeEnum triggerType,
                           int failRetryCount,
                           String executorShardingParam,
                           String executorParam,
                           String addressList) {

    // 1. Query the task information from the database
    // load data
    XxlJobInfo jobInfo = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().loadById(jobId);
    if (jobInfo == null) {
        logger.warn(">>>>>>>>>>>> trigger fail, jobId invalid, jobId={}", jobId);
        return;
    }
    if (executorParam != null) {
        jobInfo.setExecutorParam(executorParam);
    }
    // 2. Set the number of failed retries and the actuator address list
    int finalFailRetryCount = failRetryCount>=0 ? failRetryCount : jobInfo.getExecutorFailRetryCount();
    XxlJobGroup group = XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().load(jobInfo.getJobGroup());

    // cover addressList
    if (addressList != null && addressList.trim().length() > 0) {
        group.setAddressType(1);
        group.setAddressList(addressList.trim());
    }

    // 3. Handle sharding parameters
    // sharding param
    int[] shardingParam = null;
    if (executorShardingParam != null){
        String[] shardingArr = executorShardingParam.split("/");
        if (shardingArr.length==2 && isNumeric(shardingArr[0]) && isNumeric(shardingArr[1])) {
            shardingParam = new int[2];
            shardingParam[0] = Integer.valueOf(shardingArr[0]);
            shardingParam[1] = Integer.valueOf(shardingArr[1]);
        }
    }

    // 4. If it is a broadcast task, the task of each executor is triggered
    if (ExecutorRouteStrategyEnum.SHARDING_BROADCAST==ExecutorRouteStrategyEnum.match(jobInfo.getExecutorRouteStrategy(), null)
            && group.getRegistryList()!=null && !group.getRegistryList().isEmpty()
            && shardingParam==null) {
        for (int i = 0; i < group.getRegistryList().size(); i++) {
            processTrigger(group, jobInfo, finalFailRetryCount, triggerType, i, group.getRegistryList().size());
        }
    } else {
        if (shardingParam == null) {
            shardingParam = new int[]{0, 1};
        }
        // 5. If other routing policies are used
        processTrigger(group, jobInfo, finalFailRetryCount, triggerType, shardingParam[0], shardingParam[1]);
    }

}
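
The sharding parameter handled in step 3 is a string of the form "index/total". A minimal sketch of that parsing follows; ShardingParamSketch is a hypothetical name, and falling back to {0, 1} for any missing or malformed value is a simplification of this sketch (in the listing above, that default only applies on the non-broadcast branch).

```java
final class ShardingParamSketch {

    /** Parses "index/total"; returns {0, 1} when the value is missing or malformed. */
    static int[] parse(String executorShardingParam) {
        if (executorShardingParam != null) {
            String[] parts = executorShardingParam.split("/");
            if (parts.length == 2 && isNumeric(parts[0]) && isNumeric(parts[1])) {
                return new int[] { Integer.parseInt(parts[0]), Integer.parseInt(parts[1]) };
            }
        }
        return new int[] { 0, 1 };
    }

    // Accepts 1 to 9 ASCII digits, so Integer.parseInt cannot overflow.
    private static boolean isNumeric(String s) {
        return s.matches("\\d{1,9}");
    }

    public static void main(String[] args) {
        int[] shard = parse("1/3");
        System.out.println("index=" + shard[0] + ", total=" + shard[1]);
    }
}
```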

processTrigger holds the actual processing logic. Its main steps are:

  1. Save the trigger log and return the saved log record
  2. Initialize the trigger parameters
  3. Select one executor machine according to the routing strategy
  4. Trigger the remote executor by initiating an RPC call
  5. Collect the returned information and record it in the log

runExecutor wraps the parameters and makes the RPC call through an ExecutorBiz proxy.

/**
 * run executor
 * @param triggerParam
 * @param address
 * @return
 */
public static ReturnT<String> runExecutor(TriggerParam triggerParam, String address){
    ReturnT<String> runResult = null;
    try {
        // 1. Remote actuator proxy
        ExecutorBiz executorBiz = XxlJobScheduler.getExecutorBiz(address);
        // 2. Call the run method of the remote executor
        runResult = executorBiz.run(triggerParam);
    } catch (Exception e) {
        logger.error(">>>>>>>>>>> xxl-job trigger error, please check if the executor[{}] is running.", address, e);
        runResult = new ReturnT<String>(ReturnT.FAIL_CODE, ThrowableUtil.toString(e));
    }

    StringBuffer runResultSB = new StringBuffer(I18nUtil.getString("jobconf_trigger_run") + ":");
    runResultSB.append("<br>address:").append(address);
    runResultSB.append("<br>code:").append(runResult.getCode());
    runResultSB.append("<br>msg:").append(runResult.getMsg());

    runResult.setMsg(runResultSB.toString());
    return runResult;
}

2.3 ExecutorBizClient, the executor proxy

com.xxl.job.admin.core.scheduler.XxlJobScheduler#getExecutorBiz lazily wraps the address of a remote executor in an ExecutorBizClient and puts it into a cache of type ConcurrentMap.

private static ConcurrentMap<String, ExecutorBiz> executorBizRepository = new ConcurrentHashMap<String, ExecutorBiz>();
public static ExecutorBiz getExecutorBiz(String address) throws Exception {
    // valid
    if (address==null || address.trim().length()==0) {
        return null;
    }

    // load-cache
    address = address.trim();
    ExecutorBiz executorBiz = executorBizRepository.get(address);
    if (executorBiz != null) {
        return executorBiz;
    }

    // set-cache
    executorBiz = new ExecutorBizClient(address, XxlJobAdminConfig.getAdminConfig().getAccessToken());

    executorBizRepository.put(address, executorBiz);
    return executorBiz;
}
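
With the get-then-put pattern shown above, two threads that ask for the same new address at the same moment can each build a client, and the later put wins. That is harmless here, but the same lazy cache can also be written with ConcurrentMap.computeIfAbsent, which creates at most one value per key. The sketch below is an alternative under that assumption, not the XXL-JOB code; LazyClientCacheSketch and its factory parameter are hypothetical.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Function;

final class LazyClientCacheSketch<C> {

    private final ConcurrentMap<String, C> cache = new ConcurrentHashMap<>();
    private final Function<String, C> factory; // builds a client for a trimmed address

    LazyClientCacheSketch(Function<String, C> factory) {
        this.factory = factory;
    }

    /** Returns the cached client for the address, creating it on first use. */
    C get(String address) {
        if (address == null || address.trim().isEmpty()) {
            return null; // same contract as the listing: no client for a blank address
        }
        return cache.computeIfAbsent(address.trim(), factory);
    }
}
```

A caller would pass the client constructor as the factory, for example a lambda that builds the client from the address and an access token.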

ExecutorBizClient implements the run method, which sends a POST request to the run endpoint of the executor.

/**
 * admin api test
 *
 * @author xuxueli 2017-07-28 22:14:52
 */
public class ExecutorBizClient implements ExecutorBiz {

    public ExecutorBizClient() {
    }
    public ExecutorBizClient(String addressUrl, String accessToken) {
        this.addressUrl = addressUrl;
        this.accessToken = accessToken;

        // valid
        if (!this.addressUrl.endsWith("/")) {
            this.addressUrl = this.addressUrl + "/";
        }
    }

    private String addressUrl ;
    private String accessToken;
    private int timeout = 3;


    @Override
    public ReturnT<String> beat() {
        return XxlJobRemotingUtil.postBody(addressUrl+"beat", accessToken, timeout, "", String.class);
    }

    @Override
    public ReturnT<String> idleBeat(IdleBeatParam idleBeatParam){
        return XxlJobRemotingUtil.postBody(addressUrl+"idleBeat", accessToken, timeout, idleBeatParam, String.class);
    }

    @Override
    public ReturnT<String> run(TriggerParam triggerParam) {
        return XxlJobRemotingUtil.postBody(addressUrl + "run", accessToken, timeout, triggerParam, String.class);
    }

    @Override
    public ReturnT<String> kill(KillParam killParam) {
        return XxlJobRemotingUtil.postBody(addressUrl + "kill", accessToken, timeout, killParam, String.class);
    }

    @Override
    public ReturnT<LogResult> log(LogParam logParam) {
        return XxlJobRemotingUtil.postBody(addressUrl + "log", accessToken, timeout, logParam, LogResult.class);
    }

}
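
To make the shape of such a call concrete, here is a minimal sketch that only builds (and does not send) the POST request for the run endpoint. It uses the JDK's java.net.http API as a stand-in; XxlJobRemotingUtil.postBody has its own implementation that is not shown in this article. The header name follows the API description earlier in the article, while the Content-Type value, the sample address and the class name RunRequestSketch are assumptions.

```java
import java.net.URI;
import java.net.http.HttpRequest;
import java.time.Duration;

public class RunRequestSketch {

    // Mirrors the constructor logic shown above: make sure the base URL ends with "/".
    static String normalize(String addressUrl) {
        return addressUrl.endsWith("/") ? addressUrl : addressUrl + "/";
    }

    static HttpRequest buildRunRequest(String addressUrl, String accessToken, String jsonBody) {
        HttpRequest.Builder builder = HttpRequest
                .newBuilder(URI.create(normalize(addressUrl) + "run"))
                .timeout(Duration.ofSeconds(3)) // same 3-second timeout as the client above
                .header("Content-Type", "application/json;charset=UTF-8") // assumed value
                .POST(HttpRequest.BodyPublishers.ofString(jsonBody));
        // Only send the token header when a token is configured.
        if (accessToken != null && !accessToken.trim().isEmpty()) {
            builder.header("XXL-JOB-ACCESS-TOKEN", accessToken);
        }
        return builder.build();
    }

    public static void main(String[] args) {
        HttpRequest request = buildRunRequest(
                "http://127.0.0.1:9999", // made-up executor address
                "default_token",         // made-up token
                "{\"jobId\":1,\"executorHandler\":\"demoJobHandler\"}");
        System.out.println(request.method() + " " + request.uri()); // prints POST http://127.0.0.1:9999/run
    }
}
```

The other methods (beat, idleBeat, kill, log) would differ only in the path suffix and the body type.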

2.4 ExecutorBizImpl RPC call

An ExecutorBizImpl instance is created when the executor's built-in container starts. ExecutorBizImpl is the executor-side handler for RPC calls from the scheduling center: it receives the scheduling center's requests and invokes the interface method that matches each request.

In com.xxl.job.core.server.EmbedServer.EmbedHttpServerHandler#process, executorBiz is an ExecutorBizImpl instance:

private Object process(HttpMethod httpMethod, String uri, String requestData, String accessTokenReq) {

    // valid
    if (HttpMethod.POST != httpMethod) {
        return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, HttpMethod not support.");
    }
    if (uri==null || uri.trim().length()==0) {
        return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, uri-mapping empty.");
    }
    if (accessToken != null
            && accessToken.trim().length()>0
            && !accessToken.equals(accessTokenReq)) {
        return new ReturnT<String>(ReturnT.FAIL_CODE, "The access token is wrong.");
    }

    // services mapping
    try {
        if ("/beat".equals(uri)) {
            return executorBiz.beat();
        } else if ("/idleBeat".equals(uri)) {
            IdleBeatParam idleBeatParam = GsonTool.fromJson(requestData, IdleBeatParam.class);
            return executorBiz.idleBeat(idleBeatParam);
        } else if ("/run".equals(uri)) {
            TriggerParam triggerParam = GsonTool.fromJson(requestData, TriggerParam.class);
            return executorBiz.run(triggerParam);
        } else if ("/kill".equals(uri)) {
            KillParam killParam = GsonTool.fromJson(requestData, KillParam.class);
            return executorBiz.kill(killParam);
        } else if ("/log".equals(uri)) {
            LogParam logParam = GsonTool.fromJson(requestData, LogParam.class);
            return executorBiz.log(logParam);
        } else {
            return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, uri-mapping(" + uri + ") not found.");
        }
    } catch (Exception e) {
        logger.error(e.getMessage(), e);
        return new ReturnT<String>(ReturnT.FAIL_CODE, "request error:" + ThrowableUtil.toString(e));
    }
}
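
The order of checks in process (HTTP method, then URI, then access token, then route lookup) can be reduced to a small self-contained sketch. The version below replaces the if/else chain with a lookup table, which is a design variation and not how the article's code is written. The class name UriDispatchSketch, the string results and the two sample routes are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

public class UriDispatchSketch {

    private final String accessToken;
    // Route table: URI -> handler that receives the request body.
    private final Map<String, Function<String, String>> routes = new HashMap<>();

    UriDispatchSketch(String accessToken) {
        this.accessToken = accessToken;
        routes.put("/beat", body -> "OK");
        routes.put("/run", body -> "OK queued " + body);
    }

    String process(String httpMethod, String uri, String body, String tokenFromRequest) {
        // 1. Only POST is accepted.
        if (!"POST".equals(httpMethod)) {
            return "FAIL invalid request, HttpMethod not support.";
        }
        // 2. The URI must be present.
        if (uri == null || uri.trim().isEmpty()) {
            return "FAIL invalid request, uri-mapping empty.";
        }
        // 3. The token is only checked when one is configured on this side.
        if (accessToken != null && !accessToken.trim().isEmpty()
                && !accessToken.equals(tokenFromRequest)) {
            return "FAIL The access token is wrong.";
        }
        // 4. Dispatch to the handler registered for the URI.
        Function<String, String> handler = routes.get(uri);
        if (handler == null) {
            return "FAIL invalid request, uri-mapping(" + uri + ") not found.";
        }
        return handler.apply(body);
    }

    public static void main(String[] args) {
        UriDispatchSketch server = new UriDispatchSketch("secret");
        System.out.println(server.process("POST", "/run", "{\"jobId\":1}", "secret"));
        System.out.println(server.process("POST", "/run", "{\"jobId\":1}", "wrong"));
        System.out.println(server.process("GET", "/beat", "", "secret"));
    }
}
```

Note that, as in the article's code, a request carrying any token (or none) passes when no token is configured on the receiving side.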

The run method of ExecutorBizImpl does the following:

  1. Loads the job thread already registered for the job ID, if any, together with its jobHandler
  2. Applies the blocking strategy
  3. Puts the task in a queue

@Override
public ReturnT<String> run(TriggerParam triggerParam) {
    // Load old: jobHandler + jobThread
    // 1. Load the job thread already registered for this job ID, and its jobHandler
    JobThread jobThread = XxlJobExecutor.loadJobThread(triggerParam.getJobId());
    IJobHandler jobHandler = jobThread != null ? jobThread.getHandler() : null;
    String removeOldReason = null;

    // Valid: jobHandler + jobThread
    GlueTypeEnum glueTypeEnum = GlueTypeEnum.match(triggerParam.getGlueType());
    if (GlueTypeEnum.BEAN == glueTypeEnum) {

        // new jobhandler
        IJobHandler newJobHandler = XxlJobExecutor.loadJobHandler(triggerParam.getExecutorHandler());

        // valid old jobThread
        if (jobThread != null && jobHandler != newJobHandler) {
            // change handler, need kill old thread
            removeOldReason = "change jobhandler or glue type, and terminate the old job thread.";

            jobThread = null;
            jobHandler = null;
        }

        // valid handler
        if (jobHandler == null) {
            jobHandler = newJobHandler;
            if (jobHandler == null) {
                return new ReturnT<String>(ReturnT.FAIL_CODE, "job handler [" + triggerParam.getExecutorHandler() + "] not found.");
            }
        }

    } else if (GlueTypeEnum.GLUE_GROOVY == glueTypeEnum) {

        // valid old jobThread
        if (jobThread != null &&
                !(jobThread.getHandler() instanceof GlueJobHandler
                    && ((GlueJobHandler) jobThread.getHandler()).getGlueUpdatetime()==triggerParam.getGlueUpdatetime() )) {
            // change handler or gluesource updated, need kill old thread
            removeOldReason = "change job source or glue type, and terminate the old job thread.";

            jobThread = null;
            jobHandler = null;
        }

        // valid handler
        if (jobHandler == null) {
            try {
                IJobHandler originJobHandler = GlueFactory.getInstance().loadNewInstance(triggerParam.getGlueSource());
                jobHandler = new GlueJobHandler(originJobHandler, triggerParam.getGlueUpdatetime());
            } catch (Exception e) {
                logger.error(e.getMessage(), e);
                return new ReturnT<String>(ReturnT.FAIL_CODE, e.getMessage());
            }
        }

    } else if (glueTypeEnum != null && glueTypeEnum.isScript()) {

        // valid old jobThread
        if (jobThread != null &&
                !(jobThread.getHandler() instanceof ScriptJobHandler
                        && ((ScriptJobHandler) jobThread.getHandler()).getGlueUpdatetime()==triggerParam.getGlueUpdatetime() )) {
            // change script or gluesource updated, need kill old thread
            removeOldReason = "change job source or glue type, and terminate the old job thread.";

            jobThread = null;
            jobHandler = null;
        }

        // valid handler
        if (jobHandler == null) {
            jobHandler = new ScriptJobHandler(triggerParam.getJobId(), triggerParam.getGlueUpdatetime(), triggerParam.getGlueSource(), GlueTypeEnum.match(triggerParam.getGlueType()));
        }

    } else {
        return new ReturnT<String>(ReturnT.FAIL_CODE, "glueType[" + triggerParam.getGlueType() + "] is not valid.");
    }

    // 2. Proceed according to the blocking policy
    // executor block strategy
    if (jobThread != null) {
        ExecutorBlockStrategyEnum blockStrategy = ExecutorBlockStrategyEnum.match(triggerParam.getExecutorBlockStrategy(), null);
        if (ExecutorBlockStrategyEnum.DISCARD_LATER == blockStrategy) {
            // discard when running
            if (jobThread.isRunningOrHasQueue()) {
                return new ReturnT<String>(ReturnT.FAIL_CODE, "block strategy effect:" + ExecutorBlockStrategyEnum.DISCARD_LATER.getTitle());
            }
        } else if (ExecutorBlockStrategyEnum.COVER_EARLY == blockStrategy) {
            // kill running jobThread
            if (jobThread.isRunningOrHasQueue()) {
                removeOldReason = "block strategy effect:" + ExecutorBlockStrategyEnum.COVER_EARLY.getTitle();

                jobThread = null;
            }
        } else {
            // just queue trigger
        }
    }

    // replace thread (new or exists invalid)
    if (jobThread == null) {
        jobThread = XxlJobExecutor.registJobThread(triggerParam.getJobId(), jobHandler, removeOldReason);
    }

    // 3. Put the task in a queue
    // push data to queue
    ReturnT<String> pushResult = jobThread.pushTriggerQueue(triggerParam);
    return pushResult;
}
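
The blocking-strategy branch is the part of run that is easiest to misread, so here is a minimal sketch that isolates it as a pure function. The enum BlockStrategy mirrors the strategy names used in the article (SERIAL_EXECUTION is assumed to be the name of the default "just queue" strategy). The Decision enum, the decide method and the class name are hypothetical and exist only for illustration.

```java
public class BlockStrategySketch {

    enum BlockStrategy { SERIAL_EXECUTION, DISCARD_LATER, COVER_EARLY }

    // Hypothetical outcome of handling one trigger request.
    enum Decision {
        ENQUEUE_ON_EXISTING_THREAD, // append to the queue of the current job thread
        REJECT,                     // drop the new trigger and report failure
        ENQUEUE_ON_NEW_THREAD       // register a fresh job thread, then enqueue
    }

    /**
     * @param strategy     blocking strategy carried by the trigger
     * @param threadExists whether a job thread is already registered for the job
     * @param busy         whether that thread is running or has queued triggers
     */
    static Decision decide(BlockStrategy strategy, boolean threadExists, boolean busy) {
        if (!threadExists) {
            // Nothing to block on: a new thread is registered for the job.
            return Decision.ENQUEUE_ON_NEW_THREAD;
        }
        if (strategy == BlockStrategy.DISCARD_LATER && busy) {
            // Keep the running work and discard the newcomer.
            return Decision.REJECT;
        }
        if (strategy == BlockStrategy.COVER_EARLY && busy) {
            // The old thread is replaced, so the newcomer supersedes earlier work.
            return Decision.ENQUEUE_ON_NEW_THREAD;
        }
        // Serial execution, or the thread is idle: just queue the trigger.
        return Decision.ENQUEUE_ON_EXISTING_THREAD;
    }

    public static void main(String[] args) {
        for (BlockStrategy strategy : BlockStrategy.values()) {
            System.out.println(strategy + " while busy -> " + decide(strategy, true, true));
        }
    }
}
```

In the article's code the "replace" outcome is expressed by setting jobThread to null, which makes the later registJobThread call create a new thread and pass along the reason for removing the old one.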

2.5 JobThread: executing the task

The JobThread continually takes tasks from its task queue and processes them. It mainly does the following:

  1. Calls the handler's init method.
  2. Takes a task from the task queue. If the task has a timeout set, it is handed to a FutureTask for execution; otherwise the handler's execute method is called directly.
  3. Records the result in the log.

com.xxl.job.core.thread.JobThread#run

@Override
public void run() {

    // 1. Call the handler's init method
    try {
        handler.init();
    } catch (Throwable e) {
        logger.error(e.getMessage(), e);
    }

    // execute
    while (!toStop) {
        running = false;
        idleTimes++;

        TriggerParam triggerParam = null;
        try {
            // to check the toStop signal we need to loop, so we cannot use queue.take(); use poll(timeout) instead
            triggerParam = triggerQueue.poll(3L, TimeUnit.SECONDS);
            if (triggerParam != null) {
                running = true;
                idleTimes = 0;
                triggerLogIdSet.remove(triggerParam.getLogId());

                // log filename, like "logPath/yyyy-MM-dd/9999.log"
                String logFileName = XxlJobFileAppender.makeLogFileName(new Date(triggerParam.getLogDateTime()), triggerParam.getLogId());
                XxlJobContext xxlJobContext = new XxlJobContext(
                        triggerParam.getJobId(),
                        triggerParam.getExecutorParams(),
                        logFileName,
                        triggerParam.getBroadcastIndex(),
                        triggerParam.getBroadcastTotal());

                // init job context
                XxlJobContext.setXxlJobContext(xxlJobContext);

                // execute
                XxlJobHelper.log("<br>----------- xxl-job job execute start -----------<br>----------- Param:" + xxlJobContext.getJobParam());

                if (triggerParam.getExecutorTimeout() > 0) {
                    // limit timeout
                    Thread futureThread = null;
                    try {
                        FutureTask<Boolean> futureTask = new FutureTask<Boolean>(new Callable<Boolean>() {
                            @Override
                            public Boolean call() throws Exception {

                                // init job context
                                XxlJobContext.setXxlJobContext(xxlJobContext);

                                handler.execute();
                                return true;
                            }
                        });
                        futureThread = new Thread(futureTask);
                        futureThread.start();

                        Boolean tempResult = futureTask.get(triggerParam.getExecutorTimeout(), TimeUnit.SECONDS);
                    } catch (TimeoutException e) {

                        XxlJobHelper.log("<br>----------- xxl-job job execute timeout");
                        XxlJobHelper.log(e);

                        // handle result
                        XxlJobHelper.handleTimeout("job execute timeout ");
                    } finally {
                        futureThread.interrupt();
                    }
                } else {
                    // just execute
                    handler.execute();
                }

                // valid execute handle data
                if (XxlJobContext.getXxlJobContext().getHandleCode() <= 0) {
                    XxlJobHelper.handleFail("job handle result lost.");
                } else {
                    String tempHandleMsg = XxlJobContext.getXxlJobContext().getHandleMsg();
                    tempHandleMsg = (tempHandleMsg != null && tempHandleMsg.length() > 50000)
                            ? tempHandleMsg.substring(0, 50000).concat("...")
                            : tempHandleMsg;
                    XxlJobContext.getXxlJobContext().setHandleMsg(tempHandleMsg);
                }
                XxlJobHelper.log("<br>----------- xxl-job job execute end(finish) -----------<br>----------- Result: handleCode="
                        + XxlJobContext.getXxlJobContext().getHandleCode()
                        + ", handleMsg = "
                        + XxlJobContext.getXxlJobContext().getHandleMsg()
                );

            } else {
                if (idleTimes > 30) {
                    if (triggerQueue.size() == 0) {
                        // avoid concurrent trigger causes jobId-lost
                        XxlJobExecutor.removeJobThread(jobId, "excutor idel times over limit.");
                    }
                }
            }
        } catch (Throwable e) {
            if (toStop) {
                XxlJobHelper.log("<br>----------- JobThread toStop, stopReason:" + stopReason);
            }

            // handle result
            StringWriter stringWriter = new StringWriter();
            e.printStackTrace(new PrintWriter(stringWriter));
            String errorMsg = stringWriter.toString();

            XxlJobHelper.handleFail(errorMsg);

            XxlJobHelper.log("<br>----------- JobThread Exception:" + errorMsg + "<br>----------- xxl-job job execute end(error) -----------");
        } finally {
            if (triggerParam != null) {
                // callback handler info
                if (!toStop) {
                    // common
                    TriggerCallbackThread.pushCallBack(new HandleCallbackParam(
                            triggerParam.getLogId(),
                            triggerParam.getLogDateTime(),
                            XxlJobContext.getXxlJobContext().getHandleCode(),
                            XxlJobContext.getXxlJobContext().getHandleMsg())
                    );
                } else {
                    // is killed
                    TriggerCallbackThread.pushCallBack(new HandleCallbackParam(
                            triggerParam.getLogId(),
                            triggerParam.getLogDateTime(),
                            XxlJobContext.HANDLE_COCE_FAIL,
                            stopReason + " [job running, killed]")
                    );
                }
            }
        }
    }

    // callback trigger request in queue
    while (triggerQueue != null && triggerQueue.size() > 0) {
        TriggerParam triggerParam = triggerQueue.poll();
        if (triggerParam != null) {
            // is killed
            TriggerCallbackThread.pushCallBack(new HandleCallbackParam(
                    triggerParam.getLogId(),
                    triggerParam.getLogDateTime(),
                    XxlJobContext.HANDLE_COCE_FAIL,
                    stopReason + " [job not executed, in the job queue, killed.]")
            );
        }
    }

    // destroy
    try {
        handler.destroy();
    } catch (Throwable e) {
        logger.error(e.getMessage(), e);
    }

    logger.info(">>>>>>>>>>> xxl-job JobThread stoped, hashCode:{}", Thread.currentThread());
}
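
The timeout branch is the least obvious part of this method, so the following minimal sketch isolates the pattern: run the work in a separate thread through a FutureTask, wait with a bounded get, and interrupt the worker afterwards. The class name TimeoutSketch, the method runWithTimeout and the string results are hypothetical; only the FutureTask usage follows the listing above.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class TimeoutSketch {

    static String runWithTimeout(Callable<Boolean> job, long timeoutMillis) {
        FutureTask<Boolean> futureTask = new FutureTask<>(job);
        Thread worker = new Thread(futureTask);
        worker.start();
        try {
            // Wait for the result, but no longer than the timeout.
            futureTask.get(timeoutMillis, TimeUnit.MILLISECONDS);
            return "success";
        } catch (TimeoutException e) {
            return "timeout";
        } catch (ExecutionException e) {
            // The job itself threw an exception.
            return "failed: " + e.getCause();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        } finally {
            // Ask the worker to stop; this is a no-op if it has already finished.
            worker.interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println(runWithTimeout(() -> true, 1000)); // prints success
        System.out.println(runWithTimeout(() -> {
            Thread.sleep(5000); // simulates a slow job
            return true;
        }, 100)); // prints timeout
    }
}
```

One caveat applies to this pattern in general: interrupt only sets a flag, so a job stops early only if it blocks in an interruptible call or checks the flag itself. A job that loops on pure computation keeps running after the timeout has been reported.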

2.6 Summary

  • The scheduling center creates a proxy, ExecutorBizClient, for each executor. When a task is triggered, the scheduling center communicates with the executor through this proxy via RPC.
  • When the built-in container on the executor side starts, an ExecutorBizImpl instance is created. It handles the scheduling center's RPC calls on the executor side: it accepts requests from the scheduling center and invokes the interface method that matches each request.
  • On the executor side, each task is assigned to its own jobThread. The task is not executed immediately; it is first placed in a blocking queue.
  • The jobThread on the executor side continually takes tasks from the queue, executes them, and logs the result.
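
The last two points describe a producer/consumer hand-off, which can be sketched as follows. This is a simplified model under stated assumptions and is not the JobThread implementation: the class name QueueWorkerSketch, the string triggers and the short poll interval are hypothetical. In the listing above the poll timeout is 3 seconds and the thread is removed after more than 30 idle polls with an empty queue.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class QueueWorkerSketch extends Thread {

    private final LinkedBlockingQueue<String> triggerQueue = new LinkedBlockingQueue<>();
    private final List<String> executed = Collections.synchronizedList(new ArrayList<>());
    private final long pollMillis;
    private final int maxIdlePolls;
    private volatile boolean toStop = false;

    QueueWorkerSketch(long pollMillis, int maxIdlePolls) {
        this.pollMillis = pollMillis;
        this.maxIdlePolls = maxIdlePolls;
    }

    // Called by the request-handling side: enqueue and return immediately.
    void push(String trigger) {
        triggerQueue.add(trigger);
    }

    @Override
    public void run() {
        int idleTimes = 0;
        while (!toStop) {
            idleTimes++;
            try {
                // poll with a timeout so the loop can re-check toStop regularly
                String trigger = triggerQueue.poll(pollMillis, TimeUnit.MILLISECONDS);
                if (trigger != null) {
                    idleTimes = 0;
                    executed.add(trigger); // stand-in for handler.execute()
                } else if (idleTimes > maxIdlePolls && triggerQueue.isEmpty()) {
                    toStop = true; // idle for too long: let the thread end
                }
            } catch (InterruptedException e) {
                toStop = true;
            }
        }
    }

    // Pushes two triggers, lets the worker drain them and stop once idle.
    static List<String> demo() {
        QueueWorkerSketch worker = new QueueWorkerSketch(20, 3);
        worker.push("first");
        worker.push("second");
        worker.start();
        try {
            worker.join(5000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new ArrayList<>(worker.executed);
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [first, second]
    }
}
```

Because push returns as soon as the trigger is queued, the caller never waits for the task itself, which is what decouples triggering from execution.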

3. Summary

This article described in detail how the scheduling center schedules tasks, how the executor learns that a task has been triggered, and how it then runs the task. The main takeaway is how XXL-job uses thread pools to keep a single blocked task from delaying scheduling: task scheduling and execution are fully asynchronous, which improves the throughput of the system. Replacing Quartz with a time wheel further reduces coupling in the system and helps avoid tasks being triggered after their scheduled time. Many of XXL-job's design ideas are worth learning from.

XXL-job task triggering process