
Streams – The Definitive Guide

Learn how to use the Streams API to work with readable, writable, and transform streams

The Streams API allows you to programmatically process streams of data received over the network, or created locally, right in JavaScript. Stream processing involves breaking a resource that you want to receive, send, or transform into small chunks, and then processing these chunks one by one. Although browsers already do streaming internally when receiving resources such as HTML or video, JavaScript itself could not use these capabilities until the Streams API was introduced in 2015.

Technically, streaming was possible with XMLHttpRequest, but it was really hard to use.

Previously, if you wanted to process a resource (a video, a text file, etc.), you had to download the entire file, wait for it to be deserialized into a suitable format, and then process it. This all changed once streams became available to JavaScript. Now, as soon as the raw data is available, it can be processed progressively with JavaScript, without first generating a buffer, string, or blob. This unlocks many use cases, such as:

  • Video effects: piping a readable video stream through a transform stream that applies effects in real time.
  • Data decompression or compression: piping a file stream through a transform stream that selectively (de)compresses it.
  • Image decoding: piping an HTTP response stream through a transform stream that decodes bytes into bitmap data, and then through another transform stream that converts bitmaps into PNGs. Installed inside the fetch handler of a service worker, this allows you to transparently polyfill new image formats such as AVIF.

Core concepts#

Before going into the details of the various types of streams, let me introduce some core concepts.

Chunks #

A chunk is a single piece of data that is written to or read from a stream. It can be of any type; streams can even contain chunks of different types. Most of the time, a chunk is not the most atomic unit of data for a given stream. For example, a byte stream might contain chunks consisting of 16 KiB Uint8Array units instead of single bytes.
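As a quick sketch (with hypothetical data, not from this article's demos), here is a stream of string chunks next to a byte-oriented stream whose chunks are 16 KiB Uint8Array units:

// A stream whose chunks are strings.
const stringChunkStream = new ReadableStream({
  start(controller) {
    controller.enqueue('one chunk');
    controller.enqueue('another chunk');
    controller.close();
  },
});

// A stream whose chunks are 16 KiB `Uint8Array` units.
const byteChunkStream = new ReadableStream({
  start(controller) {
    controller.enqueue(new Uint8Array(16 * 1024)); // One 16 KiB chunk.
    controller.close();
  },
});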

A readable stream#

A readable stream represents a source of data from which you can read. In other words, data comes out of a readable stream. Concretely, a readable stream is an instance of the ReadableStream class.

Transform streams#

A transform stream consists of a pair of streams: a writable stream (known as its writable side) and a readable stream (known as its readable side). A real-world metaphor is a simultaneous interpreter who translates from one language to another on the fly. In the case of a transform stream, writing to the writable side results in new data being made available for reading on the readable side. Concretely, any object with writable and readable properties can serve as a transform stream. However, the standard TransformStream class makes it easier to create such a pair.
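To illustrate that point, here is a minimal sketch (not from the spec) of a hand-rolled identity transform built from a plain object with matching writable and readable properties:

function identityTransform() {
  let controller;
  const readable = new ReadableStream({
    start(c) {
      controller = c;
    },
  });
  const writable = new WritableStream({
    write(chunk) {
      // Whatever is written becomes readable, unchanged.
      controller.enqueue(chunk);
    },
    close() {
      controller.close();
    },
  });
  // Any object shaped like this can be used as a transform stream.
  return { readable, writable };
}

In practice you would simply write new TransformStream(), which produces an equivalent pair for you.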

Pipe Chains#

Streams are primarily used by piping them into each other. A readable stream can be piped directly to a writable stream using the readable stream's pipeTo() method, or it can first be piped through one or more transform streams using its pipeThrough() method. A set of streams piped together in this way is called a pipe chain.
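A pipe chain might look like the following sketch, which assumes a hypothetical gzip-compressed text file at './data.txt.gz' and uses the DecompressionStream and TextDecoderStream transform streams covered later in this article:

const response = await fetch('./data.txt.gz'); // Hypothetical URL.
await response.body
  .pipeThrough(new DecompressionStream('gzip')) // Transform stream #1.
  .pipeThrough(new TextDecoderStream())         // Transform stream #2.
  .pipeTo(new WritableStream({                  // Final destination.
    write(chunk) {
      console.log(chunk);
    },
  }));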

Backpressure#

Once a pipe chain is constructed, it propagates signals about how fast chunks should flow through it. If any step in the chain cannot yet accept more chunks, a signal is propagated backwards through the pipe chain until, eventually, the original source is told to stop producing chunks so quickly. This process of normalizing the flow is called backpressure.
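As a sketch of what respecting backpressure looks like from the producing side (assuming an existing writableStream and an iterable chunks, both hypothetical), a writer's standard ready promise and desiredSize property expose these signals:

const writer = writableStream.getWriter();
for (const chunk of chunks) {
  // `ready` resolves once the internal queue is below its
  // high water mark, that is, once backpressure has eased.
  await writer.ready;
  console.log('Remaining queue capacity:', writer.desiredSize);
  writer.write(chunk);
}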

Teeing #

A readable stream can be teed with its tee() method (named after the shape of an uppercase "T", with one inlet and two outlets). This locks the stream, making it no longer directly usable; however, it creates two new streams, called branches, which can be consumed independently. Teeing is also important because streams cannot be rewound or restarted; more on this later.

The mechanics of a readable stream#

A readable stream is a data source represented in JavaScript by a ReadableStream object that flows from an underlying source. The ReadableStream() constructor creates and returns a readable stream object from the given handlers. There are two types of underlying source:

  • A push source constantly pushes data at you once you have accessed it, and it is up to you to start, pause, or cancel access to the stream. Examples include live video streams, server-sent events, and WebSockets; see the sketch after this list.
  • A pull source requires you to explicitly request data once you are connected to it. Examples include HTTP operations via fetch() or XMLHttpRequest calls.
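The following sketch wraps a push source, a WebSocket at a hypothetical URL, in a ReadableStream:

function websocketReadableStream(url) {
  const socket = new WebSocket(url); // `url` is supplied by the caller.
  return new ReadableStream({
    start(controller) {
      // The push source decides when data arrives; we just enqueue it.
      socket.onmessage = (event) => controller.enqueue(event.data);
      socket.onclose = () => controller.close();
      socket.onerror = () => controller.error(new Error('WebSocket error'));
    },
    cancel() {
      // The consumer lost interest, so shut the push source down.
      socket.close();
    },
  });
}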

Stream data is read sequentially in small pieces called chunks. Chunks placed in a stream are said to be enqueued, meaning they are waiting in a queue, ready to be read. An internal queue keeps track of the chunks that have not yet been read.

A queuing strategy is an object that determines how a stream should signal backpressure based on the state of its internal queue. The queuing strategy assigns a size to each chunk and compares the total size of all chunks in the queue to a specified number, known as the high water mark.

The chunks inside a stream are read by a reader. The reader retrieves the data one chunk at a time, allowing you to perform whatever operations you like on it. The reader, together with its accompanying processing code, is called a consumer.

There is also a construct called a controller. Each readable stream has an associated controller that, as the name suggests, allows you to control the stream.

Only one reader can read a stream at a time; when a reader is created and starts reading a stream (that is, becomes an active reader), it is locked to that stream. If you want another reader to take over, you typically need to release the first reader first (although you can also tee the stream).
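A minimal sketch of this locking behavior, assuming an existing readableStream:

const readerA = readableStream.getReader(); // Locks the stream.
console.log(readableStream.locked); // => true
try {
  readableStream.getReader(); // Throws, since the stream is already locked.
} catch (error) {
  console.log(error.name); // => "TypeError"
}
readerA.releaseLock(); // Releases the lock…
const readerB = readableStream.getReader(); // …so another reader can take over.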

Create a readable stream#

You can create a ReadableStream by calling the ReadableStream() constructor. The constructor takes an optional argument, underlyingSource, an object with methods and properties that define how the stream instance behaves.

The underlyingSource #

The underlyingSource can use the following optional, developer-defined methods:

  • start(controller): called immediately when the object is constructed. This method can access the stream's source and do anything else required to set up the stream's functionality. If this process is asynchronous, the method can return a promise to signal success or failure. The controller parameter is a ReadableStreamDefaultController.
  • pull(controller): can be used to control the stream while chunks are being fetched. It is called repeatedly as long as the stream's internal queue of chunks is not full, until the queue reaches its high water mark. If calling pull() returns a promise, pull() will not be called again until that promise fulfills. If the promise rejects, the stream becomes errored.
  • cancel(reason): called when the stream's consumer cancels the stream.
const readableStream = new ReadableStream({
  start(controller) {
    /* … */
  },

  pull(controller) {
    /* … */
  },

  cancel(reason) {
    /* … */
  },
});

ReadableStreamDefaultController supports the following methods:

  • ReadableStreamDefaultController.close() closes the associated stream.
  • ReadableStreamDefaultController.enqueue() enqueues a given chunk in the associated stream.
  • ReadableStreamDefaultController.error() causes any future interactions with the associated stream to error.
/* … */
start(controller) {
  controller.enqueue('The first chunk!');
},
/* … */

The queuingStrategy #

The queuingStrategy is the second, likewise optional, argument of the ReadableStream() constructor. It is an object that optionally defines a queuing strategy for the stream and has two properties:

  • highWaterMark: a non-negative number indicating the high water mark of the stream using this queuing strategy.
  • size(chunk): a function that computes and returns the size of the given chunk. The result is used to determine backpressure, which is surfaced via the ReadableStreamDefaultController.desiredSize property. It also governs when the underlying source's pull() method is called.
const readableStream = new ReadableStream(
  {
    /* … */
  },
  {
    highWaterMark: 10,
    size(chunk) {
      return chunk.length;
    },
  },
);

You can define your own custom queuingStrategy, or use an instance of ByteLengthQueuingStrategy or CountQueuingStrategy for this object. If no queuingStrategy is supplied, the default used is a CountQueuingStrategy with a highWaterMark of 1.
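For example, a sketch of both built-in strategies (with arbitrarily chosen limits):

// Count chunks: backpressure kicks in once 10 chunks are queued.
const countedStream = new ReadableStream(
  { /* underlyingSource */ },
  new CountQueuingStrategy({ highWaterMark: 10 }),
);

// Measure bytes: backpressure kicks in once the queued chunks'
// combined `byteLength` reaches 1 KiB.
const byteCountedStream = new ReadableStream(
  { /* underlyingSource */ },
  new ByteLengthQueuingStrategy({ highWaterMark: 1_024 }),
);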

The getReader() and read() methods#

To read from a readable stream, you need a reader, which will be a ReadableStreamDefaultReader. The getReader() method of the ReadableStream interface creates a reader and locks the stream to it. While the stream is locked, no other reader can be acquired until this one is released.

The read() method of the ReadableStreamDefaultReader interface returns a promise providing access to the next chunk in the stream's internal queue. It fulfills or rejects depending on the state of the stream. There are several possibilities:

  • If a chunk is available, the promise fulfills with an object of the form { value: chunk, done: false }.
  • If the stream becomes closed, the promise fulfills with an object of the form { value: undefined, done: true }.
  • If the stream becomes errored, the promise rejects with the relevant error.
const reader = readableStream.getReader();
while (true) {
  const { done, value } = await reader.read();
  if (done) {
    console.log('The stream is done.');
    break;
  }
  console.log('Just read a chunk:', value);
}

The locked property#

You can check whether a readable stream is locked by accessing its ReadableStream.locked property.

const locked = readableStream.locked;
console.log(`The stream is ${locked ? 'indeed' : 'not'} locked.`);

Examples of readable stream code#

The following code example shows all the steps in action. You first create a ReadableStream whose underlyingSource argument (the TimestampSource class) defines a start() method. This method tells the stream's controller to enqueue() a timestamp every second for ten seconds, and finally to close() the stream. You consume the stream by creating a reader via getReader() and calling read() until the stream is done.

class TimestampSource {
  #interval;

  start(controller) {
    this.#interval = setInterval(() => {
      const string = new Date().toLocaleTimeString();
      // Add the string to the stream.
      controller.enqueue(string);
      console.log(`Enqueued ${string}`);
    }, 1_000);

    setTimeout(() => {
      clearInterval(this.#interval);
      // Close the stream after 10s.
      controller.close();
    }, 10_000);
  }

  cancel() {
    // This is called if the reader cancels.
    clearInterval(this.#interval);
  }
}

const stream = new ReadableStream(new TimestampSource());

async function concatStringStream(stream) {
  let result = '';
  const reader = stream.getReader();
  while (true) {
    // The `read()` method returns a promise that
    // resolves when a value has been received.
    const { done, value } = await reader.read();
    // Result objects contain two properties:
    // `done` - `true` if the stream has already given you all its data.
    // `value` - Some data. Always `undefined` when `done` is `true`.
    if (done) return result;
    result += value;
    console.log(`Read ${result.length} characters so far`);
    console.log(`Most recently read chunk: ${value}`);
  }
}
concatStringStream(stream).then((result) => console.log('Stream complete', result));

Asynchronous iteration#

Checking done on each read() iteration of the loop may not be the most convenient API. Fortunately, there is a better way to do this: asynchronous iteration.

for await (const chunk of stream) {
  console.log(chunk);
}

If asynchronous iteration of streams is not supported, a workaround is to implement the behavior with a helper function. The following code lets you use the feature:

function streamAsyncIterator(stream) {
  // Get a lock on the stream:
  const reader = stream.getReader();

  return {
    next() {
      // Stream reads already resolve with {done, value}, so
      // we can just call read:
      return reader.read();
    },
    return() {
      // Release the lock if the iterator terminates.
      reader.releaseLock();
      return {};
    },
    // for-await calls this on whatever it's passed, so
    // iterators tend to return themselves.
    [Symbol.asyncIterator]() {
      return this;
    },
  };
}

async function example() {
  const response = await fetch(url);
  for await (const chunk of streamAsyncIterator(response.body)) {
    console.log(chunk);
  }
}

Tee a readable stream#

The tee() method of the ReadableStream interface tees the current readable stream, returning a two-element array containing the two resulting branches as new ReadableStream instances. This allows two readers to read the same stream simultaneously. You might do this, for example, in a service worker when you want to fetch a response from the server and stream it to the browser, but also stream it to the service worker cache. Since a response body cannot be consumed more than once, you need two copies to do this. To cancel the stream, you then need to cancel both resulting branches. Teeing a stream generally locks it for the duration, preventing other readers from locking it.
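Here is a sketch of that service worker scenario ('v1' is a hypothetical cache name; error handling is omitted); the sample after it shows the general mechanics of tee() outside a service worker:

self.addEventListener('fetch', (event) => {
  event.respondWith((async () => {
    const response = await fetch(event.request);
    // The body can be consumed only once, so tee it:
    // one branch for the cache, one for the browser.
    const [forCache, forBrowser] = response.body.tee();
    const cache = await caches.open('v1');
    await cache.put(event.request, new Response(forCache, {
      headers: response.headers,
      status: response.status,
      statusText: response.statusText,
    }));
    return new Response(forBrowser, {
      headers: response.headers,
      status: response.status,
      statusText: response.statusText,
    });
  })());
});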

const readableStream = new ReadableStream({
  start(controller) {
    // Called by constructor.
    console.log('[start]');
    controller.enqueue('a');
    controller.enqueue('b');
    controller.enqueue('c');
  },
  pull(controller) {
    // Called when `read()` is called and the controller's queue is empty.
    console.log('[pull]');
    controller.enqueue('d');
    controller.close();
  },
  cancel(reason) {
    // Called when the stream is canceled.
    console.log('[cancel]', reason);
  },
});

// Create two `ReadableStream`s.
const [streamA, streamB] = readableStream.tee();

// Read streamA iteratively one by one. Typically, you
// would not do it this way, but you certainly can.
const readerA = streamA.getReader();
console.log('[A]', await readerA.read()); //=> {value: "a", done: false}
console.log('[A]', await readerA.read()); //=> {value: "b", done: false}
console.log('[A]', await readerA.read()); //=> {value: "c", done: false}
console.log('[A]', await readerA.read()); //=> {value: "d", done: false}
console.log('[A]', await readerA.read()); //=> {value: undefined, done: true}

// Read streamB in a loop. This is the more common way
// to read data from the stream.
const readerB = streamB.getReader();
while (true) {
  const result = await readerB.read();
  if (result.done) break;
  console.log('[B]', result);
}

Create a readable byte stream#

You can create a readable byte stream by passing a type parameter to the ReadableStream() constructor:

new ReadableStream({ type: 'bytes' });

The underlyingSource #

The underlying source of a readable byte stream is given a ReadableByteStreamController to manipulate. Its ReadableByteStreamController.enqueue() method takes a chunk argument whose type is an ArrayBufferView. The ReadableByteStreamController.byobRequest property returns the current BYOB ("bring your own buffer") pull request, or null if there is none. Finally, the ReadableByteStreamController.desiredSize property returns the desired size required to fill the stream's internal queue.

The queuingStrategy #

The second, likewise optional, argument of the ReadableStream() constructor is queuingStrategy. It is an object that optionally defines a queuing strategy for the stream and has one property:

  • highWaterMark: a non-negative number of bytes indicating the high water mark of the stream using this queuing strategy. This is used to determine backpressure, which is surfaced via the ReadableByteStreamController.desiredSize property. It also governs when the underlying source's pull() method is called.

Unlike queuing strategies for other stream types, a queuing strategy for a readable byte stream does not have a size(chunk) function. The size of each chunk is always determined by its byteLength property.

If no queuingStrategy is supplied, the default used is one with a highWaterMark of 0.

getReader() and read() #

You can get access to a ReadableStreamBYOBReader by setting the mode parameter accordingly: ReadableStream.getReader({ mode: "byob" }). This allows for more precise control over buffer allocation in order to avoid copies. To read from the byte stream, you call ReadableStreamBYOBReader.read(view), where view is an ArrayBufferView.

Example of a readable byte stream#

const reader = readableStream.getReader({ mode: "byob" });

let startingAB = new ArrayBuffer(1_024);
const buffer = await readInto(startingAB);
console.log("The first 1024 bytes, or less:", buffer);

async function readInto(buffer) {
  let offset = 0;

  while (offset < buffer.byteLength) {
    const { value: view, done } =
        await reader.read(new Uint8Array(buffer, offset, buffer.byteLength - offset));
    buffer = view.buffer;
    if (done) {
      break;
    }
    offset += view.byteLength;
  }

  return buffer;
}

The following function returns readable byte streams that allow for efficient zero-copy reading of a randomly generated array. Instead of using a predetermined chunk size of 1,024, it attempts to fill the developer-supplied buffer, allowing for full control.

const DEFAULT_CHUNK_SIZE = 1_024;

function makeReadableByteStream() {
  return new ReadableStream({
    type: 'bytes',

    pull(controller) {
      // Even when the consumer is using the default reader,
      // the auto-allocation feature allocates a buffer and
      // passes it to us via `byobRequest`.
      const view = controller.byobRequest.view;
      // `getRandomValues()` fills the passed view in place.
      crypto.getRandomValues(view);
      controller.byobRequest.respond(view.byteLength);
    },

    autoAllocateChunkSize: DEFAULT_CHUNK_SIZE,
  });
}

The mechanics of a writable stream#

A writable stream is a destination into which you can write data, represented in JavaScript by a WritableStream object. It serves as an abstraction over the underlying sink, a lower-level I/O sink into which raw data is written.

Data is written to the stream via a writer, one chunk at a time. A chunk can take a multitude of forms, just like the chunks in a reader. You can use whatever code you like to produce the chunks ready for writing; the writer plus its associated code is called a producer.

When a writer is created and starts writing to a stream (that is, becomes an active writer), it is locked to the stream. Only one writer can write to a writable stream at a time. If you want another writer to start writing to your stream, you typically need to release the current writer first.

An internal queue tracks chunks that have been written to the stream but not yet processed by the underlying sink.

The last component is the controller. Each writable stream has an associated controller that allows you to control the stream (for example, to abort it).

Create a writable stream#

The Streams API's WritableStream interface provides a standard abstraction for writing streaming data to a destination, known as a sink. It comes with built-in backpressure and queuing. You create a writable stream by calling the WritableStream() constructor. It has an optional underlyingSink parameter, an object that defines how the constructed stream instance behaves.

The underlyingSink #

The underlyingSink can include the following optional, developer-defined methods. The controller parameter passed to some of the methods is a WritableStreamDefaultController.

  • start(controller): called immediately when the object is constructed. The contents of this method should aim at getting access to the underlying sink. If this process is asynchronous, it can return a promise to signal success or failure.
  • write(chunk, controller): called when a new chunk of data is ready to be written to the underlying sink. It can return a promise to signal success or failure of the write operation. This method is called only after previous writes have succeeded, and never after the stream is closed or aborted.
  • close(controller): called after writing to the stream is complete. The contents should do whatever is necessary to finalize writes to the underlying sink and release access to it. If this process is asynchronous, it can return a promise to signal success or failure. This method is called only after all queued-up writes have succeeded.
  • abort(reason): called when the app signals that it wishes to abruptly close the stream and put it in an errored state. It can clean up any held resources, much like close(), but abort() is called even when writes are queued up; those chunks are thrown away. If this process is asynchronous, it can return a promise to signal success or failure. The reason parameter contains a DOMString describing why the stream was aborted.
const writableStream = new WritableStream({
  start(controller) {
    /* … */
  },

  write(chunk, controller) {
    /* … */
  },

  close(controller) {
    /* … */
  },

  abort(reason) {
    /* … */
  },
});

The Streams API's WritableStreamDefaultController interface represents a controller for a WritableStream's state during set-up, as chunks are submitted for writing, or at the end of writing. When you construct a WritableStream, the underlying sink is given a corresponding WritableStreamDefaultController instance to manipulate. WritableStreamDefaultController has only one method: WritableStreamDefaultController.error(), which causes any future interactions with the associated stream to error:

/* … */
write(chunk, controller) {
  try {
    // Try to do something dangerous with `chunk`.
  } catch (error) {
    controller.error(error.message);
  }
},
/* … */

The queuingStrategy #

The second argument of the WritableStream() constructor is likewise optional. It is an object that optionally defines a queuing strategy for the stream and has two properties:

  • highWaterMark: a non-negative number indicating the high water mark of the stream using this queuing strategy.
  • size(chunk): a function that computes and returns the size of the given chunk. The result is used to determine backpressure, which is surfaced via the WritableStreamDefaultWriter.desiredSize property.

You can define your own custom queuingStrategy, or use an instance of ByteLengthQueuingStrategy or CountQueuingStrategy for this object. If no queuingStrategy is supplied, the default used is a CountQueuingStrategy with a highWaterMark of 1.

The getWriter() and write() methods#

To write to a writable stream, you need a writer, which will be a WritableStreamDefaultWriter. The getWriter() method of the WritableStream interface returns a new instance of WritableStreamDefaultWriter and locks the stream to that instance. While the stream is locked, no other writer can be acquired until the current one is released.

The write() method of the WritableStreamDefaultWriter interface writes a passed chunk of data to a WritableStream and its underlying sink, then returns a promise that resolves to indicate the success or failure of the write operation. Note that what "success" means is up to the underlying sink; it might simply indicate that the chunk has been accepted, not necessarily that it has been safely saved to its final destination.

const writer = writableStream.getWriter();
const resultPromise = writer.write('The first chunk!');

The locked property#

You can check whether a writable stream is locked by accessing its WritableStream.locked property.

const locked = writableStream.locked;
console.log(`The stream is ${locked ? 'indeed' : 'not'} locked.`);

Examples of writable stream code#

The following code sample shows all the steps in action; each write is artificially delayed to simulate a slow sink.

const writableStream = new WritableStream({
  start(controller) {
    console.log('[start]');
  },
  async write(chunk, controller) {
    console.log('[write]', chunk);
    // Wait for next write.
    await new Promise((resolve) => setTimeout(() => {
      document.body.textContent += chunk;
      resolve();
    }, 1_000));
  },
  close(controller) {
    console.log('[close]');
  },
  abort(reason) {
    console.log('[abort]', reason);
  },
});

const writer = writableStream.getWriter();
const start = Date.now();
for (const char of 'abcdefghijklmnopqrstuvwxyz') {
  // Wait to add to the write queue.
  await writer.ready;
  console.log('[ready]', Date.now() - start, 'ms');
  // The Promise is resolved after the write finishes.
  writer.write(char);
}
await writer.close();

Pipe a readable stream to a writable stream#

PipeTo () is used to pipe a readable stream into a writable stream. The readableStream.pipeto () method writes the current ReadableStream pipe to a specified WritableStream and returns a promise indicating success or failure.

const readableStream = new ReadableStream({
  start(controller) {
    // Called by constructor.
    console.log('[start readable]');
    controller.enqueue('a');
    controller.enqueue('b');
    controller.enqueue('c');
  },
  pull(controller) {
    // Called when controller's queue is empty.
    console.log('[pull]');
    controller.enqueue('d');
    controller.close();
  },
  cancel(reason) {
    // Called when the stream is canceled.
    console.log('[cancel]', reason);
  },
});

const writableStream = new WritableStream({
  start(controller) {
    // Called by constructor.
    console.log('[start writable]');
  },
  async write(chunk, controller) {
    // Called upon writer.write().
    console.log('[write]', chunk);
    // Wait for next write.
    await new Promise((resolve) => setTimeout(() => {
      document.body.textContent += chunk;
      resolve();
    }, 1_000));
  },
  close(controller) {
    console.log('[close]');
  },
  abort(reason) {
    console.log('[abort]', reason);
  },
});

await readableStream.pipeTo(writableStream);
console.log('[finished]');

Create a transform stream#

The Streams API's TransformStream interface represents a set of transformable data. You create a transform stream by calling its constructor TransformStream(), which creates and returns a transform stream object from the given handlers. The TransformStream() constructor accepts an optional transformer object as its first argument. Such an object can contain any of the following methods:

The transformer #

  • start(controller): called immediately when the object is constructed. Typically this is used to enqueue prefix chunks, using controller.enqueue(). Those chunks will be read from the readable side, but do not depend on any writes to the writable side. If this initial process is asynchronous, for example because it takes some effort to acquire the prefix chunks, the function can return a promise to signal success or failure. Any exceptions thrown will be re-thrown by the TransformStream() constructor.
  • transform(chunk, controller): called when a new chunk originally written to the writable side is ready to be transformed. The stream implementation guarantees that this function will be called only after previous transforms have succeeded, and never before start() has completed or after flush() has been called. This function performs the actual transformation work of the transform stream. It can enqueue results using controller.enqueue(). This permits a single chunk written to the writable side to result in zero or more chunks on the readable side, depending on how many times controller.enqueue() is called. If the process of transforming is asynchronous, this function can return a promise to signal success or failure of the transformation; a rejected promise will error both the readable and writable sides of the transform stream. If no transform() method is supplied, the identity transform is used, which enqueues chunks unchanged from the writable side to the readable side.
  • flush(controller): called after all chunks written to the writable side have been successfully transformed by passing through transform(), and the writable side is about to be closed. Typically this is used to enqueue suffix chunks to the readable side before it too becomes closed. If the flushing process is asynchronous, the function can return a promise to signal success or failure; the result will be communicated to the caller of stream.writable.write(). A rejected promise will error both the readable and writable sides of the stream. Throwing an exception is treated the same as returning a rejected promise.
const transformStream = new TransformStream({
  start(controller) {
    /* … */
  },

  transform(chunk, controller) {
    /* … */
  },

  flush(controller) {
    /* … */
  },
});

The writableStrategy and readableStrategy #

The second and third arguments of the TransformStream() constructor are optional writableStrategy and readableStrategy queuing strategy objects, respectively. They work as outlined in the readable and writable stream sections above.
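A sketch of passing both strategies (with arbitrarily chosen limits):

const transformStream = new TransformStream(
  {
    transform(chunk, controller) {
      controller.enqueue(chunk);
    },
  },
  // `writableStrategy`: allow up to 4 queued chunks on the writable side.
  new CountQueuingStrategy({ highWaterMark: 4 }),
  // `readableStrategy`: allow up to 2 queued chunks on the readable side.
  new CountQueuingStrategy({ highWaterMark: 2 }),
);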

Sample transform stream code#

// Note that `TextEncoderStream` and `TextDecoderStream` exist now.
// This example shows how you would have done it before.
const textEncoderStream = new TransformStream({
  transform(chunk, controller) {
    console.log('[transform]', chunk);
    controller.enqueue(new TextEncoder().encode(chunk));
  },
  flush(controller) {
    console.log('[flush]');
    controller.terminate();
  },
});

(async () => {
  const readStream = textEncoderStream.readable;
  const writeStream = textEncoderStream.writable;

  const writer = writeStream.getWriter();
  for (const char of 'abc') {
    writer.write(char);
  }
  writer.close();

  const reader = readStream.getReader();
  for (let result = await reader.read(); !result.done; result = await reader.read()) {
    console.log('[value]', result.value);
  }
})();

Pipe a readable stream through a transform stream#

The pipeThrough() method of the ReadableStream interface provides a chainable way of piping the current stream through a transform stream. Piping a stream will generally lock it for the duration of the pipe, preventing other readers from locking it.

const transformStream = new TransformStream({
  transform(chunk, controller) {
    console.log('[transform]', chunk);
    controller.enqueue(new TextEncoder().encode(chunk));
  },
  flush(controller) {
    console.log('[flush]');
    controller.terminate();
  },
});

const readableStream = new ReadableStream({
  start(controller) {
    // Called by constructor.
    console.log('[start]');
    controller.enqueue('a');
    controller.enqueue('b');
    controller.enqueue('c');
  },
  pull(controller) {
    // Called when `read()` is called and the controller's queue is empty.
    console.log('[pull]');
    controller.enqueue('d');
    controller.close(); // Or controller.error();
  },
  cancel(reason) {
    // Called when `rs.cancel(reason)` is called.
    console.log('[cancel]', reason);
  },
});

(async () => {
  const reader = readableStream.pipeThrough(transformStream).getReader();
  for (let result = await reader.read(); !result.done; result = await reader.read()) {
    console.log('[value]', result.value);
  }
})();

The next code example (somewhat contrived) shows how you could implement a "shouting" version of fetch() that uppercases all text by consuming the returned response promise as a stream and uppercasing it chunk by chunk. The advantage of this approach is that you do not need to wait for the entire document to be downloaded, which can make a huge difference when dealing with large files.

function upperCaseStream() {
  return new TransformStream({
    transform(chunk, controller) {
      controller.enqueue(chunk.toUpperCase());
    },
  });
}

function appendToDOMStream(el) {
  return new WritableStream({
    write(chunk) {
      el.append(chunk);
    },
  });
}

fetch('./lorem-ipsum.txt').then((response) =>
  response.body
    .pipeThrough(new TextDecoderStream())
    .pipeThrough(upperCaseStream())
    .pipeTo(appendToDOMStream(document.body))
);

Browser support and Polyfill#

Support for the Streams API varies across browsers. Be sure to check Can I use for detailed compatibility data. Note that some browsers only have partial implementations of certain features, so be sure to check thoroughly.

The good news is that there is a reference implementation available, as well as a polyfill targeted at production use.

Demo#

The following demo shows readable, writable, and transform streams in action. It also includes examples of pipeThrough() and pipeTo() pipe chains, and demonstrates tee(). You can optionally run the demo in its own window or view its source code.

Streams in the browser#

There are many useful streams built right into the browser. You can easily create a readable stream from a Blob. The stream() method of the Blob interface returns a readable stream that, upon reading, returns the data contained within the blob. Recall that a File object is a specific kind of Blob, and can be used in any context that a blob can.

const readableStream = new Blob(['hello world'], { type: 'text/plain' }).stream();

The streaming variants of TextDecoder.decode() and TextEncoder.encode() are called TextDecoderStream and TextEncoderStream, respectively.

const response = await fetch('https://streams.spec.whatwg.org/');
const decodedStream = response.body.pipeThrough(new TextDecoderStream())
Copy the code

Compressing or decompressing files is easy with the CompressionStream and DecompressionStream transform streams. The code sample below shows how you can download the Streams spec, compress (gzip) it right in the browser, and write the compressed file directly to disk.

const response = await fetch('https://streams.spec.whatwg.org/');
const readableStream = response.body;
const compressedStream = readableStream.pipeThrough(new CompressionStream('gzip'));

const fileHandle = await showSaveFilePicker();
const writableStream = await fileHandle.createWritable();
compressedStream.pipeTo(writableStream);

The File System Access API's FileSystemWritableFileStream and the experimental fetch() request streams are examples of writable streams in the wild.

The Serial API makes heavy use of both readable and writable streams.

// Prompt user to select any serial port.
const port = await navigator.serial.requestPort();
// Wait for the serial port to open.
await port.open({ baudRate: 9_600 });
const reader = port.readable.getReader();

// Listen to data coming from the serial device.
while (true) {
  const { value, done } = await reader.read();
  if (done) {
    // Allow the serial port to be closed later.
    reader.releaseLock();
    break;
  }
  // value is a Uint8Array.
  console.log(value);
}

// Write to the serial port.
const writer = port.writable.getWriter();
const data = new Uint8Array([104, 101, 108, 108, 111]); // hello
await writer.write(data);
// Allow the serial port to be closed later.
writer.releaseLock();

Finally, WebSocketStream integrates the stream with the WebSocket API.

const wss = new WebSocketStream(WSS_URL);
const { readable, writable } = await wss.connection;
const reader = readable.getReader();
const writer = writable.getWriter();

while (true) {
  const { value, done } = await reader.read();
  if (done) {
    break;
  }
  const result = await process(value);
  await writer.write(result);
}

Resources#

  • Streams specification
  • Accompanying demos
  • Streams polyfill
  • 2016 — The Year of Web Streams
  • Async iterators and generators
  • Stream Visualizer