Direkt zum Hauptinhalt

Einführung von transformWithState in Apache Spark™ Structured Streaming

Erstellen Sie flexible, skalierbare zustandsbehaftete Streaming-Abfragen

Introducing transformWithState in Apache Spark™ Structured Streaming

Summary

  • Flexiblere zustandsbehaftete Verarbeitung – transformWithState erweitert Apache Spark™ Structured Streaming um flexibles Zustandsmanagement, zusammengesetzte Datentypen und ereignisgesteuerte Programmierung.
  • Verbesserte Leistung & Einfachheit – Funktionen wie TTL-basierte Zustandsablaufsteuerung, zusammengesetzte Typen, Operator-Verkettung und nahtlose Zustandsinitialisierung reduzieren die Komplexität und steigern die Effizienz.
  • Zukunftsfähig & Skalierbar – Native Schema-Evolution und tiefe Integration mit Databricks-Funktionen gewährleisten zuverlässige, skalierbare Streaming-Anwendungen.

Einleitung

Zustandsbehaftete Stream-Verarbeitung bezieht sich auf die Echtzeitverarbeitung eines kontinuierlichen Ereignisstroms, wobei der Zustand basierend auf den bisher gesehenen Ereignissen beibehalten wird. Dies ermöglicht es dem System, Änderungen und Muster im Laufe der Zeit im Ereignisstrom zu verfolgen und Entscheidungen oder Aktionen auf der Grundlage dieser Informationen zu treffen.

Die zustandsbehaftete Stream-Verarbeitung in Apache Spark Structured Streaming wird mithilfe integrierter Operatoren (wie z. B. aggregierte Fensterfunktionen, Stream-Stream-Joins, Duplikate entfernen usw.) für vordefinierte Logik und mithilfe von flatMapGroupWithState oder mapGroupWithState für beliebige Logik unterstützt. Die beliebige Logik ermöglicht es Benutzern, ihren benutzerdefinierten Zustandsmanipulationscode in ihren Pipelines zu schreiben. Mit zunehmender Verbreitung von Streaming im Unternehmen erfordern jedoch immer komplexere und anspruchsvollere Streaming-Anwendungen mehrere zusätzliche Funktionen, um es Entwicklern zu erleichtern, zustandsbehaftete Streaming-Pipelines zu schreiben.

Um diese neuen, wachsenden zustandsbehafteten Streaming-Anwendungen oder operativen Anwendungsfälle zu unterstützen, führt die Spark-Community einen neuen Spark-Operator namens transformWithState ein. Dieser Operator ermöglicht flexible Datenmodellierung, zusammengesetzte Datentypen, Timer, TTL, Verknüpfung zustandsbehafteter Operatoren nach transformWithState, Schema-Evolution, Wiederverwendung von Zuständen aus einer anderen Abfrage und Integration mit einer Vielzahl anderer Databricks-Funktionen wie Unity Catalog, Delta Live Tables und Spark Connect. Mit diesem Operator können Kunden ihre geschäftskritischen, komplexen zustandsbehafteten operativen Anwendungsfälle zuverlässig und effizient auf der Databricks-Plattform mit beliebten Sprachen wie Scala, Java oder Python entwickeln und ausführen.

Anwendungen/Anwendungsfälle mit zustandsbehafteter Stream-Verarbeitung

Viele ereignisgesteuerte Anwendungen verlassen sich auf zustandsbehaftete Berechnungen, um Aktionen auszulösen oder Ausgabereignisse zu emittieren, die normalerweise in ein anderes Ereignisprotokoll/eine andere Nachrichtenwarteschlange wie Apache Kafka/Apache Pulsar/Google Pub-Sub usw. geschrieben werden. Diese Anwendungen implementieren normalerweise eine Zustandsmaschine, die Regeln validiert, Anomalien erkennt, Sitzungen verfolgt usw. und die abgeleiteten Ergebnisse generiert, die normalerweise verwendet werden, um Aktionen auf nachgelagerten Systemen auszulösen, basierend auf:

  • Eingangsereignisse
  • Zustand
  • Zeit (Fähigkeit, mit Verarbeitungszeit und Ereigniszeit zu arbeiten)
  • Ausgabeereignisse

Beispiele für solche Anwendungen sind BenutzererlebnisverfolgungAnomalieerkennungÜberwachung von Geschäftsprozessen und Entscheidungsbäume.

Einführung von transformWithState: Eine leistungsfähigere API für zustandsbehaftete Verarbeitung

Apache Spark führt jetzt transformWithState ein, einen zustandsbehafteten Verarbeitungsoperator der nächsten Generation, der die Erstellung komplexer Echtzeit-Streaming-Anwendungen flexibler, effizienter und skalierbarer macht. Diese neue API eröffnet erweiterte Funktionen für Zustandsverwaltung, Ereignisverarbeitung, Timer-Verwaltung und Schema-Evolution, sodass Benutzer komplexe Streaming-Logik einfach implementieren können.

High-Level-Design

Wir führen einen neuen, flexiblen, erweiterbaren API-Ansatz mit mehreren Ebenen ein, um die oben genannten Einschränkungen zu beheben. Ein Architekturdiagramm der Schichtenarchitektur und die zugehörigen Funktionen auf verschiedenen Ebenen ist unten dargestellt.

Layered State API

Wie in der Abbildung gezeigt, verwenden wir weiterhin die derzeit verfügbaren State-Backends. Derzeit unterstützt Apache Spark zwei State-Store-Backends:

  • HDFSBackedStateStoreProvider
  • RocksDBStateStoreProvider

Der neue transformWithState-Operator wird zunächst nur mit dem RocksDB State Store Provider unterstützt. Wir nutzen verschiedene RocksDB-Funktionalitäten wie Range Scans, Merge-Operatoren usw., um eine optimale Leistung für die verschiedenen Funktionen innerhalb von transformWithState zu gewährleisten. Auf dieser Ebene bauen wir eine weitere Abstraktionsebene auf, die den StatefulProcessorHandle verwendet, um mit zusammengesetzten Typen, Timern, Abfrage-Metadaten usw. zu arbeiten. Auf der Operatorebene ermöglichen wir die Verwendung eines StatefulProcessor, der die Anwendungslogik einbetten kann, die zur Bereitstellung dieser leistungsstarken Streaming-Anwendungen verwendet wird. Schließlich können Sie den StatefulProcessor in Apache Spark-Abfragen basierend auf den DataFrame-APIs verwenden.

Hier ist ein Beispiel für eine Apache Spark Streaming-Abfrage, die den transformWithState-Operator verwendet:

5-FACHER LEADER

Gartner®: Databricks als Leader für Cloud-Datenbanken

Hauptmerkmale von transformWithState

Flexible Datenmodellierung mit Zustandsvariablen

Mit transformWithState können Benutzer jetzt mehrere unabhängige Zustandsvariablen innerhalb eines StatefulProcessor basierend auf dem objektorientierten Programmiermodell definieren. Diese Variablen funktionieren wie private Klassenmember und ermöglichen eine granulare Zustandsverwaltung, ohne dass eine monolithische Zustandsstruktur erforderlich ist. Dies erleichtert die Weiterentwicklung der Anwendungslogik im Laufe der Zeit durch Hinzufügen oder Ändern von Zustandsvariablen, ohne Abfragen aus einem neuen Checkpoint-Verzeichnis neu starten zu müssen.

Timer und Callbacks für ereignisgesteuerte Verarbeitung

Benutzer können jetzt Timer registrieren, um ereignisgesteuerte Anwendungslogik auszulösen. Die API unterstützt sowohl Verarbeitungszeit (basierend auf der Wanduhr) als auch Ereigniszeit (spaltenbasiert) Timer. Wenn ein Timer ausgelöst wird, wird ein Callback ausgegeben, der eine effiziente Ereignisbehandlung, Zustandsaktualisierungen und die Generierung von Ausgaben ermöglicht. Die Möglichkeit, Timer aufzulisten, zu registrieren und zu löschen, gewährleistet eine präzise Kontrolle über die Ereignisverarbeitung.

Native Unterstützung für zusammengesetzte Datentypen

Die Zustandsverwaltung ist jetzt intuitiver mit integrierter Unterstützung für zusammengesetzte Datenstrukturen:

  • ValueState: Speichert einen einzelnen Wert pro Gruppierungsschlüssel.
  • ListState: Verwaltet eine Liste von Werten pro Schlüssel und unterstützt effiziente Anfügeoperationen.
  • MapState: Ermöglicht die Speicherung von Schlüssel-Wert-Paaren innerhalb jedes Gruppierungsschlüssels mit effizienten Punkt-Lookups.

Spark kodiert und speichert diese Zustandstypen automatisch, wodurch die Notwendigkeit manueller Serialisierung reduziert und die Leistung verbessert wird.

Automatische Zustandsablaufverfolgung mit TTL

Für Compliance- und betriebliche Effizienz führt transformWithState native Time-to-Live (TTL)-Unterstützung für Zustandsvariablen ein. Dies ermöglicht es Benutzern, Ablaufrichtlinien zu definieren, sodass alte Zustandsdaten automatisch entfernt werden, ohne dass eine manuelle Bereinigung erforderlich ist.

Verknüpfung von Operatoren nach transformWithState

Mit dieser neuen API können zustandsbehaftete Operatoren jetzt nach transformWithState verkettet werden, auch wenn die Ereigniszeit als Zeitmodus verwendet wird. Durch die explizite Referenzierung von Ereigniszeitspalten im Ausgabeschema können nachgelagerte Operatoren die Filterung von späten Datensätzen und die Zustandsbereinigung nahtlos durchführen – wodurch komplexe Workarounds mit mehreren Pipelines und externem Speicher entfallen.

Vereinfachte Zustandsinitialisierung

Benutzer können Zustände aus vorhandenen Abfragen initialisieren, was das Neustarten oder Klonen von Streaming-Jobs erleichtert. Die API ermöglicht eine nahtlose Integration mit dem State Data Source Reader, sodass neue Abfragen zuvor geschriebene Zustände ohne komplexe Migrationsprozesse nutzen können.

Schema-Evolution für zustandsbehaftete Abfragen

transformWithState unterstützt die Schema-Evolution und ermöglicht Änderungen wie:

  • Hinzufügen oder Entfernen von Feldern
  • Neuanordnung von Feldern
  • Aktualisieren von Datentypen

Apache Spark erkennt und wendet automatisch kompatible Schema-Updates an, sodass Abfragen im selben Checkpoint-Verzeichnis weiter ausgeführt werden können. Dies macht vollständige Zustandsneuerstellungen und Neuverarbeitungen überflüssig und reduziert Ausfallzeiten und betriebliche Komplexität erheblich.

Native Integration mit dem State Data Source Reader

Für einfacheres Debugging und bessere Beobachtbarkeit ist transformWithState nativ in den State Data Source Reader integriert. Benutzer können Zustandsvariablen inspizieren und Zustandsdaten direkt abfragen, was die Fehlerbehebung und Analyse vereinfacht, einschließlich erweiterter Funktionen wie readChangeFeed usw.

Verfügbarkeit

Die transformWithState API ist ab der Databricks Runtime 16.2-Version für No-Isolation- und Unity Catalog Dedicated Clusters verfügbar. Unterstützung für Unity Catalog Standard Clusters und Serverless Compute wird bald hinzugefügt. Die API soll auch mit der Apache Spark™ 4.0-Version Open Source verfügbar sein.

Fazit

Wir glauben, dass all die Funktionsverbesserungen, die in der neuen transformWithState API enthalten sind, den Aufbau einer neuen Klasse zuverlässiger, skalierbarer und geschäftskritischer operativer Workloads ermöglichen werden, die die wichtigsten Anwendungsfälle für unsere Kunden und Benutzer unterstützen, und das alles bequem und einfach mit den Apache Spark DataFrame APIs. Wichtig ist, dass diese Änderungen auch die Grundlage für zukünftige Verbesserungen bestehender sowie neuer zustandsbehafteter Operatoren in Apache Spark Structured Streaming bilden. Wir freuen uns über die Fortschritte im State Management in Apache Spark™ Structured Streaming in den letzten Jahren und blicken den geplanten Entwicklungen in diesem Bereich in naher Zukunft mit Spannung entgegen.

Weitere Informationen zu zustandsbehafteter Stream-Verarbeitung und transformWithState auf Databricks finden Sie hier.

(Dieser Blogbeitrag wurde mit KI-gestützten Tools übersetzt.) Originalbeitrag

Verpassen Sie keinen Beitrag von Databricks

Abonnieren Sie unseren Blog und erhalten Sie die neuesten Beiträge direkt in Ihren Posteingang.