In network communication, links are unstable and exceptions are common, typically in the form of request timeouts or response timeouts. Such exceptions reduce system reliability. So how do you detect communication anomalies, and what should happen when one is detected? This article looks at how Netty handles timeouts.

Timeout monitoring

Netty's IdleState enum defines three kinds of idle state:

  • ALL_IDLE: No data has been received or sent for a period of time.
  • READER_IDLE: No data has been received for a period of time.
  • WRITER_IDLE: No data has been sent for a period of time.
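
The three states above can be modeled in plain Java from two timestamps, the time of the last read and the time of the last write. The following is a minimal sketch for illustration only, not Netty's implementation; IdleKind, IdleClassifier, and classify are hypothetical names, and a threshold of 0 is assumed to disable a check.

```java
import java.util.EnumSet;
import java.util.Set;

// Hypothetical stand-in for Netty's IdleState; not the library type.
enum IdleKind { READER_IDLE, WRITER_IDLE, ALL_IDLE }

final class IdleClassifier {
    /**
     * Returns the idle kinds that apply at time nowMs, given the time of the
     * last read, the time of the last write, and one threshold per kind.
     * A threshold of 0 or less disables that kind.
     */
    static Set<IdleKind> classify(long nowMs, long lastReadMs, long lastWriteMs,
                                  long readerIdleMs, long writerIdleMs, long allIdleMs) {
        Set<IdleKind> kinds = EnumSet.noneOf(IdleKind.class);
        if (readerIdleMs > 0 && nowMs - lastReadMs >= readerIdleMs) {
            kinds.add(IdleKind.READER_IDLE);
        }
        if (writerIdleMs > 0 && nowMs - lastWriteMs >= writerIdleMs) {
            kinds.add(IdleKind.WRITER_IDLE);
        }
        // ALL_IDLE is measured from the most recent I/O of either direction.
        long lastIoMs = Math.max(lastReadMs, lastWriteMs);
        if (allIdleMs > 0 && nowMs - lastIoMs >= allIdleMs) {
            kinds.add(IdleKind.ALL_IDLE);
        }
        return kinds;
    }
}
```

For example, a channel that wrote one second ago but last read eight seconds ago is reader-idle under a five-second threshold, but neither writer-idle nor all-idle.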

Netty provides three ChannelHandler classes for detecting these conditions:

  • IdleStateHandler: Fires an IdleStateEvent when a Channel has not performed a read, a write, or either for a period of time.
  • ReadTimeoutHandler: Raises a ReadTimeoutException when no data has been read for a period of time.
  • WriteTimeoutHandler: Raises a WriteTimeoutException when a write operation does not complete within a period of time.

IdleStateHandler class

IdleStateHandler detects read idle, write idle, and combined read/write idle states. Its constructor source code is as follows:

public IdleStateHandler(int readerIdleTimeSeconds, int writerIdleTimeSeconds, int allIdleTimeSeconds) {
    this((long)readerIdleTimeSeconds, (long)writerIdleTimeSeconds, (long)allIdleTimeSeconds, TimeUnit.SECONDS);
}

public IdleStateHandler(long readerIdleTime, long writerIdleTime, long allIdleTime, TimeUnit unit) {
    this(false, readerIdleTime, writerIdleTime, allIdleTime, unit);
}

public IdleStateHandler(boolean observeOutput, long readerIdleTime, long writerIdleTime, long allIdleTime, TimeUnit unit) {
    this.writeListener = new ChannelFutureListener() {
        public void operationComplete(ChannelFuture future) throws Exception {
            IdleStateHandler.this.lastWriteTime = IdleStateHandler.this.ticksInNanos();
            IdleStateHandler.this.firstWriterIdleEvent = IdleStateHandler.this.firstAllIdleEvent = true;
        }
    };
    this.firstReaderIdleEvent = true;
    this.firstWriterIdleEvent = true;
    this.firstAllIdleEvent = true;
    ObjectUtil.checkNotNull(unit, "unit");
    this.observeOutput = observeOutput;
    if (readerIdleTime <= 0L) {
        this.readerIdleTimeNanos = 0L;
    } else {
        this.readerIdleTimeNanos = Math.max(unit.toNanos(readerIdleTime), MIN_TIMEOUT_NANOS);
    }

    if (writerIdleTime <= 0L) {
        this.writerIdleTimeNanos = 0L;
    } else {
        this.writerIdleTimeNanos = Math.max(unit.toNanos(writerIdleTime), MIN_TIMEOUT_NANOS);
    }

    if (allIdleTime <= 0L) {
        this.allIdleTimeNanos = 0L;
    } else {
        this.allIdleTimeNanos = Math.max(unit.toNanos(allIdleTime), MIN_TIMEOUT_NANOS);
    }
}

The constructors in the source above accept the following parameters:

  • readerIdleTimeSeconds (readerIdleTime): The read idle timeout. A value of 0 or less disables it.

  • writerIdleTimeSeconds (writerIdleTime): The write idle timeout. A value of 0 or less disables it.

  • allIdleTimeSeconds (allIdleTime): The timeout for neither reading nor writing. A value of 0 or less disables it.
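
The normalization that the constructor applies to each of these values can be isolated into a small helper. The following is a minimal sketch, assuming a floor of one millisecond for MIN_TIMEOUT_NANOS (the constant's value is not shown in the excerpt above); TimeoutNormalizer and toTimeoutNanos are hypothetical names.

```java
import java.util.concurrent.TimeUnit;

final class TimeoutNormalizer {
    // Assumption: a one-millisecond floor, standing in for MIN_TIMEOUT_NANOS.
    static final long MIN_TIMEOUT_NANOS = TimeUnit.MILLISECONDS.toNanos(1);

    /**
     * Converts a timeout to nanoseconds. Values of 0 or less mean "disabled"
     * and map to 0; positive values are raised to at least the floor.
     */
    static long toTimeoutNanos(long time, TimeUnit unit) {
        if (time <= 0L) {
            return 0L;
        }
        return Math.max(unit.toNanos(time), MIN_TIMEOUT_NANOS);
    }
}
```

Under this sketch, a timeout of 60 seconds becomes 60,000,000,000 ns, a timeout of 0 stays 0 (disabled), and a timeout of 10 microseconds is raised to the one-millisecond floor.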

The following example shows how to use IdleStateHandler:

public class MyChannelInitializer extends ChannelInitializer<Channel> {
    @Override
    protected void initChannel(Channel channel) throws Exception {
        // Reader idle after 60 s, writer idle after 30 s, all-idle disabled
        channel.pipeline().addLast("idleStateHandler", new IdleStateHandler(60, 30, 0));
        channel.pipeline().addLast("myHandler", new MyHandler());
    }
}

public class MyHandler extends ChannelDuplexHandler {

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent) {
            IdleStateEvent e = (IdleStateEvent) evt;
            if (e.state() == IdleState.READER_IDLE) {
                // No inbound traffic: close the connection
                ctx.close();
            } else if (e.state() == IdleState.WRITER_IDLE) {
                // No outbound traffic: send a ping (PingMessage is an application-defined type)
                ctx.writeAndFlush(new PingMessage());
            }
        }
    }
}

In the example above, IdleStateHandler sets the read idle timeout to 60 seconds and the write idle timeout to 30 seconds. MyHandler handles the resulting IdleStateEvent:

  • If there is no outbound traffic for 30 seconds (write idle), a ping message is sent.
  • If there is no inbound traffic for 60 seconds (read idle), the connection is closed.

ReadTimeoutHandler class

The ReadTimeoutHandler class handles read timeouts. Its source code is as follows:

public class ReadTimeoutHandler extends IdleStateHandler {
    private boolean closed;

    public ReadTimeoutHandler(int timeoutSeconds) {
        this((long)timeoutSeconds, TimeUnit.SECONDS);
    }

    public ReadTimeoutHandler(long timeout, TimeUnit unit) {
        super(timeout, 0L, 0L, unit); // Write idle and all-idle timeouts are disabled
    }

    protected final void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt) throws Exception {
        assert evt.state() == IdleState.READER_IDLE;// Only read timeout is handled

        this.readTimedOut(ctx);
    }

    protected void readTimedOut(ChannelHandlerContext ctx) throws Exception {
        if (!this.closed) {
            ctx.fireExceptionCaught(ReadTimeoutException.INSTANCE);// Throw an exception
            ctx.close();
            this.closed = true;
        }
    }
}

ReadTimeoutHandler extends IdleStateHandler and handles only the READER_IDLE state. When a read timeout occurs, it fires a ReadTimeoutException through the pipeline and closes the channel.

The following is an example of the ReadTimeoutHandler:

public class MyChannelInitializer extends ChannelInitializer<Channel> {
    @Override
    protected void initChannel(Channel channel) throws Exception {
        channel.pipeline().addLast("readTimeoutHandler", new ReadTimeoutHandler(30));
        channel.pipeline().addLast("myHandler", new MyHandler());
    }
}

// The handler handles ReadTimeoutException
public class MyHandler extends ChannelDuplexHandler {
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        if (cause instanceof ReadTimeoutException) {
            // ...
        } else {
            super.exceptionCaught(ctx, cause);
        }
    }
}

In the example above, the ReadTimeoutHandler sets the read timeout to 30 seconds.

WriteTimeoutHandler class

The WriteTimeoutHandler class handles write timeouts. Its source code is as follows:

public class WriteTimeoutHandler extends ChannelOutboundHandlerAdapter {
    private static final long MIN_TIMEOUT_NANOS;
    private final long timeoutNanos;
    private WriteTimeoutHandler.WriteTimeoutTask lastTask;
    private boolean closed;

    public WriteTimeoutHandler(int timeoutSeconds) {
        this((long)timeoutSeconds, TimeUnit.SECONDS);
    }

    public WriteTimeoutHandler(long timeout, TimeUnit unit) {
        ObjectUtil.checkNotNull(unit, "unit");
        if (timeout <= 0L) {
            this.timeoutNanos = 0L;
        } else {
            this.timeoutNanos = Math.max(unit.toNanos(timeout), MIN_TIMEOUT_NANOS);
        }
    }

    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        if (this.timeoutNanos > 0L) {
            promise = promise.unvoid();
            this.scheduleTimeout(ctx, promise);
        }

        ctx.write(msg, promise);
    }

    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        WriteTimeoutHandler.WriteTimeoutTask task = this.lastTask;

        WriteTimeoutHandler.WriteTimeoutTask prev;
        for (this.lastTask = null; task != null; task = prev) {
            task.scheduledFuture.cancel(false);
            prev = task.prev;
            task.prev = null;
            task.next = null;
        }
    }

    private void scheduleTimeout(ChannelHandlerContext ctx, ChannelPromise promise) {
        WriteTimeoutHandler.WriteTimeoutTask task = new WriteTimeoutHandler.WriteTimeoutTask(ctx, promise);
        task.scheduledFuture = ctx.executor().schedule(task, this.timeoutNanos, TimeUnit.NANOSECONDS);
        if (!task.scheduledFuture.isDone()) {
            this.addWriteTimeoutTask(task);
            promise.addListener(task);
        }
    }

    private void addWriteTimeoutTask(WriteTimeoutHandler.WriteTimeoutTask task) {
        if (this.lastTask != null) {
            this.lastTask.next = task;
            task.prev = this.lastTask;
        }

        this.lastTask = task;
    }

    private void removeWriteTimeoutTask(WriteTimeoutHandler.WriteTimeoutTask task) {
        if (task == this.lastTask) {
            assert task.next == null;

            this.lastTask = this.lastTask.prev;
            if (this.lastTask != null) {
                this.lastTask.next = null;
            }
        } else {
            if (task.prev == null && task.next == null) {
                return;
            }

            if (task.prev == null) {
                task.next.prev = null;
            } else {
                task.prev.next = task.next;
                task.next.prev = task.prev;
            }
        }

        task.prev = null;
        task.next = null;
    }

    protected void writeTimedOut(ChannelHandlerContext ctx) throws Exception {
        if (!this.closed) {
            ctx.fireExceptionCaught(WriteTimeoutException.INSTANCE);
            ctx.close();
            this.closed = true;
        }
    }

    // ...
}

WriteTimeoutHandler raises a WriteTimeoutException when handling a timeout.
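
The per-write timer set up in scheduleTimeout can be modeled without Netty. The sketch below is a simplified illustration built on java.util.concurrent, not Netty's implementation: WriteTimeoutSketch and withTimeout are hypothetical names, a CompletableFuture stands in for ChannelPromise, and the timer fails the future directly instead of firing an exception through a pipeline and closing a channel.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class WriteTimeoutSketch {
    /**
     * Arms a timer for one pending write. If the write future is still
     * incomplete when the timer fires, it is failed with a TimeoutException.
     * If the write completes first, the timer is cancelled.
     */
    static <T> CompletableFuture<T> withTimeout(CompletableFuture<T> write,
                                                ScheduledExecutorService timer,
                                                long timeout, TimeUnit unit) {
        ScheduledFuture<?> task = timer.schedule(() -> {
            // No effect if the write has already completed.
            write.completeExceptionally(new TimeoutException("write timed out"));
        }, timeout, unit);
        // Plays the role of promise.addListener(task): completion cancels the timer.
        write.whenComplete((result, error) -> task.cancel(false));
        return write;
    }
}
```

As in the Netty source, one timer is scheduled per write, and the timer is cleaned up when the write finishes, so a steady stream of fast writes does not leave stale tasks behind.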

The following is an example of WriteTimeoutHandler:

public class MyChannelInitializer extends ChannelInitializer<Channel> {
    @Override
    protected void initChannel(Channel channel) throws Exception {
        channel.pipeline().addLast("writeTimeoutHandler", new WriteTimeoutHandler(30));
        channel.pipeline().addLast("myHandler", new MyHandler());
    }
}

// The handler handles WriteTimeoutException
public class MyHandler extends ChannelDuplexHandler {
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        if (cause instanceof WriteTimeoutException) {
            // ...
        } else {
            super.exceptionCaught(ctx, cause);
        }
    }
}

In the example above, WriteTimeoutHandler sets the write timeout to 30 seconds.

Implement the heartbeat mechanism

Heartbeats are the usual remedy for idle connections that would otherwise time out.

The heartbeat mechanism is very common in network programming. The idea is to send a heartbeat when a connection is idle so that the connection is kept alive. A heartbeat is typically a very small message.

1. Define the heartbeat processor

public class HeartbeatServerHandler extends ChannelInboundHandlerAdapter {
	
	// (1) Heartbeat content
	private static final ByteBuf HEARTBEAT_SEQUENCE = Unpooled
			.unreleasableBuffer(Unpooled.copiedBuffer("Heartbeat",
					CharsetUtil.UTF_8));  

	@Override
	public void userEventTriggered(ChannelHandlerContext ctx, Object evt)
			throws Exception {

		// (2) Determine the timeout type
		if (evt instanceof IdleStateEvent) {
			IdleStateEvent event = (IdleStateEvent) evt;
			String type = "";
			if (event.state() == IdleState.READER_IDLE) {
				type = "read idle";
			} else if (event.state() == IdleState.WRITER_IDLE) {
				type = "write idle";
			} else if (event.state() == IdleState.ALL_IDLE) {
				type = "all idle";
			}

			// (3) Send heartbeat
			ctx.writeAndFlush(HEARTBEAT_SEQUENCE.duplicate()).addListener(
					ChannelFutureListener.CLOSE_ON_FAILURE);
 
			System.out.println( ctx.channel().remoteAddress()+"Timeout type:" + type);
		} else {
			super.userEventTriggered(ctx, evt);
		}
	}
}

Notes on the code above:

  1. Defines the content sent in each heartbeat.

  2. Checks whether the event is an IdleStateEvent. If so, the idle type is determined and the event is handled.

  3. Sends the heartbeat content to the client.

2. Define ChannelInitializer

HeartbeatHandlerInitializer assembles the ChannelHandlers used by the server. The code is as follows:

public class HeartbeatHandlerInitializer extends ChannelInitializer<Channel> {

	private static final int READ_IDLE_TIME_OUT = 4; // Read idle timeout
	private static final int WRITE_IDLE_TIME_OUT = 5; // Write idle timeout
	private static final int ALL_IDLE_TIME_OUT = 7; // All-idle timeout

	@Override
	protected void initChannel(Channel ch) throws Exception {
		ChannelPipeline pipeline = ch.pipeline();
		pipeline.addLast(new IdleStateHandler(READ_IDLE_TIME_OUT,
				WRITE_IDLE_TIME_OUT, ALL_IDLE_TIME_OUT, TimeUnit.SECONDS)); // (1)
		pipeline.addLast(new HeartbeatServerHandler()); // (2)
	}
}

Notes on the code above:

  1. Adds an IdleStateHandler to the ChannelPipeline and sets the read, write, and all-idle timeouts. The timeouts are kept short for demonstration purposes.
  2. Adds a HeartbeatServerHandler to handle the idle events and send heartbeats.

3. Write the server

The server code is relatively simple and listens on port 8083 after starting.

public final class HeartbeatServer {

    static final int PORT = 8083;

    public static void main(String[] args) throws Exception {

        // Configure the server
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
             .channel(NioServerSocketChannel.class)
             .option(ChannelOption.SO_BACKLOG, 100)
             .handler(new LoggingHandler(LogLevel.INFO))
             .childHandler(new HeartbeatHandlerInitializer());

            // Start the server
            ChannelFuture f = b.bind(PORT).sync();

            f.channel().closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

4. Test

Start HeartbeatServer, then connect to it with the operating system's Telnet client:

telnet 127.0.0.1 8083

Whenever the connection stays idle long enough to trigger an idle event, the server console prints the client's address and the timeout type, and the Telnet window receives the text Heartbeat.
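
If no Telnet client is available, a few lines of plain Java can serve the same purpose. The following is a minimal sketch using only java.net; HeartbeatProbe and readSome are hypothetical names, and the host, port, and byte count in the usage note assume the server defined above.

```java
import java.io.IOException;
import java.io.InputStream;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

final class HeartbeatProbe {
    /**
     * Connects to host:port and reads until maxBytes have arrived or the peer
     * closes the connection, then returns the bytes decoded as UTF-8.
     * A single read that blocks longer than soTimeoutMs throws
     * java.net.SocketTimeoutException (a subclass of IOException).
     */
    static String readSome(String host, int port, int maxBytes, int soTimeoutMs)
            throws IOException {
        try (Socket socket = new Socket(host, port)) {
            socket.setSoTimeout(soTimeoutMs);
            InputStream in = socket.getInputStream();
            byte[] buf = new byte[maxBytes];
            int total = 0;
            while (total < maxBytes) {
                int n = in.read(buf, total, maxBytes - total);
                if (n < 0) {
                    break; // peer closed the connection
                }
                total += n;
            }
            return new String(buf, 0, total, StandardCharsets.UTF_8);
        }
    }
}
```

With HeartbeatServer running, a call such as HeartbeatProbe.readSome("127.0.0.1", 8083, 18, 15000) should return once two 9-byte Heartbeat messages have arrived, because the probe never sends anything and the server's read idle timeout keeps firing.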
