I never really understood the concept of a Stream in Node.js, but I have recently studied it in depth, and this article is my attempt to explain it. The code in this article is located at github.com/Maricaya/no…

What is a Stream?

A stream is an abstract data structure. Just like an array or a string, a stream is a collection of data.

The difference is that a stream outputs a small amount of data at a time, and the data does not all have to fit in memory at once.

For example, the request and response objects involved in an HTTP request to a server are both streams.

Stream, illustrated

A stream is like a stream of water, except that by default there is no water in it. Calling stream.write() puts water into the stream, that is, writes data.

The upper-left corner is the part that produces the data, called the source. At the bottom is the sink, where the data ends up. The dots flowing from top to bottom are the pieces of data written each time, called chunks.

Why do I need a Stream

Can't Node.js read and write data without streams?

Yes, it can, but then it reads the entire file into memory before writing it back out, which is fine for small files.

With large files, though, that approach becomes unbearable.

A stream splits the file into small chunks and transfers them piece by piece. The data flows like water, which reduces the pressure on the server.

A Stream example

In case that doesn’t convince you, let’s try an experiment to see if it’s necessary to use streams when reading and writing large files.

First create a large file:

Creating a large file with a Stream

First we create a writable stream and write to the file many times. Finally, remember to close the stream, and we get a large file.

// Import the file module
const fs = require('fs');

const stream = fs.createWriteStream('./../big_file.txt');

for (let i = 0; i < 1000000; i++) {
  stream.write(`this is line ${i}\n`);
}

stream.end()
console.log('done')

Using readFile

Let’s first read the contents of the file using fs.readFile and see what happens.

const fs = require('fs')
const http = require('http')
const server = http.createServer()
server.on('request', (request, response) => {
  fs.readFile('./../big_file.txt', (error, data) => {
    if (error) throw error
    response.end(data)
    console.log('done')
  })
})
server.listen(8889)
console.log(8889)

When we access http://localhost:8889, the server asynchronously reads this large file.

Everything seems to be all right.

However, if we look at the Node.js process in the task manager, its memory usage is around 130 MB.

The server handled a single request and used about 130 MB. So ten requests would take over 1 GB. The memory consumption on the server is far too high.

How do we solve this problem? Use a Stream.

Using a Stream

Let’s try rewriting the above example with Stream.

Create a readable stream with createReadStream and connect it to the response stream through a pipe.

const fs = require('fs')
const http = require('http')
const server = http.createServer()
server.on('request', (request, response) => {
  const stream = fs.createReadStream('./big_file.txt')
  stream.pipe(response)
})
server.listen(8888)

Again, looking at the Node.js memory usage, it basically stays under 30 MB.

Because only a small amount of data is passed at a time, it does not take up much memory.

Pipes (pipe)

As long as stream1 has data, it flows into stream2.

Like the code above:

const stream = fs.createReadStream('./big_file.txt')
stream.pipe(response)

Here stream is the file stream, and response is the HTTP response stream. The two streams were originally unrelated; to pass the data from the file stream into the HTTP stream, all we need is a pipe.

Common usage

stream1.pipe(stream2)

  • stream1 is the stream that emits the data, a readable stream.
  • stream2 is the stream the data is written into, a writable stream (see the file-copy sketch below).
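
As a concrete pairing, here is a minimal sketch (assuming the big_file.txt from above, and a hypothetical copy target big_file_copy.txt):

const fs = require('fs')

// stream1: a readable stream that emits the file's data
const readable = fs.createReadStream('./big_file.txt')

// stream2: a writable stream that receives the data (this makes a plain file copy)
const writable = fs.createWriteStream('./big_file_copy.txt')

readable.pipe(writable)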

Chaining

A stream of water can flow through an infinite number of pipes, and so can a stream of data.

There are two ways to write it:

a.pipe(b).pipe(c)
// which is equivalent to
a.pipe(b)
b.pipe(c)

Pipe principle

A pipe can also be thought of as wrapping two event listeners:

  • Listen for the data event: as soon as stream1 has data, write it to stream2.
  • Listen for the end event: when stream1 ends, end stream2 as well.

stream1.on('data', (chunk) => {
	stream2.write(chunk)
})

stream1.on('end', () => {
	stream2.end()
})
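
The real pipe() does a bit more than this: it also handles backpressure, pausing the source when the destination's buffer is full and resuming it once the buffer drains. A rough sketch of that idea (miniPipe is a hypothetical helper, not a Node.js API):

function miniPipe(readable, writable) {
  readable.on('data', (chunk) => {
    // If the destination's buffer is full, pause the source
    if (!writable.write(chunk)) {
      readable.pause()
    }
  })
  // Once the destination has drained, let the source flow again
  writable.on('drain', () => readable.resume())
  // When the source ends, end the destination
  readable.on('end', () => writable.end())
  return writable
}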

The prototype chain of a Stream object

Knowing the Stream prototype chain makes it easier to remember the Stream API.

fs.createReadStream(path)

If we write const s = fs.createReadStream(path), then the prototype hierarchy of s is:

  1. Its own properties, created by the fs.ReadStream constructor
  2. Prototype: stream.Readable.prototype
  3. Second-level prototype: stream.Stream.prototype
  4. Third-level prototype: events.EventEmitter.prototype, the prototype that all streams inherit from
  5. Fourth-level prototype: Object.prototype, the prototype that all objects inherit from
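
If you want to check this chain yourself, here is a quick sketch (assuming the big_file.txt from earlier exists):

const fs = require('fs')
const stream = require('stream')
const { EventEmitter } = require('events')

const s = fs.createReadStream('./big_file.txt')

// Walk up the prototype chain
console.log(Object.getPrototypeOf(s) === fs.ReadStream.prototype) // true
console.log(s instanceof stream.Readable)                         // true
console.log(s instanceof stream.Stream)                           // true
console.log(s instanceof EventEmitter)                            // true
console.log(s instanceof Object)                                  // true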

Events and methods supported by Stream

With that in mind, let’s look at the events and methods that Stream supports.

Just get an impression. Look it up when you need it.

Readable Stream
  • Events: data, end, error, close, readable
  • Methods: pipe(), unpipe(), read(), …

Writable Stream
  • Events: drain (the buffer has drained, you can write again), finish (all data has been written), error, close, pipe, unpipe
  • Methods: write(), destroy(), …

Stream categories

There are four categories

  • Readable: readable only
  • Writable: writable only
  • Duplex: readable and writable (two-way)
  • Transform: readable and writable (transforms the data as it passes through)

Both Readable and Writable are unidirectional, while the other two are bidirectional.

Readable and Writable are easy to understand. What’s the difference between the other two?

A Duplex stream is readable and writable, and its read and write sides are independent of each other. A Transform stream is also both, but whatever you write comes back out transformed when you read.

Babel, for example, converts ES6 to ES5: we write ES6 in on one side and read ES5 out on the other. It’s like a car wash: a black car goes in and a white car comes out.

Readable Stream

Paused and flowing

Readable streams have two states: paused and flowing.

A readable stream can be thought of as a content producer: it sits still when it is not sending content, and flows once it starts sending again.

  • A readable stream is in the paused state by default.
  • Adding a data event listener switches it to flowing.
  • Removing the data event listener switches it back to paused.
  • Calling pause() switches it to paused.
  • Calling resume() switches it to flowing.

const http = require('http')
const fs = require('fs')
const server = http.createServer()
server.on('request', (request, response) => {
  // The default is paused
  const stream = fs.createReadStream('./big_file.txt')
  stream.pipe(response)
  stream.pause(); // pause
  setTimeout(() => {
    // resume
    stream.resume()
  }, 3000)
})
server.listen(8888);
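
If you want to observe these states directly, readable streams expose a readableFlowing property (null when no consumer is attached yet, true when flowing, false when paused). A small sketch, again assuming big_file.txt exists:

const fs = require('fs')

const stream = fs.createReadStream('./big_file.txt')
console.log(stream.readableFlowing) // null: no consumer attached yet

stream.on('data', (chunk) => { /* consume the chunks */ })
console.log(stream.readableFlowing) // true: the data listener switched it to flowing

stream.pause()
console.log(stream.readableFlowing) // false: explicitly paused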

Writable Stream

The drain event

Drain means the stream is ready for more water, that is, you can continue writing data. When we call stream.write(chunk), it may return false.

A return value of false means you are writing too fast and data is backing up.

At that point we should stop writing and listen for the drain event instead.

When the drain event triggers, we can continue writing.

It’s a little hard to understand from the description alone, so here is an example adapted from the official Node.js docs:

const fs = require('fs');

// Write data to file 1000000 times
function writeOneMillionTimes(writer, data) {
  let i = 1000000;
  write();

  function write() {
    let ok = true;
    do {
      i--;
      if (i === 0) {
        // The last write
        writer.write(data);
      } else {
        // Check whether we can keep writing.
        // If ok is false, we are writing faster than the stream can drain.
        ok = writer.write(data);
        if (ok === false) {
          console.log('No more.');
        }
      }
    } while (i > 0 && ok);

    if (i > 0) {
      // The buffer is full; once it drains we can continue writing
      writer.once('drain', () => {
        console.log('Dried up');
        write();
      });
    }
  }
}

const writer = fs.createWriteStream('./../big_file.txt');
writeOneMillionTimes(writer, 'hello world');

The finish event

After stream.end() is called and all buffered data has been flushed to the underlying system, the finish event is fired.

When we write data to a file, instead of saving it directly to the hard drive, we put it in a buffer first. Data is written to disks only when it reaches a certain size.
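
A minimal sketch of the finish event (finish_demo.txt is just a throwaway file name for illustration):

const fs = require('fs')

const out = fs.createWriteStream('./finish_demo.txt')
out.write('hello ')
out.write('world')
out.end() // no more writes after this

out.on('finish', () => {
  // Fired once all buffered data has been handed to the underlying system
  console.log('all data flushed')
})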

Create your own stream

Let’s take a look at how to create your own stream.

We will go through the four types of stream one by one.

Writable

const {Writable} = require('stream')

const outStream = new Writable({
  // What to do when someone writes to this stream
  write(chunk, encoding, callback) {
    console.log(chunk.toString())
    // Signal that this chunk has been handled
    callback()
  }
})

process.stdin.pipe(outStream);

Save the file as writable.js and run it with node. Whatever you type in gets printed right back out.

Readable

Read all the data at once

const {Readable} = require('stream')

const inStream = new Readable()

inStream.push('hello world') // Write data
inStream.push('hello node')

inStream.push(null) // There is no data left
// Pipe the readable stream into the writable stream process.stdout
inStream.pipe(process.stdout)

Push all data into inStream and pipe it into the writable stream process.stdout.

This way, when we run the file with Node, we can read all the data from inStream and print it out.

It’s simple, but it’s not efficient.

A better approach is to push on demand, and then read the data when the user wants it.

Supply data only when read() is called

This way the data is supplied on demand: we only push more when the consumer asks for it by calling read.

In the following example, we push one character at a time, starting with character code 65 (the letter A).

As the consumer reads, read() keeps being triggered, and we push more characters.

When all characters are pushed, we push null and stop the Stream.

const {Readable} = require('stream')

const inStream = new Readable({
  read(size) {
    const char = String.fromCharCode(this.currentCharCode++)
    this.push(char);
    console.log(`pushed ${char}`)
    // Stop once we pass 'Z'
    if (this.currentCharCode > 90) { // Z
      this.push(null)
    }
  }
})

inStream.currentCharCode = 65 // A

inStream.pipe(process.stdout)

Duplex Stream

Once you’ve seen both readable and writable streams, Duplex Stream is much simpler.

Implement both write and read methods.

const {Duplex} = require('stream')

const inoutStream = new Duplex({
  write(chunk, encoding, callback) {
    console.log(chunk.toString())
    callback()
  },
  read(size) {
    this.push(String.fromCharCode(this.currentCharCode++))
    if (this.currentCharCode > 90) {
      this.push(null)
    }
  }
})

inoutStream.currentCharCode = 65;

process.stdin.pipe(inoutStream).pipe(process.stdout);

Transform Stream

For a Transform stream, we implement the transform method, which combines the roles of the read and write methods.

Here’s a simple transform example that prints any character you type in uppercase:

const {Transform} = require('stream')

const upperCaseTr = new Transform({
  transform(chunk, encoding, callback) {
    // 1. Read the data: chunk.toString()
    // 2. Write the data: this.push(...)
    this.push(chunk.toString().toUpperCase())
    callback()
  }
})

// Listen for user input, run it through upperCaseTr,
// and print the converted output
process.stdin.pipe(upperCaseTr)
  .pipe(process.stdout)

Node.js built-in Transform Stream

Take, for example, an interview favorite: gzip compression.

In Node.js this takes just a few lines of code:

const fs = require('fs')
const zlib = require('zlib')
const file = process.argv[2]

fs.createReadStream(file)
  .pipe(zlib.createGzip())
  .on('data', () => process.stdout.write('.')) // print a dot as a simple progress indicator
  .pipe(fs.createWriteStream(file + '.gz'))

Stream is ubiquitous in Node.js

Readable streams:
  • HTTP response (on the client)
  • HTTP request (on the server)
  • fs read streams
  • zlib streams
  • TCP sockets
  • child process stdout & stderr
  • process.stdin
  • …

Writable streams:
  • HTTP request (on the client)
  • HTTP response (on the server)
  • fs write streams
  • zlib streams
  • TCP sockets
  • child process stdin
  • process.stdout, process.stderr
  • …

The data backlog (backpressure) problem

There is another very important issue in Stream: the data backlog.

What do we do when there is too much data and it starts to pile up?

The Node.js website has a dedicated guide, "Backpressuring in Streams", that explains how to deal with it, so check it out if you run into the problem.

I won’t repeat it here.
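
One practical note: pipe() already applies backpressure for you, and the stream.pipeline() helper (available since Node.js 10) additionally forwards errors and cleans up the streams when something fails. A minimal sketch, reusing the big_file.txt from earlier:

const fs = require('fs')
const zlib = require('zlib')
const { pipeline } = require('stream')

pipeline(
  fs.createReadStream('./big_file.txt'),
  zlib.createGzip(),
  fs.createWriteStream('./big_file.txt.gz'),
  (error) => {
    // Called once the whole chain finishes, or as soon as any stream errors
    if (error) {
      console.error('pipeline failed', error)
    } else {
      console.log('pipeline succeeded')
    }
  }
)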

Conclusion

Let’s summarize what we’ve learned so far about Stream objects.

  • Why use Stream?

    • Because when reading and writing large files, you can effectively reduce memory pressure.
  • A pipe is an important Stream concept; it connects streams together.

  • Stream objects inherit EventEmitter.

  • Stream is divided into four categories

    • Readable, with two states: paused and flowing.
    • Writable, with two important events: drain and finish.
    • Duplex: readable and writable.
    • Transform: readable and writable, transforming the data along the way.
  • Finally, how to create each of the four kinds of Stream, and where streams are built into Node.js.