Mutex Mutex is used to allow coroutines to access a Mutex resource. This article disassemble Mutex source code into three parts, respectively from the “mutual exclusion”, “increase the opportunity to obtain locks in the operation of the coroutine”, “solve the problem of hunger” three aspects of Mutex principle.

The mutex

Mutex Mutex is used to access mutually exclusive resources by coroutines. It contains Lock() and Unlock() methods. Where, Lock() is used to compete for Lock resources. At the same time, multiple coroutines will execute Lock() to compete for Lock resources. Unlock() is used to release lock resources and wake up coroutines in the wait queue. In normal use (as shown in the code), only coroutines entering critical sections execute Unlock(). Therefore, only one coroutine can execute the Unlock() method at a time.

var lock sync.Mutex

func Hello1(a){
  lock.Lock()
  fmt.Println("Hello")
  lock.Unlock()
}

func Hello2(a){
  lock.Lock()
  defer lock.Unlock()
  fmt.Println("Hello")}Copy the code

Waiting queue

When the coroutine fails to compete for the Lock resource by calling the Lock() method, the coroutine enters the wait queue and switches to the blocking state. When the coroutine holding the lock resource calls Unlock() to release the lock resource, the corresponding coroutine is awakened from the wait queue. Mutex Mutex waiting queue is achieved by balanced tree, its source location in $GOROOT/SRC/runtime/sema. Go. Waiting queue uses semaphore sema to distinguish different mutex, and supports FIFO and LIFO

// enter the queue and block the current coroutine, lifo=true to the first queue, lifo=false to the last queue
func runtime_SemacquireMutex(sema *uint32, lifo bool, skipframes int)

// Wake up a coroutine from the wait queue
func runtime_Semrelease(sema *uint32, handoff bool, skipframes int)
Copy the code

The source code parsing

Mutex Mutex in solving a problem, on the basis of mutually exclusive added “increases the chances of running coroutines for lock” and “solve the problem of hunger” features. Next, this paper will decompose Mutex from three aspects of “mutual exclusion”, “increasing the chance of obtaining locks by running coroutines” and “solving hunger problem”.

Mutex structure

The Mutex Mutex has two fields: state and SEMa. The state field indicates the lock status. Sema semaphore can be understood as a unique identifier for a mutex that is used to put coroutines into and wake them up from wait queues.

type Mutex struct {
  state int32 		// State variable, which represents the state of the lock (number of waiting coroutines, starvation state, whether there are running coroutines, whether it is held)
	sema  uint32		// The semaphore used to wake up the coroutine from the wait queue
}

const (
    // Lock identifier bit (last bit of state)
    // mutex. state&mutexLocked==1 indicates that the state is locked; Mutex.state&mutexLocked==0 indicates that no lock is in place
    mutexLocked = 1 << iota      
    // Is there a running coroutine (penultimate state)
    // mutex. state&mutexWoken==1 indicates a running coroutine; Mutex.state&mutexWoken==0: No running coroutine exists
    mutexWoken
    // Hungry state bit (third from bottom of state)
    State&mutexmedicine ==1; Mutex.state& mutexmedicine ==0 means no hunger
    mutexStarving
    // Offset of number of waiters (value 3)
    // mutex. state a right-shift of three bits indicates the number of blocked coroutines in the wait queue, i.e., how many coroutines are blocked due to the lock
    mutexWaiterShift = iotaCopy the code

The mutex

The mutual exclusion of locks is implemented by the mutexLocked flag bit. When mutexLocked is 1, the lock is held, and other coroutines need to be queued. When mutexLocked is 0, the lock is not held, and the coroutine can use CAS to set the mutexLocked flag bit to 1 to capture the lock resources.

The Lock() method flows as follows:

  1. Read the lock state;
  2. Update the lock status through CAS operation. If the CAS operation fails, return to Step 1 and go to Step 3.
  3. Determine whether the current coroutine successfully holds the lock resource;
    1. Success -> ExitLock()Method to execute critical section code;
    2. Failure -> enter the wait queue, waiting to be woken up by the coroutine holding the locked resource; Then, perform Step 1 again.
// Lock Contention for Lock resources
func (m *Mutex) Lock(a) {
    // Lucky case: the current lock is not held by any coroutines, and the lock is successfully locked
    if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
        return
    }
    // Read the lock status
    old := m.state
    for {
        new := old
        // Set the lock's new state to held
        new |= mutexLocked
        // If the lock is already held, wait for coroutines +1
        ifold&mutexLocked ! =0 {
            new+ =1 << mutexWaiterShift
        }
        / / CAS operation
        if atomic.CompareAndSwapInt32(&m.state, old, new) { 
            // The original state of the lock is not held, then the current coroutine successfully competed for the lock resource, and exits the for loop
            if old&(mutexLocked) == 0 { 
                break
            }
            // The lock is already held by another coroutine, and the current coroutine is queued
            runtime_SemacquireMutex(&m.sema, false.1)
            // Re-read the status of the lock from the coroutine awakened from the wait queue
            old := m.state
        } else { 
            // CAS failed to compete for the lock again
            old = m.state
        }
    }
}
Copy the code

The Unlock() method does the following:

  1. throughatomic.AddInt32(&m.state, -mutexLocked)Atomic operations switch the lock state to an unheld state.
  2. Determines whether the coroutine in the wait queue needs to be woken up. The coroutine is not awakened from the wait queue when the lock resource is already held or there are no longer waiting. In other cases, the coroutine is awakened from the wait queue by a CAS operation.
// Unlock releases lock resources
func (m *Mutex) Unlock(a) {
    // Set the lock to an unheld state
    new := atomic.AddInt32(&m.state, -mutexLocked)
    // Lucky case: There is no coroutine waiting for the current lock resource
    if new= =0 {
        return
    }

    // There are coroutines waiting for the current lock resource and one of them needs to be woken up
    old := new
    for {
        // Lock resource has no waits -> No need to release wait coroutines
        // Lock resource already locked -> No longer need to release the lock resource waiting for the coroutine to grab it already held
        if old>>mutexWaiterShift == 0|| old&(mutexLocked) ! =0 {
            return
        }
        // Will wait for coroutine data -1 for the current locked resource
        new = (old - 1<<mutexWaiterShift)
        if atomic.CompareAndSwapInt32(&m.state, old, new) { / / CAS operation
            // Wake up a coroutine waiting for the currently locked resource
            runtime_Semrelease(&m.sema, false.1)
            return
        }
        old = m.state
    }
}
Copy the code

Increases the chance of a running coroutine acquiring a lock

To reduce the cost of context switching required to wake coroutines from the wait queue, Mutex Mutex is designed to increase the chance that a running coroutine will acquire the lock. This is achieved by the flag bit mutexWoken, and the flag bit mutexWoken is 1, indicating that there are currently running coroutines competing for lock resources. In this case, the Unlock() method will not wake up the coroutine from the wait queue to compete for the lock.

The Lock() method flows as follows:

  1. Read the lock state;
  2. Update by spin operationmutexWokenFlag bit (tellUnlock()Method already has a running coroutine contention lock, do not wake up coroutine);
  3. Update the lock status through CAS operation. If the CAS operation fails, return to Step 1 and go to Step 4.
  4. Determine whether the current coroutine successfully holds the lock resource;
    1. Success -> ExitLock()Method to execute critical section code;
    2. Failure -> enter the wait queue, waiting to be woken up by the coroutine holding the locked resource; Then, perform Step 1 again.
func (m *Mutex) Lock(a) {
    // Lucky case: the current lock is not held by any coroutines, and the lock is successfully locked
    if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
        return
    }

    // the awoke variable indicates whether the current coroutine sets the mutexWoken flag in status
    awoke := false
    // The number of iterations used to determine whether the spin operation can be performed
    iter := 0
    old := m.state
    for {
        // When mutex is held by another coroutine, update the mutexWoken flag bit of status with spin
        // With the mutexWoken flag set, the unlock method no longer wakes up coroutines in blocks, reducing the cost of context switching
        if old&mutexLocked == mutexLocked && runtime_canSpin(iter) {
            // if the mutexWoken flag bit is not set and the number of wait coroutines for the lock is not 0; Use CAS operation to set mutexWoken flag bit;
            if! awoke && old&mutexWoken ==0&& old>>mutexWaiterShift ! =0 && atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
                // after the successful setting of mutexWoken, the awoke awoke was set to true, indicating that mutexWoken indicated the current coroutine setting
                awoke = true
            }
            runtime_doSpin()
            // Number of iterations of spin +1
            iter++
            old = m.state
            continue
        }

        new := old
        // The current lock resource is already held by another coroutine, and the waiting number of the lock resource coroutine is +1
        ifold&(mutexLocked) ! =0 {
            new+ =1 << mutexWaiterShift
        }
        // If the current coroutine is set to mutexWoken, clear the mutexWoken flag bit
        if awoke {
            new &^= mutexWoken
        }
        // CAS operation to obtain the lock
        if atomic.CompareAndSwapInt32(&m.state, old, new) {
            // The original state of the lock is not held, then the current coroutine successfully competed for the lock resource, and exits the for loop
            if old&mutexLocked == 0 {
                break
            }
            // The current coroutine is blocked and queued
            runtime_SemacquireMutex(&m.sema, false.1)

            // Re-read the status of the lock from the coroutine awakened from the wait queue
            old = m.state
            // The awoke coroutines set the awoke awoke awoke flag to true by setting the mutexWoken flag in unlock
            awoke = true
            // Clear the number of spin iterations of the awakened coroutine
            iter = 0
        } else {
            old = m.state
        }
    }
}
Copy the code

The Unlock() method does the following:

  1. throughatomic.AddInt32(&m.state, -mutexLocked)Atomic operations switch the lock state to an unheld state.
  2. Determines whether the coroutine in the wait queue needs to be woken up. Coroutines are not awakened from the wait queue when “lock resource is already held” or “there are no longer waiting” or “there are already running coroutines robbing lock resource”. In other cases, the coroutine is awakened from the wait queue by a CAS operation.
func (m *Mutex) Unlock(a) {
    // Set the lock to an unheld state
    new := atomic.AddInt32(&m.state, -mutexLocked)
    // Lucky case: There is no coroutine waiting for the current lock resource
    if new= =0 {
        return
    }

    old := new
    for {
        // Lock resource has no waits -> No need to release wait coroutines
        // Lock resource already locked -> No longer need to release the lock resource waiting for the coroutine to grab it already held
        // There are already running coroutine snatch lock resources -> No need to release waiting coroutines, reducing the cost of context switch
        if old>>mutexWaiterShift == 0|| old&(mutexLocked|mutexWoken) ! =0 {
            return
        }
        // Wake up the coroutine in the wait queue with the number of coroutines waiting -1 and mark the lock as "there are already running coroutine snatch lock resources"
        new = (old - 1<<mutexWaiterShift) | mutexWoken
        if atomic.CompareAndSwapInt32(&m.state, old, new) {
            runtime_Semrelease(&m.sema, false.1)
            return
        }
        old = m.state
    }
}
Copy the code

hunger

Every time the running coroutine is given a chance to acquire locks, it will cause that the coroutine that has entered the waiting queue cannot obtain lock resources all the time and will have hunger problem. The Mutex Mutex determines whether to go hungry based on the wait time of the coroutine; After entering the hungry state, the new coroutine no longer competes for locks, but directly enters the tail of the waiting queue, increasing the opportunity for the old coroutine to obtain locks. When all the coroutines in the hungry state at the head of the waiting queue successfully acquire the lock resource, the lock will exit the hungry state.

The Lock() method flows as follows:

  1. Read the lock state;
  2. Determine if the lock is hungrymutexStarving:
    1. Not hungry: Updated by spin operationmutexWokenSign a
    2. Hunger status: not updatedmutexWokenMark; willmutexWaiterShiftNumber of waiters +1, no competition for lock resources;
  3. Update the lock status through CAS operation. If the CAS operation fails, return to Step 1 and go to Step 4.
  4. Determine whether the current coroutine successfully holds the lock resource;
    1. Success -> ExitLock()Method to execute critical section code;
    2. Failure -> Determine if coroutine is competing for lock resource for the first time:
      1. First: Records the time of the coroutine’s first contention lockwaitStartTime; Enter the end of the wait queue;
      2. Non-first: enters the end of the wait queue
    3. When the coroutine wakes up from the wait queue, it determines whether it needs to switch to the hungry state. If the lock is already hungry, the awakened coroutine will no longer compete for the lock through the CAS operation, but will acquire the lock directly using the atomic operation (since only one awakened coroutine exists at a time, this operation is safe).
func (m *Mutex) Lock(a) {
    // Lucky case: the current lock is not held by any coroutines, and the lock is successfully locked
    if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
        return
    }

    // waitStartTime records the time when the coroutine first competes for the lock
    var waitStartTime int64
    // Starving mode flag
    starving := false
    awoke := false
    iter := 0
    old := m.state
    for {
        // The lock is hungry, and the wake flag of the lock is not set on the spin mutexWoken
        if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
        // ...
        }

        new := old

        // When the lock is held or in the hungry state, the current coroutine does not acquire the lock and enters the wait state directly
        if old&mutexStarving == 0 {
            new |= mutexLocked
        }
        ifold&(mutexLocked|mutexStarving) ! =0 {
            new+ =1 << mutexWaiterShift
        }

        // Set the hunger state
        ifstarving && old&mutexLocked ! =0 {
            new |= mutexStarving
        }
        if awoke {
            new &^= mutexWoken
        }

        / / CAS operation
        if atomic.CompareAndSwapInt32(&m.state, old, new) {
            // mutexLocked: the original locks are not held -> Lock obtaining break
            // Mutexmedicine: the original state of the lock was starving -> Failed to obtain the lock, and the coroutine entered the waiting queue
            if old&(mutexLocked|mutexStarving) == 0 {
                break
            }

            // If waitStartTime is 0, it indicates that the coroutine is competing for lock resources for the first time
            // If waitStartTime is not 0, it means that the coroutine is awakened after entering the wait queue to compete for the lock resource again. Set queueLifo to true and enter the first position in the wait queue (LIFO, last in first out).queueLifo := waitStartTime ! =0
            if waitStartTime == 0 {
                waitStartTime = runtime_nanotime()
            }
            // Enter wait queue, coroutine enters block state
            runtime_SemacquireMutex(&m.sema, queueLifo, 1)

            // When the coroutine is awakened, it determines whether the coroutine is hungry based on the waiting time
            starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
            old = m.state

            // The lock is hungry, and the awakened coroutine gets the lock resource directly
            ifold&mutexStarving ! =0 {
                / / mutexLocked: locked
                // (-1 <
                delta := int32(mutexLocked - 1<<mutexWaiterShift)
                / /! Starving: The coroutine in front of the waiting queue is hungry. When the coroutine is not hungry, the hungry flag needs to be cleared
                // old>>mutexWaiterShift == 1: If the current coroutine is the last one in the queue, the hunger flag needs to be cleared; The reason is that when the coroutine is in the hungry state, the running coroutine directly enters the waiting queue, and only the awakened coroutine can obtain the lock. When the wait queue is empty, the hunger flag is not cleared in time, resulting in subsequent coroutines directly entering the wait queue. At the same time, no coroutine performs unlock operation, and all coroutines enter the blocked state.
                if! starving || old>>mutexWaiterShift ==1 {
                    delta -= mutexStarving
                }
                atomic.AddInt32(&m.state, delta)
                break
            }

            awoke = true
            iter = 0
        } else {
            old = m.state
        }
    }
}
Copy the code

The Unlock() method does the following:

  1. throughatomic.AddInt32(&m.state, -mutexLocked)Atomic operations switch the lock state to an unheld state.
  2. To determine whether or not you are hungry:
    1. Not hungry: Determines whether the coroutine in the wait queue needs to be awakened. Coroutines are not awakened from the wait queue when “lock resource is already held” or “there are no longer waiting” or “there are already running coroutines grabbing lock resource” or “in the hungry state”. In other cases, the coroutine is awakened from the wait queue by a CAS operation.
    2. Hungry: Wakes up the coroutine directly from the head of the wait queue.
func (m *Mutex) Unlock(a) {
    // Set the lock to an unheld state
    new := atomic.AddInt32(&m.state, -mutexLocked)
    // Lucky case: There is no coroutine waiting for the current lock resource
    if new= =0 {
        return
    }


    if new&mutexStarving == 0 {
        // Non-hunger mode
        // The coroutine awakened by unlock needs to compete with the running coroutine, so CAS is used to determine the lock state
        old := new
        for {
            if old>>mutexWaiterShift == 0|| old&(mutexLocked|mutexWoken|mutexStarving) ! =0 {
                return
            }
            new = (old - 1<<mutexWaiterShift) | mutexWoken
            if atomic.CompareAndSwapInt32(&m.state, old, new) {
                runtime_Semrelease(&m.sema, false.1)
                return
            }
            old = m.state
        }
    } else {
        // The lock is hungry and wakes up the first coroutine in the waiting queue
        // In the hungry state, the non-first contended coroutine is at the head of the queue, and the first contended coroutine is at the tail of the queue
        runtime_Semrelease(&m.sema, true.1)}}Copy the code

The last

The source code for Mutex is in $GOROOT/ SRC /sync/mutex.go. In order to better explain the principle of Mutex, this article code to Mutex source code has been deleted.