Hi, I'm Sean. I'll see you every week for source code analysis.

Celery is a simple, flexible, and reliable distributed system for processing large numbers of messages, and it provides a full set of tools for operating such a system. It is a task queue focused on real-time processing that also supports task scheduling.

Earlier articles in this series covered the libraries Celery depends on: vine, py-amqp, and kombu:

  1. Celery source code analysis: vine's Promise implementation
  2. Celery source code analysis: py-amqp's implementation of the AMQP protocol
  3. Celery source code analysis: kombu, a Python messaging library
  4. Celery source code analysis: kombu's enterprise-level algorithms

With those foundations in place, we now formally begin analyzing Celery's own source code. This article covers the following parts:

  • Celery application example
  • Celery project overview
  • Tracing the worker startup process
  • Tracing the client startup process
  • The Celery app
  • Startup process in worker mode
  • Summary

Celery application example

Before starting Celery, we use Docker to start a Redis service that will act as the broker:

$ docker run -p 6379:6379 --name redis -d redis:6.2.3-alpine

Use telnet to connect to the Redis service and run monitor to observe task scheduling:

$ telnet 127.0.0.1 6379
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
monitor
+OK

Our celery service code myapp.py is shown below:

# myapp.py
from celery import Celery

app = Celery(
    'myapp',
    broker='redis://localhost:6379/0',
    result_backend='redis://localhost:6379/0'
)

@app.task
def add(x, y):
    print("add", x, y)
    return x + y

if __name__ == '__main__':
    app.start()

Open a new terminal and start the worker service with the following command:

$ python myapp.py worker -l DEBUG

If everything is working, the worker starts and prints a banner that includes the transport in use, the registered tasks, and so on:

$ celery -A myapp worker -l DEBUG

 -------------- celery@bogon v5.1.2 (sun-harmonics)
--- ***** -----
-- ******* ---- macOS-10.16-x86_64-i386-64bit 2021-09-08 20:33:45
- *** --- * ---
- ** ---------- [config]
- ** ---------- .> app:         myapp:0x7f855079e730
- ** ---------- .> transport:   redis://localhost:6379/0
- ** ---------- .> results:     disabled://
- *** --- * --- .> concurrency: 12 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
 -------------- [queues]
                .> celery           exchange=celery(direct) key=celery

[tasks]
  . myapp.add

[2021-09-08 20:33:46,220: INFO/MainProcess] Connected to redis://localhost:6379/0
[2021-09-08 20:33:46,234: INFO/MainProcess] mingle: searching for neighbors
[2021-09-08 20:33:47,279: INFO/MainProcess] mingle: all alone
[2021-09-08 20:33:47,315: INFO/MainProcess] celery@bogon ready.

Open another terminal window and run the following code as the client. The add function executes correctly and returns 32, the result of 16 + 16. Note that the call goes through the delay method and is executed remotely, so the output of print("add", x, y) does not appear in the client terminal:

$ python
>>> from myapp import add
>>> task = add.delay(16,16)
>>> task
<AsyncResult: 5aabfc0b-04b5-4a51-86b0-6a7263e2ef3b>
>>> task.get()
32

In the Celery worker window you can see output similar to the following. The worker receives a request to execute the task myapp.add; the UUID of the request is 5aabfc0b-04b5-4a51-86b0-6a7263e2ef3b and the argument list is [16, 16]:

[2021-11-11 20:13:48,040: INFO/MainProcess] Task myapp.add[5aabfc0b-04b5-4a51-86b0-6a7263e2ef3b] received
[2021-11-11 20:13:48,040: DEBUG/MainProcess] TaskPool: Apply <function fast_trace_task at 0x7fda086baa60> (args:('myapp.add', '5aabfc0b-04b5-4a51-86b0-6a7263e2ef3b', {'lang': 'py', 'task': 'myapp.add', 'id': '5aabfc0b-04b5-4a51-86b0-6a7263e2ef3b', 'shadow': None, 'eta': None, 'expires': None, 'group': None, 'group_index': None, 'retries': 0, 'timelimit': [None, None], 'root_id': '5aabfc0b-04b5-4a51-86b0-6a7263e2ef3b', 'parent_id': None, 'argsrepr': '(16, 16)', 'kwargsrepr': '{}', 'origin': 'gen63119@localhost', 'ignore_result': False, 'reply_to': '97a3e117-c8cf-3d4c-97c0-c0a76aaf9a16', 'correlation_id': '5aabfc0b-04b5-4a51-86b0-6a7263e2ef3b', 'hostname': 'celery@localhost', 'delivery_info': {'exchange': '', 'routing_key': 'celery', 'priority': 0, 'redelivered': None}, 'args': [16, 16], 'kwargs': {}}, b'[[16, 16], {}, {"callbacks": null, "errbacks": null, "chain": null, "chord": null}]', 'application/json', 'utf-8') kwargs: {})
[2021-11-11 20:13:49,059: INFO/ForkPoolWorker-8] Task myapp.add[5aabfc0b-04b5-4a51-86b0-6a7263e2ef3b] succeeded in 1.0166977809999995s: 32

In the Redis monitor window you can see output like the following, showing some of the commands issued to Redis along the way (parts that were lost from the captured output are marked with ...):

+1636632828.304020 [0 172.16.0.117:51127] "SUBSCRIBE" "celery-task-meta-5aabfc0b-04b5-4a51-86b0-6a7263e2ef3b"
...
... [0 172.16.0.118:591] ... "celery" "{\"body\": \"W1sxNiwgMTZdLCB7fSwgeyJjYWxsYmFja3MiOiBudWxsLCAiZXJyYmFja3MiOiBudWxsLCAiY2hhaW4iOiBudWxsLCAiY2hvcmQiOiBudWxsfV0=\", \"content-encoding\": \"utf-8\", \"content-type\": \"application/json\", \"headers\": {\"lang\": \"py\", \"task\": \"myapp.add\", \"id\": \"5aabfc0b-04b5-4a51-86b0-6a7263e2ef3b\", \"shadow\": null, \"eta\": null, \"expires\": null, \"group\": null, \"group_index\": null, \"retries\": 0, \"timelimit\": [null, null], \"root_id\": \"5aabfc0b-04b5-4a51-86b0-6a7263e2ef3b\", \"parent_id\": null, \"argsrepr\": \"(16, 16)\", \"kwargsrepr\": \"{}\", \"origin\": \"gen63119@localhost\", \"ignore_result\": false}, \"properties\": {\"correlation_id\": \"5aabfc0b-04b5-4a51-86b0-6a7263e2ef3b\", \"reply_to\": \"97a3e117-c8cf-3d4c-97c0-c0a76aaf9a16\", \"delivery_mode\": 2, \"delivery_info\": {\"exchange\": \"\", \"routing_key\": \"celery\"}, \"priority\": 0, \"body_encoding\": \"base64\", \"delivery_tag\": \"20dbd584-b669-4ef0-8a3b-41d19b354690\"}}"
+1636632828.307040 [0 172.16.0.117:52014] "MULTI"
+1636632828.307075 [0 172.16.0.117:52014] "ZADD" "unacked_index" "1636632828.038743" "20dbd584-b669-4ef0-8a3b-41d19b354690"
+1636632828.307088 [0 172.16.0.117:52014] "HSET" "unacked" "20dbd584-b669-4ef0-8a3b-41d19b354690" "[{\"body\": \"W1sxNiwgMTZdLCB7fSwgeyJjYWxsYmFja3MiOiBudWxsLCAiZXJyYmFja3MiOiBudWxsLCAiY2hhaW4iOiBudWxsLCAiY2hvcmQiOiBudWxsfV0=\", \"content-encoding\": \"utf-8\", \"content-type\": \"application/json\", \"headers\": {\"lang\": \"py\", \"task\": \"myapp.add\", \"id\": \"5aabfc0b-04b5-4a51-86b0-6a7263e2ef3b\", \"shadow\": null, \"eta\": null, \"expires\": null, \"group\": null, \"group_index\": null, \"retries\": 0, \"timelimit\": [null, null], \"root_id\": \"5aabfc0b-04b5-4a51-86b0-6a7263e2ef3b\", \"parent_id\": null, \"argsrepr\": \"(16, 16)\", \"kwargsrepr\": \"{}\", \"origin\": \"gen63119@localhost\", \"ignore_result\": false}, \"properties\": {\"correlation_id\": \"5aabfc0b-04b5-4a51-86b0-6a7263e2ef3b\", \"reply_to\": \"97a3e117-c8cf-3d4c-97c0-c0a76aaf9a16\", \"delivery_mode\": 2, \"delivery_info\": {\"exchange\": \"\", \"routing_key\": \"celery\"}, \"priority\": 0, \"body_encoding\": \"base64\", \"delivery_tag\": \"20dbd584-b669-4ef0-8a3b-41d19b354690\"}}, \"\", \"celery\"]"
...
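The body field in the monitor output is base64-encoded, as the body_encoding property indicates. As a small sketch (the helper name decode_body is hypothetical and not part of Celery or kombu), decoding it with the standard library recovers the positional arguments, the keyword arguments, and the embedded callback options that also appear in the worker log:

```python
import base64
import json

# The "body" value copied from the Redis monitor output.
BODY = (
    "W1sxNiwgMTZdLCB7fSwgeyJjYWxsYmFja3MiOiBudWxsLCAiZXJyYmFja3MiOiBudWxs"
    "LCAiY2hhaW4iOiBudWxsLCAiY2hvcmQiOiBudWxsfV0="
)


def decode_body(body):
    """Decode a base64-encoded JSON body into (args, kwargs, embed)."""
    args, kwargs, embed = json.loads(base64.b64decode(body))
    return args, kwargs, embed


args, kwargs, embed = decode_body(BODY)
print(args, kwargs, embed)
```

The first element is the argument list passed to add.delay, the second the (empty) keyword arguments, and the third the callbacks, errbacks, chain, and chord options.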

Let's go over the flow once more and match it against the example:

  • We start a Celery worker service, which acts as the consumer
  • We open another window and submit tasks, acting as the producer
  • Redis is used as the broker, responsible for passing messages between producer and consumer
  • Finally, the producer's task is sent as a message to the remote consumer for execution, and the result is sent back to the producer over the network

The example above shows Celery working as a distributed task scheduling system: a local task call is wrapped according to the AMQP protocol and sent as a message to a remote consumer, which executes it.
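The producer, broker, and consumer roles can be imitated in a few lines with the standard library. This is only a toy model, assuming an in-process queue.Queue in place of Redis and a dict in place of the result backend; the names broker, results, TASKS, delay, and worker are made up for illustration and are not Celery APIs:

```python
import json
import queue
import threading

broker = queue.Queue()   # stands in for the Redis list named "celery"
results = {}             # stands in for the result backend
TASKS = {"myapp.add": lambda x, y: x + y}  # tasks known to the consumer


def delay(name, task_id, *args):
    """Producer side: serialize the call and push it to the broker."""
    broker.put(json.dumps({"task": name, "id": task_id, "args": args}))


def worker():
    """Consumer side: take one message, run the task, store the result."""
    message = json.loads(broker.get())
    func = TASKS[message["task"]]
    results[message["id"]] = func(*message["args"])


consumer = threading.Thread(target=worker)
consumer.start()                       # the consumer waits for a message
delay("myapp.add", "task-1", 16, 16)   # the producer only sends a message
consumer.join()
print(results["task-1"])
```

The producer never calls the function itself. It only ships the task name and arguments, which is why the print inside add shows up on the worker side in the real example.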


Celery Project Overview

This analysis uses Celery 5.0.5. The main modules are structured as follows:

Module                   Description
app                      Celery app implementation
apps                     The three main service modes of Celery: worker, beat, and multi
backends                 Task result storage
bin                      Command-line tool implementation
concurrency              Concurrency implementations, including threads, gevent, asynpool, etc.
events                   Event implementation
worker                   Service startup implementation
beat.py, schedules.py    Periodic task and scheduling implementation
result.py                Task result implementation
signals.py               Signal definitions
states.py                State definitions

Judging from the project structure, Celery has many modules and complex functionality. However, we have already worked through vine, py-amqp, and kombu, so we only need to understand the worker, beat, and multi service modes to get a good picture of how a Celery distributed system is built.


Tracing the worker startup process

The worker is started with the command celery -A myapp worker -l DEBUG. Celery runs as a module here, so the entry point is the main function in __main__.py:

# ch23-celery/celery-5.0.5/celery/__main__.py

def main():
    """Entrypoint to the ``celery`` umbrella command."""
    ...
    # entry point of the celery command
    from celery.bin.celery import main as _main
    sys.exit(_main())

The celery command is the main command. It loads the celery-app and also dispatches to the worker subcommand:

# ch23-celery/celery-5.0.5/celery/bin/celery.py

def celery(ctx, app, broker, result_backend, loader, config, workdir,
           no_color, quiet, version):
    """Celery command entrypoint."""
    ...
    ctx.obj = CLIContext(app=app, no_color=no_color, workdir=workdir,
                         quiet=quiet)

    # User options
    worker.params.extend(ctx.obj.app.user_options.get('worker', []))
    beat.params.extend(ctx.obj.app.user_options.get('beat', []))
    events.params.extend(ctx.obj.app.user_options.get('events', []))

def main() -> int:
    """Start celery umbrella command.

    This function is the main entrypoint for the CLI.

    :return: The exit code of the CLI.
    """
    return celery(auto_envvar_prefix="CELERY")

The worker subcommand creates the worker and starts it:

# ch23-celery/celery-5.0.5/celery/bin/worker.py

def worker(ctx, hostname=None, pool_cls=None, app=None, uid=None, gid=None,
           loglevel=None, logfile=None, pidfile=None, statedb=None,
           **kwargs):
    worker = app.Worker(
        hostname=hostname, pool_cls=pool_cls, loglevel=loglevel,
        logfile=logfile,  # node format handled by celery.app.log.setup
        pidfile=node_format(pidfile, hostname),
        statedb=node_format(statedb, hostname),
        no_color=ctx.obj.no_color,
        quiet=ctx.obj.quiet,
        **kwargs)
    worker.start()

The worker is created through the app, and it is a celery.apps.worker:Worker object:

# ch23-celery/celery-5.0.5/celery/app/base.py

def Worker(self):
    # create the worker
    return self.subclass_with_self('celery.apps.worker:Worker')
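The string 'celery.apps.worker:Worker' names a class by module path and attribute. As a rough sketch of the idea, assuming nothing about Celery's real implementation, the helper below resolves such a string and creates a subclass that carries the app as a class attribute. The names load_symbol and subclass_with_app are hypothetical, and collections:OrderedDict is only a standard-library stand-in for the worker class:

```python
import importlib


def load_symbol(path):
    """Resolve a 'package.module:Attribute' string to the object it names."""
    module_name, _, attribute = path.partition(":")
    return getattr(importlib.import_module(module_name), attribute)


def subclass_with_app(path, app):
    """Create a subclass of the named class with `app` as a class attribute."""
    base = load_symbol(path)
    attrs = {"app": app, "__module__": base.__module__}
    return type(base.__name__, (base,), attrs)


# 'collections:OrderedDict' stands in for 'celery.apps.worker:Worker'.
Worker = subclass_with_app("collections:OrderedDict", app="my-app")
print(Worker.__mro__[1], Worker.app)
```

Referring to classes by name keeps the app module free of hard imports, and binding the app onto a subclass lets every instance find the app that created it.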

During service startup, the call chain is as follows:

                                 +----------+
                             +--->app.celery|
                             |   +----------+
+---------+   +----------+   |
|main.main+--->bin.celery+---+
+---------+   +----------+   |
                             |   +----------+   +-----------+
                             +--->bin.worker+--->apps.worker|
                                 +----------+   +-----------+

During this startup, two applications are created: the celery-application and the worker-application. We will set the details of the startup process aside for now and look at the client-side flow first.


Tracing the client startup process

The client flow in the example consists of the following four steps:

  1. Create the celery-application
  2. Create the task
  3. Call the task's delay method to execute the task and get an asynchronous result
  4. Call the get method of the asynchronous result to obtain the actual result

A task is a promise (proxy) object created by the decorator that the app provides:

def task(self, *args, **opts):
    """Decorator to create a task class out of any callable."""
    def inner_create_task_cls(shared=True, filter=None, lazy=True, **opts):
        def _create_task_cls(fun):
            ret = PromiseProxy(self._task_from_fun, (fun,), opts,
                               __doc__=fun.__doc__)
            return ret
        return _create_task_cls
    return inner_create_task_cls(**opts)
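The point of wrapping _task_from_fun in a proxy is that the task does not have to be built at decoration time. A minimal sketch of that lazy-evaluation idea follows; LazyProxy and build_task are hypothetical names for illustration and are far simpler than Celery's PromiseProxy:

```python
class LazyProxy:
    """Call factory(*args) only when the wrapped object is first needed."""

    def __init__(self, factory, args=()):
        self._factory = factory
        self._args = args
        self._evaluated = False
        self._obj = None

    def _current(self):
        if not self._evaluated:
            self._obj = self._factory(*self._args)
            self._evaluated = True
        return self._obj

    def __getattr__(self, name):
        # Only reached for attributes the proxy itself does not have.
        return getattr(self._current(), name)

    def __call__(self, *args, **kwargs):
        return self._current()(*args, **kwargs)


created = []


def build_task(fun):
    """Pretend this is the expensive step that turns a function into a task."""
    created.append(fun.__name__)
    return fun


def add(x, y):
    return x + y


task = LazyProxy(build_task, (add,))
before = list(created)      # nothing has been built yet
value = task(16, 16)        # first use triggers build_task(add)
print(before, value, created)
```

The factory runs once, on first use, and later calls reuse the object it produced.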

The task is in fact an instance of a dynamically created subclass of the Task base class:

def _task_from_fun(self, fun, name=None, base=None, bind=False, **options):
    base = base or self.Task
    task = type(fun.__name__, (base,), dict({
        'app': self,
        'name': name,
        'run': run,
        '_decorated': True,
        '__doc__': fun.__doc__,
        '__module__': fun.__module__,
        '__annotations__': fun.__annotations__,
        '__header__': staticmethod(head_from_fun(fun, bound=bind)),
        '__wrapped__': run}, **options))()
    # add the task to the registry
    self._tasks[task.name] = task
    task.bind(self)  # connect task to this app
    add_autoretry_behaviour(task, **options)
    return task

Calling the task's delay method ends up in the app's send_task method:

def delay(self, *args, **kwargs):
    return self.apply_async(args, kwargs)

def apply_async(self, args=None, kwargs=None, task_id=None, producer=None,
                link=None, link_error=None, shadow=None, **options):
    ...
    return app.send_task(
        self.name, args, kwargs, task_id=task_id, producer=producer,
        link=link, link_error=link_error, result_cls=self.AsyncResult,
        shadow=shadow, task_type=self,
        **options
    )

Both the worker flow and the client flow go through the celery-application, so that is what we look at next.


The two main functions of the Celery app

The Celery class and its constructor:

class Celery:
    ...
    # events class
    events_cls = 'celery.app.events:Events'
    loader_cls = None
    # logging class
    log_cls = 'celery.app.log:Logging'
    # control class
    control_cls = 'celery.app.control:Control'
    # task class
    task_cls = 'celery.app.task:Task'
    # task registry
    registry_cls = 'celery.app.registry:TaskRegistry'
    ...

    def __init__(self, main=None, loader=None, backend=None,
                 amqp=None, events=None, log=None, control=None,
                 set_as_current=True, tasks=None, broker=None, include=None,
                 changes=None, config_source=None, fixups=None, task_cls=None,
                 autofinalize=True, namespace=None, strict_typing=True,
                 **kwargs):
        # startup steps
        self.steps = defaultdict(set)
        # pending tasks
        self._pending = deque()
        # all tasks
        self._tasks = self.registry_cls(self._tasks or {})
        ...
        self.__autoset('broker_url', broker)
        self.__autoset('result_backend', backend)
        ...
        self.on_init()
        _register_app(self)

You can see that the Celery class holds the names of several default module classes, from which objects can be created dynamically. For tasks, the app object uses a queue as the container for tasks in the pending state, and a TaskRegistry to manage task registration.

The task decorator records each task in the app's TaskRegistry:

def task(self, *args, **opts):
    ...
    self._tasks[task.name] = task
    task.bind(self)  # connect task to this app
    add_autoretry_behaviour(task, **options)
    ...
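Registration by name matters because a message only carries the task name and its arguments. The sketch below shows the pattern with a plain dict as the registry; MiniApp is a hypothetical toy class, not Celery's API, and the name format '<main>.<function name>' is an assumption made for this example:

```python
class MiniApp:
    """Toy app that keeps a name -> task registry."""

    def __init__(self, main):
        self.main = main
        self.tasks = {}          # plays the role of the task registry

    def task(self, fun):
        """Decorator: register `fun` under '<main>.<function name>'."""
        self.tasks[f"{self.main}.{fun.__name__}"] = fun
        return fun


app = MiniApp("myapp")


@app.task
def add(x, y):
    return x + y


# A worker only receives the task name and arguments in a message,
# so it looks the callable up by name before running it.
message = {"task": "myapp.add", "args": [16, 16]}
result = app.tasks[message["task"]](*message["args"])
print(result)
```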

Celery's other core function is providing the connection to the broker:

def _connection(self, url, userid=None, password=None,
                virtual_host=None, port=None, ssl=None,
                connect_timeout=None, transport=None,
                transport_options=None, heartbeat=None,
                login_method=None, failover_strategy=None, **kwargs):
    conf = self.conf
    return self.amqp.Connection(
        url,
        userid or conf.broker_user,
        password or conf.broker_password,
        virtual_host or conf.broker_vhost,
        port or conf.broker_port,
        transport=transport or conf.broker_transport,
        ssl=self.either('broker_use_ssl', ssl),
        heartbeat=heartbeat,
        login_method=login_method or conf.broker_login_method,
        failover_strategy=(
            failover_strategy or conf.broker_failover_strategy
        ),
        transport_options=dict(
            conf.broker_transport_options, **transport_options or {}
        ),
        connect_timeout=self.either(
            'broker_connection_timeout', connect_timeout
        ),
    )
broker_connection = connection

@cached_property
def amqp(self):
    """AMQP related functionality: :class:`~@amqp`."""
    return instantiate(self.amqp_cls, app=self)

The AMQP implementation relies on the AMQP protocol wrappers provided by kombu:

from kombu import Connection, Consumer, Exchange, Producer, Queue, pools

class AMQP:
    """App AMQP API: app.amqp."""

    Connection = Connection

Queue, Consumer, and Producer are used to produce and consume messages:

def Queues(self, queues, create_missing=None,
           autoexchange=None, max_priority=None):
    ...
    # queues_cls is the Queues container class
    return self.queues_cls(
        queues, self.default_exchange, create_missing,
        autoexchange, max_priority, default_routing_key,
    )

def TaskConsumer(self, channel, queues=None, accept=None, **kw):
    ...
    return self.Consumer(
        channel, accept=accept,
        queues=queues or list(self.queues.consume_from.values()),
        **kw
    )

def _create_task_sender(self):
    ...
    producer.publish(
        body,
        exchange=exchange,
        routing_key=routing_key,
        serializer=serializer or default_serializer,
        compression=compression or default_compressor,
        retry=retry, retry_policy=_rp,
        delivery_mode=delivery_mode, declare=declare,
        headers=headers2,
        **properties
    )
    ...

We now have a general understanding of the two main functions of the celery-app: managing tasks and managing AMQP connections.


Startup process in worker mode

Worker mode is started in WorkController, which divides the service into separate stages (steps) and assembles them into a structure called a Blueprint for management:

class WorkController:

    # inner class
    class Blueprint(bootsteps.Blueprint):
        """Worker bootstep blueprint."""

        name = 'Worker'
        default_steps = {
            'celery.worker.components:Hub',
            'celery.worker.components:Pool',
            'celery.worker.components:Beat',
            'celery.worker.components:Timer',
            'celery.worker.components:StateDB',
            'celery.worker.components:Consumer',
            'celery.worker.autoscale:WorkerComponent',
        }

    def __init__(self, app=None, hostname=None, **kwargs):
        self.blueprint = self.Blueprint(
            steps=self.app.steps['worker'],
            on_start=self.on_start,
            on_close=self.on_close,
            on_stopped=self.on_stopped,
        )
        self.blueprint.apply(self, **kwargs)

The WorkController starts the Blueprint:

def start(self):
    try:
        # start the worker
        self.blueprint.start(self)
    except WorkerTerminate:
        self.terminate()
    except Exception as exc:
        logger.critical('Unrecoverable error: %r', exc, exc_info=True)
        self.stop(exitcode=EX_FAILURE)
    except SystemExit as exc:
        self.stop(exitcode=exc.code)
    except KeyboardInterrupt:
        self.stop(exitcode=EX_FAILURE)

The steps themselves are relatively simple. Roughly, the code is as follows:

class StepType(type):
    """Meta-class for steps."""

    name = None
    requires = None

class Step(metaclass=StepType):
    ...
    
    def instantiate(self, name, *args, **kwargs):
        return symbol_by_name(name)(*args, **kwargs)
    
    def include_if(self, parent):
        return self.enabled
        
    def _should_include(self, parent):
        if self.include_if(parent):
            return True, self.create(parent)
        return False, None

    def create(self, parent):
        """Create the step."""

From Step we can see that:

  • Each step can declare the steps it depends on through requires
  • Each step can create concrete objects through instantiate
  • Steps are organized in a parent/child structure: a step is created against its parent through create(parent)
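Because steps declare their dependencies, a boot order can be derived from them. The sketch below uses the standard library's graphlib (Python 3.9+) for the ordering, which is a stand-in and not the mechanism Celery itself uses; the step names and the boot_order helper are hypothetical:

```python
from graphlib import TopologicalSorter

# step name -> names of the steps it requires (hypothetical step names)
REQUIRES = {
    "Connection": (),
    "Events": ("Connection",),
    "Tasks": ("Connection",),
    "Consumer": ("Tasks", "Events"),
}


def boot_order(requires):
    """Return step names so that every step comes after its requirements."""
    return list(TopologicalSorter(requires).static_order())


order = boot_order(REQUIRES)
print(order)
```

Whatever order the steps are listed in, Connection always comes first here and Consumer last, since everything else depends on the former and the latter depends on everything else.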

For example, a consumer step depends on the Connection step and uses the connection when it starts. The code for both is shown below:

class ConsumerStep(StartStopStep):
    """Bootstep that starts a message consumer."""

    requires = ('celery.worker.consumer:Connection',)
    consumers = None

    def start(self, c):
        channel = c.connection.channel()
        self.consumers = self.get_consumers(channel)
        for consumer in self.consumers or []:
            consumer.consume()

class Connection(bootsteps.StartStopStep):
    """Service managing the consumer broker connection."""

    def __init__(self, c, **kwargs):
        c.connection = None
        super().__init__(c, **kwargs)

    def start(self, c):
        c.connection = c.connect()
        info('Connected to %s', c.connection.as_uri())

The Blueprint creates and manages these steps:

class Blueprint:

    def __init__(self, steps=None, name=None,
                 on_start=None, on_close=None, on_stopped=None):
        self.name = name or self.name or qualname(type(self))
        # union of the given steps and the default steps
        self.types = set(steps or []) | set(self.default_steps)
        ...
        self.steps = {}

    def apply(self, parent, **kwargs):
        order = self.order = []
        steps = self.steps = dict(symbol_by_name(step) for step in self.types)

        self._debug('Building graph...')
        for S in self._finalize_steps(steps):
            step = S(parent, **kwargs)
            steps[step.name] = step
            order.append(step)
        self._debug('New boot order: {%s}',
                    ', '.join(s.alias for s in self.order))
        for step in order:
            step.include(parent)
        return self

Start the Blueprint:

def start(self, parent):
    self.state = RUN
    if self.on_start:
        self.on_start()
    for i, step in enumerate(s for s in parent.steps if s is not None):
        self._debug('Starting %s', step.alias)
        self.started = i + 1
        step.start(parent)
        logger.debug('^-- substep ok')

The startup process is divided into multiple step units, which are then assembled into a graph and started one by one.
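The apply-then-start lifecycle can be condensed into a toy version. This is a simplified sketch, not Celery's Blueprint: it skips dependency sorting entirely, and MiniBlueprint, MiniWorker, and the three step classes are hypothetical names used only for illustration:

```python
class Step:
    """Base step: included when `enabled`, started by the blueprint."""
    enabled = True

    def __init__(self, parent):
        self.name = type(self).__name__

    def include_if(self, parent):
        return self.enabled

    def start(self, parent):
        parent.log.append(f"start {self.name}")


class Hub(Step):
    pass


class Beat(Step):
    enabled = False   # disabled steps are skipped


class Consumer(Step):
    pass


class MiniBlueprint:
    def __init__(self, step_classes):
        self.step_classes = list(step_classes)

    def apply(self, parent):
        """Instantiate every step and keep the ones that want to be included."""
        steps = [cls(parent) for cls in self.step_classes]
        parent.steps = [s for s in steps if s.include_if(parent)]
        return self

    def start(self, parent):
        """Start the included steps one by one, in order."""
        for step in parent.steps:
            step.start(parent)


class MiniWorker:
    def __init__(self):
        self.log = []
        self.steps = []
        self.blueprint = MiniBlueprint([Hub, Beat, Consumer]).apply(self)

    def start(self):
        self.blueprint.start(self)


w = MiniWorker()
w.start()
print(w.log)
```

Splitting construction (apply) from activation (start) means the worker object is fully assembled before any step begins doing work.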


Summary

In this article we formally began studying Celery. We walked through an example of completing a remote task with Redis as the broker, the worker service as the consumer, and the client as the producer. We then briefly explored the startup process of the worker service mode, and focused on how the celery-application implements its two functions: managing tasks and managing connections.

Tip

Celery shows a way to create classes and objects dynamically:

task = type(fun.__name__, (Task,), dict({
    'app': self,
    'name': name,
    'run': run,
    '_decorated': True,
    '__doc__': fun.__doc__,
    '__module__': fun.__module__,
    '__annotations__': fun.__annotations__,
    '__header__': staticmethod(head_from_fun(fun, bound=bind)),
    '__wrapped__': run}, **options))()

The type function creates a task subclass dynamically, and the trailing () then instantiates it into a task object.
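The same technique works outside Celery. The following self-contained sketch builds a class with type() and instantiates it right away; BaseTask and task_from_fun are hypothetical stand-ins, not Celery's classes:

```python
class BaseTask:
    """Stand-in for a task base class; calling a task runs its `run`."""
    max_retries = 0

    def __call__(self, *args, **kwargs):
        return self.run(*args, **kwargs)


def task_from_fun(fun, base=BaseTask, **options):
    """Build a subclass of `base` named after `fun` and return an instance."""
    attrs = {
        "run": staticmethod(fun),
        "name": f"{fun.__module__}.{fun.__name__}",
        "__doc__": fun.__doc__,
        "__module__": fun.__module__,
    }
    attrs.update(options)
    cls = type(fun.__name__, (base,), attrs)   # create the class
    return cls()                               # instantiate it


def add(x, y):
    """Add two numbers."""
    return x + y


task = task_from_fun(add, max_retries=3)
print(type(task).__name__, task(16, 16), task.max_retries)
```

Extra keyword options simply become class attributes, which is how per-task settings can override the defaults defined on the base class.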

Reference links

  • Programmatically defining classes: python3-cookbook.readthedocs.io/zh_CN/lates…