Why do we need interceptors

Let’s start by laying out the problems.

  1. A panicking program is a headache.
  2. Parameter-validation code blooms everywhere, and it is not a pretty sight.
  3. Individual requests have no timeout, so the server gets overloaded with requests.
  4. Error handling is scattered all over the place instead of being unified.
  5. Logging and business code are intertwined.
  6. And nobody wants to do any of the above by hand 🤮.

Programs are born to panic

func (g greeterImpl) OutOfIndex(ctx context.Context, request *pb.OutOfIndexRequest) (*pb.OutOfIndexResponse, error) {
	request.Ids = make([]int64.0)
	request.Ids[1] = 1
	return &pb.OutOfIndexResponse{Data: "ok"}, nil
}
Copy the code

Once the code above runs, the entire program is gone (GG). How do we deal with that? Setting `restart: always` in docker-compose keeps the service coming back up, but that approach has problems. At first glance it seems fine: isn't it just bringing the dead program back to life? Nothing wrong with that. But what about the other requests that were partially processed but not yet completed when the process died? That is a really big question.

That’s OK: intercept the panic in the code so that it does not propagate to the top of the call stack. Let’s change the code to look like this.

func (g greeterImpl) OutOfIndex(ctx context.Context, request *pb.OutOfIndexRequest) (*pb.OutOfIndexResponse, error) {
	defer func(a) {
		// Interception error
		if err:=recover(a); err ! =nil {
			glog.Errorf("panic:%s\n".string(debug.Stack()))
		}
	}() // Basically add the processing of the defer function
	request.Ids = make([]int64.0)
	request.Ids[1] = 1
	return &pb.OutOfIndexResponse{Data: "ok"}, nil
}
Copy the code

If it really has to be written like this, it is a real headache… If there are 10 such methods, do you really want to repeat this 10 times? What? You’ve mastered Ctrl/Command + C and Ctrl/Command + V? Fine, maybe that is how it is supposed to be done. But maybe things don’t need to be this complicated…

How to gracefully intercept panic

Use the interceptors provided by grpc.Server. gRPC provides two kinds of interceptor, corresponding to the request-response (unary) mode and the stream mode (generally used for requests such as large file transfers).

// UnaryServerInterceptor is the interceptor signature for the request-response
// mode. It receives the call's context, the request, information about the
// call (info) and the handler; the examples in this article run the business
// method by calling handler(ctx, req) and return its resp and err.
type UnaryServerInterceptor func(ctx context.Context, req interface{}, info *UnaryServerInfo, handler UnaryHandler) (resp interface{}, err error)

// StreamServerInterceptor is the interceptor signature for the stream mode.
type StreamServerInterceptor func(srv interface{}, ss ServerStream, info *StreamServerInfo, handler StreamHandler) error
Copy the code

How to use it?

Use the UnaryInterceptor for an example

// Install a unary interceptor that recovers from panics raised by any handler.
srv := grpc.NewServer(grpc.UnaryInterceptor(func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
	defer func() {
		// Intercept any panic raised further down the call chain.
		if r := recover(); r != nil {
			glog.Errorf("method:%s, time:%s, err:%v, fatal%s", info.FullMethod, time.Now().Format("20060102-15:04:05:06"), r, string(debug.Stack()))
			// Without this the named results would stay (nil, nil) and the
			// panic would be reported to the caller as a success.
			resp, err = nil, status.Errorf(codes.Internal, "errors:%v", r)
		}
	}()
	// Execute the corresponding business method.
	return handler(ctx, req)
}))
Copy the code

Of course, you can add other operations as well, such as shipping request logs to ES or validating parameters. To avoid polluting the surrounding scope, the variables used for parameter validation are kept inside a closure.

// Install a unary interceptor that recovers from panics and validates every
// request. The interceptor is produced by an immediately-invoked closure so
// the validator and translator are built once and stay out of the outer scope.
srv := grpc.NewServer(grpc.UnaryInterceptor(func() grpc.UnaryServerInterceptor {
	var (
		validate = validator.New()
		uni      = ut.New(zh.New())
		trans, _ = uni.GetTranslator("zh")
	)
	// Register the zh translations on the validator; without them the
	// interceptor cannot produce translated messages, so fail at startup.
	if err := zh_translations.RegisterDefaultTranslations(validate, trans); err != nil {
		panic(err)
	}

	return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
		fmt.Printf("ctx %#v, req:%v, info:%#v", ctx, req, info)
		defer func() {
			if r := recover(); r != nil {
				glog.Errorf("method:%s, time:%s, err:%v, fatal%s", info.FullMethod, time.Now().Format("20060102-15:04:05:06"), r, string(debug.Stack()))
				// Report the panic to the caller instead of returning (nil, nil).
				resp, err = nil, status.Errorf(codes.Internal, "errors:%v", r)
			}
		}()

		// Check parameters.
		if vErr := validate.Struct(req); vErr != nil {
			if transErr, ok := vErr.(validator.ValidationErrors); ok {
				// Field-level failures: concatenate the translated messages.
				var buf bytes.Buffer
				for _, msg := range transErr.Translate(trans) {
					buf.WriteString(msg) // bytes.Buffer.WriteString always returns a nil error
				}
				return resp, status.New(codes.InvalidArgument, buf.String()).Err()
			}
			// Any other validation error is reported as Unknown.
			return resp, status.New(codes.Unknown, fmt.Sprintf("error%s", vErr)).Err()
		}

		return handler(ctx, req)
	}
}()))
Copy the code

Expand the writing of middleware

If the task is simple enough, the approach above is sufficient. But what if it is not, as in the following?

// Install a single unary interceptor that does everything at once: panic
// recovery, a default timeout, request/response logging and parameter
// validation. validate and trans are the validator and translator built as in
// the previous example.
srv := grpc.NewServer(grpc.UnaryInterceptor(func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
	fmt.Printf("ctx %#v, req:%v, info:%#v", ctx, req, info)
	defer func() {
		if r := recover(); r != nil {
			glog.Errorf("method:%s, time:%s, err:%v, fatal%s", info.FullMethod, time.Now().Format("20060102-15:04:05:06"), r, string(debug.Stack()))
			// Report the panic to the caller instead of returning (nil, nil).
			resp, err = nil, status.Errorf(codes.Internal, "errors:%v", r)
		}
	}()
	// If the caller did not set a deadline, apply a timeout of 6 s.
	if _, ok := ctx.Deadline(); !ok {
		var cancel context.CancelFunc
		ctx, cancel = context.WithTimeout(ctx, time.Second*6)
		defer cancel()
	}
	// Serialize the request so it can be logged.
	data, err := json.Marshal(req)
	if err != nil {
		return resp, status.New(codes.Internal, err.Error()).Err()
	}
	// A routine request log is informational, like the response log below.
	glog.Infof("method:%s, request:%v", info.FullMethod, string(data))
	// Check parameters.
	if vErr := validate.Struct(req); vErr != nil {
		if transErr, ok := vErr.(validator.ValidationErrors); ok {
			// Field-level failures: concatenate the translated messages.
			var buf bytes.Buffer
			for _, msg := range transErr.Translate(trans) {
				buf.WriteString(msg)
			}
			return resp, status.New(codes.InvalidArgument, buf.String()).Err()
		}
		// Any other validation error is reported as Unknown.
		return resp, status.New(codes.Unknown, fmt.Sprintf("error%s", vErr)).Err()
	}
	start := time.Now()
	resp, err = handler(ctx, req)
	glog.Infof("method:%s, request:%#v, resp:%#v, latency:%v, status:%v", info.FullMethod, req, resp, time.Since(start), status.Convert(err))
	return resp, err
}))
Copy the code

There is actually nothing wrong with this either; it gets the job done… But it can be done much better. For example:

package main

import (
	"bufio"
	"bytes"
	"context"
	"encoding/json"
	"flag"
	"fmt"
	"net"
	"os"
	"time"

	"github.com/go-playground/locales/zh"
	ut "github.com/go-playground/universal-translator"
	"github.com/go-playground/validator/v10"
	zh_translations "github.com/go-playground/validator/v10/translations/zh"
	"google.golang.org/grpc"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
	"mio_grpc/pb"
)

type greeterImpl struct{}// Call the chain handler
type HandlerFunc func(*Context)// RepackagedContext;
type Context struct {
	req         interface{}            // Enter parameters
	resp        interface{}            // Output parameters
	info        *grpc.UnaryServerInfo  // Service information
	ctx         context.Context        // Context information for the service method
	handler     grpc.UnaryHandler      // Request processing for the corresponding service
	err         error                  / / error
	reqJsData   string                 // Enter the parameter as a string in js format
	respJsData  string                 // The input parameter is a string in js format
	handlerFunc []HandlerFunc          // There is a handler by default
	index       int                    // The level of the current callback
	data        map[string]interface{} // 设置的 data
}

Create a new context
// @param context. context The context requested by the GRPC method
// @param req requests the input parameter of the method
// @param resp request method output parameters
// @param info GRPC method name information, which contains the requested method name
// @param handler this argument is a function that calls the corresponding GRPC method
//
// @ret *Context returns the rewrapped Context
func newContext(ctx context.Context, req, resp interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) *Context {
	data, _ := json.Marshal(req) // Serialize the request parameters
	return &Context{
		req:         req,
		resp:        resp,
		info:        info,
		err:         nil,
		ctx:         ctx,
		reqJsData:   string(data),
		respJsData:  "",
		handlerFunc: make([]HandlerFunc, 0),
		handler:     handler,
		data:        make(map[string]interface{}, 16),}}// Check parameters
func validate(a) HandlerFunc {
	var (
		validate = validator.New() / / validator
		uni      = ut.New(zh.New())
		trans, _ = uni.GetTranslator("zh") // Noon translator
	)
	// Associate validator and translator
	err := zh_translations.RegisterDefaultTranslations(validate, trans)
	iferr ! =nil {
		panic(err)
	}
	return func(c *Context) {
		// Check parameters
		iferr := validate.Struct(c.req); err ! =nil {
			// Check whether the error is a validation field error
			// If the parameter verification fails, translate the error message into the corresponding description
			if transErr, ok := err.(validator.ValidationErrors); ok {
				translations := transErr.Translate(trans)
				var buf bytes.Buffer
				for _, s2 := range translations {
					buf.WriteString(s2)
				}
				// The GRPC returned an error
				err = status.New(codes.InvalidArgument, buf.String()).Err()
				// Abort the call early
				c.AbortWith(err)
				return
			}
			// If an error is encountered while validating GRPC input parameters, but the error is not translated as an error, return unknown error
			err = status.New(codes.Unknown, fmt.Sprintf("error%s", err)).Err()
			// Abort the call early
			c.AbortWith(err)
			return}}}// Get the request parameters
func (c *Context) GetReq() interface{} {
	// Tolerate a nil receiver, like the other Context accessors.
	if c == nil {
		return nil
	}
	return c.req
}

// GetReqJsData returns the request parameters as a JSON string.
// It returns "" when called on a nil Context.
func (c *Context) GetReqJsData() string {
	if c == nil {
		return ""
	}
	return c.reqJsData
}

/ / set the JsData
func (c *Context) SetReqJsData(str string) {
	if c == nil {
		return
	}
	if json.Valid([]byte(str)) {
		c.reqJsData = str
	}
}

// SetRespJsData replaces the stored JSON string of the response.
// Nothing happens on a nil Context or when str is not valid JSON.
func (c *Context) SetRespJsData(str string) {
	if c == nil || !json.Valid([]byte(str)) {
		return
	}
	c.respJsData = str
}

// FullMethod returns the name of the gRPC method being requested, taken from
// the stored service information. It returns "" when the Context or its
// service information is nil.
func (c *Context) FullMethod() string {
	if c == nil || c.info == nil {
		return ""
	}
	return c.info.FullMethod
}

// SetData stores value under key in the Context's data map.
// It is a no-op on a nil Context.
func (c *Context) SetData(key string, value interface{}) {
	if c == nil {
		return
	}
	// Writing to a nil map panics; allocate lazily so a Context that was not
	// built by newContext can still be used.
	if c.data == nil {
		c.data = make(map[string]interface{})
	}
	c.data[key] = value
}

// GetData looks up key in the Context's data map and returns the stored
// value. The result is nil for a nil Context or when key is absent.
func (c *Context) GetData(key string) interface{} {
	if c != nil {
		return c.data[key]
	}
	return nil
}

// Next runs the remaining handlers in the chain, starting with the one after
// the handler that is currently executing. Code placed after a call to Next
// therefore runs once all later handlers have returned. It is a no-op on a
// nil Context.
func (c *Context) Next() {
	if c == nil {
		return
	}
	// index is shared with the handlers, which may advance it themselves
	// (nested Next calls, AbortWith), so it is re-read on every iteration.
	for c.index++; c.index < len(c.handlerFunc); c.index++ {
		c.handlerFunc[c.index](c)
	}
}

// AbortWith terminates the handler chain early: it records err as the result
// of the request and moves index past the end of the chain so that no further
// handler is run. It is a no-op on a nil Context.
func (c *Context) AbortWith(err error) {
	// abortLevel only has to exceed any realistic chain length. 1 << 30 fits
	// in a 32-bit int (1 << 32 does not compile there) and still leaves room
	// for the index++ that the calling loops perform afterwards.
	const abortLevel = 1 << 30
	if c == nil {
		return
	}
	c.err = err
	c.index = abortLevel
}

// Simulate log output to es
func log2es(a) HandlerFunc {
	// Simulate the input log to es
	file, err := os.OpenFile("./my.txt", os.O_CREATE|os.O_RDWR|os.O_APPEND, 0766)
	iferr ! =nil {
		panic(err)
	}
	w := bufio.NewWriter(file)
	defer w.Flush()
	return func(c *Context) {
		start := time.Now()

		c.Next() // Request the next method
		_, _ = file.WriteString(
			fmt.Sprintf(
				"method:%s, status:%v, latency:%v, req:%s, resp:%s\n", c.FullMethod(), status.Convert(c.err).Code().String(), time.Now().Sub(start), c.GetReqJsData(), ""))
		//fmt.Println("log2es after ", time.Now(), writeString, err2, c.FullMethod())}}// Call the handler function, which is called by default, when all calls are complete
func procHandler(ctx *Context) {
	// Invoke the stored handler with the stored context and request, then
	// record both of its results on the Context.
	resp, err := ctx.handler(ctx.ctx, ctx.req)
	ctx.resp, ctx.err = resp, err
}

// The package handles multiple handler funcs
func WrapperHandler(handFunc ... HandlerFunc) grpc.UnaryServerInterceptor {
	return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
		// Generate a new Context by taking the argument from the GRPC interceptor's callback
		c := newContext(ctx, req, resp, info, handler)

		// Construct a default callback handler that intercepts Panic as the first business handler
		c.handlerFunc = append(c.handlerFunc, func(c *Context) {
			defer func(a) {
				if err := recover(a); err ! =nil {
					c.AbortWith(status.New(codes.Internal, fmt.Sprintf("errors:%v", err)).Err())
				}
			}()
			c.Next()
		})
		// Take the processing of user input as the intermediate processing
		c.handlerFunc = append(c.handlerFunc, handFunc...)
		// When the user is done, the real service method is called
		c.handlerFunc = append(c.handlerFunc, procHandler)

		// Start invoking the service
		for c.index = 0; c.index < len(c.handlerFunc); c.index++ {
			c.handlerFunc[c.index](c)
		}
		// Return to services RESP and ERR
		return c.resp, c.err
	}
}

func (g greeterImpl) OutOfIndex(ctx context.Context, request *pb.OutOfIndexRequest) (resp *pb.OutOfIndexResponse, err error) {
	fmt.Println("OutOfIndex", request)
	time.Sleep(time.Second * 4)
	//defer func() {
	// // interception error
	// if err := recover(); err ! = nil {
	// glog.Errorf("panic:%s\n", string(debug.Stack()))
	//	}
	/ /} ()
	resp = &pb.OutOfIndexResponse{Data: "ok"}
	request.Ids = make([]int64.0)
	request.Ids[1] = 1
	return
}

// NilPointer writes through request.Data and then replies with "ok".
// NOTE(review): judging by the name, request.Data is presumably a pointer
// that may be nil, making this a deliberate nil-dereference demo. Confirm
// against the generated pb types.
func (g greeterImpl) NilPointer(ctx context.Context, request *pb.NilPointerRequest) (*pb.NilPointerResponse, error) {
	request.Data.Data = "work man"
	reply := &pb.NilPointerResponse{Data: "ok"}
	return reply, nil
}

// Hello prints the incoming request to stdout and replies with a fixed
// ErrCode of "err_code".
func (g greeterImpl) Hello(ctx context.Context, request *pb.HelloRequest) (*pb.HelloResponse, error) {
	//panic("implement me")
	fmt.Println("Hello:", request)
	reply := &pb.HelloResponse{ErrCode: "err_code"}
	return reply, nil
}

func main(a) {
	flag.Parse()
	srv := grpc.NewServer(grpc.UnaryInterceptor(WrapperHandler(log2es(), validate())))
	listen, err := net.Listen("tcp".": 8086")
	iferr ! =nil {
		panic(err)
	}
	pb.RegisterGreeterServer(srv, &greeterImpl{})
	iferr = srv.Serve(listen); err ! =nil {
		panic(err)
	}

}


Copy the code

It looks complicated. Well, it does look complicated, mainly because there are more lines of code. But look at how it is used:

	// Handlers are passed to WrapperHandler in the order they should run;
	// the pass-through handlers simply continue the chain via c.Next().
	srv := grpc.NewServer(grpc.UnaryInterceptor(WrapperHandler(
		func(c *Context) { c.Next() },
		log2es(),
		validate(),
		func(c *Context) { c.Next() },
		func(c *Context) {
			fmt.Println("hello ")
			c.Next()
		},
	)))
Copy the code

This is simpler, and you can handle more business logic

An additional tool, protoc-go-inject-tag, is used to rewrite the struct tags in the generated pb.go file; here it is used to inject the `validate` tags.

// HelloRequest is a request message. The @inject_tag comments carry the
// `validate` struct tags that protoc-go-inject-tag injects into the
// generated pb.go; they are read by that tool, so keep them unchanged.
message HelloRequest {
  //@inject_tag: validate:"required,gte=0"
  int64  id = 1;
  //@inject_tag:validate:"required"
  string user_name = 2;
  //@inject_tag:validate:"required"
  string user_address = 3;
  int64  book_time = 4;
  //@inject_tag:validate:"required"
  string random_str = 5;
}
Copy the code

The complete code is here, poke me straight up 🦀🦀