Introduction to Delta Lake (Python)

Building Reliable Data Lakes with Delta Lake and Apache Spark™

Delta Lake: An open-source storage format that brings ACID transactions to Apache Spark™ and big data workloads. This is a quick 101 introduction to some of Delta Lake's features.

  • Open format: Stored as Parquet files in blob storage.
  • ACID Transactions: Ensures data integrity and read consistency with complex, concurrent data pipelines.
  • Schema Enforcement and Evolution: Ensures data cleanliness by blocking writes with unexpected schema changes.
  • Audit History: Full history of all the operations that have happened on the table.
  • Time Travel: Query previous versions of the table by time or version number (see the sketch right after this list).
  • Deletes and upserts: Supports deleting and upserting into tables with programmatic APIs.
  • Scalable Metadata Management: Able to handle millions of files by scaling the metadata operations with Spark.
  • Unified Batch and Streaming Source and Sink: A table in Delta Lake is both a batch table, as well as a streaming source and sink. Streaming data ingest, batch historic backfill, and interactive queries all just work out of the box.
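
As a quick illustration of the Audit History and Time Travel bullets above, here is a minimal sketch. It assumes a Delta table already exists at /tmp/loans_delta (the path created later in this notebook) and uses the DESCRIBE HISTORY command and the versionAsOf reader option; treat it as an aside rather than part of the tutorial flow.

# Minimal sketch of Audit History and Time Travel (assumes a Delta table
# already exists at this path; it is created later in this notebook).
historyDeltaPath = "/tmp/loans_delta"

# Audit History: show every operation recorded in the table's transaction log
display(spark.sql("DESCRIBE HISTORY delta.`{}`".format(historyDeltaPath)))

# Time Travel: read the table as it was at an earlier version number
previousVersionDF = (spark.read.format("delta")
  .option("versionAsOf", 0)
  .load(historyDeltaPath))
display(previousVersionDF)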

Setup Instructions

  • You need Databricks Runtime (DBR) 7.6 or above.

Source

This notebook is a modified version of the SAIS EU 2019 Delta Lake Tutorial. The data used is a modified version of the public data from Lending Club. It includes all funded loans from 2012 to 2017. Each loan includes applicant information provided by the applicant as well as the current loan status (Current, Late, Fully Paid, etc.) and latest payment information. For a full view of the data please view the data dictionary available here.

Loading data into a Delta Lake table

First, let's read this data and save it as a Delta Lake table.

spark.sql("set spark.sql.shuffle.partitions = 1")
 
sourcePath = "/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet"
 
# Configure Delta Lake Path
deltaPath = "/tmp/loans_delta"
 
# Remove folder if it exists
dbutils.fs.rm(deltaPath, recurse=True)
 
# Create the Delta table with the same loans data
(spark.read.format("parquet").load(sourcePath) 
  .write.format("delta").save(deltaPath))
 
spark.read.format("delta").load(deltaPath).createOrReplaceTempView("loans_delta")
print("Defined view 'loans_delta'")
Defined view 'loans_delta'

Let's explore the data.

spark.sql("SELECT count(*) FROM loans_delta").show()
+--------+
|count(1)|
+--------+
|   14705|
+--------+
display(spark.sql("SELECT * FROM loans_delta LIMIT 5"))
 
loan_id | funded_amnt | paid_amnt | addr_state
--------+-------------+-----------+-----------
      0 |        1000 |    182.22 | CA
      1 |        1000 |    361.19 | WA
      2 |        1000 |    176.26 | TX
      3 |        1000 |      1000 | OK
      4 |        1000 |    249.98 | PA

Showing all 5 rows.

Loading data streams into the Delta Lake table

We will generate a stream of data with randomly generated loan IDs and amounts. In addition, we are going to define a few useful utility functions.

import random
import os
from pyspark.sql.functions import *
from pyspark.sql.types import *
 
def random_checkpoint_dir(): 
  return "/tmp/chkpt/%s" % str(random.randint(0, 10000))
 
 
# User-defined function to generate random state
 
states = ["CA", "TX", "NY", "WA"]
 
@udf(returnType=StringType())
def random_state():
  return str(random.choice(states))
 
 
# Function to start a streaming query with a stream of randomly generated data and append to the Delta table
def generate_and_append_data_stream():
 
  newLoanStreamDF = (spark.readStream.format("rate").option("rowsPerSecond", 5).load() 
    .withColumn("loan_id", 10000 + col("value")) 
    .withColumn("funded_amnt", (rand() * 5000 + 5000).cast("integer")) 
    .withColumn("paid_amnt", col("funded_amnt") - (rand() * 2000)) 
    .withColumn("addr_state", random_state())
    .select("loan_id", "funded_amnt", "paid_amnt", "addr_state"))
    
  checkpointDir = random_checkpoint_dir()
 
  streamingQuery = (newLoanStreamDF.writeStream 
    .format("delta") 
    .option("checkpointLocation", random_checkpoint_dir()) 
    .trigger(processingTime = "10 seconds") 
    .start(deltaPath))
 
  return streamingQuery
 
# Function to stop all streaming queries 
def stop_all_streams():
  # Stop all the streams
  print("Stopping all streams")
  for s in spark.streams.active:
    s.stop()
  print("Stopped all streams")
  print("Deleting checkpoints")  
  dbutils.fs.rm("/tmp/chkpt/", True)
  print("Deleted checkpoints")

streamingQuery = generate_and_append_data_stream()

You can see that the streaming query is adding data to the table by counting the number of records in the table. Run the following cell multiple times.

display(spark.sql("SELECT count(*) FROM loans_delta"))
 
count(1)
--------
   14950

Showing all 1 rows.

Remember to stop all the streaming queries.

stop_all_streams()
Stopping all streams
Stopped all streams
Deleting checkpoints
Deleted checkpoints

Enforcing schema on write to prevent data corruption

Let's test schema enforcement by trying to write some data with an additional column, closed, that signifies whether the loan has been terminated. Note that this column does not exist in the table.

cols = ['loan_id', 'funded_amnt', 'paid_amnt', 'addr_state', 'closed']
 
items = [
  (1111111, 1000, 1000.0, 'TX', True), 
  (2222222, 2000, 0.0, 'CA', False)
]
 
from pyspark.sql.functions import *
 
loanUpdates = (spark
                .createDataFrame(items, cols)
                .withColumn("funded_amnt", col("funded_amnt").cast("int")))