
Sticky packets and unpacking

MTU (maximum transmission unit) and MSS (maximum segment size)

Maximum Transmission Unit (MTU) is the largest amount of data the link layer can transmit in one frame, generally 1500 bytes. Maximum Segment Size (MSS) is the largest amount of TCP payload in one segment, i.e. the most data the transport layer sends at one time. The two are related by MSS = MTU - IP header - TCP header; with 20-byte IPv4 and TCP headers and no options, a 1500-byte MTU gives an MSS of 1460 bytes. If the application hands TCP more data than fits in one MSS, TCP splits it into multiple segments and sends them separately. This is the unpacking phenomenon.
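To make the arithmetic concrete, here is a small Java sketch, assuming IPv4 and TCP headers of 20 bytes each with no options. MssSegmenter and its methods are hypothetical helpers written for this article, not part of any TCP stack. segment() cuts one large application write into MSS-sized chunks, which is roughly what unpacking looks like from the receiver's side.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class MssSegmenter {
    // Assumption: IPv4 and TCP headers without options
    static final int IP_HEADER = 20;
    static final int TCP_HEADER = 20;

    // MSS = MTU - IP header - TCP header
    static int mss(int mtu) {
        return mtu - IP_HEADER - TCP_HEADER;
    }

    // Split one application payload into chunks of at most mss bytes,
    // mimicking how a large write can be "unpacked" into several segments
    static List<byte[]> segment(byte[] payload, int mss) {
        List<byte[]> segments = new ArrayList<>();
        for (int off = 0; off < payload.length; off += mss) {
            int end = Math.min(off + mss, payload.length);
            segments.add(Arrays.copyOfRange(payload, off, end));
        }
        return segments;
    }
}
```

For example, segment(new byte[4000], mss(1500)) returns three chunks of 1460, 1460 and 1080 bytes. A real TCP stack chooses segment boundaries itself (options, path MTU, Nagle, window size), so treat this only as an illustration of the size relationship.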

The sliding window

The sliding window, also known as the advertised window, is the mechanism TCP uses at the transport layer for flow control. The receiver decides its window size and advertises it to the sender, which limits how much unacknowledged data the sender may have in flight at any time. Every byte of data carries a sequence number. TCP does not have to reply with an ACK for each segment; a single cumulative ACK can acknowledge several segments at once.
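The sender-side window bookkeeping can be sketched as follows. This is a simplified model under stated assumptions: sequence numbers count bytes, ACKs are cumulative, and there is no loss or retransmission. SlidingWindowSender is a hypothetical class, not a real API.

```java
class SlidingWindowSender {
    private long lastAcked = 0; // every byte before this sequence number is acknowledged
    private long nextSeq = 0;   // sequence number of the next byte to send
    private int window;         // window size advertised by the receiver

    SlidingWindowSender(int advertisedWindow) {
        this.window = advertisedWindow;
    }

    // Bytes that may still be sent without waiting for an ACK
    int usableWindow() {
        return (int) (window - (nextSeq - lastAcked));
    }

    // Try to send len bytes; returns how many the window actually allows
    int send(int len) {
        int n = Math.min(len, usableWindow());
        nextSeq += n;
        return n;
    }

    // Cumulative ACK: ackNo acknowledges every byte before it, and the
    // receiver may advertise a new window size at the same time
    void onAck(long ackNo, int newWindow) {
        if (ackNo > lastAcked) {
            lastAcked = ackNo;
        }
        window = newWindow;
    }
}
```

With an advertised window of 4000 bytes, a 3000-byte send goes out in full, a second 3000-byte send is cut to 1000 bytes, and a single ACK for sequence number 4000 then covers both sends and reopens the full window.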

Nagle algorithm

The Nagle algorithm addresses network congestion caused by frequently sending small packets. It can be understood as batch sending, an optimization idea we also use often in everyday programming: while previously sent data is still unacknowledged, new small writes are held in the buffer, and they are sent once the outstanding data is acknowledged or once the buffered data has grown to a full segment. Linux enables Nagle by default, which effectively reduces network overhead when there are large numbers of small packets; it can be disabled with the TCP_NODELAY socket option. Netty, by contrast, disables Nagle by default (TCP_NODELAY is true) to minimize transfer latency, the opposite of the Linux default.
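A rough model of Nagle's rule, assuming a single connection with no window limit and an ACK that covers all data in flight, shows how several tiny writes get merged into one segment. NagleSimulator is a hypothetical class for illustration; a real kernel implementation has more conditions (for example, delayed ACKs and window checks).

```java
import java.util.ArrayList;
import java.util.List;

class NagleSimulator {
    private final int mss;
    private int buffered = 0;        // bytes waiting in the send buffer
    private boolean unacked = false; // is sent-but-unacknowledged data in flight?
    final List<Integer> sentSegments = new ArrayList<>();

    NagleSimulator(int mss) {
        this.mss = mss;
    }

    // The application writes len bytes
    void write(int len) {
        buffered += len;
        trySend();
    }

    // An ACK arrives covering everything in flight
    void onAck() {
        unacked = false;
        trySend();
    }

    private void trySend() {
        // Full-sized segments may always be sent
        while (buffered >= mss) {
            sentSegments.add(mss);
            buffered -= mss;
            unacked = true;
        }
        // A small segment is sent only when nothing is in flight
        if (buffered > 0 && !unacked) {
            sentSegments.add(buffered);
            buffered = 0;
            unacked = true;
        }
    }
}
```

Here four 10-byte writes followed by one ACK produce only two segments, 10 bytes and then 30 bytes: the batching described above, and also one way several small messages can end up stuck together in one segment. In plain Java the algorithm is switched off with java.net.Socket#setTcpNoDelay(true); in Netty the equivalent is ChannelOption.TCP_NODELAY.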

The solution

TCP transmits a byte stream with no message boundaries, so the only way to resolve unpacking/sticking is to define message boundaries in an application-layer communication protocol.

Fixed message length

Every message has the same fixed length. Each time the receiver has accumulated that fixed number of bytes, it treats them as one complete message. If the sender's data is shorter than the fixed length, it must be padded to fill the gap.

This approach is very simple to use, but its drawbacks are just as obvious: a good fixed length is hard to choose. If it is too large, bytes are wasted; if it is too small, longer messages cannot be transmitted. In general, therefore, fixed-length messages are rarely adopted.
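A minimal sketch of fixed-length framing in Java, assuming UTF-8 text messages padded with zero bytes. FixedLengthCodec is a hypothetical name, and stripping trailing zeros only works because text messages do not end in a zero byte; binary payloads would need a different padding scheme.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class FixedLengthCodec {
    // Pad the message with zero bytes up to frameLength
    static byte[] encode(String msg, int frameLength) {
        byte[] body = msg.getBytes(StandardCharsets.UTF_8);
        if (body.length > frameLength) {
            throw new IllegalArgumentException("message longer than frame length");
        }
        return Arrays.copyOf(body, frameLength); // copyOf pads with zeros
    }

    // Cut the accumulated stream into frameLength-byte frames; an incomplete
    // trailing frame is ignored here (a real decoder would keep it for later)
    static List<String> decode(byte[] stream, int frameLength) {
        List<String> messages = new ArrayList<>();
        for (int off = 0; off + frameLength <= stream.length; off += frameLength) {
            int end = off + frameLength;
            while (end > off && stream[end - 1] == 0) {
                end--; // strip the padding
            }
            messages.add(new String(stream, off, end - off, StandardCharsets.UTF_8));
        }
        return messages;
    }
}
```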

Specific delimiter

A specific delimiter is appended to the end of each message, and the receiver splits the stream into messages at that delimiter.

Because the delimiter marks the end of each message, it must never appear inside a message body; otherwise messages will be split incorrectly. A common approach is to encode the message first, for example with Base64, and then choose a delimiter character outside the Base64 alphabet. Delimiters are efficient when the message protocol is simple enough; Redis, for example, uses \r\n line delimiters in its protocol.
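The Base64-plus-delimiter idea can be sketched as follows, assuming '\n' as the delimiter. Standard Base64 output only uses A-Z, a-z, 0-9, '+', '/' and '=', so a newline inside the original message can never be mistaken for a message boundary. DelimiterCodec is a hypothetical class; only java.util.Base64 is a real JDK API here.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Base64;
import java.util.List;

class DelimiterCodec {
    // '\n' never occurs in standard Base64 output
    static final char DELIMITER = '\n';

    static String encode(String msg) {
        return Base64.getEncoder().encodeToString(msg.getBytes(StandardCharsets.UTF_8)) + DELIMITER;
    }

    // Split at each delimiter and Base64-decode every complete frame;
    // text after the last delimiter is an incomplete frame and is not returned
    static List<String> decode(String stream) {
        List<String> messages = new ArrayList<>();
        int start = 0;
        int idx;
        while ((idx = stream.indexOf(DELIMITER, start)) >= 0) {
            byte[] body = Base64.getDecoder().decode(stream.substring(start, idx));
            messages.add(new String(body, StandardCharsets.UTF_8));
            start = idx + 1;
        }
        return messages;
    }
}
```

Encoding a message that itself contains a newline, followed by a second message, and decoding the concatenated stream returns both messages intact.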

Message length + message content

The message header stores the length of the message, and the message body carries the actual binary bytes.

Length + content is very flexible and avoids the obvious drawbacks of fixed-length messages and specific delimiters. Besides the message length, the header can also carry other custom extension fields, such as the message version and algorithm type.
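Here is a minimal length + content codec using java.nio.ByteBuffer, assuming a 4-byte big-endian length field that counts only the body. LengthPrefixCodec is a hypothetical name. decode() rewinds when the body has not fully arrived yet, leaving the partial frame for the next read.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

class LengthPrefixCodec {
    // Frame layout (assumption): 4-byte body length, then the body
    static byte[] encode(String msg) {
        byte[] body = msg.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(4 + body.length).putInt(body.length).put(body).array();
    }

    // Decode every complete frame; an incomplete trailing frame stays unread
    static List<String> decode(ByteBuffer in) {
        List<String> messages = new ArrayList<>();
        while (in.remaining() >= 4) {
            in.mark();
            int length = in.getInt();
            if (in.remaining() < length) {
                in.reset(); // body not complete yet: rewind to the length field
                break;
            }
            byte[] body = new byte[length];
            in.get(body);
            messages.add(new String(body, StandardCharsets.UTF_8));
        }
        return messages;
    }
}
```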

Implementing custom protocol communication with Netty

Communication protocol design

The magic number

The magic number lets the server quickly reject data that does not belong to the protocol, such as arbitrary traffic sent to its port. Java class files, for example, start with the magic number 0xCAFEBABE, which is checked when the class file is loaded.

Protocol version number

Different protocol versions may be parsed in different ways.

Serialization algorithm

The serialization algorithm field indicates which algorithm the sender used to convert the request object into binary, and therefore how the receiver should convert the binary back into an object.

Message type

An RPC framework has request, response, and heartbeat packets. An IM system has login, group chat creation, message sending, message receiving, and group chat exit packets.

Length field

The length field gives the length of the request data. The receiver uses it to obtain a complete packet.

The request data

Request data is usually a serialized binary stream, and the content of each request data is different.

Status

The status field indicates whether the request was processed normally. It is usually set by the callee; for example, when an RPC call fails, the service provider can set the status field to an error state.

Reserved field

Reserved fields are optional. To allow for future protocol upgrades, you can reserve a few bytes for emergencies.
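Putting the fields together, one possible frame layout is sketched below. Every concrete choice here is an assumption for illustration: the magic value 0x1234ABCD, the field order and widths (4-byte magic, 1 byte each for version, serialization, message type and status, 2 reserved bytes, 4-byte length), and the class name ProtocolFrame. decode() assumes a complete frame has already been cut out of the stream, for example by a length-field decoder.

```java
import java.nio.ByteBuffer;

class ProtocolFrame {
    // Hypothetical constants and layout, chosen only for illustration
    static final int MAGIC = 0x1234ABCD;
    static final int HEADER_LENGTH = 4 + 1 + 1 + 1 + 1 + 2 + 4; // 14 bytes

    byte version;
    byte serialization; // e.g. 1 = JSON, 2 = Hessian (hypothetical codes)
    byte messageType;   // e.g. 1 = request, 2 = response, 3 = heartbeat
    byte status;        // e.g. 0 = OK, 1 = error
    byte[] data;        // serialized request data

    byte[] encode() {
        ByteBuffer buf = ByteBuffer.allocate(HEADER_LENGTH + data.length);
        buf.putInt(MAGIC)
           .put(version)
           .put(serialization)
           .put(messageType)
           .put(status)
           .putShort((short) 0) // reserved field, zero for now
           .putInt(data.length) // length field
           .put(data);
        return buf.array();
    }

    static ProtocolFrame decode(ByteBuffer in) {
        if (in.getInt() != MAGIC) {
            throw new IllegalArgumentException("bad magic number");
        }
        ProtocolFrame f = new ProtocolFrame();
        f.version = in.get();
        f.serialization = in.get();
        f.messageType = in.get();
        f.status = in.get();
        in.getShort(); // skip the reserved field
        f.data = new byte[in.getInt()];
        in.get(f.data);
        return f;
    }
}
```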

Netty implementation

Common Netty encoder types

First-stage encoder: encodes an object into a byte stream (MessageToByteEncoder)

Second-stage encoder: converts one message type into another message type (MessageToMessageEncoder)

  • MessageToByteEncoder

    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        ByteBuf buf = null;
        try {
            if (acceptOutboundMessage(msg)) { // 1. Check whether the message types match
                @SuppressWarnings("unchecked")
                I cast = (I) msg;
                buf = allocateBuffer(ctx, cast, preferDirect); // 2. Allocate the ByteBuf resource
                try {
                    encode(ctx, cast, buf); // 3. Execute encode method to encode data
                } finally {
                    ReferenceCountUtil.release(cast);
                }
                if (buf.isReadable()) {
                    ctx.write(buf, promise); // 4. Pass the write event backwards
                } else {
                    buf.release();
                    ctx.write(Unpooled.EMPTY_BUFFER, promise);
                }
                buf = null;
            } else {
                ctx.write(msg, promise); // 5. Type mismatch: pass the message on unchanged
            }
        } catch (EncoderException e) {
            throw e;
        } catch (Throwable e) {
            throw new EncoderException(e);
        } finally {
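            if (buf != null) {
                buf.release();
            }
        }
    }

    // Usage example: a custom encoder that writes a String as bytes
    public class StringToByteEncoder extends MessageToByteEncoder<String> {
        @Override
        protected void encode(ChannelHandlerContext channelHandlerContext, String data, ByteBuf byteBuf) throws Exception {
            // Use an explicit charset rather than the platform default
            byteBuf.writeBytes(data.getBytes(java.nio.charset.StandardCharsets.UTF_8));
        }
    }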

    MessageToByteEncoder overrides the write() method of ChannelOutboundHandler:

    • acceptOutboundMessage checks whether the message type matches. If it does, the message is encoded; if not, the message is passed on unchanged to the next ChannelOutboundHandler.
    • A ByteBuf is allocated, using off-heap (direct) memory by default.
    • The encode method implemented by the subclass encodes the data. Once encoding is done, the original message is released automatically via ReferenceCountUtil.release(cast).
    • If the ByteBuf is readable, the data was encoded successfully and is written through the ChannelHandlerContext to the next node. If it is not readable, the ByteBuf is released and an empty ByteBuf is passed on instead.
  • MessageToMessageEncoder

    MessageToMessageEncoder converts a message from one format into another, and the second message can be any object. If that object is a ByteBuf, the behavior matches MessageToByteEncoder. Unlike MessageToByteEncoder, however, the output of MessageToMessageEncoder is a list of objects. These encoded results are intermediate objects that are still converted into ByteBuf before transmission.

    Its implementations include StringEncoder, LineEncoder, and others:

    public class StringEncoder extends MessageToMessageEncoder<CharSequence> {
        private final Charset charset;
        /** Creates a new instance with the current system character set. */
        public StringEncoder() {
            this(Charset.defaultCharset());
        }
        /** Creates a new instance with the specified character set. */
        public StringEncoder(Charset charset) {
            this.charset = ObjectUtil.checkNotNull(charset, "charset");
        }
        @Override
        protected void encode(ChannelHandlerContext ctx, CharSequence msg, List<Object> out) throws Exception {
            if (msg.length() == 0) {
                return;
            }
            // Encode the characters into a ByteBuf obtained from the channel's allocator
            out.add(ByteBufUtil.encodeString(ctx.alloc(), CharBuffer.wrap(msg), charset));
        }
    }

Common Netty decoder types

First-stage decoder: decodes a byte stream into message objects (ByteToMessageDecoder / ReplayingDecoder)

  • ByteToMessageDecoder

    public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter {
        protected abstract void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception;
        protected void decodeLast(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
            if (in.isReadable()) {
                // Only decode if there is something left in the buffer
                decodeRemovalReentryProtection(ctx, in, out);
            }
        }
    }

    decode() is an abstract method that the user must implement. It is called with the received data as a ByteBuf, together with a List to which decoded messages are added. Because of TCP sticky packets, the ByteBuf may contain several complete packets, or not even one. Netty therefore calls decode() repeatedly until no new complete packet is decoded into the List or the ByteBuf has no more readable data. If the List is not empty at that point, its contents are passed to the next ChannelInboundHandler in the ChannelPipeline.

    decodeLast() is called once after the Channel is closed, mainly to process the bytes left over in the ByteBuf. Netty's default decodeLast() essentially just calls decode() on the remaining data. If your business has special requirements, override decodeLast() to add custom logic.

    ReplayingDecoder is an abstract subclass of ByteToMessageDecoder. It manages the buffer for you, so you no longer need to check how many bytes are readable before reading. If there are not enough bytes, ReplayingDecoder stops decoding and retries once more data has arrived. It is slower than ByteToMessageDecoder, so it is not recommended in most cases.
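The repeated-decode loop described above can be illustrated without Netty. The sketch below is a simplified, hypothetical stand-in for ByteToMessageDecoder built on java.nio.ByteBuffer: CumulatingDecoder keeps leftover bytes between reads and keeps calling decode() until it neither produces a message nor consumes input, and IntLengthDecoder is an example subclass that reads 4-byte length-prefixed strings. Netty's real class has more rules (for example, it reports an error if decode() produces a message without reading anything).

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for ByteToMessageDecoder, using java.nio only
abstract class CumulatingDecoder {
    private ByteBuffer cumulation = ByteBuffer.allocate(0);

    // Consume the bytes of one complete frame, or consume nothing
    protected abstract void decode(ByteBuffer in, List<Object> out);

    // Called for every chunk read from the socket; returns the decoded messages
    List<Object> channelRead(byte[] chunk) {
        // Append the new chunk to the bytes left over from earlier reads
        ByteBuffer merged = ByteBuffer.allocate(cumulation.remaining() + chunk.length);
        merged.put(cumulation).put(chunk);
        merged.flip();
        cumulation = merged;

        List<Object> out = new ArrayList<>();
        while (cumulation.hasRemaining()) {
            int before = out.size();
            int readable = cumulation.remaining();
            decode(cumulation, out);
            // Stop once decode() neither produced a message nor consumed input
            if (out.size() == before && cumulation.remaining() == readable) {
                break;
            }
        }
        return out;
    }
}

class IntLengthDecoder extends CumulatingDecoder {
    @Override
    protected void decode(ByteBuffer in, List<Object> out) {
        if (in.remaining() < 4) {
            return; // length field not complete yet
        }
        int length = in.getInt(in.position()); // peek without moving the position
        if (in.remaining() < 4 + length) {
            return; // body not complete yet
        }
        in.getInt();
        byte[] body = new byte[length];
        in.get(body);
        out.add(new String(body, StandardCharsets.UTF_8));
    }
}
```

Feeding two frames in three arbitrary chunks of 3, 6 and 4 bytes yields nothing, then "ab", then "cde", regardless of where TCP split the data.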

Second-stage decoder: converts one message type into another message type (MessageToMessageDecoder)

  • MessageToMessageDecoder

    MessageToMessageDecoder does not buffer data; it is mainly used to transform message models. The recommended approach is to use a ByteToMessageDecoder to parse the TCP byte stream and solve unpacking/sticky packets, then pass the resulting valid ByteBuf data to a subsequent MessageToMessageDecoder that converts it into data objects.

Common codecs

Fixed-length decoder FixedLengthFrameDecoder

The constructor takes the fixed frame length, frameLength, and the decoder splits the incoming bytes strictly into frames of frameLength bytes.

Delimiter decoder DelimiterBasedFrameDecoder

delimiters specifies the special delimiters, passed as a ByteBuf array. Several delimiters can be given at once; the decoder then splits at the delimiter that produces the shortest frame. If the delimiters are exactly the line delimiters (\n and \r\n), DelimiterBasedFrameDecoder delegates to LineBasedFrameDecoder for parsing.

maxLength is the maximum length of a frame. If maxLength is exceeded before the delimiter is found, TooLongFrameException is thrown.

failFast controls when TooLongFrameException is thrown. If true, it is thrown as soon as maxLength is exceeded, without decoding further. If false, it is thrown only after the entire over-long frame has been read.

stripDelimiter determines whether the delimiter is removed from decoded messages.
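The three options can be seen in a small string-based sketch. SimpleDelimiterDecoder is hypothetical; its parameter order loosely follows Netty's DelimiterBasedFrameDecoder(maxFrameLength, stripDelimiter, failFast, delimiter) without the failFast flag. IllegalStateException stands in for TooLongFrameException, and the sketch always fails as soon as the limit is exceeded, which corresponds to failFast = true. Unlike the real decoder, it does not keep the leftover bytes after the last delimiter.

```java
import java.util.ArrayList;
import java.util.List;

class SimpleDelimiterDecoder {
    private final int maxLength;
    private final boolean stripDelimiter;
    private final String delimiter;

    SimpleDelimiterDecoder(int maxLength, boolean stripDelimiter, String delimiter) {
        this.maxLength = maxLength;
        this.stripDelimiter = stripDelimiter;
        this.delimiter = delimiter;
    }

    // Returns the complete frames found in the stream
    List<String> decode(String stream) {
        List<String> frames = new ArrayList<>();
        int start = 0;
        while (true) {
            int idx = stream.indexOf(delimiter, start);
            // Frame length, or length of the undelimited tail if no delimiter was found
            int length = (idx < 0 ? stream.length() : idx) - start;
            if (length > maxLength) {
                // Fail fast: stand-in for TooLongFrameException
                throw new IllegalStateException("frame length " + length + " exceeds maxLength " + maxLength);
            }
            if (idx < 0) {
                break; // incomplete frame: wait for more data
            }
            int end = stripDelimiter ? idx : idx + delimiter.length();
            frames.add(stream.substring(start, end));
            start = idx + delimiter.length();
        }
        return frames;
    }
}
```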

Length field decoder LengthFieldBasedFrameDecoder

RocketMQ, for example, decodes its messages with LengthFieldBasedFrameDecoder. The decoder's attributes fall into two groups: those specific to the length field, and those similar to the other decoders.

  • Length-field-specific attributes
// The offset of the length field, which is where the length data is stored
private final int lengthFieldOffset; 
// The number of bytes occupied by the length field
private final int lengthFieldLength; 
// In more complex protocols the frame may also contain other data, such as a version number,
// data type or status, so the length field value may not equal the number of bytes that follow it.
// lengthAdjustment = (number of bytes that follow the length field) - (value of the length field)
private final int lengthAdjustment; 
// The initial number of bytes to skip after decoding, which is the starting position of the message content field
private final int initialBytesToStrip;
// lengthFieldEndOffset = lengthFieldOffset + lengthFieldLength, i.e. the offset right after the length field
private final int lengthFieldEndOffset;
  • Attributes similar to the other decoders
private final int maxFrameLength; // Maximum length of a packet
private final boolean failFast; // Whether to throw TooLongFrameException as soon as maxFrameLength is exceeded
private boolean discardingTooLongFrame; // Whether the decoder is in discard mode
private long tooLongFrameLength; // Length of the over-long frame being discarded
private long bytesToDiscard; // Number of bytes that still have to be discarded
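How these attributes combine can be sketched in plain Java. The frame length is computed as length field value + lengthAdjustment + lengthFieldEndOffset, after which initialBytesToStrip bytes are skipped, which is the arithmetic LengthFieldBasedFrameDecoder uses. LengthFieldFrameSketch is a hypothetical class: it supports only big-endian 1-, 2- or 4-byte length fields and omits maxFrameLength and discard mode.

```java
import java.nio.ByteBuffer;

class LengthFieldFrameSketch {
    final int lengthFieldOffset;
    final int lengthFieldLength; // 1, 2 or 4 in this sketch
    final int lengthAdjustment;
    final int initialBytesToStrip;
    final int lengthFieldEndOffset;

    LengthFieldFrameSketch(int lengthFieldOffset, int lengthFieldLength,
                           int lengthAdjustment, int initialBytesToStrip) {
        this.lengthFieldOffset = lengthFieldOffset;
        this.lengthFieldLength = lengthFieldLength;
        this.lengthAdjustment = lengthAdjustment;
        this.initialBytesToStrip = initialBytesToStrip;
        this.lengthFieldEndOffset = lengthFieldOffset + lengthFieldLength;
    }

    // Returns one decoded frame, or null if the buffer does not yet hold a complete frame
    byte[] decode(ByteBuffer in) {
        if (in.remaining() < lengthFieldEndOffset) {
            return null;
        }
        int p = in.position() + lengthFieldOffset;
        long value;
        switch (lengthFieldLength) {
            case 1: value = in.get(p) & 0xFF; break;
            case 2: value = in.getShort(p) & 0xFFFF; break;
            case 4: value = in.getInt(p) & 0xFFFFFFFFL; break;
            default: throw new IllegalArgumentException("unsupported lengthFieldLength");
        }
        // Total frame length, counted from the start of the frame
        long frameLength = value + lengthAdjustment + lengthFieldEndOffset;
        if (in.remaining() < frameLength) {
            return null; // wait for more data
        }
        in.position(in.position() + initialBytesToStrip);
        byte[] frame = new byte[(int) frameLength - initialBytesToStrip];
        in.get(frame);
        return frame;
    }
}
```

With a 2-byte length field holding 12 followed by the 12-byte body "HELLO, WORLD", the parameters (0, 2, 0, 2) return just the body. If the length value instead counts the header too (14), lengthAdjustment = -2 compensates, since only 12 bytes actually follow the length field.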

writeAndFlush processing

DefaultChannelPipeline

@Override
public final ChannelFuture writeAndFlush(Object msg) {
    return tail.writeAndFlush(msg);
}

AbstractChannelHandlerContext

private void write(Object msg, boolean flush, ChannelPromise promise) {
    ObjectUtil.checkNotNull(msg, "msg");
    try {
        if (isNotValidPromise(promise, true)) {
            ReferenceCountUtil.release(msg);
            // cancelled
            return;
        }
    } catch (RuntimeException e) {
        ReferenceCountUtil.release(msg);
        throw e;
    }
    // Find the next ChannelHandler node of type Outbound in the Pipeline list
    final AbstractChannelHandlerContext next = findContextOutbound(flush ?
            (MASK_WRITE | MASK_FLUSH) : MASK_WRITE);
    final Object m = pipeline.touch(msg, next);
    EventExecutor executor = next.executor();
    // Is the current thread the EventLoop thread of the next handler?
    if (executor.inEventLoop()) {
        if (flush) {
            next.invokeWriteAndFlush(m, promise);
        } else {
            next.invokeWrite(m, promise);
        }
    } else {
        final WriteTask task = WriteTask.newInstance(next, m, promise, flush);
        if (!safeExecute(executor, task, promise, m, !flush)) {
            // We failed to submit the WriteTask. We need to cancel it so we decrement the pending bytes
            // and put it back in the Recycler for re-use later.
            //
            // See https://github.com/netty/netty/issues/8343.
            task.cancel();
        }
    }
}

By default, AbstractChannelHandlerContext initializes a ChannelPromise for the asynchronous operation; the ChannelPromise holds references to the current Channel and EventLoop.

  • findContextOutbound finds the next Outbound ChannelHandler in the Pipeline list. In our simulated scenario, the next Outbound node is ResponseSampleEncoder.
  • inEventLoop checks the current thread. If it is the thread that the EventLoop assigned to the current Channel, the operation is executed immediately. Otherwise, the operation is wrapped as a task (WriteTask) and placed in the EventLoop's task queue for later execution.
  • next.invokeWriteAndFlush(m, promise) eventually calls the write method of the next ChannelHandler node, which goes back through AbstractChannelHandlerContext's write method to find the next Outbound node, and so on.
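The inEventLoop check can be imitated with a single-threaded executor. MiniEventLoop is a hypothetical stand-in, not Netty's EventLoop: a task submitted from another thread is queued, while a task submitted from the loop thread itself runs immediately, mirroring the two branches of write() above.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical, simplified stand-in for an EventLoop
class MiniEventLoop {
    private volatile Thread loopThread;
    private final ExecutorService executor = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, "mini-event-loop");
        loopThread = t; // remember which thread owns this loop
        return t;
    });

    boolean inEventLoop() {
        return Thread.currentThread() == loopThread;
    }

    // Run immediately when already on the loop thread, otherwise queue the task
    void execute(Runnable task) {
        if (inEventLoop()) {
            task.run();
        } else {
            executor.execute(task);
        }
    }

    void shutdown() {
        executor.shutdown();
    }
}
```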

Head node: writing to the buffer queue

// HeadContext # write
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
    unsafe.write(msg, promise);
}

// AbstractChannel.AbstractUnsafe # write
@Override
public final void write(Object msg, ChannelPromise promise) {
    assertEventLoop();

    ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
    if (outboundBuffer == null) {
        try {
            // release message now to prevent resource-leak
            ReferenceCountUtil.release(msg);
        } finally {
            // If the outboundBuffer is null we know the channel was closed and so
            // need to fail the future right away. If it is not null the handling of the rest
            // will be done in flush0()
            // See https://github.com/netty/netty/issues/2362
            safeSetFailure(promise,
                           newClosedChannelException(initialCloseCause, "write(Object, ChannelPromise)"));
        }
        return;
    }

    int size;
    try {
        msg = filterOutboundMessage(msg);// Filter messages
        size = pipeline.estimatorHandle().size(msg);
        if (size < 0) {
            size = 0;
        }
    } catch (Throwable t) {
        try {
            ReferenceCountUtil.release(msg);
        } finally {
            safeSetFailure(promise, t);
        }
        return;
    }

    outboundBuffer.addMessage(msg, size, promise);// Add data to Buffer
}
  • filterOutboundMessage filters the msg being written and, if msg is not a DirectByteBuf, converts it into one.
  • ChannelOutboundBuffer can be understood as a cache structure. The last line of the source, outboundBuffer.addMessage(...), shows that the data is added to this cache, so ChannelOutboundBuffer is the key to understanding how data is sent.
public void addMessage(Object msg, int size, ChannelPromise promise) {
    Entry entry = Entry.newInstance(msg, size, total(msg), promise);
    if (tailEntry == null) {
        flushedEntry = null;
    } else {
        Entry tail = tailEntry;
        tail.next = entry;
    }
    tailEntry = entry;
    if (unflushedEntry == null) {
        unflushedEntry = entry;
    }
    incrementPendingOutboundBytes(entry.pendingSize, false);
}

The ChannelOutboundBuffer cache is a linked list in which each piece of written data is wrapped in an Entry object and appended to the list. ChannelOutboundBuffer keeps three important pointers: flushedEntry, the first entry that has been flushed (marked for writing to the socket); unflushedEntry, the first entry that has been added but not yet flushed; and tailEntry, the last entry in the list.
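The pointer movements of addMessage and addFlush (the latter shown later in this article) can be modeled with a tiny linked list. MiniOutboundBuffer is a hypothetical simplification of ChannelOutboundBuffer: it stores strings instead of ByteBufs and ignores promises and byte counts, and removeCurrent() is modeled on how a fully written entry is removed from the list.

```java
// Hypothetical simplification of ChannelOutboundBuffer's entry list
class MiniOutboundBuffer {
    static final class Entry {
        final String msg;
        Entry next;

        Entry(String msg) {
            this.msg = msg;
        }
    }

    Entry flushedEntry;   // first flushed entry, ready to be written to the socket
    Entry unflushedEntry; // first entry added but not yet flushed
    Entry tailEntry;      // last entry in the list
    int flushed;          // number of flushed entries not yet written

    void addMessage(String msg) {
        Entry entry = new Entry(msg);
        if (tailEntry == null) {
            flushedEntry = null;
        } else {
            tailEntry.next = entry;
        }
        tailEntry = entry;
        if (unflushedEntry == null) {
            unflushedEntry = entry;
        }
    }

    // Mark every unflushed entry as flushed
    void addFlush() {
        Entry entry = unflushedEntry;
        if (entry == null) {
            return;
        }
        if (flushedEntry == null) {
            flushedEntry = entry;
        }
        for (; entry != null; entry = entry.next) {
            flushed++;
        }
        unflushedEntry = null;
    }

    // Simulate writing the current flushed entry to the socket and removing it
    String removeCurrent() {
        Entry e = flushedEntry;
        if (e == null) {
            return null;
        }
        if (--flushed == 0) {
            flushedEntry = null;
            if (e == tailEntry) {
                tailEntry = null;
                unflushedEntry = null;
            }
        } else {
            flushedEntry = e.next;
        }
        return e.msg;
    }
}
```

Writing "a" and "b", flushing, and then writing "c" leaves flushedEntry at "a" and unflushedEntry at "c": only "a" and "b" can be removed until the next addFlush().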

After each write, addMessage calls incrementPendingOutboundBytes to check the buffer's water level.

private static final int DEFAULT_LOW_WATER_MARK = 32 * 1024;
private static final int DEFAULT_HIGH_WATER_MARK = 64 * 1024;
private void incrementPendingOutboundBytes(long size, boolean invokeLater) {
    if (size == 0) {
        return;
    }

    long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, size);
    // Determine whether the cache size exceeds the high watermark
    if (newWriteBufferSize > channel.config().getWriteBufferHighWaterMark()) {
        setUnwritable(invokeLater);
    }
}

The logic of incrementPendingOutboundBytes is simple: every time data is added, its byte count is accumulated, and the total is compared with the high water mark, 64 KB by default. If it exceeds the high water mark, the Channel is marked as unwritable. The Channel becomes writable again only after the cached data falls below the low water mark, 32 KB by default.
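The two-threshold (hysteresis) behavior can be modeled in a few lines. WaterMarkTracker is hypothetical; it uses the 32 KB / 64 KB defaults quoted above and the same comparisons: unwritable once pending bytes exceed the high mark, writable again only once they drop below the low mark.

```java
class WaterMarkTracker {
    static final int LOW_WATER_MARK = 32 * 1024;
    static final int HIGH_WATER_MARK = 64 * 1024;

    private long pendingBytes;
    private boolean writable = true;

    // Data added to the outbound buffer
    void increment(long size) {
        pendingBytes += size;
        if (pendingBytes > HIGH_WATER_MARK) {
            writable = false;
        }
    }

    // Data written to the socket (or cancelled)
    void decrement(long size) {
        pendingBytes -= size;
        if (pendingBytes < LOW_WATER_MARK) {
            writable = true;
        }
    }

    boolean isWritable() {
        return writable;
    }
}
```

Between the two marks the flag keeps its previous value, so a Channel does not flip between writable and unwritable on every small write. In application code the flag surfaces as Channel#isWritable() and the channelWritabilityChanged() callback, and in Netty 4.1 the thresholds can be tuned with ChannelOption.WRITE_BUFFER_WATER_MARK and a WriteBufferWaterMark(low, high).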

Flushing the buffer queue

After the write operation completes, flush is triggered through invokeFlush0. Like write, flush propagates from the Tail node to the Head node.

// HeadContext # flush
@Override
public void flush(ChannelHandlerContext ctx) {
    unsafe.flush();
}
// AbstractChannel # flush
@Override
public final void flush() {
    assertEventLoop();
    ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
    if (outboundBuffer == null) {
        return;
    }
    outboundBuffer.addFlush();
    flush0();
}

// ChannelOutboundBuffer # addFlush
public void addFlush() {
    Entry entry = unflushedEntry;
    if (entry != null) {
        if (flushedEntry == null) {
            flushedEntry = entry;
        }
        do {
            flushed ++;
            if (!entry.promise.setUncancellable()) {
                // The promise was already cancelled: release the entry
                int pending = entry.cancel();
                // Subtract its bytes from the pending total. If the total falls below the low water mark, the Channel becomes writable
                decrementPendingOutboundBytes(pending, false, true);
            }
            entry = entry.next;
        } while (entry != null);
        unflushedEntry = null;
    }
}

// AbstractNioUnsafe # flush0
@Override
protected final void flush0() {
    // Skip if OP_WRITE is already registered; the selector will flush once the socket is writable
    if (!isFlushPending()) {
        super.flush0();
    }
}

// AbstractNioByteChannel # doWrite
@Override
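protected void doWrite(ChannelOutboundBuffer in) throws Exception {
    // Upper bound on write attempts per flush, so one Channel cannot monopolize the EventLoop (16 by default)
    int writeSpinCount = config().getWriteSpinCount();
    do {
        Object msg = in.current(); // the message at flushedEntry
        if (msg == null) {
            // Everything has been written: stop listening for OP_WRITE
            clearOpWrite();
            return;
        }
        writeSpinCount -= doWriteInternal(in, msg);
    } while (writeSpinCount > 0);
    // Data remains: register OP_WRITE if the socket send buffer was full (writeSpinCount < 0), otherwise schedule another flush
    incompleteWrite(writeSpinCount < 0);
}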