Apache Spark: Handle null timestamp while reading CSV in Spark 2.0.0


Hello folks, hope you all are doing well !!!

In this blog, I will discuss a problem I faced some days back. One thing to keep in mind: this problem is specific to Spark version 2.0.0 and does not occur in other versions.

Problem: My Spark code was reading a CSV file. This particular CSV file had a timestamp column that could also contain null values. Whenever Spark encountered a null value in the timestamp field while reading the CSV, it threw an error. So I needed a solution that could handle null timestamp fields.

You can find the code snippet below :


import org.apache.spark.sql.SparkSession

object CsvReader extends App {

  val sparkSession = SparkSession.builder()
    .master("local")
    .appName("POC")
    .getOrCreate()

  // Read the CSV, letting Spark infer the column types from the data
  val df = sparkSession.read
    .format("com.databricks.spark.csv")
    .option("header", "true")
    .option("inferSchema", "true")
    .load("test.csv")

  df.printSchema()
  df.show()
}

As you can see, the above code infers the schema while reading the CSV file, and it is this inference that fails in Spark 2.0.0 when a cell in the timestamp column is empty.

Solution: To solve the above problem, we need to take the following approach:

  1. Provide a custom schema in which the timestamp field is read as a String type.
  2. Then, cast the timestamp field explicitly.

This approach solves the null timestamp field issue. But note that we must already know which field in the CSV is the timestamp, as well as the schema of the whole file. Only then can we cast that field from String to timestamp explicitly and preserve the file's original schema.

In my case, I am using the following CSV file: test.csv

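A file shaped like the following reproduces the issue (the values here are hypothetical stand-ins, not the original data); note the empty BIRTH_DT cell in the second row:

ID,PHONE,BIRTH_DT
1,123456789,2016-01-15 10:30:45
2,987654321,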

The schema for the CSV file is:

ID : String, PHONE : Integer, BIRTH_DT : Timestamp

The solution code is as follows:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, unix_timestamp}
import org.apache.spark.sql.types._

object CsvReader extends App {

  val sparkSession = SparkSession.builder()
    .master("local")
    .appName("POC")
    .getOrCreate()

  // Custom schema: read the timestamp column (BIRTH_DT) as a plain String
  val schema = StructType(List(
    StructField("ID", StringType),
    StructField("PHONE", IntegerType),
    StructField("BIRTH_DT", StringType)
  ))

  val df = sparkSession.read
    .format("com.databricks.spark.csv")
    .schema(schema)
    .option("header", "true")
    .load("test.csv")

  // Cast BIRTH_DT explicitly; unix_timestamp returns null for null or
  // unparseable input, so empty timestamp cells become null instead of errors
  val columnName = "BIRTH_DT"
  val updatedDF = df.withColumn(columnName, unix_timestamp(col(columnName), "yyyy-MM-dd HH:mm:ss").cast("timestamp"))

  updatedDF.printSchema()
  updatedDF.show()
}
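With the hypothetical test.csv above, printSchema should report the original schema with BIRTH_DT as a timestamp, and the row with the empty cell simply carries a null:

root
 |-- ID: string (nullable = true)
 |-- PHONE: integer (nullable = true)
 |-- BIRTH_DT: timestamp (nullable = true)

As a side note, on later Spark versions (2.2 and above) the built-in to_timestamp function expresses the same cast more directly. A minimal sketch, assuming Spark 2.2+ (this is not part of the 2.0.0 solution above):

import org.apache.spark.sql.functions.{col, to_timestamp}

// to_timestamp also yields null for null or unparseable input
val updatedDF = df.withColumn("BIRTH_DT", to_timestamp(col("BIRTH_DT"), "yyyy-MM-dd HH:mm:ss"))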

 

That’s it. I hope this blog will be helpful to you as well.

Happy Blogging !!!

