The Goroutine coroutine provided by Go makes it easy to write multithreaded programs, but how to effectively control these concurrent goroutines is something we need to explore. The Go library provides synchronization primitives that control data security between Goroutines in lock and atomic operations, while WaitGroup, Channel, and Context control their concurrent behavior, as explained by Small Kitchen Knife in Golang Overview of Concurrency Control. The realization principle of lock, atomic operation and channel has been analyzed in detail. For this article, we will focus on WaitGroup.

I met WaitGroup

WaitGroup is the content under the SYNC package that controls synchronization between coroutines. The WaitGroup usage scenario, as the name implies, is used when we need to wait for a set of coroutines to complete before we can do any further processing.

func main(a) {
	var wg sync.WaitGroup

	wg.Add(2) //worker number 2

	go func(a) {
		// worker 1 do something
		fmt.Println("Goroutine 1 done!)
		wg.Done()
	}()

	go func(a) {
		// worker 2 do something
		fmt.Println("Goroutine 2 done!)
		wg.Done()
	}()

	wg.Wait() // wait all waiter done
	fmt.Println("all work done!")}// output
goroutine 2Done! goroutine1Done! All the work done!Copy the code

As you can see, WaitGroup is very simple to use, providing three methods. Although there is no parent-child relationship between goroutines, this article will refer to goroutines that call Wait as primary goroutines and goroutines that call Done as child Goroutines for ease of understanding.

func (wg *WaitGroup) Add(delta int)/ / addWaitGroupThe childgoroutinecount
func (wg *WaitGroup) Done(a)/ / whengoroutineWhen the task is complete, reduce the count by 1
func (wg *WaitGroup) Wait(a)// Blocks the call to this methodgoroutine, until the count is 0
Copy the code

So how does it work? In the source code SRC /sync/waitgroup.go, we can see that the core source code is less than 100 lines, which is very concise and well worth learning.

Front knowledge

Less code does not mean the implementation is simple and easy to understand. Conversely, without the prior knowledge described below, the reader will struggle to truly understand the implementation of WaitGroup. Before parsing the source code, let’s go over this (if you already know it, you can skip to the source code parsing section below).

A semaphore

In learning about operating systems, we learned that semaphores are a mechanism for protecting shared resources and solving multithreaded synchronization problems. The semaphore S is a global variable with non-negative integer values that can only be handled by two special operations, called P and V.

  • P(s)If:sNon-zero, thenPwillsSubtract 1 and return immediately. ifsZero, suspends the thread untilsBecomes non-zero and waits for another executionV(s)The thread of the operation wakes up the thread. After waking up,POperation willsSubtract one and return control to the caller.
  • V(s):VOperation willsAdd 1. If there are any threads blocking inPWaiting for operationsBecomes non-zero, thenVThe action wakes up one of these threads, which then willsMinus 1 to finish itPOperation.

In the underlying semaphore function of Go

  • runtime_Semacquire(s *uint32)The function blocks the Goroutine until the semaphoresIs greater than 0, and then atomically subtracts from that, which isPOperation.
  • runtime_Semrelease(s *uint32, lifo bool, skipframes int)Function atomicity increases the value of the semaphore, which is then notified byruntime_SemacquireBlocked goroutine, i.eVOperation.

These two semaphore functions are not only used in WaitGroup, but in Go’s Ingenious Mutex Design, we found that Go is also involved in the design of the mutex.

Memory alignment

For the following structure, can you answer how much memory it takes up

type Ins struct {
	x bool  // 1 byte
	y int32 // 4 bytes
	z byte  // 1 byte
}

func main(a) {
	ins := Ins{}
	fmt.Printf("ins size: %d, align: %d\n", unsafe.Sizeof(ins), unsafe.Alignof(ins))
}

//output
ins size: 12, align: 4
Copy the code

Given the size of the fields in the structure, the INS object should be 1+4+1=6 bytes, but it is actually 12 bytes due to memory alignment. From the article “THE IMPACT of CPU cache architecture on Go program”, we know that CPU memory reads are not read byte by byte, but piece by piece. Thus, computer loading or writing is efficient when the values of the types are aligned in memory.

The length of memory in an aggregate type (structure or array) may be greater than the sum of its elements. The compiler adds unused memory addresses to fill memory gaps to ensure that contiguous members or elements are aligned to the starting addresses of structures or arrays.

Therefore, when we design structures, we can save more memory space by defining members of the same type in adjacent locations when they are of different types.

Atomic operated CAS

CAS is a kind of atomic operation, which can be used to realize uninterrupted data exchange operation in multithreaded programming, so as to avoid data inconsistency caused by uncertain execution order and unpredictability of interruption when multithreading simultaneously overwrites a certain data. This operation compares the value in memory with the specified data, and replaces the data in memory with the new value if the value is the same. The low-level implementation of atomic operations in Go is described in detail in the little Chopper article “The Foundation of Synchronization Primitives”.

Shift operation >> and <<

In previous articles on locking, “Go’s Subtle Mutex Design” and “Go’s More granular Read/write Lock Design,” we saw a lot of bit operations. Flexible bit operation, can let an ordinary number change a rich meaning, here is only introduced in the following will be used in the shift operation.

For the left shift operation <<, all digits are moved to the left by the corresponding digit in binary form, the high digit is discarded, and the low digit is filled with zeros. So without spilling out the numbers, moving one place to the left is the same thing as multiplying by 2 to the first, and moving n places to the left is the same thing as multiplying by 2 to the n.

For the right shift operation, all the digits are moved to the right by the corresponding digit in binary form, the lower digit is moved out, and the higher digit is filled with the sign bit. Moving one place to the right is the same thing as dividing by 2, moving n places to the right is the same thing as dividing by 2 to the n. I’m taking the quotient, and I don’t have the remainder.

Shift operations can also have very clever operations, and we’ll see more advanced uses of shift operations later.

Unsafa. Pointer and uintptr

Pointers in Go can be divided into three categories: 1. Pointers of common type *T, such as *int; 2. Unsafe.Pointer; 3. Uintptr.

  • *T: Ordinary pointer type, used to pass the address of the object, cannot be used for pointer calculation.
  • Pointer: A generic Pointer. A Pointer of type *T can be converted to an unsafe.Pointer, and a Pointer of type *T does not have to be the same as the original Pointer. But it can’t do pointer calculations and can’t read values in memory (only by converting to an ordinary pointer of some concrete type).
  • Uintptr: The Uintptr isn’t exactly a pointer, it’s an unsigned integer of undefined size. Unsafe.Pointer can be converted to uINptr. Since uINptr stores the value of the address to which the Pointer points, Pointer operations can be performed using that value. The Uintptr is not used as a pointer during GC, and the UintpTR type target is reclaimed.

Unbroadening.Pointer is a bridge that allows ordinary Pointers of any type to be converted to each other and to be converted to a Uintptr for Pointer operations. However, the unsafe.Pointer and the unsafe.Pointer conversion would allow us to write arbitrary values into memory, which would break Go’s original type system, and since not all values are valid memory addresses, the conversion from uintptr to unsafe.Pointer would also break the type system. Therefore, since Go defines this package as unsafe, it should not be used arbitrarily.

The source code parsing

This article is based on Go source version 1.15.7

The structure of the body

The structure of sync.waitGroup is defined as follows, which includes a secondary field for noCopy and a state1 field with compound meaning.

type WaitGroup struct {
	noCopy noCopy

	// 64-bit value: high 32 bits are counter, low 32 bits are waiter count.
	// 64-bit atomic operations require 64-bit alignment, but 32-bit
	// compilers do not ensure it. So we allocate 12 bytes and then use
	// the aligned 8 bytes in them as state, and the other 4 as storage
	// for the sema.
	state1 [3]uint32
}

// state returns pointers to the state and sema fields stored within wg.state1.
func (wg *WaitGroup) state(a) (statep *uint64, semap *uint32) {
  // 64-bit compiler addresses are divisible by 8 to determine 64-bit alignment
	if uintptr(unsafe.Pointer(&wg.state1))%8= =0 {
		return (*uint64)(unsafe.Pointer(&wg.state1)), &wg.state1[2]}else {
		return (*uint64)(unsafe.Pointer(&wg.state1[1])), &wg.state1[0]}}Copy the code

The noCopy field is an empty structure that does not consume memory and is not byte populated by the compiler. It is designed to do static compilation checks with the Go Vet tool to prevent developers from copying WaitGroup while using it for security purposes. Read The No Copy Mechanism for more details.

The state1 field is a uint32 array of 3 characters. It is used to represent three parts: 1. The count value of the child Goroutine set by Add(); 2. 2. Number of waiters blocked by Wait(); 3. Semaphore semap.

Since subsequent operations are performed on a STATep of type uint64, atomic operations on 64-bit integers require 64-bit alignment, the 32-bit compiler does not guarantee this. Therefore, the composition meaning of the state1 field is different in 64-bit and 32-bit environments.

Note that when we initialize a WaitGroup object, its counter value, waiter value, semap value are all 0.

The Add function

The input to the Add() function is an integer that can be positive or negative and is a change to the value of counter. If the counter value changes to 0, all waiters blocked in Wait() will be awakened; If the value of counter is negative, panic will occur.

We will remove the race detection part of the code, Add() function implementation source code is as follows

func (wg *WaitGroup) Add(delta int) {
  // Get the semAP containing counter and waiter
	statep, semap := wg.state()
	state := atomic.AddUint64(statep, uint64(delta)<<32)
	v := int32(state >> 32)
	w := uint32(state)
  
	if v < 0 {
		panic("sync: negative WaitGroup counter")}ifw ! =0 && delta > 0 && v == int32(delta) {
		panic("sync: WaitGroup misuse: Add called concurrently with Wait")}if v > 0 || w == 0 {
		return
	}

	if*statep ! = state {panic("sync: WaitGroup misuse: Add called concurrently with Wait")}// if we go to this, we must say counter=0, waiter>0
  // Add(-x) goroutine
  // Its execution means that all the child Goroutines have completed the task
  // Therefore, we need to return all the compound states to 0 and release the semaphore of waiter
	*statep = 0
	for; w ! =0; w-- {
    // Release the semaphore once to wake up a blocked waiter
		runtime_Semrelease(semap, false.0)}}Copy the code

The code is very lean, so let’s break down the key parts.

 	state := atomic.AddUint64(statep, uint64(delta)<<32)  // Add counter value delta
	v := int32(state >> 32)   // Get the value counter
	w := uint32(state)        // Get waiter
Copy the code

In this case, statep is a uint64 value. If statep contains 2 counter, waiter is 1, and delta is 1, the three lines of code are shown in the following figure.

After obtaining the current counter number v and waiter number w, we will judge their values in several cases.

	// Case 1: This is a very rudimentary error. The value of counter cannot be negative
  if v < 0 {
		panic("sync: negative WaitGroup counter")}// Situation 2: Misuse and misuse cause panic
  // Because WG can actually reuse, but the basis for reuse is to reset all states to 0
	ifw ! =0 && delta > 0 && v == int32(delta) {
		panic("sync: WaitGroup misuse: Add called concurrently with Wait")}// Case 3: The Add operation is only responsible for increasing the value of counter.
  // If counter is greater than 0, wake up is left to subsequent Add callers (Add(negative int))
  // If waiter is 0, waiter is not blocked yet
	if v > 0 || w == 0 {
		return
	}

  // Situation 4: Misuse and misuse cause panic
	if*statep ! = state {panic("sync: WaitGroup misuse: Add called concurrently with Wait")}Copy the code

In terms of misuse and Flue causing panic, it is actually quite difficult to explain without example error codes. The good news is that the Go source code provides examples of incorrect use in the SRC /sync/waitgroup_test.go file. Those who want to learn more should look at the following three test functions.

func TestWaitGroupMisuse(t *testing.T)
func TestWaitGroupMisuse2(t *testing.T)
func TestWaitGroupMisuse3(t *testing.T)
Copy the code
The Done function

The Done() function is simpler, calling Add(-1). In practice, the Done() function should be called when the child Goroutine task completes.

func (wg *WaitGroup) Done(a) {	wg.Add(- 1)}
Copy the code
The Wait function

If the value of counter in WaitGroup is greater than zero, the main goroutine that performs Wait() increases waiter by one and blocks until it reaches zero before further code can proceed.

We will remove the race detection part of the code, Wait() implementation of the source code as follows

func (wg *WaitGroup) Wait(a) {	statep, semap := wg.state()	for {		state := atomic.LoadUint64(statep) Statep V := int32(state >> 32) // Obtain counter value W := uint32(state) // Obtain waiter value // If v==0, it indicates that there are no sub-Goroutine tasks to be executed. If v == 0 {return} // If no other goroutine changes the compound state between performing the CAS atomic operation and reading the compound state // then add waiter +1, otherwise: To the next cycle, Reread composite state if atomic.Com pareAndSwapUint64 (statep, state, State +1) {// Wait for Add to call runtime_Semrelease to wake up runtime_Semacquire(semap) // IN FLUE causes panic // *statep = 0 when the goroutine is awakened // the goroutine has been reset by the *statep = 0 statement when the goroutine is awakened // the goroutine has been reset by the *statep = 0 statement when the goroutine is awakened // The goroutine has been reset by the *statep = 0 statement when the goroutine is awakened. If *statep! = 0 { panic("sync: WaitGroup is reused before previous Wait has returned") } return } }}
Copy the code

conclusion

To understand the source implementation of WaitGroup, you need some prior knowledge of semaphores, memory alignment, atomic operations, shifts, and pointer conversions.

The state1 structure maintains two counters and a semaphores. The counters are the count counter of the child goroutine added by Add(), and the waiter is blocked by Wait(). The semaphore is used to block and wake up the Waiter. When Add(positive n) is executed, counter +=n indicates that n sub-Goroutines are added to perform tasks. After each child of goroutine has completed its task, the Done() function will be called to decrease counter by 1. When the last child of goroutine has completed, counter will be 0 and the Waiter will be blocked in the Wait() call.

However, there are a few things to note when using WaitGroup

  • throughAdd()The number of counters added to the function must be consistent with subsequent onesDone()The subtractions are the same. If the former is large, then the block isWait()The Goroutine at the call will never wake up; If the latter is large, panic will occur.
  • Add()The delta function should be executed first.
  • Do not copy WaitGroup objects for use.
  • If you want to reuse WaitGroup, you must reuse WaitGroup in all previousWait()Make a new one after the call returnsAdd()The call.