Thursday, 9 June 2016

Spark Streaming : Performance Tuning With Kafka and Mesos

After spending some time in Spark Streaming , i feel doing setup and coding in Spark is the easiest part of development . The challenging  but interesting part lies in tuning and stabilising the application which also takes most of the time. Its never just 1-2 fancy parameters which can make the application performant rather its the approach which helps to achieve that. In this post,based on my experience with spark 1.5.1,  will discuss how to tune performance of spark streaming on Mesos cluster with kafka for data ingestion.


How to start with Tuning: 

The best place to start with tuning is Spark official docs itself :
The doc is very well documented covering lot of cases and tuning parameters. Everyone should read the doc twice before starting.
However, when you actually start working you will be looking to tune parameters which are relevant to your use case only . Sometimes lot of parameters might confuse you as well as not all parameters will be relevant to your set up. I will discuss them in detail explaining wherever possible which one when to use and when not to use.

Doing the Tuning :

Its almost impossible to get the expected throughput with all parameters correctly tuned in first attempt. It takes some patience and perseverance to arrive at correct configurations. Its better to start with incremental tuning. Which means, write your code and run first time with all default configurations but keep the parameters suggested in spark doc in mind . Keep observing Spark UI. Obviously things will go wrong in initial attempts like some memory issue,higher processing time,sceduling delay,etc will come.
At those times, logging properly in file will come handy to debug root cause so its essential to have proper logging in place beforehand (for proper logging, read my last post). Start tuning parameters one by one and keep observing. You will come to know which parameters are important and which are not.
I will be explaining one by one as per my experience.

Kafka Ingestion Approach :
There are 2 approaches to ingest data from kafka: 1. Receiver based   2. Direct kafka
The Direct Kafka approach is new and we should prefer it over Receiver based approach for better efficiency,parallelism and exactly-once semantics. I chose Direct Kafka with default checkpointing (of offsets in a specified directory for recovery from failures) as my processing had idempotent behaviour  which means repeated updates for same set of records in database for few offsets in case of application restart don't have any side-effects (for details read : http://spark.apache.org/docs/latest/streaming-kafka-integration.html)
The Direct kafka approach keeps 1-1 mapping of each kafka partition to RDD partition in streaming processing. So its better to have total cores less than total partitions (noOfTopics*partitionsPerTopic > totalCoresInCluster) so that all cpu cores are fully exploited. Ideally the spark doc suggests total no of partitions should be around 2-3 times of number of cores depending on the processing. Also to reduce network calls between kafka broker and consumer, we can pass many log lines together in one kafka message . For example, each kafka message can contain 100 log lines which we can split once received inside spark before doing the actual processing.

Batch Interval Parameter :
Start with some intuitive batch interval say 5 or 10 seconds. Try to play around the parameter trying different values and observe the spark UI. Will get idea what batch interval gives faster processing time. For example, in my case 15 seconds suited my processing.
val ssc = new StreamingContext(sc, Seconds(15))
There is another parameter called blockInterval  (default -200ms) which i found confusing initially. This parameter is relevant for receiver based approach in kafka and not the direct approach. it determines the parallelism in receiver based approach, so better to avoid this parameter if using direct kafka.
sparkConf.set("spark.streaming.blockInterval", "200ms")

ConcurrentJobs Parameter :
Next i checked whether all cores were being properly exploited (top command and mesos slave UI) and found they were under-utilised. I found a very useful parameter which significantly increased the processing.
sparkConf.set("spark.streaming.concurrentJobs","4")
By default the number of concurrent jobs is 1 which means at a time only 1 job will be active and till its not finished,other jobs will be queued up even if the resources are available and idle. However this parameter is intentionally not documented in Spark docs as sometimes it may cause weird behaviour as Spark Streaming creator Tathagata discussed in this useful thread : http://stackoverflow.com/questions/23528006/how-jobs-are-assigned-to-executors-in-spark-streaming . Tune it accordingly keeping side-effects in mind. Running concurrent jobs brings down the overall processing time and scheduling delay even if a batch takes processing time slightly more than batch interval. 

maxRatePerPartition Parameter :
Still for some batches you might observe very long processing time . It might happen because of the reason that a batch processing cannot be faster than the slowest partition it has. A particular partition can contain significantly more data compared to other partitions in its batch and so it will take longer time to get processed than it peer partitions which means it will alone increase the batch processing time even though other partitions are processed. In worst case, it might happen that all the concurrent jobs are taking longer processing time because of few skewed partitions while new jobs are queued up and cpu cores are sitting idle. To avoid this, always use this parameter to limit the maximum rate of messages/sec in a partition . 
sparkConf.set("spark.streaming.kafka.maxRatePerPartition", “25”)  
So with batch interval of 10 sec, the above parameter with value 25 will allow a partition to have maximum 25*10=250 messages. Tune the maximum rate at which your application can process without inducing delay.

Uniform Data Distribution:
One important thing is that we should be aware of our kafka topics/partitions very well. And sometimes it might happen that some kafka topics/partitions send data at very high rate while some send at very low. Such scenarios if possible should be avoided as skewed data distribution in any distributed environment leads to weird behaviours. Its ideal to have uniform data rate in all partitions. For example in my case, i was consuming data from many kafka topics and nearly half of them were sending data at nearly 10 times the rate of the other slower half. And i had no control of modifying kafka eco-system as it was used by many other applications. So instead, i tried running 2 separate spark streaming jobs on same cluster, one for slower topics and another for faster topics and assigned different maxRatePerPartition to them. It clearly showed performance improvement. But it will also mean that we have to maintain 2 similar code base .

Mesos Parameters :
As I started running 2 streaming app on the same Mesos cluster, it was needed to assign maximum cores to each application otherwise Mesos by default assigns all the cores to the first streaming app submitted . In that case, Mesos will not allow the 2nd app to run saying not enough resources available to offer. Assign max cores to each application after tuning.
sparkConf.set("spark.cores.max", “40")
Also for streaming application, its advisable to run Mesos in coarse mode instead of fine grained mode.
sparkConf.set("spark.mesos.coarse", “true")
Its important to know that Mesos in coarse mode allows only 1 executor to run on each node machine (unlike Yarn) and so below parameters executor cores and executor instances are irrelevant in Mesos.
//sparkConf.set("spark.executor.instances", "4")
//sparkConf.set("spark.executor.cores", “10")
Read more about Spark on Mesos : http://spark.apache.org/docs/latest/running-on-mesos.html

Memory Parameters :
We should tune driver and executor memory keeping machine configuration and processing needs in mind.
sparkConf.set("spark.driver.memory", "8g")
sparkConf.set("spark.executor.memory", “15g")
The executor memory determines the memory assigned for each executor and since Mesos allows only 1 executor per machine, we can keep it relatively higher as per machine RAM (unlike Yarn where multiple executors runs on same machine and each executor will have its own separate memory)
Its good to unpersist the dstream as soon as processing ends for the associated batch if you don't need to keep data in memory post processing.
sparkConf.set(“spark.streaming.unpersist","true")
Also don't call the persist() method on dstream if the same RDD is not going to be used multiple times in processing otherwise it will significantly increase memory consumption unnecessarily.
There are some other memory parameters like spark.memory.fraction,spark.memory.storageFraction,etc which are recommended at default value. They should be changed only after reading : http://spark.apache.org/docs/latest/tuning.html#memory-management-overview

Backpressure Parameter :
Spark gives a very powerful feature called backpressure . Having this property enabled means spark streaming will tell kafka to slow down rate of sending messages if the processing time is coming more than batch interval and scheduling delay is increasing. Its helpful in cases like when there is sudden surge in data flow and is a must have property to have in production to avoid application being over burdened. However this property should be disabled during development and staging phase otherwise we cannot test the limit of the maximum load our application can and should handle.
sparkConf.set("spark.streaming.backpressure.enabled","true")

Additional Parameters :
In addition to above parameters, most of times, its suggested to use kryo seralizer and G1GC for garbage collection in driver/executor. I too have used both although i did not notice as such difference possibly because my processing does not involve shuffling  currently:
sparkConf.set(“spark.serializer”,"org.apache.spark.serializer.KryoSerializer")
sparkConf.set("spark.driver.extraJavaOptions", “-XX:+UseG1GC ”)
sparkConf.set("spark.executor.extraJavaOptions", “-XX:+UseG1GC”)

Spark Streaming on Mesos with Kafka

 



Summary in bullet points :

  • If using Kafka, choose Direct Kafka approach.
  • Maintain around  2* totalCoresInCluster  <  noOfTopics*partitionsPerTopic  < 3*totalCoresInCluster
  • Set batch interval after hit and trial . Avoid blockInterval.
  • Use spark.streaming.concurrentJobs to set concurrent jobs to process
  • Use spark.streaming.kafka.maxRatePerPartition to limit the incoming data rate/sec per partition
  • Assign driver and executor memory properly to avoid memory issues.
  • Ensure uniform data rate among kafka partitions . Else can try multiple streaming apps.
  • Configure Mesos parameters properly.  
  • Set spark.streaming.unpersist to true
  • Set spark.streaming.backpressure.enabled to true in production.
  • Use KryoSerializer and G1GC as garbage collector.

The discussion above is based on my experience with Direct Kafka approach on Mesos cluster. Please feel free to comment if you come across something i missed, will be glad to add.
Happy Streaming....!!

7 comments:

  1. This comment has been removed by the author.

    ReplyDelete
  2. Chandan you are very good at your job.Keep it up

    ReplyDelete
  3. Hi Chandan,
    Is backpressure required for directstream

    ReplyDelete
  4. Nice Article. Chandan, I am facing an issue. In you last pic, all the active batches are in processing state. I have tried many configurations, like Fair scheduler, increased queue capacity, executors, cores, whatever I could find and think of, but only one job is in processing state and rest are in queued. Can you please guide regarding this, what I am doing wrong. Please tell me what information you need for this?

    ReplyDelete