This series is divided into four parts:

(1) cache package source code analysis and the use of Informer;

(2) informers package source code analysis, the use of SharedInformerFactory, and best practices for Informer in real-world use;

(3) implementation of custom resource (CRD) Informer;

(4) dynamic package source code analysis and the use of DynamicSharedInformerFactory.

This is part 2.

All code used in this article can be found at articles/archive/dive-into-kubernetes-informer at main · wbsnail/articles · GitHub.

Preface

In the previous part, I analyzed the main code of the cache package in the kubernetes/client-go repository and summarized the working principle and implementation of Informer. In this part, I will focus on the informers package and introduce the right way to use SharedInformerFactory.

👮‍♂️ If you haven't read the previous part, I suggest reading it first.

Informers package source code parsing

The factory pattern

As we learned in the previous section, the cache package exposes the following five Informer creation methods:

  • New
  • NewInformer
  • NewIndexerInformer
  • NewSharedInformer
  • NewSharedIndexInformer

They offer different degrees of abstraction and encapsulation; NewSharedIndexInformer is the least abstract and most fully encapsulated of them, but even NewSharedIndexInformer does not encapsulate specific resource types: it still needs to receive a ListerWatcher and Indexers as parameters:

func NewSharedIndexInformer(
	lw ListerWatcher,
	exampleObject runtime.Object,
	defaultEventHandlerResyncPeriod time.Duration,
	indexers Indexers,
) SharedIndexInformer {}

To build an Informer, we need to construct a ListerWatcher and Indexers. How can this be designed so that the code is well encapsulated and reusable across multiple resource types? The informers package uses the factory pattern: it provides an Informer factory for every built-in resource type, and when you need the Informer for a certain resource you build it directly through that factory.

package main

import (
	"fmt"
	corev1 "k8s.io/api/core/v1"
	"k8s.io/client-go/informers"
	"k8s.io/client-go/tools/cache"
)

func main() {
	fmt.Println("----- 9-shared-informer-factory -----")

	// mustClientset is used to create an instance of kubernetes.Interface;
	// its code is in the previous part.
	// The second parameter is defaultResync, the default resyncPeriod used when
	// building new Informers; resyncPeriod was introduced in the previous part.
	informerFactory := informers.NewSharedInformerFactoryWithOptions(
		mustClientset(), 0, informers.WithNamespace("tmp"))
	configMapsInformer := informerFactory.Core().V1().ConfigMaps().Informer()
	configMapsInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			configMap, ok := obj.(*corev1.ConfigMap)
			if !ok {
				return
			}
			fmt.Printf("created: %s\n", configMap.Name)
		},
		UpdateFunc: func(oldObj, newObj interface{}) {
			configMap, ok := newObj.(*corev1.ConfigMap)
			if !ok {
				return
			}
			fmt.Printf("updated: %s\n", configMap.Name)
		},
		DeleteFunc: func(obj interface{}) {
			configMap, ok := obj.(*corev1.ConfigMap)
			if !ok {
				return
			}
			fmt.Printf("deleted: %s\n", configMap.Name)
		},
	})

	stopCh := make(chan struct{})
	defer close(stopCh)

	fmt.Println("Start syncing....")

	go informerFactory.Start(stopCh)

	<-stopCh
}

The output is similar to:

----- 9-shared-informer-factory -----
Start syncing....
created: demo1
created: demo
deleted: demo
created: demo


The structure of the code above should be very clear and does not require much explanation. Notably, NewSharedInformerFactoryWithOptions uses the functional options pattern:

// SharedInformerOption defines the functional option type for SharedInformerFactory.
type SharedInformerOption func(*sharedInformerFactory) *sharedInformerFactory

func NewSharedInformerFactoryWithOptions(
	client kubernetes.Interface,
	defaultResync time.Duration,
	options ...SharedInformerOption,
) SharedInformerFactory {}

The received options are executed in turn, adjusting the parameters of the SharedInformerFactory instance in this way.
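For illustration, an option like WithNamespace can be implemented roughly as follows (a simplified sketch of the idea, not the exact client-go source):

// WithNamespace limits the SharedInformerFactory to the specified namespace.
func WithNamespace(namespace string) SharedInformerOption {
	return func(factory *sharedInformerFactory) *sharedInformerFactory {
		factory.namespace = namespace
		return factory
	}
}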

If you don't need to adjust anything, you can use the default constructor:

func NewSharedInformerFactory(
	client kubernetes.Interface,
	defaultResync time.Duration,
) SharedInformerFactory {}

👨‍🔧 The default is to watch all namespaces.

Using SharedInformerFactory, we can easily build Informer instances of various resources, such as:

configMapsInformer := informerFactory.Core().V1().ConfigMaps().Informer()

It is important to note that the Informer built for each resource type is cached: repeated calls to Informer() for the same resource return the same Informer instance.
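This is easy to verify; a tiny sketch reusing the informerFactory from the example above:

a := informerFactory.Core().V1().ConfigMaps().Informer()
b := informerFactory.Core().V1().ConfigMaps().Informer()
fmt.Println(a == b) // true: the factory returns the cached instance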

In addition to building Informer, SharedInformerFactory also supports building Lister, for example:

configMapLister := informerFactory.Core().V1().ConfigMaps().Lister()

The Lister returned is specific to the resource type, so no type conversion is needed, which makes it much easier to use than a raw Indexer. Take ConfigMapLister as an example:

// ConfigMapLister helps list ConfigMaps.
// All objects returned here must be treated as read-only.
type ConfigMapLister interface {
	// List lists all ConfigMaps in the indexer.
	// Objects returned here must be treated as read-only.
	List(selector labels.Selector) (ret []*v1.ConfigMap, err error)
	// ConfigMaps returns an object that can list and get ConfigMaps.
	ConfigMaps(namespace string) ConfigMapNamespaceLister
}

// ConfigMapNamespaceLister helps list and get ConfigMaps.
// All objects returned here must be treated as read-only.
type ConfigMapNamespaceLister interface {
	// List lists all ConfigMaps in the indexer for a given namespace.
	// Objects returned here must be treated as read-only.
	List(selector labels.Selector) (ret []*v1.ConfigMap, err error)
	// Get retrieves the ConfigMap from the indexer for a given namespace and name.
	// Objects returned here must be treated as read-only.
	Get(name string) (*v1.ConfigMap, error)
}
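A short usage sketch, assuming the informerFactory from the earlier example has been started and its caches have synced (labels comes from k8s.io/apimachinery/pkg/labels):

configMapLister := informerFactory.Core().V1().ConfigMaps().Lister()

// List all ConfigMaps in the "tmp" namespace from the local cache (no API call)
configMaps, err := configMapLister.ConfigMaps("tmp").List(labels.Everything())
if err != nil {
	panic(err)
}
for _, cm := range configMaps {
	fmt.Println(cm.Name) // already *corev1.ConfigMap, no type assertion needed
}

// Get a single ConfigMap by name
demo, err := configMapLister.ConfigMaps("tmp").Get("demo")
if err != nil {
	panic(err) // a NotFound error if it is not in the cache
}
fmt.Println(demo.ResourceVersion)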

In addition to all the methods for a specific resource type, SharedInformerFactory exposes the following four methods:

	InformerFor(obj runtime.Object, newFunc NewInformerFunc) cache.SharedIndexInformer
	ForResource(resource schema.GroupVersionResource) (GenericInformer, error)
	Start(stopCh <-chan struct{})
	WaitForCacheSync(stopCh <-chan struct{}) map[reflect.Type]bool

InformerFor is used to get the Informer for a specified resource type. Attention! Informers built with this method are also cached, so a later call for the same resource type will not invoke newFunc again, which in turn affects the Informer obtained through the default accessors. In most cases it is only used inside the informers package itself; again taking ConfigMap as an example:

func (f *configMapInformer) Informer() cache.SharedIndexInformer {
	return f.factory.InformerFor(&corev1.ConfigMap{}, f.defaultInformer)
}
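If you do want to register a non-default Informer for a resource type, for example one with an extra index, you can call InformerFor yourself with a custom newFunc. A sketch, assuming the informerFactory from the earlier example and the NewConfigMapInformer constructor from k8s.io/client-go/informers/core/v1 (imported here as informerscorev1); keep the caching caveat above in mind: every later request for a ConfigMap Informer from this factory gets this instance:

customInformer := informerFactory.InformerFor(&corev1.ConfigMap{},
	func(client kubernetes.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer {
		return informerscorev1.NewConfigMapInformer(client, "tmp", resyncPeriod, cache.Indexers{
			// Keep the standard namespace index so Listers still work as expected
			cache.NamespaceIndex: cache.MetaNamespaceIndexFunc,
			// Extra index: look ConfigMaps up by their "app" label
			"by-app": func(obj interface{}) ([]string, error) {
				cm, ok := obj.(*corev1.ConfigMap)
				if !ok {
					return nil, nil
				}
				return []string{cm.Labels["app"]}, nil
			},
		})
	})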

ForResource builds an Informer dynamically based on a schema.GroupVersionResource, but the Informer and Lister it returns are generic types, which are not as convenient to use as the type-specific ones. From the implementation point of view it is just a big switch statement that constructs the Informer and Lister of the specific resource type, and it does not support third-party CRDs. Presumably it was meant to provide something like the dynamic package, but the completeness is low, and the reason for its existence is unclear for now 🕵️‍♂️
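For reference, here is roughly how ForResource is used (a sketch assuming the informerFactory from the examples above and schema from k8s.io/apimachinery/pkg/runtime/schema); note that the returned Informer and Lister are the generic cache.SharedIndexInformer and cache.GenericLister:

gvr := schema.GroupVersionResource{Group: "", Version: "v1", Resource: "configmaps"}
genericInformer, err := informerFactory.ForResource(gvr)
if err != nil {
	panic(err) // unknown resource types return an error
}

// Lister() returns a cache.GenericLister, so objects come back as runtime.Object
// and need a type assertion before use.
obj, err := genericInformer.Lister().ByNamespace("tmp").Get("demo")
if err != nil {
	panic(err)
}
configMap := obj.(*corev1.ConfigMap)
fmt.Println(configMap.Name)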

That leaves Start and WaitForCacheSync, which, as you might guess, start all the Informers at once and wait for all of them to finish their first synchronization.
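The typical call sequence looks like this (a sketch; the SlothController below does the same thing in its Run method):

stopCh := make(chan struct{})
defer close(stopCh)

// Start is non-blocking: it launches one goroutine per Informer created so far
informerFactory.Start(stopCh)

// Block until every Informer has completed its initial List
for informerType, synced := range informerFactory.WaitForCacheSync(stopCh) {
	if !synced {
		panic(fmt.Sprintf("failed to sync cache for %v", informerType))
	}
}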

Informer best practices

The SharedInformerFactory demo code above works, but there are a few problems:

(1) Event handling is a synchronous operation and may block (as we saw in the cache package source analysis in the previous part, Informer calls handlers synchronously after receiving an event);

(2) There is no retry mechanism when event handling fails;

(3) Shutdown is not graceful, which may leave things in an inconsistent state;

(4) resyncPeriod is set to 0, so inconsistencies with the server may accumulate while the program runs.

Problems (1), (2) and (3) can be solved by introducing a workqueue. For (4), kube-controller-manager (--min-resync-period) uses a resynchronization interval of 12h-24h (randomized within that range so that all resource types do not resync at the same moment and cause a load spike), which can be taken as a reasonable value.
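If you want the same behavior, you can randomize the interval yourself before constructing the factory. A sketch modeled on kube-controller-manager's approach, using math/rand (the 12h base corresponds to its --min-resync-period default):

// Pick a resync period uniformly in [minResyncPeriod, 2*minResyncPeriod)
minResyncPeriod := 12 * time.Hour
factor := rand.Float64() + 1 // 1.0 <= factor < 2.0
resyncPeriod := time.Duration(float64(minResyncPeriod.Nanoseconds()) * factor)

informerFactory := informers.NewSharedInformerFactory(clientset, resyncPeriod)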

client-go ships with a workqueue implementation that is used extensively in kube-controller-manager, so we use it directly.

With the above issues considered and an implementation approach settled, I designed a fairly complete controller: the SlothController 🙉:

Sloths report updates to the log when they receive them, but they must sleep before each task, and there is a chance they will oversleep and fail the task. If they oversleep and fail a task too many times, the sloths skip that task. You can't interrupt a sloth's sleep: after receiving an interrupt signal, a sloth must finish the task at hand (even if the result is failure) before quitting.

It is recommended to start with the main function:

package main

import (
	"flag"
	"fmt"
	"github.com/spongeprojects/magicconch"
	"k8s.io/apimachinery/pkg/util/rand"
	"k8s.io/apimachinery/pkg/util/runtime"
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes"
	listerscorev1 "k8s.io/client-go/listers/core/v1"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/tools/clientcmd"
	"k8s.io/client-go/util/workqueue"
	"k8s.io/klog/v2"
	"os"
	"os/signal"
	"sync"
	"time"
)

// SlothController, the sloth controller!
type SlothController struct {
	factory           informers.SharedInformerFactory
	configMapLister   listerscorev1.ConfigMapLister
	configMapInformer cache.SharedIndexInformer
	queue             workqueue.RateLimitingInterface
	processingItems   *sync.WaitGroup

	// maxRetries is how many times a sloth retries a task before giving up
	maxRetries int
	// chanceOfFailure is the chance (%) that a sloth fails a task
	chanceOfFailure int
	// nap is how long a sloth sleeps before each task
	nap time.Duration
}

func NewController(
	factory informers.SharedInformerFactory,
	queue workqueue.RateLimitingInterface,
	chanceOfFailure int,
	nap time.Duration,
) *SlothController {
	configMapLister := factory.Core().V1().ConfigMaps().Lister()
	configMapInformer := factory.Core().V1().ConfigMaps().Informer()
	configMapInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
		UpdateFunc: func(old interface{}, new interface{}) {
			key, err := cache.MetaNamespaceKeyFunc(new)
			if err == nil {
				// Simply put the key into the queue
				klog.Infof("[%s] received", key)
				queue.Add(key)
			}
		},
	})

	return &SlothController{
		factory:           factory,
		configMapLister:   configMapLister,
		configMapInformer: configMapInformer,
		queue:             queue,
		maxRetries:        3,
		chanceOfFailure:   chanceOfFailure,
		nap:               nap,
		processingItems:   &sync.WaitGroup{},
	}
}

// Run starts running the controller until an error occurs or stopCh is closed
func (c *SlothController) Run(sloths int, stopCh chan struct{}) error {
	// runtime.HandleCrash is the panic recovery helper provided by Kubernetes;
	// it provides a unified entry point for panic recovery and,
	// by default, only logs the panic.
	defer runtime.HandleCrash()
	// Close the queue before exiting to stop the sloths from taking on new tasks
	defer c.queue.ShutDown()

	klog.Info("SlothController starting...")

	go c.factory.Start(stopCh)

	// Wait for the first synchronization to complete
	for _, ok := range c.factory.WaitForCacheSync(stopCh) {
		if !ok {
			return fmt.Errorf("timed out waiting for caches to sync")
		}
	}

	for i := 0; i < sloths; i++ {
		go wait.Until(c.runWorker, time.Second, stopCh)
	}

	// Wait for stopCh to close
	<-stopCh

	// Wait for tasks in progress to complete
	klog.Info("waiting for processing items to finish...")
	c.processingItems.Wait()

	return nil
}

func (c *SlothController) runWorker() {
	for c.processNextItem() {
	}
}

// processNextItem waits for and processes new tasks in the queue
func (c *SlothController) processNextItem() bool {
	// Block, waiting for a new task...
	key, shutdown := c.queue.Get()
	if shutdown {
		return false // The queue has entered the exit state, do not continue processing
	}

	c.processingItems.Add(1)

	// Mark the task as done when finished, whether it succeeded or not:
	// even with multiple sloths, tasks with the same key are never processed in parallel,
	// so if we didn't call Done, later events for the same key would be blocked.
	defer c.queue.Done(key)

	result := c.processItem(key)
	c.handleErr(key, result)

	c.processingItems.Done()

	return true
}

// processItem is used to process a task synchronously
func (c *SlothController) processItem(key interface{}) error {
	// Process tasks slowly because sloths are lazy
	time.Sleep(c.nap)

	if rand.Intn(100) < c.chanceOfFailure {
		// Overslept!
		return fmt.Errorf("zzz... ")
	}

	klog.Infof("[%s] processed.", key)
	return nil
}

// handleErr is used to check the result of task processing and retry if necessary
func (c *SlothController) handleErr(key interface{}, result error) {
	if result == nil {
		// Clears retry records after each successful execution.
		c.queue.Forget(key)
		return
	}

	if c.queue.NumRequeues(key) < c.maxRetries {
		klog.Warningf("error processing %s: %v", key, result)
		// The execution fails. Try again
		c.queue.AddRateLimited(key)
		return
	}

	// Too many retries: record the error in the log and drop the task.
	// Also, don't forget to clear the retry record.
	c.queue.Forget(key)
	// runtime.HandleError is the error-handling helper provided by Kubernetes;
	// it provides a unified entry point for error handling.
	runtime.HandleError(fmt.Errorf("max retries exceeded, "+
		"dropping item %s out of the queue: %v", key, result))
}

func main() {
	fmt.Println("----- 10-sloth-controller -----")

	var sloths int
	var chanceOfFailure int
	var napInSecond int
	flag.IntVar(&sloths, "sloths", 1, "number of sloths")
	flag.IntVar(&chanceOfFailure, "chance-of-failure", 0, "chance of failure in percentage")
	flag.IntVar(&napInSecond, "nap", 0, "how long should the sloth nap (in seconds)")
	flag.Parse()

	kubeconfig := os.Getenv("KUBECONFIG")
	config, err := clientcmd.BuildConfigFromFlags("", kubeconfig)
	magicconch.Must(err)

	clientset, err := kubernetes.NewForConfig(config)
	magicconch.Must(err)

	// Create the SharedInformerFactory
	defaultResyncPeriod := time.Hour * 12
	informerFactory := informers.NewSharedInformerFactoryWithOptions(
		clientset, defaultResyncPeriod, informers.WithNamespace("tmp"))

	// Create the RateLimitingQueue with the default configuration. This queue supports retries and records the number of retries
	rateLimiter := workqueue.DefaultControllerRateLimiter()
	queue := workqueue.NewRateLimitingQueue(rateLimiter)

	controller := NewController(informerFactory, queue, chanceOfFailure,
		time.Duration(napInSecond)*time.Second)

	stopCh := make(chan struct{})

	// Respond to interrupt signal (Ctrl+C)
	interrupted := make(chan os.Signal, 1) // buffered, as signal.Notify requires
	signal.Notify(interrupted, os.Interrupt)

	go func() {
		<-interrupted // Turn off stopCh when the first interrupt signal is received
		close(stopCh)
		<-interrupted // Exit immediately after receiving the second interrupt signal
		os.Exit(1)
	}()

	if err := controller.Run(sloths, stopCh); err != nil {
		klog.Errorf("SlothController exit with error: %s", err)
	} else {
		klog.Info("SlothController exit")
	}
}

Here is a demonstration of how the sloth controller works. First, verify that one sloth can work properly without sleeping:

❯ sloth-controller --sloths=1 --chance-of-failure=0 --nap=0
I0508 00:45:41.142394   49248 main.go:79] SlothController starting...
I0508 00:45:46.813814   49248 main.go:52] [tmp/demo] received
I0508 00:45:46.813852   49248 main.go:141] [tmp/demo] processed.
I0508 00:45:47.333674   49248 main.go:52] [tmp/demo1] received

You can see that without sleep the task was handled very quickly, within microseconds.

Then add some sleepiness genes to the sloths (--nap=5):

❯ sloth-controller --sloths=1 --chance-of-failure=0 --nap=5
I0508 00:49:49.672573   49494 main.go:79] SlothController starting...
I0508 00:50:02.637114   49494 main.go:52] [tmp/demo] received
I0508 00:50:03.123127   49494 main.go:52] [tmp/demo1] received
I0508 00:50:07.637208   49494 main.go:141] [tmp/demo] processed.

Notice that processing now takes longer and the tasks queue up: with only one sloth it takes more than 10 seconds in total.

Add more sloths (--sloths=3):

❯ sloth-controller --sloths=3 --chance-of-failure=0 --nap=5
I0508 00:51:18.519972   49662 main.go:79] SlothController starting...
I0508 00:51:22.299195   49662 main.go:52] [tmp/demo] received
I0508 00:51:22.827831   49662 main.go:52] [tmp/demo1] received
I0508 00:51:27.302323   49662 main.go:141] [tmp/demo] processed.

Now tasks are processed more efficiently: they are handled in parallel.

Let’s try interrupting the sloth controller:

❯ sloth-controller --sloths=3 --chance-of-failure=0 --nap=5
I0508 00:52:39.233788   49771 main.go:79] SlothController starting...
I0508 00:52:47.551730   49771 main.go:52] [tmp/demo] received
I0508 00:52:48.184259   49771 main.go:52] [tmp/demo1] received
^C
I0508 00:52:48.817727   49771 main.go:98] waiting for processing items to finish...
I0508 00:52:52.556681   49771 main.go:141] [tmp/demo] processed.
I0508 00:52:53.188194   49771 main.go:141] [tmp/demo1] processed.

Notice that after the interrupt is issued, the program waits for the two in-flight tasks to complete before exiting, achieving a graceful shutdown.

The sloths were then given some more potent sleepiness genes, making them more likely to fail tasks (--chance-of-failure=50):

❯ sloth-controller --sloths=3 --chance-of-failure=50 --nap=5
I0508 00:58:25.991831   50040 main.go:79] SlothController starting...
I0508 00:58:29.627866   50040 main.go:52] [tmp/demo] received
W0508 00:58:34.630694   50040 main.go:154] error processing tmp/demo: zzz...
I0508 00:58:39.637953   50040 main.go:141] [tmp/demo] processed.

As expected, a task fails; the failed task is retried and eventually succeeds, though it takes twice as long...

Finally, we added some extremely potent sleepiness genes (--chance-of-failure=99) 😰

❯ sloth-controller --sloths=3 --chance-of-failure=99 --nap=5
I0508 01:00:58.172928   50221 main.go:79] SlothController starting...
I0508 01:01:12.565151   50221 main.go:52] [tmp/demo] received
W0508 01:01:17.565633   50221 main.go:154] error processing tmp/demo: zzz...
W0508 01:01:22.574243   50221 main.go:154] error processing tmp/demo: zzz...
W0508 01:01:27.588450   50221 main.go:154] error processing tmp/demo: zzz...
E0508 01:01:32.613327   50221 main.go:164] max retries exceeded, dropping item tmp/demo out of the queue: zzz...

This time, after exceeding the maximum number of retries, the task is dropped and no longer retried, but the controller itself is not affected.
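The retry delays in this demo come from the rate limiter passed to the queue in main (workqueue.DefaultControllerRateLimiter()). If the default backoff does not suit your workload you can swap it out; a sketch using another constructor from the workqueue package (the 1s/30s values are arbitrary choices for illustration):

// Per-item exponential backoff: 1s, 2s, 4s, ... capped at 30s between retries
rateLimiter := workqueue.NewItemExponentialFailureRateLimiter(time.Second, 30*time.Second)
queue := workqueue.NewRateLimitingQueue(rateLimiter)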

As you can see from the demonstrations above, the SlothController does everything mentioned earlier and is a very reliable controller (provided the sloths don't sleep, of course).

👩‍⚕️ No animals were harmed during the experiment.

Conclusion

Above, we analyzed the source code of the informers package and demonstrated best practices for using Informer in real-world code.

In the following parts, I will continue to analyze Informer-related source code and introduce the implementation of custom resource (CRD) Informers and dynamic Informers.

Next part: Implementing custom resource (CRD) Informer.


[2/4]: informers package source code analysis and Informer best practices in real-world use
