
Using Structured Streaming with Delta Sharing in Unity Catalog

Will Girten
Josh Seidel
Lin Zhou
Sachin Thakur

We are excited to announce that support for using Structured Streaming with Delta Sharing is now generally available (GA) on Azure, AWS, and GCP! This new feature allows data recipients on the Databricks Lakehouse Platform to stream changes from a Delta Table shared through Unity Catalog.

Data providers can leverage this capability to scale their data-as-a-service easily, reduce the operational cost of sharing large data sets, improve data quality with immediate data validation and quality checks as new data arrives, and improve customer service with real-time data delivery. Similarly, data recipients can stream the latest changes from a shared dataset, reducing the infrastructure cost of processing large batch data and setting the foundation for cutting-edge, real-time data applications. Data recipients across many industry verticals can benefit from this new feature, for example:

  • Retail: Data analysts can stream the latest sales figures for a seasonal fashion line and present business insights in the form of a BI report.
  • Healthcare and Life Sciences: Health practitioners can stream electrocardiogram readings into an ML model to identify abnormalities.
  • Manufacturing: Building management teams can stream smart thermostat readings and identify the most efficient times of day or night to turn heating and cooling units on or off.

Data teams often rely on batch pipelines to process their data because batch execution is robust and easy to implement. Today, however, organizations need the latest arriving data to make real-time business decisions. Structured Streaming not only simplifies real-time processing, it also simplifies batch processing by reducing the number of batch jobs to just a few streaming jobs. And because Structured Streaming supports the same DataFrame API, converting batch data pipelines to streaming is straightforward.
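
As a minimal illustration of that point (the table and column names below are placeholders), the same DataFrame logic can run as a batch job or as a streaming job simply by swapping spark.read for spark.readStream:

import pyspark.sql.functions as F

# Batch: read the entire table once
batch_df = spark.read.table("finance_catalog.finra.symbols_master")

# Streaming: process new rows incrementally as they arrive
stream_df = spark.readStream.table("finance_catalog.finra.symbols_master")

# The same DataFrame transformations apply in either mode
nyse_symbols = stream_df.filter(F.col("listingExchange") == "NYSE")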

In this blog article, we'll explore how enterprises can leverage Structured Streaming with Delta Sharing to maximize the business value of their data in near real-time using an example in the financial industry. We'll also examine how other complementary features, like Databricks Workflows, can be used in conjunction with Delta Sharing and Unity Catalog to build a real-time data application.

Support for Structured Streaming

Perhaps the most highly anticipated Delta Sharing feature over the past few months has been support for using a shared Delta Table as a source in Structured Streaming. This new feature allows data recipients to build real-time applications using Delta Tables shared through Unity Catalog on the Databricks Lakehouse Platform.

Delta Sharing now supports using a shared Delta Table as a Structured Streaming source.

How to Use Delta Sharing with Structured Streaming

Let's take a closer look at how a data recipient might stream publicly traded stock symbol information for real-time trading insights. This article will use the FINRA CAT Reportable Equity Securities Symbol Master dataset, which lists all stocks and equity securities traded across the U.S. National Market System (NMS). Structured Streaming can be used to build real-time applications, but it can also be useful in scenarios where data arrives less frequently. For a simple notebook demonstration, we'll use a dataset that is updated three times throughout the day - once at the start of the transaction date (SOD), a second time during the day to reflect any intraday changes, and a third time at the end of the transaction date (EOD). There are no updates published on weekends or on U.S. holidays.

Published File | Schedule
CAT Reportable Equity Securities Symbol Master – SOD | 6:00 a.m. EST
CAT Reportable Options Securities Symbol Master – SOD | 6:00 a.m. EST
Member ID (IMID) List | 6:00 a.m. EST
Member ID (IMID) Conflicts List | 6:00 a.m. EST
CAT Reportable Equity Securities Symbol Master – Intraday | 10:30 a.m. EST, and approximately every 2 hours until the EOD file is published
CAT Reportable Options Securities Symbol Master – Intraday | 10:30 a.m. EST, and approximately every 2 hours until the EOD file is published
CAT Reportable Equity Securities Symbol Master – EOD | 8:00 p.m. EST
CAT Reportable Options Securities Symbol Master – EOD | 8:00 p.m. EST

Table 1.1 - The FINRA CAT symbol and member reference data is published throughout the business day. There are no updates published on weekends or on U.S. holidays.

From the Data Provider's Perspective: Ingesting the CAT Data Using Databricks Workflows

One of the major benefits of the Databricks Lakehouse Platform is that it makes continuously streaming changes into a Delta Table extremely easy. We'll first start by defining a simple Python task that downloads the FINRA CAT equity securities symbol file at the start of the transaction date (SOD). Afterward, we'll save the published file to a temporary directory on the Databricks filesystem.

import requests

# First, we'll download the FINRA CAT Equity Securities Symbols file for today's Start of Day
# (catReferenceDataURL and dbfsPath are defined elsewhere in the notebook)
request = requests.get(catReferenceDataURL, stream=True, allow_redirects=True)

# Next, save the published file to a temp directory on the Databricks filesystem
with open(dbfsPath, "wb") as binary_file:
    for chunk in request.iter_content(chunk_size=2048):
        if chunk:
            binary_file.write(chunk)
            binary_file.flush()

Code 1.1 - A simple Python task can download the FINRA CAT equity symbol file at the start of the trading day.

To demonstrate, we'll also define a function that ingests the raw file so the Workflow can continuously update a bronze table in our Delta Lake each time an updated file is published.

from pyspark.sql.functions import lit, current_date, current_timestamp, concat_ws

# Finally, we'll ingest the latest equity symbols CSV file into a "bronze" Delta table
def load_CAT_reference_data():
    return (
        spark.read.format("csv")
            .option("header", "true")
            .option("delimiter", "|")
            .schema(catEquitySymbolsMasterSchema)
            .load(localFilePath)
            .withColumn("catReferenceDataType", lit("FINRACATReportableEquitySecurities_SOD"))
            .withColumn("currentDate", current_date())
            .withColumn("currentTimestamp", current_timestamp())
            .withColumn("compositeKey", concat_ws(".", "symbol", "listingExchange"))
    )

Code 1.2 - The FINRA CAT equity symbol data is ingested into a Delta Table at the start of each trading day.
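
To complete the ingestion step, the returned DataFrame can be appended to the bronze Delta Table. Here is a minimal sketch, assuming an append write mode and the table name that is shared later in this article (both are assumptions):

# A minimal sketch (assumed table name and write mode): append the latest
# symbols file to the bronze Delta Table each time the Workflow task runs
(load_CAT_reference_data()
    .write
    .format("delta")
    .mode("append")
    .saveAsTable("finance_catalog.finra.symbols_master"))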

Once it is started, the Databricks Workflow will begin populating the CAT equity symbols dataset each time the file is published at the start of the trading day.
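
As a rough sketch of how this Workflow might be scheduled, assuming the Jobs API 2.1 and placeholder values for the workspace URL, access token, notebook path, cluster ID, and cron expression, a weekday job could be created like this:

import requests

# A hedged sketch using the Databricks Jobs API 2.1; all values below are placeholders
workspace_url = "https://<your-workspace>.cloud.databricks.com"  # placeholder
api_token = "<personal-access-token>"                            # placeholder

job_settings = {
    "name": "finra-cat-equity-symbols-ingest",
    "tasks": [{
        "task_key": "ingest_sod_file",
        "notebook_task": {"notebook_path": "/Repos/finra/ingest_cat_equity_symbols"},
        "existing_cluster_id": "<cluster-id>",
    }],
    # Run shortly after the 6:00 a.m. EST SOD file is published, on weekdays only
    "schedule": {
        "quartz_cron_expression": "0 15 6 ? * MON-FRI",
        "timezone_id": "America/New_York",
    },
}

response = requests.post(
    f"{workspace_url}/api/2.1/jobs/create",
    headers={"Authorization": f"Bearer {api_token}"},
    json=job_settings,
)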

Figure 1.1 - The CAT equity symbols master file (CSV) is ingested daily at the start of the transaction date and landed into a bronze Delta Table.

From the Data Provider's Perspective: Sharing a Delta Table as a Streaming Source

Now that we've created a streaming pipeline to ingest updates to the symbol file each trading day, we can leverage Delta Sharing to share the Delta Table with data recipients. Creating a Delta Share on the Databricks Lakehouse Platform takes just a few clicks, or a single SQL statement if SQL syntax is preferred.

Fig. 1.2 - A data provider first creates a Delta Share, which will later hold the shared Delta Table.
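
If the SQL route is preferred, creating the Share can be sketched as follows (the comment text is illustrative; the share name matches the one used later in this article):

# A minimal sketch: create the Share with a single SQL statement
spark.sql("CREATE SHARE finra_cat_share COMMENT 'Daily FINRA CAT equity symbol reference data'")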

Similarly, a data provider can populate a Delta Share with one or more tables by clicking the 'Manage assets' button, followed by the 'Edit tables' button. In this case, the bronze Delta Table containing the equity symbol data is added to the Share object.

Fig. 1.3 - A Delta Sharing provider can add a streaming table to a Delta Share just like a typical Delta table.

Note that the full history of a Delta table must be shared to support reads using Structured Streaming. History sharing is enabled by default when using the Databricks UI to add a Delta table to a Share. However, history sharing must be explicitly specified when using the SQL syntax.

/**
  A Delta table must be shared with history in order to support
  Spark Structured Stream reads.
*/
ALTER SHARE finra_cat_share
ADD TABLE finance_catalog.finra.symbols_master
WITH HISTORY;

Code 1.3 - The history of a Delta table must be explicitly shared to support Structured Streaming reads when using the SQL syntax.

From the Data Recipient's Perspective: Streaming a Shared Delta Table

Fig. 1.4 - A data recipient can create a new Catalog from the Delta Share.

As a data recipient, streaming from a shared Delta table is just as simple! After the Delta Share has been shared with a data recipient, the recipient will immediately see the Share appear under the provider details in Unity Catalog. Subsequently, the data recipient can create a new catalog in Unity Catalog by clicking the 'Create catalog' button, providing a meaningful name, and adding an optional comment to describe the Share contents.
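
For recipients who prefer SQL, the same step can be sketched roughly as follows (the provider name below is a placeholder; the catalog name matches the one used in the next code example):

# A minimal sketch (the provider name is a placeholder): mount the Share as a new catalog
spark.sql("CREATE CATALOG IF NOT EXISTS finra_cat_catalog USING SHARE finra_provider.finra_cat_share")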

Data recipients can stream from a Delta Table shared through Unity Catalog using Databricks Runtime 12.1 or greater. In this example, we've used a Databricks cluster running Databricks Runtime 12.2 LTS. A data recipient can read the shared Delta table as a Spark Structured Stream by using the deltaSharing data source and supplying the name of the shared table.

# Stream from the shared Delta table that's been created with a new Catalog in Unity Catalog
equity_master_stream = (spark.readStream
                       .format('deltaSharing')
                       .table('finra_cat_catalog.finra.cat_equity_master'))
equity_master_stream.display()

Code 1.4 - A data recipient can stream from a shared Delta Table using the deltaSharing data source.

As a further example, let's combine the shared CAT equity symbols master dataset with a stock price history dataset, local to the data recipient's Unity Catalog. We'll begin by defining a utility function for getting the weekly stock price histories of a given stock ticker symbol.

import yfinance as yf
import pyspark.sql.functions as F


def get_weekly_stock_prices(symbol: str):
    """Retrieves the stock price history of a ticker symbol over the last week.
    arguments:
        symbol (String) - The target stock symbol, typically a 3-4 letter abbreviation.
    returns:
        (Spark DataFrame) - The weekly price history of the provided ticker symbol.
    """
    ticker = yf.Ticker(symbol)

    # Retrieve the stock price history for the last week
    current_stock_price = ticker.history(period="1wk")

    # Convert the pandas DataFrame returned by yfinance to a Spark DataFrame
    df = spark.createDataFrame(current_stock_price)

    # Select only the columns relevant to the stock price and add an event timestamp
    event_ts = str(current_stock_price.index[0])
    df = (df.withColumn("Event_Ts", F.lit(event_ts))
          .withColumn("Symbol", F.lit(symbol))
          .select(
              F.col("Symbol"), F.col("Open"), F.col("High"), F.col("Low"), F.col("Close"),
              F.col("Volume"), F.col("Event_Ts").cast("timestamp"))
          )

    # Return the latest price information
    return df

Next, we'll join the equity symbols master data stream with the local stock price histories of 3 large tech stocks - Apple Inc. (AAPL), the Microsoft Corporation (MSFT), and the NVIDIA Corporation (NVDA).

# Grab the weekly price histories for three major tech stocks
aapl_stock_prices = get_weekly_stock_prices('AAPL')
msft_stock_prices = get_weekly_stock_prices('MSFT')
nvidia_stock_prices = get_weekly_stock_prices('NVDA')
all_stock_prices = aapl_stock_prices.union(msft_stock_prices).union(nvidia_stock_prices)

# Join the stock price histories with the equity symbols master stream
symbols_master = spark.readStream.format('deltaSharing').table('finra_cat_catalog.finra.cat_equity_master')
(symbols_master.join(all_stock_prices, on="symbol", how="inner")
    .select("symbol", "issueName", "listingExchange", "testIssueFlag", "catReferenceDataType",
            "Open", "High", "Low", "Close", "Volume", "event_ts")
).display()

Finally, the data recipient can add an optional destination sink and start the streaming query.
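
For example, a minimal sketch of writing the joined stream to a Delta table sink and starting the query (the target table name and checkpoint location below are illustrative):

# A minimal sketch (target table and checkpoint path are illustrative):
# write the joined stream to a Delta table sink and start the query
enriched_stream = symbols_master.join(all_stock_prices, on="symbol", how="inner")

query = (enriched_stream.writeStream
    .format("delta")
    .option("checkpointLocation", "/tmp/checkpoints/cat_equity_prices")
    .outputMode("append")
    .toTable("recipient_catalog.finance.cat_equity_prices"))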

Fig. 1.6 - Data recipients can read a shared Delta table as a Spark structured stream.

Getting Started with Delta Sharing on Databricks

We hope you enjoyed this example of how organizations can leverage Delta Sharing to maximize the business value of their data in near real-time.

Want to get started with Delta Sharing but don't know where to start? If you are already a Databricks customer, follow the guide to get started using Delta Sharing (AWS | Azure | GCP) and read the documentation to learn more about the configuration options included with this feature. If you are not an existing Databricks customer, sign up for a free trial with a Premium or Enterprise workspace.

Credits

We'd like to extend special thanks for all of the contributions to this release, including Abhijit Chakankar, Lin Zhou, and Shixiong Zhu.
