Write elegant gRPC microservices in Go-Kit

Microservices have been the talk of the town for years. A microservice architecture breaks a large, monolithic business project into several small, independent business modules. Each module is a service, and the services call one another over an efficient protocol (protobuf, JSON, etc.), that is, via RPC. Splitting the code base this way has the following characteristics:

  • Each service runs as a small, independent business module and is deployed independently
  • Each service can be tested automatically and deployed (in a distributed fashion) without affecting other services
  • Each service performs careful error checking and handling internally

This chapter describes how to develop a microservice project with gRPC while also supporting an HTTP RESTful interface.

Introduction

Github: https://github.com/icowan/grpc-world

We will design a simple data storage service: data is read with GET and written with PUT, similar to Redis key-value storage.

The service has two APIs:

  • get: Obtains content by key
  • put: Sets the content based on the key

Define a Service that implements both functions:

type Service interface {
	Get(ctx context.Context, key string) (val string, err error)
	Put(ctx context.Context, key, val string) (err error)
}

Everything below is implemented with go-kit components.

Before you start

Before creating the gRPC service, the protoc command must be installed on your development machine; it generates the pb files from the .proto file.

Installing on macOS

$ brew install autoconf automake libtool

Because the project is written in Go, the protoc-gen-go plugin is required in addition to protoc:

$ go get -u google.golang.org/grpc
$ go get -u github.com/golang/protobuf/protoc-gen-go
$ cd $GOPATH/bin/
$ ls
protoc-gen-go

Make sure that protoc-gen-go is in the $GOPATH/bin directory.

Writing the server code

The server side is implemented with go-kit components. go-kit is a toolkit well suited to microservices: its clear code conventions greatly reduce the chance of developer mistakes. It may feel complicated at first, but once you have used it for a while you will find it really convenient. Another advantage of go-kit is that if we move to another framework in the future, its clean architecture means we only need to peel off the Transport and Endpoint layers; the Service can then be integrated into the new framework with little effort.

go-kit mainly uses three layers:

  • Transport: the communication layer, which can use different communication methods such as an HTTP RESTful interface or a gRPC interface (a big advantage, because it is easy to switch to another protocol)
    • http: handles HTTP transport
    • grpc: handles gRPC transport
  • Endpoint: the endpoint layer, which implements the handler for each interface and converts request/response formats
  • Service: the service layer, where the business logic is implemented
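
To make the three layers concrete, here is a minimal sketch that uses only the standard library. The Endpoint type mirrors the shape of go-kit's endpoint.Endpoint, while memService, getRequest, makeGetEndpoint, and handleGet are hypothetical names invented for this illustration; they are not part of go-kit or of the grpc-world repository.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
)

// Service is the business layer; it knows nothing about transports.
type Service interface {
	Get(ctx context.Context, key string) (string, error)
	Put(ctx context.Context, key, val string) error
}

// memService is a hypothetical in-memory implementation used only in this sketch.
type memService struct {
	mu   sync.RWMutex
	data map[string]string
}

func (s *memService) Get(_ context.Context, key string) (string, error) {
	s.mu.RLock()
	defer s.mu.RUnlock()
	val, ok := s.data[key]
	if !ok {
		return "", errors.New("key not found")
	}
	return val, nil
}

func (s *memService) Put(_ context.Context, key, val string) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.data[key] = val
	return nil
}

// Endpoint mirrors the shape of go-kit's endpoint.Endpoint.
type Endpoint func(ctx context.Context, request interface{}) (interface{}, error)

// getRequest is the transport-independent request for Get.
type getRequest struct{ Key string }

// makeGetEndpoint adapts Service.Get to the generic Endpoint signature.
func makeGetEndpoint(s Service) Endpoint {
	return func(ctx context.Context, request interface{}) (interface{}, error) {
		req := request.(getRequest)
		return s.Get(ctx, req.Key)
	}
}

// handleGet plays the role of a transport: decode the input, call the
// endpoint, and encode the output.
func handleGet(ep Endpoint, rawKey string) string {
	resp, err := ep(context.Background(), getRequest{Key: rawKey})
	if err != nil {
		return "error: " + err.Error()
	}
	return fmt.Sprintf("value: %v", resp)
}

func main() {
	svc := &memService{data: map[string]string{}}
	_ = svc.Put(context.Background(), "hello", "world")
	ep := makeGetEndpoint(svc)
	fmt.Println(handleGet(ep, "hello"))
	fmt.Println(handleGet(ep, "missing"))
}
```

Because handleGet depends only on the Endpoint, replacing it with an HTTP or gRPC handler would leave the Service untouched.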

Elegant directory structure

.
├── Dockerfile
├── Makefile
├── README.md
├── client
│   ├── grpc
│   │   └── client.go
│   └── http
│       └── client.go
├── cmd
│   ├── main.go
│   └── service
│       └── service.go
├── docker-compose.yaml
├── go.mod
├── go.sum
└── pkg
    ├── encode
    │   └── response.go
    ├── endpoint
    │   └── endpoint.go
    ├── grpc
    │   ├── handler.go
    │   └── pb
    │       ├── service.pb.go
    │       └── service.proto
    ├── http
    │   └── …
    ├── repository
    │   └── …
    └── service
        └── …

  • /client/: the demo clients I used for my presentation; they are optional
    • /client/http/: HTTP example
    • /client/grpc/: gRPC example
  • /cmd/: the command-line entry point
    • /cmd/service/: the real entry point, where the service is initialized
  • /pkg/: all the tools and services live here
    • /pkg/encode/: the encode helpers
    • /pkg/endpoint/: the go-kit Endpoint layer
    • /pkg/grpc/ and /pkg/http/: the Transport layer, where input and output parameters are processed
    • /pkg/repository/: the repository; here it only keeps data in memory and does not persist it
    • /pkg/service/: the business logic is implemented here

Service

A Service is the concrete business implementation. Here you only need to focus on the business logic, not the framework. Anything the Service needs is passed in as a dependency, which makes later migration easier.

A Service defines an interface listing the methods that must be implemented. If the service needs an upgrade or a compatibility change later, it can be implemented as a Service2, so the upper layers do not have to change and existing callers keep working.

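package service

import (
	"context"

	"github.com/go-kit/kit/log"
	"github.com/icowan/grpc-world/pkg/repository"
)

// Service describes the business operations; only Put is shown here.
type Service interface {
	Put(ctx context.Context, key, val string) (err error)
}

// service is the default implementation, backed by a repository.
type service struct {
	logger     log.Logger
	repository repository.Repository
}

// Put stores val under key in the repository.
func (s *service) Put(ctx context.Context, key, val string) (err error) {
	return s.repository.Put(key, val)
}

// New returns a Service that uses the given logger and repository.
func New(logger log.Logger, repository repository.Repository) Service {
	return &service{logger: logger, repository: repository}
}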


Endpoint

An endpoint's main job is to type-assert the request passed in by the Transport, hand the data to the Service, and then process or convert what the Service returns. Put simply, it is the bridge between the Transport and the Service.

package endpoint

import (
	"context"

	"github.com/go-kit/kit/endpoint"
	"github.com/icowan/grpc-world/pkg/encode"
	"github.com/icowan/grpc-world/pkg/service"
)

// GetRequest is the transport-independent request passed to the endpoints.
type GetRequest struct {
	Key string `json:"key"`
	Val string `json:"val"`
}

// Endpoints groups the endpoints exposed by the service.
type Endpoints struct {
	GetEndpoint endpoint.Endpoint
}

// NewEndpoint builds the endpoints and wraps each one with the middleware
// registered under its name in mdw.
func NewEndpoint(s service.Service, mdw map[string][]endpoint.Middleware) Endpoints {
	eps := Endpoints{
		GetEndpoint: func(ctx context.Context, request interface{}) (response interface{}, err error) {
			req := request.(GetRequest) // panics if the transport passes another type
			val, err := s.Get(ctx, req.Key)
			return encode.Response{
				Error: err,
				Data:  val,
			}, err
		},
	}
	for _, m := range mdw["Get"] {
		eps.GetEndpoint = m(eps.GetEndpoint)
	}
	return eps
}

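
The endpoint above uses the single-value assertion request.(GetRequest), which panics if a transport ever hands it a different type. As a possible variation, the sketch below uses the comma-ok form and returns an error instead. It relies only on the standard library: Endpoint mirrors go-kit's endpoint.Endpoint, and getter, echoGet, call, and ErrBadRequest are hypothetical names for this illustration.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// Endpoint mirrors the shape of go-kit's endpoint.Endpoint.
type Endpoint func(ctx context.Context, request interface{}) (interface{}, error)

// GetRequest is the request type the endpoint expects.
type GetRequest struct {
	Key string
}

// ErrBadRequest is returned when the transport passes an unexpected type.
var ErrBadRequest = errors.New("unexpected request type")

// getter is a hypothetical stand-in for the Get method of the Service.
type getter func(ctx context.Context, key string) (string, error)

// makeGetEndpoint checks the request type before calling the service.
func makeGetEndpoint(get getter) Endpoint {
	return func(ctx context.Context, request interface{}) (interface{}, error) {
		req, ok := request.(GetRequest)
		if !ok {
			return nil, ErrBadRequest
		}
		return get(ctx, req.Key)
	}
}

// echoGet is a stub service method that derives the value from the key.
func echoGet(_ context.Context, key string) (string, error) {
	return "value-of-" + key, nil
}

// call exercises an endpoint without going through a transport.
func call(ep Endpoint, request interface{}) (interface{}, error) {
	return ep(context.Background(), request)
}

func main() {
	ep := makeGetEndpoint(echoGet)
	fmt.Println(call(ep, GetRequest{Key: "hello"}))
	fmt.Println(call(ep, "not a GetRequest"))
}
```

Whether to panic or return an error is a design choice: a wrong type here is a programming mistake in the decode function, so either approach can be defended.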

Middleware

Middleware is mainly used for logging, rate limiting, distributed tracing, permission checks, and so on. Each API can be given its own set of middleware as needed.
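
The order in which middleware is applied matters. The sketch below uses only the standard library: Endpoint and Middleware mirror the go-kit types of the same names, and trace, apply, and runDemo are hypothetical helpers. The apply function uses the same wrapping loop as NewEndpoint, so the last middleware in the slice ends up outermost.

```go
package main

import (
	"context"
	"fmt"
)

// Endpoint and Middleware mirror the shapes of the go-kit types.
type Endpoint func(ctx context.Context, request interface{}) (interface{}, error)
type Middleware func(Endpoint) Endpoint

// trace is a hypothetical middleware that records when it is entered and left.
func trace(name string, events *[]string) Middleware {
	return func(next Endpoint) Endpoint {
		return func(ctx context.Context, request interface{}) (interface{}, error) {
			*events = append(*events, name+":before")
			resp, err := next(ctx, request)
			*events = append(*events, name+":after")
			return resp, err
		}
	}
}

// apply wraps ep with each middleware in turn, so the last one is outermost.
func apply(ep Endpoint, mws ...Middleware) Endpoint {
	for _, m := range mws {
		ep = m(ep)
	}
	return ep
}

// runDemo builds a chain and returns the order in which the pieces ran.
func runDemo() []string {
	var events []string
	base := func(ctx context.Context, request interface{}) (interface{}, error) {
		events = append(events, "endpoint")
		return request, nil
	}
	ep := apply(base, trace("limit", &events), trace("log", &events))
	_, _ = ep(context.Background(), "hello")
	return events
}

func main() {
	fmt.Println(runDemo())
}
```

In this sketch "log" wraps "limit", so a request rejected by the inner middleware would still pass through the outer one on its way back.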

Logging

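Logging can also be written as an endpoint.Middleware, and I'll show both styles: the logging below wraps the Service, while the rate limiter that follows is an endpoint.Middleware.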

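package service

import (
	"context"
	"time"

	"github.com/go-kit/kit/log"
	"github.com/go-kit/kit/log/level"
)

// loggingService is a Service middleware that logs every call before
// delegating to the next Service.
type loggingService struct {
	logger log.Logger
	next   Service
}

// NewLoggingService wraps s so that each call is logged at info level.
func NewLoggingService(logger log.Logger, s Service) Service {
	return &loggingService{level.Info(logger), s}
}

func (l *loggingService) Put(ctx context.Context, key, val string) (err error) {
	// The deferred function runs after next.Put returns, so it sees the final err.
	defer func(begin time.Time) {
		_ = l.logger.Log(
			"method", "Put",
			"key", key,
			"val", val,
			"took", time.Since(begin),
			"err", err,
		)
	}(time.Now())
	return l.next.Put(ctx, key, val)
}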

Limiter

Examples of middleware for limiting traffic are shown below:

package service

import (
	"context"
	"errors"

	"github.com/go-kit/kit/endpoint"
	"golang.org/x/time/rate"
)

// TokenBucketLimitter returns an endpoint middleware that rejects a request
// when the token bucket bkt has no token available.
func TokenBucketLimitter(bkt *rate.Limiter) endpoint.Middleware {
	return func(next endpoint.Endpoint) endpoint.Endpoint {
		return func(ctx context.Context, request interface{}) (response interface{}, err error) {
			if !bkt.Allow() {
				return nil, errors.New("Rate limit exceed!")
			}
			return next(ctx, request)
		}
	}
}
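
To show what a token bucket decides on each call, here is a deliberately simplified sketch with an injectable clock. It is not the golang.org/x/time/rate implementation; bucket, newBucket, allowAt, and demo are hypothetical names, and the type is not safe for concurrent use.

```go
package main

import (
	"fmt"
	"time"
)

// bucket is a simplified, single-goroutine token bucket written for this sketch.
type bucket struct {
	capacity float64       // maximum number of stored tokens (the burst size)
	tokens   float64       // tokens currently available
	interval time.Duration // time needed to earn one token
	last     time.Time     // last time the bucket was refilled
}

// newBucket returns a full bucket that earns one token per interval.
func newBucket(interval time.Duration, burst int, now time.Time) *bucket {
	return &bucket{
		capacity: float64(burst),
		tokens:   float64(burst),
		interval: interval,
		last:     now,
	}
}

// allowAt refills the bucket for the time elapsed since the last call and then
// tries to spend one token. The caller supplies the clock, which keeps the
// behaviour deterministic.
func (b *bucket) allowAt(now time.Time) bool {
	elapsed := now.Sub(b.last)
	b.last = now
	b.tokens += float64(elapsed) / float64(b.interval)
	if b.tokens > b.capacity {
		b.tokens = b.capacity
	}
	if b.tokens < 1 {
		return false
	}
	b.tokens--
	return true
}

// demo returns the decisions for three immediate calls and one call a second later.
func demo() []bool {
	start := time.Unix(0, 0)
	b := newBucket(time.Second, 2, start)
	return []bool{
		b.allowAt(start),
		b.allowAt(start),
		b.allowAt(start),
		b.allowAt(start.Add(time.Second)),
	}
}

func main() {
	fmt.Println(demo())
}
```

With a burst of 2, the first two calls pass, the third is rejected, and one more call is admitted once a second has elapsed.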

Repository

The repository implements only simple Get and Put functions. The data is not persisted, so it is gone once the process stops.

package repository
import (
	"errors"
	"sync"
	"time"
)
type Store struct {
	Key       string
	Val       string
	CreatedAt time.Time
}
type StoreKey string
var ErrUnknown = errors.New("unknown store")
type Repository interface {
	Put(key, val string) error
	Get(key string) (res *Store, err error)
}
type store struct {
	mtx    sync.RWMutex
	stores map[StoreKey]*Store
}

func (s *store) Put(key, val string) error {
	s.mtx.Lock()
	defer s.mtx.Unlock()
	s.stores[StoreKey(key)] = &Store{
		Key:       key,
		Val:       val,
		CreatedAt: time.Now(),
	}
	return nil
}
// part of the code is omitted here
func New() Repository {
	return &store{
		stores: make(map[StoreKey]*Store),
	}
}
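
The Get method is left out of the listing above. One plausible way to write it, as a sketch, is shown below. The Get body and the newStore helper are my assumptions, not the repository's actual code: I assume a read lock is taken and that ErrUnknown is returned for a missing key.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
	"time"
)

type Store struct {
	Key       string
	Val       string
	CreatedAt time.Time
}

type StoreKey string

var ErrUnknown = errors.New("unknown store")

type store struct {
	mtx    sync.RWMutex
	stores map[StoreKey]*Store
}

// newStore is a hypothetical constructor for this sketch.
func newStore() *store {
	return &store{stores: make(map[StoreKey]*Store)}
}

// Put stores val under key, replacing any previous entry.
func (s *store) Put(key, val string) error {
	s.mtx.Lock()
	defer s.mtx.Unlock()
	s.stores[StoreKey(key)] = &Store{Key: key, Val: val, CreatedAt: time.Now()}
	return nil
}

// Get is an assumed implementation: it takes a read lock so that concurrent
// readers do not block each other, and reports ErrUnknown for a missing key.
func (s *store) Get(key string) (*Store, error) {
	s.mtx.RLock()
	defer s.mtx.RUnlock()
	res, ok := s.stores[StoreKey(key)]
	if !ok {
		return nil, ErrUnknown
	}
	return res, nil
}

func main() {
	repo := newStore()
	_ = repo.Put("hello", "world")
	res, err := repo.Get("hello")
	fmt.Println(res.Val, err)
	_, err = repo.Get("missing")
	fmt.Println(err)
}
```

Returning a sentinel error such as ErrUnknown lets callers tell a missing key apart from other failures with a simple comparison.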

Transport

With the transport layer separated out, the application can be exposed over any transport you like: HTTP, gRPC, or something else. Every transport plugs into the Endpoint, so new transports can be added without changing the Endpoint or the Service.

HTTP

Here is how HTTP Transport is implemented:

package http

import (
	"context"
	"net/http"

	kithttp "github.com/go-kit/kit/transport/http"
	"github.com/gorilla/mux"
	"github.com/icowan/grpc-world/pkg/encode"
	ep "github.com/icowan/grpc-world/pkg/endpoint"
	"github.com/pkg/errors"
)

// MakeHTTPHandler registers the HTTP routes and binds each one to its endpoint.
func MakeHTTPHandler(eps ep.Endpoints, opts ...kithttp.ServerOption) http.Handler {
	r := mux.NewRouter()
	r.Handle("/get/{key}", kithttp.NewServer(
		eps.GetEndpoint,
		decodeGetRequest,
		encode.JsonResponse,
		opts...,
	)).Methods(http.MethodGet)
	return r
}

// decodeGetRequest reads the {key} path variable and builds a GetRequest.
func decodeGetRequest(_ context.Context, r *http.Request) (request interface{}, err error) {
	vars := mux.Vars(r)
	key, ok := vars["key"]
	if !ok {
		return nil, errors.New("route bad")
	}
	return ep.GetRequest{Key: key}, nil
}
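
The handler above follows a decode, endpoint, encode pipeline. The sketch below spells out roughly that flow using only net/http and httptest, so it can run without go-kit or gorilla/mux. The response envelope, the status codes, and the helpers newGetHandler, echoEndpoint, and get are assumptions made for this illustration; the real encode.JsonResponse may produce a different shape.

```go
package main

import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"net/http"
	"net/http/httptest"
	"strings"
)

// Endpoint mirrors the shape of go-kit's endpoint.Endpoint.
type Endpoint func(ctx context.Context, request interface{}) (interface{}, error)

type GetRequest struct{ Key string }

// response is a hypothetical JSON envelope used only in this sketch.
type response struct {
	Success bool        `json:"success"`
	Data    interface{} `json:"data,omitempty"`
	Error   string      `json:"error,omitempty"`
}

// decodeGetRequest extracts the key from a path of the form /get/{key}.
func decodeGetRequest(r *http.Request) (interface{}, error) {
	key := strings.TrimPrefix(r.URL.Path, "/get/")
	if key == "" || key == r.URL.Path || strings.Contains(key, "/") {
		return nil, errors.New("route bad")
	}
	return GetRequest{Key: key}, nil
}

// newGetHandler wires decode -> endpoint -> encode by hand.
func newGetHandler(ep Endpoint) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		w.Header().Set("Content-Type", "application/json; charset=utf-8")
		req, err := decodeGetRequest(r)
		if err != nil {
			w.WriteHeader(http.StatusBadRequest)
			_ = json.NewEncoder(w).Encode(response{Error: err.Error()})
			return
		}
		data, err := ep(r.Context(), req)
		if err != nil {
			w.WriteHeader(http.StatusInternalServerError)
			_ = json.NewEncoder(w).Encode(response{Error: err.Error()})
			return
		}
		_ = json.NewEncoder(w).Encode(response{Success: true, Data: data})
	})
}

// echoEndpoint is a stub endpoint that returns the requested key in upper case.
func echoEndpoint(_ context.Context, request interface{}) (interface{}, error) {
	return strings.ToUpper(request.(GetRequest).Key), nil
}

// get performs an in-memory GET against the handler and returns status and body.
func get(path string) (int, string) {
	rec := httptest.NewRecorder()
	req := httptest.NewRequest(http.MethodGet, path, nil)
	newGetHandler(echoEndpoint).ServeHTTP(rec, req)
	return rec.Code, strings.TrimSpace(rec.Body.String())
}

func main() {
	fmt.Println(get("/get/hello"))
	fmt.Println(get("/put/hello"))
}
```

Using httptest.NewRecorder keeps the example free of real network ports, which also makes this kind of handler easy to unit test.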

gRPC

The gRPC Transport is a bit more cumbersome than HTTP, mainly because you also have to implement the server interface generated from the proto file (the grpcServer type below). Otherwise it is almost the same as the HTTP implementation.

Define the proto file and generate the pb file

Before implementing grpcServer, you need to define the interface:

syntax = "proto3";
package pb;
service Service {
    rpc Get (GetRequest) returns (ServiceResponse) {}
    rpc Put (GetRequest) returns (ServiceResponse) {}
}
message GetRequest {
    string key = 1;
    string val = 2;
}
message ServiceResponse {
    bool success = 1;
    int64 code = 2;
    string data = 3;
    string err = 4;
}

Generate the pb file:

Go to the pkg/grpc/pb/ directory and run the following commands:

$ protoc service.proto --go_out=plugins=grpc:.
$ cd .. && tree
.
├── handler.go
└── pb
    ├── service.pb.go
    └── service.proto

1 directory, 3 files

The gRPC transport code, for reference:

package grpc

import (
	"context"

	"github.com/go-kit/kit/transport/grpc"
	"github.com/icowan/grpc-world/pkg/encode"
	ep "github.com/icowan/grpc-world/pkg/endpoint"
	"github.com/icowan/grpc-world/pkg/grpc/pb"
)

// grpcServer implements the generated pb.ServiceServer interface by
// delegating each RPC to a go-kit gRPC handler. Put is omitted here.
type grpcServer struct {
	get grpc.Handler
}

func (g *grpcServer) Get(ctx context.Context, req *pb.GetRequest) (*pb.ServiceResponse, error) {
	_, rep, err := g.get.ServeGRPC(ctx, req)
	if err != nil {
		return nil, err
	}
	return rep.(*pb.ServiceResponse), nil
}

func MakeGRPCHandler(eps ep.Endpoints, opts ...grpc.ServerOption) pb.ServiceServer {
	return &grpcServer{
		get: grpc.NewServer(
			eps.GetEndpoint,
			decodeGetRequest,
			encodeResponse,
			opts...,
		),
	}
}

// decodeGetRequest converts the protobuf request into the endpoint's GetRequest.
func decodeGetRequest(_ context.Context, r interface{}) (interface{}, error) {
	return ep.GetRequest{
		Key: r.(*pb.GetRequest).Key,
		Val: r.(*pb.GetRequest).Val,
	}, nil
}

// encodeResponse converts the endpoint response into the protobuf response.
func encodeResponse(_ context.Context, r interface{}) (interface{}, error) {
	resp := r.(encode.Response)
	// ... omitted
	return &pb.ServiceResponse{
		Success: resp.Success,
		Code:    int64(resp.Code),
	}, nil
}

The Run() entry point

To serve HTTP and gRPC at the same time, two ports are needed. By default the service listens on ports 8080 and 8081; you can change them with command-line flags.

Run initializes the Repository, the Service, the Endpoints, and the Transports, in that order. Both transports, HTTP and gRPC, are then started.

Finally, one more function listens for an exit signal; handle it as your needs dictate.

cmd/service/server.go

const rateBucketNum = 20

var (
	logger log.Logger

	fs       = flag.NewFlagSet("world", flag.ExitOnError)
	httpAddr = fs.String("http-addr", ":8080", "HTTP listen address")
	grpcAddr = fs.String("grpc-addr", ":8081", "gRPC listen address")
)

func Run() {
	if err := fs.Parse(os.Args[1:]); err != nil {
		panic(err)
	}
	logger = log.NewLogfmtLogger(os.Stderr)
	store := repository.New()                    // initialize the repository
	svc := service.New(logger, store)            // initialize the service
	svc = service.NewLoggingService(logger, svc) // wrap it with the logging middleware
	ems := []endpoint.Middleware{
		service.TokenBucketLimitter(rate.NewLimiter(rate.Every(time.Second*1), rateBucketNum)), // rate limiting
	}
	eps := ep.NewEndpoint(svc, map[string][]endpoint.Middleware{
		"Put": ems,
	})
	g := &group.Group{}
	initHttpHandler(eps, g)
	initGRPCHandler(eps, g)
	initCancelInterrupt(g)
	_ = level.Error(logger).Log("exit", g.Run())
}

// initCancelInterrupt adds an actor that returns when SIGINT or SIGTERM arrives.
func initCancelInterrupt(g *group.Group) {
	cancelInterrupt := make(chan struct{})
	g.Add(func() error {
		c := make(chan os.Signal, 1)
		signal.Notify(c, syscall.SIGINT, syscall.SIGTERM)
		select {
		case sig := <-c:
			return fmt.Errorf("received signal %s", sig)
		case <-cancelInterrupt:
			return nil
		}
	}, func(error) {
		close(cancelInterrupt)
	})
}

// initHttpHandler adds the HTTP server to the group.
func initHttpHandler(endpoints ep.Endpoints, g *group.Group) {
	opts := []kithttp.ServerOption{
		kithttp.ServerErrorHandler(transport.NewLogErrorHandler(level.Error(logger))),
		kithttp.ServerErrorEncoder(encode.JsonError),
	}
	httpHandler := http.MakeHTTPHandler(endpoints, opts...)
	httpListener, err := net.Listen("tcp", *httpAddr)
	if err != nil {
		panic(err)
	}
	g.Add(func() error {
		return netHttp.Serve(httpListener, httpHandler)
	}, func(error) {
		// omitted...
	})
}

// initGRPCHandler adds the gRPC server to the group.
func initGRPCHandler(endpoints ep.Endpoints, g *group.Group) {
	grpcOpts := []kitgrpc.ServerOption{
		kitgrpc.ServerErrorHandler(transport.NewLogErrorHandler(logger)),
	}
	grpcListener, err := net.Listen("tcp", *grpcAddr)
	if err != nil {
		panic(err)
	}
	g.Add(func() error {
		baseServer := googleGrpc.NewServer()
		pb.RegisterServiceServer(baseServer, grpc.MakeGRPCHandler(endpoints, grpcOpts...))
		return baseServer.Serve(grpcListener)
	}, func(error) {
		// omitted...
	})
}
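
Each g.Add call in Run registers a pair of functions: one that blocks while the component runs, and one that makes it stop. The sketch below is my own minimal stand-in for that run-group pattern, assuming the usual semantics: start every actor, wait for the first to return, interrupt all of them, then wait for the rest. Group, actor, and demo here are hypothetical and are not the library types used in the article.

```go
package main

import (
	"errors"
	"fmt"
)

// actor pairs a blocking execute function with an interrupt that makes it return.
type actor struct {
	execute   func() error
	interrupt func(error)
}

// Group is a minimal stand-in written for this sketch.
type Group struct {
	actors []actor
}

// Add registers an actor with the group.
func (g *Group) Add(execute func() error, interrupt func(error)) {
	g.actors = append(g.actors, actor{execute, interrupt})
}

// Run starts every actor, waits for the first one to return, interrupts all
// of them, waits for the rest to finish, and returns the first result.
func (g *Group) Run() error {
	if len(g.actors) == 0 {
		return nil
	}
	errs := make(chan error, len(g.actors))
	for _, a := range g.actors {
		go func(a actor) { errs <- a.execute() }(a)
	}
	first := <-errs
	for _, a := range g.actors {
		a.interrupt(first)
	}
	for i := 1; i < len(g.actors); i++ {
		<-errs
	}
	return first
}

// demo runs a "server" that blocks until interrupted next to an actor that
// fails immediately, and reports whether the server was shut down.
func demo() (serverStopped bool, result error) {
	var g Group
	stop := make(chan struct{})
	g.Add(func() error {
		<-stop // stands in for a blocking Serve call
		serverStopped = true
		return nil
	}, func(error) {
		close(stop) // stands in for closing the listener
	})
	g.Add(func() error {
		return errors.New("received signal interrupt")
	}, func(error) {})
	result = g.Run()
	return serverStopped, result
}

func main() {
	fmt.Println(demo())
}
```

This is why the interrupt functions matter: if one of them does not actually stop its actor, Run never returns.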

gRPC client

Only the gRPC client is shown here; an HTTP example lives in client/http/.

package main

import (
	"context"
	"log"
	"time"

	"github.com/icowan/grpc-world/pkg/grpc/pb"
	"google.golang.org/grpc"
)

func main() {
	// Connect to the gRPC port of the service; WithBlock waits until the connection is up.
	conn, err := grpc.Dial("127.0.0.1:8081", grpc.WithInsecure(), grpc.WithBlock())
	if err != nil {
		log.Fatalf("did not connect: %v", err)
	}
	defer func() {
		_ = conn.Close()
	}()
	svc := pb.NewServiceClient(conn)
	// Give the call at most one second.
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	r, err := svc.Put(ctx, &pb.GetRequest{
		Key: "hello",
		Val: "world",
	})
	if err != nil {
		log.Fatalf("could not put: %v", err)
	}
	log.Println(r.GetSuccess())
}

Test

$ make run
GO111MODULE=on /usr/local/go/bin/go run ./cmd/main.go -http-addr :8080 -grpc-addr :8081
level=error ts=2020-03-28T10:45:05.923565Z caller=service.go:106 transport=HTTP addr=:8080

Execute client test command:

$ go run ./client/grpc/client.go
$ go run ./client/http/client.go
level=info ts=2020-03-28T10:45:44.793353Z caller=logging.go:41 method=Put key=hello val=world took=2.142µs err=null

Wrapping up

The test code used in this chapter has been pushed to GitHub. If you find it valuable, feel free to clone it, and a star would be appreciated.

GitHub: https://github.com/icowan/grpc-world
Source: lattecake.com/post/20140

Thanks

If what I have written is useful to you, thank you.