Registry – ZooKeeper

Objective: To explain the principles of the registry implemented for ZooKeeper and interpret the source code for Duubo-Registry-ZooKeeper

This is the final article on registries. This article is about dubbo’s registry implemented using ZooKeeper. This approach to implementing the registry is also recommended by Dubbo. In order to better understand the application of ZooKeeper in Dubbo, I will briefly introduce ZooKeeper first.

Because Dubbo is a distributed RPC open source framework, individual services are deployed separately, causing inconsistencies between resources. Zookeeper has features to ensure distributed consistency. ZooKeeper is a highly available, high-performance, and consistent open source coordination service designed for distributed applications. As to why Dubbo recommended ZooKeeper as its registry implementation, there are many books and blogs that explain the features and benefits of ZooKeeper. This is not the focus of this chapter, but the data structure of ZooKeeper. How the Dubbo service is stored and managed by zooKeeper’s data structure affects the interpretation of the source code below. Zookeeper uses a tree structure to organize data nodes, similar to a standard file system. Take a look at the following image:

This is a diagram from the official documentation that shows how Dubbo is stored in ZooKeeper and the node hierarchy,

  1. The Root layer of Dubbo is the Root directory. Use
    group to set the Root node of ZooKeeper. The default value is “dubbo”.
  2. The Service layer is the full name of the Service interface.
  3. The Type layer is categories. There are four categories, namely providers (list of service providers), consumers (list of service consumers), routes (list of routing rules), and Configurations (list of configuration rules).
  4. URL layer: According to different types of directories, there are service provider URL, service consumer URL, routing rule URL, and configuration rule URL. Different types focus on different urls.

Zookeeper divides zNodes of each layer with each slash. For example, the root node dubbo of the first layer is “/dubbo”, and the Service layer of the second layer is /com.foo.Barservice. Startup, such as service providers to dubbo/com. Foo Barservice/will write your own URL directory. For process invocation instructions, see the official documentation:

Document address: dubbo.apache.org/zh-cn/docs/…

Dubbo: ZooKeeper: Dubbo: ZooKeeper: Dubbo: ZooKeeper: Dubbo: ZooKeeper: Dubbo: ZooKeeper

The same directory as the previous three implementations, just two classes, looks very comfortable, so let’s parse these two classes.

(a) ZookeeperRegistry

This class inherits FailbackRegistry class, which is aimed at the core functions of the registry registration, subscription, unregistration, unsubscription, query registration list to expand, based on ZooKeeper.

1. The attribute

// Log
private final static Logger logger = LoggerFactory.getLogger(ZookeeperRegistry.class);

// The default ZooKeeper port
private final static int DEFAULT_ZOOKEEPER_PORT = 2181;

// The default ZooKeeper root node
private final static String DEFAULT_ROOT = "dubbo";

// Root node of ZooKeeper
private final String root;

// A collection of service interfaces
private final Set<String> anyServices = new ConcurrentHashSet<String>();

// Listener collection
private final ConcurrentMap<URL, ConcurrentMap<NotifyListener, ChildListener>> zkListeners = new ConcurrentHashMap<URL, ConcurrentMap<NotifyListener, ChildListener>>();

// ZooKeeper client instance
private final ZookeeperClient zkClient;
Copy the code

In fact, you will find that although ZooKeeper is the most recommended, its implementation logic is relatively simple. Because zooKeeper service components are called, much of the logic does not need to be implemented in Dubbo. The above properties are also very simple, do not need to say more, more calls to the ZooKeeper client.

2. Construction method

public ZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter) {
    super(url);
    if (url.isAnyHost()) {
        throw new IllegalStateException("registry address == null");
    }
    // Get the packet configuration carried by the URL as the root node of ZooKeeper
    String group = url.getParameter(Constants.GROUP_KEY, DEFAULT_ROOT);
    if(! group.startsWith(Constants.PATH_SEPARATOR)) { group = Constants.PATH_SEPARATOR + group; }this.root = group;
    // Create the ZooKeeper client
    zkClient = zookeeperTransporter.connect(url);
    // Add a state listener to call the recovery method when the state is reconnected
    zkClient.addStateListener(new StateListener() {
        @Override
        public void stateChanged(int state) {
            if (state == RECONNECTED) {
                try {
                    / / recovery
                    recover();
                } catch(Exception e) { logger.error(e.getMessage(), e); }}}}); }Copy the code

Here are a few concerns:

  1. Parameters of ZookeeperTransporter is an interface, and in dubbo ZkclientZookeeperTransporter and CuratorZookeeperTransporter two implementation class, ZookeeperTransporter is also an extensible interface. Based on the Dubbo SPI Adaptive mechanism, ZookeeperTransporter selects which implementation class to use according to the parameters carried in the URL.
  2. Above I showed that Dubbo has a root layer in the ZooKeeper node hierarchy, which is set through the group attribute.
  3. Add a listener to the client that calls the FailbackRegistry recovery method when the state is reconnected

3.appendDefaultPort

static String appendDefaultPort(String address) {
    if(address ! =null && address.length() > 0) {
        int i = address.indexOf(':');
        // If the address itself does not have a port, the default port 2181 is used
        if (i < 0) {
            return address + ":" + DEFAULT_ZOOKEEPER_PORT;
        } else if (Integer.parseInt(address.substring(i + 1)) = =0) {
            return address.substring(0, i + 1) + DEFAULT_ZOOKEEPER_PORT; }}return address;
}
Copy the code

This method uses the default ZooKeeper port. That is, the default port is used only when the address does not have a port.

4.isAvailable && destroy

@Override
public boolean isAvailable(a) {
    return zkClient.isConnected();
}

@Override
public void destroy(a) {
    super.destroy();
    try {
        zkClient.close();
    } catch (Exception e) {
        logger.warn("Failed to close zookeeper client " + getUrl() + ", cause: "+ e.getMessage(), e); }}Copy the code

Here, the two methods are to check whether ZooKeeper is connected and to destroy the connection, which are very simple, and both call the methods encapsulated by the ZooKeeper client.

5.doRegister && doUnregister

@Override
protected void doRegister(URL url) {
    try {
        // Create the URL node, that is, the node of the URL layer
        zkClient.create(toUrlPath(url), url.getParameter(Constants.DYNAMIC_KEY, true));
    } catch (Throwable e) {
        throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: "+ e.getMessage(), e); }}@Override
protected void doUnregister(URL url) {
    try {
        // Delete a node
        zkClient.delete(toUrlPath(url));
    } catch (Throwable e) {
        throw new RpcException("Failed to unregister " + url + " to zookeeper " + getUrl() + ", cause: "+ e.getMessage(), e); }}Copy the code

The two methods, registration and unregistration, are also very simple. The client create and delete methods are called, one to create a node, and the other to delete a node, both at the URL level.

6.doSubscribe

@Override
protected void doSubscribe(final URL url, final NotifyListener listener) {
    try {
        // Processes all subscriptions initiated by the Service layer, such as those from the monitoring center
        if (Constants.ANY_VALUE.equals(url.getServiceInterface())) {
            // Get the root directory
            String root = toRootPath();
            // Get the listener set corresponding to the URL
            ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
            // Create a listener collection if it does not exist
            if (listeners == null) {
                zkListeners.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, ChildListener>());
                listeners = zkListeners.get(url);
            }
            // Get the node listener
            ChildListener zkListener = listeners.get(listener);
            // If the node listener is empty, it is created
            if (zkListener == null) {
                listeners.putIfAbsent(listener, new ChildListener() {
                    @Override
                    public void childChanged(String parentPath, List<String> currentChilds) {
                        // Iterate through the existing node, join the node if it is not in the existing service collection, and then subscribe to the node
                        for (String child : currentChilds) {
                            / / decoding
                            child = URL.decode(child);
                            if(! anyServices.contains(child)) { anyServices.add(child); subscribe(url.setPath(child).addParameters(Constants.INTERFACE_KEY, child, Constants.CHECK_KEY, String.valueOf(false)), listener); }}}});// Refetch, to ensure consistency
                zkListener = listeners.get(listener);
            }
            // Create the service node, which is a persistent node
            zkClient.create(root, false);
            // Subscribe to the ZooKeeper service node to get the full name array of the Service interface
            List<String> services = zkClient.addChildListener(root, zkListener);
            if(services ! =null && !services.isEmpty()) {
                // Iterate through the array of Service interface names
                for (String service : services) {
                    service = URL.decode(service);
                    anyServices.add(service);
                    // Initiate a subscription to the service layer
                    subscribe(url.setPath(service).addParameters(Constants.INTERFACE_KEY, service,
                            Constants.CHECK_KEY, String.valueOf(false)), listener); }}}else {
            // Handles initiated subscriptions to the specified Service layer, such as subscriptions to Service consumers
            List<URL> urls = new ArrayList<URL>();
            // Iterate over the classification array
            for (String path : toCategoriesPath(url)) {
                // Get the listener collection
                ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
                // If not, create it
                if (listeners == null) {
                    zkListeners.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, ChildListener>());
                    listeners = zkListeners.get(url);
                }
                // Get the node listener
                ChildListener zkListener = listeners.get(listener);
                if (zkListener == null) {
                    listeners.putIfAbsent(listener, new ChildListener() {
                        @Override
                        public void childChanged(String parentPath, List<String> currentChilds) {
                            // NotifyListener callback of service changes
                            ZookeeperRegistry.this.notify(url, listener, toUrlsWithEmpty(url, parentPath, currentChilds)); }});// Retrieve the node listener to ensure consistency
                    zkListener = listeners.get(listener);
                }
                // Create the type node, which is a persistent node
                zkClient.create(path, false);
                // Initiate a subscription to the Type node of ZooKeeper
                List<String> children = zkClient.addChildListener(path, zkListener);
                if(children ! =null) {
                    // Add to the subnode data arrayurls.addAll(toUrlsWithEmpty(url, path, children)); }}// Notify data changesnotify(url, listener, urls); }}catch (Throwable e) {
        throw new RpcException("Failed to subscribe " + url + " to zookeeper " + getUrl() + ", cause: "+ e.getMessage(), e); }}Copy the code

This method is a subscription. The logical implementation can be divided into two sections. The implementation here treats all subscriptions initiated by the Service layer separately from subscriptions initiated by the specified Service layer. All Service layers resemble subscriptions initiated by the monitoring center. A subscription initiated by a specified Service layer can be considered a subscription to a Service consumer. The general logic for subscriptions is similar, but there are a few differences:

  1. The ChildListener in all subscriptions initiated by the Service layer is decoded when the Service layer changes, and the anyServices attribute is used to determine whether it is a new Service. Finally, the subscribe subscription of the parent class is called. When the URL layer changes, the specified Service layer invokes notify and calls back the logic of NotifyListener to notify the Service of the change.
  2. The node created by the client in all Service layer initiated subscriptions is the Service node, which is the persistent node, and the node created in the specified Service layer initiated subscriptions is the Type node, which is also the persistent node. The persistent node of ZooKeeper exists after the node is created until the node is deleted. The persistent node will not disappear because the client session that created the node is invalid. The life cycle of the temporary node is bound to the client session. That is, if the client session fails, the node is automatically cleared. Note that session invalidation is mentioned here, not disconnection. Also, you cannot create child nodes under temporary nodes.
  3. The subscription initiated by the specified Service layer invokes notify twice. The first is incremental notification, that is, only notifying the added Service node, and the second is full notification.

7.doUnsubscribe

@Override
protected void doUnsubscribe(URL url, NotifyListener listener) {
    // Get the listener collection
    ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
    if(listeners ! =null) {
        // Get a listener for the child node
        ChildListener zkListener = listeners.get(listener);
        if(zkListener ! =null) {
            // For all service interfaces, such as the monitoring center
            if (Constants.ANY_VALUE.equals(url.getServiceInterface())) {
                // Get the root directory
                String root = toRootPath();
                // Remove the listener
                zkClient.removeChildListener(root, zkListener);
            } else {
                // Remove listeners by traversing the classification array
                for (String path : toCategoriesPath(url)) {
                    zkClient.removeChildListener(path, zkListener);
                }
            }
        }
    }
}
Copy the code

This method is used to unsubscribe, which is divided into two cases, all Service initiated unsubscribe or the specified Service initiated unsubscribe. As you can see, the unsubscribe of all services removes all listeners in the root directory, while the unsubscribe of the specified Service removes all listeners in the Type node below the Service layer. If you don’t understand, go back to that node hierarchy diagram.

8.lookup

@Override
public List<URL> lookup(URL url) {
    if (url == null) {
        throw new IllegalArgumentException("lookup url == null");
    }
    try {
        List<String> providers = new ArrayList<String>();
        // Iterate over the grouping categories
        for (String path : toCategoriesPath(url)) {
            // Get the child node
            List<String> children = zkClient.getChildren(path);
            if(children ! =null) { providers.addAll(children); }}// Get an array of urls that match providers and consumer
        return toUrlsWithoutEmpty(url, providers);
    } catch (Throwable e) {
        throw new RpcException("Failed to lookup " + url + " from zookeeper " + getUrl() + ", cause: "+ e.getMessage(), e); }}Copy the code

The method is to query the registered services that meet the criteria. ToUrlsWithoutEmpty is called, which we’ll talk about later.

9.toServicePath

private String toServicePath(URL url) {
    String name = url.getServiceInterface();
    // If all services are included, the root node is returned
    if (Constants.ANY_VALUE.equals(name)) {
        return toRootPath();
    }
    return toRootDir() + URL.encode(name);
}
Copy the code

The method is to get the service path, the concatenation rule: Root + Type.

10.toCategoriesPath

private String[] toCategoriesPath(URL url) {
    String[] categories;
    // If the category carried by the URL is set to *, an array is created containing all categories
    if (Constants.ANY_VALUE.equals(url.getParameter(Constants.CATEGORY_KEY))) {
        categories = new String[]{Constants.PROVIDERS_CATEGORY, Constants.CONSUMERS_CATEGORY,
                Constants.ROUTERS_CATEGORY, Constants.CONFIGURATORS_CATEGORY};
    } else {
        // Returns the category configuration carried by the URL
        categories = url.getParameter(Constants.CATEGORY_KEY, new String[]{Constants.DEFAULT_CATEGORY});
    }
    String[] paths = new String[categories.length];
    for (int i = 0; i < categories.length; i++) {
        // Add the service path
        paths[i] = toServicePath(url) + Constants.PATH_SEPARATOR + categories[i];
    }
    return paths;
}

private String toCategoryPath(URL url) {
    return toServicePath(url) + Constants.PATH_SEPARATOR + url.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY);
}
Copy the code

The first method is to get the classification array, which is an array of all Type nodes under the service carried by the URL. The second is to get the classification path, the classification path stitching rule: Root + Service + Type

11.toUrlPath

private String toUrlPath(URL url) {
    return toCategoryPath(url) + Constants.PATH_SEPARATOR + URL.encode(url.toFullString());
}
Copy the code

This method is to get the URL path, the concatenation rule is Root + Service + Type + URL

12.toUrlsWithoutEmpty && toUrlsWithEmpty

private List<URL> toUrlsWithoutEmpty(URL consumer, List<String> providers) {
    List<URL> urls = new ArrayList<URL>();
    if(providers ! =null && !providers.isEmpty()) {
        // Iterate over the service provider
        for (String provider : providers) {
            / / decoding
            provider = URL.decode(provider);
            if (provider.contains(": / /")) {
                // Convert the service to a URL
                URL url = URL.valueOf(provider);
                // Check whether it matches. If so, add it to the set
                if(UrlUtils.isMatch(consumer, url)) { urls.add(url); }}}}return urls;
}

private List<URL> toUrlsWithEmpty(URL consumer, String path, List<String> providers) {
    // Returns the service provider URL that matches the service consumer
    List<URL> urls = toUrlsWithoutEmpty(consumer, providers);
    // If it does not exist, the URL that created 'empty://' returns
    if (urls == null || urls.isEmpty()) {
        int i = path.lastIndexOf('/');
        String category = i < 0 ? path : path.substring(i + 1);
        URL empty = consumer.setProtocol(Constants.EMPTY_PROTOCOL).addParameter(Constants.CATEGORY_KEY, category);
        urls.add(empty);
    }
    return urls;
}
Copy the code

The first toUrlsWithoutEmpty method gets the array of urls that match the providers’ consumer urls. The second toUrlsWithEmpty method calls the first method and adds if there is no match, The URL that created empty:// is returned. In this way, situations like an empty service provider can be handled.

(2) ZookeeperRegistryFactory

This class inherits the AbstractRegistryFactory class and implements the AbstractRegistryFactory createRegistry method.

public class ZookeeperRegistryFactory extends AbstractRegistryFactory {

    private ZookeeperTransporter zookeeperTransporter;

    public void setZookeeperTransporter(ZookeeperTransporter zookeeperTransporter) {
        this.zookeeperTransporter = zookeeperTransporter;
    }

    @Override
    public Registry createRegistry(URL url) {
        return newZookeeperRegistry(url, zookeeperTransporter); }}Copy the code

As you can see, ZookeeperRegistry is instantiated, so I won’t explain it here.

Afterword.

The source code for this section is github.com/CrazyHZM/in…

Dubbo uses ZooKeeper to implement the registry. The key is to understand the node level meaning of Dubbo storage in ZooKeeper, that is, root layer, service layer, Type layer and URL layer respectively. Other logic is not complicated. Most of them invoke the zooKeeper client. Interested students can also learn more about ZooKeeper. If I didn’t write enough or made mistakes in any part, please give me your advice.