Dubbo Service directory

What is a service catalog

In a cluster, the number of the service provider is not immutable, sometimes there will be extend or shrink capacity, increase or reduce the number of the machine, the changes of information need to be synchronized to the consumer group, the service directory contains all of the information service providers, and according to the number of service providers or configuration change and dynamic change, And encapsulate it as an Invoker list

AbstractDirectory

This class implements the Directory interface and implements the List method, which gets the Invoker list. The implementation logic is a common template method in the subclasses StaticDirectory and RegistryDirectory.

public abstract class AbstractDirectory<T> implements Directory<T> {

    // logger
    private static final Logger logger = LoggerFactory.getLogger(AbstractDirectory.class);

    private final URL url;
	// Whether to be destroyed
    private volatile boolean destroyed = false;
	// Consumer group URL
    private volatile URL consumerUrl;
	// Service routing
    private volatile List<Router> routers;
	
    public AbstractDirectory(URL url) {
        this(url, null);
    }

    public AbstractDirectory(URL url, List<Router> routers) {
        this(url, url, routers);
    }

    public AbstractDirectory(URL url, URL consumerUrl, List<Router> routers) {
        if (url == null)
            throw new IllegalArgumentException("url == null");
        this.url = url;
        this.consumerUrl = consumerUrl;
        setRouters(routers);
    }

    @Override
    public List<Invoker<T>> list(Invocation invocation) throws RpcException {
        if (destroyed) {
            throw new RpcException("Directory already destroyed .url: " + getUrl());
        }
        // It is up to subclasses to get the invoker directory logic
        List<Invoker<T>> invokers = doList(invocation);
        // Service routing is required after the invoker list is obtained
        List<Router> localRouters = this.routers; // local reference
        if(localRouters ! =null && !localRouters.isEmpty()) {
            for (Router router : localRouters) {
                try {
                    if (router.getUrl() == null || router.getUrl().getParameter(Constants.RUNTIME_KEY, false)) {
                        // Perform service routinginvokers = router.route(invokers, getConsumerUrl(), invocation); }}catch (Throwable t) {
                    logger.error("Failed to execute router: " + getUrl() + ", cause: "+ t.getMessage(), t); }}}return invokers;
    }
    protected void setRouters(List<Router> routers) {
        // Service routing list
        routers = routers == null ? new ArrayList<Router>() : new ArrayList<Router>(routers);
        / / access service routing parameters of the router (tag, script, condition, mock)
        String routerkey = url.getParameter(Constants.ROUTER_KEY);
        // Obtain the route through SPI mechanism
        if(routerkey ! =null && routerkey.length() > 0) {
            RouterFactory routerFactory = ExtensionLoader.getExtensionLoader(RouterFactory.class).getExtension(routerkey);
            routers.add(routerFactory.getRouter(url));
        }
        // The default added types MockInvokersSelector and TagRouter
        routers.add(new MockInvokersSelector());
        routers.add(new TagRouter());
        Collections.sort(routers);
        this.routers = routers;
    }
    // omit other methods.protected abstract List<Invoker<T>> doList(Invocation invocation) throws RpcException;
}
Copy the code

StaticDirectory

As you can see from the name, it’s a static directory that doesn’t change

public class StaticDirectory<T> extends AbstractDirectory<T> {
    private finalList<Invoker<T>> invokers; .@Override
    protected List<Invoker<T>> doList(Invocation invocation) throws RpcException {
        returninvokers; }... }Copy the code

You can see that it returns directly to the Invoker list

RegistryDirectory

RegistryDirectory is a dynamic directory. NotifyListener interface is implemented and NotifyListener interface is implemented to dynamically refresh the invoker list. Let’s start with doList logic

@Override
public List<Invoker<T>> doList(Invocation invocation) {
    if (forbidden) {
        // 1. The service provider is closed
        // 2. Service provider is disabled
        throw new RpcException(RpcException.FORBIDDEN_EXCEPTION,
            "No provider available from registry " + getUrl().getAddress() + " for service " + getConsumerUrl().getServiceKey() + " on consumer " +  NetUtils.getLocalHost()
                    + " use dubbo version " + Version.getVersion() + ", please check status of providers(disabled, not registered or in blacklist).");
    }
    List<Invoker<T>> invokers = null;
    // Get the invokerk list corresponding to the method name from the cache
    Map<String, List<Invoker<T>>> localMethodInvokerMap = this.methodInvokerMap; 
    if(localMethodInvokerMap ! =null && localMethodInvokerMap.size() > 0) {
        // Get the method name
        String methodName = RpcUtils.getMethodName(invocation);
        // Get the method parameter list
        Object[] args = RpcUtils.getArguments(invocation);
        if(args ! =null && args.length > 0 && args[0] != null
                && (args[0] instanceof String || args[0].getClass().isEnum())) {
            invokers = localMethodInvokerMap.get(methodName + "." + args[0]); 
        }
        if (invokers == null) {
            // Get the Invoker list by method name
            invokers = localMethodInvokerMap.get(methodName);
        }
        if (invokers == null) {
            // Get the Invoker list with * (generalization calls)invokers = localMethodInvokerMap.get(Constants.ANY_VALUE); }}// Returns the invoker list
    return invokers == null ? new ArrayList<Invoker<T>>(0) : invokers;
}
Copy the code

The Invoker list is retrieved from the local cache so where are the values in the local cache updated from? This class implements notify to dynamically refresh the Invoker list

@Override
public synchronized void notify(List<URL> urls) {
    / / store will, routers, configurators under the list of content
    List<URL> invokerUrls = new ArrayList<URL>();
    List<URL> routerUrls = new ArrayList<URL>();
    List<URL> configuratorUrls = new ArrayList<URL>();
    for (URL url : urls) {
        // Get the protocol name
        String protocol = url.getProtocol();
        // Get the category parameter value, which defaults to providers
        String category = url.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY);
        / / according to different category type url encapsulated into routerUrls configuratorUrls, invokerUrls
        if (Constants.ROUTERS_CATEGORY.equals(category)
                || Constants.ROUTE_PROTOCOL.equals(protocol)) {
            routerUrls.add(url);
        } else if (Constants.CONFIGURATORS_CATEGORY.equals(category)
                || Constants.OVERRIDE_PROTOCOL.equals(protocol)) {
            configuratorUrls.add(url);
        } else if (Constants.PROVIDERS_CATEGORY.equals(category)) {
            invokerUrls.add(url);
        } else {
            logger.warn("Unsupported category " + category + " in notified url: " + url + " from registry " + getUrl().getAddress() + " to consumer "+ NetUtils.getLocalHost()); }}// Package configuratorUrls as List
      
       configurators
      
    if(configuratorUrls ! =null && !configuratorUrls.isEmpty()) {
        this.configurators = toConfigurators(configuratorUrls);
    }
    // Package routerUrls as List
      
       routers
      
    if(routerUrls ! =null && !routerUrls.isEmpty()) {
        List<Router> routers = toRouters(routerUrls);
        if(routers ! =null) { // null - do nothingsetRouters(routers); }}/ / the cache
    List<Configurator> localConfigurators = this.configurators; // local reference
    // merge override parameters
    this.overrideDirectoryUrl = directoryUrl;
    if(localConfigurators ! =null && !localConfigurators.isEmpty()) {
        for (Configurator configurator : localConfigurators) {
            / / configuration overrideDirectoryUrl
            this.overrideDirectoryUrl = configurator.configure(overrideDirectoryUrl); }}// Refresh the invoker list
    refreshInvoker(invokerUrls);
}
Copy the code
private void refreshInvoker(List<URL> invokerUrls) {
    // If the protocol header is Empty, set Forbidden to true, clear the cache, and destroy all invokers
    if(invokerUrls ! =null && invokerUrls.size() == 1 && invokerUrls.get(0) != null
            && Constants.EMPTY_PROTOCOL.equals(invokerUrls.get(0).getProtocol())) {
        this.forbidden = true; 
        this.methodInvokerMap = null; 
        destroyAllInvokers(); 
    } else {
        this.forbidden = false; 
        Map<String, Invoker<T>> oldUrlInvokerMap = this.urlInvokerMap; 
        if (invokerUrls.isEmpty() && this.cachedInvokerUrls ! =null) {
            // If the invokerUrls is empty but the cache is not empty, add the cache list to the invokerUrls directly
            invokerUrls.addAll(this.cachedInvokerUrls);
        } else {
            // If the cache is empty, add invokerUrls to the cache
            this.cachedInvokerUrls = new HashSet<URL>();
            this.cachedInvokerUrls.addAll(invokerUrls);
        }
        if (invokerUrls.isEmpty()) {
            return;
        }
        // Convert the URL to Invoker
        Map<String, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls);
        // Convert newUrlInvokerMap to 
      
       >> mapping
      ,list
        // doList gets a List of methods based on their names from newMethodInvokerMap
        Map<String, List<Invoker<T>>> newMethodInvokerMap = toMethodInvokers(newUrlInvokerMap);
        if (newUrlInvokerMap == null || newUrlInvokerMap.size() == 0) {
            logger.error(new IllegalStateException("urls to invokers error .invokerUrls.size :" + invokerUrls.size() + ", invoker.size :0. urls :" + invokerUrls.toString()));
            return;
        }
        // If there are multiple groups of providers, merge them into the cache
        this.methodInvokerMap = multiGroup ? toMergeMethodInvokerMap(newMethodInvokerMap) : newMethodInvokerMap;
        this.urlInvokerMap = newUrlInvokerMap;
        try {
            // Destroy useless invokers
            destroyUnusedInvokers(oldUrlInvokerMap, newUrlInvokerMap); 
        } catch (Exception e) {
            logger.warn("destroyUnusedInvokers error. ", e); }}}Copy the code

If the protocol is Empty, clear the cache, destroy all invokers, install the URL as the Invoker List, and then convert the List into the mapping of <method,List< invoker >>. If there are multiple groups of service providers, merge them and add them to the cache. Finally empty the useless invoker. Let’s look at some of the specific operations

private Map<String, Invoker<T>> toInvokers(List<URL> urls) {
    Map<String, Invoker<T>> newUrlInvokerMap = new HashMap<String, Invoker<T>>();
    if (urls == null || urls.isEmpty()) {
        return newUrlInvokerMap;
    }
    Set<String> keys = new HashSet<String>();
    // Get the protocol configured on the service consumer side
    String queryProtocols = this.queryMap.get(Constants.PROTOCOL_KEY);
    for (URL providerUrl : urls) {
        // Check whether the protocol is supported by the consumer
        if(queryProtocols ! =null && queryProtocols.length() > 0) {
            boolean accept = false;
            String[] acceptProtocols = queryProtocols.split(",");
            for (String acceptProtocol : acceptProtocols) {
                if (providerUrl.getProtocol().equals(acceptProtocol)) {
                    accept = true;
                    break; }}If the service provider protocol header is not supported by the consumer, the current providerUrl is ignored
            if(! accept) {continue; }}// Ignore the empty protocol
        if (Constants.EMPTY_PROTOCOL.equals(providerUrl.getProtocol())) {
            continue;
        }
        // SPI checks whether the server protocol is supported by the consumer. If not, an exception is thrown
        if(! ExtensionLoader.getExtensionLoader(Protocol.class).hasExtension(providerUrl.getProtocol())) { logger.error(new IllegalStateException("Unsupported protocol " + providerUrl.getProtocol() + " in notified url: " + providerUrl + " from registry " + getUrl().getAddress() + " to consumer " + NetUtils.getLocalHost()
                    + ", supported protocol: " + ExtensionLoader.getExtensionLoader(Protocol.class).getSupportedExtensions()));
            continue;
        }
        / / merge url
        URL url = mergeUrl(providerUrl);

        String key = url.toFullString();
        // Filter duplicate urls
        if (keys.contains(key)) {
            continue;
        }
        keys.add(key);
        Map<String, Invoker<T>> localUrlInvokerMap = this.urlInvokerMap; // local reference
        Invoker<T> invoker = localUrlInvokerMap == null ? null : localUrlInvokerMap.get(key);
        // Create a new invoker if the cache does not hit
        if (invoker == null) { 
            try {
                boolean enabled = true;
                if(url.hasParameter(Constants.DISABLED_KEY)) { enabled = ! url.getParameter(Constants.DISABLED_KEY,false);
                } else {
                    enabled = url.getParameter(Constants.ENABLED_KEY, true);
                }
                if (enabled) {
                    // Create a new invoker
                    invoker = newInvokerDelegate<T>(protocol.refer(serviceType, url), url, providerUrl); }}catch (Throwable t) {
                logger.error("Failed to refer invoker for interface:" + serviceType + ",url:(" + url + ")" + t.getMessage(), t);
            }
            if(invoker ! =null) { 
                // Add the newly created invoker to the cachenewUrlInvokerMap.put(key, invoker); }}else {
            newUrlInvokerMap.put(key, invoker);
        }
    }
    keys.clear();
    return newUrlInvokerMap;
}
Copy the code

The logic is simple: check if the protocol is supported by the consumer, merge the URL to retrieve the invoker from the cache, and create a new invoker if the cache misses

private Map<String, List<Invoker<T>>> toMethodInvokers(Map<String, Invoker<T>> invokersMap) {
    Map<String, List<Invoker<T>>> newMethodInvokerMap = new HashMap<String, List<Invoker<T>>>();
    List<Invoker<T>> invokersList = new ArrayList<Invoker<T>>();
    if(invokersMap ! =null && invokersMap.size() > 0) {
        for (Invoker<T> invoker : invokersMap.values()) {
            // Get the methods parameter value from the URL
            String parameter = invoker.getUrl().getParameter(Constants.METHODS_KEY);
            if(parameter ! =null && parameter.length() > 0) {
                // Get the list of methods
                String[] methods = Constants.COMMA_SPLIT_PATTERN.split(parameter);
                if(methods ! =null && methods.length > 0) {
                    for (String method : methods) {
                        if(method ! =null && method.length() > 0
                                && !Constants.ANY_VALUE.equals(method)) {
                            // Get the invoker list from the cache. If it misses, create a new invoker list and add it to the cache
                            List<Invoker<T>> methodInvokers = newMethodInvokerMap.get(method);
                            if (methodInvokers == null) {
                                methodInvokers = newArrayList<Invoker<T>>(); newMethodInvokerMap.put(method, methodInvokers); } methodInvokers.add(invoker); } } } } invokersList.add(invoker); }}// Perform service level routing
    List<Invoker<T>> newInvokersList = route(invokersList, null);
    // Store <*, newInvokersList> mappings
    newMethodInvokerMap.put(Constants.ANY_VALUE, newInvokersList);
    if(serviceMethods ! =null && serviceMethods.length > 0) {
        for (String method : serviceMethods) {
            List<Invoker<T>> methodInvokers = newMethodInvokerMap.get(method);
            if (methodInvokers == null || methodInvokers.isEmpty()) {
                methodInvokers = newInvokersList;
            }
            // Perform method level routingnewMethodInvokerMap.put(method, route(methodInvokers, method)); }}// Sort to immutable list
    for (String method : new HashSet<String>(newMethodInvokerMap.keySet())) {
        List<Invoker<T>> methodInvokers = newMethodInvokerMap.get(method);
        Collections.sort(methodInvokers, InvokerComparator.getComparator());
        newMethodInvokerMap.put(method, Collections.unmodifiableList(methodInvokers));
    }
    return Collections.unmodifiableMap(newMethodInvokerMap);
}
Copy the code

The above logic is relatively simple: traverse the map, obtain the methods array, and store it in the map according to the mapping between method names and method names to the Invoker list. Route based on service and method levels, sort the Invoker list and transform it into an immutable list

private Map<String, List<Invoker<T>>> toMergeMethodInvokerMap(Map<String, List<Invoker<T>>> methodMap) {
    Map<String, List<Invoker<T>>> result = new HashMap<String, List<Invoker<T>>>();
    for (Map.Entry<String, List<Invoker<T>>> entry : methodMap.entrySet()) {
        String method = entry.getKey();
        List<Invoker<T>> invokers = entry.getValue();
        Map<String, List<Invoker<T>>> groupMap = new HashMap<String, List<Invoker<T>>>();
        for (Invoker<T> invoker : invokers) {
            // Get the group parameter value
            String group = invoker.getUrl().getParameter(Constants.GROUP_KEY, "");
            List<Invoker<T>> groupInvokers = groupMap.get(group);
            if (groupInvokers == null) {
                groupInvokers = new ArrayList<Invoker<T>>();
                groupMap.put(group, groupInvokers);
            }
            groupInvokers.add(invoker);
        }
        // If groupMap has only one set of key-value pairs
        if (groupMap.size() == 1) {
            result.put(method, groupMap.values().iterator().next());
        } else if (groupMap.size() > 1) {
            // Merge multiple invokers
            List<Invoker<T>> groupInvokers = new ArrayList<Invoker<T>>();
            for (List<Invoker<T>> groupList : groupMap.values()) {
                groupInvokers.add(cluster.join(new StaticDirectory<T>(groupList)));
            }
            result.put(method, groupInvokers);
        } else{ result.put(method, invokers); }}return result;
}
Copy the code

If there is only one pair of groupMap, the key-value pair is returned. If there are multiple groupMap pairs, each group of Invokers is merged into Result by cluster class

Finally, there is the logic of destruction

private void destroyUnusedInvokers(Map<String, Invoker<T>> oldUrlInvokerMap, Map<String, Invoker<T>> newUrlInvokerMap) {
    if (newUrlInvokerMap == null || newUrlInvokerMap.size() == 0) {
        destroyAllInvokers();
        return;
    }
    // check deleted invoker
    List<String> deleted = null;
    if(oldUrlInvokerMap ! =null) {
        Collection<Invoker<T>> newInvokers = newUrlInvokerMap.values();
        // Find the intersection of oldUrlInvokerMap and newUrlInvokerMap and store it in List
      
        deleted
      
        for (Map.Entry<String, Invoker<T>> entry : oldUrlInvokerMap.entrySet()) {
            if(! newInvokers.contains(entry.getValue())) {if (deleted == null) {
                    deleted = newArrayList<String>(); } deleted.add(entry.getKey()); }}}// Remove invoker from the deleted collection from oldUrlInvokerMap
    if(deleted ! =null) {
        for (String url : deleted) {
            if(url ! =null) {
                // Remove invoker from oldUrlInvokerMap
                Invoker<T> invoker = oldUrlInvokerMap.remove(url);
                if(invoker ! =null) {
                    try {
                        / / destroy the invoker
                        invoker.destroy();
                        if (logger.isDebugEnabled()) {
                            logger.debug("destroy invoker[" + invoker.getUrl() + "] success. "); }}catch (Exception e) {
                        logger.warn("destroy invoker[" + invoker.getUrl() + "] faild. " + e.getMessage(), e);
                    }
                }
            }
        }
    }
}
Copy the code

The above logic is to find the URL to delete from oldUrlInvokerMap according to newUrlInvokerMap, remove it from oldUrlInvokerMap and destroy the Invoekr

conclusion

The logic of the whole service catalog has been seen, mainly to analyze the logic of the dynamic service catalog, and summarize:

  1. According to different category types the url url encapsulation into routerUrls, configuratorUrls
  2. Convert the URL to an Invoker list and a method-to-Invoker list mapping, or merge methodInvokerMap if there are multiple groups of providers
  3. Finally, destroy useless invokers