Reprints must credit the source. This article was originally published on luozhiyun's blog: www.luozhiyun.com/archives/44…

Recently I had a requirement at work. In short, millions of scheduled tasks have to be created within a short time, and when each task is created the corresponding amounts are added up to prevent overbooking.

Go's built-in Timer would perform poorly for this requirement. Timer is implemented with a min-heap, so creating and deleting a timer both cost O(log n). A time wheel does both in O(1), which is much better.

I previously wrote an analysis of the Java version of the time wheel algorithm: www.luozhiyun.com/archives/59…

Time wheels are widely used in components such as Netty, Akka, Quartz, ZooKeeper, and Kafka. The Go implementation below uses Kafka's code as its prototype. Complete code: github.com/devYun/timi…

Introduction

Simple time wheel

A time wheel stores its tasks in a circular queue backed by an array. Each element of the array holds a list of scheduled tasks, and that list is a circular doubly linked list. Each item in the list is a scheduled-task entry that wraps the actual scheduled task.

A time wheel consists of multiple time cells, and each cell represents the wheel's basic time span (tickMs). The number of cells is fixed and given by wheelSize, so the overall time span (interval) of the wheel is tickMs × wheelSize.

The time wheel also has a dial pointer (currentTime) that indicates the wheel's current time. currentTime is always an integer multiple of tickMs. The cell that currentTime points to is the cell that has just come due, so every task in that cell's list must now be processed.

The figure below shows a time wheel with a tickMs of 1s and a wheelSize of 10. Each cell holds a list of scheduled tasks, and that list contains the actual task items:

Initially, the dial pointer currentTime points to cell 0. With a tickMs of 1s and a wheelSize of 10, the interval is 10s. As shown in the figure below, a task due in 2s is inserted into the task list of cell 2 (marked in red). As time passes, currentTime moves forward. After 2s it points to cell 2, and the task list in that cell is retrieved and processed.

If currentTime points to cell 2 and a task due in 9s is inserted, the new task reuses the existing cell lists and is stored in cell 1, because (2 + 9) mod 10 = 1.
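The slot arithmetic for a single-layer wheel can be sketched in a few lines. This is a minimal illustration only. The helper name slotFor is hypothetical and does not come from the linked code, and it assumes the delay is a whole number of ticks that is smaller than interval.

```go
package main

import "fmt"

// slotFor returns the cell index for a task due `delay` ticks after the cell
// that currentSlot points to, in a single-layer wheel with wheelSize cells.
// Hypothetical helper: it assumes 0 <= delay < wheelSize.
func slotFor(currentSlot, delay, wheelSize int64) int64 {
	return (currentSlot + delay) % wheelSize
}

func main() {
	const wheelSize = 10
	fmt.Println(slotFor(0, 2, wheelSize)) // 2: a 2s task from cell 0 goes to cell 2
	fmt.Println(slotFor(2, 9, wheelSize)) // 1: a 9s task from cell 2 wraps to cell 1
}
```

Because the index wraps with a modulo, the same fixed array is reused on every rotation of the wheel.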

The wheel described so far is a simple single-layer time wheel, and its total range runs from currentTime to currentTime + interval. A 15s scheduled task would require a new wheel whose span is at least 15s. This has no upper bound, though. A 10,000s task would need an array that large, which uses a lot of memory and also hurts efficiency because such a large array has to be traversed.

This is why the hierarchical time wheel was introduced.

Hierarchical time wheel

The figure shows a two-layer time wheel. The second-layer wheel also has 10 time cells, and each cell spans 10s. The tickMs of the second-layer wheel equals the interval of the first-layer wheel, which is 10s. Every wheel has the same fixed wheelSize of 10, so the overall span (interval) of the second wheel is 100s.

The figure also shows the expiration range of each time cell. Cell 0 of the second wheel covers [0,9]. In other words, one cell of the second wheel spans all 10 cells of the first wheel.

If a 15s task is added, the first wheel cannot hold it. The task therefore goes into the second wheel and is inserted into the cell whose expiration range is [10,19].

As time passes, the 15s task eventually has only 5s left. At that point the wheel performs a demotion. Because the first wheel's span is now large enough, the task is re-added to the first-layer wheel in the cell that is due in 5s. After another 5s the task actually expires and is finally executed.
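The overflow-and-demote behaviour can be reproduced with a toy two-layer model. This is a hypothetical simplification: the layer type and the locate and advance helpers are our own and are not from the linked repository. Times are in seconds. locate applies the same "expired / fits in this layer / go up" decision that the add method uses later in this article, and advance truncates each layer's currentTime to its own tick.

```go
package main

import "fmt"

// layer is a hypothetical, simplified model of one level of a hierarchical
// time wheel: tick is the span of one cell, and interval = tick * wheelSize.
type layer struct {
	tick, wheelSize, currentTime int64
}

// locate returns the level that a task with the given absolute expiration
// lands in and the start time of its cell. It returns level -1 if the task is
// already due, and ok == false if the task is beyond the top layer.
func locate(layers []layer, expiration int64) (level int, cellStart int64, ok bool) {
	if expiration < layers[0].currentTime+layers[0].tick {
		return -1, 0, true // already due: the caller runs it immediately
	}
	for i, l := range layers {
		if expiration < l.currentTime+l.tick*l.wheelSize {
			return i, expiration - expiration%l.tick, true
		}
	}
	return 0, 0, false // a real wheel would create another layer here
}

// advance moves every layer's currentTime to now, truncated to its own tick.
func advance(layers []layer, now int64) {
	for i := range layers {
		layers[i].currentTime = now - now%layers[i].tick
	}
}

func main() {
	// First wheel: 1s x 10 cells. Second wheel: 10s x 10 cells.
	layers := []layer{{tick: 1, wheelSize: 10}, {tick: 10, wheelSize: 10}}

	lvl, start, _ := locate(layers, 15)
	fmt.Println(lvl, start) // 1 10: second wheel, cell [10,19]

	advance(layers, 10) // the [10,19] cell comes due at t=10s
	lvl, start, _ = locate(layers, 15)
	fmt.Println(lvl, start) // 0 15: demoted to the first wheel, due 5s later

	advance(layers, 15)
	lvl, _, _ = locate(layers, 15)
	fmt.Println(lvl) // -1: due now, so it runs
}
```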

Code implementation

Because this Go version of the time wheel is modeled after Kafka, the TimingWheel implementation follows a few of Kafka's details:

  • Each cell's task list has a root (sentinel) node that simplifies boundary conditions. It is an extra list node that acts as the head and stores nothing. It exists only to make list operations easier.
  • Apart from the first layer, the startMs of each higher-level wheel is set to the currentTime of the layer below at the moment the higher wheel is created. Each layer's currentTime must be a multiple of its tickMs, and if it is not, it is truncated to one: currentTime = startMs - (startMs % tickMs).
  • Kafka's timer holds a reference only to the first-layer TimingWheel and does not reference the other layers directly. Instead, each layer holds a reference to the layer above it.
  • Kafka's timer uses a DelayQueue to advance the time wheel. Every cell list that is in use is added to the DelayQueue, which orders the lists by their expiration time so that the list that expires soonest is at the head. A separate thread takes expired lists from the DelayQueue.
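The Go code below relies on a delayqueue package whose implementation this article does not show. To illustrate the core idea of "soonest expiration at the head", here is a hypothetical single-goroutine queue built on container/heap. The names delayQueue, Offer and PollExpired are our own. The real package's Poll instead runs in a loop and sends expired elements on a channel.

```go
package main

import (
	"container/heap"
	"fmt"
)

// item pairs an element (a bucket in the real wheel) with its expiration.
type item struct {
	value      string
	expiration int64
}

// minHeap orders items by expiration, soonest first.
type minHeap []item

func (h minHeap) Len() int           { return len(h) }
func (h minHeap) Less(i, j int) bool { return h[i].expiration < h[j].expiration }
func (h minHeap) Swap(i, j int)      { h[i], h[j] = h[j], h[i] }
func (h *minHeap) Push(x any)        { *h = append(*h, x.(item)) }
func (h *minHeap) Pop() any {
	old := *h
	n := len(old)
	it := old[n-1]
	*h = old[:n-1]
	return it
}

// delayQueue is a hypothetical simplification. Offer is O(log n), and
// PollExpired pops every item whose expiration is <= now, soonest first.
type delayQueue struct{ h minHeap }

func (q *delayQueue) Offer(v string, expiration int64) {
	heap.Push(&q.h, item{v, expiration})
}

func (q *delayQueue) PollExpired(now int64) []string {
	var out []string
	for q.h.Len() > 0 && q.h[0].expiration <= now {
		out = append(out, heap.Pop(&q.h).(item).value)
	}
	return out
}

func main() {
	var q delayQueue
	q.Offer("bucket@3000", 3000)
	q.Offer("bucket@1000", 1000)
	q.Offer("bucket@2000", 2000)
	fmt.Println(q.PollExpired(2000)) // [bucket@1000 bucket@2000]
	fmt.Println(q.PollExpired(2500)) // []
}
```

Because whole buckets are queued rather than individual tasks, the heap stays small even when there are millions of timers.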

Core structures

type TimingWheel struct {
	// Span of one time cell (tick), in milliseconds
	tick int64
	// Number of time cells in the wheel
	wheelSize int64
	// Total span of the wheel (tick * wheelSize), in milliseconds
	interval int64
	// Time the dial pointer currently points to, in milliseconds
	currentTime int64
	// Time cells (buckets)
	buckets []*bucket
	// Delay queue, shared by all layers
	queue *delayqueue.DelayQueue
	// Reference to the upper-layer (overflow) time wheel
	overflowWheel unsafe.Pointer // type: *TimingWheel

	exitC     chan struct{}
	waitGroup waitGroupWrapper
}

tick, wheelSize, interval and currentTime are self-explanatory. buckets is the list of time cells. queue is the delay queue through which all tasks are triggered. overflowWheel is the reference to the upper-layer time wheel.

type bucket struct {
	// Expiration time shared by the tasks in this bucket
	expiration int64

	mu sync.Mutex
	// List of timers that share the same expiration time
	timers *list.List
}

The bucket wraps the task list of one time cell and holds the tasks that share the same expiration time. When the bucket expires, its timers list is drained and processed. Because multiple goroutines access a bucket concurrently, the int64 expiration is read and written through the sync/atomic package. On 32-bit systems, 64-bit atomic operations require the value to be 64-bit aligned, which is why expiration is the first field of the struct. For details, see: www.luozhiyun.com/archives/42…
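The add and Flush methods below call b.Expiration() and b.SetExpiration(...), but their bodies are not shown in this article. The following is a plausible sketch, assuming SetExpiration swaps in the new value atomically and reports whether it changed. Only the expiration field is kept, and the bodies may differ from the linked repository.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// bucket keeps only the field used here. expiration is the first field so
// that it is 64-bit aligned on 32-bit platforms, as sync/atomic requires.
type bucket struct {
	expiration int64
}

// Expiration reads the expiration atomically.
func (b *bucket) Expiration() int64 {
	return atomic.LoadInt64(&b.expiration)
}

// SetExpiration stores a new expiration and reports whether the value changed.
// Callers can use the result to offer a bucket to the delay queue only once.
func (b *bucket) SetExpiration(expiration int64) bool {
	return atomic.SwapInt64(&b.expiration, expiration) != expiration
}

func main() {
	b := &bucket{expiration: -1}
	fmt.Println(b.SetExpiration(5000)) // true: changed, so offer the bucket
	fmt.Println(b.SetExpiration(5000)) // false: already queued for 5000
	fmt.Println(b.Expiration())        // 5000
}
```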

type Timer struct {
	// Expiration time, in milliseconds
	expiration int64
	// The task to run when the timer expires
	task func()
	// Pointer to the bucket that holds this Timer
	b unsafe.Pointer // type: *bucket
	// The corresponding element in the bucket's list
	element *list.Element
}

Timer is the smallest execution unit of the time wheel. It wraps a scheduled task and calls task once the timer expires.
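The element and b fields let a Timer be unlinked from its bucket in O(1). The b.Add and b.remove calls used in add and Flush below are not shown in the article. The following sketch uses hypothetical simplified types: it uses a plain *bucket pointer instead of the unsafe.Pointer field, and it assumes remove is called with b.mu already held, as it is inside Flush.

```go
package main

import (
	"container/list"
	"fmt"
	"sync"
)

// Simplified, hypothetical Timer and bucket. The real code stores the bucket
// pointer as unsafe.Pointer and updates it atomically.
type Timer struct {
	expiration int64
	task       func()
	b          *bucket       // bucket currently holding this timer
	element    *list.Element // this timer's node in b.timers
}

type bucket struct {
	mu     sync.Mutex
	timers *list.List
}

func newBucket() *bucket { return &bucket{timers: list.New()} }

// Add appends t and records its list node so it can be removed in O(1).
func (b *bucket) Add(t *Timer) {
	b.mu.Lock()
	defer b.mu.Unlock()
	t.element = b.timers.PushBack(t)
	t.b = b
}

// remove unlinks t if it belongs to b. The caller must hold b.mu.
func (b *bucket) remove(t *Timer) bool {
	if t.b != b {
		return false
	}
	b.timers.Remove(t.element)
	t.b, t.element = nil, nil
	return true
}

func main() {
	b := newBucket()
	t1, t2 := &Timer{expiration: 1000}, &Timer{expiration: 1000}
	b.Add(t1)
	b.Add(t2)
	b.mu.Lock()
	b.remove(t1)
	b.mu.Unlock()
	fmt.Println(b.timers.Len()) // 1
}
```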

Initialize the time wheel

For example, to initialize a time wheel with a tick of 1s and a wheelSize of 10:

func main() {
	tw := timingwheel.NewTimingWheel(time.Second, 10)
	tw.Start()
}

func NewTimingWheel(tick time.Duration, wheelSize int64) *TimingWheel {
	// Convert the tick to milliseconds
	tickMs := int64(tick / time.Millisecond)
	// A tick shorter than 1ms truncates to 0, so panic
	if tickMs <= 0 {
		panic(errors.New("tick must be greater than or equal to 1ms"))
	}
	// Set the start time
	startMs := timeToMs(time.Now().UTC())
	// Initialize the TimingWheel
	return newTimingWheel(
		tickMs,
		wheelSize,
		startMs,
		delayqueue.New(int(wheelSize)),
	)
}

func newTimingWheel(tickMs int64, wheelSize int64, startMs int64, queue *delayqueue.DelayQueue) *TimingWheel {
	// Create wheelSize empty buckets
	buckets := make([]*bucket, wheelSize)
	for i := range buckets {
		buckets[i] = newBucket()
	}
	// Instantiate the TimingWheel
	return &TimingWheel{
		tick:      tickMs,
		wheelSize: wheelSize,
		// currentTime must be a multiple of tickMs, so truncate startMs
		currentTime: truncate(startMs, tickMs),
		interval:    tickMs * wheelSize,
		buckets:     buckets,
		queue:       queue,
		exitC:       make(chan struct{}),
	}
}

Initialization is straightforward. See the comments in the code above.

Start the time wheel

Let's look at the Start method:

func (tw *TimingWheel) Start() {
	// Poll loops forever, sending expired elements into the queue's C channel
	tw.waitGroup.Wrap(func() {
		tw.queue.Poll(tw.exitC, func() int64 {
			return timeToMs(time.Now().UTC())
		})
	})
	// Loop forever, receiving from the queue's C channel
	tw.waitGroup.Wrap(func() {
		for {
			select {
			// Everything received from C is an expired bucket
			case elem := <-tw.queue.C:
				b := elem.(*bucket)
				// Move currentTime forward to the bucket's expiration time
				tw.advanceClock(b.Expiration())
				// Drain the bucket and pass each timer to addOrRun
				b.Flush(tw.addOrRun)
			case <-tw.exitC:
				return
			}
		}
	})
}

This uses the Wrap method of a utility wrapper (waitGroupWrapper), which starts a goroutine that runs the passed-in function asynchronously. See the source code link above for details.

Start launches two goroutines. The first calls the delay queue's Poll method, which loops over the queue and sends expired entries into the queue's C channel. The second loops indefinitely and receives from C. Each value it receives is an expired bucket. For each one, it calls advanceClock to move currentTime forward to the bucket's expiration time, and then calls Flush to drain the bucket, which passes each timer to addOrRun.

func (tw *TimingWheel) advanceClock(expiration int64) {
	currentTime := atomic.LoadInt64(&tw.currentTime)
	// Only advance if expiration is at least one tick past currentTime
	if expiration >= currentTime+tw.tick {
		// Advance currentTime to expiration, truncated to a multiple of tick
		currentTime = truncate(expiration, tw.tick)
		atomic.StoreInt64(&tw.currentTime, currentTime)

		// Try to advance the clock of the overflow wheel if present:
		// if there is an upper time wheel, advance it recursively
		overflowWheel := atomic.LoadPointer(&tw.overflowWheel)
		if overflowWheel != nil {
			(*TimingWheel)(overflowWheel).advanceClock(currentTime)
		}
	}
}

advanceClock advances the time wheel by setting currentTime to the bucket's expiration time, truncated to a multiple of tick. It then advances any upper-layer wheel recursively.
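To see how the recursion cascades and how each layer truncates to its own tick, here is a single-goroutine sketch. The wheel type is a hypothetical stripped-down stand-in for TimingWheel that uses a plain pointer and no atomics. The real code needs atomic loads and stores because add runs concurrently.

```go
package main

import "fmt"

// wheel is a hypothetical stripped-down TimingWheel with only the fields
// that advanceClock touches.
type wheel struct {
	tick, currentTime int64
	overflow          *wheel
}

// advanceClock moves currentTime forward only if expiration is at least one
// tick past it, truncates to this wheel's tick, and then recurses upward.
func (w *wheel) advanceClock(expiration int64) {
	if expiration >= w.currentTime+w.tick {
		w.currentTime = expiration - expiration%w.tick
		if w.overflow != nil {
			w.overflow.advanceClock(w.currentTime)
		}
	}
}

func main() {
	upper := &wheel{tick: 10000}                 // 10s cells
	lower := &wheel{tick: 1000, overflow: upper} // 1s cells
	lower.advanceClock(12500)
	fmt.Println(lower.currentTime, upper.currentTime) // 12000 10000
	lower.advanceClock(12900) // still inside the current tick: no change
	fmt.Println(lower.currentTime, upper.currentTime) // 12000 10000
}
```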

func (b *bucket) Flush(reinsert func(*Timer)) {
	var ts []*Timer

	b.mu.Lock()
	// Walk the bucket's list, removing every timer
	for e := b.timers.Front(); e != nil; {
		next := e.Next()

		t := e.Value.(*Timer)
		// Remove the timer from the bucket's list
		b.remove(t)
		ts = append(ts, t)

		e = next
	}
	b.mu.Unlock()

	b.SetExpiration(-1) // TODO: Improve the coordination with b.Add()

	for _, t := range ts {
		reinsert(t)
	}
}

Flush iterates over the bucket's timers list and moves every timer into the ts slice. It then calls reinsert on each timer. Here, reinsert is the addOrRun method.

func (tw *TimingWheel) addOrRun(t *Timer) {
	// add returns false if the timer has already expired
	if !tw.add(t) {
		// Run the expired task asynchronously
		go t.task()
	}
}

addOrRun calls add, which reports whether the Timer could be placed in the wheel. If add returns false, the timer has already expired, and its task is run asynchronously in a new goroutine. We examine add next.

The overall flow after Start is called is as follows:

  1. Start launches one goroutine that calls Poll, which takes expired entries from the DelayQueue and sends them into channel C;
  2. Start launches a second goroutine that loops over channel C of the DelayQueue. Each value is a bucket, and the goroutine iterates over that bucket's timers list. Each expired task is run asynchronously. Each unexpired task is re-added to the time wheel, usually into a lower layer, and its new bucket is offered to the DelayQueue again if needed.
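The two-goroutine structure can be modeled with plain channels. This is a hypothetical simplification. runPipeline is our own name, the producer only stands in for DelayQueue.Poll by sending a fixed list of bucket expirations, and closing exitC from the producer happens only in this demo (in the real wheel, exitC is closed when the wheel is stopped).

```go
package main

import (
	"fmt"
	"sync"
)

// runPipeline models Start: one goroutine sends expired bucket expirations on
// c, and a second goroutine receives them until exitC is closed.
func runPipeline(expired []int64) []int64 {
	c := make(chan int64)
	exitC := make(chan struct{})
	var got []int64
	var wg sync.WaitGroup

	wg.Add(2)
	go func() { // producer: stands in for Poll pushing expired buckets into C
		defer wg.Done()
		for _, e := range expired {
			select {
			case c <- e:
			case <-exitC:
				return
			}
		}
		close(exitC) // demo only: stop once everything has been delivered
	}()
	go func() { // consumer: advanceClock and Flush would run here
		defer wg.Done()
		for {
			select {
			case e := <-c:
				got = append(got, e)
			case <-exitC:
				return
			}
		}
	}()
	wg.Wait()
	return got
}

func main() {
	fmt.Println(runPipeline([]int64{1000, 2000, 3000})) // [1000 2000 3000]
}
```

Because c is unbuffered, each send completes only after the consumer receives the value, so nothing is lost when exitC is closed afterwards.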

Add a task

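func main() {
	tw := timingwheel.NewTimingWheel(time.Second, 10)
	tw.Start()
	// Channel used to wait for the timer to fire
	exitC := make(chan time.Time, 1)
	// Add a task
	tw.AfterFunc(time.Second*15, func() {
		fmt.Println("The timer fires")
		exitC <- time.Now().UTC()
	})
	// Block until the task has run
	<-exitC
}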

We use AfterFunc to add a task scheduled 15s from now. When it expires, the function passed in is executed.

func (tw *TimingWheel) AfterFunc(d time.Duration, f func()) *Timer {
	t := &Timer{
		expiration: timeToMs(time.Now().UTC().Add(d)),
		task:       f,
	}
	tw.addOrRun(t)
	return t
}

AfterFunc builds a Timer from the expiration time (now + d) and the function to run, and passes it to addOrRun. As shown above, addOrRun either schedules the timer or runs it immediately if it has already expired.

Let’s look at the add method:

func (tw *TimingWheel) add(t *Timer) bool {
	currentTime := atomic.LoadInt64(&tw.currentTime)
	if t.expiration < currentTime+tw.tick {
		// Already expired
		return false
	} else if t.expiration < currentTime+tw.interval {
		// The expiration falls within this wheel's span: put it into its own bucket
		// Compute the timer's position on the wheel
		virtualID := t.expiration / tw.tick
		b := tw.buckets[virtualID%tw.wheelSize]
		// Add the timer to the bucket's list
		b.Add(t)
		// SetExpiration returns false if the expiration is unchanged, which
		// prevents offering the same bucket to the queue more than once
		if b.SetExpiration(virtualID * tw.tick) {
			// Add the bucket to the delay queue
			tw.queue.Offer(b, b.Expiration())
		}

		return true
	} else {
		// Out of the interval. Put it into the overflow wheel
		// (the upper layer), creating that wheel if it does not exist yet
		overflowWheel := atomic.LoadPointer(&tw.overflowWheel)
		if overflowWheel == nil {
			atomic.CompareAndSwapPointer(
				&tw.overflowWheel,
				nil,
				// Note: the upper wheel's tick is this wheel's interval
				unsafe.Pointer(newTimingWheel(
					tw.interval,
					tw.wheelSize,
					currentTime,
					tw.queue,
				)),
			)
			overflowWheel = atomic.LoadPointer(&tw.overflowWheel)
		}
		// Move up a layer
		return (*TimingWheel)(overflowWheel).add(t)
	}
}

add has three branches, chosen by the expiration time. In the first branch, the expiration is earlier than currentTime + tick. This means the timer has already expired, so add returns false and the caller runs the task.

The second branch checks whether the expiration falls within the wheel's span (before currentTime + interval). If it does, the task fits in the current wheel. The matching bucket is found by taking virtualID modulo wheelSize, and the timer is added to that bucket's list. SetExpiration then reports whether the bucket's expiration changed, and the bucket is offered to the delay queue only if it did. This prevents repeated inserts.
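A worked example of the second branch's arithmetic follows. The helper name placement is hypothetical, but the formulas are taken directly from the add method above. Two timers that expire within the same tick get the same bucket and the same bucket expiration. This is exactly the case where SetExpiration reports "unchanged" and the bucket is not offered to the queue a second time.

```go
package main

import "fmt"

// placement reproduces the arithmetic in add(). virtualID counts ticks since
// the epoch, the bucket index is virtualID modulo wheelSize, and the bucket
// expiration is virtualID*tick, which is the expiration truncated to a tick.
func placement(expiration, tick, wheelSize int64) (index, bucketExpiration int64) {
	virtualID := expiration / tick
	return virtualID % wheelSize, virtualID * tick
}

func main() {
	// tick = 1000 ms, wheelSize = 10, task due at 1_700_000_015_300 ms.
	idx, exp := placement(1_700_000_015_300, 1000, 10)
	fmt.Println(idx, exp) // 5 1700000015000
}
```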

The third branch handles a task whose delay exceeds the span of the current wheel, so the task must be moved up to the upper-layer wheel. Note that the upper wheel's tick is the current wheel's interval and that it shares the same delay queue. The new wheel is installed into overflowWheel with a compare-and-swap, and add is then called on it recursively.
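The compare-and-swap handles the race where several goroutines try to create the overflow wheel at the same time. Exactly one candidate wins, and every goroutine then loads the winner. The following sketch shows the same lazy-initialization pattern. It uses atomic.Pointer (Go 1.19+) in place of the original unsafe.Pointer calls, and the wheel type and overflowWheel method are hypothetical stand-ins.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// wheel is a hypothetical stand-in for TimingWheel with only the fields
// needed to show lazy creation of the upper wheel.
type wheel struct {
	tick, interval int64
	overflow       atomic.Pointer[wheel] // the original uses unsafe.Pointer
}

func newWheel(tick, wheelSize int64) *wheel {
	return &wheel{tick: tick, interval: tick * wheelSize}
}

// overflowWheel returns the upper wheel and creates it on first use. Racing
// callers may each build a candidate, but only one CompareAndSwap succeeds.
func (w *wheel) overflowWheel(wheelSize int64) *wheel {
	if o := w.overflow.Load(); o != nil {
		return o
	}
	// The upper wheel's tick is this wheel's interval.
	w.overflow.CompareAndSwap(nil, newWheel(w.interval, wheelSize))
	return w.overflow.Load()
}

func main() {
	w := newWheel(1000, 10)
	var wg sync.WaitGroup
	results := make([]*wheel, 8)
	for i := range results {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			results[i] = w.overflowWheel(10)
		}(i)
	}
	wg.Wait()
	fmt.Println(results[0].tick, results[0] == results[7]) // 10000 true
}
```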

That covers the time wheel. Note that this implementation combines a DelayQueue with the ring of buckets. Inserting and deleting a scheduled task in the TimingWheel is O(1), while the DelayQueue is backed by a priority queue whose operations are O(log n). However, the DelayQueue holds buckets rather than individual tasks, and the number of buckets is small, so this does not noticeably affect performance.

Reference

github.com/RussellLuo/…

zhuanlan.zhihu.com/p/121483218

github.com/apache/kafk…