A large chunk of data can be broken up and flowed through piece by piece, little by little, without reading it all at once. Under Linux we can do this with the | (pipe) symbol; similarly, the Stream module in Nodejs provides the pipe() method to achieve the same thing.

1. Nodejs Stream Pipe basic example

Koa was chosen for this simple demo because someone on the "Nodejs Technology Stack" forum asked how to return a Stream in Koa, which is covered below.

1.1 Without using Stream pipe

In Nodejs, I/O operations are asynchronous; here the callback form of fs.readFile is converted to Promise form using the promisify method of the util module. This code looks fine, but the experience is not great: it reads the entire file into memory before responding, which is a memory drain for large data files, so it is not recommended.

const Koa = require('koa');
const fs = require('fs');
const app = new Koa();
const { promisify } = require('util');
const { resolve } = require('path');
const readFile = promisify(fs.readFile);

app.use(async ctx => {
  try {
    ctx.body = await readFile(resolve(__dirname, 'test.json'));
  } catch(err) { ctx.body = err };
});

app.listen(3000);

1.2 Using Stream Pipe

Now let's see how to respond with data via a Stream in the Koa framework:

app.use(async ctx => {
  try {
    const readable = fs.createReadStream(resolve(__dirname, 'test.json'));
    ctx.body = readable;
  } catch(err) { ctx.body = err };
});

In Koa, we create a readable stream and assign it to ctx.body. You may wonder why there is no pipe call; that's because the framework encapsulates it for ctx.body.

// https://github.com/koajs/koa/blob/master/lib/application.js#L256
function respond(ctx) {
  ...
  let body = ctx.body;
  if (body instanceof Stream) return body.pipe(res);
  ...
}

No magic: the framework makes a check when it responds. Since res is a writable stream, if the body is also a Stream (a readable stream in this case), it responds as a stream with body.pipe(res).

1.3 Using Stream vs. not using Stream

(Image omitted: a diagram comparing the two approaches; source: www.cnblogs.com/vajoy/p/634…)

2. pipe call flow and implementation analysis

The core of pipe is connecting input to output. The focus of this section is the implementation of pipe, and the best way to study it is by reading the source code.

2.1 Following the trail

At the application layer we call the fs.createReadStream() method, so we start there and look for the pipe method on the readable stream object it creates. Only the core code is listed below, based on the Nodejs v12.x source.

2.1.1 /lib/fs.js

fs.js exports a createReadStream method that creates a ReadStream readable stream object defined in the internal/fs/streams file.

// https://github.com/nodejs/node/blob/v12.x/lib/fs.js
// Lazily load objects such as ReadStream, WriteStream, etc.
function lazyLoadStreams() {
  if (!ReadStream) {
    ({ ReadStream, WriteStream } = require('internal/fs/streams'));
    [ FileReadStream, FileWriteStream ] = [ ReadStream, WriteStream ];
  }
}

function createReadStream(path, options) {
  lazyLoadStreams();
  return new ReadStream(path, options); // Create a readable stream
}

module.exports = fs = {
  createReadStream, // Export the createReadStream method
  ...
}

2.1.2 /lib/internal/fs/streams.js

This file defines the constructor ReadStream, and its prototype defines methods such as open, _read, and _destroy, but not the pipe method we're looking for.

However, inheritance is implemented through the ObjectSetPrototypeOf method: ReadStream inherits the functions defined on Readable's prototype, so we continue on to the implementation of Readable.

// https://github.com/nodejs/node/blob/v12.x/lib/internal/fs/streams.js
const { Readable, Writable } = require('stream');

function ReadStream(path, options) {
  if (!(this instanceof ReadStream))
    return new ReadStream(path, options);
  ...
  Readable.call(this, options);
  ...
}

ObjectSetPrototypeOf(ReadStream.prototype, Readable.prototype);
ObjectSetPrototypeOf(ReadStream, Readable);

ReadStream.prototype.open = function() { ... };
ReadStream.prototype._read = function(n) { ... };
ReadStream.prototype._destroy = function(err, cb) { ... };
...
module.exports = { ReadStream, WriteStream };

2.1.3 /lib/stream.js

In the stream.js implementation there is a comment: Stream is exported before Readable/Writable/Duplex/... to avoid cross-reference (require) issues.

The first step in stream.js is to assign the export of require('internal/streams/legacy') to Stream.

After that, the _stream_readable, _stream_writable, _stream_duplex... modules each reference the stream.js file in turn, as you'll see below.

Stream imports internal/streams/legacy:

The Readable that the /lib/internal/fs/streams.js file above obtains from the stream module is the Stream.Readable defined below.

// https://github.com/nodejs/node/blob/v12.x/lib/stream.js
// Note: export Stream before Readable/Writable/Duplex/...
// to avoid a cross-reference(require) issues
const Stream = module.exports = require('internal/streams/legacy');

Stream.Readable = require('_stream_readable');
Stream.Writable = require('_stream_writable');
Stream.Duplex = require('_stream_duplex');
Stream.Transform = require('_stream_transform');
Stream.PassThrough = require('_stream_passthrough');
...

2.1.4 /lib/internal/streams/legacy.js

internal/streams/legacy defines the Stream constructor, inherits from the Events module, and defines the pipe method on the prototype. When I first saw this I thought we had found it. However, looking at the implementation of _stream_readable, we find that _stream_readable inherits from Stream and then implements pipe itself. So when is this version used? The file name suggests it is "legacy" code kept around for backwards compatibility, but I'm not certain. If any reader knows for sure, feel free to reach out via the "Nodejs Technology Stack" public account and discuss!

// https://github.com/nodejs/node/blob/v12.x/lib/internal/streams/legacy.js
const {
  ObjectSetPrototypeOf,
} = primordials;
const EE = require('events');
function Stream(opts) {
  EE.call(this, opts);
}
ObjectSetPrototypeOf(Stream.prototype, EE.prototype);
ObjectSetPrototypeOf(Stream, EE);

Stream.prototype.pipe = function(dest, options) { ... };

module.exports = Stream;

2.1.5 /lib/_stream_readable.js

The Readable constructor is defined in the _stream_readable.js implementation and inherits from Stream, which is the /lib/stream.js file mentioned above; that file in turn loads internal/streams/legacy, and Readable overrides the pipe method defined there with its own implementation.

After this chain of analysis, we have finally found where the readable stream's pipe comes from, and we better understand what gets executed when a readable stream is created. Next we focus on the implementation of this method.

module.exports = Readable;
Readable.ReadableState = ReadableState;

const EE = require('events');
const Stream = require('stream');

ObjectSetPrototypeOf(Readable.prototype, Stream.prototype);
ObjectSetPrototypeOf(Readable, Stream);

function Readable(options) {
  if (!(this instanceof Readable))
    return new Readable(options);
  ...
  Stream.call(this, options); // Inherit the definitions of the Stream constructor
}
...

2.2 _stream_readable implementation analysis

2.2.1 Declare the constructor Readable

The Readable constructor inherits the Stream constructor and prototype.

Stream is the /lib/stream.js file, which inherits from the events module, so it has the properties defined on the events prototype, such as on and emit.

const Stream = require('stream');
ObjectSetPrototypeOf(Readable.prototype, Stream.prototype);
ObjectSetPrototypeOf(Readable, Stream);

function Readable(options) {
  if (!(this instanceof Readable))
    return new Readable(options);
  ...
  Stream.call(this, options);
}

2.2.2 Declare the pipe method and subscribe to the data event

The pipe method is declared on the Readable prototype. Inside it, src is the readable stream object and dest is the writable stream object, and pipe subscribes to src's data event.

When we use the pipe method, it is essentially listening for data events, writing the data out as it is read.

Take a look at some of the core implementations of the ondata() method:

  • dest.write(chunk): receives the chunk and writes it. It returns true if the internal buffer is below the highWaterMark configured when the stream was created; otherwise it returns false, and writing to the stream should stop until the 'drain' event is emitted.
  • src.pause(): the readable stream stops emitting data events, which pauses writing as well.

src.pause() is called to prevent data from being read faster than it can be written. dest.write(chunk) returns false when the buffered data exceeds the highWaterMark option passed when the stream was created; the default is 16384 (16 KB), or 16 for streams in object mode.

Readable.prototype.pipe = function(dest, options) {
  const src = this;
  src.on('data', ondata);
  function ondata(chunk) {
    const ret = dest.write(chunk);
    if (ret === false) {
      ...
      src.pause();
    }
  }
  ...
};

2.2.3 Subscribe to the drain event to resume the data flow

In a data event, if a call to dest.write(chunk) returns false, src.pause() is called to stop the flow of data.

The drain event is emitted when the writable stream can accept more data. The drain listener is registered when dest.write(chunk) returns false, but only if ondrain does not already exist.

Readable.prototype.pipe = function(dest, options) {
  const src = this;
  src.on('data', ondata);
  function ondata(chunk) {
    const ret = dest.write(chunk);
    if (ret === false) {
      ...
      if (!ondrain) {
        // When the dest drains, it reduces the awaitDrain counter
        // on the source. This would be more elegant with a .once()
        // handler in flow(), but adding and removing repeatedly is
        // too slow.
        ondrain = pipeOnDrain(src);
        dest.on('drain', ondrain);
      }
      src.pause();
    }
  }
  ...
};

// When the writable stream dest drains, decrement the awaitDrain counter on
// the readable stream src. Once all buffered writes are complete
// (state.awaitDrain === 0) and src still has 'data' listeners, switch the
// stream back into flowing mode.
function pipeOnDrain(src) {
  return function pipeOnDrainFunctionResult() {
    const state = src._readableState;
    debug('pipeOnDrain', state.awaitDrain);
    if (state.awaitDrain)
      state.awaitDrain--;
    if (state.awaitDrain === 0 && EE.listenerCount(src, 'data')) {
      state.flowing = true;
      flow(src);
    }
  };
}

// stream.read() pulls data from the internal buffer and returns it, or null
// if no data is readable. (A readable stream also has a readable property,
// which is true when readable.read() can safely be called.)
function flow(stream) {
  const state = stream._readableState;
  debug('flow', state.flowing);
  while (state.flowing && stream.read() !== null);
}

2.2.4 Triggering the data event

The readable stream's resume() method is called to trigger the 'data' event and enter flowing mode.

Readable.prototype.pipe = function(dest, options) {
  const src = this;
  ...
  // Start the flow if it hasn't been started already.
  if (!state.flowing) {
    debug('pipe resume');
    src.resume();
  }
  ...

Calling resume() on the instance (as defined on the Readable prototype) invokes the internal resume() function, which schedules resume_(); that finally executes stream.read(0), a read of empty data (size set to 0). This triggers the instance's _read() method, which in turn emits the data event.

function resume(stream, state) {
  ...
  process.nextTick(resume_, stream, state);
}

function resume_(stream, state) {
  debug('resume', state.reading);
  if (!state.reading) {
    stream.read(0);
  }
  ...
}

2.2.5 Subscribing to the End event

End event: emitted when there is no more data in the readable stream to consume. When it fires, onend is called, which executes dest.end() to signal that no more data will be written to the writable stream; the underlying file descriptor of the writable stream is closed, and calling stream.write() afterwards results in an error.

Readable.prototype.pipe = function(dest, options) {
  ...
  const doEnd = (!pipeOpts || pipeOpts.end !== false) &&
    dest !== process.stdout &&
    dest !== process.stderr;

  const endFn = doEnd ? onend : unpipe;
  if (state.endEmitted)
    process.nextTick(endFn);
  else
    src.once('end', endFn);

  dest.on('unpipe', onunpipe);
  ...
  function onend() {
    debug('onend');
    dest.end();
  }
}

2.2.6 Triggering the pipe event

Finally, the pipe method emits a pipe event on the writable stream, passing in the readable stream object.

Readable.prototype.pipe = function(dest, options) {
  ...
  const src = this;
  dest.emit('pipe', src);
  ...
};

At the application layer, you can subscribe to the pipe event on a writable stream to make some checks; see the 'pipe' event section of the Stream documentation (stream_event_pipe) for examples.

2.2.7 Supporting chained calls

Because pipe returns dest, calls can be chained: a.pipe(b).pipe(c).

Readable.prototype.pipe = function(dest, options) {
  return dest;
};

3. Summary

This paper is divided into two parts:

  • The first part is relatively basic, explaining how the Nodejs Stream pipe method is used in Koa2.
  • The second part analyzed the implementation of the Nodejs Stream pipe method: it listens for the data event and writes the data to the writable stream. If the buffer grows beyond the highWaterMark configured when the stream was created, the flow of data is paused until the drain event fires or the stream ends, and events such as end and error are also listened for to do some processing.

4. Reference

  • nodejs.cn/api/stream.html
  • cnodejs.org/topic/56ba030271204e03637a3870
  • github.com/nodejs/node/blob/master/lib/_stream_readable.js