Translator's note: for ease of understanding, I have rendered "worker pools" as "thread pools" in much of this article.

What is a buffered channel

All the channels discussed earlier were unbuffered, so a send or receive on them blocks until the other side is ready. It is also possible to create a buffered channel: a send to it blocks only when its buffer is full, and a receive from it blocks only when its buffer is empty.

Creating a buffered channel requires an additional argument, capacity, which specifies the size of the buffer:

ch := make(chan type, capacity)

For the channel to have a buffer, capacity must be greater than 0. With a capacity of 0, you get the unbuffered channel we learned about earlier.
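The blocking difference can be observed directly. The sketch below uses a select statement with a default case, which attempts a send without blocking. trySend is an illustrative helper name, not part of the original article.

```go
package main

import "fmt"

// trySend attempts a send without blocking and reports whether the value
// was accepted. select falls through to default when the send cannot proceed.
func trySend(ch chan int, v int) bool {
	select {
	case ch <- v:
		return true
	default:
		return false
	}
}

func main() {
	unbuffered := make(chan int)  // capacity 0
	buffered := make(chan int, 1) // capacity 1

	fmt.Println("unbuffered cap:", cap(unbuffered))
	fmt.Println("buffered cap:", cap(buffered))

	// No receiver is waiting, so the unbuffered send cannot complete.
	fmt.Println("unbuffered send accepted:", trySend(unbuffered, 1))
	// The buffer has a free slot, so this send completes immediately.
	fmt.Println("buffered send accepted:", trySend(buffered, 1))
	// The buffer is now full, so another send would have to block.
	fmt.Println("second buffered send accepted:", trySend(buffered, 2))
	fmt.Println("buffered len:", len(buffered))
}
```

The program should print `false` for the unbuffered send, `true` for the first buffered send, and `false` once the single slot is taken.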

Example

package main

import (  
    "fmt"
)


func main() {  
    ch := make(chan string, 2)
    ch <- "naveen"
    ch <- "paul"
    fmt.Println(<- ch)
    fmt.Println(<- ch)
}

In the example above, we create a channel with capacity 2, so neither send blocks: both strings fit in the buffer even though nothing has read them yet. The two fmt.Println calls then read the strings back in the order they were written. The program output is as follows:

naveen  
paul

Another example

To understand buffered channels better, let’s look at an example in which a concurrent goroutine writes to the channel and the main goroutine reads from it.

package main

import (  
    "fmt"
    "time"
)

func write(ch chan int) {  
    for i := 0; i < 5; i++ {
        ch <- i
        fmt.Println("successfully wrote", i, "to ch")
    }
    close(ch)
}
func main() {  
    ch := make(chan int, 2)
    go write(ch)
    time.Sleep(2 * time.Second)
    for v := range ch {
        fmt.Println("read value", v,"from ch")
        time.Sleep(2 * time.Second)

    }
}

In the code above, we create a buffered channel of capacity 2, pass it to the write function, which runs in a new goroutine, and then sleep for 2 seconds. Meanwhile, write uses a for loop to send the integers 0 to 4 to ch. Since the capacity is 2, the values 0 and 1 are written immediately, after which the goroutine blocks until at least one value has been read. So the program immediately prints the following two lines:

successfully wrote 0 to ch  
successfully wrote 1 to ch

After sleeping for 2 seconds, main enters the for range loop, reads the first value, and then sleeps for another 2 seconds. Reading 0 frees a slot in the buffer, so write can now send 2. The program then prints:

read value 0 from ch  
successfully wrote 2 to ch

This continues until write has closed the channel and main has read every remaining value. The complete output is:

successfully wrote 0 to ch  
successfully wrote 1 to ch  
read value 0 from ch  
successfully wrote 2 to ch  
read value 1 from ch  
successfully wrote 3 to ch  
read value 2 from ch  
successfully wrote 4 to ch  
read value 3 from ch  
read value 4 from ch
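The range loop in the example above keeps receiving after write closes the channel, because values still in the buffer are delivered before the close is observed. The small sketch below makes this visible using the comma-ok form of a receive. drain is an illustrative helper name.

```go
package main

import "fmt"

// drain receives from ch until it is closed and empty, using the
// comma-ok form to detect when the channel is exhausted.
func drain(ch chan int) []int {
	var got []int
	for {
		v, ok := <-ch
		if !ok {
			// ok is false only after the channel is closed
			// and every buffered value has been received.
			return got
		}
		got = append(got, v)
	}
}

func main() {
	ch := make(chan int, 3)
	ch <- 10
	ch <- 20
	close(ch) // closing does not discard values already in the buffer

	fmt.Println("len after close:", len(ch)) // 2
	fmt.Println("received:", drain(ch))      // [10 20]

	v, ok := <-ch
	fmt.Println("after drain:", v, ok) // zero value and false
}
```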

A deadlock

package main

import (  
    "fmt"
)

func main() {  
    ch := make(chan string, 2)
    ch <- "naveen"
    ch <- "paul"
    ch <- "steve"
    fmt.Println(<-ch)
    fmt.Println(<-ch)
}

In the program above, we try to write three strings to a channel of capacity 2. The third send, ch <- "steve" (line 11 of the file, as the error message shows), blocks because the buffer is already full. Since no other goroutine reads from the channel, the program deadlocks and fails with the following error:

fatal error: all goroutines are asleep - deadlock!

goroutine 1 [chan send]:  
main.main()  
    /tmp/sandbox274756028/main.go:11 +0x100
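The deadlock goes away once some goroutine receives while the sends are still happening. The sketch below shows one possible fix, assuming the sends may be moved into their own goroutine. sendAll is an illustrative helper, not part of the original program.

```go
package main

import "fmt"

// sendAll writes every name to ch and then closes it. With capacity 2,
// the third send waits for the receiver to free a slot instead of deadlocking.
func sendAll(ch chan string, names []string) {
	for _, n := range names {
		ch <- n
	}
	close(ch)
}

func main() {
	ch := make(chan string, 2)
	// The sender runs in its own goroutine, so main can receive concurrently.
	go sendAll(ch, []string{"naveen", "paul", "steve"})
	for name := range ch {
		fmt.Println(name)
	}
}
```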

Length and capacity

The capacity of a buffered channel is the maximum number of values it can hold at one time. It is the value passed to the built-in make function when the channel is created. The length is the number of values currently held in the channel’s buffer. Let’s look at the following code:

package main

import (  
    "fmt"
)

func main() {  
    ch := make(chan string, 3)
    ch <- "naveen"
    ch <- "paul"
    fmt.Println("capacity is", cap(ch))
    fmt.Println("length is", len(ch))
    fmt.Println("read value", <-ch)
    fmt.Println("new length is", len(ch))
}

In the code above, we create a channel of capacity 3 and write 2 strings into it, so the length of the channel is now 2. We then read 1 string from the channel, so the length becomes 1. The program output is as follows:

capacity is 3  
length is 2  
read value naveen  
new length is 1

WaitGroup

In the next section we’ll look at worker pools. To understand them, we first need to introduce WaitGroup, which we will then use to implement our thread pool.

A WaitGroup is used to wait for a collection of goroutines to finish. The goroutine that waits stays blocked until all of them have finished. For example, if we start three goroutines, the main goroutine can use a WaitGroup to wait for all three to finish before exiting. Without further ado, let’s look at the code:

package main

import (  
    "fmt"
    "sync"
    "time"
)

func process(i int, wg *sync.WaitGroup) {  
    fmt.Println("started Goroutine", i)
    time.Sleep(2 * time.Second)
    fmt.Printf("Goroutine %d ended\n", i)
    wg.Done()
}

func main() {  
    no := 3
    var wg sync.WaitGroup
    for i := 0; i < no; i++ {
        wg.Add(1)
        go process(i, &wg)
    }
    wg.Wait()
    fmt.Println("All go routines finished executing")
}

WaitGroup is a struct type, and var wg sync.WaitGroup creates a zero-value WaitGroup that is ready to use. Internally it works as a counter. Calling Add with an integer increases the counter by that amount, calling Done decreases it by 1, and Wait blocks the calling goroutine until the counter reaches zero.

The code above raises the counter to 3 by calling wg.Add(1) in the loop that starts the three goroutines. It then blocks the main goroutine with wg.Wait() until the counter reaches zero. Each call to process ends with wg.Done(), which decrements the counter. Once all three goroutines have finished, Done has been called three times, the counter is zero, and the main goroutine is unblocked.

It is very important to pass the address of wg to each goroutine. If wg were passed by value, each goroutine would get its own copy. Its Done call would then not decrement the counter that main is waiting on, and main would block in Wait forever.
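The same pattern is often written with a goroutine literal that closes over wg. This avoids the copying problem because every goroutine refers to the same variable. The sketch below assumes each goroutine computes a square. squares is an illustrative name. It uses a single wg.Add(n) instead of the per-iteration wg.Add(1) above; both work as long as Add is called before the goroutines start and before Wait.

```go
package main

import (
	"fmt"
	"sync"
)

// squares computes i*i for i in [0, n), one goroutine per value. Each
// goroutine writes only its own slice element, so no locking is needed.
func squares(n int) []int {
	out := make([]int, n)
	var wg sync.WaitGroup
	wg.Add(n) // one Add covering all n goroutines, before any of them starts
	for i := 0; i < n; i++ {
		go func(i int) {
			defer wg.Done() // decrement the shared counter when this goroutine returns
			out[i] = i * i
		}(i)
	}
	wg.Wait() // blocks until Done has been called n times
	return out
}

func main() {
	fmt.Println(squares(3)) // [0 1 4]
}
```

Reading out after wg.Wait() is safe. The sync package documents that a Done call synchronizes before the return of the Wait call it unblocks.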

The program output is as follows:

started Goroutine 2
started Goroutine 0
started Goroutine 1
Goroutine 0 ended
Goroutine 2 ended
Goroutine 1 ended
All go routines finished executing

Your output may differ from the above, because the order in which goroutines are scheduled is not deterministic.

Worker pools

An important use of buffered channels is implementing thread pools.

Generally speaking, a thread pool is a collection of threads waiting for tasks to be assigned to them. Once a thread finishes a task, it goes back to waiting for the next one.

Next we implement a thread pool that computes the sum of the digits of an input number. For example, given 123 it returns 6 (1+2+3). The numbers fed to the thread pool are generated pseudo-randomly.

Here are the core steps we need:

  • Create a pool of goroutines that listen on a buffered channel for incoming jobs.
  • Add jobs to that buffered channel.
  • When a job finishes, write its result to a second buffered channel.
  • Read the results from that channel and print them.

First we create structs to represent jobs and results:

type Job struct {  
    id       int
    randomno int
}
type Result struct {  
    job         Job
    sumofdigits int
}

Each Job has an id and a randomno field holding the random number whose digits are to be summed. The Result type contains the original job and a sumofdigits field holding the result.

Next we create two buffered channels, one for receiving jobs and one for collecting results:

var jobs = make(chan Job, 10)  
var results = make(chan Result, 10)

Worker goroutines read jobs from the jobs channel and write their results to the results channel.

The following digits function computes the sum of the digits of number and returns it. The 2-second Sleep simulates a time-consuming computation.

func digits(number int) int {
    sum := 0
    no := number
    for no != 0 {
        digit := no % 10 // take the last digit
        sum += digit
        no /= 10 // drop the last digit
    }
    time.Sleep(2 * time.Second) // simulate an expensive computation
    return sum
}
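The minimal version below, without the Sleep, shows how the loop in digits handles one digit per iteration. sumOfDigits is an illustrative name. For 878 the iterations add 8, then 7, then 8, giving 23, which matches the sample output further below.

```go
package main

import "fmt"

// sumOfDigits is the digits loop without the simulated delay.
// Each iteration adds the last digit (% 10) and then drops it (/ 10).
func sumOfDigits(number int) int {
	sum := 0
	for no := number; no != 0; no /= 10 {
		sum += no % 10
	}
	return sum
}

func main() {
	for _, n := range []int{123, 636, 878, 150} {
		fmt.Println(n, "->", sumOfDigits(n))
	}
}
```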

The next function is the body that each worker goroutine runs:

func worker(wg *sync.WaitGroup) {  
    for job := range jobs {
        output := Result{job, digits(job.randomno)}
        results <- output
    }
    wg.Done()
}

The worker reads jobs from the jobs channel, builds a Result from each job and the value returned by digits, and writes it to the results channel. The function takes a pointer wg to a WaitGroup. It calls wg.Done() once the jobs channel has been closed and drained, that is, when there are no more jobs to process.

The createWorkerPool function is used to create a thread pool:

func createWorkerPool(noOfWorkers int) {  
    var wg sync.WaitGroup
    for i := 0; i < noOfWorkers; i++ {
        wg.Add(1)
        go worker(&wg)
    }
    wg.Wait()
    close(results)
}

The function above creates a thread pool of noOfWorkers goroutines. Before starting each goroutine it calls wg.Add(1) to increment the counter, and it passes the address of wg to worker. It then uses wg.Wait() to wait for all the goroutines to finish. Finally, it closes the results channel, since no goroutine will write to it anymore. Closing results is what lets the range loop that reads from it terminate.

Next, we write a function to assign tasks to the thread pool:

func allocate(noOfJobs int) {  
    for i := 0; i < noOfJobs; i++ {
        randomno := rand.Intn(999)
        job := Job{i, randomno}
        jobs <- job
    }
    close(jobs)
}

The allocate function takes the number of jobs to create as its argument. For each job, it generates a random number between 0 and 998 with rand.Intn(999), uses the loop counter i as the job id, and writes the job to the jobs channel. Once all jobs have been written, it closes jobs.

Next we create a function that reads from the results channel and prints each result:

func result(done chan bool) {  
    for result := range results {
        fmt.Printf("Job id %d, input random no %d, sum of digits %d\n", result.job.id, result.job.randomno, result.sumofdigits)
    }
    done <- true
}

This function reads from results and prints each job’s id, the input random number, and the sum of its digits. Once results has been closed and every result printed, it writes true to the done channel to signal that it has finished.

With everything in place, let’s complete the main function:

func main() {  
    startTime := time.Now()
    noOfJobs := 100
    go allocate(noOfJobs)
    done := make(chan bool)
    go result(done)
    noOfWorkers := 10
    createWorkerPool(noOfWorkers)
    <-done
    endTime := time.Now()
    diff := endTime.Sub(startTime)
    fmt.Println("total time taken ", diff.Seconds(), "seconds")
}

We first record the time at which the program starts. At the end, we compute the elapsed time by subtracting the start time from the end time. We use this elapsed time to compare thread pools with different numbers of workers.

We then create a channel named done and pass it to the result goroutine, which signals on done once all results have been printed.

Finally, we create a thread pool of 10 goroutines and wait for all results to be printed by receiving from done.

The complete code is as follows:

package main

import (
    "fmt"
    "math/rand"
    "sync"
    "time"
)

type Job struct {
    id       int
    randomno int
}
type Result struct {
    job         Job
    sumofdigits int
}

var jobs = make(chan Job, 10)
var results = make(chan Result, 10)

func digits(number int) int {
    sum := 0
    no := number
    for no != 0 {
        digit := no % 10
        sum += digit
        no /= 10
    }
    time.Sleep(2 * time.Second)
    return sum
}
func worker(wg *sync.WaitGroup) {
    for job := range jobs {
        output := Result{job, digits(job.randomno)}
        results <- output
    }
    wg.Done()
}
func createWorkerPool(noOfWorkers int) {
    var wg sync.WaitGroup
    for i := 0; i < noOfWorkers; i++ {
        wg.Add(1)
        go worker(&wg)
    }
    wg.Wait()
    close(results)
}
func allocate(noOfJobs int) {
    for i := 0; i < noOfJobs; i++ {
        randomno := rand.Intn(999)
        job := Job{i, randomno}
        jobs <- job
    }
    close(jobs)
}
func result(done chan bool) {
    for result := range results {
        fmt.Printf("Job id %d, input random no %d, sum of digits %d\n", result.job.id, result.job.randomno, result.sumofdigits)
    }
    done <- true
}
func main() {
    startTime := time.Now()
    noOfJobs := 100
    go allocate(noOfJobs)
    done := make(chan bool)
    go result(done)
    noOfWorkers := 10
    createWorkerPool(noOfWorkers)
    <-done
    endTime := time.Now()
    diff := endTime.Sub(startTime)
    fmt.Println("total time taken ", diff.Seconds(), "seconds")
}
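The complete program above relies on package-level jobs and results channels. As an illustration only, the following sketch shows one possible restructuring; it is not the article's code. It passes the channels in as parameters, lets closures capture the WaitGroup, and replaces rand.Intn and the Sleep with deterministic inputs so that the result can be checked. runPool and runAll are hypothetical names introduced for this sketch.

```go
package main

import (
	"fmt"
	"sync"
)

type Job struct {
	id       int
	randomno int
}

type Result struct {
	job         Job
	sumofdigits int
}

// sumOfDigits is the digits loop without the simulated delay.
func sumOfDigits(n int) int {
	sum := 0
	for ; n != 0; n /= 10 {
		sum += n % 10
	}
	return sum
}

// runPool starts noOfWorkers goroutines that read from jobs and write to
// results, and closes results once every worker has returned.
func runPool(noOfWorkers int, jobs <-chan Job, results chan<- Result) {
	var wg sync.WaitGroup
	wg.Add(noOfWorkers)
	for i := 0; i < noOfWorkers; i++ {
		go func() {
			defer wg.Done()
			for job := range jobs {
				results <- Result{job, sumOfDigits(job.randomno)}
			}
		}()
	}
	wg.Wait()
	close(results)
}

// runAll feeds noOfJobs jobs through the pool, using inputs 0..noOfJobs-1
// instead of random numbers so the outcome is reproducible. It returns
// how many results arrived and the sum of all digit sums.
func runAll(noOfJobs, noOfWorkers int) (count, total int) {
	jobs := make(chan Job, 10)
	results := make(chan Result, 10)

	go func() {
		for i := 0; i < noOfJobs; i++ {
			jobs <- Job{i, i}
		}
		close(jobs)
	}()

	// runPool blocks in wg.Wait, so it runs in its own goroutine while
	// this function consumes results concurrently.
	go runPool(noOfWorkers, jobs, results)

	for r := range results {
		count++
		total += r.sumofdigits
	}
	return count, total
}

func main() {
	count, total := runAll(100, 10)
	fmt.Println("processed", count, "jobs, total digit sum", total)
}
```

Because the inputs are 0 to 99, each digit from 0 to 9 appears ten times in the tens place and ten times in the units place. The total is therefore 2 × 10 × 45 = 900, whatever the number of workers.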

The running results are as follows:

Job id 1, input random no 636, sum of digits 15  
Job id 0, input random no 878, sum of digits 23  
Job id 9, input random no 150, sum of digits 6  
...
total time taken  20.01081009 seconds

Because we created 100 jobs, the program prints 100 result lines (elided above). Your output order may differ from mine, and so may the total time, depending on your hardware. On my machine the run took about 20 seconds.
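Nearly all of each job's running time is the 2-second Sleep in digits. A rough estimate, assuming the workers run fully in parallel and everything else is negligible, is:

$$
T \approx \left\lceil \frac{N_{\text{jobs}}}{N_{\text{workers}}} \right\rceil \times 2\ \text{s},
\qquad
\left\lceil \tfrac{100}{10} \right\rceil \times 2\ \text{s} = 20\ \text{s},
\qquad
\left\lceil \tfrac{100}{20} \right\rceil \times 2\ \text{s} = 10\ \text{s}
$$

With 10 workers this gives about 20 seconds, in line with the measurement above. Doubling the workers to 20 predicts about 10 seconds.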

Next, let’s double the number of goroutines in the thread pool by increasing noOfWorkers to 20. The running time should drop to roughly half. On my machine, the program output looks like this:

...
total time taken  10.004364685 seconds

As you can see, increasing the number of goroutines in the thread pool reduces the program’s running time. Feel free to adjust noOfJobs and noOfWorkers in main and analyze the results.