November 25th, 2013


This Streaming Map/Reduce on Wall Street post was written by Colin Clark.  To view more of Colin's postings, check out his blog.

 

What I Built at NYSE Technologies

I’ve been building event processing systems for over 20 years now and am very familiar with the issues that companies run into when deploying them.  Over the last decade, I’ve focused on Complex Event Processing (CEP), bringing one of the first CEP engines to market in 2005 and subsequently working with many of the top vendors and products in the space.

 

One of the problems with most of these products back in 2009, when I got started with Darkstar, was that CEP engines just didn’t scale.  I wanted to get past that limitation – I wanted something that could sift through the entire Internet in real time.  My specific use case was the Consolidated Audit Trail for the SEC – something they could use to look for things in real time, throughout the day.

I wanted to build a system that could handle millions of events/messages per second, scan those data streams for patterns, persist the data, and make that data *immediately* available for subsequent query.  There was nothing like this on the market at that time.  Most people said this couldn’t be done.  What I needed was a business partner who’d believe in me and my team.

 

NYSE Technologies wanted to start offering a post-trade surveillance system but didn’t want to stand up a separate set of machinery/systems for each client.  They wanted to uncouple revenue growth from expense – it seemed like a perfect opportunity to build a clustered solution that required only additional hardware to scale.  The problem with many systems is that in order to scale, they have to be re-architected.  I wanted to avoid that.

 

The answer came in the form of mixing CEP and Hadoop.  We needed a way to distribute queries to the cluster and then reassemble the results – seemed like a perfect use case for map/reduce.

 

We got the system up and running in 2010; Darkstar did deploy to production in Mahwah, and we did have customers, but NYSE Technologies pruned the offering just slightly in advance of the ICE acquisition.

One change in the diagrams below should be noted – we no longer use MySQL as the repository.  That data now lives in a distributed column family within its own keyspace in Cassandra.

 

 

There are a couple of things that probably aren’t covered in this document as well as they could be – we ran Darkstar clusters with a replication factor of 3.  That means that every inbound message was processed (not just stored) three times, and the result of that computation was emitted.  There were no voting protocols in the system – we had to run at full speed in HA and DR.  I am very proud of that particular accomplishment – we had redundant compute in a shared-nothing, clustered deployment!

Overview

DarkStar™ is a multi-purpose, highly distributed processing platform designed by Cloud Event Processing.  DarkStar™’s architecture is based on Cloud principles, meaning it is designed to run in a highly distributed environment with no practical limit on the number of nodes the system may comprise.  DarkStar™ is not an application itself; it is a platform on which custom applications run without needing to know anything about the distributed nature of the underlying architecture.  DarkStar™ provides an underlying Event Processing Engine which dynamically accepts queries and statements injected by DarkStar™ applications and produces new Events from the combination of the events entering the system and the queries injected into it.

 

DarkStar™ may be thought of as the infrastructure underlying the custom applications designed to run on it.  DarkStar™ is designed to run on multiple physical and/or virtual nodes.  As each node joins the cluster, DarkStar™ uses a messaging protocol to announce the new node to the rest of the cluster, and load balancing occurs automatically based on the token the new node communicates to its peers.  This approach gives DarkStar™ the flexibility to act and react dynamically to the flow of events entering the system.  The workload of the Event Processing Engine is divided among the nodes in the cluster, giving DarkStar™ effectively unlimited scalability.

 

DarkStar™ provides applications which run on it the facility to inject queries dynamically into the underlying CEP engine, using RabbitMQ as the protocol for transmitting these queries and the result sets sent back to the querying application.
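
To make the query path concrete, here is a minimal Java sketch of how a client application might publish an EPL query to the cluster over RabbitMQ.  The broker host, the exchange name, and the EPL text are illustrative assumptions rather than the actual DarkStar™ routing topology; only the RabbitMQ client calls are standard.

import java.nio.charset.StandardCharsets;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class QueryInjector {
    public static void main(String[] args) throws Exception {
        // Connect to the RabbitMQ broker used by the cluster (host is illustrative).
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("rabbitmq.internal");

        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            // Hypothetical fanout exchange so every node receives the query.
            channel.exchangeDeclare("darkstar.queries", "fanout", true);

            // An EPL-style query; the exact dialect depends on the underlying engine.
            String epl = "select symbol, avg(price) from TradeStream.win:time(30 sec) group by symbol";
            channel.basicPublish("darkstar.queries", "", null,
                    epl.getBytes(StandardCharsets.UTF_8));
        }
    }
}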

DarkStar™ also makes use of several third-party applications and libraries, specifically:

  • Cassandra – Cassandra is a distributed database which is designed to run in a clustered environment, much like DarkStar™ itself.  Every node in the Cassandra cluster is aware of the other nodes of which the cluster is comprised, making it extremely fault tolerant.  DarkStar™ uses Cassandra as a repository for persisting messages sent into the cluster.  Messages received by the DarkStar™ cluster are written to Cassandra and may be viewed at a later time.

  • RabbitMQ – RabbitMQ is open-source message broker software which uses the AMQP standard.  The RabbitMQ server is written in Erlang and is built on the Open Telecom Platform framework for clustering and failover.  DarkStar™ uses the RabbitMQ message broker for communication between DarkStar™ and any DarkStar™ applications.

  • MySQL – DarkStar™ uses MySQL to store metadata about the messages the system will be receiving.  DarkStar™ uses this information to create various message streams which can be inspected by supported applications by injecting queries into DarkStar™.  Every message stream is defined by this metadata, allowing DarkStar™ the flexibility to support many different message types, or entirely new message types, simply by altering the data stored in MySQL before initializing DarkStar™.

The relationship of these components to one another is represented visually in the diagram below:

 

Figure 1: DarkStar™ Components

 

Architecture

Note in Figure 1 that the input from the DarkStar™ OnRamp is sent to the DarkStar™ cluster, and not to a specific DarkStar™ node.  The DarkStar™ client API allows client applications to send incoming messages to the cluster as a whole.  Since each node in the cluster is aware of the other nodes, this allows for custom partitioning of input without the client application having to know the network details of all of the nodes in the DarkStar™ cluster.  DarkStar™ spreads load among n nodes, where n is the total number of nodes in the cluster.  Incoming events are partitioned to a particular node based on the tokens specified for each node at startup.  For example, in a 6-node cluster which partitions incoming messages alphabetically, node 1 might handle messages beginning with A – D, node 2 messages beginning with E – H, node 3 messages beginning with I – M, node 4 messages beginning with N – Q, node 5 messages beginning with R – U, and node 6 messages beginning with V – Z.
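
As a rough illustration of that token-based routing (and not the actual DarkStar™ implementation), the following Java sketch maps a message key to the node whose range covers it.  The node names and the choice of a sorted map are assumptions made purely for the example.

import java.util.TreeMap;

// Each node registers a token (here, the first letter of the range it owns),
// and an incoming message is routed to the node whose range covers its key.
public class TokenRangePartitioner {
    private final TreeMap<Character, String> ring = new TreeMap<>();

    public void registerNode(char startOfRange, String nodeAddress) {
        ring.put(startOfRange, nodeAddress);
    }

    public String nodeFor(String partitionKey) {
        char first = Character.toUpperCase(partitionKey.charAt(0));
        // floorEntry finds the node whose range begins at or before this key.
        return ring.floorEntry(first).getValue();
    }

    public static void main(String[] args) {
        TokenRangePartitioner p = new TokenRangePartitioner();
        p.registerNode('A', "node1");  // A - D
        p.registerNode('E', "node2");  // E - H
        p.registerNode('I', "node3");  // I - M
        p.registerNode('N', "node4");  // N - Q
        p.registerNode('R', "node5");  // R - U
        p.registerNode('V', "node6");  // V - Z
        System.out.println(p.nodeFor("IBM"));   // node3
        System.out.println(p.nodeFor("XRX"));   // node6
    }
}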

 

 

In any given DarkStar™ cluster, all nodes are in constant communication with one another, and each node is aware of the range handled by every other node.  Because of this, if any one node fails, the cluster is aware of the failure and partitioning is automatically rebalanced among the remaining nodes in the cluster.

OnRamps

In order for DarkStar™ to process massive amounts of data it needs a way of acquiring that data.  Data enters the cluster in one of two ways:

  1. DarkStar™ applications

  2. DarkStar™ OnRamps

OnRamps are DarkStar™’s way of acquiring data from outside sources.  They acquire data from a given source, such as a socket connection or a database, parse that data for information, and inject the data into the DarkStar™ cluster.  OnRamps may be thought of as protocol-specific adapters.  For example, a FIX connection requires a different OnRamp than a connection to a Twitter feed or an Oracle database.

All OnRamps are similar in the way they inject data into the cluster.  When an OnRamp receives a message, it iterates over the fields of that message and adds each field to a map, with field names as the keys and field values as the map values.  Once all fields have been iterated over, the map is converted to a JSON object and sent into the DarkStar™ cluster via the DarkStar™ Client API, an RPC-based protocol.  Once the Event is received by the DarkStar™ cluster, it can be added to the streams of events being queried by client applications.
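
The following Java sketch shows that flow in miniature: fields become map entries, the map becomes JSON, and the JSON is handed to the cluster.  The DarkStarClient interface, the eventStream field name, and the use of Jackson for JSON conversion are all assumptions made for the example; they are not the actual Client API.

import java.util.LinkedHashMap;
import java.util.Map;

import com.fasterxml.jackson.databind.ObjectMapper;

public class ExampleOnRamp {

    /** Hypothetical client interface wrapping the RPC connection to the cluster. */
    public interface DarkStarClient {
        void send(String jsonEvent);
    }

    private final DarkStarClient client;
    private final ObjectMapper mapper = new ObjectMapper();

    public ExampleOnRamp(DarkStarClient client) {
        this.client = client;
    }

    /** Called once per inbound message, with the message already parsed into fields. */
    public void onMessage(Map<String, String> fields) throws Exception {
        // Field names become map keys, field values become map values.
        Map<String, Object> event = new LinkedHashMap<>(fields);

        // Illustrative: tag the event with the stream it should be injected into.
        event.put("eventStream", "TradeStream");

        // Convert to JSON and send it into the cluster via the client connection.
        client.send(mapper.writeValueAsString(event));
    }
}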

 

When DarkStar™ starts up, it opens an RPC connection on an IP address and port specified in the configuration file.  This connection is used to service requests from clients (e.g. OnRamps) using the DarkStar™ Client API.  Once an OnRamp connects to a DarkStar™ node using the Client API, it can send event messages into the cluster in the form of JSON objects.  These event messages are then parsed by DarkStar™ for the field specifying the value to partition on and, based on the value in this field, the message is sent to the appropriate node.  Internal communication between nodes such as this occurs on a different TCP connection than communication with external clients.  Again, the port on which this communication takes place is specified in the DarkStar™ configuration file.
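
The configuration format itself is not documented here, so the property keys in the short Java sketch below are purely hypothetical; it only illustrates the idea of a client-facing RPC address and a separate port for intra-cluster traffic, both read from a configuration file at startup.

import java.io.FileInputStream;
import java.util.Properties;

public class NodeConfig {
    public static void main(String[] args) throws Exception {
        Properties config = new Properties();
        try (FileInputStream in = new FileInputStream("darkstar.properties")) {
            config.load(in);
        }
        // Address and port for the client-facing RPC connection (OnRamps, applications).
        String clientHost = config.getProperty("client.rpc.host", "0.0.0.0");
        int clientPort = Integer.parseInt(config.getProperty("client.rpc.port", "9090"));

        // Separate port for node-to-node communication within the cluster.
        int internalPort = Integer.parseInt(config.getProperty("cluster.internal.port", "7000"));

        System.out.printf("client RPC on %s:%d, internal traffic on port %d%n",
                clientHost, clientPort, internalPort);
    }
}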

 

Once an event message has been received by the correct node, the message is injected into the event stream specified in the message.  This allows the message to be processed by the Event Processing Engine.

 

 

Notice in Figure 2, below, how the DarkStar™ OnRamp sends messages into the cluster, which then varies the receiving node by means of a “Round Robin” partitioner.  Each message is then passed on to the node which is handling that symbol range.  In the case of Message 3, the message is initially received by the node which handles the range Message 3 falls into, so it is not then forwarded to another node as Messages 1 and 2 are.

 

Figure 2: DarkStar™ Message Partitioning

 

Event Processing Engine

An Event Processing Engine is a piece of software that accepts discrete events and performs operations on those events based on the queries that have been injected into the engine.  When DarkStar™ starts up, it connects to a MySQL database with parameters specified in the DarkStar™ configuration file.  From this database it loads the definitions of every Event Stream in the system and dynamically creates those Event Streams.  When new event types are added to the system, it is only necessary to add their definitions to the MySQL database DarkStar™ is using for configuration, and those event types will automatically be added to the system.  As each event type is added, an Event Stream is added to the Event Processing Engine to contain events of that type.  When an event message is received by DarkStar™, it is added to the Event Stream specified in the message so that it can be compared with other events.
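
As a sketch of what loading those definitions might look like, a startup step could read the metadata over JDBC and build a stream-name-to-field-definition map before registering each stream with the engine.  The table and column names below are hypothetical; the real schema is whatever the configuration database defines.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.LinkedHashMap;
import java.util.Map;

public class StreamDefinitionLoader {

    public Map<String, Map<String, String>> load(String jdbcUrl, String user, String password) throws Exception {
        // stream name -> (field name -> field type)
        Map<String, Map<String, String>> streams = new LinkedHashMap<>();
        try (Connection conn = DriverManager.getConnection(jdbcUrl, user, password);
             PreparedStatement stmt = conn.prepareStatement(
                     "SELECT stream_name, field_name, field_type FROM stream_definition");
             ResultSet rs = stmt.executeQuery()) {
            while (rs.next()) {
                streams.computeIfAbsent(rs.getString("stream_name"), k -> new LinkedHashMap<>())
                       .put(rs.getString("field_name"), rs.getString("field_type"));
            }
        }
        // Each entry would then be registered with the Event Processing Engine
        // as a new Event Stream; that registration call is DarkStar-internal.
        return streams;
    }
}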

 

 

The queries which are injected into DarkStar™ by client applications are instructions for the Event Processing Engine written in Event Processing Language (EPL).  These instructions allow client applications to filter and mine data for specific patterns without having to write complex code to do it.  Queries allow clients to specify which Event Stream or Streams they are interested in, filtering out any unnecessary noise, and also allow client applications to perform operations on the events they are interested in, such as aggregation and event comparison.  Queries are broadcast to all nodes in the cluster, using RabbitMQ as the transmission protocol.  In addition to handling new queries from client applications, DarkStar™ also supports the notion of “Named Queries”, whereby a query created at a given time may be referenced by other queries, allowing the stacking of queries for complex data mining.
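
The two statements below give a flavor of what injected EPL might look like, including a derived stream that later queries can build on.  The stream and field names (TradeStream, symbol, price, shares) and the exact EPL dialect are assumptions for illustration, not queries taken from the actual system.

public class ExampleQueries {
    // Derive a per-symbol notional stream from the raw trade stream.
    static final String NOTIONAL_STREAM =
        "insert into NotionalStream " +
        "select symbol, price * shares as notional from TradeStream";

    // Query the derived stream: flag unusually large notional over a sliding window.
    static final String LARGE_NOTIONAL_ALERT =
        "select symbol, sum(notional) as total " +
        "from NotionalStream.win:time(60 sec) " +
        "group by symbol " +
        "having sum(notional) > 10000000";
}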

 

Figure 3: Event Processing Engine

 

Data Persistence Layer

DarkStar™ persists all new events which enter the system, provided that those events have a corresponding table defined in the Cassandra database.  On startup, DarkStar™ attempts to get a connection to the Cassandra database specified in the DarkStar™ configuration file, and it maintains a background thread which continues trying to connect if it cannot do so right away.  Once an event enters the cluster, DarkStar™ writes that event to the corresponding event table in Cassandra.  The DarkStar™ platform may be modified to use a different tool for persistence via a persistence interface that defines a common contract for writing data, allowing different implementations to be swapped in if desired.
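
A persistence contract along these lines is one way such an interface could look.  The interface, method names, and exception type here are hypothetical, sketched only to show how a Cassandra writer or any other back end could be plugged in behind a common agreement.

public interface EventPersistence {

    /** Write one event, keyed by the stream it belongs to, to the backing store. */
    void persist(String eventStream, String jsonEvent) throws PersistenceException;

    /** True once a connection to the backing store has been established. */
    boolean isConnected();

    class PersistenceException extends Exception {
        public PersistenceException(String message, Throwable cause) {
            super(message, cause);
        }
    }
}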

DarkStar™ Applications

As previously noted, DarkStar™ is a platform which supports the implementation of custom applications written to make use of the DarkStar™ APIs.  At the most basic level, DarkStar™ applications are processes which create EPL statements, either through user interaction or based on the application code, and then inject those statements into the DarkStar™ cluster.  Each application can then define a set of behaviors that should occur when the application receives a callback from DarkStar™ with one or more events created as the result of the injected statement(s).

A single DarkStar™ application may generate and inject as many statements and/or queries as desired.  When the application receives a result from DarkStar™, it may then carry out custom behavior, defined within the application, based on that result.
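
A minimal application skeleton, under those assumptions, might look like the Java sketch below.  The DarkStarSession and ResultListener types and the inject() method are hypothetical stand-ins for the actual application API; only the overall shape (inject a statement, react to callbacks) follows the description above.

public class PriceSpikeMonitor {

    /** Hypothetical handle to the cluster, obtained by the application at startup. */
    public interface DarkStarSession {
        void inject(String eplStatement, ResultListener listener);
    }

    /** Hypothetical callback invoked with each event produced by the injected statement. */
    public interface ResultListener {
        void onResult(String jsonEvent);
    }

    public void start(DarkStarSession session) {
        String epl = "select symbol, price from TradeStream where price > 1000";
        session.inject(epl, jsonEvent -> {
            // Application-defined behavior: here we just log the matching event.
            System.out.println("Price spike: " + jsonEvent);
        });
    }
}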

DTOs

In order to support the transfer of data from incoming messages to the appropriate destination(s), DarkStar™ supports the use of Data Transfer Objects (DTOs): custom-built functions that are called at message arrival time and are specified by the messages themselves.

DTOs consist of functions created to specify message-arrival behavior on a per-message-type basis.  The .class or .jar file(s) are generated and placed in a directory which is on the classpath of the DarkStar™ JVM process.

 

Incoming messages may specify the DTO to be called by populating a pre-defined field in the incoming message with the fully-qualified name of the DTO.  When the message is received by DarkStar™, if that DTO has not yet been loaded, DarkStar™ will attempt to load the appropriate class(es).  If no DTO of the specified name is found by DarkStar™, or if the message does not specify a DTO, the default DTO will be called for that message.  The behavior of the default DTO is to persist the incoming message to the Cassandra database, after which the event is injected into the DarkStar™ event engine.
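
A sketch of what a DTO class might look like is shown below.  The Dto and EventContext interfaces, their method names, and the example class itself are hypothetical; the real contract is whatever DarkStar™ defines.  The handler simply mirrors the default behavior described above (persist, then inject) with one extra illustrative step.

/** Hypothetical contract for message-arrival handlers; the real one is defined by DarkStar. */
interface Dto {
    void handle(String jsonEvent, EventContext context);
}

/** Hypothetical services the platform could make available to a DTO. */
interface EventContext {
    void persist(String jsonEvent);
    void injectIntoEngine(String jsonEvent);
}

/** Example DTO, referenced from a message by its fully-qualified class name. */
public class AuditTrailDto implements Dto {
    @Override
    public void handle(String jsonEvent, EventContext context) {
        // Mirror the default DTO's behavior: persist first, then inject into the engine...
        context.persist(jsonEvent);
        context.injectIntoEngine(jsonEvent);
        // ...plus an illustrative extra step specific to this message type.
        System.out.println("audited: " + jsonEvent);
    }
}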

Summary

DarkStar™ employs an innovative cloud-based approach to the processing and analysis of large sets of data.  The underlying architecture used to accomplish this has been outlined above in some detail.  That architecture allows for nearly unlimited expansion and scalability, giving DarkStar™ the flexibility to be applied to nearly any problem set involving the processing and analysis of Big Data.