1. Mainly invoke process resolution

A simple RPC framework call, involving proxy and network communication, protocol design and other technologies, RPC call needs to transfer the call information from the client to the server, including the call interface, call method name, method parameter type and parameter value, etc. When passing the method parameter value, the serialized object needs to be converted into a binary stream and transmitted over the network to the server, where the binary stream needs to be deserialized according to the sequence method of the client. A complete RPC call flow for Dubbo is shown below.

The client pulls and subscribes to the list of services from the registry each time it is started. The Cluster aggregates the list of pulled services into an Invoker that fetches metadata from the service provider through Directory# List before each RPC call. The obtained service lists are used by subsequent routers and LoadBalance. After the service call is initiated, the service list obtained from the routing result is used as the load balancing parameter. After the load balancing, a machine is selected for RPC call. After routing and load balancing, the client sends the request to the underlying I/O thread pool (such as Netty) for processing. The I/O thread pool mainly processes the logic of read/write, serialization, and deserialization. The thread pool is divided into one type of I/O thread pool managed by Netty and the other type of Dubbo service thread pool for implementing business method invocation.

2. Network communication model analysis

Dubbo framework of network communication is through the Exchange of RPC calls/Transport/Serialize layer, mainly the implementation code in Remoting package, some of which mainly defines the interface as shown below. It can be abstracted into concepts such as Endpoint, Server, Client, and Channel. The Client communicates with the Server through channels. Data exchange relies on the underlying Transport layer for actual data transmission, and the Server and Client related to the Transport layer are involved.

In essence, RPC communication is a typical C/S architecture communication, and Dubbo’s network communication flow is shown below. By the client must first connect to the server, with URL as identified storage channels among them, the client sends the communication content, after serialization and protocol coding form a complete communication message is sent to the server, the server will decode and deserialized message content, and through many channels processing node, the message content into the processing and distribution, And submitted to the thread pool for processing. The Dubbo framework provides a number of extension points, including the headerRecovery interface for information interaction, the NettyTransporter interface for the information communication link layer, and the Non-recovery interface for information communication link layer. There are also channel processing extension point ChannelHandle interface, there are many implementation classes, Codec2 codec interface, the default is DubboCodec implementation, Serialization extension point interface, the default implementation is Hessian2Serializaion implementation class.

3. Implement RPC call parsing at the Protocol layer

3.1 The consumer Initiates an RPC Call

The Dubbo framework encapsulates the content of an RPC invocation in the RpcInvocation. After the invocation, the invocation invocation will be enhanced InvokerInvocationHandler#invoke, so that the local invocation will convert the RPC invocation request. Each RPC invocation is encapsulated in the RPC Invocation object as a method invocation to Invoker# Invoke, where the RPC Invocation has the invocation method name, service name, parameter type, parameter value, return type, Invoker object, and so on.

public class RpcInvocation implements Invocation, Serializable { private String targetServiceUniqueName; // methodName private String methodName; // serviceName private String serviceName; Private TRANSIENT Class<? >[] parameterTypes; Private Object[] arguments; //Invoker object private TRANSIENT Invoker<? > invoker; Private TRANSIENT Class<? > returnType; Private TRANSIENT Type[] returnTypes; } public class InvokerInvocationHandler implements InvocationHandler {... @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { ... RpcInvocation rpcInvocation = new RpcInvocation(method, invoker.getInterface().getName(), args); String serviceKey = invoker.getUrl().getServiceKey(); rpcInvocation.setTargetServiceUniqueName(serviceKey); . return invoker.invoke(rpcInvocation).recreate(); }}Copy the code

In the DubboInvoker#doInvoke method, two-way reply can be divided into one-way and two-way reply according to whether the request type needs two-way reply or not. One-way reply is very simple, using the communication client to directly send the RPC request data, but two-way reply is to wait for the reply response from the remote side. In order to avoid the main thread being trapped in the blocking waiting, A thread pool is used to complete the request-wait process, returning a result object to the upper layer and waiting for it to return if needed.

public class DubboInvoker<T> extends AbstractInvoker<T> { ... @Override protected Result doInvoke(final Invocation invocation) throws Throwable { .... try { ... // One-way, no need to wait for the server to reply, If (isOneway) {Boolean isSent = getUrl().getMethodParameter(methodName, constants.sent_key, false); currentClient.send(inv, isSent); return AsyncRpcResult.newDefaultAsyncResult(invocation); ExecutorService Executor = getCallbackExecutor(getUrl(), inv); ExecutorService Executor = getCallbackExecutor(getUrl()); // Complete TableFuture <AppResponse> appResponseFuture = currentClient.request(inv, timeout, executor) .thenApply(obj -> (AppResponse) obj); FutureContext.getContext().setCompatibleFuture(appResponseFuture); AsyncRpcResult result = new AsyncRpcResult(appResponseFuture, inv); result.setExecutor(executor); return result; } } catch (TimeoutException e) { ... } catch (RemotingException e) { ... }}... }Copy the code

3.2 The server Responds to RPC requests

When the service is exposed, the instance Invokerd has been stored in DubboExporters according to specific rules such as port, interface name, interface version and interface grouping. It is stored using HashMap. When the client calls it, it must carry the key constructed with the same information. Invoke # DubboInvoker as the default implementation. DoInvoke returns a Result object to be used by DubboProtocol. Returns the holding of asynchronous tasks to the upper level.

public class DubboExporter<T> extends AbstractExporter<T> { ... private final String key; private final Map<String, Exporter<? >> exporterMap; . } public class DubboProtocol extends AbstractProtocol { private ExchangeHandler requestHandler = new ExchangeHandlerAdapter() { @Override public CompletableFuture<Object> reply(ExchangeChannel channel, Object message) { Invocation inv = (Invocation) message; Invoker<? > invoker = getInvoker(channel, inv); RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress()); Result result = invoker.invoke(inv); return result.thenApply(Function.identity()); } @Override public void received(Channel channel, Object message) throws RemotingException { if (message instanceof Invocation) { reply((ExchangeChannel) channel, message); 】  } else { super.received(channel, message); } } private void invoke(Channel channel, String methodKey) { Invocation invocation = createInvocation(channel, channel.getUrl(), methodKey); if (invocation ! = null) { try { received(channel, invocation); } catch (Throwable t) { logger.warn("Failed to invoke event method " + invocation.getMethodName() + "(), cause: " + t.getMessage(), t); }}}}; Invoker<? > getInvoker(Channel channel, Invocation inv) throws RemotingException { ... String serviceKey = serviceKey(port, path, name, and version); // create a unique serviceKey based on the local port, interface name, interface group, and interface version. (String) inv.getObjectAttachments().get(VERSION_KEY), (String) inv.getObjectAttachments().get(GROUP_KEY) ); // Get DubboExporter< in Map according to service key? > exporter = (DubboExporter<? >) exporterMap.get(serviceKey); return exporter.getInvoker(); }}Copy the code

4.Exchange layer implementation parsing

From the analysis of RPC protocol implementation module, it can be seen that the remote transmission module is mainly provided by Dubbo protocol in RPC protocol, and the main responsibility of Exchange layer is to provide the data transmission function between producer and consumer. The DubboProtocol#protocolBindingRefer and DubboProtocol# Export methods are called when the server and client are initialized for service consumption and production. DubboProtocol#getClients and DubboProtocol#createServer are called to initialize the server and client.

private ProtocolServer createServer(URL url) { ExchangeServer server; Handler server = exchangers.bind (url, requestHandler); // Bind to exchangers.bind (url, requestHandler); } catch (RemotingException e) { ... } str = url.getParameter(CLIENT_KEY); if (str ! = null && str.length() > 0) {// Communication server that supports multiple protocols Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class) .getSupportedExtensions(); } return new DubboProtocolServer(server); }} private ExchangeClient[] getClients(URL URL) {// ExchangeClient[] clients = new ExchangeClient[connections]; for (int i = 0; i < clients.length; i++) { if (useShareConnect) { clients[i] = shareClients.get(i); } else { clients[i] = initClient(url); } } return clients; } private ExchangeClient initClient(URL url) { ExchangeClient client; Try {/ / determine whether need to delay client type if (url. The getParameter (LAZY_CONNECT_KEY, false)) {client = new LazyConnectExchangeClient (url, requestHandler); '} else {// connect to the server and specify the requestHandler client = exchangers.connect (url, requestHandler); }... } catch (RemotingException e) { ... } return client; }Copy the code

The main class structure implemented by the Exchaner layer is as follows, that is, each Client communicates with the Server Server through a Channel, and processes channel-related events through a ChannelHandler. The Server Server maintains a set of channels connected to the Client.

Sano1100is an extension point, which contains two methods. The entry parameters are BOTH URL and Exchange Handler, and the Server and Client are returned. The two methods are used by the service consumer and Server provider respectively.

  • Service consumer: contains an ExchangeClient, through the connect method to connect the service provider, and specify the corresponding ExchangeHandler for remote call processing, the request method and parameters through the URL sent to the provider;

  • Service provider: contains an ExchangeServer, bound to a specified URL and port through the bind method class, used to listen to the consumer’s connection and remote call requests, and specified ExchangeHandler to handle the request, according to the request method and parameters call local methods, get the results returned to the service consumer

    @SPI(HeaderExchanger.NAME) public interface Exchanger {

    @Adaptive({Constants.EXCHANGER_KEY})
    ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException;
    
    @Adaptive({Constants.EXCHANGER_KEY})
    ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException;
    Copy the code

    }

The Server/Client is created using the Exchanges class, and the extension (HeaderRecovery) is used as the default extension (Recovery) when bind/connect is called. The communication between the Client and the Server is implemented through the Transporter. The Transporter creates a bind and returns a Server object. The Transporter creates a Client object by connecting to the Server. In order to support different protocols/communication implementations, the framework designs the Transporter as an SP extension point, which is implemented as a NettyTransporter class by default. The Transporters are called to load and generate clients and servers with different implementations.

The main call link is as follows:

Exchangers#bind/#connect -> Transporters#bind/#connect -> Server/Client

Except for the Transporter implementation, the corresponding SPI implementation can be selected according to the URL, the other implementation is basically fixed mode. Server and Client use different servers, clients, and channels based on different Transporter implementations. Although Exchange is an SPI and supports Adaptive, the implementation of Exchange only uses HeaderExchange.

4.1 ExchangeChannel Implementation analysis

ExchangeClient and ExchangeServer communicate through ExchangeChannel. In the ExchangeChannel interface class, the methods of requesting and acquiring channel processing class objects are mainly defined, in which asynchronous tasks are returned after the request. The default here is the real HeaderExchangeChannel.

public interface ExchangeChannel extends Channel {

    CompletableFuture<Object> request(Object request, ExecutorService executor);

    CompletableFuture<Object> request(Object request, int timeout, ExecutorService executor);    
    
    ExchangeHandler getExchangeHandler();
}
Copy the code

In ExchangeChannel communication, all RPC request-reply information is encapsulated in the Requst and Response classes. In the Request, each RPC Request is identified with a unique ID. This ID is generated by auto-increment. When the maximum value is reached, the ID is generated from the minimum value, and the current framework version and Request data are recorded. In Response, there is also the request ID, frame version, error message, result, and so on.

Public class Request {private static final AtomicLong INVOKE_ID = new AtomicLong(0); // Request id, unique identifier private final long mId; // Current version private String mVersion; Private Boolean mTwoWay = true; Private Boolean mEvent = false; Private Object mData; Public Request() {mId = newId(); } private static long newId() {return invokeid.getAndincrement (); } } public class Response { ... Private Long mId = 0; private Long mId = 0; Private String mVersion; Private byte mStatus = OK; Private Boolean mEvent = false; // Error message private String mErrorMsg; Private Object mResult; }Copy the code

In the HeaderExchangeChannel implementation, the ExchangeHandler#send method is called whenever the request method is called and the final result is returned as an asynchronous result.

public final class HeaderExchangeChannel implements ExchangeChannel { .... @Override public void sent(Channel channel, Object message) throws RemotingException { try { ExchangeChannel exchangeChannel = HeaderExchangeChannel .getOrAddChannel(channel); // The channel handler class sends handler.sent(exchangeChannel, message); } catch (Throwable t) { exception = t; HeaderExchangeChannel.removeChannelIfDisconnected(channel); } if (message instanceof Request) { Request request = (Request) message; DefaultFuture.sent(channel, request); }... }... @Override public CompletableFuture<Object> request(Object request, int timeout, ExecutorService executor) {// Build RPC Request req = new Request(); req.setVersion(Version.getProtocolVersion()); req.setTwoWay(true); req.setData(request); DefaultFuture = DefaultFuture. NewFuture (Channel, req, timeout, executor); try { channel.send(req); } catch (RemotingException e) { future.cancel(); throw e; } return future; }... }Copy the code

4.2 ExchangHandler implements parsing

ExchangeHandler acts as the Channel processor interface for Exchange, inheriting ChannelHannler’s defined approach. · Mainly define the processing method under channel connection/channel disconnection/sending/receiving information/channel abnormality.

@SPI
public interface ChannelHandler {
    
    void connected(Channel channel) throws RemotingException;
    
    void disconnected(Channel channel) throws RemotingException;
  
    void sent(Channel channel, Object message) throws RemotingException;
   
    void received(Channel channel, Object message) throws RemotingException;

    void caught(Channel channel, Throwable exception) throws RemotingException;
}

public interface ExchangeHandler extends ChannelHandler, TelnetHandler {
    
    CompletableFuture<Object> reply(ExchangeChannel channel, Object request) throws RemotingException;

}
Copy the code

ExchangeHandlerDispatcher inherited ExchangeHandler, hold the handle the dispenser ChannelHandlerDispatcher object, when calling the send method to call the send method of the dispenser, The sent method that fires the List of ChannelHandler objects, as well as connections/disconnections. The processor set will be initialized when the CALL to CONNECT /bind D is made.

public class ExchangeHandlerDispatcher implements ExchangeHandler { private final ChannelHandlerDispatcher handlerDispatcher; @Override public void sent(Channel channel, Object message) { handlerDispatcher.sent(channel, message); }... } public class ChannelHandlerDispatcher {private final Collection<ChannelHandler> channelHandlers = new CopyOnWriteArraySet<ChannelHandler>(); . @Override public void sent(Channel channel, Object message) { for (ChannelHandler listener : channelHandlers) { try { listener.sent(channel, message); } catch (Throwable t) { logger.error(t.getMessage(), t); }}}... } public class HeaderExchanger implements Exchanger { @Override public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException { return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true); } @Override public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException { return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler)))); }}Copy the code

5. Analysis of request-response model

In a real-world scenario, the client invokes the service concurrently using multiple threads, and the Dubbo framework uses the global request ID in the communication content to identify the correct response invocation.

The implementation of this request-response model relies on DefaultFuture, a class that stores the relationship between request IDS and communication channels, request IDS and tasks, task execution thread pool, timing scheduler, and send/receive methods. Asynchronous requests are converted into synchronous requests through DefaultFuture tasks, thread pools, timed schedulers, and other utility classes.

Public class DefaultFuture extends CompletableFuture<Object> {private static final Map<Long, Channel> CHANNELS = new ConcurrentHashMap<>(); Private static final Map<Long, DefaultFuture> FUTURES = new ConcurrentHashMap<>(); private static final Map<Long, DefaultFuture> FUTURES = new ConcurrentHashMap<>(); // Private final Channel Channel; // Private final Request Request; Private ExecutorService executor; Public static final Timer TIME_OUT_TIMER = new HashedWheelTimer(...) ; Public static DefaultFuture newFuture(Channel Channel, Request Request, int timeout, ExecutorService executor) { final DefaultFuture future = new DefaultFuture(channel, request, timeout); future.setExecutor(executor); If (executor instanceof ThreadlessExecutor) {// Execute in the thread pool, And wait for the task ((ThreadlessExecutor) executor). SetWaitingFuture (Future); } // timeoutCheck(future); return future; } // timeoutCheck method private static void timeoutCheck(DefaultFuture future) {// create a timeoutCheck task with a unique request id, TimeoutCheckTask task = new TimeoutCheckTask(future.getid ()); future.timeoutCheckTask = TIME_OUT_TIMER.newTimeout(task, future.getTimeout(), TimeUnit.MILLISECONDS); } public static void sent(Channel Channel, Request Request) {DefaultFuture future = FUTURES. Get (request.getid ()); if (future ! = null) {// Here is simply future.dosent (); Public static void received(Channel Channel, Response Response, Boolean timeout) {try {// Retrieve existing task DefaultFuture Future = FUTURES. Remove (response.getid ()); if (future ! Timeout t = future.timeoutCheckTask; if (! timeout) { t.cancel(); } future.doReceived(response); } else {} finally {// Channels.remove (response.getid ()); } } private void doReceived(Response res) { ... This.plete (res.getresult ()) if (res.getStatus() == response.ok) {this.plete (res.getresult ()); } else if (res) getStatus () = = Response. CLIENT_TIMEOUT | | res. The getStatus () = = Response. SERVER_TIMEOUT) {/ / timeout exception handling, Put exception information in the result of the task, End task this.com pleteExceptionally (new TimeoutException (res getStatus () = = Response. SERVER_TIMEOUT, channel, res.getErrorMessage())); } else { this.completeExceptionally(new RemotingException(channel, res.getErrorMessage())); }... } private static class TimeoutCheckTask implements TimerTask {@override public void run(Timeout Timeout) { Return DefaultFuture = defaultFuture.getFuture (requestID) if the task is empty or completed; if (future == null || future.isDone()) { return; } // If (future.getexecutor ()! = null) { future.getExecutor().execute(() -> notifyTimeout(future)); } else { notifyTimeout(future); Private void notifyTimeout(DefaultFuture future) {// build timeoutResponse timeoutResponse = new Response(future.getId()); timeoutResponse.setStatus(future.isSent() ? Response.SERVER_TIMEOUT : Response.CLIENT_TIMEOUT); timeoutResponse.setErrorMessage(future.getTimeoutMessage(true)); DefaultFuture.received(future.getChannel(), timeoutResponse, true); }}}Copy the code

6. Summary

Mainly introduced the main process of RPC calls, Dubbo framework of remote communication network model, explained the main architecture of Exchange layer implementation, as well as the Exchanger/ExchangeChannel ExchangeHandler related implementation.