Background

When Dubbo uses ZooKeeper (ZK) as its registry, it subscribes through ZK’s watch mechanism to keep its list of service providers up to date. The general process is as follows. Today we’ll focus on the subscription step (shown in red) and the logic that processes its notifications.

Subscription

During the consumer startup phase, the call goes layer by layer through the protocol chain until it reaches RegistryProtocol.refer(), which leads to doRefer():

// org.apache.dubbo.registry.integration.RegistryProtocol#doRefer()
private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
    RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
    directory.setRegistry(registry);
    // ...
    // comment 1
    directory.subscribe(subscribeUrl.addParameter(CATEGORY_KEY,
            PROVIDERS_CATEGORY + "," + CONFIGURATORS_CATEGORY + "," + ROUTERS_CATEGORY));
    // ...
    return invoker;
}

At comment 1 we call subscribe() on the directory object. Following that call leads to this method:

// org.apache.dubbo.registry.zookeeper.ZookeeperRegistry#doSubscribe()
public void doSubscribe(final URL url, final NotifyListener listener) {
    List<URL> urls = new ArrayList<>();
    for (String path : toCategoriesPath(url)) {
        ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
        if (listeners == null) {
            zkListeners.putIfAbsent(url, new ConcurrentHashMap<>());
            listeners = zkListeners.get(url);
        }
        ChildListener zkListener = listeners.get(listener);
        if (zkListener == null) {
            // comment 1
            listeners.putIfAbsent(listener, (parentPath, currentChilds) -> ZookeeperRegistry.this.notify(url, listener, toUrlsWithEmpty(url, parentPath, currentChilds)));
            zkListener = listeners.get(listener);
        }
        zkClient.create(path, false);
        List<String> children = zkClient.addChildListener(path, zkListener);
        if (children != null) {
            urls.addAll(toUrlsWithEmpty(url, path, children));
        }
    }
    notify(url, listener, urls); // comment 2
}

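Here zkClient wraps a CuratorFramework client. At comment 1, a ChildListener is created as the callback: whenever the children of the watched path change, it calls notify() with the current child list. At comment 2, notify() is called once directly with the URLs read at subscription time. The consumer’s call list is not updated if the producer is not started. Let’s see what the callback method does.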
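The register-then-notify-once pattern in doSubscribe() can be sketched in isolation. This is a minimal sketch, not Dubbo's or Curator's API: FakeZkClient, the simplified ChildListener, and the path /dubbo/demo/providers are all hypothetical stand-ins kept in memory so the example runs on its own.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;

public class SubscribeSketch {

    /** Hypothetical stand-in for a child-change callback. */
    interface ChildListener {
        void childChanged(String parentPath, List<String> currentChildren);
    }

    /** Hypothetical in-memory stand-in for the ZK client; not the Curator API. */
    static class FakeZkClient {
        private final Map<String, List<String>> children = new ConcurrentHashMap<>();
        private final Map<String, List<ChildListener>> listeners = new ConcurrentHashMap<>();

        /** Registers the listener and returns the children that exist right now. */
        List<String> addChildListener(String path, ChildListener listener) {
            listeners.computeIfAbsent(path, p -> new CopyOnWriteArrayList<>()).add(listener);
            return new ArrayList<>(children.getOrDefault(path, Collections.emptyList()));
        }

        /** Simulates a provider registering: the child list changes and listeners fire. */
        void addChild(String path, String child) {
            children.computeIfAbsent(path, p -> new CopyOnWriteArrayList<>()).add(child);
            List<String> snapshot = new ArrayList<>(children.get(path));
            for (ChildListener l : listeners.getOrDefault(path, Collections.emptyList())) {
                l.childChanged(path, snapshot);
            }
        }
    }

    /** Records every notification the "consumer" receives. */
    static final List<List<String>> notifications = new ArrayList<>();

    static void subscribe(FakeZkClient zk, String path) {
        // Like comment 1 in the article: register the callback for later changes.
        List<String> current = zk.addChildListener(path, (parent, kids) -> notifications.add(kids));
        // Like comment 2 in the article: notify once with whatever exists at subscription time.
        notifications.add(current);
    }

    public static void main(String[] args) {
        FakeZkClient zk = new FakeZkClient();
        subscribe(zk, "/dubbo/demo/providers");             // no provider yet: empty list
        zk.addChild("/dubbo/demo/providers", "provider-1"); // provider starts: callback fires
        System.out.println(notifications); // prints [[], [provider-1]]
    }
}
```

The first notification is empty because no provider existed when the consumer subscribed; the second arrives only through the callback, which is why the callback path matters for keeping the call list current.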

The callback

The callback is invoked from the ZK client’s EventThread#processEvent() and travels all the way to RegistryDirectory#notify().

// org.... RegistryDirectory#notify()
// ... (part omitted)
refreshOverrideAndInvoker(providerURLs); // comment 1

Comment 1 is the entry point for refreshing the call list.

// org.xx.RegistryDirectory#refreshInvoker()
private void refreshInvoker(List<URL> invokerUrls) {
    // ...
    Map<String, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls); // comment 1

    List<Invoker<T>> newInvokers = Collections.unmodifiableList(new ArrayList<>(newUrlInvokerMap.values()));

    routerChain.setInvokers(newInvokers);
    this.invokers = multiGroup ? toMergeInvokerList(newInvokers) : newInvokers; // comment 2
    this.urlInvokerMap = newUrlInvokerMap;

    destroyUnusedInvokers(oldUrlInvokerMap, newUrlInvokerMap); // comment 3
}

At comment 1, the URLs returned from the ZK nodes (dubbo://IP:20880/com.poizon.study.api.service…) are turned into Invoker objects, which are what actually perform service calls. At comment 2, the latest call list is assigned to invokers; make a note of this variable, because we’ll come back to it later. At comment 3, entries that have dropped out of the call list are destroyed: for example, when a provider machine goes offline, it has to be removed from the call list. (Going offline does not take effect immediately; there is a strategy behind it. If you’re interested, take a look at the source code.)
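The three steps of the refresh (build the new map, publish the new list, destroy what disappeared) can be sketched as follows. This is a simplified sketch under stated assumptions, not Dubbo's implementation: SimpleInvoker and SimpleDirectory are hypothetical classes, the provider addresses are made up, and reusing the existing invoker for a URL that is still present is a choice of this sketch.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class RefreshSketch {

    /** Hypothetical minimal invoker: just a URL and a destroyed flag. */
    static class SimpleInvoker {
        final String url;
        boolean destroyed;
        SimpleInvoker(String url) { this.url = url; }
        void destroy() { destroyed = true; }
    }

    /** Hypothetical directory that holds the current call list. */
    static class SimpleDirectory {
        // volatile so that request threads see the list published by the notify thread
        private volatile List<SimpleInvoker> invokers = Collections.emptyList();
        private volatile Map<String, SimpleInvoker> urlInvokerMap = new LinkedHashMap<>();

        void refresh(List<String> providerUrls) {
            Map<String, SimpleInvoker> oldMap = urlInvokerMap;
            // Step 1: build the new URL -> invoker map (assumption: reuse surviving invokers).
            Map<String, SimpleInvoker> newMap = new LinkedHashMap<>();
            for (String url : providerUrls) {
                SimpleInvoker existing = oldMap.get(url);
                newMap.put(url, existing != null ? existing : new SimpleInvoker(url));
            }
            // Step 2: publish the new snapshot as the call list.
            this.invokers = Collections.unmodifiableList(new ArrayList<>(newMap.values()));
            this.urlInvokerMap = newMap;
            // Step 3: destroy invokers whose URL is no longer in the registry.
            for (Map.Entry<String, SimpleInvoker> entry : oldMap.entrySet()) {
                if (!newMap.containsKey(entry.getKey())) {
                    entry.getValue().destroy();
                }
            }
        }

        List<SimpleInvoker> list() {
            return invokers;
        }
    }

    public static void main(String[] args) {
        SimpleDirectory directory = new SimpleDirectory();
        directory.refresh(Arrays.asList("dubbo://10.0.0.1:20880", "dubbo://10.0.0.2:20880"));
        SimpleInvoker second = directory.list().get(1);
        // The second provider goes offline, so the registry now reports only the first.
        directory.refresh(Arrays.asList("dubbo://10.0.0.1:20880"));
        System.out.println(directory.list().size() + " invoker(s) left, second destroyed: " + second.destroyed);
    }
}
```

Publishing a fresh immutable list instead of mutating the old one means a request that is already iterating over the previous list is not disturbed by the refresh.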

I’m not going to say much about how the Invoker is generated. The general process is to call protocol.refer(), which wraps the Invoker layer by layer; the final result is InvokerDelegate(ListenerInvokerWrapper(CallbackRegistrationInvoker(Filter(AsyncToSyncInvoker(DubboInvoker()))))).
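The layering is a plain decorator chain: each wrapper holds the next invoker and delegates to it. A minimal sketch of that idea, with hypothetical types (the one-method Invoker interface, CoreInvoker, and NamedWrapper are invented here and the layer names are only labels, not Dubbo classes):

```java
public class WrapperSketch {

    /** Hypothetical one-method invoker interface. */
    interface Invoker {
        String invoke(String request);
    }

    /** Innermost invoker: stands in for the one that actually sends the request. */
    static class CoreInvoker implements Invoker {
        @Override
        public String invoke(String request) {
            return "result(" + request + ")";
        }
    }

    /** Generic wrapper that tags the result with its name and delegates inward. */
    static class NamedWrapper implements Invoker {
        private final String name;
        private final Invoker delegate;

        NamedWrapper(String name, Invoker delegate) {
            this.name = name;
            this.delegate = delegate;
        }

        @Override
        public String invoke(String request) {
            return name + "[" + delegate.invoke(request) + "]";
        }
    }

    static Invoker build() {
        Invoker invoker = new CoreInvoker();
        // Wrap from the inside out; the last wrapper applied ends up outermost.
        for (String layer : new String[] {"asyncToSync", "filter", "listener", "delegate"}) {
            invoker = new NamedWrapper(layer, invoker);
        }
        return invoker;
    }

    public static void main(String[] args) {
        System.out.println(build().invoke("hello"));
        // prints delegate[listener[filter[asyncToSync[result(hello)]]]]
    }
}
```

The caller only ever sees the outermost object, so layers can be added or removed without changing the code that invokes it.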

Using the Invoker

When a consumer makes a request, the call goes MockClusterInvoker -> FailoverClusterInvoker#doInvoke() -> FailoverClusterInvoker#list(). Let’s go inside that method.

// org.apache.dubbo.rpc.cluster.directory.AbstractDirectory#list
public List<Invoker<T>> list(Invocation invocation) throws RpcException {
    if (destroyed) {
        throw new RpcException("Directory already destroyed .url: " + getUrl());
    }
    return doList(invocation); // comment 1
}

// org.apache.dubbo.registry.integration.RegistryDirectory#doList
public List<Invoker<T>> doList(Invocation invocation) {
    if (forbidden) {
        throw new RpcException("No provider available"); // comment 2
    }
    if (multiGroup) {
        return this.invokers == null ? Collections.emptyList() : this.invokers; // comment 3
    }
    // ...
}

At comment 3, the method returns the invokers variable, the same one that was assigned the newly generated Invokers during the subscription process.
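To show why reading the list at request time matters, here is a simplified sketch of a failover-style caller. It is an illustration under assumptions, not FailoverClusterInvoker itself: invokers are modeled as plain functions, the directory as a Supplier, and there is no load balancing or retry limit.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;
import java.util.function.Supplier;

public class FailoverSketch {

    /**
     * Tries each invoker from the directory's current list until one succeeds.
     * Invokers are modeled as plain functions; this is a hypothetical simplification.
     */
    static String invokeWithFailover(Supplier<List<Function<String, String>>> directory,
                                     String request) {
        // Read the call list at request time, so a refreshed list is picked up.
        List<Function<String, String>> invokers = directory.get();
        if (invokers.isEmpty()) {
            throw new IllegalStateException("No provider available");
        }
        RuntimeException last = null;
        for (Function<String, String> invoker : invokers) {
            try {
                return invoker.apply(request);
            } catch (RuntimeException e) {
                last = e; // remember the failure and try the next provider
            }
        }
        throw last;
    }

    public static void main(String[] args) {
        Function<String, String> broken = r -> { throw new RuntimeException("connection refused"); };
        Function<String, String> healthy = r -> "ok:" + r;
        List<Function<String, String>> callList = Arrays.asList(broken, healthy);
        System.out.println(invokeWithFailover(() -> callList, "ping")); // prints ok:ping
    }
}
```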

Conclusion

This walkthrough followed the flow with ZK as the registry. You can also switch to Nacos and trace the same path.