Tornado source code analysis: coroutines

How Tornado's coroutines work


Version: 4.3.0

Tornado implements its own coroutine library to support asynchronous programming.

The coroutine framework implemented by Tornado has the following characteristics:

  1. It supports Python 2.7, which has no yield from, so it is implemented purely with yield
  2. A coroutine returns a value by raising an exception
  3. Each coroutine is represented by a Future object (which stores the coroutine's result and calls the registered callbacks when the coroutine finishes)
  4. It uses the IOLoop event loop: when an event occurs, the loop invokes the registered callback, which drives the coroutine forward

This is a classic way to implement coroutines in Python.

This article implements a basic coroutine framework similar to Tornado's and explains the principles behind it.

External libraries

The time module is used to compute when timer callbacks are due. bisect.insort keeps the timer queue sorted by deadline. functools.partial binds some of a function's arguments in advance. backports_abc provides the Generator ABC on Python 2.7 (Python 3.5+ has it as collections.abc.Generator). It is used to check whether calling a function returned a generator.

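import time
import bisect
import functools

try:
    from collections.abc import Generator as GeneratorType  # Python 3.5+
except ImportError:
    from backports_abc import Generator as GeneratorType  # Python 2.7 backport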
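Here is a quick illustration of how the two less familiar helpers fit together. The sketch keeps a timer queue sorted with bisect.insort and pre-binds an argument with functools.partial.

The (deadline, sequence, callback) tuple layout and the fire function are illustrative assumptions. The sequence number breaks ties, so two timers with the same deadline never fall back to comparing callables, which Python 3 refuses to do.

```python
import bisect
import functools
import itertools

fired = []


def fire(name):
    # Record which timer ran, so the firing order can be checked
    fired.append(name)


timers = []
seq = itertools.count()  # tie-breaker for equal deadlines

# Insert out of order; insort keeps the list sorted by (deadline, seq)
for deadline, name in [(3.0, "c"), (1.0, "a"), (2.0, "b"), (1.0, "a2")]:
    bisect.insort(timers, (deadline, next(seq), functools.partial(fire, name)))

# The earliest deadline is always at index 0
while timers:
    deadline, _, callback = timers.pop(0)
    callback()  # the partial already holds the name argument

print(fired)  # ['a', 'a2', 'b', 'c']
```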

Future

A Future is a messenger that passes between a coroutine and the scheduler.

It provides callback registration (the registered callbacks are called when the asynchronous event completes), storage of the result, and retrieval of the final result.

add_done_callback registers a callback that is called when the Future is resolved. set_result stores the final result, marks the Future as done, and calls the registered callbacks.

Each yield in a coroutine yields a sub-coroutine, and each sub-coroutine corresponds to a Future object. For example:

@coroutine
def routine_main():
    yield routine_simple()

    yield sleep(1)

Here, routine_simple() and sleep(1) each correspond to a coroutine, and there is a corresponding Future.

class Future(object):
    def __init__(self):
        self._done = False
        self._callbacks = []
        self._result = None

    def _set_done(self):
        self._done = True
        for cb in self._callbacks:
            cb(self)
        self._callbacks = None

    def done(self):
        return self._done

    def add_done_callback(self, fn):
        if self._done:
            fn(self)
        else:
            self._callbacks.append(fn)

    def set_result(self, result):
        self._result = result
        self._set_done()

    def result(self):
        return self._result
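A short usage sketch of this Future follows; the class is repeated unchanged so the snippet runs on its own. A callback registered before set_result runs when the result arrives. A callback registered afterwards runs immediately, because the Future is already done.

```python
class Future(object):
    def __init__(self):
        self._done = False
        self._callbacks = []
        self._result = None

    def _set_done(self):
        self._done = True
        for cb in self._callbacks:
            cb(self)
        self._callbacks = None

    def done(self):
        return self._done

    def add_done_callback(self, fn):
        if self._done:
            fn(self)
        else:
            self._callbacks.append(fn)

    def set_result(self, result):
        self._result = result
        self._set_done()

    def result(self):
        return self._result


log = []
f = Future()
f.add_done_callback(lambda fut: log.append(("early", fut.result())))
print(f.done())       # False: nothing has run yet
f.set_result(42)      # runs the "early" callback now
f.add_done_callback(lambda fut: log.append(("late", fut.result())))  # runs immediately
print(f.done(), log)  # True [('early', 42), ('late', 42)]
```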

IOLoop

The IOLoop here strips out the I/O-related parts of the Tornado source and keeps only the basic functionality needed. It would be more accurately named CoroutineLoop.

This IOLoop provides basic callback functionality. It is a single-threaded loop that does two things on each iteration:

  1. Runs the registered callbacks
  2. Runs any timer callbacks whose deadline has passed

Every callback registered in the program is ultimately executed here. You can think of this loop as the place where both the coroutine code and the machinery that drives it run.

The coroutine is wrapped by a decorator and ends up running inside IOLoop callbacks. So all of its code, from priming (advancing the generator to its first yield) to termination, executes within those callbacks. After the coroutine is primed, Runner.run() is registered as an IOLoop callback to drive the coroutine forward.

Understanding this is critical to understanding how coroutines work.

This is the basic principle of single-threaded asynchrony. Because everything runs in a single-threaded loop, we avoid the locking and synchronization problems that multithreaded code has to deal with.
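To make the single-threaded model concrete, here is a stripped-down loop that only supports add_callback. MiniLoop and run_until_idle are hypothetical names, not Tornado API.

Every callback runs one after another on the same thread, including callbacks scheduled by other callbacks. A callback added during a pass waits for the next pass.

```python
import threading


class MiniLoop(object):
    """Hypothetical minimal loop: callbacks only, no timers or I/O."""

    def __init__(self):
        self._callbacks = []

    def add_callback(self, callback):
        self._callbacks.append(callback)

    def run_until_idle(self):
        # Each pass runs the callbacks queued so far; callbacks added
        # during the pass are picked up on the next pass
        while self._callbacks:
            callbacks, self._callbacks = self._callbacks, []
            for callback in callbacks:
                callback()


loop = MiniLoop()
threads = []
order = []


def step(n):
    threads.append(threading.current_thread().name)
    order.append(n)
    if n < 3:
        # Schedule the next step on the loop instead of calling it directly
        loop.add_callback(lambda: step(n + 1))


loop.add_callback(lambda: step(1))
loop.run_until_idle()
print(order)              # [1, 2, 3]
print(len(set(threads)))  # 1: every step ran on the same thread
```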

IOLoop.start

The event loop: each iteration runs the pending callbacks, then any expired timer callbacks.

IOLoop.run_sync

Runs a coroutine to completion and returns its result.

It registers run as a loop callback and starts the loop. run calls func() to start the coroutine and registers stop as the coroutine's completion callback. When the coroutine ends, stop exits the start loop, the event loop ends, and run_sync returns the coroutine's result.

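class IOLoop(object):
    def __init__(self):
        self._callbacks = []
        self._timers = []
        # Tie-breaker so timers with equal deadlines never compare callbacks
        self._timer_seq = 0
        self._running = False

    @classmethod
    def instance(cls):
        # A single global loop shared by all coroutines
        if not hasattr(cls, "_instance"):
            cls._instance = cls()
        return cls._instance

    def add_future(self, future, callback):
        # When the future resolves, schedule callback(future) on the loop
        future.add_done_callback(
            lambda future: self.add_callback(functools.partial(callback, future)))

    def add_timeout(self, when, callback):
        # Keep the timer queue sorted by deadline
        self._timer_seq += 1
        bisect.insort(self._timers, (when, self._timer_seq, callback))

    def call_later(self, delay, callback):
        return self.add_timeout(time.time() + delay, callback)

    def add_callback(self, call_back):
        self._callbacks.append(call_back)

    def start(self):
        self._running = True
        while self._running:
            # Run the callbacks registered so far; callbacks added while
            # these run are picked up on the next iteration
            callbacks = self._callbacks
            self._callbacks = []
            for call_back in callbacks:
                call_back()
            # Run the timer tasks whose deadline has passed
            while self._timers and self._timers[0][0] <= time.time():
                task = self._timers.pop(0)[2]
                task()
            # Nothing to do until the next timer: sleep instead of busy-waiting
            if self._running and not self._callbacks and self._timers:
                time.sleep(max(0, self._timers[0][0] - time.time()))

    def stop(self):
        self._running = False

    def run_sync(self, func):
        future_cell = [None]

        def run():
            # Start the coroutine; it returns the Future that represents it.
            # Exceptions are not swallowed: they propagate out of start()
            future_cell[0] = func()
            # Stop the loop once the coroutine has finished
            self.add_future(future_cell[0], lambda future: self.stop())

        self.add_callback(run)
        self.start()
        return future_cell[0].result()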
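As a usage sketch of run_sync, the Future and IOLoop are condensed below so the snippet runs standalone. The job function is hypothetical: it is a plain function that returns a Future, with no @coroutine involved.

Three timers are registered out of order, and the last one resolves the Future, which makes run_sync stop the loop and return the result.

```python
import bisect
import functools
import time


class Future(object):
    """Condensed copy of the Future above."""

    def __init__(self):
        self._done, self._callbacks, self._result = False, [], None

    def done(self):
        return self._done

    def result(self):
        return self._result

    def add_done_callback(self, fn):
        if self._done:
            fn(self)
        else:
            self._callbacks.append(fn)

    def set_result(self, result):
        self._result, self._done = result, True
        callbacks, self._callbacks = self._callbacks, None
        for cb in callbacks:
            cb(self)


class IOLoop(object):
    """Condensed copy of the IOLoop above: callbacks and timers only."""

    def __init__(self):
        self._callbacks, self._timers = [], []
        self._running, self._seq = False, 0

    @classmethod
    def instance(cls):
        if not hasattr(cls, "_instance"):
            cls._instance = cls()
        return cls._instance

    def add_callback(self, callback):
        self._callbacks.append(callback)

    def add_future(self, future, callback):
        future.add_done_callback(
            lambda f: self.add_callback(functools.partial(callback, f)))

    def call_later(self, delay, callback):
        self._seq += 1  # tie-breaker for equal deadlines
        bisect.insort(self._timers, (time.time() + delay, self._seq, callback))

    def stop(self):
        self._running = False

    def start(self):
        self._running = True
        while self._running:
            callbacks, self._callbacks = self._callbacks, []
            for callback in callbacks:
                callback()
            while self._timers and self._timers[0][0] <= time.time():
                self._timers.pop(0)[2]()
            if self._running and not self._callbacks and self._timers:
                time.sleep(max(0, self._timers[0][0] - time.time()))

    def run_sync(self, func):
        cell = []

        def run():
            cell.append(func())                              # start the task
            self.add_future(cell[0], lambda f: self.stop())  # stop when done

        self.add_callback(run)
        self.start()
        return cell[0].result()


def job():
    # Hypothetical task: a plain function that returns a Future
    loop = IOLoop.instance()
    future = Future()
    order = []
    # Registered out of order; the sorted timer queue fires them by deadline
    loop.call_later(0.10, lambda: order.append("second"))
    loop.call_later(0.05, lambda: order.append("first"))
    loop.call_later(0.15, lambda: future.set_result(order))
    return future


result = IOLoop.instance().run_sync(job)
print(result)  # ['first', 'second']
```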

coroutine

The coroutine decorator. Functions decorated with @coroutine fall into two categories:

  1. A generator function with yield
  2. A normal function with no yield statement

The decorator wraps the coroutine and drives it by registering callbacks. A coroutine is invoked in the program as yield coroutine_func(). At that point the wrapper function:

  1. Calls the decorated function, getting either a generator or a plain return value
  2. If it is a generator, it:

    1. Calls next() to prime the coroutine (run it to its first yield)
    2. Instantiates Runner() to drive the rest of the coroutine
  3. If it is a normal function, it:

    1. Calls set_result() with the return value, which ends the coroutine
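The classification and priming steps above can be tried directly. This sketch uses hypothetical functions plain_function and generator_function.

Calling a generator function only creates a generator object. Only next() runs its body up to the first yield, and that yielded value is what Runner starts from.

```python
try:
    from collections.abc import Generator  # Python 3.5+
except ImportError:
    from backports_abc import Generator  # Python 2.7 backport, as in the article


def plain_function():
    # No yield: calling it runs the body and returns a value
    return "done"


def generator_function():
    # Contains yield: calling it only creates a generator object
    print("primed")
    received = yield "first yielded value"
    print("received", received)


result = plain_function()
print(isinstance(result, Generator))  # False: the wrapper would call set_result(result)

gen = generator_function()            # nothing is printed yet
print(isinstance(gen, Generator))     # True: the wrapper would prime it
yielded = next(gen)                   # prints "primed" and stops at the first yield
print(yielded)                        # first yielded value
```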

The wrapper returns the coroutine's Future to the outer coroutine. The outer coroutine interacts with the inner one only through this Future. Each yield corresponds to a coroutine, and each coroutine has its own Future object.

The outer coroutine receives the inner coroutine's Future. If the inner coroutine has not finished yet, the outer coroutine's Runner registers its run() method as a completion callback on that Future. When the inner coroutine ends, the registered run() is called, driving the outer coroutine forward.

In this way, the coroutines form a chain of callbacks linked through their Futures.

The Runner class is described in a separate section below.

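def coroutine(func):
    return _make_coroutine_wrapper(func)


def _value_from_stopiteration(e):
    # Return(value) and, on Python 3.3+, StopIteration both carry .value
    # (modeled on Tornado's helper of the same name)
    try:
        return e.value
    except AttributeError:
        pass
    try:
        return e.args[0]
    except IndexError:
        return None


# Each coroutine has a Future that represents its current running state.
# Unexpected exceptions propagate to the caller instead of leaving the
# Future unresolved forever.
def _make_coroutine_wrapper(func):
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        future = Future()
        try:
            result = func(*args, **kwargs)
        except (Return, StopIteration) as e:
            # A plain function ended with raise Return(value)
            result = _value_from_stopiteration(e)
        else:
            if isinstance(result, GeneratorType):
                try:
                    # Prime the coroutine: run it to its first yield
                    yielded = next(result)
                except (StopIteration, Return) as e:
                    # It finished without ever suspending
                    future.set_result(_value_from_stopiteration(e))
                else:
                    # Runner drives the rest of the coroutine
                    Runner(result, future, yielded)
                try:
                    return future
                finally:
                    # Mirrors Tornado, which clears the local to avoid a reference cycle
                    future = None
        # Plain function: the coroutine has already finished
        future.set_result(result)
        return future
    return wrapper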

Returning a value from a coroutine

Because yield from is not available, a coroutine cannot simply return its value to the caller. Instead, it raises an exception that carries the value.

Python 2 does not allow return with a value inside a generator. However, an exception raised inside the generator propagates out of the caller's send() call, where it can be caught. So the return value is stored in the exception's value attribute, and the exception is raised. The caller uses code such as:

try:
    yielded = gen.send(value)
except Return as e:
    result = e.value  # the value passed to raise Return(...)

This is how the coroutine's return value is obtained.

class Return(Exception):
    def __init__(self, value=None):
        super(Return, self).__init__()
        self.value = value
        self.args = (value,)
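Here is a small sketch of the catching side, using a hypothetical drive helper that simply echoes each yielded value back into the generator. It shows that a value raised with Return arrives at the caller in e.value.

It also shows the Python 3 equivalent: return inside a generator raises StopIteration carrying the same .value. The py3_style generator is valid syntax only on Python 3.3+.

```python
class Return(Exception):
    """Same as the article's Return: carries a coroutine's return value."""

    def __init__(self, value=None):
        super(Return, self).__init__()
        self.value = value
        self.args = (value,)


def drive(gen):
    """Hypothetical helper: send each yielded value straight back in and
    return whatever the generator finally 'returns'."""
    value = None
    while True:
        try:
            value = gen.send(value)
        except Return as e:          # Python 2 style: raise Return(value)
            return e.value
        except StopIteration as e:   # Python 3 style: return value
            return getattr(e, "value", None)


def py2_style():
    x = yield 1
    raise Return(x + 1)


def py3_style():
    x = yield 10
    return x * 2  # only valid syntax on Python 3.3+


print(drive(py2_style()))  # 2
print(drive(py3_style()))  # 20
```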

Runner

Runner is the driver class of the coroutine.

result_future holds the state of the current coroutine. self.future holds the state of the sub-coroutine that the current coroutine yielded. The Runner takes the sub-coroutine's result from self.future and sends it into the current coroutine, driving it forward.

Note that the Runner checks the future returned by the sub-coroutine:

- If set_result() has already been called on it, the sub-coroutine has finished. The Runner goes around the while True loop again and performs the next send().
- If not, the sub-coroutine is still running. The Runner registers self.run as the sub-coroutine's completion callback and returns.

If a StopIteration or Return exception is caught during this coroutine's send(), the coroutine has finished. The Runner sets the return value on result_future, which runs its registered callbacks. Here that callback is the run() registered by this coroutine's parent. Setting the result therefore wakes up the parent coroutine that is suspended at its yield: the IOLoop calls the parent's run(), which then calls send().

class Runner(object):
    def __init__(self, gen, result_future, first_yielded):
        self.gen = gen                      # the coroutine's generator
        self.result_future = result_future  # Future of this coroutine
        self.io_loop = IOLoop.instance()
        self.running = False
        self.future = None                  # Future of the yielded sub-coroutine
        if self.handle_yield(first_yielded):
            self.run()

    def run(self):
        try:
            self.running = True
            while True:
                try:
                    # Each yield is treated as a sub-coroutine (ret = yield coroutine_func()):
                    # send its result back into this coroutine
                    value = self.future.result()
                    yielded = self.gen.send(value)
                except (StopIteration, Return) as e:
                    # The coroutine is complete; no further callback is registered
                    self.result_future.set_result(_value_from_stopiteration(e))
                    self.result_future = None
                    return
                # Any other exception propagates: this simplified Future
                # cannot carry an exception to the parent coroutine
                if not self.handle_yield(yielded):
                    return
        finally:
            self.running = False

    def handle_yield(self, yielded):
        self.future = yielded
        if not self.future.done():
            # Add a completion callback to the future: when someone calls
            # future.set_result(), IOLoop.add_future schedules self.run() on
            # the event loop, which resumes this coroutine
            self.io_loop.add_future(self.future, lambda f: self.run())
            return False
        # Already done: run() keeps looping and sends the result immediately
        return True
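The Runner's control flow can be exercised without an event loop. There is a fast path for futures that are already done and a suspension path for pending ones.

The sketch below is a simplified stand-in, not the article's Runner. SyncRunner, start and resolved are hypothetical names. A pending future's callback calls run() directly instead of going through IOLoop.add_future, so everything happens synchronously.

```python
class Future(object):
    """Condensed copy of the article's Future."""

    def __init__(self):
        self._done, self._callbacks, self._result = False, [], None

    def done(self):
        return self._done

    def result(self):
        return self._result

    def add_done_callback(self, fn):
        if self._done:
            fn(self)
        else:
            self._callbacks.append(fn)

    def set_result(self, result):
        self._result, self._done = result, True
        callbacks, self._callbacks = self._callbacks, None
        for cb in callbacks:
            cb(self)


class Return(Exception):
    def __init__(self, value=None):
        super(Return, self).__init__(value)
        self.value = value


class SyncRunner(object):
    """Hypothetical simplification of Runner without an IOLoop."""

    def __init__(self, gen, result_future, first_yielded):
        self.gen, self.result_future = gen, result_future
        if self.handle_yield(first_yielded):
            self.run()

    def run(self):
        while True:
            try:
                yielded = self.gen.send(self.future.result())
            except (StopIteration, Return) as e:
                self.result_future.set_result(getattr(e, "value", None))
                return
            if not self.handle_yield(yielded):
                return  # suspended until the pending future resolves

    def handle_yield(self, yielded):
        self.future = yielded
        if not yielded.done():
            yielded.add_done_callback(lambda f: self.run())
            return False
        return True  # already done: keep looping in run()


def start(gen):
    """Prime a generator and hand it to SyncRunner (hypothetical helper)."""
    result_future = Future()
    SyncRunner(gen, result_future, next(gen))
    return result_future


def resolved(value):
    f = Future()
    f.set_result(value)
    return f


pending = Future()
trace = []


def task():
    a = yield resolved(1)  # done future: fast path, no suspension
    trace.append(("got", a))
    b = yield pending      # not done: SyncRunner registers run() and returns
    trace.append(("got", b))
    raise Return(a + b)


outer = start(task())
print(trace, outer.done())    # [('got', 1)] False
pending.set_result(41)        # the callback calls run(), which resumes task()
print(trace, outer.result())  # [('got', 1), ('got', 41)] 42
```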

sleep

sleep is a delay coroutine that shows the standard way to implement a coroutine:

  • It creates a Future and returns it to the outer coroutine.
  • The outer coroutine's Runner sees that the Future is not finished and registers run() as the Future's completion callback. The outer coroutine stays suspended.
  • When the delay expires, the IOLoop timer callback calls set_result() to finish the Future.
  • The IOLoop then calls run().
  • run() calls send() to wake up the suspended outer coroutine.

The implementation is as follows:

def sleep(duration):
    f = Future()
    IOLoop.instance().call_later(duration, lambda: f.set_result(None))
    return f
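The sketch below shows why a Future-based sleep does not block other work: two sleeps started together overlap. The Future is condensed from the article.

TimerLoop and run_until_done are hypothetical stand-ins for the IOLoop's timer handling. The sleep function keeps the article's shape.

```python
import bisect
import time


class Future(object):
    """Condensed copy of the article's Future."""

    def __init__(self):
        self._done, self._callbacks, self._result = False, [], None

    def done(self):
        return self._done

    def add_done_callback(self, fn):
        if self._done:
            fn(self)
        else:
            self._callbacks.append(fn)

    def set_result(self, result):
        self._result, self._done = result, True
        callbacks, self._callbacks = self._callbacks, None
        for cb in callbacks:
            cb(self)


class TimerLoop(object):
    """Hypothetical timer-only stand-in for IOLoop."""

    def __init__(self):
        self._timers, self._seq = [], 0

    def call_later(self, delay, callback):
        self._seq += 1  # tie-breaker for equal deadlines
        bisect.insort(self._timers, (time.time() + delay, self._seq, callback))

    def run_until_done(self, future):
        # Fire timers in deadline order until the given future resolves
        while not future.done():
            when, _, callback = self._timers.pop(0)
            time.sleep(max(0, when - time.time()))
            callback()


loop = TimerLoop()


def sleep(duration):
    # Same shape as the article's sleep(): a Future resolved by a timer
    f = Future()
    loop.call_later(duration, lambda: f.set_result(None))
    return f


done_order = []
start = time.time()
slow = sleep(0.3)
fast = sleep(0.1)
slow.add_done_callback(lambda f: done_order.append("slow"))
fast.add_done_callback(lambda f: done_order.append("fast"))
loop.run_until_done(slow)
elapsed = time.time() - start
print(done_order)  # ['fast', 'slow']
# elapsed is roughly 0.3 s rather than 0.4 s: the two delays overlap
```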

run

@coroutine
def routine_ur(url, wait):
    yield sleep(wait)
    print('routine_ur {} took {}s to get!'.format(url, wait))


@coroutine
def routine_url_with_return(url, wait):
    yield sleep(wait)
    print('routine_url_with_return {} took {}s to get!'.format(url, wait))
    raise Return((url, wait))


# Does not create a separate Runner():
# the coroutine directly returns an already finished Future
@coroutine
def routine_simple():
    print("it is simple routine")


@coroutine
def routine_simple_return():
    print("it is simple routine with return")
    raise Return("value from routine_simple_return")


@coroutine
def routine_main():
    yield routine_simple()
    yield routine_ur("url0", 1)
    ret = yield routine_simple_return()
    print(ret)
    ret = yield routine_url_with_return("url1", 1)
    print(ret)
    ret = yield routine_url_with_return("url2", 2)
    print(ret)


if __name__ == '__main__':
    IOLoop.instance().run_sync(routine_main)

The output is:

it is simple routine
routine_ur url0 took 1s to get!
it is simple routine with return
value from routine_simple_return
routine_url_with_return url1 took 1s to get!
('url1', 1)
routine_url_with_return url2 took 2s to get!
('url2', 2)

The output shows that sleep takes effect: each sleeping coroutine resumes only after its delay has elapsed.

The source code

simple_coroutine.py

Copyright

Creative Commons Attribution-NonCommercial 4.0 International License