Some microservices instructions

preface

Before switching back to Python, I had actually experimented with microservices, but at the time I also looked at go-Micro-v2, which I had only dabbled in at that time

In fact, micro service this thing does not need to micro service and micro service! Unless really business needs, in fact, there is no need to split, after all, join you just a person to dry again! Ha ha that you introduce this micro service words, estimate is to give oneself tired to lie down!

Here I am mainly for the sake of learning and learning to do the example, the production environment, in fact, may involve a lot of problems, I am mainly here to summarize the prototype of some micro services.

About Microservices

PS: I will probably not use Nameko here. This frame has been fixed for 19 years and seems not to have been updated. And not universal across languages!

Referring to the previous study notes, the overall architecture of microservices is as follows:

This source: https://github.com/stack-labs/learning-videos/tree/master/docs/micro-api

If we use fastAPI to do this, it will just act as our aggregation service layer inside.

In fact, there are several problems related to microservices:

  • How to split services
  • How do services communicate with each other
  • How to Do Service Registration and Discovery (Consul, EDCT)
  • How to do service configuration center (Nacos, Apollo, Spring Cloud Config)
  • API gateway does SLB layer processing (Goku, Kong, Apisix)
  • Link Tracing related to microservices
  • Log aggregation issues in microservices

So a complete microservice diagram would look something like this:

This source: https://github.com/stack-labs/learning-videos/tree/master/docs/micro-api

Fastapi Microservices Prelude:

1: Brief description of Protobuf:

  • 1: It is a clean and efficient data storage format, is a data exchange format
  • 2: high compression
  • 3: XML and JSON serialization and deserialization compression transmission is relatively high
  • 4: Fast transmission
  • 5: Support cross-language, cross-platform, a language, platform independent, extensible serialized structured data
  • 6: It is just a protocol that can exist without a concrete framework
  • 7: Interface Definition Language (IDL) to describe the structure of service interfaces and payload messages

Procedure for using Protobuf:

Write proto files -> build with Protoc -> Add Protobuf runtime -> integrate in projectCopy the code

Process of updating protobuf:

Modify the proto file -> recompile with Protoc -> modify the integration place in the projectCopy the code

2: Brief introduction to GRPC

About the RPC

Definition:

  • Remote Procedure Call
  • A method by which one server invokes a service on another server looks like a local invocation

Common RPC Frameworks

  • GRPC (Google)
  • Thrift (Facebook – Now rename buy it
  • Dubbo (Ali’s JAVA family)

Definition:

Grpc is based on the RPC framework of protobuf data protocol. It uses Protobuf for data transfer.

GRPC features:

  • 1: based on c++ high performance, and protocol based on protobuf serialization and deserialization (unlike Python XML and json rpa framework)
  • 2: Universal, across common mainstream languages (Python clients can call clients written by Go)
  • 3: High-performance, general open source RPC framework
  • 4: Easier to create distributed applications and services

GrPC-Python

GRPC. Making. IO/GRPC/python…

3. Simple use experience of GRPC framework in Python:

Lower versions of IDE:

3.1 Pychram Install the Protobuf plug-in

Protobuf file format for easy identification:

Step 1- Download the plug-in ZIP file:

Download address: https://plugins.jetbrains.com/plugin/8277-protobuf-support  https://plugins.jetbrains.com/plugin/16228-protobuf-support/versions/stable/144595Copy the code

Step 2- Install the plug-in locally

Step 3- Restart Pychram

After the restart can be normal to identify the proto file!

For the 2021 version, search directly:

After installation, it can automatically identify:

3.2 Installing the GRPC Tool in Python:

Specific tool kit:

2:1: grapio grpcio - toolsCopy the code

Installation:

pip install grpcio -i https://pypi.tuna.tsinghua.edu.cn/simple
pip install grpcio-tools -i https://pypi.tuna.tsinghua.edu.cn/simple
Copy the code

3.3 GrPC-Python Experience Examples on the official website:

Example steps are as follows:

1: Step 1 – Write a protobuf file (version uses 3)

syntax = "proto3"; Service Greeter {// Define a specific service containing method for PAC RPC SayHello (HelloRequest) returns (HelloReply) {} RPC SayHelloAgain (HelloRequest) returns (HelloReply) {} } message HelloRequest { string name = 1; } message HelloReply {string message = 1; // Field information of our request to move the amount of packets}Copy the code

Here is:

2: Step 2 – Compile the proto file

PS: it is recommended to pay attention to the current we need to enter the proto file and then execute the command:

python -m grpc_tools.protoc --python_out=. --grpc_python_out=. -I. hello.proto
Copy the code

Some notes on the above orders:

  • Grpc_tools. protoc: depends on the grpcio-tools we installed above

  • –python_out=. : indicates the path to the protobuf file generated by our output compilation,. The dot indicates the current directory (where the generated files are placed)

  • — grpc_PYTHon_out =. : Indicates the path to the GRPC file generated by our output compilation,. The dot indicates the current directory

  • -I. : Input from that directory to find our xx.proto file. Click to find from the current directory

PS: Only PY generates two files, other languages are just one file like GO

Result of the preceding command:

PS: need to pay attention to the point, generated file introduced package path problems!

Description of the generated file:

  • Hello_pb2.py: encapsulates the number of equal parameters of the request and response defined in our protobuf, using the inside of the request body parameters and response body parameters to instantiate the operation, etc.

  • Hello_pb2_grpc. py: this file is used to generate the GRPC service. When you need to generate the server or client, you need to rely on this file.

3: Step 3 – Write the server side of the GRPC (multithreaded mode to handle concurrency) :

  • 1: based on the interface defined in our hello_PB2_grPC implementation

Define a service name that inherits our hello_PB2_grPC, help us generate the service name, and implement all the methods

2: registers the service with the RPC service

3: Do some startup configuration processing for our RPC service

Ps: There are multiple ways to start RPC services:

Method 1:

def serve(): Instantiate an RPC service Using a thread pool way to start our service server = GRPC. Server (futures. ThreadPoolExecutor (max_workers = 10)) # add our service hello_pb2_grpc.add_GreeterServicer_to_server(Greeter(), Add_insecure_port ('[::]:50051') # start server.start(), # -- cycle - specifies the start of the main process after the target is started. Try: while True: time.sleep(60 * 60 * 24) # one day in seconds except KeyboardInterrupt: server.stop(0)Copy the code

Method 2:

def serve(): Instantiate an RPC service Using a thread pool way to start our service server = GRPC. Server (futures. ThreadPoolExecutor (max_workers = 10)) # add our service hello_pb2_grpc.add_GreeterServicer_to_server(Greeter(), Add_insecure_port ('[::]:50051') # Service to be started server.start() # wait_for_termination - The main process ends directly after the target is started! A loop is required to run the process server.wait_for_termination()Copy the code

PS: wait_for_termination blocks the current thread until the server stops.

This is an experimental API.

Wait does not consume computing resources during blocking, it blocks until one of two conditions is met:

  1. Stop or terminate the server;
  2. If there is no timeout, a timeout occurs. No.

Server – The complete server instance code is:

From Concurrent import futures import time import GRPC import hello_pb2 import hello_pb2_grPC # implement as defined in the proto file GreeterServicer class Greeter(hello_PB2_grpc.greeterServicer): Def SayHello(self, request, context): Return hello_pb2.HelloReply(message='hello {MSG}'. Format (MSG =request.name)) def SayHelloAgain(self, request, context): Return hello_pb2.HelloReply(message='hello {MSG}'. Format (MSG =request.name)) def serve(): Instantiate an RPC service Using a thread pool way to start our service server = GRPC. Server (futures. ThreadPoolExecutor (max_workers = 10)) # add our service hello_pb2_grpc.add_GreeterServicer_to_server(Greeter(), Add_insecure_port ('[::]:50051') # Service to be started server.start() # wait_for_termination - The main process ends directly after the target is started! A loop is required to run the process server.wait_for_termination() if __name__ == '__main__': serve()Copy the code

4: Step 4 – Write the client-grPC client to call our server:

#! /usr/bin/evn python # coding=utf-8 import grpc import hello_pb2 import hello_pb2_grpc def run(): With grpc.insecure_channel('localhost:50051') as channel: # Hello_pb2_grpc.greeterStub (channel) # Hello_pb2_grpc.greeterStub (channel) # Hello_pb2 SayHello(hello_pb2.HelloRequest(name=' xiaobell ')) print("SayHello" "+ response.message) response = stub.sayHelloagain (hello_pb2.HelloRequest(name=' welcome next time ')) Print ("SayHelloAgain return: "+ response.message) if __name__ == '__main__': run()Copy the code

5: Step 5 – Service startup:

  • Starting the server
  • Restart the client

The final output from the client is:

SayHelloAgain: Hello welcome to visit next timeCopy the code

Summary steps:

  • 1: Write a.proto file defining the service (which defines the message body and service interface)
  • 2: compile the. Proto file to generate specific service information
  • 3: write client and server

4: GRPC 4 communication modes (Python implementation)

Different business demand scenarios, different business models, different communication modes:

  • Simple pattern: The request responds to one invocation (that is, the client requests once and the server responds once)

    PS: Simple mode can also be called unary RPC mode

  • Server-side streaming mode: the client requests one request, and the server responds with multiple streams (the client sends an object and the server returns a Stream).

  • Client flow mode: After the client sends multiple streams of requests, the server responds once (the client reports data).

  • Two-way flow mode: multiple streaming requests from the client and multiple streaming replies from the server (similar to WebSocket (long connection), the client can request messages from the server and the server can also request messages from the client).

Since the simple mode above a demonstration, so I will not demonstrate here, the following example I also from the official website of the example, I am mainly separated for practical experience.

In general, the downstream mode is used in the following scenarios:

  • Large packet
  • Real-time scenario data transmission

4.1 Example of server flow Mode

Definition:

  • Server-side streaming mode: the client requests one request, and the server responds with multiple streams (the client sends an object and the server returns a Stream).

1: Step 1: Write the ServerStrem.proto file to define the service (which defines the message body and service interface)

syntax = "proto3"; RPC SayHello(HelloRequest) returns (stream HelloReply) {}} Message HelloRequest {string name = 1; } message HelloReply {string message = 1; // Field information of our request to move the amount of packets}Copy the code

2: Step 2 – Compile the ServerStrem.proto file

PS: it is recommended to pay attention to the current we need to enter the proto file and then execute the command (currently my example adjustment, adjust to Demo2 package) :

python -m grpc_tools.protoc --python_out=. --grpc_python_out=. -I. serverstrem.proto
Copy the code

3: Step 3 – Write serverStrem_grPC_server.py server for GRPC:

from concurrent import futures import grpc from demo2 import serverstrem_pb2_grpc, Serverstrem_pb2 import threading Import time Import Random # Implement the GreeterServicer interface class defined in proto Greeter(ServerStreM_PB2_grpc.greeterServicer): # Implement an RPC call def SayHello(self, request, context) defined in proto: # check if the client is still connected while context.is_active(): Client_name = request.name # Use the generator to send a message to the client Serverstrem_pb2. HelloReply (message = f "{client_name}! I'm your uncle! {random.sample('zyxwvutsrqponmlkjihgfedcba',5)}") def serve(): Instantiate an RPC service Using a thread pool way to start our service server = GRPC. Server (futures. ThreadPoolExecutor (max_workers = 2)) # add our service serverstrem_pb2_grpc.add_GreeterServicer_to_server(Greeter(), Add_insecure_port ('[::]:50051') # Service to be started server.start() # wait_for_termination - The main process ends directly after the target is started! A loop is required to run the process server.wait_for_termination() if __name__ == '__main__': serve()Copy the code

The above stream service implementation uses a generator to return our data stream:

while context.is_active(): Client_name = request.name # Use the generator to send a message to the client Serverstrem_pb2. HelloReply (message = f "{client_name}! I'm your uncle! {random.sample('zyxwvutsrqponmlkjihgfedcba',5)}")Copy the code

PS: To demonstrate the thread pool problem, we set up only 2 threads, which means that in this server stream mode, we can only handle 2 client connections!! More than 2, there is no way!! Need to wait!

4: Step 4 – Write serverStrem_grPC_client.py GRPC client to call our server:

The client has a stub (a stub is just a client in some languages) that provides the same methods as the server

import grpc from demo2 import serverstrem_pb2, serverstrem_pb2_grpc def run(): With grpc.insecure_channel('localhost:50051') as channel: GreeterStub(channel) # The body of the argument that needs to be passed when generating the function that requests our service, which is placed in hello_pb2 - the body of the request is: SayHello(serverStrem_pb2. HelloRequest(name='小风学'))) for item in response: Print ("SayHello returns: : "+ item.message) if __name__ == '__main__': run()Copy the code

Note: above we receive data from the server using a loop to receive! :

SayHello(serverStrem_pb2. HelloRequest(name=' hello ')) for item in response: print("SayHello" " + item.message)Copy the code

When multiple clients are started, our client eventually outputs the following information:

If there are more than three, the output cannot be processed and one client must be shut down.

Conclusion:

1: the server stream actually uses some kind of loop iteration to send our data iteration only! 3. If the server is required to actively close the connection, use context.cancel().Copy the code

Here’s an example of a server actively shutting down:

GreeterServicer class Greeter(ServerStreM_PB2_grpc.greeterServicer) Def SayHello(self, request, context): Idnex = 1 while context.is_active(): Idnex =idnex +1 print(" ",idnex) client_name = request. Name # Use the generator to send a message back to the client time.sleep(1) # If you need to shut down the server, you can use: Context.cancel () yield ServerSTREm_pb2. HelloReply(message=f"{client_name}! I'm your uncle! {random.sample('zyxwvutsrqponmlkjihgfedcba',5)}")Copy the code

When our server actively closes the connection: the client throws an exception:

4.2 Client Flow Mode Example

Definition:

  • After the client sends multiple streaming requests, the server responds once (the client reports data).

1: Step 1: Write the ServerStrem.proto file to define the service (which defines the message body and service interface)

syntax = "proto3"; Service Greeter {// Implement RPC SayHello(HelloRequest) returns (stream HelloReply) {} // Add client flow mode RPC SayRequestStream(stream HelloRequest) returns (HelloReply) {} } message HelloRequest { string name = 1; } message HelloReply {string message = 1; // Field information of our request to move the amount of packets}Copy the code

2: Step 2 – Update and compile the ServerStrem. Proto file

PS: it is recommended to pay attention to the current we need to enter the proto file and then execute the command (currently my example adjustment, adjust to Demo2 package) :

python -m grpc_tools.protoc --python_out=. --grpc_python_out=. -I. serverstrem.proto
Copy the code

3: Step 3 – Update serverStrem_grPC_server.py GRPC server:

Just add the SayRequestStream method that needs to be implemented.

According to our definition of this pattern is:

  • After the client sends multiple streaming requests, the server responds once (the client reports data). Therefore, the server needs to design relevant conditions, end the client’s submission, and then return data, which needs to be dealt with in combination with its own real business scenarios.

Complete code:

from concurrent import futures import grpc from demo2 import serverstrem_pb2_grpc, Serverstrem_pb2 import threading Import time Import Random # Implement the GreeterServicer interface class defined in proto Greeter(ServerStreM_PB2_grpc.greeterServicer): # Implement an RPC call def SayHello(self, request, context) defined in proto: Idnex = 1 while context.is_active(): Idnex =idnex +1 print(" ",idnex) client_name = request. Name # Use the generator to send a message back to the client time.sleep(1) # If you need to shut down the server, you can use: Context.cancel () yield ServerSTREm_pb2. HelloReply(message=f"{client_name}! I'm your uncle! {the random sample (' zyxwvutsrqponmlkjihgfedcba '5)} ") # new processing on the client side flow pattern function, Def SayRequestStream(self, request_iterator, context) def SayRequestStream(self, request_iterator, context): For curr_request in request_iterator: Print (curr_request.name) if curr_request.name==" see you later ": Return serverstrem_pb2. HelloReply (message = f "{curr_request. Name =}! I'll see you soon!" Def serve() def serve(): Instantiate an RPC service Using a thread pool way to start our service server = GRPC. Server (futures. ThreadPoolExecutor (max_workers = 2)) # add our service serverstrem_pb2_grpc.add_GreeterServicer_to_server(Greeter(), Add_insecure_port ('[::]:50051') # Service to be started server.start() # wait_for_termination - The main process ends directly after the target is started! A loop is required to run the process server.wait_for_termination() if __name__ == '__main__': serve()Copy the code

Mainly new service function handling:

Logic description:

  • 1: the server always receives messages from the client, and when I receive them, the end return tells the client to terminate the submission!
  • 2: And XXXX ah! I’ll see you soon! Is returned to the client.

4: Step 4 – Write serverStrem_grPC_client.py GRPC client to call our server:

At this point it is our client that streams the data to our server, so we also design an iterative way to submit our own New Year data:

#! /usr/bin/evn python # coding=utf-8 import grpc from demo2 import serverstrem_pb2, Serverstrem_pb2_grpc import time def run(): # with grpc.insecure_channel('localhost:50051') as channel: GreeterStub(channel) # The body of the argument that needs to be passed when generating the function that requests our service, which is placed in hello_pb2 - the body of the request is: SayHello(serverStrem_pb2. HelloRequest(name='小 小 classmate ')) # for item in response: # print("SayHello "return: :" + item.message) def send_action(): for send_name in [' I am your father ', 'I am your father ',' I am your father ', 'I will see you later']: Print ("send_name:",send_name) time.sleep(1) yield ServerSTREM_pb2. HelloRequest(name=send_name) # response = stub.SayRequestStream(send_action()) print(response.message) if __name__ == '__main__': run()Copy the code

5: Step 5 – Service startup:

  • Starting the server
  • Restart the client

The final output from the client is:

Send_name: I'm your grandpa send_name: I'm your little grandpa Send_name: I'm your brother-in-law Send_name: See you soon! I'll see you soon!Copy the code

Server output:

I'm your uncle I'm your brother-in-law I'll see you laterCopy the code

4.2 Bidirectional flow mode example

Definition:

  • Multiple streaming requests from the client and multiple streaming replies from the server (similar to WebSocket, where the client can request messages from the server and the server can request messages from the client)

1: Step 1: Add interface – Write the ServerStrem. proto file to define the service (which defines the message body and service interface)

syntax = "proto3"; Service Greeter {// Implement RPC SayHello(HelloRequest) returns (stream HelloReply) {} // Add client flow mode RPC SayRequestStream(stream HelloRequest) returns (HelloReply) {} } message HelloRequest { string name = 1; } message HelloReply {string message = 1; // Field information of our request to move the amount of packets}Copy the code

2: Step 2 – Update and compile the ServerStrem. Proto file

PS: it is recommended to pay attention to the current we need to enter the proto file and then execute the command (currently my example adjustment, adjust to Demo2 package) :

python -m grpc_tools.protoc --python_out=. --grpc_python_out=. -I. serverstrem.proto
Copy the code

3: Step 3 – Update serverStrem_grPC_server.py GRPC server:

Complete code:

from concurrent import futures import grpc from demo2 import serverstrem_pb2_grpc, Serverstrem_pb2 import threading Import time Import Random # Implement the GreeterServicer interface class defined in proto Greeter(ServerStreM_PB2_grpc.greeterServicer): # Implement an RPC call def SayHello(self, request, context) defined in proto: Idnex = 1 while context.is_active(): Idnex = idnex + 1 print(" ", idnex) client_name = request. Name # Use the generator to send a message back to the client time.sleep(1) # If you need to shut down the server, you can use: Context.cancel () yield ServerSTREm_pb2. HelloReply(message=f"{client_name}! I'm your uncle! {the random sample (' zyxwvutsrqponmlkjihgfedcba '5)} ") # new processing on the client side flow pattern function, Def SayRequestStream(self, request_iterator, context) def SayRequestStream(self, request_iterator, context): For curr_request in request_iterator: Print (curr_request.name) if curr_request.name == "see you later ": Return serverstrem_pb2. HelloReply (message = f "{curr_request. Name}! I'll see you soon!" Def SayRequestAndRespStream(self, request_iterator, context) def SayRequestAndRespStream(self, request_iterator, context): For curr_request in request_iterator: Print (curr_request.name) print(curr_request.name) print(curr_request.name Serverstrem_pb2. HelloReply (message = f "{curr_request. Name}! I'm replying from the server! Please accept!!" ) def serve(): Instantiate an RPC service Using a thread pool way to start our service server = GRPC. Server (futures. ThreadPoolExecutor (max_workers = 3)) # add our service serverstrem_pb2_grpc.add_GreeterServicer_to_server(Greeter(), Add_insecure_port ('[::]:50051') # Service to be started server.start() # wait_for_termination - The main process ends directly after the target is started! A loop is required to run the process server.wait_for_termination() if __name__ == '__main__': serve()Copy the code

Mainly new service function handling:

Logic description:

  • 1: The server always receives messages from the client

4: Step 4 – Write serverStrem_grPC_client.py GRPC client to call our server:

At this point it is our client that streams the data to our server, so we also design an iterative way to submit our own New Year data:

#! /usr/bin/evn python # coding=utf-8 import grpc from demo2 import serverstrem_pb2, Serverstrem_pb2_grpc import time def run(): # with grpc.insecure_channel('localhost:50051') as channel: GreeterStub(channel) # The body of the argument that needs to be passed when generating the function that requests our service, which is placed in hello_pb2 - the body of the request is: SayHello(serverStrem_pb2. HelloRequest(name='小 小 classmate ')) # for item in response: # print (" SayHello function call returns results: : "+ item. The message) # # = = = = = = = = = = = = client flow pattern # def send_action () : # for send_name in [' I'm your uncle ', 'I'm your uncle ',' I'll see you soon ']: Sleep (1) # yield ServerSTREM_pb2. HelloRequest(name=send_name) # print("send_name:",send_name) # time.sleep(1) # yield ServerSTREm_pb2. HelloRequest(name=send_name) # Response = stub.sayrequestStream (send_action()) # print(response.message) # ============ def send_action(): For send_name in [' I'm your uncle ', 'I'm your uncle ',' I'll see you soon ']: Sleep (1) yield ServerSTREM_pb2. HelloRequest(name=send_name) # response_iterator = stub.SayRequestAndRespStream(send_action()) for response in response_iterator: print(response.message) if __name__ == '__main__': run()Copy the code

5: Step 5 – Service startup:

  • Starting the server
  • Restart the client

The final output from the client is:

Send_name: I am your uncle send_name: I am your little uncle I am your uncle! I'm replying from the server! Please accept!! Send_name: I am your brother-in-law and I am your little grandpa! I'm replying from the server! Please accept!! Send_name: See you soon I'm your brother-in-law! I'm replying from the server! Please accept!! See you soon! I'm replying from the server! Please accept!!Copy the code

Server output:

I'm your uncle I'm your brother-in-law I'll see you laterCopy the code

5: safety certification

5.1 Supported Authorization Mechanisms

Here are the notes from the official documentation:

  • SSL/TLS

    • GRPc integrates SSL/TLS and improves the SSL/TLS used for server authorization, encrypting all data exchanged between client and server. Provides an optional mechanism for clients to provide credentials to obtain common authorization.
  • The 2.0

    • RPC provides a generic mechanism (described later) to attach metadata-based credentials to requests and replies. When accessing the Google API through gRPC, additional support for obtaining access tokens is provided for certain authorization processes, as shown in the following code example.warning: Google OAuth2 credentials should only be used to connect to Google services. Issuing a Google counterpart OAuth2 token to a non-Google service can result in the token being stolen and used as a fake client to access Google services.
  • API

    To reduce complexity and minimize confusion, gRPC works with a unified credential object. Certificates can be of the following two types:

    • Channel credentialsIs attached tochannelFor example, SSL credentials.
    • Call the credentialsIs appended to the call (or C++)Client context). Vouchers can be usedCombined channel certificateTo combine. aCombined channel certificateYou can combine aChannel credentialsAnd aCall the credentialsAssociation creates a new oneChannel credentials. The result is that each call on the channel sends a compositeCall the credentialsFor authorization data. For example, oneChannel credentialsCan be made up ofSsl certificatesAnd aAccess token credentialsGenerated. The result is that each call on the channel sends the corresponding access token.Call the credentialsYou can useCombination of credentialsTo assemble. After assemblyCall the credentialsApply to aClient contextWill trigger to send bothCall the credentialsAuthorization data of.

5.1 about the SSL

SSL is used for more secure data transmission. The following functions are available:

  1. Authenticate data (authenticate users and services)
  2. Encrypted transmission of data
  3. Maintain data integrity and ensure that data is not changed during transmission

5.2 Implementation with TSL (Python Implementation)

The sample code source: www.cnblogs.com/areful/p/10…

Example of starting GRPC services using SSL:

  • Server:
# -*- coding: utf-8 -*- # Author: areful # # pip install grpcio # pip install protobuf # pip install grpcio-tools # ... # Copyright 2015, Google Inc. # All rights reserved. """The Python implementation of the GRPC helloworld.Greeter server.""" import time from concurrent import futures from gj.grpc.helloworld.helloworld_pb2 import * from gj.grpc.helloworld.helloworld_pb2_grpc import * _ONE_DAY_IN_SECONDS = 60 * 60 * 24 class Greeter(GreeterServicer): def SayHello(self, request, context): return HelloReply(message='Hello, %s! ' % request.name) def serve(): server = grpc.server(futures.ThreadPoolExecutor(max_workers=10)) add_GreeterServicer_to_server(Greeter(), server) with open('server.pem', 'rb') as f: private_key = f.read() with open('server.crt', 'rb') as f: certificate_chain = f.read() with open('ca.crt', 'rb') as f: root_certificates = f.read() server_credentials = grpc.ssl_server_credentials(((private_key, certificate_chain),), root_certificates,True) server.add_secure_port('localhost:50051', server_credentials) server.start() try: while True: time.sleep(_ONE_DAY_IN_SECONDS) except KeyboardInterrupt: server.stop(0) if __name__ == '__main__': serve()Copy the code
  • Client:
# -*- coding: utf-8 -*-
# Author: areful
#
# pip install grpcio
# pip install protobuf
# pip install grpcio-tools
#
# Copyright 2015, Google Inc.
# All rights reserved.
# ...
 
"""The Python implementation of the GRPC helloworld.Greeter client."""
 
from __future__ import print_function
 
from gj.grpc.helloworld.helloworld_pb2 import *
from gj.grpc.helloworld.helloworld_pb2_grpc import *
 
 
def run():
    with open('client.pem', 'rb') as f:
        private_key = f.read()
    with open('client.crt', 'rb') as f:
        certificate_chain = f.read()
    with open('ca.crt', 'rb') as f:
        root_certificates = f.read()
    creds = grpc.ssl_channel_credentials(root_certificates, private_key, certificate_chain)
    channel = grpc.secure_channel('localhost:50051', creds)
    stub = GreeterStub(channel)
    response = stub.SayHello(HelloRequest(name='world'))
    print("Greeter client received: " + response.message)
 
 
if __name__ == '__main__':
    run()
Copy the code

6: Content related to GRPC context objects

6.1 Abstract Base Class:

```
class RpcContext(six.with_metaclass(abc.ABCMeta)):
    """Provides RPC-related information and action."""

    @abc.abstractmethod
    def is_active(self):
        """Describes whether the RPC is active or has terminated.

        Returns:
          bool:
          True if RPC is active, False otherwise.
        """
        raise NotImplementedError()

    @abc.abstractmethod
    def time_remaining(self):
        """Describes the length of allowed time remaining for the RPC.

        Returns:
          A nonnegative float indicating the length of allowed time in seconds
          remaining for the RPC to complete before it is considered to have
          timed out, or None if no deadline was specified for the RPC.
        """
        raise NotImplementedError()

    @abc.abstractmethod
    def cancel(self):
        """Cancels the RPC.

        Idempotent and has no effect if the RPC has already terminated.
        """
        raise NotImplementedError()

    @abc.abstractmethod
    def add_callback(self, callback):
        """Registers a callback to be called on RPC termination.

        Args:
          callback: A no-parameter callable to be called on RPC termination.

        Returns:
          True if the callback was added and will be called later; False if
            the callback was not added and will not be called (because the RPC
            already terminated or some other reason).
        """
        raise NotImplementedError()
```
Copy the code

6.2 Implementation Class:

As you can see from the above example, almost every SRV service has a built-in context object in its function, let’s take a look at its source: the first class to implement RpcContext

class ServicerContext(six.with_metaclass(abc.ABCMeta, RpcContext)):
Copy the code

The subclass:

class _Context(grpc.ServicerContext): def __init__(self, rpc_event, state, request_deserializer): self._rpc_event = rpc_event self._state = state self._request_deserializer = request_deserializer def is_active(self): with self._state.condition: return _is_rpc_state_active(self._state) def time_remaining(self): return max(self._rpc_event.call_details.deadline - time.time(), 0) def cancel(self): self._rpc_event.call.cancel() def add_callback(self, callback): with self._state.condition: if self._state.callbacks is None: return False else: self._state.callbacks.append(callback) return True def disable_next_message_compression(self): with self._state.condition: self._state.disable_next_compression = True def invocation_metadata(self): return self._rpc_event.invocation_metadata def peer(self): return _common.decode(self._rpc_event.call.peer()) def peer_identities(self): return cygrpc.peer_identities(self._rpc_event.call) def peer_identity_key(self): id_key = cygrpc.peer_identity_key(self._rpc_event.call) return id_key if id_key is None else _common.decode(id_key) def auth_context(self): return { _common.decode(key): value for key, value in six.iteritems( cygrpc.auth_context(self._rpc_event.call)) } def set_compression(self, compression): with self._state.condition: self._state.compression_algorithm = compression def send_initial_metadata(self, initial_metadata): with self._state.condition: if self._state.client is _CANCELLED: _raise_rpc_error(self._state) else: if self._state.initial_metadata_allowed: operation = _get_initial_metadata_operation( self._state, initial_metadata) self._rpc_event.call.start_server_batch( (operation,), _send_initial_metadata(self._state)) self._state.initial_metadata_allowed = False self._state.due.add(_SEND_INITIAL_METADATA_TOKEN) else: raise ValueError('Initial metadata no longer allowed! ') def set_trailing_metadata(self, trailing_metadata): with self._state.condition: self._state.trailing_metadata = trailing_metadata def trailing_metadata(self): return self._state.trailing_metadata def abort(self, code, details): # treat OK like other invalid arguments: fail the RPC if code == grpc.StatusCode.OK: _LOGGER.error( 'abort() called with StatusCode.OK; returning UNKNOWN') code = grpc.StatusCode.UNKNOWN details = '' with self._state.condition: self._state.code = code self._state.details = _common.encode(details) self._state.aborted = True raise Exception() def abort_with_status(self, status): self._state.trailing_metadata = status.trailing_metadata self.abort(status.code, status.details) def set_code(self, code): with self._state.condition: self._state.code = code def code(self): return self._state.code def set_details(self, details): with self._state.condition: self._state.details = _common.encode(details) def details(self): return self._state.details def _finalize_state(self): passCopy the code

6.3 Shared Context and Server Context methods

There are probably some methods in the implementation class that we need to know about:

  • Is_active () : Checks whether the client is still alive
  • Time_remaining: indicates the remaining timeout period. The value can be obtained if the timeout period is specified for the request
  • Cancel Cancels the current request. When the server calls this function, the client will directly throw the following exception:
grpc._channel._InactiveRpcError: <_InactiveRpcError of RPC that terminated with: status = StatusCode.CANCELLED details = "Cancelled" debug_error_string = "{" created", "@ 1636954326.072000000", "description" : "the Error received from peersCopy the code
  • Add_callback () : Adds a callback function to call when RPC terminates (not if the link breaks)

  • Disable_next_message_compression: Disables compression of the next response message, which overrides any compression configuration set set during server creation or when invoked

  • Invocation_metadata: get the current custom metadata information, actually get the request header information

  • Set_compression: Sets the compression related algorithm for data transmission at the time

  • Send_initial_metadata: sends metadata information

  • Set_trailing_metadata () : Sets the current transmission of custom response header metadata information

  • Trailing_metadata: Obtaining metadata

  • Abort (self, code, details): Breaks the connection

  • abort_with_status

  • Set_code Sets the status code thrown when an exception occurs

  • Code Gets the status code for the thrown exception

  • Set_details and details sets and gets exception information

6.4 Client Context Methods

There are examples of calls to methods in the context below, such as getting exception information!

  • Code () : a method, not a property, that returns the server’s response code
  • Details () : Returns a description of the server’s response
  • Initial_metadata () Obtain the metadata information sent by the server.
  • Trailing_metadata () accesses trace metadata sent by the server.

Both methods will block until the value is available

6.5 Abnormal Status Code

Analysis of relevant status codes:

@enum.unique
class StatusCode(enum.Enum):
    """Mirrors grpc_status_code in the gRPC Core.

    Attributes:
      OK: Not an error; returned on success
      CANCELLED: The operation was cancelled (typically by the caller).
      UNKNOWN: Unknown error.
      INVALID_ARGUMENT: Client specified an invalid argument.
      DEADLINE_EXCEEDED: Deadline expired before operation could complete.
      NOT_FOUND: Some requested entity (e.g., file or directory) was not found.
      ALREADY_EXISTS: Some entity that we attempted to create (e.g., file or directory)
        already exists.
      PERMISSION_DENIED: The caller does not have permission to execute the specified
        operation.
      UNAUTHENTICATED: The request does not have valid authentication credentials for the
        operation.
      RESOURCE_EXHAUSTED: Some resource has been exhausted, perhaps a per-user quota, or
        perhaps the entire file system is out of space.
      FAILED_PRECONDITION: Operation was rejected because the system is not in a state
        required for the operation's execution.
      ABORTED: The operation was aborted, typically due to a concurrency issue
        like sequencer check failures, transaction aborts, etc.
      UNIMPLEMENTED: Operation is not implemented or not supported/enabled in this service.
      INTERNAL: Internal errors.  Means some invariants expected by underlying
        system has been broken.
      UNAVAILABLE: The service is currently unavailable.
      DATA_LOSS: Unrecoverable data loss or corruption.
    """
    OK = (_cygrpc.StatusCode.ok, 'ok')
    CANCELLED = (_cygrpc.StatusCode.cancelled, 'cancelled')
    UNKNOWN = (_cygrpc.StatusCode.unknown, 'unknown')
    INVALID_ARGUMENT = (_cygrpc.StatusCode.invalid_argument, 'invalid argument')
    DEADLINE_EXCEEDED = (_cygrpc.StatusCode.deadline_exceeded,
                         'deadline exceeded')
    NOT_FOUND = (_cygrpc.StatusCode.not_found, 'not found')
    ALREADY_EXISTS = (_cygrpc.StatusCode.already_exists, 'already exists')
    PERMISSION_DENIED = (_cygrpc.StatusCode.permission_denied,
                         'permission denied')
    RESOURCE_EXHAUSTED = (_cygrpc.StatusCode.resource_exhausted,
                          'resource exhausted')
    FAILED_PRECONDITION = (_cygrpc.StatusCode.failed_precondition,
                           'failed precondition')
    ABORTED = (_cygrpc.StatusCode.aborted, 'aborted')
    OUT_OF_RANGE = (_cygrpc.StatusCode.out_of_range, 'out of range')
    UNIMPLEMENTED = (_cygrpc.StatusCode.unimplemented, 'unimplemented')
    INTERNAL = (_cygrpc.StatusCode.internal, 'internal')
    UNAVAILABLE = (_cygrpc.StatusCode.unavailable, 'unavailable')
    DATA_LOSS = (_cygrpc.StatusCode.data_loss, 'data loss')
    UNAUTHENTICATED = (_cygrpc.StatusCode.unauthenticated, 'unauthenticated')
Copy the code

The abnormality represented by the relevant status code is described as follows:

  • OK is always this by default, when the call returns a success
  • CANCELLED means the error state that the link has been broken
  • UNKNOWN indicates an UNKNOWN error. When an UNKNOWN error occurs on the server, such as web500, this error occurs when about is used as a normal parameter.
  • INVALID_ARGUMENT: failed to verify the parameters submitted by the client
  • DEADLINE_EXCEEDED An error that indicates a request timeout
  • NOT_FOUND Indicates that the requested function or resource could not be found
  • ALREADY_EXISTS indicates an error when a request processing resource already exists, similar to a unique index to a database
  • PERMISSION_DENIED Permission error, no access permission
  • UNAUTHENTICATED indicates that authentication fails and invalid information is authenticated
  • RESOURCE_EXHAUSTED: Indicates that the requested resource is exhausted and no resource is available
  • FAILED_PRECONDITION means the request processing is rejected
  • ABORTED Indicates that a request is interrupted, terminates the request, or an operation is ABORTED, usually due to concurrency problems, such as sequential check failures, transaction aborts, etc.
  • UNIMPLEMENTED says that such requests are not supported or cannot be processed at this time
  • INTERNAL means unexpected exception error which is similar to UNKNOWN
  • UNAVAILABLE The UNAVAILABLE service is UNAVAILABLE
  • DATA_LOSS Indicates data loss

6.6 Exception Handling Examples

Server throw exception:

GreeterServicer class Greeter(hello_PB2_grpc.greeterServicer) Def SayHello(self, request, context): Return hello_pb2.HelloReply(message='hello {MSG}'. Format (MSG =request.name)) def SayHelloAgain(self, request, context): # returns is our definition of the object of the response body # set the abnormal status code context. Set_code (GRPC. StatusCode. PERMISSION_DENIED) context. Set_details (" you don't have the access permissions ") raise context return hello_pb2.HelloReply(message='hello {msg}'.format(msg=request.name))Copy the code

Client receiving exception:

Client processing exception:

#! /usr/bin/evn python # coding=utf-8 import grpc import hello_pb2 import hello_pb2_grpc def run(): With grpc.insecure_channel('localhost:50051') as channel: # Hello_pb2_grpc.greeterStub (channel) # Hello_pb2_grpc.greeterStub (channel) # Hello_pb2 Hello_pb2. Try HelloRequest objects: Response = stub.sayHelloagain (hello_pb2.HelloRequest(name=' welcome next time ')) print("SayHelloAgain " + response.message) except grpc._channel._InactiveRpcError as e: print(e.code()) print(e.details()) if __name__ == '__main__': run()Copy the code

Final output:

6.6 initial_metadata and trailing_metadata

  • Initial metadata Initial_metadata
    • The initial metadata initial_metadata can actually understand the client request header information
  • Trailing metadata trailing_metadata
    • Trailing_metadata follows metadata in response to header information

6.6.1 Setting the response Header on the Server

In general, if we have a special need to return the response header information, we can take a similar approach to meet the requirements:

For example, the server returns a response message:

def set_trailing_metadata(self, trailing_metadata):
    with self._state.condition:
        self._state.trailing_metadata = trailing_metadata
Copy the code

Source code analysis: passed parameter format:

Oh, for god’s sake, it’s a Tuple.

  • I need to preach a tuple objects, (MetadataKey MetadataValue)
  • * ARgs indicates that my I can receive multiple values

Hence the following server example:

However, when the client request is started, the client is stuck and has not responded! The output of the server is as follows:

Validate_metadata: {" created ":" @ 1636960201.178000000 ", "description" : "Illegal header value", "filCopy the code

Your element check failed!! When I change Chinese to other, I wipe, !!!!! Have can pass!!

It seems that there is a problem with our Chinese support. It seems that our header file is not set in Chinese!! So hehe! I blame myself!

6.6.2 The Client Obtains response Header Information

Reference: cn.voidcc.com/question/p-…

So there is the following treatment:

Def run(): # with grpc.insecure_channel('localhost:50051') as channel: # Hello_pb2_grpc.greeterStub (channel) # Hello_pb2_grpc.greeterStub (channel) # Hello_pb2 Hello_pb2. Try HelloRequest objects: Response,callbask = stub.sayHelloagain. with_call(hello_pb2.HelloRequest(name=' welcome next time ')) print("SayHelloAgain "+ response.message) print("SayHelloAgain " ,callbask.trailing_metadata()) except grpc._channel._InactiveRpcError as e: print(e.code()) print(e.details())Copy the code

6.6.3 Obtaining the Server Obtain the client request header information

Server:

#! /usr/bin/evn python # -*- coding: Utf-8 -*- from concurrent import futures import time import GRPC import hello_pb2 import hello_pb2_grPC # implement as defined in proto file GreeterServicer class Greeter(hello_PB2_grpc.greeterServicer): Def SayHello(self, request, context): Return hello_pb2.HelloReply(message='hello {MSG}'. Format (MSG =request.name)) def SayHelloAgain(self, request, context): # returns is our definition of the object of the response body. # # # set the abnormal status code context set_code (GRPC. StatusCode. PERMISSION_DENIED) # context. Set_details (" you don't have the access permissions)" Print (" Received request header metadata ",context.invocation_metadata()) print(" received request header metadata" context.set_trailing_metadata((('name','223232'),('sex','23232'))) return hello_pb2.HelloReply(message='hello {msg}'.format(msg=request.name)) def serve(): Instantiate an RPC service Using a thread pool way to start our service server = GRPC. Server (futures. ThreadPoolExecutor (max_workers = 10)) # add our service hello_pb2_grpc.add_GreeterServicer_to_server(Greeter(), Add_insecure_port ('[::]:50051') # Service to be started server.start() # wait_for_termination - The main process ends directly after the target is started! A loop is required to run the process server.wait_for_termination() if __name__ == '__main__': serve()Copy the code

The client submits a custom request header:

#! /usr/bin/evn python # -*- coding: utf-8 -*- import grpc import hello_pb2 import hello_pb2_grpc def run(): With grpc.insecure_channel('localhost:50051') as channel: # Hello_pb2_grpc.greeterStub (channel) # Hello_pb2_grpc.greeterStub (channel) # Hello_pb2 Hello_pb2. Try HelloRequest objects: reest_header = ( ('mesasge', '1010'), ('error', 'No Error') ) response, Callbask = stub.sayHelloagain. with_call(request=hello_pb2.HelloRequest(name=' welcome next time ')), Metadata =reest_header,) print("SayHelloAgain ") "+ response.message) print("SayHelloAgain ", callbask.trailing_metadata()) except grpc._channel._InactiveRpcError as e: print(e.code()) print(e.details()) if __name__ == '__main__': run()Copy the code

The output is:

Client:

SayHelloAgain returns the result of the SayHelloAgain call: (_Metadatum(key='name', value='223232'), _Metadatum(key='sex', value='23232'))Copy the code

Server:

The received request header metadata information (_Metadatum(key=' mesASGE ', value='1010'), _Metadatum(key='error', value='No error'), _Metadatum (key = 'the user-agent, value =' GRPC - python / 1.41.1 GRPC - c / 19.0.0 (Windows; chttp2)'))Copy the code

6.7 Modifying the Data Transfer Size and Decompressing Data

Generally, our service sets the upper limit of the data it can receive and the upper limit of the data it can send, so we can limit the size of the data that can be transmitted by both our server and client.

In addition, if the transmission data is too large, our communication will also carry out relevant decompression of the data to speed up the efficient transmission of data. For the server side we can set global compression and local compression.

  • Server-side data compression and data limitation:
#! /usr/bin/evn python # -*- coding: Utf-8 -*- from concurrent import futures import time import GRPC import hello_pb2 import hello_pb2_grPC # implement as defined in proto file GreeterServicer class Greeter(hello_PB2_grpc.greeterServicer): Def SayHello(self, request, context): Return hello_pb2.HelloReply(message='hello {MSG}'. Format (MSG =request.name)) def SayHelloAgain(self, request, context): # returns is our definition of the object of the response body. # # # set the abnormal status code context set_code (GRPC. StatusCode. PERMISSION_DENIED) # context. Set_details (" you don't have the access permissions)" Print (" Received request header metadata ", Set_trailing_metadata ((('name', '223232'), ('sex', '23232'))) # # compression mechanism of three kinds of processing NoCompression = _compression. # NoCompression Deflate = _compression. # Gzip Deflate = Context. Set_compression (grpc.Com compression.Gzip) return hello_pb2.HelloReply(message='hello {msg}'.format(msg=request.name)) def serve(): Instantiate an RPC service Options = [('grpc.max_send_message_length', 60 * 1024 * 1024), Max_receive_message_length ', 60 * 1024 * 1024), # limit to receive the size of the largest data] # # compression mechanism of three kinds of processing NoCompression = _compression. # NoCompression Deflate = _compression. # Gzip Deflate = Gzip # configure the service to enable global data transmission compression = grpc.Com compression.Gzip server = grpc.server(futures.ThreadPoolExecutor(max_workers=10), options=options, Hello_pb2_grpc.add_greeterservicer_to_server (Greeter()), Add_insecure_port ('[::]:50051') # Service to be started server.start() # wait_for_termination - The main process ends directly after the target is started! A loop is required to run the process server.wait_for_termination() if __name__ == '__main__': serve()Copy the code
  • Client-side data compression and data limitation:
#! /usr/bin/evn python # -*- coding: utf-8 -*- import grpc import hello_pb2 import hello_pb2_grpc def run(): Options = [('grpc.max_send_message_length', 60 * 1024 * 1024), Max_receive_message_length ', 60 * 1024 * 1024), # limit to receive the size of the largest data] # # compression mechanism of three kinds of processing NoCompression = _compression. # NoCompression Deflate = _compression. # Gzip Deflate = Gzip # specifies the compression mechanism used by the service to enable global data transmission.Gzip = grpc.Com compression Description Configure some client parameters with grpc.insecure_channel(target='localhost:50051', options=options, compression = compression ) as channel: # Hello_pb2_grpc.greeterStub (channel) # Hello_pb2_grpc.greeterStub (channel) # Hello_pb2 Hello_pb2. Try HelloRequest objects: reest_header = ( ('mesasge', '1010'), ('error', 'No Error') ) response, Callbask = stub.sayHelloagain. with_call(request=hello_pb2.HelloRequest(name=' welcome next time ')), Metadata =reest_header,) print("SayHelloAgain ") "+ response.message) print("SayHelloAgain ", callbask.trailing_metadata()) except grpc._channel._InactiveRpcError as e: print(e.code()) print(e.details()) if __name__ == '__main__': run()Copy the code

6.8 Client Retry Mechanism

The retry mechanism refers to the retry mechanism when the client request service does not respond. However, the retry cycle is not infinite, and a certain degree is required.

The following reference some information from: blog.csdn.net/chinesehuaz…

Some configuration parameters are described as follows:

  • Grpc. max_send_message_length Specifies the maximum amount of data to be sent
  • Grpc. max_receive_message_length Specifies the maximum amount of data to be received
  • Grpc. enable_retries Transparent retry mechanism. The default value is 1
  • Grpc. service_config – Configures the retry mechanism policy
{" attempts ":{"retryPolicy":{" attempts ": 4, "initialBackoff": "0.1s", "maxBackoff": "1s", "backoffMutiplier": 2, "retryableStatusCodes": [ "UNAVAILABLE" ] } }Copy the code

PS: retryableStatusCodes Sets retry error codes, which trigger retries only when UNAVAILABLE.

You can specify the number of retries and so on. For specific parameter meanings, please refer to the official website for a brief introduction:

-maxAttempts must be an integer greater than 1, for values greater than 5 are treated as 5-initialBackoff and maxBackoff must be specified, and have a multiplier greater than 0-backoffmultiplier must be specified, And greater than zero - retryableStatusCodes must be formulated as status codes for data, cannot be empty, and no status codes must be valid gPRC status codes, can be integers, and are case insensitiveCopy the code

6.9 Client Retry Policy for Hedging

Hedging is to point to

  • If a method uses a hedging strategy, the first request is sent as a normal RPC call, if no response is received within the configured time, then the second request is sent directly, and so on until the maxAttempts are sent

  • In the case of multiple retries, note the idempotency problem in the case of back-end load balancing

6.10 The Client retries the Traffic limiting Policy

  • When the client failure to success ratio exceeds a certain threshold, gRPC disables these retry policies to prevent server overload due to retries
  • The actual traffic limiting parameters are determined based on server performance resources

Current limiting instructions:

  • For each server, the gRPC client maintains a token_count variable, initially set to maxToken, with values ranging from 0 to maxToken

  • Token_count has an effect on each RPC request

    • Each failed RPC request decrements token_count 1
    • Successful RPC will increment toKEN_count and tokenRatio If token_count <= (maxTokens/2), retry policy will be turned off until token_count > (maxTokens/2)

Configure the following information in servie config:

{“retryThrottling”:{“maxTokens”: 10, “tokenRatio”: 0.1}

7: Use signals to monitor the end of the GRPC service process

Usually, when we use GRPC to do SRV of microservices, we need a mechanism to monitor the situation of our service process, the discovery and registration of user services have been unregistered.

If the service is not in the registry and unregistered, the request is sent to the wrong backend.

In fact, we mainly use the signal mechanism to listen to our service.

PS: Windows has limited support for signals, KeyboardInterrupt cannot be caught, and terminating a process directly from the process manager is unknown

Complete example:

` ` ` #! /usr/bin/evn python # -*- coding: utf-8 -*- import sys from concurrent import futures import time import grpc import hello_pb2 import hello_pb2_grpc Import signal # GreeterServicer class Greeter(hello_PB2_grpc.greeterServicer): Def SayHello(self, request, context): Return hello_pb2.HelloReply(message='hello {MSG}'. Format (MSG =request.name)) def SayHelloAgain(self, request, context): # returns is our definition of the object of the response body. # # # set the abnormal status code context set_code (GRPC. StatusCode. PERMISSION_DENIED) # context. Set_details (" you don't have the access permissions)" Print (" Received request header metadata ", Set_trailing_metadata ((('name', '223232'), ('sex', '23232'))) # # compression mechanism of three kinds of processing NoCompression = _compression. # NoCompression Deflate = _compression. # Gzip Deflate = Context. Set_compression (grpc.Com compression.Gzip) return hello_pb2.HelloReply(message='hello {msg}'.format(msg=request.name)) def serve(): Instantiate an RPC service Options = [('grpc.max_send_message_length', 60 * 1024 * 1024), Max_receive_message_length ', 60 * 1024 * 1024), # limit to receive the size of the largest data] # # compression mechanism of three kinds of processing NoCompression = _compression. # NoCompression Deflate = _compression. # Gzip Deflate = Gzip # configure the service to enable global data transmission compression = grpc.Com compression.Gzip server = grpc.server(futures.ThreadPoolExecutor(max_workers=10), options=options, Hello_pb2_grpc.add_greeterservicer_to_server (Greeter()), Add_insecure_port ('[::]:50051') # def stop_serve(signum, frame): Print (" Process terminated !!!!" ) # sys.exit(0) raise KeyboardInterrupt # cancel the associated signal # SIGINT corresponding to windos command CTRL + C # SIGTERM corresponding to Linux kill command signal.signal(signal.SIGINT, stop_serve) # signal.signal(signal.SIGTERM, Stop_serve) # wait_for_termination A loop is required to run the process server.wait_for_termination() if __name__ == '__main__': serve() 'Copy the code

8: Use coroutine to start the service

8.1 Installing dependency packages

While the above examples handle concurrency in a thread pool-based manner, the following examples use coroutines.

First install the new dependency package:

The relevant version should correspond to:

Grpcio - reflection = = 1.41.1 PIP install grpcio - reflection - I https://pypi.tuna.tsinghua.edu.cn/simpleCopy the code

After final installation:

8.2 Modifying the Server Startup

Modify our server code (modified from section 3.3) :

#! /usr/bin/evn python # -*- coding: utf-8 -*- import grpc import hello_pb2 import hello_pb2_grpc from grpc_reflection.v1alpha import reflection import Asyncio # implements GreeterServicer's interface class Greeter(hello_PB2_grpc.greeterServicer) defined in proto: Async def SayHello(self, request, context) def SayHello(self, request, context) Hello_pb2. HelloReply(message='hello {MSG}'. Format (MSG =request.name)) async def SayHelloAgain(self, request, context): Return hello_pb2.HelloReply(message='hello {MSG}'. Format (MSG =request.name)) async def serve(): Instantiate an RPC service Start our service_names = (Hello_pb2.descriptor.services_by_name ["Greeter"].full_name, Reflection.SERVICE_NAME, Hello_pb2_grpc.add_greeterservicer_to_server (Greeter()), server) reflection.enable_server_reflection(service_names, Add_insecure_port ('[::]:50051') await server.start() await server.wait_for_termination() if __name__ == '__main__': asyncio.run(serve())Copy the code

8.3 Starting client Invocation

Our client remains the same as in section 3.3:

#! /usr/bin/evn python # coding=utf-8 import grpc import hello_pb2 import hello_pb2_grpc def run(): With grpc.insecure_channel('localhost:50051') as channel: # Hello_pb2_grpc.greeterStub (channel) # Hello_pb2_grpc.greeterStub (channel) # Hello_pb2 SayHello(hello_pb2.HelloRequest(name=' xiaobell ')) print("SayHello" "+ response.message) response = stub.sayHelloagain (hello_pb2.HelloRequest(name=' welcome next time ')) Print ("SayHelloAgain return: "+ response.message) if __name__ == '__main__': run()Copy the code

Direct startup also enables normal communication with the server.

3. Summary


The above is just a personal combination of their own actual needs, do study practice notes! If there are clerical errors! Welcome criticism and correction! Thank you!

At the end

END

Jane: www.jianshu.com/u/d6960089b…

The Denver nuggets: juejin. Cn/user / 296393…

Public account: wechat search [children to a pot of wolfberry wine tea]

Let students | article | QQ: welcome to learn communication 】 【 308711822