takeaway

GRPC is a high-performance, universal open source RPC framework, which is developed by Google mainly for mobile applications and designed based on THE HTTP/2 Protocol standard. It is developed based on the Protocol Buffers (ProtoBuf) serialization Protocol, and supports many development languages.

GRPC can flexibly implement load balancing, call chain, health check, permission authentication and other modules through its plug-in mechanism. This paper mainly introduces how to implement load balancing function through the interface defined by gRPC.

Load balancing scheme

In addition to solving the problem of cross-language invocation and module decoupling, an important point of RPC service is that the nodes of RPC service can be horizontally extended through module microsertization, and the application layer can asynchronously invoke multiple services to reduce the delay of the application layer.

Because RPC services are stateless between them, machines can be horizontally added to extend their service capabilities. But how do you leverage multiple nodes?

Generally, you can add a loadBalance (LB) to the front of RPC and a service node to the back of LB. The application layer directly accesses the LB, and the RPC node is hidden behind the LB and invisible to the application layer. The LB address and port are provided to the application layer. The application layer does not care about the LB algorithm, which greatly reduces the complexity of application invocation. This approach is a good solution for short-connection applications such as the Web, because the service at the back end can be re-selected when the LB connection is re-established.

However, this is problematic for long-linked services. For example, if A long-link application A connects to the LB, the LB randomly forwards the request to the back-end RPC service B. Since A is A long-link application, all subsequent requests will be forwarded to THE RPC service B. In this case, load balancing cannot be implemented. You might think that it would be better to keep a long connection to the RPC service instead of connecting and releasing the connection every time you call it. GRPC does not provide this kind of load balancing component, but exposes the interface for load balancing. The extends NameResolverProvider class implements interface methods to easily implement load balancing modules.

For a basic introduction to gRPC load balancing, see juejin.cn/post/684490…

The following describes how to implement the load balancing NameResolver using ZooKeeper

ZkNameResolverProvider implementation

public class ZkNameResolverProvider extends NameResolverProvider {
    @Override
    protected boolean isAvailable() {
        return true;
    }

    @Override
    protected int priority() {
        return 5;
    }

    @Nullable
    @Override
    public NameResolver newNameResolver(URI targetUri, Attributes params) {
        return new ZkNameResolver(targetUri);
    }

    @Override
    public String getDefaultScheme() {
        return "zk"; }}Copy the code

ZkNameResolver implementation

public class ZkNameResolver extends NameResolver implements Watcher {
    private URI zkUri;
    private ZooKeeper zoo;
    private Listener listener;
    private final int ZK_CONN_TIMEOUT = 3000;
    private final String ZK_PATH = "/grpc_server_list";

    ZkNameResolver(URI zkUri) {
        this.zkUri = zkUri;
    }

    @Override
    public String getServiceAuthority() {
        return zkUri.getAuthority();
    }

    @Override
    public void start(Listener listener) {
        this.listener = listener;
        final CountDownLatch latch = new CountDownLatch(1);
        String zkAddr = zkUri.getHost() + ":" + zkUri.getPort();
        System.out.printf("connect to zookeeper server %s", zkAddr);
        try {
            this.zoo = new ZooKeeper(zkAddr, ZK_CONN_TIMEOUT, new Watcher() {
                @Override
                public void process(WatchedEvent event) {
                    if(event.getState() == Event.KeeperState.SyncConnected) { latch.countDown(); }}}); } catch (IOException e) { System.out.printf(e); System.out.printf("connect to zookeeper failed, JVM exited [%s]", e.getMessage());
            System.exit(1);
        }
        try {
            latch.await();
            System.out.printf("connect to zookeeper succeed");
        } catch (InterruptedException e) {
            System.out.printf(e);
            System.out.printf("CountDownLatch interrupted, JVM exited [%s]", e.getMessage());
            System.exit(1);
        }
        try {
            Stat stat = zoo.exists(ZK_PATH, true);
            if (stat == null) {
                System.out.printf("%s not exists", ZK_PATH);
            } else {
                System.out.printf("%s exists", ZK_PATH);
            }
        } catch (KeeperException | InterruptedException e) {
            System.out.printf(e);
        }

        try {
            List<String> children = zoo.getChildren(ZK_PATH, this);
            addServersToListener(children);
        } catch (KeeperException | InterruptedException e) {
            System.out.printf(e);
            System.out.printf("get children of %s failed [%s], JVM exited", ZK_PATH, e.getMessage()); System.exit(1); }} // Use the zookeeper ZK_PATH child node as the RPC node address, Private void addServersToListener(List<String> Servers) {system.out.printf ("rpc servers:%s", servers);
        ArrayList<EquivalentAddressGroup> addressGroups = new ArrayList<EquivalentAddressGroup>();
        for (String server : servers) {
            List<SocketAddress> socketAddresses = new ArrayList<SocketAddress>();
            String[] address = server.split(":");
            socketAddresses.add(new InetSocketAddress(address[0], Integer.parseInt(address[1])));
            addressGroups.add(new EquivalentAddressGroup(socketAddresses));
        }
        if (addressGroups.size() > 0) {
            listener.onAddresses(addressGroups, Attributes.EMPTY);
        } else {
            System.out.printf("No servers find, keep looking");
        }
    }

    @Override
    public void shutdown() {
        try {
            zoo.close();
        } catch (InterruptedException e) {
            System.out.printf(e);
        }
    }

    @Override
    public void process(WatchedEvent event) {
        if (event.getType() == Event.EventType.None) {
            System.out.printf("Zookeeper connection expired");
        } else {
            try {
                List<String> children = zoo.getChildren(ZK_PATH, false); addServersToListener(children); zoo.getChildren(ZK_PATH, this); } catch (Exception e) { System.out.printf(e); }}}}Copy the code
  • Set RPC service host:port as the path of ZooKeeper (/grpc_server_list/rpc_host:50010), And ZooKeeper listens for events such as path creation and deletion
  • When RPC nodes go online or offline, node information is dynamically added or deleted from ZooKeeper
  • throughlistener.onAddressesRegister the RPC service address with the gRPC load balancer

The channel to create

This. Channel = ManagedChannelBuilder"zk://zkhost:2181") // Configure the NameResolverProvider implementation class.nameresolverFactory (new ZkNameResolverProvider()).enableretry ().maxRetryAttempts(5) .keepAliveTime(5, TimeUnit.MINUTES) .keepAliveWithoutCalls(true) .keepAliveTimeout(10, TimeUnit.MINUTES) .idleTimeout(24, TimeUnit. / / polling HOURS) strategy. LoadBalancerFactory (RoundRobinLoadBalancerFactory. GetInstance ()). UsePlaintext (). The build ();Copy the code
  • forTarget("zk://zkhost:2181")The ZooKeeper link address was set
  • nameResolverFactory(new ZkNameResolverProvider())configurationNameResolverProviderImplementation class, let gRPC throughZkNameResolverProviderFind available service node addresses
  • The RPC service is invoked according to the configurationloadBalancerFactory(RoundRobinLoadBalancerFactory.getInstance())The policy polling invokes the corresponding back-end service

conclusion

GRPC provides a very flexible load balancing interface, through the implementation of the interface, can be very convenient to achieve load balancing. Through the customized load balancing mechanism, the caller can keep long links with each RPC, greatly increasing the network overhead of RPC, and polling each RPC service to expand the response capability of RPC. Zookeeper enables the watch mechanism to monitor the increase or deletion of specific path(/grpc_server_list) child nodes and dynamically register services offline, greatly improving the convenience of back-end service level expansion.