This time I will introduce how to use a goroutine pool in Go, using the throttler implementation as an example.

First of all, how to use it:

func ExampleThrottler() {
	var urls = []string{
		"http://www.golang.org/",
		"http://www.google.com/",
		"http://www.somestupidname.com/",
	}
	// Parameter 1: number of goroutines allowed to run at once
	// Parameter 2: number of tasks to execute
	t := New(2, len(urls))
	for _, url := range urls {
		// Start a goroutine for each URL
		go func(url string) {
			// Request the URL
			resp, err := http.Get(url)
			if err == nil {
				resp.Body.Close()
			}
			// Let Throttler know when the goroutine is complete,
			// so that it can dispatch a new worker
			t.Done(err)
		}(url)
		// Wait until a worker is available, then check how many goroutines have failed
		errorCount := t.Throttle()
		if errorCount > 0 {
			break
		}
	}
}

Although the author's README.md does not show it, we can also use it like this:

package main

import (
	"github.com/throttler"
	"fmt"
)

func main() {

	p := throttler.New(10, 5)

	go func() {
		// Defer first so Done is reported even if the work panics
		defer p.Done(nil)
		fmt.Println("hello world1")
	}()
	fmt.Println(1)
	p.Throttle()
	go func() {
		fmt.Println("hello world2")
		p.Done(nil)
	}()
	fmt.Println(2)
	p.Throttle()
	go func() {
		fmt.Println("hello world3")
		p.Done(nil)
	}()
	fmt.Println(3)
	p.Throttle()
	//fmt.Println(err + 3)
	go func() {
		fmt.Println("hello world4")
		p.Done(nil)
	}()
	fmt.Println(4)
	p.Throttle()
	//fmt.Println(err + 2)
	go func() {
		fmt.Println("hello world5")
		p.Done(nil)
	}()
	fmt.Println(5)
	p.Throttle()
}


That's how Throttler is used. It looks pretty simple, but how does it work?

First, let's look at the Throttler struct, which the rest of the operations are built around:

// Throttler stores all the information about the number of workers, the active workers and error information
type Throttler struct {
	maxWorkers    int32 // Maximum number of workers
	workerCount   int32 // Current number of workers
	batchingTotal int32
	batchSize     int32
	totalJobs     int32         // Total number of tasks
	jobsStarted   int32         // Number of tasks started (initial value is 0)
	jobsCompleted int32         // Number of completed tasks
	doneChan      chan struct{} // Receives one value each time a task finishes
	errsMutex     *sync.Mutex   // Protects errs against concurrent access
	errs          []error       // Collected errors, typically returned by business logic
	errorCount    int32         // Number of collected errors
}

The New function creates a goroutine pool:

func New(maxWorkers, totalJobs int) *Throttler {
	// Panic if maxWorkers is less than 1
	if maxWorkers < 1 {
		panic("maxWorkers has to be at least 1")
	}
	return &Throttler{
		// Maximum number of workers
		maxWorkers: int32(maxWorkers),
		batchSize:  1,
		// Total number of tasks
		totalJobs: int32(totalJobs),
		doneChan:  make(chan struct{}, totalJobs),
		errsMutex: &sync.Mutex{},
	}
}
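Because doneChan is buffered to totalJobs, a worker can signal completion even when nothing is receiving yet. The sketch below illustrates only that property of buffered channels; signalAll is a hypothetical helper and is not part of throttler.

```go
package main

import "fmt"

// signalAll is a hypothetical helper (not part of throttler). It sends n
// completion signals into a channel buffered to n while no receiver is
// running, then returns how many signals are queued.
func signalAll(n int) int {
	done := make(chan struct{}, n)
	for i := 0; i < n; i++ {
		done <- struct{}{} // does not block: the buffer has room for n values
	}
	return len(done)
}

func main() {
	fmt.Println(signalAll(3))
}
```

With an unbuffered channel the first send would block until a receiver arrived, so sizing the buffer to the number of jobs keeps finished goroutines from waiting on the caller.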

When a goroutine finishes its work, it calls Done:

func (t *Throttler) Done(err error) {
	if err != nil {
		t.errsMutex.Lock()
		t.errs = append(t.errs, err)
		// errorCount + 1
		atomic.AddInt32(&t.errorCount, 1)
		t.errsMutex.Unlock()
	}
	// Whenever a goroutine finishes, write an empty struct to the channel
	t.doneChan <- struct{}{}
}
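The error bookkeeping in Done combines a mutex for the slice with an atomic counter that can be read without taking the lock. Here is a minimal standalone sketch of that pattern; errCollector, record and collect are hypothetical names used for illustration, not throttler's API.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
	"sync/atomic"
)

// errCollector is a hypothetical type mirroring the error bookkeeping in Done.
type errCollector struct {
	mu    sync.Mutex
	errs  []error
	count int32
}

// record stores non-nil errors under the mutex and bumps the atomic counter.
func (c *errCollector) record(err error) {
	if err == nil {
		return
	}
	c.mu.Lock()
	c.errs = append(c.errs, err)
	atomic.AddInt32(&c.count, 1)
	c.mu.Unlock()
}

// collect runs n goroutines; every odd-numbered one reports an error.
// It returns the number of errors recorded.
func collect(n int) int {
	var c errCollector
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			if i%2 == 1 {
				c.record(errors.New("job failed"))
				return
			}
			c.record(nil)
		}(i)
	}
	wg.Wait()
	return int(atomic.LoadInt32(&c.count))
}

func main() {
	fmt.Println(collect(10)) // 5 of the 10 jobs report an error
}
```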

The Throttle function, which waits for goroutines to complete, is a little more complicated:

func (t *Throttler) Throttle() int {
	// If the total number of tasks is less than 1, return the number of errors
	if atomic.LoadInt32(&t.totalJobs) < 1 {
		return int(atomic.LoadInt32(&t.errorCount))
	}
	// jobsStarted + 1
	atomic.AddInt32(&t.jobsStarted, 1)
	// workerCount + 1
	atomic.AddInt32(&t.workerCount, 1)

	// Check whether the current number of workers equals maxWorkers;
	// if so, wait for one worker to finish before continuing
	if atomic.LoadInt32(&t.workerCount) == atomic.LoadInt32(&t.maxWorkers) {
		// jobsCompleted + 1
		atomic.AddInt32(&t.jobsCompleted, 1)
		// workerCount - 1
		atomic.AddInt32(&t.workerCount, -1)
		<-t.doneChan
	}

	// Check whether all of the jobs have been started, and if so, wait until
	// all jobs have been completed before continuing
	if atomic.LoadInt32(&t.jobsStarted) == atomic.LoadInt32(&t.totalJobs) {
		// Keep waiting while the number of completed jobs is less than the total
		for atomic.LoadInt32(&t.jobsCompleted) < atomic.LoadInt32(&t.totalJobs) {
			// jobsCompleted + 1
			atomic.AddInt32(&t.jobsCompleted, 1)
			<-t.doneChan
		}
	}

	return int(atomic.LoadInt32(&t.errorCount))
}

Let's walk through the process with a simple example.

Suppose the limit is 2 concurrent requests and there are 3 requests in total. The counters change as follows with each call to Throttle:

The first round

totalJobs = 3
jobsStarted = 1 workerCount = 1 jobsCompleted = 0 totalJobs = 3

The second round

jobsStarted = 2 workerCount = 2 jobsCompleted = 0 totalJobs = 3
workerCount equals maxWorkers, so Throttle waits for one job to finish:
jobsStarted = 2 workerCount = 1 jobsCompleted = 1 totalJobs = 3

The third round

jobsStarted = 3 workerCount = 2 jobsCompleted = 1 totalJobs = 3
workerCount equals maxWorkers again, so Throttle waits for one job to finish:
jobsStarted = 3 workerCount = 1 jobsCompleted = 2 totalJobs = 3
All jobs have been started, so Throttle waits for the remaining job:
jobsStarted = 3 workerCount = 1 jobsCompleted = 3 totalJobs = 3
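To check the rounds by hand, the counter updates in Throttle can be replayed without any real goroutines. The sketch below is a simplified, single-threaded model: state and simulate are hypothetical names, and it assumes every job has already signalled completion, so no receive ever blocks.

```go
package main

import "fmt"

// state is a hypothetical snapshot of the counters after one Throttle call.
type state struct {
	jobsStarted, workerCount, jobsCompleted int
}

// simulate replays the counter updates of Throttle for totalJobs calls,
// assuming every job has already signalled Done (the channel is pre-filled).
func simulate(maxWorkers, totalJobs int) []state {
	done := make(chan struct{}, totalJobs)
	for i := 0; i < totalJobs; i++ {
		done <- struct{}{}
	}
	var s state
	var out []state
	for call := 0; call < totalJobs; call++ {
		s.jobsStarted++
		s.workerCount++
		// Pool is full: account for one finished job and free its slot.
		if s.workerCount == maxWorkers {
			s.jobsCompleted++
			s.workerCount--
			<-done
		}
		// Last job started: drain the remaining completion signals.
		if s.jobsStarted == totalJobs {
			for s.jobsCompleted < totalJobs {
				s.jobsCompleted++
				<-done
			}
		}
		out = append(out, s)
	}
	return out
}

func main() {
	for i, s := range simulate(2, 3) {
		fmt.Printf("round %d: jobsStarted=%d workerCount=%d jobsCompleted=%d\n",
			i+1, s.jobsStarted, s.workerCount, s.jobsCompleted)
	}
}
```

Running it prints the counters as they stand when each Throttle call returns, which can be compared against the code of Throttle line by line.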

Overall, this implementation relies on the blocking behavior of channels, and it is very simple.
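The same blocking property can limit concurrency on its own. The sketch below uses a buffered channel as a counting semaphore, which is a different technique from the one throttler uses but rests on the same idea: a send blocks once the buffer is full. maxConcurrent is a hypothetical helper written for this illustration.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// maxConcurrent is a hypothetical helper: it runs jobs goroutines with at most
// limit of them active at once and returns the highest concurrency observed.
func maxConcurrent(limit, jobs int) int32 {
	sem := make(chan struct{}, limit)
	var wg sync.WaitGroup
	var active, peak int32
	for i := 0; i < jobs; i++ {
		sem <- struct{}{} // blocks while limit goroutines hold a slot
		wg.Add(1)
		go func() {
			defer wg.Done()
			defer func() { <-sem }() // release the slot when finished
			n := atomic.AddInt32(&active, 1)
			// Record the new peak if this goroutine raised it.
			for {
				p := atomic.LoadInt32(&peak)
				if n <= p || atomic.CompareAndSwapInt32(&peak, p, n) {
					break
				}
			}
			atomic.AddInt32(&active, -1)
		}()
	}
	wg.Wait()
	return atomic.LoadInt32(&peak)
}

func main() {
	fmt.Println(maxConcurrent(2, 10) <= 2)
}
```

The observed peak depends on scheduling, so the example only checks that it never exceeds the limit.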