Here at Ooyala, we process over two billion video events a day, all of which are archived in a big Cassandra cluster. The challenge is, how do we turn the mountain of raw events into small, actionable nuggets of truth? And how do we make the development and querying of these insights quick, scalable, and easy?
How Ooyala uses Spark and Cassandra
Hadoop is very scalable, but slow, and its programming API is lacking. Real-time streaming frameworks like Storm are a good fit for fixed processing of firehoses, but not so much for flexible queries against a datastore. Thus, we have turned to Spark, a very fast in-memory distributed computing framework, to help us with these insights.
Spark is used to run distributed jobs that read raw player events from Cassandra and generate materialized views, which are cached in memory. These materialized views can then be queried using subsequent jobs.
Spark is fast enough that these jobs running on materialized views can be used for interactive queries!
Secrets to using Spark as a query engine for in-memory datasets
First, we start with a Resilient Distributed Dataset (RDD) created from our Cassandra InputFormat:
val rdd = sc.newAPIHadoopRDD(classOf[CassandraInputFormat], ....)
After applying transformations to turn it into a materialized view, all one has to do to cache it is:
val rdd2 = rdd.cache()
rdd2.count()
The count() call is needed to force Spark to walk the entire dataset; otherwise Spark evaluates lazily and reads only the minimum necessary to satisfy the query.
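To make the pattern concrete, here is a minimal, self-contained sketch of the cache-then-query flow, run in Spark's local mode. The event fields and aggregations are invented for illustration and are not our actual schema:

```scala
import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("cache-sketch"))

// Stand-in for raw player events read from Cassandra: (videoId, country, plays)
val rawEvents = sc.parallelize(Seq(
  ("video1", "US", 3),
  ("video2", "US", 5),
  ("video1", "BR", 2)))

// "Job 1": build the materialized view and pin it in memory
val view = rawEvents
  .map { case (video, country, plays) => ((video, country), plays) }
  .reduceByKey(_ + _)
  .cache()
view.count() // force full evaluation so the whole view is cached

// "Job 2": an interactive query answered entirely from the cached view
val playsByVideo = view
  .map { case ((video, _), plays) => (video, plays) }
  .reduceByKey(_ + _)
  .collectAsMap()
// playsByVideo now maps each video to its total plays across countries
```

Once the first job has paid the cost of reading Cassandra and materializing the view, every subsequent aggregation touches only memory.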
While the code is pretty straightforward, we really need a service for processing the queries. We created a Spark job server with a REST API for spinning up a shared SparkContext, and running “jobs” inside of the context. One can think of a SparkContext as a “container” for running jobs, and for caching datasets.
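As a rough sketch of what "running jobs inside a shared context" looks like, a job can be modeled as a trait that receives the server-owned SparkContext along with its configuration. The trait and names below are illustrative, not the job server's actual API:

```scala
import com.typesafe.config.{Config, ConfigFactory}
import org.apache.spark.{SparkConf, SparkContext}

// Illustrative job abstraction (not the job server's real interface):
// the server owns the SparkContext and hands it to every job it runs,
// so successive jobs can see each other's cached RDDs.
trait ContextJob {
  def runJob(sc: SparkContext, config: Config): Any
}

object WordCountJob extends ContextJob {
  def runJob(sc: SparkContext, config: Config): Any = {
    val words = config.getString("input.string").split("\\s+")
    sc.parallelize(words).map((_, 1)).reduceByKey(_ + _).collectAsMap()
  }
}

// The "server" side: one long-lived context, many jobs run inside it
val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("job-container"))
val result = WordCountJob.runJob(sc, ConfigFactory.parseString("""input.string = "a b a" """))
```

Because the context outlives any single job, datasets cached by one job remain available to the next.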
The first job creates the materialized view from Cassandra data and caches it; the second job computes aggregations and queries against the materialized view. Servicing queries consists of a POST route that takes a Typesafe Config file as the POST body; this configuration is passed to the job as input. Parameters might include which metrics to query, how to sort or group results, and the date or time range to query.
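A query configuration POSTed to the server might look something like the following HOCON file. The keys here are invented for illustration, not the server's actual schema:

```
# POST body, parsed with Typesafe Config and passed to the query job
query {
  metrics    = ["plays", "displays"]
  group-by   = "country"
  sort       = "plays desc"
  date-range = { start = "2013-09-01", end = "2013-09-07" }
}
```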
The shared SparkContext has two major benefits:
- It allows the first job to cache the in-memory dataset for the second job to quickly query
- It significantly reduces startup time for the query job
Starting up a SparkContext involves creating new executor processes on each worker node, so depending on network latency and how busy the cluster is, it can easily take several seconds. By contrast, our query jobs currently run in well under a second, and with repeated queries against a warm server the round trip can easily drop below 100 ms.
There are other benefits to having the job server as well. By offering Spark as a Service, with an easy API for submitting, tracking, and killing jobs and SparkContexts, it draws a very clear boundary between application developers and the team managing the Spark and Cassandra infrastructure.
We presented this work at Cassandra Summit 2013; the slides and video can be found below.
We are also planning to contribute the job server back to the Spark community, as we expect it to be a big productivity win for many Spark users. To see some of the other OSS projects Ooyala has released, please visit our GitHub page.