If you are interested in the Node.js series, please follow our WeChat official account (Front-end S.H.I.E.L.D.) or the Node.js series articles on GitHub.

Streams emerged in the early days of Unix, and over the past few decades they have proved to be a reliable way of composing a large system from small parts that work well together.

Streams are ubiquitous in Node, whether you are manipulating files, creating a local server, or simply using the console.

There are four basic stream types in Node.js:

  • Readable – a stream from which data can be read (for example, fs.createReadStream())
  • Writable – a stream to which data can be written (for example, fs.createWriteStream())
  • Duplex – a stream that is both readable and writable (for example, net.Socket)
  • Transform – a Duplex stream that can modify or transform data as it is written and read (for example, zlib.createDeflate())

Why use streams

Suppose we need to implement a simple static file server using Node:

const http = require('http');
const fs = require('fs');

http.createServer((req, res) => {
    fs.readFile('./test.html', function (err, data) {
        if (err) {
            res.statusCode = 500;
            res.end();
        } else {
            res.end(data);
        }
    });
}).listen(3000);

The code above implements reading and sending a static file, and it is perfectly workable logically. But because readFile loads the entire file into memory at once, the server may run out of memory if test.html is very large or if the number of visitors grows. This is where streams come in:

const http = require('http');
const fs = require('fs');

http.createServer((req, res) => {
    fs.createReadStream('./test.html').pipe(res);
}).listen(3000);

fs.createReadStream creates a readable stream that reads the contents of the file chunk by chunk for downstream consumption. This gradual read-and-consume pattern keeps memory consumption down.
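As a rough sketch of this chunked reading (assuming the same test.html exists locally), we can listen for data events on the read stream and log the size of each chunk as it arrives:

const fs = require('fs');

const readStream = fs.createReadStream('./test.html');
let total = 0;

readStream.on('data', (chunk) => {
  // Each chunk is only a small piece of the file, not the whole file at once
  total += chunk.length;
  console.log(`received a chunk of ${chunk.length} bytes`);
});

readStream.on('end', () => {
  console.log(`done, ${total} bytes in total`);
});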

Readable Stream

We can split a Readable Stream into two phases: a push phase and a pull phase. In the push phase, the _read implementation pushes data from the underlying data source into the cache pool; this is the data-production phase. In the pull phase, data is pulled out of the cache pool for downstream use; this is the data-consumption phase.
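A minimal sketch of the two phases, using a hypothetical CounterReadable: the _read implementation is the push phase (producing data into the cache pool), while calling read() is the pull phase (consuming it):

const { Readable } = require('stream');

class CounterReadable extends Readable {
  constructor(opt) {
    super(opt);
    this._count = 0;
  }
  // Push phase: produce data from the "underlying source" into the cache pool
  _read() {
    this._count += 1;
    if (this._count > 3) {
      this.push(null); // no more underlying data
    } else {
      this.push(String(this._count));
    }
  }
}

const counter = new CounterReadable();
// Pull phase: consume data from the cache pool
const chunk = counter.read();
console.log(chunk && chunk.toString()); // '1'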

Before we go any further, let’s look at a few fields from the Node source:

  • state.buffer: Array — the cache pool; each element corresponds to the data passed to push(data)
  • state.length: Number — the amount of data in the cache pool; in objectMode, state.length === state.buffer.length, otherwise it is the total number of bytes of data in state.buffer
  • state.ended: Boolean — indicates that the underlying data source has no more readable data (set by this.push(null))
  • state.flowing: Null|Boolean — the current mode of the stream; it can take three values: null (initial state), true (flowing mode), false (paused mode)
  • state.needReadable: Boolean — whether to emit the readable event
  • state.reading: Boolean — whether underlying data is currently being read
  • state.sync: Boolean — whether the data/readable event is emitted synchronously; false means it is emitted immediately, true means it is emitted on the next tick (process.nextTick)
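For illustration only, these fields can be peeked at through the internal _readableState property (undocumented, not for production use); a quick sketch:

const { Readable } = require('stream');

// A trivial readable whose _read does nothing, just so we can inspect the state
const r = new Readable({ read() {} });

console.log(r._readableState.flowing); // null - initial state
r.push('hello');
console.log(r._readableState.length);  // 5 - bytes currently in the cache pool
r.push(null);
console.log(r._readableState.ended);   // true - underlying data source is exhausted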

Two modes

Readable streams have two modes: flowing and paused, identified in the source code by state.flowing.

The basic flow of both modes follows the push and pull phases described above; the difference lies in how autonomous the pull phase is. In flowing mode, as long as there is unconsumed data in the cache pool, data keeps being pulled out: think of it as an automatic pump that, once switched on, keeps running until the pool is drained. Paused mode is more like a bucket that fills up, from which you scoop water only when you need it.

All readable streams start in paused mode and can be switched to flowing mode by:

  • adding a data event handler (provided state.flowing === null)
  • calling stream.resume()
  • calling stream.pipe()

Readable streams can also be switched back to paused mode in the following ways (a short sketch of these mode switches follows the list):

  • adding a readable event handler
  • calling stream.pause() if there are no pipe destinations
  • removing all pipe destinations if there are any; calling stream.unpipe() can remove multiple pipe destinations
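A small sketch of these switches, observed through the public readableFlowing property (which mirrors state.flowing):

const { Readable } = require('stream');

const r = new Readable({ read() {} });

console.log(r.readableFlowing); // null  - initial state (paused)
r.on('data', () => {});         // adding a data handler while flowing is null...
console.log(r.readableFlowing); // true  - ...switches the stream to flowing mode
r.pause();
console.log(r.readableFlowing); // false - back to paused mode
r.resume();
console.log(r.readableFlowing); // true  - flowing again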

Everything starts from read

For readable streams, consumption drives production: only when read is called in the pull phase is data production in the push phase woken up and the data set flowing. So for readable streams, read is where everything starts.

A simple flow chart of this process can be drawn from the source code; the individual steps are explained below.
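A rough sketch of "consumption drives production": with a hypothetical LazyReadable, _read is never invoked until someone calls read():

const { Readable } = require('stream');

class LazyReadable extends Readable {
  _read() {
    console.log('_read called: producing data');
    this.push('some data');
    this.push(null); // end after one chunk to keep the example small
  }
}

const lazy = new LazyReadable();
// Nothing is produced yet: no consumer has asked for data.
setTimeout(() => {
  const chunk = lazy.read(); // the pull wakes up the push phase
  console.log('consumed:', chunk && chunk.toString());
}, 100);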

howMuchToRead

During a read(n) call, Node adjusts the amount actually read as needed, based on howMuchToRead:

function howMuchToRead(n, state) {
  // If size <= 0 or there is no readable data
  if (n <= 0 || (state.length === 0 && state.ended))
    return 0;

  // objectMode returns one unit of data at a time
  if (state.objectMode)
    return 1;

  // If size is not specified
  if (Number.isNaN(n)) {
    // In flowing mode the data is output continuously,
    // so only the first element in the cache is returned each time,
    // whereas non-flowing mode reads the cache empty
    if (state.flowing && state.length)
      return state.buffer.head.data.length;
    else
      return state.length;
  }

  // Update the highWaterMark
  if (n > state.highWaterMark)
    state.highWaterMark = computeNewHighWaterMark(n);

  // If the amount of data in the cache is sufficient
  if (n <= state.length)
    return n;

  // If there is not enough data in the cache:
  // if there is still data to read from the resource pool, do not read from the cache,
  // and save the data for the next time when there is enough;
  // otherwise read the cache pool empty
  if (!state.ended) {
    state.needReadable = true;
    return 0;
  }
  return state.length;
}
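A quick sketch of these rules using the simplified Readable constructor: in objectMode every read() returns exactly one unit, while in byte mode read(n) returns n bytes when the cache pool holds enough:

const { Readable } = require('stream');

// objectMode: howMuchToRead always returns 1, so read() yields one object at a time
const objStream = new Readable({ objectMode: true, read() {} });
objStream.push({ id: 1 });
objStream.push({ id: 2 });
console.log(objStream.read()); // { id: 1 }

// Byte mode: read(n) returns exactly n bytes when the cache holds enough data
const byteStream = new Readable({ read() {} });
byteStream.push('hello world');
console.log(byteStream.read(5).toString()); // 'hello'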

The end event

During a read call, Node determines whether to trigger the end event based on two criteria:

if (state.length === 0 && state.ended) endReadable(this);

  1. The underlying data source has no more readable data: state.ended is true.

Calling push(null) indicates that the underlying data source currently has nothing left to read.

  2. There is no readable data in the cache pool: state.length === 0.

The end event is emitted during a read([size]) call once both conditions are met.
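A small sketch of both conditions coming together: push(null) marks the underlying data as ended, and the read() call that drains the cache pool is what actually triggers end:

const { Readable } = require('stream');

const r = new Readable({ read() {} });
r.on('end', () => console.log('end: both conditions are now met'));

r.push('last chunk');
r.push(null);                     // condition 1: state.ended === true

console.log(r.read().toString()); // 'last chunk' - empties the cache pool (condition 2)
// 'end' is emitted shortly after this read() call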

doRead

doRead determines whether to read from the underlying data source:

  // Whether to read depends first on state.needReadable
  var doRead = state.needReadable;

  // If the cache pool is empty or does not hold enough data
  if (state.length === 0 || state.length - n < state.highWaterMark) {
    doRead = true;
  }

  if (state.ended || state.reading) {
    doRead = false;
  } else if (doRead) {
    // ...
    this._read(state.highWaterMark);
    // ...
  }

state.reading indicates whether a read from the underlying source is still in progress; it is set back to false as soon as push is called, which signals that the current _read() has finished, as the sketch below illustrates.
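A sketch of why this flag matters: with an asynchronous _read (the timeout below just simulates a slow underlying source), state.reading stays true while the fetch is in flight, so doRead skips redundant reads until push marks the previous one as finished:

const { Readable } = require('stream');

class SlowSource extends Readable {
  constructor(opt) {
    super(opt);
    this._chunks = ['a', 'b', 'c'];
  }
  _read() {
    // Simulate a slow underlying read; state.reading stays true until push() runs
    setTimeout(() => {
      this.push(this._chunks.shift() || null); // push() marks the read as finished
    }, 50);
  }
}

new SlowSource().on('data', (chunk) => console.log('got', chunk.toString()));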

The data event

The official documentation mentions that adding a data event handler switches a Readable Stream into flowing mode, but it does not spell out the condition under which this holds: state.flowing must not be false. In practice this means the switch happens only when the stream is still in its initial state (state.flowing === null), since a value of true means it is already flowing. Here's an example:

const { Readable } = require('stream');

class ExampleReadable extends Readable {
  constructor(opt) {
    super(opt);
    this._time = 0;
  }
  _read() {
    this.push(String(++this._time));
  }
}

const exampleReadable = new ExampleReadable();
// Pause first: state.flowing === false
exampleReadable.pause();
exampleReadable.on('data', (chunk) => {
  console.log(`Received ${chunk.length} bytes of data.`);
});

Running this example, we find that the terminal produces no output. Why? The reason can be seen in the source code:

if (state.flowing !== false)
    this.resume();

From this we can refine the official statement a little: adding a data event handler to a readable stream in its initial state (state.flowing === null) causes the stream to start flowing.
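To double-check this, here is a small variation of the example above (with _read capped so the output stays finite): the data handler alone does not restart a paused stream, but an explicit resume() does:

const { Readable } = require('stream');

class CappedReadable extends Readable {
  constructor(opt) {
    super(opt);
    this._time = 0;
  }
  _read() {
    this._time += 1;
    this.push(this._time > 3 ? null : String(this._time)); // stop after three chunks
  }
}

const capped = new CappedReadable();
capped.pause();                  // state.flowing === false
capped.on('data', (chunk) => {   // does NOT switch the mode on its own...
  console.log(`Received ${chunk.length} bytes of data.`);
});
capped.resume();                 // ...but an explicit resume() does
// Prints "Received 1 bytes of data." three times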

push

push() can only be called by implementers of a readable stream, and only from within the readable._read() method.

push is at the heart of data production: the consumer calls read(n) to ask the stream for data, while the stream's _read() implementation calls push to hand data over to the stream.

In this process, the push method may either store the data in the cache pool or output it directly through the data event. Let’s analyze them one by one.

If the stream is currently flowing (state.flowing === true), there is no readable data in the cache pool, and push was not called synchronously from _read (!state.sync), the data is emitted directly through the data event:

// Node source
if (state.flowing && state.length === 0 && !state.sync) {
    state.awaitDrain = 0;
    stream.emit('data', chunk);
}

Let’s take an example:

const { Readable } = require('stream');

class ExampleReadable extends Readable {
  constructor(opt) {
    super(opt);
    this.max = 100;
    this.time = 0;
  }
  _read() {
    const seed = setTimeout(() => {
      if (this.time > this.max) {
        this.push(null);
      } else {
        this.push(String(++this.time));
      }
      clearTimeout(seed);
    }, 0);
  }
}

const exampleReadable = new ExampleReadable({});
exampleReadable.on('data', (data) => {
  console.log('from data', data);
});

The readable event

exampleReadable.on('readable', () => { ... });

When we register a readable event handler, Node does the following:

  1. Switch the stream to paused mode:

state.flowing = false;
state.needReadable = true;

  2. If there is unconsumed data in the cache pool, emit readable:

stream.emit('readable');

  3. Otherwise, check whether the underlying data is currently being read; if not, read the underlying data on the next tick: self.read(0);

The trigger conditions for the readable event:

  1. state.flowing === false — the stream is currently in paused mode
  2. There is still data in the cache pool, or the underlying data of this round has been read: state.length || state.ended

return !state.ended && (state.length < state.highWaterMark || state.length === 0);
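A short sketch of consuming in paused mode with the readable event: inside the handler, read() is called in a loop until the cache pool is drained and returns null:

const { Readable } = require('stream');

const r = new Readable({ read() {} });

r.on('readable', () => {
  let chunk;
  // Drain the cache pool; read() returns null once nothing is left
  while ((chunk = r.read()) !== null) {
    console.log('readable gave us:', chunk.toString());
  }
});
r.on('end', () => console.log('no more data'));

r.push('hello ');
r.push('stream');
r.push(null);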

References

  • Node.js v10.15.1 documentation
  • An in-depth understanding of the internal mechanics of Node.js Stream
  • stream-handbook
  • How to graphically describe the backpressure mechanism in reactive programming?
  • Backlog in data streams
  • Node.js Stream – Advanced
  • Node Stream