When reposting, please credit the source. This article was published on luozhiyun's blog: www.luozhiyun.com/archives/48…

This article is based on the Go 1.15.7 source code.

This time I’m going to talk about signal-based preemptive scheduling.

Introduction

Before Go 1.14, preemption was cooperative: a goroutine had to give up execution itself, typically at a function call where the runtime could check a preemption flag. This left edge cases where a goroutine could not be preempted, for example a for loop that occupies a thread for a long time, or garbage collection waiting a long time for such a goroutine to stop. Some of these cases were only addressed when Go 1.14 introduced signal-based preemptive scheduling.
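To see the difference on your own machine, here is a minimal sketch of my own (not runtime code). It pins the program to one P and starts a goroutine whose loop only performs an atomic load, so it never reaches a function-prologue preemption check. It assumes Go 1.14+ on a platform with async preemption (e.g. linux/amd64) and GODEBUG=asyncpreemptoff unset; under Go 1.13, or with asyncpreemptoff=1, I would expect main never to wake up and the program to hang.

```go
package main

import (
	"fmt"
	"runtime"
	"sync/atomic"
	"time"
)

// spinThenResume pins the program to a single P, starts a goroutine that
// spins without calling any function that has a stack check, and then lets
// main sleep. main can only run again if the runtime preempts the spinner.
func spinThenResume() bool {
	prev := runtime.GOMAXPROCS(1) // one P, a single-processor scenario
	defer runtime.GOMAXPROCS(prev)

	var stop int32
	done := make(chan struct{})
	go func() {
		defer close(done)
		for atomic.LoadInt32(&stop) == 0 {
			// busy loop: no cooperative preemption point in here
		}
	}()

	time.Sleep(10 * time.Millisecond) // hand the only P to the spinner
	atomic.StoreInt32(&stop, 1)       // reached only after the spinner is preempted
	<-done
	return true
}

func main() {
	fmt.Println("main resumed:", spinThenResume())
}
```

On Go 1.14+ it should print main resumed: true after a short delay, once sysmon notices the spinner has run too long and sends it the preemption signal described below.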

Here’s an example to verify the preemption difference between 1.14 and 1.13:

package main

import (
	"fmt"
	"os"
	"runtime"
	"runtime/trace"
	"sync"
)

func main() {
	runtime.GOMAXPROCS(1)
	f, _ := os.Create("trace.output")
	defer f.Close()
	_ = trace.Start(f)
	defer trace.Stop()
	var wg sync.WaitGroup
	for i := 0; i < 30; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			t := 0
			for i := 0; i < 1e8; i++ {
				t += 2
			}
			fmt.Println("total:", t)
		}()
	}
	wg.Wait()
}

In this example, Go trace records the execution. runtime.GOMAXPROCS(1) limits the number of CPUs that can execute Go code at the same time to 1, so there is only one P (processor) and the scenario is single-processor. A for loop then starts 30 goroutines, each running a purely computational, time-consuming func, so none of them ever blocks or yields on its own.

Let's compile and run the program to produce the trace output:

$ go build -gcflags "-N -l" main.go   # -N disables optimizations, -l disables inlining
$ ./main

Running it produces the trace.output file, which we can visualize with:

$ go tool trace -http=":6060" ./trace.output

Go1.13 trace analysis

As can be seen from the picture above:

  1. Because we’re limited to only one P, there’s only one Proc0 in the PROCS column;
  2. We started 30 goroutines in the for loop, so we can count the color boxes in Proc0, and there are exactly 30;
  3. The 30 goroutines in Proc0 are executed sequentially, one after the other, without preemption;
  4. Click on any goroutine to see its details: the Wall Duration is about 0.23s, meaning the goroutine ran for 0.23s, and the 30 goroutines take about 7s in total (30 × 0.23s ≈ 6.9s);
  5. Start Stack Trace is main.main.func1:20, which in the code is the line that starts the func: go func();
  6. End Stack Trace is main.main.func1:26, which in the code is the func's final print: fmt.Println("total:", t);

The trace shows that Go's cooperative scheduling cannot preempt this purely computational func: once a goroutine starts, the others can only wait for it to finish. Each goroutine runs for about 0.23s without being preempted.

Go 1.14+ trace analysis

Go 1.14 introduced signal-based preemptive scheduling, and as the trace above shows, the goroutines in the Proc0 row are constantly being switched. A goroutine no longer has to run to completion once it starts.

The total running time here is about 4s. Ignore the difference from the 1.13 run: the two traces were taken on machines with different configurations (I had trouble finding two machines with the same configuration).

Here’s a closer look at the details:

It can be seen from this detail that:

  1. The goroutine runs for only about 0.025s before being switched out;
  2. Start Stack Trace is main.main.func1:21, as above;
  3. End Stack Trace is runtime.asyncPreempt:50. This function is executed when a preemption signal is received.

Analysis

Installing the preemption signal

runtime/signal_unix.go

When the program starts, runtime.initsig installs runtime.sighandler as the handler for SIGURG (along with other signals). When SIGURG arrives, sighandler hands it to runtime.doSigPreempt.

initsig

func initsig(preinit bool) {
	if !preinit {
		// It's now OK for signal handlers to run
		signalsOK = true
	}
	// Iterate over the signal table
	for i := uint32(0); i < _NSIG; i++ {
		t := &sigtable[i]
		// Skip signals with no flags (SIGNONE, SIGKILL, SIGSTOP) and those
		// left at their default action (SIGTSTP, SIGCONT, SIGTTIN, SIGTTOU)
		if t.flags == 0 || t.flags&_SigDefault != 0 {
			continue
		}
		...
		// Install sighandler for this signal
		setsig(i, funcPC(sighandler))
	}
}

initsig iterates over all signals and registers a handler for each one by calling setsig. The sigtable global variable shows what information is kept for each signal:

var sigtable = [...]sigTabT{
	/* 0 */ {0, "SIGNONE: no trap"},
	/* 1 */ {_SigNotify + _SigKill, "SIGHUP: terminal line hangup"},
	/* 2 */ {_SigNotify + _SigKill, "SIGINT: interrupt"},
	/* 3 */ {_SigNotify + _SigThrow, "SIGQUIT: quit"},
	/* 4 */ {_SigThrow + _SigUnblock, "SIGILL: illegal instruction"},
	/* 5 */ {_SigThrow + _SigUnblock, "SIGTRAP: trace trap"},
	/* 6 */ {_SigNotify + _SigThrow, "SIGABRT: abort"},
	/* 7 */ {_SigPanic + _SigUnblock, "SIGBUS: bus error"},
	/* 8 */ {_SigPanic + _SigUnblock, "SIGFPE: floating-point exception"},
	/* 9 */ {0, "SIGKILL: kill"},
	/* 10 */ {_SigNotify, "SIGUSR1: user-defined signal 1"},
	/* 11 */ {_SigPanic + _SigUnblock, "SIGSEGV: segmentation violation"},
	/* 12 */ {_SigNotify, "SIGUSR2: user-defined signal 2"},
	/* 13 */ {_SigNotify, "SIGPIPE: write to broken pipe"},
	/* 14 */ {_SigNotify, "SIGALRM: alarm clock"},
	/* 15 */ {_SigNotify + _SigKill, "SIGTERM: termination"},
	/* 16 */ {_SigThrow + _SigUnblock, "SIGSTKFLT: stack fault"},
	/* 17 */ {_SigNotify + _SigUnblock + _SigIgn, "SIGCHLD: child status has changed"},
	/* 18 */ {_SigNotify + _SigDefault + _SigIgn, "SIGCONT: continue"},
	/* 19 */ {0, "SIGSTOP: stop, unblockable"},
	/* 20 */ {_SigNotify + _SigDefault + _SigIgn, "SIGTSTP: keyboard stop"},
	/* 21 */ {_SigNotify + _SigDefault + _SigIgn, "SIGTTIN: background read from tty"},
	/* 22 */ {_SigNotify + _SigDefault + _SigIgn, "SIGTTOU: background write to tty"},
	/* 23 */ {_SigNotify + _SigIgn, "SIGURG: urgent condition on socket"},
	/* 24 */ {_SigNotify, "SIGXCPU: cpu limit exceeded"},
	/* 25 */ {_SigNotify, "SIGXFSZ: file size limit exceeded"},
	/* 26 */ {_SigNotify, "SIGVTALRM: virtual alarm clock"},
	/* 27 */ {_SigNotify + _SigUnblock, "SIGPROF: profiling alarm clock"},
	/* 28 */ {_SigNotify + _SigIgn, "SIGWINCH: window size change"},
	/* 29 */ {_SigNotify, "SIGIO: i/o now possible"},
	/* 30 */ {_SigNotify, "SIGPWR: power failure restart"},
	/* 31 */ {_SigThrow, "SIGSYS: bad system call"},
	/* 32 */ {_SigSetStack + _SigUnblock, "signal 32"}, /* SIGCANCEL; see issue 6997 */
	/* 33 */ {_SigSetStack + _SigUnblock, "signal 33"}, /* SIGSETXID; see issues 3871, 9400, 12498 */
	...
}

For the meaning of each signal, see Unix signal: zh.wikipedia.org/wiki/Unix%E… The preemption signal SIGURG is flagged _SigNotify + _SigIgn:

{_SigNotify + _SigIgn, "SIGURG: urgent condition on socket"}
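A user-level way to observe this entry, as a sketch assuming a Unix system: SIGURG is ignored by default (_SigIgn) but is also marked _SigNotify, so a Go program can subscribe to it with os/signal and will then receive it. Because this is the same signal the runtime uses for preemption, and sighandler still lets SIGURG through to the application after doSigPreempt, a program that subscribes may also observe the runtime's own preemption signals.

```go
package main

import (
	"fmt"
	"os"
	"os/signal"
	"syscall"
	"time"
)

// receiveSIGURG subscribes to SIGURG, sends SIGURG to the current process,
// and reports whether the signal reached the channel within a second.
// Unix-only: it relies on syscall.Kill and syscall.SIGURG.
func receiveSIGURG() bool {
	ch := make(chan os.Signal, 1)
	signal.Notify(ch, syscall.SIGURG)
	defer signal.Stop(ch)

	if err := syscall.Kill(os.Getpid(), syscall.SIGURG); err != nil {
		return false
	}
	select {
	case <-ch:
		return true
	case <-time.After(time.Second):
		return false
	}
}

func main() {
	fmt.Println("received SIGURG:", receiveSIGURG())
}
```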

setsig is in runtime/os_linux.go:

setsig

func setsig(i uint32, fn uintptr) {
	var sa sigactiont
	sa.sa_flags = _SA_SIGINFO | _SA_ONSTACK | _SA_RESTORER | _SA_RESTART
	sigfillset(&sa.sa_mask)
	...
	if fn == funcPC(sighandler) {
		// cgo
		if iscgo {
			fn = funcPC(cgoSigtramp)
		} else {
			// Call sigtramp instead
			fn = funcPC(sigtramp)
		}
	}
	sa.sa_handler = fn
	sigaction(i, &sa, nil)
}

Note that when fn is sighandler, the handler actually installed is sigtramp (or cgoSigtramp in cgo programs). On Linux, sigaction installs it through the rt_sigaction system call (sys_rt_sigaction in the kernel).

Handling the preemption signal

This part covers what happens when the signal arrives and is handled. Logically it comes after the preemption signal is sent, but I cover it here to follow on from signal installation. You can skip ahead to sending the preemption signal and then come back.

As shown above, when fn is sighandler, the installed handler is replaced by sigtramp, which is implemented in assembly. Let's take a look.

src/runtime/sys_linux_amd64.s:

TEXT runtime·sigtramp<ABIInternal>(SB),NOSPLIT,$72
	...
	// We don't save mxcsr or the x87 control word because sigtrampgo doesn't
	// modify them.

	MOVQ	DX, ctx-56(SP)
	MOVQ	SI, info-64(SP)
	MOVQ	DI, signum-72(SP)
	MOVQ	$runtime·sigtrampgo(SB), AX
	CALL	AX
	...
	RET

When a signal is delivered, the kernel jumps to runtime·sigtramp. It copies the signal number, siginfo, and context from registers (DI, SI, DX) onto its stack frame and calls runtime.sigtrampgo to do the actual handling.

This function is in the runtime/signal_unix.go file:

sigtrampgo & sighandler

func sigtrampgo(sig uint32, info *siginfo, ctx unsafe.Pointer) {
	if sigfwdgo(sig, info, ctx) {
		return
	}
	c := &sigctxt{info, ctx}
	g := sigFetchG(c)
	... 
	sighandler(sig, info, ctx, g)
	setg(g)
	if setStack {
		restoreGsignalStack(&gsignalStack)
	}
}


func sighandler(sig uint32, info *siginfo, ctxt unsafe.Pointer, gp *g) {
	_g_ := getg()
	c := &sigctxt{info, ctxt}
	...
	// If it is a preemption signal
	if sig == sigPreempt && debug.asyncpreemptoff == 0 {
		// Handle the preemption signal
		doSigPreempt(gp, c)
	}

	...
}

sighandler handles many other signals as well. We focus only on the preemption path, where the actual work is done by doSigPreempt.

This function is in the runtime/signal_unix.go file:

doSigPreempt

func doSigPreempt(gp *g, ctxt *sigctxt) {
	// Check whether this G wants to be preempted
	if wantAsyncPreempt(gp) {
		// Check whether it is at an asynchronous safe point
		if ok, newpc := isAsyncSafePoint(gp, ctxt.sigpc(), ctxt.sigsp(), ctxt.siglr()); ok {
			// Adjust the registers to inject a call to asyncPreempt
			ctxt.pushCall(funcPC(asyncPreempt), newpc)
		}
	}

	// Acknowledge the preemption request
	atomic.Xadd(&gp.m.preemptGen, 1)
	atomic.Store(&gp.m.signalPending, 0)
}

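doSigPreempt first checks that G wants to be preempted and that the interrupted PC is an asynchronous safe point. If so, it takes the PC and SP from the signal context and calls ctxt.pushCall, which modifies the saved registers so that, when the signal handler returns, the thread calls asyncPreempt (declared in runtime/preempt.go) as if the interrupted code had called it. Finally it acknowledges the request by incrementing preemptGen and clearing signalPending.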

// asyncPreempt saves all user registers and then calls asyncPreempt2.
// It is implemented in assembly.
func asyncPreempt()
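The register trick behind pushCall can be illustrated with a toy model. This is a hypothetical sketch in plain Go, not the runtime's sigctxt code: toyCtx, pushCall and ret are made-up names that only model a PC, an SP, and a downward-growing stack, assuming amd64-style calls where the return address lives on the stack.

```go
package main

import "fmt"

const stackWords = 8

// toyCtx is a hypothetical stand-in for the registers saved in a signal
// context: a program counter, a stack pointer (an index into stack), and
// a small stack that grows toward lower indexes.
type toyCtx struct {
	pc    uintptr
	sp    int
	stack [stackWords]uintptr
}

// pushCall imitates the idea behind sigctxt.pushCall on amd64: push the
// PC to resume at as a fake return address, then point PC at the target.
// When the signal handler returns, the thread starts executing targetPC.
func (c *toyCtx) pushCall(targetPC, resumePC uintptr) {
	c.sp--
	c.stack[c.sp] = resumePC
	c.pc = targetPC
}

// ret imitates the RET at the end of the injected function:
// pop the return address back into PC.
func (c *toyCtx) ret() {
	c.pc = c.stack[c.sp]
	c.sp++
}

func main() {
	c := &toyCtx{pc: 0x1000, sp: stackWords} // interrupted at PC 0x1000
	c.pushCall(0x2000, c.pc)                 // 0x2000 plays asyncPreempt's role
	fmt.Printf("after pushCall: pc=%#x sp=%d ret=%#x\n", c.pc, c.sp, c.stack[c.sp])
	c.ret()
	fmt.Printf("after ret:      pc=%#x sp=%d\n", c.pc, c.sp)
}
```

The interrupted code never "called" anything; the fake return address is what lets execution continue at 0x1000 once the injected function returns.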

asyncPreempt itself is written in assembly in src/runtime/preempt_amd64.s. After saving the registers it calls asyncPreempt2 in src/runtime/preempt.go:

asyncPreempt2

func asyncPreempt2() {
	gp := getg()
	gp.asyncSafePoint = true
	// preemptStop is set when suspendG asks this G to stop
	if gp.preemptStop {
		// Park G in the _Gpreempted state
		mcall(preemptPark)
	} else {
		// Let G give up the M and put it on the global run queue for later scheduling
		mcall(gopreempt_m)
	}
	gp.asyncSafePoint = false
}

asyncPreempt2 gets the current G and checks its preemptStop field. suspendG in runtime/preempt.go sets preemptStop when it asks a _Grunning goroutine to stop, so gp.preemptStop == true means G must be parked via preemptPark rather than simply rescheduled.

Let's look at preemptPark in runtime/proc.go, which performs the preemption:

preemptPark

func preemptPark(gp *g) {
	status := readgstatus(gp)
	if status&^_Gscan != _Grunning {
		dumpgstatus(gp)
		throw("bad g status")
	}
	gp.waitreason = waitReasonPreempted
	casGToPreemptScan(gp, _Grunning, _Gscan|_Gpreempted)
	// Detach G from the current M
	dropg()
	// Change the goroutine's state to _Gpreempted
	casfrom_Gscanstatus(gp, _Gscan|_Gpreempted, _Gpreempted)
	// Continue scheduling other goroutines on this M
	schedule()
}

preemptPark changes the current goroutine's state to _Gpreempted, calls dropg to detach it from the M, and finally calls schedule so the M goes on to run other goroutines.

gopreempt_m

gopreempt_m is more of a yield than a preemption: G gives up the M and rejoins the global run queue to be scheduled again.

func gopreempt_m(gp *g) {
	goschedImpl(gp)
}

func goschedImpl(gp *g) {
	status := readgstatus(gp)
	...
	// Update status to _Grunnable
	casgstatus(gp, _Grunning, _Grunnable)
	// Detach G from the current M
	dropg()
	lock(&sched.lock)
	// Put G back on the global run queue
	globrunqput(gp)
	unlock(&sched.lock)
	// Continue scheduling
	schedule()
}
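The branch in asyncPreempt2 can be summarized with a toy state machine. Everything here (toyG, toySched, the lowercase state constants) is hypothetical and leaves out locking, dropg, and schedule; it only shows where each path leaves the goroutine.

```go
package main

import "fmt"

// gStatus is a hypothetical, simplified goroutine state mirroring runtime names.
type gStatus int

const (
	gRunning gStatus = iota
	gRunnable
	gPreempted
)

func (s gStatus) String() string {
	return [...]string{"_Grunning", "_Grunnable", "_Gpreempted"}[s]
}

type toyG struct {
	id          int
	status      gStatus
	preemptStop bool // set by suspendG in the real runtime
}

type toySched struct {
	globalRunq []*toyG
}

// asyncPreempt2 picks between the two paths the same way the runtime does.
func (s *toySched) asyncPreempt2(gp *toyG) {
	if gp.preemptStop {
		s.preemptPark(gp)
	} else {
		s.goschedImpl(gp)
	}
}

// preemptPark: G becomes _Gpreempted and is not put on any run queue; the
// party that asked it to stop (suspendG/resumeG in the GC case) resumes it.
func (s *toySched) preemptPark(gp *toyG) {
	gp.status = gPreempted
}

// goschedImpl: G becomes _Grunnable and joins the global run queue.
func (s *toySched) goschedImpl(gp *toyG) {
	gp.status = gRunnable
	s.globalRunq = append(s.globalRunq, gp)
}

func main() {
	s := &toySched{}
	byGC := &toyG{id: 1, status: gRunning, preemptStop: true}
	bySysmon := &toyG{id: 2, status: gRunning}
	s.asyncPreempt2(byGC)
	s.asyncPreempt2(bySysmon)
	fmt.Println(byGC.status, bySysmon.status, len(s.globalRunq))
}
```

This should print _Gpreempted _Grunnable 1: only the yielding goroutine lands on the global run queue.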

Sending the preemption signal

The preemption signal is sent by preemptM.

This function is in the runtime/signal_unix.go file:

preemptM

const sigPreempt = _SIGURG

func preemptM(mp *m) {
	...
	if atomic.Cas(&mp.signalPending, 0, 1) {
		// preemptM sends a preemption request to M.
		// Upon receipt of this request, if the running G or P is marked for preemption
		// and the goroutine is at an asynchronous safe point, the goroutine is preempted.
		signalM(mp, sigPreempt)
	}
}

preemptM uses a CAS on signalPending so that at most one preemption signal is outstanding per M, then calls signalM to send _SIGURG, whose handler was installed at startup, to the target M.
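A minimal sketch of the signalPending guard, using a hypothetical toyM type and a counter in place of signalM: however many callers race to preempt the same M, only one signal is sent until the handler side (doSigPreempt) clears signalPending.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// toyM keeps only the field preemptM cares about, plus a counter that
// stands in for actually calling signalM(mp, sigPreempt).
type toyM struct {
	signalPending uint32
	signalsSent   int32
}

// preemptM sends a signal only if none is already pending, mirroring the
// atomic.Cas(&mp.signalPending, 0, 1) guard.
func preemptM(mp *toyM) {
	if atomic.CompareAndSwapUint32(&mp.signalPending, 0, 1) {
		atomic.AddInt32(&mp.signalsSent, 1) // stands in for signalM
	}
}

// ackSignal mirrors the end of doSigPreempt, which clears signalPending.
func ackSignal(mp *toyM) {
	atomic.StoreUint32(&mp.signalPending, 0)
}

func main() {
	mp := &toyM{}
	var wg sync.WaitGroup
	for i := 0; i < 100; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			preemptM(mp)
		}()
	}
	wg.Wait()
	fmt.Println("signals before ack:", atomic.LoadInt32(&mp.signalsSent))

	ackSignal(mp)
	preemptM(mp)
	fmt.Println("signals after ack:", atomic.LoadInt32(&mp.signalsSent))
}
```

Exactly one of the 100 CAS attempts succeeds, so this prints 1 and then 2.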

preemptM is called to send preemption signals in the following places:

  1. The background monitor runtime.sysmon sends a preemption signal when it detects that a G has been running too long;
  2. The GC sends a preemption signal when it needs to scan the stack of a running G;
  3. During GC stop-the-world, preemptall preempts all P's to pause them.

Go background monitoring performs preemption

In its loop, sysmon calls runtime.retake, which walks over all P's and preempts those that are running or in a system call.

The main purpose is to stop a G from occupying an M for too long and starving other goroutines.

runtime.retake consists of two main parts:

  1. Call preemptone to preempt a P whose G has been running too long;
  2. Call handoffp to hand off a P that is stuck in a system call.

Preempt the current processor

func retake(now int64) uint32 {
	n := 0
	lock(&allpLock)
	// Iterate over the allp array
	for i := 0; i < len(allp); i++ {
		_p_ := allp[i]
		if _p_ == nil {
			continue
		}
		pd := &_p_.sysmontick
		s := _p_.status
		sysretake := false
		if s == _Prunning || s == _Psyscall {
			// Number of times this P has scheduled a G
			t := int64(_p_.schedtick)
			if int64(pd.schedtick) != t {
				pd.schedtick = uint32(t)
				// Time at which sysmon first saw this schedtick
				pd.schedwhen = now
			} else if pd.schedwhen+forcePreemptNS <= now {
				// The same G has been running for 10ms: preempt it
				preemptone(_p_)
				sysretake = true
			}
		}
		...
	}
	unlock(&allpLock)
	return uint32(n)
}

retake reads the status of each P. If it is _Prunning or _Psyscall and the P has not scheduled a new G for 10ms (forcePreemptNS) since sysmon first saw its current schedtick, preemptone is called to send a preemption signal. preemptone is covered later in this article.
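The 10ms rule can be isolated into a small function. This sketch mirrors the field names of sysmontick but is a standalone model, not runtime code; times are plain nanosecond counts passed in by the caller.

```go
package main

import "fmt"

// forcePreemptNS matches the runtime's 10ms threshold.
const forcePreemptNS = 10 * 1000 * 1000

// sysmontick keeps the two fields retake uses for the running-too-long check.
type sysmontick struct {
	schedtick uint32
	schedwhen int64
}

// shouldPreempt mirrors retake's check: if P's schedtick changed since the
// last look, a new G has been scheduled, so just record the time; otherwise
// preempt once the same G has been running for forcePreemptNS.
func shouldPreempt(pd *sysmontick, schedtick uint32, now int64) bool {
	if pd.schedtick != schedtick {
		pd.schedtick = schedtick
		pd.schedwhen = now
		return false
	}
	return pd.schedwhen+forcePreemptNS <= now
}

func main() {
	const ms = 1000 * 1000
	pd := &sysmontick{}
	fmt.Println(shouldPreempt(pd, 1, 0))     // false: first sighting of schedtick 1
	fmt.Println(shouldPreempt(pd, 1, 5*ms))  // false: same G, only 5ms
	fmt.Println(shouldPreempt(pd, 1, 10*ms)) // true: same G for 10ms
	fmt.Println(shouldPreempt(pd, 2, 11*ms)) // false: P rescheduled in between
}
```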

Calling handoffp to hand off the processor

func retake(now int64) uint32 {
	n := 0
	lock(&allpLock)
	// Iterate over the allp array
	for i := 0; i < len(allp); i++ {
		_p_ := allp[i]
		if _p_ == nil {
			continue
		}
		pd := &_p_.sysmontick
		s := _p_.status
		sysretake := false
		...
		if s == _Psyscall {
			// syscalltick is incremented on every system call
			t := int64(_p_.syscalltick)
			if !sysretake && int64(pd.syscalltick) != t {
				pd.syscalltick = uint32(t)
				// Time at which sysmon first saw this system call
				pd.syscallwhen = now
				continue
			}
			if runqempty(_p_) && atomic.Load(&sched.nmspinning)+atomic.Load(&sched.npidle) > 0 && pd.syscallwhen+10*1000*1000 > now {
				continue
			}
			unlock(&allpLock)
			incidlelocked(-1)
			if atomic.Cas(&_p_.status, s, _Pidle) {
				n++
				_p_.syscalltick++
				// Hand off the P
				handoffp(_p_)
			}
			incidlelocked(1)
			lock(&allpLock)
		}
	}
	unlock(&allpLock)
	return uint32(n)
}

If P is in the _Psyscall state, retake skips it only when all three of the following conditions hold; if any one of them fails, it calls handoffp to hand off P:

  1. runqempty(_p_): P's local run queue is empty;
  2. atomic.Load(&sched.nmspinning)+atomic.Load(&sched.npidle) > 0: there is at least one spinning M (an M looking for work to run or steal) or one idle P;
  3. pd.syscallwhen+10*1000*1000 > now: the system call has been running for less than 10ms.
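The skip condition above can be written as a standalone predicate. shouldHandoff is a hypothetical helper that only restates the condition from retake; it is not a runtime function.

```go
package main

import "fmt"

// shouldHandoff restates the skip condition in retake for a P in _Psyscall.
// retake skips the P (continue) only when all three conditions hold;
// otherwise it hands the P off with handoffp.
func shouldHandoff(runqEmpty bool, nmspinning, npidle uint32, syscallWhen, now int64) bool {
	const tenMs = 10 * 1000 * 1000
	skip := runqEmpty && nmspinning+npidle > 0 && syscallWhen+tenMs > now
	return !skip
}

func main() {
	const ms = 1000 * 1000
	fmt.Println(shouldHandoff(true, 0, 1, 0, 2*ms))  // false: nothing queued, spare P, short syscall
	fmt.Println(shouldHandoff(false, 0, 1, 0, 2*ms)) // true: P has queued Gs waiting
	fmt.Println(shouldHandoff(true, 0, 0, 0, 2*ms))  // true: no spinning M and no idle P
	fmt.Println(shouldHandoff(true, 0, 1, 0, 12*ms)) // true: syscall has run for at least 10ms
}
```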

Go GC stack scan sends preemption signals

For more on GC, see my article The Go language GC implementation principle and source code analysis: www.luozhiyun.com/archives/47… When the GC marks roots, it scans each G's stack. Before scanning, it calls suspendG to suspend that G; after scanning, it calls resumeG to let it run again.

markroot is in runtime/mgcmark.go:

markroot

func markroot(gcw *gcWork, i uint32) {
	...
	switch {
	...
	// Scan the stack of each G
	default:
		// Get the G to scan
		var gp *g
		if baseStacks <= i && i < end {
			gp = allgs[i-baseStacks]
		} else {
			throw("markroot: bad index")
		}
		...
		// Switch to the system stack (g0) for scanning
		systemstack(func() {
			...
			// Suspend G so that it stops running
			stopped := suspendG(gp)
			if stopped.dead {
				gp.gcscandone = true
				return
			}
			if gp.gcscandone {
				throw("g already scanned")
			}
			// Scan G's stack
			scanstack(gp, gcw)
			gp.gcscandone = true
			// Resume execution of G
			resumeG(stopped)
		})
	}
}

Before scanning, markroot switches to the system stack (g0) via systemstack, then calls suspendG, which checks G's status. If G is in the _Grunning state, suspendG sets preemptStop to true and sends a preemption signal.

suspendG is in runtime/preempt.go:

suspendG

func suspendG(gp *g) suspendGState {
	...
	const yieldDelay = 10 * 1000

	var nextPreemptM int64
	for i := 0; ; i++ {
		switch s := readgstatus(gp); s {
		...
		case _Grunning:
			// A preemption request from a previous iteration is still pending
			if gp.preemptStop && gp.preempt && gp.stackguard0 == stackPreempt && asyncM == gp.m && atomic.Load(&asyncM.preemptGen) == asyncGen {
				break
			}
			// Temporarily block state transitions
			if !castogscanstatus(gp, _Grunning, _Gscanrunning) {
				break
			}
			// Set the preemption fields
			gp.preemptStop = true
			gp.preempt = true
			gp.stackguard0 = stackPreempt

			asyncM2 := gp.m
			asyncGen2 := atomic.Load(&asyncM2.preemptGen)
			// asyncM and asyncGen record the M and its preemptGen at the last request;
			// a new signal is only needed if either has changed since then
			needAsync := asyncM != asyncM2 || asyncGen != asyncGen2
			asyncM = asyncM2
			asyncGen = asyncGen2
			casfrom_Gscanstatus(gp, _Gscanrunning, _Grunning)
			if preemptMSupported && debug.asyncpreemptoff == 0 && needAsync {
				now := nanotime()
				// Limit the frequency of preemption signals
				if now >= nextPreemptM {
					nextPreemptM = now + yieldDelay/2
					// Send the preemption signal
					preemptM(asyncM)
				}
			}
		}
		...
	}
}

From suspendG I have excerpted only the handling of a G in the _Grunning state. This is where preemptStop is set to true, and it is the only place that does so. preemptStop decides how the preemption signal is handled; if you have forgotten, see asyncPreempt2 above.
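The needAsync check and the yieldDelay/2 rate limit in suspendG can be modeled on their own. In this sketch asyncRequester and maybeSignal are hypothetical names, and M identities are plain ints rather than *m pointers. As in the excerpt, the recorded M and preemptGen are updated even when the rate limit suppresses the signal.

```go
package main

import "fmt"

// yieldDelay matches suspendG's constant (10µs, in nanoseconds).
const yieldDelay = 10 * 1000

// asyncRequester models the bookkeeping suspendG keeps across loop
// iterations to decide whether to call preemptM again.
type asyncRequester struct {
	asyncM       int    // hypothetical M id seen at the last request
	asyncGen     uint32 // that M's preemptGen at the last request
	nextPreemptM int64  // earliest time another signal may be sent
}

// maybeSignal reports whether suspendG would call preemptM now: the target
// M or its preemptGen changed (needAsync), and the rate limit allows it.
func (r *asyncRequester) maybeSignal(m int, gen uint32, now int64) bool {
	needAsync := r.asyncM != m || r.asyncGen != gen
	r.asyncM, r.asyncGen = m, gen
	if !needAsync || now < r.nextPreemptM {
		return false
	}
	r.nextPreemptM = now + yieldDelay/2
	return true
}

func main() {
	r := &asyncRequester{}
	fmt.Println(r.maybeSignal(1, 0, 0))    // true: first request for M1
	fmt.Println(r.maybeSignal(1, 0, 1000)) // false: M1 has not handled it yet
	fmt.Println(r.maybeSignal(1, 1, 2000)) // false: handled, but within yieldDelay/2
	fmt.Println(r.maybeSignal(1, 2, 6000)) // true: preemptGen moved again and 5µs have passed
}
```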

Go GC StopTheWorld preempts all P’s

GC stop-the-world (STW) is performed by stopTheWorldWithSema in runtime/proc.go:

stopTheWorldWithSema

func stopTheWorldWithSema() {
	_g_ := getg()

	lock(&sched.lock)
	sched.stopwait = gomaxprocs
	// Set gcwaiting; the scheduler checks it and stops P's
	atomic.Store(&sched.gcwaiting, 1)
	// Send preemption signals
	preemptall()
	// Stop the current P
	_g_.m.p.ptr().status = _Pgcstop // Pgcstop is only diagnostic.
	...
	wait := sched.stopwait > 0
	unlock(&sched.lock)
	if wait {
		for {
			// Wait for up to 100µs
			if notetsleep(&sched.stopnote, 100*1000) {
				noteclear(&sched.stopnote)
				break
			}
			// Send preemption signals again
			preemptall()
		}
	}
	...
}

stopTheWorldWithSema calls preemptall to send preemption signals to all P's. While some P's have not yet stopped, it waits up to 100µs on sched.stopnote and then calls preemptall again.
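The wait-and-retry loop can be modeled without any runtime internals. In this hypothetical sketch each simulated P ignores a fixed number of preemption requests before stopping (as if its G had not yet reached a safe point), and a plain time.Sleep stands in for notetsleep on sched.stopnote, which in the real runtime can also return early when the last P stops.

```go
package main

import (
	"fmt"
	"time"
)

// stopTheWorld is a toy model of stopTheWorldWithSema's wait loop.
// Each simulated P ignores the first ignore[i] preemption requests and
// stops on the next one. It returns how many times preemptall ran.
func stopTheWorld(ignore []int) (rounds int) {
	stopwait := len(ignore) // P's that still have to stop
	seen := make([]int, len(ignore))
	stopped := make([]bool, len(ignore))

	preemptall := func() {
		rounds++
		for i := range ignore {
			if stopped[i] {
				continue
			}
			seen[i]++
			if seen[i] > ignore[i] {
				stopped[i] = true
				stopwait--
			}
		}
	}

	preemptall()
	for stopwait > 0 {
		time.Sleep(100 * time.Microsecond) // stands in for notetsleep(&sched.stopnote, 100*1000)
		preemptall()                       // send preemption requests again
	}
	return rounds
}

func main() {
	fmt.Println(stopTheWorld([]int{0, 2, 1}))
}
```

With these inputs the slowest P needs a third request, so it should print 3.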

The file location of the preemptall function is runtime/proc.go:

preemptall

func preemptall() bool {
	res := false
	// Iterate over all P's
	for _, _p_ := range allp {
		if _p_.status != _Prunning {
			continue
		}
		// Send a preemption signal to each running P
		if preemptone(_p_) {
			res = true
		}
	}
	return res
}

preemptall calls preemptone for each running P. preemptone marks the G running on that P's M for preemption and finally calls preemptM to send a preemption signal to the M.

The file location for this function is runtime/proc.go:

preemptone

func preemptone(_p_ *p) bool {
	// get M for P
	mp := _p_.m.ptr()
	if mp == nil || mp == getg().m {
		return false
	}
	// get G that M is executing
	gp := mp.curg
	if gp == nil || gp == mp.g0 {
		return false
	}
	// mark G as preemption
	gp.preempt = true

	// Poison the stack guard: the stack check at G's next function call
	// fails, and G is preempted there (cooperative preemption)
	gp.stackguard0 = stackPreempt

	// Request asynchronous preemption for this P
	if preemptMSupported && debug.asyncpreemptoff == 0 {
		_p_.preempt = true
		preemptM(mp)
	}
	return true
}
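Setting stackguard0 = stackPreempt is the cooperative half of preemption: it piggybacks on the stack-overflow check the compiler emits at the start of most functions. The sketch below models that check; toyG, prologueCheck and the guard values are hypothetical, and only stackPreempt's bit pattern is taken from the runtime (uintptrMask & -1314). Because the check only runs on function entry, a loop that makes no calls never reaches it, which is the gap the signal sent by preemptM closes.

```go
package main

import "fmt"

// stackPreempt has the same bit pattern as the runtime's
// uintptrMask & -1314 (0x...fade): larger than any real stack pointer.
const stackPreempt = ^uintptr(1313)

// toyG is a hypothetical goroutine with just the fields this model needs.
type toyG struct {
	stackguard0 uintptr
	preempt     bool
}

// prologueCheck models the check emitted at the start of most functions:
// if SP <= stackguard0 the function calls morestack instead of its body.
// In morestack/newstack a poisoned guard means "preempt", not "grow".
func prologueCheck(gp *toyG, sp uintptr) string {
	if sp > gp.stackguard0 {
		return "run body"
	}
	if gp.stackguard0 == stackPreempt {
		return "yield (cooperative preemption)"
	}
	return "grow stack"
}

func main() {
	gp := &toyG{stackguard0: 0x10000 + 928} // hypothetical stack bottom plus guard
	fmt.Println(prologueCheck(gp, 0x10800)) // run body
	fmt.Println(prologueCheck(gp, 0x10100)) // grow stack

	// What preemptone does to the running G.
	gp.preempt = true
	gp.stackguard0 = stackPreempt
	fmt.Println(prologueCheck(gp, 0x10800)) // yield (cooperative preemption)
}
```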

Conclusion

We have now walked through the whole signal-based preemptive scheduling process. To summarize:

  1. At program startup, the runtime registers the handler for _SIGURG (runtime.sighandler, which dispatches to runtime.doSigPreempt);
  2. When a G needs to be preempted, some M1 (for example sysmon, or a thread doing GC work) sends _SIGURG to M2 via signalM;
  3. When M2 receives the signal, the operating system interrupts its running code and switches to the signal handler, which calls runtime.doSigPreempt;
  4. doSigPreempt modifies M2's execution context to inject a call to runtime.asyncPreempt; once the handler returns, M2 runs asyncPreempt and re-enters the scheduling loop to run other G's.

Reference

Linux user preemption and kernel preemption explained: blog.csdn.net/gatieme/art…

What does the sysmon background thread do: www.bookstack.cn/read/qcrao-…

Go: Asynchronous Preemption: medium.com/a-journey-w…

Unix signal: zh.wikipedia.org/wiki/Unix%E…

Linux signal mechanism: gityuan.com/2015/12/20/…

Trace: juejin.cn/post/684490…

Go scheduling loop source code analysis: www.luozhiyun.com/archives/44…

golang.design/under-the-h…