preface

Golang has two powerful tools for concurrent programming, channel and Goroutine. In this article, we will talk about channel. Anyone familiar with Golang knows the famous phrase: “Use communication to share memory, not communicate through shared memory”. This statement is twofold: The Go language does provide a traditional locking mechanism in the Sync package, but channels are recommended to solve concurrency problems. This article will first make an in-depth study of channel from the usage and principle of channel.

The channel usage

What is the channel

A channel is a channel. A channel is a secure message queue used by go coroutines to receive or send messages. A channel is like a conduit between two GO coroutines to realize the synchronization of various resources. It can be illustrated as follows:

The use of channel is simple:

func main(a) {

    ch := make(chan int.1// Create a channel of type int and buffer size 1

    ch <- 2 // Send 2 to ch

    n, ok := <- ch // n receives the value sent from ch

    if ok {

        fmt.Println(n) / / 2

    }

    close(ch) / / close the channel

}

Copy the code

There are a few things to note when using channels:

  • To anil channelSending a message will block forever;
  • To a closedchannelSending a message causes runtime panic(panic);
  • channelCan not continue to closechannelSend messages but can continue fromchannelReceive messages;
  • whenchannelWhen closed and the buffer is empty, the slave continueschannelReceiving a message returns a zero value of the corresponding type.

Unbuffered channels and Buffered channels

Unbuffered Channels is a channel with a buffer size of 0. The receiver of a channel blocks until the message is received, and the sender blocks until the receiver receives the message. This mechanism can be used for state synchronization between two Goroutines. Buffered Channels has a buffer that blocks when the buffer is full; When the buffer is empty, the receiver blocks.

Unbuffered Channels and Buffered Channels are illustrated by The two diagrams In The Nature Of Channels In Go.

Unbuffered channels:

Unbuffered channels

Buffered channels:

Buffered channels

Traversal of the channel

for range

Channels support for range traversal:

package main  



import "fmt"  



func main(a) {  

    ci := make(chan int.5)  

    for i := 1; i <= 5; i++ {

        ci <- i

    }    

    close(ci)  



    for i := range ci {  

        fmt.Println(i)  

    }  

}  

Copy the code

It is worth noting that during the traverse, if the channel is not closed, then the deadlock will wait forever, causing an error. If the traversal channel is closed, the traversal automatically exits after the data is traversed. In other words, for range traversal is blocking traversal.

for select

Select can handle non-blocking message sending, receiving, and multiplexing.

package main  



import "fmt"  



func main(a) {  

    ci := make(chan int.2)

    for i := 1; i <= 2; i++ {

        ci <- i

    }

    close(ci)



    cs := make(chan string.2)

    cs <- "hi"

    cs <- "golang"

    close(cs)



    ciClosed, csClosed := false.false

    for {

        if ciClosed && csClosed {

            return

        }

        select {

        case i, ok := <-ci:

            if ok {

                fmt.Println(i)

            } else {

                ciClosed = true

                fmt.Println("ci closed")

            }

        case s, ok := <-cs:

            if ok {

                fmt.Println(s)

            } else {

                csClosed = true

                fmt.Println("cs closed")

            }

        default:

            fmt.Println("waiting...")

        }

    }

}  

Copy the code

Select contains a case block, which is used by a channel to send or receive messages. When any case block is ready, its corresponding content is executed. When more than one case code block is ready, a random case code block is selected and executed; If all case blocks are not ready, wait; You can also have a default code block, which is executed when all case blocks are not ready.

Principle of the channel

First post the source address of channel, readers can compare to see.

The data structure

Let’s look at the structure of a channel:

type hchan struct {

    qcount   uint           // total data in the queue

    dataqsiz uint           // size of the circular queue

    buf      unsafe.Pointer // points to an array of dataqsiz elements

    // The size of an element in a channel

    elemsize uint16 

    // Whether it is closed

    closed   uint32

    // The type of the element in channel

    elemtype *_type // element type

    sendx    uint   // send index

    recvx    uint   // receive index

    recvq    waitq  // list of recv waiters

    sendq    waitq  // list of send waiters



    // lock protects all fields in hchan, as well as several

    // fields in sudogs blocked on this channel.

    //

    // Do not change another G's status while holding this lock

    // (in particular, do not ready a G), as this can deadlock

    // with stack shrinking.

    lock mutex

}

Copy the code

Qcount represents the number of elements in the queue, dataqsiz represents the total size of the queue, and buf represents a pointer to the loop array. Sendx and recvx are used to identify the current sent and received elements in the loop queue, respectively. Recvq and Sendq are both lists of goroutines currently waiting to receive and waiting to send, respectively.

Look again at the waitQ data structure:

type waitq struct {

    first *sudog

    last  *sudog

}



type sudog struct {

    / / the current goroutine

    g *g



    // isSelect indicates g is participating in a select, so

    // g.selectDone must be CAS'd to win the wake-up race.

    isSelect bool

    next     *sudog

    prev     *sudog

    elem     unsafe.Pointer // data element (may point to stack)



    // The following fields are never accessed concurrently.

    // For channels, waitlink is only accessed by g.

    // For semaphores, all fields (including the ones above)

    // are only accessed when holding a semaRoot lock.



    acquiretime int64

    releasetime int64

    ticket      uint32

    parent      *sudog // semaRoot binary tree

    waitlink    *sudog // g.waiting list or semaRoot

    waittail    *sudog // semaRoot

    c           *hchan // channel

}

Copy the code

Where sudog represents the Goroutine encapsulation in the wait list and contains some context information, with first and last referring to the Goroutine at the top of the wait list, respectively.

Compile analysis

Before taking a look at how channels work, let’s use the Go Tool to examine the following code to see what runtime methods channel operations call underneath:

ch := make(chan int.2)

ch <- 2

ch <- 1

<-ch

n, ok := <-ch

if ok {

    fmt.Println(n)

}

close(ch)

Copy the code

compile

go build test.go

go tool objdump -s "main\.main" test | grep CALL

Copy the code

Filter out the CALL:

  test.go:118           0x1092f55               e81612f7ff              CALL runtime.makechan(SB)

  test.go:119           0x1092f74               e82714f7ff              CALL runtime.chansend1(SB)

  test.go:120           0x1092f8e               e80d14f7ff              CALL runtime.chansend1(SB)

  test.go:121           0x1092fa5               e8361ff7ff              CALL runtime.chanrecv1(SB)

  test.go:122           0x1092fbd               e85e1ff7ff              CALL runtime.chanrecv2(SB)

  test.go:126           0x1092fd7               e8841cf7ff              CALL runtime.closechan(SB)

  test.go:124           0x1092fea               e8b156f7ff              CALL runtime.convT64(SB)

  print.go:275          0x1093041               e88a98ffff              CALL fmt.Fprintln(SB)

  test.go:47            0x1093055               e896c1fbff              CALL runtime.morestack_noctxt(SB)

Copy the code

create

From the compilation analysis above, you can see that the runtime method makechan was called when the channel was created:

func makechan(t *chantype, size int) *hchan {

    elem := t.elem



    // compiler checks this but be safe.

    if elem.size >= 1<<16 {

        throw("makechan: invalid channel element type")

    }

    ifhchanSize%maxAlign ! =0 || elem.align > maxAlign {

        throw("makechan: bad alignment")

    }



    // Calculate the total size of the buffer required (buffer size * element size) and determine if the maximum size is exceeded

    mem, overflow := math.MulUintptr(elem.size, uintptr(size))

    if overflow || mem > maxAlloc-hchanSize || size < 0 {

        panic(plainError("makechan: size out of range"))

    }



    // Hchan does not contain pointers interesting for GC when elements stored in buf do not contain pointers.

    // buf points into the same allocation, elemtype is persistent.

    // SudoG's are referenced from their owning thread so they can't be collected.

    // TODO(dvyukov,rlh): Rethink when collector can move allocated objects.

    var c *hchan

    switch {

    case mem == 0:

        // If the buffer size is 0, or if the size of the element in a channel is 0 (struct{}{}), only the space required by the channel is allocated

        // Queue or element size is zero.

        c = (*hchan)(mallocgc(hchanSize, nil.true))

        // Race detector uses this location for synchronization.

        c.buf = c.raceaddr()

    caseelem.kind&kindNoPointers ! =0:

        // Allocate a contiguous memory space equal to the space required by the buffer array + hchan.

        // Elements do not contain pointers.

        // Allocate hchan and buf in one call.

        c = (*hchan)(mallocgc(hchanSize+mem, nil.true))

        c.buf = add(unsafe.Pointer(c), hchanSize)

    default:

        // The element contains Pointers to allocate space for hchan and buffer respectively

        // Elements contain pointers.

        c = new(hchan)

        c.buf = mallocgc(mem, elem, true)

    }



    c.elemsize = uint16(elem.size)

    c.elemtype = elem

    c.dataqsiz = uint(size)



    if debugChan {

        print("makechan: chan=", c, "; elemsize=", elem.size, "; elemalg=", elem.alg, "; dataqsiz=", size, "\n")

    }

    return c

}

Copy the code

Makechan’s code logic is relatively simple, first verifying the element type and buffer space size, then creating hchan and allocating the required space. There are three cases: when the buffer size is 0, or the element size in a channel is 0, only the space required by the channel is allocated; When the channel element type is not a pointer, only one contiguous memory space is allocated for hCHAN and the buffer, which is the size of the buffer array space plus the required space for Hchan. By default, buffers contain Pointers, and memory needs to be allocated for hchan and buffers separately. Finally, other fields of hchan are updated, including ELEMSIZE, ELEMType and dataqsiz.

send

The sending operation of a channel calls the runtime method chansend1. Inside chansend1, chansend is called.

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {

    / / channel is nil

    if c == nil {

        // If it is not blocked, send failed

        if! block {

            return false

        }

        // Otherwise, the current Goroutine blocks and hangs

        gopark(nil.nil, waitReasonChanSendNilChan, traceEvGoStop, 2)

        throw("unreachable")

    }



    if debugChan {

        print("chansend: chan=", c, "\n")

    }



    if raceenabled {

        racereadpc(c.raceaddr(), callerpc, funcPC(chansend))

    }



    // Fast path: check for failed non-blocking operation without acquiring the lock.



    // For non-blocking channels that are not closed, if there is no buffer and no Goroutine waiting to receive, or if there is a buffer and the buffer is full, then send failed

    if! block && c.closed ==0 && ((c.dataqsiz == 0 && c.recvq.first == nil) | |

        (c.dataqsiz > 0 && c.qcount == c.dataqsiz)) {

        return false

    }



    var t0 int64

    if blockprofilerate > 0 {

        t0 = cputicks()

    }



    / / lock

    lock(&c.lock)



    // If the channel is closed

    ifc.closed ! =0 {

        // To unlock, panic

        unlock(&c.lock)

        panic(plainError("send on closed channel"))

    }



    // When a channel is not closed, there are several cases:



    // 1, when there is a Goroutine waiting to receive

    ifsg := c.recvq.dequeue(); sg ! =nil {

        // Found a waiting receiver. We pass the value we want to send

        // directly to the receiver, bypassing the channel buffer (if any).



        // Then send the value being sent directly to the waiting Goroutine

        send(c, sg, ep, func(a) { unlock(&c.lock) }, 3)

        return true

    }



    // The buffer is not full

    if c.qcount < c.dataqsiz {

        // Space is available in the channel buffer. Enqueue the element to send.

        // Gets a pointer to the element in the buffer array at sendx

        qp := chanbuf(c, c.sendx)

        if raceenabled {

            raceacquire(qp)

            racerelease(qp)

        }

        // Copy the currently sent value to the buffer

        typedmemmove(c.elemtype, qp, ep)

        // add one to sendx index

        c.sendx++

        // Because it is a circular queue, sendx is set to 0 when it is equal to the queue length

        if c.sendx == c.dataqsiz {

            c.sendx = 0

        }

        // The total number of elements in the queue is increased by one, and the unlock is returned

        c.qcount++

        unlock(&c.lock)

        return true

    }



    // if there is no Goroutine waiting to receive and there is no free space in the buffer, then unlock it and return send failure

    if! block {

        unlock(&c.lock)

        return false

    }



    // Block on the channel. Some receiver will complete our operation for us.

    // If it is blocking, package the current Goroutine into a sudog structure and add it to the sendq queue of the channel

    gp := getg()

    mysg := acquireSudog()

    mysg.releasetime = 0

    ift0 ! =0 {

        mysg.releasetime = - 1

    }

    // No stack splits between assigning elem and enqueuing mysg

    // on gp.waiting where copystack can find it.

    mysg.elem = ep

    mysg.waitlink = nil

    mysg.g = gp

    mysg.isSelect = false

    mysg.c = c

    gp.waiting = mysg

    gp.param = nil

    c.sendq.enqueue(mysg)



    // Call goparkunlock to set the current Goroutine to wait and unlock it

    goparkunlock(&c.lock, waitReasonChanSend, traceEvGoBlockSend, 3)

    // Ensure the value being sent is kept alive until the

    // receiver copies it out. The sudog has a pointer to the

    // stack object, but sudogs aren't considered as roots of the

    // stack tracer.

    KeepAlive(ep)



    // someone woke us up.

    // Perform cleanup and release sudog structures after waking up

    ifmysg ! = gp.waiting {

        throw("G waiting list is corrupted")

    }

    gp.waiting = nil

    if gp.param == nil {

        if c.closed == 0 {

            throw("chansend: spurious wakeup")

        }

        panic(plainError("send on closed channel"))

    }

    gp.param = nil

    if mysg.releasetime > 0 {

        blockevent(mysg.releasetime-t0, 2)

    }

    mysg.c = nil

    releaseSudog(mysg)

    return true

}

Copy the code

The execution logic of chansend is clearly commented, so let’s go over it again. The processing logic is relatively simple for several sending failure cases under the condition of non-blocking sending or channel being closed. Readers can refer to the annotations. Here we focus on several general situations when a channel is not closed:

There are goroutines waiting to receive

If a Goroutine exists in recVQ, the value being sent is sent directly to the Goroutine waiting to receive. The schematic diagram is as follows:


So let’s see
sendMethods:

func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(a).skip int) {

.



    ifsg.elem ! =nil {

        // Copy the sent value directly to the memory address of the received value (such as v = <-ch)

        sendDirect(c.elemtype, sg, ep)

        sg.elem = nil

    }

    // Get the Goroutine waiting to receive data

    gp := sg.g

    unlockf()

    gp.param = unsafe.Pointer(sg)

    ifsg.releasetime ! =0 {

        sg.releasetime = cputicks()

    }

    // Wake up the Goroutine that was waiting to receive data

    goready(gp, skip+1)

}

Copy the code

It is necessary to describe several states of Goroutine during the scheduling process:

_Gidle = iota // Goroutine has just been allocated and has not been initialized yet



_Grunnable // Goroutine is in a runqueue, not yet running, and has no stack of its own



_Grunning // Goroutine has its own stack and is allocated M(thread) and P(scheduling context)



_Gsyscall // Goroutine is making a system call



_Gwaiting // Goroutine is blocked



_Gdead // Goroutine is not in use. It may have just exited or is being initialized



_Gcopystack // indicates that g's current stack is being removed and a new stack is being assigned

Copy the code

When goReady is called, the state of the Goroutine is changed from _Gwaiting to _Grunnable until the next schedule is executed again.

When the buffer is full

When the buffer is not full, find the location of the buffer array pointed to by Sendx, copy the sending value to that location, add the sendx index and release the lock, as shown below:

Blocking the send

If the send is blocked, the current Goroutine is packaged into a sudog structure and added to the sendq queue of the channel. The schematic diagram is as follows:

Goparkunlock is then called to set the current Goroutine to _Gwaiting and unlock it. The Goroutine is blocked and waiting to be awakened (goReady). If awakened by the scheduler, the cleanup is performed and the corresponding sudog structure is finally released.

receive

Channel reception takes two forms:

<-ch

n, ok := <-ch

Copy the code

These two methods call the runtime methods chanrecv1 and chanrecv2 respectively:

func chanrecv1(c *hchan, elem unsafe.Pointer) {

    chanrecv(c, elem, true)

}



func chanrecv2(c *hchan, elem unsafe.Pointer) (received bool) {

    _, received = chanrecv(c, elem, true)

    return

}

Copy the code

Both methods will eventually call the chanrecv method:

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {



    if debugChan {

        print("chanrecv: chan=", c, "\n")

    }



    / / channel is nil

    if c == nil {

        // Non-blocking direct return (false, false)

        if! block {

            return

        }

        // Block receive, the current Goroutine blocks and hangs

        gopark(nil.nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)

        throw("unreachable")

    }



    // Fast path: check for failed non-blocking operation without acquiring the lock.



    // Non-blocking mode for the following two cases:

    // 1. There is no buffer and the waiting queue is empty

    // 2, there are buffers but the buffer array is empty and channel is not closed

    Return (false, false);

    if! block && (c.dataqsiz ==0 && c.sendq.first == nil ||

        c.dataqsiz > 0 && atomic.Loaduint(&c.qcount) == 0) &&

        atomic.Load(&c.closed) == 0 {

        return

    }



    var t0 int64

    if blockprofilerate > 0 {

        t0 = cputicks()

    }



    / / lock

    lock(&c.lock)

    // If channel is closed and the buffer has no elements

    ifc.closed ! =0 && c.qcount == 0 {

        if raceenabled {

            raceacquire(c.raceaddr())

        }

        unlock(&c.lock)

        // There are variables waiting to be received (i.e. V = v in <-ch)

        ifep ! =nil {

            // Clear ep's address based on the type of the channel element, i.e. ep receives a zero value of the channel element

            typedmemclr(c.elemtype, ep)

        }

        // Returns (true, false), that is, a value received, but not a valid value received from the channel

        return true.false

    }



    // In addition to the above unconventional situations, there are the following common situations:



    If there is a Goroutine in sendq, the current channel has no buffer, or the current channel is full

    ifsg := c.sendq.dequeue(); sg ! =nil {

        // Found a waiting sender. If buffer is size 0, receive value

        // directly from sender. Otherwise, receive from head of queue

        // and add sender's value to the tail of the queue (both map to

        // the same buffer slot because the queue is full).

        // If there is no buffer, then receive data directly from sender; Otherwise, the data is received from the head of the BUF queue and the sender's data is added to the tail of the BUF queue

        recv(c, sg, ep, func(a) { unlock(&c.lock) }, 3)

        // Received successfully

        return true.true

    }



    // there are elements in buffer buf

    if c.qcount > 0 {

        // Receive directly from queue

        // Get the element from the position pointed to by recvx

        qp := chanbuf(c, c.recvx)

        if raceenabled {

            raceacquire(qp)

            racerelease(qp)

        }

        ifep ! =nil {

            // Copy the elements taken from buf to the current coroutine

            typedmemmove(c.elemtype, ep, qp)

        }

        // Empty the memory where the data is stored

        typedmemclr(c.elemtype, qp)

        // Receive index +1

        c.recvx++

        if c.recvx == c.dataqsiz {

            c.recvx = 0

        }

        // the total number of buf elements is -1

        c.qcount--

        // Unlock, and receive successfully

        unlock(&c.lock)

        return true.true

    }



    // 3, non-blocking mode, and no data acceptable

    if! block {

        // Unlock, and receive failure is returned

        unlock(&c.lock)

        return false.false

    }



    // no sender available: block on this channel.

    // Block mode, get the current Goroutine, pack a sudog

    gp := getg()

    mysg := acquireSudog()

    mysg.releasetime = 0

    ift0 ! =0 {

        mysg.releasetime = - 1

    }

    // No stack splits between assigning elem and enqueuing mysg

    // on gp.waiting where copystack can find it.

    mysg.elem = ep

    mysg.waitlink = nil

    gp.waiting = mysg

    mysg.g = gp

    mysg.isSelect = false

    mysg.c = c

    gp.param = nil

    // Add the channel to recvq

    c.recvq.enqueue(mysg)

    // Suspend the current Goroutine, set it to _Gwaiting and unlock it

    goparkunlock(&c.lock, waitReasonChanReceive, traceEvGoBlockRecv, 3)



    // someone woke us up

    // Perform cleanup and release sudog structures after waking up

    ifmysg ! = gp.waiting {

        throw("G waiting list is corrupted")

    }

    gp.waiting = nil

    if mysg.releasetime > 0 {

        blockevent(mysg.releasetime-t0, 2)

    }

    closed := gp.param == nil

    gp.param = nil

    mysg.c = nil

    releaseSudog(mysg)

    return true, !closed

}

Copy the code

The logic of the chanrecv method is very similar to that of Chansend, and we will only analyze a few common cases here. The other cases are explained more clearly by the above comments, and the reader can refer to the corresponding code and comments.

There are goroutines waiting to be sent

If a pending Goroutine exists in sendq, the current channel has no buffer, or the current channel is full. Get the Goroutine that blocks first from Sendq and call recv:

func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(a).skip int) {

    if c.dataqsiz == 0 {

        // No buffer

        if raceenabled {

            racesync(c, sg)

        }

        ifep ! =nil {

            // copy data from sender

            recvDirect(c.elemtype, sg, ep)

        }

    } else {

        // Buffer is full

        // Queue is full. Take the item at the

        // head of the queue. Make the sender enqueue

        // its item at the tail of the queue. Since the

        // queue is full, those are both the same slot.

        qp := chanbuf(c, c.recvx)

        if raceenabled {

            raceacquire(qp)

            racerelease(qp)

            raceacquireg(sg.g, qp)

            racereleaseg(sg.g, qp)

        }

        // copy data from queue to receiver

        ifep ! =nil {

            typedmemmove(c.elemtype, ep, qp)

        }

        // copy data from sender to queue

        typedmemmove(c.elemtype, qp, sg.elem)

        c.recvx++

        if c.recvx == c.dataqsiz {

            c.recvx = 0

        }

        c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz

    }

    sg.elem = nil

    gp := sg.g

    unlockf()

    gp.param = unsafe.Pointer(sg)

    ifsg.releasetime ! =0 {

        sg.releasetime = cputicks()

    }

    // Change the state of the Goroutine waiting to send data from _Gwaiting to _Grunnable, waiting for the next schedule.

    goready(gp, skip+1)

}

Copy the code

1, If there is no buffer, then receive data directly from sender; If the buffer is full, receive data from the head of the BUF queue and add data from the sender to the tail of the BUF queue; 3. Finally, call goReady to change the state of the Goroutine waiting to send data from _Gwaiting to _Grunnable, waiting for the next schedule.

The following figure shows what happens when the buffer is full:

There is also data in buffer BUF

If there are still elements in the buffer BUF, then the normal receive is done, and the elements fetched from buF are copied to the current coroutine’s received data target memory address. It is worth noting that even though the channel is closed at this point, it can still receive data from the buffer BUF normally. This is an easy case, so I’m not going to draw the schematic.

Non-blocking receive

If the current Goroutine is in blocking mode and no data is currently available to receive, then the current Goroutine needs to be packaged as a sudog and added to the channel’s waiting queue RECVQ, with the state of the current Goroutine set to _Gwaiting and waiting to wake up. The schematic diagram is as follows:

If the current Goroutine is then awakened by the scheduler, the cleanup is performed and the corresponding Sudog structure is finally released.

Shut down

To close a channel:

func closechan(c *hchan) {

    // Nil channel check

    if c == nil {

        panic(plainError("close of nil channel"))

    }



    lock(&c.lock)

    // A closed channel cannot be closed again

    ifc.closed ! =0 {

        unlock(&c.lock)

        panic(plainError("close of closed channel"))

    }



    if raceenabled {

        callerpc := getcallerpc()

        racewritepc(c.raceaddr(), callerpc, funcPC(closechan))

        racerelease(c.raceaddr())

    }

    // Set the shutdown state to 1

    c.closed = 1



    var glist glist



    // release all readers

    // Go through recvq, clear sudog's data, remove the Goroutine in _Gwaiting state and add it to glist

    for {

        sg := c.recvq.dequeue()

        if sg == nil {

            break

        }

        ifsg.elem ! =nil {

            typedmemclr(c.elemtype, sg.elem)

            sg.elem = nil

        }

        ifsg.releasetime ! =0 {

            sg.releasetime = cputicks()

        }

        gp := sg.g

        gp.param = nil

        if raceenabled {

            raceacquireg(gp, c.raceaddr())

        }

        glist.push(gp)

    }



    // release all writers (they will panic)

    // Iterate over sendq, clear sudog data, and add Goroutine in _Gwaiting state to glist

    for {

        sg := c.sendq.dequeue()

        if sg == nil {

            break

        }

        sg.elem = nil

        ifsg.releasetime ! =0 {

            sg.releasetime = cputicks()

        }

        gp := sg.g

        gp.param = nil

        if raceenabled {

            raceacquireg(gp, c.raceaddr())

        }

        glist.push(gp)

    }

    unlock(&c.lock)



    // Ready all Gs now that we've dropped the channel lock.

Set the state of all goroutines in GList to _Grunnable and wait for the scheduler to schedule

    for! glist.empty() {

        gp := glist.pop()

        gp.schedlink = 0

        goready(gp, 3)

    }

}

Copy the code

1. When closing a channel, recvq and sendq will be iterated (actually only recvq or sendq), the pending Goroutine in sudog will be taken out and added to the GList list, and some information and state on sudog will be cleared.

2. Then iterate through the GList list, call goReady for each Goroutine, set all goroutines to _Grunnable state, and wait for scheduling.

3. When Goroutine is awakened, chansend and chanrecv continue to execute the remaining logic of the current Goroutine.

conclusion

To sum up, this paper first introduces the definition and usage details of channel through the basic usage of channel, and then explores the basic operation of channel including sending, receiving and closing in more detail and in-depth. Careful readers will also notice that the operation of channels is closely related to the scheduling of coroutines, but this article has only touched on the scheduling of Goroutines, which will be explored later when the time is right.

The resources

Concurrency 1, The Nature Of Channels In Go