preface

Hello everyone, I am fried fish. This chapter will introduce the flow of gRPC, which can be divided into three types:

  • Server-side Streaming RPC: Indicates the server-side streaming RPC
  • Client-side Streaming RPC: Indicates the Client streaming RPC
  • Bidirectional Streaming RPC: Indicates two-way streaming RPC

flow

Any technology, because there are pain points, so there is the necessity of existence. If you want to see streaming calls to gRPC, go ahead

figure

GRPC Streaming is based on HTTP/2, which will be explained in more detail in the following sections

Why not use Simple RPC

Why should streaming exist? Is there something wrong with Simple RPC? By simulating service scenarios, the following problems occur when Simple RPC is used:

  • Instantaneous pressure caused by too large packets
  • When receiving data packets, only after all data packets are accepted successfully and correctly, can the response be called back for service processing (the client cannot send the data packets while the server processes them).

Why Streaming RPC

  • Large packet
  • Real-time scene

Simulation scenario

Every day at 6 am, A batch of data sets of millions are synchronized from A to B. During synchronization, A series of operations (archiving, data analysis, profiling, logging, etc.) are performed. That’s a lot of data at one time

After the synchronization is completed, some people will immediately go to the data, for the new day preparation. Also in real time.

In contrast, Streaming RPC is more suitable for this scenario

gRPC

I’ll focus on the first section when I talk about gRPC streaming code specifically, because the three modes are really different combinations. Hope you can pay attention to understand, draw inferences by analogy, it is same knowledge point actually 👍

The directory structure

$├─ ├─ ├─ ├─ go │ ├─ $├─ go │ ├─ go │ ├─ $├─ go │ ├─ go │ ├─ ├─ ├─ class.go ├── search. Go ├─ class.go ├── search └ ─ ─ server. GoCopy the code

Stream_server, stream_client to store server and client files, proto/stream.proto to write IDL

IDL

In the stream. Proto file under the proto folder, write the following:

syntax = "proto3";

package proto;

service StreamService {
    rpc List(StreamRequest) returns (stream StreamResponse) {};

    rpc Record(stream StreamRequest) returns (StreamResponse) {};

    rpc Route(stream StreamRequest) returns (stream StreamResponse) {};
}


message StreamPoint {
  string name = 1;
  int32 value = 2;
}

message StreamRequest {
  StreamPoint pt = 1;
}

message StreamResponse {
  StreamPoint pt = 1;
}
Copy the code

Note the keyword stream, which is declared as a stream method. There are three methods involved here, and the corresponding relationship is

  • List: server-side streaming RPC
  • Record: client streaming RPC
  • Route: bidirectional streaming RPC

Base template + empty definition

Server

package main

import (
	"log"
	"net"

	"google.golang.org/grpc"

	pb "github.com/EDDYCJY/go-grpc-example/proto"
	
)

type StreamService struct{}

const (
	PORT = "9002"
)

func main() {
	server := grpc.NewServer()
	pb.RegisterStreamServiceServer(server, &StreamService{})

	lis, err := net.Listen("tcp".":"+PORT)
	iferr ! = nil { log.Fatalf("net.Listen err: %v", err)
	}

	server.Serve(lis)
}

func (s *StreamService) List(r *pb.StreamRequest, stream pb.StreamService_ListServer) error {
	return nil
}

func (s *StreamService) Record(stream pb.StreamService_RecordServer) error {
	return nil
}

func (s *StreamService) Route(stream pb.StreamService_RouteServer) error {
	return nil
}
Copy the code

Before writing the code, you are advised to define the basic template and interface of the gRPC Server. If you are not clear, refer to the previous chapter for information

Client

package main

import (
    "log"
    
	"google.golang.org/grpc"

	pb "github.com/EDDYCJY/go-grpc-example/proto"
)

const (
	PORT = "9002"
)

func main() {
	conn, err := grpc.Dial(":"+PORT, grpc.WithInsecure())
	iferr ! = nil { log.Fatalf("grpc.Dial err: %v", err)
	}

	defer conn.Close()

	client := pb.NewStreamServiceClient(conn)

	err = printLists(client, &pb.StreamRequest{Pt: &pb.StreamPoint{Name: "gRPC Stream Client: List", Value: 2018}})
	iferr ! = nil { log.Fatalf("printLists.err: %v", err)
	}

	err = printRecord(client, &pb.StreamRequest{Pt: &pb.StreamPoint{Name: "gRPC Stream Client: Record", Value: 2018}})
	iferr ! = nil { log.Fatalf("printRecord.err: %v", err)
	}

	err = printRoute(client, &pb.StreamRequest{Pt: &pb.StreamPoint{Name: "gRPC Stream Client: Route", Value: 2018}})
	iferr ! = nil { log.Fatalf("printRoute.err: %v", err)
	}
}

func printLists(client pb.StreamServiceClient, r *pb.StreamRequest) error {
	return nil
}

func printRecord(client pb.StreamServiceClient, r *pb.StreamRequest) error {
	return nil
}

func printRoute(client pb.StreamServiceClient, r *pb.StreamRequest) error {
	return nil
}
Copy the code

1, The Server-side Streaming RPC: the Server-side streaming RPC

Server-side streaming RPC is obviously one-way flow, and refers to the Server as a Stream and the Client as a normal RPC request

In simple terms, the client initiates a common RPC request, the server sends the data set several times through streaming response, and the client Recv receives the data set. The general figure is as follows:

Server

func (s *StreamService) List(r *pb.StreamRequest, stream pb.StreamService_ListServer) error {
	for n := 0; n <= 6; n++ {
		err := stream.Send(&pb.StreamResponse{
			Pt: &pb.StreamPoint{
				Name:  r.Pt.Name,
				Value: r.Pt.Value + int32(n),
			},
		})
		iferr ! = nil {return err
		}
	}

	return nil
}
Copy the code

On the Server, focus on the stream.send method. It looks like it can be sent N times, right? Is there a size limit?

type StreamService_ListServer interface {
	Send(*StreamResponse) error
	grpc.ServerStream
}

func (x *streamServiceListServer) Send(m *StreamResponse) error {
	return x.ServerStream.SendMsg(m)
}
Copy the code

Through reading the source code, it is known that protoc generates various interface methods according to the definition when it is generated. Finally, the internal SendMsg method is unified, which involves the following process:

  • Message body (object) serialization
  • Compressing the serialized message body
  • Adds a 5-byte header to the body of the message being transmitted
  • Determines whether the total length of the compressed + serialized message body is greater than the default maxSendMessageSize (the default ismath.MaxInt32), an error message will be displayed if the number exceeds
  • A data set written to a stream

Client

func printLists(client pb.StreamServiceClient, r *pb.StreamRequest) error {
	stream, err := client.List(context.Background(), r)
	iferr ! = nil {return err
	}

	for {
		resp, err := stream.Recv()
		if err == io.EOF {
			break
		}
		iferr ! = nil {return err
		}

		log.Printf("resp: pj.name: %s, pt.value: %d", resp.Pt.Name, resp.Pt.Value)
	}

	return nil
}
Copy the code

On the Client side, focus on the stream.recv () method. When is io.eof? When is there an error message?

type StreamService_ListClient interface {
	Recv() (*StreamResponse, error)
	grpc.ClientStream
}

func (x *streamServiceListClient) Recv() (*StreamResponse, error) {
	m := new(StreamResponse)
	iferr := x.ClientStream.RecvMsg(m); err ! = nil {return nil, err
	}
	return m, nil
}
Copy the code

RecvMsg will read the entire gRPC message body from the stream.

(1) RecvMsg is blocked waiting

(2) RecvMsg Returns IO.EOF when the stream succeeds/terminates (Close is called)

(3) RecvMsg When any error occurs in the stream, the stream will be terminated, and the error message will contain RPC error code. The following error may occur in RecvMsg:

  • io.EOF
  • io.ErrUnexpectedEOF
  • transport.ConnectionError
  • google.golang.org/grpc/codes

Note that the default value of MaxReceiveMessageSize is 1024 * 1024 * 4. You are advised not to exceed the value

validation

Run stream_server/server. Go:

$ go run server.go
Copy the code

Run stream_client/client. Go:

$ go run client.go 
2018/09/24 16:18:25 resp: pj.name: gRPC Stream Client: List, pt.value: 2018
2018/09/24 16:18:25 resp: pj.name: gRPC Stream Client: List, pt.value: 2019
2018/09/24 16:18:25 resp: pj.name: gRPC Stream Client: List, pt.value: 2020
2018/09/24 16:18:25 resp: pj.name: gRPC Stream Client: List, pt.value: 2021
2018/09/24 16:18:25 resp: pj.name: gRPC Stream Client: List, pt.value: 2022
2018/09/24 16:18:25 resp: pj.name: gRPC Stream Client: List, pt.value: 2023
2018/09/24 16:18:25 resp: pj.name: gRPC Stream Client: List, pt.value: 2024
Copy the code

2, Client-side Streaming RPC: Streaming RPC

Client streaming RPC, one-way flow, the client sends multiple RPC requests to the server through streaming, and the server sends a response to the client, as shown roughly:

Server

func (s *StreamService) Record(stream pb.StreamService_RecordServer) error {
	for {
		r, err := stream.Recv()
		if err == io.EOF {
			return stream.SendAndClose(&pb.StreamResponse{Pt: &pb.StreamPoint{Name: "gRPC Stream Server: Record", Value: 1}})
		}
		iferr ! = nil {return err
		}

		log.Printf("stream.Recv pt.name: %s, pt.value: %d", r.Pt.Name, r.Pt.Value)
	}

	return nil
}
Copy the code

SendAndClose = stream.sendandClose = stream.sendandClose = stream.sendandClose

In this program, we process each Recv, and when we find IO.EOF (stream closed), we need to send the final response to the client, and close the Recv that is waiting on the other side

Client

func printRecord(client pb.StreamServiceClient, r *pb.StreamRequest) error {
	stream, err := client.Record(context.Background())
	iferr ! = nil {return err
	}

	for n := 0; n < 6; n++ {
		err := stream.Send(r)
		iferr ! = nil {return err
		}
	}

	resp, err := stream.CloseAndRecv()
	iferr ! = nil {return err
	}

	log.Printf("resp: pj.name: %s, pt.value: %d", resp.Pt.Name, resp.Pt.Value)

	return nil
}
Copy the code

Stream. CloseAndRecv and stream.SendAndClose are the companion stream methods, and you are sure to know what they do

validation

Restart stream_server/server.go and run stream_client/client.go again:

Stream_client:
$ go run client.go
2018/09/24 16:23:03 resp: pj.name: gRPC Stream Server: Record, pt.value: 1
Copy the code
Stream_server:
$ go run server.go
2018/09/24 16:23:03 stream.Recv pt.name: gRPC Stream Client: Record, pt.value: 2018
2018/09/24 16:23:03 stream.Recv pt.name: gRPC Stream Client: Record, pt.value: 2018
2018/09/24 16:23:03 stream.Recv pt.name: gRPC Stream Client: Record, pt.value: 2018
2018/09/24 16:23:03 stream.Recv pt.name: gRPC Stream Client: Record, pt.value: 2018
2018/09/24 16:23:03 stream.Recv pt.name: gRPC Stream Client: Record, pt.value: 2018
2018/09/24 16:23:03 stream.Recv pt.name: gRPC Stream Client: Record, pt.value: 2018
Copy the code

3, Bidirectional Streaming RPC

Two-way streaming RPC, as the name implies, is two-way streaming. The client initiates the request in a streaming manner, and the server responds to the request in a streaming manner

The first request must be initiated by the Client, but the interaction mode (who comes first and who comes after, how much to send at a time, how much to respond to, and when to close) depends on how the program is written (can be combined with coroutines).

If the bidirectional stream is sent in sequence, it looks like this:

Again, it is important to emphasize that two-way flows vary greatly and vary from program to program. Bidirectional flow diagrams cannot be applied to different scenarios

Server

func (s *StreamService) Route(stream pb.StreamService_RouteServer) error {
	n := 0
	for {
		err := stream.Send(&pb.StreamResponse{
			Pt: &pb.StreamPoint{
				Name:  "gPRC Stream Client: Route",
				Value: int32(n),
			},
		})
		iferr ! = nil {return err
		}

		r, err := stream.Recv()
		if err == io.EOF {
			return nil
		}
		iferr ! = nil {return err
		}

		n++

		log.Printf("stream.Recv pt.name: %s, pt.value: %d", r.Pt.Name, r.Pt.Value)
	}

	return nil
}
Copy the code

Client

func printRoute(client pb.StreamServiceClient, r *pb.StreamRequest) error {
	stream, err := client.Route(context.Background())
	iferr ! = nil {return err
	}

	for n := 0; n <= 6; n++ {
		err = stream.Send(r)
		iferr ! = nil {return err
		}

		resp, err := stream.Recv()
		if err == io.EOF {
			break
		}
		iferr ! = nil {return err
		}

		log.Printf("resp: pj.name: %s, pt.value: %d", resp.Pt.Name, resp.Pt.Value)
	}

	stream.CloseSend()

	return nil
}
Copy the code

validation

Restart stream_server/server.go and run stream_client/client.go again:

stream_server
$ go run server.go
2018/09/24 16:29:43 stream.Recv pt.name: gRPC Stream Client: Route, pt.value: 2018
2018/09/24 16:29:43 stream.Recv pt.name: gRPC Stream Client: Route, pt.value: 2018
2018/09/24 16:29:43 stream.Recv pt.name: gRPC Stream Client: Route, pt.value: 2018
2018/09/24 16:29:43 stream.Recv pt.name: gRPC Stream Client: Route, pt.value: 2018
2018/09/24 16:29:43 stream.Recv pt.name: gRPC Stream Client: Route, pt.value: 2018
2018/09/24 16:29:43 stream.Recv pt.name: gRPC Stream Client: Route, pt.value: 2018
Copy the code
stream_client
$ go run client.go
2018/09/24 16:29:43 resp: pj.name: gPRC Stream Client: Route, pt.value: 0
2018/09/24 16:29:43 resp: pj.name: gPRC Stream Client: Route, pt.value: 1
2018/09/24 16:29:43 resp: pj.name: gPRC Stream Client: Route, pt.value: 2
2018/09/24 16:29:43 resp: pj.name: gPRC Stream Client: Route, pt.value: 3
2018/09/24 16:29:43 resp: pj.name: gPRC Stream Client: Route, pt.value: 4
2018/09/24 16:29:43 resp: pj.name: gPRC Stream Client: Route, pt.value: 5
2018/09/24 16:29:43 resp: pj.name: gPRC Stream Client: Route, pt.value: 6
Copy the code

conclusion

In this paper, three types of flow interaction modes are introduced. You can choose the appropriate mode according to the actual business scenarios. You’ll get twice the result with half the effort.

?

If you have any questions or mistakes, welcome to raise questions or give correction opinions on issues. If you like or are helpful to you, welcome Star, which is a kind of encouragement and promotion for the author.

My official account

reference

Sample code for this series

  • go-grpc-example