Apache Spark: Reading csv using custom timestamp format


In this blog, we consider a situation where we want to read a CSV file through Spark, but the CSV contains some timestamp columns. Is this going to be a problem while inferring the schema at the time of reading the CSV using Spark?

Well, the answer may be no, if the CSV has its timestamp fields in the specific yyyy-MM-dd HH:mm:ss format. In this particular case, the Spark CSV reader can infer them as timestamps, since this is the default format.

id,name,age,joining_date,wedding_date
1,Joseph,25,1999-09-04 15:50:46,2014-11-22 00:00:00

val csvDataframe = session.sqlContext.read.format("com.databricks.spark.csv")
.option("header", "true")
.option("treatEmptyValuesAsNulls", "true")
.option("inferSchema", "true")
.option("mode","DROPMALFORMED")
.load("<path of the csv file>")

csvDataframe.printSchema()

When you print the schema of the dataframe after reading the CSV, you will see that every field has been inferred correctly by the CSV reader:

root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- joining_date: timestamp (nullable = true)
 |-- wedding_date: timestamp (nullable = true)

But what if the timestamp fields in the CSV are in some other format, e.g. the MM-dd-yyyy HH mm ss format?

The content of the csv file will be:

id,name,age,joining_date,wedding_date
1,Joseph,25,09-04-1999 15 50 46,11-22-2014 00 00 00

In this case, Spark does not recognize the timestamp fields: it will not be able to infer those columns correctly, and will consider them to be of string type.
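To see what the custom pattern actually describes, here is a minimal pure-Scala sketch, independent of Spark. It uses java.text.SimpleDateFormat, which accepts the same pattern letters as Spark 2.x's timestamp parsing, and the helper name is hypothetical (not part of any library): it parses a raw value in the MM-dd-yyyy HH mm ss layout and re-renders it in the default yyyy-MM-dd HH:mm:ss layout that the reader would infer automatically.

```scala
import java.text.SimpleDateFormat

// Hypothetical helper (not part of Spark): parse a raw CSV value written in
// the custom pattern and render it in the default pattern, to illustrate
// what the two pattern strings describe.
def toDefaultLayout(raw: String): String = {
  val customFormat  = new SimpleDateFormat("MM-dd-yyyy HH mm ss")
  val defaultFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
  defaultFormat.format(customFormat.parse(raw))
}

println(toDefaultLayout("11-22-2014 00 00 00"))  // 2014-11-22 00:00:00
```

Because the default reader only tries the second layout, the raw values above never parse as timestamps, and the columns fall back to string.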

When you print the dataframe schema this time, it will show the timestamp fields as strings:

root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- joining_date: string (nullable = true)
 |-- wedding_date: string (nullable = true)

The above-mentioned issue has a solution, depending on the version of Spark we are using.

Solution 1: When we are using Spark version 2.0.1 and above

Here, you have the straightforward option timestampFormat to give any timestamp format while reading the CSV. We just have to add an extra option defining the custom timestamp format, like option("timestampFormat", "MM-dd-yyyy HH mm ss").


val csvDataframe = session.sqlContext.read.format("com.databricks.spark.csv")
.option("header", "true")
.option("treatEmptyValuesAsNulls", "true")
.option("inferSchema", "true")
.option("mode", "DROPMALFORMED")
.option("timestampFormat", "MM-dd-yyyy HH mm ss")
.load("<path of the csv file>")

csvDataframe.printSchema()

In this way, the timestamp fields will be correctly inferred even when the CSV file uses some other timestamp format.

root
|-- id: integer (nullable = true)
|-- name: string (nullable = true)
|-- age: integer (nullable = true)
|-- joining_date: timestamp (nullable = true)
|-- wedding_date: timestamp (nullable = true)

But just to remind you, this solution works only in Spark versions greater than 2.0.0 (i.e. 2.0.1 and above). If we have Spark version 2.0.0 or older, let's look at Solution 2 for a workaround.

Solution 2: When Spark version 2.0.0 or older is used

In older versions of Spark, the timestampFormat option does not exist. However, there is still a way to do this: let the field be inferred as a string, and then explicitly cast the string field holding the timestamp value to a timestamp.

For this, you must know the columns that need to be converted to the timestamp.

For example, I know all my timestamp fields end with "_date". Those fields can then be explicitly cast to timestamp using the custom format.


import org.apache.spark.sql.functions.{col, unix_timestamp}

val csvDataframe = session.sqlContext.read.format("com.databricks.spark.csv")
.option("header", "true")
.option("treatEmptyValuesAsNulls", "true")
.option("inferSchema", "true")
.option("mode", "DROPMALFORMED")
.load("<path of the csv file>")

val updatedDF = csvDataframe.columns.filter(colName => colName.endsWith("_date"))
.foldLeft(csvDataframe) { (outputDF, columnName) =>
outputDF.withColumn(columnName, unix_timestamp(col(columnName), "MM-dd-yyyy HH mm ss").cast("timestamp"))
}
updatedDF.printSchema()

This way, you will be able to get the correct data type for timestamp fields in other formats as well.

root
|-- id: integer (nullable = true)
|-- name: string (nullable = true)
|-- age: integer (nullable = true)
|-- joining_date: timestamp (nullable = true)
|-- wedding_date: timestamp (nullable = true)
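If the foldLeft call in the snippet above looks unfamiliar, here is a minimal pure-Scala sketch of the same accumulation pattern. The Map here is just a stand-in for the dataframe, and adding an entry stands in for withColumn; none of these names come from Spark.

```scala
// foldLeft starts from an initial value and applies one transformation per
// element, threading the accumulated result through every step: exactly
// how the dataframe is threaded through withColumn in the code above.
val dateColumns = Seq("joining_date", "wedding_date")
val initialTypes = Map("id" -> "integer")  // stand-in for the original dataframe
val finalTypes = dateColumns.foldLeft(initialTypes) { (acc, columnName) =>
  acc + (columnName -> "timestamp")        // stand-in for withColumn(...)
}
println(finalTypes)
```

Each iteration receives the result of the previous one, so after the fold every "_date" column has been rewritten, just as every matching dataframe column is cast in turn.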

Conclusion: While reading a CSV using Spark, you may have problems reading timestamp fields that use a format other than the default one, i.e. yyyy-MM-dd HH:mm:ss. This blog presented solutions to this timestamp format issue for both Spark versions 2.0.1 or newer and Spark versions 2.0.0 or older.

Thanks!

