Shell scripting with Node.js

10 Using web streams on Node.js



Web streams are a standard for streams that is now supported on all major web platforms: web browsers, Node.js, and Deno. (Streams are an abstraction for reading and writing data sequentially in small pieces from all kinds of sources – files, data hosted on servers, etc.)

For example, the global function fetch() (which downloads online resources) asynchronously returns a Response which has a property .body with a web stream.

This chapter covers web streams on Node.js, but most of what we learn applies to all web platforms that support them.

10.1 What are web streams?

Let’s start with an overview of a few fundamentals of web streams. Afterwards, we’ll quickly move on to examples.

Streams are a data structure for accessing data such as files and data hosted on web servers.

Two of their benefits are:

- We can work with large amounts of data because streams let us split it up into smaller pieces (so-called chunks) that we process one at a time.
- We can work with the same data structure, streams, while processing different kinds of data. That makes it easier to reuse code.

Web streams (“web” is often omitted) are a relatively new standard that originated in web browsers but is now also supported by Node.js and Deno (as shown in this MDN compatibility table).

In web streams, chunks are usually either:

- Text streams: chunks are strings.
- Byte streams: chunks are Uint8Arrays (a kind of TypedArray).

10.1.1 Kinds of streams

There are three main kinds of web streams:

- A ReadableStream is used to read data from a source.
- A WritableStream is used to write data to a sink.
- A TransformStream transforms data: we write input to its writable side and read the transformed output from its readable side.

ReadableStreams, WritableStreams and TransformStreams can be used to transport text or binary data. We’ll mostly do the former in this chapter. Byte streams for binary data are briefly mentioned at the end.

10.1.2 Pipe chains

Piping is an operation that lets us pipe a ReadableStream to a WritableStream: As long as the ReadableStream produces data, this operation reads that data and writes it to the WritableStream. If we connect just two streams, we get a convenient way of transferring data from one location to another (e.g. to copy a file). However, we can also connect more than two streams and get pipe chains that can process data in a variety of ways. This is an example of a pipe chain:

ReadableStream -pipeTo-> TransformStream -pipeTo-> TransformStream -pipeTo-> WritableStream

A ReadableStream is connected to a TransformStream by piping the former to the writable side of the latter. Similarly, a TransformStream is connected to another TransformStream by piping the readable side of the former to the writable side of the latter. And a TransformStream is connected to a WritableStream by piping the readable side of the former to the latter.

10.1.3 Backpressure

One problem in pipe chains is that a member may receive more data than it can handle at the moment. Backpressure is a technique for solving this problem: It enables a receiver of data to tell its sender that it should temporarily stop sending data so that the receiver doesn’t get overwhelmed.

Another way to look at backpressure is as a signal that travels backwards through a pipe chain, from a member that is getting overwhelmed to the beginning of the chain. As an example, consider the following pipe chain:

ReadableStream -pipeTo-> TransformStream -pipeTo-> WritableStream

This is how backpressure travels through this chain:

- The WritableStream signals that its queue is full.
- As a consequence, piping stops reading from the readable side of the TransformStream and chunks accumulate in its queue.
- Eventually, the readable side of the TransformStream signals that it is full and the TransformStream stops moving chunks from its writable side to its readable side.
- Chunks now accumulate in the queue of the writable side, which eventually also signals that it is full.
- As a consequence, piping stops reading from the ReadableStream.

We have reached the beginning of the pipe chain. Therefore, no data accumulates inside the ReadableStream (which is also buffered) and the WritableStream has time to recover. Once it does, it signals that it is ready to receive data again. That signal also travels back through the chain until it reaches the ReadableStream and data processing resumes.

In this first look at backpressure, several details were omitted to make things easier to understand. These will be covered later.

10.1.4 Support for web streams in Node.js

In Node.js, web streams are available from two sources:

- Via the module 'node:stream/web'
- Via global variables (the same way as in web browsers)

At the moment, only one API has direct support for web streams in Node.js – the Fetch API:

const response = await fetch('https://example.com');
const readableStream = response.body;

For other things, we need to use one of the following static methods in module 'node:stream' to either convert a Node.js stream to a web stream or vice versa:

- Readable.toWeb(nodeReadable) and Readable.fromWeb(webReadableStream) convert between Node.js Readable streams and web ReadableStreams.
- Writable.toWeb(nodeWritable) and Writable.fromWeb(webWritableStream) convert between Node.js Writable streams and web WritableStreams.
- Duplex.toWeb(nodeDuplex) and Duplex.fromWeb(webTransformStream) convert between Node.js Duplex streams and web TransformStreams.

One other API partially supports web streams: FileHandles have the method .readableWebStream().
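
The following sketch shows the conversion helpers in both directions (it assumes that a file data.txt exists in the current directory; data-copy.txt is an arbitrary output file name):

import * as fs from 'node:fs';
import {Readable, Writable} from 'node:stream';

// Node.js Readable → web ReadableStream
const webReadable = Readable.toWeb(
  fs.createReadStream('data.txt', {encoding: 'utf-8'}));

// Web ReadableStream → Node.js Readable
const nodeReadable = Readable.fromWeb(webReadable);

// Node.js Writable → web WritableStream
const webWritable = Writable.toWeb(
  fs.createWriteStream('data-copy.txt', {encoding: 'utf-8'}));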

10.2 Reading from ReadableStreams

ReadableStreams let us read chunks of data from various sources. They have the following type (feel free to skim this type and the explanations of its properties; they will be explained again when we encounter them in examples):

interface ReadableStream<TChunk> {
  getReader(): ReadableStreamDefaultReader<TChunk>;
  readonly locked: boolean;
  [Symbol.asyncIterator](): AsyncIterator<TChunk>;

  cancel(reason?: any): Promise<void>;

  pipeTo(
    destination: WritableStream<TChunk>,
    options?: StreamPipeOptions
  ): Promise<void>;
  pipeThrough<TChunk2>(
    transform: ReadableWritablePair<TChunk2, TChunk>,
    options?: StreamPipeOptions
  ): ReadableStream<TChunk2>;
  
  // Not used in this chapter:
  tee(): [ReadableStream<TChunk>, ReadableStream<TChunk>];
}

interface StreamPipeOptions {
  signal?: AbortSignal;
  preventClose?: boolean;
  preventAbort?: boolean;
  preventCancel?: boolean;
}

Explanations of these properties:

- .getReader() returns a Reader – an object via which we can read from the ReadableStream. ReadableStreams returning Readers is similar to iterables returning iterators.
- .locked: there can be at most one active Reader per ReadableStream at any time. While a Reader is active, the ReadableStream is locked and .getReader() cannot be invoked.
- [Symbol.asyncIterator](): this method makes ReadableStreams asynchronously iterable, e.g. via for-await-of loops.
- .cancel(reason) cancels the stream because we are no longer interested in it. reason is passed on to the .cancel() method of the underlying source (more on that later). The returned Promise fulfills when the operation is done.
- .pipeTo(destination, options?) feeds the contents of the ReadableStream to a WritableStream. The returned Promise fulfills when the operation is done. The options (e.g. preventClose) fine-tune how the streams are closed or aborted when piping ends.
- .pipeThrough(transform, options?) pipes the ReadableStream into the writable side of transform and returns its readable side. In other words: it transforms the stream and returns the result.
- .tee() (not used in this chapter) splits the ReadableStream into two independent ReadableStreams with the same content.

The following subsections cover three ways of consuming ReadableStreams:

- Via Readers
- Via asynchronous iteration
- By piping them to WritableStreams

10.2.1 Consuming ReadableStreams via Readers

We can use Readers to read data from ReadableStreams. They have the following type (feel free to skim this type and the explanations of its properties; they will be explained again when we encounter them in examples):

interface ReadableStreamGenericReader {
  readonly closed: Promise<undefined>;
  cancel(reason?: any): Promise<void>;
}
interface ReadableStreamDefaultReader<TChunk>
  extends ReadableStreamGenericReader
{
  releaseLock(): void;
  read(): Promise<ReadableStreamReadResult<TChunk>>;
}

interface ReadableStreamReadResult<TChunk> {
  done: boolean;
  value: TChunk | undefined;
}

Explanations of these properties:

- .closed: a Promise that fulfills after the stream is closed. It rejects if the stream errors or if the Reader’s lock is released before the stream is closed.
- .cancel(reason) cancels the associated ReadableStream.
- .releaseLock() deactivates the Reader and unlocks its stream.
- .read() returns a Promise for a ReadableStreamReadResult, a wrapper around the next chunk.
- ReadableStreamReadResult.done is false while chunks can still be read and true after the last chunk.
- ReadableStreamReadResult.value contains the chunk, or is undefined after the last chunk.

ReadableStreamReadResult may look familiar if you know how iteration works: ReadableStreams are similar to iterables, Readers are similar to iterators, and ReadableStreamReadResults are similar to the objects returned by the iterator method .next().

The following code demonstrates the protocol for using Readers:

const reader = readableStream.getReader(); // (A)
assert.equal(readableStream.locked, true); // (B)
try {
  while (true) {
    const {done, value: chunk} = await reader.read(); // (C)
    if (done) break;
    // Use `chunk`
  }
} finally {
  reader.releaseLock(); // (D)
}

Getting a Reader. We can’t read directly from readableStream; we first need to acquire a Reader (line A). Each ReadableStream can have at most one Reader. After a Reader has been acquired, readableStream is locked (line B). Before we can call .getReader() again, we must call .releaseLock() (line D).

Reading chunks. .read() returns a Promise for an object with the properties .done and .value (line C). After the last chunk was read, .done is true. This approach is similar to how asynchronous iteration works in JavaScript.

10.2.1.1 Example: reading a file via a ReadableStream

In the following example, we read chunks (strings) from a text file data.txt:

import * as fs from 'node:fs';
import {Readable} from 'node:stream';

const nodeReadable = fs.createReadStream(
  'data.txt', {encoding: 'utf-8'});
const webReadableStream = Readable.toWeb(nodeReadable); // (A)

const reader = webReadableStream.getReader();
try {
  while (true) {
    const {done, value} = await reader.read();
    if (done) break;
    console.log(value);
  }
} finally {
  reader.releaseLock();
}
// Output:
// 'Content of text file\n'

We are converting a Node.js Readable to a web ReadableStream (line A). Then we use the previously explained protocol to read the chunks.

10.2.1.2 Example: assembling a string with the contents of a ReadableStream

In the next example, we concatenate all chunks of a ReadableStream into a string and return it:

/**
 * Returns a string with the contents of `readableStream`.
 */
async function readableStreamToString(readableStream) {
  const reader = readableStream.getReader();
  try {
    let result = '';
    while (true) {
      const {done, value} = await reader.read();
      if (done) {
        return result; // (A)
      }
      result += value;
    }
  } finally {
    reader.releaseLock(); // (B)
  }
}

Conveniently, the finally clause is always executed – no matter how we leave the try clause. That is, the lock is correctly released (line B) if we return a result (line A).

10.2.2 Consuming ReadableStreams via asynchronous iteration

ReadableStreams can also be consumed via asynchronous iteration:

const iterator = readableStream[Symbol.asyncIterator]();
let exhaustive = false;
try {
  while (true) {
    let chunk;
    ({done: exhaustive, value: chunk} = await iterator.next());
    if (exhaustive) break;
    console.log(chunk);
  }
} finally {
  // If the loop was terminated before we could iterate exhaustively
  // (via an exception or `return`), we must call `iterator.return()`.
  // Check if that was the case.
  if (!exhaustive) {
    iterator.return();
  }
}

Thankfully, the for-await-of loop handles all the details of asynchronous iteration for us:

for await (const chunk of readableStream) {
  console.log(chunk);
}
10.2.2.1 Example: using asynchronous iteration to read a stream

Let’s redo our previous attempt to read text from a file. This time, we use asynchronous iteration instead of a Reader:

import * as fs from 'node:fs';
import {Readable} from 'node:stream';

const nodeReadable = fs.createReadStream(
  'text-file.txt', {encoding: 'utf-8'});
const webReadableStream = Readable.toWeb(nodeReadable);
for await (const chunk of webReadableStream) {
  console.log(chunk);
}
// Output:
// 'Content of text file'
10.2.2.2 Example: assembling a string with the contents of a ReadableStream

We have previously used a Reader to assemble a string with the contents of a ReadableStream. With asynchronous iteration, the code becomes simpler:

/**
 * Returns a string with the contents of `readableStream`.
 */
async function readableStreamToString2(readableStream) {
  let result = '';
  for await (const chunk of readableStream) {
    result += chunk;
  }
  return result;
}
10.2.2.3 Caveat: Browsers don’t support asynchronous iteration over ReadableStreams

At the moment, Node.js and Deno support asynchronous iteration over ReadableStreams but web browsers don’t: There is a GitHub issue that links to bug reports.

Given that it’s not yet completely clear how async iteration will be supported on browsers, wrapping is a safer choice than polyfilling. The following code is based on a suggestion in the Chromium bug report:

async function* getAsyncIterableFor(readableStream) {
  const reader = readableStream.getReader();
  try {
    while (true) {
      const {done, value} = await reader.read();
      if (done) return;
      yield value;
    }
  } finally {
    reader.releaseLock();
  }
}

10.2.3 Piping ReadableStreams to WritableStreams

ReadableStreams have two methods for piping:

- .pipeTo(writableStream) pipes the ReadableStream to a WritableStream. We’ll examine it in detail in §10.4.2 “Piping to WritableStreams”.
- .pipeThrough(transformStream) pipes the ReadableStream through a TransformStream and returns the resulting ReadableStream. We’ll examine it in detail in §10.6 “Using TransformStreams”.

10.3 Turning data sources into ReadableStreams via wrapping

If we want to read an external source via a ReadableStream, we can wrap it in an adapter object and pass that object to the ReadableStream constructor. The adapter object is called the underlying source of the ReadableStream (queuing strategies are explained later, when we take a closer look at backpressure):

new ReadableStream(underlyingSource?, queuingStrategy?)

This is the type of underlying sources (feel free to skim this type and the explanations of its properties; they will be explained again when we encounter them in examples):

interface UnderlyingSource<TChunk> {
  start?(
    controller: ReadableStreamController<TChunk>
  ): void | Promise<void>;
  pull?(
    controller: ReadableStreamController<TChunk>
  ): void | Promise<void>;
  cancel?(reason?: any): void | Promise<void>;

  // Only used in byte streams and ignored in this section:
  type: 'bytes' | undefined;
  autoAllocateChunkSize: bigint;
}

This is when the ReadableStream calls these methods:

- .start(controller) is called immediately after we invoke the constructor of ReadableStream.
- .pull(controller) is called whenever there is room in the internal queue of the ReadableStream. It is invoked repeatedly until the queue is full again.
- .cancel(reason) is called if the consumer cancels the stream via readableStream.cancel() or reader.cancel(). reason is the value that was passed to those methods.

Each of these methods can return a Promise and no further steps will be taken until the Promise is settled. That is useful if we want to do something asynchronous.

The parameter controller of .start() and .pull() lets them access the stream. It has the following type:

type ReadableStreamController<TChunk> =
  | ReadableStreamDefaultController<TChunk>
  | ReadableByteStreamController<TChunk> // ignored here
;

interface ReadableStreamDefaultController<TChunk> {
  enqueue(chunk?: TChunk): void;
  readonly desiredSize: number | null;
  close(): void;
  error(err?: any): void;
}

For now, chunks are strings. We’ll later get to byte streams, where Uint8Arrays are common. This is what the methods do:

- .enqueue(chunk) adds chunk to the internal queue of the ReadableStream, where it waits to be read.
- .desiredSize indicates how much room is left in the internal queue. It is negative if the queue is over-full and null if the stream can’t receive any more chunks (e.g. because it was closed).
- .close() closes the ReadableStream. Consumers can still empty its queue, but after that, the stream is done.
- .error(err) puts the stream in an error mode: all future interactions with it will fail with the value err.

10.3.1 A first example of implementing an underlying source

In our first example of implementing an underlying source, we only provide method .start(). We’ll see use cases for .pull() in the next subsection.

const readableStream = new ReadableStream({
  start(controller) {
    controller.enqueue('First line\n'); // (A)
    controller.enqueue('Second line\n'); // (B)
    controller.close(); // (C)
  },
});
for await (const chunk of readableStream) {
  console.log(chunk);
}

// Output:
// 'First line\n'
// 'Second line\n'

We use the controller to create a stream with two chunks (line A and line B). It’s important that we close the stream (line C). Otherwise, the for-await-of loop would never finish!

Note that this way of enqueuing isn’t completely safe: There is a risk of exceeding the capacity of the internal queue. We’ll see soon how we can avoid that risk.

10.3.2 Using a ReadableStream to wrap a push source or a pull source

A common scenario is turning a push source or a pull source into a ReadableStream. Whether the source is push or pull determines how we hook into the ReadableStream with our UnderlyingSource:

- Push source: such a source actively pushes its data at us whenever new data is available. We use .start() to register listeners and supporting data structures. If we receive too much data and the desired size is no longer positive, we tell the source to pause. When .pull() is called later, we resume it. Pausing and resuming in reaction to the desired size is how we exert backpressure.
- Pull source: we ask such a source for data only when we need it. We don’t set anything up in .start(); instead, we retrieve data whenever .pull() is called. Since .pull() is only called while there is room in the queue, backpressure is exerted automatically.

We’ll see examples for both kinds of sources next.

10.3.2.1 Example: creating a ReadableStream from a push source with backpressure support

In the following example, we wrap a ReadableStream around a socket – which pushes its data to us (it calls us). This example is taken from the web stream specification:

function makeReadableBackpressureSocketStream(host, port) {
  const socket = createBackpressureSocket(host, port);

  return new ReadableStream({
    start(controller) {
      socket.ondata = event => {
        controller.enqueue(event.data);

        if (controller.desiredSize <= 0) {
          // The internal queue is full, so propagate
          // the backpressure signal to the underlying source.
          socket.readStop();
        }
      };

      socket.onend = () => controller.close();
      socket.onerror = () => controller.error(
        new Error('The socket errored!'));
    },

    pull() {
      // This is called if the internal queue has been emptied, but the
      // stream’s consumer still wants more data. In that case, restart
      // the flow of data if we have previously paused it.
      socket.readStart();
    },

    cancel() {
      socket.close();
    },
  });
}
10.3.2.2 Example: creating a ReadableStream from a pull source

The tool function iterableToReadableStream() takes an iterable over chunks and turns it into a ReadableStream:

/**
 * @param iterable an iterable (asynchronous or synchronous)
 */
function iterableToReadableStream(iterable) {
  return new ReadableStream({
    start() {
      if (typeof iterable[Symbol.asyncIterator] === 'function') {
        this.iterator = iterable[Symbol.asyncIterator]();
      } else if (typeof iterable[Symbol.iterator] === 'function') {
        this.iterator = iterable[Symbol.iterator]();
      } else {
        throw new Error('Not an iterable: ' + iterable);
      }
    },

    async pull(controller) {
      if (this.iterator === null) return;
      // Sync iterators return non-Promise values,
      // but `await` doesn’t mind and simply passes them on
      const {value, done} = await this.iterator.next();
      if (done) {
        this.iterator = null;
        controller.close();
        return;
      }
      controller.enqueue(value);
    },

    cancel() {
      this.iterator = null;
    },
  });
}

Let’s use an async generator function to create an asynchronous iterable and turn that iterable into a ReadableStream:

async function* genAsyncIterable() {
  yield 'how';
  yield 'are';
  yield 'you';
}
const readableStream = iterableToReadableStream(genAsyncIterable());
for await (const chunk of readableStream) {
  console.log(chunk);
}

// Output:
// 'how'
// 'are'
// 'you'

iterableToReadableStream() also works with synchronous iterables:

const syncIterable = ['hello', 'everyone'];
const readableStream = iterableToReadableStream(syncIterable);
for await (const chunk of readableStream) {
  console.log(chunk);
}

// Output:
// 'hello'
// 'everyone'

There may eventually be a static helper method ReadableStream.from() that provides this functionality (see its pull request for more information).

10.4 Writing to WritableStreams

WritableStreams let us write chunks of data to various sinks. They have the following type (feel free to skim this type and the explanations of its properties; they will be explained again when we encounter them in examples):

interface WritableStream<TChunk> {
  getWriter(): WritableStreamDefaultWriter<TChunk>;
  readonly locked: boolean;

  close(): Promise<void>;
  abort(reason?: any): Promise<void>;
}

Explanations of these properties:

- .getWriter() returns a Writer – an object via which we can write to the WritableStream.
- .locked: there can be at most one active Writer per WritableStream at any time. While a Writer is active, the WritableStream is locked and .getWriter() cannot be invoked.
- .close() closes the stream: the underlying sink still receives all queued chunks before the stream is closed. The returned Promise fulfills when the operation is done.
- .abort(reason) aborts the stream: queued chunks are discarded and the stream is put into an error mode. The returned Promise fulfills when the operation is done.

The following subsections cover two approaches to sending data to WritableStreams:

- Writing to them via Writers
- Piping ReadableStreams to them

10.4.1 Writing to WritableStreams via Writers

We can use Writers to write to WritableStreams. They have the following type (feel free to skim this type and the explanations of its properties; they will be explained again when we encounter them in examples):

interface WritableStreamDefaultWriter<TChunk> {
  readonly desiredSize: number | null;
  readonly ready: Promise<undefined>;
  write(chunk?: TChunk): Promise<void>;
  releaseLock(): void;

  close(): Promise<void>;
  readonly closed: Promise<undefined>;
  abort(reason?: any): Promise<void>;
}

Explanations of these properties:

- .desiredSize indicates how much room is left in the queue of this Writer’s stream. It is zero or less if the queue is full (backpressure) and null if the stream can’t be written to anymore.
- .ready returns a Promise that fulfills when the desired size changes from non-positive to positive. That means that no backpressure is active and it is OK to write data.
- .write(chunk) writes chunk to the stream. The returned Promise fulfills after writing succeeded and rejects if it failed.
- .releaseLock() releases the Writer’s lock on its stream.
- .close() has the same effect as closing the Writer’s stream.
- .closed returns a Promise that fulfills when the stream is closed.
- .abort(reason) has the same effect as aborting the Writer’s stream.

The following code shows the protocol for using Writers:

const writer = writableStream.getWriter(); // (A)
assert.equal(writableStream.locked, true); // (B)
try {
  // Writing the chunks (explained later)
} finally {
  writer.releaseLock(); // (C)
}

We can’t write directly to writableStream; we first need to acquire a Writer (line A). Each WritableStream can have at most one Writer. After a Writer has been acquired, writableStream is locked (line B). Before we can call .getWriter() again, we must call .releaseLock() (line C).

There are three approaches to writing chunks.

10.4.1.1 Writing approach 1: awaiting .write() (handling backpressure inefficiently)

The first writing approach is to await each result of .write():

await writer.write('Chunk 1');
await writer.write('Chunk 2');
await writer.close();

The Promise returned by .write() fulfills when the chunk that we passed to it was successfully written. What exactly “successfully written” means depends on how a WritableStream is implemented – e.g., with a file stream, the chunk may have been sent to the operating system but still reside in a cache and therefore not have actually been written to disk.

The Promise returned by .close() is fulfilled when the stream becomes closed.

A downside of this writing approach is that waiting until writing succeeds means that the queue isn’t used. As a consequence, data throughput may be lower.

10.4.1.2 Writing approach 2: ignoring .write() rejections (ignoring backpressure)

In the second writing approach, we ignore the Promises returned by .write() and only await the Promise returned by .close():

writer.write('Chunk 1').catch(() => {}); // (A)
writer.write('Chunk 2').catch(() => {}); // (B)
await writer.close(); // reports errors

The synchronous invocations of .write() add chunks to the internal queue of the WritableStream. By not awaiting the returned Promises, we don’t wait until each chunk is written. However, awaiting .close() ensures that the queue is empty and all writing succeeded before we continue.

Invoking .catch() in line A and line B is necessary to avoid warnings about unhandled Promise rejections when something goes wrong during writing. Such warnings are often logged to the console. We can afford to ignore the errors reported by .write() because .close() will also report them to us.

The previous code can be improved by using a helper function that ignores Promise rejections:

ignoreRejections(
  writer.write('Chunk 1'),
  writer.write('Chunk 2'),
);
await writer.close(); // reports errors

function ignoreRejections(...promises) {
  for (const promise of promises) {
    promise.catch(() => {});
  }
}

One downside of this approach is that backpressure is ignored: We simply assume that the queue is big enough to hold everything we write.

10.4.1.3 Writing approach 3: awaiting .ready (handling backpressure efficiently)

In this writing approach, we handle backpressure efficiently by awaiting the Writer getter .ready:

await writer.ready; // reports errors
// How much room do we have?
console.log(writer.desiredSize);
writer.write('Chunk 1').catch(() => {});

await writer.ready; // reports errors
// How much room do we have?
console.log(writer.desiredSize);
writer.write('Chunk 2').catch(() => {});

await writer.close(); // reports errors

The Promise in .ready fulfills whenever the stream transitions from having backpressure to not having backpressure.
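
The same pattern can be wrapped in a small helper. The following is a sketch (writeIterableToStream is a hypothetical name, not part of any API; writableStream in the last line is assumed to be an existing WritableStream):

async function writeIterableToStream(iterable, writableStream) {
  const writer = writableStream.getWriter();
  try {
    for (const chunk of iterable) {
      await writer.ready; // wait until there is room in the queue
      // Errors are also reported by .close(), so we can ignore them here
      writer.write(chunk).catch(() => {});
    }
    await writer.close(); // reports errors
  } finally {
    writer.releaseLock();
  }
}

await writeIterableToStream(['Chunk 1', 'Chunk 2'], writableStream);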

10.4.1.4 Example: writing to a file via a Writer

In this example, we create a text file new-file.txt via a WritableStream:

import * as fs from 'node:fs';
import {Writable} from 'node:stream';

const nodeWritable = fs.createWriteStream(
  'new-file.txt', {encoding: 'utf-8'}); // (A)
const webWritableStream = Writable.toWeb(nodeWritable); // (B)

const writer = webWritableStream.getWriter();
try {
  await writer.write('First line\n');
  await writer.write('Second line\n');
  await writer.close();
} finally {
  writer.releaseLock()
}

In line A, we create a Node.js stream for the file new-file.txt. In line B, we convert this stream to a web stream. Then we use a Writer to write strings to it.

10.4.2 Piping to WritableStreams

Instead of using Writers, we can also write to WritableStreams by piping ReadableStreams to them:

await readableStream.pipeTo(writableStream);

The Promise returned by .pipeTo() fulfills when piping finishes successfully.

10.4.2.1 Piping happens asynchronously

Piping is performed after the current task completes or pauses. The following code demonstrates that:

const readableStream = new ReadableStream({ // (A)
  start(controller) {
    controller.enqueue('First line\n');
    controller.enqueue('Second line\n');
    controller.close();
  },
});
const writableStream = new WritableStream({ // (B)
  write(chunk) {
    console.log('WRITE: ' + JSON.stringify(chunk));
  },
  close() {
    console.log('CLOSE WritableStream');
  },
});


console.log('Before .pipeTo()');
const promise = readableStream.pipeTo(writableStream); // (C)
promise.then(() => console.log('Promise fulfilled'));
console.log('After .pipeTo()');

// Output:
// 'Before .pipeTo()'
// 'After .pipeTo()'
// 'WRITE: "First line\n"'
// 'WRITE: "Second line\n"'
// 'CLOSE WritableStream'
// 'Promise fulfilled'

In line A we create a ReadableStream. In line B we create a WritableStream.

We can see that .pipeTo() (line C) returns immediately. In a new task, chunks are read and written. Then writableStream is closed and, finally, promise is fulfilled.

10.4.2.2 Example: piping to a WritableStream for a file

In the following example, we create a WritableStream for a file and pipe a ReadableStream to it:

import * as fs from 'node:fs';
import {Writable} from 'node:stream';

const webReadableStream = new ReadableStream({ // (A)
  async start(controller) {
    controller.enqueue('First line\n');
    controller.enqueue('Second line\n');
    controller.close();
  },
});

const nodeWritable = fs.createWriteStream( // (B)
  'data.txt', {encoding: 'utf-8'});
const webWritableStream = Writable.toWeb(nodeWritable); // (C)

await webReadableStream.pipeTo(webWritableStream); // (D)

In line A, we create a ReadableStream. In line B, we create a Node.js stream for the file data.txt. In line C, we convert this stream to a web stream. In line D, we pipe our webReadableStream to the WritableStream for the file.

10.4.2.3 Example: writing two ReadableStreams to a WritableStream

In the following example, we write two ReadableStreams to a single WritableStream.

function createReadableStream(prefix) {
  return new ReadableStream({
    async start(controller) {
      controller.enqueue(prefix + 'chunk 1');
      controller.enqueue(prefix + 'chunk 2');
      controller.close();
    },
  });
}

const writableStream = new WritableStream({
  write(chunk) {
    console.log('WRITE ' + JSON.stringify(chunk));
  },
  close() {
    console.log('CLOSE');
  },
  abort(err) {
    console.log('ABORT ' + err);
  },
});

await createReadableStream('Stream 1: ')
  .pipeTo(writableStream, {preventClose: true}); // (A)
await createReadableStream('Stream 2: ')
  .pipeTo(writableStream, {preventClose: true}); // (B)
await writableStream.close();

// Output
// 'WRITE "Stream 1: chunk 1"'
// 'WRITE "Stream 1: chunk 2"'
// 'WRITE "Stream 2: chunk 1"'
// 'WRITE "Stream 2: chunk 2"'
// 'CLOSE'

We tell .pipeTo() to not close the WritableStream after the ReadableStream is closed (line A and line B). Therefore, the WritableStream remains open after line A and we can pipe another ReadableStream to it.

10.5 Turning data sinks into WritableStreams via wrapping

If we want to write to an external sink via a WritableStream, we can wrap it in an adapter object and pass that object to the WritableStream constructor. The adapter object is called the underlying sink of the WritableStream (queuing strategies are explained later, when we take a closer look at backpressure):

new WritableStream(underlyingSink?, queuingStrategy?)

This is the type of underlying sinks (feel free to skim this type and the explanations of its properties; they will be explained again when we encounter them in examples):

interface UnderlyingSink<TChunk> {
  start?(
    controller: WritableStreamDefaultController
  ): void | Promise<void>;
  write?(
    chunk: TChunk,
    controller: WritableStreamDefaultController
  ): void | Promise<void>;
  close?(): void | Promise<void>;
  abort?(reason?: any): void | Promise<void>;
}

Explanations of these properties:

- .start(controller) is called immediately after we invoke the constructor of WritableStream. Here we can prepare things before writing starts.
- .write(chunk, controller) is called whenever a new chunk is ready to be written to the external sink. We can exert backpressure by returning a Promise that fulfills once writing has finished.
- .close() is called after writableStream.close() was called and all queued writes have succeeded. Here we can clean up after writing.
- .abort(reason) is called if writableStream.abort() or writer.abort() was invoked. reason is the value passed to those methods.

The parameter controller of .start() and .write() lets them error the WritableStream. It has the following type:

interface WritableStreamDefaultController {
  readonly signal: AbortSignal;
  error(err?: any): void;
}

10.5.1 Example: tracing a ReadableStream

In the next example, we pipe a ReadableStream to a WritableStream in order to check how the ReadableStream produces chunks:

const readableStream = new ReadableStream({
  start(controller) {
    controller.enqueue('First chunk');
    controller.enqueue('Second chunk');
    controller.close();
  },
});
await readableStream.pipeTo(
  new WritableStream({
    write(chunk) {
      console.log('WRITE ' + JSON.stringify(chunk));
    },
    close() {
      console.log('CLOSE');
    },
    abort(err) {
      console.log('ABORT ' + err);
    },
  })
);
// Output:
// 'WRITE "First chunk"'
// 'WRITE "Second chunk"'
// 'CLOSE'

10.5.2 Example: collecting chunks written to a WritableStream in a string

In the next example, we create a subclass of WritableStream that collects all written chunks in a string. We can access that string via method .getString():

class StringWritableStream extends WritableStream {
  #string = '';
  constructor() {
    super({
      // We need to access the `this` of `StringWritableStream`.
      // Hence the arrow function (and not a method).
      write: (chunk) => {
        this.#string += chunk;
      },
    });
  }
  getString() {
    return this.#string;
  }
}
const stringStream = new StringWritableStream();
const writer = stringStream.getWriter();
try {
  await writer.write('How are');
  await writer.write(' you?');
  await writer.close();
} finally {
  writer.releaseLock()
}
assert.equal(
  stringStream.getString(),
  'How are you?'
);

A downside of this approach is that we are mixing two APIs: The API of WritableStream and our new string stream API. An alternative is to delegate to the WritableStream instead of extending it:

function createStringWritableStream() {
  let string = '';
  return {
    stream: new WritableStream({
      write(chunk) {
        string += chunk;
      },
    }),
    getString() {
      return string;
    },
  };
}

const stringStream = createStringWritableStream();
const writer = stringStream.stream.getWriter();
try {
  await writer.write('How are');
  await writer.write(' you?');
  await writer.close();
} finally {
  writer.releaseLock()
}
assert.equal(
  stringStream.getString(),
  'How are you?'
);

This functionality could also be implemented via a class (instead of as a factory function for objects).
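
As a sketch, such a class could look like this (StringSink is a hypothetical name):

class StringSink {
  #string = '';
  stream = new WritableStream({
    // Arrow function, so that `this` refers to the StringSink instance
    write: (chunk) => {
      this.#string += chunk;
    },
  });
  getString() {
    return this.#string;
  }
}

const stringSink = new StringSink();
const writer = stringSink.stream.getWriter();
await writer.write('How are you?');
await writer.close();
assert.equal(stringSink.getString(), 'How are you?');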

10.6 Using TransformStreams

A TransformStream:

- receives input on its writable side, a WritableStream stored in .writable;
- transforms that input;
- makes the output available on its readable side, a ReadableStream stored in .readable.

The most common way to use TransformStreams is to “pipe through” them:

const transformedStream = readableStream.pipeThrough(transformStream);

.pipeThrough() pipes readableStream to the writable side of transformStream and returns its readable side. In other words: We have created a new ReadableStream that is a transformed version of readableStream.

.pipeThrough() accepts not only TransformStreams, but any object that has the following shape:

interface ReadableWritablePair<RChunk, WChunk> {
  readable: ReadableStream<RChunk>;
  writable: WritableStream<WChunk>;
}
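
For example, a TransformStream constructed without a transformer simply passes its chunks through unchanged, and it is itself such a readable-writable pair. A minimal sketch (readableStream is assumed to be an existing ReadableStream):

const identityTransform = new TransformStream();
// identityTransform has the properties .readable and .writable,
// so we can pass it to .pipeThrough():
const sameChunks = readableStream.pipeThrough(identityTransform);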

10.6.1 Standard TransformStreams

Node.js supports the following standard TransformStreams:

- TextDecoderStream and TextEncoderStream (defined by the Encoding standard): TextDecoderStream decodes a stream of encoded bytes (e.g. UTF-8) into a stream of strings; TextEncoderStream encodes a stream of strings into a stream of UTF-8-encoded Uint8Arrays.
- CompressionStream and DecompressionStream (defined by the Compression Streams standard): these streams compress and decompress byte streams with formats such as gzip and deflate.

10.6.1.1 Example: decoding a stream of UTF-8-encoded bytes

In the following example, we decode a stream of UTF-8-encoded bytes:

const response = await fetch('https://example.com');
const readableByteStream = response.body;
const readableStream = readableByteStream
  .pipeThrough(new TextDecoderStream('utf-8'));
for await (const stringChunk of readableStream) {
  console.log(stringChunk);
}

response.body is a ReadableByteStream whose chunks are instances of Uint8Array (TypedArrays). We pipe that stream through a TextDecoderStream to get a stream that has string chunks.

Note that translating each byte chunk separately (e.g. via a TextDecoder) doesn’t work because a single Unicode code point is encoded as up to four bytes in UTF-8 and those bytes might not all be in the same chunk.
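
The following sketch illustrates that point: the four UTF-8 bytes of an emoji are deliberately split across two chunks and TextDecoderStream still decodes them correctly (this assumes a runtime where TextDecoderStream is available globally, as it is on current Node.js):

const encodedEmoji = new TextEncoder().encode('😀'); // 4 bytes
const byteStream = new ReadableStream({
  start(controller) {
    controller.enqueue(encodedEmoji.slice(0, 2)); // first two bytes
    controller.enqueue(encodedEmoji.slice(2)); // remaining two bytes
    controller.close();
  },
});

let result = '';
for await (const chunk of byteStream.pipeThrough(new TextDecoderStream('utf-8'))) {
  result += chunk;
}
assert.equal(result, '😀');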

10.6.1.2 Example: creating a readable text stream for standard input

The following Node.js module logs everything that is sent to it via standard input:

// echo-stdin.mjs
import {Readable} from 'node:stream';

const webStream = Readable.toWeb(process.stdin)
  .pipeThrough(new TextDecoderStream('utf-8'));
for await (const chunk of webStream) {
  console.log('>>>', chunk);
}

We can access standard input via a stream stored in process.stdin (process is a global Node.js variable). If we don’t set an encoding for this stream and convert it via Readable.toWeb(), we get a byte stream. We pipe it through a TextDecoderStream in order to get a text stream.

Note that we process standard input incrementally: As soon as another chunk is available, we log it. In other words, we don’t wait until standard input is finished. That is useful when the data is either large or only sent intermittently.

10.7 Implementing custom TransformStreams

We can implement a custom TransformStream by passing a Transformer object to the constructor of TransformStream. Such an object has the following type (feel free to skim this type and the explanations of its properties; they will be explained again when we encounter them in examples):

interface Transformer<TInChunk, TOutChunk> {
  start?(
    controller: TransformStreamDefaultController<TOutChunk>
  ): void | Promise<void>;
  transform?(
    chunk: TInChunk,
    controller: TransformStreamDefaultController<TOutChunk>
  ): void | Promise<void>;
  flush?(
    controller: TransformStreamDefaultController<TOutChunk>
  ): void | Promise<void>;
}

Explanations of these properties:

- .start(controller) is called immediately after we invoke the constructor of TransformStream. Here we can prepare things before the transformations start.
- .transform(chunk, controller) performs the actual transformation. It receives an input chunk and can use its parameter controller to enqueue one or more transformed output chunks. It can also choose not to enqueue anything at all.
- .flush(controller) is called after all input chunks were transformed, but before the readable side is closed. This is a last chance to clean up and to enqueue remaining output.

Each of these methods can return a Promise and no further steps will be taken until the Promise is settled. That is useful if we want to do something asynchronous.

The parameter controller has the following type:

interface TransformStreamDefaultController<TOutChunk> {
  enqueue(chunk?: TOutChunk): void;
  readonly desiredSize: number | null;
  terminate(): void;
  error(err?: any): void;
}

What about backpressure in a TransformStream? The class propagates the backpressure from its readable side (output) to its writable side (input). The assumption is that transforming doesn’t change the amount of data much. Therefore, Transforms can get away with ignoring backpressure. However, it could be detected via transformStreamDefaultController.desiredSize and propagated by returning a Promise from transformer.transform().
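
A minimal sketch of the latter technique: by returning a Promise from .transform(), this (hypothetical) throttling TransformStream makes its writable side wait, which exerts backpressure on whoever writes to it (the 100 ms delay is arbitrary):

const throttlingTransform = new TransformStream({
  async transform(chunk, controller) {
    controller.enqueue(chunk);
    // The stream waits for this Promise before it accepts the next chunk
    await new Promise((resolve) => setTimeout(resolve, 100));
  },
});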

10.7.1 Example: transforming a stream of arbitrary chunks to a stream of lines

The following subclass of TransformStream converts a stream with arbitrary chunks into a stream where each chunk comprises exactly one line of text. That is, with the possible exception of the last chunk, each chunk ends with an end-of-line (EOL) string: '\n' on Unix (incl. macOS) and '\r\n' on Windows.

class ChunksToLinesTransformer {
  #previous = '';

  transform(chunk, controller) {
    let startSearch = this.#previous.length;
    this.#previous += chunk;
    while (true) {
      // Works for EOL === '\n' and EOL === '\r\n'
      const eolIndex = this.#previous.indexOf('\n', startSearch);
      if (eolIndex < 0) break;
      // Line includes the EOL
      const line = this.#previous.slice(0, eolIndex+1);
      controller.enqueue(line);
      this.#previous = this.#previous.slice(eolIndex+1);
      startSearch = 0;
    }
  }

  flush(controller) {
    // Clean up and enqueue any text we’re still holding on to
    if (this.#previous.length > 0) {
      controller.enqueue(this.#previous);
    }
  }
}
class ChunksToLinesStream extends TransformStream {
  constructor() {
    super(new ChunksToLinesTransformer());
  }
}

const stream = new ReadableStream({
  async start(controller) {
    controller.enqueue('multiple\nlines of\ntext');
    controller.close();
  },
});
const transformStream = new ChunksToLinesStream();
const transformed = stream.pipeThrough(transformStream);

for await (const line of transformed) {
  console.log('>>>', JSON.stringify(line));
}

// Output:
// '>>> "multiple\n"'
// '>>> "lines of\n"'
// '>>> "text"'

Note that Deno’s built-in TextLineStream provides similar functionality.

Tip: We can also make this transformation via an async generator. It would asynchronously iterate over a ReadableStream and return an asynchronous iterable with lines. Its implementation is shown in §9.4 “Transforming readable streams via async generators”.
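
A condensed sketch of such a generator (the version discussed in §9.4 may differ in details):

async function* chunksToLinesAsync(chunksAsync) {
  let previous = '';
  for await (const chunk of chunksAsync) {
    previous += chunk;
    let eolIndex;
    while ((eolIndex = previous.indexOf('\n')) >= 0) {
      yield previous.slice(0, eolIndex + 1); // line includes the EOL
      previous = previous.slice(eolIndex + 1);
    }
  }
  if (previous.length > 0) {
    yield previous;
  }
}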

10.7.2 Tip: async generators are also great for transforming streams

Due to ReadableStreams being asynchronously iterable, we can use asynchronous generators to transform them. That leads to very elegant code:

const stream = new ReadableStream({
  async start(controller) {
    controller.enqueue('one');
    controller.enqueue('two');
    controller.enqueue('three');
    controller.close();
  },
});

async function* prefixChunks(prefix, asyncIterable) {
  for await (const chunk of asyncIterable) {
    yield prefix + chunk;
  }
}

const transformedAsyncIterable = prefixChunks('> ', stream);
for await (const transformedChunk of transformedAsyncIterable) {
  console.log(transformedChunk);
}

// Output:
// '> one'
// '> two'
// '> three'

10.8 A closer look at backpressure

Let’s take a closer look at backpressure. Consider the following pipe chain:

rs.pipeThrough(ts).pipeTo(ws);

rs is a ReadableStream, ts is a TransformStream, ws is a WritableStream. These are the connections that are created by the previous expression (.pipeThrough uses .pipeTo to connect rs to the writable side of ts):

rs -pipeTo-> ts{writable,readable} -pipeTo-> ws

Observations:

- The underlying source of rs can be viewed as a pipe chain member that comes before rs.
- The underlying sink of ws can be viewed as a pipe chain member that comes after ws.
- Each stream buffers incoming data in an internal queue.

Let’s assume that the underlying sink of ws is slow and the buffer of ws is eventually full. Then the following steps happen:

- ws signals that its queue is full.
- As a consequence, the pipe stops reading chunks from ts.readable and those chunks accumulate in its queue.
- ts.readable signals that it is full and ts stops moving chunks from ts.writable to ts.readable.
- ts.writable fills up and signals that it is full.
- As a consequence, the pipe stops reading chunks from rs and rs stops asking its underlying source for more data.

This example illustrates that we need two kinds of functionality:

- Entities that receive data need to be able to signal backpressure.
- Entities that send data need to react to such signals by exerting backpressure.

Let’s explore how these functionalities are implemented in the web streams API.

10.8.1 Signalling backpressure

Backpressure is signalled by entities that are receiving data. Web streams have two such entities:

- A WritableStream receives data via the Writer method .write().
- A TransformStream receives data via the WritableStream of its writable side.

In both cases, the input is buffered via queues. The signal to apply backpressure is when a queue is full. Let’s see how that can be detected.

These are the locations of the queues:

- The queue of a ReadableStream is managed by its controller (ReadableStreamDefaultController).
- The queue of a WritableStream is internal to the stream; its state is exposed via the Writer (WritableStreamDefaultWriter).
- A TransformStream has a queue on each of its two sides; its controller (TransformStreamDefaultController) exposes the state of the readable side’s queue.

The desired size of a queue is a number that indicates how much room is left in the queue:

- If it is greater than zero, there is still room.
- If it is zero or less, the queue is full or over-full.

Therefore, we have to apply backpressure if the desired size is zero or less. It is available via the getter .desiredSize of the object which contains the queue.

How is the desired size computed? Via an object that specifies a so-called queuing strategy. ReadableStream and WritableStream have default queuing strategies which can be overridden via optional parameters of their constructors. The interface QueuingStrategy has two properties:

- Method .size(chunk) returns the size of chunk.
- Property .highWaterMark specifies the maximum size of the queue (its high water mark).

The desired size of a queue is the high water mark minus the current size of the queue.
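
As a small sketch, this is how the default queuing strategy of a ReadableStream can be overridden with the built-in CountQueuingStrategy, which gives every chunk the size 1 (the logged values assume that nothing reads from the stream in the meantime):

const readableStream = new ReadableStream(
  {
    start(controller) {
      console.log(controller.desiredSize); // 4 (the queue is empty)
      controller.enqueue('a');
      console.log(controller.desiredSize); // 3
    },
  },
  new CountQueuingStrategy({highWaterMark: 4})
);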

10.8.2 Reacting to backpressure

Entities sending data need to react to signalled backpressure by exerting backpressure.

10.8.2.1 Code writing to a WritableStream via a Writer

We have already seen how this code can handle backpressure (in §10.4.1.3 “Writing approach 3: awaiting .ready”): we await the Promise in writer.ready before writing the next chunk. If we want to, we can additionally base the size of our chunks on writer.desiredSize.

10.8.2.2 The underlying source of a ReadableStream

The underlying source object that can be passed to a ReadableStream wraps an external source. In a way, it is also a member of the pipe chain; one that comes before its ReadableStream. It can exert backpressure by only enqueuing data while controller.desiredSize is positive – for example, by producing data only when .pull() is called or by pausing a push source, as we did in the socket example (§10.3.2.1).

10.8.2.3 The underlying sink of a WritableStream

The underlying sink object that can be passed to a WritableStream wraps an external sink. In a way, it is also a member of the pipe chain; one that comes after its WritableStream.

Each external sink signals backpressure differently (in some cases not at all). The underlying sink can exert backpressure by returning a Promise from method .write() that is fulfilled once writing is finished. There is an example in the web streams standard that demonstrates how that works.
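
A minimal sketch of that pattern (not the standard’s example; the delay stands in for a slow external sink):

const slowWritableStream = new WritableStream({
  async write(chunk) {
    // The stream waits for this Promise before it accepts the next chunk,
    // which is how the underlying sink exerts backpressure
    await new Promise((resolve) => setTimeout(resolve, 100));
    console.log('WROTE', chunk);
  },
});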

10.8.2.4 A TransformStream (.writable → .readable)

The TransformStream connects its writable side with its readable side by implementing an underlying sink for the former and an underlying source for the latter. It has an internal slot .[[backpressure]] that indicates if internal backpressure is currently active or not.

10.8.2.5 .pipeTo() (ReadableStream → WritableStream)

.pipeTo() reads chunks from the ReadableStream via a Reader and writes them to the WritableStream via a Writer. It pauses whenever writer.desiredSize is zero or less (web streams standard: Step 15 of ReadableStreamPipeTo).

10.9 Byte streams

So far, we have only worked with text streams, streams whose chunks were strings. But the web streams API also supports byte streams for binary data, where chunks are Uint8Arrays (TypedArrays):

- ReadableStream has a special 'bytes' mode.
- WritableStream and TransformStream don’t need a special mode: they support arbitrary kinds of chunks, including Uint8Arrays.

Next, we’ll learn how to create readable byte streams.

10.9.1 Readable byte streams

What kind of stream is created by the ReadableStream constructor depends on the optional property .type of its optional first parameter underlyingSource:

- If .type is omitted or no underlying source is provided, the new instance is a regular ReadableStream (in default mode).
- If .type is 'bytes', the new instance is a readable byte stream.

What changes if a ReadableStream is in 'bytes' mode?

In default mode, the underlying source can enqueue any kind of chunk. In bytes mode, the chunks must be ArrayBufferViews, i.e. TypedArrays (such as Uint8Arrays) or DataViews.

Additionally, a readable byte stream can create two kinds of readers:

- .getReader() returns a ReadableStreamDefaultReader.
- .getReader({mode: 'byob'}) returns a ReadableStreamBYOBReader.

“BYOB” stands for “Bring Your Own Buffer” and means that we can pass a buffer (an ArrayBufferView) to reader.read(). Afterwards, that ArrayBufferView will be detached and no longer usable. But .read() returns its data in a new ArrayBufferView that has the same type and accesses the same region of the same ArrayBuffer.

Additionally, readable byte streams have different controllers: They are instances of ReadableByteStreamController (vs. ReadableStreamDefaultController). Apart from forcing underlying sources to enqueue ArrayBufferViews (TypedArrays or DataViews), it also supports ReadableStreamBYOBReaders via its property .byobRequest. An underlying source writes its data into the BYOBRequest stored in this property. The web streams standard has two examples of using .byobRequest in its section “Examples of creating streams”.

10.9.2 Example: an infinite readable byte stream filled with random data

In the next example, we create an infinite readable byte stream that fills its chunks with random data (inspiration: example4.mjs in “Implementing the Web Streams API in Node.js”).

import {promisify} from 'node:util';
import {randomFill} from 'node:crypto';
const asyncRandomFill = promisify(randomFill);

const readableByteStream = new ReadableStream({
  type: 'bytes',
  async pull(controller) {
    const byobRequest = controller.byobRequest;
    await asyncRandomFill(byobRequest.view);
    byobRequest.respond(byobRequest.view.byteLength);
  },
});

const reader = readableByteStream.getReader({mode: 'byob'});
const buffer = new Uint8Array(10); // (A)
const firstChunk = await reader.read(buffer); // (B)
console.log(firstChunk);

Due to readableByteStream being infinite, we can’t loop over it. That’s why we only read its first chunk (line B).

The buffer we create in line A is transferred and therefore unreadable after line B.

10.9.3 Example: compressing a readable byte stream

In the following example, we create a readable byte stream and pipe it through a stream that compresses it to the GZIP format:

const readableByteStream = new ReadableStream({
  type: 'bytes',
  start(controller) {
    // 256 zeros
    controller.enqueue(new Uint8Array(256));
    controller.close();
  },
});
const transformedStream = readableByteStream.pipeThrough(
  new CompressionStream('gzip'));
await logChunks(transformedStream);

async function logChunks(readableByteStream) {
  const reader = readableByteStream.getReader();
  try {
    while (true) {
      const {done, value} = await reader.read();
      if (done) break;
      console.log(value);
    }
  } finally {
    reader.releaseLock();
  }
}

10.9.4 Example: reading a web page via fetch()

The result of fetch() resolves to a response object whose property .body is a readable byte stream. We convert that byte stream to a text stream via TextDecoderStream:

const response = await fetch('https://example.com');
const readableByteStream = response.body;
const readableStream = readableByteStream.pipeThrough(
  new TextDecoderStream('utf-8'));
for await (const stringChunk of readableStream) {
  console.log(stringChunk);
}

10.10 Node.js-specific helpers

Node.js is the only web platform that supports the following helper functions that it calls utility consumers:

import {
  arrayBuffer,
  blob,
  buffer,
  json,
  text,
} from 'node:stream/consumers';

These functions convert web ReadableStreams, Node.js Readables and AsyncIterators to Promises that are fulfilled with:

- arrayBuffer(): an ArrayBuffer
- blob(): a Blob
- buffer(): a Node.js Buffer
- json(): a JavaScript value, created by parsing the content as JSON
- text(): a string

Binary data is assumed to be UTF-8-encoded:

import * as streamConsumers from 'node:stream/consumers';

const readableByteStream = new ReadableStream({
  type: 'bytes',
  start(controller) {
    // TextEncoder converts strings to UTF-8 encoded Uint8Arrays
    const encoder = new TextEncoder();
    const view = encoder.encode('"😀"');
    assert.deepEqual(
      view,
      Uint8Array.of(34, 240, 159, 152, 128, 34)
    );
    controller.enqueue(view);
    controller.close();
  },
});
const jsonData = await streamConsumers.json(readableByteStream);
assert.equal(jsonData, '😀');

String streams work as expected:

import * as streamConsumers from 'node:stream/consumers';

const readableByteStream = new ReadableStream({
  start(controller) {
    controller.enqueue('"😀"');
    controller.close();
  },
});
const jsonData = await streamConsumers.json(readableByteStream);
assert.equal(jsonData, '😀');

10.11 Further reading

This chapter doesn’t cover every aspect of the web streams API. You can find more information in the web streams standard and in MDN’s documentation of web streams, both of which were also sources for this chapter.