1. Implementation analysis of the Transport layer

In the Dubbo framework, the Transport and Serialize layers are responsible for the actual network transmission and for serializing the communication content; their code lives mainly in the remoting/transport packages. Continuing downward from the Exchange layer, network communication can be abstracted into a few key concepts: Server, Client, Channel, and ChannelHandler. The transport package provides abstract classes for each of these concepts, with concrete implementations built on different communication frameworks such as Netty, Grizzly, and Mina. The Transporter interface shown below is an extension point whose default implementation is the NettyTransporter class. The Transporters facade is used during Exchange-layer initialization, where it returns NettyClient and NettyServer instances.

public interface Transporter {

    @Adaptive({Constants.SERVER_KEY, Constants.TRANSPORTER_KEY})
    RemotingServer bind(URL url, ChannelHandler handler) throws RemotingException;

    @Adaptive({Constants.CLIENT_KEY, Constants.TRANSPORTER_KEY})
    Client connect(URL url, ChannelHandler handler) throws RemotingException;
}

public class Transporters {
    ...
    public static RemotingServer bind(String url, ChannelHandler... handler) throws RemotingException {
        return bind(URL.valueOf(url), handler);
    }

    public static Client connect(URL url, ChannelHandler... handlers) throws RemotingException {
        ... // resolve the handlers array into a single handler
        return getTransporter().connect(url, handler);
    }

    // Load the Transporter extension class
    public static Transporter getTransporter() {
        return ExtensionLoader.getExtensionLoader(Transporter.class).getAdaptiveExtension();
    }
    ...
}

public class NettyTransporter implements Transporter {

    @Override
    public RemotingServer bind(URL url, ChannelHandler handler) throws RemotingException {
        // Return a Netty server instance
        return new NettyServer(url, handler);
    }

    @Override
    public Client connect(URL url, ChannelHandler handler) throws RemotingException {
        // Return a Netty client instance
        return new NettyClient(url, handler);
    }
}
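For reference, a minimal usage sketch of the Transporters facade. This is illustrative only: the address is a placeholder, and the package names are assumed from the Dubbo 2.7.x source layout.

import org.apache.dubbo.common.URL;
import org.apache.dubbo.remoting.Client;
import org.apache.dubbo.remoting.RemotingException;
import org.apache.dubbo.remoting.RemotingServer;
import org.apache.dubbo.remoting.Transporters;
import org.apache.dubbo.remoting.transport.ChannelHandlerAdapter;

public class TransportersUsageSketch {

    public static void main(String[] args) throws RemotingException {
        // Illustrative address; in real use the URL carries many more parameters.
        URL url = URL.valueOf("dubbo://127.0.0.1:20880");
        // ChannelHandlerAdapter is an empty ChannelHandler; a real caller passes a handler
        // that reacts to connected/received/caught events.
        RemotingServer server = Transporters.bind(url, new ChannelHandlerAdapter());   // dispatched to NettyTransporter#bind by default
        Client client = Transporters.connect(url, new ChannelHandlerAdapter());        // dispatched to NettyTransporter#connect by default
        client.close();
        server.close();
    }
}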

1.1 Server and Client Design analysis

Both the server and the client follow the template method pattern: AbstractServer and AbstractClient define the basic skeleton for server and client implementations, while the actual network communication is carried out through a Channel at the bottom. When a server instance is created, the constructor calls the abstract method doOpen, which the concrete implementation class overrides to create the real server.

public abstract class AbstractServer extends AbstractEndpoint implements RemotingServer {
    ...
    public AbstractServer(URL url, ChannelHandler handler) throws RemotingException {
        super(url, handler);
        ...
        try {
            doOpen();
            ...
        } catch (Throwable t) {
            ...
        }
        executor = executorRepository.createExecutorIfAbsent(url);
    }

    // Abstract method left to the inheriting subclasses to implement
    protected abstract void doOpen() throws Throwable;
}

When the server accepts a connection, it first calls getChannels to check whether the maximum number of accepted connections (accepts) has been exceeded; if not, it delegates to AbstractPeer#connected, which passes the channel on to the constructed handler chain.

public abstract class AbstractServer extends AbstractEndpoint implements RemotingServer {

    @Override
    public void connected(Channel ch) throws RemotingException {
        ...
        Collection<Channel> channels = getChannels();
        // Reject the connection when the accepted-connection limit is exceeded
        if (accepts > 0 && channels.size() > accepts) {
            ...
            ch.close();
            return;
        }
        super.connected(ch);
    }
}

public abstract class AbstractPeer implements Endpoint, ChannelHandler {

    @Override
    public void connected(Channel ch) throws RemotingException {
        ...
        // handler is the handler instance passed in at construction time
        handler.connected(ch);
    }
}

On the client side, the relevant methods are in AbstractClient. Concurrency during connection establishment is controlled with a reentrant connectLock, and the abstract method doConnect leaves the concrete connection logic to the implementation class.

public abstract class AbstractClient extends AbstractEndpoint implements Client {

    public AbstractClient(URL url, ChannelHandler handler) throws RemotingException {
        super(url, handler);
        ...
        try {
            // Call the abstract method implemented by the subclass
            doOpen();
        } catch (Throwable t) {
            close();
            ...
        }
        try {
            connect();
        } catch (RemotingException t) {
            ...
        } catch (Throwable t) {
            ...
        }
    }

    protected void connect() throws RemotingException {
        connectLock.lock();
        try {
            if (isConnected()) {
                return;
            }
            doConnect();
        } catch (RemotingException e) {
            throw e;
        } catch (Throwable e) {
            ...
        } finally {
            connectLock.unlock();
        }
    }

    protected abstract void doConnect() throws Throwable;
}

When sending a message, the client first checks whether it is connected to the server and reconnects if necessary; it then obtains the Channel provided by the subclass and calls its send method to transmit the message.

public abstract class AbstractClient extends AbstractEndpoint implements Client {

    @Override
    public void send(Object message, boolean sent) throws RemotingException {
        // Reconnect if necessary
        if (needReconnect && !isConnected()) {
            connect();
        }
        // Obtain the Channel implemented by the subclass
        Channel channel = getChannel();
        channel.send(message, sent);
    }

    protected abstract Channel getChannel();
}
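For completeness, a paraphrased sketch (not the verbatim source) of how the Netty-based client typically implements getChannel: it wraps the live Netty channel it holds into Dubbo's own Channel abstraction via NettyChannel.getOrAddChannel. Field and type names follow the Dubbo 2.7.x layout but this is a simplification.

// Paraphrased sketch of NettyClient#getChannel (simplified, not the exact source)
@Override
protected org.apache.dubbo.remoting.Channel getChannel() {
    io.netty.channel.Channel c = channel;   // the connected Netty channel held by the client
    if (c == null) {
        return null;
    }
    // Wrap (and cache) the Netty channel as a Dubbo Channel bound to this client's URL and handler
    return NettyChannel.getOrAddChannel(c, getUrl(), this);
}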

Taking the Netty server and client as examples, the codecs and handler classes are registered when the Netty channel is initialized, so that all traffic on each channel passes through these codecs and handlers.

public class NettyClient extends AbstractClient {

    public NettyClient(final URL url, final ChannelHandler handler) throws RemotingException {
        // Initialize the handler chain
        super(url, wrapChannelHandler(url, handler));
    }

    @Override
    protected void doOpen() throws Throwable {
        final NettyClientHandler nettyClientHandler = new NettyClientHandler(getUrl(), this);
        ...
        bootstrap.handler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel ch) throws Exception {
                // Initialize the codec adapter
                NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyClient.this);
                // Initialize the pipeline
                ch.pipeline()
                        .addLast("logging", new LoggingHandler(LogLevel.INFO))
                        .addLast("decoder", adapter.getDecoder())
                        .addLast("encoder", adapter.getEncoder())
                        .addLast("handler", nettyClientHandler);
            }
        });
        ...
    }
}

public class NettyServer extends AbstractServer implements RemotingServer {
    ...
    @Override
    protected void doOpen() throws Throwable {
        bootstrap.group(bossGroup, workerGroup)
                .channel(NettyEventLoopFactory.serverSocketChannelClass())
                .option(ChannelOption.SO_REUSEADDR, Boolean.TRUE)
                .childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)
                .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        ...
                        // Initialize the pipeline
                        ch.pipeline()
                                .addLast("decoder", adapter.getDecoder())
                                .addLast("encoder", adapter.getEncoder())
                                .addLast("server-idle-handler", new IdleStateHandler(0, 0, idleTimeout, MILLISECONDS))
                                .addLast("handler", nettyServerHandler);
                    }
                });
    }
}

1.2 Channel processing implementation analysis

In network communication, once the two parties have established a connection, the Dubbo framework abstracts the handling of connection creation, disconnection, message sending, message receiving, and channel exceptions as a ChannelHandler. Dubbo provides a large number of handlers to carry features and extensions, and these handlers are eventually attached to the underlying communication framework such as Netty. A complete RPC call traverses a series of handlers; if each of them were mounted directly into the underlying framework's pipeline, every call would trigger a long chain of lookups and events, which would be inefficient and wasteful.

@SPI
public interface ChannelHandler {
  
    void connected(Channel channel) throws RemotingException;
  
    void disconnected(Channel channel) throws RemotingException;

    void sent(Channel channel, Object message) throws RemotingException;

    void received(Channel channel, Object message) throws RemotingException;
  
    void caught(Channel channel, Throwable exception) throws RemotingException;
}

Some common processing classes are shown below.

  • ExchangeHandlerAdapter: looks up and invokes the target service method;
  • HeaderExchangeHandler: encapsulates Request and Response processing;
  • DecodeHandler: supports decoding in the Dubbo thread pool;
  • ChannelHandlerDispatcher: encapsulates broadcasting a call to multiple handlers;
  • AllChannelHandler: dispatches events to the Dubbo thread pool, where business methods are invoked;
  • HeartbeatHandler: supports heartbeat processing;
  • MultiMessageHandler: supports batch processing of multiple messages decoded from the same stream;
  • ConnectionOrderedChannelHandler: handles TCP connect and disconnect events in a single-threaded pool to keep them in order;
  • MessageOnlyChannelHandler: only received messages are processed in the thread pool; other events are handled on the I/O thread;
  • WrappedChannelHandler: encapsulates shared thread pool capabilities, for example tracking thread pools in an in-memory key-value store;
  • NettyServerHandler: encapsulates Netty server events and handles connections, disconnections, reads, writes, and exceptions;
  • NettyClientHandler: encapsulates Netty client events and handles connections, disconnections, reads, writes, and exceptions.

For example, the node after HeaderExchangeHandler in the chain is the ExchangeHandlerAdapter handler. By wrapping each handler in a delegate (proxy) class, these handlers can be connected in a chain-of-responsibility style.

public interface ChannelHandlerDelegate extends ChannelHandler {
    ChannelHandler getHandler();
}

// Abstract handler delegate: each instance can act as one node in the chain of responsibility
public abstract class AbstractChannelHandlerDelegate implements ChannelHandlerDelegate {

    protected ChannelHandler handler;

    protected AbstractChannelHandlerDelegate(ChannelHandler handler) {
        this.handler = handler;
    }

    @Override
    public ChannelHandler getHandler() {
        if (handler instanceof ChannelHandlerDelegate) {
            return ((ChannelHandlerDelegate) handler).getHandler();
        }
        return handler;
    }
}

The final effect is shown below: ChannelHandlers#wrap builds the processing chain, and inbound and outbound traffic passes through these handler classes in turn.

public class ChannelHandlers {

    public static ChannelHandler wrap(ChannelHandler handler, URL url) {
        return ChannelHandlers.getInstance().wrapInternal(handler, url);
    }

    protected ChannelHandler wrapInternal(ChannelHandler handler, URL url) {
        return new MultiMessageHandler(new HeartbeatHandler(
                ExtensionLoader.getExtensionLoader(Dispatcher.class)
                        .getAdaptiveExtension().dispatch(handler, url)));
    }
}
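With the default all dispatch strategy, the wrapping above produces roughly the following nesting. This is a simplified picture for orientation; the innermost handlers come from the Exchange layer.

// Simplified view of the resulting handler chain with the default "all" dispatch strategy:
//
//   MultiMessageHandler
//     -> HeartbeatHandler
//          -> AllChannelHandler            // dispatches events to the Dubbo thread pool
//               -> DecodeHandler           // decodes the request body in the business thread
//                    -> HeaderExchangeHandler
//                         -> ExchangeHandlerAdapter   // finds and invokes the target service method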

The Dispatcher is the thread pool dispatcher. It can be understood as a factory that wraps a ChannelHandler into one with thread pool dispatching ability, such as AllChannelHandler, MessageOnlyChannelHandler and ExecutionChannelHandler; the Dispatcher itself does not own a thread pool. Dispatcher is a Dubbo extension point used to generate the appropriate handler for different usage scenarios. Dubbo supports the following dispatch strategies (a sketch of the default AllDispatcher follows the list).

@SPI(AllDispatcher.NAME)
public interface Dispatcher {

    @Adaptive({Constants.DISPATCHER_KEY, "dispather", "channel.handler"})
    ChannelHandler dispatch(ChannelHandler handler, URL url);

}
  • AllDispatcher: dispatches all I/O events to the Dubbo thread pool; this is the default in Dubbo;
  • ConnectionOrderedDispatcher: connection and disconnection events are handled in a separate, ordered thread pool, isolated from the Dubbo thread pool;
  • DirectDispatcher: all method calls and event handling stay on the I/O thread;
  • MessageOnlyDispatcher: only request and response messages are processed in the thread pool; other events stay on the I/O thread;
  • MockDispatcher: a mock implementation whose dispatch simply returns null.
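As a concrete example, the default dispatch strategy essentially just wraps the handler in an AllChannelHandler. The sketch below is paraphrased from the Dubbo source rather than a verbatim excerpt.

public class AllDispatcher implements Dispatcher {

    public static final String NAME = "all";

    @Override
    public ChannelHandler dispatch(ChannelHandler handler, URL url) {
        // All events (connect, disconnect, received, caught) are submitted to the Dubbo thread pool
        return new AllChannelHandler(handler, url);
    }
}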

Closely related to thread pool dispatching is the thread pool implementation itself. In Dubbo, ThreadPool is also an SPI extension point with several concrete implementations such as FixedThreadPool and LimitedThreadPool, and the parameters on the URL decide which implementation is used. An application can hold multiple thread pools, which are stored in a DefaultExecutorRepository; AbstractServer and AbstractClient call DefaultExecutorRepository#createExecutorIfAbsent to create the thread pool.

@spi ("fixed") public interface ThreadPool {// Determine @adaptive ({THREADPOOL_KEY}) Executor getExecutor(URL) according to the ThreadPool parameter on the URL  url); } public class DefaultExecutorRepository implements ExecutorRepository { public synchronized ExecutorService createExecutorIfAbsent(URL url) { String componentKey = EXECUTOR_SERVICE_COMPONENT_KEY; if (CONSUMER_SIDE.equalsIgnoreCase(url.getParameter(SIDE_KEY))) { componentKey = CONSUMER_SIDE; } ExecutorService executor = executors.computeIfAbsent(portKey, k -> createExecutor(url)); return executor; } // Create a thread pool according to the URL private ExecutorService createExecutor(URL URL) {return (ExecutorService) ExtensionLoader .getExtensionLoader(ThreadPool.class).getAdaptiveExtension() .getExecutor(url); } } public abstract class AbstractServer extends AbstractEndpoint implements RemotingServer { public AbstractServer(URL url, ChannelHandler handler) throws RemotingException { ... / / initialize the server thread pool executor. = executorRepository createExecutorIfAbsent (url); }Copy the code

The Dubbo framework provides several different thread pool implementations (a sketch of FixedThreadPool follows the list):

  • FixedThreadPool: creates a thread pool that reuses a fixed number of threads;
  • LimitedThreadPool: creates a thread pool whose size grows on demand up to a configured threshold; idle threads are never reclaimed;
  • EagerThreadPool: creates a thread pool that spawns new threads when all core threads are busy, instead of putting the new tasks into the blocking queue;
  • CachedThreadPool: creates an adaptive thread pool whose threads are reclaimed after being idle for one minute and recreated when new requests arrive.
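For illustration, a paraphrased sketch of how FixedThreadPool builds its executor from the URL parameters, based on the Dubbo 2.7.x source. The constants (THREAD_NAME_KEY, THREADS_KEY, QUEUES_KEY and their defaults) and helper classes (NamedInternalThreadFactory, AbortPolicyWithReport) are statically imported in the real source and may differ slightly between versions.

public class FixedThreadPool implements ThreadPool {

    @Override
    public Executor getExecutor(URL url) {
        // Read thread name, thread count and queue size from the URL, falling back to defaults
        String name = url.getParameter(THREAD_NAME_KEY, DEFAULT_THREAD_NAME);
        int threads = url.getParameter(THREADS_KEY, DEFAULT_THREADS);
        int queues = url.getParameter(QUEUES_KEY, DEFAULT_QUEUES);
        // Fixed core and maximum pool size; the queue type depends on the configured queue length
        return new ThreadPoolExecutor(threads, threads, 0, TimeUnit.MILLISECONDS,
                queues == 0 ? new SynchronousQueue<Runnable>()
                        : (queues < 0 ? new LinkedBlockingQueue<Runnable>()
                                      : new LinkedBlockingQueue<Runnable>(queues)),
                new NamedInternalThreadFactory(name, true),
                new AbortPolicyWithReport(name, url));
    }
}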

2. Implementation principle of protocol parsing and codec

2.1 Dubbo Protocol Parsing

An efficient, well-designed communication protocol directly determines the performance, stability and extensibility of framework communication. The Dubbo protocol borrows from existing TCP/IP protocol design and is divided into a protocol header and a protocol body (a byte-level summary of the header follows the list below).

  • Protocol header: 16 bytes in total, containing the magic number (0xdabb) used to split sticky packets, flags indicating whether the message is a request, response, heartbeat or event, the serialization protocol id, the response status, the unique request ID, and the body length.
  • Protocol body: includes the Dubbo version, service name, service version, method name, method parameter types, and method arguments.
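The layout below summarizes the byte offsets as they are written by the codec code shown later (ExchangeCodec/DubboCodec); it is a convenience summary rather than an excerpt from the source.

// Dubbo protocol header (16 bytes), matching the offsets used in ExchangeCodec#encodeRequest/encodeResponse:
//
// bytes  0-1 : magic number 0xdabb (MAGIC_HIGH, MAGIC_LOW) — used to locate frame boundaries
// byte   2   : flags — request/response bit, two-way bit, event bit, and the serialization id (low bits)
// byte   3   : status — only meaningful in responses (e.g. 20 = OK)
// bytes  4-11: request id (long) — matches a response to its pending request
// bytes 12-15: body length (int) — used together with the magic number to handle sticky/half packets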

2.2 Encoding implementation analysis

The I/O encoding and decoding contract is defined in Codec2; its main methods are encode and decode, as shown below.

@SPI
public interface Codec2 {
    @Adaptive({Constants.CODEC_KEY})
    void encode(Channel channel, ChannelBuffer buffer, Object message) throws IOException;

    @Adaptive({Constants.CODEC_KEY})
    Object decode(Channel channel, ChannelBuffer buffer) throws IOException;
}

AbstractCodec mainly provides basic capabilities, such as verifying message length and finding specific codecs. DubboCodec inherits ExchangeCodec, and ExchangeCodec inherits TelnetCodec.

When Dubbo constructs an RPC request, the encoder turns the Java object into a byte stream: it first builds the message header and then serializes the message body. The header handling for both requests and responses is implemented in ExchangeCodec#encode.

public class ExchangeCodec extends TelnetCodec {

    @Override
    public void encode(Channel channel, ChannelBuffer buffer, Object msg) throws IOException {
        if (msg instanceof Request) {
            encodeRequest(channel, buffer, (Request) msg);
        } else if (msg instanceof Response) {
            encodeResponse(channel, buffer, (Response) msg);
        } else {
            super.encode(channel, buffer, msg);
        }
    }

    protected void encodeRequest(Channel channel, ChannelBuffer buffer, Request req) throws IOException {
        // Obtain the specified or default serialization protocol
        Serialization serialization = getSerialization(channel);
        // Header buffer of 16 bytes
        byte[] header = new byte[HEADER_LENGTH];
        // Write the magic number into the first 2 bytes
        Bytes.short2bytes(MAGIC, header);
        // The third byte holds the request flag and the serialization id
        header[2] = (byte) (FLAG_REQUEST | serialization.getContentTypeId());
        ...
        // Write the request id into bytes 4-11
        Bytes.long2bytes(req.getId(), header, 4);
        int savedWriteIndex = buffer.writerIndex();
        // Skip the 16 header bytes; the body is written first
        buffer.writerIndex(savedWriteIndex + HEADER_LENGTH);
        ChannelBufferOutputStream bos = new ChannelBufferOutputStream(buffer);
        // Serialize the body
        ObjectOutput out = serialization.serialize(channel.getUrl(), bos);
        if (req.isEvent()) {
            encodeEventData(channel, out, req.getData());
        } else {
            // Delegate the request body to the subclass (DubboCodec)
            encodeRequestData(channel, out, req.getData(), req.getVersion());
        }
        ...
        int len = bos.writtenBytes();
        // Check whether the size exceeds the default 8MB payload limit
        checkPayload(channel, len);
        // Write the body length into bytes 12-15
        Bytes.int2bytes(len, header, 12);
        buffer.writerIndex(savedWriteIndex);
        // Write the complete header, then move the write index to the end of the frame
        buffer.writeBytes(header);
        buffer.writerIndex(savedWriteIndex + HEADER_LENGTH + len);
    }

    // Encode the Request body; overridden by DubboCodec
    protected void encodeRequestData(Channel channel, ObjectOutput out, Object data, String version) throws IOException {
        encodeRequestData(out, data);
    }
}

public class DubboCodec extends ExchangeCodec {

    protected void encodeRequestData(Channel channel, ObjectOutput out, Object data, String version) throws IOException {
        // Every remote invocation is encapsulated as an RpcInvocation
        RpcInvocation inv = (RpcInvocation) data;
        // Write the framework version
        out.writeUTF(version);
        String serviceName = inv.getAttachment(INTERFACE_KEY);
        if (serviceName == null) {
            serviceName = inv.getAttachment(PATH_KEY);
        }
        // Write the interface name
        out.writeUTF(serviceName);
        // Write the service version
        out.writeUTF(inv.getAttachment(VERSION_KEY));
        // Write the invoked method name
        out.writeUTF(inv.getMethodName());
        // Write the method parameter type descriptor
        out.writeUTF(inv.getParameterTypesDesc());
        Object[] args = inv.getArguments();
        if (args != null) {
            // Write each method argument
            for (int i = 0; i < args.length; i++) {
                out.writeObject(encodeInvocationArgument(channel, inv, i));
            }
        }
        // Write the implicit attachments
        out.writeAttachments(inv.getObjectAttachments());
    }
}

After request encoding, response encoding follows the same pattern: the header is handled in ExchangeCodec#encodeResponse, while the response body is encoded in DubboCodec#encodeResponseData.

protected void encodeResponse(Channel channel, ChannelBuffer buffer, Response res) throws IOException {
    int savedWriteIndex = buffer.writerIndex();
    try {
        // Obtain the specified or default serialization protocol (Hessian2 by default)
        Serialization serialization = getSerialization(channel);
        // Header buffer of 16 bytes
        byte[] header = new byte[HEADER_LENGTH];
        // The magic number occupies the first 2 bytes
        Bytes.short2bytes(MAGIC, header);
        // The third byte stores the serialization flag
        header[2] = serialization.getContentTypeId();
        ...
        // The fourth byte stores the response status
        byte status = res.getStatus();
        header[3] = status;
        // Write the request id into bytes 4-11
        Bytes.long2bytes(res.getId(), header, 4);
        // Skip the 16 header bytes and serialize the body first
        buffer.writerIndex(savedWriteIndex + HEADER_LENGTH);
        ChannelBufferOutputStream bos = new ChannelBufferOutputStream(buffer);
        ObjectOutput out = serialization.serialize(channel.getUrl(), bos);
        if (status == Response.OK) {
            if (res.isHeartbeat()) {
                encodeEventData(channel, out, res.getResult());
            } else {
                // Delegate to the subclass override (DubboCodec#encodeResponseData)
                encodeResponseData(channel, out, res.getResult(), res.getVersion());
            }
        } else {
            out.writeUTF(res.getErrorMessage());
        }
        ...
        int len = bos.writtenBytes();
        // Check whether the payload limit is exceeded
        checkPayload(channel, len);
        // Write the body length into bytes 12-15, then write the header
        Bytes.int2bytes(len, header, 12);
        buffer.writerIndex(savedWriteIndex);
        buffer.writeBytes(header);
        buffer.writerIndex(savedWriteIndex + HEADER_LENGTH + len);
    } catch (Throwable t) {
        buffer.writerIndex(savedWriteIndex);
        if (!res.isEvent() && res.getStatus() != Response.BAD_RESPONSE) {
            Response r = new Response(res.getId(), res.getVersion());
            r.setStatus(Response.BAD_RESPONSE);
            if (t instanceof ExceedPayloadLimitException) {
                try {
                    // Tell the other side that the payload limit was exceeded
                    r.setErrorMessage(t.getMessage());
                    channel.send(r);
                    return;
                } catch (RemotingException e) {
                }
            } else {
                ...
            }
        }
        ...
    }
}

public class DubboCodec extends ExchangeCodec {
    ...
    protected void encodeResponseData(Channel channel, ObjectOutput out, Object data, String version) throws IOException {
        Result result = (Result) data;
        // Whether the client protocol version supports attachments in the response
        boolean attach = Version.isSupportResponseAttachment(version);
        Throwable th = result.getException();
        if (th == null) {
            Object ret = result.getValue();
            if (ret == null) {
                out.writeByte(attach ? RESPONSE_NULL_VALUE_WITH_ATTACHMENTS : RESPONSE_NULL_VALUE);
            } else {
                out.writeByte(attach ? RESPONSE_VALUE_WITH_ATTACHMENTS : RESPONSE_VALUE);
                out.writeObject(ret);
            }
        } else {
            // If an exception was thrown, mark the call as failed and serialize the exception
            out.writeByte(attach ? RESPONSE_WITH_EXCEPTION_WITH_ATTACHMENTS : RESPONSE_WITH_EXCEPTION);
            out.writeThrowable(th);
        }
        ...
    }
}

Decoding mirrors encoding and also has two parts: first the 16-byte header is parsed, then the message body is decoded and converted into an RpcInvocation object. When the server decodes incoming data, ExchangeCodec#decode is triggered.

public class ExchangeCodec extends TelnetCodec {

    @Override
    public Object decode(Channel channel, ChannelBuffer buffer) throws IOException {
        int readable = buffer.readableBytes();
        byte[] header = new byte[Math.min(readable, HEADER_LENGTH)];
        // Read up to 16 header bytes
        buffer.readBytes(header);
        return decode(channel, buffer, readable, header);
    }

    @Override
    protected Object decode(Channel channel, ChannelBuffer buffer, int readable, byte[] header) throws IOException {
        // If the byte stream does not start with the Dubbo magic number
        if (readable > 0 && header[0] != MAGIC_HIGH
                || readable > 1 && header[1] != MAGIC_LOW) {
            int length = header.length;
            if (header.length < readable) {
                header = Bytes.copyOf(header, readable);
                buffer.readBytes(header, length, readable - length);
            }
            // Look for the magic number to find the start of the next frame
            for (int i = 1; i < header.length - 1; i++) {
                if (header[i] == MAGIC_HIGH && header[i + 1] == MAGIC_LOW) {
                    buffer.readerIndex(buffer.readerIndex() - header.length + i);
                    header = Bytes.copyOf(header, i);
                    break;
                }
            }
            return super.decode(channel, buffer, readable, header);
        }
        // Not even a full header has arrived yet
        if (readable < HEADER_LENGTH) {
            return DecodeResult.NEED_MORE_INPUT;
        }
        // Read the body length from bytes 12-15 and check the payload limit
        int len = Bytes.bytes2int(header, 12);
        checkPayload(channel, len);
        // Wait until the full frame (header + body) is available
        int tt = len + HEADER_LENGTH;
        if (readable < tt) {
            return DecodeResult.NEED_MORE_INPUT;
        }
        ChannelBufferInputStream is = new ChannelBufferInputStream(buffer, len);
        try {
            // Decode the message body
            return decodeBody(channel, is, header);
        } finally {
            // If a problem occurred during decoding, skip the remaining bytes of this RPC message
            if (is.available() > 0) {
                try {
                    StreamUtils.skipUnusedStream(is);
                } catch (IOException e) {
                    ...
                }
            }
        }
    }

    protected Object decodeBody(Channel channel, InputStream is, byte[] header) throws IOException {
        byte flag = header[2], proto = (byte) (flag & SERIALIZATION_MASK);
        long id = Bytes.bytes2long(header, 4);
        // The request bit is not set, so this is a response
        if ((flag & FLAG_REQUEST) == 0) {
            Response res = new Response(id);
            if ((flag & FLAG_EVENT) != 0) {
                res.setEvent(true);
            }
            byte status = header[3];
            res.setStatus(status);
            try {
                // Deserialize with the protocol indicated in the header
                ObjectInput in = CodecSupport.deserialize(channel.getUrl(), is, proto);
                if (status == Response.OK) {
                    Object data;
                    if (res.isHeartbeat()) {
                        data = decodeHeartbeatData(channel, in);
                    } else if (res.isEvent()) {
                        data = decodeEventData(channel, in);
                    } else {
                        data = decodeResponseData(channel, in, getRequestData(id));
                    }
                    res.setResult(data);
                } else {
                    res.setErrorMessage(in.readUTF());
                }
            } catch (Throwable t) {
                res.setStatus(Response.CLIENT_ERROR);
                res.setErrorMessage(StringUtils.toString(t));
            }
            return res;
        } else {
            // The request bit is set, so this is a request
            Request req = new Request(id);
            req.setVersion(Version.getProtocolVersion());
            req.setTwoWay((flag & FLAG_TWOWAY) != 0);
            if ((flag & FLAG_EVENT) != 0) {
                req.setEvent(true);
            }
            try {
                // Deserialize with the protocol indicated in the header
                ObjectInput in = CodecSupport.deserialize(channel.getUrl(), is, proto);
                Object data;
                if (req.isHeartbeat()) {
                    data = decodeHeartbeatData(channel, in);
                } else if (req.isEvent()) {
                    data = decodeEventData(channel, in);
                } else {
                    data = decodeRequestData(channel, in);
                }
                req.setData(data);
            } catch (Throwable t) {
                req.setBroken(true);
                req.setData(t);
            }
            return req;
        }
    }
}

3. Summary

This article explained the implementation principles of Dubbo's network communication layer: the Client and Server initialization processes, the channel handler implementations, the thread dispatch strategies and thread pool implementations, as well as the design of the Dubbo communication protocol and the codec implementation.
