Sunday, 21 August 2016

Upgrading a Running Spark Streaming Application with Code Changes with Zero-Data-Loss



Spark Streaming is one of the most reliable (near) real-time processing solutions available in the streaming world these days. It offers ease of development and continuously evolving features, and it has very good community support.
One of the most important concerns for any streaming solution is how it handles failure situations, such as the application going down due to an error. Because the data is real-time and keeps arriving even while the application is down, it is really important to process the backlog before the current data once the application is back, in order to avoid data loss. Together with Kafka, a distributed messaging system capable of replaying past data, Spark Streaming handles failure situations quite nicely.
Restarts are a normal part of operations, not just a consequence of failures. Upgrading a running streaming application with code changes, configuration changes, new features, etc. is needed from time to time. It requires a graceful shutdown of the streaming application, deployment of the changes, and then a restart of the application so that it continues from the point where it left off.
Such upgrades involve quite a few challenges compared to failure recovery.
In this post, I want to discuss my experience and learnings in dealing with them.

Background : Fault Tolerance, Checkpointing, DirectKafka  

It is always better to start with some background before jumping directly to the problem statement.
Fault tolerance means dealing with failure scenarios where the application goes down and has to be restarted to resume processing. Upgrading means an intentional restart of the system to deploy new changes. What the two situations have in common is that both need a system restart. The caveat is that the two cannot be handled the same way.
For fault tolerance, Spark Streaming provides checkpointing: https://spark.apache.org/docs/1.5.1/streaming-programming-guide.html#checkpointing
Checkpointing keeps persisting the necessary information about the streaming system, such as metadata and state data, to a reliable storage system (disk/HDFS/S3) at regular intervals. Using Kafka with the DirectKafka approach for ingestion makes the system robust, as it avoids storing the actual data and stores just the Kafka offsets, which can be used to replay the Kafka messages after a restart. (This is unlike the inefficient receiver-based approach; more details at: http://spark.apache.org/docs/1.5.1/streaming-kafka-integration.html )
To enable checkpointing and DirectKafka, the user needs to define a checkpoint directory in code, call the KafkaUtils.createDirectStream API to get the Kafka messages, and then call the StreamingContext.getOrCreate API to get a context either from the checkpoint directory or by creating a new one.
The code for Checkpointing + DirectKafka looks like this :
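A minimal sketch of that setup in PySpark, assuming a Spark 1.x/2.x installation where the `pyspark.streaming.kafka` module is available. The checkpoint path, broker list, topic name and batch interval are hypothetical placeholders, and this is an illustration of the three steps described above rather than the exact code used in my application.

```python
CHECKPOINT_DIR = "hdfs:///tmp/streaming-checkpoint"  # hypothetical path
BROKERS = "broker1:9092,broker2:9092"                # hypothetical brokers
TOPICS = ["events"]                                  # hypothetical topic


def create_context():
    """Build a fresh StreamingContext; used only when no checkpoint exists."""
    # Imported lazily so this file can be loaded without Spark installed.
    from pyspark import SparkConf, SparkContext
    from pyspark.streaming import StreamingContext
    from pyspark.streaming.kafka import KafkaUtils  # Spark 1.x/2.x only

    conf = SparkConf().setAppName("checkpointed-direct-kafka")
    sc = SparkContext(conf=conf)
    ssc = StreamingContext(sc, 10)  # 10-second batches (assumed)

    # Direct approach: no receiver, only Kafka offsets are tracked.
    stream = KafkaUtils.createDirectStream(
        ssc, TOPICS, {"metadata.broker.list": BROKERS})
    stream.map(lambda kv: kv[1]).pprint()  # placeholder processing

    ssc.checkpoint(CHECKPOINT_DIR)
    return ssc


def main():
    from pyspark.streaming import StreamingContext

    # Recover from the checkpoint if present, else call create_context().
    ssc = StreamingContext.getOrCreate(CHECKPOINT_DIR, create_context)
    ssc.start()
    ssc.awaitTermination()

# main() is meant to be called from the script submitted with spark-submit.
```

Note that all stream setup lives inside `create_context`, because on a restart from a checkpoint that function is not called at all.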

With the above minimal code change, Spark Streaming can give exactly-once processing, provided the processing is idempotent or behaves like an atomic transaction. For example, in my case the processing was idempotent, as the stream processing did a computation and then an insert/update in the database based on the primary key.
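To make the idempotency point concrete, here is a small sketch using an in-memory SQLite database as a stand-in for the real database. The table name, keys and values are hypothetical. Writing the same batch twice, as would happen when Kafka replays it after a restart, leaves the table in the same state.

```python
import sqlite3


def upsert_results(conn, batch):
    """Write (id, value) rows keyed by primary key; a replay overwrites."""
    conn.executemany(
        "INSERT OR REPLACE INTO results (id, value) VALUES (?, ?)", batch)
    conn.commit()


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE results (id TEXT PRIMARY KEY, value INTEGER)")

batch = [("user-1", 10), ("user-2", 7)]
upsert_results(conn, batch)
upsert_results(conn, batch)  # the same batch replayed after a restart

rows = conn.execute("SELECT id, value FROM results ORDER BY id").fetchall()
print(rows)  # [('user-1', 10), ('user-2', 7)]
```

A plain append-only insert would have produced four rows here, which is why replay alone is not enough for exactly-once results.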


Challenge with Upgradation :

The reason I started with so much detail on fault tolerance and checkpointing is that many people go wrong when they upgrade a running Spark Streaming application for the first time. I made this mistake myself by assuming that I could just stop the application gracefully, deploy the new jar with code/configuration changes, and let checkpointing take care of the rest. This is a wrong assumption. The code changes may or may not be reflected, or even worse, an exception can be thrown. The configuration changes will not be picked up at all.
I saw similar questions on Stack Overflow where people were trying to use checkpointing for upgrades.
It is important to understand that the sole objective of checkpointing is fault tolerance. When we enable checkpointing, Spark Streaming saves the Kafka offsets as well as the entire metadata, such as the configuration, and stateful data, such as that of updateStateByKey, in persistent storage. So on restart, the Kafka partition offsets are fetched from storage and the executors ask the Kafka brokers to start replaying data from those offsets. But at the same time, the SparkConf object is also built by deserialising the saved metadata from disk (check the StreamingContext.getOrCreate() method implementation: https://github.com/apache/spark/blob/master/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala ). The streaming context is created using this deserialised SparkConf object. Once the streaming context is up, changes made to SparkConf are no longer reflected in it. So, basically, updated configuration is not picked up.
In short, we can rely on checkpointing for a failure restart, but we cannot rely on it for an upgrade restart.
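The effect can be illustrated with a toy model of the get-or-create pattern. This is not Spark's implementation: the `get_or_create` function, the pickle file and the `batch.seconds` key are all made up for the illustration. It only shows why a configuration change shipped with a new deployment is ignored when a checkpoint already exists.

```python
import os
import pickle
import tempfile


def get_or_create(checkpoint_path, create_conf):
    """Toy stand-in for StreamingContext.getOrCreate (not Spark's code)."""
    if os.path.exists(checkpoint_path):
        # Restart: the saved configuration wins; create_conf is never called.
        with open(checkpoint_path, "rb") as f:
            return pickle.load(f)
    conf = create_conf()
    with open(checkpoint_path, "wb") as f:
        pickle.dump(conf, f)
    return conf


checkpoint = os.path.join(tempfile.mkdtemp(), "checkpoint.bin")

# First deployment: no checkpoint yet, so the new configuration is used.
v1 = get_or_create(checkpoint, lambda: {"batch.seconds": 10})

# Upgrade: the new code asks for 5-second batches, but a checkpoint exists.
v2 = get_or_create(checkpoint, lambda: {"batch.seconds": 5})

print(v1, v2)  # {'batch.seconds': 10} {'batch.seconds': 10}
```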


Solution for Upgradation :

So how can we upgrade without checkpointing?
The easiest way is to delete the checkpoint directory before deployment so that a fresh streaming context is created. But this is not a good solution, because the Kafka offsets are deleted as well, and data is then fetched from either the latest or the smallest offset on the Kafka broker, depending on the auto.offset.reset parameter. This means either a huge amount of recomputation or substantial data loss, neither of which may be acceptable.
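A quick calculation shows the trade-off for a single partition. The numbers are hypothetical, and the sketch assumes the Kafka 0.8-style values `smallest` and `largest` for auto.offset.reset, which is what the Spark 1.5 direct stream reads.

```python
def records_after_reset(last_processed, earliest, latest, reset):
    """Return (reprocessed, lost) record counts for one partition.

    last_processed: next offset the old application would have read.
    earliest/latest: offset range currently retained on the broker.
    reset: "smallest" or "largest" (assumed Kafka 0.8 setting values).
    """
    if reset == "smallest":
        return last_processed - earliest, 0
    if reset == "largest":
        return 0, latest - last_processed
    raise ValueError("unknown auto.offset.reset: %r" % reset)


# Hypothetical partition: offsets 1000..9000 retained, processed up to 8500.
print(records_after_reset(8500, 1000, 9000, "smallest"))  # (7500, 0)
print(records_after_reset(8500, 1000, 9000, "largest"))   # (0, 500)
```

With `smallest`, 7500 already-processed records are computed again; with `largest`, the 500 records that arrived during the deployment are never processed.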

So how can we upgrade efficiently with zero-data-loss, exactly-once semantics? There are two options:
  1. Start the new application in parallel with the old one, and after some time bring down the old one.
  2. Shut down the old application gracefully and start the new one from the point where the old one left off, with the help of the source-side buffering support from Kafka.
The 1st solution is not practical in resource-limited environments. Also, a lot of data might get processed twice by the two parallel systems in order to avoid data loss. In short, a bad solution.
The 2nd solution is the correct approach, but the documentation gives no concrete example. It basically means writing custom code to manage the Kafka offsets: persisting them while processing and reading them back during a restart.
I am sharing here the code for the 2nd solution, which uses the overloaded KafkaUtils.createDirectStream API that accepts explicit starting offsets.
We also need to set up a lightweight ZooKeeper, which is used for persisting and reading the Kafka offsets reliably. In my case, writing the Kafka offsets of all the partitions to ZooKeeper took around 10-20 milliseconds, which was quite acceptable.



Code Explanation :

  • Create a ZooKeeper client with the necessary details.
  • Try reading the Kafka offsets from ZooKeeper. If it is the first run and there are no Kafka offsets in ZooKeeper, the default KafkaUtils.createDirectStream API is called, and data is fetched from the Kafka brokers starting at the latest offsets of the topic partitions.
  • Save the starting offsets of each RDD partition in each batch of the stream to ZooKeeper right away, before starting to process the corresponding batch. It is important to know that each Kafka topic partition maps one-to-one to a Spark RDD partition.
  • On subsequent application restarts, the starting offsets are retrieved from ZooKeeper, just as they used to be fetched from the checkpoint directory, but this time without any additional metadata. The streaming context is created with a fresh SparkConf object, and all code/configuration changes are picked up.
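The steps above can be sketched without Spark or ZooKeeper as follows. The `InMemoryOffsetStore` class is a hypothetical stand-in for the ZooKeeper client, and the `(topic, partition, from_offset, until_offset)` tuples stand in for the offset ranges that Spark exposes for each batch. In a real job, the result of `load` would be passed to the createDirectStream variant that takes starting offsets.

```python
class InMemoryOffsetStore:
    """Stand-in for ZooKeeper: maps (topic, partition) -> starting offset."""

    def __init__(self):
        self._data = {}

    def load(self, topic):
        """Return {(topic, partition): offset}, or None on the first run."""
        found = {tp: off for tp, off in self._data.items() if tp[0] == topic}
        return found or None

    def save(self, offset_ranges):
        """Persist the starting offset of every partition in a batch."""
        for topic, partition, from_offset, _until_offset in offset_ranges:
            self._data[(topic, partition)] = from_offset


def process_batch(store, offset_ranges, handler):
    # Save the starting offsets first, so a crash mid-batch replays the batch.
    store.save(offset_ranges)
    for offset_range in offset_ranges:
        handler(offset_range)


def failing_handler(offset_range):
    raise RuntimeError("simulated crash while processing")


store = InMemoryOffsetStore()
first_run = store.load("events")  # None: fall back to the default stream

# Batch 1 succeeds, batch 2 crashes before it finishes.
process_batch(store, [("events", 0, 0, 100), ("events", 1, 0, 80)],
              lambda offset_range: None)
try:
    process_batch(store, [("events", 0, 100, 200), ("events", 1, 80, 160)],
                  failing_handler)
except RuntimeError:
    pass

restart_offsets = store.load("events")
print(restart_offsets)  # {('events', 0): 100, ('events', 1): 80}
```

Because the starting offsets are stored, the restarted application reads the failed batch again from offsets 100 and 80. This is safe only when the processing is idempotent, as discussed earlier.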

However, this solution too has limitations, which should be kept in mind:
  • It cannot be used if state management such as updateStateByKey is required, because saving that additional information essentially means checkpointing. In my case, state management was not needed, so this solution fit quite well. A workaround also seems possible by having a flag at startup that decides whether to use checkpointing or custom offset management, but I have not tried it personally as it was not needed in my case.
  • An additional persistent system like ZooKeeper comes into the picture and needs to be configured.
  • It cannot be used if the Kafka topics or the number of partitions change, as the offsets of each Spark partition are mapped one-to-one to a Kafka partition.
  • It can only be used with an advanced ingestion system such as Kafka or Flume that supports source-side buffering.
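For the partition-change limitation, one defensive option is to compare the stored offsets with the current topic layout at startup and fail fast on a mismatch. The sketch below is an assumption of mine rather than part of the original solution; `check_partitions` and its inputs are hypothetical, and how the current partitions are obtained from the brokers is left out.

```python
def check_partitions(stored_offsets, current_partitions):
    """Return the stored offsets if they still match the topic layout.

    stored_offsets: {(topic, partition): offset} read at startup.
    current_partitions: set of (topic, partition) reported by the brokers.
    """
    stored = set(stored_offsets)
    if stored != current_partitions:
        missing = sorted(current_partitions - stored)
        stale = sorted(stored - current_partitions)
        raise ValueError(
            "partition layout changed: no offset for %s, stale offset for %s"
            % (missing, stale))
    return stored_offsets


stored = {("events", 0): 100, ("events", 1): 80}

# Unchanged topic: the stored offsets are safe to reuse.
ok = check_partitions(stored, {("events", 0), ("events", 1)})

# Topic grew to three partitions: stop instead of continuing with offsets
# that no longer cover the whole topic.
try:
    check_partitions(stored, {("events", 0), ("events", 1), ("events", 2)})
    error = None
except ValueError as exc:
    error = str(exc)
print(error)
```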


References:
http://aseigneurin.github.io/2016/05/07/spark-kafka-achieving-zero-data-loss.html
https://spark.apache.org/docs/1.5.1/streaming-programming-guide.html#upgrading-application-code
https://spark.apache.org/docs/1.5.1/streaming-programming-guide.html#checkpointing
http://spark.apache.org/docs/1.5.1/streaming-kafka-integration.html
https://github.com/apache/spark/blob/master/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala






5 comments:

  1. Hello Chandan,
    Good articulation. This helps to go in step by step. Keep up the great work and I wish many more to come.

  2. Hi Chandan,

    Can you please explain how to read data from multiple topics?

    At first I was thinking about a for loop, but the problem is that it gives a different Kafka direct stream for each topic. Am I right?

    If possible, can you please explain a little bit?

    Thanks in advance.

  3. Now I can handle zero data loss in a multi-topic environment.

  4. Hi Chandan Prakash, suppose I want to re-process the data from a particular offset, like 0. How do I achieve that with this kind of ZooKeeper storage?

  5. Hi Chandan,
    Nice explanation on offset management, but while reading the blog something came to my mind. For example, suppose we consumed 100 records from a Kafka topic, stored the offset information in ZooKeeper as soon as we received them, and then looped over the RDD to do our own computation. What happens if the computation fails at the 50th record? How would we handle this situation?
    Please help me understand this scenario.

    Thanking you in advance
