This is the 26th day of my participation in the August More Text Challenge

Life is too short to learn Python together

Celery task structure

Write in a py file (not recommended)

# worker file
from celery import Celery
broker='redis: / / 127.0.0.1:6379/1'  # broker task queue
backend='redis: / / 127.0.0.1:6379/2' # Result store
app=Celery(__name__,broker=broker,backend=backend)

# Add task (use this decorator to decorate, @app.task)
@app.task
def add(x,y) :
    print(x,y)
    returnX + y start worker: You need to switch to the path of the py file, Celery_task -l info - Windows :celery worker -A celery_task -l info -p eventletSubmit tasks to the broker file
from celery_task import add
add(3.4)  Execute directly and will not be added to the broker
ret = add.delay(1.2)  Add a task to the broker
print(ret)  # RET is the task number, which is used to obtain the task execution result in the later stage

View the task execution result file
from celery_task import app
from celery.result import AsyncResult
id = '3e397fd7-e0c1-4c5c-999c-2655a96793bb'
if __name__ == '__main__':
    async = AsyncResult(id=id, app=app)
    if async.successful():
        result = async.get()
        print(result)
    elif async.failed():
        print('Mission failed')
        elif async.status == 'PENDING':
        print('Task in wait being executed')
    elif async.status == 'RETRY':
        print('Task retry after exception')
    elif async.status == 'STARTED':
        print('Task has been executed')
Copy the code

Package structure

Create a new package with an arbitrary name, such as Celery_task

  • __init__.py/celery.py
from celery import Celery
broker='redis: / / 127.0.0.1:6379/0'  # broker task queue
backend='redis: / / 127.0.0.1:6379/1' # Result store

app=Celery(__name__,broker=broker,backend=backend,include=['celery_task.task1',]),Celery get app object

app=Celery() After Configuring Broker Backend in the configuration file, objects can be instantiated directly without passing parameters

app.config_from_object('celery_task.celery_config') If the configuration in the configuration file conflicts with the configuration in the current file, the configuration in the current file is preferred (partial preference).
Copy the code
  • celery_config.py

The configuration of the app object instantiated, the configuration associated with the celery run can be written in this file

from datetime import timedelta

Configure the app object to be instantiated
BROKER_URL = 'redis: / / 127.0.0.1:6379/2'  # broker task queue
CELERY_RESULT_BACKEND = 'redis: / / 127.0.0.1:6379/3'  # address where results are stored

CELERYD_CONCURRENCY = 2 Celeryworker = celeryworker = celeryworker = celeryworker = celeryworker = celeryworker = celeryworker = celeryworker = celeryworker = celeryworker = celeryworker
CELERYD_MAX_TASKS_PER_CHILD = 5  [celery memory leak] Every worker will die as many tasks as they perform

# task import
CELERY_IMPORTS = (
    'celery_task.task01'CELERY_TIMEZONE =,)'Asia/Shanghai'
CELERY_ENABLE_UTC = False
* celery.py * celery.py * celery.py * celery.py * celery.py * celery.py
CELERYBEAT_SCHEDULE = {
    'delta_task': {'task':'celery_task.task01.add'.'schedule':timedelta(seconds=5),
        'args': ((1.2))}}Copy the code
  • task.py
from .celery import app

@app.task
def add(x,y) :
    print(x,y)
    return x+y
Copy the code
  • add_task.py
from celery_task.task01 import add

ret = add.delay(1.2)
Copy the code

Configuration mode of the celery parameter

  • Through the app.conf. parameter

Configure some common parameters. If some parameters cannot be configured in this mode, you still need to use the configuration file to configure them

app.conf.timezone = 'Asia/Shanghai'
Copy the code
  • Directly in the configuration file, in__init__.pyThrough/celery. Pyapp.config_from_object('celery_task.celery_config')Load the configuration

Common configuration mode, configuration of the app object instantiated, associated configuration during the celery run can be written in this file

# celery_config.py
CELERY_TIMEZONE = 'Asia/Shanghai'

# __init__.py/celery.py
app.config_from_object('celery_task.celery_config')
# 'celery_task.celery_config' == from celery_task import celery_config
Copy the code

supplement

Two ways to start Woker

  • Through the command line

celery worker -A celery_task -l info -P eventlet

  • From app.start(), right click to run
app.start(argv=['celery', 'worker', '-A', 'celery_task','-l','info', '-P', 'eventlet'])
Copy the code

conclusion

The article was first published in the wechat public account Program Yuan Xiaozhuang, at the same time in nuggets.

The code word is not easy, reprint please explain the source, pass by the little friends of the lovely little finger point like and then go (╹▽╹)