Preface: Concurrency is what really drew me to Go. Digging into this mechanism has taught me a lot, so below I'll walk through Go's concurrency machinery as best I currently understand it.

I. The initialization process

Before diving in, let's look at the bootstrap logic in the runtime's assembly (asm_amd64.s):

CALL    runtime·args(SB)
CALL    runtime·osinit(SB)
CALL    runtime·hashinit(SB)
CALL    runtime·schedinit(SB)

// create a new goroutine to start program
PUSHQ   $runtime·main·f(SB)     // entry
PUSHQ   $0                      // arg size
CALL    runtime·newproc(SB)
POPQ    AX

// start this M
CALL    runtime·mstart(SB)

Then it’s time for analysis

1. The osinit function gets the number of CPUs and the page size; it's quite simple.

2. Then comes schedinit:

func schedinit() {
    // get the current G
    _g_ := getg()
    if raceenabled {
        _g_.racectx, raceprocctx0 = raceinit()
    }
    // limit the number of Ms to 10000
    sched.maxmcount = 10000
    // initialize stack space
    stackinit()
    // initialize the memory allocator
    mallocinit()
    // initialize the current M
    mcommoninit(_g_.m)
    // adjust the number of Ps to the number of CPUs
    procs := ncpu
    if n, ok := atoi32(gogetenv("GOMAXPROCS")); ok && n > 0 {
        procs = n
    }
    if procs > _MaxGomaxprocs {
        procs = _MaxGomaxprocs
    }
    // initialize the Ps
    if procresize(procs) != nil {
        throw("unknown runnable goroutine during bootstrap")
    }
}

3. As seen above, procresize is called to initialize the Ps. The code is long, so I'll break it into parts (only pasting the important bits). (1) Initialize new Ps:

for i := int32(0); i < nprocs; i++ {
    pp := allp[i]
    if pp == nil {
        // create a new P object
        pp = new(p)
        pp.id = i
        pp.status = _Pgcstop
        atomicstorep(unsafe.Pointer(&allp[i]), unsafe.Pointer(pp))
    }
    // if P does not have an mcache yet, allocate one
    if pp.mcache == nil {
        if old == 0 && i == 0 {
            if getg().m.mcache == nil {
                throw("missing mcache?")
            }
            pp.mcache = getg().m.mcache // bootstrap
        } else {
            pp.mcache = allocmcache() // allocate cache
        }
    }
}

(2) Release the unused P

for i := nprocs; i < old; i++ {
    p := allp[i]
    // move the local run queue onto the global queue
    for p.runqhead != p.runqtail {
        p.runqtail--
        gp := p.runq[p.runqtail%uint32(len(p.runq))].ptr()
        // push onto the head of the global queue
        globrunqputhead(gp)
    }
    // free the cache
    freemcache(p.mcache)
    p.mcache = nil
    // move P's free G list onto the global list
    gfpurge(p)
    p.status = _Pdead
    // can't free P itself because it can be referenced by an M in syscall
}

After these two steps we have a batch of Ps, and the idle ones are put on the scheduler sched's free list.

II. The process of creating G

As you can see from the assembly above, newproc is called to create the main G, which then runs runtime.main. runtime.main starts a thread responsible for system monitoring at runtime, and then calls the main function of the Go program. Take a look at the newproc code:

func newproc(siz int32, fn *funcval) {
    // the arguments start right after fn on the stack
    argp := add(unsafe.Pointer(&fn), sys.PtrSize)
    pc := getcallerpc(unsafe.Pointer(&siz))
    systemstack(func() {
        // the place where the G is actually created
        newproc1(fn, (*uint8)(argp), siz, 0, pc)
    })
}

Let's take a look at the main code of newproc1:

func newproc1(fn *funcval, argp *uint8, narg int32, nret int32, callerpc uintptr) *g {
    _g_ := getg()
    // get the current P
    _p_ := _g_.m.p.ptr()
    // try to reuse a G from P's free list
    newg := gfget(_p_)
    // if that fails, create a new one
    if newg == nil {
        newg = malg(_StackMin)
        casgstatus(newg, _Gidle, _Gdead)
        allgadd(newg)
    }
    // ... argument copying and status transitions elided ...
    // put the new G on P's run queue
    runqput(_p_, newg, true)
    // the three conditions below: there is an idle P; no M is spinning;
    // runtime.main has already been started
    if atomic.Load(&sched.npidle) != 0 && atomic.Load(&sched.nmspinning) == 0 && runtimeInitTime != 0 {
        wakep()
    }
    return newg
}

The wakep() function is also worth looking at, and the idea can be applied to everyday programming

func wakep() {
    // a woken thread needs to bind a P. The CAS here avoids waking
    // multiple threads at once, and also corresponds to one of the
    // three conditions checked above.
    if !atomic.Cas(&sched.nmspinning, 0, 1) {
        return
    }
    startm(nil, true)
}

The startm code is left to the reader. Its main idea: get an idle P (if the P passed in is nil), then try to get an idle M (the scheduler structure schedt manages a list of idle Ms); if none can be found, create a new M.

III. Channels

This part is simpler: the logic isn't complicated, even though there is a fair amount of code to look at.

1. Creating a channel

Let's take a look at the structure definitions (some fields elided):

type hchan struct {
    qcount   uint           // number of elements currently in the buffer
    dataqsiz uint           // number of buffer slots
    buf      unsafe.Pointer // pointer to the buffer
    elemsize uint16         // element size
    elemtype *_type         // element type
    sendx    uint           // send index
    recvx    uint           // receive index
    recvq    waitq          // list of waiting receivers
    sendq    waitq          // list of waiting senders
    lock     mutex
}

type sudog struct {
    g          *g
    selectdone *uint32 // CAS to 1 to win select race (may point to stack)
    next       *sudog
    prev       *sudog
    elem       unsafe.Pointer // data element (may point to stack)
    waitlink   *sudog         // g.waiting list or semaRoot
    waittail   *sudog         // semaRoot
    c          *hchan         // channel
}

recvq is the list of Gs blocked receiving on the channel, and sendq is the list of Gs blocked sending on it. Note that a single G can be blocked on several channels at the same time (via select). This is why sudog is introduced: it is essentially a wrapper around a G, representing that G on one particular wait queue.

Let’s look at the creation process

func makechan(t *chantype, size int64) *hchan {
    elem := t.elem
    // the element size must not exceed 64KB
    if elem.size >= 1<<16 {
        throw("makechan: invalid channel element type")
    }
    var c *hchan
    // the creation process itself is straightforward
    if elem.kind&kindNoPointers != 0 || size == 0 {
        // allocate hchan and buffer in a single call
        c = (*hchan)(mallocgc(hchanSize+uintptr(size)*elem.size, nil, true))
        if size > 0 && elem.size != 0 {
            c.buf = add(unsafe.Pointer(c), hchanSize)
        } else {
            c.buf = unsafe.Pointer(c)
        }
    } else {
        c = new(hchan)
        c.buf = newarray(elem, int(size))
    }
    c.elemsize = uint16(elem.size)
    c.elemtype = elem
    c.dataqsiz = uint(size)
    return c
}

2. Sending

Sending splits into three cases. (1) If a G is blocked on recvq, dequeue it and hand it the data directly:

if sg := c.recvq.dequeue(); sg != nil {
    // Found a waiting receiver. We pass the value we want to send
    // directly to the receiver, bypassing the channel buffer (if any).
    send(c, sg, ep, func() { unlock(&c.lock) }, 3)
    return true
}

(2) If hchan.buf has available space, then put the data in

// compare qcount and dataqsiz to see if the buffer still has room
if c.qcount < c.dataqsiz {
    // space is available; enqueue the element at the send index
    qp := chanbuf(c, c.sendx)
    if raceenabled {
        raceacquire(qp)
        racerelease(qp)
    }
    typedmemmove(c.elemtype, qp, ep)
    c.sendx++
    if c.sendx == c.dataqsiz {
        c.sendx = 0
    }
    c.qcount++
    unlock(&c.lock)
    return true
}

(3) If hchan.buf is full, it will be blocked

// Block on the channel. Some receiver will complete our operation for us.
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
    mysg.releasetime = -1
}
mysg.elem = ep
mysg.waitlink = nil
mysg.g = gp
mysg.selectdone = nil
mysg.c = c
gp.waiting = mysg
gp.param = nil
// add the current goroutine to the send wait queue
c.sendq.enqueue(mysg)
goparkunlock(&c.lock, "chan send", traceEvGoBlockSend, 3)

Here we can see that when the buffer is full, a sudog appears: it is initialized to represent the current G on the wait queue.

3. Receiving

Similarly, reception is divided into three cases

(1) A sending goroutine is blocked on the channel (buf is full, or the channel is unbuffered):

if sg := c.sendq.dequeue(); sg != nil {
    recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
    return true, true
}

(2) There is data in buf:

if c.qcount > 0 {
    // receive directly from the buffer
    qp := chanbuf(c, c.recvx)
    if ep != nil {
        typedmemmove(c.elemtype, ep, qp)
    }
    typedmemclr(c.elemtype, qp)
    c.recvx++
    if c.recvx == c.dataqsiz {
        c.recvx = 0
    }
    c.qcount--
    unlock(&c.lock)
    return true, true
}

(3) If there is no data in BUF, it will be blocked

    gp := getg()
    mysg := acquireSudog()
    mysg.releasetime = 0
    if t0 != 0 {
        mysg.releasetime = -1
    }
    mysg.elem = ep
    mysg.waitlink = nil
    gp.waiting = mysg
    mysg.g = gp
    mysg.selectdone = nil
    mysg.c = c
    gp.param = nil
    c.recvq.enqueue(mysg)
    goparkunlock(&c.lock, "chan receive", traceEvGoBlockRecv, 3)

Summary: although the logic of this code is not complex, there is a lot of design behind it, and working through it took considerable time. I now understand the overall logic of how a G gets executed on an M, though not all the details; I will keep studying. Overall, reading this code gives a solid understanding of the concurrency mechanism, which will certainly help when writing related code in the future. I also picked up some programming ideas along the way, such as CAS operations and how to encapsulate and abstract better.