Asynchronous flow-control libraries: Async/neo-async

Async provides a number of tools that help us handle flow control more elegantly, that is, the order in which tasks are executed and the timing of their calls. The library consists of three major parts:

  • Collections: collection operations
  • ControlFlow: flow control
  • Utils: utility functions

Collections: Collection operations

The collection functions process every element of a collection according to some rule. They form the basis of flow control.

The each family (each/eachSeries/eachLimit/eachOf)

eachSeries limits concurrency to a single task, while eachLimit lets you choose the number of concurrent tasks. eachOf differs from each in that the iteratee receives the key of the current element as its second argument. The implementation is relatively simple: the iteratee is called for each element in turn, with some processing in between that normalizes async functions and generates an iterator over the collection. Once an error occurs, no further elements are processed.
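A minimal sketch of the sequential case, assuming an array input and Node-style callbacks. `eachOfSeries` and `eachSeries` below are hypothetical stand-alone reimplementations for illustration, not the library's source: each key is passed to the iteratee, and the loop stops at the first error.

```javascript
// Hypothetical sketch: run iteratee(item, key, callback) on one element at a time.
function eachOfSeries(arr, iteratee, callback) {
  let index = 0;
  let finished = false;

  function next(err) {
    if (finished) return;
    if (err) {
      // Stop at the first error: later elements are never started
      finished = true;
      return callback(err);
    }
    if (index >= arr.length) {
      finished = true;
      return callback(null);
    }
    const key = index++;
    iteratee(arr[key], key, next);
  }

  next(null);
}

// eachSeries is the same loop with an iteratee that ignores the key
function eachSeries(arr, iteratee, callback) {
  eachOfSeries(arr, (item, _key, cb) => iteratee(item, cb), callback);
}
```

Because `next` is called recursively, an iteratee that always calls back synchronously adds stack frames for every element; that is exactly the problem discussed in the ensureAsync section of this article.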

eachOfLimit is the foundation for the other methods; most functions use it under the hood. The collection is first wrapped in an iterator, and tasks are then started from it one by one. The concurrency limit is checked before a task is started and again in each task's completion callback.
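The replenish pattern described above can be sketched as follows. This is a simplified stand-alone version under stated assumptions: an array index stands in for the library's generic iterator, and the names `eachOfLimit`, `replenish` and `looping` mirror the description rather than reproducing the exact source. The `looping` flag keeps iteratees that call back synchronously from recursing, which is the stack-overflow issue covered in the ensureAsync section.

```javascript
// Hypothetical sketch of eachOfLimit: at most `limit` iteratees run at once.
function eachOfLimit(arr, limit, iteratee, callback) {
  let nextIndex = 0;   // next element to start (stands in for the iterator)
  let running = 0;     // iteratees currently in flight
  let finished = false;
  let looping = false; // true while replenish() is inside its while loop

  if (arr.length === 0) return callback(null);

  function iterateeCallback(err) {
    if (finished) return;
    running -= 1;
    if (err) {
      finished = true;
      callback(err);
    } else if (nextIndex >= arr.length && running === 0) {
      finished = true;
      callback(null);
    } else if (!looping) {
      // Limit check after completion: a slot is free, so start more tasks
      replenish();
    }
    // If looping is true, this callback fired synchronously; the while loop
    // in replenish() is still running and will start the next task itself.
  }

  function replenish() {
    looping = true;
    // Limit check before starting: never exceed `limit` running tasks
    while (running < limit && nextIndex < arr.length && !finished) {
      const key = nextIndex++;
      running += 1;
      iteratee(arr[key], key, iterateeCallback);
    }
    looping = false;
  }

  replenish();
}
```

With `limit` set to 1 this behaves like eachOfSeries, which is how a single function can serve as the base for the Series and Limit variants.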

Others (map, filter, …)

The other collection functions work similarly: each of them processes every element of the collection, and they differ mainly in the shape of their output.
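To illustrate that the traversal is shared and only the output shape changes, here is a hedged sketch with hypothetical stand-alone `map` and `filter` functions (not the library's code). map stores each result at its element's index; filter reuses map and turns the booleans into a subset of the input.

```javascript
// Hypothetical sketch: start every iteratee at once, store each result at its index.
function map(arr, iteratee, callback) {
  const results = new Array(arr.length);
  let pending = arr.length;
  let finished = false;
  if (pending === 0) return callback(null, results);

  arr.forEach((item, index) => {
    iteratee(item, (err, value) => {
      if (finished) return;
      if (err) {
        finished = true;
        return callback(err);
      }
      // Storing by index keeps input order even if tasks finish out of order
      results[index] = value;
      pending -= 1;
      if (pending === 0) {
        finished = true;
        callback(null, results);
      }
    });
  });
}

// filter reuses the same traversal and only changes the output shape
function filter(arr, iteratee, callback) {
  map(arr, iteratee, (err, keep) => {
    if (err) return callback(err);
    callback(null, arr.filter((_, index) => keep[index]));
  });
}
```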

ControlFlow: flow control

Flow-control functions control how tasks run. Each task is usually represented by a function.

Starting all tasks synchronously (applyEach/parallel/times/sortBy/race)

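These functions all start every task synchronously, that is, all at once; they differ in how they are used. Unlike each, which applies one iteratee to every element of a collection, these functions operate on the tasks themselves: parallel, for example, runs a collection of task functions, and applyEach calls a list of functions with the same arguments.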
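A minimal sketch of the two ideas, assuming array inputs and Node-style callbacks. `parallel` and `applyEach` here are hypothetical reimplementations for illustration: parallel starts every task inside one synchronous loop, and applyEach builds such tasks by applying the same arguments to each function.

```javascript
// Hypothetical sketch: every task is started synchronously inside the loop.
function parallel(tasks, callback) {
  const results = new Array(tasks.length);
  let pending = tasks.length;
  let finished = false;
  if (pending === 0) return callback(null, results);

  tasks.forEach((task, index) => {
    task((err, value) => {
      if (finished) return;
      if (err) {
        finished = true;
        return callback(err);
      }
      results[index] = value;
      pending -= 1;
      if (pending === 0) {
        finished = true;
        callback(null, results);
      }
    });
  });
}

// applyEach: the same arguments are applied to every function
function applyEach(fns, ...args) {
  return (callback) => parallel(fns.map((fn) => (cb) => fn(...args, cb)), callback);
}
```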

Running tasks sequentially (series/xxxSeries/waterfall/retry/forever)

series runs all tasks one after another; internally it is implemented as parallel with only one task running at a time. waterfall passes the results of each task on to the next one.
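The waterfall idea can be sketched in a few lines. `waterfall` below is a hypothetical stand-alone version, assuming an array of Node-style tasks: whatever a task passes to its callback becomes the leading arguments of the next task, and the first error goes straight to the final callback.

```javascript
// Hypothetical sketch: each task receives the previous task's results.
function waterfall(tasks, callback) {
  let index = 0;

  function next(err, ...results) {
    // Stop on the first error, or after the last task has finished
    if (err || index === tasks.length) {
      return callback(err || null, ...results);
    }
    const task = tasks[index++];
    task(...results, next);
  }

  next(null);
}

// Usage: 1 -> (1 + 1, 3) -> 2 * 3, so the final result is 6
waterfall([
  (cb) => cb(null, 1),
  (a, cb) => cb(null, a + 1, 3),
  (b, c, cb) => cb(null, b * c),
], (err, result) => console.log(err, result)); // logs: null 6
```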

Conditional and fallback execution (whilst/until/tryEach)

whilst and until work like while loops: whilst keeps running the task as long as a test passes, and until keeps running it until the test passes. tryEach runs the tasks one at a time and returns as soon as one of them succeeds.
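A hedged sketch of these three helpers, written as hypothetical stand-alone functions. For brevity the test here is a plain synchronous predicate; this is a simplification, so check the library documentation for the exact signature of its test function. tryEach reports the last error if every task fails.

```javascript
// Hypothetical sketch: repeat iteratee while test() returns true.
function whilst(test, iteratee, callback) {
  function next(err) {
    if (err) return callback(err);
    if (!test()) return callback(null);
    iteratee(next);
  }
  next(null);
}

// until is whilst with the test inverted
function until(test, iteratee, callback) {
  whilst(() => !test(), iteratee, callback);
}

// Hypothetical sketch: try tasks one at a time, stop at the first success.
function tryEach(tasks, callback) {
  let index = 0;
  let lastErr = null;
  function next() {
    if (index >= tasks.length) return callback(lastErr);
    tasks[index++]((err, result) => {
      if (!err) return callback(null, result);
      lastErr = err; // remember the failure and fall through to the next task
      next();
    });
  }
  next();
}
```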

Starting tasks in dependency order (auto/autoInject)

auto first uses Kahn's algorithm (a topological sort) to check the task dependencies for cycles, and then starts the tasks synchronously. auto declares dependencies as strings naming other tasks, while autoInject declares them through the function's parameter names, which it extracts by matching the parameter list with a regular expression.

// Parse function parameters so that autoInject can inject dependencies automatically
// (STRIP_COMMENTS, FN_ARGS and ARROW_FN_ARGS are regular expressions defined elsewhere in the library)
function parseParams(func) {
    // Convert the function to a string and strip comments
    const src = func.toString().replace(STRIP_COMMENTS, ' ');
    // Match regular function parameters, falling back to arrow function parameters
    let match = src.match(FN_ARGS) || src.match(ARROW_FN_ARGS);
    if (!match) throw new Error('could not parse args in autoInject\nSource:\n' + src);
    // ...
}
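Cycle detection with Kahn's algorithm can be sketched as follows. The task format mirrors auto (a function, or an array of dependency names ending with the function), but `topologicalOrder` is a hypothetical helper written for illustration, not the library's implementation: it repeatedly takes tasks with no unresolved dependencies, and if some tasks are never reached, they are part of or blocked by a cycle.

```javascript
// Hypothetical helper: tasks look like auto's input, e.g.
// { a: fn, b: ['a', fn] } where the array lists b's dependencies.
function topologicalOrder(tasks) {
  const names = Object.keys(tasks);
  const inDegree = {};   // number of unresolved dependencies per task
  const dependents = {}; // tasks waiting on each task
  for (const name of names) {
    inDegree[name] = 0;
    dependents[name] = [];
  }
  for (const name of names) {
    const def = tasks[name];
    const deps = Array.isArray(def) ? def.slice(0, -1) : [];
    for (const dep of deps) {
      if (!Object.prototype.hasOwnProperty.call(tasks, dep)) {
        throw new Error(`task "${name}" depends on unknown task "${dep}"`);
      }
      inDegree[name] += 1;
      dependents[dep].push(name);
    }
  }
  // Start with tasks that have no dependencies
  const ready = names.filter((name) => inDegree[name] === 0);
  const order = [];
  while (ready.length > 0) {
    const name = ready.shift();
    order.push(name);
    for (const next of dependents[name]) {
      inDegree[next] -= 1;
      if (inDegree[next] === 0) ready.push(next);
    }
  }
  // Tasks never reached are part of (or blocked by) a cycle
  if (order.length !== names.length) {
    throw new Error('cyclic dependency detected between tasks');
  }
  return order;
}
```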

Adding tasks dynamically (queue/priorityQueue)

The queue stores its tasks in a doubly linked list, which is created when the queue is initialized. You can then register event listeners, and each event fires at the corresponding point in the queue's life cycle. Adding a task is what triggers its execution.

A doubly linked list allows elements to be added and removed quickly at any position. The queue provides unshift/unshiftAsync to add tasks at the head, push/pushAsync to add them at the tail, and a method to remove arbitrary tasks. The list also implements an iterator (Symbol.iterator), so its contents can be traversed with for...of or the spread operator (...).
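A minimal doubly linked list with the operations mentioned above might look like this. The class name `DLL` and method names are assumptions for illustration: push and unshift add at either end, removal only relinks the neighbours, and a generator implements Symbol.iterator.

```javascript
// Hypothetical doubly linked list with the operations described above.
class DLL {
  constructor() {
    this.head = null;
    this.tail = null;
    this.length = 0;
  }
  push(data) { // add at the tail
    const node = { data, prev: this.tail, next: null };
    if (this.tail) this.tail.next = node;
    else this.head = node;
    this.tail = node;
    this.length += 1;
    return node;
  }
  unshift(data) { // add at the head
    const node = { data, prev: null, next: this.head };
    if (this.head) this.head.prev = node;
    else this.tail = node;
    this.head = node;
    this.length += 1;
    return node;
  }
  removeNode(node) { // O(1): only the neighbours' links change
    if (node.prev) node.prev.next = node.next;
    else this.head = node.next;
    if (node.next) node.next.prev = node.prev;
    else this.tail = node.prev;
    node.prev = null;
    node.next = null;
    this.length -= 1;
    return node;
  }
  shift() { // take the next task from the head
    return this.head ? this.removeNode(this.head).data : undefined;
  }
  remove(testFn) { // remove every node for which testFn(node) is true
    let node = this.head;
    while (node) {
      const next = node.next;
      if (testFn(node)) this.removeNode(node);
      node = next;
    }
  }
  *[Symbol.iterator]() { // enables for...of and spread
    let node = this.head;
    while (node) {
      yield node.data;
      node = node.next;
    }
  }
}
```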

  • concurrency: the number of tasks processed concurrently. It can be changed dynamically.
  • payload: the number of tasks handled per worker call. With a payload of n, at most n tasks are passed to the worker function at a time.
  • started: whether task processing has started. It is set to true as soon as any task is added to the queue.
  • paused: whether task processing is paused.
  • idle: whether the queue is idle, which is the case when both the waiting list and the list of tasks being processed are empty.
  • running: the number of tasks currently being processed.
  • workersList: the list of tasks currently being processed.
  • length: the number of tasks still waiting to be processed.

Events are optional. They can be used to change queue properties dynamically or to run business logic at particular stages. The code handles events with a publish/subscribe pattern: subscriptions can be added once the queue has been initialized, and the handlers fire at the various stages of a run.

  • error: triggered when a task fails.
  • drain: triggered when all tasks in the queue have been processed.
  • saturated: triggered when the number of tasks being processed reaches the concurrency limit.
  • unsaturated: triggered when the queue is no longer saturated, that is, when fewer tasks are being processed than the concurrency limit allows.
  • empty: triggered when the list of tasks waiting to be processed becomes empty.
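The publish/subscribe mechanism can be sketched as below. `createEvents` and its `on`/`off`/`once`/`trigger` methods are hypothetical names chosen for illustration; only the event names come from the list above.

```javascript
// Hypothetical publish/subscribe helper for the queue events listed above.
function createEvents() {
  const handlers = { error: [], drain: [], saturated: [], unsaturated: [], empty: [] };
  const events = {
    on(event, fn) {
      handlers[event].push(fn);
    },
    off(event, fn) {
      // Without fn, remove every handler for the event
      handlers[event] = fn ? handlers[event].filter((h) => h !== fn) : [];
    },
    once(event, fn) {
      const wrapper = (...args) => {
        events.off(event, wrapper);
        fn(...args);
      };
      events.on(event, wrapper);
    },
    trigger(event, ...args) {
      // Copy first so handlers that unsubscribe during the loop are safe
      handlers[event].slice().forEach((h) => h(...args));
    },
  };
  return events;
}
```

A queue would call `trigger('saturated')` right after starting a task that fills the last free slot, `trigger('drain')` when nothing is waiting or running, and so on.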

Adding a task is the central operation. Before the data is queued, its callback is wrapped (this is what lets the Promise-based pushAsync/unshiftAsync variants work), and the task is then inserted into the list. Processing is started through setImmediate, which defers it so that several tasks added at the same time trigger only one start. If the current state allows tasks to run, they are handed to the worker function, which invokes the callback when it finishes; that callback immediately starts processing again, which keeps the loop going.
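The push/process loop can be sketched as a miniature queue. This is a hypothetical, heavily simplified version written under stated assumptions: a plain array replaces the doubly linked list, only the drain event is supported, and the worker is assumed to call back asynchronously (a synchronous worker would recurse through `process`, the problem discussed in the ensureAsync section).

```javascript
// Hypothetical mini queue: an array replaces the doubly linked list and
// only the drain event is supported, to keep the sketch short.
function queue(worker, concurrency = 1) {
  const tasks = [];
  const drainHandlers = [];
  let numRunning = 0;
  let processingScheduled = false;

  const q = {
    concurrency,
    started: false,
    paused: false,
    push(data, callback = () => {}) {
      q.started = true;
      tasks.push({ data, callback });
      // Defer processing: several pushes in the same tick schedule it only once
      if (!processingScheduled) {
        processingScheduled = true;
        setImmediate(() => {
          processingScheduled = false;
          q.process();
        });
      }
    },
    drain(handler) {
      drainHandlers.push(handler);
    },
    length: () => tasks.length,
    running: () => numRunning,
    idle: () => tasks.length + numRunning === 0,
    pause() {
      q.paused = true;
    },
    resume() {
      q.paused = false;
      setImmediate(q.process);
    },
    process() {
      // Hand tasks to the worker while the state allows it
      while (!q.paused && numRunning < q.concurrency && tasks.length > 0) {
        const task = tasks.shift();
        numRunning += 1;
        worker(task.data, (err, result) => {
          numRunning -= 1;
          task.callback(err, result);
          if (q.idle()) drainHandlers.forEach((h) => h());
          // The finished task frees a slot, so keep the loop going
          q.process();
        });
      }
    },
  };
  return q;
}
```

Right after a burst of `push` calls, `q.running()` is still 0: nothing starts until the single scheduled setImmediate fires.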

Utils: utility functions

setImmediate: deferred execution

Each of the following defers a function until the current code has returned, leaving it to the event loop to run it later; they differ in when the deferred function is scheduled. The library picks the first one that is available and wraps it so that extra arguments are forwarded:

defer = setImmediate || process.nextTick || (fn => setTimeout(fn, 0))
(fn, ...args) => defer(() => fn(...args));
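The same selection and wrapping, written out as runnable code. `defer`, `wrap` and `deferCall` are hypothetical names for this sketch; the fallback order follows the description above.

```javascript
// Pick the first available deferral mechanism (order as described above).
const defer =
  typeof setImmediate === 'function'
    ? setImmediate
    : typeof process === 'object' && typeof process.nextTick === 'function'
      ? process.nextTick
      : (fn) => setTimeout(fn, 0);

// wrap turns a one-argument scheduler into (fn, ...args) => void
const wrap = (deferFn) => (fn, ...args) => deferFn(() => fn(...args));

// Hypothetical name for the wrapped helper
const deferCall = wrap(defer);

// Usage: the deferred function runs after the synchronous code below it
deferCall((a, b) => console.log('deferred', a + b), 1, 2);
console.log('sync'); // printed first
```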

awaitify

Similar to Node's built-in util.promisify, awaitify wraps a callback-style function so that, when it is called without its final callback, it returns a Promise instead.

Traditional Node.js asynchronous functions take a callback as their last parameter and report their results through it, which easily leads to deeply nested callbacks. Promises and async/await are the natural solution: callback-style functions wrapped in Promises can be chained, and async/await lets Promise-based code be written in a synchronous style, so layers of nested callbacks become flat, sequential-looking code.
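A minimal sketch of the idea, assuming the callback is the function's last declared parameter. `awaitify` below is a stand-alone reimplementation for illustration; its `arity` parameter is an assumption of this sketch, and `delayedAdd` is a hypothetical example function.

```javascript
// Hypothetical sketch of awaitify: callback style if a callback is given,
// otherwise return a Promise.
function awaitify(asyncFn, arity = asyncFn.length) {
  return function (...args) {
    if (typeof args[arity - 1] === 'function') {
      return asyncFn.apply(this, args);
    }
    return new Promise((resolve, reject) => {
      // Put our own callback in the callback slot
      args[arity - 1] = (err, ...results) => {
        if (err) return reject(err);
        resolve(results.length > 1 ? results : results[0]);
      };
      asyncFn.apply(this, args);
    });
  };
}

// Example callback-style function (hypothetical)
function delayedAdd(a, b, callback) {
  setTimeout(() => callback(null, a + b), 10);
}
const add = awaitify(delayedAdd);
```

Both `add(1, 2, (err, sum) => ...)` and `const sum = await add(1, 2)` now work with the same function.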

isAsync

isAsync checks whether a function was declared as a native async function (ES2017).

A Promise, by contrast, is recognized by checking whether the value has a then method.

function isAsync(fn) {
    return fn[Symbol.toStringTag] === 'AsyncFunction';
}
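Both checks side by side, as a self-contained sketch. `isAsync` repeats the test shown above; `isThenable` is a hypothetical name for the then-method check. One caveat worth knowing: an async function that has been transpiled to ES5 (for example by Babel) is an ordinary function at runtime, so the tag check will not detect it.

```javascript
// Same check as above: native async functions carry the 'AsyncFunction' tag
function isAsync(fn) {
  return fn[Symbol.toStringTag] === 'AsyncFunction';
}

// Promise-like values are recognized by a callable then property
function isThenable(value) {
  return value != null && typeof value.then === 'function';
}
```

Note that a regular function returning a Promise, such as `() => Promise.resolve()`, is not an async function by this definition.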

asyncify

asyncify wraps a function that returns a value (or a Promise) so that it can be called as a Node-style asynchronous function, with a callback as the last parameter.

The functions accepted by this library can be Node-style asynchronous functions or ES2017 async functions. An async function takes no callback and delivers its result through the returned Promise, so the two kinds produce their output differently; asyncify is used to hide this difference.

A Node-style asynchronous function has the form function (arg1, arg2, ..., callback) {}. The callback takes an error (or null) as its first argument and should be called once the work is done, ideally in a later turn of the event loop.


const async = require('async')

// A synchronous function: asyncify passes its return value to the callback
function addSync(arg1, arg2) {
    return arg1 + arg2
}
// A Promise-returning function: asyncify passes the resolved value to the callback
function addPromise(arg1, arg2) {
    return new Promise((resolve) => {
        setTimeout(() => {
            resolve(arg1 + arg2)
        }, 1000)
    })
}
// An async function: handled the same way as a Promise-returning function
async function addAsyncFunction(arg1, arg2) {
    return arg1 + arg2
}
// Any of the three can be wrapped and then called Node-style, with a callback
const addAsync = async.asyncify(addAsyncFunction)
addAsync(1, 3, (err, res) => {
    console.log(res) // 4
})
// There is no Promise to attach .catch(e => console.error(e)) to here,
// so an exception thrown inside the callback cannot be caught that way
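To make the mechanism concrete without depending on the installed library, here is a hedged stand-alone sketch. `asyncify` and the helper `invokeCallback` are hypothetical reimplementations: synchronous results and thrown errors go straight to the callback, Promise results are forwarded once settled, and an exception thrown by the callback itself is rethrown from setImmediate so the Promise chain cannot swallow it.

```javascript
// Hypothetical sketch of asyncify
function asyncify(func) {
  return function (...args) {
    const callback = args.pop();
    let result;
    try {
      result = func.apply(this, args);
    } catch (err) {
      return callback(err); // synchronous throw becomes a callback error
    }
    if (result && typeof result.then === 'function') {
      // Promise (or async function) result: forward the settled value
      result.then(
        (value) => invokeCallback(callback, null, value),
        (err) => invokeCallback(callback, err instanceof Error ? err : new Error(err))
      );
    } else {
      callback(null, result);
    }
  };
}

function invokeCallback(callback, err, value) {
  try {
    callback(err, value);
  } catch (e) {
    // Rethrow outside the promise chain so the error is not silently swallowed
    setImmediate(() => {
      throw e;
    });
  }
}
```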

ensureAsync: avoiding stack overflows caused by synchronous callbacks

ensureAsync wraps a function so that its callback is always deferred. A deferred function does not invoke its callback immediately when called, but at some later point. Because a synchronous function invokes its callback before it returns, the wrapper can tell whether the callback ran synchronously, and if it did, the wrapper defers it with setImmediate.

ES2017 async functions always deliver their result asynchronously, so they are deferred already.
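The flag technique described above can be sketched like this. `ensureAsync` here is a hypothetical stand-alone version: the `sync` flag is true until the wrapped function returns, so a callback that fires while it is still set must have been called synchronously and is pushed to setImmediate.

```javascript
// Hypothetical sketch of ensureAsync
function ensureAsync(fn) {
  return function (...args) {
    const callback = args.pop();
    let sync = true; // true until fn has returned
    args.push((...innerArgs) => {
      if (sync) {
        // fn called back before returning: defer so the callback gets a fresh stack
        setImmediate(() => callback(...innerArgs));
      } else {
        callback(...innerArgs);
      }
    });
    fn.apply(this, args);
    sync = false;
  };
}
```

Functions that already call back asynchronously pass through unchanged, because by the time their callback fires, `sync` has been reset to false.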

In older versions of the library, the following code overflows the stack: after a synchronous callback, the next element is processed immediately within the same call stack, so with enough synchronous calls in a row the stack grows deeper and deeper until it overflows.

// index.js
const async = require("./dist/async")
const fs = require('fs')
let buffer = Buffer.alloc(10, 1);
let i = 0
const cache = {}
function sometimesAsync(arg, callback) {
    if (cache[arg]) {
        if (i++ % 1000 === 0) {
            console.trace() // print the current call stack every 1000 cache hits
        }
        return callback(null); // synchronous
    } else {
        fs.writeFile(arg, buffer, (err) => {
            cache[arg] = true
            callback(null) // asynchronous
        });
    }
}
async.mapSeries(
    new Array(10000).fill('./data'),
    // async.ensureAsync(sometimesAsync), // use this line instead of the next one to apply the fix
    sometimesAsync,
    (err, res) => {
        console.log(cache)
    }
);

This bug has since been fixed. Below is the relevant part of the source with the fix reverted, which restores the problem; running the code above then produces the stack overflow (commitId: cd6beba687bec8112357ff72b6a610cf245590cd):

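// eachOfLimit.js, with the fix reverted to reproduce the bug.
// The commented-out lines are the fix: the looping flag stops a callback that
// fires synchronously inside replenish() from calling replenish() again.
// ...
// var looping = false;
// ...
//  else if (!looping) {
//      replenish();
//  }
else {
    replenish();
}
// looping = true;   (at the start of replenish)
// looping = false;  (at the end of replenish)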

node --stack_trace_limit=50 index.js

// sometimesAsync
Trace
    at sometimesAsync (/.../index.js:25:21)
    at eachfn (/.../async/dist/async.js:247:13)
    at replenish (/.../async/dist/async.js:447:21)
    at iterateeCallback (/.../async/dist/async.js:431:21)
    at /.../async/dist/async.js:325:20
    at _iteratee (/.../async/dist/async.js:249:17)
    at sometimesAsync (/.../index.js:28:16)
    at eachfn (/.../async/dist/async.js:247:13)
    at replenish (/.../async/dist/async.js:447:21)
    at iterateeCallback (/.../async/dist/async.js:431:21)
    at /.../async/dist/async.js:325:20
    at _iteratee (/.../async/dist/async.js:249:17)
    at sometimesAsync (/.../index.js:28:16)
    ...

// async.ensureAsync(sometimesAsync)
Trace
    at sometimesAsync (/.../index.js:25:21)
    at /.../async/dist/async.js:2329:16
    at eachfn (/.../async/dist/async.js:247:13)
    at replenish (/.../async/dist/async.js:447:21)
    at iterateeCallback (/.../async/dist/async.js:431:21)
    at /.../async/dist/async.js:325:20
    at _iteratee (/.../async/dist/async.js:249:17)
    at setImmediate$1 (/.../async/dist/async.js:2324:42)
    at Immediate.defer [as _onImmediate] (/.../async/dist/async.js:73:45)
    at runCallback (timers.js:705:18)
    at tryOnImmediate (timers.js:676:5)
    at processImmediate (timers.js:658:5)

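As the traces show, when the callback keeps running synchronously, every element adds another round of sometimesAsync, eachfn, replenish and iterateeCallback frames, so the call stack keeps growing until a stack overflow error is thrown. With the ensureAsync wrapper, each callback is run from setImmediate and starts on a fresh stack, so the stack does not overflow however many times the function is called.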

When writing callback-based functions, follow this principle: if the callback is deferred, every exit path should defer it, and if it is synchronous, every exit path should be synchronous. Avoid functions like sometimesAsync, which call back synchronously on some paths and asynchronously on others.
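Applying that principle to the example above: the cache-hit path is deferred too, so every exit is asynchronous. This is a sketch under assumptions; `alwaysAsync`, `loadSlowly` and `runSequentially` are hypothetical names, and `loadSlowly` is a timer-based stand-in for real I/O such as fs.writeFile.

```javascript
// Hypothetical cache-backed lookup that always calls back asynchronously.
const cache = new Map();

function loadSlowly(key, callback) {
  // Stand-in for real I/O (assumption for illustration)
  setTimeout(() => callback(null, `value:${key}`), 1);
}

function alwaysAsync(key, callback) {
  if (cache.has(key)) {
    const value = cache.get(key);
    // Defer the cache hit so this exit is asynchronous like the miss path
    return setImmediate(() => callback(null, value));
  }
  loadSlowly(key, (err, value) => {
    if (!err) cache.set(key, value);
    callback(err, value);
  });
}

// Call alwaysAsync n times in sequence; each step starts from a fresh stack,
// so even a long run of cache hits cannot overflow the call stack.
function runSequentially(n, key, done) {
  let count = 0;
  function next(err) {
    if (err) return done(err, count);
    if (count === n) return done(null, count);
    count++;
    alwaysAsync(key, next);
  }
  next(null);
}
```

For example, `runSequentially(10000, './data', (err, count) => console.log(err, count))` logs `null 10000`, with only the first call doing the slow load.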
