WaitGroup lets us run tasks concurrently and guarantees they all complete, but it says nothing about whether each task succeeded: if one failed, the error was lost on that goroutine's stack, which made it hard to trace.

This is where we need the ErrGroup.

Analysis

The idea is actually quite simple: it still executes tasks concurrently, but its biggest feature is that it can spit out an err.

type Group struct {
	cancel func()

	wg sync.WaitGroup

	errOnce sync.Once
	err     error
}

func (g *Group) Go(f func() error) {
	// The waitGroup is still in effect
	g.wg.Add(1)

	go func() {
		// Done must be called regardless of whether there was an error
		defer g.wg.Done()

		if err := f(); err != nil {
			// Only the first error will be recorded
			g.errOnce.Do(func() {
				// Record the error
				g.err = err
				if g.cancel != nil {
					// On the first error, also cancel the context
					g.cancel()
				}
			})
		}
	}()
}

The Group uses a sync.WaitGroup to control the concurrency of the goroutines, and the member variable err to record errors that occur during execution. Note that only the first error is recorded: sync.Once guarantees the assignment runs only once.

How to use it?

// There are two ways to create one
eg1 := new(errgroup.Group)
eg2, _ := errgroup.WithContext(context.Background())

// Execution
for _, v := range urls {
    v := v // capture the loop variable (needed before Go 1.22)
    eg2.Go(func() error {
        resp, err := http.Get(v)
        if err == nil {
            resp.Body.Close()
        }
        ...
        return err
    })
}

if err := eg2.Wait(); err != nil {
    fmt.Println(err)
}

You create it with new or WithContext; they are essentially the same, except that WithContext also comes with a cancel for the ctx.


An extreme example: a task with 10,000 subtasks means 10,000 goroutines, which is a significant cost for the service. The problem with the native errgroup is that concurrency cannot be limited.

So let’s see how this can be controlled:

func (g *Group) GOMAXPROCS(n int) {
	if n <= 0 {
		panic("errgroup: GOMAXPROCS must great than 0")
	}
	// Executed only once
	g.workerOnce.Do(func() {
		g.ch = make(chan func(context.Context) error, n)
		for i := 0; i < n; i++ {
			go func() {
				for f := range g.ch {
					// Do the actual execution
					g.do(f)
				}
			}()
		}
	})
}
  1. Ensure the worker setup runs only once; otherwise the channel control would become chaotic
  2. Use a channel to control the execution of tasks
  3. Every step carries the upper-level ctx

In summary, the new Group is designed like this:

type Group struct {
	err     error
	wg      sync.WaitGroup
	errOnce sync.Once

	workerOnce sync.Once
	// Coroutine concurrency control
	ch         chan func(ctx context.Context) error
	// Functions that could not be queued yet are cached here and executed later
	chs        []func(ctx context.Context) error

	ctx    context.Context
	cancel func()
}

There is also chs, which caches the functions that have not yet been executed. Since they are unexecuted, they are finally executed in Wait():

func (g *Group) Wait() error {
	if g.ch != nil {
		// Flush the cache; these tasks are consumed by the workers started in GOMAXPROCS
		for _, f := range g.chs {
			g.ch <- f
		}
	}
	// Wait for all tasks to be consumed
	g.wg.Wait()
	// Close the channel and cancel the context
	if g.ch != nil {
		close(g.ch) // let all receivers exit
	}
	if g.cancel != nil {
		g.cancel()
	}
	return g.err
}

How is it used? The difference is the call to GOMAXPROCS():

func main() {
	g := Group{}
	g.GOMAXPROCS(5)
	g.Go(a)
	g.Go(b)
	g.Go(c)
	g.Go(d)
	g.Wait()
}

func a(ctx context.Context) error {
	time.Sleep(time.Second)
	return nil
}
  1. Manually set the number of concurrent workers
  2. Each subtask carries a context, which can be passed down from the caller