Having trouble optimizing your Spark application? If so, this blog walks through how to optimize it and which parameters to tune so that your Spark application gives the best performance.
A Spark application can be bottlenecked by any cluster resource: CPU, memory, or network bandwidth. Tuning therefore covers memory usage, the data structures we use, and how RDDs are stored, i.e. data serialization.
Keep the following parameters in mind while tuning a Spark application:
Data Serialization – Serialization plays an important role in the performance of any distributed application. Spark provides two serialization libraries:
- Java Serialization: By default, Spark uses Java's ObjectOutputStream framework, which can work with any class that implements java.io.Serializable. This serialization is flexible but slow, and it produces large serialized formats for many classes.
- Kryo Serialization: Spark can also use the Kryo library to serialize objects. It is much faster and more compact than Java serialization but does not support all Serializable types. For best performance, register in advance the classes you want to serialize: Kryo then identifies registered classes by a numeric index instead of the full class name, which reduces the size of the serialized data. To switch to Kryo, set the property spark.serializer to org.apache.spark.serializer.KryoSerializer when initializing the SparkConf. This serializer has a major impact on performance when we are shuffling or caching a large amount of data. To know more about this serializer, refer to the Kryo documentation.
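As a rough sketch, the Kryo settings can be written as entries in spark-defaults.conf, where each line is a property name and a value separated by whitespace. The small Python helper below only renders those lines. The class names com.example.User and com.example.Order are hypothetical placeholders, and spark.kryo.classesToRegister is the Spark property for registering classes without writing code.

```python
# Render Spark properties as spark-defaults.conf lines ("key<whitespace>value").
# The registered class names are hypothetical; substitute your own classes.
kryo_settings = {
    "spark.serializer": "org.apache.spark.serializer.KryoSerializer",
    "spark.kryo.classesToRegister": "com.example.User,com.example.Order",
}


def to_defaults_conf(settings):
    """Return one aligned 'key  value' line per property, in insertion order."""
    width = max(len(key) for key in settings)
    return "\n".join(
        f"{key.ljust(width)}  {value}" for key, value in settings.items()
    )


print(to_defaults_conf(kryo_settings))
```

The same two properties can equally be set on a SparkConf in code; the file form is shown here only because it needs no Spark installation to try out.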
Memory Tuning – Memory tuning depends on the amount of memory used by your objects, the cost of accessing them, and the overhead of garbage collection (GC). First, determine how much memory an RDD uses by caching it and checking the Storage tab in the Spark UI. Points to remember while tuning GC are covered under Garbage Collection Tuning below. We can also tune the data structures used in the application by keeping the following points in mind:
- Avoid nested data structures.
- Prefer arrays of objects and primitive types over the standard Java or Scala collection classes (e.g. HashMap).
- Try to use numeric IDs or enumeration objects for keys instead of strings.
- If you have less than 32 GB of RAM, set the JVM flag -XX:+UseCompressedOops to make pointers four bytes instead of eight.
Garbage Collection Tuning – Keep the following points in mind:
- Use data structures with fewer objects as the cost of GC is directly proportional to the number of objects.
- Persist objects in the serialized form.
- Check whether full GC runs multiple times before a task completes. If it does, there is not enough memory available to execute the task.
- Try the G1 garbage collector (-XX:+UseG1GC).
- Set the size of the Eden space according to how much memory each task needs. If the Eden size is estimated to be E, you can set the Young generation size with the option -Xmn=4/3*E; the extra third accounts for the space used by the survivor regions.
- If the Old generation is getting full, reduce the amount of memory used for caching by lowering spark.memory.fraction. It is better to cache fewer objects than to slow down task execution.
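To make the Eden sizing rule concrete, here is a worked example as a small Python sketch. The inputs are assumptions for illustration, not measurements: each task reads one 128 MiB HDFS block, a decompressed block is about 3 times that size, and we want working space for 4 tasks.

```python
def young_gen_mib(block_mib, decompress_factor, tasks):
    """Estimate Eden (E) and the Young generation size (4/3 * E), both in MiB."""
    eden = tasks * decompress_factor * block_mib
    young = eden * 4 // 3  # scale by 4/3 to leave room for the survivor regions
    return eden, young


# Assumed workload: 4 concurrent tasks, 128 MiB blocks, ~3x growth when decompressed.
eden, young = young_gen_mib(block_mib=128, decompress_factor=3, tasks=4)
print(f"Eden estimate: {eden} MiB")
print(f"JVM option: -Xmn{young}m")
```

With these assumed numbers Eden comes to 1536 MiB and the Young generation to 2048 MiB. On executors, a JVM option like this is typically passed through spark.executor.extraJavaOptions; verify the effect in the GC logs rather than trusting the estimate.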
Level of Parallelism – A cluster will not be fully utilized unless the level of parallelism for each operation is high enough. It is recommended to run 2-3 tasks per CPU core. The default level can be changed by setting the configuration property spark.default.parallelism. By default, distributed "reduce" operations such as reduceByKey use the largest number of partitions among the parent RDDs. For operations with no parent RDD, such as parallelize, the default on most cluster managers is the total number of cores on all executor nodes.
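The 2-3 tasks per core guideline is simple arithmetic. A minimal sketch, assuming a hypothetical cluster of 10 executors with 4 cores each:

```python
def suggested_parallelism(executors, cores_per_executor, tasks_per_core=3):
    """Return total cores multiplied by the desired number of tasks per core."""
    total_cores = executors * cores_per_executor
    return total_cores * tasks_per_core


# Hypothetical cluster: 10 executors with 4 cores each (40 cores in total).
low = suggested_parallelism(10, 4, tasks_per_core=2)
high = suggested_parallelism(10, 4, tasks_per_core=3)
print(f"--conf spark.default.parallelism={low}")   # 2 tasks per core
print(f"--conf spark.default.parallelism={high}")  # 3 tasks per core
```

Treat the result as a starting range (80 to 120 here) and adjust it after looking at task durations in the Spark UI.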
Data Locality – Data locality means how close the data is to the code that processes it; computation is fastest when the two are together. Spark picks the best locality level it can, but when no idle executor holds the data it must either wait for a busy CPU to free up on the node that has the data, or start the task somewhere farther away and move the data there. How long Spark waits before falling back to the next level is set by the configuration property spark.locality.wait, which can also be overridden per level (spark.locality.wait.process, spark.locality.wait.node, spark.locality.wait.rack). There are different levels of locality based on the data's current location, listed below from closest to farthest:
- The best locality is PROCESS_LOCAL as the data is in the same JVM as the code.
- NODE_LOCAL which means data is on the same node.
- NO_PREF which means data is accessed equally quickly from anywhere.
- RACK_LOCAL which means data is on the same rack of servers. The data is on a different server in the same rack, so it must be sent over the network.
- ANY which means data is elsewhere on the network, not on the same rack.
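Spark exposes the locality wait as spark.locality.wait (default 3s), with optional per-level overrides. The sketch below renders a set of such properties as spark-submit flags. The wait values are made-up examples rather than recommendations.

```python
# Example wait times only; Spark's default for spark.locality.wait is 3s.
locality_waits = {
    "spark.locality.wait": "3s",          # base wait used when no override is set
    "spark.locality.wait.process": "1s",  # wait for a PROCESS_LOCAL slot
    "spark.locality.wait.node": "3s",     # wait for a NODE_LOCAL slot
    "spark.locality.wait.rack": "6s",     # wait for a RACK_LOCAL slot
}


def to_submit_args(settings):
    """Turn a property dict into a flat list of spark-submit '--conf' arguments."""
    args = []
    for key, value in settings.items():
        args += ["--conf", f"{key}={value}"]
    return args


print(" ".join(to_submit_args(locality_waits)))
```

Raising a wait favours locality at the cost of idle time; lowering it lets tasks start sooner on farther nodes. Which trade-off wins depends on task length, so treat these values as placeholders.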
Broadcasting – Use a broadcast variable for large objects that your tasks reference, such as a static lookup table. As a rule of thumb, anything larger than about 20 KB is worth broadcasting. Broadcasting reduces the size of each serialized task, and the broadcast variable is read-only and cached only once on each node, which helps considerably with large objects.
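One way to apply the 20 KB rule of thumb is to estimate an object's serialized size before deciding. The sketch below uses pickle as a stand-in; Spark's own serializers will produce different sizes, so this is only an approximation. The helper name worth_broadcasting and the sample data are hypothetical.

```python
import pickle

BROADCAST_THRESHOLD_BYTES = 20 * 1024  # the ~20 KB rule of thumb


def worth_broadcasting(obj, threshold=BROADCAST_THRESHOLD_BYTES):
    """Return True when the pickled size of obj exceeds the threshold."""
    return len(pickle.dumps(obj)) > threshold


# Made-up lookup table with 5000 entries, and a tiny settings dict.
country_names = {code: f"country-{code}" for code in range(5000)}
small_flag = {"debug": False}

print(worth_broadcasting(country_names))
print(worth_broadcasting(small_flag))
```

In a real job, an object that passes this check would be wrapped with SparkContext.broadcast and read inside tasks through the broadcast variable's value attribute.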
Partitioning – This parameter plays a major role in performance because if we have
- Too few partitions, resources will be underutilized.
- Too many partitions, there will be excessive overhead in managing many small tasks.
Therefore, we need to partition our data sensibly so that the resources are fully utilized without adding the overhead of managing many small tasks. We must divide our data into enough partitions, each of roughly the same size.
For example, if you partition your data by the user's country, the partitions will be unequal because some countries have far more users than others. Tasks on the large partitions take longer to process, while the executors that handled the small partitions finish early and sit idle.
So choose an appropriate key for your use case so that the partitions are of roughly similar size.
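The effect of the key choice can be seen with a small simulation. This is a sketch with made-up data (10,000 users, 70% of them in one country) and a simplified hash partitioner, not Spark's actual implementation.

```python
import zlib
from collections import Counter


def stable_hash(key):
    """Ints hash to themselves; other keys use CRC32 so results are repeatable."""
    return key if isinstance(key, int) else zlib.crc32(str(key).encode())


def partition_sizes(keys, num_partitions):
    """Count how many records a simple hash partitioner sends to each partition."""
    counts = Counter(stable_hash(key) % num_partitions for key in keys)
    return [counts.get(p, 0) for p in range(num_partitions)]


# Made-up data: user ids 0..9999, heavily concentrated in one country.
users = [
    (uid, "IN" if uid < 7000 else "US" if uid < 9000 else "DE")
    for uid in range(10000)
]

by_country = partition_sizes((country for _, country in users), 4)
by_user_id = partition_sizes((uid for uid, _ in users), 4)
print("by country:", by_country)
print("by user id:", by_user_id)
```

Keying by country can fill at most three of the four partitions, and one of them holds at least 7,000 records, while keying by user id spreads the records evenly. In a real job the per-partition task times in the Spark UI show the same pattern.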
These are a few of the parameters to keep in mind while tuning a Spark application. Optimized performance ultimately comes from continuously monitoring the application and tuning it for your use case and the resources available.
Hope this blog was helpful. Thanks for reading!
- Spark documentation – https://spark.apache.org/docs/latest/tuning.html