1 Introduction

A few days ago, we again hit a timeout while acquiring a connection from the Netty connection pool, and it left the entire service unavailable. The log showed: Exception accurred when acquire channel from channel pool:TimeoutException. When I saw this message I had a feeling of deja vu. I had first run into it not long before, when a Netty timeout error also made the entire service unavailable and we finally had to restart the server to recover. So I went back to the earlier exception logs and found that the earlier error really was the same timeout while acquiring a connection from the connection pool! When Netty reported this error the first time, the network department happened to be adjusting the network, so we assumed the acquire had timed out for network reasons. Even so, we had no idea why the service stayed unavailable after the timeout. For the time being, this “ghost” bug remained a mystery to us.

2 The ghost bug reappears, giving us hope of solving it

Fortunately, this time a colleague was able to reproduce the bug. They told us that it recurs whenever concurrency is high and the back-end business logic takes a long time to process, and that it is accompanied by front-end threads timing out on their requests to the back end (a request timeout, not an acquire timeout). So I increased the concurrency and simulated a slow back-end service. Sure enough, the “ghost” bug reappeared, and afterwards the whole service became unavailable. The error is shown in the screenshot below:

[Screenshot: the error]

Being able to reproduce the ghost bug gives us hope of solving it. What causes acquiring a connection from the Netty pool to time out after front-end requests have timed out?

Note: there are two distinct timeout exceptions here. One is the timeout while acquiring a connection from the connection pool. The other is the request timeout: after a connection has been acquired successfully, the front end sends its request to the back end, and because the back-end business logic takes too long to execute, the request times out.

We had no leads, so all we could do was look at the code that throws the exception. Our Netty connection pool implementation looks something like this:

// CustomChannelPool.java

public class CustomChannelPool {

    private static ChannelHealthChecker healthCheck = ChannelHealthChecker.ACTIVE;
    private static FixedChannelPool.AcquireTimeoutAction acquireTimeoutAction = null;
    private static long acquireTimeoutMillis = -1;
    private static int maxConnect = 8;
    private static int maxPendingAcquires = Integer.MAX_VALUE;
    private static boolean releaseHealthCheck = true;
    // ... irrelevant attributes omitted

    static ChannelPool fixpool =
        new FixedChannelPool(b, handler, healthCheck, acquireTimeoutAction,
            acquireTimeoutMillis, maxConnect, maxPendingAcquires, releaseHealthCheck, lastRecentUsed); // [0]

    // Acquire a connection
    public Channel acquire(int timeoutMillis) throws Exception {
        try {
            Future<Channel> fch = fixpool.acquire(); // [1]
            Channel ch = fch.get(timeoutMillis, TimeUnit.MILLISECONDS); // [2]
            return ch;
        } catch (Exception e) {
            logger.error("Exception accurred when acquire channel from channel pool.", e); // [3]
            throw e; // [4]
        }
    }

    // Release the connection
    public void release(Channel channel) {
        try {
            if (channel != null) {
                fixpool.release(channel);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

Then the code for the business to get the connection looks something like this:

// BusineseService.java

public class BusineseService {
    public Response rpcCall() throws Exception {
        // Acquire a connection
        Channel channel = CustomChannelPool.fixpool.acquire(10000); // [5]
        try {
            // ... relevant business logic omitted
            // Finally make the underlying remote call
            channel.writeAndFlush(data);
            // ... relevant business logic omitted
        } finally {
            // Release the connection
            // The connection is released even if the front end's request to the back end times out
            CustomChannelPool.fixpool.release(channel); // [6]
        }
    }
}

According to the exception information, during the remote call the line Channel ch = fch.get(timeoutMillis, TimeUnit.MILLISECONDS); in CustomChannelPool.acquire times out (after 10 seconds) while acquiring a connection from the Netty connection pool and throws a TimeoutException. The catch block of CustomChannelPool.acquire then logs Exception accurred when acquire channel from channel pool:TimeoutException and rethrows the exception, so the line Channel channel = CustomChannelPool.fixpool.acquire(10000); at label [5] in BusineseService ends up throwing a TimeoutException. Because the code at label [5] is not inside the try block, the finally block at label [6] does not run its connection-release logic.

At this point we breathed a sigh of relief: so the “ghost” bug is caused by the acquire line Channel channel = CustomChannelPool.fixpool.acquire(10000); not being inside the try block, which means the release logic in the finally block never runs.

Not so fast!

Even if the acquire line Channel channel = CustomChannelPool.fixpool.acquire(10000); were inside the try block, what would await us when the finally block runs its release logic? Evidently, a null pointer exception! Why? Because when that line throws a TimeoutException, nothing is assigned, so channel is still null, and releasing a null channel naturally throws an NPE.
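To make this failure mode concrete, here is a minimal sketch in plain Java. NullReleaseDemo, FakeChannel, acquire and release are hypothetical stand-ins, not the CustomChannelPool or Netty classes; release is assumed to reject null, as a null check on the channel would. The sketch also shows a side effect worth knowing: an exception thrown from finally replaces the TimeoutException that was already in flight.

```java
import java.util.Objects;
import java.util.concurrent.TimeoutException;

// Hypothetical stand-ins for illustration only; these are not Netty classes.
class NullReleaseDemo {

    static final class FakeChannel { }

    // Assumption: an acquire that always times out.
    static FakeChannel acquire() throws TimeoutException {
        throw new TimeoutException("acquire timed out");
    }

    // Assumption: release rejects a null channel.
    static void release(FakeChannel channel) {
        Objects.requireNonNull(channel, "channel");
    }

    // Acquire moved inside try, release in finally without a null guard.
    // Returns the simple name of the exception that escapes.
    static String naiveFix() {
        FakeChannel channel = null;
        try {
            try {
                channel = acquire();
            } finally {
                release(channel); // channel is still null: the NPE masks the TimeoutException
            }
        } catch (Exception e) {
            return e.getClass().getSimpleName();
        }
        return "none";
    }

    // Same structure, but release only what was actually acquired.
    static String guardedFix() {
        FakeChannel channel = null;
        try {
            try {
                channel = acquire();
            } finally {
                if (channel != null) {
                    release(channel);
                }
            }
        } catch (Exception e) {
            return e.getClass().getSimpleName();
        }
        return "none";
    }

    public static void main(String[] args) {
        System.out.println(naiveFix());
        System.out.println(guardedFix());
    }
}
```

Under these assumptions, naiveFix() reports the NullPointerException while guardedFix() reports the original TimeoutException. In both cases no connection ever reached the caller, so this code path has nothing to release.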

The NPE extinguished that glimmer of hope. The cause of the ghost bug was still unknown, and we had lost our direction again!

Having chosen a distant goal, one can only press on through wind and rain. Hey, a bit of self-encouragement here: don't lose heart, effort always pays off, and it's only a small bug, isn't it?

So we calmly analyzed the two lines at the heart of the problem:

Future<Channel> fch = fixpool.acquire(); // [1]
Channel ch = fch.get(timeoutMillis, TimeUnit.MILLISECONDS); // [2]

At label [1], fixpool.acquire() is called to acquire a connection and immediately returns a Future object, fch. Note that Netty acquires the connection asynchronously.

At label [2], fch.get(timeoutMillis, TimeUnit.MILLISECONDS) waits for the pool to return an available connection. The calling thread blocks until the timeout elapses, at which point a TimeoutException is thrown.

Recall that the bug can only be reproduced under high concurrency, with a large number of front-end requests to the back end timing out. A thread whose request times out has already acquired its connection successfully, and its finally block does release that connection back to the pool.

So what could cause acquiring a connection from the pool to time out? We could only speculate:

Guess 1: High concurrency might exhaust the connection pool, causing many acquire timeouts. However, the service stays unavailable after the burst of concurrency is over (the application is not down, but it keeps reporting acquire timeouts). Once the burst is over, the connections should have been returned to the pool, so exhaustion under load alone cannot explain why the service stays unavailable. This guess can be ruled out; the only remaining explanation is that connections are not being returned to the pool properly!

As for why connections fail to return to the pool, we had the following guesses:

Guess 2: The channel of a request that timed out cannot be returned to the pool properly. If such connections are never returned, the available connections in the pool are eventually exhausted and other threads time out while acquiring one. But if so, why would the connection of a timed-out request fail to go back into the pool?

Guess 3: The channel of a timed-out request is returned to the pool normally. But because acquiring a channel from the pool is asynchronous, the question is whether the asynchronous thread still succeeds in acquiring a connection after the caller's acquire has timed out. There are two possibilities: 1) after the acquire times out, no connection is obtained, in which case it does not matter that our code releases nothing on an acquire timeout, because there is no connection to release; 2) after the acquire times out, a connection is still obtained successfully, but as the earlier code analysis showed, that connection is never released. In that case the pool's resources are eventually exhausted, leaving the service unavailable!
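The second possibility can be illustrated without Netty. The sketch below uses a JDK CompletableFuture as a stand-in for the pool's Future<Channel>; LateAcquireDemo and its method are hypothetical names, and whether FixedChannelPool really behaves this way is exactly what the source analysis has to establish. It relies on one JDK fact: a timed get() that throws TimeoutException does not cancel or complete the future, so a result can still arrive later.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical model: a String stands in for a channel.
class LateAcquireDemo {

    // Simulates one acquire whose caller gives up after 50 ms, after which the
    // "pool" still delivers a connection. Returns how many connections were
    // handed out but never released.
    static int unreleasedAfterTimeout(boolean releaseLateArrivals) {
        AtomicInteger handedOut = new AtomicInteger();
        AtomicInteger released = new AtomicInteger();
        CompletableFuture<String> pending = new CompletableFuture<>();

        try {
            pending.get(50, TimeUnit.MILLISECONDS); // caller blocks, then gives up
        } catch (TimeoutException e) {
            // The future is still pending here; the timeout did not cancel it.
            if (releaseLateArrivals) {
                // Hand back whatever arrives after we stopped waiting.
                pending.thenAccept(ch -> released.incrementAndGet());
            }
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        }

        // Later, the pool completes the acquire anyway.
        if (pending.complete("channel-1")) {
            handedOut.incrementAndGet();
        }
        return handedOut.get() - released.get();
    }

    public static void main(String[] args) {
        System.out.println(unreleasedAfterTimeout(false));
        System.out.println(unreleasedAfterTimeout(true));
    }
}
```

With releaseLateArrivals set to false the late connection is never returned, which is the leak that possibility 2 describes. Registering a callback on the pending future is one way to return a late arrival; it is a design option under these assumptions, not the fix the analysis has confirmed.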

Clearly we need to investigate guesses 2 and 3. But what actually prevents connections from returning to the pool? We were still scratching our heads! The Netty connection pool was a black box to us, so it was time to open the box and look inside.

3 Source code analysis: how Netty's FixedChannelPool acquires and releases connections

To find out why acquiring a connection fails with Exception accurred when acquire channel from channel pool:TimeoutException, we now analyze the source code with which the Netty channel pool acquires and releases connections.

3.1 Overall class structure of the connection pool

We use Netty's FixedChannelPool. FixedChannelPool extends SimpleChannelPool, and SimpleChannelPool implements the ChannelPool interface, as shown below:

[Figure: class hierarchy of FixedChannelPool, SimpleChannelPool and ChannelPool]

Let's first look at the source of the ChannelPool interface:

// ChannelPool.java

public interface ChannelPool extends Closeable {
    Future<Channel> acquire();
    Future<Channel> acquire(Promise<Channel> promise);
    Future<Void> release(Channel channel);
    Future<Void> release(Channel channel, Promise<Void> promise);
    void close();
}

The ChannelPool interface defines the basic operations of a Netty connection pool: acquiring and releasing a connection. Both operations return a Future, which tells us that acquiring and releasing connections in a Netty connection pool are asynchronous.

Leaving the source aside for a moment, let's look at the class structure of SimpleChannelPool:

[Figure: class structure of SimpleChannelPool]

First of all, SimpleChannelPool implements the basic functions of a Netty channel pool: acquiring connections, releasing connections, and health-checking channels. How does SimpleChannelPool store the channels? As item 4 in the figure above shows, it defines a double-ended queue, deque, to store them.

Now look at the class structure of FixedChannelPool:

[Figure: class structure of FixedChannelPool]

On top of SimpleChannelPool, FixedChannelPool adds control over the number of connections in the pool, handling of pending acquires that time out, the policy applied to them, and the logic that wakes up a pending acquire after a connection is released. In detail:

  1. The member variable maxConnections is the maximum number of connections in the pool, that is, the pool's capacity; acquiredChannelCount is the number of connections currently acquired (both those taken from the pool and those newly created). Together these two variables determine whether the pool still has a connection available;
  2. The inner class AcquireTask: when the pool's connections are exhausted, each pending acquire is wrapped in an AcquireTask;
  3. An ArrayDeque double-ended queue, pendingAcquireQueue: when the available channels in the pool are exhausted, each pending acquire is wrapped in an AcquireTask, and pendingAcquireQueue stores these AcquireTask objects;
  4. The member variable maxPendingAcquires is the capacity of pendingAcquireQueue, and pendingAcquireCount is the number of acquires currently waiting for a channel. Together these two variables determine whether pendingAcquireQueue is full;
  5. The member variable acquireTimeoutNanos is the timeout for acquiring a channel from the pool. The inner enum AcquireTimeoutAction encapsulates the policy applied when an acquire times out: NEW (create a new connection) or FAIL (fail the acquire);
  6. The inner abstract class TimeoutTask implements the Runnable interface. When a pending acquire times out, it handles the task according to the AcquireTimeoutAction policy.
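The bookkeeping described in this list can be sketched as a single-threaded toy model. ToyFixedPool and its members are hypothetical and deliberately ignore event loops, health checks and timeouts; the sketch only assumes a connection limit, a counter of acquired connections and a bounded queue of pending acquires, with a String standing in for a channel.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.CompletableFuture;

// Hypothetical, single-threaded toy model; not Netty code.
class ToyFixedPool {
    private final int maxConnections;      // pool capacity
    private final int maxPendingAcquires;  // capacity of the pending queue
    private int acquiredCount;             // connections currently handed out
    private int nextId;
    private final Deque<CompletableFuture<String>> pendingAcquireQueue = new ArrayDeque<>();

    ToyFixedPool(int maxConnections, int maxPendingAcquires) {
        this.maxConnections = maxConnections;
        this.maxPendingAcquires = maxPendingAcquires;
    }

    CompletableFuture<String> acquire() {
        CompletableFuture<String> promise = new CompletableFuture<>();
        if (acquiredCount < maxConnections) {
            // A connection is available: hand it out immediately.
            acquiredCount++;
            promise.complete("channel-" + (++nextId));
        } else if (pendingAcquireQueue.size() >= maxPendingAcquires) {
            // Pool exhausted and the pending queue is full: fail fast.
            promise.completeExceptionally(new IllegalStateException("pending queue is full"));
        } else {
            // Pool exhausted: park the acquire until someone releases.
            pendingAcquireQueue.offer(promise);
        }
        return promise;
    }

    void release(String channel) {
        acquiredCount--;
        // Wake up one pending acquire, if any, and give it the returned connection.
        CompletableFuture<String> next = pendingAcquireQueue.poll();
        if (next != null) {
            acquiredCount++;
            next.complete(channel);
        }
    }

    int acquiredCount() { return acquiredCount; }

    int pendingCount() { return pendingAcquireQueue.size(); }
}
```

In this model a parked acquire completes only when some holder calls release. If holders never release, every later acquire stays pending, which is the shape of the exhaustion the article is chasing.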

3.2 Source code analysis: acquiring a connection from the pool

Let's first analyze how the pool acquires a connection. Straight to the source:

// FixedChannelPool.java

@Override
public Future<Channel> acquire(final Promise<Channel> promise) {
    try {
        // If the current thread is the executor's thread, call acquire0 directly to get the connection.
        // Note: a caller that then invokes future.get() blocks until the acquire has completed.
        if (executor.inEventLoop()) {
            acquire0(promise);
        // If the current thread is not the executor's thread, hand acquire0 to the executor thread
        } else {
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    acquire0(promise);
                }
            });
        }
    } catch (Throwable cause) {
        // Fail the promise, which notifies its callbacks
        promise.setFailure(cause);
    }
    // Return the promise that will carry the Channel; Promise extends Future
    return promise;
}

Both branches end up in acquire0, so let's look at that method next:

// FixedChannelPool.java

private void acquire0(final Promise<Channel> promise) {
    assert executor.inEventLoop();
    // Check whether the FixedChannelPool has been closed
    if (closed) {
        promise.setFailure(new IllegalStateException("FixedChannelPool was closed"));
        return;
    }
    // [1] If the number of acquired connections (acquiredChannelCount) is smaller than
    // maxConnections, the pool still has connections available
    if (acquiredChannelCount.get() < maxConnections) {
        assert acquiredChannelCount.get() >= 0;

        // We need to create a new promise as we need to ensure the AcquireListener runs in the correct
        // EventLoop
        Promise<Channel> p = executor.newPromise();
        // Create an AcquireListener, an inner class of FixedChannelPool;
        // its operationComplete method is the callback that runs once the connection has been acquired
        AcquireListener l = new AcquireListener(promise);
        // Mark the listener as acquired and increment acquiredChannelCount by 1
        l.acquired();
        // Add the AcquireListener to the promise
        p.addListener(l);
        // Delegate to the parent class SimpleChannelPool to actually get the connection;
        // SimpleChannelPool implements creating new connections, health checks, etc.
        super.acquire(p);
    // [2] acquiredChannelCount is greater than or equal to maxConnections: no connection is available,
    // so the acquire is queued and, if it times out, handled according to AcquireTimeoutAction
    } else {
        // If the number of pending acquires has reached the queue capacity maxPendingAcquires, fail immediately
        if (pendingAcquireCount >= maxPendingAcquires) {
            tooManyOutstanding(promise);
        // Otherwise pendingAcquireQueue is not full yet
        } else {
            // Wrap the promise that is waiting for a connection in an AcquireTask
            AcquireTask task = new AcquireTask(promise);
            // Append the AcquireTask to pendingAcquireQueue, which is an ArrayDeque
            if (pendingAcquireQueue.offer(task)) {
                // pendingAcquireCount is incremented by 1
                ++pendingAcquireCount;
                // [Important] If timeoutTask is not null, a policy for handling acquire timeouts has been set.
                // In the current Netty connection pool the policy is either NEW or FAIL
                if (timeoutTask != null) {
                    // Schedule timeoutTask so that it runs once the acquire timeout has elapsed.
                    // If the policy is NEW, a new connection is created and returned; if the policy is FAIL, the acquire fails.
                    // The scheduled future is kept in task.timeoutFuture: when a release later wakes up a task from
                    // pendingAcquireQueue, that task has not timed out, so its scheduled timeout must be cancelled
                    task.timeoutFuture = executor.schedule(timeoutTask, acquireTimeoutNanos, TimeUnit.NANOSECONDS);
                }
            // pendingAcquireQueue is full, so fail the acquire
            } else {
                tooManyOutstanding(promise);
            }
        }

        assert pendingAcquireCount > 0;
    }
}

Now let's look at the logic of TimeoutTask, which runs when an acquire times out:

// FixedChannelPool.TimeoutTask.java

private abstract class TimeoutTask implements Runnable {
    @Override
    public final void run() {
        assert executor.inEventLoop();
        // Get the current system time
        long nanoTime = System.nanoTime();
        // Enter an infinite loop
        for (;;) {
            // Peek at the acquire task at the head of pendingAcquireQueue; peek does not remove the element.
            // [thinking] Does this loop process every task in the queue, including those that have
            // not timed out? See the analysis in the comments below
            AcquireTask task = pendingAcquireQueue.peek();
            // Compare nanoTime as descripted in the javadocs of System.nanoTime()
            //
            // See https://docs.oracle.com/javase/7/docs/api/java/lang/System.html#nanoTime()
            // See https://github.com/netty/netty/issues/3705 (there was probably a bug here that has
            // since been fixed; I will look at this issue later when I have time)
            // 1) If the task peeked from pendingAcquireQueue is null, no acquire task is pending, so break.
            // 2) If the task is not null, there are pending acquire tasks in the queue:
            // 2.1) If nanoTime - task.expireNanoTime < 0, the head task has not timed out yet, so break
            //      and wait until a task's timeout fires to run this code again.
            //      Why not handle tasks that have not timed out here as well? Consider:
            //      a) running the timeout logic for a task that has not timed out would break the semantics;
            //      b) if there are many pending tasks, it would put more stress on the system.
            //      Tasks that have timed out are removed from pendingAcquireQueue by this loop, so the tasks left
            //      in the queue are acquires that have not timed out. When a thread finishes with a connection
            //      and returns it to the pool, one of those tasks is woken up and takes the connection from the pool
            // 2.2) If it is >= 0, the task has timed out and is handled according to the NEW or FAIL policy
            if (task == null || nanoTime - task.expireNanoTime < 0) {
                break;
            }
            // The task has timed out, so remove it from pendingAcquireQueue
            pendingAcquireQueue.remove();
            // Naturally, pendingAcquireCount is decremented by 1
            --pendingAcquireCount;
            // Remember that the FixedChannelPool constructor creates a TimeoutTask according to the
            // NEW or FAIL AcquireTimeoutAction policy, and that when no connection is available in the pool,
            // the acquire task is added to pendingAcquireQueue and the TimeoutTask is scheduled
            // to run later on the executor thread?
            // Call the onTimeout method of the TimeoutTask
            onTimeout(task);
        }
    }

    // Callback invoked when an acquire task times out; each policy supplies its own implementation
    public abstract void onTimeout(AcquireTask task);
}

When an acquire times out, the onTimeout method is executed according to the configured policy. The policies are defined in the FixedChannelPool constructor:

// FixedChannelPool.java

public FixedChannelPool(Bootstrap bootstrap,
                        ChannelPoolHandler handler,
                        ChannelHealthChecker healthCheck, AcquireTimeoutAction action,
                        final long acquireTimeoutMillis,
                        int maxConnections, int maxPendingAcquires,
                        boolean releaseHealthCheck, boolean lastRecentUsed) {
    super(bootstrap, handler, healthCheck, releaseHealthCheck, lastRecentUsed);
    if (maxConnections < 1) {
        throw new IllegalArgumentException("maxConnections: " + maxConnections + " (expected: >= 1)");
    }
    if (maxPendingAcquires < 1) {
        throw new IllegalArgumentException("maxPendingAcquires: " + maxPendingAcquires + " (expected: >= 1)");
    }
    // action is null and acquireTimeoutMillis == -1: no acquire-timeout handling is configured
    if (action == null && acquireTimeoutMillis == -1) {
        timeoutTask = null;
        acquireTimeoutNanos = -1;
    // Reject an inconsistent combination of parameters
    } else if (action == null && acquireTimeoutMillis != -1) {
        throw new NullPointerException("action");
    // Reject an inconsistent combination of parameters
    } else if (action != null && acquireTimeoutMillis < 0) {
        throw new IllegalArgumentException("acquireTimeoutMillis: " + acquireTimeoutMillis + " (expected: >= 0)");
    // action != null and acquireTimeoutMillis >= 0: a policy for handling acquire timeouts has been set
    } else {
        acquireTimeoutNanos = TimeUnit.MILLISECONDS.toNanos(acquireTimeoutMillis);
        // Determine whether the policy is NEW or FAIL
        switch (action) {
            // FAIL policy: when an acquire task in pendingAcquireQueue times out, fail the acquire
            case FAIL:
                timeoutTask = new TimeoutTask() {
                    @Override
                    public void onTimeout(AcquireTask task) {
                        // Fail the promise as we timed out.
                        task.promise.setFailure(new TimeoutException(
                                "Acquire operation took longer then configured maximum time") {
                            @Override
                            public Throwable fillInStackTrace() {
                                return this;
                            }
                        });
                    }
                };
                break;
            // NEW policy: when an acquire task in pendingAcquireQueue times out, create a new connection for it.
            // Under high concurrency this can create a large number of connections,
            // so there is a risk of running out of resources
            case NEW:
                timeoutTask = new TimeoutTask() {
                    @Override
                    public void onTimeout(AcquireTask task) {
                        // Increment the acquire count and delegate to super to actually acquire a Channel which will
                        // create a new connection.
                        // Mark the task as acquired and increment acquiredChannelCount by 1
                        task.acquired();
                        // Call the parent class SimpleChannelPool.acquire to create a new connection
                        FixedChannelPool.super.acquire(task.promise);
                    }
                };
                break;
            default:
                throw new Error();
        }
    }
    // This executor is used to acquire connections; the same executor is always used to acquire connections asynchronously
    executor = bootstrap.config().group().next();
    this.maxConnections = maxConnections;
    this.maxPendingAcquires = maxPendingAcquires;
}

As we can see, the TimeoutTask.onTimeout implementation is set up in the FixedChannelPool constructor: when a FixedChannelPool is created, onTimeout is defined according to the configured AcquireTimeoutAction policy. See the source comments for details.
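The sweep loop and the two policies can be sketched with an explicit clock, so that the behaviour is deterministic. ToyTimeoutSweep, Pending and Action are hypothetical names; the real TimeoutTask runs on the pool's executor and delegates to SimpleChannelPool, neither of which this sketch models.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeoutException;

// Hypothetical model of the timeout sweep; a String stands in for a channel.
class ToyTimeoutSweep {
    enum Action { FAIL, NEW }

    static final class Pending {
        final CompletableFuture<String> promise = new CompletableFuture<>();
        final long expireNanoTime;

        Pending(long expireNanoTime) {
            this.expireNanoTime = expireNanoTime;
        }
    }

    private final Deque<Pending> queue = new ArrayDeque<>();
    private final Action action;
    private int acquiredCount;

    ToyTimeoutSweep(Action action) {
        this.action = action;
    }

    Pending enqueue(long expireNanoTime) {
        Pending pending = new Pending(expireNanoTime);
        queue.offer(pending);
        return pending;
    }

    // Expire tasks from the head of the queue while the head is overdue.
    void sweep(long nowNanos) {
        for (;;) {
            Pending head = queue.peek();
            // Subtract first, then compare with 0: this stays correct if nanoTime wraps around.
            if (head == null || nowNanos - head.expireNanoTime < 0) {
                break;
            }
            queue.remove();
            if (action == Action.FAIL) {
                // FAIL: the waiting caller gets a TimeoutException.
                head.promise.completeExceptionally(new TimeoutException("acquire timed out"));
            } else {
                // NEW: count it as acquired and hand out an extra connection.
                acquiredCount++;
                head.promise.complete("extra-channel-" + acquiredCount);
            }
        }
    }

    int pendingCount() { return queue.size(); }

    int acquiredCount() { return acquiredCount; }
}
```

Because the loop stops at the first task that has not expired, later tasks stay queued until a release or their own timeout reaches them. Under NEW, every expired task adds a connection beyond the limit, which is why that policy carries a resource risk under high concurrency.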

3.3 Source code analysis: releasing (returning) a connection to the pool

Now let's look at how Netty's FixedChannelPool releases a connection. Releasing a connection first calls SimpleChannelPool's release method directly:

// SimpleChannelPool.java

@Override
public final Future<Void> release(Channel channel) {
    // This ends up calling release(final Channel channel, final Promise<Void> promise) of FixedChannelPool,
    // because FixedChannelPool overrides SimpleChannelPool's
    // release(final Channel channel, final Promise<Void> promise) method
    return release(channel, channel.eventLoop().<Void>newPromise());
}

So the call release(channel, channel.eventLoop().<Void>newPromise()) in SimpleChannelPool dispatches to FixedChannelPool's override. Let's take a look at that method:

// FixedChannelPool.java

@Override
public Future<Void> release(final Channel channel, final Promise<Void> promise) {
    ObjectUtil.checkNotNull(promise, "promise");
    // Create a Promise
    final Promise<Void> p = executor.newPromise();
    // Call the parent class SimpleChannelPool's release(final Channel channel, final Promise<Void> promise) method.
    // [thinking] Why such a roundabout route? The parent's release(Channel channel) is called first, which calls
    // this class's release(final Channel channel, final Promise<Void> promise), which in turn calls the parent's
    // release(final Channel channel, final Promise<Void> promise).
    // Because SimpleChannelPool implements only the basic operations: acquiring connections, releasing connections
    // and health checks. FixedChannelPool additionally has to wake up a task in pendingAcquireQueue after a release,
    // right? So it adds a FutureListener to the Promise: after SimpleChannelPool's release method has put the
    // connection back into the pool, the listener's operationComplete method is called to wake up a task in pendingAcquireQueue
    super.release(channel, p.addListener(new FutureListener<Void>() {

        @Override
        public void operationComplete(Future<Void> future) throws Exception {
            assert executor.inEventLoop();
            // If the pool is closed, we have no choice but to close the channel and then call setFailure to notify the callbacks
            if (closed) {
                // Since the pool is closed, we have no choice but to close the channel
                channel.close();
                promise.setFailure(new IllegalStateException("FixedChannelPool was closed"));
                return;
            }
            // (1) If the connection was released back to the pool successfully
            // TODO [thinking] Which future is this? Can you find out where it came from?
            if (future.isSuccess()) {
                // Decrement acquiredChannelCount by 1 and "wake up" one pending acquire task in pendingAcquireQueue
                decrementAndRunTaskQueue();
                // Notify the callbacks via setSuccess
                promise.setSuccess(null);
            // (2) If releasing the connection back to the pool failed: unless the failure is an
            // IllegalArgumentException, decrement acquiredChannelCount by 1
            // and "wake up" one pending acquire task in pendingAcquireQueue
            // TODO [thinking] Could there be more than one pool here? Why is decrementAndRunTaskQueue skipped for
            // some exceptions when returning the connection, with setFailure called directly?
            // What is the logic behind this setFailure call?
            } else {
                Throwable cause = future.cause();
                // Check if the exception was not because of we passed the Channel to the wrong pool.
                if (!(cause instanceof IllegalArgumentException)) {
                    decrementAndRunTaskQueue();
                }
                // Notify the callbacks via setFailure
                promise.setFailure(future.cause());
            }
        }
    }));
    return promise;
}

As the comments describe, FixedChannelPool's release(final Channel channel, final Promise<Void> promise) adds a FutureListener and then calls back into SimpleChannelPool's release(final Channel channel, final Promise<Void> promise) method. Let's continue with that method's source:

// SimpleChannelPool.java

public Future<Void> release(final Channel channel, final Promise<Void> promise) {
        checkNotNull(channel, "channel");
        checkNotNull(promise, "promise");
        try {
            NioEventLoop thread (NioEventLoop thread, NioEventLoop thread, NioEventLoop thread)
            EventLoop loop = channel.eventLoop();
            // TODO [thinking] when will this be implemented? If you debug it, you're going to do the else branch
            if (loop.inEventLoop()) {
                doReleaseChannel(channel, promise);
            } else {
                / / this will release the connection operation is encapsulated into a Runnable tasks, and then add this task into SingleThreadEventExecutor taskQueue
                // Finally, the connection is released asynchronously
                loop.execute(new Runnable() {
                    @Override
                    public void run(a) { doReleaseChannel(channel, promise); }}); }}catch (Throwable cause) {
            closeAndFail(channel, cause, promise);
        }
        return promise;
    }
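The branch in release above follows a common pattern: if the caller is already on the thread that owns the channel, do the work directly; otherwise queue it for that thread. Below is a minimal sketch of that pattern using only the JDK. OwnerThreadDispatch, inOwnerThread and runOnOwner are hypothetical names for illustration, not Netty APIs, and a plain single-thread executor stands in for the EventLoop.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical JDK-only model of "inEventLoop() ? run now : loop.execute(task)".
final class OwnerThreadDispatch {
    private final AtomicReference<Thread> owner = new AtomicReference<>();
    private final ExecutorService loop = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, "owner-loop");
        t.setDaemon(true);
        owner.set(t); // remember which thread owns the work
        return t;
    });

    boolean inOwnerThread() {
        return Thread.currentThread() == owner.get();
    }

    // Runs the task on the owner thread: inline if we are already there, otherwise queued.
    void runOnOwner(Runnable task) {
        if (inOwnerThread()) {
            task.run();
        } else {
            loop.execute(task);
        }
    }

    // Submits from the calling (business) thread and reports which thread ran the nested task.
    static String threadThatRan() {
        OwnerThreadDispatch d = new OwnerThreadDispatch();
        AtomicReference<String> ranOn = new AtomicReference<>();
        CountDownLatch done = new CountDownLatch(1);
        d.runOnOwner(() -> {
            // Nested call: we are on the owner thread now, so this runs inline.
            d.runOnOwner(() -> ranOn.set(Thread.currentThread().getName()));
            done.countDown();
        });
        try {
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        d.loop.shutdown();
        return ranOn.get();
    }

    public static void main(String[] args) {
        System.out.println(threadThatRan());
    }
}
```

A call made from a business thread always takes the "queue it" path in this model, which matches what we saw while debugging: the release is submitted from a business thread, so the else branch is the one that runs.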

So the release method of the parent class SimpleChannelPool goes on to call doReleaseChannel to release the connection; for lack of space I won't go into that source here. What matters is that once doReleaseChannel has released the connection, the operationComplete method of the FutureListener added earlier is called back, and it in turn calls decrementAndRunTaskQueue. Let's look at the decrementAndRunTaskQueue source:

// FixedChannelPool.java 

private void decrementAndRunTaskQueue() {
    // We should never have a negative value.
    // The connection has been returned to the pool, so the number of acquired connections is decremented by 1
    int currentCount = acquiredChannelCount.decrementAndGet();
    assert currentCount >= 0;

    // Run the pending acquire tasks before notify the original promise so if the user would
    // try to acquire again from the ChannelFutureListener and the pendingAcquireCount is >=
    // maxPendingAcquires we may be able to run some pending tasks first and so allow to add
    // more.
    // Then "wake up" a task in pendingAcquireQueue so that it can get a connection from the connection pool.
    // The connection comes from the pool because only tasks that have not timed out are woken up here
    runTaskQueue();
}

Continue with the runTaskQueue method source:

// FixedChannelPool.java

private void runTaskQueue() {
    // pendingAcquireQueue holds the acquire tasks that are still waiting for a connection. When a thread has
    // finished with a connection and returns it to the pool, a waiting task in pendingAcquireQueue
    // is woken up and then goes to the connection pool to get a connection.

    // The loop below runs only while acquiredChannelCount is less than the maximum number of connections in the pool:
    // while (acquiredChannelCount.get() < maxConnections)
    // so a woken task reuses a pooled connection instead of creating a new one. So here's the question:
    // is while (acquiredChannelCount.get() < maxConnections) free of thread-safety issues? Without a lock
    // there could be an "overselling" problem (more tickets sold than exist).
    // There is no thread-safety problem only if this code is executed by a single thread.
    // If there were one, then under high concurrency the pool could run out of available connections while
    // tasks that failed to get one still create new connections. That would work, but it violates the intent that
    // "a task that has not timed out can only wait for a pooled connection; only a timed-out task gets a connection
    // created by the scheduled task", because tasks taken from pendingAcquireQueue here have generally not timed out.
    // The answer should be single-threaded execution. To be confirmed; while debugging it was basically always the same thread.
    while (acquiredChannelCount.get() < maxConnections) {
        // Take the first acquire task that has not timed out; timed-out tasks have already been removed by the scheduled task
        AcquireTask task = pendingAcquireQueue.poll();
        // If there are no tasks waiting to get connections, just jump out
        if (task == null) {
            break;
        }
        // If a scheduled timeout task was set up for this acquire task, timeoutFuture is not null and the scheduled task must be cancelled
        // Cancel the timeout if one was scheduled
        ScheduledFuture<?> timeoutFuture = task.timeoutFuture;
        if (timeoutFuture != null) {
            timeoutFuture.cancel(false);
        }
        // Decrement pendingAcquireCount by 1
        --pendingAcquireCount;
        // Increment acquiredChannelCount by 1
        task.acquired();
        // Call the acquire method of the parent SimpleChannelPool:
        // 1) if the pool has an available connection, take it directly;
        // 2) if the pool has no available connection, create a new one
        super.acquire(task.promise);
    }

    // We should never have a negative value.
    assert pendingAcquireCount >= 0;
    assert acquiredChannelCount.get() >= 0;
}

As you can see, after the thread that acquired a connection puts it back into the pool, it goes on to wake up tasks in pendingAcquireQueue that have not timed out so that they can get a connection. Let's use a flow chart to summarize the process of releasing a connection:

3.4 Acquire and release process of the Netty connection pool

Again, a flow chart summarizes how the Netty connection pool acquires and releases connections. I won't repeat the summary in text here; more detailed Netty source code comments can be found on my GitHub: Github.com/yuanmabiji/…

The acquire and release process of the Netty connection pool is as follows:

Guess 2: a channel whose back-end request timed out cannot be returned to the connection pool properly. If connections are not returned after a request timeout, the available connections in the channel pool are eventually exhausted, and other threads then time out when acquiring a connection from the pool. If this is the case, why would a connection whose back-end request timed out not be put back into the pool properly?

Answer to guess 2: a channel whose back-end request timed out is released back into the pool normally, and the returned connection is healthy and usable.

Guess 3: a channel whose back-end request timed out can be returned to the pool normally. However, acquiring a channel from the connection pool is asynchronous, so when the acquisition times out, what matters is whether the asynchronous task can still obtain a connection from the pool afterwards. There are two possibilities: 1) after the acquire timeout, no connection is obtained from the pool; then it does not matter that our code never releases a connection on an acquire timeout, because there is no connection to release; 2) after the acquire timeout, a connection is still obtained successfully; but as the earlier analysis of our code shows, that connection is never released, so the connection pool is eventually exhausted and the service becomes unavailable!

Answer to guess 3: after an acquire timeout, a connection can still be obtained successfully. But as the earlier analysis of our code shows, a connection obtained after the acquire timeout is never released, so the connection pool is eventually exhausted and the service becomes unavailable!
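The key point behind this answer is that a timed get() only stops the caller from waiting; it does not cancel the pending operation. Here is a minimal JDK-only sketch of that behavior. It assumes a CompletableFuture as a stand-in for Netty's Future<Channel>, and TimedGetDemo and the "channel-1" value are made up for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// JDK-only illustration: a timed get() gives up waiting but does not cancel the pending work.
final class TimedGetDemo {
    // Returns true if the future still receives a value after the caller's timed get() has timed out.
    static boolean completesAfterCallerTimedOut() {
        // Stand-in for the Future<Channel> returned by an asynchronous acquire.
        CompletableFuture<String> pendingAcquire = new CompletableFuture<>();
        boolean callerTimedOut = false;
        try {
            pendingAcquire.get(20, TimeUnit.MILLISECONDS); // the business thread waits, then gives up
        } catch (TimeoutException e) {
            callerTimedOut = true; // the business thread reports an error and moves on
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        }
        // Later, a connection is handed to the task that is still pending.
        boolean delivered = pendingAcquire.complete("channel-1");
        return callerTimedOut && delivered && "channel-1".equals(pendingAcquire.getNow(null));
    }

    public static void main(String[] args) {
        System.out.println(completesAfterCallerTimedOut());
    }
}
```

After the timeout nobody is listening to the future any more, so whatever it eventually receives is never released by the caller.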

4 The bug: connection pool resources exhausted by connection acquisition timeouts

Having walked through how the Netty connection pool acquires and releases connections, the cause of the “ghost” bug is now clear. We'll get straight to the answer, but first let's revisit our problem code:

// CustomChannelPool.java


public class CustomChannelPool {

  private static final ChannelHealthChecker healthCheck = ChannelHealthChecker.ACTIVE;
  // No acquire timeout policy is set: acquireTimeoutAction is null
  private static final AcquireTimeoutAction acquireTimeoutAction = null;
  // With acquireTimeoutAction = null, acquireTimeoutMillis is -1
  private static final long acquireTimeoutMillis = -1;
  // The connection pool capacity is 8
  private static final int maxConnect = 8;
  // The maxPendingAcquires capacity is Integer.MAX_VALUE
  private static final int maxPendingAcquires = Integer.MAX_VALUE;
  private static final boolean releaseHealthCheck = true;
  // ... irrelevant attributes omitted (including b, handler and lastRecentUsed)
  // [important, problem code]
  static ChannelPool fixpool =
      new FixedChannelPool(b, handler, healthCheck, acquireTimeoutAction,
          acquireTimeoutMillis, maxConnect, maxPendingAcquires, releaseHealthCheck, lastRecentUsed); // [0]

  // Get the connection
  public Channel acquire(int timeoutMillis) throws Exception {
    try {
      Future<Channel> fch = fixpool.acquire(); // [1]
      // [important, problem code]
      Channel ch = fch.get(timeoutMillis, TimeUnit.MILLISECONDS); // [2]
      return ch;
    } catch (Exception e) {
      logger.error("Exception accurred when acquire channel from channel pool.", e); // [3]
      throw e; // [4]
    }
  }

  // Release the connection
  public void release(Channel channel) {
    try {
      if (channel != null) {
        fixpool.release(channel);
      }
    } catch (Exception e) {
      e.printStackTrace();
    }
  }
}

The cause of the “ghost” bug: after the business thread's wait for a connection times out, the asynchronous acquire task is still pending and later obtains a connection from the pool. That connection can no longer be handed to the business thread, because the business thread has already failed with the acquire timeout exception, and since releasing a connection is normally triggered by the business thread, nobody releases it. Once such timed-out acquire tasks have taken all the available connections in the pool, the service becomes unavailable. No acquire timeout policy is in effect, because the pool is constructed with static ChannelPool fixpool = new FixedChannelPool(b, handler, healthCheck, acquireTimeoutAction, acquireTimeoutMillis, maxConnect, maxPendingAcquires, releaseHealthCheck, lastRecentUsed); where acquireTimeoutAction is null and acquireTimeoutMillis is -1. This means that once an acquire task has timed out, no scheduled task removes it from pendingAcquireQueue and completes it for the business thread!

In addition, as the pool's connections are exhausted and requests back up, more and more acquire tasks pile up in pendingAcquireQueue. Because the capacity of pendingAcquireQueue is Integer.MAX_VALUE here, the backlog is effectively unbounded, so there is an OOM risk, although it should be very small.

Also note that Channel ch = fch.get(timeoutMillis, TimeUnit.MILLISECONDS); returns early when its own timeout expires, so even configuring an acquireTimeoutAction policy might not help. If the timeout passed to fch.get is shorter than acquireTimeoutMillis, the business thread gives up before the scheduled task that handles acquire timeouts runs, and that task can no longer hand the connection to the business thread.
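To make the exhaustion concrete, here is a minimal single-threaded sketch of the scenario described above. It is only a toy model under stated assumptions: LeakyAcquireDemo, ToyPool and leakedConnections are hypothetical names, connections are plain integers, and only the counting logic (acquired count plus a pending queue with no timeout task) is modeled, not Netty's actual implementation.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical toy model of the leak; not thread-safe and not Netty code.
final class LeakyAcquireDemo {
    static final class ToyPool {
        private final int maxConnections;
        private int acquiredCount; // plays the role of acquiredChannelCount
        private int nextId;
        // Plays the role of pendingAcquireQueue; no timeout task ever removes entries.
        private final Queue<CompletableFuture<Integer>> pending = new ArrayDeque<>();

        ToyPool(int maxConnections) {
            this.maxConnections = maxConnections;
        }

        CompletableFuture<Integer> acquire() {
            CompletableFuture<Integer> f = new CompletableFuture<>();
            if (acquiredCount < maxConnections) {
                acquiredCount++;
                f.complete(nextId++);
            } else {
                pending.add(f); // wait until some connection is released
            }
            return f;
        }

        void release(int connection) {
            acquiredCount--;
            // Hand the freed slot to the first pending task, roughly what runTaskQueue does.
            CompletableFuture<Integer> task = pending.poll();
            if (task != null) {
                acquiredCount++;
                task.complete(connection);
            }
        }
    }

    // Returns how many connections are still counted as acquired once every caller is done.
    static int leakedConnections(int poolSize) {
        ToyPool pool = new ToyPool(poolSize);
        int[] held = new int[poolSize];
        for (int i = 0; i < poolSize; i++) {
            held[i] = pool.acquire().join(); // slow back-end requests hold every connection
        }
        for (int i = 0; i < poolSize; i++) {
            CompletableFuture<Integer> f = pool.acquire();
            try {
                f.get(5, TimeUnit.MILLISECONDS); // caller-side timeout, as in the problem code
            } catch (TimeoutException expected) {
                // The business thread fails here and never sees the connection.
            } catch (InterruptedException | ExecutionException e) {
                throw new IllegalStateException(e);
            }
        }
        for (int c : held) {
            pool.release(c); // each release wakes a task whose caller is already gone
        }
        return pool.acquiredCount;
    }

    public static void main(String[] args) {
        System.out.println(leakedConnections(8));
    }
}
```

In this model, a pool of n connections is fully leaked after n caller-side timeouts: every connection ends up owned by a task that nobody is waiting on, so every later acquire just queues up.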

5 Fixing the connection pool exhaustion bug caused by connection acquisition timeouts

After the analysis above, how do we fix the connection pool exhaustion bug caused by acquire timeouts? You probably have the answer in mind already, so let's go straight to the fixed code:

// CustomChannelPool.java


public class CustomChannelPool {

  private static final ChannelHealthChecker healthCheck = ChannelHealthChecker.ACTIVE;
  // [fix] New policy: create a new connection when an acquire timeout is detected
  private static final AcquireTimeoutAction acquireTimeoutAction = AcquireTimeoutAction.NEW;
  // [fix] Timeout is set to 10 seconds (a negative value is not valid once a policy is set)
  private static final long acquireTimeoutMillis = 10000;
  // [fix] The connection pool capacity is set to 100, 8 is too small
  private static final int maxConnect = 100;
  // [fix] maxPendingAcquires capacity changed from Integer.MAX_VALUE to 100000 to avoid the OOM risk
  private static final int maxPendingAcquires = 100000;
  private static final boolean releaseHealthCheck = true;
  // ... irrelevant attributes omitted (including b, handler and lastRecentUsed)
  static ChannelPool fixpool =
      new FixedChannelPool(b, handler, healthCheck, acquireTimeoutAction,
          acquireTimeoutMillis, maxConnect, maxPendingAcquires, releaseHealthCheck, lastRecentUsed); // [0]

  // Get the connection
  public Channel acquire(int timeoutMillis) throws Exception {
    try {
      Future<Channel> fch = fixpool.acquire(); // [1]
      // [bug] With the timed get below, after n acquire timeouts the connection pool runs out of available
      // connections and the service becomes unavailable. Note: n is the connection pool size
      // Solution: configure the AcquireTimeoutAction NEW or FAIL policy; the previous code passed null
      // for AcquireTimeoutAction when creating the FixedChannelPool
      // Channel ch = fch.get(timeoutMillis, TimeUnit.MILLISECONDS);
      // [fix] Wait without a caller-side timeout; the pool's AcquireTimeoutAction now handles acquire timeouts
      Channel ch = fch.get();
      return ch;
    } catch (Exception e) {
      logger.error("Exception accurred when acquire channel from channel pool.", e); // [3]
      throw e; // [4]
    }
  }

  // Release the connection
  public void release(Channel channel) {
    try {
      if (channel != null) {
        fixpool.release(channel);
      }
    } catch (Exception e) {
      e.printStackTrace();
    }
  }
}
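To see why moving the timeout into the pool removes the leak, here is a minimal JDK-only sketch. It models a FAIL-style policy, which is simpler to show than NEW: when the pool-side timeout fires, the pending task is removed from the queue and failed, so a later release cannot hand a connection to a caller that has already gone. FailOnTimeoutDemo, ToyPool and acquiredAfterTimeoutAndRelease are hypothetical names, and this is an illustration of the idea rather than Netty's implementation.

```java
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical toy model of a pool-side acquire timeout (FAIL-style policy).
final class FailOnTimeoutDemo {
    static final class ToyPool {
        private final int maxConnections;
        private final long acquireTimeoutMillis;
        private final ScheduledExecutorService timer;
        private final Queue<CompletableFuture<Integer>> pending = new ConcurrentLinkedQueue<>();
        private int acquiredCount; // only touched by the caller thread in this sketch
        private int nextId;

        ToyPool(int maxConnections, long acquireTimeoutMillis, ScheduledExecutorService timer) {
            this.maxConnections = maxConnections;
            this.acquireTimeoutMillis = acquireTimeoutMillis;
            this.timer = timer;
        }

        CompletableFuture<Integer> acquire() {
            CompletableFuture<Integer> f = new CompletableFuture<>();
            if (acquiredCount < maxConnections) {
                acquiredCount++;
                f.complete(nextId++);
                return f;
            }
            pending.add(f);
            // Pool-side timeout: remove the task from the queue and fail it.
            timer.schedule(() -> {
                if (pending.remove(f)) {
                    f.completeExceptionally(new TimeoutException("acquire timed out"));
                }
            }, acquireTimeoutMillis, TimeUnit.MILLISECONDS);
            return f;
        }

        void release(int connection) {
            acquiredCount--;
            CompletableFuture<Integer> task = pending.poll();
            if (task != null) { // only tasks that have not timed out are still queued
                acquiredCount++;
                task.complete(connection);
            }
        }
    }

    // Returns the acquired count after one acquire timed out and the holder released; -1 if no timeout occurred.
    static int acquiredAfterTimeoutAndRelease() {
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        try {
            ToyPool pool = new ToyPool(1, 30, timer);
            int held = pool.acquire().join(); // the only connection is in use
            CompletableFuture<Integer> second = pool.acquire();
            boolean failedWithTimeout = false;
            try {
                second.join(); // untimed wait: the pool's own timeout ends it
            } catch (CompletionException e) {
                failedWithTimeout = e.getCause() instanceof TimeoutException;
            }
            pool.release(held); // nothing is pending any more, so the slot becomes free
            return failedWithTimeout ? pool.acquiredCount : -1;
        } finally {
            timer.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(acquiredAfterTimeoutAndRelease());
    }
}
```

Because the caller waits without its own timeout, the caller and the pool always agree on the outcome: either the caller receives a connection and releases it later, or the task is failed and removed before any connection is assigned to it.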

6 Summary

It’s not easy to fix a Bug, so keep going.