Celery is a simple, flexible and reliable distributed system for processing large numbers of messages, and it provides a full set of tools to operate such a system. In short, Celery is a message-queue-based tool for real-time task processing and task scheduling.

Celery has 18K stars and 4.1K forks on GitHub and is very popular. There have been 216 releases so far, the latest one 19 days ago, so it is actively maintained. If you need a Python task scheduling framework, this is the first place to look.

Starting this week we will read the Celery source code: learn how to use Celery, see how a distributed task scheduling framework is built, and dive into the implementation details. The codebase is large, so this is expected to take 3~4 weeks. Without further ado, this week's content consists of the following sections:

  • Celery Application Scenario
  • Celery project structure
  • Implementation of the Promise library
  • Summary
  • Tip

One small improvement: since the source code walkthroughs in the previous articles were not clear enough, I now add my own comments to the project source code and upload it to GitHub. If you want the implementation details, please read the annotated source in my source-code reading project Yuanmahui; you are also welcome to star it on GitHub ❤

Celery Application Scenario

Suppose we have a Flask web service whose hello endpoint needs to make an external call (such as sending an SMS captcha). Here we use time.sleep to simulate this time-consuming operation:

import time

from flask import Flask

app = Flask(__name__)


@app.route('/')
def hello():
    time.sleep(1)
    return 'Hello, World!'

We can use ab to measure the response time of this endpoint:

# ab -n 10 -c 5 http://127.0.0.1:5000/
...
Requests per second:    3.30 [#/sec] (mean)
Time per request:       1513.100 [ms] (mean)
Time per request:       302.641 [ms] (mean, across all concurrent requests)
Transfer rate:          0.54 [Kbytes/sec] received

Connection Times (ms)
              min  mean[+/-sd] median   max
Connect:        0    0   0.1      0       0
Processing:  1007 1008   1.2   1008    1010
Waiting:     1005 1007   1.0   1007    1008
Total:       1007 1008   1.2   1009    1010
...

The test shows that this endpoint takes about 1.5 seconds per request on average, which is slow and can make the front-end page feel laggy. To solve this, we can use task scheduling: turn the time-consuming operation into a background task and return the HTTP response immediately. The adjusted code looks like this:

...

@app.route('/')
def hello():
    # time.sleep(1)
    do_task(id(request))
    return 'Hello, World!'


def do_task(index):
    t = threading.Thread(target=lambda idx: time.sleep(1), args=(index,))
    t.start()

do_task starts a new thread to perform the time.sleep operation; the request thread returns immediately after starting it. Measuring the endpoint with ab again:

...
Time per request:       6.304 [ms] (mean)
...

You can see that the response time of the hello endpoint improves dramatically with this task-based approach. Of course, this naive task scheduling has two problems:

  1. The result of the task execution cannot be returned to the front end
  2. Tasks and Web services are executed in one process, which is not very efficient

The distributed task scheduling provided by Celery solves both of these problems nicely.
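
As a preview, here is a minimal sketch of how the same endpoint could hand the work off to Celery instead of a raw thread. The Redis broker URL and the send_sms task are illustrative assumptions, not part of the original service:

import time

from celery import Celery
from flask import Flask

# assumed broker: a local Redis instance
celery_app = Celery('tasks', broker='redis://localhost:6379/0')
app = Flask(__name__)


@celery_app.task
def send_sms(user_id):
    # stands in for the real external call
    time.sleep(1)
    return user_id


@app.route('/')
def hello():
    # enqueue the task and return immediately;
    # a separate worker process executes it
    send_sms.delay(42)
    return 'Hello, World!'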

Celery project structure

We will read Celery version 5.0.5. The requirements/default.txt file lists its main dependencies:

  • billiard: a multiprocessing pool implementation maintained by the Celery project
  • kombu: a messaging library from the Celery project that can connect to different message queues such as RabbitMQ and Redis
  • vine: a promise implementation from the Celery project that supports task composition, pipelines, etc.

Celery supports the following three working modes (the corresponding commands are shown below):

  • beat: starts the periodic task scheduler (the timed "heartbeat")
  • multi: starts in cluster mode, spawning multiple worker processes
  • worker: starts an ordinary worker process
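
For reference, these modes correspond to the celery command-line subcommands below (proj is a placeholder application module):

# start an ordinary worker process
celery -A proj worker --loglevel=INFO
# start the periodic task scheduler
celery -A proj beat
# start several named workers at once
celery multi start w1 w2 -A proj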

The results of Celery task execution can also be stored in a variety of backends (a configuration sketch follows the list):

  • Mongo
  • Redis
  • Elasticsearch
  • .
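
A result backend is configured on the Celery app so the caller can fetch a task's outcome later. A minimal sketch, assuming Redis serves as both broker and backend:

from celery import Celery

app = Celery('tasks',
             broker='redis://localhost:6379/0',
             backend='redis://localhost:6379/1')


@app.task
def add(x, y):
    return x + y


# .delay() returns an AsyncResult; .get() blocks until the value is stored
result = add.delay(2, 3)
print(result.get(timeout=10))  # -> 5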

Celery's concurrency layer also supports multiple pool implementations (selected when starting the worker, as shown below):

  • Multi-process fork
  • gevent
  • multithreading
  • eventlet
  • .
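
The pool is chosen when the worker starts; for example (proj is again a placeholder module):

# default prefork (multi-process) pool with 8 worker processes
celery -A proj worker --pool=prefork --concurrency=8
# cooperative gevent pool with 500 greenlets
celery -A proj worker --pool=gevent --concurrency=500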

Celery also supports workflows (see the canvas sketch below):

  • Scheduling can be based on function signatures
  • Chained tasks are supported
  • Groups, chords and more are supported
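
A hedged sketch of these canvas primitives; the broker/backend URLs and the add/tsum tasks are illustrative assumptions:

from celery import Celery, chain, chord, group

app = Celery('tasks', broker='redis://localhost:6379/0',
             backend='redis://localhost:6379/1')


@app.task
def add(x, y):
    return x + y


@app.task
def tsum(numbers):
    return sum(numbers)


# a signature captures a task together with its arguments
sig = add.s(2, 2)

# chain: each task receives the previous result as its first argument
chain(add.s(2, 2), add.s(4), add.s(8)).delay()

# group: run a set of tasks in parallel
group(add.s(i, i) for i in range(10)).delay()

# chord: run a group, then pass all of its results to a callback task
chord(add.s(i, i) for i in range(10))(tsum.s())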

That is a brief overview of the Celery project structure; the details will be covered in later chapters.

Implementation of the Promise library

Promises are central to asynchronous tasks, so Celery maintains the vine project, which implements promise functionality. Before starting on Celery itself, we clear this peripheral obstacle first.

Introduction to promises

A promise is an object that represents the eventual completion or failure of an asynchronous operation. The introduction on MDN is very good, so let's look at it first and then see how the idea is implemented in Python. For example, suppose there is an operation that creates an audio file, with different output on success and failure:

function successCallback(result) {
  console.log("Audio file created successfully: " + result);
}

function failureCallback(error) {
  console.log("Audio file creation failed: " + error);
}

createAudioFileAsync(audioSettings, successCallback, failureCallback);

The traditional approach is callback-based: pass the success and failure callbacks into createAudioFileAsync. With promises, this becomes:

const promise = createAudioFileAsync(audioSettings);
promise.then(successCallback, failureCallback);

Although the above code uses JavaScript, I’m sure you can understand it correctly if you’re familiar with Python. In the example above, it’s not easy to see the merits of Promise. Continue with the following example:

doSomething(function(result) {
  doSomethingElse(result, function(newResult) {
    doThirdThing(newResult, function(finalResult) {
      console.log('Got the final result: ' + finalResult);
    }, failureCallback);
  }, failureCallback);
}, failureCallback);

This is an implementation with nested callbacks: doSomething runs first, doSomethingElse processes its result, and finally doThirdThing runs. Three levels of callbacks may still be tolerable, but with many more you end up in callback hell: ugly, hard-to-maintain code. Implemented with promises, it becomes:

doSomething().then(function(result) {
  return doSomethingElse(result);
})
.then(function(newResult) {
  return doThirdThing(newResult);
})
.then(function(finalResult) {
  console.log('Got the final result: ' + finalResult);
})
.catch(failureCallback);

You can see that when you use the Promise approach, your code gets flatter and cleaner. Two features of Promises are shown here:

  • The then function executes and returns a Promise object
  • A Promise can be called in a chain

Now that we understand promises, let's go back to doSomething, doSomethingElse and doThirdThing: think of them as three tasks that must run in sequence, where each task takes the previous task's result as its argument. Logically, then, the promise pattern is very useful for task scheduling, and the idea is language independent.

Implementation of the promise

Let's start with a unit test case that shows how a promise is used:

def test_signal(self):
    # mock a callback
    callback = Mock(name='callback')
    # create a promise object
    a = promise()
    # add the callback
    a.then(callback)
    # execute the promise
    a(42)
    # the callback is called with 42
    callback.assert_called_once_with(42)

It's not hard to see that the way a promise is used in the Python version differs somewhat from the JavaScript version. The promise constructor looks like this:

class promise:
    ...

    def __init__(self, fun=None, args=None, kwargs=None,
                 callback=None, on_error=None, weak=False,
                 ignore_result=False):
        self.weak = weak
        self.ignore_result = ignore_result
        # the initial function, possibly kept as a weak reference
        self.fun = self._get_fun_or_weakref(fun=fun, weak=weak)
        # arguments bound to the initial function
        self.args = args or ()
        self.kwargs = kwargs or {}
        # ready, failed and cancelled states all default to False
        self.ready = False
        self.failed = False
        self.value = None
        self.reason = None
        # Optimization
        # Most promises will only have one callback, so we optimize for this
        # case by using a list only when there are multiple callbacks.
        #   s(calar) pending / l(ist) pending
        # i.e. single callback / multiple callbacks
        self._svpending = None
        self._lvpending = None
        self.on_error = on_error
        self.cancelled = False
        if callback is not None:
            self.then(callback)

Each promise has three state flags, ready, cancelled and failed, all of which default to False.

The key is the then function:

def then(self, callback, on_error=None):
    if not isinstance(callback, Thenable):
        callback = promise(callback, on_error=on_error)
    if self.cancelled:
        callback.cancel()
        return callback
    if self.failed:
        callback.throw(self.reason)
    elif self.ready:
        args, kwargs = self.value
        callback(*args, **kwargs)
    if self._lvpending is None:
        svpending = self._svpending
        if svpending is not None:
            # promote the single pending callback to a deque
            self._svpending, self._lvpending = None, deque([svpending])
        else:
            self._svpending = callback
            return callback
    # append to the pending list
    self._lvpending.append(callback)
    # returning the callback (itself a promise) enables chaining:
    # .then(fun_x).then(fun_y).then(fun_z)
    return callback

These are the two promise features already introduced in the JavaScript section: then returns a promise object, and because it returns a promise you can keep calling then on the result to build a chain of callbacks.

A promise object is executed through the magic method __call__:

def __call__(self, *args, **kwargs):
    retval = None
    if self.cancelled:
        return
    # merge the promise's own arguments with the call arguments
    final_args = self.args + args if args else self.args
    final_kwargs = dict(self.kwargs, **kwargs) if kwargs else self.kwargs
    # self.fun may be a weakref
    fun = self._fun_is_alive(self.fun)
    if fun is not None:
        try:
            if self.ignore_result:
                fun(*final_args, **final_kwargs)
                ca = ()
                ck = {}
            else:
                retval = fun(*final_args, **final_kwargs)
                self.value = (ca, ck) = (retval,), {}
        except Exception:
            return self.throw()
    else:
        self.value = (ca, ck) = final_args, final_kwargs
    # change the ready state
    self.ready = True
    # run the pending callback pipeline
    svpending = self._svpending
    if svpending is not None:
        try:
            svpending(*ca, **ck)
        finally:
            self._svpending = None
    else:
        lvpending = self._lvpending
        try:
            while lvpending:
                p = lvpending.popleft()
                p(*ca, **ck)
        finally:
            self._lvpending = None
    return retval

The main steps of the above code are (the snippet after this list walks through them):

  • Merge the parameters of the promise object with the call arguments
  • If there is an initial function, execute it
  • Change the ready state of the promise object
  • Execute the pending callbacks
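
The snippet below walks through those steps with a toy promise; it assumes vine exposes promise at the package level (as the installed package does):

from vine import promise

# a promise with a pre-bound argument and a callback
p = promise(lambda x, y: x + y, args=(1,))
p.then(print)      # callbacks receive the return value of the initial function

p(2)               # merges (1,) and (2,), runs the function -> 3, then print(3)
assert p.ready
assert p.value == ((3,), {})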

For an example of a chain call, see:

def test_chained(self):

    def add(x, y):
        return x + y

    def pow2(x):
        return x ** 2

    adder = Mock(name='adder')
    adder.side_effect = add
    power = Mock(name='multiplier')
    power.side_effect = pow2
    final = Mock(name='final')

    # chain the callbacks: add -> pow2 -> final
    p = promise()
    p.then(adder).then(power).then(final)

    p(42, 42)
    assert p.value == ((42, 42), {})
    adder.assert_called_with(42, 42)
    power.assert_called_with(84)
    final.assert_called_with(7056)

Implementation of the barrier

vine also provides a barrier, which coordinates the completion of multiple promise objects. Here's a unit test:

class test_barrier:

    def setup(self):
        self.m1, self.m2, self.m3 = Mock(), Mock(), Mock()
        self.ps = [promise(self.m1), promise(self.m2), promise(self.m3)]

    def test_evaluate(self):
        x = barrier(self.ps)
        x()
        assert not x.ready
        x.add(promise())
        x()
        assert not x.ready
        x()
        assert not x.ready
        x()
        assert x.ready
        # adding to a finished barrier raises an error
        with pytest.raises(ValueError):
            x.add(promise())

With a barrier, the four promise objects are counted as a batch: each call to x() marks one promise as consumed, and the barrier only becomes ready once all of them have been consumed.

The barrier constructor:

class barrier:

    def __init__(self, promises=None, args=None, kwargs=None,
                 callback=None, size=None):
        # the tail promise that fires once the whole batch is done
        self.p = promise()
        self.args = args or ()
        self.kwargs = kwargs or {}
        self._value = 0
        self.size = size or 0
        if not self.size and promises:
            # iter(l) calls len(l) so generator wrappers
            # can only return NotImplemented in the case the
            # generator is not fully consumed yet.
            plen = promises.__len__()
            if plen is not NotImplemented:
                self.size = plen
        self.ready = self.failed = False
        self.reason = None
        self.cancelled = False
        self.finalized = False
        # chain every promise in the list to this barrier
        [self.add_noincr(p) for p in promises or []]
        self.finalized = bool(promises or self.size)
        if callback:
            self.then(callback)

The key point is that the barrier holds a promise of its own (self.p) that fires when the entire batch is finished. Each promise in the argument list is chained to the barrier through the add_noincr function:

def add_noincr(self, p):
    if not self.cancelled:
        if self.ready:
            raise ValueError('Cannot add promise to full barrier')
        # the barrier itself becomes the promise's callback:
        # then().then().then()...
        p.then(self)

Each call increments the counter; once the count reaches the barrier's size, the state flips and the barrier's own (tail) promise is executed:

def __call__(self, *args, **kwargs):
    self._value += 1
    if self.finalized and self._value >= self.size:
        # all promises have completed: flip the state
        self.ready = True
        # and execute the tail promise
        self.p(*self.args, **self.kwargs)
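
To tie the pieces together, here is a small usage sketch; it assumes barrier and promise are importable from the vine package, and the callbacks are made up for illustration:

from vine import barrier, promise

ps = [promise(print) for _ in range(3)]
b = barrier(ps, callback=lambda: print('all promises fulfilled'))

for i, p in enumerate(ps):
    # fulfilling each promise also invokes the barrier (via p.then(barrier))
    p(i)

assert b.ready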

Summary

In this article we got to know Celery, a task scheduling system implemented in Python that is very popular on GitHub and actively maintained. Learning to use task scheduling helps us handle time-consuming operations in web services. We briefly covered the structure of the Celery project, then started with its dependency vine to see how promises are used in a task scheduling system, and finally learned from the vine source code how a promise library is built.

Tip

For abstract interfaces in Python, explicit inheritance is not the only option; a class can also be recognized as an implementation of an abstract base class without inheriting from it. For example:

class Thenable(Callable, metaclass=abc.ABCMeta):  # pragma: no cover
    ...

    @abc.abstractmethod
    def then(self, on_success, on_error=None):
        raise NotImplementedError()


class CanThen:
    def then(self, x, y):
        pass


assert isinstance(CanThen(), Thenable)

You can see that CanThen implements a then function and is therefore considered an implementation of Thenable, even though CanThen does not inherit from Thenable. This magic is implemented by ABCMeta's register and __subclasshook__ methods:

class Thenable(Callable, metaclass=abc.ABCMeta):
    ...

    @classmethod
    def __subclasshook__(cls, C):  # also provided by ABCMeta
        if cls is Thenable:
            # any class whose MRO defines a `then` method counts
            if any('then' in B.__dict__ for B in C.__mro__):
                return True
        return NotImplemented

    @classmethod
    def register(cls, other):
        # override to return `other` so register can be used as a decorator
        # https://docs.python.org/zh-cn/3/library/abc.html
        type(cls).register(cls, other)
        return other


@Thenable.register
class promise:
    ...


assert isinstance(promise(lambda x: x), Thenable)

Because the promise class is decorated with @Thenable.register, it is recognized as an implementation of Thenable without explicitly inheriting from it.

Reference links

  • Celery Chinese handbook www.celerycn.io/
  • JavaScript promise implementation developer.mozilla.org/zh-CN/docs/…
  • Source link github.com/game404/yua…