
kubernetes cloud controller manager

All code in this article is based on 1.16.0-alpha.2, commit bdde11a664,
so the documentation referenced is for version 1.15.0.

1 What is the cloud controller manager (ccm)

Before talking about the ccm, it helps to know its predecessor: the cloud provider.
The cloud provider was proposed to make it easier to run k8s in public cloud environments;
for example, on AWS, Azure and similar platforms it lets k8s resources be matched to the cloud vendor's resources.
For the detailed evolution path, read this article.

2 What can you do with the ccm

When thinking about what the ccm can do, first ask: where does the core value of kubernetes lie?
Cloud vendors are essentially selling compute resources and services, while the value of k8s lies in managing and scheduling containers,
just as k8s describes itself: Production-Grade Container Scheduling and Management.
k8s cares most about scheduling and managing containers; all other resources exist to serve them.
So which resources are replaceable from the k8s point of view?
Load balancers, routes, and hosts.
k8s does not care whether a host actually sits in Tokyo or Seattle, nor how a load balancer is implemented;
it only needs the kubelet on the host to run normally and the exposed services to be reachable through the load balancer.
These are exactly the things cloud vendors care about most: host configuration, host location, how the load balancer is implemented, how routes are reached.
With that in mind, take a look at the ccm interface:
LoadBalancer() (LoadBalancer, bool)
Instances() (Instances, bool)
Zones() (Zones, bool)
Clusters() (Clusters, bool)
Routes() (Routes, bool)
ProviderName() string
HasClusterID() bool
From this it is not hard to guess what cloud vendors can do with the ccm:
whenever a user creates a resource through k8s, the corresponding changes can be applied to the instances running in the cloud at the same time.

2.1 Existing ccm implementations

ccm implementations currently fall into two types: In Tree and Out of Tree.

2.1.1 In Tree

In Tree refers to the cloud providers whose code lives inside the kubernetes code base.
In Tree can be seen as the compromise k8s made to accommodate cloud vendors before the ccm concept was proposed.
Now that k8s has grown dominant it is the cloud vendors' turn to adapt to it, so k8s is working on removing the In Tree ccm code.
The removal plan for the In Tree ccm can be found here.
The In Tree ccm code still remaining in k8s can be found here.
Representative In Tree providers:
  1. AWS
  2. GCE
  3. OpenStack
  4. Azure
  5. vSphere

2.1.2 Out of Tree

Out of Tree refers to providers whose code is not in the kubernetes code base (they missed the bus, or are community solutions).
Before long, however, there will be only one kind of ccm: the Out of Tree Cloud Controller Manager.
Representative Out of Tree implementations:
  1. DigitalOcean
  2. Oracle Cloud Infrastructure
  3. Rancher

3 How to implement a ccm

Implementing a simple ccm is quite straightforward:
you only need to implement all of the interfaces in this file, as sketched below.
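A rough sketch of such a provider, assuming the k8s.io/cloud-provider package at roughly its 1.15/1.16 shape, might look like the following. The package name mycloud, the provider name, and the file path are purely illustrative, and every capability is simply reported as unsupported; a real provider would return its own implementations instead of (nil, false). Registering through init() is what later allows the ccm to look the provider up by name (see step 4 below).
//mycloud/cloud.go (illustrative, not part of the k8s source tree)
package mycloud

import (
        "io"

        cloudprovider "k8s.io/cloud-provider"
)

const providerName = "mycloud"

type cloud struct{}

func init() {
        // Register a factory so the ccm can later look this provider up by name.
        cloudprovider.RegisterCloudProvider(providerName, func(config io.Reader) (cloudprovider.Interface, error) {
                return &cloud{}, nil
        })
}

// Initialize is called by the ccm after leader election succeeds.
func (c *cloud) Initialize(clientBuilder cloudprovider.ControllerClientBuilder, stop <-chan struct{}) {}

// Returning (nil, false) tells the ccm that a capability is not supported,
// so the corresponding controller simply skips its work.
func (c *cloud) LoadBalancer() (cloudprovider.LoadBalancer, bool) { return nil, false }
func (c *cloud) Instances() (cloudprovider.Instances, bool)       { return nil, false }
func (c *cloud) Zones() (cloudprovider.Zones, bool)               { return nil, false }
func (c *cloud) Clusters() (cloudprovider.Clusters, bool)         { return nil, false }
func (c *cloud) Routes() (cloudprovider.Routes, bool)             { return nil, false }

func (c *cloud) ProviderName() string { return providerName }
func (c *cloud) HasClusterID() bool   { return true }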

4 The secrets behind the ccm

4.1 How an Out of Tree ccm works

1. When the kubelet starts, it checks whether the startup arguments contain "--cloud-provider=external"
//staging/src/k8s.io/cloud-provider/plugins.go
func IsExternal(name string) bool {
        return name == externalCloudProvider
}
2. When the kubelet builds its Node object in initialNode, it decides whether to taint the node; with an external cloud provider it adds the node.cloudprovider.kubernetes.io/uninitialized taint (TaintExternalCloudProvider), which the ccm removes once it has initialized the node
//pkg/kubelet/kubelet_node_status.go
func (kl *Kubelet) initialNode() (*v1.Node, error) {
        node := &v1.Node{
                ObjectMeta: metav1.ObjectMeta{
                        Name: string(kl.nodeName),
                        Labels: map[string]string{
                                v1.LabelHostname:      kl.hostname,
                                v1.LabelOSStable:      goruntime.GOOS,
                                v1.LabelArchStable:    goruntime.GOARCH,
                                kubeletapis.LabelOS:   goruntime.GOOS,
                                kubeletapis.LabelArch: goruntime.GOARCH,
                        },
                },
                Spec: v1.NodeSpec{
                        Unschedulable: !kl.registerSchedulable,
                },
        }
        nodeTaints := make([]v1.Taint, 0)
        if len(kl.registerWithTaints) > 0 {
                taints := make([]v1.Taint, len(kl.registerWithTaints))
                for i := range kl.registerWithTaints {
                        if err := k8s_api_v1.Convert_core_Taint_To_v1_Taint(&kl.registerWithTaints[i], &taints[i], nil); err != nil {
                                return nil, err
                        }
                }
                nodeTaints = append(nodeTaints, taints...)
        }

        unschedulableTaint := v1.Taint{
                Key:    schedulerapi.TaintNodeUnschedulable,
                Effect: v1.TaintEffectNoSchedule,
        }

        // If TaintNodesByCondition enabled, taint node with TaintNodeUnschedulable when initializing
        // node to avoid race condition; refer to #63897 for more detail.
        if utilfeature.DefaultFeatureGate.Enabled(features.TaintNodesByCondition) {
                if node.Spec.Unschedulable &&
                        !taintutil.TaintExists(nodeTaints, &unschedulableTaint) {
                        nodeTaints = append(nodeTaints, unschedulableTaint)
                }
        }

        if kl.externalCloudProvider {
                taint := v1.Taint{
                        Key:    schedulerapi.TaintExternalCloudProvider,
                        Value:  "true",
                        Effect: v1.TaintEffectNoSchedule,
                }

                nodeTaints = append(nodeTaints, taint)
        }
        if len(nodeTaints) > 0 {
                node.Spec.Taints = nodeTaints
        }
        // Initially, set NodeNetworkUnavailable to true.
        if kl.providerRequiresNetworkingConfiguration() {
                node.Status.Conditions = append(node.Status.Conditions, v1.NodeCondition{
                        Type:               v1.NodeNetworkUnavailable,
                        Status:             v1.ConditionTrue,
                        Reason:             "NoRouteCreated",
                        Message:            "Node created without a route",
                        LastTransitionTime: metav1.NewTime(kl.clock.Now()),
                })
        }

        if kl.enableControllerAttachDetach {
                if node.Annotations == nil {
                        node.Annotations = make(map[string]string)
                }

                klog.Infof("Setting node annotation to enable volume controller attach/detach")
                node.Annotations[volutil.ControllerManagedAttachAnnotation] = "true"
        } else {
                klog.Infof("Controller attach/detach is disabled for this node; Kubelet will attach and detach volumes")
        }

        if kl.keepTerminatedPodVolumes {
                if node.Annotations == nil {
                        node.Annotations = make(map[string]string)
                }
                klog.Infof("Setting node annotation to keep pod volumes of terminated pods attached to the node")
                node.Annotations[volutil.KeepTerminatedPodVolumesAnnotation] = "true"
        }

        // @question: should this be place after the call to the cloud provider? which also applies labels
        for k, v := range kl.nodeLabels {
                if cv, found := node.ObjectMeta.Labels[k]; found {
                        klog.Warningf("the node label %s=%s will overwrite default setting %s", k, v, cv)
                }
                node.ObjectMeta.Labels[k] = v
        }

        if kl.providerID != "" {
                node.Spec.ProviderID = kl.providerID
        }

        if kl.cloud != nil {
                instances, ok := kl.cloud.Instances()
                if !ok {
                        return nil, fmt.Errorf("failed to get instances from cloud provider")
                }

                // TODO: We can't assume that the node has credentials to talk to the
                // cloudprovider from arbitrary nodes. At most, we should talk to a
                // local metadata server here.
                var err error
                if node.Spec.ProviderID == "" {
                        node.Spec.ProviderID, err = cloudprovider.GetInstanceProviderID(context.TODO(), kl.cloud, kl.nodeName)
                        if err != nil {
                                return nil, err
                        }
                }

                instanceType, err := instances.InstanceType(context.TODO(), kl.nodeName)
                if err != nil {
                        return nil, err
                }
                if instanceType != "" {
                        klog.Infof("Adding node label from cloud provider: %s=%s", v1.LabelInstanceType, instanceType)
                        node.ObjectMeta.Labels[v1.LabelInstanceType] = instanceType
                }
                // If the cloud has zone information, label the node with the zone information
                zones, ok := kl.cloud.Zones()
                if ok {
                        zone, err := zones.GetZone(context.TODO())
                        if err != nil {
                                return nil, fmt.Errorf("failed to get zone from cloud provider: %v", err)
                        }
                        if zone.FailureDomain != "" {
                                klog.Infof("Adding node label from cloud provider: %s=%s", v1.LabelZoneFailureDomain, zone.FailureDomain)
                                node.ObjectMeta.Labels[v1.LabelZoneFailureDomain] = zone.FailureDomain
                        }
                        if zone.Region != "" {
                                klog.Infof("Adding node label from cloud provider: %s=%s", v1.LabelZoneRegion, zone.Region)
                                node.ObjectMeta.Labels[v1.LabelZoneRegion] = zone.Region
                        }
                }
        }

        kl.setNodeStatus(node)

        return node, nil
}
3. When the ccm pod starts, its main function builds the cobra command and executes it (an Out of Tree provider wires up its own binary in almost the same way; see the sketch after this step)
//cmd/cloud-controller-manager/controller-manager.go
func main() {
        rand.Seed(time.Now().UnixNano())

        command := app.NewCloudControllerManagerCommand()

        // TODO: once we switch everything over to Cobra commands, we can go back to calling
        // utilflag.InitFlags() (by removing its pflag.Parse() call). For now, we have to set the
        // normalize func and add the go flag set by hand.
        // utilflag.InitFlags()
        logs.InitLogs()
        defer logs.FlushLogs()

        if err := command.Execute(); err != nil {
                os.Exit(1)
        }
}

//cmd/cloud-controller-manager/app/controllermanager.go
func NewCloudControllerManagerCommand() *cobra.Command {
        s, err := options.NewCloudControllerManagerOptions()
        if err != nil {
                klog.Fatalf("unable to initialize command options: %v", err)
        }

        cmd := &cobra.Command{
                Use: "cloud-controller-manager",
                Long: `The Cloud controller manager is a daemon that embeds
the cloud specific control loops shipped with Kubernetes.`,
                Run: func(cmd *cobra.Command, args []string) {
                        verflag.PrintAndExitIfRequested()
                        utilflag.PrintFlags(cmd.Flags())

                        c, err := s.Config(KnownControllers(), ControllersDisabledByDefault.List())
                        if err != nil {
                                fmt.Fprintf(os.Stderr, "%v\n", err)
                                os.Exit(1)
                        }

                        if err := Run(c.Complete(), wait.NeverStop); err != nil {
                                fmt.Fprintf(os.Stderr, "%v\n", err)
                                os.Exit(1)
                        }

                },
        }

        fs := cmd.Flags()
        namedFlagSets := s.Flags(KnownControllers(), ControllersDisabledByDefault.List())
        verflag.AddFlags(namedFlagSets.FlagSet("global"))
        globalflag.AddGlobalFlags(namedFlagSets.FlagSet("global"), cmd.Name())

        if flag.CommandLine.Lookup("cloud-provider-gce-lb-src-cidrs") != nil {
                // hoist this flag from the global flagset to preserve the commandline until
                // the gce cloudprovider is removed.
                globalflag.Register(namedFlagSets.FlagSet("generic"), "cloud-provider-gce-lb-src-cidrs")
        }
        for _, f := range namedFlagSets.FlagSets {
                fs.AddFlagSet(f)
        }
        usageFmt := "Usage:\n  %s\n"
        cols, _, _ := term.TerminalSize(cmd.OutOrStdout())
        cmd.SetUsageFunc(func(cmd *cobra.Command) error {
                fmt.Fprintf(cmd.OutOrStderr(), usageFmt, cmd.UseLine())
                cliflag.PrintSections(cmd.OutOrStderr(), namedFlagSets, cols)
                return nil
        })
        cmd.SetHelpFunc(func(cmd *cobra.Command, args []string) {
                fmt.Fprintf(cmd.OutOrStdout(), "%s\n\n"+usageFmt, cmd.Long, cmd.UseLine())
                cliflag.PrintSections(cmd.OutOrStdout(), namedFlagSets, cols)
        })

        return cmd
}
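This main function is also what an Out of Tree provider reuses: a vendor's binary typically looks almost the same, except that it blank-imports the provider package so that the init() registration from section 3 runs. The import paths below are only illustrative of that pattern (providers of this era vendored k8s.io/kubernetes); they are not taken from any real provider.
//main.go of a hypothetical Out of Tree provider
package main

import (
        "math/rand"
        "os"
        "time"

        "k8s.io/component-base/logs"
        "k8s.io/kubernetes/cmd/cloud-controller-manager/app"

        // Blank import for its side effect only: registering the "mycloud" provider.
        _ "example.com/mycloud"
)

func main() {
        rand.Seed(time.Now().UnixNano())

        command := app.NewCloudControllerManagerCommand()

        logs.InitLogs()
        defer logs.FlushLogs()

        if err := command.Execute(); err != nil {
                os.Exit(1)
        }
}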
4. When the ccm executes Run, it first initializes the cloud provider; this mainly determines whether the provider is In Tree or Out of Tree and creates a cloudprovider instance
//staging/src/k8s.io/cloud-provider/plugins.go
func InitCloudProvider(name string, configFilePath string) (Interface, error) {
        var cloud Interface
        var err error

        if name == "" {
                klog.Info("No cloud provider specified.")
                return nil, nil
        }

        if IsExternal(name) {
                klog.Info("External cloud provider specified")
                return nil, nil
        }

        for _, provider := range deprecatedCloudProviders {
                if provider.name == name {
                        detail := provider.detail
                        if provider.external {
                                detail = fmt.Sprintf("Please use 'external' cloud provider for %s: %s", name, provider.detail)
                        }
                        klog.Warningf("WARNING: %s built-in cloud provider is now deprecated. %s", name, detail)

                        break
                }
        }

        if configFilePath != "" {
                var config *os.File
                config, err = os.Open(configFilePath)
                if err != nil {
                        klog.Fatalf("Couldn't open cloud provider configuration %s: %#v",
                                configFilePath, err)
                }

                defer config.Close()
                cloud, err = GetCloudProvider(name, config)
        } else {
                // Pass explicit nil so plugins can actually check for nil. See
                // "Why is my nil error value not equal to nil?" in golang.org/doc/faq.
                cloud, err = GetCloudProvider(name, nil)
        }

        if err != nil {
                return nil, fmt.Errorf("could not init cloud provider %q: %v", name, err)
        }
        if cloud == nil {
                return nil, fmt.Errorf("unknown cloud provider %q", name)
        }

        return cloud, nil
}
5. After initialization the ccm starts an HTTP server listening on port 10258. Because k8s fixes the port the ccm listens on, two ccm processes cannot run side by side on the same host. The ccm instances then run leader election; the election guarantees that a resource is not processed by more than one ccm. Once a leader is elected, it enters the ccm control loops.
if c.SecureServing != nil {
        unsecuredMux := genericcontrollermanager.NewBaseHandler(&c.ComponentConfig.Generic.Debugging, checks...)
        handler := genericcontrollermanager.BuildHandlerChain(unsecuredMux, &c.Authorization, &c.Authentication)
        // TODO: handle stoppedCh returned by c.SecureServing.Serve
        if _, err := c.SecureServing.Serve(handler, 0, stopCh); err != nil {
                return err
        }
}
if c.InsecureServing != nil {
        unsecuredMux := genericcontrollermanager.NewBaseHandler(&c.ComponentConfig.Generic.Debugging, checks...)
        insecureSuperuserAuthn := server.AuthenticationInfo{Authenticator: &server.InsecureSuperuser{}}
        handler := genericcontrollermanager.BuildHandlerChain(unsecuredMux, nil, &insecureSuperuserAuthn)
        if err := c.InsecureServing.Serve(handler, 0, stopCh); err != nil {
                return err
        }
}

run := func(ctx context.Context) {
        if err := startControllers(c, ctx.Done(), cloud, newControllerInitializers()); err != nil {
                klog.Fatalf("error running controllers: %v", err)
        }
}

if !c.ComponentConfig.Generic.LeaderElection.LeaderElect {
        run(context.TODO())
        panic("unreachable")
}

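// rl is the leader-election resource lock built earlier in Run (via resourcelock.New);
// the ccm instances compete to hold it so that only one of them runs the controllers.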
leaderelection.RunOrDie(context.TODO(), leaderelection.LeaderElectionConfig{
        Lock:          rl,
        LeaseDuration: c.ComponentConfig.Generic.LeaderElection.LeaseDuration.Duration,
        RenewDeadline: c.ComponentConfig.Generic.LeaderElection.RenewDeadline.Duration,
        RetryPeriod:   c.ComponentConfig.Generic.LeaderElection.RetryPeriod.Duration,
        Callbacks: leaderelection.LeaderCallbacks{
                OnStartedLeading: run,
                OnStoppedLeading: func() {
                        klog.Fatalf("leaderelection lost")
                },
        },
        WatchDog: electionChecker,
        Name:     "cloud-controller-manager",
})
6. In startControllers an informer is set on the cloud instance to watch events from the APIServer, and the controller for each resource is started
func startControllers(c *cloudcontrollerconfig.CompletedConfig, stopCh <-chan struct{}, cloud cloudprovider.Interface, controllers map[string]initFunc) error {
        // Initialize the cloud provider with a reference to the clientBuilder
        cloud.Initialize(c.ClientBuilder, stopCh)
        // Set the informer on the user cloud object
        if informerUserCloud, ok := cloud.(cloudprovider.InformerUser); ok {
                informerUserCloud.SetInformers(c.SharedInformers)
        }

        for controllerName, initFn := range controllers {
                if !genericcontrollermanager.IsControllerEnabled(controllerName, ControllersDisabledByDefault, c.ComponentConfig.Generic.Controllers) {
                        klog.Warningf("%q is disabled", controllerName)
                        continue
                }

                klog.V(1).Infof("Starting %q", controllerName)
                _, started, err := initFn(c, cloud, stopCh)
                if err != nil {
                        klog.Errorf("Error starting %q", controllerName)
                        return err
                }
                if !started {
                        klog.Warningf("Skipping %q", controllerName)
                        continue
                }
                klog.Infof("Started %q", controllerName)

                time.Sleep(wait.Jitter(c.ComponentConfig.Generic.ControllerStartInterval.Duration, ControllerStartJitter))
        }

        // If apiserver is not running we should wait for some time and fail only then. This is particularly
        // important when we start apiserver and controller manager at the same time.
        if err := genericcontrollermanager.WaitForAPIServer(c.VersionedClient, 10*time.Second); err != nil {
                klog.Fatalf("Failed to wait for apiserver being healthy: %v", err)
        }

        c.SharedInformers.Start(stopCh)

        select {}
}

func newControllerInitializers() map[string]initFunc {
        controllers := map[string]initFunc{}
        controllers["cloud-node"] = startCloudNodeController
        controllers["cloud-node-lifecycle"] = startCloudNodeLifecycleController
        controllers["service"] = startServiceController
        controllers["route"] = startRouteController
        return controllers
}

func startServiceController(ctx *cloudcontrollerconfig.CompletedConfig, cloud cloudprovider.Interface, stopCh <-chan struct{}) (http.Handler, bool, error) {
        // Start the service controller
        serviceController, err := servicecontroller.New(
                cloud,
                ctx.ClientBuilder.ClientOrDie("service-controller"),
                ctx.SharedInformers.Core().V1().Services(),
                ctx.SharedInformers.Core().V1().Nodes(),
                ctx.ComponentConfig.KubeCloudShared.ClusterName,
        )
        if err != nil {
                // This error shouldn't fail. It lives like this as a legacy.
                klog.Errorf("Failed to start service controller: %v", err)
                return nil, false, nil
        }

        go serviceController.Run(stopCh, int(ctx.ComponentConfig.ServiceController.ConcurrentServiceSyncs))

        return nil, true, nil
}
7. Execution then enters the k8s service controller, which asks the cloud provider for its LoadBalancer implementation (a provider-side sketch of that piece follows the snippet below)
func (s *ServiceController) init() error {
        if s.cloud == nil {
                return fmt.Errorf("WARNING: no cloud provider provided, services of type LoadBalancer will fail")
        }

        balancer, ok := s.cloud.LoadBalancer()
        if !ok {
                return fmt.Errorf("the cloud provider does not support external load balancers")
        }
        s.balancer = balancer

        return nil
}
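To close the loop, here is a rough sketch of the provider-side LoadBalancer implementation that would end up in s.balancer, again assuming the k8s.io/cloud-provider interface around 1.15/1.16. The mycloud package, the loadBalancers type, and the example address are illustrative; a real provider would call its own cloud API inside these methods.
//mycloud/loadbalancers.go (illustrative)
package mycloud

import (
        "context"

        v1 "k8s.io/api/core/v1"
)

type loadBalancers struct{}

// GetLoadBalancer reports whether a balancer already exists for this Service.
func (lb *loadBalancers) GetLoadBalancer(ctx context.Context, clusterName string, service *v1.Service) (*v1.LoadBalancerStatus, bool, error) {
        return nil, false, nil
}

// GetLoadBalancerName returns the cloud-side name used for this Service's balancer.
func (lb *loadBalancers) GetLoadBalancerName(ctx context.Context, clusterName string, service *v1.Service) string {
        return "a" + string(service.UID)
}

// EnsureLoadBalancer creates or updates the cloud load balancer so that it forwards to the
// given nodes, and returns the address that ends up in the Service's status.loadBalancer field.
func (lb *loadBalancers) EnsureLoadBalancer(ctx context.Context, clusterName string, service *v1.Service, nodes []*v1.Node) (*v1.LoadBalancerStatus, error) {
        return &v1.LoadBalancerStatus{Ingress: []v1.LoadBalancerIngress{{IP: "203.0.113.10"}}}, nil
}

// UpdateLoadBalancer refreshes the backend node list of an existing balancer.
func (lb *loadBalancers) UpdateLoadBalancer(ctx context.Context, clusterName string, service *v1.Service, nodes []*v1.Node) error {
        return nil
}

// EnsureLoadBalancerDeleted tears the balancer down when the Service is deleted.
func (lb *loadBalancers) EnsureLoadBalancerDeleted(ctx context.Context, clusterName string, service *v1.Service) error {
        return nil
}
With something like this in place, the cloud type's LoadBalancer() method from the sketch in section 3 would return &loadBalancers{}, true instead of (nil, false), and the service controller would start managing Services of type LoadBalancer through it.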
Author: Byron wang
Created: 2019-08-15 Thu 17:08
