Engineering blog

Simple, Reliable Upserts and Deletes on Delta Lake Tables using Python APIs

Delta Lake 0.4.0 includes Python APIs and In-place Conversion of Parquet to Delta Lake table
Share this post

Try this Jupyter notebook

We are excited to announce the release of Delta Lake 0.4.0 which introduces Python APIs for manipulating and managing data in Delta tables. The key features in this release are:

  • Python APIs for DML and utility operations (#89) - You can now use Python APIs to update/delete/merge data in Delta Lake tables and to run utility operations (i.e., vacuum, history) on them. These are great for building complex workloads in Python, e.g., Slowly Changing Dimension (SCD) operations, merging change data for replication, and upserts from streaming queries. See the documentation for more details.
  • Convert-to-Delta (#78) - You can now convert a Parquet table in place to a Delta Lake table without rewriting any of the data. This is great for converting very large Parquet tables which would be costly to rewrite as a Delta table. Furthermore, this process is reversible - you can convert a Parquet table to Delta Lake table, operate on it (e.g., delete or merge), and easily convert it back to a Parquet table. See the documentation for more details.
  • SQL for utility operations - You can now use SQL to run utility operations vacuum and history. See the documentation for more details on how to configure Spark to execute these Delta-specific SQL commands.

For more information, please refer to the Delta Lake 0.4.0 release notes and Delta Lake Documentation > Table Deletes, Updates, and Merges.

https://www.youtube.com/watch?v=R4f6SKOetB4

In this blog, we will demonstrate on Apache Spark™ 2.4.3 how to use Python and the new Python APIs in Delta Lake 0.4.0 within the context of an on-time flight performance scenario.  We will show how to upsert and delete data, query old versions of data with time travel and vacuum older versions for cleanup.

How to start using Delta Lake

The Delta Lake package is available as with the --packages option. In our example, we will also demonstrate the ability to VACUUM files and execute Delta Lake SQL commands within Apache Spark.  As this is a short demonstration, we will also enable the following configurations:

  • spark.databricks.delta.retentionDurationCheck.enabled=false to allow us to vacuum files shorter than the default retention duration of 7 days.  Note, this is only required for the SQL command VACUUM.
  • spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension to enable Delta Lake SQL commands within Apache Spark; this is not required for Python or Scala API calls.
# Using Spark Packages
./bin/pyspark --packages io.delta:delta-core_2.11:0.4.0 --conf "spark.databricks.delta.retentionDurationCheck.enabled=false" --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension"

Loading and saving our Delta Lake data

This scenario will be using the On-time flight performance or Departure Delays dataset generated from the RITA BTS Flight Departure Statistics; some examples of this data in action include the 2014 Flight Departure Performance via d3.js Crossfilter and On-Time Flight Performance with GraphFrames for Apache Spark™.  This dataset can be downloaded locally from this github location.  Within pyspark, start by reading the dataset.

# Location variables
tripdelaysFilePath = "/root/data/departuredelays.csv"
pathToEventsTable = "/root/deltalake/departureDelays.delta"

# Read flight delay data
departureDelays = spark.read \
.option("header", "true") \
.option("inferSchema", "true") \
.csv(tripdelaysFilePath)

Next, let’s save our departureDelays dataset to a Delta Lake table.  By saving this table to Delta Lake storage, we will be able to take advantage of its features including ACID transactions, unified batch and streaming, and time travel.

# Save flight delay data into Delta Lake format
departureDelays \
.write \
.format("delta") \
.mode("overwrite") \
.save("departureDelays.delta")

Note, this approach is similar to how you would normally save Parquet data; instead of specifying format("parquet"), you will now specify format("delta"). If you were to take a look at the underlying file system, you will notice four files created for the departureDelays Delta Lake table.

/departureDelays.delta$ ls -l
.
..
_delta_log
part-00000-df6f69ea-e6aa-424b-bc0e-f3674c4f1906-c000.snappy.parquet
part-00001-711bcce3-fe9e-466e-a22c-8256f8b54930-c000.snappy.parquet
part-00002-778ba97d-89b8-4942-a495-5f6238830b68-c000.snappy.parquet
part-00003-1a791c4a-6f11-49a8-8837-8093a3220581-c000.snappy.parquet
Note, the _delta_log is the folder that contains the Delta Lake transaction log.  For more information, refer to Diving Into Delta Lake: Unpacking The Transaction Log.

Now, let’s reload the data but this time our DataFrame will be backed by Delta Lake.

# Load flight delay data in Delta Lake format
delays_delta = spark \
.read \
.format("delta") \
.load("departureDelays.delta")

# Create temporary view
delays_delta.createOrReplaceTempView("delays_delta")
 
# How many flights are between Seattle and San Francisco
spark.sql("select count(1) from delays_delta where origin = 'SEA' and destination = 'SFO'").show()

Finally, let’s determine the number of flights originating from Seattle to San Francisco; in this dataset, there are 1698 flights.

In-place Conversion to Delta Lake

If you have existing Parquet tables, you have the ability to perform in-place conversions your tables to Delta Lake thus not needing to rewrite your table. To convert the table, you can run the following commands.

from delta.tables import *

# Convert non partitioned parquet table at path '/path/to/table'
deltaTable = DeltaTable.convertToDelta(spark, "parquet.`/path/to/table`")

# Convert partitioned parquet table at path '/path/to/table' and partitioned by integer column named 'part'
partitionedDeltaTable = DeltaTable.convertToDelta(spark, "parquet.`/path/to/table`", "part int")

For more information, including how to do this conversion in Scala and SQL, refer to Convert to Delta Lake.

Delete our Flight Data

To delete data from your traditional Data Lake table, you will need to:

  1. Select all of the data from your table not including the rows you want to delete
  2. Create a new table based on the previous query
  3. Delete the original table
  4. Rename the new table to the original table name for downstream dependencies.

Instead of performing all of these steps, with Delta Lake, we can simplify this process by running a DELETE statement.  To show this, let’s delete all of the flights that had arrived early or on-time (i.e. delay ).

from delta.tables import *
from pyspark.sql.functions import *

# Access the Delta Lake table
deltaTable = DeltaTable.forPath(spark, pathToEventsTable
)
# Delete all on-time and early flights
deltaTable.delete("delay 



versiontimestampoperationoperationParameters
22019-09-29 15:41:22UPDATE[predicate -> (or...
12019-09-29 15:40:45DELETE[predicate -> ["(...
02019-09-29 15:40:14WRITE[mode -> Overwrit...

Travel Back in Time with Table History

With Time Travel, you can see review the Delta Lake table as of the version or timestamp.   For more information, refer to Delta Lake documentation > Read older versions of data using Time Travel.  To view historical data, specify the version or Timestamp option; in the code snippet below, we will specify the version option

# Load DataFrames for each version
dfv0 = spark.read.format("delta").option("versionAsOf", 0).load("departureDelays.delta")
dfv1 = spark.read.format("delta").option("versionAsOf", 1).load("departureDelays.delta")
dfv2 = spark.read.format("delta").option("versionAsOf", 2).load("departureDelays.delta")

# Calculate the SEA to SFO flight counts for each version of history
cnt0 = dfv0.where("origin = 'SEA'").where("destination = 'SFO'").count()
cnt1 = dfv1.where("origin = 'SEA'").where("destination = 'SFO'").count()
cnt2 = dfv2.where("origin = 'SEA'").where("destination = 'SFO'").count()

# Print out the value
print("SEA -> SFO Counts: Create Table: %s, Delete: %s, Update: %s" % (cnt0, cnt1, cnt2))

## Output
SEA -> SFO Counts: Create Table: 1698, Delete: 837, Update: 986

Whether for governance, risk management, and compliance (GRC) or rolling back errors, the Delta Lake table contains both the metadata (e.g. recording the fact that a delete had occurred with these operators) and data (e.g. the actual rows deleted). But how do we remove the data files either for compliance or size reasons?

Cleanup Old Table Versions with Vacuum

The Delta Lake vacuum method will delete all of the rows (and files) by default that are older than 7 days (reference: Delta Lake Vacuum).   If you were to view the file system, you’ll notice the 11 files for your table.

/departureDelays.delta$ ls -l
_delta_log
part-00000-5e52736b-0e63-48f3-8d56-50f7cfa0494d-c000.snappy.parquet
part-00000-69eb53d5-34b4-408f-a7e4-86e000428c37-c000.snappy.parquet
part-00000-f8edaf04-712e-4ac4-8b42-368d0bbdb95b-c000.snappy.parquet
part-00001-20893eed-9d4f-4c1f-b619-3e6ea1fdd05f-c000.snappy.parquet
part-00001-9b68b9f6-bad3-434f-9498-f92dc4f503e3-c000.snappy.parquet
part-00001-d4823d2e-8f9d-42e3-918d-4060969e5844-c000.snappy.parquet
part-00002-24da7f4e-7e8d-40d1-b664-95bf93ffeadb-c000.snappy.parquet
part-00002-3027786c-20a9-4b19-868d-dc7586c275d4-c000.snappy.parquet
part-00002-f2609f27-3478-4bf9-aeb7-2c78a05e6ec1-c000.snappy.parquet
part-00003-850436a6-c4dd-4535-a1c0-5dc0f01d3d55-c000.snappy.parquet
part-00003-b9292122-99a7-4223-aaa9-8646c281f199-c000.snappy.parquet

To delete all of the files so that you only keep the current snapshot of data, you will specify a small value for the vacuum method (instead of the default retention of 7 days).

# Remove all files older than 0 hours old.
deltaTable.vacuum(0) 
Note, you perform the same task via SQL syntax:¸ # Remove all files older than 0 hours old spark.sql("VACUUM '" + pathToEventsTable + "' RETAIN 0 HOURS")

Once the vacuum has completed, when you review the file system you will notice fewer files as the historical data has been removed.

/departureDelays.delta$ ls -l
_delta_log
part-00000-f8edaf04-712e-4ac4-8b42-368d0bbdb95b-c000.snappy.parquet
part-00001-9b68b9f6-bad3-434f-9498-f92dc4f503e3-c000.snappy.parquet
part-00002-24da7f4e-7e8d-40d1-b664-95bf93ffeadb-c000.snappy.parquet
part-00003-b9292122-99a7-4223-aaa9-8646c281f199-c000.snappy.parquet
Note, the ability to time travel back to a version older than the retention period is lost after running vacuum.

What’s Next

Try out Delta Lake today by trying out the preceding code snippets on your Apache Spark 2.4.3 (or greater) instance. By using Delta Lake, you can make your data lakes more reliable (whether you create a new one or migrate an existing data lake).  To learn more, refer to https://delta.io/ and join the Delta Lake community via Slack and Google Group.  You can track all the upcoming releases and planned features in github milestones.

Coming up, we’re also excited to have Spark AI Summit Europe from October 15th to 17th. At the summit, we’ll have a training session dedicated to Delta Lake.

Credits

We want to thank the following contributors for updates, doc changes, and contributions in Delta Lake 0.4.0: Andreas Neumann, Burak Yavuz, Jose Torres, Jules Damji, Jungtaek Lim, Liwen Sun, Michael Armbrust, Mukul Murthy, Pranav Anand, Rahul Mahadev, Shixiong Zhu, Tathagata Das, Terry Kim, Wenchen Fan, Wesley Hoffman, Yishuang Lu, Yucai Yu, lys0716.

Try Databricks for free

Related posts

Engineering blog

Simple, Reliable Upserts and Deletes on Delta Lake Tables using Python APIs

October 3, 2019 by Tathagata Das in Engineering Blog
Try this Jupyter notebook We are excited to announce the release of Delta Lake 0.4.0 which introduces Python APIs for manipulating and managing...
See all Engineering Blog posts