Sunday, 11 September 2016

AVRO Data Processing in Spark Streaming + Kafka

For high-load real-time processing requirements, the Kafka + Spark Streaming combination is an obvious choice today as an ingestion-processing pipeline.
Adding an efficient data serialisation format like Avro to this combination can significantly reduce network traffic from the Kafka broker to the Spark worker, which improves bandwidth usage, and can speed up processing in the Spark executor, which improves CPU utilisation.
In my current project, I recently switched the input data from JSON to Avro for the reasons stated above and observed much better throughput.
In this post, I will explain how to ingest and process binary Avro data in a Spark Streaming + Kafka system, along with code samples.

Short Intro :

Apache Avro is a data serialisation framework developed for big data distributed systems. Avro defines a Schema in advance using JSON and subsequently encodes data records in a compact binary format with that known schema in mind. It is similar to frameworks like Thrift but with advantages such as dynamic typing and untagged data. More details at :
A sample Avro Schema looks like (copied shamelessly from wiki : ) :
{
   "namespace": "example.avro",
   "type": "record",
   "name": "User",
   "fields": [
      {"name": "name", "type": "string"},
      {"name": "favorite_number",  "type": ["int", "null"]},
      {"name": "favorite_color", "type": ["string", "null"]}
   ]
}

Schema is instantiated as :
Schema.Parser parser = new Schema.Parser();
Schema schema = parser.parse(schemaString); //schemaString is json as described above
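
To make "compact binary format" and "untagged data" concrete, here is a minimal hand-rolled sketch of how one User record is laid out under Avro's binary encoding rules (zig-zag variable-length integers, length-prefixed UTF-8 strings, a branch index before each union value). It is an illustration only: the class and method names are hypothetical, and real code should use the Avro library rather than this.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

public class AvroEncodingSketch {

    // Avro writes int/long values as zig-zag encoded variable-length integers.
    static void writeLong(ByteArrayOutputStream out, long n) {
        long z = (n << 1) ^ (n >> 63);             // zig-zag: small magnitudes get small codes
        while ((z & ~0x7FL) != 0) {
            out.write((int) ((z & 0x7F) | 0x80));  // 7 payload bits plus a continuation bit
            z >>>= 7;
        }
        out.write((int) z);
    }

    // A string is its UTF-8 byte length followed by the UTF-8 bytes.
    static void writeString(ByteArrayOutputStream out, String s) {
        byte[] utf8 = s.getBytes(StandardCharsets.UTF_8);
        writeLong(out, utf8.length);
        out.write(utf8, 0, utf8.length);
    }

    // Encodes one User record in schema field order; no field names are written.
    public static byte[] encodeUser(String name, int number, String color) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        writeString(out, name);   // "name": string
        writeLong(out, 0);        // "favorite_number": union branch 0 = "int"
        writeLong(out, number);
        writeLong(out, 0);        // "favorite_color": union branch 0 = "string"
        writeString(out, color);
        return out.toByteArray();
    }

    public static void main(String[] args) {
        byte[] avro = encodeUser("Chandan", 10, "Black");
        String json = "{\"name\":\"Chandan\",\"favorite_number\":10,\"favorite_color\":\"Black\"}";
        System.out.println("Avro bytes: " + avro.length);
        System.out.println("JSON bytes: " + json.getBytes(StandardCharsets.UTF_8).length);
    }
}
```

Because the reader already knows the Schema, field names and type tags never travel with the data, which is where most of the saving over JSON comes from.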

The Schema defines the field names and data types. The receiver of Avro data needs to know this Schema once before it starts processing. There are 2 ways to achieve that :

  1. When writing Avro data to a file, the Schema is stored in the file as well and can be read back while processing the file.
  2. When transferring data over the wire, as from Kafka to Spark, the Schema is shared once up front and subsequently only binary Avro data is exchanged. Avro RPC does this during its initial handshake; Kafka has no such handshake, so the consumer has to obtain the Schema separately, for example from a schema registry.
We will cover the relevant 2nd case in detail in this post.
For a better understanding of Spark Streaming and Kafka separately, I would recommend going through my earlier posts on Spark Streaming.

Important Utilities for Avro:

There are 2 useful utilities that make an Avro implementation in Spark Streaming easy and configurable. We should get familiar with them first :
  1. Avro Schema Registry 
  2. Twitter Bijection library 

Avro Schema Registry :

I will keep this section concise. The Avro Schema needs to be configurable and not statically defined as in the last section. This makes it easy to share the Schema and to handle changes to it. There is a module named schema-repo on GitHub by LinkedIn which provides a RESTful service to store and retrieve Schemas :
In Spark Streaming, we can fetch the Avro Schema from the REST service using httpclient. I have shared the code; please refer to the getLatestSchemaByTopic() method at :

Confluent has also open sourced a RESTful interface for storing and retrieving Avro Schemas; you can access the details at :
No matter which library we use, the Avro Schema registry should ultimately be configurable so that it can support a Schema that evolves over time.
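
As a rough sketch of what such a lookup can look like, the snippet below fetches the latest schema for a topic over HTTP. It uses the JDK 11 `java.net.http` client rather than Apache httpclient, and both the URL layout (`<base>/<topic>/latest`) and the response format (`<id>`, a tab, then the schema JSON) are assumptions made for illustration; check them against the registry you actually run. The class and method names are hypothetical and are not the getLatestSchemaByTopic() code referred to above.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class SchemaFetchSketch {

    // Assumed URL layout: <base>/<topic>/latest. Adjust to the registry in use.
    public static String latestSchemaUrl(String baseUrl, String topic) {
        String base = baseUrl.endsWith("/")
                ? baseUrl.substring(0, baseUrl.length() - 1)
                : baseUrl;
        return base + "/" + topic + "/latest";
    }

    // Assumed response body: "<schemaId>\t<schemaJson>". Returns the JSON part,
    // or the whole body when no tab separator is present.
    public static String extractSchemaJson(String body) {
        int tab = body.indexOf('\t');
        return (tab >= 0 ? body.substring(tab + 1) : body).trim();
    }

    // Performs the HTTP GET and returns the schema JSON as a String.
    public static String fetchLatestSchema(String baseUrl, String topic)
            throws IOException, InterruptedException {
        HttpRequest request = HttpRequest
                .newBuilder(URI.create(latestSchemaUrl(baseUrl, topic)))
                .GET()
                .build();
        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());
        if (response.statusCode() != 200) {
            throw new IOException("Schema lookup failed with HTTP " + response.statusCode());
        }
        return extractSchemaJson(response.body());
    }
}
```

The returned String can be handed straight to `Schema.Parser.parse()`, so a schema change only requires updating the registry, not redeploying the job.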

Twitter Bijection library :

Avro provides APIs to serialize/deserialize data, but they are not very friendly. Instead, Twitter provides the Bijection library with APIs to convert back and forth between 2 data types, and it is much easier to use for Avro as well. Let's look at an example to understand it.
First, we need to instantiate an Injection object, which provides 2 APIs : apply() and invert() . The apply() method converts the original record into binary format, while the invert() method does the opposite.

import com.twitter.bijection.Injection;
import com.twitter.bijection.avro.GenericAvroCodecs;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;

//creating Injection object for avro
Injection<GenericRecord, byte[]> recordInjection = GenericAvroCodecs.toBinary(schema);

// serializing generic avro data record into binary format  :
GenericData.Record avroRecord = new GenericData.Record(schema);
avroRecord.put("name", "Chandan");
avroRecord.put("favorite_number", 10);
avroRecord.put("favorite_color", "Black");
byte[] bytes = recordInjection.apply(avroRecord);

//de-serializing binary avro data back into a record :
GenericRecord record = recordInjection.invert(bytes).get();
// decoded string fields come back as Avro Utf8 (a CharSequence), so use toString() instead of a String cast
String name = record.get("name").toString();
String color = record.get("favorite_color").toString();
int number = (Integer) record.get("favorite_number");

If you want to know more about Bijection, refer to :

Avro + Spark Streaming + Kafka :

Now that we have an idea of Avro and the associated utilities, it's time to write the Spark Streaming program to consume and process Avro data from Kafka. For performance reasons, I have used the DirectKafka approach with custom offset management using Zookeeper (for details, refer to my last post : )
First, we need to include the Avro dependencies in pom.xml in addition to the Spark and Kafka ones :
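
A minimal sketch of such a dependency fragment is shown here. The version and Scala-suffix properties are placeholders (assumptions) that you would define in `<properties>` to match your own Spark build; no specific versions are implied.

```xml
<!-- Sketch only: define avro.version, bijection.version and
     scala.binary.version in <properties> to match your cluster. -->
<dependency>
  <groupId>org.apache.avro</groupId>
  <artifactId>avro</artifactId>
  <version>${avro.version}</version>
</dependency>
<dependency>
  <groupId>com.twitter</groupId>
  <artifactId>bijection-avro_${scala.binary.version}</artifactId>
  <version>${bijection.version}</version>
</dependency>
```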

Then we need to define a method to get or create the DirectKafkaStream.
The important point to note here is that since the values in the Kafka messages are now binary, we need to use DefaultDecoder instead of StringDecoder in the method signature, and byte[] in place of the String class for the Kafka message values.
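
The decoder change can be sketched as follows, assuming the Spark 1.x `spark-streaming-kafka` module (the Kafka 0.8 direct API). The class name and the `brokers` parameter are hypothetical, and the sketch leaves out the custom Zookeeper offset management mentioned above, so it shows only the key/value types and decoders.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

import kafka.serializer.DefaultDecoder;
import kafka.serializer.StringDecoder;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;

public class AvroDirectStream {

    // Kafka parameters for the direct (receiver-less) consumer.
    public static Map<String, String> kafkaParams(String brokers) {
        Map<String, String> params = new HashMap<>();
        params.put("metadata.broker.list", brokers);
        return params;
    }

    // Creates a direct stream whose values are the raw Avro bytes of each message.
    public static JavaPairInputDStream<String, byte[]> createStream(
            JavaStreamingContext jssc, String brokers, Set<String> topics) {
        return KafkaUtils.createDirectStream(
                jssc,
                String.class,          // key type
                byte[].class,          // value type: binary Avro payload
                StringDecoder.class,   // key decoder
                DefaultDecoder.class,  // value decoder: hands the bytes over unchanged
                kafkaParams(brokers),
                topics);
    }
}
```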

The actual method where all the processing of the Avro data happens will look something like the one below.

The above processing logic is executed for each record of each RDD inside a Spark executor.
It is important to note that, for each RDD, the Avro Schema is fetched and the Injection object is instantiated only once before the records in that RDD are processed. Also, the code assumes that even if there are multiple topics, all of them share the same Avro Schema.
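
One possible way to structure this "set up once, then decode every record" pattern is sketched below; it is not the code from the repo. It assumes Spark 1.6 or later (for `foreachRDD` with `VoidFunction`), and `SchemaSource` is a hypothetical hook standing in for the registry lookup. The sketch fetches the schema JSON once per RDD on the driver and builds the Injection once per partition on the executor, so that only a String has to be shipped to the executors.

```java
import java.util.Iterator;

import com.twitter.bijection.Injection;
import com.twitter.bijection.avro.GenericAvroCodecs;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.streaming.api.java.JavaPairDStream;

public class AvroStreamProcessor {

    // Hypothetical hook: returns the latest schema JSON, e.g. from a schema registry.
    public interface SchemaSource extends java.io.Serializable {
        String latestSchemaJson();
    }

    // Decodes one binary Kafka value; throws if the bytes do not match the schema.
    public static GenericRecord decode(Injection<GenericRecord, byte[]> injection, byte[] value) {
        return injection.invert(value).get();
    }

    public static void process(JavaPairDStream<String, byte[]> stream, final SchemaSource source) {
        stream.foreachRDD(new VoidFunction<JavaPairRDD<String, byte[]>>() {
            @Override
            public void call(JavaPairRDD<String, byte[]> rdd) {
                // Runs on the driver: one schema lookup per RDD (micro-batch).
                final String schemaJson = source.latestSchemaJson();
                rdd.values().foreachPartition(new VoidFunction<Iterator<byte[]>>() {
                    @Override
                    public void call(Iterator<byte[]> values) {
                        // Runs on an executor: parse once per partition, not once per record.
                        Schema schema = new Schema.Parser().parse(schemaJson);
                        Injection<GenericRecord, byte[]> injection =
                                GenericAvroCodecs.toBinary(schema);
                        while (values.hasNext()) {
                            GenericRecord record = decode(injection, values.next());
                            // Placeholder for the real business logic.
                            System.out.println(record.get("name") + " -> " + record.get("favorite_color"));
                        }
                    }
                });
            }
        });
    }
}
```

Keeping the schema as a plain String until it reaches the executor avoids depending on whether the Schema and Injection objects are serializable in the library versions you use.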

The complete code can be accessed from github repo :

Conclusion :

This was all about how to fetch and process Avro data in Spark Streaming using Kafka. One area which I have not covered is how to publish Avro messages on the Kafka producer side. The code for that should be simple and along the same lines as serializing a generic Avro data record into binary format, as shown earlier.


  1. Should we use a schema registry to get the Avro data from Kafka?

  2. Can we do the same thing through a Scala script from the terminal?