In today’s Big Data world, we continuously process large amounts of data and store the results in a data lake, so the state of the data lake keeps changing. Sometimes, however, we need to access a historical version of our data, and that requires data versioning. Versioning simplifies our data pipelines: it makes it easy for professionals and organizations to audit data changes, roll back to a previous version after an accidental bad write or delete, and so on. Apache Spark alone can’t provide these capabilities, but Delta Lake (originally released as Databricks Delta), an open-source storage layer that runs on top of Apache Spark, offers them through its Time Travel feature.
Time Traveling using Delta Lake
When we write data into a Delta table, every operation is automatically versioned, and we can access any version of the data. This lets us travel back to an earlier version of the table.
This time travel can be done in two ways:
1. Using a version number
2. Using a timestamp
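To build intuition for what reading an older version means, here is a toy in-memory model. It is a minimal sketch that assumes a table is just a list of immutable snapshots, where each write adds a new snapshot whose list index is its version number. The class ToyVersionedTable and its methods are hypothetical; Delta Lake itself records changes as commit files in a transaction log instead of copying whole snapshots.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model only (hypothetical class, not how Delta Lake stores data):
// every write appends an immutable snapshot, and its list index is the version.
class ToyVersionedTable {
    private final List<List<Long>> snapshots = new ArrayList<>();

    // Like mode("overwrite"): replaces the visible data but keeps older versions.
    long overwrite(List<Long> rows) {
        snapshots.add(List.copyOf(rows));
        return snapshots.size() - 1; // version number of the new commit
    }

    // Like a plain read: returns the newest version.
    List<Long> readLatest() {
        return readVersion(snapshots.size() - 1);
    }

    // Like reading with option("versionAsOf", version).
    List<Long> readVersion(long version) {
        if (version < 0 || version >= snapshots.size()) {
            throw new IllegalArgumentException("No such version: " + version);
        }
        return snapshots.get((int) version);
    }

    public static void main(String[] args) {
        ToyVersionedTable table = new ToyVersionedTable();
        table.overwrite(List.of(100L, 101L, 102L)); // version 0
        table.overwrite(List.of(0L, 1L));           // version 1
        System.out.println(table.readLatest());     // [0, 1]
        System.out.println(table.readVersion(0));   // [100, 101, 102]
    }
}
```

The point of the model is that an overwrite never destroys the previous snapshot; it only changes which version a plain read returns.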
Let’s understand these approaches with an example:
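import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// sparkSession, LOGGER and FILE_PATH are assumed to be defined elsewhere in the example project.
// Job-1: write 100 rows (ids 100 to 199) into the Delta table
Dataset<Long> data = sparkSession.range(100, 200);
data.write().mode("overwrite").format("delta").save(FILE_PATH);
LOGGER.info("records created after job-1: "
        + sparkSession.read().format("delta").load(FILE_PATH).count());

Dataset<Row> job1DeltaTable = sparkSession.read()
        .format("delta").load(FILE_PATH);
job1DeltaTable.show();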
In the above code snippet, Job-1 writes 100 rows, the integers 100 to 199 (the end value passed to range() is exclusive), into the Delta table. Job-2 then overwrites the table with 100 new rows, the integers 0 to 99.
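// Job-2: overwrite the table created by Job-1 with 100 rows (ids 0 to 99).
// overwriteSchema is not strictly needed here, because both jobs produce the same
// schema (id: long); it only matters when the new data has a different schema.
sparkSession.range(100).write().mode("overwrite")
        .format("delta").option("overwriteSchema", "true").save(FILE_PATH);
LOGGER.info("records created after job-2: "
        + sparkSession.read().format("delta").load(FILE_PATH).count());

Dataset<Row> job2DeltaTable = sparkSession.read()
        .format("delta").load(FILE_PATH);
job2DeltaTable.show();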
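Java’s LongStream.range uses the same half-open [start, end) convention as sparkSession.range(start, end), so it gives a quick, Spark-free way to check which ids each job writes. RangeSemantics.describe is a hypothetical helper used only for illustration.

```java
import java.util.stream.LongStream;

class RangeSemantics {
    // Hypothetical helper: summarizes the half-open range [start, end),
    // the same convention sparkSession.range(start, end) uses.
    static String describe(long start, long end) {
        long[] ids = LongStream.range(start, end).toArray();
        if (ids.length == 0) {
            return "0 rows";
        }
        return ids.length + " rows: " + ids[0] + ".." + ids[ids.length - 1];
    }

    public static void main(String[] args) {
        System.out.println(describe(100, 200)); // Job-1: 100 rows: 100..199
        // sparkSession.range(100) starts at 0, i.e. it behaves like range(0, 100)
        System.out.println(describe(0, 100));   // Job-2: 100 rows: 0..99
    }
}
```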
Now, let’s look at the history of the Delta table.
import io.delta.tables.DeltaTable;

// Each row of the history describes one commit, newest first
DeltaTable deltaTable = DeltaTable.forPath(FILE_PATH);
Dataset<Row> latestHistory = deltaTable.history();
latestHistory.select("version", "timestamp", "operation", "operationParameters")
        .show(false);
The history shows that each write is recorded as a separate commit with its own version number and timestamp, which makes it easy to audit changes to the data in the data lake.
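Because both versions stay readable, auditing a change can be as simple as comparing two snapshots. The sketch below does that with plain Java sets on the ids written by Job-1 and Job-2. VersionDiff is a hypothetical helper; with real Delta tables, a similar comparison could be made between two time-travel reads using Dataset.except.

```java
import java.util.Set;
import java.util.TreeSet;
import java.util.stream.Collectors;
import java.util.stream.LongStream;

class VersionDiff {
    // Ids in [start, end), standing in for the rows of one table version.
    static Set<Long> ids(long start, long end) {
        return LongStream.range(start, end).boxed()
                .collect(Collectors.toCollection(TreeSet::new));
    }

    // Rows present in a but not in b (set difference a \ b).
    static Set<Long> minus(Set<Long> a, Set<Long> b) {
        Set<Long> result = new TreeSet<>(a);
        result.removeAll(b);
        return result;
    }

    public static void main(String[] args) {
        Set<Long> version0 = ids(100, 200); // written by Job-1
        Set<Long> version1 = ids(0, 100);   // written by Job-2
        System.out.println("added: " + minus(version1, version0).size()
                + ", removed: " + minus(version0, version1).size()); // added: 100, removed: 100
    }
}
```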
1. Using a version number
// Read the first version (version 0) of the table by its version number
Dataset<Row> version_0_DeltaLakeTable = sparkSession.read()
        .option("versionAsOf", 0L)
        .format("delta").load(FILE_PATH);
version_0_DeltaLakeTable.show();
In the above code snippet, the versionAsOf option makes the reader load version 0 of the Delta table instead of the latest version. In the same way, we can pass any other version number listed in the table history to read that version of the data.
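Each version number corresponds to a JSON commit file in the table’s _delta_log directory, named by the version zero-padded to 20 digits. The FIRST_COMMIT_FILE_PATH constant in the timestamp example presumably points at version 0’s file, though the original source is not shown. DeltaCommitFiles and the path "/tmp/delta-table" are hypothetical and only illustrate the naming scheme; they are not part of the Delta Lake API.

```java
// Hypothetical helper (not part of the Delta Lake API): builds the name of the
// commit file that Delta Lake writes for a given table version.
class DeltaCommitFiles {
    static String commitFileName(long version) {
        if (version < 0) {
            throw new IllegalArgumentException("version must be >= 0");
        }
        return String.format("%020d.json", version); // zero-padded to 20 digits
    }

    static String commitFilePath(String tablePath, long version) {
        return tablePath + "/_delta_log/" + commitFileName(version);
    }

    public static void main(String[] args) {
        // "/tmp/delta-table" is a made-up table location
        System.out.println(commitFilePath("/tmp/delta-table", 0));
        System.out.println(commitFileName(1));
    }
}
```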
2. Using a timestamp
First, find the timestamp of Job-1’s commit.
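import java.sql.Timestamp;
import static org.apache.spark.sql.functions.col;

// Find the timestamp of the first version of the Delta table.
// FIRST_COMMIT_FILE_PATH is assumed to be <FILE_PATH>/_delta_log/00000000000000000000.json.
// The commit file has one JSON action per line; keep only the commitInfo action.
Dataset<Row> load = sparkSession.read().format("json").load(FIRST_COMMIT_FILE_PATH);
Dataset<Row> timestampDataSet = load.where(col("commitInfo").isNotNull())
        .select("commitInfo.timestamp");
String timestamp = new Timestamp(timestampDataSet.first().getLong(0)).toString();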
Second, use that timestamp to read the historical version of the Delta table.
// Read the first version of the table using its timestamp
Dataset<Row> version_0_DeltaLakeTable = sparkSession.read()
        .option("timestampAsOf", timestamp)
        .format("delta").load(FILE_PATH);
version_0_DeltaLakeTable.show();
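A timestamp does not have to match a commit exactly: a read "as of" a time sees the latest version committed at or before that time. The toy model below sketches that lookup with TreeMap.floorEntry. TimestampResolver is hypothetical and assumes distinct commit times in epoch milliseconds; Delta Lake applies additional checks of its own (for example on timestamps outside the table’s history), so treat this only as an illustration of the idea.

```java
import java.util.Map;
import java.util.TreeMap;

// Toy model of timestamp-based time travel (hypothetical class): a read
// "as of" time t sees the latest version committed at or before t.
// Assumes each commit has a distinct timestamp in epoch milliseconds.
class TimestampResolver {
    private final TreeMap<Long, Long> versionByCommitMillis = new TreeMap<>();

    void recordCommit(long version, long commitMillis) {
        versionByCommitMillis.put(commitMillis, version);
    }

    long versionAsOf(long requestedMillis) {
        // floorEntry: greatest commit timestamp <= requestedMillis, or null if none
        Map.Entry<Long, Long> entry = versionByCommitMillis.floorEntry(requestedMillis);
        if (entry == null) {
            throw new IllegalArgumentException("Timestamp is before the first commit");
        }
        return entry.getValue();
    }

    public static void main(String[] args) {
        TimestampResolver resolver = new TimestampResolver();
        resolver.recordCommit(0, 1_000L); // Job-1
        resolver.recordCommit(1, 2_000L); // Job-2
        System.out.println(resolver.versionAsOf(1_000L)); // 0 (exact commit time)
        System.out.println(resolver.versionAsOf(1_500L)); // 0
        System.out.println(resolver.versionAsOf(2_000L)); // 1
    }
}
```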
Time Travel Use Cases
Delta Lake time travel allows us to query an older snapshot of a Delta Lake table. Time travel has many use cases, including:
- Time travel makes it easy to roll back bad writes, which plays an important role in fixing mistakes in our data.
- It helps re-create analyses, reports, or outputs (for example, the output of a machine learning model). This is useful for debugging or auditing, especially in regulated industries.
- It also simplifies time-series analytics, for instance finding out how many new customers were added over the last week.
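The rollback use case can be pictured with another toy model. This sketch assumes a rollback re-commits an old snapshot as a new version; in Spark terms that would mean reading the good version with versionAsOf and writing it back with mode("overwrite"), which adds a new version rather than erasing the bad one. RollbackSketch and its methods are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical toy model of a rollback: instead of deleting the bad version,
// the good snapshot is committed again as a new version, so the full history
// (including the bad write) stays available for auditing.
class RollbackSketch {
    private final List<List<String>> versions = new ArrayList<>();

    long commit(List<String> rows) {
        versions.add(List.copyOf(rows));
        return versions.size() - 1; // version number of the new commit
    }

    long rollbackTo(long version) {
        return commit(versions.get((int) version));
    }

    List<String> latest() {
        return versions.get(versions.size() - 1);
    }

    int versionCount() {
        return versions.size();
    }

    public static void main(String[] args) {
        RollbackSketch table = new RollbackSketch();
        table.commit(List.of("alice", "bob"));               // version 0: good data
        table.commit(List.of());                             // version 1: accidental delete
        long restored = table.rollbackTo(0);                 // version 2: copy of version 0
        System.out.println(restored + " " + table.latest()); // 2 [alice, bob]
    }
}
```

Keeping the bad version in the history, rather than deleting it, is what preserves the audit trail described in the use cases above.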
The source code can be found here
Thanks for reading!!