RPC Demo (2) Service discovery based on Zookeeper


Introduction

This article builds on the previous one: RPC Demo (1): Netty RPC Demo implementation

This second part uses Zookeeper as the service registry and removes the explicit server address parameter from RPC calls.

Complete project address: RpcDemoJava

What to improve

In the client call, we have to pass in the address of the back-end server explicitly, which is inconvenient. The code is roughly as follows:

UserService userService = jdk.create(UserService.class, "http://localhost:8080/");

With Zookeeper as the registry, the client looks up the address of a server that implements the interface in Zookeeper, so the address no longer has to be passed in explicitly. The improved call looks roughly like this:

UserService userService = jdk.create(UserService.class);
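
To make the idea concrete, here is a minimal sketch of a proxy factory that resolves the server address at call time instead of taking it as a parameter. It is not the project's code: `ProxyLookupSketch`, `EchoService`, and the `registry` lookup function (standing in for the Zookeeper query) are assumptions made for illustration, and no network request is sent.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.function.Function;

class ProxyLookupSketch {

    /** Hypothetical API interface used only for this sketch. */
    interface EchoService {
        String echo(String text);
    }

    /**
     * Creates a JDK dynamic proxy for the given interface.
     * The address is looked up by interface name on every call (assumption: the
     * registry function plays the role of the Zookeeper-backed lookup).
     */
    @SuppressWarnings("unchecked")
    static <T> T create(Class<T> api, Function<String, String> registry) {
        InvocationHandler handler = (proxy, method, args) -> {
            String url = registry.apply(api.getName());
            if (url == null) {
                throw new IllegalStateException("no provider for " + api.getName());
            }
            // A real client would send the request to url here; the sketch only reports the target.
            return method.getName() + " -> " + url;
        };
        return (T) Proxy.newProxyInstance(api.getClassLoader(), new Class<?>[] {api}, handler);
    }

    public static void main(String[] args) {
        EchoService service = create(EchoService.class, name -> "http://localhost:8080");
        System.out.println(service.echo("hi")); // prints "echo -> http://localhost:8080"
    }
}
```

The caller only names the interface; where the request goes is decided inside the handler, which is the property the rest of this article builds with Zookeeper.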

Implementation idea

After some research, the implementation breaks down into roughly the following steps:

  • 1. The server registers each Provider with Zookeeper
  • 2. The client pulls all Provider information into a local cache and builds a mapping from each Consumer interface to its Provider list
  • 3. The client watches ZK for Providers being added, changed, or deleted and keeps its local cache in sync
  • 4. The client picks a URL from the Provider list, calls that Provider, and returns the result
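
Steps 2 to 4 can be sketched without Zookeeper as a small in-memory cache. This is only an illustration under assumptions: `ProviderCacheSketch` and its nested `Provider` class are hypothetical stand-ins (the real structures appear later in the article), and the key format `service:group:version` follows the convention used in the code below.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;

class ProviderCacheSketch {

    /** Minimal stand-in for a provider entry: the registry-generated id and the server URL. */
    static final class Provider {
        final String id;
        final String url;

        Provider(String id, String url) {
            this.id = id;
            this.url = url;
        }
    }

    /** service:group:version -> providers currently known for that key. */
    private final Map<String, List<Provider>> cache = new ConcurrentHashMap<>();

    static String key(String service, String group, String version) {
        return String.join(":", service, group, version);
    }

    /** Steps 2 and 3 (add or change): insert the provider, replacing any entry with the same id. */
    void upsert(String key, Provider provider) {
        List<Provider> list = cache.computeIfAbsent(key, k -> new CopyOnWriteArrayList<>());
        list.removeIf(p -> p.id.equals(provider.id));
        list.add(provider);
    }

    /** Step 3 (delete): drop the provider with the given id, if it is cached. */
    void remove(String key, String id) {
        List<Provider> list = cache.get(key);
        if (list != null) {
            list.removeIf(p -> p.id.equals(id));
        }
    }

    /** Step 4 (simplified): return the first known URL, or null when no provider is cached. */
    String firstUrl(String key) {
        List<Provider> list = cache.get(key);
        return (list == null || list.isEmpty()) ? null : list.get(0).url;
    }

    public static void main(String[] args) {
        ProviderCacheSketch cache = new ProviderCacheSketch();
        String key = key("com.rpc.demo.service.UserService", "group2", "v2");
        cache.upsert(key, new Provider("id-1", "http://192.168.0.10:8080"));
        cache.upsert(key, new Provider("id-1", "http://192.168.0.10:8081")); // same id: treated as a change
        System.out.println(cache.firstUrl(key)); // prints http://192.168.0.10:8081
        cache.remove(key, "id-1");
        System.out.println(cache.firstUrl(key)); // prints null
    }
}
```

Keying updates by id is what keeps a change event from leaving two entries for the same Provider in the list.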

To start ZK locally, use Docker:

# Pull the ZK image and start a container; the commands below assume this one has been run
docker run -dit --name zk -p 2181:2181 zookeeper
# View the ZK logs
docker logs -f zk
# Restart ZK
docker restart zk
# Start ZK
docker start zk
# Stop ZK
docker stop zk

Provider Information structure convention

We define the Provider information structure as follows:

import java.util.List;
import lombok.Data;

@Data
public class ProviderInfo {

    /**
     * Provider ID: ZK generates an ID after registration.
     * When the client fetches the Provider list, it sets this field to the ZK-generated ID.
     */
    String id;

    /** Back-end server address of the Provider */
    String url;

    /** Tags: used for simple routing */
    List<String> tags;

    /** Weight: used for weighted load balancing */
    Integer weight;

    public ProviderInfo() {}

    public ProviderInfo(String id, String url, List<String> tags, int weight) {
        this.id = id;
        this.url = url;
        this.tags = tags;
        this.weight = weight;
    }
}

1. The server registers the Provider with Zookeeper

First, we specify the Provider name, group, version, tags, and weight for each interface implementation, using an annotation:

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

/**
 * Annotation that marks an RPC Provider service for initialization.
 * group, version, and tags have default values to stay compatible with previous versions.
 *
 * @author lw1243925457
 */
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
public @interface ProviderService {

    /**
     * Name of the corresponding API interface
     *
     * @return API service
     */
    String service();

    /**
     * Group
     *
     * @return group
     */
    String group() default "default";

    /**
     * Version
     *
     * @return version
     */
    String version() default "default";

    /**
     * Tags: used for simple routing. Multiple tags are separated by commas.
     *
     * @return tags
     */
    String tags() default "";

    /**
     * Weight: used for weighted load balancing
     *
     * @return weight
     */
    int weight() default 1;
}

Next, borrowing the idea of the package scan path setting in Mybatis, write a class that scans all classes under the specified package path. For each class found, check whether it is a Provider (that is, whether it carries the annotation); if so, extract the information and register it in ZK. The code is roughly as follows:

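import com.google.common.base.Joiner;
import lombok.extern.slf4j.Slf4j;

import java.io.File;
import java.io.IOException;
import java.net.URL;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Initializes the RPC Providers.
 * The created instances are stored in a map so they can be looked up later.
 *
 * @author lw1243925457
 */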
@Slf4j
public class ProviderServiceManagement {

    /** service:group:version --> implementation instance */
    private static Map<String, Object> proxyMap = new HashMap<>();

    /**
     * Initialization: scan the package path for all implementation classes and register them in ZK.
     * Reads the Provider annotation on each implementation class to get the service name, group, and version,
     * then calls the ZK service registration to register the Provider.
     *
     * @param packageName package path of the interface implementation classes
     * @param port port the service listens on
     * @throws Exception exception
     */
    public static void init(String packageName, int port) throws Exception {
        System.out.println("\n-------- Loader Rpc Provider class start ----------------------\n");

        DiscoveryServer serviceRegister = new DiscoveryServer();

        Class[] classes = getClasses(packageName);
        for (Class c: classes) {
            ProviderService annotation = (ProviderService) c.getAnnotation(ProviderService.class);
            if (annotation == null) {
                continue;
            }
            String group = annotation.group();
            String version = annotation.version();
            List<String> tags = Arrays.asList(annotation.tags().split(","));
            String provider = Joiner.on(":").join(annotation.service(), group, version);
            int weight = annotation.weight();

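            // Class.newInstance() is deprecated since Java 9; go through the no-arg constructor instead
            proxyMap.put(provider, c.getDeclaredConstructor().newInstance());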

            serviceRegister.registerService(annotation.service(), group, version, port, tags, weight);

            log.info("load provider class: " + annotation.service() + ":" + group + ":" + version + " :: " + c.getName());
        }
        System.out.println("\n-------- Loader Rpc Provider class end ----------------------\n");
    }

    /**
     * Scans all classes accessible from the context class loader which belong to the given package and subpackages.
     *
     * @param packageName The base package
     * @return The classes
     * @throws ClassNotFoundException exception
     * @throws IOException exception
     */
    private static Class[] getClasses(String packageName) throws ClassNotFoundException, IOException {
        ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
        assert classLoader != null;
        String path = packageName.replace('.', '/');
        Enumeration<URL> resources = classLoader.getResources(path);
        List<File> dirs = new ArrayList<>();
        while (resources.hasMoreElements()) {
            URL resource = resources.nextElement();
            dirs.add(new File(resource.getFile()));
        }
        ArrayList<Class> classes = new ArrayList<>();
        for (File directory : dirs) {
            classes.addAll(findClasses(directory, packageName));
        }
        return classes.toArray(new Class[0]);
    }

    /**
     * Recursive method used to find all classes in a given directory and subdirs.
     *
     * @param directory   The base directory
     * @param packageName The package name for classes found inside the base directory
     * @return The classes
     * @throws ClassNotFoundException ClassNotFoundException
     */
    private static List<Class> findClasses(File directory, String packageName) throws ClassNotFoundException {
        List<Class> classes = new ArrayList<>();
        if (!directory.exists()) {
            return classes;
        }
        File[] files = directory.listFiles();
        assert files != null;
        for (File file : files) {
            if (file.isDirectory()) {
                assert !file.getName().contains(".");
                classes.addAll(findClasses(file, packageName + "." + file.getName()));
            } else if (file.getName().endsWith(".class")) {
                // Strip the ".class" suffix (6 characters) to get the simple class name
                classes.add(Class.forName(packageName + '.' + file.getName().substring(0, file.getName().length() - 6)));
            }
        }
        return classes;
    }
}
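
The core of the initialization (read the annotation, build the `service:group:version` key, create an instance) can be tried in isolation. The sketch below is an assumption-laden miniature, not the project's code: `DemoProvider` is a trimmed-down stand-in for `@ProviderService`, the classes are passed in directly instead of being found by package scanning, and nothing is registered in ZK.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.HashMap;
import java.util.Map;

class AnnotationLookupSketch {

    /** Hypothetical, trimmed-down stand-in for @ProviderService. */
    @Target(ElementType.TYPE)
    @Retention(RetentionPolicy.RUNTIME)
    @interface DemoProvider {
        String service();
        String group() default "default";
        String version() default "default";
    }

    @DemoProvider(service = "demo.GreetingService", group = "group2", version = "v2")
    static class GreetingServiceImpl {
        String greet(String name) {
            return "hello " + name;
        }
    }

    /** Carries no annotation, so it must be skipped. */
    static class NotAProvider {}

    /** Returns key -> instance for every candidate class that carries the annotation. */
    static Map<String, Object> register(Class<?>... candidates) {
        Map<String, Object> instances = new HashMap<>();
        for (Class<?> c : candidates) {
            DemoProvider annotation = c.getAnnotation(DemoProvider.class);
            if (annotation == null) {
                continue;
            }
            String key = String.join(":", annotation.service(), annotation.group(), annotation.version());
            try {
                instances.put(key, c.getDeclaredConstructor().newInstance());
            } catch (ReflectiveOperationException e) {
                throw new IllegalStateException("cannot instantiate " + c.getName(), e);
            }
        }
        return instances;
    }

    public static void main(String[] args) {
        Map<String, Object> instances = register(GreetingServiceImpl.class, NotAProvider.class);
        System.out.println(instances.keySet()); // prints [demo.GreetingService:group2:v2]
    }
}
```

The annotation must have `RetentionPolicy.RUNTIME`; with any other retention, `getAnnotation` returns null and every class would be skipped.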

Next, write the ZK service registration code. It can be put together by consulting the documentation and is roughly as follows:

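import lombok.extern.slf4j.Slf4j;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;

/**
 * ZK client, used to connect to ZK
 *
 * @author lw1243925457
 */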
@Slf4j
public class ZookeeperClient {

    static final String REGISTER_ROOT_PATH = "rpc";

    protected CuratorFramework client;

    ZookeeperClient() {
        // Base sleep time of 1000 ms, at most 3 retries
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        this.client = CuratorFrameworkFactory.builder()
                .connectString("localhost:2181")
                .namespace(REGISTER_ROOT_PATH)
                .retryPolicy(retryPolicy)
                .build();
        this.client.start();

        log.info("zookeeper service register init");
    }
}

/**
 * Service discovery server: used to register Providers
 *
 * @author lw1243925457
 */
public class DiscoveryServer extends ZookeeperClient {

    private List<ServiceDiscovery<ProviderInfo>> discoveryList = new ArrayList<>();

    public DiscoveryServer() {}

    /**
     * Build the Provider information and register it with ZK
     *
     * @param service service impl name
     * @param group group
     * @param version version
     * @param port service listen port
     * @param tags route tags
     * @param weight load balance weight
     * @throws Exception exception
     */
    public void registerService(String service, String group, String version, int port, List<String> tags,
                                int weight) throws Exception {
        // id and url are left null here; the client fills them in when it reads the instance
        ProviderInfo provider = new ProviderInfo(null, null, tags, weight);

        ServiceInstance<ProviderInfo> instance = ServiceInstance.<ProviderInfo>builder()
                .name(Joiner.on(":").join(service, group, version))
                .port(port)
                .address(InetAddress.getLocalHost().getHostAddress())
                .payload(provider)
                .build();

        JsonInstanceSerializer<ProviderInfo> serializer = new JsonInstanceSerializer<>(ProviderInfo.class);
        ServiceDiscovery<ProviderInfo> discovery = ServiceDiscoveryBuilder.builder(ProviderInfo.class)
                .client(client)
                .basePath(REGISTER_ROOT_PATH)
                .thisInstance(instance)
                .serializer(serializer)
                .build();
        discovery.start();

        discoveryList.add(discovery);
    }

    public void close() throws IOException {
        for (ServiceDiscovery<ProviderInfo> discovery : discoveryList) {
            discovery.close();
        }
        client.close();
    }
}

At this point, the core server-side code is basically finished. Add the annotation to the interface implementation class and start the server:

/**
 * @author lw
 */
@ProviderService(service = "com.rpc.demo.service.UserService", group = "group2", version = "v2", tags = "tag2")
public class UserServiceV2Impl implements UserService {

    @Override
    public User findById(Integer id) {
        return new User(id, "RPC group2 v2");
    }
}

public class ServerApplication {

    public static void main(String[] args) throws Exception {
        BackListFilter.addBackAddress("172.21.16.1");

        final int port = 8080;
        ProviderServiceManagement.init("com.rpc.server.demo.service.impl", port);

        final RpcNettyServer rpcNettyServer = new RpcNettyServer(port);

        try {
            rpcNettyServer.run();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            rpcNettyServer.destroy();
        }
    }
}

2. Write the client code

  • 2. The client pulls all Provider information into a local cache and builds a mapping from each Consumer interface to its Provider list
  • 3. The client watches ZK for Providers being added, changed, or deleted and keeps its local cache in sync
  • 4. The client picks a URL from the Provider list, calls that Provider, and returns the result

These are the functions the client needs to add. We put them all into a single service discovery client; the code is roughly as follows:

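import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.google.common.base.Joiner;
import lombok.extern.slf4j.Slf4j;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.CuratorCache;
import org.apache.curator.framework.recipes.cache.CuratorCacheListener;
import org.apache.curator.x.discovery.ServiceDiscovery;
import org.apache.curator.x.discovery.ServiceDiscoveryBuilder;
import org.apache.curator.x.discovery.ServiceInstance;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Service discovery client:
 * gets the Provider list,
 * listens for Provider updates,
 * and returns a Provider for an interface (tag routing first, then load balancing)
 *
 * @author lw1243925457
 */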
@Slf4j
public class DiscoveryClient extends ZookeeperClient {

    private enum EnumSingleton {
        /** Enum-based singleton */
        INSTANCE;
        private DiscoveryClient instance;

        EnumSingleton() {
            instance = new DiscoveryClient();
        }

        public DiscoveryClient getSingleton() {
            return instance;
        }
    }

    public static DiscoveryClient getInstance() {
        return EnumSingleton.INSTANCE.getSingleton();
    }

    /** Provider cache: service:group:version -> Provider instance list */
    private Map<String, List<ProviderInfo>> providersCache = new HashMap<>();

    private final ServiceDiscovery<ProviderInfo> serviceDiscovery;

    private final CuratorCache resourcesCache;

    private LoadBalance balance = new WeightBalance();

    private DiscoveryClient() {
        serviceDiscovery = ServiceDiscoveryBuilder.builder(ProviderInfo.class)
                .client(client)
                .basePath("/" + REGISTER_ROOT_PATH)
                .build();

        try {
            serviceDiscovery.start();
        } catch (Exception e) {
            e.printStackTrace();
        }

        try {
            getAllProviders();
        } catch (Exception e) {
            e.printStackTrace();
        }

        this.resourcesCache = CuratorCache.build(this.client, "/");
        watchResources();

        if (RpcClient.getBalanceAlgorithmName().equals(WeightBalance.NAME)) {
            this.balance = new WeightBalance();
        }
        else if (RpcClient.getBalanceAlgorithmName().equals(ConsistentHashBalance.NAME)) {
            this.balance = new ConsistentHashBalance();
        }
    }

    /**
     * Get the list of all Providers from ZK and cache it locally
     *
     * @throws Exception exception
     */
    private void getAllProviders() throws Exception {
        System.out.println("\n\n======================= init : get all provider");

        Collection<String>  serviceNames = serviceDiscovery.queryForNames();
        System.out.println(serviceNames.size() + " type(s)");
        for ( String serviceName : serviceNames ) {
            Collection<ServiceInstance<ProviderInfo>> instances = serviceDiscovery.queryForInstances(serviceName);
            System.out.println(serviceName);

            for ( ServiceInstance<ProviderInfo> instance : instances ) {
                System.out.println(instance.toString());

                String url = "http://" + instance.getAddress() + ":" + instance.getPort();
                ProviderInfo providerInfo = instance.getPayload();
                providerInfo.setId(instance.getId());
                providerInfo.setUrl(url);

                List<ProviderInfo> providerList = providersCache.getOrDefault(instance.getName(), new ArrayList<>());
                providerList.add(providerInfo);
                providersCache.put(instance.getName(), providerList);

                System.out.println("add provider: " + instance.toString());
            }
        }

        System.out.println();
        for(String key: providersCache.keySet()) {
            System.out.println(key + ":" + providersCache.get(key));
        }

        System.out.println("======================= init : get all provider end\n\n");
    }

    /**
     * Given the interface name, group, and version, return the address of one Provider server
     * after tag routing and load balancing.
     *
     * @param service service name
     * @param group group
     * @param version version
     * @param tags tags
     * @param methodName method name
     * @return provider server address, or null if no Provider matches
     */
    public String getProviders(String service, String group, String version, List<String> tags, String methodName) {
        String provider = Joiner.on(":").join(service, group, version);
        if (!providersCache.containsKey(provider) || providersCache.get(provider).isEmpty()) {
            return null;
        }

        List<ProviderInfo> providers = FilterLine.filter(providersCache.get(provider), tags);
        if (providers.isEmpty()) {
            return null;
        }

        return balance.select(providers, service, methodName);
    }

    /** Listen for Provider updates */
    private void watchResources() {
        CuratorCacheListener listener = CuratorCacheListener.builder()
                .forCreates(this::addHandler)
                .forChanges(this::changeHandler)
                .forDeletes(this::deleteHandler)
                .forInitialized(() -> log.info("Resources Cache initialized"))
                .build();
        resourcesCache.listenable().addListener(listener);
        resourcesCache.start();
    }

    /**
     * Add a Provider
     *
     * @param node new provider
     */
    private void addHandler(ChildData node) {
        System.out.println("\n\n=================== add new provider ============================");

        System.out.printf("Node created: [%s:%s]%n", node.getPath(), new String(node.getData()));
        if (providerDataEmpty(node)) {
            return;
        }

        updateProvider(node);
        
        System.out.println("=================== add new provider end ============================\n\n");
    }

    /**
     * Provider updated
     *
     * @param oldNode old provider
     * @param newNode updated provider
     */
    private void changeHandler(ChildData oldNode, ChildData newNode) {
        System.out.printf("Node changed, Old: [%s: %s] New: [%s: %s]%n", oldNode.getPath(),
                new String(oldNode.getData()), newNode.getPath(), new String(newNode.getData()));

        if (providerDataEmpty(newNode)) {
            return;
        } 
        
        updateProvider(newNode);
    }

    /**
     * Add or update the local Provider
     *
     * @param newNode updated provider
     */
    private void updateProvider(ChildData newNode) {
        String jsonValue = new String(newNode.getData(), StandardCharsets.UTF_8);
        JSONObject instance = (JSONObject) JSONObject.parse(jsonValue);
        System.out.println(instance.toString());

        String url = "http://" + instance.get("address") + ":" + instance.get("port");
        ProviderInfo providerInfo = JSON.parseObject(instance.get("payload").toString(), ProviderInfo.class);
        providerInfo.setId(instance.get("id").toString());
        providerInfo.setUrl(url);

        String name = instance.get("name").toString();
        List<ProviderInfo> providerList = providersCache.getOrDefault(name, new ArrayList<>());
        // Drop any entry with the same id first, so a change event (or a node already
        // loaded by getAllProviders()) does not leave a duplicate in the list
        providerList.removeIf(p -> p.getId().equals(providerInfo.getId()));
        providerList.add(providerInfo);
        providersCache.put(name, providerList);
    }

    /**
     * Delete a Provider
     *
     * @param oldNode provider
     */
    private void deleteHandler(ChildData oldNode) {
        System.out.println("\n\n=================== delete provider ============================");

        System.out.printf("Node deleted, Old value: [%s: %s]%n", oldNode.getPath(), new String(oldNode.getData()));
        if (providerDataEmpty(oldNode)) {
            return;
        }

        String jsonValue = new String(oldNode.getData(), StandardCharsets.UTF_8);
        JSONObject instance = (JSONObject) JSONObject.parse(jsonValue);
        System.out.println(instance.toString());

        String provider = instance.get("name").toString();
        String id = instance.get("id").toString();
        List<ProviderInfo> providerList = providersCache.get(provider);
        // The service may not be in the cache, so guard against a missing list
        if (providerList != null) {
            providerList.removeIf(p -> p.getId().equals(id));
        }

        System.out.println("=================== delete provider end ============================\n\n");
    }

    private boolean providerDataEmpty(ChildData node) {
        return node.getData().length == 0;
    }

    public synchronized void close() {
        client.close();
    }
}

It looks like a lot of code, but it is not too complicated, and you can write it yourself.
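
`DiscoveryClient` delegates to `FilterLine.filter` and `balance.select`, which are not shown in this article. The sketch below is one plausible reading, not the project's implementation: it assumes tag routing keeps Providers that share at least one requested tag (and keeps all of them when no tag is requested), and that weight balancing is plain weighted random selection. `RoutingSketch` and its `Provider` class are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Random;

class RoutingSketch {

    /** Minimal stand-in for ProviderInfo: url, tags, and weight only. */
    static final class Provider {
        final String url;
        final List<String> tags;
        final int weight;

        Provider(String url, List<String> tags, int weight) {
            this.url = url;
            this.tags = tags;
            this.weight = weight;
        }
    }

    /** Assumed tag rule: no requested tags keeps everything; otherwise keep providers sharing a tag. */
    static List<Provider> filterByTags(List<Provider> providers, List<String> wanted) {
        if (wanted == null || wanted.isEmpty()) {
            return providers;
        }
        List<Provider> kept = new ArrayList<>();
        for (Provider p : providers) {
            if (!Collections.disjoint(p.tags, wanted)) {
                kept.add(p);
            }
        }
        return kept;
    }

    /** Weighted random choice: a provider with weight w is picked with probability w / totalWeight. */
    static String selectByWeight(List<Provider> providers, Random random) {
        int total = 0;
        for (Provider p : providers) {
            total += p.weight;
        }
        if (total <= 0) {
            return null; // empty list or no positive weight
        }
        int point = random.nextInt(total); // uniform in [0, total)
        for (Provider p : providers) {
            point -= p.weight;
            if (point < 0) {
                return p.url;
            }
        }
        return null; // not reached when all weights are non-negative
    }

    public static void main(String[] args) {
        List<Provider> all = Arrays.asList(
                new Provider("http://a:8080", Arrays.asList("tag1"), 1),
                new Provider("http://b:8080", Arrays.asList("tag2"), 3));
        List<Provider> routed = filterByTags(all, Arrays.asList("tag2"));
        System.out.println(selectByWeight(routed, new Random())); // prints http://b:8080 (only candidate left)
    }
}
```

Filtering before balancing mirrors the order in `getProviders`: the tag route narrows the candidates, and the balancer only chooses among what is left.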

The proxy request handler, RpcInvocationHandler, changes as follows: the explicit URL parameter is removed and the URL is obtained from DiscoveryClient instead:

public class RpcInvocationHandler implements InvocationHandler, MethodInterceptor {

    // Fields used below (group, version, tags, discoveryClient) and the invoke/intercept methods are omitted here

    /**
     * Send the request to the server and return the result
     *
     * @param service service name
     * @param method service method
     * @param params method params
     * @return object
     */
    private Object process(Class<?> service, Method method, Object[] params) {
        log.info("Client proxy instance method invoke");

        // custom Rpc request structure RpcRequest, put interface name, method name, parameters
        log.info("Build Rpc request");
        RpcRequest rpcRequest = new RpcRequest();
        rpcRequest.setServiceClass(service.getName());
        rpcRequest.setMethod(method.getName());
        rpcRequest.setArgv(params);
        rpcRequest.setGroup(group);
        rpcRequest.setVersion(version);

        // Obtain the request address of a Provider from DiscoveryClient
        String url = null;
        try {
            url = discoveryClient.getProviders(service.getName(), group, version, tags, method.getName());
        } catch (Exception e) {
            e.printStackTrace();
        }

        if (url == null) {
            System.out.println("\nCan't find provider\n");
            return null;
        }

        // The client uses netty to send the request to the server and get the result (custom structure: RpcResponse)
        log.info("Client send request to Server");
        RpcResponse rpcResponse;
        try {
            rpcResponse = RpcNettyClientSync.getInstance().getResponse(rpcRequest, url);
        } catch (InterruptedException | URISyntaxException e) {
            e.printStackTrace();
            return null;
        }

        log.info("Client receive response Object");
        assert rpcResponse != null;
        if (!rpcResponse.getStatus()) {
            log.info("Client receive exception");
            rpcResponse.getException().printStackTrace();
            return null;
        }

        // Deserialize the result into an object
        log.info("Response:: " + rpcResponse.getResult());
        return JSON.parse(rpcResponse.getResult().toString());
    }
}

The client code also drops the URL and becomes more concise, roughly as follows:

@Slf4j
public class ClientApplication {

    public static void main(String[] args) {
        // fastjson autoType setting: allow these model classes to be deserialized
        ParserConfig.getGlobalInstance().addAccept("com.rpc.demo.model.Order");
        ParserConfig.getGlobalInstance().addAccept("com.rpc.demo.model.User");

        RpcClient client = new RpcClient();
        RpcClient.setBalanceAlgorithmName(ConsistentHashBalance.NAME);

        UserService userService = client.create(UserService.class, "group2", "v2");
        User user = userService.findById(1);
        if (user == null) {
            log.info("Client service invoke Error");
        } else {
            System.out.println("\n\nuser1 :: find user id=1 from server: " + user.getName());
        }
    }
}