Scheduler初始化

func CreateScheduler(
    s *options.SchedulerServer,
    kubecli clientset.Interface,
    nodeInformer coreinformers.NodeInformer,
    podInformer coreinformers.PodInformer,
    pvInformer coreinformers.PersistentVolumeInformer,
    pvcInformer coreinformers.PersistentVolumeClaimInformer,
    replicationControllerInformer coreinformers.ReplicationControllerInformer,
    replicaSetInformer extensionsinformers.ReplicaSetInformer,
    statefulSetInformer appsinformers.StatefulSetInformer,
    serviceInformer coreinformers.ServiceInformer,
    recorder record.EventRecorder,
) (*scheduler.Scheduler, error) {
    // 初始化各种Informer,构造ConfigFactory(configurator的实现)
    configurator := factory.NewConfigFactory(
        s.SchedulerName,
        kubecli,
        nodeInformer,
        podInformer,
        pvInformer,
        pvcInformer,
        replicationControllerInformer,
        replicaSetInformer,
        statefulSetInformer,
        serviceInformer,
        s.HardPodAffinitySymmetricWeight,
        utilfeature.DefaultFeatureGate.Enabled(features.EnableEquivalenceClassCache),
    )
    // schedulerConfigurator 覆盖了Create()方法,原本默认走的是ConfigFactory的Create
    // 现在会检测是否有policyconfigfile。
    // Rebuild the configurator with a default Create(...) method.
    configurator = &schedulerConfigurator{
        configurator,
        s.PolicyConfigFile,
        s.AlgorithmProvider,
        s.PolicyConfigMapName,
        s.PolicyConfigMapNamespace,
        s.UseLegacyPolicyConfig,
    }

    return scheduler.NewFromConfigurator(configurator, func(cfg *scheduler.Config) {
        cfg.Recorder = recorder
    })
}

factory.NewConfigFactory创建了ConfigFactory对象(实现了Configurator接口),这个对象里面主要各种informer的初始化,用来从kube-apiserver中同步各种资源的内容,用于后面调度算法。需要特别关注的是ConfigFactory两个重要的结构体成员:PodQueueschedulerCache,在ConfigFactorypodInformer的中注册了两套处理函数用于监听kube-apiserver中的最新的pod信息,分别用于把未调度的pod放入PodQueue中以及把已经调度过的pod放入schedulerCache中作为调度的参考。hardPodAffinitySymmetricWeightenableEquivalenceClassCache这两个参数用于调度后面算法中,此处先按下不表。

func NewConfigFactory(
    schedulerName string,
    client clientset.Interface,
    nodeInformer coreinformers.NodeInformer,
    podInformer coreinformers.PodInformer,
    pvInformer coreinformers.PersistentVolumeInformer,
    pvcInformer coreinformers.PersistentVolumeClaimInformer,
    replicationControllerInformer coreinformers.ReplicationControllerInformer,
    replicaSetInformer extensionsinformers.ReplicaSetInformer,
    statefulSetInformer appsinformers.StatefulSetInformer,
    serviceInformer coreinformers.ServiceInformer,
    hardPodAffinitySymmetricWeight int,
    enableEquivalenceClassCache bool,
) scheduler.Configurator {
    stopEverything := make(chan struct{})
    schedulerCache := schedulercache.New(30*time.Second, stopEverything)

    c := &ConfigFactory{
        client:                         client,
        podLister:                      schedulerCache,
        podQueue:                       cache.NewFIFO(cache.MetaNamespaceKeyFunc),
        pVLister:                       pvInformer.Lister(),
        pVCLister:                      pvcInformer.Lister(),
        serviceLister:                  serviceInformer.Lister(),
        controllerLister:               replicationControllerInformer.Lister(),
        replicaSetLister:               replicaSetInformer.Lister(),
        statefulSetLister:              statefulSetInformer.Lister(),
        schedulerCache:                 schedulerCache,
        StopEverything:                 stopEverything,
        schedulerName:                  schedulerName,
        hardPodAffinitySymmetricWeight: hardPodAffinitySymmetricWeight,
        enableEquivalenceClassCache:    enableEquivalenceClassCache,
    }

    c.scheduledPodsHasSynced = podInformer.Informer().HasSynced
    // scheduled pod cache
    podInformer.Informer().AddEventHandler(
        cache.FilteringResourceEventHandler{
            FilterFunc: func(obj interface{}) bool {
                switch t := obj.(type) {
                case *v1.Pod:
                    return assignedNonTerminatedPod(t)
                default:
                    runtime.HandleError(fmt.Errorf("unable to handle object in %T: %T", c, obj))
                    return false
                }
            },
            Handler: cache.ResourceEventHandlerFuncs{
                AddFunc:    c.addPodToCache,
                UpdateFunc: c.updatePodInCache,
                DeleteFunc: c.deletePodFromCache,
            },
        },
    )
    // unscheduled pod queue
    podInformer.Informer().AddEventHandler(
        cache.FilteringResourceEventHandler{
            FilterFunc: func(obj interface{}) bool {
                switch t := obj.(type) {
                case *v1.Pod:
                    return unassignedNonTerminatedPod(t)
                default:
                    runtime.HandleError(fmt.Errorf("unable to handle object in %T: %T", c, obj))
                    return false
                }
            },
            Handler: cache.ResourceEventHandlerFuncs{
                AddFunc: func(obj interface{}) {
                    if err := c.podQueue.Add(obj); err != nil {
                        runtime.HandleError(fmt.Errorf("unable to queue %T: %v", obj, err))
                    }
                },
                UpdateFunc: func(oldObj, newObj interface{}) {
                    if err := c.podQueue.Update(newObj); err != nil {
                        runtime.HandleError(fmt.Errorf("unable to update %T: %v", newObj, err))
                    }
                },
                DeleteFunc: func(obj interface{}) {
                    if err := c.podQueue.Delete(obj); err != nil {
                        runtime.HandleError(fmt.Errorf("unable to dequeue %T: %v", obj, err))
                    }
                },
            },
        },
    )
    ......

回到CreateScheduler()中,构造schedulerConfigurator对象时使用了之前生成的configurator对象,在schedulerConfigurator中主要包含调度算法配置相关的字段,因为k8s支持使用默认提供的调度算法,同时也支持读取用户配置文件中指定的调度算法和自定义调度算法。最后调用scheduler.NewFromConfigurator函数传入之前创建的configurator和一个简单的赋值函数后就可以生成我们调度需要的Scheduler对象。

// NewFromConfigurator returns a new scheduler that is created entirely by the Configurator.  Assumes Create() is implemented.
// Supports intermediate Config mutation for now if you provide modifier functions which will run after Config is created.
func NewFromConfigurator(c Configurator, modifiers ...func(c *Config)) (*Scheduler, error) {
    cfg, err := c.Create()
    if err != nil {
        return nil, err
    }
    // Mutate it if any functions were provided, changes might be required for certain types of tests (i.e. change the recorder).
    for _, modifier := range modifiers {
        modifier(cfg)
    }
    // From this point on the config is immutable to the outside.
    s := &Scheduler{
        config: cfg,
    }
    metrics.Register()
    return s, nil
}

NewFromConfigurator调用了之前传过来的configurator的Create()方法生成Schedulerconfig,之后用我们之前传过来的匿名函数func(cfg *scheduler.Config) {cfg.Recorder = recorder}cfgRecorder赋值,最后返回 Scheduler的对象。

// Create implements the interface for the Configurator, hence it is exported
// even though the struct is not.
func (sc schedulerConfigurator) Create() (*scheduler.Config, error) {
    policy, err := sc.getSchedulerPolicyConfig()
    if err != nil {
        return nil, err
    }
    // If no policy is found, create scheduler from algorithm provider.
    if policy == nil {
        if sc.Configurator != nil {
            return sc.Configurator.CreateFromProvider(sc.algorithmProvider)
        }
        return nil, fmt.Errorf("Configurator was nil")
    }

    return sc.CreateFromConfig(*policy)
}

Create()方法会检查用户是否自定义调度算法的配置文件已经相关的配置项(schedulerConfigurator中的相关项是由SchedulerServer中解析用户命令行参数构造的),如果没有定义则使用默认的algorithmProvider创建scheduler.Config

// Creates a scheduler from the name of a registered algorithm provider.
func (f *ConfigFactory) CreateFromProvider(providerName string) (*scheduler.Config, error) {
    glog.V(2).Infof("Creating scheduler from algorithm provider '%v'", providerName)
    provider, err := GetAlgorithmProvider(providerName)
    if err != nil {
        return nil, err
    }

    return f.CreateFromKeys(provider.FitPredicateKeys, provider.PriorityFunctionKeys, []algorithm.SchedulerExtender{})
}

CreateFromProvider中首先根据providerName获取providerproviderName也是通过命令行传入到schedulerConfigurator中,默认值是"DefaultProvider",然后GetAlgorithmProvider使用之前已经注册完成的algorithmProviderMap返回provider实例。实际上algorithmProviderMapplugin/cmd/kube-scheduler/app/server.goimport _ "k8s.io/kubernetes/plugin/pkg/scheduler/algorithmprovider"里就已经注册完成了,这里使用的golang的package的init()函数特性,实际最后完成注册逻辑是在plugin/pkg/scheduler/algorithmprovider/defaults/defaults.go文件中。之后分析每个调度算法时会重点关注defaults.go这个文件。回到之前的流程,得到provider后就直接调用f.CreateFromKeys返回scheduler.Config

// Creates a scheduler from a set of registered fit predicate keys and priority keys.
func (f *ConfigFactory) CreateFromKeys(predicateKeys, priorityKeys sets.String, extenders []algorithm.SchedulerExtender) (*scheduler.Config, error) {
    glog.V(2).Infof("Creating scheduler with fit predicates '%v' and priority functions '%v", predicateKeys, priorityKeys)

    if f.GetHardPodAffinitySymmetricWeight() < 1 || f.GetHardPodAffinitySymmetricWeight() > 100 {
        return nil, fmt.Errorf("invalid hardPodAffinitySymmetricWeight: %d, must be in the range 1-100", f.GetHardPodAffinitySymmetricWeight())
    }
    //用字符串set生成具体调度算法
    predicateFuncs, err := f.GetPredicates(predicateKeys)
    if err != nil {
        return nil, err
    }

    priorityConfigs, err := f.GetPriorityFunctionConfigs(priorityKeys)
    if err != nil {
        return nil, err
    }
    // priorityMetaProducer和predicateMetaProducer都是在default里注册工厂方法,然后调用GetPriorityMetadataProducer()
    // 传入f.getPluginArgs()由得到的PluginFactoryArgs生成PriorityMetadata()函数,最后生成metadata
    priorityMetaProducer, err := f.GetPriorityMetadataProducer()
    if err != nil {
        return nil, err
    }

    predicateMetaProducer, err := f.GetPredicateMetadataProducer()
    if err != nil {
        return nil, err
    }

    // Init equivalence class cache
    if f.enableEquivalenceClassCache && getEquivalencePodFunc != nil {
        f.equivalencePodCache = core.NewEquivalenceCache(getEquivalencePodFunc)
        glog.Info("Created equivalence class cache")
    }
    algo := core.NewGenericScheduler(f.schedulerCache, f.equivalencePodCache, predicateFuncs, predicateMetaProducer, priorityConfigs, priorityMetaProducer, extenders)

    podBackoff := util.CreateDefaultPodBackoff()
    return &scheduler.Config{
        SchedulerCache: f.schedulerCache,
        Ecache:         f.equivalencePodCache,
        // The scheduler only needs to consider schedulable nodes.
        NodeLister:          &nodePredicateLister{f.nodeLister},
        Algorithm:           algo,
        Binder:              f.getBinder(extenders),
        PodConditionUpdater: &podConditionUpdater{f.client},
        WaitForCacheSync: func() bool {
            return cache.WaitForCacheSync(f.StopEverything, f.scheduledPodsHasSynced)
        },
        NextPod: func() *v1.Pod {
            return f.getNextPod()
        },
        Error:          f.MakeDefaultErrorFunc(podBackoff, f.podQueue),
        StopEverything: f.StopEverything,
    }, nil
}

CreateFromKeys()首先验证了hardPodAffinitySymmetricWeight参数是否在1-100范围内,此参数用在priorityCalculateInterPodAffinityPriority算法中,用于选出在相同拓扑域下和pod亲和度最大的nodef.GetPredicates(predicateKeys)通过predicate算法的name集合和在default.go中注册的每个predicate算法的工厂算法来生成名字-算法一一对应的map[string]algorithm.FitPredicatepredicateMetaProducer是为所有调度算法提前计算一些公用的数据例如pod的资源请求量,pod使用的hostPort等等。equivalencePodCache用于优化predicate算法,其缓存了由同一个controller(如ReplicaController或ReplicaSet)复制出来的多个pod不用重复应用所有predicate算法而是使用第一个pod调度结果就可以了,目前equivalencePodCache只应用到predicate算法中。NewGenericScheduler()生成了具体用于执行调度算法的实例genericSchedulerpodBackoff用于pod调度失败后控制重新调度的间隔,每次默认间隔都会加倍直到增加到间隔最大值。scheduler.Config中其它关键字段含义如下

  • Binder用于将调度完成的podnode生一个Binding写入到kube-apiserver中,这个过程称之为Bind。
  • PodConditionUpdater把pod最新状态写会到kube-apiserver中。
  • WaitForCacheSync等待podinformerfifo缓存队列中的全部被取走,podinformer就绪。
  • NextPod是从pod的未调度队中取出一个最新的pod
  • Error是处理pod调度失败的函数,间隔podBackoff时间后会重新将pod加入到未调度队列中。

至此,调度需要的所有组件已经构建或者初始化完成,之后的sched.Run()就会进入打到核心的调度循环。

results matching ""

    No results matching ""