Distributed delay queue design (V1 ~ V2)

1. Introduction

Background

Most of the time when we build systems we deal with real-time tasks: a request comes in, we process it, and we respond to the user immediately. But sometimes we run into tasks that are not real-time, such as publishing an important announcement at a fixed point in time, or doing something X minutes / Y hours after a user action, e.g.:

“PM: 10 minutes after the call starts, we need to alert this user and send them a reward.”

The follow-up actions are things like notifications, coupons, and so on. Most of the solutions I have seen keep a small background job inside each service, but as these background jobs and servers pile up, the approach stays tightly coupled to each service's own business logic. A shared delay queue service is therefore needed.

Terminology

Topic_list queue: each incoming delay request carries a delay topic (a concept borrowed from Kafka); requests are logically split into one queue per topic so that each business can be processed separately;

Topic_info queue: one entry is kept per topic. The topic information is scanned periodically to detect newly created topics and to create or destroy the service goroutines that handle them;

Offset: current consumption progress;

New_offset: the new consumption progress that will replace offset;

Topic_offset_lock: distributed lock.

2. Design objectives

Feature list

1. Delay messages are added through an HTTP interface

2. Queue storage: roughly the last 3 days of queue consumption data are kept

3. Consumption interfaces are provided

4. Delayed notification

Performance indicators

Expected interface call volume: 3,500 single-type tasks within one second, and 1,300 single-type tasks spread over multiple seconds

Pressure test results:

A simple load test was run:

wrk write test: 9,000 records written in 259.3 s, single-threaded, no concurrency

Trigger performance/accuracy: at 1,000 triggers per second there is no delay on the test machine; at 3,000 per second a 1-2 second delay occasionally appears, constrained by memory and CPU.

3. System design

Interaction process

Sequence diagram

This design is driven by HTTP calls. When a message is added for a topic whose queue already exists, it is appended to the end of the corresponding topic queue and stored; when a message is added for a topic whose queue does not exist yet, a new topic queue is created first. On the consumption side, the instance that grabs the distributed lock first reads the current offset of the corresponding queue and sets the new offset (new_offset), at which point it can release the lock and let other instances compete for it. It then pops the elements between offset and new_offset from the head of the queue, reads their detailed information from storage, and processes them in a goroutine, while the main goroutine waits for the next trigger. Further goroutines are added to monitor triggering.
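
To make the flow concrete, here is a minimal sketch of one scan round on the consumption side. The delayStore interface and every name in it are placeholders standing in for the real storage, lock and callback modules, not the project's actual API.

// delayStore is a hypothetical storage interface; the method names are placeholders.
type delayStore interface {
	SetTopicLock(ctx context.Context, topic string) (bool, error)
	DelTopicLock(ctx context.Context, topic string) error
	GetOffset(topic string) (int64, error)
	SetOffset(topic string, offset int64) error
	PopSegment(topic string, from, to int64) ([]string, error)
}

// scanTopic performs one scan round for a single topic, following the flow above.
func scanTopic(ctx context.Context, s delayStore, topic string, batch int64, notify func(task string)) error {
	// Only the instance that wins the distributed lock may move the offset.
	locked, err := s.SetTopicLock(ctx, topic)
	if err != nil || !locked {
		return err
	}
	offset, err := s.GetOffset(topic) // current consumption progress
	if err != nil {
		s.DelTopicLock(ctx, topic)
		return err
	}
	newOffset := offset + batch // progress after this round
	if err := s.SetOffset(topic, newOffset); err != nil {
		s.DelTopicLock(ctx, topic)
		return err
	}
	s.DelTopicLock(ctx, topic) // release the lock so other instances can compete

	// Pop the [offset, newOffset) segment from the head of the queue and hand the
	// detailed messages to a worker goroutine; the caller waits for the next trigger.
	tasks, err := s.PopSegment(topic, offset, newOffset)
	if err != nil {
		return err
	}
	go func() {
		for _, t := range tasks {
			notify(t) // callback module: inform the corresponding service
		}
	}()
	return nil
}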

4. Module division

1. Queue storage module

1. The delay.base module under delay. Mainly responsible for receiving write requests and writing queue information to storage by calling the storage module; it does not handle backend logic.

2. Backend module. The delay.backend module under delay is responsible for time-triggered scanning of the corresponding topic queues; it calls the storage module to read data and calls the callback module.

1. Start a goroutine per topic to scan it

2. Scan the topic_list for messages to consume

3. If a topic_list has no consumption within a certain period, close its goroutine (see the sketch after this module list)

3. Callback module. Mainly responsible for delivering data whose trigger time has been reached and notifying the corresponding service.

4. Storage module

1. Distributed lock module. With multi-machine deployment it guarantees that each message is consumed exactly once; the offset-to-new_offset segment of each topic is locked independently.

2. Topic list management. Manages the number of topics and thereby controls the number of goroutines.

3. Topic_list, message queue

4. Topic_info, the message entity; it may need to carry some extra information so the callback can be processed uniformly.

5. Unique number generation module.
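
As mentioned in the backend module item above, a rough sketch of the per-topic goroutine management could look like the following; topicManager and its fields are illustrative assumptions, not the actual implementation.

// topicManager keeps one scanning goroutine per topic (illustrative).
type topicManager struct {
	mu      sync.Mutex
	cancels map[string]context.CancelFunc // one cancel function per running topic goroutine
}

// refresh is called on every scan of the topic management list: it starts goroutines
// for new topics and stops goroutines whose topics have disappeared or gone idle.
func (m *topicManager) refresh(ctx context.Context, topics []string, scan func(ctx context.Context, topic string)) {
	m.mu.Lock()
	defer m.mu.Unlock()
	if m.cancels == nil {
		m.cancels = make(map[string]context.CancelFunc)
	}

	alive := make(map[string]bool, len(topics))
	for _, t := range topics {
		alive[t] = true
		if _, ok := m.cancels[t]; ok {
			continue // a goroutine is already running for this topic
		}
		tctx, cancel := context.WithCancel(ctx)
		m.cancels[t] = cancel
		go scan(tctx, t) // new topic: start its scanning goroutine
	}
	for t, cancel := range m.cancels {
		if !alive[t] {
			cancel() // topic removed (or idle beyond the threshold): stop its goroutine
			delete(m.cancels, t)
		}
	}
}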

5. Cache design

Full cache mode is currently used

Key design:

Topic management list key: XX:DELAY_TOPIC_LIST    type: list

Topic_list key: XX:DELAY_SIMPLE_TOPIC_TASK-%s (formatted with the topic)    type: zset

Topic_info key: XX:DELAY_REALL_TOPIC_TASK-%s (formatted with the topic)    type: hash

Topic_offset key: XX:DELAY_TOPIC_OFFSET-%s (formatted with the topic)    type: string

Topic_lock key: XX:DELAY_TOPIC_RELOAD_LOCK-%s (formatted with the topic)    type: string
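
Purely for illustration, the keys above could be produced by helpers such as these; the XX prefix and the function names are placeholders.

const keyPrefix = "XX" // placeholder business prefix

func delayTopicListKey() string               { return keyPrefix + ":DELAY_TOPIC_LIST" }                                // type: list
func delayTopicTaskKey(topic string) string   { return fmt.Sprintf("%s:DELAY_SIMPLE_TOPIC_TASK-%s", keyPrefix, topic) } // type: zset
func delayTopicInfoKey(topic string) string   { return fmt.Sprintf("%s:DELAY_REALL_TOPIC_TASK-%s", keyPrefix, topic) }  // type: hash
func delayTopicOffsetKey(topic string) string { return fmt.Sprintf("%s:DELAY_TOPIC_OFFSET-%s", keyPrefix, topic) }      // type: string
func delayTopicLockKey(topic string) string   { return fmt.Sprintf("%s:DELAY_TOPIC_RELOAD_LOCK-%s", keyPrefix, topic) } // type: string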

6. Interface design

delay.task.addv1 (add to the delay queue, v1)

Sample request

curl -d 
'{
    "topic": "xxx",          // business topic
    "timing_moment": ,       // trigger moment, in seconds
    "content": "{}"          // message body, a JSON string
}' 
'http://127.0.0.1:xxxx/delay/task/add'

Sample response

{
    "dm_error": 0,
    "error_msg": "Operation successful",
    "task_id": 112345465765
}

Pull callback mode (no longer supported in v2)

Sample request

curl -d 
'{
    "topic": "xxxx",           // business topic
    "task_id": 1324568798765   // task id, optional; if present, the specific message is returned
}' 
'http://127.0.0.1:xxxx/delay/task/pull'

Sample response

{
    "dm_error": 0,
    "error_msg": "Operation successful",
    "content": "{\"xxx\"}"
}

delay.task.addv2 (add to the delay queue, v2)

Sample request

curl -d 
'{
    "topic": "xxx",               // business topic
    "timing_moment": ,            // trigger moment, in seconds
    "content": "{                 // message content (a JSON string)
        "sn": "message.call",     // service discovery name (or the configured service name)
        "url": "/ev/tp/xxxx",     // callback url
        "xxx": "xxx"              // other fields
    }"
}' 
'http://127.0.0.1:xxxx/delay/task/add'

Example

curl -d '{
    "topic": "xxxx_push",
    "content": "{
        "uid": "111111",
        "sn": "other.server",
        "url": "/xxxx/callback",
        "msg_type": "gift"
    }",
    "timing_moment": 1565700615
}' 
http://127.0.0.1:xxxx/delay/task/add

Sample response

{
    "dm_error": 0,
    "error_msg": "Operation successful",
    "task_id": 112345465765
}

7. MQ design (no longer supported in v2)

Kafka consumption mode

Topic: delay_base_push

Fixed message format:
{
    "topic": "xxxx",    // business topic
    "content": "{}"     // content of a single produced message
}

8. Other designs

Unique number design

The storage module is called and the unique number is generated by combining a Redis auto-increment counter with the current timestamp, as follows:

func (c *CacheManager) OperGenTaskid() (uint64, error) {
	now := time.Now().Unix()
	key := c.getDelayTaskIdKey()
	reply, err := c.DelayRds.Do("INCR", key)
	if err != nil {
		log.Errorf("genTaskid INCR key:%s, error:%s", key, err)
		return 0, err
	}
	version := reply.(int64)
	if version == 1 {
		// First use of the counter key: give it an expiry
		// (the original uses time.Duration(100)*time.Second).
		c.DelayRds.Do("EXPIRE", key, 100)
	}
	// The counter wraps every 10000, so up to 10000 ids can be issued per second.
	incrNum := version % 10000
	taskId := uint64(now)*10000 + uint64(incrNum)
	log.Debugf("genTaskid INCR key:%s, taskId:%d", key, taskId)
	return taskId, nil
}

Distributed lock design

func (c *CacheManager) SetDelayTopicLock(ctx context.Context, topic string) (bool, error) {
	key := c.getDelayTopicReloadLockKey(topic)
	// SET key lock NX EX 2: acquire the lock only if it does not exist, with a 2-second expiry.
	reply, err := c.DelayRds.Do("SET", key, "lock", "NX", "EX", 2)
	if err != nil {
		log.Errorf("SetDelayTopicLock SETNX key:%s, val:%v, error:%s", key, "lock", err)
		return false, err
	}
	if reply == nil {
		// The lock is currently held by another instance.
		return false, nil
	}
	log.Debugf("SetDelayTopicLock SETNXEX topic:%s lock:%v", topic, true)
	return true, nil
}

9. Design considerations

Robustness

Circuit breaker strategy:

There are a number of shortcomings in this version of the design. When Redis becomes unreachable, a large backlog of requests would strain the machine or instance and make other services unavailable. Therefore, once the number of retries exceeds the alarm threshold, the state is recorded with an atomic operation, atomic.StoreInt32(&stopFlag, 1), where stopFlag is a global variable later read back with atomic.LoadInt32(&stopFlag). The breaker has three states: open, closed and half-open. While open, a timer runs; when the timer ends, stopFlag is set to 2, the half-open state, in which Redis is accessed with a certain probability. If the probes still fail, stopFlag is set back to 1 and the timer continues.
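
A minimal sketch of such a three-state breaker built on sync/atomic follows; the state values match the stopFlag values described above, while the probe probability and the timer length are illustrative assumptions.

const (
	stateClosed   int32 = 0 // normal: requests may access Redis
	stateOpen     int32 = 1 // broken: skip Redis while the timer runs
	stateHalfOpen int32 = 2 // half-open: probe Redis with a certain probability
)

var stopFlag int32 // global breaker state, as described above

// allowRedis reports whether the current request may access Redis.
func allowRedis() bool {
	switch atomic.LoadInt32(&stopFlag) {
	case stateOpen:
		return false
	case stateHalfOpen:
		return rand.Intn(10) == 0 // probe with a small probability (illustrative value)
	default:
		return true
	}
}

// trip opens the breaker once retries exceed the alarm threshold and half-opens it
// when the timer fires; if the probes keep failing, the caller calls trip again.
func trip(wait time.Duration) {
	atomic.StoreInt32(&stopFlag, stateOpen)
	time.AfterFunc(wait, func() {
		atomic.StoreInt32(&stopFlag, stateHalfOpen)
	})
}

// reset closes the breaker after a successful probe.
func reset() {
	atomic.StoreInt32(&stopFlag, stateClosed)
}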

Shortcomings

1. Timed invocation

There are three common ways to write a loop in Go that executes a scheduled task:

1. time.Sleep:

for {
    time.Sleep(time.Second)
    fmt.Println("test")
}

2. The time.Tick function:

t1 := time.Tick(3 * time.Second)
for {
    select {
    case <-t1:
        fmt.Println("test")
    }
}

3. You can also use time.NewTicker to obtain a Ticker for the scheduled task and then block on its channel; this way the timer can be stopped manually, reducing wasted memory:

t := time.NewTicker(time.Second)
for {
    select {
    case <-t.C:
        fmt.Println("test")
        t.Stop()
    }
}

At first I thought time.Sleep was handled separately and simply parked the goroutine, so the first version used time.Sleep. After digging into the runtime, however, it turns out that all of these approaches create a timer and register it with the timer-processing goroutine: the timers used by Tick, Sleep and time.After are all placed into the same timer heap (Go's timing wheel) and wait there to be handled by the same goroutine, so in that respect Tick and Sleep are no different. There are still practical differences, but this article does not compare the pros and cons of time.Sleep and time.Tick for scheduled tasks in Go; that will be covered in a follow-up article. Blocking a goroutine on a channel is the more flexible way to run scheduled tasks: combined with select you can add a timeout and a default branch, and the timer can be stopped actively, so time.Tick (or a Ticker) is recommended for scheduled tasks.
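
As a small illustration of that recommendation, the sketch below pairs a Ticker with a stop channel inside select, so the scheduled task can also be shut down actively; the function and its parameters are illustrative.

func runScheduled(interval time.Duration, stop <-chan struct{}, job func()) {
	t := time.NewTicker(interval)
	defer t.Stop() // release the timer when the loop exits
	for {
		select {
		case <-t.C:
			job() // the scheduled work
		case <-stop:
			return // active shutdown requested from the outside
		}
	}
}

// Usage:
//   stop := make(chan struct{})
//   go runScheduled(time.Second, stop, func() { fmt.Println("test") })
//   ...
//   close(stop) // stop the scheduled task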

2. Storage module shortcomings

At present the storage is a full cache with no DB involved. The high availability of Redis (Codis) is one problem, and what to do during the "do nothing" period after the breaker opens is another. Looking forward, therefore:

1. Use a multi-level timing wheel as the single-machine data structure. Bring the data closer to the machine by loading it asynchronously, reducing the time lost to network I/O and also reducing the reliance on Redis (see the sketch after this list).

2. Introduce ZooKeeper, or add a leader with in-cluster backup. Ensure that at least two machines in the cluster load the data for each topic, with the leader coordinating consumption, to achieve high availability.
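
For reference only, here is a minimal single-level timing wheel in Go, just to illustrate the data structure the first point refers to; a real design would use multiple levels (or a per-task round counter) and load data from Redis asynchronously.

// timeWheel is a minimal single-level timing wheel: tasks are hashed into slots by
// their delay, and the wheel advances one slot per tick. Delays longer than one full
// rotation are not handled here.
type timeWheel struct {
	mu      sync.Mutex
	slots   [][]func() // one task list per slot
	current int        // slot the wheel is currently pointing at
	tick    time.Duration
}

func newTimeWheel(slots int, tick time.Duration) *timeWheel {
	return &timeWheel{slots: make([][]func(), slots), tick: tick}
}

// add schedules task to run after roughly delay, rounded to a slot.
func (w *timeWheel) add(delay time.Duration, task func()) {
	w.mu.Lock()
	defer w.mu.Unlock()
	steps := int(delay / w.tick)
	if steps < 1 {
		steps = 1 // fire on the next tick at the earliest
	}
	pos := (w.current + steps) % len(w.slots)
	w.slots[pos] = append(w.slots[pos], task)
}

// run advances the wheel once per tick and fires the tasks in the slot it lands on.
func (w *timeWheel) run(stop <-chan struct{}) {
	t := time.NewTicker(w.tick)
	defer t.Stop()
	for {
		select {
		case <-t.C:
			w.mu.Lock()
			w.current = (w.current + 1) % len(w.slots)
			due := w.slots[w.current]
			w.slots[w.current] = nil
			w.mu.Unlock()
			for _, task := range due {
				go task()
			}
		case <-stop:
			return
		}
	}
}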