ETL from Mongo to Redshift

This is in a sense a follow-up to our CTO’s blog post about our Jenkins ETL. In that post, he talks about how we use Jenkins pipeline stages for each part of our data pipeline, which runs daily. In this post, I’ll be talking specifically about a single new stage, in which we ETL the Mongo NoSQL database of a company we merged with (Retrofit) into our Redshift data warehouse, and all the fun of migrating a JSON representation to a columnar one.

A major thing I want to discuss here is what was missing when I went looking for precedent. There are plenty of people on the internet talking about this at a high level, or covering the basic cases, and I certainly benefited from many of them [1][2][3][4]. However, none of them went into the detail I wanted, especially when it comes to arrays in JSON records or the other places where JSON doesn’t fit SQL well. Before we get into the juicy stuff, I’ll quickly cover what our goal was and the basic parts of what we accomplished. If you got here via Google and just want to know what we did about arrays, you can safely skip ahead to the arrays section.

Goals

The primary goal of this project was to migrate an existing report written for Postgres, which leveraged mosql (a simple tool for getting Mongo data into PostgreSQL). While we probably could have used mosql via an intermediate Postgres step, we decided to avoid it this time around because mosql is not under active development. We also wanted to avoid storing JSON in Redshift, even though there are Redshift JSON functions to match the Postgres JSON functions used in the original report: they’re not the most natural way of working in SQL, and the data scientists using these tables later would appreciate standard SQL.

Getting started

Getting the initial simple (not-containing-array) collections ETL’d was rather trivial. We started with Variety to get de facto schemas for our collections. With some light scripting, we generated DDLs from Variety (along with the jsonpaths and explicit export fields to limit IO in collections containing arrays). Then for the actual pipeline (orchestrated by Jenkins) we:

  1. export the collections to JSON lines using mongoexport
  2. upload those JSON lines and the jsonpath to S3, then
  3. use Redshift’s COPY from JSON to load the data (using jsonpaths made flattening easy here as well):

-- update_from_mongo_to_json.sql (parameterization is psql-style)

BEGIN;

DELETE FROM :table; -- do a full reload

COPY :table
FROM :s3_file
WITH CREDENTIALS :s3_credentials
TIMEFORMAT 'auto'
JSON :jsonpath
GZIP;

COMMIT;
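
For context, here’s a rough Python sketch of steps 1 and 2 (we actually drove these from Bash under Jenkins; the database, collection, bucket, and field names below are placeholders, and AWS credentials are assumed to come from the environment):

# export_and_upload.py (hypothetical sketch of steps 1 and 2)

import gzip
import shutil
import subprocess

import boto3


def export_collection(db, collection, fields, out_path):
    """Step 1: dump a collection to JSON lines with mongoexport."""
    subprocess.run(
        ["mongoexport", "--db", db, "--collection", collection,
         "--fields", fields, "--out", out_path],
        check=True,
    )


def gzip_and_upload(local_path, bucket, key):
    """Step 2: gzip the export and push it to S3 for Redshift's COPY
    (the jsonpaths file gets uploaded the same way)."""
    gz_path = local_path + ".gz"
    with open(local_path, "rb") as src, gzip.open(gz_path, "wb") as dst:
        shutil.copyfileobj(src, dst)
    boto3.client("s3").upload_file(gz_path, bucket, key)


if __name__ == "__main__":
    export_collection("somedb", "someCollection", "_id,userId,lastModified", "someCollection.json")
    gzip_and_upload("someCollection.json", "example-etl-bucket", "mongo/someCollection.json.gz")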

For simplicity, we started with full reloads rather than incremental (the larger collections took ~5 minutes each). This was our first milestone. Incremental was inevitable though…

When full reloads take too long

The simple collections really were a breeze, which was a pleasant surprise. The first problem was that we have one collection which is massive compared to the others, for which I killed the export after 3 hours of waiting during local testing (no way we want to extend the data pipeline by 3+ hours!). The fix for this was surprisingly straightforward as well, though. We added an index on a lastModified field in the Mongo collection, then exported only the most recently modified data and performed an upsert like so:

-- upsert_table.sql

BEGIN;

CREATE TEMP TABLE staging(LIKE :destination);

-- (1) copy the export into a temporary table
COPY staging
FROM :s3_file
WITH CREDENTIALS :s3_credentials
TIMEFORMAT 'auto'
JSON :jsonpath
GZIP;

-- (2) update the existing rows
UPDATE :destination AS dest
SET :cols  -- cols is generated in Python
FROM staging stg
WHERE stg._id = dest._id;

-- (3) insert the new rows
INSERT INTO :destination
SELECT stg.* FROM staging stg
LEFT JOIN :destination dst ON stg._id = dst._id
WHERE dst._id IS NULL;

COMMIT;

This should be mostly self-explanatory, although the cols variable is generated by a Python script which looks at the export fields mentioned earlier and produces comma-separated assignments like “{col}=stg.{col}”.
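
As a rough sketch of that generation step (the function and file name here are made up for illustration; the real script reads the generated export-field files):

# build_set_clause.py (hypothetical sketch)

from typing import List

def build_set_clause(export_fields: List[str]) -> str:
    """Turn the export field list into 'col1=stg.col1, col2=stg.col2, ...',
    skipping _id since it's the join key and shouldn't change."""
    return ", ".join(f"{col}=stg.{col}" for col in export_fields if col != "_id")

# build_set_clause(["_id", "userId", "lastModified"])
# -> "userId=stg.userId, lastModified=stg.lastModified"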

Only handling the last day or so of data means keeping the table up to date takes about a minute. There was a less-nice bootstrapping process, but being a one-off, we didn’t genericize it and it’s not interesting enough to talk about here. This was our second milestone.
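
For reference, the Mongo side of the incremental export is just a filtered mongoexport. A rough sketch (the database and collection names, the window, and the extended-JSON date syntax are illustrative and depend on your mongoexport version):

# incremental_export.py (hypothetical sketch)

import json
import subprocess
from datetime import datetime, timedelta, timezone

# Export anything modified in roughly the last day; the upsert above
# makes re-processing overlapping rows harmless.
cutoff = datetime.now(timezone.utc) - timedelta(days=1)
query = {"lastModified": {"$gte": {"$date": cutoff.strftime("%Y-%m-%dT%H:%M:%S.000Z")}}}

subprocess.run(
    ["mongoexport", "--db", "somedb", "--collection", "bigCollection",
     "--query", json.dumps(query), "--out", "bigCollection.json"],
    check=True,
)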

Arrays…

Remember how I mentioned I wrote a script for turning the Variety result into basically everything I needed? I gave up on that for arrays fairly quickly. It’s tough enough that the top Google result for “etl mongo to redshift” doesn’t even mention arrays, and the results that do mention them don’t tell you how to solve the problem; they just want you to buy their product.

The first problem is that SQL has no concept of embedded arrays; instead you need a separate table that you JOIN against. The second problem is that Redshift doesn’t provide any nice way of ingesting arrays (without hard-coding indexes), so we have to do something ourselves to handle arrays of arbitrary size.

On the positive side, I was pleasantly surprised when I realized that mongoexport produces files with a single JSON record per line – it seems obvious in retrospect, but I’d never seen that before; I was afraid that the file would contain a JSON list and that I’d have to find a tool that parses JSON lazily in order to be confident we wouldn’t blow out our memory. This meant that writing a fairly simple Python script to generate “flat” (not-containing-array) JSON lines was easy to scale out, since you iterate through the lines in the file and generate a new file line by line. No memory bloat to worry about unless your individual records are huge.

For completeness, an example input record for the script is:

{"_id":{"$oid":"52c1ab5ce4b05c06a14c79d9"}, "userId": 1, "race":["white", "asian"]}

and that kind of record ends up being converted to:

{"__index": 0, "context:" {"_id": {"$oid": "52c1ab5ce4b05c06a14c79d9"}, "userId": 1}, "element": "white"}
{"__index": 1, "context:" {"_id": {"$oid": "52c1ab5ce4b05c06a14c79d9"}, "userId": 1}, "element": "asian"}

So what does this look like in a SQL DDL?

-- example_ddl.sql

CREATE TABLE retrofit_raw.dppScreening$race (
  __index BIGINT NOT NULL,
  source$_id VARCHAR(32),
  element$value VARCHAR(80)
);

To go through this line by line…

  1. retrofit_raw is the schema we’re ETL’ing things into. dppScreening is the source collection. race is the array inside of that collection.
  2. __index is the position of the element within the array.
  3. source$_id is the Mongo record _id (like a primary key) of the record containing the array, in the dppScreening collection (and we usually have a table for each collection, so that’s essentially a foreign key). We use the source$ prefix to indicate that a field came from the containing record rather than the array; those values don’t vary by array index, so they represent denormalization of the Mongo data.
  4. element$value is the element of the array (at the __index). Here the array contains plain strings, so there’s a single element$ column; if the array contained objects, there would be one element$ column per field of those objects.

For completeness, the jsonpath file for this table would look like

// dppScreening$race_jsonpath.json
{
    "jsonpaths": [
        "$.__index",
        "$.context._id.$oid",
        "$.element"
    ]
}

(We didn’t get into this before, but the $s in the jsonpaths are there to handle BSON, which has first-class “object ID” and date data types that mongoexport represents as nested extended-JSON objects, like the {"$oid": …} wrapper in the example above.)

That concludes our third milestone, getting arrays ETL’d.

Other notable mentions

I won’t get into details here, but here are some notable bumps in the road that came up:

  • Mongo keys are case sensitive and SQL columns are not, so my generated DDLs at points were invalid.
  • I tried to be clever in flattening arrays during the first go-around, which led to key collisions before I namespace’d into source and context. While ugly, it’s stable.
  • Python file handling has some platform-dependent behavior that was annoying (and I’m not even talking about newlines).
  • mongoexport doesn’t let you transform while you export (we looked into that for the flattening before writing the Python script, but it would have required an intermediate collection, and we wanted our ETL to be read-only).
  • Redshift COPY doesn’t work for NaNs?!?!
  • I originally tried to export as CSV for a few reasons (the first coming to mind is the file size being sent over the network), but it wasn’t worth it: JSON distinguishes between a non-present, null, empty, or real string far more easily than CSV.
  • mongoexport’s --fieldFile option (bafflingly) doesn’t work for JSON exports.

What would I have done differently with hindsight that wasn’t mentioned previously?

  • Written my DDL/export fields/jsonpaths generation in Scala (my team’s primary language).
    • Writing functional programming code in Python is hard
    • I was learning Python’s type hinting as I went along, especially the surprise limitations
  • Written the actual ETL code in Python instead of Bash, which I’ve learned is… not my favorite language

Future work

  • Clean up the schema generation scripts I wrote and have them code-reviewed (only the generated code was reviewed)
  • Some method of detecting new fields in the Mongo data, since currently those will be ignored silently
  • Upserts for arrays (unnecessary for now, based on observed timing)