Hitendra Pratap Singh, Cassandra Software Engineer at Recruiting.com
"We investigated several NoSQL solutions including Redis, MongoDB and Cassandra. We landed on Cassandra for it’s great track record of scalability, performance, reliability and availability of support, as well as it’s ease of integration with our API."
Hitendra Pratap Singh, Cassandra Software Engineer at Recruiting.com

Recruiting.com provides next generation software and technology to help organizations succeed by recruiting and hiring the right people. Our technology solutions enable employers to find and effectively recruit talent through our cloud-based candidate management software and network of leading local, diversity, and niche job boards, including Jobing.com.

My role at Recruiting.com is Software Engineer, working on Cassandra-based features from design and development through testing, support, and maintenance.

From SQL to Apache Cassandra

Recruiting.com is using Apache Cassandra to achieve real-time, high-throughput applications in our Candidate Relationship Manager. Some of the use cases include tracking millions of events per day, which are computed into client analytics and messaging systems to our clients.

We knew before we made our decision to go with Apache Cassandra that we needed a highly scalable solution to deliver real-time analytics and messaging to our clients. Our SQL server wasn’t keeping up with our growing tracking demands, and we found ourselves architecting around areas where SQL isn’t great. We investigated several NoSQL solutions including Redis, MongoDB and Cassandra. We landed on Apache Cassandra for its great track record of scalability, performance, reliability and availability of support, as well as its ease of integration with our API.

We have a 6-node cluster in our own data center and the Apache Cassandra version we are using is 1.0.9; we’re in the process of upgrading it to 1.2.10 and an 8-node cluster.

Monitoring Apache Cassandra with SPM

We started using SPM Performance Monitoring and Reporting from Sematext for Apache Solr and were impressed with the amount of real-time stats we could analyze using SPM. We expected the same level of detail for Cassandra as well and decided to go with SPM. Some of the benefits we’ve seen from SPM include the alert notification system, an easy-to-analyze graphical interface, detailed JVM stats, and the ability to create our own custom metrics.

We also utilize SPM for monitoring our deployments of Apache Solr and Memcached servers.

On the "overview" screen shown below, you can check out some Cassandra metrics as well as various OS metrics. You can drill down into specific Cassandra metrics by clicking one of the tabs along the left side; these metrics include Compactions, Bloom Filter (space used, false positives ratio), Write Requests (rate, count, latency), Pending Read Operations (read requests, read repair tasks, compactions), and more.

[Screenshot: SPM "overview" dashboard showing Cassandra and OS metrics]
Conclusion

The advice we have for new Apache Cassandra users is to pay extra attention to schema design, by imagining all possible ways that data will be queried. It is much more involved to change Cassandra schemas once you have live data.
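
As a purely hypothetical sketch of what designing for the query means (the table and columns below are invented for illustration, not Recruiting.com's actual schema), an event-tracking table can bake the expected read path into its primary key from the start:

    -- hypothetical: events will be read per client per day, newest first,
    -- so that access path is part of the primary key up front
    CREATE TABLE events_by_client_day (
      client_id   uuid,
      event_day   text,          -- e.g. '2015-01-26'
      event_time  timestamp,
      event_type  text,
      payload     text,
      PRIMARY KEY ((client_id, event_day), event_time)
    ) WITH CLUSTERING ORDER BY (event_time DESC);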

Additionally, you may be disappointed with Apache Cassandra if you think it’s a solution for all your database needs; but if you are looking to solve specific problems with scalability, reliability, performance, time series data, and data access speed (especially if you have a lot of writes), then Apache Cassandra is the perfect NoSQL database. On top of that, Cassandra has a great community.

Apache Cassandra and Low-Latency Applications

January 26, 2015

By 

Alexey Zotov, Senior Software Engineer at Grid Dynamics.
Alexey Zotov is a Senior Software Engineer at Grid Dynamics. He joined Grid Dynamics in August 2011. Since that time he has participated in several projects. Currently, his work is related to a custom BigData platform for analysis and data reporting.

 

Dmitry Yaraev, Senior Software Engineer at Grid Dynamics.
Dmitry brings over 10 years in software development to his role of a Senior Software Engineer at Grid Dynamics. He was previously a Software Engineer at several other companies where he found his way to BigData development. Currently, Dmitry is involved in development of scalable BigData solutions built on Hadoop/Spark/Cassandra.

 

Sergey Tryuber, Big Data Practice Lead at Grid Dynamics.
Sergey is a designer of massive data processing pipelines on top of in-stream/batch processing frameworks and NoSQL databases. He advocates the open source model and is a supporter of community cooperation. Sergey is currently a BigData Practice Lead at Grid Dynamics.

Introduction

Over the years, Grid Dynamics has had many projects related to NoSQL, particularly Apache Cassandra. In this post, we want to discuss a project which brought exciting challenges to us, and the questions we tried to answer in that project remain relevant today as well.

Digital marketing and online ads were popular in 2012, and demand for them has only increased. Real-time bidding (RTB) is an integral part of the domain. In RTB, an ad impression is placed (bought and sold) via a real-time auction of digital ads. If the bid is won, the buyer’s ad is instantly displayed on the publisher’s site. RTB requires a low-latency response from the server side (<100 ms), otherwise the bid is lost. One of our clients, a US media company, was interested in real-time bidding and user tracking (i.e. the analysis of website visitors’ behavior and their preferences).

Initially, the client’s infrastructure for processing RTB requests included installations of Kyoto Cabinet. In the image below (Picture 1), you can see the sources of RTB and third-party requests. All the requests were sent to real-time applications which performed lookup and update requests against the database. Kyoto Cabinet kept the whole dataset in memory, and custom add-ons provided functionality for retention management and persistence.


Picture 1. The previous architecture.

The architecture described above was good enough from a latency perspective, but it nevertheless had several disadvantages:

  1. Scalability. The architecture allowed only vertical scaling of the servers running Kyoto Cabinet. At that time, the servers were equipped with about 50GB of memory each. It was clear to everybody that increasing the amount of memory would not solve the problem in the long term.

  2. Robustness. Running only a single installation of Kyoto Cabinet meant that a failure could have very serious consequences.

  3. Cross-datacenter replication. The architecture did not have automatic synchronization between data centers. Manual synchronization was a real headache because it required a lot of additional operations.

Our task was to create a new architecture for the system which would not have the aforesaid drawbacks and, at the same time, would allow us to achieve good results in response latency. In other words, we were in need of a data store which would allow us to keep user profiles as well as to perform lookups and updates on them, and all the operations were to be performed within a certain time interval. The architecture was supposed to be built around such a data store.

 

Requirements

The new architecture was intended to solve all these problems. The requirements for the new architecture were as follows:

  • persistency (no data should be lost in case of power outage in one or both data centers)

  • high availability (there should be no single point of failure)

  • scalability (database volume should be relatively easy to increase by adding more nodes)

  • cross-datacenter replication (data should be synchronized between both data centers)

  • TTL for data (outdated user profiles should be automatically evicted)

  • data volume (about 1 billion homogeneous records with multiple attributes, where one record is ~400 bytes)

  • throughput (5000 random reads + 5000 random writes per second for each data center)

  • latency of responses (3ms on average, processing time should not exceed 10 ms for 99% of requests)

We also had some limitations related to the infrastructure. One of them was that we could install a maximum of eight servers per database in each data center. At the same time, we could select certain server hardware, such as the amount of memory, storage type, and size. One of the additional requirements from the client was to use a replication factor of TWO, which was acceptable due to the statistical nature of the data. This could reduce the hardware cost.

We examined several possible solutions that could meet our requirements and finally opted for Cassandra. The new architecture with Cassandra became a much more elegant solution. It was just a Cassandra cluster synchronized between two data centers. But a question about its hardware specifications still remained unanswered. Initially we had two options:

  • SSDs but less memory (less than the entire dataset)

  • HDDs and more memory (sufficient for keeping the whole dataset)

Actually, there was one more option which implied using hard drives and less memory, but this configuration did not provide a read latency acceptable for our requirements, as a random read from an HDD takes about 8 ms even for 10K RPM hard drives. As a result, it was rejected from the very beginning.

Thus, we had two configurations. After some tuning (the tuning itself will be discussed in the next section) they both satisfied our needs. Each of them had its own advantages and disadvantages. One of the main drawbacks of the SSD configuration was its cost. Enterprise-level SSDs were rather expensive at that time. Besides, some data center providers surcharged for maintaining servers with SSDs.

The approach with HDDs meant reading data from the disk cache. Most disadvantages of this configuration were related to the cache, for example, the problem of cold start. It was caused by the fact that the cache was cleaned after a system reboot. As a result, reading uncached data from HDD brought about additional timeouts. The timeouts, in fact, were requests which got no response within 10 ms. Besides, the disk cache could be accidentally cleaned as a result of copying a large amount of data from a Cassandra server while it was up. The last issue was related to memory size rather than to the cache: increasing the amount of data on a single node was quite difficult. It was possible to add an additional HDD or several HDDs, but the memory size of a single machine was limited and not very large.

Finally, we managed to resolve most of the aforesaid issues of the HDD configuration. The cold start problem was resolved by reading the data files with the cat utility and redirecting its output to /dev/null on startup. The issue related to disk cache cleaning went away after patching rsync, which was used for creating backups. But the problem with memory limitations remained and caused some trouble later.
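
A rough sketch of that cache-warming trick (the path below is an assumption based on a default Cassandra data directory; adjust it to your own layout):

    # stream every SSTable data file once so the OS page cache is warm before serving reads
    # (path is illustrative; default installs keep data under /var/lib/cassandra/data)
    cat /var/lib/cassandra/data/*/*/*-Data.db > /dev/null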

In the end, the client selected the HDD + RAM configuration. Each node was equipped with 96GB memory and 8 HDDs in RAID 5+0.

 

Tuning Cassandra

The version of Cassandra we started with was 1.1.4. Further on, in the process of development, we tried out different versions. Finally, we decided upon version 1.2.2, which was approved for production because it contained changes we had committed to the Cassandra repository. For example, we added an improvement which allowed us to specify the populate_io_cache_on_flush option (which populates the disk cache on memtable flush and compaction) individually for each column family.

We had to test both remaining configurations to select the preferable one. For our tests we used a Cassandra cluster that included 3 nodes with 64GB of memory and 8 cores each. We started the testing with write operations. During the test, we wrote data into Cassandra at a speed of 7000 writes per second. The speed was selected in proportion to the cluster size and the required throughput (doubled for writes in order to take cross-datacenter replication overhead into account). This methodology was applied to all tests. It is worth mentioning that we used the following settings (sketched in CQL after the list):

  • replication_factor=2

  • write_consistency_level=TWO

  • LeveledCompactionStrategy
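
A minimal CQL sketch of what those settings translate to, with hypothetical keyspace, table, and data center names (the write consistency level is a client-side setting rather than part of the schema):

    -- keyspace replicated to both data centers with the client's replication factor of 2
    CREATE KEYSPACE rtb WITH replication =
      { 'class': 'NetworkTopologyStrategy', 'DC1': 2, 'DC2': 2 };

    -- user-profile table compacted with LeveledCompactionStrategy
    CREATE TABLE rtb.user_profiles (
      user_id    text PRIMARY KEY,
      attributes map<text, text>
    ) WITH compaction = { 'class': 'LeveledCompactionStrategy' };

    -- write_consistency_level=TWO is set per request on the client, e.g. in cqlsh: CONSISTENCY TWO;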

 

LeveledCompactionStrategy (LCS) was used because the client’s workflow was expected to include a lot of update operations. Another reason for using LCS was that it decreases the overall dataset size and read latency. Test results were the same for both configurations:

  • Avg Latency: ~1ms

  • Timeouts: 0.01%

  • CPU usage: <5%

 

Both configurations satisfied our needs, though we did not spend time investigating the nature of the timeouts at this stage. Timeouts will be discussed later. Presumably, most of the response time was taken by the network transfer. Also, we tried to increase the number of write queries per second and it yielded good results. There was no noticeable performance degradation.

After that we moved to the next step, i.e. testing read operations. We used the same cluster. All read requests were sent with read_consistency_level=ONE. The read speed was set to 3500 queries per second. There were about 40GB of data on each server, with a single record size of about 400 bytes. Thus, the whole dataset fit in memory. Test results were as follows:


Table 1. Initial test results of read operations

Looking at the test results for both configurations, we found the timeout percentages unsatisfactory: they were 2-3 times the required value (2-3% against 1%). We were also concerned about the high CPU load (about 20%). At this point, we came to the conclusion that there was something wrong with our configurations.

It was not a trivial task to find the root of the problem related to timeouts. Eventually, we modified the source code of Cassandra and made it return a single fixed value for all read requests (skipping any lookups from SSTables, memtables, etc.). After that, the same test on read operations was executed again. The result was perfect: GC activity and CPU usage were significantly reduced and there were almost no timeouts detected. We reverted our changes and tried to find an optimal configuration for GC. Having experimented with its options, we settled upon the following configuration:

  • -XX:+UseParallelGC

  • -XX:+UseParallelOldGC

  • -XX:MaxTenuringThreshold=3

  • -Xmn1500M

  • -Xmx3500M

  • -Xms3500M

We managed to reduce the influence of GC on the performance of Cassandra. It is worth noting that the number of timeouts on read operations exceeded that on write operations because Cassandra created a lot of objects in the heap in the course of reading, which in turn caused intensive CPU usage. As for the latency, it was low enough and could be largely attributed to the time for data transfer. Executing the same test with more intensive reads showed that, in contrast to write operations, increasing the number of read operations significantly affected the number of timeouts. Presumably, this fact is related to the growing activity of GC.

It is a well-known fact that GC should be configured individually for each case. In this case, Concurrent Mark Sweep (CMS) was less effective than Parallel Old GC. It was also helpful to decrease the heap size to a relatively small value. The configuration described above is the one that suited our needs, though it might not have been the best one. Also, we tried different versions of Java. Java 1.7 gave us some performance improvement over Java 1.6. The relative number of timeouts decreased. Another thing we tried was enabling/disabling row/key caching in Cassandra. Disabling the caches slightly decreased GC activity.

The next option that produced surprising results was the number of threads in the pools which processed read/write requests in Cassandra. Increasing this value from 32 to 128 made a significant difference in performance, as our benchmark emulated multiple clients (up to 500 threads). Also, we tried out different versions of CentOS and various configurations of SELinux. After switching to the later 6.3 version, we found that Java futures returned control on timeout in a shorter period of time. Changes in the configuration of SELinux had no effect on performance.

As soon as read performance issues were resolved, we performed tests in the mixed mode (reads + writes). Here we observed a situation which is described in the chart below (Picture 2). After each flush to SSTable Cassandra started to read data from disks, which in its turn caused increased timeouts on the client side. This problem was relevant for the HDD+RAM configuration because reading from SSD did not result in additional timeouts.


Picture 2. Disk usage in the mixed mode (reads + writes) before improvements.

We tried to tinker around with Cassandra configuration options, namely, populate_io_cache_on_flush (which is described above). This option was turned off by default, meaning that filesystem cache was not populated with new SSTables. Therefore when the data from a new SSTable was accessed, it was read from HDD. Setting its value to true fixed the issue. The chart below (Picture 3) displays disk reads after the improvement.
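
As a hedged sketch of the per-column-family form mentioned above (the table name reuses the hypothetical one from the earlier sketch; the exact syntax may differ between versions in the 1.2/2.0 line):

    ALTER TABLE rtb.user_profiles WITH populate_io_cache_on_flush = true;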


Picture 3. Disk usage in the mixed mode (reads + writes) after improvements.

 

In other words, Cassandra stopped reading from disks after the whole dataset was cached in memory, even in the mixed mode. It’s noteworthy that the populate_io_cache_on_flush option is turned on by default in Cassandra starting from version 2.1, though it was excluded from the configuration file. The summary below (Table 2) describes the changes we tried and their impact.


Table 2. Changes to Cassandra and the system itself and their effect on latency.

 

Finally, after applying the changes described in this post, we achieved acceptable results for both the SSD and the HDD+RAM configurations. Much effort was also put into tuning the Cassandra client (we used Astyanax) to operate well with replication factor two and to reliably return control on time in case of a timeout. We would also like to share some details about operations automation and monitoring, as well as ensuring proper operation of the cross-data-center replication, but it is very difficult to cover all the aspects in a single post. As stated above, we went to production with the HDD+RAM configuration and it worked reliably with no surprises, including a Cassandra upgrade on the live cluster without downtime.

 

Conclusion

Cassandra was new to us when it was introduced into the project. We had to spend a lot of time exploring its features and configuration options. It allowed us to implement the required architecture and deliver the system on time, and at the same time we gained great experience. We carried out significant work integrating Cassandra into our workflow. All our changes to the Cassandra source code were contributed back to the community. Our digital marketing client benefited by having a more stable and scalable infrastructure with automated synchronization, reducing the amount of time they had to spend maintaining the systems.

 

About Grid Dynamics

Grid Dynamics is a leading provider of open, scalable, next-generation commerce technology solutions for Tier 1 retail. Grid Dynamics has in-depth expertise in commerce technologies and wide involvement in the open source community. Great companies, partnered with Grid Dynamics, gain a sustainable business advantage by implementing and managing solutions in the areas of omnichannel platforms, product search and personalization, and continuous delivery. To learn more about Grid Dynamics, find us at www.griddynamics.com or by following us on Twitter @GridDynamics.

KZ Win DevOps Engineer at Peloton
"We find that an appropriately configured Cassandra cluster is a scalable database backend. The ability to upgrade server software without causing any downtime in our application is a true luxury."
KZ Win DevOps Engineer at Peloton

Peloton is a fitness company. Peloton sells a high-end indoor stationary bike with an attached 22″ Android tablet, to which we stream live cycling classes from our flagship studio in New York City. I am a DevOps Engineer and my role is to scale and maintain our cloud infrastructure.

A quick and scalable ride

At Peloton we needed to store and retrieve packet data from active Peloton stationary bikes in real time.

My team members had knowledge of Cassandra from working on past projects and had a positive experience with the technology. We were aware of other technologies that compete with Cassandra but chose Cassandra because we needed to build our application in a short time and it made sense to go with a familiar technology.


Scale in luxury

We started with version 2.0.3 and recently upgraded successfully to 2.0.10. We deploy with Chef; we can easily add a node to our existing cluster using knife bootstrap. We are currently in one region of AWS.

Cassandra allows us to scale our real time data storage horizontally by throwing hardware at the problem.  We find that an appropriately configured Cassandra cluster is a scalable database backend. The ability to upgrade server software without causing any downtime in our application is a true luxury.

Words from the wise

There can never be enough monitoring and logging for Cassandra.

The replication factor should be high enough for your read/write consistency level. For example, if you are reading and writing at a quorum, the replication factor should be at least three if you want high availability. Ask yourself whether you really need quorum and whether you can get away with a consistency of one.
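
A small CQL/cqlsh sketch of that rule of thumb (the keyspace name is hypothetical): with a replication factor of 3, QUORUM needs 2 of 3 replicas, so reads and writes still succeed when one replica is down.

    -- replication factor 3 so QUORUM (2 of 3) tolerates the loss of one replica
    CREATE KEYSPACE ride_data WITH replication =
      { 'class': 'SimpleStrategy', 'replication_factor': 3 };

    -- in cqlsh, set the consistency level used for subsequent requests
    CONSISTENCY QUORUM;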


Learn more about Peloton’s use case at an upcoming New York Cassandra meetup

User-Defined-Functions presentation at Cassandra EU Summit 2014

January 20, 2015

By 

Robert Stupp, Apache Cassandra Committer.
Robert started his professional IT career in 1995 and specializes in high-volume backend data processing, with strong knowledge of coding and networks. He’s a contributor to the Apache Cassandra open source community and active on Cassandra’s 3.0 feature, “user defined functions.” Besides that, he has conducted Cassandra presentations and workshops as part of his previous projects.

Back in 2014 I took a stab at implementing the Apache Cassandra 3.0 feature User-Defined-Functions – it has developed nicely over the last months.

We have added a lot more functionality, made things clearer and so on. As I mentioned in the presentation: this is stuff that changes – even shortly before release. When Cassandra 3.0 is released you can expect another blog article about UDFs that covers the whole topic.

UDF – Current Status

So far it supports execution of small pieces of user code in Java (or a scripting language like JavaScript) and aggregation of data using your own aggregation functions.
But – UDF is not meant for doing something like map-reduce or other expensive analysis work. Keep in mind that UDFs are executed on the coordinator node (the node in your C* cluster that received your query).
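
As a rough sketch of what a Java UDF and a user-defined aggregate look like in CQL (the function and aggregate names are made up, and since this was written before the final release, the exact syntax may differ from what pre-release builds accepted):

    -- a state function written in Java; null values are simply skipped
    CREATE OR REPLACE FUNCTION state_sum (state int, val int)
      CALLED ON NULL INPUT
      RETURNS int
      LANGUAGE java
      AS 'if (val == null) return state; return state + val;';

    -- a user-defined aggregate built on top of that state function
    CREATE OR REPLACE AGGREGATE my_sum (int)
      SFUNC state_sum
      STYPE int
      INITCOND 0;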

There are thoughts about moving UDF processing to the nodes owning the partitions if a clear win in terms of, e.g., execution time is certain. But please do not expect that for 3.0. It’s just an idea right now.

UDF – Red flag

I want to put a big red flag (“don’t do it”) on using UDFs with scripting languages. Although it is a nice feature, it is also an expensive one. Why?

Java source UDFs are directly compiled to bytecode – Java UDFs can be invoked immediately without any indirection – without reflection or invokedynamic. They have nearly zero invocation latency and can be optimized by HotSpot.

For scripted UDFs this is not true! These are invoked via the scripting language’s SPI implementation, which may have to convert types, look up functions, possibly just interpret the functions, and has to go back into Java if you work with collections, tuples or UDTs. For example: invoking a JavaScript UDF takes approx. 1000 times longer than invoking a Java UDF.
So – IMO scripted UDFs are nice for quick prototyping – but should be replaced with Java UDFs in production code.

Cassandra 3.0

It is fairly certain that these features will find their way into the Apache Cassandra 3.0 release:

  • Core User-Defined-Functions
  • User-Defined-Aggregates
  • Functional indexes
  • Permissions for functions
  • Support for this in cqlsh

Presentation

Thanks to all those people who gave me a lot of useful tips. I’d like to sum them up:

  • Prepare your presentation – take your time. I prepared this presentation months before the Summit. For this one, I also asked others to do a review.
  • Don’t put too much onto the slides – focus on the “big thing”
  • Train your skills at local meetups
  • If in doubt: less is more. Don’t go too much into detail. You can always explain specifics that people are interested in later, during Q&A or in a chat. A longer Q&A part is better than overrunning the (fixed) time slot.
  • Use a real presentation remote :)
  • One of the best tips IMO: start your presentation with a loud welcome.

During other presentations and trainings I’ve used my iPhone as a remote for Keynote – it worked nicely. But at the Summit, the iPhone kept changing its orientation. So when I wanted to go forward, it went backwards. Not funny…

Lesson learned: use a “real” presentation remote – you just need one big button to advance slides. :)

Robert Stupp
Committer to Apache Cassandra


“User-Defined-Functions presentation at Cassandra EU Summit 2014” was created by Robert Stupp, Committer to Apache Cassandra.

Kindling: An Introduction to Spark with Cassandra (Part 1)

January 20, 2015

By 

Erich Ess, CTO at SimpleRelevance.
Erich is the CTO of SimpleRelevance, a company which does dynamic content personalization using all the tools of data science. Before joining SimpleRelevance, Erich spent many years working on scalable distributed architectures. In college, he focused on mathematics and computer graphics. To this day, he still loves studying and talking about mathematics and is currently having a lot of fun with category theory and the functional languages F# and Clojure.

This is an introduction to the (relatively) new distributed compute platform Apache Spark. The focus will be on how to get up and running with Spark and Cassandra, with a small example of what can be done with Spark. I chose to make this the focus for one reason: when I was trying to learn Spark two months ago, I had difficulty finding articles on how to set up Spark to use Cassandra. The process is actually not that difficult, but pulling all the steps together required some searching and investigation. The secondary focus is to give you a small taste of what can be done with Spark, so that once you have the Spark cluster up and running there is motivation to keep exploring and experimenting.

This is based on a talk I gave at the Chicago Cassandra meetup in late October. That talk in turn was done to give me a fixed goal for learning how to use Spark. This blog post is the reverse of that: it took quite a bit of work in my free time to get Spark working with Cassandra, so here I hope to lay out the minimum steps needed to get a Spark and Cassandra cluster up and running.

What Is Spark?

Spark is a recent Hadoop successor, and it looks to have taken a lot of lessons from how the Hadoop API was designed. Spark works with a master/slave configuration, where there is a lightweight “Master” service which acts as an interface to the cluster; it keeps track of the state of each node and wrangles nodes when jobs are submitted. Spark differs from Hadoop in several ways: it supports both batch and stream processing, multiple programming languages out of the box (Scala, Java, and Python), in-memory computations, an interactive shell, and a significantly easier-to-use API. The last three are, in my opinion, the most exciting aspects of Spark.

With in-memory computations, you can tell Spark to cache data into RAM as it is pulled out of data stores (HDFS, SQL, Cassandra, etc.). After that, any computations performed on that data are done in memory without any expensive queries; this makes analytics much, much faster than Hadoop. If you go to http://spark.apache.org, the little graph on the landing page showing the performance advantage over Hadoop is the result of Spark’s in-memory computations.
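
As a small sketch of what that looks like in practice (using the Cassandra connector call introduced later in this post; the keyspace and table names are made up), only the first action after cache() touches the data store:

    // assumes a SparkContext `sc` already connected to Cassandra via the connector
    val events = sc.cassandraTable("analytics", "events")  // hypothetical keyspace and table
    events.cache()       // ask Spark to keep these rows in memory once they have been read
    events.count()       // first action: reads from Cassandra and fills the cache
    events.count()       // second action: served from memory, no trip back to Cassandra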

The interactive shell is used to connect to a Spark cluster and do interactive computations on your data.  This makes experimenting, poking around, and general hacking much easier.  Having a Spark Cluster also allows a developer or data scientist to quickly test out code and verify that it works without having to go through a slow deployment process.

Spark’s API design is what truly differentiates it from Hadoop. The focus is on making interacting with distributed data incredibly elegant, with as little boilerplate and configuration code as possible. This design is founded upon the Resilient Distributed Dataset (referred to as an RDD), which abstracts out the data (that is spread across many machines) into a single enumerable data structure. You program against this data structure exactly as if it were the familiar Lists, Vectors, or Arrays that are built into modern languages. The RDD makes reasoning about distributed programs straightforward and natural.

The RDD truly shines in how you use it in code. A functional language paradigm is used for transforming and acting upon data, which will be familiar to anyone who has used .Net’s LINQ framework or Java 8’s Streams. To work with an RDD, you simply chain together a series of transformations (such as filters or maps or sorts) and then use an action to do something with that data (such as grab the first 10 elements). There are two types of functions: transformations and actions.

Transformations change the state of the data in some way; they may filter the data, or double the value of an integer, or sort the data, or group the data by a key. It’s important to understand that transformations are lazily evaluated: this means that no work is done until an Action function is called which asks Spark to produce a result. Lazy evaluation also means that Spark will do the bare minimum amount of work to produce the desired result.

Actions are functions which actually pull a value or values from the RDD; this could be asking for the first element, or the first ten elements, or the number of elements in the RDD, or iterating through each element of the RDD.  Many things can be accomplished in Spark simply by chaining a series of transformations together and then iterating with a foreach.  As mentioned above, the Action is what causes Spark to go out and perform a computation.
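
A tiny sketch of that pattern, assuming a running SparkContext named sc (as in the Spark shell):

    val numbers = sc.parallelize(1 to 100)      // distribute a local range as an RDD
    val evens   = numbers.filter(_ % 2 == 0)    // transformation: lazy, nothing runs yet
    val doubled = evens.map(_ * 2)              // another lazy transformation
    doubled.take(10).foreach(println)           // action: triggers the actual computation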

How to Install Spark with Cassandra

The following steps describe how to setup a server with both a Spark node and a Cassandra node.

Note: For the purposes of this article, Spark and Cassandra will both be running on localhost.

There are two paths for setting up a Spark & Cassandra server: if you have DataStax Enterprise, you can simply install an Analytics Node and check off the box for Spark; if you are using the open source version, you will need to follow these steps. The second path is a little tricky, and getting the Spark Shell to work with Cassandra required the assistance of the very kind Cassandra community on StackOverflow.

Setting Up Open Source Spark

This assumes you already have Cassandra setup.

  1. Download and setup Spark
    1. Go to http://spark.apache.org/downloads.html.
    2. To make things simple, we will use one of the prebuilt Spark packages.
    3. Choose Spark version 1.2.0 and “Pre-built for Hadoop 2.4” then Direct Download.  This will download an archive with the built binaries for Spark.
    4. Extract this to a directory of your choosing.  I will put mine in ~/apps/spark-1.2
    5. Test Spark is working by opening the Shell
  2. Test that Spark Works
    1. cd into the Spark directory
    2. Run “./bin/spark-shell”.  This will open up the Spark interactive shell program
    3. If everything worked it should display this prompt: “scala>”
    4. Run a simple calculation:
      sc.parallelize(1 to 50).reduce(_ + _)
      which should output 1275.
    5. Congratulations Spark is working!
    6. Exit the Spark shell with the command “exit”

The Spark Cassandra Connector

To connect Spark to a Cassandra cluster, the Cassandra Connector will need to be added to the Spark project.  DataStax provides their own Cassandra Connector on GitHub and we will use that.

  1. Clone the Spark Cassandra Connector repository: https://github.com/datastax/spark-cassandra-connector
  2. cd into “spark-cassandra-connector”
  3. Build the Spark Cassandra Connector
    1. Execute the command “./sbt/sbt assembly”
    2. This should output compiled jar files to the directory named “target”.  There will be two jar files, one for Scala and one for Java.
    3. The jar we are interested in is “spark-cassandra-connector-assembly-1.1.1-SNAPSHOT.jar”, the one for Scala.
  4. Move the jar file into an easy to find directory:  I put mine into ~/apps/spark-1.2/jars

To load the connector into the Spark Shell:

  1. Start the shell with this command:  ../bin/spark-shell --jars ~/apps/spark-1.2/jars/spark-cassandra-connector-assembly-1.1.1-SNAPSHOT.jar
  2. Connect the Spark Context to the Cassandra cluster:
    1. Stop the default context:
      sc.stop
    2. Import the necessary jar files:
      import com.datastax.spark.connector._, org.apache.spark.SparkContext, org.apache.spark.SparkContext._, org.apache.spark.SparkConf
    3. Make a new SparkConf with the Cassandra connection details:
      val conf = new SparkConf(true).set("spark.cassandra.connection.host", "localhost")
    4. Create a new Spark Context:
      val sc = new SparkContext(conf)
  3. You now have a new SparkContext which is connected to your Cassandra cluster.

 

Test that the Spark Connector is working from the Shell

  1. Create a keyspace called “test_spark” in Cassandra
  2. Create the table: CREATE TABLE test_spark.test (value int PRIMARY KEY);
  3. Insert some data (INSERT INTO test_spark (value) VALUES (1);)
  4. From the Spark Shell run the following commands:
    1. val test_spark_rdd = sc.cassandraTable("test_spark", "test")
    2. test_spark_rdd.first
    3. The output should be:
      res1: com.datastax.spark.connector.CassandraRow = CassandraRow{value: 1}

Opening the Spark Shell

(A large number of thanks to the people who helped me: http://stackoverflow.com/questions/25837436/how-to-load-spark-cassandra-connector-in-the-shell )

 

Interacting with Cassandra from Spark

To read data from Cassandra, you create an RDD from a specific table.  This is very simple:
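
    // a minimal sketch; the keyspace and table names follow the movies example below
    val rdd = sc.cassandraTable("spark_demo", "movies")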

This will return an RDD of CassandraRow objects, where CassandraRow is a data type which stores a single row from a table as a map from column name to value.

To make interacting with the data from Cassandra more elegant, it is possible to map the rows directly to a custom type which represents an atomic piece of data from the table. One way to do this is to create a case class which represents the data from the table. For example, if there were a table of movies with these columns: (id int, title text, genres text), then a corresponding case class would be:
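
    // a sketch matching the (id int, title text, genres text) columns
    case class Movie(id: Int, title: String, genres: String)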

Then to map the Cassandra table directly to that type, you simply call ‘cassandraTable’ and specify the type:
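
    // a sketch, assuming the movies table lives in the spark_demo keyspace
    val movies = sc.cassandraTable[Movie]("spark_demo", "movies")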

Writing data to Cassandra is just as simple: on an RDD, call the function saveToCassandra and specify the keyspace and the table. Make sure that the type of the RDD maps to the Cassandra table.
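
    // a sketch, reusing the typed Movie RDD from above
    movies.saveToCassandra("spark_demo", "movies")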

This will save the RDD of type Movie to the movies table in the keyspace spark_demo.

We have gotten Spark set up and running with Cassandra, and shown the basics of interacting with Cassandra from Spark, and this marks the end of the first part of my article. This may seem like an abrupt end, but do not worry; the focus of this post was explaining how to get Spark up and running with Cassandra. In two weeks, I will be writing another post where we take our Spark+Cassandra stack and start doing cool things with it.
