This is the 26th day of my participation in the November Gwen Challenge. Check out the event details: The last Gwen Challenge 2021

scheduler.build

Now that there are two Edges, how does Scheduler schedule tasks? Where Scheduler starts building:

// build evaluates edge into a result
func (s *scheduler) build(ctx context.Context, edge Edge) (CachedResult, error) {
   s.mu.Lock()
   e := s.ef.getEdge(edge)
   ...
   wait := make(chan struct{})
   var p *pipe.Pipe
   p = s.newPipe(e, nil, pipe.Request{Payload: &edgeRequest{desiredState: edgeStatusComplete}})
   p.OnSendCompletion = func(a) {
      p.Receiver.Receive()
      if p.Receiver.Status().Completed {
         close(wait)
      }
   }
   ...
   <-wait
   ...
}
Copy the code

The final task to be completed in onSendCompletion is close(wait), which means that this is the last edge. When onSendCompletion is complete, This means that this time the build is complete.

scheduler newPipe

Look again at newPipe:

// newPipe creates a new request pipe between two edges
func (s *scheduler) newPipe(target, from *edge, req pipe.Request) *pipe.Pipe {
   p := &edgePipe{
      Pipe:   pipe.New(req),
      Target: target,
      From:   from,
   }

   s.signal(target)
   iffrom ! =nil {
      p.OnSendCompletion = func(a) {
         p.mu.Lock()
         defer p.mu.Unlock()
         s.signal(p.From)
      }
      s.outgoing[from] = append(s.outgoing[from], p)
   }
   s.incoming[target] = append(s.incoming[target], p)
   p.OnReceiveCompletion = func(a) {
      p.mu.Lock()
      defer p.mu.Unlock()
      s.signal(p.Target)
   }
   return p.Pipe
}
Copy the code

From = nil; from = nil; from = nil; That is, the first edge built is the last edge. When from is not empty, OnSendCompletion calls S.signal (p.finish) to send from, which is the source signal. OnReceiveCompletion sends the target signal. Add from and target to s.utgoign and s.coming, respectively.

scheduler.signal

// signal notifies that an edge needs to be processed again
func (s *scheduler) signal(e *edge) {
   s.muQ.Lock()
   if_, ok := s.waitq[e]; ! ok { d := &dispatcher{e: e}if s.last == nil {
         s.next = d
      } else {
         s.last.next = d
      }
      s.last = d
      s.waitq[e] = struct{}{}
      s.cond.Signal()
   }
   s.muQ.Unlock()
}
Copy the code

Create a new & Dispatcher based on the signal edge, add to s. Cast, and trigger the signal. Who received the signal, and what was done with it?

func (s *scheduler) loop(a) {
   defer func(a) {
      close(s.closed)
   }()

   go func(a) {
      <-s.stopped
      s.mu.Lock()
      s.cond.Signal()
      s.mu.Unlock()
   }()

   s.mu.Lock()
   for {
      select {
      case <-s.stopped:
         s.mu.Unlock()
         return
      default:
      }
      s.muQ.Lock()
      l := s.next
      ifl ! =nil {
         if l == s.last {
            s.last = nil
         }
         s.next = l.next
         delete(s.waitq, l.e)
      }
      s.muQ.Unlock()
      if l == nil {
         s.cond.Wait()
         continue
      }
      s.dispatch(l.e)
   }
}
Copy the code

Remember that scheduler triggers this loop when it is initialized by Jobs, and it always listens for the latest dispatch event and finally dispatches s.Dispatch (L.E) :

// dispatch schedules an edge to be processed
func (s *scheduler) dispatch(e *edge){... pf := &pipeFactory{s: s, e: e}// unpark the edge
   e.unpark(inc, updates, out, pf)
   ...
}
Copy the code

E.park is called when edge is distributed. This is where edge is really handled:

func (e *edge) unpark(incoming []pipe.Sender, updates, allPipes []pipe.Receiver, f *pipeFactory){... desiredState, done := e.respondToIncoming(incoming, allPipes)if done {
      return
   }

   cacheMapReq := false
   // set up new outgoing requests if needed
   if e.cacheMapReq == nil && (e.cacheMap == nil || len(e.cacheRecords) == 0) {
      index := e.cacheMapIndex
      e.cacheMapReq = f.NewFuncRequest(func(ctx context.Context) (interface{}, error) {
         cm, err := e.op.CacheMap(ctx, index)
         return cm, errors.Wrap(err, "failed to load cache key")
      })
      cacheMapReq = true}...if e.execReq == nil {
      if added := e.createInputRequests(desiredState, f, false); ! added && ! e.hasActiveOutgoing && ! cacheMapReq { bklog.G(context.TODO()).Errorf("buildkit scheluding error: leaving incoming open. forcing solve. Please report this with BUILDKIT_SCHEDULER_DEBUG=1")
         debugSchedulerPreUnpark(e, incoming, updates, allPipes)
         e.createInputRequests(desiredState, f, true)}}}Copy the code

Here:

  • desiredState, done := e.respondToIncoming(incoming, allPipes)To determine whether the Edge build is complete
  • e.cacheMapReq = f.NewFuncRequest(...), creates the operation to get op.Cachemap, and you can see that a new pipe has been created
  • added := e.createInputRequests(...)If it is an executable Op, check whether the Inputs are ready, that is, their Inputs, to create a new pipe.

After this ring ring carding down, how to more intuitive understanding?

  • Build from Edge0, creating a only careclose(wait)The pipe0
  • Into theunparkAfter that, it checks its Op’s CacheMap, which is the second step here, and creates the PiPE1. Pipe1 comes from Pipe0, so it sends an EDge0 signal when it’s done, so that the redistributed Dispatch event can tell the EDge0 pipe to fetch the desired content and see if it’s done
  • Inputs edge0 have edge1, so create the build event for edge1 too. Pipe2 is created in Step 3
  • After edge1 starts building, like Edge1, see if your Op has created a PiPE3 in CacheMap (step 4), but in this case, edGE1 is notified when the piPE3 has been sent

As you can see, this is also a recursion, and pipe is all about connecting people together and triggering a callback event, s.dispatch in this case, s.loop is like an event bus that listens for new events and sends them.

Since it is recursive, there is a mouth. Where is the exit of edge? How do you judge that you are finished?Remember the s.coming, s.utgoing array? It stores all of the pipes, and the edge recursive build exits in:

desiredState, done := e.respondToIncoming(incoming, allPipes)
Copy the code

The pipes associated with e0 here are P0, P1, and P2, which means that e0 is not completed until all three pipes are completed. Similarly, e1 is associated with p2 and p3, and the build is not complete until p2 and P3 are both completed.

So that’s it! Scheduler is really a good housekeeper and manages everything in an orderly way.

Next: In-depth understanding of the Moby Buildkit series #28 – SourceOp CacheMap