Generally speaking, the client and server only need to establish one connection during network communication, but in some scenarios, we need to establish multiple connections. For example, if load balancing is used, there may be load imbalance if only one connection is established. Sometimes, connection pools need to be established to increase client throughput.

The biggest difficulties in building a connection pool are ensuring that exactly the specified number of connections is created under high concurrency, and managing the pool well: for example, what to do when the pool has no available connections, and how to add a new connection to the pool after an existing one has gone dead. Netty provides two connection pools that cover these needs. SimpleChannelPool encapsulates the basic functions of connection pooling, but it cannot limit the number of connections in the pool, so it is not suitable for production use. FixedChannelPool is a more powerful pool that extends SimpleChannelPool and can be used in production.

The simplest way to use a Netty connection pool

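import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelOption;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.pool.SimpleChannelPool;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.util.CharsetUtil;
import io.netty.util.concurrent.DefaultThreadFactory;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;

import java.net.InetSocketAddress;

public class ClientMock {
    private static SimpleChannelPoolMap poolMap;

    public static void main(String[] args) {
        NioEventLoopGroup group = new NioEventLoopGroup(1, new DefaultThreadFactory("Client-Event", false));
        Bootstrap bootstrap = new Bootstrap();
        bootstrap.group(group)
                .channel(NioSocketChannel.class)
                .option(ChannelOption.TCP_NODELAY, Boolean.TRUE)
                .option(ChannelOption.SO_REUSEADDR, Boolean.TRUE)
                .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
        poolMap = new SimpleChannelPoolMap(bootstrap);
        // One pool per remote address.
        SimpleChannelPool channelPool = poolMap.get(new InetSocketAddress(8090));
        channelPool.acquire().addListener(new FutureListener<Channel>() {
            @Override
            public void operationComplete(Future<Channel> future) throws Exception {
                if (future.isSuccess()) {
                    Channel channel = future.getNow();
                    channel.writeAndFlush(Unpooled.copiedBuffer("hello", CharsetUtil.UTF_8));
                    // Return the channel to the pool once we are done with it.
                    channelPool.release(channel);
                }
                if (future.cause() != null) {
                    System.out.println(future.cause());
                }
            }
        });
    }
}
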
public class SimpleChannelPoolMap extends AbstractChannelPoolMap<InetSocketAddress, SimpleChannelPool> {
    private Bootstrap bootstrap;
    SimpleHandler simpleHandler = new SimpleHandler();

    public SimpleChannelPoolMap(Bootstrap bootstrap) {
        this.bootstrap = bootstrap;
    }

    @Override
    protected SimpleChannelPool newPool(InetSocketAddress key) {
        return new SimpleChannelPool(bootstrap.remoteAddress(key), new ChannelPoolHandler() {
            @Override
            public void channelReleased(Channel ch) throws Exception {
                System.out.println("channelReleased: " + ch);
            }

            @Override
            public void channelAcquired(Channel ch) throws Exception {
                System.out.println("channelAcquired: " + ch);
            }

            @Override
            public void channelCreated(Channel ch) throws Exception {
                // Add the handler to each newly created Channel.
                // The same instance is added to every Channel, so SimpleHandler must be @ChannelHandler.Sharable.
                ch.pipeline().addLast(simpleHandler);
            }
        });
    }
}

It is easy to build a Netty connection pool.

SimpleChannelPool

Question 1: How to ensure that only one connection pool is created for a key?

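AbstractChannelPoolMap uses ConcurrentHashMap's putIfAbsent to ensure that only one connection pool is kept per key. Two threads may both call newPool for the same key, but only the first putIfAbsent wins; the loser closes the pool it just created and uses the existing one.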

private final ConcurrentMap<K, P> map = PlatformDependent.newConcurrentHashMap();

@Override
public final P get(K key) {
    P pool = map.get(checkNotNull(key, "key"));
    if (pool == null) {
        // Create a connection pool
        pool = newPool(key);
        P old = map.putIfAbsent(key, pool);
        if (old != null) {
            // We need to destroy the newly created pool as we not use it.
            poolCloseAsyncIfSupported(pool);
            pool = old;
        }
    }
    return pool;
}
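
The same get-or-create pattern can be tried out without Netty. The following is a minimal sketch using only the JDK; PoolRegistry and FakePool are hypothetical names made up for illustration and are not Netty classes.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-ins for illustration only; these are not Netty classes.
class PoolRegistry {
    /** A fake pool that only remembers its key and whether it was closed. */
    static final class FakePool {
        final String key;
        volatile boolean closed;
        FakePool(String key) { this.key = key; }
        void close() { closed = true; }
    }

    private final ConcurrentMap<String, FakePool> map = new ConcurrentHashMap<>();
    final AtomicInteger created = new AtomicInteger();

    FakePool get(String key) {
        FakePool pool = map.get(key);
        if (pool == null) {
            // Several threads may reach this point at once and each build a pool.
            pool = new FakePool(key);
            created.incrementAndGet();
            // Only the first putIfAbsent wins; the losers get the winner back.
            FakePool old = map.putIfAbsent(key, pool);
            if (old != null) {
                pool.close(); // discard the duplicate we just built
                pool = old;
            }
        }
        return pool;
    }

    public static void main(String[] args) throws InterruptedException {
        PoolRegistry registry = new PoolRegistry();
        Thread[] threads = new Thread[8];
        FakePool[] seen = new FakePool[threads.length];
        for (int i = 0; i < threads.length; i++) {
            final int idx = i;
            threads[i] = new Thread(() -> seen[idx] = registry.get("127.0.0.1:8090"));
            threads[i].start();
        }
        for (Thread t : threads) {
            t.join();
        }
        // Every thread ends up with the same, still-open pool instance.
        for (FakePool p : seen) {
            if (p != seen[0] || p.closed) {
                throw new AssertionError("expected one shared open pool");
            }
        }
        System.out.println("pools built: " + registry.created.get() + ", pools kept: 1");
    }
}
```

The number of pools built can vary from run to run, because racing threads may each build one; the number kept is always one.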

Question 2: How to create a connection?

SimpleChannelPool#acquire()

public class SimpleChannelPool implements ChannelPool {
    private static final AttributeKey<SimpleChannelPool> POOL_KEY =
        AttributeKey.newInstance("io.netty.channel.pool.SimpleChannelPool");
    private final Deque<Channel> deque = PlatformDependent.newConcurrentDeque();
    
    public SimpleChannelPool(Bootstrap bootstrap, final ChannelPoolHandler handler, ChannelHealthChecker healthCheck,
                             boolean releaseHealthCheck, boolean lastRecentUsed) {
        this.handler = checkNotNull(handler, "handler");
        this.healthCheck = checkNotNull(healthCheck, "healthCheck");
        this.releaseHealthCheck = releaseHealthCheck;
        // Clone the original Bootstrap as we want to set our own handler
        this.bootstrap = checkNotNull(bootstrap, "bootstrap").clone();
        this.bootstrap.handler(new ChannelInitializer<Channel>() {
            @Override
            protected void initChannel(Channel ch) throws Exception {
                assert ch.eventLoop().inEventLoop();
                handler.channelCreated(ch);
            }
        });
        this.lastRecentUsed = lastRecentUsed;
    }

    @Override
    public final Future<Channel> acquire() {
        return acquire(bootstrap.config().group().next().<Channel>newPromise());
    }

    @Override
    public Future<Channel> acquire(final Promise<Channel> promise) {
        return acquireHealthyFromPoolOrNew(checkNotNull(promise, "promise"));
    }

  • The data structure the pool uses to hold connections is a thread-safe double-ended queue (deque).
  • releaseHealthCheck: indicates whether a connection is health-checked when it is released back to the pool. A connection taken from the pool is always health-checked on acquire.
  • lastRecentUsed: if true, the connection is taken from the tail of the queue (LIFO); if false, from the head (FIFO). FIFO is recommended; with LIFO you may keep getting the same connection over and over.

The ChannelPoolHandler#channelCreated method is called to initialize each channel when the channel is created. The pool's constructor installs a ChannelInitializer on its cloned Bootstrap for this purpose.

The assertion here is a nice touch. By the time initChannel is called, the channel has already been bound to its EventLoop and the call runs on that EventLoop's thread, so the assertion holds.

Next, take a look at the method that actually acquires a connection.

    @Override
    public final Future<Channel> acquire() {
        // Pick an EventLoop via the thread selector and create a Promise on it
        return acquire(bootstrap.config().group().next().<Channel>newPromise());
    }

    @Override
    public Future<Channel> acquire(final Promise<Channel> promise) {
        return acquireHealthyFromPoolOrNew(checkNotNull(promise, "promise"));
    }

    private Future<Channel> acquireHealthyFromPoolOrNew(final Promise<Channel> promise) {
        try {
            final Channel ch = pollChannel();
            if (ch == null) {
                // No Channel left in the pool bootstrap a new Channel
                Bootstrap bs = bootstrap.clone();
                bs.attr(POOL_KEY, this);
                ChannelFuture f = connectChannel(bs);
                if (f.isDone()) {
                    notifyConnect(f, promise);
                } else {
                    f.addListener(new ChannelFutureListener() {
                        @Override
                        public void operationComplete(ChannelFuture future) throws Exception {
                            notifyConnect(future, promise);
                        }
                    });
                }
                return promise;
            }
            EventLoop loop = ch.eventLoop();
            if (loop.inEventLoop()) {
                doHealthCheck(ch, promise);
            } else {
                loop.execute(new Runnable() {
                    @Override
                    public void run() {
                        doHealthCheck(ch, promise);
                    }
                });
            }
        } catch (Throwable cause) {
            promise.tryFailure(cause);
        }
        return promise;
    }

Acquiring a connection is written entirely in an asynchronous style, which is hard to follow if you are not familiar with the Netty source code and how EventLoop works.
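
One idea that helps here: in this style a callback runs on the executor it was registered with, not on the thread that completes the future. The sketch below is a JDK-only analogy that uses CompletableFuture instead of Netty's Promise; the class name and thread names are made up for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// JDK-only analogy; CompletableFuture stands in for Netty's Promise.
class ListenerThreadDemo {
    /** Completes a future from thread "EventB" and returns the name of the thread that ran the callback. */
    static String callbackThread() throws Exception {
        ExecutorService executorA = Executors.newSingleThreadExecutor(r -> new Thread(r, "EventA"));
        ExecutorService executorB = Executors.newSingleThreadExecutor(r -> new Thread(r, "EventB"));
        try {
            CompletableFuture<String> promise = new CompletableFuture<>();
            // The callback is pinned to executorA when it is registered.
            CompletableFuture<String> ranOn =
                    promise.thenApplyAsync(value -> Thread.currentThread().getName(), executorA);
            // A different thread completes the future.
            executorB.execute(() -> promise.complete("channel"));
            return ranOn.get();
        } finally {
            executorA.shutdown();
            executorB.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("callback ran on: " + callbackThread()); // prints "callback ran on: EventA"
    }
}
```

The Netty test that follows shows the same behaviour with a real Promise and two EventExecutors.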

public void promiseTest() throws InterruptedException {
    CountDownLatch latch = new CountDownLatch(1);
    EventExecutor executorA = new DefaultEventExecutor(new DefaultThreadFactory("EventA"));
    EventExecutor executorB = new DefaultEventExecutor();
    Channel channel = new NioSocketChannel();
    // Create a Promise bound to executorA.
    Promise<Channel> newPromise = executorA.<Channel>newPromise();
    System.out.println(Thread.currentThread().getName());
    newPromise.addListener(f -> {
        if (f.isSuccess()) {
            Assert.assertEquals(channel, f.getNow());
            // The listener runs on executorA's thread, not on executorB's.
            System.out.println(Thread.currentThread().getName());
            latch.countDown();
        }
    });
    Assert.assertEquals(false, executorB.inEventLoop());
    executorB.execute(new Runnable() {
        @Override
        public void run() {
            // Complete the Promise from a different executor.
            newPromise.setSuccess(channel);
        }
    });
    latch.await();
}

This code can help you understand Netty's asynchronous programming style: the listener runs on the executor that created the Promise, no matter which thread completes it. With that in mind, let's look at the acquire method in SimpleChannelPool.

First, an EventLoop is obtained through the EventLoop thread selector of the Netty client Bootstrap, and a Promise is created on it. The Promise is completed once the Channel has been acquired or the attempt has failed, and its listeners are notified on that EventLoop.

Then determine whether to use LIFO or FIFO to get a connection based on the value of lastRecentUsed.

protected Channel pollChannel() {
    return lastRecentUsed ? deque.pollLast() : deque.pollFirst();
}
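
To see why the choice between the two ends of the deque matters, here is a small JDK-only sketch. Strings stand in for channels, and it assumes that a released connection is appended to the tail of the deque with offer(); PollOrderDemo is a made-up name.

```java
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedDeque;

// Illustration only: strings stand in for channels.
class PollOrderDemo {
    /** Acquires and releases one connection per round and records which connection was used. */
    static List<String> simulate(boolean lastRecentUsed, int rounds) {
        Deque<String> deque = new ConcurrentLinkedDeque<>();
        deque.offer("conn-1");
        deque.offer("conn-2");
        deque.offer("conn-3");
        List<String> used = new ArrayList<>();
        for (int i = 0; i < rounds; i++) {
            String conn = lastRecentUsed ? deque.pollLast() : deque.pollFirst();
            used.add(conn);
            deque.offer(conn); // release: offer() appends to the tail
        }
        return used;
    }

    public static void main(String[] args) {
        System.out.println("LIFO: " + simulate(true, 4));  // LIFO: [conn-3, conn-3, conn-3, conn-3]
        System.out.println("FIFO: " + simulate(false, 4)); // FIFO: [conn-1, conn-2, conn-3, conn-1]
    }
}
```

With LIFO a single caller keeps reusing the most recently released connection, while FIFO rotates through all of them.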

Then, if no connection is obtained, the method to establish the connection is executed, and if a connection is obtained from the connection pool, the connection is checked for health.

ChannelFuture f = connectChannel(bs);

connectChannel is the method that actually establishes the connection. It follows the same logic as calling Bootstrap#connect(): it creates a channel, binds an EventLoop to the channel, and submits the connect operation to that EventLoop's taskQueue.

private void notifyConnect(ChannelFuture future, Promise<Channel> promise) throws Exception {
    // The connection was established successfully
    if (future.isSuccess()) {
        Channel channel = future.channel();
        handler.channelAcquired(channel);
        // Write the result back to the promise
        if (!promise.trySuccess(channel)) {
            // Promise was completed in the meantime (like cancelled), just release the channel again
            release(channel);
        }
    } else {
        promise.tryFailure(future.cause());
    }
}

If the connection is established successfully, the channel is obtained from the future, the ChannelPoolHandler#channelAcquired method is executed, and the Promise's trySuccess method is called to try to set the channel as the Promise's result.

⚠️ An important detail here: after establishing the connection and writing the result to the promise, acquire does not put the connection into the pool. The connection is released into the pool at this point only if writing the result to the promise fails (for example, because the promise was cancelled in the meantime).

If pollChannel() returns a connection (not null), the connection is health-checked.

If the connection is healthy, ChannelPoolHandler#channelAcquired is executed and the channel is written to the promise.

If the connection is unhealthy, it is closed and acquireHealthyFromPoolOrNew is executed again to take another connection from the pool or create a new one.
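
The acquire flow described above can be condensed into a short sketch. This is a simplified, synchronous stand-in written with the JDK only: the real code is asynchronous and retries by calling itself again, whereas the sketch uses a plain loop. AcquireFlowDemo and Conn are hypothetical names.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Simplified, synchronous stand-in; Conn is a hypothetical class, not a Netty Channel.
class AcquireFlowDemo {
    static final class Conn {
        final int id;
        boolean active = true;
        boolean closed;
        Conn(int id) { this.id = id; }
    }

    private final Deque<Conn> idle = new ArrayDeque<>();
    private int nextId = 1;

    void release(Conn conn) {
        idle.offer(conn);
    }

    Conn acquire() {
        for (;;) {
            Conn conn = idle.pollFirst();
            if (conn == null) {
                return new Conn(nextId++); // pool is empty: "connect" a new one
            }
            if (conn.active) {
                return conn;               // healthy: hand it out
            }
            conn.closed = true;            // unhealthy: close it and try again
        }
    }

    public static void main(String[] args) {
        AcquireFlowDemo pool = new AcquireFlowDemo();
        Conn first = pool.acquire();
        pool.release(first);
        first.active = false;              // the pooled connection dies while idle
        Conn second = pool.acquire();
        // prints "first closed: true, second id: 2"
        System.out.println("first closed: " + first.closed + ", second id: " + second.id);
    }
}
```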

SimpleChannelPool#release()

    @Override
    public Future<Void> release(final Channel channel, final Promise<Void> promise) {
        checkNotNull(channel, "channel");
        checkNotNull(promise, "promise");
        try {
            EventLoop loop = channel.eventLoop();
            // Check whether we are on the channel's EventLoop thread
            if (loop.inEventLoop()) {
                doReleaseChannel(channel, promise);
            } else {
                loop.execute(new Runnable() {
                    @Override
                    public void run() {
                        doReleaseChannel(channel, promise);
                    }
                });
            }
        } catch (Throwable cause) {
            closeAndFail(channel, cause, promise);
        }
        return promise;
    }

    private void doReleaseChannel(Channel channel, Promise<Void> promise) {
        assert channel.eventLoop().inEventLoop();
        // Remove the POOL_KEY attribute from the Channel and check if it was acquired from this pool, if not fail.
        if (channel.attr(POOL_KEY).getAndSet(null) != this) {
            closeAndFail(channel,
                         // Better include a stacktrace here as this is an user error.
                         new IllegalArgumentException(
                                 "Channel " + channel + " was not acquired from this ChannelPool"),
                         promise);
        } else {
            try {
                if (releaseHealthCheck) {
                    doHealthCheckOnRelease(channel, promise);
                } else {
                    releaseAndOffer(channel, promise);
                }
            } catch (Throwable cause) {
                closeAndFail(channel, cause, promise);
            }
        }
    }

    private void doHealthCheckOnRelease(final Channel channel, final Promise<Void> promise) throws Exception {
        final Future<Boolean> f = healthCheck.isHealthy(channel);
        if (f.isDone()) {
            releaseAndOfferIfHealthy(channel, promise, f);
        } else {
            f.addListener(new FutureListener<Boolean>() {
                @Override
                public void operationComplete(Future<Boolean> future) throws Exception {
                    releaseAndOfferIfHealthy(channel, promise, f);
                }
            });
        }
    }

The release method determines whether the current thread is an EventLoop thread for the channel, and if not, submits the task to the EventLoop thread for execution.

releaseHealthCheck determines whether a connection is health-checked before it is returned to the connection pool.

  • Health check on: if the connection is healthy, it is offered to the pool. If the offer succeeds, ChannelPoolHandler#channelReleased is executed; otherwise the connection is closed. Either way the Promise is notified of the result. If the connection is not healthy, it is not returned to the pool: ChannelPoolHandler#channelReleased is executed and the Promise is notified.
  • Health check off: the connection is offered to the pool directly, and it is closed if the offer fails.
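
The two release paths can be summarised as a small decision function. The sketch below uses only the JDK, with strings standing in for channels. The bounded deque is an assumption made purely so that the "offer fails" branch can be exercised; ReleaseFlowDemo and Outcome are hypothetical names.

```java
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.LinkedBlockingDeque;

// Hypothetical sketch; strings stand in for channels.
class ReleaseFlowDemo {
    enum Outcome { RETURNED_TO_POOL, CLOSED_OFFER_FAILED, NOT_RETURNED_UNHEALTHY }

    private final BlockingDeque<String> idle;
    private final boolean releaseHealthCheck;

    ReleaseFlowDemo(int capacity, boolean releaseHealthCheck) {
        // Bounded only so that offer() can fail in this demo.
        this.idle = new LinkedBlockingDeque<>(capacity);
        this.releaseHealthCheck = releaseHealthCheck;
    }

    Outcome release(String conn, boolean healthy) {
        if (releaseHealthCheck && !healthy) {
            // Unhealthy connections are not put back into the pool.
            return Outcome.NOT_RETURNED_UNHEALTHY;
        }
        // offer() returns false when the bounded deque is full.
        return idle.offer(conn) ? Outcome.RETURNED_TO_POOL : Outcome.CLOSED_OFFER_FAILED;
    }

    public static void main(String[] args) {
        ReleaseFlowDemo checked = new ReleaseFlowDemo(1, true);
        System.out.println(checked.release("a", true));   // RETURNED_TO_POOL
        System.out.println(checked.release("b", true));   // CLOSED_OFFER_FAILED
        System.out.println(checked.release("c", false));  // NOT_RETURNED_UNHEALTHY

        ReleaseFlowDemo unchecked = new ReleaseFlowDemo(1, false);
        System.out.println(unchecked.release("d", false)); // RETURNED_TO_POOL
    }
}
```

Note the last line: with the health check off, even an unhealthy connection goes back into the pool and is only weeded out by the health check on the next acquire.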

SimpleChannelPool implements the basic functionality of connection pooling, but it does not support limiting the number of connections in the pool, so in production we need to use FixedChannelPool.

FixedChannelPool

public FixedChannelPool(Bootstrap bootstrap,
                            ChannelPoolHandler handler,
                            ChannelHealthChecker healthCheck, AcquireTimeoutAction action,
                            final long acquireTimeoutMillis,
                            int maxConnections, int maxPendingAcquires,
                            boolean releaseHealthCheck, boolean lastRecentUsed) {
        super(bootstrap, handler, healthCheck, releaseHealthCheck, lastRecentUsed);
        if (maxConnections < 1) {
            throw new IllegalArgumentException("maxConnections: " + maxConnections + " (expected: >= 1)");
        }
        if (maxPendingAcquires < 1) {
            throw new IllegalArgumentException("maxPendingAcquires: " + maxPendingAcquires + " (expected: >= 1)");
        }
        if (action == null && acquireTimeoutMillis == -1) {
            timeoutTask = null;
            acquireTimeoutNanos = -1;
        } else if (action == null && acquireTimeoutMillis != -1) {
            throw new NullPointerException("action");
        } else if (action != null && acquireTimeoutMillis < 0) {
            throw new IllegalArgumentException("acquireTimeoutMillis: " + acquireTimeoutMillis + " (expected: >= 0)");
        } else {
            acquireTimeoutNanos = TimeUnit.MILLISECONDS.toNanos(acquireTimeoutMillis);
            switch (action) {
            case FAIL:
                timeoutTask = new TimeoutTask() {
                    @Override
                    public void onTimeout(AcquireTask task) {
                        // Fail the promise as we timed out.
                        task.promise.setFailure(new TimeoutException(
                                "Acquire operation took longer then configured maximum time") {
                            @Override
                            public Throwable fillInStackTrace() {
                                return this;
                            }
                        });
                    }
                };
                break;
            case NEW:
                timeoutTask = new TimeoutTask() {
                    @Override
                    public void onTimeout(AcquireTask task) {
                        // Increment the acquire count and delegate to super to actually acquire a Channel which will
                        // create a new connection.
                        task.acquired();

                        FixedChannelPool.super.acquire(task.promise);
                    }
                };
                break;
            default:
                throw new Error();
            }
        }
        executor = bootstrap.config().group().next();
        this.maxConnections = maxConnections;
        this.maxPendingAcquires = maxPendingAcquires;
    }

  • maxConnections: the maximum number of connections in the connection pool.
  • acquireTimeoutMillis: the maximum time to wait for a connection from the pool, in milliseconds. It is stored internally in nanoseconds as acquireTimeoutNanos.
  • maxPendingAcquires: the maximum number of acquire requests that may wait once the number of acquired connections has reached maxConnections. For example, with maxConnections=2 and two connections acquired but not yet released back to the pool, subsequent acquire requests are queued as pending tasks, each with a scheduled timeout if one is configured. At most maxPendingAcquires requests may wait; any further acquire fails immediately.
  • executor: an EventLoop used to execute the acquire and release operations.
  • AcquireTimeoutAction.FAIL: if no connection is available in the pool, wait acquireTimeoutNanos and then fail with a timeout exception.
  • AcquireTimeoutAction.NEW: if no connection is available in the pool, wait acquireTimeoutNanos and then create a new connection.
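
The constructor accepts only certain combinations of action and acquireTimeoutMillis. The following JDK-only sketch mirrors those checks in isolation so the combinations can be tried out; TimeoutConfigDemo, Action and resolveTimeoutNanos are hypothetical names, not part of Netty.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical helper that mirrors the constructor's checks; not part of Netty.
class TimeoutConfigDemo {
    enum Action { FAIL, NEW }

    /** Returns the timeout in nanoseconds, or -1 when acquire timeouts are disabled. */
    static long resolveTimeoutNanos(Action action, long acquireTimeoutMillis) {
        if (action == null && acquireTimeoutMillis == -1) {
            return -1; // no action and no timeout: pending acquires never time out
        }
        if (action == null) {
            throw new NullPointerException("action");
        }
        if (acquireTimeoutMillis < 0) {
            throw new IllegalArgumentException(
                    "acquireTimeoutMillis: " + acquireTimeoutMillis + " (expected: >= 0)");
        }
        return TimeUnit.MILLISECONDS.toNanos(acquireTimeoutMillis);
    }

    public static void main(String[] args) {
        System.out.println(resolveTimeoutNanos(null, -1));         // -1
        System.out.println(resolveTimeoutNanos(Action.FAIL, 500)); // 500000000
    }
}
```

In short, either pass no action together with -1 to disable timeouts, or pass an action together with a non-negative timeout.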

FixedChannelPool#acquire

    @Override
    public Future<Channel> acquire(final Promise<Channel> promise) {
        try {
            if (executor.inEventLoop()) {
                acquire0(promise);
            } else {
                executor.execute(new Runnable() {
                    @Override
                    public void run() {
                        acquire0(promise);
                    }
                });
            }
        } catch (Throwable cause) {
            promise.setFailure(cause);
        }
        return promise;
    }

    private void acquire0(final Promise<Channel> promise) {
        assert executor.inEventLoop();

        if (closed) {
            promise.setFailure(new IllegalStateException("FixedChannelPool was closed"));
            return;
        }
        if (acquiredChannelCount.get() < maxConnections) {
            assert acquiredChannelCount.get() >= 0;

            // We need to create a new promise as we need to ensure the AcquireListener runs in the correct
            // EventLoop
            Promise<Channel> p = executor.newPromise();
            AcquireListener l = new AcquireListener(promise);
            l.acquired();
            p.addListener(l);
            super.acquire(p);
        } else {
            if (pendingAcquireCount >= maxPendingAcquires) {
                tooManyOutstanding(promise);
            } else {
                AcquireTask task = new AcquireTask(promise);
                if (pendingAcquireQueue.offer(task)) {
                    ++pendingAcquireCount;

                    if (timeoutTask != null) {
                        task.timeoutFuture = executor.schedule(timeoutTask, acquireTimeoutNanos, TimeUnit.NANOSECONDS);
                    }
                } else {
                    tooManyOutstanding(promise);
                }
            }

            assert pendingAcquireCount > 0;
        }
    }

FixedChannelPool overrides SimpleChannelPool's acquire(final Promise<Channel> promise) method and hands every acquire0 call to a single EventLoop. SimpleChannelPool uses the thread selector to pick an EventLoop for each acquisition; by running everything on one EventLoop instead, FixedChannelPool makes the check acquiredChannelCount < maxConnections safe under high concurrency, so exactly the expected number of connections is created. For this reason acquire0 creates a new Promise on the FixedChannelPool's own EventLoop and then calls SimpleChannelPool's acquire(final Promise<Channel> promise) method to establish a new connection or take one from the connection pool.

acquiredChannelCount is incremented by 1 each time a connection is handed out and has not yet been returned to the pool, which ensures that maxConnections is not exceeded. When acquiredChannelCount >= maxConnections, FixedChannelPool decides, based on pendingAcquireCount and maxPendingAcquires, whether to queue the request (with a scheduled timeout task) or to reject it.
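
The core idea, serialising every acquire decision onto one thread so that the counter check and the increment cannot race, can be sketched with the JDK alone. In the sketch below a single-threaded ExecutorService plays the role of the EventLoop, integers stand in for channels, and timeouts are left out; BoundedAcquireDemo is a hypothetical name.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical sketch; integers stand in for channels and no timeouts are modelled.
class BoundedAcquireDemo {
    private final ExecutorService executor = Executors.newSingleThreadExecutor();
    private final int maxConnections;
    private final int maxPendingAcquires;
    // Touched only on the single executor thread, so plain fields are safe.
    private int acquiredCount;
    private int nextId = 1;
    private final Queue<CompletableFuture<Integer>> pending = new ArrayDeque<>();

    BoundedAcquireDemo(int maxConnections, int maxPendingAcquires) {
        this.maxConnections = maxConnections;
        this.maxPendingAcquires = maxPendingAcquires;
    }

    /** May be called from any thread; the decision itself runs on the executor thread. */
    CompletableFuture<Integer> acquire() {
        CompletableFuture<Integer> promise = new CompletableFuture<>();
        executor.execute(() -> {
            if (acquiredCount < maxConnections) {
                acquiredCount++;
                promise.complete(nextId++);
            } else if (pending.size() >= maxPendingAcquires) {
                promise.completeExceptionally(
                        new IllegalStateException("Too many outstanding acquire operations"));
            } else {
                pending.add(promise); // wait until a connection is released
            }
        });
        return promise;
    }

    /** Fires the given number of concurrent acquires and returns {granted, queued, rejected}. */
    static int[] run(int maxConnections, int maxPendingAcquires, int callers) throws Exception {
        BoundedAcquireDemo pool = new BoundedAcquireDemo(maxConnections, maxPendingAcquires);
        List<CompletableFuture<Integer>> results = Collections.synchronizedList(new ArrayList<>());
        Thread[] threads = new Thread[callers];
        for (int i = 0; i < callers; i++) {
            threads[i] = new Thread(() -> results.add(pool.acquire()));
            threads[i].start();
        }
        for (Thread t : threads) {
            t.join();
        }
        Runnable barrier = () -> { };
        pool.executor.submit(barrier).get(); // all earlier acquire tasks have run by now
        int granted = 0, queued = 0, rejected = 0;
        for (CompletableFuture<Integer> f : results) {
            if (f.isCompletedExceptionally()) {
                rejected++;
            } else if (f.isDone()) {
                granted++;
            } else {
                queued++;
            }
        }
        pool.executor.shutdown();
        return new int[] {granted, queued, rejected};
    }

    public static void main(String[] args) throws Exception {
        int[] r = run(2, 2, 10);
        // prints "granted=2 queued=2 rejected=6"
        System.out.println("granted=" + r[0] + " queued=" + r[1] + " rejected=" + r[2]);
    }
}
```

No matter how the ten caller threads interleave, exactly two acquires are granted, because the check and the increment always run one after another on the same thread.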

private abstract class TimeoutTask implements Runnable {
    @Override
    public final void run() {
        assert executor.inEventLoop();
        long nanoTime = System.nanoTime();
        for (;;) {
            AcquireTask task = pendingAcquireQueue.peek();
            // Check whether the task's deadline has passed
            if (task == null || nanoTime - task.expireNanoTime < 0) {
                break;
            }
            pendingAcquireQueue.remove();

            --pendingAcquireCount;
            onTimeout(task);
        }
    }

    public abstract void onTimeout(AcquireTask task);
}
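
The scan in TimeoutTask#run only ever looks at the head of the queue and stops at the first task that has not yet expired. A minimal JDK-only sketch of that loop, with a hypothetical Task class and an explicit "now" parameter so it can be tested deterministically:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical sketch of the head-of-queue expiry scan.
class ExpiryScanDemo {
    static final class Task {
        final String name;
        final long expireNanoTime;
        Task(String name, long expireNanoTime) {
            this.name = name;
            this.expireNanoTime = expireNanoTime;
        }
    }

    /** Removes and returns the names of all tasks at the head of the queue whose deadline has passed. */
    static List<String> expire(Queue<Task> pending, long nowNanos) {
        List<String> timedOut = new ArrayList<>();
        for (;;) {
            Task task = pending.peek();
            // Deadlines are compared via subtraction ("a - b < 0"), the form that stays
            // correct even if System.nanoTime() values wrap around numerically.
            if (task == null || nowNanos - task.expireNanoTime < 0) {
                break;
            }
            pending.remove();
            timedOut.add(task.name);
        }
        return timedOut;
    }

    public static void main(String[] args) {
        Queue<Task> pending = new ArrayDeque<>();
        pending.add(new Task("a", 100));
        pending.add(new Task("b", 200));
        pending.add(new Task("c", 300));
        System.out.println(expire(pending, 250)); // [a, b]
        System.out.println(pending.size());       // 1
    }
}
```

This works because tasks are queued in arrival order with the same timeout, so the head of the queue always has the earliest deadline.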

FixedChannelPool#release

FixedChannelPool also uses its EventLoop to keep release thread-safe under high concurrency. When a connection is released successfully, decrementAndRunTaskQueue() is executed: it decrements acquiredChannelCount, then takes pending tasks off the queue, cancels their scheduled timeouts, and completes their Promises with a connection from the pool.

private void decrementAndRunTaskQueue() {
    // We should never have a negative value.
    int currentCount = acquiredChannelCount.decrementAndGet();
    assert currentCount >= 0;
    runTaskQueue();
}

private void runTaskQueue() {
    while (acquiredChannelCount.get() < maxConnections) {
        AcquireTask task = pendingAcquireQueue.poll();
        if (task == null) {
            break;
        }

        // Cancel the timeout if one was scheduled
        ScheduledFuture<?> timeoutFuture = task.timeoutFuture;
        if (timeoutFuture != null) {
            timeoutFuture.cancel(false);
        }

        --pendingAcquireCount;
        task.acquired();

        super.acquire(task.promise);
    }

    // We should never have a negative value.
    assert pendingAcquireCount >= 0;
    assert acquiredChannelCount.get() >= 0;
}
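
The effect of this drain loop on waiting callers can be shown with plain data structures. The sketch below is JDK-only and synchronous; strings stand in for waiting callers, and DrainOnReleaseDemo is a hypothetical name.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical sketch: each release frees one slot and serves the longest-waiting caller.
class DrainOnReleaseDemo {
    private final int maxConnections;
    int acquiredCount;
    private final Queue<String> pendingCallers = new ArrayDeque<>();
    final List<String> served = new ArrayList<>();

    DrainOnReleaseDemo(int maxConnections, int acquiredCount) {
        this.maxConnections = maxConnections;
        this.acquiredCount = acquiredCount;
    }

    void enqueue(String caller) {
        pendingCallers.add(caller);
    }

    void release() {
        acquiredCount--;              // the released connection frees a slot
        while (acquiredCount < maxConnections) {
            String caller = pendingCallers.poll();
            if (caller == null) {
                break;                // nobody is waiting
            }
            acquiredCount++;          // the slot is taken again (plays the role of task.acquired())
            served.add(caller);
        }
    }

    public static void main(String[] args) {
        DrainOnReleaseDemo pool = new DrainOnReleaseDemo(2, 2);
        pool.enqueue("x");
        pool.enqueue("y");
        pool.release();
        System.out.println(pool.served + " acquired=" + pool.acquiredCount); // [x] acquired=2
        pool.release();
        System.out.println(pool.served + " acquired=" + pool.acquiredCount); // [x, y] acquired=2
        pool.release();
        System.out.println(pool.served + " acquired=" + pool.acquiredCount); // [x, y] acquired=1
    }
}
```

The acquired count only drops once the pending queue is empty; until then every released slot is immediately handed to a waiting caller.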

The most important thing is to understand how Netty connection pools are used and how they work, and how they solve the problem of creating exactly the specified number of connections under high concurrency. In the next article, I'll look at the differences between SOFABolt's connection pooling implementation and Netty's.