Focus on big data and container cloud core technology decryption, can provide full stack of big data + cloud native platform consulting solutions, please continue to pay attention to this set of blog. If you have any academic exchange, please feel free to contact me. For more content, please pay attention to the public account of Data Cloud Technology Community.

1 Preemption scheduling (pre-selection process + optimization failure)

  • Scheduler’s CMD code directory structure
  • Scheduler’s PKG code directory structure

1.1 Scheduling Relationship

  • Pre-selected scheduling -> Preferred scheduling logic -> Node preemption logic
  • ScheduleOne implements a complete scheduling workflow of 1 pod. The process is sequential, i.e., non-concurrent. This means that as soon as the previous pod’s scheduleOne is complete, a return is made and the next pod’s scheduleOne is immediately followed!
pkg/scheduler/algorithm/scheduler_interface.go:78
type ScheduleAlgorithm interface {
    Schedule(*v1.Pod, NodeLister) (selectedMachine string, err error)
    Preempt(*v1.Pod, NodeLister, error) (selectedNode *v1.Node, preemptedPods []*v1.Pod, cleanupNominatedPods []*v1.Pod, err error)
    Predicates() map[string]FitPredicate
    Prioritizers() []PriorityConfig
}

pkg/scheduler/scheduler.go:276
// Run begins watching and scheduling. It waits for cache to be synced, then starts a goroutine and returns immediately.
func (sched *Scheduler) Run() {
    if! sched.config.WaitForCacheSync() {
        return
    }
    go wait.Until(sched.scheduleOne, 0, sched.config.StopEverything)
}

pkg/scheduler/scheduler.go:513
func (sched *Scheduler) scheduleOne() {
    pod := sched.config.NextPod()
    suggestedHost, err := sched.schedule(pod)
    iferr ! = nil {if fitError, ok := err.(*core.FitError); ok {
            preemptionStartTime := time.Now()
            sched.preempt(pod, fitError)
        }
        return
    }
    assumedPod := pod.DeepCopy()
    allBound, err := sched.assumeVolumes(assumedPod, suggestedHost)
    err = sched.assume(assumedPod, suggestedHost)
    go func() {
        err := sched.bind(assumedPod, &v1.Binding{
            ObjectMeta: metav1.ObjectMeta{Namespace: assumedPod.Namespace, Name: assumedPod.Name, UID: assumedPod.UID},
            Target: v1.ObjectReference{
                Kind: "Node",
                Name: suggestedHost,
            },
        })
    }()
}

pkg/scheduler/scheduler.go:290
General scheduling process (pre-selected process + Preferred process)
func (g *genericScheduler) Schedule(pod *v1.Pod, nodeLister algorithm.NodeLister) (string, error) {
    nodes, err := nodeLister.List()
    trace.Step("Computing predicates")
    filteredNodes, failedPredicateMap, err := g.findNodesThatFit(pod, nodes)
    trace.Step("Prioritizing")
    priorityList, err := PrioritizeNodes(pod, g.cachedNodeInfoMap, metaPrioritiesInterface, g.prioritizers, filteredNodes, g.extenders)
    trace.Step("Selecting host")
    return g.selectHost(priorityList)
}
Copy the code

1.2 Birth cause of preemption ->Pod priority

  • With priority, Pod can be called priority scheduling and preemption scheduling. Pods with a higher priority can be ranked first in the scheduling queue and choose Node first.
  • When a higher-priority pod fails to find a node, it checks to see if the lower-priority pod on the node can run. If so, one or more lower-priority pods on the node will be expelled. The high-priority POD is then successfully run on one node.
apiVersion: scheduling.k8s.io/v1
kind: PriorityClass
metadata:
  name: high-priority
value: 1000000
globalDefault: false
description: "This priority class should be used for XYZ service pods only."

apiVersion: v1
kind: Pod
metadata:
  name: nginx
  labels:
    env: test
spec:
  containers:
  - name: nginx
    image: nginx
    imagePullPolicy: IfNotPresent
  priorityClassName: high-priority
Copy the code

2 Preemption scheduling entry – Outer preemption implementation

  • Check whether preemption is disabled. If preemption is disabled, return directly.
  • Gets the latest object data for the pod that failed to schedule.
  • Preempt Algorithm algorithm. Preempt is executed to return a list of pre-scheduled nodes and pods to be culled.
  • Add the nodes returned by the preemption algorithm to the pod’s status.nominatedNodename and remove the pods that need to be culled.
  • Clear pod status. NominatedNodeName when the preemption algorithm returns nil.
  • The whole process of preemption of the final result is, in fact, update the Pod. Status. NominatedNodeName attribute information. If preemption algorithm returns the node is not null, then the node updates to Pod. The Status. NominatedNodeName, otherwise it will Pod. Status. NominatedNodeName set to null.

2.1 calls to preempt

suggestedHost, err := sched.schedule(pod)
iferr ! = nil {if fitError, ok := err.(*core.FitError); ok {
      preemptionStartTime := time.Now()
      sched.preempt(pod, fitError)
      metrics.PreemptionAttempts.Inc()
   } else {
      klog.Errorf("error selecting node for pod: %v", err)
      metrics.PodScheduleErrors.Inc()
   }
   return
}
Copy the code

2.2 Outer preemption implementation related functions

  • PodPriorityEnabled: Returns “” if feature is not enabled
  • GetUpdatedPod: Updates POD information. Both the input and return values are of type * v1.pod
  • Sched. Config. Algorithm. Preempt: core logic process
func (sched*Scheduler) preempt(preemptor * v1.pod, scheduleErr Error) (string, error)""
    if! util.PodPriorityEnabled() || sched.config.DisablePreemption {return ""// Update pod information; Into arguments and return values are * v1 in Pod type preemptor, err: = sched. Config. PodPreemptor. GetUpdatedPod (preemptor) / / preempt processes, The preempt the core node, victims, nominatedPodsToClear, err: = sched. Config. Algorithm. Preempt (preemptor, sched.config.NodeLister, scheduleErr) var nodeName =""
    ifnode ! = nil {nodeName = node. The Name "appointed pod" / / update the queue queue sched. Config. SchedulingQueue. UpdateNominatedPodForNode (preemptor, NodeName) / / set the pod Status. The NominatedNodeName err = sched. Config. PodPreemptor. SetNominatedNodeName (preemptor, nodeName)iferr ! = nil {/ / if the error is removed from the queue sched. Config. SchedulingQueue. DeleteNominatedPodIfExists (preemptor)return "", err
        }

        for_, victim := range victims {// pod to be removediferr := sched.config.PodPreemptor.DeletePod(victim); err ! = nil {return "", err
            }
            sched.config.Recorder.Eventf(victim, v1.EventTypeNormal, "Preempted"."by %v/%v on node %v", preemptor.Namespace, preemptor.Name, nodeName)
        }
    }
    // Clearing nominated pods should happen outside of "if node ! = nil"// The cleanup process is at the topifExternally, let's go back to the implementation of Preempt()for _, p := range nominatedPodsToClear {
        rErr := sched.config.PodPreemptor.RemoveNominatedNodeName(p)
        ifrErr ! = nil { klog.Errorf("Cannot remove nominated node annotation of pod: %v", rErr)
            // We do not return as this error is not critical.
        }
    }
    returnFunc (p *podPreemptor) GetUpdatedPod(pod * v1.pod) (* v1.pod, error) {returnp.Client.CoreV1().Pods(pod.Namespace).Get(pod.Name, Metav1.getoptions {}} // Delete a pod func (p *podPreemptor) DeletePod(pod * v1.pod) error {returnp.Client.CoreV1().Pods(pod.Namespace).Delete(pod.Name, & metav1. DeleteOptions {})} / / set the pod Status. NominatedNodeName for the specified node name func (p * podPreemptor) SetNominatedNodeName(pod *v1.Pod, nominatedNodeName string) error { podCopy := pod.DeepCopy() podCopy.Status.NominatedNodeName = nominatedNodeName _, err := p.Client.CoreV1().Pods(pod.Namespace).UpdateStatus(podCopy)returnErr} / / empty pod. Status. NominatedNodeName func (p * podPreemptor) RemoveNominatedNodeName (pod * v1. Pod) error {if len(pod.Status.NominatedNodeName) == 0 {
      return nil
   }
   return p.SetNominatedNodeName(pod, "")}Copy the code

3 preemption scheduling entry – Inner evaluation cull recommendation

3.1 Recommended steps for inner evaluation elimination

The main implementation of Preempt is to find nodes that can be scheduled and pods on them that need to be culled due to preemption. The basic process is as follows:

  • According to the causes of scheduling failure, a batch of screening is performed on all nodes to screen out the list of potential scheduled nodes.
  • Select the POD and its node to be sacrificed by selectNodesForPreemption.
  • The above filtered victims are filtered again based on extended preemption logic.
  • Based on the above filtering results, select a node that may eventually be scheduled due to preemption.
  • Based on the above candidate nodes, find the list of victim pods on this node whose priority is lower than the currently scheduled pod.

3.2 Core source code analysis

  • Convention Node Preempt selects a node and preempts the Pods resource above, returning (the node information, the preempted Pods information,nominated Node Name List of nodes to be cleaned up, possible error)
(*v1.Node, []*v1.Pod, []*v1.Pod, error)
Copy the code
  • The Convention Pod prevents other Pods from using The “specified” resource, even if it takes a lot of time to wait for other pending pods.
  • PodEligibleToPreemptOthers: is there a low priority state of pod in delete, it returns false.
  • NodesWherePreemptionMightHelp: find predicates phase failure but by preemption may be able to dispatch the nodes of success.
  • SelectNodesForPreemption: This victimsonnode attempts to find the minimum number of pods in a given node that need to be expelled, and to ensure that once these pods are expelled, the noode can victimize a Pod enough to run from it.
Computes whether "pods" can be scheduled after all low-priority Pods on preemption nodes are ejected. If so, sort by priority and divide into two groups based on whether or not the PDB is broken. One group affects the PDB limit and the other group does not affect the PDB. The two groups were ranked in order of priority. Then gradually release pods from groups that affect the PDB, and then gradually release pods from groups that do not affect the PDB, keeping the "pod" fit into the node. This means that if a pod doesn't fit the node, then the pod can't be missed, which means that the minimum set of Pods has been found.Copy the code
  • PickOneNodeForPreemption: choose one of nodes from a given node, this function assumes that the given value part of the map is the priority in descending order. The criteria for choosing node here are: 1. The fewest PDB violations 2. Minimum high priority victim 3. Minimum number of priorities 4. Minimum number of victims 5. Just return the first one
func (g *genericScheduler) Preempt(pod *v1.Pod, nodeLister algorithm.NodeLister, scheduleErr error) (*v1.Node, []* v1.pod, []* v1.pod, error) {// omit several lines // determine whether the expulsion operation is appropriateif! podEligibleToPreemptOthers(pod, g.cachedNodeInfoMap) { klog.V(5).Infof("Pod %v/%v is not eligible for more preemption.", pod.Namespace, pod.Name)
      returnNil, nil, nil, nil} // allNodes allNodes, err := nodelist.list ()iferr ! = nil {return nil, nil, nil, err
   }
   if len(allNodes) == 0 {
      returnnil, nil, nil, ErrNoNodesAvailable} / / computing potential execution can be used to run after the expulsion of pod nodes potentialNodes: = nodesWherePreemptionMightHelp (allNodes, fitError.FailedPredicates)if len(potentialNodes) == 0 {
      klog.V(3).Infof("Preemption will not help schedule pod %v/%v on any node.", pod.Namespace, pod.Name)
      // In this case, we should clean-up any existing nominated node name of the pod.
      returnNil, nil, []* v1.pod {Pod}, nil} // List PDB objects PDBS, err := g.pdblister.list (allelages.everything ())iferr ! = nil {returnNil, nil, nil, err} // Calculate what pods all nodes need to expel, etc., NodeToVictims, err := selectNodesForPreemption(POD, G.achedNodeinfomap, potentialNodes, g.predicates, g.predicateMetaProducer, g.schedulingQueue, pdbs)iferr ! = nil {returnNil nil, nil, err} / / expand scheduling logic nodeToVictims, err = g.p rocessPreemptionWithExtenders (pod, nodeToVictims)iferr ! = nil {return// Select 1 node for schedule candidateNode := pickOneNodeForPreemption(nodeToVictims)if candidateNode == nil {
      returnNil, nil, nil, err} Move to the activeQ, allowing the scheduler / / to find another node to the pod nominatedPods: = g.g etLowerPriorityNominatedPods (pod, candidateNode Name)if nodeInfo, ok := g.cachedNodeInfoMap[candidateNode.Name]; ok {
      return nodeInfo.Node(), nodeToVictims[candidateNode].Pods, nominatedPods, err
   }

   return nil, nil, nil, fmt.Errorf(
      "preemption failed: the target node %s has been deleted from scheduler cache",
      candidateNode.Name)
}
Copy the code

4 summarizes

Kubernetes Preempt scheduling process is very difficult to read through, the author spent a lot of time and experience on the code of walking analysis, but this is not the most appropriate form of code presentation, there is more room for optimization.

Focus on big data and container cloud core technology decryption, can provide full stack of big data + cloud native platform consulting solutions, please continue to pay attention to this set of blog. If you have any academic exchange, please feel free to contact me. For more content, please pay attention to the public account of Data Cloud Technology Community.