Creating a DataFrame in Apache Spark from scratch

In Apache Spark, the DataFrame is the primary abstraction for working with structured data. In this blog, we will learn how to create a DataFrame in Spark from scratch.

Introduction

In broad terms, a DataFrame (DF) is a distributed, table-like structure with rows and columns and a well-defined schema. DataFrames can be constructed from a wide variety of sources such as structured data files, tables in Hive, external databases, or existing RDDs.

What exactly is a DataFrame?

Since the release of the Spark Structured API, a DF is just an alias for Dataset[Row]. A Row is a generic object whose schema is attached at runtime. If we replace Row with a specific type, such as a case class, we get a typed Dataset. The Dataset API is strongly typed and therefore offers compile-time type safety. It also offers lambda-based operations such as map, reduce and foreach. The DataFrame API, on the other hand, is untyped (its column references are checked only at runtime) and offers expression-based transformations such as select, where, join and orderBy. Spark's implicit conversions blur the line between the two, and we can switch between them using toDF() and as[T] (or toDS() on a local collection or an RDD). However, it is recommended to use the DataFrame API wherever possible.
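To make the typed versus untyped distinction concrete, here is a minimal sketch. The Student case class, the object name and both helper methods are made up for this illustration and are not part of the demo project. The sketch computes the same result once with lambdas on a Dataset and once with column expressions on a DataFrame.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

// Hypothetical case class, used only for this illustration
case class Student(name: String, marks: Int)

object TypedVsUntypedSketch {

  // Typed route: lambdas over Student objects, checked at compile time
  def typedTopNames(spark: SparkSession, students: Seq[Student]): Seq[String] = {
    import spark.implicits._ // encoders plus the toDS()/toDF() helpers
    students.toDS()
      .filter(s => s.marks > 90)
      .map(s => s.name)
      .collect()
      .toSeq
  }

  // Untyped route: column expressions resolved by name at runtime
  def untypedTopNames(spark: SparkSession, students: Seq[Student]): Seq[String] = {
    import spark.implicits._
    val df: DataFrame = students.toDS().toDF() // Dataset[Student] -> Dataset[Row]
    df.where($"marks" > 90)
      .select("name")
      .as[String] // back to a typed Dataset[String]
      .collect()
      .toSeq
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("Typed vs Untyped Sketch")
      .master("local[2]")
      .getOrCreate()

    val students = Seq(Student("Bob", 93), Student("Alice", 90))
    println(typedTopNames(spark, students))
    println(untypedTopNames(spark, students))

    spark.stop()
  }
}
```

A typo such as s.mark fails at compile time in the first method, while a misspelled column name in the second only fails when the query is analyzed at runtime.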

Now that we understand what a DF is, let’s move on to creating one. This is particularly helpful when writing unit tests, where we usually need only a few records in a sample DF to test our code and don’t want to load a large data file.

Steps to create a DataFrame from scratch

The following are the 4 steps to create a DF from scratch:

  1. Create a Schema for the DF
  2. Create a list of Row objects
  3. Parallelize the rows into an RDD so they can be processed in parallel
  4. Create a DF using the above created RDD and Schema

Coding

Let’s start by creating a Scala SBT project with the basic Spark dependencies. Here’s what the build.sbt file should look like:

name := "DFDemo"
version := "0.1"
scalaVersion := "2.12.10"
autoScalaLibrary := false
val sparkVersion = "3.0.0-preview2"

val sparkDependencies = Seq(
  "org.apache.spark" %% "spark-core" % sparkVersion,
  "org.apache.spark" %% "spark-sql" % sparkVersion
)

val testDependencies = Seq(
  "org.scalatest" %% "scalatest" % "3.0.8" % Test
)

libraryDependencies ++= sparkDependencies ++ testDependencies

Next, here is the main code for building our Spark DF from scratch. This is in accordance with the steps mentioned above.

import java.sql.Date
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{DateType, IntegerType, StringType, StructField, StructType}

object DFDemo extends Serializable {

  def main(args: Array[String]): Unit = {

    val spark = SparkSession.builder()
      .appName("Dataframe Demo")
      .master("local[3]")
      .getOrCreate()

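    // Step 1: define the schema (column names and their data types)
    val studentSchema = StructType(List(
      StructField("Name", StringType),
      StructField("DOB", DateType),
      StructField("Marks", IntegerType)))

    // Step 2: create the data as a list of Row objects, one value per column
    val studentRows = List(
      Row("Bob", Date.valueOf("1995-10-10"), 93),
      Row("Alice", Date.valueOf("1997-05-04"), 90),
      Row("Jake", Date.valueOf("1998-07-05"), 97),
      Row("Mark", Date.valueOf("1996-04-06"), 92))

    // Step 3: distribute the rows across 2 partitions as an RDD[Row]
    val studentRDD = spark.sparkContext.parallelize(studentRows, 2)

    // Step 4: combine the RDD and the schema into a DataFrame
    val studentDF = spark.createDataFrame(studentRDD, studentSchema)

    // Print the schema and the records of the DataFrame
    studentDF.printSchema()
    studentDF.show()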

    spark.stop()
  }

}

Explanation

First of all, we create a SparkSession, which is the entry point to Spark in our code, running in local mode with 3 threads. Then comes the first step: creating a schema for our student DF (since a DF has a well-defined schema). We do this using a StructType that wraps a list of StructField objects, each specifying the name of a column and its data type. Once we have the schema, we move to the next step and create the data for our DF. This is simply a list of Row objects, each holding 3 values that correspond to the 3 fields: Name, Date of Birth and Marks. So, now we have both the schema and a list of Rows for our DF.
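A schema can be inspected on its own, without starting Spark. The sketch below is illustrative only: the object name is made up, and marking Name as non-nullable is a change assumed here purely to show the optional third argument of StructField, which defaults to nullable = true.

```scala
import org.apache.spark.sql.types.{DateType, IntegerType, StringType, StructField, StructType}

object SchemaSketch {

  // Same three columns as the student schema. Name is marked as required
  // here only to illustrate the nullable flag (an assumption for this sketch).
  val studentSchema: StructType = StructType(List(
    StructField("Name", StringType, nullable = false),
    StructField("DOB", DateType),      // nullable defaults to true
    StructField("Marks", IntegerType)))

  def main(args: Array[String]): Unit = {
    // Column names in declaration order
    println(studentSchema.fieldNames.mkString(", "))

    // Look up a single field by its name
    println(studentSchema("Marks").dataType)
    println(studentSchema("Name").nullable)

    // The same tree layout that DataFrame.printSchema prints
    println(studentSchema.treeString)
  }
}
```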

Onto the next step. Our Rows are still a plain local Scala list, so we turn them into a distributed dataset using the spark.sparkContext.parallelize() method. It accepts 2 parameters: the first is the list of Rows to be distributed, and the second is the number of partitions to split it into. Here, we have 4 Rows split into 2 partitions, each holding 2 Rows. This becomes our student RDD.
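If you would like to check how the elements were spread across partitions, one way is to use glom(), which gathers each partition into an array. This is a small sketch: the object and the partitionSizes helper are names invented for this example, and plain strings stand in for the Rows.

```scala
import org.apache.spark.sql.SparkSession

object PartitionSketch {

  // Returns how many elements landed in each partition
  def partitionSizes(spark: SparkSession, data: Seq[String], numSlices: Int): Array[Int] = {
    val rdd = spark.sparkContext.parallelize(data, numSlices)
    // glom() turns every partition into a single array that we can measure
    rdd.glom().map(_.length).collect()
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("Partition Sketch")
      .master("local[3]")
      .getOrCreate()

    val names = Seq("Bob", "Alice", "Jake", "Mark")
    // 4 elements in 2 partitions; prints: 2, 2
    println(partitionSizes(spark, names, 2).mkString(", "))

    spark.stop()
  }
}
```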

Finally, we call the spark.createDataFrame method, which accepts the student RDD and the student schema created earlier and generates a DF from them. We then print both the schema of the DF and its records.

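The schema of the DF is shown below.

root
 |-- Name: string (nullable = true)
 |-- DOB: date (nullable = true)
 |-- Marks: integer (nullable = true)

And here are the records that the DF contains.

+-----+----------+-----+
| Name|       DOB|Marks|
+-----+----------+-----+
|  Bob|1995-10-10|   93|
|Alice|1997-05-04|   90|
| Jake|1998-07-05|   97|
| Mark|1996-04-06|   92|
+-----+----------+-----+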
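Since unit tests were the motivation for building a DF by hand, here is a rough sketch of how the same four steps might be used inside a test. It assumes the ScalaTest 3.0.8 dependency from the build.sbt above (FunSuite style). The class name, the sampleDF helper and the test cases themselves are made up for this illustration.

```scala
import java.sql.Date
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.types.{DateType, IntegerType, StringType, StructField, StructType}
import org.scalatest.{BeforeAndAfterAll, FunSuite}

// Hypothetical test suite; the names are chosen for this example only
class StudentDFSpec extends FunSuite with BeforeAndAfterAll {

  private var spark: SparkSession = _

  override def beforeAll(): Unit = {
    spark = SparkSession.builder()
      .appName("Student DF Test")
      .master("local[3]")
      .getOrCreate()
  }

  override def afterAll(): Unit = spark.stop()

  // Builds the small sample DF using the same four steps as DFDemo
  private def sampleDF(): DataFrame = {
    val schema = StructType(List(
      StructField("Name", StringType),
      StructField("DOB", DateType),
      StructField("Marks", IntegerType)))
    val rows = List(
      Row("Bob", Date.valueOf("1995-10-10"), 93),
      Row("Alice", Date.valueOf("1997-05-04"), 90),
      Row("Jake", Date.valueOf("1998-07-05"), 97),
      Row("Mark", Date.valueOf("1996-04-06"), 92))
    spark.createDataFrame(spark.sparkContext.parallelize(rows, 2), schema)
  }

  test("sample DF has the expected columns and row count") {
    val df = sampleDF()
    assert(df.columns.toList == List("Name", "DOB", "Marks"))
    assert(df.count() == 4L)
  }

  test("filtering on Marks keeps only the matching students") {
    val names = sampleDF().where("Marks > 92").collect().map(_.getString(0)).toSet
    assert(names == Set("Bob", "Jake"))
  }
}
```

In a real project, the code under test would replace the where call, with the hand-built DF serving as its input.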

I hope you found this blog helpful. Keep learning!

