introduce

One of the biggest problems with Web development using Python is performance, and C10K is a bit of a struggle to solve. There are asynchronous frameworks like Tornado, Twisted, Gevent, and others that address performance issues. These frameworks offer some performance improvements, but there are also all sorts of weird problems that are hard to solve.

In python3.6, the official asynchronous coroutine library asyncio became the official standard. A number of asynchronous frameworks have emerged that use Asyncio while preserving the convenience and improving performance.

An earlier asynchronous framework is AIoHTTP, which provides both server and client sides and encapsulates Asyncio well. However, flask is not developed in the same way as flask, the most popular microframework, which is simple, lightweight and efficient.

Microservices are the most popular development model these days because they solve complexity problems, improve development efficiency, and facilitate deployment.

It is the combination of these advantages, based on Sanic, integrated multiple popular libraries to build microservices. Sanic framework is an asynchronous coroutine framework similar to Flask, which is simple and lightweight with high performance.

This project is a micro-service framework based on Sanic.

The characteristics of

  • Use SANIC asynchronous framework, simple, lightweight, efficient.
  • Using uvloop as the core engine allows SANIC to be as concurrent as Golang in many cases.
  • Asyncpg is used as the database driver to connect to the database and execute SQL statements.
  • Use AIOHTTP as Client to access other microservices.
  • Use Peewee for ORM, but only for model design and migration.
  • Use OpenTracing for a distributed tracing system.
  • Use UnitTest for unit testing, and mock to avoid accessing other microservices.
  • Use Swagger as API standard, can automatically generate API documents.

use

The project address

  

sanic-ms:`https://github.com/songcser/sanic-ms`

Example:`https://github.com/songcser/sanic-ms/tree/master/examples`
Copy the code

Swagger API

Zipkin Server

The service side

Using SANIC asynchronous framework, has high performance, but improper use will cause blocking, for IO requests to choose asynchronous library. Be careful when adding libraries. Sanic uses uvloop asynchronous drivers, which are written in Cython based on Libuv and perform better than NodeJS.

Functional specifications

Before the start of

  

@app.listener('before_server_start')

async def before_srver_start(app, loop):

    queue = asyncio.Queue()

    app.queue = queue

    loop.create_task(consume(queue, app.config.ZIPKIN_SERVER))

    reporter = AioReporter(queue=queue)

    tracer = BasicTracer(recorder=reporter)

    tracer.register_required_propagators()

    opentracing.tracer = tracer

    app.db = await ConnectionPool(loop=loop).init(DB_CONFIG)
Copy the code
  • Example Create a DB connection pool
  • Creating a Client connection
  • Create a queue that consumes a span for log tracking
  • Create OpenTracing. Tracer for log tracing

The middleware

  

@app.middleware('request')

async def cros(request):

    if request.method == 'POST' or request.method == 'PUT':

        request['data'] = request.json

    span = before_request(request)

    request['span'] = span

@app.middleware('response')

async def cors_res(request, response):

    span = request['span'] if 'span' in request else None

    if response is None:

        return response

    result = {'code': 0}

    if not isinstance(response, HTTPResponse):

        if isinstance(response, tuple) and len(response) == 2:

            result.update({

                'data': response[0],

                'pagination': response[1]

            })

        else:

            result.update({'data': response})

        response = json(result)

        if span:

            span.set_tag('http.status_code', "200")

    if span:

        span.set_tag('component', request.app.name)

        span.finish()

    return response
Copy the code
  • Create a SPAN for log tracking
  • Encapsulate the response and unify the format

Exception handling

Handles thrown exceptions and returns a uniform format

task

Create a pair of spans in the Task consumption queue for log tracking

Asynchronous processing

With an asynchronous framework, some IO requests can be processed in parallel

Example:

  

async def async_request(datas):

    # async handler request

    results = await asyncio.gather(*[data[2] for data in datas])

    for index, obj in enumerate(results):

        data = datas[index]

        data[0][data[1]] = results[index]

@user_bp.get('/<id:int>')

@doc.summary("get user info")

@doc.description("get user info by id")

@doc.produces(Users)

async def get_users_list(request, id):

    async with request.app.db.acquire(request) as cur:

        record = await cur.fetch(

            """ SELECT * FROM users WHERE id = $1 """, id)

        datas = [

            [record, 'city_id', get_city_by_id(request, record['city_id'])]

            [record, 'role_id', get_role_by_id(request, record['role_id'])]

        ]

        await async_request(datas)

        return record
Copy the code

Getcitybyid, getroleByID is parallel processing.

Related links

sanic: https://github.com/channelcat/sanic

Model design & ORM

Peewee is a simple and small ORM. It has few (but expressive) concepts, making It easy to learn and intuitive to use.Copy the code

ORM uses Peewee for model design and migration only, while database operations use Asyncpg.

Example:

  

# models.py

class Users(Model):

    id = PrimaryKeyField()

    create_time = DateTimeField(verbose_name='create time',

                                default=datetime.datetime.utcnow)

    name = CharField(max_length=128, verbose_name="user's name")

    age = IntegerField(null=False, verbose_name="user's age")

    sex = CharField(max_length=32, verbose_name="user's sex")

    city_id = IntegerField(verbose_name='city for user', help_text=CityApi)

    role_id = IntegerField(verbose_name='role for user', help_text=RoleApi)

    class Meta:

        db_table = 'users'

# migrations.py

from sanic_ms.migrations import MigrationModel, info, db

class UserMigration(MigrationModel):

    _model = Users

    # @info(version="v1")

    # def migrate_v1(self):

    #     migrate(self.add_column('sex'))

def migrations():

    try:

        um = UserMigration()

        with db.transaction():

            um.auto_migrate()

            print("Success Migration")

    except Exception as e:

        raise e

if __name__ == '__main__':

    migrations()
Copy the code
  • Run the python migrations.py command
  • Migrate_v1 adds the field sex, and the name field is added in BaseModel first
  • The info decorator creates a migrate_record table to record migrate, version must be unique in each model, version is used to record whether or not the migrate has been executed, and author and datetime can also be recorded
  • The migrate function must start with migrate_

Related links

  

peewee:http://docs.peewee-orm.com/en/latest/
Copy the code

Database operations

  

asyncpg is the fastest driver among common Python, NodeJS and Go implementations
Copy the code

Asyncpg is used as the database driver to encapsulate database connections and perform database operations.

One reason for not using ORM for database operations is performance, ORM has a performance drain and the asyncpG high performance library is not available. The other is that individual microservices are very simple, the table structure is not very complex, simple SQL statements can be handled, there is no need to introduce ORM. Using Peewee is just model design

Example:

  

sql = "SELECT * FROM users WHERE name=$1"

name = "test"

async with request.app.db.acquire(request) as cur:

    data = await cur.fetchrow(sql, name)

async with request.app.db.transaction(request) as cur:

    data = await cur.fetchrow(sql, name)
Copy the code
  • The acquire() function is non-transactional, which can improve query efficiency if only queries are involved
  • The tansAction () function is transactional and must be used for additions, deletions, and changes
  • The request argument is passed in to get a SPAN for log tracing
  • TODO database read/write separation

Related links

  

asyncpg:https://github.com/MagicStack/asyncpg

benchmarks:https://magic.io/blog/asyncpg-1m-rows-from-postgres-to-python/
Copy the code

The client

Using client in AIOHTTP, the client is simply encapsulated for access between microservices.

Don't create a session per request. Most likely you need a session per application which everyone requests altogether. A session contains a connection pool inside, connection reusage and keep-alives (both are on by default) may speed up total performance.Copy the code

Example:

  

@app.listener('before_server_start')

async def before_srver_start(app, loop):

    app.client =  Client(loop, url='http://host:port')

async def get_role_by_id(request, id):

    cli = request.app.client.cli(request)

    async with cli.get('/cities/{}'.format(id)) as res:

        return await res.json()

@app.listener('before_server_stop')

async def before_server_stop(app, loop):

    app.client.close()
Copy the code

Multiple different clients can be created for accessing different microservices, so that each client keeps -alives

Related links

  

aiohttp:http://aiohttp.readthedocs.io/en/stable/client.html
Copy the code

Log & distributed tracking system

Use official logging, with the configuration file logging.yml and sanIC version 0.6.0 or above. JsonFormatter converts logs to JSON format for input to ES

  

Enter OpenTracing: by offering consistent, expressive, vendor-neutral APIs for popular platforms, OpenTracing makes it easy for developers to add (or switch) tracing implementations with an O(1) configuration change. OpenTracing also offers a lingua franca for OSS instrumentation and platform-specific tracing helper libraries. Please refer to the Semantic Specification.
Copy the code

A decorator logger

  

@logger(type='method', category='test', detail='detail', description="des", tracing=True, level=logging.INFO)

async def get_city_by_id(request, id):

    cli = request.app.client.cli(request)
Copy the code
  • Type: indicates the log type, such as method and route
  • Category: log category. The default value is app name
  • Detail: log details
  • Description: Indicates the log description. The default value is a function comment
  • Tracing: log tracing, default True
  • Level: log level. The default value is INFO

Distributed tracking system

  • Based on distributed tracking systems such as Dapper and Zipkin, OpenTracing established unified standards.
  • Opentracing traces each request and records each microservice that the request passes through. It is crucial to analyze performance bottlenecks of microservices in a chain manner.
  • Use the OpenTracing framework, but convert to Zipkin format for output. Since most distributed tracing systems use thrift to communicate for performance reasons, in the spirit of simple, Restful style, RPC communication is not used. Output in the form of logs, you can use fluentd, Logstash and other log collection and input to Zipkin. Zipkin supports HTTP input.
  • The generated span is first queued without blocking, and the span of the queue is consumed in task. The upper sampling frequency can be added later.
  • For DB, clients are added with tracing

Related links

  

opentracing:https://github.com/opentracing/opentracing-python

zipkin:https://github.com/openzipkin/zipkin

jaeger:https://uber.github.io/jaeger/
Copy the code

API interface

API documentation uses the Swagger standard.

Example:

  

from sanic_ms import doc

@user_bp.post('/')

@doc.summary('create user')

@doc.description('create user info')

@doc.consumes(Users)

@doc.produces({'id': int})

async def create_user(request):

    data = request['data']

    async with request.app.db.transaction(request) as cur:

        record = await cur.fetchrow(

            """ INSERT INTO users(name, age, city_id, role_id)

                VALUES($1, $2, $3, $4, $5)

                RETURNING id

            """, data['name'], data['age'], data['city_id'], data['role_id']

        )

        return {'id': record['id']}
Copy the code
  • Summary: the API profile
  • Description: Provides detailed description
  • Consumes: The body data of the request
  • Produces: Response returns data
  • Tag: API
  • The consumes or Produces parameters that are passed in are Peewee’s model. It parses the Model to generate API data, and the Help_text parameter in the field field represents the consumes or Produces objects
  • http://host:ip/openapi/spec.json for generating the json data

Related links

  

swagger:https://swagger.io/
Copy the code

The Response data

Instead of returning sanic’s response, return raw data directly. The Middleware will process the returned data and return a uniform format, which can be found in [see]

Unit testing

Unit tests use UnitTest. Mock creates a MockClient by itself, because UnitTest does not yet have an asyncio mock, and sanic’s test interface also sends request requests. Pytest can be used later.

Example:

  

from sanic_ms.tests import APITestCase

from server import app

class TestCase(APITestCase):

    _app = app

    _blueprint = 'visit'

    def setUp(self):

        super(TestCase, self).setUp()

        self._mock.get('/cities/1',

                       payload={'id': 1, 'name': 'shanghai'})

        self._mock.get('/roles/1',

                       payload={'id': 1, 'name': 'shanghai'})

    def test_create_user(self):

        data = {

            'name': 'test',

            'age': 2,

            'city_id': 1,

            'role_id': 1,

        }

        res = self.client.create_user(data=data)

        body = ujson.loads(res.text)

        self.assertEqual(res.status, 200)
Copy the code
  • _blueprint indicates the name of the blueprint
  • In the setUp function, we use _mock to register the mock information so that the real server is not accessed, and payload is the body information returned
  • Each function is called using client variables, with data as body information, params as path parameters, and other parameters as route parameters

Code coverage

  

coverage erase

coverage run --source . -m sanic_ms tests

coverage xml -o reports/coverage.xml

coverage2clover -i reports/coverage.xml -o reports/clover.xml

coverage html -d reports
Copy the code
  • Coverage2colver converts coverage. XML into clover. XML, bamboo needs clover format.

Related links

unittest:https://docs.python.org/3/library/unittest.html coverage: https://coverage.readthedocs.io/en/coverage-4.4.1/Copy the code

Exception handling

Handle thrown exceptions using app.error_handler = CustomHander()

Example:

from sanic_ms.exception import ServerError @visit_bp.delete('/users/<id:int>') async def del_user(request, id): Raise ServerError(error=' internal error ',code=10500, message=" MSG ") code: error code. The value is 0 if no exception occurs. Other values are abnormal. User-defined error message status_code: indicates the HTTP status code. The standard HTTP status code is usedCopy the code

\

Author: Song Jiyi, January 29

Making ID: songcser

\

Click to read the original article and join the CodingGo programming community. To read more, click: