Delta Lake: Schema Enforcement & Evolution

Reading Time: 4 minutes

Data is constantly evolving. As business problems and requirements change, the shape or structure of the data changes with them. When that happens, we want to stay in control of how the data or its schema changes. But how can we achieve this? Delta Lake gives users simple semantics to control the schema of their tables, and it facilitates schema management out of the box with two features: schema enforcement and schema evolution.

Schema Enforcement

Every DataFrame in Apache Spark has a schema that defines the shape of the data: data types, column names, and metadata. Suppose we are writing a DataFrame with a certain schema to a table, and the schema suddenly changes: new, unknown columns are added, or the type of a column changes. Plain Spark does not enforce the table's schema and does not throw an exception either, so we can accidentally pollute our table with garbage data and degrade its quality. This is where schema enforcement comes into the picture: it acts as a gatekeeper for a clean, fully transformed data set that is ready for production or consumption.

Schema enforcement, also known as schema validation, is a safeguard in Delta Lake that ensures data quality by rejecting writes to a table that do not match the table's schema. To determine whether a write to a table is compatible, Delta Lake uses the following three rules:

1. All DataFrame columns must exist in the target table. If the DataFrame contains a column that is not present in the target table, an exception is raised. Columns present in the target table but not in the DataFrame are set to null.
2. DataFrame column data types must match the column data types in the target table. If they don't match, an exception is raised.
3. DataFrame column names cannot differ only by case. This means we cannot have columns such as 'Foo' and 'foo' defined in the same table. While Spark can run in case-sensitive or case-insensitive (default) mode, Delta Lake is case-preserving but case-insensitive when storing the schema.
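As a rough mental model, the three rules above can be sketched as a plain-Java compatibility check over column-name/type maps. This is purely illustrative: the class and method names are invented, and the real check happens inside Delta Lake's transaction protocol, not in user code.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch of Delta Lake's write-compatibility rules.
// All names here are invented for illustration only.
class SchemaCheckSketch {

    /** Returns null if the write is compatible, otherwise a description of the violation. */
    static String checkWrite(Map<String, String> tableSchema, Map<String, String> dataFrameSchema) {
        // Rule 3: table column names may not differ only by case.
        Map<String, String> tableByLower = new LinkedHashMap<>();
        for (String col : tableSchema.keySet()) {
            if (tableByLower.put(col.toLowerCase(), col) != null) {
                return "Ambiguous columns differing only by case: " + col;
            }
        }
        for (Map.Entry<String, String> dfCol : dataFrameSchema.entrySet()) {
            String tableName = tableByLower.get(dfCol.getKey().toLowerCase());
            // Rule 1: every DataFrame column must exist in the target table.
            if (tableName == null) {
                return "Column not in target table: " + dfCol.getKey();
            }
            // Rule 2: data types must match.
            if (!tableSchema.get(tableName).equals(dfCol.getValue())) {
                return "Type mismatch for column: " + dfCol.getKey();
            }
        }
        // Columns present only in the table are simply left null for this write.
        return null;
    }
}
```

Note that the check is one-directional: a DataFrame missing some table columns is still accepted (rule 1's null back-fill), but a DataFrame with an extra column is rejected.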

Example

    public static void main(String[] args) {
        // Turn off Spark's default logger
        Logger.getLogger("org.apache").setLevel(Level.WARN);

        // Create the Spark session
        SparkSession sparkSession = SparkSession.builder()
                .appName(SPARK_APPLICATION_NAME)
                .master(SPARK_APPLICATION_RUNNING_MODE)
                .getOrCreate();

        // Read the CSV file into salaryDataFrame
        Dataset<Row> salaryDataFrame = sparkSession.read().option("header", true).csv(FILE_PATH);

        // Create a temporary view
        salaryDataFrame.createOrReplaceTempView("salarytable");

        // Write salaryDataFrame as a Delta table to STORE_FILE_PATH
        salaryDataFrame.write().option("header", true).format("delta").save(STORE_FILE_PATH);
        salaryDataFrame.printSchema();

        // Select all columns plus a new TotalSalary column
        Dataset<Row> totalSalaryDataFrame = sparkSession.sql("select SalaryDataID, EmployeeName, Department, JobTitle, " +
                "AnnualRate, RegularRate, OvertimeRate, IncentiveAllowance, Other, YearToDate, " +
                "COALESCE(AnnualRate, 0) + COALESCE(YearToDate, 0) AS TotalSalary from salarytable");
        totalSalaryDataFrame.printSchema();

        // Append totalSalaryDataFrame to the Delta table at STORE_FILE_PATH
        totalSalaryDataFrame.write().mode("append").format("delta").save(STORE_FILE_PATH);

        // Close the Spark session
        sparkSession.close();
    }

In the code snippet above, after creating a Spark session we read a CSV file into salaryDataFrame, whose schema looks like this:

root
 |-- SalaryDataID: string (nullable = true)
 |-- CalendarYear: string (nullable = true)
 |-- EmployeeName: string (nullable = true)
 |-- Department: string (nullable = true)
 |-- JobTitle: string (nullable = true)
 |-- AnnualRate: string (nullable = true)
 |-- RegularRate: string (nullable = true)
 |-- OvertimeRate: string (nullable = true)
 |-- IncentiveAllowance: string (nullable = true)
 |-- Other: string (nullable = true)
 |-- YearToDate: string (nullable = true)

Then we create a temporary view of salaryDataFrame named salarytable, and write salaryDataFrame to the STORE_FILE_PATH directory as a Delta table, which becomes our target table. Next, a query selects all columns from the salary table and adds a new column, TotalSalary, computed as the sum of AnnualRate and YearToDate. Running this query gives us totalSalaryDataFrame, whose schema looks like this:

root
 |-- SalaryDataID: string (nullable = true)
 |-- EmployeeName: string (nullable = true)
 |-- Department: string (nullable = true)
 |-- JobTitle: string (nullable = true)
 |-- AnnualRate: string (nullable = true)
 |-- RegularRate: string (nullable = true)
 |-- OvertimeRate: string (nullable = true)
 |-- IncentiveAllowance: string (nullable = true)
 |-- Other: string (nullable = true)
 |-- YearToDate: string (nullable = true)
 |-- TotalSalary: double (nullable = true)

Then we try to append totalSalaryDataFrame to the target Delta table. This line throws an exception ("A schema mismatch detected") and prints the schemas of both the target table and the source DataFrame. Since there is no TotalSalary column in the target Delta table, the write is rejected. This is how schema enforcement works in Delta Lake: writes that do not match the table's schema are refused.

Exception in thread "main" org.apache.spark.sql.AnalysisException: A schema mismatch detected when writing to the Delta table.
To enable schema migration, please set:
'.option("mergeSchema", "true")'.

But if, upon further review, we decide that we really did mean to add that new TotalSalary column, it's an easy one-line fix with Delta Lake. The solution is schema evolution!

Schema Evolution

Schema evolution is a feature that allows users to easily change a table's current schema to accommodate data that is changing over time. Most commonly, it's used when performing an append or overwrite operation, to automatically adapt the schema to include one or more new columns. For the example above, if we want to add the new TotalSalary column to the target Delta table, we only need a one-line change: while appending totalSalaryDataFrame, just add the mergeSchema option.
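Conceptually, mergeSchema unions the target table's schema with the incoming DataFrame's schema, appending any genuinely new columns. The plain-Java sketch below illustrates that idea; the names are invented, and the real merge happens inside Delta Lake's commit, not in user code.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch of mergeSchema semantics; for illustration only.
class MergeSchemaSketch {

    static Map<String, String> mergeSchemas(Map<String, String> tableSchema,
                                            Map<String, String> dataFrameSchema) {
        // Start from the table's existing columns, in their existing order.
        Map<String, String> merged = new LinkedHashMap<>(tableSchema);
        for (Map.Entry<String, String> col : dataFrameSchema.entrySet()) {
            // Existing columns keep their type; new columns are appended
            // (and back-filled with null for rows already in the table).
            merged.putIfAbsent(col.getKey(), col.getValue());
        }
        return merged;
    }
}
```

In other words, mergeSchema only ever adds columns; it does not change the type of a column that already exists, which is why type changes still need the overwrite path described below.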

totalSalaryDataFrame.write().mode("append").format("delta").option("mergeSchema", true).save(STORE_FILE_PATH);

We know that if a DataFrame column's data type doesn't match the corresponding column type in the target table, the write raises an exception. If we really do want to change a column's type (or otherwise replace the table's schema), mergeSchema is not enough: we have to rewrite the table with overwrite mode together with option("overwriteSchema", true). For our example (same DataFrame and path as above), that would look like:

totalSalaryDataFrame.write().mode("overwrite").format("delta").option("overwriteSchema", true).save(STORE_FILE_PATH);

You can find the complete code here.
Happy Blogging!!