Netty source code analysis – coding/decoding

preface

This article takes a look at Netty encoding and decoding of the working principle. A codec is a special kind of Handler. Since it is a Handler, there is a difference between inBound and outBound.

This article is mainly to sort out, from the inbound decoding to business processing, and then to the outbound coding process, so that we have an understanding of the process of encoding and decoding.

Decoder

We randomly choose a Decoder to see the implementation can be, relatively simple. This article is simpler and lighter than the previous ones.

Choose ByteToMessageDecoder see, first of all, the Decoder inherited from ChannelInboundHandlerAdapter, this class is actually a InBoundHandler, processing is actually read event.

So let’s see how this Decoder converts the data into the class object we want. Take a look at the channelRead method:

If (MSG instanceof ByteBuf) {// This is a data structure inherited from ArrayList CodecOutputList out = CodecOutputList.newInstance(); try { ByteBuf data = (ByteBuf) msg; first = cumulation == null; Cumulation = data; cumulation = data; cumulation = data; Cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data); cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data); } // True decode part callDecode(CTX, cumulation, out); } catch (DecoderException e) { throw e; } catch (Exception e) { throw new DecoderException(e); } finally { if (cumulation ! = null && ! cumulation.isReadable()) { numReads = 0; cumulation.release(); cumulation = null; } else if (++ numReads >= discardAfterReads) {// channelRead (cumulation); Avoid OOM numReads = 0; discardSomeReadBytes(); } int size = out.size(); decodeWasNull = ! out.insertSinceRecycled(); FireChannelRead (CTX, out, size); out.recycle(); } } else { ctx.fireChannelRead(msg); }Copy the code

Here’s the steps to take apart. First, ByteToMessageDecoder can only handle ByteBuf. Then create a CodecOutputList data structure. The structure here is really just an ArrayList, but all the methods in it have been rewritten. The difference is not huge, but this class has a getUnsafe method and the index passed in is not checked. The data structure here is mainly for performance.

Then put the data into cumulation. This is actually a ByteBuf that holds all the data that hasn’t been decoded yet.

And then the actual decoding. Note the code in Finally that avoids OOM. This is a scenario. If the cumulation has been read for more than 16 times and the cumulation has not been read, a memory leak is believed to have occurred. For example, if I have a Decoder that inherits a ByteToMessageDecoder, but decode method I don’t read data from ByteBuf, but write data directly to out, then the cumulation will get bigger and bigger. Netty will discard this data to avoid OOM production in this case (execute discardSomeReadBytes).

Let’s take a look at the callDecode method:

While (in.isreadable ()) {// The first time out is new, there is no data int outSize = out.size(); if (outSize > 0) { fireChannelRead(ctx, out, outSize); out.clear(); if (ctx.isRemoved()) { break; } outSize = 0; } int oldInputLength = in.readableBytes(); / / call the decode method, be decoded data into out decodeRemovalReentryProtection (CTX, in, out); If (ctx.isremoved ()) {break; } if (outSize == out.size()) { if (oldInputLength == in.readableBytes()) { break; } else { continue; }} if (oldInputLength == in.readableBytes()) {throw new DecoderException("...") ); } if (isSingleDecode()) { break; }}Copy the code

If there is already data in out, it will trigger channelRead to pass the data back:

static void fireChannelRead(ChannelHandlerContext ctx, List<Object> msgs, Int numElements) {if (MSGS instanceof CodecOutputList) { (CodecOutputList) msgs, numElements); } else { for (int i = 0; i < numElements; i++) { ctx.fireChannelRead(msgs.get(i)); Static void fireChannelRead(ChannelHandlerContext CTX, CodecOutputList MSGS, int numElements) { for (int i = 0; i < numElements; i ++) { ctx.fireChannelRead(msgs.getUnsafe(i)); }}Copy the code

Note that every data in out is passed backwards. Note here the second fireChannelRead using CodecOutputList getUnsafe, the Unsafe method into the parameter is an array subscript, like ArrayList, but will not check the legitimacy of the subscript, mainly in order to improve performance.

Notice that in the Finally block of channelRead, we fire a fireChannelRead anyway.

Encoder

MessageToByteEncoder, which is actually an outBoundHandler. Look at the write method:

ByteBuf buf = null; Try {if (acceptOutboundMessage(MSG)) {// Whether the outbound MSG can be processed by encoder. I cast = (I) msg; buf = allocateBuffer(ctx, cast, preferDirect); Try {// encode encode(CTX, cast, buf); } finally {/ / code before the release after MSG ReferenceCountUtil. Release (cast). } if (buf.isreadable ()) {// pass ctx.write(buf, promise); } else {// If unreadable, pass a buffer buf.release() backwards; ctx.write(Unpooled.EMPTY_BUFFER, promise); } buf = null; } else {// cannot handle direct backpass ctx.write(MSG, promise); } } catch (EncoderException e) { throw e; } catch (Throwable e) { throw new EncoderException(e); } finally {// If (buf! = null) { buf.release(); }}Copy the code

This is a little bit easier. There are not a lot of details, look at the above basic can understand, I will not elaborate.