Optimizations In Spark: For BETTER OR For WORSE


No need to guess: yes, we are talking about the Spark SQL Catalyst optimizer here. I am sure all of you (at least those interested in working with Spark and structured data) have read various blogs that describe the advantages of Spark SQL in a very neat and pretty way. To be precise, there are plenty of blogs telling you why you should be using Spark SQL and what optimizations it brings you out of the box.

That seems rather one-sided, doesn’t it? As if Spark were some magic thing and just writing SparkSession in your code would be enough to process that “Big Data” you gathered from IoT devices. So today, we will look into some issues that I faced while using Spark SQL. Let’s get started.


What is Spark SQL?

Let’s begin with a brief overview of Spark SQL. If you are already familiar with it, you can skip this part.

Spark SQL is a module in the Spark ecosystem that is powered by the core engine, Spark Core. It handles the processing of structured data (streaming and batch). Spark SQL inherits lazy evaluation from Spark Core, which lets it build a logical plan covering the query to be executed together with all the queries it depends on. Having the whole logical plan available gives Spark SQL room to optimize: the Catalyst Optimizer rewrites the logical plan, generates multiple physical plans from it, and chooses the least costly one. The image below briefly touches on the phases of query execution in Spark SQL.
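The plan phases described in this section can be printed for any query. Below is a minimal sketch, assuming a local SparkSession and a small made-up dataset (the object name, app name, column names and values are illustrative, not from a real workload). It shows that transformations only extend the plan and that explain(true) prints the parsed, analyzed and optimized logical plans followed by the physical plan.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.col

object LazyPlanSketch {
  // Only describes the work: filter and select are lazy, so no job starts here.
  def buildQuery(df: DataFrame): DataFrame =
    df.filter(col("score") > 15.0).select("id", "score")

  def main(args: Array[String]): Unit = {
    // Local session for illustration only; app name and master are assumptions.
    val spark = SparkSession.builder()
      .appName("lazy-plan-sketch")
      .master("local[*]")
      .getOrCreate()
    import spark.implicits._

    // Hypothetical sample data.
    val df = Seq((1, "a", 10.0), (2, "b", 20.0), (3, "c", 30.0))
      .toDF("id", "name", "score")

    val query = buildQuery(df)

    // Prints the logical plans (parsed, analyzed, optimized) and the physical plan.
    query.explain(true)

    // Only an action such as count() triggers execution.
    println(query.count())

    spark.stop()
  }
}
```

Comparing the parsed and the optimized logical plan in the output is a quick way to see what Catalyst changed before anything is executed.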

Optimizations In Spark

So what are these optimizations that we are talking about, and what is the Catalyst Optimizer?

Features!

The good results that we are about to get =>

  1. Lazy Evaluation
  2. Filter pushdown
  3. Column Pruning
  4. Adaptive Execution
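Two of the items above, filter pushdown and column pruning, can be observed directly in a physical plan. The following is a rough sketch under stated assumptions: the Parquet data is a tiny made-up dataset written to a temporary directory, and the object, path and column names are hypothetical. In the printed plan, the scan node should list the pushed predicates under PushedFilters and only the selected columns under ReadSchema.

```scala
import java.nio.file.Files
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.col

object PushdownAndPruning {
  // Reads the Parquet data, keeps rows with score > 15 and only two of the three columns.
  def prunedQuery(spark: SparkSession, path: String): DataFrame =
    spark.read.parquet(path).filter(col("score") > 15.0).select("id", "score")

  def main(args: Array[String]): Unit = {
    // Local session for illustration only; app name and master are assumptions.
    val spark = SparkSession.builder()
      .appName("pushdown-pruning-sketch")
      .master("local[*]")
      .getOrCreate()
    import spark.implicits._

    // Hypothetical sample data written to a fresh sub-path of a temp directory.
    val path = Files.createTempDirectory("spark-pushdown").resolve("scores").toString
    Seq((1, "a", 10.0), (2, "b", 20.0), (3, "c", 30.0))
      .toDF("id", "name", "score")
      .write.parquet(path)

    // Inspect the scan node for PushedFilters and ReadSchema.
    prunedQuery(spark, path).explain(true)

    spark.stop()
  }
}
```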

Issues!

There is no doubt that these optimizations cut the computation time considerably, and they are implemented in such a way that you just have to provide the logic and everything else is done behind the abstraction. But as my friend and colleague Ramandeep says, “Abstract features come with abstract issues”. The following are a few issues I have faced in my recent work with Spark SQL:

  1. The query is too large to be stored in memory
  2. Implicit optimizations interfere with partitioning

1. Query being too large

When numerous joins and filters go into the resulting DataFrame, the query gets huge. What I observed in such cases was that, when the action was called, no job appeared on the Spark UI for a while, which meant that execution had not started. After a few minutes, I found the following exception in the logs:

Exception in thread "main" java.lang.OutOfMemoryError: Java heap space
	at java.util.Arrays.copyOf(Arrays.java:3332)
	at java.lang.AbstractStringBuilder.ensureCapacityInternal(AbstractStringBuilder.java:124)
	at java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:448)
	at java.lang.StringBuilder.append(StringBuilder.java:136)
	at scala.collection.mutable.StringBuilder.append(StringBuilder.scala:210)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$generateTreeString$1.apply(TreeNode.scala:544)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$generateTreeString$1.apply(TreeNode.scala:543)
	at scala.collection.immutable.List.foreach(List.scala:392)
	at org.apache.spark.sql.catalyst.trees.TreeNode.generateTreeString(TreeNode.scala:543)
	at org.apache.spark.sql.catalyst.trees.TreeNode.generateTreeString(TreeNode.scala:570)
	at org.apache.spark.sql.catalyst.trees.TreeNode.generateTreeString(TreeNode.scala:570)
	at org.apache.spark.sql.catalyst.trees.TreeNode.generateTreeString(TreeNode.scala:570)
	at org.apache.spark.sql.catalyst.trees.TreeNode.generateTreeString(TreeNode.scala:570)
	at org.apache.spark.sql.catalyst.trees.TreeNode.generateTreeString(TreeNode.scala:570)

Debug

The above exception occurred because the query was simply too large to be held in memory: the stack trace shows the heap running out while TreeNode.generateTreeString was building the string representation of the plan. This was the result of chaining all the execution blocks into a single pipeline and hoping for it to be optimized. Many would say that increasing the memory is a good solution, but that still won’t make up for the time it takes just to build that large query. Imagine this time being added to the SLA of an application, with the cluster sitting idle while Spark creates and optimizes the query, and execution starting only after that.

Solution

A simple solution is to break the DAG into multiple parts. Here are some simple ideas for doing that:

Checkpoint: Use the checkpoint feature of DataFrame, which executes the query up to the checkpoint and saves the result on some storage (HDFS, S3, etc.). To use this feature, a checkpoint directory has to be provided to Spark in advance:

spark.sparkContext.setCheckpointDir(path)

The trade-off of using checkpoints is obvious: you are breaking the DAG, but at the cost of I/O to some storage, and larger data means more I/O.
This type of checkpointing is also called reliable checkpointing, as it uses reliable data storage.

LocalCheckpoint: Another way to break the DAG into parts is to use localCheckpoint on a DataFrame. It is similar to the first option, but it saves the output to the local storage of the executors, hence the name local checkpointing. This removes the need to set a checkpoint directory in Spark. It also has an advantage over the first option: the data doesn’t travel over the network and stays on local storage. The price is reliability: if an executor is lost, the checkpointed data it held is lost with it.

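val localCp = df.localCheckpoint() // local checkpointing; returns a new DataFrame
// or
val reliableCp = df.checkpoint()   // reliable checkpointing; needs the checkpoint directory set first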
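To see both kinds of checkpoint in context, here is a minimal sketch. It assumes a local SparkSession, uses a temporary directory as a stand-in for HDFS/S3, and simulates a long pipeline with a hypothetical helper (growPlan) that keeps adding derived columns. The point to notice is that checkpoint() and localCheckpoint() return a new DataFrame, and it is that returned DataFrame that carries the shortened plan.

```scala
import java.nio.file.Files
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.col

object CheckpointSketch {
  // Hypothetical helper: adds `steps` derived columns, so the plan grows with every step.
  def growPlan(df: DataFrame, steps: Int): DataFrame =
    (1 to steps).foldLeft(df) { (acc, i) => acc.withColumn(s"c$i", col("id") + i) }

  def main(args: Array[String]): Unit = {
    // Local session for illustration only; app name and master are assumptions.
    val spark = SparkSession.builder()
      .appName("checkpoint-sketch")
      .master("local[*]")
      .getOrCreate()

    // Reliable checkpointing needs a directory; a temp dir stands in for HDFS/S3 here.
    spark.sparkContext.setCheckpointDir(
      Files.createTempDirectory("spark-checkpoint").toString)

    val big = growPlan(spark.range(100).toDF("id"), 50)

    // checkpoint() is eager by default: it runs the query, stores the result and
    // returns a new DataFrame whose plan starts from the stored data.
    val truncated = big.checkpoint()

    // localCheckpoint() works the same way but keeps the data on the executors.
    val truncatedLocal = big.localCheckpoint()

    // Later transformations build on the short plan, not on the 50-step one.
    truncated.filter(col("c50") > 100).explain(true)
    println(truncatedLocal.count())

    spark.stop()
  }
}
```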

Recreating the DataFrame from its underlying RDD: One more, slightly simpler way is to recreate the DataFrame from the underlying RDD of a DataFrame and its schema, using the following snippet:

spark.createDataFrame(df.rdd, df.schema)

This creates a DataFrame from the RDD of the previous DataFrame, which breaks the DAG at this point.
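One way to convince yourself that the plan really is cut is to count the nodes of the logical plan before and after. The sketch below assumes a local SparkSession; cutPlan and planSize are hypothetical helper names, and queryExecution is a developer-facing API that may differ between Spark versions. Note that nothing is materialized here: the RDD still carries the earlier computation, but Catalyst no longer sees it as a plan it has to analyze and optimize.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.col

object RebuildFromRdd {
  // Rebuilds the DataFrame from its RDD and schema, so Catalyst sees an
  // RDD-backed leaf instead of the earlier chain of operators.
  def cutPlan(spark: SparkSession, df: DataFrame): DataFrame =
    spark.createDataFrame(df.rdd, df.schema)

  // Counts the nodes of the (not yet optimized) logical plan.
  def planSize(df: DataFrame): Int =
    df.queryExecution.logical.collect { case node => node }.size

  def main(args: Array[String]): Unit = {
    // Local session for illustration only; app name and master are assumptions.
    val spark = SparkSession.builder()
      .appName("rebuild-from-rdd-sketch")
      .master("local[*]")
      .getOrCreate()

    // Simulate a long pipeline: 50 derived columns on top of a small range.
    val big = (1 to 50).foldLeft(spark.range(100).toDF("id")) { (acc, i) =>
      acc.withColumn(s"c$i", col("id") + i)
    }
    val cut = cutPlan(spark, big)

    println(s"plan nodes before: ${planSize(big)}, after: ${planSize(cut)}")

    spark.stop()
  }
}
```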

2. Implicit optimizations interfere with partitioning

Suppose the input data sits in one single partition, and you want to run some transformations on it in parallel on a Spark cluster (heavy, but not complex) and then save the result in a single partition. To process it in parallel, we have to repartition it into, let’s say, 20 partitions, and after processing coalesce it back into a single partition. The code would be something like this:

df2.repartition(20).select(col("score") * col("score") as "data").coalesce(1).count

Now let’s look at the Spark UI:

In the above screenshot we can see that there are only 2 tasks in Job Id 1. But shouldn’t there be around 20? This implies that when we run the code on the cluster, the data will not be processed in 20 partitions, and all the execution will happen sequentially on a single core.

Debug

Let’s dig a bit deeper and see the execution plan:

== Parsed Logical Plan ==
Aggregate [count(1) AS count#114L]
+- Repartition 1, false
   +- Project [(cast(score#10 as double) * cast(score#10 as double)) AS data#110]
      +- Repartition 20, true
         +- Relation[......C#33,... 22 more fields] csv

In the logical plan everything matches the execution we had in mind, but now take a glance at the physical plan:

== Physical Plan ==
*(2) HashAggregate(keys=[], functions=[count(1)], output=[count#114L])
+- *(2) HashAggregate(keys=[], functions=[partial_count(1)], output=[count#117L])
   +- Coalesce 1
      +- Exchange RoundRobinPartitioning(20)
         +- *(1) FileScan csv [] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/Users/.../temp-spark1], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<>

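Here in the physical plan we can see that the coalesce has effectively been pushed downward: Coalesce 1 now sits directly on top of the Exchange, and the projection is no longer between them (count needs no columns, hence the empty ReadSchema). What happens is that coalesce does not introduce a shuffle, i.e. it is a narrow transformation, and so is the operation just before it. “Not complex” here means exactly that: no shuffle is needed. Everything after the repartition therefore ends up in the same stage as the coalesce, and that stage runs with a single partition.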

Solution

Repartitioning the DataFrame on the basis of some id column could be one way to work around this problem:

df2.repartition(20, col("id")).select(col("score") * col("score") as "data").coalesce(1).count
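Another option worth trying comes from the Spark API documentation for Dataset.coalesce, which notes that a drastic coalesce (for example to a single partition) can make the computation run on fewer nodes than you would like, and suggests calling repartition instead: it adds a shuffle step, but the upstream partitions are then processed in parallel. The sketch below contrasts the two variants under stated assumptions: a local SparkSession, a tiny made-up df2 with id and score columns, and hypothetical helper names. It is meant for comparing the two physical plans, not as a measured fix for the job above.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.col

object CoalesceVsRepartition {
  // Variant from the post: coalesce(1) adds no shuffle, so the projection
  // shares the single-partition stage of the coalesce.
  def withCoalesce(df: DataFrame): DataFrame =
    df.repartition(20).select((col("score") * col("score")).as("data")).coalesce(1)

  // Alternative: repartition(1) adds a shuffle, so the projection stays in the
  // stage that runs on the 20 upstream partitions.
  def withRepartition(df: DataFrame): DataFrame =
    df.repartition(20).select((col("score") * col("score")).as("data")).repartition(1)

  def main(args: Array[String]): Unit = {
    // Local session for illustration only; app name and master are assumptions.
    val spark = SparkSession.builder()
      .appName("coalesce-vs-repartition-sketch")
      .master("local[*]")
      .getOrCreate()
    import spark.implicits._

    // Hypothetical stand-in for df2 from the post.
    val df2 = Seq((1, 1.5), (2, 2.5), (3, 3.5)).toDF("id", "score")

    // Compare where the projection sits relative to the Exchange nodes.
    withCoalesce(df2).explain()
    withRepartition(df2).explain()

    spark.stop()
  }
}
```

The extra shuffle is not free, so whether it pays off depends on how heavy the per-row processing is compared with moving the data once more.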

Conclusion

Spark SQL really does optimize queries with its built-in features, but there are limitations, and we have seen a few examples of them. That means we can’t depend on the abstracted optimizations provided by Spark SQL in every situation. I’ll be extending this blog and adding more issues.

This blog post is meant to share some of the problems I faced while using Spark SQL. Please share the issues you have faced, and your thoughts, in the comments.


Thank You 🙂

Written by 

Anuj Saxena is a software consultant with more than 1.5 years of experience. Anuj has worked with functional programming languages like Scala and functional Java, and is also familiar with other programming languages such as Java, C, C++ and HTML. He is currently working on reactive technologies like Spark, Kafka, Akka, Lagom and Cassandra, and has also used DevOps tools like DC/OS and Mesos for deployments. His hobbies include watching movies and anime, and he also loves travelling a lot.

3 thoughts on “Optimizations In Spark: For BETTER OR For WORSE”

  1. You mentioned that the operation happening before the coalesce is not complex. What do you mean by complex? And you did not specify why the coalesce is pushed down.
