
About the author: WeDO Experimental Jun, data analyst. Loves life, loves writing.

Application scenarios of task scheduling

Task scheduling means arranging a plan for when and how tasks run. It comes up constantly in real projects, especially data projects: for example, computing website visit statistics every 5 minutes means analyzing visits from the log data every 5 minutes.

The following are the application scenarios of task scheduling:

  • Offline job scheduling: execute tasks at a given time granularity
  • Shared cache updates: periodically refresh a cache (such as a Redis cache) that is shared between different processes

Task scheduling tools

  • Linux crontab: supports task execution by minute, hour, day, month, and week
  • Quartz for Java
  • Task Scheduler on Windows

This article introduces APScheduler (Advanced Python Scheduler), a task scheduling library for Python. If you know Quartz, APScheduler can be thought of as a Python counterpart of Quartz. APScheduler provides interval-based, fixed-date, and crontab-style scheduling, and can be used as a cross-platform scheduling tool.

APScheduler

Components

APScheduler consists of five parts: triggers, schedulers, job stores, executors, and job events.

  • Job: the task id and the function to execute
  • Triggers: determine when a task starts executing
  • Job stores: store the state of tasks
  • Executors: determine how a task is executed
  • Events: monitor exceptions during task execution
  • Schedulers: tie the whole task life cycle together: add/edit a task into the job store; when the task's execution time arrives, submit it to an executor to run and return the result; meanwhile emit events so that job events can be monitored

Installation

pip install apscheduler

A simple example

from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.events import EVENT_JOB_EXECUTED, EVENT_JOB_ERROR
import logging
import datetime


# task execution function
def job_func(job_id):
    print('job %s is run at %s' % (job_id, datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')))


# event listener
def job_exception_listener(event):
    if event.exception:
        # todo: handle exceptions, alarms, etc
        print('The job crashed :(')
    else:
        print('The job worked :)')


# log
logging.basicConfig()
logging.getLogger('apscheduler').setLevel(logging.DEBUG)


# Define a non-blocking background scheduler
scheduler = BackgroundScheduler()
# Add a task to the job store
# trigger: trigger='interval', seconds=10
# executor: executor='default', run in a thread
# jobstore: jobstore='default', the default in-memory store
# max concurrent instances: max_instances
scheduler.add_job(job_func, trigger='interval', args=[1], id='1', name='a test job', max_instances=10, jobstore='default', executor='default', seconds=10)
# Set up the task listener
scheduler.add_listener(job_exception_listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR)


# Start the scheduler
scheduler.start()

Output:

job 1 is run at 2020-03-21 20:00:38
The job worked :)
job 1 is run at 2020-03-21 20:00:48
The job worked :)
job 1 is run at 2020-03-21 20:00:58
The job worked :)

Triggers

Triggers determine when tasks are executed. APScheduler supports three types of triggers:

  • trigger='interval': executes at a fixed time interval; supports weeks, days, hours, minutes, and seconds, plus an optional start_date/end_date range

    sched.add_job(job_function, 'interval', hours=2, start_date='2010-10-10 09:30:00', end_date='2014-06-15 11:00:00')
  • trigger='date': executes once, at a fixed point in time

    sched.add_job(my_job, 'date', run_date=datetime(2009, 11, 6, 16, 30, 5), args=['text'])
  • trigger='cron': executes tasks crontab-style

    • Parameters: year/month/day/week/day_of_week/hour/minute/second granularity; a time range can also be specified

      year (int|str) - 4-digit year
      month (int|str) - month (1-12)
      day (int|str) - day of the month (1-31)
      week (int|str) - ISO week (1-53)
      day_of_week (int|str) - number or name of weekday (0-6 or mon,tue,wed,thu,fri,sat,sun)
      hour (int|str) - hour (0-23)
      minute (int|str) - minute (0-59)
      second (int|str) - second (0-59)
      start_date (datetime|str) - earliest possible date/time to trigger on (inclusive)
      end_date (datetime|str) - latest possible date/time to trigger on (inclusive)
    • example

      from apscheduler.triggers.cron import CronTrigger

      # Run job_function every weekday at 05:30 until 2014-05-30 00:00:00
      sched.add_job(job_function, 'cron', day_of_week='mon-fri', hour=5, minute=30, end_date='2014-05-30')


      # Crontab format: minute hour day month week; * means all
      # Execute job_function at 00:00 on the 1st to the 15th of May through August
      sched.add_job(job_function, CronTrigger.from_crontab('0 0 1-15 may-aug *'))

Executors

Executors decide how tasks are run. APScheduler supports four kinds of executors, including pool (thread/process) and gevent (IO multiplexing, supporting high concurrency); the default is a thread pool. Different executors can be configured in the scheduler configuration (see Schedulers); a minimal per-job executor selection sketch follows the list below.

  • apscheduler.executors.asyncio: runs jobs in an asyncio event loop
  • apscheduler.executors.gevent: IO multiplexing, non-blocking
  • apscheduler.executors.pool: ThreadPoolExecutor (threads) and ProcessPoolExecutor (processes)
  • apscheduler.executors.twisted: event-driven, based on Twisted
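
For example, a minimal sketch of picking an executor per job; the executor names and cpu_heavy_job below are illustrative, and the full scheduler configuration is covered in the Schedulers section:

from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor

# Register two executors and pick one per job
executors = {
    'default': ThreadPoolExecutor(20),      # threads, good for IO-bound jobs
    'processpool': ProcessPoolExecutor(5)   # processes, good for CPU-bound jobs
}
scheduler = BackgroundScheduler(executors=executors)

def cpu_heavy_job():
    print('running in a separate process')

# executor='processpool' routes this job to the process pool
scheduler.add_job(cpu_heavy_job, 'interval', seconds=30, executor='processpool')
scheduler.start()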

Job stores

The job store determines how tasks are saved. By default, tasks are stored in memory (MemoryJobStore) and are lost after a restart. APScheduler supports the following job stores:

  • apscheduler.jobstores.memory: in memory
  • apscheduler.jobstores.mongodb: stored in MongoDB
  • apscheduler.jobstores.redis: stored in Redis
  • apscheduler.jobstores.rethinkdb: stored in RethinkDB
  • apscheduler.jobstores.sqlalchemy: databases supported by SQLAlchemy, such as MySQL, SQLite, etc.
  • apscheduler.jobstores.zookeeper: stored in ZooKeeper

Different job stores can be configured in the scheduler configuration (see Schedulers).
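
A minimal sketch of a persistent job store, assuming a local SQLite file jobs.sqlite (the file name and my_job are illustrative); jobs added to this store survive a process restart:

from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore

# Persist jobs in a local SQLite database instead of the default in-memory store
jobstores = {
    'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')
}
scheduler = BackgroundScheduler(jobstores=jobstores)

def my_job():
    print('persisted job')

# replace_existing=True avoids a conflict error if the same job id is already in the store
scheduler.add_job(my_job, 'interval', minutes=5, id='my_persistent_job', replace_existing=True)
scheduler.start()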

Schedulers

APScheduler supports the following scheduler types; BlockingScheduler and BackgroundScheduler are the most commonly used (a minimal BlockingScheduler sketch follows the list).

  • BlockingScheduler: use when the scheduler is the only thing running in the process; calling start() blocks the current thread and does not return immediately.
  • BackgroundScheduler: use when the scheduler should run in the background of an application; the main thread is not blocked after start() is called.
  • AsyncIOScheduler: for applications that use the asyncio module.
  • GeventScheduler: for applications that use the gevent module.
  • TwistedScheduler: for applications built on Twisted.
  • QtScheduler: for Qt applications.
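
As a contrast to the BackgroundScheduler used earlier, here is a minimal BlockingScheduler sketch (the tick job is illustrative); start() blocks, so nothing after it runs until the scheduler is shut down:

from apscheduler.schedulers.blocking import BlockingScheduler

scheduler = BlockingScheduler()

def tick():
    print('tick')

scheduler.add_job(tick, 'interval', seconds=5)
# start() blocks the current thread; press Ctrl+C or call scheduler.shutdown() from a job to stop
scheduler.start()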

From the previous example, we saw that the scheduler can manipulate tasks (and specify triggers, job stores, and executors for them) and monitor task events.

scheduler.add_job(job_func, trigger='interval', args=[1], id='1', name='a test job', max_instances=10, jobstore='default', executor='default', seconds=10)

Let’s take a look at each part in detail

  • Scheduler configuration: in add_job we saw that jobstore and executor are both 'default'. When defining the scheduler, APScheduler lets you specify different job stores and executors, as well as default job parameters:

    from pytz import utc
    import datetime
    
    
    from apscheduler.schedulers.background import BackgroundScheduler
    from apscheduler.jobstores.mongodb import MongoDBJobStore
    from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
    from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor
    
    
    # Configure job stores, executors, and default job parameters using dicts
    jobstores = {
        'mongo': MongoDBJobStore(),
        'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')
    }
    executors = {
        'default': ThreadPoolExecutor(20),
        'processpool': ProcessPoolExecutor(5)
    }
    job_defaults = {
        'coalesce': False, 'max_instances': 3
    }
    # Define the scheduler
    scheduler = BackgroundScheduler(jobstores=jobstores, executors=executors, job_defaults=job_defaults, timezone=utc)
    
    
    def job_func(job_id):
        print('job %s is run at %s' % (job_id, datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
    # add task
    scheduler.add_job(job_func, trigger='interval', args=[1], id='1', name='a test job', jobstore='default', executor='processpool', seconds=10)
    # Start the scheduler
    scheduler.start()
  • Operating on tasks: the scheduler can add, remove, pause, resume, and modify tasks. Note that these operations only affect runs that have not yet started; completed and currently running task instances are not affected.

    • add_job

      scheduler.add_job(job_func, trigger='interval', args=[1], id='1', name='a test job', max_instances=10, jobstore='default', executor='default', seconds=10)
    • remove_job: removes a job by its unique id; the job's record is also removed from the job store

      scheduler.add_job(myfunc, 'interval', minutes=2, id='my_job_id')
      scheduler.remove_job('my_job_id')
    • Pausing and resuming jobs: pause and resume tasks

      scheduler.add_job(myfunc, 'interval', minutes=2, id='my_job_id')
      scheduler.pause_job('my_job_id')
      scheduler.resume_job('my_job_id')
    • Modifying Jobs: Modifies the configuration of tasks

      job = scheduler.add_job(myfunc, 'interval', minutes=2, id='my_job_id', max_instances=10)
      # Modify the attributes of the task
      job.modify(max_instances=6, name='Alternate name')
      # Change the trigger of the task
      scheduler.reschedule_job('my_job_id', trigger='cron', minute='*/5')
  • Monitoring task events: add a listener for the event types of interest. Common types are listed below (a listener sketch covering all three follows the list):

    • EVENT_JOB_ERROR: an exception was raised during task execution

    • EVENT_JOB_EXECUTED: the task was executed successfully

    • EVENT_JOB_MAX_INSTANCES: a run was skipped because the number of running instances of the job exceeded the configured maximum

      scheduler.add_listener(job_exception_listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR)
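A minimal listener sketch covering all three event types, reusing the scheduler from the earlier example (the print statements are placeholders for real alerting logic):

from apscheduler.events import EVENT_JOB_ERROR, EVENT_JOB_EXECUTED, EVENT_JOB_MAX_INSTANCES

def job_event_listener(event):
    # Fired when a run is skipped because max_instances copies of the job are already running
    if event.code == EVENT_JOB_MAX_INSTANCES:
        print('job %s skipped: too many running instances' % event.job_id)
    # Fired when the job raised an exception
    elif event.code == EVENT_JOB_ERROR:
        print('job %s raised %r' % (event.job_id, event.exception))
    # EVENT_JOB_EXECUTED: the job finished successfully
    else:
        print('job %s finished' % event.job_id)

scheduler.add_listener(job_event_listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR | EVENT_JOB_MAX_INSTANCES)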

Reference: apscheduler.readthedocs.io/en/stable/u…

The author is the director of the weDO Maker Lab official account.

Recommended reading:

Forecasting COVID-19 with Python

Writing music in Python

Use Pyecharts to draw the epidemic map of China (attached source code)
