1. What is Celery

Celery is a simple, flexible distributed system for handling asynchronous tasks, scheduled tasks, and large volumes of messages.

It is an asynchronous task queue focused on real-time processing.

It also supports task scheduling.

The Celery framework consists of three parts: the message broker, the task execution unit (worker), and the task result store.

Message middleware

Celery does not provide a messaging service itself, but it integrates easily with third-party message middleware such as RabbitMQ and Redis.

Task execution unit

The worker is the task execution unit provided by Celery. Workers run concurrently on the task nodes.

Task result store

The task result store holds the results of tasks executed by workers. Celery supports several ways of storing task results, including AMQP and Redis.
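Both the broker and the result store are configured as URLs. A quick sketch of common values (the hosts, ports, and database numbers below are just examples):

broker = 'redis://127.0.0.1:6379/1'                # Redis as the message broker
# broker = 'amqp://guest:guest@127.0.0.1:5672//'   # or RabbitMQ as the message broker
backend = 'redis://127.0.0.1:6379/2'               # Redis as the result store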

Version Support

Celery version 4.0 runs on:

    Python (2.7, 3.4, 3.5)
    PyPy (5.4, 5.5)

This is the last version to support Python 2.7, and from the next version (Celery 5.x) Python 3.5 or newer will be required.

If you're running an older version of Python, you need to be running an older version of Celery:

    Python 2.6: Celery series 3.1 or earlier.
    Python 2.5: Celery series 3.0 or earlier.
    Python 2.4: Celery series 2.2 or earlier.

Celery is a project with minimal funding, so we don't support Microsoft Windows. Please don't open any issues related to that platform.

2. Application scenarios

Asynchronous tasks: submit time-consuming operations to Celery for asynchronous execution, such as sending SMS/email, pushing messages, and audio/video processing.

Scheduled tasks: run something periodically, such as daily data statistics.

3. Installation and configuration of Celery

Install Celery: pip install celery

Create an application instance: app = Celery('task name', backend='xxx', broker='xxx')


4. Perform asynchronous tasks with Celery

Basic usage

Create project: CeleryTest

Create a py file: tasks.py

from celery import Celery

broker = 'redis://127.0.0.1:6379/1'
backend = 'redis://127.0.0.1:6379/2'
app = Celery('test', broker=broker, backend=backend)

@app.task
def add(x, y):
    return x + y



Create a py file: add_task.py, and use it to add the task

from tasks import add

result = add.delay(4, 5)
print(result.id)


Start a worker:

celery worker -A tasks -l info

Or start it with the eventlet pool:

celery worker -A tasks -l info -P eventlet
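The eventlet pool requires the eventlet package; it is the usual workaround on Windows, where the default prefork pool in Celery 4.x does not work:

pip install eventlet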

Alternatively, start the worker from Python:

from tasks import app

if __name__ == '__main__':
    app.worker_main()
    # app.worker_main(argv=['worker', '--loglevel=info'])

Create a py file: result.py, to view the task execution result

from celery.result import AsyncResult
from tasks import app

async_result = AsyncResult(id="e919d97d-2938-4d0f-9265-fd8237dc2aa3", app=app)

if async_result.successful():
    result = async_result.get()
    print(result)
    # async_result.forget()  # delete the result
elif async_result.failed():
    print('Task failed')
elif async_result.status == 'PENDING':
    print('Task is waiting to be executed')
elif async_result.status == 'RETRY':
    print('Task is being retried after an exception')
elif async_result.status == 'STARTED':
    print('Task has started executing')

Run add_task.py to add the task and get the task ID

Run the worker py file above, or run: celery worker -A tasks -l info -P eventlet

Execute result.py to check the status of the task and obtain the results
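Instead of polling the status in result.py, you can also block until the task finishes. A minimal sketch (reuse the task id printed by add_task.py):

from celery.result import AsyncResult
from tasks import app

res = AsyncResult(id="e919d97d-2938-4d0f-9265-fd8237dc2aa3", app=app)
# get() blocks until the result is ready; timeout is in seconds
print(res.get(timeout=10))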

Multi-task structure

multi_celery
├── celery_task        # Celery-related folder
│   ├── celery.py      # Celery connection and configuration file; must use this exact name
│   ├── task1.py       # task functions
│   └── task2.py       # task functions
├── result.py          # check results
└── add_task.py        # trigger tasks

celery.py

from celery import Celery

# broker: the message middleware, Redis here
broker = 'redis://127.0.0.1:6379/1'
# backend: the results are stored in Redis
backend = 'redis://127.0.0.1:6379/2'
# the first argument is an alias, which you can name as you like;
# include lists the modules that contain task functions
app = Celery('test', broker=broker, backend=backend, include=['celery_task.task1', 'celery_task.task2'])

# time zone
app.conf.timezone = 'Asia/Shanghai'
# whether to use UTC
app.conf.enable_utc = False

task1.py

from .celery import app

@app.task
def add(x, y):
    return x + y

task2.py

from .celery import app

@app.task
def write_file(s):
    with open('a.txt', 'a', encoding='utf-8') as f:
        f.write(s)
    return 'Write successful'

result.py

from celery.result import AsyncResult
# import the celery app object
from celery_task.celery import app

async_result = AsyncResult(id="ac2a7e52-ef66-4caa-bffd-81414d869f85", app=app)

if async_result.successful():
    # the result of the task execution, i.e. the return value
    result = async_result.get()
    print(result)
    # async_result.forget()  # delete the result
elif async_result.failed():
    print('Task failed')
elif async_result.status == 'PENDING':
    print('Task is waiting to be executed')
elif async_result.status == 'RETRY':
    print('Task is being retried after an exception')
elif async_result.status == 'STARTED':
    print('Task has started executing')

add_task.py

from celery_task import task1
from celery_task import task2

# add a 2+3 task to the queue
result = task1.add.delay(2, 3)
print(result.id)
# add a write-file task to the queue
result = task2.write_file.delay('lqz')
print(result.id)
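Note that delay() is just a shortcut for apply_async(), so the two calls below are equivalent:

result = task1.add.delay(2, 3)
result = task1.add.apply_async(args=(2, 3))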

Add the tasks (run add_task.py), start the worker: celery worker -A celery_task -l info -P eventlet, then check the task execution results (run result.py).

5. Perform timed tasks

Set a time for Celery to execute a task

add_task.py

from celery_app_task import add
from datetime import datetime

# Way 1
# v1 = datetime(2019, 2, 13, 18, 19, 56)
# print(v1)
# v2 = datetime.utcfromtimestamp(v1.timestamp())
# print(v2)
# result = add.apply_async(args=[1, 3], eta=v2)
# print(result.id)

# Way 2
ctime = datetime.now()
# eta expects UTC time by default
utc_ctime = datetime.utcfromtimestamp(ctime.timestamp())
from datetime import timedelta

time_delay = timedelta(seconds=10)
task_time = utc_ctime + time_delay

# use apply_async and set the execution time
result = add.apply_async(args=[4, 3], eta=task_time)
print(result.id)
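If you only need a relative delay, apply_async also accepts a countdown argument in seconds, which avoids the manual UTC conversion above:

from celery_app_task import add

# execute the task 10 seconds from now
result = add.apply_async(args=[4, 3], countdown=10)
print(result.id)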

Scheduled tasks similar to crontab

Modify celery.py in the multi-task structure as follows:

from datetime import timedelta
from celery import Celery
from celery.schedules import crontab

cel = Celery('tasks', broker='redis://127.0.0.1:6379/1', backend='redis://127.0.0.1:6379/2', include=[
    'celery_task.task1',
    'celery_task.task2',
])
cel.conf.timezone = 'Asia/Shanghai'
cel.conf.enable_utc = False

cel.conf.beat_schedule = {
    # name the schedule entry whatever you like
    'add-every-10-seconds': {
        # run the test_celery function in task1
        'task': 'celery_task.task1.test_celery',
        # run every 2 seconds
        # 'schedule': 1.0,
        # 'schedule': crontab(minute="*/1"),
        'schedule': timedelta(seconds=2),
        # arguments passed to the task
        'args': ('test',)
    },
    # 'add-every-12-seconds': {
    #     'task': 'celery_task.task1.test_celery',
    #     # every year on April 11, at 8:42
    #     'schedule': crontab(minute=42, hour=8, day_of_month=11, month_of_year=4),
    #     'args': (16, 16)
    # },
}

Start beat: celery beat -A celery_task -l info

Start the worker: celery worker -A celery_task -l info -P eventlet
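For development you can also embed the beat scheduler in the worker process with -B instead of running two processes (not recommended for production):

celery worker -A celery_task -l info -B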

6. Use Celery in Django

Create celeryconfig.py in the project

import djcelery

djcelery.setup_loader()
CELERY_IMPORTS = (
    'app01.tasks',
)
# prevent deadlocks in some situations
CELERYD_FORCE_EXECV = True
# number of concurrent worker processes
CELERYD_CONCURRENCY = 4
# allow retries
CELERY_ACKS_LATE = True
# each worker child handles at most 100 tasks before being recycled, to prevent memory leaks
CELERYD_MAX_TASKS_PER_CHILD = 100
# task timeout in seconds
CELERYD_TASK_TIME_LIMIT = 12 * 30


Create tasks.py under the app01 directory

from celery import task
import time

@task
def add(x, y):
    time.sleep(3)
    return x + y

The view function: views.py

from django.shortcuts import render, HttpResponse

# Create your views here.
from app01 import tasks

def test(request):
    result = tasks.add.delay(2, 4)
    print(result.id)
    return HttpResponse('ok')

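To reach the view, the project's urls.py needs a route. A minimal sketch (the URL pattern here is an assumption, and the exact import depends on your Django version):

from django.conf.urls import url
from app01 import views

urlpatterns = [
    url(r'^test/$', views.test),  # hypothetical route for the test view
]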

settings.py

INSTALLED_APPS = [
    ...
    'djcelery',
    'app01',
]
...
from djagocele import celeryconfig
BROKER_BACKEND = 'redis'
BROKER_URL = 'redis://127.0.0.1:6379/1'
CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/2'

Start the worker:

python3 manage.py celery worker
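With the worker running, start the Django development server and visit the view to enqueue a task (the path assumes the hypothetical route above):

python3 manage.py runserver
# then open http://127.0.0.1:8000/test/ ; the view returns 'ok' and prints the task id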