Public account: queer cloud storage

Chan is one of the most important structures in Go, one of the key features that distinguish it from other high-level languages, and an essential tool for communication between goroutines. Many people use it, but few understand it completely; even the syntax c <- x and <-c can be hard to remember. This article explains how channels work from the point of view of the source code and the compiler.

What is a channel?

At its core, a Go channel is a ring buffer implementation. We call the chan itself the management structure. Objects of any type can be put into a channel, and we call those objects elements.
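The following minimal Go sketch makes the ring-buffer idea concrete. The type `ring` and its methods are hypothetical illustrations, not runtime code. The field names are borrowed from hchan (covered later), and the sketch has no locking, no blocking and no waiting goroutines.

```go
package main

import "fmt"

// ring is a hypothetical, minimal fixed-capacity ring buffer. Its fields
// mirror hchan's buf/sendx/recvx/qcount, but it never locks or blocks.
type ring struct {
	buf    []int // backing array; len(buf) plays the role of dataqsiz
	sendx  int   // next slot to write
	recvx  int   // next slot to read
	qcount int   // number of elements currently stored
}

func newRing(size int) *ring { return &ring{buf: make([]int, size)} }

// push stores v at sendx and advances sendx, wrapping to 0 at the end.
// It reports false when the buffer is full.
func (r *ring) push(v int) bool {
	if r.qcount == len(r.buf) {
		return false
	}
	r.buf[r.sendx] = v
	r.sendx++
	if r.sendx == len(r.buf) {
		r.sendx = 0
	}
	r.qcount++
	return true
}

// pop reads from recvx and advances recvx, wrapping to 0 at the end.
// It reports false when the buffer is empty.
func (r *ring) pop() (int, bool) {
	if r.qcount == 0 {
		return 0, false
	}
	v := r.buf[r.recvx]
	r.buf[r.recvx] = 0 // clear the slot after reading it
	r.recvx++
	if r.recvx == len(r.buf) {
		r.recvx = 0
	}
	r.qcount--
	return v, true
}

func main() {
	r := newRing(2)
	r.push(1)
	r.push(2)
	fmt.Println(r.push(3)) // false: full
	v, _ := r.pop()
	fmt.Println(v) // 1
	r.push(3)      // wraps around into slot 0
	a, _ := r.pop()
	b, _ := r.pop()
	fmt.Println(a, b) // 2 3
}
```

Note that an unbuffered channel has no slots at all, so this picture only applies to buffered channels.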

We will start with how channels are used, then walk through the implementation behind each usage in detail.

Channel usage patterns

Let's start with how chan is used at the macro level. In summary, there are the following patterns:

  • Creating a chan
  • Sending to a chan (enqueue)
  • Receiving from a chan (dequeue)
  • Combining select with chan
  • Combining for-range with chan

Creating a chan

A channel can be created with or without a buffer:

// Unbuffered channel
c := make(chan int)
// Buffered channel with 10 slots
c1 := make(chan int, 10)

This corresponds to the runtime function makechan, located in runtime/chan.go.

Sending to a chan (enqueue)

Usage:

c <- x

The corresponding implementation is chansend, located in runtime/chan.go.

Receiving from a chan (dequeue)

Usage:

v := <-c
v, ok := <-c

The corresponding functions are chanrecv1 and chanrecv2 respectively, located in runtime/chan.go.
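The ok result of the two-value form (chanrecv2) distinguishes a genuine zero value from a closed channel. The small sketch below uses `receiveTwice`, a hypothetical helper name. It shows that a closed channel still delivers its buffered elements before reporting ok == false.

```go
package main

import "fmt"

// receiveTwice performs two receives with the v, ok form.
func receiveTwice(c chan int) (v1 int, ok1 bool, v2 int, ok2 bool) {
	v1, ok1 = <-c
	v2, ok2 = <-c
	return
}

func main() {
	c := make(chan int, 1)
	c <- 7
	close(c)
	// The buffered 7 is still delivered after close; the second receive
	// finds the channel closed and drained, so it yields 0 and false.
	fmt.Println(receiveTwice(c)) // 7 true 0 false
}
```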

Combined with the select statement

Usage:

select {
case c <- v:
	// ... foo
default:
	// ... bar
}

The corresponding function is selectnbsend, located in runtime/chan.go.

Usage:

select {
case v = <-c:
	// ... foo
default:
	// ... bar
}

The corresponding function is selectnbrecv, located in runtime/chan.go.

Usage:

select {
case v, ok = <-c:
	// ... foo
default:
	// ... bar
}

The corresponding function is selectnbrecv2, located in runtime/chan.go.

Combined with the for-range statement

Usage:

for m := range c {
    // ... do something
}

The corresponding function is chanrecv2, located in runtime/chan.go.

Source code analysis

The usage patterns above show that each piece of syntax maps to a different implementation function (the translation is done by the compiler). Let's take a closer look at how these functions are implemented.

makechan

When the compiler encounters a statement such as v := make(chan int), it translates it into a call to the runtime function makechan:

runtime.makechan

Function prototype:

func makechan(t *chantype, size int) *hchan

So when we say we are creating a channel, what we actually get is a pointer to hchan; the core structure of a channel is hchan.

The t argument describes the channel type, including its element type:

type chantype struct {
	typ  _type
	elem *_type
	dir  uintptr
}

size specifies the number of slots in the channel buffer. For a buffered channel, size is the number of slots passed to make; if none is specified, it is 0:

// size == 0
a := make(chan int)
// size == 2
b := make(chan int, 2)

Let's look at what makechan does. It is very simple and does just two things:

func makechan(t *chantype, size int) *hchan {
    // Parameter validation
    // Initialize the hchan structure
}

Parameter validation is just a set of bounds and limit checks, for example that the requested size is not negative and that the buffer size does not overflow the allocation limit.

hchan is initialized in one of three ways:

switch {
// No buffer memory needed (unbuffered channel, or zero-size elements); the channel acts as a pure pipe;
case mem == 0:
    c = (*hchan)(mallocgc(hchanSize, nil, true))
    c.buf = c.raceaddr()
// The element type contains no pointers, so hchan and the buffer are allocated as one large block;
case elem.ptrdata == 0:
    c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
    c.buf = add(unsafe.Pointer(c), hchanSize)
// Default case: the hchan structure and the buffer are allocated separately;
default:
    c = new(hchan)
    c.buf = mallocgc(mem, elem, true)
}

  1. If a channel has no buffer (or its elements are zero-sized), only the hchan structure is allocated.
  2. If the element type (elem) contains no pointers, hchan and the element buffer can be allocated together in one block.
  3. If the element type contains pointers, hchan and the buffer cannot be allocated together. hchan is allocated with new, and the element buffer is allocated separately with the element type, so that the garbage collector can scan the pointers stored in it.
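The validation and the three-way choice can be sketched as a pure function. This is a simplified model, not the runtime code. `planAlloc`, and the values of `hchanSize` and `maxAlloc`, are assumptions for illustration. The overflow check uses `math/bits.Mul64` in place of the runtime's internal overflow-checked multiply.

```go
package main

import (
	"errors"
	"fmt"
	"math/bits"
)

// hchanSize is an assumed header size (roughly the size of hchan on a
// 64-bit platform); the runtime derives the real value from hchan itself.
const hchanSize = 96

// maxAlloc is an assumed allocation limit used only in this sketch.
const maxAlloc = 1 << 48

// planAlloc mirrors makechan's validation and three-way switch: it
// reports which allocation strategy would be chosen.
func planAlloc(elemSize uint64, size int, hasPointers bool) (string, error) {
	hi, mem := bits.Mul64(elemSize, uint64(size))
	if size < 0 || hi != 0 || mem > maxAlloc-hchanSize {
		return "", errors.New("makechan: size out of range")
	}
	switch {
	case mem == 0:
		return "hchan only", nil
	case !hasPointers:
		return "hchan and buffer in one block", nil
	default:
		return "hchan and buffer allocated separately", nil
	}
}

func main() {
	cases := []struct {
		elemSize    uint64
		size        int
		hasPointers bool
	}{
		{8, 0, false},  // like make(chan int)
		{0, 10, false}, // like make(chan struct{}, 10)
		{8, 10, false}, // like make(chan int, 10)
		{8, 10, true},  // like make(chan *int, 10) on a 64-bit platform
		{8, -1, false}, // negative size: a runtime panic in real makechan
	}
	for _, tc := range cases {
		plan, err := planAlloc(tc.elemSize, tc.size, tc.hasPointers)
		fmt.Printf("%+v -> %q, err=%v\n", tc, plan, err)
	}
}
```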

So, apart from allocating the hchan structure itself, the key part of initializing hchan is setting four fields:

// Address of the channel's element buffer array;
c.buf = mallocgc(mem, elem, true)
// Element size, e.g. 8 bytes for int on 64-bit platforms;
c.elemsize = uint16(elem.size)
// Element type, so we know what each element in the channel is;
c.elemtype = elem
// Number of buffer slots, e.g. 2 for make(chan int, 2);
c.dataqsiz = uint(size)

The hchan structure

The makechan function creates chan's core structure, hchan. Let's take a closer look at the hchan structure itself.

type hchan struct {
	qcount   uint           // Number of elements currently in the queue; changes on every enqueue and dequeue;
	dataqsiz uint           // Size of the ring buffer (number of slots);
	buf      unsafe.Pointer // Address of the buffer array; set at initialization and never changed afterwards;
	elemsize uint16         // Size of one element; together with dataqsiz it gives the size of the buffer block;
	closed   uint32         // Non-zero once the channel has been closed;
	elemtype *_type         // Element type; set at initialization;
	sendx    uint           // send index
	recvx    uint           // receive index
	recvq    waitq          // List of goroutines waiting to receive, abstracted as waiters
	sendq    waitq          // List of goroutines waiting to send, abstracted as waiters

	// lock protects all fields in hchan. Do not change another goroutine's status while holding
	// this lock (in particular, do not ready a goroutine), as this can deadlock with stack shrinking.
	lock mutex
}
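Two of these fields surface directly in user code: len(c) reports the number of queued elements (qcount), and cap(c) reports the number of buffer slots (dataqsiz). The quick check below uses `lenCap`, a hypothetical helper.

```go
package main

import "fmt"

// lenCap returns len(c) and cap(c): the number of queued elements
// and the number of buffer slots.
func lenCap(c chan int) (int, int) {
	return len(c), cap(c)
}

func main() {
	c := make(chan int, 3)
	c <- 1
	c <- 2
	fmt.Println(lenCap(c)) // 2 3
	<-c
	fmt.Println(lenCap(c))              // 1 3
	fmt.Println(lenCap(make(chan int))) // 0 0: unbuffered, no slots
}
```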

In makechan we saw that initialization only sets four core fields:

  1. buf: the buffer address
  2. elemsize: the element size
  3. elemtype: the element type
  4. dataqsiz: the buffer (array) size

From using channels, we know that channel operations commonly block in two situations: 1) sending when there is no free space, and 2) receiving when there is no element.

// If c has no free space, this line blocks: the goroutine is parked and does not return until space becomes available;
c <- x

// If c holds no elements, this line blocks: the goroutine is parked and does not return until an element is received;
<- c

Both cases involve blocking and waking goroutines, which is what the recvq and sendq fields are for:

	recvq    waitq  // list of recv waiters
	sendq    waitq  // list of send waiters

The waitq type is a doubly linked list of sudog entries, much like the list implementation in the Linux kernel.

type waitq struct {
	first *sudog
	last  *sudog
}
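The list operations can be sketched as follows. The sketch follows the shape of the runtime's waitq: first/last pointers and FIFO order. The `sudog` here is a hypothetical stand-in with only an id and prev/next links. The real dequeue also skips sudogs that belong to a select which another case has already won.

```go
package main

import "fmt"

// sudog is a hypothetical, stripped-down stand-in for the runtime's
// sudog: only an id plus the prev/next links that waitq needs.
type sudog struct {
	id         int
	prev, next *sudog
}

// waitq is a FIFO doubly linked list with head and tail pointers.
type waitq struct {
	first, last *sudog
}

// enqueue appends sgp at the tail.
func (q *waitq) enqueue(sgp *sudog) {
	sgp.next = nil
	x := q.last
	if x == nil {
		sgp.prev = nil
		q.first = sgp
		q.last = sgp
		return
	}
	sgp.prev = x
	x.next = sgp
	q.last = sgp
}

// dequeue removes and returns the head, or nil if the queue is empty.
func (q *waitq) dequeue() *sudog {
	sgp := q.first
	if sgp == nil {
		return nil
	}
	y := sgp.next
	if y == nil {
		q.first = nil
		q.last = nil
	} else {
		y.prev = nil
		q.first = y
		sgp.next = nil
	}
	return sgp
}

func main() {
	var q waitq
	for i := 1; i <= 3; i++ {
		q.enqueue(&sudog{id: i})
	}
	for s := q.dequeue(); s != nil; s = q.dequeue() {
		fmt.Println(s.id) // 1, 2, 3 in FIFO order
	}
}
```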

chansend

The compiler inserts a call to chansend when it encounters code such as c <- x. In essence, chansend delivers a user element into hchan's ring buffer. From the caller's point of view, a call to chansend ends in one of two ways:

  1. The delivery succeeds immediately and the call returns normally;
  2. The delivery blocks: the call blocks and the goroutine is parked (switched out);

Next, let's look at what chansend does.

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
    // All operations on a channel happen under its mutex;
    lock(&c.lock)
    // Sending on a closed channel panics immediately;
    if c.closed != 0 {
        unlock(&c.lock)
        panic(plainError("send on closed channel"))
    }
    // Scenario 1: the best case. A receiver is already waiting, so hand the element to it directly;
    // send copies the element straight to the waiting receiver and wakes it up; the ring buffer is bypassed;
    if sg := c.recvq.dequeue(); sg != nil {
        send(c, sg, ep, func() { unlock(&c.lock) }, 3)
        return true
    }
    // Scenario 2: there is still free space in the ring buffer;
    if c.qcount < c.dataqsiz {
        // Copy the element into the slot at sendx;
        qp := chanbuf(c, c.sendx)
        typedmemmove(c.elemtype, qp, ep)
        // Increment the send index
        c.sendx++
        // Wrap around to the start of the ring
        if c.sendx == c.dataqsiz {
            c.sendx = 0
        }
        // One more element in the buffer
        c.qcount++
        unlock(&c.lock)
        return true
    }
    // Should we block? In a non-blocking call, unlock and return immediately; otherwise continue below;
    // block is true for a plain c <- x; it is false only for a select with a default branch.
    if !block {
        unlock(&c.lock)
        return false
    }
    // Reaching here means the current goroutine must block because no condition is met. What remains is to leave a way to be notified, then wait.
    gp := getg()
    mysg := acquireSudog()
    mysg.releasetime = 0
    mysg.elem = ep
    mysg.waitlink = nil
    mysg.g = gp
    mysg.isSelect = false
    mysg.c = c
    gp.waiting = mysg
    gp.param = nil
    // Enqueue the sudog describing this goroutine on sendq and wait for a receiver;
    c.sendq.enqueue(mysg)
    // Park the goroutine; chanparkcommit releases c.lock once the goroutine is parked;
    gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)

    // Execution resumes here once another goroutine wakes us up.
    // What follows is the post-wakeup logic;
    if mysg != gp.waiting {
        throw("G waiting list is corrupted")
    }
    // Release resources and clean up.
    gp.waiting = nil
    gp.activeStackChans = false
    if gp.param == nil {
        // Woken without a param: the only legitimate cause is that the channel was closed
        if c.closed == 0 {
            throw("chansend: spurious wakeup")
        }
        panic(plainError("send on closed channel"))
    }
    gp.param = nil
    mysg.c = nil
    releaseSudog(mysg)
    return true
}

When we execute c <- x in Go to post an element to a channel, chansend is called. To summarize the scenarios it handles:

  1. Scenario 1: someone (a goroutine) is already waiting for an element on this channel. This is the happiest case: the element is handed to that goroutine directly and it is woken up, bypassing the ring buffer.
  2. Scenario 2: there is room in the ring buffer, so the element is stored there. This is also a smooth path; storing and removing elements happen asynchronously.
  3. Scenario 3: there is no space in the ring buffer, so the sender must block. A plain c <- x compiles to a call with block = true. When is chansend's block argument false? The answer: in a select with a default branch.

About the return value: chansend returns whether the element was successfully delivered: true on success, false otherwise (false is only possible when block is false).
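The decision order of chansend can be modelled in a few lines. This is a deliberately simplified, single-goroutine sketch: `toyChan` and `send` are hypothetical names, and waiting receivers are plain pointers. Nothing is ever parked, so the blocking path only reports that it would park.

```go
package main

import "fmt"

// toyChan is a hypothetical single-goroutine model of hchan used only to
// illustrate chansend's decision order. Waiting receivers are modelled as
// pointers to the variable each receiver wants filled; nothing parks.
type toyChan struct {
	closed bool
	recvq  []*int // waiting receivers (FIFO)
	buf    []int  // ring buffer, len(buf) == dataqsiz
	sendx  int
	qcount int
}

// send reports whether the element was delivered now, and which path ran.
func (c *toyChan) send(v int, block bool) (bool, string) {
	if c.closed {
		panic("send on closed channel")
	}
	// Scenario 1: a receiver is waiting, hand the value over directly.
	if len(c.recvq) > 0 {
		dst := c.recvq[0]
		c.recvq = c.recvq[1:]
		*dst = v
		return true, "direct handoff"
	}
	// Scenario 2: the ring buffer has room.
	if c.qcount < len(c.buf) {
		c.buf[c.sendx] = v
		c.sendx = (c.sendx + 1) % len(c.buf)
		c.qcount++
		return true, "buffered"
	}
	// Non-blocking caller (select with default): give up.
	if !block {
		return false, "would block"
	}
	// Scenario 3: the real chansend would enqueue a sudog on sendq and park.
	return false, "park on sendq"
}

func main() {
	var got int
	c := &toyChan{buf: make([]int, 1), recvq: []*int{&got}}
	fmt.Println(c.send(1, true))  // true direct handoff
	fmt.Println(got)              // 1
	fmt.Println(c.send(2, true))  // true buffered
	fmt.Println(c.send(3, false)) // false would block
	fmt.Println(c.send(3, true))  // false park on sendq
}
```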

A first look at select:

select {
case c <- v:
    // ... foo
default:
    // ... bar
}

The compiler translates this into something like:

if selectnbsend(c, v) {
	// ... foo
} else {
	// ... bar
}

selectnbsend is just a thin wrapper:

func selectnbsend(c *hchan, elem unsafe.Pointer) (selected bool) {
    // The chansend function is called with the block argument false;
	return chansend(c, elem, false, getcallerpc())
}

Summary: chansend really is that simple; in essence, it delivers an element to a channel.

chanrecv

The corresponding Go statement is <-c. This function implements dequeuing an element from a channel. The compiler translates the receive statements roughly as follows.

Go statement:

<- c

Compiles to:

func chanrecv1(c *hchan, elem unsafe.Pointer) {
    chanrecv(c, elem, true)
}

Go statement (the difference this time is the extra ok result):

v, ok := <-c

Compiles to:

func chanrecv2(c *hchan, elem unsafe.Pointer) (received bool) {
    _, received = chanrecv(c, elem, true)
    return
}

When the compiler encounters <-c or v, ok := <-c, it replaces them with calls to chanrecv1 and chanrecv2 respectively. Both functions are thin wrappers; the element dequeue itself is implemented by chanrecv. Both pass block = true (again, block is false only inside a select).

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
	// Fast path: in non-blocking mode with nothing to receive, return immediately without taking the lock; everything after this runs under the lock;
	if !block && (c.dataqsiz == 0 && c.sendq.first == nil ||
		c.dataqsiz > 0 && atomic.Loaduint(&c.qcount) == 0) &&
		atomic.Load(&c.closed) == 0 {
		return
	}

	// All the following logic runs under the lock;
	lock(&c.lock)

	// Closed and drained: clear *ep to the zero value and return (selected=true, received=false);
	if c.closed != 0 && c.qcount == 0 {
		if raceenabled {
			raceacquire(c.raceaddr())
		}
		unlock(&c.lock)
		if ep != nil {
			typedmemclr(c.elemtype, ep)
		}
		return true, false
	}

	// A sender is waiting: for an unbuffered channel, take its element directly; for a full buffered
	// channel, take the head of the buffer and move the sender's element to the tail (recv handles both);
	if sg := c.sendq.dequeue(); sg != nil {
		recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
		return true, true
	}

	// The ring buffer holds elements: copy one out from recvx and advance the index;
	if c.qcount > 0 {
		// Address of the slot at recvx
		qp := chanbuf(c, c.recvx)
		if ep != nil {
			typedmemmove(c.elemtype, ep, qp)
		}
		typedmemclr(c.elemtype, qp)
		// Increment the receive index, wrapping around the ring
		c.recvx++
		if c.recvx == c.dataqsiz {
			c.recvx = 0
		}
		c.qcount--
		unlock(&c.lock)
		return true, true
	}

	// The ring buffer is empty. Do we need to block?
	// If block is false, unlock and return (false, false);
	if !block {
		unlock(&c.lock)
		return false, false
	}

	// Reaching here means we must block and wait; all that remains is to prepare for blocking.
	gp := getg()
	mysg := acquireSudog()
	mysg.releasetime = 0
	mysg.elem = ep
	mysg.waitlink = nil
	gp.waiting = mysg
	mysg.g = gp
	mysg.isSelect = false
	mysg.c = c
	gp.param = nil
	// Add this goroutine to recvq as a waiter; it is removed from the queue once the condition is met;
	c.recvq.enqueue(mysg)
	// Park the goroutine and release the lock
	goparkunlock(&c.lock, waitReasonChanReceive, traceEvGoBlockRecv, 3)

	// Execution resumes here after being woken up;
	if mysg != gp.waiting {
		throw("G waiting list is corrupted")
	}
	// Clean up resources
	gp.waiting = nil
	closed := gp.param == nil
	gp.param = nil
	mysg.c = nil
	releaseSudog(mysg)
	return true, !closed
}

The chanrecv function returns two values: selected and received. select uses selected to decide whether to take the corresponding case branch; received indicates whether an element was actually received from the queue. There are several cases:

  1. Non-blocking (block=false) with no element available: return (selected=false, received=false), so the select case branch is not taken;
  2. The chan is closed and has no elements left: return (selected=true, received=false). The select case is taken, and the received value is the zero value of the element type;
  3. An element is received normally (immediately or after blocking): return (selected=true, received=true).
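The same kind of toy model shows where each (selected, received) pair comes from. Again this is a hypothetical single-goroutine sketch (`toyRecvChan`, `recv`) that omits sendq and locking. It cannot really park, so the blocking-on-empty path panics instead.

```go
package main

import "fmt"

// toyRecvChan is a hypothetical single-goroutine model used only to show
// how chanrecv chooses its (selected, received) results. It has no sendq.
type toyRecvChan struct {
	closed bool
	buf    []int // ring buffer, len(buf) == dataqsiz
	recvx  int
	qcount int
}

func (c *toyRecvChan) recv(block bool) (v int, selected, received bool) {
	// Closed and drained: the case is selected, but only the zero value comes back.
	if c.closed && c.qcount == 0 {
		return 0, true, false
	}
	// Elements available: take one from recvx and advance the index.
	if c.qcount > 0 {
		v = c.buf[c.recvx]
		c.buf[c.recvx] = 0
		c.recvx = (c.recvx + 1) % len(c.buf)
		c.qcount--
		return v, true, true
	}
	// Nothing available and non-blocking: the select case is not taken.
	if !block {
		return 0, false, false
	}
	// The real chanrecv would park on recvq here; the toy cannot.
	panic("toy model: a blocking receive would park here")
}

func main() {
	c := &toyRecvChan{buf: make([]int, 2)}
	fmt.Println(c.recv(false)) // 0 false false
	c.buf[0], c.qcount = 5, 1
	fmt.Println(c.recv(true)) // 5 true true
	c.closed = true
	fmt.Println(c.recv(true)) // 0 true false
}
```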

selectnbsend

This function is used when c <- v is combined with select. When a select has a single send case plus a default branch, the compiler converts it into a call to selectnbsend, as follows:

select {
case c <- v:
	// ... foo
default:
	// ... bar
}

The compiled logic is roughly:

if selectnbsend(c, v) {
	// ... foo
} else {
	// ... bar
}

selectnbsend is essentially a wrapper around chansend:

func selectnbsend(c *hchan, elem unsafe.Pointer) (selected bool) {
        // Note that the block argument is false
        return chansend(c, elem, false, getcallerpc())
}

The only difference is that block is false: if the ring buffer has no space (and no receiver is waiting), the call does not block, and the goroutine is not switched out here.
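In user code, this is simply a select with one send case and a default branch. The helper `trySend` is a hypothetical name. Per the translation described above, its select becomes a selectnbsend call.

```go
package main

import "fmt"

// trySend attempts a non-blocking send: a one-case-plus-default select.
func trySend(c chan int, v int) bool {
	select {
	case c <- v:
		return true
	default:
		return false
	}
}

func main() {
	c := make(chan int, 1)
	fmt.Println(trySend(c, 1)) // true: buffer had room
	fmt.Println(trySend(c, 2)) // false: buffer full, no receiver waiting
	u := make(chan int)
	fmt.Println(trySend(u, 3)) // false: unbuffered and nobody is receiving
}
```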

selectnbrecv

This function is used when v = <-c is combined with select. When a select has a single such receive case plus a default branch, the compiler converts it into a call to selectnbrecv, as follows:

select {
case v = <-c:
	// ... foo
default:
	// ... bar
}

The compiled logic is roughly:

if selectnbrecv(&v, c) {
	// ... foo
} else {
	// ... bar
}

selectnbrecv is essentially a wrapper around chanrecv:

func selectnbrecv(elem unsafe.Pointer, c *hchan) (selected bool) {
    // Set the block parameter to false
    selected, _ = chanrecv(c, elem, false)
    return
}

The internal logic of chanrecv is detailed above. With block=false, if the ring buffer has no elements (and no sender is waiting), the call does not block, and the goroutine is not switched out.

selectnbrecv2

This function is used when v, ok = <-c is combined with select. When a select has a single such receive case plus a default branch, the compiler converts it into a call to selectnbrecv2, as follows:

select {
case v, ok = <-c:
	// ... foo
default:
	// ... bar
}

The compiled logic is roughly:

if selectnbrecv2(&v, &ok, c) {
	// ... foo
} else {
	// ... bar
}

selectnbrecv2 is also a wrapper around chanrecv, but it returns an extra value:

func selectnbrecv2(elem unsafe.Pointer, received *bool, c *hchan) (selected bool) {
    // Set the block parameter to false
    selected, *received = chanrecv(c, elem, false)
    return
}

The internal logic of chanrecv is detailed above: with block=false it never blocks, so the goroutine is not switched out. selectnbrecv2 differs from selectnbrecv in that it also fills in the ok parameter. ok reports whether an element was actually received or the channel was closed.
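A user-level counterpart with the ok form makes the three outcomes visible. `tryRecv` is a hypothetical helper. Per the translation described above, its select becomes a selectnbrecv2 call (runtime function names may differ across Go versions). The (selected, ok) pair it returns lines up with chanrecv's (selected, received).

```go
package main

import "fmt"

// tryRecv attempts a non-blocking receive with the v, ok form.
func tryRecv(c chan int) (v int, selected, ok bool) {
	select {
	case v, ok = <-c:
		return v, true, ok
	default:
		return 0, false, false
	}
}

func main() {
	c := make(chan int, 1)
	fmt.Println(tryRecv(c)) // 0 false false: empty, default taken
	c <- 9
	fmt.Println(tryRecv(c)) // 9 true true: element received
	close(c)
	fmt.Println(tryRecv(c)) // 0 true false: closed, case taken with ok == false
}
```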

chanrecv2

A chan can be used together with for-range, which the compiler handles as follows:

for m := range c {
    // ... do something
}

This is essentially a for loop, and a for loop breaks down into three key parts: initialization, condition, and increment:

for init; condition; increment {
    // do something
}

So how do these three parts look when for-range is combined with a chan? In brief:

init: none

condition:

ok := chanrecv2(c, ep)
if ok {
}

increment: none

When the compiler encounters for-range over a chan, it converts it into calls to chanrecv2, which dequeues an element from the channel and returns a received flag. First, look at the implementation of chanrecv2:

func chanrecv2(c *hchan, elem unsafe.Pointer) (received bool) {
    // block=true here, so chanrecv blocks inside until an element arrives or the channel is closed;
    _, received = chanrecv(c, elem, true)
    return
}

The pseudocode for a for-range over a chan is therefore roughly:

for chanrecv2(c, ep) {
    // do something
}

A for-range over a chan only exits once the chan is closed; otherwise it keeps looping. Why? chanrecv is called with block=true, and the for-range is effectively an infinite loop that can only break out when chanrecv2 returns false. With block=true, chanrecv2 returns false only when the chan is closed (and its buffer is drained).
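A for-range loop and a hand-written loop over the two-value receive behave the same way, as the sketch below shows. All three helpers are hypothetical names. `sumManual` is a hand-written approximation of the compiler's lowering, not its literal output. `produce` closes the channel so that both loops terminate.

```go
package main

import "fmt"

// sumRange uses for-range; the loop ends only once c is closed and drained.
func sumRange(c chan int) int {
	total := 0
	for v := range c {
		total += v
	}
	return total
}

// sumManual spells out roughly what the compiler generates: a blocking
// two-value receive whose ok result decides whether the loop continues.
func sumManual(c chan int) int {
	total := 0
	for {
		v, ok := <-c
		if !ok {
			break
		}
		total += v
	}
	return total
}

// produce sends 1..n from another goroutine and then closes the channel;
// without the close, both loops above would block forever.
func produce(n int) chan int {
	c := make(chan int)
	go func() {
		for i := 1; i <= n; i++ {
			c <- i
		}
		close(c)
	}()
	return c
}

func main() {
	fmt.Println(sumRange(produce(4)), sumManual(produce(4))) // 10 10
}
```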

Conclusion

Go channels are very simple to use: each piece of syntactic sugar corresponds to a runtime function, and the compiler does the translation. A deeper understanding of these functions is essential to fully understanding how channels can be used and what their constraints are. Once you know how they work, you can use channels in Go with confidence.