December 20th, 2013

This post on migrating from MongoDB to Cassandra was written by Michael Rose. To view more posts by Michael and the FullContact team, check out the FullContact blog here.

The Story

Long long ago (2011) in a Littleton founder’s basement not so far away, the decision was made to use MongoDB as the persistence engine for our Person API product. The idea of the Person API is pretty simple:

Given some input piece of data {email, phone, Twitter, Facebook}, find other data related to that query and produce a merged document of that data. This is essentially a curated, recursive federated search, where multiple databases are consulted, reconsulted and their results aggregated and stored into MongoDB. These databases consist of various internal services (including previously searched data) and APIs from providers across the web.

When we first started, we needed to move fast according to what our market wanted. As our customers queried us and we added data, our database continued to grow in size and operations per second.

We were a young startup and made a few crucial mistakes. MongoDB was not a mistake. It let us iterate rapidly and scaled reasonably well. Our mistake was deciding to put off fixing MongoDB’s deployment, instead scaling vertically to keep the Person API running while we worked on other technologies such as the FullContact Address Book, the ‘secret sauce’ behind the Person API, and our deduplication technology (look for posts on these later this month).

Eventually MongoDB started to have issues with lock time percentage, even on the generous hardware it already had. MongoDB has what’s known as a shared-exclusive or readers-writer lock over each database. While a write operation is in progress, a read is unable to proceed until the write yields the lock. Even then, a queued write is given precedence over reads and can hold them up, leading to latency spikes. As you can imagine, the Person API writes a lot of data (sometimes over 200KB for a single document), averaging a 50/50 or 60/40 ratio of reads to writes and several million writes daily. We worked hard to eliminate multiple updates by dropping partial updates and staging documents in Redis instead, but even this wasn’t enough, and our lock percentage continued to climb into the 40-50% range, leaving us with unhappy customers.
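
To make the staging pattern concrete, it boils down to something like the sketch below. Treat it as an illustration rather than our production code: the Jedis and 2.x MongoDB Java driver calls are real, but the class, key, and field names are made up.

import com.mongodb.BasicDBObject;
import com.mongodb.DBCollection;
import redis.clients.jedis.Jedis;

import java.util.Map;

public class StagedPersonWriter {
    private final Jedis redis;
    private final DBCollection people;

    public StagedPersonWriter(Jedis redis, DBCollection people) {
        this.redis = redis;
        this.people = people;
    }

    // Each upstream search result is staged in a Redis hash instead of
    // being $set into MongoDB as a partial update.
    public void stagePartialResult(String personId, String provider, String json) {
        redis.hset("staging:" + personId, provider, json);
    }

    // Once every provider has responded, assemble the staged pieces and
    // issue a single full write to MongoDB.
    public void flushToMongo(String personId) {
        Map<String, String> staged = redis.hgetAll("staging:" + personId);
        BasicDBObject doc = new BasicDBObject("_id", personId);
        for (Map.Entry<String, String> entry : staged.entrySet()) {
            doc.append(entry.getKey(), entry.getValue());   // provider -> raw result blob
        }
        people.save(doc);                                    // one write instead of many
        redis.del("staging:" + personId);
    }
}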

We pushed Mongo onto the best database hardware AWS could provide: hi1.4xlarges. With these we had 2TB of SSD, which dropped our lock percentage to a small fraction of the 40-50% we’d been pushing and gave us much more storage space to boot.

Oops...

And it was enough, for a long time. We refactored products and launched others. One day we realized our ‘data burn’ was on the order of 20GB a day and we had less than 200GB remaining on our SSDs. Even MongoDB’s ‘repairDatabase’ command, which compacts the data, was unable to comfortably free enough space to last more than a few days.

The Dirty Solution

The most painful part of this story to admit is the solution we came up with to limp along. Maybe you can guess from the code snippet below:

[Code snippet]

By the time we had cycles to spend, it was too late to shard effectively. It would have been terribly painful and unacceptably slowed our cluster for days. We could have done it in parallel, but if we were going to make a painful transition it was better to align our database technology to our eventual goals of extreme fault-tolerance, horizontal scalability, and operational simplicity. MongoDB’s HBase-esque model of config servers, shard routing daemons, and the shards themselves is needlessly complicated. Webscale, right?

To buy us time, we ‘sharded’ our MongoDB cluster. At the application layer. We had two MongoDB clusters of hi1.4xlarges, sent all new writes to the new cluster, and read from both: if a document was present in the new cluster we’d return that, otherwise we’d fall back to the old cluster. Meanwhile, Sherlock (the internal codename for our search system) was busily refreshing older data and writing it into the new cluster as well. With this in mind, we projected this setup would only need to last 2-3 months.
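
Conceptually, the routing logic was no fancier than the sketch below: writes only ever touch the new cluster, reads prefer it and fall back to the old one. This is an illustration with made-up names, not our actual snippet.

import com.mongodb.BasicDBObject;
import com.mongodb.DBCollection;
import com.mongodb.DBObject;

// Application-level "sharding" across two MongoDB clusters.
public class DualClusterPersonStore {
    private final DBCollection newCluster;
    private final DBCollection oldCluster;

    public DualClusterPersonStore(DBCollection newCluster, DBCollection oldCluster) {
        this.newCluster = newCluster;
        this.oldCluster = oldCluster;
    }

    public void save(DBObject doc) {
        newCluster.save(doc);                      // all new writes go to the new cluster
    }

    public DBObject findById(Object id) {
        DBObject query = new BasicDBObject("_id", id);
        DBObject fromNew = newCluster.findOne(query);
        if (fromNew != null) {
            return fromNew;                        // refreshed documents win
        }
        return oldCluster.findOne(query);          // otherwise fall back to the old data
    }
}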

Cassandra

We’re believers in the Netflix way of thinking: build with resilience in the DNA of an application, with the public cloud in mind. We’ve long been Apache HBase users, but found it tough to maintain 100% availability on Amazon EC2 for a variety of reasons. Cassandra attracted us with a veritable siren song of operational simplicity: no HDFS or ZooKeeper dependencies, multiple servers serving the same data without requiring regions to fail over (a process that can take multiple minutes or more), and a data model similar to HBase’s.

It’s important to note that Cassandra was the result of a year-long evaluation for us and aligned with our availability and fault-tolerance goals. Additionally, strong consistency isn’t something we needed for our use case. YMMV. That’s not to say we expected things to be simple, though, so we moved a less critical service over first.

Trial Run – HBase -> Cassandra

As experience is everything when it comes to stable production deployments, we first moved a different part of our system, an outbound HTTP proxy cache if you will, to Cassandra. The existing system ran on Apache HBase, quite well for the most part. Periodically, this system is MapReduce’d into a coherent set of data for ingestion. This was a simple refactor and taught us some of the patterns we’d need to undertake our MongoDB conversion.

The general pattern of any online conversion looks a little like this:

[Diagram: the general online conversion pattern]

We lean toward reading HBase HFiles directly when we can, avoiding HBase itself; it’s unnecessary overhead when the entire table is being processed anyway. Our plan was to go from HFiles -> SSTables -> Cassandra cluster.
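
By ‘reading HFiles directly’ we mean something along these lines: a rough sketch against an HBase 0.94-era API, with paths and column handling simplified.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.hbase.util.Bytes;

import java.io.IOException;

public class HFileDumper {
    // Scans a single HFile straight off the filesystem, skipping the HBase
    // RegionServers entirely; fine when the whole table is being processed.
    public static void dump(Configuration conf, Path hfilePath) throws IOException {
        FileSystem fs = hfilePath.getFileSystem(conf);
        HFile.Reader reader = HFile.createReader(fs, hfilePath, new CacheConfig(conf));
        try {
            HFileScanner scanner = reader.getScanner(false, false); // no block cache, no pread
            if (scanner.seekTo()) {
                do {
                    KeyValue kv = scanner.getKeyValue();
                    System.out.println(Bytes.toStringBinary(kv.getRow()) + " : "
                            + Bytes.toStringBinary(kv.getQualifier()));
                } while (scanner.next());
            }
        } finally {
            reader.close();
        }
    }
}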

One of the issues we first ran into was a deficiency of Cassandra’s Hadoop MapReduce tooling. By default, it’s not compiled for Cloudera CDH4. We ended up vendoring half of the classes and applying patches from the Cassandra JIRA before giving up on their BulkOutputFormat (BOF) and ColumnFamilyOutputFormat (CFOF). It was a nice concept to write pre-sorted SSTables and load them directly into Cassandra, but it didn’t work out due to a combination of Hadoop bugs and operational issues (Context#progress no-op, SSTableLoader just randomly breaking, having to manually delete the SSTables after load).

The other option was to use the ColumnFamilyOutputFormat, but we had decided to use CQL3 tables to keep things simple from a tooling perspective (cqlsh and co.) and a future-proofing perspective. CQL3 is really some pattern-sauce on top of traditional ‘Thrift’ tables using Composite Types. The Hadoop utilities are rooted in these more complex Thrift APIs and don’t easily support non-COMPACT CQL3 tables. I’d already written some basic utilities for using CQL3 tables from the BulkOutputFormat, MutationUtils.java (these are pretty rough, but you’ll get the idea if you need them), and found the CFOF to be just as awkward, without the promise of bulk-loaded SSTables. We didn’t try the CQL3 tools in recent releases.

Eventually we decided to just use Astyanax (Netflix’s Cassandra client) directly from our MapReduce reducers, which worked splendidly for writes. The first time we did this, we wrote as fast as we could to the Cassandra cluster. Topping out at just under 9,300 writes/s on a 6-node cluster, we were appropriately impressed. Unfortunately, using all the CPU for the write path left no time for leveled compaction, and the cluster spent several days trying to compact 128MB SSTables. Thanks to AWS, we spun up another cluster and repeated the job, this time with the Size-Tiered Compaction strategy (less IO, good for spinning disks) and a Guava RateLimiter to let Cassandra breathe and compact during the load. A handy side effect of rate limiting: we were able to accurately project when our jobs would finish.
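
The throttled write path in the reducers boiled down to something like the sketch below. The Astyanax mutation batch and Guava RateLimiter calls are real; the keyspace, column family, and rate are illustrative.

import com.google.common.util.concurrent.RateLimiter;
import com.netflix.astyanax.Keyspace;
import com.netflix.astyanax.MutationBatch;
import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
import com.netflix.astyanax.model.ColumnFamily;
import com.netflix.astyanax.serializers.StringSerializer;

public class ThrottledCassandraWriter {
    private static final ColumnFamily<String, String> CF_PEOPLE =
            new ColumnFamily<String, String>("people", StringSerializer.get(), StringSerializer.get());

    private final Keyspace keyspace;
    // Cap each reducer's write rate so the cluster keeps enough headroom
    // to compact while the bulk load is running.
    private final RateLimiter limiter = RateLimiter.create(2000.0); // writes/sec, illustrative

    public ThrottledCassandraWriter(Keyspace keyspace) {
        this.keyspace = keyspace;
    }

    public void write(String rowKey, String column, String value) throws ConnectionException {
        limiter.acquire();                          // blocks until a permit is available
        MutationBatch batch = keyspace.prepareMutationBatch();
        batch.withRow(CF_PEOPLE, rowKey).putColumn(column, value, null);
        batch.execute();
    }
}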

The rest of this trial wasn’t without issues, some of which we didn’t discover until we’d moved ahead with migrating MongoDB to Cassandra. But in general, it was a simple move.

MongoDB Conversion

We’re big users of Hadoop MapReduce and tend to lean on it whenever we need to make large-scale migrations, especially ones with lots of transformation. Given that, along with our existing conversion project from before, we used 10gen’s mongo-hadoop project, which has input and output formats for Hadoop. We immediately realized that the InputFormat, which connects to a MongoDB cluster, was ill-suited to our usage. We had 3TB of partially-overlapping data across 2 clusters. After calculating input splits for a few hours, it began pulling documents at an uncomfortably slow pace. It was slow enough, in fact, that we developed an alternative plan.

The plan was to export large BSON files to S3 and MapReduce over them the same way we’d converted our cache before. Files are MapReduce’s bread and butter; this would work much better than opening a lot of cursors against a backup-restored MongoDB server. The mongo-hadoop project has a BSONInputFormat for reading BSON dump files. A match made in heaven? Perhaps.

MongoDB can export an entire collection as a single BSON file with the mongodump utility, which doesn’t need MongoDB to be running, making it ideal for extracting data from backups. As we’d kept backups as snapshotted EBS volumes, we were able to boot a new machine with the backup-bearing volumes attached. We pointed mongodump at the data directory on the EBS volume and exported the data as BSON to another disk, which took about a day for each cluster. After this, we had to run mongo-hadoop’s bson_splitter.py tool to generate split points the BSONInputFormat can use; otherwise the entire file must be read by a single mapper, and it goes without saying that a single mapper is very slow. Then you can push both files to S3 and kick off your Hadoop job.

It’s important to use a newer s3cmd with multipart support: S3 uploads tend to fail for large objects pushed as a single put, or when split into too many parts. 5MB (the default with s3cmd-1.1.0) was too small; we found an acceptable part size at 1GB, which partitioned the upload into ~1,500 individual uploads and resulted in a single large ~1.5TB S3 object. The other cluster was about half the size. Phew. The only wrinkle we had with this process was getting SBT (mongo-hadoop’s build tool) to upload our CDH4-compiled artifacts to our Artifactory for inclusion in our conversion project.

The mapper in this case is quite simple:

package com.fullcontact.hadoop.mongo;

import com.mongodb.hadoop.io.BSONWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.bson.BSONObject;

import java.io.IOException;

// The listing was cut off after "extends"; the type parameters and map body are
// a minimal, assumed completion based on the imports above.
public class MongoConversionMapper extends Mapper<Object, BSONObject, Text, BSONWritable> {
    @Override
    protected void map(Object key, BSONObject document, Context context)
            throws IOException, InterruptedException {
        // Re-emit each document keyed by its _id for the Cassandra-writing reducer.
        context.write(new Text(document.get("_id").toString()), new BSONWritable(document));
    }
}
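
For context, here’s a rough sketch of how a job like this might be wired up. The reducer is a placeholder, and the BSON input format class name (BSONFileInputFormat here) may differ depending on your mongo-hadoop version.

package com.fullcontact.hadoop.mongo;

import com.mongodb.hadoop.BSONFileInputFormat;
import com.mongodb.hadoop.io.BSONWritable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;

public class MongoConversionJob {

    // Placeholder for the real reducer, which wrote rows into Cassandra via
    // Astyanax (rate-limited, as described above).
    public static class CassandraWritingReducer
            extends Reducer<Text, BSONWritable, Text, BSONWritable> {
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = new Job(conf, "mongo-bson-to-cassandra");
        job.setJarByClass(MongoConversionJob.class);

        // Read the mongodump BSON straight off S3; the split points generated
        // beforehand let many mappers share one giant file.
        job.setInputFormatClass(BSONFileInputFormat.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));

        job.setMapperClass(MongoConversionMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(BSONWritable.class);

        job.setReducerClass(CassandraWritingReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(BSONWritable.class);
        job.setOutputFormatClass(NullOutputFormat.class);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}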
