Project source code address: gitee.com/lidishan/xx… XXL-JOB version: 2.3.0. XXL-JOB consists of an executor and a scheduler (the admin). The client we normally write is an executor. When an executor starts, it automatically registers with the scheduler, and the scheduler then triggers it remotely.
The scheduler is initialized in the following steps:
1. Internationalization
Configuration parameter: xxl.job.i18n=zh_CN sets the language to Simplified Chinese
2 Initialize the fastTriggerPool and slowTriggerPool thread pools
Configuration parameters: xxl.job.triggerpool.fast.max=200 sets the maximum number of threads of fastTriggerPool to 200 (it cannot be less than 200); xxl.job.triggerpool.slow.max=100 sets the maximum number of threads of slowTriggerPool to 100 (it cannot be less than 100)
3 Start the registry monitor thread
3.1 Initialize the registryOrRemoveThreadPool thread pool: it handles registration and removal when a client calls the api/registry or api/registryRemove interface.
3.2 Start registryMonitorThread: it clears registration records whose last heartbeat is more than 90s old and refreshes the registered address list of each group
4 Start the failed-job monitor thread (retry and alarm)
Configuration parameter: [email protected] (the alarm mailbox)
5 Start the lost-job monitor thread
5.1 Initialize the callbackThreadPool thread pool: it processes callbacks when clients call the api/callback interface.
5.2 Start the monitor thread: if a scheduling record stays in the running state for more than 10 minutes and the corresponding executor has failed its heartbeat registration and is offline, the record is actively marked as failed
6 Start the log statistics and cleanup thread (logrThread)
It refreshes the log reports for the last three days (failure, success, and running counts for each day) and clears expired log data once a day. Configuration parameter: xxl.job.logretentiondays=30 is the number of days that log records are kept in the xxl-job database before they are cleared; it only takes effect when it is at least 7
7 Start task scheduling (very important! Two threads do the work: one pushes jobs into the time wheel, and the other takes jobs out of the time wheel and triggers them)
7.1 scheduleThread: pushes jobs into the time wheel (stores data)
Step 1: Use a database "select ... for update" as a distributed lock, so that multiple xxl-job admin scheduler nodes do not schedule at the same time.
Step 2: Pre-read data: read from the database the jobs that are due within the next 5 seconds. The page size is preReadCount = 6000, computed as preReadCount = (XxlJobAdminConfig.getAdminConfig().getTriggerPoolFastMax() + XxlJobAdminConfig.getAdminConfig().getTriggerPoolSlowMax()) * 20;
Step 3: Compare the current time with each job's next trigger time.
- The current time is greater than (the job's next trigger time + PRE_READ_MS (5s)):
    1. Apply the misfire strategy: DO_NOTHING = do nothing; FIRE_ONCE_NOW = trigger once immediately
    2. Refresh the last trigger time and the next trigger time
- The current time is greater than the job's next trigger time, but the job is not yet expired:
    1. Trigger the job directly
    2. Refresh the last trigger time and the next trigger time
    3. If the new next trigger time is within 5 seconds, push the job straight into the time wheel:
        1. Compute which second of the minute the next trigger time falls on (ringSecond)
        2. Put the job ID and ringSecond into the time wheel
        3. Refresh the last trigger time and the next trigger time
- The current time is less than the next trigger time:
    1. Compute which second of the minute the next trigger time falls on (ringSecond)
    2. Put the job ID and ringSecond into the time wheel
    3. Refresh the last trigger time and the next trigger time
Step 4: Update the trigger information in the database (trigger_last_time, trigger_next_time)
7.2 ringThread: executes jobs based on the time wheel (fetches data)
The time wheel data has the following format: Map<Integer, List<Integer>> ringData = new ConcurrentHashMap<>()
Step 1: Get the current second within the minute and loop twice; the second pass picks up the job list of the previous slot in case it was not executed.
Step 2: Trigger the jobs that were taken out.
Step 3: Clear the list for the current slot.
Triggering a job also involves:
- Blocking strategy: serial, discard later, cover earlier
- Routing strategy: first, last, minimum distribution, consistent hash, fast failure, LFU (least frequently used), LRU (least recently used), random, and round robin
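The thread-pool minimums and the pre-read size described above are plain arithmetic, so they can be checked in isolation. The following is a minimal sketch rather than the xxl-job source: the class TriggerPoolConfigSketch and its method names are hypothetical, and only the minimums (200 and 100) and the factor of 20 are taken from the text.

```java
// Hypothetical sketch; class and method names are illustrative, not xxl-job APIs.
class TriggerPoolConfigSketch {

    // The fast pool maximum is never allowed to drop below 200.
    static int fastMax(int configured) {
        return Math.max(configured, 200);
    }

    // The slow pool maximum is never allowed to drop below 100.
    static int slowMax(int configured) {
        return Math.max(configured, 100);
    }

    // Pre-read size = (fast max + slow max) * 20, where 20 is the assumed
    // number of triggers per second per thread (1000 ms / 50 ms per trigger).
    static int preReadCount(int configuredFast, int configuredSlow) {
        return (fastMax(configuredFast) + slowMax(configuredSlow)) * 20;
    }

    public static void main(String[] args) {
        System.out.println(preReadCount(200, 100)); // 6000 with the default settings
        System.out.println(preReadCount(50, 10));   // 6000, values below the minimums are raised
        System.out.println(preReadCount(300, 150)); // 9000
    }
}
```

With the default settings this gives the 6000 rows mentioned in step 2; raising either pool size raises the pre-read size proportionally.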
The entry point for initialization is XxlJobAdminConfig, as follows:
@Component
public class XxlJobAdminConfig implements InitializingBean, DisposableBean {
    private static XxlJobAdminConfig adminConfig = null;
    public static XxlJobAdminConfig getAdminConfig() {
        return adminConfig;
    }
    // ---------------------- XxlJobScheduler ----------------------
    private XxlJobScheduler xxlJobScheduler;
    @Override
    public void afterPropertiesSet() throws Exception { // Lifecycle hook called after property injection; initializes xxlJobScheduler
        adminConfig = this;
        // Initialize the xxl-job scheduler
        xxlJobScheduler = new XxlJobScheduler();
        xxlJobScheduler.init();
    }
    @Override
    public void destroy() throws Exception { // Lifecycle hook called on shutdown; destroys xxlJobScheduler
        xxlJobScheduler.destroy();
    }
    // ... omitted ...
}
XxlJobScheduler.init() performs the following initialization:
public class XxlJobScheduler {
    private static final Logger logger = LoggerFactory.getLogger(XxlJobScheduler.class);
    public void init() throws Exception {
        // 1 Internationalization (init i18n)
        initI18n();
        // 2 Initialize the fastTriggerPool and slowTriggerPool thread pools (admin trigger pool start)
        JobTriggerPoolHelper.toStart();
        // 3 Start the registry monitor thread (admin registry monitor run)
        JobRegistryHelper.getInstance().start();
        // 4 Start the failed-job monitor thread, which handles retry and alarm (admin fail-monitor run)
        JobFailMonitorHelper.getInstance().start();
        // 5 Start the lost-job monitor thread: a scheduling record that stays in the running state for more than 10 minutes
        //   while the corresponding executor has failed its heartbeat registration and is offline is marked as failed
        //   (admin lose-monitor run, depends on JobTriggerPoolHelper)
        JobCompleteHelper.getInstance().start();
        // 6 Start the log statistics and cleanup thread: refreshes the log reports of the last three days
        //   (failure, success, and running counts for each day) and deletes expired log data once a day
        JobLogReportHelper.getInstance().start();
        // 7 Start task scheduling (scheduleThread pushes jobs into the time wheel; ringThread executes jobs based on the time wheel)
        //   (start-schedule, depends on JobTriggerPoolHelper)
        JobScheduleHelper.getInstance().start();
        logger.info(">>>>>>>>> init xxl-job admin success.");
    }
    // ... omitted ...
}
The seven initialization steps above are broken down as follows
1. Internationalization
private void initI18n() { // Set the titles in the configured language (Chinese, English, etc.)
    for (ExecutorBlockStrategyEnum item : ExecutorBlockStrategyEnum.values()) {
        item.setTitle(I18nUtil.getString("jobconf_block_".concat(item.name())));
    }
}
2 Initialize the fastTriggerPool and slowTriggerPool thread pools
// Use fastTriggerPool by default; a job whose timeout count exceeds 10 is switched to slowTriggerPool
ThreadPoolExecutor triggerPool_ = fastTriggerPool;
AtomicInteger jobTimeoutCount = jobTimeoutCountMap.get(jobId);
if (jobTimeoutCount != null && jobTimeoutCount.get() > 10) { // More than 10 timeouts: use slowTriggerPool
    triggerPool_ = slowTriggerPool;
}
triggerPool_.execute(new Runnable() { /* ... omitted ... */ });
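To make the pool selection above easier to experiment with, here is a minimal, self-contained sketch. It is not the xxl-job implementation: the class TriggerPoolSelectionSketch and the methods choosePool and recordTimeout are hypothetical names, and how the real counters are incremented and reset is outside the excerpt, so the sketch simply assumes a counter per job.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of the fast/slow pool selection; names are illustrative.
class TriggerPoolSelectionSketch {

    // Per-job timeout counters (assumed to be maintained elsewhere in a real system).
    static final ConcurrentMap<Integer, AtomicInteger> jobTimeoutCountMap = new ConcurrentHashMap<>();

    // Records one timeout for the given job.
    static void recordTimeout(int jobId) {
        jobTimeoutCountMap.computeIfAbsent(jobId, id -> new AtomicInteger()).incrementAndGet();
    }

    // Returns "slow" only when the job has timed out more than 10 times, otherwise "fast".
    static String choosePool(int jobId) {
        AtomicInteger count = jobTimeoutCountMap.get(jobId);
        if (count != null && count.get() > 10) {
            return "slow";
        }
        return "fast";
    }

    public static void main(String[] args) {
        for (int i = 0; i < 11; i++) {
            recordTimeout(7);
        }
        System.out.println(choosePool(7)); // slow: 11 timeouts recorded
        System.out.println(choosePool(8)); // fast: no timeouts recorded
    }
}
```

The point of the split is isolation: jobs that repeatedly time out are moved to their own pool so that they cannot exhaust the threads used by well-behaved jobs.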
3 Start the registry monitor thread
3.1 Initialize the registryOrRemoveThreadPool thread pool: it handles registration and removal when a client calls the api/registry or api/registryRemove interface.
3.2 Start registryMonitorThread: it clears registration records whose last heartbeat is more than 90s old and refreshes the registered address list of each group
public void start() {
    // Thread pool that handles registration and removal when clients call the api/registry or api/registryRemove interface
    registryOrRemoveThreadPool = new ThreadPoolExecutor(
            2,
            10,
            30L,
            TimeUnit.SECONDS,
            new LinkedBlockingQueue<Runnable>(2000),
            new ThreadFactory() {
                @Override
                public Thread newThread(Runnable r) {
                    return new Thread(r, "xxl-job, admin JobRegistryMonitorHelper-registryOrRemoveThreadPool-" + r.hashCode());
                }
            },
            new RejectedExecutionHandler() {
                @Override
                public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                    r.run();
                    logger.warn(">>>>>>>>>>> xxl-job, registry or remove too fast, match threadpool rejected handler(run now).");
                }
            });
    // Start the registry monitor thread
    registryMonitorThread = new Thread(new Runnable() {
        @Override
        public void run() {
            while (!toStop) {
                try {
                    // Get the executor groups that use automatic registration (address type: 0 = automatic registration, 1 = manual entry)
                    List<XxlJobGroup> groupList = XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().findByAddressType(0);
                    if (groupList != null && !groupList.isEmpty()) { // The group list is not empty
                        // Remove dead addresses (admin/executor): an address with no heartbeat for more than 90 seconds is considered dead; the default heartbeat interval is 30 seconds
                        List<Integer> ids = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().findDead(RegistryConfig.DEAD_TIMEOUT, new Date());
                        if (ids != null && ids.size() > 0) { // Remove the dead registry addresses
                            XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().removeDead(ids);
                        }
                        // fresh online address (admin/executor)
                        HashMap<String, List<String>> appAddressMap = new HashMap<String, List<String>>();
                        // Find all registered addresses that are still alive
                        List<XxlJobRegistry> list = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().findAll(RegistryConfig.DEAD_TIMEOUT, new Date());
                        if (list != null) {
                            for (XxlJobRegistry item : list) {
                                // Only handle registrations of the EXECUTOR type
                                if (RegistryConfig.RegistType.EXECUTOR.name().equals(item.getRegistryGroup())) {
                                    String appname = item.getRegistryKey();
                                    List<String> registryList = appAddressMap.get(appname);
                                    if (registryList == null) {
                                        registryList = new ArrayList<String>();
                                    }
                                    if (!registryList.contains(item.getRegistryValue())) {
                                        registryList.add(item.getRegistryValue());
                                    }
                                    appAddressMap.put(appname, registryList);
                                }
                            }
                        }
                        // Refresh the registered address list of each group
                        for (XxlJobGroup group : groupList) {
                            List<String> registryList = appAddressMap.get(group.getAppname());
                            String addressListStr = null;
                            if (registryList != null && !registryList.isEmpty()) {
                                Collections.sort(registryList);
                                StringBuilder addressListSB = new StringBuilder();
                                for (String item : registryList) {
                                    addressListSB.append(item).append(",");
                                }
                                addressListStr = addressListSB.toString();
                                addressListStr = addressListStr.substring(0, addressListStr.length() - 1);
                            }
                            group.setAddressList(addressListStr);
                            group.setUpdateTime(new Date());
                            XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().update(group);
                        }
                    }
                } catch (Exception e) {
                    if (!toStop) {
                        logger.error(">>>>>>>>>>> xxl-job, job registry monitor thread error:{}", e);
                    }
                }
                try {
                    TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT);
                } catch (InterruptedException e) {
                    if (!toStop) {
                        logger.error(">>>>>>>>>>> xxl-job, job registry monitor thread error:{}", e);
                    }
                }
            }
            logger.info(">>>>>>>>>>> xxl-job, job registry monitor thread stop");
        }
    });
    registryMonitorThread.setDaemon(true);
    registryMonitorThread.setName("xxl-job, admin JobRegistryMonitorHelper-registryMonitorThread");
    registryMonitorThread.start();
}
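The refresh logic of the monitor thread can be tried out without a database. The sketch below is an in-memory approximation under stated assumptions: the Registry class, the liveAddressLists method, and the example addresses are hypothetical, and a TreeSet is used for de-duplication and sorting instead of the list plus contains plus Collections.sort combination in the excerpt above. Only the 90-second dead timeout and the comma-joined, sorted address list come from the source.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;

// Hypothetical in-memory sketch of the registry refresh; names are illustrative.
class RegistryRefreshSketch {

    // An address with no heartbeat for more than 90 seconds is treated as dead.
    static final long DEAD_TIMEOUT_MS = 90_000L;

    // One registration row: application name, executor address, last heartbeat time.
    static final class Registry {
        final String appname;
        final String address;
        final long updateTimeMs;

        Registry(String appname, String address, long updateTimeMs) {
            this.appname = appname;
            this.address = address;
            this.updateTimeMs = updateTimeMs;
        }
    }

    // Builds, per application, the sorted comma-separated list of live addresses.
    static Map<String, String> liveAddressLists(List<Registry> rows, long nowMs) {
        Map<String, TreeSet<String>> byApp = new TreeMap<>();
        for (Registry r : rows) {
            if (nowMs - r.updateTimeMs > DEAD_TIMEOUT_MS) {
                continue; // dead registration, skipped
            }
            byApp.computeIfAbsent(r.appname, k -> new TreeSet<>()).add(r.address);
        }
        Map<String, String> result = new TreeMap<>();
        for (Map.Entry<String, TreeSet<String>> e : byApp.entrySet()) {
            result.put(e.getKey(), String.join(",", e.getValue()));
        }
        return result;
    }

    public static void main(String[] args) {
        long now = System.currentTimeMillis();
        List<Registry> rows = Arrays.asList(
                new Registry("app-a", "http://10.0.0.2:9999", now - 10_000),
                new Registry("app-a", "http://10.0.0.1:9999", now - 5_000),
                new Registry("app-b", "http://10.0.0.3:9999", now - 120_000)); // dead
        System.out.println(liveAddressLists(rows, now));
    }
}
```

In this example app-b disappears from the result because its only address missed the heartbeat window, which corresponds to a group whose address list is set to null in the excerpt.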
4 Start the failed-job monitor thread (retry and alarm)
This part of the logic is relatively simple: retry plus alarm. The core code is as follows:
// Get the IDs of the logs of jobs that failed
List<Long> failLogIds = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().findFailJobLogIds(1000);
if (failLogIds != null && !failLogIds.isEmpty()) {
    for (long failLogId : failLogIds) {
        // lock log
        int lockRet = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateAlarmStatus(failLogId, 0, -1);
        if (lockRet < 1) {
            continue;
        }
        XxlJobLog log = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().load(failLogId);
        XxlJobInfo info = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().loadById(log.getJobId());
        // 1. Fail retry monitor
        if (log.getExecutorFailRetryCount() > 0) {
            JobTriggerPoolHelper.trigger(log.getJobId(), TriggerTypeEnum.RETRY, (log.getExecutorFailRetryCount() - 1), log.getExecutorShardingParam(), log.getExecutorParam(), null);
            String retryMsg = " > > > > > > > > > > >"+ I18nUtil.getString("jobconf_trigger_type_retry") +"<<<<<<<<<<< </span>";
            log.setTriggerMsg(log.getTriggerMsg() + retryMsg);
            XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateTriggerInfo(log);
        }
        // 2. Fail alarm monitor
        int newAlarmStatus = 0; // Alarm status: 0 = default, -1 = locked, 1 = no alarm needed, 2 = alarm succeeded, 3 = alarm failed
        if (info != null && info.getAlarmEmail() != null && info.getAlarmEmail().trim().length() > 0) {
            boolean alarmResult = XxlJobAdminConfig.getAdminConfig().getJobAlarmer().alarm(info, log);
            newAlarmStatus = alarmResult ? 2 : 3;
        } else {
            newAlarmStatus = 1;
        }
        XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateAlarmStatus(failLogId, -1, newAlarmStatus);
    }
}
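The "lock log" step works like an optimistic lock: the status only changes from 0 to -1 if nobody else has changed it first, so each failed log is handled once. The following sketch imitates that with an in-memory map. It assumes that updateAlarmStatus is a conditional update that returns the number of changed rows (which is how the excerpt uses it); the class AlarmStatusLockSketch and the process method are hypothetical.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical in-memory sketch of the alarm status transitions; names are illustrative.
class AlarmStatusLockSketch {

    // logId -> alarm status: 0 = default, -1 = locked, 1 = no alarm needed,
    // 2 = alarm succeeded, 3 = alarm failed
    static final ConcurrentMap<Long, Integer> alarmStatus = new ConcurrentHashMap<>();

    // Assumed behaviour: change the status only if it currently equals oldStatus,
    // and return the number of "rows" changed (0 or 1).
    static int updateAlarmStatus(long logId, int oldStatus, int newStatus) {
        return alarmStatus.replace(logId, oldStatus, newStatus) ? 1 : 0;
    }

    // Handles one failed log; returns false when another worker already owns it.
    static boolean process(long logId, boolean hasAlarmEmail, boolean alarmSent) {
        if (updateAlarmStatus(logId, 0, -1) < 1) {
            return false; // lock not acquired
        }
        int newStatus = hasAlarmEmail ? (alarmSent ? 2 : 3) : 1;
        updateAlarmStatus(logId, -1, newStatus);
        return true;
    }

    public static void main(String[] args) {
        alarmStatus.put(1L, 0);
        System.out.println(process(1L, true, true)); // true: this caller got the lock
        System.out.println(alarmStatus.get(1L));     // 2: alarm succeeded
        System.out.println(process(1L, true, true)); // false: status is no longer 0
    }
}
```

Because the transition away from 0 can succeed only once, running several admin nodes does not produce duplicate retries or duplicate alarm mails for the same log.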
5 Start the lost-job monitor thread
5.1 Initialize the callbackThreadPool thread pool: it processes callbacks when clients call the api/callback interface.
5.2 Start the monitor thread: if a scheduling record stays in the running state for more than 10 minutes and the corresponding executor has failed its heartbeat registration and is offline, the record is actively marked as failed
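No source code is shown for this step, so the following is only a sketch of the rule stated above, not the xxl-job implementation. The class LostJobSketch, the isLost method, and its parameters are all hypothetical; the two conditions (running for more than 10 minutes, executor offline) are the ones described in the text.

```java
import java.util.Collections;
import java.util.Set;

// Hypothetical sketch of the lost-job rule; names and parameters are illustrative.
class LostJobSketch {

    // A record that has been running longer than this is a candidate for "lost".
    static final long LOST_THRESHOLD_MS = 10 * 60 * 1000L;

    // A record is considered lost only when BOTH conditions hold:
    // it has been running for more than 10 minutes, and its executor is not online.
    static boolean isLost(long triggerTimeMs, long nowMs, boolean stillRunning,
                          String executorAddress, Set<String> onlineAddresses) {
        boolean runningTooLong = stillRunning && (nowMs - triggerTimeMs > LOST_THRESHOLD_MS);
        boolean executorOffline = !onlineAddresses.contains(executorAddress);
        return runningTooLong && executorOffline;
    }

    public static void main(String[] args) {
        long elevenMinutes = 11 * 60 * 1000L;
        String address = "http://10.0.0.1:9999";
        // Running for 11 minutes and the executor is offline: lost.
        System.out.println(isLost(0L, elevenMinutes, true, address, Collections.<String>emptySet()));
        // Running for 11 minutes but the executor is still online: not lost.
        System.out.println(isLost(0L, elevenMinutes, true, address, Collections.singleton(address)));
    }
}
```

Requiring both conditions avoids failing long-running jobs whose executor is still alive and may yet call back.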
6 Start the log statistics and cleanup thread (logrThread)
It refreshes the log reports for the last three days (failure, success, and running counts for each day) and clears expired log data once a day. Configuration parameter: xxl.job.logretentiondays=30 is the number of days that log records are kept in the xxl-job database before they are cleared; it only takes effect when it is at least 7
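The retention rule can be illustrated with a small sketch. It rests on an assumption that should be checked against the version in use: a configured value below 7 disables cleanup (represented here by -1). The class LogRetentionSketch and both method names are hypothetical.

```java
// Hypothetical sketch of the log retention rule; names are illustrative.
class LogRetentionSketch {

    // Assumption: values below 7 days turn automatic cleanup off, signalled by -1.
    static int effectiveRetentionDays(int configuredDays) {
        return configuredDays < 7 ? -1 : configuredDays;
    }

    // A log is eligible for deletion when cleanup is enabled and
    // the log is older than the retention period.
    static boolean isExpired(long logTimeMs, long nowMs, int configuredDays) {
        int days = effectiveRetentionDays(configuredDays);
        if (days < 0) {
            return false; // cleanup disabled
        }
        long retentionMs = days * 24L * 60 * 60 * 1000;
        return nowMs - logTimeMs > retentionMs;
    }

    public static void main(String[] args) {
        long dayMs = 24L * 60 * 60 * 1000;
        System.out.println(isExpired(0L, 31 * dayMs, 30)); // true: older than 30 days
        System.out.println(isExpired(0L, 29 * dayMs, 30)); // false: still inside the window
        System.out.println(isExpired(0L, 31 * dayMs, 3));  // false: cleanup is off below 7
    }
}
```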
7 Start task scheduling (the key part!)
7.1 scheduleThread takes a database lock (select ... for update), pre-reads the jobs that are due within the next 5 seconds, compares the current time with each job's next trigger time, and then either triggers the job directly or pushes it into the time wheel. Finally it writes trigger_last_time and trigger_next_time back to the database.
7.2 ringThread wakes up once per second, takes the jobs in the current time wheel slot (and the previous slot, in case it was skipped), and triggers them. The time wheel has the format Map<Integer, List<Integer>> ringData = new ConcurrentHashMap<>()
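The comparison between the current time and a job's next trigger time has three outcomes, which can be sketched as a pure function. The class ScheduleBranchSketch, the Branch enum, and the classify method are hypothetical names used for illustration; PRE_READ_MS (5 seconds) and the ringSecond formula follow the source.

```java
// Hypothetical sketch of the three scheduling branches; names are illustrative.
class ScheduleBranchSketch {

    // Jobs due within this window are pre-read from the database.
    static final long PRE_READ_MS = 5000L;

    enum Branch { MISFIRE, TRIGGER_NOW, PUSH_TO_RING }

    // Decides what to do with a job given the current time and its next trigger time.
    static Branch classify(long nowTime, long triggerNextTime) {
        if (nowTime > triggerNextTime + PRE_READ_MS) {
            return Branch.MISFIRE;      // more than 5s overdue: apply the misfire strategy
        } else if (nowTime > triggerNextTime) {
            return Branch.TRIGGER_NOW;  // overdue by at most 5s: trigger directly
        } else {
            return Branch.PUSH_TO_RING; // not yet due: push into the time wheel
        }
    }

    // The slot in the time wheel is the second of the minute of the trigger time.
    static int ringSecond(long triggerNextTime) {
        return (int) ((triggerNextTime / 1000) % 60);
    }

    public static void main(String[] args) {
        System.out.println(classify(10_000L, 4_000L));  // MISFIRE
        System.out.println(classify(10_000L, 8_000L));  // TRIGGER_NOW
        System.out.println(classify(10_000L, 12_000L)); // PUSH_TO_RING
        System.out.println(ringSecond(125_000L));       // 5
    }
}
```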
The core source code that starts the two threads is as follows:
public void start() {
    // Start the schedule thread, which pushes jobs into the time wheel
    scheduleThread = new Thread(new Runnable() {
        @Override
        public void run() {
            try { // Sleeps 4 to 5 seconds before starting (the reason for the delay is unclear); subtracting now % 1000 aligns the wake-up to a whole second
                TimeUnit.MILLISECONDS.sleep(5000 - System.currentTimeMillis() % 1000);
            } catch (InterruptedException e) {
                if (!scheduleThreadToStop) {
                    logger.error(e.getMessage(), e);
                }
            }
            logger.info(">>>>>>>>> init xxl-job admin scheduler success.");
            // Pre-read count: threadpool-size * trigger-qps (each trigger costs 50ms, qps = 1000/50 = 20)
            int preReadCount = (XxlJobAdminConfig.getAdminConfig().getTriggerPoolFastMax() + XxlJobAdminConfig.getAdminConfig().getTriggerPoolSlowMax()) * 20;
            while (!scheduleThreadToStop) {
                // Scan jobs
                long start = System.currentTimeMillis();
                Connection conn = null;
                Boolean connAutoCommit = null;
                PreparedStatement preparedStatement = null;
                boolean preReadSuc = true;
                try {
                    conn = XxlJobAdminConfig.getAdminConfig().getDataSource().getConnection();
                    connAutoCommit = conn.getAutoCommit();
                    conn.setAutoCommit(false);
                    // select ... for update: xxl-job uses a database table as a distributed lock, so that only one admin node schedules at a time when several xxl-job admin nodes are deployed
                    preparedStatement = conn.prepareStatement( "select * from xxl_job_lock where lock_name = 'schedule_lock' for update" );
                    preparedStatement.execute();
                    // tx start
                    // 1, pre read data
                    long nowTime = System.currentTimeMillis();
                    // Read the jobs due within the next 5 seconds, at most preReadCount rows (6000 with the default settings)
                    List<XxlJobInfo> scheduleList = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleJobQuery(nowTime + PRE_READ_MS, preReadCount);
                    if (scheduleList != null && scheduleList.size() > 0) {
                        // 2, push time ring
                        for (XxlJobInfo jobInfo : scheduleList) {
                            // time-ring jump
                            if (nowTime > jobInfo.getTriggerNextTime() + PRE_READ_MS) {
                                // The current time is greater than (the job's next trigger time + PRE_READ_MS (5s)). This can happen when an earlier scan took too long
                                // before the next trigger time was refreshed, so the job is more than 5 seconds overdue and needs special handling
                                // 2.1, trigger-expire > 5s: pass && make next-trigger-time
                                logger.warn(">>>>>>>>>>> xxl-job, schedule misfire, jobId = " + jobInfo.getId());
                                // 1, match the misfire strategy: DO_NOTHING = do nothing; FIRE_ONCE_NOW = trigger once immediately
                                MisfireStrategyEnum misfireStrategyEnum = MisfireStrategyEnum.match(jobInfo.getMisfireStrategy(), MisfireStrategyEnum.DO_NOTHING);
                                if (MisfireStrategyEnum.FIRE_ONCE_NOW == misfireStrategyEnum) {
                                    // FIRE_ONCE_NOW: trigger
                                    JobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.MISFIRE, -1, null, null, null);
                                    logger.debug(">>>>>>>>>>> xxl-job, schedule push trigger : jobId = " + jobInfo.getId() );
                                }
                                // 2, refresh the last trigger time and the next trigger time
                                refreshNextValidTime(jobInfo, new Date());
                            } else if (nowTime > jobInfo.getTriggerNextTime()) {
                                // The current time is greater than the job's next trigger time, but the job is not expired
                                // 2.2, trigger-expire < 5s: direct-trigger && make next-trigger-time
                                // 1, trigger the job directly
                                JobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.CRON, -1, null, null, null);
                                logger.debug(">>>>>>>>>>> xxl-job, schedule push trigger : jobId = " + jobInfo.getId() );
                                // 2, refresh the last trigger time and the next trigger time
                                refreshNextValidTime(jobInfo, new Date());
                                // If the next trigger time is within 5 seconds, push the job straight into the time wheel (next-trigger-time in 5s, pre-read again)
                                if (jobInfo.getTriggerStatus() == 1 && nowTime + PRE_READ_MS > jobInfo.getTriggerNextTime()) {
                                    // 1, make ring second
                                    int ringSecond = (int) ((jobInfo.getTriggerNextTime() / 1000) % 60);
                                    // 2, put the job ID and ringSecond into the time wheel
                                    pushTimeRing(ringSecond, jobInfo.getId());
                                    // 3, refresh the last trigger time and the next trigger time
                                    refreshNextValidTime(jobInfo, new Date(jobInfo.getTriggerNextTime()));
                                }
                            } else {
                                // The current time is less than the next trigger time
                                // 2.3, trigger-pre-read: time-ring trigger && make next-trigger-time
                                // 1, make ring second
                                int ringSecond = (int) ((jobInfo.getTriggerNextTime() / 1000) % 60);
                                // 2, put the job ID and ringSecond into the time wheel
                                pushTimeRing(ringSecond, jobInfo.getId());
                                // 3, refresh the last trigger time and the next trigger time
                                refreshNextValidTime(jobInfo, new Date(jobInfo.getTriggerNextTime()));
                            }
                        }
                        // 3, update the trigger info in the database (trigger_last_time, trigger_next_time)
                        for (XxlJobInfo jobInfo : scheduleList) {
                            XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleUpdate(jobInfo);
                        }
                    } else {
                        preReadSuc = false;
                    }
                    // tx stop
                } catch (Exception e) {
                    if (!scheduleThreadToStop) {
                        logger.error(">>>>>>>>>>> xxl-job, JobScheduleHelper#scheduleThread error:{}", e);
                    }
                } finally {
                    // Commit the transaction, which releases the "select ... for update" database lock
                    // ... omitted ...
                }
                long cost = System.currentTimeMillis() - start;
                // If the scan finished quickly, sleep and align to the next second
                if (cost < 1000) { // scan-overtime, not wait
                    try {
                        // pre-read period: success > scan each second; fail > skip this period;
                        TimeUnit.MILLISECONDS.sleep((preReadSuc ? 1000 : PRE_READ_MS) - System.currentTimeMillis() % 1000);
                    } catch (InterruptedException e) {
                        if (!scheduleThreadToStop) {
                            logger.error(e.getMessage(), e);
                        }
                    }
                }
            }
        }
    });
    scheduleThread.setDaemon(true);
    scheduleThread.setName("xxl-job, admin JobScheduleHelper#scheduleThread");
    scheduleThread.start();
    // Time wheel thread: once per second, takes the jobs in the current slot and triggers them
    ringThread = new Thread(new Runnable() {
        @Override
        public void run() {
            while (!ringThreadToStop) {
                // align second
                try {
                    TimeUnit.MILLISECONDS.sleep(1000 - System.currentTimeMillis() % 1000);
                } catch (InterruptedException e) {
                    if (!ringThreadToStop) {
                        logger.error(e.getMessage(), e);
                    }
                }
                try {
                    // second data
                    List<Integer> ringItemData = new ArrayList<>();
                    // Get the current second within the minute, then loop twice; the second pass picks up the job list of the previous slot in case it was not executed
                    int nowSecond = Calendar.getInstance().get(Calendar.SECOND); // If processing took too long and a slot was stepped over, check one slot back
                    for (int i = 0; i < 2; i++) {
                        List<Integer> tmpData = ringData.remove( (nowSecond + 60 - i) % 60 );
                        if (tmpData != null) {
                            ringItemData.addAll(tmpData);
                        }
                    }
                    // ring trigger
                    logger.debug(">>>>>>>>>>> xxl-job, time-ring beat : " + nowSecond + " = " + Arrays.asList(ringItemData) );
                    if (ringItemData.size() > 0) {
                        // do trigger
                        for (int jobId : ringItemData) {
                            // Execute the trigger
                            JobTriggerPoolHelper.trigger(jobId, TriggerTypeEnum.CRON, -1, null, null, null);
                        }
                        // Clear the list for the current slot
                        ringItemData.clear();
                    }
                } catch (Exception e) {
                    if (!ringThreadToStop) {
                        logger.error(">>>>>>>>>>> xxl-job, JobScheduleHelper#ringThread error:{}", e);
                    }
                }
            }
            logger.info(">>>>>>>>>>> xxl-job, JobScheduleHelper#ringThread stop");
        }
    });
    ringThread.setDaemon(true);
    ringThread.setName("xxl-job, admin JobScheduleHelper#ringThread");
    ringThread.start();
}
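The time wheel itself is a small data structure: 60 slots keyed by the second of the minute, each holding a list of job IDs. The sketch below shows the two operations used by the threads above, plus the second-alignment arithmetic. It is a single-threaded illustration and not the xxl-job code: the class TimeRingSketch and the methods takeDue and millisToNextSecond are hypothetical, and thread safety of the per-slot lists is deliberately ignored.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical single-threaded sketch of the time wheel; names are illustrative.
class TimeRingSketch {

    // Slot (second of the minute, 0 to 59) -> IDs of the jobs to trigger in that second.
    static final Map<Integer, List<Integer>> ringData = new ConcurrentHashMap<>();

    // Adds a job to the slot for the given second.
    static void pushTimeRing(int ringSecond, int jobId) {
        ringData.computeIfAbsent(ringSecond, k -> new ArrayList<>()).add(jobId);
    }

    // Removes and returns the jobs in the current slot and in the previous slot,
    // so that a slot skipped by a slow iteration is still processed.
    static List<Integer> takeDue(int nowSecond) {
        List<Integer> due = new ArrayList<>();
        for (int i = 0; i < 2; i++) {
            List<Integer> slot = ringData.remove((nowSecond + 60 - i) % 60);
            if (slot != null) {
                due.addAll(slot);
            }
        }
        return due;
    }

    // Milliseconds to sleep so that the thread wakes up on the next whole second.
    static long millisToNextSecond(long nowMs) {
        return 1000 - nowMs % 1000;
    }

    public static void main(String[] args) {
        pushTimeRing(59, 1);
        pushTimeRing(0, 2);
        pushTimeRing(30, 3);
        System.out.println(takeDue(0));                  // [2, 1]: slot 0, then slot 59
        System.out.println(takeDue(0));                  // []: both slots were removed
        System.out.println(millisToNextSecond(12_345L)); // 655
    }
}
```

The `(nowSecond + 60 - i) % 60` expression is what lets second 0 look back to second 59 without producing a negative index.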