The code debugged here is the dubbo-demo-xml module from the Dubbo project on GitHub, using the default dubbo communication protocol. Building on How to register Dubbo Consumer with Spring and Spring: Dubbo Provider, this article looks at the entire process of a call made from the consumer side. The call chain diagram from the Dubbo's design principles article on the Dubbo website is posted here first:

XML configuration for consumer in demo:

<!-- xmlns:xsi is the namespace of the xsi tags -->
<!-- xmlns:dubbo is the namespace of the dubbo tags -->
<!-- xmlns is the default namespace of the current XML file -->
<!-- xsi:schemaLocation maps each namespace to its schema, which is used for format validation -->
<beans xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:dubbo="http://dubbo.apache.org/schema/dubbo"
       xmlns="http://www.springframework.org/schema/beans"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.3.xsd http://dubbo.apache.org/schema/dubbo http://dubbo.apache.org/schema/dubbo/dubbo.xsd">

    <dubbo:application name="demo-consumer"/>

    <dubbo:registry address="zookeeper://127.0.0.1:2181" timeout="6000"/>

    <dubbo:protocol name="dubbo"/>

    <dubbo:reference id="demoService" check="false" interface="org.apache.dubbo.demo.DemoService" timeout="6000"/>

</beans>

Call entry code:

public class Application {
    /**
     * In order to make sure multicast registry works, need to specify '-Djava.net.preferIPv4Stack=true' before
     * launch the application
     */
    public static void main(String[] args) throws Exception {
        ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("spring/dubbo-consumer.xml");
        context.start();
        DemoService demoService = context.getBean("demoService", DemoService.class);
        for (int i = 0; i < 1000; i++) {
            String hello = demoService.sayHello("world");
            System.out.println("result: " + hello);
            Thread.sleep(1000L);
        }
    }
}

As described in How to register Dubbo Consumer with Spring, the dubbo:reference configuration is resolved by the Spring container into a FactoryBean object. Dependency injection, or a direct call to the getBean method, triggers this FactoryBean's getObject() method, which returns the real proxy class. The sequence diagram of the getObject() method, combined with Dubbo's official complete layered diagram, is posted first:

The parts shown in blue use the wrapper pattern. The core DubboInvoker is wrapped in layers, and a call passes through the wrapper classes one layer at a time, starting from the outermost one.
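The layered wrapping can be illustrated with a minimal sketch. The names SimpleInvoker, CoreInvoker, TracingWrapper, and the layer labels "cluster" and "filter" are hypothetical stand-ins chosen for illustration; they are not Dubbo classes, and the real chain has more layers.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-in for an invoker interface.
interface SimpleInvoker {
    String invoke(String request);
}

// The innermost invoker, playing the role the core invoker plays in the text.
class CoreInvoker implements SimpleInvoker {
    private final List<String> trace;

    CoreInvoker(List<String> trace) {
        this.trace = trace;
    }

    @Override
    public String invoke(String request) {
        trace.add("core");
        return "Hello " + request;
    }
}

// A wrapper adds one feature (here: recording its name) and then delegates inward.
class TracingWrapper implements SimpleInvoker {
    private final String name;
    private final SimpleInvoker delegate;
    private final List<String> trace;

    TracingWrapper(String name, SimpleInvoker delegate, List<String> trace) {
        this.name = name;
        this.delegate = delegate;
        this.trace = trace;
    }

    @Override
    public String invoke(String request) {
        trace.add(name);
        return delegate.invoke(request);
    }
}

class WrapperChainDemo {
    // Builds cluster -> filter -> core and returns the order in which the layers ran.
    static List<String> callThroughChain(String request) {
        List<String> trace = new ArrayList<>();
        SimpleInvoker chain = new TracingWrapper("cluster",
                new TracingWrapper("filter", new CoreInvoker(trace), trace), trace);
        chain.invoke(request);
        return trace;
    }
}
```

Calling WrapperChainDemo.callThroughChain("world") records the layers from the outermost wrapper to the core, which is the call order the paragraph describes.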

There are three main steps:

  1. The consumer sends the request
  2. The Provider parses and processes the request, sending the result to the Consumer
  3. The consumer receives the result of the request

1. The consumer sends the request

Post the overall sequence diagram first:

The demoService in demoService.sayHello("world") is a proxy object. The generated proxy class looks like this:

public class proxy0
implements ClassGenerator.DC, Destroyable, EchoService, DemoService {
    public static Method[] methods;
    private InvocationHandler handler;

    @Override
    public Object $echo(Object object) {
        Object[] objectArray = new Object[]{object};
        Object object2 = this.handler.invoke(this, methods[0], objectArray);
        return object2;
    }

    public CompletableFuture sayHelloAsync(String string) {
        Object[] objectArray = new Object[]{string};
        Object object = this.handler.invoke(this, methods[1], objectArray);
        return (CompletableFuture)object;
    }
    
    // Invoke the actual InvocationHandler
    public String sayHello(String string) {
        Object[] objectArray = new Object[]{string};
        Object object = this.handler.invoke(this, methods[2], objectArray);
        return (String)object;
    }

    @Override
    public void $destroy() {
        Object[] objectArray = new Object[]{};
        Object object = this.handler.invoke(this, methods[3], objectArray);
    }

    public proxy0() {
    }

    public proxy0(InvocationHandler invocationHandler) {
        this.handler = invocationHandler;
    }
}
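The class above forwards every method to an InvocationHandler. The same idea can be sketched with the JDK's java.lang.reflect.Proxy, which is a different mechanism from the generated proxy0 class shown above. GreetingService and ProxySketch are hypothetical names, and the handler fabricates a reply locally instead of making a remote call.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;

// Hypothetical service interface standing in for DemoService.
interface GreetingService {
    String sayHello(String name);
}

class ProxySketch {
    // Creates a proxy whose interface methods are all routed to one handler.
    static GreetingService createProxy() {
        InvocationHandler handler = new InvocationHandler() {
            @Override
            public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
                if (method.getDeclaringClass() == Object.class) {
                    // toString, hashCode and equals are answered by the handler object itself.
                    return method.invoke(this, args);
                }
                // A real handler would build an invocation message and send it; this one just echoes.
                return "invoked " + method.getName() + " with " + args[0];
            }
        };
        return (GreetingService) Proxy.newProxyInstance(
                GreetingService.class.getClassLoader(),
                new Class<?>[] {GreetingService.class},
                handler);
    }
}
```

The caller only sees GreetingService, so a call such as ProxySketch.createProxy().sayHello("world") looks local even though the handler decides what actually happens.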

Calling demoService.sayHello(String string) therefore ends up in the invoke method of InvokerInvocationHandler:

public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
    if (method.getDeclaringClass() == Object.class) {
        return method.invoke(this.invoker, args);
    } else {
        String methodName = method.getName();
        Class<?>[] parameterTypes = method.getParameterTypes();
        if (parameterTypes.length == 0) {
            if ("toString".equals(methodName)) {
                return this.invoker.toString();
            }

            if ("$destroy".equals(methodName)) {
                this.invoker.destroy();
                return null;
            }

            if ("hashCode".equals(methodName)) {
                return this.invoker.hashCode();
            }
        } else if (parameterTypes.length == 1 && "equals".equals(methodName)) {
            return this.invoker.equals(args[0]);
        }
        // Construct the RpcInvocation message, including the interface, method, parameters, and other metadata
        RpcInvocation rpcInvocation = new RpcInvocation(method, this.invoker.getInterface().getName(), args);
        rpcInvocation.setTargetServiceUniqueName(this.invoker.getUrl().getServiceKey());
        // Invoke the invoker chain
        return this.invoker.invoke(rpcInvocation).recreate();
    }
}

What is actually called is the invoke(rpcInvocation) method of the layered invoker. As the code above shows, the main logic is to encapsulate the metadata of the class, method, and parameters being called into an RpcInvocation object and then call the invoke method of the invoker chain. How this invoker chain is generated is also covered in How to register Dubbo Consumer with Spring. The Invoker wrapping chain is as follows:

The call then reaches the FailoverClusterInvoker.invoke(Invocation invocation) method, whose code is as follows:

public Result invoke(Invocation invocation) throws RpcException {
    this.checkWhetherDestroyed();
    Map<String, Object> contextAttachments = RpcContext.getContext().getAttachments();
    if (contextAttachments != null && contextAttachments.size() != 0) {
        ((RpcInvocation)invocation).addAttachments(contextAttachments);
    }
    // 1. Obtain the matching service providers from the registry object
    List<Invoker<T>> invokers = this.list(invocation);
    // 2. Select one service provider based on the load balancing policy
    LoadBalance loadbalance = this.initLoadBalance(invokers, invocation);
    RpcUtils.attachInvocationIdIfAsync(this.getUrl(), invocation);
    // 3. Invoke the selected invoker
    return this.doInvoke(invocation, invokers, loadbalance);
}

There are three key steps:

  1. Get the matching service providers from the registry object
  2. Select one of the service providers based on the load balancing policy
  3. Invoke the selected Invoker
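The three steps above can be sketched in a few lines. This is a simplified illustration, not Dubbo's implementation: ClusterSketch, provider, and the use of a Supplier as the directory are assumptions made for the example, and the selection is a plain random choice.

```java
import java.util.List;
import java.util.Random;
import java.util.function.Function;
import java.util.function.Supplier;

class ClusterSketch {
    // Hypothetical provider: a function from the call argument to a reply.
    static Function<String, String> provider(String name) {
        return argument -> name + " says hello " + argument;
    }

    static String invoke(Supplier<List<Function<String, String>>> directory,
                         Random random, String argument) {
        // 1. Get the candidate providers from the directory.
        List<Function<String, String>> providers = directory.get();
        if (providers.isEmpty()) {
            throw new IllegalStateException("no provider available");
        }
        // 2. Select one provider (plain random choice in this sketch).
        Function<String, String> selected = providers.get(random.nextInt(providers.size()));
        // 3. Invoke the selected provider.
        return selected.apply(argument);
    }
}
```

Keeping the directory behind a Supplier mirrors the separation in the code above: the caller asks for the current list on every invocation rather than holding on to a fixed one.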

1.1 Obtain the list of service providers that comply with routing policies from the registry object

List<Invoker<T>> invokers = this.list(invocation);

This eventually calls the RegistryDirectory.doList(Invocation) method, whose main logic is as follows:

public List<Invoker<T>> doList(Invocation invocation) {
    ...
    List invokers = null;

    try {
        // Filter by the routing chain
        invokers = this.routerChain.route(this.getConsumerUrl(), invocation);
    } catch (Throwable var4) {
        logger.error("Failed to execute router: " + this.getUrl() + ", cause: " + var4.getMessage(), var4);
    }

    return invokers == null ? Collections.emptyList() : invokers;
    ...
}

One thing to note here: the service provider and routing configuration are cached locally by the service discovery object RegistryDirectory, and updates are triggered by listener callbacks from the registry (see How to register Dubbo Consumer with Spring for details).
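A minimal sketch of that caching arrangement follows. DirectorySketch and onProvidersChanged are hypothetical names, and the registry listener itself is left out; the point is only that invocations read a local snapshot while the callback replaces it.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

class DirectorySketch {
    // Local cache of provider addresses; volatile so readers see the latest snapshot.
    private volatile List<String> cachedProviders = Collections.emptyList();

    // Called by a (hypothetical) registry listener whenever the provider list changes.
    void onProvidersChanged(List<String> latest) {
        cachedProviders = Collections.unmodifiableList(new ArrayList<>(latest));
    }

    // Called on every invocation: reads the cache only, never the registry.
    List<String> list() {
        return cachedProviders;
    }
}
```

Replacing the whole list in one assignment means a caller never observes a half-updated list, at the cost of copying the list on each change.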

1.2 Select a service provider based on the load balancing policy

protected LoadBalance initLoadBalance(List<Invoker<T>> invokers, Invocation invocation) {
    return CollectionUtils.isNotEmpty(invokers)
            ? (LoadBalance)ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(((Invoker)invokers.get(0)).getUrl().getMethodParameter(RpcUtils.getMethodName(invocation), "loadbalance", "random"))
            : (LoadBalance)ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension("random");
}

The load balancing policies are loaded through the SPI extension mechanism. The URL is checked for a load balancing policy configured for the current method. If one is configured, the corresponding extension implementation is used; otherwise the default random policy is used. The selection itself can be found here:

private Invoker<T> doSelect(LoadBalance loadbalance, Invocation invocation, List<Invoker<T>> invokers, List<Invoker<T>> selected) throws RpcException {
    if (CollectionUtils.isEmpty(invokers)) {
        return null;
    } else if (invokers.size() == 1) {
        return (Invoker)invokers.get(0);
    } else {
        // Select a service provider based on the load balancing policy
        Invoker<T> invoker = loadbalance.select(invokers, this.getUrl(), invocation);
        if (selected != null && selected.contains(invoker) || !invoker.isAvailable() && this.getUrl() != null && this.availablecheck) {
            try {
                Invoker<T> rInvoker = this.reselect(loadbalance, invocation, invokers, selected, this.availablecheck);
                if (rInvoker != null) {
                    invoker = rInvoker;
                } else {
                    int index = invokers.indexOf(invoker);

                    try {
                        invoker = (Invoker)invokers.get((index + 1) % invokers.size());
                    } catch (Exception var9) {
                        logger.warn(var9.getMessage() + " may because invokers list dynamic change, ignore.", var9);
                    }
                }
            } catch (Throwable var10) {
                logger.error("cluster reselect fail reason is :" + var10.getMessage() + " if can not solve, you can set cluster.availablecheck=false in url", var10);
            }
        }

        return invoker;
    }
}
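The shape of this selection logic can be reduced to a short sketch. It is deliberately simplified: SelectSketch is a hypothetical name, providers are plain strings, the availability check and the reselect step are omitted, and only the "next index" fallback is kept.

```java
import java.util.List;
import java.util.Random;

class SelectSketch {
    // Picks a provider at random, moving on to the next one if the pick was already tried.
    static String select(List<String> providers, List<String> alreadyTried, Random random) {
        if (providers.isEmpty()) {
            return null;
        }
        if (providers.size() == 1) {
            return providers.get(0);
        }
        String picked = providers.get(random.nextInt(providers.size()));
        if (alreadyTried.contains(picked)) {
            // Fall back to the next provider in the list, wrapping around at the end.
            int index = providers.indexOf(picked);
            picked = providers.get((index + 1) % providers.size());
        }
        return picked;
    }
}
```

With two providers and one of them already tried, the sketch always returns the other one, whichever the random pick was.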

When a service provider node in the registry changes, the service discovery object is notified and updates its locally maintained Invoker list. Each Invoker in that list is wrapped in layers, and each layer adds a new feature, so a call passes through the layers one by one.

1.2.1 Assembling the Request Object and setting the Request ID

The request ID is set to support asynchronous invocation. After a request is sent, the next request can be sent without waiting for the first to finish, and because the returned response carries the same ID, it can be matched to its request. The core code is in the request method of HeaderExchangeChannel:

public CompletableFuture<Object> request(Object request, int timeout, ExecutorService executor) throws RemotingException {
    if (this.closed) {
        throw new RemotingException(this.getLocalAddress(), (InetSocketAddress)null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!");
    } else {
        // Set the ID (assigned when the Request object is created)
        Request req = new Request();
        req.setVersion(Version.getProtocolVersion());
        req.setTwoWay(true);
        req.setData(request);
        // Create the asynchronous result and cache it in DefaultFuture's global Map
        DefaultFuture future = DefaultFuture.newFuture(this.channel, req, timeout, executor);

        try {
            // Send the request
            this.channel.send(req);
            // Return the asynchronous result
            return future;
        } catch (RemotingException var7) {
            future.cancel();
            throw var7;
        }
    }
}

1.2.2 Wrap the asynchronous result in a DefaultFuture and cache it in a global Map so that the consumer thread blocked waiting for the result can be woken up later

The call DefaultFuture future = DefaultFuture.newFuture(this.channel, req, timeout, executor); in the code above caches the current asynchronous result in a global Map:


// The global Map
private static final Map<Long, DefaultFuture> FUTURES = new ConcurrentHashMap<>();

public static DefaultFuture newFuture(Channel channel, Request request, int timeout, ExecutorService executor) {
    // Call the constructor, where the DefaultFuture is cached in the global Map
    DefaultFuture future = new DefaultFuture(channel, request, timeout);
    // Store the thread pool object in the DefaultFuture object
    future.setExecutor(executor);
    timeoutCheck(future);
    return future;
}

// Constructor
private DefaultFuture(Channel channel, Request request, int timeout) {
    this.channel = channel;
    this.request = request;
    this.id = request.getId();
    this.timeout = timeout > 0 ? timeout : channel.getUrl().getPositiveParameter("timeout", 1000);
    // Cache this future in the global static Map with the ID as key
    FUTURES.put(this.id, this);
    CHANNELS.put(this.id, channel);
}

After the provider's result is received, the DefaultFuture is looked up in the global Map by the ID of the Response, and the thread pool object (executor) stored in that DefaultFuture is obtained. A Runnable is submitted to the blocking queue of that thread pool, which wakes up the consumer thread blocked in AsyncRpcResult.get().
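The ID-to-future bookkeeping described in 1.2.1 and 1.2.2 can be sketched with standard library types. PendingRequests, register, and onResponse are hypothetical names, a plain CompletableFuture stands in for DefaultFuture, and timeouts and channels are left out.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

class PendingRequests {
    private static final AtomicLong NEXT_ID = new AtomicLong();
    // Global map from request ID to the future that is waiting for the response.
    private static final Map<Long, CompletableFuture<String>> FUTURES = new ConcurrentHashMap<>();

    // Registers a future under a fresh request ID and returns that ID.
    static long register(CompletableFuture<String> future) {
        long id = NEXT_ID.getAndIncrement();
        FUTURES.put(id, future);
        return id;
    }

    // Completes and removes the future that matches the response's ID.
    // Returns false when no request with that ID is pending.
    static boolean onResponse(long id, String payload) {
        CompletableFuture<String> future = FUTURES.remove(id);
        if (future == null) {
            return false;
        }
        future.complete(payload);
        return true;
    }
}
```

Because the response is matched by ID rather than by arrival order, several requests can be in flight on the same connection at once.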

1.2.3 Calling NettyClient to send Request and return AsyncRpcResult

The code of DubboInvoker.doInvoke(Invocation) is as follows:

protected Result doInvoke(Invocation invocation) throws Throwable {
    // Encapsulate the request metadata
    RpcInvocation inv = (RpcInvocation)invocation;
    String methodName = RpcUtils.getMethodName(invocation);
    inv.setAttachment("path", this.getUrl().getPath());
    inv.setAttachment("version", this.version);
    ExchangeClient currentClient;
    if (this.clients.length == 1) {
        currentClient = this.clients[0];
    } else {
        currentClient = this.clients[this.index.getAndIncrement() % this.clients.length];
    }

    try {
        boolean isOneway = RpcUtils.isOneway(this.getUrl(), invocation);
        int timeout = this.getUrl().getMethodPositiveParameter(methodName, "timeout", 1000);
        if (isOneway) {
            boolean isSent = this.getUrl().getMethodParameter(methodName, "sent", false);
            currentClient.send(inv, isSent);
            return AsyncRpcResult.newDefaultAsyncResult(invocation);
        } else {
            ExecutorService executor = this.getCallbackExecutor(this.getUrl(), inv);
            // Send the request through NettyClient
            CompletableFuture<AppResponse> appResponseFuture = currentClient.request(inv, timeout, executor).thenApply((obj) -> {
                return (AppResponse)obj;
            });
            FutureContext.getContext().setCompatibleFuture(appResponseFuture);
            AsyncRpcResult result = new AsyncRpcResult(appResponseFuture, inv);
            result.setExecutor(executor);
            return result;
        }
    } catch (TimeoutException var10) {
        throw new RpcException(2, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + this.getUrl() + ", cause: " + var10.getMessage(), var10);
    } catch (RemotingException var11) {
        throw new RpcException(1, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + this.getUrl() + ", cause: " + var11.getMessage(), var11);
    }
}

Through ExchangeClient, the request is finally sent by NettyClient. The sequence diagram is as follows:

1.3 Block on the get() method of AsyncRpcResult and wait for the provider to return the result

DubboInvoker returns an asynchronous Future. For a synchronous call, the caller has to block on it and wait for the result. This logic is in AsyncToSyncInvoker.

public Result invoke(Invocation invocation) throws RpcException {
    Result asyncResult = this.invoker.invoke(invocation);
    ...
    if (InvokeMode.SYNC == ((RpcInvocation)invocation).getInvokeMode()) {
        // Block and wait
        asyncResult.get(2147483647L, TimeUnit.MILLISECONDS);
    }
    ...
    return asyncResult;
}

The get() method blocks through waitAndDrain(), which takes tasks from the blocking queue of the thread pool and blocks while the queue is empty. When the provider returns a result, a task is added to the blocking queue, which wakes up the consumer thread. That thread then runs the Runnable (including the decoding logic) to obtain the final result. The code is as follows:

public Result get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
    if (this.executor != null) {
        ThreadlessExecutor threadlessExecutor = (ThreadlessExecutor)this.executor;
        // Block and wait
        threadlessExecutor.waitAndDrain();
    }
    return (Result)this.responseFuture.get(timeout, unit);
}

public void waitAndDrain() throws InterruptedException {
    Runnable runnable = (Runnable)this.queue.take();
    synchronized(this.lock) {
        this.waiting = false;
        runnable.run();
    }

    for (runnable = (Runnable)this.queue.poll(); runnable != null; runnable = (Runnable)this.queue.poll()) {
        try {
            runnable.run();
        } catch (Throwable var4) {
            logger.info(var4);
        }
    }
}
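The blocking-queue handoff in waitAndDrain() can be reproduced in isolation. The sketch below is a reduced illustration, not Dubbo's ThreadlessExecutor: CallerRunsQueueExecutor and ThreadlessDemo are hypothetical names, and the lock, the waiting flag, and error handling are omitted.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;

class CallerRunsQueueExecutor implements Executor {
    private final BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();

    // Called by the I/O thread: only enqueues the task, never runs it.
    @Override
    public void execute(Runnable task) {
        queue.add(task);
    }

    // Called by the waiting thread: blocks until a task arrives, then runs it
    // and any other tasks that are already queued.
    void waitAndDrain() throws InterruptedException {
        Runnable task = queue.take();
        task.run();
        while ((task = queue.poll()) != null) {
            task.run();
        }
    }
}

class ThreadlessDemo {
    // Returns the name of the thread that ran the task: the waiting thread, not the submitter.
    static String runOnWaitingThread() {
        CallerRunsQueueExecutor executor = new CallerRunsQueueExecutor();
        String[] ranOn = new String[1];
        Thread ioThread = new Thread(
                () -> executor.execute(() -> ranOn[0] = Thread.currentThread().getName()),
                "io-thread");
        ioThread.start();
        try {
            executor.waitAndDrain();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return ranOn[0];
    }
}
```

The design choice this illustrates is that the thread which submits the task never executes it; the work (decoding, in the article's case) runs on the thread that was waiting for the result.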

2. The provider side processes the request

Post the overall sequence diagram first:

2.1 NettyServer listens to the Accept event of channel and binds the Accepted channel to a worker

First, post the flow chart of Netty Server:

In the Netty server, a bossGroup listens for accept events. After an accept event arrives, a NioEventLoop is selected from the workerGroup and bound to the accepted channel, and it then handles the read and write events of that channel. When a Dubbo provider is exposed, a NettyServer is created and started, which begins listening. The code is as follows:


public NettyServer(URL url, ChannelHandler handler) throws RemotingException {
    // The parent constructor calls the doOpen() method
    super(ExecutorUtil.setThreadName(url, "DubboServerHandler"), ChannelHandlers.wrap(handler, url));
}

protected void doOpen() throws Throwable {
    // Create the Netty server bootstrap class
    this.bootstrap = new ServerBootstrap();
    // Create the bossGroup, which listens for accept events
    this.bossGroup = new NioEventLoopGroup(1, new DefaultThreadFactory("NettyServerBoss", true));
    // Create the workerGroup, which handles IO reads and writes
    this.workerGroup = new NioEventLoopGroup(this.getUrl().getPositiveParameter("iothreads", Constants.DEFAULT_IO_THREADS), new DefaultThreadFactory("NettyServerWorker", true));
    final NettyServerHandler nettyServerHandler = new NettyServerHandler(this.getUrl(), this);
    this.channels = nettyServerHandler.getChannels();
    // Set the related configuration
    this.bootstrap.group(this.bossGroup, this.workerGroup)
            .channel(NioServerSocketChannel.class)
            .option(ChannelOption.SO_REUSEADDR, Boolean.TRUE)
            .childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)
            .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
            .childHandler(new ChannelInitializer<NioSocketChannel>() {
                protected void initChannel(NioSocketChannel ch) throws Exception {
                    int idleTimeout = UrlUtils.getIdleTimeout(NettyServer.this.getUrl());
                    // Set the codec classes in the inbound and outbound chains of the channel
                    NettyCodecAdapter adapter = new NettyCodecAdapter(NettyServer.this.getCodec(), NettyServer.this.getUrl(), NettyServer.this);
                    if (NettyServer.this.getUrl().getParameter("ssl-enabled", false)) {
                        ch.pipeline().addLast("negotiation", SslHandlerInitializer.sslServerHandler(NettyServer.this.getUrl(), nettyServerHandler));
                    }

                    ch.pipeline()
                            .addLast("decoder", adapter.getDecoder())
                            .addLast("encoder", adapter.getEncoder())
                            .addLast("server-idle-handler", new IdleStateHandler(0L, 0L, (long)idleTimeout, TimeUnit.MILLISECONDS))
                            .addLast("handler", nettyServerHandler);
                }
            });
    // Bind the port to start the ServerSocket service and listen on the port
    ChannelFuture channelFuture = this.bootstrap.bind(this.getBindAddress());
    channelFuture.syncUninterruptibly();
    this.channel = channelFuture.channel();
}

The call ChannelFuture channelFuture = this.bootstrap.bind(this.getBindAddress()); invokes initAndRegister(), which works in two steps. Step 1 is the init() method, which initializes the inbound and outbound handler chains of the NioServerSocketChannel and adds a ServerBootstrap.ServerBootstrapAcceptor to the inbound ChannelHandler chain. The channelRead() method of this class registers the accepted channel with a worker, as follows:

public void channelRead(ChannelHandlerContext ctx, Object msg) {
    final Channel child = (Channel)msg;
    child.pipeline().addLast(new ChannelHandler[]{this.childHandler});
    AbstractBootstrap.setChannelOptions(child, this.childOptions, ServerBootstrap.logger);
    Entry[] var4 = this.childAttrs;
    int var5 = var4.length;

    for (int var6 = 0; var6 < var5; ++var6) {
        Entry<AttributeKey<?>, Object> e = var4[var6];
        child.attr((AttributeKey)e.getKey()).set(e.getValue());
    }

    try {
        // Select a NioEventLoop from the workerGroup and bind the channel to it
        this.childGroup.register(child).addListener(new ChannelFutureListener() {
            public void operationComplete(ChannelFuture future) throws Exception {
                if (!future.isSuccess()) {
                    ServerBootstrap.ServerBootstrapAcceptor.forceClose(child, future.cause());
                }
            }
        });
    } catch (Throwable var8) {
        forceClose(child, var8);
    }
}

Step 2 is the register() method. After initialization, it selects a NioEventLoop from the bossGroup, which is responsible for handling accept events, and binds the NioServerSocketChannel to it, as follows:

public ChannelFuture register(Channel channel) {
        return this.next().register(channel);
    }

The NioServerSocketChannel is then registered with a Selector, which enables listening:

protected void doRegister() throws Exception {
    boolean selected = false;

    while (true) {
        try {
            this.selectionKey = this.javaChannel().register(this.eventLoop().unwrappedSelector(), 0, this);
            return;
        } catch (CancelledKeyException var3) {
            if (selected) {
                throw var3;
            }

            this.eventLoop().selectNow();
            selected = true;
        }
    }
}

As described in How to register Dubbo Consumer with Spring, when the consumer is notified of the provider configuration in the registry and refreshes its local service list, it directly establishes a long-lived connection with the provider, which triggers the accept event.
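The selector registration and the accept event can be observed with plain java.nio, without Netty. The sketch below is an illustration under stated assumptions: AcceptSketch is a hypothetical name, a single thread plays both server and client, and the server binds to an ephemeral loopback port. Registering with interest 0 mirrors the register call in doRegister(); the sketch then switches on OP_ACCEPT itself.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

class AcceptSketch {
    // Opens a server on an ephemeral loopback port, connects one client, and
    // returns true if the selector reported the accept event.
    static boolean acceptOneConnection() {
        try (Selector selector = Selector.open();
             ServerSocketChannel server = ServerSocketChannel.open()) {
            server.configureBlocking(false);
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
            // Register with no interest first, then enable OP_ACCEPT.
            SelectionKey key = server.register(selector, 0);
            key.interestOps(SelectionKey.OP_ACCEPT);

            // A client connection is what makes the accept event fire.
            try (SocketChannel client = SocketChannel.open(server.getLocalAddress())) {
                if (selector.select(5000) == 0 || !key.isAcceptable()) {
                    return false;
                }
                try (SocketChannel accepted = server.accept()) {
                    return accepted != null;
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

In a boss/worker arrangement, the accepted channel would next be handed to a different event loop for reads and writes; the sketch stops at the accept.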

2.2 Listen for Read events, Read and decode data, and parse metadata information in the request

The main flow is that the handler chain for inbound requests processes the data on the channel. This includes decoding, deserializing the data in the IO stream, and wrapping it into a Request object. Two pieces of data are important here. RequestId: each request has an ID, and this ID is sent back in the Response. RpcInvocation: encapsulates the request metadata, including the class name, interface name, version number, invocation parameters, and other data.
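To make the role of the request ID concrete, here is a toy encode/decode pair. The frame layout (an 8-byte ID followed by two UTF strings) is invented for this example and is not the dubbo protocol's wire format; FrameSketch and DecodedRequest are hypothetical names.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical decoded form: the ID plus the call metadata.
class DecodedRequest {
    final long requestId;
    final String methodName;
    final String argument;

    DecodedRequest(long requestId, String methodName, String argument) {
        this.requestId = requestId;
        this.methodName = methodName;
        this.argument = argument;
    }
}

class FrameSketch {
    // Writes the ID first so the receiver can read it before the payload.
    static byte[] encode(long requestId, String methodName, String argument) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            out.writeLong(requestId);
            out.writeUTF(methodName);
            out.writeUTF(argument);
            out.flush();
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Reads the fields back in the same order they were written.
    static DecodedRequest decode(byte[] frame) {
        try {
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(frame));
            long requestId = in.readLong();
            String methodName = in.readUTF();
            String argument = in.readUTF();
            return new DecodedRequest(requestId, methodName, argument);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

The provider would copy requestId into its response unchanged, which is what lets the consumer match the response to the waiting call.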

The sequence diagram is as follows:

2.3 The metadata information in the request is matched with the corresponding Invoker for processing

The debug diagram is as follows:

As can be seen, a unique Invoker is determined by four dimensions: class name, port, version number, and group. As described in Spring: Dubbo Provider, during service exposure this key is cached, together with the Exporter that wraps the Invoker, in the exporterMap attribute of the DubboProtocol object. The sequence diagram is as follows:

2.4 Wrap the processing result in a Response object and send it to the caller through HeaderExchangeHandler

There are two main steps:

  1. Wrap the returned result into a Response object, which needs to contain RequestId
  2. The results are sent through the HeaderExchangeHandler

The sequence diagram is as follows:

3. The consumer receives Response

The entry point is again the NioEventLoop class of the Netty worker group. The sequence diagram is as follows (the lower part is the receiving sequence diagram):

3.1 Listen for the read event of the channel, deserialize the IO data through Hessian2, and wrap it in a Response object

The debug diagram is as follows:

3.2 Use the ID of the Response to find the corresponding Future in the global Map of DefaultFuture, obtain the corresponding thread pool from the Future, add a Runnable to the blocking queue of the thread pool, and wake up the consumer thread blocked in AsyncRpcResult.get()

The consumer blocking logic is described in 1.3. The core wake-up code is in the received method of AllChannelHandler, which looks like this:

public void received(Channel channel, Object message) throws RemotingException {
    // Find the corresponding Future in the global Map of DefaultFuture by the ID of the Response, and get the corresponding thread pool from the Future
    ExecutorService executor = this.getPreferredExecutorService(message);
    // Wake up the blocked consumer thread by adding a task to the blocking queue of the thread pool
    try {
        executor.execute(new ChannelEventRunnable(channel, this.handler, ChannelState.RECEIVED, message));
    } catch (Throwable var5) {
        if (message instanceof Request && var5 instanceof RejectedExecutionException) {
            this.sendFeedback(channel, (Request)message, var5);
        } else {
            throw new ExecutionException(message, channel, this.getClass() + " error when process received event .", var5);
        }
    }
}

public ExecutorService getPreferredExecutorService(Object msg) {
    if (msg instanceof Response) {
        Response response = (Response)msg;
        // Get the Future from the global Map by the ID of the Response
        DefaultFuture responseFuture = DefaultFuture.getFuture(response.getId());
        if (responseFuture == null) {
            return this.getSharedExecutorService();
        } else {
            ExecutorService executor = responseFuture.getExecutor();
            if (executor == null || executor.isShutdown()) {
                executor = this.getSharedExecutorService();
            }

            return executor;
        }
    } else {
        return this.getSharedExecutorService();
    }
}
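The lookup-with-fallback in getPreferredExecutorService can be isolated in a small sketch. ExecutorLookupSketch, register, and preferredExecutor are hypothetical names, the map is keyed directly by request ID instead of going through a future object, and the shutdown check is left out.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;

class ExecutorLookupSketch {
    // Executors registered by the threads that are waiting for a response.
    private final Map<Long, Executor> executorsByRequestId = new ConcurrentHashMap<>();
    // Used when no waiting caller is known for a response.
    private final Executor shared;

    ExecutorLookupSketch(Executor shared) {
        this.shared = shared;
    }

    void register(long requestId, Executor executor) {
        executorsByRequestId.put(requestId, executor);
    }

    // Returns the executor registered for this response ID, or the shared one.
    Executor preferredExecutor(long responseId) {
        Executor registered = executorsByRequestId.get(responseId);
        return registered != null ? registered : shared;
    }
}
```

The fallback matters for responses that arrive after their request was already cleaned up: they still get processed, just not on the original caller's executor.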

3.3 The awakened consumer thread decodes the response data and returns the result of the request

The debug diagram is as follows: