Focus on the target

How does the Informer handle resource changes after it discovers them?

Contents

  1. Walk through the consumption process
  2. Understand the Index data structure
  3. See how information is distributed
  4. Put the whole Informer together

Process

func (c *controller) processLoop(a) {
	for {
    // Pop out the Object element
		obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process))
		iferr ! =nil {
			if err == ErrFIFOClosed {
				return
			}
			if c.config.RetryOnError {
				// re-queue
				c.config.Queue.AddIfNotPresent(obj)
			}
		}
	}
}

// Pop is the queue method called by processLoop. While holding f.lock it looks
// up the item stored under id in f.items, removes that entry, runs process on
// the item, and returns the item together with whatever error process returned.
//
// NOTE(review): this is an abbreviated excerpt. id is not declared in the code
// shown and ok is never checked; presumably the full implementation obtains id
// from the head of the queue and skips missing items. Confirm against the
// client-go source.
func (f *FIFO) Pop(process PopProcessFunc) (interface{}, error) {
	f.lock.Lock()
	defer f.lock.Unlock()
	for {
		// Fetch the item, drop it from the map, then hand it to process.
		item, ok := f.items[id]
		delete(f.items, id)
		err := process(item)
		// The item is returned even when process fails, so the caller can re-queue it.
		return item, err
	}
}

// Next, find where the PopProcessFunc comes from: when the Config for the
// Controller is built, s.HandleDeltas is installed as its Process callback.
cfg := &Config{
		Process:           s.HandleDeltas,
	}

func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error {
	s.blockDeltas.Lock()
	defer s.blockDeltas.Unlock()

	for _, d := range obj.(Deltas) {
		switch d.Type {
    // add, change, replace, synchronize
		case Sync, Replaced, Added, Updated:
			s.cacheMutationDetector.AddObject(d.Object)
      // Go to indexer first
			if old, exists, err := s.indexer.Get(d.Object); err == nil && exists {
        // If the data already exists, execute the Update logic
				iferr := s.indexer.Update(d.Object); err ! =nil {
					return err
				}

				isSync := false
				switch {
				case d.Type == Sync:
					isSync = true
				case d.Type == Replaced:
					if accessor, err := meta.Accessor(d.Object); err == nil {
							isSync = accessor.GetResourceVersion() == oldAccessor.GetResourceVersion()
						}
					}
				}
      	// Distribute the Update event
				s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync)
			} else {
      	// Execute Add if no data is found
				iferr := s.indexer.Add(d.Object); err ! =nil {
					return err
				}
      	// Distribute the Add event
				s.processor.distribute(addNotification{newObj: d.Object}, false)}/ / delete
		case Deleted:
    	// delete indexer
			iferr := s.indexer.Delete(d.Object); err ! =nil {
				return err
			}
    	// Distribute the DELETE event
			s.processor.distribute(deleteNotification{oldObj: d.Object}, false)}}return nil
}
Copy the code

Index

The Index is the local store of the resources; its contents are kept consistent with the resource information in etcd.

// NewSharedIndexInformer shows where the Index is created. It returns a
// sharedIndexInformer wired to the given ListerWatcher and example object.
// defaultEventHandlerResyncPeriod is used both as the resync check period and
// as the default resync period for event handlers; indexers is forwarded to
// NewIndexer.
func NewSharedIndexInformer(lw ListerWatcher, exampleObject runtime.Object, defaultEventHandlerResyncPeriod time.Duration, indexers Indexers) SharedIndexInformer {
	// One real clock instance is shared by the informer and its processor.
	rc := &clock.RealClock{}
	return &sharedIndexInformer{
		processor: &sharedProcessor{clock: rc},
		// The indexer (local store) is initialized here.
		indexer:                         NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, indexers),
		listerWatcher:                   lw,
		objectType:                      exampleObject,
		resyncCheckPeriod:               defaultEventHandlerResyncPeriod,
		defaultEventHandlerResyncPeriod: defaultEventHandlerResyncPeriod,
		// The mutation detector is named after the example object's Go type.
		cacheMutationDetector: NewCacheMutationDetector(fmt.Sprintf("%T", exampleObject)),
		clock:                 rc,
	}
}

// Create a map and func Indexer
func NewIndexer(keyFunc KeyFunc, indexers Indexers) Indexer {
	return &cache{
		cacheStorage: NewThreadSafeStore(indexers, Indices{}),
		keyFunc:      keyFunc,
}

ThreadSafeStore is a concurrency safe map
func NewThreadSafeStore(indexers Indexers, indices Indices) ThreadSafeStore {
	return &threadSafeMap{
		items:    map[string]interface{}{},
		indexers: indexers,
		indices:  indices,
	}
}
Copy the code

distribute

// In the HandleDeltas code above, every change is first written to the indexer
// and then handed to the processor's distribute method:
s.processor.distribute()

// Create the distribution process
func NewSharedIndexInformer(a) SharedIndexInformer {
	sharedIndexInformer := &sharedIndexInformer{
		processor:                       &sharedProcessor{clock: realClock},
	}
	return sharedIndexInformer
}

// The sharedProcessor structure
type sharedProcessor struct {
	listenersStarted bool
 	/ / read/write locks
	listenersLock    sync.RWMutex
  // Common listener list
	listeners        []*processorListener
  // Synchronize the listener list
	syncingListeners []*processorListener
	clock            clock.Clock
	wg               wait.Group
}

Distribute function
func (p *sharedProcessor) distribute(obj interface{}, sync bool) {
	p.listenersLock.RLock()
	defer p.listenersLock.RUnlock()
	// Distribute object to a list of synchronous listeners or normal listeners
	if sync {
		for _, listener := range p.syncingListeners {
			listener.add(obj)
		}
	} else {
		for _, listener := range p.listeners {
			listener.add(obj)
		}
	}
}

// add hands a notification to this listener by sending it on p.addCh.
// NOTE(review): the send blocks until something receives from addCh unless the
// channel is buffered; the channel's creation and its consumer are not shown
// here, so confirm against the full source.
func (p *processorListener) add(notification interface{}) {
	p.addCh <- notification
}
Copy the code

Summary

  1. The Informer depends on the Reflector module and has a component named xxxInformer, such as podInformer
  2. A resource-specific Informer contains a client connected to kube-apiserver and uses the List and Watch interfaces to query resource changes
  3. When a resource change is detected, the Controller puts the data into the DeltaFIFOQueue queue, which completes the production phase
  4. On the other side of the DeltaFIFOQueue, consumers continuously process the resource-change events; the processing logic has two main steps
    1. The data is saved to the local store, the Indexer, whose underlying implementation is a concurrency-safe threadSafeMap
    2. Some components need to watch resource changes in real time, so the events are distributed to the corresponding registered listeners, which handle them themselves

GitHub: github.com/Junedayday/…

Blog: junes.tech/

Public id: Golangcoding