Connection pooling’s mission!

Both thread pools and DB connection pools have one common feature: resource reuse. In a common scenario, we use a connection, and its lifecycle might look like this:

What does connection pooling need to do?

As the name suggests, the word “pool” in connection pool is very vivid. It controls the creation and destruction of connections uniformly by putting all connections into a “pool”. In contrast to the original lifetime, connection pools have the following features:

  • Create is not actually create, but select a free connection from the pool.
  • A destroy is not an actual destroy, but rather puts the connection in use back into the pool (logical close).
  • The actual creation and destruction is determined by the thread pool’s characteristic mechanism.

Therefore, when using connection pooling, our lifetime of using a connection will look like this:

Analysis of plan

Psychic – Portal: github.com/ainilili/ho… , DEMO for Java language implementation!

In advance, we need to light a cigarette to analyze the time and what a connection pool needs to do:

  • A container to hold connections is essential, which also supports adding and removing connections and is thread-safe.
  • We need to because to make logical adjustments to the destruction of the connection, we need to rewrite itcloseAs well asisClosedMethods.
  • We need a way to manage the pool of connections, such as reclaim free connections.

Connection pooling not only controls the Connection life cycle, but also includes features such as initial number of connections, maximum number of connections, minimum number of connections, maximum idle time, and wait time to get a Connection, which we also briefly support.

Target to clear, start work.

Connection pool container selection

To ensure thread safety, we can be targeted in JUC package under the avatar, we want the container to x, then x need not only meet the basic add and delete function, but also to provide for a timeout function, this is to ensure that when there is no free connection pool for a long time will not lead to a business block, fuse immediately. In addition, X needs to satisfy bidirectional operation, so that the connection pool can identify saturated idle connections and facilitate recycling operations.

Above all, LinkedBlockingDeque is the most appropriate choice, it USES InterruptibleReentrantLock to ensure thread safety, use Condition to do get block elements, in addition to support two-way operation.

In addition, we can split the connection pool into three types:

  • Workpool: Stores the connections that are being used.
  • Free pool: Stores free connections.
  • Reclaim pool: Connections that have been reclaimed (physically closed).

It is not necessary for the workpool and the reclamation pool to have bidirectional pairs, perhaps using a unidirectional queue or Set instead:

private LinkedBlockingQueue<HoneycombConnection> workQueue;
private LinkedBlockingDeque<HoneycombConnection> idleQueue;
private LinkedBlockingQueue<HoneycombConnection> freezeQueue;
Copy the code

The adornment of the Connection

The output of the Connection pool is Connection, which represents a DB Connection. After the upstream service uses it, it calls its close method to release the Connection. What we have to do is change its close logic without the caller being aware of it. Make it reusable!

So we need to decorate the Connection, which is very simple, but very tiring. Here we create a new class to implement the Connection interface, overriding all methods to implement a **” editable “** Connection, which we call a Connection decorator:

public class HoneycombConnectionDecorator implements Connection{

    protected Connection connection;
    
    protected HoneycombConnectionDecorator(Connection connection) {
        this.connection = connection; } Omit the 300 lines of code that implement the method... }Copy the code

After that, we need to create a new Connection of our own that inherits the decorator and overwrites the corresponding method:

public class HoneycombConnection extends HoneycombConnectionDecorator implements HoneycombConnectionSwitcher{
    @Override
    public void close(a) { do some things }

    @Override
    public boolean isClosed(a) throws SQLException { doSome things} }Copy the code

The DataSource rewriting

A DataSource is a specification defined by the JDK for better integration and management of data sources. To make our connection pool more functional, we need to implement a DataSource that can:

public class HoneycombWrapperDatasource implements DataSource{
    protectedHoneycombDatasourceConfig config; Omit the implementation of other methods...@Override
    public Connection getConnection(a) throws SQLException {
        return DriverManager.getConnection(config.getUrl(), config.getUser(), config.getPassword());
    }

    @Override
    public Connection getConnection(String username, String password) throws SQLException {
        returnDriverManager.getConnection(config.getUrl(), username, password); } omit implementations of other methods... }Copy the code

We completed the implementation of data source, but the way here for connection is physical creation, we need to meet the goal of pooling, needs to be rewritten HoneycombWrapperDatasource acquisition logic, the connection approach is to create a new class of the parent class method to rewrite:

public class HoneycombDataSource extends HoneycombWrapperDatasource{
    private HoneycombConnectionPool pool;
    @Override
    public Connection getConnection(a) throws SQLException Select * from pool;} }Copy the code

Feature extension

Under the current structure system, our connection pool emerging out of the prototype, but it is not enough, we need to can do the expansion of freedom in this structure, which controls the connection pool to connect more flexible, so we can introduce characteristics of this concept, it allows us to in its internal access to the connection pool, and the expansion of the connection pool to do a series of operations:

public abstract class AbstractFeature{
    public abstract void doing(HoneycombConnectionPool pool);
}
Copy the code

An AbstractFeature superclass needs to implement an doing method. We can implement control over a pool of connections within a method. A typical example is to recycle a pool of idle connections:

public class CleanerFeature extends AbstractFeature{
    @Override
    public void doing(HoneycombConnectionPool pool) }}Copy the code

To carry out the plan

After the above analysis, to complete a connection pool, the cooperation of these modules is required. The overall process is as follows:

Step 1: Set the data source properties

Before initializing the DataSource, we need to set the various properties, used here in HoneycombWrapperDatasource HoneycombDatasourceConfig to carry all the attributes:

public class HoneycombDatasourceConfig {

    //db url
    private String url;

    //db user
    private String user;

    //db password
    private String password;

    / / driver drive
    private String driver;

    // Initialize the number of connections. The default is 2
    private int initialPoolSize = 2;

    // Maximum number of connections. Default is 10
    private int maxPoolSize = 10;

    // Minimum number of connections. Default is 2
    private int minPoolSize = 2;
    
    // The maximum waiting duration is 60s by default
    private long maxWaitTime = 60 * 1000;

    // The maximum idle duration, which is 20 seconds by default
    private long maxIdleTime = 20 * 1000;
    
    // Feature list
    private List<AbstractFeature> features;
    
    public HoneycombDatasourceConfig(a) {
        features = new ArrayList<AbstractFeature>(5); } omit getter and setter....Copy the code

Step 2: Initialize the connection pool

After setting the properties, we need to initialize the connection pool in the init method of the HoneycombDataSource:

private void init(a) throws ClassNotFoundException, SQLException {
    // Block other thread initialization operations until initialization is complete
    if(initialStarted || ! (initialStarted = ! initialStarted)) {
        if(! initialFinished) {
            try {
                INITIAL_LOCK.lock();
                INITIAL_CONDITION.await();
            } catch (InterruptedException e) {
            } finally{ INITIAL_LOCK.unlock(); }}return;
    }
    
    // Config parameter verification
    config.assertSelf();
    
    Class.forName(getDriver());
    
    // Instantiate the thread pool
    pool = new HoneycombConnectionPool(config);
    
    // Initialize the minimum connection
    Integer index = null;
    for(int i = 0; i < config.getInitialPoolSize(); i ++) {
        if((index = pool.applyIndex()) ! =null) { pool.putLeisureConnection(createNativeConnection(pool), index); }}// Trigger feature
    pool.touchFeatures();
    
    // Complete initialization and wake up other blocks
    initialFinished = true;
    try {
        INITIAL_LOCK.lock();
        INITIAL_CONDITION.signalAll();
    }catch(Exception e) {
    }finally{ INITIAL_LOCK.unlock(); }}Copy the code

Step 3: Create an initial connection

In the init method, if initialPoolSize is greater than 0, a specified number of physical connections will be created into the connection pool, which is smaller than maxPoolSize:

public HoneycombConnection createNativeConnection(HoneycombConnectionPool pool) throws SQLException {
    return new HoneycombConnection(super.getConnection(), pool);
}
Copy the code

Once initialization is complete, the next step is to get the connection.

Step 4: Obtain it from the free pool

We previously divided the connection pool into three groups: the free pool, the working pool, and the reclaim pool.

We can obtain a connection through the HoneycombDataSource getConnection method. When we need to obtain a connection, we first consider whether there are free connections in the free pool. This avoids creating and activating new connections:

@Override
public Connection getConnection(a) throws SQLException {
    try {
    	// Initialize the connection pool
        init();
    } catch (ClassNotFoundException e) {
        throw new RuntimeException(e);
    }
    
    HoneycombConnection cn = null;
    Integer index = null;
    
    if(pool.assignable()) {
    	// The free pool can be allocated and is fetched from the free pool
        cn = pool.getIdleConnection();
    }else if(pool.actionable()) {
    	// The reclamation pool can be allocated and taken from the reclamation pool
        cn = pool.getFreezeConnection();
    }else if((index = pool.applyIndex()) ! =null) {
    	// If not, create a new physical connection
        cn = pool.putOccupiedConnection(createNativeConnection(pool), index);
    }
    
    if(cn == null) {
    	// If the connection cannot be obtained, block and wait for the free pool to connect
        cn = pool.getIdleConnection();
    }
    
    if(cn.isClosedActive()) {
    	// If the physical connection is closed, a new connection is obtained
        cn.setConnection(super.getConnection());
    }
    return cn;
}
Copy the code

Step 5: Obtain it from the reclamation pool

If the free pool is not allocated, then the supply of connections is in short supply. Perhaps some of the free connections have been reclaimed (physically closed). Before creating a new connection, we can check to see if there are any reclaimed connections in the reclamation pool.

else if(pool.actionable()) {
	// The reclamation pool can be allocated and taken from the reclamation pool
    cn = pool.getFreezeConnection();
}
Copy the code

Step 6: Create a new connection

If the reclamation pool cannot be allocated, check whether the number of connections in the connection pool has reached the maximum. If not, create new physical connections and directly add them to the working pool.

else if((index = pool.applyIndex()) ! =null) {
	// If the number of connections is not sufficient, create a new physical connection and add it to the working pool
    cn = pool.putOccupiedConnection(createNativeConnection(pool), index);
}
Copy the code

Step 7: Wait for the free pool to connect

If none of the above three conditions is met, then you have to wait for other connections to be released from the free pool:

if(cn == null) {
	// If the connection cannot be obtained, block and wait for the free pool to connect
    cn = pool.getIdleConnection();
}
Copy the code

The specific logic is encapsulated in the HoneycombConnectionPool getIdleConnection method:

public HoneycombConnection getIdleConnection(a) {
    try {
    	// Get the maximum wait time
        long waitTime = config.getMaxWaitTime();
        while(waitTime > 0) {
            long beginPollNanoTime = System.nanoTime();
            
            // Set timeout and block until other connections are released
            HoneycombConnection nc = idleQueue.poll(waitTime, TimeUnit.MILLISECONDS);
            if(nc ! =null) {
            	// State transition
                if(nc.isClosed() && nc.switchOccupied() && working(nc)) {
                    returnnc; }}long timeConsuming = (System.nanoTime() - beginPollNanoTime) / (1000 * 1000);
            
            // If the connection is obtained within the timeout period, but the state transition fails, the timeout period is refreshedwaitTime -= timeConsuming; }}catch (Exception e) {
        e.printStackTrace();
    }finally{}throw new RuntimeException("Get connection timeout");
}
Copy the code

Step 8: Activate the connection

Finally, determine if the connection has been physically closed. If so, we need to open a new connection to replace the one that has been reclaimed:

if(cn.isClosedActive()) {
	// If the physical connection is closed, a new connection is obtained
    cn.setConnection(super.getConnection());
}
Copy the code

Recovery of connection

If our business volume increases sharply in a certain period of time, there will be many connections that need to work at the same time. After a while, our business volume decreases, and the previously created connections are obviously saturated, we need to reclaim them. We can operate the connection pool through AbstractFeature entry.

For recycling, we implement the CleanerFeature:

public class CleanerFeature extends AbstractFeature{

    private Logger logger = LoggerFactory.getLogger(CleanerFeature.class);

    public CleanerFeature(boolean enable, long interval) {
       //enable Indicates whether to enable the function
       //interval Indicates the scanning interval
       super(enable, interval);
    }

    @Override
    public void doing(HoneycombConnectionPool pool) {
        LinkedBlockingDeque<HoneycombConnection> idleQueue = pool.getIdleQueue();
        Thread t = new Thread() {
            @Override
            public void run(a) {
                while(true) {
                    try {
                        // Recycle the scan interval
                    	Thread.sleep(interval);
                        
                    	// The free pool is locked during reclamation
                        synchronized (idleQueue) {
                            logger.debug("Cleaner Model To Start {}", idleQueue.size());
                            // Recycle operation
                            idleQueue.stream().filter(c -> { return c.idleTime() > pool.getConfig().getMaxIdleTime(); }).forEach(c -> {
                                try {
                                    if(! c.isClosedActive() && c.idle()) { c.closeActive(); pool.freeze(c); }}catch(SQLException e) { e.printStackTrace(); }}); logger.debug("Cleaner Model To Finished {}", idleQueue.size()); }}catch(Throwable e) {
                        logger.error("Cleaner happended error", e); }}}}; t.setDaemon(true); t.start(); }}Copy the code

The operation is very simple, to the free pool lock, scan all connections, release the leisure time more than the largest leisure time set connection, actually here want to know the current connection of free time to be clear at a glance, we were connected into the free pool to refresh his free time points, then the current free length is equal to the current time minus the free start time:

idleTime = nowTime - idleStartTime
Copy the code

Refresh idle start time when switching to idle:

 @Override
public boolean switchIdle(a) {
    return unsafe.compareAndSwapObject(this, statusOffset, status, ConnectionStatus.IDLE) && flushIdleStartTime();
}
Copy the code

Test the

The fastest way to experience results is to use them. Here’s a unit test:

static ThreadPoolExecutor tpe = new ThreadPoolExecutor(1000.1000.0, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
    
@Test
public void testConcurrence(a) throws SQLException, InterruptedException{
    long start = System.currentTimeMillis();
    HoneycombDataSource dataSource = new HoneycombDataSource();
    dataSource.setUrl("jdbc:mysql://localhost:3306/test? useUnicode=true&characterEncoding=UTF-8&useSSL=false&transformedBitIsBoolean=true&zeroDateTimeBehavior=CONVERT_TO_NULL&s erverTimezone=Asia/Shanghai");
    dataSource.setUser("root");
    dataSource.setPassword("root");
    dataSource.setDriver("com.mysql.cj.jdbc.Driver");
    dataSource.setMaxPoolSize(50);
    dataSource.setInitialPoolSize(10);
    dataSource.setMinPoolSize(10);
    dataSource.setMaxWaitTime(60 * 1000);
    dataSource.setMaxIdleTime(10 * 1000);
    dataSource.addFeature(new CleanerFeature(true.5 * 1000));
    
    test(dataSource, 10000);
    System.out.println(System.currentTimeMillis() - start + " ms");
}

public static void test(DataSource dataSource, int count) throws SQLException, InterruptedException {
    CountDownLatch cdl = new CountDownLatch(count);
    for(int i = 0; i < count; i ++) {
        tpe.execute(() -> {
            try {
                HoneycombConnection connection = (HoneycombConnection) dataSource.getConnection();
                Statement s = connection.createStatement();
                s.executeQuery("select * from test limit 1");
                connection.close();
            }catch(Exception e) {
            }finally{ cdl.countDown(); }}); } cdl.await(); tpe.shutdown(); }Copy the code

PC configuration: Intel(R) Core(TM) i5-8300H CPU @ 2.30ghz 2.30ghz 4-core 8G 512SSD

10000 queries, time:

938 ms
Copy the code

Conclusion: Call the Portal again: github.com/ainilili/ho…