In this article, a simple example is used to introduce the quick start process of XXL-job. Xxl-job as a distributed scheduling task framework, how to schedule and manage the bottom layer, this paper will take these problems to understand the operation principle of XXL-Job.

1. The XXL – job first

1.1 XXL – job is introduced

Xxl-job is a distributed task scheduling framework opened by Xu Xueli, a leading figure in Dianping. It is easy to use, lightweight and extensible. Compared with Spring Task, Quartz and XXL-job record execution logs and run large batches, facilitating better Task management for developers and o&M personnel.

1.2 Explanation of Nouns

  • Scheduling center: automatic registration of actuators, task scheduling management, scheduling log recording and other operations.
  • Actuator: An actuator is an application service that is uniquely identified by appName.
  • Task: A task is the smallest scheduling unit and must belong to an executor. The scheduling of tasks supports CRON and fixed speed configuration.
  • JobHandler: A task handler that calls back the development-defined interface when scheduling tasks. This interface is jobHandler, which exists in Spring as a bean.

1.3 System Architecture

The following figure shows the architecture of XXL-Job 2.1

  • The dispatch center contains several modules, such as task management, executive management, log management and so on.
  • The executor consists of the register thread registering automatically, the callback thread performing the task, and the executor adding the schedule to the schedule queue

There are a lot of things involved in this architecture diagram, and at first glance it’s not very clear what the process is. Such as:

  • How are actuators automatically registered with the dispatch center?
  • How does the dispatch center manage actuators?
  • How does the dispatch center trigger a task?
  • How does the task call back to jobHandler?
  • Is there a strategy for executing a task over time?

.

This article will take these questions to understand the working principle of XXL-job, one by one.

2. Register the actuator with the scheduling center

As described above, an executor is an application service that needs to be registered with the scheduling center so that it can be centrally managed by the scheduling center. Xxl-job does not select ZooKeeper, Eureka, or Nacos as the registry, but directly uses the scheduling center as the registry.

2.1 XxlJobConfig Actuator Configuration

Actuator auto-registration should be done when Spring initializes the associated bean, so start with the XxlJobConfig configuration class.

Add XxlJobSpringExecutor executor, which is managed in the Spring Bean container and has the same life cycle as normal beans.

@Configuration
public class XxlJobConfig {
    private Logger logger = LoggerFactory.getLogger(XxlJobConfig.class);
    
    // Address of the dispatch center
    @Value("${xxl.job.admin.addresses}")
    private String adminAddresses;

    / / access token
    @Value("${xxl.job.accessToken}")
    private String accessToken;

    // The name of the actuator application
    @Value("${xxl.job.executor.appname}")
    private String appname;

    @Value("${xxl.job.executor.address}")
    private String address;

    @Value("${xxl.job.executor.ip}")
    private String ip;

    @Value("${xxl.job.executor.port}")
    private int port;

    // Log path
    @Value("${xxl.job.executor.logpath}")
    private String logPath;


    // Log retention days
    @Value("${xxl.job.executor.logretentiondays}")
    private int logRetentionDays;


    @Bean
    public XxlJobSpringExecutor xxlJobExecutor(a) {
        logger.info(">>>>>>>>>>> xxl-job config init.");
        logger.info(">>>>>>>>>>> xxl-job adminAddresses {}. ", adminAddresses);
        XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();
        xxlJobSpringExecutor.setAdminAddresses(adminAddresses);
        xxlJobSpringExecutor.setAppname(appname);
        xxlJobSpringExecutor.setAddress(address);
        xxlJobSpringExecutor.setIp(ip);
        xxlJobSpringExecutor.setPort(port);
        xxlJobSpringExecutor.setAccessToken(accessToken);
        xxlJobSpringExecutor.setLogPath(logPath);
        xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays);

        return xxlJobSpringExecutor;
    }

    /** * The "InetUtils" component provided by "Spring-Cloud-Commons" can be used to flexibly customize the registration IP for multiple network cards and in-container deployment. * * 1, introduce dependencies:  * 
      
        * 
       
        org.springframework.cloud
        * 
       
        spring-cloud-commons
        * < version > ${version} < / version > * < / dependency > * * 2, configuration files, or container startup variables * spring in cloud. Inetutils. Preferred - networks: 'XXX. XXX. XXX. 3, obtain IP * * * String ip_ = inetUtils. FindFirstNonLoopbackHostInfo () getIpAddress (); * /
      


}
Copy the code

2.2 XxlJobSpringExecutor executor

XxlJobSpringExecutor inherited base class XxlJobExecutor, implements ApplicationContextAware SmartInitializingSingleton, DisposableBean interfaces, etc.

  • XxlJobExecutor: The basic implementation of the executor, which defines the start and stop methods of the executor, including the generation of the scheduling center agent collection, initialization of the callback thread, initialization of the executor built-in container, etc.
  • ApplicationContextAware: In the Spring lifecycle,ApplicationContextAwareIs the container refresh hook function, available through this interfaceApplicationContextApplication context of
  • SmartInitializingSingleton: This interface can be called when the bean is registered with the Spring container after some post-initialization of the singleton beanafterSingletonsInstantiatedMethod to perform some initialization operations
  • DisposableBean: The hook function called when the bean is destroyed by the container, to make some destruction operations

In the old version, the actuator needs to inherit IJobhandler, implement the execute method, in the new version only need @ xxljob annotations can be added to the method, a class can register multiple tasks, therefore, the implementation is performed for initJobHandlerMethodRepository method. InitJobHandlerMethodRepository execution process is as follows:

  1. Get all annotated beans from applicationContext
  2. Get all of the methods annotated with XxlJob annotations for each bean. Methods annotated with XxlJob annotations are a task
  3. Walk through all the tasks
  4. Register the task handler,registJobHandlerThe parent class is calledXxlJobExecutortheregistJobHandler
/**
 * xxl-job executor (for spring)
 *
 * @author xuxueli 2018-11-01 09:24:52
 */
public class XxlJobSpringExecutor extends XxlJobExecutor implements ApplicationContextAware.SmartInitializingSingleton.DisposableBean {
    private static final Logger logger = LoggerFactory.getLogger(XxlJobSpringExecutor.class);


    // start
    @Override
    public void afterSingletonsInstantiated(a) {

        // init JobHandler Repository
        /*initJobHandlerRepository(applicationContext); * /

        // init JobHandler Repository (for method)
        // Initialize the task handler repository (for the method handler)
        initJobHandlerMethodRepository(applicationContext);

        // refresh GlueFactory
        GlueFactory.refreshInstance(1);

        // super start
        try {
            // call XxlJobExecutor's start method
            super.start();
        } catch (Exception e) {
            throw newRuntimeException(e); }}// destroy
    @Override
    public void destroy(a) {
        super.destroy();
    }


    private void initJobHandlerMethodRepository(ApplicationContext applicationContext) {
        if (applicationContext == null) {
            return;
        }

        // init job handler from method
        // 1. Get all annotated beans from applicationContext
        String[] beanDefinitionNames = applicationContext.getBeanNamesForType(Object.class, false.true);
        for (String beanDefinitionName : beanDefinitionNames) {
            Object bean = applicationContext.getBean(beanDefinitionName);

            // 2. Get all the methods annotated with XxlJob annotations for each bean. Methods annotated with XxlJob annotations are tasks
            Map<Method, XxlJob> annotatedMethods = null;   / / referred to: org. Springframework. Context. Event. EventListenerMethodProcessor. ProcessBean
            try {
                annotatedMethods = MethodIntrospector.selectMethods(bean.getClass(),
                        new MethodIntrospector.MetadataLookup<XxlJob>() {
                            @Override
                            public XxlJob inspect(Method method) {
                                returnAnnotatedElementUtils.findMergedAnnotation(method, XxlJob.class); }}); }catch (Throwable ex) {
                logger.error("xxl-job method-jobhandler resolve error for bean[" + beanDefinitionName + "].", ex);
            }
            if (annotatedMethods==null || annotatedMethods.isEmpty()) {
                continue;
            }

            // 3. Iterate through all tasks
            for (Map.Entry<Method, XxlJob> methodXxlJobEntry : annotatedMethods.entrySet()) {
                Method executeMethod = methodXxlJobEntry.getKey();
                XxlJob xxlJob = methodXxlJobEntry.getValue();
                if (xxlJob == null) {
                    continue;
                }

                String name = xxlJob.value();
                if (name.trim().length() == 0) {
                    throw new RuntimeException("xxl-job method-jobhandler name invalid, for[" + bean.getClass() + "#" + executeMethod.getName() + "].");
                }
                if(loadJobHandler(name) ! =null) {
                    throw new RuntimeException("xxl-job jobhandler[" + name + "] naming conflicts.");
                }

                // execute method
                /*if (! (method.getParameterTypes().length == 1 && method.getParameterTypes()[0].isAssignableFrom(String.class))) { throw new RuntimeException("xxl-job method-jobhandler param-classtype invalid, for[" + bean.getClass() + "#" + method.getName() + "] , " + "The correct method format like " public ReturnT
      
        execute(String param) " ."); } if (! method.getReturnType().isAssignableFrom(ReturnT.class)) { throw new RuntimeException("xxl-job method-jobhandler return-classtype invalid, for[" + bean.getClass() + "#" + method.getName() + "] , " + "The correct method format like " public ReturnT
       
         execute(String param) " ."); } * /
       
      

                executeMethod.setAccessible(true);

                // init and destory
                Method initMethod = null;
                Method destroyMethod = null;

                if (xxlJob.init().trim().length() > 0) {
                    try {
                        initMethod = bean.getClass().getDeclaredMethod(xxlJob.init());
                        initMethod.setAccessible(true);
                    } catch (NoSuchMethodException e) {
                        throw new RuntimeException("xxl-job method-jobhandler initMethod invalid, for[" + bean.getClass() + "#" + executeMethod.getName() + "]."); }}if (xxlJob.destroy().trim().length() > 0) {
                    try {
                        destroyMethod = bean.getClass().getDeclaredMethod(xxlJob.destroy());
                        destroyMethod.setAccessible(true);
                    } catch (NoSuchMethodException e) {
                        throw new RuntimeException("xxl-job method-jobhandler destroyMethod invalid, for[" + bean.getClass() + "#" + executeMethod.getName() + "]."); }}// 4. Register the task handler
                // registry jobhandler
                registJobHandler(name, newMethodJobHandler(bean, executeMethod, initMethod, destroyMethod)); }}}// ---------------------- applicationContext ----------------------
    private static ApplicationContext applicationContext;

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.applicationContext = applicationContext;
    }

    public static ApplicationContext getApplicationContext(a) {
        returnapplicationContext; }}Copy the code
  • MethodJobHandler Method agent

MethodJobHandler inherits IJobHandler and calls the execute, initMethod, and destroyMethod operations on the target object by reflection.

/ * * *@author xuxueli 2019-12-11 21:12:18
*/
public class MethodJobHandler extends IJobHandler {

   // Target object to be proxied
   private final Object target;
   // Task execution method
   private final Method method;
   // Initialize method
   private Method initMethod;
   // Destruction method
   private Method destroyMethod;

   public MethodJobHandler(Object target, Method method, Method initMethod, Method destroyMethod) {
       this.target = target;
       this.method = method;

       this.initMethod = initMethod;
       this.destroyMethod = destroyMethod;
   }

   @Override
   public void execute(a) throws Exception {
       // by reflection method invocationClass<? >[] paramTypes = method.getParameterTypes();if (paramTypes.length > 0) {
           method.invoke(target, new Object[paramTypes.length]);       // method-param can not be primitive-types
       } else{ method.invoke(target); }}@Override
   public void init(a) throws Exception {
       if(initMethod ! =null) { initMethod.invoke(target); }}@Override
   public void destroy(a) throws Exception {
       if(destroyMethod ! =null) { destroyMethod.invoke(target); }}@Override
   public String toString(a) {
       return super.toString()+"["+ target.getClass() + "#" + method.getName() +"]"; }}Copy the code

2.3 XxlJobExecutor registered

The registJobHandler method of XxlJobExecutor places each jobHandler name and MethodJobHandler key-value in a thread-safe map cached in memory.

// ---------------------- job handler repository ----------------------
private static ConcurrentMap<String, IJobHandler> jobHandlerRepository = new ConcurrentHashMap<String, IJobHandler>();
public static IJobHandler loadJobHandler(String name){
    return jobHandlerRepository.get(name);
}
public static IJobHandler registJobHandler(String name, IJobHandler jobHandler){
    logger.info(">>>>>>>>>>> xxl-job register jobhandler success, name:{}, jobHandler:{}", name, jobHandler);
    return jobHandlerRepository.put(name, jobHandler);
}
Copy the code

2.4 XxlJobExecutor Initialization

XxlJobSpringExecutor calls XxlJobExecutor’s start method. The start method does the following:

  1. Initialize the log path
  2. Initialize the Admin dispatch center agent and, with the configured adminAdress, generate an AdminBiz agent class for each dispatch center and place it in the adminBizList as a collection of dispatch centers
  3. Initialize the log file cleanup thread, which periodically cleans up expired log files in the background
  4. Initialize the trigger callback thread
  5. Initializes the built-in container of the actuator

The relevant codes for registration are as follows:

AdminBiz defines callback, registration, removal registration, etc., and defines how the executor interacts with the dispatch center

  • The executor invokes the registration interface to register with the dispatch center
  • The scheduling center calls back the task method of the executor
  • The actuator exits and notifies the scheduling center to remove registration information

The initAdminBizList method simply identifies the configured dispatch center address, generates the AdminBizClient agent class, and caches it with a List collection.

// ---------------------- admin-client (rpc invoker) ----------------------
private static List<AdminBiz> adminBizList;
private void initAdminBizList(String adminAddresses, String accessToken) throws Exception {
    if(adminAddresses! =null && adminAddresses.trim().length()>0) {
        for (String address: adminAddresses.trim().split(",")) {
            if(address! =null && address.trim().length()>0) {

                AdminBiz adminBiz = new AdminBizClient(address.trim(), accessToken);

                if (adminBizList == null) {
                    adminBizList = newArrayList<AdminBiz>(); } adminBizList.add(adminBiz); }}}}public static List<AdminBiz> getAdminBizList(a){
    return adminBizList;
}
Copy the code

The important thing is when to call the adminBiz Registry method. The built-in container was created in the initEmbedServer method of XxlJobExecutor, and the Start method of EmbedServer started the container and started registering the executor.

// ---------------------- executor-server (rpc provider) ----------------------
private EmbedServer embedServer = null;

private void initEmbedServer(String address, String ip, int port, String appname, String accessToken) throws Exception {

    // fill ip port
    port = port>0? port: NetUtil.findAvailablePort(9999); ip = (ip! =null&&ip.trim().length()>0)? ip: IpUtil.getIp();// generate address
    if (address==null || address.trim().length()==0) {
        String ip_port_address = IpUtil.getIpPort(ip, port);   // registry-address: default use address to registry, otherwise use IP :port if address is null
        address = "http://{ip_port}/".replace("{ip_port}", ip_port_address);
    }

    // accessToken
    if (accessToken==null || accessToken.trim().length()==0) {
        logger.warn(">>>>>>>>>>> xxl-job accessToken is empty. To ensure system security, please set the accessToken.");
    }

    // start
    embedServer = new EmbedServer();
    embedServer.start(address, port, appname, accessToken);
}
Copy the code

Only part of the code related to EmbedServer and registration is posted here. ExecutorRegistryThread is an executor registration thread,

// ---------------------- registry ----------------------

public void startRegistry(final String appname, final String address) {
    // start registry
    ExecutorRegistryThread.getInstance().start(appname, address);
}

public void stopRegistry(a) {
    // stop registry
    ExecutorRegistryThread.getInstance().toStop();
}
Copy the code

2.5 ExecutorRegistryThread The executor registers threads

ExecutorRegistryThread implements the simple profit model with lazy loading, and starts a background thread that continuously registers the executor with the scheduling center every 30 seconds. And when the built-in container stops, the deregistration method is called

/** * Created by xuxueli on 17/3/2. */
public class ExecutorRegistryThread {
    private static Logger logger = LoggerFactory.getLogger(ExecutorRegistryThread.class);

    private static ExecutorRegistryThread instance = new ExecutorRegistryThread();
    public static ExecutorRegistryThread getInstance(a){
        return instance;
    }

    private Thread registryThread;
    private volatile boolean toStop = false;
    public void start(final String appname, final String address){

        // valid
        if (appname==null || appname.trim().length()==0) {
            logger.warn(">>>>>>>>>>> xxl-job, executor registry config fail, appname is null.");
            return;
        }
        if (XxlJobExecutor.getAdminBizList() == null) {
            logger.warn(">>>>>>>>>>> xxl-job, executor registry config fail, adminAddresses is null.");
            return;
        }

        registryThread = new Thread(new Runnable() {
            @Override
            public void run(a) {

                // registry
                while(! toStop) {try {
                        RegistryParam registryParam = new RegistryParam(RegistryConfig.RegistType.EXECUTOR.name(), appname, address);
                        for (AdminBiz adminBiz: XxlJobExecutor.getAdminBizList()) {
                            try {
                                // Register
                                ReturnT<String> registryResult = adminBiz.registry(registryParam);
                                if(registryResult! =null && ReturnT.SUCCESS_CODE == registryResult.getCode()) {
                                    registryResult = ReturnT.SUCCESS;
                                    logger.debug(">>>>>>>>>>> xxl-job registry success, registryParam:{}, registryResult:{}".new Object[]{registryParam, registryResult});
                                    break;
                                } else {
                                    logger.info(">>>>>>>>>>> xxl-job registry fail, registryParam:{}, registryResult:{}".newObject[]{registryParam, registryResult}); }}catch (Exception e) {
                                logger.info(">>>>>>>>>>> xxl-job registry error, registryParam:{}", registryParam, e); }}}catch (Exception e) {
                        if (!toStop) {
                            logger.error(e.getMessage(), e);
                        }

                    }

                    try {
                        if (!toStop) {
                            TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT);
                        }
                    } catch (InterruptedException e) {
                        if(! toStop) { logger.warn(">>>>>>>>>>> xxl-job, executor registry thread interrupted, error msg:{}", e.getMessage()); }}}// registry remove
                try {
                    RegistryParam registryParam = new RegistryParam(RegistryConfig.RegistType.EXECUTOR.name(), appname, address);
                    for (AdminBiz adminBiz: XxlJobExecutor.getAdminBizList()) {
                        try {
                            ReturnT<String> registryResult = adminBiz.registryRemove(registryParam);
                            if(registryResult! =null && ReturnT.SUCCESS_CODE == registryResult.getCode()) {
                                registryResult = ReturnT.SUCCESS;
                                logger.info(">>>>>>>>>>> xxl-job registry-remove success, registryParam:{}, registryResult:{}".new Object[]{registryParam, registryResult});
                                break;
                            } else {
                                logger.info(">>>>>>>>>>> xxl-job registry-remove fail, registryParam:{}, registryResult:{}".newObject[]{registryParam, registryResult}); }}catch (Exception e) {
                            if(! toStop) { logger.info(">>>>>>>>>>> xxl-job registry-remove error, registryParam:{}", registryParam, e); }}}}catch (Exception e) {
                    if(! toStop) { logger.error(e.getMessage(), e); } } logger.info(">>>>>>>>>>> xxl-job, executor registry thread destory."); }}); registryThread.setDaemon(true);
        registryThread.setName("xxl-job, executor ExecutorRegistryThread");
        registryThread.start();
    }

    public void toStop(a) {
        toStop = true;

        // interrupt and wait
        if(registryThread ! =null) {
            registryThread.interrupt();
            try {
                registryThread.join();
            } catch(InterruptedException e) { logger.error(e.getMessage(), e); }}}}Copy the code

2.6 summarize

The process for registering an actuator with the scheduling center is as follows:

  1. XxlJobConfigThe configuration class is registered with the Spring bean containerXxlJobSpringExecutor.XxlJobSpringExecutorThis bean is implementedSmartInitializingSingletonInterface, which initializes the bean after it is instantiated.
  2. XxlJobSpringExecutorWill be calledXxlJobExecutorstartMethod to perform some initialization operations. This includes wrapping the configured dispatch center into oneAdminBizProxy class, which contains callback, register, unregister, and other methods through which the executor is registered.
  3. XxlJobExecutorstartThe method also initializes a built-in container that starts one on startupExecutorRegistryThreadBackground thread registration, by default 30 seconds of heartbeat, continuous throughAdminBizThe agent registers information about the current actuator with the dispatch center.

3. The dispatch center manages registration information

In the previous section, you covered the process of registering an executor with the dispatch center, ultimately through the AdminBiz agent. Xxl-job-core uses internal XXL-RPC to implement remote method invocation. The executor, as a client, initiates remote invocation to the scheduling center through XxlJobRemotingUtil, and the interface route is API/Registry

com.xxl.job.core.biz.client.AdminBizClient#registry

@Override
public ReturnT<String> registry(RegistryParam registryParam) {
    return XxlJobRemotingUtil.postBody(addressUrl + "api/registry", accessToken, timeout, registryParam, String.class);
}
Copy the code

3.1 JobApiController Remote method

The executor registration will eventually call the Http interface exposed by the JobApiController of xxl-job-admin. In the case of callback, registration, registration removal, etc., the scheduler side of AdminBiz will be called to implement AdminBizImpl.

/** * Created by xuxueli on 17/5/10. */
@Controller
@RequestMapping("/api")
public class JobApiController {

    @Resource
    private AdminBiz adminBiz;

    /**
     * api
     *
     * @param uri
     * @param data
     * @return* /
    @RequestMapping("/{uri}")
    @ResponseBody
    @PermissionLimit(limit=false)
    public ReturnT<String> api(HttpServletRequest request, @PathVariable("uri") String uri, @RequestBody(required = false) String data) {

        // valid
        if (!"POST".equalsIgnoreCase(request.getMethod())) {
            return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, HttpMethod not support.");
        }
        if (uri==null || uri.trim().length()==0) {
            return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, uri-mapping empty.");
        }
        if(XxlJobAdminConfig.getAdminConfig().getAccessToken()! =null
                && XxlJobAdminConfig.getAdminConfig().getAccessToken().trim().length()>0
                && !XxlJobAdminConfig.getAdminConfig().getAccessToken().equals(request.getHeader(XxlJobRemotingUtil.XXL_JOB_ACCESS_TOKEN))) {
            return new ReturnT<String>(ReturnT.FAIL_CODE, "The access token is wrong.");
        }
        
        // If the corresponding callback is called, register or register to remove
        // services mapping
        if ("callback".equals(uri)) {
            List<HandleCallbackParam> callbackParamList = GsonTool.fromJson(data, List.class, HandleCallbackParam.class);
            return adminBiz.callback(callbackParamList);
        } else if ("registry".equals(uri)) {
            RegistryParam registryParam = GsonTool.fromJson(data, RegistryParam.class);
            return adminBiz.registry(registryParam);
        } else if ("registryRemove".equals(uri)) {
            RegistryParam registryParam = GsonTool.fromJson(data, RegistryParam.class);
            return adminBiz.registryRemove(registryParam);
        } else {
            return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, uri-mapping("+ uri +") not found."); }}}Copy the code

3.2 AdminBizImpl

The AdminBizImpl Registry uses JobRegistryHelper to manage registration information.

@Service
public class AdminBizImpl implements AdminBiz {


    @Override
    public ReturnT<String> callback(List<HandleCallbackParam> callbackParamList) {
        return JobCompleteHelper.getInstance().callback(callbackParamList);
    }

    @Override
    public ReturnT<String> registry(RegistryParam registryParam) {
        / / register
        return JobRegistryHelper.getInstance().registry(registryParam);
    }

    @Override
    public ReturnT<String> registryRemove(RegistryParam registryParam) {
        returnJobRegistryHelper.getInstance().registryRemove(registryParam); }}Copy the code

3.3 JobRegistryHelper registered

Com. XXL. Job. Admin. Core. Thread. JobRegistryHelper# registry method will register information stored in xxl_job_registry table by means of asynchronous

public ReturnT<String> registry(RegistryParam registryParam) {
   // Call the underlying arguments
   // valid
   if(! StringUtils.hasText(registryParam.getRegistryGroup()) || ! StringUtils.hasText(registryParam.getRegistryKey()) || ! StringUtils.hasText(registryParam.getRegistryValue())) {return new ReturnT<String>(ReturnT.FAIL_CODE, "Illegal Argument.");
   }

   // Register asynchronously
   // async execute
   registryOrRemoveThreadPool.execute(new Runnable() {
      @Override
      public void run(a) {
         // Save the registration information in xxl_job_registry table
         int ret = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().registryUpdate(registryParam.getRegistryGroup(), registryParam.getRegistryKey(), registryParam.getRegistryValue(), new Date());
         if (ret < 1) {
            XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().registrySave(registryParam.getRegistryGroup(), registryParam.getRegistryKey(), registryParam.getRegistryValue(), new Date());

            // freshfreshGroupRegistryInfo(registryParam); }}});return ReturnT.SUCCESS;
}
Copy the code

The xxL_job_registry table is structured as follows. It stores the registered actuators, their addresses, and the latest heartbeat renewal time.


CREATE TABLE `xxl_job_registry` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `registry_group` varchar(50) NOT NULL,
  `registry_key` varchar(255) NOT NULL,
  `registry_value` varchar(255) NOT NULL,
  `update_time` datetime DEFAULT NULL.PRIMARY KEY (`id`),
  KEY `i_g_k_v` (`registry_group`,`registry_key`,`registry_value`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

Copy the code

3.4 Updated JobRegistryHelper registration information

Com. XXL. Job. Admin. Core. Thread. Registered JobRegistryHelper# start will start the thread pool, and opens a monitoring thread, scanning for the failure of those who have registered information and cleaned up. The monitoring thread in the start method does the following:

  1. Gets the auto-registered actuator
  2. Determine whether the registration information renewal time exceeds 3*30s and clear invalid registration information
  3. Update the registration list of each actuator and modify the registration list field of xxL_JOB_group
public void start(a){

   // for registry or remove
   registryOrRemoveThreadPool = new ThreadPoolExecutor(
         2.10.30L,
         TimeUnit.SECONDS,
         new LinkedBlockingQueue<Runnable>(2000),
         new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
               return new Thread(r, "xxl-job, admin JobRegistryMonitorHelper-registryOrRemoveThreadPool-"+ r.hashCode()); }},new RejectedExecutionHandler() {
            @Override
            public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
               r.run();
               logger.warn(">>>>>>>>>>> xxl-job, registry or remove too fast, match threadpool rejected handler(run now)."); }});// for monitor
   registryMonitorThread = new Thread(new Runnable() {
      @Override
      public void run(a) {
         while(! toStop) {try {
               // 1. Obtain the automatically registered actuator
               // auto registry group
               List<XxlJobGroup> groupList = XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().findByAddressType(0);
               if(groupList! =null && !groupList.isEmpty()) {

                  // 2. Check whether the renewal timeout period is 3*30s and clear invalid registration information
                  // remove dead address (admin/executor)
                  List<Integer> ids = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().findDead(RegistryConfig.DEAD_TIMEOUT, new Date());
                  if(ids! =null && ids.size()>0) {
                     XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().removeDead(ids);
                  }

                  // 3. Update the registration information of each application with appName as key and List as value cached
                  // fresh online address (admin/executor)
                  HashMap<String, List<String>> appAddressMap = new HashMap<String, List<String>>();
                  List<XxlJobRegistry> list = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().findAll(RegistryConfig.DEAD_TIMEOUT, new Date());
                  if(list ! =null) {
                     for (XxlJobRegistry item: list) {
                        if (RegistryConfig.RegistType.EXECUTOR.name().equals(item.getRegistryGroup())) {
                           String appname = item.getRegistryKey();
                           List<String> registryList = appAddressMap.get(appname);
                           if (registryList == null) {
                              registryList = new ArrayList<String>();
                           }

                           if(! registryList.contains(item.getRegistryValue())) { registryList.add(item.getRegistryValue()); } appAddressMap.put(appname, registryList); }}}// 4. Update the registration information of each app and modify the registration list field of xxL_job_group
                  // fresh group address
                  for (XxlJobGroup group: groupList) {
                     List<String> registryList = appAddressMap.get(group.getAppname());
                     String addressListStr = null;
                     if(registryList! =null && !registryList.isEmpty()) {
                        Collections.sort(registryList);
                        StringBuilder addressListSB = new StringBuilder();
                        for (String item:registryList) {
                           addressListSB.append(item).append(",");
                        }
                        addressListStr = addressListSB.toString();
                        addressListStr = addressListStr.substring(0, addressListStr.length()-1);
                     }
                     group.setAddressList(addressListStr);
                     group.setUpdateTime(newDate()); XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().update(group); }}}catch (Exception e) {
               if(! toStop) { logger.error(">>>>>>>>>>> xxl-job, job registry monitor thread error:{}", e); }}try {
               TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT);
            } catch (InterruptedException e) {
               if(! toStop) { logger.error(">>>>>>>>>>> xxl-job, job registry monitor thread error:{}", e);
               }
            }
         }
         logger.info(">>>>>>>>>>> xxl-job, job registry monitor thread stop"); }}); registryMonitorThread.setDaemon(true);
   registryMonitorThread.setName("xxl-job, admin JobRegistryMonitorHelper-registryMonitorThread");
   registryMonitorThread.start();
}

public void toStop(a){
   toStop = true;

   // stop registryOrRemoveThreadPool
   registryOrRemoveThreadPool.shutdownNow();

   // stop monitir (interrupt and wait)
   registryMonitorThread.interrupt();
   try {
      registryMonitorThread.join();
   } catch(InterruptedException e) { logger.error(e.getMessage(), e); }}Copy the code

3.5 summarize

  1. The scheduling center exposes an API HTTP interface, which is invoked by the executor to register information during registration
  2. The dispatch center saves the registration information inxxl_job_registryIn the table, this table stores actuator and corresponding address information
  3. Dispatch centerJobRegistryHelperA background monitoring thread is started to clean up the invalid registration information from the registry and synchronize it to the registry list in the executor table

4. To summarize

Limited by the length of the article, this article understands the basic process of registering xxL-job executor to the dispatch center and managing registration information from the source point of view.

The process for registering an actuator with the scheduling center is as follows:

  1. XxlJobConfigThe configuration class is registered with the Spring bean containerXxlJobSpringExecutor.XxlJobSpringExecutorThis bean is implementedSmartInitializingSingletonInterface, which initializes the bean after it is instantiated.
  2. XxlJobSpringExecutorWill be calledXxlJobExecutorstartMethod to perform some initialization operations. This includes wrapping the configured dispatch center into oneAdminBizProxy class, which contains callback, register, unregister, and other methods through which the executor is registered.
  3. XxlJobExecutorstartThe method also initializes a built-in container that starts one on startupExecutorRegistryThreadBackground thread registration, by default 30 seconds of heartbeat, continuous throughAdminBizThe agent registers information about the current actuator with the dispatch center.

The flow for the dispatch center to manage registration information is as follows:

  1. The scheduling center exposes an API HTTP interface, which is invoked by the executor to register information during registration
  2. The dispatch center saves the registration information inxxl_job_registryIn the table, this table stores actuator and corresponding address information
  3. Dispatch centerJobRegistryHelperA background monitoring thread is started to clean up the invalid registration information from the registry and synchronize it to the registry list in the executor table

Xxl-job Task triggering process