This article is from the huawei cloud community “What is GRPC bidirectional flow? 2 pieces of code to tell you” by the author of breakDawn.

Why GRPC bidirectional flow?

Sometimes the request call and return process, which is not simply a question and answer process, may involve sending once, returning several times in batches, or sending randomly to each other.

Therefore, simple restful model cannot meet the above common requirements. GRPC bidirectional flow emerges and realizes bidirectional asynchronous IO communication through a TPC link.

GRPC two-way flow

A two-way streaming RPC is a sequence of messages sent by both parties using a read/write stream.

The two streams operate independently, so the client and server can read and write in any order they like: for example, the server can wait to receive all the client messages before writing the response or it can alternate reading and writing messages or other combinations of reads and writes.

  • It can be understood as the use of asynchronous IO in common IO models

The order of messages in each flow is reserved. You can specify the type of method by predating the request and response with the stream keyword.

  // Accepts a stream of RouteNotes sent while a route is being traversed,
  // while receiving other RouteNotes (e.g. from other users).
  rpc RouteChat(stream RouteNote) returns (stream RouteNote) {}
Copy the code

Two-way flow calls from the client

  1. Define a reponseOberserver, or response observer, that defines how to process messages returned by the server. Typically, messages are put into some blocking queue or a single capacity queue SettableFuture.
  2. Calling stub.sendMessage(reponseOberserver) tells the GRPC framework that I’m going to use this reponseOberserver to handle the sendMessage response.

Note that the name of the sendMesage method depends on how it is defined in our proto.

  1. The stub.sendMessage() method then returns a requestObserver that lets us use the observer.onNext() to send the request as many times as we want, all at once.
  2. When no more needs to be sent, onCompleted can be called to tell the other party that it is finished.

Here is a sample code excerpt from the official website:

public void routeChat() throws Exception { info("*** RoutChat"); final SettableFuture<Void> finishFuture = SettableFuture.create(); StreamObserver reponseObserver = new StreamObserver<RouteNote>() {@override public void onNext(RouteNote note) { info("Got message \"{0}\" at {1}, {2}", note.getMessage(), note.getLocation() .getLatitude(), note.getLocation().getLongitude()); } @Override public void onError(Throwable t) { finishFuture.setException(t); } @override public void onCompleted() {finishFuture.set(null) {finishFuture.set(null); }}; // The framework returns me a request flow observer and tells me to use this observer onNext (message) to send the request, which is bound to the responseServer I sent to it. StreamObserver<RouteNote> requestObserver = asyncStub.routeChat(); try { RouteNote[] requests = {newNote("First message", 0, 0), newNote("Second message", 0, 1), newNote("Third message", 1, 0), newNote("Fourth message", 1, 1)}; for (RouteNote request : requests) { info("Sending message \"{0}\" at {1}, {2}", request.getMessage(), request.getLocation() .getLatitude(), request.getLocation().getLongitude()); requestObserver.onNext(request); } requestObserver.onCompleted(); finishFuture.get(); info("Finished RouteChat"); } catch (Exception t) { requestObserver.onError(t); logger.log(Level.WARNING, "RouteChat Failed", t); throw t; }}Copy the code

Server-side processing:

  1. When we set up the server, we need to call nettyServer, set up the Netty service, and bind an xxxServiceImpl abstract class. This xxxServiceImpl is the server structure we defined in Proto that supports processing the messages we defined.
  2. In xxxServiceImpl, there are many overridden methods that require you to define how to handle incoming requests and how to send responses to clients. The action to send the response is requestObserver.onNext(response message) in the parameter.
  3. The returned xxxService class is provided to the Netty and GRPC framework in the first step. Upon receiving the message, it will use its asynchronous mechanism to separate the network thread from the business thread and walk over to the execution place.

Here is a sample code excerpt from the official website:

class	xxxService extend   xxxServiceImpl{
     @Override
    public void listFeatures(Rectangle request, StreamObserver<Feature> responseObserver) {
      int left = min(request.getLo().getLongitude(), request.getHi().getLongitude());
      int right = max(request.getLo().getLongitude(), request.getHi().getLongitude());
      int top = max(request.getLo().getLatitude(), request.getHi().getLatitude());
      int bottom = min(request.getLo().getLatitude(), request.getHi().getLatitude());

      for (Feature feature : features) {
        if (!RouteGuideUtil.exists(feature)) {
          continue;
        }

        int lat = feature.getLocation().getLatitude();
        int lon = feature.getLocation().getLongitude();
        if (lon >= left && lon <= right && lat >= bottom && lat <= top) {
          responseObserver.onNext(feature);
        }
      }
      responseObserver.onCompleted();
    }
}
Copy the code

Click to follow, the first time to learn about Huawei cloud fresh technology ~