0x00 Introduction

Celery is a simple, flexible, and reliable distributed system for processing large volumes of messages. It focuses on asynchronous task queues for real-time processing, while also supporting task scheduling. Celery relies on its Worker component to perform the actual task processing.

The first half of the Celery startup process was covered in the previous article; this article continues with the second half.

0x01 Review of the previous article

As described there, a series of steps brought us to the Worker logic proper. This article continues by looking at the startup process of "Worker as a program".

                                     +----------------------+
      +----------+                   |  @cached_property    |
      |   User   |                   |      Worker          |
      +----+-----+            +--->  |                      |
           |                  |      |                      |
           |  worker_main     |      |  Worker application  |
           |                  |      |  celery/app/base.py  |
           v                  |      +----------------------+
 +---------+------------+     |
 |        Celery        |     |
 |                      |     |
 |  Celery application  |     |
 |  celery/app/base.py  |     |
 |                      |     |
 +---------+------------+     |
           |                  |
           |  celery.main     |
           |                  |
           v                  |
 +---------+------------+     |
 |  @click.pass_context |     |
 |       celery         |     |
 |                      |     |
 |                      |     |
 |    CeleryCommand     |     |
 | celery/bin/celery.py |     |
 |                      |     |
 +---------+------------+     |
           |                  |
           |                  |
           |                  |
           v                  |
+----------+------------+     |
|   @click.pass_context |     |
|        worker         |     |
|                       |     |
|                       |     |
|     WorkerCommand     |     |
| celery/bin/worker.py  |     |
+-----------+-----------+     |
            |                 |
            +-----------------+


0x02 Worker as a program

Note that the Worker here is the actual business entity.

The class is in celery/apps/worker.py:

class Worker(WorkController):
    """Worker as a program."""

Instantiation calls the __init__ of the WorkController base class.

Initialization basically does the following:

  • The loader loads the various configurations;
  • setup_defaults sets the default values;
  • setup_instance performs the formal setup, including configuring the queues that store messages;
  • Blueprint establishes each sub-module inside the Worker.

The code is located in celery/worker/worker.py.

class WorkController:
    """Unmanaged worker instance."""

    app = None
    pidlock = None
    blueprint = None
    pool = None
    semaphore = None

    #: contains the exit code if a :exc:`SystemExit` event is handled.
    exitcode = None

    class Blueprint(bootsteps.Blueprint):
        """Worker bootstep blueprint."""

        name = 'Worker'
        default_steps = {
            'celery.worker.components:Hub',
            'celery.worker.components:Pool',
            'celery.worker.components:Beat',
            'celery.worker.components:Timer',
            'celery.worker.components:StateDB',
            'celery.worker.components:Consumer',
            'celery.worker.autoscale:WorkerComponent',
        }

    def __init__(self, app=None, hostname=None, **kwargs):
        self.app = app or self.app                      # set the app attribute
        self.hostname = default_nodename(hostname)      # generate the node hostname
        self.startup_time = datetime.utcnow()
        self.app.loader.init_worker()                   # call app.loader's init_worker() method
        self.on_before_init(**kwargs)                   # call the pre-initialization hook
        self.setup_defaults(**kwargs)                   # set the default values
        self.on_after_init(**kwargs)
        self.setup_instance(**self.prepare_args(**kwargs))  # create the instance

The init_worker method of app.loader is called.

2.1 loader

Here app.loader is a property set when Celery is initialized; its default value is celery.loaders.app:AppLoader. Its job is to import the various configurations.

The loader property is defined in celery/app/base.py as follows:

@cached_property
def loader(self):
    """Current loader instance."""
    return get_loader_cls(self.loader_cls)(app=self)

get_loader_cls is as follows:

def get_loader_cls(loader):
    """Get loader class by name/alias."""
    return symbol_by_name(loader, LOADER_ALIASES, imp=import_from_cwd)
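symbol_by_name turns a 'module:attribute' string (or a short alias from LOADER_ALIASES) into the actual class object. Celery's real implementation lives in kombu; a minimal sketch of the idea:

import importlib

def symbol_by_name_sketch(name, aliases=None):
    """Resolve a 'package.module:Attribute' string into the attribute itself."""
    name = (aliases or {}).get(name, name)        # expand short aliases first
    module_name, _, attr = name.partition(':')
    module = importlib.import_module(module_name)
    return getattr(module, attr)

# works with any importable symbol, e.g. a stdlib class:
OrderedDict = symbol_by_name_sketch('collections:OrderedDict')
assert OrderedDict([('a', 1)])['a'] == 1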

The loader instance is AppLoader, and the init_worker method of the class is called.

def init_worker(self):
    if not self.worker_initialized:             # if the worker has not been initialized yet
        self.worker_initialized = True          # mark it as initialized
        self.import_default_modules()           # import the default modules
        self.on_worker_init()

import_default_modules imports the modules specified in the app configuration.

def import_default_modules(self):
    responses = signals.import_modules.send(sender=self.app)
    # Prior to this point loggers are not yet set up properly, need to
    # check responses manually and reraise exceptions if any, otherwise
    # they'll be silenced, making it incredibly difficult to debug.
    for _, response in responses:   # check each receiver's response
        if isinstance(response, Exception):
            raise response
    return [self.import_task_module(m) for m in self.default_modules]
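import_modules is an ordinary Celery signal, so applications can hook this moment themselves. A small sketch of connecting a handler (the handler name is made up for illustration):

from celery import signals

@signals.import_modules.connect
def on_import_modules(sender=None, **kwargs):
    # a good place to import project-specific task modules
    print('importing extra modules for app:', sender)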

2.2 setup_defaults in worker

Continuing with the Worker class initialization, self.setup_defaults assigns values to the parameters needed at runtime.

After it runs, self.pool_cls is <class 'celery.concurrency.prefork.TaskPool'>.

The code is as follows:

def setup_defaults(self, concurrency=None, loglevel='WARN', logfile=None,
                   task_events=None, pool=None, consumer_cls=None,
                   timer_cls=None, timer_precision=None,
                   autoscaler_cls=None,
                   pool_putlocks=None,
                   pool_restarts=None,
                   optimization=None, O=None,  # O maps to -O=fair
                   statedb=None,
                   time_limit=None,
                   soft_time_limit=None,
                   scheduler=None,
                   pool_cls=None,              # XXX use pool
                   state_db=None,              # XXX use statedb
                   task_time_limit=None,       # XXX use time_limit
                   task_soft_time_limit=None,  # XXX use soft_time_limit
                   scheduler_cls=None,         # XXX use scheduler
                   schedule_filename=None,
                   max_tasks_per_child=None,
                   prefetch_multiplier=None, disable_rate_limits=None,
                   worker_lost_wait=None,
                   max_memory_per_child=None, **_kw):
    either = self.app.either                # read from the config; fall back to the given default
    self.loglevel = loglevel                # set the log level
    self.logfile = logfile                  # set the log file

    self.concurrency = either('worker_concurrency', concurrency)        # number of worker processes
    self.task_events = either('worker_send_task_events', task_events)   # task-event setting
    self.pool_cls = either('worker_pool', pool, pool_cls)               # pool setting
    self.consumer_cls = either('worker_consumer', consumer_cls)         # consumer setting
    self.timer_cls = either('worker_timer', timer_cls)                  # timer class setting
    self.timer_precision = either(
        'worker_timer_precision', timer_precision,
    )
    self.optimization = optimization or O                               # optimization setting
    self.autoscaler_cls = either('worker_autoscaler', autoscaler_cls)
    self.pool_putlocks = either('worker_pool_putlocks', pool_putlocks)
    self.pool_restarts = either('worker_pool_restarts', pool_restarts)
    self.statedb = either('worker_state_db', statedb, state_db)         # execution-state store
    self.schedule_filename = either(
        'beat_schedule_filename', schedule_filename,
    )                                                                   # scheduled-task schedule file
    self.scheduler = either('beat_scheduler', scheduler, scheduler_cls) # the scheduler class
    self.time_limit = either(
        'task_time_limit', time_limit, task_time_limit)                 # hard time limit
    self.soft_time_limit = either(
        'task_soft_time_limit', soft_time_limit, task_soft_time_limit,
    )
    self.max_tasks_per_child = either(
        'worker_max_tasks_per_child', max_tasks_per_child,
    )                                                                   # max tasks per child process
    self.max_memory_per_child = either(
        'worker_max_memory_per_child', max_memory_per_child,
    )                                                                   # max memory per child process
    self.prefetch_multiplier = int(either(
        'worker_prefetch_multiplier', prefetch_multiplier,
    ))
    self.disable_rate_limits = either(
        'worker_disable_rate_limits', disable_rate_limits,
    )
    self.worker_lost_wait = either('worker_lost_wait', worker_lost_wait)
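app.either returns the configuration value if it is set, otherwise the first non-None fallback. A minimal sketch of that behavior, with a plain dict standing in for app.conf:

def either_sketch(conf, config_key, *defaults):
    """Return conf[config_key] if set, else the first non-None default."""
    value = conf.get(config_key)
    if value is not None:
        return value
    return next((d for d in defaults if d is not None), None)

conf = {'worker_concurrency': 8}
assert either_sketch(conf, 'worker_concurrency', None) == 8              # config wins
assert either_sketch(conf, 'worker_pool', None, 'prefork') == 'prefork'  # falls back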

2.3 setup_instance in worker

Once that completes, self.setup_instance continues, creating the instance.

def setup_instance(self, queues=None, ready_callback=None, pidfile=None,
                   include=None, use_eventloop=None, exclude_queues=None,
                   **kwargs):
    self.pidfile = pidfile                              # pidfile
    self.setup_queues(queues, exclude_queues)           # configure which queues are consumed from (and which not)
    self.setup_includes(str_to_list(include))           # collect all tasks

    # Set default concurrency
    if not self.concurrency:                            # if no value was configured
        try:
            self.concurrency = cpu_count()              # default to the number of CPUs
        except NotImplementedError:
            self.concurrency = 2                        # fall back to 2 if detection fails

    # Options
    self.loglevel = mlevel(self.loglevel)               # set the log level
    self.ready_callback = ready_callback or self.on_consumer_ready  # set the callback function

    # this connection won't establish, only used for params
    self._conninfo = self.app.connection_for_read()
    self.use_eventloop = (
        self.should_use_eventloop() if use_eventloop is None
        else use_eventloop
    )                                                   # decide the event-loop type
    self.options = kwargs

    signals.worker_init.send(sender=self)               # send the worker_init signal

    # Initialize bootsteps
    self.pool_cls = _concurrency.get_implementation(self.pool_cls)  # get the pool class
    self.steps = []                                     # steps to be performed
    self.on_init_blueprint()
    self.blueprint = self.Blueprint(
        steps=self.app.steps['worker'],
        on_start=self.on_start,
        on_close=self.on_close,
        on_stopped=self.on_stopped,
    )                                                   # initialize the blueprint
    self.blueprint.apply(self, **kwargs)                # call apply on the initialized Blueprint

setup_queues and setup_includes do the following:

def setup_queues(self, include, exclude=None):
    include = str_to_list(include)
    exclude = str_to_list(exclude)
    try:
        self.app.amqp.queues.select(include)        # consume from these queues
    except KeyError as exc:
        raise ImproperlyConfigured(
            SELECT_UNKNOWN_QUEUE.strip().format(include, exc))
    try:
        self.app.amqp.queues.deselect(exclude)      # do not consume from these queues
    except KeyError as exc:
        raise ImproperlyConfigured(
            DESELECT_UNKNOWN_QUEUE.strip().format(exclude, exc))
    if self.app.conf.worker_direct:
        self.app.amqp.queues.select_add(worker_direct(self.hostname))  # add the worker-direct queue

def setup_includes(self, includes):
    # Update celery_include to have all known task modules, so that we
    # ensure all task modules are imported in case an execv happens.
    prev = tuple(self.app.conf.include)                             # tasks from the config file
    if includes:
        prev += tuple(includes)
        [self.app.loader.import_task_module(m) for m in includes]   # import the included task modules
    self.include = includes
    task_modules = {task.__class__.__module__
                    for task in values(self.app.tasks)}             # modules of the registered tasks
    self.app.conf.include = tuple(set(prev) | task_modules)         # deduplicate and reset include

2.3.1 setup_queues

self.app.amqp.queues.select(include) sets up the queues.

The stack is as follows:

__init__, amqp.py:59
Queues, amqp.py:259
queues, amqp.py:572
__get__, objects.py:43
setup_queues, worker.py:172
setup_instance, worker.py:106
__init__, worker.py:99
worker, worker.py:326
caller, base.py:132
new_func, decorators.py:21
invoke, core.py:610
invoke, core.py:1066
invoke, core.py:1259
main, core.py:782
start, base.py:358
worker_main, base.py:374

The code is in: celery/app/amqp.py

class Queues(dict):
    """Queue name⇒ declaration mapping."""

    #: If set, this is a subset of queues to consume from.
    #: The rest of the queues are then used for routing only.
    _consume_from = None

    def __init__(self, queues=None, default_exchange=None,
                 create_missing=True, autoexchange=None,
                 max_priority=None, default_routing_key=None):
        dict.__init__(self)
        self.aliases = WeakValueDictionary()
        self.default_exchange = default_exchange
        self.default_routing_key = default_routing_key
        self.create_missing = create_missing
        self.autoexchange = Exchange if autoexchange is None else autoexchange
        self.max_priority = max_priority
        if queues is not None and not isinstance(queues, Mapping):
            queues = {q.name: q for q in queues}
        queues = queues or {}
        for name, q in queues.items():
            self.add(q) if isinstance(q, Queue) else self.add_compat(name, **q)
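select and deselect only narrow the internal _consume_from mapping; the remaining queues stay known for routing. A toy sketch of that bookkeeping (the real Queues class carries exchange and routing logic as well):

class QueuesSketch(dict):
    """Toy model of Queues.select / Queues.deselect bookkeeping."""

    def __init__(self, names):
        super().__init__({n: n for n in names})
        self._consume_from = None

    def select(self, include):
        if include:                            # consume only from the selected queues
            self._consume_from = {n: self[n] for n in include}

    def deselect(self, exclude):
        if exclude:
            if self._consume_from is None:
                self._consume_from = dict(self)
            for n in exclude:                  # stop consuming from these queues
                self._consume_from.pop(n, None)

queues = QueuesSketch(['default', 'email', 'video'])
queues.select(['default', 'email'])            # like celery worker -Q default,email
queues.deselect(['email'])                     # like celery worker -X email
print(list(queues._consume_from))              # ['default']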

The overall flow so far is as follows:

                                     +----------------------+
      +----------+                   |  @cached_property    |
      |   User   |                   |      Worker          |
      +----+-----+            +--->  |                      |
           |                  |      |                      |
           |  worker_main     |      |  Worker application  |
           |                  |      |  celery/app/base.py  |
           v                  |      +----------+-----------+
 +---------+------------+     |                 |
 |        Celery        |     |                 |
 |                      |     |                 |
 |  Celery application  |     |                 v
 |  celery/app/base.py  |     |  +--------------+--------------+    +---> app.loader.init_worker
 |                      |     |  | class Worker(WorkController)|    |
 +---------+------------+     |  |                             |    |
           |                  |  |                             +--------> setup_defaults
           |  celery.main     |  |    Worker as a program      |    |
           |                  |  |   celery/apps/worker.py     |    |
           v                  |  +-----------------------------+    +---> setup_instance +-----> setup_queues  +------>  app.amqp.queues
 +---------+------------+     |
 |  @click.pass_context |     |
 |       celery         |     |
 |                      |     |
 |                      |     |
 |    CeleryCommand     |     |
 | celery/bin/celery.py |     |
 |                      |     |
 +---------+------------+     |
           |                  |
           |                  |
           |                  |
           v                  |
+----------+------------+     |
|   @click.pass_context |     |
|        worker         |     |
|                       |     |
|                       |     |
|     WorkerCommand     |     |
| celery/bin/worker.py  |     |
+-----------+-----------+     |
            |                 |
            +-----------------+


2.4 Blueprint

Next, each sub-module inside the Worker is established.

During worker initialization, the execution order of its internal sub-modules is defined by a Blueprint class and sorted according to the dependencies between modules (these dependencies in fact form a DAG). This is done by loading default_steps from the Blueprint class.

The specific logic is:

  • the self.claim_steps method gets the defined steps;
  • _finalize_steps retrieves the step dependencies, topologically sorts them, and returns the steps in dependency order;
  • once the steps are generated, each is called to generate its component;
  • at the end of apply, the blueprint returns itself; when all the step classes are initialized, the worker initialization is complete.

The Blueprint class used here is the one nested in WorkController (celery/worker/worker.py):

class Blueprint(bootsteps.Blueprint):
    """Worker bootstep blueprint."""

    name = 'Worker'
    default_steps = {
        'celery.worker.components:Hub',
        'celery.worker.components:Pool',
        'celery.worker.components:Beat',
        'celery.worker.components:Timer',
        'celery.worker.components:StateDB',
        'celery.worker.components:Consumer',
        'celery.worker.autoscale:WorkerComponent',
    }

Each module in the Celery worker is defined as a step, as follows:

  • Timer: the timer used to execute scheduled tasks, distinct from the timer used by the Consumer;

  • Hub: the wrapper object around the event loop;

  • Pool: constructs the various execution pools (thread/process/coroutine);

  • Autoscaler: automatically grows or shrinks the number of work units in the pool;

  • StateDB: persists worker state across restarts (only used between restarts);

  • Autoreloader: automatically reloads modified code;

  • Beat: creates the beat process, running it as a child process (corresponding to the -B option on the command line);

  • Consumer: consumes task messages.

Dependencies between steps are declared through the requires attribute, and the last flag marks the step that must run last; for example:

class Hub(bootsteps.StartStopStep):
    """Worker starts the event loop."""
    requires = (Timer,)

class Consumer(bootsteps.StartStopStep):
    """Bootstep starting the Consumer blueprint."""
    last = True

2.5 The Blueprint base class

apply calls into the base class. The base class is in celery/bootsteps.py.

class Blueprint:
    """Blueprint containing bootsteps that can be applied to objects."""

    GraphFormatter = StepFormatter
    name = None
    state = None
    started = 0
    default_steps = set()
    state_to_name = {
        0: 'initializing',
        RUN: 'running',
        CLOSE: 'closing',
        TERMINATE: 'terminating',
    }

    def __init__(self, steps=None, name=None,
                 on_start=None, on_close=None, on_stopped=None):
        self.name = name or self.name or qualname(type(self))
        self.types = set(steps or []) | set(self.default_steps)
        self.on_start = on_start
        self.on_close = on_close
        self.on_stopped = on_stopped
        self.shutdown_complete = Event()
        self.steps = {}

The apply code is as follows, which is executed when the WorkController is initialized.

def apply(self, parent, **kwargs):
    """Apply the steps in this blueprint to an object.

    This will apply the ``__init__`` and ``include`` methods
    of each step, with the object as argument::

        step = Step(obj)
        ...
        step.include(obj)

    For :class:`StartStopStep` the services created
    will also be added to the objects ``steps`` attribute.
    """
    self._debug('Preparing bootsteps.')
    order = self.order = []                         # stores the step classes in execution order
    steps = self.steps = self.claim_steps()         # {step.name: step}

    self._debug('Building graph...')
    for S in self._finalize_steps(steps):           # steps returned in dependency order
        step = S(parent, **kwargs)                  # instantiate the step
        steps[step.name] = step                     # key = step.name, value = step
        order.append(step)                          # append to the order list
    self._debug('New boot order: {%s}',
                ', '.join(s.alias for s in self.order))
    for step in order:                              # walk through the order list
        step.include(parent)                        # run each step's include(), which creates the component
    return self

2.5.1 Getting the defined Steps

The self.claim_steps method retrieves the defined steps as follows:

def claim_steps(self):
    # import the classes named in self.types and return a {name: class} dict
    return dict(self.load_step(step) for step in self.types)

def load_step(self, step):
    step = symbol_by_name(step)
    return step.name, step

Where self.types can be passed at initialization.

def __init__(self, steps=None, name=None,
             on_start=None, on_close=None, on_stopped=None):
    self.name = name or self.name or qualname(type(self))
    self.types = set(steps or []) | set(self.default_steps)
    self.on_start = on_start
    self.on_close = on_close
    self.on_stopped = on_stopped
    self.shutdown_complete = Event()
    self.steps = {}

No custom steps are passed in when this Blueprint is initialized (app.steps['worker'] is empty by default), so types here is default_steps, i.e. the default_steps value of the Blueprint class nested in WorkController.

default_steps = {
    'celery.worker.components:Hub',
    'celery.worker.components:Pool',
    'celery.worker.components:Beat',
    'celery.worker.components:Timer',
    'celery.worker.components:StateDB',
    'celery.worker.components:Consumer',
    'celery.worker.autoscale:WorkerComponent',
}
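Note that set(steps or []) | set(self.default_steps) is also the documented extension point: anything added to app.steps['worker'] is merged into types alongside the default steps. A short sketch of a user-defined bootstep (broker URL assumed for illustration):

from celery import Celery, bootsteps

class ExampleStep(bootsteps.StartStopStep):
    """A user-defined worker bootstep."""
    requires = {'celery.worker.components:Pool'}   # start after the Pool step

    def start(self, parent):
        print('starting with pool', parent.pool)

    def stop(self, parent):
        print('worker shutting down')

app = Celery('demo', broker='redis://localhost:6379/0')   # assumed broker URL
app.steps['worker'].add(ExampleStep)    # ends up in Blueprint.types via set(steps)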

2.5.2 _finalize_steps

_finalize_steps retrieves the step dependencies, topologically sorts them, and returns the steps in dependency order.

The focus is the self._finalize_steps(steps) function, where the sorting is done.

def _finalize_steps(self, steps):
    last = self._find_last()                        # find the step marked last=True (Consumer)
    self._firstpass(steps)                          # add each step and its required dependencies to steps
    it = ((C, C.requires) for C in values(steps))   # e.g. (a, [b, c]), (b, [e]), ...
    G = self.graph = DependencyGraph(               # build the dependency graph: add vertices and edges
        it, formatter=self.GraphFormatter(root=last),
    )
    if last:                                        # make last depend on every other step
        for obj in G:
            if obj != last:
                G.add_edge(last, obj)
    try:
        return G.topsort()                          # topological sort: the ordered steps list
    except KeyError as exc:
        raise KeyError('unknown bootstep: %s' % exc)

A graph class, DependencyGraph, is defined to hold the vertices and edges:

  • The vertices are the steps;
  • The edges are each step's dependency list;
  • The structure is {step1: [step2, step3]}.

Let's look at the initialization logic of the DependencyGraph class:

@python_2_unicode_compatible
class DependencyGraph(object):

    def __init__(self, it=None, formatter=None):
        self.formatter = formatter or GraphFormatter()
        self.adjacent = {}                                              # stores the graph structure
        if it is not None:
            self.update(it)

    def update(self, it):
        tups = list(it)
        for obj, _ in tups:
            self.add_arc(obj)
        for obj, deps in tups:
            for dep in deps:
                self.add_edge(obj, dep)

    def add_arc(self, obj):                                             # add a vertex
        self.adjacent.setdefault(obj, [])

    def add_edge(self, A, B):                                           # add an edge
        self[A].append(B)

The graph structure now contains the dependency graph among the modules. Celery implements its own small DAG, and topological sorting performs the dependency sort: it produces the execution order of the modules, which are then executed sequentially.
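To make the ordering concrete, here is a minimal depth-first topological sort over the same {step: [dependencies]} shape; a simplified stand-in for DependencyGraph.topsort, not Celery's implementation:

def topsort_sketch(graph):
    """Return nodes so that every node appears after its dependencies."""
    order, seen = [], set()

    def visit(node):
        if node in seen:
            return
        seen.add(node)
        for dep in graph.get(node, []):    # visit dependencies first
            visit(dep)
        order.append(node)

    for node in graph:
        visit(node)
    return order

# mirrors the worker's dependencies: Hub requires Timer, Pool requires Hub, ...
graph = {'Timer': [], 'Hub': ['Timer'], 'Pool': ['Hub'],
         'Consumer': ['Pool', 'Hub', 'Timer']}
print(topsort_sketch(graph))   # ['Timer', 'Hub', 'Pool', 'Consumer']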

2.5.3 Returning step after dependency sort

This part returns the steps sorted by their dependencies.

These classes depend on one another; take the Hub class, for example:

class Hub(bootsteps.StartStopStep):
    """Worker starts the event loop."""

    requires = (Timer,)

The Hub class depends on the Timer class, so the job of _finalize_steps is to import the dependent class first.

Now continue with the order list: it holds all the step classes after the dependency order is resolved, and these step classes inherit directly or indirectly from bootsteps.Step.

@with_metaclass(StepType)
class Step(object):
    ...

This class uses a metaclass. Continue to look at StepType.

class StepType(type):
    """Meta-class for steps."""

    name = None
    requires = None

    def __new__(cls, name, bases, attrs):
        module = attrs.get('__module__')                                # get the __module__ attribute
        qname = '{0}.{1}'.format(module, name) if module else name     # qualified name if __module__ exists, else just name
        attrs.update(
            __qualname__=qname,
            name=attrs.get('name') or qname,
        )                                                               # update name and __qualname__ on the class being created
        return super(StepType, cls).__new__(cls, name, bases, attrs)

    def __str__(self):
        return bytes_if_py2(self.name)

    def __repr__(self):
        return bytes_if_py2('step:{0.name}{{{0.requires!r}}}'.format(self))

This is Python metaclass programming: the class's attributes are controlled while the class object itself is being created.
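A self-contained illustration of the same trick, setting a qualified name at class-creation time (names here are invented for the example):

class NamingMeta(type):
    """Inject a qualified 'name' attribute while the class is being created."""

    def __new__(mcs, name, bases, attrs):
        module = attrs.get('__module__')
        attrs['name'] = attrs.get('name') or (
            '{0}.{1}'.format(module, name) if module else name)
        return super().__new__(mcs, name, bases, attrs)

class MyStep(metaclass=NamingMeta):
    pass

print(MyStep.name)   # '__main__.MyStep' when run as a script

Back in the boot sequence, each step's include method is called: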

def _should_include(self, parent):
    if self.include_if(parent):
        return True, self.create(parent)
    return False, None

def include(self, parent):
    return self._should_include(parent)[0]

Steps that inherit from StartStopStep use this include method instead:

def include(self, parent):
    inc, ret = self._should_include(parent)
    if inc:
        self.obj = ret
        parent.steps.append(self)
    return inc

After sorting, the variables look like this:

order = {list: 7} 
 0 = {Timer} <step: Timer>
 1 = {Hub}  
 2 = {Pool} <step: Pool>
 3 = {WorkerComponent} <step: Autoscaler>
 4 = {StateDB} <step: StateDB>
 5 = {Beat} <step: Beat>
 6 = {Consumer} <step: Consumer>
 __len__ = {int} 7
 
 
steps = {dict: 7} 
 'celery.worker.components.Timer' = {Timer} <step: Timer>
 'celery.worker.components.Pool' = {Pool} <step: Pool>
 'celery.worker.components.Consumer' = {Consumer} <step: Consumer>
 'celery.worker.autoscale.WorkerComponent' = {WorkerComponent} <step: Autoscaler>
 'celery.worker.components.StateDB' = {StateDB} <step: StateDB>
 'celery.worker.components.Hub' = {Hub} <step: Hub>
 'celery.worker.components.Beat' = {Beat} <step: Beat>
 __len__ = {int} 7

2.5.4 Generating Components

Once the step is generated, the call is made to generate the component.

for step in order:
    step.include(parent)

For each component, the call continues: each step's include function runs, which in turn runs its create function.

def include(self, parent):
    return self._should_include(parent)[0]

When called, the component is created; for example, self is the Timer step and parent is the worker instance, <class 'celery.worker.worker.WorkController'>.

The code is as follows:

def _should_include(self, parent):
    if self.include_if(parent):
        return True, self.create(parent)
    return False, None

def include_if(self, w):
    return w.use_eventloop

Take Timer as an example:

class Timer(bootsteps.Step):
    """Timer bootstep."""

    def create(self, w):
        if w.use_eventloop:
            # does not use dedicated timer thread.
            w.timer = _Timer(max_interval=10.0)

The stack looks like this:

create, components.py:36
_should_include, bootsteps.py:335
include, bootsteps.py:339
apply, bootsteps.py:211
setup_instance, worker.py:139
__init__, worker.py:99
worker, worker.py:326
caller, base.py:132
new_func, decorators.py:21
invoke, core.py:610
invoke, core.py:1066
invoke, core.py:1259
main, core.py:782
start, base.py:358
worker_main, base.py:374
<module>, myTest.py:26

2.5.5 Returning the worker

At the end of apply, the blueprint returns itself; once all the step classes are initialized, the worker initialization is complete.

def apply(self, parent, **kwargs):
    """Apply the steps in this blueprint to an object.

    This will apply the ``__init__`` and ``include`` methods
    of each step, with the object as argument::

        step = Step(obj)
        ...
        step.include(obj)

    For :class:`StartStopStep` the services created
    will also be added to the objects ``steps`` attribute.
    """
    self._debug('Preparing bootsteps.')
    order = self.order = []
    steps = self.steps = self.claim_steps()

    self._debug('Building graph...')
    for S in self._finalize_steps(steps):
        step = S(parent, **kwargs)
        steps[step.name] = step
        order.append(step)
    self._debug('New boot order: {%s}',
                ', '.join(s.alias for s in self.order))
    for step in order:
        step.include(parent)
    return self

The self is as follows:

self = {Blueprint} <celery.worker.worker.WorkController.Blueprint object at 0x7fcd3ad33d30>
 GraphFormatter = {type} <class 'celery.bootsteps.StepFormatter'>
 alias = {str} 'Worker'
 default_steps = {set: 7} {'celery.worker.components:Hub', 'celery.worker.components:Consumer', 'celery.worker.components:Beat', 'celery.worker.autoscale:WorkerComponent', 'celery.worker.components:StateDB', 'celery.worker.components:Timer', 'celery.worker.components:Pool'}
 graph = {DependencyGraph: 7} celery.worker.autoscale.WorkerComponent(3)\n     celery.worker.components.Pool(2)\n          celery.worker.components.Hub(1)\n               celery.worker.components.Timer(0)\ncelery.worker.components.StateDB(0)\ncelery.worker.components.Hub(1)\n     celery.worker.components.Timer(0)\ncelery.worker.components.Consumer(12)\n     celery.worker.autoscale.WorkerComponent(3)\n          celery.worker.components.Pool(2)\n               celery.worker.components.Hub(1)\n                    celery.worker.components.Timer(0)\n     celery.worker.components.StateDB(0)\n     celery.worker.components.Hub(1)\n          celery.worker.components.Timer(0)\n     celery.worker.components.Beat(0)\n     celery.worker.components.Timer(0)\n     celery.worker.components.Pool(2)\n          celery.worker.components.Hub(1)\n               celery.worker.components.Timer(0)\ncelery.worker.components.Beat(0)\ncelery.worker.components.Timer(0)\ncelery.worker.components.Pool(2)\n     celery.worker.components.Hub(1)\n          celery.worker.co...
 name = {str} 'Worker'
 order = {list: 7} [<step: Timer>, <step: Hub>, <step: Pool>, <step: Autoscaler>, <step: StateDB>, <step: Beat>, <step: Consumer>]
 shutdown_complete = {Event} <threading.Event object at 0x7fcd3ad33b38>
 started = {int} 0
 state = {NoneType} None
 state_to_name = {dict: 4} {0: 'initializing'.1: 'running'.2: 'closing'.3: 'terminating'}
 steps = {dict: 7} {'celery.worker.autoscale.WorkerComponent': <step: Autoscaler>, 'celery.worker.components.StateDB': <step: StateDB>, 'celery.worker.components.Hub': <step: Hub>, 'celery.worker.components.Consumer': <step: Consumer>, 'celery.worker.components.Beat': <step: Beat>, 'celery.worker.components.Timer': <step: Timer>, 'celery.worker.components.Pool': <step: Pool>}
 types = {set: 7} {'celery.worker.autoscale:WorkerComponent', 'celery.worker.components:StateDB', 'celery.worker.components:Hub', 'celery.worker.components:Consumer', 'celery.worker.components:Beat', 'celery.worker.components:Timer', 'celery.worker.components:Pool'}

The logic is now as follows:

                                     +----------------------+
      +----------+                   |  @cached_property    |
      |   User   |                   |      Worker          |
      +----+-----+            +--->  |                      |
           |                  |      |                      |
           |  worker_main     |      |  Worker application  |
           |                  |      |  celery/app/base.py  |
           v                  |      +----------+-----------+
 +---------+------------+     |                 |
 |        Celery        |     |                 |
 |                      |     |                 |
 |  Celery application  |     |                 v
 |  celery/app/base.py  |     |  +--------------+--------------+    +---> app.loader.init_worker
 |                      |     |  | class Worker(WorkController)|    |
 +---------+------------+     |  |                             |    |
           |                  |  |                             +--------> setup_defaults
           |  celery.main     |  |    Worker as a program      |    |
           |                  |  |   celery/apps/worker.py     |    |
           v                  |  +-----------------------------+    +---> setup_instance +-----> setup_queues  +------>  app.amqp.queues
 +---------+------------+     |                                                +
 |  @click.pass_context |     |                                                |
 |       celery         |     |                 +------------------------------+
 |                      |     |                 |       apply
 |                      |     |                 |
 |    CeleryCommand     |     |                 v
 | celery/bin/celery.py |     |
 |                      |     |  +-------------------------------------+        +---------------------+     +--->  claim_steps
 +---------+------------+     |  | class Blueprint(bootsteps.Blueprint)|        |  class Blueprint    |     |
           |                  |  |                                     +------->+                     +-----+--->  _finalize_steps
           |                  |  |                                     |        |                     |     |
           |                  |  |       celery/apps/worker.py        |        | celery/bootsteps.py |     |           +--> Timer
           v                  |  +-------------------------------------+        +---------------------+     +--->  include +--->+
+----------+------------+     |                                                                                        +--> Hub
|   @click.pass_context |     |                                                                                        +--> Pool
|        worker         |     |                                                                                        +--> ......
|                       |     |                                                                                        +--> Consumer
|     WorkerCommand     |     |
| celery/bin/worker.py  |     |
+-----------+-----------+     |
            | 1  app.Worker   |
            +-----------------+



0x03 Start in worker command

Back in celery/bin/worker.py, the worker command then calls:

worker.start()

Recall the previous code:

def worker(ctx, hostname=None, pool_cls=None, app=None, uid=None, gid=None,
           loglevel=None, logfile=None, pidfile=None, statedb=None,
           **kwargs):
    """Start worker instance."""
    app = ctx.obj.app

    if kwargs.get('detach', False):
        return detach(...)

    worker = app.Worker(...)

    worker.start()         # we return to this call here
    return worker.exitcode

3.1 start in Worker as a program

The method called is in celery/worker/worker.py. As you can see, it directly calls Blueprint's start function, which starts each component in the blueprint.

def start(self):
    try:
        self.blueprint.start(self)           # Blueprint's start method is called here
    except WorkerTerminate:
        self.terminate()
    except Exception as exc:
        self.stop(exitcode=EX_FAILURE)
    except SystemExit as exc:
        self.stop(exitcode=exc.code)
    except KeyboardInterrupt:
        self.stop(exitcode=EX_FAILURE)

3.2 start in Blueprint

The code is: celery/bootsteps.py

parent.steps was populated by each step's include; it currently holds [Hub, Pool, Consumer]. The worker's on_start is called first. The data looks like this:

parent.steps = {list: 3} 
 0 = {Hub} <step: Hub>
 1 = {Pool} <step: Pool>
 2 = {Consumer} <step: Consumer>

The specific start code is as follows:

class Blueprint:
    """Blueprint containing bootsteps that can be applied to objects."""

    def start(self, parent):
        self.state = RUN
        if self.on_start:
            self.on_start()
        for i, step in enumerate(s for s in parent.steps if s is not None):
            self.started = i + 1
            step.start(parent)

3.2.1 Callback on_start in Worker

Blueprint first calls on_start in Worker.

The code is in celery/apps/worker.py.

Concretely, it:

  • sets the app;
  • calls the parent class's on_start;
  • prints the startup banner;
  • registers the corresponding signal handlers;
  • does related configuration, such as output redirection.

class Worker(WorkController):
    """Worker as a program."""

    def on_start(self):
        app = self.app                                              # set the app
        WorkController.on_start(self)                               # call the parent class's on_start

        # this signal can be used to, for example, change queues after
        # the -Q option has been applied.
        signals.celeryd_after_setup.send(
            sender=self.hostname, instance=self, conf=app.conf,
        )

        if self.purge:
            self.purge_messages()

        if not self.quiet:
            self.emit_banner()                                      # print the startup banner

        self.set_process_status('-active-')
        self.install_platform_tweaks(self)                          # register the signal handlers
        if not self._custom_logging and self.redirect_stdouts:
            app.log.redirect_stdouts(self.redirect_stdouts_level)

        # TODO: Remove the following code in Celery 6.0
        # This qualifies as a hack for issue #6366.
        warn_deprecated = True
        config_source = app._config_source
        if isinstance(config_source, str):
            # Don't raise the warning when the settings originate from
            # django.conf:settings
            warn_deprecated = config_source.lower() not in [
                'django.conf:settings',
            ]

3.2.2 on_start base class

In celery/apps/worker.py:

def on_start(self):
    app = self.app
    WorkController.on_start(self)

The WorkController code is in celery/worker/worker.py.

The parent class's on_start creates the PID file.

def on_start(self):
    if self.pidfile:
        self.pidlock = create_pidlock(self.pidfile)

3.2.3 Signal Processing Handler

The function that registers the relevant signal handlers is as follows:

def install_platform_tweaks(self, worker):
    """Install platform specific tweaks and workarounds."""
    if self.app.IS_macOS:
        self.macOS_proxy_detection_workaround()

    # Install signal handler so SIGHUP restarts the worker.
    if not self._isatty:
        # only install HUP handler if detached from terminal,
        # so closing the terminal window doesn't restart the worker
        # into the background.
        if self.app.IS_macOS:
            # macOS can't exec from a process using threads.
            # See https://github.com/celery/celery/issues#issue/152
            install_HUP_not_supported_handler(worker)
        else:
            install_worker_restart_handler(worker)                  # register the SIGHUP restart handler
    install_worker_term_handler(worker)
    install_worker_term_hard_handler(worker)
    install_worker_int_handler(worker)
    install_cry_handler()                                           # SIGUSR1 handler
    install_rdb_handler()                                           # SIGUSR2 handler

Looking at the restart function separately:

def _reload_current_worker():
    platforms.close_open_fds([
        sys.__stdin__, sys.__stdout__, sys.__stderr__,
    ])                                                    # close already-open file descriptors
    os.execv(sys.executable, [sys.executable] + sys.argv) # reload the program

And:

def restart_worker_sig_handler(*args):
    """Signal handler restarting the current python program."""
    set_in_sighandler(True)
    safe_say('Restarting celery worker ({0})'.format(' '.join(sys.argv)))
    import atexit
    atexit.register(_reload_current_worker)                 # register the function to run at exit
    from celery.worker import state
    state.should_stop = EX_OK                               # set the state
platforms.signals[sig] = restart_worker_sig_handler

The platforms.signals class defines a __setitem__ method:

def __setitem__(self, name, handler):
    """Install signal handler.

    Does nothing if the current platform has no support for signals,
    or the specified signal in particular.
    """
    try:
        _signal.signal(self.signum(name), handler)
    except (AttributeError, ValueError):
        pass

At this point, the corresponding handler is set up in the running program. _signal is the imported signal library.
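Under the hood this is the standard library's signal module; a minimal sketch of the same pattern (illustrative, not Celery's code; SIGUSR1 is POSIX-only):

import os
import signal

def cry_handler(signum, frame):
    # stand-in for install_cry_handler, which dumps thread stack traces
    print('received SIGUSR1 in pid', os.getpid())

signal.signal(signal.SIGUSR1, cry_handler)   # what platforms.signals[...] ends up doing
os.kill(os.getpid(), signal.SIGUSR1)         # trigger it for demonstration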

3.2.4 Calling each step in turn

Blueprint then calls each step's start in turn.

As noted above, parent.steps was populated in step.include and is currently [Hub, Pool, Consumer]; the worker's on_start has already been called.

parent.steps = {list: 3} 
 0 = {Hub} <step: Hub>
 1 = {Pool} <step: Pool>
 2 = {Consumer} <step: Consumer>

Now proceed with Blueprint's start method: it iterates over parent.steps (Hub, Pool, Consumer) and calls each step's start method.

def start(self, parent):
    self.state = RUN                                        # set the current running state
    if self.on_start:                                       # run the callback if one was passed at initialization
        self.on_start()
    for i, step in enumerate(s for s in parent.steps if s is not None):
        self._debug('Starting %s', step.alias)              # iterate and call each step's start method
        self.started = i + 1
        step.start(parent)
        logger.debug('^-- substep ok')
3.2.4.1 Hub

The Hub step overrides start with a no-op:

def start(self, w):
    pass
3.2.4.2 Pool

Pool's start comes from StartStopStep; self.obj is the object returned by the create method, i.e. the pool instance:

def start(self, parent):
    if self.obj:
        return self.obj.start()

We will explain the details later.

3.2.4.3 Consumer

Continue with the Consumer's start method:

def start(self):
    blueprint = self.blueprint
    while blueprint.state != CLOSE:                           # loop until the state is CLOSE
        maybe_shutdown()                                      # use flags to decide whether to shut down
        if self.restart_count:                                # if a restart count is set
            try:
                self._restart_state.step()                    # record the restart
            except RestartFreqExceeded as exc:
                crit('Frequent restarts detected: %r', exc, exc_info=1)
                sleep(1)
        self.restart_count += 1                               # increment the count
        try:
            blueprint.start(self)                             # call the start method
        except self.connection_errors as exc:
            # If we're not retrying connections, no need to catch
            # connection errors
            if not self.app.conf.broker_connection_retry:
                raise
            if isinstance(exc, OSError) and exc.errno == errno.EMFILE:
                raise  # Too many open files
            maybe_shutdown()
            if blueprint.state != CLOSE:                      # if the state is not CLOSE
                if self.connection:
                    self.on_connection_error_after_connected(exc)
                else:
                    self.on_connection_error_before_connected(exc)
                self.on_close()
                blueprint.restart(self)                       # call the restart method

Consumer also has its own Blueprint, with the following steps:

  • Connection: manages the connection to the broker
  • Events: sends monitoring events
  • Agent: cell actor
  • Mingle: synchronizes state between different workers
  • Tasks: starts the message consumer
  • Gossip: consumes events from other workers
  • Heart: sends heartbeat events (the consumer's heartbeat)
  • Control: the remote command management service

We now enter the start method of Blueprint, whose steps value is passed in by the Consumer at initialization.

The steps passed in are instances of the Agent, Connection, Evloop, Control, Events, Gossip, Heart, Mingle, and Tasks classes. After the calls complete, the instances added to parent.steps are [Connection, Events, Heart, Mingle, Tasks, Control, Gossip, Evloop], and each instance's start method is called in turn.

Let's first examine the Connection step's start method:

def start(self, c):
    c.connection = c.connect()
    info('Connected to %s', c.connection.as_uri())

It calls the consumer's connect() function:

def connect(self):
    """Establish the broker connection.

    Retries establishing the connection if the
    :setting:`broker_connection_retry` setting is enabled.
    """
    conn = self.app.connection_for_read(heartbeat=self.amqheartbeat)        # with heartbeat

    # Callback called for each retry while the connection
    # can't be established.
    def _error_handler(exc, interval, next_step=CONNECTION_RETRY_STEP):
        if getattr(conn, 'alt', None) and interval == 0:
            next_step = CONNECTION_FAILOVER
        error(CONNECTION_ERROR, conn.as_uri(), exc,
              next_step.format(when=humanize_seconds(interval, 'in', ' ')))

    # remember that the connection is lazy, it won't establish
    # until needed.
    if not self.app.conf.broker_connection_retry:                           # if retry is disabled
        # retry disabled, just call connect directly.
        conn.connect()                                                      # connect directly
        return conn                                                         # return conn

    conn = conn.ensure_connection(
        _error_handler, self.app.conf.broker_connection_max_retries,
        callback=maybe_shutdown,
    )                                                                       # make sure the connection is established
    if self.hub:
        conn.transport.register_with_event_loop(conn.connection, self.hub)  # use asynchronous calls
    return conn                                                             # return conn

At this point, the connection is established.
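ensure_connection comes from kombu's Connection; it retries with a backoff interval and reports each failure through the errback. A small sketch of the same call (broker URL assumed):

from kombu import Connection

conn = Connection('redis://localhost:6379/0')   # assumed broker URL

def on_error(exc, interval):
    print('broker unreachable: %r, retrying in %ss' % (exc, interval))

# keep retrying until connected (here capped at 3 attempts)
conn.ensure_connection(errback=on_error, max_retries=3)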

Continuing with the Tasks step's start method:

def start(self, c):
    """Start task consumer."""
    c.update_strategies()                                           # update the known tasks

    # - RabbitMQ 3.3 completely redefines how basic_qos works..
    # This will detect if the new qos semantics is in effect,
    # and if so make sure the 'apply_global' flag is set on qos updates.
    qos_global = not c.connection.qos_semantics_matches_spec

    # set initial prefetch count
    c.connection.default_channel.basic_qos(
        0, c.initial_prefetch_count, qos_global,
    )                                                               # set the prefetch count

    c.task_consumer = c.app.amqp.TaskConsumer(
        c.connection, on_decode_error=c.on_decode_error,
    )                                                               # start consuming

    def set_prefetch_count(prefetch_count):
        return c.task_consumer.qos(
            prefetch_count=prefetch_count,
            apply_global=qos_global,
        )
    c.qos = QoS(set_prefetch_count, c.initial_prefetch_count)       # wrap the prefetch count in a QoS object
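c.initial_prefetch_count is derived from the pool size and the prefetch multiplier; the arithmetic, using the default values as an assumed example:

num_processes = 4                # pool size, e.g. worker_concurrency
prefetch_multiplier = 4          # worker_prefetch_multiplier default
initial_prefetch_count = num_processes * prefetch_multiplier
print(initial_prefetch_count)    # 16 messages may be reserved by this worker at once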

At this point, task consumption has started. With consumption running, we continue with the opening of the loop (the Evloop step):

def start(self, c):
    self.patch_all(c)
    c.loop(*c.loop_args())

c.loop is the asynloop function in celery/worker/loops.py.

The stack is as follows:

asynloop, loops.py:43
start, consumer.py:592
start, bootsteps.py:116
start, consumer.py:311
start, bootsteps.py:365
start, bootsteps.py:116
start, worker.py:203
worker, worker.py:327
caller, base.py:132
new_func, decorators.py:21
invoke, core.py:610
invoke, core.py:1066
invoke, core.py:1259
main, core.py:782
start, base.py:358
worker_main, base.py:374

The asynloop function is as follows:

def asynloop(obj, connection, consumer, blueprint, hub, qos,
             heartbeat, clock, hbrate=2.0):
    """Non-blocking event loop."""                                  # obj is the consumer instance
    RUN = bootsteps.RUN                                             # the running state
    update_qos = qos.update
    errors = connection.connection_errors

    on_task_received = obj.create_task_handler()                    # create the task handler

    _enable_amqheartbeats(hub.timer, connection, rate=hbrate)       # send heartbeat packets periodically

    consumer.on_message = on_task_received                          # set on_message to on_task_received
    consumer.consume()                                              # start consuming
    obj.on_ready()                                                  # call the ready callback
    obj.controller.register_with_event_loop(hub)                    # register the hub with every blueprint instance
    obj.register_with_event_loop(hub)

    # did_start_ok will verify that pool processes were able to start,
    # but this will only work the first time we start, as
    # maxtasksperchild will mess up metrics.
    if not obj.restart_count and not obj.pool.did_start_ok():
        raise WorkerLostError('Could not start worker processes')

    # consumer.consume() may have prefetched up to our
    # limit - drain an event so we're in a clean state
    # prior to starting our event loop.
    if connection.transport.driver_type == 'amqp':
        hub.call_soon(_quick_drain, connection)

    # FIXME: Use loop.run_forever
    # Tried and works, but no time to test properly before release.
    hub.propagate_errors = errors
    loop = hub.create_loop()                                        # create the loop; essentially a generator

    try:
        while blueprint.state == RUN and obj.connection:            # check the running state and the connection
            # shutdown if signal handlers told us to.
            should_stop, should_terminate = (
                state.should_stop, state.should_terminate,
            )
            # False == EX_OK, so must use is not False
            if should_stop is not None and should_stop is not False:
                raise WorkerShutdown(should_stop)
            elif should_terminate is not None and should_stop is not False:
                raise WorkerTerminate(should_terminate)

            # We only update QoS when there's no more messages to read.
            # This groups together qos calls, and makes sure that remote
            # control commands will be prioritized over task messages.
            if qos.prev != qos.value:
                update_qos()

            try:
                next(loop)                                          # advance the loop
            except StopIteration:
                loop = hub.create_loop()
    finally:
        try:
            hub.reset()
        except Exception as exc:  # pylint: disable=broad-except
            logger.exception(
                'Error cleaning up after event loop: %r', exc)

At this point the asynchronous loop starts, the worker begins waiting for and processing events, and the worker startup is complete.
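hub.create_loop() returns a generator, and each next(loop) performs one round of polling and callback dispatch. A stripped-down sketch of that pattern (not the real Hub):

import os
import select

def create_loop_sketch(readers):
    """Generator event loop: each next() performs one poll round."""
    while True:
        ready, _, _ = select.select(list(readers), [], [], 0.1)
        for fd in ready:
            readers[fd](fd)          # fire the callback registered for this fd
        yield                        # hand control back to the caller

r, w = os.pipe()
loop = create_loop_sketch({r: lambda fd: print(os.read(fd, 64))})
os.write(w, b'task message')
next(loop)                           # one iteration, like asynloop's next(loop)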

0x04 Summary

This article has described the worker startup process: tasks are run in a process pool by default, and asynchronous I/O then handles events on the consumer side. This completes the overview of the worker startup process.

                                     +----------------------+
      +----------+                   |  @cached_property    |
      |   User   |                   |      Worker          |
      +----+-----+            +--->  |                      |
           |                  |      |                      |
           |  worker_main     |      |  Worker application  |
           |                  |      |  celery/app/base.py  |
           v                  |      +----------+-----------+
 +---------+------------+     |                 |
 |        Celery        |     |                 |
 |                      |     |                 |
 |  Celery application  |     |                 v
 |  celery/app/base.py  |     |  +--------------+--------------+    +---> app.loader.init_worker
 |                      |     |  | class Worker(WorkController)|    |
 +---------+------------+     |  |                             |    |
           |                  |  |                             +--------> setup_defaults
           |  celery.main     |  |    Worker as a program      |    |
           |                  |  |   celery/apps/worker.py     |    |
           v                  |  +-----------------------------+    +---> setup_instance +-----> setup_queues  +------>  app.amqp.queues
 +---------+------------+     |                                                +
 |  @click.pass_context |     |                                                |
 |       celery         |     |                 +------------------------------+
 |                      |     |                 |       apply
 |                      |     |                 |
 |    CeleryCommand     |     |                 v
 | celery/bin/celery.py |     |
 |                      |     |  +-------------------------------------+        +---------------------+     +--->  claim_steps
 +---------+------------+     |  | class Blueprint(bootsteps.Blueprint)| apply  |  class Blueprint    |     |
           |                  |  |                                     +------->+                     +-----+--->  _finalize_steps
           |                  |  |                                     |        |                     |     |
           |                  |  |       celery/apps/worker.py        |        | celery/bootsteps.py |     |           +--> Timer
           v                  |  +-----+---------------+---------------+        +---------------------+     +--->  include +--->+
+----------+------------+     |        ^               |                                                               +--> Hub
|   @click.pass_context |     |        | start         |                                                               +--> Pool
|        worker         |     |        |               +----> WorkController.on_start                                  +--> ......
|                       |     |        |               |                                                               +--> Consumer
|     WorkerCommand     |     |        |               +----> Hub.start / Pool.start / Task.start
| celery/bin/worker.py  |     |        |               |
+-----------+-----------+     |        |                        +---->   Evloop.start +----->  asynloop
            | 1  app.Worker   |        |
            +-----------------+        |
            |                          |
            | 2 blueprint.start        |
            +------------------------->+



0xEE Personal information

★★★★★ Thoughts on life and technology ★★★★★

Wechat official account: Rosie’s Thoughts

If you want timely updates on these articles, or to see the technical material I recommend, please follow this account.
