
Reading the cron source code

robfig/cron/v3 is a Go library for scheduled tasks that supports cron expressions. Its source code is textbook quality (at least to my taste): it is a clear demonstration of low coupling and high cohesion, and its use of the decorator pattern and its approach to concurrency are both worth studying.

Using Cron, you can easily implement a scheduled task as follows:

go get github.com/robfig/cron/v3@v3.0.0

package main

import (
	"fmt"
	"time"

	"github.com/robfig/cron/v3"
)

func main() {
	// WithSeconds enables the leading seconds field in cron expressions.
	c := cron.New(cron.WithSeconds())
	// Run the job every 3 seconds.
	_, err := c.AddFunc("*/3 * * * * *", func() {
		fmt.Printf("running in %v \n", time.Now().String())
	})
	if err != nil {
		return
	}
	c.Start()
	// Block forever so the scheduler goroutine keeps running.
	ch := make(chan struct{})
	<-ch
}

Building on the example above, the most commonly used cron functions are:

  • New(): creates a Cron object
  • Cron.AddFunc(): adds a job to the Cron object. It takes two arguments: the first is a cron expression, the second is a function with no arguments and no return value (the job)
  • Cron.Stop(): stops scheduling. After Stop(), jobs that have not started yet are no longer triggered, but jobs that are already running are not affected.

A cron expression is a string whose fields are separated by spaces. With the seconds field enabled (cron.WithSeconds(), as in the example above) there are five spaces and six fields, which from left to right represent seconds, minutes, hours, day of month, month, and day of week. Each field consists of numbers and special characters with agreed meanings. In robfig/cron, the following values and special characters are allowed in each field:

Field name   | Mandatory? | Allowed values  | Allowed special characters
Seconds      | Yes        | 0-59            | * / , -
Minutes      | Yes        | 0-59            | * / , -
Hours        | Yes        | 0-23            | * / , -
Day of month | Yes        | 1-31            | * / , - ?
Month        | Yes        | 1-12 or JAN-DEC | * / , -
Day of week  | Yes        | 0-6 or SUN-SAT  | * / , - ?

The meanings of these special characters are as follows:

  • *: matches every value of the field. For example, in 0 0 * 1 1 * the third field is *, so the job runs every hour on January 1.
  • /: a step over a range. For example, */12 * * * * * runs every 12 seconds.
  • ,: separates the items of a list. For example, in the six-field format, * * * 5,10,15 3,4 * matches the 5th, 10th, and 15th of March and April (3/05, 3/10, 3/15, 4/05, 4/10, 4/15).
  • -: a range. For example, */5 * 10-12 * * * runs every five seconds during hours 10 through 12 (from 10:00:00 to 12:59:55) every day.
  • ?: same as *
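
To make the special characters concrete, here is a minimal sketch that expands a single field into the values it matches. The helper expandField is hypothetical and written only for this article; it is not how parser.go does it (the library works with bit sets), and it does not handle names such as JAN or SUN.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// expandField is a hypothetical helper, not part of robfig/cron. It lists the
// values that one cron field matches within the inclusive range [min, max].
func expandField(field string, min, max int) ([]int, error) {
	var values []int
	for _, part := range strings.Split(field, ",") { // "," separates list items
		step, hasStep := 1, false
		if i := strings.Index(part, "/"); i >= 0 { // "/" introduces a step
			s, err := strconv.Atoi(part[i+1:])
			if err != nil || s <= 0 {
				return nil, fmt.Errorf("bad step in %q", part)
			}
			step, hasStep, part = s, true, part[:i]
		}
		lo, hi := min, max // "*" and "?" keep the full range
		if part != "*" && part != "?" {
			bounds := strings.SplitN(part, "-", 2) // "-" denotes a range
			v, err := strconv.Atoi(bounds[0])
			if err != nil {
				return nil, fmt.Errorf("bad value in %q", part)
			}
			lo, hi = v, v
			if len(bounds) == 2 {
				if hi, err = strconv.Atoi(bounds[1]); err != nil {
					return nil, fmt.Errorf("bad range end in %q", part)
				}
			} else if hasStep {
				hi = max // "N/step" runs from N to the end of the range
			}
		}
		if lo < min || hi > max || lo > hi {
			return nil, fmt.Errorf("%q is outside %d-%d", part, min, max)
		}
		for v := lo; v <= hi; v += step {
			values = append(values, v)
		}
	}
	return values, nil
}

func main() {
	for _, field := range []string{"*/12", "5,10,15", "10-12"} {
		values, err := expandField(field, 0, 59)
		fmt.Println(field, values, err)
	}
}
```

For the seconds field, */12 expands to 0, 12, 24, 36, 48, which is why the job fires every 12 seconds.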

Cron expressions are simple, yet they cover complex scheduling scenarios. For example, 0 0 10 * * 1-5 runs at 10:00 a.m. from Monday to Friday. In addition, cron provides several predefined schedules:

Entry                  | Description                                | Equivalent to
@yearly (or @annually) | Run once a year, midnight, Jan. 1st        | 0 0 1 1 *
@monthly               | Run once a month, midnight, first of month | 0 0 1 * *
@weekly                | Run once a week, midnight between Sat/Sun  | 0 0 * * 0
@daily (or @midnight)  | Run once a day, midnight                   | 0 0 * * *
@hourly                | Run once an hour, beginning of hour        | 0 * * * *

You can also use the predefined @every <duration> to run a job at a fixed interval, for example @every 10m.

An overview of the source

cron is not a large library. Its core files and their roles are:

  • chain.go: the decorator pattern; a Chain attaches multiple decorators to a job, for purposes such as logging
  • constantdelay.go: as the name implies, a simple constant-delay schedule such as "every 5 minutes"; the finest supported granularity is one second
  • cron.go: the core functionality
  • logger.go: defines a Logger interface so that cron can be plugged into a structured logging system
  • option.go: options that change the default behavior
  • parser.go: parses cron expressions
  • spec.go: defines SpecSchedule, the Schedule implementation for cron expressions

Core data structures and interfaces

type Entry struct

An Entry encapsulates a job added to Cron. Each Entry has an ID, and in addition, the Entry stores the last time the job was run and the next time it will be run.

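type EntryID int

type Entry struct {
	ID         EntryID   // unique identifier of this entry
	Schedule   Schedule  // the schedule on which the job runs
	Next       time.Time // the next time the job will run
	Prev       time.Time // the last time the job ran
	WrappedJob Job       // the job after the chain's decorators have been applied
	Job        Job       // the job as submitted by the user
}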

type Cron struct

type Cron struct {
	entries   []*Entry          // all jobs added to Cron
	chain     Chain             // decorators applied to every job
	stop      chan struct{}     // receives the Stop() signal
	add       chan *Entry       // receives entries added while Cron is running
	remove    chan EntryID      // receives IDs of entries to remove while Cron is running
	snapshot  chan chan []Entry // receives snapshot requests
	running   bool              // whether Cron is running
	logger    Logger
	runningMu sync.Mutex        // must be held when starting, stopping, or modifying Cron
	location  *time.Location
	parser    ScheduleParser    // the cron expression parser
	nextID    EntryID           // counter used to assign Entry IDs
	jobWaiter sync.WaitGroup    // tracks jobs that are still running
}

Interfaces

// ScheduleParser is the cron expression parser interface. Parse receives a
// cron expression (spec) and returns the parsed Schedule.
type ScheduleParser interface {
	Parse(spec string) (Schedule, error)
}

// Schedule describes a job's duty cycle. Its Next() method returns the next
// time the job should run, given the current time.
type Schedule interface {
	Next(time.Time) time.Time
}

// Job is an interface for submitted cron jobs.
type Job interface {
	Run()
}

Interface implementations

Implementation of ScheduleParser

In parser.go we find Parser, an implementation of the ScheduleParser interface:

type Parser struct {
	options ParseOption
}

func (p Parser) Parse(spec string) (Schedule, error) { ... }

A Parser is created with NewParser():

func NewParser(options ParseOption) Parser {
	optionals := 0
	if options&DowOptional > 0 {
		optionals++
	}
	if options&SecondOptional > 0 {
		optionals++
	}
	if optionals > 1 {
		panic("multiple optionals may not be configured")
	}
	return Parser{options}
}

In addition, parser.go declares a private package-level variable, standardParser:

var standardParser = NewParser(
	Minute | Hour | Dom | Month | Dow | Descriptor,
)

This parser is used later in Cron.
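
The parser options are bit flags combined with |, and tested with & as in NewParser(). The sketch below reproduces the constant list quoted later in this article; the helper has is hypothetical and exists only to show the check.

```go
package main

import "fmt"

// ParseOption is a set of bit flags; the constants mirror the list quoted
// in the WithSeconds section of this article.
type ParseOption int

const (
	Second ParseOption = 1 << iota
	SecondOptional
	Minute
	Hour
	Dom
	Month
	Dow
	DowOptional
	Descriptor
)

// has reports whether flag is set in options (hypothetical helper).
func has(options, flag ParseOption) bool {
	return options&flag > 0
}

func main() {
	standard := Minute | Hour | Dom | Month | Dow | Descriptor
	fmt.Println(has(standard, Second))     // false
	fmt.Println(has(standard, Descriptor)) // true

	withSeconds := standard | Second
	fmt.Println(has(withSeconds, Second)) // true
}
```

This shows why the standard parser rejects six-field expressions: the Second flag is simply not set.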

Implementation of Schedule

spec.go defines the SpecSchedule struct, which implements the Schedule interface:

type SpecSchedule struct {
	Second, Minute, Hour, Dom, Month, Dow uint64
	Location *time.Location
}

func (s *SpecSchedule) Next(t time.Time) time.Time { ... }

Implementation of Job

A Job is simply a function passed in by the user. Its implementation is in cron.go:

type FuncJob func()

func (f FuncJob) Run() { f() }
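
FuncJob is an adapter: it lets a plain function satisfy the Job interface, while struct types can implement Job directly. A minimal sketch, in which counterJob and runAll are hypothetical names used only for illustration:

```go
package main

import "fmt"

// Job and FuncJob mirror the definitions shown above.
type Job interface {
	Run()
}

type FuncJob func()

func (f FuncJob) Run() { f() }

// counterJob is a hypothetical struct-based Job that keeps state.
type counterJob struct{ runs int }

func (c *counterJob) Run() { c.runs++ }

// runAll runs any mix of Job implementations.
func runAll(jobs ...Job) {
	for _, j := range jobs {
		j.Run()
	}
}

func main() {
	counter := &counterJob{}
	called := false
	runAll(FuncJob(func() { called = true }), counter, counter)
	fmt.Println(called, counter.runs) // true 2
}
```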

Summary

The class diagram of the core data structure in Cron is as follows:

New()

The New() function in cron.go creates a Cron object and returns a pointer to it. It is implemented as follows:

func New(opts ...Option) *Cron {
	c := &Cron{
		entries:   nil,
		chain:     NewChain(),
		add:       make(chan *Entry),
		stop:      make(chan struct{}),
		snapshot:  make(chan chan []Entry),
		remove:    make(chan EntryID),
		running:   false,
		runningMu: sync.Mutex{},
		logger:    DefaultLogger,
		location:  time.Local,
		parser:    standardParser,
	}
	for _, opt := range opts {
		opt(c)
	}
	return c
}

This function takes a variadic list of parameters of type Option, which is simply a function type:

type Option func(*Cron)

cron ships with a set of built-in functions that return an Option. They all live in option.go, their names start with With, and they change the default behavior of Cron. The options are applied in New() after the Cron object has been created.

Also note that c.parser defaults to standardParser, introduced in the previous section. It is defined in parser.go, has type Parser, and is the default implementation of ScheduleParser.

AddFunc()

AddFunc() adds a job to Cron:

func (c *Cron) AddFunc(spec string, cmd func()) (EntryID, error) {
	// Wrap the function in a FuncJob so that it satisfies the Job interface.
	return c.AddJob(spec, FuncJob(cmd))
}

func (c *Cron) AddJob(spec string, cmd Job) (EntryID, error) {
	schedule, err := c.parser.Parse(spec)
	if err != nil {
		return 0, err
	}
	return c.Schedule(schedule, cmd), nil
}

AddJob() calls c.parser.Parse() (standardParser.Parse() by default) to turn the cron expression into a Schedule, and then calls the Schedule() method:

func (c *Cron) Schedule(schedule Schedule, cmd Job) EntryID {
	c.runningMu.Lock()
	defer c.runningMu.Unlock()
	c.nextID++
	entry := &Entry{
		ID:         c.nextID,
		Schedule:   schedule,
		WrappedJob: c.chain.Then(cmd),
		Job:        cmd,
	}
	if !c.running {
		c.entries = append(c.entries, entry)
	} else {
		c.add <- entry
	}
	return entry.ID
}

This method creates an Entry. If Cron is not running yet, the Entry is appended directly to Cron's list of entries; if Cron is already running, the Entry is sent to Cron's add channel and handled in run().

Entries() and Entry()

These two methods return snapshots of Cron's entries. Entries() returns a snapshot of all jobs; Entry(id EntryID) returns the snapshot of one job by ID, which it finds by iterating over the return value of Entries():

func (c *Cron) Entry(id EntryID) Entry {
	for _, entry := range c.Entries() {
		if id == entry.ID {
			return entry
		}
	}
	return Entry{}
}

The key is the implementation of Entries():

func (c *Cron) Entries() []Entry {
	c.runningMu.Lock()
	defer c.runningMu.Unlock()
	if c.running {
		replyChan := make(chan []Entry, 1)
		c.snapshot <- replyChan
		return <-replyChan
	}
	return c.entrySnapshot()
}

How a snapshot is obtained depends on whether Cron is running. To prevent Cron from being started or stopped while the snapshot is taken, the method first acquires runningMu.

If Cron is not running, it calls entrySnapshot() directly and returns the result:

func (c *Cron) entrySnapshot() []Entry {
	var entries = make([]Entry, len(c.entries))
	for i, e := range c.entries {
		entries[i] = *e
	}
	return entries
}

That is the simple case. If Cron is already running, Entries() sends a signal to c.snapshot, which is handled in Cron.run():

case replyChan := <-c.snapshot:
    replyChan <- c.entrySnapshot()
    continue

This is a little roundabout: Entries() creates a new channel, replyChan, and sends it to c.snapshot. The select in run() receives it, calls c.entrySnapshot(), and sends the result to replyChan. Meanwhile Entries() blocks on replyChan until the result arrives, then returns it.

Why are there two cases if both end up calling c.entrySnapshot()? More on that later.
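
The "channel of channels" request/reply idea can be tried in isolation. Below is a minimal sketch with a hypothetical registry type: one goroutine owns the slice, and callers get a copy by sending a reply channel. It deliberately leaves out cron's mutex and its "not running" branch.

```go
package main

import "fmt"

// registry is a hypothetical miniature of Cron: one goroutine owns items,
// and all access goes through channels.
type registry struct {
	items    []int
	add      chan int
	snapshot chan chan []int
	stop     chan struct{}
}

func newRegistry() *registry {
	r := &registry{
		add:      make(chan int),
		snapshot: make(chan chan []int),
		stop:     make(chan struct{}),
	}
	go r.run()
	return r
}

// run is the only goroutine that touches r.items.
func (r *registry) run() {
	for {
		select {
		case v := <-r.add:
			r.items = append(r.items, v)
		case reply := <-r.snapshot:
			cp := make([]int, len(r.items))
			copy(cp, r.items)
			reply <- cp
		case <-r.stop:
			return
		}
	}
}

func (r *registry) Add(v int) { r.add <- v }

// Snapshot sends a reply channel to the owner and waits for the copy.
func (r *registry) Snapshot() []int {
	reply := make(chan []int, 1)
	r.snapshot <- reply
	return <-reply
}

func (r *registry) Stop() { r.stop <- struct{}{} }

func main() {
	r := newRegistry()
	r.Add(1)
	r.Add(2)
	s := r.Snapshot()
	s[0] = 99                    // mutating the copy does not affect the owner's slice
	fmt.Println(s, r.Snapshot()) // [99 2] [1 2]
	r.Stop()
}
```

Because only the owning goroutine reads or writes the slice, no lock is needed around it.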

Remove()

Remove() deletes a job. Its logic is similar to that of Entries():

func (c *Cron) Remove(id EntryID) {
	c.runningMu.Lock()
	defer c.runningMu.Unlock()
	if c.running {
		c.remove <- id
	} else {
		c.removeEntry(id)
	}
}

func (c *Cron) removeEntry(id EntryID) {
	var entries []*Entry
	for _, e := range c.entries {
		if e.ID != id {
			entries = append(entries, e)
		}
	}
	c.entries = entries
}

The c.remove signal is handled in run():

case id := <-c.remove:
    timer.Stop()
    now = c.now()
    c.removeEntry(id)
    c.logger.Info("removed", "entry", id)

Stop()

Stop() stops Cron. Jobs that are already running are not interrupted; Stop() only guarantees that no new jobs are scheduled afterwards:

func (c *Cron) Stop() context.Context {
	c.runningMu.Lock()
	defer c.runningMu.Unlock()
	if c.running {
		c.stop <- struct{}{}
		c.running = false
	}
	ctx, cancel := context.WithCancel(context.Background())
	go func() {
		// Wait for all running jobs to complete.
		c.jobWaiter.Wait()
		// Then close ctx.Done() to signal that everything has finished.
		cancel()
	}()
	return ctx
}

Stop() returns a Context, specifically a cancelable one (a cancelCtx). The caller can wait on ctx.Done() to learn when Cron has actually stopped, that is, when all running jobs have finished.

Start()

Start() starts Cron:

func (c *Cron) Start() {
	c.runningMu.Lock()
	defer c.runningMu.Unlock()
	if c.running {
		return
	}
	c.running = true
	go c.run()
}

This function does three things:

  1. Acquires the lock
  2. Sets c.running to true, indicating that cron is running
  3. Starts a goroutine that executes c.run(). run() keeps looping over c.entries; whenever an entry is due, it starts a separate goroutine to execute the job

run() is the core of cron. It handles almost everything that happens after cron starts: adding jobs, removing jobs, executing jobs, and so on. It is a large function of nearly 100 lines, with the following structure:

func (c *Cron) run() {
	c.logger.Info("start")

	// Part 1
	now := c.now()
	for _, entry := range c.entries {
		entry.Next = entry.Schedule.Next(now)
		c.logger.Info("schedule", "now", now, "entry", entry.ID, "next", entry.Next)
	}

	// Part 2
	for {
		// 2.1
		sort.Sort(byTime(c.entries))

		// 2.2
		var timer *time.Timer
		if len(c.entries) == 0 || c.entries[0].Next.IsZero() {
			timer = time.NewTimer(100000 * time.Hour)
		} else {
			timer = time.NewTimer(c.entries[0].Next.Sub(now))
		}

		// 2.3
		for {
			select { /* ... */ }
			break
		}
	}
}

It consists of the following parts:

  • Part 1: iterates over c.entries, computes each job's next execution time with Schedule.Next(), and stores it in the entry's Next field.

  • Part 2 is an infinite loop, which can be divided into three steps:

    • 2.1: calls sort.Sort to order the entries by their Next field, earliest first.

    • 2.2: initializes the timer. If there are no runnable jobs, the timer is set to fire after 100,000 hours (effectively sleeping); otherwise it fires when the first job is due. Once the timer fires, step 2.3 takes over.

    • 2.3: the core of run(): an infinite loop around a select that listens for five signals. How these signals are produced was covered above. They are the timer signal timer.C, the add signal c.add (a job added while running), the snapshot signal c.snapshot, the stop signal c.stop, and the remove signal c.remove.

      for {
          select {
              case now = <-timer.C:
                  // ...
              
              case newEntry := <-c.add:
                  // ...
              
              case replyChan := <-c.snapshot:
                  // ...
                  continue
              
              case <-c.stop:
                  // ...
                  return
              
              case id := <-c.remove:
                 // ...
          }
      
          break
      }

Let's look at how each signal is handled:

Handling timer.C

case now = <-timer.C:
    now = now.In(c.location)
    c.logger.Info("wake", "now", now)

    // Run every entry whose next time was less than now
    for _, e := range c.entries {
        if e.Next.After(now) || e.Next.IsZero() {
            break
        }
        c.startJob(e.WrappedJob)
        e.Prev = e.Next
        e.Next = e.Schedule.Next(now)
        c.logger.Info("run", "now", now, "entry", e.ID, "next", e.Next)
    }

This signal can fire in two situations:

  1. After sorting, the job at index 0 of entries has become due.
  2. The timer fired after sleeping for 100,000 hours.

When handling this signal, run() iterates over entries. Because the entries are sorted by next execution time, in the first situation at least one job is due: the loop walks entries until it reaches a job whose next execution time is later than now. Every entry before that point is due and is run; everything from that point on is not. In the second situation the loop breaks at the first check.

Running a job goes through the Cron.startJob() method, which starts a goroutine for each run to execute the user's function:

func (c *Cron) startJob(j Job) {
	c.jobWaiter.Add(1)
	go func() {
		defer c.jobWaiter.Done()
		j.Run()
	}()
}

The approach here is blunt: every run gets its own goroutine. Make sure your jobs actually terminate. If a job runs for a long time and is scheduled frequently, goroutines can pile up (a goroutine leak) and eventually exhaust memory.

As for jobWaiter, it works together with Stop() to tell the caller when Cron has really finished.

Handling c.add

case newEntry := <-c.add:
    timer.Stop()
    now = c.now()
    newEntry.Next = newEntry.Schedule.Next(now)
    c.entries = append(c.entries, newEntry)
    c.logger.Info("added", "now", now, "entry", newEntry.ID, "next", newEntry.Next)

If a job is added while cron is running, run() stops the timer (the entries must be re-sorted to account for the new job), computes the new job's next execution time, and finally appends the new job to entries. This computation is not needed when a job is added before cron starts, because Start() triggers Part 1 of run(), which computes the next execution time of all entries at once before entering the infinite loop of Part 2.

Handling c.snapshot

case replyChan := <-c.snapshot:
    replyChan <- c.entrySnapshot()
    continue

This signal, as mentioned above, is sent when Cron is running and the user requests a snapshot of the jobs. Entries() does not read c.entries directly in this case because, once Cron is running, run() keeps sorting and modifying the entries in its own goroutine; reading them from another goroutine could return inconsistent (dirty) data.

Also note the continue. Without it, control would leave the select after the case finishes and hit the break, exiting the inner for loop and forcing the outer loop to re-sort the entries and create a new timer. The for loop that wraps the select exists only for this case: continue jumps straight back to the select.

So why does only c.snapshot need continue? The purpose of the select is for run() to block until the next event arrives. After any other event the entries may have changed, so run() must break out, re-sort, and reset the timer before blocking again. A snapshot changes nothing, so run() can go straight back to waiting. Comparing the handling of c.add and c.snapshot makes this clear.
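
The effect of continue versus break in this loop shape can be observed with a small model. In the sketch below, countResorts is a hypothetical function that imitates only the control flow of run(): string events stand in for the real channels, and closing the channel plays the role of c.stop.

```go
package main

import "fmt"

// countResorts is a hypothetical model of run()'s loop structure. It returns
// how many times the outer loop body (sort + new timer in cron) executes.
func countResorts(events <-chan string) int {
	resorts := 0
	for {
		resorts++ // stands in for sorting entries and creating the timer
		for {
			select {
			case ev, ok := <-events:
				if !ok {
					return resorts // channel closed: stop, like c.stop
				}
				if ev == "snapshot" {
					continue // read-only: go straight back to the select
				}
				// "add" and "remove" change the entries: fall out of the select
			}
			break // leave the inner loop so the outer loop re-sorts
		}
	}
}

func main() {
	events := make(chan string, 5)
	for _, ev := range []string{"snapshot", "snapshot", "add", "snapshot", "remove"} {
		events <- ev
	}
	close(events)
	fmt.Println(countResorts(events)) // 3: the initial pass plus one per add/remove
}
```

The three snapshot events cause no extra sorting, which is exactly the saving the continue buys.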

Handling c.stop

case <-c.stop:
    timer.Stop()
    c.logger.Info("stop")
    return

This one is simple: stop the timer and end the run() goroutine. Because each job executes in its own goroutine, the return of run() does not affect jobs that are still running.

Handling c.remove

case id := <-c.remove:
    timer.Stop()
    now = c.now()
    c.removeEntry(id)
    c.logger.Info("removed", "entry", id)

The logic is the same as for c.add.

Option

New() takes a set of Option parameters that change the default behavior of Cron. These parameters are functions that are applied after Cron has been initialized. cron has several built-in functions that return an Option. Here is a quick look at what they do:

WithLocation

Changes the time zone. The default is time.Local.

func WithLocation(loc *time.Location) Option {
	return func(c *Cron) {
		c.location = loc
	}
}

It can be used like this:

// nyc is a *time.Location, for example one returned by time.LoadLocation("America/New_York").
c := cron.New(cron.WithLocation(nyc))

WithSeconds

Overrides the default cron parsing format by adding a leading seconds field. The default format is Minute | Hour | Dom | Month | Dow (minute, hour, day of month, month, day of week).

func WithSeconds() Option {
	return WithParser(NewParser(
		Second | Minute | Hour | Dom | Month | Dow | Descriptor,
	))
}

The allowed fields are as follows:

const (
	Second         ParseOption = 1 << iota // Seconds field, default 0
	SecondOptional                         // Optional seconds field, default 0
	Minute                                 // Minutes field, default 0
	Hour                                   // Hours field, default 0
	Dom                                    // Day of month field, default *
	Month                                  // Month field, default *
	Dow                                    // Day of week field, default *
	DowOptional                            // Optional day of week field, default *
	Descriptor                             // Allow descriptors such as @monthly, @weekly, etc.
)

WithParser

If you find Cron expressions difficult to understand and remember, you can write your own parser and replace it with this function.

func WithParser(p ScheduleParser) Option {
	return func(c *Cron) {
		c.parser = p
	}
}

WithChain

Replaces the default decorator chain.

func WithChain(wrappers ...JobWrapper) Option {
	return func(c *Cron) {
		c.chain = NewChain(wrappers...)
	}
}

WithLogger

Use a custom logger

func WithLogger(logger Logger) Option {
	return func(c *Cron) {
		c.logger = logger
	}
}

Chain

This is a good example of the decorator pattern to learn from. Let's look at how decorators work by default:

The Cron struct has a single chain field of type Chain, which is initialized with NewChain() when New() runs:

c := &Cron{
    entries:   nil,
    chain:     NewChain(),
    // ...
}

NewChain() receives a set of decorator functions (each a JobWrapper, that is, a function that takes a Job and returns a Job) and uses them to initialize a Chain object:

type Chain struct {
	wrappers []JobWrapper
}

func NewChain(c ...JobWrapper) Chain {
	return Chain{c}
}

Each Entry holds a WrappedJob field of type Job, which is initialized in Schedule() by calling the chain's Then() method:

entry := &Entry{
    ID:         c.nextID,
    Schedule:   schedule,
    WrappedJob: c.chain.Then(cmd),
    // ...
}

In Then(), the decorators are applied to the job:

func (c Chain) Then(j Job) Job {
	for i := range c.wrappers {
		j = c.wrappers[len(c.wrappers)-i-1](j)
	}
	return j
}

Then() returns the decorated Job, which explains why run() passes e.WrappedJob rather than e.Job to startJob().
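
Because Then() walks the wrappers from last to first, the first wrapper passed to NewChain() ends up outermost. The sketch below demonstrates the resulting order. Job, FuncJob, Chain, NewChain, and Then are copied from the article; JobWrapper is assumed to be func(Job) Job, matching how it is called in Then(); trace and runTraced are hypothetical helpers.

```go
package main

import "fmt"

// Job, FuncJob, Chain, NewChain and Then follow the code quoted in the article.
type Job interface{ Run() }

type FuncJob func()

func (f FuncJob) Run() { f() }

// JobWrapper is assumed to be a function from Job to Job.
type JobWrapper func(Job) Job

type Chain struct{ wrappers []JobWrapper }

func NewChain(c ...JobWrapper) Chain { return Chain{c} }

func (c Chain) Then(j Job) Job {
	for i := range c.wrappers {
		j = c.wrappers[len(c.wrappers)-i-1](j)
	}
	return j
}

// trace is a hypothetical decorator that records when it runs.
func trace(name string, log *[]string) JobWrapper {
	return func(j Job) Job {
		return FuncJob(func() {
			*log = append(*log, name+" before")
			j.Run()
			*log = append(*log, name+" after")
		})
	}
}

// runTraced decorates a job with two tracers and returns the call order.
func runTraced() []string {
	var log []string
	job := FuncJob(func() { log = append(log, "job") })
	NewChain(trace("m1", &log), trace("m2", &log)).Then(job).Run()
	return log
}

func main() {
	fmt.Println(runTraced()) // [m1 before m2 before job m2 after m1 after]
}
```

In other words, NewChain(m1, m2).Then(job) behaves like m1(m2(job)).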

Now that you know how a decorator works, let's look at the three built-in decorators provided in chain.go.

Recover

Built on the language's recover(), this decorator catches a panic raised while the job runs and records it with the supplied logger. All it does is wrap the user's Job in a function that starts with a defer func() { ... }().

func Recover(logger Logger) JobWrapper {
	return func(j Job) Job {
		return FuncJob(func() {
			defer func() {
				if r := recover(); r != nil {
					const size = 64 << 10
					buf := make([]byte, size)
					buf = buf[:runtime.Stack(buf, false)]
					err, ok := r.(error)
					if !ok {
						err = fmt.Errorf("%v", r)
					}
					logger.Error(err, "panic", "stack", "...\n"+string(buf))
				}
			}()
			j.Run()
		})
	}
}
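
The core of this decorator is the defer/recover idiom, which can be tried without a logger. In the sketch below, withRecover is a hypothetical helper that returns the recovered panic as an error instead of logging it.

```go
package main

import "fmt"

// withRecover is a hypothetical, logger-free version of the Recover idea:
// it runs job and turns a panic into an error instead of crashing.
func withRecover(job func()) (err error) {
	defer func() {
		if r := recover(); r != nil {
			e, ok := r.(error)
			if !ok {
				e = fmt.Errorf("%v", r)
			}
			err = e
		}
	}()
	job()
	return nil
}

func main() {
	fmt.Println(withRecover(func() {}))                // <nil>
	fmt.Println(withRecover(func() { panic("boom") })) // boom
}
```

Since every job runs in its own goroutine, an unrecovered panic in one job would otherwise bring down the whole process.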

DelayIfStillRunning

Suppose a job takes 10 seconds to run but is scheduled once a second. If we want to guarantee that only one instance of the job runs at a time, we can use this decorator. It creates a mutex for each job; the lock is acquired before the job runs and released when it returns, so overlapping runs queue up. If a run waits for the lock for more than one minute, the delay is logged.

func DelayIfStillRunning(logger Logger) JobWrapper {
	return func(j Job) Job {
		var mu sync.Mutex
		return FuncJob(func() {
			start := time.Now()
			mu.Lock()
			defer mu.Unlock()
			if dur := time.Since(start); dur > time.Minute {
				logger.Info("delay", "duration", dur)
			}
			j.Run()
		})
	}
}

SkipIfStillRunning

This decorator uses a channel with a capacity of 1 that initially holds one token. Before running the job it tries to take the token from the channel, and it puts the token back after the job finishes. The attempt is made with a select: if the token is available, the job runs; otherwise the previous run is still in progress, so this run is skipped and a log entry is written.

func SkipIfStillRunning(logger Logger) JobWrapper {
	return func(j Job) Job {
		var ch = make(chan struct{}, 1)
		ch <- struct{}{}
		return FuncJob(func() {
			select {
			case v := <-ch:
				defer func() { ch <- v }()
				j.Run()
			default:
				logger.Info("skip")
			}
		})
	}
}
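
To see the skip happen deterministically, the following sketch forces two runs to overlap. skipIfRunning and demo are hypothetical names; the wrapper takes a callback instead of a logger so that skips can be counted.

```go
package main

import "fmt"

// skipIfRunning is a hypothetical, logger-free version of the same idea:
// a one-slot channel acts as a token that a run must hold.
func skipIfRunning(job func(), onSkip func()) func() {
	token := make(chan struct{}, 1)
	token <- struct{}{}
	return func() {
		select {
		case t := <-token:
			defer func() { token <- t }() // hand the token back when done
			job()
		default:
			onSkip() // a previous run still holds the token
		}
	}
}

// demo overlaps two runs on purpose and reports how many ran and were skipped.
func demo() (runs, skips int) {
	started := make(chan struct{}, 2)
	release := make(chan struct{})
	wrapped := skipIfRunning(
		func() { runs++; started <- struct{}{}; <-release },
		func() { skips++ },
	)

	done := make(chan struct{})
	go func() { wrapped(); close(done) }() // first run blocks on release
	<-started                              // wait until it holds the token
	wrapped()                              // overlaps with the first run: skipped
	close(release)                         // let the first run finish
	<-done
	wrapped() // the token is back, so this run executes
	return runs, skips
}

func main() {
	fmt.Println(demo()) // 2 1
}
```

Unlike the mutex-based decorator, nothing queues up here: an overlapping run is dropped rather than delayed.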

Conclusion

Several features of Cron:

  1. Jobs can be added or removed while cron is running: signals are sent over channels, the select in run() receives them, and the entries are re-sorted.
  2. Decorator mechanism: decorators can be attached to jobs; they are applied when the Entry is initialized.
  3. Low coupling: Options passed to New() change the default behavior, for example to plug in your own cron expression parser.
  4. Each job run executes in its own goroutine.
  5. Stopping Cron does not stop jobs that have already started and are still running; the Context returned by Stop() tells you when they have finished.