This article is translated

Original address: golangbot.com/buffered-ch…

Work pool

One of the important uses of buffer channels is the implementation of working pools. Typically, a work pool is a collection of threads waiting for tasks to be assigned to them. Once the assigned task is completed, they can be used again for the next task. We will implement the working pool using buffer channels. Our work pool will perform the task of finding the sum of the bits of the input number. For example, if 234 were passed, the output would be 9 (2 + 3 + 4). The input to the work pool will be a pseudo-random list of integers.

Here are the core functions of our work pool:

  • Create a Goroutine pool that listens on the input buffer channel waiting for an allocation job
  • Add the job to the input buffer channel
  • Write the result to the output buffer channel when the job is complete
  • Read and print the results from the output buffer channel

We will write this program step by step to make it easier to understand.

The first step will be to create a structure that represents work and results.

type Job struct {  
    id       int
    randomno int
}
type Result struct {  
    job         Job
    sumofdigits int
}
Copy the code

Each Job structure has an ID and a Randomno for which the sum of the numbers must be computed.

The Result structure has a job field that is the job that holds the Result (the sumof individual numbers) in the sumofDigits field.

The next step is to create a buffer channel for receiving jobs and writing output.

var jobs = make(chan Job, 10)  
var results = make(chan Result, 10) 
Copy the code

The Worker Goroutines listens for new tasks on the job buffer channel. When the task is complete, the results are written to the results buffer channel.

The digits function below does the actual work, finding the sum of the digits of the integer and then returning it. We will add 2 seconds of sleep to this feature to simulate the fact that it takes some time to compute the results.

func digits(number int) int {  
    sum := 0
    no := number
    forno ! =0 {
        digit := no % 10
        sum += digit
        no /= 10
    }
    time.Sleep(2 * time.Second)
    return sum
}
Copy the code

Next, we’ll write a function that creates the worker program Goroutine.

func worker(wg *sync.WaitGroup) {  
    for job := range jobs {
        output := Result{job, digits(job.randomno)}
        results <- output
    }
    wg.Done()
}
Copy the code

The above function creates a worker that reads from the job channel, creates a Result structure with the return value of the current job and the digits function, and writes the Result to the Results buffer channel. This function takes WaitGroup WG as an argument and calls the Done() method on it when all the jobs are finished.

The createWorkerPool function creates the worker Goroutine pool.

func createWorkerPool(noOfWorkers int) {  
    var wg sync.WaitGroup
    for i := 0; i < noOfWorkers; i++ {
        wg.Add(1)
        go worker(&wg)
    }
    wg.Wait()
    close(results)
}
Copy the code

The above function will create the size of the working pool as an argument. Before the Goroutine is created to increment the WaitGroup counter, it calls WG.add (1). It then creates worker Goroutines by passing a pointer to the WaitGroup WG to the worker function. Once the required worker Goroutines are created, it waits for all Goroutines to complete execution by calling WG.wait (). After all of the Goroutines have finished executing, it closes the result channel because all of the Goroutines have finished executing, and no one else writes further to the result channel.

Now that we have the work pool ready, let’s move on to writing functions that can assign jobs to workers.

func allocate(noOfJobs int) {  
    for i := 0; i < noOfJobs; i++ {
        randomno := rand.Intn(999)
        job := Job{i, randomno}
        jobs <- job
    }
    close(jobs)
}
Copy the code

The assignment function above takes the number of jobs to be created as input arguments, generates a pseudorandom number with a maximum value of 998, creates the Job structure using the random number and the for loop counter I as id, and then writes it to the Job channel. After all jobs have been written, it closes the job channel.

The next step will be to create a function that reads the result channel and prints out the output.

func result(done chan bool) {  
    for result := range results {
        fmt.Printf("Job id %d, input random no %d , sum of digits %d\n", result.job.id, result.job.randomno, result.sumofdigits)
    }
    done <- true
}
Copy the code

The result function reads the Results channel and prints the job ID, entering the sum of the random number and the number of bits. The result function also takes the Done channel as an argument, to which it will write once it prints all the results.

We have prepared everything. Let’s continue with the final step of calling all of these functions from the main function.

func main(a) {  
    startTime := time.Now()
    noOfJobs := 100
    go allocate(noOfJobs)
    done := make(chan bool)
    go result(done)
    noOfWorkers := 10
    createWorkerPool(noOfWorkers)
    <-done
    endTime := time.Now()
    diff := endTime.Sub(startTime)
    fmt.Println("total time taken ", diff.Seconds(), "seconds")}Copy the code

We first store the startTime of execution of the program in the mian function, then calculate the time difference between endTime and startTime in the last line and display the total time spent by the program. This is necessary because we will be doing some benchmarking by changing the number of Goroutines.

Set noOfJobs to 100, and then call ALLOCATE to add the job to the job channel.

The completion channel is then created and passed to the result Goroutine so that it can start printing output and notify when everything is printed.

Finally, a pool of 10 auxiliary goroutines is created by calling the createWorkerPool function, and Main waits on the Done channel for all the results to be printed.

This is the complete procedure for your reference. I also imported the necessary software packages.

package main

import (  
    "fmt"
    "math/rand"
    "sync"
    "time"
)

type Job struct {  
    id       int
    randomno int
}
type Result struct {  
    job         Job
    sumofdigits int
}

var jobs = make(chan Job, 10)  
var results = make(chan Result, 10)

func digits(number int) int {  
    sum := 0
    no := number
    forno ! =0 {
        digit := no % 10
        sum += digit
        no /= 10
    }
    time.Sleep(2 * time.Second)
    return sum
}
func worker(wg *sync.WaitGroup) {  
    for job := range jobs {
        output := Result{job, digits(job.randomno)}
        results <- output
    }
    wg.Done()
}
func createWorkerPool(noOfWorkers int) {  
    var wg sync.WaitGroup
    for i := 0; i < noOfWorkers; i++ {
        wg.Add(1)
        go worker(&wg)
    }
    wg.Wait()
    close(results)
}
func allocate(noOfJobs int) {  
    for i := 0; i < noOfJobs; i++ {
        randomno := rand.Intn(999)
        job := Job{i, randomno}
        jobs <- job
    }
    close(jobs)
}
func result(done chan bool) {  
    for result := range results {
        fmt.Printf("Job id %d, input random no %d , sum of digits %d\n", result.job.id, result.job.randomno, result.sumofdigits)
    }
    done <- true
}
func main(a) {  
    startTime := time.Now()
    noOfJobs := 100
    go allocate(noOfJobs)
    done := make(chan bool)
    go result(done)
    noOfWorkers := 10
    createWorkerPool(noOfWorkers)
    <-done
    endTime := time.Now()
    diff := endTime.Sub(startTime)
    fmt.Println("total time taken ", diff.Seconds(), "seconds")}Copy the code

Run this program on your local computer to calculate the total time more accurately.

Program output:

Job id 1, input random no 636, sum of digits 15  
Job id 0, input random no 878, sum of digits 23  
Job id 9, input random no 150, sum of digits 6  
...
total time taken  20.01081009 seconds  

Copy the code

A total of 100 lines are printed corresponding to the 100 jobs, and then finally the total time spent running the program is printed in the last line. Your output will be different from mine because Goroutines can run in any order, and the total time will vary depending on the hardware. In my case, the program takes about 20 seconds to complete.

Now let’s increase the noOfWorkers in the main function to 20. We doubled the number in the working pool. Since the work Goroutines have increased (or doubled, to be exact), the total time required to complete the program should have decreased (or halved, to be exact). In my case, it became 10.004364685 seconds, and the program was printed out,

. Total Time taken 10.004364685 secondsCopy the code

Now we can understand that as the number of workers Goroutine increased, the total time required to complete the work decreased. I’ll use this as an exercise for you to use noOfJobs and noOfWorkers in the main function to get different values and analyze the results.