Brief introduction to Dubbo registration discovery call process

preface

The author has been using Dubbo and Nacos as distributed governance frameworks since contacting with distributed governance. However, I have never studied how Dubbo registers and exposes services before. I wanted to write an article about Dubbo for a long time, but I did not have time to read Dubbo source code. So here to share their own views, if you have different views welcome to discuss together

The environment

  1. Pull code from Github open source site
  2. This article uses the 2.6.x branch (3.0 is too new to see, but the overall protocol is based on the 2.6-7 version, see dubbo website for details)
  3. openjdk8

The composition of dubbo

Dubbo’s overall design

Description:

  • Blue is the interface used by the service consumer, green is the interface used by the service provider, and the central axis is the interface or implementation class used by both.
  • The overall design of Dubbo can be divided into 10 layers, each layer is unidirectional dependence, and each time the upper layer can be stripped and reused, among which the Sevice layer and Config layer are API layer, and the other layers are SPI. So it’s very extensible
  • In the figure, the green blocks are extension interfaces and the blue blocks are implementation classes. The figure only shows the implementation classes used to associate each layer.
  • In the figure, the dotted blue line is the initialization process, that is, the assembly chain at startup, the solid red line is the method call process, that is, the run-time call chain, the purple triangle arrow is the inheritance, you can regard the subclass as the same node of the parent class, and the text on the line is the method called.

Each layer specification

  • Config configuration layer: external configuration interface, centering on ServiceConfig and ReferenceConfig, can directly initialize configuration classes, or generate configuration classes through Spring configuration parsing
  • Proxy ServiceProxy layer: transparent proxy of service interfaces. The client Stub and server Skeleton of the service are generated. The extension interface is ProxyFactory
  • Registry layer: encapsulates the registration and discovery of service addresses, centering on service URLS and extending interfaces as RegistryFactory, Registry, and RegistryService
  • Cluster routing layer: encapsulates routing and load balancing of multiple providers, Bridges registries, centers on Invoker, and extends interfaces to Cluster, Directory, Router, and LoadBalance
  • Monitor monitoring layer: Monitors the number and time of RPC calls. It centers on Statistics and extends interfaces to MonitorFactory, Monitor, and MonitorService
  • Protocol Remote Invocation layer: Encapsulates RPC Invocation with Protocol, Invoker, and half interface, based on Invocation and Result
  • Exchange information exchange layer: It encapsulates the Request and Response mode, turns synchronous to asynchronous, uses Request and Response as the center, and uses exchange channel, ExchangeClient and ExchangeServer as the expansion interface
  • Transport Network transport layer: Abstract MINA and Netty as the unified interface, Message as the center, extended interfaces are Channel, Transporter, Client, Server, Codec
  • Serialize data Serialization layer: reusable tools with Serialization, ObjectInput, ObjectOutput, and ThreadPool extensions

Module subcontracting Instructions

  • Dubo-common logic module: includes Util classes and common models.
  • Dubbo -remoting: An implementation of the Dubbo protocol. If RPC uses RMI, this package is not required. This module contains the Transport layer and exchange layer in layer 10 as the base modules for RPC calls.
  • Dubbo-rpc remote call module: Abstracts various protocols, as well as dynamic proxies, containing only one-to-one calls, with no concern for cluster management. This module contains the Protocol and Proxy layers in layer 10. Interface exposure and proxy generation are in this layer.
  • Dubbo-cluster cluster module: multiple service providers can be disguised as one provider, including load balancing, fault tolerance, routing, etc. The address list of the cluster can be statically configured or delivered by the registry. This module is a Cluster layer of 10 layers
  • Dubbo-registry registry module: clustering based on registries’ distribution addresses and abstractions to various registries.
  • Dubo-monitor monitoring module: statistics service call times, call time, call chain tracking service. This layer is the Monitor layer of the 10 layers
  • Dubo-config configuration module: it is an EXTERNAL API of Dubbo. Users use Dubbo through config to hide all details of Dubbo. This layer includes the Config layer of layer 10
  • Dubbo-container: Is a Standlone container that starts with a simple Main load of Spring. There is no need to use a Web container to load services because services usually do not require Web container features such as Tomcat/JBoss.
  • Dubbo-serialization: Serialization of RPC calls (currently there are 5 fastJSON, FST, Hessian2, JDK, kryo) the default is Hessian2. This module is the serialize layer in layer 10

At this point, all Dubbo source modules and design layers are matched. From this information you can actually see that Dubbo was very clear about the module layering when he designed it.

Project startup initialization process details (producer and consumer)

Analytical services

Based on the Meta-INF/Spring. handlers configuration in Dubo.jar, Spring calls back to the DubboNamespaceHandler when it encounters the Dubbo namespace. Label all dubbo, unified use DubboBeanDefinitionParser parsing, based on one-on-one attribute mapping, XML parsing for Bean object tags. When serviceconfig.export () or referenceconfig.get () is initialized, the Bean object is converted to URL format and all Bean properties are converted to URL parameters. Then, the URL is sent to the protocol extension point. Based on the extension point adaptive mechanism, services of different protocols are exposed or referenced according to the protocol header of the URL.

Exposed services

  1. 1.ServiceConfig parses the URL in the following format: dubbo://service-host/ com.foo.fooservice? Version = 1.0.0. Based on the extension point adaptive mechanism, through the URL dubbo:// protocol header recognition, directly call the export() method of DubboProtocol, open the service port.

  2. Providers address registered in the registry, need 2, under the condition of ServiceConfig parsing the URL format for: registry: / / registry – host/org. Apache. Dubbo. Registry. RegistryService? export=URL.encode(“dubbo://service-host/com.foo.FooService? Version =1.0.0”), based on the extension point adaptive mechanism, through the REGISTRY :// protocol header recognition of the URL, the export() method of the RegistryProtocol is called to register the provider URL in the export parameter with the registry first. Dubbo ://service-host/ com.foo.fooservice? Version =1.0.0, then the export() method of the DubboProtocol is invoked to open the service port through the dubbo:// protocol header recognition of the provider URL based on the extension point adaptive mechanism.

Reference service

  1. In the case of no registry and direct connection to the provider 3, the URL resolved by ReferenceConfig is in the following format: dubbo://service-host/ com.foo.fooservice? Version = 1.0.0. Based on the extension point adaptive mechanism, the refer() method of DubboProtocol is directly called through the DUbbo :// protocol header recognition of the URL to return the provider reference.

  2. Discover reference services from the registry:

In the case that there is a registry and the provider address is found through the registry, 4, the FORMAT of URL resolved by ReferenceConfig is: registry://registry-host/org.apache.dubbo.registry.RegistryService? refer=URL.encode(“consumer://consumer-host/com.foo.FooService? Version = “1.0.0). Based on the extension point adaptive mechanism, the refer() method of RegistryProtocol is called through the registry:// protocol header of the URL. Based on the conditions in the refer parameter, the provider URL is queried, for example: dubbo://service-host/com.foo.FooService? Version = 1.0.0. Based on the extension point adaptive mechanism, the refer() method of DubboProtocol is called to get the provider reference through the dubbo:// protocol header recognition of the provider URL. RegistryProtocol then returns multiple provider references masquerading as a single provider reference through the Cluster extension point.

Summary of project startup initialization details

Regardless of producer or consumer, registry or not, the Protocol interface is indispensable. The DubboProtocol implementation class is used to expose and reference the service when there is no registry to remove and reuse. When there is a registry, Dubbo’s adaptive mechanism is changed to RegistryProtocol as the implementation class for exposing and referencing the service. So Protocol is accused of exposing and referencing services, and its implementation depends on the context in which Dubbo is used.

As shown in figure:

    Protocol.java
    /** * The exposed interface for remote invocation, where Invoker is converted to Exporter * 1. After receiving a request, the protocol records the source address of the request rpcContext.getContext ().setremoteAddress (); * 2. Export () must be idempotent, i.e. there is no difference between invoking the same URL once and invoking it twice@param<T> Service type Interface type *@paramInvoker Service Invoker interface type converts to URL and then converts to Invoker */
    @Adaptive
    <T> Exporter<T> export(Invoker<T> invoker) throws RpcException;

    1. When a user invokes the invoke() method of the 'Invoker' object returned from the 'refer()' call, the protocol needs to correspond to the 'invoke()' method of the 'Invoker' object. The responsibility of the protocol is to implement the 'Invoker' returned from 'refer()'. In general, the protocol sends remote requests in the 'Invoker' implementation. * 3. When check=false is set in the URL, the implementation must not throw an exception, but try to recover if the connection fails. * /
    @Adaptive
    <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException;
Copy the code

The process by which a producer exposes a service

First, the ServiceConfig class gets the actual class ref that provides the service. HelloWorldImpl), and then generate an instance of AbstractProxyInvoker using ref in the getInvoker method of the ProxyFactory class. Next comes the transition from Invoker to Exporter. The key to Dubbo’s handling of service exposure is the conversion of Invoker to Exporter, shown in red in the image above. The following are two typical implementations of Dubbo and RMI: Dubbo implementation of the Dubbo protocol Invoker turns to my friend in the Export method of the DubboProtocol class, which mainly opens the socket listening service and receives various requests from the client. The communication details are implemented by Dubbo himself.

The process by which a consumer references a service

First, the init method of the ReferenceConfig class calls the refer method of Protocol to generate the Invoker instance (shown in red), which is the key to service consumption. Next, convert Invoker to the interface required by the client (for example, HelloWorld). The details of each protocol such as RMI/Dubbo/Web Service that calls the refer method to generate an Invoker instance are similar to those of the producer but the steps are different.

@SPI("javassist")
public interface ProxyFactory {

    /** * Create proxy **@param invoker
     * @return proxy
     */
    @Adaptive({Constants.PROXY_KEY})
    <T> T getProxy(Invoker<T> invoker) throws RpcException;

    /** * Create proxy **@param invoker
     * @return proxy
     */
    @Adaptive({Constants.PROXY_KEY})
    <T> T getProxy(Invoker<T> invoker, boolean generic) throws RpcException;

    /** * Convert the proxy to Invoker **@param <T>
     * @param proxy
     * @param type
     * @param url
     * @return invoker
     */
    @Adaptive({Constants.PROXY_KEY})
    <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) throws RpcException;
}
Copy the code

From figure 1 shows this interface is used by default JavassistProxyFactory ProxyFactory. Java implementation class, and the interface is can use to producers and consumers, but also can know ProxyFactory: : getProxy method is applicable to the consumer, ProxyFactory::getInvoker applies to producers. It says that both producers and consumers use Invoker, but this getInvoker method is only used by producers. The following chart clearly shows why both producers and consumers have invokers.

Server source code analysis

Project initialization phase

The Dubbo service export process starts when the Spring container publishes a refresh event, and Dubbo immediately executes the service export logic upon receiving the event. The entry method for the service export is onApplicationEvent of ServiceBean. OnApplicationEvent is an event-response method that performs a service export upon receiving a Spring context refresh event. The method code is as follows: Source branch 2.6.x is the onApplicationEvent entry, And 2.7.8 exposure fusion to Springboot OneTimeExecutionApplicationContextEventListener. Java DubboBootstrap after listening. Start method, of course, the use is branch of Java 2.6. x

public class ServiceBean<T> extends ServiceConfig<T> implements InitializingBean.DisposableBean.ApplicationContextAware.ApplicationListener<ContextRefreshedEvent>, BeanNameAware.ApplicationEventPublisherAware {

    private transient volatile boolean exported;

    private transient volatile boolean unexported;

    @Override
    public void onApplicationEvent(ContextRefreshedEvent event) {
        // Whether the export is delayed (delayed false) && Whether the export is already performed && Whether the export is cancelled
        if(isDelay() && ! isExported() && ! isUnexported()) {if (logger.isInfoEnabled()) {
                logger.info("The service ready on spring started. service: "+ getInterface()); } export(); }}private boolean isDelay(a) {
        Integer delay = getDelay();
        ProviderConfig provider = getProvider();
        if (delay == null&& provider ! =null) {
            delay = provider.getDelay();
        }
        return supportedApplicationListener && (delay == null || delay == -1);
    }

    public synchronized void export(a) {
        // The current class inherits ServiceConfig. Java so it looks at the current
        if(provider ! =null) {
            if (export == null) {
                export = provider.getExport();
            }
            if (delay == null) { delay = provider.getDelay(); }}if(export ! =null && !export) {
            return;
        }

        if(delay ! =null && delay > 0) {
            delayExportExecutor.schedule(new Runnable() {
                @Override
                public void run(a) {
                    doExport();
                }
            }, delay, TimeUnit.MILLISECONDS);
        } else{ doExport(); }}/** * the actual output operation */
    protected synchronized void doExport(a) {
        if (unexported) {
            throw new IllegalStateException("Already unexported!");
        }
        if (exported) {
            return;
        }
        exported = true;
        // Check whether interfaceName is valid
        if (interfaceName == null || interfaceName.length() == 0) {
            throw new IllegalStateException("<dubbo:service interface=\"\" /> interface not allow null!");
        }
        // Check whether the provider is empty. If it is empty, create a new one and initialize it using system variables
        checkDefault();
        if(provider ! =null) {... }if (module! =null) {... }if(application ! =null) {... }// Check if ref is a generalized service type
        if (ref instanceof GenericService) {
            interfaceClass = GenericService.class;
            if(StringUtils.isEmpty(generic)) { generic = Boolean.TRUE.toString(); }}// ref non-genericService type
        else {
            try {
                interfaceClass = Class.forName(interfaceName, true, Thread.currentThread()
                        .getContextClassLoader());
            } catch (ClassNotFoundException e) {
                throw new IllegalStateException(e.getMessage(), e);
            }
            // Check the necessary fields in the interfaceClass and 
      
        tags
      
            checkInterfaceAndMethods(interfaceClass, methods);
            // Check the validity of ref
            checkRef();
            generic = Boolean.FALSE.toString();
        }
        // Local and stub functions should be the same, used to configure local stubs
        if(local ! =null) {
            if ("true".equals(local)) {
                local = interfaceName + "Local"; } Class<? > localClass;try {
                // Get the local stub class
                localClass = ClassHelper.forNameWithThreadContextClassLoader(local);
            } catch (ClassNotFoundException e) {
                throw new IllegalStateException(e.getMessage(), e);
            }
            // Check whether the local stub class can be assigned to the interface class. If it cannot be assigned, an exception will be thrown to remind the user that the local stub class type is invalid
            if(! interfaceClass.isAssignableFrom(localClass)) {throw new IllegalStateException("The local implementation class " + localClass.getName() + " not implement interface "+ interfaceName); }}if(stub ! =null) {... }// Check whether various objects are empty. If empty, create a new object or throw an exception
        checkApplication();
        checkRegistry();
        checkProtocol();
        appendProperties(this);
        checkStub(interfaceClass);
        checkMock(interfaceClass);
        if (path == null || path.length() == 0) {
            path = interfaceName;
        }
        // Export the service
        doExportUrls();
        CodecSupport.addProviderSupportedSerialization(getUniqueServiceName(), getExportedUrls());
        // ProviderModel represents the service ProviderModel, which stores information about the service provider in this object.
        // Service configuration information, service instances, etc. Each exported service corresponds to a ProviderModel.
        // ApplicationModel holds all providerModels.
        ProviderModel providerModel = new ProviderModel(getUniqueServiceName(), this, ref);
        ApplicationModel.initProviderModel(getUniqueServiceName(), providerModel);
    }

    /** * multi-protocol multi-registry export service * converts the current object to url.java */
    private void doExportUrls(a) {
        // Load the registry link
        List<URL> registryURLs = loadRegistries(true);
        // Traverse protocols and export services under each protocol
        for (ProtocolConfig protocolConfig : protocols) {
            Assembly / / URLdoExportUrlsFor1Protocol(protocolConfig, registryURLs); }}/** * Convert the URL to Invoker pseudocode */
    private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {...//methods is a MethodConfig collection, which stores configuration information for the 
      
        tag
      
        if(methods ! =null && !methods.isEmpty()) {
            for (MethodConfig method : methods) {
                // Perform all operations for method. }// end of methods for
        }

        // Similar to generics judgment
        if (ProtocolUtils.isGeneric(generic)) {
            ...
        } else{... }if(! ConfigUtils.isEmpty(token)) {if (ConfigUtils.isDefault(token)) {
                map.put(Constants.TOKEN_KEY, UUID.randomUUID().toString());
            } else{ map.put(Constants.TOKEN_KEY, token); }}if (Constants.LOCAL_PROTOCOL.equals(protocolConfig.getName())) {
            protocolConfig.setRegister(false);
            map.put("notify"."false");
        }
        // Expose services that are urls converted into invokers. String scope = url.getParameter(Constants.SCOPE_KEY);// if scope = none, do nothing
        if(! Constants.SCOPE_NONE.toString().equalsIgnoreCase(scope)) {// scope ! = remote, exported to the local PC
            if(! Constants.SCOPE_REMOTE.toString().equalsIgnoreCase(scope)) { exportLocal(url); }// scope ! = local, export to remote
            if(! Constants.SCOPE_LOCAL.toString().equalsIgnoreCase(scope)) {if (logger.isInfoEnabled()) {
                    logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
                }
                // There is a registry
                if(registryURLs ! =null && !registryURLs.isEmpty()) {
                    for (URL registryURL : registryURLs) {
                        ...
                        // Generate Invoker for the service provider class (ref)
                        // The producer's ProxyFactory gets InvokerInvoker<? > invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString())); DelegateProviderMetaDataInvoker wrapperInvoker =new DelegateProviderMetaDataInvoker(invoker, this);
                        // The Invoker is exposed by default DubboProtocol
                        Exporter<?> exporter = protocol.export(wrapperInvoker);
                        exporters.add(exporter);
                    }
                } 
                // There is no registry, only export services
                else{ Invoker<? > invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, url); DelegateProviderMetaDataInvoker wrapperInvoker =new DelegateProviderMetaDataInvoker(invoker, this); Exporter<? > exporter = protocol.export(wrapperInvoker); exporters.add(exporter); }}}this.urls.add(url);
    }


    /** * DubboProtocol. Java export method */
    public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
        URL url = invoker.getUrl();

        // export service.
        String key = serviceKey(url);
        DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
        exporterMap.put(key, exporter);

        //export an stub service for dispatching event
        Boolean isStubSupportEvent = url.getParameter(Constants.STUB_EVENT_KEY, Constants.DEFAULT_STUB_EVENT);
        Boolean isCallbackservice = url.getParameter(Constants.IS_CALLBACK_SERVICE, false);
        if(isStubSupportEvent && ! isCallbackservice) { String stubServiceMethods = url.getParameter(Constants.STUB_EVENT_METHODS_KEY);if (stubServiceMethods == null || stubServiceMethods.length() == 0) {
                if (logger.isWarnEnabled()) {
                    logger.warn(new IllegalStateException("consumer [" + url.getParameter(Constants.INTERFACE_KEY) +
                            "], has set stubproxy support event ,but no stub methods founded.")); }}else {
                stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods);
            }
        }

        openServer(url);
        optimizeSerialization(url);
        returnexporter; }}Copy the code
  • This is actually the interface that Spring scanned from the project startup to load the urls in ServiceConfig that translate the interface into the URL. When the URL is loaded, the ref(actual implementation class) is converted into Invoker, and the interface is exposed through DubboProtocol after converting into Invoker. At this point the whole start-up process of the producer has gone through.

Consumer source code parsing

Consumers and producers are essentially the same, except in a different order. Producers get Invoker through ref and then expose it through Protocol, while consumers

    public synchronized T get(a) {
        if (destroyed) {
            throw new IllegalStateException("The invoker of ReferenceConfig(" + url + ") has already destroyed!");
        }
        // Check if ref is empty and init if it is
        if (ref == null) {
            The init method is mainly used to handle configuration and createProxy classes by calling createProxy
            init();
        }
        return ref;
    }

    private void init(a) {
        // Avoid repeated initialization
        if (initialized) {
            return;
        }
        
        / /... I'm going to skip a lot of code and make a quick summary
        // 1. It is used to check whether the ConsumerConfig instance exists
        // 2. This logic is used to load the configuration corresponding to the interface name from system properties or configuration files and assign the result of parsing to the URL field. The URL field is typically used for point-to-point calls.
        String resolve = System.getProperty(interfaceName);
        // 3. Load the configuration corresponding to the interface name from system properties or configuration files and assign the result of parsing to the URL field. The URL field is typically used for point-to-point calls.
        // 4. Check whether several core configuration classes are empty. If they are empty, try to obtain them from other configuration classes.
        // 5. Collects various configurations and stores them in the Map.
        // 6. Used to handle MethodConfig instances. This instance contains event notification configurations such as onReturn, onThrow, onInvoke, and so on.

        // Get the service consumer IP address
        String hostToRegistry = ConfigUtils.getSystemProperty(Constants.DUBBO_IP_TO_REGISTRY);
        if (hostToRegistry == null || hostToRegistry.length() == 0) {
            hostToRegistry = NetUtils.getLocalHost();
        } else if (isInvalidLocalHost(hostToRegistry)) {
            throw new IllegalArgumentException("Specified invalid registry ip from property:" + Constants.DUBBO_IP_TO_REGISTRY + ", value:" + hostToRegistry);
        }
        map.put(Constants.REGISTER_IP_KEY, hostToRegistry);

        //attributes are stored by system context.
        StaticContext.getSystemContext().putAll(attributes);
        // Create proxy
        ref = createProxy(map);
        ConsumerModel consumerModel = new ConsumerModel(getUniqueServiceName(), this, ref, interfaceClass.getMethods());
        ApplicationModel.initConsumerModel(getUniqueServiceName(), consumerModel);
    }

    private T createProxy(Map<String, String> map) {
        URL tmpUrl = new URL("temp"."localhost".0, map);
        final boolean isJvmRefer;
        // This code determines whether the current class is local, which is called a local call
        if (isInjvm() == null) {
            // if the url configuration is specified, no local reference is made
            if(url ! =null && url.length() > 0) { // if a url is specified, don't do local reference
                isJvmRefer = false;
            } 
            // Check whether local references are required according to the protocol, scope, and injVM parameters of the URL
            For example, if scope=local is explicitly specified, isInjvmRefer returns true
            else if (InjvmProtocol.getInjvmProtocol().isInjvmRefer(tmpUrl)) {
                // by default, reference local service if there is
                isJvmRefer = true;
            } else {
                isJvmRefer = false; }}else {
            // Get injVM configuration value
            isJvmRefer = isInjvm().booleanValue();
        }
        // Local reference
        if (isJvmRefer) {
            // Generate a local reference URL with the protocol injvm
            URL url = new URL(Constants.LOCAL_PROTOCOL, NetUtils.LOCALHOST, 0, interfaceClass.getName()).addParameters(map);
            // Call the refer method to build InjvmInvoker instance
            // RefProtocol is DubboProtocol for local references
            invoker = refprotocol.refer(interfaceClass, url);
            if (logger.isInfoEnabled()) {
                logger.info("Using injvm service "+ interfaceClass.getName()); }}// Remote call
        else {
            // If the url is not empty, the consumer wants to call point-to-point. This URL can be set in DubboReference's URL property
            if(url ! =null && url.length() > 0) { // user specified URL, could be peer-to-peer address, or register center's address.
                String[] us = Constants.SEMICOLON_SPLIT_PATTERN.split(url);
                if(us ! =null && us.length > 0) {
                    for (String u : us) {
                        URL url = URL.valueOf(u);
                        if (url.getPath() == null || url.getPath().length() == 0) {
                            url = url.setPath(interfaceName);
                        }
                        // Check whether the URL protocol is Registry, if the user wants to use the specified registry
                        if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
                            urls.add(url.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));
                        } else{ urls.add(ClusterUtils.mergeUrl(url, map)); }}}}// An empty URL indicates that the user is connected to a registry
            else { // assemble URL from register center's configuration
                // Load the registry URL
                List<URL> us = loadRegistries(false);
                if(us ! =null && !us.isEmpty()) {
                    for (URL u : us) {
                        URL monitorUrl = loadMonitor(u);
                        if(monitorUrl ! =null) {
                            map.put(Constants.MONITOR_KEY, URL.encode(monitorUrl.toFullString()));
                        }
                        // Add the refer parameter to the URL and add the URL to the urlsurls.add(u.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map))); }}// No registry is configured, an exception is thrown
                if (urls.isEmpty()) {
                    throw new IllegalStateException("No such any registry to reference " + interfaceName + " on the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + ", please config 
       to your spring config."); }}// There is only one registry
            if (urls.size() == 1) {
                invoker = refprotocol.refer(interfaceClass, urls.get(0));
            } else {
                // Multiple registriesList<Invoker<? >> invokers =newArrayList<Invoker<? > > (); URL registryURL =null;
                for (URL url : urls) {
                    invokers.add(refprotocol.refer(interfaceClass, url));
                    if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
                        registryURL = url; // use last registry url}}// The invokers will be merged into one through the cluster layer, depending on the strategy set by the consumer
                if(registryURL ! =null) { // registry url is available
                    // use AvailableCluster only when register's cluster is available
                    URL u = registryURL.addParameterIfAbsent(Constants.CLUSTER_KEY, AvailableCluster.NAME);
                    invoker = cluster.join(new StaticDirectory(u, invokers));
                } else { // not a registry url
                    invoker = cluster.join(new StaticDirectory(invokers));
                }
            }
        }

        Boolean c = check;
        if (c == null&& consumer ! =null) {
            c = consumer.isCheck();
        }
        if (c == null) {
            c = true; // default true
        }
        Dubo.consumer.check =false
        if(c && ! invoker.isAvailable()) { ...// create service proxy
        // Create the proxy
        return (T) proxyFactory.getProxy(invoker);
    }
Copy the code

The code flow above is basically from checking configuration classes and various system information at initialization to Map, and then getting consumer information, Refer and ProxyFactory.getProxy. Here’s what the refer and getProxy do.

Consumers create Invoker

  • In the consumer case Invoker is created from Protocol, the refer method, which in turn includes DubboProtocol and RegistryProtocol, meaning Invoker is created with or without a registry.

DubboProtocol

public class DubboProtocol extends AbstractProtocol {
    @Override
    public <T> Invoker<T> refer(Class<T> serviceType, URL url) throws RpcException {
        optimizeSerialization(url);
        // create rpc invoker.
        DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
        invokers.add(invoker);
        return invoker;
    }

    / * * * getClients. This method is used to get the client instance */
    private ExchangeClient[] getClients(URL url) {
        // whether to share connection
        boolean service_share_connect = false;
        int connections = url.getParameter(Constants.CONNECTIONS_KEY, 0);
        // if not configured, connection is shared, otherwise, one connection for one service
        if (connections == 0) {
            service_share_connect = true;
            connections = 1;
        }

        ExchangeClient[] clients = new ExchangeClient[connections];
        for (int i = 0; i < clients.length; i++) {
            if (service_share_connect) {
                clients[i] = getSharedClient(url);
            } else{ clients[i] = initClient(url); }}returnclients; }}Copy the code

RegistryProtocol

public class RegistryProtocol implements Protocol {
    
    @Override
    @SuppressWarnings("unchecked")
    public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
        // Take the registry parameter value and set it to the protocol header
        url = url.setProtocol(url.getParameter(Constants.REGISTRY_KEY, Constants.DEFAULT_REGISTRY)).removeParameter(Constants.REGISTRY_KEY);
        // Get the registry instance
        Registry registry = registryFactory.getRegistry(url);
        if (RegistryService.class.equals(type)) {
            return proxyFactory.getInvoker((T) registry, type, url);
        }

        // The URL is converted to Map
        Map<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded(Constants.REFER_KEY));
        // group= @dubboReference (group=)
        String group = qs.get(Constants.GROUP_KEY);
        if(group ! =null && group.length() > 0) {
            // To merge two groups, go to the cluster routing layer and let this layer choose which group to distribute
            if ((Constants.COMMA_SPLIT_PATTERN.split(group)).length > 1
                    || "*".equals(group)) {
                returndoRefer(getMergeableCluster(), registry, type, url); }}return doRefer(cluster, registry, type, url);
    }

    private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
        // Create an instance
        RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
        // Set up the registry instance
        directory.setRegistry(registry);
        directory.setProtocol(protocol);
        // all attributes of REFER_KEY
        Map<String, String> parameters = new HashMap<String, String>(directory.getUrl().getParameters());
        // Generate service consumer links
        URL subscribeUrl = new URL(Constants.CONSUMER_PROTOCOL, parameters.remove(Constants.REGISTER_IP_KEY), 0, type.getName(), parameters);
        // Register service consumers, new node in the consumers directory
        if(! Constants.ANY_VALUE.equals(url.getServiceInterface()) && url.getParameter(Constants.REGISTER_KEY,true)) {
            URL registeredConsumerUrl = getRegisteredConsumerUrl(subscribeUrl, url);
            registry.register(registeredConsumerUrl);
            directory.setRegisteredConsumerUrl(registeredConsumerUrl);
        }
        // The current consumer sends a subscribed message to the registry, which notifies the concerned producer dynamically to the consumer
        directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY,
                Constants.PROVIDERS_CATEGORY
                        + "," + Constants.CONFIGURATORS_CATEGORY
                        + "," + Constants.ROUTERS_CATEGORY));

        // A registry can have multiple service providers, so you need to consolidate multiple service providers into one
        Invoker invoker = cluster.join(directory);
        ProviderConsumerRegTable.registerConsumer(invoker, url, subscribeUrl, directory);
        returninvoker; }}Copy the code

Producer stop

  • You’ve certainly seen a lot of proxy destruction when a producer goes offline, but none of it is an actual implementation class
/**
     * Get proxy.
     *
     * @param cl  class loader.
     * @param ics interface class array.
     * @return Proxy instance.
     */
    public static Proxy getProxy(ClassLoader cl, Class
       ... ics) {
        if (ics.length > 65535)
            throw new IllegalArgumentException("interface limit exceeded");

        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < ics.length; i++) {
            String itf = ics[i].getName();
            if(! ics[i].isInterface())throw new RuntimeException(itf + " is not a interface."); Class<? > tmp =null;
            try {
                tmp = Class.forName(itf, false, cl);
            } catch (ClassNotFoundException e) {
            }

            if(tmp ! = ics[i])throw new IllegalArgumentException(ics[i] + " is not visible from class loader");

            sb.append(itf).append('; ');
        }

        // use interface class name list as key.
        String key = sb.toString();

        // get cache by class loader.
        // Get the Map for the class load. If not, create a new class loader
        // ProxyCacheMap uses WeakHashMap. The entries in this class are weak references and there is not enough memory for dubbo to burst the memory
        Map<String, Object> cache;
        synchronized (ProxyCacheMap) {
            cache = ProxyCacheMap.get(cl);
            if (cache == null) {
                cache = new HashMap<String, Object>();
                ProxyCacheMap.put(cl, cache);
            }
        }

        Proxy proxy = null;
        synchronized (cache) {
            do{... Get it from cache, forget about it}}long id = PROXY_CLASS_COUNTER.getAndIncrement();
        String pkg = null;
        ClassGenerator ccp = null, ccm = null;
        try {
            ccp = ClassGenerator.newInstance(cl);

            Set<String> worked = new HashSet<String>();
            List<Method> methods = new ArrayList<Method>();

            for (int i = 0; i < ics.length; i++) {
                if(! Modifier.isPublic(ics[i].getModifiers())) { String npkg = ics[i].getPackage().getName();if (pkg == null) {
                        pkg = npkg;
                    } else {
                        if(! pkg.equals(npkg))throw new IllegalArgumentException("non-public interfaces from different packages");
                    }
                }
                ccp.addInterface(ics[i]);

                for (Method method : ics[i].getMethods()) {
                    String desc = ReflectUtils.getDesc(method);
                    if (worked.contains(desc))
                        continue;
                    worked.add(desc);

                    intix = methods.size(); Class<? > rt = method.getReturnType(); Class<? >[] pts = method.getParameterTypes(); StringBuilder code =new StringBuilder("Object[] args = new Object[").append(pts.length).append("];");
                    for (int j = 0; j < pts.length; j++)
                        code.append(" args[").append(j).append("] = ($w)$").append(j + 1).append(";");
                    code.append(" Object ret = handler.invoke(this, methods[" + ix + "], args);");
                    if(! Void.TYPE.equals(rt)) code.append(" return ").append(asArgument(rt, "ret")).append(";"); methods.add(method); ccp.addMethod(method.getName(), method.getModifiers(), rt, pts, method.getExceptionTypes(), code.toString()); }}if (pkg == null)
                pkg = PACKAGE_NAME;

            // create ProxyInstance class.
            String pcn = pkg + ".proxy" + id;
            ccp.setClassName(pcn);
            ccp.addField("public static java.lang.reflect.Method[] methods;");
            ccp.addField("private " + InvocationHandler.class.getName() + " handler;");
            ccp.addConstructor(Modifier.PUBLIC, newClass<? >[]{InvocationHandler.class},newClass<? > [0]."handler=$1;"); ccp.addDefaultConstructor(); Class<? > clazz = ccp.toClass(); clazz.getField("methods").set(null, methods.toArray(new Method[0]));

            // create Proxy class.
            String fcn = Proxy.class.getName() + id;
            ccm = ClassGenerator.newInstance(cl);
            ccm.setClassName(fcn);
            ccm.addDefaultConstructor();
            ccm.setSuperClass(Proxy.class);
            ccm.addMethod("public Object newInstance(" + InvocationHandler.class.getName() + " h){ return new " + pcn + "($1); }"); Class<? > pc = ccm.toClass(); proxy = (Proxy) pc.newInstance(); }catch (RuntimeException e) {
            throw e;
        } catch (Exception e) {
            throw new RuntimeException(e.getMessage(), e);
        } finally {
            // release ClassGenerator
            if(ccp ! =null)
                ccp.release();
            if(ccm ! =null)
                ccm.release();
            synchronized (cache) {
                if (proxy == null)
                    cache.remove(key);
                else
                    cache.put(key, newWeakReference<Proxy>(proxy)); cache.notifyAll(); }}return proxy;
    }
Copy the code
  • CCP and CCM are easy to confuse. CCP produces proxy classes for implementing classes, while CCM produces proxy classes.

Problems encountered

I didn’t understand why every time a producer restarts, I will report that there are no available producers.

  • In the following figure, when the consumer starts up and the registry notifies the consumer that the producer is available, go back and refresh Invoker. In particular, the class ExchangeClient mentioned above is refreshed. You can see the properties of this class on Invoker. When a consumer calls a producer, it checks whether the client has a producer and raises an RPC exception if it does not. When the consumer is notified, he goes back to create the client, but when the registry notifies the consumer, the process depends on the registry’s load, and it will be notified very quickly when the registry is idle or the number of connections is low.

  • The following image shows how the registry notifies consumers immediately when a producer is out of service. In my case, the local test was instantaneous, but I’m not sure, so I might want to see how Nacos handles producer offline. When the notification goes offline, the client in the Invoker will be emptied and the call will naturally be RPC exception. So while scrolling, you can wait one minute before deactivating another node.

summary

Use Dubbo from now to watch his source, found a lot details before you don’t understand or don’t understand, slowly he completed a process look like this design very much, but all calls are so ugly and easy to expose information into a url form, but the Dubbo usually belong to the internal call, will not be the network to see their is no so-called. If you need more details about the call, you can look into it later and fill in the blanks.