Technical Guide

Getting Started with Delta Live Tables

Introduction

This guide will demonstrate how Delta Live Tables enables you to develop scalable, reliable data pipelines that conform to the data quality standards of a Lakehouse architecture.

Let's begin by describing a common scenario. We have data from various OLTP systems in cloud object storage such as S3, ADLS, or GCS. Some data sets are updated regularly, and some are historical snapshots of the source system. We have a general understanding of the consumers of the data and the transformations, and we will follow the Lakehouse Architecture to segment data quality into raw, refined, and aggregated tiers:

[Figure: Lakehouse Architecture segmenting data quality into raw, refined, and aggregated tiers]

Each of these Gold tables may serve diverse consumers, from BI reporting to training machine learning models, and therefore the journey of this data from the source to the Gold layer will have different requirements that we care about as data engineers:

  • Latency: "When we ingest new data, it must be visible in the silver table within 5 seconds"
  • Cost: "We cannot run a fixed capacity cluster 24/7 to support these updates"
  • Accuracy: "How much should I account for late arriving data in my real-time sources?"

At first glance, many of these requirements may seem simple to meet in the reference pipeline above. However, while the lakehouse pipeline is intentionally elegant and simple, we are rarely dealing with a straightforward linear flow. In practice, it usually looks something like this:

[Figure: Scaling the analytics environment with additional data sources]

As we begin to scale to enrich our analytics environment with additional data sources to empower new insights, ETL complexity multiplies exponentially, and the following challenges cause these pipelines to become extremely brittle:

  • Error handling and recovery are laborious because there are no clear dependencies between tables
  • Data quality is poor, as enforcing and monitoring constraints is a manual process
  • Data lineage cannot be traced, or at best requires heavy implementation effort
  • Observability at the granular, individual batch/stream level is impossible
  • Batch and streaming are difficult to account for within a unified pipeline

Note: Batch and Streaming?

Spark provides the ability to use batch and streaming paradigms with a single API, and Delta Lake enables concurrent batch and stream operations on a single dataset, eliminating the tradeoffs or reprocessing needed in two-tier or Lambda Architectures. Even so, a lot of work still goes into implementing and monitoring streams, especially in an ETL process that combines streams and batch jobs as individual hops between datasets.

Declarative ETL

When programming ETL pipelines, data transformations are often performed "procedurally". This means that actions to be performed on the data are expressed to the ETL engine as a series of computational steps to be carried out. In many cases, even when you are using an orchestration tool such as Airflow or Azure Data Factory, jobs are launched which contain procedural logic. While the orchestrator may have to be aware of the dependencies between jobs, they are opaque to the ETL transformations and business logic.

On the other hand, declarative ETL involves the user describing the desired results of the pipeline without explicitly listing the ordered steps that must be performed to arrive at the result. Declarative means focusing on "what" our desired goal is and leveraging an intelligent engine like DLT to figure out "how" the compute framework should carry out these processes.

You may think of procedural vs declarative ETL definitions like giving someone step-by-step driving directions versus providing them with a GPS which includes a map of the city and traffic flow information.

Driving directions will provide steps for the driver to reach their destination, but cannot provide them an ETA, and they won't know which neighborhoods they'll pass on the way. Additionally, if a detour needs to be made to the route, the step-by-step directions are now useless, but the GPS with the map will be able to reroute around the detour.

In this metaphor, the map is your DLT pipeline. The DLT engine is the GPS that can interpret the map and determine optimal routes and provide you with metrics such as ETA. Details about the neighborhoods that were traversed in the route are like data lineage, and the ability to find detours around accidents (or bugs) is a result of dependency resolution and modularity which is afforded by the declarative nature of DLT.
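To make dependency resolution more concrete, here is a minimal Python sketch. It is not DLT's implementation; it only illustrates the idea that when each dataset declares what it reads from, an engine can derive a valid execution order on its own. The dataset names mirror the tables built later in this guide, while `declared_inputs` and `execution_order` are hypothetical names introduced for illustration.

```python
from graphlib import TopologicalSorter

# Each dataset declares only its inputs (the "what"), never the run order.
declared_inputs = {
    "sales_orders_raw": set(),
    "customers": set(),
    "sales_orders_cleaned": {"sales_orders_raw", "customers"},
    "sales_order_in_la": {"sales_orders_cleaned"},
    "sales_order_in_chicago": {"sales_orders_cleaned"},
}


def execution_order(inputs):
    """Return one valid run order in which every dataset follows its inputs."""
    return list(TopologicalSorter(inputs).static_order())


order = execution_order(declared_inputs)
print(order)
```

Because the order is derived from the declarations, adding a new dataset only requires stating its inputs; nothing else in the pipeline has to be re-sequenced by hand.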

Your First Pipeline

In this guide, we will be implementing a pipeline that suffers from these challenges and will use this as an opportunity to teach you how DLT's declarative development paradigm enables simplified ETL development and improved quality, lineage, and observability across the lakehouse.

To get started quickly, we host the finished result of the pipeline here in the Delta Live Tables Notebooks repo. You can copy this SQL notebook into your Databricks deployment for reference, or you can follow along with the guide as you go.

This guide will focus on the SQL pipeline but if you would rather run the same pipeline in Python, use this notebook.

Prerequisites

To get the most out of this guide, you should have a basic familiarity with:

  • SQL
  • Developing ETL pipelines and/or working with Big Data systems
  • Databricks interactive notebooks and clusters

You must also have access to a Databricks Workspace with permissions to create new clusters, run jobs, and save data to a location on external cloud object storage or DBFS.

The Dataset

In your first pipeline, we will use the retail-org data set in databricks-datasets which comes with every workspace. Delta Live Tables provides techniques for handling the nuances of Bronze tables (i.e., the raw data) in the Lakehouse. You will use the Auto Loader feature to load the data incrementally from cloud object storage.

Bronze Datasets: Ingesting the dataset using Cloud Files

Bronze datasets represent the rawest quality. We often will make minimal adjustments from the origin, leveraging the cost-effectiveness of cloud storage to create a pristine source off of which we can validate refined data, access fields that we may not usually report on, or create new pipelines altogether. A common pattern at this stage is to continuously ingest new data from a location in cloud storage.
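As a toy model of incremental ingestion, the Python sketch below processes only files that have not been seen in an earlier run. This is an assumption-laden illustration of the pattern, not a description of how Auto Loader is implemented; the file names and the `discover_new_files` helper are hypothetical.

```python
def discover_new_files(listing, already_ingested):
    """Return the files in the listing that have not been ingested yet, sorted."""
    return sorted(set(listing) - already_ingested)


ingested = set()

# First run: every file in the directory listing is new.
first_listing = ["orders/part-000.json", "orders/part-001.json"]
batch_1 = discover_new_files(first_listing, ingested)
ingested.update(batch_1)

# Second run: one more file has landed, and only that file is picked up.
second_listing = first_listing + ["orders/part-002.json"]
batch_2 = discover_new_files(second_listing, ingested)
ingested.update(batch_2)

print(batch_1)
print(batch_2)
```

The key design point is the remembered state (`ingested` here): without it, every run would have to reread the whole source location.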

"Streaming Updates," "Continuous Processing," vs. "Streaming" in DLT

While some of these terms may be used interchangeably in common parlance, they have distinct meanings in DLT. Readers experienced with Spark Structured Streaming may also notice some overloaded terminology. Here we try to disambiguate these terms:

  • Streaming is a processing paradigm in which data sets are treated as unbounded
  • Incremental is an update pattern in which minimal changes are made to the destination data
  • Continuous refers to a pipeline that is always running until it is stopped at an arbitrary time, as opposed to stopping at a time based on the state of the source data when the pipeline starts

You may notice some overlap between unbounded stream processing frameworks like Spark Structured Streaming and streaming data sets in DLT. In fact, DLT's streaming data sets leverage the fundamentals of Spark Structured Streaming and the Delta transaction log but abstract much of the complexity, allowing the developer to focus on meeting processing requirements instead of systems-level heavy lifting.

We will discuss how DLT's streaming data sets and DLT's continuous mode interact in the Gold section of this guide.

*Warning*: The term "continuous" is also used to reference an experimental trigger mode in Spark Structured Streaming in which records are processed one at a time with very low latency rather than in micro-batches. This is a different definition than "continuous" in DLT.

As an example, let's take a look at one of the Bronze tables we will ingest.

    CREATE STREAMING LIVE TABLE sales_orders_raw
    COMMENT "The raw sales orders, ingested from /databricks-datasets."
    TBLPROPERTIES ("quality" = "bronze")
    AS
    SELECT * FROM cloud_files("/databricks-datasets/retail-org/sales_orders/", "json", map("cloudFiles.inferColumnTypes", "true"));

The SQL statement uses Auto Loader to create a streaming live table called sales_orders_raw from JSON files.

cloud_files: Invokes Auto Loader and takes a cloud storage path and format as parameters. (Note that the API is slightly different from the cloudFiles invocation outside of DLT.)

Now, let's create a pipeline to ingest data from cloud object storage. Open your Workspace, then:

  1. Create your first DLT Pipeline Notebook
  2. Create a fresh notebook for your DLT pipeline such as "dlt_retail_sales_pipeline"
  3. Copy the following code into the first cell:

    CREATE STREAMING LIVE TABLE customers
    COMMENT "The customers buying finished products, ingested from /databricks-datasets."
    TBLPROPERTIES ("quality" = "mapping")
    AS SELECT * FROM cloud_files("/databricks-datasets/retail-org/customers/", "csv");
    
    CREATE STREAMING LIVE TABLE sales_orders_raw
    COMMENT "The raw sales orders, ingested from /databricks-datasets."
    TBLPROPERTIES ("quality" = "bronze")
    AS
    SELECT * FROM cloud_files("/databricks-datasets/retail-org/sales_orders/", "json", map("cloudFiles.inferColumnTypes", "true"));

Note: Pipeline Notebooks
DLT Pipeline Notebooks are special, even though they use standard Databricks notebooks. While we don't currently prevent you from attaching a cluster to a Pipeline Notebook, an attached cluster will never be used by DLT to run a pipeline. As a best practice we recommend you leave the pipeline notebook in a detached state, and use a secondary scratch notebook to run arbitrary commands while developing. If you run a pipeline notebook against an attached cluster, you will see something similar to this…

[Figure: DLT-CreateLiveStream]

  1. Open Jobs in a new tab or window, and select "Delta Live Tables"
  2. Select "Create Pipeline" to create a new pipeline
  3. Specify a name such as "Sales Order Pipeline"
  4. Specify the Notebook Path as the notebook created in step 2. This is a required step, but may be modified to refer to a non-notebook library in the future.
  5. A Target is optional but recommended, since it is the database where other authorized members can access the resulting data from the pipeline.
  6. A Storage Location is optional but recommended. You may specify an external blob storage location if you have configured one. This is where DLT will produce both datasets and metadata logs for the pipeline. Tip: If storage is not specified, all data and logs generated from the DLT Pipeline will be stored in a path in your DBFS root storage created by DLT. You can find the path in the Edit Settings JSON later on. To store data and logs in an external (i.e. non-DBFS root) location, you must specify a Storage Location for the DLT Pipeline.
  7. Set Pipeline Mode to Triggered
  8. Set the minimum and maximum numbers of workers used for cluster size
  9. Select "Start"
  10. You have created your first pipeline!

Pipeline Logs

You will now see a section below the graph that includes the logs of the pipeline runs. Here is what the section may look like.

[Figure: DLT pipeline]

First Ingest Code Explained

The icons represent DLT Datasets, which in this case are Tables. We consider these two tables bronze tables. Specifically, they are incremental (streaming) live tables, and we ingested them with Auto Loader using the cloud_files function.

[Figure: DLT graph of the first ingest code]

Views and Tables in DLT

In DLT, Views are similar to a temporary view in SQL and are an alias for some computation. A view allows you to break a complicated query into smaller or easier-to-understand queries. Views also allow you to reuse a given transformation as a source for more than one table. Views are available from within a pipeline only and cannot be queried interactively.

In DLT, Tables are similar to traditional materialized views. The Delta Live Tables runtime automatically creates tables in the Delta format and ensures those tables are updated with the latest result of the query that creates the table.

Consumers can read these tables and views from the Data Lakehouse as with standard Delta Tables (e.g. for reporting in SQL or data science in Python), but they are being updated and managed by the DLT engine. For more detail, see the section on targets below.

Silver Datasets: Expectations and high-quality data

In this section, we will hand you the reins to develop an end-to-end pipeline as shown in the DAG below. We have already created the bronze datasets; next come the silver and then the gold datasets, as outlined in the Lakehouse Architecture paper published at the CIDR database conference in 2021. We will use each layer to teach you a new DLT concept.

[Figure: DLT silver datasets]


The Silver layer is all about high-quality, diverse, and accessible datasets. These may not serve a specific use case such as serving a production report at low latency, but they have been cleansed, transformed, and curated so that data scientists and analysts can easily and confidently consume these tables to quickly perform preprocessing, exploratory analysis, and feature engineering so that they can spend their remaining time on machine learning and insight gathering.

The big productivity killer for these consumers is not just data access and preprocessing, but confidence in the quality of the data they are using. For this reason, we will use DLT to ensure these datasets adhere to a specific quality level, and clearly annotate the datasets. Both data consumers and decision-makers can use the resulting cataloging and quality monitoring that will be derived from the proper use of constraints and comments.

  1. Open your pipeline notebook and create a new cell.
  2. Copy the following code to a new cell:

    CREATE STREAMING LIVE TABLE sales_orders_cleaned(
      CONSTRAINT valid_order_number EXPECT (order_number IS NOT NULL) ON VIOLATION DROP ROW
    )
    PARTITIONED BY (order_date)
    COMMENT "The cleaned sales orders with valid order_number(s) and partitioned by order_date."
    TBLPROPERTIES ("quality" = "silver")
    AS
    SELECT f.customer_id, f.customer_name, f.number_of_line_items,
      TIMESTAMP(from_unixtime((cast(f.order_datetime as long)))) as order_datetime,
      DATE(from_unixtime((cast(f.order_datetime as long)))) as order_date,
      f.order_number, f.ordered_products, c.state, c.city, c.lon, c.lat, c.units_purchased, c.loyalty_segment
      FROM STREAM(LIVE.sales_orders_raw) f
      LEFT JOIN LIVE.customers c
          ON c.customer_id = f.customer_id
         AND c.customer_name = f.customer_name
  3. Return to the Pipeline "Sales Order Pipeline" by navigating to Jobs in the left navbar, selecting "Delta Live Tables" and selecting the pipeline created in a previous step
  4. Select the dropdown next to the Start/Stop toggle, and select "Full Refresh"
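The silver query above derives both order_datetime and order_date from the raw order_datetime field. The Python sketch below reproduces that conversion outside of Spark. It assumes the raw value arrives as epoch seconds in text form (as the cast to long suggests) and uses UTC, whereas Spark's from_unixtime follows the session time zone; `to_order_fields` is a hypothetical helper name.

```python
from datetime import date, datetime, timezone


def to_order_fields(order_datetime_raw):
    """Convert epoch seconds (text or number) to a (timestamp, date) pair in UTC."""
    seconds = int(order_datetime_raw)  # mirrors cast(order_datetime as long)
    order_datetime = datetime.fromtimestamp(seconds, tz=timezone.utc)
    return order_datetime, order_datetime.date()


order_datetime, order_date = to_order_fields("86400")
print(order_datetime.isoformat(), order_date.isoformat())
# prints: 1970-01-02T00:00:00+00:00 1970-01-02
```

Keeping a separate date column is what makes the table easy to partition by day while the full timestamp stays available for finer-grained analysis.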

Constraint: Constraints allow you to define data quality expectations. They take an expression that resolves to any Spark filter predicate, and an action to take upon failure. An action can be to retain, drop, fail, or quarantine. For more detail, see the DLT documentation. All constraints are logged to enable streamlined quality monitoring.

TBLPROPERTIES: A list of key-value pairs that may be either Delta Lake properties, DLT pipeline properties, or arbitrary. Arbitrary TBLPROPERTIES are like tags that can be used for data cataloging. In this example, "quality" = "silver" is an arbitrary property that functions as a tag.

Comment: A string briefly describing the table's purpose, for use with data cataloging in the future.
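The semantics of a constraint can be sketched in plain Python: a predicate is evaluated per row, an action decides what happens to failing rows, and pass/fail counts are recorded for monitoring. This is an illustrative model only, not DLT code. `apply_expectation` and `ExpectationResult` are hypothetical names, and only the retain, drop, and fail actions are modeled (quarantine is left out).

```python
from dataclasses import dataclass

ACTIONS = ("retain", "drop", "fail")


@dataclass
class ExpectationResult:
    kept_rows: list
    passed_records: int
    failed_records: int


def apply_expectation(rows, predicate, on_violation="retain"):
    """Evaluate predicate on each row and apply the chosen violation action."""
    if on_violation not in ACTIONS:
        raise ValueError(f"unknown action: {on_violation}")
    kept_rows, passed, failed = [], 0, 0
    for row in rows:
        if predicate(row):
            passed += 1
            kept_rows.append(row)
        elif on_violation == "fail":
            # Abort the whole update on the first invalid row.
            raise ValueError(f"expectation violated by row: {row}")
        else:
            failed += 1
            if on_violation == "retain":
                kept_rows.append(row)  # keep the row but still count the failure
    return ExpectationResult(kept_rows, passed, failed)


orders = [{"order_number": 1001}, {"order_number": None}, {"order_number": 1002}]
result = apply_expectation(orders, lambda r: r["order_number"] is not None, "drop")
print(result.passed_records, result.failed_records)  # prints: 2 1
```

Note that the counts are recorded regardless of the action, which is what allows quality to be monitored even when invalid rows are retained.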

Gold data sets: complete vs streaming / continuous vs triggered

Many aggregations cannot be performed incrementally and must be performed as complete reprocesses, even if new data can be processed incrementally upstream of the aggregation at the bronze and silver layer. However, there is significant value in having access to real-time or "fast" data that has not yet been aggregated. Unlike traditional Lambda Architectures which require a complex two-tier infrastructure to process fast and slow data, the Lakehouse Architecture enables a single pipeline with both real-time incremental "fast" bronze and silver layers, and a batch updated gold layer (made possible by the strong consistency guarantees of Delta Lake storage).

In practice, this pattern may be challenging in procedural ETL which requires deploying separate stream and batch jobs and maintaining each individually. To solve this, DLT allows you to choose whether each dataset in a pipeline is complete or incremental, with minimal changes to the rest of the pipeline. This makes it easy to scale pipelines involving combinations of bronze and silver real-time data with gold aggregation layers.

Note: Some Spark aggregations can be performed incrementally, such as count, min, max, and sum. In some simple cases, it may make sense to declare gold datasets as incremental. However, even with simple counts and sums this may become inefficient and is not recommended if you are using multiple groupings (e.g. GROUP BY col1, col2, col3).
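The difference between a complete and an incremental aggregation can be sketched in a few lines of Python. The example uses sum, one of the aggregations named above as incrementally maintainable; the function names and sample rows are hypothetical, and the code models the idea rather than Spark's execution.

```python
from collections import defaultdict


def complete_sum(all_rows):
    """Recompute per-key totals from every row seen so far."""
    totals = defaultdict(float)
    for key, amount in all_rows:
        totals[key] += amount
    return dict(totals)


def incremental_sum(state, new_rows):
    """Fold only the new rows into the existing totals and return the state."""
    for key, amount in new_rows:
        state[key] = state.get(key, 0.0) + amount
    return state


batch_1 = [("Chicago", 10.0), ("Los Angeles", 5.0)]
batch_2 = [("Chicago", 2.5)]

state = incremental_sum({}, batch_1)
state = incremental_sum(state, batch_2)

# Both strategies agree; the incremental one never rereads batch_1.
print(state == complete_sum(batch_1 + batch_2))  # prints: True
```

The incremental version has to keep one state entry per distinct key, which hints at why many grouping columns can make this approach inefficient.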

In the case of our gold tables, we are creating complete gold tables by aggregating data in the silver table by city:

  1. Open your pipeline notebook and create a new cell.
  2. Copy the following code to a new cell:

    CREATE LIVE TABLE sales_order_in_la
    COMMENT "Sales orders in LA."
    TBLPROPERTIES ("quality" = "gold")
    AS
    SELECT city, order_date, customer_id, customer_name, ordered_products_explode.curr,
      SUM(ordered_products_explode.price) as sales,
      SUM(ordered_products_explode.qty) as quantity,
      COUNT(ordered_products_explode.id) as product_count
    FROM (
      -- Flatten the ordered_products array into one row per product
      SELECT city, DATE(order_datetime) as order_date, customer_id, customer_name,
        EXPLODE(ordered_products) as ordered_products_explode
      FROM LIVE.sales_orders_cleaned
      WHERE city = 'Los Angeles'
    )
    GROUP BY order_date, city, customer_id, customer_name, ordered_products_explode.curr;

    CREATE LIVE TABLE sales_order_in_chicago
    COMMENT "Sales orders in Chicago."
    TBLPROPERTIES ("quality" = "gold")
    AS
    SELECT city, order_date, customer_id, customer_name, ordered_products_explode.curr,
      SUM(ordered_products_explode.price) as sales,
      SUM(ordered_products_explode.qty) as quantity,
      COUNT(ordered_products_explode.id) as product_count
    FROM (
      -- Flatten the ordered_products array into one row per product
      SELECT city, DATE(order_datetime) as order_date, customer_id, customer_name,
        EXPLODE(ordered_products) as ordered_products_explode
      FROM LIVE.sales_orders_cleaned
      WHERE city = 'Chicago'
    )
    GROUP BY order_date, city, customer_id, customer_name, ordered_products_explode.curr;
  3. Return to the Pipeline "Sales Order Pipeline" by navigating to Jobs in the left navbar, selecting "Delta Live Tables" and selecting the pipeline created in a previous step
  4. Select the dropdown next to the Start/Stop toggle, and select "Full Refresh"

Continuous vs Triggered pipeline mode

In DLT, while individual datasets may be Incremental or Complete, the entire pipeline may be Triggered or Continuous. When a continuous pipeline is started, it will spin up infrastructure and continue to ingest new data until the pipeline is stopped manually or via the API. A triggered pipeline will consume all new data in the source once and will spin down infrastructure automatically. A triggered pipeline will typically run on a schedule using an orchestrator or Databricks Multi-Task Jobs in production.

To toggle between Triggered and Continuous modes, open your pipeline and select "Edit Settings." Continuous will be a boolean in the JSON. Setting "continuous": false is equivalent to setting the pipeline to Triggered mode.
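As a small illustration of that boolean, the Python sketch below reads and flips the "continuous" key in a settings document. The settings text is a hypothetical, heavily trimmed fragment (real pipeline settings contain more fields), the helper names are made up for this example, and treating a missing key as Triggered is an assumption.

```python
import json

# Hypothetical, trimmed settings fragment for illustration only.
settings_text = '{"name": "Sales Order Pipeline", "continuous": false}'


def pipeline_mode(settings_json):
    """Report the mode implied by the "continuous" flag (assumed False if absent)."""
    settings = json.loads(settings_json)
    return "Continuous" if settings.get("continuous", False) else "Triggered"


def set_continuous(settings_json, continuous):
    """Return a new settings document with the "continuous" flag updated."""
    settings = json.loads(settings_json)
    settings["continuous"] = bool(continuous)
    return json.dumps(settings, indent=2)


print(pipeline_mode(settings_text))  # prints: Triggered
updated_text = set_continuous(settings_text, True)
print(pipeline_mode(updated_text))  # prints: Continuous
```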

This gives you the flexibility to slowly mature into a continuous processing paradigm without significantly refactoring your code. This is a common pattern for organizations that are beginning to realize the value of real-time insights without incurring the higher cost of continuously running cloud infrastructure. Experienced Spark engineers may use the matrix below to understand DLT's functionality:

Read from   | Write to    | Continuous Mode                    | Triggered Mode
Complete    | Complete    | Reprocess on a predefined interval | Single reprocess (drop and replace)
Complete    | Incremental | Not possible                       | Not possible
Incremental | Complete    | Reprocess on a predefined interval | Reprocess the materialized stream result
Incremental | Incremental | Streaming using default trigger    | Trigger.once() streaming

Productionization

We have now defined the pipeline. We can conclude with the following steps:

  1. Set a target database to make it easier for consumers to query the pipeline's datasets
  2. Set the pipeline to production mode
  3. If you choose to use Triggered mode, you can schedule the pipeline using Multi-Task Jobs

Pipeline Observability and Data Quality Monitoring

Event Logs

DLT emits all pipeline logs to a predefined Delta Lake table in the pipeline's Storage Location, which can be used for monitoring, lineage, and data quality reporting. You can import this generic log analysis notebook to inspect the event logs, or use dbutils to access the Delta table as {{your storage location}}/system/events.

The most useful information is in the log table's "details" column. Here are the different types of actions that will cause DLT to emit a log, and some relevant fields for each event that you will find within "details":

  • user_action: Events occur when taking actions like creating the pipeline
  • flow_definition: Events occur when a pipeline is deployed or updated and have lineage, schema, and execution plan information
    • output_dataset and input_datasets - the output table/view and its upstream table(s)/view(s)
    • flow_type - whether this is a complete or append flow
    • explain_text - the Spark explain plan
  • flow_progress: Events occur when a data flow starts running or finishes processing a batch of data
    • metrics - currently contains num_output_rows
    • data_quality - contains an array of the results of the data quality rules for this particular dataset
      • dropped_records
      • expectations
        • name, dataset, passed_records, failed_records

Data Quality Monitoring (requires Databricks SQL)

Because DLT logs are exposed as a Delta table, and the log contains data expectation metrics, it is easy to generate reports to monitor data quality with your BI tool of choice. We recommend using Databricks SQL as it is tightly integrated with Delta and the Databricks platform and provides extremely fast query speeds via easy to manage compute endpoints.

Data quality report using Databricks SQL

To create a data quality report using Databricks SQL, follow these steps:

  1. Note the "storage location" for your pipeline by navigating to your pipeline, selecting Edit Settings, and copying the value for "storage_location"
  2. Register the log table in the metastore using the below example and the storage location from step 1:

    CREATE TABLE {{my_pipeline_logs}}
    AS SELECT * FROM delta.`{{pipeline storage location}}/system/events`
  3. In the top-left dropdown, toggle to the "SQL" workspace (you should be in "Data Science & Engineering" workspace when developing DLT pipelines)
  4. In the left navbar, select "Queries"
  5. Select "Create Query"
  6. Copy the following SQL query, replacing {{my_pipeline_logs}} with the name of the table you created in step 2:

    WITH all_expectations AS (
        SELECT
            explode(
              from_json(
                details:flow_progress:data_quality:expectations,
                schema_of_json("[{'name':'str', 'dataset':'str',
                'passed_records':'int', 'failed_records':'int'}]")
              )
            ) AS expectation
          FROM {{my_pipeline_logs}}
          WHERE details:flow_progress.metrics IS NOT NULL
    )
    SELECT expectation_name, X_Axis, SUM(Y_Axis) AS Y_Axis
    FROM (
        SELECT expectation.name AS expectation_name, 'Passed' AS X_Axis, expectation.passed_records AS Y_Axis
        FROM all_expectations
        UNION ALL
        SELECT expectation.name AS expectation_name, 'Failed' AS X_Axis, expectation.failed_records AS Y_Axis
        FROM all_expectations
    )
    GROUP BY expectation_name, X_Axis
  7. Run the query. You should see a response similar to the one below:
[Figure: DLT_graphic_running_query_ex]
  8. Select "Add Visualization"
  9. Select a Visualization type of "Chart" and a Chart Type of "Pie." Set the X and Y columns, and set grouping to expectation_name:
[Figure: Databricks SQL Pie Chart Visualization]

You can now experiment with different chart and visualization types within Redash. In general, for charts, you can use X_Axis and Y_Axis and group by expectation_name to create dashboards for different quality monitoring purposes.

Conclusion

Now that you have stepped through your first Delta Live Tables pipeline and learned some key concepts along the way, we can't wait to see the pipelines you create! For more information on Delta Live Tables, please see our DLT documentation, watch a demo, or download the notebooks!