Hi, I’m Sean, read the source code and I’ll see you on Thursday.
Werkzeug is a comprehensive WSGI web application library. It started as a simple collection of WSGI utilities and has grown into one of the most advanced WSGI utility libraries; it is also the project that underpins Flask. Werkzeug is a German word meaning “tool”. I find the word a little hard to pronounce (which is probably one of the reasons it is not better known), but since the official logo happens to be a hammer, I’ll just call it “The German Hammer” for short. If you want to pronounce Werkzeug correctly, check out the reference links at the bottom. This article is divided into the following parts:
- The profile
- serving && wsgi
- request && response
- Local implementation
The profile
This code version is 2.0.0, and the main structure of the project is as follows:
Module | Description |
---|---|
serving | Implementation of the HTTP service and the WSGI specification |
request && response | Request and response processing |
local | Thread-local data implementation |
middleware | Middleware implementation |
routing && urls | Routing and URL processing |
datastructures | Data structures |
The “German Hammer” project is very important, so I will read it thoroughly, combining a quick overview with slow, careful reading. The write-up is split into two articles. This is the first, which covers the first three modules in the table above.
Before we begin, let’s review HTTP services and WSGI applications.
A brief review of HTTP services:
# http/server.py
def test(HandlerClass=SimpleHTTPRequestHandler,
         ServerClass=HTTPServer,
         protocol="HTTP/1.0", port=8000, bind=""):
    server_address = (bind, port)
    HandlerClass.protocol_version = protocol
    with ServerClass(server_address, HandlerClass) as httpd:
        sa = httpd.socket.getsockname()
        serve_message = "Serving HTTP on {host} port {port} (http://{host}:{port}/) ..."
        print(serve_message.format(host=sa[0], port=sa[1]))
        try:
            httpd.serve_forever()
        except KeyboardInterrupt:
            print("\nKeyboard interrupt received, exiting.")
            sys.exit(0)

# Inside the handler, the request is read from rfile
# and the response is written to wfile:
# self.rfile.readline(65537)
# self.wfile.write(body)
- HTTPServer is responsible for implementing HTTP services.
- SimpleHTTPRequestHandler handles HTTP requests.
- The request is read from, and the response is written to, the IO objects rfile and wfile.
A brief review of WSGI applications:
# wsgiref/simple_server.py
def demo_app(environ, start_response):
    from io import StringIO
    stdout = StringIO()
    print("Hello world!", file=stdout)
    print(file=stdout)
    h = sorted(environ.items())
    for k, v in h:
        print(k, '=', repr(v), file=stdout)
    start_response("200 OK", [('Content-Type', 'text/plain; charset=utf-8')])
    return [stdout.getvalue().encode("utf-8")]


def make_server(
    host, port, app, server_class=WSGIServer, handler_class=WSGIRequestHandler
):
    """Create a new WSGI server listening on `host` and `port` for `app`"""
    server = server_class((host, port), handler_class)
    server.set_app(app)
    return server


if __name__ == '__main__':
    with make_server('', 8000, demo_app) as httpd:
        sa = httpd.socket.getsockname()
        print("Serving HTTP on", sa[0], "port", sa[1], "...")
        import webbrowser
        webbrowser.open('http://localhost:8000/xyz?abc')
        httpd.handle_request()  # serve one request, then exit
- WSGIServer implements an HTTP service that conforms to the WSGI specification
- WSGIRequestHandler handles WSGI requests
- The WSGI application (demo_app here) implements the application logic
- The application reads the request data from environ, sets the response status and headers through the start_response callback, and returns the response body as its return value
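The contract described in the list above can be exercised without starting a server. The following is a minimal sketch, not code from Werkzeug or wsgiref: hello_app and call_app are hypothetical names invented for illustration, and only wsgiref.util.setup_testing_defaults is a real standard-library helper.

```python
from wsgiref.util import setup_testing_defaults


def hello_app(environ, start_response):
    """A tiny WSGI application (hypothetical example)."""
    body = f"Hello from {environ['PATH_INFO']}".encode("utf-8")
    start_response("200 OK", [("Content-Type", "text/plain; charset=utf-8"),
                              ("Content-Length", str(len(body)))])
    return [body]


def call_app(app, path="/"):
    """Play the server's role: build environ, collect status, headers, body."""
    environ = {"PATH_INFO": path}
    setup_testing_defaults(environ)  # fills in the remaining required keys
    captured = {}

    def start_response(status, headers, exc_info=None):
        captured["status"] = status
        captured["headers"] = headers

    body = b"".join(app(environ, start_response))
    return captured["status"], captured["headers"], body


status, headers, body = call_app(hello_app, "/xyz")
print(status, body)
```

The server side only needs to supply environ and a start_response callable, which is why the same application can run unchanged under wsgiref, Werkzeug, or any other WSGI server.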
serving
The serving module provides the service entry point and uses argparse to implement the command-line interface:
def main() -> None:
    """A simple command-line interface for :py:func:`run_simple`."""
    import argparse
    ...
    run_simple(
        hostname=hostname or "127.0.0.1",
        port=int(port or 5000),
        application=import_string(args.application),
        use_reloader=args.reload,
        use_debugger=args.debug,
    )
- The main parameters are the server’s IP address and port. The application can be given as an importable module name, which is loaded automatically with import_string.
make_server is responsible for creating the service:
def make_server(
    host: str,
    port: int,
    app: "WSGIApplication",
    threaded: bool = False,
    processes: int = 1,
    request_handler: t.Optional[t.Type[WSGIRequestHandler]] = None,
    passthrough_errors: bool = False,
    ssl_context: t.Optional[_TSSLContextArg] = None,
    fd: t.Optional[int] = None,
) -> BaseWSGIServer:
    if threaded:
        return ThreadedWSGIServer(
            host, port, app, request_handler, passthrough_errors, ssl_context, fd=fd
        )
    elif processes > 1:
        return ForkingWSGIServer(
            host, port, app, processes, request_handler, passthrough_errors,
            ssl_context, fd=fd,
        )
    else:
        return BaseWSGIServer(
            host, port, app, request_handler, passthrough_errors, ssl_context, fd=fd
        )
- Depending on the parameters, you can create multithreaded, multiprocess, or normal services
Multi-threaded and multi-process services are combined using mixins:
class ThreadedWSGIServer(socketserver.ThreadingMixIn, BaseWSGIServer):
    multithread = True
    daemon_threads = True


class ForkingWSGIServer(ForkingMixIn, BaseWSGIServer):
    multiprocess = True
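To see how the mixin combination works, here is a small sketch using only the standard library. SketchServer and ThreadedSketchServer are hypothetical stand-ins for BaseWSGIServer and ThreadedWSGIServer; no socket is opened, the code only inspects the class hierarchy.

```python
import socketserver
from http.server import HTTPServer


class SketchServer(HTTPServer):
    """Stand-in for BaseWSGIServer (hypothetical, for illustration only)."""
    multithread = False
    multiprocess = False


class ThreadedSketchServer(socketserver.ThreadingMixIn, SketchServer):
    """The mixin is listed first so its process_request takes precedence."""
    multithread = True
    daemon_threads = True


# The method resolution order places the mixin ahead of the server class.
print([cls.__name__ for cls in ThreadedSketchServer.__mro__[:3]])
# process_request is resolved from the mixin, not from the plain server.
print(ThreadedSketchServer.process_request
      is socketserver.ThreadingMixIn.process_request)
```

Because the mixin overrides only process_request, everything else (socket setup, request parsing) is still inherited from the base server, which is what keeps the threaded and forking variants so short.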
The basic implementation of WSGIServer:
class BaseWSGIServer(HTTPServer):
    request_queue_size = LISTEN_QUEUE

    def __init__(
        self,
        host: str,
        port: int,
        app: "WSGIApplication",
        handler: t.Optional[t.Type[WSGIRequestHandler]] = None,
        passthrough_errors: bool = False,
        ssl_context: t.Optional[_TSSLContextArg] = None,
        fd: t.Optional[int] = None,
    ) -> None:
        ...
- Note that BaseWSGIServer inherits from HTTPServer; the wsgiref module is not used.
Request handling is mainly implemented in WSGIRequestHandler:
class WSGIRequestHandler(BaseHTTPRequestHandler):
    """A request handler that implements WSGI dispatching."""

    def handle_one_request(self) -> None:
        """Handle a single HTTP request."""
        self.raw_requestline = self.rfile.readline()
        ...
        if self.parse_request():
            self.run_wsgi()
Each request executes the corresponding WSGI implementation:
def run_wsgi(self) -> None:
    self.environ = environ = self.make_environ()
    status_set: t.Optional[str] = None
    headers_set: t.Optional[t.List[t.Tuple[str, str]]] = None

    def write(data: bytes) -> None:
        self.wfile.write(data)
        self.wfile.flush()

    def start_response(status, headers, exc_info=None):  # type: ignore
        nonlocal status_set, headers_set
        ...
        status_set = status
        headers_set = headers
        return write

    def execute(app: "WSGIApplication") -> None:
        application_iter = app(environ, start_response)
        try:
            for data in application_iter:
                write(data)
            if not headers_sent:
                write(b"")
        finally:
            if hasattr(application_iter, "close"):
                application_iter.close()  # type: ignore

    execute(self.server.app)
- Generate the WSGI environ
- Generate the start_response callback
- Execute the app, passing in environ and the start_response callback, then iterate over the result and write it to wfile
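The three steps above can be tried outside a real server. This is a simplified sketch under stated assumptions, not Werkzeug's implementation: run_app and demo are hypothetical names, the environ is a hand-written dictionary, and an in-memory BytesIO stands in for the socket's wfile.

```python
import io


def run_app(app, environ, wfile):
    """Hypothetical, simplified take on the run_wsgi flow."""
    state = {"status": None, "headers": None, "sent": False}

    def write(data):
        # Send the status line and headers once, just before the first body bytes.
        if not state["sent"]:
            wfile.write(f"HTTP/1.1 {state['status']}\r\n".encode("latin1"))
            for key, value in state["headers"]:
                wfile.write(f"{key}: {value}\r\n".encode("latin1"))
            wfile.write(b"\r\n")
            state["sent"] = True
        wfile.write(data)

    def start_response(status, headers, exc_info=None):
        state["status"], state["headers"] = status, headers
        return write

    app_iter = app(environ, start_response)
    try:
        for chunk in app_iter:
            write(chunk)
        if not state["sent"]:
            write(b"")  # empty body: the headers still have to go out
    finally:
        if hasattr(app_iter, "close"):
            app_iter.close()


def demo(environ, start_response):
    start_response("200 OK", [("Content-Type", "text/plain")])
    return [b"hi"]


out = io.BytesIO()
run_app(demo, {"REQUEST_METHOD": "GET"}, out)
print(out.getvalue())
```

Deferring the header write until the first body chunk mirrors the purpose of the headers_sent check in the excerpt: start_response only records the status and headers, and nothing reaches the client until data is actually written.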
make_environ converts the request data that was read into environ:
def make_environ(self) -> "WSGIEnvironment":
    environ: "WSGIEnvironment" = {
        "wsgi.version": (1, 0),
        "wsgi.url_scheme": url_scheme,
        "wsgi.input": self.rfile,
        "wsgi.errors": sys.stderr,
        "wsgi.multithread": self.server.multithread,
        "wsgi.multiprocess": self.server.multiprocess,
        "wsgi.run_once": False,
        "werkzeug.server.shutdown": shutdown_server,
        "werkzeug.socket": self.connection,
        "SERVER_SOFTWARE": self.server_version,
        "REQUEST_METHOD": self.command,
        "SCRIPT_NAME": "",
        "PATH_INFO": _wsgi_encoding_dance(path_info),
        "QUERY_STRING": _wsgi_encoding_dance(request_url.query),
        # Non-standard, added by mod_wsgi, uWSGI
        "REQUEST_URI": _wsgi_encoding_dance(self.path),
        # Non-standard, added by gunicorn
        "RAW_URI": _wsgi_encoding_dance(self.path),
        "REMOTE_ADDR": self.address_string(),
        "REMOTE_PORT": self.port_integer(),
        "SERVER_NAME": self.server.server_address[0],
        "SERVER_PORT": str(self.server.server_address[1]),
        "SERVER_PROTOCOL": self.request_version,
    }
    return environ
request && response
Request and response are implemented in two layers. The lower layer, in the sansio package, is a pure logical structure without IO; the upper layer, in the wrappers package, contains the WSGI implementation.
sansio.Request && sansio.Response
Here is the sansio.Request constructor. The class carries an important docstring, so I quote it in full:
class Request:
    """Represents the non-IO parts of a HTTP request, including the
    method, URL info, and headers.

    This class is not meant for general use. It should only be used when
    implementing WSGI, ASGI, or another HTTP application spec. Werkzeug
    provides a WSGI implementation at :cls:`werkzeug.wrappers.Request`.
    """

    def __init__(
        self,
        method: str,
        scheme: str,
        server: t.Optional[t.Tuple[str, t.Optional[int]]],
        root_path: str,
        path: str,
        query_string: bytes,
        headers: Headers,
        remote_addr: t.Optional[str],
    ) -> None:
        ...
sansio.Request is an HTTP request implementation built on the sans-IO idea: IO and logic are kept apart, with the business logic sandwiched between an inbound IO layer and an outbound IO layer. A Request object built this way is abstract and does not depend on any concrete synchronous or asynchronous IO implementation, which makes it more general and quick to test. For WSGI, the recommendation is to use the higher-level werkzeug.wrappers.Request.
For those interested in sans-IO, see the reference links.
sansio.Request is essentially a simple data model, with the familiar reading and assignment of fields. Three features stand out. First, the cached_property decorator wraps properties:
@cached_property
def full_path(self) -> str:
    """Requested path, including the query string."""
    return f"{self.path}?{_to_str(self.query_string, self.url_charset)}"
From the decorator name and the function body, you can see that the property is evaluated once and then cached to improve performance. Second, there are the properties defined with header_property:
content_type = header_property[str](
    "Content-Type",
    doc="""The Content-Type entity-header field indicates the media
    type of the entity-body sent to the recipient or, in the case of
    the HEAD method, the media type that would have been sent had
    the request been a GET.""",
    read_only=True,
)
Third, the args property holds the parsed URL parameters, so a call such as args.get("old", type=int) retrieves an HTTP request parameter:
parameter_storage_class: t.Type[MultiDict] = ImmutableMultiDict

@cached_property
def args(self) -> "MultiDict[str, str]":
    """The parsed URL parameters (the part in the URL after the question
    mark).

    By default an
    :class:`~werkzeug.datastructures.ImmutableMultiDict`
    is returned from this function. This can be changed by setting
    :attr:`parameter_storage_class` to a different type. This might
    be necessary if the order of the form data is important.
    """
    return url_decode(
        self.query_string,
        self.url_charset,
        errors=self.encoding_errors,
        cls=self.parameter_storage_class,
    )
Request data needs to be immutable, so it relies heavily on the ImmutableMultiDict data structure, which is covered in more detail in the next article.
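The caching and typed-lookup behaviour described above can be imitated with the standard library alone. This is a rough sketch under stated assumptions: MiniRequest and get_arg are hypothetical, functools.cached_property (Python 3.8+) stands in for Werkzeug's own cached_property, and urllib.parse.parse_qs stands in for url_decode and ImmutableMultiDict.

```python
from functools import cached_property
from urllib.parse import parse_qs


class MiniRequest:
    """Hypothetical stand-in for a sans-IO request; not Werkzeug's class."""

    def __init__(self, path, query_string):
        self.path = path
        self.query_string = query_string  # bytes, as in sansio.Request
        self.parse_count = 0

    @cached_property
    def args(self):
        # Runs on first access only; the result is then stored on the instance.
        self.parse_count += 1
        return parse_qs(self.query_string.decode("utf-8"))

    def get_arg(self, key, default=None, type=str):
        """Return the first value for key, converted with type, else default."""
        values = self.args.get(key)
        if not values:
            return default
        try:
            return type(values[0])
        except ValueError:
            return default


req = MiniRequest("/users", b"old=42&tag=a&tag=b")
print(req.get_arg("old", type=int), req.get_arg("missing", default=0, type=int))
print(req.args["tag"], req.parse_count)
```

However many times args is read, the query string is parsed once; the sketch does not enforce immutability, which is the part the real ImmutableMultiDict adds.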
sansio.Response is similar to sansio.Request:
class Response:
    def __init__(
        self,
        status: t.Optional[t.Union[int, str, HTTPStatus]] = None,
        headers: t.Optional[
            t.Union[
                t.Mapping[str, t.Union[str, int, t.Iterable[t.Union[str, int]]]],
                t.Iterable[t.Tuple[str, t.Union[str, int]]],
            ]
        ] = None,
        mimetype: t.Optional[str] = None,
        content_type: t.Optional[str] = None,
    ) -> None:
        ...

    @property
    def status_code(self) -> int:
        """The HTTP status code as a number."""
        return self._status_code

    @status_code.setter
    def status_code(self, code: int) -> None:
        self.status = code  # type: ignore
wrappers.Request
The wrappers Request and Response are a bit more complicated, so let’s read them separately. First, wrappers.Request:
class Request(_SansIORequest):
    """Represents an incoming WSGI HTTP request, with headers and body
    taken from the WSGI environment. Has properties and methods for
    using the functionality defined by various HTTP specs. The data in
    requests object is read-only.
    """
Request inherits from sansio.Request, and the docstring describes its purpose and its key characteristic (read-only).
The constructor shows that a request is built from environ and includes familiar HTTP request attributes such as method, scheme, and query_string:
def __init__(
    self,
    environ: "WSGIEnvironment",
    populate_request: bool = True,
    shallow: bool = False,
) -> None:
    super().__init__(
        method=environ.get("REQUEST_METHOD", "GET"),
        scheme=environ.get("wsgi.url_scheme", "http"),
        server=_get_server(environ),
        root_path=_wsgi_decoding_dance(
            environ.get("SCRIPT_NAME") or "", self.charset, self.encoding_errors
        ),
        path=_wsgi_decoding_dance(
            environ.get("PATH_INFO") or "", self.charset, self.encoding_errors
        ),
        query_string=environ.get("QUERY_STRING", "").encode("latin1"),
        headers=EnvironHeaders(environ),
        remote_addr=environ.get("REMOTE_ADDR"),
    )
    self.environ = environ
    ...
A plain query string is simple, so let’s look at the more complex form implementation. Application code that uses the form API looks something like this:
def on_new_url(self, request):
    error = None
    url = ""
    if request.method == "POST":
        url = request.form["url"]
        ...
The form property of wrappers.Request is also a cached_property, which improves efficiency, and the form data is parsed with FormDataParser:
form_data_parser_class: t.Type[FormDataParser] = FormDataParser

@cached_property
def form(self) -> "ImmutableMultiDict[str, str]":
    self._load_form_data()
    return self.form  # type: ignore

def _load_form_data(self) -> None:
    ...
    parser = self.form_data_parser_class(
        self._get_file_stream,
        self.charset,
        self.encoding_errors,
        self.max_form_memory_size,
        self.max_content_length,
        self.parameter_storage_class,
    )
    ...
    data = parser.parse(
        self._get_stream_for_parsing(),
        self.mimetype,
        self.content_length,
        self.mimetype_params,
    )
    d = self.__dict__
    d["stream"], d["form"], d["files"] = data
Here is a rough implementation of FormDataParser:
# formparser.py
class FormDataParser:
    def parse_from_environ(self, environ: "WSGIEnvironment") -> "t_parse_result":
        """Parses the information from the environment as form data.

        :param environ: the WSGI environment to be used for parsing.
        :return: A tuple in the form ``(stream, form, files)``.
        """
        content_type = environ.get("CONTENT_TYPE", "")
        content_length = get_content_length(environ)
        mimetype, options = parse_options_header(content_type)
        return self.parse(get_input_stream(environ), mimetype, content_length, options)
wrappers.Response
wrappers.Response is similar to wrappers.Request and inherits from sansio.Response:
class Response(_SansIOResponse):
    """Represents an outgoing WSGI HTTP response with body, status, and
    headers. Has properties and methods for using the functionality
    defined by various HTTP specs.
    ...
    The response object is itself a WSGI application callable. When
    called (:meth:`__call__`) with ``environ`` and ``start_response``,
    it will pass its status and headers to ``start_response`` then
    return its body as an iterable"""
The wrappers.Response docstring also highlights how Response is used. Consider the following example:
from werkzeug.wrappers.response import Response


def index():
    return Response("Hello, World!")


def application(environ, start_response):
    path = environ.get("PATH_INFO") or "/"
    if path == "/":
        response = index()
    else:
        response = Response("Not Found", status=404)
    return response(environ, start_response)
As the example shows, a Response object is created for each request; the object’s __call__ method is then invoked with the environ and start_response arguments, and its result is returned.
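The idea of a response that is itself a WSGI callable can be reproduced in a few lines without Werkzeug. The sketch below is an assumption-laden imitation: MiniResponse is a hypothetical class, it takes the status as a ready-made string, and it supports only text bodies.

```python
class MiniResponse:
    """Hypothetical response object that is also a WSGI application."""

    def __init__(self, body="", status="200 OK",
                 content_type="text/plain; charset=utf-8"):
        self.data = body.encode("utf-8")
        self.status = status
        self.headers = [("Content-Type", content_type),
                        ("Content-Length", str(len(self.data)))]

    def __call__(self, environ, start_response):
        start_response(self.status, self.headers)
        # A HEAD request gets the headers but no body.
        if environ.get("REQUEST_METHOD") == "HEAD":
            return []
        return [self.data]


def application(environ, start_response):
    if environ.get("PATH_INFO", "/") == "/":
        response = MiniResponse("Hello, World!")
    else:
        response = MiniResponse("Not Found", status="404 NOT FOUND")
    return response(environ, start_response)


seen = []
body = application({"PATH_INFO": "/nope", "REQUEST_METHOD": "GET"},
                   lambda status, headers: seen.append(status))
print(seen, body)
```

Making the response callable means the application function only has to pick a response and delegate to it, which is the pattern the Werkzeug example uses.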
Constructor of wrappers.Response:
def __init__(
    self,
    response: t.Optional[
        t.Union[t.Iterable[bytes], bytes, t.Iterable[str], str]
    ] = None,
    status: t.Optional[t.Union[int, str, HTTPStatus]] = None,
    headers: t.Optional[
        t.Union[
            t.Mapping[str, t.Union[str, int, t.Iterable[t.Union[str, int]]]],
            t.Iterable[t.Tuple[str, t.Union[str, int]]],
        ]
    ] = None,
    mimetype: t.Optional[str] = None,
    content_type: t.Optional[str] = None,
    direct_passthrough: bool = False,
) -> None:
    super().__init__(
        status=status,
        headers=headers,
        mimetype=mimetype,
        content_type=content_type,
    )
    ...
    if response is None:
        self.response = []
    elif isinstance(response, (str, bytes, bytearray)):
        self.set_data(response)
    else:
        self.response = response
The key __call__ method and its related helpers:
def __call__(
    self, environ: "WSGIEnvironment", start_response: "StartResponse"
) -> t.Iterable[bytes]:
    """Process this response as WSGI application.

    :param environ: the WSGI environment.
    :param start_response: the response callable provided by the WSGI
                           server.
    :return: an application iterator
    """
    app_iter, status, headers = self.get_wsgi_response(environ)
    start_response(status, headers)
    return app_iter

def get_app_iter(self, environ: "WSGIEnvironment") -> t.Iterable[bytes]:
    status = self.status_code
    if (
        environ["REQUEST_METHOD"] == "HEAD"
        or 100 <= status < 200
        or status in (204, 304)
    ):
        iterable: t.Iterable[bytes] = ()
    elif self.direct_passthrough:
        return self.response  # type: ignore
    else:
        iterable = self.iter_encoded()
    return ClosingIterator(iterable, self.close)

def get_wsgi_response(
    self, environ: "WSGIEnvironment"
) -> t.Tuple[t.Iterable[bytes], str, t.List[t.Tuple[str, str]]]:
    headers = self.get_wsgi_headers(environ)
    app_iter = self.get_app_iter(environ)
    return app_iter, self.status, headers.to_wsgi_list()
In short, the WSGI Response is converted into a status, headers, and a result iterator that are handed to the WSGI server.
Local implementation
Local is a very important module of the German Hammer. First, here is an example that uses the standard library’s threading.local:
import threading
import logging
import random

logging.basicConfig(level=logging.DEBUG,
                    format='(%(threadName)-0s) %(message)s',)


def show(d):
    try:
        val = d.val
    except AttributeError:
        logging.debug('No value yet')
    else:
        logging.debug('value=%s', val)


def f(d):
    show(d)
    d.val = random.randint(1, 100)
    show(d)


if __name__ == '__main__':
    d = threading.local()
    show(d)
    d.val = 999
    show(d)
    for i in range(2):
        t = threading.Thread(target=f, args=(d,))
        t.start()
The value of the same variable d is different for different threads:
(MainThread) No value yet
(MainThread) value=999
(Thread-1) No value yet
(Thread-1) value=56
(Thread-2) No value yet
(Thread-2) value=38
threading.local solves two main problems:
- Data isolation between threads
- The code is simple to write, and you only need to define a variable
In fact, thread isolation can be achieved with a plain dictionary, for example by using the thread ID as the key for each thread’s values. The German Hammer’s local uses this idea. In addition, to support greenlets (coroutines), a new local was implemented instead of using threading.local directly.
try:
    from greenlet import getcurrent as _get_ident
except ImportError:
    from threading import get_ident as _get_ident
The official documentation for threading.get_ident reads:
threading.get_ident()
Return the “thread identifier” of the current thread. This is a nonzero integer. Its value has no direct meaning; it is intended as a magic cookie to be used e.g. to index a dictionary of thread-specific data. Thread identifiers may be recycled when a thread exits and another thread is created.
New in version 3.3.
The base of local is the ContextVar class:
class ContextVar:  # type: ignore
    """A fake ContextVar based on the previous greenlet/threading
    ident function, for old versions of gevent.
    """

    def __init__(self, _name: str) -> None:
        self.storage: t.Dict[int, t.Dict[str, t.Any]] = {}

    def get(self, default: t.Dict[str, t.Any]) -> t.Dict[str, t.Any]:
        return self.storage.get(_get_ident(), default)

    def set(self, value: t.Dict[str, t.Any]) -> None:
        self.storage[_get_ident()] = value
- ContextVar defines a two-level dictionary whose first-level key is the thread/coroutine identifier, which is what isolates data between threads/coroutines.
Local is implemented mainly through a _storage attribute that holds a ContextVar object:
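The two-level dictionary idea can be demonstrated with threads alone. This is a minimal sketch, not Werkzeug code: storage, set_value, get_value, and worker are hypothetical names, and only threading.get_ident is the real call discussed above.

```python
import threading

storage = {}  # {thread ident: {name: value}}


def set_value(name, value):
    storage.setdefault(threading.get_ident(), {})[name] = value


def get_value(name, default=None):
    return storage.get(threading.get_ident(), {}).get(name, default)


results = {}


def worker(n):
    # Each thread writes and reads under its own identifier.
    set_value("val", n)
    results[n] = get_value("val")


set_value("val", 999)
threads = [threading.Thread(target=worker, args=(n,)) for n in (1, 2)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(get_value("val"), results)
```

Note that nothing in this sketch removes a thread's entry when the thread exits. Since identifiers may be reused, stale data could leak into a later thread, which is presumably why explicit cleanup (release_local and LocalManager.cleanup, shown later) matters.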
class Local:
    __slots__ = ("_storage",)

    def __init__(self) -> None:
        object.__setattr__(self, "_storage", ContextVar("local_storage"))

    def __getattr__(self, name: str) -> t.Any:
        values = self._storage.get({})
        try:
            return values[name]
        except KeyError:
            raise AttributeError(name)

    def __setattr__(self, name: str, value: t.Any) -> None:
        values = self._storage.get({}).copy()
        values[name] = value
        self._storage.set(values)
- Note that each time a value is set, the dictionary is copied first and then modified (why? Discussion is welcome).
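One plausible reason for the copy (my reading, not something the source states) shows up when the storage is a real contextvars.ContextVar from the standard library: a copied context initially points at the same dictionary object, so mutating it in place would leak changes into the other context. The sketch below assumes that setting; set_in_place and set_with_copy are hypothetical helpers.

```python
import contextvars

var = contextvars.ContextVar("local_storage")


def set_in_place(name, value):
    values = var.get({})
    values[name] = value  # mutates the dict object shared with other contexts
    var.set(values)


def set_with_copy(name, value):
    values = var.get({}).copy()
    values[name] = value  # only the private copy changes
    var.set(values)


var.set({"user": "parent"})

# With a copy, the child context's write stays in the child context.
contextvars.copy_context().run(set_with_copy, "user", "child")
after_copy = var.get()["user"]

# Without a copy, the child context's write is visible in the parent too.
contextvars.copy_context().run(set_in_place, "user", "child")
after_in_place = var.get()["user"]

print(after_copy, after_in_place)
```

The same copy-before-write pattern appears in LocalStack.push, which copies the list before appending to it.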
A simple stack is implemented using Local:
class LocalStack:
    def __init__(self) -> None:
        self._local = Local()

    def push(self, obj: t.Any) -> t.List[t.Any]:
        """Pushes a new item to the stack"""
        rv = getattr(self._local, "stack", []).copy()
        rv.append(obj)
        self._local.stack = rv
        return rv  # type: ignore

    def pop(self) -> t.Any:
        """Removes the topmost item from the stack, will return the
        old value or `None` if the stack was already empty.
        """
        stack = getattr(self._local, "stack", None)
        if stack is None:
            return None
        elif len(stack) == 1:
            release_local(self._local)
            return stack[-1]
        else:
            return stack.pop()
LocalManager is used to manage all local data.
class LocalManager:
    """Local objects cannot manage themselves. For that you need a local
    manager. You can pass a local manager multiple locals or add them
    later by appending them to `manager.locals`. Every time the manager
    cleans up, it will clean up all the data left in the locals for this
    context
    """

    def __init__(
        self,
        locals: t.Optional[t.Iterable[t.Union[Local, LocalStack]]] = None,
        ident_func: None = None,
    ) -> None:
        if locals is None:
            self.locals = []
        elif isinstance(locals, Local):
            self.locals = [locals]
        else:
            self.locals = list(locals)
        ...

    def cleanup(self) -> None:
        """Manually clean up the data in the locals for this context. Call
        this at the end of the request or use `make_middleware()`.
        """
        for local in self.locals:
            release_local(local)
How is local used? As module-level singletons; here is an example from Flask:
# flask/globals.py
# context locals
_request_ctx_stack = LocalStack()
_app_ctx_stack = LocalStack()
_request_ctx_stack is a thread-safe global variable that can be read anywhere in the business logic, so the data does not have to be passed around.
Reference links
- Werkzeug documentation: werkzeug.palletsprojects.com/en/2.0.x/
- Python technical term pronunciation guide (PyCon China 2020 talk): zhuanlan.zhihu.com/p/320457692
- Sans I/O programming (PyCon UK talk): alexwlchan.net/2019/10/san…