March 13th, 2015

Gregor Roth, Software Architect at United Internet
Gregor is a software architect at United Internet, a leading European internet specialist. He has used Cassandra to scale essential parts of its mail systems WEB.DE, GMX and 1&1. Gregor has spent many years working on highly scalable and resilient web-based architectures. He is one of the main developers of Troilus.

Stefan Podkowinski, Software Architect & Cassandra Developer at United Internet
Stefan is a software architect and Cassandra developer at United Internet, a leading European internet specialist. After working with several NoSQL datastores in the past, he now enjoys building distributed systems on top of Cassandra and is also interested in topics such as functional reactive programming and event-based architectures.

Reactive programming is a paradigm that focuses on describing how the components in your system should react in order to get data from A to B. It is about modeling flows of data and describing how data propagates through your system. Instead of having functions interactively call each other with some values, we use functions in a reactive way to transform, combine or aggregate the objects in our data flow. The program becomes a composition of function-oriented code that reacts to data asynchronously.

Having smaller, side-effect-free execution units (reactive functions) that can be executed asynchronously allows us to parallelize the actual execution of the data flow. Such applications tend to be highly concurrent and less prone to locking and thread-synchronization issues. Details on concurrency and thread usage move out of your business logic into the execution context of the framework in use.

In any case, we need an event-based approach to make this happen, as data flows need objects to react upon. The classic way to write event-oriented code is to use callback functions.

Synchronous and Asynchronous Programming Using the DataStax Java Driver

The asynchronous callback style is also supported by the DataStax Java Driver for Apache Cassandra. The driver has supported both synchronous and asynchronous query execution since day one, so developers can pick the execution model that suits the situation. A plain execute(...) call blocks until the query result is available; afterwards, the result can be iterated and consumed as needed. This is probably the most common way to deal with database results. However, asynchronous query execution can provide significant scalability benefits and is a requirement for reactive applications. Asynchronous query execution is a non-blocking operation that returns a future (a ListenableFuture in our example) instead of the actual result. ListenableFuture instances accept classic completion callbacks, which are executed when the Future’s computation is complete.
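To make the two execution models concrete, here is a minimal sketch in plain Java. It does not use the DataStax driver: FakeSession, its execute/executeAsync methods and the Callback interface (loosely modeled on the onSuccess/onFailure shape of Guava's FutureCallback) are hypothetical stand-ins, and the "query" is only a string transformation running on a background thread.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical callback contract, loosely modeled on Guava's FutureCallback
interface Callback<V> {
    void onSuccess(V result);
    void onFailure(Throwable t);
}

// Hypothetical stand-in for a driver session; NOT the DataStax API
class FakeSession {
    // Daemon thread that plays the role of the driver's I/O threads
    private final ExecutorService io = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, "fake-io");
        t.setDaemon(true);
        return t;
    });

    // Synchronous style: the calling thread waits until the result is available
    String execute(String query) {
        return run(query);
    }

    // Asynchronous style: returns immediately; exactly one callback method
    // is invoked later, on the I/O thread
    void executeAsync(String query, Callback<String> callback) {
        io.execute(() -> {
            String result;
            try {
                result = run(query);
            } catch (RuntimeException e) {
                callback.onFailure(e);
                return;
            }
            callback.onSuccess(result);
        });
    }

    // Fake "query execution"
    private static String run(String query) {
        if (query.isEmpty()) {
            throw new IllegalArgumentException("empty query");
        }
        return "result of " + query;
    }
}
```

The caller of executeAsync(...) continues immediately, so everything that depends on the result has to live inside the callback, which is where the nesting problem starts.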

Unfortunately, the classic callback approach tends to become obscure and hard to follow as soon as you start to nest callbacks within each other. For instance, the JAX-RS based REST service of Listing 1 below queries a hotel database to get the picture URL assigned to the hotel. Once the database response is received, the picture URL is used to fetch the binary picture data over HTTP. The HTTP request is performed with Jersey, the JAX-RS reference implementation. The received binary picture data is resized and returned to the service caller.
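The nesting can be sketched as follows, assuming three hypothetical asynchronous operations (findPictureUrl, download and resize) that report results through success and failure callbacks. None of them is the Jersey or driver API, the URL is a placeholder, and the "resize" simply truncates the byte array.

```java
import java.util.Arrays;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.function.Consumer;

// Sketch of nested callbacks. All three operations are hypothetical stand-ins
// for the database lookup, the HTTP download and the image resize.
class NestedCallbacks {
    private static final Executor IO = Executors.newCachedThreadPool(r -> {
        Thread t = new Thread(r);
        t.setDaemon(true);
        return t;
    });

    // Hypothetical database lookup of the picture URL
    static void findPictureUrl(String hotelId, Consumer<String> onOk, Consumer<Throwable> onErr) {
        IO.execute(() -> {
            if (hotelId.isEmpty()) {
                onErr.accept(new IllegalArgumentException("unknown hotel"));
            } else {
                onOk.accept("http://example.org/pictures/" + hotelId + ".png");
            }
        });
    }

    // Hypothetical HTTP download: always "returns" eight fake bytes
    static void download(String url, Consumer<byte[]> onOk, Consumer<Throwable> onErr) {
        IO.execute(() -> onOk.accept(new byte[] {1, 2, 3, 4, 5, 6, 7, 8}));
    }

    // Hypothetical resize: keeps only the first maxBytes bytes
    static void resize(byte[] data, int maxBytes, Consumer<byte[]> onOk, Consumer<Throwable> onErr) {
        IO.execute(() -> onOk.accept(Arrays.copyOf(data, Math.min(maxBytes, data.length))));
    }

    // Each step can only start inside the success callback of the previous one,
    // and the failure handler has to be threaded through every level.
    static void getPicture(String hotelId, int maxBytes,
                           Consumer<byte[]> respond, Consumer<Throwable> fail) {
        findPictureUrl(hotelId,
            url -> download(url,
                bytes -> resize(bytes, maxBytes,
                    respond,
                    fail),
                fail),
            fail);
    }
}
```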

Listing 1. callback example


Java 8 CompletableFutures

Fortunately, Java 8 introduces new language features which make asynchronous, non-blocking programming in Java much easier and help to escape from the callback hell mentioned above. Lambda expressions enable you to treat functionality as a method argument, or code as data. Another exciting addition in Java 8 is the new CompletableFuture class. CompletableFuture makes use of lambda expressions by allowing you to register them and to chain them with higher-order functions such as thenApply(...) or thenAccept(...). Similar to the callback approach, a registered lambda expression is executed once the operation completes. Unlike the classic callback approach, however, the code remains much more readable.
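A minimal sketch of this chaining style using only java.util.concurrent.CompletableFuture: each lambda is registered as a stage and runs when the stage before it completes, and thenCombine(...) joins two independent asynchronous results. The greet and totalLength functions are illustrative placeholders.

```java
import java.util.concurrent.CompletableFuture;

class ChainDemo {
    // Each lambda is registered as a stage; none of these calls blocks.
    static CompletableFuture<String> greet(String rawName) {
        return CompletableFuture.supplyAsync(() -> rawName)   // produce a value on the common pool
                .thenApply(String::trim)                       // runs once the value is available
                .thenApply(name -> "Hello, " + name + "!");    // further transformation
    }

    // Two independent async computations, combined when both are complete
    static CompletableFuture<Integer> totalLength(String a, String b) {
        CompletableFuture<Integer> lengthA = CompletableFuture.supplyAsync(a::length);
        CompletableFuture<Integer> lengthB = CompletableFuture.supplyAsync(b::length);
        return lengthA.thenCombine(lengthB, Integer::sum);
    }
}
```

A caller can finish the chain with greet("  Ada  ").thenAccept(System.out::println) and return immediately; only code that truly needs the value, such as a test, should call join().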

For instance, the executeAsync() method of Listing 2 returns such a CompletableFuture instance. Please note that the code is based on Troilus, a high-level Cassandra Java client on top of the DataStax Java Driver. Troilus has been developed specifically with Java 8 and lambda expressions in mind. Each of the fluent methods in the listing below, such as readWithKey(...), executeAsync() or thenAccept(...), is non-blocking, which means that the method call returns immediately. Consequently, the outer printHotelname(...) method returns without waiting for the database response. Once the database response is received, the lambda expression passed to thenAccept(...) is processed asynchronously.
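Without reproducing the Troilus API, the shape of printHotelname(...) can be sketched with a hypothetical HotelDao whose readNameAsync(...) plays the role of the fluent readWithKey(...) ... executeAsync() call. The map-based "database", the hotel id and the hotel name are made-up sample data, and the sketch returns the resulting future (rather than void) so callers can observe completion.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;

// Hypothetical DAO; readNameAsync(...) stands in for a Troilus-style
// readWithKey(...)...executeAsync() call and simulates the database with a map.
class HotelDao {
    private final Map<String, String> names = Map.of("H1", "Grand Example Hotel");

    CompletableFuture<String> readNameAsync(String hotelId) {
        return CompletableFuture.supplyAsync(() -> {
            String name = names.get(hotelId);
            if (name == null) {
                throw new IllegalArgumentException("no hotel " + hotelId);
            }
            return name;
        });
    }
}

class HotelPrinter {
    // Returns immediately. The lambda given to thenAccept(...) runs later,
    // once the (simulated) database response has arrived.
    static CompletableFuture<Void> printHotelname(HotelDao dao, String hotelId) {
        return dao.readNameAsync(hotelId)
                  .thenAccept(name -> System.out.println("hotel name: " + name));
    }
}
```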

Listing 2. reactive database call

The functionality of Listing 3 below is equivalent to Listing 1, but Listing 3 makes use of Java 8-aware libraries. For instance, the HTTP request is executed using Jersey’s Reactive Client API for CompletableFuture, so the result of the HTTP query is a CompletableFuture. Furthermore, the convenience method ResultConsumer.writeTo(...) is used to write the HTTP result.
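The picture workflow described for Listing 1 can be expressed as a flat CompletableFuture chain. This sketch uses hypothetical findPictureUrl, download and resize steps instead of the driver and Jersey’s reactive client: thenCompose(...) links the asynchronous steps and thenApply(...) the synchronous one.

```java
import java.util.Arrays;
import java.util.concurrent.CompletableFuture;

// Flat CompletableFuture version of the picture workflow. The three steps are
// hypothetical stand-ins, not the driver or Jersey's reactive client API.
class PicturePipeline {
    static CompletableFuture<String> findPictureUrl(String hotelId) {
        return CompletableFuture.supplyAsync(() -> {
            if (hotelId.isEmpty()) {
                throw new IllegalArgumentException("unknown hotel");
            }
            return "http://example.org/pictures/" + hotelId + ".png";
        });
    }

    // Always "downloads" eight fake bytes
    static CompletableFuture<byte[]> download(String url) {
        return CompletableFuture.supplyAsync(() -> new byte[] {1, 2, 3, 4, 5, 6, 7, 8});
    }

    // Fake resize: keeps at most maxBytes bytes (synchronous)
    static byte[] resize(byte[] data, int maxBytes) {
        return Arrays.copyOf(data, Math.min(maxBytes, data.length));
    }

    // If any stage fails, the later stages are skipped and the returned
    // future completes exceptionally, so errors surface in one place.
    static CompletableFuture<byte[]> getPicture(String hotelId, int maxBytes) {
        return findPictureUrl(hotelId)
                .thenCompose(PicturePipeline::download)
                .thenApply(bytes -> resize(bytes, maxBytes));
    }
}
```

A caller such as a REST resource method would then attach a single whenComplete((bytes, error) -> ...) to write either the picture or an error response, instead of passing a failure handler into every level.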

Listing 3. asynchronous, non-blocking REST service example


Reactive Streams

As can be seen in the example above, CompletableFutures allow us to work with smaller units of code or simple expressions that can be executed asynchronously. CompletableFutures are a great match for defining and combining functions that each compute an individual result asynchronously. But what if we want to process a sequence of data? How can we handle unbounded streams of data? Instead of having the two states “in-progress” and “done”, we need new semantics that allow us to pass individual elements to a consumer instead of just the complete result. We still need a way to signal an error or the absence of any further data, just as we do with CompletableFutures. We might also need to signal back pressure, in case the consumer can’t keep up with asynchronously processing the data.
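Java 9 later added java.util.concurrent.Flow, whose nested interfaces mirror the Reactive Streams Publisher, Subscriber and Subscription types. The following sketch uses Flow and the JDK’s SubmissionPublisher (stand-ins chosen here, not what the article’s code uses) to show the signals a stream needs beyond a single future: one onNext per element, then a terminal onComplete or onError. This recording subscriber requests Long.MAX_VALUE, i.e. it opts out of back pressure.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

// Records every signal it receives, making the stream protocol visible:
// onSubscribe, then one onNext per element, then one onComplete or onError.
class RecordingSubscriber<T> implements Flow.Subscriber<T> {
    final List<String> signals = new CopyOnWriteArrayList<>();
    final CompletableFuture<List<String>> done = new CompletableFuture<>();

    @Override public void onSubscribe(Flow.Subscription subscription) {
        signals.add("subscribe");
        subscription.request(Long.MAX_VALUE);   // unbounded demand: no back pressure
    }

    @Override public void onNext(T item) {
        signals.add("next:" + item);
    }

    @Override public void onError(Throwable error) {   // "something went wrong"
        signals.add("error:" + error.getMessage());
        done.complete(signals);
    }

    @Override public void onComplete() {               // "no further data"
        signals.add("complete");
        done.complete(signals);
    }

    // Publishes the given items, closes the stream and returns the recorded signals
    static List<String> record(List<String> items) {
        RecordingSubscriber<String> subscriber = new RecordingSubscriber<>();
        try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            items.forEach(publisher::submit);
        }   // close() signals onComplete once the buffered items are delivered
        return subscriber.done.orTimeout(5, TimeUnit.SECONDS).join();
    }
}
```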

Obviously, this is neither a new nor a particularly complex problem. We have been iterating through database result sets for decades, although in most cases not asynchronously. But since this is such a common problem, an initiative has been formed to create a formal standard for such a protocol under the name Reactive Streams. This standard is now supported by a couple of libraries, including Troilus.

As already mentioned, reactive applications are event driven. In the case of reactive streams, events are initially emitted by a producer that feeds objects into your data flow. A producer can also act as an adapter to other parts of the system; e.g. you can have a producer that feeds HTTP requests into your flow or reads from a message queue. The same applies to consumers, which sit at the end of your flow. However, all streams between producers and consumers are agnostic of the actual source of events and of where the output is eventually supposed to go. This makes it very easy to combine and reuse streams in your application. With Troilus we provide a producer representing a result set. The idea is to be able to execute a Cassandra query and use the result as the source for a reactive stream.
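A producer that adapts an existing data source can be sketched as a Flow.Publisher over any Iterable, for example result-set rows that have already been fetched. This is a simplified, single-threaded illustration of the request/emit contract, assuming request(...) and cancel() are only called from the subscriber’s own thread; it is not the Troilus implementation and would not pass the Reactive Streams TCK.

```java
import java.util.Iterator;
import java.util.concurrent.Flow;

// Simplified producer adapting an Iterable (e.g. fetched result-set rows)
// to Flow.Publisher. Single-threaded sketch, not TCK-compliant.
class IterablePublisher<T> implements Flow.Publisher<T> {
    private final Iterable<T> source;

    IterablePublisher(Iterable<T> source) {
        this.source = source;
    }

    @Override
    public void subscribe(Flow.Subscriber<? super T> subscriber) {
        subscriber.onSubscribe(new IteratorSubscription<>(source.iterator(), subscriber));
    }

    private static final class IteratorSubscription<T> implements Flow.Subscription {
        private final Iterator<T> rows;
        private final Flow.Subscriber<? super T> subscriber;
        private long demand;        // elements requested but not yet delivered
        private boolean emitting;   // true while the emit loop below is running
        private boolean done;       // completed, failed or cancelled

        IteratorSubscription(Iterator<T> rows, Flow.Subscriber<? super T> subscriber) {
            this.rows = rows;
            this.subscriber = subscriber;
        }

        @Override
        public void request(long n) {
            if (done) {
                return;
            }
            if (n <= 0) {   // Reactive Streams rule 3.9: non-positive requests are an error
                done = true;
                subscriber.onError(new IllegalArgumentException("request must be > 0"));
                return;
            }
            demand = (demand + n < 0) ? Long.MAX_VALUE : demand + n;  // cap on overflow
            if (emitting) {
                return;     // re-entrant call from onNext: the running loop picks it up
            }
            emitting = true;
            try {
                while (!done) {
                    if (!rows.hasNext()) {          // no more data: signal completion
                        done = true;
                        subscriber.onComplete();
                    } else if (demand == 0) {       // back pressure: wait for request(...)
                        break;
                    } else {
                        demand--;
                        subscriber.onNext(rows.next());
                    }
                }
            } catch (RuntimeException e) {          // e.g. the underlying iterator failed
                if (!done) {
                    done = true;
                    subscriber.onError(e);
                }
            } finally {
                emitting = false;
            }
        }

        @Override
        public void cancel() {
            done = true;
        }
    }
}
```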

The readSequenceWithKey(...) query of Listing 4 returns a sequence handle which implements the Reactive Streams Publisher interface. Remember that Reactive Streams only defines an API and the corresponding behavior. For our example, we picked RxJava as the actual implementation on top of it.

Listing 4. reactive streams example

With RxJava and our Troilus publisher in place, we can go on and build a first data flow based on observables. RxJava also comes with a set of existing operators that can be applied to transform and aggregate data. In our example we use the groupBy(...) operator to split up the data flow by hotelId, followed by a reduce(...) to compute the average of all user ratings for each hotel.
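RxJava is an external library, so this sketch swaps in plain Java 8 streams to show the same aggregation on an in-memory list: Collectors.groupingBy(...) plays the role of groupBy(...), and Collectors.averagingInt(...) folds each group into its average like the reduce(...) step. The Rating class and the sample values are hypothetical, and unlike the RxJava flow this runs synchronously.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

// Hypothetical row type: one user's rating of one hotel
final class Rating {
    final String hotelId;
    final int stars;

    Rating(String hotelId, int stars) {
        this.hotelId = hotelId;
        this.stars = stars;
    }
}

class AverageRatings {
    // groupingBy splits the ratings by hotelId (like groupBy(...)),
    // averagingInt folds each group into its mean (like the reduce(...) step)
    static Map<String, Double> byHotel(List<Rating> ratings) {
        return ratings.stream()
                .collect(Collectors.groupingBy(
                        (Rating r) -> r.hotelId,
                        TreeMap::new,                                  // sorted by hotelId
                        Collectors.averagingInt((Rating r) -> r.stars)));
    }
}
```

For example, ratings of 4 and 5 for H1 and 3 for H2 yield {H1=4.5, H2=3.0}. In the RxJava version, groups are processed as rows arrive; the collector only returns after the whole list has been consumed.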

The average rating stream of the example above is consumed by a console-based subscriber which prints the elements. In contrast to streams in Java 8, the Reactive Streams Subscriber interface supports back pressure, as mentioned above. By calling the subscription’s request(...) method, the consumer signals to the producer the number of elements it is willing to consume. In some cases it can be more efficient to request a larger number of results at once. In this example, the console subscriber requests elements one by one.
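A console subscriber that requests elements one at a time can be sketched with the JDK’s Flow.Subscriber, with SubmissionPublisher standing in for the Troilus publisher. The printed list and the done future are additions so the sketch can be checked and awaited; they are not part of the article’s listing.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

// Prints each element and signals demand for exactly one more element at a time
class ConsoleSubscriber<T> implements Flow.Subscriber<T> {
    private Flow.Subscription subscription;
    final List<T> printed = new CopyOnWriteArrayList<>();   // kept so the sketch can be checked
    final CompletableFuture<Void> done = new CompletableFuture<>();

    @Override public void onSubscribe(Flow.Subscription subscription) {
        this.subscription = subscription;
        subscription.request(1);              // ask for the first element only
    }

    @Override public void onNext(T item) {
        System.out.println(item);
        printed.add(item);
        subscription.request(1);              // ready for the next one
    }

    @Override public void onError(Throwable error) {
        error.printStackTrace();
        done.completeExceptionally(error);
    }

    @Override public void onComplete() {
        done.complete(null);
    }

    // Demo: SubmissionPublisher stands in for the Troilus publisher
    static List<String> printAll(List<String> items) {
        ConsoleSubscriber<String> subscriber = new ConsoleSubscriber<>();
        try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            items.forEach(publisher::submit); // blocks if the subscriber's buffer is full
        }
        subscriber.done.orTimeout(5, TimeUnit.SECONDS).join();
        return subscriber.printed;
    }
}
```

Requesting one element per onNext keeps the subscriber in control of the pace; a subscriber that can handle batches would call request(n) with a larger n instead.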

Listing 5. simple console-based stream consumer


In Conclusion

As can be seen from the examples above, reactive applications can be implemented in different ways. In essence, reactive applications should be oriented around data flows and events. Troilus shows how to create such applications using different solutions that have evolved around this topic in recent years. It’s up to the developer to pick the best option, making this decision on a case-by-case basis.

Java 8 and lambdas have been a big milestone with the potential to significantly change how we design Java applications. Other JVM languages such as Scala or Clojure have been using functional and reactive programming paradigms successfully as well. You’ll also find some of the discussed ideas in emerging big data platforms such as Spark, and even in efforts to create new network protocols based on them.

The example code is available under reactivecassandra.