ETL: Load Data to Destination with Node.js
Now that we have our transformed planet data in our Node ETL (Extract, Transform, Load) pipeline, we need to load it to its destination. We are going to be writing the data to a JSON flat file on the file system, but you might be loading your data into a database or data warehouse. The important part is that we send the transformed data to the destination where we need it so we can make use of it, which is the final stage of an ETL pipeline.
By the end of this tutorial, you should be able to:
- Understand the Load step in a Node ETL pipeline
- Choose between a bulk insert of data or inserting each record individually
Goal
Load the transformed data to the destination.
Prerequisites
Watch ETL: Load Data to Destination
Overview
Now that we have our transformed planet data, we need to load it to its destination. We are going to be writing the data to a JSON flat file on the file system, but you might be loading your data into a database or data warehouse. The important part is that we send the transformed data to the destination where we need it so we can make use of it.
There are two different approaches, and which one applies to your situation depends on the specifics of the destination. You can either bulk load all of the data at once, or insert each data record one at a time.
You need to think about the destination you are loading the data into. Does it support a bulk insert of many records at once? Or are you forced to insert one record at a time? Does order matter?
Deciding between a bulk load or inserting records individually depends heavily on the destination you are loading data into. For some systems, it might be more resource intensive to insert records individually, especially if you’re talking about hundreds of thousands or millions of records being inserted over a short time span.
But it’s also possible that an extremely large bulk insert, again with hundreds of thousands or millions of records, could lock up the database while it processes the unusually large load of data.
In this tutorial we will look at both approaches, and you’ll need to decide for yourself which is appropriate for your data source.
Bulk loading data
Bulk loading the data can be the simplest and most efficient option. It involves sending many data records at once to be inserted into the destination.
Let’s take a look at bulk loading all of our records to a JSON file. We will create a function which accepts all the records that we have transformed, stringifies them to JSON, and writes them to a file. In this example our destination is a JSON file on the file system.
```js
const { promisify } = require("util");
const fs = require("fs");

const writeFilePromised = promisify(fs.writeFile);

function bulkLoadPlanetsToFile(planets, outputFilePath) {
  if (!outputFilePath) {
    throw new Error("Filepath required as second argument");
  }

  return writeFilePromised(outputFilePath, JSON.stringify(planets, null, 2));
}
```
This `bulkLoadPlanetsToFile` function has only one responsibility. It takes an array of transformed planet records and a file path as arguments, then writes the data as JSON to a file at that path.
The function returns a Promise that resolves when the data has been successfully written to the file. We have taken the `fs.writeFile` function, which is normally callback-based, and used the `promisify` function from the Node.js built-in `util` module to create a Promise-based version of `writeFile`.
If you were bulk loading data into a database instead of a JSON file, this function would instead take an array of records and make a bulk insert query to the database, then return a Promise that resolves when the insert operation is complete.
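For example, if the destination were a PostgreSQL database accessed through the node-postgres (`pg`) client, a bulk insert might look something like the sketch below. The `planets` table, its `name` and `climate` columns, and the connection setup are assumptions for illustration only, not part of this tutorial's pipeline.

```js
const { Pool } = require("pg");

// connection details are read from the standard PG* environment variables
const pool = new Pool();

function bulkLoadPlanetsToDb(planets) {
  // build a single parameterized INSERT with one set of placeholders per record,
  // e.g. ($1, $2), ($3, $4), ...
  const values = [];
  const placeholders = planets.map((planet, i) => {
    values.push(planet.name, planet.climate);
    return `($${i * 2 + 1}, $${i * 2 + 2})`;
  });

  const query = `INSERT INTO planets (name, climate) VALUES ${placeholders.join(", ")}`;

  // pool.query returns a Promise that resolves when the insert completes
  return pool.query(query, values);
}
```

Many databases also provide dedicated bulk-loading mechanisms, such as PostgreSQL's COPY command, which can be even faster for very large datasets.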
The bulk loading function fits into our ETL pipeline after the Extract and Transform steps:
```js
const { promisify } = require("util");
const fs = require("fs");

const writeFilePromised = promisify(fs.writeFile);

function bulkLoadPlanetsToFile(planets, outputFilePath) {
  if (!outputFilePath) {
    throw new Error("Filepath required as second argument");
  }

  return writeFilePromised(outputFilePath, JSON.stringify(planets, null, 2));
}

async function startETLPipeline() {
  console.log("Starting ETL pipeline");
  const outputFile = `${__dirname}/out.json`;

  try {
    // extract
    const planets = await extractAllPlanets();
    console.log(`Extracted ${planets.length} planet records`);

    // transform
    const transformedPlanets = transformPlanetRecords(planets);

    // load
    await bulkLoadPlanetsToFile(transformedPlanets, outputFile);
    console.log("Loading was successful, end of pipeline");
  } catch (err) {
    console.log("Error encountered, ETL aborted\n", err);
  }
}

startETLPipeline();
```
Load one record at a time
Sometimes you are forced to load records into your system one at a time. When bulk loading is available it is usually the preferred option, but there are certain cases where loading records individually is necessary or useful.
For example, if your system needs to treat each record as an event coming into the system, then you would want to insert the records one at a time to make sure each one is handled appropriately by the destination. Whatever the reason, this approach results in many more requests being sent to the destination than a bulk load, so make sure you take into account the additional load this will put on the destination receiving the data.
To load data records one at a time, you can iterate over all of the records and handle them individually. We’ll be running the promises sequentially so as not to overwhelm the destination.
```js
function insertRecord(record) {
  // return a Promise that resolves when record was inserted
  // this example is a mocked function
  return Promise.resolve();
}

async function startETLPipeline() {
  console.log("Starting ETL pipeline");
  const outputFile = `${__dirname}/out.json`;

  try {
    // extract
    const planets = await extractAllPlanets();
    console.log(`Extracted ${planets.length} planet records`);

    // transform
    const transformedPlanets = transformPlanetRecords(planets);

    // load
    await transformedPlanets.reduce(async (previousPromise, planet) => {
      await previousPromise;
      return insertRecord(planet);
    }, Promise.resolve());

    console.log("Loading was successful, end of pipeline");
  } catch (err) {
    console.log("Error encountered, ETL aborted\n", err);
  }
}

startETLPipeline();
```
We’ve mocked an `insertRecord` function to represent inserting one record at a time. The mocked function just returns a resolved promise, simulating the behavior of making an asynchronous insertion into a database or other destination.
To insert records sequentially, one after another, we are using a neat trick with `reduce` to chain the promises together. If you aren’t familiar with `reduce`, you can learn about it in the MDN documentation. In short, `reduce` iterates over each item in an array, passing in both the accumulator returned from the last iteration and the current item from the array. When using `reduce`, we pass it a function to run on each iteration, and also an initial accumulator value.
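If `reduce` is new to you, here is a minimal example (unrelated to our planet data) that sums an array of numbers:

```js
// the callback runs once per item, receiving the accumulator and the current item;
// 0 is the initial accumulator value
const total = [1, 2, 3, 4].reduce((sum, n) => sum + n, 0);

console.log(total); // 10
```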
In the example, we pass an already resolved promise to `reduce` as the initial accumulator, and we return a new promise on each iteration. When the first iteration runs, we await the already resolved promise, so it completes immediately. We then return a new promise, the one returned by `insertRecord`, which is passed to the next iteration and awaited before the next call to `insertRecord` is made.
Using `reduce` this way allows us to use async/await to sequentially chain any number of promises without having to hard-code each `.then` handler. It’s roughly equivalent to manually chaining each promise like the following:
```js
insertRecord()
  .then(insertRecord)
  .then(insertRecord)
  .then(insertRecord)
  .then(insertRecord);
```
We are running the `insertRecord` function sequentially to avoid overwhelming whatever service you might be loading the data into. If we used something like `Promise.all` to wait for the insertions to complete, we would be running all of the insertions concurrently, which can easily overwhelm a database when trying to insert a large volume of records at the same time.
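For comparison, a concurrent version of the load step might look like the following sketch (the `loadAllAtOnce` wrapper is hypothetical, just to keep the example self-contained). Every insertion starts immediately, so only consider this when the destination can comfortably handle that level of concurrency:

```js
async function loadAllAtOnce(transformedPlanets) {
  // every insertRecord call starts immediately; Promise.all resolves once all
  // of the insertions have finished, or rejects as soon as any one of them fails
  await Promise.all(transformedPlanets.map((planet) => insertRecord(planet)));
}
```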
Recap
When it comes to loading your data at the end of an ETL process, your destination will dictate the specifics of how that needs to happen. The destination can be a database, flat file, data warehouse, or anywhere else that can store the transformed data. The destination will determine what options you have for loading the data: either sending data in bulk, or inserting each record individually. Consider the limitations of your destination and choose whichever method makes the most sense based on those limitations.
Further your understanding
- Imagine you are constrained to a maximum of 1000 records per bulk insert operation. How would you chunk the records into multiple bulk insert operations if you had 2500 records to insert?
- How would you write the sequential reduce function to collect the results of each promise as you iterate through the array?
Additional resources
- MDN docs for `Array.map` (developer.mozilla.org)
- MDN docs for `Array.reduce` (developer.mozilla.org)