Cassandra: Daughter of Dynamo and BigTable

May 18, 2015

By 

Austin Ouyang, Program Director at Insight Data Engineering
I am an electrical engineer by training and have bounced from hardware/software integration to biomedical research and now into data engineering. I currently help academics and others in industry make their transition into the data engineering field. Improving my ability to help others understand complex concepts has always been a passion of mine, and I strive to continually expand my knowledge in the data engineering field.

David Drummond, Program Director at Insight Data Engineering
I am a passionate learner of anything and everything! As a physicist, I have pushed myself to understand how the natural world works, using that knowledge to design new technologies in quantum computing. As a data engineer, I love to learn new technologies and tools for taming large data sets, enabling me to build awesome products with the ever-increasing wealth of information available. I also have a passion for helping others learn and communicating difficult concepts in a clear, concise way.

The Technology Highlight series covers the leading open source tools learned in our seven week Data Engineering Fellowship. In this post, Austin Ouyang – an Insight alum and Program Director – discusses how he and recent Fellows used the NoSQL database Apache Cassandra.

This blog post is also published here on the Insight Data Engineering blog.

Cassandra has become one of the most popular NoSQL databases in use today at companies such as Netflix, Instagram, and many more. Originally designed at Facebook, Cassandra combines the distributed systems design of Amazon’s highly available Dynamo with Google’s BigTable data model. Cassandra is a highly available, decentralized database with tunable read and write consistency. All nodes communicate with each other through a Gossip protocol similar to Dynamo and Riak, exchanging information about themselves and other nodes they have gossiped with. Cassandra can also be deployed across multiple data centers that act as backups or reduce latency across regions.

Getting Started with Cassandra

Almost all of the incoming Fellows had little experience with NoSQL databases, but were able to quickly pick up Cassandra. They got started by spinning up clusters on Amazon Web Services (AWS) using the preconfigured DataStax Amazon Machine Image, which allowed Fellows to quickly begin using Cassandra “out of the box”. Using a SQL-like language known as the Cassandra Query Language (CQL), we were able to learn and test queries immediately using the CQL shell.

We also got a great introduction to NoSQL and Cassandra internals from Insight alumnus Mike Grinolds and from Patricio Echague, Director of Engineering at RelateIQ and a committer to Hector – the Java client for Cassandra. Patricio also discussed data modeling for Cassandra, which requires a different approach than relational databases like MySQL or PostgreSQL.

Patricio Echague gave an excellent introduction to NoSQL and Cassandra

Unlike relational models where the schema and tables are designed to represent objects and their relationships, data in NoSQL should be modeled around the specific questions that the end-user will ask. This often means using multiple tables with the same data, but the redundancy of this denormalized approach is often worth the better performance on massive volumes of data. While Cassandra doesn’t support ad-hoc data manipulation with functions like JOIN, SUM, and MAX, it can handle a higher throughput of data if you design a data model specifically for the desired queries.

Cassandra’s data model, derived from Google’s BigTable and similar to HBase, is a partitioned row datastore organized into tables (formerly known as column families), where the rows are key-value pairs. Each table contains a set of primary keys which uniquely identify each row of data. The primary key can be composed of multiple columns resulting in a compound primary key. The first part of the primary key is always the partition key, which assigns a consistent hash that determines where each row of data will reside in the cluster of nodes. The partition key can also be composed of multiple columns (known as a composite or compound partition key) which can mitigate scenarios where specific nodes end up with significantly more data than others in the cluster. Columns in the primary key following the partition key are known as clustering columns, which are sorted keys that map to values. Since each row can contain up to 2 billion columns and the values can be sorted by clustering column types like timestamp, Cassandra is great for storing time series data.

Cassandra for Messaging Analytics

While this approach to designing tables was initially difficult (especially when coming from a background with relational databases), the best way to understand the data model is to explore an example use case. The first step when creating a table is to determine the type of queries a user would make. For example, let’s suppose a management team from a messaging app would like to request the total number of messages sent on a specific date across all US counties. They would also like to see how the number of messages sent each day has progressed this year for a specific county.

Example of a cluster with a composite partition key

The above diagram outlines the keyspace design and how data resides on each node for the messaging app example. All tables must reside within a keyspace (analogous to a SQL schema), which defines whether the data will reside in multiple datacenters, as well as the replication factor for its tables. Our table consists of columns labeled state, county, date, and count. In this example, both the state and county have been assigned to the partition key. The reason for this design might be that a majority of messages come mainly from California and New York; if only the state were chosen as the partition key, the nodes in the cluster responsible for these states would have a much higher workload than the others. The diagram shows how each partition key is hashed to one of the five nodes in this Cassandra cluster (the replication factor is set to one for simplicity).
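A minimal CQL sketch of such a keyspace and table (the keyspace name, table name, and column types are assumptions for illustration):

    CREATE KEYSPACE messaging
      WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};

    -- (state, county) together form the composite partition key;
    -- date is the clustering column within each partition
    CREATE TABLE messaging.messages_by_county (
        state  text,
        county text,
        date   timestamp,
        count  bigint,
        PRIMARY KEY ((state, county), date)
    );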

Example of a compound partition key with sorted clustering columns

Data for each partition residing on the same node will be clustered by the date column, as shown in the second diagram. The dates will also be sorted, which makes range scans extremely efficient. With this design, both queries from the example can be served easily: the total number of messages sent on a specific date across all US counties is given by
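a query against a date-partitioned copy of the data; following the denormalized approach described earlier, this usually means a second, query-specific table partitioned by date (all names below are illustrative):

    -- hypothetical companion table partitioned by date, so one partition
    -- holds the counts for every county on that day
    SELECT state, county, count
    FROM messaging.messages_by_date
    WHERE date = '2015-04-01';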

and the number of messages sent each day this year for a specific county
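can be fetched with a single-partition range query along these lines (again, the names are illustrative):

    -- the fully specified partition key plus a range on the clustering
    -- column keeps this query on a single partition
    SELECT date, count
    FROM messaging.messages_by_county
    WHERE state = 'CA'
      AND county = 'San Francisco'
      AND date >= '2015-01-01';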

Reads and Writes Anywhere with Multiple Datacenters

Since Cassandra uses a decentralized approach, it works well for use cases that require multiple datacenters in different locations. This was one of the most interesting features for Guang, one of our recent Fellows who is now a data engineer at a stealth startup. He built a data pipeline to analyze game data from the real-time strategy game Starcraft II, which is played worldwide. If his data needed to be quickly accessible from several locations, Cassandra would easily scale to multiple datacenters with flexible configurations. For example, a piece of data can be stored in two replicas on the east coast and three replicas on the west coast using:
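(the keyspace and datacenter names below are illustrative and must match those reported by the cluster’s snitch)

    -- two replicas in the east-coast datacenter, three in the west-coast one
    CREATE KEYSPACE starcraft
      WITH replication = {
        'class': 'NetworkTopologyStrategy',
        'us-east': 2,
        'us-west': 3
      };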

Moreover, Cassandra has built-in support for AWS datacenters (which we use during the Insight program) that optimizes the Gossip protocol for the best read and write performance. This ability to easily and flexibly sync data geographically is important for national and international companies like Netflix and Spotify.

Multi-datacenter clusters for geographically separate data
Tunable Consistency for Each Query

In the language of Brewer’s CAP theorem – which states that a partition-tolerant distributed system can be either consistent or available, but not both – Cassandra is considered available and partition-tolerant (AP). Since there is no ‘master’ node to coordinate reads and writes, data is always available on one of the nodes, but it may be out of date for about a second before the other nodes gossip about new updates. However, one of the main differences from other NoSQL databases is Cassandra’s ability to tune the level of consistency for each query.

For example, another recent Fellow, Silvia – who is now a data engineer at Silicon Valley Data Science – built a platform for a fantasy football service. As a consumer-facing product that needs to avoid downtime, she chose Cassandra, and she can easily check a player’s points with a simple query:
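(the fantasy.player_stats table and its columns are hypothetical)

    -- the default consistency level (e.g. ONE) favors availability here
    SELECT points
    FROM fantasy.player_stats
    WHERE player_id = 'brady_tom';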

At the same time, she wanted to implement a feature that allowed users to make real-time player trades during games. For this feature, it’s important that different users see consistent data – it would be unfair if one user had more recent data than another. Fortunately, she can tune the consistency for this use case by simply raising the consistency level of that query to QUORUM:
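In cqlsh, for example, this is done with the CONSISTENCY command before rerunning the same query (drivers expose an equivalent per-request setting; the names are as above).

    -- applies to subsequent requests in this cqlsh session
    CONSISTENCY QUORUM;

    SELECT points
    FROM fantasy.player_stats
    WHERE player_id = 'brady_tom';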

which reads the data from multiple nodes and only returns the result if a quorum (i.e. a majority) of the responsible nodes responds with consistent results. The consistency of both reads and writes can be controlled for each query: from accessing a single node, to requiring all the nodes to respond, with many options in between.

Cassandra with Spark for Ad-hoc Analysis

Cassandra currently has limited exploratory capabilities, such as performing aggregates on grouped data or joining tables on selected columns – the ideal use case is to know the desired queries prior to writing data. When this post-write processing was desired, many Fellows used Apache Spark’s in-memory processing framework to easily extract and transform data with the spark-cassandra connector developed by DataStax. The spark-cassandra connector and its dependencies need to be built into an assembly JAR with sbt, which is then included when starting the spark-shell REPL. The following example shows how we can read a Cassandra table into Spark, compute averages for each specified group using Spark’s newly implemented DataFrames, and write the new table back into Cassandra.

Suppose we begin with a Cassandra table, by_county_month, which represents the total number of messages sent in each US county for each month from the previous example.

Messaging data by county and month
 

Let’s say we would like to calculate the average number of messages sent per month for each county. First, we need to create an output Cassandra table to hold the computed averages.
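For example (the keyspace and table names are assumptions, matching the sketches above):

    -- one row per county, holding the computed average
    CREATE TABLE messaging.by_county_avg (
        state     text,
        county    text,
        avg_count double,
        PRIMARY KEY ((state, county))
    );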

We can then start the spark-shell REPL from the terminal and include the spark-cassandra connector jar file.
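For example (the assembly JAR name, version, and Cassandra host below are assumptions):

    spark-shell --jars spark-cassandra-connector-assembly-1.3.0.jar \
                --conf spark.cassandra.connection.host=127.0.0.1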

The following shows how to pull a table from Cassandra into Spark as a Resilient Distributed Dataset (RDD) composed of case classes, convert the RDD into a DataFrame for aggregate computations, and write the results back into a Cassandra table.
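A sketch of what this can look like in the shell (the keyspace, table, and column names are assumed, and the exact code the Fellows used may differ):

    import com.datastax.spark.connector._
    import org.apache.spark.sql.functions.avg
    import sqlContext.implicits._

    // case class mirroring the by_county_month table (column names assumed)
    case class CountyMonth(state: String, county: String, month: Int, count: Long)

    // read the Cassandra table into an RDD of case classes
    val byCountyMonth = sc.cassandraTable[CountyMonth]("messaging", "by_county_month")

    // convert the RDD to a DataFrame and average the monthly counts per county
    val averages = byCountyMonth.toDF()
      .groupBy("state", "county")
      .agg(avg("count").alias("avg_count"))

    // write the aggregated rows back into the output table
    averages.map(row => (row.getString(0), row.getString(1), row.getDouble(2)))
      .saveToCassandra("messaging", "by_county_avg",
                       SomeColumns("state", "county", "avg_count"))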

Finally, we can view our results in the CQL shell with:
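(names as above; the TTL function shows any remaining time-to-live on the value)

    SELECT state, county, avg_count, TTL(avg_count)
    FROM messaging.by_county_avg;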

Messaging by county averaged over months with TTL

Writing this aggregate operation is extremely straightforward with Spark’s new DataFrame feature, which is built on Spark SQL. Cassandra also natively supports time to live (TTL), which allows columns of data to expire automatically. This feature was useful for another Fellow, Shafi – now at Yelp – who used TTL to keep a rolling count of the top trending stocks on Twitter for the last ten minutes. A new feature in the spark-cassandra connector allows users to add a TTL option for the data written into a Cassandra table.
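With a recent connector version, the write from the earlier sketch could attach a TTL roughly like this (the ten-minute value and the names are illustrative):

    import com.datastax.spark.connector.writer.{TTLOption, WriteConf}

    // expire the written columns automatically after ten minutes;
    // 'averages' is the DataFrame from the previous sketch
    averages.map(row => (row.getString(0), row.getString(1), row.getDouble(2)))
      .saveToCassandra("messaging", "by_county_avg",
                       SomeColumns("state", "county", "avg_count"),
                       writeConf = WriteConf(ttl = TTLOption.constant(600)))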

Looking forward

Cassandra is one of the many NoSQL databases used today and has helped Fellows better understand some of the key differences from relational databases. Fellows had an enjoyable experience learning to choose appropriate NoSQL databases based on their queries and to understand the tradeoffs of other NoSQL options. Fellows will keep pushing the envelope and continue exploring various technologies for different use cases, but Cassandra will likely remain a popular choice as a highly available, decentralized database.

Find out more about the Insight Data Engineering Fellows Program  and apply for our next sessions in Silicon Valley and New York by June 29th.

Synchronizing Clocks In a Cassandra Cluster Pt. 1 – The Problem

May 14, 2015

By 

Viliam Holub, Chief Technology Officer at Logentries
I am a researcher and a teacher at University College Dublin in the School of Computer Science and Informatics, formerly of the Distributed Systems Research Group at Charles University. My research interests cover formal verification and parallel and distributed model checking; however, I am interested in many other fields of software engineering and computer science. I studied Electronic Computer Systems from 1994 to 1998, where I got an electrotechnical background. I then graduated in Software Engineering from the Faculty of Mathematics and Physics, Charles University, and finished my postgraduate study in the Department of Software Engineering. I like programming and have programmed a lot so far. I’m fluent in C, C++, Perl, PHP, Java, and x86, I’m perfectly familiar (although I haven’t written so much code yet) with Python, Ruby, and ECMAScript, and I’m able to read a lot of others. I don’t feel stuck in one programming language.

Cassandra is a highly-distributable NoSQL database with tunable consistency. What makes it highly distributable also makes it, in part, vulnerable: the whole deployment must run on synchronized clocks.

It’s quite surprising that, given how crucial this is, it is not covered sufficiently in the literature. And when it is, it usually just refers to installing an NTP daemon on each node, which – if followed blindly – leads to really bad consequences. You will find blog posts by users who got burned by clock drift.

In the first installment of this two part series, I’ll cover how important clocks are and how bad clocks can be in virtualized systems (like Amazon EC2) today. In the next installment, coming out next week, I’ll go over some disadvantages of off-the-shelf NTP installations, and how to overcome them.

About clocks in Cassandra clusters

Cassandra serializes write operations by the time stamps you send with a query. Time stamps solve an important serialization problem for the inherently loosely coupled nodes in large clusters. At the same time, however, time stamps are its Achilles’ heel. If system clocks run off from each other, so do the time stamps of write operations, and you are about to experience inexplicable data inconsistencies. It is crucial for Cassandra to have its clocks right.

Unfortunately, boot-time system clock synchronization is not enough. No two clocks are the same, and you will eventually see clock drift, i.e. a growing difference among the clocks in the system. You have to maintain clock synchronization continually.

It is a common misconception that clocks on virtual machines are somewhat resistant to clock drift. In fact, virtual instances are especially prone to it, sometimes dramatically so, if the system is under heavy load. On Amazon EC2, you can easily observe a drift of about 50ms per day on an unloaded instance and seconds per day on a loaded instance.

How closely do clocks need to be synchronized? It depends on your type of workload. If you run read-only or append-only queries, you are probably fine with modest synchronization. However, if you run concurrent read-update queries, it starts to get serious. And if you do so in response to API calls or concurrent job processing, it is critical down to the millisecond.

Unfortunately, there is a great, off-the-shelf solution ready to use. Why unfortunately?

Network Time Protocol

Network Time Protocol (NTP) gets the time from an external time source in the network and propagates it further down the network. NTP uses a hierarchical, tree-like topology, where each layer is referred to as a clock “stratum”, starting with Stratum 0 as the authoritative time source and continuing with Strata 1, 2, etc. Nodes which synchronize their clocks with nodes on Stratum n become nodes on Stratum n+1. The NTP daemon sends time queries periodically to specified servers, adjusts the value for the network latency associated with the message transmission, and re-adjusts the local clock to the calculated time. Running an NTP daemon will help avoid clock drift, especially on loaded machines.

In order to make NTP work you need to specify a set of servers the current time will be pulled from. NTP servers may be provided by your network supplier, or you can use publicly available NTP servers. The best list of available public NTP servers is the NTP Pool Project, where you can also find the best options for your geographical region. It is a good idea to use this pool. You should not use NTP servers without the consent of the provider.

How to install NTP daemon

Installing the NTP daemon is as simple as:
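(on a Debian or Ubuntu system, for example)

    sudo apt-get install ntp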

and it works immediately. That’s because it is pre-configured to use a standard pool of NTP servers. If you look at /etc/ntp.conf you will see servers defined with the server parameter, for example:
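    # stock Debian pool entries (your distribution may ship a different list)
    server 0.debian.pool.ntp.org iburst
    server 1.debian.pool.ntp.org iburst
    server 2.debian.pool.ntp.org iburst
    server 3.debian.pool.ntp.org iburst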

This is the default for Debian systems; you may see a slightly different list in your distribution. The iburst parameter is there for optimization. If you want to check how the NTP daemon works, run the following command: ntpq -p. You will get a list similar to this one:

It shows you the list of servers it synchronizes to, their reference, stratum, synchronization period, response delay, offset from the current time, and jitter.

NTP uses an optimizing algorithm which selects the best source of the current clock as well as a working set of servers it takes into account. The node marked with “*” is the current time source. Nodes marked with “+” are used in the final set. Nodes marked with “-” have been discarded by the algorithm.

You can restart the NTP daemon with
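(on a Debian-style init system)

    sudo service ntp restart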

and watch it grab a different set of servers, select the best source, and gradually increase the period at which servers are contacted as the clock stabilizes.
Works like a charm.

Why not just install an NTP daemon on each node

If NTP works so great out of the box, why not simply install it on all boxes? In fact, this is exactly the advice you commonly get for cluster setup.

With respect to Cassandra, it’s the relative difference among clocks that matters, not their absolute accuracy. By default, NTP will sync against a set of random NTP servers on the Internet, which results in synchronization of absolute clocks. The relative difference of clocks in the C* cluster will therefore depend on how well the clocks are synchronized to absolute values pulled from several randomly chosen public servers.

Look at the (real) example output from the ntpq command, specifically the offset column. The difference among clocks is about 0.1ms to 0.5ms, but there is also an outlier with a 9ms difference. Synchronization to the millisecond is a reasonable requirement, which requires one to synchronize absolute clocks to within a ±0.5ms boundary.

How precise, in absolute values, are public NTP servers? We ran a quick check of 560 randomly chosen public NTP servers from the public pool. The statistics are:

  • 11% are within 0.5ms of the true time
  • 15% are between 0.5ms and 1ms off
  • 62% are between 1ms and 10ms off
  • 11% are between 10ms and 100ms off

There are also outliers, with one being off by multiple hours.

Assuming that (1) our checks are representative, (2) each NTP daemon picks 4 random NTP servers, and (3) each syncs to the second-best option (this is optimistic), these are the probabilities of our cluster clocks being off:

How to read it: assume a cluster of 25 nodes; then with a probability of 50% there will be two nodes with a clock difference of more than 18.8ms.

The results may be surprising – even in a small cluster of 10 nodes, clocks will be off by more than 10.9ms half of the time, and with a probability of 10% they will be off by more than 30ms.

“Synchronizing Clocks In a Cassandra Cluster Pt. 1 – The Problem” was created by Viliam Holub, CTO at Logentries.

Spark SQL against Cassandra Example

May 14, 2015

By 

Brian O’Neill, Chief Technology Officer at Health Market Science
Brian is Chief Technology Officer at Health Market Science (HMS) where he heads development of their data management and analytics platform, powered by Storm and Cassandra.  Brian won InfoWorld’s Technology Leadership award in 2013 and authored, Storm: Blueprints for Realtime Computation.  He has a number of patents and holds a B.S. in C.S. from Brown University.

Spark SQL is awesome.  It allows you to query any Resilient Distributed Dataset (RDD) using SQL (including data stored in Cassandra!).

First thing to do is to create a SQLContext from your SparkContext.  I’m using Java so…
(sorry — i’m still not hip enough for Scala)
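Something along these lines (Spark 1.3-style API; sc is assumed to be the JavaSparkContext already created for the job):

    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.sql.SQLContext;

    // build a SQLContext on top of the existing JavaSparkContext
    SQLContext sqlContext = new SQLContext(sc);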

Now you have a SQLContext, but you have no data.  Go ahead and create an RDD, just like you would in regular Spark:
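(the keyspace, table, and Product bean below are assumptions based on the quickstart project)

    import static com.datastax.spark.connector.japi.CassandraJavaUtil.javaFunctions;
    import static com.datastax.spark.connector.japi.CassandraJavaUtil.mapRowTo;
    import org.apache.spark.api.java.JavaRDD;

    // read the products table into a JavaRDD of Product beans
    JavaRDD<Product> products = javaFunctions(sc)
            .cassandraTable("java_api", "products", mapRowTo(Product.class));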

(The example above comes from the spark-on-cassandra-quickstart project, as described in my previous post.)

Now that we have a plain vanilla RDD,  we need to spice it up with a schema, and let the sqlContext know about it.  We can do that with the following lines:
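(this is the Spark 1.3-style API; older releases used an applySchema call instead)

    import org.apache.spark.sql.DataFrame;

    // infer a schema from the Product Java bean and register the result as a table
    DataFrame schemaProducts = sqlContext.createDataFrame(products, Product.class);
    schemaProducts.registerTempTable("products");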

Shazam.  Now your sqlContext is ready for querying.  Notice that it inferred the schema from the Java bean (Product.class).  (Next blog post, I’ll show how to do this dynamically.)

You can prime the pump with a count:
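(the explicit cacheTable call is an assumption, one way to make sure later queries hit memory)

    // cache the registered table, then run a count to force the data to load
    sqlContext.cacheTable("products");
    sqlContext.sql("SELECT COUNT(*) FROM products").collect();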

The count operation forces Spark to load the data into memory, which makes queries like the following lightning fast:
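(the column names on the Product bean are assumptions)

    import java.util.List;
    import org.apache.spark.sql.Row;

    // an ad-hoc SQL query against the in-memory products table
    List<Row> cheap = sqlContext.sql(
            "SELECT name, price FROM products WHERE price < 10").collectAsList();
    for (Row row : cheap) {
        System.out.println(row);
    }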

That’s it.  You’re off to the SQL races.

P.S.  If you try querying the sqlContext without applying a schema and/or without registering the RDD as a table, you may see something similar to this:

“Spark SQL Against Cassandra Example” was created by DataStax Cassandra MVP Brian O’Neill; to view more postings by Brian, check out his blog.

Stavros Kontopoulos Big Data Engineer at Pollfish
"Cassandra gave us an extendible write capacity, matching our use case and future needs. It is a perfect match for our time series based data. All in all, it is an easily managed technology for cloud deployments, fault-tolerance and with no downtime."
Stavros Kontopoulos Big Data Engineer at Pollfish

Pollfish is an online survey platform which permits real-time surveys targeting mobile app users.

This is accomplished through its Android/iOS SDK, which is installed in thousands of mobile apps, while the company closely cooperates with mobile app developers for a smooth integration. Currently Pollfish is developing its own analytics pipeline to deliver insights about mobile users. My role here is that of a data engineer. I am part of a small yet great team, comprising Mr. Nick Gavalas, the devops engineer who makes sure everything works fine around the clock, and Mr. Euaggelos Linardos, our data scientist who builds real-time data insights.

I am currently working on ETL and data management use cases using the DataStax Enterprise product, mostly utilizing Spark and Cassandra technologies.

To summarize, I am responsible for bringing the data scientist’s results to the production pipeline – pretty cool!

Survey data

Cassandra is our main database technology for storing and querying the raw data coming from the mobile apps where the Pollfish SDK is installed. It also stores system events. It is the raw data repository for the Pollfish application.

Being part of the free DataStax Startup Program, we also use the DataStax Enterprise extensions, such as the Cassandra File System (CFS) to store large files instead of a native HDFS implementation. By using Apache Spark on top of Cassandra (through the spark-cassandra connector), data processing capabilities are really unlimited. Any restrictions at the Cassandra level can always be removed at the Spark level, assuming the data schema is reasonably tuned. As a last step, which we took recently, we integrated Cassandra through DataStax Enterprise with Apache Solr, allowing us to do advanced search queries on top of processed data. This is more like an OLAP usage.

Time series; efficient writes; Spark and Hadoop

As a first need, we wanted an efficient storage technology for writing raw time series data coming from the user side. Cassandra, with its tunable consistency and efficient write path, was just a perfect match in the world of NoSQL databases. Another important aspect is that we wanted a tool that let us start fast and make progress, as a start-up has limited resources to support complex setups. Business, as always, matters the most, and Cassandra along with DataStax Enterprise served our goals.

It was mostly a use-case-driven evaluation process. After initial stress tests and model validation, we moved to this technology via DataStax Enterprise. DataStax provided not only the production-certified Cassandra database but also very easy integration with Apache Spark, our next targeted technology, and Hadoop FS compatibility through Cassandra FS. We considered other technologies like MongoDB, but Cassandra was a clear match for our data modeling process, so we moved in that direction.

Cassandra gave us an extendible write capacity, matching our use case and future needs. It is a perfect match for our time series based data. All in all, it is an easily managed technology for cloud deployments, fault-tolerance and with no downtime.

DataStax’s free startup program gave us use of the product without limitations, giving us the flexibility of no fees until the company grows, and with no limit on the number of nodes!

Azure deployment

Currently we use DataStax Enterprise version 4.6.2, which ships with Cassandra version 2.0.12.274 (cqlsh 4.1.1, CQL spec 3.1.1, Thrift protocol 19.39.0).

Our production cluster setup has been documented on Microsoft Azure’s blog and is as follows:

  • 4 nodes running in the Cassandra DC, serving our application needs. These nodes are Standard_A4 machines (8 cores, 14GB of RAM).
  • 3 nodes running in the Analytics DC, running Spark and Hadoop (CFS) analytics. These are Standard_D13 machines (8 cores, 56GB of RAM).
  • 1 node running in the Solr DC, with replication of one single keyspace. This last node is a new addition to our cluster and we’re still testing Solr capabilities. However, it’s deployed on a Standard_D4 VM (8 cores, 28GB of memory), and has its data directory pointing to the temporary SSD disk, again for testing purposes.
  • All the machines (except the Solr node) have a 15-disk RAID-0 configuration, in order to achieve maximum disk throughput.
  • All the machines run CentOS 6.5, on kernel 2.6.32-431.29.2.el6.x86_64.
Lessons learned

Firstly, isolate your workloads. If, for example, you are designing an analytics pipeline, don’t be tempted to mix the workloads. Issues will arise, such as failing nodes and/or instability of the cluster in general.

There are hundreds of configuration options to use, but you should start with the basics and tune on demand only, after careful thought. For example, you could start with the heap size a Cassandra node uses and whether that matches your use case. Follow best practices from the start and change one thing at a time.

Monitoring is crucial in order to verify your production environment settings; OpsCenter is a very useful tool, if not the only one, in that direction. Look out for full disks, exceeding the JVM old-generation heap size, frequent GC cycles, and read/write latencies.

Cassandra, like other products, is an evolving technology; having said that, you may hit some issues like current implementation limitations or bugs. As a common solution strategy, you should evaluate your use case and the roadmap for the features you need, and plan for upgrades or refactorings.

Another thing to consider is that if you integrate Cassandra and Spark in your Lambda architecture, you must design the schema for idempotent writes. Repeatability is a requirement in such a use case, e.g. due to Spark job failures or development bugs. While deleting data is an option, overwriting is best, exploiting Cassandra’s fast write path.

Do not optimize early. It is very tempting, with all the available settings, to start tuning. Refrain from doing that and make sensible changes using a measured, justifiable process (e.g. use jconsole, a Java profiler, etc.). Finally, just learn your tool and enjoy it!

Community

The Cassandra community is evolving along with the products and getting pretty mature. My interaction has been through the mailing lists, and the communication has been really helpful. I am looking forward to learning more; it is a very exciting technology!

Data Locality w/ Cassandra : How to scan the local token range of a table…

May 12, 2015

By 

Brian O’Neill, Chief Technology Officer at Health Market Science
Brian is Chief Technology Officer at Health Market Science (HMS) where he heads development of their data management and analytics platform, powered by Storm and Cassandra.  Brian won InfoWorld’s Technology Leadership award in 2013 and authored, Storm: Blueprints for Realtime Computation.  He has a number of patents and holds a B.S. in C.S. from Brown University.

I’m working on a mechanism that will allow HPCC to access data stored in Cassandra with data locality, leveraging the Java streaming capabilities from HPCC (more on this in a followup post). More specifically, we want to allow people to write functions in ECL that will execute on all nodes in an HPCC cluster, using collocated Cassandra instances as the source of their data.

To do this, however, we need a couple of things. If you remember, Cassandra data is spread across multiple nodes using a token ring. Each host is assigned one or more slices of that token ring. Each slice has a token range, with a start and an end. Each partition/row key hashes into a token, which determines which node gets that data. (Typically, Murmur3 is used as the hashing algorithm.)

Thus, to scan over the data that is local to a node, we need to determine the token ranges for our local node, then we need to page over the data in that range.

First, let’s look at how we determine the ranges for the local node. You could query the system tables directly (SELECT * FROM system.local), but if you are using a connection pool (via the Java driver), it is unclear which node will receive that query. You could also query the information from the system.peers table using your IP address in the where clause, but you may not want to configure that IP address on each node. Instead, I was able to lean on the CQL java-driver to determine the localhost:
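(a sketch that matches the cluster metadata against the machine’s own network interfaces; this is a reconstruction, not necessarily the original code)

    import java.net.NetworkInterface;
    import java.net.SocketException;
    import com.datastax.driver.core.Cluster;
    import com.datastax.driver.core.Host;
    import com.datastax.driver.core.Metadata;

    // walk the cluster metadata and keep the host whose address is bound
    // to one of this machine's network interfaces
    public static Host findLocalHost(Cluster cluster) throws SocketException {
        Metadata metadata = cluster.getMetadata();
        for (Host host : metadata.getAllHosts()) {
            if (NetworkInterface.getByInetAddress(host.getAddress()) != null) {
                return host;
            }
        }
        return null;  // no collocated Cassandra node found
    }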

With a Host in hand, the java-driver makes it easy to get the Token Ranges:
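(the keyspace name is hypothetical; getTokenRanges and unwrap() require a recent java-driver 2.0.x release)

    import java.util.HashSet;
    import java.util.Set;
    import com.datastax.driver.core.Metadata;
    import com.datastax.driver.core.TokenRange;

    // localHost is the Host found above
    Metadata metadata = cluster.getMetadata();
    Set<TokenRange> localRanges =
            unwrapTokenRanges(metadata.getTokenRanges("my_keyspace", localHost));

    // split any range that wraps around the end of the ring into two
    // CQL-friendly ranges (see the discussion below)
    private static Set<TokenRange> unwrapTokenRanges(Set<TokenRange> wrapped) {
        Set<TokenRange> result = new HashSet<TokenRange>();
        for (TokenRange range : wrapped) {
            result.addAll(range.unwrap());
        }
        return result;
    }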

The code is very straightforward, with the exception of the call to unwrapTokenRanges.  When you ask for the token ranges for a host, CQL will give you the tokens in ranges, but CQL does NOT handle wrapped ranges.  For example, let’s assume we had a global token space of [-16…16].  Our host may have token ranges of [-10…-3], [12…14] and [15…-2].  You can issue the following CQL queries (notice that token ranges are start-exclusive and end-inclusive):
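(the table my_table and its partition key column id are hypothetical)

    SELECT * FROM my_keyspace.my_table
     WHERE token(id) > -10 AND token(id) <= -3;

    SELECT * FROM my_keyspace.my_table
     WHERE token(id) > 12 AND token(id) <= 14;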

However, you CANNOT issue the following CQL:
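    -- this range wraps past the end of the token space, so it cannot be
    -- expressed as a single range query
    SELECT * FROM my_keyspace.my_table
     WHERE token(id) > 15 AND token(id) <= -2;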

That range wraps around.  To accommodate this, the java-driver provides a convenience method called unwrap().  You can then use that method to create a set of token ranges usable in CQL queries that account for the token range wrapping.

Finally, we need to be able to page over the data.   Fortunately, with the 2.0 release of the java-driver, we are able to do this in a few lines of code:
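(a sketch: the fetch size, table, and key names are assumptions; range is one of the unwrapped ranges from above, and session is an open driver Session)

    import com.datastax.driver.core.ResultSet;
    import com.datastax.driver.core.Row;
    import com.datastax.driver.core.SimpleStatement;
    import com.datastax.driver.core.Statement;

    // scan one unwrapped token range, letting the driver page transparently
    Statement scan = new SimpleStatement(
            "SELECT * FROM my_keyspace.my_table WHERE token(id) > ? AND token(id) <= ?",
            range.getStart().getValue(), range.getEnd().getValue());
    scan.setFetchSize(1000);   // rows per page

    ResultSet results = session.execute(scan);
    for (Row row : results) {  // iterating fetches additional pages as needed
        process(row);          // process(...) is a placeholder for your own logic
    }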

The above code issues a select statement and pages over the results, scanning the portion of the table specified within the token range.

If we throw a loop on top of all of this, to go through each token range and scan it, we have a means of executing a distributed processing job that uses only the local portions of the Cassandra tables as input.

Stay tuned for my next post, when I show how to plug this into HPCC.

This posting was originally created by Brian O’Neill; to read more postings by Brian, check out his blog here.
