Welcome to the public number [sharedCode] committed to mainstream middleware source analysis, you can contact me directly

The blogger’s personal website: www.shared-code.com

preface

In the previous article, we introduced the initialization of ServiceBean. Each exposed service has an instance of ServiceBean, and in this class there is a method that exposes the service.

The source code analysis of this article will be very in-depth, from the parameter analysis of the service, to how to connect to ZooKeeper, and finally at the start of port 20880, start dubbo service. All of the source code analysis is here, so this article is for those who want to have a deep understanding of Dubbo’s source code.

Exposure to entry

@Override
    public void onApplicationEvent(ContextRefreshedEvent event) {
      	// The service is not lazily loaded && the service is not published && the service is not offline. Service exposure occurs when these three conditions are met
        if(isDelay() && ! isExported() && ! isUnexported()) {if (logger.isInfoEnabled()) {
                logger.info("The service ready on spring started. service: " + getInterface());
            }
          	// Service exposureexport(); }}Copy the code

When the service is not lazily loaded, the export method is called to expose the service, which is a method of its parent class, in ServiceConfig.

ServiceConfig#export

public synchronized void export(a) {
  		// The information provided by the service is not empty,
        if(provider ! =null) {
            if (export == null) {
              	// Set the publishing information
                export = provider.getExport();
            }
            if (delay == null) {
              	// Get delay informationdelay = provider.getDelay(); }}// if export is false, do not publish, return directly.
        if(export ! =null && !export) {
            return;
        }
		// The delay exposure setting is greater than 0
        if(delay ! =null && delay > 0) {
          	// Start a timed thread and execute the doExport method after the delay.
            delayExportExecutor.schedule(new Runnable() {
                @Override
                public void run(a) {
                    // Service exposure
                    doExport();
                }
            }, delay, TimeUnit.MILLISECONDS);
        } else {
          	// Service exposuredoExport(); }}Copy the code

Description:

Through the above code, we have the basis to draw a conclusion

1. You can set the delay property to expose service delay. The unit of delay is millisecond.

2. If the delay property is not set or the delay value is -1, the service will be exposed when the Spring context is refreshed.

doExport

protected synchronized void doExport(a) {
  		// Unexported indicates that the service is offline.
        if (unexported) {
            throw new IllegalStateException("Already unexported!");
        }
        // Exported = true to serve and expose without republishing
        if (exported) {
            return;
        }
        exported = true; // Set the exposure property to true,
  		// The service interface to be exposed cannot be empty.
        if (interfaceName == null || interfaceName.length() == 0) {
            throw new IllegalStateException("<dubbo:service interface=\"\" /> interface not allow null!");
        }
  		// Check the default configuration, provider configuration.
        checkDefault();
  		// Provider property check
        if(provider ! =null) {
            if (application == null) {
                application = provider.getApplication();
            }
            if (module= =null) {
                module = provider.getModule();
            }
            if (registries == null) {
                registries = provider.getRegistries();
            }
            if (monitor == null) {
                monitor = provider.getMonitor();
            }
            if (protocols == null) { protocols = provider.getProtocols(); }}// Check module properties
        if (module! =null) {
            if (registries == null) {
                registries = module.getRegistries();
            }
            if (monitor == null) {
                monitor = module.getMonitor(); }}// global application applies attribute checking
        if(application ! =null) {
            if (registries == null) {
                registries = application.getRegistries();
            }
            if (monitor == null) { monitor = application.getMonitor(); }}//ref is the service exposed implementation class
  		// GenericService is an implementation of the dubbo generalization call.
  		// The generic interface implementation is mainly used when there is no API interface and model class element on the server side. All POJOs in parameters and return values are represented by Map, which is usually used for framework integration.
		// For example, implement a generic Mock framework for remote services that handles all service requests by implementing the GenericService interface.
        if (ref instanceof GenericService) {
            interfaceClass = GenericService.class;
            if(StringUtils.isEmpty(generic)) { generic = Boolean.TRUE.toString(); }}else {
            try {
              	// Instantiate the exposed service interface instance through reflection
                interfaceClass = Class.forName(interfaceName, true, Thread.currentThread()
                        .getContextClassLoader());
            } catch (ClassNotFoundException e) {
                throw new IllegalStateException(e.getMessage(), e);
            }
          	// Check the method
            checkInterfaceAndMethods(interfaceClass, methods);
          	// Check whether the implementation class of the service is consistent with the current service
            checkRef();
            generic = Boolean.FALSE.toString();
        }
        // If the value is set to true, the default proxy class name is used, that is, the interface name + Local suffix, the service interface client Local proxy class name, which is used to perform Local logic on the client, such as Local caching, etc. The Local proxy class construction function must allow the passing of remote proxy objects, constructors such as: public XxxServiceLocal(XxxService xxxService)
        if(local ! =null) {
            if ("true".equals(local)) {
                local = interfaceName + "Local"; } Class<? > localClass;try {
                localClass = ClassHelper.forNameWithThreadContextClassLoader(local);
            } catch (ClassNotFoundException e) {
                throw new IllegalStateException(e.getMessage(), e);
            }
            if(! interfaceClass.isAssignableFrom(localClass)) {throw new IllegalStateException("The local implementation class " + localClass.getName() + " not implement interface "+ interfaceName); }}// Dubbo's local stub configuration. http://dubbo.apache.org/zh-cn/docs/user/demos/local-stub.html
        if(stub ! =null) {
            if ("true".equals(stub)) {
                stub = interfaceName + "Stub"; } Class<? > stubClass;try {
              	// Get an instance of the local stub class
                stubClass = ClassHelper.forNameWithThreadContextClassLoader(stub);
            } catch (ClassNotFoundException e) {
                throw new IllegalStateException(e.getMessage(), e);
            }
          	// Local stub class check
            if(! interfaceClass.isAssignableFrom(stubClass)) {throw new IllegalStateException("The stub implementation class " + stubClass.getName() + " not implement interface "+ interfaceName); }}// Check the configuration.
        checkApplication();
        checkRegistry();
        checkProtocol();
        appendProperties(this);
        checkStubAndMock(interfaceClass);
        if (path == null || path.length() == 0) {
            path = interfaceName;
        }
  		// Execute service exposure. The following process mainly follows this approach
        doExportUrls();
        ProviderModel providerModel = new ProviderModel(getUniqueServiceName(), this, ref);
        ApplicationModel.initProviderModel(getUniqueServiceName(), providerModel);
    }
Copy the code

Step description:

1. Check the configuration

2. Check the local, stub, and mock modes.

3. Run doExportUrls to expose services.

The above method, the main role is to check the configuration. The main logic of service exposure is not here.

doExportUrls

private void doExportUrls(a) {
  		Dubbo can support multiple registries
        List<URL> registryURLs = loadRegistries(true);
  		// Dubbo can also support multiple Protocols
        for (ProtocolConfig protocolConfig : protocols) {
          	// Expose servicedoExportUrlsFor1Protocol(protocolConfig, registryURLs); }}Copy the code

The loadRegisteries method returns the following registered address:

registry://localhost:2181/com.alibaba.dubbo.registry.RegistryService? application=dubbo-service&Dubbo = 2.6.2&pid=20748&registry=zookeeper&timestamp=1537240877173
# #The application of&The version number&The application of PID&Registration type&The time stampCopy the code

If there are multiple registered addresses, multiple will be returned.

doExportUrlsFor1Protocol

private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {
  		// Get the protocol name,
        String name = protocolConfig.getName();
        if (name == null || name.length() == 0) {
          	// If not set, the default is dubbo
            name = "dubbo";
        }

        Map<String, String> map = new HashMap<String, String>();
  		// side , provider
        map.put(Constants.SIDE_KEY, Constants.PROVIDER_SIDE);
  		// Dubbo, version number
        map.put(Constants.DUBBO_VERSION_KEY, Version.getVersion());
  		// timestamp, current timestamp
        map.put(Constants.TIMESTAMP_KEY, String.valueOf(System.currentTimeMillis()));
        if (ConfigUtils.getPid() > 0) { // Get the application PID,
          	/ / set
            map.put(Constants.PID_KEY, String.valueOf(ConfigUtils.getPid()));
        }
  		// Concatenate parameters, the following five lines of code are concatenate parameters, write the parameters to the map set
        appendParameters(map, application);
        appendParameters(map, module);
        appendParameters(map, provider, Constants.DEFAULT_KEY);
        appendParameters(map, protocolConfig);
        appendParameters(map, this);
        if(methods ! =null && !methods.isEmpty()) {
         	// Methods of dealing with...
        }

        if (ProtocolUtils.isGeneric(generic)) {
            map.put(Constants.GENERIC_KEY, generic);
            map.put(Constants.METHODS_KEY, Constants.ANY_VALUE);
        } else {
          	// Dubbo's version
            String revision = Version.getVersion(interfaceClass, version);
            if(revision ! =null && revision.length() > 0) {
                map.put("revision", revision);
            }
			// The method to get the service interface
            String[] methods = Wrapper.getWrapper(interfaceClass).getMethodNames();
            if (methods.length == 0) {
              	// The number of methods is empty
                logger.warn("NO method found in service interface " + interfaceClass.getName());
                map.put(Constants.METHODS_KEY, Constants.ANY_VALUE);
            } else {
              	// The number of methods is greater than 0
                map.put(Constants.METHODS_KEY, StringUtils.join(new HashSet<String>(Arrays.asList(methods)), ",")); }}// The token is not empty
        if(! ConfigUtils.isEmpty(token)) {if (ConfigUtils.isDefault(token)) {
                map.put(Constants.TOKEN_KEY, UUID.randomUUID().toString());
            } else{ map.put(Constants.TOKEN_KEY, token); }}Injvm = injvm; injvm = injvm
        if (Constants.LOCAL_PROTOCOL.equals(protocolConfig.getName())) {
          	// Set the registration property to false
            protocolConfig.setRegister(false);
            map.put("notify"."false");
        }
        // export service
        String contextPath = protocolConfig.getContextpath();
        if ((contextPath == null || contextPath.length() == 0) && provider ! =null) {
            contextPath = provider.getContextpath();
        }
		/ / get the IP
        String host = this.findConfigedHosts(protocolConfig, registryURLs, map);
  		// Get the port
        Integer port = this.findConfigedPorts(protocolConfig, name, map);
  		//-----------------------------------------------------------------------------------------------------
  		// The above code is used to assemble the map parameter, which is used to concatenate the URL /
        URL url = new URL(name, host, port, (contextPath == null || contextPath.length() == 0 ? "" : contextPath + "/") + path, map);
		
        if (ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
                .hasExtension(url.getProtocol())) {
          	// 
            url = ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
                    .getExtension(url.getProtocol()).getConfigurator(url).configure(url);
        }
		// The publishing scope of the service
        String scope = url.getParameter(Constants.SCOPE_KEY);
        // If none is configured, the service will not be published
        if(! Constants.SCOPE_NONE.toString().equalsIgnoreCase(scope)) {// If scope is not equal to remote, local publishing is done
            if(! Constants.SCOPE_REMOTE.toString().equalsIgnoreCase(scope)) {// Local exposure
                exportLocal(url);
            }
            // If scope is not equal to local, remote publishing is done
            if(! Constants.SCOPE_LOCAL.toString().equalsIgnoreCase(scope)) {if (logger.isInfoEnabled()) {
                    logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
                }
              	// Register URL is not empty
                if(registryURLs ! =null && !registryURLs.isEmpty()) {
                  	// Publish circularly
                    for (URL registryURL : registryURLs) {
                      	// 
                        url = url.addParameterIfAbsent(Constants.DYNAMIC_KEY, registryURL.getParameter(Constants.DYNAMIC_KEY));
                        URL monitorUrl = loadMonitor(registryURL);
                        if(monitorUrl ! =null) {
                            url = url.addParameterAndEncoded(Constants.MONITOR_KEY, monitorUrl.toFullString());
                        }
                        if (logger.isInfoEnabled()) {
                            logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url + " to registry " + registryURL);
                        }
                      	// Get proxFactory to generate invokerInvoker<? > invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));// Service exposed invoker
                        DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
					 	// Call Protocol for service publishing
                        Exporter<?> exporter = protocol.export(wrapperInvoker);
                        exporters.add(exporter);
                    }
                } else{ Invoker<? > invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, url); DelegateProviderMetaDataInvoker wrapperInvoker =new DelegateProviderMetaDataInvoker(invoker, this); Exporter<? > exporter = protocol.export(wrapperInvoker); exporters.add(exporter); }}}this.urls.add(url);
    }
Copy the code

The above code is long, so to summarize.

In the code, I’ve separated the two parts of the code using the middle bar, the part above the middle bar that collects the parameters, puts the parameters in the map, and generates the URL from the map,

The url is as follows:

dubbo:/ / 192.168.59.3:20880 / com. Dubbo. Service. User. UserFacadeService? Anyhost = true&application = dubbo - service&bind. IP = 192.168.59.3 & bind. The port = 20880 & dubbo = 2.6.2 & generic = false&interface = com. Dubb o.service.user.UserFacadeService&methods=getUser&pid=20748&side=provider&stub=com.dubbo.service.user.UserFacadeServiceSt ub&timestamp=1537240877178
Copy the code

Contains all the parameters for the service. Under the bar, dubbo is about to launch the service.

There is an important property, **scope **, which has three Settings.

Local: only local exposure remote: only remote exposure None: no service exposureCopy the code

In the above code, the value of scope is null in the actual case, so it does both local and remote exposure, and both.

Local exposure is not covered in this article for the time being. The focus is on remote exposure, but there is one caveat:

** Local exposure in the same JVM is preferentially selected, such as consumers and producers running in the same Tomcat, to call the locally exposed service **

The service exposes the three most important lines of code

// Get proxFactory to generate invokerInvoker<? > invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded (Constants.EXPORT_KEY, url.toFullString()));// Service exposed invoker
DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
// Call Protocol for service publishingExporter<? > exporter = protocol.export(wrapperInvoker);Copy the code

Generate the Invoker using registryURL as follows:

registry://localhost:2181/com.alibaba.dubbo.registry.RegistryService? Application = dubbo - service&dubbo = 2.6.2 & pid = 10588 & registry = zookeeper&timestamp = 1537255304679
Copy the code

This is where the protocol type for Invoker is established. The protocol type in Invoker is Register, not Dubbo. Because this is used to publish services

ProxyFactory is an extended implementation class generated by Dubbo using SPI extension mechanism. The default implementation is Javassist. After generating Invoker, bind Invoker and ServiceConfig. Generate DelegateProviderMetaDataInvoker, strengthen it.

The export method of Protocol is called to publish the service.

Protocol is an extended proxy class generated through SPI. The code in Export is as follows

public com.alibaba.dubbo.rpc.Exporter export(com.alibaba.dubbo.rpc.Invoker arg0) throws com.alibaba.dubbo.rpc.RpcException {
        if (arg0 == null) throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument == null");
        if (arg0.getUrl() == null)
            throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument getUrl() == null");
        com.alibaba.dubbo.common.URL url = arg0.getUrl();
  		// Get the current protocol type, whether it is dubbo or something else, such as the local service publication, passed in register, used for registration.
        String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol());
        if (extName == null)
            throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys([protocol])");
        com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol) ExtensionLoader.getExtensionLoader
                                            (com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName);
        return extension.export(arg0);
    }
Copy the code

The extension instance structure returned here is as follows: ProtocolListenerWrapper > ProtocolFilterWrapper > QosProtocolWrapper > RegistryProtocol

ProtocolListenerWrapper ProtocolFilterWrapper QosProtocolWrapper (ProtocolListenerWrapper) And the ** order of the three classes is inconsistent.

In my debugging, the Protocol decorator classes are executed in the following order:

QosProtocolWrapper > ProtocolFilterWrapper >QosProtocolWrapper >RegistryProtocol

QosProtocolWrapper

For the registration operation, enable dubbo qos-server online o&M platform, default port: 22222

@Override
    public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
      	// Check whether it is a registration operation.
        if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) {
          	// Enable the online O&M platform of Dubbo. By default, the port is occupied
            startQosServer(invoker.getUrl());
            return protocol.export(invoker);
        }
        return protocol.export(invoker);
    }
Copy the code

ProtocolListenerWrapper

Add a listener to Invoker to remove the registration operation

@Override
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
  // Check whether it is a registration operation.
  if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) {
    return protocol.export(invoker);
  }
  // 
  return new ListenerExporterWrapper<T>(protocol.export(invoker),
                                        Collections.unmodifiableList(ExtensionLoader.getExtensionLoader(ExporterListener.class)
                                                                     .getActivateExtension(invoker.getUrl(), 	Constants.EXPORTER_LISTENER_KEY)));
}
Copy the code

ProtocolFilterWrapper

Remove the registration operation and add a filter to Invoker

@Override
    public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
      	// Check whether it is a registration operation.
        if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) {
            return protocol.export(invoker);
        }
      	// buildInvokerChain builds the filter chain.
        return protocol.export(buildInvokerChain(invoker, Constants.SERVICE_FILTER_KEY, Constants.PROVIDER));
    }
Copy the code

RegistryProtocol

@Override
    public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
        // Open dubbo's
        final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker);
		// Obtain the zooKeeper address
        URL registryUrl = getRegistryUrl(originInvoker);

        // Connect to ZooKeeper and establish a long-term connection
        final Registry registry = getRegistry(originInvoker);
        final URL registedProviderUrl = getRegistedProviderUrl(originInvoker);

        // Get the register attribute to determine whether to register immediately
        boolean register = registedProviderUrl.getParameter("register".true);

        ProviderConsumerRegTable.registerProvider(originInvoker, registryUrl, registedProviderUrl);

        if (register) {
          	/ / register
            register(registryUrl, registedProviderUrl);
            ProviderConsumerRegTable.getProviderWrapper(originInvoker).setReg(true);
        }

        // Subscribe the override data
        // FIXME When the provider subscribes, it will affect the scene : a certain JVM exposes the service and call the same service. Because the subscribed is cached key with the name of the service, it causes the subscription information to cover.
        final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registedProviderUrl);
        final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
        overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
        registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
        //Ensure that a new exporter instance is returned every time export
        return new DestroyableExporter<T>(exporter, originInvoker, overrideSubscribeUrl, registedProviderUrl);
    }
Copy the code

With the exception of doLocalExport, the rest of the code interacts with ZooKeeper, registering urls, subscribing to resources, and so on.

As we know, the dubbo consumer calls the producer’s methods, which by default communicate via Netty, and the default port is 20880, and a NetTY service is started inside the producer. This operation is done in the doLocalExport method.

private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker) {
  		// key = dubbo protocol URL
        String key = getCacheKey(originInvoker);
  		// Get the decorator class for Dubbo's protocol Export
        ExporterChangeableWrapper<T> exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
  		// If the value is empty, the dubbo service is not enabled.
        if (exporter == null) {
            synchronized (bounds) { / / synchronization locks
                exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
                if (exporter == null) { // Double check
                    finalInvoker<? > invokerDelegete =new InvokerDelegete<T>(originInvoker, getProviderUrl(originInvoker));
                  	// Here we start using protocol again, but note that this is invokerDelegete
                    exporter = newExporterChangeableWrapper<T>((Exporter<T>) protocol.export(invokerDelegete), originInvoker); bounds.put(key, exporter); }}}return exporter;
    }
Copy the code

The protocol type in invokerDelegete is Dubbo, not register.

So the execution chain for protocol.export() is as follows: ProtocolListenerWrapper > ProtocolFilterWrapper > QosProtocolWrapper > DubboProtocol

The order of the first three is uncertain. Eventually, DubboProtocol is executed.

DubboProtocol

@Override
    public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
      	// Get the URL of the current service
        URL url = invoker.getUrl();

        / / service for the key, the key = com. The dubbo. Service. User. UserFacadeService: 20880
        String key = serviceKey(url);
        DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
        exporterMap.put(key, exporter);

        //export an stub service for dispatching event
        Boolean isStubSupportEvent = url.getParameter(Constants.STUB_EVENT_KEY, Constants.DEFAULT_STUB_EVENT);
        Boolean isCallbackservice = url.getParameter(Constants.IS_CALLBACK_SERVICE, false);
        if(isStubSupportEvent && ! isCallbackservice) { String stubServiceMethods = url.getParameter(Constants.STUB_EVENT_METHODS_KEY);if (stubServiceMethods == null || stubServiceMethods.length() == 0) {
                if (logger.isWarnEnabled()) {
                    logger.warn(new IllegalStateException("consumer [" + url.getParameter(Constants.INTERFACE_KEY) +
                            "], has set stubproxy support event ,but no stub methods founded.")); }}else{ stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods); }}// Enable the netty service port, which is dubbo 20880, and is ready to receive requests.
        openServer(url);
        optimizeSerialization(url);
        return exporter;
    }
Copy the code

Description:

The URL format of the service is as follows:

Dubbo: / / 192.168.59.3:20880 / com. Dubbo. Service. User. UserFacadeService? anyhost=true&application=dubbo-service&Bind the IP = 192.168.59.3&bind.port=20880&Dubbo = 2.6.2&generic=false&interface=com.dubbo.service.user.UserFacadeService&methods=getUser&pid=20748&side=provider&stub=com.dubbo.service.user.UserFacadeServiceStub&timestamp=1537240877178
Copy the code

Dubbo Protocol Service Exposure flowchart:

Dubbo service exposes collation execution chain:

Zookeeper directory:

Welcome to the public number [sharedCode] committed to mainstream middleware source analysis, you can contact me directly

The blogger’s personal website: www.shared-code.com