background

Service exposure online has a lot of articles, large and complete, we mainly catch details here 😄.

doubt

What does the exposure process do?

Should I start the service or connect to the registry first?

How does a service offline sense the registry?

exposed

We from org. Apache. Dubbo. Config. ServiceConfig# doExportUrls () method

private void doExportUrls() { List<URL> registryURLs = loadRegistries(true); <dubbo:protocol name="dubbo" port="20880"/> / Port ="20881"/> // like this, if there is a PHP client and dubbo client can both support for (ProtocolConfig ProtocolConfig: protocols) { doExportUrlsFor1Protocol(protocolConfig, registryURLs); }}Copy the code

Enter doExportUrlsFor1Protocol(), this method we must take a look, and our code is similar, method length is too long, and loop nesting is very deep.

//org.apache.dubbo.config.ServiceConfig#doExportUrlsFor1Protocol private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) { String name = protocolConfig.getName(); If (stringutils.isempty (name)) {// No protocol is configured, default dubbo name = dubbo; } Map<String, String> map = new HashMap<String, String>(); map.put(SIDE_KEY, PROVIDER_SIDE); AppendRuntimeParameters (map); appendRuntimeParameters(map) appendRuntimeParameters(map); appendParameters(map, metrics); appendParameters(map, application); appendParameters(map, module); appendParameters(map, provider); appendParameters(map, protocolConfig); appendParameters(map, this); If (protocolutils.isgeneric (generic)) {// GENERIC_KEY (GENERIC_KEY); map.put(METHODS_KEY, ANY_VALUE); } else {// Version String Revision = version.getVersion (interfaceClass, Version); if (revision ! = null && revision.length() > 0) { map.put(REVISION_KEY, revision); } String[] methods = Wrapper.getWrapper(interfaceClass).getMethodNames(); if (methods.length == 0) { logger.warn("No method found in service interface " + interfaceClass.getName()); map.put(METHODS_KEY, ANY_VALUE); } else { map.put(METHODS_KEY, StringUtils.join(new HashSet<String>(Arrays.asList(methods)), ",")); }} // Dubbo supports token verification. Only the correct token can be invoked successfully. ConfigUtils.isEmpty(token)) { if (ConfigUtils.isDefault(token)) { map.put(TOKEN_KEY, UUID.randomUUID().toString()); } else { map.put(TOKEN_KEY, token); } } String host = this.findConfigedHosts(protocolConfig, registryURLs, map); Integer port = this.findConfigedPorts(protocolConfig, name, map); URL url = new URL(name, host, port, getContextPath(protocolConfig).map(p -> p + "/" + path).orElse(path), map); if (ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class) .hasExtension(url.getProtocol())) { url = ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class) .getExtension(url.getProtocol()).getConfigurator(url).configure(url); } String scope = url.getParameter(SCOPE_KEY); // don't export when none is configured if (! SCOPE_NONE.equalsIgnoreCase(scope)) { if (! SCOPE_REMOTE.equalsIgnoreCase(scope)) { exportLocal(url); } // export to remote if the config is not local (export to local only when config is local) if (! SCOPE_LOCAL. EqualsIgnoreCase (scope)) {if (CollectionUtils. IsNotEmpty (registryURLs)) {/ / registry also supports multiple, such as services can be exposed to the cluster, Can also expose // service to the center for other lines of business for (URL registryURL: registryURLs) { //if protocol is only injvm ,not register if (LOCAL_PROTOCOL.equalsIgnoreCase(url.getProtocol())) { continue; } url = url.addParameterIfAbsent(DYNAMIC_KEY, registryURL.getParameter(DYNAMIC_KEY)); // loadMonitor configuration URL monitorUrl = loadMonitor(registryURL); if (monitorUrl ! = null) { url = url.addParameterAndEncoded(MONITOR_KEY, monitorUrl.toFullString()); } // Invoke the proxy mode for the specific bean, default is Javassist String proxy = url.getParameter(PROXY_KEY); if (StringUtils.isNotEmpty(proxy)) { registryURL = registryURL.addParameter(PROXY_KEY, proxy); } // invoker <? > invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(EXPORT_KEY, url.toFullString())); DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this); // Expose service my friend <? > exporter = protocol.export(wrapperInvoker); exporters.add(exporter); } } else { Invoker<? > invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, url); DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this); Exporter<? > exporter = protocol.export(wrapperInvoker); exporters.add(exporter); } MetadataReportService MetadataReportService = null; if ((metadataReportService = getMetadataReportService()) ! = null) { metadataReportService.publishProvider(url); } } } this.urls.add(url); }Copy the code

Locally exposed exportLocal(URL)

//org.apache.dubbo.config.ServiceConfig#exportLocal private void exportLocal(URL url) { URL local = URLBuilder.from(url) .setprotocol (LOCAL_PROTOCOL)// Received set the protocol to injvm for selecting the corresponding protocol.sethost (LOCALHOST_VALUE).setPort(0).build(); // Exporter<? > exporter = protocol.export( PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, local)); exporters.add(exporter); } static Protocol protocol = ExtensionLoader .getExtensionLoader(Protocol.class).getAdaptiveExtension(); static ProxyFactory PROXY_FACTORY = ExtensionLoader .getExtensionLoader(ProxyFactory.class).getAdaptiveExtension();Copy the code

The protocol static variable is an adaptive extension point to the protocol interface. A call to protocol.export(Invoker Invoker) will determine which implementation class to go to based on the Invoker information passed in. The value passed in by invoker is

Proxy_factory.getinvoker (ref, (Class) interfaceClass, local), PROXY_FACTORY static variable is also a ProxyFactory extension point, Javassist =JavassistProxyFactory; JavassistProxyFactory =JavassistProxyFactory; (Ignore the various wrappers here)

@SPI("javassist")
public interface ProxyFactory {
    @Adaptive({"proxy"})
    <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) ;
}

Copy the code

Go to the getInvoker implementation of JavassistProxyFactory.

//org.apache.dubbo.rpc.proxy.javassist.JavassistProxyFactory#getInvoker public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL URL) {// Proxy is our real implementation Class HelloServiceImpl@xxx, HelloService Class CLS = proxy.getClass().getName().indexof ('$') < 0? proxy.getClass() : type; // Wrap HelloServiceImpl as a Wrapper class, and the Wrapper object is formally created with the default Javassist final Wrapper Wrapper = wrapper.getwrapper (CLS); // Return an anonymous inner class object, AbstractProxyInvoker return new AbstractProxyInvoker<T>(proxy, type, AbstractProxyInvoker) url) { @Override protected Object doInvoke(T proxy, String methodName, Class<? >[] parameterTypes, Object[] arguments) throws Throwable { return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments); }}; }Copy the code

The above anonymous notation may not be specific enough, but let’s make it more concrete by creating a custom class

//org.apache.dubbo.rpc.proxy.javassist.JavassistProxyFactory#getInvoker public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) { final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type); return new MyProxyInvoker(proxy,type,url,wrapper); } public class MyProxyInvoker extends AbstractProxyInvoker { private Wrapper wrapper; public MyProxyInvoker(Object proxy, Class type, URL url, Wrapper wrapper) { super(proxy, type, url); this.wrapper = wrapper; } @Override protected Object doInvoke(Object proxy, String methodName, Class[] parameterTypes, Object[] arguments) throws Throwable { return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments); }}Copy the code

The JavassistProxyFactory#getInvoker() method returns the MyProxyInvoker object, which we’ll use later to describe our analysis.

Back to the Exporter
exporter = protocol.export(

PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, local)); In, the expression becomes a friend

URL local = URLBuilder.from(url)
        .setProtocol("injvm")
        .setHost(LOCALHOST_VALUE)
        .setPort(0)
        .build();

Copy the code

So the implementation class of protocol.export() is InjvmProtocol

public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
    return new InjvmExporter<T>(invoker, invoker.getUrl().getServiceKey(),
     exporterMap);
}

Copy the code

The method returns InjvmExporter and finally exporters. Add (exporter), Exposing InjvmExporter (which actually wraps a ListenerExporterWrapper) object to the map completes JVM native exposure.

Remote exposed

Let’s look at the difference between remote exposure

// Return MyProxyInvoker <? > invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(EXPORT_KEY, url.toFullString())); / / the difference between local exposure will be packed MyProxyInvoker instance for DelegateProviderMetaDataInvoker DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this); // the url#protocol in wrapperInvoker is not injvm Exporter<? > exporter = protocol.export(wrapperInvoker); exporters.add(exporter);Copy the code

Let’s dubug see what url#protocol is in wrapperInvoker

The Protocol is registry, so the process goes to RegistryProtocol#export (also there will be Wrapper Wrapper here). We debug this method, this method is too rich, here we will only analyze the service exposure

public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException { URL registryUrl = getRegistryUrl(originInvoker); // Get the URL to expose to the registry url providerUrl = getProviderUrl(originInvoker); final URL overrideSubscribeUrl = getSubscribedOverrideUrl(providerUrl); final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker); overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener); providerUrl = overrideUrlWithConfig(providerUrl, overrideSubscribeListener); / / service The following analysis of final ExporterChangeableWrapper < T > exporter = doLocalExport (originInvoker providerUrl); / /... return new DestroyableExporter<>(exporter); }Copy the code
Exposure Service doLocalExport()
//org.apache.dubbo.registry.integration.RegistryProtocol#doLocalExport private <T> ExporterChangeableWrapper<T> DoLocalExport (final Invoker<T> originInvoker, URL providerUrl) {// The service to be exposed generates a unique key, Avoid duplicate String key = getCacheKey(originInvoker); // Wrap invoker again, Then return (ExporterChangeableWrapper < T >) bounds.com puteIfAbsent (key, Origininvokerdelegate = new invokerDelegate <>(originInvoker, providerUrl); / / protocol. The export through various Wrapper will enter to Dubbo. The export return new ExporterChangeableWrapper < > ((Exporter < T >) protocol.export(invokerDelegate), originInvoker); }); } private String getCacheKey(final Invoker<? > originInvoker) { URL providerUrl = getProviderUrl(originInvoker); String key = providerUrl.removeParameters("dynamic", "enabled").toFullString(); return key; }Copy the code

Here originInvoker DelegateProviderMetaDataInvoker (MyProxyInvoker (HelloServiceImpl @ xx)), InvokerDelegate packaging for invokerDelegate again (DelegateProviderMetaDataInvoker (MyProxyInvoker (HelloServiceImpl @ xx))), we continue to debug, The ProtocolFilterWrapper# export

@Override public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException { if (REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) { return protocol.export(invoker); } return protocol.export(buildInvokerChain(invoker, SERVICE_FILTER_KEY, CommonConstants.PROVIDER)); } private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) { Invoker<T> last = invoker; List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group); if (! filters.isEmpty()) { for (int i = filters.size() - 1; i >= 0; i--) { //..... }; } } return new CallbackRegistrationInvoker<>(last, filters); }Copy the code

BuildInvokerChain * () will InvokerDelegate associated multiple Filter Filter, then packing for CallbackRegistrationInvoker object back, we went on to debug, Finally, DubboProtocol#export(), At this point the invoker CallbackRegistrationInvoker (InvokerDelegate (DelegateProviderMetaDataInvoker (MyProxyInvoker (HelloServiceImpl @ x x))))*

//org.apache.dubbo.rpc.protocol.dubbo.DubboProtocol#export public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException { URL url = invoker.getUrl(); / / generate the service key = com. Poizon. Study. API. Service. HelloService: 20880, has nothing to do with method of String key = the serviceKey (url); / / will be packed CallbackRegistrationInvoker DubboExporter, then stored in the / / the map in the map is the key, DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap); exporterMap.put(key, exporter); //export an stub service for dispatching event Boolean isStubSupportEvent = url.getParameter(STUB_EVENT_KEY, DEFAULT_STUB_EVENT); Boolean isCallbackservice = url.getParameter(IS_CALLBACK_SERVICE, false); if (isStubSupportEvent && ! isCallbackservice) { String stubServiceMethods = url.getParameter(STUB_EVENT_METHODS_KEY); if (stubServiceMethods == null || stubServiceMethods.length() == 0) { if (logger.isWarnEnabled()) { logger.warn(new IllegalStateException("consumer [" + url.getParameter(INTERFACE_KEY) + "], has set stubproxy support event ,but no stub methods founded.")); } } else { stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods); OpenServer (url) on port 20880; Hessan2 optimizeSerialization(URL); return exporter; } //org.apache.dubbo.rpc.protocol.dubbo.DubboProtocol#openServer private void openServer(URL url) { //..... CreateServer () Creates the service ServerMap. put(key, createServer(URL)); } //org.apache.dubbo.rpc.protocol.dubbo.DubboProtocol#createServer private ExchangeServer createServer(URL url) { ExchangeServer server; try { server = Exchangers.bind(url, requestHandler); } catch (RemotingException e) { throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e); } return server; } //org.apache.dubbo.remoting.exchange.support.header.HeaderExchanger#bind public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException { return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler)))); } //org.... remoting.Transporters#bind(URL, ChannelHandler...) public static Server bind(URL url, ChannelHandler... handlers) throws RemotingException { ChannelHandler handler; if (handlers.length == 1) { handler = handlers[0]; } else { handler = new ChannelHandlerDispatcher(handlers); }// Netty4 is selected by default to implement return getTransporter().bind(url, handler); } //org.apache.dubbo.remoting.transport.netty4.NettyTransporter#connect public Client connect(URL url, ChannelHandler listener) throws RemotingException { return new NettyClient(url, listener); } //org.apache.dubbo.remoting.transport.AbstractClient#AbstractClient public AbstractClient(URL url, ChannelHandler handler) throws RemotingException { super(url, handler); needReconnect = url.getParameter(Constants.SEND_RECONNECT_KEY, false); try { doOpen(); //org.apache.dubbo.remoting.transport.netty4.NettyClient#doOpen protected void doOpen() throws Throwable { final NettyClientHandler nettyClientHandler = new NettyClientHandler(getUrl(), this); bootstrap = new Bootstrap(); bootstrap.group(nioEventLoopGroup) .option(ChannelOption.SO_KEEPALIVE, true) .option(ChannelOption.TCP_NODELAY, true) .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) .channel(NioSocketChannel.class); if (getConnectTimeout() < 3000) { bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000); } else { bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, getConnectTimeout()); } bootstrap.handler(new ChannelInitializer() { @Override protected void initChannel(Channel ch) throws Exception {Copy the code

There are many familiar configurations, such as IdleStateHandler, the heartbeat implementation, and the default heartbeat time urlutils.getheartBeat (getUrl()). There’s also Netty’s custom handler, nettyClientHandler(yes, this handler handles dubbo consumer requests)

conclusion

To sum up, we went all the way to the end of the socket boot, and finally put DubboExporter into map, The last layer of packing is DestroyableExporter(ExporterChangeableWrapper(ListenerExporterWrapper(DubboExporter(CallbackRegistrationInvoker(InvokerD Elegate (DelegateProviderMetaDataInvoker (MyProxyInvoker HelloServiceImpl @ (xx)))))))); The Wrapper class is designed to extend small functions

Features such as registries and Wrappers will be examined later.