kubelet组件

## kubelet 每个节点上都运行一个kubelet服务进程,默认监听10250端口。 ●接收并执行master发来的指令; ●管理Pod及Pod中的容器; ●每个kubelet进程会在API Server上注册节点自身信息,定期向master节点汇报节点的资源使用情况,并通过cAdvisor监控节点和容器的资源。 ### kubelet架构 ![image.png](https://cos.easydoc.net/97954506/files/l18xo5b7.png) ### kubelet 管理pod的核心流程 ![image.png](https://cos.easydoc.net/97954506/files/l18xp25m.png) ### 节点管理 节点管理主要是节点自注册和节点状态更新: ●kubelet 可以通过设置启动参数`--register-node`来确定是否向API Server注册自己; ●如果kubelet没有选择自注册模式,则需要用户自己配置Node资源信息,同时需要告知kubelet集群上的API Server的位置; ●kubelet 在启动时通过API Server 注册节点信息,并定时向 API Server 发送节点新消息,API Server在接收到新消息后,将信息写入etcd。 ### pod管理 获取Pod清单: ●文件:启动参数--config指定的配置目录下的文件(默认/etc/Kubernetes/manifests/)。 该文件每20秒重新检查一次(可配置) ●HTTP endpoint (URL) :启动参数--manifest-url设置。每20秒检查- 次这个端点(可配置)。 ●API Server: 通过API Server监听etcd目录,同步Pod清单。 ●HTTP Server: kubelet侦听HTTP请求,并响应简单的API以提交新的Pod清单。 ### pod启动流程 ![image.png](https://cos.easydoc.net/97954506/files/l18y91oa.png) ### kubelet启动pod的流程 ![image.png](https://cos.easydoc.net/97954506/files/l18ya8es.png) 监控 ![image.png](https://cos.easydoc.net/97954506/files/l1q40qm1.png) ## kubelet 代码走读 ```golang server.go:NewKubeletCommand()-->> kubeletDeps, err := UnsecuredDependencies(kubeletServer, utilfeature.DefaultFeatureGate)-->> plugins, err := ProbeVolumePlugins(featureGate) server.go:Run(kubeletServer, kubeletDeps, utilfeature.DefaultFeatureGate, stopCh)-->> run(s, kubeDeps, featureGate, stopCh)-->> kubeDeps.ContainerManager, err = cm.NewContainerManager() // init runtime service(CRI), -container-runtime=remote --runtime-request-timeout=15m --container-runtime-endpoint=unix:///var/containerd/containerd.sock kubelet.PreInitRuntimeService()-->> remote.NewRemoteRuntimeService(remoteRuntimeEndpoint, kubeCfg.RuntimeRequestTimeout.Duration) RunKubelet(s, kubeDeps, s.RunOnce)-->> createAndInitKubelet()-->> kubelet.NewMainKubelet()-->> makePodSourceConfig(kubeCfg, kubeDeps, nodeName, bootstrapCheckpointPath)-->> updatechannel = cfg.Channel(kubetypes.ApiserverSource) klet := &Kubelet{} //*******init volume plugins runtime, err := kuberuntime.NewKubeGenericRuntimeManager() NewInitializedVolumePluginMgr()-->> kvh.volumePluginMgr.InitPlugins(plugins, prober, kvh)-->> startKubelet()-->> k.Run(podCfg.Updates())-->> //*******run volume manager, and reconcile function would attach volume for attachable plugin and mount volume go kl.volumeManager.Run(kl.sourcesReady, wait.NeverStop)-->> vm.desiredStateOfWorldPopulator.Run(sourcesReady, stopCh)-->> populatorLoop-->> dswp.findAndAddNewPods()-->> dswp.processPodVolumes(pod, mountedVolumesForPod, processedVolumesForFSResize)-->> mounts, devices := util.GetPodVolumeNames(pod) dswp.createVolumeSpec(podVolume, pod.Name, pod.Namespace, mounts, devices) dswp.desiredStateOfWorld.AddPodToVolume(uniquePodName, pod, volumeSpec, podVolume.Name, volumeGidValue)-->> dsw.volumesToMount[volumeName] = volumeToMount{} vm.reconciler.Run(stopCh)-->> reconciliationLoopFunc() -->> // reconcile every 100 ms mountAttachVolumes-->> rc.desiredStateOfWorld.GetVolumesToMount() rc.operationExecutor.AttachVolume()-->> // attachable plugin, e.g. CSI plugin operationGenerator.GenerateAttachVolumeFunc(volumeToAttach, actualStateOfWorld).Run() rc.operationExecutor.MountVolume()-->> // volume need to mount, like ceph, configmap, emptyDir oe.operationGenerator.GenerateMapVolumeFunc().Run()-->> volumePlugin, err := og.volumePluginMgr.FindPluginBySpec(volumeToMount.VolumeSpec) volumePlugin.NewMounter() volumeMounter.SetUp() kl.syncLoop(updates, kl)-->> kl.syncLoopIteration(updates, handler, syncTicker.C, housekeepingTicker.C, plegCh)-->> //*******handle pod creation event handler.HandlePodAdditions(u.Pods)-->> kl.podManager.AddPod(pod) kl.canAdmitPod(activePods, pod) // check admit, if admit check fail, it will error out kl.dispatchWork(pod, kubetypes.SyncPodCreate, mirrorPod, start)-->> kl.podWorkers.UpdatePod()-->> p.managePodLoop(podUpdates)-->> p.syncPodFn()-->> kubelet.go:syncPod()-->> runnable := kl.canRunPod(pod) // check soft admin, pod will be pending if check fails kl.runtimeState.networkErrors() // check network plugin status kl.containerManager.UpdateQOSCgroups() kl.makePodDataDirs(pod) kl.volumeManager.WaitForAttachAndMount(pod) (kubeGenericRuntimeManager)kl.containerRuntime.SyncPod()-->> m.computePodActions(pod, podStatus) // create sandbox container? m.createPodSandbox(pod, podContainerChanges.Attempt)-->> m.osInterface.MkdirAll(podSandboxConfig.LogDirectory, 0755) //*******calling CRI m.runtimeService.RunPodSandbox(podSandboxConfig, runtimeHandler)-->>// k8s.io/cri-api/pkg/apis/runtime/v1alpha2/api.pb.go c.cc.Invoke(ctx, "/runtime.v1alpha2.RuntimeService/RunPodSandbox") // call remote runtime service which is served by containerd start("init container", containerStartSpec(container))-->> startContainer() start("container", containerStartSpec(&pod.Spec.Containers[idx]))-->> startContainer()-->> m.imagePuller.EnsureImageExists(pod, container, pullSecrets, podSandboxConfig)-->> c.cc.Invoke(ctx, "/runtime.v1alpha2.ImageService/PullImage" m.runtimeService.CreateContainer(podSandboxID, containerConfig, podSandboxConfig)-->> c.cc.Invoke(ctx, "/runtime.v1alpha2.RuntimeService/CreateContainer") m.internalLifecycle.PreStartContainer(pod, container, containerID) // set cpu set m.runtimeService.StartContainer(containerID)-->> c.cc.Invoke(ctx, "/runtime.v1alpha2.RuntimeService/StartContainer") containerd server.go:service.Register(grpcServer)-->> runtime.RegisterRuntimeServiceServer(s, instrumented)-->> s.RegisterService(&_RuntimeService_serviceDesc, srv)-->> Handler: _RuntimeService_RunPodSandbox_Handler, // api.pb.go srv.(RuntimeServiceServer).RunPodSandbox(ctx, in)-->> //"/runtime.v1alpha2.RuntimeService/RunPodSandbox" RunPodSandbox()-->> // pkg/server/sandbox_run.go sandboxstore.NewSandbox() c.ensureImageExists() c.getSandboxRuntime(config, r.GetRuntimeHandler()) netns.NewNetNS() c.setupPodNetwork(ctx, &sandbox)-->> c.netPlugin.Setup(ctx, id, path, opts...)-->> network.Attach(ctx, ns)-->> n.cni.AddNetworkList(ctx, n.config, ns.config(n.ifName))-->> c.addNetwork(ctx, list.Name, list.CNIVersion, net, result, rt)-->>// for each network plugin c.exec.FindInPath(net.Network.Type, c.Path) buildOneConfig(name, cniVersion, net, prevResult, rt) invoke.ExecPluginWithResult(ctx, pluginPath, newConf.Bytes, c.args("ADD", rt), c.exec) c.client.NewContainer(ctx, id, opts...) c.os.MkdirAll(sandboxRootDir, 0755) c.os.MkdirAll(volatileSandboxRootDir, 0755) c.setupSandboxFiles(id, config) container.NewTask(ctx, containerdio.NullIO, taskOpts...) task.Start(ctx)-->> c.client.TaskService().Create(ctx, request)-->> s.local.Create(ctx, r)-->> l.getRuntime(container.Runtime.Name) rtime.Create(ctx, r.ContainerID, opts)-->> b := shimBinary(ctx, bundle, opts.Runtime, m.containerdAddress, m.containerdTTRPCAddress, m.events, m.tasks) b.Start() shim.Create(ctx, opts)-->> c.client.Call(ctx, "containerd.task.v2.Task", "Create", req, &resp) runtime.RegisterImageServiceServer(s, instrumented) ```