Focus on the target

Understand how Kubelet works

directory

  1. The main function to run
  2. Run kubelet
  3. Core data management Kubelet
  4. Synchronous cycle
  5. Handle pod synchronization
  6. conclusion

Run

Find the run function from the main function, it’s a long code, so I’ve simplified it a little bit

func run(ctx context.Context, s *options.KubeletServer, kubeDeps *kubelet.Dependencies, featureGate featuregate.FeatureGate) (err error) {
	// A long list of configuration initialization and validation

  // done channel, which is used to notify the end of the run
	done := make(chan struct{})
	
	// Register with the configz module
	err = initConfigz(&s.KubeletConfiguration)
	iferr ! =nil {
		klog.Errorf("unable to register KubeletConfiguration with configz, error: %v", err)
	}

  // Get node information
	hostName, err := nodeutil.GetHostname(s.HostnameOverride)
	iferr ! =nil {
		return err
	}
	nodeName, err := getNodeName(kubeDeps.Cloud, hostName)
	iferr ! =nil {
		return err
	}

	switch {
  // Independent running mode
	case standaloneMode:
	// Initialize the client
	case kubeDeps.KubeClient == nil, kubeDeps.EventClient == nil, kubeDeps.HeartbeatClient == nil:}// Cgroup-related initialization
	var cgroupRoots []string
	nodeAllocatableRoot := cm.NodeAllocatableRoot(s.CgroupRoot, s.CgroupsPerQOS, s.CgroupDriver)
	cgroupRoots = append(cgroupRoots, nodeAllocatableRoot)
	kubeletCgroup, err := cm.GetKubeletContainer(s.KubeletCgroups)
	iferr ! =nil {
		klog.Warningf("failed to get the kubelet's cgroup: %v. Kubelet system container metrics may be missing.", err)
	} else ifkubeletCgroup ! ="" {
		cgroupRoots = append(cgroupRoots, kubeletCgroup)
	}

	runtimeCgroup, err := cm.GetRuntimeContainer(s.ContainerRuntime, s.RuntimeCgroups)
	iferr ! =nil {
		klog.Warningf("failed to get the container runtime's cgroup: %v. Runtime system container metrics may be missing.", err)
	} else ifruntimeCgroup ! ="" {
		cgroupRoots = append(cgroupRoots, runtimeCgroup)
	}

	ifs.SystemCgroups ! ="" {
		cgroupRoots = append(cgroupRoots, s.SystemCgroups)
	}

  // The next big chunk is the initialization of ContainerManager
	if kubeDeps.ContainerManager == nil {
		if s.CgroupsPerQOS && s.CgroupRoot == "" {
			klog.Info("--cgroups-per-qos enabled, but --cgroup-root was not specified. defaulting to /")
			s.CgroupRoot = "/"
		}
    
		// CPU related information
		var reservedSystemCPUs cpuset.CPUSet

    // Instantiation of ContainerManager
		kubeDeps.ContainerManager, err = cm.NewContainerManager(
			kubeDeps.Mounter,
			kubeDeps.CAdvisorInterface,
      // Node configuration
			cm.NodeConfig{},
			s.FailSwapOn,
			devicePluginEnabled,
			kubeDeps.Recorder)

		iferr ! =nil {
			return err
		}
	}

	// Memory is OOM related
	oomAdjuster := kubeDeps.OOMAdjuster
	if err := oomAdjuster.ApplyOOMScoreAdj(0.int(s.OOMScoreAdj)); err ! =nil {
		klog.Warning(err)
	}

	// Preinitialize the Runtime
	err = kubelet.PreInitRuntimeService(&s.KubeletConfiguration,
		kubeDeps, &s.ContainerRuntimeOptions,
		s.ContainerRuntime,
		s.RuntimeCgroups,
		s.RemoteRuntimeEndpoint,
		s.RemoteImageEndpoint,
		s.NonMasqueradeCIDR)
	iferr ! =nil {
		return err
	}

  / / run Kubelet
	iferr := RunKubelet(s, kubeDeps, s.RunOnce); err ! =nil {
		return err
	}

	// Notify deamon's systemd
	go daemon.SdNotify(false."READY=1")

  / / blocking
	select {
	case <-done:
		break
	case <-ctx.Done():
		break
	}

	return nil
}
Copy the code

RunKubelet

func RunKubelet(kubeServer *options.KubeletServer, kubeDeps *kubelet.Dependencies, runOnce bool) error {
	// Obtain node information
  hostname, err := nodeutil.GetHostname(kubeServer.HostnameOverride)
	iferr ! =nil {
		return err
	}
	nodeName, err := getNodeName(kubeDeps.Cloud, hostname)
	iferr ! =nil {
		return err
	}
	hostnameOverridden := len(kubeServer.HostnameOverride) > 0

  // Create and initialize kubelet
	k, err := createAndInitKubelet()
	iferr ! =nil {
		return fmt.Errorf("failed to create kubelet: %v", err)
	}

	if runOnce {
		if_, err := k.RunOnce(podCfg.Updates()); err ! =nil {
			return fmt.Errorf("runonce failed: %v", err)
		}
		klog.Info("Started kubelet as runonce")}else {
    / / start kubelet
		startKubelet(k, podCfg, &kubeServer.KubeletConfiguration, kubeDeps, kubeServer.EnableCAdvisorJSONEndpoints, kubeServer.EnableServer)
		klog.Info("Started kubelet")}return nil
}

// Start running, all concurrent
func startKubelet(k kubelet.Bootstrap, podCfg *config.PodConfig, kubeCfg *kubeletconfiginternal.KubeletConfiguration, kubeDeps *kubelet.Dependencies, enableCAdvisorJSONEndpoints, enableServer bool) {
	/ / run
	go k.Run(podCfg.Updates())

	// Start kubelet HTTP server
	if enableServer {
		go k.ListenAndServe(net.ParseIP(kubeCfg.Address), uint(kubeCfg.Port), kubeDeps.TLSOptions, kubeDeps.Auth,
			enableCAdvisorJSONEndpoints, kubeCfg.EnableDebuggingHandlers, kubeCfg.EnableContentionProfiling, kubeCfg.EnableSystemLogHandler)

	}
  // Read-only port
	if kubeCfg.ReadOnlyPort > 0 {
		go k.ListenAndServeReadOnly(net.ParseIP(kubeCfg.Address), uint(kubeCfg.ReadOnlyPort), enableCAdvisorJSONEndpoints)
	}
	if utilfeature.DefaultFeatureGate.Enabled(features.KubeletPodResources) {
		go k.ListenAndServePodResources()
	}
}
Copy the code

Kubelet

// where k is an interface definition, we need to go back
type Bootstrap interface {
	GetConfiguration() kubeletconfiginternal.KubeletConfiguration
	BirthCry()
	StartGarbageCollection()
	ListenAndServe(address net.IP, port uint, tlsOptions *server.TLSOptions, auth server.AuthInterface, enableCAdvisorJSONEndpoints, enableDebuggingHandlers, enableContentionProfiling, enableSystemLogHandler bool)
	ListenAndServeReadOnly(address net.IP, port uint, enableCAdvisorJSONEndpoints bool)
	ListenAndServePodResources()
	Run(<-chan kubetypes.PodUpdate)
	RunOnce(<-chan kubetypes.PodUpdate) ([]RunPodResult, error)
}

// View the corresponding instantiation function
func createAndInitKubelet(a) (k kubelet.Bootstrap, err error) {
	k, err = kubelet.NewMainKubelet()
	return k, nil
}

func NewMainKubelet(a) (*Kubelet, error) {
	// Initialization of parameters
	
  // The instantiation structure of klet
	klet := &Kubelet{}

  // The following is the padding of the various parameters in klet
	return klet, nil
}

func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) {
	// Initialization of the internal module
	iferr := kl.initializeModules(); err ! =nil {
		kl.recorder.Eventf(kl.nodeRef, v1.EventTypeWarning, events.KubeletSetupFailed, err.Error())
		klog.Fatal(err)
	}

	go kl.volumeManager.Run(kl.sourcesReady, wait.NeverStop)

	ifkl.kubeClient ! =nil {
    // Synchronize node status with kube-apiserver
		go wait.Until(kl.syncNodeStatus, kl.nodeStatusUpdateFrequency, wait.NeverStop)
		go kl.fastStatusUpdateOnce()
		go kl.nodeLeaseController.Run(wait.NeverStop)
	}
	go wait.Until(kl.updateRuntimeUp, 5*time.Second, wait.NeverStop)

	if kl.makeIPTablesUtilChains {
		kl.initNetworkUtil()
	}

  // A goroutine for a kill pod
	go wait.Until(kl.podKiller.PerformPodKillingWork, 1*time.Second, wait.NeverStop)

	kl.statusManager.Start()
	kl.probeManager.Start()

	ifkl.runtimeClassManager ! =nil {
		kl.runtimeClassManager.Start(wait.NeverStop)
	}

	kl.pleg.Start()
  // Synchronize the main logic
	kl.syncLoop(updates, kl)
}
Copy the code

syncLoop

func (kl *Kubelet) syncLoop(updates <-chan kubetypes.PodUpdate, handler SyncHandler) {
  // Start kubelet's main synchronization loop
	klog.Info("Starting kubelet main sync loop.")
  
  // Ticker once per second
	syncTicker := time.NewTicker(time.Second)
	defer syncTicker.Stop()
  // housekeeping cycle
	housekeepingTicker := time.NewTicker(housekeepingPeriod)
	defer housekeepingTicker.Stop()
	
	for {
		kl.syncLoopMonitor.Store(kl.clock.Now())
    / / synchronize
		if! kl.syncLoopIteration(updates, handler, syncTicker.C, housekeepingTicker.C, plegCh) {break
		}
		kl.syncLoopMonitor.Store(kl.clock.Now())
	}
}

// The three channels are important: configCh for configuration, syncCh for synchronization, and housekeepingCh for cleanup
func (kl *Kubelet) syncLoopIteration(configCh <-chan kubetypes.PodUpdate, handler SyncHandler,
	syncCh <-chan time.Time, housekeepingCh <-chan time.Time, plegCh <-chan *pleg.PodLifecycleEvent) bool {
	select {
	case u, open := <-configCh:
    // config channel is disabled
		if! open { klog.Errorf("Update channel is closed. Exiting the sync loop.")
			return false
		}
		// Corresponding to different operations
		switch u.Op {
		case kubetypes.ADD:
			klog.V(2).Infof("SyncLoop (ADD, %q): %q", u.Source, format.Pods(u.Pods))
			handler.HandlePodAdditions(u.Pods)
		case kubetypes.UPDATE:
			klog.V(2).Infof("SyncLoop (UPDATE, %q): %q", u.Source, format.PodsWithDeletionTimestamps(u.Pods))
			handler.HandlePodUpdates(u.Pods)
		case kubetypes.REMOVE:
			klog.V(2).Infof("SyncLoop (REMOVE, %q): %q", u.Source, format.Pods(u.Pods))
			handler.HandlePodRemoves(u.Pods)
		case kubetypes.RECONCILE:
			klog.V(4).Infof("SyncLoop (RECONCILE, %q): %q", u.Source, format.Pods(u.Pods))
			handler.HandlePodReconcile(u.Pods)
		case kubetypes.DELETE:
			klog.V(2).Infof("SyncLoop (DELETE, %q): %q", u.Source, format.Pods(u.Pods))
			handler.HandlePodUpdates(u.Pods)
		case kubetypes.SET:
			klog.Errorf("Kubelet does not support snapshot update")
		default:
			klog.Errorf("Invalid event type received: %d.", u.Op)
		}

		kl.sourcesReady.AddSource(u.Source)

	case e := <-plegCh:
		
	case <-syncCh:
		// Get the pod to be synchronized
    // Here we receive the nginx pod to be created in the example
		podsToSync := kl.getPodsToSync()
		if len(podsToSync) == 0 {
			break
		}
		klog.V(4).Infof("SyncLoop (SYNC): %d pods; %s".len(podsToSync), format.Pods(podsToSync))
    // Start processing
		handler.HandlePodSyncs(podsToSync)
	case update := <-kl.livenessManager.Updates():

	case <-housekeepingCh:
		if! kl.sourcesReady.AllReady() {// Clear not ready, skip directly
			klog.V(4).Infof("SyncLoop (housekeeping, skipped): sources aren't ready yet.")}else {
      // Start cleaning up pod
			klog.V(4).Infof("SyncLoop (housekeeping)")
			iferr := handler.HandlePodCleanups(); err ! =nil {
				klog.Errorf("Failed cleaning pods: %v", err)
			}
		}
	}
	return true
}
Copy the code

handler

Looking ahead to the code, handler is Kubelet

func (kl *Kubelet) HandlePodSyncs(pods []*v1.Pod) {
	start := kl.clock.Now()
	for _, pod := range pods {
    // Get the pod and distribute it
		mirrorPod, _ := kl.podManager.GetMirrorPodByPod(pod)
		kl.dispatchWork(pod, kubetypes.SyncPodSync, mirrorPod, start)
	}
}

func (kl *Kubelet) dispatchWork(pod *v1.Pod, syncType kubetypes.SyncPodType, mirrorPod *v1.Pod, start time.Time) {
	// Call UpdatePod
	kl.podWorkers.UpdatePod(&UpdatePodOptions{
		Pod:        pod,
		MirrorPod:  mirrorPod,
		UpdateType: syncType,
		OnCompleteFunc: func(err error) {
			iferr ! =nil {
				metrics.PodWorkerDuration.WithLabelValues(syncType.String()).Observe(metrics.SinceInSeconds(start))
			}
		},
	})
}

Klet. PodWorkers = newPodWorkers(klet. SyncPod, kubedeps. Recorder, klet. WorkQueue, klet. ResyncInterval, backOffPeriod, klet.podCache)
func (p *podWorkers) UpdatePod(options *UpdatePodOptions) {
	pod := options.Pod
	uid := pod.UID
	var podUpdates chan UpdatePodOptions
	var exists bool

	p.podLock.Lock()
	defer p.podLock.Unlock()
  // If pod does not exist, it is a new pod
	ifpodUpdates, exists = p.podUpdates[uid]; ! exists { podUpdates =make(chan UpdatePodOptions, 1)
		p.podUpdates[uid] = podUpdates

    // Concurrent processing
		go func(a) {
			defer runtime.HandleCrash()
			p.managePodLoop(podUpdates)
		}()
	}
	if! p.isWorking[pod.UID] { p.isWorking[pod.UID] =true
		podUpdates <- *options
	} else {
		update, found := p.lastUndeliveredWorkUpdate[pod.UID]
		if! found || update.UpdateType ! = kubetypes.SyncPodKill { p.lastUndeliveredWorkUpdate[pod.UID] = *options } } }func (p *podWorkers) managePodLoop(podUpdates <-chan UpdatePodOptions) {
	var lastSyncTime time.Time
	for update := range podUpdates {
		err := func(a) error {
      // Synchronize pod functions
			err = p.syncPodFn(syncPodOptions{
				mirrorPod:      update.MirrorPod,
				pod:            update.Pod,
				podStatus:      status,
				killPodOptions: update.KillPodOptions,
				updateType:     update.UpdateType,
			})
			lastSyncTime = time.Now()
			return err
		}()
		
		p.wrapUp(update.Pod.UID, err)
	}
}

// Find the function where syncPodFn is instantiated
func (kl *Kubelet) syncPod(o syncPodOptions) error {
  
  // There is a long list of logic, which is not easy to read, so we will only focus on the core part
  
  // Call the Container Runtime to create the pod
	result := kl.containerRuntime.SyncPod(pod, podStatus, pullSecrets, kl.backOff)
	kl.reasonCache.Update(pod.UID, result)
	iferr := result.Error(); err ! =nil {
		for _, r := range result.SyncResults {
			ifr.Error ! = kubecontainer.ErrCrashLoopBackOff && r.Error ! = images.ErrImagePullBackOff {return err
			}
		}
		return nil
	}
	return nil
}
Copy the code

Summary

  1. Kubelet is the manager of the Kubernetes Node

  2. Kubelet receives pod messages from kube-Apiserver and triggers the synchronization function with Ticker periodically

  3. Kubelet manages the Container asynchronously by calling the Container Runtime Interface.

Making: github.com/Junedayday/…

Blog: junes.tech/

Public id: Golangcoding