
Netty heartbeat

Heartbeat is a special packet sent periodically between a client and a server over a long-lived TCP connection to tell the other side that the sender is still online and to keep the TCP connection valid. In Netty, the key to implementing a heartbeat mechanism is the IdleStateHandler, whose constructor is:

public IdleStateHandler(int readerIdleTimeSeconds, int writerIdleTimeSeconds, int allIdleTimeSeconds) {
    this((long) readerIdleTimeSeconds, (long) writerIdleTimeSeconds, (long) allIdleTimeSeconds, TimeUnit.SECONDS);
}

The following three parameters are explained here:

readerIdleTimeSeconds: read timeout. An IdleStateEvent with state READER_IDLE is fired when no data has been read from the Channel within the specified time interval.

writerIdleTimeSeconds: write timeout. An IdleStateEvent with state WRITER_IDLE is fired when no data has been written to the Channel within the specified time interval.

allIdleTimeSeconds: read/write timeout. An IdleStateEvent with state ALL_IDLE is fired when neither a read nor a write has occurred within the specified time interval.

Note: The default time unit for these three parameters is seconds. If you need to specify other units of time, you can use another constructor:

IdleStateHandler(boolean observeOutput, long readerIdleTime, long writerIdleTime, long allIdleTime, TimeUnit unit)
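
For example (a minimal sketch, reusing the pipeline variable from the server setup shown later), a 500-millisecond read timeout could be configured like this:

pipeline.addLast(new IdleStateHandler(false, 500, 0, 0, TimeUnit.MILLISECONDS));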

Adding a heartbeat on the Netty server

To implement heartbeat detection on the Netty server, add the following code to the server's ChannelInitializer:

pipeline.addLast(new IdleStateHandler(3, 0, 0, TimeUnit.SECONDS));

IdleStateHandler source

Let's walk through the IdleStateHandler source code, starting with its channelRead() method.
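
Here is a paraphrased sketch of that method, based on the Netty 4.1.x sources (field names may differ slightly between versions):

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    if (readerIdleTimeNanos > 0 || allIdleTimeNanos > 0) {
        // Mark that a read is in progress and reset the "first event" flags
        reading = true;
        firstReaderIdleEvent = firstAllIdleEvent = true;
    }
    // No business logic here: just pass the message on to the next handler
    ctx.fireChannelRead(msg);
}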

As the snippet shows, channelRead() performs no business logic of its own; it simply records that a read is happening and passes the message on to the next handler in the ChannelPipeline.

IdleStateHandler also has an initialize() method, which is the essence of the handler, so let's explore it next.
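
Again paraphrased from the Netty 4.1.x sources (a sketch, not the literal code), initialize() records the current time and schedules the idle-timeout tasks:

private void initialize(ChannelHandlerContext ctx) {
    // Avoid scheduling twice or scheduling after destroy()
    switch (state) {
    case 1:
    case 2:
        return;
    }

    state = 1;
    initOutputChanged(ctx);

    lastReadTime = lastWriteTime = ticksInNanos();
    if (readerIdleTimeNanos > 0) {
        readerIdleTimeout = schedule(ctx, new ReaderIdleTimeoutTask(ctx),
                readerIdleTimeNanos, TimeUnit.NANOSECONDS);
    }
    if (writerIdleTimeNanos > 0) {
        writerIdleTimeout = schedule(ctx, new WriterIdleTimeoutTask(ctx),
                writerIdleTimeNanos, TimeUnit.NANOSECONDS);
    }
    if (allIdleTimeNanos > 0) {
        allIdleTimeout = schedule(ctx, new AllIdleTimeoutTask(ctx),
                allIdleTimeNanos, TimeUnit.NANOSECONDS);
    }
}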

For the read timeout, the scheduled task is ReaderIdleTimeoutTask. Let's look at this task's run() method.
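
A paraphrased sketch of run(), again based on Netty 4.1.x (ticksInNanos() is essentially System.nanoTime()):

@Override
protected void run(ChannelHandlerContext ctx) {
    long nextDelay = readerIdleTimeNanos;
    if (!reading) {
        // Remaining idle window: readerIdleTime - (now - lastReadTime)
        nextDelay -= ticksInNanos() - lastReadTime;
    }

    if (nextDelay <= 0) {
        // The reader has been idle: schedule the next check and fire the event
        readerIdleTimeout = schedule(ctx, this, readerIdleTimeNanos, TimeUnit.NANOSECONDS);

        boolean first = firstReaderIdleEvent;
        firstReaderIdleEvent = false;

        try {
            IdleStateEvent event = newIdleStateEvent(IdleState.READER_IDLE, first);
            // channelIdle() calls ctx.fireUserEventTriggered(event), which reaches the
            // next handler's userEventTriggered() method
            channelIdle(ctx, event);
        } catch (Throwable t) {
            ctx.fireExceptionCaught(t);
        }
    } else {
        // A read happened in the meantime: reschedule with the remaining delay
        readerIdleTimeout = schedule(ctx, this, nextDelay, TimeUnit.NANOSECONDS);
    }
}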

First, the task subtracts the time of the last read from the current time. If, for example, the last channelRead() happened 6 seconds ago and readerIdleTime is set to 5 seconds, nextDelay is -1 second, which means the read has timed out. In that case an IdleStateEvent is fired and delivered to the next handler's userEventTriggered() method.

If there is no timeout, userEventTriggered() is not fired; the task is simply rescheduled with the remaining delay.

The implementation code

Server code

package com.jony.netty.heartbeat;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.timeout.IdleStateHandler;

import java.util.concurrent.TimeUnit;

public class HeartBeatServer {

    public static void main(String[] args) throws Exception {
        EventLoopGroup boss = new NioEventLoopGroup();
        EventLoopGroup worker = new NioEventLoopGroup();
        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(boss, worker)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();
                            pipeline.addLast("decoder", new StringDecoder());
                            pipeline.addLast("encoder", new StringEncoder());
                            // The readerIdleTime parameter of IdleStateHandler means: if nothing is read from
                            // the client for more than 3 seconds, an IdleStateEvent is fired and passed to the
                            // next handler, which must implement userEventTriggered to handle it
                            pipeline.addLast(new IdleStateHandler(3, 0, 0, TimeUnit.SECONDS));
                            pipeline.addLast(new HeartBeatServerHandler());
                        }
                    });
            System.out.println("netty server start.");
            ChannelFuture future = bootstrap.bind(9000).sync();
            future.channel().closeFuture().sync();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            worker.shutdownGracefully();
            boss.shutdownGracefully();
        }
    }
}

Client code

package com.jony.netty.heartbeat;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

import java.util.Random;

public class HeartBeatClient {

    public static void main(String[] args) throws Exception {
        EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();
                            pipeline.addLast("decoder", new StringDecoder());
                            pipeline.addLast("encoder", new StringEncoder());
                            pipeline.addLast(new HeartBeatClientHandler());
                        }
                    });
            System.out.println("netty client start.");
            Channel channel = bootstrap.connect("127.0.0.1", 9000).sync().channel();
            String text = "Heartbeat Packet";
            Random random = new Random();
            while (channel.isActive()) {
                // Sleep 0~9 seconds at random so that some intervals exceed the server's 3s read-idle timeout
                int num = random.nextInt(10);
                Thread.sleep(num * 1000);
                channel.writeAndFlush(text);
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            eventLoopGroup.shutdownGracefully();
        }
    }

    static class HeartBeatClientHandler extends SimpleChannelInboundHandler<String> {

        @Override
        protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
            System.out.println("client received: " + msg);
            if (msg != null && msg.equals("idle close")) {
                System.out.println("The server closed the connection, so the client closes too");
                // Close the client channel
                ctx.channel().close();
            }
        }
    }
}

To make the server's heartbeat detection fail from time to time, the server is configured with a 3-second read-idle timeout, while the client sleeps for a random number of seconds (0 to 9) between heartbeats, so some intervals will exceed the timeout:

Random random = new Random();
while (channel.isActive()) {
    int num = random.nextInt(10);
    Thread.sleep(num * 1000);
    channel.writeAndFlush(text);
}

Server message handler

package com.jony.netty.heartbeat;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.timeout.IdleStateEvent;

public class HeartBeatServerHandler extends SimpleChannelInboundHandler<String> {

    int readIdleTimes = 0;

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String s) throws Exception {
        System.out.println(" ====== > [server] message received : " + s);
        if ("Heartbeat Packet".equals(s)) {
            ctx.channel().writeAndFlush("ok");
        } else {
            System.out.println("Handle other messages...");
        }
    }

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        IdleStateEvent event = (IdleStateEvent) evt;

        String eventType = null;
        switch (event.state()) {
            case READER_IDLE:
                eventType = "read idle";
                readIdleTimes++; // read idle count + 1
                break;
            case WRITER_IDLE:
                eventType = "write idle";
                break; // not handled
            case ALL_IDLE:
                eventType = "read/write idle";
                break; // not handled
        }

        System.out.println(ctx.channel().remoteAddress() + " timeout event: " + eventType);
        if (readIdleTimes > 3) {
            System.out.println("[server] read idle more than 3 times, close the connection and release resources");
            ctx.channel().writeAndFlush("idle close");
            ctx.channel().close();
        }
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.err.println("=== " + ctx.channel().remoteAddress() + " is active ===");
    }
}

If the heartbeat check fails repeatedly, the server closes the channel to that client.

The output

The server

 ====== > [server] message received : Heartbeat Packet
/127.0.0.1:61306 timeout event: read idle
 ====== > [server] message received : Heartbeat Packet
 ====== > [server] message received : Heartbeat Packet
/127.0.0.1:61306 timeout event: read idle
 ====== > [server] message received : Heartbeat Packet
/127.0.0.1:61306 timeout event: read idle
/127.0.0.1:61306 timeout event: read idle
[server] read idle more than 3 times, close the connection and release resources

The client

client received: ok
client received: ok
client received: ok
client received: ok
client received: idle close
The server closed the connection, so the client closes too

From the output above you can see that once the read-idle event has fired more than three times, the server closes the connection automatically.

Netty zero copy

Netty receives and sends ByteBufs using direct buffers, that is, off-heap direct memory, for socket reads and writes, so no extra copy of the byte buffer is needed. If traditional JVM heap buffers were used for socket reads and writes, the JVM would first have to copy the heap buffer into direct memory before writing to the socket, because data in JVM heap memory cannot be written to a socket directly; compared with using off-heap direct memory, every message would cost one extra memory copy. The relevant read path in Netty is NioByteUnsafe.read().
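
As a quick check (a minimal sketch, assuming Netty 4.1 is on the classpath; the class name is just for illustration), you can verify that Netty's default allocator prefers direct buffers for I/O:

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;

public class DirectBufferCheck {
    public static void main(String[] args) {
        // ioBuffer() prefers a direct (off-heap) buffer, which is what Netty uses for socket I/O
        ByteBuf buf = ByteBufAllocator.DEFAULT.ioBuffer(256);
        System.out.println("isDirect = " + buf.isDirect()); // true on most platforms
        buf.release(); // direct memory is not reclaimed by GC alone, so release it explicitly
    }
}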

Direct memory

Direct memory is not part of the JVM's runtime data areas, nor is it defined in the Java Virtual Machine Specification, but it is used frequently in some scenarios and can also cause OutOfMemoryError. In Java, a DirectByteBuffer can allocate a block of direct (off-heap) memory. The memory backing the metaspace is also direct memory in this sense; both ultimately correspond to the machine's physical memory.

Note: At this point only the DirectByteBuffer object is in JVM memory, and its data is in out-of-heap memory, i.e. direct memory.
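
For instance (a minimal sketch): the buffer reference below is an ordinary heap object, while the 1 KB of data it manages lives in direct memory.

ByteBuffer buffer = ByteBuffer.allocateDirect(1024); // the object is on the heap, its data is off-heap
System.out.println(buffer.isDirect()); // true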

The difference between direct and heap memory

package com.jony.netty.directbuffer;

import java.nio.ByteBuffer;

public class DirectMemoryTest {

    public static void heapAccess() {
        long startTime = System.currentTimeMillis();
        // Allocate heap memory
        ByteBuffer buffer = ByteBuffer.allocate(1000);
        for (int i = 0; i < 100000; i++) {
            for (int j = 0; j < 200; j++) {
                buffer.putInt(j);
            }
            buffer.flip();
            for (int j = 0; j < 200; j++) {
                buffer.getInt();
            }
            buffer.clear();
        }
        long endTime = System.currentTimeMillis();
        System.out.println("heap memory access: " + (endTime - startTime));
    }

    public static void directAccess() {
        long startTime = System.currentTimeMillis();
        // Allocate direct memory
        ByteBuffer buffer = ByteBuffer.allocateDirect(1000);
        for (int i = 0; i < 100000; i++) {
            for (int j = 0; j < 200; j++) {
                buffer.putInt(j);
            }
            buffer.flip();
            for (int j = 0; j < 200; j++) {
                buffer.getInt();
            }
            buffer.clear();
        }
        long endTime = System.currentTimeMillis();
        System.out.println("direct memory access: " + (endTime - startTime));
    }

    public static void heapAllocate() {
        long startTime = System.currentTimeMillis();
        for (int i = 0; i < 100000; i++) {
            ByteBuffer.allocate(100);
        }
        long endTime = System.currentTimeMillis();
        System.out.println("heap memory allocate: " + (endTime - startTime));
    }

    public static void directAllocate() {
        long startTime = System.currentTimeMillis();
        for (int i = 0; i < 100000; i++) {
            ByteBuffer.allocateDirect(100);
        }
        long endTime = System.currentTimeMillis();
        System.out.println("direct memory allocate: " + (endTime - startTime));
    }

    public static void main(String[] args) {
        for (int i = 0; i < 10; i++) {
            heapAccess();
            directAccess();
        }
        System.out.println();
        for (int i = 0; i < 10; i++) {
            heapAllocate();
            directAllocate();
        }
    }
}

The output

heap memory access: 101
direct memory access: 63
heap memory access: 75
direct memory access: 46
heap memory access: 47
direct memory access: 67
heap memory access: 135
direct memory access: 43
heap memory access: 117
direct memory access: 54
heap memory access: 70
direct memory access: 52
heap memory access: 74
direct memory access: 39
heap memory access: 56
direct memory access: 40
heap memory access: 56
direct memory access: 36
heap memory access: 58
direct memory access: 39

heap memory allocate: 15
direct memory allocate: 50
heap memory allocate: 11
direct memory allocate: 41
heap memory allocate: 106
direct memory allocate: 57
heap memory allocate: 2
direct memory allocate: 30
heap memory allocate: 2
direct memory allocate: 112
heap memory allocate: 2
direct memory allocate: 31
heap memory allocate: 2
direct memory allocate: 25
heap memory allocate: 3
direct memory allocate: 27
heap memory allocate: 7
direct memory allocate: 30
heap memory allocate: 6
direct memory allocate: 185

The results show that allocating direct memory is slower, but accessing it is faster. On Java virtual machine implementations, native I/O operates directly on direct memory (direct memory => system call => disk/NIC), while non-direct memory requires an extra copy (heap memory => direct memory => system call => disk/NIC).

Direct memory allocation source code analysis:

public static ByteBuffer allocateDirect(int capacity) {
    return new DirectByteBuffer(capacity);
}

DirectByteBuffer(int cap) { // package-private
    super(-1, 0, cap, cap);
    boolean pa = VM.isDirectMemoryPageAligned();
    int ps = Bits.pageSize();
    long size = Math.max(1L, (long) cap + (pa ? ps : 0));
    // Check against -XX:MaxDirectMemorySize=<size> whether there is enough direct memory to allocate.
    // If not, System.gc() is called to trigger a full GC, which reclaims unreachable direct-memory
    // reference objects and frees the direct memory they hold. If there is still not enough space
    // after that, java.lang.OutOfMemoryError is thrown.
    Bits.reserveMemory(size, cap);

    long base = 0;
    try {
        // Call the Unsafe native method to allocate direct memory
        base = unsafe.allocateMemory(size);
    } catch (OutOfMemoryError x) {
        // Allocation failed; give back the reserved quota
        Bits.unreserveMemory(size, cap);
        throw x;
    }
    unsafe.setMemory(base, size, (byte) 0);
    if (pa && (base % ps != 0)) {
        // Round up to the page boundary
        address = base + ps - (base & (ps - 1));
    } else {
        address = base;
    }
    // Use the Cleaner mechanism to register a memory-release callback: when the DirectByteBuffer
    // object is reclaimed by GC, the Deallocator frees the underlying direct memory
    cleaner = Cleaner.create(this, new Deallocator(base, size, cap));
}

// Requests a block of native memory. The memory is uninitialized; its contents are unpredictable.
public native long allocateMemory(long bytes);

// openjdk8/hotspot/src/share/vm/prims/unsafe.cpp
UNSAFE_ENTRY(jlong, Unsafe_AllocateMemory(JNIEnv *env, jobject unsafe, jlong size))
  UnsafeWrapper("Unsafe_AllocateMemory");
  size_t sz = (size_t)size;
  if (sz != (julong)size || size < 0) {
    THROW_0(vmSymbols::java_lang_IllegalArgumentException());
  }
  if (sz == 0) {
    return 0;
  }
  sz = round_to(sz, HeapWordSize);
  // Allocate memory with the operating system's malloc
  void* x = os::malloc(sz, mtInternal);
  if (x == NULL) {
    THROW_0(vmSymbols::java_lang_OutOfMemoryError());
  }
  //Copy::fill_to_words((HeapWord*)x, sz / HeapWordSize);
  return addr_to_java(x);
UNSAFE_END

Advantages and disadvantages of using direct memory:

Advantages:

Does not occupy heap memory space, reducing the likelihood of GC occurring

On a Java VM, native I/O operates directly on direct memory (direct memory => system call => disk/NIC), whereas non-direct memory requires an extra copy (heap memory => direct memory => system call => disk/NIC).

Disadvantages:

Initial allocation is slow

Because direct memory is not managed by the JVM garbage collector, memory leaks can occur: if a full GC never happens, unreachable DirectByteBuffer objects are never cleaned up, and the direct memory they hold can eventually exhaust physical memory. We can cap direct memory with -XX:MaxDirectMemorySize. When that limit is reached, System.gc() is called to trigger a full GC and indirectly reclaim the direct memory held by unreachable buffers.
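
To see this in practice, here is a minimal sketch (the class name is just for illustration). Run it with a small limit, e.g. -XX:MaxDirectMemorySize=10m, and it will eventually fail with java.lang.OutOfMemoryError: Direct buffer memory, because the strong references keep the buffers alive and the triggered full GC cannot free them:

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

public class DirectOomDemo {
    public static void main(String[] args) {
        List<ByteBuffer> buffers = new ArrayList<>();
        while (true) {
            // Keep strong references so the Cleaner can never free the off-heap memory
            buffers.add(ByteBuffer.allocateDirect(1024 * 1024)); // 1 MB each
        }
    }
}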