NO need to guess, Yes, we are talking about Spark SQL catalyst optimizer here. I am sure all of you have read through various blogs that describe the advantages of Spark SQL in a very neat and pretty way (at least those who are interested in working with Spark and Structured Data). In precise, what I want to tell you is that there are lots of blogs available that are there to tell you why should you be using the Spark SQL and what optimizations bring to you out of the box.
It seems too much one-sided, isn’t it? Like Spark is some magic thing and just writing SparkSession in your code will be enough to process that “Big Data” you gathered from IoT devices. So today, we will be looking into some issues that I faced while using Spark SQL. So let’s get started with it.
What is Spark SQL?
Let’s begin with a brief basic overview of Spark SQL. If you already possess a familiarity with it then you can skip this part.
Spark SQL is a module in the Spark ecosystem which is powered by its core engine: Spark Core. It handles the processing of structured data (streaming and batch). The Spark SQL inherits the functionality of lazy evaluation from the Spark Core module. Which lets it create the logical plan including all the other dependent queries for the query to be executed. The creation of the logical plan gives the Spark SQL a scope for adding an optimization using Catalyst Optimizer throughout the long logical plan and optimize it to create multiple optimized physical plans and choosing the least costly physical plan among them. The below image briefly touches the phases of query execution in the Spark SQL
Optimizations In Spark
So what are these optimizations that we are talking about, and What is Catalyst Optimizer?
The good results that we are about to get =>
- Lazy Evaluation
- Filter pushdown
- Column Pruning
- Adaptive Execution
I am sure the optimizations make the calculation time very short and these optimizations are implemented in such a way that you just have to provide the logic and everything else will be done in abstraction. But as my friend and colleague Ramandeep says “Abstract features come with abstract issues”. So following are the few issues which I have faced in my recent interaction with Spark SQL:
- Too large of a query to be stored in memory
- Implicit optimizations interfere with partitioning
1. Query being too large
When there are numerous joins and filtering happening for the resulting DataFrame, the query gets huge. What I observed in such instances, when the action is called, there was no job created on the Spark UI for a while. This signified that execution didn’t start. After a few minutes, I found the following exception on the logs:
Exception in thread "main" java.lang.OutOfMemoryError: Java heap space at java.util.Arrays.copyOf(Arrays.java:3332) at java.lang.AbstractStringBuilder.ensureCapacityInternal(AbstractStringBuilder.java:124) at java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:448) at java.lang.StringBuilder.append(StringBuilder.java:136) at scala.collection.mutable.StringBuilder.append(StringBuilder.scala:210) at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$generateTreeString$1.apply(TreeNode.scala:544) at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$generateTreeString$1.apply(TreeNode.scala:543) at scala.collection.immutable.List.foreach(List.scala:392) at org.apache.spark.sql.catalyst.trees.TreeNode.generateTreeString(TreeNode.scala:543) at org.apache.spark.sql.catalyst.trees.TreeNode.generateTreeString(TreeNode.scala:570) at org.apache.spark.sql.catalyst.trees.TreeNode.generateTreeString(TreeNode.scala:570) at org.apache.spark.sql.catalyst.trees.TreeNode.generateTreeString(TreeNode.scala:570) at org.apache.spark.sql.catalyst.trees.TreeNode.generateTreeString(TreeNode.scala:570) at org.apache.spark.sql.catalyst.trees.TreeNode.generateTreeString(TreeNode.scala:570)
The above exception occurred because the query was simply too large and it couldn’t be stored in memory. This was the result of adding the executions block into a single pipeline and hoping for it to be optimized. Increasing the memory would be a good solution many would say but this still won’t make up for the time it was taking just to build that large query. Imagine if this time will be added to the SLA of application where the cluster is sitting idle and waiting till Spark creates the query and optimize it and only after that execution will start.
Simple solution for it to break the DAG in multiple parts. Following are some simple ideas for it:
Checkpoint: Use the checkpoint feature of DataFrame which creates, executes the query, and saves the result till the checkpoint on some storage (HDFS/S3 etc). To use this feature a checkpoint directory has to be provided to spark in advance.
Trade-offs of using Checkpoints are obvious. You’re breaking DAG but I/O is happening to some storage. Large data => More I/O
This type of checkpointing is also called Reliable Checkpointing as it uses a reliable data storage.
LocalCheckpoint: Another way to break DAG into parts is to use localCheckpoint on a DataFrame. It is similar to the first point but it saves the output to local storage of an executor. Hence it is called Local Checkpointing. This removes the need of setting a checkpoint directory in the Spark. Also, this gives the advantage over fist point as the data doesn’t travel over the network and stays on local storage.
df.localCheckpoint //local checkpointing //or df.checkpoint //reliable checkpointing
Recreating DF using underlying RDD: One more, a bit simpler way is to recreate the DataFrame from the underlying RDD of a DataFrame and its schema. Using the following snippet:
Above creates a DataFrame using the RDD of previous DataFrame and this breaks the DAG at this point.
2. Implicit optimizations interfere with partitioning
If the input data is in one single partition and you want to run some transformation in parallel, using a Spark cluster, on it which are heavy but not complex and save the data in a single partition. To process it in parallel, we have to repartition it into, let’s say, 20 partitions then after processing we have to coalesce it into single partition. The code would be something like this:
df2.repartition(20).select(col("score") * col("score") as "data").coalesce(1).count
Now if we see the Spark UI:
In the above screenshot we can see that there are only 2 tasks in the Job Id 1. But shouldn’t it be around 20 or something? This simply implies that when we will be running the code on the cluster, it will not partition the data and all the execution will happen on a single core sequentially.
Let’s dig a bit deeper and see the execution plan:
== Parsed Logical Plan == Aggregate [count(1) AS count#114L] +- Repartition 1, false +- Project [(cast(score#10 as double) * cast(score#10 as double)) AS data#110] +- Repartition 20, true +- Relation[......C#33,... 22 more fields] csv
In logical plan we can see that it is well according to the execution in mind but now a glance on the Physical Plan:
== Physical Plan == *(2) HashAggregate(keys=, functions=[count(1)], output=[count#114L]) +- *(2) HashAggregate(keys=, functions=[partial_count(1)], output=[count#117L]) +- Coalesce 1 +- Exchange RoundRobinPartitioning(20) +- *(1) FileScan csv  Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/Users/.../temp-spark1], PartitionFilters: , PushedFilters: , ReadSchema: struct<>
Here in the physical plan we can see that the coalesce is pushed downward. What happens here is that optimizer pushes the coalesce downward as the operation happening just before the coalesce isn’t complex (narrow transformation).
Repartition the dataframe on the basis of some id could be one way to workaround this problem:
df2.repartition(20, col("id")).select(col("score") * col("score") as "data").coalesce(1).count
The Spark SQL really optimizes the queries with some built-in features but there are some limitations and we have seen a few examples. That means, we can’t depend on the abstracted optimizations provided by Spark SQL in every situation. I’ll be extending the blogs and will be adding more issues.
This blog post intends to share some of the problems I faced while using Spark SQL. Please share the issues you faced and your thoughts too in comments.
Follow me for more such blog notifications on:
Thank You 🙂