1. Introduction

The previous article analyzed how the Consumer initiates a remote service call: DubboInvoker ultimately uses an ExchangeClient to send the network request. Dubbo wraps each request in a Request object, but the network can only carry a sequence of bytes, so the Request object must be encoded before it is sent. Likewise, the server must decode the bytes it receives from the client to recover the Request object. The same applies to Response.

Dubbo's network communication protocol consists of two parts, Header and Body. The Header is encoded and decoded by the Codec, and the Body is serialized. This article looks at the details of how Dubbo encodes, decodes, and serializes messages.

2. Codec

Dubbo uses Netty as its default network transport framework, so we take Netty as the example and analyze two perspectives: encoding on the client and decoding on the server.

2.1 Encoder

Encoding a message is much simpler than decoding because there is no need to deal with TCP packet sticking and splitting. To see what happens to an outgoing Request object, we start with NettyClient.

With Netty, a developer's most important tasks are designing the ChannelHandlers and arranging them in the pipeline. In general, inbound data must be decoded first and outbound data must be encoded last, so the codec handlers are placed at the head of the pipeline, which is exactly what Dubbo does.

ch.pipeline()
    .addLast("decoder", adapter.getDecoder())
    .addLast("encoder", adapter.getEncoder())
    .addLast("client-idle-handler", new IdleStateHandler(heartbeatInterval, 0, 0, MILLISECONDS))
    .addLast("handler", nettyClientHandler);

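The ordering argument above can be illustrated with a toy model. This is only a sketch that does not use Netty: the `Handler` class and the inbound/outbound flags are hypothetical, and treating the idle handler and the final handler as participating in both directions is an assumption made for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Toy model of handler ordering; it does not use Netty and all names are hypothetical.
final class PipelineOrderSketch {
    static final class Handler {
        final String name;
        final boolean inbound;
        final boolean outbound;

        Handler(String name, boolean inbound, boolean outbound) {
            this.name = name;
            this.inbound = inbound;
            this.outbound = outbound;
        }
    }

    // Same order as the addLast(...) calls in the client pipeline.
    static List<Handler> clientPipeline() {
        return Arrays.asList(
                new Handler("decoder", true, false),
                new Handler("encoder", false, true),
                new Handler("client-idle-handler", true, true),
                new Handler("handler", true, true));
    }

    // Inbound events travel from the head of the pipeline to the tail.
    static List<String> inboundPath(List<Handler> pipeline) {
        List<String> path = new ArrayList<>();
        for (Handler h : pipeline) {
            if (h.inbound) {
                path.add(h.name);
            }
        }
        return path;
    }

    // Outbound operations travel from the tail back to the head.
    static List<String> outboundPath(List<Handler> pipeline) {
        List<String> path = new ArrayList<>();
        for (int i = pipeline.size() - 1; i >= 0; i--) {
            if (pipeline.get(i).outbound) {
                path.add(pipeline.get(i).name);
            }
        }
        return path;
    }

    public static void main(String[] args) {
        System.out.println("inbound:  " + inboundPath(clientPipeline()));
        System.out.println("outbound: " + outboundPath(clientPipeline()));
    }
}
```

In this model the decoder is the first handler to see inbound bytes and the encoder is the last handler to touch outbound data, which is the property the pipeline layout is meant to guarantee.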

Dubbo places InternalEncoder at the head of the pipeline. It delegates to ExchangeCodec, so let's go straight to ExchangeCodec#encodeRequest(). Encoding has two parts: the Header is filled in according to the protocol, and the data in the Request is serialized with the configured serialization strategy and written to the Body.

protected void encodeRequest(Channel channel, ChannelBuffer buffer, Request req) throws IOException {
    // The serialization policy defaults to hessian2
    Serialization serialization = getSerialization(channel);
    // Protocol Header 16 bytes
    byte[] header = new byte[HEADER_LENGTH];
    // 2 bytes magic number
    Bytes.short2bytes(MAGIC, header);
    header[2] = (byte) (FLAG_REQUEST | serialization.getContentTypeId());
    if (req.isTwoWay()) {
        header[2] |= FLAG_TWOWAY;
    }
    if (req.isEvent()) {
        header[2] |= FLAG_EVENT;
    }
    // RequestId global increment
    Bytes.long2bytes(req.getId(), header, 4);
    int savedWriteIndex = buffer.writerIndex();
    buffer.writerIndex(savedWriteIndex + HEADER_LENGTH);
    // serialize Data
    ChannelBufferOutputStream bos = new ChannelBufferOutputStream(buffer);
    ObjectOutput out = serialization.serialize(channel.getUrl(), bos);
    if (req.isEvent()) {// Event message
        encodeEventData(channel, out, req.getData());
    } else {
        encodeRequestData(channel, out, req.getData(), req.getVersion());
    }
    out.flushBuffer();
    if (out instanceof Cleanable) {
        ((Cleanable) out).cleanup();
    }
    bos.flush();
    bos.close();
    int len = bos.writtenBytes();
    // Check whether the Body is too large
    checkPayload(channel, len);
    Bytes.int2bytes(len, header, 12);
    buffer.writerIndex(savedWriteIndex);
    buffer.writeBytes(header); // write header.
    buffer.writerIndex(savedWriteIndex + HEADER_LENGTH + len);
}

For the Dubbo network communication protocol section, please refer to the previous article.

Note that the last four bytes of the Header record BodyLength, which the receiver uses to handle TCP packet sticking and splitting.
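The "reserve the header, write the body, then backfill the header" pattern used by encodeRequest() can be sketched with plain java.nio. This is a minimal illustration, not Dubbo's code: it uses ByteBuffer in place of ChannelBuffer, takes an already serialized body, and the constant values (magic number and flag bits) are written as I recall them from ExchangeCodec, so treat them as assumptions.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Illustrative only: java.nio.ByteBuffer stands in for Dubbo's ChannelBuffer and Bytes helpers.
final class FrameEncodeSketch {
    static final int HEADER_LENGTH = 16;
    // Assumed constant values; check them against ExchangeCodec before relying on them.
    static final short MAGIC = (short) 0xdabb;
    static final byte FLAG_REQUEST = (byte) 0x80;
    static final byte FLAG_TWOWAY = (byte) 0x40;
    static final byte FLAG_EVENT = (byte) 0x20;

    /** Writes one frame at the buffer's current position: a 16-byte header followed by the body. */
    static void encode(ByteBuffer buffer, long requestId, byte serializationId,
                       boolean twoWay, boolean event, byte[] body) {
        int start = buffer.position();
        // Skip over the header; the body length is only known once the body has been written.
        buffer.position(start + HEADER_LENGTH);
        buffer.put(body);
        int len = buffer.position() - start - HEADER_LENGTH;

        byte flags = (byte) (FLAG_REQUEST | serializationId);
        if (twoWay) {
            flags |= FLAG_TWOWAY;
        }
        if (event) {
            flags |= FLAG_EVENT;
        }

        // Backfill the header with absolute puts, which leave the write position untouched.
        buffer.putShort(start, MAGIC);         // bytes 0-1: magic number
        buffer.put(start + 2, flags);          // byte 2: flags and serialization id
        buffer.put(start + 3, (byte) 0);       // byte 3: not set for requests
        buffer.putLong(start + 4, requestId);  // bytes 4-11: request id
        buffer.putInt(start + 12, len);        // bytes 12-15: body length
    }

    public static void main(String[] args) {
        ByteBuffer buffer = ByteBuffer.allocate(64);
        encode(buffer, 1L, (byte) 2, true, false, "ping".getBytes(StandardCharsets.UTF_8));
        System.out.println("frame bytes written: " + buffer.position());
        System.out.println("body length field:   " + buffer.getInt(12));
    }
}
```

Writing the body first avoids serializing into a temporary array just to learn its length, at the cost of having to jump back to fill in the header.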

2.2 Decoder

Decoding is considerably more complicated than encoding because it has to handle TCP packet sticking and splitting.

As with encoding, we start the analysis of server-side decoding from NettyServer. The following is how Dubbo sets up the pipeline.

ch.pipeline()
    .addLast("decoder", adapter.getDecoder())
    .addLast("encoder", adapter.getEncoder())
    .addLast("server-idle-handler", new IdleStateHandler(0, 0, idleTimeout, MILLISECONDS))
    .addLast("handler", nettyServerHandler);


Apart from the idle-state settings, the only difference is the final handler: one implements the client logic and the other the server logic.

Dubbo places InternalDecoder at the head of the pipeline. It extends ByteToMessageDecoder, the abstract class provided by Netty. It delegates to DubboCountCodec and contains no decoding logic of its own, only a simple loop that reads multiple messages.

private class InternalDecoder extends ByteToMessageDecoder {
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf input, List<Object> out) throws Exception {
        ChannelBuffer message = new NettyBackedChannelBuffer(input);
        NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);
        // Loop reading, possibly multiple messages
        do {
            // Save the read index first
            int saveReaderIndex = message.readerIndex();
            Object msg = codec.decode(channel, message);
            if (msg == Codec2.DecodeResult.NEED_MORE_INPUT) {
                // If the read data is incomplete, restore the read index and wait for the peer end to send more data
                message.readerIndex(saveReaderIndex);
                break;
            } else {
                if (saveReaderIndex == message.readerIndex()) {
                    throw new IOException("Decode without read data.");
                }
                if (msg != null) {
                    out.add(msg);
                }
            }
        } while (message.readable());
    }
}

DubboCountCodec is also a decorator with no decoding logic of its own. It adds the ability to decode multiple messages in one pass and records the byte length of each decoded message in its attachments.

public Object decode(Channel channel, ChannelBuffer buffer) throws IOException {
    int save = buffer.readerIndex();
    // A single read may contain several messages because TCP is a byte stream
    MultiMessage result = MultiMessage.create();
    do {
        Object obj = codec.decode(channel, buffer);
        if (Codec2.DecodeResult.NEED_MORE_INPUT == obj) {
            // The read data is incomplete and the peer end needs to send more data
            buffer.readerIndex(save);
            break;
        } else {
            result.addMessage(obj);
            logMessageLength(obj, buffer.readerIndex() - save);
            save = buffer.readerIndex();
        }
    } while (true);
    if (result.isEmpty()) {
        // No message was read
        return Codec2.DecodeResult.NEED_MORE_INPUT;
    }
    if (result.size() == 1) {
        return result.get(0);
    }
    return result;
}
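The save-and-restore loop shared by InternalDecoder and DubboCountCodec can be sketched in isolation. To keep the example short it assumes a made-up wire format, a 4-byte length prefix followed by a UTF-8 body, rather than Dubbo's 16-byte header, and `NEED_MORE_INPUT` here is a local sentinel standing in for Codec2.DecodeResult.NEED_MORE_INPUT.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Sketch of the "decode until the data runs out" loop; the wire format is hypothetical.
final class MultiFrameLoopSketch {
    /** Local sentinel standing in for Codec2.DecodeResult.NEED_MORE_INPUT. */
    static final Object NEED_MORE_INPUT = new Object();

    // Decodes one message: 4-byte length prefix plus UTF-8 body (not Dubbo's format).
    static Object decodeOne(ByteBuffer buf) {
        if (buf.remaining() < 4) {
            return NEED_MORE_INPUT;
        }
        int len = buf.getInt();
        if (len < 0) {
            throw new IllegalArgumentException("negative length: " + len);
        }
        if (buf.remaining() < len) {
            // The position has already moved past the prefix; the caller restores it.
            return NEED_MORE_INPUT;
        }
        byte[] body = new byte[len];
        buf.get(body);
        return new String(body, StandardCharsets.UTF_8);
    }

    // Collects every complete message and leaves a trailing partial message unread.
    static List<Object> decodeAll(ByteBuffer buf) {
        List<Object> messages = new ArrayList<>();
        while (buf.hasRemaining()) {
            int saved = buf.position();
            Object msg = decodeOne(buf);
            if (msg == NEED_MORE_INPUT) {
                buf.position(saved); // roll back so the next read starts at the frame boundary
                break;
            }
            messages.add(msg);
        }
        return messages;
    }

    public static void main(String[] args) {
        ByteBuffer buf = ByteBuffer.allocate(32);
        buf.putInt(2).put("ab".getBytes(StandardCharsets.UTF_8));
        buf.putInt(5).put("xy".getBytes(StandardCharsets.UTF_8)); // only 2 of 5 body bytes
        buf.flip();
        System.out.println(decodeAll(buf) + ", unread bytes: " + buf.remaining());
    }
}
```

The important detail is that the index is restored to the start of the incomplete message, so nothing is lost when more bytes arrive and decoding is retried.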

Eventually ExchangeCodec#decode() is called to decode a single message. It reads the Header first, but because TCP may split a message, the Header read at this point may be incomplete. That is checked in the next step.

public Object decode(Channel channel, ChannelBuffer buffer) throws IOException {
    // Number of bytes readable
    int readable = buffer.readableBytes();
    // Header may be incomplete
    byte[] header = new byte[Math.min(readable, HEADER_LENGTH)];
    buffer.readBytes(header);
    return decode(channel, buffer, readable, header);
}

Next, the codec checks whether at least 16 bytes have been received. If not, even the Header is incomplete, so NEED_MORE_INPUT is returned and the data is left unprocessed until the peer sends more.

// Header is incomplete and needs to wait for the peer end to send more data
if (readable < HEADER_LENGTH) {
    return DecodeResult.NEED_MORE_INPUT;
}

Once the full Header has been read, BodyLength is parsed from it to determine whether the whole Body has arrived. If it has not, the message still cannot be processed and the codec waits for the peer to send more data.

// Parse BodyLength from Header
int len = Bytes.bytes2int(header, 12);
checkPayload(channel, len);
// Total message length
int tt = len + HEADER_LENGTH;
if (readable < tt) {
    // The message is incomplete, waiting for the peer end to transfer more data
    return DecodeResult.NEED_MORE_INPUT;
}

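The two completeness checks can be combined into one small function. The following is a sketch under stated assumptions: it uses java.nio.ByteBuffer instead of ChannelBuffer, returns the raw body bytes instead of a Request, and `NEED_MORE_INPUT` is a local sentinel rather than Dubbo's DecodeResult constant.

```java
import java.nio.ByteBuffer;

// Sketch of the header and body completeness checks; not Dubbo's implementation.
final class FrameCheckSketch {
    static final int HEADER_LENGTH = 16;
    /** Local sentinel standing in for DecodeResult.NEED_MORE_INPUT. */
    static final Object NEED_MORE_INPUT = new Object();

    /** Returns the body of one complete frame, or NEED_MORE_INPUT without consuming anything. */
    static Object decode(ByteBuffer buffer) {
        int readable = buffer.remaining();
        if (readable < HEADER_LENGTH) {
            return NEED_MORE_INPUT; // even the header is incomplete
        }
        int start = buffer.position();
        int len = buffer.getInt(start + 12); // BodyLength lives in the last four header bytes
        if (len < 0) {
            throw new IllegalArgumentException("negative body length: " + len);
        }
        if (readable - HEADER_LENGTH < len) {
            return NEED_MORE_INPUT; // header is complete, body is not
        }
        byte[] body = new byte[len];
        buffer.position(start + HEADER_LENGTH);
        buffer.get(body);
        return body;
    }

    public static void main(String[] args) {
        byte[] frame = new byte[HEADER_LENGTH + 3];
        ByteBuffer.wrap(frame).putInt(12, 3);
        System.out.println("10 bytes complete? " + (decode(ByteBuffer.wrap(frame, 0, 10)) != NEED_MORE_INPUT));
        System.out.println("19 bytes complete? " + (decode(ByteBuffer.wrap(frame)) != NEED_MORE_INPUT));
    }
}
```

Because the length is read with an absolute `getInt`, the buffer position only moves once the whole frame is known to be present, so an incomplete frame needs no rollback in this variant.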

At this point a complete message has been received and can be decoded by calling decodeBody(). The decoding itself is not complicated: the client writes data in the protocol format, and the server reads it back in the same format to rebuild the Request object. Note that Dubbo lets you choose which thread deserializes the message Body. By default this happens on the business thread, and the decode.in.io parameter moves it to the IO thread.

DecodeableRpcInvocation inv;
// Whether deserialization uses IO threads. Default is false
if (channel.getUrl().getParameter(DECODE_IN_IO_THREAD_KEY, DEFAULT_DECODE_IN_IO_THREAD)) {
    inv = new DecodeableRpcInvocation(channel, req, is, proto);
    inv.decode();// The IO thread deserializes directly
} else {
    inv = new DecodeableRpcInvocation(channel, req,
                                      new UnsafeByteArrayInputStream(readMessageData(is)), proto);
}
data = inv;

The received Request then passes through DecodeHandler#received(). If the IO thread did not deserialize the Request, the business thread deserializes it here before handing it to the next handler.

public void received(Channel channel, Object message) throws RemotingException {
    if (message instanceof Decodeable) {
        decode(message);
    }
    if (message instanceof Request) {
        // Decode Data deserialization
        decode(((Request) message).getData());
    }
    if (message instanceof Response) {
        decode(((Response) message).getResult());
    }
    handler.received(channel, message);
}
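The idea of a body that is decoded either eagerly on the IO thread or lazily on a later thread can be sketched as follows. `LazyBody` and `receive` are hypothetical names for illustration, and a UTF-8 string stands in for real deserialization. The point is that decode() is safe to call from both places because it only does the work once.

```java
import java.nio.charset.StandardCharsets;

// Sketch of deferred, idempotent decoding; the class and method names are hypothetical.
final class LazyBodySketch {
    static final class LazyBody {
        private final byte[] raw;
        private boolean decoded;
        private int decodeCount;
        private String value;

        LazyBody(byte[] raw) {
            this.raw = raw.clone(); // copy the bytes so the network buffer can be released
        }

        // Safe to call from the IO thread and again from the business thread.
        synchronized void decode() {
            if (decoded) {
                return;
            }
            value = new String(raw, StandardCharsets.UTF_8); // stand-in for deserialization
            decodeCount++;
            decoded = true;
        }

        synchronized boolean isDecoded() {
            return decoded;
        }

        synchronized int decodeCount() {
            return decodeCount;
        }

        String value() {
            decode();
            return value;
        }
    }

    // Mirrors the branch in the article: decode now, or only keep the raw bytes.
    static LazyBody receive(byte[] raw, boolean decodeInIoThread) {
        LazyBody body = new LazyBody(raw);
        if (decodeInIoThread) {
            body.decode();
        }
        return body;
    }

    public static void main(String[] args) {
        LazyBody body = receive("hi".getBytes(StandardCharsets.UTF_8), false);
        System.out.println("decoded on receive: " + body.isDecoded());
        body.decode(); // what a later handler would do on the business thread
        System.out.println("value: " + body.value());
    }
}
```

Deferring the work keeps the IO thread free to read more bytes, while eager decoding avoids copying the body out of the network buffer.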

If the message is an RPC request from a Consumer, the decoded result is an RpcInvocation. It is handled by the ExchangeHandler in DubboProtocol, which calls the local Invoker and sends back the response, and the process ends.

3. Serialization

Dubbo supports a variety of serialization strategies, such as Java serialization, Hessian2, and Kryo. Serialization strategies are also loaded via SPI and can be easily replaced. The Serialization interface is Dubbo's abstraction of a serialization strategy, and the default implementation is Hessian2. Serialization and deserialization convert between Java objects and byte sequences, and Dubbo abstracts these two directions into two interfaces: ObjectOutput for serialization and ObjectInput for deserialization.

public class Hessian2Serialization implements Serialization {
    @Override
    public ObjectOutput serialize(URL url, OutputStream out) throws IOException {
        return new Hessian2ObjectOutput(out);
    }

    @Override
    public ObjectInput deserialize(URL url, InputStream is) throws IOException {
        return new Hessian2ObjectInput(is);
    }
}

Hessian2ObjectOutput in turn delegates the actual Hessian2 encoding to Hessian2Output.
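The shape of this abstraction, a strategy object that wraps a stream in an output or input object, can be sketched with JDK serialization as a stand-in for Hessian2. `SimpleSerialization` and `JdkSerialization` are hypothetical names, the URL parameter of Dubbo's interface is omitted, and `java.io.ObjectOutput` and `java.io.ObjectInput` here are the JDK interfaces, not Dubbo's types of the same name.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInput;
import java.io.ObjectInputStream;
import java.io.ObjectOutput;
import java.io.ObjectOutputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;

// Sketch of a pluggable serialization strategy; names are hypothetical, not Dubbo's API.
final class SerializationSketch {
    /** Simplified strategy interface: wraps raw streams in object-level writers and readers. */
    interface SimpleSerialization {
        ObjectOutput serialize(OutputStream out) throws IOException;

        ObjectInput deserialize(InputStream in) throws IOException;
    }

    /** JDK serialization used as a stand-in for Hessian2. */
    static final class JdkSerialization implements SimpleSerialization {
        @Override
        public ObjectOutput serialize(OutputStream out) throws IOException {
            return new ObjectOutputStream(out);
        }

        @Override
        public ObjectInput deserialize(InputStream in) throws IOException {
            return new ObjectInputStream(in);
        }
    }

    // Callers depend only on the interface, so the strategy can be swapped freely.
    static Object roundTrip(SimpleSerialization serialization, Object value) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutput out = serialization.serialize(bytes)) {
                out.writeObject(value);
            }
            try (ObjectInput in = serialization.deserialize(new ByteArrayInputStream(bytes.toByteArray()))) {
                return in.readObject();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(roundTrip(new JdkSerialization(), "hello"));
    }
}
```

The codec only ever talks to the strategy interface, which is what makes it possible to replace the serialization scheme through configuration.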

4. Summary

Whether it is a request or a response, the network always carries a sequence of bytes, which the sender encodes and the receiver decodes. Dubbo uses Netty as its default network transport framework and places the message codec at the head of the pipeline, so that outbound data is encoded by the encoder last and inbound data is decoded by the decoder first. Decoding is more complicated than encoding because it must handle TCP packet sticking and splitting. Dubbo's solution is to record BodyLength in the last four bytes of the Header. The receiver must first read a complete Header and then extract BodyLength to determine whether the whole message has arrived. If it has not, NEED_MORE_INPUT is returned to indicate that more data is needed from the peer.