Netty provides many preset codecs and processors almost out of the box, reducing the time and effort spent on cumbersome transactions

Idle connections and timeouts

Detecting idle connections and timeouts is critical to freeing resources, and Netty provides several ChannelHandler implementations for this

The name of the describe
IdleStateHandler When the connection is idle for too long, an IdleStateEvent event is emitted, which you can then handle by overwriting the userEventTriggered() method in ChannelInboundHandler
ReadTimeoutHandler If no inbound data is received within the specified time interval, a ReadTimeoutException is thrown and the corresponding Channel is closed. This ReadTimeoutException can be detected by overriding the exceptionCaught() method in your ChannelHandler
WriteTimeoutHandler If no outbound data is written within the specified time interval, a WriteTimeoutException is thrown and the corresponding Channel is closed. This WriteTimeoutException can be detected by overriding the exceptionCaught() method in your ChannelHandler

The code below shows that when using the usual method of sending a heartbeat message to a remote node, we are notified if no data is received or sent within 60 seconds, and if there is no response, the connection is closed

public class IdleStateHandlerInitializer extends ChannelInitializer<Channel> {

    @Override
    protected void initChannel(Channel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        // IdleStateHandler will send an IdleStateEvent event when triggered
        pipeline.addLast(new IdleStateHandler(0.0.60, TimeUnit.SECONDS));
        // add a HeartbeatHandler to the ChannelPipeline
        pipeline.addLast(new HeartbeatHandler());
    }

    public static final class HeartbeatHandler extends SimpleChannelInboundHandler {

        // Heartbeat message sent to the remote node
        private static final ByteBuf HEARTBEAT_SEQUENCE = Unpooled
                .unreleasableBuffer(Unpooled.copiedBuffer("HEARTBEAT", CharsetUtil.ISO_8859_1));

        @Override
        public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
            if (evt instanceof IdleStateEvent) {
                // Sends a heartbeat message and closes the connection if sending fails
                ctx.writeAndFlush(HEARTBEAT_SEQUENCE.duplicate())
                        .addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
            } else {
                super.userEventTriggered(ctx, evt); }}@Override
        protected void messageReceived(ChannelHandlerContext ctx, Object msg) throws Exception {}}}Copy the code

Decodes separator – based protocols

Delimiter based messaging protocols use defined characters to mark the beginning or end of a message or message segment. The following table lists decoders to help you define custom decoders that can extract frames separated by any sequence of tags

The name of the describe
DelimiterBasedFrameDecoder Frames are extracted using user-supplied separators
LineBasedFrameDecoder Frames are separated by a line terminator (\n or \r\n)

The following code shows how to use LineBasedFrameDecoder to handle end-of-line delimited frames

public class LineBasedHandlerInitializer extends ChannelInitializer<Channel> {

    @Override
    protected void initChannel(Channel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        // This LineBasedFrameDecoder forwards the extracted frame to the next ChannelInboundHandler
        pipeline.addLast(new LineBasedFrameDecoder(64 * 1024));
        // Add a FrameHandler to receive frames
        pipeline.addLast(new FrameHandler());
    }

    public static final class FrameHandler extends SimpleChannelInboundHandler<ByteBuf> {

        @Override
        protected void messageReceived(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
            // do something}}}Copy the code

If you also use besides end-of-line character of separators to separate frames, you can also use DelimiterBasedFrameDecoder, just need to specific delimiters sequence assigned to its constructor

As an example, we will use the following protocol specification:

  • The incoming data stream is a series of frames, each separated by the newline character \n
  • Each frame consists of a series of elements, each separated by a single space character
  • The contents of a frame represent a command, defined as a command name followed by a variable number of arguments

Based on this protocol, our custom decoder will define the following classes:

  • Cmd – Stores the commands for frames in ByteBuf, one for names and one for parameters
  • CmdDecoder – Takes a line of string from the decode() method overridden and builds a Cmd instance from its contents
  • CmdHandler – Gets the decoded Cmd object from CmdDecoder and does some processing with it
public class CmdHandlerInitializer extends ChannelInitializer<Channel> {

    static final byte SPACE = (byte) ' ';

    @Override
    protected void initChannel(Channel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        pipeline.addLast(new CmdDecoder(64 * 1024));
        pipeline.addLast(new CmdHandler());
    }

    /** * Cmd POJO */
    public static final class Cmd {

        private final ByteBuf name;
        private final ByteBuf args;

        public Cmd(ByteBuf name, ByteBuf args) {
            this.name = name;
            this.args = args;
        }

        public ByteBuf getArgs(a) {
            return args;
        }

        public ByteBuf getName(a) {
            returnname; }}public static final class CmdDecoder extends LineBasedFrameDecoder {

        public CmdDecoder(int maxLength) {
            super(maxLength);
        }

        @Override
        protected Object decode(ChannelHandlerContext ctx, ByteBuf buffer) throws Exception {
            // Extract frames separated by a line end sequence from ByteBuf
            ByteBuf frame = (ByteBuf) super.decode(ctx, buffer);
            // If there are no frames in the input, null is returned
            if (frame == null) {
                return null;
            }
            // Find the index of the first space character, preceded by the command name, followed by the argument
            int index = frame.indexOf(frame.readerIndex(), frame.writerIndex(), SPACE);
            // Create a new Cmd object with a slice containing the command name and parameters
            return new Cmd(frame.slice(frame.readerIndex(), index), frame.slice(index + 1, frame.writerIndex())); }}public static final class CmdHandler extends SimpleChannelInboundHandler<Cmd> {

        @Override
        protected void messageReceived(ChannelHandlerContext ctx, Cmd msg) throws Exception {
            // Handle ChannelPipeline Cmd objects}}}Copy the code

Length based protocol

A length-based protocol defines a frame by encoding its length to the head of the frame, rather than using a special delimiter to mark its end. The following table lists two decoders provided by Netty for handling this type of protocol

The name of the describe
FixedLengthFrameDecoder Extracts the fixed-length frame specified when the constructor is called
LengthFieldBasedFrameDecoder Extract the frame based on the length value in the frame header: the offset of this field and the length are specified in the constructor

You often encoded into the message header of the frame size is not a fixed value of agreement, in order to deal with the longer frames, you can use LengthFieldBasedFrameDecoder, it will be from the head field frame length is determined, and then extracted from the data stream specifies the number of bytes

The following figure shows an example where the length field has an offset of 0 in the frame and is 2 bytes long

The following code shows how to use its three constructors, the maxFrameLength, lengthFieldOffser, and lengthFieldLength constructors. In this scenario, the length of the frame is encoded in the first 8 bytes of the frame

public class LengthBasedInitializer extends ChannelInitializer<Channel> {

    / * * * use LengthFieldBasedFrameDecoder frame length decoding coded into the frame of 8 bytes before the start of the message * /
    @Override
    protected void initChannel(Channel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        pipeline.addLast(new LengthFieldBasedFrameDecoder(64 * 1024.0.8));
        pipeline.addLast(new FrameHandler());
    }

    public static final class FrameHandler extends SimpleChannelInboundHandler<ByteBuf> {

        @Override
        protected void messageReceived(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
            // do something}}}Copy the code

Writing large data

Because of the potential for network saturation, how to efficiently write large chunks of data in an asynchronous framework is a particular problem. Since the write operation is non-blocking, even if not all data has been written out, the write operation will be returned and notified to ChannelFuture when it is complete. When this happens, you run the risk of running out of memory if you keep writing. Therefore, when writing large data, you need to consider the case that the connection of the remote node is slow connection, which can cause the delay of memory release. Let’s consider writing out the contents of a file to the network

NIO’s zero-copy feature eliminates the copying process of moving the contents of a file from the file system to the network stack. All of this is happening in the heart of Netty, so all the application needs to do is use an implementation of the FileRegion interface

The following code shows how to create a DefaultFileRegion from a FileInputStream and write it to a Channel

Create a FileInputStream
leInputStream in = new FileInputStream(file);
// Create a new DefaultFileRegion with the full length of the file
FileRegion region = new DefaultFileRegion(in.getChannel(), 0, file.length());
// Send the DefaultFileRegion and register a ChannelFutureListener
channel.writeAndFlush(region).addListener(new ChannelFutureListener() {
    @Override
    public void operationComplete(ChannelFuture future) throws Exception {
        // Processing failed
        if(! future.isSuccess()) { Throwable cause = future.cause();// do something}}});Copy the code

This example applies only to direct transfers of file contents, not to any processing of the data by the application. When you need to copy data from the file system to user memory, you can use ChunkedWriteHandler, which allows you to write large data streams asynchronously without incursing significant memory consumption

The type parameter B in interface ChunkedInput<B> is the type returned by the readChunk() method. Netty presets four implementations of this interface, as shown in the table, each representing a stream of variable length that will be processed by ChunkedWriteHandler

The name of the describe
ChunkedFile Retrieve data block by block from a file, for use when your platform does not support zero copy or when you need to convert data
ChunkedNioFile Similar to ChunkedFile, except that it uses a FileChannel
ChunkedStream Transfer content block by block from InputStream
ChunkedNioStream Progressively transfer content from ReadableByteChannel

The following code illustrates the use of ChunkedStream, which is the most common implementation in practice. The class shown is instantiated with a File and an SSLContext, and when the initChannel() method is called, it initializes the Channel with the ChannelHandler chain shown

When a Channel’s state becomes active, WriteStreamHandler will block out data from the file as ChunkedStream

public class ChunkedWriteHandlerInitializer extends ChannelInitializer<Channel> {

    private final File file;
    private final SslContext sslContext;

    public ChunkedWriteHandlerInitializer(File file, SslContext sslContext) {
        this.file = file;
        this.sslContext = sslContext;
    }

    @Override
    protected void initChannel(Channel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        pipeline.addLast(new SslHandler(sslContext.newEngine(ch.alloc())));
        // Add ChunkedWriteHandler to handle data passed in as ChunkedInput
        pipeline.addLast(new ChunkedWriteHandler());
        // Once the connection is established, WriteStreamHandler starts writing file data
        pipeline.addLast(new WriteStreamHandler());
    }

    public final class WriteStreamHandler extends SimpleChannelInboundHandler<Channel> {

        /** * When a connection is established, the channelActive() method writes file data using ChunkedInput */
        @Override
        protected void messageReceived(ChannelHandlerContext ctx, Channel msg) throws Exception {
            super.channelActive(ctx);
            ctx.writeAndFlush(new ChunkedStream(newFileInputStream(file))); }}}Copy the code

Serialized data

The JDK provides ObjectOutputStream and ObjectInputStream for serializing and deserializing the basic data types and graphs of POJOs over the network. The API is not complex and can be applied to any object that implements the Java.io.Serializable interface. But its performance isn’t very efficient. In this section, we’ll see how Netty implements serialization

1. JDK serialization

If your application must interact with remote nodes using ObjectOutputStream and ObjectInputStream, and for compatibility, JDK serialization would be the right choice. The following table lists Netty’s serialized classes for interacting with the JDK

The name of the describe
CompatibleObjectDecoder Decoders that interoperate with non-Netty based remote nodes that use JDK serialization
CompatibleObjectEncoder Encoders that interoperate with non-Netty based remote nodes serialized with the JDK
ObjectDecoder A decoder built on top of JDK serialization that uses custom serialization to decode
ObjectEncoder Encoders built on TOP of JDK serialization encoded using custom serialization

2. Protocol Buffers serialization

Protocol Buffers is an open source data exchange format developed by Google that encodes and decodes structured data in a compact and efficient way and can be used across multiple languages. The following table shows the ChannelHandler implementation provided by Netty to support Protobuf

The name of the describe
ProtobufDecoder The message is decoded using Protobuf
ProtobufEncoder The message is encoded using Protobuf
ProtobufVarint32FrameDecoder The received ByteBuf is dynamically split based on the value of the Base 128 Varints integer length field of Google Protobuf Buffers in the message
ProtobufVarint32LengthFieldPrepender Prefix ByteBuf with a length field of Google Protobuf Buffers Base 128 Varints integer