ObjectDecoder and ObjectEncoder codecs

1. ObjectEncoder

An encoder which serializes a Java object into a {@link ByteBuf}.

Serialize a Java object into a ByteBuf object, noting that the Java object needs to implement the Java.io.Serializable interface

2. ObjectDecoder

A decoder which deserializes the received {@link ByteBuf}s into Java objects.

To de-sequence a ByteBuf into a Java object, note that the Java object needs to implement the java.io.Serializable interface

3. Start fast

The first step is that a Java object must implement the Serializable interface

public class OPack implements Serializable {

    private static final long serialVersionUID = -5734509523963527363L; String name; String msg; . Other ellipsis, get, set methods, etc.}Copy the code

Both encoders and decoders are added to the client server

ch.pipeline().addLast(new ObjectEncoder());

ch.pipeline().addLast(new ObjectDecoder(1004.new ClassResolver() {
    @Override
    publicClass<? > resolve(String className)throws ClassNotFoundException {
        returnOPack.class; }}));Copy the code

Himself wrote a ChannelInboundHandlerAdapter implementation channelRead () method, is very simple, or SimpleChannelInboundHandler < T > channelRead0 () method,

4. Codec source code analysis

1. ObjectEncoder coder

public class ObjectEncoder extends MessageToByteEncoder<Serializable> {
    // This is a byte array of ints, because ints are four bytes
    private static final byte[] LENGTH_PLACEHOLDER = new byte[4];

    @Override
    protected void encode(ChannelHandlerContext ctx, Serializable msg, ByteBuf out) throws Exception {
        / / write pointer
        int startIdx = out.writerIndex();

        // 
        ByteBufOutputStream bout = new ByteBufOutputStream(out);
        //
        ObjectOutputStream oout = null;
        try {
            // First write a 4-byte array
            bout.write(LENGTH_PLACEHOLDER);
			
            oout = new CompactObjectOutputStream(bout);
            
            // Then write an object
            oout.writeObject(msg);
            oout.flush();
        } finally{... }// Get the write pointer
        int endIdx = out.writerIndex();

        // Set the length value
        out.setInt(startIdx, endIdx - startIdx - 4); }}Copy the code

Play by yourself……..

ByteBuf out = Unpooled.buffer(1024);

// the w pointer position
int start = out.writerIndex();

// Set an output stream
ByteBufOutputStream bbos = new ByteBufOutputStream(out);

// First write an array of 4 bytes
bbos.write(new byte[4]);

/ / object flow
ObjectOutputStream objectOutputStream = new ObjectOutputStream(bbos);

// Write an obj
objectOutputStream.writeObject(new OPack("hhh"."hhh"));

// W final position
int end = out.writerIndex();

// Set an int at the start position of the buf object to end-start-4, so the object length is calculated
out.setInt(start, end - start - 4);

System.out.println("out = " + out);

System.out.println("out.readInt() = " + out.readInt());

// look at counting references
System.out.println("out.refCnt() = " + out.refCnt());
Copy the code

The output

out = UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeHeapByteBuf(ridx: 0, widx: 117, cap: 1024)
out.readInt() = 113
out.refCnt() = 1
Copy the code

2. ObjectDecoder

public class ObjectDecoder extends LengthFieldBasedFrameDecoder {

    private final ClassResolver classResolver;

    public ObjectDecoder(ClassResolver classResolver) {
        this(1048576, classResolver);
    }
	
    // This means the length of each frame is 1048576
    // The offset of the length is 0, occupying 4 bytes, and finally removing the first 4 bytes
    public ObjectDecoder(int maxObjectSize, ClassResolver classResolver) {
        super(maxObjectSize, 0.4.0.4);
        this.classResolver = classResolver;
    }

    @Override
    protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
        // Call the parent's decode() method directly and return the ByteBuf method that has already deducted our 4 bytes
        ByteBuf frame = (ByteBuf) super.decode(ctx, in);
        if (frame == null) {
            return null;
        }

        // This code has two parts: one is to release ByteBuf and one is to convert it to an ObjectInputStream object
        ObjectInputStream ois = new CompactObjectInputStream(new ByteBufInputStream(frame, true), classResolver);
        try {
            // Just read it last
            return ois.readObject();
        } finally{ ois.close(); }}}Copy the code

I have here a picture, you can clearly see the previous change, change point is what I mentioned above, I’ll explain below LengthFieldBasedFrameDecoder source of its analysis, also note is that you get the count of reference at this time is 2, the need to manually release, I already mentioned…. in the comments above Netty has released it for us, but I debug found that it was not released, but there was a release behind it.

StringEncoder and StringDecoder – string codec

MessageToMessageEncoder and MessageToMessageDecoder

StringDecoder converts ByteBuf toString, calling bytebuf.tostring (charset),

StringEncoder converts String to ByteBuf, calling bytebufutil.encodeString (ctx.alloc(), charbuffer.wrap (MSG), charset)

Server side:

final StringServerHandler stringServerHandler = new StringServerHandler();

// Add a handler to the pipe
ChannelPipeline pipeline = ch.pipeline();
// Add string codec,
pipeline.addLast(new StringEncoder(CharsetUtil.UTF_8));
pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8));
// self-implemented
pipeline.addLast("serverHandler", stringServerHandler);
Copy the code

Our processor StringServerHandler implements SimpleChannelInboundHandler < String >.

private static class StringServerHandler extends SimpleChannelInboundHandler<String> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
        logger.info("[server] received message: {}", msg);
        ctx.writeAndFlush("Server received message"); }}Copy the code

Test case: Found OK

Io.net ty. Handler. Codec. String. StringEncoder basic implementation code

@Override
protected void encode(ChannelHandlerContext ctx, CharSequence msg, List<Object> out) throws Exception {
    if (msg.length() == 0) {
        return;
    }

    out.add(ByteBufUtil.encodeString(ctx.alloc(), CharBuffer.wrap(msg), charset));
}
Copy the code

Io.net ty. Handler. Codec. String. StringDecoder basic implementation

@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception {
    out.add(msg.toString(charset));
}
Copy the code

3. FixedLengthFrameDecoder – fixed length decoder

1. The source code

public class FixedLengthFrameDecoder extends ByteToMessageDecoder {

    / / fixed length
    private final int frameLength;

    public FixedLengthFrameDecoder(int frameLength) {
        checkPositive(frameLength, "frameLength");
        this.frameLength = frameLength;
    }

    // ByteToMessageDecoder
    @Override
    protected final void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        // Execute this method first -- >
        Object decoded = decode(ctx, in);   
        // null is thrown directly into the buffer
        if(decoded ! =null) {
            // add it if it is not emptyout.add(decoded); }}// -- > here
    protected Object decode(
            @SuppressWarnings("UnusedParameters") ChannelHandlerContext ctx, ByteBuf in) throws Exception {
        
        < < returns null
        if (in.readableBytes() < frameLength) {
            return null;
        } else {
            // Is equal to or equal to cut fixed length
            returnin.readRetainedSlice(frameLength); }}}Copy the code

2. Basic usage

ByteBuf buf = Unpooled.buffer(8);

// 0000 0000, 0000 0000, 0010 0111, 0001 0000
/ 0   0   39   16 
 buf.writeInt(10000);
//0000 0000, 0000 0000, 0000 0000, 0110 0100
// 00 0 100
buf.writeInt(100);

// We changed the length of server new FixedLengthFrameDecoder(6) to 6 bytes
// Then the client sends two buF objects to the server
Copy the code

Results:

For the first time:0	0	39	16	0	0The second:0	100	 0	 0	39	16	
Copy the code

We find that the last two bytes of the first send are the first two bytes of the second send, and then we know for sure that the first send that was not received is in the buffer, and this buffer is that every client that connects to the server has a buffer corresponding to that client, Not a common buffer…

4. LengthFieldBasedFrameDecoder – custom decoder

1. Construction method

new LengthFieldBasedFrameDecoder(ByteOrder byteOrder, int maxFrameLength, int lengthFieldOffset, int lengthFieldLength,int lengthAdjustment, int initialBytesToStrip, boolean failFast);

// byteOrder: big end \ small end

// maxFrameLength: Frame refers to the maximum buffer that a client receives each packet sent by the server.

LengthFieldOffset: specifies the offset of the length of a packet

LengthFieldLength: Specifies the length of the packet length, e.g. short is 2 bytes and int is 4 bytes

// lengthAdjustment : For example, if the input is (10, 0, 2,-2,0) and the input is [10(short),100(int),1000(int)], then we need to adjust the value, namely -2, to make the length -2, which is our actual length 8. The data can be read only when the value is (100,10000)

Initialbytesttestis the number of bytes that are skipped when testtestis returned

// failFast: Fails quickly
Copy the code

2. Easy to use

1. The first simple case (10, 0, 2,0,0)

1.pipeline().addLast()new LengthFieldBasedFrameDecoder(8.0.2.0.0));


2. The client sends data// Data sent by the client to the server
ByteBuf buf = Unpooled.buffer(10);
// The length is 8
buf.writeShort(8);
buf.writeInt(10000);
buf.writeInt(100);

3. Server-side code@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    if (msg instanceof ByteBuf) {
        ByteBuf messages = (ByteBuf) msg;
        System.out.println("Data length =" + messages.readShort());
        System.out.println("Data 1 =" + messages.readInt());
        System.out.println("Data 2 =" + messages.readInt());
    } else {
        super.channelRead(ctx, msg); }}Copy the code

The output

Data length =8data1 = 10000data2 = 100
Copy the code

All OK,

2. The second case (10, 0, 2,-2,0)

ByteBuf buf = Unpooled.buffer(10);
// The length is 10, including the length field, so adjust to -2
buf.writeShort(10);
buf.writeInt(10000);
buf.writeInt(100); The server-side code is unchangedCopy the code

Output, the output data is consistent

Data length =10data1 = 10000data2 = 100
Copy the code

3. The third case (10, 0, 2,-2,2)

ByteBuf buf = Unpooled.buffer(10);
// The length is 10, including the length field, so adjust to -2
buf.writeShort(10);
buf.writeInt(10000);
buf.writeInt(100); The server-side code just drops the length@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    if (msg instanceof ByteBuf) {
        ByteBuf messages = (ByteBuf) msg;
        System.out.println("Data 1 =" + messages.readInt());
        System.out.println("Data 2 =" + messages.readInt());
    } else {
        super.channelRead(ctx, msg); }}Copy the code

Output, the result is consistent, very correct, will not play………

data1 = 10000data2 = 100
Copy the code

3. Source code analysis:

// Note in direct memory
protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
    // Discard too long, false at first
   if (discardingTooLongFrame) {
        discardingTooLongFrame(in);
    }
    
    // lengthFieldEndOffset = lengthFieldOffset + lengthFieldLength;
    // The readable length is less than the offset at the end of the length
    if (in.readableBytes() < lengthFieldEndOffset) {
        return null;
    }

    // True offset of length = r pointer + length offset
    int actualLengthFieldOffset = in.readerIndex() + lengthFieldOffset;

    // The length of each frame is read according to the number of bytes in your frame
    long frameLength = getUnadjustedFrameLength(in, actualLengthFieldOffset, lengthFieldLength, byteOrder);

    // The length is less than 0
    if (frameLength < 0) {
        failOnNegativeLengthField(in, frameLength, lengthFieldEndOffset);
    }

    For example, if my length contains bytes of length, I need to adjust the number of bytes to remove the length
    // True packet length = frameLength + Adjusted length + offset at the end of the length
    frameLength += lengthAdjustment + lengthFieldEndOffset;

    // The length is less than
    if (frameLength < lengthFieldEndOffset) {
        failOnFrameLengthLessThanLengthFieldEndOffset(in, frameLength, lengthFieldEndOffset);
    }

    // If the packet length is greater than maxFrameLength, an exception will be thrown
    if (frameLength > maxFrameLength) {
        exceededFrameLength(in, frameLength);
        return null; }...// Skip initialized InitialByteststrip
    in.skipBytes(initialBytesToStrip);

    / / r pointer
    int readerIndex = in.readerIndex();
    // Real Length = Length - InitialByteststrip
    int actualFrameLength = frameLengthInt - initialBytesToStrip;
    
    // This is also a slice, accounting +1, which we'll talk about later
    ByteBuf frame = extractFrame(ctx, in, readerIndex, actualFrameLength);
    
    // Change the reader position
    in.readerIndex(readerIndex + actualFrameLength);
    
    return frame;
}
Copy the code

5. DelimiterBasedFrameDecoder – custom delimiter decoder

Custom delimiter decoder

1. Construction method

new DelimiterBasedFrameDecoder(50.false.true, Unpooled.copiedBuffer("a", CharsetUtil.UTF_8))

// maxFrameLength Specifies the maximum number of packets per packet
// stripDelimiter is enough to remove delimiters
// failFast If an exception occurs, it is quickly thrown
// delimiter delimiter
Copy the code

2. Easy to use

// 1. On the server side, add the decoder
// separator decoder
ch.pipeline().addLast(new DelimiterBasedFrameDecoder(50.false.true, Unpooled.copiedBuffer("a", CharsetUtil.UTF_8)));
// String decoder
ch.pipeline().addLast(new StringDecoder());
// Own decoder
 ch.pipeline().addLast(new MyDecoder());

/ / channelRead () method
if (msg instanceof String) {
	System.out.println(msg);
}

// 2. Whether we registered on the client sent:
run.channel().writeAndFlush(Unpooled.copiedBuffer("Hello a, you're really good.", CharsetUtil.UTF_8));
run.channel().writeAndFlush(Unpooled.copiedBuffer("A", CharsetUtil.UTF_8));
Copy the code

Output result:

Server output: Hello a you are really good ACopy the code

We find that the delimiter is split at the end of the sentence. If there is no delimiter, the other part is put into the buffer, waiting for the next read.

6. LineBasedFrameDecoder – line-by-line decoder

1. Construction method

LineBasedFrameDecoder(final int maxLength, final boolean stripDelimiter, final boolean failFast)
// maxLength: maximum frame
// failFast: Fast throws exceptions
// stripDelimiter: Removes delimiters
Copy the code

2. Quick use

// 1. Newline decoder
ch.pipeline().addLast(new LineBasedFrameDecoder(1000.true.true));
// 2. String decoder
ch.pipeline().addLast(new StringDecoder());
// 3. Own processor
ch.pipeline().addLast(newMyHandler()); Server sends: ctx.writeAndFlush(Unpooled. CopiedBuffer ("HELLO, this is server \n", CharsetUtil.UTF_8)); The client sends: run.channel().writeAndFlush(unpooled.copiedBuffer ()"Hello, you're really good.", CharsetUtil.UTF_8))
run.channel().writeAndFlush(Unpooled.copiedBuffer("Ah, \ n", CharsetUtil.UTF_8))
Copy the code

Output result:

Server: HELLO, you are really good, client: HELLO, this is the serverCopy the code

3. Analyze the use of the -buf.foreachbyte () method

ByteBuf buf = Unpooled.buffer(10);

buf.writeCharSequence("hell\n", CharsetUtil.UTF_8);

byte x = '\n';

//ByteProcessor.FIND_LF
int i = buf.forEachByte(0.10.new ByteProcessor() {
    @Override
    public boolean process(byte value) throws Exception {
        // do not want the loop to return false
        if (value == x) {
            return false;
        }
        // Return true if you want to continue the loop
        return true; }}); System.out.println("index : "+i);
Copy the code

Output:

index : 4
Copy the code

So it’s very easy to find where the newline is and just cut it…….

7. IdleStateHandler – Heartbeat detection processor

Triggers an IdleStateEvent when a Channel has not performed read, write, or both operation for a while.

Raises an IdleStateEvent event when a channel has not performed read Write or both for a period of time, so it belongs to a stream of events

Recommended usage in the official source code, and in general,

The first parameter is: (READER_IDLE) No data was received for a while.

The first parameter is: (WRITER_IDLE) No data was sent for a while.

The first parameter is: (ALL_IDLE) No data was either received or sent for a while.

public class MyChannelInitializer extends ChannelInitializer<Channel> {
    @Override
   public void initChannel(Channel channel) {
       channel.pipeline().addLast("idleStateHandler".new IdleStateHandler(60.30.0));
       channel.pipeline().addLast("myHandler".newMyHandler()); }}// Handler should handle the IdleStateEvent triggered by IdleStateHandler.
public class MyHandler extends ChannelDuplexHandler {
    @Override
   public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
       if (evt instanceof IdleStateEvent) {
           IdleStateEvent e = (IdleStateEvent) evt;
           if (e.state() == IdleState.READER_IDLE) {
            
           } else if (e.state() == IdleState.WRITER_IDLE) {
               
           }
       }
   }
}
Copy the code

1. How to design a reasonable heartbeat detection

I can't find the article either. I refer to an article by Ali Dubbo. His idea is to focus on the client side and the server side, because our idea is to let the server side send heartbeat packets and periodically detect them. This is not good for us is the logic complex, consider the situation too much, so change to customer subject, especially suitable for client development. Thinking about is the client and server, the client when 60 s didn't receive the message from the server side, will take the initiative to give the server sends a heartbeat packets, at this point, when the server did not receive and send to the client's length more than 120 s we disconnect with the client, so the server is very simple, the client is also very simpleCopy the code

1. Server-side code

@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {

	ChannelPipeline pipeline = socketChannel.pipeline();
	// Heartbeat detection, read/write timeout is 120S, no message is received or sent
	pipeline.addLast("idleStateHandler".new IdleStateHandler(0.0.120));

	// Heartbeat detection processor
	pipeline.addLast("serverHeartBeatHandler".new ServerHeartBeatHandler(listener));

}

// Processor logic
public class ServerHeartBeatHandler extends ChannelDuplexHandler {
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
		// Check whether it is IdleStateEvent
        if (evt instanceof IdleStateEvent) {
             // The heartbeat detection server is shut down due to timeout
             // pass to handlerRemove
            ctx.close();
        } else {
            // Otherwise, no processing is done
            super.userEventTriggered(ctx, evt); }}}Copy the code

2. Client code

@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
    ChannelPipeline pipeline = socketChannel.pipeline();

    // Heartbeat detection, if 60S we do not receive the server message, we will send a heartbeat packet
    pipeline.addLast("nettyHeartBeatHandler".new IdleStateHandler(60.0 , 0)

    pipeline.addLast("heartBeatHandler".new ClientHeartBeatHandler(listener));
}

/ / processor
public class ClientHeartBeatHandler extends ChannelDuplexHandler {
    private ChatBootListener listener;
    public ClientHeartBeatHandler(ChatBootListener listener) {
        this.listener = listener;
    }
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent) {
            // Send a heartbeat packet
            ctx.channel().writeAndFlush(Constants.HEART_BEAT_NPACK).addListener(new GenericFutureListener<Future<? super Void>>() {
                @Override
                public void operationComplete(Future<? super Void> future) throws Exception {
                    if (future.isSuccess()) {
                    // Success is usually sent to the buffer, but does not mean that the server received the request
                    } else {
                        // TODO:2019/11/16 failure... See the demand}}}); }else {
            // Pass to the parent class
            super.userEventTriggered(ctx, evt); }}}Copy the code

2. Fundamentals

The EventExecutor of the ChannelHandlerContext executes a task periodically, passing in a runnable object and a delay time, and then executing………. periodically