Dubbo is one of the most widely used RPC frameworks. Many companies adopt it because its proprietary protocol makes remote calls more efficient than the HTTP-based remote calls of Spring Cloud.

Spring container integration

Server startup entry point: ServiceBean

Let’s first look at the inheritance structure of the entry class, ServiceBean.

The first thing that catches the eye is the stack of interfaces it implements:

implements InitializingBean,
    DisposableBean, 
    ApplicationContextAware, 
    ApplicationListener, 
    BeanNameAware

This should immediately bring the life cycle of Spring beans to mind. Let’s look at the relevant entry points:

  • BeanNameAware

Invoked in the bean initialization phase, after the bean has been instantiated and its properties have been populated.

  • ApplicationContextAware

Also invoked in the bean initialization phase. It lets the bean keep its own reference to the Spring context.

  • InitializingBean

Executed during the bean initialization phase, after the xxxAware callbacks and after the pre-initialization step of the bean post-processors (BeanPostProcessor#postProcessBeforeInitialization).

  • DisposableBean

In the Bean destruction phase, some cleanup of the Bean is performed.

Here we only need to focus on the ApplicationContextAware#setApplicationContext and InitializingBean#afterPropertiesSet methods.

Spring context hook: setApplicationContext

  1. Save the Spring context reference in ServiceBean; it is used later.
  2. Associate the Spring context with Dubbo’s own container.
SpringExtensionFactory.addApplicationContext(applicationContext);
  3. Register itself as a Spring container event listener through reflection.
Method method = applicationContext.getClass().getMethod("addApplicationListener", new Class<?>[]{ApplicationListener.class}); // compatible with Spring 2.0.1
method.invoke(applicationContext, new Object[] {this});
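
The reflective registration in the last step can be tried out without Spring. The following is a minimal sketch in plain Java: DemoListener and DemoContext are hypothetical stand-ins for ApplicationListener and the application context, and the boolean return value is an assumption used here to show how a caller could learn whether the context supports listener registration.

```java
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for Spring's ApplicationListener.
interface DemoListener {
    void onEvent(String event);
}

// Hypothetical stand-in for an application context that supports listeners.
class DemoContext {
    final List<DemoListener> listeners = new ArrayList<>();

    public void addApplicationListener(DemoListener listener) {
        listeners.add(listener);
    }
}

class ReflectiveRegistration {
    /** Registers the listener reflectively; returns false if the context lacks the method. */
    static boolean register(Object context, DemoListener listener) {
        try {
            Method method = context.getClass()
                    .getMethod("addApplicationListener", DemoListener.class);
            method.invoke(context, listener);
            return true;
        } catch (ReflectiveOperationException e) {
            // A context without the method: the caller has to fall back to another path.
            return false;
        }
    }

    public static void main(String[] args) {
        DemoContext context = new DemoContext();
        System.out.println(register(context, event -> { }));      // prints "true"
        System.out.println(register(new Object(), event -> { })); // prints "false"
    }
}
```

Looking the method up by name instead of calling it directly is what lets the same code run against context classes that do not declare the method at all.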

The actual service exposure is later triggered by listening for the ContextRefreshedEvent event.

Configuration parsing: afterPropertiesSet

Configuration resolution and initialization related to service exposure are concentrated in the afterPropertiesSet method. Let’s take a look at the following steps:

Provider Configuration resolution

Default values for service providers. The corresponding configuration class is org.apache.dubbo.config.ProviderConfig. It supplies the default values for the dubbo:service and dubbo:protocol tags.

Map<String, ProviderConfig> providerConfigMap = applicationContext == null ?
    null :
    BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext,
        ProviderConfig.class, false, false);
...

Application Configuration Resolution

Application information configuration. The corresponding configuration class is org.apache.dubbo.config.ApplicationConfig.

Map<String, ApplicationConfig> applicationConfigMap = applicationContext == null ?
    null :
    BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext,
        ApplicationConfig.class, false, false);
...

Module configuration resolution

Module information configuration. The corresponding configuration class is org.apache.dubbo.config.ModuleConfig.

Map<String, ModuleConfig> moduleConfigMap = applicationContext == null ?
    null :
    BeanFactoryUtils.beansOfTypeIncludingAncestors(applicationContext,
        ModuleConfig.class, false, false);
...

Monitor Configuration Resolution

Monitoring center configuration. The corresponding configuration class is org.apache.dubbo.config.MonitorConfig.

Protocol Configuration resolution

Service provider protocol configuration. The corresponding configuration class is org.apache.dubbo.config.ProtocolConfig. To support multiple protocols, declare multiple dubbo:protocol tags and select the protocol for each dubbo:service through its protocol attribute.

Set service path

If no path has been configured, the bean name is used as the service path, provided that it starts with the interface name.

if (getPath() == null || getPath().length() == 0) {
    if (beanName != null && beanName.length() > 0
            && getInterface() != null && getInterface().length() > 0
            && beanName.startsWith(getInterface())) {
        setPath(beanName);
    }
}

Whether to delay exposing services

If isDelay() returns false for the service, the service exposure method #export is invoked right here, at this stage. Otherwise, exposure is deferred until the ContextRefreshedEvent arrives.

if (!isDelay()) {
    export();
}

Triggering service exposure: onApplicationEvent

During the initialization phase of the Spring bean, ServiceBean has already registered itself as a listener for Spring events through reflection. Service exposure is triggered when the ContextRefreshedEvent event is received.

    public void onApplicationEvent(ApplicationEvent event) {
        if (ContextRefreshedEvent.class.getName().equals(event.getClass().getName())) {
            if (isDelay() && !isExported() && !isUnexported()) {
                ...
                export();
            }
        }
    }
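
The interplay between the two trigger points (afterPropertiesSet and onApplicationEvent) can be modeled as a small state machine. This is only a sketch in plain Java: ExportGate is a hypothetical class, the delay flag stands for whatever isDelay() returns, and the event is matched by a plain string instead of a real Spring event.

```java
// Hypothetical model of the two export trigger points; this is not Dubbo's real class.
class ExportGate {
    private final boolean delay; // stands for the result of isDelay()
    private boolean exported;
    int exportCalls;

    ExportGate(boolean delay) {
        this.delay = delay;
    }

    // Mirrors the check at the end of afterPropertiesSet.
    void afterPropertiesSet() {
        if (!delay) {
            export();
        }
    }

    // Mirrors onApplicationEvent: only a refresh event on a not-yet-exported bean exports.
    void onApplicationEvent(String eventName) {
        if ("ContextRefreshedEvent".equals(eventName) && delay && !exported) {
            export();
        }
    }

    boolean isExported() {
        return exported;
    }

    private void export() {
        exported = true;
        exportCalls++;
    }

    public static void main(String[] args) {
        ExportGate gate = new ExportGate(true);
        gate.afterPropertiesSet();
        System.out.println(gate.isExported()); // prints "false"
        gate.onApplicationEvent("ContextRefreshedEvent");
        System.out.println(gate.isExported()); // prints "true"
    }
}
```

Whichever path is taken, the two guards together ensure that export runs at most once per bean.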

Configuration retrieval and check phase

Check items include but are not limited to:

  • Whether the configured methods exist on the interface
  • Whether application:name is configured
  • Whether the registry address registry:address is configured
  • The service exposure protocol; the default is dubbo
  • The stub and mock configuration; this is mainly a check of the interface implementation
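
As an illustration of the first check in the list, here is a minimal sketch in plain Java that verifies configured method names against an interface. MethodConfigCheck, GreetingService and the exception messages are hypothetical; Dubbo’s own check may differ in detail.

```java
import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.List;

class MethodConfigCheck {
    // Hypothetical service interface used only for the demonstration.
    interface GreetingService {
        String greet(String name);
    }

    /** Throws IllegalStateException if a configured method name is not declared on the interface. */
    static void checkMethods(Class<?> interfaceClass, List<String> configuredMethods) {
        if (!interfaceClass.isInterface()) {
            throw new IllegalStateException(interfaceClass.getName() + " is not an interface");
        }
        for (String name : configuredMethods) {
            boolean found = false;
            for (Method method : interfaceClass.getMethods()) {
                if (method.getName().equals(name)) {
                    found = true;
                    break;
                }
            }
            if (!found) {
                throw new IllegalStateException(
                        "The interface " + interfaceClass.getName() + " has no method " + name);
            }
        }
    }

    public static void main(String[] args) {
        checkMethods(GreetingService.class, Arrays.asList("greet")); // passes silently
        try {
            checkMethods(GreetingService.class, Arrays.asList("missing"));
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Failing fast at this point means a misconfigured method name is reported at startup instead of at the first remote call.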

Assembling the exposed service URL

Based on the server configuration, the information about the service to be exposed is assembled into a URL. Common keys are:

side=provider&version=?&timestamp=?&pid=?&methods=?&name=?&port=?...

In addition, if monitor is configured, parameters related to monitor are added to the URL.

URL monitorUrl = loadMonitor(registryURL);
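
To make the URL form concrete, the sketch below assembles such a URL from a parameter map in plain Java. ExportUrlSketch and buildUrl are hypothetical names, the host, port and interface are example values, and URL encoding is deliberately left out, so this is not Dubbo’s URL class.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

class ExportUrlSketch {
    /** Joins the parameters as key=value pairs after the protocol, address and path. */
    static String buildUrl(String protocol, String host, int port, String path,
                           Map<String, String> parameters) {
        StringJoiner query = new StringJoiner("&");
        for (Map.Entry<String, String> entry : parameters.entrySet()) {
            query.add(entry.getKey() + "=" + entry.getValue());
        }
        return protocol + "://" + host + ":" + port + "/" + path + "?" + query;
    }

    public static void main(String[] args) {
        // LinkedHashMap keeps insertion order, so the output is deterministic.
        Map<String, String> parameters = new LinkedHashMap<>();
        parameters.put("side", "provider");
        parameters.put("version", "1.0.0");
        parameters.put("methods", "sayHello");
        System.out.println(buildUrl("dubbo", "127.0.0.1", 20880, "com.example.DemoService", parameters));
        // prints "dubbo://127.0.0.1:20880/com.example.DemoService?side=provider&version=1.0.0&methods=sayHello"
    }
}
```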

Create/get invoker

Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));

Dubbo has three ProxyFactory implementations that create Invokers: JavassistProxyFactory, StubProxyFactoryWrapper, and the plain JdkProxyFactory.

An Invoker is essentially a layer of encapsulation around the actual method call. Every form of method call is wrapped uniformly as an Invoker.

JdkProxyFactory
---
    public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
        return (T) Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(), interfaces, new InvokerInvocationHandler(invoker));
    }

    public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
        return new AbstractProxyInvoker<T>(proxy, type, url) {
            @Override
            protected Object doInvoke(T proxy, String methodName, Class<?>[] parameterTypes, Object[] arguments) throws Throwable {
                Method method = proxy.getClass().getMethod(methodName, parameterTypes);
                return method.invoke(proxy, arguments);
            }
        };
    }
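
The two directions of JdkProxyFactory can be reproduced with the JDK alone. In the sketch below, SimpleInvoker is a hypothetical, simplified interface (Dubbo’s Invoker takes an Invocation object and a URL), and GreetingService and its implementation are example types; the wrapping of reflection errors in IllegalStateException is also an assumption of this sketch.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;

class InvokerSketch {
    // Hypothetical, simplified Invoker: one uniform entry point for any method call.
    interface SimpleInvoker {
        Object invoke(String methodName, Class<?>[] parameterTypes, Object[] arguments);
    }

    interface GreetingService {
        String greet(String name);
    }

    public static class GreetingServiceImpl implements GreetingService {
        @Override
        public String greet(String name) {
            return "hello " + name;
        }
    }

    // Provider side: wrap a concrete object so it can be called by method name.
    static SimpleInvoker getInvoker(Object target) {
        return (methodName, parameterTypes, arguments) -> {
            try {
                Method method = target.getClass().getMethod(methodName, parameterTypes);
                return method.invoke(target, arguments);
            } catch (ReflectiveOperationException e) {
                throw new IllegalStateException("Failed to invoke " + methodName, e);
            }
        };
    }

    // Consumer side: expose an invoker behind a typed interface via a JDK dynamic proxy.
    @SuppressWarnings("unchecked")
    static <T> T getProxy(SimpleInvoker invoker, Class<T> type) {
        InvocationHandler handler = (proxy, method, args) ->
                invoker.invoke(method.getName(), method.getParameterTypes(), args);
        return (T) Proxy.newProxyInstance(type.getClassLoader(), new Class<?>[]{type}, handler);
    }

    public static void main(String[] args) {
        SimpleInvoker invoker = getInvoker(new GreetingServiceImpl());
        GreetingService proxy = getProxy(invoker, GreetingService.class);
        System.out.println(proxy.greet("dubbo")); // prints "hello dubbo"
    }
}
```

In a real RPC call, the invoker behind the consumer proxy would send the call over the network instead of invoking a local object, which is exactly why a uniform Invoker abstraction is useful.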

Dubbo related

Service exposure

Protocol layer – Protocol

Dubbo actually supports multiple protocols, but let’s just look at the implementation of the Dubbo protocol here.

String key = url.getAddress();
...
ExchangeServer server = serverMap.get(key);
if (server == null) {
    serverMap.put(key, createServer(url));
} else {
    // Server supports reset, which can be used with override
    server.reset(url);
}

As you can see from the above code, createServer is executed only once per address. That is, all services exposed on the same address share a single server instance, and therefore the same listening port, rather than each service opening its own.
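
The create-once-per-address behaviour can be sketched in plain Java as follows. DemoServer is a hypothetical stand-in for ExchangeServer, and the counters exist only to make the behaviour observable.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class ServerRegistrySketch {
    // Hypothetical stand-in for ExchangeServer.
    static class DemoServer {
        final String address;
        int resetCount;

        DemoServer(String address) {
            this.address = address;
        }

        void reset() {
            resetCount++;
        }
    }

    private final Map<String, DemoServer> serverMap = new ConcurrentHashMap<>();
    int created;

    /** Creates a server for a new address, or resets the existing one. */
    DemoServer openServer(String address) {
        DemoServer server = serverMap.get(address);
        if (server == null) {
            server = new DemoServer(address);
            serverMap.put(address, server);
            created++;
        } else {
            server.reset();
        }
        return server;
    }

    public static void main(String[] args) {
        ServerRegistrySketch registry = new ServerRegistrySketch();
        DemoServer first = registry.openServer("127.0.0.1:20880");
        DemoServer second = registry.openServer("127.0.0.1:20880");
        System.out.println(first == second);  // prints "true"
        System.out.println(registry.created); // prints "1"
    }
}
```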

Before actually starting the service, add a series of service parameters:

  • channel.readonly.sent

    Enabled by default: a readonly event is sent when the server shuts down.

  • heartbeat

    The heartbeat interval. The default is 60 × 1000 ms.

  • codec

    The codec used by the service. The default value is dubbo.

Information exchange layer – Exchanger

The call server = Exchangers.bind(url, requestHandler); is what actually starts the server.

At run time, the Exchanger implementation is selected according to the configuration in the URL.

The server implementation class provided by Dubbo is HeaderExchangeServer.

It uses a ScheduledExecutorService to implement a periodic heartbeat mechanism.

 heatbeatTimer = scheduled.scheduleWithFixedDelay(
     new HeartBeatTask(new HeartBeatTask.ChannelProvider() {
         public Collection<Channel> getChannels() {
             return Collections.unmodifiableCollection(
                     HeaderExchangeServer.this.getChannels());
         }
     }, heartbeat, heartbeatTimeout),
     heartbeat, heartbeat, TimeUnit.MILLISECONDS);
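
What a periodic heartbeat task might do on each run can be sketched with the JDK scheduler. Everything inside the task is an assumption for illustration: DemoChannel is hypothetical, and the rule (send a heartbeat when a channel has been idle longer than heartbeat, close it when idle longer than heartbeatTimeout) is a plausible reading, not a copy of HeartBeatTask.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

class HeartbeatSketch {
    // Hypothetical channel that only tracks what the sketch needs.
    static class DemoChannel {
        volatile long lastReadMillis;
        volatile int heartbeatsSent;
        volatile boolean closed;

        DemoChannel(long lastReadMillis) {
            this.lastReadMillis = lastReadMillis;
        }
    }

    /** One heartbeat pass: ping idle channels, close channels idle past the timeout. */
    static void check(List<DemoChannel> channels, long now, long heartbeat, long heartbeatTimeout) {
        for (DemoChannel channel : channels) {
            if (channel.closed) {
                continue;
            }
            long idle = now - channel.lastReadMillis;
            if (idle > heartbeatTimeout) {
                channel.closed = true;
            } else if (idle > heartbeat) {
                channel.heartbeatsSent++;
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        List<DemoChannel> channels = new CopyOnWriteArrayList<>();
        channels.add(new DemoChannel(System.currentTimeMillis()));
        long heartbeat = 50;
        long heartbeatTimeout = 150;
        ScheduledExecutorService scheduled = Executors.newScheduledThreadPool(1);
        // Same scheduling shape as in the article: initial delay and period both equal heartbeat.
        scheduled.scheduleWithFixedDelay(
                () -> check(channels, System.currentTimeMillis(), heartbeat, heartbeatTimeout),
                heartbeat, heartbeat, TimeUnit.MILLISECONDS);
        Thread.sleep(400);
        scheduled.shutdownNow();
        scheduled.awaitTermination(1, TimeUnit.SECONDS);
        System.out.println("closed=" + channels.get(0).closed);
    }
}
```

Passing the current time into check keeps the rule itself independent of the scheduler, so it can be exercised without waiting for real time to pass.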

Finally, Transporters.bind is called to enter the actual transport layer.

return new HeaderExchangeServer(Transporters.bind(url, 
    new DecodeHandler(new HeaderExchangeHandler(handler))));

Network transport layer – Transporter

Let’s take the NettyTransporter as an example.

Take a look at the code for starting a service using Netty:

        NettyHelper.setNettyLoggerFactory();
        ExecutorService boss = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerBoss", true));
        ExecutorService worker = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerWorker", true));
        ChannelFactory channelFactory = new NioServerSocketChannelFactory(boss, worker, getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS));
        bootstrap = new ServerBootstrap(channelFactory);
        
        final NettyHandler nettyHandler = new NettyHandler(getUrl(), this);
        channels = nettyHandler.getChannels();
        // https://issues.jboss.org/browse/NETTY-365
        // https://issues.jboss.org/browse/NETTY-379
        // final Timer timer = new HashedWheelTimer(new NamedThreadFactory("NettyIdleTimer", true));
        bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
            public ChannelPipeline getPipeline() {
                NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec() ,getUrl(), NettyServer.this);
                ChannelPipeline pipeline = Channels.pipeline();
                /*int idleTimeout = getIdleTimeout();
                if (idleTimeout > 10000) {
                    pipeline.addLast("timer", new IdleStateHandler(timer, idleTimeout / 1000, 0, 0));
                }*/
                pipeline.addLast("decoder", adapter.getDecoder());
                pipeline.addLast("encoder", adapter.getEncoder());
                pipeline.addLast("handler", nettyHandler);
                return pipeline;
            }
        });
        // bind
        channel = bootstrap.bind(getBindAddress());

Conclusion

Briefly describe Dubbo’s service exposure process

First, the Dubbo server side starts from ServiceBean. To integrate with Spring, it hooks into the Spring bean initialization phase at two points:

  1. ApplicationContextAware obtains the Spring context early and registers ServiceBean as a listener for Spring events.
  2. InitializingBean performs the Dubbo configuration parsing and the wrapping of Invoker objects. If there are errors in the configuration file, they usually surface here.

Then, because the listener has been registered, Dubbo service exposure is formally triggered when Spring publishes a ContextRefreshedEvent.

The main logic of service exposure is implemented at dubbo’s protocol layer, information exchange layer and network transport layer.

First, service-related parameters, such as heartbeat, encoding, etc. are prepared by the protocol layer. It then goes to the information exchange layer to encapsulate the heartbeat mechanism, and finally to the network transport layer to enable NettyServer.

In NettyServer, the requestHandler is installed in the pipeline wrapped in a chain of handlers. The chain is:

DecodeHandler -> HeaderExchangeHandler -> DubboProtocol$ExchangeHandlerAdapter

  • DecodeHandler decodes the raw InputStream.

  • HeaderExchangeHandler routes messages based on message type, such as request/response/echo.

  • DubboProtocol$ExchangeHandlerAdapter looks up the Invoker that matches the message (an Invocation) and executes the call.
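
The chain is an instance of the decorator pattern: each handler does its own work and then delegates to the handler it wraps. The sketch below shows only that delegation order in plain Java. The Handler interface and the trace list are hypothetical, and the handlers here merely record their names instead of decoding or routing anything.

```java
import java.util.ArrayList;
import java.util.List;

class HandlerChainSketch {
    // Hypothetical, simplified channel handler.
    interface Handler {
        void received(Object message, List<String> trace);
    }

    // Wraps the next handler and records its own name before delegating.
    static Handler wrap(String name, Handler next) {
        return (message, trace) -> {
            trace.add(name);
            next.received(message, trace);
        };
    }

    /** Sends a message through the chain and returns the order in which handlers ran. */
    static List<String> dispatch(Object message) {
        Handler business = (m, trace) -> trace.add("ExchangeHandlerAdapter");
        Handler chain = wrap("DecodeHandler", wrap("HeaderExchangeHandler", business));
        List<String> trace = new ArrayList<>();
        chain.received(message, trace);
        return trace;
    }

    public static void main(String[] args) {
        System.out.println(dispatch("request"));
        // prints "[DecodeHandler, HeaderExchangeHandler, ExchangeHandlerAdapter]"
    }
}
```

The outermost wrapper runs first, which is why the nesting order in the constructor calls reads the same as the processing order of an incoming message.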

Extension

When NettyServer receives data, how is it converted into a method call?

Let’s take a look at the ChannelPipeline that dubbo created for us.

ChannelPipeline pipeline = Channels.pipeline();
...
pipeline.addLast("handler", nettyHandler);

The key is in nettyHandler.

Recall the code that calls the network transport interface for binding at the information exchange layer:

new HeaderExchangeServer(Transporters.bind(url, 
    new DecodeHandler(
        new HeaderExchangeHandler(handler))));

DecodeHandler -> HeaderExchangeHandler -> DubboProtocol$ExchangeHandlerAdapter

The first step is deserialization by DecodeHandler:

if (message instanceof Request) {
    decode(((Request)message).getData());
}

if (message instanceof Response) {
    decode( ((Response)message).getResult());
}

handler.received(channel, message);

Then, hand over to the HeaderExchangeHandler to do the request/response routing of the method.

            if (message instanceof Request) {
                // handle request.
                Request request = (Request) message;
                if (request.isEvent()) {
                    handlerEvent(channel, request);
                } else {
                    if (request.isTwoWay()) {
                        Response response = handleRequest(exchangeChannel, request);
                        channel.send(response);
                    } else {
                        handler.received(exchangeChannel, request.getData());
                    }
                }
            } else if (message instanceof Response) {
                handleResponse(channel, (Response) message);
            } else if (message instanceof String) {
                if (isClientSide(channel)) {
                    Exception e = new Exception("Dubbo client can not supported string message: " + message + " in channel: " + channel + ", url: " + channel.getUrl());
                    logger.error(e.getMessage(), e);
                } else {
                    String echo = handler.telnet(channel, (String) message);
                    if (echo != null && echo.length() > 0) {
                        channel.send(echo);
                    }
                }
            } else {
                handler.received(exchangeChannel, message);
            }

Finally, it goes into an ExchangeHandlerAdapter inner class implemented in DubboProtocol:

public Object reply(ExchangeChannel channel, Object message) throws RemotingException {
    if (message instanceof Invocation) {
        ...
        Invocation inv = (Invocation) message;
        Invoker<?> invoker = getInvoker(channel, inv);
        ...
        return invoker.invoke(inv);
    }
    ...
}

As we’ve seen above, invoker is dubbo’s encapsulation of the actual method call.

So in this case, it’s the process of getting the invoker based on serviceKey and executing the call.

String serviceKey = serviceKey(
    port, 
    path, 
    inv.getAttachments().get(Constants.VERSION_KEY), 
    inv.getAttachments().get(Constants.GROUP_KEY));
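
The lookup idea can be illustrated with a plain map keyed by a composed string. In the sketch below, the key layout group/path:version:port and the exporterMap name are assumptions made for illustration; only the four inputs (port, path, version, group) come from the code above.

```java
import java.util.HashMap;
import java.util.Map;

class ServiceKeySketch {
    /** Composes a lookup key; the group/path:version:port layout is an assumption of this sketch. */
    static String serviceKey(int port, String path, String version, String group) {
        StringBuilder key = new StringBuilder();
        if (group != null && !group.isEmpty()) {
            key.append(group).append('/');
        }
        key.append(path);
        if (version != null && !version.isEmpty()) {
            key.append(':').append(version);
        }
        key.append(':').append(port);
        return key.toString();
    }

    public static void main(String[] args) {
        // Hypothetical registry filled at export time; the values stand in for invokers.
        Map<String, String> exporterMap = new HashMap<>();
        exporterMap.put(serviceKey(20880, "com.example.DemoService", "1.0.0", null),
                "invoker for DemoService");

        // At request time the same four values rebuild the key and find the invoker.
        String key = serviceKey(20880, "com.example.DemoService", "1.0.0", null);
        System.out.println(key);                  // prints "com.example.DemoService:1.0.0:20880"
        System.out.println(exporterMap.get(key)); // prints "invoker for DemoService"
    }
}
```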

Exchange – Synchronous/asynchronous conversion in the information exchange layer

When we make a method call using the RPC framework, only a synchronous method call occurs at the caller level, just like a local method call.

Inside the RPC framework, however, the request is sent asynchronously. When the remote method’s return value arrives asynchronously, it is handed back to the caller as if the call had been synchronous.

The code is located in DubboInvoker#doInvoke.

protected Result doInvoke(final Invocation invocation) throws Throwable {
    ...
    if (isOneway) {
         ...
    } else if (isAsync) {
         ...
    } else {
         RpcContext.getContext().setFuture(null);
         return (Result) currentClient.request(inv, timeout).get();
    }
    ...
}

In currentClient.request, the call is sent asynchronously to the data transport layer; the caller thread is then blocked by the call to ResponseFuture#get.

while (!isDone()) {
    done.await(timeout, TimeUnit.MILLISECONDS);
    if (isDone() || System.currentTimeMillis() - start > timeout) {
        break;
    }
}

Later, upon receiving data from the server, the caller thread is awakened by an active call to done.signal() to complete the asynchronous to synchronous transition.

private void doReceived(Response res) {
    ...
    response = res;
    if (done != null) {
        done.signal();
    }
    ...
}
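
The await/signal handshake can be reproduced with a ReentrantLock and a Condition from the JDK. The following is a minimal sketch, not Dubbo’s future class: BlockingFutureSketch is a hypothetical name, and reporting a timeout or interruption as IllegalStateException is a simplification chosen for this example.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

class BlockingFutureSketch {
    private final Lock lock = new ReentrantLock();
    private final Condition done = lock.newCondition();
    private volatile Object response;

    boolean isDone() {
        return response != null;
    }

    /** Blocks the caller until a response arrives or the timeout elapses. */
    Object get(long timeoutMillis) {
        long start = System.currentTimeMillis();
        lock.lock();
        try {
            while (!isDone()) {
                long remaining = timeoutMillis - (System.currentTimeMillis() - start);
                if (remaining <= 0) {
                    break;
                }
                done.await(remaining, TimeUnit.MILLISECONDS);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for the response", e);
        } finally {
            lock.unlock();
        }
        if (!isDone()) {
            throw new IllegalStateException("No response within " + timeoutMillis + " ms");
        }
        return response;
    }

    /** Called from the I/O thread when the response arrives; wakes the waiting caller. */
    void received(Object res) {
        lock.lock();
        try {
            response = res;
            done.signal();
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) {
        BlockingFutureSketch future = new BlockingFutureSketch();
        new Thread(() -> future.received("pong")).start(); // simulates the network thread
        System.out.println(future.get(2000));              // prints "pong"
    }
}
```

The loop re-checks isDone() after every wake-up, so a spurious wake-up or a response that arrived before get was called is handled the same way as a normal signal.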