Flask is a synchronous Python web framework built on the WSGI protocol. It cannot process requests with an asynchronous event loop the way Sanic and FastAPI do, nor can it be sped up by ASGI servers like Uvicorn, so time-consuming tasks usually have to be published to Celery for execution.

In a Flask project there are time-consuming tasks and code that has to block and wait, and most of this work ends up collected into task modules to be handled uniformly. So let's see what happens to an existing program when we use asynchronous code to process such tasks inside a synchronous server.

Here is an idea for implementing asynchronous tasks in Flask projects with the asyncio module. The core is the asyncio future object: it lets us publish work to be executed asynchronously, and the function then runs in an asynchronous executor without blocking the current code path.

The conclusions up front:

  1. Python can run this code in a non-blocking way by handing it to a thread pool or process pool executor, or by submitting it to an executor directly
  2. Because the event loop does not manage the published work (as I understand it), it does not monitor its execution status and cannot trigger callbacks
  3. The workaround is to share certain objects between threads, such as the future object, so that specified code can still run after a task completes

These are notes on my thinking along the way; the complete code is given at the end.

The first step is to create a Flask app object and attach an event loop to it for publishing asynchronous tasks.

#! /usr/bin/env python
# A test for asyncio usage in flask app
import asyncio

from flask import Flask, current_app, jsonify, request
from loguru import logger

from works.dummy_work import delay, fake_work

app = Flask(__name__)

if __name__ == "__main__":
    app.event_loop = asyncio.get_event_loop()
    try:
        app.run(debug=True)
    except Exception as e:
        logger.error(f'Error happened: {e}')
    finally:
        app.event_loop.stop()
        app.event_loop.run_until_complete(app.event_loop.shutdown_asyncgens())
        app.event_loop.close()

Simulate a time-consuming task

def fake_work(*args, **kwargs):
    """ Simulate a long time cost work """
    logger.info('In fake_work function')
    logger.info(f'Args: {args}')
    time.sleep(10)
    logger.info(f'Finished fake work')

A way to publish asynchronous tasks

def delay(func: typing.Callable, app: Flask = None, *args, **kwargs):
    """ Publish a future async work """
    assert callable(func), "``func`` param has to be a callable function"
    future = current_app.event_loop.run_in_executor(
        None, func, *args
        )
    future.add_done_callback(callback)
    logger.info('Add callback function to future obj')

A callback function, used to test whether a callback can run after the task completes successfully

def callback(future: asyncio.Future, *args, **kwargs):
    """ async function callback """
    logger.info(f'Passed future object is {future}')

The task-publishing logic is then called in the view

@app.route('/test', methods=['GET'])
def index():
    delay(fake_work, current_app, num=request.args.get('num'))
    return jsonify({"msg": "ok"})

Start the service

> flask run

Test with HTTPie

> http :5000/test
HTTP/1.0 200 OK
Content-Length: 13
Content-Type: application/json
Date: Thu, 18 Jun 2020 07:38:05 GMT
Server: Werkzeug/1.0.1 Python/3.7.5

{
    "msg": "ok"
}

fake_work prints "Finished fake work" after sleeping for 10 seconds, but the callback never runs.

run_in_executor is used because the body of a coroutine only executes once the event loop takes it over, whereas synchronous code such as fake_work above is handed to an executor and runs to completion there. In effect it is still plain synchronous code executed directly; the event loop simply delegates it to the executor. Passing None as the first argument makes the loop use its default executor, a concurrent.futures.ThreadPoolExecutor object, which then handles the call.
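
To make the delegation concrete, here is a minimal standalone sketch (outside Flask, with illustrative names): run_in_executor returns a future immediately, while the blocking function keeps running in the default thread pool.

import asyncio
import time

def blocking_job():
    # Ordinary synchronous code; it runs inside the executor's worker thread
    time.sleep(2)
    print('blocking_job finished')

loop = asyncio.get_event_loop()
# None -> use the loop's default executor, a concurrent.futures.ThreadPoolExecutor
future = loop.run_in_executor(None, blocking_job)
print(f'run_in_executor returned at once, future: {future}')
# The loop is never run here, so just give the worker thread time to finish
time.sleep(3)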

Since the future is published through the event loop, you can also wait for it to complete with loop.run_until_complete(future). Done this way the callback does run, but it blocks while waiting for the task to finish, which is not what we intended.
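
For completeness, a sketch of that blocking variant (delay_blocking is my name, and it reuses the callback defined above); it only illustrates why we do not want this:

def delay_blocking(func: typing.Callable, *args, **kwargs):
    """ Publish the work, then block until it finishes so the callback runs """
    assert callable(func), "``func`` param has to be a callable function"
    future = current_app.event_loop.run_in_executor(None, func, *args)
    future.add_done_callback(callback)
    # Driving the loop lets it observe completion and run the callback,
    # but it also blocks the current request until ``func`` returns
    current_app.event_loop.run_until_complete(future)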

Looking at the event loop's default executor suggests we can drop asyncio entirely and use an executor directly to run the task.

Change the method for publishing asynchronous tasks to

def delay(func: typing.Callable, app: Flask = None, *args, **kwargs):
    """ Publish a future async work """
    assert callable(func), "``func`` param has to be a callable function"
    executor = ThreadPoolExecutor(1)
    # It can also be a process pool
    # executor = ProcessPoolExecutor()
    executor.submit(func, *args)

Experimental result: the thread exits and is recycled automatically after it finishes executing.
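
A quick sketch to check that claim outside Flask (the /t endpoint in the full code below does the same counting); here the pool is shut down explicitly, whereas in delay the local executor simply goes out of scope:

import threading
import time
from concurrent.futures import ThreadPoolExecutor

def slow():
    time.sleep(2)

print(f'threads before: {len(threading.enumerate())}')
executor = ThreadPoolExecutor(1)
executor.submit(slow)
print(f'threads during: {len(threading.enumerate())}')
executor.shutdown(wait=True)  # wait for the work, then release the worker thread
print(f'threads after: {len(threading.enumerate())}')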

A callback still cannot be implemented this way, because the event loop does not monitor the task's progress.

One remaining problem is running other logic at the end of a task: if it needs extra parameters, or data from the code that published the task, we have to share data between threads, either with global variables or with a queue. Let's do it with a queue.

def delay(func: typing.Callable, app: Flask = None, *args, **kwargs):
    """ Publish a future async work """
    assert callable(func), "``func`` param has to be a callable function"
    # Create a ThreadPoolExecutor, specifying the number of worker threads
    executor = ThreadPoolExecutor(1)
    q = Queue(1)
    # A global variable could also be used to share data between threads
    # global future
    future = current_app.event_loop.run_in_executor(
        executor, func, *[q]
        )
    # executor.submit(func, *[q])
    q.put(future)

Fetch this variable inside the published task

def fake_work(*args, **kwargs):
    """ Simulate a long time cost work """
    logger.info('in longcost work function')
    logger.info(f'args: {args}')
    time.sleep(10)
    q = args[-1]
    # If the future were a global variable, it could be used here directly
    # instead of being fetched from the queue
    future = q.get()
    callback(future)
    # del future

One thing to note: however it is published, the non-blocking work runs outside Flask's request lifecycle. Flask's global objects request, g, and session cannot be used there, nor can app.app_context() be used to create a context; all of them raise RuntimeError. If an exception occurs in the non-blocking code it has to be caught separately, because the error message may not appear in the log or console, although it can be seen while debugging.
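
Because of that, any data the task needs from the request should be extracted in the view (as the num argument already is), and exceptions are best logged explicitly. A small sketch of a wrapper for the latter; the name logged is mine:

def logged(func):
    """ Wrap a task so exceptions raised in the worker thread get logged """
    def wrapper(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except Exception:
            logger.exception(f'Unhandled error in background task {func.__name__}')
    return wrapper

# Usage in the view: delay(logged(fake_work), current_app, num=request.args.get('num'))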

Since threads are created to do the work, I started Flask with Gunicorn, simulated concurrent requests, and monitored the number of threads; no obvious pitfalls showed up. Testing will continue.
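
A sketch of the commands for this check (the Gunicorn worker count and bind address are my assumptions; /t is the thread-count endpoint defined in the full app.py below):

> gunicorn -w 2 -b 127.0.0.1:5000 app:app
> http :5000/test
> http :5000/t

The complete code for both files follows.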

# works/dummy_work.py
import asyncio
import time
import typing
from concurrent.futures import ThreadPoolExecutor
from queue import Queue

from flask import Flask, current_app, request, g
from loguru import logger


def callback(future: asyncio.Future, *args, **kwargs):
    """ async function callback """
    logger.info(f'passed future object is {future}')
    del future


def delay(func: typing.Callable, app: Flask, *args, **kwargs):
    """ Publish a future async work """
    assert callable(func), "``func`` param has to be a callable function"
    # global future
    executor = ThreadPoolExecutor(1)
    q = Queue(1)
    # future = current_app.event_loop.run_in_executor(None, func, *[1, 2, 3], **kwargs)
    future = current_app.event_loop.run_in_executor(
        executor, func, *[kwargs['num'], q]
        )
    # executor.submit(func, *[q])  # alternative: submit to the executor directly instead of run_in_executor
    q.put(future)


def fake_work(*args, **kwargs):
    """ Simulate a long time cost work """
    logger.info('in longcost work function')
    logger.info(f'args: {args}')
    time.sleep(10)
    try:
        with current_app.app_context():
            logger.info(f'Request obj after a request life circle: {request}')
            logger.info(f'G obj after a request life circle: {g}')
    except RuntimeError:
        pass
    logger.info('finished longcost work')
    logger.info(f'args: {args}')
    q = args[-1]
    future = q.get()
    callback(future)

# app.py
#! /usr/bin/env python
# A test for asyncio usage in flask app
import asyncio
import threading

from flask import Flask, current_app, jsonify, request
# from flask import g, request
import os
import sys
from loguru import logger
BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
print(f'base dir {BASE_DIR}')
sys.path.append(f'{BASE_DIR}/app_async')
from works.dummy_work import delay, fake_work

app = Flask(__name__)


@app.route('/test', methods=['GET'])
def index():
    """ Test api """
    delay(fake_work, current_app, num=request.args.get('num'))
    # delay(async_work, current_app, num=request.args.get('num'))
    return jsonify({"msg": "ok"})


@app.route('/t')
def thread_count():
    count = len(threading.enumerate())
    return jsonify({'count': count})


app.event_loop = asyncio.get_event_loop()
if __name__ == "__main__":
    # app.event_loop = asyncio.get_event_loop()
    try:
        app.run(debug=True)
    except Exception as e:
        logger.error(f'Error happened: {e}')
    finally:
        app.event_loop.stop()
        app.event_loop.run_until_complete(app.event_loop.shutdown_asyncgens())
        app.event_loop.close()
