“This article has participated in the good article call order activity, click to view the back end, big front end double track submission, 20,000 yuan prize pool for you to challenge!”

Well, it is a thankless long source code parsing article, liver make reading completion rate low into the bottom, but unexpectedly say good to write a series of dubbo, kneeling also to write, although low reading of the poor, but I believe that this a series of written, sui generis, should be reading came up, and should, shouldn’t, just when I didn’t say.

So what is dubbo? Dubbo is your banana. Look at the first two

Play Dubbo. Go through the door first

Play Dubbo, and the secret service is exposed

Ok, no more nonsense, also said so much, next serious talk about service introduction.

What is service introduction

I thought about it for a long time in my two-thousand-square-meter room, and found it a little difficult to answer the question.

Let’s just say, if you’re A Java student, you know the concept of an object introduction, A service introduction is basically bringing an object in and binding it to A service, so you think you’re calling object A, but it’s actually being proxied, and you end up calling an object on A server somewhere far, far away.

Coarse is also rough, the next or in accordance with my daily thinking about life posture, divided into several directions to talk about the introduction of service, I hope you can watch, and I, with the same posture to think about life.

The following source code sample is from Dubbo2.6x. Comments on the source code have been submitted to Github and can be cloned if needed:

Github.com/wiatingpub/…

When is the service introduction triggered

Again, start with the configuration of the service introduction

As mentioned in service exposure, Dubbo takes the classic XML configuration, and of course uses NamespaceHandlerSupport to map the node configuration in the XML to the corresponding object, which we’ve already seen. Let’s move on to see which object NamespaceHandlerSupport parses

Oh, it is;

See what’s so special about this object.

Wave Ctrl + H over the class and you can see the implementation structure of the class

Oh yes, you see that FactoryBean and InitializingBean are implemented,

First, you implement the interface InitializingBean, which provides the method afterPropertiesSet, to see what’s the use

As you can see, this is used for the hite mode. If preprocessing is configured in the configuration, it directly introduces the service, which is the init parameter

There’s a hungry guy mode, so where’s the lazy guy? Lazy bum where are you

This is where FactoryBean comes in, and look at the method provided by this interface, the getObject method

As you can see, the lazy mode is introduced when the ReferenceBean’s corresponding service is injected into another class.

Well, that’s the end of the article. I’m sleepy, too.

There must be a lot of jokes at this point. Is that it? Do you think I’m afraid? It’s impossible to tell you that almost no one will read the rest of this article, and probably no one will even see it here. Also, so, go to bed first, after all, it is 1:00 am now, it is better to go to bed early.

(it’s even… It’s three o ‘clock, can’t sleep, forget it, keep writing, can’t sleep anyway, as self entertainment can.

Seems to be missing a flow chart, then it is difficult to draw it, maybe the painting will want to sleep.

Again, those who want to see the overall flow chart can click the link: Flow chart link

Next, it is not suitable for children to destroy your outlook on life, world view, love view of the three ultra-long service introduction process, the process may cause fainting, uncomfortable people can get off in advance.

A wave of URL

It was mentioned in the last article, if you read it, you can ignore it, it’s exactly the same, and I put it here so that you don’t have to read the last article.

Before we talk about the introduction of the service, we must first mention the URL, otherwise the main line is lost, and it is difficult to follow.

Before I came into contact with Dubbo, MY URL positioning refers to the network address, while in Dubbo, it can be considered as a convention, almost all modules of Dubbo pass parameters through the URL, what is the benefit of this?

Well we can think about it, if there is no agreement, then the parameters of the interaction between different interface will go bad, for a while is a string, a moment is the map, and with the provisions of the uniform, the code will be more normative and unified, when we look at the code will be more clear, and easy to expand, for example, if you want to develop something, Just concatenate the parameters directly to the URL.

As you can see, except for a few basic parameters, many of the parameters actually end up in parameters.

In the project of our company, we refer to the DESIGN of URL to build a metadata structure, namely map, and pass some dynamic parameters of the service through map.

Service introduction process

The best way to look at code is to look at it with questions. – handsome rice meal

That makes sense. So when was the remote server connected when the service was introduced?

To answer that question, let’s go straight to the answer

Yeah, this is it. So what’s the process?

After reading such a long process, I know you are ready to close this damn article, don’t worry, eldest brother first have a drink of tea, I have sorted out the next few important steps, just need to figure out the following steps, the rest are all packaging and SPI search

  • Init done what
  • CreateProxy did what
  • RegistryProtocol did what
  • RegistryDirectory did what
  • DubboProtocol did what

What? There are more? The whole process can be boiled down to:

No skin, love to see not to see, anyway, after watching not to like, then talk in detail.

1. What init does

First of all, the service entry, whether it’s hungry mode or lazy mode, is going to get, and get eventually goes to init, so let’s start with init, let’s see what init does

Kill a person directly first do not compensate the code

private void init(a) {
    // If it has already been initialized, return it
    if (initialized) {
        return;
	/** Set parameters **/
    checkApplication();
    // Local stub validity check
    checkStub(interfaceClass);
    // Mock validity check
    checkMock(interfaceClass);
    // Use map to store the configuration
    Map<String, String> map = new HashMap<String, String>();
    Map<Object, Object> attributes = new HashMap<Object, Object>();
    // Indicates that this is the consumer side
    map.put(Constants.SIDE_KEY, Constants.CONSUMER_SIDE);
    // Add the dubbo version
    map.put(Constants.DUBBO_VERSION_KEY, Version.getProtocolVersion());
    // Add a timestamp
    map.put(Constants.TIMESTAMP_KEY, String.valueOf(System.currentTimeMillis()));
    if (ConfigUtils.getPid() > 0) {
        / / pid, etc
        map.put(Constants.PID_KEY, String.valueOf(ConfigUtils.getPid()));
    }
    if(! isGeneric()) { String revision = Version.getVersion(interfaceClass, version);if(revision ! =null && revision.length() > 0) {
            // Set the version number
            map.put("revision", revision);
        }
        // Get all the methods and put all the method signatures into the map
        String[] methods = Wrapper.getWrapper(interfaceClass).getMethodNames();
        if (methods.length == 0) {
            logger.warn("NO method found in service interface " + interfaceClass.getName());
            map.put("methods", Constants.ANY_VALUE);
        } else {
            map.put("methods", StringUtils.join(new HashSet<String>(Arrays.asList(methods)), ",")); }}// Add the service interface name
    map.put(Constants.INTERFACE_KEY, interfaceName);
    // Continue to add information to the map
    appendParameters(map, application);
    appendParameters(map, module);
    appendParameters(map, consumer, Constants.DEFAULT_KEY);
    appendParameters(map, this);
    String prefix = StringUtils.getServiceKey(map);
    if(methods ! =null && !methods.isEmpty()) {
        // Iterate over the method configuration
        for (MethodConfig method : methods) {
            // Put the method configuration into the map
            appendParameters(map, method, method.getName());
            // Generate a configuration key for retries
            String retryKey = method.getName() + ".retry";
            // If the configuration already exists, remove the configuration
            if (map.containsKey(retryKey)) {
                String retryValue = map.remove(retryKey);
                // If false means no retry, set the retry count to 0
                if ("false".equals(retryValue)) {
                    map.put(method.getName() + ".retries"."0");
                }
            }
            appendAttributes(attributes, method, prefix + "." + method.getName());
            // Set asynchronous configurationcheckAndConvertImplicitConfig(method, map, attributes); }}// Obtain the service consumer IP address, etc
    String hostToRegistry = ConfigUtils.getSystemProperty(Constants.DUBBO_IP_TO_REGISTRY);
    if (hostToRegistry == null || hostToRegistry.length() == 0) {
        // You can see that if it is empty, the local IP is obtained
        hostToRegistry = NetUtils.getLocalHost();
    } else if (isInvalidLocalHost(hostToRegistry)) {
        throw new IllegalArgumentException("Specified invalid registry ip from property:" + Constants.DUBBO_IP_TO_REGISTRY + ", value:" + hostToRegistry);
    }
    // Put the consumer IP into the map
    map.put(Constants.REGISTER_IP_KEY, hostToRegistry);

    //attributes are stored by system context.
    StaticContext.getSystemContext().putAll(attributes);
    // Create a proxy object
    ref = createProxy(map);
    // Generate the ConsumerModel and put it into the AppliacitonModel
    ConsumerModel consumerModel = new ConsumerModel(getUniqueServiceName(), this, ref, interfaceClass.getMethods());
    ApplicationModel.initConsumerModel(getUniqueServiceName(), consumerModel);
}
Copy the code

As you can see, I’ve condensed some of the code into the next three steps

  • Check the validity of local stubs and mocks
  • Add protocol version, timestamp, release version, method configuration, retry configuration, asynchronous configuration, and consumer IP to map
  • Creating a proxy object

Supplementary flow chart

In our company’s RPC components, we also considered whether to support the introduction of the two modes of service, and finally gave up, thinking that it is not necessary. In our company’s project, we did the introduction of the service when we started, which is a kind of hungry Han mode.

2. What createProxy does

The next big thing is to create the proxy object, so let’s continue with the analysis

private T createProxy(Map<String, String> map) {
        URL tmpUrl = new URL("temp"."localhost".0, map);
        final boolean isJvmRefer;
        // Check whether it is a local call according to the configuration
        if (isInjvm() == null) {
            if(url ! =null && url.length() > 0) { // if a url is specified, don't do local reference
                isJvmRefer = false;
            } else if (InjvmProtocol.getInjvmProtocol().isInjvmRefer(tmpUrl)) {
                // by default, reference local service if there is
                isJvmRefer = true;
            } else {
                isJvmRefer = false; }}else {
            isJvmRefer = isInjvm().booleanValue();
        }

        if (isJvmRefer) {
            // If the call is local, generate the URL
            URL url = new URL(Constants.LOCAL_PROTOCOL, NetUtils.LOCALHOST, 0, interfaceClass.getName()).addParameters(map);
            // Use InjvmProtocol to generate InjvmInvoker instance
            invoker = refprotocol.refer(interfaceClass, url);
            if (logger.isInfoEnabled()) {
                logger.info("Using injvm service "+ interfaceClass.getName()); }}else {
            // If the url is not empty, it indicates that the call is made through the direct connection
            if(url ! =null && url.length() > 0) { // user specified URL, could be peer-to-peer address, or register center's address.
                String[] us = Constants.SEMICOLON_SPLIT_PATTERN.split(url);
                if(us ! =null && us.length > 0) {
                    for (String u : us) {
                        URL url = URL.valueOf(u);
                        if (url.getPath() == null || url.getPath().length() == 0) {
                            url = url.setPath(interfaceName);
                        }
                        if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
                            urls.add(url.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));
                        } else{ urls.add(ClusterUtils.mergeUrl(url, map)); }}}}else {
                // Empty means that you want to use the registry to call, here is the registry URL
                List<URL> us = loadRegistries(false);
                if(us ! =null && !us.isEmpty()) {
                    for (URL u : us) {
                        URL monitorUrl = loadMonitor(u);
                        if(monitorUrl ! =null) { map.put(Constants.MONITOR_KEY, URL.encode(monitorUrl.toFullString())); } urls.add(u.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map))); }}if (urls.isEmpty()) {
                    throw new IllegalStateException("No such any registry to reference " + interfaceName + " on the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + ", please config 
       to your spring config."); }}if (urls.size() == 1) {
                // In the case of multiple URL connections, the Invoker instance is built by calling the Refer of RegistryProtocl
                invoker = refprotocol.refer(interfaceClass, urls.get(0));
            } else{ List<Invoker<? >> invokers =newArrayList<Invoker<? > > (); URL registryURL =null;
                // Iterate over all connected urls
                for (URL url : urls) {
                    // Build the Invoker instance by calling the Refer of RegistryProtocl
                    invokers.add(refprotocol.refer(interfaceClass, url));
                    if(Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) { registryURL = url; }}// Merge multiple Invokers by cluster
                if(registryURL ! =null) {
                    URL u = registryURL.addParameterIfAbsent(Constants.CLUSTER_KEY, AvailableCluster.NAME);
                    invoker = cluster.join(new StaticDirectory(u, invokers));
                } else { // not a registry url
                    invoker = cluster.join(new StaticDirectory(invokers));
                }
            }
        }

        Boolean c = check;
        if (c == null&& consumer ! =null) {
            c = consumer.isCheck();
        }
        if (c == null) {
            c = true; // default true
        }
        if(c && ! invoker.isAvailable()) {// make it possible for consumer to retry later if provider is temporarily unavailable
            initialized = false;
            throw new IllegalStateException("Failed to check the status of the service " + interfaceName + ". No provider available for the service " + (group == null ? "" : group + "/") + interfaceName + (version == null ? "" : ":" + version) + " from the url " + invoker.getUrl() + " to the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion());
        }
        if (logger.isInfoEnabled()) {
            logger.info("Refer dubbo service " + interfaceClass.getName() + " from url " + invoker.getUrl());
        }
        // Create the service proxy
        return (T) proxyFactory.getProxy(invoker);
    }
Copy the code

I don’t need to look at the code. It would be better to look at the comments. The reason why I post so much code is probably to make the article look more content.

The general logic of this method can be divided into the following steps:

  • Check whether the call is local. If yes, use the Refer method of InjvmProtocol to generate an Invoker instance
  • If the call is not local, merge the urls of the remote connection according to the direct connection configuration or registry configuration
  • Traverse each URL and add the configuration of the monitoring center
  • If it is a single URL, it directly introduces the Refer of the RegistryProtocol through SPI to build an Invoker instance; if it is multiple urls, it traverses the urls, generates Invoker for each URL, and uses cluster to combine multiple Invokers.
  • The final output is an Invoker, because if there are multiple invokers, the cluster will also encapsulate them into one.

Continue to supplement the flow chart:

Why is it necessary to encapsulate Invoker?

As I mentioned in the last article, the idea is to mask the details of local or remote or clustered calls, and to expose an executable that the caller can call, but no matter how it is encapsulated, it actually ends up calling the target method. As you can see, even if there are multiple invokers, It will eventually be clustered into an Invoker.

In terms of dubbo design, it can be said that invoker carries the scheduling of logic, while URL carries the passing parameters of configuration. It can be thought that URL is the material, and finally generates the corresponding food, that is, invoker. In our company, we also package the logic through invoker.

3. What the RegistryProtocol does

Next look at the RegistryProtocol, here is the specific logic to generate invoker, as to how SPI mechanism to find the RegistryProtocol, interested in can continue to follow, will be written separately later

@Override
@SuppressWarnings("unchecked")
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
    // Take the value of the Registry parameter and set it to the protocol header. The default is dubbo
    url = url.setProtocol(url.getParameter(Constants.REGISTRY_KEY, Constants.DEFAULT_REGISTRY)).removeParameter(Constants.REGISTRY_KEY);
    // Get the registry instance
    Registry registry = registryFactory.getRegistry(url);
    // If it is a registry service, return the invoker of the registry service
    if (RegistryService.class.equals(type)) {
        return proxyFactory.getInvoker((T) registry, type, url);
    }

    // Set the URL to a map
    Map<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded(Constants.REFER_KEY));
    // Get the group value
    String group = qs.get(Constants.GROUP_KEY);
    if(group ! =null && group.length() > 0) {
        if ((Constants.COMMA_SPLIT_PATTERN.split(group)).length > 1
                || "*".equals(group)) {
            // If there are more than one group, the service introduction logic continues with the MergeableCluster call to doRefer
            returndoRefer(getMergeableCluster(), registry, type, url); }}// If there is only one group, doRefer continues
    return doRefer(cluster, registry, type, url);
}
Copy the code

If it is a registration service center, then create a proxy directly. If it is not, then process the configuration. According to the configuration, decide how to implement the Cluster, and finally call the doRefer method

private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
    / / create RegistryDirectory
    RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
    // Set the registry
    directory.setRegistry(registry);
    // Set the protocol
    directory.setProtocol(protocol);
    // Put all attributes in the map
    Map<String, String> parameters = new HashMap<String, String>(directory.getUrl().getParameters());
    // Generate a service consumer connection
    URL subscribeUrl = new URL(Constants.CONSUMER_PROTOCOL, parameters.remove(Constants.REGISTER_IP_KEY), 0, type.getName(), parameters);
    if(! Constants.ANY_VALUE.equals(url.getServiceInterface()) && url.getParameter(Constants.REGISTER_KEY,true)) {
        URL registeredConsumerUrl = getRegisteredConsumerUrl(subscribeUrl, url);
        // Register the service and generate a new node in the Consumers directory
        registry.register(registeredConsumerUrl);
        directory.setRegisteredConsumerUrl(registeredConsumerUrl);
    }
    // Subscribe to providers, Configuratios, and routers
    directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY,
            Constants.PROVIDERS_CATEGORY
                    + "," + Constants.CONFIGURATORS_CATEGORY
                    + "," + Constants.ROUTERS_CATEGORY));
    // A registry may have multiple service providers, so use a cluster to merge the clusters into a single Invoker
    Invoker invoker = cluster.join(directory);
    // Register the consumer with the service provider
    ProviderConsumerRegTable.registerConsumer(invoker, url, subscribeUrl, directory);
    return invoker;
}
Copy the code

The core steps of this method are as follows:

  • Create a RegistryDirectory, regenerate a service consumer connection, register the service with the registry, and generate a new node under the Consumers directory

  • Subscribe to the data of providers, Configuratios, routers and other nodes, so that the Diretory can sense the change information of these nodes. For the future service governance, we will talk about this later, and we are interested to keep paying attention to it

  • Merging Invoker, it is possible for a registry to have multiple service providers, so cluster into a single Invoker using a cluster

What is Directory?

Directory is referred to as the Directory of services. To put it simply, consumers cache the information of service providers that they can call into the local Directory. When the service provider changes, consumers will notify the registry and listen for messages about relevant services in the registry. The local service Directory is updated when a message is received about changes to the relevant service provider. You can get rid of it, but the model is not that beautiful.

Continue to supplement the process

4. What the RegistryDirectory has done

Can see the first RegistryDirectory. Subscribe to do what

public void subscribe(URL url) {
    setConsumerUrl(url);
    registry.subscribe(url, this);
}
Copy the code

Since the registry is configured as ZookeeperRegistry, the subscription logic eventually goes to ZookeeperRegistry, which inherits FailbackRegistry. So is eventually go FailbackRegistry. Subscribe

public void subscribe(URL url, NotifyListener listener) {
    super.subscribe(url, listener);
    removeFailedSubscribed(url, listener);
    try {
        // The real place to do the subscription
        doSubscribe(url, listener);
    } catch (Exception e) {
        Throwable t = e;

        List<URL> urls = getCacheUrls(url);
        if(urls ! =null && !urls.isEmpty()) {
            notify(url, listener, urls);
            logger.error("Failed to subscribe " + url + ", Using cached list: " + urls + " from cache file: " + getUrl().getParameter(Constants.FILE_KEY, System.getProperty("user.home") + "/dubbo-registry-" + url.getHost() + ".cache") + ", cause: " + t.getMessage(), t);
        } else {
            // If the startup detection is opened, the Exception is thrown directly.
            boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
                    && url.getParameter(Constants.CHECK_KEY, true);
            boolean skipFailback = t instanceof SkipFailbackWrapperException;
            if (check || skipFailback) {
                if (skipFailback) {
                    t = t.getCause();
                }
                throw new IllegalStateException("Failed to subscribe " + url + ", cause: " + t.getMessage(), t);
            } else {
                logger.error("Failed to subscribe " + url + ", waiting for retry, cause: "+ t.getMessage(), t); }}// If the subscription fails, it is placed in the failure containeraddFailedSubscribed(url, listener); }}Copy the code

In fact, we can guess from the name FailbackRegistry that if the subscription fails, there will be a retry mechanism to run again. No, the last line of code confirms this. Let me show you the structure of FailbackRegistry

// TODO:2021/5/24 Get the retry frequency parameter from the URL and start the timer for the retry logic
public FailbackRegistry(URL url) {
    super(url);
    this.retryPeriod = url.getParameter(Constants.REGISTRY_RETRY_PERIOD_KEY, Constants.DEFAULT_REGISTRY_RETRY_PERIOD);
    this.retryFuture = retryExecutor.scheduleWithFixedDelay(new Runnable() {
        @Override
        public void run(a) {
            try {
                // TODO:2021/5/29 Retry at a specified time
                retry();
            } catch (Throwable t) { // Defensive fault tolerance
                logger.error("Unexpected error occur at failed retry, cause: " + t.getMessage(), t);
            }
        }
    }, retryPeriod, retryPeriod, TimeUnit.MILLISECONDS);
}
Copy the code

Sure enough, eventually, if an exception occurs to the subscription, a scheduled retry is performed.

If you’re careful, you might see void subscribe(URL URL, NotifyListener listener), and NotifyListener, what’s this?

You can see

This is essentially the RegistryDirectory itself, where the listener pattern is used to enable the RegistryDirectory to be aware of changes to service instances.

It’s about to get into high gear

@Override
protected void doSubscribe(final URL url, final NotifyListener listener) {
    try {
        // Process subscriptions whose URL parameter is interface *, such as monitoring center subscriptions
        if (Constants.ANY_VALUE.equals(url.getServiceInterface())) {
            // Get the root directory
            String root = toRootPath();
            // Get the corresponding listener based on the URL
            ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
            if (listeners == null) {
                // Create a listener
                zkListeners.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, ChildListener>());
                listeners = zkListeners.get(url);
            }
            // Get the node listener
            ChildListener zkListener = listeners.get(listener);
            // If the node listener is empty, create it
            if (zkListener == null) {
                listeners.putIfAbsent(listener, new ChildListener() {
                    @Override
                    public void childChanged(String parentPath, List<String> currentChilds) {
                        for (String child : currentChilds) {
                            child = URL.decode(child);
                            if(! anyServices.contains(child)) { anyServices.add(child); subscribe(url.setPath(child).addParameters(Constants.INTERFACE_KEY, child, Constants.CHECK_KEY, String.valueOf(false)), listener); }}}}); zkListener = listeners.get(listener); }// Create the service node
            zkClient.create(root, false);
            // Send a subscription to the ZooKeeper service
            List<String> services = zkClient.addChildListener(root, zkListener);
            if(services ! =null && !services.isEmpty()) {
                for (String service : services) {
                    service = URL.decode(service);
                    anyServices.add(service);
                    // Initiate a subscription to the service layer
                    subscribe(url.setPath(service).addParameters(Constants.INTERFACE_KEY, service,
                            Constants.CHECK_KEY, String.valueOf(false)), listener); }}}else {
            List<URL> urls = new ArrayList<URL>();
            // Iterate over the classification array
            for (String path : toCategoriesPath(url)) {
                // Get the listener set
                ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
                // If not, create it
                if (listeners == null) {
                    zkListeners.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, ChildListener>());
                    listeners = zkListeners.get(url);
                }
                // Get the listener
                ChildListener zkListener = listeners.get(listener);
                if (zkListener == null) {
                    listeners.putIfAbsent(listener, new ChildListener() {
                        @Override
                        public void childChanged(String parentPath, List<String> currentChilds) {
                            NotifyListener: NotifyListener: NotifyListener
                            ZookeeperRegistry.this.notify(url, listener, toUrlsWithEmpty(url, parentPath, currentChilds)); }}); zkListener = listeners.get(listener); }/ / create nodes, such as: dubbo/com. Alibaba. Dubbo. Demo. DemoService/will
                zkClient.create(path, false);
                List<String> children = zkClient.addChildListener(path, zkListener);
                if(children ! =null) { urls.addAll(toUrlsWithEmpty(url, path, children)); }}// Notify data changes, such as RegistryDirectorynotify(url, listener, urls); }}catch (Throwable e) {
        throw new RpcException("Failed to subscribe " + url + " to zookeeper " + getUrl() + ", cause: "+ e.getMessage(), e); }}Copy the code

It’s a lot of code, but it’s just a few steps

  • Build the listener, create a few nodes in the registry,
  • Listen for service changes on these nodes
  • A change to the listener is triggered when the specific service URL is retrieved, which is perceived by the RegistryDirectory

Let’s go ahead and see what the RegistryDirectory does when it is aware of it

private void refreshInvoker(List<URL> invokerUrls) {
    if(invokerUrls ! =null && invokerUrls.size() == 1 && invokerUrls.get(0) != null
            && Constants.EMPTY_PROTOCOL.equals(invokerUrls.get(0).getProtocol())) {
        this.forbidden = true; // Forbid to access
        this.methodInvokerMap = null; // Set the method invoker map to null
        // Close all invoker
        destroyAllInvokers(); // Close all invokers
    } else {
        this.forbidden = false; // Allow to access
        Map<String, Invoker<T>> oldUrlInvokerMap = this.urlInvokerMap; // local reference
        if (invokerUrls.isEmpty() && this.cachedInvokerUrls ! =null) {
            invokerUrls.addAll(this.cachedInvokerUrls);
        } else {
            this.cachedInvokerUrls = new HashSet<URL>();
            this.cachedInvokerUrls.addAll(invokerUrls);//Cached invoker urls, convenient for comparison
        }
        if (invokerUrls.isEmpty()) {
            return;
        }
        // Convert the invokers passed in to the new urlInvokerMap
        Map<String, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls);// Translate url list to Invoker map
        // Transform the new methodInvokerMap
        Map<String, List<Invoker<T>>> newMethodInvokerMap = toMethodInvokers(newUrlInvokerMap); // Change method name to map Invoker Map
        // state change
        // If the calculation is wrong, it is not processed.
        if (newUrlInvokerMap == null || newUrlInvokerMap.size() == 0) {
            logger.error(new IllegalStateException("urls to invokers error .invokerUrls.size :" + invokerUrls.size() + ", invoker.size :0. urls :" + invokerUrls.toString()));
            return;
        }
        this.methodInvokerMap = multiGroup ? toMergeMethodInvokerMap(newMethodInvokerMap) : newMethodInvokerMap;
        this.urlInvokerMap = newUrlInvokerMap;
        try {
            // Log off the invoker that is no longer used
            destroyUnusedInvokers(oldUrlInvokerMap, newUrlInvokerMap); // Close the unused Invoker
        } catch (Exception e) {
            logger.warn("destroyUnusedInvokers error. ", e); }}}Copy the code

Compared to actually destroying the old one, according to the new build invoker, we can see that the next step is to the lat DubboProtocol

Continue to supplement the flow chart

5. What the DubboProtocol does

The end is actually through a few wrapper classes, and then through SPI to dubboprotocol.Refer

public <T> Invoker<T> refer(Class<T> serviceType, URL url) throws RpcException {
    optimizeSerialization(url);
    // Create a DubboInvoker and connect to the remote server, placing the connection wrapper ExchangeClient in the DubboInvoker
    DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
    invokers.add(invoker);
    return invoker;
}
Copy the code

The core is to create a DubboInvoker and connect to the remote server to wrap the connection into the DubboInvoker, so what does the behavior of initiating the connection look like, moving on to the last few steps

private ExchangeClient[] getClients(URL url) {
    // Whether one connection corresponds to one service
    boolean service_share_connect = false;
    // Get the connection share configuration in the URL. The default value is 0
    int connections = url.getParameter(Constants.CONNECTIONS_KEY, 0);
    // If there is no configuration, the class is shared and the number of connections is 1
    if (connections == 0) {
        service_share_connect = true;
        connections = 1;
    }

    // Create an ExchangeClient array
    ExchangeClient[] clients = new ExchangeClient[connections];
    for (int i = 0; i < clients.length; i++) {
        if (service_share_connect) {
            // If shared, the shared client object is obtained
            clients[i] = getSharedClient(url);
        } else {
            // Create a new connectionclients[i] = initClient(url); }}return clients;
}
Copy the code

As you can see, depending on the CONFIGURATION of the URL, it’s going to fetch the connection that’s shared, if so, it’s going to fetch the connection that’s in the cache, and if it’s not shared, it’s going to create a new connection, so let’s just look at creating a new connection

private ExchangeClient initClient(URL url) {

    // client type setting.
    String str = url.getParameter(Constants.CLIENT_KEY, url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_CLIENT));

    url = url.addParameter(Constants.CODEC_KEY, DubboCodec.NAME);
    // enable heartbeat by default
    url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));

    // BIO is not allowed since it has severe performance issue.
    if(str ! =null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) {
        throw new RpcException("Unsupported client type: " + str + "," +
                " supported client type is " + StringUtils.join(ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions(), ""));
    }

    ExchangeClient client;
    try {
        // Whether to delay the connection. The default value is false
        if (url.getParameter(Constants.LAZY_CONNECT_KEY, false)) {
            client = new LazyConnectExchangeClient(url, requestHandler);
        } else {
            // Initiate the connection directlyclient = Exchangers.connect(url, requestHandler); }}catch (RemotingException e) {
        throw new RpcException("Fail to create remoting client for service(" + url + ")." + e.getMessage(), e);
    }
    return client;
}
Copy the code

Finally, this is the final step, where the connection is actually initiated, and whether or not the delay is supported.

Continuation flow chart

After generating the specific Invoker, the next step is to expose the Invoker to external calls, as we can see

Finally, this process is implemented through the proxy mechanism. This part of the logic was mentioned in the previous article, so you can have a look at it

There are still some things in the service exposure process, and you need to master Dubbo SPI, otherwise some points such as adaptive are difficult to understand, and I spent a lot of time on this article.

At the end I’m going to give you a complete flow chart to walk you through again, there’s still a lot of detail, but it’s not the main body and I’m not going to analyze it, otherwise it will fall apart.

The following service governance, APO and SPI mechanisms will also be expanded on the flowchart. Interested parties can also follow the flowchart link:

Flowchart link

conclusion

If you’re looking for an interviewer to ask you a question, remember the flow chart above. When you’re done with Dubbo, you’ll find there’s a lot to write about. Follow up

  • How does a service consumer invoke a service provider remotely

  • SPI

  • AOP mechanism in Dubbo

  • Service governance

  • .

    Wait for several modules, finally is to bring you a RPC framework, or that sentence, want to learn Dubbo can continue to pay attention to this series.