Previous articles in this series
- Python concurrency 1: Handle concurrency using futures
- Python concurrency 2: Use asyncio to handle concurrency
The previous article introduced the asyncio package and showed how asynchronous programming can manage high concurrency in network applications. This article focuses on two examples of programming with the asyncio package.
Async/await syntax
The previous article, Python concurrency 2: Use asyncio to handle concurrency, used the asyncio.coroutine decorator and yield from. Here we use async and await instead.
async/await is new syntax introduced in Python 3.5. It looks like this:
async def read_data(db):
    pass
async is a keyword that explicitly declares a function as a coroutine function. Even if the body contains no await expression, calling the function returns a coroutine object. Inside a coroutine function, we can put the await keyword before an expression to suspend the coroutine until the awaited operation completes:
async def read_data(db):
    data = await db.fetch('SELECT ... ')
The same code written with the asyncio.coroutine decorator:
@asyncio.coroutine
def read_data(db):
    data = yield from db.fetch('SELECT ... ')
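Both versions behave the same way: asyncio.coroutine can be replaced with async, and yield from can be replaced with await. (The asyncio.coroutine decorator was deprecated in Python 3.8 and removed in Python 3.11, so on current versions only the async/await form runs.)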
What are the benefits of the new syntax?
- Because the syntax is different, generators and coroutines are easier to tell apart as concepts.
- It eliminates the ambiguous errors that appear when a yield statement is accidentally removed from a coroutine during refactoring, which silently turns the coroutine into an ordinary function.
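The behaviour described above can be checked with a minimal sketch. The function names no_await and with_await are made up for this illustration, and asyncio.run is assumed to be available (Python 3.7 or later):

```python
import asyncio
import inspect


async def no_await():
    # No await expression in the body, yet this is still a coroutine function.
    return 42


async def with_await():
    # await suspends this coroutine until no_await() has completed.
    value = await no_await()
    return value + 1


coro = no_await()                     # calling the function does not run its body
is_coro = inspect.iscoroutine(coro)   # the call returned a coroutine object
coro.close()                          # discard it to avoid a "never awaited" warning

result = asyncio.run(with_await())    # drive the coroutine to completion
print(is_coro, result)
```

The function declared with async def returns a coroutine object even though its body never awaits anything, and await inside with_await only yields a value once the awaited coroutine has finished.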
Write a server using the asyncio package
This example uses the asyncio package and the unicodedata module to find Unicode characters by their canonical names.
Let’s look at the code first:
# charfinder.py
import sys
import re
import unicodedata
import pickle
import warnings
import itertools
import functools
from collections import namedtuple

RE_WORD = re.compile(r'\w+')
RE_UNICODE_NAME = re.compile('^[A-Z0-9 -]+$')
RE_CODEPOINT = re.compile(r'U\+[0-9A-F]{4,6}')

INDEX_NAME = 'charfinder_index.pickle'
MINIMUM_SAVE_LEN = 10000
CJK_UNI_PREFIX = 'CJK UNIFIED IDEOGRAPH'
CJK_CMP_PREFIX = 'CJK COMPATIBILITY IDEOGRAPH'

sample_chars = [
    '$',  # DOLLAR SIGN
    'A',  # LATIN CAPITAL LETTER A
    'a',  # LATIN SMALL LETTER A
    '\u20a0',  # EURO-CURRENCY SIGN
    '\u20ac',  # EURO SIGN
]

CharDescription = namedtuple('CharDescription', 'code_str char name')
QueryResult = namedtuple('QueryResult', 'count items')
def tokenize(text):
    """Return an iterable of uppercased words."""
    for match in RE_WORD.finditer(text):
        yield match.group().upper()


def query_type(text):
    text_upper = text.upper()
    if 'U+' in text_upper:
        return 'CODEPOINT'
    elif RE_UNICODE_NAME.match(text_upper):
        return 'NAME'
    else:
        return 'CHARACTERS'
class UnicodeNameIndex:
    # Index of Unicode character names

    def __init__(self, chars=None):
        self.load(chars)

    def load(self, chars=None):
        # Load the index from the pickle file, or build it if that fails
        self.index = None
        if chars is None:
            try:
                with open(INDEX_NAME, 'rb') as fp:
                    self.index = pickle.load(fp)
            except OSError:
                pass
        if self.index is None:
            self.build_index(chars)
        if len(self.index) > MINIMUM_SAVE_LEN:
            try:
                self.save()
            except OSError as exc:
                warnings.warn('Could not save {!r}: {}'
                              .format(INDEX_NAME, exc))

    def save(self):
        with open(INDEX_NAME, 'wb') as fp:
            pickle.dump(self.index, fp)

    def build_index(self, chars=None):
        if chars is None:
            chars = (chr(i) for i in range(32, sys.maxunicode))
        index = {}
        for char in chars:
            try:
                name = unicodedata.name(char)
            except ValueError:
                continue
            if name.startswith(CJK_UNI_PREFIX):
                name = CJK_UNI_PREFIX
            elif name.startswith(CJK_CMP_PREFIX):
                name = CJK_CMP_PREFIX
            for word in tokenize(name):
                index.setdefault(word, set()).add(char)
        self.index = index

    def word_rank(self, top=None):
        # Build (number of characters, word) pairs for every word in the index
        res = [(len(self.index[key]), key) for key in self.index]
        res.sort(key=lambda item: (-item[0], item[1]))
        if top is not None:
            res = res[:top]
        return res

    def word_report(self, top=None):
        for postings, key in self.word_rank(top):
            print('{:5} {}'.format(postings, key))

    def find_chars(self, query, start=0, stop=None):
        stop = sys.maxsize if stop is None else stop
        result_sets = []
        for word in tokenize(query):
            # tokenize is a generator: the query 'a b' yields 'A', then 'B'
            chars = self.index.get(word)
            if chars is None:  # shortcut: no such word in the index
                result_sets = []
                break
            result_sets.append(chars)
        if not result_sets:
            return QueryResult(0, ())
        result = functools.reduce(set.intersection, result_sets)
        result = sorted(result)  # must sort to support start, stop
        result_iter = itertools.islice(result, start, stop)
        return QueryResult(len(result),
                           (char for char in result_iter))

    def describe(self, char):
        code_str = 'U+{:04X}'.format(ord(char))
        name = unicodedata.name(char)
        return CharDescription(code_str, char, name)

    def find_descriptions(self, query, start=0, stop=None):
        for char in self.find_chars(query, start, stop).items:
            yield self.describe(char)

    def get_descriptions(self, chars):
        for char in chars:
            yield self.describe(char)

    def describe_str(self, char):
        return '{:7}\t{}\t{}'.format(*self.describe(char))

    def find_description_strs(self, query, start=0, stop=None):
        for char in self.find_chars(query, start, stop).items:
            yield self.describe_str(char)

    @staticmethod  # not an instance method due to concurrency
    def status(query, counter):
        if counter == 0:
            msg = 'No match'
        elif counter == 1:
            msg = '1 match'
        else:
            msg = '{} matches'.format(counter)
        return '{} for {!r}'.format(msg, query)
def main(*args):
    index = UnicodeNameIndex()
    query = ' '.join(args)
    n = 0
    for n, line in enumerate(index.find_description_strs(query), 1):
        print(line)
    print('({})'.format(index.status(query, n)))


if __name__ == '__main__':
    if len(sys.argv) > 1:
        main(*sys.argv[1:])
    else:
        print('Usage: {} word1 [word2]...'.format(sys.argv[0]))
This module reads Python's built-in Unicode database and builds an inverted index: every word that appears in a character name becomes a key in a dictionary, and its value is the set of characters whose names contain that word. For example, the entry for the 'SUN' key is a set of 10 Unicode characters whose names contain the word 'SUN'. The inverted index is stored locally in a file named charfinder_index.pickle. If you query for more than one word, the result is the intersection of the sets retrieved from the index. The following is an example:
>>> main('rook') # doctest: +NORMALIZE_WHITESPACE
U+2656 ♖ WHITE CHESS ROOK
U+265C ♜ BLACK CHESS ROOK
(2 matches for 'rook')
>>> main('rook', 'black') # doctest: +NORMALIZE_WHITESPACE
U+265C ♜ BLACK CHESS ROOK
(1 match for 'rook black')
>>> main('white bishop') # doctest: +NORMALIZE_WHITESPACE
U+2657 ♗ WHITE CHESS BISHOP
(1 match for 'white bishop')
>>> main("jabberwocky's vest")
(No match for "jabberwocky's vest")
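The inverted index and the set intersection described above can be sketched in isolation. This is a simplified illustration, not the charfinder module itself: build_index and find are stand-in helpers, names are split on whitespace instead of the \w+ pattern, and only the twelve chess symbols are indexed.

```python
import functools
import unicodedata


def build_index(chars):
    """Map each word of a character name to the set of characters using it."""
    index = {}
    for char in chars:
        name = unicodedata.name(char, '')   # '' for unnamed characters
        for word in name.split():
            index.setdefault(word, set()).add(char)
    return index


def find(index, query):
    """Return the sorted characters whose names contain every query word."""
    sets = [index.get(word.upper(), set()) for word in query.split()]
    if not sets:
        return []
    return sorted(functools.reduce(set.intersection, sets))


# Demo data: U+2654 (WHITE CHESS KING) through U+265F (BLACK CHESS PAWN).
chess = [chr(code) for code in range(0x2654, 0x2660)]
index = build_index(chess)

print(find(index, 'rook'))
print(find(index, 'black rook'))
print(find(index, 'rook dragon'))
```

A single word returns the whole set stored under that key, several words narrow the result by intersecting the sets, and a word that is missing from the index makes the intersection empty.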
The charfinder module itself does not use concurrency; it provides the lookup support for the servers written with the asyncio package. Let's take a look at the tcp_charfinder.py script:
# tcp_charfinder.py
import sys
import asyncio

# used to build the index and provide the query methods
from charfinder import UnicodeNameIndex

CRLF = b'\r\n'
PROMPT = b'?> '

# instantiate UnicodeNameIndex, which uses the charfinder_index.pickle file
index = UnicodeNameIndex()
async def handle_queries(reader, writer):
    # This coroutine is passed to asyncio.start_server; it receives two
    # arguments, an asyncio.StreamReader and an asyncio.StreamWriter
    while True:  # handle the session until the client sends a control character
        writer.write(PROMPT)  # can't await! write is a plain function, not a coroutine; sends the ?> prompt
        await writer.drain()  # must await! drain is a coroutine that flushes the writer buffer
        data = await reader.readline()  # also a coroutine; returns a bytes object
        try:
            query = data.decode().strip()
        except UnicodeDecodeError:
            # A Telnet client may send control characters that raise
            # UnicodeDecodeError; treat that as if a null character was sent
            query = '\x00'
        client = writer.get_extra_info('peername')  # remote address of the socket connection
        print('Received from {}: {!r}'.format(client, query))  # log the query on the server console
        if query:
            if ord(query[:1]) < 32:  # exit the loop on a control or null character
                break
            # find_description_strs returns a generator that yields strings with
            # the Unicode code point, the actual character, and the character name
            lines = list(index.find_description_strs(query))
            if lines:
                # Encode each line to bytes with the default UTF-8 encoding and
                # append CRLF; the argument to writelines is a generator expression
                writer.writelines(line.encode() + CRLF for line in lines)
            writer.write(index.status(query, len(lines)).encode() + CRLF)  # status line
            await writer.drain()  # flush the output buffer
            print('Sent {} results'.format(len(lines)))  # log the response on the server console
    print('Close the client socket')  # log the end of the session on the console
    writer.close()  # close the StreamWriter
def main(address='127.0.0.1', port=2323):  # defaults allow calling main without arguments
    port = int(port)
    loop = asyncio.get_event_loop()
    # Calling asyncio.start_server returns a coroutine object; when it completes
    # it returns an asyncio.Server instance, which is a TCP socket server.
    # (The loop argument was removed in Python 3.10; omit it on newer versions.)
    server_coro = asyncio.start_server(handle_queries, address, port,
                                       loop=loop)
    server = loop.run_until_complete(server_coro)  # drive server_coro to start the server
    host = server.sockets[0].getsockname()  # address and port of the server's first socket
    print('Serving on {}. Hit CTRL-C to stop.'.format(host))  # show address and port on the console
    try:
        loop.run_forever()  # main blocks here until CTRL-C is pressed in the server console
    except KeyboardInterrupt:  # CTRL+C pressed
        pass
    print('Server shutting down.')
    server.close()
    # server.wait_closed returns a future;
    # loop.run_until_complete runs it to completion
    loop.run_until_complete(server.wait_closed())
    loop.close()  # end the event loop


if __name__ == '__main__':
    main(*sys.argv[1:])
Run tcp_charfinder.py:
python tcp_charfinder.py
Open another terminal and use the telnet command to query the service. The running result is as follows:
The main function displays the Serving on... message almost immediately and then blocks in the call to loop.run_forever(). At that point control moves into the event loop and stays there, occasionally returning to the handle_queries coroutine, which hands control back to the event loop whenever it needs to wait for the network to send or receive data.
The handle_queries coroutine can serve multiple requests from multiple clients: whenever a new client connects to the server, a new instance of the handle_queries coroutine is started.
handle_queries uses bytes for I/O. Data received from the network is decoded, and data to be sent is encoded.
The asyncio package provides a high-level streams API with a ready-made server, so we only need to implement a handler function. For details, see the documentation: docs.python.org/3/library/a…
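To show how little the streams API asks of us, here is a minimal sketch of a handler plus a client in one script. It is not the charfinder server: handle_echo and demo are illustrative names, the handler simply uppercases one line, port 0 is used so the operating system picks a free port, and asyncio.run and StreamWriter.wait_closed assume Python 3.7 or later.

```python
import asyncio


async def handle_echo(reader, writer):
    # One instance of this coroutine runs for each connected client.
    data = await reader.readline()               # bytes come in from the network
    text = data.decode().strip()                 # decode what we receive
    writer.write(text.upper().encode() + b'\r\n')  # encode what we send
    await writer.drain()
    writer.close()


async def demo():
    # Port 0 lets the OS choose any free port (a choice made for this demo).
    server = await asyncio.start_server(handle_echo, '127.0.0.1', 0)
    host, port = server.sockets[0].getsockname()[:2]

    # Act as the client: connect, send one line, read one line back.
    reader, writer = await asyncio.open_connection(host, port)
    writer.write(b'sun\r\n')
    await writer.drain()
    reply = await reader.readline()
    writer.close()
    await writer.wait_closed()

    server.close()
    await server.wait_closed()
    return reply


reply = asyncio.run(demo())
print(reply)
```

The server side consists of the handler alone; accepting connections and creating the reader and writer objects is done by asyncio.start_server.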
Although the asyncio package provides a server, its functionality is fairly basic. Next, let's use Sanic, a web framework based on asyncio, to implement a simple HTTP version of the server.
A brief introduction to Sanic was given in an earlier article, Python Web framework Sanic quick start.
Write a Web server using the Sanic package
Sanic is a Flask-like web framework for Python 3.5+. It provides a higher-level API for routing, request parameters, responses, and so on, so we only need to implement the processing logic.
Here is a simple character query HTTP web service implemented with Sanic:
from sanic import Sanic
from sanic import response
from charfinder import UnicodeNameIndex

# Recent Sanic releases require an application name; older ones accept it too
app = Sanic('charfinder')
index = UnicodeNameIndex()
html_temp = '<p>{char}</p>'


@app.route('/charfinder')  # register the handler for the /charfinder path
async def charfinder(request):
    # request.args holds the query parameters of the URL, for example
    # ?key1=value1&key2=value2 results in {'key1': ['value1'], 'key2': ['value2']}
    # request.args.getlist('char') returns every value passed for char;
    # request.args.get('char') would return only the first one
    query = request.args.getlist('char')
    query = ' '.join(query)
    lines = list(index.find_description_strs(query))
    # Generate the HTML for the result
    html = '\n'.join([html_temp.format(char=line) for line in lines])
    return response.html(html)


if __name__ == '__main__':
    app.run(host="0.0.0.0", port=8000)  # set the server address and port number
Comparing the two pieces of code shows that the Sanic version is much simpler.
Running the service:
python http_charsfinder.py
Enter the address http://0.0.0.0:8000/charfinder?char=sun in the browser. The result looks like this:
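The difference between reading all values of a repeated query parameter and reading only the first one can be illustrated with the standard library. This sketch uses urllib.parse.parse_qs as a stand-in; it is not Sanic's own parsing code, and the URL with two char parameters is an example made up for the demo.

```python
from urllib.parse import parse_qs, urlsplit

# Example URL with the char parameter given twice.
url = 'http://0.0.0.0:8000/charfinder?char=sun&char=black'

# parse_qs maps each parameter name to the list of all its values.
args = parse_qs(urlsplit(url).query)

all_values = args.get('char', [])   # comparable to request.args.getlist('char')
first_value = all_values[0]         # comparable to request.args.get('char')
query = ' '.join(all_values)        # the string handed to the index lookup

print(args, first_value, query)
```

Joining every value with a space is what lets a request such as ?char=sun&char=black search for characters whose names contain both words.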
Now compare how the two servers are started
In the TCP example, the server is created and scheduled to run by these two lines in the main function:
server_coro = asyncio.start_server(handle_queries, address, port,
                                   loop=loop)
server = loop.run_until_complete(server_coro)
In the Sanic HTTP example, a single call creates and starts the server:
app.run(host="0.0.0.0", port=8000)
Internally, app.run() calls server_coroutine = loop.create_server(), and server_coroutine is then driven by loop.run_until_complete().
So in both cases the server is started by driving a coroutine with loop.run_until_complete. Sanic wraps this in its run method to make it easier to use.
Here is a basic fact: a coroutine does nothing until it is driven. A coroutine decorated with asyncio.coroutine can be driven either with yield from, or by passing it to a function in the asyncio package that accepts a coroutine or future as an argument, such as run_until_complete. A coroutine defined with async def is driven in the same way, using await in place of yield from.
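A minimal sketch of that fact, using the async/await form. The names step, outer and log are made up for this illustration; the list only records which coroutine bodies have actually run.

```python
import asyncio

log = []


async def step(name):
    log.append(name)      # visible evidence that the body really ran
    return name


async def outer():
    # Awaiting drives the inner coroutine from inside another coroutine.
    return await step('inner')


coro = step('direct')     # creates a coroutine object; nothing runs yet
ran_before = list(log)    # snapshot taken before anything is driven

loop = asyncio.new_event_loop()
try:
    loop.run_until_complete(coro)      # driven by an asyncio function
    loop.run_until_complete(outer())   # step('inner') is driven by await
finally:
    loop.close()

print(ran_before, log)
```

Creating the coroutine object leaves the log empty; entries appear only after run_until_complete or await drives the coroutine.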
Now, if you search for CJK, you get about 70,000 results in an HTML page of roughly 3 MB, which takes about 2 seconds to produce. That is not acceptable for a request to a production service. We can use paging instead: fetch only 200 results at a time, and send the next batch with Ajax or WebSockets when the user wants to see more.
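The paging idea can be sketched with itertools.islice, the same tool find_chars uses for its start and stop arguments. The helper page_of and the fake result list are assumptions made for this illustration; in the real service the start and stop values could be passed straight to index.find_description_strs(query, start, stop).

```python
import itertools


def page_of(results, page_number, page_size=200):
    """Return one page of an already sorted, iterable result set."""
    start = page_number * page_size
    return list(itertools.islice(results, start, start + page_size))


# Stand-in data for the demo: 450 fake results instead of real characters.
fake_results = ['result {}'.format(i) for i in range(450)]

first_page = page_of(fake_results, 0)
last_page = page_of(fake_results, 2)
print(len(first_page), len(last_page), last_page[0])
```

Each request then carries a page number, and the server sends at most page_size results per response instead of the full result set.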
In this article we implemented a TCP server with the asyncio package and an HTTP server with Sanic (Sanic is based on asyncio and by default uses uvloop in place of the asyncio event loop), both for searching Unicode characters by name. Server concurrency itself was not covered; it can be discussed later.
This is also a reading note for the asyncio chapter of Fluent Python. The next installment, the third in the Python concurrency series, will be Concurrency with Threads.
References
- Python 3.5 will support Async/Await asynchronous programming: http://www.infoq.com/cn/news/2015/05/python-async-await
- Python Web framework Sanic quick start
- Python concurrency 2: Use asyncio to handle concurrency
Finally, thanks to my girlfriend for her support.