reconcile-test(Scala)
import org.apache.spark.sql.functions._
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.streaming._
import io.delta.tables.DeltaTable
// Tables 
val usersTablePath = "dbfs:/tables/test.users/"
val loginsTablePath = "dbfs:/tables/test.logins/"
val reconciliationTablePath = "dbfs:/tables/test.reconciliation/"
val streamingErrorsTablePath = "dbfs:/tables/test.streaming_pipeline_errors/"
 
// Checkpoints
val loginsCheckpoint = "dbfs:/checkpoint/test.logins/"
 
// Maximum number of retries for data written to the error table
val maxRetryAttempts = 3
 
// Status values used in the error and reconciliation tables
val STATUS_RESOLVED = "RESOLVED"
val STATUS_NOT_RESOLVED = "NOT_RESOLVED"
val STATUS_DEAD_LETTER = "DEAD"
usersTablePath: String = dbfs:/tables/test.users/
loginsTablePath: String = dbfs:/tables/test.logins/
reconciliationTablePath: String = dbfs:/tables/test.reconciliation/
streamingErrorsTablePath: String = dbfs:/tables/test.streaming_pipeline_errors/
loginsCheckpoint: String = dbfs:/checkpoint/test.logins/
maxRetryAttempts: Int = 3
STATUS_RESOLVED: String = RESOLVED
STATUS_NOT_RESOLVED: String = NOT_RESOLVED
STATUS_DEAD_LETTER: String = DEAD
spark.sql( "DROP TABLE IF EXISTS test.logins" )
spark.sql( "DROP TABLE IF EXISTS test.streaming_pipeline_errors" )
spark.sql( "DROP TABLE IF EXISTS test.reconciliation" )
 
dbutils.fs.rm(usersTablePath, true)
dbutils.fs.rm(loginsTablePath, true)
dbutils.fs.rm(reconciliationTablePath, true)
dbutils.fs.rm(streamingErrorsTablePath, true)
dbutils.fs.rm(loginsCheckpoint, true)
res0: Boolean = true
// Create database
spark.sql( s"CREATE DATABASE IF NOT EXISTS test")
 
// Target logins table
spark.sql( s"""
  CREATE TABLE IF NOT EXISTS test.logins(
    user_id BigInt,
    timestamp Timestamp,
    date Date,
    first_name String,
    last_name String,
    login_count Int,
    audit_created_datetime Timestamp,
    audit_modified_datetime Timestamp
  ) USING DELTA LOCATION '${loginsTablePath}'
""")
 
// Reconciliation Table
spark.sql( s"""
  CREATE TABLE IF NOT EXISTS test.reconciliation(
    raw String,
    event_id String,
    pipeline_name String,
    status String,
    retry_count Int,
    error_created_datetime Timestamp,
    audit_created_datetime Timestamp,
    audit_modified_datetime Timestamp
  ) USING DELTA LOCATION '${reconciliationTablePath}'
""")
res1: org.apache.spark.sql.DataFrame = []
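Note: test.streaming_pipeline_errors is not created up front; the Part-1 streaming job below creates it implicitly via saveAsTable. For reference, here is a hedged DDL sketch of the schema it ends up with (column types are inferred from the writes further down, so treat them as an assumption rather than the notebook's own definition):

// Optional, for reference only: explicit DDL for the error table the stream creates implicitly
spark.sql( s"""
  CREATE TABLE IF NOT EXISTS test.streaming_pipeline_errors(
    event_id String,
    timestamp String,
    raw String,
    audit_created_datetime Timestamp,
    pipeline_name String,
    status String
  ) USING DELTA LOCATION '${streamingErrorsTablePath}'
""")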

Part-1: Logins pipeline

This pipeline processes login events in near real-time and calculates daily logins per user. It uses the reconciliation pattern to handle the late-arriving dimension (users).


This is what the flow looks like:

  1. Read new login events from Kafka (a sketch of this read follows the list; this demo substitutes a rate source further down).
  2. Read retryable events from the reconciliation table.
  3. Union the Kafka and retryable events.
  4. Transform/join this data with the users dimension table.
  5. If there is an early-arriving fact (its dimension has not arrived yet), mark it as retryable.
  6. Write retryable records to the streaming_pipeline_errors table.
  7. MERGE all the good records into the target Delta table.
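Since this demo replaces Kafka with a rate source, here is a minimal, hedged sketch of what step 1 could look like against a real Kafka topic. The broker address, topic name, and JSON payload schema are placeholders, not part of the original pipeline:

import org.apache.spark.sql.types._

// Assumed shape of the login event payload (placeholder schema)
val loginEventSchema = StructType(Seq(
  StructField("event_id", StringType),
  StructField("user_id", StringType),
  StructField("timestamp", TimestampType)
))

val kafkaLogins = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker-1:9092")  // placeholder brokers
  .option("subscribe", "logins")                       // placeholder topic
  .option("startingOffsets", "latest")
  .load()
  // Kafka delivers the payload as binary; decode it and parse the JSON body
  .select( from_json( $"value".cast("string"), loginEventSchema ).as("e") )
  .select( $"e.*" )
  .withColumn( "is_retry_event", lit(false) )
  .withColumn( "pipeline_name", lit("logins-stream") )

// Generate 200 synthetic users with random names and save them as the `test.users` dimension table.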
val nextId = { var i = 0; () => { i += 1; i } }
Seq.fill(200)(
  ( nextId(), util.Random.alphanumeric.take(10).mkString(""), util.Random.alphanumeric.take(10).mkString("") )
)
.toDF( "user_id", "first_name", "last_name")
.write
  .format("delta")
  .mode("overwrite")
  .option("path", usersTablePath)
  .saveAsTable("test.users")
 
val usersDF = spark.read
  .format("delta")
  .load(usersTablePath)
 
display(usersDF)
 
user_id | first_name | last_name
51 | 6XYVkSqva2 | VCzPjjPqEl
52 | lYyJd2m8GJ | XQJyyoHIte
53 | LhXkSyp0FS | ZhJBzdQqHX
54 | E3onReUvQR | dRjf1w0u6Q
55 | HCQtLXlzHq | MZU2L1AhC7
56 | aymHrFMh8P | kvPpRugptm
57 | I9UKsVpNe0 | 3bnyPPDqEj
58 | VCvwVDUCT5 | SLqTbqfHpu
59 | vZ1FGYfAfw | FDfCgEZy1B
60 | Y1O2EKcCCC | dujfcMDMe4
61 | XEO3ZhzQgb | ukCGhYf9zQ
62 | gS82ptZ28E | DZbINN2mKz
63 | i8prcPQ9Wy | uRCCXkviiv
64 | EDB3cWsy3N | 8kT0DHjOWX
65 | pRyRqBEwpj | HDDg4QYZcc
66 | BcgGDslGqQ | aRZnR91dOv
67 | X3V8FBb59p | Redd0D6MBa
68 | XVDns1XnyM | in7lCmbufL

Showing all 200 rows.

// Read reconciliation records with status = "NOT_RESOLVED" that have not exhausted their retries.
// Note that this data is updated by a batch process that runs periodically (Part-2 below).
// As soon as new data is written to the reconciliation table, it is picked up and processed here.
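// `ignoreChanges` lets this stream keep running when the reconciliation table is rewritten by the
// MERGE in Part-2; be aware that rows from rewritten files may be re-delivered downstream.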
val retryableData = spark.readStream
  .format("delta")
  .option( "ignoreChanges", true )
  .table( "test.reconciliation" )
  .filter( $"status" === STATUS_NOT_RESOLVED && $"retry_count" <= maxRetryAttempts  )
  .select( 
    get_json_object($"raw", "$.user_id").as("user_id"),
    get_json_object($"raw", "$.event_id").as("event_id"),
    get_json_object($"raw", "$.timestamp").as("timestamp"),
    lit(true).as("is_retry_event"),
    $"pipeline_name"
  )
retryableData: org.apache.spark.sql.DataFrame = [user_id: string, event_id: string ... 3 more fields]
// Let's use the rate stream to generate synthetic login events.
// These events contain a `user_id` key into the `users` dimension.
// The synthetic data may generate `user_id`s that are not in the users table (yet).
// We treat such events as early-arriving facts.
val loginsData = spark
  .readStream
  .format("rate")
  .option("rowsPerSecond", 1)
  .load
  .withColumnRenamed( "value", "event_id")
  .withColumn( "user_id", ( floor( rand() * 1000 ) % 225 ) + 1 )  // Random between 1 and 225
  .withColumn( "is_retry_event", lit(false) )
  .withColumn( "pipeline_name", lit("logins-stream") )
  
  // UNION this with retryable records
  .unionByName( retryableData )
loginsData: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [timestamp: string, event_id: string ... 3 more fields]
display(loginsData)
 
timestamp | event_id | user_id | is_retry_event | pipeline_name
2020-12-03 09:11:15.425 | 0 | 99 | false | logins-stream
2020-12-03 09:11:16.425 | 1 | 155 | false | logins-stream
2020-12-03 09:11:17.425 | 2 | 175 | false | logins-stream
2020-12-03 09:11:18.425 | 3 | 23 | false | logins-stream
2020-12-03 09:11:19.425 | 4 | 9 | false | logins-stream
2020-12-03 09:11:20.425 | 5 | 107 | false | logins-stream
2020-12-03 09:11:21.425 | 6 | 104 | false | logins-stream
2020-12-03 09:11:22.425 | 7 | 212 | false | logins-stream
2020-12-03 09:11:23.425 | 8 | 156 | false | logins-stream
2020-12-03 09:11:24.425 | 9 | 48 | false | logins-stream
2020-12-03 09:11:25.425 | 10 | 170 | false | logins-stream
2020-12-03 09:11:26.425 | 11 | 15 | false | logins-stream
2020-12-03 09:11:27.425 | 12 | 85 | false | logins-stream
2020-12-03 09:11:28.425 | 13 | 192 | false | logins-stream
2020-12-03 09:11:29.425 | 14 | 54 | false | logins-stream
2020-12-03 09:11:30.425 | 15 | 90 | false | logins-stream
2020-12-03 09:11:31.425 | 16 | 171 | false | logins-stream
2020-12-03 09:11:32.425 | 17 | 25 | false | logins-stream

Showing the first 1000 rows.

// Join facts with the users dimension.
// Mark the record as retryable if the corresponding dimension has not arrived yet.
val joinedDF = loginsData
  .join( usersDF, Seq("user_id"), "left" )
  .withColumn( "is_retryable", $"first_name".isNull && $"last_name".isNull )
  .withColumn("date", to_date($"timestamp") )
joinedDF: org.apache.spark.sql.DataFrame = [user_id: string, timestamp: string ... 7 more fields]
joinedDF
 
  // 1. Write retryable records to `streaming_pipeline_errors` in append mode
  // 2. MERGE good data into the target table and update the login count.
  .writeStream
  .trigger( Trigger.ProcessingTime( "10 seconds" ) )
  .option( "checkpointLocation", loginsCheckpoint )
  .foreachBatch( (ds: DataFrame, batchId: Long) => {
    
    // ------------------------------------------------
    // Part:1 - Write retryable data OR update status 
    //          of retry events to orchestration
    // ------------------------------------------------
    
    // Transform retryable data to write into `streaming_pipeline_errors` table
    // Schema of this table should be generic enough so that you can write 
    // data from multiple pipelines here
    val invalidOrReconciledRecords = ds
      .filter( $"is_retryable" || $"is_retry_event" )
      .select(
        $"event_id",
        $"timestamp",
        to_json( struct( $"event_id", $"user_id", $"timestamp" ) ).as("raw"),
        current_timestamp().as("audit_created_datetime"),
        $"pipeline_name",
        when( $"is_retryable", STATUS_NOT_RESOLVED ).otherwise( STATUS_RESOLVED ).as("status")
      )

    invalidOrReconciledRecords.write
      .format("delta")
      .mode("append")
      .option("path", streamingErrorsTablePath)
      .saveAsTable("test.streaming_pipeline_errors")
      
    // ----------------------------------------------
    // Part:2 - Write good data to delta table
    // -----------------------------------------------
    
    // Dedup on the MERGE keys
    val goodData = ds.filter( not( $"is_retryable" ) )
      .drop( "is_retryable", "is_retry_event" )
      .groupBy( $"user_id", $"date" )
      .agg( max( $"timestamp").as("timestamp"), count($"*").as("login_count"), first($"first_name").as("first_name"), first("last_name").as("last_name") )
      .withColumn( "audit_created_datetime", current_timestamp() )
      .withColumn( "audit_modified_datetime", current_timestamp() )
    
    // Update the daily login count
    DeltaTable
      .forName("test.logins").as("t")
      .merge(goodData.as("u"), "u.user_id = t.user_id AND u.date = t.date" )
      .whenMatched("t.timestamp <= u.timestamp")
      .updateExpr(Map( "login_count" -> "t.login_count + 1", "audit_modified_datetime" -> "current_timestamp()"))
      .whenNotMatched
      .insertAll()
      .execute()
    
  })
  .start()
 
res9: org.apache.spark.sql.streaming.StreamingQuery = org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@5d5e217f
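The streaming query keeps running in the background until it is stopped. A small convenience snippet (not part of the original notebook) that stops every active stream when you are done experimenting:

// Stop all active streaming queries on this cluster
spark.streams.active.foreach { q =>
  println(s"Stopping stream ${q.id}")
  q.stop()
}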
%sql SELECT * FROM test.logins
 
user_id | timestamp | date | first_name | last_name | login_count | audit_created_datetime | audit_modified_datetime
1 | 2020-12-03T09:18:57.630+0000 | 2020-12-03 | Yu0BXZeOjC | F0CgWsaGxG | 3 | 2020-12-03T09:19:15.178+0000 | 2020-12-03T09:21:24.912+0000
2 | 2020-12-03T09:15:27.630+0000 | 2020-12-03 | otlpYBsk0G | 6Xo8l8QKQ3 | 5 | 2020-12-03T09:15:59.749+0000 | 2020-12-03T09:25:52.590+0000
3 | 2020-12-03T09:20:41.630+0000 | 2020-12-03 | YUwt5BdUa3 | wPZuZeFkpn | 4 | 2020-12-03T09:21:01.279+0000 | 2020-12-03T09:24:57.887+0000
4 | 2020-12-03T09:21:53.630+0000 | 2020-12-03 | ZnZa9ogJog | 6woutqECHH | 2 | 2020-12-03T09:22:21.149+0000 | 2020-12-03T09:24:30.981+0000
5 | 2020-12-03T09:13:00.630+0000 | 2020-12-03 | MeKuyoQzO6 | XVUjkYGm8p | 4 | 2020-12-03T09:13:46.042+0000 | 2020-12-03T09:19:36.370+0000
6 | 2020-12-03T09:13:01.630+0000 | 2020-12-03 | 0VDH18tYto | COLuHvIEc8 | 4 | 2020-12-03T09:13:46.042+0000 | 2020-12-03T09:22:42.791+0000
7 | 2020-12-03T09:14:22.630+0000 | 2020-12-03 | LPzQp40e1z | wKTSEVvAOZ | 7 | 2020-12-03T09:14:38.260+0000 | 2020-12-03T09:23:57.487+0000
8 | 2020-12-03T09:15:30.630+0000 | 2020-12-03 | TZzQUflUZX | DEERB2N8HW | 2 | 2020-12-03T09:15:59.749+0000 | 2020-12-03T09:21:51.481+0000
9 | 2020-12-03T09:14:08.630+0000 | 2020-12-03 | SpXxyClcsh | KjboScU4Ot | 1 | 2020-12-03T09:14:38.260+0000 | 2020-12-03T09:14:38.260+0000
10 | 2020-12-03T09:20:55.630+0000 | 2020-12-03 | ar5ZFP4Qss | NN5v3PaafR | 2 | 2020-12-03T09:21:30.176+0000 | 2020-12-03T09:23:57.487+0000
11 | 2020-12-03T09:11:39.630+0000 | 2020-12-03 | Ll0oqzrJYx | DzjVCZhgUU | 6 | 2020-12-03T09:11:56.372+0000 | 2020-12-03T09:25:28.825+0000
12 | 2020-12-03T09:15:18.630+0000 | 2020-12-03 | hkXFWmXSCC | PaSCibNvDw | 3 | 2020-12-03T09:15:59.749+0000 | 2020-12-03T09:24:30.981+0000
13 | 2020-12-03T09:16:07.630+0000 | 2020-12-03 | geBUarH0qW | JXDwVdWg6H | 4 | 2020-12-03T09:16:27.176+0000 | 2020-12-03T09:25:28.825+0000
14 | 2020-12-03T09:16:18.630+0000 | 2020-12-03 | vo2CHzAIEV | pStKnFpP4M | 3 | 2020-12-03T09:16:49.145+0000 | 2020-12-03T09:23:12.167+0000
15 | 2020-12-03T09:11:41.630+0000 | 2020-12-03 | CXTzhAn1RR | qexHMjOBPY | 7 | 2020-12-03T09:11:56.372+0000 | 2020-12-03T09:25:28.825+0000
16 | 2020-12-03T09:13:17.630+0000 | 2020-12-03 | qVnClFPdgf | hCvc8MbBDs | 4 | 2020-12-03T09:13:46.042+0000 | 2020-12-03T09:24:30.981+0000
17 | 2020-12-03T09:12:33.630+0000 | 2020-12-03 | nf9nHIxAFW | zFe1W2c2dm | 5 | 2020-12-03T09:12:51.805+0000 | 2020-12-03T09:25:52.590+0000
18 | 2020-12-03T09:13:39.630+0000 | 2020-12-03 | WvDMnHVsge | 4ZeZl1qaBW | 6 | 2020-12-03T09:14:14.398+0000 | 2020-12-03T09:24:57.887+0000

Showing all 200 rows.

%sql SELECT * FROM test.streaming_pipeline_errors
 
event_id | timestamp | raw | audit_created_datetime | pipeline_name | status
0 | 2020-12-03 09:11:17.63 | {"event_id":"0","user_id":"221","timestamp":"2020-12-03 09:11:17.63"} | 2020-12-03T09:18:09.629+0000 | logins-stream | NOT_RESOLVED
136 | 2020-12-03 09:13:33.63 | {"event_id":"136","user_id":"214","timestamp":"2020-12-03 09:13:33.63"} | 2020-12-03T09:18:09.629+0000 | logins-stream | NOT_RESOLVED
140 | 2020-12-03 09:13:37.63 | {"event_id":"140","user_id":"210","timestamp":"2020-12-03 09:13:37.63"} | 2020-12-03T09:18:09.629+0000 | logins-stream | NOT_RESOLVED
143 | 2020-12-03 09:13:40.63 | {"event_id":"143","user_id":"204","timestamp":"2020-12-03 09:13:40.63"} | 2020-12-03T09:18:09.629+0000 | logins-stream | NOT_RESOLVED
152 | 2020-12-03 09:13:49.63 | {"event_id":"152","user_id":"215","timestamp":"2020-12-03 09:13:49.63"} | 2020-12-03T09:18:09.629+0000 | logins-stream | NOT_RESOLVED
153 | 2020-12-03 09:13:50.63 | {"event_id":"153","user_id":"209","timestamp":"2020-12-03 09:13:50.63"} | 2020-12-03T09:18:09.629+0000 | logins-stream | NOT_RESOLVED
161 | 2020-12-03 09:13:58.63 | {"event_id":"161","user_id":"201","timestamp":"2020-12-03 09:13:58.63"} | 2020-12-03T09:18:09.629+0000 | logins-stream | NOT_RESOLVED
179 | 2020-12-03 09:14:16.63 | {"event_id":"179","user_id":"207","timestamp":"2020-12-03 09:14:16.63"} | 2020-12-03T09:18:09.629+0000 | logins-stream | NOT_RESOLVED
18 | 2020-12-03 09:11:35.63 | {"event_id":"18","user_id":"217","timestamp":"2020-12-03 09:11:35.63"} | 2020-12-03T09:18:09.629+0000 | logins-stream | NOT_RESOLVED
20 | 2020-12-03 09:11:37.63 | {"event_id":"20","user_id":"206","timestamp":"2020-12-03 09:11:37.63"} | 2020-12-03T09:18:09.629+0000 | logins-stream | NOT_RESOLVED
201 | 2020-12-03 09:14:38.63 | {"event_id":"201","user_id":"212","timestamp":"2020-12-03 09:14:38.63"} | 2020-12-03T09:18:09.629+0000 | logins-stream | NOT_RESOLVED
207 | 2020-12-03 09:14:44.63 | {"event_id":"207","user_id":"220","timestamp":"2020-12-03 09:14:44.63"} | 2020-12-03T09:18:09.629+0000 | logins-stream | NOT_RESOLVED
210 | 2020-12-03 09:14:47.63 | {"event_id":"210","user_id":"215","timestamp":"2020-12-03 09:14:47.63"} | 2020-12-03T09:18:09.629+0000 | logins-stream | NOT_RESOLVED
214 | 2020-12-03 09:14:51.63 | {"event_id":"214","user_id":"210","timestamp":"2020-12-03 09:14:51.63"} | 2020-12-03T09:18:09.629+0000 | logins-stream | NOT_RESOLVED
231 | 2020-12-03 09:15:08.63 | {"event_id":"231","user_id":"211","timestamp":"2020-12-03 09:15:08.63"} | 2020-12-03T09:18:09.629+0000 | logins-stream | NOT_RESOLVED
259 | 2020-12-03 09:15:36.63 | {"event_id":"259","user_id":"206","timestamp":"2020-12-03 09:15:36.63"} | 2020-12-03T09:18:09.629+0000 | logins-stream | NOT_RESOLVED
270 | 2020-12-03 09:15:47.63 | {"event_id":"270","user_id":"223","timestamp":"2020-12-03 09:15:47.63"} | 2020-12-03T09:18:09.629+0000 | logins-stream | NOT_RESOLVED
283 | 2020-12-03 09:16:00.63 | {"event_id":"283","user_id":"212","timestamp":"2020-12-03 09:16:00.63"} | 2020-12-03T09:18:09.629+0000 | logins-stream | NOT_RESOLVED

Showing all 103 rows.

Part-2: Reconciliation Job

This is a separate job that runs periodically.
It takes the new data written to the streaming_pipeline_errors table, dedups it, and writes it to the reconciliation table, from where it is injected back into the stream for reprocessing.

You can pack more functionality, such as alerting and monitoring, into this job.

def runReconciliation(): Unit = {
  
  // Get the start datetime
  // Use this to query all the new records written to `streaming_pipeline_errors` after this timestamp.
  val prevMaxDatetime = spark.sql( "SELECT coalesce( max(error_created_datetime), '1970-01-01T00:00:00.001+0000' ) FROM test.reconciliation" ).head.getString(0)
  
  // Get the new error events written to `streaming_pipeline_errors` 
  // Use `audit_created_datetime` from  `streaming_pipeline_errors` table as `error_created_datetime`
  // Dedup before MERGE
  val newErrors = spark.read
    .table( "test.streaming_pipeline_errors" )
    .filter( $"audit_created_datetime" > prevMaxDatetime )
    
    // Dedup
    .groupBy( $"pipeline_name", $"event_id" )
    .agg( max(struct($"audit_created_datetime", $"event_id", $"pipeline_name", $"status", $"raw")).as("latest") )
    .select( $"latest.*" )
    
    // Set defaults for the merge
    .withColumnRenamed( "audit_created_datetime", "error_created_datetime" )
    .withColumn( "audit_created_datetime", current_timestamp() )   // Default for MERGE (Used in INSERT, if no match)
    .withColumn( "audit_modified_datetime", current_timestamp() )  // Default for MERGE (Used in INSERT, if no match)
    .withColumn( "retry_count", lit(0) )                           // Default for MERGE (Used in INSERT, if no match)
  
  // This condition is used to update `status` column during the MERGE
  // 1. If the incoming status is NOT_RESOLVED, check whether the record has exhausted its retries.
  // 2. If retries are exhausted, mark it as DEAD_LETTER; otherwise keep it NOT_RESOLVED.
  // 3. If the incoming status is RESOLVED, mark it as RESOLVED.
  // You can configure the `maxRetryAttempts` for different pipelines and have this job read that config.
  val statusUpdateSql = s"""
    CASE WHEN u.status = '${STATUS_NOT_RESOLVED}' THEN
        CASE WHEN t.retry_count >= ${maxRetryAttempts}
          THEN '${STATUS_DEAD_LETTER}' ELSE '${STATUS_NOT_RESOLVED}' END
      ELSE '${STATUS_RESOLVED}' END
  """
  
  // MERGE the new errors into the reconciliation table
  DeltaTable
    .forName("test.reconciliation").as("t")
    .merge(newErrors.as("u"), "u.pipeline_name = t.pipeline_name AND u.event_id = t.event_id" )
    .whenMatched("t.error_created_datetime <= u.error_created_datetime")
    .updateExpr(Map(
      "status" -> statusUpdateSql,
      "audit_modified_datetime" -> "current_timestamp()",
      "retry_count" -> s"CASE WHEN u.status = '${STATUS_RESOLVED}' THEN -1 ELSE t.retry_count + 1 END"
    ))
    .whenNotMatched
    .insertAll()
    .execute()
  
}
runReconciliation: ()Unit
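The notebook only defines runReconciliation(); scheduling it is left to the reader. A minimal, hedged sketch of a periodic driver, where the five-minute interval and the while loop are illustrative assumptions (in practice this would more likely be a scheduled Databricks job):

import scala.concurrent.duration._

// Re-run the reconciliation pass on a fixed interval (assumed value; tune per pipeline)
val reconcileInterval = 5.minutes

while (true) {
  runReconciliation()
  Thread.sleep(reconcileInterval.toMillis)
}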