How do you move something from A to B?

Pick it up, move it to your destination, put it down.

What if this thing weighs a ton?

Then move it in pieces.

In fact, IO is all about moving things, whether it is network IO or file IO. If the amount of data is small, you can simply transfer all of it at once. But if the content is very large, loading it into memory in one go can exhaust memory and crash the program, and it is slow as well. In that case you can process it piece by piece. This is the idea behind streams.
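To make the idea concrete, here is a minimal sketch that reads a file piece by piece with fs.createReadStream instead of loading it whole. The helper name countBytes, the 64 KiB chunk size and the temporary demo file are illustrative assumptions. Each 'data' event delivers one chunk, so memory use is governed by the chunk size rather than the file size.

```javascript
const fs = require('fs');
const os = require('os');
const path = require('path');

// Hypothetical helper: stream a file and report how the data arrived.
// highWaterMark caps the size of each chunk read from disk.
function countBytes(filePath, chunkSize = 64 * 1024) {
  return new Promise((resolve, reject) => {
    let total = 0;
    let chunks = 0;
    let largest = 0;
    fs.createReadStream(filePath, { highWaterMark: chunkSize })
      .on('data', (chunk) => {
        // Only this chunk is handled now; earlier ones can be garbage-collected.
        total += chunk.length;
        chunks += 1;
        largest = Math.max(largest, chunk.length);
      })
      .on('end', () => resolve({ total, chunks, largest }))
      .on('error', reject);
  });
}

// Demo: a 200 KiB temporary file (made-up data) read in chunks.
const demoFile = path.join(os.tmpdir(), 'stream-demo.bin');
fs.writeFileSync(demoFile, Buffer.alloc(200 * 1024));
countBytes(demoFile).then((stats) => console.log(stats));
```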

Many languages offer a stream API, and Node.js is one of them. Let’s explore the Node.js stream API.

This article answers the following questions:

  • What are the four types of stream in Node.js?
  • How can generators be combined with Readable streams?
  • How do streams pause and flow?
  • What is the back pressure problem, and how do you solve it?

The four types of stream in Node.js

An intuitive picture of streams

For data to flow from one place to another, there must be a side it flows out of and a side it flows into. The side it flows out of is a readable stream, and the side it flows into is a writable stream.

Of course, there are also streams that data can both flow into and out of; these are called duplex streams.

Since data can flow both in and out, can we transform what flows in and then send it back out? Such a stream is called a transform stream.

With a Duplex stream, what goes in and what comes out need not be related, whereas with a Transform stream, what comes out is derived from what goes in. That is the difference between the two.

The stream API

Node.js provides all four types of stream described above:

const stream = require('stream');

// Readable stream
const Readable = stream.Readable;
// Writable stream
const Writable = stream.Writable;
// Duplex stream
const Duplex = stream.Duplex;
// Transform stream
const Transform = stream.Transform;

Each type requires you to implement specific methods:

  • Readable: implement the _read method, which supplies content
  • Writable: implement the _write method, which receives content
  • Duplex: implement both _read and _write, to supply and receive content
  • Transform: implement the _transform method, which transforms the received content and passes it on
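Besides assigning or overriding these methods, as the examples in this article do, the stream constructors also accept the implementations as options: read, write and transform (a Duplex takes both read and write). A minimal sketch of this simplified construction, chaining the pieces with pipe(); the sample data and variable names are made up:

```javascript
const { Readable, Transform, Writable } = require('stream');

// Readable: read() supplies data with push(); push(null) ends the stream.
const readable = new Readable({
  read() {
    this.push('hello');
    this.push(null);
  },
});

// Transform: transform() receives a chunk and hands back the converted result.
const upper = new Transform({
  transform(chunk, encoding, callback) {
    callback(null, chunk.toString().toUpperCase());
  },
});

// Writable: write() receives each chunk; callback() signals it was handled.
const received = [];
const writable = new Writable({
  write(chunk, encoding, callback) {
    received.push(chunk.toString());
    callback();
  },
});

// A Duplex would be built the same way, passing both read() and write().
readable.pipe(upper).pipe(writable);
writable.on('finish', () => console.log(received));
```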

Let’s take a look at each:

Readable

A Readable implements the _read method, which supplies data by calling push.

const Stream = require('stream');

const readableStream = new Stream.Readable();

// _read is called whenever the stream wants more data
readableStream._read = function () {
    this.push('A grapevine grows by the door,');
    this.push('Its tender green shoots have just sprouted,');
    this.push('A snail carries its heavy shell,');
    this.push('Climbing up one step at a time.');
    // pushing null ends the stream
    this.push(null);
};

readableStream.on('data', (data) => {
    console.log(data.toString());
});

readableStream.on('end', () => {
    console.log('done~');
});

Pushing null signals that there is no more data, which ends the stream; once the remaining data has been consumed, the 'end' event fires.

Running it prints the four lines, one per 'data' event, followed by done~.
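The example above pushes everything in a single _read call. Node.js calls _read again whenever it wants more data, so a common alternative is to push one chunk per call and push null once the source runs dry. A minimal sketch, using a hypothetical Counter class:

```javascript
const { Readable } = require('stream');

// Hypothetical Counter: emits '1', '2', ... up to max, one chunk per _read call.
class Counter extends Readable {
  constructor(max) {
    super();
    this.current = 1;
    this.max = max;
  }

  _read() {
    if (this.current > this.max) {
      this.push(null); // no more data: end the stream
    } else {
      this.push(String(this.current++));
    }
  }
}

const seen = [];
const counter = new Counter(3);
counter.on('data', (chunk) => seen.push(chunk.toString()));
counter.on('end', () => console.log(seen));
```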

You can also create a Readable by subclassing it:

const Stream = require('stream');

class ReadableDong extends Stream.Readable {

    constructor() {
        super();
    }

    _read() {
        this.push('A grapevine grows by the door,');
        this.push('Its tender green shoots have just sprouted,');
        this.push('A snail carries its heavy shell,');
        this.push('Climbing up one step at a time.');
        this.push(null);
    }
}

const readableStream = new ReadableDong();

readableStream.on('data', (data) => {
    console.log(data.toString());
});

readableStream.on('end', () => {
    console.log('done~');
});

Readable streams generate content, so it’s natural to combine them with generators:

const Stream = require('stream');

class ReadableDong extends Stream.Readable {

    constructor(iterator) {
        super();
        this.iterator = iterator;
    }

    // Each call to _read pulls the next value from the iterator
    _read() {
        const next = this.iterator.next();
        if (next.done) {
            this.push(null);
        } else {
            this.push(next.value);
        }
    }

}

function* songGenerator() {
    yield 'A grapevine grows by the door,';
    yield 'Its tender green shoots have just sprouted,';
    yield 'A snail carries its heavy shell,';
    yield 'Climbing up one step at a time.';
}

const songIterator = songGenerator();

const readableStream = new ReadableDong(songIterator);

readableStream.on('data', (data) => {
    console.log(data.toString());
});

readableStream.on('end', () => {
    console.log('done~');
});

That’s the readable stream: it produces content by implementing the _read method.
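For the generator case in particular, Node.js also provides Readable.from(), which turns any iterable or async iterable (a generator included) into a readable stream, so a hand-written subclass is optional. A short sketch; note that Readable.from() uses object mode by default, so the chunks arrive as the original strings rather than Buffers:

```javascript
const { Readable } = require('stream');

function* songGenerator() {
  yield 'A grapevine grows by the door,';
  yield 'A snail carries its heavy shell,';
}

// Readable.from() pulls from the iterator as the consumer asks for data.
const lines = [];
const fromGenerator = Readable.from(songGenerator());
fromGenerator.on('data', (line) => lines.push(line));
fromGenerator.on('end', () => console.log(lines));
```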

Writable

Writable implements the _write method to receive written content.

const Stream = require('stream');

const writableStream = new Stream.Writable();

writableStream._write = function (data, enc, next) {
    console.log(data.toString());
    // Write once per second: call next only after a delay
    setTimeout(() => {
        next();
    }, 1000);
};

writableStream.on('finish', () => console.log('done~'));

writableStream.write('A grapevine grows by the door,');
writableStream.write('Its tender green shoots have just sprouted,');
writableStream.write('A snail carries its heavy shell,');
writableStream.write('Climbing up one step at a time.');
writableStream.end();

It receives each written chunk, prints it, and calls next to signal that it is ready for the next write. Because next can be called asynchronously, you control the pace of writing; here, one chunk per second.

Run it and the four lines are printed one second apart, followed by done~.

That’s the writable stream: it handles written content by implementing the _write method.
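The callback passed to _write (next above) can also report a failure: calling it with an Error makes the stream emit 'error' instead of carrying on. A minimal sketch, with a hypothetical rule that blank chunks are invalid:

```javascript
const { Writable } = require('stream');

// Hypothetical rule for illustration: blank chunks count as a failed write.
const accepted = [];
const strictSink = new Writable({
  write(chunk, encoding, next) {
    const text = chunk.toString();
    if (text.trim() === '') {
      // Report the failure: the stream emits 'error' (and, with the
      // default autoDestroy, is destroyed).
      next(new Error('empty chunk'));
      return;
    }
    accepted.push(text);
    next(); // success: ready for the next chunk
  },
});

strictSink.on('error', (err) => console.error('write failed:', err.message));

strictSink.write('first line');
strictSink.write('   ');
```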

Duplex

A Duplex stream is both readable and writable, and implements both _read and _write:

const Stream = require('stream');

const duplexStream = new Stream.Duplex();

// Readable side: produces its own content
duplexStream._read = function () {
    this.push('A grapevine grows by the door,');
    this.push('Its tender green shoots have just sprouted,');
    this.push('A snail carries its heavy shell,');
    this.push('Climbing up one step at a time.');
    this.push(null);
};

// Writable side: receives written content, unrelated to what is read
duplexStream._write = function (data, enc, next) {
    console.log(data.toString());
    next();
};

duplexStream.on('data', (data) => console.log(data.toString()));
duplexStream.on('end', () => console.log('read done~'));
duplexStream.on('finish', () => console.log('write done~'));

duplexStream.write('A grapevine grows by the door,');
duplexStream.write('Its tender green shoots have just sprouted,');
duplexStream.write('A snail carries its heavy shell,');
duplexStream.write('Climbing up one step at a time.');
duplexStream.end();

Duplex is the combination of Readable and Writable streams.
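To see that the two halves really are independent, here is a minimal sketch using the options form of the Duplex constructor; the readable side emits its own fixed content while the writable side just records what it is given (all data is made up):

```javascript
const { Duplex } = require('stream');

// Readable and writable sides with unrelated content (illustrative data).
const written = [];
const duplex = new Duplex({
  read() {
    this.push('from the readable side');
    this.push(null);
  },
  write(chunk, encoding, callback) {
    written.push(chunk.toString());
    callback();
  },
});

const readChunks = [];
duplex.on('data', (chunk) => readChunks.push(chunk.toString()));
duplex.on('end', () => console.log('read:', readChunks));
duplex.on('finish', () => console.log('written:', written));

duplex.write('to the writable side');
duplex.end();
```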

Transform

Duplex streams are both readable and writable, but the two sides have nothing to do with each other. Sometimes, though, you need to transform the incoming content and then pass it on.

That is what a Transform stream is for; it implements the _transform method. Here is a Transform stream that reverses the order of the words in each chunk:

const Stream = require('stream');

class TransformReverse extends Stream.Transform {

  constructor() {
    super();
  }

  // Reverse the order of the words in each chunk
  _transform(buf, enc, next) {
    const res = buf.toString().split(' ').reverse().join(' ');
    this.push(res);
    next();
  }
}

const transformStream = new TransformReverse();

transformStream.on('data', (data) => console.log(data.toString()));
transformStream.on('end', () => console.log('read done~'));
transformStream.on('finish', () => console.log('write done~'));

transformStream.write('A grapevine grows by the door.');
transformStream.write('Its tender green shoots have just sprouted.');
transformStream.write('A snail carries its heavy shell.');
transformStream.write('Climbing up one step at a time.');
transformStream.end();

Running it prints each written line with its words in reverse order.
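In practice a Transform usually sits in the middle of a chain. A minimal sketch, assuming Readable.from() as the source and a collecting Writable as the sink, wired together with stream.pipeline(); the word-reversal logic is the same as above, and the sample lines are made up:

```javascript
const { Readable, Transform, Writable, pipeline } = require('stream');

// Reverses the word order of each chunk (same logic as TransformReverse).
const reverseWords = new Transform({
  transform(chunk, encoding, callback) {
    callback(null, chunk.toString().split(' ').reverse().join(' '));
  },
});

// Sink that collects whatever reaches the end of the chain.
const output = [];
const sink = new Writable({
  write(chunk, encoding, callback) {
    output.push(chunk.toString());
    callback();
  },
});

// pipeline() connects the stages and reports the first error (if any) once.
pipeline(
  Readable.from(['one two three', 'four five']),
  reverseWords,
  sink,
  (err) => {
    if (err) console.error('pipeline failed:', err);
    else console.log(output);
  },
);
```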

Pausing and flowing

We take content from a Readable stream and feed it into a Writable stream, implementing _read on the reading side and _write on the writing side.
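Whether data flows automatically is tracked by the readableFlowing property, which is always null (no consumer yet), true (flowing) or false (paused). A small sketch of the transitions, using a hypothetical source that never produces data; the printed values follow from how Node.js updates this property synchronously:

```javascript
const { Readable } = require('stream');

// Hypothetical source that never produces data; only the mode changes matter.
const source = new Readable({ read() {} });
console.log(source.readableFlowing); // null: nobody is consuming yet

source.on('data', (chunk) => console.log(chunk.toString()));
console.log(source.readableFlowing); // true: a 'data' listener switches to flowing mode

source.pause();
console.log(source.readableFlowing); // false: explicitly paused

source.resume();
console.log(source.readableFlowing); // true: flowing again
```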

Back pressure

But reading and writing are both asynchronous. What if they run at different speeds?

If the Readable produces data faster than the Writable can write it, the excess piles up in the buffer. If too much data accumulates, memory usage keeps growing, and in the worst case the process can run out of memory.

What if the Readable produces data more slowly than the Writable can write it? That is no problem; the writing side simply sits idle for a while.

This situation, where data is read faster than it can be written, is what “back pressure” refers to. It is easy to picture: the writing end is under too much pressure to keep up, so data keeps piling up in its buffer.

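You can inspect the buffer threshold through readableHighWaterMark (on a readable stream) and writableHighWaterMark (on a writable stream). For byte streams it has long defaulted to 16 KiB (16384 bytes), though newer Node.js releases may use a larger default, and fs.createReadStream uses 64 KiB. Note that it is a threshold, not a hard limit.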
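Both properties are read-only; to use a different threshold, pass the highWaterMark option when creating a stream. A small sketch (1024 is an arbitrary value, and the defaults printed first depend on your Node.js version):

```javascript
const { Readable, Writable } = require('stream');

// Defaults: the exact numbers depend on the Node.js version.
const r = new Readable({ read() {} });
const w = new Writable({ write(chunk, encoding, callback) { callback(); } });
console.log(r.readableHighWaterMark, w.writableHighWaterMark);

// A custom threshold (in bytes) set per stream via the highWaterMark option.
const smallR = new Readable({ read() {}, highWaterMark: 1024 });
const smallW = new Writable({
  highWaterMark: 1024,
  write(chunk, encoding, callback) { callback(); },
});
console.log(smallR.readableHighWaterMark, smallW.writableHighWaterMark); // 1024 1024
```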

Solving back pressure

How do we deal with mismatched read and write rates?

If the writer hasn’t caught up, stop reading. That way data doesn’t keep piling up in the buffer.

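A Readable stream has a readableFlowing property that indicates whether data is delivered automatically. It starts out as null; attaching a 'data' listener (or calling pipe() or resume()) sets it to true, at which point the stream reads data in automatically and emits it through 'data' events.

When it is false (paused mode), data is not delivered automatically, and you have to pull it manually with read(). Listening for the 'readable' event puts the stream in this mode: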

readableStream.on('readable', () => {
    let data;
    // read() returns null once the internal buffer is empty
    while ((data = readableStream.read()) !== null) {
        console.log(data.toString());
    }
});

Reading manually is more cumbersome, though. Alternatively, you can stay in flowing mode and use pause() and resume() to stop and restart the automatic flow of data.

When you call write on a writable stream, it returns a Boolean telling you whether you can keep writing:

  • true: the internal buffer is still below the highWaterMark, so you can keep writing
  • false: the buffer has reached the highWaterMark; the chunk is still accepted and buffered, but you should stop writing until the 'drain' event fires
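A quick way to watch this happen, assuming a deliberately tiny highWaterMark of 4 bytes and a hypothetical sink that takes about 10 ms per chunk: the first 2-byte write stays under the threshold, the later ones bring the buffered amount to or above it, so write() returns false until 'drain' fires.

```javascript
const { Writable } = require('stream');

// Hypothetical slow sink: each chunk takes about 10 ms to "write".
const slow = new Writable({
  highWaterMark: 4, // a tiny 4-byte threshold so the effect shows up immediately
  write(chunk, encoding, callback) {
    setTimeout(callback, 10);
  },
});

const results = [];
for (const piece of ['ab', 'cd', 'ef']) {
  // Every chunk is accepted; the return value says whether to keep going.
  results.push(slow.write(piece));
}
console.log(results); // [ true, false, false ]

slow.on('drain', () => console.log('drain: safe to write again'));
```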

So pause reading when write returns false, and resume when the writable stream emits 'drain', which means its buffer has been emptied:

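const fs = require('fs');
const src = 'input.txt';  // placeholder source path
const dst = 'output.txt'; // placeholder destination path

const rs = fs.createReadStream(src);
const ws = fs.createWriteStream(dst);

rs.on('data', function (chunk) {
    // false means ws's buffer is full: stop reading until it drains
    if (ws.write(chunk) === false) {
        rs.pause();
    }
});

rs.on('end', function () {
    ws.end();
});

ws.on('drain', function () {
    rs.resume(); // buffer emptied: continue reading
});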

This way, reading pauses and resumes according to how fast the writer keeps up, which solves the back pressure problem.
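The same pause/resume pattern works for any pair of streams, not just files. A self-contained sketch, assuming a hypothetical in-memory source of 100 small chunks and a slow sink with a 64-byte highWaterMark; it also counts how often reading had to pause:

```javascript
const { Readable, Writable } = require('stream');

// Copy rs into ws by hand, pausing rs whenever ws's buffer is full.
function copyWithBackpressure(rs, ws) {
  return new Promise((resolve, reject) => {
    let pauses = 0;
    rs.on('data', (chunk) => {
      if (!ws.write(chunk)) {
        pauses += 1;
        rs.pause(); // stop reading until ws drains
      }
    });
    ws.on('drain', () => rs.resume()); // buffer emptied: continue reading
    rs.on('end', () => ws.end());
    ws.on('finish', () => resolve(pauses));
    rs.on('error', reject);
    ws.on('error', reject);
  });
}

// Hypothetical demo streams: 100 small chunks into a sink that needs ~1 ms each.
const source = Readable.from(Array.from({ length: 100 }, (_, i) => `chunk ${i}\n`));
const received = [];
const slowSink = new Writable({
  highWaterMark: 64, // bytes
  write(chunk, encoding, callback) {
    received.push(chunk.toString());
    setTimeout(callback, 1);
  },
});

copyWithBackpressure(source, slowSink).then((pauses) => {
  console.log(`copied ${received.length} chunks, paused ${pauses} times`);
});
```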

Does pipe have back pressure problems?

pipe() is often used to connect a Readable stream directly to a Writable stream, yet we never seem to run into back pressure problems with it. That is because pipe() handles back pressure internally: it pauses the readable side when write returns false and resumes it on 'drain'.

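const fs = require('fs');
const src = 'input.txt';  // placeholder source path
const dst = 'output.txt'; // placeholder destination path
const rs = fs.createReadStream(src);
const ws = fs.createWriteStream(dst);

// pipe() pauses and resumes rs automatically based on ws's buffer
rs.pipe(ws);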
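Node.js 15 and later also offer a promise-based stream.pipeline() in the stream/promises module. Like pipe(), it respects back pressure; in addition, it forwards an error from any stage and destroys all the streams on failure, which pipe() does not do. A minimal sketch, with made-up temporary file names:

```javascript
const fs = require('fs');
const os = require('os');
const path = require('path');
const { pipeline } = require('stream/promises');

// Copy src to dst; back pressure is handled just as with pipe(),
// and any read or write error rejects the returned promise.
async function copyFile(src, dst) {
  await pipeline(fs.createReadStream(src), fs.createWriteStream(dst));
}

// Demo with temporary files (hypothetical names).
const demoSrc = path.join(os.tmpdir(), 'pipeline-demo-src.txt');
const demoDst = path.join(os.tmpdir(), 'pipeline-demo-dst.txt');
fs.writeFileSync(demoSrc, 'hello stream\n'.repeat(10000));

copyFile(demoSrc, demoDst)
  .then(() => console.log('copied', fs.statSync(demoDst).size, 'bytes'))
  .catch((err) => console.error('copy failed:', err));
```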

Conclusion

Streaming is a common idea in data transfer, and it underlies file reading and writing as well as network communication.

Node.js provides a stream API with four types: Readable, Writable, Duplex and Transform. To produce and process data, they implement _read, _write, both _read and _write, and _transform, respectively.

To create a Readable, you can either construct one directly and override its _read method, or subclass Readable and instantiate the subclass. The same applies to the other stream types. (Readable combines naturally with generators.)

When data is read faster than it can be written, back pressure builds up and the buffer keeps growing, which can exhaust memory. The solution is to pause and resume reading according to how fast the writer keeps up. pipe() does not have this problem because it handles it internally.

Streams are a concept you cannot avoid when doing I/O, and back pressure is a common stream problem. If memory usage keeps climbing while you stream data, consider whether back pressure is being ignored. I hope this article helps clear things up so you can really master streams!