
Automatically Evolve Your Nested Column Schema, Stream From a Delta Table Version, and Check Your Constraints

Delta Lake makes MERGE great with version 0.8.0


Published: February 10, 2021

Data Engineering | 8 min read

We recently announced the release of Delta Lake 0.8.0, which introduces nested-column schema evolution and unlimited clauses in merge, CHECK constraints, and the ability to start a stream from a specific table version. The key features in this release are:

  • Unlimited MATCHED and NOT MATCHED clauses for merge operations in Scala, Java, and Python. Merge operations now support any number of whenMatched and whenNotMatched clauses. In addition, merge queries that unconditionally delete matched rows no longer throw errors on multiple matches. This will be supported using SQL with Spark 3.1.
  • MERGE operation now supports schema evolution of nested columns. Schema evolution of nested columns now has the same semantics as that of top-level columns. For example, new nested columns can be automatically added to a StructType column.
  • MERGE INTO and UPDATE operations now resolve nested struct columns by name. The UPDATE and MERGE INTO commands compare and assign columns of type StructType by nested field name, so the order of the nested columns does not matter (exactly as with top-level columns). To revert to resolving by position, set the Spark configuration spark.databricks.delta.resolveMergeUpdateStructsByName.enabled to false.
  • Check constraints on Delta tables. Delta now supports CHECK constraints. When supplied, Delta automatically verifies that data added to a table satisfies the specified constraint expression. To add a CHECK constraint, use the ALTER TABLE ADD CONSTRAINT command.
  • Start streaming a table from a specific version (#474). When using Delta as a streaming source, you can use the options startingTimestamp or startingVersion to start processing the table from a given version and onwards. You can also set startingVersion to latest to skip existing data in the table and stream from the new incoming data.
  • Ability to perform parallel deletes with VACUUM (#395). When using `VACUUM`, you can set the session configuration spark.databricks.delta.vacuum.parallelDelete.enabled to true in order to use Spark to perform the deletion of files in parallel (based on the number of shuffle partitions).
  • Use Scala implicits to simplify read and write APIs. You can import io.delta.implicits._ to use the `delta` method with Spark read and write APIs such as spark.read.delta("/my/table/path").
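The nested-column semantics above can be illustrated with a toy, pure-Python model; this is not how Delta Lake implements schema evolution. A schema is represented as a dict from field name to type (with nested dicts for structs), and the hypothetical `merge_schemas` helper evolves a target schema by matching fields by name, recursing into structs and appending new nested fields, which is the same rule it applies to top-level columns. The `coffee_profile` field names come from the espresso example later in this post.

```python
# Toy model of nested schema evolution with by-name struct resolution.
# NOT Delta Lake's implementation: merge_schemas is a hypothetical helper, and a
# schema is a dict mapping field name -> type string (or a nested dict for a struct).


def merge_schemas(target: dict, source: dict) -> dict:
    """Return a copy of target evolved to include every field of source.

    Fields are matched by name, so the order of nested fields does not matter.
    Existing fields keep their position; new fields are appended at the end.
    """
    merged = dict(target)  # copy, so the caller's target schema is not mutated
    for name, src_type in source.items():
        if name not in merged:
            # New column, at the top level or inside a struct: add it.
            merged[name] = src_type
        elif isinstance(merged[name], dict) and isinstance(src_type, dict):
            # Both sides are structs: recurse so nested columns evolve too.
            merged[name] = merge_schemas(merged[name], src_type)
        elif merged[name] != src_type:
            # Type changes are out of scope for this toy model.
            raise TypeError(f"incompatible types for field {name!r}")
    return merged


# coffee_profile schemas from the espresso example in this post.
espresso = {"coffee_profile": {"temp": "double", "weight": "double"}}
espresso_updates = {
    # Nested fields deliberately reordered: by-name resolution ignores order.
    "coffee_profile": {"weight": "double", "flavor_notes": "array<string>", "temp": "double"},
}

print(merge_schemas(espresso, espresso_updates))
# {'coffee_profile': {'temp': 'double', 'weight': 'double', 'flavor_notes': 'array<string>'}}
```

Because the toy merge matches on names rather than positions, reordering the source's nested fields yields the same evolved schema.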

In addition, you can now read a Delta table without using Spark via the Delta Standalone Reader and the Delta Rust API. See Use Delta Standalone Reader and the Delta Rust API to query your Delta Lake without Apache Spark™ to learn more.

<h2>Automatically evolve your nested column schema</h2>
<p>As noted in previous releases, Delta Lake includes the ability to:</p><ul><li><a href="https://www.databricks.com/blog/2019/10/03/simple-reliable-upserts-and-deletes-on-delta-lake-tables-using-python-apis.html" rel="noopener" target="_blank">execute merge operations</a>,</li><li>simplify your <a href="https://www.youtube.com/watch?v=7ewmcdrylsA" rel="noopener" target="_blank">insert/update/delete operations in a single atomic operation</a>,</li><li><a href="https://www.databricks.com/blog/2019/09/24/diving-into-delta-lake-schema-enforcement-evolution.html" rel="noopener" target="_blank">enforce and evolve your schema</a> (more details can also be found in <a href="https://www.youtube.com/watch?v=tjb10n5wVs8" rel="noopener" target="_blank">this tech talk</a>),</li><li><a href="https://www.databricks.com/blog/2020/05/19/schema-evolution-in-merge-operations-and-operational-metrics-in-delta-lake.html" rel="noopener" target="_blank">evolve your schema within a merge operation</a>.</li></ul>
<p>With Delta Lake 0.8.0, you can automatically evolve nested columns within your Delta table with <strong>UPDATE</strong> and <strong>MERGE</strong> operations.</p>
<p>Let’s showcase this with a simple coffee espresso example. We create our first Delta table using the following code snippet.</p>
<pre># espresso1 JSON string
json_espresso1 = [ ... ]

# create RDD
espresso1_rdd = sc.parallelize(json_espresso1)

# read JSON from RDD
espresso1 = spark.read.json(espresso1_rdd)

# Write Delta table (espresso_table_path is defined elsewhere in the notebook)
espresso1.write.format("delta").save(espresso_table_path)</pre>
<p>The following is a view of the espresso table:<br><a href="https://www.databricks.com/wp-content/uploads/2021/02/espresso-table-rev.png" data-lightbox><img class="alignleft size-large wp-image-142917" style="width:1024px;" src="https://www.databricks.com/wp-content/uploads/2021/02/espresso-table-rev-1024x231.png" alt="DataFrame table in Delta Lake 0.8.0" height="231"></a></p>
<p><strong>The following code snippet creates the espresso_updates DataFrame:</strong></p>
<pre># Create DataFrame from JSON string
json_espresso2 = [...]
espresso2_rdd = sc.parallelize(json_espresso2)
espresso2 = spark.read.json(espresso2_rdd)

# Register it as a temporary view so Spark SQL can refer to it
espresso2.createOrReplaceTempView("espresso_updates")</pre>
<p><strong>with this table view:</strong><br><a href="https://www.databricks.com/wp-content/uploads/2021/02/espresso-table-2.png" data-lightbox><img class="alignleft size-large wp-image-142709" style="width:1024px;" src="https://www.databricks.com/wp-content/uploads/2021/02/espresso-table-2-1024x205.png" alt="DataFrame table in Delta Lake 0.8.0" height="205"></a></p>
<p>Observe that the espresso_updates DataFrame has a different coffee_profile column, which includes a new flavor_notes column.</p>
<pre># espresso Delta table `coffee_profile` schema
 |-- coffee_profile: struct (nullable = true)
 |    |-- temp: double (nullable = true)
 |    |-- weight: double (nullable = true)</pre>
<pre># espresso_updates DataFrame `coffee_profile` schema
 |-- coffee_profile: struct (nullable = true)
 |    |-- flavor_notes: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- temp: double (nullable = true)
 |    |-- weight: double (nullable = true)</pre>
<p>To merge espresso_updates into the espresso table, run the following Spark SQL statement:</p>
<pre>MERGE INTO espresso AS t
USING espresso_updates u
  ON u.espresso_id = t.espresso_id
WHEN MATCHED THEN
  UPDATE SET *
WHEN NOT MATCHED THEN
  INSERT *</pre>
<p>By default, this statement fails with the following error, because the coffee_profile columns of espresso and espresso_updates differ:</p>
<pre>Error in SQL statement: AnalysisException: Cannot cast struct&lt;flavor_notes:array&lt;string&gt;,temp:double,weight:double&gt; to struct&lt;temp:double,weight:double&gt;. All nested columns must match.</pre>
<h2>AutoMerge to the rescue</h2>
<p>To resolve this, enable <strong>autoMerge</strong> with the following setting; MERGE then automatically evolves the schema of the espresso Delta table, including its nested columns, so that tables with different schemas can be merged.</p>
<pre>-- Enable automatic schema evolution
SET spark.databricks.delta.schema.autoMerge.enabled=true;</pre>
<p><strong>In a single atomic operation, MERGE performs the following:</strong></p><ul><li><code>UPDATE: espresso_id = 100</code> is updated with the new <code>flavor_notes</code> from the <code>espresso_updates</code> DataFrame.</li><li><code>espresso_id = (101, 102)</code>: no changes are made to these rows.</li><li><code>INSERT: espresso_id = 103</code> is a new row inserted from the <code>espresso_updates</code> DataFrame.</li></ul>
<figure><a href="https://www.databricks.com/wp-content/uploads/2021/02/espresso-table-3.png" data-lightbox><img class="size-full wp-image-142711" style="width:2652px;" src="https://www.databricks.com/wp-content/uploads/2021/02/espresso-table-3.png" alt="Tabular View displaying nested columns of the coffee_profile column." height="600"></a><figcaption>Tabular View displaying nested columns of the coffee_profile column.</figcaption></figure>
<h2>Simplify read and write APIs with Scala Implicits</h2>
<p>You can import <code>io.delta.implicits._</code> to use the <code>delta</code> method with Spark read and write APIs such as <code>spark.read.delta("/my/table/path")</code>.</p>
<pre>// Traditionally, to read the Delta table using Scala, you would execute the following
spark
  .read
  .format("delta")
  .load("/tmp/espresso/")
  .show()</pre>
<pre>// With Scala implicits, the format is a little simpler
import io.delta.implicits._

spark
  .read
  .delta("/tmp/espresso/")
  .show()</pre>
<h2>Check Constraints</h2>
<p>You can now add CHECK constraints to your tables; a constraint not only checks the existing data but also enforces future data modifications. For example, to ensure that <code>espresso_id &gt;= 100</code>, run this SQL statement:</p>
<pre>-- Ensure that espresso_id &gt;= 100
-- This constraint will both check and enforce future modifications of data to your table
ALTER TABLE espresso ADD CONSTRAINT idCheck CHECK (espresso_id &gt;= 100);</pre>
<pre>-- Drop the constraint from the table if you do not need it
ALTER TABLE espresso DROP CONSTRAINT idCheck;</pre>
<p>The following constraint will fail because the <code>milk-based_espresso</code> column has both True and False values.</p>
<pre>-- Check if the column has only True values; NOTE, this constraint will fail.
ALTER TABLE espresso ADD CONSTRAINT milkBaseCheck CHECK (`milk-based_espresso` IN (True));</pre>
<pre>-- Error output
Error in SQL statement: AnalysisException: 1 rows in profitecpro.espresso violate the new CHECK constraint (`milk-based_espresso` IN ( True ))</pre>
<p>Adding or dropping a <strong>CHECK</strong> constraint is also recorded in your Delta table's transaction log (visible via <strong>DESCRIBE HISTORY espresso</strong>), with the <code>operationParameters</code> column describing the constraint.</p>
<figure><a href="https://www.databricks.com/wp-content/uploads/2021/02/espresso-table-4.png" data-lightbox><img class="size-large wp-image-142712" style="width:1024px;" src="https://www.databricks.com/wp-content/uploads/2021/02/espresso-table-4-1024x177.png" alt="Tabular View displaying the constraint operations within the transaction log history" height="177"></a><figcaption>Tabular View displaying the constraint operations within the transaction log history</figcaption></figure>
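The add-then-enforce behavior of CHECK constraints described above can be sketched as a toy, pure-Python model; it is not Delta Lake's implementation. The `CheckedTable` class, its methods, and the sample rows are hypothetical; only the column names and the two constraint expressions mirror the SQL examples. Adding a constraint first validates every existing row, and afterwards a write is rejected as a whole if any new row violates an active constraint.

```python
# Toy model of CHECK constraint semantics; NOT Delta Lake's implementation.
# CheckedTable and its methods are hypothetical, and the rows are made-up samples.
from typing import Callable, Dict, List

Row = Dict[str, object]


class ConstraintViolation(Exception):
    pass


class CheckedTable:
    def __init__(self, rows: List[Row]):
        self.rows = list(rows)
        self.constraints: Dict[str, Callable[[Row], bool]] = {}

    def add_constraint(self, name: str, predicate: Callable[[Row], bool]) -> None:
        # Adding a constraint first checks every existing row.
        bad = sum(1 for row in self.rows if not predicate(row))
        if bad:
            raise ConstraintViolation(f"{bad} rows violate the new CHECK constraint {name}")
        self.constraints[name] = predicate

    def drop_constraint(self, name: str) -> None:
        del self.constraints[name]

    def insert(self, new_rows: List[Row]) -> None:
        # Future writes are all-or-nothing: one bad row rejects the whole batch.
        for row in new_rows:
            for name, predicate in self.constraints.items():
                if not predicate(row):
                    raise ConstraintViolation(f"row {row} violates CHECK constraint {name}")
        self.rows.extend(new_rows)


# Hypothetical sample rows (not the post's espresso data).
table = CheckedTable([
    {"espresso_id": 100, "milk-based_espresso": False},
    {"espresso_id": 101, "milk-based_espresso": True},
])

table.add_constraint("idCheck", lambda r: r["espresso_id"] >= 100)  # passes

try:
    table.add_constraint("milkBaseCheck", lambda r: r["milk-based_espresso"] in (True,))
except ConstraintViolation as err:
    print(err)  # 1 rows violate the new CHECK constraint milkBaseCheck

try:
    table.insert([{"espresso_id": 99, "milk-based_espresso": True}])
except ConstraintViolation as err:
    print("rejected:", err)
```

In this model a failed `add_constraint` leaves the table without the constraint, and a rejected insert leaves the existing rows untouched.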

Start streaming a table from a specific version

When using Delta as a streaming source, you can use the options startingTimestamp or startingVersion to start processing the table from a given version and onwards. You can also set startingVersion to latest to skip existing data in the table and stream from the new incoming data.
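To clarify how these options pick the first commit a stream reads, here is a toy, pure-Python model of a Delta table's commit history as a list of (version, commit timestamp) pairs. The `first_version_to_read` helper is hypothetical, not part of Delta Lake, and the commit log is made up. In the model, `startingVersion` reads from that version inclusive, `startingTimestamp` reads from the first commit at or after the timestamp, and `"latest"` skips everything already committed.

```python
# Toy model of choosing where a Delta streaming read begins; NOT Delta's code.
# first_version_to_read is hypothetical, and the commit log below is made up.
from datetime import datetime
from typing import List, Optional, Tuple, Union

Commit = Tuple[int, datetime]  # (table version, commit timestamp)


def first_version_to_read(
    log: List[Commit],
    starting_version: Optional[Union[int, str]] = None,
    starting_timestamp: Optional[datetime] = None,
) -> int:
    """Return the first table version whose changes the stream will read."""
    if starting_version is not None and starting_timestamp is not None:
        raise ValueError("set either startingVersion or startingTimestamp, not both")
    latest = max(version for version, _ in log)
    if starting_version == "latest":
        return latest + 1  # skip existing data; only new commits are read
    if starting_version is not None:
        return int(starting_version)  # inclusive: this version is read
    if starting_timestamp is not None:
        # Read from the first commit made at or after the timestamp (inclusive).
        for version, committed_at in sorted(log):
            if committed_at >= starting_timestamp:
                return version
        # This toy model simply rejects a timestamp later than every commit.
        raise ValueError("startingTimestamp is after the latest commit")
    # With neither option set, the real source reads the current snapshot first;
    # that case is outside this toy model.
    raise ValueError("no starting option given")


# Ten hypothetical commits, one every two seconds (versions 0-9).
t0 = datetime(2021, 2, 10, 12, 0, 0)
log = [(v, t0.replace(second=2 * v)) for v in range(10)]

print(first_version_to_read(log, starting_version="6"))  # 6
print(first_version_to_read(log, starting_version="latest"))  # 10
print(first_version_to_read(log, starting_timestamp=t0.replace(second=5)))  # 3
```

Option values are passed as strings here to mirror how they are supplied through `.option(...)`.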

Within the notebook, we generate an artificial stream and then write it to a new Delta table.

The notebook runs the stream for approximately 20 seconds, creating the iterator table with the transaction log history shown below. In this case, the table has 10 transactions.

Tabular View displaying the iterator table transaction log history

Review iterator output

The iterator table has 10 transactions spanning approximately 20 seconds. To view how this data is distributed over time, we run the next SQL statement, which rounds the timestamp of each insert into the iterator table to the second and expresses it as an offset (ts) from the earliest insert, so that ts = 0 corresponds to the minimum timestamp. It then buckets the rows by duration (ts) with a GROUP BY:

The preceding statement produces this bar graph with time buckets (ts) by row count (cnt).

Notice that the 20-second stream write, performed in ten distinct transactions, produces 19 distinct time buckets.
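The bucketing arithmetic behind this chart can be sketched in pure Python. This is not the notebook's SQL statement: it assumes each row carries its insert timestamp, rounds each timestamp to the second, subtracts the earliest one so that ts = 0 is the minimum timestamp, and counts rows per ts (the cnt in the chart). The `bucket_by_second` helper and the timestamps are hypothetical.

```python
# Pure-Python sketch of bucketing inserts by elapsed second (ts) and counting rows
# per bucket (cnt). NOT the notebook's SQL; the timestamps below are hypothetical.
from collections import Counter
from datetime import datetime, timedelta, timezone
from typing import Dict, List


def bucket_by_second(insert_times: List[datetime]) -> Dict[int, int]:
    """Map ts (seconds since the earliest insert) to the number of rows (cnt)."""
    # Round each timestamp to the nearest whole second of epoch time.
    # Note: Python's round() resolves exact .5 ties to the nearest even number.
    rounded = [round(t.timestamp()) for t in insert_times]
    start = min(rounded)  # so that ts = 0 is the minimum timestamp
    counts = Counter(r - start for r in rounded)  # like GROUP BY ts with COUNT(*)
    return dict(sorted(counts.items()))


t0 = datetime(2021, 2, 10, 12, 0, 0, tzinfo=timezone.utc)
inserts = [t0 + timedelta(milliseconds=ms) for ms in (0, 300, 1100, 1400, 2600, 4200)]
print(bucket_by_second(inserts))  # {0: 2, 1: 2, 3: 1, 4: 1}
```

Seconds with no inserts (ts = 2 here) produce no bucket at all, so the number of distinct buckets can be smaller than the number of elapsed seconds.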

Start the Delta stream from a specific version

Using .option("startingVersion", "6"), we specify the version of the table from which our readStream starts (inclusive).

The following graph is generated by re-running the previous SQL query against the new reiterator table.

Notice that the reiterator table now has 10 distinct time buckets, as we’re starting from a later transaction version of the table.

Get Started with Delta Lake 0.8.0

Try out Delta Lake with the preceding code snippets on your Apache Spark 3.1 (or greater) instance (on Databricks, try this with DBR 8.0+). Delta Lake makes your data lakes more reliable, whether you create a new one or migrate an existing data lake. To learn more, refer to https://delta.io/, and join the Delta Lake community via Slack and the Google Group. You can track all the upcoming releases and planned features in GitHub milestones and try out Managed Delta Lake on Databricks with a free account.

Credits

We want to thank the following contributors for updates, doc changes, and contributions in Delta Lake 0.8.0: Adam Binford, Alan Jin, Alex Liu, Ali Afroozeh, Andrew Fogarty, Burak Yavuz, David Lewis, Gengliang Wang, HyukjinKwon, Jacek Laskowski, Jose Torres, Kian Ghodoussi, Linhong Liu, Liwen Sun, Mahmoud Mahdi, Maryann Xue, Michael Armbrust, Mike Dias, Pranav Anand, Rahul Mahadev, Scott Sandre, Shixiong Zhu, Stephanie Bodoff, Tathagata Das, Wenchen Fan, Wesley Hoffman, Xiao Li, Yijia Cui, Yuanjian Li, Zach Schuermann, contrun, ekoifman, and Yi Wu.
