
9 Native Node.js streams



This chapter is an introduction to Node’s native streams. They support asynchronous iteration, which makes them easier to work with than their event-based API; asynchronous iteration is the approach we will mostly use in this chapter.

Note that cross-platform web streams are covered in §10 “Using web streams on Node.js”. We will mostly use those in this book. Therefore, you can skip the current chapter if you want to.

9.1 Recap: asynchronous iteration and asynchronous generators

Asynchronous iteration is a protocol for retrieving the contents of a data container asynchronously (meaning the current “task” may be paused before retrieving an item).

Asynchronous generators help with async iteration. For example, this is an asynchronous generator function:

/**
 * @returns an asynchronous iterable
 */
async function* asyncGenerator(asyncIterable) {
  for await (const item of asyncIterable) { // input
    if (···) {
      yield '> ' + item; // output
    }
  }
}
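
To see both sides of the protocol in action, here is a minimal, self-contained sketch (the function prefixLines() and its input are made up for illustration):

async function* prefixLines(asyncIterable) {
  for await (const item of asyncIterable) {
    yield '> ' + item;
  }
}

// for-await-of also accepts synchronous iterables such as Arrays
for await (const line of prefixLines(['a', 'b'])) {
  console.log(line);
}

// Output:
// '> a'
// '> b'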

In the remainder of the chapter, pay close attention to whether a function is an async function or an async generator function:

/** @returns a Promise */
async function asyncFunction() { /*···*/ }

/** @returns an async iterable */
async function* asyncGeneratorFunction() { /*···*/ }
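
The two kinds are also consumed differently – a quick sketch, continuing the two placeholder functions above:

const result = await asyncFunction(); // await the Promise

for await (const item of asyncGeneratorFunction()) { // iterate the async iterable
  // ···
}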

9.2 Streams

A stream is a pattern whose core idea is to “divide and conquer” a large amount of data: We can handle it if we split it into smaller pieces and handle one portion at a time.

Node.js supports several kinds of streams – for example:

  • Readable streams are sources of data – e.g., we can read the contents of a file via a readable stream.
  • Writable streams are sinks for data – e.g., we can write data to a file via a writable stream.
  • Transform streams are both readable and writable: they receive data, transform it, and output the results.

9.2.1 Pipelining

To process streamed data in multiple steps, we can pipeline (connect) streams:

  1. Input is received via a readable stream.
  2. Each processing step is performed via a transform stream.
  3. For the last processing step, we have two options:
    • We can write the data in the most recent readable stream into a writable stream. That is, the writable stream is the last element of our pipeline.
    • We can process the data in the most recent readable stream in some other manner.

Step 2 is optional.
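
As a preview, the following sketch sets up such a pipeline via stream.pipeline() (explained in §9.5.2.2). It gzips a file; zlib.createGzip() stands in for an arbitrary processing step and the output path tmp/test.txt.gz is made up for this example:

import * as util from 'util';
import * as stream from 'stream';
import * as fs from 'fs';
import * as zlib from 'zlib';

const pipeline = util.promisify(stream.pipeline);

await pipeline(
  fs.createReadStream('tmp/test.txt'),      // 1. readable stream (input)
  zlib.createGzip(),                        // 2. transform stream (processing step)
  fs.createWriteStream('tmp/test.txt.gz')); // 3. writable stream (output)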

9.2.2 Text encodings

When creating text streams, it is best to always specify an encoding. For readable streams, the default value of encoding is null; in that case, a stream delivers its data as Buffers (binary data), not as strings.
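
The following sketch demonstrates the difference; it assumes that the file tmp/test.txt (which we’ll use again in the next section) exists:

import * as assert from 'node:assert/strict';
import * as fs from 'fs';

// Without an encoding, the stream delivers Buffers:
for await (const chunk of fs.createReadStream('tmp/test.txt')) {
  assert.equal(Buffer.isBuffer(chunk), true);
}
// With an encoding, the stream delivers strings:
for await (const chunk of fs.createReadStream(
    'tmp/test.txt', {encoding: 'utf8'})) {
  assert.equal(typeof chunk, 'string');
}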

9.2.3 Helper function: readableToString()

We will occasionally use the following helper function. You don’t need to understand how it works, only (roughly) what it does.

import * as stream from 'stream';

/**
 * Reads all the text in a readable stream and returns it as a string,
 * via a Promise.
 * @param {stream.Readable} readable
 */
function readableToString(readable) {
  return new Promise((resolve, reject) => {
    let data = '';
    readable.on('data', function (chunk) {
      data += chunk;
    });
    readable.on('end', function () {
      resolve(data);
    });
    readable.on('error', function (err) {
      reject(err);
    });
  });
}

This function is implemented via the event-based API. We’ll later see a simpler way of doing this – via async iteration.

9.2.4 A few preliminary remarks

In this chapter, we only work with text: all streams are created with an encoding and deliver their data as strings.

9.3 Readable streams

9.3.1 Creating readable streams

9.3.1.1 Creating readable streams from files

We can use fs.createReadStream() to create readable streams:

import * as assert from 'node:assert/strict';
import * as fs from 'fs';

const readableStream = fs.createReadStream(
  'tmp/test.txt', {encoding: 'utf8'});

assert.equal(
  await readableToString(readableStream),
  'This is a test!\n');
9.3.1.2 Readable.from(): Creating readable streams from iterables

The static method Readable.from(iterable, options?) creates a readable stream which holds the data contained in iterable. iterable can be a synchronous iterable or an asynchronous iterable. The parameter options is optional and can, among other things, be used to specify a text encoding.

import * as assert from 'node:assert/strict';
import * as stream from 'stream';

function* gen() {
  yield 'One line\n';
  yield 'Another line\n';
}
const readableStream = stream.Readable.from(gen(), {encoding: 'utf8'});
assert.equal(
  await readableToString(readableStream),
  'One line\nAnother line\n');
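
Since Readable.from() also accepts asynchronous iterables, an async generator works equally well – a sketch that reuses the helper readableToString():

import * as assert from 'node:assert/strict';
import * as stream from 'stream';

async function* asyncGen() {
  yield 'First line\n';
  yield 'Second line\n';
}
const readableStream = stream.Readable.from(asyncGen(), {encoding: 'utf8'});
assert.equal(
  await readableToString(readableStream),
  'First line\nSecond line\n');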
9.3.1.2.1 Creating readable streams from strings

Readable.from() accepts any iterable and can therefore also be used to convert strings to streams:

import * as assert from 'node:assert/strict';
import {Readable} from 'stream';

const str = 'Some text!';
const readable = Readable.from(str, {encoding: 'utf8'});
assert.equal(
  await readableToString(readable),
  'Some text!');

At the moment, Readable.from() treats a string like any other iterable and therefore iterates over its code points. That isn’t ideal, performance-wise, but should be OK for most use cases. I expect Readable.from() to be often used with strings, so maybe there will be optimizations in the future.

9.3.2 Reading chunks from readable streams via for-await-of

Every readable stream is asynchronously iterable, which means that we can use a for-await-of loop to read its contents:

import * as fs from 'fs';

async function logChunks(readable) {
  for await (const chunk of readable) {
    console.log(chunk);
  }
}

const readable = fs.createReadStream(
  'tmp/test.txt', {encoding: 'utf8'});
await logChunks(readable);

// Output:
// 'This is a test!\n'
9.3.2.1 Collecting the contents of a readable stream in a string

The following function is a simpler reimplementation of readableToString(), the helper function that we saw at the beginning of this chapter.

import * as assert from 'node:assert/strict';
import {Readable} from 'stream';

async function readableToString2(readable) {
  let result = '';
  for await (const chunk of readable) {
    result += chunk;
  }
  return result;
}

const readable = Readable.from('Good morning!', {encoding: 'utf8'});
assert.equal(await readableToString2(readable), 'Good morning!');

Note that, in this case, we had to use an async function because we wanted to return a Promise.

9.3.3 Reading lines from readable streams via module 'node:readline'

The built-in module 'node:readline' lets us read lines from readable streams:

import * as fs from 'node:fs';
import * as readline from 'node:readline/promises';

const filePath = process.argv[2]; // first command line argument

const rl = readline.createInterface({
  input: fs.createReadStream(filePath, {encoding: 'utf-8'}),
});
for await (const line of rl) {
  console.log('>', line);
}
rl.close();
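
If this code resides in a file lines.mjs (a name made up for this example), we can invoke it like this:

node lines.mjs tmp/test.txt

For the file tmp/test.txt that we used earlier, the output is:

> This is a test!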

9.4 Transforming readable streams via async generators

Async iteration provides an elegant alternative to transform streams for processing streamed data in multiple steps:

  • The input is a readable stream.
  • The first processing step is performed by an async generator that iterates over the readable stream.
  • Optionally, we can add more processing steps via further async generators.
  • For the last step, we have two options:
    • We can convert the async iterable returned by the last async generator to a readable stream via Readable.from() (which can then, e.g., be piped into a writable stream).
    • We can consume the async iterable via an async function.

To summarize, these are the pieces of such processing pipelines:

readable
→ first async generator [→ … → last async generator]
→ readable or async function

9.4.1 Going from chunks to numbered lines in async iterables

The next example shows such a processing pipeline in action.

import {Readable} from 'stream';

/**
 * @param chunkIterable An asynchronous or synchronous iterable
 * over “chunks” (arbitrary strings)
 * @returns An asynchronous iterable over “lines”
 * (strings with at most one newline that always appears at the end)
 */
async function* chunksToLines(chunkIterable) {
  let previous = '';
  for await (const chunk of chunkIterable) {
    let startSearch = previous.length;
    previous += chunk;
    while (true) {
      // Works for EOL === '\n' and EOL === '\r\n'
      const eolIndex = previous.indexOf('\n', startSearch);
      if (eolIndex < 0) break;
      // Line includes the EOL
      const line = previous.slice(0, eolIndex+1);
      yield line;
      previous = previous.slice(eolIndex+1);
      startSearch = 0;
    }
  }
  if (previous.length > 0) {
    yield previous;
  }
}

async function* numberLines(lineIterable) {
  let lineNumber = 1;
  for await (const line of lineIterable) {
    yield lineNumber + ' ' + line;
    lineNumber++;
  }
}

async function logLines(lineIterable) {
  for await (const line of lineIterable) {
    console.log(line);
  }
}

const chunks = Readable.from(
  'Text with\nmultiple\nlines.\n',
  {encoding: 'utf8'});
await logLines(numberLines(chunksToLines(chunks))); // (A)

// Output:
// '1 Text with\n'
// '2 multiple\n'
// '3 lines.\n'

The processing pipeline is set up in line A. The steps are:

  1. chunksToLines(): converts an async iterable over chunks to an async iterable over lines.
  2. numberLines(): converts an async iterable over lines to an async iterable over numbered lines.
  3. logLines(): consumes the async iterable over numbered lines and logs each of its items.

Observation: only the last step, logLines(), is an async function – it consumes the pipeline and returns a Promise. All previous steps are async generator functions, which return async iterables.
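
If we instead pick the other option for the last step – converting the final async iterable back to a readable stream via Readable.from() – we can pipe the result into a writable stream. The following sketch reuses chunksToLines() and numberLines() from above and stream.pipeline() from §9.5.2.2; the output path tmp/numbered.txt is made up for this example:

import * as util from 'util';
import * as stream from 'stream';
import * as fs from 'fs';

const pipeline = util.promisify(stream.pipeline);

const chunks = stream.Readable.from(
  'Text with\nmultiple\nlines.\n', {encoding: 'utf8'});
await pipeline(
  stream.Readable.from(numberLines(chunksToLines(chunks))),
  fs.createWriteStream('tmp/numbered.txt', {encoding: 'utf8'}));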

9.5 Writable streams

9.5.1 Creating writable streams for files

We can use fs.createWriteStream() to create writable streams:

import * as fs from 'fs';

const writableStream = fs.createWriteStream(
  'tmp/log.txt', {encoding: 'utf8'});

9.5.2 Writing to writable streams

In this section, we look at two approaches to writing to a writable stream:

  1. Writing directly to the writable stream via its method .write().
  2. Using function pipeline() from module stream to pipe a readable stream to the writable stream.

To demonstrate these approaches, we use them to implement the same function writeIterableToFile().

Method .pipe() of readable streams also supports piping, but it has a downside (described in §9.5.2.2), so it is better to avoid it.

9.5.2.1 writable.write(chunk)

When it comes to writing data to streams, there are two callback-based mechanisms that help us:

  • The event 'drain' signals that it is safe to write more data after backpressure occurred – that is, after writable.write() returned false.
  • The function stream.finished(stream, callback) invokes its callback once stream is completely done – fully written, fully read, or failed with an error.

In the following example, we promisify these mechanisms so that we can use them via an async function:

import * as assert from 'node:assert/strict';
import * as util from 'util';
import * as stream from 'stream';
import * as fs from 'fs';
import {once} from 'events';

const finished = util.promisify(stream.finished); // (A)

async function writeIterableToFile(iterable, filePath) {
  const writable = fs.createWriteStream(filePath, {encoding: 'utf8'});
  for await (const chunk of iterable) {
    if (!writable.write(chunk)) { // (B)
      // Handle backpressure
      await once(writable, 'drain');
    }
  }
  writable.end(); // (C)
  // Wait until done. Throws if there are errors.
  await finished(writable);
}

await writeIterableToFile(
  ['One', ' line of text.\n'], 'tmp/log.txt');
assert.equal(
  fs.readFileSync('tmp/log.txt', {encoding: 'utf8'}),
  'One line of text.\n');

The default version of stream.finished() is callback-based but can be turned into a Promise-based version via util.promisify() (line A).

We used the following two patterns:

  • Writing to a writable stream while handling backpressure (line B): if .write() returns false, the internal buffer is full and we wait for the event 'drain' before writing more.
  • Closing a writable stream and waiting until all writing is done (line C): we call .end() and then await the promisified stream.finished().

9.5.2.2 Piping readable streams to writable streams via stream.pipeline()

In line A, we use a promisified version of stream.pipeline() to pipe a readable stream readable to a writable stream writable:

import * as util from 'util';
import * as stream from 'stream';
import * as fs from 'fs';
const pipeline = util.promisify(stream.pipeline);

async function writeIterableToFile(iterable, filePath) {
  const readable = stream.Readable.from(
    iterable, {encoding: 'utf8'});
  const writable = fs.createWriteStream(filePath);
  await pipeline(readable, writable); // (A)
}
await writeIterableToFile(
  ['One', ' line of text.\n'], 'tmp/log.txt');
// ···

Method readable.pipe() also supports piping, but has a caveat: If the readable emits an error, then the writable is not closed automatically. pipeline() does not have that caveat.

9.6 Quick reference

Module os:

  • os.EOL: string – the end-of-line marker of the current platform ('\n' on Unix, '\r\n' on Windows).

Module buffer:

  • class Buffer – Node.js’s representation of fixed-size binary data.

Module stream:

  • Readable.from(iterable, options?) – creates a readable stream from an iterable or async iterable.
  • stream.finished(stream, callback) – invokes callback once stream is completely done or has failed.
  • stream.pipeline(...streams, callback) – pipes streams together and invokes callback once the pipeline is complete or has failed.

Module fs:

  • fs.createReadStream(path, options?) – creates a readable stream for the file at path.
  • fs.createWriteStream(path, options?) – creates a writable stream for the file at path.

The static type information in this section is based on Definitely Typed.

9.7 Further reading and sources of this chapter