My little sister asked me how to use gRPC, and I just threw this article to her

Simple gRPC services, stream processing mode, validators, Token authentication and certificate authentication.

The reading volume on many platforms has reached a new high, and it was recommended by the home page of OSChina. The reading volume has reached more than 1 million, which is the peak of my single reading.

It seems that if you put your heart into writing, you can still reap the rewards.

This article we still from the actual situation, mainly introduces gRPC publish and subscribe mode, REST interface and timeout control.

I will upload all relevant codes to GitHub, and interested partners can check or download them.

Publish and subscribe

Publish and subscribe is a common design pattern, and there are many implementations of this pattern in the open source community. The Docker project provides a minimal implementation of Pubsub. The following is a local publish and subscribe code based on pubsub package implementation:

package main

import (
	"fmt"
	"strings"
	"time"

	"github.com/moby/moby/pkg/pubsub"
)

func main(a) {
	p := pubsub.NewPublisher(100*time.Millisecond, 10)

	golang := p.SubscribeTopic(func(v interface{}) bool {
		if key, ok := v.(string); ok {
			if strings.HasPrefix(key, "golang:") {
				return true}}return false
	})
	docker := p.SubscribeTopic(func(v interface{}) bool {
		if key, ok := v.(string); ok {
			if strings.HasPrefix(key, "docker:") {
				return true}}return false
	})

	go p.Publish("hi")
	go p.Publish("golang: https://golang.org")
	go p.Publish("docker: https://www.docker.com/")
	time.Sleep(1)

	go func(a) {
		fmt.Println("golang topic:", <-golang)
	}()
	go func(a) {
		fmt.Println("docker topic:", <-docker)
	}()

	<-make(chan bool)}Copy the code

This code first creates an object through pubsub.newPublisher, then implements subscriptions through P.subscribetopic, and publishes messages through P.publish.

The execution effect is as follows:

docker topic: docker: https://www.docker.com/
golang topic: golang: https://golang.org
fatal error: all goroutines are asleep - deadlock!

goroutine 1 [chan receive]:
main.main()
	/Users/zhangyongxin/src/go-example/grpc-example/pubsub/server/pubsub.go:43 +0x1e7
exit status 2
Copy the code

Subscription messages can be printed normally.

<-make(chan bool) But the subscription message cannot be printed without this statement.

Here is not very understand, have big guy know, welcome to leave a message, seek guidance.

Next, the publish-subscribe model is implemented with gRPC and PubSub packages.

Four parts need to be implemented:

  1. Proto file;
  2. Server: used to receive subscription requests, also receive publish requests, and forward publish requests to subscribers;
  3. Subscription client: used to subscribe messages from the server, processing messages;
  4. Publishing client: Used to send messages to the server.

Proto file

First define the proto file:

syntax = "proto3";

package proto;
 
message String {
    string value = 1;
}
 
service PubsubService {
    rpc Publish (String) returns (String);
    rpc SubscribeTopic (String) returns (stream String);
    rpc Subscribe (String) returns (stream String);
}
Copy the code

Define three methods, a Publish and two Subscribe and SubscribeTopic.

The Subscribe method receives all messages, and SubscribeTopic receives messages based on a particular Topic.

The service side

package main

import (
	"context"
	"fmt"
	"log"
	"net"
	"server/proto"
	"strings"
	"time"

	"github.com/moby/moby/pkg/pubsub"
	"google.golang.org/grpc"
	"google.golang.org/grpc/reflection"
)

type PubsubService struct {
	pub *pubsub.Publisher
}

func (p *PubsubService) Publish(ctx context.Context, arg *proto.String) (*proto.String, error) {
	p.pub.Publish(arg.GetValue())
	return &proto.String{}, nil
}

func (p *PubsubService) SubscribeTopic(arg *proto.String, stream proto.PubsubService_SubscribeTopicServer) error {
	ch := p.pub.SubscribeTopic(func(v interface{}) bool {
		if key, ok := v.(string); ok {
			if strings.HasPrefix(key, arg.GetValue()) {
				return true}}return false
	})

	for v := range ch {
		if err := stream.Send(&proto.String{Value: v.(string)}); nil! = err {return err
		}
	}
	return nil
}

func (p *PubsubService) Subscribe(arg *proto.String, stream proto.PubsubService_SubscribeServer) error {
	ch := p.pub.Subscribe()

	for v := range ch {
		if err := stream.Send(&proto.String{Value: v.(string)}); nil! = err {return err
		}
	}
	return nil
}

func NewPubsubService(a) *PubsubService {
	return &PubsubService{pub: pubsub.NewPublisher(100*time.Millisecond, 10)}}func main(a) {
	lis, err := net.Listen("tcp".": 50051")
	iferr ! =nil {
		log.Fatalf("failed to listen: %v", err)
	}

	// Simple call
	server := grpc.NewServer()
	// Register the reflection service required by Grpcurl
	reflection.Register(server)
	// Register business services
	proto.RegisterPubsubServiceServer(server, NewPubsubService())

	fmt.Println("grpc server start ...")
	iferr := server.Serve(lis); err ! =nil {
		log.Fatalf("failed to serve: %v", err)
	}
}
Copy the code

In contrast to the previous publish-subscription program, *pubsub.Publisher is actually a member of gRPC’s PubsubService structure.

Then, according to the development process of gRPC, three methods corresponding to the structure are realized.

Finally, when registering the service, the NewPubsubService() service is injected to implement the local publish and subscribe function.

Subscription client

package main

import (
	"client/proto"
	"context"
	"fmt"
	"io"
	"log"

	"google.golang.org/grpc"
)

func main(a) {
	conn, err := grpc.Dial("localhost:50051", grpc.WithInsecure())
	iferr ! =nil {
		log.Fatal(err)
	}
	defer conn.Close()

	client := proto.NewPubsubServiceClient(conn)
	stream, err := client.Subscribe(
		context.Background(), &proto.String{Value: "golang:"},)if nil! = err { log.Fatal(err) }go func(a) {
		for {
			reply, err := stream.Recv()
			if nil! = err {if io.EOF == err {
					break
				}
				log.Fatal(err)
			}
			fmt.Println("sub1: ", reply.GetValue())
		}
	}()

	streamTopic, err := client.SubscribeTopic(
		context.Background(), &proto.String{Value: "golang:"},)if nil! = err { log.Fatal(err) }go func(a) {
		for {
			reply, err := streamTopic.Recv()
			if nil! = err {if io.EOF == err {
					break
				}
				log.Fatal(err)
			}
			fmt.Println("subTopic: ", reply.GetValue())
		}
	}()

	<-make(chan bool)}Copy the code

Create a New NewPubsubServiceClient object, then implement the client.Subscribe and client.SubscribeTopic methods, respectively, and receive messages continuously through the Goroutine.

Publishing client

package main

import (
	"client/proto"
	"context"
	"log"

	"google.golang.org/grpc"
)

func main(a) {
	conn, err := grpc.Dial("localhost:50051", grpc.WithInsecure())
	iferr ! =nil {
		log.Fatal(err)
	}
	defer conn.Close()
	client := proto.NewPubsubServiceClient(conn)

	_, err = client.Publish(
		context.Background(), &proto.String{Value: "golang: hello Go"},)iferr ! =nil {
		log.Fatal(err)
	}

	_, err = client.Publish(
		context.Background(), &proto.String{Value: "docker: hello Docker"},)if nil! = err { log.Fatal(err) } }Copy the code

Create a new NewPubsubServiceClient object and Publish the message using the client.publish method.

When the code is all written, we open three terminals to test it:

Start the server on terminal 1:

go run main.go
Copy the code

Start the subscription client on terminal 2:

go run sub_client.go
Copy the code

Execute release client on terminal 3:

go run pub_client.go
Copy the code

Thus, there is a corresponding output on terminal 2:

subTopic:  golang: hello Go
sub1:  golang: hello Go
sub1:  docker: hello Docker
Copy the code

You can also open several more subscription terminals, so that each of them will have the same content output.

Source address: GitHub

REST interface

GRPC is generally used for intra-cluster communication. If services need to be provided externally, most of them are provided through REST interfaces. Grpc-gateway, an open source project, provides the ability to transform GRPC services into REST services that provide direct access to the GRPC API.

But I think, in fact, it should be relatively rare. It is much easier to write an HTTP service if you provide a REST interface.

Proto file

The first step is to create a proto file:

syntax = "proto3"; package proto; import "google/api/annotations.proto"; message StringMessage { string value = 1; } service RestService { rpc Get(StringMessage) returns (StringMessage) { option (google.api.http) = { get: "/get/{value}" }; } rpc Post(StringMessage) returns (StringMessage) { option (google.api.http) = { post: "/post" body: "*" }; }}Copy the code

Define a REST service, RestService, that implements GET and POST methods, respectively.

Install plug-in:

go get -u github.com/grpc-ecosystem/grpc-gateway/protoc-gen-grpc-gateway
Copy the code

Generate the corresponding code:

protoc -I/usr/local/include -I. \ -I$GOPATH/pkg/mod \ - I$GOPATH/pkg/mod/github.com/grpc-ecosystem/[email protected]/third_party/googleapis \ - GRPC - gateway_out =. --go_out=plugins=grpc:.\ --swagger_out=. \ helloworld.protoCopy the code

The –grpc-gateway_out parameter generates the corresponding GW file, and the –swagger_out parameter generates the corresponding API file.

The two files generated here are as follows:

helloworld.pb.gw.go
helloworld.swagger.json
Copy the code

REST services

package main

import (
	"context"
	"log"
	"net/http"

	"rest/proto"

	"github.com/grpc-ecosystem/grpc-gateway/runtime"
	"google.golang.org/grpc"
)

func main(a) {
	ctx := context.Background()
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	mux := runtime.NewServeMux()

	err := proto.RegisterRestServiceHandlerFromEndpoint(
		ctx, mux, "localhost:50051",
		[]grpc.DialOption{grpc.WithInsecure()},
	)
	iferr ! =nil {
		log.Fatal(err)
	}

	http.ListenAndServe(": 8080", mux)
}
Copy the code

Here is mainly by implementing gw file RegisterRestServiceHandlerFromEndpoint method to connect gRPC service.

GRPC service

package main

import (
	"context"
	"net"

	"rest/proto"

	"google.golang.org/grpc"
)

type RestServiceImpl struct{}

func (r *RestServiceImpl) Get(ctx context.Context, message *proto.StringMessage) (*proto.StringMessage, error) {
	return &proto.StringMessage{Value: "Get hi:" + message.Value + "#"}, nil
}

func (r *RestServiceImpl) Post(ctx context.Context, message *proto.StringMessage) (*proto.StringMessage, error) {
	return &proto.StringMessage{Value: "Post hi:" + message.Value + "@"}, nil
}

func main(a) {
	grpcServer := grpc.NewServer()
	proto.RegisterRestServiceServer(grpcServer, new(RestServiceImpl))
	lis, _ := net.Listen("tcp".": 50051")
	grpcServer.Serve(lis)
}
Copy the code

The gRPC service is implemented the same way as before.

That’s all the code, now let’s test it:

Start three terminals:

Terminal 1 Starts the gRPC service:

go run grpc_service.go
Copy the code

Terminal 2 starts the REST service:

go run rest_service.go
Copy the code

Terminal 3 requests the REST service:

$ curl localhost:8080/get/gopher
{"value":"Get hi:gopher"}

$ curl localhost:8080/post -X POST --data '{"value":"grpc"}'
{"value":"Post hi:grpc"}
Copy the code

Source address: GitHub

Timeout control

The last section covers timeout control, which is very important.

Normal WEB service apis, or Nginx, set a timeout period after which the server may either return a timeout error or the client may terminate the connection if no data has been returned.

Without this timeout, it can be quite dangerous. All requests are blocked on the server and consume a lot of resources, such as memory. If resources run out, the entire service may even crash.

So how do you set the timeout in gRPC? Mainly through the context context. context parameter, specifically the context.WithDeadline function.

Proto file

Create the simplest proto file. I won’t go into that.

syntax = "proto3";

package proto;

// The greeting service definition.
service Greeter {
    // Sends a greeting
    rpc SayHello (HelloRequest) returns (HelloReply) {}
}

// The request message containing the user's name.
message HelloRequest {
    string name = 1;
}

// The response message containing the greetings
message HelloReply {
    string message = 1;
}
Copy the code

The client

package main

import (
	"client/proto"
	"context"
	"fmt"
	"log"
	"time"

	"google.golang.org/grpc"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

func main(a) {
	// Simple call
	conn, err := grpc.Dial("localhost:50051", grpc.WithInsecure())
	defer conn.Close()

	ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(time.Duration(3*time.Second)))
	defer cancel()

	client := proto.NewGreeterClient(conn)
	// Simple call
	reply, err := client.SayHello(ctx, &proto.HelloRequest{Name: "zzz"})
	iferr ! =nil {
		statusErr, ok := status.FromError(err)
		if ok {
			if statusErr.Code() == codes.DeadlineExceeded {
				log.Fatalln("client.SayHello err: deadline")
			}
		}

		log.Fatalf("client.SayHello err: %v", err)
	}
	fmt.Println(reply.Message)
}
Copy the code

Set a 3s timeout with the following function:

ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(time.Duration(3*time.Second)))
defer cancel()
Copy the code

The timeout error is then detected in the response error.

The service side

package main

import (
	"context"
	"fmt"
	"log"
	"net"
	"runtime"
	"server/proto"
	"time"

	"google.golang.org/grpc"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/reflection"
	"google.golang.org/grpc/status"
)

type greeter struct{}func (*greeter) SayHello(ctx context.Context, req *proto.HelloRequest) (*proto.HelloReply, error) {
	data := make(chan *proto.HelloReply, 1)
	go handle(ctx, req, data)
	select {
	case res := <-data:
		return res, nil
	case <-ctx.Done():
		return nil, status.Errorf(codes.Canceled, "Client cancelled, abandoning.")}}func handle(ctx context.Context, req *proto.HelloRequest, data chan<- *proto.HelloReply) {
	select {
	case <-ctx.Done():
		log.Println(ctx.Err())
		runtime.Goexit() Exit the Go coroutine after timeout
	case <-time.After(4 * time.Second): // Simulate time-consuming operations
		res := proto.HelloReply{
			Message: "hello " + req.Name,
		}
		// // Determine the timeout before modifying the database
		// if ctx.Err() == context.Canceled{
		/ /...
		// // If you have timed out, exit
		// }
		data <- &res
	}
}

func main(a) {
	lis, err := net.Listen("tcp".": 50051")
	iferr ! =nil {
		log.Fatalf("failed to listen: %v", err)
	}

	// Simple call
	server := grpc.NewServer()
	// Register the reflection service required by Grpcurl
	reflection.Register(server)
	// Register business services
	proto.RegisterGreeterServer(server, &greeter{})

	fmt.Println("grpc server start ...")
	iferr := server.Serve(lis); err ! =nil {
		log.Fatalf("failed to serve: %v", err)
	}
}
Copy the code

After(4 * time.second) indicates that the corresponding code will be executed After 4s, which is used to simulate the timeout request.

If the client timeout exceeds 4s, a timeout error is generated.

Here’s a simulation:

Server:

$ go run main.go
grpc server start ...
2021/10/24 22:57:40 context deadline exceeded
Copy the code

Client:

$ go run main.go
2021/10/24 22:57:40 client.SayHello err: deadline
exit status 1
Copy the code

Source address: GitHub

conclusion

This paper mainly introduces three parts of gRPC combat content, respectively:

  1. Publish and subscribe model
  2. REST interface
  3. Timeout control

In my opinion, timeout control is the most important and needs more attention in the normal development process.

Combined with the previous article, the gRPC combat content is finished, all executable code, also uploaded to GitHub.

If you have any questions, please leave a message to me. If you feel good, please follow and forward.


Source code address:

  • Github.com/yongxinz/go…
  • Github.com/yongxinz/go…

Recommended reading:

  • GRPC, critical praise
  • Run the grpcurl command to access the gRPC service
  • I’ve heard that 99% of Go programmers have been cheated by Defer

Reference:

  • Chai2010. Cn/advanced – go…
  • Codeleading.com/article/946…
  • Juejin. Cn/post / 684490…
  • www.cnblogs.com/FireworksEa…