// This method is triggered when the Spring container is refreshed
@Override
public void onApplicationEvent(ContextRefreshedEvent event) {
    // If the current service is not exposed and has not been unexposed, the service is exposed
    if(! isExported() && ! isUnexported()) {if (logger.isInfoEnabled()) {
            logger.info("The service ready on spring started. service: " + getInterface());
        }
        // Service exposureexport(); }}public void export(a) {
    // Service exposure
    super.export();
    // Publish ServiceBeanExportedEvent
    publishExportEvent();
}
Copy the code

ServiceConfig.java

public synchronized void export(a) {
    // Check all registry availability and further refine the 
       configuration
    checkAndUpdateSubConfigs();

    // If export is set to false, no service exposure is performed
    if(! shouldExport()) {return;
    }
    // If the delay attribute is set, delay exposure is performed
    if (shouldDelay()) {
        // Use thread pools (bottom layer implemented by deferred blocking queues). Timed delay exposure service
        DELAY_EXPORT_EXECUTOR.schedule(this::doExport, getDelay(), TimeUnit.MILLISECONDS);
    } else{ doExport(); }}// Execute service exposure business
protected synchronized void doExport(a) {
    // If cancelled exposure is detected, throw error
    if (unexported) {
        throw new IllegalStateException("The service " + interfaceClass.getName() + " has already unexported!");
    }
    // If the service has been exposed
    if (exported) {
        return;
    }
    // Modify the service exposure state variable
    exported = true;

    / / URL to constitute: protocol: / / host: port/path? metadata
    // If no path attribute is specified in 
      , the value of interface attribute is taken
    if (StringUtils.isEmpty(path)) {
        path = interfaceName;
    }
    // Combine the registry with the service Exposure protocol for service exposure
    doExportUrls();
}

 private void doExportUrls(a) {
        // Handle all registries as standard urlization
        List<URL> registryURLs = loadRegistries(true);
        // Traverses all service exposure protocols supported by the current service
        for (ProtocolConfig protocolConfig : protocols) {
            String pathKey = URL.buildKey(getContextPath(protocolConfig).map(p -> p + "/" + path).orElse(path), group, version);
            ProviderModel providerModel = new ProviderModel(pathKey, ref, interfaceClass);
            ApplicationModel.initProviderModel(pathKey, providerModel);
            // Expose the service to all registries using the service exposure protocol currently traverseddoExportUrlsFor1Protocol(protocolConfig, registryURLs); }}Copy the code

AbstractInterfaceConfig.java

// Exposure registry
// Make all registries standard urlized to return a list
// Because there may be multiple 
      
protected List<URL> loadRegistries(boolean provider) {
    // check && override if necessary
    List<URL> registryList = new ArrayList<URL>();
    // If the 
       tag is configured in the configuration file, the url of all registries is obtained
    // Since Dubbo supports multiple registries, there may be multiple 
       tags
    if (CollectionUtils.isNotEmpty(registries)) {
        // Iterate over all 
       tags
        for (RegistryConfig config : registries) {
            // Get the address attribute of 
      
            String address = config.getAddress();
            // If no address is specified, an address that matches all IP addresses is returned
            if (StringUtils.isEmpty(address)) {
                // If it is empty, use the default address
                address = ANYHOST_VALUE;
            }
            // If address is not equal to N/A, it is not direct
            if(! RegistryConfig.NO_AVAILABLE.equalsIgnoreCase(address)) {// Define a map to store attributes and other attribute values from various tags,
                // These values will appear as metadata in the URL in the future
                Map<String, String> map = new HashMap<String, String>();
                // Write attributes from the 
       tag to the map
                appendParameters(map, application);
                // Write attributes in the 
       tag to map
                appendParameters(map, config);
                map.put(PATH_KEY, RegistryService.class.getName());
                // Write some run-time parameter values to map
                // It writes the following
                // 1: version number of Dubbo
                // 2:Dubbo protocol number
                // 3: indicates the current running timestamp and the current process IP address
                appendRuntimeParameters(map);
                if(! map.containsKey(PROTOCOL_KEY)) { map.put(PROTOCOL_KEY, DUBBO_PROTOCOL); }// Get all registry urls resolved by the current address attribute
                List<URL> urls = UrlUtils.parseURLs(address, map);

                for (URL url : urls) {
                    // Standardisation of URLS: Change all registry addresses to the following form
                    //host:port/... .RegistryService? . & registry = zookeeper &...
                    url = URLBuilder.from(url)
                            .addParameter(REGISTRY_KEY, url.getProtocol())
                            .setProtocol(REGISTRY_PROTOCOL)
                            .build();
                    if ((provider && url.getParameter(REGISTER_KEY, true) | | (! provider && url.getParameter(SUBSCRIBE_KEY,true))) {
                        registryList.add(url);
                    }
                }
            }
        }
    }
    return registryList;
}
Copy the code

ServiceConfig.java

// Expose service
    private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List
       
         registryURLs)
        {
        // Get the service exposure protocol
        String name = protocolConfig.getName();
        if (StringUtils.isEmpty(name)) {
            // If no protocol is specified, dubbo is used by default
            name = DUBBO;
        }

        // Create a map(for the same reason you exposed the registry) to hold various attributes that will be used as metadata in the URL
        Map<String, String> map = new HashMap<String, String>();
        map.put(SIDE_KEY, PROVIDER_SIDE);

        appendRuntimeParameters(map);
        appendParameters(map, metrics);
        appendParameters(map, application);
        appendParameters(map, module);
        // remove 'default.' prefix for configs from ProviderConfig
        // appendParameters(map, provider, Constants.DEFAULT_KEY);
        appendParameters(map, provider);
        appendParameters(map, protocolConfig);
        appendParameters(map, this);
        if (CollectionUtils.isNotEmpty(methods)) {
            
       
        
       
      
            for (MethodConfig method : methods) {
                appendParameters(map, method, method.getName());
                String retryKey = method.getName() + ".retry";
                if (map.containsKey(retryKey)) {
                    String retryValue = map.remove(retryKey);
                    if ("false".equals(retryValue)) {
                        map.put(method.getName() + ".retries"."0");
                    }
                }
                List<ArgumentConfig> arguments = method.getArguments();
                if (CollectionUtils.isNotEmpty(arguments)) {
                    for (ArgumentConfig argument : arguments) {
                        // convert argument type
                        if(argument.getType() ! =null && argument.getType().length() > 0) {
                            Method[] methods = interfaceClass.getMethods();
                            // visit all methods
                            if(methods ! =null && methods.length > 0) {
                                for (int i = 0; i < methods.length; i++) {
                                    String methodName = methods[i].getName();
                                    // target the method, and get its signature
                                    if(methodName.equals(method.getName())) { Class<? >[] argtypes = methods[i].getParameterTypes();// one callback in the method
                                        if(argument.getIndex() ! = -1) {
                                            if (argtypes[argument.getIndex()].getName().equals(argument.getType())) {
                                                appendParameters(map, argument, method.getName() + "." + argument.getIndex());
                                            } else {
                                                throw new IllegalArgumentException("Argument config error : the index attribute and type attribute not match :index :" + argument.getIndex() + ", type:"+ argument.getType()); }}else {
                                            // multiple callbacks in the method
                                            for (int j = 0; j < argtypes.length; j++) { Class<? > argclazz = argtypes[j];if (argclazz.getName().equals(argument.getType())) {
                                                    appendParameters(map, argument, method.getName() + "." + j);
                                                    if(argument.getIndex() ! = -1&& argument.getIndex() ! = j) {throw new IllegalArgumentException("Argument config error : the index attribute and type attribute not match :index :" + argument.getIndex() + ", type:" + argument.getType());
                                                    }
                                                }
                                            }
                                        }
                                    }
                                }
                            }
                        } else if(argument.getIndex() ! = -1) {
                            appendParameters(map, argument, method.getName() + "." + argument.getIndex());
                        } else {
                            throw new IllegalArgumentException("Argument config must set index or type attribute.eg: 
       or 
      
       "
      ); }}}}// end of methods for
        }
        // Whether to enable service generalization
        if (ProtocolUtils.isGeneric(generic)) {
            map.put(GENERIC_KEY, generic);
            map.put(METHODS_KEY, ANY_VALUE);
        } else {
            String revision = Version.getVersion(interfaceClass, version);
            if(revision ! =null && revision.length() > 0) {
                map.put(REVISION_KEY, revision);
            }

            String[] methods = Wrapper.getWrapper(interfaceClass).getMethodNames();
            if (methods.length == 0) {
                logger.warn("No method found in service interface " + interfaceClass.getName());
                map.put(METHODS_KEY, ANY_VALUE);
            } else {
                map.put(METHODS_KEY, StringUtils.join(new HashSet<String>(Arrays.asList(methods)), ",")); }}if(! ConfigUtils.isEmpty(token)) {if (ConfigUtils.isDefault(token)) {
                map.put(TOKEN_KEY, UUID.randomUUID().toString());
            } else{ map.put(TOKEN_KEY, token); }}// export service
        // Get the host and port
        String host = this.findConfigedHosts(protocolConfig, registryURLs, map);
        Integer port = this.findConfigedPorts(protocolConfig, name, map);
        / / form service exposure URL protocol: / / host: port/path? Metadata generated by key-value in the map
        URL url = new URL(name, host, port, getContextPath(protocolConfig).map(p -> p + "/" + path).orElse(path), map);

        // We can extend the functionality of Dubbo by customizing one of the SPI interface implementation classes
        // Reconfigure existing extension classes: Simply define the implementation class for the ConfiguratorFactory SPI interface to reconfigure
        if (ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
                .hasExtension(url.getProtocol())) {  // Determine whether there is a dubbo protocol extension class
            url = ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
                    .getExtension(url.getProtocol()).getConfigurator(url).configure(url);
        }

        // Get the scope property in 
      
        String scope = url.getParameter(SCOPE_KEY);
        // don't export when none is configured
        // If scope is not None, the service is exposed
        if(! SCOPE_NONE.equalsIgnoreCase(scope)) {// export to local if the config is not remote (export to remote only when config is remote)
            // If the value of scope is not remote, local exposure is performed
            if(! SCOPE_REMOTE.equalsIgnoreCase(scope)) { exportLocal(url);// Local exposure
            }
            // export to remote if the config is not local (export to local only when config is local)
            // If scope is not local, remote exposure is performed
            if(! SCOPE_LOCAL.equalsIgnoreCase(scope)) {if(! isOnlyInJvm() && logger.isInfoEnabled()) { logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
                }
                if (CollectionUtils.isNotEmpty(registryURLs)) {
                    // Iterate through all registries
                    for (URL registryURL : registryURLs) {
                        //if protocol is only injvm ,not register
                        if (LOCAL_PROTOCOL.equalsIgnoreCase(url.getProtocol())) {
                            continue;
                        }

                        url = url.addParameterIfAbsent(DYNAMIC_KEY, registryURL.getParameter(DYNAMIC_KEY));
                        URL monitorUrl = loadMonitor(registryURL);
                        if(monitorUrl ! =null) {
                            url = url.addParameterAndEncoded(MONITOR_KEY, monitorUrl.toFullString());
                        }
                        if (logger.isInfoEnabled()) {
                            logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url + " to registry " + registryURL);
                        }

                        // For providers, this is used to enable custom proxy to generate invoker
                        String proxy = url.getParameter(PROXY_KEY);
                        if (StringUtils.isNotEmpty(proxy)) {
                            registryURL = registryURL.addParameter(PROXY_KEY, proxy);
                        }

                        // Generate the invoker, which uses a URL containing registry data and provider dataInvoker<? > invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(EXPORT_KEY, url.toFullString()));// Encapsulates the original invoker again, containing metadata from the 
       tag
                        DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
                        Complete three tasks: register with ZK, generate and write to a cache map, and create and start a Netty Server.
                        Exporter<?> exporter = protocol.export(wrapperInvoker);
                        exporters.add(exporter);
                    }
                } else {  // Handle the case where there is no registry, i.e. direct connectionInvoker<? > invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, url); DelegateProviderMetaDataInvoker wrapperInvoker =new DelegateProviderMetaDataInvoker(invoker, this);
                    Perform two tasks: generate my exporter and write to a cache map, and create and start a Netty Server.Exporter<? > exporter = protocol.export(wrapperInvoker); exporters.add(exporter); }/ * * *@since 2.7.0
                 * ServiceData Store
                 */
                MetadataReportService metadataReportService = null;
                if((metadataReportService = getMetadataReportService()) ! =null) { metadataReportService.publishProvider(url); }}}this.urls.add(url);
    }
Copy the code

Start by tracing the **exportLocal()** method in ServiceConfig.java. This method performs the local service exposure.

At this point, a proxy object is generated. Let's copy the generated code and put it in the demo project under the package org.apache.dubbo.rpc.

Generated code

package org.apache.dubbo.rpc;

import org.apache.dubbo.common.extension.ExtensionLoader;

public class ProxyFactory$Adaptive implements org.apache.dubbo.rpc.ProxyFactory {
    public org.apache.dubbo.rpc.Invoker getInvoker(java.lang.Object arg0, java.lang.Class arg1, org.apache.dubbo.common.URL arg2) throws org.apache.dubbo.rpc.RpcException {
        if (arg2 == null) throw new IllegalArgumentException("url == null");
        org.apache.dubbo.common.URL url = arg2;
        // You can see that javassist is used here for dynamic proxy
        String extName = url.getParameter("proxy"."javassist");
        if (extName == null)
            throw new IllegalStateException("Failed to get extension (org.apache.dubbo.rpc.ProxyFactory) name from url (" + url.toString() + ") use keys([proxy])");
        org.apache.dubbo.rpc.ProxyFactory extension = (org.apache.dubbo.rpc.ProxyFactory) ExtensionLoader.getExtensionLoader(org.apache.dubbo.rpc.ProxyFactory.class).getExtension(extName);
        // Proxy objects created using Javassit
        // StubProxyFactoryWrapper.java -> getInvoker
        return extension.getInvoker(arg0, arg1, arg2);
    }

    public java.lang.Object getProxy(org.apache.dubbo.rpc.Invoker arg0, boolean arg1) throws org.apache.dubbo.rpc.RpcException {
        if (arg0 == null) throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument == null");
        if (arg0.getUrl() == null)
            throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument getUrl() == null");
        org.apache.dubbo.common.URL url = arg0.getUrl();
        String extName = url.getParameter("proxy"."javassist");
        if (extName == null)
            throw new IllegalStateException("Failed to get extension (org.apache.dubbo.rpc.ProxyFactory) name from url (" + url.toString() + ") use keys([proxy])");
        org.apache.dubbo.rpc.ProxyFactory extension = (org.apache.dubbo.rpc.ProxyFactory) ExtensionLoader.getExtensionLoader(org.apache.dubbo.rpc.ProxyFactory.class).getExtension(extName);
        return extension.getProxy(arg0, arg1);
    }

    public java.lang.Object getProxy(org.apache.dubbo.rpc.Invoker arg0) throws org.apache.dubbo.rpc.RpcException {
        if (arg0 == null) throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument == null");
        if (arg0.getUrl() == null)
            throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument getUrl() == null");
        org.apache.dubbo.common.URL url = arg0.getUrl();
        String extName = url.getParameter("proxy"."javassist");
        if (extName == null)
            throw new IllegalStateException("Failed to get extension (org.apache.dubbo.rpc.ProxyFactory) name from url (" + url.toString() + ") use keys([proxy])");
        org.apache.dubbo.rpc.ProxyFactory extension = (org.apache.dubbo.rpc.ProxyFactory) ExtensionLoader.getExtensionLoader(org.apache.dubbo.rpc.ProxyFactory.class).getExtension(extName);
        returnextension.getProxy(arg0); }}Copy the code

The Invoker obtained is then enhanced with a Wrapper. The Protocol used for this is also an adaptive class, generated in the same way:

package org.apache.dubbo.rpc;

import org.apache.dubbo.common.extension.ExtensionLoader;

public class Protocol$Adaptive implements org.apache.dubbo.rpc.Protocol {
    public void destroy(a) {
        throw new UnsupportedOperationException("The method public abstract void org.apache.dubbo.rpc.Protocol.destroy() of interface org.apache.dubbo.rpc.Protocol is not adaptive method!");
    }

    public int getDefaultPort(a) {
        throw new UnsupportedOperationException("The method public abstract int org.apache.dubbo.rpc.Protocol.getDefaultPort() of interface org.apache.dubbo.rpc.Protocol is not adaptive method!");
    }

    public org.apache.dubbo.rpc.Invoker refer(Class arg0, org.apache.dubbo.common.URL arg1) throws org.apache.dubbo.rpc.RpcException {
        if (arg1 == null) throw new IllegalArgumentException("url == null");
        org.apache.dubbo.common.URL url = arg1;
        String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol());
        if (extName == null)
            throw new IllegalStateException("Failed to get extension (org.apache.dubbo.rpc.Protocol) name from url (" + url.toString() + ") use keys([protocol])");
        org.apache.dubbo.rpc.Protocol extension = (org.apache.dubbo.rpc.Protocol) ExtensionLoader.getExtensionLoader(org.apache.dubbo.rpc.Protocol.class).getExtension(extName);
        return extension.refer(arg0, arg1);
    }

    public org.apache.dubbo.rpc.Exporter export(org.apache.dubbo.rpc.Invoker arg0) throws org.apache.dubbo.rpc.RpcException {
        if (arg0 == null) throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument == null");
        if (arg0.getUrl() == null)
            throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument getUrl() == null");
        org.apache.dubbo.common.URL url = arg0.getUrl();
        Injvm protocol is currently required for wrapper enhancement
        String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol());
        if (extName == null)
            throw new IllegalStateException("Failed to get extension (org.apache.dubbo.rpc.Protocol) name from url (" + url.toString() + ") use keys([protocol])");
        // Specify to load the injvm extension class
        org.apache.dubbo.rpc.Protocol extension = (org.apache.dubbo.rpc.Protocol) ExtensionLoader
                .getExtensionLoader(org.apache.dubbo.rpc.Protocol.class)
                .getExtension(extName);
        // ProtocolListenerWrapper.java - > export
        returnextension.export(arg0); }}Copy the code

ProtocolListenerWrapper.java

public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
    / / REGISTRY_PROTOCOL = registry. If the current protocol is registered
    if (REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) {
        return protocol.export(invoker);
    }
     Export (invoker) ¶
    return new ListenerExporterWrapper<T>(protocol.export(invoker),
            Collections.unmodifiableList(ExtensionLoader.getExtensionLoader(ExporterListener.class)
                    .getActivateExtension(invoker.getUrl(), EXPORTER_LISTENER_KEY)));
}
Copy the code

InjvmProtocol.java

/**
 * Creates the in-JVM exporter for the given invoker, keyed by the service key
 * of the invoker's URL and backed by this protocol's exporter map.
 */
@Override
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
    // The service key of the invoker's URL identifies the exporter in the map
    final String serviceKey = invoker.getUrl().getServiceKey();
    final InjvmExporter<T> injvmExporter = new InjvmExporter<T>(invoker, serviceKey, exporterMap);
    return injvmExporter;
}
Copy the code

InjvmExporter.java

class InjvmExporter<T> extends AbstractExporter<T> {

    private final String key;

    private finalMap<String, Exporter<? >> exporterMap; InjvmExporter(Invoker<T> invoker, String key, Map<String, Exporter<? >> exporterMap) {super(invoker);
        // key: org.apache.dubbo.demo.DemoService
        this.key = key;
        // Cache Injvm asynchronous conversion objects to the Map of the current protocol
        // Note that not all of the protocol's exporters are in this Map, and each of the different exposed protocols has its own cache Map
        this.exporterMap = exporterMap;
        exporterMap.put(key, this);
    }

    @Override
    public void unexport(a) {
        super.unexport(); exporterMap.remove(key); }}Copy the code

Next, we go back to ServiceConfig.java and continue tracing the source code after the local exposure step.

Here is the remote exposure, which will load the extension code we just generated above.

The registry exposure protocol is loaded first, and the process is the same: go to ProtocolListenerWrapper.java -> export, following the same steps as for local exposure above.

ProtocolListenerWrapper.java

@Override
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
    / / REGISTRY_PROTOCOL = registry. If the current protocol is registered
    if (REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) {
        // Trace here
        return protocol.export(invoker);
    }
    Export (invoker) ¶
    return new ListenerExporterWrapper<T>(protocol.export(invoker),
            Collections.unmodifiableList(ExtensionLoader.getExtensionLoader(ExporterListener.class)
                    .getActivateExtension(invoker.getUrl(), EXPORTER_LISTENER_KEY)));
}
Copy the code

RegistryProtocol.java

@Override
public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
    // Registry URL exposed
    URL registryUrl = getRegistryUrl(originInvoker);
    // Service provider URL exposed
    URL providerUrl = getProviderUrl(originInvoker);

    // Subscribe the override data
    // FIXME When the provider subscribes, it will affect the scene : a certain JVM exposes the service and call
    // the same service. Because the subscribed is cached key with the name of the service, it causes the
    // subscription information to cover.
    final URL overrideSubscribeUrl = getSubscribedOverrideUrl(providerUrl);
    // Register a listener for the Invoker
    final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
    overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);

    providerUrl = overrideUrlWithConfig(providerUrl, overrideSubscribeListener);
    //export invoker
    final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker, providerUrl);

    // url to registry
    // Get the registry instance
    final Registry registry = getRegistry(originInvoker);
    final URL registeredProviderUrl = getRegisteredProviderUrl(providerUrl, registryUrl);
    ProviderInvokerWrapper<T> providerInvokerWrapper = ProviderConsumerRegTable.registerProvider(originInvoker,
            registryUrl, registeredProviderUrl);
    //to judge if we need to delay publish
    boolean register = registeredProviderUrl.getParameter("register".true);
    if (register) {
        // Perform registration ---- follow here
        register(registryUrl, registeredProviderUrl);
        providerInvokerWrapper.setReg(true);
    }

    // Deprecated! Subscribe to override rules in 2.6.x or before.
    / / subscribe
    registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);

    exporter.setRegisterUrl(registeredProviderUrl);
    exporter.setSubscribeUrl(overrideSubscribeUrl);
    //Ensure that a new exporter instance is returned every time export
    return new DestroyableExporter<>(exporter);
}
Copy the code

AbstractRegistryFactory.java

@Override
public Registry getRegistry(URL url) {
    url = URLBuilder.from(url)
            .setPath(RegistryService.class.getName())
            .addParameter(INTERFACE_KEY, RegistryService.class.getName())
            .removeParameters(EXPORT_KEY, REFER_KEY)
            .build();
    String key = url.toServiceStringWithoutResolving();
    // Lock the registry access process to ensure a single instance of the registry
    LOCK.lock();
    try {
        // Get it from the cache
        / / key: zookeeper: / / 127.0.0.1:2181 / org. Apache. Dubbo. Registry. RegistryService
        // Is the full path qualified name of the interface
        Registry registry = REGISTRIES.get(key);
        if(registry ! =null) {
            return registry;
        }
        //create registry by spi/ioc
         // We are registered here, so trace - "ZookeeperRegistryFactory"
        registry = createRegistry(url);
        if (registry == null) {
            throw new IllegalStateException("Can not create registry " + url);
        }
        REGISTRIES.put(key, registry);
        return registry;
    } finally {
        // Release the lockLOCK.unlock(); }}Copy the code

ZookeeperRegistryFactory.java

/**
 * Builds a ZooKeeper-backed registry for the given registry URL, handing it the
 * factory's ZooKeeper transporter.
 */
public Registry createRegistry(URL url) {
    final ZookeeperRegistry zkRegistry = new ZookeeperRegistry(url, zookeeperTransporter);
    return zkRegistry;
}
Copy the code

ZookeeperRegistry.java

/**
 * Creates the registry: resolves the root path from the URL's group parameter,
 * connects to ZooKeeper and installs a state listener that calls
 * {@code recover()} on reconnection.
 *
 * @param url                  the registry URL; must not be an any-host address
 * @param zookeeperTransporter transporter used to open the ZooKeeper connection
 * @throws IllegalStateException if the URL is an any-host address
 */
public ZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter) {
    super(url);
    if (url.isAnyHost()) {
        throw new IllegalStateException("registry address == null");
    }
    // The root path is the group, always prefixed with the path separator
    String group = url.getParameter(GROUP_KEY, DEFAULT_ROOT);
    if (!group.startsWith(PATH_SEPARATOR)) {
        group = PATH_SEPARATOR + group;
    }
    this.root = group;
    // Establish a connection with ZooKeeper
    zkClient = zookeeperTransporter.connect(url);

    zkClient.addStateListener(state -> {
        if (state == StateListener.RECONNECTED) {
            try {
                recover();
            } catch (Exception e) {
                // Recovery failure is logged rather than propagated out of the listener
                logger.error(e.getMessage(), e);
            }
        }
    });
}

Let's continue tracing the code below.

DubboProtocol.java

/**
 * Exports the invoker over the Dubbo protocol: caches a {@code DubboExporter}
 * under the service key, records stub event methods when configured, opens the
 * server for the URL and optimizes serialization.
 *
 * @param invoker the invoker to export
 * @return the exporter stored in the exporter map
 * @throws RpcException if the export fails
 */
@Override
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
    URL url = invoker.getUrl();

    // export service.
    String key = serviceKey(url);
    // Create the Dubbo exporter and write it to the cache map
    DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
    exporterMap.put(key, exporter);

    //export an stub service for dispatching event
    Boolean isStubSupportEvent = url.getParameter(STUB_EVENT_KEY, DEFAULT_STUB_EVENT);
    Boolean isCallbackservice = url.getParameter(IS_CALLBACK_SERVICE, false);
    if (isStubSupportEvent && !isCallbackservice) {
        String stubServiceMethods = url.getParameter(STUB_EVENT_METHODS_KEY);
        if (stubServiceMethods == null || stubServiceMethods.length() == 0) {
            if (logger.isWarnEnabled()) {
                logger.warn(new IllegalStateException("consumer [" + url.getParameter(INTERFACE_KEY) +
                        "], has set stubproxy support event ,but no stub methods founded."));
            }
        } else {
            stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods);
        }
    }

    openServer(url);  // Create and start a Netty Server
    optimizeSerialization(url);

    return exporter;
}


 private void openServer(URL url) {
        // find server.
        // The value of key is the IP of the current provider and the port of the current service exposure protocol
        String key = url.getAddress();
        //client can export a service which's only for server to invoke
        // Is a service provider
        boolean isServer = url.getParameter(IS_SERVER_KEY, true);
        if (isServer) {
            ExchangeServer server = serverMap.get(key);
            if (server == null) {
                synchronized (this) {
                    server = serverMap.get(key);
                    if (server == null) {
                        // an ExchangeServer is only responsible for the same-asynchronous conversion of one NettyServer
                        // an ExchangeServer corresponds to only one NettyServer
                        // The number of nettyServers depends on the protocol type of the provider
                        // Number of providers * Number of protocols = number of NettyServersserverMap.put(key, createServer(url)); }}}else {
                // server supports reset, use together with override
                / / reset URL
                // When there are multiple interface implementation classes in the same service, the protocol is the same, because NettyServer has been cached previously.
                // Concatenates the following URL into the first URL. Update the first URL
                // Reset the URLserver.reset(url); }}}// Create a server NettyServer binding
 private ExchangeServer createServer(URL url) {
        url = URLBuilder.from(url)
                // send readonly event when server closes, it's enabled by default
                .addParameterIfAbsent(CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString())
                // enable heartbeat by default
                .addParameterIfAbsent(HEARTBEAT_KEY, String.valueOf(DEFAULT_HEARTBEAT))
                .addParameter(CODEC_KEY, DubboCodec.NAME)
                .build();
        String str = url.getParameter(SERVER_KEY, DEFAULT_REMOTING_SERVER);

        if(str ! =null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) {
            throw new RpcException("Unsupported server type: " + str + ", url: " + url);
        }

        ExchangeServer server;
        try {
            // Connect to address binding
            server = Exchangers.bind(url, requestHandler);
        } catch (RemotingException e) {
            throw new RpcException("Fail to start server(url: " + url + ")" + e.getMessage(), e);
        }

        str = url.getParameter(CLIENT_KEY);
        if(str ! =null && str.length() > 0) {
            Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();
            if(! supportedTypes.contains(str)) {throw new RpcException("Unsupported client type: "+ str); }}return server;
    }
Copy the code

NettyServer.java

/**
 * Creates the server by delegating to the superclass constructor.
 *
 * <p>Before delegating, the URL is passed through
 * {@code ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME)} and the handler is
 * wrapped with {@code ChannelHandlers.wrap}.
 *
 * @param url     the provider URL
 * @param handler the channel handler to wrap and hand to the superclass
 * @throws RemotingException declared by this constructor; presumably raised by the
 *                           superclass constructor when the server cannot be opened
 */
public NettyServer(URL url, ChannelHandler handler) throws RemotingException {
    // you can customize name and type of client thread pool by THREAD_NAME_KEY and THREADPOOL_KEY in CommonConstants.
    // the handler will be wrapped: MultiMessageHandler->HeartbeatHandler->handler
    super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME)));
}
Copy the code

AbstractServer.java

/**
 * Resolves the local and bind addresses from the URL, opens the server via
 * {@link #doOpen()}, and then looks up the executor registered for the URL's port.
 *
 * <p>The bind IP is taken from {@code Constants.BIND_IP_KEY} (falling back to the URL host)
 * and replaced by {@code ANYHOST_VALUE} when the URL's {@code ANYHOST_KEY} parameter is true
 * or {@code NetUtils.isInvalidLocalHost} reports the IP as invalid. The bind port is taken
 * from {@code Constants.BIND_PORT_KEY}, falling back to the URL port.
 *
 * @param url     the provider URL
 * @param handler the channel handler passed to the superclass
 * @throws RemotingException if {@code doOpen()} throws anything; the original throwable is
 *                           kept as the cause
 */
public AbstractServer(URL url, ChannelHandler handler) throws RemotingException {
    super(url, handler);
    localAddress = getUrl().toInetSocketAddress();

    String bindIp = getUrl().getParameter(Constants.BIND_IP_KEY, getUrl().getHost());
    int bindPort = getUrl().getParameter(Constants.BIND_PORT_KEY, getUrl().getPort());
    if (url.getParameter(ANYHOST_KEY, false) || NetUtils.isInvalidLocalHost(bindIp)) {
        bindIp = ANYHOST_VALUE;
    }
    bindAddress = new InetSocketAddress(bindIp, bindPort);
    this.accepts = url.getParameter(ACCEPTS_KEY, DEFAULT_ACCEPTS);
    this.idleTimeout = url.getParameter(IDLE_TIMEOUT_KEY, DEFAULT_IDLE_TIMEOUT);
    try {
        // Open the concrete server implementation.
        doOpen();
        if (logger.isInfoEnabled()) {
            logger.info("Start " + getClass().getSimpleName() + " bind " + getBindAddress() + ", export " + getLocalAddress());
        }
    } catch (Throwable t) {
        throw new RemotingException(url.toInetSocketAddress(), null, "Failed to bind " + getClass().getSimpleName()
                + " on " + getLocalAddress() + ", cause: " + t.getMessage(), t);
    }
    //fixme replace this with better method
    // The executor is fetched from the default DataStore extension, keyed by the URL port.
    DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension();
    executor = (ExecutorService) dataStore.get(Constants.EXECUTOR_SERVICE_COMPONENT_KEY, Integer.toString(url.getPort()));
}
Copy the code

NettyServer.java

protected void  doOpen(a) throws Throwable {
    bootstrap = new ServerBootstrap();
    // Process the Group of connections
    bossGroup = new NioEventLoopGroup(1.new DefaultThreadFactory("NettyServerBoss".true));
    // Group that processes IO services
    workerGroup = new NioEventLoopGroup(getUrl().getPositiveParameter(IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS),
            new DefaultThreadFactory("NettyServerWorker".true));
    // The provider URL
    final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this);
    channels = nettyServerHandler.getChannels();

    bootstrap.group(bossGroup, workerGroup)
            .channel(NioServerSocketChannel.class)
            .childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)
            .childOption(ChannelOption.SO_REUSEADDR, Boolean.TRUE)
            .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
            .childHandler(new ChannelInitializer<NioSocketChannel>() {
                @Override
                protected void initChannel(NioSocketChannel ch) throws Exception {
                    // FIXME: should we use getTimeout()?
                    int idleTimeout = UrlUtils.getIdleTimeout(getUrl());
                    NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
                    ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug
                            / / decoder
                            .addLast("decoder", adapter.getDecoder())
                            / / encoder
                            .addLast("encoder", adapter.getEncoder())
                            // Read/write idle monitor
                            .addLast("server-idle-handler".new IdleStateHandler(0.0, idleTimeout, MILLISECONDS))
                            // Dubbo's custom processor
                            .addLast("handler", nettyServerHandler); }});// bind
    ChannelFuture channelFuture = bootstrap.bind(getBindAddress());
    channelFuture.syncUninterruptibly();
    channel = channelFuture.channel();
}
Copy the code