Scheduling Flow

The scheduling process consists of three main steps:

  • schedule(): runs the predicate (filtering) and priority (scoring) algorithms, sorts the candidate nodes by score, and returns the name of the highest-scoring node (see the filter-and-score sketch after this list).
  • assume(): on a successful scheduling decision, writes the pod into the local cache (schedulerCache) and updates the cached node's resource usage, so that the next scheduling cycle can use this up-to-date cache as its reference.
  • bind(): asynchronously writes the pod's Binding to kube-apiserver. This completes one scheduling cycle.

(Figure: scheduler flowchart)
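Before walking through the source, here is a minimal, self-contained sketch of the filter-and-score pattern that schedule() implements. All types and names below (node, pod, predicate, priority) are simplified stand-ins invented for illustration, not the real kube-scheduler types:

package main

import (
    "errors"
    "fmt"
    "sort"
)

// node is a simplified stand-in for *v1.Node plus its free resources.
type node struct {
    name    string
    freeCPU int64 // millicores still available on the node
}

// pod is a simplified stand-in for *v1.Pod.
type pod struct {
    name       string
    requestCPU int64
}

// predicate filters out nodes that cannot run the pod at all.
type predicate func(p pod, n node) bool

// priority scores a feasible node; higher is better.
type priority func(p pod, n node) int64

// schedule mimics the shape of the real algorithm: run all predicates,
// score the survivors, sort by score, and return the best node's name.
func schedule(p pod, nodes []node, preds []predicate, prios []priority) (string, error) {
    var feasible []node
    for _, n := range nodes {
        ok := true
        for _, pr := range preds {
            if !pr(p, n) {
                ok = false
                break
            }
        }
        if ok {
            feasible = append(feasible, n)
        }
    }
    if len(feasible) == 0 {
        return "", errors.New("no node fits the pod")
    }
    // Sum the priority scores per node and sort descending.
    scores := make(map[string]int64, len(feasible))
    for _, n := range feasible {
        for _, pr := range prios {
            scores[n.name] += pr(p, n)
        }
    }
    sort.Slice(feasible, func(i, j int) bool {
        return scores[feasible[i].name] > scores[feasible[j].name]
    })
    return feasible[0].name, nil
}

func main() {
    nodes := []node{{"node-a", 500}, {"node-b", 2000}}
    p := pod{name: "web", requestCPU: 1000}
    fitsCPU := predicate(func(p pod, n node) bool { return n.freeCPU >= p.requestCPU })
    mostFree := priority(func(p pod, n node) int64 { return n.freeCPU })
    host, err := schedule(p, nodes, []predicate{fitsCPU}, []priority{mostFree})
    fmt.Println(host, err) // node-b <nil>
}

The actual entry point and per-pod workflow in the scheduler source look like this: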
// Run begins watching and scheduling. It waits for cache to be synced, then starts a goroutine and returns immediately.
func (sched *Scheduler) Run() {
    if !sched.config.WaitForCacheSync() {
        return
    }

    go wait.Until(sched.scheduleOne, 0, sched.config.StopEverything)
}
......
// scheduleOne does the entire scheduling workflow for a single pod.  It is serialized on the scheduling algorithm's host fitting.
func (sched *Scheduler) scheduleOne() {
    // NextPod() blocks until a pod can be popped when the podQueue is empty.
    pod := sched.config.NextPod()
    if pod.DeletionTimestamp != nil {
        sched.config.Recorder.Eventf(pod, v1.EventTypeWarning, "FailedScheduling", "skip schedule deleting pod: %v/%v", pod.Namespace, pod.Name)
        glog.V(3).Infof("Skip schedule deleting pod: %v/%v", pod.Namespace, pod.Name)
        return
    }

    glog.V(3).Infof("Attempting to schedule pod: %v/%v", pod.Namespace, pod.Name)

    // Synchronously attempt to find a fit for the pod.
    start := time.Now()
    suggestedHost, err := sched.schedule(pod)
    metrics.SchedulingAlgorithmLatency.Observe(metrics.SinceInMicroseconds(start))
    if err != nil {
        return
    }
    // The pod is written into the cache as soon as the algorithm finishes. When scheduling many pods, the next cycle can account for this pod without waiting for it to actually run on its node.
    // Tell the cache to assume that a pod now is running on a given node, even though it hasn't been bound yet.
    // This allows us to keep scheduling without waiting on binding to occur.
    err = sched.assume(pod, suggestedHost)
    if err != nil {
        return
    }

    // bind the pod to its host asynchronously (we can do this b/c of the assumption step above).
    go func() {
        err := sched.bind(pod, &v1.Binding{
            ObjectMeta: metav1.ObjectMeta{Namespace: pod.Namespace, Name: pod.Name, UID: pod.UID},
            Target: v1.ObjectReference{
                Kind: "Node",
                Name: suggestedHost,
            },
        })
        metrics.E2eSchedulingLatency.Observe(metrics.SinceInMicroseconds(start))
        if err != nil {
            glog.Errorf("Internal error binding pod: (%v)", err)
        }
    }()
}

1. schedule()

// schedule implements the scheduling algorithm and returns the suggested host.
func (sched *Scheduler) schedule(pod *v1.Pod) (string, error) {
    host, err := sched.config.Algorithm.Schedule(pod, sched.config.NodeLister)
    if err != nil {
        glog.V(1).Infof("Failed to schedule pod: %v/%v", pod.Namespace, pod.Name)
        copied, cerr := api.Scheme.Copy(pod)
        if cerr != nil {
            runtime.HandleError(err)
            return "", err
        }
        pod = copied.(*v1.Pod)
        sched.config.Error(pod, err)
        sched.config.Recorder.Eventf(pod, v1.EventTypeWarning, "FailedScheduling", "%v", err)
        sched.config.PodConditionUpdater.Update(pod, &v1.PodCondition{
            Type:    v1.PodScheduled,
            Status:  v1.ConditionFalse,
            Reason:  v1.PodReasonUnschedulable,
            Message: err.Error(),
        })
        return "", err
    }
    return host, err
}

schedule() delegates to the Schedule method of genericScheduler (which implements the ScheduleAlgorithm interface) and returns the chosen node name as a string. When scheduling fails, it first makes a copy of the pod via api.Scheme.Copy(pod), then calls sched.config.Error for error handling, which in practice invokes the function built by MakeDefaultErrorFunc. Finally it records an Event in kube-apiserver and updates the pod's status, completing schedule()'s error path.
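Because schedule() only depends on the ScheduleAlgorithm interface, any implementation that maps a pod plus a node list to a node name can be plugged in. A toy sketch of that decoupling; the simplified interface and the roundRobin type below are invented for illustration, not copied from the kube-scheduler source:

package main

import "fmt"

// scheduleAlgorithm is a simplified stand-in for the scheduler's
// ScheduleAlgorithm interface: given a pod and the current node list,
// return the name of the chosen node.
type scheduleAlgorithm interface {
    Schedule(podName string, nodes []string) (string, error)
}

// roundRobin is a toy implementation that just cycles through nodes.
type roundRobin struct{ next int }

func (r *roundRobin) Schedule(podName string, nodes []string) (string, error) {
    if len(nodes) == 0 {
        return "", fmt.Errorf("no nodes available for pod %s", podName)
    }
    host := nodes[r.next%len(nodes)]
    r.next++
    return host, nil
}

func main() {
    var alg scheduleAlgorithm = &roundRobin{}
    for _, p := range []string{"pod-1", "pod-2", "pod-3"} {
        host, _ := alg.Schedule(p, []string{"node-a", "node-b"})
        fmt.Printf("%s -> %s\n", p, host)
    }
}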

func (factory *ConfigFactory) MakeDefaultErrorFunc(backoff *util.PodBackoff, podQueue *cache.FIFO) func(pod *v1.Pod, err error) {
    return func(pod *v1.Pod, err error) {
        if err == core.ErrNoNodesAvailable {
            glog.V(4).Infof("Unable to schedule %v %v: no nodes are registered to the cluster; waiting", pod.Namespace, pod.Name)
        } else {
            if _, ok := err.(*core.FitError); ok {
                glog.V(4).Infof("Unable to schedule %v %v: no fit: %v; waiting", pod.Namespace, pod.Name, err)
            } else {
                glog.Errorf("Error scheduling %v %v: %v; retrying", pod.Namespace, pod.Name, err)
            }
        }
        backoff.Gc()
        // Retry asynchronously.
        // Note that this is extremely rudimentary and we need a more real error handling path.
        go func() {
            defer runtime.HandleCrash()
            podID := types.NamespacedName{
                Namespace: pod.Namespace,
                Name:      pod.Name,
            }

            entry := backoff.GetEntry(podID)
            if !entry.TryWait(backoff.MaxDuration()) {
                glog.Warningf("Request for pod %v already in flight, abandoning", podID)
                return
            }
            // Get the pod again; it may have changed/been scheduled already.
            getBackoff := initialGetBackoff
            for {
                pod, err := factory.client.Core().Pods(podID.Namespace).Get(podID.Name, metav1.GetOptions{})
                if err == nil {
                    if len(pod.Spec.NodeName) == 0 {
                        podQueue.AddIfNotPresent(pod)
                    }
                    break
                }
                if errors.IsNotFound(err) {
                    glog.Warningf("A pod %v no longer exists", podID)
                    return
                }
                glog.Errorf("Error getting pod %v for retry: %v; retrying...", podID, err)
                if getBackoff = getBackoff * 2; getBackoff > maximalGetBackoff {
                    getBackoff = maximalGetBackoff
                }
                time.Sleep(getBackoff)
            }
        }()
    }
}

sched.config.Error first inspects the kind of error and logs it accordingly. podBackoff keeps, for each pod, the interval to wait before re-queuing it; the interval doubles on every retry up to a maximum. backoff.Gc() removes entries from the backoff table that have exceeded their maximum lifetime. entry.TryWait(backoff.MaxDuration()) first acquires a lock, doubles the previous interval, and sleeps for the new one; if the lock cannot be acquired, another goroutine is already sleeping for this pod, and the function returns immediately. Afterwards the pod is re-fetched from kube-apiserver: if it has not been scheduled yet (len(pod.Spec.NodeName) == 0), it is put back into the queue of unscheduled pods (podQueue). If fetching from kube-apiserver fails, the fetch is retried with a growing delay until it succeeds or the pod no longer exists.
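The per-pod doubling behavior that MakeDefaultErrorFunc relies on can be captured in a small sketch. The podBackoff type below is a simplified illustration of the idea, not the real util.PodBackoff:

package main

import (
    "fmt"
    "sync"
    "time"
)

// backoffEntry tracks the retry interval for one pod, doubling on each
// failure up to a cap.
type backoffEntry struct {
    interval time.Duration
    lastUsed time.Time
}

type podBackoff struct {
    mu       sync.Mutex
    entries  map[string]*backoffEntry
    initial  time.Duration
    max      time.Duration
    lifetime time.Duration // entries idle longer than this are garbage-collected
}

// next returns how long the caller should wait before re-queuing the pod,
// and doubles the stored interval for the following failure.
func (b *podBackoff) next(podID string) time.Duration {
    b.mu.Lock()
    defer b.mu.Unlock()
    e, ok := b.entries[podID]
    if !ok {
        e = &backoffEntry{interval: b.initial}
        b.entries[podID] = e
    }
    wait := e.interval
    if e.interval *= 2; e.interval > b.max {
        e.interval = b.max
    }
    e.lastUsed = time.Now()
    return wait
}

// gc drops entries that have been idle longer than lifetime, mirroring
// what backoff.Gc() does in MakeDefaultErrorFunc.
func (b *podBackoff) gc() {
    b.mu.Lock()
    defer b.mu.Unlock()
    for id, e := range b.entries {
        if time.Since(e.lastUsed) > b.lifetime {
            delete(b.entries, id)
        }
    }
}

func main() {
    b := &podBackoff{
        entries:  map[string]*backoffEntry{},
        initial:  time.Second,
        max:      8 * time.Second,
        lifetime: time.Minute,
    }
    for i := 0; i < 5; i++ {
        fmt.Println(b.next("default/nginx")) // 1s 2s 4s 8s 8s
    }
    b.gc()
}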

2. assume()

// assume signals to the cache that a pod is already in the cache, so that binding can be asynchronous.
func (sched *Scheduler) assume(pod *v1.Pod, host string) error {
    //......
    assumed := *pod
    assumed.Spec.NodeName = host
    if err := sched.config.SchedulerCache.AssumePod(&assumed); err != nil {
        glog.Errorf("scheduler cache AssumePod failed: %v", err)
        //......
        return err
    }
    // Optimistically assume that the binding will succeed, so we need to invalidate affected
    // predicates in equivalence cache.
    // If the binding fails, these invalidated item will not break anything.
    if sched.config.Ecache != nil {
        sched.config.Ecache.InvalidateCachedPredicateItemForPodAdd(pod, host)
    }
    return nil
}

assume() does two things. First, it calls SchedulerCache.AssumePod() to write the pod's latest scheduling result into the schedulerCache, updating the cached node's resource usage and host-port occupancy. Second, it invalidates the equivalencePodCache entries for the GeneralPredicates of that node: GeneralPredicates checks node resources and port occupancy, and both change whenever a new pod lands on the node, so the previously cached results must be discarded.
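A minimal sketch of what this cache bookkeeping amounts to; the assumeCache and nodeInfo types are invented stand-ins, and the real schedulerCache tracks far more state:

package main

import (
    "fmt"
    "sync"
)

// nodeInfo is a simplified stand-in for the cache's per-node bookkeeping:
// only requested CPU and used host ports are tracked here.
type nodeInfo struct {
    requestedCPU int64
    usedPorts    map[int32]bool
}

// assumeCache sketches the AssumePod side of the schedulerCache: as soon as
// the algorithm picks a node, the pod's usage is charged to that node so the
// next scheduling cycle sees it, without waiting for the binding to finish.
type assumeCache struct {
    mu    sync.Mutex
    nodes map[string]*nodeInfo
}

func (c *assumeCache) AssumePod(nodeName string, cpu int64, ports []int32) error {
    c.mu.Lock()
    defer c.mu.Unlock()
    n, ok := c.nodes[nodeName]
    if !ok {
        return fmt.Errorf("node %s not in cache", nodeName)
    }
    // Validate before mutating, so a failed assume leaves the cache intact.
    for _, p := range ports {
        if n.usedPorts[p] {
            return fmt.Errorf("host port %d already in use on %s", p, nodeName)
        }
    }
    n.requestedCPU += cpu
    for _, p := range ports {
        n.usedPorts[p] = true
    }
    return nil
}

// ForgetPod undoes AssumePod when the asynchronous binding later fails.
func (c *assumeCache) ForgetPod(nodeName string, cpu int64, ports []int32) {
    c.mu.Lock()
    defer c.mu.Unlock()
    if n, ok := c.nodes[nodeName]; ok {
        n.requestedCPU -= cpu
        for _, p := range ports {
            delete(n.usedPorts, p)
        }
    }
}

func main() {
    c := &assumeCache{nodes: map[string]*nodeInfo{
        "node-a": {usedPorts: map[int32]bool{}},
    }}
    fmt.Println(c.AssumePod("node-a", 500, []int32{8080})) // <nil>
    fmt.Println(c.AssumePod("node-a", 500, []int32{8080})) // port conflict
    c.ForgetPod("node-a", 500, []int32{8080})
}

ForgetPod is included because bind(), below, uses exactly that operation to roll the cache back when the asynchronous binding fails.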

3. bind()

// bind binds a pod to a given node defined in a binding object.  We expect this to run asynchronously, so we
// handle binding metrics internally.
func (sched *Scheduler) bind(assumed *v1.Pod, b *v1.Binding) error {
    bindingStart := time.Now()
    // If binding succeeded then PodScheduled condition will be updated in apiserver so that
    // it's atomic with setting host.
    err := sched.config.Binder.Bind(b)
    if err != nil {
        glog.V(1).Infof("Failed to bind pod: %v/%v", assumed.Namespace, assumed.Name)
        // If binding fails, remove the previously assumed pod from the cache.
        if err := sched.config.SchedulerCache.ForgetPod(assumed); err != nil {
            return fmt.Errorf("scheduler cache ForgetPod failed: %v", err)
        }
        // sched.config.Error is ConfigFactory.MakeDefaultErrorFunc(podBackoff, f.podQueue)
        sched.config.Error(assumed, err)
        sched.config.Recorder.Eventf(assumed, v1.EventTypeWarning, "FailedScheduling", "Binding rejected: %v", err)
        sched.config.PodConditionUpdater.Update(assumed, &v1.PodCondition{
            Type:   v1.PodScheduled,
            Status: v1.ConditionFalse,
            Reason: "BindingRejected",
        })
        return err
    }

    // Stamp the assumed pod with a deadline (TTL) so it can later be expired from the cache.
    if err := sched.config.SchedulerCache.FinishBinding(assumed); err != nil {
        return fmt.Errorf("scheduler cache FinishBinding failed: %v", err)
    }

    metrics.BindingLatency.Observe(metrics.SinceInMicroseconds(bindingStart))
    sched.config.Recorder.Eventf(assumed, v1.EventTypeNormal, "Scheduled", "Successfully assigned %v to %v", assumed.Name, b.Target.Name)
    return nil
}

bind() runs asynchronously: as soon as assume() returns, the scheduler moves on to the next scheduling cycle. sched.config.Binder.Bind(b) writes the Binding object to kube-apiserver. If the write fails, SchedulerCache.ForgetPod(assumed) first removes the pod that assume() added to the cache, releasing the node resources it had claimed; sched.config.Error(assumed, err) then performs the same error handling as in schedule(), putting the failed pod back into the unscheduled queue after a backoff interval; recording an Event and updating the pod's status work as before. If Bind(b) succeeds, SchedulerCache.FinishBinding(assumed) is called: it stamps the assumed pods in the SchedulerCache with an expiry TTL, so they can be cleaned up periodically to release memory. Later scheduling cycles do not depend entirely on these assumed pods anyway: once a pod is actually running on a node, the NodeLister in Scheduler.Config periodically syncs the real state and resource usage of all nodes, and that information is authoritative. This completes one full scheduling cycle.
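The TTL mechanism that FinishBinding enables can be sketched as follows. The ttlCache type is an invented illustration of the idea rather than the real schedulerCache code:

package main

import (
    "fmt"
    "time"
)

// assumedPod records when an assumed pod's cache entry may be dropped.
type assumedPod struct {
    key      string
    deadline time.Time
}

type ttlCache struct {
    ttl  time.Duration
    pods map[string]*assumedPod
}

// finishBinding stamps the entry with now+ttl once the Binding has been
// written to kube-apiserver; until then the entry has no deadline and is
// never expired.
func (c *ttlCache) finishBinding(key string) {
    if p, ok := c.pods[key]; ok {
        p.deadline = time.Now().Add(c.ttl)
    }
}

// cleanup drops entries whose deadline has passed; by then the informers
// have reported the pod as actually running on its node, so the cache no
// longer needs the assumed copy.
func (c *ttlCache) cleanup() {
    now := time.Now()
    for key, p := range c.pods {
        if !p.deadline.IsZero() && now.After(p.deadline) {
            delete(c.pods, key)
        }
    }
}

func main() {
    c := &ttlCache{ttl: 30 * time.Second, pods: map[string]*assumedPod{
        "default/nginx": {key: "default/nginx"},
    }}
    c.finishBinding("default/nginx")
    c.cleanup() // entry is kept until the 30s deadline passes
    fmt.Println(len(c.pods))
}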

results matching ""

    No results matching ""