Use Streams to Extract, Transform, and Load CSV Data
Using the fs module and streams, we can take a local CSV file, perform ETL steps on it to transform it into JSON, and write the result to disk. A helpful Node.js module from npm, csvtojson, converts the CSV data for us. With these tools, we can:
- Create a readable stream from a local CSV file
- Pipe the CSV data to csvtojson to transform it to JSON
- Alter the data using a Transform stream
- Write the transformed JSON data to a file using a writable stream
- Handle error events
By the end of this tutorial, you should be able to:
- Explain how to use the fs module to create read/write streams
- Implement Transform streams to modify data as it passes through a stream
- Understand how to handle error events with streams
- Convert CSV data to JSON using streams
Goal
- Be able to perform ETL (extract, transform, load) operations on CSV data using streams.
Overview
When working with flat files of data, we can use the fs module and streams to process their data in a memory-efficient way. Instead of reading all of the data from a file into memory, streams allow us to work with smaller chunks of data at a time. This keeps memory consumption low when working with large files.
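As a quick illustration, here is a minimal sketch (the file path is just a placeholder) of consuming a file chunk by chunk with a readable stream, rather than loading the whole file into memory at once:

const fs = require("fs");

// Only one chunk is held in memory at a time as the file is read.
let bytes = 0;
fs.createReadStream("data/someLargeFile.csv")
  .on("data", chunk => {
    bytes += chunk.length;
  })
  .on("end", () => console.log(`Read ${bytes} bytes, one chunk at a time`));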
We want to extract, transform, and load the CSV data as we did previously with the data from the Exoplanet API. The CSV file we are working with contains the same data we extracted previously from the Exoplanet API, but in CSV format instead of JSON.
A CSV file is a plain text file of tabular data that is separated by commas, and each line of the file is an individual data record.
We want to convert the CSV data to JSON, and also transform the planet records using the transformOnePlanet function we created previously. Instead of writing a typical JSON file, we are going to be using a format called ndjson, which is a file of JSON records separated by newlines and uses the extension .ndjson.
The ndjson format works well with streaming data and large datasets where each record is processed individually. Using ndjson can make reading and processing large JSON files less prone to error, because a single invalid record being written accidentally won’t corrupt the entire file the way it would when using a traditional single JSON document structure. The main difference between ndjson and regular JSON is that each line of a file must contain a single JSON record. An ndjson file contains valid JSON, but the file itself isn’t a valid JSON document. So when reading the file later you’ll need to split the file up on each newline and parse the JSON records individually.
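For example, here is a rough sketch of how you could read the ndjson file back later (using the output file we create in this tutorial), splitting on newlines and parsing each record individually:

const fs = require("fs");

// Split the file into lines and parse each line as its own JSON record.
// Fine for files that fit in memory; for very large files you'd stream line by line.
const records = fs
  .readFileSync("data/outStreaming.ndjson", "utf8")
  .split("\n")
  .filter(line => line.trim().length > 0) // ignore any trailing empty line
  .map(line => JSON.parse(line));

console.log(`Parsed ${records.length} planet records`);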
Clone the Starter Repo
To get the CSV file we will be working with, clone the following repo from GitHub:
git clone https://github.com/osiolabs/etl-streams-starter.git
Enter the etl-streams-starter directory and open the index.js file in your editor.
Create input stream and destination stream
First we will create a readable stream to read the CSV data from, and a writable stream which will be the destination we are writing the data to.
We can use the fs module to help us create read and write streams from files.
const fs = require("fs");
const inputStream = fs.createReadStream("data/planetRaw.csv");
const outputStream = fs.createWriteStream("data/outStreaming.ndjson");
To create a readable stream from our CSV file, we pass the path to the file to fs.createReadStream. Likewise, to create the writable stream we pass the path for where the output file will live. If this file doesn't exist, it will be created once we start streaming data to it.
We can already test this by piping the data from the readable stream to the writable stream. Once we use pipe to connect the readable stream to the output stream, data is read from the input file and sent to the output file, one chunk at a time.
inputStream.pipe(outputStream);
This will copy the CSV file to outStreaming.ndjson, because we aren’t yet modifying the data.
Create a CSV parser
To convert the CSV file to JSON, we are using an npm module called csvtojson. We can stream CSV data to csvtojson and it will convert the data into a JSON representation for us.
Install csvtojson from npm:
npm install csvtojson
The csvtojson module parses the header row of the CSV data to get the key names for each value, and then creates a JSON object from each row in the CSV file.
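To see that parsing in isolation, here is a small example (the inline CSV string and column names are just for illustration) using csvtojson's fromString method:

const csv = require("csvtojson");

// The first row supplies the keys; each following row becomes one JSON object.
csv()
  .fromString("name,facility\nKepler-213 b,Kepler\nHAT-P-36 b,HATNet")
  .then(rows => console.log(rows));
// [ { name: 'Kepler-213 b', facility: 'Kepler' },
//   { name: 'HAT-P-36 b', facility: 'HATNet' } ]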
const fs = require("fs");
const csv = require("csvtojson");

const inputStream = fs.createReadStream("data/planetRaw.csv");
const outputStream = fs.createWriteStream("data/outStreaming.ndjson");
const csvParser = csv();
inputStream.pipe(csvParser).pipe(outputStream);
Right now the data flow from the CSV file looks something like this:
CSV File => CSV Parser => Output File
The inputStream emits data chunk by chunk to the CSV parser, which parses each CSV row into a JSON string and emits those strings to the output stream, which writes them to the file as they arrive.
Next, let’s transform the planet records to the format that we want them to be in.
Create a transform stream to transform the planet records
The CSV parser is parsing the CSV into JSON strings for us, and emitting each row as a JSON string. Next we want to transform those individual rows into the JSON format that we have defined previously. We can even reuse the function we wrote to transform the records.
The csvtojson library has a subscribe method which we could use to handle each record after it is parsed, but it requires us to mutate the object directly instead of creating a new one. Instead of using the subscribe method it provides, we are going to create our own transform stream to receive each record as input and then output the transformed record.
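For reference, using subscribe looks roughly like the sketch below; it hands us each parsed row as a JavaScript object and expects us to mutate it in place (we won't take this approach here):

const csv = require("csvtojson");

// Each parsed row is passed to the subscribe handler, which mutates it directly.
csv()
  .fromFile("data/planetRaw.csv")
  .subscribe(planet => {
    planet.processed = true; // example mutation only
  });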
To do this, we create an instance of a Transform stream. Transform streams are both writable and readable, so they can both consume data from readable streams (making them writable) and push their data to other streams (readable). Transform streams are special in that their output is computed in some way from their input.
To create a transform stream, we will require the Transform class from the native stream module and create a new instance of Transform.
const fs = require("fs");
const csv = require("csvtojson");
const { Transform } = require("stream");

const inputStream = fs.createReadStream("data/planetRaw.csv");
const outputStream = fs.createWriteStream("data/outStreaming.ndjson");

const csvParser = csv();

const transformPlanetStream = new Transform({
  transform: function(planet, encoding, cb) {
    // transform the input
  }
});
inputStream.pipe(csvParser).pipe(outputStream);
Transform streams must implement a transform method that receives chunks of input as its first argument (we are calling those chunks planet here). The transformPlanetStream will receive data from the CSV parser in the form of JSON strings. The transform function also receives the encoding type for the data chunk and a callback as arguments. We call the callback when processing is done for this chunk of data. The callback takes an error as its first argument (or null if no error occurred) and the processed chunk of data as its second argument.
Our first step will be to convert each JSON string to a JavaScript object that we can transform. We are using JSON.parse to convert the JSON strings to JavaScript objects, and wrapping the call to JSON.parse in a try/catch block. If there is an error while parsing the JSON because the JSON was invalid, we catch the error that is thrown and pass it to the transform callback as the first argument.
const transformPlanetStream = new Transform({
  transform: function(planet, encoding, cb) {
    try {
      const planetObject = JSON.parse(planet);
    } catch (err) {
      cb(err);
    }
  }
});
Once we have a regular JavaScript object to work with, we can use the transformOnePlanet function we wrote previously to transform the planet record. We need to require that function from the transform module we also wrote previously.
After transforming the planet object, we will output the record from the transform stream by passing the new data to the transform callback.
const fs = require("fs");
const csv = require("csvtojson");
const { Transform } = require("stream");
const { transformOnePlanet } = require("./transform.js");
// ...
const transformPlanetStream = new Transform({
  transform: function(planet, encoding, callback) {
    try {
      const planetObject = JSON.parse(planet);
      const transformedPlanet = transformOnePlanet(planetObject);
      const planetString = JSON.stringify(transformedPlanet) + "\n";
      callback(null, planetString);
    } catch (err) {
      callback(err);
    }
  }
});
We are using JSON.stringify on the transformedPlanet; this is because we want to output JSON strings, not JavaScript objects. If we were to output an object, it would need to be stringified somewhere along the way before it could be written to a file. We could create another transform stream to do that for us, but we've coded this transform stream to output the same type of data that it receives as input: JSON strings.
To make our JSON string compatible with the newline-delimited JSON format, we are appending a newline "\n" character at the end of the JSON string after stringifying. This makes sure that each JSON record is written to the file on its own line.
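If we had instead chosen to emit plain JavaScript objects from transformPlanetStream, the extra stringifying stream mentioned above could look roughly like this hypothetical sketch, where writableObjectMode lets the stream accept objects as input while still emitting strings:

// A hypothetical alternative (not used in this tutorial): a stream that accepts
// JavaScript objects and emits newline-delimited JSON strings.
// Assumes Transform has already been required from the stream module.
const stringifyStream = new Transform({
  writableObjectMode: true,
  transform: function(record, encoding, callback) {
    callback(null, JSON.stringify(record) + "\n");
  }
});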
Finally, let’s put this transform stream to work. The CSV parser will pipe data to the transform stream, which will in turn pipe its data to the output stream:
inputStream
  .pipe(csvParser)
  .pipe(transformPlanetStream)
  .pipe(outputStream);
If we check our output file located at data/outStreaming.ndjson we’ll see something like this:
{"name":"Kepler-213 b","discoveryMethod":"Transit","facility":"Kepler","neighbors":"2","orbitsInDays":"2.46236000","orbitsInDaysError":"±0.00000600","lastUpdate":"2014-05-14","hostStar":"Kepler-213"}
{"name":"Kepler-213 c","discoveryMethod":"Transit","facility":"Kepler","neighbors":"2","orbitsInDays":"4.82296200","orbitsInDaysError":"±0.00000800","lastUpdate":"2014-05-14","hostStar":"Kepler-213"}
{"name":"HAT-P-36 b","discoveryMethod":"Transit","facility":"HATNet","neighbors":"1","orbitsInDays":"1.32734683","orbitsInDaysError":"±0.00000048","lastUpdate":"2019-07-11","hostStar":"HAT-P-36"}
{"name":"HAT-P-37 b","discoveryMethod":"Transit","facility":"HATNet","neighbors":"1","orbitsInDays":"2.79743600","orbitsInDaysError":"±0.00000700","lastUpdate":"2014-05-14","hostStar":"HAT-P-37"}
We’ve now successfully transformed the data from CSV into a different JSON format, stored in a newline-delimited JSON file.
Each line of the file should have exactly one planet record written to it. (Because the records are long, they might appear line-wrapped in your editor and look like they are on multiple lines.)
Error handling
Up until now we haven't been handling any errors that our streams might produce. In the transformPlanetStream we pass any caught errors to the callback, but we aren't handling the error event that this causes the stream to emit. When a stream encounters an error, it emits an error event and ends the stream.
Error events from streams must be handled or else they can crash your program. We have a few options for error handling. First is attaching a listener directly to a stream:
inputStream.on("error", err => console.log(err));
csvParser.on("error", err => console.log(err));
transformPlanetStream.on("error", err => console.log(err));
outputStream.on("error", err => console.log(err));
This is useful if you want to handle errors for each stream individually, but it gets redundant quickly.
Another option for error handling replaces the use of pipe. Since Node.js v10.0.0, the stream module provides us with a method called pipeline, which is used to pipe between streams and forward errors to a callback handler.
The pipeline method takes at least two streams as arguments and a callback function as its last argument. When an error is emitted from any of the streams, the error will be passed to the callback function. If the streams complete successfully, the callback function will be invoked with null as the first argument.
const { Transform, pipeline } = require("stream");
// ...
pipeline(inputStream, csvParser, transformPlanetStream, outputStream, err => {
  if (err) {
    console.log("Pipeline encountered an error:", err);
  } else {
    console.log("Pipeline completed successfully");
  }
});
Since its introduction in Node.js v10.0.0, pipeline has been the recommended way to pipe data between streams. It helps clean up streams for us when an error occurs, destroying the streams involved in the pipeline if something goes wrong or once the streams have ended.
This is important to ensure that we don’t introduce memory leaks into our programs when working with streams. Memory leaks occur when your program is no longer using memory it has allocated, but that memory is not freed up by garbage collection. Memory leaks will slowly consume all the memory available to your application, eventually crashing the application.
When using the regular pipe method to connect streams, the streams aren't destroyed automatically when an error occurs. If streams aren't destroyed, their associated memory won't be released, which can create a memory leak.
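If you do stick with pipe, a rough sketch of manual cleanup (assuming the same stream variables as above) is to destroy every stream whenever any one of them errors:

// A minimal sketch of manual cleanup when using pipe instead of pipeline.
const streams = [inputStream, csvParser, transformPlanetStream, outputStream];

streams.forEach(stream => {
  stream.on("error", err => {
    console.log(err);
    // Destroy all of the streams so their resources can be released.
    streams.forEach(s => s.destroy());
  });
});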
The pipeline method is a convenient way to both handle errors from streams and clean streams up on completion or failure, helping us avoid memory leaks.
The final code looks like this:
const fs = require("fs");
const csv = require("csvtojson");
const { Transform, pipeline } = require("stream");

const { transformOnePlanet } = require("./transform");

const inputStream = fs.createReadStream("data/planetRaw.csv");
const outputStream = fs.createWriteStream("data/outStreaming.ndjson");
const csvParser = csv();

const transformPlanetStream = new Transform({
  transform: function(planet, encoding, callback) {
    try {
      const planetObject = JSON.parse(planet);
      const transformedPlanetRecord = transformOnePlanet(planetObject);
      callback(null, JSON.stringify(transformedPlanetRecord) + "\n");
    } catch (err) {
      callback(err);
    }
  }
});

pipeline(inputStream, csvParser, transformPlanetStream, outputStream, err => {
  if (err) {
    console.log("Planet Pipeline encountered an error:", err);
  } else {
    console.log("Pipeline completed successfully");
  }
});
Recap
In this tutorial, we applied the concepts of ETL and streams to perform extract, transform, and load operations on CSV data. Using the fs module, we created a readable stream from the CSV file and a writable stream for the destination file. We streamed the CSV data to an npm module called csvtojson to convert the data to JSON strings, and then created a transform stream to alter the data into the format we wanted before writing it to a file in newline-delimited JSON format.
We used the pipeline method of the stream module to pipe the data between the different streams while handling error events and cleaning up the streams once they are finished.
Further your understanding
- Walk through this example with a co-worker. Are there any concepts that are unclear to you that you need to review?
- How would you destroy a stream when you're not using the pipeline method?
Additional resources
- Stream (Node.js Latest Version Documentation) (nodejs.org)
- csvtojson module for Node.js (npmjs.com)
- Newline-Delimited JSON Spec (ndjson.org)