ants is a widely used goroutine pool for Go. It caps the number of concurrently running goroutines, preventing an unbounded flood of goroutines from degrading program performance.

Why do we need a goroutine pool

A goroutine is a lightweight user-mode thread that the Go runtime multiplexes onto OS threads, which makes it a good fit for IO-intensive workloads. Goroutines are cheap: small stacks, low memory footprint, and low switching overhead. They are not free, however. Resources are finite, so the number of goroutines that can be created is also finite: creating too many of them makes the program consume excessive system resources and can affect its normal operation.

Next, we use two examples to illustrate the impact of creating too many goroutines.

  1. Spawning goroutines without limit: standard output is written by too many goroutines at once, and the program panics once the runtime's per-file concurrency limit is exceeded.

package main

import (
    "fmt"
    "math"
    "sync"
    "time"
)

func main() {
    var wg sync.WaitGroup
    for i := 0; i < math.MaxInt32; i++ {
        wg.Add(1)
        go func(i int) {
            defer wg.Done()
            fmt.Println(i)
            time.Sleep(time.Second * 100)
        }(i)
    }

    wg.Wait()
}

panic: too many concurrent operations on a single file or socket (max 1048575)

goroutine 1980639 [running]:
internal/poll.(*fdMutex).rwlock(0xc000094060, 0xc0ee9bd600, 0x1097c75)
        /usr/local/go/src/internal/poll/fd_mutex.go:147 +0x13f
  2. Spawning goroutines without limit: the program occupies too much memory and risks crashing when memory runs out.
package main

import (
    "math"
    "sync"
    "time"
)

func main() {
    var wg sync.WaitGroup
    for i := 0; i < math.MaxInt32; i++ {
        wg.Add(1)
        go func(i int) {
            defer wg.Done()
            time.Sleep(time.Second * 100)
        }(i)
    }

    wg.Wait()
}

PID    COMMAND      %CPU  TIME     #TH    #WQ  #PORTS  MEM    PURG   CMPRS
29954  ___go_build_ 241.1 03:33.68 18/5   0    27      24G+   0B     17G+
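
By contrast, here is a minimal sketch of the same kind of workload bounded by an ants pool, so that at most 1000 goroutines ever run at once (the import path assumes ants v2; adjust it to the version in your go.mod):

package main

import (
    "fmt"
    "sync"

    "github.com/panjf2000/ants/v2"
)

func main() {
    // At most 1000 workers run concurrently, no matter how many tasks are submitted.
    p, _ := ants.NewPool(1000)
    defer p.Release()

    var wg sync.WaitGroup
    for i := 0; i < 100000; i++ {
        i := i // capture the loop variable for the closure
        wg.Add(1)
        _ = p.Submit(func() {
            defer wg.Done()
            fmt.Println(i)
        })
    }
    wg.Wait()
}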

Quick start

ants is simple to use. It offers a package-level default pool defaultAntsPool (used via ants.Submit), custom pools created with NewPool(size, options...), and function-bound pools created with NewPoolWithFunc.

// Use the default pool
_ = ants.Submit(func() {
    fmt.Println("hello")
})

// Use a custom pool
p, _ := ants.NewPool(1000)
_ = p.Submit(func() {
    fmt.Println("hello")
})

// Use a pool bound to a single function
pf, _ := ants.NewPoolWithFunc(1000, func(i interface{}) {
    fmt.Println(i)
})
_ = pf.Invoke("hello")
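
Putting it together, a complete runnable sketch using NewPoolWithFunc (the sum payload is made up for illustration; the import path assumes ants v2):

package main

import (
    "fmt"
    "sync"
    "sync/atomic"

    "github.com/panjf2000/ants/v2"
)

func main() {
    var (
        wg  sync.WaitGroup
        sum int32
    )
    // Every worker in this pool runs the same function on each Invoke argument.
    p, _ := ants.NewPoolWithFunc(10, func(i interface{}) {
        defer wg.Done()
        atomic.AddInt32(&sum, i.(int32))
    })
    defer p.Release() // close the pool; its purge goroutine exits on the next tick

    for i := int32(1); i <= 100; i++ {
        wg.Add(1)
        _ = p.Invoke(i)
    }
    wg.Wait()
    fmt.Println("sum:", sum) // 5050
}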

How it works

In the ants pool, each worker corresponds to one goroutine. When a worker is initialized, it starts a goroutine with the go keyword; that goroutine then continuously listens on the worker's task channel and executes the tasks it receives, in the style of a producer-consumer pattern.

The pool manages its workers through workerArray, which stores idle workers ordered by the time they were put back and hands them out last-in-first-out. The pool also cleans up expired workers periodically: a background goroutine scans workerArray for workers that have been idle past the expiry duration and moves them into the workerCache sync.Pool to await GC.
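
Stripped of all bookkeeping, the core pattern looks roughly like this (a conceptual sketch, not ants' actual code):

package main

import "fmt"

// worker is a conceptual stand-in for ants' goWorker.
type worker struct {
    task chan func()
}

// run starts the consumer goroutine: it executes tasks from the task
// channel until it receives the nil termination signal.
func (w *worker) run() {
    go func() {
        for f := range w.task {
            if f == nil {
                return
            }
            f()
        }
    }()
}

func main() {
    w := &worker{task: make(chan func())}
    w.run()
    // The producer side: submitting a task is just a channel send.
    done := make(chan struct{})
    w.task <- func() { fmt.Println("hello"); close(done) }
    <-done
    w.task <- nil // terminate the worker
}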

Source code walkthrough

Next, we take ants.Submit(task func()) as an example to walk through the ants source code. The package-level ants.Submit() runs tasks on defaultAntsPool, the default pool.

const DefaultAntsPoolSize = math.MaxInt32

var (
    defaultAntsPool, _ = NewPool(DefaultAntsPoolSize)
)

// Submit submits a task to pool.
func Submit(task func()) error {
    return defaultAntsPool.Submit(task)
}

Pool structure

The structure of the ants pool is fairly simple; the annotated fields below are largely self-explanatory.

// Pool structure
type Pool struct {
    // The capacity of the pool
    capacity int32
    // The number of currently running workers
    running int32
    // lock protects workerArray
    lock sync.Locker
    // Stores the available (idle) workers
    workers workerArray
    // Whether the pool has been closed
    state int32
    // cond is used to wait for an idle worker
    cond *sync.Cond
    // Temporarily holds retired workers; if not reused, they are GCed
    workerCache sync.Pool
    // The number of goroutines currently blocked in Submit waiting for a worker
    blockingNum int
    // Pool configuration: expiry duration, preallocation, max blocking tasks, panic handler, logger, etc.
    options *Options
}

// Worker structure
type goWorker struct {
    // The pool that owns this worker
    pool *Pool
    // The task channel: capacity 1 on multi-core machines, 0 on single-core machines
    task chan func()
    // When the worker was put back into the queue; used to decide expiry
    recycleTime time.Time
}
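
The task channel capacity mentioned above comes from the package-level workerChanCap value, which is determined roughly like this in the ants source (paraphrased; comments mine):

// Paraphrased from the ants source.
var workerChanCap = func() int {
    // With GOMAXPROCS=1 an unbuffered channel is used: the sender yields
    // the only processor to the worker immediately, lowering latency.
    if runtime.GOMAXPROCS(0) == 1 {
        return 0
    }
    // With GOMAXPROCS>1 a buffered channel of size 1 is used, so Submit
    // does not have to wait for the worker goroutine to be scheduled.
    return 1
}()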

Creating the pool

The following operations are performed when creating an ants pool:

  1. Apply the personalized configuration from the supplied options;
  2. Set the New method of the workerCache sync.Pool, which creates workers on demand;
  3. Initialize workerArray. ants has two implementations: with preallocation, a loopQueue circular queue; without preallocation, a workerStack stack;
  4. Start a goroutine that periodically cleans up expired workers in workerArray.
func NewPool(size int, options ...Option) (*Pool, error) {
    // Apply the supplied options
    opts := loadOptions(options...)
    // Validate the pool capacity
    if size <= 0 {
        size = -1
    }
    // Set the expiry duration
    if expiry := opts.ExpiryDuration; expiry < 0 {
        return nil, ErrInvalidPoolExpiry
    } else if expiry == 0 {
        opts.ExpiryDuration = DefaultCleanIntervalTime
    }
    // Logger settings
    if opts.Logger == nil {
        opts.Logger = defaultLogger
    }
    // Create the Pool instance
    p := &Pool{
        capacity: int32(size),
        lock:     internal.NewSpinLock(),
        options:  opts,
    }
    // Tell the workerCache sync.Pool how to create a worker
    p.workerCache.New = func() interface{} {
        return &goWorker{
            pool: p,
            task: make(chan func(), workerChanCap),
        }
    }
    // Choose a workerArray implementation based on the preallocation flag
    if p.options.PreAlloc {
        if size == -1 {
            return nil, ErrInvalidPreAllocSize
        }
        p.workers = newWorkerArray(loopQueueType, size)
    } else {
        p.workers = newWorkerArray(stackType, 0)
    }

    p.cond = sync.NewCond(p.lock)

    // Start a goroutine that periodically cleans up expired workers
    go p.purgePeriodically()

    return p, nil
}
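
The options are supplied through functional option constructors. A configuration sketch (option names as in the ants API; the values and the newConfiguredPool helper are illustrative, and the import path assumes ants v2):

package main

import (
    "log"
    "time"

    "github.com/panjf2000/ants/v2"
)

// newConfiguredPool is an illustrative helper, not part of ants.
func newConfiguredPool() (*ants.Pool, error) {
    return ants.NewPool(
        1000,
        ants.WithPreAlloc(true),                 // preallocate: use the loopQueue implementation
        ants.WithExpiryDuration(10*time.Second), // purge workers idle for more than 10s
        ants.WithNonblocking(false),             // Submit blocks when the pool is full
        ants.WithMaxBlockingTasks(100),          // at most 100 callers may block in Submit
        ants.WithPanicHandler(func(v interface{}) {
            log.Printf("task panicked: %v", v)
        }),
    )
}

func main() {
    p, err := newConfiguredPool()
    if err != nil {
        log.Fatal(err)
    }
    defer p.Release()
}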

Submitting a task

Submitting a task first retrieves an idle worker from the pool, then sends the task into the worker's task channel, where the worker picks it up and executes it.

func (p *Pool) Submit(task func()) error {
    if p.IsClosed() {
        return ErrPoolClosed
    }
    var w *goWorker
    if w = p.retrieveWorker(); w == nil {
        return ErrPoolOverload
    }
    w.task <- task
    return nil
}

An idle worker is retrieved according to the following priority:

  1. Take an available worker from workerArray first.
  2. If the number of running workers has not reached the pool capacity, take a worker from workerCache and start it.
  3. If the pool is in non-blocking mode, give up and return a nil worker immediately.
  4. Otherwise, block and wait until a worker becomes available (see the sync.Cond sketch after the code below).
// retrieveWorker returns an available worker to run the tasks.
func (p *Pool) retrieveWorker() (w *goWorker) {
    spawnWorker := func() {
        w = p.workerCache.Get().(*goWorker)
        w.run()
    }

    p.lock.Lock()
    // 1. Try to take a worker from workerArray first
    w = p.workers.detach()
    if w != nil {
        p.lock.Unlock()
    } else if capacity := p.Cap(); capacity == -1 || capacity > p.Running() {
        // 2. The pool still has room: take a worker from workerCache and start it
        p.lock.Unlock()
        spawnWorker()
    } else {
        // 3. Non-blocking mode: return a nil worker immediately
        if p.options.Nonblocking {
            p.lock.Unlock()
            return
        }
        // 4. Blocking mode: wait for an available worker
    retry:
        if p.options.MaxBlockingTasks != 0 && p.blockingNum >= p.options.MaxBlockingTasks {
            p.lock.Unlock()
            return
        }

        p.blockingNum++
        p.cond.Wait()
        p.blockingNum--

        var nw int
        if nw = p.Running(); nw == 0 {
            p.lock.Unlock()
            if !p.IsClosed() {
                spawnWorker()
            }
            return
        }
        if w = p.workers.detach(); w == nil {
            if nw < capacity {
                p.lock.Unlock()
                spawnWorker()
                return
            }
            goto retry
        }

        p.lock.Unlock()
    }
    return
}
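
Step 4 hinges on sync.Cond: cond.Wait() atomically releases the pool lock while the caller sleeps and re-acquires it on wake-up, and a returning worker signals the condition. A standalone sketch of that pattern, independent of ants:

package main

import "sync"

var (
    mu    sync.Mutex
    cond  = sync.NewCond(&mu)
    ready bool
)

// waiter blocks until a resource is available, mirroring retrieveWorker's
// blocking branch. Wait must be called with the lock held; it releases the
// lock while parked and re-acquires it before returning.
func waiter() {
    mu.Lock()
    for !ready {
        cond.Wait()
    }
    ready = false // consume the resource
    mu.Unlock()
}

// signaler makes a resource available and wakes one waiter, mirroring
// revertWorker's cond.Signal().
func signaler() {
    mu.Lock()
    ready = true
    mu.Unlock()
    cond.Signal()
}

func main() {
    done := make(chan struct{})
    go func() { waiter(); close(done) }()
    signaler()
    <-done
}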

Creating a worker

When no worker can be retrieved from workerArray, the pool takes a new worker from workerCache and calls worker.run() to start it. On startup the worker launches a goroutine that listens on its task channel and executes incoming tasks. The worker keeps consuming until it receives the nil termination signal, or until it cannot be put back into the pool (for example, because the pool is full or closed); it then stops listening on the task channel and enters the cleanup path.

func (p *Pool) retrieveWorker() (w *goWorker) {
    spawnWorker := func() {
        // Take a worker from workerCache
        w = p.workerCache.Get().(*goWorker)
        // Start the worker
        w.run()
    }
    // ...
}

func (w *goWorker) run() {
    w.pool.incRunning() // number of running workers +1
    go func() {
        defer func() { // cleanup
            w.pool.decRunning()       // number of running workers -1
            w.pool.workerCache.Put(w) // return the worker to workerCache to await GC
            if p := recover(); p != nil { // handle a panic from a task
                if ph := w.pool.options.PanicHandler; ph != nil {
                    ph(p)
                } else {
                    w.pool.options.Logger.Printf("worker exits from a panic: %v\n", p)
                    var buf [4096]byte
                    n := runtime.Stack(buf[:], false)
                    w.pool.options.Logger.Printf("worker exits from panic: %s\n", string(buf[:n]))
                }
            }
            w.pool.cond.Signal() // wake up a goroutine waiting for a worker
        }()

        // Keep consuming tasks from the task channel
        for f := range w.task {
            if f == nil { // nil is the termination signal: exit the loop and clean up
                return
            }
            f() // execute the task
            if ok := w.pool.revertWorker(w); !ok { // put the worker back into workerArray
                return
            }
        }
    }()
}
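
revertWorker, which puts the worker back into workerArray after every task, is not shown above. Paraphrased from the ants source (comments mine), it looks roughly like this:

// revertWorker puts a worker back into the pool; it returns false if the
// worker should terminate instead (pool over capacity or closed).
func (p *Pool) revertWorker(worker *goWorker) bool {
    if capacity := p.Cap(); (capacity > 0 && p.Running() > capacity) || p.IsClosed() {
        return false
    }
    // Stamp the return time; expiry cleanup is based on it.
    worker.recycleTime = time.Now()
    p.lock.Lock()
    if p.IsClosed() {
        p.lock.Unlock()
        return false
    }
    if err := p.workers.insert(worker); err != nil {
        p.lock.Unlock()
        return false
    }
    // Wake one goroutine blocked in retrieveWorker.
    p.cond.Signal()
    p.lock.Unlock()
    return true
}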

Cleaning up the pool

Because workerArray stores workers sorted by the time they were put back (recycleTime), expired workers can be located efficiently with a binary search. Once found, each expired worker is sent the nil termination signal and its slot in the array is cleared, so the worker can be GCed.
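
A paraphrased sketch of the binary search in the stack-based workerArray (items are ordered by recycleTime, oldest first; comments mine):

// binarySearch returns the index of the newest expired worker in
// items[l..r], or -1 if none has expired. Everything at or below the
// returned index was put back at or before expiryTime.
func (wq *workerStack) binarySearch(l, r int, expiryTime time.Time) int {
    for l <= r {
        mid := (l + r) / 2
        if expiryTime.Before(wq.items[mid].recycleTime) {
            r = mid - 1
        } else {
            l = mid + 1
        }
    }
    return r
}

The cleanup loop itself looks like this: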

func (p *Pool) purgePeriodically() {
    heartbeat := time.NewTicker(p.options.ExpiryDuration)
    defer heartbeat.Stop()

    for range heartbeat.C {
        // If the pool has been closed, stop the cleanup loop
        if p.IsClosed() {
            break
        }

        // Collect the expired workers from workerArray
        p.lock.Lock()
        expiredWorkers := p.workers.retrieveExpiry(p.options.ExpiryDuration)
        p.lock.Unlock()

        // Terminate the expired workers
        for i := range expiredWorkers {
            // Send the nil termination signal; on receiving it the worker
            // returns itself to workerCache and awaits GC
            expiredWorkers[i].task <- nil
            // Clear the reference so the worker can be GCed
            expiredWorkers[i] = nil
        }

        // If no workers are left running, wake the goroutines blocked
        // waiting for a worker so they can spawn new ones
        if p.Running() == 0 {
            p.cond.Broadcast()
        }
    }
}

References

  • geektutu.com/post/hpg-co…
  • github.com/panjf2000/a…
  • strikefreedom.top/high-perfor…