Preface

Recently, in my day-to-day development, I unexpectedly needed a “priority asynchronous queue”. Similar projects exist on GitHub, and Redis-backed libraries such as Kue and Bull offer this functionality. However, none of them satisfied my pursuit of something “simple, easy, and light”. Following the 80/20 rule, I decided to write a lightweight open-source library that implements only 20% of the features mentioned above: priority-async-queue.

Why give an open-source library such a long name? Because I think developers should be able to tell what a library does just from its name. For brevity, though, priority-async-queue is referred to as “PAQ” in the rest of this article.

You probably don’t need PAQ

To borrow a line from the author of Redux: first of all, we need to be clear that you probably don’t need PAQ.

You only need PAQ when N asynchronous tasks cannot be executed in parallel and must be executed sequentially.

Simply put, if your N asynchronous tasks involve no resource contention, no shared data, no strict logical ordering, and no priority comparison, PAQ is probably unnecessary; using it would only reduce execution efficiency and hurt application performance.
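
To make the distinction concrete, here is a minimal sketch in plain JavaScript that does not use PAQ at all. The makeAccount and deposit names are hypothetical; they simulate a read-modify-write on shared data. When the deposits run in parallel they overwrite each other, and when they run one after another they do not.

```javascript
// Hypothetical shared resource: each deposit reads the balance,
// waits (simulating I/O), then writes the new value back.
const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

const makeAccount = () => {
  let balance = 0;
  return {
    deposit: async (amount) => {
      const current = balance;    // read
      await sleep(10);            // simulated asynchronous work
      balance = current + amount; // write, possibly clobbering other writes
    },
    getBalance: () => balance,
  };
};

// Parallel: every deposit reads the balance before any of them writes.
const runParallel = async () => {
  const account = makeAccount();
  await Promise.all([1, 2, 3].map((n) => account.deposit(n)));
  return account.getBalance();
};

// Sequential: each deposit starts only after the previous one has finished.
const runSequential = async () => {
  const account = makeAccount();
  for (const n of [1, 2, 3]) {
    await account.deposit(n);
  }
  return account.getBalance();
};

const results = Promise.all([runParallel(), runSequential()]);
results.then(([parallel, sequential]) => {
  console.log('parallel:', parallel);     // updates were lost
  console.log('sequential:', sequential); // 6
});
```

If your tasks look like the parallel case and the lost updates matter, a sequential queue is worth considering; otherwise Promise.all is simpler and faster.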

PAQ design

The design of PAQ is very simple and consists of three classes:

  • Task describes the execution logic and configuration parameters of each (asynchronous or synchronous) task to be executed.
  • PriorityQueue is the priority queue that holds the tasks to be executed and provides the basic attributes and operations of a queue.
  • AsyncQueue is the queue that enforces strict sequential execution of the asynchronous or synchronous tasks.
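
The three roles above can be sketched roughly as follows. This is an illustrative reduction, not the library's source: MiniTask, MiniPriorityQueue, and MiniAsyncQueue are hypothetical names, and the numeric priority (smaller runs earlier) is an assumption made for the sketch.

```javascript
// Illustrative only: these classes are hypothetical stand-ins,
// not the classes shipped by priority-async-queue.
class MiniTask {
  constructor(options, callback) {
    this.options = { priority: 0, ...options }; // smaller number runs earlier
    this.callback = callback;
  }
}

class MiniPriorityQueue {
  constructor() {
    this.items = [];
  }
  enqueue(task) {
    // Insert before the first task with a strictly larger priority number,
    // so tasks of equal priority keep their insertion order.
    const index = this.items.findIndex(
      (t) => t.options.priority > task.options.priority
    );
    if (index === -1) this.items.push(task);
    else this.items.splice(index, 0, task);
  }
  dequeue() {
    return this.items.shift();
  }
  get size() {
    return this.items.length;
  }
}

class MiniAsyncQueue {
  constructor() {
    this.queue = new MiniPriorityQueue();
    this.running = false;
  }
  addTask(options, callback) {
    this.queue.enqueue(new MiniTask(options, callback));
    if (!this.running) this.run();
  }
  async run() {
    this.running = true;
    while (this.queue.size > 0) {
      const task = this.queue.dequeue();
      try {
        // await accepts both plain return values and promises.
        await task.callback(this, task.options);
      } catch (err) {
        console.error('task failed:', err.message);
      }
    }
    this.running = false;
  }
}

const order = [];
const queue = new MiniAsyncQueue();
const finished = new Promise((resolve) => {
  ['a', 'b', 'c'].forEach((name) => {
    queue.addTask({ priority: name === 'c' ? -2 : 0 }, async () => {
      await new Promise((r) => setTimeout(r, 5));
      order.push(name);
      if (order.length === 3) resolve(order);
    });
  });
});
finished.then((result) => console.log(result.join(' -> '))); // a -> c -> b
```

Task "a" starts as soon as it is added, so the higher-priority task "c" can only overtake "b", which is still waiting.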

Here is the flowchart of PAQ:

PAQ basic concepts and APIs

1. addTask

addTask is the core method: it creates a task and adds it to the PAQ queue.

paq.addTask([options, ]callback);

options is an optional object containing the following properties:

{
  id: undefined,      // Task id
  priority: 'normal', // Task weight: low, normal, mid, high, or urgent
  context: null,      // The context in which the task is executed
  start: (ctx, options) => {}, // Callback when the task is about to start
  completed: (ctx, res) => {}, // Callback after task execution is complete
  failed: (ctx, err) => {},    // Callback after task execution failed
  remove: (ctx, options) => {} // Callback after the task is deleted
}

callback is a function that describes the logic of the task. It takes two arguments, ctx and options:

  • ctx is the PAQ instance to which the task belongs.
  • options is the final value of the options parameter for this task.

paq.addTask((ctx, options) => {
  console.log(ctx === paq); // true
});

2. removeTask

The removeTask method removes a task from the waiting queue by its task id.

paq.removeTask(taskId);

It returns true if the task was successfully deleted. Otherwise, it returns false.
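
The return value can be pictured with a small stand-alone sketch. removeWaitingTask is a hypothetical helper, not PAQ's source; it assumes the waiting tasks are kept in an array of objects with an id field.

```javascript
// Hypothetical helper, not PAQ's source: remove a waiting task by id.
const removeWaitingTask = (waiting, taskId) => {
  const index = waiting.findIndex((task) => task.id === taskId);
  if (index === -1) return false; // unknown id, or the task is no longer waiting
  waiting.splice(index, 1);
  return true;
};

const waitingTasks = [{ id: 1 }, { id: 2 }, { id: 3 }];
console.log(removeWaitingTask(waitingTasks, 2)); // true
console.log(removeWaitingTask(waitingTasks, 5)); // false
console.log(waitingTasks.map((task) => task.id)); // [ 1, 3 ]
```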

3. pause

The pause method stops PAQ from starting any further tasks.

paq.pause();

Note: you cannot pause the task that is currently executing, because the progress of an in-flight asynchronous task cannot be detected.
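
One way to see why an in-flight task cannot be paused is the sketch below. createRunner is a hypothetical minimal runner, not PAQ itself; it assumes the paused flag is consulted only between tasks, which is how a promise-based queue would typically have to work.

```javascript
// Hypothetical minimal runner: the paused flag is only checked between tasks.
const createRunner = () => {
  const waiting = [];
  let paused = false;
  let running = false;
  const next = async () => {
    if (running || paused || waiting.length === 0) return;
    running = true;
    const task = waiting.shift();
    try {
      await task(); // once started, the task runs to completion
    } catch (err) {
      console.error(err);
    }
    running = false;
    next();
  };
  return {
    add: (task) => { waiting.push(task); next(); },
    pause: () => { paused = true; },
    resume: () => { paused = false; next(); },
    get isPause() { return paused; },
  };
};

const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));
const log = [];
const runner = createRunner();
runner.add(async () => { await sleep(10); log.push('first'); });
runner.add(async () => { await sleep(10); log.push('second'); });
runner.pause(); // "first" has already started and still finishes

const demo = (async () => {
  await sleep(50);
  const whilePaused = [...log]; // only the in-flight task has finished
  runner.resume();
  await sleep(50);
  return { whilePaused, afterResume: [...log] };
})();
demo.then((result) => console.log(result));
```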

4. isPause

isPause is a property that indicates whether the queue is currently paused.

paq.isPause; // return true or false.

5. resume

The resume method resumes task execution in a paused PAQ queue.

paq.resume();

6. clearTask

The clearTask method clears all tasks in the PAQ waiting queue.

paq.clearTask();

PAQ usage

1. Basic usage

As soon as you add a task to the PAQ, the task is automatically executed.

const PAQ = require('priority-async-queue');
const paq = new PAQ();

paq.addTask((ctx, options) => {
  console.log('This is a simple task!');
});

// This is a simple task!

2. Synchronous tasks

You can use PAQ to run a series of synchronous tasks, for example:

const syncTask = (n) => {
  for (let i = 0; i < n; i++) {
    paq.addTask(() => {
      console.log('Step', i, 'sync');
      return i;
    });
  }
};

syncTask(3);

// Step 0 sync
// Step 1 sync
// Step 2 sync

3. Asynchronous tasks

You can also use PAQ to perform a series of asynchronous tasks, such as:

const asyncTask = (n) => {
  for (let i = 0; i < n; i++) {
    paq.addTask(() => {
      return new Promise(resolve => {
        setTimeout(() => {
          console.log('Step', i, 'async');
          resolve(i);
        }, i * 1000);
      });
    });
  }
};

asyncTask(3);

// Step 0 async
// Step 1 async
// Step 2 async

4. Mixed tasks

You can even use PAQ to run a series of interleaved synchronous and asynchronous tasks, for example:

const mixTask = (n) => {
  asyncTask(n);
  syncTask(n);
  asyncTask(n);
};

mixTask(2);

// Step 0 async
// Step 1 async
// Step 0 sync
// Step 1 sync
// Step 0 async
// Step 1 async

5. Bind the execution context

Sometimes you need to specify the context in which a task runs, for example:

const contextTask = (n) => {
  const testObj = {
    name: 'foo',
    sayName: (name) => {
      console.log(name);
    }
  };
  for (let i = 0; i < n; i++) {
    paq.addTask({ context: testObj }, function () {
      this.sayName(this.name + i);
    });
  }
};

contextTask(3);

// foo0
// foo1
// foo2

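Note: an arrow function has no this of its own; inside it, this refers to the scope in which the function was defined. Use a regular function for the task callback when you rely on the context option.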
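
The difference can be checked in plain JavaScript, independently of PAQ. In this small sketch the function names regular and arrowName are made up for illustration; call() stands in for whatever mechanism a queue uses to bind the context.

```javascript
// call() can rebind `this` for a regular function,
// but it has no effect on an arrow function.
const testObj = { name: 'foo' };

function regular() {
  return this.name;
}

// The arrow function captures `this` from the enclosing scope at definition
// time, so it never sees testObj.
const arrowName = () => (this ? this.name : undefined);

console.log(regular.call(testObj));   // foo
console.log(arrowName.call(testObj)); // undefined
```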

6. Delay execution

PAQ also supports delayed execution of tasks, for example:

const delayTask = (n) => {
  for (let i = 0; i < n; i++) {
    paq.addTask({ delay: 1000 * i }, () => {
      console.log('Step', i, 'sync');
      return i;
    });
  }
};

delayTask(3);

// Step 0 sync
// Step 1 sync
// Step 2 sync

7. Priority

If tasks have different weights, you can give each one a priority, for example:

const priorityTask = (n) => {
  for (let i = 0; i < n; i++) {
    paq.addTask({ priority: i === n - 1 ? 'high' : 'normal' }, () => {
      return new Promise(resolve => {
        setTimeout(() => {
          console.log('Step', i, 'async');
          resolve(i);
        }, i * 1000);
      });
    });
  }
};

priorityTask(5);

// Step 0 async
// Step 4 async
// Step 1 async
// Step 2 async
// Step 3 async

The default priority mapping is as follows:

{
  "low": 1,
  "normal": 0, // default
  "mid": -1,
  "high": -2,
  "urgent": -3
}
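
Reading the mapping as "smaller weight is served first" is consistent with the priority example above, where the high task overtakes the waiting normal tasks. The following sketch orders a waiting list under that assumption; weightOf and the sample tasks are hypothetical, and the stable sort only approximates whatever ordering PAQ uses internally.

```javascript
// Weights copied from the mapping above; smaller numbers are served first.
const PRIORITY = { low: 1, normal: 0, mid: -1, high: -2, urgent: -3 };

// Hypothetical helper: unknown or missing priorities fall back to "normal".
const weightOf = (task) =>
  task.priority in PRIORITY ? PRIORITY[task.priority] : PRIORITY.normal;

const waiting = [
  { id: 1, priority: 'normal' },
  { id: 2, priority: 'low' },
  { id: 3, priority: 'urgent' },
  { id: 4, priority: 'normal' },
  { id: 5, priority: 'high' },
];

// Array.prototype.sort is stable (ES2019), so tasks with equal weights
// keep the order in which they were added.
const ordered = [...waiting].sort((a, b) => weightOf(a) - weightOf(b));
const orderedIds = ordered.map((task) => task.id);
console.log(orderedIds); // [ 3, 5, 1, 4, 2 ]
```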

8. Callback functions

Sometimes you want to do something when a task starts, completes, fails, or is removed, for example:

const callbackTask = (n) => {
  for (let i = 0; i < n; i++) {
    paq.addTask({
      id: i,
      start: (ctx, options) => {
        console.log('start running task id is', options.id);
      },
      completed: (ctx, res) => {
        console.log('complete, result is', res);
      },
      failed: (ctx, err) => {
        console.log(err);
      }
    }, () => {
      if (i < n / 2) {
        throw new Error(i + ' is too small!');
      }
      return i;
    });
  }
};

callbackTask(5);

// start running task id is 0
// Error: 0 is too small!
// start running task id is 1
// Error: 1 is too small!
// start running task id is 2
// Error: 2 is too small!
// start running task id is 3
// complete, result is 3
// start running task id is 4
// complete, result is 4
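
For intuition, the lifecycle hooks could be wired around a task roughly as in the sketch below. runWithHooks is a hypothetical helper written for this article's option names (start, completed, failed, context); PAQ's real implementation may differ.

```javascript
// Hypothetical helper showing one way the lifecycle hooks could be invoked.
const runWithHooks = async (ctx, options, callback) => {
  if (typeof options.start === 'function') options.start(ctx, options);
  try {
    // call() binds `this` to options.context; await accepts values and promises.
    const res = await callback.call(options.context || null, ctx, options);
    if (typeof options.completed === 'function') options.completed(ctx, res);
    return { ok: true, res };
  } catch (err) {
    if (typeof options.failed === 'function') options.failed(ctx, err);
    return { ok: false, err };
  }
};

const events = [];
const hooks = {
  start: (ctx, options) => events.push('start ' + options.id),
  completed: (ctx, res) => events.push('completed ' + res),
  failed: (ctx, err) => events.push('failed ' + err.message),
};

const hooksDemo = (async () => {
  await runWithHooks(null, { id: 1, ...hooks }, () => 'one');
  await runWithHooks(null, { id: 2, ...hooks }, () => {
    throw new Error('two is broken');
  });
  return events;
})();
hooksDemo.then((list) => console.log(list));
```

Because the callback is awaited inside try/catch, a synchronous throw and a rejected promise both end up in the failed hook.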

9. Delete the task

Sometimes, you need to delete tasks, such as:

const removeTask = (n) => {
  for (let i = 0; i < n; i++) {
    paq.addTask({
      id: i,
      remove: (ctx, options) => {
        console.log('remove task id is', options.id);
      }
    }, () => {
      return new Promise(resolve => {
        setTimeout(() => {
          console.log('Step', i, 'async');
          resolve(i);
        }, i * 1000);
      });
    });
  }
  console.log(paq.removeTask(3));
  console.log(paq.removeTask(5));
};

removeTask(5);

// remove task id is 3
// true
// false
// Step 0 async
// Step 1 async
// Step 2 async
// Step 4 async

Note: You must assign an id when creating a task, because tasks are removed by id.

PAQ events

If you need to monitor the state of the PAQ queue, PAQ provides the following event listeners:

1. addTask

paq.on('addTask', (options) => {
  // Triggered when the queue adds a task.
});

2. startTask

paq.on('startTask', (options) => {
  // Triggered when a task in the queue is about to execute.
});

3. changeTask

paq.on('changeTask', (options) => {
  // Triggered when a task in the queue changes.
});

4. removeTask

paq.on('removeTask', (options) => {
  // Triggered when the queue removes a task.
});

5. completed

paq.on('completed', (options, result) => {
  // Triggered when the task execution in the queue is complete.
});

6. failed

paq.on('failed', (options, err) => {
  // Triggered when a task in the queue fails to execute.
});
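
As a usage idea, these events make it easy to keep simple statistics about the queue. The sketch below does not use PAQ: fakeQueue is a bare Node.js EventEmitter driven by hand, standing in for a PAQ instance, and only the event names and argument shapes are taken from the listeners above.

```javascript
const { EventEmitter } = require('events');

// Stand-in for a PAQ instance: a plain EventEmitter that we drive manually.
const fakeQueue = new EventEmitter();

const stats = { added: 0, completed: 0, failed: 0 };
fakeQueue.on('addTask', (options) => { stats.added += 1; });
fakeQueue.on('completed', (options, result) => { stats.completed += 1; });
fakeQueue.on('failed', (options, err) => { stats.failed += 1; });

// Simulate the lifecycle of two tasks.
fakeQueue.emit('addTask', { id: 1 });
fakeQueue.emit('addTask', { id: 2 });
fakeQueue.emit('completed', { id: 1 }, 'ok');
fakeQueue.emit('failed', { id: 2 }, new Error('boom'));

console.log(stats); // { added: 2, completed: 1, failed: 1 }
```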

Finally, I’d like to add that PAQ is worth considering if you run into a requirement that Promise.all and Promise.race cannot solve.