We recently announced the release of Delta Lake 0.8.0, which introduces schema evolution and performance improvements in merge, as well as operational metrics in table history. The key features in this release are:

- Automatic schema evolution of nested columns in MERGE and UPDATE operations.
- MERGE INTO and UPDATE operations now resolve nested struct columns by name, controlled by the configuration spark.databricks.delta.resolveMergeUpdateStructsByName.enabled.
- CHECK constraints on Delta tables.
- The ability to start streaming a Delta table from a specific version.
- Scala implicits that simplify the read and write APIs: import io.delta.implicits._ to use the `delta` method with the Spark read and write APIs, such as spark.read.delta("/my/table/path").

In addition, we also highlight that you can now read a Delta table without using Spark via the Delta Standalone Reader and the Delta Rust API. See Use Delta Standalone Reader and the Delta Rust API to query your Delta Lake without Apache Spark™ to learn more.
Get an early preview of [O'Reilly's new ebook](https://www.databricks.com/resources/ebook/delta-lake-running-oreilly?itm_data=evolvenestedcolumnblog-textpromo-oreillydlupandrunning) for the step-by-step guidance you need to start using Delta Lake.

## Automatically evolve your nested column schema

As noted in previous releases, Delta Lake includes the ability to:

- [execute merge operations](https://www.databricks.com/blog/2019/10/03/simple-reliable-upserts-and-deletes-on-delta-lake-tables-using-python-apis.html),
- simplify your [insert/update/delete operations in a single atomic operation](https://www.youtube.com/watch?v=7ewmcdrylsA),
- [enforce and evolve your schema](https://www.databricks.com/blog/2019/09/24/diving-into-delta-lake-schema-enforcement-evolution.html) (more details can also be found in [this tech talk](https://www.youtube.com/watch?v=tjb10n5wVs8)),
- [evolve your schema within a merge operation](https://www.databricks.com/blog/2020/05/19/schema-evolution-in-merge-operations-and-operational-metrics-in-delta-lake.html).

With Delta Lake 0.8.0, you can automatically evolve nested columns within your Delta table with UPDATE and MERGE operations.

Let's showcase this by using a simple coffee espresso example. We will create our first Delta table using the following code snippet.

```python
# espresso1 JSON string
json_espresso1 = [ ... ]

# Delta table path (the Scala examples later in this post read from this location)
espresso_table_path = "/tmp/espresso/"

# Create RDD
espresso1_rdd = sc.parallelize(json_espresso1)

# Read JSON from RDD
espresso1 = spark.read.json(espresso1_rdd)

# Write Delta table
espresso1.write.format("delta").save(espresso_table_path)
```

The following is a view of the espresso table:

[Image: view of the espresso table]

The following code snippet creates the espresso_updates DataFrame:
```python
# Create DataFrame from JSON string
json_espresso2 = [...]
espresso2_rdd = sc.parallelize(json_espresso2)
espresso2 = spark.read.json(espresso2_rdd)
espresso2.createOrReplaceTempView("espresso_updates")
```

with this table view:

[Image: view of the espresso_updates DataFrame]

Observe that the espresso_updates DataFrame has a different coffee_profile column, which includes a new flavor_notes column.

```
# espresso Delta Table `coffee_profile` schema
 |-- coffee_profile: struct (nullable = true)
 |    |-- temp: double (nullable = true)
 |    |-- weight: double (nullable = true)
```

```
# espresso_updates DataFrame `coffee_profile` schema
 |-- coffee_profile: struct (nullable = true)
 |    |-- flavor_notes: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- temp: double (nullable = true)
 |    |-- weight: double (nullable = true)
```

To run a MERGE operation between these two tables, run the following Spark SQL code snippet:

```sql
MERGE INTO espresso AS t
USING espresso_updates u
ON u.espresso_id = t.espresso_id
  WHEN MATCHED THEN UPDATE SET *
  WHEN NOT MATCHED THEN INSERT *
```

By default, this snippet fails with the following error, since the coffee_profile columns of espresso and espresso_updates are different.

```
Error in SQL statement: AnalysisException: Cannot cast struct<flavor_notes:array<string>,temp:double,weight:double> to struct<temp:double,weight:double>. All nested columns must match.
```

## AutoMerge to the rescue

To work around this issue, enable autoMerge using the code snippet below; the espresso Delta table will then automatically merge the two tables with different schemas, including nested columns.

```sql
-- Enable automatic schema evolution
SET spark.databricks.delta.schema.autoMerge.enabled=true;
```

In a single atomic operation, MERGE performs the following:

- UPDATE: espresso_id = 100 has been updated with the new flavor_notes from the espresso_updates DataFrame.
- espresso_id = (101, 102): no changes have been made to the data, as appropriate.
- INSERT: espresso_id = 103 is a new row that has been inserted from the espresso_updates DataFrame.

[Figure: Tabular view displaying nested columns of the coffee_profile column.]

## Simplify read and write APIs with Scala Implicits

You can import io.delta.implicits._ to use the `delta` method with the Spark read and write APIs, such as spark.read.delta("/my/table/path").

```scala
// Traditionally, to read the Delta table using Scala, you would execute the following
spark
  .read
  .format("delta")
  .load("/tmp/espresso/")
  .show()
```

```scala
// With Scala implicits, the format is a little simpler
import io.delta.implicits._

spark
  .read
  .delta("/tmp/espresso/")
  .show()
```
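The same implicits also cover the write path. Here is a minimal sketch, assuming a DataFrame of espresso data named espressoDF (a name used here purely for illustration):

```scala
// Without implicits you would call .format("delta").save(...);
// with io.delta.implicits._ in scope, the delta method is available on the writer as well
import io.delta.implicits._

espressoDF
  .write
  .mode("overwrite")
  .delta("/tmp/espresso/")
```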
## Check Constraints

You can now add CHECK constraints to your tables, which not only check the existing data but are also enforced on future data modifications. For example, to ensure that espresso_id >= 100, run this SQL statement:

```sql
-- Ensure that espresso_id >= 100
-- This constraint will both check existing data and enforce future modifications of data in your table
ALTER TABLE espresso ADD CONSTRAINT idCheck CHECK (espresso_id >= 100);
```

```sql
-- Drop the constraint from the table if you do not need it
ALTER TABLE espresso DROP CONSTRAINT idCheck;
```

The following constraint will fail, as the `milk-based_espresso` column has both True and False values.

```sql
-- Check if the column has only True values; NOTE, this constraint will fail
ALTER TABLE espresso ADD CONSTRAINT milkBaseCheck CHECK (`milk-based_espresso` IN (True));
```

```
-- Error output
Error in SQL statement: AnalysisException: 1 rows in profitecpro.espresso violate the new CHECK constraint (`milk-based_espresso` IN ( True ))
```

The addition or dropping of CHECK constraints will also appear in the transaction log (via DESCRIBE HISTORY espresso) of your Delta table, with the operationParameters articulating the constraint.

[Figure: Tabular view displaying the constraint operations within the transaction log history.]
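To inspect those entries yourself, you can run the command referenced above directly:

```sql
-- Show the table history, including the ADD/DROP CONSTRAINT operations
DESCRIBE HISTORY espresso;
```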
## Start streaming a table from a specific version

When using Delta as a streaming source, you can use the options startingTimestamp or startingVersion to start processing the table from a given timestamp or version onwards. You can also set startingVersion to latest to skip the existing data in the table and stream only the new incoming data.
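As a rough illustration of these options (the path and option values below are placeholders, not taken from the original notebook):

```python
# Start processing from a specific table version (inclusive)
df_from_version = (
    spark.readStream.format("delta")
         .option("startingVersion", "1")
         .load("/tmp/espresso/")
)

# Or start from a specific timestamp
df_from_timestamp = (
    spark.readStream.format("delta")
         .option("startingTimestamp", "2021-02-09")
         .load("/tmp/espresso/")
)

# Or skip all existing data and stream only the new incoming data
df_new_only = (
    spark.readStream.format("delta")
         .option("startingVersion", "latest")
         .load("/tmp/espresso/")
)
```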
Within the notebook, we will generate an artificial stream:
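The notebook's exact generator isn't reproduced here; a minimal sketch that produces a comparable artificial stream, assuming Spark's built-in rate source (one row per second, with timestamp and value columns), is:

```python
# Artificial stream: Spark's rate source emits one row per second
# with a `timestamp` and a `value` column
stream_df = (
    spark.readStream
         .format("rate")
         .option("rowsPerSecond", 1)
         .load()
)
```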
And then generate a new Delta table using this code snippet:
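Again as a sketch (assuming Spark 3.1's DataStreamWriter.toTable and an illustrative checkpoint location), the stream can be written out as the iterator Delta table:

```python
# Write the artificial stream to a Delta table named "iterator"
# (checkpoint location is illustrative)
query = (
    stream_df.writeStream
             .format("delta")
             .option("checkpointLocation", "/tmp/iterator/_checkpoint")
             .toTable("iterator")
)
# The notebook stops the stream after roughly 20 seconds, e.g. via query.stop()
```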
The code in the notebook will run the stream for approximately 20 seconds to create the following iterator table with the below transaction log history. In this case, this table has 10 transactions.
The iterator table has 10 transactions over a duration of approximately 20 seconds. To view this data over that duration, we will run a SQL statement that calculates the timestamp of each insert into the iterator table, rounded to the second (ts). Note that ts = 0 is the minimum timestamp; we want to bucket by duration (ts) via a GROUP BY, by running the following:
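The notebook's exact query isn't shown above; a sketch of the bucketing statement, assuming the `timestamp` column from the rate-source sketch earlier, could look like this:

```sql
-- Seconds elapsed since the first insert (ts) and row count per bucket (cnt)
-- (the `timestamp` column name is an assumption carried over from the rate-source sketch)
SELECT ts, COUNT(1) AS cnt
FROM (
  SELECT CAST(timestamp AS LONG)
           - (SELECT MIN(CAST(timestamp AS LONG)) FROM iterator) AS ts
  FROM iterator
) buckets
GROUP BY ts
ORDER BY ts
```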
The preceding statement produces this bar graph with time buckets (ts) by row count (cnt).
Notice that for the 20-second stream write performed with ten distinct transactions, there are 19 distinct time buckets.
Using .option("startingVersion", "6"), we can specify which version of the table we want to start our readStream from (inclusive).
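A sketch of that read, materializing the result as the reiterator table referenced below (the table name comes from the text; the checkpoint location is illustrative):

```python
# Stream the iterator table starting from version 6 (inclusive)
# and write it out as a new Delta table named "reiterator"
query = (
    spark.readStream
         .option("startingVersion", "6")
         .table("iterator")
         .writeStream
         .format("delta")
         .option("checkpointLocation", "/tmp/reiterator/_checkpoint")
         .toTable("reiterator")
)
```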
The following graph is generated by re-running the previous SQL query against the new reiterator table.
Notice that for the reiterator table there are 10 distinct time buckets, as we're starting from a later transaction version of the table.
Try out Delta Lake with the preceding code snippets on your Apache Spark 3.1 (or greater) instance (on Databricks, try this with DBR 8.0+). Delta Lake makes your data lakes more reliable, whether you create a new one or migrate an existing data lake. To learn more, refer to https://delta.io/, and join the Delta Lake community via Slack and the Google Group. You can track all the upcoming releases and planned features in GitHub milestones, and try out Managed Delta Lake on Databricks with a free account.
We want to thank the following contributors for updates, doc changes, and contributions in Delta Lake 0.8.0: Adam Binford, Alan Jin, Alex Liu, Ali Afroozeh, Andrew Fogarty, Burak Yavuz, David Lewis, Gengliang Wang, HyukjinKwon, Jacek Laskowski, Jose Torres, Kian Ghodoussi, Linhong Liu, Liwen Sun, Mahmoud Mahdi, Maryann Xue, Michael Armbrust, Mike Dias, Pranav Anand, Rahul Mahadev, Scott Sandre, Shixiong Zhu, Stephanie Bodoff, Tathagata Das, Wenchen Fan, Wesley Hoffman, Xiao Li, Yijia Cui, Yuanjian Li, Zach Schuermann, contrun, ekoifman, and Yi Wu.