Spark provides two serialization libraries: Java serialization (the default) and Kryo serialization.
Kryo serializes and deserializes faster, and Spark itself recommends it for any network-intensive application. That raises an obvious question.
Why is Kryo not the default in Spark?
The only reason Kryo is not the default is that it requires custom class registration. Also, although Kryo is supported for RDD caching and shuffling, it is not natively supported for serializing to disk: both saveAsObjectFile on RDD and objectFile on SparkContext support only Java serialization.
If you need a performance boost and lower memory usage, Kryo is worth using. Serialization matters most in join and grouping operations, because they usually shuffle data, and the less data there is to shuffle, the faster the operation runs.
Serialization also affects caching, when data is cached to disk or spilled from memory to disk.
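To get a feel for why serialized size matters, it can help to measure what default Java serialization actually writes for one small object. The following is a minimal sketch using only the JDK, with no Spark involved; the Person class and the JavaSerializedSize object are assumptions made for this illustration.

```scala
import java.io.{ByteArrayOutputStream, ObjectOutputStream}

// Hypothetical record type used only for this illustration.
case class Person(name: String, age: Int)

object JavaSerializedSize {
  // Returns the bytes produced by default Java serialization for `value`.
  def javaSerialize(value: AnyRef): Array[Byte] = {
    val buffer = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buffer)
    try out.writeObject(value) finally out.close()
    buffer.toByteArray
  }

  def main(args: Array[String]): Unit = {
    val bytes = javaSerialize(Person("p1", 1))
    // Field data alone: 2 name characters plus a 4-byte Int.
    val payload = "p1".length + 4
    println(s"serialized: ${bytes.length} bytes for roughly $payload bytes of field data")
    // The stream carries a class descriptor that includes the class name.
    println(new String(bytes, "ISO-8859-1").contains("Person"))
  }
}
```

The gap between the two numbers is metadata that Java serialization writes alongside the field values, and that overhead is what ends up being shuffled or cached.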
Also, if we look at the size metrics below for both Java and Kryo, we can see the difference.
Registering a class in Kryo:
To register a class, pass its Class object to the registerKryoClasses method on SparkConf, e.g.:
.registerKryoClasses( Array(classOf[Person], classOf[Furniture]) )
What if we don’t register?
When an unregistered class is encountered, a serializer is automatically chosen from a list of “default serializers” that maps a class to a serializer. Kryo has 50+ default serializers for various JRE classes. If no default serializer matches a class, the global default serializer is used, which is FieldSerializer unless configured otherwise.
But leaving classes unregistered has two major drawbacks, according to the Kryo documentation:
- There are security implications because it allows deserialization to create instances of any class. Classes with side effects during construction or finalization could be used for malicious purposes.
- Instead of writing a varint class ID (often 1-2 bytes), the fully qualified class name is written the first time an unregistered class appears in the object graph, which increases the serialized size.
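The size cost of the second point can be observed by using Kryo directly, outside Spark. This is only a sketch: it assumes the Kryo library (package com.esotericsoftware.kryo) is on the classpath, and the Furniture class and RegistrationSizeDemo object are hypothetical names introduced for the illustration.

```scala
import com.esotericsoftware.kryo.Kryo
import com.esotericsoftware.kryo.io.Output

// Hypothetical class used only for this illustration.
case class Furniture(kind: String, legs: Int)

object RegistrationSizeDemo {
  // Serializes `value` with the given Kryo instance and returns the byte count.
  def serializedSize(kryo: Kryo, value: AnyRef): Int = {
    val output = new Output(1024, -1) // start at 1 KB, grow without limit
    kryo.writeClassAndObject(output, value)
    output.toBytes.length
  }

  def main(args: Array[String]): Unit = {
    val chair = Furniture("chair", 4)

    // Unregistered: Kryo must write the class name into the stream.
    val unregistered = new Kryo()
    unregistered.setRegistrationRequired(false)

    // Registered: Kryo writes a small integer class ID instead.
    val registered = new Kryo()
    registered.register(classOf[Furniture])

    println(s"unregistered: ${serializedSize(unregistered, chair)} bytes")
    println(s"registered:   ${serializedSize(registered, chair)} bytes")
  }
}
```

The exact byte counts depend on the Kryo version and on the package the class lives in, but the registered instance should produce the smaller output.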
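So, to make sure everything is registered, you can set this property in the Spark config:
.set("spark.kryo.registrationRequired", "true")
With it set to true, Kryo throws an exception when an unregistered class is serialized instead of silently falling back to writing the class name.
Let's look at a simple example to see the difference from the default Java serialization in practice.
Start by registering the required classes.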
import org.apache.spark.{SparkConf, SparkContext}

// class which needs to be registered
case class Person(name: String, age: Int)

val conf = new SparkConf()
  .setAppName("kyroExample")
  .setMaster("local[*]")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryo.registrationRequired", "true")
  .registerKryoClasses(
    Array(
      classOf[Person],
      classOf[Array[Person]],
      Class.forName("org.apache.spark.internal.io.FileCommitProtocol$TaskCommitMessage")
    )
  )

val sparkContext = new SparkContext(conf)
Now, let's create an array of Person objects, parallelize it into an RDD, and persist it in memory in serialized form.
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel

val personList: Array[Person] = (1 to 9999999)
  .map(value => Person("p" + value, value)).toArray

// creating RDD of Person
val rddPerson: RDD[Person] = sparkContext.parallelize(personList, 5)
val evenAgePerson: RDD[Person] = rddPerson.filter(_.age % 2 == 0)

// persisting evenAgePerson RDD into memory
evenAgePerson.persist(StorageLevel.MEMORY_ONLY_SER)
evenAgePerson.take(50).foreach(x => println((x.name, x.age)))
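To produce the Java side of the comparison, the same job can be rerun with only the serializer setting changed. The sketch below assumes Spark is on the classpath; the SerializerConfigs object and its confFor helper are hypothetical and not part of the original example.

```scala
import org.apache.spark.SparkConf

// Same shape as the Person class registered earlier in the post.
case class Person(name: String, age: Int)

object SerializerConfigs {
  // Hypothetical helper: builds a SparkConf for either serializer.
  def confFor(useKryo: Boolean): SparkConf = {
    val base = new SparkConf()
      .setAppName("kyroExample")
      .setMaster("local[*]")
    if (useKryo) {
      base
        .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
        .registerKryoClasses(Array[Class[_]](classOf[Person], classOf[Array[Person]]))
    } else {
      // Spark's default serializer, set explicitly so the run is self-describing.
      base.set("spark.serializer", "org.apache.spark.serializer.JavaSerializer")
    }
  }
}
```

Running the job once with confFor(true) and once with confFor(false) should give the two Storage tab entries to compare.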
After running it, if we open the Storage tab of the Spark UI and compare the two serializers, we can see the difference in memory usage.
Java is using 20.1 MB and Kryo is using 13.3 MB, so Kryo uses roughly 30-40% less memory than the default serializer.
Now consider a 40% reduction in memory (say 40% of 5 GB, i.e. 2 GB) at big-data scale: it saves a lot of cost in the first place, and it also helps reduce processing time.
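The arithmetic behind these figures can be checked in a few lines. This is a small sketch that takes the two Storage tab sizes quoted above (20.1 MB and 13.3 MB) and the 5 GB example; the MemorySavings object and its reductionPercent helper are names made up for the illustration.

```scala
object MemorySavings {
  // Relative reduction when going from `before` to `after`, as a percentage.
  def reductionPercent(before: Double, after: Double): Double =
    (before - after) / before * 100.0

  def main(args: Array[String]): Unit = {
    val largerMb = 20.1  // larger of the two cached sizes
    val smallerMb = 13.3 // smaller of the two cached sizes
    println(f"reduction: ${reductionPercent(largerMb, smallerMb)}%.1f%%")

    val cachedGb = 5.0 // the hypothetical 5 GB workload from the text
    println(f"40%% of $cachedGb%.1f GB = ${cachedGb * 0.40}%.1f GB")
  }
}
```

On the measured sizes the reduction works out to a little under 34%, which sits inside the 30-40% range quoted above.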
Metrics for default Java Serialization:
Metrics for Kryo Serialization:
We can see that Duration, Task Deserialization Time, and GC Time are all lower with Kryo, and these metrics are for just a small dataset. With larger datasets, the differences should be more pronounced.
Example code: https://github.com/pinkusrg/spark-kryo-example