Background

Main features

In Kubernetes, every create, delete, update, get/list, and watch operation on data goes through the API server. To reduce the load on etcd, Kubernetes abstracts a Cacher struct: all etcd events are distributed and accessed through this object, which wraps the etcd client as a storage backend shared among multiple stores.

Functional breakdown

  • Store: provides the data operation interfaces — create, delete, update, get/list, and watch
  • Watcher: listens for changes to the data in a Store and processes them

Note that Watcher and Store are general abstractions over data operations: there are Store and Watcher implementations for etcd, for caches, and for REST.

Understanding the model

  • Store: provides a unified interface that hides how the underlying layer stores the data
  • Watcher: gets event changes from the Store and runs custom processing on them
  • Client: to add a new feature, a client only needs to start a new Watcher that follows the data changes it cares about and implements its custom logic. This decouples the three layers: data storage, the framework (Store), and the business logic.

Detailed call diagram

Code implementation

Key data structures

  • Cacher interacts with the backend etcd Store and distributes events to all watchers
// Cacher (article excerpt) serves reads and watches from the in-memory
// watchCache, which a Reflector keeps filled from the backing storage, and
// fans events out to the registered watchers.
type Cacher struct {

	// incomingHWM is the high-water mark of the incoming queue's length.
	incomingHWM HighWateMark
	// incoming queues events until dispatchEvents delivers them to watchers.
	incoming chan watchCacheEvent

	// The embedded lock guards watchers and watcherIdx.
	sync.RWMutex

	// ready must be true before the cache may serve requests.
	ready *ready

	// storage is the backing storage this cacher wraps.
	storage Store

	// objectType is the type of the objects held by this cacher.
	objectType reflect.Type

	// watchCache is the in-memory cache; reflector fills it from storage.
	watchCache watchCache
	reflector  *Reflector

	versioner Versioner

	// triggerFunc computes routing values for events; watcherIdx is the index
	// given to the next watcher; watchers holds the registered watchers.
	triggerFunc TriggerPublisherFunc
	watcherIdx  int
	watchers    indexedWatchers

	// dispatchTimeoutBudge is passed to each watcher.add call during dispatch.
	dispatchTimeoutBudge *timeBudget

	// Shutdown coordination: stopCh is closed to stop the background
	// goroutines, and stopWg tracks the caching goroutine.
	stopLock sync.RWMutex
	stopped  bool
	stopCh   chan struct{}
	stopWg   sync.WaitGroup
}
Copy the code
  • cacheWatcher receives the events sent by Cacher and passes them on to the REST (WebSocket) watch interface
// cacheWatcher (article excerpt) is one client's watch on the cacher. The
// embedded mutex guards stopped; input is buffered per watcher, done is
// closed on stop, filter selects the objects of interest, and forget removes
// the watcher from the cacher's index.
type cacheWatcher struct {
	sync.Mutex
	input     chan *watchCacheEvent
	result    chan Event
	done      chan struct{}
	filter    filterWithAttrsFunc
	stopped   bool
	forget     func(bool)
	versioner Versioner
}
Copy the code
  • Reflector watches the backend, transforms the data, and sends events to watchCache (the watchCache is passed to the Reflector as its store).
/ / Reflector reflection
type Reflector struct {
	name string

	expectedType reflect.Type

	store Store

	listerWatcher ListerWatcher
}
Copy the code

Key method implementations

func NewCacherFromConfig(config Config) *Cacher {
    // First generate a watchCache
	watchCache := newWatchCache(config.CacheCapacity, config.KeyFunc, config.GetAttrsFunc, config.Versioner)
	listerWatcher := newCacherListerWatcher(config.Storage, config.ResourcePrefix, config.NewListFunc)
	reflectorName := "storage/cacher.go:" + config.ResourcePrefix

	stopCh := make(chan struct{})
	cacher := &Cacher{
		ready:       newReady(),
		storage:     config.Storage,
		objectType:  reflect.TypeOf(config.Type),
		watchCache:  watchCache,
            // watchCache will be passed by the Party Store to the back end Reflector, which will convert the etCD data into an event after reflector gets the data
		reflector:   cache.NewNamedReflector(reflectorName, listerWatcher, config.Type, watchCache, 0),
		versioner:   config.Versioner,

	}
        // watchCache sets the SetOnEvent method to cacher's processEvent. All watchCache events are handled by the watchCache method
	watchCache.SetOnEvent(cacher.processEvent)
	go cacher.dispatchEvents()

	cacher.stopWg.Add(1)
	go func(a) {
		defer cacher.stopWg.Done()
		wait.Until(
			func(a) {
				if! cacher.isStopped() { cacher.startCaching(stopCh) } }, time.Second, stopCh, ) }()return cacher
}
Copy the code
  • processEvent puts events into the cacher's incoming queue
// processEvent queues event on c.incoming for dispatchEvents to fan out.
// Whenever the queue length reaches a new high-water mark it is logged. The
// send blocks while the channel buffer is full.
func (c *Cacher) processEvent(event *watchCacheEvent) {
	if curLen := int64(len(c.incoming)); c.incomingHWM.Update(curLen) {
		// Printf, not Println: the message contains format verbs.
		fmt.Printf("cacher %v: %v objects queued in incoming channel\n", c.objectType.String(), curLen)
	}
	c.incoming <- *event
}
Copy the code
  • dispatchEvents/dispatchEvent pass the queued events on to the front-end watchers
func (c *Cacher) dispatchEvents(a) {
	for {
		select {
		case event, ok := <-c.incoming:
			if! ok {return
			}
			c.dispatchEvent(&event)
		case <-c.stopCh:
			return}}}func (c *Cacher) dispatchEvent(event *watchCacheEvent) {
	triggerValues, supported := c.triggerValues(event)

	c.Lock()
	defer c.Unlock()

	for _, watcher := range c.watchers.allWatchers {
		watcher.add(event, d.dispatchTimeoutBudge)
	}
	if supported {
		for _, triggerValue := range triggerValues {
			for _, watcher := range c.watchers.valueWatchers[triggerValue] {
				watcher.add(event, d.dispatchTimeoutBudge)
			}
		}
	} else {
		for _, watchers := range c.watchers.valueWatchers {
			for _, watcher := range watchers {
				watcher.add(event, c.dispatchTimeoutBudge)
			}
		}
	}
}
Copy the code

The complete code

package cacher

import (
	"context"
	"fmt"
	"reflect"
	"sync"
	"sync/atomic"
	"time"

	"k8s.io/apimachinery/pkg/api/meta"
	"k8s.io/apimachinery/pkg/conversion"
)

// HighWateMark records the largest value ever passed to Update. It is safe
// for concurrent use because it is only accessed through sync/atomic.
type HighWateMark int64

// Update atomically raises the mark to current if current exceeds it, and
// reports whether the mark was raised.
func (hwm *HighWateMark) Update(current int64) bool {
	for {
		old := atomic.LoadInt64((*int64)(hwm))
		if current <= old {
			return false
		}
		// The CAS fails if another goroutine changed the mark after the load;
		// loop and re-check against the new value.
		if atomic.CompareAndSwapInt64((*int64)(hwm), old, current) {
			return true
		}
	}
}
// ready is a boolean flag that goroutines can block on until it becomes true.
type ready struct {
	ok bool
	c  *sync.Cond
}

// newReady returns a ready flag that starts out false.
func newReady() *ready {
	return &ready{c: sync.NewCond(&sync.Mutex{})}
}

// wait blocks until the flag is true.
func (r *ready) wait() {
	r.c.L.Lock()
	defer r.c.L.Unlock()
	// Re-check after every wake-up: set broadcasts on false as well as true.
	for !r.ok {
		r.c.Wait()
	}
}

// check reports the current state without blocking.
func (r *ready) check() bool {
	r.c.L.Lock()
	defer r.c.L.Unlock()
	return r.ok
}

// set changes the state and wakes every goroutine blocked in wait.
func (r *ready) set(ok bool) {
	r.c.L.Lock()
	defer r.c.L.Unlock()
	r.ok = ok
	r.c.Broadcast()
}

// TypeMeta carries the kind and API version of a request or object.
type TypeMeta struct {
	Kind       string `json:"kind,omitempty" protobuf:"bytes,1,opt,name=kind"`
	APIVersion string `json:"apiVersion,omitempty" protobuf:"bytes,2,opt,name=apiVersion"`
}

// ListOption holds the parameters of a list or watch request.
type ListOption struct {
	// The tag must not contain a space after the colon; reflect.StructTag.Get
	// does not recognize `json: ",inline"`.
	TypeMeta `json:",inline"`

	LabelSelector string
	FieldSelector string
	// IncludeUninitialized selects whether uninitialized resources are included.
	IncludeUninitialized bool
	// Watch requests add/update/delete event notifications instead of a list.
	Watch           bool
	ResourceVersion string
	TimeoutSecond   *int64
	Limit           int64
}

// ListerWatcher is the source a Reflector lists from and then watches.
type ListerWatcher interface {
	// List returns the objects selected by option.
	List(option ListOption) (Object, error)
	// Watch starts a watch configured by option.
	Watch(option ListOption) (Interface, error)
}

/ / Reflector reflection
type Reflector struct {
	name string

	expectedType reflect.Type

	store Store

	listerWatcher ListerWatcher
}

// ListAndWatch is meant to list the latest state and then watch for changes
// until stopCh is closed. NOTE(review): in this article it is a stub that
// returns nil immediately without listing or watching anything.
func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
	return nil
}

/ / MatchValue criticism
type MatchValue struct {
	IndexName string
	Value     string
}

// TriggerPublisherFunc gets the matching data
type TriggerPublisherFunc func(obj Object) []MatchValue

//
type filterWithAttrsFunc func(key string, l Set, f Set, uninitializer bool) bool

// cacheWatcherTo achieve thewatchinterfacetype cacheWatcher struct {
	sync.Mutex
	input     chan *watchCacheEvent
	result    chan Event
	done      chan struct{}
	filter    filterWithAttrsFunc
	stopped   bool
	forget     func(bool)
	versioner Versioner
}

// Stop removes the watcher from the cacher's index via forget(true), which
// takes the cacher lock when forget comes from forgetWatcher. It then shuts
// the watcher down. Callers must therefore not hold the cacher lock.
func (c *cacheWatcher) Stop() {
	c.forget(true)
	c.stop()
}

// stop closes done and input exactly once; later calls are no-ops.
func (c *cacheWatcher) stop() {
	c.Lock()
	defer c.Unlock()
	// Only the first call may close the channels: closing a closed channel
	// panics, and stopped starts out false.
	if !c.stopped {
		c.stopped = true
		close(c.done)
		close(c.input)
	}
}

func newCacheWatcher(resourceVersion uint64, chanSize int, initEvents []*watchCacheEvent, filter filterWithAttrsFunc, forget func(bool).versioner Versioner) *cacheWatcher {
	watcher := &cacheWatcher{
		input: 		make(chan *watchCacheEvent, chanSize),
		result: 	make(chan Event, chanSize),
		done: 		make(chan struct{}),
		filter: 	filter,
		stopped: 	false,
		forget: 	forget
	}
	go watcher.process(initEvents, resourceVersion)
	return watcher
}

// watchersMap holds watchers keyed by their cacher-assigned index.
type watchersMap map[int]*cacheWatcher

// addWatcher registers w under number. wm must be non-nil.
func (wm watchersMap) addWatcher(w *cacheWatcher, number int) {
	wm[number] = w
}

// deleteWatcher removes the watcher registered under number, if any.
func (wm watchersMap) deleteWatcher(number int) {
	delete(wm, number)
}

// terminateAll removes and stops every watcher in wm.
//
// It calls stop rather than Stop. Stop first calls forget(true), and the
// forget func built by forgetWatcher takes the cacher lock. Callers such as
// Cacher.terminateAllWatchers already hold that lock, and sync.RWMutex is not
// reentrant, so Stop would deadlock here.
func (wm watchersMap) terminateAll() {
	for key, watcher := range wm {
		delete(wm, key)
		watcher.stop()
	}
}

// indexedWatchers holds the cacher's registered watchers. allWatchers holds
// watchers without a trigger value and receives every event. valueWatchers
// groups watchers by trigger value so that they get only the events routed
// to that value (or all events when no trigger values can be computed).
type indexedWatchers struct {
	allWatchers   watchersMap
	valueWatchers map[string]watchersMap
}

// terminateAll stops and removes every watcher, indexed or not. It logs once
// if there was anything to terminate.
func (i *indexedWatchers) terminateAll(objectType reflect.Type) {
	if len(i.allWatchers) > 0 || len(i.valueWatchers) > 0 {
		// Printf, not Println: the message contains a format verb.
		fmt.Printf("Terminating all watchers from cacher %v\n", objectType)
	}
	i.allWatchers.terminateAll()
	for index, watchers := range i.valueWatchers {
		watchers.terminateAll()
		delete(i.valueWatchers, index)
	}
}

// deleteWatcher removes watcher number from the index it was registered in:
// the per-value map for value when supported is true, otherwise allWatchers.
// A per-value map that becomes empty is dropped.
func (i *indexedWatchers) deleteWatcher(number int, value string, supported bool) {
	if !supported {
		i.allWatchers.deleteWatcher(number)
		return
	}
	bucket := i.valueWatchers[value]
	bucket.deleteWatcher(number)
	if len(bucket) == 0 {
		delete(i.valueWatchers, value)
	}
}

func (i *indexedWatchers) addWatcher(w *cacheWatcher, number int, value string, supported bool) {
	if supported {
		if_, ok := i.valueWatchers[value]; ! { i.valueWatchers[value] = watchersMap{} } i.valueWatchers[value].addWatcher(w, number) }else {
		i.allWatchers.addWatcher(w, number)
	}
}

// timeBudget is shared across dispatches (Cacher.dispatchTimeoutBudge) and is
// passed to each watcher.add call. NOTE(review): the fields suggest a budget
// that is refreshed periodically by refresh and capped at maxBudget; the code
// that uses it is not in this file — confirm.
type timeBudget struct {
	sync.Mutex
	budget time.Duration

	refresh   time.Duration
	maxBudget time.Duration
}

// Labels is read-only access to a set of labels.
type Labels interface {
	// Hash reports whether label is present.
	// NOTE(review): upstream Kubernetes calls this method Has; the name is
	// kept so existing implementations still satisfy the interface.
	Hash(label string) (exists bool)
	// Get returns the value of label.
	Get(label string) (value string)
}

// Selector matches label (or field) sets. NOTE(review): Matchs is presumably
// a misspelling of Matches; it is kept for compatibility.
type Selector interface {
	Matchs(Labels) bool
	Empty() bool
	String() string
	// RequiresExactMatch reports the value field is pinned to, if any.
	RequiresExactMatch(field string) (value string, found bool)
	DeepCopySelector() Selector
}

AttrFunc gets the Label and Field collection of the object
type AttrFunc func(obj Object) (Set, Set, bool, error)

// SelectPredicateThe representation of an objecttype SelectionPredicate struct {
	Label                Selector
	Fielld               Selector
	IncludeUninitialized bool
	GetAttrs             AttrFunc
	InedxFields          []string
	Limit                int64
	Continue             string
}

func (s *SelectionPredicate) MatcherIndex(a) []MatchValue {
	var result []MatchValue
	for , field := range s.InedxFields {
		if value, ok := s.Fielld.RequiresExactMatch(field); ok {
			result = append(result, MatchValue{IndexName: field, Value: value})
		}
	}
	return result
}

// Feature is the name of a feature gate.
type Feature string

// FeatureGate reports whether a feature is enabled.
// NOTE(review): not used by the code in this file.
type FeatureGate interface {
	Enabled(key Feature) bool
}

// UID is an object's unique identifier.
type UID string

// Preconditions are passed to storage on Create and Delete. A non-nil UID
// presumably has to match the stored object's UID — confirm against the
// storage implementation.
type Preconditions struct {
	UID *UID
}

// StatusError is an error that carries a full API status object.
type StatusError struct {
	ErrStatus metav1.Status
}

// Error returns the status message so that StatusError satisfies the error
// interface. A value receiver lets both StatusError and *StatusError match
// in type switches on error values.
func (e StatusError) Error() string {
	return e.ErrStatus.Message
}

// errWatcher reports a watch error to the client through result; it is
// built from an error by newErrWatcher.
type errWatcher struct {
	result chan Event
}

// newErrWatcher returns a watcher that reports err to the client: its result
// channel yields exactly one Error event and is then closed.
func newErrWatcher(err error) *errWatcher {
	errEvent := Event{Type: Error}
	switch err := err.(type) {
	case Object:
		// The error is itself an API object; send it as is.
		errEvent.Object = err
	case StatusError:
		errEvent.Object = &err.ErrStatus
	default:
		errEvent.Object = &metav1.Status{
			Status:  metav1.StatusFailure,
			Message: err.Error(),
			Reason:  metav1.StatusReasonInternalError,
			Code:    http.StatusInternalServerError,
		}
	}
	// A buffer of one lets the event be queued without a reader. Closing the
	// channel lets a consumer that ranges over result stop after the event.
	watcher := &errWatcher{result: make(chan Event, 1)}
	watcher.result <- errEvent
	close(watcher.result)
	return watcher
}

// UpdateFunc is the tryUpdate callback that Cacher.GuaranteedUpdate passes
// through to storage. It receives the current object and response metadata
// and returns the updated object, an optional TTL, or an error.
type UpdateFunc func(input runtime.Object, res ResponseMeta) (output runtime.Object, ttl *uint64, err error)


// CacherThe cachetype Cacher struct {

	// Length performance indicator of the current incoming queue
	incomingHWM HighWateMark
	// Incoming event channel, which was sent to all Watchers
	incoming chan watchCacheEvent

	sync.RWMutex

	// The current cache ready state must be OK to be accessed
	ready *ready

	// Back-end storage data interface
	storage Store

	// Object type
	objectType reflect.Type

	watchCache watchCache
	reflector  *Reflector

	versioner Versioner

	triggerFunc TriggerPublisherFunc
	watcherIdx  int
	watchers    indexedWatchers

	dispatchTimeoutBudge *timeBudget

	stopLock sync.RWMutex
	stopped  bool
	stopCh   chan struct{}
	stopWg   sync.WaitGroup
}

// Versioner returns the versioner of the underlying storage.
func (c *Cacher) Versioner() Versioner {
	return c.storage.Versioner()
}

func (c *Cacher) Create(ctx context.Context, ket string, out Object preconditions *Preconditions) {
	c.storage.Create(ctx, key, out, preconditions)
}

// Delete passes the deletion straight through to the underlying storage and
// returns storage's error.
func (c *Cacher) Delete(ctx context.Context, key string, out Object, preconditions *Preconditions) error {
	return c.storage.Delete(ctx, key, out, preconditions)
}

func (c *Cacher) Watch(ctx context.Context, key string, resourceVersion string, pred SelectionPredicate) (Interface, error) {
	watchRV,, err := c.versioner.ParseResourceVersion(resourceVersion)
	iferr ! =nil {
		return nil, err
	}
	
	c.ready.wait()

	c.watchCache.RLock()
	defer c.watchCache.RUnlock()
	initEvents, err := c.watchCache.GetAllEventsSinceThreadUnsafe(watchRV)
	iferr ! =nil {
		return newErrWatcher(err), nil
	}

	triggerValue, triggerSupported := "".false
	if matchValues := pred.MatchIndex(); len(matchValues) > 0 {
		triggerValue, triggerSupported = matchValues[0].Value, true
	}

	chanSize := 10
	ifc.triggerFunc ! =nil && !triggerSupported {
		chanSize = 100
	}

	c.Lock()
	defer c.Unlock()
	forget := forgetWatcher(c, c.watcherIdx, triggerValue, triggerSupported)
	watcher := newCacheWatcher(watchRv, chanSize, initEvents, filterWithAttrsFunc(key, pred), forget, c.versioner)

	c.watchers.addWatcher(watcher, c.watcherIdx, triggerValue, triggerSupported)
	c.watcherIdx++
	return watcher, nil
}

// WatchList delegates to Watch with the same arguments.
func (c *Cacher) WatchList(ctx context.Context, key string, resourceVersion string, pred SelectionPredicate) (Interface, error) {
	return c.Watch(ctx, key, resourceVersion, pred)
}

func (c *Cacher) Get(ctx context.Context, key string, resourceVersion string, objPtr Object, ignoreNotFound bool) error {
	if resourceVersion == "" {
		return c.storage.Get(ctx, key, resourceVersion, objPtr, ignoreNotFound)
	}

	getRv, err := c.versioner.ParseResourceVersion(resourceVersion)
	iferr ! =nil {
		return nil, err
	}

	if getRv == 0 && !c.ready.check() {
		return c.storage.Get(ctx, key, resourceVersion, objPtr, ignoreNotFound)
	}

	c.ready.wait()

	objValue, err := conversion.EnforcePtr(objPtr)
	iferr ! =nil {
		return nil, err
	}

	obj, exists, resourceVersion, err := c.watchCache.WaitUntilFreshAndGet(getRv, key, nil)
	iferr ! =nil {
		return nil, err
	}

	if exists {
		elem, ok := obj.(*storeElement)
		if! ok {return fmt.Errorf("non *storeElement returned form storage : %v", obj)
		}
		objValue.Set(reflect.ValueOf(elem.Object).Elem())
	} else {
		objValue.Set(reflect.Zero(objValue.Type()))
		if! ignoreNotFound {return fmt.Errorf("key: %v resourversion: %v", objValue, getRv)
		}
	}
	return nil
}

func (c *Cacher) List(ctx context.Context, ket string, resourceVersion string, pred SelectionPredicate, listObj Object) error {
	if resourceVersion == "" {
		// In fact, there is information about the current win rate
		return c.storage.list(ctx, key, resourceVersion, pred, listObj)
	}

	listRV, err := c.versioner.ParseResourceVersion(resourceVersion)
	iferr ! =nil {
		return err
	}

	if listRV ==0 && !c.ready.check() {
		return c.storage.List(ctx, key, resourceVersion, pred, listObj)
	} 

	c.ready.wait()

	listPtr, err := conversion.EnforcePtr(listPtr)
	iferr ! =nil {
		return err
	}
	listVal, err := conversion.EnforcePtr(listPtr)
	iferr ! =nil|| listVal.Kind() ! = reflect.Slice {return fmt.Errorf("need a pointer to slice got %v", listVal.Kind())
	}

	filter := filterWithAttrsFunc(key, pred)

	objs, resourceVersion, err := c.watchCache.WaitUntilFreshAndGet(listRV)
	iferr ! =nil {
		return err
	}

	if len(objs) > listVal.Cap() && pred.Label.Empty() && pred.Fielld.Empty() {
		// If an object is found to exceed the size of the slice, a new one is generated
		listVal.Set(reflect.MakeSlice(reflect.SliceOf(c.objectType.Elem()), 0.len(objs)))
	}
	for _, obj := range objs {
		elem, ok := obj.(*storeElement)
		if! ok {return fmt.Errorf("non *storeElement returned from storage: %v", obj)
		}
		if filter(elem.Key, elem.Fields, elem.Labels, elem.Uninitialized) {
			// Reflection needs to be learned later
			listVal.Set(reflect.Append(listVal, reflect.ValueOf(elem.Object).Elem()))
		}
	}
	ifc.versioner ! =nil {
		if err := c.versioner.UpdateList(listObj, resourceVersion, ""); err ! =nil {
			return err
		}
	}
	return nil
}

func (c *Cacher) GuaranteedUpdate(
	ctx context.Context, key string, ptrToType Object, ignoreNotFound bool, preconditions * Preconditions, tryUpdate UpdateFunc, _... Object) error {
		ifelem, exists, err := c.watchCache.GetByKey(key); err ! =nil {
			fmt.Printf("GetByKey returned error: %v", err)
		} else if exists {
			currObj := elem.(*storeElement).Object.DeepCopyObject()
			return c.storage.GuaranteedUpdate(ctx, key, ptrToType, ignoreNotFound, preconditions, tryUpdate, currObj)
		}
		return c.storage.GuaranteedUpdate(ctx, key, ptrToType, ignoreNotFound, preconditions, tryUpdate)
}

// Count is not answered from the cache; it delegates the count for
// pathPrefix to the underlying storage.
func (c *Cacher) Count(pathPrefix string) (int64, error) {
	backend := c.storage
	return backend.Count(pathPrefix)
}

func (c *Cacher) triggerValues(event *watchCacheEvent))([]string.bool) {
	if c.triggerFunc == nil {
		return nil.false
	}

	result := make([]string.2)
	matchValues := c.triggerFunc(event.Object)
	if len(matchValues) > 0 {
		result = append(result, matchValues[0].Value)
	}
	if event.PrevObject == nil {
		return result, len(result) > 0
	}
	prevMatchValues := c.triggerFunc(event.PrevObject)
	if len(prevMatchValues) > 0 {
		if len(result) == 0 || result[0] != prevMatchValues[0].Value {
			result = append(result, prevMatchValues[0].Value)
		}
	}
	return result, len(result) > 0
}

// processEvent queues event on c.incoming for dispatchEvents to fan out.
// Whenever the queue length reaches a new high-water mark it is logged. The
// send blocks while the channel buffer is full.
func (c *Cacher) processEvent(event *watchCacheEvent) {
	if curLen := int64(len(c.incoming)); c.incomingHWM.Update(curLen) {
		// Printf, not Println: the message contains format verbs.
		fmt.Printf("cacher %v: %v objects queued in incoming channel\n", c.objectType.String(), curLen)
	}
	c.incoming <- *event
}

func (c *Cacher) dispatchEvents(a) {
	for {
		select {
		case event, ok := <-c.incoming:
			if! ok {return
			}
			c.dispatchEvent(&event)
		case <-c.stopCh:
			return}}}func (c *Cacher) dispatchEvent(event *watchCacheEvent) {
	triggerValues, supported := c.triggerValues(event)

	c.Lock()
	defer c.Unlock()

	for _, watcher := range c.watchers.allWatchers {
		watcher.add(event, d.dispatchTimeoutBudge)
	}
	if supported {
		for _, triggerValue := range triggerValues {
			for _, watcher := range c.watchers.valueWatchers[triggerValue] {
				watcher.add(event, d.dispatchTimeoutBudge)
			}
		}
	} else {
		for _, watchers := range c.watchers.valueWatchers {
			for _, watcher := range watchers {
				watcher.add(event, c.dispatchTimeoutBudge)
			}
		}
	}
}


func (c *Cacher) GetToList(ctx context.Context, key string, resourceVersion string, pred SelectionPredicate, listObj Object) error {

	if resourceVersion == ""| | -len(pred.Continue) > 0 || pred.Limit > 0) {
		// If resourceVersion is empty, obtain the corresponding data directly from the storage
		return c.storage.GetToList(ctx, key, resourceVersion, pred, listObj)
	}

	// Call versioner to parse the list version
	listRv, err := c.versioner.ParseResourceVersion(resourceVersion)
	iferr ! =nil {
		return err
	}

	// listRv is 0 and the cache has not been updated
	if listRv == 0 && !c.ready.check() {
		return c.storage.GetToList(ctx, key, resourceVersion, pred, listObj)
	}

	c.ready.wait()

	listPtr, err := meta.GetItemsPtr(listObj)
	iferr ! =nil {
		return err
	}
	listVal, err := conversion.EnforcePtr(listObj)
	iferr ! =nil|| listVal.Kind() ! = reflect.Slice {return fmt.Errorf("need a prointer to slice got %v", listVal.Kind())
	}
	filter := filterWithAttrsFunc(key, pred)

	// Get the corresponding objs from the back end based on the resource version converted above
	obj, exists, resourceVersion, err := c.watchCache.WaitUntilFreshAndGet(listRv, key)
	iferr ! =nil {
		return err
	}

	if exits {
		elem, ok := obj.(*storeElement)
		if! ok {return fmt.Errorf("non *storeElement returned from storage: %v", obj)
		}
		if filter(elem.Key, elem.Labels, elem.Fields, elem.Uninitialized) {
			listVal.Set(reflect.Append(listVal, reflect.ValueOf(elem.Object).Elem())
		}
		ifc.versioner ! =nil {
			if err := c.versioner.UpdateList(listObj, resourceVersion, ""); err ! =nil {
				return err
			}
		}
	}
	return nil
}


func (c *Cacher) startCaching(stopChannel <-chan struct{}) {
	successfulList := false
	c.watchCache.SetOnReplace(func(a) {
		successfulList = true
		c.ready.set(true)})defer func(a) {
		if successfulList {
			c.ready.set(false)
		}
	}()
	c.terminateAllWatchers()
	iferr := c.reflector.ListAndWatch(stopChannel); err ! =nil {
		fmt.Errorf("unexpected listAndWatch error: %v", err)
	}
}

// terminateAllWatchers stops and removes every registered watcher while
// holding the cacher lock.
func (c *Cacher) terminateAllWatchers() {
	c.Lock()
	defer c.Unlock()
	c.watchers.terminateAll(c.objectType)
}

// forgetWatcher returns the callback a cacheWatcher uses to remove itself
// from c's watcher index. With lock=true the callback takes c's lock itself.
// With lock=false it presumably runs with the lock already held, and the
// removal is logged as a forced close of an unresponsive watcher.
func forgetWatcher(c *Cacher, index int, triggerValue string, triggerSupported bool) func(bool) {
	return func(lock bool) {
		if lock {
			c.Lock()
			defer c.Unlock()
		} else {
			// fmt.Errorf only builds an error value; print the message instead.
			fmt.Printf("Forcing watcher close due to unresponsiveness: %v\n", c.objectType.String())
		}
		c.watchers.deleteWatcher(index, triggerValue, triggerSupported)
	}
}
Copy the code

Thoughts

In fact, the core of Storage is the upper-layer implementation of data-change notification, and a Watcher subscribes to data changes and relays them. Next I may pause here and look at the apiserver side, then at the controller and client-go parts — but that is for later. Good night.