1. The concept of streams

  • A stream is an ordered sequence of bytes transferred from a starting point to an ending point
  • It does not care about the overall content of the file, only about whether data has been read from it and what happens to that data once it is read
  • Stream is an abstract interface implemented by many objects in Node. For example, the request and response objects of an HTTP server are streams.

2. Four basic stream types

  1. Readable: a readable stream (e.g. fs.createReadStream(), an HTTP request)
  2. Writable: a writable stream (e.g. fs.createWriteStream(), an HTTP response)
  3. Duplex: a stream that is both readable and writable (e.g. net.Socket)
  4. Transform: a Duplex stream that modifies or transforms data as it is written and read (e.g. zlib.createGzip())
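
The relationship between the four types can be checked directly on stream objects. The following is a minimal sketch; streamKind is a hypothetical helper written for this illustration, not part of Node.

```javascript
const { Readable, Writable, Duplex, Transform } = require("stream");
const net = require("net");
const zlib = require("zlib");

// Hypothetical helper: name the most specific of the four basic types.
// Transform extends Duplex, and a Duplex also passes the Readable and
// Writable checks, so the order of the tests matters.
function streamKind(stream) {
    if (stream instanceof Transform) return "Transform";
    if (stream instanceof Duplex) return "Duplex";
    if (stream instanceof Readable) return "Readable";
    if (stream instanceof Writable) return "Writable";
    return "not a stream";
}

const examples = {
    readable: Readable.from(["a", "b"]),
    writable: new Writable({ write(chunk, encoding, callback) { callback(); } }),
    socket: new net.Socket(),
    gzip: zlib.createGzip()
};

for (const [name, stream] of Object.entries(examples)) {
    console.log(name, "->", streamKind(stream));
}
```

The socket is reported as Duplex and the gzip stream as Transform, which matches the examples in the list.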

3. Readable stream: fs.createReadStream

A readable stream reads a file from a specified position through a buffer: it reads a fixed number of bytes into the buffer, hands them on, and repeats until the whole file has been read.
  • fs.createReadStream takes the following parameters, which control how the file is read chunk by chunk

  • path: path of the file to read (required)

  • options: an object of settings (optional)

    1. flags: file flag; defaults to r (open for reading)
    2. encoding: defaults to null, so data is delivered as Buffers
    3. mode: file permission; defaults to 0o666 (readable and writable)
    4. start: position at which reading starts; defaults to 0
    5. end: position at which reading ends (inclusive); by default the file is read to the end
    6. highWaterMark: number of bytes read from the file at a time; defaults to 64 * 1024
    7. autoClose: whether the file is closed automatically; defaults to true
  • By default, a readable stream is created in non-flowing mode and reads no data; we receive data by subscribing to events. Before reading, the stream opens the file internally with fs.open and emits an open event. Errors during file operations are reported through the error event. Finally, the end event is emitted when the file has been read completely

  • To pause reading, call pause(), which switches the stream from flowing mode to non-flowing mode; resume() switches it back from non-flowing mode to flowing mode

  • The file 1.txt contains xx0123456789

  • We want to read the consecutive digits 0 to 9 from 1.txt through a readable stream

The code is as follows:

let fs = require("fs");

// Create a readable stream; it is in non-flowing mode by default
let rs = fs.createReadStream("./1.txt", {
    flags: "r",
    encoding: "utf8",
    mode: 0o666,
    start: 2,           // skip the leading "xx"
    end: 11,            // inclusive
    highWaterMark: 10,  // number of bytes read at a time
    autoClose: true
});
// Nothing is read until a data listener is attached
rs.on("open", () => {
    console.log("open");
});
rs.on("data", data => {    // Switches from non-flowing mode to flowing mode
    console.log(data);
    // rs.pause();  // pause reading
    // rs.resume(); // resume reading
});
rs.on("end", () => {       // Runs when the file has been read completely
    console.log("end");
});
rs.on("error", err => {    // Error handling
    console.log(err);
});
rs.on("close", () => {     // Runs when the file has been closed
    console.log("close");
});

Console output:

open
0123456789      // highWaterMark determines how many bytes are read at a time
end
close
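
The switch between non-flowing and flowing mode can be observed through the stream's readableFlowing property. The sketch below uses an in-memory stream created with Readable.from as a stand-in for the file stream, which is an assumption made to keep the example self-contained.

```javascript
const { Readable } = require("stream");

// An in-memory stream stands in for the file stream here.
const rs = Readable.from(["0123", "456789"]);
const states = [];

states.push(rs.readableFlowing);   // before any listener is attached
rs.on("data", () => {});
states.push(rs.readableFlowing);   // after on("data")
rs.pause();
states.push(rs.readableFlowing);   // after pause()
rs.resume();
states.push(rs.readableFlowing);   // after resume()

console.log(states);
```

The recorded states are null, true, false and true: Node uses null for a stream that has no consumer yet, true for flowing mode and false for a paused stream.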

How a readable stream is implemented

// A simplified implementation of a readable file stream
let EventEmitter = require("events");
let fs = require("fs");

// Inherit on, once, emit, ... from EventEmitter
class ReadStream extends EventEmitter {
    constructor(path, options = {}) {
        super();
        this.path = path;
        this.flags = options.flags || "r";
        // Encoding defaults to null (data is emitted as Buffers)
        this.encoding = options.encoding || null;
        this.mode = options.mode || 0o666;
        this.start = options.start || 0;
        this.end = options.end || null; // null means no limit
        this.highWaterMark = options.highWaterMark || 64 * 1024;
        // `options.autoClose || true` would always be true, so test for false explicitly
        this.autoClose = options.autoClose !== false;

        // Reuse the same memory for every read for better performance
        this.buffer = Buffer.alloc(this.highWaterMark);
        // Current read offset; starts at start
        this.pos = this.start;
        // Whether the stream is in flowing mode; false by default
        this.flowing = false;

        this.open(); // Open the file
        // newListener fires every time a listener is added with on()
        this.on("newListener", type => {
            if (type === "data") {      // on("data") was called
                this.flowing = true;    // Switch to flowing mode
                this.read();            // Read the file and emit "data"
            }
        });
    }
    // Open the file to get the fd
    open() {
        fs.open(this.path, this.flags, (err, fd) => {
            // Error opening the file
            if (err) {
                this.emit("error", err); // Emit the error event
                if (this.autoClose) {
                    this.destroy();
                }
                return;
            }
            this.fd = fd;               // Save the file descriptor
            this.emit("open", this.fd); // Emit the open event
        });
    }
    // Close the file
    destroy() {
        // Only close the file if it was opened
        if (typeof this.fd === "number") {
            fs.close(this.fd, () => {
                this.emit("close");
            });
        } else {
            this.emit("close");
        }
    }
    // Read the contents of the file
    read() {
        // open() is asynchronous, but on("data") calls read() synchronously,
        // so wait for the open event if the fd is not available yet
        if (typeof this.fd !== "number") {
            return this.once("open", () => this.read());
        }
        // Bytes left to read when end is set: this.end + 1 - this.pos
        // Read highWaterMark bytes if at least that many remain, otherwise read what remains
        let actualReadNumber = this.end ? Math.min(this.highWaterMark, this.end + 1 - this.pos) : this.highWaterMark;
        fs.read(this.fd, this.buffer, 0, actualReadNumber, this.pos, (err, bytesRead) => {
            if (err) {
                this.emit("error", err);
                if (this.autoClose) {
                    this.destroy();
                }
                return;
            }
            if (bytesRead > 0) {
                this.pos += bytesRead;  // Update the read offset
                // Take only the bytes that were actually read (a view of the shared buffer)
                let r = this.buffer.slice(0, bytesRead);
                r = this.encoding ? r.toString(this.encoding) : r;
                this.emit("data", r);   // Hand the data to the listeners
                // Keep reading if the stream is still in flowing mode
                if (this.flowing) {
                    this.read();
                }
            } else {
                // Nothing left: emit end
                this.emit("end");
                if (this.autoClose) {
                    this.destroy(); // Close the file
                }
            }
        });
    }
    // Pause reading data
    pause() {
        this.flowing = false;
    }
    // Resume reading data
    resume() {
        this.flowing = true;
        this.read();
    }
}
module.exports = ReadStream;
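
The size calculation inside read() is the part that is easiest to get wrong, because end is inclusive. The following sketch isolates it in a hypothetical helper named bytesToRead. It treats only null as "no limit", which is an assumption that differs slightly from the truthiness test used in the class.

```javascript
// Hypothetical helper mirroring the size calculation in read():
// `end` is inclusive, and null means "read to the end of the file".
function bytesToRead(pos, end, highWaterMark) {
    if (end === null) return highWaterMark;
    const remaining = end + 1 - pos;
    return Math.max(0, Math.min(highWaterMark, remaining));
}

console.log(bytesToRead(2, 11, 10));   // 10: first read in the 1.txt example
console.log(bytesToRead(12, 11, 10));  // 0: nothing is left after that read
console.log(bytesToRead(10, 11, 10));  // 2: only bytes 10 and 11 remain
```

With start: 2, end: 11 and highWaterMark: 10, a single read of 10 bytes covers the whole range, which is why the earlier example prints 0123456789 in one data event.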

4. Writable stream: fs.createWriteStream

  • The parameters of a writable stream are similar to those of a readable stream

  • Differences between writable and readable stream parameters

    1. flags defaults to r for readable streams and to w for writable streams
    2. encoding defaults to null (Buffer) for readable streams and to utf8 for writable streams
    3. highWaterMark defaults to 64 KB for readable streams and to 16 KB for writable streams
    4. Readable streams deliver data through the data event; writable streams write data to the file with the write method
  • Data is written with the write method. The first write goes directly to the file, and writes made while it is in progress are stored in a cache. After each write finishes, the next item is taken from the cache and written, until the cache is empty.

  • The drain event lets us coordinate reading and writing: combine the readable stream's data event with the writable stream's drain event.

  • The internal implementation of writable streams

    1. Because operating on the same file at the same time causes conflicts, only the first call to write actually writes to the file; later calls go to the cache. The first item in the cache is then taken and written, and so on until the cache is empty
    2. On every write, the length of the content not yet written is compared with highWaterMark: write returns true if it is smaller than highWaterMark, otherwise false
    3. The drain event is emitted after the pending content has been written, but only if its length had reached highWaterMark
    4. Each time content has been written to the file, the next item in the cache is written, until the cache is cleared
    5. end: writes its own final chunk last; it does not trigger drain, no more writes are allowed after it, and it triggers close
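
The return value of write and the timing of drain can be observed without touching a file. The sketch below assumes an in-memory Writable from the stream module with a deliberately slow destination; it is an illustration of the rules above, not the fs implementation.

```javascript
const { Writable } = require("stream");

const written = [];
const ws = new Writable({
    highWaterMark: 3,
    write(chunk, encoding, callback) {
        written.push(chunk.toString());
        setImmediate(callback);   // simulate a slow, asynchronous destination
    }
});

// Each call adds one pending byte; the third reaches highWaterMark.
const results = ["9", "8", "7"].map(digit => ws.write(digit));
console.log(results);

ws.on("drain", () => {
    console.log("drain after writing", written.join(""));
});
```

The first two calls return true and the third returns false, because three pending bytes are no longer below the highWaterMark of 3. drain fires once all three bytes have been handed to the destination.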

Write the digits 9 down to 1 to a file in three batches of 3 bytes. The drain event is emitted after each batch has been written, and we use it to control writing

let fs = require("fs");

// Use drain to control writing
let ws = fs.createWriteStream("2.txt", {
    highWaterMark: 3        // write() returns false once 3 bytes are pending
});

let i = 9;  // Write 9 bytes to the file
function write(){
    let flag = true;    // Whether more data can be written
    while(i > 0 && flag){
        flag = ws.write(i-- + "");    // Write one byte at a time
    }
}
write();
// drain is emitted after the pending bytes (here highWaterMark = 3) have been written
ws.on("drain", () => {
    console.log("dry");
    write();
});

Output (the digits go to 2.txt, "dry" is printed on the console):

// 9 8 7
dry
// 6 5 4
dry
// 3 2 1
dry

How a writable stream is implemented

let EventEmitter = require("events");
let fs = require("fs");
let iconv = require("iconv-lite");  // encode / decode (third-party package)

class WriteStream extends EventEmitter {
    constructor(path, options = {}) {
        super();
        this.path = path;
        this.flags = options.flags || "w";            // Default w
        this.encoding = options.encoding || "utf8";   // Default utf8
        this.mode = options.mode || 0o666;            // Default 0o666: readable and writable
        // `options.autoClose || true` would always be true, so test for false explicitly
        this.autoClose = options.autoClose !== false; // Close automatically by default
        this.start = options.start || 0;              // Write from position 0 by default
        this.highWaterMark = options.highWaterMark || 16 * 1024; // Default 16 KB

        // File offset of the next write
        this.pos = this.start;
        // Number of bytes accepted but not yet written to the file
        this.len = 0;
        // Whether a write to the file is in progress; false by default
        this.writing = false;
        // Writes that arrive while the file is being written are queued here
        this.cache = [];
        // drain is only emitted if the pending length has reached highWaterMark
        this.needDrain = false;
        this.needEnd = false;   // Holds the final chunk once end() has been called

        this.open();    // Get the fd
    }
    destroy() {  // Close the file
        if (typeof this.fd === "number") {
            fs.close(this.fd, () => {
                this.emit("close");
            });
        } else {
            this.emit("close");
        }
    }
    open() { // Open the file to get the fd (file descriptor)
        fs.open(this.path, this.flags, this.mode, (err, fd) => {
            // There was an error opening the file
            if (err) {
                this.emit("error", err); // Emit the error event
                if (this.autoClose) {
                    this.destroy();
                }
                return;
            }
            this.fd = fd;
            this.emit("open", fd);   // Emit the open event
        });
    }
    /**
     * 1. Compare the pending length with highWaterMark to decide whether drain must be emitted
     * 2. The first write goes to the file; writes made in the meantime go to the cache
     * @param {string|Buffer} chunk only strings and Buffers can be written
     * @param {string} encoding encoding of the chunk, utf8 by default
     * @param {Function} callback called after the chunk has been written
     * @returns {boolean} true if the pending length is smaller than highWaterMark, otherwise false
     */
    write(chunk, encoding = "utf8", callback = () => {}) {
        if (this.needEnd) {   // No more writing after end()
            throw new Error("write after end");
        }
        // 1. Compare the pending length with highWaterMark
        // Convert strings to a Buffer first so that lengths are counted in bytes
        chunk = Buffer.isBuffer(chunk) ? chunk : iconv.encode(chunk, encoding);
        this.len += chunk.length;  // Number of bytes that have not been written yet
        if (this.len >= this.highWaterMark) {
            this.needDrain = true;  // drain must be emitted later. Step 3 *****
        }

        // 2. The first write goes to the file, later ones to the cache
        if (this.writing) {
            // Queue the chunk while the file is being written. Step 1 *****
            this.cache.push({
                chunk,
                encoding,
                callback
            });
        } else {  // Write to the file
            // Until this flag is reset, later writes go to the cache
            this.writing = true;
            this._write(chunk, encoding, () => {
                callback();
                // After the first write, take the next item from the cache and continue
                this.clearBuffer();
            });
        }
        // Return false once the pending length has reached highWaterMark
        return this.len < this.highWaterMark;       // Step 2 *****
    }
    clearBuffer() {  // Write the next item in the cache
        // Take the first item out of the cache
        let obj = this.cache.shift();
        if (obj) {
            this._write(obj.chunk, obj.encoding, () => {
                obj.callback();
                this.clearBuffer(); // Continue with the rest of the cache
            });
        } else {  // Everything in the cache has been written
            this.writing = false;   // The next write() goes straight to the file again
            if (this.needEnd) {
                // end() was called: write its chunk instead of emitting drain
                this._writeEnd(this.needEnd);
            } else if (this.needDrain) {
                this.needDrain = false;
                this.emit("drain");     // Emit the drain event
            }
        }
    }
    // The method that actually writes to the file
    _write(chunk, encoding, callback) {
        // open() is asynchronous, but write() may be called synchronously,
        // so wait for the open event if the fd is not available yet
        if (typeof this.fd !== "number") {
            return this.once("open", () => {
                this._write(chunk, encoding, callback);
            });
        }
        /**
         * fd           file descriptor (usually 3 or higher)
         * chunk        Buffer to write
         * 0            offset in the Buffer to start from
         * chunk.length number of bytes to write
         * pos          file position to write at
         * bytesWrite   number of bytes actually written
         */
        fs.write(this.fd, chunk, 0, chunk.length, this.pos, (err, bytesWrite) => {
            if (err) {
                this.emit("error", err);
                if (this.autoClose) {
                    this.destroy();
                }
                return;
            }
            this.pos += bytesWrite;  // Move the file offset
            this.len -= bytesWrite;  // Reduce the pending length
            callback(); // Continue with the cache. Step 4 *****
        });
    }
    // Write the final chunk and close the file
    end(chunk, encoding = "utf8", callback = () => {}) {
        // end() may be called without a chunk
        if (chunk === undefined || chunk === null) {
            chunk = Buffer.alloc(0);
        } else {
            chunk = Buffer.isBuffer(chunk) ? chunk : iconv.encode(chunk, encoding);
        }
        this.len += chunk.length;
        this.needEnd = {
            chunk,
            encoding,
            callback
        };
        if (!this.writing) {  // No write in progress: write the final chunk right away
            this._writeEnd(this.needEnd);
        }
    }
    _writeEnd(obj) { // Write the final chunk to the file and close the file
        this._write(obj.chunk, obj.encoding, () => {
            obj.callback();         // Run the end callback
            if (this.autoClose) {
                this.destroy();     // Close the file, which emits close
            }
        });
    }
}
module.exports = WriteStream;

5. Pipe: readable.pipe

A pipe reads a little from file 1 through a readable stream and then writes it to file 2 through a writable stream, so reading and writing proceed a little at a time.

Here is how pipe is used with the fs module in Node:

let fs = require('fs');
let rs = fs.createReadStream('./1.txt', {
    highWaterMark: 3
});
let ws = fs.createWriteStream('./2.txt', {
    highWaterMark: 3
});
rs.pipe(ws); // Controls the rate (to avoid flooding the available memory)

  • pipe regulates the data rate using the highWaterMark of both the readable stream and the writable stream. It listens for rs.on('data') and passes each chunk that was read to the ws.write method
  • ws.write returns a Boolean. true: the writable stream is not saturated yet; false: the pending bytes have reached its highWaterMark
  • If false is returned, rs.pause() is called to pause reading
  • Once the writable stream has written its pending data, reading is resumed in the on('drain') handler

pipe is implemented as follows:

// Add a pipe method to the readable stream prototype
// It reads a little and writes a little
pipe(dest){
    this.on("data", data => {
        // write returns false once the writable stream's pending data has reached
        // its highWaterMark; pause reading until that data has been written
        let flag = dest.write(data);
        if(!flag){
            this.pause();   // pause
        }
    });
    // Resume reading after the pending data has been written
    dest.on("drain", () => {
        this.resume();      // resume
    });
}
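
To see the built-in pipe cope with a slow destination, the sketch below pipes an in-memory source into an in-memory sink. makeSource, makeSink and copy are hypothetical helpers written for this illustration; both streams use a small highWaterMark, as in the file example.

```javascript
const { Readable, Writable } = require("stream");

// Hypothetical in-memory source: emits `text` in chunks of `size` bytes.
function makeSource(text, size) {
    const data = Buffer.from(text);
    let offset = 0;
    return new Readable({
        highWaterMark: size,
        read() {
            if (offset >= data.length) {
                this.push(null);            // no more data
                return;
            }
            this.push(data.subarray(offset, offset + size));
            offset += size;
        }
    });
}

// Hypothetical slow sink: records each chunk and acknowledges it later.
function makeSink(size, chunks) {
    return new Writable({
        highWaterMark: size,
        write(chunk, encoding, callback) {
            chunks.push(chunk.toString());
            setImmediate(callback);
        }
    });
}

// Pipe the source into the sink and report the chunks once the sink has finished.
function copy(text, size, done) {
    const chunks = [];
    const sink = makeSink(size, chunks);
    sink.on("finish", () => done(chunks));
    makeSource(text, size).pipe(sink);
}

copy("0123456789", 3, chunks => console.log(chunks.join("")));
```

Although the sink acknowledges every chunk late, nothing is lost or reordered: pipe pauses the source whenever write returns false and resumes it on drain.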

6. Duplex streams (e.g. the net module's Socket)

A duplex stream implements both the readable and the writable interface on the same object, as if it inherited from both. Importantly, the readable and writable sides of a duplex stream are completely independent of each other; a duplex stream simply combines the two features in one object.

// All streams are based on the stream module
let {Duplex} = require("stream");

// Extend Duplex to get a stream that is both readable and writable
class MyStream extends Duplex {
    constructor() {
        super();
    }
    _read() {   // plays the role of the _read method in the Readable source code
        this.push("1");     // Emits the data event with this data
        this.push(null);    // Signals the end of reading; without it _read is called again and again
    }
    // The first write is handled immediately; the rest wait in the cache
    _write(chunk, encoding, callback) {  // plays the role of the _write method in the Writable source code
        // chunk arrives as a Buffer by default, so convert it for printing
        console.log(chunk.toString());
        callback();         // Moves on to the next cached write; without it, _write runs only once
    }
}
let myStream = new MyStream();
myStream.write("ok1");
myStream.write("ok2");  // Only reaches _write after the callback of the previous write has run
myStream.on("data", data => {
    console.log(data.toString());
});

Output:

ok1
ok2
1

If _read does not call this.push(null) and _write does not call callback, the output is:

ok1
1
1
1
1
...and so on without end
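
The independence of the two sides can be made explicit by recording what each side handles. This is a minimal sketch; makeDuplex is a hypothetical helper, and the fixed outgoing data is an assumption chosen for the illustration.

```javascript
const { Duplex } = require("stream");

// Hypothetical duplex: the readable side emits fixed data,
// the writable side only records what it receives.
function makeDuplex(received) {
    const outgoing = ["a", "b"];
    return new Duplex({
        read() {
            this.push(outgoing.length > 0 ? outgoing.shift() : null);
        },
        write(chunk, encoding, callback) {
            received.push(chunk.toString());
            callback();
        }
    });
}

const received = [];
const emitted = [];
const duplex = makeDuplex(received);

duplex.write("x");
duplex.write("y");
duplex.on("data", chunk => emitted.push(chunk.toString()));
duplex.on("end", () => console.log({ received, emitted }));
```

What was written (x, y) never shows up on the readable side, and what is read (a, b) was never written: the two halves share an object but not their data.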

7. Transform streams (e.g. zlib.createGzip)

  • The output of a transform stream is computed from its input
  • For a transform stream we do not have to implement the read or write methods; we only implement a transform method that combines the two. It plays the role of the write method, and we can also use it to push data.
  • A transform stream performs a conversion between a readable stream and a writable stream
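
let {Transform} = require("stream");

class MyStream extends Transform {
    constructor() {
        super();
    }
    // Convert letters to upper case
    _transform(chunk, encoding, callback) {  // receives data like the write method of a writable stream
        this.push(chunk.toString().toUpperCase());  // hand the result to the readable side
        callback();
    }
}
let myStream = new MyStream();
// Assumed wiring (not present in the source text): connect console input and output
process.stdin.pipe(myStream).pipe(process.stdout);

Typing abc in the console outputs ABC.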
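
Since zlib.createGzip is the built-in example of a transform stream, here is a sketch of a compress-then-decompress round trip. gzipRoundTrip is a hypothetical helper; it relies on stream.pipeline to connect the stages and report errors, which is a choice made for this illustration.

```javascript
const { Readable, Writable, pipeline } = require("stream");
const zlib = require("zlib");

// Hypothetical helper: gzip `text`, gunzip it again, and return the result.
function gzipRoundTrip(text, done) {
    const parts = [];
    const collector = new Writable({
        write(chunk, encoding, callback) {
            parts.push(chunk);   // collect the decompressed bytes
            callback();
        }
    });
    pipeline(
        Readable.from([text]),   // source
        zlib.createGzip(),       // transform: compress
        zlib.createGunzip(),     // transform: decompress
        collector,               // destination
        err => done(err, Buffer.concat(parts).toString())
    );
}

gzipRoundTrip("abc", (err, result) => {
    if (err) throw err;
    console.log(result);
});
```

Each transform stage consumes what the previous stage produces, so the text that comes out of the second stage equals the text that went into the first.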

8. Readable mode

In addition to the non-flowing and flowing modes, there is also a readable mode

Characteristics of readable mode

    1. By default, once a readable listener is attached, the stream fills its internal buffer (the "cup") with up to highWaterMark bytes and then runs the callback
    2. You read the data yourself with read(); when the cup is empty, the stream refills it with another highWaterMark bytes, until nothing is left
    3. highWaterMark is the size of the cup; if the cup is not full after a read, the stream adds another highWaterMark bytes to it

Implement a line reader in readable mode that reads data line by line
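
The basic loop of readable mode is to call read() until it returns null and then wait for the next readable event. The sketch below wraps that loop in a hypothetical helper named readAll and feeds it an in-memory stream, which is an assumption made to keep it self-contained.

```javascript
const { Readable } = require("stream");

// Hypothetical helper: drain a stream in readable mode and hand back the text.
function readAll(stream, done) {
    const chunks = [];
    stream.on("readable", () => {
        let chunk;
        // read() returns null once the internal buffer is empty
        while ((chunk = stream.read()) !== null) {
            chunks.push(chunk);
        }
    });
    stream.on("end", () => done(Buffer.concat(chunks).toString()));
}

const source = Readable.from([Buffer.from("0123456789\n"), Buffer.from("9876543210")]);
readAll(source, text => console.log(text));
```

Unlike the data event, nothing is delivered until read() is called, so the consumer decides when, and with read(n) how much, to take from the cup.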

let fs = require("fs");
let EventEmitter = require("events");   // Event emitter

// A hand-written line reader: emits a line whenever it meets a line break
class LineReader extends EventEmitter {
    constructor(path) {
        super();
        this.rs = fs.createReadStream(path);    // Readable stream
        const RETURN = 13;  // \r
        const LINE = 10;    // \n
        let arr = [];       // Bytes read so far for the current line
        let afterReturn = false;    // Whether the previous byte was \r
        // Listen for readable events to use readable mode
        // We read from the cup ourselves; when it is empty, the stream refills it
        // with another highWaterMark bytes until nothing is left
        this.rs.on("readable", () => {
            let char;
            // this.rs.read(1) returns a one-byte Buffer, or null when the cup is empty
            // char[0] is that byte as a number
            while ((char = this.rs.read(1)) !== null) {
                switch (char[0]) {
                    case RETURN:    // \r ends a line (alone or as part of \r\n)
                        this.emit("newLine", Buffer.concat(arr).toString());
                        arr = [];
                        afterReturn = true;
                        break;
                    case LINE:      // On macOS and Linux there is no \r, only \n
                        // A \n directly after \r belongs to the same line break
                        if (!afterReturn) {
                            this.emit("newLine", Buffer.concat(arr).toString());
                            arr = [];
                        }
                        afterReturn = false;
                        break;
                    default:
                        arr.push(char);
                        afterReturn = false;
                        break;
                }
            }
        });
        this.rs.on("end", () => {
            // Emit the last piece of data, if any
            if (arr.length > 0) {
                this.emit("newLine", Buffer.concat(arr).toString());
            }
        });
    }
}
// 1.txt contains two lines of data
// 0123456789
// 9876543210
let line = new LineReader("./1.txt");

line.on("newLine", data => {
    console.log(data);
});

Output:

0123456789
9876543210

Using streams more often in real development will deepen your understanding of how they work and what they are for.