Saturday, 31 December 2016

Why Distributed Real-Time Stream Processing must have a Persistent Local/Embedded Store like RocksDB

Sometime back in 2013, Facebook announced that it was open-sourcing its embedded storage engine: RocksDB.
Initially, I could not see the significance of that and thought it was just another solution for their particular set of problems.
Lately, I started noticing RocksDB being mentioned frequently in discussions of distributed stream processing frameworks like Apache Flink, Kafka Streams, Samza, etc.
This got me curious.

What's making RocksDB so desirable in distributed stream processing?
The answer I got after some analysis was: the need for a persistent local store in distributed stream processing. RocksDB seems to be the only open-source solution for that at this point in time.

This post is about understanding the need for a persistent local store in the stream processing world. (I will use the terms “persistent local store” and “embedded store” interchangeably.)

What is Embedded Store?

An embedded store in a distributed computing framework is essentially a local database on each of the nodes where the actual processing happens. It's like moving the database adjacent to the processing. The tasks running on each processing node can locally read and update the database instead of communicating with a remote database over the network.

When to have Embedded Store ?

An embedded store is needed when distributed stream processing:
1. is stateful: when processing a record depends on the results of previously computed records and will affect the processing of future records. In this case, the state of records needs to be preserved, e.g. computing URL hit counts since the start.
2. needs to process lots of events/sec with high throughput and low latency, e.g. processing request, impression and click events from millions of mobile devices.
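The stateful case above (URL hit counts) can be sketched with a toy in-memory state; the class and event names here are illustrative, not from any real framework:

```java
import java.util.HashMap;
import java.util.Map;

public class UrlHitCounter {
    // State that must survive across records: url -> count so far
    private final Map<String, Long> state = new HashMap<>();

    // Processing one record depends on the state built from all previous records
    public long process(String url) {
        long updated = state.getOrDefault(url, 0L) + 1;
        state.put(url, updated);
        return updated;
    }

    public static void main(String[] args) {
        UrlHitCounter counter = new UrlHitCounter();
        String[] events = {"/home", "/cart", "/home", "/home"};
        for (String url : events) {
            counter.process(url);
        }
        // "/home" was already seen 3 times in the stream
        System.out.println(counter.process("/home")); // prints 4
    }
}
```

In a real job this `state` map is exactly what must live somewhere durable: either a remote database or, as argued below, a persistent local store.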

Why to have Embedded Store ?

In a distributed stateful stream processing engine with high throughput, like Flink or Kafka Streams, there are multiple worker nodes to which partitioned data streams keep coming from data sources like Kafka for processing. The tasks on the nodes do the meaningful computation (like URL click count: URL as key, count as value) over a period of time, for which they need to maintain state for each key.
This state they need to read from (and save to) persistent storage. They may also need metadata stored in a database to enrich records while processing.
These things require a database: one which is performant and does not become a bottleneck under concurrent, high-load reads/updates from different task nodes. A distributed database like Cassandra or DynamoDB can solve this problem while being highly available and fast to access.
Still, there is an issue which can degrade performance: the cost of accessing a remote database over the network.
The latency of accessing a remote database over the network is fixed and can limit the throughput of a processing engine no matter how much throughput it was designed for. For example, at 1 ms per remote round trip, a single task thread is capped at roughly 1,000 lookups or updates per second.
The throughput of the stream processing job will end up being determined by its slowest component, which will almost always be the remote random-access lookups or updates.
This is where the idea of a local DB, or embedded storage, kicks in.
Over time, the number of cores per machine has increased significantly, and SSDs have replaced magnetic disks, which makes processing and data access within a single machine very fast today.

Just like Hadoop solved the network bandwidth problem in batch processing through data locality (moving processing close to the data), the local/embedded store seeks to solve the network bandwidth problem of reading/updating a database in stream/real-time processing by moving the database closer to the processing.
A local store essentially means co-partitioning the processing and the database.
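Co-partitioning can be sketched as routing each key through the same partitioning function both for processing and for its local store, so a task only ever touches the store on its own node. This is a toy illustration; the partitioning function and constants are assumptions:

```java
public class CoPartitioning {
    static final int NUM_PARTITIONS = 4;

    // The same function decides which task processes a key
    // and which local store holds that key's state.
    static int partitionFor(String key) {
        return Math.abs(key.hashCode() % NUM_PARTITIONS);
    }

    public static void main(String[] args) {
        String key = "http://example.com/page";
        int processingTask = partitionFor(key);
        int localStorePartition = partitionFor(key);
        // Processing and state for a key always land on the same partition,
        // so state lookups/updates never have to cross the network.
        System.out.println(processingTask == localStorePartition); // prints true
    }
}
```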

Facebook provided an open-source option for an embedded DB: RocksDB, which is built on top of Google's LevelDB.

Some instances of RocksDB being used in today's streaming frameworks (referenced from talks and posts):
Apache Flink:
Jamie Grier of the Flink team talks in this video about using RocksDB as a local store and treating the database as a source of streams.

Kafka Streams:
The Confluent Kafka Streams documentation describes using RocksDB as the local store for saving task state:

Apache Samza:
LinkedIn talks about using RocksDB as the local store in Samza, storing up to 1.5 TB of data on a single node.

RocksDB solves the network bandwidth problem, but it creates another concern.

What if that node itself fails and another node needs to take over its responsibility?
For such cases, the local store also needs to be persisted somewhere. One good way is to record the changes to the local store as a stream of changes, called a changelog (or commit log) in the Kafka world. If you are not exactly sure what a changelog is, I highly recommend going through this post by Kafka creator Jay Kreps, which explains it very well in detail:
This changelog is written internally to a replicated Kafka topic, which can be used to bring up a new node with the same local state as the previous one.
Also, this changelog is never allowed to grow unbounded, thanks to a well-known Kafka feature called Log Compaction, which ensures only the latest value is retained for each key over a period of time.
In this way, both reads and writes can be made very fast by using a local persistent store, with fault tolerance guaranteed by the changelog. The approach above may sound specific to Kafka, which has emerged as the de facto distributed messaging system for almost every distributed processing system out there, but the intent and the concept can be applied to other messaging systems as well.
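Putting the pieces together, a changelog-backed local store can be sketched as follows: every local write is also appended to a changelog, the changelog is compacted to keep only the latest value per key, and a replacement node rebuilds its local state by replaying the compacted changelog. This is a toy in-memory model; a real system would use RocksDB for the store and a compacted Kafka topic for the changelog:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class ChangelogStore {
    // Local store on the processing node (stand-in for RocksDB)
    private final Map<String, Long> local = new LinkedHashMap<>();
    // Durable changelog of every update (stand-in for a replicated Kafka topic)
    private final List<Map.Entry<String, Long>> changelog = new ArrayList<>();

    public void put(String key, long value) {
        local.put(key, value);                 // fast local write
        changelog.add(Map.entry(key, value));  // durable record of the change
    }

    // Log compaction: keep only the latest value for each key
    public Map<String, Long> compacted() {
        Map<String, Long> latest = new LinkedHashMap<>();
        for (Map.Entry<String, Long> e : changelog) {
            latest.put(e.getKey(), e.getValue());
        }
        return latest;
    }

    // A replacement node restores local state by replaying the compacted changelog
    public static Map<String, Long> restore(Map<String, Long> compactedLog) {
        return new LinkedHashMap<>(compactedLog);
    }

    public static void main(String[] args) {
        ChangelogStore store = new ChangelogStore();
        store.put("/home", 1);
        store.put("/home", 2);  // overwrites the earlier value in the compacted view
        store.put("/cart", 1);

        // Node failed; a new node rebuilds the same local state
        Map<String, Long> rebuilt = restore(store.compacted());
        System.out.println(rebuilt.get("/home")); // prints 2
    }
}
```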

One thing to notice in this discussion is that there is no mention of Apache Spark Streaming or Apache Storm.
The reason is that, as far as I know, these two solutions don't use any persistent embedded store currently.
Spark Streaming, until the 2.0 release, is not a true streaming framework; it is actually micro-batch processing with state checkpointing and WAL (write-ahead log) mechanisms, which are not very robust, especially in cases of a restart with code changes. With the complete revamp in approach and implementation after the 2.0 release, Spark Streaming looks like true streaming, but it is not production-ready and has a long way to go before it is comparable to its competitor, Apache Flink.
Apache Storm, as per the 2.0 docs ( ), has a checkpointing mechanism in bolts backed by a Redis key-value store, but again, that is not an embedded storage solution.
Finally, it's time to sum up the advantages and disadvantages discussed above in bullet points.
Advantages:
Local data access is faster than a remote store: it avoids the network bandwidth issue.
The remote database is not overloaded by concurrent requests, and there are no concurrency issues, as every writer makes updates to its own local store.
Each local store works in isolation, so an issue with one local store does not affect other local stores or the remote store.
True parallelism of data access and data updates, with fault tolerance ensured.

Disadvantages:
Significantly more understanding, effort and setup required than simply calling the remote database for each access request.
Data duplication, as the same data lives in the remote DB as well as the local store.
If the local state is big, restoring it during a restart takes time.


The discussion in this post is inspired by this Facebook post and this O'Reilly post.
More and more streaming frameworks are adopting these ideas in their implementations. The objective of this post is to make things clear to people who are new to streaming-world concepts. I hope this helps them in some way.
RocksDB is the only open-source solution for an embedded data store at this time, as no company other than Facebook has invested much in this area. But there will definitely be more solutions to come in the future. The important thing is to understand the problems they are trying to solve and keep a watch.


Sunday, 11 September 2016

AVRO Data Processing in Spark Streaming + Kafka

For a high-load real-time processing requirement, the Kafka + Spark Streaming combination is an obvious choice today as an ingestion-processing pipeline.
Adding an efficient data serialisation format like Avro to this combination can significantly reduce network traffic from the Kafka broker to the Spark workers, improving bandwidth usage, and can ensure faster processing in the Spark executors, improving CPU utilisation.
In my current project, I recently switched from JSON input data to Avro data for the reasons stated above, and I observe much better throughput.
In this post, I will explain how to ingest and process binary Avro data in a Spark Streaming + Kafka system, along with code samples.

Short Intro :

Apache Avro is a data serialisation framework developed for big-data distributed systems. Avro defines a schema in advance using JSON and subsequently encodes data records in a compact binary format based on that known schema. It is similar to frameworks like Thrift, but with advantages like dynamic typing and untagged data. More details at:
A sample Avro schema looks like this (copied shamelessly from the wiki: ):
{
   "namespace": "example.avro",
   "type": "record",
   "name": "User",
   "fields": [
      {"name": "name", "type": "string"},
      {"name": "favorite_number",  "type": ["int", "null"]},
      {"name": "favorite_color", "type": ["string", "null"]}
   ]
}

The schema is instantiated as:
Schema.Parser parser = new Schema.Parser();
Schema schema = parser.parse(schemaString); //schemaString is json as described above

The schema defines the field names and data types. The receiver of Avro data needs to know this schema once before starting processing; there are two ways to do that:

  1. When writing Avro data to a file, the schema is stored in the file too and can be read while processing the file.
  2. When doing RPC data transfer, like Kafka to Spark, the schema is exchanged once during an initial handshake, and subsequently only binary Avro data is exchanged.
We will cover the relevant second case in detail in this post.
For a better understanding of Spark Streaming and Kafka separately, I would recommend going through my earlier posts on Spark Streaming.

Important Utilities for Avro:

There are two useful utilities which help in an easy and configurable Avro implementation in Spark Streaming. We should definitely get an idea about them first:
  1. Avro Schema Registry 
  2. Twitter Bijection library 

Avro Schema Registry :

I will keep this section concise. The Avro schema needs to be configurable, not statically defined as in the last section. This is needed for easy sharing and for handling changes in the schema. There is a module named schema-repo on GitHub, by LinkedIn, which provides a RESTful service to store and retrieve schemas:
In Spark Streaming, we can fetch the Avro schema from the REST service using an HTTP client. I have shared the code; please refer to the getLatestSchemaByTopic() method at:

Confluent has also open-sourced a RESTful interface for storing and retrieving Avro schemas; you can access the details at:
No matter which library we use, the Avro schema registry should ultimately be configurable to support schemas evolving with time.

Twitter Bijection library :

Avro provides APIs to serialize/deserialize data, but they are not very friendly. Instead, Twitter provides the Bijection library, with APIs to convert back and forth between two data types; it is much easier to use in the case of Avro as well. Let's see an example to understand.
First, we need to instantiate an Injection object, which provides two APIs: apply() and invert(). The apply() method converts the original record to binary format, while the invert() method does the opposite.

//creating Injection object for avro
Injection<GenericRecord, byte[]> recordInjection = GenericAvroCodecs.toBinary(schema);

// serializing a generic avro data record into binary format:
GenericData.Record avroRecord = new GenericData.Record(schema);
avroRecord.put("name", "Chandan");
avroRecord.put("favorite_number", 10);
avroRecord.put("favorite_color", "Black");
byte[] bytes = recordInjection.apply(avroRecord);

// de-serializing a binary avro data record:
GenericRecord record = recordInjection.invert(bytes).get();
String name = (String) record.get("name");
String color = (String) record.get("favorite_color");
int number = (int) record.get("favorite_number");

If you want to know more about Bijection, refer to:

Avro + Spark Streaming + Kafka :

Now that we have an idea of Avro and the associated utilities, it's time to write the Spark Streaming program to consume and process Avro data from Kafka. For performance reasons, I have used the DirectKafka approach with custom offset management using ZooKeeper (for details, refer to my last post: )
First, we need to include the Avro dependencies in pom.xml, in addition to the Spark and Kafka ones:

Then we need to define a method to get or create the DirectKafkaStream.
An important point to note here is that since the values in the Kafka messages are now binary, we need to use the DefaultDecoder instead of the StringDecoder in the method signature, and the Byte class in place of the String class for the Kafka message values.

The actual method, where all the processing of the Avro data happens, will look something like this.

The above processing logic will be executed for each record of each RDD inside a Spark executor.
It is important to note in the code that for each RDD, the Avro schema is fetched and the Injection object instantiated only once, before processing the records in that RDD. Also, the code above assumes that even if multiple topics are present, all topics carry the same Avro schema.

The complete code can be accessed from github repo :

Conclusion :

This was all about how to fetch and process Avro data in Spark Streaming using Kafka. One area I have not covered is how to produce Avro Kafka messages on the Kafka producer side. The code for that should be simple, along the same lines as serializing a generic Avro data record into binary format.

Sunday, 21 August 2016

Upgrading a Running Spark Streaming Application with Code Changes with Zero-Data-Loss

Spark Streaming is one of the most reliable (near) real-time processing solutions available in the streaming world these days. It comes with ease of development and continuously evolving features, with very good community support.
One of the most important concerns for any streaming solution is how it tackles failure situations, like the application going down due to some issue or error. Because the data is real-time and keeps coming even while the application is down, it's really important to handle the backlog data before processing current data, in order to avoid data loss once the application is back to life. Together with Kafka, a distributed messaging system capable of replaying past data, Spark Streaming handles failure situations quite nicely.
Restarts of the system are normal and not just the result of failures. Upgrading a running streaming application with code changes, configuration changes, new features, etc. is needed from time to time. It requires a graceful shutdown of the streaming application, deployment of the changes, and then a restart of the application to continue from the same point where it left off.
There are quite a few challenges involved in doing such upgrades compared to failure events.
In this post, I want to discuss my experience and learnings dealing with them.

Background : Fault Tolerance, Checkpointing, DirectKafka  

It's always better to start with some background before jumping directly into the problem statement itself.
Fault tolerance means dealing with failure scenarios where the application goes down and has to be restarted to continue processing. Upgrading means an intentional restart of the system to deploy new changes. One thing common to both situations is that both need a system restart. But the caveat is that both cannot be handled the same way.
For fault tolerance, Spark Streaming came up with checkpointing:
Checkpointing keeps persisting necessary information about the streaming system, like metadata and state data, to a reliable storage system (disk/HDFS/S3) at regular intervals. Using Kafka with the DirectKafka approach for ingestion makes the system robust, as it avoids storing the actual data and stores just the Kafka offsets, which can be used to replay the Kafka messages after a restart. (Unlike the inefficient receiver-based approach; more details at: )
To enable checkpointing and DirectKafka, the user needs to define a checkpoint directory in code, call the KafkaUtils.createDirectStream API to get Kafka messages, and then call the StreamingContext.getOrCreate API to get the context either from the checkpoint directory or by creating a new one.
The code for Checkpointing + DirectKafka looks like this :

With the minimal code change above, Spark Streaming can give exactly-once processing, provided the processing is idempotent or behaves like an atomic transaction. For example, in my case the processing was idempotent, as the stream processing was doing a computation and then inserting/updating records in a database based on a primary key.

Challenge with Upgradation :

The reason I started with that much detail on fault tolerance and checkpointing is that many people trying to upgrade a running Spark Streaming application for the first time can be mistaken. I made the initial mistake myself by assuming I could just stop the application gracefully, deploy the new jar with code/configuration changes, and checkpointing would take care of the rest. This is a wrong assumption. The code changes may or may not be reflected, or even worse, some exception can occur. The configuration changes will not be picked up at all.
I saw similar questions on StackOverflow where people were trying to use checkpointing for upgrades:
It's important to understand that the sole objective of checkpointing is fault tolerance. When we enable checkpointing, it basically means that Spark Streaming will save the Kafka offsets, as well as the entire metadata (like the configuration) and stateful data (like updateStateByKey state), to persistent storage. So on restart, the Kafka partition offsets will be fetched from storage, and the executors will ask the Kafka brokers to start replaying data from those offsets. But at the same time, the SparkConf object will also be rebuilt by deserialising the saved metadata from disk (check the StreamingContext.getOrCreate() method implementation: ). The streaming context will be created using that same deserialised SparkConf object. Once the streaming context is up, no changes made to the SparkConf can be reflected in it. So, basically, updated configuration changes cannot be considered.
In short, we can rely on checkpointing for a failure restart, but we cannot rely on checkpointing for an upgrade restart.

Solution for Upgradation :

So how can we upgrade without checkpointing?
The easiest way is to delete the checkpoint directory itself before deployment, so that a fresh streaming context is created. But it's not a good solution, as the Kafka offsets will also be deleted, and data from Kafka will then be fetched either from the latest offset or the smallest offset in the Kafka broker, depending on the auto.offset.reset parameter. That would mean either a hell of a lot of re-computation or substantial data loss, which might not be acceptable.

So how can we upgrade efficiently, with zero-data-loss, exactly-once semantics?
1. Start a new application in parallel with the old one and, after some time, bring the old one down.
2. Shut down the old application gracefully and start the new one from the same point where it left off, with the help of source-side buffering support from Kafka.
The first solution is not practically feasible in resource-limited environments. Also, too much data might get processed twice by the parallel systems in order to avoid data loss. In short, a bad solution.
The second solution is the correct approach, but there is no concrete example given in the documentation. It basically means writing an API for custom management of Kafka offsets: persisting them while processing and reading them during a restart.
I am sharing here the code for the second solution, which uses an overridden KafkaUtils.createDirectStream API.
We also need to configure a lightweight ZooKeeper setup, which will be used for persisting and reading Kafka offsets reliably. In my case, writing the Kafka offsets of all partitions to ZooKeeper took around 10-20 milliseconds, which was quite acceptable.

Code Explanation :

  • Create a ZooKeeper client with the necessary details.
  • Try reading the Kafka offsets from ZooKeeper. If it is the first time and there are no Kafka offsets in ZooKeeper, the default KafkaUtils.createDirectStream API is called, and data starting from the latest Kafka offsets of the topic partitions is fetched from the Kafka brokers.
  • Save the starting offsets of each RDD partition in each batch of the stream to ZooKeeper right away, before starting the processing of the corresponding batch. It's important to know that each Kafka topic partition is mapped one-to-one to a Spark RDD partition.
  • On subsequent application restarts, the starting offsets are retrieved from ZooKeeper, just as they used to be fetched from the checkpoint directory, but this time without any additional metadata. The streaming context is created with a fresh SparkConf object, and all code/configuration changes are considered.
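The steps above can be sketched with an in-memory map standing in for ZooKeeper. All names here are illustrative assumptions; the real code uses a ZooKeeper client and the overridden KafkaUtils.createDirectStream API:

```java
import java.util.HashMap;
import java.util.Map;

public class OffsetManager {
    // Stand-in for the ZooKeeper znode holding offsets: partition -> offset
    private final Map<Integer, Long> zk = new HashMap<>();

    // On startup: read saved offsets; an empty map means "first run, start from latest"
    public Map<Integer, Long> readOffsets() {
        return new HashMap<>(zk);
    }

    // Before processing a batch: persist the batch's starting offsets
    public void saveOffsets(Map<Integer, Long> offsets) {
        zk.putAll(offsets);
    }

    public static void main(String[] args) {
        OffsetManager mgr = new OffsetManager();

        if (mgr.readOffsets().isEmpty()) {
            System.out.println("first run: start from latest offsets");
        }

        // A batch arrives: save its starting offsets, then process it
        Map<Integer, Long> batchOffsets = new HashMap<>();
        batchOffsets.put(0, 1500L); // partition 0 starts at offset 1500
        batchOffsets.put(1, 980L);
        mgr.saveOffsets(batchOffsets);

        // After a restart (with a fresh SparkConf, so code changes are picked up),
        // resume from the saved offsets instead of a checkpoint.
        System.out.println(mgr.readOffsets().get(0)); // prints 1500
    }
}
```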

However, this solution too has limitations, which should be kept in mind:
  • Cannot be used if state management functionality like updateStateByKey is required, because saving that additional information essentially means checkpointing. In my case, state management was not needed, so this solution fit quite well.
  • A workaround for this also seems possible, by having a flag at startup which decides whether to use checkpointing or custom offset management. But I have not tried it personally, as it was not needed in my case.
  • An additional persistent system like ZooKeeper comes into the picture and needs to be configured.
  • Cannot be used if the Kafka topics or the number of partitions change, as the offsets of each Spark partition are one-to-one mapped to Kafka partitions.
  • Can only be used with an advanced ingestion system like Kafka or Flume, with source-side buffering support.


Sunday, 31 July 2016

Apache Spark : RDD vs DataFrame vs Dataset

With the Spark 2.0 release, there are three types of data abstractions which Spark now officially provides for use: RDD, DataFrame and Dataset.
For a new user, it might be confusing to understand the relevance of each one and to decide which to use and which not to. In this post, I will discuss each of them in detail, with their differences and pros and cons.

Short Combined Intro :
Before I discuss each one in detail separately, I want to start with a short combined intro.
The evolution of these abstractions happened in this way:
RDD (Spark 1.0) —> DataFrame (Spark 1.3) —> Dataset (Spark 1.6)
RDD is the oldest, available from the 1.0 version; Dataset is the newest, available from the 1.6 version.
Given the same data, each of the three abstractions will compute and give the same results to the user. But they differ in performance and in the way they compute.
RDD lets us decide HOW we want to do the computation, which limits the optimisation Spark can do on the processing underneath, whereas DataFrame/Dataset lets us decide WHAT we want to do and leaves everything to Spark to decide how to do the computation.
We will understand in two minutes what is meant by HOW and WHAT.
DataFrame came as a major performance improvement over RDD, but not without some downsides.
This led to the development of Dataset, which is an effort to unify the best of RDD and DataFrame.
In future, Dataset will eventually replace RDD and DataFrame to become the only API Spark users should be using in code.
Let's understand them in detail, one by one.

RDD :
-It is the building block of Spark. No matter which abstraction, DataFrame or Dataset, we use, internally the final computation is done on RDDs.
-RDD is a lazily evaluated, immutable, parallel collection of objects exposed with lambda functions.
-The best part about RDD is that it is simple. It provides familiar OOP-style APIs with compile-time safety. We can load any data from a source, convert it into an RDD, and keep it in memory to compute results. RDDs can easily be cached if the same set of data needs to be recomputed.
-But the disadvantage is performance limitations. Being in-memory JVM objects, RDDs involve the overhead of garbage collection and Java (or slightly better, Kryo) serialisation, which become expensive as data grows.

RDD example:

DataFrame :
-DataFrame is an abstraction which gives a schema view of the data. That means it gives us a view of the data as columns with column names and type info; we can think of the data in a DataFrame like a table in a database.
-Like RDD, execution in DataFrame is also lazily triggered.
-It offers a huge performance improvement over RDDs because of two powerful features it has:
1. Custom Memory Management (aka Project Tungsten)
Data is stored in off-heap memory in a binary format. This saves a lot of memory space, and there is no garbage collection overhead involved. By knowing the schema of the data in advance and storing it efficiently in the binary format, expensive Java serialisation is also avoided.
2. Optimized Execution Plans (aka Catalyst Optimizer)
       Query plans are created for execution using the Spark Catalyst optimiser. After an optimised execution plan is prepared through several steps, the final execution happens internally on RDDs only, but that is completely hidden from the users.
Execution plan stages

Just to give an example of optimisation with respect to the above picture, let's consider a query like the one below:
inefficient query: filter after join

optimization at run time during execution

In the above query, the filter is applied after the join, which is a costly shuffle operation. The optimiser sees that, and in the optimised logical plan this filter is pushed down to execute before the join. In the optimised execution plan, it can also leverage the data source's capabilities and push that filter further down to the data source, so that the filter is applied at disk level rather than bringing all the data into memory and filtering there (which is not possible when working directly with RDDs). The filter method then effectively works like a WHERE clause in a database query. Also, with optimised data sources like Parquet, if Spark sees that you need only a few columns to compute the results, it will read and fetch only those columns, saving both disk IO and memory.
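The benefit of pushing the filter below the join can be illustrated with plain collections, counting how many row comparisons the join has to make under each plan. This is a toy model with made-up data; Catalyst does this rewriting automatically on real query plans:

```java
import java.util.List;

public class FilterPushdown {
    record User(int id, String country) {}

    // A naive nested-loop join, instrumented to count the comparisons it makes
    static int joinComparisons(List<User> left, List<Integer> rightUserIds) {
        int comparisons = 0;
        for (User u : left) {
            for (int id : rightUserIds) {
                comparisons++; // work done by the join (match check would go here)
            }
        }
        return comparisons;
    }

    public static void main(String[] args) {
        List<User> users = List.of(
            new User(1, "IN"), new User(2, "US"),
            new User(3, "IN"), new User(4, "FR"));
        List<Integer> orderUserIds = List.of(1, 2, 3, 4);

        // Plan as written: join first (4 x 4 comparisons), filter afterwards
        int planA = joinComparisons(users, orderUserIds);

        // Optimised plan: push the filter below the join (2 x 4 comparisons)
        List<User> indiaOnly = users.stream()
                .filter(u -> u.country().equals("IN")).toList();
        int planB = joinComparisons(indiaOnly, orderUserIds);

        System.out.println(planA + " vs " + planB); // prints "16 vs 8"
    }
}
```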
-Drawback: lack of type safety. As a developer, I do not like using DataFrame, as it doesn't seem developer-friendly. Referring to attributes by string names means no compile-time safety; things can fail at runtime. Also, the APIs don't look programmatic and are more SQL-like.
Dataframe example:  
 2 ways to define:    1. Expression-builder style          2. SQL style

As discussed, if we try using a column not present in the schema, we will see the problem only at runtime. For example, if we try accessing salary when only name and age are present in the schema, we will get an exception like the one below:

Dataset :
-It is an extension of the DataFrame API: the latest abstraction, which tries to provide the best of both RDD and DataFrame.
-It comes with OOP-style, developer-friendly compile-time safety like RDD, as well as the performance-boosting features of DataFrame: the Catalyst optimiser and custom memory management.
-Where Dataset scores over DataFrame is an additional feature it has: encoders.
-Encoders act as the interface between JVM objects and the off-heap custom binary-format memory data.
-Encoders generate bytecode to interact with the off-heap data and provide on-demand access to individual attributes without having to deserialise an entire object.
-A case class is used to define the structure of the data schema in a Dataset. Using a case class, it's very easy to work with a Dataset. The names of the attributes in the case class are directly mapped to attributes in the Dataset. It feels like working with an RDD, but underneath it works the same as a DataFrame.
A DataFrame is in fact treated as a Dataset of generic Row objects: DataFrame = Dataset[Row]. So we can always convert a DataFrame into a Dataset at any point by calling the 'as' method on the DataFrame.
Dataset Example :

An important point to remember is that both Dataset and DataFrame internally do their final execution on RDD objects only, but the difference is that users do not write the code to create the RDD collections and have no direct control over the RDDs. The RDDs are created in the last stage of the execution plan, after all the optimisations have been decided and applied (see the execution plan diagram).
That's why, at the beginning of this post, I emphasised that RDD lets us decide HOW we want to do the computation, whereas DataFrame/Dataset lets us decide WHAT we want to do.
And all these optimisations are possible because the data is structured and Spark knows the schema of the data in advance. So it can apply all the powerful features like Tungsten's custom off-heap binary memory storage, the Catalyst optimiser, and encoders to get performance that would not be possible if users were working directly on RDDs.

In short, Spark is moving from unstructured computation (RDDs) towards structured computation because of the many performance optimisations it allows. DataFrame was a step in the direction of structured computation but lacked the developer-friendliness of compile-time safety and lambda functions. Finally, Dataset is the unification of DataFrame and RDD, bringing out the best abstraction of the two.
Going forward, developers should only be concerned with Dataset, while the use of DataFrame and RDD will be discouraged. But it's always better to be aware of the legacy, for a better understanding of the internals.
Interestingly, most of these new concepts, like custom memory management (Tungsten), logical/physical plans (Catalyst optimiser) and encoders (Dataset), seem to be inspired by its competitor Apache Flink, which has inherently supported them since inception. There are other powerful feature enhancements, like windowing and sessions, coming to Spark which are already in Flink. So it's better to keep a close watch on both Spark and Flink in the coming days.
Meanwhile, I would recommend:
1. watching this excellent Spark Summit talk on DataFrame and Dataset:

Thursday, 9 June 2016

Spark Streaming : Performance Tuning With Kafka and Mesos

After spending some time with Spark Streaming, I feel that the setup and coding in Spark are the easiest parts of development. The challenging but interesting part lies in tuning and stabilising the application, which also takes most of the time. It's never just 1-2 fancy parameters which make the application performant; rather, it's the overall approach which helps achieve that. In this post, based on my experience with Spark 1.5.1, I will discuss how to tune the performance of Spark Streaming on a Mesos cluster, with Kafka for data ingestion.

How to start with Tuning: 

The best place to start with tuning is the official Spark docs themselves:
The docs are very well written, covering lots of cases and tuning parameters. Everyone should read them twice before starting.
However, when you actually start working, you will be looking to tune only the parameters relevant to your use case. The sheer number of parameters might confuse you, and not all of them will be relevant to your setup. I will discuss them in detail, explaining wherever possible which one to use when, and when not to.

Doing the Tuning :

It's almost impossible to get the expected throughput with all parameters correctly tuned on the first attempt. It takes some patience and perseverance to arrive at the correct configuration. It's better to start with incremental tuning: write your code and run it the first time with all default configurations, but keep the parameters suggested in the Spark docs in mind. Keep observing the Spark UI. Obviously, things will go wrong in the initial attempts: some memory issue, a high processing time, scheduling delay, etc. will come up.
At those times, proper logging to a file comes in handy for debugging the root cause, so it's essential to have proper logging in place beforehand (for that, read my last post). Start tuning the parameters one by one and keep observing. You will come to know which parameters are important and which are not.
I will explain them one by one, as per my experience.

Kafka Ingestion Approach :
There are 2 approaches to ingest data from Kafka: 1. Receiver-based   2. Direct Kafka
The Direct Kafka approach is newer and should be preferred over the receiver-based approach for better efficiency, parallelism and exactly-once semantics. I chose Direct Kafka with default checkpointing (of offsets in a specified directory, for recovery from failures) because my processing is idempotent: repeated updates to the database for the same set of records, e.g. for a few offsets replayed after an application restart, have no side effects (for details read :
The Direct Kafka approach keeps a 1-1 mapping of each Kafka partition to an RDD partition during streaming processing. So it is better to have fewer total cores than total partitions (noOfTopics*partitionsPerTopic > totalCoresInCluster) so that all CPU cores are fully exploited; ideally, the Spark doc suggests, the total number of partitions should be around 2-3 times the number of cores, depending on the processing. Also, to reduce network calls between the Kafka broker and consumer, we can pack many log lines together into one Kafka message. For example, each Kafka message can contain 100 log lines, which we split inside Spark once received, before doing the actual processing.
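The packing-and-splitting idea can be sketched on a plain Scala collection; in the actual streaming job the same flatMap would be applied to the DStream of Kafka message payloads (the object and value names here are mine, purely for illustration):

```scala
// Each Kafka message packs several log lines separated by '\n'.
// A flatMap over the messages restores one record per log line.
object UnpackBatchedMessages {
  def split(messages: Seq[String]): Seq[String] =
    messages.flatMap(_.split("\n"))

  def main(args: Array[String]): Unit = {
    val messages = Seq("line1\nline2\nline3", "line4\nline5")
    println(split(messages).size) // 5 individual log lines
  }
}
```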

Batch Interval Parameter :
Start with some intuitive batch interval, say 5 or 10 seconds, then try different values and observe the Spark UI; you will get an idea of which interval gives faster processing times. For example, in my case 15 seconds suited my processing.
val ssc = new StreamingContext(sc, Seconds(15))
There is another parameter called blockInterval (default 200ms) which I found confusing initially. This parameter is relevant only for the receiver-based Kafka approach, not the direct approach: it determines the parallelism in the receiver-based approach, so it is better to leave it alone when using Direct Kafka.
sparkConf.set("spark.streaming.blockInterval", "200ms")

ConcurrentJobs Parameter :
Next I checked whether all cores were being properly utilised (via the top command and the Mesos slave UI) and found they were under-utilised. I then found a very useful parameter, spark.streaming.concurrentJobs, which significantly sped up the processing.
By default the number of concurrent jobs is 1, which means only one job is active at a time; until it finishes, other jobs are queued up even if resources are available and idle. However, this parameter is intentionally not documented in the Spark docs, as it can sometimes cause weird behaviour, as Spark Streaming creator Tathagata discussed in this useful thread. Tune it accordingly, keeping the side effects in mind. Running concurrent jobs brings down the overall processing time and scheduling delay even if a batch occasionally takes slightly longer than the batch interval.
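Following the same sparkConf convention used elsewhere in this post, the property is set like any other (the value 4 here is only an illustration; tune it for your own cluster and watch for the side effects mentioned above):

```scala
// Undocumented knob: lets up to 4 batch jobs run at the same time
// instead of queueing behind a single active job.
sparkConf.set("spark.streaming.concurrentJobs", "4")
```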

maxRatePerPartition Parameter :
Still, for some batches you might observe very long processing times. This can happen because a batch cannot be processed faster than its slowest partition: one partition may contain significantly more data than its peers, so it takes longer to process and alone drags out the batch processing time even after the other partitions are done. In the worst case, all the concurrent jobs may be held up by a few skewed partitions while new jobs queue up and CPU cores sit idle. To avoid this, always use this parameter to limit the maximum rate of messages/sec per partition.
sparkConf.set("spark.streaming.kafka.maxRatePerPartition", "25")
So with a batch interval of 10 seconds, the above value of 25 allows a partition at most 25*10=250 messages per batch. Tune it to the maximum rate at which your application can process without inducing delay.
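The cap works out as rate times interval; a tiny self-contained sketch of that arithmetic (the helper name is mine, not a Spark API):

```scala
// Per-partition cap = maxRatePerPartition (msgs/sec) * batch interval (sec).
object MaxRatePerPartitionMath {
  def capPerBatch(ratePerSec: Long, batchIntervalSec: Long): Long =
    ratePerSec * batchIntervalSec

  def main(args: Array[String]): Unit = {
    println(MaxRatePerPartitionMath.capPerBatch(25L, 10L)) // 250 messages per partition per batch
  }
}
```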

Uniform Data Distribution:
One important thing is to know your Kafka topics/partitions well. Sometimes some topics/partitions send data at a very high rate while others send very little. Such scenarios should be avoided if possible, as skewed data distribution in any distributed environment leads to weird behaviour; it is ideal to have a uniform data rate across all partitions. For example, in my case I was consuming data from many Kafka topics, and nearly half of them were sending data at roughly 10 times the rate of the slower half, and I had no control over the Kafka ecosystem since many other applications used it. So instead I tried running 2 separate Spark Streaming jobs on the same cluster, one for the slower topics and one for the faster topics, with a different maxRatePerPartition assigned to each. This clearly improved performance, though it also means maintaining 2 similar code bases.

Mesos Parameters :
As I started running 2 streaming apps on the same Mesos cluster, I needed to cap the cores assigned to each application; otherwise Mesos by default assigns all the cores to the first streaming app submitted, and will not let the 2nd app run, saying there are not enough resources to offer. Assign max cores to each application after tuning.
sparkConf.set("spark.cores.max", "40")
Also, for a streaming application it is advisable to run Mesos in coarse-grained mode instead of fine-grained mode.
sparkConf.set("spark.mesos.coarse", "true")
It is important to know that Mesos in coarse mode runs only 1 executor on each node machine (unlike Yarn), so the executor-cores and executor-instances parameters below are irrelevant on Mesos.
//sparkConf.set("spark.executor.instances", "4")
//sparkConf.set("spark.executor.cores", "10")
Read more about Spark on Mesos :

Memory Parameters :
We should tune driver and executor memory keeping the machine configuration and processing needs in mind.
sparkConf.set("spark.driver.memory", "8g")
sparkConf.set("spark.executor.memory", "15g")
The executor memory determines the memory assigned to each executor, and since Mesos allows only 1 executor per machine, we can keep it relatively high as per the machine's RAM (unlike Yarn, where multiple executors run on the same machine and each has its own separate memory).
It is good to unpersist the DStream as soon as processing ends for the associated batch, if you don't need to keep the data in memory after processing.
Also, don't call persist() on a DStream whose RDDs will not be used multiple times in the processing; otherwise it will significantly and unnecessarily increase memory consumption.
There are some other memory parameters like spark.memory.fraction and spark.memory.storageFraction which are best left at their default values. They should be changed only after reading :
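As with the other settings in this post, the unpersist behaviour is controlled through sparkConf; spark.streaming.unpersist tells Spark to aggressively release batch data once the processing no longer needs it (this is also listed in the summary below):

```scala
// Aggressively unpersist RDDs generated by Spark Streaming
// as soon as they are no longer required by the processing.
sparkConf.set("spark.streaming.unpersist", "true")
```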

Backpressure Parameter :
Spark provides a very powerful feature called backpressure. With this property enabled, Spark Streaming tells Kafka to slow down the rate of incoming messages when processing time exceeds the batch interval and scheduling delay is increasing. It is helpful in cases like a sudden surge in data flow, and is a must-have property in production to avoid the application being overburdened. However, this property should be disabled during the development and staging phases, otherwise we cannot test the limit of the maximum load our application can and should handle.
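In the same sparkConf style as the rest of this post, enabling backpressure is a single property (available from Spark 1.5 onwards):

```scala
// Let Spark Streaming dynamically throttle the Kafka ingestion rate
// based on current batch processing times and scheduling delay.
sparkConf.set("spark.streaming.backpressure.enabled", "true")
```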

Additional Parameters :
In addition to the above parameters, it is usually suggested to use the Kryo serializer and the G1 garbage collector in the driver/executor. I have used both, although I did not notice much difference, possibly because my processing does not currently involve shuffling:
sparkConf.set("spark.driver.extraJavaOptions", "-XX:+UseG1GC")
sparkConf.set("spark.executor.extraJavaOptions", "-XX:+UseG1GC")
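The lines above cover the G1GC side; for completeness, switching to Kryo is a standard Spark property set the same way (registering your own classes with Kryo is optional but recommended for best results):

```scala
// Use Kryo instead of the default Java serialization.
sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
```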

Spark Streaming on Mesos with Kafka


Summary in bullet points :

  • If using Kafka, choose Direct Kafka approach.
  • Maintain roughly  2*totalCoresInCluster  <  noOfTopics*partitionsPerTopic  <  3*totalCoresInCluster
  • Set the batch interval by trial and error. Avoid blockInterval.
  • Use spark.streaming.concurrentJobs to process jobs concurrently.
  • Use spark.streaming.kafka.maxRatePerPartition to limit the incoming data rate/sec per partition.
  • Assign driver and executor memory properly to avoid memory issues.
  • Ensure a uniform data rate among Kafka partitions; otherwise, consider multiple streaming apps.
  • Configure Mesos parameters properly.  
  • Set spark.streaming.unpersist to true
  • Set spark.streaming.backpressure.enabled to true in production.
  • Use KryoSerializer and G1GC as garbage collector.

The discussion above is based on my experience with the Direct Kafka approach on a Mesos cluster. Please feel free to comment if you come across something I missed; I will be glad to add it.
Happy Streaming....!!

Saturday, 4 June 2016

Apache Spark Streaming : Logging Driver Executor logs the right way

From my experience, I feel that setting up logging properly is one of the most important things to do first when starting Spark Streaming development, especially when running on a cluster with multiple worker machines.
The reason is simple: streaming is a continuously running process, and an exception/error may arrive after many hours or days, caused by either the driver or an executor. The root cause is hard to debug because driver logs go to the console and cannot be seen after the application shuts down, while executor logs go to std out/err files (I am using Mesos as the cluster manager), which are tedious to download and inspect. So when some issue comes, like in my case an out-of-memory issue after 2 days of running that brought the application down, I had to be sure whether the driver or an executor was the actual culprit where the issue first appeared. So I set up this logging configuration properly before debugging the issue.
Also, with logging we can control how many days of logs to retain for the driver and executors, so that disk space is not eaten up by an ever-running application. And if we run multiple Spark Streaming applications on the same cluster, we can log to separate files for different executors even when multiple executors happen to run on the same worker machine.

Logging Configuration Steps :
I am using the standard Apache log4j library for logging, with appropriate logging levels in code. The default Spark log4j properties template can be found in the Spark conf directory; for example, in my case it is at /usr/local/spark-1.5.1-bin-hadoop2.6/conf/ :
1. Create separate log4j configuration files for driver and executor and place them in the conf/ directory. Each should be configured with a different file name and rolling properties as per the use case. For multiple applications, create these separately for each application. For example, I have 2 streaming applications named Request and Event, so I created 4 files as:

The contents of each log4j file should look like the following (the log file path here is illustrative, one per driver/executor per application; the rolling sizes match the retention described in step 2):
log4j.rootCategory=INFO, FILE
log4j.appender.FILE=org.apache.log4j.RollingFileAppender
log4j.appender.FILE.File=temp/request_driver.log
log4j.appender.FILE.MaxFileSize=500MB
log4j.appender.FILE.MaxBackupIndex=10
log4j.appender.FILE.layout=org.apache.log4j.PatternLayout
log4j.appender.FILE.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n

2. Copy the above files to the same conf/ directory on each worker machine and create the log directory on each of them. I have specified the log dir as temp/ for now, which is already there. As shown above, I keep only the 10 most recent log files of max size 500 MB each for my driver and executor logs on each worker machine, which suffices for 4-5 days of log retention in my specific use case.
3. Include the log dir for driver and executor in spark submit command as below :
      spark-submit --driver-java-options "-Dlog4j.configuration=file:/usr/local/spark-1.5.1-bin-hadoop2.6/conf/" --conf "spark.executor.extraJavaOptions=-Dlog4j.configuration=file:/usr/local/spark-1.5.1-bin-hadoop2.6/conf/log4j_RequestLogExecutor.properties" --master mesos://sparkclusterip:5050 --jars ……so on

Troubleshooting logging issue :
  Ideally, as per the Spark documentation, this configuration is enough to log driver and executor output to separate log files.
However, for some unknown reason, in my case the driver logs were written properly to file but the executor logs were not going into the file and still appeared in the Mesos cluster standard std out/err files.
Setting the same driver and executor Java options in the SparkConf object in code, through the application conf properties file, did the trick for me. This is another way of setting Spark properties if you don't want to include them in the spark-submit command.
application.conf :
request-driver-options = "-Dlog4j.configuration=file:/usr/local/spark-1.5.1-bin-hadoop2.6/conf/"
request-executor-options = "-XX:+UseG1GC -Dlog4j.configuration=file:/usr/local/spark-1.5.1-bin-hadoop2.6/conf/"
Code :
sparkConf.set("spark.driver.extraJavaOptions",conf.getString("application.request-driver-options") )
sparkConf.set("spark.executor.extraJavaOptions",conf.getString("application.request-executor-options") )
Having done the above, I can now see all logs being written to the log files only. On the Mesos UI, the std err/out files, which used to contain the executor logs, are now empty.

Although this post described logging configuration based on my experience with Spark Streaming on a Mesos cluster, it will be similar for batch Spark jobs and for other cluster managers like standalone and Yarn. Also, I am using Spark 1.5.1 right now, so behaviour may change in future releases as Spark evolves, e.g. in cases like graceful shutdown; see my last post.

Tuesday, 31 May 2016

Apache Spark Streaming : How to do Graceful Shutdown

In my current project, I am using Spark Streaming as the processing engine, Kafka as the data source and Mesos as the cluster/resource manager.
To be precise, I am using the Direct Kafka approach in Spark for data ingestion.
Once a streaming application is up and running, there are multiple things to do to make it stable, consistent and seamless.
One of them is ensuring graceful shutdown to avoid data loss. When restarting the streaming application, deploying changes, etc., we have to ensure that the shutdown happens gracefully and leaves the application in a consistent state. That means that once the application receives the shutdown signal, it should not accept any more data for processing, but at the same time it should make sure all the data/jobs for the Kafka offsets currently in memory are processed before the application goes down. When the application restarts, it will read the Kafka offsets from the checkpoint directory and resume fetching data from Kafka accordingly.

In this post, I am going to share how to do a graceful shutdown of a Spark Streaming application.
There are 2 ways :
1. Explicitly calling the shutdown hook in driver program : 

  sys.ShutdownHookThread {
    log.info("Gracefully stopping Spark Streaming Application")
    ssc.stop(true, true)
    log.info("Application stopped")
  }

The first boolean argument of ssc.stop stops the associated SparkContext, while the second enables graceful shutdown of the StreamingContext.
      I tried this approach in my Spark 1.5.1 application, but it did not work. The streaming application shut down gracefully, but the SparkContext remained alive, or rather hung: the driver and executor processes did not exit, and I had to use the kill -9 command to forcefully terminate the SparkContext (which kills the driver and executors).
Later, I found out that this approach is old and was used for Spark versions before 1.4. For newer Spark versions, we use the 2nd approach.

2. spark.streaming.stopGracefullyOnShutdown parameter :
        Setting this parameter to true in the Spark configuration ensures a proper graceful shutdown in newer Spark versions (1.4 onwards). With this parameter set, we should not use the explicit shutdown-hook approach above or call the ssc.stop method in the driver: just set the parameter, then call ssc.start() and ssc.awaitTermination(). There is no need to call ssc.stop; doing so might make the application hang during shutdown.
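Putting that together, a minimal driver sketch under this approach looks like the following (assuming ssc is the StreamingContext built in your driver, as elsewhere in this post):

```scala
// Enable graceful shutdown on SIGTERM (Spark 1.4+), then simply
// start and block; do NOT call ssc.stop() yourself.
sparkConf.set("spark.streaming.stopGracefullyOnShutdown", "true")
// ... create the StreamingContext ssc and define the processing here ...
ssc.start()
ssc.awaitTermination()
```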
Please look at the spark source code for knowing how this parameter is used internally :

How to pass Shutdown Signal :
Now we know how to ensure graceful shutdown in Spark Streaming, but how do we pass the shutdown signal to the application? One naive option is to press CTRL+C in the terminal where the driver program runs, but obviously that is not a good option.
The solution I am using is to grep for the Spark Streaming driver process and send it a SIGTERM signal. When the driver gets this signal, it initiates the graceful shutdown of the application.
We can put the command below in a shell script and run the script to pass the shutdown signal :
ps -ef | grep spark |  grep <DriverProgramName> | awk '{print $2}'   | xargs kill  -SIGTERM
e.g. ps -ef | grep spark |  grep DataPipelineStreamDriver | awk '{print $2}'   | xargs kill  -SIGTERM

One limitation of this approach is that it can only be run on the machine where the driver program was started, not on any other node machine of the Spark cluster.

If you come to know of a better approach, please do share.