Dealing With Deltas In Amazon Redshift


Hi! In this blog post I discuss a scenario of implementing deltas in Amazon Redshift using spark-redshift. Before that, I introduce Amazon Redshift, the spark-redshift library, and the integration of Spark with Redshift. I assume you have a fair knowledge of programming in Apache Spark and Spark SQL. You may refer to the documentation below to brush up:

  1. Apache Spark Programming Guide
  2. Spark SQL Documentation

Amazon Redshift

Amazon Redshift is a fast, fully managed data warehousing service. It helps you analyze data using standard SQL and existing business intelligence tools in an easy and cost-effective manner. Redshift differs from Amazon's other hosted database offering, Amazon RDS, in its ability to handle analytics workloads on big data sets, which it stores on a column-oriented DBMS principle: data is stored in columns rather than rows, so query performance improves because the database reads only the columns a query needs instead of scanning and discarding unwanted data in rows. To handle large-scale data sets and database migrations, Amazon Redshift uses massively parallel processing. Amazon Redshift is based on PostgreSQL 8.0.2.

Spark-Redshift

It is a library used to load data from Redshift into Spark SQL DataFrames and to write them back into Redshift tables. It uses Amazon S3 to transfer data in and out of Redshift, and uses JDBC to automatically trigger the COPY and UNLOAD commands on Redshift. It is commonly used in ETL applications, where each query execution extracts a large amount of data to S3.
Before getting into more detail about the library, let's begin with some practical work to understand things more clearly. You may refer to the link for a detailed understanding of spark-redshift. Let's begin with the installation:


The installation may vary depending on your Spark cluster's image version.
I used Spark 2.1.0. The spark-redshift library is not included in this version, so I used version 3.0.0-preview1 of the spark-redshift library from the Databricks Maven repository. In addition, you must configure a Redshift-compatible JDBC driver. There are two ways of doing this:

1. Use the bundled PostgreSQL JDBC driver: the Databricks cluster image automatically includes the PostgreSQL JDBC driver, so you may specify a URL of the form jdbc:postgresql://… in order to use that driver.
2. Download and install the official Redshift JDBC driver: download the official JDBC driver and include its dependency from here in your build.sbt. Then use URLs of the form jdbc:redshift://..

So in the build.sbt file of your Scala project, you should have the Spark dependency, the spark-redshift dependency and the Redshift JDBC driver dependency.
You also need to configure your AWS credentials (access key ID and secret access key), because the library uses a temporary S3 directory to move data between Spark and Redshift. You can set the AWS keys on the Hadoop configuration of the SparkContext obtained from the SparkSession.

import org.apache.spark.sql.SparkSession

val sparkSession = SparkSession.builder
  .master("local")
  .appName("spark with redshift")
  .getOrCreate()

val sc = sparkSession.sparkContext

If you are using the s3n filesystem, then add:

sc.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", "YOUR_KEY_ID")
sc.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", "YOUR_SECRET_ACCESS_KEY")

Now we are all set to write our delta code.
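
The property names above are specific to s3n. As a minimal sketch, the hypothetical helper below (s3CredentialConf is my own name, not part of spark-redshift) returns the Hadoop property names for either the s3n or the s3a scheme, so the same code can configure whichever filesystem your tempdir uses. The s3a names are the standard Hadoop ones; the key values are placeholders.

```scala
object S3Credentials {
  // Hypothetical helper: maps a filesystem scheme to the Hadoop
  // configuration entries that carry the AWS key pair.
  def s3CredentialConf(scheme: String, keyId: String, secret: String): Map[String, String] =
    scheme match {
      case "s3n" => Map(
        "fs.s3n.awsAccessKeyId" -> keyId,
        "fs.s3n.awsSecretAccessKey" -> secret)
      case "s3a" => Map(
        "fs.s3a.access.key" -> keyId,
        "fs.s3a.secret.key" -> secret)
      case other =>
        throw new IllegalArgumentException(s"Unsupported scheme: $other")
    }
}
```

Each entry of the returned map can then be applied with sc.hadoopConfiguration.set(key, value). Reading the key pair from environment variables instead of hard-coding it keeps secrets out of the source.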

Reading from Redshift:

val df = sparkSession.read
  .format("com.databricks.spark.redshift")
  .option("url", "jdbc:redshift://redshifthost:5439/database?user=username&password=pass")
  .option("dbtable", "my_table_copy")
  .option("forward_spark_s3_credentials", true)
  .option("tempdir", "s3n://path/for/temp/data")
  .load()

Writing into Redshift tables:

import org.apache.spark.sql.SaveMode

df.write
  .format("com.databricks.spark.redshift")
  .option("url", "jdbc:redshift://redshifthost:5439/database?user=username&password=pass")
  .option("dbtable", "my_table_copy")
  .option("forward_spark_s3_credentials", true)
  .option("tempdir", "s3n://path/for/temp/data")
  .mode(SaveMode.Overwrite)
  .save()

Implementing Deltas

Deltas refer to updating existing records, or inserting new records, in an already existing table.
In an ETL application we often need to apply deltas on the basis of some condition. To illustrate, consider a scenario where we load the data from a CSV file into a dataframe and do not want to overwrite all the rows of the existing Redshift table. We only want to update the records that have changed, and add the records that are new, in the extract CSV, matching on the primary key.

The spark-redshift data source offers various options. Some of them are listed below:

  • query: the query to read from in Redshift. It is required unless dbtable is specified.
  • dbtable: the table to create or read from in Redshift. It is required unless query is specified.
  • preactions: a semicolon-separated list of SQL commands to run before the dataframe is written into the Redshift table.
  • postactions: a semicolon-separated list of SQL commands to run after the dataframe is written into the table,
    and many more.
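
As a rough sketch of how these options fit together, the helper below builds an option map that could be passed to the reader's options(...) method. The names RedshiftOptions, readOptions and sqlActions are hypothetical, and the sketch assumes that dbtable and query are mutually exclusive, so it rejects a call that supplies both or neither.

```scala
object RedshiftOptions {
  // Hypothetical helper: builds the option map for a spark-redshift read.
  // Assumption: exactly one of `dbtable` or `query` must be supplied.
  def readOptions(url: String, tempDir: String,
                  dbtable: Option[String] = None,
                  query: Option[String] = None): Map[String, String] = {
    require(dbtable.isDefined != query.isDefined,
      "specify exactly one of dbtable or query")
    val source = dbtable.map("dbtable" -> _).orElse(query.map("query" -> _))
    Map("url" -> url, "tempdir" -> tempDir) ++ source.toList
  }

  // preactions and postactions each take one string of
  // semicolon-separated SQL statements.
  def sqlActions(statements: Seq[String]): String = statements.mkString(";")
}
```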

So here is how to proceed for the above-mentioned scenario.

First, we'll load the CSV contents into a dataframe:

import org.apache.spark.sql.{DataFrame, SparkSession}

def readFromCSV(sparkSession: SparkSession, jdbcURL: String, tempS3Dir: String, filePath: String): DataFrame = {
  sparkSession.read.format("com.databricks.spark.csv")
    .option("header", true)
    .option("inferSchema", true)
    .option("treatEmptyValuesAsNulls", true)
    .option("nullValue", "")
    .option("mode", "DROPMALFORMED") // silently drop rows that fail to parse
    .load(filePath)
}

After reading the extract CSV into the dataframe, we'll write the updated records and the new records into the Redshift table. We assume that the table in which the new data will be stored already exists in the database.
We use the convention of source, staging and target tables for storing the updated and inserted records in the database.
The source table holds the data from the extract CSV. The target table is the table already stored in the database, containing the old data. The staging table is a temporary table (with the same schema as the target) where the updated records from the source are saved. It is populated by joining the source and target tables.
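
To make the roles of the three tables concrete, here is a small in-memory sketch using plain Scala maps instead of Redshift tables. It is only an illustration under simplifying assumptions: each record is reduced to an id and a single value, and the names DeltaSketch, staging and merge are hypothetical.

```scala
object DeltaSketch {
  // A record is reduced to (id -> value) for illustration.
  type Table = Map[Int, String]

  // Staging: source rows whose id already exists in target (the updates).
  def staging(source: Table, target: Table): Table =
    source.filter { case (id, _) => target.contains(id) }

  // Merge: replace the updated rows in target, then add the rows that are new.
  def merge(source: Table, target: Table): Table = {
    val updated = staging(source, target)
    val afterUpdate = (target -- updated.keys) ++ updated
    val newRows = source -- afterUpdate.keys
    afterUpdate ++ newRows
  }
}
```

For example, with a target holding ids 1 and 2 and a source holding ids 2 and 3, staging contains only id 2, and the merged target holds id 1 unchanged, id 2 with the source value, and id 3 as a new row.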

import org.apache.spark.sql.{DataFrame, SaveMode}

def writeIntoTable(sourceDF: DataFrame, jdbcURL: String, tempS3Dir: String, targetTable: String): Unit = {
  val stagingTable = s"${targetTable}Staging"
  val sourceTable = s"${targetTable}Source"
  // Format arguments: %1$s = target, %2$s = source, %3$s = staging.
  // Copy the source rows that already exist in target (the updates) into staging.
  val insertUpdatedRecordsInStaging = """Insert into %3$s Select %2$s.* from %2$s join %1$s on %1$s.id = %2$s.id;"""
  // Replace the outdated target rows with their updated versions from staging.
  val insertUpdatedRecordsInTarget = """Delete from %1$s using %3$s where %1$s.id = %3$s.id;
Insert into %1$s Select * from %3$s;"""

  // Drop the already-merged rows from source, then insert the remaining (new) rows.
  val insertNewRecordsInTarget = """Delete from %2$s using %1$s where %2$s.id = %1$s.id;
Insert into %1$s Select * from %2$s;"""

  val postActionsQuery = s"""${insertUpdatedRecordsInStaging}begin transaction;${insertUpdatedRecordsInTarget}${insertNewRecordsInTarget}end transaction;DROP TABLE $sourceTable,$stagingTable;"""
  val deltaQuery = postActionsQuery.format(targetTable, sourceTable, stagingTable)
  sourceDF.write.format("com.databricks.spark.redshift")
    .option("url", jdbcURL)
    .option("tempdir", tempS3Dir)
    .option("forward_spark_s3_credentials", true)
    .option("preactions", s"CREATE TABLE $stagingTable (LIKE $targetTable)")
    .option("dbtable", sourceTable)
    .option("postactions", deltaQuery)
    .mode(SaveMode.Overwrite)
    .save()
}
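
The SQL templates rely on positional format specifiers, where the argument index comes before the dollar sign (%1$s, %2$s, %3$s). As a small sketch with hypothetical table names, the snippet below renders one such template. Writing the index after the dollar sign, as in %$3s, is not a valid specifier and makes java.util.Formatter throw an UnknownFormatConversionException.

```scala
object DeltaQueryFormat {
  // %1$s = target, %2$s = source, %3$s = staging (index before the '$').
  val template: String =
    "Insert into %3$s Select %2$s.* from %2$s join %1$s on %1$s.id = %2$s.id;"

  // Fills the template with the three table names, in that order.
  def render(target: String, source: String, staging: String): String =
    template.format(target, source, staging)
}
```

Calling DeltaQueryFormat.render("sales", "salesSource", "salesStaging") substitutes the hypothetical names sales, salesSource and salesStaging wherever the matching index appears, regardless of the order in which the specifiers occur in the template.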

We assume that both tables have id as the primary key, so the comparison is made on the id column.

We have wrapped the delete and insert queries in a begin transaction; ... end transaction; block so that they are performed as a unit; if any problem arises, the transaction is rolled back.

We could also use the following query to select the records to be inserted (new records) into the target table:
val retrieveNewRecords = """Insert into %1$s Select * from %2$s where %2$s.id NOT IN (Select id from %1$s);"""

But the above query requires scanning the whole target table just to find the ids from the source that are not present in the target. This is not an efficient approach if the target table happens to be really huge.

This is how we handle the deltas in the above-mentioned scenario. I hope this example helps you implement your own delta scenarios.

That's it, friends! I hope you enjoyed reading the blog and got an overview of how Amazon Redshift works along with Spark through the Databricks spark-redshift library. I'm sharing the references for detailed study.

References:

  1. Amazon Redshift Documentation
  2. Redshift DataSource for Apache Spark – Spark-Redshift Databricks

2 Responses to Dealing With Deltas In Amazon Redshift

  1. Bina N says:

    There is an exception thrown when executing this code from .scala. % is not a valid character with java.util.Formatter
    Exception in thread "main" java.util.UnknownFormatConversionException: Conversion = '$'

    private static void checkText(String s, int start, int end) {
        for (int i = start; i < end; i++) {
            // Any '%' found in the region starts an invalid format specifier.
            if (s.charAt(i) == '%') {
                char c = (i == end - 1) ? '%' : s.charAt(i + 1);
                throw new UnknownFormatConversionException(String.valueOf(c));
            }
        }
    }

  2. Bina N says:

    you have typo in
    val insertUpdatedRecordsInStaging = """Insert into %$3s Select %2$s.* from %2$s join %1$s on %1$s.id = %2$s.id;"""
