kubelet源码分析

作者:李昂 邮箱:[email protected]

kubelet代码有些函数比较长,为了便于阅读在下面的本文只列出核心代码,会着重分析几个比较重要的manager以及各个manager和主循环syncloop之前的关系,希望能捋出一个kubelet大致的脉络,知其然也要知其所以然。(本文基于v1.9.0-alpha.0.572+9636522137039,其他版本会略有出入)

主启动流程

其实启动流程方面,kuberntes家族组件基本都是大同小异,基本都遵循以下路程:1.构建配置结构体;2.载入默认配置;3.解析加载命令行配置到配置结构体;4.通过配置结构体构建实际核心结构体;5.开始运行主循环。
kubelet这里多了一个ExperimentalDockershim,这是为以后全面使用CRI做测试用的,我们暂时不用管它。

func main() {
    s := options.NewKubeletServer()
    s.AddFlags(pflag.CommandLine)

    flag.InitFlags()
    logs.InitLogs()
    defer logs.FlushLogs()

    verflag.PrintAndExitIfRequested()

    if s.ExperimentalDockershim {
        if err := app.RunDockershim(&s.KubeletConfiguration, &s.ContainerRuntimeOptions); err != nil {
            fmt.Fprintf(os.Stderr, "error: %v\n", err)
            os.Exit(1)
        }
    }

    if err := app.Run(s, nil); err != nil {
        fmt.Fprintf(os.Stderr, "error: %v\n", err)
        os.Exit(1)
    }
}

之后就会来到app.Run函数,参数是我们上面构建的options.KubeletServer配置结构体和kubelet.Dependencies,这里第二个参数传入的是nil,因此之后会构造它。这个app.Run只是个外壳,内部实际调用了run函数。

// Run runs the specified KubeletServer with the given Dependencies.  This should never exit.
// The kubeDeps argument may be nil - if so, it is initialized from the settings on KubeletServer.
// Otherwise, the caller is assumed to have set up the Dependencies object and a default one will
// not be generated.
func Run(s *options.KubeletServer, kubeDeps *kubelet.Dependencies) error {
    if err := run(s, kubeDeps); err != nil {
        return fmt.Errorf("failed to run Kubelet: %v", err)
    }
    return nil
}

func run(s *options.KubeletServer, kubeDeps *kubelet.Dependencies) (err error) {

    standaloneMode := true
    ......
    // Set feature gates based on the value in KubeletConfiguration
    err = utilfeature.DefaultFeatureGate.Set(s.KubeletConfiguration.FeatureGates)
    if err != nil {
        return err
    }
    // Register current configuration with /configz endpoint
    cfgz, cfgzErr := initConfigz(&s.KubeletConfiguration)
    if utilfeature.DefaultFeatureGate.Enabled(features.DynamicKubeletConfig) {
    ...... 
    // if in standalone mode, indicate as much by setting all clients to nil
    if standaloneMode {
        kubeDeps.KubeClient = nil
        kubeDeps.ExternalKubeClient = nil
        kubeDeps.EventClient = nil
        glog.Warningf("standalone mode, no API client")
    } else if kubeDeps.KubeClient == nil || kubeDeps.ExternalKubeClient == nil || kubeDeps.EventClient == nil {
        // initialize clients if not standalone mode and any of the clients are not provided
        var kubeClient clientset.Interface
        var eventClient v1core.EventsGetter
        var externalKubeClient clientgoclientset.Interface

        clientConfig, err := CreateAPIServerClientConfig(s)

        var clientCertificateManager certificate.Manager
        if err == nil {
            ......
        } else {
            ......
        }
        kubeDeps.KubeClient = kubeClient
        kubeDeps.ExternalKubeClient = externalKubeClient
        if eventClient != nil {
            kubeDeps.EventClient = eventClient
        }
    }

    if kubeDeps.Auth == nil {
        auth, err := BuildAuth(nodeName, kubeDeps.ExternalKubeClient, s.KubeletConfiguration)
        if err != nil {
            return err
        }
        kubeDeps.Auth = auth
    }

    if kubeDeps.CAdvisorInterface == nil {
        kubeDeps.CAdvisorInterface, err = cadvisor.New(s.Address, uint(s.CAdvisorPort), s.ContainerRuntime, s.RootDirectory)
        if err != nil {
            return err
        }
    }

    // Setup event recorder if required.
    makeEventRecorder(&s.KubeletConfiguration, kubeDeps, nodeName)

    if kubeDeps.ContainerManager == nil {
        if s.CgroupsPerQOS && s.CgroupRoot == "" {
            glog.Infof("--cgroups-per-qos enabled, but --cgroup-root was not specified.  defaulting to /")
            s.CgroupRoot = "/"
        }
            ......
        if err != nil {
            return err
        }
    }

    if err := checkPermissions(); err != nil {
        glog.Error(err)
    }

    utilruntime.ReallyCrash = s.ReallyCrashForTesting

    rand.Seed(time.Now().UTC().UnixNano())

    // TODO(vmarmol): Do this through container config.
    oomAdjuster := kubeDeps.OOMAdjuster
    if err := oomAdjuster.ApplyOOMScoreAdj(0, int(s.OOMScoreAdj)); err != nil {
        glog.Warning(err)
    }

    if err := RunKubelet(&s.KubeletFlags, &s.KubeletConfiguration, kubeDeps, s.RunOnce); err != nil {
        return err
    }
    ......
    if s.RunOnce {
        return nil
    }

    <-done
    return nil
}

run函数的主要目的就是为kubelet.Dependencies做初始化,但是初始化之前需要通过命令行是否传过来KubeConfig判断kubelet运行在standalone模式还是标准集群模式,如果是是合法的KubeConfig,那么就是非standalone模式,如果是standalone模式kubelet.Dependencies中很多client可以直接置为nil。kubelet.Dependencies实际上是一类kubelet运行时需要外部依赖的抽象,它包含了各类Client(dockerclient、kubeclient等)以及网络插件和数据卷插件等,这么做还有另一个好处是可以通过参数传进来一些Dependencies来帮助测试。最后如果是非standalone模式,那么将构建正式的Dependencies里的各类对象。这里特别说明的是Dependencies.ContainerManager是管理kubelet进程docker进程以及相关cgroup和oom_adj_score设置的,它和kubelet管理的pod上运行的container是没有关系的。之后在RunKubelet中就开始创建真正的kubelet核心结构体以及运行主循环。

func RunKubelet(kubeFlags *options.KubeletFlags, kubeCfg *componentconfig.KubeletConfiguration, kubeDeps *kubelet.Dependencies, runOnce bool) error {
    hostname := nodeutil.GetHostname(kubeFlags.HostnameOverride)
    // Query the cloud provider for our node name, default to hostname if kcfg.Cloud == nil
    nodeName, err := getNodeName(kubeDeps.Cloud, hostname)
    if err != nil {
        return err
    }
    // Setup event recorder if required.
    makeEventRecorder(kubeCfg, kubeDeps, nodeName)

    hostNetworkSources, err := kubetypes.GetValidatedSources(kubeCfg.HostNetworkSources)
    if err != nil {
        return err
    }

    hostPIDSources, err := kubetypes.GetValidatedSources(kubeCfg.HostPIDSources)
    if err != nil {
        return err
    }

    hostIPCSources, err := kubetypes.GetValidatedSources(kubeCfg.HostIPCSources)
    if err != nil {
        return err
    }

    privilegedSources := capabilities.PrivilegedSources{
        HostNetworkSources: hostNetworkSources,
        HostPIDSources:     hostPIDSources,
        HostIPCSources:     hostIPCSources,
    }
    capabilities.Setup(kubeCfg.AllowPrivileged, privilegedSources, 0)

    credentialprovider.SetPreferredDockercfgPath(kubeCfg.RootDirectory)
    glog.V(2).Infof("Using root directory: %v", kubeCfg.RootDirectory)

    builder := kubeDeps.Builder
    if builder == nil {
        builder = CreateAndInitKubelet
    }
    if kubeDeps.OSInterface == nil {
        kubeDeps.OSInterface = kubecontainer.RealOS{}
    }
    k, err := builder(kubeCfg, kubeDeps, &kubeFlags.ContainerRuntimeOptions, kubeFlags.HostnameOverride, kubeFlags.NodeIP, kubeFlags.ProviderID)
    if err != nil {
        return fmt.Errorf("failed to create kubelet: %v", err)
    }

    // NewMainKubelet should have set up a pod source config if one didn't exist
    // when the builder was run. This is just a precaution.
    if kubeDeps.PodConfig == nil {
        return fmt.Errorf("failed to create kubelet, pod source config was nil")
    }
    podCfg := kubeDeps.PodConfig

    rlimit.RlimitNumFiles(uint64(kubeCfg.MaxOpenFiles))

    // process pods and exit.
    if runOnce {
        if _, err := k.RunOnce(podCfg.Updates()); err != nil {
            return fmt.Errorf("runonce failed: %v", err)
        }
        glog.Infof("Started kubelet %s as runonce", version.Get().String())
    } else {
        startKubelet(k, podCfg, kubeCfg, kubeDeps)
        glog.Infof("Started kubelet %s", version.Get().String())
    }
    return nil
}

func CreateAndInitKubelet(kubeCfg *componentconfig.KubeletConfiguration, kubeDeps *kubelet.Dependencies, crOptions *options.ContainerRuntimeOptions, hostnameOverride, nodeIP, providerID string) (k kubelet.Bootstrap, err error) {
    // TODO: block until all sources have delivered at least one update to the channel, or break the sync loop
    // up into "per source" synchronizations

    k, err = kubelet.NewMainKubelet(kubeCfg, kubeDeps, crOptions, hostnameOverride, nodeIP, providerID)
    if err != nil {
        return nil, err
    }

    k.BirthCry()

    k.StartGarbageCollection()

    return k, nil
}

func startKubelet(k kubelet.Bootstrap, podCfg *config.PodConfig, kubeCfg *componentconfig.KubeletConfiguration, kubeDeps *kubelet.Dependencies) {
    // start the kubelet
    go wait.Until(func() { k.Run(podCfg.Updates()) }, 0, wait.NeverStop)

    // start the kubelet server
    if kubeCfg.EnableServer {
        go wait.Until(func() {
            k.ListenAndServe(net.ParseIP(kubeCfg.Address), uint(kubeCfg.Port), kubeDeps.TLSOptions, kubeDeps.Auth, kubeCfg.EnableDebuggingHandlers, kubeCfg.EnableContentionProfiling)
        }, 0, wait.NeverStop)
    }
    if kubeCfg.ReadOnlyPort > 0 {
        go wait.Until(func() {
            k.ListenAndServeReadOnly(net.ParseIP(kubeCfg.Address), uint(kubeCfg.ReadOnlyPort))
        }, 0, wait.NeverStop)
    }
}

RunKubelet根据穿过来的配置设置capabilities,如果kubeDeps中没有外部注入builder,那么会用默认的CreateAndInitKubelet函数创建kubelet核心对象。CreateAndInitKubelet调用kubelet.NewMainKubelet创建kubelet对象k,k.BirthCry()调用eventrecorder写入一条"kubelet start"事件,而StartGarbageCollection()会启动containerGC和imagemanager的GarbageCollect()方法开启容器和镜像的垃圾回收。startKubelet会正式启动kubelet,并且如果配置了EnableServer和ReadOnlyPort还会监听端口开启Api。这里k.Run(podCfg.Updates())传入的podCfg.Updates()返回一个channel,会收到了apiserver、file、url中的pod更新事件。到这里其实主启动流程就算执行完了,前面启动流程的部分逻辑比较简单,本文只是大致分析到。而后面的NewMainKubelet和k.Run(podCfg.Updates())才包含了kubelet核心逻辑。

results matching ""

    No results matching ""