Previous articles in this series:

  • Python concurrency 1: Handle concurrency using futures
  • Python concurrency 2: Use asyncio to handle concurrency

The previous article introduced the asyncio package and showed how asynchronous programming can handle high concurrency in network applications. This article focuses on two examples of programming with asyncio.

Async/await syntax

The previous article (Python concurrency 2: Use asyncio to handle concurrency) used the asyncio.coroutine decorator and yield from. Here we use async and await instead.

async/await is syntax introduced in Python 3.5. It looks like this:

async def read_data(db):
    pass

async is a keyword that explicitly declares a function to be a coroutine function. Even if the body contains no await expression, calling the function returns a coroutine object. Inside a coroutine function, we can put the await keyword before an expression to suspend the coroutine until the awaited operation completes:

async def read_data(db):
    data = await db.fetch('SELECT ... ')

The same code written with the asyncio.coroutine decorator (deprecated in Python 3.8 and removed in 3.11) looks like this:

@asyncio.coroutine
def read_data(db):
    data = yield from db.fetch('SELECT ... ')

Both versions behave the same way: asyncio.coroutine can be replaced with async def, and yield from with await.
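To see both points in action, here is a minimal sketch. FakeDB and its fetch method are hypothetical stand-ins for a real asynchronous database driver, and asyncio.run (available since Python 3.7) is assumed for driving the coroutine.

```python
import asyncio
import inspect


class FakeDB:
    """Hypothetical stand-in for an asynchronous database client."""

    async def fetch(self, query):
        await asyncio.sleep(0)  # yield control to the event loop once
        return ['row for ' + query]


async def read_data(db):
    # Execution is suspended here until db.fetch() completes
    data = await db.fetch('SELECT ...')
    return data


coro = read_data(FakeDB())                # calling the function runs nothing yet
is_coroutine = inspect.iscoroutine(coro)  # it only returns a coroutine object
rows = asyncio.run(coro)                  # the event loop drives the coroutine
print(is_coroutine, rows)
```

The call read_data(FakeDB()) only creates the coroutine object; the body runs once the event loop drives it.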

The new syntax has two benefits:

  • Generators and coroutines become easier to tell apart, because each has its own syntax.
  • It eliminates the subtle errors that occur when a yield statement is accidentally removed from a coroutine during refactoring, so that the function silently stops being a coroutine.
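The refactoring hazard in the second point can be illustrated with the standard inspect module. This is only a sketch; the three function names are hypothetical and the functions are never called.

```python
import inspect


def read_data_old(db):
    # Generator-based style: the function is a generator only because
    # its body contains "yield from"
    data = yield from db.fetch('SELECT ...')
    return data


def read_data_old_refactored(db):
    # After a careless refactoring the "yield from" is gone:
    # this is now a plain function, and nothing warns about it
    return None


async def read_data_new(db):
    # "async def" marks a coroutine function even without any await inside
    return None


print(inspect.isgeneratorfunction(read_data_old))             # True
print(inspect.isgeneratorfunction(read_data_old_refactored))  # False
print(inspect.iscoroutinefunction(read_data_new))             # True
```

With async def, the kind of function is stated in its signature rather than inferred from its body.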

Write the server using the Asyncio package

This example uses the asyncio package and the unicodedata module to find Unicode characters by their canonical names.

Let’s look at the code first:

# charfinder.py
import sys
import re
import unicodedata
import pickle
import warnings
import itertools
import functools
from collections import namedtuple

RE_WORD = re.compile(r'\w+')
RE_UNICODE_NAME = re.compile('^[A-Z0-9 -]+$')
RE_CODEPOINT = re.compile(r'U\+[0-9A-F]{4,6}')

INDEX_NAME = 'charfinder_index.pickle'
MINIMUM_SAVE_LEN = 10000
CJK_UNI_PREFIX = 'CJK UNIFIED IDEOGRAPH'
CJK_CMP_PREFIX = 'CJK COMPATIBILITY IDEOGRAPH'

sample_chars = [
    '$',  # DOLLAR SIGN
    'A',  # LATIN CAPITAL LETTER A
    'a',  # LATIN SMALL LETTER A
    '\u20a0',  # EURO-CURRENCY SIGN
    '\u20ac',  # EURO SIGN
]

CharDescription = namedtuple('CharDescription', 'code_str char name')

QueryResult = namedtuple('QueryResult', 'count items')


def tokenize(text):
    '''Return an iterable of the uppercased words found in text.'''
    for match in RE_WORD.finditer(text):
        yield match.group().upper()


def query_type(text):
    text_upper = text.upper()
    if 'U+' in text_upper:
        return 'CODEPOINT'
    elif RE_UNICODE_NAME.match(text_upper):
        return 'NAME'
    else:
        return 'CHARACTERS'


class UnicodeNameIndex:
    # Unicode name index class

    def __init__(self, chars=None):
        self.load(chars)

    def load(self, chars=None):
        # Loading Unicode name
        self.index = None
        if chars is None:
            try:
                with open(INDEX_NAME, 'rb') as fp:
                    self.index = pickle.load(fp)
            except OSError:
                pass
        if self.index is None:
            self.build_index(chars)
        if len(self.index) > MINIMUM_SAVE_LEN:
            try:
                self.save()
            except OSError as exc:
                warnings.warn('Could not save {!r}: {}'
                              .format(INDEX_NAME, exc))

    def save(self):
        with open(INDEX_NAME, 'wb') as fp:
            pickle.dump(self.index, fp)

    def build_index(self, chars=None):
        if chars is None:
            chars = (chr(i) for i in range(32, sys.maxunicode))
        index = {}
        for char in chars:
            try:
                name = unicodedata.name(char)
            except ValueError:
                continue
            if name.startswith(CJK_UNI_PREFIX):
                name = CJK_UNI_PREFIX
            elif name.startswith(CJK_CMP_PREFIX):
                name = CJK_CMP_PREFIX

            for word in tokenize(name):
                index.setdefault(word, set()).add(char)

        self.index = index

    def word_rank(self, top=None):
        # build (number of characters, word) pairs, most frequent words first
        res = [(len(self.index[key]), key) for key in self.index]
        res.sort(key=lambda item: (-item[0], item[1]))
        if top is not None:
            res = res[:top]
        return res

    def word_report(self, top=None):
        for postings, key in self.word_rank(top):
            print('{:5} {}'.format(postings, key))

    def find_chars(self, query, start=0, stop=None):
        stop = sys.maxsize if stop is None else stop
        result_sets = []
        for word in tokenize(query):
            # tokenize is a generator: for the query 'a b' it yields 'A' and 'B'
            chars = self.index.get(word)
            if chars is None:
                result_sets = []
                break
            result_sets.append(chars)

        if not result_sets:
            return QueryResult(0, ())

        result = functools.reduce(set.intersection, result_sets)
        result = sorted(result)  # must sort to support start, stop
        result_iter = itertools.islice(result, start, stop)
        return QueryResult(len(result),
                           (char for char in result_iter))

    def describe(self, char):
        code_str = 'U+{:04X}'.format(ord(char))
        name = unicodedata.name(char)
        return CharDescription(code_str, char, name)

    def find_descriptions(self, query, start=0, stop=None):
        for char in self.find_chars(query, start, stop).items:
            yield self.describe(char)

    def get_descriptions(self, chars):
        for char in chars:
            yield self.describe(char)

    def describe_str(self, char):
        return '{:7}\t{}\t{}'.format(*self.describe(char))

    def find_description_strs(self, query, start=0, stop=None):
        for char in self.find_chars(query, start, stop).items:
            yield self.describe_str(char)

    @staticmethod  # not an instance method due to concurrency
    def status(query, counter):
        if counter == 0:
            msg = 'No match'
        elif counter == 1:
            msg = '1 match'
        else:
            msg = '{} matches'.format(counter)
        return '{} for {!r}'.format(msg, query)

def main(*args):
    index = UnicodeNameIndex()
    query = ' '.join(args)
    n = 0
    for n, line in enumerate(index.find_description_strs(query), 1):
        print(line)
    print('({})'.format(index.status(query, n)))


if __name__ == '__main__':
    if len(sys.argv) > 1:
        main(*sys.argv[1:])
    else:
        print('Usage: {} word1 [word2]... '.format(sys.argv[0]))

This module reads the Unicode database bundled with Python, indexes every word that appears in the character names, and stores the resulting inverted index in a dictionary. For example, in the inverted index the entry for the key 'SUN' is a set of 10 Unicode characters whose names contain the word 'SUN'. The inverted index is saved locally in a file named charfinder_index.pickle. If a query contains more than one word, the intersection of the sets retrieved from the index is computed. For example:

    >>> main('rook')  # doctest: +NORMALIZE_WHITESPACE
    U+2656  ♖  WHITE CHESS ROOK
    U+265C  ♜  BLACK CHESS ROOK
    (2 matches for 'rook')
    >>> main('rook', 'black')  # doctest: +NORMALIZE_WHITESPACE
    U+265C  ♜  BLACK CHESS ROOK
    (1 match for 'rook black')
    >>> main('white bishop')  # doctest: +NORMALIZE_WHITESPACE
    U+2657  ♗   WHITE CHESS BISHOP
    (1 match for 'white bishop')
    >>> main("jabberwocky's vest")
    (No match for "jabberwocky's vest")
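The inverted index and the set intersection described above can be sketched in a few lines. This is a simplified illustration, not the charfinder module itself: build_index and find_chars here are standalone hypothetical helpers, and the index covers only four chess characters.

```python
import unicodedata
from functools import reduce


def build_index(chars):
    """Map each word of a character name to the set of characters using it."""
    index = {}
    for char in chars:
        for word in unicodedata.name(char).split():
            index.setdefault(word, set()).add(char)
    return index


def find_chars(index, query):
    """Return the sorted characters whose names contain every query word."""
    sets = [index.get(word, set()) for word in query.upper().split()]
    if not sets:
        return []
    return sorted(reduce(set.intersection, sets))


# WHITE/BLACK CHESS ROOK and WHITE/BLACK CHESS BISHOP
index = build_index('\u2656\u265c\u2657\u265d')
print(find_chars(index, 'rook'))        # both rooks
print(find_chars(index, 'rook black'))  # only the black rook
```

Each query word selects one set from the index, and the intersection keeps only the characters present in all of them.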

This module does not use concurrency itself; it provides the supporting functionality for the servers written with the asyncio package. Let's take a look at the tcp_charfinder.py script:

# tcp_charfinder.py
import sys
import asyncio

# used to build indexes and provide query methods
from charfinder import UnicodeNameIndex

CRLF = b'\r\n'
PROMPT = b'? > '

# instantiate UnicodeNameIndex, which uses the charfinder_index.pickle file
index = UnicodeNameIndex()

async def handle_queries(reader, writer):
    # This coroutine is passed to asyncio.start_server; it receives two
    # arguments: an asyncio.StreamReader and an asyncio.StreamWriter
    while True:  # handle the session until a control character arrives from the client
        writer.write(PROMPT)  # can't await! write is a plain function, not a coroutine; this sends the prompt
        await writer.drain()  # must await! drain flushes the writer buffer and is a coroutine
        data = await reader.readline()  # also a coroutine; it returns a bytes object
        try:
            query = data.decode().strip()
        except UnicodeDecodeError:
            # a Telnet client may send control characters that raise UnicodeDecodeError;
            # in that case we pretend a null character was sent
            query = '\x00'
        client = writer.get_extra_info('peername')  # remote address of the socket connection
        print('Received from {}: {!r}'.format(client, query))  # log the query on the server console
        if query:
            if ord(query[:1]) < 32:  # exit the loop on a control or null character
                break
            # find_description_strs returns a generator that yields strings with the
            # Unicode code point, the actual character, and the character name
            lines = list(index.find_description_strs(query))
            if lines:
                # encode each line to bytes using the default UTF-8 encoding and
                # append a carriage return and line feed to each;
                # the argument is a generator expression
                writer.writelines(line.encode() + CRLF for line in lines)
            writer.write(index.status(query, len(lines)).encode() + CRLF)  # write the status line

            await writer.drain()  # flush the output buffer
            print('Sent {} results'.format(len(lines)))  # log the response on the server console

    print('Close the client socket')  # log the end of the session on the server console
    writer.close()  # close the StreamWriter



def main(address='127.0.0.1', port=2323):  # defaults let main be called without arguments
    port = int(port)
    loop = asyncio.get_event_loop()
    # asyncio.start_server returns a coroutine object; when that coroutine completes,
    # it returns an asyncio.Server instance, which is a TCP socket server
    # (note: the loop argument was removed in Python 3.10; omit it there)
    server_coro = asyncio.start_server(handle_queries, address, port,
                                loop=loop)
    server = loop.run_until_complete(server_coro)  # drive server_coro to start the server

    host = server.sockets[0].getsockname()  # address and port of the server's first socket
    print('Serving on {}. Hit CTRL-C to stop.'.format(host))  # show address and port on the console
    try:
        loop.run_forever()  # main blocks here until CTRL-C is pressed in the server console
    except KeyboardInterrupt:  # CTRL-C pressed
        pass

    print('Server shutting down.')
    server.close()
    # server.wait_closed() returns an awaitable;
    # call loop.run_until_complete to run it
    loop.run_until_complete(server.wait_closed())
    loop.close()  # end the event loop


if __name__ == '__main__':
    main(*sys.argv[1:])

Run tcp_charfinder.py:

python tcp_charfinder.py

Open a terminal and use the telnet command to query the service. The result looks like this:

[Figure: a Telnet session querying the TCP version of the charfinder server]

The main function displays the Serving on… message almost immediately and then blocks in the call to loop.run_forever(). At that point, control passes to the event loop and stays there, returning to the handle_queries coroutine from time to time; the coroutine hands control back to the event loop whenever it has to wait for the network to send or receive data.

The handle_queries coroutine can serve multiple requests from multiple clients. Each time a new client connects to the server, a new instance of the handle_queries coroutine is started.

handle_queries performs its I/O in bytes: the data received from the network must be decoded, and the data sent must be encoded.
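The two points above (one handler instance per client, and bytes on the wire) can be tried out with a much smaller server. The following is a sketch under assumptions, not part of the charfinder code: handle_upper, ask, and demo are hypothetical names, port 0 asks the operating system for any free port, and asyncio.run requires Python 3.7 or later.

```python
import asyncio


async def handle_upper(reader, writer):
    # One instance of this coroutine is started for each client connection
    data = await reader.readline()                 # bytes from the network
    text = data.decode().strip()                   # decode bytes to str
    writer.write(text.upper().encode() + b'\r\n')  # encode str before sending
    await writer.drain()
    writer.close()


async def ask(port, text):
    # A tiny client: send one line, read one line back
    reader, writer = await asyncio.open_connection('127.0.0.1', port)
    writer.write(text.encode() + b'\r\n')
    await writer.drain()
    reply = await reader.readline()
    writer.close()
    await writer.wait_closed()
    return reply.decode().strip()


async def demo():
    server = await asyncio.start_server(handle_upper, '127.0.0.1', 0)
    port = server.sockets[0].getsockname()[1]
    try:
        # Two clients are served concurrently by the same event loop
        return await asyncio.gather(ask(port, 'rook'), ask(port, 'sun'))
    finally:
        server.close()
        await server.wait_closed()


replies = asyncio.run(demo())
print(replies)
```

asyncio.gather returns the replies in the order the clients were passed in, regardless of which connection finished first.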

The asyncio package provides a high-level streams API with a ready-made server, so we only need to implement a handler function. For details, see the documentation: docs.python.org/3/library/a…

Although the asyncio package provides a server, its functionality is fairly basic. Next, let's use Sanic, a web framework built on asyncio, to implement a simple HTTP version of the server.

Sanic was briefly introduced in an earlier article,
Python Web framework Sanci quick start

Write a Web server using the Sanic package

Sanic is a Flask-like web framework for Python 3.5+. It provides a higher-level API, including routing, request parameters, and responses, so we only need to implement the processing logic.

Here is a simple HTTP character-lookup web service implemented with Sanic:

from sanic import Sanic
from sanic import response

from charfinder import UnicodeNameIndex

app = Sanic('charfinder')  # recent Sanic releases require an application name

index = UnicodeNameIndex()

html_temp = '<p>{char}</p>'

@app.route('/charfinder')  # the first argument of app.route is the URL path
async def charfinder(request):
    # request.args holds the query parameters of the URL, e.g.
    # ?key1=value1&key2=value2 results in {'key1': ['value1'], 'key2': ['value2']}
    # so request.args.getlist('char') returns every value passed for char;
    # request.args.get('char') would return only the first one
    query = request.args.getlist('char')
    query = ' '.join(query)
    lines = list(index.find_description_strs(query))
    # Generate HTML for the result
    html = '\n'.join([html_temp.format(char=line) for line in lines])
    return response.html(html)

if __name__ == '__main__':
    app.run(host="0.0.0.0", port=8000)  # set the server address and port
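The dictionary-of-lists shape of the query parameters mentioned in the comments can be reproduced with the standard library. This is only an illustration using urllib.parse.parse_qs, not Sanic's own implementation, and the URL with two char parameters is a made-up example.

```python
from urllib.parse import parse_qs, urlsplit

# Hypothetical request URL with the char parameter given twice
url = 'http://0.0.0.0:8000/charfinder?char=chess&char=black'

args = parse_qs(urlsplit(url).query)   # every key maps to a list of values
all_values = args.get('char', [])      # comparable to getlist('char')
first_value = all_values[0] if all_values else None  # comparable to get('char')
query = ' '.join(all_values)           # the string passed on to the index

print(args, first_value, query)
```

Joining all values with a space is what lets several char parameters act as a multi-word query.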

Comparing the two pieces of code shows how simple Sanic is to use.

Run the service:

python http_charsfinder.py

Open http://0.0.0.0:8000/charfinder?char=sun in a browser. The result looks like this:

[Screenshot: the result page for the query char=sun]

Now compare the two pieces of code

In the TCP example, the server is created and scheduled to run by these two lines in the main function:

server_coro = asyncio.start_server(handle_queries, address, port,
                                loop=loop)
server = loop.run_until_complete(server_coro)

In the Sanic HTTP example, by contrast, the server is created with:

app.run(host="0.0.0.0", port=8000)

Internally, app.run() calls server_coroutine = loop.create_server(), and server_coroutine is then driven by loop.run_until_complete().

So in both cases the server is started by driving a coroutine with loop.run_until_complete(). Sanic simply wraps this in its run method to make it easier to use.

This illustrates a basic fact: a coroutine does nothing until it is driven. To drive a coroutine decorated with asyncio.coroutine, either use yield from or pass it to one of the asyncio functions that accept a coroutine or future as an argument, such as run_until_complete.
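The same fact holds for async def coroutines, where await plays the role of yield from. Here is a minimal sketch; inner, outer, and log are hypothetical names, and a fresh event loop is created explicitly so the example does not depend on a default loop.

```python
import asyncio

log = []


async def inner():
    log.append('inner ran')
    return 42


async def outer():
    # Inside a coroutine, "await" drives another coroutine
    value = await inner()
    return value + 1


coro = outer()            # creates a coroutine object; no code has run
ran_before = list(log)    # still empty at this point

loop = asyncio.new_event_loop()
try:
    result = loop.run_until_complete(coro)  # the event loop drives outer()
finally:
    loop.close()

print(ran_before, log, result)
```

Nothing is appended to log until run_until_complete starts driving the outer coroutine, which in turn drives inner through await.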

Now, if you search for CJK, you get about 70,000 results in an HTML file of roughly 3 MB, which takes about 2 seconds. That is unacceptable for a request to a production service. We can use paging instead: fetch only 200 results at a time, and send the next batch via Ajax or WebSockets when the user wants to see more.
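One way to cut the results into pages of 200 is sketched below with itertools.islice. The get_page helper and the fake result list are assumptions made for illustration; they are not part of charfinder or Sanic.

```python
import itertools


def get_page(results, page_number, page_size=200):
    """Return one page of an iterable of results (pages count from 0)."""
    start = page_number * page_size
    return list(itertools.islice(results, start, start + page_size))


# Stand-in data: 450 fake results instead of a real index query
fake_results = ['result {}'.format(i) for i in range(450)]

first = get_page(fake_results, 0)
last = get_page(fake_results, 2)
print(len(first), first[0], len(last), last[0])
```

In charfinder.py the same window could be passed as the start and stop arguments that find_description_strs already accepts, so only the requested slice is described and sent.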

In this article we implemented a TCP server with the asyncio package and an HTTP server with Sanic (Sanic is built on asyncio and by default uses uvloop in place of the standard asyncio event loop), both for looking up Unicode characters by name. The concurrency aspects of the servers were not covered and can be discussed later.

This article is also a set of reading notes on the asyncio chapter of Fluent Python. The next installment, the third in the Python concurrency series, will be Concurrency with Threads.

Reference links

  • Python 3.5 will support the Async/Await asynchronous programming: http://www.infoq.com/cn/news/2015/05/python-async-await
  • Python Web framework Sanci quick start
  • Python concurrency 2: Use asyncio to handle concurrency

Finally, thanks to my girlfriend for her support.
