Background

gRPC comes up often in both work and study. Usually a generic load balancing algorithm is enough, but in some scenarios we need to control which version of a service a caller talks to. For example, App V2 may only be allowed to connect to User V3; in cases like that, the only option is a custom load balancing policy.

Goal

Implement a version-based gRPC load balancer. Once you understand the process, you can implement other load balancing strategies on your own.

  • Registry
    • etcd Lease is a mechanism for detecting whether a client is still alive. The cluster grants a lease with a time to live (TTL); if the etcd cluster does not receive a keepAlive within that TTL, the lease expires. A lease can be bound to a key-value pair, and each key can have at most one lease attached (a minimal standalone sketch of this behaviour follows this list)
  • Service registration (the service side, APP)
    • Periodically register the local service's (APP) address, version and other information with the registry
  • Service discovery (the client initiates a service resolution request for APP)
    • Look up which service instances (APP) are available in the registry
    • Establish long-lived HTTP/2 connections to all of them
    • Watch the registry with etcd Watch and update the connections as instances (APP) come and go
  • Load balancing (the client initiates a request to APP)
    • The load balancer picks an appropriate service instance (an APP HTTP/2 connection)
    • The call is made
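Here is that minimal sketch of the lease mechanism, written against the etcd clientv3 API. The endpoint, key name and 5-second TTL are made-up illustration values; the point is only that a key bound to a lease disappears once keep-alives stop and the TTL elapses.

package main

import (
	"context"
	"fmt"
	"time"

	clientv3 "go.etcd.io/etcd/client/v3"
)

func main() {
	cli, err := clientv3.New(clientv3.Config{Endpoints: []string{"127.0.0.1:2379"}})
	if err != nil {
		panic(err)
	}
	defer cli.Close()
	// Grant a lease with a 5-second TTL.
	lease, err := cli.Grant(context.Background(), 5)
	if err != nil {
		panic(err)
	}
	// Bind a key to the lease; without keep-alives the key expires together with the lease.
	if _, err = cli.Put(context.Background(), "/demo/key", "value", clientv3.WithLease(lease.ID)); err != nil {
		panic(err)
	}
	time.Sleep(7 * time.Second) // longer than the TTL, and no KeepAlive was sent
	resp, _ := cli.Get(context.Background(), "/demo/key")
	fmt.Println("keys left:", len(resp.Kvs)) // 0: the lease expired and took the key with it
}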

Service registration (the service side, APP)

Source file: register.go

func NewRegister(opt ...RegisterOptions) (*Register, error) {
	s := &Register{
		opts: newOptions(opt...),
	}
	var ctx, cancel = context.WithTimeout(context.Background(), time.Duration(s.opts.RegisterTtl)*time.Second)
	defer cancel()
	data, err := json.Marshal(s.opts)
	if err != nil {
		return nil, err
	}
	etcdCli, err := clientv3.New(s.opts.EtcdConf)
	if err != nil {
		return nil, err
	}
	s.etcdCli = etcdCli
	// Apply for a lease
	resp, err := etcdCli.Grant(ctx, s.opts.RegisterTtl)
	if err != nil {
		return s, err
	}
	s.name = fmt.Sprintf("%s/%s", s.opts.Node.Path, s.opts.Node.Id)
	// Register the node, binding the key to the lease
	_, err = etcdCli.Put(ctx, s.name, string(data), clientv3.WithLease(resp.ID))
	if err != nil {
		return s, err
	}
	// Keep the lease alive so the registration does not expire
	_, err = etcdCli.KeepAlive(context.Background(), resp.ID)
	if err != nil {
		return s, err
	}
	return s, nil
}
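A rough usage sketch of the registration side. The option setter names (register.SetEtcdConf, register.SetNode, register.SetRegisterTtl) are assumptions inferred from the payload shown next, not necessarily the repository's exact API:

// Hypothetical server-side registration of APP v1; the setter names are assumed.
r, err := register.NewRegister(
	register.SetEtcdConf(clientv3.Config{Endpoints: []string{"172.12.12.165:2379"}}),
	register.SetRegisterTtl(10), // lease TTL in seconds (assumed option)
	register.SetNode(register.Node{
		Name:    "app",
		Path:    "/hwholiday/srv/app",
		Id:      "app-beb3cb56-eb61-11eb-858d-2cf05dc7c711",
		Version: "v1",
		Address: "172.12.12.188:8089",
	}),
)
if err != nil {
	log.Fatal(err)
}
_ = r // keep the register alive for the lifetime of the service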

In etcd we can then see the APP V1 service registered under the key /hwholiday/srv/app/app-beb3cb56-eb61-11eb-858d-2cf05dc7c711 with the following value:

{ "node": { "name": "app", "path": "/hwholiday/srv/app", "id": "app-beb3cb56-eb61-11eb-858d-2cf05dc7c711", "version": "V1 ", "address": "172.12.12.188:8089"}}Copy the code

The APP V2 service is registered under the key /hwholiday/srv/app/app-19980562-eb63-11eb-99c0-2cf05dc7c711:

{ "node": { "name": "app", "path": "/hwholiday/srv/app", "id": "app-19980562-eb63-11eb-99c0-2cf05dc7c711", "version": "V2 ", "address": "172.12.12.188:8088"},}Copy the code

Service discovery (the client initiates a service resolution request for APP)

The source file discovery.go implements the resolver.Builder interface from gRPC (a Builder creates a resolver that watches for updates to name resolution).
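For reference, the interface being implemented lives in google.golang.org/grpc/resolver and looks roughly like this (abridged from the gRPC source):

type Builder interface {
	// Build creates a new resolver for the given target and starts watching for updates.
	Build(target Target, cc ClientConn, opts BuildOptions) (Resolver, error)
	// Scheme returns the scheme supported by this resolver.
	Scheme() string
}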

func NewDiscovery(opt ...ClientOptions) resolver.Builder {
	s := &Discovery{
		opts: newOptions(opt...),
	}
	etcdCli, err := clientv3.New(s.opts.EtcdConf)
	if err != nil {
		panic(err)
	}
	s.etcdCli = etcdCli
	return s
}

// Build is executed when grpc.Dial() is called.
func (d *Discovery) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
	d.cc = cc
	res, err := d.etcdCli.Get(context.Background(), d.opts.SrvName, clientv3.WithPrefix())
	if err != nil {
		return nil, err
	}
	for _, v := range res.Kvs {
		if err = d.AddNode(v.Key, v.Value); err != nil {
			log.Println(err)
			continue
		}
	}
	go func(dd *Discovery) {
		dd.watcher()
	}(d)
	return d, nil
}

// Attributes contains arbitrary data about the resolver intended for
// consumption by the load balancing policy.
// Attributes *attributes.Attributes
func (d *Discovery) AddNode(key, val []byte) error {
	var data = new(register.Options)
	err := json.Unmarshal(val, data)
	if err != nil {
		return err
	}
	addr := resolver.Address{Addr: data.Node.Address}
	addr = SetNodeInfo(addr, data)
	d.Node.Store(string(key), addr)
	return d.cc.UpdateState(resolver.State{Addresses: d.GetAddress()})
}
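Build starts a watcher goroutine and AddNode attaches node metadata to the address, but neither watcher nor SetNodeInfo/GetNodeInfo is shown above. Here is a sketch of how they might look, assuming d.Node is a sync.Map and using the Attributes field on resolver.Address; the attribute key "node" and the exact bodies are assumptions, not the repository's code:

// watcher keeps the address list in sync with etcd: new or updated keys are added,
// deleted keys (for example when a lease expires) are removed.
func (d *Discovery) watcher() {
	watchChan := d.etcdCli.Watch(context.Background(), d.opts.SrvName, clientv3.WithPrefix())
	for resp := range watchChan {
		for _, ev := range resp.Events {
			switch ev.Type {
			case clientv3.EventTypePut: // node added or updated
				if err := d.AddNode(ev.Kv.Key, ev.Kv.Value); err != nil {
					log.Println(err)
				}
			case clientv3.EventTypeDelete: // node gone (lease expired or deregistered)
				d.Node.Delete(string(ev.Kv.Key))
				if err := d.cc.UpdateState(resolver.State{Addresses: d.GetAddress()}); err != nil {
					log.Println(err)
				}
			}
		}
	}
}

// SetNodeInfo / GetNodeInfo stash the registration payload on the address so the
// picker can read it back later; "node" is an arbitrary attribute key chosen here.
func SetNodeInfo(addr resolver.Address, info *register.Options) resolver.Address {
	addr.Attributes = attributes.New("node", info)
	return addr
}

func GetNodeInfo(addr resolver.Address) *register.Options {
	v := addr.Attributes.Value("node")
	if v == nil {
		return nil
	}
	return v.(*register.Options)
}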

Load balancing (the client initiates a request to APP)

Source file: version_balancer.go

  • gRPC provides the PickerBuilder and Picker interfaces (shown below) for us to implement our own load balancing policy
// PickerBuilder creates balancer.Picker.
type PickerBuilder interface {
	// Build returns a picker that gRPC will use to pick a SubConn.
	Build(info PickerBuildInfo) balancer.Picker
}
// gRPC uses a Picker to pick a SubConn to send RPCs on.
// Every time the balancer's internal state changes, it generates a new picker from that snapshot.
// The picker used by gRPC is updated via ClientConn.UpdateState().
type Picker interface {
	// Pick selects the appropriate SubConn for the request.
	Pick(info PickInfo) (PickResult, error)
}
  • From the above, the two places where we can hook in our own logic are the Build method and the Pick method (Build runs first; Pick runs on every gRPC call)
    • Build(info PickerBuildInfo) balancer.Picker — info contains the connections to the services that were stored earlier via the AddNode method
    • Pick(info PickInfo) (PickResult, error) — info contains the name of the method being called and its context.Context. From that context we can read the values set when the request was made, so we can flexibly apply a different load balancing decision per method; here we make the decision based on the version set at the gRPC client API layer
func (*rrPickerBuilder) Build(info base.PickerBuildInfo) balancer.Picker {
	if len(info.ReadySCs) == 0 {
		return base.NewErrPicker(balancer.ErrNoSubConnAvailable)
	}
	var scs = make(map[balancer.SubConn]*register.Options, len(info.ReadySCs))
	for conn, addr := range info.ReadySCs {
		nodeInfo := GetNodeInfo(addr.Address)
		if nodeInfo != nil {
			scs[conn] = nodeInfo
		}
	}
	if len(scs) == 0 {
		return base.NewErrPicker(balancer.ErrNoSubConnAvailable)
	}
	return &rrPicker{
		node: scs,
	}
}
func (p *rrPicker) Pick(info balancer.PickInfo) (balancer.PickResult, error) {
	p.mu.Lock()
	defer p.mu.Unlock()
	// Read the version the caller put into the request context.
	version, _ := info.Ctx.Value("version").(string)
	var subConns []balancer.SubConn
	for conn, node := range p.node {
		if version != "" && node.Node.Version == version {
			subConns = append(subConns, conn)
		}
	}
	if len(subConns) == 0 {
		return balancer.PickResult{}, errors.New("no match found conn")
	}
	index := rand.Intn(len(subConns))
	sc := subConns[index]
	return balancer.PickResult{SubConn: sc}, nil
}
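For the client's service config to find this policy under the name "version", the picker builder has to be registered with gRPC. A minimal sketch of how that registration might look, using the base package helper (the init placement and the health-check flag are assumptions, not the repository's exact code):

func init() {
	// Register the custom policy under the name "version", which is the name the
	// client references in its service config below.
	balancer.Register(base.NewBalancerBuilder(
		"version",
		&rrPickerBuilder{},
		base.Config{HealthCheck: true},
	))
}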

The client uses the version-based load balancing policy we defined

r := discovery.NewDiscovery(
	discovery.SetName("hwholiday.srv.app"),
	discovery.SetEtcdConf(clientv3.Config{
		Endpoints:   []string{"172.12.12.165:2379"},
		DialTimeout: time.Second * 5, // the original value was cut off; 5s is an assumed placeholder
	}))
resolver.Register(r) // register the custom resolver (assumed step; the original snippet is garbled here)
conn, err := grpc.Dial("hwholiday.srv.app",
	// Use the "version" balancing policy defined above; without this service config
	// parameter the custom policy is not selected.
	grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"LoadBalancingPolicy": "%s"}`, "version")),
	grpc.WithInsecure(),
)
if err != nil {
	log.Fatalf("net.Connect err: %v", err)
}
defer conn.Close()
apiClient := api.NewApiClient(conn)
// Put the desired service version into the request context; the Pick method reads it back out.
ctx := context.WithValue(context.Background(), "version", "v1")
_, err = apiClient.ApiTest(ctx, &api.Request{Input: "v1v1v1v1v1"})
if err != nil {
	fmt.Println(err)
}

Running results

The test source

  • Run APP service v1 and call it from the gRPC client with version v1
    • APP prints
    • Successful startup === > 0.0.0.0:8089
    • input:"v1v1v1v1v1"
    • gRPC client prints
    • === RUN TestClient
    • v1v1v1v1v1v1v1v1v1v1
  • Run APP service v1 and call it from the gRPC client with version v2
    • APP prints
    • Successful startup === > 0.0.0.0:8089
    • gRPC client prints
    • === RUN TestClient
    • rpc error: code = Unavailable desc = no match found conn

Conclusion

Source code: github.com/hwholiday/l… With this in hand, we can implement a version-based load balancing policy. This is just one idea for how to approach it; my example may not fit every situation, but it offers a starting point.