This chapter is an introduction to Node’s native streams. They support asynchronous iteration, which makes them easier to work with and which is what we will mostly use in this chapter.
Note that cross-platform web streams are covered in §10 “Using web streams on Node.js”. We will mostly use those in this book. Therefore, you can skip the current chapter if you want to.
Asynchronous iteration is a protocol for retrieving the contents of a data container asynchronously (meaning the current “task” may be paused before retrieving an item).
Asynchronous generators help with async iteration. For example, this is an asynchronous generator function:
/**
 * @returns an asynchronous iterable
 */
async function* asyncGenerator(asyncIterable) {
  for await (const item of asyncIterable) { // input
    if (···) {
      yield '> ' + item; // output
    }
  }
}
The for-await-of loop iterates over the input asyncIterable. This loop is also available in normal asynchronous functions.

yield feeds values into the asynchronous iterable that is returned by this generator.

In the remainder of the chapter, pay close attention to whether a function is an async function or an async generator function:
/** @returns a Promise */
async function asyncFunction() { /*···*/ }
/** @returns an async iterable */
async function* asyncGeneratorFunction() { /*···*/ }
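To make the distinction concrete, here is how each kind of function is consumed. This is a minimal sketch with made-up bodies (we assume we are inside a module, where top-level await is available):

async function asyncFunction() { return 'result'; }
async function* asyncGeneratorFunction() {
  yield 'first';
  yield 'second';
}

// An async function returns a Promise that we await:
assert.equal(await asyncFunction(), 'result');
// An async generator function returns an async iterable that we iterate over:
for await (const item of asyncGeneratorFunction()) {
  console.log(item);
}
// Output:
// 'first'
// 'second'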
A stream is a pattern whose core idea is to “divide and conquer” a large amount of data: We can handle it if we split it into smaller pieces and handle one portion at a time.
Node.js supports several kinds of streams – for example:
Readable streams are streams from which we can read data. In other words, they are sources of data. An example is a readable file stream, which lets us read the contents of a file.
Writable streams are streams to which we can write data. In other words, they are sinks for data. An example is a writable file stream, which lets us write data to a file.
A transform stream is both readable and writable. As a writable stream, it receives pieces of data, transforms (changes or discards) them and then outputs them as a readable stream.
To process streamed data in multiple steps, we can pipeline (connect) streams:

1. Input is received via a readable stream.
2. The data is transformed via one or more transform streams.
3. The result is written to a writable stream.

Part (2) is optional.
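For example, a pipeline with a single transform step could look as follows. This is a minimal sketch that uses stream.pipeline() (covered later in this chapter) and a Transform stream; the file names are only for illustration:

import * as util from 'util';
import * as stream from 'stream';
import * as fs from 'fs';

const pipeline = util.promisify(stream.pipeline);

// (2) A transform stream that upper-cases its input
const toUpperCase = new stream.Transform({
  transform(chunk, _encoding, callback) {
    callback(null, String(chunk).toUpperCase());
  },
});

await pipeline(
  fs.createReadStream('tmp/input.txt', {encoding: 'utf8'}), // (1) source
  toUpperCase, // (2) transformation
  fs.createWriteStream('tmp/output.txt') // (3) sink
);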
When creating text streams, it is best to always specify an encoding:
The Node.js docs have a list of supported encodings and their default spellings – for example:
'utf8'
'utf16le'
'base64'
A few alternate spellings are also allowed. You can use Buffer.isEncoding() to check which spellings are supported:
> buffer.Buffer.isEncoding('utf8')
true
> buffer.Buffer.isEncoding('utf-8')
true
> buffer.Buffer.isEncoding('UTF-8')
true
> buffer.Buffer.isEncoding('UTF:8')
false
The default value for encodings is null. When strings are written to a stream, that is equivalent to 'utf8'; when a stream is read, it means that the chunks are delivered as Buffers.
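The following sketch demonstrates that difference for readable file streams (the file name is only for illustration):

import * as fs from 'fs';

fs.mkdirSync('tmp', {recursive: true});
fs.writeFileSync('tmp/enc-demo.txt', 'Some text');

// Without an encoding, the chunks are Buffers:
for await (const chunk of fs.createReadStream('tmp/enc-demo.txt')) {
  console.log(Buffer.isBuffer(chunk)); // true
}
// With an encoding, the chunks are strings:
for await (const chunk of fs.createReadStream(
    'tmp/enc-demo.txt', {encoding: 'utf8'})) {
  console.log(typeof chunk); // 'string'
}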
readableToString()
We will occasionally use the following helper function. You don’t need to understand how it works, only (roughly) what it does.
import * as stream from 'stream';
/**
* Reads all the text in a readable stream and returns it as a string,
* via a Promise.
* @param {stream.Readable} readable
*/
function readableToString(readable) {
  return new Promise((resolve, reject) => {
    let data = '';
    readable.on('data', function (chunk) {
      data += chunk;
    });
    readable.on('end', function () {
      resolve(data);
    });
    readable.on('error', function (err) {
      reject(err);
    });
  });
}
This function is implemented via the event-based API. We’ll later see a simpler way of doing this – via async iteration.
In the examples, we occasionally see await being used at the top level. In that case, we imagine that we are inside a module or inside the body of an async function.

Depending on the platform, the end-of-line (EOL) character sequence of text is either '\n' (LF) or '\r\n' (CR LF). The EOL of the current platform is available via the constant EOL in module os.

We can use fs.createReadStream() to create readable streams:
import * as fs from 'fs';
const readableStream = fs.createReadStream(
  'tmp/test.txt', {encoding: 'utf8'});
assert.equal(
  await readableToString(readableStream),
  'This is a test!\n');
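This example assumes that the file tmp/test.txt exists. We could set it up as follows (a hypothetical setup step):

import * as fs from 'fs';
fs.mkdirSync('tmp', {recursive: true});
fs.writeFileSync('tmp/test.txt', 'This is a test!\n');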
Readable.from(): Creating readable streams from iterables

The static method Readable.from(iterable, options?) creates a readable stream which holds the data contained in iterable. iterable can be a synchronous iterable or an asynchronous iterable. The parameter options is optional and can, among other things, be used to specify a text encoding.
import * as stream from 'stream';
function* gen() {
  yield 'One line\n';
  yield 'Another line\n';
}
const readableStream = stream.Readable.from(gen(), {encoding: 'utf8'});
assert.equal(
  await readableToString(readableStream),
  'One line\nAnother line\n');
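An asynchronous iterable works the same way. In the following sketch, the async generator asyncGen() (a name chosen for illustration) replaces the synchronous generator:

import * as stream from 'stream';

async function* asyncGen() {
  yield 'One line\n';
  yield 'Another line\n';
}
const readableStream = stream.Readable.from(asyncGen(), {encoding: 'utf8'});
assert.equal(
  await readableToString(readableStream),
  'One line\nAnother line\n');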
Readable.from() accepts any iterable and can therefore also be used to convert strings to streams:
import {Readable} from 'stream';
const str = 'Some text!';
const readable = Readable.from(str, {encoding: 'utf8'});
assert.equal(
  await readableToString(readable),
  'Some text!');
At the moment, Readable.from() treats a string like any other iterable and therefore iterates over its code points. That isn’t ideal, performance-wise, but should be OK for most use cases. I expect Readable.from() to often be used with strings, so maybe there will be optimizations in the future.
for-await-of
Every readable stream is asynchronously iterable, which means that we can use a for-await-of loop to read its contents:
import * as fs from 'fs';
async function logChunks(readable) {
  for await (const chunk of readable) {
    console.log(chunk);
  }
}
const readable = fs.createReadStream(
  'tmp/test.txt', {encoding: 'utf8'});
logChunks(readable);
// Output:
// 'This is a test!\n'
The following function is a simpler reimplementation of the function that we have seen at the beginning of this chapter.
import {Readable} from 'stream';
async function readableToString2(readable) {
  let result = '';
  for await (const chunk of readable) {
    result += chunk;
  }
  return result;
}
const readable = Readable.from('Good morning!', {encoding: 'utf8'});
assert.equal(await readableToString2(readable), 'Good morning!');
Note that, in this case, we had to use an async function because we wanted to return a Promise.
'node:readline'
The built-in module 'node:readline' lets us read lines from readable streams:
import * as fs from 'node:fs';
import * as readline from 'node:readline/promises';
const filePath = process.argv[2]; // first command line argument
const rl = readline.createInterface({
  input: fs.createReadStream(filePath, {encoding: 'utf-8'}),
});
for await (const line of rl) {
  console.log('>', line);
}
rl.close();
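If we save this code in a module, e.g. print-lines.mjs (a hypothetical name), we can invoke it via node print-lines.mjs tmp/test.txt and each line of that file is logged, prefixed with '>'.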
Async iteration provides an elegant alternative to transform streams for processing streamed data in multiple steps:

- The input is a readable stream (readable streams are asynchronously iterable).
- The transformations are performed via async generators.
- The resulting async iterable can either be processed by an async function or converted back to a readable stream via Readable.from() (which can later be piped into a writable stream).

To summarize, these are the pieces of such processing pipelines: readable stream → async generator(s) → async function or readable stream.

The next example shows such a processing pipeline in action.
import {Readable} from 'stream';
/**
* @param chunkIterable An asynchronous or synchronous iterable
* over “chunks” (arbitrary strings)
* @returns An asynchronous iterable over “lines”
* (strings with at most one newline that always appears at the end)
*/
async function* chunksToLines(chunkIterable) {
  let previous = '';
  for await (const chunk of chunkIterable) {
    let startSearch = previous.length;
    previous += chunk;
    while (true) {
      // Works for EOL === '\n' and EOL === '\r\n'
      const eolIndex = previous.indexOf('\n', startSearch);
      if (eolIndex < 0) break;
      // Line includes the EOL
      const line = previous.slice(0, eolIndex+1);
      yield line;
      previous = previous.slice(eolIndex+1);
      startSearch = 0;
    }
  }
  if (previous.length > 0) {
    yield previous;
  }
}
async function* numberLines(lineIterable) {
  let lineNumber = 1;
  for await (const line of lineIterable) {
    yield lineNumber + ' ' + line;
    lineNumber++;
  }
}

async function logLines(lineIterable) {
  for await (const line of lineIterable) {
    console.log(line);
  }
}

const chunks = Readable.from(
  'Text with\nmultiple\nlines.\n',
  {encoding: 'utf8'});
await logLines(numberLines(chunksToLines(chunks))); // (A)
// Output:
// '1 Text with\n'
// '2 multiple\n'
// '3 lines.\n'
The processing pipeline is set up in line A. The steps are:

- chunksToLines(): Go from an async iterable with chunks to an async iterable with lines.
- numberLines(): Go from an async iterable with lines to an async iterable with numbered lines.
- logLines(): Log the items in an async iterable.

Observations:

- The inputs and outputs of chunksToLines() and numberLines() are async iterables. That is why they are async generators (as indicated by async and *).
- The input of logLines() is an async iterable. That is why it is an async function (as indicated by async).
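As mentioned earlier, instead of logging the numbered lines, we can also convert the final async iterable back into a readable stream via Readable.from(). A minimal sketch, reusing chunksToLines() and numberLines() from the previous example:

import {Readable} from 'stream';

const chunks = Readable.from(
  'Text with\nmultiple\nlines.\n', {encoding: 'utf8'});
const numberedLines = Readable.from(numberLines(chunksToLines(chunks)));
// numberedLines is a readable stream and can, e.g.,
// be piped into a writable stream.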
We can use fs.createWriteStream() to create writable streams:
const writableStream = fs.createWriteStream(
  'tmp/log.txt', {encoding: 'utf8'});
In this section, we look at two approaches to writing to a writable stream:

- Writing directly to the writable stream via .write().
- Using function pipeline() from module stream to pipe a readable stream to the writable stream.

To demonstrate these approaches, we use them to implement the same function writeIterableToFile().

Method .pipe() of readable streams also supports piping, but it has a downside and it’s better to avoid it.
writable.write(chunk)
When it comes to writing data to streams, there are two callback-based mechanisms that help us:

- The event 'drain' signals that backpressure is over.
- The function finished() invokes a callback when a stream is completely done being read or written, or has failed with an error.
In the following example, we promisify these mechanisms so that we can use them via an async function:
import * as util from 'util';
import * as stream from 'stream';
import * as fs from 'fs';
import {once} from 'events';
const finished = util.promisify(stream.finished); // (A)
async function writeIterableToFile(iterable, filePath) {
  const writable = fs.createWriteStream(filePath, {encoding: 'utf8'});
  for await (const chunk of iterable) {
    if (!writable.write(chunk)) { // (B)
      // Handle backpressure
      await once(writable, 'drain');
    }
  }
  writable.end(); // (C)
  // Wait until done. Throws if there are errors.
  await finished(writable);
}

await writeIterableToFile(
  ['One', ' line of text.\n'], 'tmp/log.txt');
assert.equal(
  fs.readFileSync('tmp/log.txt', {encoding: 'utf8'}),
  'One line of text.\n');
The default version of stream.finished() is callback-based but can be turned into a Promise-based version via util.promisify() (line A).
We used the following two patterns:
Writing to a writable stream while handling backpressure (line B):
if (!writable.write(chunk)) {
  await once(writable, 'drain');
}
Closing a writable stream and waiting until writing is done (line C):
writable.end();
await finished(writable);
stream.pipeline()
In line A, we use a promisified version of stream.pipeline() to pipe a readable stream readable to a writable stream writable:
import * as util from 'util';
import * as stream from 'stream';
import * as fs from 'fs';

const pipeline = util.promisify(stream.pipeline);

async function writeIterableToFile(iterable, filePath) {
  const readable = stream.Readable.from(
    iterable, {encoding: 'utf8'});
  const writable = fs.createWriteStream(filePath);
  await pipeline(readable, writable); // (A)
}
await writeIterableToFile(
  ['One', ' line of text.\n'], 'tmp/log.txt');
// ···
readable.pipe(destination)
Method readable.pipe() also supports piping, but has a caveat: If the readable emits an error, then the writable is not closed automatically. pipeline() does not have that caveat.
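If we do use .pipe(), we therefore have to handle errors ourselves. A minimal sketch (the file names are only for illustration):

import * as fs from 'fs';

const readable = fs.createReadStream('tmp/input.txt', {encoding: 'utf8'});
const writable = fs.createWriteStream('tmp/output.txt');

readable.pipe(writable);
readable.on('error', (err) => {
  // .pipe() does not close the writable for us
  writable.destroy(err);
});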
Module os:
const EOL: string
(since 0.7.8)
Contains the end-of-line character sequence used by the current platform.
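For example (the result depends on the platform):

import {EOL} from 'os';
// On Unix: '\n'; on Windows: '\r\n'
assert.ok(EOL === '\n' || EOL === '\r\n');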
Module buffer:
Buffer.isEncoding(encoding: string): boolean
(since 0.9.1)
Returns true if encoding correctly names one of the supported Node.js encodings for text. Supported encodings include:
'utf8'
'utf16le'
'ascii'
'latin1'
'base64'
'hex'
(each byte as two hexadecimal characters)Module stream
:
Readable.prototype[Symbol.asyncIterator](): AsyncIterableIterator<any>
(since 10.0.0)
Readable streams are asynchronously iterable. For example, you can use for-await-of loops in async functions or async generators to iterate over them.
finished(stream: ReadableStream | WritableStream | ReadWriteStream): Promise<void>
(since 10.0.0)
The returned Promise is settled when reading/writing is done or there was an error.
This promisified version is created as follows:
const finished = util.promisify(stream.finished);
pipeline(...streams: Array<ReadableStream|ReadWriteStream|WritableStream>): Promise<void>
(since 10.0.0)
Pipes between streams. The returned Promise is settled when the pipeline is complete or when there was an error.
This promisified version is created as follows:
const pipeline = util.promisify(stream.pipeline);
Readable.from(iterable: Iterable<any> | AsyncIterable<any>, options?: ReadableOptions): Readable
(since 12.3.0)
Converts an iterable into a readable stream.
interface ReadableOptions {
  highWaterMark?: number;
  encoding?: string;
  objectMode?: boolean;
  read?(this: Readable, size: number): void;
  destroy?(this: Readable, error: Error | null,
    callback: (error: Error | null) => void): void;
  autoDestroy?: boolean;
}
These options are the same as the options for the Readable constructor and are documented there.
Module fs:
createReadStream(path: string | Buffer | URL, options?: string | {encoding?: string; start?: number}): ReadStream
(since 2.3.0)
Creates a readable stream. More options are available.
createWriteStream(path: PathLike, options?: string | {encoding?: string; flags?: string; mode?: number; start?: number}): WriteStream
(since 2.3.0)
With option .flags you can specify whether you want to write or append, and what happens if a file does or does not exist. More options are available.
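For example, flag 'a' opens the file for appending (the file name is only for illustration):

import * as fs from 'fs';

// Appends to tmp/log.txt instead of overwriting it
const writable = fs.createWriteStream(
  'tmp/log.txt', {encoding: 'utf8', flags: 'a'});
writable.end('One more line\n');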
The static type information in this section is based on Definitely Typed.