How to convert Spark RDD into DataFrame and Dataset

Developing programming and coding technologies working in a software engineers.
Reading Time: 4 minutes

In this blog, we will be talking about Spark RDD, Dataframe, Datasets, and how we can transform RDD into Dataframes and Datasets.

What is RDD?

A RDD is an immutable distributed collection of elements of your data. It’s partitioned across nodes in your cluster that can be operated in parallel with a low-level API that offers transformations and actions.

RDDs are so integral to the function of Spark that the entire Spark API can be considered to be a collection of operations to create, transform, and export RDDs. Every algorithm implemented in Spark is effectively a series of transformative operations performed upon data represented as an RDD.

What is Dataframe?

A DataFrame is a Dataset that is organized into named columns. It is conceptually equivalent to a table in a relational database or a data frame in R/Python, but with richer optimizations under the hood. DataFrames can be constructed from a wide array of sources such as structured data files, tables in Hive, external databases, or existing RDDs. The DataFrame API is available in Scala, Java, Python, and R. In Scala and Java, a DataFrame is represented by a Dataset of Rows. In the Scala API, DataFrame is simply a type alias of Dataset[Row]. While, in Java API, users need to use Dataset<Row> to represent a DataFrame.

What is Dataset?

A Dataset is a distributed collection of data. Dataset is a new interface added in Spark 1.6 that provides the benefits of RDDs (strong typing, ability to use powerful lambda functions) with the benefits of Spark SQL’s optimized execution engine.

Dataset can be constructed from JVM objects and then manipulated using functional transformations (mapflatMapfilter, etc.). The Dataset API is available in Scala and Java. Python does not have support for the Dataset API. But due to Python’s dynamic nature, many of the benefits of the Dataset API are already available (i.e. you can access the field of a row by name naturally row.columnName).

Working with RDD

Prerequisites: In order to work with RDD we need to create a SparkContext object

val conf: SparkConf =

  new SparkConf()

   .setMaster("local[*]")

   .setAppName("AppName")

   .set("spark.driver.host", "localhost")

val sc: SparkContext = new SparkContext(conf)

There are 2 common ways to build the RDD:

* Pass your existing collection to SparkContext.parallelize method (you will do it mostly for tests or POC)

scala> val data = Array(1, 2, 3, 4, 5)

data: Array[Int] = Array(1, 2, 3, 4, 5)

scala> val rdd = sc.parallelize(data)

rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize 

at <console>:26

* Read from external sources

val lines = sc.textFile("data.txt")

val lineLengths = lines.map(s => s.length)

https://blog.knoldus.com/?p=187442&preview=true

val totalLength = lineLengths.reduce((a, b) => a + b

Things are getting interesting when you want to convert your Spark RDD to DataFrame. It might not be obvious why you want to switch to Spark DataFrame or Dataset. You will write less code, the code itself will be more expressive, and there are a lot of out-of-the-box optimizations available for DataFrames and Datasets.

Working with Dataframe:-

DataFrame has two main advantages over RDD:

Prerequisites: To work with DataFrames we will need SparkSession

val spark: SparkSession =

  SparkSession

    .builder()

    .appName("AppName")

    .config("spark.master", "local")

    .getOrCreate()

First, let’s sum up the main ways of creating the DataFrame:

  • From existing RDD using a reflection

In case you have structured or semi-structured data with simple unambiguous data types, you can infer a schema using a reflection.

import spark.implicits._

// for implicit conversions from Spark RDD to Dataframe

val dataFrame = rdd.toDF()
  • From existing RDD by programmatically specifying the schema
def dfSchema(columnNames: List[String]): StructType =

  StructType(

    Seq(

      StructField(name = "name", dataType = StringType, nullable = false),

      StructField(name = "age", dataType = IntegerType, nullable = false)
    )

  )

def row(line: List[String]): Row = Row(line(0), line(1).toInt)

val rdd: RDD[String] = ...

val schema = dfSchema(Seq("name", "age"))

val data = rdd.map(_.split(",").to[List]).map(row)

val dataFrame = spark.createDataFrame(data, schema)
  • Loading data from a structured file (JSON, Parquet, CSV)
val dataFrame = spark.read.json("example.json")

val dataFrame = spark.read.csv("example.csv")

val dataFrame = spark.read.parquet("example.parquet")
  • External database via JDBC
val dataFrame = spark.read.jdbc(url,"person",prop)

The DataFrame API is radically different from the RDD API because it is an API for building a relational query plan that Spark’s Catalyst optimizer can then execute.

Working with Dataset

The Dataset API aims to provide the best of both worlds: the familiar object-oriented programming style and compile-time type-safety of the RDD API but with the performance benefits of the Catalyst query optimizer. Datasets also use the same efficient off-heap storage mechanism as the DataFrame API.

The idea behind Dataset “is to provide an API that allows users to easily perform transformations on domain objects, while also providing the performance and robustness advantages of the Spark SQL execution engine”. It represents competition to RDDs as they have overlapping functions.

Let’s say we have a case class, you can create Dataset By implicit conversion, By hand.

case class FeedbackRow(manager_name: String, response_time: Double, 

satisfaction_level: Double)
  • By implicit conversion
// create Dataset via implicit conversions

val ds: Dataset[FeedbackRow] = dataFrame.as[FeedbackRow]

val theSameDS = spark.read.parquet("example.parquet").as[FeedbackRow]
  • By hand
// create Dataset by hand

val ds1: Dataset[FeedbackRow] = dataFrame.map {

  row => FeedbackRow(row.getAs[String](0), row.getAs[Double](4), 

row.getAs[Double](5))

}
  • From collection
import spark.implicits._

case class Person(name: String, age: Long)

val data = Seq(Person("Bob", 21), Person("Mandy", 22), Person("Julia", 19))

val ds = spark.createDataset(data)
  • From RDD
val rdd = sc.textFile("data.txt")

val ds = spark.createDataset(rdd)

Leave a Reply