What is a Stream?

  • A stream in Node.js is an abstract interface for working with streaming data. The stream module provides the basic API. Using these APIs, you can easily build objects that implement the stream interface. For example, HTTP requests and process.stdout are instances of streams.
  • Streams can be readable, writable, or both readable and writable. Note that all streams are instances of EventEmitter.
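
To see this concretely, here is a quick check (a minimal sketch, runnable as-is):

const { Readable } = require('stream');
const EventEmitter = require('events');

// every stream type ultimately inherits from EventEmitter
console.log(new Readable() instanceof EventEmitter); // true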

Types of streams

There are four basic stream types in Node.js:

  1. Readable – a readable stream (for example, fs.createReadStream()).
  2. Writable – a writable stream (for example, fs.createWriteStream()).
  3. Duplex – a readable and writable stream (for example, net.Socket).
  4. Transform – a Duplex stream that can modify and transform data as it is read and written (for example, zlib.createDeflate()).
var Stream = require('stream')
var Readable = Stream.Readable   // readable streams
var Writable = Stream.Writable   // writable streams
var Duplex = Stream.Duplex       // duplex (read-write) streams
var Transform = Stream.Transform // duplex streams in which data can be modified and transformed as it is read and written

The stream operations in Node.js are encapsulated in the stream module, which is referenced by multiple core modules. For example, the source implementations of fs.createReadStream() and fs.createWriteStream() call the abstract interfaces provided by the stream module to stream data.
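
To illustrate (a minimal sketch; './2.txt' is a placeholder path), this relationship can be verified at runtime:

const fs = require('fs');
const stream = require('stream');

const rs = fs.createReadStream('./2.txt'); // './2.txt' is a placeholder path
console.log(rs instanceof stream.Readable); // true: fs read streams build on stream.Readable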

Why Stream?

Let’s look at two examples to see why we use Stream.

Example 1:

Here is an example of reading the contents of a file:

const fs = require('fs')

fs.readFile(file, function (err, content) {
  // content is a Buffer
  console.log(content)
  console.log(content.toString())
})

But if the file is large, say 500 MB, the output from executing the code above is:

<Buffer 64 74 09 75 61 09 63 6f 75 6e 74 0a 0a 64 74 09 75 61 09 63 6f 75 6e 74 0a 32 30 31 35 31 32 30 38 09 4d 6f 7a 69 6c 6c 61 2f 35 2e 30 20 28 63 6f 6d ... >
buffer.js:382
    throw new Error('toString failed');
    ^

Error: toString failed
    at Buffer.toString (buffer.js:382:11)

The reason for the error is that the content Buffer is too long, causing the toString method to fail. As you can see, reading everything into memory at once is not suitable for large files.

Consider using streams to read file contents.

var fs = require('fs')

fs.createReadStream(bigFile).pipe(process.stdout) 

fs.createReadStream creates a readable stream that connects the source (upstream, the file) to the consumer (downstream, standard output).

When the above code is executed, the stream calls fs.read repeatedly (the ReadStream class has a _read method that internally calls fs.read to read the file), pulling the file's contents out in batches and passing them downstream.

As far as the file is concerned, its contents are read out in successive chunks.

The downstream receives a sequence of incoming chunks.

Since the downstream does not need everything at once, it can process one chunk of data and then discard it.

From the stream’s point of view, it stores only a portion of the file’s data at any one time, but the content is changing.

It’s like using a hose to get water out of a pool.

Every time a little water is used up, the hose takes a little more out of the pool.

No matter how big the pool is, it only stores the same amount of water as the pipe.
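
To see the hose at work, here is a minimal sketch (assuming a large local file; './big.file' is a placeholder path) that logs each chunk as it arrives:

const fs = require('fs');

// each 'data' event delivers at most highWaterMark bytes
const rs = fs.createReadStream('./big.file', { highWaterMark: 64 * 1024 });

let chunks = 0;
rs.on('data', (chunk) => {
  chunks++;
  console.log(`chunk ${chunks}: ${chunk.length} bytes`);
});
rs.on('end', () => console.log(`done after ${chunks} chunks`));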

Example 2:

Here is an example of watching a video online. Suppose we return the video content to the user in response to an HTTP request:

const http = require('http');
const fs = require('fs');
 
http.createServer((req, res) => {
  fs.readFile(videoPath, (err, data) => {
    res.end(data);
  });
}).listen(8080);

But there are two obvious problems with this approach:

  1. The entire video file must be read before anything is returned to the user, which takes a long time.
  2. The whole video file is held in memory at once, which puts too much pressure on memory.

With streams, we can read the video file into memory bit by bit and return it to the user bit by bit, writing out each part as soon as it is read (using the Transfer-Encoding: chunked feature of the HTTP protocol). The user experience is improved, and the memory overhead is significantly reduced.

const http = require('http');
const fs = require('fs');
 
http.createServer((req, res) => {
    fs.createReadStream(videoPath).pipe(res);
}).listen(8080);

From the two examples above, we can see that stream processing is a must when working with large amounts of data.
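
For example, copying a large file takes only a pipe; a minimal sketch (both file paths are placeholders):

const fs = require('fs');

// stream-based copy: only about highWaterMark bytes are held in memory at any time
fs.createReadStream('./big-input.file')
  .pipe(fs.createWriteStream('./big-output.file'))
  .on('finish', () => console.log('copy complete'));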

Readable Stream

Readable Streams are an abstraction of the sources that provide the data.

Common readable streams:

  • HTTP responses, on the client
  • HTTP requests, on the server
  • fs read streams
  • TCP sockets // sockets are duplex streams, both readable and writable
  • process.stdin // standard input

All Readable Streams implement the interface defined by the Stream.Readable class.
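
As a minimal sketch of that interface (the Counter class and its limit parameter are illustrative, not from the original article), here is a small custom readable stream built on stream.Readable:

const { Readable } = require('stream');

// a counter stream: _read() is called whenever the consumer wants more data
class Counter extends Readable {
  constructor(limit) { // limit is an illustrative parameter
    super();
    this.count = 0;
    this.limit = limit;
  }
  _read() {
    this.count++;
    if (this.count > this.limit) {
      this.push(null); // pushing null signals the end of the stream
    } else {
      this.push(String(this.count)); // hand the next chunk downstream
    }
  }
}

new Counter(3).on('data', (chunk) => console.log(chunk.toString())); // 1 2 3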

Two modes of readable streams (flowing and paused)

  1. In flowing mode, a readable stream automatically reads data from the underlying system and makes it available to the application as quickly as possible through events on the EventEmitter interface (all streams are instances of EventEmitter).

  2. In paused mode, the stream.read() method must be explicitly called to read a piece of data from the stream (see the sketch below).
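
For contrast with the 'data'-event style used elsewhere in this article, here is a minimal sketch of paused-mode consumption using the 'readable' event and stream.read() ('./2.txt' is a placeholder path):

const fs = require('fs');

const rs = fs.createReadStream('./2.txt'); // './2.txt' is a placeholder path

// attaching a 'readable' listener keeps the stream in paused mode;
// data must be pulled out explicitly with read()
rs.on('readable', () => {
  let chunk;
  while ((chunk = rs.read()) !== null) { // read() returns null once the internal buffer is empty
    console.log(`read ${chunk.length} bytes`);
  }
});
rs.on('end', () => console.log('no more data'));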

A newly created Readable stream starts in paused mode and does not read data by default. A readable stream that starts in paused mode can be switched to flowing mode in any of three ways:

  • Listen for the ‘data’ event
  • Call the stream.resume() method
  • Call the stream.pipe() method to send data to Writable

fs.createReadStream(path[, options]) source code implementation

// File name: ReadStream.js
let fs = require('fs'); // used to read the file
let EventEmitter = require('events');

class ReadStream extends EventEmitter {
  constructor(path, options = {}) {
    super();
    this.path = path; // path of the file to read
    this.highWaterMark = options.highWaterMark || 64 * 1024; // buffer size, 64 KB by default
    this.autoClose = options.autoClose !== undefined ? options.autoClose : true; // whether the file descriptor is closed automatically; true by default
    this.start = options.start || 0; // options can include start and end so that a range of bytes is read instead of the entire file
    this.pos = this.start; // position to read from; pos advances as the file is read
    this.end = options.end || null; // null means no end was passed
    this.encoding = options.encoding || null;
    this.flags = options.flags || 'r'; // how the file is opened

    this.flowing = null; // null initially; true = flowing mode, false = paused mode
    this.buffer = Buffer.alloc(this.highWaterMark);
    this.open(); // open the file (asynchronously)

    // {newListener: [fn]}
    // when a listener is attached for the 'data' event, switch the stream to flowing mode
    this.on('newListener', (type) => {
      if (type === 'data') {
        this.flowing = true;
        this.read(); // start reading: the consumer has listened for the data event
      }
    });
  }
  pause() { // switch the stream from flowing mode to paused mode
    this.flowing = false;
  }
  resume() { // switch the stream from paused mode to flowing mode
    this.flowing = true;
    this.read(); // after switching back to flowing mode, continue reading the file contents
  }
  read() {
    // the first time read() is called the file may still be opening (fs.open is asynchronous),
    // so the fd cannot be used directly; wait for the 'open' event, after which fd is guaranteed to exist
    if (typeof this.fd !== 'number') {
      return this.once('open', () => this.read());
    }
    // if end was passed, read only up to it (the range is inclusive); otherwise read a full highWaterMark
    let howMuchToRead = this.end ? Math.min(this.end - this.pos + 1, this.highWaterMark) : this.highWaterMark;
    fs.read(this.fd, this.buffer, 0, howMuchToRead, this.pos, (error, byteRead) => {
      this.pos += byteRead; // advance the position by the number of bytes read
      // encode what was read, or pass the raw Buffer slice if no encoding was set
      let b = this.encoding ? this.buffer.slice(0, byteRead).toString(this.encoding) : this.buffer.slice(0, byteRead);
      this.emit('data', b); // emit the data event to hand the chunk to the consumer
      if ((byteRead === this.highWaterMark) && this.flowing) {
        return this.read(); // still flowing and there may be more data: keep reading
      }
      if (byteRead < this.highWaterMark) {
        // there is nothing more to read
        this.emit('end');
        this.destroy();
      }
    });
  }
  destroy() {
    if (typeof this.fd !== 'number') {
      return this.emit('close');
    }
    fs.close(this.fd, () => {
      // the file was open: close it and emit the close event
      this.emit('close');
    });
  }
  open() {
    fs.open(this.path, this.flags, (err, fd) => {
      // fd is the file descriptor identifying this.path; fds start at 3 (a number)
      if (err) {
        if (this.autoClose) {
          this.destroy(); // destroy (close the file, emit the close event)
        }
        this.emit('error', err); // emit an error event if there is an error
        return;
      }
      this.fd = fd; // save the file descriptor
      this.emit('open', this.fd); // the file is open
    });
  }
  pipe(dest) { // connect this readable stream (upstream) to a writable stream (downstream)
    this.on('data', (data) => {
      let flag = dest.write(data);
      if (!flag) {
        // flag is the value returned by each dest.write(); false means the write cache is full
        this.pause();
      }
    });
    dest.on('drain', () => {
      // the write cache has been flushed; resume reading
      this.resume();
    });
  }
}
module.exports = ReadStream; // export the readable stream

Using fs.createReadStream()

// Stream: ordered and directional, and the rate can be controlled
// Read: from a file into memory; Write: from memory into a file
// Reads 64K at a time by default; without an encoding, reads produce Buffers
// let fs = require('fs');
// let rs = fs.createReadStream({...});
// use our own implementation of a readable stream instead
let ReadStream = require('./ReadStream');
let rs = new ReadStream('./2.txt', {
  highWaterMark: 3, // bytes
  flags: 'r', // read the file
  autoClose: true, // close the file descriptor automatically
  start: 0,
  // end: 3, // the stream range is a closed interval: both start and end are included
  encoding: 'utf8'
});
// to receive data, listen for the data event; the data then flows out automatically
rs.on('error', function (err) {
  // typically occurs when the underlying system cannot produce data,
  // or when the stream implementation tries to pass bad data
  console.log(err);
});
rs.on('open', function () {
  // the file is open and fd has been retrieved
  console.log('File open');
});
rs.on('data', function (data) {
  console.log(data);
  rs.pause(); // pause the 'data' events, switching from flowing mode back to paused mode
});
setTimeout(() => { rs.resume() }, 3000); // switch back to flowing mode after 3 seconds
rs.on('end', function () {
  console.log('Read complete');
});
rs.on('close', function () {
  // emitted after the stream or its underlying resource (such as a file) is closed;
  // after close, no more events will be emitted
  // console.log('closed');
});

Writable Stream

A writable stream is an abstraction of the device that data flows to. It is used to consume the upstream data; through a writable stream, a program can write data to the device.

Common writable streams:

  • HTTP requests, on the client
  • HTTP responses, on the server
  • fs write streams
  • zlib streams
  • crypto streams
  • TCP sockets
  • child process stdin
  • process.stdout, process.stderr

All Writable streams implement the interface defined by the stream.Writable class.
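
As a minimal sketch of that interface (the logger name is illustrative), here is a small custom writable stream built on stream.Writable:

const { Writable } = require('stream');

// a logger stream: the write option consumes each chunk and calls back when done
const logger = new Writable({
  write(chunk, encoding, callback) {
    console.log('consumed:', chunk.toString());
    callback(); // tell the stream this chunk has been handled
  }
});

logger.write('hello');
logger.end('world'); // end() writes a final chunk and closes the stream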

Use of writable streams

You can write data to a writable stream by calling the write() method of a writable stream instance

const fs = require('fs');
const rs = fs.createReadStream(sourcePath);
const ws = fs.createWriteStream(destPath);
 
rs.setEncoding('utf-8'); // set the encoding
rs.on('data', chunk => {
  ws.write(chunk); // write data
});

Listening for the readable stream's data event puts the readable stream into flowing mode. We call the writable stream's write() method in the event callback, so that the data is written to destPath, the device the writable stream abstracts.

The write() method takes three arguments

  • chunk {String | Buffer} — the data to write
  • encoding — the encoding used when the data to write is a string
  • callback — a callback invoked after the data has been written
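
A minimal sketch passing all three arguments ('./out.txt' is a placeholder path):

const fs = require('fs');

const ws = fs.createWriteStream('./out.txt'); // './out.txt' is a placeholder path

// chunk, encoding, callback
ws.write('hello stream\n', 'utf8', () => {
  console.log('chunk handed off to the underlying file');
});
ws.end(); // signal that no more data will be written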

The drain event

If the stream.write(chunk) method returns false, the current cache is full. The stream will emit the drain event at the appropriate time (after the cache has been flushed).

const fs = require('fs');
const rs = fs.createReadStream(sourcePath);
const ws = fs.createWriteStream(destPath);
 
rs.setEncoding('utf-8'); // set the encoding
rs.on('data', chunk => {
  let flag = ws.write(chunk); // write data
  if (!flag) {
    // the cache is full: pause reading
    rs.pause();
  }
});
ws.on('drain', () => {
  rs.resume(); // the cache has been flushed: continue reading and writing
});

fs.createWriteStream(path[, options]) source code implementation

// File name: WriteStream.js
let fs = require('fs');
let EventEmitter = require('events');
class WriteStream extends EventEmitter {
  constructor(path, options = {}) {
    super();
    this.path = path;
    this.flags = options.flags || 'w';
    this.encoding = options.encoding || 'utf8';
    this.start = options.start || 0;
    this.pos = this.start;
    this.mode = options.mode || 0o666;
    this.autoClose = options.autoClose !== undefined ? options.autoClose : true; // true by default
    this.highWaterMark = options.highWaterMark || 16 * 1024;
    this.open();

    this.writing = false; // whether a write is currently in progress
    this.cache = []; // cache for chunks that arrive while a write is in progress
    this.len = 0; // tracks the length of the cached data
    this.needDrain = false; // whether the drain event needs to be emitted
  }
  clearBuffer() {
    let buffer = this.cache.shift();
    if (buffer) {
      // there is cached data: write it, then keep draining the cache
      this._write(buffer.chunk, buffer.encoding, () => this.clearBuffer());
    } else {
      // the cache is empty
      if (this.needDrain) {
        // the drain event needs to be emitted
        this.writing = false;
        this.needDrain = false;
        this.emit('drain');
      }
    }
  }
  _write(chunk, encoding, clearBuffer) {
    // write() is called synchronously, but the file is opened asynchronously,
    // so the fd may not exist yet; wait for the 'open' event before writing
    if (typeof this.fd !== 'number') {
      return this.once('open', () => this._write(chunk, encoding, clearBuffer));
    }
    fs.write(this.fd, chunk, 0, chunk.length, this.pos, (err, byteWritten) => {
      this.pos += byteWritten;
      this.len -= byteWritten; // reduce the cached length after each write
      clearBuffer(); // write the next cached chunk
    });
  }
  write(chunk, encoding = this.encoding) {
    // normalize the chunk to a Buffer
    chunk = Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk, encoding);
    this.len += chunk.length; // maintain the cache length
    let ret = this.len < this.highWaterMark;
    if (!ret) {
      this.needDrain = true; // the drain event needs to be emitted later
    }
    if (this.writing) {
      // a write is already in progress: cache this chunk
      this.cache.push({ chunk, encoding });
    } else {
      // first write: write directly to the file
      this.writing = true;
      this._write(chunk, encoding, () => this.clearBuffer());
    }
    return ret; // whether the caller may keep writing; false means further writes will pile up in memory
  }
  destroy() {
    if (typeof this.fd !== 'number') {
      this.emit('close');
    } else {
      fs.close(this.fd, () => {
        this.emit('close');
      });
    }
  }
  open() {
    fs.open(this.path, this.flags, this.mode, (err, fd) => {
      if (err) {
        this.emit('error', err);
        if (this.autoClose) {
          this.destroy(); // destroy the file descriptor if autoClose is set
        }
        return;
      }
      this.fd = fd;
      this.emit('open', this.fd);
    });
  }
}
module.exports = WriteStream;

Using fs.createWriteStream()

// 1. The first write goes straight to the file; subsequent writes are put into the cache
// 2. write() returns a Boolean: false once the highWaterMark has been reached
// 3. When the cached data has been written out, a drain event is emitted
// let fs = require('fs');
// let ws = fs.createWriteStream({...});
// use our own implementation of a writable stream instead
let WS = require('./WriteStream');
let ws = new WS('./2.txt', {
  flags: 'w', // write to the file
  highWaterMark: 1, // bytes
  encoding: 'utf8',
  start: 0,
  autoClose: true, // close the file descriptor automatically
  mode: 0o666, // readable and writable
});
// drain fires only after the highWaterMark has been reached and then flushed
let i = 9;
function write() {
  let flag = true;
  while (flag && i >= 0) {
    i--;
    flag = ws.write('111'); // 987 // 654 // 321 // 0
    console.log(flag)
  }
}
write();
ws.on('drain', function () {
  console.log('dry');
  write();
});

Conclusion

Streams are divided into readable streams (with flowing mode and paused mode), writable streams, and read-write streams, and Node.js provides a variety of stream objects. For example, HTTP requests and process.stdout are instances of streams. The stream module provides the basic API; using these APIs, you can easily build objects that implement the stream interface. All of these stream objects call and encapsulate the stream module under the hood.