>>>> 😜😜😜 Github: 👉 github.com/black-ant CASE backup: 👉 gitee.com/antblack/ca…

A. The preface

Dubbo 3.0 was released last year. Overall, there is no significant difference between using Dubbo 3.0 and using Dubbo 2.0. I haven’t read the Dubbo source code before.

In Dubbo 2.0, Dubbo is divided into 10 layers, which is the same as in 3.0. This one only corresponds to Proxy, Exchange and Transport

1.1 Addition 1: Introduction to RPC

Remote Procedure Call Protocol (RPC) is a Protocol that requests services from Remote computer programs over the network without understanding the underlying network technology.

A basic RPC architecture should contain at least four components:

2. Client Stub: Stores the address information of the server and packages the request parameter data information of the Client into network messages. Server Stub: receives the request message sent by the client, unpackets it, and invokes the local service for processing. 4. Server: the real provider of the service

One of the core contents of Dubbo3 is to define the next generation RPC protocol. In addition to communication functions, Dubbo3 also has the following functions:

  • Uniform binary format across languages
  • Supports Streaming and application layer full duplex calling model
  • extensible
  • Can be recognized by all layers of equipment

2. The RPC Server

2.1 Service Cases

Using the official example, let’s look at the key points on the Service side:

@DubboService
public class DemoServiceImpl implements DemoService {
    private static final Logger logger = LoggerFactory.getLogger(DemoServiceImpl.class);

    @Override
    public String sayHello(String name) {
        logger.info("Hello " + name + ", request from consumer: " + RpcContext.getServiceContext().getRemoteAddress());
        return "Hello " + name + ", response from provider: " + RpcContext.getServiceContext().getLocalAddress();
    }

    @Override
    public CompletableFuture<String> sayHelloAsync(String name) {
        return null; }}// The core is @dubboService
// The implementation interface is not a necessary point on the Dubbo process, but for specification purposes, @Reference on the Client will throw an exception if it does not match this interface


Copy the code

2.2 Creating an Agent on the Server

Ignoring @dubboService’s scan logic, let’s just look at how to scan and create a Server agent

2.3 Process for Commissioning the Server

This step maps attributes to corresponding methods, looking at the server-side reflection process:

  • C-channeleventrunnable # run: listens for and processes a channel
  • C-decodehandler # received: received message
  • C-headerexchangehandler # received: Exchange received
  • C-headerexchangehandler # handleRequest: Handles the request
  • C-dubboprotocol # requestHandler: reflects to the specified method
  • C-xxxfilter. invoke: Filter interceptor chain processing
  • C- InvokerWrapper # invoke
  • C- AbstractProxyInvoker # invoke
  • C- JavassistProxyFactory # doInvoke

As you can see here, the principal is still processed by an exchange-protocol-proxy

2.3.1 Receiving messages

Before from a received, need to have a look at the Dubbo Remote Remote call related logic, can read, entrance class for ChannelEventRunnable:

Supplementary: registration of ChannelEventRunnable

// The registration of ChannelEventRunnable is made when called, first building a NettyServerHandler to create a Netty serverC- NettyServerHandler # channelActive C- AbstractServer # connected C- AllChannelHandler # connected : Channel connection processing C-channeleventrUnnable # ChannelEventRunnable: the constructor loads ChannelEventRunnable// PS: previously initiated by DubboBootstrap # exportServices
// Pass DubboProtocol # createServer logic
// Finally create NettyServer. I won't go into detail here

Copy the code

How does ChannelEventRunnable implement listener classification

As you can see, ChannelEventRunnable calls different logic for different states

// Data is received
- ChannelState.RECEIVED       <--->  handler.received(channel, message)
/ / Channel connection
- ChannelState.CONNECTED      <--->  handler.connected(channel)
// Channel connection failed
- ChannelState.DISCONNECTED   <--->  handler.disconnected(channel)
// Channel Sending status
- ChannelState.SENT           <--->   handler.sent(channel, message)
// An exception occurred
- ChannelState.CAUGHT         <--->  handler.caught(channel, exception)

// Step 1: Call CONNECTED to establish a connection
channel -> [id: 0xb761cccc, L:/192.168181.2.:20880 - R:/192.168181.2.:52575]
url : dubbo:/ / 192.168.181.2:20880 / org. Apache. Dubbo. Demo. DemoService? Anyhost = true&application = dubbo - demo - the annotation - provider&bind. IP = 192.168.181.2 & bind. Port = 20880 & channel. Readonly. Sent = true&codec = dubbo&deprecated = false&dubbo = 2.0.2 & dynamic = true&generic = f alse&heartbeat=60000&interface=org.apache.dubbo.demo.DemoService&metadata-type=remote&methods=sayHello,sayHelloAsync&pid = 5676 & release & side of = = provider&threadname = DubboServerHandler - 192.168.181.2:20880 & timestamp = 1627140229352

// Step 2: receive the message
handler.received(channel, message)

Copy the code

Here we focus only on received. The code and the data received are as follows

// c-decodehandler # receive: receive message
public void received(Channel channel, Object message) throws RemotingException {
    // Step 1: Decode the message...
    
    // Step 2: Call Handler
    handler.received(channel, message);
}

channel -> NettyChannel [channel=[id: 0x665df7bf, L:/192.168181.2.:20880 - R:/192.168181.2.:61537]]
message -> Request 
[id=1, version=2.02., twoway=true, event=false, broken=false, data=RpcInvocation 
    [methodName=sayHello, parameterTypes=[class java.lang.String].arguments=[world], 
        attachments=
        {
            input=272, dubbo=2.02., path=org.apache.dubbo.demo.DemoService, version=0.0. 0, 
            remote.application=dubbo-demo-annotation-consumer, 
            interface=org.apache.dubbo.demo.DemoService
        }
    ]
]

Copy the code

2.3.2 Received Processing requests

Take a look at the ExchangeHandler architecture first

In the DecodeHandler, the message type is DecodeHandler, and the message type is DecodeHandler.

  • Decodeable –> decode(message);
  • Request –> decode(((Request) message).getData())
  • Response –> decode(((Response) message).getResult());

Then comes the HeaderExchangeHandler handling the request for a Channel:

C- HeaderExchangeHandler
    M- received : 
	- HeaderExchangeChannel.getOrAddChannel(channel)
	1- handlerEvent(channel, request)
	2- handleRequest(exchangeChannel, request)
	3- handler.received(exchangeChannel, request.getData())
	4- handleResponse(channel, (Response) message)
	5- channel.send(echo)
    

public void received(Channel channel, Object message) throws RemotingException {
	final ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);
    // Decode the message by type
    // Do this separately and you can see the obvious idea of decoupling
    if (message instanceof Request) {
        // handle request.
        Request request = (Request) message;
        // Events or bidirectional identifiers (response or ack) are handled separately, and the default is directed directly to the underlying Handler
        if (request.isEvent()) {
            handlerEvent(channel, request);
        } else {
            if (request.isTwoWay()) {
                handleRequest(exchangeChannel, request);
            } else{ handler.received(exchangeChannel, request.getData()); }}}else if (message instanceof Response) {
        handleResponse(channel, (Response) message);
    } else if (message instanceof String) {
        // Only exception handling, meaning what type
        if (isClientSide(channel)) {
            Exception e = new Exception("Dubbo client can not supported string message: " + message + " in channel: " + channel + ", url: " + channel.getUrl());
            logger.error(e.getMessage(), e);
        } else {
            String echo = handler.telnet(channel, (String) message);
            if(echo ! =null && echo.length() > 0) { channel.send(echo); }}}else{ handler.received(exchangeChannel, message); }}Copy the code

2.3.3 Process Request and build Response

Because isTwoWay is true, a response is required to inform the caller that the message has been received

The Response is prepared, and the Response is returned, and the corresponding method is called

void handleRequest(final ExchangeChannel channel, Request req) throws RemotingException {

    // Prepare Response for return
   Response res = new Response(req.getId(), req.getVersion());
   if (req.isBroken()) {
        / /... Here is an exception, omit the logic associated with building the exception
        
        // Channle returns response
       channel.send(res);
       return;
   }
   // Get InvokeMehod from Messsage
   Object msg = req.getData();
   try {
       // Here a CompletableFuture is built to initiate asynchronous processing -> DubboProtocol
       CompletionStage<Object> future = handler.reply(channel, msg);
       future.whenComplete((appResult, t) -> {
           try {
               if (t == null) {
                   res.setStatus(Response.OK);
                   res.setResult(appResult);
               } else {
                   res.setStatus(Response.SERVICE_ERROR);
                   res.setErrorMessage(StringUtils.toString(t));
               }
               // Return directly after processing, comfortable
               channel.send(res);
           } catch (RemotingException e) {
               
           }
       });
   } catch(Throwable e) { res.setStatus(Response.SERVICE_ERROR); res.setErrorMessage(StringUtils.toString(e)); channel.send(res); }}Copy the code

2.3.4 Obtaining the method of reflection

The above section builds a CompletableFuture, which is built by DubboProtocol # requestHandler:

C-dubboprotocol p-requesthandler: Note that this is a property that is created when the DubboProtocol is createdprivate ExchangeHandler requestHandler = new ExchangeHandlerAdapter() {

    // This is the method called in the process
    @Override
    public CompletableFuture<Object> reply(ExchangeChannel channel, Object message) throws RemotingException {

        if(! (messageinstanceof Invocation)) {
            throw newRemotingException(....) ; }// Build the proxy class
        Invocation inv = (Invocation) message;
        // Step 2: Core logic, get the Invoke proxy classInvoker<? > invoker = getInvoker(channel, inv);// For callbacks, backward compatibility needs to be considered
        if (Boolean.TRUE.toString().equals(inv.getObjectAttachments().get(IS_CALLBACK_SERVICE_INVOKE))) {
            String methodsStr = invoker.getUrl().getParameters().get("methods");
            boolean hasMethod = false;
            if (methodsStr == null| |! methodsStr.contains(",")) {
                hasMethod = inv.getMethodName().equals(methodsStr);
            } else {
                String[] methods = methodsStr.split(",");
                for (String method : methods) {
                    if (inv.getMethodName().equals(method)) {
                        hasMethod = true;
                        break; }}}if(! hasMethod) {return null; }}// Configure the container message: RemoteAddress
        RpcContext.getServiceContext().setRemoteAddress(channel.getRemoteAddress());
        
        // Step 3: Notice. The interceptor chain -> 2.4 Filter system starts here
        // invoker : FilterChainBuilder$FilterChainNode
        Result result = invoker.invoke(inv);
        
        // When the processing is complete, the Future returns
        return result.thenApply(Function.identity());
    }


    @Override
    public void received(Channel channel, Object message) throws RemotingException {
        if (message instanceof Invocation) {
            reply((ExchangeChannel) channel, message);

        } else {
            super.received(channel, message); }}// This method will be invoked when the first connection is made
    // However, I think there should be a customized approach, which I will have a chance to look at later
    @Override
    public void connected(Channel channel) throws RemotingException {
        invoke(channel, ON_CONNECT_KEY);
    }
    @Override
    public void disconnected(Channel channel) throws RemotingException {
        invoke(channel, ON_DISCONNECT_KEY);
    }

    private void invoke(Channel channel, String methodKey) {
        // Create a proxy. If the proxy is not empty, the logic will continue
        Invocation invocation = createInvocation(channel, channel.getUrl(), methodKey);
        if(invocation ! =null) {
            try {
                received(channel, invocation);
            } catch (Throwable t) {
                logger.warn("Failed to invoke event method " + invocation.getMethodName() + "(), cause: "+ t.getMessage(), t); }}}// Channel.geturl () is always bound to a fixed service, and the service is random
    private Invocation createInvocation(Channel channel, URL url, String methodKey) {
        // In this case, the properties are fetched from the URL. Constants are onConnect, ondisconnect, and no proxy is created
        String method = url.getParameter(methodKey);
        if (method == null || method.length() == 0) {
            return null;
        }
        
        / / by
        RpcInvocation invocation = new RpcInvocation(method, url.getParameter(INTERFACE_KEY), "".newClass<? > [0].new Object[0]);
        invocation.setAttachment(PATH_KEY, url.getPath());
        invocation.setAttachment(GROUP_KEY, url.getGroup());
        invocation.setAttachment(INTERFACE_KEY, url.getParameter(INTERFACE_KEY));
        invocation.setAttachment(VERSION_KEY, url.getVersion());
        if (url.getParameter(STUB_EVENT_KEY, false)) {
            invocation.setAttachment(STUB_EVENT_KEY, Boolean.TRUE.toString());
        }

        returninvocation; }};// Invoke (ServiceKey); // Invoke (ServiceKey)Invoker<? > getInvoker(Channel channel, Invocation inv)throws RemotingException {
    boolean isCallBackServiceInvoke = false;
    boolean isStubServiceInvoke = false;
    
    // Step 2-1: Prepare identification data
    // Get port: 20880
    int port = channel.getLocalAddress().getPort();
    / / to get the required configuration types: org. Apache. Dubbo. Metadata. MetadataService
    String path = (String) inv.getObjectAttachments().get(PATH_KEY);

    // Step 2-2: Determine whether it is a callback service for the client
    isStubServiceInvoke = Boolean.TRUE.toString().equals(inv.getObjectAttachments().get(STUB_EVENT_KEY));
    if (isStubServiceInvoke) {
        port = channel.getRemoteAddress().getPort();
    }

    // Whether there is a callback agentisCallBackServiceInvoke = isClientSide(channel) && ! isStubServiceInvoke;if (isCallBackServiceInvoke) {
        path += "." + inv.getObjectAttachments().get(CALLBACK_SERVICE_KEY);
        inv.getObjectAttachments().put(IS_CALLBACK_SERVICE_INVOKE, Boolean.TRUE.toString());
    }
    
    // Step 2-3: Obtain ServiceKey
    / / dubbo - demo - the annotation - the provider/org. Apache. Dubbo. Metadata. MetadataService: 1.0.0:20880
    String serviceKey = serviceKey(
            port,
            path,
            (String) inv.getObjectAttachments().get(VERSION_KEY),
            (String) inv.getObjectAttachments().get(GROUP_KEY)
    );
    
    // Step 2-4: Query the corresponding DubboExporterDubboExporter<? > exporter = (DubboExporter<? >) exporterMap.get(serviceKey);if (exporter == null) {
        throw newRemotingException(.... ; }// Step 2-5: Return the Invoke class
    return exporter.getInvoker();
}

// 补充 Step 2-3 : 
protected static String serviceKey(int port, String serviceName, String serviceVersion, String serviceGroup) {
        return ProtocolUtils.serviceKey(port, serviceName, serviceVersion, serviceGroup);
}
    

Copy the code

2.4 the Filter system

To invoke, create a FilterChain. To invoke, create a FilterChain. To invoke, create FilterChain.

  • EchoFilter
  • ClassLoaderFilter
  • GenericFilter
  • ContextFilter
  • ExceptionFilter
  • MonitorFilter
  • TimeoutFilter
  • TraceFilter
C- FilterChainBuilder
    
public Result invoke(Invocation invocation) throws RpcException {
    Result asyncResult;
    try {
        // Filter chain processing
        asyncResult = filter.invoke(nextNode, invocation);
    } catch (Exception e) {
       // Omit, mainly for Listerner listening, again playing asynchronous
        throw e;
    } finally{}return asyncResult.whenCompleteWithContext((r, t) -> {
        // After the processing is complete, the listener should be notified
        if (filter instanceof ListenableFilter) {
            ListenableFilter listenableFilter = ((ListenableFilter) filter);
            Filter.Listener listener = listenableFilter.listener(invocation);
            try {
                if(listener ! =null) {
                    if (t == null) {
                        listener.onResponse(r, originalInvoker, invocation);
                    } else{ listener.onError(t, originalInvoker, invocation); }}}finally{ listenableFilter.removeListener(invocation); }}else if (filter instanceof FILTER.Listener) {
            FILTER.Listener listener = (FILTER.Listener) filter;
            if (t == null) {
                listener.onResponse(r, originalInvoker, invocation);
            } else{ listener.onError(t, originalInvoker, invocation); }}}); }Copy the code

2.5 Final method calls

And a method call to the last Filter

C- AbstractProxyInvoker

// Asynchrony and Future
public Result invoke(Invocation invocation) throws RpcException {
        try {
            // Call the JavassistProxyFactory handler
            Object value = doInvoke(proxy, invocation.getMethodName(), invocation.getParameterTypes(), invocation.getArguments());
            CompletableFuture<Object> future = wrapWithFuture(value);
            CompletableFuture<AppResponse> appResponseFuture = future.handle((obj, t) -> {
                AppResponse result = new AppResponse(invocation);
                / /...
                return result;
            });
            return new AsyncRpcResult(appResponseFuture, invocation);
        } catch (InvocationTargetException e) {
            return AsyncRpcResult.newDefaultAsyncResult(null, e.getTargetException(), invocation);
        } catch (Throwable e) {
            throw new RpcException(....);
        }
}

// PS: AsyncRpcResult (AsyncRpcResult)This class represents an unfinished RPC call, and it will store some of the context information for the call, such as the RpcContext and Invocation, so when the call ends and the result is returned, it ensures that all the context recovered is the same as when the call was made before any callback was called.// c-JavAssistProxyFactory: JavassistProxyFactory: JavassistProxyFactory: JavassistProxyFactory
public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
    // TODO Wrapper cannot handle this scenario correctly: the classname contains '$'
    final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
    return new AbstractProxyInvoker<T>(proxy, type, url) {
        @Override
        protected Object doInvoke(T proxy, String methodName, Class
       [] parameterTypes, Object[] arguments) throws Throwable {
            returnwrapper.invokeMethod(proxy, methodName, parameterTypes, arguments); }}; } -proxy: indicates the class being proxied. -methodName: indicates the method being called. -parameterTypes: indicates the property typeCopy the code

2.6 Thread invocation of RPC

When an RPC is called, it creates an InternalRunnable. Let’s see how it is called

The underlying interaction is through Netty, which will not be detailed here, and will be analyzed later when we talk about Netty

public class InternalRunnable implements Runnable{
    private final Runnable runnable;

    public InternalRunnable(Runnable runnable){
        this.runnable=runnable;
    }

    /**
     * After the task execution is completed, it will call {@link InternalThreadLocal#removeAll()} to clear
     * unnecessary variables in the thread.
     */
    @Override
    public void run(a) {
        try{
            runnable.run();
        }finally{ InternalThreadLocal.removeAll(); }}}// The path of the call
public class NamedInternalThreadFactory extends NamedThreadFactory {
    @Override
    public Thread newThread(Runnable runnable) {
        String name = mPrefix + mThreadNum.getAndIncrement();
        InternalThread ret = new InternalThread(mGroup, InternalRunnable.Wrap(runnable), name, 0);
        ret.setDaemon(mDaemon);
        returnret; }}Copy the code

3. Dig deep

3.1 Method information source and verification

// The parameters passed from Netty look like this:
{
	"broken": false."data": {
		"arguments": ["world"]."attachments": {
			"input": "272"."path": "org.apache.dubbo.demo.DemoService"."remote.application": "dubbo-demo-annotation-consumer"."dubbo": "2.0.2"."interface": "org.apache.dubbo.demo.DemoService"."version": "0.0.0"
		},
		"methodName": "sayHello"."objectAttachments": {
			"input": "272"."dubbo": "2.0.2"."path": "org.apache.dubbo.demo.DemoService"."version": "0.0.0"."remote.application": "dubbo-demo-annotation-consumer"."interface": "org.apache.dubbo.demo.DemoService"
		},
		"parameterTypesDesc": "Ljava/lang/String;"."targetServiceUniqueName": "Org. Apache. Dubbo. Demo. DemoService: 0.0.0"
	},
	"event": false."heartbeat": false."id": 1."twoWay": true."version": "2.0.2"
}

// We can see that the parameters passed by Netty already carry the method information, that is, the method information is sent by the Client.
// note that this methodName is not a postcheck. this methodName is checked when the item is initialized

C- ReferenceConfig
protected synchronized void init(a) {
    // The initial configuration process is called DubboBootstrap
    
    / / check Invoke
    checkInvokerAvailable();
}



// Step 2: Check whether it is correct
private void checkInvokerAvailable(a) throws IllegalStateException {
	if(shouldCheck() && ! invoker.isAvailable()) { invoker.destroy();throw new IllegalStateException("Failed to check the status of the service "......);
	}
}


Copy the code

conclusion

From this article, it is found that Dubbo’s Method proxy information is passed from the remote Client, and the remote Client checks the @referenceservice at startup to ensure accuracy

  • The Dubbo Server first creates the NettyService
  • The starting point for processing is ChannelEventRunnable, which is resolved through DecodeHandler
  • The core processing logic is in the DubboProtocol, where the Invoke class is obtained
  • The final caller, JavassistProxyFactory, initiates the call to the proxy

Overall, the Dubbo code is more down to earth than the Spring code,!!!!!!!!!!