Whether you master streams depends on how well you understand them
Learning never ends; keep moving forward
Today we're going to talk about streams in Node.js. You have probably already used them in your daily work with Node, for example:
- gulp
- pipe, a streaming method that connects a readable stream to a writable stream so that data can be read and written without buffering more than necessary in memory.
- req and res in Express and Koa are also streams: res is a writable stream and req is a readable stream. Both are built on sockets from Node's net module, which are Duplex streams (both writable and readable).
You may often know how to use streams without knowing how they work, which is a bit awkward. Let's fix that.
What is a stream?
- A stream is an ordered sequence of bytes transferred from a starting point to an end point.
- It does not care about the file's content as a whole, only whether data has been read from the file and what happens to that data once it has been read.
- A stream is an abstract interface implemented by many objects in Node. For example, the request and response objects of an HTTP server are streams.
- Streams come in four kinds: Readable (readable stream), Writable (writable stream), Duplex (duplex stream) and Transform (transform stream).
What flows through a stream?
- Binary mode: each chunk is a Buffer or a string.
- Object mode: internally, the stream processes a sequence of ordinary JavaScript objects.
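A minimal sketch of the two modes, assuming only Node's built-in `stream` module; `makeStream` and `collect` are hypothetical helpers written for illustration. In object mode each chunk is exactly the value that was pushed, while in binary mode pushed strings come out as Buffers.

```javascript
const { Readable } = require('stream')

// Hypothetical helper: a readable stream that emits the given items and then ends
function makeStream(items, objectMode) {
  const stream = new Readable({
    objectMode, // true: chunks are arbitrary JS values; false: Buffers/strings
    read() {} // data is pushed up front, so there is nothing to do on demand
  })
  for (const item of items) stream.push(item)
  stream.push(null) // null marks the end of the stream
  return stream
}

// Hypothetical helper: gather every chunk, then hand the array to done()
function collect(stream, done) {
  const chunks = []
  stream.on('data', (chunk) => chunks.push(chunk))
  stream.on('end', () => done(chunks))
}

// Object mode: each chunk is the original object
collect(makeStream([{ id: 1 }, { id: 2 }], true), (chunks) => {
  console.log(chunks) // [ { id: 1 }, { id: 2 } ]
})

// Binary mode: pushed strings are converted to Buffers
collect(makeStream(['ab', 'cd'], false), (chunks) => {
  console.log(chunks.every(Buffer.isBuffer)) // true
})
```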
Readable streams
A readable stream operates in one of two modes: flowing and paused.
Parameters
- path: the path of the file to read
- options:
  - highWaterMark: the watermark, i.e. how many bytes are read at a time (default 64 KB)
  - flags: how to open the file (default 'r')
  - encoding: the encoding (default null, meaning chunks are Buffers)
  - start: the index position to start reading from
  - end: the index position to stop reading at (inclusive)
  - autoClose: whether to close the file after reading (default true)
let ReadStream = require('./ReadStream') // the hand-written ReadStream implemented below
let rs = new ReadStream('./a.txt', {
highWaterMark: 2, // bytes read at a time; the default is 64K
flags: 'r', // r means read, w means write
autoClose: true, // close the file automatically after reading
start: 0,
end: 5, // the range is inclusive at both ends: bytes 0 through 5 are read
encoding: 'utf8' // the default is null, which yields Buffers
})
Events and methods
data: listening for it switches the stream into flowing mode, and each chunk is emitted as it is read
rs.on('data', function (data) {
console.log(data);
});
open: emitted when the stream has opened the file
rs.on('open', function () {
console.log('File opened');
});
error: emitted when an error occurs (for example, the file does not exist)
rs.on('error', function (err) {
console.log(err);
});
end: emitted when the stream has finished reading
rs.on('end', function () {
console.log('Read complete');
});
close: emitted once the stream and its underlying file have been closed
rs.on('close', function () {
console.log('closed');
});
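To see the order in which these events fire without touching the file system, here is a sketch using `Readable.from()` (built into recent Node versions) on an in-memory array instead of a file; the `events` array is just a log for illustration. With the default `autoDestroy` behaviour, 'close' follows 'end'.

```javascript
const { Readable } = require('stream')

// Build a readable stream from an array; each element becomes one chunk
const rs = Readable.from(['a', 'b', 'c'])
const events = [] // record of what fired, in order

rs.on('data', (chunk) => events.push('data:' + chunk))
rs.on('end', () => events.push('end'))
rs.on('close', () => {
  events.push('close')
  console.log(events) // [ 'data:a', 'data:b', 'data:c', 'end', 'close' ]
})
```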
pause: pauses the stream (sets flowing to false, so no more data is read); resume: resumes the stream (sets flowing back to true and continues reading)
// Pause after each chunk, then resume 2 seconds later
rs.on('data', function (data) {
rs.pause();
console.log(data);
});
setTimeout(function () {
rs.resume();
},2000);
fs.read()
This is the lowest-level read method; the readable file stream calls it under the hood.
// fd: file descriptor, usually obtained from fs.open
// buffer: the target buffer the data is placed into after reading
// 0: the offset in buffer to start filling at
// BUFFER_SIZE: how many bytes to read each time
// index: the position in the file to read from
// bytesRead: the number of bytes actually read
fs.read(fd, buffer, 0, BUFFER_SIZE, index, function (err, bytesRead) {})
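The callback form above is asynchronous. To show what each argument does inside a loop, here is a sketch using the synchronous twin `fs.readSync(fd, buffer, offset, length, position)`; the helper `readInChunks` and the temporary file name are hypothetical.

```javascript
const fs = require('fs')
const os = require('os')
const path = require('path')

// Hypothetical helper: read a whole file BUFFER_SIZE bytes at a time
function readInChunks(file, BUFFER_SIZE) {
  const fd = fs.openSync(file, 'r')
  const buffer = Buffer.alloc(BUFFER_SIZE) // reused for every read
  const chunks = []
  let index = 0 // position in the file to read from
  try {
    while (true) {
      // Fill buffer from offset 0 with up to BUFFER_SIZE bytes taken at file position `index`
      const bytesRead = fs.readSync(fd, buffer, 0, BUFFER_SIZE, index)
      if (bytesRead === 0) break // end of file
      chunks.push(buffer.toString('utf8', 0, bytesRead))
      index += bytesRead
    }
  } finally {
    fs.closeSync(fd)
  }
  return chunks
}

const file = path.join(os.tmpdir(), 'stream-demo-read.txt')
fs.writeFileSync(file, 'hello world')
console.log(readInChunks(file, 4)) // [ 'hell', 'o wo', 'rld' ]
```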
Let's build our own lovely readable stream!
let fs = require('fs')
let EventEmitter = require('events')
class ReadStream extends EventEmitter{
constructor(path,options = {}){
super()
this.path = path
this.highWaterMark = options.highWaterMark || 64*1024
this.flags = options.flags || 'r'
this.start = options.start || 0
this.pos = this.start // Will change with the read position
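// Check for undefined rather than using ||, so an explicit false (or end: 0) is respected
this.autoClose = options.autoClose !== undefined ? options.autoClose : true
this.end = options.end !== undefined ? options.end : null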
// The default null is buffer
this.encoding = options.encoding || null
// Reading mode: null until someone listens for 'data' (not flowing yet)
this.flowing = null
// Create a buffer to store data each time it is read
this.buffer = Buffer.alloc(this.highWaterMark)
// Open the file
this.open()
// 'newListener' is emitted every time a listener is added with on()
this.on('newListener', (type) => {
// Wait for the consumer to listen for 'data'
if (type === 'data') {
this.flowing = true
// Someone is listening for 'data', so start reading
this.read()
}
})
}
// The first time read() is called, fs.open may not have finished yet, so there is no fd to read from
read(){
if (typeof this.fd !== 'number') { // wait for the open event, then read
return this.once('open', () => { this.read() })
}
// Without end, read highWaterMark bytes; with end, read what is left up to end, capped at highWaterMark
let howMuchToRead = this.end !== null ? Math.min(this.end - this.pos + 1, this.highWaterMark) : this.highWaterMark
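fs.read(this.fd, this.buffer, 0, howMuchToRead, this.pos, (err, byteRead) => {
if (err) {
this.emit('error', err)
return this.destory()
}
this.pos += byteRead
if (byteRead > 0) {
// Copy the bytes out, because this.buffer is reused by the next read
let chunk = Buffer.from(this.buffer.subarray(0, byteRead))
this.emit('data', this.encoding ? chunk.toString(this.encoding) : chunk)
}
// If a full highWaterMark was read there may be more, so keep reading while flowing
if ((byteRead === this.highWaterMark) && this.flowing) {
this.read()
}
// A short read means the end of the file (or of the requested range)
if (byteRead < this.highWaterMark) {
this.emit('end')
this.destory()
}
})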
}
destory(){
// If the file was never opened, just emit close; otherwise close it first, then emit close
if (typeof this.fd !== 'number') { return this.emit('close') }
fs.close(this.fd,()=>{
this.emit('close')
})
}
pause(){
this.flowing = false
}
resume(){
this.flowing = true
this.read()
}
open(){
// fd is the numeric file descriptor for this.path; 0-2 are taken by stdin, stdout and stderr, so it usually starts at 3
fs.open(this.path,this.flags,(err,fd)=>{
// The file may not exist, so handle the error
if(err){
// If autoClose is set, clean up
if(this.autoClose){
// Destroy: close the file and emit close
this.destory()
}
// An error event is raised if there is an error
this.emit('error',err)
return
}
// Save the file descriptor
this.fd = fd
// The open event is raised when the file is successfully opened
this.emit('open', this.fd)
})
}
}
module.exports = ReadStream
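To make the `howMuchToRead` arithmetic concrete, here is a pure-function sketch that replays the loop in `read()` without doing any I/O. `planReads` is hypothetical, and it assumes each `fs.read` returns as many bytes as the file still has, up to the requested length.

```javascript
// Hypothetical helper: replays ReadStream.read() for a file of `fileSize` bytes
// and returns the size of every 'data' chunk it would emit
function planReads(fileSize, { start = 0, end = null, highWaterMark = 64 * 1024 } = {}) {
  const chunks = []
  let pos = start
  while (true) {
    // Same formula as howMuchToRead in read()
    const howMuchToRead = end !== null ? Math.min(end - pos + 1, highWaterMark) : highWaterMark
    // Assumption: fs.read returns everything requested that the file still has
    const bytesRead = Math.max(0, Math.min(howMuchToRead, fileSize - pos))
    if (bytesRead > 0) chunks.push(bytesRead)
    pos += bytesRead
    // A short read (less than highWaterMark) ends the stream
    if (bytesRead < highWaterMark) break
  }
  return chunks
}

console.log(planReads(10, { start: 0, end: 5, highWaterMark: 2 })) // [ 2, 2, 2 ]
console.log(planReads(5, { highWaterMark: 2 })) // [ 2, 2, 1 ]
```

With the options from the earlier example (start 0, end 5, highWaterMark 2), three 2-byte chunks are read, and the final zero-byte read is what triggers 'end'.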
The 'readable' event
Listening for 'readable' consumes a readable stream in paused mode. Picture the file reads as a person pouring water into a glass (the internal buffer) and the 'readable' consumer as a person drinking from it: as soon as the drinker has drunk some, the pourer tops the glass back up.
How does 'readable' work?
- As soon as you start listening for 'readable', the stream reads once to fill its buffer and emits the event. After that, whenever someone reads from the buffer (drinks) and it drops below highWaterMark, the stream reads again (pours).
- It can be used, for example, to build a LineReader.
let fs = require('fs')
let rs = fs.createReadStream('./a.txt', {
// Read 7 bytes at a time
highWaterMark: 7
})
// Listening for 'readable' makes the stream fill its buffer (up to highWaterMark) and then emit 'readable'
// rs.read(2) takes 2 bytes from the buffer; it returns null if fewer than 2 bytes are buffered and the stream has not ended
// Once the stream has ended, read(n) returns whatever is left, and 'readable' is emitted one last time before 'end'
rs.on('readable', () => {
let result = rs.read(2)
console.log(result)
})
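Reading only once per 'readable' event can leave data sitting in the buffer. A common pattern is to keep calling `read(size)` until it returns null. This sketch uses an in-memory `Readable` instead of a file; `readInPieces` is a hypothetical helper.

```javascript
const { Readable } = require('stream')

// Hypothetical helper: consume `source` `size` bytes at a time via 'readable'
function readInPieces(source, size, done) {
  const pieces = []
  source.on('readable', () => {
    let chunk
    // read(size) returns null when fewer than `size` bytes are buffered
    // (unless the stream has ended, in which case it returns the remainder)
    while ((chunk = source.read(size)) !== null) {
      pieces.push(chunk.toString())
    }
  })
  source.on('end', () => done(pieces))
}

const source = new Readable({ read() {} })
source.push('abcdefg') // 7 bytes
source.push(null) // no more data

readInPieces(source, 2, (pieces) => console.log(pieces)) // [ 'ab', 'cd', 'ef', 'g' ]
```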
To read one line of data at a time, listen for 'readable':
let fs = require('fs')
let EventEmitter = require('events')
// on('data') is for reading everything as it arrives; on('readable') is for reading precisely
class LineReader extends EventEmitter {
constructor(path) {
super()
this.rs = fs.createReadStream(path)
// Carriage return (CR) in hex
let RETURN = 0x0d
// Line feed (LF, newline) in hex
let LINE = 0x0a
let arr = []
this.on('newListener', (type) => {
if (type === 'newLine') {
this.rs.on('readable', () => {
let char
// Read one byte at a time; read() returns null when the buffer is empty, ending the loop
while ((char = this.rs.read(1))) {
switch (char[0]) {
case RETURN:
break
// macOS and Linux end lines with LF only; Windows uses CR followed by LF, so CR is simply skipped above
case LINE: {
// At a newline, turn the collected bytes into a string
let r = Buffer.from(arr).toString('utf8')
// Empty the array
arr.length = 0
// Emit the newLine event with one line of data
this.emit('newLine', r)
break
}
default:
// Any other byte belongs to the current line
arr.push(char[0])
}
}
})
// The last line is not followed by a newline, so handle it when the stream emits end
this.rs.on('end', () => {
// Take the remaining bytes (if any) as the last line
if (arr.length) {
let r = Buffer.from(arr).toString('utf8')
arr.length = 0
this.emit('newLine', r)
}
})
}
})
}
}
let lineReader = new LineReader('./a.txt')
lineReader.on('newLine', function (data) {
console.log(data)
})
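The byte-level logic inside LineReader can be pulled out into a pure function, which makes it easy to check. `splitLines` is a hypothetical helper that applies the same rules to a whole Buffer: CR is skipped, LF ends a line, and a final line without a newline is kept. Because CR and LF never occur inside multi-byte UTF-8 sequences, decoding each line separately is safe. In practice, Node's built-in `readline` module already provides line-by-line reading on top of a stream.

```javascript
const CR = 0x0d // carriage return
const LF = 0x0a // line feed (newline)

// Hypothetical helper mirroring LineReader's switch: CR is skipped, LF ends a line
function splitLines(bytes) {
  const lines = []
  let current = []
  for (const byte of bytes) {
    if (byte === CR) continue // Windows line endings are CRLF; drop the CR
    if (byte === LF) {
      lines.push(Buffer.from(current).toString('utf8'))
      current = []
    } else {
      current.push(byte)
    }
  }
  // The last line has no trailing newline, like the 'end' handler in LineReader
  if (current.length) lines.push(Buffer.from(current).toString('utf8'))
  return lines
}

console.log(splitLines(Buffer.from('first\r\nsecond\nthird'))) // [ 'first', 'second', 'third' ]
```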
So how does 'readable' work under the hood? Let's implement a simplified version of it and see what goes on inside.
let fs = require('fs')
let EventEmitter = require('events')
class ReadStream extends EventEmitter{
constructor(path,options = {}){
super()
this.path = path
this.highWaterMark = options.highWaterMark || 64*1024
this.flags = options.flags || 'r'
this.start = options.start || 0
this.pos = this.start // Will change with the read position
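// Check for undefined rather than using ||, so an explicit false (or end: 0) is respected
this.autoClose = options.autoClose !== undefined ? options.autoClose : true
this.end = options.end !== undefined ? options.end : null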
// The default null is buffer
this.encoding = options.encoding || null
// Whether a read from the file is currently in progress
this.reading = false
// Internal buffer: the chunks read so far
this.buffers = []
// Total number of bytes in the buffer
this.len = 0
// Whether to emit 'readable' after the next read completes
this.emittedReadable = false
// Trigger open to get the fd identifier of the file
this.open()
// 'newListener' is emitted every time a listener is added with on()
this.on('newListener', (type) => {
if (type === 'readable') { // someone is listening for 'readable': fill the buffer
this.read()
}
})
}
// Borrowed from Node's own Readable source: round n up to the next power of 2
computeNewHighWaterMark(n) {
n--;
n |= n >>> 1;
n |= n >>> 2;
n |= n >>> 4;
n |= n >>> 8;
n |= n >>> 16;
n++;
return n;
}
read(n){
// If more bytes are requested than are buffered, read more from the file;
// if the request also exceeds highWaterMark, raise highWaterMark to the next power of 2
if (this.len < n) {
if (n > this.highWaterMark) {
this.highWaterMark = this.computeNewHighWaterMark(n)
}
// Emit 'readable' again once the data arrives; this call returns null for now
this.emittedReadable = true
// Read up to the new watermark, unless a read is already in progress
if (!this.reading) {
this.reading = true
this._read()
}
}
// The data to return
let buffer = null
// Only take data if enough bytes are buffered
if(n>0 && n<=this.len){
// Define a buffer
buffer = Buffer.alloc(n)
let buf
let flag = true
let index = 0
// e.g. [<Buffer 01 02 03 04>, <Buffer 01 02 03 04>, <Buffer 01 02 03 04>]
// Take buffers from the front of the cache one at a time
while(flag && (buf = this.buffers.shift())){
for (let i = 0; i < buf.length; i++) {
// Copy bytes from the extracted buffer into the new buffer
buffer[index++] = buf[i]
// Stop once n bytes have been copied
if (index === n) {
flag = false
// The extracted buffer may hold more than we needed, so put the leftover slice back at the front of the cache
this.len -= n
let r = buf.slice(i + 1)
if (r.length) {
this.buffers.unshift(r)
}
break
}
}
}
}
// If the cache is now empty, emit 'readable' after the next read.
// So if every read(n) takes exactly what is buffered, 'readable' fires after every refill,
// and at the end of the file read() returns null
if (this.len === 0) {
this.emittedReadable = true
}
if(this.len < this.highWaterMark){
// Top the buffer back up, unless a read is already in progress
if(!this.reading){
this.reading = true
// Perform the actual file read
this._read()
}
}
return buffer&&buffer.toString()
}
_read(){
if (typeof this.fd !== 'number') { // wait for the open event, then read
return this.once('open', () => { this._read() })
}
// Allocate a buffer for this read
let buffer = Buffer.alloc(this.highWaterMark)
fs.read(this.fd, buffer, 0, buffer.length, this.pos, (err, byteRead) => {
if (err) {
return this.emit('error', err)
}
if (byteRead > 0) {
// This read is finished, so a later read() call may start another one
this.reading = false
// Grow the cached byte count by what was just read
this.len += byteRead
// Move the file position forward
this.pos += byteRead
// Put the bytes just read into the cache
this.buffers.push(buffer.slice(0, byteRead))
// Emit 'readable' if the consumer is waiting for data
if (this.emittedReadable) {
this.emittedReadable = false
// The glass is full again: let the consumer drink
this.emit('readable')
}
} else {
// Nothing more to read: the file is exhausted
this.emit('end')
}
})
}
destory(){
// If the file was never opened, just emit close; otherwise close it first, then emit close
if (typeof this.fd !== 'number') { return this.emit('close') }
fs.close(this.fd,()=>{
this.emit('close')
})
}
open(){
// fd is the numeric file descriptor for this.path; 0-2 are taken by stdin, stdout and stderr, so it usually starts at 3
fs.open(this.path,this.flags,(err,fd)=>{
// The file may not exist, so handle the error
if(err){
// If autoClose is set, clean up
if(this.autoClose){
// Destroy: close the file and emit close
this.destory()
}
// An error event is raised if there is an error
this.emit('error',err)
return
}
// Save the file descriptor
this.fd = fd
// The open event is raised when the file is successfully opened
this.emit('open', this.fd)
})
}
}
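The loop in `read(n)` that copies n bytes out of `this.buffers` is the heart of this design. Here it is isolated as a pure function for illustration; `takeBytes` is hypothetical and uses `Buffer#copy` instead of a byte-by-byte loop, but follows the same rule: take whole chunks from the front, and put any leftover slice back.

```javascript
// Hypothetical helper mirroring the loop in read(n):
// copy n bytes out of an array of Buffers, putting any leftover slice back in front
function takeBytes(buffers, n) {
  const total = buffers.reduce((sum, b) => sum + b.length, 0)
  if (n <= 0 || n > total) return null // not enough buffered yet
  const out = Buffer.alloc(n)
  let index = 0
  while (index < n) {
    const buf = buffers.shift()
    const needed = n - index
    if (buf.length <= needed) {
      buf.copy(out, index) // take the whole chunk
      index += buf.length
    } else {
      buf.copy(out, index, 0, needed) // take only what is needed...
      buffers.unshift(buf.subarray(needed)) // ...and keep the rest for later
      index += needed
    }
  }
  return out
}

const cache = [Buffer.from('1234'), Buffer.from('5678')]
console.log(takeBytes(cache, 6).toString()) // 123456
console.log(cache.map(String)) // [ '78' ]
```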
- The difference between listening for 'readable' and 'data': with 'readable` you decide how many bytes to take from the buffer and how often, whereas 'data' empties the buffer on every read and hands you everything that was read.
- Consider the following example:
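let fs = require('fs')
let rs = fs.createReadStream('./a.txt')
rs.on('data', (data) => {
console.log(data)
})
// The idea: 'data' drains the buffer as it reads, so read(1) below would find nothing and return null.
// Note, however, that in current Node.js 'readable' takes precedence when both listeners are attached,
// and 'data' is then emitted only when read() is called
rs.on('readable', () => {
let result = rs.read(1)
console.log(result)
})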
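In current Node.js releases, when both 'readable' and 'data' listeners are attached, 'readable' takes precedence and 'data' is emitted only for the chunks that `read()` returns. This sketch checks that with an in-memory stream instead of a file; the array names are illustrative.

```javascript
const { Readable } = require('stream')

const rs = new Readable({ read() {} })
rs.push('abc')
rs.push(null)

const fromData = []
const fromReadable = []

rs.on('data', (chunk) => fromData.push(chunk.toString()))
rs.on('readable', () => {
  let chunk
  // With a 'readable' listener attached, each successful read() also emits 'data'
  while ((chunk = rs.read(1)) !== null) fromReadable.push(chunk.toString())
})
rs.on('end', () => {
  console.log(fromReadable, fromData) // [ 'a', 'b', 'c' ] [ 'a', 'b', 'c' ]
})
```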
Custom readable streams
Because createReadStream internally creates a ReadStream, and ReadStream implements the Readable interface by providing a _read() method, we can make our own readable stream by writing a class that extends Readable from the stream module and defining our own _read() on its prototype.
let { Readable } = require('stream')
class MyRead extends Readable{
// A readable stream needs a _read method; whatever it pushes is what consumers receive
_read() {
// push() puts data into the internal buffer, much like buffers.push in our own _read above
this.push('100')
// Pushing null means there is nothing left to read, so the stream ends
this.push(null)
}
}
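A slightly larger sketch of the same idea: a hypothetical `Counter` stream that pushes one number per `_read()` call and ends after `max`. `_read()` is called whenever the stream wants more data, so nothing is produced until someone consumes it.

```javascript
const { Readable } = require('stream')

// Hypothetical example: emits the numbers 1..max as strings, then ends
class Counter extends Readable {
  constructor(max) {
    super()
    this.current = 1
    this.max = max
  }

  _read() {
    if (this.current > this.max) {
      this.push(null) // nothing left: end the stream
    } else {
      this.push(String(this.current++)) // hand one chunk to the internal buffer
    }
  }
}

let output = ''
const counter = new Counter(3)
counter.on('data', (chunk) => { output += chunk })
counter.on('end', () => console.log(output)) // 123
```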
Writable streams
- If the file does not exist it is created; if it does exist, its content is cleared (with the default 'w' flag).
- write() returns false once the data waiting to be written reaches highWaterMark.
- The first write goes straight to the file; writes issued while it is in progress are put in a buffer and taken from it one by one.
Parameters (similar to readable streams)
- path: the path of the file to write
- options:
  - highWaterMark: the watermark, i.e. how many bytes can be buffered before write() returns false (default 16 KB)
  - flags: how to open the file (default 'w')
  - encoding: the encoding (default 'utf8')
  - start: the index position to start writing at
  - mode: the file permissions if the file is created (default 0o666)
  - autoClose: whether to close the file automatically when writing finishes (default true)
let WriteStream = require('./WriteStream') // the hand-written WriteStream implemented below
let ws = new WriteStream('./b.txt', {
highWaterMark: 3, // the default is 16K
flags: 'w', // w means write
autoClose: true,
start: 0,
encoding: 'utf8'
})
Methods
write
let fs = require('fs')
let ws = fs.createWriteStream('./d.txt', {
flags: 'w',
encoding: 'utf8',
start: 0,
// For writable streams, highWaterMark only decides when write() returns false and 'drain' fires
highWaterMark: 3 // the default is 16K
})
// write() returns a boolean. Think of ws eating steamed buns: once the buns in its mouth reach highWaterMark it returns false, otherwise true
// write() only accepts strings or Buffers
let flag = ws.write('1', 'utf8', () => {
console.log('written')
})
drain
// 'drain' fires only after the mouth was filled to highWaterMark (write() returned false) and everything buffered has been written
ws.on('drain', () => {
console.log('dry')
})
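To watch the return value of `write()` and the 'drain' event without a real file, here is a sketch with a hypothetical in-memory `Writable` whose writes complete asynchronously (via `setImmediate`). With `highWaterMark: 3` and one byte per write, the third write fills the buffer, so it returns false, and 'drain' fires once everything has been written.

```javascript
const { Writable } = require('stream')

// Hypothetical slow sink: finishes each write on a later turn of the event loop
const ws = new Writable({
  highWaterMark: 3, // bytes, the same watermark as the example above
  write(chunk, encoding, callback) {
    setImmediate(callback) // pretend the write takes a while
  }
})

// Each write adds 1 byte to the buffered length; the third reaches highWaterMark
const results = [ws.write('1'), ws.write('2'), ws.write('3')]
console.log(results) // [ true, true, false ]

let drains = 0
ws.on('drain', () => {
  drains++ // fires once everything buffered has been written
  console.log('drain')
})
```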
fs.write()
This is the lowest-level write method; the writable file stream calls it under the hood.
// wfd: file descriptor, usually obtained from fs.open
// buffer: the source buffer to take data from
// 0: the offset in buffer to start taking data from
// bytesRead: how many bytes to write (for example, what the preceding fs.read returned)
// index: the position in the file to write at
// bytesWritten: the number of bytes actually written
fs.write(wfd, buffer, 0, bytesRead, index, function (err, bytesWritten) {})
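Putting `fs.read` and `fs.write` together gives a manual copy loop that only ever holds one fixed-size buffer in memory. This sketch uses the synchronous variants `fs.readSync` and `fs.writeSync` (same argument order) to keep it short; `copyFile` and the temporary file names are hypothetical, and it ignores partial writes for brevity.

```javascript
const fs = require('fs')
const os = require('os')
const path = require('path')

// Hypothetical helper: copy src to dest through one fixed-size buffer
function copyFile(src, dest, BUFFER_SIZE = 4) {
  const rfd = fs.openSync(src, 'r')
  const wfd = fs.openSync(dest, 'w')
  const buffer = Buffer.alloc(BUFFER_SIZE)
  let index = 0 // same position in both files
  try {
    while (true) {
      const bytesRead = fs.readSync(rfd, buffer, 0, BUFFER_SIZE, index)
      if (bytesRead === 0) break // source exhausted
      // Write exactly the bytes just read, at the same position in dest
      fs.writeSync(wfd, buffer, 0, bytesRead, index)
      index += bytesRead
    }
  } finally {
    fs.closeSync(rfd)
    fs.closeSync(wfd)
  }
}

const src = path.join(os.tmpdir(), 'stream-demo-src.txt')
const dest = path.join(os.tmpdir(), 'stream-demo-dest.txt')
fs.writeFileSync(src, 'copy me, four bytes at a time')
copyFile(src, dest)
console.log(fs.readFileSync(dest, 'utf8')) // copy me, four bytes at a time
```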
Let's implement it ourselves
let fs = require('fs')
let EventEmitter = require('events')
// write() returns false once len reaches highWaterMark, telling the caller the cache is full
// Once everything in the cache has been written, needDrain decides whether to emit 'drain'
class WriteStream extends EventEmitter{
constructor(path,options = {}){
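super()
this.path = path
this.highWaterMark = options.highWaterMark || 64*1024
// A write stream opens its file for writing by default
this.flags = options.flags || 'w'
this.start = options.start || 0
this.pos = this.start
// Check for undefined rather than using ||, so an explicit false is respected
this.autoClose = options.autoClose !== undefined ? options.autoClose : true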
this.mode = options.mode || 0o666
// The default null is buffer
this.encoding = options.encoding || null
// Open the file
this.open()
// State needed for writing: the first write goes straight to the file,
// and writes issued while it is in progress are queued in the cache
this.writing = false
// Queue of pending writes
this.cache = []
this.callbackList = []
// Number of bytes waiting to be written
this.len = 0
// Whether 'drain' must be emitted once the cache is empty
this.needDrain = false
}
clearBuffer(){
// Take the next queued write from the front of the cache
let next = this.cache.shift()
if (next) {
// Write it, then come back here for the one after it
this._write(next.chunk, next.encoding, () => this.clearBuffer(), next.callback)
} else {
// The cache is empty: the next write() can go straight to the file
this.writing = false
// Emit 'drain' only if write() returned false at some point
if (this.needDrain) {
this.needDrain = false
this.emit('drain')
}
}
}
_write(chunk, encoding, clearBuffer, callback){
// write() is called synchronously, so the file may not be open yet
if (typeof this.fd !== 'number') {
// Register a one-time listener that retries once 'open' is emitted
return this.once('open', () => this._write(chunk, encoding, clearBuffer, callback))
}
fs.write(this.fd, chunk, 0, chunk.length, this.pos, (err, byteWrite) => {
if (err) {
return this.emit('error', err)
}
this.pos += byteWrite
// These bytes are no longer waiting in memory
this.len -= byteWrite
// Tell the caller this chunk has been written
if (callback) callback()
// Move on to the next queued write
clearBuffer()
})
}
// Write method
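write(chunk, encoding = this.encoding, callback){
// Allow write(chunk, callback)
if (typeof encoding === 'function') {
callback = encoding
encoding = this.encoding
}
// chunk must be a string or a Buffer; normalize it to a Buffer
chunk = Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk, encoding || 'utf8')
// Track how many bytes are waiting to be written
this.len += chunk.length
let ret = this.len < this.highWaterMark
if (!ret) { // the cache is full: emit 'drain' once it has been emptied
this.needDrain = true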
}
// A write is already in progress: queue this one in memory
if(this.writing){
this.cache.push({
chunk,
encoding,
callback
})
}else{
// Nothing in progress: write straight to the file
this.writing = true
// _write performs the actual fs.write
this._write(chunk,encoding,()=>this.clearBuffer(),callback)
}
// false tells the caller to stop writing until 'drain' fires
return ret
}
destory(){
// If the file was never opened, just emit close; otherwise close it first, then emit close
if (typeof this.fd !== 'number') { return this.emit('close') }
fs.close(this.fd,()=>{
this.emit('close')
})
}
open(){
// fd is the numeric file descriptor for this.path; 0-2 are taken by stdin, stdout and stderr, so it usually starts at 3
fs.open(this.path,this.flags,(err,fd)=>{
// Opening can fail (for example, a missing directory), so handle the error
if(err){
// If autoClose is set, clean up
if(this.autoClose){
// Destroy: close the file and emit close
this.destory()
}
// An error event is raised if there is an error
this.emit('error',err)
return
}
// Save the file descriptor
this.fd = fd
// The open event is raised when the file is successfully opened
this.emit('open', this.fd)
})
}
}
module.exports = WriteStream
Custom writable streams
Because createWriteStream internally creates a WriteStream, and WriteStream implements the Writable interface by providing a _write() method, we can make our own writable stream by writing a class that extends Writable from the stream module and defining our own _write() on its prototype.
let { Writable } = require('stream')
class MyWrite extends Writable{
_write(chunk,encoding,callback){
// chunk is the data passed to write()
console.log(chunk)
// Calling callback plays the role of clearBuffer above: until it is called, the stream will not take the next write from its buffer
callback()
}
}
let write = new MyWrite()
write.write('1', 'utf8', () => {
console.log('ok')
})
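Here is a sketch of a more useful custom writable stream: a hypothetical `MemoryWriter` that keeps every chunk in memory. `end()` writes a final chunk and then emits 'finish' once all writes are done.

```javascript
const { Writable } = require('stream')

// Hypothetical sink that keeps every chunk in memory
class MemoryWriter extends Writable {
  constructor() {
    super()
    this.chunks = []
  }

  _write(chunk, encoding, callback) {
    this.chunks.push(chunk) // chunk arrives as a Buffer by default
    callback() // signal "done", so the next buffered chunk can be written
  }
}

const writer = new MemoryWriter()
writer.on('finish', () => {
  console.log(Buffer.concat(writer.chunks).toString()) // hello world
})
writer.write('hello ')
writer.end('world') // write a final chunk, then finish
```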
pipe
pipe is a method of readable streams. It is covered here because it builds on both kinds of stream: it moves data from a readable stream into a writable stream. If you copy data with plain read and write calls, reading outpaces writing and memory use keeps growing; with pipe, memory use stays within the configured highWaterMark.
Usage
let fs = require('fs')
// pipe() controls the rate automatically
let rs = fs.createReadStream('./d.txt', {
highWaterMark: 4
})
let ws = fs.createWriteStream('./e.txt', {
highWaterMark: 1
})
// Internally, on('data') reads the data and writes it with ws.write
// ws.write returns a boolean
// If it returns false, rs.pause() is called to pause reading
// Once the writable stream has caught up it emits 'drain', and rs is resumed
rs.pipe(ws) // controls the rate so memory is not flooded
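In modern Node, `stream.pipeline()` is often preferred over a bare `pipe()`, because it applies the same backpressure but also forwards errors and cleans up every stream in the chain. A sketch with in-memory streams; `sink` and `received` are illustrative names.

```javascript
const { pipeline, Readable, Writable } = require('stream')

const received = []
// Hypothetical in-memory destination that records each chunk
const sink = new Writable({
  write(chunk, encoding, callback) {
    received.push(chunk.toString())
    callback()
  }
})

// pipeline() pipes source -> sink with the same backpressure as pipe(),
// and also forwards errors and cleans up both streams
pipeline(Readable.from(['a', 'b', 'c']), sink, (err) => {
  if (err) {
    console.error('pipeline failed', err)
  } else {
    console.log(received.join('')) // abc
  }
})
```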
Implement it yourself
let fs = require('fs')
// These are the hand-written ReadStream and WriteStream from above
let ReadStream = require('./ReadStream')
let WriteStream = require('./WriteStream')
// Reading and writing without pipe reads faster than it writes, which wastes memory
ReadStream.prototype.pipe = function(dest){
this.on('data',(data)=>{
let flag = dest.write(data)
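// Stop reading while the writer's mouth is full
if (!flag) {
this.pause()
}
})
// Once the writer has finished eating, resume reading
dest.on('drain', () => {
this.resume()
})
this.on('end', () => {
// Our ReadStream already closes its own file when it ends.
// Simplification: this assumes every queued write has finished by now
// Flush the written data to disk, then close the destination
fs.fsync(dest.fd, () => {
dest.destory()
})
})
return dest
}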
Duplex streams
A duplex stream implements both the readable and the writable interface on the same object, as if it inherited from both. Importantly, the readable and writable sides of a duplex stream operate completely independently of each other; it simply combines both capabilities in one object.
let { Duplex } = require('stream')
// A duplex stream is both readable and writable
class MyDuplex extends Duplex{
_read(){
this.push('hello')
this.push(null)
}
_write(chunk,encoding,clearBuffer){
console.log(chunk)
clearBuffer()
}
}
let myDuplex = new MyDuplex()
// process.stdin is a readable stream on Node's process object that receives command-line input
// process.stdout is a writable stream on Node's process object that prints to the command line
// So the command line prints hello, and whatever we type is logged by _write as a Buffer
process.stdin.pipe(myDuplex).pipe(process.stdout)
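Here is a sketch that makes the independence visible without stdin: whatever is written to this hypothetical duplex is recorded by its writable side, while its readable side produces its own, unrelated data.

```javascript
const { Duplex } = require('stream')

// Hypothetical duplex: its readable side and writable side share nothing
const written = []
const duplex = new Duplex({
  read() {
    this.push('from the readable side')
    this.push(null)
  },
  write(chunk, encoding, callback) {
    written.push(chunk.toString()) // the writable side just records input
    callback()
  }
})

let readOut = ''
duplex.on('data', (chunk) => { readOut += chunk })
duplex.write('to the writable side')
duplex.end()

process.on('exit', () => {
  console.log(readOut) // from the readable side
  console.log(written) // [ 'to the writable side' ]
})
```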
Transform streams
The output of a transform stream is computed from its input. We don't implement _read or _write for it; we implement a single _transform method that combines the two. _transform receives what is written, and we can call push() inside it to produce output on the readable side.
let { Transform } = require('stream')
class MyTransform extends Transform{
_transform(chunk,encoding,callback){
console.log(chunk.toString().toUpperCase())
callback()
}
}
let myTransform = new MyTransform()
class MyTransform2 extends Transform{
_transform(chunk,encoding,callback){
console.log(chunk.toString().toUpperCase())
this.push('1')
// this.push(null)
callback()
}
}
let myTransform2 = new MyTransform2()
// myTransform2 acts as the writable side: each chunk typed in runs _transform, which logs it in uppercase and pushes '1' to its readable side, piped into myTransform
// _transform runs when data is written through the pipe, so the chunk myTransform receives is whatever the previous stream pushed
process.stdin.pipe(myTransform2).pipe(myTransform)
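Unlike the examples above, which only log, a transform usually pushes its result so that the next stream in the pipe receives it. A sketch of a hypothetical `UpperCase` transform fed from an in-memory source:

```javascript
const { Transform, Readable } = require('stream')

// Hypothetical transform: upper-cases whatever is written to it
class UpperCase extends Transform {
  _transform(chunk, encoding, callback) {
    // push() sends the result to the readable side, like MyTransform2's push
    this.push(chunk.toString().toUpperCase())
    callback()
  }
}

let result = ''
Readable.from(['hello ', 'streams'])
  .pipe(new UpperCase())
  .on('data', (chunk) => { result += chunk })
  .on('end', () => console.log(result)) // HELLO STREAMS
```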
Conclusion
Readable streams
- In flowing mode, a readable stream automatically reads data from the underlying system and provides it to the application as quickly as possible through EventEmitter events.
- In paused mode, stream.read() must be called explicitly to read chunks from the stream.
- All readable streams start in paused mode and can be switched to flowing mode in one of three ways:
  - Listen for the 'data' event
  - Call the stream.resume() method
  - Call the stream.pipe() method to send the data to a Writable
- A readable stream can be switched back to paused mode by:
  - If there are no pipe destinations, calling stream.pause().
  - If there are pipe destinations, removing the 'data' event handlers and calling stream.unpipe() to remove all pipe destinations.
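The current mode can be inspected through the `readable.readableFlowing` property: null before any consumer is attached, true in flowing mode and false after `pause()`. A short sketch:

```javascript
const { Readable } = require('stream')

const rs = new Readable({ read() {} })
const states = [rs.readableFlowing] // null: no mechanism for consuming data yet

rs.on('data', () => {}) // attaching 'data' switches to flowing mode
states.push(rs.readableFlowing) // true

rs.pause() // back to paused mode
states.push(rs.readableFlowing) // false

console.log(states) // [ null, true, false ]
```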
Writable streams
- Note that 'drain' is not emitted just because the mouth is full: it fires only after all the steamed buns in the mouth and on the ground (in the cache) have been eaten.
- The first write goes straight to the file; later ones are taken from the cache one at a time.
Duplex streams
- A duplex stream is simply a readable and a writable stream in one object: it can be used as either, and its readable and writable sides are isolated from each other.
Transform streams
- A transform stream turns input into output; _transform runs only when a write happens. Unlike a duplex stream, its readable and writable sides are connected.
Okay, that's it. You're well on your way to becoming a master of streams!