This is the 25th day of my participation in the November Gwen Challenge. Check out the event details: The last Gwen Challenge 2021

Yuan Xiaobai has seen similar design patterns before, such as Pipe. The general idea is to abstract away the environment in which an operation runs, leaving only its inputs and outputs, so that operations can be linked together and executed in order. For flexibility, the input and output types do not have to be uniform and can even be customized. But the pipe designs he had seen before were usually synchronous: the operations themselves are not complex, they only need to run in sequence, with support for common functions such as comparing inputs and outputs.
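For contrast with what follows, the synchronous flavour of the pattern can be sketched in a few lines. This is only an illustration, not BuildKit code: the names `stage`, `runPipe`, `trim`, and `length` are made up for this example.

```go
package main

import (
	"fmt"
	"strings"
)

// stage is a hypothetical type for this sketch: one synchronous operation
// that turns an input into an output or fails.
type stage func(in interface{}) (interface{}, error)

// runPipe feeds the output of each stage into the next one, in order,
// and stops at the first error.
func runPipe(in interface{}, stages ...stage) (interface{}, error) {
	cur := in
	for i, s := range stages {
		out, err := s(cur)
		if err != nil {
			return nil, fmt.Errorf("stage %d: %w", i, err)
		}
		cur = out
	}
	return cur, nil
}

// trim strips surrounding whitespace. Inputs are loosely typed, so each
// stage checks what it receives.
func trim(in interface{}) (interface{}, error) {
	s, ok := in.(string)
	if !ok {
		return nil, fmt.Errorf("want string, got %T", in)
	}
	return strings.TrimSpace(s), nil
}

// length turns a string into its byte length, so its output type differs
// from its input type.
func length(in interface{}) (interface{}, error) {
	s, ok := in.(string)
	if !ok {
		return nil, fmt.Errorf("want string, got %T", in)
	}
	return len(s), nil
}

func main() {
	out, err := runPipe("  buildkit  ", trim, length)
	fmt.Println(out, err)
}
```

Each stage only sees its input and output, which is the abstraction described above. Nothing here can be interrupted once it has started, and that is the gap the BuildKit pipe fills.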

How does Go implement a pipe that supports interrupting (canceling) an operation? Yuan Xiaobai's curiosity was piqued again.

Now that we know what a pipe consists of, how exactly does BuildKit design and implement it? With this question in mind, Yuan Xiaobai began to trace the sequence of calls between the modules.

Pipe preparation and initialization

  • Create a channel, runCh, together with a user-defined function. Here it is a blocking function that waits until runCh becomes readable and then returns the result "res0".
runCh := make(chan struct{})
f := func(ctx context.Context) (interface{}, error) {
   select {
   case <-ctx.Done():
      return nil, ctx.Err()
   case <-runCh:
      return "res0", nil
   }
}
  • Create a waitSignal channel and a signal function. signal is registered as the pipe's OnSendCompletion callback, so the test can block on waitSignal until the pipe has sent something.
waitSignal := make(chan struct{}, 10)
signalled := 0
signal := func() {
   signalled++
   waitSignal <- struct{}{}
}

p, start := NewWithFunction(f)
p.OnSendCompletion = signal
  • Pass in the user-defined function to create the pipe. NewWithFunction returns a pipe and a start function:
return p, func() {
   res, err := f(ctx)
   if err != nil {
      p.Sender.Finalize(nil, err)
      return
   }
   p.Sender.Finalize(res, nil)
}

As you can see, this is where our custom function is actually called. Whatever it returns, start then hands the result or the error to the pipe through the Sender.Finalize method.

  • The function that actually creates the Pipe instance is New:
func New(req Request) *Pipe {
   cancelCh := &channel{}
   roundTripCh := &channel{}
   pw := &sender{
      req:         req,
      sendChannel: roundTripCh,
   }
   pr := &receiver{
      req:         req,
      recvChannel: roundTripCh,
      sendChannel: cancelCh,
   }

   p := &Pipe{
      Sender:   pw,
      Receiver: pr,
   }

   cancelCh.OnSendCompletion = func() {
      v, ok := cancelCh.Receive()
      if ok {
         pw.setRequest(v.(Request))
      }
      if p.OnReceiveCompletion != nil {
         p.OnReceiveCompletion()
      }
   }

   roundTripCh.OnSendCompletion = func() {
      if p.OnSendCompletion != nil {
         p.OnSendCompletion()
      }
   }

   return p
}

This is where the Sender, the Receiver, and the Pipe are created. The parts worth our attention are cancelCh.OnSendCompletion and roundTripCh.OnSendCompletion, which invoke the p.OnReceiveCompletion and p.OnSendCompletion life-cycle callbacks respectively. This is how the channels are tied to the Pipe. Finally, both the Receiver and the Sender hold roundTripCh, but it is the recvChannel for the receiver and the sendChannel for the sender. That is how the pipe connection is set up: one end writes (send) and the other end reads (recv).
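To make the wiring concrete, here is a minimal sketch of a latest-value channel with a completion callback. It follows the Send method quoted in this article, but the Receive method and the lastValue field are assumptions made for this example, since the article does not show them; `demo` is a hypothetical helper.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// wrappedValue boxes a payload so that every Send stores a distinct pointer.
type wrappedValue struct{ value interface{} }

// channel is a simplified stand-in for the article's channel type: it keeps
// only the latest value and notifies through a callback.
type channel struct {
	OnSendCompletion func()
	value            atomic.Value
	lastValue        *wrappedValue // assumption: remembers the last value read
}

// Send stores the value, then fires the callback if one is set.
func (c *channel) Send(v interface{}) {
	c.value.Store(&wrappedValue{value: v})
	if c.OnSendCompletion != nil {
		c.OnSendCompletion()
	}
}

// Receive is an assumption (the article does not show it): it reports
// ok=false when nothing new has arrived since the previous call.
func (c *channel) Receive() (interface{}, bool) {
	v, _ := c.value.Load().(*wrappedValue)
	if v == nil || v == c.lastValue {
		return nil, false
	}
	c.lastValue = v
	return v.value, true
}

// demo shares one channel between a writing end and a reading end and
// returns what the reader saw plus how often the callback fired.
func demo() (first interface{}, firstOK, secondOK bool, notified int) {
	roundTripCh := &channel{}
	roundTripCh.OnSendCompletion = func() { notified++ }

	roundTripCh.Send("status")             // sender end writes
	first, firstOK = roundTripCh.Receive() // receiver end reads the new value
	_, secondOK = roundTripCh.Receive()    // nothing new since the last read
	return
}

func main() {
	fmt.Println(demo())
}
```

Unlike a native Go channel, this kind of mailbox never blocks the writer. The callback is what tells the other side that something has changed, which is why New hooks the pipe's life-cycle callbacks into OnSendCompletion.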

  • Everything is ready, so launch it with go start(). Based on what we know about goroutines, start usually does not run right away, because scheduling a goroutine has a cost: small compared with switching threads, but not free. Even once it does run, f stays blocked on runCh, so nothing has been sent yet. The following checks supported Yuan Xiaobai's guess:
go start()
require.Equal(t, false, p.Receiver.Receive())

st := p.Receiver.Status()
require.Equal(t, st.Completed, false)
require.Equal(t, st.Canceled, false)
require.Nil(t, st.Value)
require.Equal(t, signalled, 0)

Pipe Happy Path – Normal flow

In the normal flow we set up the pipe, execute it, and get the result we expect. Everything goes smoothly.

  • Let the function return normally by making runCh readable with close(runCh), much as an asynchronous HTTP request would come back successfully. The current goroutine then blocks on <-waitSignal, waiting to be woken up:
close(runCh)
<-waitSignal
  • The start function now actually runs: the user-defined function is called and returns "res0", which triggers p.Sender.Finalize.
  • Sender.Finalize does two things. First, it updates the status fields: Value: res, Err: nil, Completed: true, and Canceled: false (req.Canceled is false here). Second, it calls sendChannel.Send with the status that was just updated. As mentioned earlier, the sender's sendChannel is the shared roundTripCh:
func (c *channel) Send(v interface{}) {
   c.value.Store(&wrappedValue{value: v})
   if c.OnSendCompletion != nil {
      c.OnSendCompletion()
   }
}
  • The channel's Send method in turn calls the OnSendCompletion callback that was set up in advance:
roundTripCh.OnSendCompletion = func() {
   if p.OnSendCompletion != nil {
      p.OnSendCompletion()
   }
}

The actual OnSendCompletion event of Pipe is called, and that event is the signal function set up in the test:

signal := func() {
   signalled++
   waitSignal <- struct{}{}
}

It increments the counter and writes to the waitSignal channel. This wakes up the goroutine that is waiting on waitSignal, in this case the main goroutine of our test.

  • When we call p.Receiver.Status again, we get what we expect:
p.Receiver.Receive()
st = p.Receiver.Status()
require.Equal(t, st.Completed, true)
require.Equal(t, st.Canceled, false)
require.NoError(t, st.Err)
require.Equal(t, st.Value.(string), "res0")
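The whole happy path can be replayed with a heavily reduced model. The sketch below is an assumption-laden stand-in rather than the BuildKit implementation: `status`, `miniPipe`, `finalize`, and `runHappyPath` are hypothetical names, and the model collapses the sender, receiver, and channel into a single struct.

```go
package main

import "fmt"

// status holds the fields the article reads back from the receiver.
type status struct {
	Value     interface{}
	Err       error
	Completed bool
}

// miniPipe is a hypothetical, heavily reduced stand-in for the pipe: it
// stores one status and fires a callback after it has been written.
type miniPipe struct {
	OnSendCompletion func()
	st               status
}

// finalize plays the role of Sender.Finalize: update the status, then notify.
func (p *miniPipe) finalize(v interface{}, err error) {
	p.st = status{Value: v, Err: err, Completed: true}
	if p.OnSendCompletion != nil {
		p.OnSendCompletion()
	}
}

// runHappyPath returns the final status and how many times signal fired.
func runHappyPath() (status, int) {
	runCh := make(chan struct{})
	waitSignal := make(chan struct{}, 10)
	signalled := 0

	p := &miniPipe{}
	p.OnSendCompletion = func() {
		signalled++
		waitSignal <- struct{}{} // wakes the goroutine blocked on waitSignal
	}

	start := func() {
		<-runCh // blocks until the work is allowed to finish
		p.finalize("res0", nil)
	}

	go start()
	close(runCh) // let the user function return
	<-waitSignal // sleep until finalize has run
	return p.st, signalled
}

func main() {
	st, n := runHappyPath()
	fmt.Println(st.Value, st.Err, st.Completed, n)
}
```

Reading the status only after receiving from waitSignal is what makes this safe: the write in finalize happens before the send on waitSignal, and the send happens before the receive completes.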

Pipe Cancellation process

The difference from the normal flow is that the user cancels the operation by calling p.Receiver.Cancel() on the pipe:

p.Receiver.Cancel()
<-waitSignal
  • The first operation triggered is Receiver.Cancel:
func (pr *receiver) Cancel() {
   req := pr.req
   if req.Canceled {
      return
   }
   req.Canceled = true
   pr.sendChannel.Send(req)
}

Cancel marks the req as canceled and sends it on. The Sender and the Receiver each hold a req, and the updated one travels through the channel so that the Sender learns about the cancellation. Since the receiver's sendChannel is the cancelCh created in New, pr.sendChannel.Send(req) runs the channel's Send method on cancelCh:

func (c *channel) Send(v interface{}) {
   c.value.Store(&wrappedValue{value: v})
   if c.OnSendCompletion != nil {
      c.OnSendCompletion()
   }
}

The corresponding OnSendCompletion operation is also called:

cancelCh.OnSendCompletion = func() {
   v, ok := cancelCh.Receive()
   if ok {
      pw.setRequest(v.(Request))
   }
   if p.OnReceiveCompletion != nil {
      p.OnReceiveCompletion()
   }
}

This in turn calls the pipe's OnReceiveCompletion callback, which was set up when the pipe was created from the user-defined function:

ctx, cancel := context.WithCancel(context.TODO())

p.OnReceiveCompletion = func() {
   if req := p.Sender.Request(); req.Canceled {
      cancel()
   }
}

Because req.Canceled is now true, the context's cancel function is called. This is the link back to the start function: the custom function f was given ctx, so once ctx is canceled, f(ctx) returns an error (ctx.Err()).

  • Unlike the normal flow, Sender.Finalize is now called with a nil value and the error: p.Sender.Finalize(nil, err)
  • Next, just as in the normal flow, signal is called and the main goroutine, our test function, is woken up

The result, of course, is what we expect:

p.Receiver.Receive()
st = p.Receiver.Status()
require.Equal(t, st.Completed, true)
require.Equal(t, st.Canceled, true)
require.Error(t, st.Err)
require.Equal(t, st.Err, context.Canceled)

Next: Take a closer look at Moby Buildkit #26 – Scheduler’s genius