Introduction

Simple time wheel

The time wheel stores tasks in a ring queue that is implemented on top of an array. Each element of the array holds a scheduled task list, which is a circular doubly linked list. Each item in the linked list is a scheduled task item that wraps the actual task.

A time wheel consists of multiple time cells, each representing the basic time span (tickMs) of the current time wheel. The number of time cells of the time wheel is fixed and can be expressed by wheelSize, so the overall time span (interval) of the whole time wheel can be calculated by the formula tickMs×wheelSize.

The time wheel also has a dial pointer (currentTime) that represents the current time of the wheel and is always an integer multiple of tickMs. The cell that currentTime points to is the cell that is due, which means all tasks in that cell's linked list need to be processed.

The figure below shows a time wheel with a tickMs of 1s and a wheelSize of 10. Each cell contains a linked list of scheduled tasks, which holds the actual task items:

Initially, the dial pointer currentTime points to time cell 0. If the tickMs of the time wheel is 1s and wheelSize is 10, the interval is 10s. As shown in the figure below, a task with a delay of 2s is inserted into the task list of time cell 2, marked in red. As time passes, currentTime moves forward. After 2s, currentTime points to time cell 2, and the task list in that cell is retrieved and processed.

If currentTime currently points to time cell 2 and a task with a delay of 9s is inserted, the new task wraps around the ring and reuses an earlier cell: it is stored in time cell 1, because (2 + 9) mod 10 = 1.
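
The wrap-around can be sketched with a little arithmetic. The helper below is only an illustration: cellIndex is a hypothetical name, not part of the original code, and times are assumed to be in milliseconds counted from the start of the wheel.

```go
package main

import "fmt"

// cellIndex returns the index of the time cell that holds a task
// expiring at expirationMs. (Hypothetical helper for illustration.)
func cellIndex(expirationMs, tickMs, wheelSize int64) int64 {
	return (expirationMs / tickMs) % wheelSize
}

func main() {
	const tickMs, wheelSize = 1000, 10

	// Overall span of the wheel: tickMs * wheelSize.
	fmt.Println("interval (ms):", tickMs*wheelSize)

	// currentTime = 0s, delay = 2s: the task lands in cell 2.
	fmt.Println("cell:", cellIndex(0+2000, tickMs, wheelSize))

	// currentTime = 2s, delay = 9s: the task wraps around into cell 1.
	fmt.Println("cell:", cellIndex(2000+9000, tickMs, wheelSize))
}
```

The modulo is what lets a fixed-size array be reused as the pointer moves forward.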

The time wheel described so far is a simple, single-layer time wheel, and the range of times it can hold lies between currentTime and currentTime + interval. A 15s scheduled task does not fit into it, so another time wheel with a span of at least 15s would be needed. However, this approach has no upper bound. If you need a time wheel that covers 10,000 seconds, you need an array of that size, which not only takes up a lot of memory but also reduces efficiency, because such a large array has to be traversed.

Therefore, the concept of hierarchical time wheel is introduced.

Hierarchical time wheel

The figure shows a two-layer time wheel. The second layer time wheel is also composed of 10 time cells, and the span of each cell is 10s. The tickMs of the layer 2 time wheel is the interval of the layer 1 time wheel, that is, 10s. The wheelSize of each time wheel is fixed and is 10, so the overall time span interval of the second time wheel is 100s.

The figure shows the expiration time range of each time cell. We can see that the expiration range of time cell 0 of the second-layer wheel is [0,9]. In other words, one cell of the second-layer wheel covers all 10 cells of the first-layer wheel.

If a 15s task is added to this time wheel, the first-layer wheel cannot accommodate it, so the task moves up to the second-layer wheel and is inserted into the time cell with the expiration range [10,19].

As time passes and the 15s task has only 5s left, the time wheel performs a demotion. The overall span of the first-layer wheel is now large enough, so the task is added to the first-layer wheel in the time cell with expiration time 5. After another 5s the task is really due, and it is finally executed.
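
The promotion and demotion described above can be sketched as a small placement function. This is a simplified model under stated assumptions, not the library's code: place and placement are hypothetical names, every layer uses the same wheelSize, and each layer's current time is taken to be the clock truncated to that layer's tick.

```go
package main

import "fmt"

// placement describes where a task lands in the hierarchy.
type placement struct {
	level     int   // 1 is the lowest (finest) wheel
	cell      int64 // index of the time cell inside that wheel
	cellStart int64 // start of the cell's expiration range, in ms
	cellEnd   int64 // end of the range, in ms (exclusive)
}

// place climbs the hierarchy until it finds a wheel whose interval
// covers expirationMs. ok is false when the task is already due.
// Inputs are assumed to be sane (positive tick, moderate expiration).
func place(expirationMs, nowMs, tickMs, wheelSize int64) (p placement, ok bool) {
	if expirationMs < nowMs+tickMs {
		return placement{}, false // already expired: run it right away
	}
	for level := 1; ; level++ {
		current := nowMs - nowMs%tickMs // current time of this layer
		interval := tickMs * wheelSize
		if expirationMs < current+interval {
			id := expirationMs / tickMs
			return placement{
				level:     level,
				cell:      id % wheelSize,
				cellStart: id * tickMs,
				cellEnd:   (id + 1) * tickMs,
			}, true
		}
		tickMs = interval // the next layer's tick is this layer's interval
	}
}

func main() {
	// A 15s task added at time 0 does not fit into the first layer.
	p, _ := place(15000, 0, 1000, 10)
	fmt.Printf("%+v\n", p)

	// Ten seconds later the same task fits into the first layer.
	p, _ = place(15000, 10000, 1000, 10)
	fmt.Printf("%+v\n", p)
}
```

With these inputs the first call places the task in layer 2, cell 1 (covering 10s up to 20s), and the second call places it in layer 1, cell 5, which matches the demotion in the text.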

Code implementation

Because our Go version of the time wheel is modeled after Kafka's, a few details of the TimingWheel implementation are worth noting:

  • Each linked list in a time cell has a root node that simplifies the boundary conditions. It is an extra list node that acts as a sentinel: it stores no task and exists only to make the list operations easier.
  • Except for the first-layer wheel, the startMs of a higher-layer wheel is set to the currentTime of the layer below it at the moment the higher layer is created. The currentTime of every layer must be a multiple of tickMs. If it is not, it is trimmed down to one: currentTime = startMs - (startMs % tickMs).
  • The timer in Kafka only holds a reference to the first layer of the TimingWheel. It does not hold references to the other layers directly, but each layer holds a reference to the next higher layer.
  • The timer in Kafka uses a DelayQueue to help advance the time wheel. Every linked list of a time cell that is in use is added to the DelayQueue, which orders them by the expiration time of their cell, so the list with the earliest expiration time sits at the head of the DelayQueue. A separate thread fetches the expired lists from the DelayQueue.

Struct definitions

type TimingWheel struct {
    tick      int64 // span of one time cell, in milliseconds
    wheelSize int64 // number of time cells

    interval    int64 // overall span (tick * wheelSize), in milliseconds
    currentTime int64 // dial pointer, in milliseconds
    buckets     []*bucket              // the time cells
    queue       *delayqueue.DelayQueue // delay queue that drives the wheel

    // The higher-level overflow wheel
    overflowWheel unsafe.Pointer // type: *TimingWheel

    exitC     chan struct{}
    waitGroup waitGroupWrapper
}

tick, wheelSize, interval and currentTime are easy to understand. buckets is the list of time cells, queue is a delay queue that triggers all tasks, and overflowWheel is the reference to the next higher time wheel.

type bucket struct {
    // Expiration time of the bucket. 64-bit atomic operations need
    // 64-bit alignment, so this field stays first in the struct.
    expiration int64

    mu sync.Mutex
    // Task list of this time cell
    timers *list.List
}

A bucket wraps the task list of one time cell and holds the tasks that fall into the same expiration range. When the bucket expires, its timers list is drained for processing. One interesting detail: because several goroutines access the bucket concurrently, expiration is read and written through the sync/atomic package. 64-bit atomic operations require 64-bit alignment, which on a 32-bit system is only guaranteed for the first field of an allocated struct, so expiration is declared first.

type Timer struct {
    // Expiration time
    expiration int64 // in milliseconds
    // The task to execute
    task func()

    // Pointer to the bucket that holds this timer
    b unsafe.Pointer // type: *bucket

    // The timer's element in the bucket's list
    element *list.Element
}

Timer is the smallest unit of execution in a time wheel. It wraps a scheduled task, and its task function is called once the timer expires.
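
The element field is what makes removing a timer cheap: a timer that remembers its own list element can be unlinked without searching the list. A minimal sketch using the standard container/list package follows. The timer and bucket types here are simplified stand-ins, not the library's definitions, and locking is left out.

```go
package main

import (
	"container/list"
	"fmt"
)

// timer is a simplified stand-in that only tracks its list position.
type timer struct {
	name    string
	element *list.Element // position inside the bucket's list
}

// bucket is a simplified stand-in without the mutex or expiration.
type bucket struct {
	timers *list.List
}

func newBucket() *bucket { return &bucket{timers: list.New()} }

// add appends the timer and remembers the element it was stored in.
func (b *bucket) add(t *timer) {
	t.element = b.timers.PushBack(t)
}

// remove unlinks the timer in O(1) through its remembered element.
// It returns false if the timer is not currently in a list.
func (b *bucket) remove(t *timer) bool {
	if t.element == nil {
		return false
	}
	b.timers.Remove(t.element)
	t.element = nil
	return true
}

func main() {
	b := newBucket()
	t1, t2, t3 := &timer{name: "a"}, &timer{name: "b"}, &timer{name: "c"}
	b.add(t1)
	b.add(t2)
	b.add(t3)

	b.remove(t2) // no traversal needed

	for e := b.timers.Front(); e != nil; e = e.Next() {
		fmt.Println(e.Value.(*timer).name)
	}
}
```

container/list is itself a doubly linked ring with a sentinel root element, which is the same trick as the root node mentioned earlier.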

Initialize the time wheel

For example, now initialize a time wheel with tick 1s and wheelSize 10:

func main() {
    tw := timingwheel.NewTimingWheel(time.Second, 10)
    tw.Start()
}

func NewTimingWheel(tick time.Duration, wheelSize int64) *TimingWheel {
    // Convert the incoming tick into milliseconds
    tickMs := int64(tick / time.Millisecond)
    // A tick shorter than 1ms is not supported
    if tickMs <= 0 {
        panic(errors.New("tick must be greater than or equal to 1ms"))
    }
    // Set the start time
    startMs := timeToMs(time.Now().UTC())
    // Initialize the TimingWheel
    return newTimingWheel(
        tickMs,
        wheelSize,
        startMs,
        delayqueue.New(int(wheelSize)),
    )
}

func newTimingWheel(tickMs int64, wheelSize int64, startMs int64, queue *delayqueue.DelayQueue) *TimingWheel {
    // Create the buckets, one per time cell
    buckets := make([]*bucket, wheelSize)
    for i := range buckets {
        buckets[i] = newBucket()
    }
    return &TimingWheel{
        tick:      tickMs,
        wheelSize: wheelSize,
        // currentTime must be a multiple of tickMs, so truncate it
        currentTime: truncate(startMs, tickMs),
        interval:    tickMs * wheelSize,
        buckets:     buckets,
        queue:       queue,
        exitC:       make(chan struct{}),
    }
}

Initialization is straightforward; the code comments above cover the details.

Start the time wheel

Let's look at the Start method:

func (tw *TimingWheel) Start() {
    // Poll runs an infinite loop and puts expired elements
    // into the queue's channel C
    tw.waitGroup.Wrap(func() {
        tw.queue.Poll(tw.exitC, func() int64 {
            return timeToMs(time.Now().UTC())
        })
    })

    tw.waitGroup.Wrap(func() {
        for {
            select {
            // An expired element from the delay queue; it is a bucket
            case elem := <-tw.queue.C:
                b := elem.(*bucket)
                // Advance currentTime to the bucket's expiration time
                tw.advanceClock(b.Expiration())
                // Take the timers out of the bucket and
                // call addOrRun on each of them
                b.Flush(tw.addOrRun)
            case <-tw.exitC:
                return
            }
        }
    })
}

This uses a Wrap method from the project's utility code, which starts a goroutine that asynchronously executes the function passed in. See the source code link above.

The Start method starts two goroutines. The first goroutine calls the Poll method of the delay queue, which loops over the queue and puts expired elements into the queue's channel C. The second goroutine loops forever, reading from channel C. When an expired bucket arrives, it calls advanceClock to move currentTime forward to the bucket's expiration time. It then calls Flush to take the timers out of the bucket and passes each one to addOrRun.
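
The article does not show Wrap itself. A wrapper of this kind is commonly written around sync.WaitGroup, as in the sketch below; treat it as an assumed shape rather than the project's exact code.

```go
package main

import (
	"fmt"
	"sync"
)

// waitGroupWrapper embeds sync.WaitGroup and adds a helper that
// runs a function in its own goroutine while tracking it.
type waitGroupWrapper struct {
	sync.WaitGroup
}

// Wrap starts cb in a new goroutine and registers it with the group.
func (w *waitGroupWrapper) Wrap(cb func()) {
	w.Add(1)
	go func() {
		defer w.Done()
		cb()
	}()
}

func main() {
	var wg waitGroupWrapper
	results := make([]int, 3)
	for i := range results {
		i := i // capture the loop variable for the closure
		wg.Wrap(func() { results[i] = i * i })
	}
	wg.Wait() // blocks until every wrapped function has returned
	fmt.Println(results) // [0 1 4]
}
```

Tracking the goroutines this way lets a shutdown routine wait for both loops to return after the exit channel is closed.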

func (tw *TimingWheel) advanceClock(expiration int64) {
    currentTime := atomic.LoadInt64(&tw.currentTime)
    // Only advance if expiration >= currentTime + tick
    if expiration >= currentTime+tw.tick {
        // Set currentTime to expiration, trimmed to a multiple of tick
        currentTime = truncate(expiration, tw.tick)
        atomic.StoreInt64(&tw.currentTime, currentTime)

        // Try to advance the clock of the overflow wheel if present
        overflowWheel := atomic.LoadPointer(&tw.overflowWheel)
        if overflowWheel != nil {
            (*TimingWheel)(overflowWheel).advanceClock(currentTime)
        }
    }
}

The advanceClock method resets currentTime based on the expiration time in order to advance the time wheel, and then does the same for the higher-layer wheel if one exists.

func (b *bucket) Flush(reinsert func(*Timer)) {
    var ts []*Timer

    b.mu.Lock()
    // Walk the bucket's timers list
    for e := b.timers.Front(); e != nil; {
        next := e.Next()

        t := e.Value.(*Timer)
        // Remove the timer from the bucket and collect it
        b.remove(t)
        ts = append(ts, t)

        e = next
    }
    b.mu.Unlock()

    b.SetExpiration(-1) // TODO: Improve the coordination with b.Add()

    for _, t := range ts {
        reinsert(t)
    }
}

The Flush method iterates over the timers list in the bucket, removes each timer and collects it in the ts slice, and then calls reinsert on every collected timer. In this case reinsert is the addOrRun method.

func (tw *TimingWheel) addOrRun(t *Timer) {
    // add returns false if the timer has already expired
    if !tw.add(t) {
        // Execute the scheduled task asynchronously
        go t.task()
    }
}

addOrRun calls the add method to check whether the incoming Timer has already expired. If it has, the task function is executed asynchronously. The add method is examined below.

The whole Start process works as follows:

  1. The Start method starts a goroutine that calls Poll to pick expired elements out of the DelayQueue and put them into channel C.
  2. The Start method starts a second goroutine that loops over channel C of the DelayQueue. Each element is actually a bucket, whose timers list is then traversed. A timer that has expired is executed asynchronously. A timer that has not yet expired is put back into a time wheel, and its bucket is re-queued in the DelayQueue if necessary.

Add a task

func main() {
    tw := timingwheel.NewTimingWheel(time.Second, 10)
    tw.Start()

    // exitC lets main wait until the timer has fired
    exitC := make(chan time.Time, 1)
    // Add a task
    tw.AfterFunc(time.Second*15, func() {
        fmt.Println("The timer fires")
        exitC <- time.Now().UTC()
    })

    <-exitC
}

We add a scheduled task with a delay of 15s through the AfterFunc method. When it expires, the function passed in is executed.

func (tw *TimingWheel) AfterFunc(d time.Duration, f func()) *Timer {
    t := &Timer{
        expiration: timeToMs(time.Now().UTC().Add(d)),
        task:       f,
    }
    tw.addOrRun(t)
    return t
}

The AfterFunc method builds a Timer from the expiration time and the function to execute, and passes it to addOrRun. As seen above, addOrRun uses the expiration time to decide whether the task should be executed immediately or stored in the wheel.

Let’s look at the add method:

func (tw *TimingWheel) add(t *Timer) bool {
    currentTime := atomic.LoadInt64(&tw.currentTime)
    if t.expiration < currentTime+tw.tick {
        // Already expired
        return false
    } else if t.expiration < currentTime+tw.interval {
        // Within this wheel's interval: find the matching bucket
        virtualID := t.expiration / tw.tick
        b := tw.buckets[virtualID%tw.wheelSize]
        // Put the task into the bucket's list
        b.Add(t)
        // SetExpiration returns false if the expiration is unchanged,
        // which means the bucket is already in the delay queue
        if b.SetExpiration(virtualID * tw.tick) {
            // Add this bucket to the delay queue
            tw.queue.Offer(b, b.Expiration())
        }
        return true
    } else {
        // Out of the interval. Put it into the overflow wheel
        overflowWheel := atomic.LoadPointer(&tw.overflowWheel)
        if overflowWheel == nil {
            atomic.CompareAndSwapPointer(
                &tw.overflowWheel,
                nil,
                // The tick of the overflow wheel is this wheel's interval
                unsafe.Pointer(newTimingWheel(
                    tw.interval,
                    tw.wheelSize,
                    currentTime,
                    tw.queue,
                )),
            )
            overflowWheel = atomic.LoadPointer(&tw.overflowWheel)
        }
        // Recurse into the higher-layer wheel
        return (*TimingWheel)(overflowWheel).add(t)
    }
}

The add method has three branches, chosen by the expiration time. In the first branch, the expiration time is less than currentTime + tick, which means the task is already due, so add returns false and the caller (addOrRun) executes the task.

The second branch checks whether the expiration time falls within the span of the current time wheel. If it does, the task can be stored in this wheel: the matching time cell in buckets is found with a modulo operation, and the task is put into that bucket's list. SetExpiration reports whether the bucket's expiration time actually changed. The bucket is offered to the delay queue only when it did, which prevents the same bucket from being inserted repeatedly.

In the third branch, the expiration time lies beyond the span of the current time wheel, so the task has to be promoted to the next higher layer. Note that the tick of the higher-layer wheel is the interval of the current wheel, and that both layers share the same delay queue. The new wheel is stored in the overflowWheel pointer, and add is called recursively on the higher layer.
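
The de-duplication through SetExpiration can be sketched with an atomic swap. The bucket below is a stripped-down stand-in that keeps only the expiration field, and the method bodies are an assumption based on the behavior described here, not a copy of the library.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// bucket keeps only the field that matters for this sketch.
type bucket struct {
	// Kept first so 64-bit atomic access is aligned on 32-bit systems.
	expiration int64
}

// newBucket starts with -1, meaning "not scheduled".
func newBucket() *bucket { return &bucket{expiration: -1} }

// Expiration reads the value atomically.
func (b *bucket) Expiration() int64 {
	return atomic.LoadInt64(&b.expiration)
}

// SetExpiration stores the new value and reports whether it differs
// from the previous one.
func (b *bucket) SetExpiration(expiration int64) bool {
	return atomic.SwapInt64(&b.expiration, expiration) != expiration
}

func main() {
	b := newBucket()
	fmt.Println(b.SetExpiration(10000)) // true: first task, enqueue the bucket
	fmt.Println(b.SetExpiration(10000)) // false: same cell, already enqueued
	fmt.Println(b.SetExpiration(-1))    // true: reset, as Flush does
	fmt.Println(b.SetExpiration(10000)) // true: the bucket is reused later
}
```

Only a call that returns true would be followed by an Offer to the delay queue, so many timers in the same cell cost a single queue insertion.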

That completes the walk-through of the time wheel. One point is worth noting: the implementation above combines a DelayQueue with a ring queue. Inserting and deleting a scheduled task in the TimingWheel is O(1), while the DelayQueue is backed by a priority queue whose operations are O(log n). However, the DelayQueue holds buckets rather than individual tasks, and the number of buckets is small, so this has little effect on performance.