Saturday, 31 December 2016

Why Distributed Real-Time Stream Processing must have a Persistent Local/Embedded Store like RocksDB

Sometime back in year 2013, Facebook announced to open source their embedded storage:  RocksDB  
Initially, I could not realise the significance of that and thought its just another solution for their set of problems.
Lately, started noticing RocksDB being mentioned a lot frequent in recent times during discussions of distributed stream processing frameworks like Apache Flink, Kafka Streams, Samza ,etc.
This got me curious.

Whats making RocksDB so desirable in Distributed Stream Processing?
The answer I got after some analysis was: Need of Persistent Local Store in Distributed Stream Processing. RocksDb seems to be the only open source solution for that at this point of time.

This post is about understanding the Need of Persistent Local Store in Stream Processing world . (will be using terms “persistent local store” and “embedded store” interchangeably)

What is Embedded Store?

Embedded Store in distributed computing framework is essentially a local database in each of the nodes where the actual processing is happening. Its like moving the database adjacent to processing. The tasks running in each of the processing node can locally read and update the database instead of communicating to a remote database over the network.

When to have Embedded Store ?

Embedded Store is needed when a distributed stream processing :
1. is stateful : when processing of a record depends upon the result of previous computed records and will affect the future records' processing. In this case state of records need to be preserved . e.g. computing url hit counts till now since start.2. need to process lot of events/sec with high throughput and low latency. e.g. procesing request,impression,click type of events from millions of mobile devices

Why to have Embedded Store ?

In a Distributed Stateful Stream processing engine with high throughput like Flink or Kafka Streams, there are multiple worker nodes where partitioned data streams keep on coming from data sources like Kafka for processing. The tasks on the nodes do the meaningful computation (like url click count: url as key, count as value) over period of time for which they need to maintain state for each key.
This state they need to get from (and save to) persistent storage. Also they may need metadata info stored in database they need to attribute in records while processing.
These things require a database, a database which is performant and does not become bottleneck while doing concurrent high load read/update from different task nodes . A distributed database like Cassandra,dynamoDB can solve this problem and ensure they are highly available and super fast to access.
Still there is an issue which can degrade the performance: the network bandwidth issue while accessing a remote database.
Latency to access a remote database over network is fixed and can limit throughput of a processing engine no matter how much throughput it has been designed for. 
The throughput of the stream processing job will end up being determined by its slowest component, which will almost always end up being the remote random access lookups or updates.
This is where idea of local db or embedded storage kicks in.
Over time, number of cores in machines have been increased significantly, SSDs in place of magnetic disks are used which makes processing and access inside a single machine super fast today.

Just like Hadoop solved network bandwidth problem in Batch processing by data locality by moving processing close to data, in the same fashion local/embedded store seeks to solve network bandwidth problem of retrieving/updating database for stream/realtime processing by moving database closer to processing.
Local store essentially means co-partitioning the processing and database .

Facebook provided an open source option of embedded db: RocksDB , which is implemented on top of Google LevelDB.

Some instances of RocksDB being used in today’s streaming frameworks (referenced from talks & posts):
Apache Flink
Jamie Grier, the Flink guy talks in this video to use rocksDB to use as local store to use db as source of streams.

Kafka Streams:
The Confluent Kafka Streams says about using RocksDB as local store for saving state for tasks :

Apache Samza:
LinkedIn talks about using RocksDB for local store in Samza to store upto 1.5 TB data in a single node.

RocksDB solves problem of network bandwidth but creates another concern.

What if that node itself fails and another node needs to take over the responsibility ? 
For such cases, the local store also needs to be persisted somewhere. One good way is to record the changes in the local store as stream of changes called changelog (or commitlog) in Kafka world. If you are not exactly sure what a changelog is, I highly recommend to go through this post of Kafka creator,Jay Kreps explaining very well in detail: 
This changelog is written to a replicated Kafka topic internally which can be used to bring up a new node with same local state as the previous one. 
Also this changelog is not allowed to ever grow using a known Kafka feature called Log Compaction ( which ensures only the latest value is maintained for each key over a period of time.
In this way both Reads and Writes can be made super fast by using local persistent store along with the fault tolerance being guaranteed by the changelog. The approach above may sound specific to Kafka which has emerged as de-facto distributed messaging system for almost every distributed processing system out there but the intent and concept can be applied to other messaging systems as well.

One thing to notice in this discussion is that there is no mention of Apache Spark Streaming and Apache Storm.
The reason is, as far as I know, these 2 solutions don't use any persistent embedded store currently. 
Spark Streaming until 2.0 release is not a streaming frameworks and is actually micro batch processing with state checkpointing and WAL (write ahead log) mechanisms which are not very robust especially in cases of restart with code changes. With complete revamp in approach and implementation post 2.0 release, Spark Streaming looks like true streaming but is not production ready and have a long way to go before it looks comparable to its competitor Apache Flink .
Apache Storm, as per 2.0 docs ( ) have checkpoint mechanism in bolt through Redis key-value store but again its not an embedded storage solution.
Finally its time to sum up the advantages and disadvantages discussed above in bullet points :
Local data access is faster than remote store :  avoids network bandwidth issue
Remote database is not overloaded by concurrent requests . No concurrent issues as every writer is making updates to its own local store.
Local data store works in isolation so any issue with the local store does not affect other local stores and remote store.
True parallelism of data access and data updates with fault tolerance ensured. 

Significant understanding ,effort and set up required than simply calling the remote database for each access request.
Data duplication as same data is there in remote db as well as local store.
If local state size is big, time to restore the state during a restart also takes time.


The discussion I made in this post are inspired from this Facebook post and this Oreilly post .
More and more streaming frameworks adopting these ideas in their implementation. The objective of this post is to make things clear to people who are new to streaming world concepts. I hope this will help them someway.
RocksDB is the only open source solution for embedded data store at this time as no company other than Facebook has invested much in this area. But there will be definitely more solutions to come in future. Important thing is to understand the problems they are trying to solve and keep a swatch.


1 comment:

  1. Nice and Helpful article.. After reading this article i got more useful and new information about hadoop from this article.. thank you for sharing..

    big data training in chennai | big data training and placement