February 10th, 2013

This blog posting was created by Ed Capriolo. Visit Ed’s blog here.

Ed will be speaking at NYC* Big Data Tech Day on March 20th, 2013. Be sure to register today for early bird pricing.


I have been away from the blog game for a bit. There are two reasons for this. The first is that I am hard at work on the second edition of the High Performance Cassandra Cookbook. The second is that I have embarked on a skunkworks project which I think is going to bring a few game changers to Apache Cassandra.

Nate ( https://github.com/zznate ), whom I internally think of as the Hector godfather, and a few others were brainstorming on features we would want for Cassandra. I learned a great lesson about improv from Tina Fey, and I apply her rules to brainstorming and even technical conversations now: agree, use ‘and’ to take the idea to the next level, and there are no mistakes.

Nate had the idea pretty much from the get-go that he wanted to use Vert.x. The older, pre-Tina-Fey-inspired me might have tossed this idea aside, thinking async IO is just part of some trendy new NodeJS craze, but the new me decided to roll with it. Nate called the project IntraVert and we all just started cracking on cool things we wanted Cassandra to do.

(An older line of thinking that I associated with Cassandra was, “That should be done on the client side.” This was the argument against co-processors and triggers: everything could and should be done client side. The argument is valid as far as it goes, but when you notice that CQL sets timestamps automatically on the server side, that the Cassandra server can satisfy a query using a secondary index, or that counters read before write, you can conclude the argument is not an absolute.)

The first long-standing gripe I wanted to tackle is what I consider the keyspace fallacy. In the Thrift 0.6.x days the keyspace was an argument to every Thrift operation.


For example, the signature of get used to look something like this:


  get(String keyspace, String columnFamily, byte[] rowkey, byte[] column)


But by Cassandra 0.7.X the signature changed to something like:


  set_keyspace(String s)

  get(String columnFamily, byte[] rowkey, byte[] column)


Setting the keyspace moved to a thread local. I will be the first to admit that the old signature was cumbersome, but I never liked the change: the StorageProxy (the underlying server-side Cassandra object) can do cross-keyspace mutations, but Thrift and CQL cannot! (CQL has limited support for operations that cross keyspaces.) This left me in a non-optimal situation: many of our column families had different replication factors and lived in different keyspaces, so we needed either a keyspace-aware connection pool or two connection pools. But the thing that bothered me most was wasted RPC operations.
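
To put it concretely, reading one column from each of two keyspaces with the 0.7-era Thrift client went something like this (the keyspace, column family, and key names are invented for illustration, and the generated client signatures vary a bit by version):


    import java.nio.ByteBuffer;
    import org.apache.cassandra.thrift.Cassandra;
    import org.apache.cassandra.thrift.ColumnPath;
    import org.apache.cassandra.thrift.ConsistencyLevel;

    public class TwoKeyspaceReads {
      public static void readBoth(Cassandra.Client client) throws Exception {
        ByteBuffer key = ByteBuffer.wrap("mykey".getBytes("UTF-8"));

        client.set_keyspace("ks1");                    // round trip 1
        ColumnPath path1 = new ColumnPath("cf1");
        path1.setColumn("mycol".getBytes("UTF-8"));
        client.get(key, path1, ConsistencyLevel.ONE);  // round trip 2

        client.set_keyspace("ks2");                    // round trip 3
        ColumnPath path2 = new ColumnPath("cf2");
        path2.setColumn("mycol".getBytes("UTF-8"));
        client.get(key, path2, ConsistencyLevel.ONE);  // round trip 4
      }
    }


Four round trips for two single-column reads, plus the keyspace-aware pooling headache above.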

Calling set_keyspace required a network round trip, and there are many cases where you want to batch all kinds of operations together in one RPC call. This was the biggest thing I set out to conquer: I wanted a format where I could batch anything into one RPC call, not just writes to a single keyspace, but writes and reads, meta operations, custom operations, and so on. Today, as I was hacking on IntraVert, I wrote a unit test and then took a look at it:


    List<Map> batch = new ArrayList<Map>();
    Map row = new HashMap();
    // (rows are populated and added to batch here in the test)

    IntraReq req = new IntraReq();
    req.add( Operations.setAutotimestampOp() )
       .add( Operations.createKsOp("ks1", 1) )
       .add( Operations.setKeyspaceOp("ks1") )
       .add( Operations.createCfOp("cf1") )
       .add( Operations.createKsOp("ks2", 1) )
       .add( Operations.setKeyspaceOp("ks2") )
       .add( Operations.createCfOp("cf2") )
       .add( Operations.batchSetOp(batch).set("timeout", 10000) )
       .add( Operations.assumeOp("ks1", "cf1", "value", "UTF-8") )
       .add( Operations.assumeOp("ks2", "cf2", "value", "UTF-8") )
       .add( Operations.getOp("mykey", "mycol")
               .set("keyspace", "ks1")
               .set("columnfamily", "cf1"));


I really love how this project is shaping up. First, the API has a builder pattern, awesome. Second, batching multiple operations into one request, nice. Third, setting timeouts per operation, check. Fourth, types are easy, no byte buffer madness. Fifth, the exchange above might have been ten network round trips and responses before; now it is one.

Passing mention: IntraVert uses JSON over HTTP so end users do not have to worry about Thrift or other native libraries. Users can read and write data to Cassandra with basic HTTP libraries, or even work with Cassandra from JavaScript.
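
To give a flavor, hitting IntraVert with nothing but the JDK might look roughly like this; the host, port, path, and request body here are placeholders, and the real JSON wire format is in the project documentation:


    import java.io.OutputStream;
    import java.net.HttpURLConnection;
    import java.net.URL;
    import java.util.Scanner;

    public class IntraVertOverHttp {
      public static void main(String[] args) throws Exception {
        // Placeholder request body and endpoint; see the IntraVert docs for the real format.
        String requestJson = "{\"e\":[]}";
        HttpURLConnection conn =
            (HttpURLConnection) new URL("http://localhost:8080/intravert").openConnection();
        conn.setRequestMethod("POST");
        conn.setRequestProperty("Content-Type", "application/json");
        conn.setDoOutput(true);
        OutputStream out = conn.getOutputStream();
        out.write(requestJson.getBytes("UTF-8"));
        out.close();
        Scanner in = new Scanner(conn.getInputStream(), "UTF-8");
        System.out.println(in.useDelimiter("\\A").next()); // dump the JSON response
        in.close();
      }
    }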

Even though I mentioned them first, the client API and the transport are not the most innovative parts of IntraVert. The ability to do processing server side is. IntraVert does this by allowing JVM languages like Groovy (support for Scala/Clojure is coming) to upload functions to be compiled on the server. These functions can then be leveraged as part of other operations.


For example, suppose a column has JSON data in it. The user wishes to run a JSONPath query on the server side and only return the results to the client. Users can dynamically create a class that applies the transformation and apply this to a Cassandra slice.


    req.add( Operations.sliceOp("jsonkey", "a", "z", 100));//6
    req.add( Operations.createProcessorOp("JsonPathEx", "groovy",
            "import com.jayway.jsonpath.*; \n" +
            "public class JsonPathEx implements org.usergrid.vx.experimental.Processor { \n"+
            "  public List<Map> process(List<Map> input){" +
            "    List<Map> results = new ArrayList<HashMap>();"+
            "    for (Map row: input){" +
            "      Map newRow = new HashMap(); "+
            // Groovy requires you to escape $
            "      Integer match = JsonPath.read(row.get(\"value\").toString(), \"\\$.[1].value\"); \n"+
            "      newRow.put(\"value\", match.toString()); \n "+
            "      results.add(newRow); \n"+
            "    } \n" +
            "    return results;"+
            "  }"+
            "}"));//7
    req.add( Operations.processOp("JsonPathEx", Collections.EMPTY_MAP, 6));//8


There are multiple types of server-side processors in IntraVert. They can be used on both the insert side and the read side to transform data, which lets the user do things that were not possible before. For example, if the user wishes to do a server-side union of two slice results or two CQL queries, this can be done (in a single client-server exchange, like the other examples). Processors can also be used to filter results like a where clause, for example returning only counter columns whose value is greater than X. Processors also have the ability to implement procedural logic on the server side. (Think PL/SQL meets Cassandra.)
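
As a sketch of that filtering idea, a hypothetical “greater than” processor could be built with the same Operations calls shown above; the GreaterThan class and the threshold are made up, but the Processor interface and the createProcessorOp/processOp pattern are the same as in the JsonPath example:


    IntraReq filterReq = new IntraReq();
    filterReq.add( Operations.sliceOp("counterkey", "a", "z", 100) );//0
    filterReq.add( Operations.createProcessorOp("GreaterThan", "groovy",
            "public class GreaterThan implements org.usergrid.vx.experimental.Processor { \n" +
            "  public List<Map> process(List<Map> input){ \n" +
            "    List<Map> results = new ArrayList<HashMap>(); \n" +
            "    for (Map row: input){ \n" +
            "      if (Integer.parseInt(row.get(\"value\").toString()) > 100) { results.add(row); } \n" +
            "    } \n" +
            "    return results; \n" +
            "  } \n" +
            "}"));//1
    // apply the processor to the slice result from step 0; only columns with value > 100 come back
    filterReq.add( Operations.processOp("GreaterThan", Collections.EMPTY_MAP, 0) );//2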

IntraVert is still in the brainstorming phase (but we want to have an official 1.0 release soon). You can check it out on GitHub at https://github.com/zznate/intravert-ug and have fun hacking on it. We have a good deal of documentation to get you started. Coming soon: HBase-style scanners!