In K8s, once a Pod has been scheduled to a Node, its subsequent status is maintained by the Kubelet on that machine. The design difficulty is reporting the local running state back to the Apiserver in real time. This article examines the core data structures behind Pod status through two flows, sensing status changes and probing status changes, to understand the internal design.

1. Status management

1.1 Static Pod

Static Pods are Pods that are not created through the Apiserver. The Kubelet does not learn about them by watching the Apiserver, and the Apiserver has no record of them, yet their status still has to be maintained and queried. K8s therefore introduces mirror Pods. A mirror Pod carries the same main information as its static Pod and is created in the Apiserver, so the status of the real static Pod becomes visible through the mirror Pod.
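To make the distinction concrete, here is a minimal sketch of how a static Pod and its mirror can be told apart by annotations. It assumes simplified stand-in types rather than the real v1.Pod; the annotation keys are the ones the Kubelet uses (kubernetes.io/config.source and kubernetes.io/config.mirror), while the helper names isStaticPod and isMirrorPod here are illustrative.

```go
package main

import "fmt"

// Annotation keys the kubelet uses for a Pod's config source and for mirror Pods.
const (
	configSourceAnnotationKey = "kubernetes.io/config.source"
	configMirrorAnnotationKey = "kubernetes.io/config.mirror"
	apiserverSource           = "api"
)

// pod is a simplified stand-in for v1.Pod; only annotations matter here.
type pod struct {
	Name        string
	Annotations map[string]string
}

// isStaticPod reports whether the Pod came from a source other than the
// Apiserver (for example a manifest file or an HTTP URL).
func isStaticPod(p pod) bool {
	src, ok := p.Annotations[configSourceAnnotationKey]
	return ok && src != apiserverSource
}

// isMirrorPod reports whether the Pod is the Apiserver-side mirror of a static Pod.
func isMirrorPod(p pod) bool {
	_, ok := p.Annotations[configMirrorAnnotationKey]
	return ok
}

func main() {
	static := pod{Name: "etcd-node1", Annotations: map[string]string{configSourceAnnotationKey: "file"}}
	mirror := pod{Name: "etcd-node1", Annotations: map[string]string{
		configSourceAnnotationKey: "file",
		configMirrorAnnotationKey: "hash",
	}}
	fmt.Println(isStaticPod(static), isMirrorPod(static)) // true false
	fmt.Println(isMirrorPod(mirror))                      // true
}
```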

1.2 Status data sources

StatusManager is the key component for status synchronization. It has to combine the status of the Pod as it is currently running with the data stored in the Apiserver to decide the final state transition. Here we focus on the main flow shown in the diagram first; more states will be introduced later.

2. Version consistency

    type versionedPodStatus struct {
        status v1.PodStatus
        // Monotonically increasing version number (per pod).
        version uint64
        // Pod name & namespace, for sending updates to API server.
        podName      string
        podNamespace string
    }

To stay in sync with the Apiserver, the Kubelet keeps a versioned status for each Pod locally. Besides the current Pod status data, it stores a version number, and comparing these monotonically increasing version numbers decides whether the status still needs to be synchronized.
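The versioning idea can be sketched in a few lines. This is a minimal illustration, assuming a hypothetical statusCache type whose two maps play the roles of podStatuses and apiStatusVersions, with the status payload reduced to a phase string; set, needsSync and markSynced are illustrative names, not the Kubelet's API.

```go
package main

import "fmt"

// versionedStatus is a simplified stand-in for versionedPodStatus.
type versionedStatus struct {
	phase   string
	version uint64
}

// statusCache keeps the latest local status per Pod UID and the version last
// sent to the Apiserver (the roles of podStatuses and apiStatusVersions).
type statusCache struct {
	local   map[string]versionedStatus
	apiSent map[string]uint64
}

func newStatusCache() *statusCache {
	return &statusCache{local: map[string]versionedStatus{}, apiSent: map[string]uint64{}}
}

// set stores a new status and bumps the Pod's version by one.
// A missing entry yields the zero value, so the first version is 1.
func (c *statusCache) set(uid, phase string) versionedStatus {
	s := versionedStatus{phase: phase, version: c.local[uid].version + 1}
	c.local[uid] = s
	return s
}

// needsSync reports whether the local version is newer than the last one sent.
func (c *statusCache) needsSync(uid string) bool {
	sent, ok := c.apiSent[uid]
	return !ok || sent < c.local[uid].version
}

// markSynced records that the current local version reached the Apiserver.
func (c *statusCache) markSynced(uid string) {
	c.apiSent[uid] = c.local[uid].version
}

func main() {
	c := newStatusCache()
	c.set("uid-1", "Pending")
	fmt.Println(c.needsSync("uid-1")) // true
	c.markSynced("uid-1")
	fmt.Println(c.needsSync("uid-1")) // false
	c.set("uid-1", "Running")
	fmt.Println(c.local["uid-1"].version, c.needsSync("uid-1")) // 2 true
}
```

Because versions only grow, the syncer never needs to diff full statuses to know whether something new is pending.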

3. Core source code implementation

The statusManager flow is quite complicated. Here we cover only one scenario: the Kubelet senses a Pod update through the Apiserver, and we follow that data flow through statusManager.

3.1 Core data structures

The core status-related data structures in manager fall into two categories: mapping data (podManager, podStatuses, apiStatusVersions) and the communication channel (podStatusChannel). The remaining fields are kubeClient, used to talk to the Apiserver, and podDeletionSafety, used to check whether a Pod can be safely deleted.

    type manager struct {
        kubeClient clientset.Interface
        podManager kubepod.Manager
        // Map from pod UID to the versioned status of the corresponding pod.
        podStatuses      map[types.UID]versionedPodStatus
        podStatusesLock  sync.RWMutex
        podStatusChannel chan podStatusSyncRequest
        // Map from (mirror) pod UID to the latest status version successfully sent to the API server.
        apiStatusVersions map[kubetypes.MirrorPodUID]uint64
        podDeletionSafety PodDeletionSafetyProvider
    }

3.2 Setting the Pod status

Pod status is set mainly from the Kubelet's syncPod: after a Pod change event is received, syncPod computes the Pod's latest status and hands it to statusManager through SetPodStatus, so that it can later be synchronized to the Apiserver.

    func (m *manager) SetPodStatus(pod *v1.Pod, status v1.PodStatus) {
        m.podStatusesLock.Lock()
        defer m.podStatusesLock.Unlock()

        for _, c := range pod.Status.Conditions {
            if !kubetypes.PodConditionByKubelet(c.Type) {
                klog.Errorf("Kubelet is trying to update pod condition %q for pod %q. "+
                    "But it is not owned by kubelet.", string(c.Type), format.Pod(pod))
            }
        }
        // Make sure we're caching a deep copy.
        status = *status.DeepCopy()

        // Force a status update if the deletion timestamp is set.
        m.updateStatusInternal(pod, status, pod.DeletionTimestamp != nil)
    }

3.3 Updating the internal status cache and generating a sync event

3.3.1 Obtaining the cached status

    var oldStatus v1.PodStatus
    // Check the local cache first.
    cachedStatus, isCached := m.podStatuses[pod.UID]
    if isCached {
        oldStatus = cachedStatus.status
    } else if mirrorPod, ok := m.podManager.GetMirrorPodByPod(pod); ok {
        // Static Pod: fall back to the status of its mirror Pod.
        oldStatus = mirrorPod.Status
    } else {
        oldStatus = pod.Status
    }

3.3.2 Checking container status

The container status check validates transitions out of the terminated state: based on the Pod's RestartPolicy, it verifies whether a terminated container is allowed to be restarted.

    if err := checkContainerStateTransition(oldStatus.ContainerStatuses, status.ContainerStatuses, pod.Spec.RestartPolicy); err != nil {
        klog.Errorf("Status update on pod %v/%v aborted: %v", pod.Namespace, pod.Name, err)
        return false
    }
    if err := checkContainerStateTransition(oldStatus.InitContainerStatuses, status.InitContainerStatuses, pod.Spec.RestartPolicy); err != nil {
        klog.Errorf("Status update on pod %v/%v aborted: %v", pod.Namespace, pod.Name, err)
        return false
    }
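The rule behind checkContainerStateTransition can be sketched with simplified types. This is a minimal illustration under stated assumptions: containerStatus is a hypothetical stand-in where a non-nil terminated field holds the exit code, and checkTransition is an illustrative name. Under Always any transition is legal; under OnFailure only a container that exited non-zero may leave the terminated state; under Never none may.

```go
package main

import "fmt"

type restartPolicy string

const (
	restartAlways    restartPolicy = "Always"
	restartOnFailure restartPolicy = "OnFailure"
	restartNever     restartPolicy = "Never"
)

// containerStatus is a simplified stand-in for v1.ContainerStatus:
// terminated holds the exit code, or nil if the container is not terminated.
type containerStatus struct {
	name       string
	terminated *int32
}

// checkTransition rejects a terminated container becoming non-terminated
// when the restart policy does not allow it to be restarted.
func checkTransition(oldStatuses, newStatuses []containerStatus, policy restartPolicy) error {
	// With Always, containers may always leave the terminated state.
	if policy == restartAlways {
		return nil
	}
	for _, old := range oldStatuses {
		if old.terminated == nil {
			continue // was not terminated, nothing to check
		}
		if *old.terminated != 0 && policy == restartOnFailure {
			continue // a failed container may be restarted under OnFailure
		}
		for _, cur := range newStatuses {
			if cur.name == old.name && cur.terminated == nil {
				return fmt.Errorf("terminated container %v attempted illegal transition to non-terminated state", cur.name)
			}
		}
	}
	return nil
}

// exitCode returns a pointer to an exit code, for building test data.
func exitCode(c int32) *int32 { return &c }

func main() {
	old := []containerStatus{{name: "app", terminated: exitCode(0)}}
	running := []containerStatus{{name: "app"}}
	fmt.Println(checkTransition(old, running, restartAlways))    // <nil>
	fmt.Println(checkTransition(old, running, restartOnFailure)) // terminated container app attempted illegal transition to non-terminated state
}
```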

3.3.3 Updating the PodCondition LastTransitionTime

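Using the conditions in the new status, update LastTransitionTime for each of the four kubelet-owned condition types: if the condition's Status differs from the old one (or the condition is new), LastTransitionTime is set to the current time; otherwise the previous LastTransitionTime is kept.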

    // Set ContainersReadyCondition.LastTransitionTime.
    updateLastTransitionTime(&status, &oldStatus, v1.ContainersReady)

    // Set ReadyCondition.LastTransitionTime.
    updateLastTransitionTime(&status, &oldStatus, v1.PodReady)

    // Set InitializedCondition.LastTransitionTime.
    updateLastTransitionTime(&status, &oldStatus, v1.PodInitialized)

    // Set PodScheduledCondition.LastTransitionTime.
    updateLastTransitionTime(&status, &oldStatus, v1.PodScheduled)

3.3.4 Normalizing the status and truncating long messages

First, the maximum number of bytes per container is derived from the number of containers, by dividing a total budget evenly among them. Then the Message of each container's terminated state is truncated to that length, and the timestamps in the status are normalized to RFC 3339 (second) precision.

    normalizeStatus(pod, &status)
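The truncation step can be sketched like this. It is a minimal illustration: we assume a total budget of 12 KiB (the value of the Kubelet's MaxPodTerminationMessageLogLength constant) and a hypothetical terminatedState type; truncateMessages is an illustrative name and the timestamp normalization is left out.

```go
package main

import (
	"fmt"
	"strings"
)

// Assumed total byte budget for termination messages across all containers.
const maxPodTerminationMessageLogLength = 1024 * 12

// terminatedState is a simplified stand-in for v1.ContainerStateTerminated.
type terminatedState struct{ Message string }

// truncateMessages splits the budget evenly across containers and truncates
// every message that exceeds its share (byte-based, like a plain slice).
func truncateMessages(states []terminatedState, containerCount int) {
	bytesPerStatus := maxPodTerminationMessageLogLength
	if containerCount > 0 {
		bytesPerStatus = bytesPerStatus / containerCount
	}
	for i := range states {
		if len(states[i].Message) > bytesPerStatus {
			states[i].Message = states[i].Message[:bytesPerStatus]
		}
	}
}

func main() {
	states := []terminatedState{{Message: strings.Repeat("x", 10000)}, {Message: "ok"}}
	truncateMessages(states, 2)
	fmt.Println(len(states[0].Message), states[1].Message) // 6144 ok
}
```

Splitting the budget by container count keeps the total size of a Pod's status bounded no matter how many containers it has.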

3.3.5 Checking status update conditions

If the status was already cached, the cached status is unchanged compared with the new one (as far as kubelet-owned fields are concerned), and no forced update was requested, return immediately.

    if isCached && isPodStatusByKubeletEqual(&cachedStatus.status, &status) && !forceUpdate {
        klog.V(3).Infof("Ignoring same status for pod %q, status: %+v", format.Pod(pod), status)
        return false // No new status.
    }
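A simplified sketch of this skip decision is shown below. It assumes stand-in types where the status holds only a phase and conditions, and treats the four condition types from 3.3.3 as kubelet-owned; equalByKubelet and shouldSkip are illustrative names, and the real isPodStatusByKubeletEqual compares every other field of the status as well. Conditions owned by other components (such as a readiness gate, shown here with a hypothetical example.com/gate type) do not count as a change.

```go
package main

import "fmt"

// kubeletOwned lists the condition types this sketch treats as kubelet-owned.
var kubeletOwned = map[string]bool{
	"PodScheduled": true, "Ready": true, "Initialized": true, "ContainersReady": true,
}

type condition struct{ Type, Status, Reason, Message string }

type podStatus struct {
	Phase      string
	Conditions []condition
}

// equalByKubelet compares only kubelet-owned conditions plus the other fields.
func equalByKubelet(old, cur podStatus) bool {
	for _, c := range cur.Conditions {
		if !kubeletOwned[c.Type] {
			continue // conditions written by other components are ignored
		}
		found := false
		for _, oc := range old.Conditions {
			if oc.Type == c.Type {
				found = oc.Status == c.Status && oc.Reason == c.Reason && oc.Message == c.Message
				break
			}
		}
		if !found {
			return false
		}
	}
	// Everything outside Conditions must match; only Phase exists in this sketch.
	return old.Phase == cur.Phase
}

// shouldSkip mirrors the guard: cached, unchanged, and not forced.
func shouldSkip(isCached bool, cached, cur podStatus, forceUpdate bool) bool {
	return isCached && equalByKubelet(cached, cur) && !forceUpdate
}

func main() {
	cached := podStatus{Phase: "Running", Conditions: []condition{
		{Type: "Ready", Status: "True"},
		{Type: "example.com/gate", Status: "False"},
	}}
	cur := podStatus{Phase: "Running", Conditions: []condition{
		{Type: "Ready", Status: "True"},
		{Type: "example.com/gate", Status: "True"},
	}}
	fmt.Println(shouldSkip(true, cached, cur, false)) // true: only a non-kubelet condition differs
	fmt.Println(shouldSkip(true, cached, cur, true))  // false: forced update
}
```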

3.3.6 Updating the cache and generating a sync event

Build the new cached status with the local version incremented by one, store it in the cache, and then try to enqueue a sync request without blocking.

    newStatus := versionedPodStatus{
        status:       status,
        version:      cachedStatus.version + 1,
        podName:      pod.Name,
        podNamespace: pod.Namespace,
    }
    // Update the cache.
    m.podStatuses[pod.UID] = newStatus

    select {
    case m.podStatusChannel <- podStatusSyncRequest{pod.UID, newStatus}:
        // A new sync request was queued.
        klog.V(5).Infof("Status Manager: adding pod: %q, with status: (%d, %v) to podStatusChannel",
            pod.UID, newStatus.version, newStatus.status)
        return true
    default:
        // Let the periodic syncBatch handle the update if the channel is full.
        // We can't block, since we hold the mutex lock.
        klog.V(4).Infof("Skipping the status update for pod %q for now because the channel is full; status: %+v",
            format.Pod(pod), status)
        return false
    }
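The select with a default branch is Go's standard non-blocking send. A minimal sketch of the pattern, using a hypothetical syncRequest type and trySend helper with a buffer of one so the "channel full" path is easy to see:

```go
package main

import "fmt"

// syncRequest is a simplified stand-in for podStatusSyncRequest.
type syncRequest struct {
	podUID  string
	version uint64
}

// trySend enqueues without blocking; it returns false when the buffer is full,
// leaving the update to a later periodic batch sync.
func trySend(ch chan<- syncRequest, req syncRequest) bool {
	select {
	case ch <- req:
		return true
	default:
		return false
	}
}

func main() {
	ch := make(chan syncRequest, 1)
	fmt.Println(trySend(ch, syncRequest{"a", 1})) // true
	fmt.Println(trySend(ch, syncRequest{"a", 2})) // false: buffer full
}
```

Dropping the request is safe here because the cache already holds the newest versioned status, and the periodic syncBatch compares versions and picks it up later.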

3.4 Updating status from probes

Probe status reflects the runtime state of the containers in a Pod. For example, if a readiness probe is configured and a container fails it, the Kubelet marks the container, and therefore the Pod, as not ready, and the Pod is then removed from the backend endpoints of the corresponding Service.

3.4.1 Obtaining the current status

    func (m *manager) SetContainerReadiness(podUID types.UID, containerID kubecontainer.ContainerID, ready bool) {
        m.podStatusesLock.Lock()
        defer m.podStatusesLock.Unlock()

        // Get the Pod from the local pod manager.
        pod, ok := m.podManager.GetPodByUID(podUID)
        if !ok {
            klog.V(4).Infof("Pod %q has been deleted, no need to update readiness", string(podUID))
            return
        }

        // Get the cached status of the Pod.
        oldStatus, found := m.podStatuses[pod.UID]
        if !found {
            klog.Warningf("Container readiness changed before pod has synced: %q - %q",
                format.Pod(pod), containerID.String())
            return
        }

        // Find the status of the container.
        containerStatus, _, ok := findContainerStatus(&oldStatus.status, containerID.String())
        if !ok {
            klog.Warningf("Container readiness changed for unknown container: %q - %q",
                format.Pod(pod), containerID.String())
            return
        }

3.4.2 Checking whether the status changes

    if containerStatus.Ready == ready {
        klog.V(4).Infof("Container readiness unchanged (%v): %q - %q", ready,
            format.Pod(pod), containerID.String())
        return
    }

3.4.3 Modifying the Ready State of a Container

Deep-copy the cached status, find the container in the copy, and set its Ready field to the new value.

    status := *oldStatus.status.DeepCopy()
    containerStatus, _, _ = findContainerStatus(&status, containerID.String())
    containerStatus.Ready = ready
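The DeepCopy matters because PodStatus contains slices: a plain struct copy shares the slice's backing array with the cached status, so writing through the pointer returned by findContainerStatus would silently modify the cache. A minimal sketch with hypothetical stand-in types (deepCopy is hand-written here, and the lookup uses the container name instead of its ID):

```go
package main

import "fmt"

type containerStatus struct {
	Name  string
	Ready bool
}

type podStatus struct {
	ContainerStatuses []containerStatus
}

// deepCopy copies the slice so the result shares no memory with s
// (a hand-written stand-in for the generated DeepCopy method).
func (s podStatus) deepCopy() podStatus {
	out := s
	out.ContainerStatuses = append([]containerStatus(nil), s.ContainerStatuses...)
	return out
}

// findContainerStatus returns a pointer into status so the caller can modify it in place.
func findContainerStatus(status *podStatus, name string) (*containerStatus, bool) {
	for i := range status.ContainerStatuses {
		if status.ContainerStatuses[i].Name == name {
			return &status.ContainerStatuses[i], true
		}
	}
	return nil, false
}

func main() {
	cached := podStatus{ContainerStatuses: []containerStatus{{Name: "app"}}}

	shallow := cached // copies only the slice header
	if cs, ok := findContainerStatus(&shallow, "app"); ok {
		cs.Ready = true
	}
	fmt.Println(cached.ContainerStatuses[0].Ready) // true: the cache was mutated

	cached.ContainerStatuses[0].Ready = false
	deep := cached.deepCopy()
	if cs, ok := findContainerStatus(&deep, "app"); ok {
		cs.Ready = true
	}
	fmt.Println(cached.ContainerStatuses[0].Ready) // false: the cache is untouched
}
```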

3.4.4 Updating Pod conditions from the latest container status

Based on the container readiness just detected, recompute the Pod's Ready and ContainersReady conditions, replace or append them in the status, and then call the internal update logic.

    updateConditionFunc := func(conditionType v1.PodConditionType, condition v1.PodCondition) {
        conditionIndex := -1
        // Find the PodCondition of the given type.
        for i, c := range status.Conditions {
            if c.Type == conditionType {
                conditionIndex = i
                break
            }
        }
        // Replace the existing PodCondition or append a new one.
        if conditionIndex != -1 {
            status.Conditions[conditionIndex] = condition
        } else {
            klog.Warningf("PodStatus missing %s type condition: %+v", conditionType, status)
            status.Conditions = append(status.Conditions, condition)
        }
    }
    // Compute the Pod Ready condition.
    updateConditionFunc(v1.PodReady, GeneratePodReadyCondition(&pod.Spec, status.Conditions, status.ContainerStatuses, status.Phase))
    // Compute the ContainersReady condition.
    updateConditionFunc(v1.ContainersReady, GenerateContainersReadyCondition(&pod.Spec, status.ContainerStatuses, status.Phase))
    m.updateStatusInternal(pod, status, false)
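The replace-or-append step is a common upsert over a slice. A minimal sketch, assuming a simplified condition type; upsertCondition and containersReady are illustrative names, and containersReady is a deliberately simplified rule rather than the real GenerateContainersReadyCondition, which also handles unknown containers and the Succeeded phase.

```go
package main

import "fmt"

type condition struct {
	Type   string
	Status string
}

// upsertCondition replaces the condition of the same type or appends it,
// like updateConditionFunc in SetContainerReadiness.
func upsertCondition(conds []condition, c condition) []condition {
	for i := range conds {
		if conds[i].Type == c.Type {
			conds[i] = c
			return conds
		}
	}
	return append(conds, c)
}

// containersReady is simplified: "True" only if every container reports ready.
func containersReady(ready []bool) condition {
	for _, r := range ready {
		if !r {
			return condition{Type: "ContainersReady", Status: "False"}
		}
	}
	return condition{Type: "ContainersReady", Status: "True"}
}

func main() {
	conds := []condition{{Type: "PodScheduled", Status: "True"}}
	conds = upsertCondition(conds, containersReady([]bool{true, false}))
	conds = upsertCondition(conds, containersReady([]bool{true, true}))
	fmt.Println(conds) // [{PodScheduled True} {ContainersReady True}]
}
```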

3.5 Starting a Background Synchronization Task

statusManager starts a background goroutine that consumes the sync requests from the update channel and also runs a periodic batch sync.

    func (m *manager) Start() {
        // Periodic batch sync.
        syncTicker := time.Tick(syncPeriod)
        // syncPod and syncBatch share the same goroutine to avoid sync races.
        go wait.Forever(func() {
            select {
            case syncRequest := <-m.podStatusChannel:
                klog.V(5).Infof("Status Manager: syncing pod: %q, with status: (%d, %v) from podStatusChannel",
                    syncRequest.podUID, syncRequest.status.version, syncRequest.status.status)
                m.syncPod(syncRequest.podUID, syncRequest.status)
            case <-syncTicker:
                m.syncBatch()
            }
        }, 0)
    }
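The shape of this loop can be sketched with the standard library alone. This is a minimal illustration: wait.Forever(f, 0) keeps calling f, which we approximate with a plain for loop, and we add a stop channel (not present in the code above) so the sketch can terminate. run, syncPod and syncBatch are hypothetical callbacks; because both sync paths run in one goroutine, they never race with each other.

```go
package main

import (
	"fmt"
	"time"
)

type syncRequest struct{ podUID string }

// run consumes individual requests and falls back to a periodic batch sync,
// until stop is closed.
func run(requests <-chan syncRequest, tick <-chan time.Time, stop <-chan struct{},
	syncPod func(string), syncBatch func()) {
	for {
		select {
		case req := <-requests:
			syncPod(req.podUID)
		case <-tick:
			syncBatch()
		case <-stop:
			return
		}
	}
}

func main() {
	requests := make(chan syncRequest, 10)
	ticker := time.NewTicker(10 * time.Second)
	defer ticker.Stop()
	stop := make(chan struct{})
	done := make(chan struct{})

	requests <- syncRequest{podUID: "uid-1"}
	go func() {
		run(requests, ticker.C, stop, func(uid string) {
			fmt.Println("syncPod", uid)
			close(stop) // end the demo after the first request
		}, func() { fmt.Println("syncBatch") })
		close(done)
	}()
	<-done
}
```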

3.6 Synchronizing the Pod Status

3.6.1 Checking synchronization conditions

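needsUpdate decides whether the status must be synchronized. It returns true if no version has been sent to the Apiserver yet or the version last sent is older than the current one. Otherwise, it returns false if the Pod no longer exists locally; if the Pod is being deleted and can be deleted, it still returns true, because its status must be updated to finish the deletion.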

    if !m.needsUpdate(uid, status) {
        klog.V(1).Infof("Status for pod %q is up-to-date; skipping", uid)
        return
    }

3.6.2 Fetching the latest Pod data from the Apiserver

If the Pod cannot be fetched, because it no longer exists or the request fails, return immediately.

    pod, err := m.kubeClient.CoreV1().Pods(status.podNamespace).Get(status.podName, metav1.GetOptions{})
    if errors.IsNotFound(err) {
        klog.V(3).Infof("Pod %q does not exist on the server",
            format.PodDesc(status.podName, status.podNamespace, uid))
        // If the Pod has been deleted, there is nothing to do.
        return
    }
    if err != nil {
        klog.Warningf("Failed to get status for pod %q: %v",
            format.PodDesc(status.podName, status.podNamespace, uid), err)
        return
    }

3.6.3 Calling the Patch API to update the status

Merge the new status with the current server-side status (conditions not owned by the Kubelet are kept from the server side), then call PatchPodStatus through kubeClient to update the status on the Apiserver.

    oldStatus := pod.Status.DeepCopy()
    // Update the server-side status.
    newPod, patchBytes, err := statusutil.PatchPodStatus(m.kubeClient, pod.Namespace, pod.Name, pod.UID,
        *oldStatus, mergePodStatus(*oldStatus, status.status))
    klog.V(3).Infof("Patch status for pod %q with %q", format.Pod(pod), patchBytes)
    if err != nil {
        klog.Warningf("Failed to update status for pod %q: %v", format.Pod(pod), err)
        return
    }
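The merge can be sketched with simplified types. This is a minimal illustration, assuming the four condition types from 3.3.3 are the kubelet-owned ones and using a hypothetical example.com/gate condition to stand for one written by another component: non-kubelet conditions are kept from the server-side status, while kubelet-owned conditions and every other field come from the new status.

```go
package main

import "fmt"

type condition struct{ Type, Status string }

type podStatus struct {
	Phase      string
	Conditions []condition
}

// kubeletOwned lists the condition types this sketch treats as kubelet-owned.
var kubeletOwned = map[string]bool{
	"PodScheduled": true, "Ready": true, "Initialized": true, "ContainersReady": true,
}

// mergePodStatus keeps non-kubelet conditions from the server-side status and
// takes kubelet-owned conditions (and every other field) from the new status.
func mergePodStatus(oldStatus, newStatus podStatus) podStatus {
	merged := []condition{}
	for _, c := range oldStatus.Conditions {
		if !kubeletOwned[c.Type] {
			merged = append(merged, c)
		}
	}
	for _, c := range newStatus.Conditions {
		if kubeletOwned[c.Type] {
			merged = append(merged, c)
		}
	}
	newStatus.Conditions = merged
	return newStatus
}

func main() {
	server := podStatus{Phase: "Pending", Conditions: []condition{
		{"Ready", "False"}, {"example.com/gate", "True"},
	}}
	local := podStatus{Phase: "Running", Conditions: []condition{{"Ready", "True"}}}
	fmt.Println(mergePodStatus(server, local)) // {Running [{example.com/gate True} {Ready True}]}
}
```

This way the Kubelet's patch never overwrites conditions that other controllers own.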

3.6.4 Recording the version synced to the Apiserver

    pod = newPod
    klog.V(3).Infof("Status for pod %q updated successfully: (%d, %+v)", format.Pod(pod), status.version, status.status)
    m.apiStatusVersions[kubetypes.MirrorPodUID(pod.UID)] = status.version

3.6.5 Checking whether the Pod can be deleted

This is the last stage: once all resources belonging to the Pod have been released, the Pod object is deleted from the Apiserver.

    // If the Pod's DeletionTimestamp is set and its resources have been reclaimed, delete it.
    if m.canBeDeleted(pod, status.status) {
        deleteOptions := metav1.NewDeleteOptions(0)
        // Use the UID as a precondition so that a newly created Pod with the
        // same name and namespace is not deleted by mistake.
        deleteOptions.Preconditions = metav1.NewUIDPreconditions(string(pod.UID))
        // Call the Apiserver to delete the Pod.
        err = m.kubeClient.CoreV1().Pods(pod.Namespace).Delete(pod.Name, deleteOptions)
        if err != nil {
            klog.Warningf("Failed to delete status for pod %q: %v", format.Pod(pod), err)
            return
        }
        klog.V(3).Infof("Pod %q fully terminated and removed from etcd", format.Pod(pod))
        m.deletePodStatus(uid)
    }
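The deletion guard and the UID precondition can be sketched without a cluster. This is a minimal illustration under stated assumptions: we assume canBeDeleted requires a DeletionTimestamp, excludes mirror Pods, and otherwise defers to podDeletionSafety (modeled as a plain resourcesReclaimed flag); store is a hypothetical in-memory stand-in for the Apiserver keyed by Pod name, and markedForDeletion and deleteWithUID are illustrative names.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

type pod struct {
	Name, UID         string
	DeletionTimestamp *time.Time
	IsMirror          bool
}

// canBeDeleted: deletion requested, not a mirror Pod, and resources reclaimed.
func canBeDeleted(p pod, resourcesReclaimed bool) bool {
	if p.DeletionTimestamp == nil || p.IsMirror {
		return false
	}
	return resourcesReclaimed
}

// markedForDeletion returns a copy of p with DeletionTimestamp set to now.
func markedForDeletion(p pod) pod {
	now := time.Now()
	p.DeletionTimestamp = &now
	return p
}

var (
	errNotFound    = errors.New("not found")
	errUIDMismatch = errors.New("precondition failed: UID mismatch")
)

// store is a hypothetical in-memory stand-in for the Apiserver, keyed by Pod name.
type store map[string]pod

// deleteWithUID deletes the Pod only if its UID still matches, which is what a
// UID precondition guarantees: a recreated Pod with the same name survives.
func (s store) deleteWithUID(name, uid string) error {
	p, ok := s[name]
	if !ok {
		return errNotFound
	}
	if p.UID != uid {
		return errUIDMismatch
	}
	delete(s, name)
	return nil
}

func main() {
	old := markedForDeletion(pod{Name: "web", UID: "uid-1"})
	s := store{"web": {Name: "web", UID: "uid-2"}} // recreated under the same name
	if canBeDeleted(old, true) {
		fmt.Println(s.deleteWithUID(old.Name, old.UID)) // precondition failed: UID mismatch
	}
}
```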

That is roughly the whole design. Comments and discussion are welcome. K8s source-reading e-book: www.yuque.com/baxiaoshi/t…

WeChat ID: Baxiaoshi2020


More articles can be found at www.sreguide.com
