WeChat official account: bugstack wormhole stack | Blog: bugstack.cn. Accumulate, share, and grow, with a focus on original project cases, sharing knowledge in the most approachable way so that both I and others can learn something. Projects completed so far: Netty 4.x practical cases, implementing a JVM in Java, full-link monitoring based on JavaAgent, a handwritten RPC framework, architectural design cases [in progress], and more. You wield the sword 🗡, I wield the knife 🔪, good code is hot stuff 😏, so don't be stingy with your support 💨!

I. Introduction

Scheduled tasks are used all the time in daily development, for example: scanning database tables and sending MQ messages, T+N bill settlement, refreshing cached data, changing flash-sale activity status, and more. Spring's scheduling support makes these scenarios much easier to handle. But beyond using it, how much do you actually know about it?

  1. How many task threads are initialized by default?
  2. JobStore has several implementations; which one do you usually use?
  3. What does the execution flow of a scheduled task look like?

Confused? Do you feel that you normally just use it and never pay attention to these details? Perhaps you now have the urge to search for the answers. But merely knowing the answers means little: they do not hold up under follow-up questions, and you still would not understand the principles. So, if you really want to improve your skills, you have to work through it from the ground up.

II. Case project

To make the source analysis easier, we separate the scheduled task service we normally use into its own project (bugstack wormhole stack, source analysis).

...
├── ...
│   ├── ...
│   │   └── JobImpl.java
│   └── resources
│       ├── props
│       │   └── config.properties
│       └── spring
│           └── spring-config-schedule-task.xml
└── ...
    └── MyTask.java

III. Environment configuration

  1. JDK 1.8
  2. IDEA 2019.3.1
  3. Spring 4.3.24.RELEASE
  4. Quartz 2.3.2 (some code differs between versions)

IV. Source code analysis

<dependency>
    <groupId>org.quartz-scheduler</groupId>
    <artifactId>quartz</artifactId>
    <version>2.3.2</version>
</dependency>

Choose the Quartz version according to your Spring version; this example uses 2.3.2. If you configure tasks in XML as this example does, note the following class change between Spring versions:

Spring 3.x: org.springframework.scheduling.quartz.CronTriggerBean

<bean id="taskTrigger" class="org.springframework.scheduling.quartz.CronTriggerBean">
    <property name="jobDetail" ref="taskHandler"/>
    <property name="cronExpression" value="0/5 * * * * ?"/>
</bean>

Spring 4.x: org.springframework.scheduling.quartz.CronTriggerFactoryBean

<bean id="taskTrigger" class="org.springframework.scheduling.quartz.CronTriggerFactoryBean">
    <property name="jobDetail" ref="taskHandler"/>
    <property name="cronExpression" value="0/5 * * * * ?"/>
</bean>

To get started, take a look at Quartz's default configuration, which drives many of the initialization steps; you can also supply your own configuration file. For example, when you have many tasks and the default of 10 worker threads does not meet your business needs, you can adjust it.

quartz.properties

# Default Properties file for use by StdSchedulerFactory
# to create a Quartz Scheduler Instance, if a different
# properties file is not explicitly specified.
#

org.quartz.scheduler.instanceName: DefaultQuartzScheduler
org.quartz.scheduler.rmi.export: false
org.quartz.scheduler.rmi.proxy: false
org.quartz.scheduler.wrapJobExecutionInUserTransaction: false

org.quartz.threadPool.class: org.quartz.simpl.SimpleThreadPool
org.quartz.threadPool.threadCount: 10
org.quartz.threadPool.threadPriority: 5
org.quartz.threadPool.threadsInheritContextClassLoaderOfInitializingThread: true

org.quartz.jobStore.misfireThreshold: 60000

org.quartz.jobStore.class: org.quartz.simpl.RAMJobStore
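As a small illustration of overriding the default thread count, the sketch below parses a snippet in the same `key: value` form with java.util.Properties and reads org.quartz.threadPool.threadCount. It does not reproduce how Quartz or Spring locate and load their configuration; the class name, method names, and the fallback parameter are hypothetical.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

class QuartzPropsSketch {

    static final String THREAD_COUNT_KEY = "org.quartz.threadPool.threadCount";

    /** Parses properties text written in the same "key: value" form as quartz.properties. */
    static Properties parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    /** Reads the worker thread count; the fallback is used when the key is absent (an assumption of this sketch). */
    static int threadCount(Properties props, int fallback) {
        String raw = props.getProperty(THREAD_COUNT_KEY);
        return raw == null ? fallback : Integer.parseInt(raw.trim());
    }

    public static void main(String[] args) {
        Properties custom = parse(
                "org.quartz.threadPool.class: org.quartz.simpl.SimpleThreadPool\n"
              + "org.quartz.threadPool.threadCount: 25\n");
        System.out.println("custom thread count: " + threadCount(custom, 10));
        System.out.println("thread count without override: " + threadCount(parse(""), 10));
    }
}
```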

1. Start with a simple example

Scheduled tasks are usually set up with annotations or XML configuration, but to make the code easier to analyze we start with a simple demo and put everything in a main method.

DemoTask.java: defines the task to be executed

public class DemoTask {

    private Logger logger = LoggerFactory.getLogger(DemoTask.class);

    public void execute() throws Exception {
        logger.info("Periodic processing of user information task: 0/5 * * * * ?");
    }

}

MyTask.java: test class that reproduces the XML configuration in code

public class MyTask {

    public static void main(String[] args) throws Exception {

        DemoTask demoTask = new DemoTask();

        // Define what to execute
        MethodInvokingJobDetailFactoryBean methodInvokingJobDetailFactoryBean = new MethodInvokingJobDetailFactoryBean();
        methodInvokingJobDetailFactoryBean.setTargetObject(demoTask);
        methodInvokingJobDetailFactoryBean.setTargetMethod("execute");
        methodInvokingJobDetailFactoryBean.setConcurrent(true);
        methodInvokingJobDetailFactoryBean.setName("demoTask");
        methodInvokingJobDetailFactoryBean.afterPropertiesSet();

        // Define the execution plan
        CronTriggerFactoryBean cronTriggerFactoryBean = new CronTriggerFactoryBean();
        cronTriggerFactoryBean.setJobDetail(methodInvokingJobDetailFactoryBean.getObject());
        cronTriggerFactoryBean.setCronExpression("0/5 * * * * ?");
        cronTriggerFactoryBean.setName("demoTask");
        cronTriggerFactoryBean.afterPropertiesSet();

        // Define the scheduler that runs it
        SchedulerFactoryBean schedulerFactoryBean = new SchedulerFactoryBean();
        schedulerFactoryBean.setTriggers(cronTriggerFactoryBean.getObject());
        schedulerFactoryBean.setAutoStartup(true);
        schedulerFactoryBean.afterPropertiesSet();

        schedulerFactoryBean.start();

        // Pause so the JVM keeps running
        System.in.read();
    }

}

If all goes well, the output looks like this:

2020-01-04 10:47:16.369 [main] INFO  org.quartz.impl.StdSchedulerFactory[1220] - Using default implementation for ThreadExecutor
2020-01-04 10:47:16.421 [main] INFO  org.quartz.core.SchedulerSignalerImpl[61] - Initialized Scheduler Signaller of type: class org.quartz.core.SchedulerSignalerImpl
2020-01-04 10:47:16.422 [main] INFO  org.quartz.core.QuartzScheduler[229] - Quartz Scheduler v.2.3.2 created.
2020-01-04 10:47:16.423 [main] INFO  org.quartz.simpl.RAMJobStore[155] - RAMJobStore initialized.
2020-01-04 10:47:16.424 [main] INFO  org.quartz.core.QuartzScheduler[294] - Scheduler meta-data: Quartz Scheduler (v2.3.2) 'QuartzScheduler' with instanceId 'NON_CLUSTERED'
  Scheduler class: 'org.quartz.core.QuartzScheduler' - running locally.
  NOT STARTED.
  Currently in standby mode.
  Number of jobs executed: 0
  Using thread pool 'org.quartz.simpl.SimpleThreadPool' - with 10 threads.
  Using job-store 'org.quartz.simpl.RAMJobStore' - which does not support persistence, and is not clustered.

2020-01-04 10:47:16.424 [main] INFO  org.quartz.impl.StdSchedulerFactory[1374] - Quartz scheduler 'QuartzScheduler' initialized from an externally provided properties instance.
2020-01-04 10:47:16.424 [main] INFO  org.quartz.impl.StdSchedulerFactory[1378] - Quartz scheduler version: 2.3.2
2020-01-04 10:47:16.426 [main] INFO  org.quartz.core.QuartzScheduler[2293] - JobFactory set to: org.springframework.scheduling.quartz.AdaptableJobFactory@3e9b1010
2020-01-04 10:47:16.651 [main] INFO  org.quartz.core.QuartzScheduler[547] - Scheduler QuartzScheduler_$_NON_CLUSTERED started.
January 4, 2020 10:47:16 AM org.springframework.scheduling.quartz.SchedulerFactoryBean startScheduler
INFO: Starting Quartz Scheduler now
2020-01-04 10:47:20.321 [QuartzScheduler_Worker-1] INFO  org.itstack.demo.DemoTask[11] - Periodic processing of user information task: 0/5 * * * * ?
2020-01-04 10:47:25.001 [QuartzScheduler_Worker-2] INFO  org.itstack.demo.DemoTask[11] - Periodic processing of user information task: 0/5 * * * * ?
2020-01-04 10:47:30.000 [QuartzScheduler_Worker-3] INFO  org.itstack.demo.DemoTask[11] - Periodic processing of user information task: 0/5 * * * * ?
2020-01-04 10:47:35.001 [QuartzScheduler_Worker-4] INFO  org.itstack.demo.DemoTask[11] - Periodic processing of user information task: 0/5 * * * * ?
2020-01-04 10:47:40.000 [QuartzScheduler_Worker-5] INFO  org.itstack.demo.DemoTask[11] - Periodic processing of user information task: 0/5 * * * * ?

Process finished with exit code -1

2. Define execution content (MethodInvokingJobDetailFactoryBean)

// Define what to execute
MethodInvokingJobDetailFactoryBean methodInvokingJobDetailFactoryBean = new MethodInvokingJobDetailFactoryBean();
methodInvokingJobDetailFactoryBean.setTargetObject(demoTask);
methodInvokingJobDetailFactoryBean.setTargetMethod("execute");
methodInvokingJobDetailFactoryBean.setConcurrent(true);
methodInvokingJobDetailFactoryBean.setName("demoTask");
methodInvokingJobDetailFactoryBean.afterPropertiesSet();

The task body (the DemoTask to execute) is handed over to MethodInvokingJobDetailFactoryBean. First, the required properties are set:

  • TargetObject: the target bean, here demoTask
  • TargetMethod: the name of the target method, here execute
  • Concurrent: whether executions of the task may overlap. When false, the next run does not start until the previous one has finished
  • Name: optional in XML configuration; when it is not set, the source falls back to the beanName

Finally, we simulate initialization by calling afterPropertiesSet() by hand. When a class is managed by Spring and implements the InitializingBean interface, Spring calls afterPropertiesSet() automatically once the bean's properties have been set. Classes that implement InitializingBean in this way often implement FactoryBean as well, because the object they build during initialization can then be obtained through T getObject(). This pattern is common in framework development.
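The initialize-then-hand-out pattern described above can be sketched without Spring. This is a minimal illustration only: the interfaces Initializing and Factory are hypothetical stand-ins for InitializingBean and FactoryBean, and JobInfo is an invented product type, not a Quartz class.

```java
class FactoryBeanSketch {

    /** Hypothetical stand-in for Spring's InitializingBean. */
    interface Initializing {
        void afterPropertiesSet();
    }

    /** Hypothetical stand-in for Spring's FactoryBean<T>. */
    interface Factory<T> {
        T getObject();
    }

    /** The product that the factory builds during initialization. */
    static final class JobInfo {
        final String name;
        final String targetMethod;

        JobInfo(String name, String targetMethod) {
            this.name = name;
            this.targetMethod = targetMethod;
        }
    }

    static final class JobInfoFactory implements Initializing, Factory<JobInfo> {
        private String name;
        private String targetMethod;
        private JobInfo product;

        void setName(String name) { this.name = name; }
        void setTargetMethod(String targetMethod) { this.targetMethod = targetMethod; }

        @Override
        public void afterPropertiesSet() {
            if (targetMethod == null) {
                throw new IllegalStateException("targetMethod is required");
            }
            // Fall back to a default name when none was configured.
            product = new JobInfo(name != null ? name : "defaultJob", targetMethod);
        }

        @Override
        public JobInfo getObject() {
            return product; // null until afterPropertiesSet() has run
        }
    }

    /** Does by hand what a container would do: set properties, initialize, fetch the product. */
    static JobInfo build(String name, String targetMethod) {
        JobInfoFactory factory = new JobInfoFactory();
        factory.setName(name);
        factory.setTargetMethod(targetMethod);
        factory.afterPropertiesSet();
        return factory.getObject();
    }

    public static void main(String[] args) {
        JobInfo info = build("demoTask", "execute");
        System.out.println(info.name + " -> " + info.targetMethod);
    }
}
```

The demo's main method plays the container's role in the same way: it sets the properties, calls afterPropertiesSet(), and only then asks for the built object.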

MethodInvokingJobDetailFactoryBean.afterPropertiesSet()

public void afterPropertiesSet() throws ClassNotFoundException, NoSuchMethodException {
	prepare();
	// Use specific name if given, else fall back to bean name.
	String name = (this.name != null ? this.name : this.beanName);
	// Consider the concurrent flag to choose between stateful and stateless job.
	Class<?> jobClass = (this.concurrent ? MethodInvokingJob.class : StatefulMethodInvokingJob.class);
	// Build JobDetail instance.
	JobDetailImpl jdi = new JobDetailImpl();
	jdi.setName(name);
	jdi.setGroup(this.group);
	jdi.setJobClass((Class) jobClass);
	jdi.setDurability(true);
	jdi.getJobDataMap().put("methodInvoker", this);
	this.jobDetail = jdi;

	postProcessJobDetail(this.jobDetail);
}
  • Source line 168: the job class is chosen according to whether concurrent execution is allowed. Both classes are inner classes of MethodInvokingJobDetailFactoryBean; the non-concurrent StatefulMethodInvokingJob simply extends MethodInvokingJob and adds marker annotations.

  • With concurrent set to true, jdi.setJobClass((Class) jobClass) receives MethodInvokingJob, which is also the class that ultimately invokes our method by reflection.

  • After the job is initialized, this.jobDetail = jdi is assigned; this is the object that is finally handed out

    MethodInvokingJobDetailFactoryBean.getObject()

    @Override
    public JobDetail getObject() {
    	return this.jobDetail;
    }
  • Source line 220: getObject() returns this.jobDetail, which explains why MethodInvokingJobDetailFactoryBean assigns the jobDetail field directly at the end of initialization.
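The class selection driven by the concurrent flag can be illustrated with plain Java. In this sketch the annotation NoConcurrentExecution and both job classes are hypothetical stand-ins (Quartz 2.x has its own @DisallowConcurrentExecution annotation); it only shows how a marker annotation on a subclass lets a scheduler ask a class whether overlapping runs are allowed.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

class JobClassSelectionSketch {

    /** Hypothetical marker annotation: "do not run two executions of this job at once". */
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.TYPE)
    @interface NoConcurrentExecution {}

    /** Stand-in for the job class that allows overlapping executions. */
    static class InvokingJob {}

    /** Same behaviour, but tagged so that a scheduler can tell it must not overlap. */
    @NoConcurrentExecution
    static class StatefulInvokingJob extends InvokingJob {}

    /** Mirrors the ternary in afterPropertiesSet(): the flag decides which class is registered. */
    static Class<? extends InvokingJob> chooseJobClass(boolean concurrent) {
        return concurrent ? InvokingJob.class : StatefulInvokingJob.class;
    }

    /** What a scheduler could check before starting a second run of the same job. */
    static boolean allowsConcurrentExecution(Class<?> jobClass) {
        return !jobClass.isAnnotationPresent(NoConcurrentExecution.class);
    }

    public static void main(String[] args) {
        System.out.println(allowsConcurrentExecution(chooseJobClass(true)));
        System.out.println(allowsConcurrentExecution(chooseJobClass(false)));
    }
}
```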

3. Define an execution plan (CronTriggerFactoryBean)

// Define the execution plan
CronTriggerFactoryBean cronTriggerFactoryBean = new CronTriggerFactoryBean();
cronTriggerFactoryBean.setJobDetail(methodInvokingJobDetailFactoryBean.getObject());
cronTriggerFactoryBean.setCronExpression("0/5 * * * * ?");
cronTriggerFactoryBean.setName("demoTask");
cronTriggerFactoryBean.afterPropertiesSet();

This part defines the task's execution plan: the job content is handed to CronTriggerFactoryBean, and the required properties are set.

  • JobDetail: the task body. In XML it can be assigned directly by reference; in hard-coded form we set the JobDetail object to execute, which here is the JobDetailImpl built above and retrieved through getObject().
  • CronExpression: the schedule expression; its fields are seconds, minutes, hours, day of month, month, day of week, and (optionally) year

CronTriggerFactoryBean.afterPropertiesSet()

@Override
public void afterPropertiesSet() throws ParseException {

	// ... verify attribute information

	CronTriggerImpl cti = new CronTriggerImpl();
	cti.setName(this.name);
	cti.setGroup(this.group);
	if (this.jobDetail != null) {
		cti.setJobKey(this.jobDetail.getKey());
	}
	cti.setJobDataMap(this.jobDataMap);
	cti.setStartTime(this.startTime);
	cti.setCronExpression(this.cronExpression);
	cti.setTimeZone(this.timeZone);
	cti.setCalendarName(this.calendarName);
	cti.setPriority(this.priority);
	cti.setMisfireInstruction(this.misfireInstruction);
	cti.setDescription(this.description);
	this.cronTrigger = cti;
}
  • Create the trigger CronTriggerImpl and set its properties

  • cti.setCronExpression(this.cronExpression) builds the schedule object:

    public void setCronExpression(String cronExpression) throws ParseException {
    	TimeZone origTz = getTimeZone();
    	this.cronEx = new CronExpression(cronExpression);
    	this.cronEx.setTimeZone(origTz);
    }

    CronExpression.java: parsing the cron expression

    protected void buildExpression(String expression) throws ParseException {
    	expressionParsed = true;
    	try {
    		
    		// ... initialize each field set: xxx = new TreeSet<Integer>();

    		int exprOn = SECOND;
    		StringTokenizer exprsTok = new StringTokenizer(expression, " \t", false);
    				
    		while (exprsTok.hasMoreTokens() && exprOn <= YEAR) {
    			String expr = exprsTok.nextToken().trim();
    			
    			// ... verify special characters of the DAY_OF_MONTH and DAY_OF_WEEK fields
    			
    			StringTokenizer vTok = new StringTokenizer(expr, ",");
    			while (vTok.hasMoreTokens()) {
    				String v = vTok.nextToken();
    				storeExpressionVals(0, v, exprOn);
    			}
    			exprOn++;
    		}
    		
    		// ... verify special characters of the DAY_OF_MONTH and DAY_OF_WEEK fields
    		
    	} catch (ParseException pe) {
    		throw pe;
    	} catch (Exception e) {
    		throw new ParseException("Illegal cron expression format ("
    				+ e.toString() + ")", 0);
    	}
    }
    • A cron expression has up to seven fields (the year is optional), and CronExpression parses them into seven TreeSet objects.
    • To fill each TreeSet, the field expression is converted into a start value, an end value, and an increment; the matching values are then computed and added to the set.
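The start/end/increment idea can be shown with a small stand-alone parser. This is a simplified sketch, not Quartz's CronExpression: the class and method names are hypothetical, and it handles only numeric values, `*`, `a-b` ranges, `/step`, and comma lists (no month or weekday names, and none of `?`, `L`, `W`, `#`).

```java
import java.util.TreeSet;

class CronFieldSketch {

    /** Expands one numeric cron field into the sorted set of values it matches. */
    static TreeSet<Integer> expand(String field, int min, int max) {
        TreeSet<Integer> values = new TreeSet<>();
        for (String part : field.split(",")) {
            int step = 1;
            boolean hasStep = false;
            String range = part;
            int slash = part.indexOf('/');
            if (slash >= 0) {
                step = Integer.parseInt(part.substring(slash + 1));
                range = part.substring(0, slash);
                hasStep = true;
            }
            int start;
            int end;
            if (range.equals("*")) {
                start = min;
                end = max;
            } else if (range.contains("-")) {
                String[] bounds = range.split("-");
                start = Integer.parseInt(bounds[0]);
                end = Integer.parseInt(bounds[1]);
            } else {
                start = Integer.parseInt(range);
                end = hasStep ? max : start; // "0/5" means: from 0 up to the field maximum
            }
            if (step <= 0 || start < min || end > max || start > end) {
                throw new IllegalArgumentException("Bad cron field: " + field);
            }
            for (int v = start; v <= end; v += step) {
                values.add(v);
            }
        }
        return values;
    }

    public static void main(String[] args) {
        // Seconds field of "0/5 * * * * ?": every fifth second starting at 0
        System.out.println(expand("0/5", 0, 59));
        // A list that mixes a single value and a range
        System.out.println(expand("1,10-12", 0, 59));
    }
}
```

Keeping the values in a TreeSet makes "next matching value at or after now" a single ceiling-style lookup, which is presumably why a sorted set suits this kind of schedule computation.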

CronTriggerFactoryBean.getObject()

@Override
public CronTrigger getObject() {
	return this.cronTrigger;
}
  • getObject() returns this.cronTrigger, which is the CronTriggerImpl object

4. Define the scheduler (SchedulerFactoryBean)

// Define the scheduler that runs it
SchedulerFactoryBean schedulerFactoryBean = new SchedulerFactoryBean();
schedulerFactoryBean.setTriggers(cronTriggerFactoryBean.getObject());
schedulerFactoryBean.setAutoStartup(true);
schedulerFactoryBean.afterPropertiesSet();

schedulerFactoryBean.start();

As the name suggests, the scheduler factory plays the role of a commander. It handles global scheduling, such as monitoring which triggers are ready and allocating threads, and it also needs some required properties set:

  • Triggers: multiple triggers can be set as needed; this article sets one, cronTriggerFactoryBean.getObject(), which is the CronTriggerImpl object
  • AutoStartup: whether the scheduler starts automatically after initialization; the default is true

The loading process is long and covers the scheduler factory, the thread pool, job registration, and more. The overall core loading flow is as follows.

  • Because the whole loading process is long, only part of the core code is extracted for analysis. The classes involved are:
    • StdScheduler
    • StdSchedulerFactory
    • SimpleThreadPool
    • QuartzScheduler
    • QuartzSchedulerThread
    • RAMJobStore
    • CronTriggerImpl
    • CronExpression

SchedulerFactoryBean.afterPropertiesSet()

public void afterPropertiesSet() throws Exception {
	if (this.dataSource == null && this.nonTransactionalDataSource != null) {
		this.dataSource = this.nonTransactionalDataSource;
	}
	if (this.applicationContext != null && this.resourceLoader == null) {
		this.resourceLoader = this.applicationContext;
	}
	// Initialize the Scheduler instance...
	this.scheduler = prepareScheduler(prepareSchedulerFactory());
	try {
		registerListeners();
		registerJobsAndTriggers();
	}
	catch (Exception ex) {
		try {
			this.scheduler.shutdown(true);
		}
		catch (Exception ex2) {
			logger.debug("Scheduler shutdown exception after registration failure", ex2);
		}
		throw ex;
	}
}
  • this.scheduler = prepareScheduler(prepareSchedulerFactory()) creates the scheduler. The call chain is:

    1. SchedulerFactoryBean.prepareScheduler(SchedulerFactory schedulerFactory)
    2. SchedulerFactoryBean.createScheduler(schedulerFactory, this.schedulerName);
    3. SchedulerFactoryBean.createScheduler(SchedulerFactory schedulerFactory, String schedulerName)
    4. Scheduler newScheduler = schedulerFactory.getScheduler();
    5. StdSchedulerFactory.getScheduler();
    6. sched = instantiate(); which performs a series of core operations:
       1) Initialize the threadPool (thread pool): the org.quartz.threadPool.class setting specifies which thread pool class to use, such as SimpleThreadPool.
       2) Initialize the jobStore (task storage): the org.quartz.jobStore.class setting specifies which storage class to use, such as RAMJobStore.
       3) Initialize the dataSource: the org.quartz.dataSource settings specify the data source details, such as which database, account, and password.
       4) Initialize other configuration, including SchedulerPlugins, JobListeners, TriggerListeners, and so on.
       5) Initialize the threadExecutor: the default is DefaultThreadExecutor.
       6) Create the worker threads: create N workers according to the configuration, call start() on each, and add them to availWorkers, the idle list of the threadPool instance.
       7) Create the scheduler thread: create a QuartzSchedulerThread instance and start it with threadExecutor.execute(instance).
       8) Create the scheduler: create a StdScheduler instance, combine all of the configuration and references above into it, and store it in the scheduler repository.
  • Source line 477: registerJobsAndTriggers() of the parent class SchedulerAccessor is called to register the jobs and triggers

    for (Trigger trigger : this.triggers) {
    	addTriggerToScheduler(trigger);
    }

SchedulerAccessor.addTriggerToScheduler(): SchedulerAccessor is the parent class of SchedulerFactoryBean

private boolean addTriggerToScheduler(Trigger trigger) throws SchedulerException {
	boolean triggerExists = (getScheduler().getTrigger(trigger.getKey()) != null);
	if (triggerExists && !this.overwriteExistingJobs) {
		return false;
	}
	// Check if the Trigger is aware of an associated JobDetail.
	JobDetail jobDetail = (JobDetail) trigger.getJobDataMap().remove("jobDetail");
	if (triggerExists) {
		if (jobDetail != null && !this.jobDetails.contains(jobDetail) && addJobToScheduler(jobDetail)) {
			this.jobDetails.add(jobDetail);
		}
		try {
			getScheduler().rescheduleJob(trigger.getKey(), trigger);
		}
		catch (ObjectAlreadyExistsException ex) {
			if (logger.isDebugEnabled()) {
				logger.debug("Unexpectedly encountered existing trigger on rescheduling, assumably due to " +
						"cluster race condition: " + ex.getMessage() + " - can safely be ignored");
			}
		}
	}
	else {
		try {
			if (jobDetail != null && !this.jobDetails.contains(jobDetail) &&
					(this.overwriteExistingJobs || getScheduler().getJobDetail(jobDetail.getKey()) == null)) {
				getScheduler().scheduleJob(jobDetail, trigger);
				this.jobDetails.add(jobDetail);
			}
			else {
				getScheduler().scheduleJob(trigger);
			}
		}
		catch (ObjectAlreadyExistsException ex) {
			if (logger.isDebugEnabled()) {
				logger.debug("Unexpectedly encountered existing trigger on job scheduling, assumably due to " +
						"cluster race condition: " + ex.getMessage() + " - can safely be ignored");
			}
			if (this.overwriteExistingJobs) {
				getScheduler().rescheduleJob(trigger.getKey(), trigger);
			}
		}
	}
	return true;
}
  • addJobToScheduler(jobDetail) ends up in RAMJobStore.storeJob(), which keeps the job information in a HashMap<JobKey, JobWrapper>(100):

    public void storeJob(JobDetail newJob,
        boolean replaceExisting) throws ObjectAlreadyExistsException {
    	JobWrapper jw = new JobWrapper((JobDetail)newJob.clone());
    	boolean repl = false;
    	synchronized (lock) {
    		if (jobsByKey.get(jw.key) != null) {
    			if (!replaceExisting) {
    				throw new ObjectAlreadyExistsException(newJob);
    			}
    			repl = true;
    		}
    		if (!repl) {
    			// get job group
    			HashMap<JobKey, JobWrapper> grpMap = jobsByGroup.get(newJob.getKey().getGroup());
    			if (grpMap == null) {
    				grpMap = new HashMap<JobKey, JobWrapper>(100);
    				jobsByGroup.put(newJob.getKey().getGroup(), grpMap);
    			}
    			// add to jobs by group
    			grpMap.put(newJob.getKey(), jw);
    			// add to jobs by FQN map
    			jobsByKey.put(jw.key, jw);
    		} else {
    			// update job detail
    			JobWrapper orig = jobsByKey.get(jw.key);
    			orig.jobDetail = jw.jobDetail; // already cloned
    		}
    	}
    }
  • Initialize the thread pool. The call chain is:

    • prepareScheduler
    • createScheduler
    • schedulerFactory
    • StdSchedulerFactory.getScheduler()
    • getScheduler()->instantiate()
    • tp.initialize();

    SimpleThreadPool.initialize(): count is the thread count from the configuration (10 by default) and can be changed

     // create the worker threads and start them
     Iterator<WorkerThread> workerThreads = createWorkerThreads(count).iterator();
     while(workerThreads.hasNext()) {
    	 WorkerThread wt = workerThreads.next();
    	 wt.start();
    	 availWorkers.add(wt);
     }
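The two-map bookkeeping that RAMJobStore.storeJob() performs in the listing above can be sketched in a few lines. This is a simplified illustration under stated assumptions: jobs are plain strings, the key is a hypothetical "group.name" string instead of JobKey, and a replaced job is simply overwritten rather than updated inside a wrapper.

```java
import java.util.HashMap;
import java.util.Map;

class JobStoreSketch {

    private final Object lock = new Object();
    // key -> job, for direct lookup
    private final Map<String, String> jobsByKey = new HashMap<>();
    // group -> (key -> job), for per-group queries
    private final Map<String, Map<String, String>> jobsByGroup = new HashMap<>();

    /** Hypothetical key format used only by this sketch. */
    static String key(String group, String name) {
        return group + "." + name;
    }

    void storeJob(String group, String name, String job, boolean replaceExisting) {
        String key = key(group, name);
        synchronized (lock) {
            if (jobsByKey.containsKey(key) && !replaceExisting) {
                throw new IllegalStateException("Job already exists: " + key);
            }
            // Create the group map on first use, then index the job both ways.
            jobsByGroup.computeIfAbsent(group, g -> new HashMap<>()).put(key, job);
            jobsByKey.put(key, job);
        }
    }

    String job(String group, String name) {
        synchronized (lock) {
            return jobsByKey.get(key(group, name));
        }
    }

    int groupSize(String group) {
        synchronized (lock) {
            Map<String, String> jobs = jobsByGroup.get(group);
            return jobs == null ? 0 : jobs.size();
        }
    }

    public static void main(String[] args) {
        JobStoreSketch store = new JobStoreSketch();
        store.storeJob("DEFAULT", "demoTask", "first definition", false);
        store.storeJob("DEFAULT", "demoTask", "second definition", true);
        System.out.println(store.job("DEFAULT", "demoTask"));
        System.out.println(store.groupSize("DEFAULT"));
    }
}
```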

5. Start the scheduled task

The demo calls schedulerFactoryBean.start() directly in code to start the scheduler. Thread coordination goes through the sigLock object: the only sigLock.wait() calls are in QuartzSchedulerThread.run(), so notifying sigLock wakes only the QuartzSchedulerThread. The core flow is as follows.

The core classes involved in startup are:

  • StdScheduler
  • QuartzScheduler
  • QuartzSchedulerThread
  • ThreadPool
  • RAMJobStore
  • CronTriggerImpl
  • JobRunShellFactory
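The sigLock coordination described above (a scheduler thread that waits while paused and is woken by a togglePause(false) call) can be sketched with plain wait/notifyAll. This is an illustration, not the Quartz source: the class, the firstPass latch, and the demo() helper are hypothetical, and the loop body is reduced to a placeholder.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

class PausableLoopSketch extends Thread {

    private final Object sigLock = new Object();
    private boolean paused = true;   // starts in standby; guarded by sigLock
    private boolean halted = false;  // guarded by sigLock
    private final CountDownLatch firstPass = new CountDownLatch(1);

    PausableLoopSketch() {
        setDaemon(true);
    }

    /** Pauses or resumes the loop; resuming wakes the thread waiting on sigLock. */
    void togglePause(boolean pause) {
        synchronized (sigLock) {
            paused = pause;
            sigLock.notifyAll();
        }
    }

    void halt() {
        synchronized (sigLock) {
            halted = true;
            sigLock.notifyAll();
        }
    }

    @Override
    public void run() {
        try {
            while (true) {
                synchronized (sigLock) {
                    while (paused && !halted) {
                        sigLock.wait(1000L); // re-check the flags at least once per second
                    }
                    if (halted) {
                        return;
                    }
                }
                firstPass.countDown(); // placeholder for "acquire and fire triggers"
                Thread.sleep(10L);     // keeps the sketch from spinning
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /** Returns true when the loop stayed idle while paused and ran after being resumed. */
    static boolean demo() {
        PausableLoopSketch loop = new PausableLoopSketch();
        loop.start();
        boolean idleWhilePaused = loop.firstPass.getCount() == 1;
        loop.togglePause(false);
        try {
            boolean ranAfterResume = loop.firstPass.await(5, TimeUnit.SECONDS);
            loop.halt();
            loop.join(5000L);
            return idleWhilePaused && ranAfterResume && !loop.isAlive();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("loop ran only after resume: " + demo());
    }
}
```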

QuartzScheduler.start(): startup

public void start() throws SchedulerException {

    if (shuttingDown|| closed) {
        throw new SchedulerException(
                "The Scheduler cannot be restarted after shutdown() has been called.");
    }
	
    // QTZ-212 : calling new schedulerStarting() method on the listeners
    // right after entering start()
    notifySchedulerListenersStarting();
    
	if (initialStart == null) {
        initialStart = new Date();
        this.resources.getJobStore().schedulerStarted();            
        startPlugins();
    } else {
        resources.getJobStore().schedulerResumed();
    }
	
    // Wake up the thread
	schedThread.togglePause(false);
	
    getLog().info(
            "Scheduler " + resources.getUniqueIdentifier() + " started.");
    
    notifySchedulerListenersStarted();
}

QuartzSchedulerThread.run(): execution process

@Override
public void run() {
    int acquiresFailed = 0;

	// The loop exits only when halt() is called
    while (!halted.get()) {
        try {

			// While paused, wait in a loop with a 1000 ms timeout

            // wait a bit, if reading from job store is consistently failing (e.g. DB is down or restarting)..
           
		    // Block until free threads are available and return the number of available threads
            int availThreadCount = qsRsrcs.getThreadPool().blockForAvailableThreads();
            if(availThreadCount > 0) {
			
                List<OperableTrigger> triggers;
                long now = System.currentTimeMillis();
                clearSignaledSchedulingChange();
                
				try {
					// Acquire the list of triggers that are due, i.e. the tasks to execute
                    triggers = qsRsrcs.getJobStore().acquireNextTriggers(
                            now + idleWaitTime, Math.min(availThreadCount, qsRsrcs.getMaxBat
                    acquiresFailed = 0;
                    if (log.isDebugEnabled())
                        log.debug("batch acquisition of " + (triggers == null ? 0 : triggers
                } catch (Exception e) { /* ... error handling omitted in this excerpt ... */ }

                if (triggers != null && !triggers.isEmpty()) {

					// Get the next fire time of the first trigger in the list
					long triggerTime = triggers.get(0).getNextFireTime().getTime();

					// Tell the job store the triggers are firing and collect the results
					List<TriggerFiredResult> res = qsRsrcs.getJobStore().triggersFired(triggers);

					// Release an acquired trigger that cannot be fired
					qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i));

					// Create JobRunShell
					qsRsrcs.getJobRunShellFactory().createJobRunShell(bndle);

					// Execute the job
					qsRsrcs.getThreadPool().runInThread(shell);

                    continue; // while (!halted)
                }
            } else { // if(availThreadCount > 0)
                // should never happen, if threadPool.blockForAvailableThreads() follows con
                continue; // while (!halted)
            }
        } catch(RuntimeException re) {
            getLog().error("Runtime error occurred in main trigger firing loop.", re);
        }
    }
    
    qs = null;
    qsRsrcs = null;
}
  • Create the JobRunShell. In initialize(), the JobRunShell instance stores the JobDetailImpl that contains the business logic class as a member, in preparation for running the business logic later. The business logic itself is executed through runInThread(shell).

    QuartzSchedulerThread.run(): code excerpt

    JobRunShell shell = null;
    try {
    	shell = qsRsrcs.getJobRunShellFactory().createJobRunShell(bndle);
    	shell.initialize(qs);
    } catch (SchedulerException se) {
    	qsRsrcs.getJobStore().triggeredJobComplete(triggers.get(i), bndle.getJobDetail(), CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR);
    	continue;
    }
  • qsRsrcs.getThreadPool().runInThread(shell) hands the shell to the thread pool:

    SimpleThreadPool.runInThread

    // All worker threads
    private List<WorkerThread> workers;
    // Idle worker threads
    private LinkedList<WorkerThread> availWorkers = new LinkedList<WorkerThread>();
    // Worker threads currently executing a task
    private LinkedList<WorkerThread> busyWorkers = new LinkedList<WorkerThread>();

    /**
     * Takes a worker thread from the idle list via availWorkers.removeFirst(),
     * adds it to the busy list via busyWorkers.add(),
     * then calls WorkerThread.run(runnable).
     */
    public boolean runInThread(Runnable runnable) {
    	if (runnable == null) {
    		return false;
    	}
    
    	synchronized (nextRunnableLock) {
    
    		handoffPending = true;
    
    		// Wait until a worker thread is available
    		while ((availWorkers.size() < 1) && !isShutdown) {
    			try {
    				nextRunnableLock.wait(500);
    			} catch (InterruptedException ignore) {
    			}
    		}
    
    		if (!isShutdown) {
    			WorkerThread wt = (WorkerThread) availWorkers.removeFirst();
    			busyWorkers.add(wt);
    			wt.run(runnable);
    		} else {
    			// If the thread pool is going down, execute the Runnable
    			// within a new additional worker thread (no thread from the pool).
    			
    			WorkerThread wt = new WorkerThread(this, threadGroup,
    					"WorkerThread-LastJob", prio, isMakeThreadsDaemons(), runnable);
    			busyWorkers.add(wt);
    			workers.add(wt);
    			wt.start();
    		}
    		nextRunnableLock.notifyAll();
    		handoffPending = false;
    	}
    
    	return true;
    }
  • WorkerThread is an inner class; its run(Runnable) method assigns the runnable and wakes the threads waiting on the lock object

    WorkerThread.run(Runnable newRunnable)

    public void run(Runnable newRunnable) {
    	synchronized(lock) {
    		if (runnable != null) {
    			throw new IllegalStateException("Already running a Runnable!");
    		}
    		runnable = newRunnable;
    		lock.notifyAll();
    	}
    }
  • WorkerThread.run(Runnable) calls lock.notifyAll(), which wakes the corresponding WorkerThread waiting in its run() method. Almost there! We have finally reached the second-to-last step before the business execute() method runs. The runnable object is a JobRunShell, so after WorkerThread.run() we look at JobRunShell.run().

    WorkerThread.run()

    @Override
    public void run() {
    	boolean ran = false;

    	while (run.get()) {
    		try {
    			synchronized(lock) {
    				while (runnable == null && run.get()) {
    					lock.wait(500);
    				}
    				if (runnable != null) {
    					ran = true;
    					// Start the actual execution. Runnable is JobRunShell
    					runnable.run();
    				}
    			}
    		} catch (Throwable t) {
    			// ... (exception handling omitted in this excerpt)
    		}
    	}
    	//if (log.isDebugEnabled())
    	try {
    		getLog().debug("WorkerThread is shut down.");
    	} catch(Exception e) {
    		// ignore to help with a tomcat glitch
    	}
    }
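The hand-off between runInThread() and the worker's run() loop shown above can be condensed into a runnable sketch. It is a simplification under stated assumptions rather than SimpleThreadPool itself: the class and helper names are hypothetical, the task runs outside the worker's lock, and after shutdown the sketch rejects new tasks instead of starting an extra thread.

```java
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class WorkerPoolSketch {

    private final Object nextRunnableLock = new Object();
    private final List<Worker> workers = new ArrayList<>();             // every worker
    private final LinkedList<Worker> availWorkers = new LinkedList<>(); // idle workers
    private final LinkedList<Worker> busyWorkers = new LinkedList<>();  // workers running a task
    private boolean shutdown = false;                                   // guarded by nextRunnableLock

    WorkerPoolSketch(int count) {
        synchronized (nextRunnableLock) {
            for (int i = 1; i <= count; i++) {
                Worker worker = new Worker("SketchWorker-" + i);
                workers.add(worker);
                worker.start();
                availWorkers.add(worker);
            }
        }
    }

    /** Blocks until a worker is idle, then hands the task to it. */
    boolean runInThread(Runnable task) {
        if (task == null) return false;
        synchronized (nextRunnableLock) {
            while (availWorkers.isEmpty() && !shutdown) {
                try {
                    nextRunnableLock.wait(500);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return false;
                }
            }
            if (shutdown) return false;
            Worker worker = availWorkers.removeFirst();
            busyWorkers.add(worker);
            worker.assign(task);
        }
        return true;
    }

    void shutdown() {
        synchronized (nextRunnableLock) {
            shutdown = true;
            nextRunnableLock.notifyAll();
        }
        for (Worker worker : workers) worker.stopWorker();
    }

    private void makeAvailable(Worker worker) {
        synchronized (nextRunnableLock) {
            busyWorkers.remove(worker);
            if (!shutdown) availWorkers.add(worker);
            nextRunnableLock.notifyAll(); // wake a caller blocked in runInThread()
        }
    }

    private final class Worker extends Thread {
        private final Object lock = new Object();
        private Runnable runnable;      // guarded by lock
        private boolean running = true; // guarded by lock

        Worker(String name) { super(name); setDaemon(true); }

        void assign(Runnable task) { synchronized (lock) { runnable = task; lock.notifyAll(); } }

        void stopWorker() { synchronized (lock) { running = false; lock.notifyAll(); } }

        @Override
        public void run() {
            while (true) {
                Runnable task;
                synchronized (lock) {
                    while (runnable == null && running) {
                        try { lock.wait(500); } catch (InterruptedException e) { return; }
                    }
                    if (runnable == null) return; // stopped with nothing left to do
                    task = runnable;
                    runnable = null;
                }
                try {
                    task.run();
                } catch (RuntimeException e) {
                    // a failing task must not kill the worker
                } finally {
                    makeAvailable(this);
                }
            }
        }
    }

    /** Runs taskCount trivial tasks on workerCount workers and returns how many executed. */
    static int runTasks(int workerCount, int taskCount) {
        WorkerPoolSketch pool = new WorkerPoolSketch(workerCount);
        AtomicInteger executed = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(taskCount);
        for (int i = 0; i < taskCount; i++) {
            pool.runInThread(() -> { executed.incrementAndGet(); done.countDown(); });
        }
        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            pool.shutdown();
        }
        return executed.get();
    }

    public static void main(String[] args) {
        System.out.println("tasks executed: " + runTasks(2, 6));
    }
}
```

With two workers and six tasks, the submitting thread blocks in runInThread() whenever both workers are busy, which is the same back-pressure the scheduler thread experiences when the pool is exhausted.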

JobRunShell.run(): called from WorkerThread.run() above

public void run() {
    qs.addInternalSchedulerListener(this);

    try {
        OperableTrigger trigger = (OperableTrigger) jec.getTrigger();
        JobDetail jobDetail = jec.getJobDetail();

        do {
            // ...

            long startTime = System.currentTimeMillis();
            long endTime = startTime;

            // execute the job
            try {
                log.debug("Calling execute on job " + jobDetail.getKey());
                
				// Execute the business code, i.e. our task
				job.execute(jec);
                
				endTime = System.currentTimeMillis();
            } catch (JobExecutionException jee) {
                endTime = System.currentTimeMillis();
                jobExEx = jee;
                getLog().info("Job " + jobDetail.getKey() +
                        " threw a JobExecutionException: ", jobExEx);
            } catch (Throwable e) {
                endTime = System.currentTimeMillis();
                getLog().error("Job " + jobDetail.getKey() +
                        " threw an unhandled Exception: ", e);
                SchedulerException se = new SchedulerException(
                        "Job threw an unhandled exception.", e);
                qs.notifySchedulerListenersError("Job ("
                        + jec.getJobDetail().getKey()
                        + " threw an exception.", se);
                jobExEx = new JobExecutionException(se, false);
            }

            jec.setJobRunTime(endTime - startTime);

            // Other code
        } while (true);

    } finally {
        qs.removeInternalSchedulerListener(this);
    }
}

QuartzJobBean.execute(): execution continues here

public final void execute(JobExecutionContext context) throws JobExecutionException {
	try {
		BeanWrapper bw = PropertyAccessorFactory.forBeanPropertyAccess(this);
		MutablePropertyValues pvs = new MutablePropertyValues();
		pvs.addPropertyValues(context.getScheduler().getContext());
		pvs.addPropertyValues(context.getMergedJobDataMap());
		bw.setPropertyValues(pvs, true);
	}
	catch (SchedulerException ex) {
		throw new JobExecutionException(ex);
	}
	executeInternal(context);
}

MethodInvokingJobDetailFactoryBean->MethodInvokingJob.executeInternal(JobExecutionContext context)

protected void executeInternal(JobExecutionContext context) throws JobExecutionException {
	try {
		// reflection executes business code
		context.setResult(this.methodInvoker.invoke());
	}
	catch (InvocationTargetException ex) {
		if (ex.getTargetException() instanceof JobExecutionException) {
			// -> JobExecutionException, to be logged at info level by Quartz
			throw (JobExecutionException) ex.getTargetException();
		}
		else {
			// -> "unhandled exception", to be logged at error level by Quartz
			throw new JobMethodInvocationFailedException(this.methodInvoker, ex.getTargetException());
		}
	}
	catch (Exception ex) {
		// -> "unhandled exception", to be logged at error level by Quartz
		throw new JobMethodInvocationFailedException(this.methodInvoker, ex);
	}
}
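The final reflective call (target object plus method name, as configured through setTargetObject and setTargetMethod) can be reproduced with java.lang.reflect alone. This is a minimal sketch, not Spring's MethodInvoker: the class name, the nested DemoTask, and the exception wrapping are assumptions made for illustration.

```java
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;

class MethodInvokerSketch {

    /** Stand-in for the business task; it returns a value so the call can be observed. */
    public static class DemoTask {
        private int runs = 0;

        public String execute() {
            runs++;
            return "run #" + runs;
        }
    }

    /** Looks up a public no-argument method by name and invokes it on the target. */
    static Object invoke(Object target, String methodName) {
        try {
            Method method = target.getClass().getMethod(methodName);
            return method.invoke(target);
        } catch (InvocationTargetException ex) {
            // The target method itself threw; surface its exception as the cause.
            throw new IllegalStateException("Target method threw an exception", ex.getTargetException());
        } catch (ReflectiveOperationException ex) {
            // No such method, or it is not accessible.
            throw new IllegalStateException("Cannot invoke " + methodName, ex);
        }
    }

    public static void main(String[] args) {
        DemoTask task = new DemoTask();
        System.out.println(invoke(task, "execute"));
        System.out.println(invoke(task, "execute"));
    }
}
```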

V. Summary

  • Quartz takes its name from the mineral: like a quartz clock, it stands for keeping time precisely.
  • Source code analysis is a very rewarding process, and the reward is greatest once the analysis is finished. Behind the layers of interaction lie highly decoupled object-oriented design, skillful use of threads, and queues of tasks to execute; it is an impressive piece of work.
  • For quartz.properties, simple scenarios need no customization and the default configuration is enough, but demanding scenarios call for tuning. For example, setting org.quartz.threadPool.threadCount high enough can improve performance when there are many jobs.
  • Quartz decouples task handling thoroughly: job and trigger are decoupled, and the task itself is decoupled from its execution strategy, so N tasks and M execution strategies can be combined freely.
  • The scheduler stands on its own and acts as the commander: it handles global scheduling, such as monitoring which triggers are ready and allocating threads.
  • External links:
    • www.quartz-scheduler.org
    • quartz-2.1.x/configuration