ステートフルストリーム処理とは、これまでに見たイベントに基づいて状態を維持しながら、連続するイベントストリームをリアルタイムで処理することを指します。これにより、システムはイベントストリームの時間経過に伴う変化とパターンを追跡し、この情報に基づいて決定を下したり行動を起こしたりすることが可能になります。
Apache Spark Structured Streamingにおけるステートフルなストリーム処理は、組み込みのオペレータ(ウィンドウ化集約、ストリーム-ストリーム結合、重複の削除など)を使用して事前定義されたロジックをサポートし、任意のロジックにはflatMapGroupWithStateまたはmapGroupWithStateを使用します。任意のロジックは、ユーザーがパイプライン内でカスタムの状態操作コードを書くことを可能にします。しかし、ストリーミングの採用が企業で増えるにつれて、より複雑で洗練されたストリーミングアプリケーションは、開発者がステートフルなストリーミングパイプラインを書くのを容易にするためのいくつかの追加機能を要求します。
これらの新しく、成長しているステートフルストリーミングアプリケーションや運用ユースケースをサポートするために、SparkコミュニティはtransformWithStateという新しいSparkオペレータを導入しています。このオペレータは、柔軟なデータモデリング、複合型、タイマー、TTL、transformWithStateの後にステートフルオペレータをチェーン化、スキーマ進化、別のクエリからのステートの再利用、Unity Catalog、Delta Live Tables、Spark Connectなどの他のDatabricks機能との統合を可能にします。このオペレータを使用することで、顧客はScala、Java、Pythonなどの人気のある言語を使用して、Databricksプラットフォーム上で信頼性と効率性を持ってミッションクリティカルな複雑なステートフルな運用ユースケースを開発し、実行することができます。
多くのイベント駆動型アプリケーションは、アクションをトリガーするためや、通常はApache Kafka/Apache Pulsar/Google Pub-Subなどの別のイベントログ/メッセージバスに書き込まれる出力イベントを発行するために、ステートフルな計算を行うことに依存しています。これらのアプリケーションは通常、ルールを検証し、異常を検出し、セッションを追跡するなどの状態マシンを実装し、派生結果を生成します。これらの結果は通常、以下に基づいて下流システムでのアクションをトリガーするために使用されます:
そのようなアプリケーションの例には、ユーザーエクスペリエンストラッキング、異常検出、ビジネスプロセスモニタリング、および決定木が含まれます。
Apache Sparkは現在、transformWithStateという次世代のステートフル処理オペレータを導入しています。これは、複雑なリアルタイムストリーミングアプリケーションの構築をより柔軟で効率的でスケーラブルにするために設計されています。この新しいAPIは、状態管理、イベント処理、タイマー管理、スキーマ進化のための高度な機能を解放し、ユーザーが洗練されたストリーミングロジックを簡単に実装できるようにします。
我々は、上記の制限を解決するために、新たなレイヤー化された柔軟で拡張可能なAPIアプローチを導入しています。レイヤー化アーキテクチャと各レイヤーでの関連機能の高レベルのアーキテクチャ図を以下に示します。
図に示すように、私たちは今日利用可能なステートバックエンドを引き続き使用しています。現在、Apache Sparkは2つの状態ストアバックエンドをサポートしています:
新しいtransformWithStateオペレータは、最初はRocksDBステートストアプロバイダーでのみサポートされます。我々は、transformWithState内で使用されるさまざまな機能の最適なパフォーマンスを確保するために、範囲スキャン、マージオペレーターなどのさまざまなRocksDB機能を利用します。このレイヤーの上に、StatefulProcessorHandleを使用して複合型、タイマー、クエリメタデータなどを操作する別の抽象化レイヤーを構築します。オペレーターレベルでは、これらの強力なストリーミングアプリケーションを提供するために使用されるアプリケーションロジックを組み込むことができるStatefulProcessorの使用を可能にします。最終的には、DataFrame APIに基づいたApache Sparkクエリ内でStatefulProcessorを使用することができます。
以下は、transformWithStateオペレータを使用したApache Sparkストリーミングクエリの例です:
transformWithStateを使用すると、ユーザーはオブジェクト指向プログラミングモデルに基づいてStatefulProcessor内に複数の独立したstate variables を定義することができます。これらの変数はプライベートクラスメンバーのように機能し、モノリシックな状態構造を必要とせずに細かい状態管理を可能にします。これにより、新しいチェックポイントディレクトリからのクエリを再起動することなく、ステート変数を追加または変更してアプリケーションロジックを時間とともに進化させることが容易になります。
ユーザーはタイマーを登録してイベント駆動型のアプリケーションロジックをトリガーすることができます。APIは処理時間(壁時計ベース)とイベント時間(列ベース)のタイマーを両方サポートしています。タイマーが発火すると、コールバックが発行され、効率的なイベント処理、状態更新、および出力生成が可能になります。タイマーをリスト化、登録、削除する事ができ、イベント処理の精密な制御を保証します。
組み込みのサポートにより、複合データ構造に対する状態管理がより直感的になりました:
Sparkは自動的にこれらのステートタイプをエンコードし、永続化します。これにより、手動でのシリアライゼーションの必要性が減少し、パフォーマンスが向上します。
コンプライアンスと運用効率のために、transformWithStateはネイティブのtime-to-live (TTL)サポートを導入します。これにより、ユーザーは有効期限ポリシーを定義し、古いステートデータが自動的に削除されることを保証し、手動でのクリーンアップを必要としません。
この新しいAPIを使用すると、イベント時間を時間モードとして使用している場合でも、ステートフルオペレータをtransformWithStateの後にチェーン化することができます。明示的に出力スキーマのイベント時間列を参照することで、ダウンストリームのオペレータは遅延レコードのフィルタリングとステートの排出をシームレスに行うことができ、複数のパイプラインと外部ストレージを含む複雑な回避策の必要性を排除します。
ユーザーは既存のクエリから状態を初期化することができ、ストリーミングジョブの再開やクローン作成を容易にします。APIは状態データソースリーダーとのシームレスな統合を可能にし、新しいクエリが複雑な移行プロセスなしに以前に書かれた状態を活用できるようにします。
transformWithStateはスキーマ進化をサポートし、以下のような変更を可能にします:
Apache Sparkは自動的に互換性のあるスキーマの更新を検出し適用し、クエリが同じチェックポイントディレクトリ内で実行し続けることができるようにします。これにより、完全な状態の再構築と再処理の必要性がなくなり、ダウンタイムと運用の複雑さが大幅に削減されます。
より簡単なデバッグと観察のために、transformWithStateは状態データソースリーダーとネイティブに統合されています。ユーザーは状態変数を検査し、状態データを直接クエリすることで、トラブルシューティングと分析を効率化できます。これには、readChangeFeedなどの高度な機能が含まれます。
transformWithState APIは、現在Databricks Runtime 16.2のNo-IsolationとUnity Catalog Dedicated Clustersで利用可能です。Unity Catalog Standard ClustersとServerless Computeへのサポートは近日中に対応予定です。また、このAPIはApache Spark™ 4.0リリースでオープンソースで利用可能になる予定です。
新しいtransformWithState APIに詰め込まれたすべての機能改善が、お客様とユーザーの最も重要なユースケースを支える信頼性の高い、スケーラブルな、ミッションクリティカルな運用ワークロードの新しいクラスを構築することを可能にすると信じています。これらはすべて、Apache Spark DataFrame APIsの快適さと使いやすさの中で実現されます。さらに重要なのは、これらの変更が Apache Spark Structured Streaming に組み込まれたものだけでなく、新しいステートフルオペレーターの将来的な機能強化の基盤となることです。私たちは、過去数年間にApache Spark™ Structured Streamingのステート管理の改善に興奮しており、近い将来に予定されているこの分野のロードマップ開発を楽しみにしています。
ステートフルストリーム処理とDatabricks上のtransformWithStateについてはこちらで詳しく読むことができます。