Zustandsbehaftete Stream-Verarbeitung bezieht sich auf die Echtzeitverarbeitung eines kontinuierlichen Ereignisstroms, wobei der Zustand basierend auf den bisher gesehenen Ereignissen beibehalten wird. Dies ermöglicht es dem System, Änderungen und Muster im Laufe der Zeit im Ereignisstrom zu verfolgen und Entscheidungen oder Aktionen auf der Grundlage dieser Informationen zu treffen.
Die zustandsbehaftete Stream-Verarbeitung in Apache Spark Structured Streaming wird mithilfe integrierter Operatoren (wie z. B. aggregierte Fensterfunktionen, Stream-Stream-Joins, Duplikate entfernen usw.) für vordefinierte Logik und mithilfe von flatMapGroupWithState oder mapGroupWithState für beliebige Logik unterstützt. Die beliebige Logik ermöglicht es Benutzern, ihren benutzerdefinierten Zustandsmanipulationscode in ihren Pipelines zu schreiben. Mit zunehmender Verbreitung von Streaming im Unternehmen erfordern jedoch immer komplexere und anspruchsvollere Streaming-Anwendungen mehrere zusätzliche Funktionen, um es Entwicklern zu erleichtern, zustandsbehaftete Streaming-Pipelines zu schreiben.
Um diese neuen, wachsenden zustandsbehafteten Streaming-Anwendungen oder operativen Anwendungsfälle zu unterstützen, führt die Spark-Community einen neuen Spark-Operator namens transformWithState ein. Dieser Operator ermöglicht flexible Datenmodellierung, zusammengesetzte Datentypen, Timer, TTL, Verknüpfung zustandsbehafteter Operatoren nach transformWithState, Schema-Evolution, Wiederverwendung von Zuständen aus einer anderen Abfrage und Integration mit einer Vielzahl anderer Databricks-Funktionen wie Unity Catalog, Delta Live Tables und Spark Connect. Mit diesem Operator können Kunden ihre geschäftskritischen, komplexen zustandsbehafteten operativen Anwendungsfälle zuverlässig und effizient auf der Databricks-Plattform mit beliebten Sprachen wie Scala, Java oder Python entwickeln und ausführen.
Viele ereignisgesteuerte Anwendungen verlassen sich auf zustandsbehaftete Berechnungen, um Aktionen auszulösen oder Ausgabereignisse zu emittieren, die normalerweise in ein anderes Ereignisprotokoll/eine andere Nachrichtenwarteschlange wie Apache Kafka/Apache Pulsar/Google Pub-Sub usw. geschrieben werden. Diese Anwendungen implementieren normalerweise eine Zustandsmaschine, die Regeln validiert, Anomalien erkennt, Sitzungen verfolgt usw. und die abgeleiteten Ergebnisse generiert, die normalerweise verwendet werden, um Aktionen auf nachgelagerten Systemen auszulösen, basierend auf:
Beispiele für solche Anwendungen sind Benutzererlebnisverfolgung, Anomalieerkennung, Überwachung von Geschäftsprozessen und Entscheidungsbäume.
Apache Spark führt jetzt transformWithState ein, einen zustandsbehafteten Verarbeitungsoperator der nächsten Generation, der die Erstellung komplexer Echtzeit-Streaming-Anwendungen flexibler, effizienter und skalierbarer macht. Diese neue API eröffnet erweiterte Funktionen für Zustandsverwaltung, Ereignisverarbeitung, Timer-Verwaltung und Schema-Evolution, sodass Benutzer komplexe Streaming-Logik einfach implementieren können.
Wir führen einen neuen, flexiblen, erweiterbaren API-Ansatz mit mehreren Ebenen ein, um die oben genannten Einschränkungen zu beheben. Ein Architekturdiagramm der Schichtenarchitektur und die zugehörigen Funktionen auf verschiedenen Ebenen ist unten dargestellt.

Wie in der Abbildung gezeigt, verwenden wir weiterhin die derzeit verfügbaren State-Backends. Derzeit unterstützt Apache Spark zwei State-Store-Backends:
Der neue transformWithState-Operator wird zunächst nur mit dem RocksDB State Store Provider unterstützt. Wir nutzen verschiedene RocksDB-Funktionalitäten wie Range Scans, Merge-Operatoren usw., um eine optimale Leistung für die verschiedenen Funktionen innerhalb von transformWithState zu gewährleisten. Auf dieser Ebene bauen wir eine weitere Abstraktionsebene auf, die den StatefulProcessorHandle verwendet, um mit zusammengesetzten Typen, Timern, Abfrage-Metadaten usw. zu arbeiten. Auf der Operatorebene ermöglichen wir die Verwendung eines StatefulProcessor, der die Anwendungslogik einbetten kann, die zur Bereitstellung dieser leistungsstarken Streaming-Anwendungen verwendet wird. Schließlich können Sie den StatefulProcessor in Apache Spark-Abfragen basierend auf den DataFrame-APIs verwenden.
Hier ist ein Beispiel für eine Apache Spark Streaming-Abfrage, die den transformWithState-Operator verwendet:
Mit transformWithState können Benutzer jetzt mehrere unabhängige Zustandsvariablen innerhalb eines StatefulProcessor basierend auf dem objektorientierten Programmiermodell definieren. Diese Variablen funktionieren wie private Klassenmember und ermöglichen eine granulare Zustandsverwaltung, ohne dass eine monolithische Zustandsstruktur erforderlich ist. Dies erleichtert die Weiterentwicklung der Anwendungslogik im Laufe der Zeit durch Hinzufügen oder Ändern von Zustandsvariablen, ohne Abfragen aus einem neuen Checkpoint-Verzeichnis neu starten zu müssen.
Benutzer können jetzt Timer registrieren, um ereignisgesteuerte Anwendungslogik auszulösen. Die API unterstützt sowohl Verarbeitungszeit (basierend auf der Wanduhr) als auch Ereigniszeit (spaltenbasiert) Timer. Wenn ein Timer ausgelöst wird, wird ein Callback ausgegeben, der eine effiziente Ereignisbehandlung, Zustandsaktualisierungen und die Generierung von Ausgaben ermöglicht. Die Möglichkeit, Timer aufzulisten, zu registrieren und zu löschen, gewährleistet eine präzise Kontrolle über die Ereignisverarbeitung.
Die Zustandsverwaltung ist jetzt intuitiver mit integrierter Unterstützung für zusammengesetzte Datenstrukturen:
Spark kodiert und speichert diese Zustandstypen automatisch, wodurch die Notwendigkeit manueller Serialisierung reduziert und die Leistung verbessert wird.
Für Compliance- und betriebliche Effizienz führt transformWithState native Time-to-Live (TTL)-Unterstützung für Zustandsvariablen ein. Dies ermöglicht es Benutzern, Ablaufrichtlinien zu definieren, sodass alte Zustandsdaten automatisch entfernt werden, ohne dass eine manuelle Bereinigung erforderlich ist.
Mit dieser neuen API können zustandsbehaftete Operatoren jetzt nach transformWithState verkettet werden, auch wenn die Ereigniszeit als Zeitmodus verwendet wird. Durch die explizite Referenzierung von Ereigniszeitspalten im Ausgabeschema können nachgelagerte Operatoren die Filterung von späten Datensätzen und die Zustandsbereinigung nahtlos durchführen – wodurch komplexe Workarounds mit mehreren Pipelines und externem Speicher entfallen.
Benutzer können Zustände aus vorhandenen Abfragen initialisieren, was das Neustarten oder Klonen von Streaming-Jobs erleichtert. Die API ermöglicht eine nahtlose Integration mit dem State Data Source Reader, sodass neue Abfragen zuvor geschriebene Zustände ohne komplexe Migrationsprozesse nutzen können.
transformWithState unterstützt die Schema-Evolution und ermöglicht Änderungen wie:
Apache Spark erkennt und wendet automatisch kompatible Schema-Updates an, sodass Abfragen im selben Checkpoint-Verzeichnis weiter ausgeführt werden können. Dies macht vollständige Zustandsneuerstellungen und Neuverarbeitungen überflüssig und reduziert Ausfallzeiten und betriebliche Komplexität erheblich.
Für einfacheres Debugging und bessere Beobachtbarkeit ist transformWithState nativ in den State Data Source Reader integriert. Benutzer können Zustandsvariablen inspizieren und Zustandsdaten direkt abfragen, was die Fehlerbehebung und Analyse vereinfacht, einschließlich erweiterter Funktionen wie readChangeFeed usw.
Die transformWithState API ist ab der Databricks Runtime 16.2-Version für No-Isolation- und Unity Catalog Dedicated Clusters verfügbar. Unterstützung für Unity Catalog Standard Clusters und Serverless Compute wird bald hinzugefügt. Die API soll auch mit der Apache Spark™ 4.0-Version Open Source verfügbar sein.
Wir glauben, dass all die Funktionsverbesserungen, die in der neuen transformWithState API enthalten sind, den Aufbau einer neuen Klasse zuverlässiger, skalierbarer und geschäftskritischer operativer Workloads ermöglichen werden, die die wichtigsten Anwendungsfälle für unsere Kunden und Benutzer unterstützen, und das alles bequem und einfach mit den Apache Spark DataFrame APIs. Wichtig ist, dass diese Änderungen auch die Grundlage für zukünftige Verbesserungen bestehender sowie neuer zustandsbehafteter Operatoren in Apache Spark Structured Streaming bilden. Wir freuen uns über die Fortschritte im State Management in Apache Spark™ Structured Streaming in den letzten Jahren und blicken den geplanten Entwicklungen in diesem Bereich in naher Zukunft mit Spannung entgegen.
Weitere Informationen zu zustandsbehafteter Stream-Verarbeitung und transformWithState auf Databricks finden Sie hier.
(Dieser Blogbeitrag wurde mit KI-gestützten Tools übersetzt.) Originalbeitrag
