As the core component responsible for container scheduling in Kubernetes, kube-scheduler plays an important role in placing Pods automatically. This article introduces the basic principles of the scheduler from the perspective of its source code. It is based on v1.18, the latest version at the time of writing, which brought a large update to the scheduling module. In addition, starting from v1.15, Kubernetes has been introducing the scheduling framework mechanism into kube-scheduler to better support user-defined scheduling rules. Much of the material available online is therefore outdated, and it is worth analyzing this part again.

(The following code uses v1.18 as an example)

First, the main job of container scheduling is to place each Pod on an appropriate node as expected. That is easy to say but considerably harder to implement, because many factors have to be taken into account, mainly the following:

1. Fairness: Decisions should be made fairly during scheduling to ensure that each node has the opportunity to be scheduled.

2. Resource utilization: The scheduling result should make full use of cluster resources, deploying as many Pods as possible with the limited resources available.

3. Scheduling efficiency: A large number of Pods can be scheduled quickly, and scheduling performance should not degrade significantly as the cluster grows.

4. Scheduling flexibility: To adapt to complex business environments and practical requirements, multiple schedulers should be able to work in parallel, and custom schedulers and scheduling policies should be allowed.

Second, as shown in the figure above, the whole process is divided into two cycles, scheduling and binding, and each cycle is further divided into multiple phases, each of which exposes several extension points.

This article starts from the scheduleOne method and walks through the whole process of scheduling a single Pod. This method is the core of the scheduling logic and mainly does the following things:

1. Take the next Pod off the scheduling queue via sched.NextPod();

2. The profileForPod method obtains the scheduling profile specified by the Pod (scheduling profiles are a new concept introduced in v1.18, mainly used to configure the plugins at the extension points of the scheduler's different phases);

3. Synchronously call the core method sched.Algorithm.Schedule to select a suitable node for the Pod; this mainly consists of two major phases, pre-selection and prioritization, i.e. filtering and scoring;

4. If no appropriate node is found, and Pod priority is configured with preemption enabled, sched.preempt enters the preemption phase; otherwise this scheduling attempt fails and returns;

5. To improve scheduling efficiency, enter the assume (hypothetical binding) stage: the selected node is written into the scheduler cache as the binding result so that the real binding can be executed asynchronously;

6. Run the plugins at the reserve extension point, which reserve node resources for the Pod before binding. If the real binding later succeeds, the PostBind phase is entered; if an error occurs in any of the remaining stages, the unreserve stage is triggered to clean up the state reserved for the Pod.

7. Run the plugins at the permit extension point, which have veto power over the scheduling result. There are usually three outcomes: A. approve, where every plugin at this stage agrees to schedule the Pod and it proceeds to the real binding stage; B. deny, where any single plugin rejects the scheduling, the Pod is returned to the scheduling queue, and the unreserve stage is triggered; C. wait (with a timeout), where the Pod enters a waiting list until it is approved, and if the timeout expires it is treated as denied and the unreserve stage is triggered. (A sketch of the Permit plugin interface follows this list.)

8. Start a goroutine to handle the binding concurrently; it first waits on the permit result and then binds the Pod's volumes (if they are not already bound);

9. Run the PreBind, Bind, and PostBind plugins in turn to perform the binding. Note that once one of the Bind plugins chooses to handle the Pod, the remaining Bind plugins are skipped. After a successful binding, the node name is written into the Pod's spec.nodeName field.
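For reference, the permit stage in step 7 is driven by plugins that implement the framework's Permit interface. The following is a rough sketch of what that interface looks like in the v1alpha1 framework package; treat it as illustrative and refer to the upstream source for the authoritative definition. The returned duration is the timeout applied when a plugin asks the Pod to wait.

// Rough sketch of the Permit extension point (pkg/scheduler/framework/v1alpha1).
// A plugin returns a Status (Success, Unschedulable/deny, or Wait) together with
// the timeout to apply when it asks the Pod to wait.
type PermitPlugin interface {
    Plugin
    // Permit is called before binding a Pod (and before the PreBind plugins).
    Permit(ctx context.Context, state *CycleState, p *v1.Pod, nodeName string) (*Status, time.Duration)
}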

/pkg/scheduler/scheduler.go

// scheduleOne does the entire scheduling workflow for a single pod.  It is serialized on the scheduling algorithm's host fitting.
 
func (sched *Scheduler) scheduleOne(ctx context.Context) {
 
    podInfo := sched.NextPod()
 
    // pod could be nil when schedulerQueue is closed
 
    if podInfo == nil || podInfo.Pod == nil {
 
        return
 
    }
 
    pod := podInfo.Pod
 
    prof, err := sched.profileForPod(pod)
 
    if err != nil {
 
        // This shouldn't happen, because we only accept for scheduling the pods
 
        // which specify a scheduler name that matches one of the profiles.
 
        klog.Error(err)
 
        return
 
    }
 
    if sched.skipPodSchedule(prof, pod) {
 
        return
 
    }
 
 
    klog.V(3).Infof("Attempting to schedule pod: %v/%v", pod.Namespace, pod.Name)
 
 
    // Synchronously attempt to find a fit for the pod.
 
    start := time.Now()
 
    state := framework.NewCycleState()
 
    state.SetRecordPluginMetrics(rand.Intn(100) < pluginMetricsSamplePercent)
 
    schedulingCycleCtx, cancel := context.WithCancel(ctx)
 
    defer cancel()
 
    scheduleResult, err := sched.Algorithm.Schedule(schedulingCycleCtx, prof, state, pod)
 
    if err != nil {
 
        // Schedule() may have failed because the pod would not fit on any host, so we try to
 
        // preempt, with the expectation that the next time the pod is tried for scheduling it
 
        // will fit due to the preemption. It is also possible that a different pod will schedule
 
        // into the resources that were preempted, but this is harmless.
 
        if fitError, ok := err.(*core.FitError); ok {
 
            if sched.DisablePreemption {
 
                klog.V(3).Infof("Pod priority feature is not enabled or preemption is disabled by scheduler configuration." +
 
                    " No preemption is performed.")
 
            } else {
 
                preemptionStartTime := time.Now()
 
                sched.preempt(schedulingCycleCtx, prof, state, pod, fitError)
 
                metrics.PreemptionAttempts.Inc()
 
                metrics.SchedulingAlgorithmPreemptionEvaluationDuration.Observe(metrics.SinceInSeconds(preemptionStartTime))
 
                metrics.DeprecatedSchedulingDuration.WithLabelValues(metrics.PreemptionEvaluation).Observe(metrics.SinceInSeconds(preemptionStartTime))
 
            }
 
            // Pod did not fit anywhere, so it is counted as a failure. If preemption
 
            // succeeds, the pod should get counted as a success the next time we try to
 
            // schedule it. (hopefully)
 
            metrics.PodScheduleFailures.Inc()
 
        } else {
 
            klog.Errorf("error selecting node for pod: %v", err)
 
            metrics.PodScheduleErrors.Inc()
 
        }
 
        sched.recordSchedulingFailure(prof, podInfo.DeepCopy(), err, v1.PodReasonUnschedulable, err.Error())
 
        return
 
    }
 
    metrics.SchedulingAlgorithmLatency.Observe(metrics.SinceInSeconds(start))
 
    // Tell the cache to assume that a pod now is running on a given node, even though it hasn't been bound yet.
 
    // This allows us to keep scheduling without waiting on binding to occur.
 
    assumedPodInfo := podInfo.DeepCopy()
 
    assumedPod := assumedPodInfo.Pod
 
 
 
 
    // Assume volumes first before assuming the pod.
 
    //
 
    // If all volumes are completely bound, then allBound is true and binding will be skipped.
 
    //
 
    // Otherwise, binding of volumes is started after the pod is assumed, but before pod binding.
 
    //
 
    // This function modifies 'assumedPod' if volume binding is required.
 
    allBound, err := sched.VolumeBinder.AssumePodVolumes(assumedPod, scheduleResult.SuggestedHost)
 
    if err != nil {
 
        sched.recordSchedulingFailure(prof, assumedPodInfo, err, SchedulerError,
 
            fmt.Sprintf("AssumePodVolumes failed: %v", err))
 
        metrics.PodScheduleErrors.Inc()
 
        return
 
    }
 
 
    // Run "reserve" plugins.
 
    if sts := prof.RunReservePlugins(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost); !sts.IsSuccess() {
 
        sched.recordSchedulingFailure(prof, assumedPodInfo, sts.AsError(), SchedulerError, sts.Message())
 
        metrics.PodScheduleErrors.Inc()
 
        return
 
    }
 
    // assume modifies `assumedPod` by setting NodeName=scheduleResult.SuggestedHost
 
    err = sched.assume(assumedPod, scheduleResult.SuggestedHost)
 
    if err != nil {
 
        // This is most probably result of a BUG in retrying logic.
 
        // We report an error here so that pod scheduling can be retried.
 
        // This relies on the fact that Error will check if the pod has been bound
 
        // to a node and if so will not add it back to the unscheduled pods queue
 
        // (otherwise this would cause an infinite loop).
 
        sched.recordSchedulingFailure(prof, assumedPodInfo, err, SchedulerError, fmt.Sprintf("AssumePod failed: %v", err))
 
        metrics.PodScheduleErrors.Inc()
 
        // trigger un-reserve plugins to clean up state associated with the reserved Pod
 
        prof.RunUnreservePlugins(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
 
        return
 
    }
 
    // Run "permit" plugins.
 
    runPermitStatus := prof.RunPermitPlugins(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
 
    if runPermitStatus.Code() != framework.Wait && !runPermitStatus.IsSuccess() {
 
        var reason string
 
        if runPermitStatus.IsUnschedulable() {
 
            metrics.PodScheduleFailures.Inc()
 
            reason = v1.PodReasonUnschedulable
 
        } else {
 
            metrics.PodScheduleErrors.Inc()
 
            reason = SchedulerError
 
        }
 
        if forgetErr := sched.Cache().ForgetPod(assumedPod); forgetErr != nil {
 
            klog.Errorf("scheduler cache ForgetPod failed: %v", forgetErr)
 
        }
 
        // One of the plugins returned status different than success or wait.
 
        prof.RunUnreservePlugins(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
 
        sched.recordSchedulingFailure(prof, assumedPodInfo, runPermitStatus.AsError(), reason, runPermitStatus.Message())
 
        return
 
    }
 
    // bind the pod to its host asynchronously (we can do this b/c of the assumption step above).
 
    go func() {
 
        bindingCycleCtx, cancel := context.WithCancel(ctx)
 
        defer cancel()
 
        metrics.SchedulerGoroutines.WithLabelValues("binding").Inc()
 
        defer metrics.SchedulerGoroutines.WithLabelValues("binding").Dec()
 
 
        waitOnPermitStatus := prof.WaitOnPermit(bindingCycleCtx, assumedPod)
 
        if !waitOnPermitStatus.IsSuccess() {
 
            var reason string
 
            if waitOnPermitStatus.IsUnschedulable() {
 
                metrics.PodScheduleFailures.Inc()
 
                reason = v1.PodReasonUnschedulable
 
            } else {
 
                metrics.PodScheduleErrors.Inc()
 
                reason = SchedulerError
 
            }
 
            if forgetErr := sched.Cache().ForgetPod(assumedPod); forgetErr != nil {
 
                klog.Errorf("scheduler cache ForgetPod failed: %v", forgetErr)
 
            }
 
            // trigger un-reserve plugins to clean up state associated with the reserved Pod
 
            prof.RunUnreservePlugins(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
 
            sched.recordSchedulingFailure(prof, assumedPodInfo, waitOnPermitStatus.AsError(), reason, waitOnPermitStatus.Message())
 
            return
 
        }
 
 
        // Bind volumes first before Pod
 
        if !allBound {
 
            err := sched.bindVolumes(assumedPod)
 
            if err != nil {
 
                sched.recordSchedulingFailure(prof, assumedPodInfo, err, "VolumeBindingFailed", err.Error())
 
                metrics.PodScheduleErrors.Inc()
 
                // trigger un-reserve plugins to clean up state associated with the reserved Pod
 
                prof.RunUnreservePlugins(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
 
                return
 
            }
 
        }
 
        // Run "prebind" plugins.
 
        preBindStatus := prof.RunPreBindPlugins(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
 
        if !preBindStatus.IsSuccess() {
 
            var reason string
 
            metrics.PodScheduleErrors.Inc()
 
            reason = SchedulerError
 
            if forgetErr := sched.Cache().ForgetPod(assumedPod); forgetErr != nil {
 
                klog.Errorf("scheduler cache ForgetPod failed: %v", forgetErr)
 
            }
 
            // trigger un-reserve plugins to clean up state associated with the reserved Pod
 
            prof.RunUnreservePlugins(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
 
            sched.recordSchedulingFailure(prof, assumedPodInfo, preBindStatus.AsError(), reason, preBindStatus.Message())
 
            return
 
        }
 
 
        err := sched.bind(bindingCycleCtx, prof, assumedPod, scheduleResult.SuggestedHost, state)
 
        metrics.E2eSchedulingLatency.Observe(metrics.SinceInSeconds(start))
 
        if err != nil {
 
            metrics.PodScheduleErrors.Inc()
 
            // trigger un-reserve plugins to clean up state associated with the reserved Pod
 
            prof.RunUnreservePlugins(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
 
            sched.recordSchedulingFailure(prof, assumedPodInfo, err, SchedulerError, fmt.Sprintf("Binding rejected: %v", err))
 
        } else {
 
            // Calculating nodeResourceString can be heavy. Avoid it if klog verbosity is below 2.
 
            if klog.V(2) {
 
                klog.Infof("pod %v/%v is bound successfully on node %q, %d nodes evaluated, %d nodes were found feasible.", assumedPod.Namespace, assumedPod.Name, scheduleResult.SuggestedHost, scheduleResult.EvaluatedNodes, scheduleResult.FeasibleNodes)
 
            }
 
 
 
 
            metrics.PodScheduleSuccesses.Inc()
 
            metrics.PodSchedulingAttempts.Observe(float64(podInfo.Attempts))
 
            metrics.PodSchedulingDuration.Observe(metrics.SinceInSeconds(podInfo.InitialAttemptTimestamp))
 
 
 
 
            // Run "postbind" plugins.
 
            prof.RunPostBindPlugins(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
 
        }
 
    }()
 
}

The following sections describe several core parts of the whole scheduling process: sched.Algorithm.Schedule (scheduling) and sched.preempt (preemption).

Third, the core of scheduling: sched.Algorithm is of the ScheduleAlgorithm interface type, whose definition and implementation live in /pkg/scheduler/core/generic_scheduler.go. Its Schedule method does the following:

1. Execute the podPassesBasicChecks function to perform basic checks, which currently mainly verify that the PVCs referenced by the Pod exist and are usable. Then run the snapshot() function to take a snapshot of the scheduler cache and node info, capturing the current cluster state; the snapshot is used in the subsequent filtering and scoring process.

2. Call the findNodesThatFitPod method for pre-selection, filtering the nodes and keeping those that meet the Pod's requirements;

3. Call the prioritizeNodes method for prioritization, scoring the nodes produced by the previous step;

4. The selectHost method is invoked to pick the node with the highest score from the scored list; when several nodes tie for the highest score, one of them is chosen at random using reservoir sampling to ensure fairness (a sketch of this logic follows the list).
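The selectHost logic mentioned in step 4 is worth a quick look. The sketch below illustrates the single-pass reservoir-sampling idea: each node that ties the current maximum score replaces the chosen candidate with probability 1/cntOfMaxScore, so every top-scoring node ends up with an equal chance. It is a simplified sketch (the framework types and the math/rand import are assumed), not a verbatim copy of the source.

// Simplified sketch of selectHost: pick one of the highest-scoring nodes
// uniformly at random using reservoir sampling in a single pass.
func selectHost(nodeScoreList framework.NodeScoreList) (string, error) {
    if len(nodeScoreList) == 0 {
        return "", fmt.Errorf("empty priorityList")
    }
    maxScore := nodeScoreList[0].Score
    selected := nodeScoreList[0].Name
    cntOfMaxScore := 1
    for _, ns := range nodeScoreList[1:] {
        if ns.Score > maxScore {
            maxScore = ns.Score
            selected = ns.Name
            cntOfMaxScore = 1
        } else if ns.Score == maxScore {
            cntOfMaxScore++
            if rand.Intn(cntOfMaxScore) == 0 {
                // Replace the candidate with probability 1/cntOfMaxScore.
                selected = ns.Name
            }
        }
    }
    return selected, nil
}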

/pkg/scheduler/core/generic_scheduler.go

// Schedule tries to schedule the given pod to one of the nodes in the node list.
// If it succeeds, it will return the name of the node.
// If it fails, it will return a FitError error with reasons.
func (g *genericScheduler) Schedule(ctx context.Context, prof *profile.Profile, state *framework.CycleState, pod *v1.Pod) (result ScheduleResult, err error) {
    trace := utiltrace.New("Scheduling", utiltrace.Field{Key: "namespace", Value: pod.Namespace}, utiltrace.Field{Key: "name", Value: pod.Name})
    defer trace.LogIfLong(100 * time.Millisecond)

    if err := podPassesBasicChecks(pod, g.pvcLister); err != nil {
        return result, err
    }
    trace.Step("Basic checks done")

    if err := g.snapshot(); err != nil {
        return result, err
    }
    trace.Step("Snapshotting scheduler cache and node infos done")

    if g.nodeInfoSnapshot.NumNodes() == 0 {
        return result, ErrNoNodesAvailable
    }

    // Run "prefilter" plugins.
    startPredicateEvalTime := time.Now()
    filteredNodes, filteredNodesStatuses, err := g.findNodesThatFitPod(ctx, prof, state, pod)
    if err != nil {
        return result, err
    }
    trace.Step("Computing predicates done")

    if len(filteredNodes) == 0 {
        return result, &FitError{
            Pod:                   pod,
            NumAllNodes:           g.nodeInfoSnapshot.NumNodes(),
            FilteredNodesStatuses: filteredNodesStatuses,
        }
    }

    metrics.DeprecatedSchedulingAlgorithmPredicateEvaluationSecondsDuration.Observe(metrics.SinceInSeconds(startPredicateEvalTime))
    metrics.DeprecatedSchedulingDuration.WithLabelValues(metrics.PredicateEvaluation).Observe(metrics.SinceInSeconds(startPredicateEvalTime))

    startPriorityEvalTime := time.Now()
    // When only one node after predicate, just use it.
    if len(filteredNodes) == 1 {
        metrics.DeprecatedSchedulingAlgorithmPriorityEvaluationSecondsDuration.Observe(metrics.SinceInSeconds(startPriorityEvalTime))
        return ScheduleResult{
            SuggestedHost:  filteredNodes[0].Name,
            EvaluatedNodes: 1 + len(filteredNodesStatuses),
            FeasibleNodes:  1,
        }, nil
    }

    priorityList, err := g.prioritizeNodes(ctx, prof, state, pod, filteredNodes)
    if err != nil {
        return result, err
    }

    metrics.DeprecatedSchedulingAlgorithmPriorityEvaluationSecondsDuration.Observe(metrics.SinceInSeconds(startPriorityEvalTime))
    metrics.DeprecatedSchedulingDuration.WithLabelValues(metrics.PriorityEvaluation).Observe(metrics.SinceInSeconds(startPriorityEvalTime))

    host, err := g.selectHost(priorityList)
    trace.Step("Prioritizing done")

    return ScheduleResult{
        SuggestedHost:  host,
        EvaluatedNodes: len(filteredNodes) + len(filteredNodesStatuses),
        FeasibleNodes:  len(filteredNodes),
    }, err
}

The core of the code above is pre-selection (filtering) and prioritization (scoring); both are closely tied to the scheduling framework and are explained further below.

The findNodesThatFitPod method consists of three steps: pre-filtering (RunPreFilterPlugins), Filter plugin filtering (findNodesThatPassFilters), and extender filtering (findNodesThatPassExtenders). At the heart of this is the Filter plugin filtering performed by the findNodesThatPassFilters method.

/pkg/scheduler/core/generic_scheduler.go

// Filters the nodes to find the ones that fit the pod based on the framework
// filter plugins and filter extenders.
func (g *genericScheduler) findNodesThatFitPod(ctx context.Context, prof *profile.Profile, state *framework.CycleState, pod *v1.Pod) ([]*v1.Node, framework.NodeToStatusMap, error) {
    s := prof.RunPreFilterPlugins(ctx, state, pod)
    if !s.IsSuccess() {
        return nil, nil, s.AsError()
    }

    filteredNodesStatuses := make(framework.NodeToStatusMap)
    filtered, err := g.findNodesThatPassFilters(ctx, prof, state, pod, filteredNodesStatuses)
    if err != nil {
        return nil, nil, err
    }

    filtered, err = g.findNodesThatPassExtenders(pod, filtered, filteredNodesStatuses)
    if err != nil {
        return nil, nil, err
    }
    return filtered, filteredNodesStatuses, nil
}

The findNodesThatPassFilters method performs the actual pre-selection: it runs a series of filters to find the nodes that satisfy the Pod's requirements and returns the list of nodes that pass.

The pre-selection process is as follows:

  1. Obtain information about all nodes in the cluster using the previous snapshot.

  2. numFeasibleNodesToFind is called to determine how many feasible nodes need to be found among the nodes obtained above. This mainly addresses the performance problems that the subsequent scoring process could cause in large clusters (more than 100 nodes): with this mechanism, the scheduler stops searching the remaining nodes once it has found enough feasible ones (the number returned by numFeasibleNodesToFind). The sampling algorithm is not described in detail here for reasons of space; a simplified sketch of the idea follows this list.

  3. The checkNode function is executed concurrently until the number of feasible nodes determined in the previous step has been found. Inside it, the podPassesFiltersOnNode method runs a series of Filter plugins against the current Pod; this is the heart of the pre-selection process.

  4. The list of nodes that meet the pre-selection conditions is returned.
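As mentioned in step 2, numFeasibleNodesToFind caps how many nodes are examined. The following is a simplified sketch of that adaptive sampling logic, not a verbatim copy of the source; the constant and the percentage formula are reproduced from memory of the v1.18 code and should be treated as illustrative.

// Simplified sketch of the adaptive sampling behind numFeasibleNodesToFind.
// minFeasibleNodesToFind and percentageOfNodesToScore mirror the scheduler's
// configuration; treat the exact numbers as illustrative.
const minFeasibleNodesToFind = 100

func numFeasibleNodesToFind(percentageOfNodesToScore int32, numAllNodes int32) int32 {
    // Small clusters (or an explicit 100%) are always scanned in full.
    if numAllNodes < minFeasibleNodesToFind || percentageOfNodesToScore >= 100 {
        return numAllNodes
    }
    adaptivePercentage := percentageOfNodesToScore
    if adaptivePercentage <= 0 {
        // Default: start at 50% and decrease as the cluster grows, with a 5% floor.
        adaptivePercentage = 50 - numAllNodes/125
        if adaptivePercentage < 5 {
            adaptivePercentage = 5
        }
    }
    numNodes := numAllNodes * adaptivePercentage / 100
    if numNodes < minFeasibleNodesToFind {
        return minFeasibleNodesToFind
    }
    return numNodes
}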

/pkg/scheduler/core/generic_scheduler.go

// findNodesThatPassFilters finds the nodes that fit the filter plugins. func (g *genericScheduler) findNodesThatPassFilters(ctx context.Context, prof *profile.Profile, state *framework.CycleState, pod *v1.Pod, statuses framework.NodeToStatusMap) ([]*v1.Node, error) { allNodes, err := g.nodeInfoSnapshot.NodeInfos().List() if err ! = nil { return nil, err } numNodesToFind := g.numFeasibleNodesToFind(int32(len(allNodes))) // Create filtered list with enough space to avoid growing it // and allow assigning. filtered := make([]*v1.Node, numNodesToFind) if ! prof.HasFilterPlugins() { for i := range filtered { filtered[i] = allNodes[i].Node() } g.nextStartNodeIndex = (g.nextStartNodeIndex + len(filtered)) % len(allNodes) return filtered, nil } errCh := parallelize.NewErrorChannel() var statusesLock sync.Mutex var filteredLen int32 ctx, cancel := context.WithCancel(ctx) checkNode := func(i int) { // We check the nodes starting from where we left off in the previous scheduling cycle, // this is to make sure all nodes have the same chance of being examined across pods. nodeInfo := allNodes[(g.nextStartNodeIndex+i)%len(allNodes)] fits, status, err := g.podPassesFiltersOnNode(ctx, prof, state, pod, nodeInfo) if err ! = nil { errCh.SendErrorWithCancel(err, cancel) return } if fits { length := atomic.AddInt32(&filteredLen, 1) if length > numNodesToFind { cancel() atomic.AddInt32(&filteredLen, -1) } else { filtered[length-1] = nodeInfo.Node() } } else { statusesLock.Lock() if ! status.IsSuccess() { statuses[nodeInfo.Node().Name] = status } statusesLock.Unlock() } } beginCheckNode := time.Now() statusCode := framework.Success defer func() { // We record Filter extension point latency here instead of in framework.go because framework.RunFilterPlugins // function is called for each node, whereas we want to have an overall latency for all nodes per scheduling cycle. // Note that this latency also includes latency for `addNominatedPods`, which calls framework.RunPreFilterAddPod. metrics.FrameworkExtensionPointDuration.WithLabelValues(framework.Filter, statusCode.String()).Observe(metrics.SinceInSeconds(beginCheckNode)) }() // Stops searching for more nodes once the configured number of feasible nodes // are found. parallelize.Until(ctx, len(allNodes), checkNode) processedNodes := int(filteredLen) + len(statuses) g.nextStartNodeIndex = (g.nextStartNodeIndex + processedNodes) % len(allNodes) filtered = filtered[:filteredLen] if err := errCh.ReceiveError(); err ! = nil { statusCode = framework.Error return nil, err } return filtered, nil }Copy the code

The podPassesFiltersOnNode method carries a long comment, mainly describing what happens when Pod priority is enabled and preemption has occurred.

When Pod priority is enabled, a Pod with higher priority usually gets the chance to be scheduled first from the scheduling queue. If such a Pod finds that no node meets its requirements, the preemption logic is triggered: some lower-priority Pods are evicted from a node and enter a graceful termination phase. The high-priority Pod is then called the nominated Pod, and the chosen node is called the nominated node. Note, however, that the nominated node is not necessarily the node the Pod will eventually land on; for example, if resources on another node are released while the evicted Pods are terminating, the Pod may end up being scheduled to that other node instead.

The podPassesFiltersOnNode method checks whether a node satisfies the Pod's requirements by running a series of plugins. It is called in two cases:

1. Schedule: during normal scheduling, it evaluates whether the Pod can be scheduled based on the Pods currently on the node plus any equal- or higher-priority Pods nominated to this node;

2. Preempt: during preemption, the victim Pods are removed and the Pods nominated to this node are added.

Since nominated Pods are not necessarily scheduled to their nominated node, the filters are run twice: the first pass includes the nominated Pods, and the second pass excludes them. The second pass guards against cases where a filter plugin for inter-pod affinity passes only because of the nominated Pods; if those Pods were later scheduled to a different node, the Pod being scheduled would no longer satisfy the affinity filter, leading to problems.

/pkg/scheduler/core/generic_scheduler.go

// podPassesFiltersOnNode checks whether a node given by NodeInfo satisfies the // filter plugins. // This function is called from two different places: Schedule and Preempt. // When it is called from Schedule, we want to test whether the pod is // schedulable on the node with all the existing pods on the node plus higher // and equal priority pods nominated to run on the node. // When it is called from Preempt, we should remove the victims of preemption // and add the nominated pods. Removal of the victims is done by // SelectVictimsOnNode(). Preempt removes victims from PreFilter state and // NodeInfo before calling this function. func (g *genericScheduler) podPassesFiltersOnNode( ctx context.Context, prof *profile.Profile, state *framework.CycleState, pod *v1.Pod, info *framework.NodeInfo, ) (bool, *framework.Status, error) { var status *framework.Status podsAdded := false // We run filters twice in some cases. If the node has greater or equal priority // nominated pods, we run them when those pods are added to PreFilter state and nodeInfo. // If all filters succeed in this pass, we run them again when these // nominated pods are not added. This second pass is necessary because some // filters such  as inter-pod affinity may not pass without the nominated pods. // If there are no nominated pods for the node or if the  first run of the // filters fail, we don't run the second pass. // We consider only equal or higher priority pods in the first pass, because // those are the current "pod" must yield to them and not take a space opened // for running them. It is ok if the current "pod" take resources freed for // lower priority pods. // Requiring that the new pod is schedulable in both circumstances ensures that // we are making a conservative decision: filters like resources and inter-pod // anti-affinity are more likely to fail when the nominated pods are treated // as running, while filters like pod affinity are more likely to fail when // the nominated pods are treated as not running. We can't just assume the // nominated pods are running because they are not running right now and in fact, // they may end up getting scheduled to a different node. for i := 0; i < 2; i++ { stateToUse := state nodeInfoToUse := info if i == 0 { var err error podsAdded, stateToUse, nodeInfoToUse, err = g.addNominatedPods(ctx, prof, pod, state, info) if err ! = nil { return false, nil, err } } else if ! podsAdded || ! status.IsSuccess() { break } statusMap := prof.RunFilterPlugins(ctx, stateToUse, pod, nodeInfoToUse) status = statusMap.Merge() if ! status.IsSuccess() && ! status.IsUnschedulable() { return false, status, status.AsError() } } return status.IsSuccess(), status, nil }Copy the code

The core of the podPassesFiltersOnNode method is prof.RunFilterPlugins, which has been abstracted into the scheduling framework.

RunFilterPlugins runs each Filter plugin against the given node for the Pod being scheduled; if any plugin reports that the node does not meet the criteria, the whole node is considered unsuitable.

The concrete implementation involves a good deal of scheduling framework machinery that is not covered in detail here. Simply put, every Filter plugin must implement the framework's Filter interface; a sketch of that interface is shown below.
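As a reference for what "implementing the Filter interface" means, the sketch below shows roughly how the FilterPlugin interface is defined in the v1alpha1 framework package; treat it as illustrative rather than authoritative.

// Sketch of the Filter extension point, based on the v1.18 framework package
// (pkg/scheduler/framework/v1alpha1). A Filter plugin is called once per node
// and reports whether the Pod can run on that node.
type FilterPlugin interface {
    Plugin
    // Filter returns Success (nil) if the node fits the Pod, or an
    // Unschedulable/Error status explaining why it does not.
    Filter(ctx context.Context, state *CycleState, pod *v1.Pod, nodeInfo *NodeInfo) *Status
}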

/pkg/scheduler/framework/v1alpha1/framework.go

// RunFilterPlugins runs the set of configured Filter plugins for pod on
// the given node. If any of these plugins doesn't return "Success", the
// given node is not suitable for running pod.
// Meanwhile, the failure message and status are set for the given node.
func (f *framework) RunFilterPlugins(
    ctx context.Context,
    state *CycleState,
    pod *v1.Pod,
    nodeInfo *NodeInfo,
) PluginToStatus {
    var firstFailedStatus *Status
    statuses := make(PluginToStatus)
    for _, pl := range f.filterPlugins {
        pluginStatus := f.runFilterPlugin(ctx, pl, state, pod, nodeInfo)
        if len(statuses) == 0 {
            firstFailedStatus = pluginStatus
        }
        if !pluginStatus.IsSuccess() {
            if !pluginStatus.IsUnschedulable() {
                // Filter plugins are not supposed to return any status other than
                // Success or Unschedulable.
                firstFailedStatus = NewStatus(Error, fmt.Sprintf("running %q filter plugin for pod %q: %v", pl.Name(), pod.Name, pluginStatus.Message()))
                return map[string]*Status{pl.Name(): firstFailedStatus}
            }
            statuses[pl.Name()] = pluginStatus
            if !f.runAllFilters {
                // Exit early if we don't need to run all filters.
                return statuses
            }
        }
    }
    return statuses
}

The NodePorts filter plugin, which checks for host port conflicts, is a simple example. It first obtains the host ports the Pod wants to use, then looks at the ports already in use on the given node; if the same port is found, a conflict is detected, the plugin returns a failure, and the Pod cannot be placed on that node.

/pkg/scheduler/framework/plugins/nodeports/node_ports.go

// Filter invoked at the filter extension point.
func (pl *NodePorts) Filter(ctx context.Context, cycleState *framework.CycleState, pod *v1.Pod, nodeInfo *framework.NodeInfo) *framework.Status {
    wantPorts, err := getPreFilterState(cycleState)
    if err != nil {
        return framework.NewStatus(framework.Error, err.Error())
    }

    fits := fitsPorts(wantPorts, nodeInfo)
    if !fits {
        return framework.NewStatus(framework.Unschedulable, ErrReason)
    }

    return nil
}

func fitsPorts(wantPorts []*v1.ContainerPort, nodeInfo *framework.NodeInfo) bool {
    // try to see whether existingPorts and wantPorts will conflict or not
    existingPorts := nodeInfo.UsedPorts
    for _, cp := range wantPorts {
        if existingPorts.CheckConflict(cp.HostIP, string(cp.Protocol), cp.HostPort) {
            return false
        }
    }
    return true
}

After the pre-selection process is complete, the prioritization (scoring) process begins.

The main task of the scoring process is to score the feasible nodes selected during pre-selection using Score plugins, normalize the scores, apply the plugin weights, and add everything up. The result is a NodeScoreList slice in which each element is a struct containing a node name and its score (the struct definitions are sketched below).
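The score types themselves are simple. The sketch below shows roughly how they are defined in the framework package (field names taken from the v1.18 source, lightly abridged).

// NodeScore pairs a node name with its score for one plugin or for the total.
type NodeScore struct {
    Name  string
    Score int64
}

// NodeScoreList stores the scores of all candidate nodes.
type NodeScoreList []NodeScore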

The main method for scoring is prioritizeNodes, which takes the following steps:

1. If no priorities are configured, i.e. there are no scoring plugins and no extenders, every node is given a score of 1;

2. Run the PreScore plugins to perform the pre-scoring step;

3. Run the Score plugins to score each node;

4. Summarize the scores, i.e. add up the scores from all plugins for each node;

5. Do the same for extenders. (Personally, I feel this step is not well written: the current code places the extender logic directly inside the prioritizeNodes method, which is not very well structured.)

/pkg/scheduler/core/generic_scheduler.go

// prioritizeNodes prioritizes the nodes by running the score plugins, // which return a score for each node from the call to RunScorePlugins(). // The scores from each plugin are added together to make the score for that node, then // any extenders are run as well. // All scores are finally combined (added) to get the total weighted scores of all nodes func (g *genericScheduler) prioritizeNodes( ctx context.Context, prof *profile.Profile, state *framework.CycleState, pod *v1.Pod, nodes []*v1.Node, ) (framework.NodeScoreList, error) { // If no priority configs are provided, then all nodes will have a score of one. // This is required to generate the priority list in the required format if len(g.extenders) == 0 && ! prof.HasScorePlugins() { result := make(framework.NodeScoreList, 0, len(nodes)) for i := range nodes { result = append(result, framework.NodeScore{ Name: nodes[i].Name, Score: 1, }) } return result, nil } // Run PreScore plugins. preScoreStatus := prof.RunPreScorePlugins(ctx, state, pod, nodes) if ! preScoreStatus.IsSuccess() { return nil, preScoreStatus.AsError() } // Run the Score plugins. scoresMap, scoreStatus := prof.RunScorePlugins(ctx, state, pod, nodes) if ! scoreStatus.IsSuccess() { return nil, scoreStatus.AsError() } if klog.V(10) { for plugin, nodeScoreList := range scoresMap { klog.Infof("Plugin %s scores on %v/%v => %v", plugin, pod.Namespace, pod.Name, nodeScoreList) } } // Summarize all scores. result := make(framework.NodeScoreList, 0, len(nodes)) for i := range nodes { result = append(result, framework.NodeScore{Name: nodes[i].Name, Score: 0}) for j := range scoresMap { result[i].Score += scoresMap[j][i].Score } } if len(g.extenders) ! = 0 && nodes ! = nil { var mu sync.Mutex var wg sync.WaitGroup combinedScores := make(map[string]int64, len(nodes)) for i := range g.extenders { if ! g.extenders[i].IsInterested(pod) { continue } wg.Add(1) go func(extIndex int) { metrics.SchedulerGoroutines.WithLabelValues("prioritizing_extender").Inc() defer func() { metrics.SchedulerGoroutines.WithLabelValues("prioritizing_extender").Dec() wg.Done() }() prioritizedList, weight, err := g.extenders[extIndex].Prioritize(pod, nodes) if err ! = nil { // Prioritization errors from extender can be ignored, let k8s/other extenders determine the priorities return } mu.Lock() for i := range *prioritizedList { host, score := (*prioritizedList)[i].Host, (*prioritizedList)[i].Score if klog.V(10) { klog.Infof("%v -> %v: %v, Score: (%d)", util.GetPodFullName(pod), host, g.extenders[extIndex].Name(), score) } combinedScores[host] += score * weight } mu.Unlock() }(i) } // wait for all go routines to finish wg.Wait() for  i := range result { // MaxExtenderPriority may diverge from the max priority used in the scheduler and defined by MaxNodeScore, // therefore we need to scale the score returned by extenders to the score range used by the scheduler. result[i].Score += combinedScores[result[i].Name] * (framework.MaxNodeScore / extenderv1.MaxExtenderPriority) } } if klog.V(10) { for i := range result { klog.Infof("Host %s => Score %d", result[i].Name, result[i].Score) } } return result, nil }Copy the code

RunScorePlugins(ctx context.Context, state *CycleState, pod *v1.Pod, nodes []*v1.Node) (PluginToNodeScores, *Status) is a method defined on the Framework interface. It runs the configured set of scoring plugins against every node and returns a map keyed by scoring plugin name, whose values are the corresponding node score lists.

The RunScorePlugins method does several things:

1. Score each node in parallel (at most 16 goroutines), i.e. call the Score method of every configured scoring plugin for each node;

2. Normalize the scores for each scoring plugin in parallel, essentially calling the plugin's NormalizeScore method;

3. Recalculate each plugin's scores in parallel according to the configured plugin weights (score multiplied by weight). The relevant scoring interfaces and types are sketched after this list.
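For context, the scoring-related framework types used above look roughly like the sketch below (based on the v1alpha1 framework package; check the source for the exact definitions).

// Sketch of the Score extension point and its result type, based on the
// v1.18 framework package (pkg/scheduler/framework/v1alpha1).
type ScorePlugin interface {
    Plugin
    // Score ranks the node for the Pod; after normalization the result must
    // fall within [MinNodeScore, MaxNodeScore].
    Score(ctx context.Context, state *CycleState, p *v1.Pod, nodeName string) (int64, *Status)
    // ScoreExtensions returns the optional NormalizeScore hook, or nil.
    ScoreExtensions() ScoreExtensions
}

type ScoreExtensions interface {
    // NormalizeScore rescales the raw scores produced by Score for all nodes.
    NormalizeScore(ctx context.Context, state *CycleState, p *v1.Pod, scores NodeScoreList) *Status
}

// PluginToNodeScores maps a score plugin's name to its per-node scores.
type PluginToNodeScores map[string]NodeScoreList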

/pkg/scheduler/framework/v1alpha1/framework.go

// RunScorePlugins runs the set of configured scoring plugins. It returns a list that // stores for each scoring plugin name the corresponding NodeScoreList(s). // It also returns *Status, which is set to non-success if any of the plugins returns // a non-success status. func (f *framework) RunScorePlugins(ctx context.Context, state *CycleState, pod *v1.Pod, nodes []*v1.Node) (ps PluginToNodeScores, status *Status) { startTime := time.Now() defer func() { metrics.FrameworkExtensionPointDuration.WithLabelValues(score, status.Code().String()).Observe(metrics.SinceInSeconds(startTime)) }() pluginToNodeScores := make(PluginToNodeScores, len(f.scorePlugins)) for _, pl := range f.scorePlugins { pluginToNodeScores[pl.Name()] = make(NodeScoreList, len(nodes)) } ctx, cancel := context.WithCancel(ctx) errCh := parallelize.NewErrorChannel() // Run Score method for each node in parallel. parallelize.Until(ctx, len(nodes), func(index int) { for _, pl := range f.scorePlugins { nodeName := nodes[index].Name s, status := f.runScorePlugin(ctx, pl, state, pod, nodeName) if ! status.IsSuccess() { errCh.SendErrorWithCancel(fmt.Errorf(status.Message()), cancel) return } pluginToNodeScores[pl.Name()][index] = NodeScore{ Name: nodeName, Score: int64(s), } } }) if err := errCh.ReceiveError(); err ! = nil { msg := fmt.Sprintf("error while running score plugin for pod %q: %v", pod.Name, err) klog.Error(msg) return nil, NewStatus(Error, msg) } // Run NormalizeScore method for each ScorePlugin in parallel. parallelize.Until(ctx, len(f.scorePlugins), func(index int) { pl := f.scorePlugins[index] nodeScoreList := pluginToNodeScores[pl.Name()] if pl.ScoreExtensions() == nil { return } status := f.runScoreExtension(ctx, pl, state, pod, nodeScoreList) if ! status.IsSuccess() { err := fmt.Errorf("normalize score plugin %q failed with error %v", pl.Name(), status.Message()) errCh.SendErrorWithCancel(err, cancel) return } }) if err := errCh.ReceiveError(); err ! = nil { msg := fmt.Sprintf("error while running normalize score plugin for pod %q: %v", pod.Name, err) klog.Error(msg) return nil, NewStatus(Error, msg) } // Apply score defaultWeights for each ScorePlugin in parallel. parallelize.Until(ctx, len(f.scorePlugins), func(index int) { pl := f.scorePlugins[index] // Score plugins' weight has been checked when they are initialized. weight := f.pluginNameToWeightMap[pl.Name()] nodeScoreList := pluginToNodeScores[pl.Name()] for i, nodeScore := range nodeScoreList { // return error if score plugin returns invalid score. if nodeScore.Score > int64(MaxNodeScore) || nodeScore.Score < int64(MinNodeScore) { err := fmt.Errorf("score plugin %q returns an invalid score %v, it should in the range of [%v, %v] after normalizing", pl.Name(), nodeScore.Score, MinNodeScore, MaxNodeScore) errCh.SendErrorWithCancel(err, cancel) return } nodeScoreList[i].Score = nodeScore.Score * int64(weight) } }) if err := errCh.ReceiveError(); err ! = nil { msg := fmt.Sprintf("error while applying score defaultWeights for pod %q: %v", pod.Name, err) klog.Error(msg) return nil, NewStatus(Error, msg) } return pluginToNodeScores, nil }Copy the code

The simple ImageLocality scoring plugin illustrates this. Its main job is to check whether the images required by the Pod already exist locally on the node: the larger the image and the more widely it is distributed across the cluster's nodes, the higher the plugin scores that node. In other words, the scheduler prefers to place Pods on nodes that do not need to download the image first.

/pkg/scheduler/framework/plugins/imagelocality/image_locality.go

// Score invoked at the score extension point.
func (pl *ImageLocality) Score(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) (int64, *framework.Status) {
    nodeInfo, err := pl.handle.SnapshotSharedLister().NodeInfos().Get(nodeName)
    if err != nil {
        return 0, framework.NewStatus(framework.Error, fmt.Sprintf("getting node %q from Snapshot: %v", nodeName, err))
    }

    nodeInfos, err := pl.handle.SnapshotSharedLister().NodeInfos().List()
    if err != nil {
        return 0, framework.NewStatus(framework.Error, err.Error())
    }
    totalNumNodes := len(nodeInfos)

    score := calculatePriority(sumImageScores(nodeInfo, pod.Spec.Containers, totalNumNodes))

    return score, nil
}

// sumImageScores returns the sum of image scores of all the containers that are already on the node.
// Each image receives a raw score of its size, scaled by scaledImageScore. The raw scores are later used to calculate
// the final score. Note that the init containers are not considered for it's rare for users to deploy huge init containers.
func sumImageScores(nodeInfo *framework.NodeInfo, containers []v1.Container, totalNumNodes int) int64 {
    var sum int64
    for _, container := range containers {
        if state, ok := nodeInfo.ImageStates[normalizedImageName(container.Image)]; ok {
            sum += scaledImageScore(state, totalNumNodes)
        }
    }
    return sum
}

// scaledImageScore returns an adaptively scaled score for the given state of an image.
// The size of the image is used as the base score, scaled by a factor which considers how much nodes the image has "spread" to.
// This heuristic aims to mitigate the undesirable "node heating problem", i.e., pods get assigned to the same or
// a few nodes due to image locality.
func scaledImageScore(imageState *framework.ImageStateSummary, totalNumNodes int) int64 {
    spread := float64(imageState.NumNodes) / float64(totalNumNodes)
    return int64(float64(imageState.Size) * spread)
}

// calculatePriority returns the priority of a node. Given the sumScores of requested images on the node, the node's
// priority is obtained by scaling the maximum priority value with a ratio proportional to the sumScores.
func calculatePriority(sumScores int64) int64 {
    if sumScores < minThreshold {
        sumScores = minThreshold
    } else if sumScores > maxThreshold {
        sumScores = maxThreshold
    }

    return int64(framework.MaxNodeScore) * (sumScores - minThreshold) / (maxThreshold - minThreshold)
}

Fourth, the sched.preempt preemption strategy

Preemption means that when no suitable node is found after the pre-selection and scoring described above (for example, because the cluster is under resource pressure), i.e. scheduling fails, and preemption is possible (Pod priority is enabled), the Pod will try to preempt a node occupied by lower-priority Pods. If the preemption succeeds, those lower-priority Pods (now called victims) are gracefully terminated and evicted, and the Pod is expected to be scheduled to the node where the preemption took place.

Generally speaking, when Pod priority is enabled, preemption is enabled by default. Disabling preemption is not recommended (it can cause high-priority, critical Pods to fail to be scheduled), but it can be disabled manually if necessary.

The preemption process is as follows:

1. Fetch the latest information of the preempting Pod (known as the preemptor);

2. Call sched.Algorithm.Preempt to execute the preemption logic, which returns the preempted node, the preempted Pods (the victims), and the list of Pods whose NominatedNodeName (the nominated node name) should be cleared;

3. Update the scheduling queue with the nominated Pod (the preemptor) information and set its NominatedNodeName on the API server;

4. Delete the preempted Pods (the victims);

5. Clear the NominatedNodeName field of the Pods returned in step 2.

/pkg/scheduler/scheduler.go

// preempt tries to create room for a pod that has failed to schedule, by preempting lower priority pods if possible. // If it succeeds, it adds the name of the node where preemption has happened to the pod spec. // It returns the node name and an error if any. func (sched *Scheduler) preempt(ctx context.Context, prof *profile.Profile, state *framework.CycleState, preemptor *v1.Pod, scheduleErr error) (string, error) { preemptor, err := sched.podPreemptor.getUpdatedPod(preemptor) if err ! = nil { klog.Errorf("Error getting the updated preemptor pod object: %v", err) return "", err } node, victims, nominatedPodsToClear, err := sched.Algorithm.Preempt(ctx, prof, state, preemptor, scheduleErr) if err ! = nil { klog.Errorf("Error preempting victims to make room for %v/%v: %v", preemptor.Namespace, preemptor.Name, err) return "", err } var nodeName = "" if node ! = nil { nodeName = node.Name // Update the scheduling queue with the nominated pod information. Without // this, there would be a race condition between the next scheduling cycle // and the time the scheduler receives a Pod Update for the nominated pod. sched.SchedulingQueue.UpdateNominatedPodForNode(preemptor, nodeName) // Make a call to update nominated node name of the pod on the API server. err = sched.podPreemptor.setNominatedNodeName(preemptor, nodeName) if err ! = nil { klog.Errorf("Error in preemption process. Cannot set 'NominatedNodeName' on pod %v/%v: %v", preemptor.Namespace, preemptor.Name, err) sched.SchedulingQueue.DeleteNominatedPodIfExists(preemptor) return "", err } for _, victim := range victims { if err := sched.podPreemptor.deletePod(victim); err ! = nil { klog.Errorf("Error preempting pod %v/%v: %v", victim.Namespace, victim.Name, err) return "", err } // If the victim is a WaitingPod, send a reject message to the PermitPlugin if waitingPod := prof.GetWaitingPod(victim.UID); waitingPod ! = nil { waitingPod.Reject("preempted") } prof.Recorder.Eventf(victim, preemptor, v1.EventTypeNormal, "Preempted", "Preempting", "Preempted by %v/%v on node %v", preemptor.Namespace, preemptor.Name, nodeName) } metrics.PreemptionVictims.Observe(float64(len(victims))) } // Clearing nominated pods should happen outside of "if node ! = nil". Node could // be nil when a pod with nominated node name is eligible to preempt again, // but preemption logic does not find any node for it. In that case Preempt() // function of generic_scheduler.go returns the pod itself for removal of // the 'NominatedNodeName' field. for _, p := range nominatedPodsToClear { rErr := sched.podPreemptor.removeNominatedNodeName(p) if rErr ! = nil { klog.Errorf("Cannot remove 'NominatedNodeName' field of pod: %v", rErr) // We do not return as this error is not critical. } } return nodeName, err }Copy the code

The preemption logic above is implemented by the Preempt method in generic_scheduler.go. After preemption, the preemptor is placed back in the scheduling queue behind other pending Pods of the same priority, while the scheduler keeps the nominated resources reserved for it. However, if an even higher-priority Pod later preempts the node nominated to the previous preemptor, the preemptor's NominatedNodeName must also be cleared, leaving the resources to the higher-priority Pod.

/pkg/scheduler/core/generic_scheduler.go

// preempt finds nodes with pods that can be preempted to make room for "pod" to // schedule. It chooses one of the nodes and preempts the pods on the node and // returns 1) the node, 2) the list of preempted pods if such a node is found, // 3) A list of pods whose nominated node name should be cleared, and 4) any // possible error. // Preempt does not update its snapshot. It uses the same snapshot used in the // scheduling cycle. This is to avoid a scenario where preempt finds feasible // nodes without preempting any pod. When there are many pending pods in the // scheduling queue a nominated pod will go back to the queue and behind // other pods with the same priority. The nominated pod prevents other pods from // using the nominated resources and the nominated pod could take a long time // before it is retried after many other pending pods. func (g *genericScheduler) Preempt(ctx context.Context, prof *profile.Profile, state *framework.CycleState, pod *v1.Pod, scheduleErr error) (*v1.Node, []*v1.Pod, []*v1.Pod, error) { // Scheduler may return various types of errors. Consider preemption only if // the error is of type FitError. fitError, ok := scheduleErr.(*FitError) if ! ok || fitError == nil { return nil, nil, nil, nil } if ! podEligibleToPreemptOthers(pod, g.nodeInfoSnapshot.NodeInfos(), g.enableNonPreempting) { klog.V(5).Infof("Pod %v/%v is not eligible for more preemption.", pod.Namespace, pod.Name) return nil, nil, nil, nil } allNodes, err := g.nodeInfoSnapshot.NodeInfos().List() if err ! = nil { return nil, nil, nil, err } if len(allNodes) == 0 { return nil, nil, nil, ErrNoNodesAvailable } potentialNodes := nodesWherePreemptionMightHelp(allNodes, fitError) if len(potentialNodes) == 0 { klog.V(3).Infof("Preemption will not help schedule pod %v/%v on any node.", pod.Namespace, pod.Name) // In this case, we should clean-up any existing nominated node name of the pod. return nil, nil, []*v1.Pod{pod}, nil } var pdbs []*policy.PodDisruptionBudget if g.pdbLister ! = nil { pdbs, err = g.pdbLister.List(labels.Everything()) if err ! = nil { return nil, nil, nil, err } } nodeToVictims, err := g.selectNodesForPreemption(ctx, prof, state, pod, potentialNodes, pdbs) if err ! = nil { return nil, nil, nil, err } // We will only check nodeToVictims with extenders that support preemption. // Extenders which do not support preemption may later prevent preemptor from being scheduled on the nominated // node. In that case, scheduler will find a different host for the preemptor in subsequent scheduling cycles. nodeToVictims, err = g.processPreemptionWithExtenders(pod, nodeToVictims) if err ! = nil { return nil, nil, nil, err } candidateNode := pickOneNodeForPreemption(nodeToVictims) if candidateNode == nil { return nil, nil, nil, nil } // Lower priority pods nominated to run on this node, may no longer fit on // this node. So, we should remove their nomination. Removing their // nomination updates these pods and moves them to the active queue. It // lets scheduler find another place for them. nominatedPods := g.getLowerPriorityNominatedPods(pod, candidateNode.Name) return candidateNode, nodeToVictims[candidateNode].Pods, nominatedPods, nil }Copy the code

At the heart of the preemption logic is the selectNodesForPreemption function called above, which walks through all candidate nodes in parallel and collects the possible nodes together with their victims. It returns a map keyed by node whose values are the Pods (victims) that would have to be terminated on that node; the Victims struct is sketched below.

After the candidate nodes are collected, the preemption logic calls the pickOneNodeForPreemption function to pick the most suitable one according to a set of rules and returns it as the node to preempt.
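The Victims value stored in that map is a small struct. The sketch below shows roughly how it is defined in the extender v1 types package in v1.18; treat it as illustrative.

// Sketch of the Victims type (pkg/scheduler/apis/extender/v1 in v1.18):
// the Pods to evict on a node and how many of them would violate a PDB.
type Victims struct {
    Pods             []*v1.Pod
    NumPDBViolations int64
}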

/pkg/scheduler/core/generic_scheduler.go

// selectNodesForPreemption finds all the nodes with possible victims for
 
// preemption in parallel.
 
func (g *genericScheduler) selectNodesForPreemption(
 
    ctx context.Context,
 
    prof *profile.Profile,
 
    state *framework.CycleState,
 
    pod *v1.Pod,
 
    potentialNodes []*framework.NodeInfo,
 
    pdbs []*policy.PodDisruptionBudget,
 
) (map[*v1.Node]*extenderv1.Victims, error) {
 
    nodeToVictims := map[*v1.Node]*extenderv1.Victims{}
 
    var resultLock sync.Mutex
 
 
    checkNode := func(i int) {
 
        nodeInfoCopy := potentialNodes[i].Clone()
 
        stateCopy := state.Clone()
 
        pods, numPDBViolations, fits := g.selectVictimsOnNode(ctx, prof, stateCopy, pod, nodeInfoCopy, pdbs)
 
        if fits {
 
            resultLock.Lock()
 
            victims := extenderv1.Victims{
 
                Pods:             pods,
 
                NumPDBViolations: int64(numPDBViolations),
 
            }
 
            nodeToVictims[potentialNodes[i].Node()] = &victims
 
            resultLock.Unlock()
 
        }
 
    }
 
    parallelize.Until(ctx, len(potentialNodes), checkNode)
 
    return nodeToVictims, nil
 
}

The scheduler also applies some policy when selecting nodes and their victims, the core principle being to minimize the impact on already-running Pods.

selectVictimsOnNode() is called for each candidate node to work out its victim Pods. It first removes all lower-priority Pods from the node to determine whether the preemptor could then be scheduled successfully. If so, the candidate victims are split into two groups, violatingVictims and nonViolatingVictims, depending on whether evicting them would violate a PDB, and each group is sorted by priority. (A PDB, PodDisruptionBudget, is the Kubernetes object used to keep a minimum number of replicas available.)

The algorithm then tries to reprieve (add back) as many Pods as possible, starting with the violatingVictims group (those protected by a PDB) and then the nonViolatingVictims group, working from the highest-priority Pod downwards. Each time a Pod is added back, the filters are re-run to check whether the preemptor still fits on the node; if it no longer fits, that Pod is removed again and recorded as a victim, and for the PDB group the count of PDB-violating victims is incremented. Finally the function returns the victims together with the number of victims whose eviction would violate a PDB.

/pkg/scheduler/core/generic_scheduler.go

// selectVictimsOnNode finds minimum set of pods on the given node that should // be preempted in order to make enough room for "pod" to be scheduled. The // minimum set selected is subject to the constraint that a higher-priority pod // is never preempted when a lower-priority pod could be (higher/lower relative // to one another, not relative to the preemptor "pod"). // The algorithm first checks if the pod can be scheduled on the node when all the  // lower priority pods are gone. If so, it sorts all the lower priority pods by // their priority and then puts them into two groups of those whose PodDisruptionBudget // will be violated if preempted and other non-violating pods. Both groups are // sorted by priority. It first tries to reprieve as many PDB violating pods as // possible and then does them same for non-PDB-violating pods while checking // that the "pod" can still fit on the node. // NOTE: This function assumes that it is never called if "pod" cannot be scheduled // due to pod affinity, node affinity, or node anti-affinity reasons. None of // these predicates can be satisfied by removing more pods from the node. func (g  *genericScheduler) selectVictimsOnNode( ctx context.Context, prof *profile.Profile, state *framework.CycleState, pod *v1.Pod, nodeInfo *framework.NodeInfo, pdbs []*policy.PodDisruptionBudget, ) ([]*v1.Pod, int, bool) { var potentialVictims []*v1.Pod removePod := func(rp *v1.Pod) error { if err := nodeInfo.RemovePod(rp); err ! = nil { return err } status := prof.RunPreFilterExtensionRemovePod(ctx, state, pod, rp, nodeInfo) if ! status.IsSuccess() { return status.AsError() } return nil } addPod := func(ap *v1.Pod) error { nodeInfo.AddPod(ap) status := prof.RunPreFilterExtensionAddPod(ctx, state, pod, ap, nodeInfo) if ! status.IsSuccess() { return status.AsError() } return nil } // As the first step, remove all the lower priority pods from the node and // check if the given pod can be scheduled. podPriority := podutil.GetPodPriority(pod) for _, p := range nodeInfo.Pods { if podutil.GetPodPriority(p.Pod) < podPriority { potentialVictims = append(potentialVictims, p.Pod) if err := removePod(p.Pod); err ! = nil { return nil, 0, false } } } // If the new pod does not fit after removing all the lower priority pods, // we are almost done and this node is not suitable for preemption. The only // condition that we could check is if the "pod" is failing to schedule due to // inter-pod affinity to one or more victims, but we have decided not to // support this case for performance reasons. Having affinity to lower // priority pods is not a recommended configuration anyway. if fits, _, err := g.podPassesFiltersOnNode(ctx, prof, state, pod, nodeInfo); ! fits { if err ! = nil { klog.Warningf("Encountered error while selecting victims on node %v: %v", nodeInfo.Node().Name, err) } return nil, 0, false } var victims []*v1.Pod numViolatingVictim := 0 sort.Slice(potentialVictims, func(i, j int) bool { return util.MoreImportantPod(potentialVictims[i], potentialVictims[j]) }) // Try to reprieve as many pods as possible. We first try to reprieve the PDB // violating victims and then other non-violating ones. In both cases, we start // from the highest priority victims. violatingVictims, nonViolatingVictims := filterPodsWithPDBViolation(potentialVictims, pdbs) reprievePod := func(p *v1.Pod) (bool, error) { if err := addPod(p); err ! = nil { return false, err } fits, _, _ := g.podPassesFiltersOnNode(ctx, prof, state, pod, nodeInfo) if ! fits { if err := removePod(p); err ! 
= nil { return false, err } victims = append(victims, p) klog.V(5).Infof("Pod %v/%v is a potential preemption victim on node %v.", p.Namespace, p.Name, nodeInfo.Node().Name) } return fits, nil } for _, p := range violatingVictims { if fits, err := reprievePod(p); err ! = nil { klog.Warningf("Failed to reprieve pod %q: %v", p.Name, err) return nil, 0, false } else if ! fits { numViolatingVictim++ } } // Now we try to reprieve non-violating victims. for _, p := range nonViolatingVictims { if _, err := reprievePod(p); err ! = nil { klog.Warningf("Failed to reprieve pod %q: %v", p.Name, err) return nil, 0, false } } return victims, numViolatingVictim, true }Copy the code

Finally, the pickOneNodeForPreemption() function selects the best node to become the preemptor's nominated node, based on six rules applied in order:

  • The node with the fewest PDB violations among its victims;
  • If tied, the node whose highest-priority victim has the lowest priority;
  • If still tied, the node with the smallest sum of victim priorities;
  • If the sums are also equal, the node with the smallest number of victims;
  • If still tied, the node whose highest-priority victim has been running for the shortest time (i.e. has the latest start time);
  • If multiple nodes remain after all of the above, the first one is returned.

/pkg/scheduler/core/generic_scheduler.go

func pickOneNodeForPreemption(nodesToVictims map[*v1.Node]*extenderv1.Victims) *v1.Node { if len(nodesToVictims) == 0 { return nil } minNumPDBViolatingPods := int64(math.MaxInt32) var minNodes1 []*v1.Node lenNodes1 := 0 for node, victims := range nodesToVictims { if len(victims.Pods) == 0 { // We found a node that doesn't need any preemption. Return it! // This should happen rarely when one or more pods are terminated between // the time that scheduler tries to schedule the pod and the time that // preemption logic tries to find nodes for preemption. return node } numPDBViolatingPods := victims.NumPDBViolations if numPDBViolatingPods < minNumPDBViolatingPods { minNumPDBViolatingPods = numPDBViolatingPods minNodes1 = nil lenNodes1 = 0 } if numPDBViolatingPods == minNumPDBViolatingPods { minNodes1 = append(minNodes1, node) lenNodes1++ } } if lenNodes1 == 1 { return minNodes1[0] } // There are more than one node with minimum number PDB violating pods. Find // the one with minimum highest priority victim. minHighestPriority := int32(math.MaxInt32) var minNodes2 = make([]*v1.Node, lenNodes1) lenNodes2 := 0 for i := 0; i < lenNodes1; i++ { node := minNodes1[i] victims := nodesToVictims[node] // highestPodPriority is the highest priority among the victims on this node. highestPodPriority := podutil.GetPodPriority(victims.Pods[0]) if highestPodPriority < minHighestPriority { minHighestPriority = highestPodPriority lenNodes2 = 0 } if highestPodPriority == minHighestPriority  { minNodes2[lenNodes2] = node lenNodes2++ } } if lenNodes2 == 1 { return minNodes2[0] } // There are a few nodes with minimum highest priority victim. Find the // smallest sum of priorities. minSumPriorities := int64(math.MaxInt64) lenNodes1 = 0 for i := 0; i < lenNodes2; i++ { var sumPriorities int64 node := minNodes2[i] for _, pod := range nodesToVictims[node].Pods { // We add MaxInt32+1 to all priorities to make all of them >= 0. This is // needed so that a node with a few pods with negative priority is not // picked over a node with a smaller number of pods with the same negative // priority (and similar scenarios). sumPriorities += int64(podutil.GetPodPriority(pod)) + int64(math.MaxInt32+1) } if sumPriorities < minSumPriorities { minSumPriorities = sumPriorities lenNodes1 = 0 } if sumPriorities == minSumPriorities { minNodes1[lenNodes1] = node lenNodes1++ } } if lenNodes1 == 1 { return minNodes1[0] } // There are a few nodes with minimum highest priority victim and sum of priorities. // Find one with the minimum number of pods. minNumPods := math.MaxInt32 lenNodes2 = 0 for i := 0; i < lenNodes1; i++ { node := minNodes1[i] numPods := len(nodesToVictims[node].Pods) if numPods < minNumPods { minNumPods = numPods lenNodes2 = 0 } if numPods == minNumPods { minNodes2[lenNodes2] = node lenNodes2++ } } if lenNodes2 == 1 { return minNodes2[0] } // There are a few nodes with same number of pods. // Find the node that satisfies latest(earliestStartTime(all highest-priority pods on node)) latestStartTime := util.GetEarliestPodStartTime(nodesToVictims[minNodes2[0]]) if latestStartTime == nil { // If the earliest start time of all pods on the 1st node is nil, just return it, // which is not expected to happen. klog.Errorf("earliestStartTime is nil for node %s. Should not reach here.", minNodes2[0]) return minNodes2[0] } nodeToReturn := minNodes2[0] for i := 1; i < lenNodes2; i++ { node := minNodes2[i] // Get earliest start time of all pods on the current node. 
earliestStartTimeOnNode := util.GetEarliestPodStartTime(nodesToVictims[node]) if earliestStartTimeOnNode == nil { klog.Errorf("earliestStartTime is  nil for node %s. Should not reach here.", node) continue } if earliestStartTimeOnNode.After(latestStartTime.Time) { latestStartTime = earliestStartTimeOnNode nodeToReturn = node } } return nodeToReturn }Copy the code