Spark: Type Safety in Dataset vs DataFrame


Type safety means that the compiler validates types while compiling and throws an error when we try to assign a value of the wrong type to a variable, so a type-safe language prevents type errors before the program runs. Spark, a unified analytics engine for big data processing, provides two very useful APIs, DataFrame and Dataset, which are easy to use, intuitive, and expressive, and which make developers productive. One major difference between these two APIs is that Dataset is type-safe whereas DataFrame is not.

In this blog, we will see why DataFrames are not type-safe while the Dataset API provides type safety. We will look at how this affects Spark application developers in three contexts:

  1. When they apply a lambda expression in a filter or map function,
  2. When they query a non-existing column, and
  3. Whether DataFrames and Datasets preserve the schema when converted back to an RDD (Resilient Distributed Dataset).

Why DataFrames are not type-safe whereas Datasets are

In Apache Spark 2.0, these two APIs were unified: a DataFrame can be considered an alias for Dataset[Row], a collection of generic objects, where a Row is a generic untyped JVM object. A Dataset, by contrast, is a collection of strongly typed JVM objects.
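Because DataFrame is literally an alias, a DataFrame can be assigned to a Dataset[Row] without any conversion, and as[Employ] turns it into a typed Dataset. A minimal sketch, assuming spark-sql (Scala 2.12 or 2.13) is on the classpath and a local SparkSession; the master and app name are arbitrary choices, and Employ is the case class used later in this post:

```scala
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}

case class Employ(name: String, age: Int, id: Int, department: String)

// Local session for experimenting; in spark-shell, getOrCreate() reuses the existing one.
val spark = SparkSession.builder().master("local[*]").appName("alias-demo").getOrCreate()
import spark.implicits._ // encoders and toDF()/toDS()

val empDF: DataFrame = Seq(Employ("A", 24, 132, "HR"), Employ("B", 26, 131, "Engineering")).toDF()

// DataFrame is defined as `type DataFrame = Dataset[Row]`, so this assignment needs no conversion.
val asRows: Dataset[Row] = empDF

// as[Employ] attaches the case-class Encoder, giving a typed view of the same data.
val asEmploys: Dataset[Employ] = empDF.as[Employ]

val firstRow: Row = asRows.first()          // generic Row: fields read by position or name
val firstEmploy: Employ = asEmploys.first() // Employ: fields read as ordinary Scala members
println(firstRow.getString(0) + " / " + firstEmploy.name)
```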

Spark checks whether the values in a DataFrame conform to its schema at run time, not at compile time. This is because the elements of a DataFrame are of type Row, and Row is not parameterized by the types of its fields, so the compiler has no type information to check against. That is why a DataFrame is untyped and not type-safe.

Datasets, on the other hand, check at compile time whether types conform to the specification. That's why Datasets are type-safe.
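The difference shows up whenever a value is read back. In the sketch below (same assumptions: spark-sql on the classpath, local SparkSession with an arbitrary app name), reading the Int column age as a String from a Row compiles without complaint and only fails when it runs, while the same field on a typed Employ already has the static type Int:

```scala
import org.apache.spark.sql.{Row, SparkSession}
import scala.util.Try

case class Employ(name: String, age: Int, id: Int, department: String)

val spark = SparkSession.builder().master("local[*]").appName("row-vs-typed").getOrCreate()
import spark.implicits._

val empDS = Seq(Employ("A", 24, 132, "HR")).toDS()
val row: Row = empDS.toDF().first()

// Compiles: getAs[T] accepts any T, so the compiler cannot know that "age" holds an Int.
// The mistake surfaces only at run time, as a ClassCastException captured by Try.
val wrongType: Try[String] = Try { val age: String = row.getAs[String]("age"); age }

// Typed access: Employ.age is statically an Int, so writing
// `val age: String = empDS.first().age` would be rejected at compile time.
val typedAge: Int = empDS.first().age
```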

Now let's look at coding examples of how type safety affects Spark application developers when they apply lambda expressions in filter or map functions, when they query a non-existing column, and whether these two APIs preserve the schema when converted back to an RDD.

case class Employ(name: String, age: Int, id: Int, department: String)

We created a case class Employ with the attributes name, age, id, and department. Now let's create some sample employee data.

val empData = Seq(Employ("A", 24, 132, "HR"), Employ("B", 26, 131, "Engineering"), Employ("C", 25, 135, "Data Science"))

Let's create an RDD from empData.

val empRDD = spark.sparkContext.makeRDD(empData)

Now, let's create a DataFrame and a Dataset from the RDD.

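import spark.implicits._  // imported automatically in spark-shell; required elsewhere for toDF()/toDS()
val empDataFrame = empRDD.toDF()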
empDataFrame: org.apache.spark.sql.DataFrame = [name: string, age: int ... 2 more fields]
val empDataset = empRDD.toDS()
empDataset: org.apache.spark.sql.Dataset[Employ] = [name: string, age: int ... 2 more fields]
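The snippets above are written for spark-shell, where spark and spark.implicits._ are already in scope. Outside the shell, the same setup can be sketched as a standalone script, assuming spark-sql (Scala 2.12 or 2.13) is on the classpath; the local master and app name are arbitrary choices:

```scala
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}

case class Employ(name: String, age: Int, id: Int, department: String)

val spark = SparkSession.builder().master("local[*]").appName("employ-demo").getOrCreate()
// Brings the implicit encoders and the toDF()/toDS() conversions into scope.
import spark.implicits._

val empData = Seq(Employ("A", 24, 132, "HR"), Employ("B", 26, 131, "Engineering"), Employ("C", 25, 135, "Data Science"))
val empRDD = spark.sparkContext.makeRDD(empData)

val empDataFrame: DataFrame = empRDD.toDF()     // the compiler sees only Row elements
val empDataset: Dataset[Employ] = empRDD.toDS() // the compiler knows the element type is Employ

// Both carry the same schema, derived from the fields of Employ.
empDataFrame.printSchema()
empDataset.printSchema()
```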

We now have both a DataFrame and a Dataset, so let's look at the different scenarios a Spark developer faces while using these two APIs.

Applying a lambda function to a DataFrame and a Dataset

val empDatasetResult = empDataset.filter(employ => employ.age > 24)

Above, we have just filtered the employ Dataset to employees older than 24. It compiles and works fine.
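To see what the typed filter actually returns, the sketch below collects the remaining names to the driver (same assumptions as before: spark-sql on the classpath, local SparkSession with an arbitrary app name). Only B (26) and C (25) are older than 24:

```scala
import org.apache.spark.sql.SparkSession

case class Employ(name: String, age: Int, id: Int, department: String)

val spark = SparkSession.builder().master("local[*]").appName("typed-filter").getOrCreate()
import spark.implicits._

val empDataset = Seq(Employ("A", 24, 132, "HR"), Employ("B", 26, 131, "Engineering"), Employ("C", 25, 135, "Data Science")).toDS()

// The lambda receives an Employ, so employ.age is checked by the compiler as an Int.
val empDatasetResult = empDataset.filter(employ => employ.age > 24)

// collect() brings the filtered rows back to the driver as a local Array[Employ].
val olderNames: Seq[String] = empDatasetResult.collect().map(_.name).toSeq
println(olderNames.mkString(", "))
```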

Let's see how a DataFrame reacts when we apply the same lambda function to it.

val empDataFrameResult = empDataFrame.filter(employ => employ.age > 24)

Oops! We get a compile-time error: value age is not a member of org.apache.spark.sql.Row. When we apply a lambda function to a DataFrame, each element it receives is a Row object, and simply writing the column name as a field does not give us the column's value. We can still access column values inside the lambda, but we have to read them from the Row with an explicit type, so the DataFrame code needs to change to something like:

val empDataFrameResult = empDataFrame.filter(employ => employ.getAs[Int]("age") > 24)
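getAs is not the only way to express this filter on a DataFrame. The sketch below (same local-session assumptions) compares three options. Note that with both getAs and the column expression, a misspelled column name is still caught only at run time; only the typed view lets the compiler check the field:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

case class Employ(name: String, age: Int, id: Int, department: String)

val spark = SparkSession.builder().master("local[*]").appName("dataframe-filters").getOrCreate()
import spark.implicits._

val empDataFrame = Seq(Employ("A", 24, 132, "HR"), Employ("B", 26, 131, "Engineering"), Employ("C", 25, 135, "Data Science")).toDF()

// 1. Lambda over Row: read the field by name and state its type explicitly.
val viaGetAs = empDataFrame.filter(employ => employ.getAs[Int]("age") > 24)

// 2. Column expression: no lambda; Spark resolves "age" when it analyzes the query.
val viaColumn = empDataFrame.filter(col("age") > 24)

// 3. Typed view: attach Employ first, then use the field as in the Dataset example.
val viaTyped = empDataFrame.as[Employ].filter(employ => employ.age > 24)

val counts: Seq[Long] = Seq(viaGetAs.count(), viaColumn.count(), viaTyped.count())
```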

Querying a non-existing column

Now, let's see how DataFrame and Dataset behave differently when we query a non-existing column.

Let's query a salary column, which is not present in the DataFrame.

val empDataFrameResult1 = empDataFrame.select("salary")
org.apache.spark.sql.AnalysisException: cannot resolve '`salary`' given input columns: [age, department, id, name];;
'Project ['salary]

We get a runtime error: salary cannot be resolved given the input columns [age, department, id, name], and an AnalysisException is thrown.
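Because the column name is just a String to the compiler, this mistake can only be handled at run time. A sketch of catching it, assuming a classic (non-Spark Connect) session, where select analyzes the query eagerly and throws immediately (same local-session assumptions as before):

```scala
import org.apache.spark.sql.{AnalysisException, SparkSession}
import scala.util.{Failure, Try}

case class Employ(name: String, age: Int, id: Int, department: String)

val spark = SparkSession.builder().master("local[*]").appName("missing-column").getOrCreate()
import spark.implicits._

val empDataFrame = Seq(Employ("A", 24, 132, "HR")).toDF()

// Compiles fine: "salary" is an ordinary String argument.
val attempt = Try(empDataFrame.select("salary"))

// At run time the analyzer cannot resolve the column and throws AnalysisException.
val failedWithAnalysisError: Boolean = attempt match {
  case Failure(_: AnalysisException) => true
  case _                             => false
}
println(s"AnalysisException thrown: $failedWithAnalysisError")
```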

In the case of a Dataset, we have the opportunity to catch that error at compile time.

val empDatasetResult1 = empDataset.map(employ => employ.salary)
<console>:25: error: value salary is not a member of Employ
       val empDatasetResult1 = empDataset.map(employ => employ.salary)
                                                               ^

It throws a compile-time error: value salary is not a member of Employ.
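The compile-time check applies to typed operations such as map or filter with lambdas. String-based, untyped operations on a Dataset, such as select("salary"), are still resolved at run time, just as on a DataFrame. A sketch contrasting the two (same local-session assumptions; the failing typed line is left commented out so the script compiles):

```scala
import org.apache.spark.sql.{Dataset, SparkSession}
import scala.util.Try

case class Employ(name: String, age: Int, id: Int, department: String)

val spark = SparkSession.builder().master("local[*]").appName("typed-vs-untyped").getOrCreate()
import spark.implicits._

val empDataset = Seq(Employ("A", 24, 132, "HR"), Employ("B", 26, 131, "Engineering"), Employ("C", 25, 135, "Data Science")).toDS()

// Typed: the compiler verifies that Employ has a `name` field of type String.
val names: Dataset[String] = empDataset.map(employ => employ.name)
// empDataset.map(employ => employ.salary) // would not compile: value salary is not a member of Employ

// Untyped: the column name is a String, so it is only checked when the query is analyzed.
val untypedSelect = Try(empDataset.select("salary"))

val nameList: Seq[String] = names.collect().toSeq
```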

Preserving the schema when converting back to an RDD

Now let's check whether DataFrame and Dataset preserve the schema when converted back to an RDD.

Let's create an RDD from the DataFrame.

val rddFromDataFrame = empDataFrame.rdd
rddFromDataFrame: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[11] at rdd at <console>:25

It returns an RDD of Row, so we cannot perform ordinary RDD operations on it in the usual way. For instance:

rddFromDataFrame.map(employ => employ.name).foreach(println)
<console>:26: error: value name is not a member of org.apache.spark.sql.Row

It gives an error: value name is not a member of org.apache.spark.sql.Row. So in this case the schema is not preserved in the RDD's type: the compiler only sees generic Row objects.
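The data itself is still there; only the static type is lost. A sketch of two ways to read fields from an RDD[Row] (same local-session assumptions): by name with getAs, which works because the rows produced by DataFrame.rdd carry their schema, or by position with pattern matching:

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SparkSession}

case class Employ(name: String, age: Int, id: Int, department: String)

val spark = SparkSession.builder().master("local[*]").appName("rdd-of-row").getOrCreate()
import spark.implicits._

val empDataFrame = Seq(Employ("A", 24, 132, "HR"), Employ("B", 26, 131, "Engineering"), Employ("C", 25, 135, "Data Science")).toDF()
val rddFromDataFrame: RDD[Row] = empDataFrame.rdd

// By name: the type parameter is our promise; a wrong one fails only at run time.
val namesByGetAs: Seq[String] = rddFromDataFrame.map(employ => employ.getAs[String]("name")).collect().toSeq

// By position: columns are name, age, id, department, in that order.
val namesByMatch: Seq[String] = rddFromDataFrame.map { case Row(name: String, _, _, _) => name }.collect().toSeq
```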

Now let's see what happens when we do the same thing with a Dataset.

val rddFromDataset = empDataset.rdd
rddFromDataset: org.apache.spark.rdd.RDD[Employ] = MapPartitionsRDD[14] at rdd at <console>:25

It returns an RDD of Employ, so in this case we can perform normal RDD operations on it.

rddFromDataset.map(employ => employ.name).foreach(println)

We get the output:
A
B
C

So, a Dataset preserves the schema when converted back to an RDD.
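One caveat on the example above: foreach(println) runs on the executors, so on a cluster the names appear in the executor logs rather than on the driver console; collect() brings them back to the driver first. The sketch below (same local-session assumptions) does that, and also shows that a DataFrame can recover a typed RDD by attaching the case class with as[Employ] before calling rdd:

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

case class Employ(name: String, age: Int, id: Int, department: String)

val spark = SparkSession.builder().master("local[*]").appName("typed-rdd").getOrCreate()
import spark.implicits._

val empData = Seq(Employ("A", 24, 132, "HR"), Employ("B", 26, 131, "Engineering"), Employ("C", 25, 135, "Data Science"))
val empDataset = empData.toDS()
val empDataFrame = empData.toDF()

// Dataset[Employ].rdd keeps the element type: RDD[Employ].
val rddFromDataset: RDD[Employ] = empDataset.rdd
val names: Seq[String] = rddFromDataset.map(employ => employ.name).collect().toSeq

// A DataFrame becomes typed again once an Encoder is attached with as[Employ].
val rddFromDataFrameTyped: RDD[Employ] = empDataFrame.as[Employ].rdd
val ages: Seq[Int] = rddFromDataFrameTyped.map(employ => employ.age).collect().toSeq

names.foreach(println) // printed on the driver, since `names` is a local Seq
```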

Happy Reading !!
