Quando si creano pipeline in tempo reale, una delle realtà con cui i team devono fare i conti è che l'ingestione distribuita dei dati è intrinsecamente disordinata. Inoltre, nel contesto delle operazioni di streaming stateful, i team devono essere in grado di monitorare correttamente l'avanzamento dell'event time nello stream di dati che stanno ingerendo per il corretto calcolo delle aggregazioni a finestra temporale e altre operazioni stateful. Possiamo risolvere tutto questo utilizzando Structured Streaming.
Ad esempio, supponiamo di far parte di un team che sta creando una pipeline per aiutare la nostra azienda a effettuare la manutenzione proattiva delle nostre macchine minerarie che noleggiamo ai nostri clienti. Queste macchine devono essere sempre in perfette condizioni, quindi le monitoriamo in tempo reale. Dovremo eseguire aggregazioni stateful sui dati in streaming per comprendere e identificare i problemi nelle macchine.
È qui che dobbiamo sfruttare Structured Streaming e il Watermarking per produrre le necessarie aggregazioni stateful che aiuteranno a informare le decisioni relative alla manutenzione predittiva e altro ancora per queste macchine.
In generale, quando si lavora con dati di streaming in tempo reale, ci saranno ritardi tra l'event time e il processing time a causa del modo in cui i dati vengono ingeriti e se l'applicazione generale riscontra problemi come tempi di inattività. A causa di questi potenziali ritardi variabili, il motore utilizzato per elaborare questi dati deve avere un meccanismo per decidere quando chiudere le finestre aggregate e produrre il risultato aggregato.
Sebbene l'istinto naturale per risolvere questi problemi possa essere quello di utilizzare un ritardo fisso basato sull'orologio, mostreremo nel prossimo esempio perché questa non è la soluzione migliore.
Per spiegare questo visivamente, prendiamo uno scenario in cui stiamo ricevendo dati in tempi diversi, dalle 10:50 alle 11:20. Stiamo creando finestre a cascata di 10 minuti che calcolano la media delle letture di temperatura e pressione arrivate durante il periodo di finestra.
In questa prima immagine, abbiamo le finestre a cascata che si attivano alle 11:00, 11:10 e 11:20, portando alle tabelle dei risultati mostrate nei rispettivi orari. Quando il secondo batch di dati arriva intorno alle 11:10 con dati che hanno un event time di 10:53, questo viene incorporato nelle medie di temperatura e pressione calcolate per la finestra dalle 11:00 alle 11:10 che si chiude alle 11:10, il che non fornisce il risultato corretto.

Per garantire di ottenere i risultati corretti per gli aggregati che vogliamo produrre, dobbiamo definire un watermark che permetta a Spark di capire quando chiudere la finestra aggregata e produrre il risultato aggregato corretto.
Nelle applicazioni Structured Streaming, possiamo garantire che tutti i dati rilevanti per le aggregazioni che vogliamo calcolare vengano raccolti utilizzando una funzionalità chiamata watermarking. Nel senso più elementare, definendo un watermark, Spark Structured Streaming sa quando ha ingerito tutti i dati fino a un certo tempo, T, (basato su un'aspettativa di lateness impostata) in modo che possa chiudere e produrre aggregati a finestra fino al timestamp T.
Questa seconda immagine mostra l'effetto dell'implementazione di un watermark di 10 minuti e l'utilizzo della modalità Append in Spark Structured Streaming.

A differenza del primo scenario in cui Spark emette l'aggregazione a finestra per i dieci minuti precedenti ogni dieci minuti (cioè, emette la finestra dalle 11:00 alle 11:10 alle 11:10), Spark ora attende per chiudere e produrre l'aggregazione a finestra una volta che il tempo massimo dell'evento osservato meno il watermark specificato è maggiore del limite superiore della finestra.
In altre parole, Spark ha dovuto attendere di vedere punti dati in cui l'ultimo event time osservato meno 10 minuti era maggiore delle 11:00 per emettere la finestra aggregata dalle 10:50 alle 11:00. Alle 11:00, non lo vede, quindi inizializza solo il calcolo aggregato nello store di stato interno di Spark. Alle 11:10, questa condizione non è ancora soddisfatta, ma abbiamo un nuovo punto dati per le 10:53, quindi lo stato interno viene aggiornato, ma non emesso. Quindi, infine, alle 11:20 Spark ha visto un punto dati con un event time di 11:15 e poiché 11:15 meno 10 minuti sono le 11:05, che è successivo alle 11:00, la finestra dalle 10:50 alle 11:00 può essere emessa alla tabella dei risultati.
Questo produce il risultato corretto incorporando correttamente i dati in base alla lateness prevista definita dal watermark. Una volta che i risultati vengono emessi, lo stato corrispondente viene rimosso dallo state store.
Per capire come incorporare questi watermark nelle nostre pipeline Structured Streaming, esploreremo questo scenario ripercorrendo un esempio di codice effettivo basato sul nostro caso d'uso dichiarato nella sezione introduttiva di questo blog.
Supponiamo di ingerire tutti i nostri dati dei sensori da un cluster Kafka nel cloud e di voler calcolare le medie di temperatura e pressione ogni dieci minuti con uno skew temporale previsto di dieci minuti. La pipeline Structured Streaming con watermarking sarebbe la seguente:
PySpark
Qui leggiamo semplicemente da Kafka, applichiamo le nostre trasformazioni e aggregazioni, quindi scriviamo su tabelle Delta Lake che verranno visualizzate e monitorate in Databricks SQL. L'output scritto nella tabella per un particolare campione di dati sarebbe simile a questo:

Per incorporare il watermarking, abbiamo prima dovuto identificare due elementi:
Prendendo dall'esempio precedente, possiamo vedere il watermark definito dal metodo .withWatermark() con la colonna eventTimestamp utilizzata come colonna event time e 10 minuti per rappresentare lo skew temporale che ci aspettiamo.
PySpark
Ora che sappiamo come implementare i watermark nelle nostre pipeline Structured Streaming, sarà importante capire come altri elementi come le operazioni di join in streaming e la gestione dello stato sono influenzati dai watermark. Inoltre, man mano che scaliamo le nostre pipeline, ci saranno metriche chiave di cui i nostri data engineer dovranno essere consapevoli e che dovranno monitorare per evitare problemi di prestazioni. Esploreremo tutto questo approfondendo il watermarking.
Prima di approfondire, è importante capire come la scelta della modalità di output influisce sul comportamento dei watermark impostati.
I watermark possono essere utilizzati solo quando si esegue l'applicazione di streaming in modalità di output append o update. Esiste una terza modalità di output, la modalità complete, in cui l'intera tabella dei risultati viene scritta nello storage. Questa modalità non può essere utilizzata perché richiede la conservazione di tutti i dati aggregati e, quindi, non può utilizzare il watermarking per eliminare lo stato intermedio.
L'implicazione di queste modalità di output nel contesto di aggregazioni window e watermarks è che in modalità ‘append’ un aggregato può essere prodotto solo una volta e non può essere aggiornato. Pertanto, una volta prodotto l'aggregato, il motore può eliminare lo stato dell'aggregato e mantenere così lo stato di aggregazione complessivo limitato. I record tardivi – quelli per i quali l'euristica del watermark approssimativo non si è applicata (erano più vecchi del periodo di ritardo del watermark), devono quindi essere scartati per necessità – l'aggregato è stato prodotto e lo stato dell'aggregato è stato eliminato.
Inversamente, per la modalità ‘update’, l'aggregato può essere prodotto ripetutamente a partire dal primo record e su ogni record ricevuto, quindi un watermark è facoltativo. Il watermark è utile solo per la potatura dello stato una volta che il motore sa, in modo euristico, che non verranno più ricevuti record per quell'aggregato. Una volta eliminato lo stato, anche i record tardivi devono essere scartati poiché il valore aggregato è andato perso e non può essere aggiornato.
È importante capire come lo stato, i record in arrivo tardivi e le diverse modalità di output potrebbero portare a comportamenti diversi della tua applicazione in esecuzione su Spark. Il punto chiave qui è che sia in modalità append che update, una volta che il watermark indica che tutti i dati sono stati ricevuti per una finestra di aggregazione, il motore può potare lo stato della finestra. In modalità append l'aggregato viene prodotto solo alla chiusura della finestra temporale più il ritardo del watermark, mentre in modalità update viene prodotto ad ogni aggiornamento della finestra.
Infine, aumentando la finestra di ritardo del watermark, la pipeline attenderà più a lungo i dati e potenzialmente scarterà meno dati – maggiore precisione, ma anche maggiore latenza per produrre gli aggregati. D'altro canto, un ritardo del watermark più breve porta a una minore precisione ma anche a una minore latenza per produrre gli aggregati.
| Lunghezza Ritardo Finestra | Precisione | Latenza |
|---|---|---|
| Finestra di Ritardo Più Lunga | Precisione Più Alta | Latenza Più Alta |
| Finestra di Ritardo Più Corta | Precisione Più Bassa | Latenza Più Bassa |
Ci sono un paio di considerazioni da tenere a mente quando si eseguono operazioni di join nelle applicazioni di streaming, in particolare quando si uniscono due stream. Supponiamo, per il nostro caso d'uso, di voler unire il dataset in streaming di letture di temperatura e pressione con valori aggiuntivi catturati da altri sensori attraverso le macchine.
Esistono tre tipi principali di join stream-stream che possono essere implementati in Structured Streaming: inner, outer e semi join. Il problema principale nell'eseguire join nelle applicazioni di streaming è che potresti avere un quadro incompleto di un lato del join. Dare a Spark la comprensione di quando non ci si aspettano più corrispondenze future è simile al problema precedente con le aggregazioni, dove Spark doveva capire quando non c'erano nuove righe da incorporare nel calcolo per l'aggregazione prima di emetterla.
Per consentire a Spark di gestire questo, possiamo sfruttare una combinazione di watermarks e vincoli di event-time all'interno della condizione di join del join stream-stream. Questa combinazione consente a Spark di filtrare i record tardivi e di potare lo stato per l'operazione di join attraverso un intervallo di tempo sulla condizione di join. Lo dimostriamo nell'esempio seguente:
PySpark
Tuttavia, a differenza dell'esempio precedente, ci saranno casi in cui ogni stream potrebbe richiedere diversi sfasamenti temporali per i propri watermarks. In questo scenario, Spark ha una policy per la gestione di definizioni multiple di watermark. Spark mantiene un watermark globale basato sullo stream più lento per garantire la massima sicurezza nel non perdere dati.
Gli sviluppatori hanno la possibilità di modificare questo comportamento cambiando spark.sql.streaming.multipleWatermarkPolicy in max; tuttavia, ciò significa che i dati dallo stream più lento verranno scartati.
Per vedere la gamma completa delle operazioni di join che richiedono o potrebbero sfruttare i watermarks, consulta questa sezione della documentazione di Spark.
Quando si gestisce una query di streaming in cui Spark potrebbe dover gestire milioni di chiavi e mantenere lo stato per ciascuna di esse, lo store di stato predefinito fornito dai cluster Databricks potrebbe non essere efficace. Potresti iniziare a notare un maggiore utilizzo della memoria, e quindi pause più lunghe di garbage collection. Entrambi questi fattori influenzeranno le prestazioni e la scalabilità della tua applicazione Structured Streaming.
È qui che entra in gioco RocksDB. Puoi sfruttare RocksDB nativamente in Databricks abilitandolo in questo modo nella configurazione di Spark:
Ciò consentirà al cluster che esegue l'applicazione Structured Streaming di sfruttare RocksDB, che può gestire lo stato in modo più efficiente nella memoria nativa e sfruttare il disco locale/SSD invece di mantenere tutto lo stato in memoria.
Oltre a monitorare l'utilizzo della memoria e le metriche di garbage collection, ci sono altri indicatori e metriche chiave che dovrebbero essere raccolti e tracciati quando si ha a che fare con Watermarking e Structured Streaming. Per accedere a queste metriche puoi consultare gli oggetti StreamingQueryProgress e StateOperatorProgress. Consulta la nostra documentazione per esempi su come utilizzarli qui.
Nell'oggetto StreamingQueryProgress, c'è un metodo chiamato "eventTime" che può essere chiamato e che restituirà i timestamp max, min, avg e watermark. I primi tre sono il tempo evento massimo, minimo e medio visto in quel trigger. L'ultimo è il watermark utilizzato nel trigger.
Esempio abbreviato di un oggetto StreamingQueryProgress
Questi dati possono essere utilizzati per riconciliare i dati nelle tabelle di risultato che le tue query di streaming producono e anche per verificare che il watermark utilizzato sia il timestamp eventTime previsto. Questo può diventare importante quando si uniscono stream di dati.
All'interno dell'oggetto StateOperatorProgress c'è la metrica numRowsDroppedByWatermark. Questa metrica mostrerà quante righe vengono considerate troppo tardive per essere incluse nell'aggregazione stateful. Nota che questa metrica misura le righe scartate post-aggregazione e non le righe di input grezze, quindi il numero non è preciso ma può dare un'indicazione che ci sono dati tardivi che vengono scartati. Questo, in combinazione con le informazioni dall'oggetto StreamingQueryProgress, può aiutare gli sviluppatori a determinare se i watermarks sono configurati correttamente.
Un limite ancora presente nelle query Structured Streaming è la concatenazione di più operatori stateful (ad esempio, aggregazioni, join in streaming) in un'unica query di streaming. Questo limite di un singolo watermark globale per le aggregazioni stateful è qualcosa per cui noi di Databricks stiamo lavorando a una soluzione e rilasceremo maggiori informazioni nei prossimi mesi. Dai un'occhiata al nostro blog su Project Lightspeed per saperne di più: Project Lightspeed: Elaborazione di stream più veloce e semplice con Apache Spark (databricks.com).
Con Structured Streaming e Watermarking su Databricks, le organizzazioni, come quella con il caso d'uso descritto sopra, possono creare applicazioni resilienti in tempo reale che garantiscono che le metriche guidate da aggregazioni in tempo reale vengano calcolate accuratamente anche se i dati non sono ordinati correttamente o non arrivano in tempo. Per saperne di più su come creare applicazioni in tempo reale con Databricks, contatta il tuo rappresentante Databricks.
(Questo post sul blog è stato tradotto utilizzando strumenti basati sull'intelligenza artificiale) Post originale
