An overview of the underlying implementation of the Go channel

The channel is one of the most important components of the CSP concurrency model: two independent concurrent entities communicate with each other through a shared channel. Most people simply use the construct, and very few talk about how it works underneath, so this article is about the underlying implementation of a channel.
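Before diving into the internals, here is a minimal example (my own, not runtime code) of two goroutines communicating through a channel:

package main

import "fmt"

func main() {
    ch := make(chan string) // unbuffered channel shared by both goroutines

    // The sender goroutine communicates with main only through ch.
    go func() {
        ch <- "hello from another goroutine"
    }()

    // The receive blocks until the sender delivers the value.
    fmt.Println(<-ch)
}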

channel

The underlying implementation of channel is a structure, the source code is as follows:

type hchan struct {
    qcount   uint           // total data in the queue
    dataqsiz uint           // size of the circular queue
    buf      unsafe.Pointer // points to an array of dataqsiz elements
    elemsize uint16         // size in bytes of each element
    closed   uint32         // non-zero once the channel has been closed
    elemtype *_type // element type
    sendx    uint   // send index
    recvx    uint   // receive index
    recvq    waitq  // list of recv waiters
    sendq    waitq  // list of send waiters

    // lock protects all fields in hchan, as well as several
    // fields in sudogs blocked on this channel.
    //
    // Do not change another G's status while holding this lock
    // (in particular, do not ready a G), as this can deadlock
    // with stack shrinking.
    lock mutex
}

The source code alone may not be easy to follow, so I have drawn a diagram: the fields are marked in different colors and annotated with their purposes.

A channel, like a conveyor belt or a queue, always follows FIFO rules, which guarantees the order in which data is sent and received. The channel is an important means of communication between goroutines and is safe for concurrent use.
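As a quick illustration of the FIFO guarantee (my own example): values come out of a buffered channel in exactly the order they were sent.

package main

import "fmt"

func main() {
    ch := make(chan int, 3)

    // Send 1, 2, 3 into the buffer.
    for i := 1; i <= 3; i++ {
        ch <- i
    }

    // Receive them back; they arrive in the same order: 1 2 3.
    for i := 0; i < 3; i++ {
        fmt.Println(<-ch)
    }
}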

buf

In the hchan structure, buf points to the backing array of a circular queue, sendx is the index of the queue tail (where the next element is written), recvx is the index of the queue head (where the next element is read), dataqsiz is the capacity of the buffered channel, and qcount is the number of elements currently in the channel.
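You can observe qcount and dataqsiz indirectly from user code: len(ch) reports the number of buffered elements (qcount) and cap(ch) reports the buffer capacity (dataqsiz). A small sketch:

package main

import "fmt"

func main() {
    ch := make(chan int, 6) // dataqsiz = 6

    ch <- 1
    ch <- 2 // two elements are buffered, so qcount = 2

    fmt.Println(len(ch), cap(ch)) // prints: 2 6
}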

ch := make(chan int, 10) creates a buffered channel; the buffer is the purple buf in the diagram. buf is allocated during make, with a size equal to the element size times the number of elements. You can think of it as a ring, and buf is a pointer to that ring.

With ch := make(chan int, 6), buf points to the ring's address on the heap; on a 64-bit platform that ring needs 6 × 8 = 48 bytes, since each int occupies 8 bytes.

func makechan(t *chantype, size int) *hchan {
    elem := t.elem

    // compiler checks this but be safe.
    if elem.size >= 1<<16 {
        throw("makechan: invalid channel element type")}ifhchanSize%maxAlign ! =0 || elem.align > maxAlign {
        throw("makechan: bad alignment")
    }

    mem, overflow := math.MulUintptr(elem.size, uintptr(size))
    if overflow || mem > maxAlloc-hchanSize || size < 0 {
        panic(plainError("makechan: size out of range"))
    }

    // Hchan does not contain pointers interesting for GC when elements stored in buf do not contain pointers.
    // buf points into the same allocation, elemtype is persistent.
    // SudoG's are referenced from their owning thread so they can't be collected.
    // TODO(dvyukov,rlh): Rethink when collector can move allocated objects.
    var c *hchan
    switch {
    case mem == 0:
        // Queue or element size is zero.
        c = (*hchan)(mallocgc(hchanSize, nil, true))
        // Race detector uses this location for synchronization.
        c.buf = c.raceaddr()
    case elem.ptrdata == 0:
        // Elements do not contain pointers.
        // Allocate hchan and buf in one call.
        c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
        c.buf = add(unsafe.Pointer(c), hchanSize)
    default:
        // Elements contain pointers.
        c = new(hchan)
        c.buf = mallocgc(mem, elem, true)
    }

    c.elemsize = uint16(elem.size)
    c.elemtype = elem
    c.dataqsiz = uint(size)
    lockInit(&c.lock, lockRankHchan)

    if debugChan {
        print("makechan: chan=", c, "; elemsize=", elem.size, "; dataqsiz=", size, "\n")}return c
}

This is the implementation of makechan: it validates the arguments, allocates memory via mallocgc, and returns the channel.
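To make the three branches concrete, here is a small sketch of which allocation case each kind of channel should hit, based on my reading of the switch above:

package main

func main() {
    c1 := make(chan int)         // mem == 0: unbuffered, only the hchan header is allocated
    c2 := make(chan struct{}, 8) // mem == 0: element size is zero
    c3 := make(chan int, 8)      // elem.ptrdata == 0: ints hold no pointers, hchan and buf share one allocation
    c4 := make(chan *int, 8)     // default: elements contain pointers, buf is allocated separately
    _, _, _, _ = c1, c2, c3, c4
}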

sendx & recvx

Here I manually simulate a circular queue implementation:

// CycleQueue is a ring (circular) queue buffer
type CycleQueue struct {
    data                  []interface{} // backing array of elements
    frontIndex, rearIndex int           // frontIndex is the head (read) index, rearIndex is the tail (write) index
    size                  int           // capacity of the ring
}

// NewQueue creates a circular queue of the given size
func NewQueue(size int) (*CycleQueue, error) {
    if size < 10 {
        return nil, fmt.Errorf("failed to initialize circular queue: size %d is not legal, size must be >= 10", size)
    }
    cq := new(CycleQueue)
    cq.data = make([]interface{}, size)
    cq.size = size
    return cq, nil
}

// Push adds a value to the tail of the queue
func (q *CycleQueue) Push(value interface{}) error {
    if (q.rearIndex+1)%cap(q.data) == q.frontIndex {
        return errors.New("circular queue full")
    }
    q.data[q.rearIndex] = value
    q.rearIndex = (q.rearIndex + 1) % cap(q.data)
    return nil
}

// Pop returns the element at the front of the queue
func (q *CycleQueue) Pop() interface{} {
    if q.rearIndex == q.frontIndex {
        return nil
    }
    v := q.data[q.frontIndex]
    q.data[q.frontIndex] = nil // clear the slot so the stored value can be garbage collected
    q.frontIndex = (q.frontIndex + 1) % cap(q.data)
    return v
}
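A quick usage sketch of this CycleQueue (assuming it lives in a package where fmt is imported):

func main() {
    q, err := NewQueue(10)
    if err != nil {
        panic(err)
    }

    // Push three values, then pop them back in FIFO order.
    for i := 1; i <= 3; i++ {
        _ = q.Push(i)
    }
    fmt.Println(q.Pop(), q.Pop(), q.Pop()) // 1 2 3
}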

Circular queues usually use the empty-cell method to resolve the ambiguity of front == rear, which can mean either empty or full, but this wastes one slot. Go's channel instead adds the qcount field to record the queue length: on the one hand, no storage slot is wasted; on the other, when len is called on the channel, the qcount field can be returned directly.
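For contrast, here is a sketch (my own illustration, not runtime code) of a ring buffer that, like hchan, keeps an explicit count instead of sacrificing a slot:

// CountQueue tracks the number of stored elements explicitly, so every
// slot can be used and Len is a plain field read, like qcount in hchan.
type CountQueue struct {
    data        []interface{}
    front, rear int // front: next read index, rear: next write index
    count       int // number of elements currently stored
}

func NewCountQueue(size int) *CountQueue {
    return &CountQueue{data: make([]interface{}, size)}
}

func (q *CountQueue) Push(v interface{}) bool {
    if q.count == len(q.data) { // full when count equals capacity
        return false
    }
    q.data[q.rear] = v
    q.rear = (q.rear + 1) % len(q.data)
    q.count++
    return true
}

func (q *CountQueue) Pop() (interface{}, bool) {
    if q.count == 0 { // empty when nothing is stored
        return nil, false
    }
    v := q.data[q.front]
    q.data[q.front] = nil
    q.front = (q.front + 1) % len(q.data)
    q.count--
    return v, true
}

func (q *CountQueue) Len() int { return q.count } // analogous to len(ch) returning qcount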

When we read data, we fetch it from the position recvx points to; when we write, we store it at the position sendx points to, as shown in the diagram.

sendq & recvq

A goroutine blocks when it writes to a channel whose buffer is full or reads from a channel whose buffer is empty.

When a write blocks, the current goroutine is added to the sendq queue and stays there until a read operation is initiated, at which point the waiting writer is woken up by the runtime.

When the buffer is full, the writing goroutine (call it G-W) is added to the sendq queue and waits for a reading goroutine (G-R) to receive; once that happens, G-W is woken up and continues its work. This design closely mirrors the five thread states in an operating system, so the Go designers may well have drawn on OS thread scheduling.
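A small example of this blocking and wakeup in action (my own illustration; the sleeps only make the ordering visible):

package main

import (
    "fmt"
    "time"
)

func main() {
    ch := make(chan int, 1)
    ch <- 1 // the buffer is now full

    go func() {
        fmt.Println("G-W: sending, blocks because the buffer is full")
        ch <- 2 // G-W parks on sendq here
        fmt.Println("G-W: woken up after a receive freed a slot")
    }()

    time.Sleep(100 * time.Millisecond) // let G-W block first
    fmt.Println("G-R: received", <-ch) // this receive wakes G-W up
    time.Sleep(100 * time.Millisecond) // give G-W time to print
}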

When sendq is not empty and there is no buffer (that is, an unbuffered channel), the receiver takes the data directly from the first goroutine waiting in sendq. Gophers can dig into the source code themselves; this article is a summary of the notes I took while reading it.

Follow me

If you haven't followed me yet, please give me a follow! I will keep updating. Follow me for more on Rust, Golang, and system design.