Target Delivers Digital Shopping Experiences with Apache Cassandra

March 30, 2015


As consumers continue moving towards digital shopping channels to make purchases, online and mobile shopping have become a core component of any retail business strategy. And rightfully so: the eCommerce market is exploding, with over 40 percent of worldwide internet users having purchased products or goods online, contributing to a growing industry that amounted to more than 1.2 trillion US dollars in 2013.

It doesn’t come as a shock when traditional retail giants realize the potential of eCommerce and start to build out new offerings. That’s the case with Target Corporation, the second-largest discount retailer in the United States, with over 1,900 stores across the U.S. and Canada. Target saw the massive potential, which has spawned a plethora of competing retailer websites all vying for a piece of the pie, and quickly realized the need to embrace cutting-edge technology to drive new customer acquisition and competitive differentiation.

With sights on its website and mobile applications, Target’s API team was tasked with building out their existing API platform to provide an engaging digital experience across a wide array of platforms, from mobile devices to third-party applications. Cutting-edge features for the website and mobile applications would create a more engaging experience for millions of Target customers and increase Target’s brand awareness.

When asked if they considered relational database systems to support their API platform, Target’s API team quickly pointed out that all their systems are required to support multiple data centers and ensure continuous availability.  After some research and speaking with domain experts, Target selected Apache Cassandra for its ability to handle large volumes of simple transactions, masterless architecture, and support for multi-data center replication.

“Based on our requirements, Apache Cassandra was exactly what we needed. Apache Cassandra scales, can ingest huge amounts of data, can replicate across multiple data centers, and ensures continuous uptime,” explained Dan Cundiff, Lead Architect at Target. “We listened to the community and experts and realized that we also wanted access to DataStax’s expert support and services, as well as enterprise functionality such as security, analytics and search.”

Target relies on Cassandra to deliver engaging online and mobile customer experiences

Apache Cassandra has had a significant impact on Target’s API business by reducing time to market and increasing agility, both of which are key to delivering engaging online and mobile experiences in an extremely competitive retail market. In addition, because Apache Cassandra is easy to develop against, Target engineers are now able to say “yes” to innovative new features for the APIs that support their high-transaction mobile applications, which previously were a challenge.

What’s Next for Target?

Target is an example of an online retailer that is always reinventing itself. They realized that in order to maintain their edge in the market, they need to take the next step in becoming an Internet Enterprise. As a result, their infrastructure services team continues to focus on improving the online and mobile shopping experience. And they rely on Cassandra every day to achieve these results.

“Our time to market for new features has been reduced dramatically.  For some perspective of how easy Cassandra is to get up and running, it took us only three months from choosing Cassandra to going live into production,” said Cundiff.  “We were amazed at how fast we were able to go live with such a critical technology.”

And with great success come plans for expansion, as Target continues to grow its current setup of 14 nodes across 2 data centers that support its current APIs. “With Apache Cassandra as our backbone, we have the confidence to deliver amazing features very quickly, and that’s what will continue to separate our customer experience from the rest of the pack.”




Devin Ben-Hur, Senior Architect & Paul Kohan, Software Engineer at Whitepages

Whitepages is the largest and most authoritative source of contact information about people and businesses in the United States. We have current plans to rapidly expand globally as well. 

We have a free website that gets about 50-60 million new unique visitors a month. We also have a number of pretty popular mobile applications that do caller ID and search, and a B2B offering called Whitepages Pro where we sell data directly to other companies.

Our data set has information on about 300 million people in North America and pretty much every landline. Mobile is increasingly hard to cover, so that’s a big challenge for us, but we are acquiring numbers at about 50-60 million a year and trying to ramp that up by an order of magnitude this year.

Database evolution

This data set is driven by every search that’s done at Whitepages and often involves the retrieval of a hundred or a few hundred business objects just to return one search page. We are currently using a Postgres instance to hold most of our materialized people data. Scaling that has been expensive. It’s  just a big Postgres box with a lot of memory on it.

We actually put the entire database in-memory. It’s really just expensive big boxes which is a big reason why we looked at Titan and Cassandra going forward. As your hardware expands quickly Postgres just wasn’t going to cut it anymore.

PostgreSQL to Cassandra

We’ll still have Postgres around at Whitepages for use cases where scaling isn’t as important.  For our actual people data, what you think of as Whitepages data, that’s all going into Titan at this point and, in some ways, Solr, too. 

Historically the way we have built our data has been through a large batch based process where we get a digest of huge feeds of data from other people on a monthly basis, sometimes quarterly basis. Somebody has to go cook that through a bunch of large MapReduce jobs and basically publish a new view of the world. 

We’ve had a desire to migrate because we increasingly have a lot of touch points where we’re learning new information about the world on a continuous basis and gathering those events. We’ve been in the process of building a continuous reasoning pipeline around that data with the desire that if I observe a new fact about the world, like this name and this phone number and this address are associated in a certain way, I can then calculate the confidence that that is something we deem as true and if it’s above a certain threshold, publish it to our search engine.

This would let us be able to do that in essentially real-time, or at least with very low latency when we learn real information and it becomes searchable on the website. To do that, we have to be able to actually scale writes to that main store. Postgres was this large in-memory database and it would be even more expensive to scale that out on a write basis. There are all sorts of ways to do it but a lot of those ways involve building surrogate systems that are embedded in some inherently distributed storage, such as Cassandra.

The other motivation was just data modeling and data representation; that’s where Titan comes in. We had a lot of problems trying to be agile in the type of data that we return – how many people, how they link to locations and phones.

We’ve been saying for a long time that our data is inherently graph-like so we finally put our money where our mouths were and decided to actually invest in making a graph database work.  It’s really been helpful in designing the shapes of data we return to our customers. We can iterate much faster now.


We had a goal to have very low latency so this cluster was sized with enough memory across the cluster to keep everything cached and in-memory. We have on the order of 20+ decent sized, SSD boxes, all in-memory with every instance running both Cassandra and Rexster, an HTTP endpoint for Titan.

We evenly distribute queries across the entire cluster. We actually do use some token awareness on it so we aren’t just randomly going everywhere. We are already getting near capacity on those servers. We might need to scale up soon. But that’s just Big Data.

To the future

There are still some challenges, but we think Cassandra and Titan together are a tremendous enabling technology for us to rapidly grow our data set and increase the velocity with which we use, move and publish data.

Multi-data center is on the roadmap for the future; part of that desire comes from our global expansion, giving the end user a better experience by letting us move data closer to them. In addition, we would have the resiliency of not being subject to some kind of outage that affects a single data center or region. So whether that happens this year, or next year, who knows, but it will happen.


Cassandra Migration To EC2

March 13, 2015


In January we migrated our entire infrastructure from dedicated servers in Germany to EC2 in the US. The migration included a wide variety of components: web workers, background task workers, RabbitMQ, PostgreSQL, Redis, Memcached and our Cassandra cluster. Our main requirement was to execute this migration without downtime.

This article covers the migration of our Cassandra cluster. If you’ve never run a Cassandra migration before, you’ll be surprised to see how easy this is. We were able to migrate Cassandra with zero downtime using its awesome multi-data center support. Cassandra allows you to distribute your data in such a way that a complete set of data is guaranteed to be placed on every logical group of nodes (e.g., nodes that are in the same data center, rack, or EC2 region). This feature is a perfect fit for migrating data from one data center to another. Let’s start by introducing the basics of a Cassandra multi-datacenter deployment.

Cassandra, Snitches And Replication Strategies

Going Multi Datacenter

In most scenarios, Cassandra comes configured with data-center awareness turned off. Cassandra multi-datacenter deployments require an appropriate configuration of the data replication strategy (per keyspace) and the snitch. By default Cassandra uses the SimpleSnitch and the SimpleStrategy replication strategy. The following sections explain what these terms actually mean.

The Snitch

Snitches are used by Cassandra to determine the topology of the network. The snitch also makes Cassandra aware of the datacenter and the rack it’s in. The default snitch (SimpleSnitch) gives no information about the data center and the rack where Cassandra resides. This snitch only works for single data center deployments. The Ec2MultiRegionSnitch is required for deployments on EC2 that span different regions. This snitch maps the region name to the datacenter and the availability zone to the name of the rack. On top of this, it also makes sure that nodes will use private and public IPs correctly.

Replication Strategies

Replication strategies determine how data is replicated across the nodes. The default replication strategy is the SimpleStrategy. Using the SimpleStrategy, replicas are placed on consecutive nodes of the ring without taking data centers or racks into account. This doesn’t work for multi data center deployments due to latency constraints. For multi dc deployments the right strategy to use is the NetworkTopologyStrategy. This strategy allows you to specify how many replicas you want in each data center, and it spreads them across racks where possible. For example, a keyspace can define a replication policy where the data is replicated on 3 nodes for the datacenter ‘dc1’ and on 2 nodes for the datacenter ‘dc2’.
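Such a policy is set per keyspace with a CQL statement. As a minimal sketch, the Java snippet below only assembles that statement as a string from a map of replica counts. The class, the method and the keyspace name my_keyspace are hypothetical, and the data center names have to match the ones reported by the snitch.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

// Sketch: renders a CQL statement that switches a keyspace to
// NetworkTopologyStrategy. "my_keyspace" is a hypothetical name.
final class ReplicationCql {

    // Builds ALTER KEYSPACE ... WITH replication = {...} from per-DC replica counts.
    static String alterKeyspace(String keyspace, Map<String, Integer> replicasPerDc) {
        StringJoiner options = new StringJoiner(", ", "{", "}");
        options.add("'class': 'NetworkTopologyStrategy'");
        for (Map.Entry<String, Integer> dc : replicasPerDc.entrySet()) {
            options.add("'" + dc.getKey() + "': " + dc.getValue());
        }
        return "ALTER KEYSPACE " + keyspace + " WITH replication = " + options + ";";
    }

    public static void main(String[] args) {
        Map<String, Integer> replicas = new LinkedHashMap<>(); // keeps insertion order
        replicas.put("dc1", 3); // 3 replicas in datacenter dc1
        replicas.put("dc2", 2); // 2 replicas in datacenter dc2
        System.out.println(alterKeyspace("my_keyspace", replicas));
    }
}
```

The printed statement can be run from cqlsh. The same statement shape applies later in the migration when a data center is added to or removed from a keyspace.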

Changing Snitch And Replication Strategy

As you can see, the snitch and replication strategy work closely together. The snitch is used to group nodes per datacenter and rack. The replication strategy defines how data should be replicated amongst these groups. If you run with the default snitch and strategy, you will need to change these two settings in order to get to a functional multi datacenter deployment.

Changing the snitch alters the topology of your Cassandra network. After making this change you need to run a full repair. If you do not fully understand the consequences of changing these two settings, you should really take some time to read more about it. Making the wrong changes can lead to serious issues. In our case, we started a clone of our production Cassandra cluster from snapshots and tested every single step to make sure we got the whole procedure right.

The 10 Migration Steps

The following list describes our migration in detail. You should make sure you understand these steps before trying to run your own migration. The steps below assume that you’re moving from a non EC2 data center to EC2. If you understand the steps you can use a similar approach for other scenarios.

Phase 1 – Cassandra Multi DC Support

Step 1 – Configure the PropertyFileSnitch

The first step is to change from a SimpleSnitch to a PropertyFileSnitch. The PropertyFileSnitch reads a property file to determine which data center and rack it’s in.

After changing this setting you should run a Cassandra rolling restart. Make sure that you have the same property file on every node of the cluster. The file assigns every node to a data center and rack, and for this migration it describes two datacenters.
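As an illustration of the file layout, each line of the property file maps a node address to a data center and rack in the form address=datacenter:rack, with an optional default entry. The sketch below embeds a made-up two-datacenter sample and parses it with java.util.Properties purely to show how the entries are read. Every address and name in it is a placeholder, not the file used in this migration.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Sketch: the "<node IP>=<data center>:<rack>" layout of a snitch property
// file. All IP addresses and DC/rack names below are made-up placeholders.
final class TopologyFileExample {

    static final String SAMPLE =
            "# nodes in the existing data center\n"
          + "10.0.0.1=DC1:RAC1\n"
          + "10.0.0.2=DC1:RAC1\n"
          + "# nodes in the new data center on EC2\n"
          + "54.0.0.1=us-east:1c\n"
          + "# fallback for nodes that are not listed\n"
          + "default=DC1:RAC1\n";

    // Parses the property text into key/value pairs.
    static Properties parse(String text) {
        Properties topology = new Properties();
        try {
            topology.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return topology;
    }

    // Returns {dataCenter, rack} for a node, falling back to the "default" entry.
    static String[] lookup(Properties topology, String ip) {
        String value = topology.getProperty(ip, topology.getProperty("default"));
        return value.split(":");
    }

    public static void main(String[] args) {
        Properties topology = parse(SAMPLE);
        for (String ip : new String[] {"10.0.0.1", "54.0.0.1", "10.9.9.9"}) {
            String[] dcRack = lookup(topology, ip);
            System.out.println(ip + " -> data center " + dcRack[0] + ", rack " + dcRack[1]);
        }
    }
}
```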

Step 2 – Update the replication strategy

Next, update your keyspaces to use the NetworkTopologyStrategy.

Step 3 – Client connection setting

Update your clients’ connection policy to DCAwareRoundRobinPolicy and set the local data center to ‘DC1’. This ensures your client will only read/write from the local data center and not from the EC2 cluster we’re going to create in the next step.

These three steps don’t have an impact on how replicas are placed in DC1 or how your clients connect to your cluster. The purpose of these three steps is to make sure we can add the second Cassandra data center safely.

Phase 2 – Setup Cassandra On EC2

Step 4 – Start the nodes

The next step is to start your cluster on EC2. DataStax provides a great AMI with easy instructions to get up and running quickly.

Step 5 – Stop the EC2 nodes and cleanup

By default your Cassandra instances will be configured as a new cluster. As we want to join an existing cluster we have to stop Cassandra and drop the data dirs from your new Cassandra nodes on EC2.

After that you have to adjust the following 4 Cassandra settings on EC2:

  1. broadcast_address: <public_ip>
  2. listen_address: <private_ip>
  3. endpoint_snitch: Ec2MultiRegionSnitch
  4. auto_bootstrap: false

As you can see, we are using a different snitch on the EC2 cluster than on the previous cluster. Ec2MultiRegionSnitch is a special snitch that infers the data center from the EC2 region and the rack name from the availability zone (e.g., a node running in us-east-1c would have datacenter us-east and rack 1c).

Step 6 – Start the nodes

Start your Cassandra nodes on EC2 and wait for all nodes to be shown in the nodetool status.

Step 7 – Place data replicas in the cluster on EC2

Update your keyspace to replicate to the new datacenter, then run nodetool rebuild on every EC2 node with the name of the original DC as parameter.

Depending on the amount of data in your keyspace, this will take minutes or hours. Once this is done, you should promote at least one node in the EC2 cluster to a seed node.

Phase 3 – Decommission The Old DC And Cleanup

Step 8 – Decommission the seed node(s)

Remove the IPs of nodes in the seed list that are in DC1 and run a rolling restart.

Step 9 – Update your client settings

Update your clients on EC2 to connect to the new data center. Update your clients’ connection policy to DCAwareRoundRobinPolicy and set the local data center to us-east. This makes sure your client will only read/write from the EC2 data center.

Step 10 – Decommission the old data center

Now you can decommission the DC1 datacenter following the steps from the DataStax documentation.

Make sure that you update your keyspace to stop replicating to the old datacenter after you run the full repair.

This 10-step procedure seems long. But if you understand the basic concepts, it’s pretty straightforward. The best part is that your Cassandra cluster stays up and fast during the entire procedure.

Alternative Migration Paths

1.) Dump And Load The Data

One alternative is to simply dump the data and load it up in the new Cassandra cluster. Unfortunately restoring node snapshots of a distributed database is not as simple as it would be with a traditional database. Another downside of this approach is that the cluster becomes temporarily unavailable. Stream already had production users, so we could not afford to take down the app.

2.) Rotate The Nodes

Another approach is to simply add new nodes on EC2 and then decommission the old nodes. Adding and decommissioning nodes is a very simple operation. The problem with this option is that there often is a high latency between data centers. Running your migration by adding and removing nodes will increase the read and write latency. Depending on your requirements this may or may not be a viable alternative.


Migrating a Cassandra cluster across data centers takes a bit of time, but it works amazingly well. It’s possible to move from one datacenter to another without any downtime.

About Stream

Stream is a web service for building scalable newsfeeds and activity streams. Be sure to check out our tutorial, it explains our API in a few clicks.

“Cassandra Migration To EC2” was originally posted on High Scalability; to read more postings, visit the High Scalability Blog.

Accessing Cassandra in a Reactive Way

March 13, 2015


Gregor Roth, Software Architect at United Internet
Gregor is a software architect at United Internet, a leading European internet specialist. He has implemented Cassandra to scale essential parts of their mail systems WEB.DE, GMX and 1&1. Gregor spent many years working on highly scalable and resilient web-based architectures. He is a main developer of Troilus.

Stefan Podkowinski, Software Architect & Cassandra Developer at United Internet
Stefan is a software architect and Cassandra developer at United Internet, a leading European internet specialist. After working with several NoSQL datastores in the past, he now enjoys building distributed systems on top of Cassandra and is also interested in topics such as functional reactive programming and event based architectures.

Reactive is a programming paradigm that focuses on describing how components in your system should react to get data from A to B. Reactive is about modeling flows of data and describing how data propagates through your system. Instead of having functions interactively call each other with some values, we use functions in a reactive way to transform, combine or aggregate any objects in our data flow. The program becomes a composition of function-oriented code that reacts to data, asynchronously.

Having smaller, side-effect free execution units (reactive functions) with the possibility to execute code asynchronously allows us to parallelize actual data flow execution. Such applications tend to be highly concurrent and less prone to issues with locking and thread synchronization. Execution details on concurrency and thread use will move out of your business logic into the execution context of the framework being used.

In each case we must use an event based approach to make this happen, as data flows need objects to react upon. The classic way to write event oriented code is to use callback functions.

Synchronous and Asynchronous Programming Using the Datastax Java Driver

The asynchronous callback style is also supported by the DataStax Java Driver for Apache Cassandra. The driver has supported both synchronous and asynchronous query execution since day one. Depending on the situation, developers can pick a suitable execution model. A plain execute(...) call will block until the query result is available. Afterwards the result can be iterated and consumed as needed. This is probably the most common way to deal with database results. However, asynchronous query execution can provide significant scalability benefits and is a requirement for reactive applications. Asynchronous query execution is a non-blocking operation that will return a ListenableFuture in our example instead of the actual result. ListenableFuture instances accept classic completion callbacks which will be executed when the Future’s computation is complete.

Unfortunately, the classic callback approach tends to become obscure and hard to follow as soon as you start to nest callbacks within each other. For instance, the JAX-RS based REST service of listing 1 below queries a hotel database to get the assigned picture url of the hotel. If the database response is received, the picture url will be used to get the binary picture data over HTTP. To perform the HTTP request Jersey is used. Jersey is the JAX-RS reference implementation. The received binary picture data will be resized and returned to the service caller.

Listing 1. callback example


Java 8 CompletableFutures

Fortunately, Java 8 introduces new language features which make asynchronous, non-blocking programming in Java much easier and help to escape from the callback hell mentioned above. The lambda expression enables you to treat functionality as a method argument or code as data. Another exciting feature of Java 8 is the new CompletableFuture class. CompletableFuture makes use of the new lambda expressions by allowing you to register lambda expressions and to chain higher-order functions. Similar to the callback approach, the lambda expression will be executed when an operation completes. Unlike the classic callback approach, the code remains much more readable.
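To make the chaining style concrete, here is a minimal sketch that uses only the JDK. The database read and the HTTP fetch are simulated by hypothetical helper methods (readPictureUrlAsync, fetchAsync, resize), so none of this is the driver's or Jersey's API. It only shows how thenCompose and thenApply take the place of nested callbacks.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.concurrent.CompletableFuture;

// Sketch: composing asynchronous steps with CompletableFuture instead of
// nesting callbacks. All helpers are hypothetical stand-ins for real I/O.
final class CompletableFutureChain {

    // Stand-in for an asynchronous database read returning the picture URL.
    static CompletableFuture<String> readPictureUrlAsync(String hotelId) {
        return CompletableFuture.supplyAsync(
                () -> "http://example.org/pictures/" + hotelId + ".jpg");
    }

    // Stand-in for an asynchronous HTTP GET; the "picture" is just the URL bytes.
    static CompletableFuture<byte[]> fetchAsync(String url) {
        return CompletableFuture.supplyAsync(
                () -> url.getBytes(StandardCharsets.UTF_8));
    }

    // Stand-in for resizing: truncates the data to at most maxBytes.
    static byte[] resize(byte[] picture, int maxBytes) {
        return Arrays.copyOf(picture, Math.min(picture.length, maxBytes));
    }

    // Returns immediately; each stage runs once the previous result arrives.
    static CompletableFuture<byte[]> thumbnailAsync(String hotelId) {
        return readPictureUrlAsync(hotelId)
                .thenCompose(CompletableFutureChain::fetchAsync) // chain another async step
                .thenApply(picture -> resize(picture, 8));       // plain transformation
    }

    public static void main(String[] args) {
        thumbnailAsync("hotel-1")
                .thenAccept(thumbnail -> System.out.println(thumbnail.length + " bytes"))
                .join(); // only so the demo JVM waits for the asynchronous result
    }
}
```

The pipeline reads top to bottom in the order the steps happen, which is the readability gain over nested callbacks. A failure in any stage skips the later stages and completes the returned future exceptionally.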

For instance the executeAsync() method of listing 2 returns such a CompletableFuture instance. Please consider that the code is based on Troilus, a high level Cassandra Java client on top of the DataStax Java Driver. Troilus has been developed specifically with Java 8 and lambda expressions in mind. Each of the fluent methods of the listing below, such as readWithKey(...), executeAsync() or thenAccept(...), is non-blocking, which means that the method call returns immediately. Subsequently the outer printHotelname(..) method will return without waiting for the database response. If the database response is received, the lambda expression of the thenAccept(...) method will be processed in an asynchronous way.

Listing 2. reactive database call

The functionality of listing 3 below is equivalent to listing 1. Listing 3 makes use of Java 8 aware libraries. For instance the HTTP request is executed by using Jersey’s Reactive Client API for CompletableFuture. The result of the HTTP query is a CompletableFuture. Furthermore the convenience method ResultConsumer.writeTo(...) is used to write the HTTP result.

Listing 3. asynchronous, non-blocking REST service example


Reactive Streams

As can be seen in the example above, using CompletableFutures allows us to work with smaller units of code or simple expressions that can be executed asynchronously. CompletableFutures are a great match to define and combine functions that compute an individual result in an asynchronous way. But what if we want to process a sequence of data? How can we handle unbounded streams of data? Instead of having two states “in-progress” and “done” we need to come up with some new semantics that allow us to pass individual elements to a consumer instead of just the complete result. We still need a way to signal an error or the absence of any further data just as we do with CompletableFutures. We might also need to be able to indicate back pressure signals, in case the consumer can’t keep up asynchronously processing the data.

Obviously this is not a new, particularly complex problem. We have been able to iterate through result sets from databases for decades, although not asynchronously in most cases. But since this is such a common problem, an initiative has been formed to create a formalized standard for such a protocol under the name reactive streams. This standard is now supported by a couple of libraries including Troilus.

As already mentioned, reactive applications are event driven. In case of reactive streams, events will be initially emitted by a producer that will feed objects to your data flow. A producer can also act as an adapter to other parts of the system. E.g. you can have a producer to feed HTTP requests into your flow or read from a message queue. The same applies for consumers which will sit at the end of your flow. However, all streams between producers and consumers will be agnostic of the actual source of events or where the output is eventually supposed to go. This makes it very easy to combine and reuse streams in your application. With Troilus we provide a producer representing a result set. The idea is to be able to execute a Cassandra query and use the result as source for reactive streams.

The readSequenceWithKey(...) query of Listing 4 returns a sequence handle which implements the reactive streams Publisher interface. Remember that reactive streams only define an API and a corresponding behavior. We picked RxJava for our example as the actual implementation on top of it.

Listing 4. reactive streams example

With RxJava and our Troilus publisher in place, we can go on and build a first data flow based on observables. RxJava also comes with a set of existing operators that can be applied to transform and aggregate data. In our example we’re going to use the groupBy(...) operation to split up the data flow by a hotelId followed by a reduce(...) to create an average of all user ratings for each hotel.

The average rating stream of the example above is consumed by a console-based subscriber which prints the elements. In contrast to streams in Java 8, the reactive streams Subscriber interface supports back pressure as mentioned above. By calling the subscription’s request(...) method, the consumer signals to the producer the number of elements it is willing to consume. In some cases it can be more efficient to read a larger number of results. In this example, the console subscriber requests individual elements one by one.

Listing 5: simple console-based stream consumer
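The original listing is not reproduced here. As a stand-in, the following sketch assumes Java 9 or later, whose java.util.concurrent.Flow interfaces mirror the reactive streams API, and it uses the JDK's SubmissionPublisher in place of the Troilus publisher. The class name and the sample elements are made up.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

// Sketch (Java 9+): a console subscriber that requests elements one by one.
final class ConsoleSubscriber<T> implements Flow.Subscriber<T> {
    private final CountDownLatch done = new CountDownLatch(1);
    private final List<T> received = new ArrayList<>();
    private Flow.Subscription subscription;

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        this.subscription = subscription;
        subscription.request(1); // back pressure: ask for the first element only
    }

    @Override
    public void onNext(T element) {
        System.out.println(element);
        received.add(element);
        subscription.request(1); // signal readiness for exactly one more element
    }

    @Override
    public void onError(Throwable error) {
        error.printStackTrace();
        done.countDown();
    }

    @Override
    public void onComplete() {
        done.countDown(); // no further data will arrive
    }

    // Blocks the caller until the stream has completed or failed.
    void awaitCompletion() {
        try {
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
    }

    List<T> received() {
        return received;
    }

    public static void main(String[] args) {
        ConsoleSubscriber<String> subscriber = new ConsoleSubscriber<>();
        try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            publisher.submit("hotel A: 4.5");
            publisher.submit("hotel B: 3.8");
        } // close() signals completion once buffered elements are delivered
        subscriber.awaitCompletion();
    }
}
```

Requesting a larger batch, for example request(10) in onSubscribe and again after every tenth element, trades some of this fine-grained control for fewer demand signals.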


In Conclusion

As can be seen from the examples above, reactive applications can be implemented in different ways. In essence, reactive applications should be oriented around data flows and events. Troilus shows how to create such applications using different solutions that have evolved around this topic during the last years. It’s up to the developer to pick the best option, making this decision on a case-by-case basis.

Java 8 and lambdas have been a big milestone with the potential to significantly change how we design applications based on Java. Other languages such as Scala or Clojure have been using functional and reactive programming paradigms successfully on the JVM as well. You’ll also find some of the discussed ideas in emerging big data platforms such as Spark or even efforts to create new network protocols based on them.

The example code is available under reactivecassandra.

Delta Architectures: Unifying the Lambda Architecture and leveraging Storm from Hadoop/REST

March 11, 2015


Brian O’Neill, Chief Technology Officer at Health Market Science
Brian is Chief Technology Officer at Health Market Science (HMS) where he heads development of their data management and analytics platform, powered by Storm and Cassandra. Brian won InfoWorld’s Technology Leadership award in 2013 and authored Storm: Blueprints for Realtime Computation (due out 12/2013). He has a number of patents and holds a B.S. in C.S. from Brown University.

Recently, I’ve been asked by a bunch of people to go into more detail on the Druid/Storm integration that I wrote for our book: Storm Blueprints for Distributed Real-time Computation.  Druid is great. Storm is great. And the two together appear to solve the real-time dimensional query/aggregations problem.

In fact, it looks like people are taking it mainstream, calling it the RAD Stack, and adding the “Lambda Architecture” label. Honestly though, there may be a better way. Lambda Architectures make the following supposition that has always bothered me.

From Nathan’s article on Lambda Architectures:

Computing arbitrary functions on an arbitrary dataset in real time is a daunting problem. There is no single tool that provides a complete solution. Instead, you have to use a variety of tools and techniques to build a complete Big Data system.

The lambda architecture solves the problem of computing arbitrary functions on arbitrary data in real time by decomposing the problem into three layers: the batch layer, the serving layer, and the speed layer.

This advice has led most people to deploy separate infrastructure/frameworks for batch, speed/processing, and query, which is good because it allows you to “use the right tool for each job”. And that has led to things like the “RAD Stack”. People select a technology for each layer. (e.g. Speed = Storm, Batch = Hadoop, and Serving = Impala)

But if you have lived in environments like these, they require an awful lot of resources because there is very little re-use across the systems.  More and more, I believe people are starting to question the distinction between layers. Others are proposing a Unified Lambda Architecture.

And lately, I’ve found myself in the camp of the unificationists…

At HMS, we’ve been iterating on our Lambda architecture for a few years now.  We have Storm, Hadoop and a real-time Web Services layer.  Each of these functions as a data ingestion mechanism.

They all process the same kinds of data, and only differ by interface, capacity and client-side expectations:

  • Transactional Processing:
    • Our transactional processing is our web services layer.  (We still use and love dropwizard)  In this scenario, the client expects that the data is ingested and integrated into analytics within a very short time period (e.g. seconds).  Capacity must always match or exceed demand, or else the system is considered “down”.


  • Stream/Queue-based Processing
    • Often, we find ourselves leaning more on our transactional processing capabilities.  More and more clients are demanding real-time integrations, which means web services API integrations.  If that is the case, and transactions are continuous, then there is only a small distinction between stream processing and “transactional processing”.  However, the distinction is important.  First, with our “stream processing” we introduce a queue.  With the queue in place, capacity need not always exceed demand.  The queue can capture over-run, and the system will work it off later.  Clients tolerate a delay (e.g. minutes) in data/analytics availability, but the system is more tolerant of downstream architectural issues with availability.  Since data is logged to a queue, the system tolerates disruptions in persistence layer(s).


  • Batch Processing
    • For batch processing, client expectations are lowered even further.  It is often hours until the client expects their data to be available.  Additionally with batch, there is a functional difference.  There is an end.  With streams and transactional input, it is an infinite set of data.  However, for batch, we often want to know the status of processing a set of data.  If you try to use Stream processing for batch interactions, then you need to build in the concept of a “set”, and somehow add the ability to track status.  (initially, to avoid having to maintain separate infrastructure for Hadoop, we did this on top of storm… painfully)

Like many others, we found ourselves needing to support all of these paradigms.  Quite literally, we were rewriting code across the different frameworks/systems, which caused major pain when those implementations differed (even slightly).  Numbers didn’t line up, etc.

We were forced to come up with a solution, and collapse the systems a bit.

We looked at DRPC with Storm, and considered calling Storm from our web services tier, but DRPC seemed clunky and under-supported.  Also, it seemed unwise to call DRPC from Hadoop.  (Has anyone tried this?)
Instead, we decided to lock in on an abstraction for persistence.  We looked around at ORMs and DAO patterns, but most did not support the concept of micro-batching, which is an abstraction we wanted the option to leverage across the different processing mechanisms.  In the end, we decided to leverage the Storm/Trident State abstraction as a universal mechanism for persistence.  We built out storm-cassandra-cql, and embedded it in our web services and in Hadoop.

From both Hadoop and our web services, we instantiate our own Tuples, which implement the Storm Tuple interface.  From there, we can use the State abstraction, and re-use Mappers, to ensure a consistent data model across all three processing paradigms.
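
To make the idea of re-using a mapper concrete, here is a minimal Python sketch. It is not the storm-cassandra-cql API: `SimpleTuple`, `count_mapper`, `tuple_from_json`, `tuple_from_record` and the field names are all hypothetical stand-ins. The point is only that each paradigm adapts its own input into the same tuple shape, so one mapper defines the data model for all of them.

```python
class SimpleTuple:
    """Hypothetical stand-in for an object implementing Storm's Tuple interface."""

    def __init__(self, fields, values):
        self._data = dict(zip(fields, values))

    def get_value_by_field(self, name):
        return self._data[name]


# Assumed field layout, shared by every entry point.
FIELDS = ("source", "day", "count")


def count_mapper(tup):
    """Hypothetical mapper: one tuple in, one (row key, increment) pair out."""
    key = (tup.get_value_by_field("source"), tup.get_value_by_field("day"))
    return key, tup.get_value_by_field("count")


def tuple_from_json(doc):
    """Web services path: build a tuple from an already-parsed JSON body."""
    return SimpleTuple(FIELDS, [doc[f] for f in FIELDS])


def tuple_from_record(line):
    """Hadoop path: build a tuple from a tab-separated input record."""
    source, day, count = line.rstrip("\n").split("\t")
    return SimpleTuple(FIELDS, [source, day, int(count)])
```

Because both adapters feed the same `count_mapper`, a JSON request and a batch record describing the same event produce an identical key and increment.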

From Hadoop, as a shortcut, we used the State object directly from the reduce phase, setting the output format to NullOutputFormat.  Ideally, we probably should have implemented a new OutputFormat that was StormCassandraCqlFormat or something, but I’m not sure that would have bought us much.

For the web services, the immediate integration was straightforward.  Convert the JSON to a Tuple, call update() on the StateUpdater, then call commit() on the State object.  But we also wanted to be able to batch, and perform dimensional aggregations prior to committing to “deep storage”.  This introduced a problem: we would have data that was acknowledged (200 response code), but not yet persisted.  Not good.  In the event of a node failure, we would lose data.  Really not good.
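
The request path and the window of data loss can be sketched as follows. This is a simplified illustration, not the Trident API: the hypothetical `BatchingState` folds the StateUpdater and State roles into one class, a dict stands in for Cassandra, and the JSON fields `key` and `amount` are assumed.

```python
import json


class BatchingState:
    """Hypothetical stand-in for a Trident State; a dict plays "deep storage"."""

    def __init__(self):
        self.pending = {}  # aggregated in memory, lost if the node dies
        self.storage = {}  # stand-in for the durable store

    def update(self, tuples):
        # Aggregate increments in memory.
        for key, amount in tuples:
            self.pending[key] = self.pending.get(key, 0) + amount

    def commit(self):
        # Flush the aggregated increments to storage.
        for key, amount in self.pending.items():
            self.storage[key] = self.storage.get(key, 0) + amount
        self.pending = {}


def handle_request(body, state, commit_now=True):
    """Convert JSON to a tuple, update the state, optionally commit, return 200."""
    doc = json.loads(body)
    tup = (doc["key"], doc["amount"])
    state.update([tup])
    if commit_now:
        state.commit()
    return 200
```

With `commit_now=False` (the batching case), the caller still receives a 200, but the increment lives only in `pending` until the next `commit()`. That gap is exactly the data at risk on a node failure.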

So, what was the solution?  We could have integrated Druid, but instead we decided to keep it lightweight, and… leverage Storm as our safety net!

Consider the following “traditional” interpretation of the Lambda Architecture:

In this traditional approach, the batch layer (Hadoop) is often used to “correct” errors in processing introduced in the speed layer (Storm).  Hadoop is the safety net, correcting numbers (typically via overnight batch jobs).  We decided to flip that model, and use Storm as our safety net, with this approach:

In this case, we use the embedded State object to aggregate data across a batch, but we also write to a Kafka queue for persistence before we acknowledge the HTTP request.  The sequence diagram looks like this:

We persist the event to a queue, update the Trident State object, and *then* return a 200.  Then, periodically, we flush the State to storage (Cassandra in this case).  It is okay if we drop a node, because Storm will eventually (re)process the event and (re)incorporate the data if need be.  (And here is where I’m going to gloss over some really important details, to be addressed in my next post.)
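
The ordering of those steps can be sketched in a few lines of Python. Everything here is a hypothetical stand-in: a list plays the Kafka topic, a dict plays Cassandra, and `reprocess` recovers by rebuilding totals from the full log and overwriting storage. That reconciliation strategy is an assumption made to keep the sketch short; it is not a description of how Storm actually (re)incorporates the data.

```python
class DeltaService:
    """Hypothetical web service node: log first, aggregate, then acknowledge."""

    def __init__(self, log, storage):
        self.log = log          # stand-in for a Kafka topic (append-only list)
        self.storage = storage  # stand-in for Cassandra (dict of counters)
        self.pending = {}       # in-memory aggregation, lost on node failure

    def handle(self, key, amount):
        self.log.append((key, amount))                         # 1. persist to the queue
        self.pending[key] = self.pending.get(key, 0) + amount  # 2. update the state
        return 200                                             # 3. only then acknowledge

    def flush(self):
        # Periodic flush of the aggregated increments to storage.
        for key, amount in self.pending.items():
            self.storage[key] = self.storage.get(key, 0) + amount
        self.pending = {}


def reprocess(log):
    """Safety net (assumed strategy): rebuild totals from the full log."""
    totals = {}
    for key, amount in log:
        totals[key] = totals.get(key, 0) + amount
    return totals


if __name__ == "__main__":
    log, storage = [], {}
    node = DeltaService(log, storage)
    node.handle("a", 1)
    node.flush()
    node.handle("a", 2)   # acknowledged, but never flushed
    del node              # simulate dropping the node
    print(storage)        # {'a': 1}
    storage.update(reprocess(log))
    print(storage)        # {'a': 3}
```

Because the event reaches the log before the 200 is returned, anything that was acknowledged can be recovered, even if the in-memory aggregate is lost.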

The point being… we’ve begun to collapse our layers, starting with persistence.  We are re-using the Trident State abstraction from both Hadoop and Web Services, and we’ve moved Storm into a “re-processing / safety net” layer, which was previously filled by Hadoop/Batch processing.

For lack of a better term, we’ve been calling this a Delta Architecture because the whole system is focused on incremental updates to state, made from any and all processing paradigms.

Hopefully, this gets people thinking.  In my next post, I’ll explain how you can use the same architecture to deliver dimensional aggregations (like Druid), without incorporating Druid directly.

We also have open questions –
Can we execute an embedded topology!?
Does it make sense to do so?

For more detail, have a look at the presentation I did at the Storm NYC meetup, Data Pipelines and Improving on the Lambda Architecture.

I fully appreciate that much of Lambda is a matter of perspective.  And FWIW, this is mine (currently, and subject to change =).  And thanks to Nathan for articulating the concept of a Lambda architecture; materializing the “Big Data” view has given people a common vernacular with which to discuss solutions to some really hard problems.

“Delta Architectures: Unifying the Lambda Architecture and leveraging Storm from Hadoop/REST” was originally posted by Brian O’Neill, CTO at Health Market Science. To view more postings by Brian O’Neill, please visit his blog here.