Try out Delta Lake 0.7.0 with Spark 3.0 today!
It has been a little more than a year since Delta Lake became an open-source project as a Linux Foundation project. While a lot has changed over the last year, the challenges for most data lakes remain stubbornly the same - the inherent unreliability of data lakes. To address this, Delta Lake brings reliability and data quality for data lakes and Apache Spark; learn more by watching Michael Armbrust’s session at Big Things Conference.
With Delta Lake, you can simplify and scale your data engineering pipelines and improve your data quality data flow with the Delta Architecture.
Delta Lake Primer
To provide more details, the following section provides an overview of the features of Delta Lake. Included are links to various blogs and tech talks that dive into the technical aspects including the Dive into Delta Lake Internals Series of tech talks.
Building upon the Apache Spark Foundation
- Open Format: All data in Delta Lake is stored in Apache Parquet format, enabling Delta Lake to leverage the efficient compression and encoding schemes that are native to Parquet. Try this out using a Jupyter notebook and local Spark instance from Simple, Reliable Upserts, and Deletes on Delta Lake Tables using Python APIs.
- Spark API: Developers can use Delta Lake with their existing data pipelines with minimal change as it is fully compatible with Spark, the commonly used big data processing engine.
- Updates and Deletes: Delta Lake supports Scala / Java APIs to merge, update and delete datasets. This allows you to easily comply with GDPR and CCPA and also simplifies use cases like Change Data Capture. For more information, refer to Announcing the Delta Lake 0.3.0 Release, Simple, Reliable Upserts, and Deletes on Delta Lake Tables using Python APIs, and Diving into Delta Lake Part 3: How do DELETE, UPDATE, and MERGE work.
- ACID transactions: Data lakes typically have multiple data pipelines reading and writing data concurrently, and data engineers have to go through a tedious process to ensure data integrity, due to the lack of transactions. Delta Lake brings ACID transactions to your data lakes. It provides serializability, the strongest level of isolation level. Learn more at Diving into Delta Lake: Unpacking the Transaction Log blog and tech talk.
- Unified Batch and Streaming Source and Sink: A table in Delta Lake is both a batch table, as well as a streaming source and sink. Streaming data ingest, batch historic backfill, and interactive queries all just work out of the box. To see this in action, try out the Delta Lake Tutorial from Spark + AI Summit EU 2019.
Data Lake Enhancements
- Scalable Metadata Handling: In big data, even the metadata itself can be "big data". Delta Lake treats metadata just like data, leveraging Spark's distributed processing power to handle all its metadata. As a result, Delta Lake can handle petabyte-scale tables with billions of partitions and files at ease. Learn more at Diving into Delta Lake: Unpacking the Transaction Log blog and tech talk.
- Time Travel (data versioning): Delta Lake provides snapshots of data enabling developers to access and revert to earlier versions of data for audits or rollbacks, or to reproduce experiments. Learn more in Introducing Delta Lake Time Travel for Large Scale Data Lakes and Getting Data Ready for Data Science with Delta Lake and MLflow.
- Audit History: Delta Lake transaction log records details about every change made to data providing a full audit trail of the changes. Learn more about related scenarios such as addressing GDPR and CCPA and using Delta Lake as a Using Delta Lake as a Change Data Capture source.
Schema Enforcement and Evolution
- Schema Enforcement: Delta Lake provides the ability to specify your schema and enforce it. This helps ensure that the data types are correct and required columns are present, preventing bad data from causing data corruption. For more information, refer to Diving Into Delta Lake: Schema Enforcement & Evolution blog and tech talk.
- Schema Evolution: Business requirements continuously change, therefore the shape and form of your data does as well. Delta Lake enables you to make changes to a table schema that can be applied automatically, without the need for cumbersome DDL. For more information, refer to Diving Into Delta Lake: Schema Enforcement & Evolution blog and tech talk.
Checkpoints from the last year
In April 2019, we announced that Delta Lake would be open-sourced with the Linux Foundation; the source code for the project can be found at https://github.com/delta-io/delta. In that time span, the project has quickly progressed with releases (6 so far), contributors (65 so far), and stars (>2500). At this time, we wanted to call out some of the cool features.
Execute DML statements
With Delta Lake 0.3.0, you now have the ability to run DELETE, UPDATE, and MERGE statements using the Spark API. Instead of running a convoluted mix of INSERTs, file-level deletions, and table removals and re-creations, you can execute DML statements within a single atomic transaction.
import io.delta.tables._ val deltaTable = DeltaTable.forPath(sparkSession, pathToEventsTable) deltaTable.delete("date In addition, this release included the ability to <i>query commit history</i> to understand what operations modified the table.
import io.delta.tables._ val deltaTable = DeltaTable.forPath(spark, pathToTable) val fullHistoryDF = deltaTable.history() // get the full history of the table. val lastOperationDF = deltaTable.history(1) // get the last operation.
The returned DataFrame will have the following structure.
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+--------------+-------------+ |version| timestamp|userId|userName|operation| operationParameters| job|notebook|clusterId|readVersion|isolationLevel|isBlindAppend| +-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+--------------+-------------+ | 5|2019-07-29 14:07:47| null| null| DELETE|[predicate -> ["(...|null| null| null| 4| null| false| | 4|2019-07-29 14:07:41| null| null| UPDATE|[predicate -> (id...|null| null| null| 3| null| false| | 3|2019-07-29 14:07:29| null| null| DELETE|[predicate -> ["(...|null| null| null| 2| null| false| | 2|2019-07-29 14:06:56| null| null| UPDATE|[predicate -> (id...|null| null| null| 1| null| false| | 1|2019-07-29 14:04:31| null| null| DELETE|[predicate -> ["(...|null| null| null| 0| null| false| | 0|2019-07-29 14:01:40| null| null| WRITE|[mode -> ErrorIfE...|null| null| null| null| null| true| +-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+--------------+-------------+
For Delta Lake 0.4.0, we made executing DML statements by supporting Python APIs as noted in Simple, Reliable Upserts, and Deletes on Delta Lake Tables using Python APIs.
Support for other processing engines
An important fundamental of Delta Lake was that while it is a storage layer originally conceived to work with Apache Spark, it can work with many other processing engines. As part of the Delta Lake 0.5.0 release, we included the ability to create manifest files so that you can query Delta Lake tables from Presto and Amazon Athena.
The blog post Query Delta Lake Tables from Presto and Athena, Improved Operations Concurrency, and Merge performance provides examples of how to create the manifest file to query Delta Lake from Presto; for more information, refer to Presto and Athena to Delta Lake Integration. Included as part of the same release was the experimental support for Snowflake and Redshift Spectrum. More recently, we’d like to call out integrations with dbt and koalas.
With Delta Connector 0.1.0, your Apache Hive environment can now read Delta Lake tables. With this connector, you can create a table in Apache Hive using STORED BY syntax to point it to an existing Delta table like this:
CREATE EXTERNAL TABLE deltaTable(col1 INT, col2 STRING) STORED BY 'io.delta.hive.DeltaStorageHandler' LOCATION '/delta/table/path'
Simplifying Operational Maintenance
As your data lakes grow in size and complexity, it becomes increasingly difficult to maintain it. But with Delta Lake, each release included more features to simplify the operational overhead. For example, Delta Lake 0.5.0 includes improvements in concurrency control and support for file compaction. Delta Lake 0.6.0 made further improvements including support for reading Delta tables from any file system and improved merge performance and automatic repartitioning.
As noted in Schema Evolution in Merge Operations and Operational Metrics in Delta Lake, Delta Lake 0.6.0 introduces schema evolution and performance improvements in merge and operational metrics in table history. By enabling automatic schema evolution in your environment,
# Enable automatic schema evolution spark.sql("SET spark.databricks.delta.schema.autoMerge.enabled = true")
you can run a single atomic operation to update values as well as merge together the new schema with the following example statement.
from delta.tables import * deltaTable = DeltaTable.forPath(spark, DELTA_PATH) # Schema Evolution with a Merge Operation deltaTable.alias("t").merge( new_data.alias("s"), "s.col1 = t.col1 AND s.col2 = t.col2" ).whenMatchedUpdateAll( ).whenNotMatchedInsertAll( ).execute()
Improvements to operational metrics were also included in the release so that you can review them from both the API and the Spark UI. For example, running the statement:
provides the abbreviated output of the modifications that had happened to your table.
+-------+------+---------+--------------------+ |version|userId|operation| operationMetrics| +-------+------+---------+--------------------+ | 1|100802| MERGE|[numTargetRowsCop...| | 0|100802| WRITE|[numFiles -> 1, n...| +-------+------+---------+--------------------+
For the same action, you can view this information directly within the Spark UI as visualized in the following animated GIF.
For more details surrounding this action, refer to Schema Evolution in Merge Operations and Operational Metrics in Delta Lake.
Enhancements coming with Spark 3.0
While the preceding section has been about our recent past, let’s get back to the future and focus on the enhancements coming with Spark 3.0.
Support for Catalog Tables
Delta tables can be referenced in an external catalog such as the HiveMetaStore with Delta Lake 0.7.0. Look out for Delta Lake 0.7.0 release working with Spark 3.0 in the coming weeks.
Expectations - NOT NULL columns
Delta tables can be created by specifying columns as NOT NULL. This will prevent any rows containing null values for those columns from being written to your tables.
CREATE TABLE events ( eventTime TIMESTAMP NOT NULL, eventType STRING NOT NULL, source STRING, tags MAP<string string=""> ) USING delta </string>
More support is on the way, for example the definition of arbitrary SQL expressions as invariants as well as being able to define these invariants on existing tables.
DataFrameWriterV2 is a much cleaner interface for writing a DataFrame to a table. Table creation operations such as “create”, “replace” are separate from data modification operations such as “append”, “overwrite” and provide the users a better understanding of what to expect. DataFrameWriterV2 APIs are only available in Scala with Spark 3.0.
// Create a table using the DataFrame or replace the existing table df.writeTo(“delta_table”) .tableProperties(“delta.appendOnly”, “true”) .createOrReplace() // Insert more data into the table df2.writeTo(“delta_table”).append()
Get Started with Delta Lake
Try out Delta Lake with the preceding code snippets on your Apache Spark 2.4.5 (or greater) instance (on Databricks, try this with DBR 6.6+). Delta Lake makes your data lakes more reliable (whether you create a new one or migrate an existing data lake). To learn more, refer to https://delta.io/, and join the Delta Lake community via Slack and Google Group. You can track all the upcoming releases and planned features in GitHub milestones. You can also try out Managed Delta Lake on Databricks with a free account.