infrastructure

// A Mutex is a mutual exclusion lock.
// The zero value for a Mutex is an unlocked mutex.
//
// A Mutex must not be copied after first use.
type Mutex struct {
	state int32
	sema  uint32
}
Copy the code

State (signed 32-bit integer) :

High 29 bits: currently blocked degroutetine Number Low 3 bits: hungry, 0 normal mo mode, 1 Hungry Mode low 2 bits: Awakened, 0 unawakened, 1 awakened Low 1 bit: locked, 0 available, 1 occupiedCopy the code

Sema: semaphore

Several constants

We’re going to use it later, but we know for a moment what each variable is

  • MutexLocked =1 << iota // =1 indicates locked
  • mutexWoken // =2
  • mutexStarving // =4
  • mutexWaiterShift = iota // =3
  • StarvationThresholdNs = 1e6 // 1 ms

Runtime Method Introduction

So just to give you an idea of what each Runtime method is, right

  • runtime_canSpin

    In the Runtime package, runtime_canSpin limits the number of CPU cores passed to golang to be greater than 4, the maximum logical processor to be greater than 1, at least one local P queue, and the local P queue to run G queue empty (maximum logical processor, The local P and G queues can be learned by themselves.

    func sync_runtime_canSpin(i int) bool {
    	// sync.Mutex is cooperative, so we are conservative with spinning.
    	// Spin only few times and only if running on a multicore machine and
    	// GOMAXPROCS>1 and there is at least one other running P and local runq is empty.
    	// As opposed to runtime mutex we don't do passive spinning here,
    	// because there can be work on global runq or on other Ps.
    	if i >= active_spin || ncpu <= 1 || gomaxprocs <= int32(sched.npidle+sched.nmspinning)+1 {
    		return false
    	}
    	ifp := getg().m.p.ptr(); ! runqempty(p) {return false
    	}
    	return true
    }
    Copy the code
  • runtime_doSpin

    The procyield function is called, which is also implemented in assembly language. The function loops internally to invoke the PAUSE directive. PAUSE means to do nothing, but consumes CPU time, and does not optimize the PAUSE instruction unnecessarily when it is executed.

    
    //go:linkname sync_runtime_doSpin sync.runtime_doSpin
    func sync_runtime_doSpin(a) {
    	procyield(active_spin_cnt)
    }    
    Copy the code

The Lock Lock

const (
	mutexLocked = 1 << iota // mutex is locked
	mutexWoken
	mutexStarving
	mutexWaiterShift = iota

	// Mutex fairness.
	//
	// Mutex can be in 2 modes of operations: normal and starvation.
	// In normal mode waiters are queued in FIFO order, but a woken up waiter
	// does not own the mutex and competes with new arriving goroutines over
	// the ownership. New arriving goroutines have an advantage -- they are
	// already running on CPU and there can be lots of them, so a woken up
	// waiter has good chances of losing. In such case it is queued at front
	// of the wait queue. If a waiter fails to acquire the mutex for more than 1ms,
	// it switches mutex to the starvation mode.
	//
	// In starvation mode ownership of the mutex is directly handed off from
	// the unlocking goroutine to the waiter at the front of the queue.
	// New arriving goroutines don't try to acquire the mutex even if it appears
	// to be unlocked, and don't try to spin. Instead they queue themselves at
	// the tail of the wait queue.
	//
	// If a waiter receives ownership of the mutex and sees that either
	// (1) it is the last waiter in the queue, or (2) it waited for less than 1 ms,
	// it switches mutex back to normal operation mode.
	//
	// Normal mode has considerably better performance as a goroutine can acquire
	// a mutex several times in a row even if there are blocked waiters.
	// Starvation mode is important to prevent pathological cases of tail latency.
	starvationThresholdNs = 1e6
)

// Lock locks m.
// If the lock is already in use, the calling goroutine
// blocks until the mutex is available.
func (m *Mutex) Lock(a) {
	// Fast path: grab unlocked mutex.
	if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
		if race.Enabled {
			race.Acquire(unsafe.Pointer(m))
		}
		return
	}

	var waitStartTime int64
	starving := false
	awoke := false
	iter := 0
	old := m.state
	for {
		// Don't spin in starvation mode, ownership is handed off to waiters
		// so we won't be able to acquire the mutex anyway.
		if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
			// Active spinning makes sense.
			// Try to set mutexWoken flag to inform Unlock
			// to not wake other blocked goroutines.
            if! awoke && old&mutexWoken ==0&& old>>mutexWaiterShift ! =0 &&
				atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
				awoke = true
			}
			runtime_doSpin()
			iter++
			old = m.state
			continue
		}
		new := old
		// Don't try to acquire starving mutex, new arriving goroutines must queue.
		if old&mutexStarving == 0 {
			new |= mutexLocked
		}
		ifold&(mutexLocked|mutexStarving) ! =0 {
			new+ =1 << mutexWaiterShift
		}
		// The current goroutine switches mutex to starvation mode.
		// But if the mutex is currently unlocked, don't do the switch.
		// Unlock expects that starving mutex has waiters, which will not
		// be true in this case.
		ifstarving && old&mutexLocked ! =0 {
			new |= mutexStarving
		}
		if awoke {
			// The goroutine has been woken from sleep,
			// so we need to reset the flag in either case.
			if new&mutexWoken == 0 {
				throw("sync: inconsistent mutex state")}new &^= mutexWoken
		}
		if atomic.CompareAndSwapInt32(&m.state, old, new) {
			if old&(mutexLocked|mutexStarving) == 0 {
				break // locked the mutex with CAS
			}
			// If we were already waiting before, queue at the front of the queue.queueLifo := waitStartTime ! =0
			if waitStartTime == 0 {
				waitStartTime = runtime_nanotime()
			}
			runtime_SemacquireMutex(&m.sema, queueLifo)
			starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
			old = m.state
			ifold&mutexStarving ! =0 {
				// If this goroutine was woken and mutex is in starvation mode,
				// ownership was handed off to us but mutex is in somewhat
				// inconsistent state: mutexLocked is not set and we are still
				// accounted as waiter. Fix that.
				ifold&(mutexLocked|mutexWoken) ! =0 || old>>mutexWaiterShift == 0 {
					throw("sync: inconsistent mutex state")
				}
				delta := int32(mutexLocked - 1<<mutexWaiterShift)
				if! starving || old>>mutexWaiterShift ==1 {
					// Exit starvation mode.
					// Critical to do it here and consider wait time.
					// Starvation mode is so inefficient, that two goroutines
					// can go lock-step infinitely once they switch mutex
					// to starvation mode.
					delta -= mutexStarving
				}
				atomic.AddInt32(&m.state, delta)
				break
			}
			awoke = true
			iter = 0
		} else {
			old = m.state
		}
	}

	if race.Enabled {
		race.Acquire(unsafe.Pointer(m))
	}
}

Copy the code

The Lock source code is too long, let’s break it up (remember that the Lock method is used to compete for locks) :

  • Position 1

    // Fast path: grab unlocked mutex.
    	if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
    		if race.Enabled {
    			race.Acquire(unsafe.Pointer(m))
    		}
    		return
    	}
    Copy the code

    If the state is 0, no coroutine is holding the lock. The lock is obtained directly, and mutexLocked(=1) is assigned to state. For example, this is the state when the first goroutine request is made, or the state when the lock is idle.

  • Position 2

    var waitStartTime int64
    starving := false
    awoke := false
    iter := 0
    old := m.state
    for {
    	// Don't spin in starvation mode, ownership is handed off to waiters
    	// so we won't be able to acquire the mutex anyway.
    	if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
    		// Active spinning makes sense.
    		// Try to set mutexWoken flag to inform Unlock
    		// to not wake other blocked goroutines.
    		if! awoke && old&mutexWoken ==0&& old>>mutexWaiterShift ! =0 &&
    			atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
    			awoke = true
    		}
    		runtime_doSpin()
    		iter++
    		old = m.state
    		continue
    	}
    Copy the code
    • WaitStartTime: indicates the wait time for this Goroutine
    • Linda: Is Ben Goroutine starving
    • A ~ d diet: Whether the goroutine was awakened
    • Iter: number of spins

    The first if is that the lock is already occupied, but in normal mode, the Goroutine can still spin a certain number of times. If both conditions are met, the current Goroutine can spin continuously to wait for the lock to be released, otherwise it goes into starvation mode.

    If state is not already set to woken during the second if spin, set its woken flag and mark itself awake

  • Position 3

    	new := old
    	// Don't try to acquire starving mutex, new arriving goroutines must queue.
    	if old&mutexStarving == 0 {
    		new |= mutexLocked
    	}
    	ifold&(mutexLocked|mutexStarving) ! =0 {
    		new+ =1 << mutexWaiterShift
    	}
    	// The current goroutine switches mutex to starvation mode.
    	// But if the mutex is currently unlocked, don't do the switch.
    	// Unlock expects that starving mutex has waiters, which will not
    	// be true in this case.
    	ifstarving && old&mutexLocked ! =0 {
    		new |= mutexStarving
    	}
    	if awoke {
    		// The goroutine has been woken from sleep,
    		// so we need to reset the flag in either case.
    		if new&mutexWoken == 0 {
    			throw("sync: inconsistent mutex state")}new &^= mutexWoken
    	}	
    Copy the code

    At this point, the state of state might be:

    1. The lock has not been released and is in a normal state
    2. The lock has not been released. The lock is starving
    3. The lock has been released and is in normal state
    4. The lock has been released and the lock is starving

    And the awoke goroutine may be true or false()

    The first if, if the current lock state is in the hungry state, does not set the new state lock, because the hungry state lock is directly transferred to the first waiting queue. If the state of the current lock is in normal mode, new state sets the lock.

    The second if increases the number of waits in the queue by one

    The third if, if the goroutine is already hungry and the old state is already locked, marks the new state as hungry and changes the lock to hungry.

    Fourth, if this goroutine is already set to wake up, the new state wakeup flag needs to be cleared because this goroutine is either locked or asleep. The new state of total state is no longer woken.

  • Position 4

    if atomic.CompareAndSwapInt32(&m.state, old, new) {
    		if old&(mutexLocked|mutexStarving) == 0 {
    			break // locked the mutex with CAS
    		}
    		// Set/calculate the wait time for this goroutinequeueLifo := waitStartTime ! =0
            if waitStartTime == 0 {
                waitStartTime = runtime_nanotime()
            }
            
            // Since the lock is not available, block this goroutine with the sleep primitive
            QueueLifo =false queueLifo=false queueLifo=false queueLifo=false
            QueueLifo =true to join the head of the waiting queue
            runtime_SemacquireMutex(&m.sema, queueLifo)
            
            // After sleep, the goroutine is awakened
            // Calculate whether the goroutine is already hungry.
            starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
            // Get the current lock status
            old = m.state
            
            // If the current state is already hungry
            // The lock should be in Unlock state, so the lock should be handed directly to the goroutine
    		ifold&mutexStarving ! =0 { 
                
                // If the current state is locked, or is marked as wakeup, or the waiting queue is not empty,
                // Then state is an illegal state
                ifold&(mutexLocked|mutexWoken) ! =0 || old>>mutexWaiterShift == 0 {
                    throw("sync: inconsistent mutex state")}// The current goroutine is used to set the lock, and the number of goroutines waiting is reduced by 1.
                delta := int32(mutexLocked - 1<<mutexWaiterShift)
                
                // If bengoroutine is the last person waiting, or it is not hungry,
                // Then we need to set the lock state to normal mode.
                if! starving || old>>mutexWaiterShift ==1 {
                    // Exit starvation mode
                    delta -= mutexStarving
                }
                
                // Set the new state, since the lock has been obtained, exit, return
                atomic.AddInt32(&m.state, delta)
                break
            }
            
            // If the current lock is in normal mode, the goroutine is awakened, the spin count is cleared, and the for loop begins again
            awoke = true
            iter = 0
        } else { // If the CAS fails, retrieve the lock state and start again from the beginning of the for loop
            old = m.state
        }
    }
    Copy the code

    The first if, via CAS, sets the new state value. Note that the lock flag for new does not have to be true, but may simply indicate that the lock state is hungry

    The second if, if the old state is unlocked and the lock is not hungry, then the current goroutine has acquired ownership of the lock

Ulock releases the lock

func (m *Mutex) Unlock(a) {
    // If state is not in a locked state, then Unlock is not locked at all
    new := atomic.AddInt32(&m.state, -mutexLocked)
    if (new+mutexLocked)&mutexLocked == 0 {
        throw("sync: unlock of unlocked mutex")}// Once the lock is released, other waiting parties need to be notified
    // If the lock is hungry, hand it to the first person in the waiting queue, wake it up, and let it get the lock
    // If the lock is in normal state,
    // new state If the status is normal
    if new&mutexStarving == 0 {
        old := new
        for {
            // If there is no waiting goroutine, or the lock is not idle, return directly.
            if old>>mutexWaiterShift == 0|| old&(mutexLocked|mutexWoken|mutexStarving) ! =0 {
                return
            }
            // Subtract the number of waiting Goroutines by one and set the woken flag
            new = (old - 1<<mutexWaiterShift) | mutexWoken
            // Set the new state, where a semaphore will wake up a blocking goroutine to acquire the lock.
            if atomic.CompareAndSwapInt32(&m.state, old, new) {
                runtime_Semrelease(&m.sema, false)
                return
            }
            old = m.state
        }
    } else {
        // In starvation mode, lock ownership is directly transferred to the first person in the queue.
        // Note that state mutexLocked is not yet locked, and the awakened Goroutine will set it.
        // In the meantime, if a new goroutine requests the lock, mutex is still considered locked because mutex is hungry,
        // The new Goroutine will not take the lock.
        runtime_Semrelease(&m.sema, true)}}Copy the code