Spark SQL in Delta Lake 0.7.0

Delta Lake has become a buzzword in the Big Data world, especially among Spark developers, because it addresses many issues found in the Big Data domain. Delta Lake is an open-source storage layer that brings reliability to data lakes. It provides ACID transactions and scalable metadata handling, and it unifies streaming and batch data processing. It is evolving quickly and adds useful features in every release. Delta Lake 0.7.0 was released on 19th June 2020 and is the first release on Spark 3.x. It includes key features that make a Spark developer's work easier.

One of the most interesting features in this release is support for metastore-defined tables and SQL DDL. We can now define Delta tables in the Hive metastore and use the table name in all SQL operations. We can run SQL statements to create tables, insert into tables, explicitly alter the schema of tables, and so on. In this blog, we will learn how to perform SQL DDL, DML, and DQL operations in Delta Lake 0.7.0.

Configure Spark Session

Many Delta Lake DDL operations require enabling Delta Lake's integration with the Apache Spark DataSourceV2 and Catalog APIs (available since Spark 3.0). To do so, set the following configurations when creating a new SparkSession for a Spark application.

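import org.apache.spark.sql.SparkSession;

SparkSession sparkSession = SparkSession.builder().appName("DeltaLakeSQL")
        .master("local")
        // Enables Delta Lake's SQL commands
        .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
        // Routes the session catalog through Delta Lake's catalog implementation
        .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
        .getOrCreate();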

If you are running spark-shell, pass the above two configurations and the Delta Lake package on the command line. Now let's explore some of the DDL, DML, and DQL operations we can perform in Delta Lake 0.7.0.
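
To make the spark-shell case concrete, here is a minimal plain-Java sketch that assembles the command line from the same two settings. The class name SparkShellCommand is hypothetical, and the package coordinate io.delta:delta-core_2.12:0.7.0 assumes the Scala 2.12 build of Delta Lake 0.7.0; adjust it to match your environment.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Sketch only: builds the spark-shell command line as a string; it does not start Spark.
class SparkShellCommand {

    // Assumption: Maven coordinate of the Scala 2.12 build of Delta Lake 0.7.0.
    static final String DELTA_PACKAGE = "io.delta:delta-core_2.12:0.7.0";

    // The same two settings that are passed to SparkSession.builder().config(...).
    static Map<String, String> deltaConfigs() {
        Map<String, String> conf = new LinkedHashMap<>();
        conf.put("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension");
        conf.put("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog");
        return conf;
    }

    // Renders: spark-shell --packages <package> --conf "key=value" --conf "key=value"
    static String command() {
        StringBuilder sb = new StringBuilder("spark-shell --packages ").append(DELTA_PACKAGE);
        for (Map.Entry<String, String> e : deltaConfigs().entrySet()) {
            sb.append(" --conf \"").append(e.getKey()).append('=').append(e.getValue()).append('"');
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.println(command());
    }
}
```

Running the class prints a single command that can be pasted into a terminal; --packages and --conf are the standard spark-shell options for adding a dependency and setting a configuration value.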

Creating a Delta Table

Create the Delta tables by running the following queries:

sparkSession.sql("CREATE TABLE customer(c_id Long, c_name String, c_city String) USING DELTA");
sparkSession.sql("CREATE TABLE newCustomer(c_id Long, c_name String, c_city String) USING DELTA");

Here, customer and newCustomer are the Delta table names.

Insert data into Delta table

Insert data into your Delta tables by running the following queries:

sparkSession.sql("INSERT INTO customer VALUES(1, 'John', 'New York'), (2, 'Shawn', 'California')");
sparkSession.sql("INSERT INTO newCustomer VALUES(2, 'Shawn', 'Texas'), (3, 'Redish', 'California')");

Read Delta Table

Read your Delta tables by running the following queries:

sparkSession.sql("SELECT * FROM customer").show();
sparkSession.sql("SELECT * FROM newCustomer").show();

Update Delta table data

The query below updates the city of the customer whose id is 1 in the customer table from New York to California.

sparkSession.sql("UPDATE customer SET c_city = 'California' WHERE c_id = 1");

Delete data from Delta table

The query below deletes John's record from the customer Delta table.

sparkSession.sql("DELETE FROM customer WHERE c_name = 'John'");

Merge new data in Delta table

The command below performs an upsert (update + insert). If a customer id matches in both the customer and newCustomer tables, the customer's city in the customer table is changed to the city from the newCustomer table; otherwise, the new record from the newCustomer table is inserted into the customer table.

sparkSession.sql(
        "MERGE INTO customer USING newCustomer "
        + "ON customer.c_id = newCustomer.c_id "
        + "WHEN MATCHED THEN UPDATE SET customer.c_city = newCustomer.c_city "
        + "WHEN NOT MATCHED THEN INSERT *");
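
To see what the upsert does to the sample rows, the following plain-Java sketch models the two MERGE clauses with in-memory maps. It is only an illustration of the semantics, not how Delta Lake executes the statement. The MergeSketch class and its Customer type are hypothetical, and the sketch assumes that c_id is unique in both tables and that the earlier UPDATE and DELETE have already run, so the customer table holds only Shawn's row.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Plain-Java model of the MERGE (upsert) semantics; no Spark is involved.
class MergeSketch {

    // Hypothetical row type mirroring the columns c_id, c_name, c_city.
    static final class Customer {
        final long id;
        final String name;
        final String city;

        Customer(long id, String name, String city) {
            this.id = id;
            this.name = name;
            this.city = city;
        }

        @Override
        public String toString() {
            return "(" + id + ", " + name + ", " + city + ")";
        }
    }

    // Returns the target table's content after merging the source into it.
    static Map<Long, Customer> merge(Map<Long, Customer> target, Map<Long, Customer> source) {
        Map<Long, Customer> result = new LinkedHashMap<>(target);
        for (Customer s : source.values()) {
            Customer t = result.get(s.id);
            if (t != null) {
                // WHEN MATCHED THEN UPDATE SET customer.c_city = newCustomer.c_city
                result.put(t.id, new Customer(t.id, t.name, s.city));
            } else {
                // WHEN NOT MATCHED THEN INSERT *
                result.put(s.id, s);
            }
        }
        return result;
    }

    // Replays the sample data from this post.
    static String demo() {
        Map<Long, Customer> customer = new LinkedHashMap<>();
        customer.put(2L, new Customer(2, "Shawn", "California"));

        Map<Long, Customer> newCustomer = new LinkedHashMap<>();
        newCustomer.put(2L, new Customer(2, "Shawn", "Texas"));
        newCustomer.put(3L, new Customer(3, "Redish", "California"));

        return merge(customer, newCustomer).values().toString();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [(2, Shawn, Texas), (3, Redish, California)]
    }
}
```

Shawn's id matches in both tables, so only his city changes, while Redish has no match and is inserted as a new row.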

Getting history of Delta table

To get the history of a Delta table, run the following query:

sparkSession.sql("DESCRIBE HISTORY customer").show();

This returns information about the Delta table, such as the operations performed on it so far, each with a unique version number and timestamp. Every operation performed on the Delta table saves a new version of the table's data.

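Read an old version of the data using time travel

To read an old version of a Delta table's data, run the query below:

sparkSession.read().format("delta").option("versionAsOf", versionNumber).table("customer").show();

Here, versionNumber is a version number reported by DESCRIBE HISTORY, such as 0, 1, 2, and so on. Depending on the Spark version, options set on the reader may not be applied when reading by table name; if the query returns the latest data regardless of versionNumber, read the table by its storage path with load(...) instead of table(...).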
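
The relationship between operations, version numbers, and time travel can be pictured with a toy model in plain Java. The VersionedTableSketch class below is hypothetical and keeps a full snapshot per version, which is a simplification: Delta Lake records changes in a transaction log rather than copying the whole table. The version numbers assume the operations on the customer table ran in the order shown in this post and that each one produced exactly one commit.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Toy model of table versioning; not how Delta Lake stores data internally.
class VersionedTableSketch {

    // versions.get(n) holds the table content as of version n.
    private final List<List<String>> versions = new ArrayList<>();

    // Each committed operation stores a new immutable snapshot and returns its version number.
    int commit(List<String> rows) {
        versions.add(Collections.unmodifiableList(new ArrayList<>(rows)));
        return versions.size() - 1;
    }

    // Analogous to reading with option("versionAsOf", version).
    List<String> versionAsOf(int version) {
        return versions.get(version);
    }

    // Analogous to a plain SELECT * on the table.
    List<String> latest() {
        return versions.get(versions.size() - 1);
    }

    // Replays the operations on the customer table up to the DELETE.
    static VersionedTableSketch demo() {
        VersionedTableSketch table = new VersionedTableSketch();
        table.commit(Collections.<String>emptyList());                           // version 0: CREATE TABLE
        table.commit(Arrays.asList("1,John,New York", "2,Shawn,California"));    // version 1: INSERT
        table.commit(Arrays.asList("1,John,California", "2,Shawn,California"));  // version 2: UPDATE
        table.commit(Arrays.asList("2,Shawn,California"));                       // version 3: DELETE
        return table;
    }

    public static void main(String[] args) {
        VersionedTableSketch table = demo();
        System.out.println(table.versionAsOf(1)); // rows as they were right after the INSERT
        System.out.println(table.latest());       // rows after the DELETE
    }
}
```

In this model, reading version 1 still returns John's original New York row even though later versions updated and then deleted it, which is the behavior time travel is meant to provide.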

Drop a delta table

To drop a particular Delta table, run the query below:

sparkSession.sql("DROP TABLE IF EXISTS customer");

So far, we have gone through several SQL DDL, DML, and DQL operations. You can refer to the Delta Lake documentation to learn more about Delta Lake.

A sample menu-driven Spark application that demonstrates these SQL DDL, DML, and DQL operations in Delta Lake is also available.

Keep Learning!!
