1. Teasing fastAPI middleware design

In the process of building my own scaffolding, an example template I built myself looks like this structure.

But in the process of using middleware actually encountered a very loveless non-inductive experience, is our so-called middleware Fastapi custom if you actually apply it to our middleware, I’m sorry, but you will always to the effect of a “kaspersky”, has been waiting for, the process would have been unable to perform forever.

If you only use middleware to design our authentication, it will work very well. Of course, the premise is that your token must be placed in our request header for transmission. If you use other methods such as: GET or POST, I am sorry! Middleware – based authentication modes also encounter the dead-waiting effect described above.

Symptom 1: Route registration modes include APP and APIRouter

Specific practice examples are as follows:

import uvicorn from fastapi import FastAPI from pydantic import BaseModel from starlette.requests import Request from starlette.middleware.base import BaseHTTPMiddleware app = FastAPI() class LogerMiddleware1(BaseHTTPMiddleware): Async def dispatch(self, request: request, call_next): # Call perf_counter() once, randomly select A time point A from the computer system and calculate how many seconds it is from the current time point B1. # when the function is called the second time, the default is the number of seconds from the first call point A to the current point B2. # The difference between the two functions is realized from time point B1 to B2 timing function. # logger. The info (" 111111111111111111111111111 ") print (" I am ProcesMiddleware - 1 - middleware!!!!!!!!!" , request. Headers) print("ProcesMiddleware-- 1----! The request ID in: ", id(request)) response = await call_next(request) return response class AuthMiddleware2(BaseHTTPMiddleware): Async def dispatch(self, request: request, call_next): # Call perf_counter() once, randomly select A time point A from the computer system and calculate how many seconds it is from the current time point B1. # when the function is called the second time, the default is the number of seconds from the first call point A to the current point B2. # The difference between the two functions is realized from time point B1 to B2 timing function. # logger. The info (" 111111111111111111111111111 ") print (" I am ProcesMiddleware - 2 - middleware!!!!!!!!!!!" , request. Headers) print("ProcesMiddleware-- 2----! The request ID in: ", id(request)) response = await call_next(request) return response from fastapi import APIRouter bp = APIRouter(tags=[' admin '], prefix='/ API /v1') class AdminLogin(BaseModel): """ "Password: str = None username: str = None @bp.post("/one") async def sum_numbers(req: Request): Print ('one ', id(req)) return 'one' @bp.post("/two") async def sum_numbers(req: Request, sdsada: AdminLogin): Print ('two ', id(req)) return 'two' @app.post("/inapprsp") async def inapprsp(req: Request, sdsada: AdminLogin): print('two ', id(req)) return 'two' @app.post("/inapprsp") async def inapprsp(req: Request, sdsada: AdminLogin): Print ('inapprsp- inapprsp: ', Id (req)) return 'inapprsp' # Register middleware app.add_middleware(ProcesMiddleware1) app.add_middleware(ProcesMiddleware2) # Add routing group app.include_router(bp) if __name__ == '__main__': Uvicorn.run (app=app, host="127.0.0.1", port=5586, workers=1, reload=False, debug=False,)Copy the code

The corresponding interface is displayed as follows:

The above related interface request access is no problem! It all works!

But now the problem has arisen!

First of all, the above defines two middleware,

  • Suppose the first log middleware needs to read its own incoming req: Request Request information, such as json and body content (not the content of query parameters query_params)
  • The second authentication middleware also needs to read its own incoming Request message from REQ: Request

For example, the modified intermediate content is:

The main addition is to read the entire content of the code:

_body = await request.body()
Copy the code

Of course you can also read json()– but reading json() without any value is an exception!

You’ll be surprised when we ask for our interface again (any interface, even at startup)! All lines crashed! No one was spared!

Even if you want to start access, you’re stuck there!

When the request is cancelled:

    async for chunk in self.stream():
  File "C:\Users\mayn\.virtualenvs\fastapi_5g_msg\lib\site-packages\starlette\requests.py", line 214, in stream
    raise ClientDisconnect()
starlette.requests.ClientDisconnect
Copy the code

What the hell! So that makes a point!! This is a very serious problem with middleware fetching requests.

Symptom 2: Only APP is used for route registration (not APIRouter)

Modify the code to:

import uvicorn from fastapi import FastAPI from pydantic import BaseModel from starlette.requests import Request from starlette.middleware.base import BaseHTTPMiddleware app = FastAPI() class LogerMiddleware1(BaseHTTPMiddleware): Async def dispatch(self, request: request, call_next): # Call perf_counter() once, randomly select A time point A from the computer system and calculate how many seconds it is from the current time point B1. # when the function is called the second time, the default is the number of seconds from the first call point A to the current point B2. # The difference between the two functions is realized from time point B1 to B2 timing function. # logger. The info (" 111111111111111111111111111 ") print (" I am ProcesMiddleware - 1 - middleware!!!!!!!!!" , request.headers) _body = await request.body() # print(" I am ProcesMiddleware-- 1---- middleware! I read JSON content information: !!!!!!!!" , _body) print("ProcesMiddleware-- 1----! The request ID in: ", id(request)) response = await call_next(request) return response class AuthMiddleware2(BaseHTTPMiddleware): Async def dispatch(self, request: request, call_next): # Call perf_counter() once, randomly select A time point A from the computer system and calculate how many seconds it is from the current time point B1. # when the function is called the second time, the default is the number of seconds from the first call point A to the current point B2. # The difference between the two functions is realized from time point B1 to B2 timing function. # logger. The info (" 111111111111111111111111111 ") print (" I am ProcesMiddleware - 2 - middleware!!!!!!!!!!!" , request.headers) _body = await request.body() # print(" I am ProcesMiddleware-- 2---- middleware! I read JSON content information: !!!!!!!!" , _body) print("ProcesMiddleware-- 2----! The request ID in: ", id(request)) response = await call_next(request) return response from fastapi import APIRouter bp = APIRouter(tags=[' admin '], prefix='/ API /v1') class AdminLogin(BaseModel): """ "Password: str = None username: str = None @app.post("/one") async def sum_numbers(req: Request): Print ('one ', id(req)) return 'one' @app.post("/two") async def sum_numbers(req: Request, sdsada: AdminLogin): Print ('two ', id(req)) return 'two' @app.post("/inapprsp") async def inapprsp(req: Request, sdsada: AdminLogin): print('two ', id(req)) return 'two' @app.post("/inapprsp") async def inapprsp(req: Request, sdsada: AdminLogin): Print ('inapprsp- inapprsp: ', Id (REq)) return 'inapprsp' # Register middleware app.add_middleware(LogerMiddleware1) app.add_middleware(AuthMiddleware2) # Add routing group app.include_router(bp) if __name__ == '__main__': Uvicorn.run (app=app, host="127.0.0.1", port=5586, workers=1, reload=False, debug=False,)Copy the code

The main differences are:

  • 1: I have modified the route registration and only use APP

  • 2: The middleware has one and only one enablement for retrieving request information:

At this time to request our interface: sometimes can request success, sometimes and the above situation! Weird!!

Try to solve the problem, modify the definition in the middle:

Complete sample code:

#! The/usr/bin/evn python # - * - coding: utf-8 - * - "" "-- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- the file name: ceshi file function description: Founder of functional description: let students creation time: 2021/6/9 -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- - 2021/6/9 modify description: ------------------------------------------------- """ import uvicorn from fastapi import FastAPI from pydantic import BaseModel from starlette.requests import Request from starlette.middleware.base import BaseHTTPMiddleware from starlette.requests import Request from starlette.types import ASGIApp, Message, Receive, Scope, Send # Init server context app = FastAPI() class CommomMiddleware: def __init__( self, app: ASGIApp, ): self._app = app async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None: if scope["type"] ! = "http": Await self._app(scope, receive, send) return # Create Request Request = Request(scope, receive=receive) print Start!!!!! My request-id is: ", id(request)) # After Request # span = await self._create_span(request=request) scope.setdefault("trace_ctx", '33333333333333333333333333333333') # if consumption again here, # sada =await request.body() status_code = 500 async def wrapped_send(message: message) -> None: if message['type'] == 'http.response.start': nonlocal status_code status_code = message['status'] await send(message) try: Await self._app(scope, receive, wrapped_send) except Exception as err: pass print(" new middleware!! Over!!" Class ProcesMiddleware1(BaseHTTPMiddleware): "" async def dispatch(self, request: Request, call_next): Call perf_counter() once, randomly select A time point A from the computer system, and calculate how many seconds it is from the current time point B1. # when the function is called the second time, the default is the number of seconds from the first call point A to the current point B2. # The difference between the two functions is realized from time point B1 to B2 timing function. # logger. The info (" 111111111111111111111111111 ") print (" I am ProcesMiddleware - 1 - middleware!!!!!!!!!" , request. Headers) print("ProcesMiddleware-- 1----! ID (request)) print("ProcesMiddleware--1----- middleware! The request: scope", request.scope.get('trace_ctx')) response = await call_next(request) return response class ProcesMiddleware2(BaseHTTPMiddleware): "" async def dispatch(self, request: request, call_next): Call perf_counter() once, randomly select A time point A from the computer system, and calculate how many seconds it is from the current time point B1. # when the function is called the second time, the default is the number of seconds from the first call point A to the current point B2. # The difference between the two functions is realized from time point B1 to B2 timing function. # logger. The info (" 111111111111111111111111111 ") print (" I am ProcesMiddleware - 2 - middleware!!!!!!!!!!!" , request. Headers) print("ProcesMiddleware-- 2----! ID (request)) print("ProcesMiddleware--2----- middleware! The request: scope", request.scope.get('trace_ctx')) response = await call_next(request) return response from fastapi import APIRouter bp = APIRouter (tags = [' rights management], prefix = '/ API/v1) bp. # route_class = ValidationErrorLoggingRoute class AdminLogin (BaseModel) : Password: STR = None username: STR = None @bp.post("/one") async def sum_numbers(req: Request): Print ('one ', id(req)) return 'one' @bp.post("/two") async def sum_numbers(req: Request, sdsada: AdminLogin): Print ('two ', id(req)) return 'two' @app.post("/inapprsp") async def inapprsp(req: Request, sdsada: AdminLogin): print('two ', id(req)) return 'two' @app.post("/inapprsp") async def inapprsp(req: Request, sdsada: AdminLogin): Print ('inapprsp- inapprsp: ', Id (req)) return 'inapprsp' app.add_middleware(ProcesMiddleware2) app.add_middleware(ProcesMiddleware1) # app.add_middleware(CommomMiddleware) app.include_router(bp) if __name__ == '__main__': Uvicorn.run (app=app, host="127.0.0.1", port=5586, workers=1, reload=False, debug=False,)Copy the code

The above code can only solve the problem of customizing a new intermediate solution to our request.scope:

 scope.setdefault("trace_ctx", '33333333333333333333333333333333')
Copy the code

Then all of our other middleware in the inner layer can be accessed from

Get the value, this can provide an idea for the follow-up!

But if we continue to try to consume our body, we will also block !!!! Waiting for death!

Problem analysis, analysis object memory address:

In the end, we comment on the Request of the code, look at their the req: Request memory address, can see a problem: such as visit: http://127.0.0.1:5586/inapprsp

Their middleware is not the same object, but the content of their request headers can be passed to each middleware.

So I read the related ISSess, the author himself admitted that there is a problem with this, github.com/tiangolo/fa…

As the author replies:

According to the author’s reply:

  • Once our req: Request has been consumed once, it’s gone if it’s not in the scope of the function!
  • Scenarios to be consumed include when reading the body, such as defining our basemodel

2: Use a custom route according to the author’s suggestions

All right, but The Fastapi design is still good. Hopefully, in the future, it won’t become ineffective because of a single purchase. Middleware like SANIC framework it can be passed on very well!

The scheme given by the author is as follows:

import time
from typing import Callable

from fastapi import APIRouter, FastAPI, Request, Response
from fastapi.routing import APIRoute


class TimedRoute(APIRoute):
    def get_route_handler(self) -> Callable:
        original_route_handler = super().get_route_handler()

        async def custom_route_handler(request: Request) -> Response:
            before = time.time()
            
            # 这里可以获取的我们的请求的体的信息----
            
            response: Response = await original_route_handler(request)
            
            # 下面可以处理我们的响应体的报文信息
            
            duration = time.time() - before
            response.headers["X-Response-Time"] = str(duration)
            print(f"route duration: {duration}")
            print(f"route response: {response}")
            print(f"route response headers: {response.headers}")
            return response

        return custom_route_handler


app = FastAPI()
router = APIRouter(route_class=TimedRoute)


@app.get("/")
async def not_timed():
    return {"message": "Not timed"}


@router.get("/timed")
async def timed():
    return {"message": "It's the time of my life"}


app.include_router(router)
Copy the code

Then we need to modify our custom_route_handler to solve our logging problem, but note that we need to replace it:

router = APIRouter(route_class=TimedRoute)
Copy the code

or

router = APIRouter()
router.route_class=TimedRoute
Copy the code

In my own scaffolding framework is a batch import registration:

Finally, MY own code for a custom APIRouter based logging behavior is as follows:

from time import perf_counter from loguru import logger from starlette.middleware.base import BaseHTTPMiddleware from starlette.requests import Request from starlette.responses import StreamingResponse import uuid import json from apps.utils import json_helper from apps.utils import aiowrap_helper from fastapi import APIRouter, FastAPI, Request, Response, Body from fastapi.routing import APIRoute from typing import Callable, List from fastapi.responses import Response class ContextIncludedRoute(APIRoute): @staticmethod def sync_trace_add_log_record(request: Request, event_des='', msg_dict={}, remarks=''): ":param event_des: logs Event description :param msg_dict: logs dictionary: Param Remarks: logs Information :return: ''' # request.request_links_index = request.request_links_index + 1 if hasattr(request, 'traceid'): Log = {# define a new parameter to copy to our request context object 'traceid': getattr(request, 'traceid'), # define link so serial number 'trace_index': Getattr (request, 'trace_links_index'), # event_des: event_des, # msg_dict: If not remarks: log.pop('remarks') if not msg_dict: Log.pop ('msg_dict') try: log_msg = json_helper.dict_to_json_ensure_ASCII (log) # return text logger.info(log_msg) except: Logger. info(getattr(request, 'traceid') + ': ' + str(getattr(request, 'trace_links_index')) + ': log message write exception ') # encapsulate logging about logging serial number log @staticMethod async def for all link log requests async_trace_add_log_record(request: Request, event_des='', msg_dict={}, remarks=''): ''' :param event_des: Description of the log recording event :param msg_dict: log recording information Dictionary: Param Remarks: log Remarks :return: Request. request_links_index = request. links_index + 1 If hasattr(request, 'traceid'): log = {# define a new parameter to copy to the object in our request context 'traceid': Getattr (request, 'traceid') : getattr(request, 'trace_links_index') : Event_des, # log content details 'msg_dict': msg_dict, # log information 'remarks': Remarks} # Delete unnecessary empty log content information, if not remarks: log.pop('remarks') if not msg_dict: log.pop('msg_dict') try: Log_msg = json_helper.dict_to_json_ensure_ASCII Logger. info(getattr(request, 'traceid') + ': '+ STR (getattr(request, 'trace_links_index')) + ': log writing exception ') async def _init_trace_start_log_record(self, request: Request): "" Request record initialization :return: ''' path_info = request.url.path if path_info not in ['/favicon.ico'] and 'websocket' not in path_info: if request.method ! = 'OPTIONS': Request. Trace_links_index = 0 request. Traceid = STR (uuid.uuid4()).replace('-', Request.start_time = perf_counter() Request.url. path # print('scope', request.scope) # try: body_form = await request.form() except: body_form = None body =None try: body_bytes = await request.body() if body_bytes: try: body = await request.json() except: pass if body_bytes: try: body =body_bytes.decode('utf-8') except: body = body_bytes.decode('gb2312') except: pass log_msg = { # 'headers': str(request.headers), # 'user_agent': STR (request.user_agent), # request. Headers if STR (request.headers) else ", # request. Url, # record request request method' method': method, # record request source IP 'IP ': IP, # 'path': request.path, # record request request parameters 'params': {'query_params': '' if not request.query_params else request.query_params, 'from': body_form, 'body': Body}, # record the request start time # 'start_time': F '{(start_time)}',} # await self.async_trace_add_log_record(request, event_des=' start ', event_des=' start ', msg_dict=log_msg) async def _init_trace_end_log_record(self, request: Request, response: Response): # https://stackoverflow.com/questions/64115628/get-starlette-request-body-in-the-middleware-context if hasattr(request, 'traceid'): start_time = getattr(request, 'start_time') end_time = f'{(perf_counter() -start_time):.2f}' resp_body = None if Isinstance (response, response): resp_body = STR (response.body) log_msg = {# record request time' cost_time': End_time, # record the final message of the request response 'resp_body': Resp_body} await self.async_trace_add_log_record(request, event_des=' request end ', msg_dict=log_msg) def get_route_handler(self) -> Callable: original_route_handler = super().get_route_handler() async def custom_route_handler(request: Request) -> Response: # request_id = STR (uuid4()) # await self._init_trace_start_log_record(request) response: Response = await original_route_handler(request) # await self._init_trace_end_log_record(request, response) # # if await request.body(): # print(await request.body()) return response return custom_route_handlerCopy the code

3: comprehensive plan a thinking and practice

Before, I used another way of custom middleware, which was actually just to expand and fill one of our request.scope. Based on this idea, we can think of another one. So I have a new idea:

  • The custom route reads our body or js and writes our request traceid and other information that needs to be passed to our request.scope.
  • Then the outermost middleware reads our relevant request.scope and writes information such as traceid

First customize our ContextIncludedRoute:

from time import perf_counter from loguru import logger from starlette.middleware.base import BaseHTTPMiddleware from starlette.requests import Request from starlette.responses import StreamingResponse import uuid import json from apps.utils import json_helper from apps.utils import aiowrap_helper from fastapi import APIRouter, FastAPI, Request, Response, Body from fastapi.routing import APIRoute from typing import Callable, List from fastapi.responses import Response class ContextIncludedRoute(APIRoute): def get_route_handler(self) -> Callable: original_route_handler = super().get_route_handler() async def custom_route_handler(request: Request) -> Response: # request_id = STR (uuid4()) # await self._init_trace_start_log_record(request) try: body_form = await request.form() except: body_form = None body = None try: body_bytes = await request.body() if body_bytes: try: body = await request.json() except: pass if body_bytes: try: body = body_bytes.decode('utf-8') except: body = body_bytes.decode('gb2312') except: pass request.scope.setdefault("traceid", str(uuid.uuid4()).replace('-', '')) request.scope.setdefault("trace_links_index", 0) request.scope.setdefault("requres_body_form", Body_form) request.scope.setdefault("requres_body", body) Response = await original_route_handler(request) return response return custom_route_handlerCopy the code

The middleware that defines our logging reads the relevant information to record, and at the end records the request result information. Feel this more or less will point round and round the feeling! But you can modify it according to your own needs!

4:

  • The Request context object can only be consumed once. I really hope to solve this problem in the future, otherwise it will be a bit of a shock. In most cases, our logging generally does not make relevant records in the application layer, very special cases, my personal business situation, it is necessary to do the relevant request information acquisition, so the demand for logging in the middle is very strong!

At the end

Simple notes! For reference only!

END

Jane: www.jianshu.com/u/d6960089b…

The Denver nuggets: juejin. Cn/user / 296393…

Public account: wechat search [children to a pot of wolfberry wine tea]

Let students | article | welcome learning exchange together 】 【 original 】 【 QQ: 308711822