
What Is a Node.js Stream?

Streams are a built-in Node.js feature that represents an asynchronous flow of data, and they are a common way to handle reading and writing files. They’re meant to be like Unix pipes, allowing us to hook up a data source on one end, any number of consumers on the other end, and pass data between them. Node streams can help you process a file that’s larger than your free memory space because, instead of reading the file into memory all at once, the program processes the data in smaller chunks.

By the end of this tutorial, you should be able to:

  • Explain what a Node.js stream is
  • Understand the limitations of reading an entire file into memory
  • Describe the benefits of, and use cases for, Node streams
  • Understand some of the events streams emit
  • Learn what pipe does in Node, and how to connect streams using pipe
  • Use the fs module to create Node streams from files
  • Know how to handle stream errors in Node

Goal

  • Understand what a stream is in the context of a Node.js application.

Prerequisites

  • None


What are streams?

Streams are an interface for working with streaming data: data which arrives chunk by chunk over time. If you’re familiar with Unix pipes (|), then you already have a working mental model of streams. If you’re not, no worries — you don’t need to know Unix to work with streams in Node.js.

Essentially, a stream is a collection of data which isn’t available all at once. The data in a stream arrives piece by piece, typically one chunk at a time. As a result, we handle each piece of data as it arrives from the stream, whenever that may be.

In Node.js, streams are used by many of the built-in modules to handle asynchronous data processing. For example, the built-in http module uses streaming interfaces within ClientRequest and ServerResponse.

By default, each chunk of stream data is a Buffer (or a string, if an encoding has been set on the stream), unless the stream has been specifically configured to work in object mode.
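To make the difference between byte chunks and object chunks concrete, here is a minimal sketch. It uses Readable.from to build two in-memory streams; the collectChunks helper and the sample data are assumptions made up for this illustration, not part of any Node.js API.

```javascript
const { Readable } = require("stream");

// Hypothetical helper: gather every chunk a stream emits into an array.
function collectChunks(stream) {
  return new Promise((resolve, reject) => {
    const chunks = [];
    stream.on("data", chunk => chunks.push(chunk));
    stream.on("end", () => resolve(chunks));
    stream.on("error", reject);
  });
}

// Readable.from() defaults to object mode. Opting out makes string
// chunks arrive as Buffers, which is the default for most streams.
const byteStream = Readable.from(["hello ", "world"], { objectMode: false });

// In object mode, each chunk is delivered as the original JavaScript value.
const objectStream = Readable.from([{ id: 1 }, { id: 2 }]);

const byteChunks = collectChunks(byteStream);
const objectChunks = collectChunks(objectStream);

byteChunks.then(chunks => {
  console.log("All Buffers?", chunks.every(Buffer.isBuffer));
  console.log("Joined text:", Buffer.concat(chunks).toString());
});
objectChunks.then(chunks => console.log("Objects:", chunks));
```

Object mode is mostly useful when the items flowing through a stream are records rather than raw bytes, such as parsed rows of a dataset.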

Why use streams?

When working with data which is too large to fit into memory, streams are absolutely essential.

Imagine you need to work with a 20 GB file of analytics data. The file contains millions of rows of data, and you want to transform each row in the dataset. If you read the entire file into memory, not only would this take a long time, but you would also hit the memory limit of Node.js and receive an out-of-memory error. Even if you didn’t hit the Node.js memory limit, you’d likely run out of memory on your computer!

Streams let us work with data that is too large to fit into memory. That’s because with streams, we only ever work with chunks of data at a time. Instead of reading the entire file into memory, we can work on each row from the dataset one at a time. Streams are memory efficient in this regard; they consume minimal amounts of memory compared to buffering all the data in memory at once.

But streams are useful beyond working with large files. In the scenario where you read an entire file into memory (assuming it fits into memory), time passes before the file has finished being read.

When consuming data from a stream, we are able to start processing the data as soon as the first chunk arrives. Streams are time efficient compared to waiting for all the data to be read into memory.
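As a rough illustration of chunk-by-chunk processing, the sketch below counts the bytes in a file without ever holding the whole file in memory. The temporary sample file and the countBytes helper are assumptions made for this example; the data, end, and error events it listens to are described later in this tutorial.

```javascript
const fs = require("fs");
const os = require("os");
const path = require("path");

// Hypothetical sample file, created so the sketch is self-contained.
const sampleDir = fs.mkdtempSync(path.join(os.tmpdir(), "streams-demo-"));
const samplePath = path.join(sampleDir, "sample.txt");
fs.writeFileSync(samplePath, "x".repeat(200 * 1024)); // 200 KiB of data

// Hypothetical helper: read a file as a stream and tally what arrives.
function countBytes(filePath) {
  return new Promise((resolve, reject) => {
    let bytes = 0;
    let chunks = 0;
    // highWaterMark caps the size of each chunk read from the file.
    fs.createReadStream(filePath, { highWaterMark: 64 * 1024 })
      .on("data", chunk => {
        bytes += chunk.length; // work happens as soon as a chunk arrives
        chunks += 1;
      })
      .on("end", () => resolve({ bytes, chunks }))
      .on("error", reject);
  });
}

const totals = countBytes(samplePath);
totals.then(({ bytes, chunks }) => {
  console.log(`Read ${bytes} bytes in ${chunks} chunks`);
});
```

Only one chunk needs to be in memory at a time, so the same approach should scale to files far larger than the sample used here.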

Finally, streams are composable, meaning they can be connected to and combined with other streams. The output of one stream can be used as the input for another stream, allowing us to combine streams into a pipeline through which data can flow between the various streams.

Get to know your streams

There are four main types of streams in the built-in Node.js stream module, plus PassThrough, which is a special case of Transform.

  • Readable: A source of input. You receive data from a Readable stream.
  • Writable: A destination. You stream data to a Writable stream. Sometimes referred to as a “sink”, because it is the end-destination for streaming data.
  • Duplex: A stream that implements both a Readable and Writable interface. Duplex streams can receive data, as well as produce data. An example would be a TCP socket, where data flows in both directions.
  • Transform: A type of Duplex stream, where the data passing through the stream is altered. The output will be different from the input in some way. You can send data to a Transform stream, and also read data from it after the data has been transformed.
  • PassThrough: A special type of Transform stream, which does not alter the data passing through it. According to the documentation, these are mostly used for testing and examples; they serve little actual purpose beyond passing data along.

Out of these, you’re most likely to encounter Readable, Writable, and Transform streams.

Stream events

All streams are instances of EventEmitter. EventEmitter objects in Node.js are used to emit and respond to events asynchronously.

The events emitted by streams can be used to read and write data, manage the stream state, and handle errors.

While it’s important to understand some of the events streams can produce, listening to events is not the recommended way to consume streams.

Instead, we (and the Node.js Docs) recommend the pipe and pipeline methods which consume streams and handle the underlying events for you. We will cover those methods next in this tutorial, but want to point out that all streams emit events.

Working with stream events directly is useful when you want finer grained control over how a stream is consumed. Controlling, for example, what happens when a particular stream ends or begins sending data.

You can read the Node.js Stream docs for more in-depth information about stream events, but here are some of the most important ones to understand.

Readable stream events

  • data: Emitted when the stream outputs a chunk of data. The data chunk is passed to the handler.
  • readable: Emitted when there is data ready to be read from the stream.
  • end: Emitted when no more data is available for consumption.
  • error: An error has occurred within the stream, and an error object is passed to the handler. Unhandled stream errors can crash your program.
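The following sketch shows the readable, end, and error events on a small in-memory stream. The source stream and the seen array are illustrative names chosen for this example. When listening for readable, chunks are pulled out manually with read() rather than pushed to a data handler.

```javascript
const { Readable } = require("stream");

const source = Readable.from(["a", "b", "c"]);
const seen = [];

source.on("readable", () => {
  let chunk;
  // read() returns null once the internal buffer has been emptied.
  while ((chunk = source.read()) !== null) {
    seen.push(chunk);
  }
});

source.on("end", () => {
  console.log("No more data. Chunks seen:", seen);
});

source.on("error", err => {
  console.error("The stream failed:", err);
});
```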

Writable stream events

  • drain: The writable stream’s internal buffer has been cleared and is ready to have more data written into it.
  • finish: Emitted after end() has been called and all data has been flushed to the underlying system.
  • error: An error occurred while writing data, and an error object is passed to the handler. Unhandled stream errors can crash your program.
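To see drain and finish in action, here is a sketch of a deliberately slow in-memory Writable with a very small buffer. The sink, the words array, and the writeNext function are assumptions made for this example. The write method returns false when the internal buffer is full, which is the signal to wait for drain before writing again.

```javascript
const { Writable } = require("stream");

const received = [];

// An in-memory sink with a tiny buffer so that writes fill it quickly.
const sink = new Writable({
  highWaterMark: 4, // buffer size in bytes
  write(chunk, encoding, callback) {
    received.push(chunk.toString());
    setImmediate(callback); // simulate slow, asynchronous I/O
  }
});

let drainCount = 0;
sink.on("drain", () => { drainCount += 1; });
sink.on("finish", () => {
  console.log("Wrote:", received.join(""), "| drain events:", drainCount);
});
sink.on("error", err => console.error("Write failed:", err));

const words = ["alpha", "beta", "gamma"];

// Write one word at a time, pausing whenever the buffer is full.
function writeNext(index) {
  if (index === words.length) {
    sink.end(); // no more data: finish fires once everything is flushed
    return;
  }
  const canContinue = sink.write(words[index]);
  if (canContinue) {
    writeNext(index + 1);
  } else {
    sink.once("drain", () => writeNext(index + 1));
  }
}

writeNext(0);
```

The pipe and pipeline methods perform this wait-for-drain handling for you, which is one reason they are preferred over writing by hand.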

Connect streams with pipe

To connect streams together and start the flow of data, we use the pipe method available on readable streams. Piping is the main way we consume streams, directing the flow of data from a readable stream to the attached writable stream. You can consume data with either event listeners or the pipe method, but you should not mix the two approaches.

Under the hood, the pipe method uses the events emitted by streams, abstracting away the need to handle those events. One exception is that you still need to handle error events somehow, and we’ll talk about the best way to do that in a moment.

The pipe method is available on streams that implement a Readable interface (Readable, Duplex, Transform, and PassThrough streams). The pipe method accepts a destination to pipe data to. The destination stream must implement a Writable interface (Writable, Duplex, Transform, and PassThrough streams).

Put the following example into a file and run it on the command line, type “Hello World”, and then hit enter. You will see “Hello World” echoed back to you.

Note: process.stdin is a globally available readable stream that reads input from the terminal, and process.stdout is a writable stream that outputs to the terminal.

process.stdin.pipe(process.stdout);

In the example, process.stdin is the readable source of data, and the process.stdout is the writable destination. When you input text at the command line with the example running, the text is piped from stdin to stdout, creating the echo effect you see.

Calling pipe will return the destination stream, allowing us to chain another pipe onto the stream if the destination was a duplex or transform stream.

To demonstrate chaining multiple streams together, we can use a PassThrough stream. Because PassThrough (like a Duplex or a Transform stream) implements both a Writable and Readable interface, we are able to pipe data into it as well as from it. It’s a trivial example, though, because the PassThrough stream does nothing but pass the data on.

const { PassThrough } = require("stream");
const passThrough = new PassThrough();
process.stdin.pipe(passThrough).pipe(process.stdout);
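The return value of pipe can also be checked directly. This small sketch uses three PassThrough streams with illustrative names (first, second, third) to show that pipe hands back the destination it was given.

```javascript
const { PassThrough } = require("stream");

const first = new PassThrough();
const second = new PassThrough();
const third = new PassThrough();

// pipe() returns its argument, so `returned` is the `second` stream itself.
const returned = first.pipe(second);
console.log(returned === second); // true

// Piping from the returned stream has the same effect as writing
// first.pipe(second).pipe(third) in a single chained expression.
const last = returned.pipe(third);
console.log(last === third); // true
```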

Use fs to create streams from files

So far we’ve talked about the basic concepts of streams, the differences between read and write streams, and how to connect them using pipe. We haven’t talked yet about how to create our own streams. There is a big difference between implementing streaming interfaces and consuming ones which are available to us.

Implementing your own streams is not as common as consuming them, but there are instances when we do want to create our own streams. First, though, we’re going to look at the most common way to obtain streams without implementing them yourself: streaming data to and from files using the fs module.

The fs module is able to create read and write streams for us, using convenient helper methods fs.createReadStream and fs.createWriteStream.

These methods allow us to either read data from a file as a stream, or write data to a file as a stream. The use case we mentioned at the very beginning of this tutorial, processing files too large to fit in memory, relies on these methods.

To demonstrate, let’s create a simple stream that writes text from stdin to a file called file.txt:

const fs = require("fs");
const outputStream = fs.createWriteStream("file.txt");
process.stdin.pipe(outputStream);

If you run this example, it works very much the same as when we piped stdin to stdout — with the important exception that we are writing the output directly to a file instead of to stdout.

The fs.createWriteStream method takes a file path as its first argument, and optional config arguments.

After running the example and typing some data into the terminal, open file.txt, and you’ll see that your text has been written to the file. If you rerun the example, file.txt is overwritten by default when we create the write stream again.

Once you have some text in the file.txt file, let’s invert this and pipe the data from file.txt to stdout:

const fs = require("fs");
const inputFileStream = fs.createReadStream("file.txt");
inputFileStream.pipe(process.stdout);

Run this code and you’ll see the text from file.txt written to the terminal.

We mentioned that creating a write stream from a file will overwrite the file by default. This can be changed by passing flags in the options object when creating the write stream: pass {flags: "a"} to create the write stream in append mode. This will append data to the file if it exists already, or create the file if it doesn’t exist.

const fs = require("fs");
const outputStream = fs.createWriteStream("file.txt", { flags: "a" });
process.stdin.pipe(outputStream);
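To compare the default behavior with append mode without typing into the terminal, the sketch below writes to a temporary file three times. The writeText helper, the temporary directory, and the sample strings are assumptions made for this example.

```javascript
const fs = require("fs");
const os = require("os");
const path = require("path");

// Hypothetical scratch location so the sketch does not touch real files.
const scratchDir = fs.mkdtempSync(path.join(os.tmpdir(), "streams-flags-"));
const filePath = path.join(scratchDir, "file.txt");

// Hypothetical helper: write some text through a write stream and
// resolve once the stream reports that everything has been flushed.
function writeText(text, options) {
  return new Promise((resolve, reject) => {
    const out = fs.createWriteStream(filePath, options);
    out.on("finish", resolve);
    out.on("error", reject);
    out.end(text);
  });
}

const contents = writeText("first\n")
  .then(() => writeText("second\n")) // default mode replaces "first"
  .then(() => writeText("third\n", { flags: "a" })) // append keeps "second"
  .then(() => fs.readFileSync(filePath, "utf8"));

contents.then(text => console.log(JSON.stringify(text)));
```

After the three writes, the file should contain only the second and third lines, because the second write replaced the first and the third was appended.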

Handling stream errors

One of the most important events fired by streams is the error event. When something goes wrong in a stream, an error event will be emitted. If this error event isn’t handled, it can crash your application. So it’s very important to make sure errors are handled when working with streams.

To handle error events with streams you can attach an error event handler directly onto each stream.

const { PassThrough } = require("stream");
const passThrough = new PassThrough();
passThrough.on("error", err => {
  console.error("passThrough encountered an error:", err);
});
process.stdin.on("error", err => {
  console.error("stdin encountered an error:", err);
});
process.stdout.on("error", err => {
  console.error("stdout encountered an error:", err);
});
process.stdin.pipe(passThrough).pipe(process.stdout);
passThrough.emit("error", new Error("Uh oh!"));

If you run this code sample, the PassThrough stream will emit and then handle the error. You’ll see the error logged to the console, and then the program will end successfully because there is nothing left to do. If you remove the error handling for the PassThrough stream, running this example will crash the program with an unhandled exception, exiting the program early and with a status code of 1.

Handling error events this way works fine, and prevents the error from crashing your application. However, it can be cumbersome to attach these handlers for every stream you’re working with.
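Errors do not have to be emitted by hand to show up. As a sketch of a more realistic failure, the example below tries to read a file that is assumed not to exist; the missingPath value is a made-up path for illustration. The read stream emits an error event whose code property identifies the problem.

```javascript
const fs = require("fs");
const os = require("os");
const path = require("path");

// Hypothetical path that is assumed not to exist on disk.
const missingPath = path.join(
  os.tmpdir(),
  `no-such-file-${process.pid}-${Date.now()}.txt`
);

const input = fs.createReadStream(missingPath);

// Capture the first error so it can be inspected later.
const failure = new Promise(resolve => {
  input.on("error", err => resolve(err));
});

// Start the flow of data; the failed open is reported via the error event.
input.resume();

failure.then(err => {
  console.error("Could not read the file. Error code:", err.code);
});
```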

Next, we’ll look at pipeline, an alternative to pipe, which aims to help with handling errors and stream cleanup.

Connect streams with pipeline

To improve the experience of piping streams, Node.js v10.0.0 added a method called pipeline to the stream module.

The pipeline method takes any number of streams as arguments, and a callback function as its last argument. If an error occurs anywhere in the pipeline, the pipeline will end and the callback will be invoked with the error that occurred. The callback function is also invoked when the pipeline successfully ends, providing us a way to know when the pipeline has completed.

const { PassThrough, pipeline } = require("stream");
const fs = require("fs");
// make sure you have a file.txt file with some text in it!
const input = fs.createReadStream("file.txt");
const out = fs.createWriteStream("out.txt");
const passThrough = new PassThrough();
console.log("Starting pipeline...");
pipeline(input, passThrough, out, err => {
  if (err) {
    console.log("Pipeline failed with an error:", err);
  } else {
    console.log("Pipeline ended successfully");
  }
});
// passThrough.emit("error", new Error("Uh oh!"));

Running the above code as is will log to the console Pipeline ended successfully once the file.txt file has been piped to the out.txt file. Uncomment the last line and run the code again, and the pipeline is ended with an error, logging Pipeline failed with an error to the console.

Using pipeline simplifies error handling and stream cleanup, and makes combining streams in complex ways more straightforward.

When an error occurs while using pipeline, the streams will be destroyed. Destroying the streams releases their internal resources, freeing up any memory the stream was using.

This cleanup step of destroying streams is useful for preventing memory leaks, which occur when a stream has ended but has not released the resources that it is using. When using the basic pipe method, you are responsible for destroying streams yourself when things go wrong, so one of the main reasons to use pipeline as opposed to pipe is to make sure streams are automatically cleaned up when errors occur.
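The cleanup can be observed through the destroyed property that streams expose. In this sketch, the source, middle, and sink streams are illustrative stand-ins, and the failure is triggered by calling destroy with an error on the middle stream.

```javascript
const { PassThrough, Readable, Writable, pipeline } = require("stream");

// A source that never pushes data, so the pipeline stays open until it fails.
const source = new Readable({ read() {} });
const middle = new PassThrough();
// A sink that discards whatever it receives.
const sink = new Writable({
  write(chunk, encoding, callback) {
    callback();
  }
});

// Wrap the callback in a promise so the outcome can be inspected later.
const outcome = new Promise(resolve => {
  pipeline(source, middle, sink, err => resolve(err));
});

// Simulate a failure in the middle of the pipeline.
middle.destroy(new Error("Uh oh!"));

outcome.then(err => {
  console.log("Pipeline failed with:", err.message);
  console.log(
    "Destroyed:",
    source.destroyed,
    middle.destroyed,
    sink.destroyed
  );
});
```

With plain pipe calls, the source and sink would be left open after the same failure, and closing them would be up to you.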

Implementing transform streams

The examples we’ve shown are fairly trivial, meant to demonstrate how to work with streams. A more powerful use of streams is creating your own transform streams to alter data as it is streamed from the source to the destination.

We can implement our own transform stream to do something useful. Let’s create a transform stream which converts all strings that pass through it to upper case:

const { Transform, pipeline } = require("stream");
const upperCaseTransform = new Transform({
  transform: function(chunk, encoding, callback) {
    callback(null, chunk.toString().toUpperCase());
  }
});
pipeline(process.stdin, upperCaseTransform, process.stdout, err => {
  if (err) {
    console.log("Pipeline encountered an error:", err);
  } else {
    console.log("Pipeline ended");
  }
});

To create a transform stream, we are using the Transform constructor from the stream module. We are required to implement a method called transform on our transform stream.

This method will receive a chunk of data that passes through the transform stream, the encoding of the chunk, and a callback function which we can use to return either an error or the newly transformed data.

In the example we are converting the chunk to a string, because by default the data chunk will be a Buffer, and then calling the string method toUpperCase to convert the string to upper case. Once we return the transformed data by using the callback function, it is outputted as readable data to the next stream.

Transform streams can be very powerful for helping us create pipelines to alter or process streaming data. If you find yourself trying to alter data from a readable stream by listening to the .on("data") event, you should consider implementing a transform stream instead, as it’s easier to understand and much more composable.
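As a slightly larger sketch of that idea, the transform below regroups arbitrary chunks into whole lines, which is the kind of step needed before processing a dataset row by row. The createLineSplitter function, the sample chunks, and the collecting sink are assumptions made for this example, and the sketch assumes plain ASCII text so that no character is split across two chunks. It relies on the flush option of Transform, which runs once after the last chunk has been processed.

```javascript
const { Readable, Transform, Writable, pipeline } = require("stream");

// Hypothetical factory: returns a transform that emits one string per line.
function createLineSplitter() {
  let leftover = ""; // text after the last newline seen so far
  return new Transform({
    readableObjectMode: true, // output whole strings, not byte chunks
    transform(chunk, encoding, callback) {
      const pieces = (leftover + chunk.toString()).split("\n");
      leftover = pieces.pop(); // the final piece may be an incomplete line
      for (const line of pieces) {
        this.push(line);
      }
      callback();
    },
    flush(callback) {
      // Emit whatever remains once the input has ended.
      if (leftover) {
        this.push(leftover);
      }
      callback();
    }
  });
}

// Sample input whose chunk boundaries do not line up with line breaks.
const source = Readable.from(["first li", "ne\nsecond line\nthi", "rd line"]);

const lines = [];
const sink = new Writable({
  objectMode: true,
  write(line, encoding, callback) {
    lines.push(line);
    callback();
  }
});

const done = new Promise((resolve, reject) => {
  pipeline(source, createLineSplitter(), sink, err => {
    if (err) reject(err);
    else resolve(lines);
  });
});

done.then(result => console.log(result));
```

Because the splitting logic lives in its own stream, it can be placed in front of any other transform that expects one row at a time.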

Recap

In this tutorial, we discussed what streams are and how to work with them. We learned about the different types of streams, some of the events they emit, and how to connect streams to one another.

Further your understanding

  • What is the difference between a Readable stream and a Writable stream?
  • What built-in Node.js modules offer a streaming interface?
  • How are Duplex streams and Transform streams different?
  • What does composable mean? (Read the accepted answer for What does composability mean in context of functional programming? on StackOverflow.com for additional insight.)

Additional resources