Ok.ru is one of the largest social networks for Russian-speaking audiences. It’s used across 220 countries and translated into multiple languages. We have 250 million registered users and roughly 80 million unique visitors monthly. We started in 2006 and have been growing fast ever since.
We started with Cassandra in 2010. Now, in 2014, Cassandra comprises much of our infrastructure, and the data volume there is growing fast: in 2013 we stored about 80 terabytes of data in our largest cluster (it stores private chat messages between users), and now, in 2014, it stores about 130 terabytes.
We now have 27 clusters and about 500 Cassandra nodes in total. Total data stored in Cassandra is approximately 300 terabytes.
Most of these clusters run our own branch of Cassandra, which I presented last year at Cassandra Summit Europe (video & slides). During the last year we worked hard on developing our new data storage solution, based on Cassandra 2.0. Along the way we made a number of performance and stability improvements to the 2.0 and even 2.1 codebases, and committed them back to the community, of course.
So now we’re excited to say that we have finally deployed it into production: we now have 2 new clusters running the new storage. But this storage is not just an adoption of Cassandra 2.0. The new storage is different:
Yes, it is mainly about supporting ACID (not lightweight) transactions: you can open a transaction, read and modify data in one or more tables, and then choose whether to commit or roll back. This storage is primarily targeted at replacing our legacy SQL DBMS for data with high consistency requirements.
Historically we have a lot of Microsoft SQL Server; until this year we had more than 200 nodes running, managing about 50 terabytes of data.
Microsoft SQL Server was in our infrastructure from the very beginning. Of course, it’s a very expensive data storage solution at our scale: we have to pay licensing costs and buy expensive hardware to make it highly available. But this expensive hardware, and the HA clusters running on it, cannot save us from an outage if one of our data centers fails – they are only good at surviving the failure of a single database node located close to the shared storage holding its data.
Another problem with Microsoft SQL is scaling out. Currently we use a simple in-house solution for sharding data among database servers. It does not support any automatic re-sharding, so adding a new server to a cluster involves a lot of manual work and requires downtime.
We have a lot of business logic around this data. A typical business operation reads data from one or more tables, modifies it, and writes it back to storage. Almost all of these operations require that no concurrent transaction can read or modify the same data until the current operation is committed or rolled back.
If you have ever tried to migrate legacy SQL ACID-style code to an eventually consistent NoSQL engine, you probably know that the amount of effort, and the way to do it, depends on application specifics. In our case, re-implementing all these operations without ACID transaction support in the data store would require a vast amount of effort; considering the new feature development going on concurrently in the same codebase, such a re-implementation was simply impossible.
So we started looking for a replacement for our MSSQL database with a reasonable balance between consistency, availability, and performance, running on commodity hardware.
Needless to say, we did not find anything that suited these needs at the time. Google’s Spanner could likely fit these requirements, but we could not find anyone to buy it from (ok, just joking). So we decided to develop our own solution.
Analyzing this problem, we faced 2 architectural choices from the very beginning: we could take a SQL storage and build scalability and availability on top of it, or we could take a NoSQL storage and build transactions and the necessary data query features from the SQL world on top of that.
Analyzing the problem further, we realized that we don’t need full SQL support – for performance reasons and because of data sharding, we don’t use JOINs, constraints, or queries on non-indexed data. We found that the set of SQL features we actually use is very much like what Cassandra 2.0’s CQL offers. Cassandra also has a proven record within our organization, with great scalability, availability, and performance since our very first installation back in 2010.
On the other hand, we found that developing replication with conflict resolution in a distributed environment on top of an RDBMS is not trivial and would require a much greater effort. So we elected to use Cassandra 2.0 as the data storage engine and develop transaction support ourselves.
And so our new DBMS was born; we named it C*One.
This is how it looks:
When we built our system, we broke it down into 3 main components: storage nodes, update coordinator nodes, and client nodes. Although the components have different responsibilities, they are all part of the same cluster: they use the Cassandra native messaging protocol to communicate with each other and exchange cluster information via Gossip.
The storage nodes form a Cassandra ring and are used for storing and retrieving data. These are very much like any Cassandra nodes out there in the wild. Their responsibility is to reliably store and retrieve data and to provide the ability to scale with data and load growth.
Update coordinator nodes
The update coordinators are what we developed ourselves. They are responsible for transaction and lock management.
The simplest way to achieve isolation in transactions is pessimistic locking of the data participating in them. In other words, a data row in a transaction has to be locked first, then read, modified, and written back to storage on commit. Only after the transaction is completed can it be unlocked, and only then are other transactions free to modify this row.
This sounds simple on non-distributed systems, but once you go distributed you basically have 2 choices: either implement some form of distributed locking across the cluster, or partition transactions all the way, so that transactions on the same row are always served by the same update coordinator. We decided to implement the latter, mainly because it is very similar to the sharding of the RDBMS we are replacing. It is also the most performant and reliable approach, but it comes with a restriction: a transaction can span only data served by the same transaction coordinator, thus forming a transaction group.
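The partitioning of transactions into groups can be sketched roughly like this. This is a minimal illustration, not C*One code: the function name, the hash choice, and the group count are all our assumptions here.

```python
import hashlib

def transaction_group(partition_key, num_groups=256):
    """Map a row's partition key to a transaction group (hypothetical sketch).

    Every transaction touching rows of the same group is routed to the
    same update coordinator, so that coordinator can manage locks for the
    whole group in local memory -- much like classic RDBMS sharding.
    """
    digest = hashlib.md5(repr(partition_key).encode("utf-8")).digest()
    return int.from_bytes(digest[:4], "big") % num_groups
```

A transaction spanning rows from two different groups would map to two different coordinators and is therefore disallowed – that is the restriction mentioned above.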
Update coordinator nodes form their own ring to define which transaction group is served by which node at any moment. To avoid a single point of failure (SPOF), they have their own “replication strategy”, so every transaction group can be served by several nodes. A leader election protocol then decides which node is the master, ensuring that only one node serves the transactions of a single group. This way, locking becomes as trivial as a concurrent hash map.
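Since all transactions of a group pass through that group’s single elected master, the lock table really can be as trivial as a map in process memory. A minimal sketch under that assumption (all names hypothetical, not C*One code):

```python
import threading

class GroupLockManager:
    """Per-transaction-group pessimistic row locks (illustrative sketch).

    Lives on the coordinator that is currently master for the group, so a
    plain dict guarded by a condition variable is enough: no distributed
    locking protocol is needed.
    """

    def __init__(self):
        self._cond = threading.Condition()
        self._owners = {}  # row_key -> id of the transaction holding the lock

    def lock(self, row_key, txn_id):
        with self._cond:
            # Re-entrant for the same transaction; otherwise block until free.
            while row_key in self._owners and self._owners[row_key] != txn_id:
                self._cond.wait()
            self._owners[row_key] = txn_id

    def unlock_all(self, txn_id):
        # Called on commit or rollback: release every row the txn holds.
        with self._cond:
            for key in [k for k, t in self._owners.items() if t == txn_id]:
                del self._owners[key]
            self._cond.notify_all()
```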
Another important responsibility of the update coordinator is managing the data a transaction operates on. We wanted to get as close as possible to the READ COMMITTED isolation level: modifications are not visible outside a transaction until it is committed, but are visible to the modifying transaction itself.
The update coordinator does not persist data to storage while a transaction is in progress; instead, it collects all modifications and applies them to a memtable that is local to the transaction. When reads are made within the transaction, data is read from the storage nodes and merged with the local memtable, resolving the actual state of the modified data. When the same data is read by another transaction, the local transaction’s changes are not applied.
On transaction commit, all collected mutations are written to the storage nodes. If the transaction rolls back, the collected mutations and the local memtable are simply discarded from memory.
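The collect-then-flush behavior can be sketched as follows, assuming a `storage` object with `get`/`put` methods (hypothetical names). Note that in C*One the real commit is a single atomic batch to the storage nodes, not the plain loop shown here:

```python
class Transaction:
    """Sketch of a transaction-local memtable (illustrative, not C*One code).

    Reads resolve the local memtable over storage, so a transaction sees
    its own uncommitted writes; other transactions never see them until
    commit flushes the collected mutations to storage.
    """

    def __init__(self, storage):
        self._storage = storage
        self._memtable = {}  # (table, key) -> row, local to this transaction

    def read(self, table, key):
        # Local, uncommitted modifications shadow the stored value.
        if (table, key) in self._memtable:
            return self._memtable[(table, key)]
        return self._storage.get(table, key)

    def write(self, table, key, row):
        # Collected, not persisted: storage is untouched until commit.
        self._memtable[(table, key)] = row

    def commit(self):
        # The real system applies this as one atomic batch via the batch log.
        for (table, key), row in self._memtable.items():
            self._storage.put(table, key, row)
        self._memtable.clear()

    def rollback(self):
        self._memtable.clear()  # nothing ever reached storage
```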
We use Cassandra’s batch log to ensure that committed data is written atomically, so there are no partially applied transactions.
What is also interesting is that client nodes are part of the Cassandra cluster too. They don’t use Thrift or the Netty-based protocol people usually use for clients; they use the same messaging protocol Cassandra nodes use to communicate with each other. By participating in Gossip exchange, clients are aware of cluster topology, node liveness – everything a normal Cassandra node is aware of. This allows us to implement the client as its own read coordinator. When a read is performed outside of an active transaction, the client has no reason to go through an update coordinator, so it reads from the storage nodes directly, saving latency and off-loading CPU-intensive read coordination tasks from storage nodes to hundreds of client nodes.
When inside a transaction, clients cannot read from storage directly, so read requests as well as mutations are routed to the appropriate update coordinator nodes.
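The routing rule a client applies is simple enough to express in a few lines (a sketch with hypothetical names; the real client resolves targets from its Gossip-learned topology):

```python
def pick_read_target(row_key, in_transaction, group_master, replicas):
    """Decide where a client sends a read (illustrative sketch).

    group_master(row_key) -> the master coordinator of the row's
    transaction group; replicas(row_key) -> the storage replicas for the
    row. Both are hypothetical topology lookups.
    """
    if in_transaction:
        # Must observe the transaction's local memtable, so the read has
        # to go through the group's master update coordinator.
        return ("coordinator", group_master(row_key))
    # Outside a transaction the client is its own read coordinator and
    # talks to storage replicas directly.
    return ("storage", replicas(row_key))
```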
As we mentioned above, fast and consistent index access to business data is critical functionality for any OLTP system. We wanted our indexed reads to work as fast as possible.
To provide indexed reads, Cassandra has a secondary indexes feature. But these are not for OLTP: they are mainly meant for analytic queries (every node is queried for each indexed query, and each node reads from its index SSTable and then makes a random read for each row found in the index). The usual workaround for OLTP use cases is to create a copy of the data, ordered by the index expression, and write custom code that updates both column families – the main one and the copy. But we don’t want our developers writing custom code for each and every index, so we developed our own “global indexes” feature.
A global index is basically a copy of the original table’s data – it has the same or fewer columns than the original table, but a different partition and clustering key. This partition and clustering key composes the indexing expression on the original table’s data.
A global index can be created by simply issuing a CREATE GLOBAL INDEX statement. The index then becomes part of the data schema. Update coordinator nodes are aware of these indexes and automatically generate the necessary mutations for all affected indexes whenever the original table is modified by a transaction. These additional mutations become part of the same transaction that modifies the original table. Thanks to Cassandra’s very fast writes, this does not add much latency to write operations, but it makes reads dramatically faster.
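The mutation expansion an update coordinator performs can be sketched like this. The schema representation (`index_defs` mapping a table to its index tables and their key columns) and all names are our illustrative assumptions:

```python
def index_mutations(index_defs, table, row):
    """Derive the extra mutations a write to `table` implies (sketch).

    index_defs: {table: {index_table: [key_columns...]}} -- a hypothetical
    in-memory view of the schema's global indexes. Each global index is
    just another table holding a (possibly narrower) copy of the row,
    keyed by the indexed expression.
    """
    mutations = [(table, row)]  # the original write itself
    for index_table, key_columns in index_defs.get(table, {}).items():
        index_row = dict(row)  # same or fewer columns than the original
        # Re-key the copy by the indexing expression; because this key
        # differs from the original partition key, the index record may
        # land on a different storage node.
        index_row["_partition_key"] = tuple(row[c] for c in key_columns)
        mutations.append((index_table, index_row))
    return mutations
```

All returned mutations join the same transaction, which is what keeps the index consistent with the base table.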
Changing an original table and all of its indexes involves modifying data served by different nodes of the storage cluster (because the partition keys of the original table and the index table differ, a row and its index record may be located on different nodes). Without an update coordinator in place, it would be very difficult to guarantee consistent and efficient changes; even implementing custom code to manage indexed data becomes tricky and error prone.
We also extended the CQL SELECT statement so it detects the presence of a global index on a table and uses it for query execution without any additional hassle, much like a good old SQL executor does.
Having the consistency guarantees provided by update coordinators simplifies a lot of everyday challenges: for example, implementing a simple and fast monotonically increasing sequence generator (which we need for cases where using a UUID is not possible) is straightforward in C*One and very hard to implement efficiently in plain Cassandra.
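For illustration, here is how a coordinator-owned sequence generator might look. Because the group’s single master coordinator owns the counter, it can reserve blocks of ids with one durable write and then hand out ids from memory. The class, its names, and the block size are assumptions, not C*One internals:

```python
import threading

class SequenceGenerator:
    """Monotonic sequence sketch for a single master coordinator.

    Reserves BLOCK ids at a time in durable storage, so most next() calls
    never touch the storage nodes. After a master failover, the new
    master reserves from the stored high-water mark: unused ids from the
    old block are skipped, but the sequence stays strictly increasing.
    """

    BLOCK = 1000

    def __init__(self, storage, name):
        self._storage, self._name = storage, name
        self._lock = threading.Lock()
        self._next = self._limit = 0

    def next(self):
        with self._lock:
            if self._next == self._limit:
                # Reserve a new block with a single durable write.
                current = self._storage.get(self._name) or 0
                self._limit = current + self.BLOCK
                self._storage.put(self._name, self._limit)
                self._next = current
            self._next += 1
            return self._next
```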
We have 2 clusters deployed into production with Cassandra using ACID transactions. One of the clusters stores the descriptive information about user videos: for instance, who owns a video, what its description is, and so on. There are not that many videos (not counting their binary streams, of course) – only about 50 million – so we chose video as the first application of C*One.
The other cluster stores the same kind of descriptive information about pictures uploaded to the social network. This chunk of data is much bigger: we currently have about 11 billion photos, taking up about 15 terabytes in the new storage. Thanks to the migration we completed recently, this very important information is now always available.
More data migrations and new features built on C*One storage are currently in development by application teams inside ok.ru.
The solution has been running in production for several months now. It has proven its reliability through real failures of individual components as well as partial data center failures.
We are very optimistic about deploying it further, both for data migrations from legacy SQL servers and for the development of new product features on top of a storage that balances cost, reliability, and ease of development the right way for us.
C*One is much cheaper than Microsoft SQL Server: you don’t need to buy shared storage hardware configurations, you don’t have to pay licensing costs, and you can host your data on commodity hardware. Our current solution costs half as much as the legacy SQL solution, with more than 3x the capacity included for free.
It is also much more reliable and fault tolerant: even with a data center failure you can still read and write your data – something you just cannot achieve with MS SQL Server.
Additionally, we are seeing greater performance as well. After migrating from SQL Server to the new storage, the latency of common operations on this data dropped by 2 to 6x during regular operation of the cluster. Scaling is also much simpler than before the migration: thanks to Cassandra, scalability is almost unlimited, very simple for operators, and can happen at any time without interruption.
The consistency model of “lock something in a transaction and do your modifications” is very simple and familiar to developers, so they adapt quickly and don’t need to redesign their business logic. The similarity of CQL to the well-known SQL adds to the benefits as well.