We are excited to announce the release of Delta Lake 0.4.0, which introduces Python APIs for manipulating and managing data in Delta tables. The key features in this release are:
- Python APIs for DML and utility operations (#89) - You can now use Python APIs to update/delete/merge data in Delta Lake tables and to run utility operations (i.e., vacuum, history) on them. These are great for building complex workloads in Python, e.g., Slowly Changing Dimension (SCD) operations, merging change data for replication, and upserts from streaming queries. See the documentation for more details.
- Convert-to-Delta (#78) - You can now convert a Parquet table in place to a Delta Lake table without rewriting any of the data. This is great for converting very large Parquet tables which would be costly to rewrite as a Delta table. Furthermore, this process is reversible: you can convert a Parquet table to a Delta Lake table, operate on it (e.g., delete or merge), and easily convert it back to a Parquet table. See the documentation for more details.
- SQL for utility operations - You can now use SQL to run the utility operations vacuum and history. See the documentation for more details on how to configure Spark to execute these Delta-specific SQL commands.
In this blog, we will demonstrate on Apache Spark™ 2.4.3 how to use Python and the new Python APIs in Delta Lake 0.4.0 within the context of an on-time flight performance scenario. We will show how to upsert and delete data, query old versions of data with time travel, and vacuum older versions for cleanup.
How to start using Delta Lake
The Delta Lake package is available with the --packages option. In our example, we will also demonstrate the ability to VACUUM files and execute Delta Lake SQL commands within Apache Spark. As this is a short demonstration, we will also enable the following configurations:
- spark.databricks.delta.retentionDurationCheck.enabled=false to allow us to vacuum files with a retention period shorter than the default of 7 days. Note, this is only required for the SQL command VACUUM.
- spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension to enable Delta Lake SQL commands within Apache Spark; this is not required for Python or Scala API calls.
# Using Spark Packages
./bin/pyspark --packages io.delta:delta-core_2.11:0.4.0 \
  --conf "spark.databricks.delta.retentionDurationCheck.enabled=false" \
  --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension"
Loading and saving our Delta Lake data
This scenario will be using the On-time flight performance or Departure Delays dataset generated from the RITA BTS Flight Departure Statistics; some examples of this data in action include the 2014 Flight Departure Performance via d3.js Crossfilter and On-Time Flight Performance with GraphFrames for Apache Spark™. This dataset can be downloaded locally from this GitHub location. Within pyspark, start by reading the dataset.
# Location variables
tripdelaysFilePath = "/root/data/departuredelays.csv"
# Delta Lake table location (same path as the "departureDelays.delta" literals used below)
pathToEventsTable = "departureDelays.delta"

# Read flight delay data
departureDelays = spark.read \
  .option("header", "true") \
  .option("inferSchema", "true") \
  .csv(tripdelaysFilePath)
Next, let’s save our departureDelays dataset to a Delta Lake table. By saving this table to Delta Lake storage, we will be able to take advantage of its features including ACID transactions, unified batch and streaming, and time travel.
# Save flight delay data into Delta Lake format
departureDelays \
  .write \
  .format("delta") \
  .mode("overwrite") \
  .save("departureDelays.delta")
Note, this approach is similar to how you would normally save Parquet data; instead of specifying format("parquet"), you will now specify format("delta"). If you were to take a look at the underlying file system, you will notice four Parquet files created for the departureDelays Delta Lake table, alongside the _delta_log folder.
/departureDelays.delta$ ls -l
.
..
_delta_log
part-00000-df6f69ea-e6aa-424b-bc0e-f3674c4f1906-c000.snappy.parquet
part-00001-711bcce3-fe9e-466e-a22c-8256f8b54930-c000.snappy.parquet
part-00002-778ba97d-89b8-4942-a495-5f6238830b68-c000.snappy.parquet
part-00003-1a791c4a-6f11-49a8-8837-8093a3220581-c000.snappy.parquet
Note, the _delta_log is the folder that contains the Delta Lake transaction log. For more information, refer to Diving Into Delta Lake: Unpacking The Transaction Log.
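To make the transaction log less abstract, here is a minimal sketch that reads one commit file from _delta_log using only the Python standard library. It assumes the standard layout in which commit N is stored as a file named with N zero-padded to 20 digits plus .json (for example, 00000000000000000000.json for the initial write), and each line of that file holds a single JSON action such as commitInfo, metaData, add, or remove. The helpers read_commit and summarize are hypothetical and not part of the Delta Lake API; checkpoint files are ignored.

```python
import json
import os


def read_commit(log_dir, version):
    """Return the actions recorded in one commit file of a _delta_log directory."""
    # Commit files are named after the version, zero-padded to 20 digits
    path = os.path.join(log_dir, "%020d.json" % version)
    actions = []
    with open(path) as f:
        for line in f:
            line = line.strip()
            if line:  # each non-empty line holds exactly one JSON action
                actions.append(json.loads(line))
    return actions


def summarize(actions):
    """Count actions by type, e.g. {"commitInfo": 1, "add": 4}."""
    counts = {}
    for action in actions:
        # An action is a single-key object such as {"add": {"path": ...}}
        for kind in action:
            counts[kind] = counts.get(kind, 0) + 1
    return counts
```

For example, summarize(read_commit("departureDelays.delta/_delta_log", 0)) should report one add action per Parquet data file written by the initial save, alongside the commitInfo, protocol, and metaData actions of the first commit.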
Now, let’s reload the data but this time our DataFrame will be backed by Delta Lake.
# Load flight delay data in Delta Lake format
delays_delta = spark \
  .read \
  .format("delta") \
  .load("departureDelays.delta")

# Create temporary view
delays_delta.createOrReplaceTempView("delays_delta")

# How many flights are between Seattle and San Francisco
spark.sql("select count(1) from delays_delta where origin = 'SEA' and destination = 'SFO'").show()
Finally, let’s determine the number of flights from Seattle to San Francisco; in this dataset, there are 1698 flights.
In-place Conversion to Delta Lake
If you have existing Parquet tables, you can convert them to Delta Lake in place, without rewriting the table. To convert the table, you can run the following commands.
from delta.tables import DeltaTable

# Convert non partitioned parquet table at path '/path/to/table'
deltaTable = DeltaTable.convertToDelta(spark, "parquet.`/path/to/table`")

# Convert partitioned parquet table at path '/path/to/table' and partitioned by integer column named 'part'
partitionedDeltaTable = DeltaTable.convertToDelta(spark, "parquet.`/path/to/table`", "part int")
For more information, including how to do this conversion in Scala and SQL, refer to Convert to Delta Lake.
Delete our Flight Data
To delete data from your traditional Data Lake table, you will need to:
- Select all of the data from your table not including the rows you want to delete
- Create a new table based on the previous query
- Delete the original table
- Rename the new table to the original table name for downstream dependencies.
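The four steps above can be sketched in plain Python over a hypothetical directory of CSV files (the function name delete_rows_traditional, the _new suffix, and the flat CSV layout are all assumptions for illustration). The sketch makes the weak spot visible: between steps 3 and 4, a reader listing the table directory finds no table at all, and a crash there leaves only the _new copy behind.

```python
import csv
import os
import shutil


def delete_rows_traditional(table_dir, should_delete):
    """Hypothetical sketch of the four manual steps on a flat directory of CSV files."""
    new_dir = table_dir + "_new"
    os.makedirs(new_dir)  # fails if a previous run left a _new copy behind
    for name in sorted(os.listdir(table_dir)):
        with open(os.path.join(table_dir, name), newline="") as src:
            reader = csv.DictReader(src)
            fieldnames = reader.fieldnames
            # Step 1: select all rows except the ones to delete
            kept = [row for row in reader if not should_delete(row)]
        if fieldnames is None:
            continue  # skip empty files
        # Step 2: write the kept rows into the new table
        with open(os.path.join(new_dir, name), "w", newline="") as dst:
            writer = csv.DictWriter(dst, fieldnames=fieldnames)
            writer.writeheader()
            writer.writerows(kept)
    # Step 3: delete the original table (readers now see nothing)
    shutil.rmtree(table_dir)
    # Step 4: rename the new table to the original name for downstream readers
    os.rename(new_dir, table_dir)
```

Delta Lake sidesteps this window by recording the change as a single atomic commit in the transaction log instead of swapping directories.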
Instead of performing all of these steps, with Delta Lake, we can simplify this process by running a DELETE statement. To show this, let’s delete all of the flights that had arrived early or on-time (i.e.
from delta.tables import *
from pyspark.sql.functions import *

# Access the Delta Lake table
deltaTable = DeltaTable.forPath(spark, pathToEventsTable)

# Delete all on-time and early flights
deltaTable.delete("delay
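+-------+-------------------+---------+--------------------+
|version|          timestamp|operation| operationParameters|
+-------+-------------------+---------+--------------------+
|      2|2019-09-29 15:41:22|   UPDATE|[predicate -> (or...|
|      1|2019-09-29 15:40:45|   DELETE|[predicate -> ["(...|
|      0|2019-09-29 15:40:14|    WRITE|[mode -> Overwrit...|
+-------+-------------------+---------+--------------------+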
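Each row of this history is one commit in the transaction log. Delta Lake does not modify Parquet files in place: a DELETE rewrites only the files that contain matching rows and records, in a single commit, remove actions for the old files and add actions for the rewritten ones. The following is a simplified in-memory model of that copy-on-write planning step, not Delta Lake's implementation; the table is assumed to be a dict mapping file name to rows, and the rewritten- prefix is a hypothetical naming scheme.

```python
def plan_delete(files, predicate):
    """Simplified model of a copy-on-write DELETE over a table's data files.

    files: dict mapping file name -> list of row dicts (the current snapshot).
    Returns (remove, add): names of files to mark as removed, and a dict of
    rewritten files holding the surviving rows. Untouched files are skipped.
    """
    remove, add = [], {}
    for name, rows in sorted(files.items()):
        kept = [row for row in rows if not predicate(row)]
        if len(kept) == len(rows):
            continue  # no matching rows: the file stays in the snapshot as-is
        remove.append(name)
        if kept:
            add["rewritten-" + name] = kept  # hypothetical naming scheme
    return remove, add
```

Because the removed files stay on disk until they are vacuumed, earlier versions such as version 0 in the history above remain readable.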
Travel Back in Time with Table History
With Time Travel, you can review the Delta Lake table as of a given version or timestamp. For more information, refer to Delta Lake documentation > Read older versions of data using Time Travel. To view historical data, specify the versionAsOf or timestampAsOf option; in the code snippet below, we will specify the versionAsOf option.
# Load DataFrames for each version
dfv0 = spark.read.format("delta").option("versionAsOf", 0).load("departureDelays.delta")
dfv1 = spark.read.format("delta").option("versionAsOf", 1).load("departureDelays.delta")
dfv2 = spark.read.format("delta").option("versionAsOf", 2).load("departureDelays.delta")

# Calculate the SEA to SFO flight counts for each version of history
cnt0 = dfv0.where("origin = 'SEA'").where("destination = 'SFO'").count()
cnt1 = dfv1.where("origin = 'SEA'").where("destination = 'SFO'").count()
cnt2 = dfv2.where("origin = 'SEA'").where("destination = 'SFO'").count()

# Print out the value
print("SEA -> SFO Counts: Create Table: %s, Delete: %s, Update: %s" % (cnt0, cnt1, cnt2))

## Output
SEA -> SFO Counts: Create Table: 1698, Delete: 837, Update: 986
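Time travel works because each version's set of data files can be rebuilt from the log. The sketch below models that replay: starting from an empty set, it applies every add and remove action in commits 0 through the requested version. It is a simplified illustration that assumes the commits are already parsed into lists of action dicts, and it ignores checkpoints and other action types; files_at_version is a hypothetical helper, not a Delta Lake API.

```python
def files_at_version(commits, version):
    """Replay add/remove actions to find the data files live at `version`.

    commits: list indexed by version; each entry is that commit's list of
    actions, e.g. {"add": {"path": ...}} or {"remove": {"path": ...}}.
    """
    live = set()
    for actions in commits[: version + 1]:
        for action in actions:
            if "add" in action:
                live.add(action["add"]["path"])
            elif "remove" in action:
                live.discard(action["remove"]["path"])
    return live
```

In this model, reading version 0 only needs the files added by the initial write, which is why it still works after the later DELETE and UPDATE, as long as those files have not been vacuumed.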
Whether for governance, risk management, and compliance (GRC) or rolling back errors, the Delta Lake table contains both the metadata (e.g. recording the fact that a delete had occurred with these operators) and data (e.g. the actual rows deleted). But how do we remove the data files either for compliance or size reasons?
Clean Up Old Table Versions with Vacuum
By default, the Delta Lake vacuum method deletes the data files that are no longer referenced by the current version of the table and are older than the 7-day retention threshold (reference: Delta Lake Vacuum). If you were to view the file system, you’ll notice 11 Parquet files for your table.
/departureDelays.delta$ ls -l
_delta_log
part-00000-5e52736b-0e63-48f3-8d56-50f7cfa0494d-c000.snappy.parquet
part-00000-69eb53d5-34b4-408f-a7e4-86e000428c37-c000.snappy.parquet
part-00000-f8edaf04-712e-4ac4-8b42-368d0bbdb95b-c000.snappy.parquet
part-00001-20893eed-9d4f-4c1f-b619-3e6ea1fdd05f-c000.snappy.parquet
part-00001-9b68b9f6-bad3-434f-9498-f92dc4f503e3-c000.snappy.parquet
part-00001-d4823d2e-8f9d-42e3-918d-4060969e5844-c000.snappy.parquet
part-00002-24da7f4e-7e8d-40d1-b664-95bf93ffeadb-c000.snappy.parquet
part-00002-3027786c-20a9-4b19-868d-dc7586c275d4-c000.snappy.parquet
part-00002-f2609f27-3478-4bf9-aeb7-2c78a05e6ec1-c000.snappy.parquet
part-00003-850436a6-c4dd-4535-a1c0-5dc0f01d3d55-c000.snappy.parquet
part-00003-b9292122-99a7-4223-aaa9-8646c281f199-c000.snappy.parquet
To delete all of the files that are not part of the current snapshot of data, specify a small retention value (in hours) for the vacuum method instead of the default retention of 7 days.
# Remove all unreferenced files older than 0 hours
deltaTable.vacuum(0)
Note, you can perform the same task via SQL syntax:
# Remove all unreferenced files older than 0 hours
spark.sql("VACUUM '" + pathToEventsTable + "' RETAIN 0 HOURS")
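Conceptually, vacuum only deletes files that are no longer part of the table and whose removal is older than the retention threshold, which is why a retention of 0 hours removes every file outside the current snapshot. The sketch below models that rule under stated assumptions: each unreferenced file is tagged with the time (epoch milliseconds) it was removed from the table, as recorded by the deletionTimestamp field of a remove action, and vacuum_candidates is a hypothetical helper rather than Delta Lake's actual implementation.

```python
import time

HOUR_MS = 60 * 60 * 1000
DEFAULT_RETENTION_HOURS = 7 * 24  # 7-day default retention


def vacuum_candidates(removed_at_ms, retention_hours=DEFAULT_RETENTION_HOURS, now_ms=None):
    """Return the paths a vacuum would delete under this simplified model.

    removed_at_ms: dict mapping a data file path that is no longer referenced
    by the current table version -> time it was removed (epoch milliseconds).
    Files still referenced by the table are never passed in, so never deleted.
    """
    if now_ms is None:
        now_ms = int(time.time() * 1000)
    cutoff_ms = now_ms - retention_hours * HOUR_MS
    # Only files whose removal happened before the cutoff are eligible
    return sorted(path for path, removed in removed_at_ms.items() if removed < cutoff_ms)
```

With the default of 168 hours (7 days), a file removed two hours ago survives; with retention_hours=0 it becomes a candidate, and any older version that still needed it can no longer be read afterwards.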
Once the vacuum has completed, when you review the file system you will notice fewer files as the historical data has been removed.
/departureDelays.delta$ ls -l
_delta_log
part-00000-f8edaf04-712e-4ac4-8b42-368d0bbdb95b-c000.snappy.parquet
part-00001-9b68b9f6-bad3-434f-9498-f92dc4f503e3-c000.snappy.parquet
part-00002-24da7f4e-7e8d-40d1-b664-95bf93ffeadb-c000.snappy.parquet
part-00003-b9292122-99a7-4223-aaa9-8646c281f199-c000.snappy.parquet
Note, the ability to time travel back to a version older than the retention period is lost after running vacuum.
Try out Delta Lake today by running the preceding code snippets on your Apache Spark 2.4.3 (or greater) instance. By using Delta Lake, you can make your data lakes more reliable, whether you create a new one or migrate an existing data lake. To learn more, refer to https://delta.io/ and join the Delta Lake community via Slack and Google Group. You can track all the upcoming releases and planned features in GitHub milestones.
We want to thank the following contributors for updates, doc changes, and contributions in Delta Lake 0.4.0: Andreas Neumann, Burak Yavuz, Jose Torres, Jules Damji, Jungtaek Lim, Liwen Sun, Michael Armbrust, Mukul Murthy, Pranav Anand, Rahul Mahadev, Shixiong Zhu, Tathagata Das, Terry Kim, Wenchen Fan, Wesley Hoffman, Yishuang Lu, Yucai Yu, lys0716.