Preface

In RPC, communication over TCP is unavoidable, and mainstream RPC frameworks build on communication frameworks such as Netty. This raises the question of whether to use long-lived or short-lived connections:

  • Short connection: the connection is closed after each exchange and re-created for the next one. The advantage is that there is no connection to manage and nothing to keep alive; the disadvantage is the cost of re-establishing the connection every time;
  • Long connection: the connection stays open after each exchange and is reused, which is good for performance. The disadvantage is that connections must be managed centrally and kept alive;

Mainstream RPC frameworks favor performance and therefore choose long connections, which makes keeping those connections alive an important topic, and the topic of this article. Several keepalive strategies are introduced below.

Why is it necessary to keep alive

The long and short connection behavior described above is not something TCP provides on its own; the application has to implement long-connection support itself, including unified connection management and keepalive. Before discussing how to keep a connection alive, consider why it is necessary at all. The main reason is that the network is not 100% reliable: a connection we created may become unusable for network reasons. If messages are flowing over the connection, the system notices a disconnect immediately; but if the system goes quiet for a long stretch, it may fail to notice that the connection has become unusable, and therefore fail to reconnect or release it in time. The common keepalive strategies are an application-layer heartbeat mechanism and the TCP Keepalive probing built into the operating system's TCP stack.

TCP Keepalive mechanism

The TCP Keepalive function is implemented by the operating system rather than being a mandatory part of the TCP protocol, and it is configured at the OS level. If no data is transmitted for a period of time, TCP sends Keepalive probes to confirm that the connection is still usable. The relevant kernel parameters are:

  • tcp_keepalive_time: how long a connection may sit idle before probing begins. Default 7200s (2h).
  • tcp_keepalive_probes: how many unanswered probes are sent before the connection is declared dead. Default 9.
  • tcp_keepalive_intvl: the interval between successive probes. Default 75s.

These parameters can be modified in /etc/sysctl.conf. Is enabling Keepalive enough? In practice it is not: TCP Keepalive only confirms that the connection itself is alive at the transport level; it cannot tell whether the application on the other end can still process requests. So it usually has to be combined with an application-layer heartbeat.
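Note also that the kernel parameters only define the probe schedule; each socket must still opt in to Keepalive explicitly. A minimal Java sketch of the opt-in (the endpoint is a placeholder; with Netty the equivalent is the ChannelOption.SO_KEEPALIVE option):

import java.net.Socket;

public class TcpKeepaliveDemo {
    public static void main(String[] args) throws Exception {
        Socket socket = new Socket("example.com", 80); // placeholder endpoint
        socket.setKeepAlive(true); // opt this connection in to kernel Keepalive probing
        System.out.println("keepalive enabled: " + socket.getKeepAlive());
        socket.close();
    }
}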

Heartbeat

What is a heartbeat mechanism? Simply put, the client starts a timer that sends a request periodically, and the server responds to each request; as long as the exchanges keep succeeding, both sides know the connection is healthy.
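Here is a deliberately simplified sketch of the idea, not taken from any framework: a timer pings over the connection at a fixed interval, and the peer is expected to answer each ping.

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class HeartbeatSketch {
    // Minimal connection abstraction, just for this sketch.
    interface Connection { void send(String message); }

    // Client side: ping on a fixed schedule. The server replies to every
    // ping, so prolonged silence is the signal that the link is dead.
    public static ScheduledExecutorService startHeartbeat(Connection conn, long intervalSeconds) {
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        timer.scheduleWithFixedDelay(() -> conn.send("PING"),
                intervalSeconds, intervalSeconds, TimeUnit.SECONDS);
        return timer;
    }
}

Let's take Dubbo as an example and see how the real thing is implemented across versions.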

Dubbo 2.6.x

In Dubbo 2.6.x, HeaderExchangeClient starts a ScheduledThreadPoolExecutor to send heartbeat requests on a schedule:

ScheduledThreadPoolExecutor scheduled = new ScheduledThreadPoolExecutor(2,
        new NamedThreadFactory("dubbo-remoting-client-heartbeat", true));

The heartbeat timer is started when HeaderExchangeClient is instantiated:

private void startHeartbeatTimer() {
    stopHeartbeatTimer();
    if (heartbeat > 0) {
        heartbeatTimer = scheduled.scheduleWithFixedDelay(
                new HeartBeatTask(new HeartBeatTask.ChannelProvider() {
                    @Override
                    public Collection<Channel> getChannels() {
                        return Collections.<Channel>singletonList(HeaderExchangeClient.this);
                    }
                }, heartbeat, heartbeatTimeout),
                heartbeat, heartbeat, TimeUnit.MILLISECONDS);
    }
}
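For reference, both values are read off the URL in the HeaderExchangeClient constructor. The following is paraphrased from the 2.6.x source rather than quoted verbatim:

// Paraphrased from HeaderExchangeClient (2.6.x), not verbatim:
int heartbeat = url.getParameter(Constants.HEARTBEAT_KEY, Constants.DEFAULT_HEARTBEAT); // 60s default
int heartbeatTimeout = url.getParameter(Constants.HEARTBEAT_TIMEOUT_KEY, heartbeat * 3);
if (heartbeatTimeout < heartbeat * 2) {
    // the constructor rejects a timeout shorter than two heartbeat intervals
    throw new IllegalStateException("heartbeatTimeout < heartbeatInterval * 2");
}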

The default heartbeat is 60 seconds and the default heartbeatTimeout is heartbeat * 3. HeartBeatTask is the task that performs the actual check:

public void run() {
    long now = System.currentTimeMillis();
    for (Channel channel : channelProvider.getChannels()) {
        if (channel.isClosed()) {
            continue;
        }
        Long lastRead = (Long) channel.getAttribute(HeaderExchangeHandler.KEY_READ_TIMESTAMP);
        Long lastWrite = (Long) channel.getAttribute(HeaderExchangeHandler.KEY_WRITE_TIMESTAMP);
        if ((lastRead != null && now - lastRead > heartbeat)
                || (lastWrite != null && now - lastWrite > heartbeat)) {
            // send a heartbeat request
        }
        if (lastRead != null && now - lastRead > heartbeatTimeout) {
            if (channel instanceof Client) {
                ((Client) channel).reconnect();
            } else {
                channel.close();
            }
        }
    }
}

Because both Dubbo ends send heartbeat requests, there are two timestamps to track: lastRead and lastWrite. If the time since the last read or the last write exceeds heartbeat, a heartbeat request is sent. If no message has been read for longer than heartbeatTimeout even after repeated heartbeats, the behavior depends on the role: a Client initiates a reconnect, while a Server closes the connection. The asymmetry makes sense because client calls depend directly on a usable connection, whereas the server can simply wait for the client to re-establish one. The above covers only the Client side; the Server has the same heartbeat handling, which you can see in HeaderExchangeServer.

Dubbo 2.7.0

Dubbo 2.7.0 strengthens the heartbeat mechanism of 2.6.x: HeaderExchangeClient now drives heartbeat detection with HashedWheelTimer, the time-wheel timer provided by Netty. HashedWheelTimer outperforms ScheduledThreadPoolExecutor when individual tasks are very short, which makes it a particularly good fit for heartbeat detection.

HashedWheelTimer heartbeatTimer = new HashedWheelTimer(
        new NamedThreadFactory("dubbo-client-heartbeat", true), tickDuration,
        TimeUnit.MILLISECONDS, Constants.TICKS_PER_WHEEL);
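If the wheel-timer API is new to you, it is small: a timeout fires exactly once, and periodic work re-arms itself, which is how Dubbo's timer tasks behave too. A self-contained sketch with illustrative values:

import java.util.concurrent.TimeUnit;
import io.netty.util.HashedWheelTimer;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;

public class WheelTimerSketch {
    public static void main(String[] args) {
        // 128 buckets, each tick 100ms wide.
        HashedWheelTimer timer = new HashedWheelTimer(100, TimeUnit.MILLISECONDS, 128);
        timer.newTimeout(new TimerTask() {
            @Override
            public void run(Timeout timeout) {
                System.out.println("tick: this is where channels would be checked");
                // A wheel timeout is one-shot; re-arm it for periodic behavior.
                timeout.timer().newTimeout(this, 1, TimeUnit.SECONDS);
            }
        }, 1, TimeUnit.SECONDS);
    }
}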

startHeartbeatTimer registers two scheduled tasks, a heartbeat task and a reconnect task:

private void startHeartbeatTimer() {
        AbstractTimerTask.ChannelProvider cp = () -> Collections.singletonList(HeaderExchangeClient.this);

        long heartbeatTick = calculateLeastDuration(heartbeat);
        long heartbeatTimeoutTick = calculateLeastDuration(heartbeatTimeout);
        HeartbeatTimerTask heartBeatTimerTask = new HeartbeatTimerTask(cp, heartbeatTick, heartbeat);
        ReconnectTimerTask reconnectTimerTask = new ReconnectTimerTask(cp, heartbeatTimeoutTick, heartbeatTimeout);

        // init task and start timer.
        heartbeatTimer.newTimeout(heartBeatTimerTask, heartbeatTick, TimeUnit.MILLISECONDS);
        heartbeatTimer.newTimeout(reconnectTimerTask, heartbeatTimeoutTick, TimeUnit.MILLISECONDS);
    }

HeartbeatTimerTask sends heartbeat requests periodically; the default heartbeat is still 60 seconds. Note that the tick durations are recalculated here: the configured time is divided by 3, shortening the detection interval and improving the odds of spotting a dead link promptly. calculateLeastDuration is sketched below, followed by the two tasks.
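The division by 3 lives in calculateLeastDuration; paraphrased from the 2.7.0 source (HEARTBEAT_CHECK_TICK is 3, with a lower bound so tiny intervals stay sane):

// Paraphrased, not verbatim:
private long calculateLeastDuration(int time) {
    if (time / Constants.HEARTBEAT_CHECK_TICK <= 0) {
        return Constants.LEAST_HEARTBEAT_DURATION;
    }
    return time / Constants.HEARTBEAT_CHECK_TICK;
}

First, HeartbeatTimerTask's doTask: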

protected void doTask(Channel channel) {
    Long lastRead = lastRead(channel);
    Long lastWrite = lastWrite(channel);
    if ((lastRead != null && now() - lastRead > heartbeat)
            || (lastWrite != null && now() - lastWrite > heartbeat)) {
        Request req = new Request();
        req.setVersion(Version.getProtocolVersion());
        req.setTwoWay(true);
        req.setEvent(Request.HEARTBEAT_EVENT);
        channel.send(req);
    }
}

As above, the task compares the time since the last read/write against heartbeat. Note: both normal requests and heartbeat requests refresh the read/write timestamps.
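Those timestamps are maintained by HeaderExchangeHandler, which stamps the channel on every send and receive. Trimmed to the relevant lines, it looks roughly like this:

// Paraphrased from HeaderExchangeHandler: every I/O event, heartbeat or
// not, refreshes the channel's read/write timestamp attributes.
public void sent(Channel channel, Object message) throws RemotingException {
    channel.setAttribute(KEY_WRITE_TIMESTAMP, System.currentTimeMillis());
    // ... pass the message along
}

public void received(Channel channel, Object message) throws RemotingException {
    channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis());
    // ... dispatch the message
}

ReconnectTimerTask's doTask handles the timeout side: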

protected void doTask(Channel channel) {
    Long lastRead = lastRead(channel);
    Long now = now();
    if (lastRead != null && now - lastRead > heartbeatTimeout) {
        if (channel instanceof Client) {
            ((Client) channel).reconnect();
        } else {
            channel.close();
        }
    }
}

As in 2.6.x, on timeout the Client reconnects and the Server closes the connection. The Server side has the same heartbeat handling; see HeaderExchangeServer.

Dubbo 2.7.1 and later

From Dubbo 2.7.1 onward, the heartbeat service is implemented with the IdleStateHandler provided by Netty:

public IdleStateHandler(
            long readerIdleTime, long writerIdleTime, long allIdleTime,
            TimeUnit unit) {
        this(false, readerIdleTime, writerIdleTime, allIdleTime, unit);
    }
  • readerIdleTime: read timeout.
  • writerIdleTime: write timeout.
  • allIdleTime: timeout covering both reads and writes.

IdleStateHandler checks the channel against the configured timeouts and fires an IdleStateEvent when no read/write has occurred for too long. Once IdleStateHandler is in the pipeline, any handler in that pipeline can receive the IdleStateEvent in its userEventTriggered method. Let's look at the IdleStateHandler added on the Client and Server sides:

The Client side

	protected void initChannel(Channel ch) throws Exception {
		final NettyClientHandler nettyClientHandler = new NettyClientHandler(getUrl(), this);
		int heartbeatInterval = UrlUtils.getHeartbeat(getUrl());
		ch.pipeline().addLast("client-idle-handler", new IdleStateHandler(heartbeatInterval, 0, 0, MILLISECONDS))
				.addLast("handler", nettyClientHandler);
	}

The Client adds IdleStateHandler in NettyClient, with the read-idle timeout defaulting to 60 seconds. If nothing is read for 60 seconds, an IdleStateEvent fires, and NettyClientHandler handles it:

public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
    if (evt instanceof IdleStateEvent) {
        try {
            NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);
            Request req = new Request();
            req.setVersion(Version.getProtocolVersion());
            req.setTwoWay(true);
            req.setEvent(Request.HEARTBEAT_EVENT);
            channel.send(req);
        } finally {
            NettyChannel.removeChannelIfDisconnected(ctx.channel());
        }
    } else {
        super.userEventTriggered(ctx, evt);
    }
}

Receiving the IdleStateEvent prompts the Client to send a heartbeat request. As for reconnection, HeaderExchangeClient still uses the HashedWheelTimer to start the two tasks, the heartbeat task and the reconnect task, so a heartbeat can be triggered either by the timer or by userEventTriggered.

The Server side

protected void initChannel(NioSocketChannel ch) throws Exception {
		int idleTimeout = UrlUtils.getIdleTimeout(getUrl());
		final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this);
		ch.pipeline().addLast("server-idle-handler", new IdleStateHandler(0, 0, idleTimeout, MILLISECONDS))
				.addLast("handler", nettyServerHandler);
	}

The Server side uses a timeout that defaults to 60 * 3 seconds; it comes from UrlUtils.getIdleTimeout, sketched below.
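Paraphrased, getIdleTimeout derives the idle timeout from the heartbeat interval, so the server tolerates roughly three missed heartbeats before acting:

// Paraphrased from UrlUtils, not verbatim:
public static int getIdleTimeout(URL url) {
    int heartbeat = getHeartbeat(url);
    // server-side idle timeout defaults to three heartbeat intervals
    int idleTimeout = url.getParameter(Constants.HEARTBEAT_TIMEOUT_KEY, heartbeat * 3);
    if (idleTimeout < heartbeat * 2) {
        throw new IllegalStateException("idleTimeout < heartbeatInterval * 2");
    }
    return idleTimeout;
}

The resulting idle event is handled in NettyServerHandler's userEventTriggered: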

public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent) {
            NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);
            try {
                channel.close();
            } finally {
                NettyChannel.removeChannelIfDisconnected(ctx.channel());
            }
        }
        super.userEventTriggered(ctx, evt);
    }

If no read or write occurs on the Server within the timeout, the connection is closed outright. Compared with the earlier versions, the heartbeat is now one-way: only the Client sends it. Accordingly, HeaderExchangeServer no longer starts multiple tasks; it starts only a CloseTimerTask for detecting timeouts and closing connections, a job IdleStateHandler already performs.

To summarize: HeaderExchangeClient enables both the heartbeat task and the reconnect task alongside IdleStateHandler, and HeaderExchangeServer enables the close-connection task alongside it. The timer tasks are kept mainly because IdleStateHandler is specific to Netty, while Dubbo supports several underlying communication frameworks, including Mina and Grizzly.

Conclusion

This article first introduced the long-connection model used in RPC and why such connections need a keepalive mechanism, then covered the TCP Keepalive mechanism and the application-layer heartbeat mechanism, and finally used Dubbo as an example to trace how its heartbeat implementation evolved from version to version.