Whether you look like a pro depends entirely on your understanding of streams

Learning never ends; keep moving forward

Today we’re going to talk about streams in Node.js. You’ve probably already run into them in your daily Node work, for example:

  • gulp’s pipe is a stream method: it connects a readable stream to a writable stream so data can be read and written without taking up excessive cache memory.
  • req and res in Express and Koa are also streams: res is a writable stream and req is a readable stream, and both wrap the socket from Node’s net module, which is a Duplex stream (both readable and writable). A tiny sketch follows after this list.
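
Here is that sketch, a minimal example of my own (not from the original article): an HTTP server where the readable req is piped straight into the writable res, echoing the request body back to the client.

let http = require('http')

// req is a readable stream and res is a writable stream,
// so the request body can be piped straight back to the client
http.createServer((req, res) => {
  req.pipe(res)
}).listen(3000)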

A lot of the time you know how to use streams but not how they work underneath, and that feels awkward. Let’s change that.

What is a stream?

  • A stream is an ordered set of byte data transfers with a starting and ending point.
  • It does not care about the overall content of the file, only whether data is read from the file and what happens to it once it is read.
  • A stream is an abstract interface implemented by many objects in Node. For example, HTTP server request and Response objects are streams.
  • Streams come in four kinds: Readable (readable streams), Writable (writable streams), Duplex (duplex streams), and Transform (transform streams)

What flows through a stream?

  • Binary mode: each chunk is a Buffer or a string.
  • Object mode: internally the stream processes a sequence of ordinary JavaScript objects (a quick sketch follows after this list).
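
Here is that sketch, a minimal example of my own using the stream module’s Readable with objectMode set to true (nothing from the original article):

let { Readable } = require('stream')

// in object mode each chunk is an ordinary object instead of a buffer or string
let rs = new Readable({
  objectMode: true,
  read() {
    this.push({ name: 'node' })
    this.push(null) // nothing more to read
  }
})
rs.on('data', (obj) => {
  console.log(obj) // { name: 'node' }
})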

Readable streams

Readable streams have two modes: flowing and paused.

Parameters

  • path: Indicates the path of the file to be read
  • option:
    • highWaterMark: the watermark, i.e. how many bytes are read at a time (default 64 KB)
    • flags: how the file should be opened. The default is 'r'
    • encoding: the encoding; by default chunks are Buffers
    • start: Index position to start reading
    • end: Index position to end reading (including end position)
    • autoClose: Indicates whether to close after reading. The default value is true
let ReadStream = require('./ReadStream')
// Read 2 bytes at a time instead of the 64 KB default
let rs = new ReadStream('./a.txt', {
  highWaterMark: 2,   // bytes read per read; the default is 64 KB
  flags: 'r',         // 'r' means read, 'w' means write
  autoClose: true,    // close automatically after reading
  start: 0,
  end: 5,             // the range is a closed interval: both start and end are read
  encoding: 'utf8'    // the default encoding is buffer (null)
})

Methods and events

data: listening for it switches the stream to flowing mode, and chunks of data flow out

rs.on('data', function (data) {
    console.log(data);
});

open: This listener is triggered when a stream opens a file

rs.on('open', function () {
    console.log('File opened');
});

error: fires when an error occurs while reading

rs.on('error', function (err) {
    console.log(err);
});

end: fires when the stream has finished reading

rs.on('end', function () {
    console.log('Read complete');
});

close: fires when the stream (the underlying file) is closed

rs.on('close', function () {
    console.log('Stream closed');
});

pause: pauses the stream (sets its flowing state to false so no more data is read); resume: resumes the stream (sets flowing back to true and continues reading)

// Pause after each chunk of data, then resume 2 seconds later
rs.on('data', function (data) {
    rs.pause();
    console.log(data);
});
setTimeout(function () {
    rs.resume();
}, 2000);

fs.read() is the most low-level read method; it is what the readable stream calls under the hood.

// fd: file descriptor, usually obtained from fs.open
// buffer: the target buffer the data is placed into after reading
// 0: start filling the buffer at offset 0
// BUFFER_SIZE: how many bytes to read each time
// index: the position in the file to start reading from
// bytesRead: the number of bytes actually read
fs.read(fd, buffer, 0, BUFFER_SIZE, index, function (err, bytesRead) {})
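
To make the signature concrete, here is a small self-contained sketch of my own (assuming an './a.txt' file exists): open the file, read 3 bytes from position 0, and print them.

let fs = require('fs')

const BUFFER_SIZE = 3
let buffer = Buffer.alloc(BUFFER_SIZE)

fs.open('./a.txt', 'r', (err, fd) => {
  if (err) return console.log(err)
  // read BUFFER_SIZE bytes from position 0 of the file into position 0 of the buffer
  fs.read(fd, buffer, 0, BUFFER_SIZE, 0, (err, bytesRead) => {
    // bytesRead is how many bytes were actually read; slice so a short final chunk is not padded
    console.log(buffer.slice(0, bytesRead).toString())
    fs.close(fd, () => {})
  })
})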

Now let’s implement a lovely readable stream of our own!

let fs = require('fs')
let EventEmitter = require('events')
class ReadStream extends EventEmitter{
  constructor(path,options = {}){
    super()
    this.path = path
    this.highWaterMark = options.highWaterMark || 64*1024
    this.flags = options.flags || 'r'
    this.start = options.start || 0
    this.pos = this.start     // Will change with the read position
    this.autoClose = options.autoClose || true
    this.end = options.end || null
    // The default null is buffer
    this.encoding = options.encoding || null

    // Parameter problems
    this.flowing = null // Non-flow mode
    // Create a buffer to store data each time it is read
    this.buffer = Buffer.alloc(this.highWaterMark)
    // Open the file
    this.open()
    // Whenever on() registers a listener, the newListener event fires first with the event name
    this.on('newListener', (type) => {
      // wait for the consumer to listen for the data event
      if (type === 'data') {
        this.flowing = true
        // the consumer is listening for data, so start reading
        this.read()
      }
    })
  }
  // By default, the first time the read method is called, the fd has not been fetched, so it cannot be read directly
  read(){
    if (typeof this.fd !== 'number') {
      // wait for the open event before actually reading
      return this.once('open', () => this.read())
    }

    // If end is set, read at most (end - pos + 1) bytes; otherwise read highWaterMark bytes. If more than highWaterMark remains, use highWaterMark; if less, use what remains up to end.
    let howMuchToRead = this.end ? Math.min(this.end - this.pos + 1, this.highWaterMark) : this.highWaterMark
    fs.read(this.fd,this.buffer,0,howMuchToRead,this.pos,(err,byteRead)=>{
      this.pos += byteRead
      let b = this.encoding?this.buffer.slice(0,byteRead).toString(this.encoding):this.buffer.slice(0,byteRead)
      this.emit('data',b)
      // If the number of bytes read is the same as highWaterMark, you need to read more
      if((byteRead === this.highWaterMark)&&this.flowing){
        this.read()
      }
      if(byteRead < this.highWaterMark){
        this.emit('end')
        this.destory()
      }
    })
  }
  destory(){
    if (typeof this.fd !== 'number') {
      return this.emit('close')
    }
    // if the file has been opened, close it and emit the close event
    fs.close(this.fd,()=>{
      this.emit('close')
    })
  }
  pause(){
    this.flowing = false
  }
  resume(){
    this.flowing = true
    this.read()
  }
  open(){
    //fd represents the current this.path file, starting at 3 (type number).
    fs.open(this.path,this.flags,(err,fd)=>{
      // It is possible that the fd file does not exist and needs to be processed
      if(err){
        // If there is an automatic shutdown, help him destroy
        if(this.autoClose){
          // Destroy (close file, trigger close file event)
          this.destory()
        }
        // An error event is raised if there is an error
        this.emit('error',err)
        return
      }
      // Save the file descriptor
      this.fd = fd
      // The open event is raised when the file is successfully opened
      this.emit('open', this.fd)
    })
  }
}

Readable

The readable event works with the stream’s paused mode. You can picture it like this: the stream is someone pouring water into a glass and the consumer is someone drinking from it. The two are connected: as soon as the consumer drinks a little water (reads from the cache), the stream pours some more (reads more from the file).

What is Readable?

  • As soon as you start listening for the readable event, the stream reads data once into its cache; after that, whenever the consumer reads from the cache (drinks some water) and the amount drops, the stream reads from the file again (pours more water).
  • Its main use is building things like a LineReader, shown below.
let fs = require('fs')
let rs = fs.createReadStream('./a.txt', {
  // read 7 bytes at a time
  highWaterMark: 7
})
// If what remains in the cache after a read is smaller than highWaterMark, the stream reads from the file again (and emits another readable event)
// If rs.read() is called with no argument, it drains the whole cache; reading again returns null
// If every rs.read(n) asks for exactly what is cached (n equal to highWaterMark), readable keeps firing; if the final chunk is smaller than requested, read() returns null first and the remainder is drained on the next readable
// The readable event also fires once by default when the cache starts out empty (length 0)
rs.on('readable', () => {
  let result = rs.read(2)
  console.log(result)
})

If you want to read one line of data at a time, you need readable.

let fs = require('fs')
let EventEmitter = require('events')
// on('data') is for reading everything at once; on('readable') is for reading precise amounts
class LineReader extends EventEmitter {
  constructor(path) {
    super()
    this.rs = fs.createReadStream(path)
    // carriage return (CR) in hex
    let RETURN = 0x0d
    // line feed (LF) in hex
    let LINE = 0x0a
    let arr = []
    this.on('newListener', (type) => {
      if (type === 'newLine') {
        this.rs.on('readable', () => {
          let char
          // Read one at a time, and return null when finished, terminating the loop
          while (char = this.rs.read(1)) {
            switch (char[0]) {
              case RETURN:
                break;
              // On macOS lines end with just a newline; Windows uses a carriage return plus a newline, so both are handled (this example was written on a Mac)
              case LINE:
                // Convert an array to a string if it is a newline character
                let r = Buffer.from(arr).toString('utf8')
                // Empty the array
                arr.length = 0
                // Trigger the newLine event to output a line of data
                this.emit('newLine', r)
                break;
              default:
                // If it is not a newline character, it is put into the array
                arr.push(char[0])
            }
          }
        })
      }
    })
    // The last line is not followed by a newline, so it needs special handling: the read stream's end event fires when reading is finished
    this.rs.on('end', () => {
      // take the last line of data and convert it to a string
      let r = Buffer.from(arr).toString('utf8')
      arr.length = 0
      this.emit('newLine', r)
    })
  }
}

let lineReader = new LineReader('./a.txt')
lineReader.on('newLine', function (data) {
  console.log(data)
})

So how does readable actually work? Let’s implement a simplified version of its source and see what’s going on inside.

let fs = require('fs')
let EventEmitter = require('events')
class ReadStream extends EventEmitter{
  constructor(path,options = {}){
    super()
    this.path = path
    this.highWaterMark = options.highWaterMark || 64*1024
    this.flags = options.flags || 'r'
    this.start = options.start || 0
    this.pos = this.start     // Will change with the read position
    this.autoClose = options.autoClose || true
    this.end = options.end || null
    // The default null is buffer
    this.encoding = options.encoding || null

    // Parameter problems
    this.reading = false // whether a read from the file is currently in progress
    // Create a buffer to store data each time it is read
    this.buffers = []
    // Cache length
    this.len = 0
    // Whether to emit a readable event
    this.emittedReadable = false
    // Trigger open to get the fd identifier of the file
    this.open()
    // Whenever on() registers a listener, the newListener event fires first with the event name
    this.on('newListener', (type) => {
      // wait for the consumer to listen for the readable event
      if (type === 'readable') {
        // start reading data into the cache
        this.read()
      }
    })
  }
  // In the real source, readable uses a method like this to compute the nearest power of 2 that is >= n
  computeNewHighWaterMark(n) {
    n--;
    n |= n >>> 1;
    n |= n >>> 2;
    n |= n >>> 4;
    n |= n >>> 8;
    n |= n >>> 16;
    n++;
    return n;
  }
  read(n){
    // If more is requested than is cached, raise the watermark to the nearest power of 2 that is >= n
    if(this.len < n){
      this.highWaterMark = this.computeNewHighWaterMark(n)
      // mark that readable should fire again; this read will return null for now
      this.emittedReadable = true
      // read again with the new watermark
      this._read()
    }
    // Actually read
    let buffer = null
    // There are so many items in the cache
    if(n>0 && n<=this.len){
      // Define a buffer
      buffer = Buffer.alloc(n)
      let buf
      let flag = true
      let index = 0
      // e.g. this.buffers = [<Buffer 01 02>, <Buffer 03 04>, ...]
      // take buffers off the front of the cache
      while(flag && (buf = this.buffers.shift())){
        for(let i = 0; i < buf.length; i++){
          // copy bytes from the cached buffer into the newly allocated buffer
          buffer[index++] = buf[i]
          // stop once we have copied n bytes
          if(index === n){
            flag = false
            // Maintain the cache: the cached buffer may be larger than n, so slice off what is left after taking n bytes and put it back at the front of the cache
            this.len -= n
            let r = buf.slice(i + 1)
            if(r.length){
              this.buffers.unshift(r)
            }
            break
          }
        }
      }
    }
    // If the cache is now empty, a readable event needs to be emitted after the next read
    // Note: if every read(n) consumes exactly highWaterMark bytes, the cache drops to 0 each time, readable keeps firing, reading continues, and null is returned at the very end
    if(this.len === 0){
      this.emittedReadable = true
    }
    if(this.len < this.highWaterMark){
      // Start reading by default
      if(!this.reading){
        this.reading = true
        // the actual read from the file
        this._read()
      }
    }
    return buffer&&buffer.toString()
  }
  _read(){
    if (typeof this.fd !== 'number') {
      // wait for the open event before actually reading
      return this.once('open', () => this._read())
    }
    // Read this buffer first
    let buffer = Buffer.alloc(this.highWaterMark)
    fs.read(this.fd,buffer,0,buffer.length,this.pos,(err,byteRead)=>{
      if (byteRead > 0) {
        // data was read: reset the reading flag so a later read() call can trigger _read again
        this.reading = false
        // Increase the cache size each time data is read
        this.len += byteRead
        // After each read, the start position of the read file is added
        this.pos += byteRead
        // Place the read buffer in buffers
        this.buffers.push(buffer.slice(0,byteRead))
        // emit readable if it is pending
        if(this.emittedReadable){
          this.emittedReadable = false
          // the cache is treated as empty at the start, so readable fires after the first fill
          this.emit('readable')
        }
      }else{
        // nothing was read: emit the end event
        this.emit('end')
      }
    })
  }
  destory(){
    if (typeof this.fd !== 'number') {
      return this.emit('close')
    }
    // if the file has been opened, close it and emit the close event
    fs.close(this.fd,()=>{
      this.emit('close')
    })
  }
  open(){
    //fd represents the current this.path file, starting at 3 (type number).
    fs.open(this.path,this.flags,(err,fd)=>{
      // It is possible that the fd file does not exist and needs to be processed
      if(err){
        // If there is an automatic shutdown, help him destroy
        if(this.autoClose){
          // Destroy (close file, trigger close file event)
          this.destory()
        }
        // An error event is raised if there is an error
        this.emit('error',err)
        return
      }
      // Save the file descriptor
      this.fd = fd
      // The open event is raised when the file is successfully opened
      this.emit('open', this.fd)
    })
  }
}
  • The difference between readable and the data event is that readable lets you control how much to read from the cache and how many times, whereas data clears the cache on every read and emits however much was read
  • We can look at the following example
let rs = fs.createReadStream('./a.txt')
rs.on('data',(data)=>{
  console.log(data)
})
// Because the data event above already read the data and cleared the cache, the readable read below returns null
rs.on('readable', () => {
  let result = rs.read(1)
  console.log(result)
})

Custom readable streams

Because createReadStream internally calls the ReadStream class, ReadStream implements the Readable interface, and ReadStream implements the _read() method, we can build a custom readable stream by creating a class that extends the stream module’s Readable and defining our own _read() on it.

let { Readable } = require('stream')

class MyRead extends Readable{
  // The stream needs a _read method: whatever is pushed inside it is what the consumer receives outside
  _read(){
    // push puts data into the cache, just like the buffers.push in the _read we wrote above
    this.push('100')
    // pushing null means there is nothing left to read, so the stream ends
    this.push(null)
  }
}
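
And a minimal usage sketch of my own for the MyRead class above:

let myRead = new MyRead()
// every chunk pushed inside _read comes out here as a 'data' event
myRead.on('data', (data) => {
  console.log(data.toString()) // '100'
})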

Writable streams

  • If the file does not exist it is created; if it does exist, its contents are cleared
  • Once the amount of pending data reaches the highWaterMark, write() returns false
  • The first write actually goes to the file; subsequent writes go into the cache and are later fetched from the cache

Parameters (similar to readable stream)

  • path: Indicates the path of the file to be written
  • option:
    • highWaterMark: the watermark, i.e. how many bytes may be pending before write() returns false (default 16 KB)
    • flags: how the file is opened for writing. The default is 'w'
    • encoding: the encoding; by default chunks are Buffers
    • start: Index position to start writing
    • end: Index position where the write ends (including the end position)
    • autoClose: Indicates whether to close after writing. The default value is true
let WriteStream = require('./WriteStream')
// Write with a 3-byte watermark (writable streams default to 16 KB)
let ws = new WriteStream('./d.txt', {
  highWaterMark: 3,   // threshold at which write() starts returning false
  flags: 'w',         // 'w' means write
  autoClose: true,    // close automatically when done
  start: 0,           // index position to start writing at
  encoding: 'utf8'
})

Methods and events

write

let fs = require('fs')
let ws = fs.createWriteStream('./d.txt', {
  flags: 'w',
  encoding: 'utf8',
  start: 0,
  // for writes, highWaterMark only decides when write() returns false (i.e. when drain becomes relevant)
  highWaterMark: 3 // the default for writable streams is 16 KB
})
// write returns a Boolean: each call adds to the pending byte count, and once that count reaches highWaterMark it returns false; until then it returns true
// write only accepts a string or a buffer
let flag = ws.write('1', 'utf8', () => {
  console.log('written')
})

drain

// drain fires only if write() has returned false (the pending data reached highWaterMark) and everything pending, in memory and in the cache, has since been written out
ws.on('drain', () => {
  console.log('drained')
})
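
Putting write and drain together, here is a fuller sketch of my own (reusing the ws from above with its 3-byte watermark): keep writing until write() returns false, then wait for drain before continuing.

let i = 0
function writeSome () {
  let ok = true
  while (i < 10 && ok) {
    // returns false once the 3-byte watermark is reached
    ok = ws.write(String(i++), 'utf8')
  }
  if (i < 10) {
    // wait for the cache to empty before writing the rest
    ws.once('drain', writeSome)
  }
}
writeSome()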

fs.write() is the most low-level write method; it is what the writable stream calls under the hood.

// wfd: file descriptor, usually obtained from fs.open
// buffer: the source buffer the data is taken from
// 0: start taking data from offset 0 of the buffer
// bytesRead: how many bytes to write (here, however many were just read)
// index: the position in the file to write at
// bytesWrite: the number of bytes actually written
fs.write(wfd, buffer, 0, bytesRead, index, function (err, bytesWrite) {})
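
And a small self-contained sketch of my own for fs.write (it writes 'hello' at position 0 of an assumed './d.txt'):

let fs = require('fs')

let buffer = Buffer.from('hello')
fs.open('./d.txt', 'w', (err, wfd) => {
  if (err) return console.log(err)
  // write all 5 bytes of the buffer at position 0 of the file
  fs.write(wfd, buffer, 0, buffer.length, 0, (err, bytesWrite) => {
    console.log(bytesWrite) // 5
    fs.close(wfd, () => {})
  })
})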

The implementation

let fs = require('fs')
let EventEmitter = require('events')
// If len exceeds the highWaterMark, write() returns false, meaning the cache is overloaded and the caller should wait for drain
// When there is nothing left in the cache to write, needDrain decides whether a drain event is emitted
class WriteStream extends EventEmitter{
  constructor(path,options = {}){
    super()
    this.path = path
    this.highWaterMark = options.highWaterMark || 64*1024
    this.flags = options.flags || 'w'
    this.start = options.start || 0
    this.pos = this.start
    this.autoClose = options.autoClose || true
    this.mode = options.mode || 0o666
    // The default null is buffer
    this.encoding = options.encoding || null

    // Open the file
    this.open()

    // What parameters are needed to write the file
    // The first write goes straight to the file; while that write is in flight, later writes are put into the cache
    this.writing = false
    // Cache the array
    this.cache = []
    this.callbackList = []
    // Array length
    this.len = 0
    // Whether the drain event is emitted
    this.needDrain = false
  }

  clearBuffer(){
    // take the top one in the cache
    let buffer = this.cache.shift()
    if(buffer){
      // If there is a buffer
      this._write(buffer.chunk,buffer.encoding,()=>this.clearBuffer(),buffer.callback)
    }else{
      // If not, see if it is needed
      if(this.needDrain){
        // Fire drain and initialize all states
        this.writing = false
        this.needDrain = false
        this.callbackList.shift()()
        this.emit('drain')
      }
      this.callbackList.map(v => {
        v()
      })
      this.callbackList.length = 0
    }
  }
  _write(chunk,encoding,clearBuffer,callback){
    // Since the write method is called synchronously, the fd may not be available yet
    if (typeof this.fd !== 'number') {
      // register a one-time listener on the open event; it runs once open is emitted
      return this.once('open', () => this._write(chunk, encoding, clearBuffer, callback))
    }
    fs.write(this.fd,chunk,0,chunk.length,this.pos,(err,byteWrite)=>{
      this.pos += byteWrite
      // Reduce the amount of memory per write
      this.len -= byteWrite
      if(callback) this.callbackList.push(callback)
      // this write has finished, so clear the cache (write the next cached chunk)
      clearBuffer()
      
    })
  }

  // Write method
  write(chunk,encoding=this.encoding,callback){
    // chunk must be a string or a buffer; convert strings to buffers for uniformity
    chunk = Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk, encoding)
    // maintain the cached length so it can be compared against the watermark
    this.len += chunk.length
    let ret = this.len < this.highWaterMark
    if (!ret) {
      // the drain event will need to be emitted later
      this.needDrain = true
    }
    // What is being written should be put into memory
    if(this.writing){
      this.cache.push({
        chunk,
        encoding,
        callback
      })
    }else{
      // nothing is currently being written, so write straight to the file
      this.writing = true
      // the actual write; when it completes, clearBuffer drains the cache
      this._write(chunk,encoding,()=>this.clearBuffer(),callback)
    }
    // console.log(ret)
    // whether the caller may keep writing; false means further writes will pile up in memory
    return ret
  }

  destory(){
    if (typeof this.fd !== 'number') {
      return this.emit('close')
    }
    // if the file has been opened, close it and emit the close event
    fs.close(this.fd,()=>{
      this.emit('close')
    })
  }
  open(){
    //fd represents the current this.path file, starting at 3 (type number).
    fs.open(this.path,this.flags,(err,fd)=>{
      // It is possible that the fd file does not exist and needs to be processed
      if(err){
        // If there is an automatic shutdown, help him destroy
        if(this.autoClose){
          // Destroy (close file, start close file)
          this.destory()
        }
        // An error event is raised if there is an error
        this.emit('error',err)
        return
      }
      // Save the file descriptor
      this.fd = fd
      // The open event is raised when the file is successfully opened
      this.emit('open', this.fd)
    })
  }
}

Custom writable streams

Because createWriteStream internally calls the WriteStream class, WriteStream implements the Writable interface, and WriteStream implements the _write() method, we can build a custom writable stream by creating a class that extends the stream module’s Writable and defining our own _write() on it.

let { Writable } = require('stream')

class MyWrite extends Writable{
  _write(chunk,encoding,callback){
    // The first argument to write(), the data to be written
    console.log(chunk)
    // This callback, which is equivalent to our clearBuffer method above, will not continue to fetch writes from the cache if callback is not executed
    callback()
  }
}

let write = new MyWrite()
let write = new MyWrite()
write.write('1', 'utf8', () => {
  console.log('ok')
})

pipe

pipe is a method on readable streams. It is covered here because it needs the basics of both kinds of stream: it is a way of transferring data from a readable stream to a writable stream. Without pipe you tend to read faster than you write, which piles data up in memory; with pipe, memory use stays within the configured limits.

usage

let fs = require('fs')
// the pipe method controls the transfer rate
let rs = fs.createReadStream('./d.txt', {
  highWaterMark: 4
})
let ws = fs.createWriteStream('./e.txt', {
  highWaterMark: 1
})
// internally, on('data') reads a chunk and writes it with ws.write
// ws.write returns a Boolean
// if it returns false, rs.pause() is called to stop reading
// once the writable stream has flushed its cache, its 'drain' event resumes rs
rs.pipe(ws) // controls the rate so available memory is not flooded

Implement it yourself

let fs = require('fs')
// These are ReadStream and WriteStream
let ReadStream = require('./ReadStream')
let WriteStream = require('./WriteStream')

// With the plain read-then-write approach you read faster than you write, and data piles up in memory
ReadStream.prototype.pipe = function(dest){
  this.on('data',(data)=>{
    let flag = dest.write(data)
    // if the writable side is full (write returned false), stop reading
    if (!flag) {
      this.pause()
    }
  })
  // once the writable side has drained its cache, continue reading
  dest.on('drain', () => {
    this.resume()
  })
  this.on('end', () => {
    this.destory()
    // flush what the OS has buffered to disk, then close the destination
    fs.fsync(dest.fd,()=>{
      dest.destory()
    })
  })
}
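
A usage sketch of my own for the hand-rolled pipe above (assuming './a.txt' exists and './b.txt' is the destination):

let ReadStream = require('./ReadStream')
let WriteStream = require('./WriteStream')

let rs = new ReadStream('./a.txt', { highWaterMark: 4 })
let ws = new WriteStream('./b.txt', { highWaterMark: 1 })

// reads at most 4 bytes at a time, pauses whenever the writer is saturated, resumes on drain
rs.pipe(ws)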

Duplex streams

With a duplex stream, one object implements both the readable and the writable interface, as if it inherited both. Importantly, the readable side and the writable side of a duplex stream are completely independent of each other; it simply combines the two capabilities in one object.

let { Duplex } = require('stream')
// Duplex flow, readable and writable
class MyDuplex extends Duplex{
  _read(){
    this.push('hello')
    this.push(null)
  }
  _write(chunk,encoding,clearBuffer){
    console.log(chunk)
    clearBuffer()
  }
}

let myDuplex = new MyDuplex()
// process.stdin is a readable stream on Node's process object that receives input from the command line
// process.stdout is a writable stream on Node's process object that prints to the command line
// so the terminal first prints hello, and then whatever we type is echoed back as a buffer
process.stdin.pipe(myDuplex).pipe(process.stdout)

Transform streams

The output of a transform stream is computed from its input. For a transform stream we don’t implement separate read and write methods; we implement a single _transform method that combines the two. It plays the role of the write method, and inside it we can also push data to the readable side.

let { Transform } = require('stream')

class MyTransform extends Transform{
  _transform(chunk,encoding,callback){
    console.log(chunk.toString().toUpperCase())
    callback()
  }
}
let myTransform = new MyTransform()


class MyTransform2 extends Transform{
  _transform(chunk,encoding,callback){
    console.log(chunk.toString().toUpperCase())
    this.push('1')
    // this.push(null)
    callback()
  }
}
let myTransform2 = new MyTransform2()

// writing into the pipe triggers myTransform2's _transform, which prints the input in uppercase and pushes '1' through its readable side to the next stream
// myTransform's _transform is then triggered by that write, so the chunk it receives is whatever the previous stream pushed
process.stdin.pipe(myTransform2).pipe(myTransform)

conclusion

Readable streams

  • In flowing mode, a readable stream automatically reads data from the underlying system and provides it to the application as quickly as possible through events on the EventEmitter interface.
  • In paused mode, the stream.read() method must be explicitly called to read a piece of data from the stream.
  • Readable streams all start out in paused mode; they can be switched to flowing mode in one of three ways:
    • Listen for the ‘data’ event
    • Call the stream.resume() method
    • Call the stream.pipe() method to send data to Writable
  • A readable stream can be switched to paused mode by:
    • If no pipe destination exists, it can be implemented by calling stream.pause().
    • If there are pipe targets, this can be done by removing all ‘data’ listeners and calling the stream.unpipe() method to remove all pipe targets.

Writable streams

  • The drain event only fires if the cache actually filled up (write() returned false), and not until everything pending, both in flight and in the cache, has been written out
  • The first write goes directly to the file; subsequent writes are taken from the cache one at a time

Duplex streams

  • It is simply a combination of a writable and a readable stream: it can act as either, and its readable and writable sides are isolated from each other

Transform streams

  • A transform stream has both input and output, and its _transform method is only invoked when a write happens. The difference from a duplex stream is that its readable and writable sides are connected: the output is derived from the input.

Okay, that’s it. Now go and be a stream master!