Netty – Core codec

The core of working with Netty is understanding how ByteBuf is used. Business logic mostly consists of writing ChannelHandler implementations and various codecs.

1. ChannelInboundHandlerAdapter & ChannelOutboundHandlerAdapter: core implementation classes

Both classes extend io.netty.channel.ChannelHandlerAdapter. Their default implementations only pass events along and leave the real logic to the user, so they are essentially transparent. By overriding their methods we control all of our input and output handling, object release, and so on.

API methods

// Handler added to the pipeline (it may or may not be shared with other channels); each client connection gets its own channel
handlerAdded
// Each client registers only once
channelRegistered
// The channel becomes active
channelActive
// Receives client data
channelRead -> write/flush (out: write, out: flush) ...
channelReadComplete
// Disconnect ...
channelInactive
channelUnregistered
handlerRemoved

1. When exceptionCaught() and handlerRemoved() are triggered

  1. exceptionCaught(): fires when an exception reaches the pipeline, typically on an abnormal shutdown, for example when the client process is killed or the server is shut down manually
  2. handlerRemoved(): fires when the handler is removed from the pipeline, for example after ctx.close()

So the general scenarios are:

  1. When the server calls ctx.close(), the client should run its shutdown logic in handlerRemoved(). Likewise, when the server shuts down abnormally, exceptionCaught() fires on the client
  2. When the client calls ctx.close(), handlerRemoved() fires on the server. When the client shuts down abnormally, exceptionCaught() fires on the server

In short, exceptionCaught() signals an abnormal close and handlerRemoved() covers the normal close. Note that handlerRemoved() generally also runs after an abnormal close, because the pipeline is torn down whenever the channel is closed, so cleanup placed there runs in both cases.


2. ctx.writeAndFlush(msg) versus ctx.channel().writeAndFlush(msg)

  1. ChannelHandlerContext.writeAndFlush(msg);

    Suppose the pipeline was built with pipeline.addLast(out1, in, out2), and the in handler calls ChannelHandlerContext.writeAndFlush(msg). The outbound event starts at the current handler and travels toward the head of the pipeline, so it passes through out1 only: out1.write -> out1.flush

  2. ChannelHandlerContext.channel().writeAndFlush(msg);

    With the same pipeline, suppose the in handler calls ChannelHandlerContext.channel().writeAndFlush(msg) instead. The event now starts at the tail of the pipeline and passes through every outbound handler: out2.write -> out1.write -> out2.flush -> out1.flush
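The two routes can be illustrated with a small model that does not depend on Netty. This is only a sketch under stated assumptions, not Netty's implementation: PipelineModel and its methods are hypothetical names, and every outbound handler is assumed to simply forward write and flush toward the head.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Toy model of outbound routing in a pipeline; this is not Netty code. */
final class PipelineModel {
    private final List<String> names = new ArrayList<>();
    private final List<Boolean> outbound = new ArrayList<>();

    /** Appends a handler; outboundHandler marks whether it sees write/flush events. */
    PipelineModel addLast(String name, boolean outboundHandler) {
        names.add(name);
        outbound.add(outboundHandler);
        return this;
    }

    /** Models ctx.writeAndFlush(msg) called inside the handler named from. */
    List<String> writeAndFlushFromContext(String from) {
        int index = names.indexOf(from);
        if (index < 0) {
            throw new IllegalArgumentException("unknown handler: " + from);
        }
        return route(index - 1); // start just before the calling handler
    }

    /** Models ctx.channel().writeAndFlush(msg): the event starts at the tail. */
    List<String> writeAndFlushFromChannel() {
        return route(names.size() - 1);
    }

    /** Walks toward the head twice: once for write, once for flush. */
    private List<String> route(int start) {
        List<String> visited = new ArrayList<>();
        for (String event : Arrays.asList("write", "flush")) {
            for (int i = start; i >= 0; i--) {
                if (outbound.get(i)) {
                    visited.add(names.get(i) + "." + event);
                }
            }
        }
        return visited;
    }

    public static void main(String[] args) {
        PipelineModel pipeline = new PipelineModel()
                .addLast("out1", true)
                .addLast("in", false)
                .addLast("out2", true);
        System.out.println(pipeline.writeAndFlushFromContext("in")); // [out1.write, out1.flush]
        System.out.println(pipeline.writeAndFlushFromChannel());     // [out2.write, out1.write, out2.flush, out1.flush]
    }
}
```

The practical consequence: an encoder added after the calling handler is skipped by ctx.writeAndFlush(msg), so the choice between the two calls depends on where the outbound handlers sit in the pipeline.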

ctx.write(), ctx.flush(), and ctx.writeAndFlush()

write() queues the message in the channel's outbound buffer; flush() pushes everything queued so far out to the underlying transport.

ctx.write() invokes the write method of the outbound handlers, ctx.flush() invokes their flush method, and writeAndFlush() combines the two, executing both in sequence.
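The difference between queuing and sending can be sketched with a small stand-alone model. This is an illustration only: OutboundBufferModel is a hypothetical class, a plain queue stands in for the channel's outbound buffer, and a list stands in for the socket.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

/** Toy model of write vs. flush; this is not Netty code. */
final class OutboundBufferModel {
    private final Queue<String> pending = new ArrayDeque<>(); // written but not yet flushed
    private final List<String> wire = new ArrayList<>();      // what the peer would receive

    /** write: only queues the message. */
    void write(String msg) {
        pending.add(msg);
    }

    /** flush: sends everything queued so far, in order. */
    void flush() {
        while (!pending.isEmpty()) {
            wire.add(pending.poll());
        }
    }

    /** writeAndFlush: both steps in sequence. */
    void writeAndFlush(String msg) {
        write(msg);
        flush();
    }

    int pendingCount() {
        return pending.size();
    }

    List<String> sent() {
        return new ArrayList<>(wire);
    }

    public static void main(String[] args) {
        OutboundBufferModel channel = new OutboundBufferModel();
        channel.write("a");
        channel.write("b");
        System.out.println(channel.sent()); // [] : nothing has left the buffer yet
        channel.flush();
        System.out.println(channel.sent()); // [a, b]
    }
}
```

Batching several write() calls before a single flush() is the usual reason to call the two separately.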


2. MessageToByteEncoder and ByteToMessageDecoder

Both classes build on the two adapter classes described above to implement codecs, and they encapsulate the main processing flow. We will focus on the io.netty.handler.codec.ByteToMessageDecoder#channelRead method. Both are abstract classes, so we extend them and override encode() or decode() respectively.

1. MessageToByteEncoder<I> encoder

Converts an outgoing message into a ByteBuf. The generic parameter I is the type of the input message.

When we perform a write operation such as ctx.write() or writeAndFlush(), the encoder's write method is called:

@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
    ByteBuf buf = null;
    try {
        if (acceptOutboundMessage(msg)) {
            @SuppressWarnings("unchecked")
            // The message we sent
            I cast = (I) msg;
            // Allocate memory
            buf = allocateBuffer(ctx, cast, preferDirect);
            try {
                // Encode: this calls the encode() method that we implement
                encode(ctx, cast, buf);
            } finally {
                ReferenceCountUtil.release(cast);
            }

            if (buf.isReadable()) {
                ctx.write(buf, promise);
            } else {
                buf.release();
                ctx.write(Unpooled.EMPTY_BUFFER, promise);
            }
            buf = null;
        } else {
            // Not a type this encoder handles: pass it along the outbound chain unchanged
            ctx.write(msg, promise);
        }
    } catch (EncoderException e) {
        throw e;
    } catch (Throwable e) {
        throw new EncoderException(e);
    } finally {
        // Releases the buffer if it was never handed to ctx.write() (for example after an exception),
        // so we do not need to release the output buffer manually
        if (buf != null) {
            buf.release();
        }
    }
}

// For example, a custom subclass only has to implement encode()
public class IntegerEncoder extends MessageToByteEncoder<Integer> {
    @Override
    public void encode(ChannelHandlerContext ctx, Integer msg, ByteBuf out)
        throws Exception {
        out.writeInt(msg);
    }
}

2. ByteToMessageDecoder (key point)

It extends ChannelInboundHandlerAdapter and overrides many of its parent's methods. We focus on the io.netty.handler.codec.ByteToMessageDecoder#channelRead method.

Its Javadoc states the core idea:

"ChannelInboundHandlerAdapter which decodes bytes in a stream-like fashion from one ByteBuf to an other Message type."

In other words, the input ByteBuf is decoded into objects of the desired type, and each decoded object is added to the output list.

"Generally frame detection should be handled earlier in the pipeline by adding a DelimiterBasedFrameDecoder, FixedLengthFrameDecoder, LengthFieldBasedFrameDecoder, or LineBasedFrameDecoder."

io.netty.handler.codec.ByteToMessageDecoder#channelRead

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    // Determine if he is a ByteBuf type
    if (msg instanceof ByteBuf) {
        CodecOutputList out = CodecOutputList.newInstance();
        try {
            // cumulation is the buffer of bytes accumulated so far; first is true when nothing has been accumulated yet
            first = cumulation == null;
            // The first time, cumulation effectively becomes the incoming buffer; afterwards the incoming bytes are appended to it
            // The cumulator is used here. Default: the memory-copy cumulator
            cumulation = cumulator.cumulate(ctx.alloc(),
                                            first ? Unpooled.EMPTY_BUFFER : cumulation, (ByteBuf) msg);
            // Call the decoding method here
            callDecode(ctx, cumulation, out);

        } catch (DecoderException e) {
            throw e;
        } catch (Exception e) {
            throw new DecoderException(e);
        } finally {
            // Runs after callDecode() above finishes, including when its loop exits via break
            // Release the accumulated buffer once it has been fully read, so we usually do not release it manually
            if (cumulation != null && !cumulation.isReadable()) {
                numReads = 0;
                cumulation.release();
                cumulation = null;
            } else if (++ numReads >= discardAfterReads) {
                ...
            }

            int size = out.size();
            firedChannelRead |= out.insertSinceRecycled();
            // Pass the decoded messages down the pipeline
            fireChannelRead(ctx, out, size);
            out.recycle();
        }
    } else {
        ctx.fireChannelRead(msg);
    }
}

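The two cumulators available for decoding differ as follows.

The strategy pattern is used here.

  1. MERGE_CUMULATOR (default)

    Accumulates by memory copy: incoming bytes are copied into one contiguous buffer

  2. COMPOSITE_CUMULATOR

    Accumulates by composition: the buffers are combined without copying and exposed externally as one unified logical view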
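The trade-off between the two strategies can be sketched with plain arrays. This is a simplified model rather than Netty's code: CumulatorModel, merge, and CompositeView are hypothetical names, and byte arrays stand in for ByteBuf.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of the two accumulation strategies; this is not Netty code. */
final class CumulatorModel {

    /** Merge strategy: copy old and new bytes into one contiguous array. */
    static byte[] merge(byte[] cumulation, byte[] incoming) {
        byte[] merged = new byte[cumulation.length + incoming.length];
        System.arraycopy(cumulation, 0, merged, 0, cumulation.length);
        System.arraycopy(incoming, 0, merged, cumulation.length, incoming.length);
        return merged;
    }

    /** Composite strategy: keep the chunks untouched and expose one logical index space. */
    static final class CompositeView {
        private final List<byte[]> chunks = new ArrayList<>();
        private int length;

        void add(byte[] chunk) {
            chunks.add(chunk); // no copy, only a reference is stored
            length += chunk.length;
        }

        int length() {
            return length;
        }

        byte get(int index) {
            if (index < 0 || index >= length) {
                throw new IndexOutOfBoundsException("index: " + index);
            }
            int offset = index;
            for (byte[] chunk : chunks) {
                if (offset < chunk.length) {
                    return chunk[offset];
                }
                offset -= chunk.length; // skip this chunk and look in the next one
            }
            throw new AssertionError("unreachable");
        }
    }
}
```

Merging pays for a copy on every accumulation but gives simple, fast indexed access afterwards; the composite view avoids the copy but has to locate the right chunk on every access.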

io.netty.handler.codec.ByteToMessageDecoder#callDecode method

protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
    try {
        // Keep looping while the buffer still has readable bytes
        while (in.isReadable()) {
            int outSize = out.size();

            // Usually not taken: messages already in out are fired before decoding continues
            if (outSize > 0) {
                fireChannelRead(ctx, out, outSize);
                out.clear();
                ...
                outSize = 0;
            }

            // Record the readable length
            int oldInputLength = in.readableBytes();
            // Decode here; this ends up calling our own decode() method
            // While decode() is running, a handler removal is not cleaned up immediately;
            // the cleanup happens after decode() returns
            decodeRemovalReentryProtection(ctx, in, out);
            ...
            // No message was added to out
            if (outSize == out.size()) {
                // The read pointer has not moved either: nothing more can be decoded for now
                if (oldInputLength == in.readableBytes()) {
                    break;
                } else {
                    // Bytes were consumed without producing a message: keep decoding
                    continue;
                }
            }
            ...
        }
    } catch (DecoderException e) {
        ...
    }
}

io.netty.handler.codec.ByteToMessageDecoder#decodeRemovalReentryProtection method

final void decodeRemovalReentryProtection(ChannelHandlerContext ctx, ByteBuf in, List<Object> out)
    throws Exception {
    // decodeState tracks the decoding state so that a handler removal during decode() is handled safely
    decodeState = STATE_CALLING_CHILD_DECODE;
    try {
        // decode() is an abstract method, so the call lands in our subclass's implementation
        // This is the template method pattern
        decode(ctx, in, out);
    } finally {
        ...
    }
}

io.netty.handler.codec.ByteToMessageDecoder#decode is the abstract method that we need to implement

protected abstract void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception;
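The interplay of accumulation, the decode loop, and the abstract decode() method can be sketched without Netty. The following is a simplified model under stated assumptions: StreamDecoderModel and IntDecoderModel are hypothetical names, java.nio.ByteBuffer stands in for ByteBuf, decoded messages are returned instead of being fired down a pipeline, and handler removal is ignored.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

/** Toy model of the accumulate-then-decode loop; this is not Netty code. */
abstract class StreamDecoderModel {
    // Bytes received but not yet decoded; readable range is position..limit
    private ByteBuffer cumulation = ByteBuffer.allocate(0);

    /** Template method hook: consume bytes from in and add results to out, or do nothing if more bytes are needed. */
    protected abstract void decode(ByteBuffer in, List<Object> out);

    /** Models channelRead: append the new bytes, then decode as much as possible. */
    final List<Object> channelRead(byte[] incoming) {
        // Merge-style accumulation: copy leftover and new bytes into one buffer
        ByteBuffer merged = ByteBuffer.allocate(cumulation.remaining() + incoming.length);
        merged.put(cumulation).put(incoming);
        merged.flip();
        cumulation = merged;

        List<Object> out = new ArrayList<>();
        while (cumulation.hasRemaining()) {
            int oldOutSize = out.size();
            int oldReadable = cumulation.remaining();
            decode(cumulation, out);
            if (out.size() == oldOutSize) {
                if (cumulation.remaining() == oldReadable) {
                    break; // nothing consumed, nothing produced: wait for more bytes
                }
                continue; // bytes consumed but no message yet: keep decoding
            }
            if (cumulation.remaining() == oldReadable) {
                throw new IllegalStateException("decode() produced a message without reading any bytes");
            }
        }
        return out;
    }
}

/** Example subclass: every 4 bytes form one big-endian int. */
final class IntDecoderModel extends StreamDecoderModel {
    @Override
    protected void decode(ByteBuffer in, List<Object> out) {
        if (in.remaining() < 4) {
            return; // half packet: leave the bytes in place
        }
        out.add(in.getInt());
    }
}
```

Feeding the bytes of an int in two separate reads produces nothing on the first read and the int on the second, which is the half-packet handling that the accumulated buffer makes possible.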

3. MessageToMessageDecoder and MessageToMessageEncoder

1. MessageToMessageEncoder<I> encoder

It extends ChannelOutboundHandlerAdapter, so it is an outbound encoder.

The generic parameter I is the input type. The encoded result is added to out; it can be any object, but it is usually a ByteBuf.

The typical use is turning a Java object into a ByteBuf, where I is the type of the Java object.

public class IntegerToStringEncoder extends
       MessageToMessageEncoder<Integer> {

    // It's pretty simple
    @Override
    public void encode(ChannelHandlerContext ctx, Integer message, List<Object> out)
            throws Exception {
        out.add(message.toString());
    }
}

2. MessageToMessageDecoder<I> decoder

The generic parameter I is the input type; the output can be any object you add to out.

The typical use is turning a ByteBuf into a Java object, where I is ByteBuf.

Just override the following method. In this example the input is a String and the output is an Integer (the length of the string).

public class StringToIntegerDecoder extends
       MessageToMessageDecoder<String> {

    @Override
    public void decode(ChannelHandlerContext ctx, String message, List<Object> out) throws Exception {
        out.add(message.length());
    }
}

4. SimpleChannelInboundHandler<I>

Advantages over extending ChannelInboundHandlerAdapter directly:

  1. The acceptInboundMessage method determines whether the incoming msg can be processed by the current handler
  2. It releases the message for us automatically

The generic parameter I is the type of the decoded message. The handler passes the message to us already cast to that type and releases it afterwards, so no manual release is needed. It extends ChannelInboundHandlerAdapter.

In practice this means we do not need to cast. During development we only need to implement the abstract method channelRead0.

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    // Release is initialized to true every time
    boolean release = true;
    try {
        // Type check: SimpleChannelInboundHandler has a generic type I, so this checks whether msg is of that type
        if (acceptInboundMessage(msg)) {
            @SuppressWarnings("unchecked")
            I imsg = (I) msg;
            // This is the business logic we wrote
            channelRead0(ctx, imsg);
        } else {
            // Not of type I, so the current handler does not process it
            release = false;
            // Pass it on to the next handler
            ctx.fireChannelRead(msg);
        }
    } finally {
        // The message is released for us here, so we do not need to release it manually
        if (autoRelease && release) {
            ReferenceCountUtil.release(msg);
        }
    }
}

io.netty.channel.SimpleChannelInboundHandler#channelRead0 abstract method

protected abstract void channelRead0(ChannelHandlerContext ctx, I msg) throws Exception;
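The type check and automatic release can be modelled in a few lines of plain Java. This is a sketch under assumptions, not Netty's implementation: Releasable, Frame, SimpleInboundModel, and FrameLogger are hypothetical names, the message type is passed in explicitly as a Class object, and a list stands in for ctx.fireChannelRead(msg).

```java
import java.util.ArrayList;
import java.util.List;

/** Minimal stand-in for a reference-counted message; this is not Netty code. */
interface Releasable {
    void release();
    int refCnt();
}

final class Frame implements Releasable {
    private int refCnt = 1;
    final String payload;

    Frame(String payload) {
        this.payload = payload;
    }

    @Override
    public void release() {
        if (refCnt == 0) {
            throw new IllegalStateException("already released");
        }
        refCnt--;
    }

    @Override
    public int refCnt() {
        return refCnt;
    }
}

/** Toy model of a typed inbound handler with automatic release. */
abstract class SimpleInboundModel<I> {
    private final Class<I> type;
    private final List<Object> passedOn = new ArrayList<>();

    SimpleInboundModel(Class<I> type) {
        this.type = type;
    }

    /** The only method a subclass has to write. */
    protected abstract void channelRead0(I msg);

    final void channelRead(Object msg) {
        boolean release = true;
        try {
            if (type.isInstance(msg)) {
                channelRead0(type.cast(msg)); // the subclass receives the typed message
            } else {
                release = false;   // not ours: the next handler owns it
                passedOn.add(msg); // stands in for ctx.fireChannelRead(msg)
            }
        } finally {
            if (release && msg instanceof Releasable) {
                ((Releasable) msg).release();
            }
        }
    }

    List<Object> passedOn() {
        return passedOn;
    }
}

final class FrameLogger extends SimpleInboundModel<Frame> {
    final List<String> seen = new ArrayList<>();

    FrameLogger() {
        super(Frame.class);
    }

    @Override
    protected void channelRead0(Frame msg) {
        seen.add(msg.payload);
    }
}
```

One consequence of the release in the finally block: a message handled in channelRead0 must not be kept or forwarded after the method returns unless its reference count is increased first.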

5. Two-layer codecs in general use

Why is “secondary” decoding needed?

The three commonly used kinds of decoders that solve the sticky-packet and half-packet problems are called primary decoders.

Because the result of the primary decoding is still bytes, it has to be converted into an object. The decoder that does this is called a “secondary decoder”.

Encoders work the same way, in the opposite direction.

  • Primary decoder: ByteToMessageDecoder

    ByteBuf (raw stream) -> ByteBuf (user data); sticky packets and half packets are handled here

  • Secondary decoder: MessageToMessageDecoder<I>

    ByteBuf (user data) -> Java object
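The two layers can be sketched with the standard library alone. This is an illustrative model, not Netty code: TwoLayerCodecModel and its methods are hypothetical names, a 4-byte length prefix is assumed as the framing scheme, and a UTF-8 String stands in for the Java object.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/** Toy model of a two-layer codec; this is not Netty code. */
final class TwoLayerCodecModel {

    /** Layer 1, encode: prefix the payload with its length as a 4-byte big-endian int. */
    static byte[] frame(byte[] payload) {
        return ByteBuffer.allocate(4 + payload.length)
                .putInt(payload.length)
                .put(payload)
                .array();
    }

    /** Layer 1, decode: cut complete frames out of the stream; an incomplete frame stays unread. */
    static List<byte[]> splitFrames(ByteBuffer stream) {
        List<byte[]> frames = new ArrayList<>();
        while (stream.remaining() >= 4) {
            stream.mark();
            int length = stream.getInt();
            if (length < 0) {
                throw new IllegalArgumentException("negative frame length: " + length);
            }
            if (stream.remaining() < length) {
                stream.reset(); // half packet: rewind and wait for more bytes
                break;
            }
            byte[] payload = new byte[length];
            stream.get(payload);
            frames.add(payload);
        }
        return frames;
    }

    /** Layer 2, encode: object -> bytes. */
    static byte[] encodeMessage(String message) {
        return message.getBytes(StandardCharsets.UTF_8);
    }

    /** Layer 2, decode: bytes -> object. */
    static String decodeMessage(byte[] payload) {
        return new String(payload, StandardCharsets.UTF_8);
    }
}
```

Because the layers only meet at the byte-array boundary, the second layer could be swapped for JSON or Protobuf without touching the framing code.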

Can the two layers be merged into a single step?

Yes, but it is not recommended. Reasons: without layering the design is less clear, and the tight coupling makes it hard to replace either scheme.

Common “secondary codec” formats

Java serialization, XML, JSON, MessagePack, Protobuf, etc.

Netty's support for secondary codecs

Example: io.netty.example.worldclock.WorldClockClientInitializer#initChannel