Setting up a connection is expensive: TCP’s three-way handshake alone makes it costly. So a program that talks to a particular service repeatedly should maintain a pool of connections that can be reused.

The most basic requirement for a connection pool is thread safety, especially in a language like Go, where goroutines make concurrent access the norm.

Implementing a simple connection pool

type Pool struct {
	m       sync.Mutex                // guards closed when multiple goroutines access the pool
	res     chan io.Closer            // chan that stores the pooled connections
	factory func() (io.Closer, error) // factory function used to create a new connection
	closed  bool                      // flag marking the pool as closed
}

For this simple connection pool, we use a chan to store the connections. The constructor is also straightforward:

func New(fn func() (io.Closer, error), size uint) (*Pool, error) {
	if size <= 0 {
		return nil, errors.New("Size is too small.")
	}
	return &Pool{
		factory: fn,
		res:     make(chan io.Closer, size),
	}, nil
}

To create a pool, you only need to provide the factory function and the pool size.
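
As a quick usage sketch (the TCP address is a placeholder, and Acquire and Release are the methods defined below), a pool of raw network connections might be created like this:

pool, err := New(func() (io.Closer, error) {
	// net.Conn satisfies io.Closer, so a plain TCP dial works as a factory
	return net.Dial("tcp", "127.0.0.1:6379")
}, 5)
if err != nil {
	log.Fatal(err)
}

conn, err := pool.Acquire()
if err != nil {
	log.Fatal(err)
}
// ... use the connection ...
pool.Release(conn)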

Getting a connection

So how do we get a resource out of the pool? Since the internal storage is a chan, a simple select is already thread-safe:

// Acquire gets a resource from the pool
func (p *Pool) Acquire() (io.Closer, error) {
	select {
	case r, ok := <-p.res:
		log.Println("Acquire: shared resource")
		if !ok {
			return nil, ErrPoolClosed
		}
		return r, nil
	default:
		log.Println("Acquire: newly created resource")
		return p.factory()
	}
}

We first try to take a connection from the pool’s res chan; if none is available, we fall back to the factory function we prepared and construct a new one. When we do receive from res, we use ok to check whether the pool has already been closed; if it has, we return the prepared ErrPoolClosed error.
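
The ErrPoolClosed mentioned here is the error value prepared in advance; a one-line definition along these lines is assumed:

// ErrPoolClosed is returned by Acquire when the pool has already been closed.
var ErrPoolClosed = errors.New("Pool has been closed.")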

Closing the pool

Speaking of closing: how do we shut the connection pool down?

// Close shuts down the resource pool and releases its resources
func (p *Pool) Close() {
	p.m.Lock()
	defer p.m.Unlock()

	if p.closed {
		return
	}

	p.closed = true

	// Close the channel first so nothing more can be written to it
	close(p.res)

	// Then close every resource remaining in the channel
	for r := range p.res {
		r.Close()
	}
}

We take p.m.Lock() here because we read and write the closed field inside the struct. After setting the flag, we close the res chan, which prevents Acquire from receiving new connections; then we range over res and Close every connection still in it.

Releasing a connection

To release a connection, we first have to be sure the pool has not been closed: sending a connection on a closed res chan triggers a panic.

func (p *Pool) Release(r io.Closer) {
	// Lock so that this operation cannot race with Close
	p.m.Lock()
	defer p.m.Unlock()

	// If the pool is already closed, just close the resource
	if p.closed {
		r.Close()
		return
	}

	select {
	case p.res <- r:
		log.Println("Resource released back into the pool.")
	default:
		log.Println("Pool is full; closing this resource.")
		r.Close()
	}
}
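
Why that closed check matters is easy to demonstrate: sending on a closed channel always panics, as this three-line sketch shows:

ch := make(chan struct{}, 1)
close(ch)
ch <- struct{}{} // panic: send on closed channel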

That gives us a simple, thread-safe implementation of a connection pool. It works, but it has a few small drawbacks:

  1. There is no limit on the maximum number of connections. If the pool is empty, we simply create a new connection by default, so under high concurrency it is easy to hit a too many connections error (especially with MySQL). See the sketch after this list.
  2. At the same time, we do want control over the connection count, but not a single rigid number: we would like to keep a certain number of idle connections around (idleNum), while also capping the maximum number of connections (maxNum).
  3. The first problem is about too much concurrency; what about too little? If we create a connection, return it, and then never use it again for a long time, that connection may have been made hours ago. There is no way to guarantee that a long-idle connection is still usable, so the next connection we take from the pool may already be dead.
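
Here is a sketch of the first drawback, assuming the Pool from above: a burst of goroutines finds the res chan empty, and the factory dials far more connections than the pool can ever hold.

var wg sync.WaitGroup
for i := 0; i < 1000; i++ {
	wg.Add(1)
	go func() {
		defer wg.Done()
		c, err := pool.Acquire() // pool empty: factory() runs with no upper bound
		if err != nil {
			return
		}
		defer pool.Release(c) // only `size` connections fit back; the rest are closed
		// ... use c ...
	}()
}
wg.Wait()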

Let’s look at two connection pools that are already in everyday use, the SQL connection pool in the standard library and a Redis client’s connection pool, and see how they solve these problems.

The SQL connection pool in the Go standard library

Go’s SQL connection pooling is implemented in the standard library, in database/sql/sql.go. When we run:

db, err := sql.Open("mysql", "xxxx")

a connection pool is created. Let’s look at the structure of the DB that is returned:

type DB struct {
	waitDuration int64 // Total time waited for new connections.
	mu           sync.Mutex // protects following fields
	freeConn     []*driverConn
	connRequests map[uint64]chan connRequest
	nextRequest  uint64 // Next key to use in connRequests.
	numOpen      int    // number of opened and pending open connections
	// Used to signal the need for new connections
	// a goroutine running connectionOpener() reads on this chan and
	// maybeOpenNewConnections sends on the chan (one send per needed connection)
	// It is closed during db.Close(). The close tells the connectionOpener
	// goroutine to exit.
	openerCh          chan struct{}
	closed            bool
	maxIdle           int                    // zero means defaultMaxIdleConns; negative means 0
	maxOpen           int                    // <= 0 means unlimited
	maxLifetime       time.Duration          // maximum amount of time a connection may be reused
	cleanerCh         chan struct{}
	waitCount         int64 // Total number of connections waited for.
	maxIdleClosed     int64 // Total number of connections closed due to idle.
	maxLifetimeClosed int64 // Total number of connections closed due to max lifetime limit.
}

Some fields that don’t concern us for now are omitted above. We can see that freeConn, the structure the DB pool uses to store its connections, is not a chan but a slice of connections: []*driverConn. We can also see fields such as maxIdle that control the number of idle connections. It is worth noting that the initialization function Open does not create a single database connection. So where are new connections made? Tracing back from the Query method, we arrive at this function: func (db *DB) conn(ctx context.Context, strategy connReuseStrategy) (*driverConn, error). Getting a connection from the pool starts here:
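
These fields correspond to the knobs that database/sql exposes publicly. For example (the DSN here is a placeholder):

db, err := sql.Open("mysql", "user:pass@tcp(127.0.0.1:3306)/test")
if err != nil {
	log.Fatal(err)
}
db.SetMaxOpenConns(100)          // caps numOpen (maxOpen)
db.SetMaxIdleConns(10)           // caps len(freeConn) (maxIdle)
db.SetConnMaxLifetime(time.Hour) // sets maxLifetime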

Getting a connection

// conn returns a newly-opened or cached *driverConn.
func (db *DB) conn(ctx context.Context, strategy connReuseStrategy) (*driverConn, error) {
    // Check whether db is shut down.
	db.mu.Lock()
	if db.closed {
		db.mu.Unlock()
		return nil, errDBClosed
	}
	// Check whether the context has been cancelled due to timeout or other reasons.
	select {
	default:
	case <-ctx.Done():
		db.mu.Unlock()
		return nil, ctx.Err()
	}
	lifetime := db.maxLifetime

	// If the freeConn slice has idle connections, pop the first one.
	// Note that because this is a slice operation, the lock must be held
	// and is released only after the connection has been taken. We also
	// check whether the returned connection has expired.
	numFree := len(db.freeConn)
	if strategy == cachedOrNewConn && numFree > 0 {
		conn := db.freeConn[0]
		copy(db.freeConn, db.freeConn[1:])
		db.freeConn = db.freeConn[:numFree-1]
		conn.inUse = true
		db.mu.Unlock()
		if conn.expired(lifetime) {
			conn.Close()
			return nil, driver.ErrBadConn
		}
		// Lock around reading lastErr to ensure the session resetter finished.
		conn.Lock()
		err := conn.lastErr
		conn.Unlock()
		if err == driver.ErrBadConn {
			conn.Close()
			return nil, driver.ErrBadConn
		}
		return conn, nil
	}

	// This is where waiting for a connection happens: there is no idle connection and the open limit has been reached, so a request is created and queued.
	if db.maxOpen > 0 && db.numOpen >= db.maxOpen {
		// The following is like taking a number ticket: insert a request into the connRequests map.
		// Once the ticket is taken, we release the lock and wait for our number to be called.
		req := make(chan connRequest, 1)
		reqKey := db.nextRequestKeyLocked()
		db.connRequests[reqKey] = req
		db.waitCount++
		db.mu.Unlock()

		waitStart := time.Now()

		// Timeout the connection request with the context.
		select {
		case <-ctx.Done():
			// When the context is cancelled, remember to take your ticket back out of the connRequests map.
			db.mu.Lock()
			delete(db.connRequests, reqKey)
			db.mu.Unlock()

			atomic.AddInt64(&db.waitDuration, int64(time.Since(waitStart)))

			select {
			default:
			case ret, ok := <-req:
				// Note this case: the context has been cancelled, but we had
				// already taken a ticket, so a connection may have been sent
				// to us in the meantime. If so, remember to put it back!
				if ok && ret.conn != nil {
					db.putConn(ret.conn, ret.err, false)
				}
			}
			return nil, ctx.Err()
		case ret, ok := <-req:
            // What follows runs once a connection has been obtained: check its state, since it may have expired, and so on.
			atomic.AddInt64(&db.waitDuration, int64(time.Since(waitStart)))

			if !ok {
				return nil, errDBClosed
			}
			if ret.err == nil && ret.conn.expired(lifetime) {
				ret.conn.Close()
				return nil, driver.ErrBadConn
			}
			if ret.conn == nil {
				return nil, ret.err
			}
			ret.conn.Lock()
			err := ret.conn.lastErr
			ret.conn.Unlock()
			if err == driver.ErrBadConn {
				ret.conn.Close()
				return nil, driver.ErrBadConn
			}
			return ret.conn, ret.err
		}
	}
	// If none of the above constraints applied, this is where a brand-new connection is created.
	db.numOpen++ // optimistically
	db.mu.Unlock()
	ci, err := db.connector.Connect(ctx)
	if err != nil {
		db.mu.Lock()
		db.numOpen-- // correct for earlier optimism
		db.maybeOpenNewConnections()
		db.mu.Unlock()
		return nil, err
	}
	db.mu.Lock()
	dc := &driverConn{
		db:        db,
		createdAt: nowFunc(),
		ci:        ci,
		inUse:     true,
	}
	db.addDepLocked(dc, dc)
	db.mu.Unlock()
	return dc, nil
}

In short, the DB struct stores its connections in a slice and uses the connRequests map as a queue-like mechanism for callers waiting on a connection. It also puts real care into judging connection health. Now that there is a queue, what happens when a connection is returned?
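
The connRequests trick is worth isolating: every waiter registers a buffered chan of capacity 1, and whoever returns a connection completes exactly one waiter. Here is a stripped-down sketch of the pattern (the names are mine, not the standard library’s; it assumes the sync and io packages are imported):

// handoff is a stripped-down version of the connRequests idea.
type handoff struct {
	mu      sync.Mutex
	waiters map[uint64]chan io.Closer
	next    uint64
}

// wait registers a one-shot channel and returns it along with its ticket key,
// so the caller can also deregister on timeout.
func (h *handoff) wait() (uint64, chan io.Closer) {
	h.mu.Lock()
	defer h.mu.Unlock()
	key := h.next
	h.next++
	ch := make(chan io.Closer, 1) // capacity 1: the sender never blocks
	h.waiters[key] = ch
	return key, ch
}

// give hands a connection to an arbitrary waiter, mirroring the random map
// pick in putConnDBLocked. It returns false if nobody is waiting.
func (h *handoff) give(c io.Closer) bool {
	h.mu.Lock()
	defer h.mu.Unlock()
	for key, ch := range h.waiters {
		delete(h.waiters, key)
		ch <- c // cannot block thanks to the buffer
		return true
	}
	return false
}

A real implementation would also initialize the waiters map and handle pool shutdown, which the standard library does with errDBClosed.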

Releasing a connection

When a connection is released, it goes through func (db *DB) putConnDBLocked(dc *driverConn, err error) bool. As its comment states, the main purpose of this method is:

Satisfy a connRequest or put the driverConn in the idle pool and return true or return false.

Let’s focus on the key lines:

	...
	// If the maximum open count has been exceeded, there is no need to return this connection to the pool
	if db.maxOpen > 0 && db.numOpen > db.maxOpen {
		return false
	}
	// Here is the key point: pull an arbitrary waiting request out of the connRequests map and hand the connection straight to it. No need to return it to the pool.
	if c := len(db.connRequests); c > 0 {
		var req chan connRequest
		var reqKey uint64
		for reqKey, req = range db.connRequests {
			break
		}
		delete(db.connRequests, reqKey) // Delete the queued request.
		if err == nil {
			dc.inUse = true
		}
        // Send the connection to the queued requester.
		req <- connRequest{
			conn: dc,
			err:  err,
		}
		return true
	} else if err == nil && !db.closed {
        // If nobody is queued, check whether the idle limit has been reached; if not, return the connection to freeConn.
		if db.maxIdleConnsLocked() > len(db.freeConn) {
			db.freeConn = append(db.freeConn, dc)
			db.startCleanerLocked()
			return true
		}
		db.maxIdleClosed++
	}
...

We can see that when a connection is returned, if a request is waiting in the queue, the connection is not put back into the pool; it is handed directly to whoever is waiting.

This basically solves the small problems raised earlier: the number of connections can no longer grow without bound, a minimum number of pooled connections is maintained, and connection health is checked.

It is worth noting that, as befits standard library code, both the comments and the code are excellent, and reading it is genuinely refreshing.

go-redis: the Redis client implemented in Go

The Go Redis client also implements connection pooling, and its approach is quite clever; there is a lot to learn from it. It can be a little confusing at first, though, because the code has few comments. The code lives at https://github.com/go-redis/redis/blob/master/internal/pool/pool.go.

Its connection pool structure is as follows:

type ConnPool struct {
	...
	queue chan struct{}

	connsMu      sync.Mutex
	conns        []*Conn
	idleConns    []*Conn
	poolSize     int
	idleConnsLen int

	stats Stats

	_closed  uint32 // atomic
	closedCh chan struct{}
}

We can see that the connections are still stored in a slice, but pay attention to the variables queue, conns, and idleConns; we will come back to them. Also worth noting: both conns and idleConns have the type []*Conn. That raises a question:

Where do the connections actually live?

Creating the connection pool

Let’s start with the function that creates a new connection pool:

func NewConnPool(opt *Options) *ConnPool {
	...
	p.checkMinIdleConns()
	if opt.IdleTimeout > 0 && opt.IdleCheckFrequency > 0 {
		go p.reaper(opt.IdleCheckFrequency)
	}
	...
}

This initialization function differs from the previous two in two ways:

  1. The checkMinIdleConns method fills the pool with idle connections when it is initialized.
  2. go p.reaper(opt.IdleCheckFrequency) starts a goroutine that periodically reaps stale connections from the pool; a sketch of the idea follows this list.
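
What such a reaper might look like is easy to picture. A minimal sketch, under the assumption that it sweeps idleConns with the pool’s own isStaleConn and CloseConn helpers (this is not go-redis’s actual implementation):

func (p *ConnPool) reaperSketch(frequency time.Duration) {
	ticker := time.NewTicker(frequency)
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			// Move stale connections out under the lock, then close them
			// outside the lock to avoid holding connsMu during I/O.
			p.connsMu.Lock()
			var stale []*Conn
			kept := p.idleConns[:0]
			for _, cn := range p.idleConns {
				if p.isStaleConn(cn) {
					stale = append(stale, cn)
				} else {
					kept = append(kept, cn)
				}
			}
			p.idleConns = kept
			p.idleConnsLen = len(kept)
			p.connsMu.Unlock()
			for _, cn := range stale {
				_ = p.CloseConn(cn)
			}
		case <-p.closedCh:
			return
		}
	}
}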

Getting a connection

func (p *ConnPool) Get(ctx context.Context) (*Conn, error) {
	if p.closed() {
		return nil, ErrClosed
	}
	
    // Unlike the SQL Get flow above, which first checks for an idle connection
    // and only queues when none is available, here we queue for a token right
    // away. The queueing function is analyzed later.
	err := p.waitTurn(ctx)
	if err != nil {
		return nil, err
	}
	// If there is no error, the queue is already waiting for you. The next step is the acquisition process.
	for {
		p.connsMu.Lock()
        // Get a free connection from the free connection.
		cn := p.popIdle()
		p.connsMu.Unlock()

		if cn == nil {
            // Break out of the loop when there are no idle connections.
			break
		}
		// Check whether the connection is stale; if it is, close it and keep fetching.
		if p.isStaleConn(cn) {
			_ = p.CloseConn(cn)
			continue
		}

		atomic.AddUint32(&p.stats.Hits, 1)
		return cn, nil
	}

	atomic.AddUint32(&p.stats.Misses, 1)
	
    // If there are no free connections, create a new connection.
	newcn, err := p.newConn(ctx, true)
	if err != nil {
        // Return the token.
		p.freeTurn()
		return nil, err
	}

	return newcn, nil
}

Now we can try to answer the first question: where do the connections actually live? From the Get flow, it looks as though the free connections live in idleConns. But is that really true? We will see in a moment.

In the meantime, my understanding of the two queueing models is:

  1. SQL’s queueing means: after I ask the pool for a connection, I leave my ticket number with the pool. As soon as a connection frees up, the pool calls my number and hands me that connection directly. If nobody returns a connection, the pool never calls the next number.
  2. Redis’s queueing means: I don’t queue for a connection, I queue for a token. I wait in line until the pool gives me a token, and then I go to the warehouse myself, either picking up an idle connection or creating a new one. When I’m done, I must return the token as well as the connection. And if creating my own connection fails, I still have to hand the token back, otherwise the pool loses a token and the effective maximum number of connections shrinks.

And:

func (p *ConnPool) freeTurn() {
	<-p.queue
}

func (p *ConnPool) waitTurn(ctx context.Context) error {
	...
	case p.queue <- struct{}{}:
		return nil
	...
}

It is this chan, queue, that maintains the token count.
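
In other words, queue is simply a counting semaphore built from a buffered channel: the capacity is the number of tokens. A minimal standalone sketch of the pattern:

sem := make(chan struct{}, 3) // capacity 3: at most 3 token holders at once

acquire := func() { sem <- struct{}{} } // blocks once all 3 tokens are taken
release := func() { <-sem }             // frees a token for the next waiter

acquire()
// ... do work while holding a token ...
release()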

So what is conns for? Let’s look at the function that creates a new connection:

Creating a new connection

func (p *ConnPool) newConn(ctx context.Context, pooled bool) (*Conn, error) {
	cn, err := p.dialConn(ctx, pooled)
	if err != nil {
		return nil, err
	}

	p.connsMu.Lock()
	p.conns = append(p.conns, cn)
	if pooled {
		// The pool is already full; mark the connection so it is removed later.
		if p.poolSize >= p.opt.PoolSize {
			cn.pooled = false
		} else {
			p.poolSize++
		}
	}
	p.connsMu.Unlock()
	return cn, nil
}

The basic logic: a newly created connection is not put into idleConns; it goes into conns first. Then we check whether the pool is already full. If it is, the connection is marked (pooled = false) so that it can be deleted later. And when is “later”? When the connection is returned.

Returning a connection

func (p *ConnPool) Put(cn *Conn) {
	if cn.rd.Buffered() > 0 {
		internal.Logger.Printf("Conn has unread data")
		p.Remove(cn, BadConnError{})
		return
	}
	// If the connection was marked not pooled (the flag set earlier), it is
	// removed instead of being returned; Remove also performs the freeTurn.
	if !cn.pooled {
		p.Remove(cn, nil)
		return
	}

	p.connsMu.Lock()
	p.idleConns = append(p.idleConns, cn)
	p.idleConnsLen++
	p.connsMu.Unlock()
    // And here the token is clearly handed back.
	p.freeTurn()
}

The answer: all connections actually live in the conns slice. When a connection is idle, a pointer to it is appended to idleConns as well!

So the return flow really just checks whether the connection being returned is an oversold one. If it is, it is not pooled but deleted directly. Otherwise the connection (that is, a pointer to it) is appended to idleConns as well.

Wait, something doesn’t seem right. Let’s walk through the acquisition flow once more:

  1. First, waitTurn to get a token. The number of tokens is determined by the size of the pool’s queue.
  2. With a token in hand, take an idle connection from idleConns. If there is none, newConn one yourself and record it in conns.
  3. When finished, call Put to return the connection: that is, append the connection’s pointer from conns into idleConns. Put also checks whether newConn marked the connection as oversold; if so, it is not moved into idleConns.

I puzzled over this for a long time: since a token is always required to get a connection, and the token count is fixed, how can the pool ever oversell? Digging through the source, my answer is:

Although Get calls the private newConn under token control, and therefore cannot oversell by itself, newConn accepts the parameter pooled bool. If other callers invoke it with true, poolSize can keep growing past the limit.

In general, connection-count control in the Redis pool is done by queue, the chan of tokens.

Conclusion

As we saw above, the most basic guarantee a connection pool provides is thread-safe access to connections, but the implementations build many additional features on top of that, each from a different angle. It was all very interesting. And whether the storage structure is a chan or a slice, a pool can be implemented well; it’s just that if you store connections in a slice, as database/sql and go-redis do, you have to maintain an extra structure to get the effect of queueing.