  • Original article: Part 23: Buffered Channels and Worker Pools
  • By Naveen R
  • Please credit the source when reposting.

What is a buffered channel?

All the channels we discussed in the previous tutorial were unbuffered. As we discussed in detail in the channels tutorial, both sends and receives on an unbuffered channel block.

It is possible to create a channel with a buffer. A send to a buffered channel blocks only when the buffer is full. Similarly, a receive from a buffered channel blocks only when the buffer is empty.

A buffered channel is created by passing an additional capacity argument to the make function, which specifies the size of the buffer.

ch := make(chan type, capacity)

For a channel to have a buffer, the capacity in the above syntax must be greater than 0. An unbuffered channel has a capacity of 0, which is the default, so we omitted the capacity argument when creating channels in the previous tutorial.

Let’s create a buffered channel,

package main

import (
    "fmt"
)


func main() {
    ch := make(chan string, 2)
    ch <- "naveen"
    ch <- "paul"
    fmt.Println(<- ch)
    fmt.Println(<- ch)
}

In the program above, line 9 creates a buffered channel with a capacity of 2. Since the channel has a capacity of 2, two strings can be written to it without blocking. We write two strings to the channel in lines 10 and 11, then read them back and print them. The program prints,

naveen
paul
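One way to observe the "a send blocks only when the buffer is full" rule without actually blocking is a non-blocking send built from select with a default case. The sketch below is only an illustration; the helper name trySend is an assumption and is not part of the tutorial.

```go
package main

import "fmt"

// trySend is a hypothetical helper: it attempts a send and reports whether
// the value was accepted immediately. The default case is chosen when the
// send would otherwise block.
func trySend(ch chan string, v string) bool {
	select {
	case ch <- v:
		return true
	default:
		return false
	}
}

func main() {
	ch := make(chan string, 2)
	fmt.Println(trySend(ch, "naveen")) // true: the buffer has room
	fmt.Println(trySend(ch, "paul"))   // true: the buffer has room
	fmt.Println(trySend(ch, "steve"))  // false: the buffer is full, a plain send would block
	<-ch                               // reading one value frees a slot
	fmt.Println(trySend(ch, "steve"))  // true: there is room again
}
```

With an unbuffered channel and no receiver waiting, trySend would report false on the very first call, which matches the capacity-0 behaviour described earlier.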

Another example

Let’s look at another example of a buffered channel, in which values are written to the channel from a concurrent Goroutine and read from the main Goroutine. This example will help us better understand when writes to a buffered channel block.

package main

import (
    "fmt"
    "time"
)

func write(ch chan int) {
    for i := 0; i < 5; i++ {
        ch <- i
        fmt.Println("successfully wrote", i, "to ch")
    }
    close(ch)
}
func main() {
    ch := make(chan int, 2)
    go write(ch)
    time.Sleep(2 * time.Second)
    for v := range ch {
        fmt.Println("read value", v, "from ch")
        time.Sleep(2 * time.Second)
    }
}

In the above program, a buffered channel ch with a capacity of 2 is created at line 16. The main Goroutine passes ch to the write Goroutine and then sleeps for 2 seconds. In the meantime, the write Goroutine runs concurrently. It has a for loop that writes the numbers 0 through 4 to ch. Since the capacity is 2, the write Goroutine can write the values 0 and 1 immediately, and then blocks until at least one value is read from ch. So the program immediately prints the following two lines,

successfully wrote 0 to ch
successfully wrote 1 to ch

After printing the above two lines, the write Goroutine is blocked until a value is read from ch. Since the main Goroutine sleeps for 2 seconds before it starts reading, the program prints nothing for the next 2 seconds. When the main Goroutine wakes up, it starts reading from ch using a for range loop, prints each value it reads, and then sleeps for 2 seconds again. This cycle continues until ch is closed. So after 2 seconds the program prints the following lines,

read value 0 from ch
successfully wrote 2 to ch

This continues until all the values have been written to the channel and the write Goroutine closes it. The final output is,

successfully wrote 0 to ch
successfully wrote 1 to ch
read value 0 from ch
successfully wrote 2 to ch
read value 1 from ch
successfully wrote 3 to ch
read value 2 from ch
successfully wrote 4 to ch
read value 3 from ch
read value 4 from ch

Deadlock

package main

import (
    "fmt"
)

func main() {
    ch := make(chan string, 2)
    ch <- "naveen"
    ch <- "paul"
    ch <- "steve"
    fmt.Println(<-ch)
    fmt.Println(<-ch)
}

In the above program, we write three strings to a buffered channel with a capacity of 2. The third write exceeds the capacity, so it blocks. For the write to proceed, some other Goroutine must read from the channel, but in this program no Goroutine is reading from it. The result is a deadlock, and the program terminates at run time with the following message,

fatal error: all goroutines are asleep - deadlock!

goroutine 1 [chan send]:
main.main()
    /tmp/sandbox274756028/main.go:11 +0x100
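One possible way to avoid this deadlock, sketched here under the assumption that the writes can be moved into their own Goroutine, is to let the main Goroutine read while the producer writes. The helper names produce and collect are hypothetical and chosen only for this illustration.

```go
package main

import "fmt"

// produce sends every name on ch and then closes it, so that a for range
// loop on the receiving side terminates.
func produce(ch chan<- string, names []string) {
	for _, n := range names {
		ch <- n // blocks while the buffer is full, until the receiver catches up
	}
	close(ch)
}

// collect is a hypothetical helper that runs the producer concurrently and
// gathers everything it sends, in order.
func collect(names []string, capacity int) []string {
	ch := make(chan string, capacity)
	go produce(ch, names)
	var got []string
	for v := range ch {
		got = append(got, v)
	}
	return got
}

func main() {
	// Three values pass through a buffer of capacity 2 without deadlocking,
	// because a reader is active while the third write is waiting.
	for _, v := range collect([]string{"naveen", "paul", "steve"}, 2) {
		fmt.Println(v)
	}
}
```

Increasing the capacity to 3 would also make the original program run, but that only postpones the problem to the fourth write; having a concurrent reader removes it.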

Length vs Capacity

Capacity is the number of values a channel can hold. This is the value we specified when we created it using the make function.

The length is the number of elements currently in the channel.

Let’s write a program to make this easier to understand 😀

package main

import (
    "fmt"
)

func main() {
    ch := make(chan string, 3)
    ch <- "naveen"
    ch <- "paul"
    fmt.Println("capacity is", cap(ch))
    fmt.Println("length is", len(ch))
    fmt.Println("read value", <-ch)
    fmt.Println("new length is", len(ch))
}

In the above program, the channel is created with a capacity of 3, which means it can hold three strings. We then write two strings to the channel, so it now holds two strings and its length is 2. Next, we read one string from the channel. The channel now holds only one string, so its length becomes 1. This program prints,

capacity is 3
length is 2
read value naveen
new length is 1
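As a further illustration of len and cap, the sketch below reports how full a channel is. The helper fillRatio is a hypothetical name introduced only for this example. Keep in mind that when other Goroutines are sending or receiving, len is only a snapshot and may already be stale by the time it is used.

```go
package main

import "fmt"

// fillRatio is a hypothetical helper reporting how full a channel is, as a
// value between 0 and 1. An unbuffered channel has a capacity of 0, so it is
// reported as 0 to avoid dividing by zero.
func fillRatio(ch chan string) float64 {
	if cap(ch) == 0 {
		return 0
	}
	return float64(len(ch)) / float64(cap(ch))
}

func main() {
	ch := make(chan string, 4)
	ch <- "naveen"
	fmt.Println(len(ch), cap(ch), fillRatio(ch)) // 1 4 0.25
	ch <- "paul"
	fmt.Println(len(ch), cap(ch), fillRatio(ch)) // 2 4 0.5
	<-ch
	fmt.Println(len(ch), cap(ch), fillRatio(ch)) // 1 4 0.25

	unbuffered := make(chan string)
	fmt.Println(len(unbuffered), cap(unbuffered)) // 0 0
}
```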

WaitGroup

The next part of this tutorial is about worker pools. To understand worker pools, we first need to understand WaitGroup, which is used in the worker pool implementation.

A WaitGroup is used to wait for a collection of Goroutines to finish executing. Control is blocked until all the Goroutines have finished. Let’s say we have three Goroutines spawned from the main Goroutine that execute concurrently. The main Goroutine needs to wait for the other three to finish before it terminates; otherwise they may not have completed by the time the main Goroutine exits. This can be accomplished with a WaitGroup.

Enough theory, let’s write some code 😀

package main

import (
    "fmt"
    "sync"
    "time"
)

func process(i int, wg *sync.WaitGroup) {
    fmt.Println("started Goroutine ", i)
    time.Sleep(2 * time.Second)
    fmt.Printf("Goroutine %d ended\n", i)
    wg.Done()
}

func main() {
    no := 3
    var wg sync.WaitGroup
    for i := 0; i < no; i++ {
        wg.Add(1)
        go process(i, &wg)
    }
    wg.Wait()
    fmt.Println("All go routines finished executing")
}

WaitGroup is a struct type, and we create a zero-value variable of type WaitGroup in line 18. A WaitGroup works by using a counter. When we call Add on the WaitGroup and pass it an int, the counter is incremented by the value passed to Add. The counter is decremented by calling the Done method on the WaitGroup. The Wait method blocks the Goroutine that calls it until the counter reaches zero.

In the above program, we call wg.Add(1) at line 20 inside the for loop, which iterates three times, so the counter becomes 3. The for loop also spawns three process Goroutines, and the main Goroutine calls wg.Wait() at line 23 to block until the counter reaches zero. Each process Goroutine decrements the counter by calling wg.Done(). Once all three spawned Goroutines have finished executing, that is, once wg.Done() has been called three times, the counter reaches zero and the main Goroutine is unblocked. The program outputs,

started Goroutine  2
started Goroutine  0
started Goroutine  1
Goroutine 0 ended
Goroutine 2 ended
Goroutine 1 ended
All go routines finished executing

Your output may differ from mine because the order in which Goroutines execute can vary :).
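A common variation, shown here only as a sketch, is to call Done with defer so that the counter is decremented however the Goroutine returns. The helper runAll and its atomic counter are assumptions made for this illustration. Because the closures share the same wg variable, no pointer needs to be passed explicitly; a WaitGroup must not be copied once it is in use, which is why the process function above takes a *sync.WaitGroup.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// runAll is a hypothetical helper: it starts n Goroutines, waits for all of
// them to finish, and returns how many of them ran.
func runAll(n int) int64 {
	var wg sync.WaitGroup
	var finished int64
	for i := 0; i < n; i++ {
		wg.Add(1) // increment the counter before starting the Goroutine
		go func() {
			defer wg.Done() // decrement the counter when this Goroutine returns
			atomic.AddInt64(&finished, 1)
		}()
	}
	wg.Wait() // blocks until the counter is back to zero
	return atomic.LoadInt64(&finished)
}

func main() {
	fmt.Println(runAll(3)) // 3
}
```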

Worker pool implementation

One of the important uses of buffered channels is the implementation of worker pools.

In general, a worker pool is a collection of Goroutines waiting for tasks to be assigned to them. Once they finish the assigned task, they make themselves available again for the next one.

We will implement a worker pool using buffered channels. Our worker pool will carry out the task of finding the sum of the digits of an input number. For example, if 234 is passed, the output will be 9 (9 = 2 + 3 + 4). The input to the worker pool will be a list of pseudo-random integers.

The following are the core functionalities of our worker pool

  • Create a pool of Goroutines that listen on a buffered jobs channel, waiting for jobs to be assigned
  • Add jobs to the jobs channel
  • Write the result to a buffered results channel once a job is complete
  • Read and print the results from the results channel

We will write this program step by step to make it easier to understand.

The first step is to create structures that represent tasks and results.

type Job struct {
    id       int
    randomno int
}

type Result struct {
    job         Job
    sumofdigits int
}

Each Job struct has an id and a randomno, the number whose digits are to be summed.

The Result struct has a job field, which holds the job, and a sumofdigits field, which holds the sum of the digits of that job's random number.

The next step is to create the buffered channels for receiving jobs and writing results.

var jobs = make(chan Job, 10)
var results = make(chan Result, 10)

The worker Goroutines listen for new tasks on the jobs buffered channel. Once a task is complete, the result is written to the results buffered channel.

The digits function computes the sum of the individual digits of an integer and returns it. We add a 2-second sleep to this function to simulate the fact that it takes some time to calculate the result.

func digits(number int) int {
    sum := 0
    no := number
    for no != 0 {
        digit := no % 10
        sum += digit
        no /= 10
    }
    time.Sleep(2 * time.Second)
    return sum
}
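As a quick sanity check of the digit-sum logic, the sketch below repeats the same loop without the simulated 2-second delay. The name digitSum is chosen here only to distinguish it from the tutorial's digits function; the inputs are the example value 234 and random numbers that appear in the sample output of this tutorial.

```go
package main

import "fmt"

// digitSum mirrors the loop in digits but omits the sleep, so it returns
// immediately. It repeatedly takes the last digit and drops it.
func digitSum(number int) int {
	sum := 0
	for no := number; no != 0; no /= 10 {
		sum += no % 10
	}
	return sum
}

func main() {
	fmt.Println(digitSum(234)) // 9
	fmt.Println(digitSum(636)) // 15
	fmt.Println(digitSum(878)) // 23
	fmt.Println(digitSum(0))   // 0
}
```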

Next, we write the function that runs as a worker Goroutine.

func worker(wg *sync.WaitGroup) {
    for job := range jobs {
        output := Result{job, digits(job.randomno)}
        results <- output
    }
    wg.Done()
}

The above function is a worker that reads jobs from the jobs channel, creates a Result struct from the current job and the return value of the digits function, and writes the result to the results buffered channel. The function takes a pointer to a WaitGroup, wg, as a parameter, and calls its Done method once all jobs have been completed, that is, once the jobs channel has been closed and drained and the for range loop exits.

The createWorkerPool function creates a pool of worker Goroutines.

func createWorkerPool(noOfWorkers int) {
    var wg sync.WaitGroup
    for i := 0; i < noOfWorkers; i++ {
        wg.Add(1)
        go worker(&wg)
    }
    wg.Wait()
    close(results)
}

The above function takes the number of workers to create as a parameter. It calls wg.Add(1) before creating each Goroutine to increment the WaitGroup counter. It then creates the worker Goroutines, passing the address of the WaitGroup wg to the worker function. After creating the required worker Goroutines, it calls wg.Wait() to block until all of them have finished executing. It then closes the results channel, since all the Goroutines have finished and nothing more will be written to it.

Now that the worker pool is ready, let’s go ahead and write the function that allocates jobs to the workers.

func allocate(noOfJobs int) {
    for i := 0; i < noOfJobs; i++ {
        randomno := rand.Intn(999)
        job := Job{i, randomno}
        jobs <- job
    }
    close(jobs)
}

The allocate function above takes the number of jobs to create as its input parameter. For each job, it generates a pseudo-random number with a maximum value of 998, creates a Job struct using the random number and the for loop counter i as the id, and writes it to the jobs channel. It closes the jobs channel after writing all the jobs.

The next step is to create a function that reads the results channel and prints the output.

func result(done chan bool) {
    for result := range results {
        fmt.Printf("Job id %d, input random no %d , sum of digits %d\n", result.job.id, result.job.randomno, result.sumofdigits)
    }
    done <- true
}

The result function reads the results channel and prints the job id, the input random number, and the sum of its digits. It also takes a done channel as a parameter, to which it writes true once it has printed all the results.
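The done channel is used purely as a completion signal. An alternative that is sometimes used for this kind of signalling, sketched below as an assumption rather than as the tutorial's method, is a chan struct{} that the consumer closes when it is finished. The names printAll and run are hypothetical.

```go
package main

import "fmt"

// printAll is a hypothetical stand-in for result: it consumes results until
// the channel is closed, then signals completion by closing done.
func printAll(results <-chan int, done chan<- struct{}, count *int) {
	for r := range results {
		fmt.Println("result", r)
		*count++
	}
	close(done)
}

// run feeds values to printAll and waits for the completion signal. It
// returns the number of values that were printed.
func run(values []int) int {
	results := make(chan int, 10)
	done := make(chan struct{})
	count := 0
	go printAll(results, done, &count)
	for _, v := range values {
		results <- v
	}
	close(results) // lets the for range loop in printAll finish
	<-done         // unblocks once printAll has drained results and closed done
	return count
}

func main() {
	fmt.Println("printed", run([]int{9, 15, 23}), "results")
}
```

Reading count after <-done is safe here because the close of done happens before the receive that observes it.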

With everything in place, let’s call all of the above functions from the main function.

func main() {
    startTime := time.Now()
    noOfJobs := 100
    go allocate(noOfJobs)
    done := make(chan bool)
    go result(done)
    noOfWorkers := 10
    createWorkerPool(noOfWorkers)
    <-done
    endTime := time.Now()
    diff := endTime.Sub(startTime)
    fmt.Println("total time taken ", diff.Seconds(), "seconds")
}

We first store the start time of the program in line 2 of the main function, and at the end (line 12) we display the difference between endTime and startTime as the total running time of the program. This is needed because we will do some benchmarks by changing the number of Goroutines.

noOfJobs is set to 100, and then allocate is called in its own Goroutine to add jobs to the jobs channel.

Then the done channel is created and passed to the result Goroutine, so that it can start printing the output and notify main once everything has been printed.

Finally, a pool of 10 worker Goroutines is created by calling createWorkerPool, and main then waits on the done channel until all the results have been printed.

Below is the complete code.

package main

import (
    "fmt"
    "math/rand"
    "sync"
    "time"
)

type Job struct {
    id       int
    randomno int
}
type Result struct {
    job         Job
    sumofdigits int
}

var jobs = make(chan Job, 10)
var results = make(chan Result, 10)

func digits(number int) int {
    sum := 0
    no := number
    for no != 0 {
        digit := no % 10
        sum += digit
        no /= 10
    }
    time.Sleep(2 * time.Second)
    return sum
}
func worker(wg *sync.WaitGroup) {
    for job := range jobs {
        output := Result{job, digits(job.randomno)}
        results <- output
    }
    wg.Done()
}
func createWorkerPool(noOfWorkers int) {
    var wg sync.WaitGroup
    for i := 0; i < noOfWorkers; i++ {
        wg.Add(1)
        go worker(&wg)
    }
    wg.Wait()
    close(results)
}
func allocate(noOfJobs int) {
    for i := 0; i < noOfJobs; i++ {
        randomno := rand.Intn(999)
        job := Job{i, randomno}
        jobs <- job
    }
    close(jobs)
}
func result(done chan bool) {
    for result := range results {
        fmt.Printf("Job id %d, input random no %d , sum of digits %d\n", result.job.id, result.job.randomno, result.sumofdigits)
    }
    done <- true
}
func main() {
    startTime := time.Now()
    noOfJobs := 100
    go allocate(noOfJobs)
    done := make(chan bool)
    go result(done)
    noOfWorkers := 10
    createWorkerPool(noOfWorkers)
    <-done
    endTime := time.Now()
    diff := endTime.Sub(startTime)
    fmt.Println("total time taken ", diff.Seconds(), "seconds")
}

Please run this program on your local computer to calculate the total time more accurately.

The program will print,

Job id 1, input random no 636, sum of digits 15
Job id 0, input random no 878, sum of digits 23
Job id 9, input random no 150, sum of digits 6
...
total time taken  20.01081009 seconds

For 100 tasks, a total of 100 lines will be printed, and the total time spent running the program will be printed on the last line. Your output will be different from mine because Goroutines can run in any order and the total time will vary by hardware. In my case, the program completed in about 20 seconds.

Now let’s increase noOfWorkers in the main function to 20. We have doubled the number of workers. Since there are more worker Goroutines, the total time taken for the program to complete should decrease. In my case, it became 10.004364685 seconds and the program printed,

...
total time taken  10.004364685 seconds

Now we can see that as the number of worker Goroutines increases, the total time taken to complete the jobs decreases. I leave it as an exercise for you to try different values of noOfJobs and noOfWorkers in the main function and analyze the results.