0x00 Abstract

Celery is a simple, flexible and reliable distributed system for processing large amounts of messages, focusing on asynchronous task queues for real-time processing, while also supporting task scheduling.

Earlier we analysed Kombu in a series of articles, which laid the foundation for analysing Celery:

Kombu message queue: Mailbox

Kombu message queue: Hub

Kombu message queue: Consumer

Kombu message queue: Producer

Kombu message queue: startup process

Kombu message queue: basic architecture

In this series we continue by studying Celery in depth through source-code analysis. This is the first article of the series; it draws on the work of several other authors online, reorganized according to my own understanding.

0x01 A brief introduction to Celery

1.1 What is Celery

Celery is one of the most popular background task managers in the Python world. It is a simple, flexible and reliable distributed system for processing large amounts of messages, focusing on asynchronous task queues for real-time processing, and also supporting task scheduling.

Using multiprocessing, Eventlet, or gevent, Celery executes tasks concurrently on one or more worker servers. Tasks can be executed asynchronously (running in the background) or synchronously (waiting for the task to complete). In production systems Celery can handle millions of tasks per day.

Celery is written in Python, but the protocol can be implemented in any language. It can also interoperate with other languages through webhooks.

The recommended message broker is RabbitMQ, but Celery also supports Redis, Beanstalk, MongoDB, CouchDB, and databases (via SQLAlchemy or the Django ORM). Celery can act as both producer and consumer.

1.2 Usage scenarios

Common scenarios for using Celery are as follows:

  • Web applications. When an operation triggered by the user takes a long time to complete, it can be handed to Celery for asynchronous execution and a response returned to the user immediately. The user does not have to wait, which improves the overall throughput and response time of the site.

  • Scheduled tasks. Production environments often run scheduled tasks. With thousands of servers and thousands of tasks, scheduled tasks become hard to manage; Celery helps set up such tasks quickly across different machines.

  • Work that would otherwise be done synchronously can be done asynchronously, such as sending SMS or email, pushing messages, and clearing or setting caches.
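The web application scenario can be sketched with the standard library alone. This is only an illustration of the pattern, not Celery code: a thread pool stands in for the broker and workers, and `send_email` and `handle_signup` are hypothetical names.

```python
import time
from concurrent.futures import ThreadPoolExecutor

# Stand-in for a pool of workers; with Celery the work would travel via a broker.
executor = ThreadPoolExecutor(max_workers=2)

def send_email(address):
    """Hypothetical slow operation."""
    time.sleep(0.1)
    return f"sent to {address}"

def handle_signup(address):
    """Hypothetical request handler: queue the slow work, reply immediately."""
    future = executor.submit(send_email, address)
    return {"status": "accepted"}, future

# The response is built without waiting for the email to be sent.
response, future = handle_signup("user@example.com")

# Blocking on the result mirrors the synchronous mode of execution.
result = future.result(timeout=5)
executor.shutdown()
```

The handler returns as soon as the work is queued; whether the caller later waits on the result is a separate decision.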

1.3 Features

Celery provides the following features:

  • You can view the execution status of tasks, such as whether they succeeded, their current state, and how long they took.

  • You can add, update, and delete tasks using a full-featured admin interface or the command line.

  • Tasks are easy to associate with configuration management.

  • Concurrent execution is supported through multiple processes, Eventlet, and gevent.

  • Error handling mechanisms are provided.

  • Multiple task primitives are provided to support task grouping, splitting, and call chains.

  • Multiple message brokers and storage backends are supported.

1.4 Message queues versus task queues

The biggest conceptual difference is that message queues deliver “messages” while task queues deliver “tasks”.

  • Message queues are used to consume queued messages quickly. A message queue focuses on delivering and processing messages and can handle massive volumes of them. A task queue can be built on top of a message queue's producer and consumer concepts, but that requires additional development.
  • A task queue is used to execute time-consuming tasks. Task queues provide the functions needed to run tasks, such as task retry, result return, and task status recording. Although they can process tasks concurrently, they are generally not suited to high-throughput, fast-consumption scenarios.
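To make the "additional development" concrete, here is a minimal sketch of task-queue behaviour (retry and status recording) layered on a plain in-process message queue. All names (`message_queue`, `results`, `run_task_queue`, `flaky`) are hypothetical, and the logic is a toy rather than how Celery implements it.

```python
import queue

message_queue = queue.Queue()   # plain message queue: only delivers payloads
results = {}                    # task_id -> (status, value); stand-in result store

def run_task_queue(handlers, max_retries=2):
    """Drain the queue, adding retry and status tracking on top of delivery."""
    while not message_queue.empty():
        task_id, name, args, attempt = message_queue.get()
        try:
            results[task_id] = ("SUCCESS", handlers[name](*args))
        except Exception as exc:
            if attempt < max_retries:
                # Put the message back with an incremented attempt counter.
                message_queue.put((task_id, name, args, attempt + 1))
                results[task_id] = ("RETRY", repr(exc))
            else:
                results[task_id] = ("FAILURE", repr(exc))

calls = {"n": 0}

def flaky(x):
    """Fails on the first call, succeeds afterwards."""
    calls["n"] += 1
    if calls["n"] == 1:
        raise RuntimeError("temporary failure")
    return x * 2

message_queue.put(("t1", "flaky", (21,), 0))
run_task_queue({"flaky": flaky})
```

The message queue itself knows nothing about retries or results; everything task-specific lives in the layer around it.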

0x02 Structure of Celery

In essence, Celery is a distributed, asynchronous, message-based task queue.

Celery manages its nodes in a distributed way, with broadcast and unicast communication between nodes so that they can cooperate. In practice only some of the auxiliary management functions are coordinated this way; the core business functions are not.

2.1 Components

Celery contains the following components:

  • Celery Beat: The task scheduler. The Beat process reads the configuration and periodically sends tasks that are due for execution to the task queue.

  • Celery Worker: The consumer that executes tasks. Multiple workers are usually run on multiple servers to improve throughput.

  • Broker: The message broker, or message middleware. It receives task messages from task producers, stores them in queues, and distributes them in order to task consumers. It is usually a message queue or a database.

  • Producer: Any code that creates tasks by calling the API, functions, or decorators provided by Celery and submits them to the task queue is a task producer.

  • Result Backend: Stores status information and results after tasks are processed so that they can be queried. Celery supports Redis, RabbitMQ, MongoDB, Django ORM, SQLAlchemy, and others out of the box.
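The Beat component's job of periodically sending due tasks to the queue can be illustrated with a simulated clock. This is a sketch under assumptions: the `schedule` layout and the `tick` function are hypothetical and do not reflect Celery Beat's real data structures.

```python
import queue

task_queue = queue.Queue()

# Hypothetical schedule: task name -> interval in seconds.
schedule = {"cleanup": 10, "report": 30}
last_run = {name: 0 for name in schedule}

def tick(now):
    """Enqueue every task whose interval has elapsed; return the names sent."""
    due = []
    for name, interval in schedule.items():
        if now - last_run[name] >= interval:
            task_queue.put(name)
            last_run[name] = now
            due.append(name)
    return due

# Drive the scheduler with explicit timestamps instead of sleeping.
sent_at_10 = tick(10)
sent_at_20 = tick(20)
sent_at_30 = tick(30)
```

Note that the scheduler only enqueues task names; executing them is left entirely to whoever consumes the queue, which matches the division between Beat and Worker described above.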

To put it another way:

  • A system can have multiple “message queues”, and different messages can be sent to different message queues.
  • A routing_key is specified when a message is sent; the Exchange uses the routing_key to route the message to the right “message queue”. (Celery is built on Kombu, which is where the Exchange concept comes in.)
  • An Exchange maps to message queues: through the “message routing” mechanism an exchange is associated with a queue, and each queue is associated with a worker.
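The routing idea above can be sketched as a toy direct exchange, where a message reaches only the queues bound with exactly the same routing key. `DirectExchange` and the queue names are hypothetical; real routing is done by the broker and Kombu, not by code like this.

```python
from collections import defaultdict

class DirectExchange:
    """Toy direct exchange: deliver to queues bound with an identical key."""

    def __init__(self):
        self.bindings = defaultdict(list)   # routing_key -> list of queues

    def bind(self, queue, routing_key):
        self.bindings[routing_key].append(queue)

    def publish(self, message, routing_key):
        # .get() avoids creating an empty binding for unknown keys.
        for q in self.bindings.get(routing_key, []):
            q.append(message)

email_queue, video_queue = [], []
exchange = DirectExchange()
exchange.bind(email_queue, "email")
exchange.bind(video_queue, "video")

exchange.publish({"task": "send_welcome"}, routing_key="email")
exchange.publish({"task": "transcode"}, routing_key="video")
exchange.publish({"task": "ignored"}, routing_key="unknown")  # no binding: dropped
```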

2.2 Task Process

Celery communicates through messages, usually with a broker mediating between clients and workers. The process for starting a task is:

  • The client sends a message to the message queue.
  • The Broker (RabbitMQ, Redis, and so on are supported) then delivers the message to a Worker.
  • Finally, the Worker executes the task delivered by the Broker.
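The three steps above can be mimicked in a single process. In this sketch a `queue.Queue` stands in for the broker and a thread for the worker; the message format `(name, args)` is an assumption made for illustration only.

```python
import queue
import threading

broker = queue.Queue()      # stand-in for RabbitMQ or Redis
completed = []

def worker():
    """Take messages from the broker until a None sentinel arrives."""
    while True:
        message = broker.get()
        if message is None:
            break
        func_name, args = message
        completed.append((func_name, sum(args)))

worker_thread = threading.Thread(target=worker)
worker_thread.start()

# Client side: send messages to the queue and carry on.
broker.put(("add", (2, 3)))
broker.put(("add", (10, 20)))
broker.put(None)            # ask the worker to stop

worker_thread.join()
```

The client never calls the worker directly; the only thing they share is the queue.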

2.3 Architecture diagram

The structure diagram of Celery is shown below:

 +-----------+        +--------------+
 | Producer  |        | Celery Beat  |
 +-------+---+        +----+---------+
         |                 |
         |                 |
         v                 v
       +-------------------------+
       |          Broker         |
       +------------+------------+
                    |
                    |
     +-------------------------------+
     |              |                |
     v              v                v
+----+-----+   +----+------+   +-----+----+
| Exchange |   | Exchange  |   | Exchange |
+----+-----+   +----+------+   +----+-----+
     |              |               |
     v              v               v
  +-----+       +-------+       +-------+
  |queue|       | queue |       | queue |
  +--+--+       +---+---+       +---+---+
     |              |               |
     |              |               |
     v              v               v
+---------+     +--------+     +----------+
| worker  |     | Worker |     |  Worker  |
+-----+---+     +---+----+     +----+-----+
      |             |               |
      |             |               |
      +-----------------------------+
                    |
                    |
                    v
                +---+-----+
                | backend |
                +---------+

0x03 Inferring the design of Celery

So far we know the following:

  • In essence, Celery is a distributed, asynchronous, message-based task queue;
  • Celery is built on Kombu and relies on it for its basic functions;
  • We have covered the general logic of Kombu in several earlier articles.

We can therefore try to infer how Celery should be designed on top of Kombu.

3.1 Basic functions of Celery

First we look at which components (modules) Celery needs in order to provide its basic functions, raising questions along the way that later analysis will answer.

Because Celery is a distributed, asynchronous, message-based task queue, it needs the following basic components:

  • Producer: A component is needed that packages user-defined code into a task and submits it to the task queue. The questions are:
    • How should a task be defined and abstracted?
    • What is the nature of a task?
    • What functionality should a task include?
    • If a task is a function, how is the task function passed to the server? What if the task function is large?
    • How is task-related information transferred from the client to the server?
  • Broker: To decouple producers from consumers, an intermediate component is needed to buffer messages. This is the message broker, or messaging middleware. It accepts task messages sent by task producers, stores them in queues, and distributes them in order to task consumers; it is usually a message queue or a database. The questions are:
    • How are different message sources distinguished, that is, how are messages routed?
    • Is there a fault tolerance mechanism?
  • Worker: A component is needed to execute tasks, and that is the Worker:
    • The Worker needs to receive tasks from the broker. This requires a consumer, and the question is how the consumer gets messages from the broker.
    • After receiving a task, the Worker needs to identify it and know how to execute it. So the question is: how does the Worker know about the tasks defined on the client?
    • Usually multiple workers run on multiple servers to improve throughput. This raises further questions: how are multiple workers coordinated, and how are tasks allocated among them?
  • Result Backend: Stores status information and results after tasks are processed so that they can be queried. Celery supports Redis, RabbitMQ, MongoDB, Django ORM, SQLAlchemy, and others out of the box.
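One common answer to "how is the task function passed to the server" is that it is not passed at all: both sides import the same code, and only the task name and arguments travel over the wire. The sketch below illustrates that idea with a toy registry and JSON; `registry`, `task`, `serialize_call` and `execute` are hypothetical names, not Celery's API.

```python
import json

registry = {}   # task name -> function; must exist on both client and worker

def task(func):
    """Hypothetical decorator: register the function under its name."""
    registry[func.__name__] = func
    return func

@task
def add(x, y):
    return x + y

def serialize_call(name, *args, **kwargs):
    """Client side: only the name and arguments go over the wire."""
    return json.dumps({"task": name, "args": args, "kwargs": kwargs})

def execute(raw_message):
    """Worker side: look the task up by name and call it."""
    message = json.loads(raw_message)
    func = registry[message["task"]]
    return func(*message["args"], **message["kwargs"])

wire_message = serialize_call("add", 2, 3)
outcome = execute(wire_message)
```

With this approach the size of the task function does not matter, because the message carries only a name and the arguments.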

3.2 Auxiliary functions of Celery

Those are the basic functions, but a distributed asynchronous task queue also needs auxiliary functions, which raise questions of their own, such as:

  • How to run a timer for executing scheduled tasks;

  • How to handle monitoring events;

  • How to manage workers through remote commands;

  • How to deal with worker failures;

  • How to improve concurrency;

  • How to encapsulate AMQP;

  • How the message loop engine works;

  • Which of these functions are distributed.

3.3 Division of responsibilities

A further question is whether these auxiliary functions belong to the basic functional modules or stand alone as modules of their own.

This is really a matter of design philosophy. Every implementation has its reasons, and some decisions may simply reflect an idea the author had at the time.

For example, the Consumer component, which we will discuss later, is ostensibly a module that gets messages from the broker and could simply use Kombu's Consumer directly.

In fact, the concept of Celery's Consumer component is much broader than that of Kombu's Consumer. It not only uses the Kombu Consumer to get messages from the broker, but also covers message consumption, distribution, monitoring, heartbeats, and a series of other functions. Apart from the message loop engine (handled by the Hub), multiprocessing (handled by Pool and Autoscaler), and scheduled tasks (handled by Timer and Beat), most other major functions are carried out by the Consumer.

Therefore, we need to look at:

  • Which components can be implemented directly with Kombu and which need to be redesigned by Celery itself.

  • For the redesigned ones, which can be based on Kombu's design and how they call the corresponding Kombu modules.

  • Where Kombu modules are held as member variables of Celery modules, which Celery module each Kombu module belongs to.

0x04 Encapsulation of AMQP/Kombu

A message processing system first has to settle the message protocol and message transport.

  • The message protocol is the Advanced Message Queuing Protocol (AMQP). Celery supports all AMQP routing mechanisms and can be configured to route messages accordingly.
  • Message implementation and transport are handled by Kombu. From the earlier analysis of Kombu, we know that Kombu is positioned as a message queue abstraction compatible with the AMQP protocol, a library that wraps message passing in a unified interface.

So let’s first look at how Celery wraps AMQP/Kombu.

The wrapping lives in the celery/app/amqp.py file, which has two main classes: AMQP and Queues.

4.1 AMQP

The AMQP class is used to send and receive messages. It is a further layer of encapsulation over an AMQP protocol implementation, in this case Kombu.

As you can see, its members all come from Kombu: Connection, Consumer, Exchange, Producer, Queue, and pools.

from kombu import Connection, Consumer, Exchange, Producer, Queue, pools

class AMQP:
    """App AMQP API: app.amqp."""

    Connection = Connection
    Consumer = Consumer
    Producer = Producer

    #: compat alias to Connection
    BrokerConnection = Connection

    queues_cls = Queues

    #: Cached and prepared routing table.
    _rtable = None

    #: Underlying producer pool instance automatically
    #: set by the :attr:`producer_pool`.
    _producer_pool = None

    # Exchange class/function used when defining automatic queues.
    # For example, you can use ``autoexchange = lambda n: None`` to use the
    # AMQP default exchange: a shortcut to bypass routing
    # and instead send directly to the queue named in the routing key.
    autoexchange = None

To understand it better, let’s print the details of an AMQP instance.

amqp = {AMQP}  
 BrokerConnection = {type} <class 'kombu.connection.Connection'>
 Connection = {type} <class 'kombu.connection.Connection'>
 Consumer = {type} <class 'kombu.messaging.Consumer'>
 Producer = {type} <class 'kombu.messaging.Producer'>
 app = {Celery} <Celery myTest at 0x252bd2903c8>
 autoexchange = {NoneType} None
 default_exchange = {Exchange} Exchange celery(direct)
 default_queue = {Queue} <unbound Queue celery -> <unbound Exchange celery(direct)> -> celery>
 producer_pool = {ProducerPool} <kombu.pools.ProducerPool object at 0x00000252BDC8F408>
 publisher_pool = {ProducerPool} <kombu.pools.ProducerPool object at 0x00000252BDC8F408>
 queues = {Queues: 1} {'celery': <unbound Queue celery -> <unbound Exchange celery(direct)> -> celery>}
 queues_cls = {type} <class 'celery.app.amqp.Queues'>
 router = {Router} <celery.app.routes.Router object at 0x00000252BDC6B248>
 routes = {tuple: 0} ()
 task_protocols = {dict: 2} {1: <bound method AMQP.as_task_v1 of <celery.app.amqp.AMQP object at 0x00000252BDC74148> >,2: <bound method AMQP.as_task_v2 of <celery.app.amqp.AMQP object at 0x00000252BDC74148>>}
  _event_dispatcher = {EventDispatcher} <celery.events.dispatcher.EventDispatcher object at 0x00000252BE750348>
  _producer_pool = {ProducerPool} <kombu.pools.ProducerPool object at 0x00000252BDC8F408>
  _rtable = {tuple: 0} ()

The specific logic is as follows:

+---------+
| Celery  |    +----------------------------+
|         |    |   celery.app.amqp.AMQP     |
|         |    |                            |
|         |    |                            |
|         |    |          BrokerConnection +----->  kombu.connection.Connection
|         |    |                            |
|   amqp+----->+          Connection       +----->  kombu.connection.Connection
|         |    |                            |
+---------+    |          Consumer         +----->  kombu.messaging.Consumer
               |                            |
               |          Producer         +----->  kombu.messaging.Producer
               |                            |
               |          producer_pool    +----->  kombu.pools.ProducerPool
               |                            |
               |          queues           +----->  celery.app.amqp.Queues
               |                            |
               |          router           +----->  celery.app.routes.Router
               +----------------------------+
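One plausible benefit of exposing collaborators as class attributes, as the AMQP class does with Connection, Consumer, and Producer, is that a subclass can swap an implementation without touching any method. The sketch below shows that general pattern with stand-in classes; `Facade`, `DefaultConnection`, and `LoggingConnection` are hypothetical and are not Celery or Kombu classes.

```python
class DefaultConnection:
    """Stand-in for a connection class such as kombu.Connection."""

    def __init__(self, url):
        self.url = url

class LoggingConnection(DefaultConnection):
    """Hypothetical replacement that records every URL it is created with."""
    seen = []

    def __init__(self, url):
        super().__init__(url)
        self.seen.append(url)

class Facade:
    """Exposes its collaborator as a class attribute."""
    Connection = DefaultConnection

    def connect(self, url):
        # Looked up through self, so a subclass attribute takes precedence.
        return self.Connection(url)

class CustomFacade(Facade):
    Connection = LoggingConnection

default_conn = Facade().connect("memory://")
custom_conn = CustomFacade().connect("memory://")
```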

4.2 Queues

Queues is an extension, a logical concept that can be thought of as a simplified version of the Broker concept.

Producers send tasks to Queues, and Workers take tasks from Queues to consume.

app.amqp.queues is an instance of Queues. It holds all the kombu.Queue objects that the Worker can read from.

# Imports used by this excerpt of celery/app/amqp.py.
from collections.abc import Mapping
from weakref import WeakValueDictionary

from kombu import Exchange, Queue


class Queues(dict):
    """Queue name -> declaration mapping.

    Arguments:
        queues (Iterable): Initial list/tuple or dict of queues.
        create_missing (bool): By default any unknown queues will be
            added automatically, but if this flag is disabled the occurrence
            of unknown queues in `wanted` will raise :exc:`KeyError`.
        max_priority (int): Default x-max-priority for queues with none set.
    """

    #: If set, this is a subset of queues to consume from.
    #: The rest of the queues are then used for routing only.
    _consume_from = None

    def __init__(self, queues=None, default_exchange=None,
                 create_missing=True, autoexchange=None,
                 max_priority=None, default_routing_key=None):
        dict.__init__(self)
        self.aliases = WeakValueDictionary()
        self.default_exchange = default_exchange
        self.default_routing_key = default_routing_key
        self.create_missing = create_missing
        self.autoexchange = Exchange if autoexchange is None else autoexchange
        self.max_priority = max_priority
        # Accept either a list/tuple of Queue objects or a name -> queue mapping.
        if queues is not None and not isinstance(queues, Mapping):
            queues = {q.name: q for q in queues}
        queues = queues or {}
        for name, q in queues.items():
            self.add(q) if isinstance(q, Queue) else self.add_compat(name, **q)
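The constructor's handling of its `queues` argument (a list or tuple is first turned into a name-keyed dict) can be tried in isolation. The following is a simplified stand-in, assuming a hypothetical `ToyQueue` record instead of kombu.Queue and ignoring aliases, exchanges, and priorities.

```python
from collections import namedtuple
from collections.abc import Mapping

# Hypothetical minimal queue record; the real class is kombu.Queue.
ToyQueue = namedtuple("ToyQueue", ["name", "routing_key"])

class ToyQueues(dict):
    """Name -> queue mapping that accepts a list or a dict of queues."""

    def __init__(self, queues=None):
        super().__init__()
        # A list/tuple is normalized into a mapping keyed by queue name.
        if queues is not None and not isinstance(queues, Mapping):
            queues = {q.name: q for q in queues}
        for name, q in (queues or {}).items():
            self[name] = q

from_list = ToyQueues([ToyQueue("celery", "celery"), ToyQueue("video", "video")])
from_dict = ToyQueues({"celery": ToyQueue("celery", "celery")})
```

Because the class is itself a dict, callers can look queues up by name with ordinary indexing, which is how `queues[queue]` is used later in add_task_queue.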

We can configure the queues of a Consumer. A Consumer can consume from multiple queues, for example:

def add_consumer(state, queue, exchange=None, exchange_type=None,
                 routing_key=None, **options):
    """Tell worker(s) to consume from task queue by name."""
    state.consumer.call_soon(
        state.consumer.add_task_queue,
        queue, exchange, exchange_type or 'direct', routing_key, **options)
    return ok(f'add consumer {queue}')

I find the name add_consumer misleading: it looks as if it adds a Consumer, but it actually adds a queue.

The queue itself is set up in the Consumer’s add_task_queue method.

def add_task_queue(self, queue, exchange=None, exchange_type=None,
                   routing_key=None, **options):
    cset = self.task_consumer
    queues = self.app.amqp.queues
    if queue in queues:
        q = queues[queue]
    else:
        exchange = queue if exchange is None else exchange
        exchange_type = ('direct' if exchange_type is None
                         else exchange_type)
        q = queues.select_add(queue,
                              exchange=exchange,
                              exchange_type=exchange_type,
                              routing_key=routing_key, **options)
    if not cset.consuming_from(queue):
        cset.add_queue(q)
        cset.consume()
        info('Started consuming from %s', queue)
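The control flow of add_task_queue boils down to two steps: reuse the queue if it is already known (otherwise create it with defaults), then start consuming only if not already doing so. A toy model of that flow, with a hypothetical `ToyConsumer` that stores plain dicts instead of Kombu objects, might look like this:

```python
class ToyConsumer:
    """Toy model of the lookup-or-create, then consume-once flow."""

    def __init__(self, known_queues):
        self.known_queues = dict(known_queues)   # name -> queue description
        self.consuming = set()

    def add_task_queue(self, name, exchange=None, exchange_type=None,
                       routing_key=None):
        if name not in self.known_queues:
            # Same defaults as the original: exchange named after the queue,
            # exchange type 'direct'.
            self.known_queues[name] = {
                "exchange": name if exchange is None else exchange,
                "exchange_type": exchange_type or "direct",
                "routing_key": routing_key,
            }
        if name in self.consuming:
            return False          # already consuming: nothing to do
        self.consuming.add(name)
        return True

consumer = ToyConsumer({"celery": {"exchange": "celery",
                                   "exchange_type": "direct",
                                   "routing_key": "celery"}})
first = consumer.add_task_queue("video")
second = consumer.add_task_queue("video")
```

Calling it twice with the same name is harmless, which is the property the `consuming_from` check provides in the real method.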

0x05 To be continued

From the above analysis you should have a preliminary understanding of the structure of Celery. In the next article we will explore it further from several angles. Stay tuned.

0xEE Personal information

★★★★ Thoughts on life and technology ★★★★★

Wechat official account: Rosie’s Thoughts

If you would like timely notifications of my articles, or want to see the technical material I recommend, please follow the account.

0xFF References

Basic concepts of Master and Worker in Nginx data

1: Overview of Worker startup process

2: Worker’s execution engine

3: Implementation of Task objects

4: Implementation of scheduled tasks

5: Remote control management

6: Implementation of Events

7: Interaction between workers

8: the State and the Result

Application of Spark distributed computing engine

MFC message Queue concept _ What is the difference between message queue and task queue?