1. Basic concepts
1.1. Historical evolution of streams
Streams are not a Node.js-specific concept. They were introduced decades ago in the Unix operating system, where programs can interact with each other through the pipe operator (|).
On Unix-like systems such as macOS and Linux, the pipe operator (|) turns the output of the process on its left into the input of the process on its right.
In Node.js, the traditional readFile reads a file from beginning to end into memory, and the file is handed over only after all of its contents have been read.
There are two drawbacks to this approach (contrast the sketch below):
- Memory: the entire file is held in memory at once
- Time: processing cannot begin until the whole payload has finished loading
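As a rough illustration (the file path and sizes are hypothetical), a stream lets us start consuming data chunk by chunk instead of waiting for the whole file to load:

const fs = require('fs');

// Whole-file read: the callback fires only after every byte is in memory.
fs.readFile('./big.file', (err, data) => {
  if (err) throw err;
  console.log(`readFile: got all ${data.length} bytes at once`);
});

// Streaming read: chunks are handed over as soon as they arrive, so memory
// usage stays around the highWaterMark (64KB by default for fs streams).
fs.createReadStream('./big.file')
  .on('data', (chunk) => console.log(`stream: got a ${chunk.length}-byte chunk`))
  .on('end', () => console.log('stream: done'));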
To solve these problems, Node.js borrowed and implemented the concept of streams. There are four types of streams in Node.js, all of which are instances of EventEmitter:
- Readable stream
- Writable stream
- Duplex stream (readable and writable)
- Transform stream
To learn this area step by step and understand the concept of streams in Node.js, and because the source code is relatively complex, I decided to start with readable streams.
1.2. What is a Stream?
A stream is an abstract data structure, a collection of data, in which the stored data can only be of the following types (when objectMode === false):
- string
- Buffer
We can think of a stream as a collection of data flowing like liquid. We first keep the liquid in a container (the stream's internal buffer, a BufferList); when the corresponding event fires, we pour the liquid into the pipe and notify the other side of the pipe to bring their own container to collect it.
1.3. What is a Readable Stream?
A readable stream is a type of stream that has two reading modes and three states.
Two reading modes:
- Flowing mode: data is read from the underlying system and delivered to the registered event handlers via the EventEmitter as quickly as possible
- Paused mode: no data is read in this mode; the stream.read() method must be called explicitly to read data from the stream
Three states:
- readableFlowing === null: no data is produced; calling stream.pipe() or stream.resume() switches the state to true, and the stream starts producing data and actively emitting events
- readableFlowing === false: the flow of data is suspended, but data production is not, so a data backlog builds up
- readableFlowing === true: data is produced and consumed normally
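A minimal sketch of the three states, observed through the public readableFlowing property:

const { Readable } = require('stream');

const readable = new Readable({ read() {} });
console.log(readable.readableFlowing); // null: no consumption mechanism attached yet

readable.on('data', (chunk) => console.log('chunk:', chunk.toString()));
console.log(readable.readableFlowing); // true: adding a 'data' listener switched the stream to flowing mode

readable.pause();
console.log(readable.readableFlowing); // false: the flow is suspended; pushed data now piles up in the buffer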
2. Fundamentals
2.1. Internal State Definition (ReadableState)
ReadableState
_readableState: ReadableState {
  objectMode: false, // operating on data other than string, Buffer and null requires this mode to be turned on
  highWaterMark: 16384, // high-water mark, 1024 * 16 = 16KB by default; _read() stops being called to fill the buffer once this limit is exceeded
  buffer: BufferList { head: null, tail: null, length: 0 }, // buffer linked list, used to store the data
  length: 0, // size of the buffered stream data; when objectMode is true it equals buffer.length (the number of buffered objects)
  pipes: [], // all the writable streams piped from this readable stream
  flowing: null, // the flowing state: can be null, false or true
  ended: false, // all data has been consumed
  endEmitted: false, // the 'end' event has been emitted
  reading: false, // whether data is currently being read
  constructed: true, // the stream cannot be destroyed before it has finished constructing or has failed
  sync: true, // whether the 'readable'/'data' event is emitted synchronously or deferred to the next tick
  needReadable: false, // whether a 'readable' event needs to be emitted
  emittedReadable: false, // the 'readable' event has been emitted
  readableListening: false, // whether there is a 'readable' listener
  resumeScheduled: false, // whether the resume method has been scheduled
  errorEmitted: false, // the 'error' event has been emitted
  emitClose: true, // whether to emit a 'close' event when the stream is destroyed
  autoDestroy: true, // auto-destroy: destroy() is called after the 'end' event
  destroyed: false, // whether the stream has been destroyed
  errored: null, // whether the stream is reporting an error
  closed: false, // whether the stream is closed
  closeEmitted: false, // the 'close' event has been emitted
  defaultEncoding: 'utf8', // default character encoding
  awaitDrainWriters: null, // reference to the writer(s) waiting for the 'drain' event; the type is null, Writable, or Set
  multiAwaitDrain: false, // whether more than one writer is waiting for the 'drain' event
  readingMore: false, // whether more data can be read
  dataEmitted: false, // data has been emitted
  decoder: null, // decoder
  encoding: null, // encoding
  [Symbol(kPaused)]: null
},
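The dump above can be reproduced by printing the _readableState of a freshly created stream; note that _readableState is internal, so the exact fields vary across Node.js versions:

const { Readable } = require('stream');

// _readableState is internal API; inspect it only for learning purposes.
const readable = new Readable({ read() {} });
console.log(readable._readableState);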
2.2. Internal Data Store implementation (BufferList)
BufferList is the container for a stream's internal data. It is designed as a linked list with three attributes: head, tail, and length.
Each node of the BufferList is represented here as a BufferNode, and the type of the data it holds depends on objectMode.
This data structure retrieves the data at the head faster than Array.prototype.shift().
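A minimal sketch (not Node's actual implementation) of why a linked list makes removing the head an O(1) operation, whereas Array.prototype.shift() has to move every remaining element forward:

// A hypothetical, simplified BufferList: a singly linked list with head/tail/length.
class SimpleBufferList {
  constructor() {
    this.head = null;
    this.tail = null;
    this.length = 0;
  }

  push(data) { // append a new node at the tail
    const node = { data, next: null };
    if (this.tail) this.tail.next = node;
    else this.head = node;
    this.tail = node;
    ++this.length;
  }

  shift() { // detach the head node: O(1), no element shifting
    if (this.head === null) return undefined;
    const { data } = this.head;
    this.head = this.head.next;
    if (this.head === null) this.tail = null;
    --this.length;
    return data;
  }
}

const list = new SimpleBufferList();
list.push(Buffer.from('ab'));
list.push(Buffer.from('cd'));
console.log(list.shift()); // <Buffer 61 62>
console.log(list.length);  // 1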
2.2.1. Data store type
If objectMode === true:
data can be of any type; whatever is pushed is stored unchanged
objectMode=true
const Stream = require('stream');
const readableStream = new Stream.Readable({
  objectMode: true,
  read() {},
});

readableStream.push({ name: 'lisa' });
console.log(readableStream._readableState.buffer.tail);
readableStream.push(true);
console.log(readableStream._readableState.buffer.tail);
readableStream.push('lisa');
console.log(readableStream._readableState.buffer.tail);
readableStream.push('Awesome!');
console.log(readableStream._readableState.buffer.tail);
readableStream.push(() => {});
console.log(readableStream._readableState.buffer.tail);
readableStream.push(Symbol(1));
console.log(readableStream._readableState.buffer.tail);
readableStream.push(BigInt(123));
console.log(readableStream._readableState.buffer.tail);
Running results: each pushed value is stored as-is in the data field of the buffer's tail node.
If objectMode === false:
data can only be a string, Buffer, or Uint8Array
objectMode=false
const Stream = require('stream');
const readableStream = new Stream.Readable({
  objectMode: false,
  read() {},
});

readableStream.push({ name: 'lisa' });
Running results: pushing an object in non-object mode throws an ERR_INVALID_ARG_TYPE TypeError, since the chunk must be a string, Buffer, or Uint8Array.
2.2.2. Data storage structure
We create a readable stream in the Node.js REPL to observe changes in the buffer:
Of course, we need to implement the _read method before pushing data, or pass a read method in the constructor's options:
const Stream = require('stream');
const readableStream = new Stream.Readable();
readableStream._read = function(size) {};
or
const Stream = require('stream');
const readableStream = new Stream.Readable({
  read(size) {},
});
After readableStream.push('abc'), the buffer looks like this:
As the stored data shows, both head and tail now hold the ASCII bytes of the string 'abc' as a Buffer, and length indicates the number of buffered items rather than the byte size of their contents.
2.2.3. Relevant APIs
Printing all the methods of BufferList gives:
Except for join, which serializes a BufferList into a string, they are all data access operations.
I won't cover all of them here, but will focus on consume, _getString, and _getBuffer.
2.2.3.1. consume
Source address: BufferList.consume
consume
// Consumes a specified amount of bytes or characters from the buffered data.
consume(n, hasStrings) {
  const data = this.head.data;
  if (n < data.length) {
    // `slice` is the same for buffers and strings.
    const slice = data.slice(0, n);
    this.head.data = data.slice(n);
    return slice;
  }
  if (n === data.length) {
    // First chunk is a perfect match.
    return this.shift();
  }
  // Result spans more than one buffer.
  return hasStrings ? this._getString(n) : this._getBuffer(n);
}
The code has three branches:
- If the number of bytes consumed is less than the length of the data stored in the head node, the first n bytes of the head node's data are taken and the head node's data is set to the remaining slice
- If the amount consumed exactly equals the length of the head node's data, that node's data is returned directly
- If the amount consumed is greater than the head node's length, the second argument determines whether the BufferList is currently storing strings or Buffers, and the result is assembled across multiple nodes accordingly (see the sketch below)
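A small demonstration through the public read() API, which calls consume() internally when the requested length is smaller than the buffered length; here 3 bytes span both buffered nodes, so the _getBuffer branch is taken:

const { Readable } = require('stream');

const readable = new Readable({ read() {} });
readable.push('ab'); // first BufferNode:  <Buffer 61 62>
readable.push('cd'); // second BufferNode: <Buffer 63 64>

// 3 bytes span both nodes, so consume() falls through to _getBuffer(3).
console.log(readable.read(3)); // <Buffer 61 62 63>
// One byte remains in the buffer.
console.log(readable.read(1)); // <Buffer 64>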
2.2.3.2. _getBuffer
Source address: BufferList._getBuffer
_getBuffer
// Consumes a specified amount of bytes from the buffered data.
_getBuffer(n) {
  const ret = Buffer.allocUnsafe(n);
  const retLen = n;
  let p = this.head;
  let c = 0;
  do {
    const buf = p.data;
    if (n > buf.length) {
      TypedArrayPrototypeSet(ret, buf, retLen - n);
      n -= buf.length;
    } else {
      if (n === buf.length) {
        TypedArrayPrototypeSet(ret, buf, retLen - n);
        ++c;
        if (p.next)
          this.head = p.next;
        else
          this.head = this.tail = null;
      } else {
        TypedArrayPrototypeSet(ret,
                               new Uint8Array(buf.buffer, buf.byteOffset, n),
                               retLen - n);
        this.head = p;
        p.data = buf.slice(n);
      }
      break;
    }
    ++c;
  } while ((p = p.next) !== null);
  this.length -= c;
  return ret;
}
Basically, it loops over the linked list and allocates a new Buffer to hold the returned data.
Data is copied from the head node onwards into the new Buffer until it reaches a node whose data is greater than or equal to the length still needed; that node is then either consumed entirely or sliced.
If the end of the list is reached before the desired length, the partially filled new Buffer is returned.
2.2.3.3. _getString
Source address: BufferList._getString
_getString
// Consumes a specified amount of characters from the buffered data.
_getString(n) {
  let ret = '';
  let p = this.head;
  let c = 0;
  do {
    const str = p.data;
    if (n > str.length) {
      ret += str;
      n -= str.length;
    } else {
      if (n === str.length) {
        ret += str;
        ++c;
        if (p.next)
          this.head = p.next;
        else
          this.head = this.tail = null;
      } else {
        ret += StringPrototypeSlice(str, 0, n);
        this.head = p;
        p.data = StringPrototypeSlice(str, n);
      }
      break;
    }
    ++c;
  } while ((p = p.next) !== null);
  this.length -= c;
  return ret;
}
Operating on a string mirrors operating on a Buffer: data is read from the head of the list in a loop; only the way the data is copied and stored differs. In addition, _getString returns a string.
2.3. Why are Readable Streams instances of EventEmitter?
To answer that, it is important to understand the publish-subscribe pattern. Publish-subscribe plays an important role in most APIs, from Promise to Redux, and higher-level APIs built on it are everywhere.
Its advantage is that event-related callbacks can be stored in a queue and the other party notified at some later point to process the data, achieving a separation of concerns: the producer just produces data and notifies the consumer, while the consumer merely handles the corresponding events and their data. Node.js streams fit this pattern exactly.
How do Node.js streams create instances based on EventEmitter?
The source code is here: stream/legacy
legacy
function Stream(opts) {
  EE.call(this, opts);
}

ObjectSetPrototypeOf(Stream.prototype, EE.prototype);
ObjectSetPrototypeOf(Stream, EE);
Then in the readable stream source there are several lines of code:
The source code for this is available here: readable
readable
ObjectSetPrototypeOf(Readable.prototype, Stream.prototype);
ObjectSetPrototypeOf(Readable, Stream);
First, Stream's prototype object inherits from EventEmitter's prototype, so that all Stream instances can access the methods on EventEmitter.
ObjectSetPrototypeOf(Stream, EE) additionally inherits EventEmitter's static methods, and in the Stream constructor EE.call(this, opts) inherits all of EventEmitter's instance properties. Readable then uses the same technique against Stream for both prototype and static inheritance, so we obtain:
Readable.prototype.__proto__ === Stream.prototype;
Stream.prototype.__proto__ === EE.prototype
Therefore:
Readable.prototype.__proto__.__proto__ === EE.prototype
Therefore, by tracing a readable stream's prototype chain we reach EventEmitter's prototype, which realizes the inheritance from EventEmitter.
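We can verify the chain directly:

const { Readable } = require('stream');
const EE = require('events');

const readable = new Readable({ read() {} });

// A readable stream is an EventEmitter instance...
console.log(readable instanceof EE); // true

// ...because the prototype chain is Readable -> Stream -> EventEmitter.
console.log(Object.getPrototypeOf(Object.getPrototypeOf(Readable.prototype)) === EE.prototype); // true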
2.4. Implementation of the relevant APIs
The APIs are presented in the order in which they appear in the source code, and only the core implementations are interpreted.
Note: only functions declared in the Node.js readable stream source are interpreted here, not imported helper definitions, and to save space not all of the code is copied.
Readable.prototype
Stream {
destroy: [Function: destroy],
_undestroy: [Function: undestroy],
_destroy: [Function (anonymous)],
push: [Function (anonymous)],
unshift: [Function (anonymous)],
isPaused: [Function (anonymous)],
setEncoding: [Function (anonymous)],
read: [Function (anonymous)],
_read: [Function (anonymous)],
pipe: [Function (anonymous)],
unpipe: [Function (anonymous)],
on: [Function (anonymous)],
addListener: [Function (anonymous)],
removeListener: [Function (anonymous)],
off: [Function (anonymous)],
removeAllListeners: [Function (anonymous)],
resume: [Function (anonymous)],
pause: [Function (anonymous)],
wrap: [Function (anonymous)],
iterator: [Function (anonymous)],
[Symbol(nodejs.rejection)]: [Function (anonymous)],
[Symbol(Symbol.asyncIterator)]: [Function (anonymous)]
}
2.4.1. push
readable.push
Readable.prototype.push = function(chunk, encoding) {
  return readableAddChunk(this, chunk, encoding, false);
};
The main purpose of the push method is to pass a chunk of data to the downstream pipe by firing a 'data' event, or to store the data in the stream's own buffer.
The following is related pseudocode that shows only the main flow:
readable.push
function readableAddChunk(stream, chunk, encoding, addToFront) {
  const state = stream._readableState;
  if (chunk === null) { // pushing null signals EOF; no more data can be written afterwards
    state.reading = false;
    onEofChunk(stream, state);
  } else if (!state.objectMode) { // if not in object mode
    if (typeof chunk === 'string') {
      chunk = Buffer.from(chunk);
    } else if (chunk instanceof Buffer) { // if it is a Buffer
      // handle the encoding
    } else if (Stream._isUint8Array(chunk)) {
      chunk = Stream._uint8ArrayToBuffer(chunk);
    } else if (chunk != null) {
      err = new ERR_INVALID_ARG_TYPE('chunk', ['string', 'Buffer', 'Uint8Array'], chunk);
    }
  }

  if (state.objectMode || (chunk && chunk.length > 0)) { // object mode, or chunk is a non-empty Buffer
    // several data insertion paths are omitted here
    addChunk(stream, state, chunk, true);
  }
}

function addChunk(stream, state, chunk, addToFront) {
  if (state.flowing && state.length === 0 && !state.sync &&
      stream.listenerCount('data') > 0) { // flowing mode with subscribers listening for 'data'
    stream.emit('data', chunk);
  } else { // otherwise save the data to the buffer
    state.length += state.objectMode ? 1 : chunk.length;
    if (addToFront) {
      state.buffer.unshift(chunk);
    } else {
      state.buffer.push(chunk);
    }
  }
  maybeReadMore(stream, state); // try to read more data
}
The push operation branches mainly on objectMode, and different types lead to different handling of the incoming data:
- objectMode === false: the chunk is converted to a Buffer
- objectMode === true: the data is passed downstream unchanged
The first branch of addChunk mainly handles the case where the Readable is in flowing mode, 'data' listeners exist, and the buffer is empty.
In that case the data is passed straight through to subscribers of the 'data' event; otherwise it is stored in the buffer (see the sketch below).
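A small sketch of both paths; note that timing matters, since the direct-emit branch requires the stream to already be flowing:

const { Readable } = require('stream');

// Paused mode: pushed data lands in the internal buffer.
const paused = new Readable({ read() {} });
paused.push('buffered');
console.log(paused._readableState.length); // 8 bytes sitting in the BufferList

// Flowing mode with a 'data' subscriber: pushed data reaches the listener.
const flowing = new Readable({ read() {} });
flowing.on('data', (chunk) => console.log('received:', chunk.toString()));
setImmediate(() => {
  // By now the stream has started flowing, so this chunk is emitted
  // directly instead of being buffered first.
  flowing.push('straight through');
});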
2.4.2. read
Setting aside boundary conditions and stream-state checks, this method mainly performs two operations:
- Call the user-implemented _read method and process its result
- Read data from the buffer and fire the 'data' event
readable.read
// If the requested length n is greater than the HWM, the HWM is recalculated
if (n > state.highWaterMark) {
  state.highWaterMark = computeNewHighWaterMark(n);
}

// Invoke the user-implemented _read method
try {
  const result = this._read(state.highWaterMark);
  if (result != null) {
    const then = result.then;
    if (typeof then === 'function') {
      then.call(
        result,
        nop,
        function(err) {
          errorOrDestroy(this, err);
        });
    }
  }
} catch (err) {
  errorOrDestroy(this, err);
}
If the user-implemented _read method returns a promise, the promise's then method is called with a success and a failure callback, which makes exception handling straightforward.
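This is why an async read implementation works, as in the sketch below: a rejection from the returned promise ends up in the failure callback and destroys the stream with an 'error' event:

const { Readable } = require('stream');

const readable = new Readable({
  async read() {
    // An async function returns a promise; throwing here rejects it,
    // and the failure callback routes the error to errorOrDestroy.
    throw new Error('underlying source failed');
  },
});

readable.on('error', (err) => console.error('stream errored:', err.message));
readable.on('data', () => {}); // start the flow so _read gets called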
The core code for the read method is as follows:
readable.read
function fromList(n, state) {
  // nothing buffered.
  if (state.length === 0)
    return null;

  let ret;
  if (state.objectMode)
    ret = state.buffer.shift();
  else if (!n || n >= state.length) { // handle n being empty or not smaller than the buffered length
    // Read it all, truncate the list.
    if (state.decoder) // with a decoder, serialize the result into a string
      ret = state.buffer.join('');
    else if (state.buffer.length === 1) // only one node: return the head node's data
      ret = state.buffer.first();
    else // concatenate all the data into a single Buffer
      ret = state.buffer.concat(state.length);
    state.buffer.clear(); // clear the buffer
  } else {
    // handle the case where the requested length is smaller than the buffered length
    ret = state.buffer.consume(n, state.decoder);
  }
  return ret;
}
2.4.3. _read
This is the method the user must implement when initializing a Readable stream. push can be called inside it to supply data; the read method keeps triggering it until null is pushed, which stops writing to the stream.
Sample code:
readable._read
const Stream = require('stream');
const readableStream = new Stream.Readable({
  read(hwm) {
    this.push(String.fromCharCode(this.currentCharCode++));
    if (this.currentCharCode > 122) {
      this.push(null);
    }
  },
});
readableStream.currentCharCode = 97;
readableStream.pipe(process.stdout);
// abcdefghijklmnopqrstuvwxyz%
2.4.4. pipe (important)
Binds one or more writable streams to the current readable stream and switches the readable stream into flowing mode.
This method registers a number of event listeners that I won't cover here:
readable.pipe
Readable.prototype.pipe = function(dest, pipeOpts) {
  const src = this;
  const state = this._readableState;

  state.pipes.push(dest); // collect the writable streams piped to

  src.on('data', ondata);
  function ondata(chunk) {
    const ret = dest.write(chunk);
    if (ret === false) {
      pause();
    }
  }

  // Tell the dest that it's being piped to.
  dest.emit('pipe', src);

  // Start the flow if the stream is in paused mode
  if (dest.writableNeedDrain === true) {
    if (state.flowing) {
      pause();
    }
  } else if (!state.flowing) {
    src.resume();
  }

  return dest;
};
pipe behaves much like the Linux pipe operator '|': it turns the output on the left into the input on the right. The method collects the writable streams for bookkeeping, and when the readable stream emits a 'data' event,
the chunk is written to the writable stream, implementing pipeline-style data transfer. It also automatically switches a readable stream in paused mode into flowing mode.
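A sketch of pipe with backpressure; the slow writable stream below is hypothetical and its high-water mark deliberately tiny, so dest.write() quickly returns false, pausing the source until 'drain' fires:

const { Readable, Writable } = require('stream');

let i = 0;
const source = new Readable({
  read() {
    this.push(i < 5 ? `chunk ${i++}\n` : null); // five chunks, then EOF
  },
});

// A deliberately slow writable: each chunk takes 100ms to "process".
const slowDest = new Writable({
  highWaterMark: 4, // tiny HWM so write() returns false almost immediately
  write(chunk, encoding, callback) {
    setTimeout(() => {
      process.stdout.write(chunk);
      callback(); // completion may fire 'drain', which resumes the source
    }, 100);
  },
});

source.pipe(slowDest); // pipe handles the pause/resume dance for us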
2.4.5. resume
Switches the stream from paused mode to flowing mode. This method has no effect if a 'readable' listener is set.
readable.resume
Readable.prototype.resume = function() {
  const state = this._readableState;
  if (!state.flowing) {
    // Whether we actually enter flowing mode depends on whether a 'readable' listener is set
    state.flowing = !state.readableListening;
    resume(this, state);
  }
};

function resume(stream, state) {
  if (!state.resumeScheduled) { // make sure resume_ is only scheduled once per tick
    state.resumeScheduled = true;
    process.nextTick(resume_, stream, state);
  }
}

function resume_(stream, state) {
  if (!state.reading) {
    stream.read(0);
  }
  state.resumeScheduled = false;
  stream.emit('resume');
  flow(stream);
}

function flow(stream) { // while the stream is flowing, keep reading from the buffer until it is empty
  const state = stream._readableState;
  while (state.flowing && stream.read() !== null);
  // Since read() is called here, a stream that also has a 'readable' listener may call
  // read() inside that callback too, making consumption interleaved (the data itself is
  // not affected; it only means read() is also called in the 'readable' event callback)
}
2.4.6. pause
Switches the stream from flowing mode to paused mode: the 'data' event stops firing, and all incoming data is saved to the buffer.
readable.pause
Readable.prototype.pause = function() {
  if (this._readableState.flowing !== false) {
    debug('pause');
    this._readableState.flowing = false;
    this.emit('pause');
  }
  return this;
};
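A classic usage sketch combining pause and resume to throttle consumption (the input file path is hypothetical):

const fs = require('fs');

const readable = fs.createReadStream('./sample.txt');

readable.on('data', (chunk) => {
  console.log(`got ${chunk.length} bytes, pausing for 1 second`);
  readable.pause(); // 'data' stops firing; incoming data stays in the buffer
  setTimeout(() => readable.resume(), 1000); // switch back to flowing mode
});

readable.on('end', () => console.log('done'));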
2.5. Usage and working mechanism
Usage is as described in the BufferList section: create a Readable instance and implement its _read() method, or pass a read method in the constructor's options object.
2.5.1. Working mechanism
This only outlines the overall process and the conditions that trigger mode transitions for the Readable stream.
Among them:
- needReadable(true): set in paused mode when the buffered data is <= HWM and a 'readable' listener is bound, or when a read finds no data in the buffer or returns empty
- push: in flowing mode with an empty buffer, it fires the 'data' event directly; otherwise the data is saved to the buffer and the 'readable' event is emitted according to the needReadable state
- read: when read(0) is called and the buffered data has reached or exceeded the HWM, the 'readable' event is emitted; otherwise data is read from the buffer and the 'data' event is fired
- resume: with a 'readable' listener the method has no effect; otherwise it switches the stream from paused mode to flowing mode and drains the buffer
- The 'readable' event fires when it is bound and there is data in the buffer: when push adds data to a non-empty buffer with needReadable === true, or when read(0) finds the buffered data already at or above the HWM
3. Summary
- Node.js solves the memory and time problems by implementing its own streams, which read data into memory piece by piece for the consumer to consume
- Streams are not a Node.js-specific concept; they were introduced in the Unix operating system decades ago
- There are four types of streams: readable, writable, duplex, and transform; all of them inherit the instance and static methods of EventEmitter and are EE instances
- The underlying container of a stream is a BufferList, a custom linked-list implementation whose head and tail properties point to the two ends of the list and whose nodes are connected by next references
- Readable streams have two modes and three states; in flowing mode, data is sent to the consumer via the EventEmitter
- On top of streams we can chain data processing, assembling different stream-processing functions to implement all kinds of operations on the data flowing through and transform it into the data we want