
In web development, we often encounter asynchronous tasks. For operations that consume resources and time, the user experience suffers badly if they are not isolated from the application. For example, consider login by mobile phone verification code: when the user enters a phone number and clicks "send", handing the SMS request directly to the back-end application to execute will block on network IO, and the whole application becomes very unresponsive. How do we solve this problem gracefully?

We can use asynchronous tasks. When a request comes in, we trigger an asynchronous task during the business-logic processing and return immediately, so the front end can start the countdown while the user waits for the verification code.
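To make the idea concrete, here is a minimal sketch of such a flow (the Flask route and the send_sms_code task are hypothetical, for illustration only):

# sketch: trigger an async task and return immediately (names are hypothetical)
from flask import Flask, request, jsonify
from tasks import send_sms_code  # a hypothetical Celery task that calls the SMS provider

app = Flask(__name__)

@app.route('/send_code', methods=['POST'])
def send_code():
    phone = request.form['phone']
    send_sms_code.delay(phone)         # queue the task; the request does not block on network IO
    return jsonify({'countdown': 60})  # the front end can start its countdown right away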


What is Celery?

Introduction to Celery

Celery is a simple, flexible, and reliable distributed system written in Python. It can process large numbers of messages while providing the tools required to operate and maintain such a system.

To put it plainly, Celery is an asynchronous task scheduling tool. It focuses on real-time multitasking and also supports task scheduling. With Celery, we can quickly set up a distributed task queue and manage it easily. Although Celery itself is written in Python, the protocol can be implemented in any language; so far there are RCelery for Ruby, node-celery for Node.js, and a PHP client.

Celery architecture

Let's use a simple diagram to describe Celery's architecture and how it works.

The architecture of Celery consists of the following three parts:

Brokers

The broker is the message middleware that holds the task queue. Note that Celery itself is not a task queue; it is a tool for managing distributed task queues. In other words, Celery does not implement a queue itself, but integrates conveniently with third-party message queues such as RabbitMQ and Redis.
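For reference, brokers are configured with simple connection URLs; a couple of illustrative examples (values are placeholders):

# illustrative broker URLs
rabbitmq_broker = 'amqp://guest:guest@localhost:5672//'
redis_broker = 'redis://localhost:6379/0'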

Worker

The worker is the task execution unit provided by Celery; in plain terms, it is the laborer that carries out the tasks. It is a worker, after all; all it does is work, haha.

Similar to a consumer, it monitors the task queue in real time, and when new tasks are queued, it takes them from the task queue and executes them.

Backend/Task result store

The task result store, as the name implies, is used to store the results of the tasks executed by the worker. Results can be stored in different backends, such as Redis, Memcached, etc.

When the user, or some trigger in our application, puts a task on the broker's queue, a Celery worker picks up the task, executes it, and saves the result to the task result store.

The use of Celery

Simple implementation

In this example, we use Redis for convenience. Celery also supports other brokers, such as RabbitMQ; see the official documentation for the full list.
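For reference, a typical environment setup, assuming a Redis server is already running locally:

pip install celery redis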

First, we create a new tasks.py file.

import time
from celery import Celery

brokers = 'redis://127.0.0.1:6379/0'   # message broker: holds the task queue
backend = 'redis://127.0.0.1:6379/1'   # result backend: stores task results

app = Celery('tasks', broker=brokers, backend=backend)

@app.task
def add(x, y):
    time.sleep(2)  # simulate a time-consuming operation
    return x + y

In this code, we import the Celery library, create a Celery instance with the broker and backend passed in, and define the task function add. We use time.sleep(2) to simulate a time-consuming operation.

Next we need to start the Celery worker, running it from a terminal in the current directory:

celery -A tasks worker  --loglevel=info

Note: on Windows, you need to run the command with the eventlet pool instead:

celery -A tasks worker --loglevel=info -P eventlet

Otherwise, an error will be reported: Celery 4 dropped official Windows support, and the default prefork pool no longer works there.
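Note that the eventlet pool is a separate package and has to be installed first:

pip install eventlet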

We should see the following output:

D:\use_Celery> celery -A tasks worker --loglevel=info -P eventlet

 -------------- celery@DESKTOP-8E96VUV v4.4.2 (cliffs)
--- ***** -----
-- ******* ---- Windows-10-10.0.18362-SP0 2021-07-13 15:12:19
- *** --- * ---
- ** ---------- [config]
- ** ---------- .> app:         tasks:0x35a2270
- ** ---------- .> transport:   redis://127.0.0.1:6379/0
- ** ---------- .> results:     redis://127.0.0.1:6379/1
- *** --- * --- .> concurrency: 8 (eventlet)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
 -------------- [queues]
                .> celery

[tasks]
  . tasks.add

[2021-07-13 15:12:19,097: INFO/MainProcess] Connected to redis://127.0.0.1:6379/0
[2021-07-13 15:12:19,134: INFO/MainProcess] mingle: searching for neighbors
[2021-07-13 15:12:20,161: INFO/MainProcess] mingle: all alone
[2021-07-13 15:12:20,271: INFO/MainProcess] pidbox: Connected to redis://127.0.0.1:6379/0.
[2021-07-13 15:12:20,273: INFO/MainProcess] celery@DESKTOP-8E96VUV ready.

The output includes information about the Celery application being started, the registered tasks, and so on.

At this point, the worker is on standby and there are no tasks in the broker. We need to trigger tasks into the broker so the worker can fetch and execute them.

Let’s create a new add_task.py file:

from tasks import add

result = add.delay(5, 6)  # tasks are called asynchronously via the delay() interface provided by Celery

while not result.ready():  # poll until the result is available
    pass
print("Complete.", result.get())

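The busy-wait loop above is only for demonstration. In practice, result.get() itself blocks until the task finishes, and it accepts a timeout, so a sketch like this is more idiomatic:

result = add.delay(5, 6)
print("Complete.", result.get(timeout=10))  # blocks until the result is ready, or raises on timeout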

In the worker's command window, we can see the execution log:

[2021-07-13 15:14:55,615: INFO/MainProcess] Received task: tasks.add[958eceff-c067-4e74-af87-6ae4f94eb80e]
[2021-07-13 15:14:57,615: INFO/MainProcess] Task tasks.add[958eceff-c067-4e74-af87-6ae4f94eb80e] succeeded in 2.0s: 11

Of course, we can also see the results of executed tasks stored in the backend in Redis.
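For example, here is a quick sketch using the redis package to peek at what the backend stores (Celery's Redis backend keeps results under keys named celery-task-meta-<task_id>):

import json
import redis

# connect to the result backend (database 1 in our configuration)
r = redis.Redis(host='127.0.0.1', port=6379, db=1)

for key in r.keys('celery-task-meta-*'):
    print(key, json.loads(r.get(key))['status'])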

At this point, a simple celery application is complete.

Periodic/Scheduled tasks

Celery also makes it easy to run tasks periodically through its beat scheduler. Suppose we want the add task in tasks.py to be executed on a schedule; all we need is some extra configuration.

Create celery_config.py:

from datetime import timedelta
from celery.schedules import crontab

CELERYBEAT_SCHEDULE = {
    'add': {
        'task': 'tasks.add',
        'schedule': timedelta(seconds=3),
        'args': (16, 16)
    }
}

tasks.py then reads this configuration via app.config_from_object('celery_config'):

# tasks.py
app = Celery('tasks', backend='redis://localhost:6379/0', broker='redis://localhost:6379/0')
app.config_from_object('celery_config')

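As an aside, instead of a separate config file, Celery 4 also lets you attach the schedule directly to the app object using the newer lowercase setting name (a sketch, equivalent to the configuration above):

# tasks.py (alternative)
from datetime import timedelta

app.conf.beat_schedule = {
    'add': {
        'task': 'tasks.add',
        'schedule': timedelta(seconds=3),
        'args': (16, 16),
    },
}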

Then rerun the worker, followed by beat:

celery -A tasks beat

We can see the following information:

celery beat v4.4.2 (cliffs) is starting.
LocalTime -> 2021-07-15 09:05:32
Configuration ->
    . broker -> redis://127.0.0.1:6379/0
    . loader -> celery.loaders.app.AppLoader
    . scheduler -> celery.beat.PersistentScheduler
    . db -> celerybeat-schedule
    . logfile -> [stderr]@%WARNING
    . maxinterval -> 5.00 minutes (300s)
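During development, you can also run the worker with an embedded beat scheduler in a single process using the -B option (convenient locally, but not recommended for production):

celery -A tasks worker -B --loglevel=info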

Back in the worker's command window, we can see the task being executed periodically:

You can see that a task is queued for execution every 3 seconds.

What about scheduled tasks?

It’s very simple, we just need to change the configuration file:

# crontab task
# call tasks.add every Thursday at 9:20
from celery.schedules import crontab

CELERYBEAT_SCHEDULE = {
    'add-crontab-func': {
        'task': 'tasks.add',
        'schedule': crontab(hour=9, minute=20, day_of_week=4),
        'args': (30, 20),
    },
}
CELERY_TIMEZONE = 'Asia/Shanghai'  # configure time zone information

crontab(hour=9, minute=20, day_of_week=4) means the task will be executed every Thursday at 9:20; as long as the worker and beat keep running, the task will execute on schedule. I have also added time zone information to the configuration here.
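For reference, crontab supports much richer expressions than a fixed weekly time; a few illustrative schedules:

from celery.schedules import crontab

crontab()                                   # every minute
crontab(minute=0, hour=0)                   # daily at midnight
crontab(minute='*/15')                      # every 15 minutes
crontab(hour=7, minute=30, day_of_week=1)   # every Monday at 7:30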

In this example, we started the worker at 9:17 and then ran beat; the output shows the task being executed at 9:20.

Conclusion

As we can see, using Celery for distributed queue management can greatly improve our development efficiency. This has been only a brief introduction to Celery; if you are interested, the official documentation covers its more advanced and systematic usage.

Finally, I would like to thank my girlfriend for her tolerance, understanding and support in work and life.