Just as many standard architectural patterns are supported by dedicated frameworks, common data-processing patterns are often good candidates for targeted implementations, which can save developers considerable time and effort. This certainly applies to the topic of this article: encoding and decoding, or the conversion of data from one protocol-specific format to another. These tasks are handled by components commonly called codecs. Netty provides components that simplify the creation of custom codecs for a wide range of protocols. For example, if you’re building a Netty-based mail server, you’ll find Netty’s codec support invaluable for implementing the POP3, IMAP, and SMTP protocols.

0 What is a codec

Every network application has to define:

  • How the raw bytes transferred back and forth between two nodes are parsed
  • How they are converted to and from the target application’s data format

This conversion logic is handled by a codec, which consists of an encoder and a decoder, each of which transforms a stream of bytes from one format to another.

So what distinguishes them? Think of a message as a structured sequence of bytes that has a specific meaning for a particular application: its data. The encoder converts that message into a format suitable for transmission (most likely a byte stream); the decoder converts the network byte stream back into the application’s message format. Thus, the encoder operates on outbound data, while the decoder processes inbound data. With this background in mind, let’s examine the classes Netty provides for implementing these two components.
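The two directions can be sketched with the plain JDK. This is only an illustration: the class name TextCodec and its wire format (a 4-byte length prefix followed by UTF-8 text) are assumptions made for the example, not anything Netty prescribes.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/** Hypothetical codec: a 4-byte length prefix followed by UTF-8 text. */
final class TextCodec {
    /** Encoder: outbound direction, application message to bytes. */
    static byte[] encode(String message) {
        byte[] body = message.getBytes(StandardCharsets.UTF_8);
        ByteBuffer buf = ByteBuffer.allocate(4 + body.length);
        buf.putInt(body.length);   // length prefix
        buf.put(body);             // payload
        return buf.array();
    }

    /** Decoder: inbound direction, bytes back to the application message. */
    static String decode(byte[] wire) {
        ByteBuffer buf = ByteBuffer.wrap(wire);
        int length = buf.getInt();
        byte[] body = new byte[length];
        buf.get(body);
        return new String(body, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] wire = encode("hello");
        System.out.println(wire.length);   // 9: 4 bytes of prefix + 5 bytes of text
        System.out.println(decode(wire));  // hello
    }
}
```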

1 Overview of Netty decoding

1.1 Two Questions


In this section, we’ll examine the decoder classes provided by Netty, which cover two different use cases:

  • Decode bytes into messages – ByteToMessageDecoder and ReplayingDecoder
  • Decode one message type into another – MessageToMessageDecoder

Since a decoder is responsible for converting inbound data from one format to another, it should not surprise you that Netty’s decoders implement ChannelInboundHandler. When is a decoder used? Simply put, whenever inbound data needs to be transformed for the next ChannelInboundHandler in the ChannelPipeline. In addition, thanks to the design of ChannelPipeline, multiple decoders can be chained together to implement arbitrarily complex conversion logic. This is a good example of how Netty supports modular, reusable code.
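The idea of chaining decoders can be modelled without Netty. In this sketch the class DecoderChain and its two stages are hypothetical; a real pipeline would add ByteToMessageDecoder and MessageToMessageDecoder subclasses to a ChannelPipeline instead of composing functions.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

/** Hypothetical two-stage chain: bytes to Integer, then Integer to String. */
final class DecoderChain {
    /** Stage 1, analogous to a bytes-to-message decoder. */
    static List<Integer> bytesToInts(ByteBuffer in) {
        List<Integer> out = new ArrayList<>();
        while (in.remaining() >= 4) {
            out.add(in.getInt());
        }
        return out;
    }

    /** Stage 2, analogous to a message-to-message decoder. */
    static List<String> intsToStrings(List<Integer> in) {
        List<String> out = new ArrayList<>();
        for (Integer value : in) {
            out.add(String.valueOf(value));
        }
        return out;
    }

    /** Links the stages so the output of the first feeds the second. */
    static List<String> decode(ByteBuffer in) {
        Function<ByteBuffer, List<Integer>> first = DecoderChain::bytesToInts;
        Function<ByteBuffer, List<String>> pipeline = first.andThen(DecoderChain::intsToStrings);
        return pipeline.apply(in);
    }

    /** Sample input holding the ints 1 and 42. */
    static ByteBuffer sample() {
        ByteBuffer buf = ByteBuffer.allocate(8);
        buf.putInt(1).putInt(42);
        buf.flip();
        return buf;
    }

    public static void main(String[] args) {
        System.out.println(decode(sample()));  // [1, 42]
    }
}
```

Each stage knows nothing about the other, which is the property that makes decoders reusable across pipelines.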



2 Abstract decoder ByteToMessageDecoder

2.1 Example

Decoding bytes into messages (or into another sequence of bytes) is such a common task that Netty provides an abstract base class for it: ByteToMessageDecoder. Since you cannot know whether the remote node will send a complete message all at once, this class buffers inbound data until it is ready to be processed.

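ByteToMessageDecoder API (table omitted)

As an example, suppose each int is read from the inbound ByteBuf and passed to the next ChannelInboundHandler in the ChannelPipeline. To decode such a byte stream, extend ByteToMessageDecoder.

[Figure omitted: ToIntegerDecoder]

The ToIntegerDecoder class extends ByteToMessageDecoder and reads each value from the inbound ByteBuf with readInt().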
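The decoding rule of such a ToIntegerDecoder can be sketched with the JDK alone. This is a model, not the Netty class: ToIntegerDecoderSketch is a hypothetical name, java.nio.ByteBuffer stands in for ByteBuf, and getInt() stands in for readInt(). A real implementation would override decode(ChannelHandlerContext, ByteBuf, List<Object>) in a subclass of ByteToMessageDecoder.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

/** JDK-only model of an int decoder. */
final class ToIntegerDecoderSketch {
    /**
     * Decodes one int if at least four bytes are readable.
     * Otherwise the buffer is left untouched so the bytes can wait for more data.
     */
    static void decode(ByteBuffer in, List<Object> out) {
        if (in.remaining() >= 4) {
            out.add(in.getInt());
        }
    }

    public static void main(String[] args) {
        // One complete int (7) followed by two bytes of an incomplete one.
        ByteBuffer in = ByteBuffer.wrap(new byte[] {0, 0, 0, 7, 0, 0});
        List<Object> out = new ArrayList<>();
        decode(in, out);   // consumes the first four bytes
        decode(in, out);   // only two bytes left: nothing is decoded
        System.out.println(out);             // [7]
        System.out.println(in.remaining());  // 2
    }
}
```

The readable-bytes check is the important part: without it, reading an int from a partially received message would fail.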


2.2 Source Code Parsing


Decoding steps

2.2.1 Cumulative byte stream

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        // Only ByteBuf is decoded; any other object is passed straight down the pipeline
        if (msg instanceof ByteBuf) {
            CodecOutputList out = CodecOutputList.newInstance();
            try {
                ByteBuf data = (ByteBuf) msg;
                first = cumulation == null;
                if (first) {
                    // The accumulator is empty: this is the first data read from the I/O stream
                    cumulation = data;
                } else {
                    // Otherwise append the newly read data to the accumulator
                    cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data);
                }
                // Call the subclass's decode method to parse the accumulated bytes
                callDecode(ctx, cumulation, out);
            } catch (DecoderException e) {
                throw e;
            } catch (Throwable t) {
                throw new DecoderException(t);
            } finally {
                if (cumulation != null && !cumulation.isReadable()) {
                    numReads = 0;
                    cumulation.release();
                    cumulation = null;
                } else if (++ numReads >= discardAfterReads) {
                    // We did enough reads already try to discard some bytes so we not risk to see a OOME.
                    // See https://github.com/netty/netty/issues/4275
                    numReads = 0;
                    discardSomeReadBytes();
                }

                int size = out.size();
                decodeWasNull = !out.insertSinceRecycled();
                fireChannelRead(ctx, out, size);
                out.recycle();
            }
        } else {
            ctx.fireChannelRead(msg);
        }
    }

The cumulator used here defaults to MERGE_CUMULATOR:

    public static final Cumulator MERGE_CUMULATOR = new Cumulator() {
        @Override
        public ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) {
            ByteBuf buffer;
            // If writing the new bytes after the current write pointer would exceed
            // the maximum capacity, the accumulator has to be expanded
            if (cumulation.writerIndex() > cumulation.maxCapacity() - in.readableBytes()
                    || cumulation.refCnt() > 1) {
                // Expand cumulation (by replace it) when either there is not more room in the buffer
                // or if the refCnt is greater then 1 which may happen when the user use slice().retain() or
                // duplicate().retain().
                //
                // See:
                // - https://github.com/netty/netty/issues/2327
                // - https://github.com/netty/netty/issues/1764
                buffer = expandCumulation(alloc, cumulation, in.readableBytes());
            } else {
                buffer = cumulation;
            }
            // Write the newly read data into the accumulator
            buffer.writeBytes(in);
            // Release the data object that was just read
            in.release();
            return buffer;
        }
    };
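The merge strategy can be modelled with a byte array and two indices. This is a simplified sketch, not Netty's code: MergeCumulatorSketch and its fields are hypothetical, and reference counting (the refCnt check and the release of the incoming buffer) is left out because plain arrays are garbage collected.

```java
import java.nio.charset.StandardCharsets;

/** JDK-only model of an accumulator that merges incoming fragments. */
final class MergeCumulatorSketch {
    private byte[] data = new byte[4];
    private int readerIndex;
    private int writerIndex;

    int readableBytes() {
        return writerIndex - readerIndex;
    }

    /** Appends a newly received fragment, expanding the accumulator when it does not fit. */
    void cumulate(byte[] in) {
        if (writerIndex + in.length > data.length) {
            // Analogous to expanding the cumulation: allocate a larger buffer
            // and copy over only the bytes that have not been read yet.
            int readable = readableBytes();
            byte[] bigger = new byte[Math.max(data.length * 2, readable + in.length)];
            System.arraycopy(data, readerIndex, bigger, 0, readable);
            data = bigger;
            readerIndex = 0;
            writerIndex = readable;
        }
        // Write the new fragment after the current write pointer.
        System.arraycopy(in, 0, data, writerIndex, in.length);
        writerIndex += in.length;
    }

    String readableAsString() {
        return new String(data, readerIndex, readableBytes(), StandardCharsets.US_ASCII);
    }

    public static void main(String[] args) {
        MergeCumulatorSketch cumulation = new MergeCumulatorSketch();
        cumulation.cumulate("AB".getBytes(StandardCharsets.US_ASCII));
        cumulation.cumulate("CDE".getBytes(StandardCharsets.US_ASCII));  // forces an expansion
        System.out.println(cumulation.readableAsString());  // ABCDE
    }
}
```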

2.2.2 Call the decode method of the subclass for parsing

Step into callDecode to view its source:

    protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
        try {
            // Keep looping as long as the accumulator has readable data
            while (in.isReadable()) {
                int outSize = out.size();

                // If the list already holds decoded objects, propagate them and clear the list
                if (outSize > 0) {
                    fireChannelRead(ctx, out, outSize);
                    out.clear();

                    // Check if this handler was removed before continuing with decoding.
                    // If it was removed, it is not safe to continue to operate on the buffer.
                    //
                    // See:
                    // - https://github.com/netty/netty/issues/4635
                    if (ctx.isRemoved()) {
                        break;
                    }
                    outSize = 0;
                }

                // Record the length of the current readable data
                int oldInputLength = in.readableBytes();
                decode(ctx, in, out);

                // Check if this handler was removed before continuing the loop.
                // If it was removed, it is not safe to continue to operate on the buffer.
                //
                // See https://github.com/netty/netty/issues/1664
                if (ctx.isRemoved()) {
                    break;
                }

                // Nothing new was decoded
                if (outSize == out.size()) {
                    if (oldInputLength == in.readableBytes()) {
                        // No bytes were consumed either: wait for more data
                        break;
                    } else {
                        // Bytes were consumed but no message is complete yet: keep decoding
                        continue;
                    }
                }

                // A message was decoded although no bytes were read: decode() is faulty
                if (oldInputLength == in.readableBytes()) {
                    throw new DecoderException(
                            StringUtil.simpleClassName(getClass()) +
                            ".decode() did not read anything but decoded a message.");
                }

                if (isSingleDecode()) {
                    break;
                }
            }
        } catch (DecoderException e) {
            throw e;
        } catch (Throwable cause) {
            throw new DecoderException(cause);
        }
    }
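The loop's termination rules are easier to see in a stripped-down model. In this sketch CallDecodeSketch is a hypothetical class, ByteBuffer replaces ByteBuf, the decoder is passed in as a BiConsumer, and decoded objects are collected in one list instead of being fired down a pipeline on every iteration; handler removal and single-decode mode are omitted.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;

/** JDK-only model of the decode loop. */
final class CallDecodeSketch {
    static List<Object> callDecode(ByteBuffer in, BiConsumer<ByteBuffer, List<Object>> decoder) {
        List<Object> out = new ArrayList<>();
        while (in.hasRemaining()) {
            int outSize = out.size();
            int oldInputLength = in.remaining();
            decoder.accept(in, out);

            if (outSize == out.size()) {
                if (oldInputLength == in.remaining()) {
                    break;      // nothing consumed, nothing produced: wait for more data
                }
                continue;       // bytes consumed but no message yet: try again
            }
            if (oldInputLength == in.remaining()) {
                // A message appeared without any bytes being read: the decoder is faulty.
                throw new IllegalStateException("decode() did not read anything but decoded a message.");
            }
        }
        return out;
    }

    public static void main(String[] args) {
        ByteBuffer in = ByteBuffer.allocate(10);
        in.putInt(1).putInt(2).put((byte) 9).put((byte) 9);
        in.flip();
        List<Object> out = callDecode(in, (buf, list) -> {
            if (buf.remaining() >= 4) {
                list.add(buf.getInt());
            }
        });
        System.out.println(out);             // [1, 2]
        System.out.println(in.remaining());  // 2
    }
}
```

The two leftover bytes stay in the buffer, which is what lets the accumulator combine them with the next read.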

2.2.3 Propagating the parsed ByteBuf

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (msg instanceof ByteBuf) {
            CodecOutputList out = CodecOutputList.newInstance();
            try {
                ByteBuf data = (ByteBuf) msg;
                first = cumulation == null;
                if (first) {
                    cumulation = data;
                } else {
                    cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data);
                }
                callDecode(ctx, cumulation, out);
            } catch (DecoderException e) {
                throw e;
            } catch (Throwable t) {
                throw new DecoderException(t);
            } finally {
                if (cumulation != null && !cumulation.isReadable()) {
                    numReads = 0;
                    cumulation.release();
                    cumulation = null;
                } else if (++ numReads >= discardAfterReads) {
                    // We did enough reads already try to discard some bytes so we not risk to see a OOME.
                    // See https://github.com/netty/netty/issues/4275
                    numReads = 0;
                    discardSomeReadBytes();
                }

                // Record the length of the current list
                int size = out.size();
                decodeWasNull = !out.insertSinceRecycled();
                // Propagate the decoded objects down the pipeline
                fireChannelRead(ctx, out, size);
                out.recycle();
            }
        } else {
            ctx.fireChannelRead(msg);
        }
    }


Reference counts in codecs

For encoders and decoders: once a message has been encoded or decoded, it is automatically released by a call to ReferenceCountUtil.release(message). If you need to keep a reference for later use, call ReferenceCountUtil.retain(message). This increments the reference count and so prevents the message from being released.

3 Analysis of the fixed-length decoder

    /**
     * A decoder that splits the received {@link ByteBuf}s by the fixed number
     * of bytes. For example, if you received the following four fragmented packets:
     * <pre>
     * +---+----+------+----+
     * | A | BC | DEFG | HI |
     * +---+----+------+----+
     * </pre>
     * A {@link FixedLengthFrameDecoder}{@code (3)} will decode them into the
     * following three packets with the fixed length:
     * <pre>
     * +-----+-----+-----+
     * | ABC | DEF | GHI |
     * +-----+-----+-----+
     * </pre>
     */
    public class FixedLengthFrameDecoder extends ByteToMessageDecoder {

        private final int frameLength;

        /**
         * Creates a new instance.
         *
         * @param frameLength the length of the frame
         */
        public FixedLengthFrameDecoder(int frameLength) {
            if (frameLength <= 0) {
                throw new IllegalArgumentException(
                        "frameLength must be a positive integer: " + frameLength);
            }
            this.frameLength = frameLength;
        }

        @Override
        protected final void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
            Object decoded = decode(ctx, in);
            if (decoded != null) {
                out.add(decoded);
            }
        }

        /**
         * Create a frame out of the {@link ByteBuf} and return it.
         *
         * @param   ctx             the {@link ChannelHandlerContext} which this {@link ByteToMessageDecoder} belongs to
         * @param   in              the {@link ByteBuf} from which to read data
         * @return  frame           the {@link ByteBuf} which represent the frame or {@code null} if no frame could
         *                          be created.
         */
        protected Object decode(
                @SuppressWarnings("UnusedParameters") ChannelHandlerContext ctx, ByteBuf in) throws Exception {
            // Check whether the accumulator holds fewer bytes than frameLength
            if (in.readableBytes() < frameLength) {
                return null;
            } else {
                return in.readRetainedSlice(frameLength);
            }
        }
    }
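The example in the Javadoc (fragments A, BC, DEFG, HI with a frame length of 3) can be replayed with a small JDK-only model. FixedLengthSketch is a hypothetical class that combines the accumulation step and the fixed-length rule; unlike the real decoder it copies bytes rather than slicing them.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/** JDK-only model of fixed-length framing over fragmented input. */
final class FixedLengthSketch {
    private final int frameLength;
    private ByteBuffer cumulation = ByteBuffer.allocate(0);  // unread bytes, ready for reading

    FixedLengthSketch(int frameLength) {
        if (frameLength <= 0) {
            throw new IllegalArgumentException("frameLength must be a positive integer: " + frameLength);
        }
        this.frameLength = frameLength;
    }

    /** Feeds one received fragment and returns every complete frame now available. */
    List<String> feed(String fragment) {
        byte[] in = fragment.getBytes(StandardCharsets.US_ASCII);
        // Merge the unread bytes with the new fragment.
        ByteBuffer merged = ByteBuffer.allocate(cumulation.remaining() + in.length);
        merged.put(cumulation).put(in).flip();
        cumulation = merged;

        List<String> frames = new ArrayList<>();
        // Emit a frame only while at least frameLength bytes are readable.
        while (cumulation.remaining() >= frameLength) {
            byte[] frame = new byte[frameLength];
            cumulation.get(frame);
            frames.add(new String(frame, StandardCharsets.US_ASCII));
        }
        return frames;
    }

    public static void main(String[] args) {
        FixedLengthSketch decoder = new FixedLengthSketch(3);
        System.out.println(decoder.feed("A"));     // []
        System.out.println(decoder.feed("BC"));    // [ABC]
        System.out.println(decoder.feed("DEFG"));  // [DEF]
        System.out.println(decoder.feed("HI"));    // [GHI]
    }
}
```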

4 Analysis of the line decoder (LineBasedFrameDecoder)

Non-discard mode processing

4.1 Locating the End of a Line



4.2 Non-discard mode


4.2.1 Finding a newline character

4.2.2 No newline character is found



If the number of readable bytes exceeds the maximum length allowed for a line, the read pointer is moved to the write pointer (that is, the bytes are discarded), the decoder switches to discard mode, and an exception is propagated if fail-fast is enabled

4.3 Discard Mode

Find the newline character

  • Record the total number of bytes discarded (those already discarded plus those discarded this time)
  • Determine the newline type
  • Move the read pointer to just past the newline
  • Reset the discarded-byte count and return to non-discard mode
  • If fail-fast is not enabled, the failure is reported only now, after all the bytes have been discarded

A newline character cannot be found

Add the currently readable bytes to the discarded count (already discarded + currently readable), then move the read pointer directly to the write pointer
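The two modes can be sketched as a small state machine. This is a JDK-only model under stated assumptions: LineDecoderSketch is a hypothetical class, a StringBuilder stands in for the accumulated ByteBuf, and instead of throwing an exception the sketch records the length of the dropped line in a field so that the demo stays short.

```java
/** JDK-only model of line decoding with a discard mode. */
final class LineDecoderSketch {
    private final int maxLength;
    private final StringBuilder buffer = new StringBuilder();  // unread characters
    private boolean discarding;
    private int discardedBytes;
    int droppedLineLength = -1;  // length of the last oversized line, for inspection

    LineDecoderSketch(int maxLength) {
        this.maxLength = maxLength;
    }

    void feed(String data) {
        buffer.append(data);
    }

    /** Index of the line ending: points at '\r' for "\r\n", at '\n' otherwise, or -1. */
    private int findEndOfLine() {
        int i = buffer.indexOf("\n");
        if (i > 0 && buffer.charAt(i - 1) == '\r') {
            i--;
        }
        return i;
    }

    /** Returns one complete line, or null if none can be produced yet. */
    String decode() {
        int eol = findEndOfLine();
        if (!discarding) {
            if (eol >= 0) {
                int delimLength = buffer.charAt(eol) == '\r' ? 2 : 1;
                if (eol > maxLength) {
                    buffer.delete(0, eol + delimLength);  // drop the oversized line
                    droppedLineLength = eol;
                    return null;
                }
                String line = buffer.substring(0, eol);
                buffer.delete(0, eol + delimLength);
                return line;
            }
            if (buffer.length() > maxLength) {
                discardedBytes = buffer.length();
                buffer.setLength(0);  // read pointer jumps to the write pointer
                discarding = true;
            }
            return null;
        }
        if (eol >= 0) {
            int delimLength = buffer.charAt(eol) == '\r' ? 2 : 1;
            droppedLineLength = discardedBytes + eol;  // already discarded + discarded now
            buffer.delete(0, eol + delimLength);       // move just past the newline
            discardedBytes = 0;
            discarding = false;                        // back to non-discard mode
        } else {
            discardedBytes += buffer.length();
            buffer.setLength(0);
        }
        return null;
    }

    public static void main(String[] args) {
        LineDecoderSketch decoder = new LineDecoderSketch(5);
        decoder.feed("hi\n");
        System.out.println(decoder.decode());           // hi
        decoder.feed("toolongline");
        System.out.println(decoder.decode());           // null, discard mode entered
        decoder.feed("more\nok\n");
        System.out.println(decoder.decode());           // null, rest of the long line dropped
        System.out.println(decoder.droppedLineLength);  // 15
        System.out.println(decoder.decode());           // ok
    }
}
```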

5 Analysis of the delimiter-based decoder

  • The constructor takes a set of delimiters, which the decoder uses to split the binary stream into complete packets


  • Decode method


5.1 Analysis of decoding steps

5.1.1 Line decoder

  • Deciding whether to delegate to the line decoder


  • Where the line decoder is defined


  • Where it is initialized


  • How the delimiters are checked


5.1.2 Finding the nearest delimiter



Iterate over all the delimiters, compute the length of the packet that each delimiter would produce, and keep the delimiter that yields the shortest packet
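The search can be sketched on strings. NearestDelimiter is a hypothetical helper; the real decoder runs the same kind of loop over ByteBuf delimiters.

```java
/** JDK-only sketch: pick the delimiter that yields the shortest frame. */
final class NearestDelimiter {
    /**
     * Returns {frameLength, delimiterIndex} for the delimiter that occurs first
     * in the buffer, or null if none of the delimiters is present.
     */
    static int[] find(String buffer, String... delimiters) {
        int minFrameLength = Integer.MAX_VALUE;
        int minDelim = -1;
        for (int i = 0; i < delimiters.length; i++) {
            // The frame length is the number of characters before the delimiter.
            int frameLength = buffer.indexOf(delimiters[i]);
            if (frameLength >= 0 && frameLength < minFrameLength) {
                minFrameLength = frameLength;
                minDelim = i;
            }
        }
        return minDelim < 0 ? null : new int[] {minFrameLength, minDelim};
    }

    public static void main(String[] args) {
        int[] hit = find("ABC\nDEF\r\n", "\r\n", "\n");
        System.out.println(hit[0]);  // 3: the frame is "ABC"
        System.out.println(hit[1]);  // 1: the second delimiter ("\n") is nearest
    }
}
```

Choosing the shortest frame matters here: picking "\r\n" first would wrongly produce the frame "ABC\nDEF".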

5.1.3 decoding

5.1.3.1 Finding the delimiter


If the result is not null, a delimiter has been found. As before, the decoder then checks whether it is currently in discard mode


Non-discard mode

The discard flag is false on the first pass, so the decoder starts out in non-discard mode



If the current packet is longer than the maximum length allowed, the packet is skipped (discarded) together with the nearest delimiter.






Discard mode


5.1.3.2 No delimiter found


5.1.3.2.1 Non-discard mode


If the readable bytes exceed the maximum length allowed, their number is recorded as discarded, the bytes are skipped, and the decoder switches to discard mode

5.1.3.2.2 Discard Mode

6 Analysis of the length-field-based decoder’s parameters

The important parameters

  • maxFrameLength (maximum packet length)

    If a packet exceeds this length, Netty applies special handling to keep memory from being exhausted

  • lengthFieldOffset (offset of the length field)

    The offset of the length field; 0 means there is no offset. It tells the decoder where in the ByteBuf the length field starts

  • lengthFieldLength

    The length of the length field, in bytes

  • lengthAdjustment

    In some cases the header is counted in the length value, or the length field is followed by bytes that the length value does not count; lengthAdjustment corrects for this

  • initialBytesToStrip

    The number of leading bytes to strip from each decoded packet. The decoder splits the received ByteBuf dynamically by the value of the length field; this setting is useful when the handlers further down do not need the header
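How the five parameters interact can be sketched with the JDK. This is a simplified model, not Netty's implementation: LengthFieldSketch and helloPacket are hypothetical names, the length field is assumed to be big-endian, the buffer is assumed to start at a packet boundary, and an oversized packet simply raises an exception instead of triggering discard mode.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/** JDK-only model of length-field framing. */
final class LengthFieldSketch {
    /** Returns one packet, or null if the packet is still incomplete. */
    static byte[] extract(ByteBuffer in, int maxFrameLength, int lengthFieldOffset,
                          int lengthFieldLength, int lengthAdjustment, int initialBytesToStrip) {
        int lengthFieldEndOffset = lengthFieldOffset + lengthFieldLength;
        if (in.remaining() < lengthFieldEndOffset) {
            return null;  // the length field itself has not fully arrived
        }
        // Read the length field without moving the read position.
        long value = 0;
        for (int i = 0; i < lengthFieldLength; i++) {
            value = (value << 8) | (in.get(in.position() + lengthFieldOffset + i) & 0xFF);
        }
        // Whole packet = bytes up to the end of the length field + value + adjustment.
        long frameLength = value + lengthAdjustment + lengthFieldEndOffset;
        if (frameLength < lengthFieldEndOffset || frameLength > maxFrameLength) {
            throw new IllegalArgumentException("invalid frame length: " + frameLength);
        }
        if (initialBytesToStrip > frameLength) {
            throw new IllegalArgumentException("initialBytesToStrip exceeds the frame length");
        }
        if (in.remaining() < frameLength) {
            return null;  // wait for the rest of the packet
        }
        in.position(in.position() + initialBytesToStrip);
        byte[] frame = new byte[(int) frameLength - initialBytesToStrip];
        in.get(frame);
        return frame;
    }

    /** Sample packet: 2-byte length field (12) followed by a 12-byte body. */
    static ByteBuffer helloPacket() {
        ByteBuffer buf = ByteBuffer.allocate(14);
        buf.putShort((short) 12).put("HELLO, WORLD".getBytes(StandardCharsets.US_ASCII));
        buf.flip();
        return buf;
    }

    public static void main(String[] args) {
        // Keep the length field: the packet is 14 bytes.
        System.out.println(extract(helloPacket(), 1024, 0, 2, 0, 0).length);  // 14
        // Strip the 2-byte length field: only the body remains.
        byte[] body = extract(helloPacket(), 1024, 0, 2, 0, 2);
        System.out.println(new String(body, StandardCharsets.US_ASCII));      // HELLO, WORLD
    }
}
```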

6.1 Length Based

[Figures omitted: LengthFieldBasedFrameDecoder example]


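6.2 Based on length truncation

If the application-layer decoder does not need the length field, we want Netty to strip it during unpacking.

[Figure omitted]

This is done with initialBytesToStrip: the decoder skips that many leading bytes of each packet, so the application receives a packet with no length field.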

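6.3 Based on offset length

[Figure omitted]

Here the packet starts with fixed header bytes that carry meta information such as a magicNumber and the protocol version, and the length field comes after them. lengthFieldOffset is therefore set to the size of that header.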

6.4 Unpacking based on adjustable length

In some cases, a binary protocol is designed with the length field first and the header after it

[Figure omitted]

  • The length field is at the very front of the packet, so there is no offset: lengthFieldOffset is 0
  • The length field is 3 bytes long, so lengthFieldLength is 3
  • The value of the length field covers only the body, not the header, so another parameter is needed: lengthAdjustment, the correction applied to the packet length. The value of the length field plus this correction gives the length of the body together with its header. Here that is 12 + 2: header and body are 14 bytes in total

6.5 Truncation based on offset adjustable length

A binary protocol may also have two headers, HDR1 before the length field and HDR2 after it

[Figure omitted]

After unpacking, HDR1 and the length field are dropped, leaving only the second header and the body. HDR1 typically carries a magicNumber.

Parameter Settings

  • The offset of the length field is 1, so lengthFieldOffset is 1
  • The length field is 2 bytes long, so lengthFieldLength is 2
  • The value of the length field does not count HDR2, but Netty must extract HDR2 as part of the packet. HDR2 is 1 byte long, so lengthAdjustment is 1
  • After unpacking, the first three bytes are stripped, so initialBytesToStrip is 3

6.6 Truncation based on offset adjustable length (variant)

In all the preceding cases the length field gives the length of the body without any header. If instead the length field gives the length of the entire packet, the situation is as follows

[Figure omitted]

The value of the length field is 16: HDR1 (1 byte), HDR2 (1 byte), the length field (2 bytes), and the body (12 bytes) add up to 1 + 1 + 2 + 12 = 16

Parameter Settings

Because Netty knows nothing about the business protocol, it must be told how many bytes follow the length field to complete a packet. Here 13 bytes follow the length field, while the value of the length field is 16, so 3 must be subtracted to obtain the length actually needed for unpacking: lengthAdjustment is -3
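The arithmetic behind lengthAdjustment in sections 6.4 to 6.6 can be checked with a few lines of Java. The helper class LengthAdjustment is hypothetical; it only encodes the rule that the adjustment is the number of bytes actually following the length field minus the value stored in it.

```java
/** Worked check of the lengthAdjustment values used in sections 6.4 to 6.6. */
final class LengthAdjustment {
    /** Bytes that actually follow the length field, minus the value the length field holds. */
    static int of(int bytesAfterLengthField, int lengthFieldValue) {
        return bytesAfterLengthField - lengthFieldValue;
    }

    public static void main(String[] args) {
        // 6.4: a 2-byte header and a 12-byte body follow; the field holds 12.
        System.out.println(of(2 + 12, 12));  // 2
        // 6.5: the 1-byte HDR2 and a 12-byte body follow; the field holds 12.
        System.out.println(of(1 + 12, 12));  // 1
        // 6.6: the same 13 bytes follow, but the field holds the whole packet length, 16.
        System.out.println(of(1 + 12, 16));  // -3
    }
}
```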

If your protocol is length based, consider not implementing the unpacking byte by byte yourself. Use LengthFieldBasedFrameDecoder directly, or subclass it and make small modifications

7 Analysis of the length-field-based decoder

7.1 Constructor

    public LengthFieldBasedFrameDecoder(
            ByteOrder byteOrder, int maxFrameLength, int lengthFieldOffset, int lengthFieldLength,
            int lengthAdjustment, int initialBytesToStrip, boolean failFast) {
        // Parameter validation omitted
        this.byteOrder = byteOrder;
        this.maxFrameLength = maxFrameLength;
        this.lengthFieldOffset = lengthFieldOffset;
        this.lengthFieldLength = lengthFieldLength;
        this.lengthAdjustment = lengthAdjustment;
        lengthFieldEndOffset = lengthFieldOffset + lengthFieldLength;
        this.initialBytesToStrip = initialBytesToStrip;
        this.failFast = failFast;
    }

The constructor stores the parameters it receives in fields

  • byteOrder: whether the length field is read as big-endian or little-endian
  • lengthFieldEndOffset: the offset, within the whole packet, of the first byte immediately after the length field
  • failFast
    • If true, a TooLongFrameException is thrown as soon as the length field has been read and its value exceeds maxFrameLength
    • If false, the TooLongFrameException is thrown only after all the bytes indicated by the length field have actually been read. The default is true, and changing it is not recommended, since memory may otherwise be exhausted

7.2 The unpacking abstraction

A concrete unpacking protocol only needs to implement

    void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out)

Here in holds the data that has not yet been unpacked. Each packet that is unpacked is added to the out list, which is how packets are passed down the pipeline

  • First-level implementation


The overloaded protected decode method does the actual unpacking, in three steps

1 Calculate the length of the packet to be extracted

    protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
        // (discard-mode handling and the readable-bytes check are omitted from this excerpt)

        // Actual byte offset of the length field
        int actualLengthFieldOffset = in.readerIndex() + lengthFieldOffset;
        // Get the actual, unadjusted packet length
        long frameLength = getUnadjustedFrameLength(in, actualLengthFieldOffset, lengthFieldLength, byteOrder);

        // Adjust the packet length
        frameLength += lengthAdjustment + lengthFieldEndOffset;

        if (frameLength < lengthFieldEndOffset) {
            failOnFrameLengthLessThanLengthFieldEndOffset(in, frameLength, lengthFieldEndOffset);
        }

        if (frameLength > maxFrameLength) {
            exceededFrameLength(in, frameLength);
            return null;
        }

        // (skipping initialBytesToStrip and extracting the frame are omitted from this excerpt)
    }
  • Get the actual byte offset of the length field


  • Adjust the length of the packet


  • If the readable bytes do not yet reach the end offset of the length field, the length field cannot be read, so the decoder returns and waits for more data


The getUnadjustedFrameLength method can be overridden if the value in your length field is not a plain int, short, or similar primitive type
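What reading the raw length value involves can be sketched with java.nio. The class UnadjustedLength below is a hypothetical stand-in that reads an unsigned length field of 1, 2, 3, 4, or 8 bytes at a given offset and byte order; an override for an unusual encoding would replace this logic with its own.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

/** JDK-only sketch of reading a length field of a given width and byte order. */
final class UnadjustedLength {
    static long read(ByteBuffer buf, int offset, int length, ByteOrder order) {
        // Work on a duplicate so the caller's position and byte order are untouched.
        ByteBuffer view = buf.duplicate().order(order);
        switch (length) {
            case 1:
                return view.get(offset) & 0xFF;
            case 2:
                return view.getShort(offset) & 0xFFFF;
            case 3: {
                // There is no 3-byte primitive, so the value is assembled by hand.
                int b0 = view.get(offset) & 0xFF;
                int b1 = view.get(offset + 1) & 0xFF;
                int b2 = view.get(offset + 2) & 0xFF;
                return order == ByteOrder.BIG_ENDIAN
                        ? (b0 << 16) | (b1 << 8) | b2
                        : (b2 << 16) | (b1 << 8) | b0;
            }
            case 4:
                return view.getInt(offset) & 0xFFFFFFFFL;
            case 8:
                return view.getLong(offset);
            default:
                throw new IllegalArgumentException("unsupported lengthFieldLength: " + length);
        }
    }

    public static void main(String[] args) {
        ByteBuffer threeBytes = ByteBuffer.wrap(new byte[] {0x00, 0x00, 0x0C});
        System.out.println(read(threeBytes, 0, 3, ByteOrder.BIG_ENDIAN));   // 12

        ByteBuffer twoBytes = ByteBuffer.wrap(new byte[] {0x0C, 0x00});
        System.out.println(read(twoBytes, 0, 2, ByteOrder.LITTLE_ENDIAN));  // 12
        System.out.println(read(twoBytes, 0, 2, ByteOrder.BIG_ENDIAN));     // 3072
    }
}
```

The last two lines show why byteOrder is a constructor parameter: the same two bytes mean 12 or 3072 depending on it.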


Length checks

  • If the adjusted length of the whole packet is smaller than lengthFieldEndOffset, that is, the packet would end before its own length field does, an exception is thrown immediately

  • If the packet length exceeds maxFrameLength, the oversized packet is handled as follows

    • If the readable bytes already reach frameLength, frameLength bytes are skipped directly; the data after the discarded packet may be a valid packet
    • If the readable bytes have not yet reached frameLength, the bytes that have not arrived must be discarded too, so the decoder enters discard mode and first discards everything accumulated so far

bytesToDiscard records how many bytes remain to be discarded

  • Finally, failIfNecessary is called to decide whether an exception must be thrown
    • If no further bytes need to be discarded (bytesToDiscard == 0), the discard state is reset
      • If fail-fast is not set (!failFast), or fail-fast is set and this is the first detection of the oversized packet (firstDetectionOfTooLongFrame), an exception is thrown for the handlers to deal with
    • Otherwise, if fail-fast is set and this is the first detection of the oversized packet, an exception is thrown for the handlers to deal with


failFast defaults to true, and firstDetectionOfTooLongFrame is true here, so an exception is thrown the first time an oversized packet is detected

Handling discard mode

At its entry, the decode method of LengthFieldBasedFrameDecoder checks whether the decoder is in discard mode

  • If it is, the decoder first computes how many bytes to discard now, taking the minimum of the bytes still to be discarded and the readable bytes. After discarding them it calls failIfNecessary. If failFast is false, the exception is thrown only once all the bytes have been discarded
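The bookkeeping for an oversized packet can be modelled without any buffers at all. This sketch is a simplification under stated assumptions: TooLongFrameDiscarder is a hypothetical class, the methods take the number of readable bytes instead of a ByteBuf, and a counter named failures stands in for throwing TooLongFrameException.

```java
/** JDK-only model of discard-mode bookkeeping for an oversized packet. */
final class TooLongFrameDiscarder {
    private final boolean failFast;
    private boolean discarding;
    private long bytesToDiscard;
    int failures;  // how many times an exception would have been thrown

    TooLongFrameDiscarder(boolean failFast) {
        this.failFast = failFast;
    }

    boolean isDiscarding() {
        return discarding;
    }

    /** An oversized packet was detected while `readable` bytes are buffered. Returns the bytes skipped. */
    int exceededFrameLength(int readable, long frameLength) {
        long discard = frameLength - readable;
        int skipped;
        if (discard < 0) {
            skipped = (int) frameLength;  // the whole packet is already buffered: skip it
        } else {
            discarding = true;            // the rest has not arrived yet: enter discard mode
            bytesToDiscard = discard;
            skipped = readable;           // drop everything accumulated so far
        }
        failIfNecessary(true);
        return skipped;
    }

    /** Called at the start of decoding while in discard mode. Returns the bytes skipped. */
    int discard(int readable) {
        int localBytesToDiscard = (int) Math.min(bytesToDiscard, readable);
        bytesToDiscard -= localBytesToDiscard;
        failIfNecessary(false);
        return localBytesToDiscard;
    }

    private void failIfNecessary(boolean firstDetectionOfTooLongFrame) {
        if (bytesToDiscard == 0) {
            discarding = false;  // reset to the initial state
            if (!failFast || firstDetectionOfTooLongFrame) {
                failures++;
            }
        } else if (failFast && firstDetectionOfTooLongFrame) {
            failures++;
        }
    }

    public static void main(String[] args) {
        TooLongFrameDiscarder fast = new TooLongFrameDiscarder(true);
        System.out.println(fast.exceededFrameLength(30, 100));  // 30 skipped, 70 still to discard
        System.out.println(fast.failures);                      // 1: reported immediately
        System.out.println(fast.discard(50));                   // 50
        System.out.println(fast.discard(40));                   // 20: only what belongs to the packet
        System.out.println(fast.failures);                      // 1: not reported a second time
    }
}
```

With failFast set to false the same sequence reports the failure only on the last call, once the whole packet has been discarded.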

2 Skip the specified number of bytes

Once discard-mode handling and the length checks have passed

  • Check whether enough bytes for a complete packet have been read; if not, wait for more data. If so, skip initialBytesToStrip bytes. The number of skipped bytes must not exceed the packet length, otherwise a CorruptedFrameException is thrown

3 Extract the frame

  • Take the read pointer of the accumulated data and the actual length of the packet to extract, extract the packet, and then advance the read pointer


  • Extraction is a single call to ByteBuf’s retainedSlice API, which involves no memory copy


    As the extraction code shows, the length passed in is an int, so in a custom protocol with an 8-byte length field the upper 4 bytes are of little use
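The copy-free nature of slicing can be demonstrated with java.nio.ByteBuffer, whose slice() also returns a view over the same memory. SliceSketch is a hypothetical helper; unlike Netty's retainedSlice, the JDK view involves no reference counting.

```java
import java.nio.ByteBuffer;

/** JDK-only sketch of extracting a frame as a view instead of a copy. */
final class SliceSketch {
    /** Returns `length` bytes as a view sharing memory with `in`, then advances in's read position. */
    static ByteBuffer readSlice(ByteBuffer in, int length) {
        ByteBuffer frame = in.slice();  // view that starts at in.position()
        frame.limit(length);            // restrict the view to the frame
        in.position(in.position() + length);
        return frame;
    }

    public static void main(String[] args) {
        byte[] backing = {1, 2, 3, 4, 5};
        ByteBuffer in = ByteBuffer.wrap(backing);
        in.get();                              // pretend the first byte was consumed earlier
        ByteBuffer frame = readSlice(in, 3);   // covers the bytes 2, 3, 4
        System.out.println(frame.remaining()); // 3
        System.out.println(in.position());     // 4

        backing[1] = 9;                        // change the underlying memory
        System.out.println(frame.get(0));      // 9: the frame sees the change, so nothing was copied
    }
}
```

Because the frame and the accumulator share memory, the frame is only valid while that memory is; this is the reason Netty's version retains the buffer when slicing.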

Summary

  • If you use Netty and your binary protocol is length based, consider LengthFieldBasedFrameDecoder; adjusting its parameters should cover most needs
  • Unpacking in LengthFieldBasedFrameDecoder consists of parameter validation, handling of abnormal packets, and finally a call to ByteBuf’s retainedSlice, which unpacks without copying memory

8 Summary of decoder

8.1 ByteToMessageDecoder decoding steps

8.2 Length based decoder steps

8.3 Two Questions