Unlike Nginx, LVS, or F5, gRPC uses client-side load balancing. With server-side load balancing, clients first access the domain name or IP address of the load balancer, which then distributes each request to a specific back-end service node according to its policy. With client-side load balancing, the client itself picks a node from the list of available back-end service nodes based on its own load balancing policy and connects to that server directly.

With the naming resolver provided in the Etcd client package and gRPC's own RoundRobin load balancer, you can easily build a service registration/discovery and load balancing system. If round-robin scheduling does not meet your requirements, or you do not want to use Etcd as the service registry and name resolver, you can implement the Resolver and Balancer interfaces defined by gRPC yourself to satisfy your system's custom needs.

The source code quoted in this article is from gRPC v1.2.x and Etcd v3.3.

If you are not familiar with gRPC and Etcd, you can check out my earlier Getting Started with gRPC and Getting Started with Etcd series.

gRPC service registration and discovery

Let’s start with a brief explanation of how to use Etcd for service registration and discovery. The process of service registration and discovery can be briefly described as follows:

Suppose service A contains two nodes. When the service on a node starts, it writes a key made up of the service name and the node's unique identifier (for example /service/a/114.128.45.117) into Etcd, with the node's IP and port information as the value. These keys all carry a lease, which requires the service to renew the lease periodically. If a service node goes down, for example the service on Node2 crashes and can no longer renew its lease, its corresponding key (/service/a/114.128.45.117 in this example) expires, and clients can no longer get that node's information from Etcd.

At the same time, the client uses Etcd's Watch feature to monitor all key changes under the /service/a prefix. Whenever a node key is added or deleted, Etcd sends the event to the client through a WatchChan, which is implemented as a Go channel.
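To illustrate the raw mechanism described above, here is a minimal sketch of a client watching every key under the /service/a prefix. It assumes the etcd v3.3 clientv3 package and a placeholder endpoint; it is not the code the gRPC resolver uses, just the underlying Watch call.

package main

import (
	"context"
	"log"

	"github.com/coreos/etcd/clientv3"
)

func main() {
	cli, err := clientv3.New(clientv3.Config{Endpoints: []string{"127.0.0.1:2379"}})
	if err != nil {
		log.Fatal(err)
	}
	defer cli.Close()

	// Watch every key under the /service/a prefix. Each WatchResponse carries
	// PUT events (a node registered or renewed) and DELETE events (a lease expired).
	wch := cli.Watch(context.Background(), "/service/a", clientv3.WithPrefix())
	for wresp := range wch {
		for _, ev := range wresp.Events {
			log.Printf("%s %q -> %q", ev.Type, ev.Kv.Key, ev.Kv.Value)
		}
	}
}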

Service registration

For service registration, the official Etcd package does not provide a single unified registration function. So how do we store a new service node's information in Etcd and notify the naming resolver? The clientv3/naming/grpc.go file in the Etcd source provides an Update method that can perform both add and delete operations:

func (gr *GRPCResolver) Update(ctx context.Context, target string, nm naming.Update, opts ...etcd.OpOption) (err error) {
	switch nm.Op {
	case naming.Add:
		var v []byte
		if v, err = json.Marshal(nm); err != nil {
			return status.Error(codes.InvalidArgument, err.Error())
		}
		_, err = gr.Client.KV.Put(ctx, target+"/"+nm.Addr, string(v), opts...)
	case naming.Delete:
		_, err = gr.Client.Delete(ctx, target+"/"+nm.Addr, opts...)
	default:
		return status.Error(codes.InvalidArgument, "naming: bad naming op")
	}
	return err
}

After a service starts, it can use the Update method to Put its own address and port into a key prefixed with target; in the example above, target would be the service name /service/a. In practice, each system wraps the Update method to implement its own registration logic, including periodically renewing the lease on the service node's key in Etcd. Since every company does this differently, I will not include specific production code; leases are usually renewed through the KeepAlive method of the Etcd lease API.
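That said, a minimal sketch of the idea might look like the following, assuming the etcd v3.3 clientv3 and clientv3/naming packages; the endpoint, TTL, service name, and node address are all placeholders and error handling is abbreviated.

package main

import (
	"context"
	"log"

	"github.com/coreos/etcd/clientv3"
	etcdnaming "github.com/coreos/etcd/clientv3/naming"
	"google.golang.org/grpc/naming"
)

func main() {
	cli, err := clientv3.New(clientv3.Config{Endpoints: []string{"127.0.0.1:2379"}})
	if err != nil {
		log.Fatal(err)
	}
	defer cli.Close()

	// Grant a 10-second lease and attach it to the registration key, so the
	// key disappears automatically if this node stops renewing it.
	lease, err := cli.Grant(context.Background(), 10)
	if err != nil {
		log.Fatal(err)
	}

	r := &etcdnaming.GRPCResolver{Client: cli}
	err = r.Update(context.Background(), "/service/a",
		naming.Update{Op: naming.Add, Addr: "114.128.45.117:50051"},
		clientv3.WithLease(lease.ID))
	if err != nil {
		log.Fatal(err)
	}

	// Keep renewing the lease for as long as the process runs.
	ch, err := cli.KeepAlive(context.Background(), lease.ID)
	if err != nil {
		log.Fatal(err)
	}
	for range ch {
		// Drain keep-alive responses; exiting this loop means the lease is gone.
	}
}

Note that GRPCResolver.Update stores the JSON-encoded naming.Update under the key target + "/" + Addr, so the key registered by this sketch would be /service/a/114.128.45.117:50051.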

Service discovery

How does the client learn that a new node has registered or that an existing node has gone away? That is the job of the Resolver, which can be understood as a mapping from a name string to a set of IP addresses, ports, and other information.

The gRPC interface to Resolver is defined as follows:

type Resolver interface {
	// Resolve creates a Watcher for target.
	Resolve(target string) (Watcher, error)
}

The Resolve method of the name resolver returns a Watcher. The Watcher listens for changes in server addresses and passes them on to the Balancer so that it can dynamically add or remove addresses.

The Watcher interface is defined as follows:

// Source: https://github.com/grpc/grpc-go/blob/v1.2.x/naming/naming.go
type Watcher interface {
	Next() ([]*Update, error)
	// Close closes the Watcher.
	Close()
}
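If you do not want to depend on Etcd, implementing these two interfaces yourself is straightforward. Below is a minimal, hypothetical sketch of a Resolver/Watcher pair that serves a fixed address list; the staticResolver and staticWatcher names are my own for illustration and are not part of gRPC or Etcd.

import (
	"errors"

	"google.golang.org/grpc/naming"
)

// staticResolver resolves every target to the same fixed address list.
type staticResolver struct {
	addrs []string
}

func (r *staticResolver) Resolve(target string) (naming.Watcher, error) {
	return &staticWatcher{addrs: r.addrs, done: make(chan struct{})}, nil
}

type staticWatcher struct {
	addrs []string
	sent  bool
	done  chan struct{}
}

// Next returns the whole address list on the first call and then blocks until
// Close is called, because a static list never changes.
func (w *staticWatcher) Next() ([]*naming.Update, error) {
	if !w.sent {
		w.sent = true
		updates := make([]*naming.Update, 0, len(w.addrs))
		for _, a := range w.addrs {
			updates = append(updates, &naming.Update{Op: naming.Add, Addr: a})
		}
		return updates, nil
	}
	<-w.done
	return nil, errors.New("staticWatcher: closed")
}

func (w *staticWatcher) Close() { close(w.done) }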

Etcd provides implementations for both interfaces:

// Source: https://github.com/etcd-io/etcd/blob/release-3.3/clientv3/naming/grpc.go

// GRPCResolver implements the naming.Resolver interface of GRPC
type GRPCResolver struct {
	// Client is an initialized etcd client.
	Client *etcd.Client
}

func (gr *GRPCResolver) Resolve(target string) (naming.Watcher, error) {
	ctx, cancel := context.WithCancel(context.Background())
	w := &gRPCWatcher{c: gr.Client, target: target + "/", ctx: ctx, cancel: cancel}
	return w, nil
}

// Implementing the Naming.Watcher interface of GRPC
type gRPCWatcher struct {
	c      *etcd.Client
	target string
	ctx    context.Context
	cancel context.CancelFunc
	wch    etcd.WatchChan
	err    error
}

func (gw *gRPCWatcher) Next() ([]*naming.Update, error) {
	if gw.wch == nil {
		// first Next() returns all addresses
		return gw.firstNext()
	}

	// process new events on target/*
	wr, ok := <-gw.wch
	if !ok {
		...
	}

	updates := make([]*naming.Update, 0, len(wr.Events))
	for _, e := range wr.Events {
		var jupdate naming.Update
		var err error
		switch e.Type {
		case etcd.EventTypePut:
			err = json.Unmarshal(e.Kv.Value, &jupdate)
			jupdate.Op = naming.Add
		case etcd.EventTypeDelete:
			err = json.Unmarshal(e.PrevKv.Value, &jupdate)
			jupdate.Op = naming.Delete
		default:
			continue
		}
		if err == nil {
			updates = append(updates, &jupdate)
		}
	}
	return updates, nil
}
  
func (gw *gRPCWatcher) firstNext() ([]*naming.Update, error) {
	// Get the values of all keys prefixed with gw.target and collect them into the updates slice
	resp, err := gw.c.Get(gw.ctx, gw.target, etcd.WithPrefix(), etcd.WithSerializable())
	if gw.err = err; err != nil {
		return nil, err
	}

	updates := make([]*naming.Update, 0, len(resp.Kvs))
	for _, kv := range resp.Kvs {
		var jupdate naming.Update
		if err := json.Unmarshal(kv.Value, &jupdate); err != nil {
			continue
		}
		updates = append(updates, &jupdate)
	}

	opts := []etcd.OpOption{etcd.WithRev(resp.Header.Revision + 1), etcd.WithPrefix(), etcd.WithPrevKV()}
	// Watch listens for changes to these keys, including the addition of new keys with the same prefix
	gw.wch = gw.c.Watch(gw.ctx, gw.target, opts...)
	return updates, nil
}

func (gw *gRPCWatcher) Close() { gw.cancel() }

The methods of the GRPCResolver and gRPCWatcher types are closely tied to gRPC's RoundRobin balancer, so I will explain how they fit together below alongside the load balancing source code.

Load balancing

First let’s take a look at the gRPC interface definition for load balancing:

type Balancer interface {

	Start(target string, config BalancerConfig) error

	Up(addr Address) (down func(error))

	Get(ctx context.Context, opts BalancerGetOptions) (addr Address, put func(), err error)

	Notify() <-chan []Address
	// Close shuts down the balancer.
	Close() error
}

When the gRPC client connects to the server, the load balancing component is specified through the WithBalancer DialOption passed to the Dial method:

	client, err := etcd.New(etcd.Config{Endpoints: []string{"127.0.0.1:2379"}})
	...
	resolver := &naming.GRPCResolver{Client: client}
	b := grpc.RoundRobin(resolver)
	opt0 := grpc.WithBalancer(b)

	conn, err := grpc.Dial(target, opt0, opt1, ...)

grpc.RoundRobin(resolver) takes the Etcd GRPCResolver as its naming resolver and returns a roundRobin balancer, which implements gRPC's Balancer interface. Next, let's look at how RoundRobin is implemented and how it works together with GRPCResolver.

RoundRobin

The gRPC package provides the RoundRobin implementation. Here we focus on two things: how it performs load balancing, and how it uses the Resolver for service discovery and node updates.

The RoundRobin structure is defined as follows:

// Source: https://github.com/grpc/grpc-go/blob/v1.2.x/balancer.go
type roundRobin struct {
	r      naming.Resolver
	w      naming.Watcher
	addrs  []*addrInfo // The client can try to connect to all the addresses
	mu     sync.Mutex
	addrCh chan []Address // channel used to notify gRPC internals of the addresses the client can connect to
	next   int            // index of the next address to return for Get()
	waitCh chan struct{}  // the channel to block when there is no connected address available
	done   bool           // The Balancer is closed.
}
  • r is the naming resolver; you can supply your own, such as the Etcd naming resolver. If r is nil, the target parameter passed to Dial is added directly to addrs as the only connectable address.
  • w is the Watcher returned by the resolver's Resolve method. It listens for address changes sent by the name resolver and notifies roundRobin to dynamically add or delete entries in addrs.
  • addrs is the array of address information obtained from the naming resolver; each entry holds not only the address itself but also a flag indicating whether gRPC has a ready connection to it.
  • addrCh is a channel of address arrays. Every time the naming resolver reports an address change, the full address list is pushed to gRPC's internal lbWatcher, a goroutine that manages connection state centrally and is responsible for connecting to new addresses and closing removed ones.
  • next is roundRobin's index, i.e. the position in addrs where round-robin scheduling will continue.
  • waitCh is used when addrs has no connected address: gRPC calls Get() when it needs a connection to target, and if failfast is set to false, Get() blocks on this channel until a ready connection becomes available.

Starting RoundRobin

The balancer initially registered through grpc.WithBalancer is started when the balancerWrapperBuilder builds a balancerWrapper around it:

func (bwb *balancerWrapperBuilder) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer {
	// Start the wrapped balancer.
	bwb.b.Start(opts.Target.Endpoint, BalancerConfig{
		DialCreds: opts.DialCreds,
		Dialer:    opts.Dialer,
	})
	_, pickfirst := bwb.b.(*pickFirst)
	bw := &balancerWrapper{
		......
	}
	cc.UpdateBalancerState(connectivity.Idle, bw)
	go bw.lbWatcher()
	...
}

The main job of the Start method is to call the resolver's Resolve method to obtain the Watcher that listens for back-end changes. It also creates a new addrCh channel, used to push the address changes observed by the Watcher to the lbWatcher inside gRPC.

func (rr *roundRobin) Start(target string, config BalancerConfig) error {
    rr.mu.Lock()
    defer rr.mu.Unlock()
    if rr.done {
        return ErrClientConnClosing
    }
    if rr.r == nil {
        // If there is no resolver, add target directly to the addrs address array
        rr.addrs = append(rr.addrs, &addrInfo{addr: Address{Addr: target}})
        return nil
    }
    // The Resolve interface returns a watcher that listens for changes in the address of the parser
    w, err := rr.r.Resolve(target)
    if err != nil {
        return err
    }
    rr.w = w
    // Create a channel. When Watcher is listening for an address change, notify GRPC internal lbWatcher to connect to the address
    rr.addrCh = make(chan []Address, 1)
    // Start a new goroutine that keeps watching the watcher for address changes.
    go func() {
        for {
            if err := rr.watchAddrUpdates(); err != nil {
                return
            }
        }
    }()
    return nil
}

After addrCh is created, a goroutine is started at the end of the Start method. This goroutine calls watchAddrUpdates in a loop to check whether the naming resolver's Watcher has reported any updates.

Listen for server address updates

The watchAddrUpdates method uses the resolver Watcher's Next method to listen for updates about the server nodes stored in Etcd. This Watcher is the gRPCWatcher type from the Etcd package described in the service discovery section above, and its Next method works by watching changes to keys under the service name prefix in Etcd. The resulting updates are then pushed into the addrCh channel created in the Start method.

func (rr *roundRobin) watchAddrUpdates() error {
    // Watcher's next method blocks until an address change message arrives, updates being the change message
    updates, err := rr.w.Next()
    if err != nil {
        return err
    }
    // Multiple goroutines operate on the addrs address array concurrently, so take the lock.
    rr.mu.Lock()
    defer rr.mu.Unlock()
    for _, update := range updates {
        addr := Address{
            Addr:     update.Addr,
            Metadata: update.Metadata,
        }
        switch update.Op {
        case naming.Add:
            // For newly added addresses, skip any address that already exists in addrs.
            var exist bool
            for _, v := range rr.addrs {
                if addr == v.addr {
                    exist = true
                    break
                }
            }
            if exist {
                continue
            }
            rr.addrs = append(rr.addrs, &addrInfo{addr: addr})
        case naming.Delete:
            // Remove the address from addrs.
            for i, v := range rr.addrs {
                if addr == v.addr {
                    copy(rr.addrs[i:], rr.addrs[i+1:])
                    rr.addrs = rr.addrs[:len(rr.addrs)-1]
                    break
                }
            }
        default:
            grpclog.Errorln("Unknown update.Op ", update.Op)
        }
    }
    // Copy the entire addrs address array and push it into the addrCh channel to notify gRPC's internal lbWatcher.
    // lbWatcher will close connections to the deleted addresses and connect to the new ones.
    // Once a connection is ready, a dedicated goroutine calls the Up method to mark the address in addrs as connected.
    open := make([]Address, len(rr.addrs))
    for i, v := range rr.addrs {
        open[i] = v.addr
    }
    if rr.done {
        return ErrClientConnClosing
    }
    select {
    case <-rr.addrCh:
    default:
    }
    rr.addrCh <- open
    return nil
}

Establish a connection

The Up method is called by gRPC's internal load balancer watcher (lbWatcher), which reads the global connection state queue and updates the connection states in roundRobin's address list. Separate goroutines attempt to connect to the target services, and once a connection reaches the connected state, Up is called to mark the corresponding address in addrs as connected.

func (rr *roundRobin) Up(addr Address) func(error) {
    rr.mu.Lock()
    defer rr.mu.Unlock()
    var cnt int
    // Set addr in the address array to connected state so that the address can be used by the client.
    for _, a := range rr.addrs {
        if a.addr == addr {
            if a.connected {
                return nil
            }
            a.connected = true
        }
        if a.connected {
            cnt++
        }
    }
    // The available address count may have been 0 before, with many callers blocked waiting for a
    // connectable address, so close waitCh to notify all of them that a connection is now available.
    // Why check cnt == 1? If there were already connected addresses before this one, callers would not be blocked on waitCh.
    if cnt == 1 && rr.waitCh != nil {
        close(rr.waitCh)
        rr.waitCh = nil
    }
    // Returns a method to disable the address
    return func(err error) {
        rr.down(addr, err)
    }
}

Close the connection

Closing a connection goes through the down method (returned by Up). It is as simple as finding addr in the list and marking it as not connected.

func (rr *roundRobin) down(addr Address, err error) {
    rr.mu.Lock()
    defer rr.mu.Unlock()
    for _, a := range rr.addrs {
        if addr == a.addr {
            a.connected = false
            break
        }
    }
}

The client obtains the connection

When a gRPC method is invoked, Get() picks an address from roundRobin's addrs list. If addrs is empty or none of its addresses are connected, Get() returns an error; but if failfast is set to false, Get() blocks on the waitCh channel until the Up method signals that a connection is ready, and then resumes round-robin selection of an available address.

func (rr *roundRobin) Get(ctx context.Context, opts BalancerGetOptions) (addr Address, put func(), err error) {
    var ch chan struct{}
    rr.mu.Lock()
    if rr.done {
        rr.mu.Unlock()
        err = ErrClientConnClosing
        return
    }
 
    if len(rr.addrs) > 0 {
        // The length of addrs may vary. If the next value is exceeded, set it to 0 and start scheduling from the beginning.
        if rr.next >= len(rr.addrs) {
            rr.next = 0
        }
        next := rr.next
        // Traverse the entire addrs array until a connected address is found
        for {
            a := rr.addrs[next]
            // Next incremented by 1, of course, to 0 after len(addrs)
            next = (next + 1) % len(rr.addrs)
            if a.connected {
                addr = a.addr
                rr.next = next
                rr.mu.Unlock()
                return
            }
            if next == rr.next {
                // Made a full pass of addrs without finding a connected address.
                break
            }
        }
    }
    if !opts.BlockingWait {
        // Failfast mode: if there are no addresses at all, return an error.
        if len(rr.addrs) == 0 {
            rr.mu.Unlock()
            err = status.Errorf(codes.Unavailable, "there is no address available")
            return
        }
        // Returns the next addr on rr.addrs for failfast RPCs.
        addr = rr.addrs[rr.next].addr
        rr.next++
        rr.mu.Unlock()
        return
    }
    // Wait on rr.waitCh for non-failfast RPCs.
    // In blocking mode, block on waitCh until notified by the Up method
    if rr.waitCh == nil {
        ch = make(chan struct{})
        rr.waitCh = ch
    } else {
        ch = rr.waitCh
    }
    rr.mu.Unlock()
    for {
        select {
        case <-ctx.Done():
            err = ctx.Err()
            return
        case <-ch:
            rr.mu.Lock()
            if rr.done {
                rr.mu.Unlock()
                err = ErrClientConnClosing
                return
            }
 
            if len(rr.addrs) > 0 {
                if rr.next >= len(rr.addrs) {
                    rr.next = 0
                }
                next := rr.next
                for {
                    a := rr.addrs[next]
                    next = (next + 1) % len(rr.addrs)
                    if a.connected {
                        addr = a.addr
                        rr.next = next
                        rr.mu.Unlock()
                        return
                    }
                    if next == rr.next {
                        // The address that was just brought Up has gone Down again, so wait once more.
                        break
                    }
                }
            }
            // The newly added addr got removed by Down() again.
            if rr.waitCh == nil {
                ch = make(chan struct{})
                rr.waitCh = ch
            } else {
                ch = rr.waitCh
            }
            rr.mu.Unlock()
        }
    }
}
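From the caller's side, whether Get() fails fast or blocks is controlled per call. Below is a hedged sketch of a client that disables failfast, assuming a hypothetical generated Greeter stub (pb.NewGreeterClient, SayHello, HelloRequest) and the cli Etcd client from the registration sketch earlier; it only shows where grpc.FailFast(false) fits in.

	// Imports assumed: "context", "log", "google.golang.org/grpc",
	// etcdnaming "github.com/coreos/etcd/clientv3/naming", and the generated pb package.
	r := &etcdnaming.GRPCResolver{Client: cli}
	conn, err := grpc.Dial("/service/a",
		grpc.WithInsecure(),
		grpc.WithBalancer(grpc.RoundRobin(r)))
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()

	client := pb.NewGreeterClient(conn) // hypothetical generated stub
	// grpc.FailFast(false) sets BlockingWait for Get(): instead of failing immediately
	// when no backend is connected yet, the call blocks until Up reports a ready address.
	reply, err := client.SayHello(context.Background(), &pb.HelloRequest{Name: "gopher"}, grpc.FailFast(false))
	if err != nil {
		log.Fatal(err)
	}
	log.Println(reply)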

Conclusion

That covers the whole flow of Etcd-based service registration/discovery and load balancing in gRPC, along with the key source code. The actual implementation details are more complex than what is listed here; my goal with this article is to record the key paths I went through while learning and practicing gRPC load balancing and name resolution. Note also that this article uses gRPC v1.2.x code. After version 1.3, the package layout and names were reorganized, so both the source code and the way Balancer is used differ somewhat from what is shown here, but the principles are essentially the same, and each later version has evolved on this basis.

If you like this article, please give it a thumbs up. Every week I share what I have learned, seen, and experienced first-hand through technical articles. Thank you for your support. Search for the WeChat public account "NMS bi Bi" to get my articles as soon as they are published.