kubelet cadvisor

作者 张杰  [email protected]

cAdvisor是谷歌开源的一个容器监控工具,该工具提供了webUI和REST API两种方式来展示数据,从而可以帮助管理者了解主机以及容器的资源使用情况和性能数据。

cAdvisor集成到了kubelet组件内,因此可以在kube集群中每个启动了kubelet的节点使用cAdvisor来查看该节点的运行数据

源码分析

cmd/kubelet/app/server.go run() 方法里,有对cadivisor 的初始化

    if kubeDeps.CAdvisorInterface == nil {
        imageFsInfoProvider := cadvisor.NewImageFsInfoProvider(s.ContainerRuntime, s.RemoteRuntimeEndpoint)
        // RootDirectory  : Directory path for managing kubelet files (volume mounts,etc). (default "/var/lib/kubelet")
        kubeDeps.CAdvisorInterface, err = cadvisor.New(s.Address, uint(s.CAdvisorPort), imageFsInfoProvider, s.RootDirectory, cadvisor.UsingLegacyCadvisorStats(s.ContainerRuntime, s.RemoteRuntimeEndpoint))
        if err != nil {
            return err
        }
    }

cadvisor.New 方法是在 pkg/kubelet/cadvisor/cadvisor_linux.go (如果是linux 环境, 在这里 k8s 用了 go语言的build 约束 方式,在文件的开头有:// +build cgo,linux 方式。 支持不同平台的build )

func New(address string, port uint, imageFsInfoProvider ImageFsInfoProvider, rootPath string, usingLegacyStats bool) (Interface, error) {
    sysFs := sysfs.NewRealSysFs()

...
    // Create and start the cAdvisor container manager.
    m, err := manager.New(memory.New(statsCacheDuration, nil), sysFs, maxHousekeepingInterval, allowDynamicHousekeeping, ignoreMetrics, http.DefaultClient)
    if err != nil {
        return nil, err
    }

...

    err = cadvisorClient.exportHTTP(address, port)
    if err != nil {
        return nil, err
    }
    return cadvisorClient, nil
}

New 方法主要有两个核心方法,manager.New 和 cadvisorClient.exportHTTP ,分别是创建cadvisor 的manager 和 httpserver

manager.New () 方法在 vendor/github.com/google/cadvisor/manager/manager.go 定义 主要是生成一个manage 对象。

    newManager := &manager{
        containers:               make(map[namespacedContainerName]*containerData),
        quitChannels:             make([]chan error, 0, 2),
        memoryCache:              memoryCache,
        fsInfo:                   fsInfo,
        cadvisorContainer:        selfContainer,
        inHostNamespace:          inHostNamespace,
        startupTime:              time.Now(),
        maxHousekeepingInterval:  maxHousekeepingInterval,
        allowDynamicHousekeeping: allowDynamicHousekeeping,
        ignoreMetrics:            ignoreMetricsSet,
        containerWatchers:        []watcher.ContainerWatcher{},
        eventsChannel:            eventsChannel,
        collectorHttpClient:      collectorHttpClient,
        nvidiaManager:            &accelerators.NvidiaManager{},
    }

其中containers 是本机container 的缓存,起数据来源是manger 的start 方法去更新的,后续我们会讲。 cadvisorClient.exportHTTP() 方法是起httpserver ,里面的核心方法是 cadvisorhttp.RegisterHandlers() 用来注册不同的url 和function

func (cc *cadvisorClient) exportHTTP(address string, port uint) error {
    // Register the handlers regardless as this registers the prometheus
    // collector properly.
    mux := http.NewServeMux()
        // 核心方法
    err := cadvisorhttp.RegisterHandlers(mux, cc, "", "", "", "")
    if err != nil {
        return err
    }

    cadvisorhttp.RegisterPrometheusHandler(mux, cc, "/metrics", containerLabels)

    // Only start the http server if port > 0
    if port > 0 {
        serv := &http.Server{
            Addr:    net.JoinHostPort(address, strconv.Itoa(int(port))),
            Handler: mux,
        }

        // TODO(vmarmol): Remove this when the cAdvisor port is once again free.
        // If export failed, retry in the background until we are able to bind.
        // This allows an existing cAdvisor to be killed before this one registers.
        go func() {
            defer runtime.HandleCrash()

            err := serv.ListenAndServe()
            for err != nil {
                glog.Infof("Failed to register cAdvisor on port %d, retrying. Error: %v", port, err)
                time.Sleep(time.Minute)
                err = serv.ListenAndServe()
            }
        }()
    }

    return nil
}

vendor/github.com/google/cadvisor/http/handlers.go cadvisorhttp.RegisterHandlers()

func RegisterHandlers(mux httpmux.Mux, containerManager manager.Manager, httpAuthFile, httpAuthRealm, httpDigestFile, httpDigestRealm string) error {
    // Basic health handler.
    // 注册healthz handler
    if err := healthz.RegisterHandler(mux); err != nil {
        return fmt.Errorf("failed to register healthz handler: %s", err)
    }

    // Validation/Debug handler.
    // 注册 /validate/ url 的方法
    mux.HandleFunc(validate.ValidatePage, func(w http.ResponseWriter, r *http.Request) {
        err := validate.HandleRequest(w, containerManager)
        if err != nil {
            http.Error(w, err.Error(), http.StatusInternalServerError)
        }
    })

    // Register API handler.
    // 注册API的方法  /api/ 
    if err := api.RegisterHandlers(mux, containerManager); err != nil {
        return fmt.Errorf("failed to register API handlers: %s", err)
    }

    // Redirect / to containers page.
    mux.Handle("/", http.RedirectHandler(pages.ContainersPage, http.StatusTemporaryRedirect))

    var authenticated bool

...
    return nil
}

以 api.RegisterHandlers() 方法为例 /vendor/github.com/google/cadvisor/api/handler.go 最终 调用 handleRequest() 方法

func RegisterHandlers(mux httpmux.Mux, m manager.Manager) error {
    apiVersions := getApiVersions()
    // 支持不同的version
    supportedApiVersions := make(map[string]ApiVersion, len(apiVersions))
    for _, v := range apiVersions {
        supportedApiVersions[v.Version()] = v
    }

    mux.HandleFunc(apiResource, func(w http.ResponseWriter, r *http.Request) {
        err := handleRequest(supportedApiVersions, m, w, r)
        if err != nil {
            http.Error(w, err.Error(), 500)
        }
    })
    return nil
}

handlerRequest() 方法

func handleRequest(supportedApiVersions map[string]ApiVersion, m manager.Manager, w http.ResponseWriter, r *http.Request) error {
    start := time.Now()
...

    const apiPrefix = "/api"
    if !strings.HasPrefix(request, apiPrefix) {
        return fmt.Errorf("incomplete API request %q", request)
    }
...

    version := requestElements[apiVersion]
    requestType := requestElements[apiRequestType]
    requestArgs := strings.Split(requestElements[apiRequestArgs], "/")

...
    return versionHandler.HandleRequest(requestType, requestArgs, m, w, r)

}

最终调用 versionHandler.HandleRequest() 我们以version1_1 和 containers types 为例

vendor/github.com/google/cadvisor/api/versions.go

func (self *version1_1) HandleRequest(requestType string, request []string, m manager.Manager, w http.ResponseWriter, r *http.Request) error {
    switch requestType {
    case subcontainersApi:
        containerName := getContainerName(request)
        glog.V(4).Infof("Api - Subcontainers(%s)", containerName)

        // Get the query request.
        query, err := getContainerInfoRequest(r.Body)
        if err != nil {
            return err
        }

        // Get the subcontainers.
        containers, err := m.SubcontainersInfo(containerName, query)
        if err != nil {
            return fmt.Errorf("failed to get subcontainers for container %q with error: %s", containerName, err)
        }

        // Only output the containers as JSON.
        err = writeResult(containers, w)
        if err != nil {
            return err
        }
        return nil
    default:
        return self.baseVersion.HandleRequest(requestType, request, m, w, r)
    }
}

如果RequestType 是container, 转到 default 的 self.baseVersion.HandleRequest 方法

func (self *version1_0) HandleRequest(requestType string, request []string, m manager.Manager, w http.ResponseWriter, r *http.Request) error {
    switch requestType {
...
    case containersApi:
        containerName := getContainerName(request)
        glog.V(4).Infof("Api - Container(%s)", containerName)

        // Get the query request.
        query, err := getContainerInfoRequest(r.Body)
        if err != nil {
            return err
        }

        // Get the container.
        cont, err := m.GetContainerInfo(containerName, query)
        if err != nil {
            return fmt.Errorf("failed to get container %q with error: %s", containerName, err)
        }

        // Only output the container as JSON.
        err = writeResult(cont, w)
        if err != nil {
            return err
        }
...
    }
    return nil
}

最终会调用 m.GetContainerInfo() 方法,我们一步不追: vendor/github.com/google/cadvisor/manager/manager.go

func (self *manager) GetContainerInfo(containerName string, query *info.ContainerInfoRequest) (*info.ContainerInfo, error) {
    cont, err := self.getContainerData(containerName)
    if err != nil {
        return nil, err
    }
    return self.containerDataToContainerInfo(cont, query)
}

核心方法: getContainerData() 这个方法的核心逻辑就是从 containers map 里拿到对应container 的数据,返回,然后让http w 写会,response 给client 端。 这样就实现了 api: curl http://kubeletip:4194/api/v1.1/containers 的api 请求的逻辑

func (self *manager) getContainerData(containerName string) (*containerData, error) {
    var cont *containerData
    var ok bool
    func() {
        self.containersLock.RLock()
        defer self.containersLock.RUnlock()

        // Ensure we have the container.
        cont, ok = self.containers[namespacedContainerName{
            Name: containerName,
        }]
    }()
    if !ok {
        return nil, fmt.Errorf("unknown container %q", containerName)
    }
    return cont, nil
}

那么上文也说了 ,containers 数据是哪里来的

代码追踪: pkg/kubelet/kubelet.go NewMainKubelet() 方法,构造kubelet 对象时候 ,会将 kubeDeps.CAdvisorInterface 赋值给Kubelet 的 cadvisor 变量

klet := &Kubelet{
        hostname:        hostname,
...
        cadvisor:                       kubeDeps.CAdvisorInterface,
...
        keepTerminatedPodVolumes:                keepTerminatedPodVolumes,
    }

在 最终的kubelet的 Run 方法里会调用 updateRuntimeUp() -> initializeRuntimeDependentModules() -> kl.cadvisor.Start()

// Run starts the kubelet reacting to config updates
func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) {
    if kl.logServer == nil {
        kl.logServer = http.StripPrefix("/logs/", http.FileServer(http.Dir("/var/log/")))
    }
    if kl.kubeClient == nil {
        glog.Warning("No api server defined - no node status update will be sent.")
    }

...
    go wait.Until(kl.updateRuntimeUp, 5*time.Second, wait.NeverStop)
...
    kl.pleg.Start()
    kl.syncLoop(updates, kl)
}

我们追踪 kl.cadvisor.Start() 方法 (如果是linux 环境 pkg/kubelet/cadvisor/cadvisor_linux.go)

func (cc *cadvisorClient) Start() error {
    return cc.Manager.Start()
}

继续追 cc.Manager.Start() 又到了manager vendor/github.com/google/cadvisor/manager/manager.go


// Start the container manager.
func (self *manager) Start() error {
    err := docker.Register(self, self.fsInfo, self.ignoreMetrics)
    if err != nil {
        glog.Warningf("Docker container factory registration failed: %v.", err)
    }
...
    err = containerd.Register(self, self.fsInfo, self.ignoreMetrics)
    if err != nil {
        glog.Warningf("Registration of the containerd container factory failed: %v", err)
    }
...
    go self.globalHousekeeping(quitGlobalHousekeeping)

    return nil
}

函数中几个Register 方法,会将对应的Factory 注册到 全局变量 factories 中,k8s 在一些注册方法中,经常喜欢使用全局变量,这些风格可以在scheduler ,controller-manager 中经常看到,并且会加上类似 的sync.RWMutex锁

追踪核心方法: self.globalHousekeeping() 这个一个协程,会调用核心方法 detectSubcontainers()

func (self *manager) globalHousekeeping(quit chan error) {
    // Long housekeeping is either 100ms or half of the housekeeping interval.
...
    ticker := time.Tick(*globalHousekeepingInterval)
    for {
        select {
        case t := <-ticker:
            start := time.Now()

            // Check for new containers.
            err := self.detectSubcontainers("/")
            if err != nil {
                glog.Errorf("Failed to detect containers: %s", err)
            }

            // Log if housekeeping took too long.
            duration := time.Since(start)
            if duration >= longHousekeeping {
                glog.V(3).Infof("Global Housekeeping(%d) took %s", t.Unix(), duration)
            }
...        }
    }
}

detectSubcontainers(),此方法主要调用两个方法 createContainer和destroyContainer



```// Detect the existing subcontainers and reflect the setup here.
func (m *manager) detectSubcontainers(containerName string) error {
    added, removed, err := m.getContainersDiff(containerName)
    if err != nil {
        return err
    }

    // Add the new containers.
    for _, cont := range added {
        err = m.createContainer(cont.Name, watcher.Raw)
        if err != nil {
            glog.Errorf("Failed to create existing container: %s: %s", cont.Name, err)
        }
    }

    // Remove the old containers.
    for _, cont := range removed {
        err = m.destroyContainer(cont.Name)
        if err != nil {
            glog.Errorf("Failed to destroy existing container: %s: %s", cont.Name, err)
        }
    }

    return nil
}

以createContainer 为例 调用了 createContainerLocked() 最终处理了add 和 destroy container 而 containers 的更新 是在 watchForNewContainers 实现的

cadvisor 支持web 访问,

http://IP:4194/

results matching ""

    No results matching ""