Click “like” to see, form a habit, the public account search [dime technology] pay attention to more original technical articles. This article has been included in GitHub org_Hejianhui /JavaStudy.

preface

  • BIO, NIO, AIO summary
  • Five IO models in Unix network programming
  • In-depth understanding of IO multiplexing implementation mechanisms
  • Netty core functions and threading model

Before we talked about BIO, NIO, AIO and other basic knowledge and Netty core functions and threading model, this article focuses on understanding Netty codec, sticky packet unpacking, heartbeat mechanism and other implementation principles to explain.

Netty codec

Netty involves codec components such as Channel, ChannelHandler, ChannelPipe, etc. Let’s first understand the functions of these components.

ChannelHandler

ChannelHandler acts as the application logic container for handling inbound and outbound data. Realize ChannelInboundHandler interface (or ChannelInboundHandlerAdapter), for example, you can pick up inbound events and data, the data will then be your application’s business logic processing. You can also swipe data from the ChannelInboundHandler when you want to send a response to a connected client. Your business logic usually resides in one or more ChannelinboundHandlers.

ChannelOutboundHandler works the same way, except it handles outbound data.

ChannelPipeline

ChannelPipeline provides a container for the ChannelHandler chain. In a client application, for example, we call events outbound if they travel from client to server, That is, the data sent from the client to the server will pass through a series of ChannelOutboundHandler in the pipeline (ChannelOutboundHandler calls each handler from tail to head). And are processed by these Hadnlers, which are called inbound, Inbound only calls ChannelInboundHandler logic in the pipeline (ChannelInboundHandler calls each handler from head to tail).

codecs

When you send or receive a message via Netty, a data conversion occurs. Inbound messages are decoded: converted from bytes to another format (such as Java objects); If it is an outbound message, it is encoded in bytes.

Netty provides a number of useful codecs that implement the ChannelInboundHadnler or ChannelOutboundHandler interface. In these classes, the channelRead method has been overridden.

In the inbound case, this method is called for every message read from an inbound Channel. It then calls the decode() method provided by the known decoder to decode and forward the decoded bytes to the next ChannelInboundHandler in the ChannelPipeline.

Netty provides many codecs, such as StringEncoder and StringDecoder for codec strings, ObjectEncoder and ObjectDecoder for codec objects, and so on.

Of course, you can also customize the codec by integrating ByteToMessageDecoder.

The sample code

The full code is at Github: github.com/Niuh-Study/… The corresponding package com.niuh.netty.codec

Netty Sticking and unpacking packets

TCP sticky packet unpacking refers to that several packets sent by the sender are glued into one packet or a certain packet is unwrapped for receiving. As shown in the figure below, the client sends two packets, D1 and D2, but the server may receive data in one of the following situations.

demo

First, the client is responsible for sending the message, and sends the message for 5 consecutive times. The code is as follows:

public void channelActive(ChannelHandlerContext ctx) throws Exception {
	for (int i = 1; i <= 5; i++) {
    	ByteBuf byteBuf = Unpooled.copiedBuffer("msg No" + i + "", Charset.forName("utf-8")); ctx.writeAndFlush(byteBuf); }}Copy the code

The server, acting as the receiver, then receives and prints the result:

// count variable, used to count
private int count;

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
	System.out.println("Server reading thread" + Thread.currentThread().getName());

    ByteBuf buf = (ByteBuf) msg;
    byte[] bytes = new byte[buf.readableBytes()];
    // Read the ByteBuf data into the bytes array
    buf.readBytes(bytes);
    String message = new String(bytes, Charset.forName("utf-8"));
    System.out.println("Server receives data:" + message);
    // Print the number of times received
    System.out.println("The amount of data received is:"(+ + +this.count));
}
Copy the code

Start the server, then start the two clients to send the message, the server console can see:

The sticky bag problem is actually random, so the result is different every time.

The full code is at Github: github.com/Niuh-Study/… The corresponding package com.niuh. Splitpacket0

Why do sticky bags appear?

TCP is connection-oriented and stream-oriented and provides high reliability services. Sending and receiving ends (client and server) will have on the socket, therefore, the sender to send multiple packages at the receiving end, the more effective to send to each other, using the optimization method (Nagle algorithm), less and less data gap between multiple data, merged into one large block of data, then the packet, this provides the efficiency, However, the receiver will have difficulty distinguishing the complete packet because flow-oriented communication has no message protection boundary.

How to understand that TCP is byte stream oriented

  1. The application interacts with TCP one data block at a time (of varying sizes), but TCP treats the application as just a series of unstructured byte streams. TCP does not know the meaning of the byte stream being sent;
  2. Therefore TCP does not guarantee that the receiving application received data block and the sender application made by block of data has a corresponding relationship between the size of (for example, the sender application to sender TCP, a total of 10 data blocks, but the recipient’s TCP may have received only 4 byte stream delivery upper applications).
  3. In addition, TCP determines the number of bytes in a packet segment based on the window value provided by the peer and the extent of network congestion, regardless of the number of packets sent by the application process. (The length of UDP packets is determined by the application process.) If an application sends a block of data to the TCP cache that is too long, TCP can cut it up and send it back. If an application sends only one byte at a time, TCP can wait for enough bytes to accumulate before sending a packet segment.

TCP sends packets in three times

  1. Buffer data reaches, maximum packet length MSS;
  2. The application process of the sending end specifies the packet segment to be sent, that is, the push operation supported by TCP.
  3. Send when one of the sender’s timer deadlines is up, even if the length does not exceed MSS.

The solution

Generally, there are 4 ways to solve the problem of sticky packages and reports

  1. A special symbol is added to the end of the data to identify the boundary of the packet. \n, \r, \t or other symbols are usually added

Learn HTTP, FTP, etc., use the carriage return newline symbol;

  1. The length of the data is declared in the header of the data, and the data is obtained by the length

Split the message into head and body. The head contains the body length field. Generally, the first field of head uses an int value to represent the body length.

  1. Specify the length of the packet. If the packet length is insufficient, the space will be filled. Read at a specified length. For example, 100 bytes, if not enough space;

  2. Use more complex application layer protocols.

Using LineBasedFrameDecoder

LineBasedFrameDecoder is a built-in Netty decoder. The corresponding coder is LineEncoder.

The idea is to add a special symbol to the end of the data to indicate the boundary. The default is to use the newline character \n.

The usage is very simple, sender plus encoder:

@Override
protected void initChannel(SocketChannel ch) throws Exception {
	// Add the encoder, using the default symbol \n, with the character set utF-8
    ch.pipeline().addLast(new LineEncoder(LineSeparator.DEFAULT, CharsetUtil.UTF_8));
    ch.pipeline().addLast(new TcpClientHandler());
}
Copy the code

Receiver plus decoder:

@Override
protected void initChannel(SocketChannel ch) throws Exception {
	// The decoder needs to set the maximum length of the data, I set it to 1024
	ch.pipeline().addLast(new LineBasedFrameDecoder(1024));
	// Set the business handler for the pipeline
	ch.pipeline().addLast(new TcpServerHandler());
}
Copy the code

Then on the sender, send the message with an identifier at the end:

@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
    for (int i = 1; i <= 5; i++) {
		// Add the default identifier \n to the end
    	ByteBuf byteBuf = Unpooled.copiedBuffer("msg No" + i + StringUtil.LINE_FEED, Charset.forName("utf-8")); ctx.writeAndFlush(byteBuf); }}Copy the code

So we start the server and client again, and from the console on the server side we can see:

Special symbols are added at the end of data to mark the boundary of data packets, and the problem of sticky and unpacked packets is solved.

Note: the end of the data must be a delimiter. Do not add data after the delimiter, otherwise it will be used as the beginning of the next data. Here is the error demo:

@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
    for (int i = 1; i <= 5; i++) {
		// Add the default identifier \n to the end
    	ByteBuf byteBuf = Unpooled.copiedBuffer("msg No" + i + StringUtil.LINE_FEED + "[I am the string after the delimiter]", Charset.forName("utf-8")); ctx.writeAndFlush(byteBuf); }}Copy the code

The console on the server side will see this printed message:

Use custom length frame decoder

The principle of using this decoder to solve the sticky packet problem is the second one mentioned above. The length of the data is declared in the header of the data, and the data is obtained according to the length. The decoder constructor needs to define 5 parameters, which are relatively complicated.

  • maxFrameLengthMaximum length of sent packets
  • lengthFieldOffsetThe offset of the length field. The length field is located at the beginning of the entire packet byte array.
  • lengthFieldLengthLength Indicates the length of bytes in the field. Length Indicates the length of bytes in the field.
  • lengthAdjustmentOffset correction of the length domain. If the value of the length field contains the length of other fields (such as the length field itself) in addition to the length of the valid data field, then correction is required. The corrected value is:Packet length – Length field value – Length field offset – Length field length.
  • initialBytesToStripThe initial number of bytes discarded. Discards bytes before this index value.

The first three parameters are relatively simple and can be illustrated with the following diagram:

What does it mean to correct the offset?

It is assumed that your length field contains the length of other fields besides the valid data, so you need to set this value for correction, otherwise the decoder can not get valid data.

The initial number of bytes discarded. Discard all data before the index value and discard all data after the index value. Generally, data in length fields is discarded. Of course, if you want to get all the data, set it to 0.

The following uses a custom length frame decoder at the message receiver to solve the sticky packet problem:

@Override
protected void initChannel(SocketChannel ch) throws Exception {
	// The maximum length of a packet is 1024
    // The initial index of the length field is 0
    // The length of the data field is 4
    // The correction value is 0 because the length field only has the value of the length of the valid data
    // Discard data starts at 4, since the length field is 4, I need to discard the length field to get valid data
    ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(1024.0.4.0.4));
    ch.pipeline().addLast(new TcpClientHandler());
}
Copy the code

Then write the sender code, according to the Settings of the decoder, send:

@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
	for (int i = 1; i <= 5; i++) {
    	String str = "msg No" + i;
        ByteBuf byteBuf = Unpooled.buffer(1024);
        byte[] bytes = str.getBytes(Charset.forName("utf-8"));
        // Set the length field to the length of the valid data
        byteBuf.writeInt(bytes.length);
        // Set valid databyteBuf.writeBytes(bytes); ctx.writeAndFlush(byteBuf); }}Copy the code

Then start the server, client, and we can see the console print result:

It can be seen that the sticky packet problem is solved by using the custom length frame decoder.

Use the Google Protobuf codec

NettywebsiteIt clearly says support for Google Protobuf, as shown below:

What is Google Protobuf

From the official website: Protocol buffers are Google’s language-neutral, platform-neutral, Extensible Mechanism for serializing structured data — think XML, but smaller, faster, and simpler. You define how you want your data to be structured once, then you can use special generated source code to easily write and read your structured data to and from a variety of data streams and using a variety of languages.

Protocol Buffers is Google’s language-independent, platform-independent, extensible mechanism for serializing data, similar to XML, but smaller, faster, and simpler. You only need to define how your data is structured once, and then you can easily write and read structured data into a variety of data streams and support multiple languages using specially generated source code.

It can be used in many scenarios such as RPC or TCP communication. Generally speaking, if the client and server are using different languages, a data structure can be defined on the server, converted into a byte stream by a protobuf, and then sent to the client for decoding to obtain the corresponding data structure. That’s the magic of Protobuf. In addition, its communication efficiency is extremely high. “The size of a message data after protobuf serialization is 1/10 of JSON, 1/20 of XML format, and 1/10 of binary serialization”.

Google Protobuf website: developers. Google. Cn/protocol – bu…

Why use Google Protobuf

In some situations, the data needs to be on a different platform, transmission and use in different applications, such as a message is to use c + + programs, and the other is a program written in Java, currently produces a message data, need to be written in different languages in different applications, how to send messages and used in various applications? This requires designing a message format. Json and XML are commonly used, but protobuf comes later.

Google Protobuf advantages

  • The main advantages of protobuf are simplicity, speed;
  • Protobuf serializes data to binary and takes up relatively little space, basically keeping only the data part, while XML and JSON have message structures attached to the data.
  • Protobuf is easy to use, requiring only deserialization rather than layer parsing like XML and JSON.

Google Protobuf installation

Because I am a Mac system here, in addition to using DMG and PKG to install software on Mac, it is more convenient to use BREW command for installation, which can help install other required dependencies, thus reducing unnecessary trouble.

Install the latest version of Protoc

  1. Download protobuf3 from Github

Github.com/protocolbuf…

For Mac system, select the first one, as shown below:

  1. After the download succeeds, switch to user root
sudo -i
Copy the code
  1. Unzip the package and go to the directory you unzipped yourself
Tar xyf protobuf-all-3.13.0.tar.gz CD protobuf-3.13.0Copy the code
  1. Setting the compile directory
./configure --prefix=/usr/local/protobuf
Copy the code
  1. The installation
make
make install
Copy the code
  1. Configuring environment Variables

Step 1: Find the.bash_profile file and edit it

cd ~
open .bash_profile
Copy the code

Step 2: Then add the following configuration to the end of the opened bash_profile:

export PROTOBUF=/usr/local/protobuf 
export PATH=$PROTOBUF/bin:$PATH
Copy the code

Step 3: Source the file into effect

source .bash_profile
Copy the code
  1. Test installation results
protoc --version
Copy the code

Use Google Protobuf

Refer to the Github project guide for Google Protobuf for the following steps.

Github.com/protocolbuf…

Step 1: Add maven dependencies

<dependency>
  <groupId>com.google.protobuf</groupId>
  <artifactId>protobuf-java</artifactId>
  <version>3.11. 0</version>
</dependency>
Copy the code

Step 2: Write the proto file message.proto

How to write. Proto file related documentation, you can go to the official website to view the following to write an example, please see the demonstration:

syntax = "proto3"; / / version
option java_outer_classname = "MessagePojo";// The generated external class name, which is also the file name

message Message {
    int32 id = 1;// An attribute of the Message class. The attribute name is id and the sequence number is 1
    string content = 2;// An attribute of the Message class. The attribute name is content and the serial number is 2
}
Copy the code

Step 3: Use a compiler to generate code from a.proto file

After the above installation steps, go to the bin directory and you will see an executable file called Protoc

cd /usr/local/protobuf/bin/
Copy the code

And then copy what I wrote earlierMessage.protoFile to this directory, as shown in the figure:Enter the command:

protoc --java_out=. Message.proto
Copy the code

You can then see the generated Messagepojo.java file. Finally, copy the files into the IDEA project.

Step 4: Add encoder at sender and decoder at receiver

The client adds an encoder to encode the message.

@Override
protected void initChannel(SocketChannel ch) throws Exception {
	// Add the Protobuf encoder to the sender
    ch.pipeline().addLast(new ProtobufEncoder());
	ch.pipeline().addLast(new TcpClientHandler());
}
Copy the code

The server adds a decoder to decode the message.

@Override
protected void initChannel(SocketChannel ch) throws Exception {
	// Add a Protobuf decoder. The constructor needs to specify the specific object instance to decode
	ch.pipeline().addLast(new ProtobufDecoder(MessagePojo.Message.getDefaultInstance()));
	// Set the handler for the pipeline
	ch.pipeline().addLast(new TcpServerHandler());
}
Copy the code

Step 5: Send a message

The client sends the message with the following code:

@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
	// The builder pattern is used to create objects
	MessagePojo.Message message = MessagePojo
    		.Message
            .newBuilder()
            .setId(1)
            .setContent("Ten cents, take off.")
            .build();
    ctx.writeAndFlush(message);
}
Copy the code

The server receives the data and prints:

@Override
protected void channelRead0(ChannelHandlerContext ctx, MessagePojo.Message messagePojo) throws Exception {
    System.out.println("id:" + messagePojo.getId());
    System.out.println("content:" + messagePojo.getContent());
}
Copy the code

Test results are correct:

Analyze packet sticking and unpacking of Protocol

Actually using the Protocol codec directly has sticky packet problems.

To prove it, the sender loop 100 times to send 100 “dime, take off” messages, see the sender code demo:

@Override public void channelActive(ChannelHandlerContext ctx) throws Exception { for (int i = 1; i <= 100; I ++) {messagepojo.message Message = messagepojo.message.newBuilder ().setid (I).setContent(I + "10", take off ~").build(); ctx.writeAndFlush(message); }}Copy the code

At this point, after starting the server, the client may only print a few messages or see the following error on the console:

com.google.protobuf.InvalidProtocolBufferException: While parsing a protocol message, the input ended unexpectedly in the middle of a field. This could mean either that the input has been truncated or that an embedded message misreported its own length.

The input unexpectedly ends in the middle of a field while parsing a Protocol message. This could mean that the input is truncated, or that the embedded message misrepresents its length.

In fact, it is a sticky packet problem. Multiple data are merged into one data, resulting in abnormal parsing.

This section describes how to resolve sticky or unpacked packets of the Protocol

Only need to add encoders ProtobufVarint32LengthFieldPrepender to the sender

@Override
protected void initChannel(SocketChannel ch) throws Exception {
	ch.pipeline().addLast(new ProtobufVarint32LengthFieldPrepender());
    ch.pipeline().addLast(new ProtobufEncoder());
    ch.pipeline().addLast(new TcpClientHandler());
}
Copy the code

The receiver and decoder ProtobufVarint32FrameDecoder

@Override
protected void initChannel(SocketChannel ch) throws Exception {
	ch.pipeline().addLast(new ProtobufVarint32FrameDecoder());
	ch.pipeline().addLast(new ProtobufDecoder(MessagePojo.Message.getDefaultInstance()));
	// Set the handler for the pipeline
	ch.pipeline().addLast(new TcpServerHandler());
}
Copy the code

Then start the server and client again, we can see normal ~

ProtobufVarint32LengthFieldPrepender encoder work is as follows:

* BEFORE ENCODE (300 bytes)       AFTER ENCODE (302 bytes)
 * +---------------+               +--------+---------------+
 * | Protobuf Data |-------------->| Length | Protobuf Data |
 * |  (300 bytes)  |               | 0xAC02 |  (300 bytes)  |
 * +---------------+               +--------+---------------+
@Sharable
public class ProtobufVarint32LengthFieldPrepender extends MessageToByteEncoder<ByteBuf> {
    @Override
    protected void encode(ChannelHandlerContext ctx, ByteBuf msg, ByteBuf out) throws Exception {
        int bodyLen = msg.readableBytes();
        int headerLen = computeRawVarint32Size(bodyLen);
        // Write request header, message length
        out.ensureWritable(headerLen + bodyLen);
        writeRawVarint32(out, bodyLen);
        // Write dataout.writeBytes(msg, msg.readerIndex(), bodyLen); }}Copy the code

ProtobufVarint32FrameDecoder decoder work is as follows:

* BEFORE DECODE (302 bytes)       AFTER DECODE (300 bytes)
 * +--------+---------------+      +---------------+
 * | Length | Protobuf Data |----->| Protobuf Data |
 * | 0xAC02 |  (300 bytes)  |      |  (300 bytes)| * + + -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- + + -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- +public class ProtobufVarint32FrameDecoder extends ByteToMessageDecoder {
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        // marks the subscript position to read
        in.markReaderIndex();
        // Get the subscript position to read
        int preIndex = in.readerIndex();
        // Decode, get the length of the message, and move the read subscript position
        int length = readRawVarint32(in);
        // Compare the subscript positions before and after decoding, if they are equal. Indicates that there are not enough bytes to read and skip to the next round
        if (preIndex == in.readerIndex()) {
            return;
        }
        If the length of the message is less than 0, an exception is thrown
        if (length < 0) {
            throw new CorruptedFrameException("negative length: " + length);
        }
        // If it is not enough to read a complete data, reset restores the subscript position.
        if (in.readableBytes() < length) {
            in.resetReaderIndex();
        } else {
            // If not, write the data to out and the receiver will get the complete dataout.add(in.readRetainedSlice(length)); }}Copy the code

Conclusion:

  • The sender uses the encoder to send a data block in front of the message body that describes the length of the data.
  • Through the decoder, the receiver first obtains a data block describing the length of the data, knows the length of the complete data, and then obtains a complete data according to the length of the data.

Netty Heartbeat detection mechanism

What is the heart

Heartbeat is a special data packet periodically sent between a client and a server in a TCP long connection to notify the other that the client is still online and ensure the validity of the TCP connection.

Note: Heartbeat packets have another function that is often overlooked: if a connection is not used for a long time, the firewall or router will disconnect the connection.

In Netty, the key to implementing the heartbeat mechanism is the IdleStateHandler, which has its constructor:

public IdleStateHandler(int readerIdleTimeSeconds, int writerIdleTimeSeconds, int allIdleTimeSeconds) {
	this((long)readerIdleTimeSeconds, (long)writerIdleTimeSeconds, (long)allIdleTimeSeconds, TimeUnit.SECONDS);
}
Copy the code

The meanings of the three parameters are as follows:

  • ReaderIdleTimeSeconds: Read timeout. That is, when there is no from within the specified time intervalChannelOne is triggered when data is readREADER_IDLEIdleStateEventEvents.
  • WriterIdleTimeSeconds: Write timeout. That is, when no data is written to within the specified intervalChannel, triggers aWRITER_IDLEIdleStateEventEvents.
  • AllIdleTimeSeconds: Read/write timeout. That is, when there is no read or write operation within the specified interval, one is emittedALL_IDLEIdleStateEventEvents.

Note: The default time unit for these three parameters is seconds. If you need to specify other units of time, you can use another constructor:

public IdleStateHandler(long readerIdleTime, long writerIdleTime, long allIdleTime, TimeUnit unit) {
	this(false, readerIdleTime, writerIdleTime, allIdleTime, unit);
}
Copy the code

To implement the Netty server heartbeat detection mechanism, add the following codes to the ChannelInitializer for the server:

pipeline.addLast(new IdleStateHandler(3.0.0, TimeUnit.SECONDS));
Copy the code

Netty heartbeat source code analysis

IdleStateHandler (); IdleStateHandler ();

The red box code actually means that the method is passed through without doing any business logic, leaving the channelRead method to the next handler in channelPipe.

Let’s look at the channelActive method again:

Here’s an initialize method, the essence of IdleStateHandler, to explore:

This will trigger a Task, ReaderIdleTimeoutTask, this Task run method source:

The first red box subtracts the current time from the last channelRead call. If the result is 6s, it means that the last channelRead call was 6s ago. If you set 5s, the nextDelay is -1, which means that you have timed out. The second red box triggers the next handler’s userEventTriggered method:

If there is no timeout, the userEventTriggered method is not triggered.

Netty heartbeat detection code example

The service side

package com.niuh.netty.heartbeat;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.timeout.IdleStateHandler;

import java.util.concurrent.TimeUnit;

public class HeartBeatServer {

    public static void main(String[] args) throws Exception {
        EventLoopGroup boss = new NioEventLoopGroup();
        EventLoopGroup worker = new NioEventLoopGroup();
        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(boss, worker)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();
                            pipeline.addLast("decoder".new StringDecoder());
                            pipeline.addLast("encoder".new StringEncoder());
                            //IdleStateHandler's readerIdleTime parameter specifies that the client connection has not been received for more than 3 seconds.
                            // The IdleStateEvent event is raised and passed on to the next handler, which must
                            // Implement the userEventTriggered method to handle the corresponding event
                            pipeline.addLast(new IdleStateHandler(3.0.0, TimeUnit.SECONDS));
                            pipeline.addLast(newHeartBeatServerHandler()); }}); System.out.println("Netty server start.");
            ChannelFuture future = bootstrap.bind(9000).sync();
            future.channel().closeFuture().sync();
        } catch (Exception e) {
            e.printStackTrace();
        } finally{ worker.shutdownGracefully(); boss.shutdownGracefully(); }}}Copy the code

Server-side callback handler class

package com.niuh.netty.heartbeat;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.timeout.IdleStateEvent;

public class HeartBeatServerHandler extends SimpleChannelInboundHandler<String> {

    int readIdleTimes = 0;

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String s) throws Exception {
        System.out.println(" ====== > [server] message received : " + s);
        if ("Heartbeat Packet".equals(s)) {
            ctx.channel().writeAndFlush("ok");
        } else {
            System.out.println("Other information processing... "); }}@Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        IdleStateEvent event = (IdleStateEvent) evt;

        String eventType = null;
        switch (event.state()) {
            case READER_IDLE:
                eventType = "Read free";
                readIdleTimes++; // Read idle count increment by 1
                break;
            case WRITER_IDLE:
                eventType = "Write free";
                / / not processing
                break;
            case ALL_IDLE:
                eventType = "Read-write idle";
                / / not processing
                break;
        }



        System.out.println(ctx.channel().remoteAddress() + "Timeout event:" + eventType);
        if (readIdleTimes > 3) {
            System.out.println("[server] read idle more than 3 times, close the connection, release more resources");
            ctx.channel().writeAndFlush("idle close"); ctx.channel().close(); }}@Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.err.println("= = =" + ctx.channel().remoteAddress() + " is active ==="); }}Copy the code

The client

package com.niuh.netty.heartbeat;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

import java.util.Random;

public class HeartBeatClient {
    public static void main(String[] args) throws Exception {
        EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();
                            pipeline.addLast("decoder".new StringDecoder());
                            pipeline.addLast("encoder".new StringEncoder());
                            pipeline.addLast(newHeartBeatClientHandler()); }}); System.out.println(Netty client start.);
            Channel channel = bootstrap.connect("127.0.0.1".9000).sync().channel();
            String text = "Heartbeat Packet";
            Random random = new Random();
            while (channel.isActive()) {
                int num = random.nextInt(10);
                Thread.sleep(2 * 1000); channel.writeAndFlush(text); }}catch (Exception e) {
            e.printStackTrace();
        } finally{ eventLoopGroup.shutdownGracefully(); }}static class HeartBeatClientHandler extends SimpleChannelInboundHandler<String> {

        @Override
        protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
            System.out.println(" client received :" + msg);
            if(msg ! =null && msg.equals("idle close")) {
                System.out.println("Server closes connection, client closes."); ctx.channel().closeFuture(); }}}}Copy the code

PS: The above code is submitted to Github: github.com/Niuh-Study/…

GitHub Org_Hejianhui /JavaStudy GitHub Hejianhui /JavaStudy