Django 2.1.7 Celery 4.3.0 signatures and primitives: task workflows with group and chain

Related chapters:

  • Task handler raised error: ValueError(‘not enough values to unp…
  • Django 2.1.7 Integrated Celery 4.3.0 From Introduction to Entry
  • Django 2.1.7 Celery 4.3.0 configuration
  • Django 2.1.7 Celery 4.3.0 Use Celery in projects
  • Django 2.1.7 Celery 4.3.0 Calling Task

Official documentation

celery.readthedocs.io/en/latest/u…

1. signature

So far, in the calling tasks section, we have only used the delay() and apply_async() methods to call tasks, and these are the most common approach. Sometimes, though, we do not just want to send a task to a queue; we want to pass a task invocation (its arguments and execution options) as an argument to another function. For this purpose Celery provides signatures.

signature() accepts the following:

  • Arguments for the task call (that is, the task's own positional arguments, like the arguments in add(x, y))
  • Keyword arguments for the task call (for example debug=True or debug=False)
  • Execution options (such as countdown for a delayed start, or expires for an expiry time).

A signature wraps the arguments and execution options of a single task call, so the whole call can be passed to another function.
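To make the idea concrete, here is a plain-Python sketch of what a signature bundles. It is an analogy only, not Celery's implementation: `ToySignature`, its fields, and `run_later` are hypothetical names, and calling the object runs the function immediately in the current process instead of sending anything to a queue.

```python
from dataclasses import dataclass, field
from typing import Any, Callable


@dataclass
class ToySignature:
    """Hypothetical stand-in: a deferred call that carries its own arguments."""
    func: Callable[..., Any]
    args: tuple = ()
    kwargs: dict = field(default_factory=dict)
    # Execution options are only stored in this sketch, never acted on.
    options: dict = field(default_factory=dict)

    def __call__(self) -> Any:
        # Run the wrapped function with the stored arguments.
        return self.func(*self.args, **self.kwargs)


def add(a, b):
    return a + b


def run_later(sig: ToySignature) -> Any:
    # The receiver needs no knowledge of the arguments; the signature has them.
    return sig()


sig = ToySignature(add, args=(20, 30), options={"countdown": 1})
result = run_later(sig)
print(result)  # 50
```

The point of the design is that the call and its parameters travel together as one object, which is what lets Celery hand a signature to other functions or combine it with other signatures.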

Let’s first look at the task functions defined in the tasks.py module:

from celery_tasks.celery import app as celery_app

# Create the task functions
@celery_app.task
def my_task1():
    print("Task function (my_task1) is executing....")

@celery_app.task
def my_task2():
    print("Task function (my_task2) is executing....")

@celery_app.task
def my_task3():
    print("Task function (my_task3) is executing....")

@celery_app.task
def my_task4(a,b):
    print("Task function (my_task4) is executing....")
    return a + b

Create a signature for my_task1() and call it:

# import signature
In [24]:  from celery import signature

# create a signature for the task
In [25]: t1 = signature(my_task1,countdown=1)

# call task
In [27]: t1.delay()
Out[27]: <AsyncResult: dd77773f-e297-47f3-8fe9-42db6fda8da0>


The Celery worker log shows the task being received and executed.

Create a signature for my_task4(), passing its arguments through args:

In [28]: t4 = signature(my_task4,args=(20, 30),countdown=1)

In [29]: t4.delay()
Out[29]: <AsyncResult: 88958863-24f5-4314-8690-44c0045e7be9>

Again, the Celery worker log shows the task being received and executed.

2. Primitives

These primitives are themselves signature objects, so they can be combined in many ways to build complex workflows. The primitives covered here are:

  • group: a set of tasks executed in parallel; it returns a group result from which the return values can be retrieved in order.

  • chain: tasks executed one after another; each task's return value is passed to the next task as its first argument.

The tasks.py module is as follows:

from celery_tasks.celery import app as celery_app

# Create the task functions
@celery_app.task
def my_task1(a, b):
    print("Task function (my_task1) is executing....")
    return a + b

@celery_app.task
def my_task2(a, b):
    print("Task function (my_task2) is executing....")
    return a + b

@celery_app.task
def my_task3(a, b):
    print("Task function (my_task3) is executing....")
    return a + b

A group example:

# import each task
In [1]: from celery_tasks.tasks import my_task1, my_task2, my_task3

# import group
In [2]: from celery import group

# import signature
In [3]: from celery import signature

# to create a signature
In [4]: t1 = signature(my_task1,args=(1, 2),countdown=1)

In [5]: t2 = signature(my_task2,args=(3, 4),countdown=1)

In [6]: t3 = signature(my_task3,args=(5, 6),countdown=1)

# put multiple signatures into the same group
In [7]: my_group = group(t1,t2,t3)

# execute the group
In [11]: ret = my_group()

# output the results of each task
In [13]: print(ret.get())
[3, 7, 11]


The Celery worker log shows that the three tasks in the group run concurrently.
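The property that results come back in order, even though the tasks run in parallel, can be illustrated without a broker. The sketch below is a plain-Python analogy using a thread pool, not Celery code: `toy_group` and `slow_add` are hypothetical helpers, and the sleep durations are made up so that later calls finish first.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def slow_add(a, b, delay):
    # Sleep to simulate work, so calls finish in a different order than submitted.
    time.sleep(delay)
    return a + b


def toy_group(calls):
    """Hypothetical helper: run (func, args) pairs concurrently, keep submission order."""
    with ThreadPoolExecutor(max_workers=len(calls)) as pool:
        futures = [pool.submit(func, *args) for func, args in calls]
        # Collect by submission order, not completion order.
        return [f.result() for f in futures]


results = toy_group([
    (slow_add, (1, 2, 0.2)),  # finishes last
    (slow_add, (3, 4, 0.1)),
    (slow_add, (5, 6, 0.0)),  # finishes first
])
print(results)  # [3, 7, 11]
```

The list matches the order in which the calls were grouped, just as the group above returns [3, 7, 11] for t1, t2, t3.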

A chain example:

In [1]: from celery_tasks.tasks import my_task1, my_task2, my_task3

In [2]: from celery import signature

In [3]: from celery import chain

# combine multiple signatures into a task chain
# the result of my_task1 is passed to my_task2
# the result of my_task2 is passed to my_task3
In [4]: my_chain = chain(my_task1.s(10, 10) | my_task2.s(20) | my_task3.s(30))

# execute the task chain
In [5]: ret = my_chain()

# output the final result
In [6]: print(ret.get())
70


The worker log shows the three tasks running one after another.

As you can see, the final result accumulates across the three tasks, because each task's return value is passed to the next task as its first argument. my_task1.s(10, 10) is a shortcut for creating a signature.
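The arithmetic behind the 70 can be traced in plain Python. This is an analogy for the chain above, assuming each step's return value is prepended to the next step's stored arguments; `toy_chain` is a hypothetical helper and not part of Celery.

```python
from functools import reduce


def add(a, b):
    return a + b


def toy_chain(first, *rest):
    """Hypothetical helper: run (func, args) steps in sequence.

    Each step after the first receives the previous result as its first
    positional argument, followed by its own stored arguments.
    """
    func, args = first
    initial = func(*args)
    return reduce(lambda prev, step: step[0](prev, *step[1]), rest, initial)


# Mirrors my_task1.s(10, 10) | my_task2.s(20) | my_task3.s(30)
result = toy_chain((add, (10, 10)), (add, (20,)), (add, (30,)))
print(result)  # 70: 10 + 10 = 20, then 20 + 20 = 40, then 40 + 30 = 70
```

This is why my_task2.s(20) and my_task3.s(30) are given only one argument each: the other one is supplied by the previous task in the chain.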