The concept of streams

  • A stream is an ordered transfer of byte data with a defined start point and end point
  • A stream does not care about the overall content of the file, only whether data has been read from it and what happens to that data once it has been read
  • A stream is an abstract interface implemented by many objects in Node.js. For example, the HTTP server's request and response objects are streams.

There are four basic stream types in Node.js

  1. Readable – a stream that can be read from (for example, fs.createReadStream()).
  2. Writable – a stream that can be written to (for example, fs.createWriteStream()).
  3. Duplex – a stream that is both readable and writable (for example, net.Socket).
  4. Transform – a Duplex stream that can modify or transform the data as it is written and read (for example, zlib.createDeflate()).

Why use streams

If a file is read synchronously using fs.readFileSync, the program blocks and the entire file is loaded into memory. With fs.readFile the program does not block, but the entire file is still loaded into memory at once before the consumer can read it. If the file is large, memory usage becomes a problem. This is where streams have an advantage: instead of loading the whole file into memory at once, a stream reads it piece by piece into a buffer that the consumer then drains, which saves memory.

1. Without streams, the whole file is first read into memory, and then written from memory to the target file

Use and implementation of streams

A readable stream createReadStream

Use of readable streams

  1. Create a readable stream

    var rs = fs.createReadStream(path,[options]);
    Copy the code

    1.) path – the path of the file to read

    2.)options

    • flags – how to open the file; the default is 'r'
    • encoding – the default value is null
    • start – the index at which to start reading
    • end – the index at which to stop reading (the end position is included)
    • highWaterMark – the size of the read buffer; the default is 64 KB

    If utf8 encoding is specified, highWaterMark should be larger than 3 bytes

  2. Listening for data events

    The stream switches to flowing mode and data is read as fast as possible

    // Attaching a 'data' listener switches the stream into flowing mode;
    // every chunk that is read is handed to this callback.
    rs.on('data', function (data) {
        console.log(data);
    });
    Copy the code
  3. Listen for the End event

    This event is emitted once all of the data has been read

    // 'end' is emitted when there is no more data to read.
    rs.on('end', function () {
        console.log('Read complete');
    });
    Copy the code
  4. Listening for Error Events

    // Log any error the stream reports.
    rs.on('error', function (err) {
        console.log(err);
    });
    Copy the code
  5. Listening for close events

    Set the encoding; this has the same effect as specifying {encoding: 'utf8'} in the options

    // Set the stream's encoding to UTF-8.
    rs.setEncoding('utf8');
    Copy the code
  6. Pause and resume triggering data

    Use pause() and resume()

    // Pause the stream as soon as a chunk arrives, then log that chunk.
    rs.on('data', function (data) {
        rs.pause();
        console.log(data);
    });
    // Resume reading two seconds later.
    setTimeout(function() { rs.resume(); }, 2000);

A simple implementation of readable streams

  1. Mock write readable stream
    let fs = require('fs');
    let EventEmitter = require('events');
    class ReadStream extends EventEmitter {
      constructor(path, options = {}) {
        super();
        this.path = path;
        this.highWaterMark = options.highWaterMark || 64 * 1024;
        this.autoClose = options.autoClose || true; this.start = options.start || 0; this.pos = this.start; / / pos with the position to change this. Read end = options. The end | | null; / / null said didn't pass this. Encoding = options. The encoding | | null; this.flags = options.flags ||'r'; // Argument problem this.flowing = null; This.buffer = buffer.alloc (this.highwatermark); this.open(); // {newListener:[fn]} // This. On ('newListener', (type) => {// Wait for it to listen for data eventsif (type= = ='data') {
            this.flowing = true; this.read(); // Start reading the client has listened to the data event}})}pause(){
        this.flowing = false;
      }
      resume(){
        this.flowing =true;
        this.read();
      }
      read(){// Defaults to the first callreadMethod has not yet fetched the FD, so it cannot be read directlyif(typeof this.fd ! = ='number') {return this.once('open',() => this.read()); // Wait until the open event is triggered and the fd must get itread} // start reading the file when the fd is acquired // start reading the file when the fd is acquired // start reading the file when the fd is acquired // Start reading the file when the fd is acquired // Start reading the file when the fd is acquired // Start reading the file when the fd is acquired // End is 4 //lethowMuchToRead = this.end? Math.min(this.end-this.pos+1,this.highWaterMark): this.highWaterMark; fs.read(this.fd, this.buffer, 0, howMuchToRead, this.pos, (error, ByteRead) => {this.pos += byteRead; // this. Buffer is three by defaultlet b = this.encoding ? this.buffer.slice(0, byteRead).toString(this.encoding) : this.buffer.slice(0, byteRead);
          this.emit('data', b);
          if ((byteRead === this.highWaterMark)&&this.flowing){
            returnthis.read(); } // There is no more logic hereif(byteRead < this.highwatermark){// There is no more this.watermark ('end'); This.destroy (); }}); } // Open the filedestroy() {
        if(typeof this.fd ! ='number') { return this.emit('close'); } fs.close(this.fd, () => {// If the file is opened, close the file and emit the close event this.emit('close');
        });
      }
      open() {fs.open(this.path, this.flags, (err, fd) => {//fd indicates the current this.path file, starting from 3 (number type).if (err) {
            if(this.autoclose) {// If you want to automatically close fd this.destroy(); // Destroy (close file, trigger close event)} this.emit('error', err); // An error event is raised if there is an errorreturn; } this.fd = fd; // Save the file descriptor this.emit('open', this.fd); // trigger file opening method}); } } module.exports = ReadStream;Copy the code
  2. validation
    let ReadStream = require('./ReadStream');
    let rs = new ReadStream('./2.txt', {
      highWaterMark: 3, // bytes
      flags: 'r',
      autoClose: true,
      start: 0,
      // end: 3, // the range is a closed interval: it includes both start and end
      encoding: 'utf8'
    });
    // A newly created stream is in non-flowing mode and does not read any data.
    // To receive data, listen for the 'data' event.
    rs.on('error', function (err) {
      console.log(err)
    });
    rs.on('open', function () {
      console.log('File open');
    });
    // The stream triggers this event internally: rs.emit('data')
    rs.on('data', function (data) {
      console.log(data);
      rs.pause(); // Pause the 'data' event: flowing mode goes back to non-flowing mode
    });
    // Resume reading once, five seconds later.
    setTimeout(() => { rs.resume() }, 5000)
    rs.on('end', function () {
      console.log('Read and complete');
    });
    rs.on('close', function () {
      console.log('off')
    });

Writable stream createWriteStream

Use of writable streams

  1. Create writable streams

    var ws = fs.createWriteStream(path,[options]);
    Copy the code

    1.) path – the path of the file to write

    2.)options

    • flags – how to open the file; the default is 'w'
    • encoding – the default is utf8
    • highWaterMark – the size of the write buffer; the default is 16 KB
  2. The write method

    ws.write(chunk,[encoding],[callback]);
    Copy the code

    1.) chunk – the data to write, a Buffer or a string

    2.) encoding – the encoding to use when chunk is a string; optional

    3.) callback – called after the write has succeeded

    The return value is a boolean: false when the buffer is full, true when it is not

  3. End method

    ws.end(chunk,[encoding],[callback]);
    Copy the code

    Indicates that no more data will be written to the Writable. The optional chunk and encoding arguments allow one final piece of data to be written immediately before the stream is closed. If the optional callback is passed, it is attached as a listener for the 'finish' event

  4. Listening for the drain event

    • While a stream is not draining, calls to write() buffer the data chunk and return false. Once all currently buffered chunks have been drained (accepted by the operating system for output), the 'drain' event is emitted

    • It is recommended that once write() returns false, no more chunks be written until the 'drain' event is emitted

    const fs = require('fs');
    // A 3-byte buffer makes write() report back-pressure almost immediately.
    const ws = fs.createWriteStream('./2.txt', {
      flags: 'w',
      encoding: 'utf8',
      highWaterMark: 3,
    });
    let i = 10; // number of writes still to perform

    // Keep writing until the work is done or write() signals a full buffer.
    const write = () => {
      for (let canContinue = true; i && canContinue; ) {
        canContinue = ws.write("1");
        i -= 1;
        console.log(canContinue);
      }
    };

    write();
    // Once the buffer has been flushed, continue where we left off.
    ws.on('drain', () => {
      console.log("drain");
      write();
    });
    Copy the code
  5. The finish event

    After the stream.end() method has been called and all buffered data has been flushed to the underlying system, the 'finish' event is emitted

    // Queue 100 lines, then close the stream with one final chunk.
    const writer = fs.createWriteStream('./2.txt');
    for (let i = 0; i < 100; i++) {
      writer.write(`hello, ${i}! \n`);
    }
    writer.end('end\n');
    // 'finish' is emitted after end() has been called and everything is flushed.
    writer.on('finish', () => {
      console.error('All writes are complete! ');
    });
    Copy the code

A simple implementation of writable streams

  1. Imitation writable stream
    let fs = require('fs');
    let EventEmitter = require('events');
    class WriteStream extends EventEmitter {
      constructor(path, options = {}) {
        super();
        this.path = path;
        this.flags = options.flags || 'w';
        this.encoding = options.encoding || 'utf8';
        this.start = options.start || 0;
        this.pos = this.start;
        this.mode = options.mode || 0o666;
        this.autoClose = options.autoClose || true; this.highWaterMark = options.highWaterMark || 16 * 1024; this.open(); // this. Writing = this. Writing = thisfalse; This. cache = []; this.cache = []; // Maintain a variable to indicate the cache length this.len = 0; // Whether the drain event is triggered this.needDrain =false;
      }
      clearBuffer() {
        let buffer = this.cache.shift();
        if(buffer) {// There is this._write(buffer.chunk, buffer.encoding, () => this.clearbuffer ()); }else{// There is nothing in the cacheif(this.needdrain) {// Need to trigger the drain event this.writing =false; NeedDrain = this. NeedDrain =false;
            this.emit('drain'); }}} _write(chunk, encoding, clearBuffer) {// Since the write method is called synchronously, the fd has not obtained the write method, so the write operation can be performed after obtaining itif(typeof this.fd ! ='number') {
          return this.once('open', () => this._write(chunk, encoding, clearBuffer)); } fs.write(this.fd, chunk, 0, chunk.length, this.pos, (err, byteWritten) => { this.pos += byteWritten; this.len -= byteWritten; // Reduce clearBuffer() in memory after each write; Write (chunk, encoding = this.encoding) {// Write (chunk, encoding = this.encoding) { IsBuffer (chunk) = buffer. IsBuffer (chunk)? chunk : Buffer.from(chunk, encoding); this.len += chunk.length; // Maintain cache length 3let ret = this.len < this.highWaterMark;
        if(! ret) { this.needDrain =true; // the drain event needs to be emitted}if(this.writing) {this.cache.push({chunk, encoding,}); }else{// first time this. Writing = thistrue; this._write(chunk, encoding, () => this.clearBuffer()); // write specific methods}returnret; // Can you continue to write,falseThe next time you write it, you need to use more memory.destroy() {
        if(typeof this.fd ! ='number') {
          this.emit('close');
        } else {
          fs.close(this.fd, () => {
            this.emit('close'); }); }}open() {
        fs.open(this.path, this.flags, this.mode, (err, fd) => {
          if (err) {
            this.emit('error', err);
            if(this.autoClose) { this.destroy(); // Destroy file descriptor if autoclose}return;
          }
          this.fd = fd;
          this.emit('open', this.fd);
        });
      }
    }
    module.exports = WriteStream;
    Copy the code
  2. validation
    let WS = require('./WriteStream')
    let ws = new WS('./2.txt', {
      flags: 'w', // by default the file is created if it does not exist
      highWaterMark: 1,
      encoding: 'utf8',
      start: 0,
      autoClose: true, // close automatically
      mode: 0o666, // readable and writable
    });
    // 'drain' is only triggered after the highWaterMark has been filled
    let i = 9;
    // Write '111' until the countdown is finished or write() reports a full buffer.
    function write() {
      let flag = true;
      while (flag && i >= 0) {
        i--;
        flag = ws.write('111'); // 987 // 654 // 321 // 0
        console.log(flag)
      }
    }
    write();
    // Continue writing each time the buffer has been flushed.
    ws.on('drain', function () {
      console.log('dry');
      write();
    });
    Copy the code

Pipe method

The pipe method connects a readable stream to a writable stream like a pipe, and controls the rate at which data is read

  • It listens for the readable stream's on('data') and calls ws.write with whatever has been read
  • Calling the write method returns a boolean
  • If false is returned, rs.pause() is called to pause reading
  • After the writable stream has finished writing, on('drain') fires and reading is resumed

Using the pipe method:
const fs = require('fs');

// Read ./2.txt one byte at a time.
const rs = fs.createReadStream('./2.txt', { highWaterMark: 1 });
// Write to ./1.txt through a 3-byte buffer.
const ws = fs.createWriteStream('./1.txt', {
  highWaterMark: 3,
});

// pipe() controls the read rate so that memory is not flooded.
rs.pipe(ws);