gRPC

This article introduces an example application of gRPC, explains how it works, and shows how to integrate gRPC with Consul.

What is gRPC?

Like many RPC systems, gRPC is based on the idea of defining a service and specifying the methods (with their parameters and return types) that can be called remotely. The server implements this interface and runs a gRPC server to handle client calls. The client has a stub that provides the same methods as the server.

In gRPC, a client application can directly call methods of a server application on a different machine as if it were a local object, which makes it easier to create distributed applications and services.
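To make the stub idea concrete, here is a purely illustrative sketch in plain Python. The names `EchoService`, `Stub`, and `in_process_transport` are hypothetical and are not part of the gRPC API; real gRPC stubs are generated from the proto file and send requests over the network, whereas this toy forwards the call in-process.

```python
class EchoService:
    """Server-side implementation: the method that does the real work."""

    def echo(self, q):
        return f"echo: {q}"


class Stub:
    """Client-side proxy that exposes the same method names as the service."""

    def __init__(self, transport):
        # transport is any callable(method_name, payload) -> result
        self._transport = transport

    def __getattr__(self, method_name):
        # Called only for attributes that are not found normally,
        # i.e. for the remote method names.
        def call(payload):
            return self._transport(method_name, payload)
        return call


def in_process_transport(service):
    """Hypothetical transport that dispatches directly to a local object."""
    def transport(method_name, payload):
        return getattr(service, method_name)(payload)
    return transport


stub = Stub(in_process_transport(EchoService()))
print(stub.echo("hi"))
```

The caller only sees `stub.echo(...)`; where the method actually runs is hidden behind the transport.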

Reference: gRPC Python Quickstart

Before starting, make sure that the grpcio-tools and grpcio packages are installed.

There are four steps to define a gRPC service:

  1. Define a message type
  2. Compile the proto file
  3. Write server-side code
  4. Write client code

Let’s take a gRPC service that implements an echo as an example.

Define a message type

First define the protobuf message formats that the two communicating parties (client and server) exchange, then define the echo service, as follows:

syntax = "proto3"; // Declare that proto3 syntax is used

// Protobuf format of the client request: a single string field q
message Req {
    string q = 1;
}

// Protobuf format of the server response: a single string field a
message Resp {
    string a = 1;
}

// The Echoer service, which contains one RPC method
service Echoer {
    rpc echo (Req) returns (Resp) {}
}

Compile with the following command:

python -m grpc_tools.protoc -I./ --python_out=. --grpc_python_out=. ./Echoer.proto

This generates two .py files:

  • Echoer_pb2.py This file contains the generated Request (Req) and Response (Resp) classes.
  • Echoer_pb2_grpc.py This file contains the classes for the generated client (EchoerStub) and server (EchoerServicer)

Create server-side code

Creating and running an Echoer service can be divided into two parts:

  • Implement the service interface generated from our service definition: the function that does the actual “work” of our service.
  • Run a gRPC server that listens for requests from clients and transmits responses from the service.

In the current directory, create the file Echoer_server.py and implement the service:

from concurrent import futures
import time

import grpc

import Echoer_pb2
import Echoer_pb2_grpc

_ONE_DAY_IN_SECONDS = 60 * 60 * 24


class Echoer(Echoer_pb2_grpc.EchoerServicer):
    # Work function: the name must match the rpc name (echo) in the proto file
    def echo(self, request, context):
        return Echoer_pb2.Resp(a="echo")


def serve():
    # gRPC server
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
    Echoer_pb2_grpc.add_EchoerServicer_to_server(Echoer(), server)
    server.add_insecure_port('[::]:50051')
    server.start()  # start() does not block, so if the code has nothing else to do, wait in a loop.
    try:
        while True:
            time.sleep(_ONE_DAY_IN_SECONDS)
    except KeyboardInterrupt:
        server.stop(0)

if __name__ == '__main__':
    serve()
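As an alternative to the sleep loop above, recent grpcio releases provide `server.wait_for_termination()`, which blocks the calling thread until the server stops. The sketch below assumes a grpcio version that has this method; `run_until_interrupted` and `FakeServer` are hypothetical names, and the fake server exists only so the helper can be exercised without opening a port.

```python
def run_until_interrupted(server, grace=0):
    """Start `server`, block until Ctrl+C, then stop it."""
    server.start()
    try:
        # Blocks the calling thread until the server terminates.
        server.wait_for_termination()
    except KeyboardInterrupt:
        server.stop(grace)


class FakeServer:
    """Stand-in with the same three methods, used only for this demo."""

    def __init__(self):
        self.calls = []

    def start(self):
        self.calls.append("start")

    def wait_for_termination(self):
        self.calls.append("wait")
        raise KeyboardInterrupt  # simulate the user pressing Ctrl+C

    def stop(self, grace):
        self.calls.append(("stop", grace))


fake = FakeServer()
run_until_interrupted(fake)
print(fake.calls)
```

With a real server, the object returned by `grpc.server(...)` would be passed in place of `FakeServer()`.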

Create the client code

In the current directory, create the file Echoer_client.py and implement the client:

from __future__ import print_function

import grpc

import Echoer_pb2
import Echoer_pb2_grpc


def run():
    channel = grpc.insecure_channel('localhost:50051')  # Create the channel
    stub = Echoer_pb2_grpc.EchoerStub(channel)  # Create the stub from the channel
    response = stub.echo(Echoer_pb2.Req(q='echo'))  # Call the RPC and get the response
    print("Echoer client received: " + response.a)

if __name__ == '__main__':
    run()

Run the code

Run the server-side code first

python Echoer_server.py

Then run the client code:

python Echoer_client.py
# output
Echoer client received: echo

Advanced usage

Check out the reference blog

To secure communication, gRPC provides TLS/SSL support.

Start by creating a self-signed certificate with OpenSSL:

$ openssl genrsa -out server.key 2048
Generating RSA private key, 2048 bit long modulus (2 primes)
............................................................+++++
..+++++
e is 65537 (0x010001)
$ openssl req -new -x509 -sha256 -key server.key -out server.crt -days 3650
You are about to be asked to enter information that will be incorporated
into your certificate request.
What you are about to enter is what is called a Distinguished Name or a DN.
There are quite a few fields but you can leave some blank
For some fields there will be a default value,
If you enter '.', the field will be left blank.
-----
Country Name (2 letter code) [AU]:
State or Province Name (full name) [Some-State]:
Locality Name (eg, city) []:
Organization Name (eg, company) [Internet Widgits Pty Ltd]:
Organizational Unit Name (eg, section) []:
Common Name (e.g. server FQDN or YOUR name) []:Echoer
Email Address []:

This generates the server.key and server.crt files. The server needs both files; the client needs only the .crt file.

Modify the server code

server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
Echoer_pb2_grpc.add_EchoerServicer_to_server(Echoer(), server)
# Fetch the private key and certificate (requires `import os` at the top of the file)
with open(os.path.join(os.path.split(__file__)[0], 'server.key')) as f:
    private_key = f.read().encode()
with open(os.path.join(os.path.split(__file__)[0], 'server.crt')) as f:
    certificate_chain = f.read().encode()
# Create the server credentials
server_creds = grpc.ssl_server_credentials(((private_key, certificate_chain,),))
# Call add_secure_port instead of add_insecure_port
server.add_secure_port('localhost:50051', server_creds)

Modify the client code

# Read the certificate
with open('server.crt') as f:
    trusted_certs = f.read().encode()
# Create the credentials
credentials = grpc.ssl_channel_credentials(root_certificates=trusted_certs)
# Use secure_channel instead of insecure_channel
channel = grpc.secure_channel('localhost:50051', credentials)

After starting the server and then the client, the following error occurs:

grpc._channel._Rendezvous: <_Rendezvous of RPC that terminated with:
        status = StatusCode.UNAVAILABLE
        details = "Connect Failed"
        debug_error_string = "{"created":"@1547552759.642000000","description":"Failed to create subchannel","file":"src/core/ext/filters/client_channel/client_channel.cc","file_line":2721,"referenced_errors":[{"created":"@1547552759.642000000","description":"Pick Cancelled","file":"src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc","file_line":241,"referenced_errors":[{"created":"@1547552759.642000000","description":"Connect Failed","file":"src/core/ext/filters/client_channel/subchannel.cc","file_line":689,"grpc_status":14,"referenced_errors":[{"created":"@1547552759.642000000","description":"Peer name localhost is not in peer certificate","file":"src/core/lib/security/security_connector/security_connector.cc","file_line":880}]}]}]}"
>

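Warning:

This happens because in TLS/SSL mode the client checks the name it connects to against the server's certificate. The certificate above was created with the Common Name Echoer, so the client must connect with service name:port rather than IP:port or localhost:port. Change the client as follows: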

# Before
channel = grpc.secure_channel('localhost:50051', credentials)
# After
channel = grpc.secure_channel('Echoer:50051', credentials)

Warning:

In TLS/SSL mode, the client then needs DNS support to resolve the service name in service name:port. We do not currently know a proper solution, so as a workaround we use the Windows hosts file to resolve the service name to an IP address. Open the Windows hosts file, C:\Windows\System32\drivers\etc\hosts, and add:

# Name of the service
127.0.0.1 Echoer

Then save the file.

After the modification, run the client again and it succeeds.
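As a possible alternative to editing the hosts file, gRPC accepts a channel option named `grpc.ssl_target_name_override`, which tells the client which name to check against the certificate while still connecting to an ordinary address. The sketch below is an assumption-laden illustration, not the author's method: `channel_options_for` and `open_secure_channel` are hypothetical helper names, and the option is documented as intended for testing, so whether it is acceptable in your deployment is a separate decision.

```python
def channel_options_for(cert_common_name):
    """Channel options that make the TLS name check use `cert_common_name`."""
    return (("grpc.ssl_target_name_override", cert_common_name),)


def open_secure_channel(address, cert_path, cert_common_name):
    """Open a TLS channel to `address`, verifying against `cert_common_name`."""
    import grpc  # imported lazily so the helper above works without grpcio

    with open(cert_path, "rb") as f:
        credentials = grpc.ssl_channel_credentials(root_certificates=f.read())
    return grpc.secure_channel(
        address, credentials, options=channel_options_for(cert_common_name)
    )


print(channel_options_for("Echoer"))
```

With the certificate from this article, a call such as `open_secure_channel('localhost:50051', 'server.crt', 'Echoer')` would connect to localhost while checking the certificate name Echoer.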

Note: The certificate and the private key must match. If they do not, verification fails.

Integrating with Consul

Note: Check that Consul has started properly by opening http://ip:8500/ to view its status, and make sure that the python-consul library is installed. Otherwise, Consul cannot be operated from Python.

First, note that our sample gRPC program only works under certain conditions:

  • We know that the server has started normally
  • We know the IP and port of the server

In practice, however, the client often cannot know for sure what the server's IP address and port are, so Consul serves as a bridge between the client and the server, as follows:

Service registration

Service registration means what the name implies: a service must register itself in Consul when it starts.

Server: After the server starts and registers, Consul connects to the service using the IP address and port supplied during registration. The most important part is the health check, a heartbeat check that Consul uses to determine whether the service is running properly.

Client: The client asks Consul for the IP address and port of the service it wants to call. If the service is registered in Consul and its heartbeat check is passing, Consul returns the IP address and port to the client, which can then use them to connect to the server.
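The register, health-check, and lookup flow described above can be sketched with a small in-memory registry. This is a toy model and not Consul's API: `ToyRegistry` and its methods are hypothetical names, and for simplicity the service reports its own heartbeat here, whereas in this article Consul probes the service over TCP.

```python
import time


class ToyRegistry:
    """In-memory stand-in for a service registry (not Consul's API)."""

    def __init__(self, ttl_seconds=10.0, clock=time.monotonic):
        self._ttl = ttl_seconds
        self._clock = clock
        self._services = {}  # name -> (ip, port, time of last heartbeat)

    def register(self, name, ip, port):
        self._services[name] = (ip, port, self._clock())

    def heartbeat(self, name):
        ip, port, _ = self._services[name]
        self._services[name] = (ip, port, self._clock())

    def lookup(self, name):
        """Return (ip, port) if the service is registered and healthy, else None."""
        entry = self._services.get(name)
        if entry is None:
            return None
        ip, port, last_seen = entry
        if self._clock() - last_seen > self._ttl:
            return None  # heartbeat missed: treat the service as unhealthy
        return ip, port


now = [0.0]  # controllable clock so the demo does not need to sleep
registry = ToyRegistry(ttl_seconds=10.0, clock=lambda: now[0])
registry.register("Echoer", "127.0.0.1", 50051)
print(registry.lookup("Echoer"))  # registered and healthy
now[0] = 11.0
print(registry.lookup("Echoer"))  # heartbeat missed, no address returned
```

The client never hard-codes the address; it only learns it from the registry, and only while the service is considered healthy.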

Example service registration code is as follows:

def register(self, server_name, ip, port, consul_host=CONSUL_HOST):
    """
    port: port on which the service listens
    consul_host: IP address of the Consul server to connect to
    """
    c = consul.Consul(host=consul_host)  # Get a connection to Consul
    print(f"Registering service {server_name}")
    check = consul.Check.tcp(ip, port, "10s")  # TCP health check against ip:port, run every 10 seconds
    c.agent.service.register(server_name, f"{server_name}-{ip}-{port}", address=ip, port=port, check=check)  # Register the service

Where there is service registration, there is also service deregistration. Example code is as follows:

def unregister(self, server_name, ip, port, consul_host=CONSUL_HOST):
    c = consul.Consul(host=consul_host)
    print(f"Deregistering service {server_name}")
    c.agent.service.deregister(f"{server_name}-{ip}-{port}")
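One way to make sure deregistration always follows registration is to wrap both in a context manager. The sketch below assumes register and unregister callables that take `(server_name, ip, port)`, like the functions above without `self`; `registered` is a hypothetical helper name, and the lambdas only record events so that the example runs without Consul.

```python
from contextlib import contextmanager


@contextmanager
def registered(register, unregister, server_name, ip, port):
    """Register on entry and always deregister on exit, even after an error."""
    register(server_name, ip, port)
    try:
        yield
    finally:
        unregister(server_name, ip, port)


events = []
with registered(
    lambda *args: events.append(("register",) + args),
    lambda *args: events.append(("unregister",) + args),
    "Echoer", "127.0.0.1", 50051,
):
    events.append("serving")  # the gRPC server would run here
print(events)
```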

Service query

Normally the client needs to query Consul for the IP address and port of the service. In TLS/SSL mode, however, the client connects with the service name and port, so only the port needs to be queried.

The client's service queries are DNS queries, so make sure that the dnspython library is installed in order to create them.

The sample code for a service query is as follows:

from dns import resolver
from dns.exception import DNSException

# Create a resolver for Consul DNS queries
consul_resolver = resolver.Resolver()
consul_resolver.port = 8600
consul_resolver.nameservers = [consul_host]

def get_host_port(self, server_name):
    try:
        # Query the SRV record of the service to obtain its port
        dns_answer_srv = consul_resolver.query(f"{server_name}.service.consul", "SRV")
    except DNSException as e:
        return None, None
    return server_name, dns_answer_srv[0].port  # Return the service name and port
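The pair returned by the query still has to be turned into the target string passed to `grpc.secure_channel`, and the failure case needs handling. A minimal sketch, assuming the `(service_name, port)` or `(None, None)` return convention used above; `channel_target` is a hypothetical helper name.

```python
def channel_target(lookup_result):
    """Turn (service_name, port) into 'name:port', or None if the lookup failed."""
    name, port = lookup_result
    if name is None or port is None:
        return None
    return f"{name}:{port}"


print(channel_target(("Echoer", 50051)))
print(channel_target((None, None)))
```

A caller can then check for `None` before creating the channel, instead of passing an invalid target to gRPC.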

gRPC streaming modes

gRPC provides four data interaction modes:

  • Simple RPC: the mode used by all the examples above
  • Server-side streaming RPC
  • Client-side streaming RPC
  • Bidirectional streaming RPC

Because gRPC limits the size of a message, our diff data was too large to be received in a single message. We use streaming mode to solve this. In this mode, the argument passed by the client changes from a single protobuf message to an iterator of protobuf messages, and the response received by the client also becomes an iterator, so obtaining the complete response requires iterating over it. The server's response likewise becomes an iterator.
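The idea of sending a large payload as a sequence of smaller messages can be sketched in plain Python. The helper names `split_into_chunks` and `reassemble` are hypothetical, the chunk size is arbitrary, and raw bytes stand in for protobuf messages.

```python
def split_into_chunks(payload, chunk_size):
    """Yield successive slices of `payload`, each at most `chunk_size` bytes."""
    if chunk_size <= 0:
        raise ValueError("chunk_size must be positive")
    for start in range(0, len(payload), chunk_size):
        yield payload[start:start + chunk_size]


def reassemble(chunks):
    """Join the received chunks back into the original payload."""
    return b"".join(chunks)


payload = b"x" * 10
chunks = list(split_into_chunks(payload, 4))
print([len(c) for c in chunks])
print(reassemble(chunks) == payload)
```

In a streaming RPC, each chunk would be wrapped in a request or response message and yielded from the iterator, and the receiving side would reassemble the chunks as it iterates.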

Modify the service definition file:

// Before
service Echoer {
    rpc echo (Req) returns (Resp) {}
}

// After
service Echoer {
    rpc echo (stream Req) returns (stream Resp) {}
}

Then recompile the proto file.

Modifying the Server

Change the work function as follows, so that it becomes a generator:

def echo(self, request_iterator, context):
    for i in range(10):
        yield Echoer_pb2.Resp(a="echo")
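The work function above ignores `request_iterator` and always yields ten fixed responses. A handler that answers each incoming request could look like the sketch below. It is a plain Python simulation: the `Req` and `Resp` dataclasses are stand-ins for the generated `Echoer_pb2` classes, and the `self` and `context` parameters are omitted so that it runs on its own.

```python
from dataclasses import dataclass


@dataclass
class Req:  # stand-in for Echoer_pb2.Req
    q: str


@dataclass
class Resp:  # stand-in for Echoer_pb2.Resp
    a: str


def echo(request_iterator):
    """Yield one response per incoming request, in arrival order."""
    for request in request_iterator:
        yield Resp(a=request.q)


requests = (Req(q=f"msg-{i}") for i in range(3))
for resp in echo(requests):
    print(resp.a)
```

Because both sides are generators, each response is produced as soon as its request has been read, without buffering the whole stream.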

Modifying the Client

The argument passed to echo becomes an iterator, and the response returned by echo is also an iterator:

def qq():
    for i in range(10):
        yield Echoer_pb2.Req(q="echo")
response = stub.echo(qq())
for resp in response:
    print("Echoer client received: " + resp.a)

Rerun and receive the following result:

$ python Echoer_client.py
Echoer client received: echo
Echoer client received: echo
Echoer client received: echo
Echoer client received: echo
Echoer client received: echo
Echoer client received: echo
Echoer client received: echo
Echoer client received: echo
Echoer client received: echo
Echoer client received: echo