Using Spark as a Database

You have probably heard that Apache Spark is a powerful distributed data processing engine. But did you know that Spark (with the help of Hive) can also act as a database? In this blog, we will learn how Apache Spark can be used as a database by creating tables in it and querying them.

Introduction

Spark can act as a database: we can create databases in Spark, and once we have a database, we can create tables and views in it.

A table has two parts: the table data and the table metadata. The table data resides as data files in your distributed storage. The metadata is stored in a metastore called the catalog; it includes the schema, the table name, the database name, the column names, the partitions and the physical location of the actual data. By default, Spark comes with an in-memory catalog, whose contents are lost when the Spark application ends. To persist the metadata, Spark uses the Apache Hive metastore.
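To make the split between data and metadata concrete, here is a minimal sketch that is not part of the original project. It assumes a local Spark 3.x session built without Hive support, so Spark uses its default in-memory catalog, plus a temporary warehouse directory; the `people` table and its columns are made up for illustration. It writes a small partitioned table and then asks the catalog, rather than the data files, for the column and partition metadata.

```scala
import java.nio.file.Files
import org.apache.spark.sql.SparkSession

object CatalogMetadataSketch {
  // Returns the catalog implementation in use and the partition columns of a demo table
  def demo(): (String, Seq[String]) = {
    val spark = SparkSession.builder()
      .appName("Catalog Metadata Sketch")
      .master("local[1]")
      // Assumption: a throwaway warehouse directory so the sketch does not write into the project
      .config("spark.sql.warehouse.dir", Files.createTempDirectory("warehouse").toString)
      .getOrCreate() // no enableHiveSupport(), so Spark uses its in-memory catalog
    import spark.implicits._
    try {
      // "in-memory" here; a session built with enableHiveSupport() reports "hive"
      val catalogImpl = spark.conf.get("spark.sql.catalogImplementation")

      // Hypothetical table: rows go to data files, the table definition goes to the catalog
      Seq(("alice", "IT"), ("bob", "HR")).toDF("name", "department")
        .write.partitionBy("department").saveAsTable("people")

      // Column names, types and partition flags are answered by the catalog
      val columns = spark.catalog.listColumns("people").collect()
      columns.foreach(c => println(s"${c.name}: ${c.dataType}, partition = ${c.isPartition}"))
      (catalogImpl, columns.filter(_.isPartition).map(_.name).toSeq)
    } finally {
      spark.stop()
    }
  }

  def main(args: Array[String]): Unit = println(demo())
}
```

Swapping in `enableHiveSupport()` (with the spark-hive dependency on the classpath) is what makes the same metadata survive across application runs.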

Spark Tables

Spark supports two types of tables:
1. Managed tables
2. Unmanaged or external tables

For managed tables, Spark manages both the table data and the metadata. That means it creates the metadata in the metastore and then writes the data into a predefined directory: the Spark SQL warehouse directory (configured by spark.sql.warehouse.dir), which is the base location for all managed tables.
If we drop a managed table, Spark deletes both the metadata and the table data.
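The managed-table lifecycle can be sketched as follows. This is an illustration under stated assumptions, not the post's own code: a local Spark 3.x session without Hive support (so the in-memory catalog is used), a temporary warehouse directory, and a hypothetical `managed_demo` table. It also assumes the default layout, in which a managed table of the `default` database is stored in `<warehouse>/<table name>`, and checks that this directory disappears after `DROP TABLE`.

```scala
import java.nio.file.{Files, Paths}
import org.apache.spark.sql.SparkSession

object ManagedTableSketch {
  // Returns (table type, data dir exists before DROP, data dir exists after DROP)
  def demo(): (String, Boolean, Boolean) = {
    // Assumption: a throwaway warehouse directory for this sketch
    val warehouse = Files.createTempDirectory("warehouse").toString
    val spark = SparkSession.builder()
      .appName("Managed Table Sketch")
      .master("local[1]")
      .config("spark.sql.warehouse.dir", warehouse)
      .getOrCreate()
    import spark.implicits._
    try {
      // No path given, so Spark creates a managed table under the warehouse directory
      Seq(1, 2, 3).toDF("id").write.saveAsTable("managed_demo")
      val tableType = spark.catalog.getTable("managed_demo").tableType // "MANAGED"
      spark.sql("DESCRIBE TABLE EXTENDED managed_demo").show(100, false)

      val dataDir = Paths.get(warehouse, "managed_demo")
      val existedBefore = Files.exists(dataDir)
      // Dropping a managed table removes both the metadata and the data files
      spark.sql("DROP TABLE managed_demo")
      (tableType, existedBefore, Files.exists(dataDir))
    } finally {
      spark.stop()
    }
  }

  def main(args: Array[String]): Unit = println(demo())
}
```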

Now, let’s come to unmanaged tables. They are the same as managed tables with respect to metadata but differ in where the data is stored: Spark only creates their metadata in the metastore, and when creating an unmanaged table, we must specify the location of its data directory. This gives us the flexibility to store the data at a preferred location, which is useful when we want to use Spark SQL on pre-existing data. If we drop an unmanaged table, Spark deletes only the metadata and leaves the table data untouched. Makes sense!
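For comparison with managed tables, here is a hedged sketch of an unmanaged table. It assumes a local Spark 3.x session without Hive support (in-memory catalog) and a temporary warehouse directory; the Parquet directory stands in for pre-existing data, and the table name `users_ext` is hypothetical. The `LOCATION` clause is what makes Spark register an external table over the existing files instead of writing into the warehouse.

```scala
import java.nio.file.Files
import org.apache.spark.sql.SparkSession

object ExternalTableSketch {
  // Returns (table type, rows still readable from the data directory after DROP TABLE)
  def demo(): (String, Long) = {
    val spark = SparkSession.builder()
      .appName("External Table Sketch")
      .master("local[1]")
      // Assumption: a throwaway warehouse directory for this sketch
      .config("spark.sql.warehouse.dir", Files.createTempDirectory("warehouse").toString)
      .getOrCreate()
    import spark.implicits._
    try {
      // Hypothetical pre-existing data: plain Parquet files outside the warehouse
      val dataDir = Files.createTempDirectory("external").resolve("users").toString
      Seq(("alice", 1), ("bob", 2), ("carol", 3)).toDF("name", "id").write.parquet(dataDir)

      // LOCATION makes this an unmanaged (external) table over the existing files
      spark.sql(s"CREATE TABLE users_ext USING parquet LOCATION '$dataDir'")
      val tableType = spark.catalog.getTable("users_ext").tableType // "EXTERNAL"

      // Dropping it removes only the metadata; the Parquet files stay where they are
      spark.sql("DROP TABLE users_ext")
      (tableType, spark.read.parquet(dataDir).count())
    } finally {
      spark.stop()
    }
  }

  def main(args: Array[String]): Unit = println(demo())
}
```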

Managed tables also come with some added benefits, such as bucketing and sorting, and future Spark SQL enhancements are expected to target managed rather than unmanaged tables. Hence, the rest of our discussion will focus on managed tables.
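As an illustration of bucketing and sorting, the sketch below uses `DataFrameWriter.bucketBy` and `sortBy` on a hypothetical `users_bucketed` table. It assumes a local Spark 3.x session without Hive support (in-memory catalog) and a temporary warehouse directory. Spark accepts these two methods only together with `saveAsTable`; a plain `save()` to a path rejects them.

```scala
import java.nio.file.Files
import org.apache.spark.sql.SparkSession

object BucketedTableSketch {
  // Returns the bucketing columns the catalog recorded for the demo table
  def demo(): Seq[String] = {
    val spark = SparkSession.builder()
      .appName("Bucketed Table Sketch")
      .master("local[1]")
      // Assumption: a throwaway warehouse directory for this sketch
      .config("spark.sql.warehouse.dir", Files.createTempDirectory("warehouse").toString)
      .getOrCreate()
    import spark.implicits._
    try {
      // Hypothetical data, written as a managed, bucketed and sorted table
      Seq((1, "alice"), (2, "bob"), (3, "carol"), (4, "dave")).toDF("id", "name")
        .write
        .bucketBy(4, "id") // hash rows into 4 buckets by id
        .sortBy("id")      // keep the rows of each bucket file sorted by id
        .saveAsTable("users_bucketed")

      // "Num Buckets", "Bucket Columns" and "Sort Columns" show up in the table metadata
      spark.sql("DESCRIBE TABLE EXTENDED users_bucketed").show(100, false)
      spark.catalog.listColumns("users_bucketed").collect().filter(_.isBucket).map(_.name).toSeq
    } finally {
      spark.stop()
    }
  }

  def main(args: Array[String]): Unit = println(demo())
}
```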

Why use Spark Tables?

In Spark, when we ingest data from a data source, we have two options:
1. Save the data as data files in formats such as Parquet, Avro etc.
2. Save the data in a table

In the first case, whenever we want to access the data again, we must use the DataFrameReader API and read it as a DataFrame. A managed table, on the other hand, is registered in the catalog, so its data becomes available to a whole range of SQL-compliant tools. Spark tables can be queried with SQL over JDBC/ODBC connectors (for example, through the Spark Thrift Server), so you can use third-party tools such as Tableau, Talend, Power BI and others. Plain data files such as Parquet, Avro, CSV or JSON, however, are not accessible through the JDBC/ODBC interface.
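The difference between the two options can be sketched like this, assuming a local Spark 3.x session without Hive support (in-memory catalog), a temporary warehouse directory and hypothetical names (`users_parquet`, `users_tbl`). JDBC/ODBC access is not shown; the sketch only contrasts reading files by path and format with reading a table by name through the catalog.

```scala
import java.nio.file.Files
import org.apache.spark.sql.SparkSession

object FileVersusTableSketch {
  // Returns (rows read back by path, rows read back by table name)
  def demo(): (Long, Long) = {
    val spark = SparkSession.builder()
      .appName("File Versus Table Sketch")
      .master("local[1]")
      // Assumption: a throwaway warehouse directory for this sketch
      .config("spark.sql.warehouse.dir", Files.createTempDirectory("warehouse").toString)
      .getOrCreate()
    import spark.implicits._
    try {
      val users = Seq(("alice", 30), ("bob", 25)).toDF("name", "age")

      // Option 1: plain data files; every reader must know the path and the format
      val path = Files.createTempDirectory("files").resolve("users_parquet").toString
      users.write.parquet(path)
      val fromFiles = spark.read.parquet(path).count()

      // Option 2: a managed table; SQL finds it by name through the catalog
      users.write.saveAsTable("users_tbl")
      val fromTable = spark.sql("SELECT * FROM users_tbl").count()
      (fromFiles, fromTable)
    } finally {
      spark.stop()
    }
  }

  def main(args: Array[String]): Unit = println(demo())
}
```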

Finally, let’s create a Spark managed table and query it using Spark SQL.

Coding

First of all, create a Scala sbt project and include the following dependencies in the build.sbt file. We are using Spark 3 (the 3.0.0-preview2 release) for this project, along with Scala 2.12.10, which goes with it.

name := "SparkTableDemo"
version := "0.1"
scalaVersion := "2.12.10"

// Don't add scala-library automatically; it comes in transitively with the Spark artifacts
autoScalaLibrary := false
val sparkVersion = "3.0.0-preview2"

val sparkDependencies = Seq(
  "org.apache.spark" %% "spark-core" % sparkVersion,
  "org.apache.spark" %% "spark-sql" % sparkVersion,
  // Needed for enableHiveSupport() and the Hive metastore
  "org.apache.spark" %% "spark-hive" % sparkVersion
)

libraryDependencies ++= sparkDependencies

We have also included the spark-hive dependency because we need it for the Hive metastore. Remember?

Next, let’s look at the actual code:

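import org.apache.log4j.Logger
import org.apache.spark.sql.{SaveMode, SparkSession}

object SparkSQLTableDemo extends Serializable {
  @transient lazy val logger: Logger = Logger.getLogger(getClass.getName)

  def main(args: Array[String]): Unit = {
    // Local session with 3 threads; Hive support persists table metadata in the Hive metastore
    val spark = SparkSession.builder()
      .appName("Spark SQL Table Demo")
      .master("local[3]")
      .enableHiveSupport()
      .getOrCreate()

    // Read the semicolon-delimited CSV file(s) in dataSource/ into a DataFrame
    val userInfoDF = spark.read
      .format("csv")
      .option("path", "dataSource/")
      .option("delimiter", ";")
      .option("header", "true")      // first line holds the column names
      .option("inferSchema", "true") // let Spark infer the column types
      .load()

    // Bring the session's sql(...) method into scope, then create and select the database
    import spark.sql
    sql("CREATE DATABASE IF NOT EXISTS MY_DB")
    sql("USE MY_DB")

    // Save the DataFrame as a managed, CSV-backed table; Overwrite replaces an existing table
    userInfoDF.write
      .mode(SaveMode.Overwrite)
      .format("csv")
      .saveAsTable("MY_DB.user_info")

    logger.info("Now you can query whatever you want from the table...!")
    sql("SELECT * FROM MY_DB.user_info").show()

    spark.stop()
  }
}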

Explanation

Starting off, we create a Spark session on a local cluster with 3 threads. We give the application a descriptive name and enable Hive support for the metastore. Next, we read a simple CSV file that contains user details and password recovery information for a set of users.
Download the CSV file from here: https://support.staffbase.com/hc/en-us/article_attachments/360009197011/username-password-recovery-code.csv and go through it to get an idea of the data we are dealing with. We set the format to “csv”, since this is a CSV file, and the path to the directory in which the file resides; in our case, that is the dataSource directory directly inside the project root. We also set a few options: delimiter, the character that separates one value from the next in the file (a semicolon here); header, which treats the first line as the column names; and inferSchema, which automatically infers the column types for simple values at the cost of an extra pass over the data.
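If you want to try the reader options without downloading the file, here is a small self-contained sketch. It assumes a local Spark 3.x session and writes a hypothetical two-column, semicolon-delimited file (`username;identifier`, not the real columns of the downloaded CSV) into a temporary directory that stands in for `dataSource/`, then reads it back with the same options and returns the inferred schema.

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.Files
import org.apache.spark.sql.SparkSession

object CsvOptionsSketch {
  // Returns (column name, inferred type) pairs for a small semicolon-delimited file
  def demo(): Seq[(String, String)] = {
    val spark = SparkSession.builder()
      .appName("CSV Options Sketch")
      .master("local[1]")
      .getOrCreate()
    try {
      // Hypothetical stand-in for dataSource/: one semicolon-delimited file with a header line
      val dir = Files.createTempDirectory("dataSource")
      val csv = "username;identifier\nalice;9012\nbob;2070\n"
      Files.write(dir.resolve("users.csv"), csv.getBytes(StandardCharsets.UTF_8))

      val df = spark.read
        .format("csv")
        .option("path", dir.toString)
        .option("delimiter", ";")      // values are separated by semicolons
        .option("header", "true")      // first line holds the column names
        .option("inferSchema", "true") // extra pass over the data to guess column types
        .load()

      df.printSchema()
      df.schema.fields.map(f => f.name -> f.dataType.simpleString).toSeq
    } finally {
      spark.stop()
    }
  }

  def main(args: Array[String]): Unit = println(demo())
}
```

Without `header`, the columns would get generic names (`_c0`, `_c1`), and without `inferSchema`, every column would be read as a string.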

Once we have created a DataFrame from our data, we need a database in which to create our managed table. For this, we import the sql method from the Spark session (import spark.sql) and use it to create a database called “MY_DB”. We also make it the current database with USE MY_DB; strictly speaking, this is optional here, because we qualify the table name with the database name anyway. After this, we use the DataFrameWriter API to save the DataFrame as a table in that database. The save mode is “Overwrite”, so if a table with the same name already exists, it is replaced. Then we call the .saveAsTable method and pass it the table name qualified with the database name, which in this case is “MY_DB.user_info”. This creates the table and populates it with the DataFrame’s data. Great!
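To see what the save mode changes, here is a minimal sketch, assuming a local Spark 3.x session without Hive support (in-memory catalog), a temporary warehouse directory and a hypothetical `MY_DB.save_mode_demo` table. It writes the same three rows twice with `Overwrite` and once more with `Append`; `refreshTable` is called before each count only to make sure no stale file listing is reused.

```scala
import java.nio.file.Files
import org.apache.spark.sql.{SaveMode, SparkSession}

object SaveModeSketch {
  // Returns (row count after two Overwrite writes, row count after one more Append write)
  def demo(): (Long, Long) = {
    val spark = SparkSession.builder()
      .appName("Save Mode Sketch")
      .master("local[1]")
      // Assumption: a throwaway warehouse directory for this sketch
      .config("spark.sql.warehouse.dir", Files.createTempDirectory("warehouse").toString)
      .getOrCreate()
    import spark.implicits._
    try {
      spark.sql("CREATE DATABASE IF NOT EXISTS MY_DB")
      val table = "MY_DB.save_mode_demo" // hypothetical table name
      val batch = Seq(("alice", "IT"), ("bob", "HR"), ("carol", "IT")).toDF("name", "department")

      // Overwrite replaces an existing table, so writing twice still leaves 3 rows
      batch.write.mode(SaveMode.Overwrite).saveAsTable(table)
      batch.write.mode(SaveMode.Overwrite).saveAsTable(table)
      spark.catalog.refreshTable(table)
      val afterOverwrite = spark.table(table).count()

      // Append adds the new rows to the existing ones
      batch.write.mode(SaveMode.Append).saveAsTable(table)
      spark.catalog.refreshTable(table)
      val afterAppend = spark.table(table).count()
      (afterOverwrite, afterAppend)
    } finally {
      spark.stop()
    }
  }

  def main(args: Array[String]): Unit = println(demo())
}
```

The default mode, `ErrorIfExists`, would instead fail on the second write because the table already exists.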

Lastly, we just need to query the data using Spark SQL. For example, I have queried all the data with a “Select * from ….” query, but you are free to run whatever query you like. Awesome!
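Since the columns of the real file are not listed here, the following sketch uses a hypothetical `user_info_demo` table with made-up `username` and `department` columns to show a filter and an aggregation. It assumes a local Spark 3.x session without Hive support (in-memory catalog) and a temporary warehouse directory.

```scala
import java.nio.file.Files
import org.apache.spark.sql.SparkSession

object QuerySketch {
  // Returns (department, number of users) pairs from a GROUP BY query
  def demo(): Seq[(String, Long)] = {
    val spark = SparkSession.builder()
      .appName("Query Sketch")
      .master("local[1]")
      // Assumption: a throwaway warehouse directory for this sketch
      .config("spark.sql.warehouse.dir", Files.createTempDirectory("warehouse").toString)
      .getOrCreate()
    import spark.implicits._
    try {
      // Hypothetical stand-in for MY_DB.user_info, with made-up columns
      Seq(("alice", "IT"), ("bob", "HR"), ("carol", "IT")).toDF("username", "department")
        .write.saveAsTable("user_info_demo")

      // A filter...
      spark.sql("SELECT username FROM user_info_demo WHERE department = 'IT'").show()

      // ...and an aggregation, mapped by position onto (String, Long) tuples
      spark.sql(
        """SELECT department, COUNT(*) AS users
          |FROM user_info_demo
          |GROUP BY department
          |ORDER BY department""".stripMargin)
        .as[(String, Long)]
        .collect()
        .toSeq
    } finally {
      spark.stop()
    }
  }

  def main(args: Array[String]): Unit = println(demo())
}
```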

Result

Running the application prints the first 20 rows of the table to the console, because the final query ends with show().

You will also find two new directories in the directory you run the application from: “metastore_db”, the embedded Derby database in which the Hive metastore keeps the metadata Spark generated for this managed table, and “spark-warehouse”, which contains a directory for the database (my_db.db) with a directory for the table (user_info) nested inside it. The table directory holds the table’s data files.
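To inspect the layout yourself, the sketch below writes a hypothetical two-column `MY_DB.user_info` table into a temporary warehouse and lists the table directory. It assumes a local Spark 3.x session without Hive support and that the in-memory catalog follows the same `<warehouse>/<database>.db/<table>` convention as the Hive-backed setup (database names are lower-cased). It expects `part-*` data files plus a `_SUCCESS` marker written by the job.

```scala
import java.nio.file.Files
import scala.collection.JavaConverters._
import org.apache.spark.sql.{SaveMode, SparkSession}

object WarehouseLayoutSketch {
  // Returns the sorted file names Spark wrote into the MY_DB.user_info table directory
  def demo(): Seq[String] = {
    // Assumption: a throwaway warehouse directory instead of ./spark-warehouse
    val warehouse = Files.createTempDirectory("spark-warehouse")
    val spark = SparkSession.builder()
      .appName("Warehouse Layout Sketch")
      .master("local[1]")
      .config("spark.sql.warehouse.dir", warehouse.toString)
      .getOrCreate()
    import spark.implicits._
    try {
      spark.sql("CREATE DATABASE IF NOT EXISTS MY_DB")
      // Hypothetical rows, saved like the post's table: CSV format, Overwrite mode
      Seq(("alice", "IT"), ("bob", "HR")).toDF("username", "department")
        .write.mode(SaveMode.Overwrite).format("csv").saveAsTable("MY_DB.user_info")

      // Database directory <warehouse>/my_db.db with the table directory nested inside it
      val tableDir = warehouse.resolve("my_db.db").resolve("user_info")
      val stream = Files.list(tableDir)
      try stream.iterator().asScala.map(_.getFileName.toString).toList.sorted
      finally stream.close()
    } finally {
      spark.stop()
    }
  }

  def main(args: Array[String]): Unit = demo().foreach(println)
}
```

On a local file system you may also see hidden `.crc` checksum files next to the data files.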

I hope you found this blog helpful in understanding how to create managed tables in Spark and query them.
Keep learning!
