Preface

If you have used Flask, you know how its request object works: you import it from a module and call it anywhere, and it is still thread-safe and isolated per request. A closer look at the source code shows that this is achieved with nothing more than a dictionary plus the proxy pattern.

In the previous article we wrote a simple wrapper around gRPC but did not introduce this pattern; I used contextvars to manage the context. I did not implement a Flask-style importable module, which means the call has to be written out explicitly:

g.current_app_request.get()

This call retrieves the current request object. Can we follow Flask's approach instead and wrap

g.current_app_request.get()

in a module, so that request can simply be imported and used directly? That is exactly what the proxy pattern does.

Of course, every service method in our framework already receives request and context as arguments:

def SayHelloAgain(self, request, context):

Whether you need a globally importable module like this depends mainly on your own scenario. It can be useful once you split your code into separate modules, but it is largely a matter of personal habit.

Today's main topics are:

  • 1: Implement a Flask-style proxy pattern that proxies the request in our gRPC framework
  • 2: Use the more concise contextvars module together with the proxy pattern to proxy the gRPC request globally

1: Implementing a Flask-style global proxy module

1.1 Define local.py

In this module, the main problems to be solved are:

  • Define Local with thread safety in mind. Local is equivalent to a dictionary whose key is the current thread ID or coroutine (greenlet) ID

  • Operating on the data in Local is equivalent to operating on data inside the current thread, which isolates data between threads

  • __slots__ restricts the attributes an instance may have, which improves performance and saves memory

  • Release and clean up the stored data promptly

  • Define LocalStack, which is similar to Local except that its data structure is a stack, while Local is a dictionary

  • Define LocalProxy, which acts as the proxy of Local and forwards all operations to the actual data in Local.

PS: Why use a proxy instead of passing the object explicitly? An object that is used very frequently throughout the request cycle is easy to drag into circular imports if it is passed around explicitly, so a third-party object is used to decouple the modules.
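
Before the full module, the core idea in the list above (a dictionary keyed by thread ID) can be sketched in a few lines. This is only a toy illustration: MiniLocal is a hypothetical name, and it omits __slots__, greenlet support and cleanup, all of which the real module handles.

```python
import threading


class MiniLocal:
    """Toy stand-in for Local: one namespace dict per thread, keyed by thread ID."""

    def __init__(self):
        # Bypass our own __setattr__ while creating the backing dict.
        object.__setattr__(self, "_storage", {})

    def __setattr__(self, name, value):
        self._storage.setdefault(threading.get_ident(), {})[name] = value

    def __getattr__(self, name):
        # Only called when normal lookup fails, i.e. for the per-thread names.
        try:
            return self._storage[threading.get_ident()][name]
        except KeyError:
            raise AttributeError(name) from None


local = MiniLocal()
local.user = "main"
seen_in_worker = []


def worker():
    # The worker thread has its own, initially empty, namespace.
    seen_in_worker.append(hasattr(local, "user"))
    local.user = "worker"
    seen_in_worker.append(local.user)


t = threading.Thread(target=worker)
t.start()
t.join()

print(seen_in_worker, local.user)
```

The worker never sees the value set by the main thread, and the main thread's value is untouched by the worker's assignment.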

Complete code:

#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
-------------------------------------------------
   File name :     local
   Description :   function description
   Author :        小钟同学
   Created :       2021/12/13
-------------------------------------------------
   Change log - 2021/12/13:
-------------------------------------------------
"""


import copy
from threading import get_ident

try:
    # Prefer greenlet IDs when greenlet is installed, so each greenlet is isolated too
    from greenlet import getcurrent as get_ident
except ImportError:
    # Otherwise keep threading.get_ident imported above
    pass


def release_local(local):
    '''
    Clear the data stored for the current thread or greenlet.
    The argument is an object; we call its __release_local__() method.
    :param local:
    :return:
    '''
    local.__release_local__()


class LocalStack:
    """
    A stack for storing local data.
    LocalStack is similar to Local; the difference is that its data is organized
    as a stack, whereas Local is organized as a dictionary.
    Local is the dict, LocalStack is the stack.
    """

    def __init__(self):
        self._local = Local()

    def __release_local__(self):
        self._local.__release_local__()

    # Getter and setter must share one name, otherwise two different properties are created
    @property
    def __ident_func__(self):
        return self._local.__ident_func__

    @__ident_func__.setter
    def __ident_func__(self, value):
        object.__setattr__(self._local, "__ident_func__", value)

    def __call__(self):
        def _lookup():
            rv = self.top
            if rv is None:
                raise RuntimeError("object unbound")
            return rv

        return LocalProxy(_lookup)

    def push(self, obj):
        """ Push an item onto the stack
        """
        # Check whether the current thread's dict already has a "stack" attribute
        rv = getattr(self._local, "stack", None)
        if rv is None:
            self._local.stack = rv = []  # pylint: disable=assigning-non-slot
        rv.append(obj)
        return rv

    def pop(self):
        # Check whether the current thread's dict already has a "stack" attribute
        stack = getattr(self._local, "stack", None)
        if stack is None:
            return None
        # Last item on the stack: release the whole storage of this thread
        if len(stack) == 1:
            self._local.__release_local__()
            return stack[-1]

        return stack.pop()

    @property
    def top(self):
        """ Return the item on top of the stack, or None if the stack is empty
        """
        try:
            return self._local.stack[-1]
        except (AttributeError, IndexError):
            return None


class Local:
    """ Local behaves like a dictionary keyed by the current thread ID.
    Operating on the data in Local means operating on data inside the current thread,
    which isolates data between threads: every attribute access first looks up
    the dict that belongs to the current thread ID.
    # Python's built-in dict is a hash table, a structure that trades space for time.
    # To limit collisions it grows once it is about 2/3 full (by roughly 2-4x, depending on the case).
    # Dropping the per-instance __dict__ can therefore noticeably reduce memory use.
    # ===================
    Notes:
    --1: on an ordinary class, __slots__ removes dynamic attribute creation and weak-reference support
    --2: when a class creates a large number of instances, __slots__ reduces memory consumption
    --3: __slots__ restricts which attributes an instance may have
    """
    # Instances may only have these two attributes
    __slots__ = ("__storage__", "__ident_func__")

    def __init__(self):
        object.__setattr__(self, "__storage__", {})
        object.__setattr__(self, "__ident_func__", get_ident)

    def __iter__(self):
        return iter(self.__storage__.items())

    # Calling a Local object returns the corresponding LocalProxy
    def __call__(self, proxy):
        """ Create a proxy for the given name
        and return the proxy object
        """
        print(f"Returning a proxy object, creating a proxy for {proxy}")
        return LocalProxy(self, proxy)

    def __release_local__(self):
        '''
        # Method specific to Local: clears the dict stored for the current greenlet ID or thread ID
        :return:
        '''
        self.__storage__.pop(self.__ident_func__(), None)

    def __getattr__(self, name):
        '''
        Read the value stored under name for the current thread
        :param name:
        :return:
        '''
        try:
            return self.__storage__[self.__ident_func__()][name]
        except KeyError:
            raise AttributeError(name)  # pylint: disable=raise-missing-from

    def __setattr__(self, name, value):
        # The first __setattr__ in a thread creates {ident: {name: value}}, e.g. {ident: {"stack": [...]}}
        ident = self.__ident_func__()
        storage = self.__storage__
        try:
            storage[ident][name] = value
        except KeyError:
            storage[ident] = {name: value}

    def __delattr__(self, name):
        try:
            del self.__storage__[self.__ident_func__()][name]
        except KeyError:
            raise AttributeError(name)  # pylint: disable=raise-missing-from


class LocalProxy:
    """ LocalProxy acts as the proxy of Local and forwards all operations to the actual data in Local.
    Without the proxy, data has to be accessed with the `local.xxx` syntax; with it, the data is used directly.
    Purpose:
    these objects are used so frequently, across the whole request cycle, that passing them explicitly
    easily causes circular imports, so a third-party object is needed to decouple the modules
    """

    __slots__ = ("__local", "__dict__", "__name__", "__wrapped__")  # pylint: disable=class-variable-slots-conflict

    def __init__(self, local, name=None):
        # The local object that this proxy stands for
        object.__setattr__(self, "_LocalProxy__local", local)
        # The name of the proxied attribute
        object.__setattr__(self, "__name__", name)
        # Local and LocalStack both define __release_local__, so a callable without it
        # is a plain lookup function; expose it as __wrapped__
        if callable(local) and not hasattr(local, "__release_local__"):
            object.__setattr__(self, "__wrapped__", local)

    def _get_current_object(self):
        """
        Return the current object. This is useful if you want the real object
        behind the proxy for performance reasons, or because you want to pass
        the object into a different context.

        1. Every Local or LocalStack object has a __release_local__ method,
           so if that attribute is missing, self.__local is a plain callable.
        2. When the proxy was created from a callable, call it to obtain the real object.
        """
        # No __release_local__ on the proxied object: call it and return the result
        if not hasattr(self.__local, "__release_local__"):
            return self.__local()
        try:
            # Here self.__local is a Local or LocalStack object
            return getattr(self.__local, self.__name__)
        except AttributeError:
            raise RuntimeError(f"no object bound to {self.__name__}")  # pylint: disable=raise-missing-from

    @property
    def __dict__(self):
        '''
        Return the __dict__ of the current object
        :return:
        '''
        try:
            return self._get_current_object().__dict__
        except RuntimeError:
            raise AttributeError("__dict__")  # pylint: disable=raise-missing-from

    def __repr__(self):
        '''
        Return the repr() of the current object
        :return:
        '''
        try:
            obj = self._get_current_object()
        except RuntimeError:
            return "<%s unbound>" % self.__class__.__name__
        return repr(obj)

    def __bool__(self):
        try:
            return bool(self._get_current_object())
        except RuntimeError:
            return False

    def __dir__(self):
        '''
        Return the attribute names of the current object, for introspection
        :return:
        '''
        try:
            return dir(self._get_current_object())
        except RuntimeError:
            return []

    def __getattr__(self, name):
        if name == "__members__":
            return dir(self._get_current_object())
        return getattr(self._get_current_object(), name)

    def __setitem__(self, key, value):

        self._get_current_object()[key] = value

    def __delitem__(self, key):
        del self._get_current_object()[key]

    # Most operators are overloaded so that any operation on the LocalProxy first fetches
    # the real object through _get_current_object and then applies the operation to it
    __setattr__ = lambda x, n, v: setattr(x._get_current_object(), n, v)
    __delattr__ = lambda x, n: delattr(x._get_current_object(), n)
    __str__ = lambda x: str(x._get_current_object())
    __lt__ = lambda x, o: x._get_current_object() < o
    __le__ = lambda x, o: x._get_current_object() <= o
    __eq__ = lambda x, o: x._get_current_object() == o
    __ne__ = lambda x, o: x._get_current_object() != o
    __gt__ = lambda x, o: x._get_current_object() > o
    __ge__ = lambda x, o: x._get_current_object() >= o
    __hash__ = lambda x: hash(x._get_current_object())
    __call__ = lambda x, *a, **kw: x._get_current_object()(*a, **kw)
    __len__ = lambda x: len(x._get_current_object())
    __getitem__ = lambda x, i: x._get_current_object()[i]
    __iter__ = lambda x: iter(x._get_current_object())
    __contains__ = lambda x, i: i in x._get_current_object()
    __add__ = lambda x, o: x._get_current_object() + o
    __sub__ = lambda x, o: x._get_current_object() - o
    __mul__ = lambda x, o: x._get_current_object() * o
    __floordiv__ = lambda x, o: x._get_current_object() // o
    __mod__ = lambda x, o: x._get_current_object() % o
    __divmod__ = lambda x, o: x._get_current_object().__divmod__(o)
    __pow__ = lambda x, o: x._get_current_object() ** o
    __lshift__ = lambda x, o: x._get_current_object() << o
    __rshift__ = lambda x, o: x._get_current_object() >> o
    __and__ = lambda x, o: x._get_current_object() & o
    __xor__ = lambda x, o: x._get_current_object() ^ o
    __or__ = lambda x, o: x._get_current_object() | o
    __div__ = lambda x, o: x._get_current_object().__div__(o)
    __truediv__ = lambda x, o: x._get_current_object().__truediv__(o)
    __neg__ = lambda x: -(x._get_current_object())
    __pos__ = lambda x: +(x._get_current_object())
    __abs__ = lambda x: abs(x._get_current_object())
    __invert__ = lambda x: ~(x._get_current_object())
    __complex__ = lambda x: complex(x._get_current_object())
    __int__ = lambda x: int(x._get_current_object())
    __float__ = lambda x: float(x._get_current_object())
    __oct__ = lambda x: oct(x._get_current_object())
    __hex__ = lambda x: hex(x._get_current_object())
    __index__ = lambda x: x._get_current_object().__index__()
    __coerce__ = lambda x, o: x._get_current_object().__coerce__(x, o)
    __enter__ = lambda x: x._get_current_object().__enter__()
    __exit__ = lambda x, *a, **kw: x._get_current_object().__exit__(*a, **kw)
    __radd__ = lambda x, o: o + x._get_current_object()
    __rsub__ = lambda x, o: o - x._get_current_object()
    __rmul__ = lambda x, o: o * x._get_current_object()
    __rdiv__ = lambda x, o: o / x._get_current_object()
    __rtruediv__ = __rdiv__
    __rfloordiv__ = lambda x, o: o // x._get_current_object()
    __rmod__ = lambda x, o: o % x._get_current_object()
    __rdivmod__ = lambda x, o: x._get_current_object().__rdivmod__(o)
    __copy__ = lambda x: copy.copy(x._get_current_object())
    __deepcopy__ = lambda x, memo: copy.deepcopy(x._get_current_object(), memo)

1.2 Define request.py to encapsulate the Request object

The Request object wraps the request and context that gRPC passes in, and adds helper methods to make later calls convenient. The object we proxy later is an instance of this class, so those methods can be called directly on the proxy.

Complete code:

#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
-------------------------------------------------
   File name :     rc
   Description :   function description
   Author :        小钟同学
   Created :       2021/12/13
-------------------------------------------------
   Change log - 2021/12/13:
-------------------------------------------------
"""


class Request:

    def __init__(self, request, context):
        """
        Initialize the Request instance
        :param request: the original request
        :param context: the request context
        """
        self.request = request
        self.context = context

    # The members below are read as attributes (e.g. self.rpc_event), so they must be properties

    @property
    def headers(self):
        """ Get the headers (invocation metadata) of this request """
        if self.rpc_event is not None:
            metadata = getattr(self.rpc_event, "invocation_metadata")
            return dict(metadata)
        return None

    @property
    def method(self):
        """ Get the method called for this request """
        if self.call_details is not None:
            method = getattr(self.call_details, "method")
            return method.decode("utf8") if method else method
        return None

    @property
    def service(self):
        """ Get the service name of this request """
        if self.method is not None:
            return self.method.split("/")[-2]
        return None

    @property
    def rpc_event(self):
        """ Return the RPC event """
        if self.context is not None:
            return getattr(self.context, "_rpc_event")
        return None

    @property
    def call_details(self):
        """ Return the call details of the RPC event """
        if self.rpc_event is not None:
            return getattr(self.rpc_event, "call_details")
        return None
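
To make the method and service lookups concrete, here is a small sketch of the same string handling on a fake context. The fake_context object, the helloworld.Greeter service name and the metadata values are assumptions made up for illustration; only the attribute names (_rpc_event, call_details, method, invocation_metadata) come from the code above.

```python
from types import SimpleNamespace


def parse_method(full_method: bytes):
    """Split a gRPC method path into (service, method name)."""
    path = full_method.decode("utf8")
    # "/pkg.Service/Method".split("/") gives ["", "pkg.Service", "Method"]
    parts = path.split("/")
    return parts[-2], parts[-1]


# Hypothetical stand-in for the data the Request class reads from the context.
fake_context = SimpleNamespace(
    _rpc_event=SimpleNamespace(
        call_details=SimpleNamespace(method=b"/helloworld.Greeter/SayHelloAgain"),
        invocation_metadata=(("user-agent", "demo"), ("x-trace-id", "abc123")),
    )
)

event = fake_context._rpc_event
service, method_name = parse_method(event.call_details.method)
headers = dict(event.invocation_metadata)

print(service, method_name, headers)
```

Note that _rpc_event is a private attribute, so code that relies on it may need adjusting when the grpc package changes.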

1.3 Define ctx.py and encapsulate RequestContext

RequestContext manages pushing our Request objects onto the stack and popping them off again.

Code:

from grpcframe.grpclocal.request import Request
from grpcframe.grpclocal.g import _request_ctx_stack
import typing as t


class RequestContext:

    def __init__(self, params=None, context=None) -> None:
        # Wrap the raw request and context in our Request object
        self.request = Request(params, context)
        self.preserved = False
        self._preserved_exc = None

    def push(self) -> None:
        """ Push this context onto the stack """
        top = _request_ctx_stack.top
        if top is not None and top.preserved:
            top.pop(top._preserved_exc)
        # Push self, which carries self.request
        _request_ctx_stack.push(self)

    def pop(self, exc: t.Optional[BaseException] = object()) -> None:
        """ Pop this context off the stack """
        rv = _request_ctx_stack.pop()
        if rv is not self:
            raise RuntimeError("Popped wrong request context")

    def __enter__(self) -> "RequestContext":
        self.push()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.pop(exc_val)
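
The push/pop discipline is easiest to see with nested with blocks. The sketch below is a simplified illustration, not the framework code: DemoContext is a hypothetical class, and it uses a plain shared list where the article uses a LocalStack.

```python
class DemoContext:
    """Hypothetical stand-in for RequestContext, backed by a plain list."""

    stack = []  # shared list for brevity; the real code uses a LocalStack

    def __init__(self, request):
        self.request = request

    def push(self):
        self.stack.append(self)

    def pop(self):
        rv = self.stack.pop()
        if rv is not self:
            raise RuntimeError("Popped wrong request context")

    def __enter__(self):
        self.push()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Always pop, even when the body raised; returning None re-raises the error.
        self.pop()


def current_request():
    """Return the request on top of the stack, or None outside any context."""
    return DemoContext.stack[-1].request if DemoContext.stack else None


trace = []
with DemoContext("outer"):
    trace.append(current_request())
    with DemoContext("inner"):
        trace.append(current_request())
    trace.append(current_request())
trace.append(current_request())

print(trace)
```

Each context is visible only between its push and its pop, and leaving the inner block restores the outer request.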

1.4 Define g.py

This module defines the object we want to proxy. Because the request has to be looked up from the current context, what we proxy is the Request instance described above. Each call has its own Request instance, so it is fetched from the stack.

So the whole proxying process is:

  • 1: Create the stack: _request_ctx_stack = LocalStack()
  • 2: Define a _lookup_req_object method that takes the RequestContext object from the top of _request_ctx_stack and returns its request instance.
  • 3: Use a partial function to pre-fill the argument, that is, the name of the attribute that needs to be proxied.

Complete code:

from grpcframe.grpclocal.local import LocalStack, LocalProxy
from functools import partial
from grpcframe.grpclocal.request import Request

# Request context stack
_request_ctx_stack = LocalStack()


def _lookup_req_object(name):
    top = _request_ctx_stack.top
    if top is None:
        raise RuntimeError("Working outside of request context")
    # name is "request" here
    return getattr(top, name)


# request is a proxy object that points to the request attribute of the RequestContext
# of the current request. It is an object frequently needed by the processing logic.
# It holds almost all of the request data and is essentially a Request object.
# Its life cycle is a single request: it is destroyed once the request has been processed.
# The ": Request" annotation lets the IDE suggest the Request methods.
request: Request = LocalProxy(partial(_lookup_req_object, "request"))
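
The role of partial in step 3 can be shown with a stripped-down proxy. This is a sketch under simplifying assumptions: MiniProxy, Ctx, Req and the /demo.Service/... paths are hypothetical, and the stack is a plain list instead of a LocalStack.

```python
from functools import partial


class MiniProxy:
    """Toy proxy: resolves its target again on every attribute access."""

    def __init__(self, lookup):
        self._lookup = lookup

    def __getattr__(self, name):
        # Only reached for names not found on the proxy itself.
        return getattr(self._lookup(), name)


class Req:
    def __init__(self, method):
        self.method = method


class Ctx:
    def __init__(self, request):
        self.request = request


_stack = []


def _lookup_object(name):
    if not _stack:
        raise RuntimeError("Working outside of request context")
    return getattr(_stack[-1], name)


# partial pre-fills name="request", so the proxy gets a zero-argument lookup.
request = MiniProxy(partial(_lookup_object, "request"))

_stack.append(Ctx(Req("/demo.Service/First")))
first = request.method
_stack.append(Ctx(Req("/demo.Service/Second")))
second = request.method
_stack.clear()
try:
    request.method
    outside = "no error"
except RuntimeError as exc:
    outside = str(exc)

print(first, second, outside)
```

The module-level request name never changes, yet each access lands on whatever context is currently on top of the stack.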

1.5 Define the instantiation method

def request_context(self, request, context):
    """ Create the RequestContext for the current call """
    return RequestContext(request, context)

1.6 Instantiate the context, then push and pop it

Instantiate the context in the built-in base middleware (it is better to define a separate middleware for this, to avoid instantiating the context more than once).

Complete code:

# ServerInterceptor, BaseAppMiddleware and GrpcException are imported elsewhere in the framework
class BaseMiddleware(ServerInterceptor, BaseAppMiddleware, metaclass=abc.ABCMeta):

    @abc.abstractmethod
    def before_handler(self, request: Any, context: grpc.ServicerContext, method_name):
        raise NotImplementedError()

    @abc.abstractmethod
    def after_handler(self, context: grpc.ServicerContext, response):
        raise NotImplementedError()

    def intercept(self, method: Callable, request: Any, context: grpc.ServicerContext, method_name: str) -> Any:
        # Create the context object
        ctx = None
        if self.app:
            ctx = self.app.request_context(request, context)
        try:
            if ctx is not None:
                ctx.push()
            # Callback before the handler runs
            self.before_handler(request, context, method_name)
            # The handler takes the request and the context
            response = method(request, context)
            # Callback after the handler has run
            self.after_handler(context, response)
            return response
        except GrpcException as e:
            context.set_code(e.status_code)
            context.set_details(e.details)
            raise
        finally:
            if ctx is not None:
                ctx.pop()

1.7 Import request and use it freely

request:Request = LocalProxy(partial(_lookup_req_object, "request"))

For example, any module can now import request and read its attributes without receiving the request as an argument.

2: Implementing a Flask-style global proxy module based on contextvars

2.1 Define our global managed objects

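# Requires: import contextvars, from dataclasses import dataclass; Singleton is the framework's own metaclass
@dataclass
class GManager(metaclass=Singleton):
    # The current app server
    current_app_server = contextvars.ContextVar('current_app_server', default=None)
    # The current request
    current_app_request = contextvars.ContextVar('current_app_request', default=None)
    # The current context
    current_app_context = contextvars.ContextVar('current_app_context', default=None)
    # The currently active tracer span
    active_tracer_span = contextvars.ContextVar('current_active_tracer_span', default=None)


g = GManager()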

2.2 Instantiate the context in the corresponding middleware

from grpcframe.core.middlewares import BaseMiddleware
from typing import Any, Callable, NamedTuple, Tuple
from grpcframe.core.app import g
import grpc


# @app.add_middleware
class CxtgMiddleware(BaseMiddleware):

    def before_handler(self, request: Any, context: grpc.ServicerContext, method_name):
        # Store the request and context, creating a Flask-like context
        g.current_app_request.set(request)
        g.current_app_context.set(context)

    def after_handler(self, context: grpc.ServicerContext, response):
        pass

2.3 Define the proxy object

What we proxy here is the call g.current_app_request.get():

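from functools import partial
from grpcframe.grpclocal.local import LocalProxy
from grpcframe.core.app import g


def get_current_app_request(g=None):
    # Return the request stored in the context variable
    return g.current_app_request.get()


# Define a proxy for the current_app_request object held in the global g
grequest = LocalProxy(partial(get_current_app_request, g))

2.4 Import grequest and use it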
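
Why this is safe under concurrency can be checked with a small standalone sketch. Everything in it is illustrative: VarProxy and FakeRequest are hypothetical names, the proxy forwards only attribute reads, and threads stand in for concurrent gRPC calls.

```python
import contextvars
import threading

current_request = contextvars.ContextVar("current_request", default=None)


class VarProxy:
    """Toy proxy that forwards attribute reads to the ContextVar's current value."""

    def __init__(self, var):
        self._var = var

    def __getattr__(self, name):
        target = self._var.get()
        if target is None:
            raise RuntimeError("no request bound in this context")
        return getattr(target, name)


class FakeRequest:
    def __init__(self, name):
        self.name = name


grequest = VarProxy(current_request)
results = {}


def handle(name):
    # Each thread runs in its own context, so this set() is invisible to the others.
    current_request.set(FakeRequest(name))
    results[name] = grequest.name


threads = [threading.Thread(target=handle, args=(n,)) for n in ("alice", "bob")]
for t in threads:
    t.start()
for t in threads:
    t.join()

# The main thread never called set(), so it still sees the default.
main_value = current_request.get()
print(results, main_value)
```

Compared with the Local/LocalStack version, the isolation comes from contextvars itself, so no per-thread dictionary or explicit cleanup is needed in this sketch.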

A method that receives the request explicitly looks like this:

# Requires: from typing import Dict, from google.protobuf import json_format
def parse_request_action(self, request) -> Dict:
    '''
    Convert the request message into a dict
    :param request:
    :return:
    '''
    request_dict = json_format.MessageToDict(request, preserving_proto_field_name=True)
    return request_dict

The imported global grequest object can be used in its place.

Conclusion

The above is just my own study and practice notes, based on my actual needs. If you spot any mistakes, criticism and corrections are welcome. Thank you!

Jianshu: www.jianshu.com/u/d6960089b…

Juejin: juejin.cn/user/296393…

WeChat public account: search for [children to a pot of wolfberry wine tea]

Author: 小钟同学 | QQ: 308711822 | Questions and discussion are welcome