spark.sql("set spark.sql.shuffle.partitions = 1")
sourcePath = "/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet"
# Configure Delta Lake Path
deltaPath = "/tmp/loans_delta"
# Remove folder if it exists
dbutils.fs.rm(deltaPath, recurse=True)
# Create the Delta table with the same loans data
(spark.read.format("parquet").load(sourcePath)
.write.format("delta").save(deltaPath))
spark.read.format("delta").load(deltaPath).createOrReplaceTempView("loans_delta")
print("Defined view 'loans_delta'")
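As a quick sanity check (a minimal sketch; the query simply counts the rows just copied from the Parquet source), the new view can be queried directly:
# Verify the view by counting the loaded loan records
spark.sql("SELECT COUNT(*) AS loan_count FROM loans_delta").show()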
import random
import os
from pyspark.sql.functions import *
from pyspark.sql.types import *
def random_checkpoint_dir():
    return "/tmp/chkpt/%s" % str(random.randint(0, 10000))
# User-defined function to generate a random US state code
states = ["CA", "TX", "NY", "WA"]

@udf(returnType=StringType())
def random_state():
    return str(random.choice(states))
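The decorated function can be called with no arguments and used as a column expression; a minimal, illustrative sanity check (the column name here is an assumption for demo purposes):
# Illustrative check: attach a random state to a few generated rows
spark.range(3).withColumn("addr_state", random_state()).show()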
# Function to start a streaming query with a stream of randomly generated data and append to the Delta table
def generate_and_append_data_stream():
    newLoanStreamDF = (spark.readStream.format("rate").option("rowsPerSecond", 5).load()
        .withColumn("loan_id", 10000 + col("value"))
        .withColumn("funded_amnt", (rand() * 5000 + 5000).cast("integer"))
        .withColumn("paid_amnt", col("funded_amnt") - (rand() * 2000))
        .withColumn("addr_state", random_state())
        .select("loan_id", "funded_amnt", "paid_amnt", "addr_state"))

    checkpointDir = random_checkpoint_dir()
    streamingQuery = (newLoanStreamDF.writeStream
        .format("delta")
        .option("checkpointLocation", checkpointDir)
        .trigger(processingTime="10 seconds")
        .start(deltaPath))
    return streamingQuery
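Once started, the returned query handle can be monitored with the standard Structured Streaming API; a minimal sketch of illustrative usage:
# Start one stream and inspect its progress (illustrative usage)
query = generate_and_append_data_stream()
print(query.status)        # current state of the streaming query
print(query.lastProgress)  # last micro-batch metrics (None until the first trigger completes)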
# Function to stop all active streaming queries and delete their checkpoints
def stop_all_streams():
    # Stop all the streams
    print("Stopping all streams")
    for s in spark.streams.active:
        s.stop()
    print("Stopped all streams")
    print("Deleting checkpoints")
    dbutils.fs.rm("/tmp/chkpt/", True)
    print("Deleted checkpoints")
Building Reliable Data Lakes with Delta Lake and Apache Spark™
Delta Lake: an open-source storage layer that brings ACID transactions to Apache Spark™ and big data workloads. This notebook is a quick 101 introduction to some of Delta Lake's features.
Setup Instructions
Source
This notebook is a modified version of the SAIS EU 2019 Delta Lake Tutorial. The data used is a modified version of the public Lending Club data, covering all funded loans from 2012 to 2017. Each loan record includes information provided by the applicant, the current loan status (Current, Late, Fully Paid, etc.), and the latest payment information. For a full description of the fields, see the data dictionary available here.
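The data dictionary itself is not reproduced here; to see which of those fields survive in this notebook's reduced copy of the data, the view defined above can be inspected directly (a minimal sketch):
# Inspect the schema and a few sample rows of the reduced dataset
spark.table("loans_delta").printSchema()
spark.table("loans_delta").show(5)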