Beim Erstellen von Echtzeit-Pipelines ist eine der Realitäten, mit denen Teams umgehen müssen, dass die verteilte Datenerfassung von Natur aus ungeordnet ist. Darüber hinaus müssen Teams im Kontext von zustandsbehafteten Streaming-Operationen den Fortschritt der Ereigniszeit im Datenstrom, den sie erfassen, ordnungsgemäß verfolgen können, um Zeitfensteraggregationen und andere zustandsbehaftete Operationen korrekt zu berechnen. Wir können all dies mit Structured Streaming lösen.
Nehmen wir zum Beispiel an, wir sind ein Team, das eine Pipeline zur proaktiven Wartung unserer Bergbaumaschinen erstellt, die wir an unsere Kunden vermieten. Diese Maschinen müssen immer in Top-Zustand laufen, daher überwachen wir sie in Echtzeit. Wir müssen zustandsbehaftete Aggregationen auf den Streaming-Daten durchführen, um Probleme bei den Maschinen zu verstehen und zu identifizieren.
Hier müssen wir Structured Streaming und Watermarking nutzen, um die notwendigen zustandsbehafteten Aggregationen zu erstellen, die Entscheidungen rund um vorausschauende Wartung und mehr für diese Maschinen informieren.
Im Allgemeinen gibt es bei der Arbeit mit Echtzeit-Streaming-Daten Verzögerungen zwischen der Ereigniszeit und der Verarbeitungszeit, bedingt durch die Art der Datenerfassung und ob die gesamte Anwendung Probleme wie Ausfallzeiten aufweist. Aufgrund dieser potenziellen variablen Verzögerungen muss die Engine, die Sie zur Verarbeitung dieser Daten verwenden, über einen Mechanismus verfügen, um zu entscheiden, wann die aggregierten Fenster geschlossen und das aggregierte Ergebnis erzeugt werden soll.
Während die natürliche Neigung, diese Probleme zu beheben, darin bestehen könnte, eine feste Verzögerung basierend auf der Wanduhrzeit zu verwenden, werden wir in diesem folgenden Beispiel zeigen, warum dies nicht die beste Lösung ist.
Um dies visuell zu erklären, betrachten wir ein Szenario, in dem wir Daten zu verschiedenen Zeiten von etwa 10:50 Uhr bis 11:20 Uhr erhalten. Wir erstellen 10-minütige, rollierende Fenster, die den Durchschnitt der Temperatur- und Druckwerte berechnen, die während des Fensterzeitraums eingegangen sind.
Im ersten Bild werden die rollierenden Fenster um 11:00 Uhr, 11:10 Uhr und 11:20 Uhr ausgelöst, was zu den zu den jeweiligen Zeiten angezeigten Ergebnistabellen führt. Wenn die zweite Datencharge gegen 11:10 Uhr mit Daten eintrifft, die eine Ereigniszeit von 10:53 Uhr haben, wird diese in die für das um 11:10 Uhr geschlossene Fenster von 11:00 Uhr bis 11:10 Uhr berechneten Temperatur- und Druckdurchschnitte einbezogen, was nicht das korrekte Ergebnis liefert.

Um sicherzustellen, dass wir die korrekten Ergebnisse für die gewünschten Aggregationen erhalten, müssen wir ein Wasserzeichen definieren, das es Spark ermöglicht zu verstehen, wann das aggregierte Fenster geschlossen und das korrekte aggregierte Ergebnis erzeugt werden soll.
In Structured Streaming-Anwendungen können wir durch die Verwendung einer Funktion namens Watermarking sicherstellen, dass alle relevanten Daten für die zu berechnenden Aggregationen gesammelt werden. Im Grunde genommen weiß Spark Structured Streaming durch die Definition eines Wasserzeichens, wann es alle Daten bis zu einer bestimmten Zeit, T, erfasst hat (basierend auf einer festgelegten Erwartung der Latenz), sodass es Fensteraggregationen bis zum Zeitstempel T schließen und erzeugen kann.
Diese zweite Grafik zeigt die Auswirkung der Implementierung eines 10-minütigen Wasserzeichens und der Verwendung des Append-Modus in Spark Structured Streaming.

Im Gegensatz zum ersten Szenario, in dem Spark die Fensteraggregation für die vorherigen zehn Minuten alle zehn Minuten ausgibt (d. h. das Fenster von 11:00 bis 11:10 Uhr um 11:10 Uhr ausgibt), wartet Spark nun, bis es die Fensteraggregation schließt und ausgibt, sobald die maximale Ereigniszeit abzüglich des angegebenen Wasserzeichens größer ist als die Obergrenze des Fensters.
Mit anderen Worten, Spark musste warten, bis es Datenpunkte sah, bei denen die neueste Ereigniszeit abzüglich 10 Minuten größer als 11:00 Uhr war, um das aggregierte Fenster von 10:50 bis 11:00 Uhr auszugeben. Um 11:00 Uhr erfüllt es diese Bedingung nicht und initialisiert nur die aggregierte Berechnung im internen Zustandsspeicher von Spark. Um 11:10 Uhr ist diese Bedingung immer noch nicht erfüllt, aber wir haben einen neuen Datenpunkt für 10:53 Uhr, sodass der interne Zustand aktualisiert wird, aber nicht ausgegeben wird. Dann, schließlich um 11:20 Uhr, hat Spark einen Datenpunkt mit einer Ereigniszeit von 11:15 Uhr gesehen, und da 11:15 Uhr abzüglich 10 Minuten 11:05 Uhr ist, was später als 11:00 Uhr ist, kann die Aggregation des Fensters von 10:50 bis 11:00 Uhr in die Ergebnistabelle ausgegeben werden.
Dies liefert das korrekte Ergebnis, indem die Daten ordnungsgemäß basierend auf der durch das Wasserzeichen definierten erwarteten Latenz einbezogen werden. Sobald die Ergebnisse ausgegeben sind, wird der entsprechende Zustand aus dem Zustandsspeicher entfernt.
Um zu verstehen, wie diese Wasserzeichen in unsere Structured Streaming-Pipelines integriert werden, werden wir dieses Szenario durchgehen und ein tatsächliches Codebeispiel basierend auf unserem Anwendungsfall untersuchen, der im Einführungsteil dieses Blogs beschrieben wurde.
Nehmen wir an, wir erfassen alle unsere Sensordaten aus einem Kafka-Cluster in der Cloud und möchten alle zehn Minuten Temperatur- und Druckdurchschnitte mit einer erwarteten Zeitabweichung von zehn Minuten berechnen. Die Structured Streaming-Pipeline mit Wasserzeichen würde wie folgt aussehen:
PySpark
Hier lesen wir einfach von Kafka, wenden unsere Transformationen und Aggregationen an und schreiben dann in Delta Lake-Tabellen, die in Databricks SQL visualisiert und überwacht werden. Die in die Tabelle geschriebene Ausgabe für eine bestimmte Datenauswahl würde wie folgt aussehen:

Um Watermarking zu integrieren, mussten wir zuerst zwei Punkte identifizieren:
Aus dem vorherigen Beispiel entnommen, sehen wir das Wasserzeichen, das durch die Methode .withWatermark() definiert wird, wobei die Spalte eventTimestamp als Ereigniszeitspalte und 10 Minuten für die erwartete Zeitabweichung verwendet werden.
PySpark
Nachdem wir nun wissen, wie Wasserzeichen in unsere Structured Streaming-Pipeline integriert werden, ist es wichtig zu verstehen, wie andere Elemente wie Streaming-Join-Operationen und die Verwaltung von Zuständen von Wasserzeichen beeinflusst werden. Darüber hinaus gibt es bei der Skalierung unserer Pipelines wichtige Metriken, auf die unsere Dateningenieure achten und die sie überwachen müssen, um Leistungsprobleme zu vermeiden. Wir werden all dies untersuchen, wenn wir tiefer in das Thema Watermarking eintauchen.
Bevor wir tiefer eintauchen, ist es wichtig zu verstehen, wie Ihre Wahl des Ausgabe-Modus das Verhalten der von Ihnen festgelegten Wasserzeichen beeinflusst.
Wasserzeichen können nur verwendet werden, wenn Sie Ihre Streaming-Anwendung in den Ausgabe-Modi append oder update ausführen. Es gibt einen dritten Ausgabe-Modus, den vollständigen Modus (complete mode), in dem die gesamte Ergebnistabelle in den Speicher geschrieben wird. Dieser Modus kann nicht verwendet werden, da er erfordert, dass alle aggregierten Daten beibehalten werden, und daher keine Wasserzeichen zum Löschen von Zwischenzuständen verwendet werden können.
Die Implikation dieser Ausgabe-Modi im Kontext von Fenster-Aggregationen und Watermarks ist, dass in „append“-Modus eine Aggregation nur einmal erzeugt und nicht aktualisiert werden kann. Daher kann die Engine, sobald die Aggregation erzeugt wurde, den Zustand der Aggregation löschen und somit den gesamten Aggregationszustand begrenzt halten. Späte Datensätze – diejenigen, für die die ungefähre Watermark-Heuristik nicht angewendet wurde (sie waren älter als die Watermark-Verzögerungsperiode) – müssen daher zwangsläufig verworfen werden, da die Aggregation erzeugt wurde und der Aggregationszustand gelöscht wurde.
Umgekehrt kann im „update“-Modus die Aggregation wiederholt ab dem ersten Datensatz und bei jedem empfangenen Datensatz erzeugt werden, sodass ein Watermark optional ist. Das Watermark ist nur nützlich, um den Zustand zu kürzen, sobald die Engine heuristisch weiß, dass keine weiteren Datensätze für diese Aggregation empfangen werden können. Sobald der Zustand gelöscht ist, müssen wiederum alle späten Datensätze verworfen werden, da der Aggregationswert verloren gegangen ist und nicht aktualisiert werden kann.
Es ist wichtig zu verstehen, wie Zustand, spät ankommende Datensätze und die verschiedenen Ausgabe-Modi zu unterschiedlichen Verhaltensweisen Ihrer Anwendung auf Spark führen können. Die wichtigste Erkenntnis hier ist, dass sowohl im Append- als auch im Update-Modus, sobald das Watermark anzeigt, dass alle Daten für ein Aggregationszeitfenster empfangen wurden, die Engine den Fensterzustand kürzen kann. Im Append-Modus wird die Aggregation nur am Ende des Zeitfensters plus der Watermark-Verzögerung erzeugt, während sie im Update-Modus bei jeder Aktualisierung des Fensters erzeugt wird.
Schließlich führt die Erhöhung Ihres Watermark-Verzögerungsfensters dazu, dass die Pipeline länger auf Daten wartet und potenziell weniger Daten verworfen werden – höhere Präzision, aber auch höhere Latenz bei der Erzeugung der Aggregationen. Auf der anderen Seite führt ein kürzeres Watermark-Verzögerungsfenster zu geringerer Präzision, aber auch zu geringerer Latenz bei der Erzeugung der Aggregationen.
| Fensterverzögerungslänge | Präzision | Latenz |
|---|---|---|
| Längeres Verzögerungsfenster | Höhere Präzision | Höhere Latenz |
| Kürzeres Verzögerungsfenster | Geringere Präzision | Geringere Latenz |
Es gibt einige Überlegungen, die Sie bei Join-Operationen in Ihren Streaming-Anwendungen beachten sollten, insbesondere beim Joinen zweier Streams. Nehmen wir für unseren Anwendungsfall an, wir möchten den Streaming-Datensatz mit Temperatur- und Druckmesswerten mit zusätzlichen Werten verbinden, die von anderen Sensoren über die Maschinen erfasst werden.
Es gibt drei übergeordnete Arten von Stream-Stream-Joins, die in Structured Streaming implementiert werden können: Inner-, Outer- und Semi-Joins. Das Hauptproblem bei Joins in Streaming-Anwendungen ist, dass Sie möglicherweise kein vollständiges Bild von einer Seite des Joins haben. Spark die Information zu geben, wann keine zukünftigen Übereinstimmungen zu erwarten sind, ähnelt dem früheren Problem mit Aggregationen, bei denen Spark verstehen musste, wann keine neuen Zeilen zur Berechnung der Aggregation vorlagen, bevor diese ausgegeben wurde.
Damit Spark dies handhaben kann, können wir eine Kombination aus Watermarks und Event-Time-Bedingungen innerhalb der Join-Bedingung des Stream-Stream-Joins nutzen. Diese Kombination ermöglicht es Spark, späte Datensätze herauszufiltern und den Zustand für die Join-Operation über eine Zeitbereichsbedingung im Join zu kürzen. Dies demonstrieren wir im folgenden Beispiel:
PySpark
Im Gegensatz zum obigen Beispiel wird es jedoch Zeiten geben, in denen jeder Stream unterschiedliche Zeitverschiebungen für seine Watermarks benötigt. In diesem Szenario hat Spark eine Richtlinie für die Handhabung mehrerer Watermark-Definitionen. Spark unterhält ein globales Watermark, das auf dem langsamsten Stream basiert, um die höchste Sicherheit zu gewährleisten, wenn es darum geht, keine Daten zu verpassen.
Entwickler haben die Möglichkeit, dieses Verhalten zu ändern, indem sie spark.sql.streaming.multipleWatermarkPolicy auf max; setzen. Dies bedeutet jedoch, dass Daten aus dem langsameren Stream verworfen werden.
Um den vollen Umfang der Join-Operationen zu sehen, die Watermarks erfordern oder nutzen können, schauen Sie sich diesen Abschnitt der Spark-Dokumentation an.
Bei der Verwaltung einer Streaming-Abfrage, bei der Spark möglicherweise Millionen von Schlüsseln verwalten und Zustände für jeden einzelnen speichern muss, ist der Standard-Zustandsspeicher, der mit Databricks-Clustern geliefert wird, möglicherweise nicht effektiv. Sie könnten höhere Speicherauslastung und dann längere Garbage-Collection-Pausen feststellen. Beides beeinträchtigt die Leistung und Skalierbarkeit Ihrer Structured Streaming-Anwendung.
Hier kommt RocksDB ins Spiel. Sie können RocksDB nativ in Databricks nutzen, indem Sie es wie folgt in der Spark-Konfiguration aktivieren:
Dadurch kann der Cluster, der die Structured Streaming-Anwendung ausführt, RocksDB nutzen, das den Zustand effizienter im Arbeitsspeicher verwalten und den lokalen Speicher/SSD nutzen kann, anstatt den gesamten Zustand im Arbeitsspeicher zu halten.
Neben der Überwachung von Speicherverbrauch und Garbage-Collection-Metriken gibt es weitere wichtige Indikatoren und Metriken, die bei der Arbeit mit Watermarking und Structured Streaming gesammelt und verfolgt werden sollten. Um auf diese Metriken zuzugreifen, können Sie sich die Objekte StreamingQueryProgress und StateOperatorProgress ansehen. Beispiele für die Verwendung dieser finden Sie in unserer Dokumentation hier.
Im StreamingQueryProgress-Objekt gibt es eine Methode namens „eventTime“, die aufgerufen werden kann und die die max-, min-, avg- und watermark-Zeitstempel zurückgibt. Die ersten drei sind die maximalen, minimalen und durchschnittlichen Event-Zeiten, die in diesem Trigger gesehen wurden. Der letzte ist das im Trigger verwendete Watermark.
Abgekürztes Beispiel eines StreamingQueryProgress-Objekts
Diese Informationen können verwendet werden, um die Daten in den Ergebnistabellen, die Ihre Streaming-Abfragen ausgeben, abzugleichen und auch um zu überprüfen, ob das verwendete Watermark der beabsichtigte Event-Time-Zeitstempel ist. Dies kann wichtig werden, wenn Sie Datenströme zusammenführen.
Innerhalb des StateOperatorProgress-Objekts befindet sich die Metrik numRowsDroppedByWatermark. Diese Metrik zeigt an, wie viele Zeilen als zu spät betrachtet werden, um in die zustandsbehaftete Aggregation einbezogen zu werden. Beachten Sie, dass diese Metrik Zeilen misst, die nach der Aggregation verworfen werden, und nicht die rohen Eingabezeilen. Die Zahl ist also nicht präzise, kann aber einen Hinweis darauf geben, dass späte Daten verworfen werden. Dies kann in Verbindung mit den Informationen aus dem StreamingQueryProgress-Objekt Entwicklern helfen zu bestimmen, ob die Watermarks korrekt konfiguriert sind.
Eine verbleibende Einschränkung von Structured Streaming-Abfragen ist die Verknüpfung mehrerer zustandsbehafteter Operatoren (z. B. Aggregationen, Streaming-Joins) in einer einzigen Streaming-Abfrage. Diese Einschränkung eines einzelnen globalen Watermarks für zustandsbehaftete Aggregationen ist etwas, woran wir bei Databricks an einer Lösung arbeiten und über das wir in den kommenden Monaten weitere Informationen veröffentlichen werden. Lesen Sie mehr in unserem Blog zu Project Lightspeed: Project Lightspeed: Faster and Simpler Stream Processing With Apache Spark (databricks.com).
Mit Structured Streaming und Watermarking auf Databricks können Organisationen, wie die im obigen Anwendungsfall beschriebene, robuste Echtzeitanwendungen erstellen, die sicherstellen, dass Metriken, die durch Echtzeitaggregationen angetrieben werden, auch dann korrekt berechnet werden, wenn Daten nicht richtig sortiert oder pünktlich eintreffen. Um mehr darüber zu erfahren, wie Sie Echtzeitanwendungen mit Databricks erstellen können, wenden Sie sich an Ihren Databricks-Ansprechpartner.
(Dieser Blogbeitrag wurde mit KI-gestützten Tools übersetzt.) Originalbeitrag
