A large number of scheduling tasks exist in a system, and scheduling tasks have time lag, and if a large number of scheduling tasks use their own scheduler to manage the life cycle of the task, it will waste CPU resources and be very inefficient. This article introduces latency operations in Go-Zero, which may allow developers to schedule multiple tasks by focusing on specific business execution functions and execution times “now or later.” For delay operations, there are usually two options:

  1. Timer: Timer maintains a priority queue, executes at a point in time, and then stores the tasks to be executed in a map
  2. collectionIn thetimingWheel, maintains an array of task groups, and each slot maintains a two-way linked list of tasks. When execution begins, the timer executes the tasks in the slot at specified intervals.

Solution 2 reduces maintenance tasks from priority queue O(nlog(n)) to bidirectional linked list O(1), and executing tasks only requires polling tasks O(n) at a point in time, without placing and removing elements O(nlog(n)) like priority queue.

Let’s take a look at go-Zero’s own use of timingWheel:

The cache timingWheel

First, we will use timingWheel in the cache of collection:

timingWheel, err := NewTimingWheel(time.Second, slots, func(k, v interface{}) {
  key, ok := k.(string)
  if! ok {return
  }
  cache.Del(key)
})
iferr ! =nil {
  return nil, err
}

cache.timingWheel = timingWheel
Copy the code

During cache initialization, timingWheel is also initialized for key expiration processing, and the parameters are represented in sequence:

  • interval: Time division scale
  • numSlots: time slot
  • execute: executes the function at a point in time

In the cache function, the expiration key is deleted, and the expiration is controlled by the timingWheel.

Next, passcachetimingWheelUse to recognize.

Initialize the

// Do the actual initialization
func newTimingWheelWithClock(interval time.Duration, numSlots int, execute Execute, ticker timex.Ticker) ( *TimingWheel, error) {
	tw := &TimingWheel{
		interval:      interval,                     // Single grid time interval
		ticker:        ticker,                       // Timer, do the time push, in interval as the unit of advance
		slots:         make([]*list.List, numSlots), / / time round
		timers:        NewSafeMap(),                 // Store task{key, value} map [parameters required for execute]
		tickedPos:     numSlots - 1.// at previous virtual circle
		execute:       execute,                      // Execute the function
		numSlots:      numSlots,                     // Initialize slots num
		setChannel:    make(chan timingEntry),       // The following channels are passed by task
		moveChannel:   make(chan baseEntry),
		removeChannel: make(chan interface{}),
		drainChannel:  make(chan func(key, value interface{})),
		stopChannel:   make(chan lang.PlaceholderType),
	}
	// Prepare all the lists stored in the slot
	tw.initSlots()
	// Start asynchronous coroutines and use channels for task communication and delivery
	go tw.run()

	return tw, nil
}
Copy the code

The above shows the “time wheel” of timingWheel intuitively, and the details of the progress will be explained later around this diagram.

Go tw.run() open a coroutine to do time push:

func (tw *TimingWheel) run(a) {
	for {
		select {
      // Timer to do time push -> scanAndRunTasks()
		case <-tw.ticker.Chan():
			tw.onTick()
      // Add task will enter task into setChannel
		casetask := <-tw.setChannel: tw.setTask(&task) ... }}}Copy the code

The execution of the timer starts when the timer is initialized and rotates within the internal time frame. Then the underlying layer keeps fetching tasks from the list in the slot and giving them to execute.

Task Operation

The cache is initialized, followed by the set cache key:

func (c *Cache) Set(key string, value interface{}) {
	c.lock.Lock()
	_, ok := c.data[key]
	c.data[key] = value
	c.lruCache.add(key)
	c.lock.Unlock()

	expiry := c.unstableExpiry.AroundDuration(c.expire)
	if ok {
		c.timingWheel.MoveTimer(key, expiry)
	} else {
		c.timingWheel.SetTimer(key, value, expiry)
	}
}
Copy the code
  1. Look at thedata mapDoes the key exist in
  2. If yes, updateexpire -> MoveTimer()
  3. The first time you set key ->SetTimer()

Therefore, it is clear how to use timingWheel. Developers can add or update the timingWheel as required.

SetTimer() MoveTimer() is a task operation that sends a task to a channel, and the coroutine opened in run() continues to fetch the channel.

The SetTimer () – > setTask () :

  • Not exist task:getPostion -> pushBack to list -> setPosition
  • Exist task:get from timers -> moveTask()

MoveTimer() -> moveTask()

From the above call chain, there is one function that will be called: moveTask()

func (tw *TimingWheel) moveTask(task baseEntry) {
	// Timers: Map => Obtain [positionEntry "pos, task"] from key
	val, ok := tw.timers.Get(task.key)
	if! ok {return
	}

	timer := val.(*positionEntry)
  	// {delay < interval} => The delay is smaller than a time interval. If there is no smaller scale, the task should be executed immediately
	if task.delay < tw.interval {
		threading.GoSafe(func(a) {
			tw.execute(timer.item.key, timer.item.value)
		})
		return
	}
	// If > interval, calculate the new pos, circle in the time round by delay
	pos, circle := tw.getPositionAndCircle(task.delay)
	if pos >= timer.pos {
		timer.item.circle = circle
    // Record the move offset before and after. Rejoin the team for the later process
		timer.item.diff = pos - timer.pos
	} else if circle > 0 {
		// Go to the next layer and convert circle to part of diff
		circle--
		timer.item.circle = circle
		// Since it is an array, add numSlots.
		timer.item.diff = tw.numSlots + pos - timer.pos
	} else {
		// If offset is advanced, the task is still at the first layer
		// Delete the old task and rejoin the queue, waiting to be executed
		timer.item.removed = true
		newItem := &timingEntry{
			baseEntry: task,
			value:     timer.item.value,
		}
		tw.slots[pos].PushBack(newItem)
		tw.setTimerPosition(pos, newItem)
	}
}
Copy the code

The above process has the following conditions:

  • delay < internal: Because < single time precision indicates that the task has expired and needs to be executed immediately
  • change-orienteddelay:
    • new >= old:<newPos, newCircle, diff>
    • newCircle > 0: computes diff and converts Circle to the next layer, so diff + Numslots
    • If the delay time is simply shortened, delete the old task flag, add it back to the list, and wait for the next loop to be executed

Execute

In the previous initialization, the run() timer was pushed, mainly by passing tasks from the list to execute Func. Let’s start with the execution of the timer:

// Timer "will execute every internal interval"
func (tw *TimingWheel) onTick(a) {
  // Update the current execution tick position with each execution
	tw.tickedPos = (tw.tickedPos + 1) % tw.numSlots
  // Get the bidirectional list of tasks stored in the current tick location
	l := tw.slots[tw.tickedPos]
	tw.scanAndRunTasks(l)
}
Copy the code

Next up is how to execute:

func (tw *TimingWheel) scanAndRunTasks(l *list.List) {
	// Store the task{key, value} that needs to be executed.
	var tasks []timingTask

	fore := l.Front(); e ! =nil; {
		task := e.Value.(*timingEntry)
    // Delete map data in scan
		if task.removed {
			next := e.Next()
			l.Remove(e)
			tw.timers.Del(task.key)
			e = next
			continue
		} else if task.circle > 0 {
			// The current execution point has expired, but is not in the first layer, so the current layer is completed and will be dropped to the next layer
      // However, pos is not modified
			task.circle--
			e = e.Next()
			continue
		} else if task.diff > 0 {
			// Diff must be queued again because diff has been marked before
			next := e.Next()
			l.Remove(e)
			pos := (tw.tickedPos + task.diff) % tw.numSlots
			tw.slots[pos].PushBack(task)
			tw.setTimerPosition(pos, task)
			task.diff = 0
			e = next
			continue
		}
		// The above cases are not executable, and the ones that can be executed will be added to the tasks
		tasks = append(tasks, timingTask{
			key:   task.key,
			value: task.value,
		})
		next := e.Next()
		l.Remove(e)
		tw.timers.Del(task.key)
		e = next
	}
	// For range tasks, execute each task->execute
	tw.runTasks(tasks)
}
Copy the code

The branching situation is explained in the comment and can be viewed in conjunction with the previous moveTask(), where circle drops and diff’s calculation is the key to associating the two functions.

As for diff calculation, it involves the calculation of POS, circle:

// interval: 4min, d: 60min, numSlots: 16, tickedPos = 15
// step = 15, pos = 14, circle = 0
func (tw *TimingWheel) getPositionAndCircle(d time.Duration) (pos int, circle int) {
	steps := int(d / tw.interval)
	pos = (tw.tickedPos + steps) % tw.numSlots
	circle = (steps - 1) / tw.numSlots
	return
}
Copy the code

The above process can be simplified as follows:

steps = d / interval
pos = step % numSlots - 1
circle = (step - 1) / numSlots
Copy the code

conclusion

  1. The timingWheel is driven by the timer. When the time moves forward, the timingWheel will take out the tasks in the list “two-way linked list” in the current time lattice and pass them to execute for execution. Because it is promoted by internal fixed time scale, there may be: a task of 60s, internal = 1s, so that 59 empty loops will be run.

  2. In terms of extension time, circle layer is adopted, so that the original numSlots can be reused continuously, because the timer is continuously looped, and the execution can drop the upper slot to the lower one, so that the upper task can be executed in the continuous loop. Such a design can break through long term constraints without creating additional data structures.

At the same time, there are a lot of practical component tools in Go-Zero, and good use of tools can help improve service performance and development efficiency. I hope this article can bring you some gains.

The resources

  • go-zero
  • Go – zero documentation
  • Go to zero in the collection. The Cache

🏆 nuggets technical essay | double festival special articles