Author: Ying Jianjian, Xinhua Wisdom Cloud Computing Center

OpenYurt is the industry's first non-invasive, cloud native open source project for edge computing. It gives users an integrated cloud and edge experience through edge autonomy, cloud and edge collaboration, edge unification, and edge traffic closed-loop capabilities. In OpenYurt, the edge network can use the data filtering framework to keep edge traffic closed within each node pool.

Analysis of the YurtHub data filtering framework

YurtHub is essentially a proxy in front of kube-apiserver with a cache layer added on top. First, the local cache keeps services stable when an edge node is offline, which effectively solves the edge autonomy problem. Second, it reduces the load that a large number of List & Watch operations place on the cloud API.

Pods and the kubelet on a node send their requests to YurtHub, which forwards them to kube-apiserver through its load balancer. When the proxy receives the response, it filters the data and returns the filtered data to the requester. On an edge node, it also caches the resources in the response body locally according to the request type. On a cloud node, where the network is reliable, it does not cache locally.
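The flow described above can be sketched in a few lines of Go. This is only an illustration under stated assumptions: `responseFilter`, `localCache`, `dropCloudItems`, and `handleResponse` are hypothetical names, the response body is reduced to a comma-separated string, and the sketch assumes the filtered body is what gets cached.

```go
package main

import (
	"fmt"
	"strings"
)

// responseFilter is a hypothetical stand-in for one YurtHub filter.
type responseFilter func(body string) string

// localCache is a hypothetical in-memory stand-in for the edge node's local cache.
type localCache map[string]string

// dropCloudItems is a toy filter: it removes items whose name starts with "cloud-".
func dropCloudItems(body string) string {
	var kept []string
	for _, item := range strings.Split(body, ",") {
		if !strings.HasPrefix(item, "cloud-") {
			kept = append(kept, item)
		}
	}
	return strings.Join(kept, ",")
}

// handleResponse filters the response body, stores it in the cache only on
// edge nodes, and returns the filtered body to the requester.
func handleResponse(key, body string, filter responseFilter, isEdgeNode bool, cache localCache) string {
	filtered := body
	if filter != nil {
		filtered = filter(body)
	}
	if isEdgeNode {
		// Assumption: the cache stores the filtered data, keyed by the request.
		cache[key] = filtered
	}
	return filtered
}

func main() {
	edgeCache := localCache{}
	out := handleResponse("kube-proxy/services/list", "svc-a,cloud-lb", dropCloudItems, true, edgeCache)
	fmt.Println("returned:", out, "cached entries:", len(edgeCache))

	cloudCache := localCache{}
	handleResponse("kube-proxy/services/list", "svc-a,cloud-lb", dropCloudItems, false, cloudCache)
	fmt.Println("cached entries on cloud node:", len(cloudCache))
}
```

The only branch that differs between node types is the cache write, which matches the description: cloud nodes skip local caching because their network is considered reliable.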

[Figure: schematic of the YurtHub filtering framework implementation]

YurtHub currently contains four filters. The user agent, resource, and verb of an addon's request determine which filter is applied to the response data.

Function and implementation of the four filters

ServiceTopologyFilter 

Filters EndpointSlice resources. The EndpointSlice feature requires Kubernetes v1.18 or later; on versions below v1.18, endpointsFilter is recommended instead. The filter first uses the kubernetes.io/service-name label to find the Service that corresponds to the EndpointSlice, then checks whether the Service carries the openyurt.io/topologyKeys annotation. If it does, the value of the annotation determines the filtering rule. Finally, the response data is updated and returned to the addon.

The annotation can take two kinds of values:

kubernetes.io/hostname: keeps only the endpoint IPs on the same node

openyurt.io/nodepool or kubernetes.io/zone: looks up the node pool of the current node, then traverses the EndpointSlice and keeps only the endpoints whose kubernetes.io/hostname topology field names a node in that pool. The remaining endpoints are reassembled and returned to the addon.

Code implementation:

func (fh *serviceTopologyFilterHandler) reassembleEndpointSlice(endpointSlice *discovery.EndpointSlice) *discovery.EndpointSlice {
    var serviceTopologyType string
    // get the service Topology type
    if svcName, ok := endpointSlice.Labels[discovery.LabelServiceName]; ok {
        svc, err := fh.serviceLister.Services(endpointSlice.Namespace).Get(svcName)
        if err != nil {
            klog.Infof("skip reassemble endpointSlice, failed to get service %s/%s, err: %v", endpointSlice.Namespace, svcName, err)
            return endpointSlice
        }

        if serviceTopologyType, ok = svc.Annotations[AnnotationServiceTopologyKey]; !ok {
            klog.Infof("skip reassemble endpointSlice, service %s/%s has no annotation %s", endpointSlice.Namespace, svcName, AnnotationServiceTopologyKey)
            return endpointSlice
        }
    }

    var newEps []discovery.Endpoint
    // if type of service Topology is 'kubernetes.io/hostname'
    // filter the endpoint just on the local host
    if serviceTopologyType == AnnotationServiceTopologyValueNode {
        for i := range endpointSlice.Endpoints {
            if endpointSlice.Endpoints[i].Topology[v1.LabelHostname] == fh.nodeName {
                newEps = append(newEps, endpointSlice.Endpoints[i])
            }
        }
        endpointSlice.Endpoints = newEps
    } else if serviceTopologyType == AnnotationServiceTopologyValueNodePool || serviceTopologyType == AnnotationServiceTopologyValueZone {
        // if type of service Topology is openyurt.io/nodepool
        // filter the endpoint just on the node which is in the same nodepool with current node
        currentNode, err := fh.nodeGetter(fh.nodeName)
        if err != nil {
            klog.Infof("skip reassemble endpointSlice, failed to get current node %s, err: %v", fh.nodeName, err)
            return endpointSlice
        }
        if nodePoolName, ok := currentNode.Labels[nodepoolv1alpha1.LabelCurrentNodePool]; ok {
            nodePool, err := fh.nodePoolLister.Get(nodePoolName)
            if err != nil {
                klog.Infof("skip reassemble endpointSlice, failed to get nodepool %s, err: %v", nodePoolName, err)
                return endpointSlice
            }
            for i := range endpointSlice.Endpoints {
                if inSameNodePool(endpointSlice.Endpoints[i].Topology[v1.LabelHostname], nodePool.Status.Nodes) {
                    newEps = append(newEps, endpointSlice.Endpoints[i])
                }
            }
            endpointSlice.Endpoints = newEps
        }
    }
    return endpointSlice
}
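To make the two annotation values easier to compare, here is a self-contained sketch of the same decision on simplified types. It is not the OpenYurt code: the `endpoint` struct, the constants, and `filterByTopology` are hypothetical, and the node pool is passed in as a plain list of node names instead of being looked up through listers.

```go
package main

import "fmt"

// endpoint is a hypothetical, simplified stand-in for discovery.Endpoint.
type endpoint struct {
	IP       string
	Hostname string // value of the kubernetes.io/hostname topology field
}

// Annotation values described in the article.
const (
	topologyNode     = "kubernetes.io/hostname"
	topologyNodePool = "openyurt.io/nodepool"
	topologyZone     = "kubernetes.io/zone"
)

// filterByTopology keeps the endpoints allowed by the topology value.
// poolNodes lists the nodes in the same pool as nodeName.
func filterByTopology(eps []endpoint, topology, nodeName string, poolNodes []string) []endpoint {
	var kept []endpoint
	switch topology {
	case topologyNode:
		// Keep only endpoints on the local node.
		for _, ep := range eps {
			if ep.Hostname == nodeName {
				kept = append(kept, ep)
			}
		}
	case topologyNodePool, topologyZone:
		// Keep only endpoints on nodes in the same node pool.
		inPool := make(map[string]bool, len(poolNodes))
		for _, n := range poolNodes {
			inPool[n] = true
		}
		for _, ep := range eps {
			if inPool[ep.Hostname] {
				kept = append(kept, ep)
			}
		}
	default:
		// No recognized topology value: return the endpoints unchanged.
		return eps
	}
	return kept
}

func main() {
	eps := []endpoint{
		{IP: "10.0.0.1", Hostname: "node-a"},
		{IP: "10.0.0.2", Hostname: "node-b"},
		{IP: "10.0.1.1", Hostname: "node-c"},
	}
	pool := []string{"node-a", "node-b"}
	fmt.Println(len(filterByTopology(eps, topologyNode, "node-a", pool)))
	fmt.Println(len(filterByTopology(eps, topologyNodePool, "node-a", pool)))
}
```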

EndpointsFilter

Filters Endpoints resources. It first checks whether the Service corresponding to the Endpoints exists, then reads the node's apps.openyurt.io/nodepool label to find the node pool of the current node and gets all the nodes in that pool. The Ready and NotReady pod addresses that belong to the same node pool are reassembled into new Endpoints and returned to the addon.

func (fh *endpointsFilterHandler) reassembleEndpoint(endpoints *v1.Endpoints) *v1.Endpoints {
    svcName := endpoints.Name
    _, err := fh.serviceLister.Services(endpoints.Namespace).Get(svcName)
    if err != nil {
        klog.Infof("skip reassemble endpoints, failed to get service %s/%s, err: %v", endpoints.Namespace, svcName, err)
        return endpoints
    }

    // filter the endpoints on the node which is in the same nodepool with current node
    currentNode, err := fh.nodeGetter(fh.nodeName)
    if err != nil {
        klog.Infof("skip reassemble endpoints, failed to get current node %s, err: %v", fh.nodeName, err)
        return endpoints
    }

    if nodePoolName, ok := currentNode.Labels[nodepoolv1alpha1.LabelCurrentNodePool]; ok {
        nodePool, err := fh.nodePoolLister.Get(nodePoolName)
        if err != nil {
            klog.Infof("skip reassemble endpoints, failed to get nodepool %s, err: %v", nodePoolName, err)
            return endpoints
        }
        var newEpSubsets []v1.EndpointSubset
        for i := range endpoints.Subsets {
            endpoints.Subsets[i].Addresses = filterValidEndpointsAddr(endpoints.Subsets[i].Addresses, nodePool)
            endpoints.Subsets[i].NotReadyAddresses = filterValidEndpointsAddr(endpoints.Subsets[i].NotReadyAddresses, nodePool)
            if endpoints.Subsets[i].Addresses != nil || endpoints.Subsets[i].NotReadyAddresses != nil {
                newEpSubsets = append(newEpSubsets, endpoints.Subsets[i])
            }
        }
        endpoints.Subsets = newEpSubsets
        if len(endpoints.Subsets) == 0 {
            // this endpoints has no nodepool valid addresses for ingress controller, return nil to ignore it
            return nil
        }
    }
    return endpoints
}
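The listing calls filterValidEndpointsAddr without showing it. The sketch below is a guess at the behavior such a helper needs, not the OpenYurt implementation: `endpointAddress` and `keepPoolAddresses` are hypothetical names, and the node pool is reduced to a list of node names. The one property it deliberately preserves is that the result stays nil when nothing matches, because the caller above tests the filtered slices against nil.

```go
package main

import "fmt"

// endpointAddress is a hypothetical, simplified stand-in for v1.EndpointAddress.
type endpointAddress struct {
	IP       string
	NodeName *string // nil when the address is not bound to a node
}

// ptr returns a pointer to s; it only keeps the example short.
func ptr(s string) *string { return &s }

// keepPoolAddresses returns the addresses whose node belongs to poolNodes.
// It returns nil (not an empty slice) when no address qualifies.
func keepPoolAddresses(addrs []endpointAddress, poolNodes []string) []endpointAddress {
	inPool := make(map[string]bool, len(poolNodes))
	for _, n := range poolNodes {
		inPool[n] = true
	}
	var kept []endpointAddress // stays nil if nothing is appended
	for _, a := range addrs {
		if a.NodeName == nil {
			continue // cannot tell which node the address is on
		}
		if inPool[*a.NodeName] {
			kept = append(kept, a)
		}
	}
	return kept
}

func main() {
	addrs := []endpointAddress{
		{IP: "10.0.0.1", NodeName: ptr("node-a")},
		{IP: "10.0.9.9", NodeName: ptr("node-x")},
		{IP: "10.0.0.3"},
	}
	for _, a := range keepPoolAddresses(addrs, []string{"node-a", "node-b"}) {
		fmt.Println(a.IP)
	}
}
```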

MasterServiceFilter

Replaces the ClusterIP and port of the master Service in Service lists with the host and port configured on the filter handler. The main scenario is letting pods on the edge use InClusterConfig seamlessly to access cluster resources.

func (fh *masterServiceFilterHandler) ObjectResponseFilter(b []byte) ([]byte, error) {
    list, err := fh.serializer.Decode(b)
    if err != nil || list == nil {
        klog.Errorf("skip filter, failed to decode response in ObjectResponseFilter of masterServiceFilterHandler, %v", err)
        return b, nil
    }

    // return data un-mutated if not ServiceList
    serviceList, ok := list.(*v1.ServiceList)
    if !ok {
        return b, nil
    }

    // mutate master service
    for i := range serviceList.Items {
        if serviceList.Items[i].Namespace == MasterServiceNamespace && serviceList.Items[i].Name == MasterServiceName {
            serviceList.Items[i].Spec.ClusterIP = fh.host
            for j := range serviceList.Items[i].Spec.Ports {
                if serviceList.Items[i].Spec.Ports[j].Name == MasterServicePortName {
                    serviceList.Items[i].Spec.Ports[j].Port = fh.port
                    break
                }
            }
            klog.V(2).Infof("mutate master service into ClusterIP:Port=%s:%d for request %s", fh.host, fh.port, util.ReqString(fh.req))
            break
        }
    }

    // return the mutated serviceList
    return fh.serializer.Encode(serviceList)
}

DiscardCloudService

This filter discards two kinds of Services. The first is Services of type LoadBalancer: because the edge cannot access LoadBalancer resources, the filter removes them directly, unless the Service carries the skip annotation (filter.SkipDiscardServiceAnnotation) set to "true". The second is x-tunnel-server-internal-svc in the kube-system namespace. This Service exists mainly so that cloud nodes can access yurt-tunnel-server, so on edge nodes it is filtered out directly.

func (fh *discardCloudServiceFilterHandler) ObjectResponseFilter(b []byte) ([]byte, error) {
    list, err := fh.serializer.Decode(b)
    if err != nil || list == nil {
        klog.Errorf("skip filter, failed to decode response in ObjectResponseFilter of discardCloudServiceFilterHandler %v", err)
        return b, nil
    }

    serviceList, ok := list.(*v1.ServiceList)
    if ok {
        var svcNew []v1.Service
        for i := range serviceList.Items {
            nsName := fmt.Sprintf("%s/%s", serviceList.Items[i].Namespace, serviceList.Items[i].Name)
            // remove lb service
            if serviceList.Items[i].Spec.Type == v1.ServiceTypeLoadBalancer {
                if serviceList.Items[i].Annotations[filter.SkipDiscardServiceAnnotation] != "true" {
                    klog.V(2).Infof("load balancer service(%s) is discarded in ObjectResponseFilter of discardCloudServiceFilterHandler", nsName)
                    continue
                }
            }

            // remove cloud clusterIP service
            if _, ok := cloudClusterIPService[nsName]; ok {
                klog.V(2).Infof("clusterIP service(%s) is discarded in ObjectResponseFilter of discardCloudServiceFilterHandler", nsName)
                continue
            }

            svcNew = append(svcNew, serviceList.Items[i])
        }
        serviceList.Items = svcNew
        return fh.serializer.Encode(serviceList)
    }
    return b, nil
}

Current state of the filtering framework

The current filtering framework is relatively rigid: which resources are filtered is hard-coded, and only registered resources can be filtered. To solve this problem, the filtering framework needs to be reworked.

Solutions

Solution 1:

Use startup parameters or environment variables to customize the filter configuration. This approach has the following disadvantages:

1. The configuration is complex. Every customized setting has to be written into a startup parameter or read from an environment variable, for example in the following format:

--filter_serviceTopology=coredns/endpointslices#list,kube-proxy/services#list;watch --filter_endpointsFilter=nginx-ingress-controller/endpoints#list;watch

2. Hot updates are not possible. YurtHub has to be restarted for every configuration change to take effect.

Solution 2:

1. Use a ConfigMap to customize the filter configuration, which reduces configuration complexity. The configuration format is user-agent/resource#list;watch, and multiple resources are separated by commas (,). For example:

filter_endpoints: coredns/endpoints#list;watch,test/endpoints#list;watch
filter_servicetopology: coredns/endpointslices#list;watch
filter_discardcloudservice: ""
filter_masterservice: ""

2. Use the Informer mechanism to ensure that the configuration takes effect in real time

Based on the above two points, we choose solution 2 in OpenYurt.
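As an illustration of the configuration format, the sketch below parses one ConfigMap value into structured entries. It is a minimal example, not YurtHub's parser: the `request` type and `parseFilterValue` are hypothetical, and it assumes verbs are separated by semicolons and resources by commas, as in the example values above.

```go
package main

import (
	"fmt"
	"strings"
)

// request is a hypothetical parsed form of one "user-agent/resource#verbs" entry.
type request struct {
	UserAgent string
	Resource  string
	Verbs     []string
}

// parseFilterValue parses a value such as
// "coredns/endpoints#list;watch,test/endpoints#list;watch".
// An empty value yields no entries and no error.
func parseFilterValue(value string) ([]request, error) {
	var out []request
	for _, item := range strings.Split(value, ",") {
		item = strings.TrimSpace(item)
		if item == "" {
			continue
		}
		target, verbPart, ok := strings.Cut(item, "#")
		if !ok {
			return nil, fmt.Errorf("entry %q has no '#'", item)
		}
		agent, resource, ok := strings.Cut(target, "/")
		if !ok || agent == "" || resource == "" {
			return nil, fmt.Errorf("entry %q is not user-agent/resource", item)
		}
		var verbs []string
		for _, v := range strings.Split(verbPart, ";") {
			if v = strings.TrimSpace(v); v != "" {
				verbs = append(verbs, v)
			}
		}
		if len(verbs) == 0 {
			return nil, fmt.Errorf("entry %q has no verbs", item)
		}
		out = append(out, request{UserAgent: agent, Resource: resource, Verbs: verbs})
	}
	return out, nil
}

func main() {
	reqs, err := parseFilterValue("coredns/endpoints#list;watch,test/endpoints#list;watch")
	if err != nil {
		fmt.Println("parse error:", err)
		return
	}
	for _, r := range reqs {
		fmt.Println(r.UserAgent, r.Resource, r.Verbs)
	}
}
```

Treating an empty value as "no custom entries" keeps keys such as filter_masterservice: "" valid without special handling.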

Problems encountered during development

On the edge, the informer watches through YurtHub's own proxy address, so YurtHub cannot guarantee that the ConfigMap data is ready before the proxy port starts serving. If an addon's request arrives after startup but before the ConfigMap data has been synced, data is returned to the addon without filtering, which can cause unexpected problems.

To solve this problem, we add WaitForCacheSync to Approve so that data synchronization is complete before any filtering decision is returned. However, adding WaitForCacheSync to Approve would also block YurtHub's own ConfigMap watch, so a whitelist check is needed before WaitForCacheSync: when YurtHub itself uses List & Watch to access the ConfigMap, the request is let through without filtering. The corresponding code logic is as follows:

func (a *approver) Approve(comp, resource, verb string) bool {
    if a.isWhitelistReq(comp, resource, verb) {
        return false
    }
    if ok := cache.WaitForCacheSync(a.stopCh, a.configMapSynced); !ok {
        panic("wait for configMap cache sync timeout")
    }
    a.Lock()
    defer a.Unlock()
    for _, requests := range a.nameToRequests {
        for _, request := range requests {
            if request.Equal(comp, resource, verb) {
                return true
            }
        }
    }
    return false
}
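The ordering problem is easier to see in a standalone sketch that replaces the informer with a channel. Everything here is an assumption made for illustration: the `gate` type, its methods, and the whitelist condition (component "yurthub" accessing "configmaps") are hypothetical and do not come from the OpenYurt source.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// gate is a hypothetical stand-in for the approver: it blocks filtering
// decisions until the configuration has been loaded once.
type gate struct {
	synced chan struct{} // closed after the first configuration load
	once   sync.Once
	mu     sync.Mutex
	rules  map[string]bool // keys look like "coredns/endpoints#list"
}

func newGate() *gate {
	return &gate{synced: make(chan struct{})}
}

// markSynced stores the rules and releases every waiting approve call.
func (g *gate) markSynced(rules map[string]bool) {
	g.mu.Lock()
	g.rules = rules
	g.mu.Unlock()
	g.once.Do(func() { close(g.synced) })
}

// approve reports whether the response to this request should be filtered.
func (g *gate) approve(comp, resource, verb string) bool {
	// Whitelist first: the request that loads the configuration must not
	// wait for the configuration, or it would block forever.
	if comp == "yurthub" && resource == "configmaps" {
		return false
	}
	<-g.synced // wait until the configuration has been loaded
	g.mu.Lock()
	defer g.mu.Unlock()
	return g.rules[comp+"/"+resource+"#"+verb]
}

func main() {
	g := newGate()
	fmt.Println("whitelisted request filtered:", g.approve("yurthub", "configmaps", "watch"))

	go func() {
		time.Sleep(50 * time.Millisecond) // simulate the initial sync delay
		g.markSynced(map[string]bool{"coredns/endpoints#list": true})
	}()
	fmt.Println("coredns list filtered:", g.approve("coredns", "endpoints", "list"))
}
```

Putting the whitelist check before the wait is the design choice that matters: with the order reversed, the configuration request would wait on a sync that only it can complete.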

Conclusion

As the extension capabilities above show, YurtHub is not just a reverse proxy with data caching on edge nodes. It adds a new layer of encapsulation to Kubernetes node application lifecycle management and provides the core management and control capabilities that edge computing requires.

YurtHub is not limited to edge computing scenarios. It can serve as a standing node-side component in any scenario that uses Kubernetes. I believe this will also drive YurtHub toward higher performance and stability.
