Official account: Java Xiaokaxiu, website: Javaxks.com

Author: the wind’s blog, link: blog.csdn.net/qq330983778…

Design

While studying Redis, I found that the Youzan ("Like") team had shared an article about the design of a delay queue: "Youzan delay queue". Let's try to implement it.

Business process

First, let's analyze the process:

  1. The user submits a task. The task is first pushed to the delay queue.
  2. After receiving the task, the delay queue stores it in the job pool and calculates its execution time.
  3. A delayed job (containing only the task ID) is then generated and placed into a bucket.
  4. The timer component continuously polls each bucket and, when a delayed job's time arrives, fetches the task's meta information from the job pool.
  5. The timer checks whether the task is still valid. If the task has been deleted, it is skipped and polling continues. If the task is valid, its time is checked again.
  6. If the time has been reached, the task is put into the Ready Queue for its topic and removed from the bucket. If the time has not been reached, the time is recalculated, the task is put back into a bucket, and the previous bucket entry is removed.
  7. The consumer polls the Ready Queue of the topic it cares about. After obtaining a job, it runs its own business logic. Meanwhile, the server recalculates the execution time of the job taken by the consumer according to the job's preset TTR (time to run) and puts the job back into a bucket.
  8. After consumption is complete, the consumer sends a finish message, and the server deletes the corresponding information by job ID.
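
The eight steps above can be sketched in memory without Redis. This is only an illustration of the flow under simplifying assumptions: there is a single bucket, time is passed in explicitly as `now`, and every name here (`DelayQueueSketch`, `tick`, and so on) is hypothetical rather than taken from the article's source code.

```java
import java.util.ArrayDeque;
import java.util.Comparator;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;
import java.util.PriorityQueue;

// Hypothetical in-memory model of the flow; not the article's Redis code.
class DelayQueueSketch {
    enum Status { DELAY, READY, RESERVED }

    static final class Job {
        final long id;
        final String topic;
        final long delayMs; // delay before the job becomes ready
        final long ttrMs;   // time allowed for a consumer to finish the job
        Status status = Status.DELAY;

        Job(long id, String topic, long delayMs, long ttrMs) {
            this.id = id;
            this.topic = topic;
            this.delayMs = delayMs;
            this.ttrMs = ttrMs;
        }
    }

    // Job pool: job ID -> metadata.
    private final Map<Long, Job> jobPool = new HashMap<>();
    // One bucket of {dueAtMillis, jobId}, earliest first (stand-in for a sorted set).
    private final PriorityQueue<long[]> bucket =
            new PriorityQueue<>(Comparator.comparingLong((long[] e) -> e[0]));
    // One ready queue per topic.
    private final Map<String, Deque<Long>> ready = new HashMap<>();

    // Steps 1-3: store metadata, then put a reference into the bucket.
    void add(Job job, long now) {
        jobPool.put(job.id, job);
        bucket.add(new long[] {now + job.delayMs, job.id});
    }

    // Steps 4-6: move every due reference to its topic's ready queue.
    void tick(long now) {
        while (!bucket.isEmpty() && bucket.peek()[0] <= now) {
            long id = bucket.poll()[1];
            Job job = jobPool.get(id);
            if (job == null) {
                continue; // finished or deleted: drop the dangling reference
            }
            job.status = Status.READY;
            ready.computeIfAbsent(job.topic, t -> new ArrayDeque<>()).addLast(id);
        }
    }

    // Step 7: hand out a job and re-schedule it after its TTR.
    Job pop(String topic, long now) {
        Deque<Long> queue = ready.get(topic);
        while (queue != null && !queue.isEmpty()) {
            Job job = jobPool.get(queue.pollFirst());
            if (job != null) {
                job.status = Status.RESERVED;
                bucket.add(new long[] {now + job.ttrMs, job.id});
                return job;
            }
        }
        return null;
    }

    // Step 8: finishing removes the metadata; the bucket entry is dropped lazily.
    void finish(long id) {
        jobPool.remove(id);
    }

    public static void main(String[] args) {
        DelayQueueSketch queue = new DelayQueueSketch();
        queue.add(new Job(1L, "test", 10_000, 30_000), 0);
        queue.tick(10_000);
        System.out.println(queue.pop("test", 10_000).status);
    }
}
```

In this sketch a job that is popped but never finished reappears in the ready queue once its TTR has elapsed, and a finished job's leftover bucket entry is discarded the next time the timer reaches it.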

[Figure: flow diagram across the user, the job pool, the delayed tasks, the timer loop, and the ready tasks. A submitted task is stored and delayed; the timer loop moves it to the ready tasks when its time is reached; when a user takes it, a timeout is set and it is saved back to the delayed tasks; it is removed when it is completed, deleted, or no longer exists in the job pool.]

Objects

The process involves several components:

  1. Delay queue: the Redis delay queue as a whole, which implements message delivery.
  2. Job Pool: stores the meta information of each job. It uses a K/V data structure, where the key is the job ID and the value is the job.
  3. Delay Bucket: an ordered queue that stores delayed jobs. The article describes putting jobs into buckets by polling, which implies that buckets are not partitioned by topic; my implementation simply inserts into the buckets in sequence.
  4. Timer: the time component that scans the buckets. According to the article there are multiple timers, but a bucket can be scanned by only one timer at a time.
  5. Ready Queue: holds the tasks that are ready to be consumed. According to the description there are multiple Ready Queues, one per topic.

The Timer is responsible for polling, while the Job Pool, Delay Bucket, and Ready Queue are collections with different responsibilities.

Task status

  • Ready: the task can be executed.
  • Delay: the task cannot be executed yet and is waiting for its time to arrive.
  • Reserved: the task has been read by a consumer, but consumption has not completed.
  • Deleted: the task has been consumed or has been deleted.
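
The four states and the moves between them can be written down as a small enum. This is a sketch of one reading of the design, not the article's `JobStatus` class: the name `JobStatusSketch` and the method `canMoveTo` are hypothetical, and the `RESERVED` to `READY` transition reflects this implementation's choice to put timed-out tasks back into the ready queue.

```java
import java.util.EnumSet;
import java.util.Set;

// Hypothetical state machine for a task; names are illustrative only.
enum JobStatusSketch {
    DELAY, READY, RESERVED, DELETED;

    // States that may directly follow this one.
    Set<JobStatusSketch> next() {
        switch (this) {
            case DELAY:
                return EnumSet.of(READY, DELETED);    // time reached, or deleted early
            case READY:
                return EnumSet.of(RESERVED, DELETED); // popped by a consumer, or deleted
            case RESERVED:
                return EnumSet.of(READY, DELETED);    // TTR timeout, or finish/delete
            default:
                return EnumSet.noneOf(JobStatusSketch.class); // DELETED is terminal
        }
    }

    boolean canMoveTo(JobStatusSketch target) {
        return next().contains(target);
    }

    public static void main(String[] args) {
        for (JobStatusSketch status : values()) {
            System.out.println(status + " -> " + status.next());
        }
    }
}
```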

External interfaces

Interface   Description               Data
add         Add a task                Job data
pop         Retrieve a pending task   Topic (the task group)
finish      Complete a task           Task ID
delete      Delete a task             Task ID

Additional notes

  1. According to the status descriptions, both the finish and the delete operations set the task to the deleted state.
  2. According to the operations described in the article, the task's metadata may already have been removed by the time finish or delete runs, so the deleted state would exist only for a very short time. The actual implementation therefore deletes the task directly.
  3. The article does not say what happens when a consumer's response times out, so in my implementation a timed-out task is put back into the ready queue.
  4. Because the article targets a cluster, it uses Redis setNx locks to ensure that multiple timer loops never process the same bucket at once. Since this is a simple implementation, I just set up one timer loop per bucket, for convenience and simplicity. Distributed locking is described in my previous article.
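
To illustrate the locking idea from note 4, here is a minimal in-memory stand-in for a set-if-absent lock with an expiry, roughly what the Redis command `SET key value NX PX milliseconds` provides. It is not a distributed lock and is not part of this implementation; the class `BucketLockSketch` and its methods are hypothetical, and time is passed in explicitly so the behaviour is easy to follow.

```java
import java.util.concurrent.ConcurrentHashMap;

// In-memory imitation of "set if absent, with expiry"; illustrative only.
class BucketLockSketch {
    private static final class Lease {
        final String owner;
        final long expiresAt;

        Lease(String owner, long expiresAt) {
            this.owner = owner;
            this.expiresAt = expiresAt;
        }
    }

    private final ConcurrentHashMap<String, Lease> leases = new ConcurrentHashMap<>();

    // Acquire the bucket if nobody holds it or the previous lease has expired.
    boolean tryLock(String bucket, String owner, long now, long ttlMs) {
        Lease fresh = new Lease(owner, now + ttlMs);
        // compute() runs atomically per key, so only one caller can install its lease.
        Lease winner = leases.compute(bucket,
                (key, current) -> (current == null || current.expiresAt <= now) ? fresh : current);
        return winner == fresh;
    }

    // Release the bucket, but only if the caller is the current holder.
    boolean unlock(String bucket, String owner) {
        boolean[] released = {false};
        leases.computeIfPresent(bucket, (key, current) -> {
            if (current.owner.equals(owner)) {
                released[0] = true;
                return null; // returning null removes the entry
            }
            return current;
        });
        return released[0];
    }

    public static void main(String[] args) {
        BucketLockSketch lock = new BucketLockSketch();
        System.out.println(lock.tryLock("bucket0", "timer-A", 0, 5_000));
        System.out.println(lock.tryLock("bucket0", "timer-B", 1_000, 5_000));
    }
}
```

The expiry matters because a timer that crashes while holding the lock would otherwise block its bucket forever; checking the owner on release keeps a slow timer from releasing a lock that has since passed to another timer.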

Implementation

Now we implement the design described above. The implementation is done in four steps.

Tasks and related objects

We need two objects: the job itself, and the delayed job that holds a reference to the job.

The task object

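    import com.fasterxml.jackson.databind.annotation.JsonSerialize;
    import com.fasterxml.jackson.databind.ser.std.ToStringSerializer;
    import lombok.AllArgsConstructor;
    import lombok.Data;
    import lombok.NoArgsConstructor;

    import java.io.Serializable;

    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public class Job implements Serializable {

        /** Unique task ID (serialized as a string) */
        @JsonSerialize(using = ToStringSerializer.class)
        private Long id;

        /** Task type (specific business type) */
        private String topic;

        /** Delay before the task should run, in milliseconds */
        private long delayTime;

        /** Execution timeout (TTR), in milliseconds */
        private long ttrTime;

        /** Task-specific message content, used to process specific business logic */
        private String message;

        /** Number of retries */
        private int retryCount;

        /** Task status */
        private JobStatus status;
    }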

Task reference object

    import lombok.AllArgsConstructor;
    import lombok.Data;

    import java.io.Serializable;

    @Data
    @AllArgsConstructor
    public class DelayJob implements Serializable {

        /** ID of the referenced job */
        private long jodId;

        /** Absolute time at which the job becomes due, in epoch milliseconds */
        private long delayDate;

        /** Task type (specific business type) */
        private String topic;

        public DelayJob(Job job) {
            this.jodId = job.getId();
            this.delayDate = System.currentTimeMillis() + job.getDelayTime();
            this.topic = job.getTopic();
        }

        public DelayJob(Object value, Double score) {
            this.jodId = Long.parseLong(String.valueOf(value));
            // The score stored in the bucket is already the absolute due time,
            // so the current time must not be added to it again
            this.delayDate = score.longValue();
        }
    }

Containers

Now we need to create three containers: the job pool, the delayed task container, and the pending task container.

The job pool is an ordinary K/V structure that provides the basic operations.

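    import com.alibaba.fastjson.JSON;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.data.redis.core.BoundHashOperations;
    import org.springframework.data.redis.core.RedisTemplate;
    import org.springframework.stereotype.Component;

    @Component
    @Slf4j
    public class JobPool {

        @Autowired
        private RedisTemplate redisTemplate;

        private String NAME = "job.pool";

        private BoundHashOperations getPool() {
            BoundHashOperations ops = redisTemplate.boundHashOps(NAME);
            return ops;
        }

        /**
         * Add a job
         * @param job
         */
        public void addJob(Job job) {
            log.info("Adding a task to a task pool: {}", JSON.toJSONString(job));
            getPool().put(job.getId(), job);
        }

        /**
         * Get a job
         * @param jobId
         * @return the job, or null if it does not exist
         */
        public Job getJob(Long jobId) {
            Object o = getPool().get(jobId);
            if (o instanceof Job) {
                return (Job) o;
            }
            return null;
        }

        /**
         * Remove a job
         * @param jobId
         */
        public void removeDelayJob(Long jobId) {
            log.info("Removing a task from the task pool: {}", jobId);
            // Remove the task
            getPool().delete(jobId);
        }
    }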

Delayed tasks are stored in sortable ZSets, which provide operations such as fetching the entry with the smallest score.

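    import com.alibaba.fastjson.JSON;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.context.annotation.Bean;
    import org.springframework.data.redis.core.BoundZSetOperations;
    import org.springframework.data.redis.core.RedisTemplate;
    import org.springframework.data.redis.core.ZSetOperations;
    import org.springframework.stereotype.Component;
    import org.springframework.util.CollectionUtils;

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Set;
    import java.util.concurrent.atomic.AtomicInteger;

    @Slf4j
    @Component
    public class DelayBucket {

        @Autowired
        private RedisTemplate redisTemplate;

        private static AtomicInteger index = new AtomicInteger(0);

        @Value("${thread.size}")
        private int bucketsSize;

        private List<String> bucketNames = new ArrayList<>();

        @Bean
        public List<String> createBuckets() {
            for (int i = 0; i < bucketsSize; i++) {
                bucketNames.add("bucket" + i);
            }
            return bucketNames;
        }

        /**
         * Get the name of the next bucket in sequence
         */
        private String getThisBucketName() {
            int thisIndex = index.addAndGet(1);
            // floorMod keeps the index non-negative even after the counter overflows
            int i1 = Math.floorMod(thisIndex, bucketsSize);
            return bucketNames.get(i1);
        }

        /**
         * Get a bucket by name
         * @param bucketName
         * @return
         */
        private BoundZSetOperations getBucket(String bucketName) {
            return redisTemplate.boundZSetOps(bucketName);
        }

        /**
         * Add a delayed job
         * @param job
         */
        public void addDelayJob(DelayJob job) {
            log.info("Add delay task: {}", JSON.toJSONString(job));
            String thisBucketName = getThisBucketName();
            BoundZSetOperations bucket = getBucket(thisBucketName);
            bucket.add(job, job.getDelayDate());
        }

        /**
         * Get the delayed job with the earliest time in a bucket
         * @param index
         * @return
         */
        public DelayJob getFirstDelayTime(Integer index) {
            String name = bucketNames.get(index);
            BoundZSetOperations bucket = getBucket(name);
            // Only the entry with the smallest score is needed
            Set<ZSetOperations.TypedTuple> set = bucket.rangeWithScores(0, 0);
            if (CollectionUtils.isEmpty(set)) {
                return null;
            }
            ZSetOperations.TypedTuple typedTuple = (ZSetOperations.TypedTuple) set.toArray()[0];
            Object value = typedTuple.getValue();
            if (value instanceof DelayJob) {
                return (DelayJob) value;
            }
            return null;
        }

        /**
         * Remove a delayed job
         * @param index
         * @param delayJob
         */
        public void removeDelayTime(Integer index, DelayJob delayJob) {
            String name = bucketNames.get(index);
            BoundZSetOperations bucket = getBucket(name);
            bucket.remove(delayJob);
        }
    }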
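
One detail of choosing buckets in sequence is worth a small standalone example. A counter that only ever increments eventually overflows `int` and becomes negative, and Java's `%` operator then yields a negative remainder, which is not a valid list index. The sketch below (the class `RoundRobinSketch` is hypothetical, and the starting value is a parameter only so that the overflow can be shown) uses `Math.floorMod` to keep the index in range.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical round-robin bucket selector; illustrative only.
class RoundRobinSketch {
    private final AtomicInteger counter;
    private final int buckets;

    RoundRobinSketch(int buckets, int start) {
        this.buckets = buckets;
        this.counter = new AtomicInteger(start);
    }

    // Returns the next bucket index, always in the range [0, buckets).
    int nextIndex() {
        // floorMod never returns a negative value for a positive divisor,
        // unlike %, which returns a negative value for a negative counter.
        return Math.floorMod(counter.getAndIncrement(), buckets);
    }

    String nextBucketName() {
        return "bucket" + nextIndex();
    }

    public static void main(String[] args) {
        RoundRobinSketch selector = new RoundRobinSketch(3, 0);
        for (int i = 0; i < 4; i++) {
            System.out.println(selector.nextBucketName());
        }
    }
}
```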

Pending tasks are divided internally by topic, and each topic corresponds to one list.

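    import com.alibaba.fastjson.JSON;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.data.redis.core.BoundListOperations;
    import org.springframework.data.redis.core.RedisTemplate;
    import org.springframework.stereotype.Component;

    @Component
    @Slf4j
    public class ReadyQueue {

        @Autowired
        private RedisTemplate redisTemplate;

        private String NAME = "process.queue";

        private String getKey(String topic) {
            return NAME + topic;
        }

        /**
         * Get the queue for a topic
         * @param topic
         * @return
         */
        private BoundListOperations getQueue(String topic) {
            BoundListOperations ops = redisTemplate.boundListOps(getKey(topic));
            return ops;
        }

        /**
         * Add a task to the queue
         * @param delayJob
         */
        public void pushJob(DelayJob delayJob) {
            log.info("Execute queue add task: {}", delayJob);
            BoundListOperations listOperations = getQueue(delayJob.getTopic());
            listOperations.leftPush(delayJob);
        }

        /**
         * Take a task from the queue
         * @param topic
         * @return the task, or null if the queue is empty
         */
        public DelayJob popJob(String topic) {
            BoundListOperations listOperations = getQueue(topic);
            // Pop from the opposite end of leftPush so that tasks leave in FIFO order
            Object o = listOperations.rightPop();
            if (o instanceof DelayJob) {
                log.info("Execute queue fetch task: {}", JSON.toJSONString((DelayJob) o));
                return (DelayJob) o;
            }
            return null;
        }
    }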
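
The per-topic layout can be pictured with plain Java collections. In this hypothetical sketch (`ReadyQueueSketch` is not part of the article's code, and it assumes first-in, first-out order is wanted), each topic owns one deque; pushing on the left and popping on the right mirrors a Redis list used with LPUSH and RPOP.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory ready queues, one per topic; illustrative only.
class ReadyQueueSketch {
    private final Map<String, Deque<Long>> queues = new HashMap<>();

    // Push a job ID onto the left end of its topic's queue.
    void push(String topic, long jobId) {
        queues.computeIfAbsent(topic, t -> new ArrayDeque<>()).addFirst(jobId);
    }

    // Pop from the right end, so the oldest job of the topic comes out first.
    Long pop(String topic) {
        Deque<Long> queue = queues.get(topic);
        return queue == null ? null : queue.pollLast();
    }

    public static void main(String[] args) {
        ReadyQueueSketch ready = new ReadyQueueSketch();
        ready.push("test", 1L);
        ready.push("test", 2L);
        System.out.println(ready.pop("test"));
    }
}
```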

Polling

A thread pool is created, and one polling loop is started for each bucket.

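    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.context.ApplicationListener;
    import org.springframework.context.event.ContextRefreshedEvent;
    import org.springframework.stereotype.Component;

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;

    @Component
    public class DelayTimer implements ApplicationListener<ContextRefreshedEvent> {

        @Autowired
        private DelayBucket delayBucket;

        @Autowired
        private JobPool jobPool;

        @Autowired
        private ReadyQueue readyQueue;

        @Value("${thread.size}")
        private int length;

        @Override
        public void onApplicationEvent(ContextRefreshedEvent contextRefreshedEvent) {
            // One thread per bucket
            ExecutorService executorService = new ThreadPoolExecutor(
                    length,
                    length,
                    0L, TimeUnit.MILLISECONDS,
                    new LinkedBlockingQueue<Runnable>());

            for (int i = 0; i < length; i++) {
                // Each handler polls the bucket with index i
                executorService.execute(
                        new DelayJobHandler(
                                delayBucket,
                                jobPool,
                                readyQueue,
                                i));
            }
        }
    }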
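
The `DelayJobHandler` class is not shown in this article. As a rough idea of what one polling loop per bucket can look like, here is a self-contained sketch that replaces Redis with in-memory queues. Everything in it is an assumption made for illustration: the class `TimerLoopSketch`, the 10 ms sleep, and the `{dueAtMillis, jobId}` array layout. It also skips the job pool lookup that the real handler performs before moving a job.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical per-bucket polling loops over in-memory queues; illustrative only.
class TimerLoopSketch {

    // One loop per bucket: move due job IDs from the bucket to the ready queue.
    static Runnable handler(PriorityBlockingQueue<long[]> bucket, BlockingQueue<Long> ready) {
        return () -> {
            while (!Thread.currentThread().isInterrupted()) {
                long[] head = bucket.peek(); // {dueAtMillis, jobId}, earliest first
                try {
                    if (head == null || head[0] > System.currentTimeMillis()) {
                        Thread.sleep(10);    // nothing is due yet
                    } else if (bucket.remove(head)) {
                        ready.put(head[1]);  // due: hand the job ID over
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // stop on shutdown
                }
            }
        };
    }

    // Starts two bucket loops, schedules one job in each, and collects the results.
    static Set<Long> runDemo() {
        int buckets = 2;
        ExecutorService pool = Executors.newFixedThreadPool(buckets);
        BlockingQueue<Long> ready = new LinkedBlockingQueue<>();
        List<PriorityBlockingQueue<long[]>> all = new ArrayList<>();
        for (int i = 0; i < buckets; i++) {
            PriorityBlockingQueue<long[]> bucket = new PriorityBlockingQueue<>(
                    11, Comparator.comparingLong((long[] e) -> e[0]));
            all.add(bucket);
            pool.execute(handler(bucket, ready));
        }
        long now = System.currentTimeMillis();
        all.get(0).add(new long[] {now + 50, 1L});
        all.get(1).add(new long[] {now + 100, 2L});

        Set<Long> moved = new TreeSet<>();
        try {
            for (int i = 0; i < 2; i++) {
                Long id = ready.poll(5, TimeUnit.SECONDS);
                if (id != null) {
                    moved.add(id);
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            pool.shutdownNow(); // interrupts the loops so they exit
        }
        return moved;
    }

    public static void main(String[] args) {
        System.out.println(runDemo());
    }
}
```

Because each loop owns exactly one bucket, no locking between the loops is needed in a single process, which is the simplification this implementation relies on.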

Test endpoints

    import com.alibaba.fastjson.JSON;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RequestMethod;
    import org.springframework.web.bind.annotation.RestController;

    /**
     * Test requests
     * @author daify
     * @date 2019-07-29 10:26
     **/
    @RestController
    @RequestMapping("delay")
    public class DelayController {

        @Autowired
        private JobService jobService;

        /**
         * Add a task
         * @param request
         * @return
         */
        @RequestMapping(value = "add", method = RequestMethod.POST)
        public String addDefJob(Job request) {
            DelayJob delayJob = jobService.addDefJob(request);
            return JSON.toJSONString(delayJob);
        }

        /**
         * Get a pending task
         * @param topic
         * @return
         */
        @RequestMapping(value = "pop", method = RequestMethod.GET)
        public String getProcessJob(String topic) {
            Job process = jobService.getProcessJob(topic);
            return JSON.toJSONString(process);
        }

        /**
         * Finish a task
         * @param jobId
         * @return
         */
        @RequestMapping(value = "finish", method = RequestMethod.DELETE)
        public String finishJob(Long jobId) {
            jobService.finishJob(jobId);
            return "success";
        }

        /**
         * Delete a task
         * @param jobId
         * @return
         */
        @RequestMapping(value = "delete", method = RequestMethod.DELETE)
        public String deleteJob(Long jobId) {
            jobService.deleteJob(jobId);
            return "success";
        }
    }

Test

Adding a delayed task

Request localhost:8000/delay/add via Postman.

The task is now added to the job pool, and a delayed job is added to the DelayBucket.

    2019-08-12 21:21:36.589  INFO 21444 --- [nio-8000-exec-6] d.samples.redis.delay.container.JobPool  : Adding a task to a task pool: {"delayTime":10000,"id":3,"message":"tag:testid:3","retryCount":0,"status":"DELAY","topic":"test","ttrTime":10000}
    2019-08-12 21:21:36.609  INFO 21444 --- [nio-8000-exec-6] d.s.redis.delay.container.DelayBucket    : Add delay task: {"delayDate":1565616106609,"jodId":3,"topic":"test"}

According to the settings, the task is added to the ReadyQueue after 10 seconds.

    2019-08-12 21:21:46.744  INFO 21444 --- [pool-1-thread-4] d.s.redis.delay.container.ReadyQueue     : Execute queue add task: DelayJob(jodId=3, delayDate=1565616106609, topic=test)

Getting a task

Request localhost:8000/delay/pop.

At this point the task is returned to the consumer, its status is changed to RESERVED, its timeout is set, and it is placed back into the DelayBucket.

    2019-08-09 19:36:02.342  INFO 58456 --- [nio-8000-exec-3] d.s.redis.delay.container.ReadyQueue     : Execute queue fetch task: {"delayDate":1565321728704,"jodId":1,"topic":"test"}
    2019-08-09 19:36:02.364  INFO 58456 --- [nio-8000-exec-3] d.samples.redis.delay.container.JobPool  : Adding a task to a task pool: {"delayTime":10000,"id":1,"message":"delay 10 seconds,","retryCount":0,"status":"RESERVED","topic":"test","ttrTime":30000}
    2019-08-09 19:36:02.384  INFO 58456 --- [nio-8000-exec-3] d.s.redis.delay.container.DelayBucket    : Add delay task: {"delayDate":1565321792364,"jodId":1,"topic":"test"}

By design, if the task has not been consumed within 30 seconds (its TTR), it is placed back into the ReadyQueue.

    2019-08-12 21:21:48.239  INFO 21444 --- [nio-8000-exec-7] d.s.redis.delay.container.ReadyQueue     : Execute queue fetch task: {"delayDate":1565616106609,"jodId":3,"topic":"test"}
    2019-08-12 21:21:48.261  INFO 21444 --- [nio-8000-exec-7] d.samples.redis.delay.container.JobPool  : Adding a task to a task pool: {"delayTime":10000,"id":3,"message":"tag:testid:3","retryCount":0,"status":"RESERVED","topic":"test","ttrTime":10000}

Deleting/finishing a task

Now we request localhost:8000/delay/delete.

The task is removed from the job pool, so its metadata no longer exists, but its reference keeps circulating in the DelayBucket. When the timer reaches that reference and finds that the metadata no longer exists in the job pool, it removes the delayed job from the DelayBucket.

    2019-08-12 21:21:54.880  INFO 21444 --- [nio-8000-exec-8] d.samples.redis.delay.container.JobPool  : Removing a task from the task pool: 3
    2019-08-12 21:21:59.104  INFO 21444 --- [pool-1-thread-5] d.s.redis.delay.handler.DelayJobHandler  : Remove non-existent task: {"delayDate":1565616118261,"jodId":3,"topic":"test"}

Download the source code for this article: gitee.com/daifyutils/…