How To Read From HDFS & Persist In PostgreSQL Via Spark?


In this post, we will create a Spark application that reads and parses a CSV file stored in HDFS and persists the data in a PostgreSQL table.

So, let’s begin!

First, we need the following set up –

  • HDFS running on a single node (version 3.2)
  • Spark running as a standalone cluster (version 3)
  • PostgreSQL server and the pgAdmin UI

SETUP:

HDFS

Running HDFS on a single node is simple. You just need the proper entries in the hdfs-site.xml and core-site.xml config files. Once that is done, all Hadoop services can be started with the following command from the Hadoop base folder –

./sbin/start-all.sh

Once the name node and data node, along with the other services, are up, you should see the following at the NameNode web UI address you configured. The default address is localhost:9870 for Hadoop 3.x.

HDFS web UI

CSV File

Next, we need to copy our CSV file from the local system to HDFS. My CSV file looks like this when viewed as a spreadsheet. In the raw username.csv file, the values on each line are separated by semicolons.

Username  | Identifier | First name | Last name
booker12  | 9012       | Rachel     | Booker
grey07    | 2070       | Laura      | Grey
johnson81 | 4081       | Craig      | Johnson
jenkins46 | 9346       | Mary       | Jenkins
smith79   | 5079       | Jamie      | Smith
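
Each row above is stored in the file as one semicolon-delimited string. As a rough illustration of how such a line breaks into its four fields, here is a minimal plain-Scala sketch. The `UserRow` case class and the `parseLine` helper are hypothetical names used only for this illustration; the Spark application built later does not need them, because Spark's CSV reader does the splitting for us.

```scala
// Hypothetical holder for one data row of username.csv
final case class UserRow(username: String, identifier: Int, firstName: String, lastName: String)

object CsvLineDemo {
  // Splits one semicolon-delimited line into its four fields.
  // Returns None when the line does not have exactly four fields
  // or when the identifier field is not an integer (as in the header line).
  def parseLine(line: String): Option[UserRow] =
    line.split(";", -1).map(_.trim) match {
      case Array(user, id, first, last) =>
        scala.util.Try(id.toInt).toOption.map(n => UserRow(user, n, first, last))
      case _ => None
    }
}
```

Calling `CsvLineDemo.parseLine("booker12;9012;Rachel;Booker")` yields a `UserRow`, while the header line yields `None` because its second field is not a number.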

The first line holds the column names, which will eventually become the columns of our PostgreSQL table. We can use the following command to copy the file to an HDFS directory.

hdfs dfs -put /Users/rahulagrawal/Desktop/username.csv /user/username.csv

Here, the first argument is the location of the file on the local file system and the second argument is the destination path on HDFS (in my case the file goes under /user/). After doing this, you should be able to see the file in the directory you specified. You can navigate to it by browsing the file system on the Web UI.
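
If you prefer to confirm the upload from code rather than from the Web UI, the Hadoop `FileSystem` API can check whether a path exists. The sketch below is only an illustration: `HdfsCheck` and `fileExists` are hypothetical names, it assumes the Hadoop client classes are on the classpath (they normally arrive as a transitive dependency of spark-core), and it assumes the name node is reachable at hdfs://localhost:9000, the same address used later in this post.

```scala
import java.net.URI
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

object HdfsCheck {
  // Returns true if `path` exists on the file system identified by `fsUri`.
  def fileExists(fsUri: String, path: String): Boolean = {
    val fs = FileSystem.get(new URI(fsUri), new Configuration())
    fs.exists(new Path(path))
  }

  def main(args: Array[String]): Unit = {
    // Assumed name node address; adjust to the fs.defaultFS value in your core-site.xml
    println(fileExists("hdfs://localhost:9000", "/user/username.csv"))
  }
}
```

The helper takes the file system URI as a parameter, so the same check also works against other schemes that Hadoop supports, such as file:/// for the local disk.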

SPARK

Now, we have to run a standalone Spark cluster. But first, let's add the PostgreSQL JDBC jar to the SPARK_DIR/jars folder so that the driver can be found. Download the latest JDBC driver (I'm using version 42.2.14) from https://jdbc.postgresql.org and place it in the jars folder. Then, to start Spark, use the following commands to start the master and worker processes from your Spark base directory.

./sbin/start-master.sh

Once started, the master will print out a spark://HOST:PORT URL for itself, which you can use to connect workers to it, or pass as the “master” argument to SparkContext. You can also find this URL on the master's web UI, which is localhost:8080 by default. Using this URL, start a worker as follows:

./sbin/start-slave.sh <master-spark-URL>

In my case, the master URL is spark://Rahuls-MacBook-Air.local:7077

Our Spark cluster is now up as well, and we can use it to run jobs.

PostgreSQL

Next, make sure the PostgreSQL server that our Spark app will connect to is running, then open pgAdmin to work with it. Just double-click pgAdmin in your applications and log in as your user (the default user is postgres, with the password you set during installation).

Finally, all our setup is done and we can move to coding.

CODING:

Create an sbt project named SparkApp in IntelliJ. Add the following dependencies to the build.sbt file –

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "3.0.0",
  "org.apache.spark" %% "spark-sql" % "3.0.0",
  "org.postgresql" % "postgresql" % "42.2.14"
)
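
The `%%` operator appends the project's Scala binary version to the artifact name, so the Scala version set in build.sbt has to be one that the Spark artifacts were published for. Spark 3.0.0 is built against Scala 2.12, so the rest of the file might look roughly like the sketch below. The `name` and `version` values are placeholders I have assumed; they are not taken from the original project.

```scala
// build.sbt (sketch): project settings assumed for illustration
name := "SparkApp"
version := "0.1"

// Spark 3.0.0 artifacts are published for Scala 2.12
scalaVersion := "2.12.10"
```

With a mismatched Scala version, sbt typically fails to resolve the spark-core and spark-sql artifacts.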

Next, create a Scala object named SparkApp in the src/main/scala directory and paste the following into it –

import java.util.Properties
import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}

object SparkApp {
  
  def main(args: Array[String]): Unit = {

    val spark_conf = new SparkConf()
      .setMaster("spark://Rahuls-MacBook-Air.local:7077")
      .setAppName("SparkApp")


    val spark_session = SparkSession.builder()
      .config(spark_conf)
      .getOrCreate()

    //CSV file location on HDFS
    val hdfs_file_location = "hdfs://localhost:9000/user/username.csv"

    //Postgres jdbc connection URL
    val postgres_url = "jdbc:postgresql://localhost:5432/postgres"

    //Reading from HDFS
    val username_df = readFromHDFS(hdfs_file_location, spark_session)

    //Writing to PostgreSQL
    writeToPostgreSQL(postgres_url, spark_session, username_df)

  }

  def readFromHDFS(file_location: String, spark_session: SparkSession): DataFrame ={

    val username_df = spark_session.read.format("csv")
      .option("delimiter",";")
      .option("header","true")
      .option("inferSchema", "true")
      .load(file_location)

    username_df

  }

  def writeToPostgreSQL(postgres_url: String , spark_session: SparkSession, username_df: DataFrame): Unit = {

    val connection_props = new Properties()
    connection_props.setProperty("driver", "org.postgresql.Driver")
    connection_props.setProperty("user", "postgres")
    connection_props.setProperty("password", "postgres")

    val table_name = "public.Username"

    //Passing in the URL, table in which data will be written and relevant connection properties
    username_df.write.mode(SaveMode.Append).jdbc(postgres_url,table_name,connection_props)

  }
  
}

First, we create a SparkConf object in which we set our app name and the Spark master URL, which is the same URL you used to start the Spark worker. Pass this URL to setMaster() so that our app can connect to the Spark master running on our standalone cluster.

We then pass this config to the SparkSession builder to create the Spark session object that will read from HDFS and write to PostgreSQL. For this, I have created two functions, readFromHDFS and writeToPostgreSQL, and passed them the relevant arguments.
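
The master URL does not have to be hard-coded. As one possible variation, the sketch below sets the master and app name directly on the `SparkSession` builder and takes the master URL from the first program argument. The `SessionFactory` object, its method names and the fallback to `local[*]` (Spark's in-process local mode) are my own assumptions for illustration and are not part of the application above.

```scala
import org.apache.spark.sql.SparkSession

object SessionFactory {
  // Uses the first program argument as the master URL when one is given;
  // otherwise falls back to local mode (an assumed default for quick experiments).
  def resolveMaster(args: Array[String]): String =
    args.headOption.filter(_.nonEmpty).getOrElse("local[*]")

  // Builds (or reuses) a session with the resolved master and the app name used in this post.
  def build(args: Array[String]): SparkSession =
    SparkSession.builder()
      .master(resolveMaster(args))
      .appName("SparkApp")
      .getOrCreate()
}
```

Running the app with spark://Rahuls-MacBook-Air.local:7077 as its first argument would then target the standalone cluster, while running it without arguments keeps everything inside the local JVM.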

Read From HDFS (readFromHDFS):

This function takes the URL of the CSV file located on HDFS. In my case, this URL is hdfs://localhost:9000/user/username.csv. The function then uses the DataFrame reader to load the CSV file into a DataFrame, with a few options set. The header option tells Spark that the first line contains column headers and not data. The delimiter option specifies the separator used in the CSV file. The inferSchema option, when true, lets Spark infer the column types from the values present in the file. Finally, we return this DataFrame so that it can be passed to the other function.
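
Schema inference costs Spark an extra pass over the data, so for larger files it can be worth declaring the schema yourself. The sketch below shows one way this might look. The column names come from the header of username.csv, but the column types and the names `ExplicitSchemaRead`, `usernameSchema` and `readWithSchema` are assumptions made for this illustration.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

object ExplicitSchemaRead {
  // Column names follow the CSV header; the types are assumed from the sample rows
  val usernameSchema: StructType = StructType(Seq(
    StructField("Username", StringType, nullable = true),
    StructField("Identifier", IntegerType, nullable = true),
    StructField("First name", StringType, nullable = true),
    StructField("Last name", StringType, nullable = true)
  ))

  // Same reader as readFromHDFS, but with a declared schema instead of inferSchema
  def readWithSchema(fileLocation: String, spark: SparkSession): DataFrame =
    spark.read.format("csv")
      .option("delimiter", ";")
      .option("header", "true")
      .schema(usernameSchema)
      .load(fileLocation)
}
```

The header option stays on so that the first line is still skipped rather than read as a data row.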

Write to PostgreSQL (writeToPostgreSQL):

This function takes the DataFrame created above, the JDBC URL of your PostgreSQL database (in my case jdbc:postgresql://localhost:5432/postgres) and the Spark session object. It calls the jdbc method of the DataFrame writer, username_df.write.mode(SaveMode.Append).jdbc(postgres_url,table_name,connection_props), which takes the URL, the schema-qualified table name and the connection properties of the database, and saves the content of the DataFrame to that external table. The Append save mode tells Spark to append the rows if the table already exists; if the table does not exist yet, Spark creates it.
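
Hard-coding the database user and password is fine for a local experiment, but the credentials can also be supplied from outside the code. Here is a minimal sketch that builds the same connection properties from environment variables. The variable names PG_USER and PG_PASSWORD, and the `PostgresProps` object itself, are hypothetical; use whatever convention suits your setup.

```scala
import java.util.Properties

object PostgresProps {
  // Builds JDBC connection properties from a map of settings (the process
  // environment by default). PG_USER and PG_PASSWORD are assumed names.
  def fromEnv(env: Map[String, String] = sys.env): Properties = {
    val props = new Properties()
    props.setProperty("driver", "org.postgresql.Driver")
    // Fall back to the default postgres user when PG_USER is not set
    props.setProperty("user", env.getOrElse("PG_USER", "postgres"))
    // Only set a password when one is provided
    env.get("PG_PASSWORD").foreach(pw => props.setProperty("password", pw))
    props
  }
}
```

The result of `PostgresProps.fromEnv()` could then be passed to the jdbc method in place of connection_props.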

Finally, right-click SparkApp.scala and select Run. This should build the application and run it on the Spark cluster. Once the job finishes, you should also see an application ID under Completed Applications on the Spark master UI.

RESULT:

Now, open pgAdmin, refresh the schema and run a SELECT query against the new table, as in the screenshot below. You can see that your CSV data is now persisted to a table in PostgreSQL.

PostgreSQL table

CONCLUSION:

In this blog, we saw how we can run a Spark job that takes CSV data from HDFS and dumps it to a table in PostgreSQL.

