Ben Knear, Software Engineer at AddThis

“Cassandra gives us a very high percentage of reliability. That’s very important to us and lets us provide the best quality tools for our AddThis users.”

One of the goals of AddThis is making the open web more personal. That involves the different tools we offer publishers to put on their sites to drive engagement, whether that’s helping users stay involved by finding new content or getting connected with social services like Twitter, Facebook, and whichever other platform it may be. AddThis really just helps keep people more connected with that site.

I use it on my own personal blog; it looks great. As an end user as well, it’s so seamless and easy for me to use. I’ve seen from my AddThis Dashboard how those tools have brought people to my site and kept them around. I think it’s funny that whenever I would search for AddThis I would see “sharing tools”. One of our coolest capabilities is recommended content. Though my site may not know you, and I don’t have the infrastructure to do any of that decisioning, AddThis can provide it for my site by analyzing the content that I have and knowing what you may be interested in.

A database to match

I’ve been with AddThis for over two years now, and we’d been using Cassandra since long before that.

What we really needed was extremely strong reliability across multiple data centers and fast transactions to be able to retrieve the data really quickly. Additionally, the amount of data was going to be variable; we didn’t know whether the pieces would be small or large, but we did know they were going to be very discrete pieces. There weren’t a lot of foreign-key-type relationships we needed to be concerned about.

That is why, and where, Cassandra fits in so perfectly. Personally, I’d done a very small amount of work with it before starting here. Even so, it was really easy to spin up our application to run across two data centers, different boxes, different localities. Cassandra really fit the mold we needed and has worked out great ever since.

For our team’s particular use case we are storing very specific pieces of data that deal with some modeling type things, and some analytics. There are other pieces within the company that use it in different ways, too, to help support the many internal and external AddThis applications.

We use Dropwizard, which helps create a self-contained deployment package. There is a lot of buzz right now about containers. In a way I see Dropwizard doing that; it is self-contained and reaches out where it needs to based on its configuration. We’re able to change our configuration on the fly depending on where we are deploying our instances, and we can do rolling deployments.

For example, when one instance, one node in a cluster, has an issue, we can take that one down and restart it with something different, then switch traffic between the nodes, which helps us maintain 100% availability.

Coming from a relational mindset

I came from several years of experience using relational databases. One important thing to plan for when switching to Big Data solutions from relational databases is the loss of relational modeling. You have to be very intentional and methodical about how you’re architecting the data model so you create indices exactly where you need them.

I actually had experience using HBase and Accumulo, both Scanner-based tools. The first library I tried was Astyanax by Netflix. Even though I built a whole application using it, I quickly ran into little issues, with compound keys and with adjusting the table definition once the table was built.

That’s where I moved over to using the DataStax libraries. It was really easy to get started: getting a little single-node instance of Cassandra running and placing a whole application on top of it.

It was interesting to learn more about the data modeling within Cassandra, which is quite different from doing MySQL. Again, you need to know how you want to pull this data, how you want to pull things out throughout the application.

The documentation for the DataStax libraries is really good. Every time I would search for CQL Shell material, I would always get taken to DataStax, which is great. The documentation is always clean and clear.

Why They Love Cassandra

Whenever we were thinking about what type of data store we wanted, the reliability and the fast throughput that we can get from Cassandra were big factors. We know the number of requests we get in a day; even when we get 10,000 requests a second, they come through, and Cassandra gives us a very high percentage of reliability. That’s very important to us and lets us provide the best quality tools for our AddThis users.

Internally, because we already have the infrastructure, we just piggyback on that, which is always nice. Where we might have issues with MySQL keeping things in sync between multiple clusters using a master and a slave, with Cassandra we can have multiple endpoints using the same store of data across multiple clusters. That’s been the big selling point for using Cassandra.

Tuning Hadoop & Cassandra: Beware of vNodes, Splits and Pages

April 14, 2015

By Brian O’Neill, Chief Technology Officer at Health Market Science

Brian is Chief Technology Officer at Health Market Science (HMS), where he heads development of their data management and analytics platform, powered by Storm and Cassandra. Brian won InfoWorld’s Technology Leadership award in 2013 and authored Storm: Blueprints for Realtime Computation. He has a number of patents and holds a B.S. in C.S. from Brown University.

When running Hadoop jobs against Cassandra, you will want to be careful about a few parameters.

Specifically, pay special attention to vNodes, Splits and Page Sizes.

vNodes were introduced in Cassandra 1.2.  vNodes allow a host to have multiple portions of the token range.  This allows for more evenly distributed data, which means nodes can share the burden of a node rebuild (and it doesn’t all fall on one node).  Instead the rebuild is distributed across a number of nodes.  (way cool feature)

BUT, vNodes made Hadoop jobs a little trickier…

The first time I went to run a Hadoop job using CQL, I was running a local Hadoop cluster (only one container) against a Cassandra with lots of vNodes (~250).  I was running a simple job (probably a flavor of word count) over a few thousand records.  I expected the job to complete in seconds.  To my dismay, it was taking *forever*.

When you run a Hadoop job, the minimum number of splits is the number of vNodes.

In Hadoop, each split is consumed by a single container, and a container runs against only one split at a time.  Effectively, a Hadoop container is a JVM, which is spun-up to process a specific split of data.

In any Hadoop cluster, there is a maximum number of available containers.  In my case, I had one available container, which meant the splits had to be processed sequentially with spin-up and tear-down overhead for each split.  boooooo…

So, before you do anything, check to see how many vNodes you are running.  This is controlled by your Cassandra yaml file.  There is a num_tokens parameter.  Below is an excerpt from cassandra.yaml.
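It looks something like this (256 is the usual default when vNodes are enabled; your value may differ):

    # cassandra.yaml (excerpt)
    num_tokens: 256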

To see how many vNodes you have on an existing cluster, you can use nodetool:
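For example (the grep address below is just a placeholder for one of your hosts):

    # count the token ranges owned by a single host
    nodetool ring | grep 192.168.1.100 | wc -l

    # or just read the Tokens column
    nodetool status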

You will notice that the number of token ranges equals the number you set in the configuration file(s).

For more information on vNodes and configuration, please see Patrick’s most excellent deck:
http://www.slideshare.net/patrickmcfadin/cassandra-virtual-node-talk

SIDENOTE:
Because the minimum number of splits is the number of vNodes, you may want to run a separate Cassandra ring, with a smaller number of vNodes, that you can use for analytics.

OK, so what does this mean and how do you fix it?

Well, if you have a small number of containers and a high number of vNodes, split overhead may become a problem (as it was in my local Hadoop cluster).  But perhaps more importantly, if you run against a large number of records, and use the default split size, you will end up with an insane number of splits.  For example,  we recently ran against a table that had 5 billion records.  The default split size is 64K. [1]  Doing the math:
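Treating 64K as the 64 * 1024 row default, the arithmetic looks roughly like this:

    5,000,000,000 rows / 65,536 rows per split ≈ 76,000 splits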

That is an awful lot of splits, and you are going to take an overhead spin-up/tear-down penalty for each of those splits.  Even assuming only 5 seconds of overhead for each split, that is a ton of time:
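Roughly:

    76,000 splits * 5 seconds of overhead ≈ 380,000 seconds ≈ 4.4 days of nothing but spin-up and tear-down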

Optimally, the number of splits would equal the number of containers available. That typically won’t be possible, but we can get as close as possible by setting the split size on the Hadoop job with the following line:
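A minimal sketch in Java (ConfigHelper comes from the cassandra-hadoop artifact); the 10,000,000 value is the one used in the math below:

    import org.apache.cassandra.hadoop.ConfigHelper;
    import org.apache.hadoop.mapreduce.Job;

    public class SplitSizeSetup {
        static void configureSplits(Job job) {
            // roughly 10M rows per split
            ConfigHelper.setInputSplitSize(job.getConfiguration(), 10000000);
        }
    }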

Let’s redo the math with this value:
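With the 10,000,000 row split size from above:

    5,000,000,000 rows / 10,000,000 rows per split = 500 splits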

If we are running on a Hadoop cluster with 250 available containers, we’ll finish in two passes on the splits, each container processing two splits.

With the split sizes worked out, you will want to mind the page size parameter.  The page size parameter tells the CQL driver how many rows you want back at a time.  It is the equivalent of the LIMIT clause on the CQL statement.  For this one, you will want to pull back as many as possible without blowing up memory. (which I’ve done ;)   The default value of page size is 1000. To configure the page size, use the following line:
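A minimal sketch in Java; note that the CQL page size is passed as a String:

    import org.apache.cassandra.hadoop.cql3.CqlConfigHelper;
    import org.apache.hadoop.mapreduce.Job;

    public class PageSizeSetup {
        static void configurePaging(Job job) {
            // rows fetched per page by the CQL driver; 1000 is the default, shown explicitly here
            CqlConfigHelper.setInputCQLPageRowSize(job.getConfiguration(), "1000");
        }
    }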

Hopefully this saves people some time. Happy tuning. (shout out to gnana ;)

Holy momentum Batman! Spark and Cassandra (circa 2015) w/ Datastax Connector and Java

April 10, 2015

By Brian O’Neill, Chief Technology Officer at Health Market Science

Brian is Chief Technology Officer at Health Market Science (HMS), where he heads development of their data management and analytics platform, powered by Storm and Cassandra. Brian won InfoWorld’s Technology Leadership award in 2013 and authored Storm: Blueprints for Realtime Computation. He has a number of patents and holds a B.S. in C.S. from Brown University.

Over a year ago, I did a post on Spark and Cassandra. At the time, Calliope was your best bet. Since then, Spark has exploded in popularity.

Check out this Google Trends chart. That’s quite a hockey stick for Spark.
Also notice their github project, which has almost 500 contributors and 3000 forks!!

Datastax is riding that curve. And just in the time since my last post, Datastax developed and released their own spark-cassandra connector.

I am putting together some materials for Philly Tech Week, where I’ll be presenting on Cassandra development, so I thought I would give it a spin to see how things have evolved. Here is a follow-along preview of the Spark piece of what I’ll be presenting.

First, get Spark running:

Download Spark 1.2.1 (this is the version that works with the latest Datastax connector)

Next, unpack it and build it. (mvn install)
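Roughly (this assumes the Apache archive layout for old releases; a pre-built binary package also works):

    wget http://archive.apache.org/dist/spark/spark-1.2.1/spark-1.2.1.tgz
    tar -xzf spark-1.2.1.tgz
    cd spark-1.2.1
    mvn -DskipTests clean install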

And that is the first thing I noticed…

Look at all these new cool toys: GraphX? SQL? Kafka? In my last post, I was using Spark 0.8.1. I took a look at the 0.8 branch on github and sure enough, all of this stuff was built just in the last year! It is crazy what momentum can do.

After you’ve built spark, go into the conf directory and copy the template environment file.
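From the top of the Spark directory:

    cd conf
    cp spark-env.sh.template spark-env.sh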

Then, edit that file and add a line to configure the master IP/bind interface:
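For a local setup, something like this (use your machine’s address if you are not binding to localhost):

    # conf/spark-env.sh
    SPARK_MASTER_IP=127.0.0.1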

(If you don’t set the IP, the master may bind to the wrong interface, and your application won’t be able to connect, which is what happened to me initially)

Next, launch the master: (It gives you the path to the logs, which I recommend tailing)
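The standalone scripts live under sbin:

    sbin/start-master.sh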

In the logs, you should see the master come up and print the URL for its web UI.

Go hit that WebUI at http://localhost:8080.

Second, get yourself some workers:

Spark has its own concept of workers. To start one, run the following command:
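Point it at the master URL from the previous step (adjust the host if you changed it):

    bin/spark-class org.apache.spark.deploy.worker.Worker spark://127.0.0.1:7077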

After a few seconds, you should see the worker register itself with the master in the logs.

You can refresh the MasterWebUI and you should see the worker.

Third, sling some code:

This go around, I wanted to use Java instead of Scala. (Sorry, but I’m still not on the Scala bandwagon; it feels like Java 8 gives me what I need with respect to functions and lambdas.)
I found this Datastax post, which led me to this code:

Kudos to Jacek, but that gist is directed at an old version of Spark. It also rolled everything into a single class (which means it doesn’t help you in a real-world situation where you have lots of classes and dependencies). In the end, I decided to update my quick start project so everyone can get up and running quickly.

Go clone this:
https://github.com/boneill42/spark-on-cassandra-quickstart

Build with maven:
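From the root of the cloned project:

    mvn clean install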

Then, have a look at the run.sh. This actually submits the job to the Spark cluster (single node in our case). The contents of that script are as follows:
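It boils down to a spark-submit call; roughly this (the package, jar, and argument values here are guesses, so check the actual script in the repo):

    bin/spark-submit \
      --class com.github.boneill42.JavaDemo \
      --master spark://127.0.0.1:7077 \
      target/spark-on-cassandra-quickstart-jar-with-dependencies.jar \
      spark://127.0.0.1:7077 127.0.0.1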

The --class parameter tells Spark which class to execute. The --master parameter is the url that you see at the top of the MasterWebUI, and tells Spark to which master it should submit the job. The jar file is the result of the build, which is a fat jar that includes the job (courtesy of the maven assembly plugin). The last two parameters are the args for the program. Spark passes those into the JavaDemo class. After you run this, you should see the job process…

I’ll go into the details of the example in my next post.
Or you can just come to my presentation at Philly Tech Week. =)

High-Performance Computing Clusters (HPCC) and Cassandra on OS X

April 9, 2015

By Brian O’Neill, Chief Technology Officer at Health Market Science

Brian is Chief Technology Officer at Health Market Science (HMS), where he heads development of their data management and analytics platform, powered by Storm and Cassandra. Brian won InfoWorld’s Technology Leadership award in 2013 and authored Storm: Blueprints for Realtime Computation. He has a number of patents and holds a B.S. in C.S. from Brown University.

Our new parent company, LexisNexis, has one of the world’s largest public records databases:

“…our comprehensive collection of more than 46 billion records from more than 10,000 diverse sources—including public, private, regulated, and derived data. You get comprehensive information on approximately 269 million individuals and 277 million unique businesses.”
http://www.lexisnexis.com/en-us/products/public-records.page

And they’ve been managing, analyzing and searching this database for decades.  Over that time period, they’ve built up quite an assortment of “Big Data” technologies.  Collectively, LexisNexis refers to those technologies as their High-Performance Computing Cluster (HPCC) platform.
http://hpccsystems.com/Why-HPCC/How-it-works

HPCC is entirely open source:
https://github.com/hpcc-systems/HPCC-Platform

Naturally, we are working through the marriage of HPCC with our real-time data management and analytics stack.  The potential is really exciting.  Specifically, HPCC has sophisticated machine learning and statistics libraries, and a query engine (Roxie) capable of serving up those statistics.
http://hpccsystems.com/ml

Lo and behold, HPCC can use Cassandra as a backend storage mechanism! (FTW!)

The HPCC platform isn’t technically supported on a Mac, but here is what I did to get it running:

HPCC Install

Clone the github repository, and its submodules (git submodule update --init --recursive)
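Spelled out, that is:

    git clone https://github.com/hpcc-systems/HPCC-Platform.git
    cd HPCC-Platform
    git submodule update --init --recursive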

Pull my patches (https://github.com/hpcc-systems/HPCC-Platform/pull/7166)

Install the dependencies using brew:

Make a build directory, and run cmake from there:
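A standard out-of-source CMake build (the original invocation may have included extra options):

    mkdir build
    cd build
    cmake ..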

 

Then, compile and install with (sudo make install)

After that, you’ll need to muck with the permissions a bit:
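Something along these lines should do it; the exact directories depend on your install prefix:

    sudo chown -R $(whoami) /opt/HPCCSystems /var/lib/HPCCSystems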

Now, ordinarily you would run hpcc-init to get the system configured, but that script fails on OS X, so I used Linux to generate config files that work and posted those to a repository here:

https://github.com/boneill42/hpcc_on_mac

Clone this repository and replace /var/lib/HPCCSystems with the content of var_lib_hpccsystems.zip

Then, from the directory containing the xml files in this repository, you can run:

daserver (Runs the Dali server, which is the persistence mechanism for HPCC)

esp (Runs the ESP server, which is the web services and UI layer for HPCC)

eclccserver (Runs the ECL compile server, which takes the ECL and compiles it down to C and then a dynamic library)

roxie (Runs the Roxie server, which is capable of responding to queries)

Kick off each one of those, then go to http://localhost:8010 in a browser. You are ready to run some ECL!

 

Running ECL

Like Pig with Hadoop, HPCC runs a DSL called ECL.  More information on ECL can be found here:

http://hpccsystems.com/download/docs/learning-ecl

As a simple smoke test, go into your HPCC-Platform repository, and go under: ./testing/regress/ecl.

Then, run the following:
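Something like this, targeting the local hthor engine (the exact flags can differ between versions of the client tools):

    ecl run hthor hello.ecl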

You should see the following:

        <dataset name="Result 1">
        <row><result_1>Hello world</result_1></row>
        </dataset>

Cassandra Plugin

With HPCC up and running, we are ready to have some fun with Cassandra.  HPCC has plugins.  Those plugins reside in /opt/HPCC/plugins.  For me, I had to copy those libraries into /opt/HPCCSystems/lib to get HPCC to recognize them.

Go back to the testing/regress/ecl directory and have a look at cassandra-simple.ecl.


In this example, we define childrec as a RECORD with a set of fields. We then create a DATASET of type childrec. Then we define a method that takes a dataset of type childrec and runs the Cassandra insert command for each of the records in the dataset.

Start up Cassandra locally (download Cassandra, unzip it, then run bin/cassandra -f to keep it in the foreground).

Once Cassandra is up, simply run the ECL like you did the hello program.

You can then go over to cqlsh and validate that all the data made it back into Cassandra:
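For example (the keyspace and table names here are only illustrative; use whatever cassandra-simple.ecl actually writes to):

    cqlsh> SELECT * FROM test.child LIMIT 10;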
OK, that should give a little taste of ECL and HPCC. It is a powerful platform.
As always, let me know if you run into any trouble.

Kindling Part 2: An Introduction to Spark with Cassandra

April 2, 2015

By Erich Ess, Chief Technology Officer at SimpleRelevance

Erich Ess is the son of a Swiss sailor and an American librarian. He studied Computer Science and Pure Mathematics at Purdue University, where he wrote papers on scientific visualization. He then went to work for Northrop Grumman, Verizon, and GreatCall as a software engineer. At Verizon and GreatCall he spent his time building highly scalable, reliable enterprise systems. He is currently the CTO of SimpleRelevance, leading a team that is building advanced machine learning algorithms.

Using Spark with Cassandra to ETL Some Raw Data

Welcome to the second half of my blog post about using Spark with Cassandra.  The previous post focused on getting Spark set up, the basics of how to program with Spark, then a small demonstration of Spark’s in-memory processing, and finally how to interact with Cassandra from Spark.  This post will use those basics to accomplish a simple but common task: Extract, Transform, and Load.  I will use Spark to parse and load the MovieLens dataset into Cassandra and then perform some simple analytics.

The data in this example comes from the MovieLens datasets (http://grouplens.org/datasets/movielens/); specifically, the “MovieLens 1M” dataset.  This dataset contains 1 million ratings for 4 thousand movies submitted by around 6 thousand users.

I will show the Cassandra schema I am using to store the raw data from the text files.  However, I will skip over the code that I used to take the data from the raw text files and load it into Cassandra, since that step isn’t relevant to using Spark.
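The raw table is essentially a dump of every line of every source file, keyed by file name. Something like this (the column names are a guess; the exact definition is in the Gist linked below):

    CREATE TABLE spark_demo.raw_files (
        filename text,
        line int,
        data text,
        PRIMARY KEY (filename, line)
    );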

The goal with this design is to group a bunch of raw data into a single place, in order to demonstrate different transformations on the same data source in Spark.

This data will be transformed into the following schemas:

For Users:

For Movies:

For Reviews:
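For all three, the layout roughly follows the MovieLens field descriptions; the column names below are inferred, and the authoritative CQL is in the Gist that follows:

    CREATE TABLE spark_demo.users (
        user_id int PRIMARY KEY,
        gender text,
        age int,
        occupation text,
        zip text
    );

    CREATE TABLE spark_demo.movies (
        movie_id int PRIMARY KEY,
        title text,
        genres text
    );

    CREATE TABLE spark_demo.reviews (
        user_id int,
        movie_id int,
        rating int,
        timestamp bigint,
        PRIMARY KEY (user_id, movie_id)
    );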

A link to a Gist with the CQL commands: https://gist.github.com/erichgess/a02aefddd6231c91babb#file-cassandra-schemas

A link to the small Python script I used to load the raw source files into the “raw_files” table: https://gist.github.com/erichgess/a02aefddd6231c91babb#file-data_loader-py

 

In the Spark Shell:
Processing the Raw Data:

First we need to set up an RDD for the raw data table.  One of the nice things about Spark is that we can automatically map the data being read from Cassandra to an easy-to-work-with data type.  Here I am going to use a case class:
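With the connector imported, the mapping is a one-liner (the case class fields are assumed to line up with the raw_files columns sketched above):

    import com.datastax.spark.connector._

    case class RawFileData(filename: String, line: Int, data: String)

    val raw = sc.cassandraTable[RawFileData]("spark_demo", "raw_files")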

This gives me an RDD of type RawFileData which I can then iterate over and process.

I’ll start by filtering to just the raw data from the “users.dat” file:
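Assuming a filename column as above:

    val raw_users = raw.filter(r => r.filename == "users.dat")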

One thing to note about Spark is that the transformations are lazily evaluated, which means that nothing is done until an action is performed on an RDD.  And when that action is performed, only the bare minimum amount of work required for the result is done.

For example, I can run the first action to get the first entry in the raw_users RDD:
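That is simply:

    raw_users.first()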

This will pull just that first line from Cassandra.  I find that first is a good way to validate that the correct data is being used in the RDD.

Now I want to transform that raw user data into something that I can easily use in code. A case class for the user will be perfect.  Simply use a chain of transformations to take the raw user data and transform it into an RDD of data type User.
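A sketch of that chain, assuming the users.dat layout of UserID::Gender::Age::Occupation::Zip-code:

    case class User(user_id: Int, gender: String, age: Int, occupation: String, zip: String)

    val users = raw_users
      .map(r => r.data.split("::"))
      .map(f => User(f(0).toInt, f(1), f(2).toInt, f(3), f(4)))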

Now I will save that back into Cassandra.  Note that no action is performed on the raw user data until I save it to Cassandra:
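With the schema above, that is a single call (the column names have to line up with the case class fields):

    users.saveToCassandra("spark_demo", "users")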

Repeating this for the Movies and Ratings:
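The same pattern works for the other two files; here assuming movies.dat is MovieID::Title::Genres and ratings.dat is UserID::MovieID::Rating::Timestamp:

    case class Movie(movie_id: Int, title: String, genres: String)
    case class Review(user_id: Int, movie_id: Int, rating: Int, timestamp: Long)

    val movies = raw.filter(r => r.filename == "movies.dat")
      .map(r => r.data.split("::"))
      .map(f => Movie(f(0).toInt, f(1), f(2)))
    movies.saveToCassandra("spark_demo", "movies")

    val reviews = raw.filter(r => r.filename == "ratings.dat")
      .map(r => r.data.split("::"))
      .map(f => Review(f(0).toInt, f(1).toInt, f(2).toInt, f(3).toLong))
    reviews.saveToCassandra("spark_demo", "reviews")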

Here’s a Gist with the above code: https://gist.github.com/erichgess/a02aefddd6231c91babb#file-etl-sc

In Memory Processing

Something that may have been apparent in the above example is that all the ETL processes came from the same source of data in Cassandra: the table raw_files.  Doing multiple independent manipulations on the same source of data is a great opportunity to demonstrate Spark’s in-memory features.

To use in-memory processing simply tell Spark to cache a specific RDD.  Then, when Spark pulls data from Cassandra it will keep the data in memory across the cluster.  Each future computation on that RDD will be done from the in-memory data set.  If you do not use this feature, then Spark will pull the data from Cassandra for each computation.

To cache the raw RDD call the cache function when creating the RDD:
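Reusing the RawFileData mapping from above:

    val raw = sc.cassandraTable[RawFileData]("spark_demo", "raw_files").cache()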

To demonstrate the performance improvement provided by caching I will run the ETL for users, movies, and ratings comparing the performance for when the raw data has been cached before ETL and when it has not been cached before ETL.

Data       Without Caching Time    With Caching Time
Users      29.78 seconds           0.267 seconds
Movies     27.65 seconds           0.43 seconds
Ratings    58 seconds              17.9 seconds

Just a note:  these tests were run using a 15” MacBook Pro with 16GB of RAM, a single Cassandra server running locally on a Vagrant VM and a single Spark Shell running locally. 

Repeat for Movies and Reviews

 

Some Simple Analytics

Now that we have the data, let’s do some simple analytics.

  1. Find the top 10 most prolific reviewers:
    1. val reviews = sc.cassandraTable[Review]("spark_demo", "reviews")
    2. val top_10 = reviews.groupBy( x => x.user_id ).map( x => (x._2.size, x._1)).top(10)
    3. top will take the first 10 elements sorted in descending order by the implicit ordering of tuples in Scala, which compares first the first element of the tuple and then the second.  This is why the mapping put the number of ratings first and the user id second.
  2. Find the movie with the most reviews:
    1. val top_movie = reviews.map(r => (r.movie_id, 1)).reduceByKey((a, b) => a + b).map(p => (p._2, p._1)).top(1).head._2

As above, the movie id and the number of reviews are swapped to account for the way tuples are ordered.

  3. Find the movie with the highest average rating:
    1. val top_average = reviews.map( r => (r.movie_id, (r.rating, 1))).reduceByKey( (a, b) => (a._1 + b._1, a._2 + b._2)).map( p => (p._2._1.toDouble / p._2._2, p._1)).top(1).head._2

The reason this uses reduceByKey rather than groupBy is to cut down on shuffling.  groupBy shuffles every record across the cluster so that all the values for a key land on the same Spark worker node.  reduceByKey combines values locally on each node before anything is shuffled, so far less data has to move, although it is less flexible than groupBy.
