“This is the 15th day of my participation in the Gwen Challenge in November. Check out the details: The Last Gwen Challenge in 2021.”

1. Implement Async asynchronous tasks

1.1 Environment Preparations

Configure the @enableAsync annotation on the Spring Boot entry class to enable asynchronous processing. Create a task abstract class AbstractTask and configure three task methods: doTaskOne(), doTaskTwo(), and doTaskThree().

public abstract class AbstractTask {
    private static Random random = new Random();
​
    public void doTaskOne(a) throws Exception {
        System.out.println("Start on task one.");
        long start = currentTimeMillis();
        sleep(random.nextInt(10000));
        long end = currentTimeMillis();
        System.out.println("Complete Task 1, Time:" + (end - start) + "毫秒");
    }
​
    public void doTaskTwo(a) throws Exception {
        System.out.println("Start on task two.");
        long start = currentTimeMillis();
        sleep(random.nextInt(10000));
        long end = currentTimeMillis();
        System.out.println("Complete Task 2, Time:" + (end - start) + "毫秒");
    }
​
    public void doTaskThree(a) throws Exception {
        System.out.println("Start on task three.");
        long start = currentTimeMillis();
        sleep(random.nextInt(10000));
        long end = currentTimeMillis();
        System.out.println("Complete Task 3, Time:" + (end - start) + "毫秒"); }}Copy the code

1.2 Synchronous Invocation

Here is a simple example to get an intuitive understanding of what synchronous calls are:

  • defineTaskClass, inheritance,AbstractTask, the three processing functions simulate the operations of three tasks respectively, and the operation consumption time is randomly selected (10Seconds).
@Component
public class SyncTask extends AbstractTask {}Copy the code
  • inUnit testingUse case, injectionSyncTaskObject and execute in the test casedoTaskOne().doTaskTwo().doTaskThree()Three ways.
@RunWith(SpringRunner.class)
@SpringBootTest
public class TaskTest {
    @Autowired
    private SyncTask task;
​
    @Test
    public void testSyncTasks(a) throws Exception { task.doTaskOne(); task.doTaskTwo(); task.doTaskThree(); }}Copy the code
  • When you execute the unit test, you see output similar to the following:
Start task 1 Finish Task 1:6,720 milliseconds Start Task 2 Finish Task 2:6,604 milliseconds Start Task 3 Finish task 3:9,448 millisecondsCopy the code

DoTaskOne (), doTaskTwo(), and doTaskThree() are executed in sequence.

1.3 Asynchronous Invocation

Although the above synchronous invocation successfully executed the three tasks, it can be seen that the execution time is relatively long. If there is no dependency between the three tasks and they can be executed concurrently, the execution efficiency of synchronous invocation is relatively poor. Therefore, asynchronous invocation can be considered for concurrent execution.

  • createAsyncTaskClass, configured separately on methods@AsyncAnnotate the originalSynchronized methodsintoAsynchronous methods.
@Component
public class AsyncTask extends AbstractTask {
    @Async
    public void doTaskOne(a) throws Exception {
        super.doTaskOne();
    }
​
    @Async
    public void doTaskTwo(a) throws Exception {
        super.doTaskTwo();
    }
​
    @Async
    public void doTaskThree(a) throws Exception {
        super.doTaskThree(); }}Copy the code
  • inUnit testingUse case, injectionAsyncTaskObject and execute in the test casedoTaskOne().doTaskTwo().doTaskThree()Three ways.
@Autowired
private AsyncTask asyncTask;
​
@Test
public void testAsyncTasks(a) throws Exception {
    asyncTask.doTaskOne();
    asyncTask.doTaskTwo();
    asyncTask.doTaskThree();
}
Copy the code
  • When you execute the unit test, you see output similar to the following:
Let's do task three let's do task one let's do task twoCopy the code

If you run unit tests repeatedly, you may encounter different results, such as:

  1. There is no task-related output
  2. There are partial task-specific outputs
  3. Out-of-order task-related output

The reason is that doTaskOne(), doTaskTwo(), and doTaskThree() are currently executed asynchronously. After the main program is called asynchronously, the main program does not care whether the execution of these three functions is complete. Because there is no other content to be executed, the program will automatically end, resulting in incomplete or no output task related content.

Note: Functions decorated by @async should not be defined as static, so asynchronous calls do not take effect.

1.4 Asynchronous Callback

In order for doTaskOne(), doTaskTwo(), and doTaskThree() to finish properly, suppose we need to count the total time it takes to execute the three tasks simultaneously. We need to record the time and calculate the result after all three functions have been used.

So how do we determine if the above three asynchronous calls have completed? We need to use the Future to return the result of the asynchronous call.

  • createAsyncCallBackTaskClasses, a statementdoTaskOneCallback().doTaskTwoCallback().doTaskThreeCallback()Three methods, the original three methods for packaging.
@Component
public class AsyncCallBackTask extends AbstractTask {
    @Async
    public Future<String> doTaskOneCallback(a) throws Exception {
        super.doTaskOne();
        return new AsyncResult<>("Mission one complete.");
    }
​
    @Async
    public Future<String> doTaskTwoCallback(a) throws Exception {
        super.doTaskTwo();
        return new AsyncResult<>("Mission two completed.");
    }
​
    @Async
    public Future<String> doTaskThreeCallback(a) throws Exception {
        super.doTaskThree();
        return new AsyncResult<>("Mission three completed."); }}Copy the code
  • inUnit testingUse case, injectionAsyncCallBackTaskObject and execute in the test casedoTaskOneCallback().doTaskTwoCallback().doTaskThreeCallback()Three ways. Cycle callFutureisDone()Method waits for threeConcurrent tasksWhen the execution is complete, record the final execution time.
@Autowired
private AsyncCallBackTask asyncCallBackTask;
​
@Test
public void testAsyncCallbackTask(a) throws Exception {
    long start = currentTimeMillis();
    Future<String> task1 = asyncCallBackTask.doTaskOneCallback();
    Future<String> task2 = asyncCallBackTask.doTaskTwoCallback();
    Future<String> task3 = asyncCallBackTask.doTaskThreeCallback();
​
    // All three tasks are completed
    while(! task1.isDone() || ! task2.isDone() || ! task3.isDone()) { sleep(1000);
    }
​
    long end = currentTimeMillis();
    System.out.println("All tasks completed, total time:" + (end - start) + "毫秒");
}
Copy the code

See what changes have been made:

  • Record the start time at the beginning of the test case;
  • Returns a result object of type Future when three asynchronous functions are called;
  • After three asynchronous functions are called, a loop is opened to determine whether all three asynchronous functions are finished based on the Future object returned. If both end, the loop ends; If not, wait 1 second before judging.
  • After exiting the loop, calculate the total time required for the three tasks to be executed concurrently based on the end time and start time.

Run the unit test above and see the following results:

Start Task 3 Start Task 1 Start Task 2 Complete Task 2:2,572 ms Complete Task 1:7,333 ms Complete Task 3:7,647 ms Complete all tasks: 8,013 msCopy the code

It can be seen that through asynchronous invocation, task one, task two and task three are executed concurrently, which effectively reduces the total running time of the program.

2. Plan thread pools for asynchronous tasks

2.1 The role of thread pools

  1. Prevents unlimited expansion of resource usage
  2. The invocation procedure saves time for resource creation and destruction

In the previous section, one of our asynchronous tasks opened a thread and destroyed it when it was finished. In a high concurrency environment, constantly allocating new resources may cause system resource exhaustion. So to avoid this problem, we plan a thread pool for asynchronous tasks.

2.2 Defining a thread pool

In the above operation, create a thread pool configuration class TaskConfiguration and configure a task thread pool object taskExecutor.

@Configuration
public class TaskConfiguration {
    @Bean("taskExecutor")
    public Executor taskExecutor(a) {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(10);
        executor.setMaxPoolSize(20);
        executor.setQueueCapacity(200);
        executor.setKeepAliveSeconds(60);
        executor.setThreadNamePrefix("taskExecutor-");
        executor.setRejectedExecutionHandler(new CallerRunsPolicy());
        returnexecutor; }}Copy the code

We created a thread pool using ThreadPoolTaskExecutor and set the following parameters:

Thread pool properties The role of attributes Setting the initial value
Core threads The number of threads initialized when the thread pool was created 10
Maximum number of threads The maximum number of threads in the thread pool, and only after the buffer queue is full will more threads than the number of core threads be applied 20
Buffer queue A queue used to buffer the execution of tasks 200
The idle time allowed for threads Threads beyond the core thread are destroyed when idle time expires 60 seconds
The prefix of the thread pool name Can be used to locate the thread pool where the processing task resides taskExecutor-
The processing policy of the thread pool for rejected tasks In this case, the CallerRunsPolicy policy is used. When the thread pool has no processing capacity, this policy will directly run the rejected task in the execute method calling thread. If the executor is closed, the task is discarded CallerRunsPolicy
  • createAsyncExecutorTaskClass, three task configuration andAsyncTaskIt’s the same, but it’s different@AsyncAnnotations need to specify the previous configurationThe name of the thread pooltaskExecutor.
@Component
public class AsyncExecutorTask extends AbstractTask {
    @Async("taskExecutor")
    public Future<String> doTaskOneCallback(a) throws Exception {
        super.doTaskOne();
        System.out.println("Task 1, current thread:" + Thread.currentThread().getName());
        return new AsyncResult<>("Mission one complete.");
    }
​
    @Async("taskExecutor")
    public Future<String> doTaskTwoCallback(a) throws Exception {
        super.doTaskTwo();
        System.out.println("Task 2, current thread:" + Thread.currentThread().getName());
        return new AsyncResult<>("Mission two completed.");
    }
​
    @Async("taskExecutor")
    public Future<String> doTaskThreeCallback(a) throws Exception {
        super.doTaskThree();
        System.out.println("Task 3, current thread:" + Thread.currentThread().getName());
        return new AsyncResult<>("Mission three completed."); }}Copy the code
  • inUnit testingUse case, injectionAsyncExecutorTaskObject and execute in the test casedoTaskOne().doTaskTwo().doTaskThree()Three ways.
@RunWith(SpringRunner.class)
@SpringBootTest
public class AsyncExecutorTaskTest {
    @Autowired
    private AsyncExecutorTask task;
​
    @Test
    public void testAsyncExecutorTask(a) throws Exception {
        task.doTaskOneCallback();
        task.doTaskTwoCallback();
        task.doTaskThreeCallback();
​
        sleep(30 * 1000L); }}Copy the code

Run the unit test above and see the following results:

Start Task 1 Start Task 3 Start Task 2 Finish Task 2, duration: 3905 ms Task 2, current thread: TaskExecutor-2 Complete Task 1, duration: 6184 ms Task 1, current thread: TaskExecutor-1 Complete Task 3, Duration: Task 3, current thread: taskExecutor-3Copy the code

Execute the unit test above and observe that the prefix of the thread pool name of the task thread pool is printed, indicating that the thread pool successfully executed the asynchronous task!

2.3 Gracefully close thread pools

Because the asynchronous task is still running when the application is shut down, objects such as the database connection pool are destroyed, and an error occurs when the database is operated on in the asynchronous task.

Solution as follows, resetting the thread pool configuration object, the new thread pool setWaitForTasksToCompleteOnShutdown () and setAwaitTerminationSeconds () function:

@Bean("taskExecutor")
public Executor taskExecutor(a) {
    ThreadPoolTaskScheduler executor = new ThreadPoolTaskScheduler();
    executor.setPoolSize(20);
    executor.setThreadNamePrefix("taskExecutor-");
    executor.setWaitForTasksToCompleteOnShutdown(true);
    executor.setAwaitTerminationSeconds(60);
    return executor;
}
Copy the code
  • setWaitForTasksToCompleteOnShutdown(true):This method is used to setThread pool shutdownwhenWaiting for theWhen all tasks are complete, continueThe destructionThe rest of theBeanSo theseAsynchronous tasksThe destructionWill precedeDatabase connection pool objectThe destruction.
  • SetAwaitTerminationSeconds (60) : this method is used to set the thread pool task waiting time, if more than this time haven’t destroy force to destroy, to ensure that the application of the last to be closed, and not blocked.

3. Quartz Dynamic timed tasks (database persistence)

3.1 introduction

In the process of project development, some scheduled tasks may not be needed after running for a period of time, or the execution time of scheduled tasks may need to be modified. It needs to be modified in code and repackaged for distribution, which is a hassle. Using Quartz, you don’t need to rework the code.

3.2 the principle

  1. Add, delete, change and check configuration tasks using the Quartz API
  2. Save the configuration of the task in the database

3.3 configuration

Application. Yml introduced maven dependencies in the previous section, which will not be repeated here. Add quartz configuration information directly under spring properties

spring:
  datasource:
    url: jdbc:mysql://localhost:3306/testdb? useUnicode=true&characterEncoding=utf-8&serverTimezone=UTC
    username: root
    password: 123456
    driver-class-name: com.mysql.cj.jdbc.Driver
  quartz:
    job-store-type: JDBC The database stores the Quartz task configuration
    jdbc:
      initialize-schema: NEVER # autoinitialize table structure, first start with always
Copy the code

But may be a version of the bug, sometimes automatically built table will not take effect and to quartz scheduler – X.X.X.J ar inside look for building table SQL script: The classpath: org/quartz/impl/jdbcjobstore/tables_ @ @ platform @ @. SQL, then execute.

3.4 Dynamic configuration code implementation

The first step is to create a scheduled task related entity class to save the scheduled task related information to the database

@Data
public class QuartzBean {
    /** Task id */
    private String id;
​
    /** Task name */
    private String jobName;
​
    /** Task execution class */
    private String jobClass;
​
    /** Whether the task status is started or suspended */
    private Integer status;
​
    /** Task run time expression */
    private String cronExpression;
}
Copy the code

Step 2 Create a scheduled task pause, modify, start, and start the utility class once

public class QuartzUtils {
​
    /** * Create a scheduled task. After a scheduled task is created, it starts by default *@paramScheduler *@paramQuartzBean Periodic task information class */
    @SuppressWarnings("unchecked")
    public static void createScheduleJob(Scheduler scheduler, QuartzBean quartzBean) throws ClassNotFoundException, SchedulerException {
            // The execution class that gets the scheduled task must be the absolute path name of the class
            // The scheduled task class needs to be a concrete implementation of the job class. QuartzJobBean is an abstract class of the job.
            Class<? extends Job> jobClass = (Class<? extends Job>) Class.forName(quartzBean.getJobClass());
            // Build the scheduled task information
            JobDetail jobDetail = JobBuilder.newJob(jobClass).withIdentity(quartzBean.getJobName()).build();
            // Set the execution mode of the scheduled task
            CronScheduleBuilder scheduleBuilder = CronScheduleBuilder.cronSchedule(quartzBean.getCronExpression());
            // Build trigger trigger
            CronTrigger trigger = TriggerBuilder.newTrigger().withIdentity(quartzBean.getJobName()).withSchedule(scheduleBuilder).build();
            scheduler.scheduleJob(jobDetail, trigger);
    }
​
    /** * Suspends the scheduled task * based on the task name@paramScheduler *@paramJobName Name of the scheduled task */
    public static void pauseScheduleJob(Scheduler scheduler, String jobName) throws SchedulerException {
        JobKey jobKey = JobKey.jobKey(jobName);
        scheduler.pauseJob(jobKey);
    }
​
    /** * Restore the scheduled task by task name *@paramScheduler *@paramJobName Name of the scheduled task */
    public static void resumeScheduleJob(Scheduler scheduler, String jobName) throws SchedulerException {
        JobKey jobKey = JobKey.jobKey(jobName);
        scheduler.resumeJob(jobKey);
    }
​
    /** * Immediately run a scheduled task based on the task name *@paramScheduler *@paramJobName Name of the scheduled task */
    public static void runOnce(Scheduler scheduler, String jobName) throws SchedulerException {
        JobKey jobKey = JobKey.jobKey(jobName);
        scheduler.triggerJob(jobKey);
    }
​
    /** * Updates scheduled tasks *@paramScheduler *@paramQuartzBean Periodic task information class */
    public static void updateScheduleJob(Scheduler scheduler, QuartzBean quartzBean) throws SchedulerException {
​
            // Get the trigger for the task
            TriggerKey triggerKey = TriggerKey.triggerKey(quartzBean.getJobName());
            // Set the execution mode of the scheduled task
            CronScheduleBuilder scheduleBuilder = CronScheduleBuilder.cronSchedule(quartzBean.getCronExpression());
            // Rebuild the task trigger
            CronTrigger trigger = (CronTrigger) scheduler.getTrigger(triggerKey);
            trigger = trigger.getTriggerBuilder().withIdentity(triggerKey).withSchedule(scheduleBuilder).build();
            // Reset the corresponding job
            scheduler.rescheduleJob(triggerKey, trigger);
    }
​
    /** * Deletes the scheduled task * from the scheduler based on the scheduled task name@paramScheduler *@paramJobName Name of the scheduled task */
    public static void deleteScheduleJob(Scheduler scheduler, String jobName) throws SchedulerException { JobKey jobKey = JobKey.jobKey(jobName); scheduler.deleteJob(jobKey); }}@Controller
@RequestMapping("/quartz/job/")
public class QuartzController {
    // Inject task scheduling
    @Resource
    private Scheduler scheduler;
​
    @PostMapping("/create")
    @ResponseBody
    public String createJob(@RequestBody QuartzBean quartzBean) throws SchedulerException, ClassNotFoundException {
        QuartzUtils.createScheduleJob(scheduler,quartzBean);
        return "Created Task";// Return is not production-level code
    }
​
    @PostMapping("/pause")
    @ResponseBody
    public String pauseJob(String jobName) throws SchedulerException {
        QuartzUtils.pauseScheduleJob (scheduler,jobName);
        return "Suspended successfully";// Return is not production-level code
    }
​
    @PostMapping("/run")
    @ResponseBody
    public String runOnce(String jobName) throws SchedulerException {
        QuartzUtils.runOnce (scheduler,jobName);
        return "Run task" + jobName + "Success";// Return is not production-level code
    }
​
    @PostMapping("/resume")
    @ResponseBody
    public String resume(String jobName) throws SchedulerException {
        QuartzUtils.resumeScheduleJob(scheduler,jobName);
        return "Succeeded in restoring the scheduled task:" + jobName;
    }
​
    @PostMapping("/update")
    @ResponseBody
    public String update(@RequestBody QuartzBean quartzBean) throws SchedulerException {
        QuartzUtils.updateScheduleJob(scheduler,quartzBean);
        return "Succeeded in updating the scheduled task scheduling information"; }}Copy the code