// Imports used in this section (a Databricks notebook may already have some of these in scope)
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.Trigger
import io.delta.tables.DeltaTable
import spark.implicits._

// Tables
val usersTablePath = "dbfs:/tables/test.users/"
val loginsTablePath = "dbfs:/tables/test.logins/"
val reconciliationTablePath = "dbfs:/tables/test.reconciliation/"
val streamingErrorsTablePath = "dbfs:/tables/test.streaming_pipeline_errors/"
// Checkpoints
val loginsCheckpoint = "dbfs:/checkpoint/test.logins/"
// Maximum number of retries for records written to the error table
val maxRetryAttempts = 3
// Status values used in the error / reconciliation tables
val STATUS_RESOLVED = "RESOLVED"
val STATUS_NOT_RESOLVED = "NOT_RESOLVED"
val STATUS_DEAD_LETTER = "DEAD"
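// Status lifecycle (summarised from the reconciliation MERGE defined further down):
//   new error row written by the stream                 -> NOT_RESOLVED, retry_count = 0
//   retry succeeds (stream emits RESOLVED)              -> RESOLVED, retry_count reset to -1
//   retry still fails                                   -> NOT_RESOLVED, retry_count + 1
//   still failing with retry_count >= maxRetryAttempts  -> DEAD (dead letter, no longer retried)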
spark.sql( "DROP TABLE IF EXISTS test.logins" )
spark.sql( "DROP TABLE IF EXISTS test.streaming_pipeline_errors" )
spark.sql( "DROP TABLE IF EXISTS test.reconciliation" )
dbutils.fs.rm(usersTablePath, true)
dbutils.fs.rm(loginsTablePath, true)
dbutils.fs.rm(reconciliationTablePath, true)
dbutils.fs.rm(streamingErrorsTablePath, true)
dbutils.fs.rm(loginsCheckpoint, true)
// Create database
spark.sql( s"CREATE DATABASE IF NOT EXISTS test")
// Target logins table
spark.sql( s"""
CREATE TABLE IF NOT EXISTS test.logins(
user_id BigInt,
timestamp Timestamp,
date Date,
first_name String,
last_name String,
login_count Int,
audit_created_datetime Timestamp,
audit_modified_datetime Timestamp
) USING DELTA LOCATION '${loginsTablePath}'
""")
// Reconciliation Table
spark.sql( s"""
CREATE TABLE IF NOT EXISTS test.reconciliation(
raw String,
event_id String,
pipeline_name String,
status String,
retry_count Int,
error_created_datetime Timestamp,
audit_created_datetime Timestamp,
audit_modified_datetime Timestamp
) USING DELTA LOCATION '${reconciliationTablePath}'
""")
// Build a small `users` dimension: 200 rows with sequential user_ids (1..200) and random names.
// The login stream below generates user_ids up to 225, so ids 201-225 have no dimension row (early-arriving facts).
val nextId = { var i = 0; () => { i += 1; i } }
Seq.fill(200)(
( nextId(), util.Random.alphanumeric.take(10).mkString(""), util.Random.alphanumeric.take(10).mkString("") )
)
.toDF( "user_id", "first_name", "last_name")
.write
.format("delta")
.mode("overwrite")
.option("path", usersTablePath)
.saveAsTable("test.users")
val usersDF = spark.read
.format("delta")
.load(usersTablePath)
display(usersDF)
// Read reconciliation records that have status = "NOT_RESOLVED" and have not exhausted their retries.
// Note that this table is updated by a batch process (runReconciliation, defined below) that runs periodically.
// As soon as new data is written to the reconciliation table, it is picked up and retried here.
val retryableData = spark.readStream
.format("delta")
.option( "ignoreChanges", true )
.table( "test.reconciliation" )
.filter( $"status" === STATUS_NOT_RESOLVED && $"retry_count" <= maxRetryAttempts )
.select(
get_json_object($"raw", "$.user_id").as("user_id"),
get_json_object($"raw", "$.event_id").as("event_id"),
get_json_object($"raw", "$.timestamp").as("timestamp"),
lit(true).as("is_retry_event"),
$"pipeline_name"
)
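// Optional sanity check (a sketch, not required by the pipeline): run the same
// filter as a one-off batch query to eyeball the current retry backlog.
display(
  spark.read
    .table( "test.reconciliation" )
    .filter( $"status" === STATUS_NOT_RESOLVED && $"retry_count" <= maxRetryAttempts )
)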
// Let's use the rate source to generate synthetic login events.
// Each event carries a `user_id` key into the `users` dimension.
// The synthetic data may generate `user_id`s that are not in the users table (yet);
// we treat such events as early-arriving facts.
val loginsData = spark
.readStream
.format("rate")
.option("rowsPerSecond", 1)
.load
.withColumnRenamed( "value", "event_id")
.withColumn( "user_id", ( floor( rand() * 1000 ) % 225 ) + 1 ) // Random between 1 and 225
.withColumn( "is_retry_event", lit(false) )
.withColumn( "pipeline_name", lit("logins-stream") )
// UNION this with retryable records
.unionByName( retryableData )
// Join facts with the users dimension.
// Mark a record as retryable if its dimension row has not arrived yet.
val joinedDF = loginsData
.join( usersDF, Seq("user_id"), "left" )
.withColumn( "is_retryable", $"first_name".isNull && $"last_name".isNull )
.withColumn("date", to_date($"timestamp") )
joinedDF
// 1. Write retryable records to `streaming_pipeline_errors` in append mode
// 2. MERGE good data into the target table and update the login count.
.writeStream
.trigger( Trigger.ProcessingTime( "10 seconds" ) )
.option( "checkpointLocation", loginsCheckpoint )
.foreachBatch( (ds: DataFrame, batchId: Long) => {
// ------------------------------------------------
// Part:1 - Write retryable data OR update status
// of retry events to orchestration
// ------------------------------------------------
// Transform retryable data to write into `streaming_pipeline_errors` table
// Schema of this table should be generic enough so that you can write
// data from multiple pipelines here
val invalidOrReconciledRecords = ds
.filter( $"is_retryable" || $"is_retry_event" )
.select(
$"event_id",
$"timestamp",
to_json( struct( $"event_id", $"user_id", $"timestamp" ) ).as("raw"),
current_timestamp().as("audit_created_datetime"),
$"pipeline_name",
when( $"is_retryable", STATUS_NOT_RESOLVED ).otherwise( STATUS_RESOLVED ).as("status")
)
invalidOrReconciledRecords
.write
.format("delta")
.mode("append")
.option("path", streamingErrorsTablePath)
.saveAsTable("test.streaming_pipeline_errors")
// ----------------------------------------------
// Part:2 - Write good data to delta table
// -----------------------------------------------
// Dedup on the MERGE keys
val goodData = ds.filter( not( $"is_retryable" ) )
.drop( "is_retryable", "is_retry_event" )
.groupBy( $"user_id", $"date" )
.agg( max( $"timestamp").as("timestamp"), count($"*").as("login_count"), first($"first_name").as("first_name"), first("last_name").as("last_name") )
.withColumn( "audit_created_datetime", current_timestamp() )
.withColumn( "audit_modified_datetime", current_timestamp() )
// Update the daily login count
DeltaTable
.forName("test.logins").as("t")
.merge(goodData.as("u"), "u.user_id = t.user_id AND u.date = t.date" )
.whenMatched("t.timestamp <= u.timestamp")
.updateExpr(Map( "login_count" -> "t.login_count + 1", "audit_modified_datetime" -> "current_timestamp()"))
.whenNotMatched
.insertAll()
.execute()
})
.start()
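// A minimal monitoring sketch (standard Structured Streaming API, not part of the
// pipeline itself): list the active queries on this cluster and print their status
// and the metrics of the last completed micro-batch, if any.
spark.streams.active.foreach { q =>
  println(s"id=${q.id} isActive=${q.isActive}")
  println(q.status)
  Option(q.lastProgress).foreach(p => println(p.prettyJson))
}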
def runReconciliation(): Unit = {
// Get the high-water mark: the latest error_created_datetime already merged into the reconciliation table.
// Only records written to `streaming_pipeline_errors` after this timestamp are processed below.
val prevMaxDatetime = spark.sql( "SELECT coalesce( max(error_created_datetime), '1970-01-01T00:00:00.001+0000' ) FROM test.reconciliation" ).head.getString(0)
// Get the new error events written to `streaming_pipeline_errors`
// Use `audit_created_datetime` from `streaming_pipeline_errors` table as `error_created_datetime`
// Dedup before MERGE
val newErrors = spark.read
.table( "test.streaming_pipeline_errors" )
.filter( $"audit_created_datetime" > prevMaxDatetime )
// Dedup: keep only the latest row per (pipeline_name, event_id).
// max(struct(...)) orders by the first struct field (audit_created_datetime),
// so `latest` holds the most recently written error row for that event.
.groupBy( $"pipeline_name", $"event_id" )
.agg( max( struct($"audit_created_datetime", $"event_id", $"pipeline_name", $"status", $"raw") ).as("latest") )
.select( $"latest.*" )
// Set defaults for the merge
.withColumnRenamed( "audit_created_datetime", "error_created_datetime" )
.withColumn( "audit_created_datetime", current_timestamp() ) // Default for MERGE (Used in INSERT, if no match)
.withColumn( "audit_modified_datetime", current_timestamp() ) // Default for MERGE (Used in INSERT, if no match)
.withColumn( "retry_count", lit(0) ) // Default for MERGE (Used in INSERT, if no match)
// This expression computes the new `status` during the MERGE:
// 1. If the incoming status is NOT_RESOLVED, check whether the retry budget is exhausted.
// 2. If exhausted (retry_count >= maxRetryAttempts), mark it as dead letter (STATUS_DEAD_LETTER);
//    otherwise keep it NOT_RESOLVED so it is retried again.
// 3. If the incoming status is RESOLVED (the retry succeeded), mark it RESOLVED.
// You can configure `maxRetryAttempts` per pipeline and have this job read that config.
val statusUpdateSql = s"""
CASE WHEN u.status = '${STATUS_NOT_RESOLVED}' THEN
CASE WHEN t.retry_count >= ${maxRetryAttempts}
THEN '${STATUS_DEAD_LETTER}' ELSE '${STATUS_NOT_RESOLVED}' END
ELSE '${STATUS_RESOLVED}' END
"""
// MERGE the new error events into the reconciliation table
DeltaTable
.forName("test.reconciliation").as("t")
.merge(newErrors.as("u"), "u.pipeline_name = t.pipeline_name AND u.event_id = t.event_id" )
.whenMatched("t.error_created_datetime <= u.error_created_datetime")
.updateExpr(Map(
"status" -> statusUpdateSql,
"audit_modified_datetime" -> "current_timestamp()",
"retry_count" -> s"CASE WHEN u.status = '${STATUS_RESOLVED}' THEN -1 ELSE t.retry_count + 1 END"
))
.whenNotMatched
.insertAll()
.execute()
}
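// Invoking the reconciliation: the note above the retryable-data stream assumes this
// runs periodically as a batch process. A minimal sketch is to call it directly once
// the error table exists; in practice you would schedule it (for example as a
// Databricks job). The 5-minute interval below is an assumption, not from the source.
runReconciliation()
// e.g. inside a long-running cell (sketch only):
// while (true) { runReconciliation(); Thread.sleep(5 * 60 * 1000) }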