When to use streams

When dealing with large file reads, compression, archiving, media files, or large log files, reading all of the data into memory at once can exhaust memory quickly and cause serious problems for a program.

If instead you read a fixed-length chunk at a time into a suitably sized buffer, you use far less memory. This is exactly what the streaming API provides.
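For example, a large file can be consumed in fixed-size chunks rather than loaded whole. A minimal sketch (the file name and the 64 KB highWaterMark are placeholders chosen for illustration):

const fs = require('fs');

// Read the file in 64 KB chunks instead of pulling it all into memory.
const stream = fs.createReadStream('big.log', { highWaterMark: 64 * 1024 });

stream.on('data', function(chunk){
	console.log('read ' + chunk.length + ' bytes');
})
stream.on('end', function(){
	console.log('done');
})
stream.on('error', function(err){
	console.error(err);
})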

The API classes available for Stream

  • Readable – a stream you can read from (for example, fs.createReadStream()).
  • Writable – a stream you can write to (for example, fs.createWriteStream()).
  • Duplex – a stream that is both readable and writable (such as net.Socket).
  • Transform – a Duplex stream that can modify and transform the data as it is written and read (for example, zlib.createDeflate()). A short illustration follows this list.
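
All four classes are produced by functions you have likely already used in the core modules. A rough illustration (the file paths are placeholders, and index.html is the same assumption the examples below make):

const fs = require('fs');
const net = require('net');
const zlib = require('zlib');

const readable = fs.createReadStream(__dirname + '/index.html');     // Readable
const writable = fs.createWriteStream(__dirname + '/index.html.gz'); // Writable
const socket = new net.Socket();                                     // Duplex
const gzip = zlib.createGzip();                                      // Transform

// A typical chain: read -> transform -> write
readable.pipe(gzip).pipe(writable);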


Implement a static Web server using built-in streams

Both fs and net, the core modules for Node's file system and network operations, expose streaming interfaces, so handling I/O with streams can be fairly simple.

Using only Node core modules to build a simple static server:

const http = require('http');
const fs = require('fs');

const server = http.createServer(function(req,res){
	fs.readFile(__dirname + '/index.html', function(err,data){
		if(err){
			res.statusCode = 500;
			res.end(String(err))
			return;
		}
		res.end(data)
	})
})

server.listen(3000)

Although fs.readFile above is non-blocking, it still reads the whole file into memory, so memory can run out quickly when the file is very large or when a large number of requests arrive at once. This can be improved with the fs.createReadStream method:

const http = require('http');
const fs = require('fs');

const server = http.createServer(function(req,res){
	// Stream the HTML file straight into the HTTP response
	fs.createReadStream(__dirname + '/index.html').pipe(res);
})
})

server.listen(3000)

The code above sends the file to the client one chunk at a time. If the client connection is slow, the writable side signals the readable side to pause, suspending the underlying I/O until the client is ready to receive more data.
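Roughly speaking, this is what pipe() does for you: pause the source when the destination's buffer fills up, and resume it on 'drain'. A hand-rolled sketch of the idea (ignoring the error handling and edge cases pipe() also covers):

const fs = require('fs');

function roughPipe(source, dest){
	source.on('data', function(chunk){
		const ok = dest.write(chunk);
		if(!ok){
			source.pause();              // destination buffer is full
			dest.once('drain', function(){
				source.resume();         // destination is ready for more
			})
		}
	})
	source.on('end', function(){
		dest.end();
	})
}

roughPipe(fs.createReadStream(__dirname + '/index.html'),
          fs.createWriteStream(__dirname + '/index.copy.html'));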

Implement a simple static file server using streams:

const http = require('http');
const fs = require('fs');

const server = http.createServer(function(req,res){
	let filename = req.url
	if(filename === '/'){
	   filename = '/index.html'	
	}
	fs.createReadStream(__dirname + filename ).pipe(res);	
})

server.listen(3000)
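One thing this version does not handle is a missing file: if the URL does not match a real file, the read stream emits 'error' and the process crashes. A sketch of the same server with basic error handling added (the status codes chosen here are just one reasonable convention):

const http = require('http');
const fs = require('fs');

const server = http.createServer(function(req,res){
	let filename = req.url
	if(filename === '/'){
	   filename = '/index.html'
	}
	const stream = fs.createReadStream(__dirname + filename);
	stream.on('error', function(err){
		// Missing or unreadable file: answer the request instead of crashing
		res.statusCode = err.code === 'ENOENT' ? 404 : 500;
		res.end(String(err));
	})
	stream.pipe(res);
})

server.listen(3000)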


Static server with gzip compression

const http = require('http');
const fs = require('fs');
const zlib = require('zlib')

const server = http.createServer(function(req,res){
	res.writeHead(200, { 'content-encoding': 'gzip' })
	fs.createReadStream(__dirname + '/index.html' )
            .pipe(zlib.createGzip())
            .pipe(res);	
})

server.listen(3000)
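In practice you would only compress when the client advertises gzip support in its Accept-Encoding header. A sketch of that check, still serving only index.html:

const http = require('http');
const fs = require('fs');
const zlib = require('zlib');

const server = http.createServer(function(req,res){
	const acceptsGzip = /\bgzip\b/.test(req.headers['accept-encoding'] || '');
	const file = fs.createReadStream(__dirname + '/index.html');

	if(acceptsGzip){
		res.writeHead(200, { 'content-encoding': 'gzip' })
		file.pipe(zlib.createGzip()).pipe(res);
	}else{
		res.writeHead(200)
		file.pipe(res);
	}
})

server.listen(3000)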



Readable streams

Streams inherit from EventEmitter, so they have the on and emit methods.

1. Events

  • Readable – emitted when a chunk of data can be read from the stream.
  • Data – emitted while data is being transferred (the handler receives the chunk).
  • End – emitted when there is no more data to read.
  • Close – emitted when the underlying resource (such as a file) has been closed.
  • Error – emitted when an error occurs while receiving data.

2. Methods

  • read([size]) – reads data from the stream; the returned data can be a String, a Buffer, or null (as in the example below). When size is specified, the read is limited to that number of bytes. A short usage sketch follows this list.
  • setEncoding(encoding) – sets the encoding used for the data returned by read().
  • pause() – pauses the 'data' events emitted by this object.
  • resume() – resumes the 'data' events emitted by this object.
  • pipe(destination, [options]) – passes each chunk that is read to a Writable destination. When the source emits 'end' after the transfer completes, end() is called on the destination by default, so the destination stops accepting writes.
  • unpipe([destination]) – detaches this stream from the Writable destination.
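A usage sketch of read() and setEncoding(), pulling data off the internal queue from the 'readable' event (the file name is the same index.html the earlier examples assume):

const fs = require('fs');

const stream = fs.createReadStream(__dirname + '/index.html');
stream.setEncoding('utf8');   // read() now returns strings instead of Buffers

stream.on('readable', function(){
	let chunk;
	// Keep reading until the internal queue is empty (read() returns null)
	while((chunk = stream.read()) !== null){
		console.log('got ' + chunk.length + ' characters');
	}
})

stream.on('end', function(){
	console.log('no more data');
})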

Considerations for inheriting readable streams:

  • The readable.read method will return blocks of data that are added to the internal readable queue by the readable.push method.
  • All subclasses that inherit from Readable must implement the readable._read() method to fetch data from the underlying resource; it should only be called by the Readable class's internal methods, never directly by user code. Inside the _read() implementation, readable.push(chunk) should be called only while there is still data to read, adding data to the internal readable queue where readable.read() can pick it up for the consumer.
  • Once the instance listens for a 'data' event, the chunks are delivered through that event and the value readable.read() would have returned is lost (it returns null, as in the example below).

Example: Implement a readable stream

const { Readable } = require('stream');
const util = require('util');

util.inherits(MyReadStream, Readable)

function MyReadStream(arr){
	this.source = arr;
	Readable.call(this);
}

MyReadStream.prototype._read = function(){
	if(this.source.length){
		this.push(this.source[0])
		this.source.splice(0, 1)
	}else{
		this.push(null)
	}
}

let myStream = new MyReadStream(['php', 'js', 'java'])

myStream.on('readable', function(){
	let output_buf = myStream.read();
	console.log(output_buf, 'output')  // null
})

myStream.on('data', function(res){
	console.log(res.toString(), 'data')
})

myStream.on('end', function(){
	console.log('end')
})

In the code above, read() is called in the 'readable' event handler to read a string, and a 'data' listener is attached to output the data as it is read.
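The same stream can also be written by extending the Readable class directly, which is the more common style in current Node versions. A minimal sketch of the equivalent:

const { Readable } = require('stream');

class MyReadStream2 extends Readable {
	constructor(arr, options){
		super(options);
		this.source = arr;
	}
	_read(){
		if(this.source.length){
			this.push(this.source.shift());  // feed the next item into the internal queue
		}else{
			this.push(null);                 // signal that there is no more data
		}
	}
}

new MyReadStream2(['php', 'js', 'java']).on('data', function(chunk){
	console.log(chunk.toString());
})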


Writable streams

The Writable stream interface is an abstraction of the destination for writing data.

1. Methods

write(chunk, [encoding], [callback]) – writes data to the stream. chunk contains the data to write, encoding specifies the string encoding, and callback is a function to run once the data has been fully flushed. write() returns true if the chunk was handled immediately, and false when the internal buffer is full and the caller should wait for the 'drain' event before writing more.

end([chunk], [encoding], [callback]) – like write(), but it also puts the Writable into a state where it accepts no more data, and it emits the 'finish' event.

2. Events

Drain – after a write() call returns false, this event tells the listener that it is safe to start writing more data (see the sketch after this list).

Finish – fired after end() has been called on the Writable and all data has been flushed; no more data will be accepted.

Pipe – emitted when pipe() is called on a Readable stream with this Writable as the destination.

Unpipe – emitted when unpipe() is called on a Readable stream, removing this Writable from its destinations.
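Putting write(), its return value, and 'drain' together: a sketch of writing a large amount of data without overflowing the destination's internal buffer (the output file name and the count of one million lines are arbitrary):

const fs = require('fs');

const out = fs.createWriteStream(__dirname + '/numbers.txt');

let i = 0;
function writeSome(){
	let ok = true;
	while(i < 1000000 && ok){
		ok = out.write(i + '\n');     // false means the internal buffer is full
		i++;
	}
	if(i < 1000000){
		out.once('drain', writeSome); // resume once the buffer has been flushed
	}else{
		out.end();                    // no more data; 'finish' will fire
	}
}
writeSome();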



Considerations for inheriting writable streams:

  • The writable.write() method writes data to the stream and invokes callback once that data has been handled. If an error occurs, callback may or may not be called with that error as its first argument; to reliably detect write errors, listen for the 'error' event.

  • All writable stream implementations must provide a writable._write() method to send data to the underlying resource.


Example: implement a writable stream that takes data from standard input and writes it to standard output; if a chunk does not contain the character 'a', report an error.

const { Writable } = require('stream');
const util = require('util');

util.inherits(MyWriteStream, Writable)

function MyWriteStream(options){
	Writable.call(this, options);
}

MyWriteStream.prototype._write = function(chunk, encoding, callback){
	if(chunk.toString().indexOf('a') > -1){
		process.stdout.write("Newly written:" + chunk)
		callback(null)
	}else{
		callback(new Error('no a'))
	}
}

let myStream = new MyWriteStream();
myStream.write('abc\n')
process.stdin.pipe(myStream)

Note: callback must be called to indicate whether the write succeeded or failed. If an error occurred, the first argument to callback must be an Error object; on success pass null.
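Because the example's _write() reports failure through callback(new Error('no a')), that error surfaces as an 'error' event on the stream and will crash the process if nothing listens for it. A short follow-up, assuming it runs in the same file as the example above:

// Reuses MyWriteStream from the example above.
const errStream = new MyWriteStream();
errStream.on('error', function(err){
	console.error('write failed:', err.message);  // "no a"
})
errStream.write('xyz\n')  // the chunk has no 'a', so _write reports an error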


Duplex streams – readable and writable

Inherit stream.Duplex to implement a Duplex stream

Example: implement a duplex stream that colors whatever is typed on standard input and prints it to standard output.

const { Duplex } = require('stream');
const util = require('util');

util.inherits(MyDuplexStream, Duplex)

function MyDuplexStream(options){
	Duplex.call(this, options);
	this.waiting = false;
}

MyDuplexStream.prototype._write = function(chunk, encoding, callback){
	this.waiting = false;
	// Push the colored data to the internal readable queue
	this.push('\u001b[32m' + chunk + '\u001b[39m');
	callback()
}

MyDuplexStream.prototype._read = function(size){
	if(!this.waiting){
		// Display a prompt while waiting for input
		this.push('Waiting for input >')
		this.waiting = true;
	}
}

let myStream = new MyDuplexStream();
process.stdin.pipe(myStream).pipe(process.stdout)



Transform streams

A transform stream is much like a duplex stream: it also implements both the Readable and Writable interfaces. The difference is that a transform stream transforms the data passing through it, and it is implemented with a _transform method. _transform takes three parameters, the chunk of data, the encoding, and a callback (much like _write); the callback is invoked when the transformation is complete, which lets the transform stream process data asynchronously.

Example: implement a simple transform stream
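A minimal sketch in the same util.inherits style used above, an upper-casing transform chosen purely for illustration:

const { Transform } = require('stream');
const util = require('util');

util.inherits(UpperCaseStream, Transform)

function UpperCaseStream(options){
	Transform.call(this, options);
}

UpperCaseStream.prototype._transform = function(chunk, encoding, callback){
	// Push the transformed data to the readable side, then signal completion
	this.push(chunk.toString().toUpperCase());
	callback();
}

let myStream = new UpperCaseStream();
process.stdin.pipe(myStream).pipe(process.stdout)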