Le traitement de flux avec état (stateful stream processing) consiste à traiter un flux continu d'événements en temps réel tout en maintenant un état basé sur les événements observés jusqu'à présent. Cela permet au système de suivre les changements et les modèles au fil du temps dans le flux d'événements, et de prendre des décisions ou d'entreprendre des actions basées sur ces informations.
Le traitement de flux avec état dans Apache Spark Structured Streaming est pris en charge à l'aide d'opérateurs intégrés (tels que l'agrégation par fenêtre, la jointure de flux à flux, la suppression des doublons, etc.) pour une logique prédéfinie et à l'aide de flatMapGroupWithState ou mapGroupWithState pour une logique arbitraire. La logique arbitraire permet aux utilisateurs d'écrire leur code personnalisé de manipulation d'état dans leurs pipelines. Cependant, à mesure que l'adoption du streaming augmente dans l'entreprise, des applications de streaming plus complexes et sophistiquées exigent plusieurs fonctionnalités supplémentaires pour faciliter aux développeurs la création de pipelines de streaming avec état.
Afin de prendre en charge ces nouvelles applications de streaming avec état ou ces cas d'utilisation opérationnels en pleine croissance, la communauté Spark introduit un nouvel opérateur Spark appelé transformWithState. Cet opérateur permettra une modélisation de données flexible, des types composites, des minuteurs, le TTL, l'enchaînement d'opérateurs avec état après transformWithState, l'évolution de schéma, la réutilisation de l'état d'une requête différente et l'intégration avec une multitude d'autres fonctionnalités Databricks telles que Unity Catalog, Delta Live Tables et Spark Connect. En utilisant cet opérateur, les clients peuvent développer et exécuter leurs cas d'utilisation opérationnels critiques et complexes avec état de manière fiable et efficace sur la plateforme Databricks en utilisant des langages populaires tels que Scala, Java ou Python.
De nombreuses applications pilotées par les événements s'appuient sur l'exécution de calculs avec état pour déclencher des actions ou émettre des événements de sortie qui sont généralement écrits dans un autre journal d'événements/bus de messages tel qu'Apache Kafka/Apache Pulsar/Google Pub-Sub, etc. Ces applications implémentent généralement une machine d'état qui valide les règles, détecte les anomalies, suit les sessions, etc., et génère les résultats dérivés, qui sont généralement utilisés pour déclencher des actions sur les systèmes en aval, en se basant sur :
Parmi les exemples de telles applications, citons le Suivi de l'expérience utilisateur, la Détection d'anomalies, la Surveillance des processus métier et les Arbres de décision.
Apache Spark introduit désormais transformWithState, un opérateur de traitement avec état de nouvelle génération conçu pour rendre la création d'applications de streaming complexes en temps réel plus flexible, efficace et évolutive. Cette nouvelle API débloque des capacités avancées pour la gestion de l'état, le traitement des événements, la gestion des minuteurs et l'évolution des schémas, permettant aux utilisateurs d'implémenter facilement une logique de streaming sophistiquée.
Nous introduisons une nouvelle approche d'API en couches, flexible et extensible pour résoudre les limitations susmentionnées. Un diagramme d'architecture de haut niveau de l'architecture en couches et des fonctionnalités associées à différents niveaux est présenté ci-dessous.

Comme le montre la figure, nous continuons d'utiliser les backends d'état disponibles aujourd'hui. Actuellement, Apache Spark prend en charge deux backends de magasin d'état :
Le nouvel opérateur transformWithState sera initialement pris en charge uniquement avec le fournisseur de magasin d'état RocksDB. Nous utilisons diverses fonctionnalités de RocksDB concernant les scans de plages, les opérateurs de fusion, etc., pour garantir des performances optimales pour les différentes fonctionnalités utilisées dans transformWithState. Au-dessus de cette couche, nous construisons une autre couche d'abstraction qui utilise le StatefulProcessorHandle pour travailler avec des types composites, des minuteurs, des métadonnées de requête, etc. Au niveau de l'opérateur, nous permettons l'utilisation d'un StatefulProcessor qui peut intégrer la logique de l'application utilisée pour fournir ces puissantes applications de streaming. Enfin, vous pouvez utiliser le StatefulProcessor dans les requêtes Apache Spark basées sur les API DataFrame.
Voici un exemple de requête de streaming Apache Spark utilisant l'opérateur transformWithState :
Avec transformWithState, les utilisateurs peuvent désormais définir plusieurs variables d'état indépendantes au sein d'un StatefulProcessor basé sur le modèle de programmation orientée objet. Ces variables fonctionnent comme des membres de classe privés, permettant une gestion granulaire de l'état sans nécessiter une structure d'état monolithique. Cela facilite l'évolution de la logique de l'application au fil du temps en ajoutant ou en modifiant des variables d'état sans redémarrer les requêtes à partir d'un nouveau répertoire de points de contrôle.
Les utilisateurs peuvent désormais enregistrer des minuteurs pour déclencher une logique d'application pilotée par les événements. L'API prend en charge les minuteurs basés sur le temps de traitement (horloge murale) et le temps d'événement (basé sur une colonne). Lorsqu'un minuteur se déclenche, un rappel est émis, permettant un traitement efficace des événements, des mises à jour de l'état et la génération de sorties. La capacité de lister, d'enregistrer et de supprimer des minuteurs garantit un contrôle précis sur le traitement des événements.
La gestion de l'état est désormais plus intuitive avec la prise en charge intégrée des structures de données composites :
Spark encode et persiste automatiquement ces types d'état, réduisant ainsi le besoin de sérialisation manuelle et améliorant les performances.
Pour la conformité et l'efficacité opérationnelle, transformWithState introduit la prise en charge native du délai d'expiration (TTL) pour les variables d'état. Cela permet aux utilisateurs de définir des politiques d'expiration, garantissant que les anciennes données d'état sont automatiquement supprimées sans nécessiter de nettoyage manuel.
Avec cette nouvelle API, les opérateurs avec état peuvent désormais être enchaînés après transformWithState, même lors de l'utilisation du temps d'événement comme mode temporel. En référençant explicitement les colonnes de temps d'événement dans le schéma de sortie, les opérateurs en aval peuvent effectuer le filtrage des enregistrements tardifs et l'éviction de l'état de manière transparente, éliminant ainsi le besoin de solutions de contournement complexes impliquant plusieurs pipelines et un stockage externe.
Les utilisateurs peuvent initialiser l'état à partir de requêtes existantes, ce qui facilite le redémarrage ou le clonage des tâches de streaming. L'API permet une intégration transparente avec le lecteur de source de données d'état, permettant aux nouvelles requêtes de tirer parti de l'état précédemment écrit sans processus de migration complexes.
transformWithState prend en charge l'évolution de schéma, permettant des modifications telles que :
Apache Spark détecte et applique automatiquement les mises à jour de schéma compatibles, garantissant que les requêtes peuvent continuer à s'exécuter dans le même répertoire de points de contrôle. Cela élimine le besoin de reconstructions complètes de l'état et de réexécutions, réduisant considérablement les temps d'arrêt et la complexité opérationnelle.
Pour faciliter le débogage et l'observabilité, transformWithState est intégré nativement au lecteur de source de données d'état. Les utilisateurs peuvent inspecter les variables d'état et interroger les données d'état directement, ce qui simplifie le dépannage et l'analyse, y compris les fonctionnalités avancées telles que readChangeFeed, etc.
L'API transformWithState est disponible dès maintenant avec la version 16.2 de Databricks Runtime sur les clusters dédiés No-Isolation et Unity Catalog. La prise en charge des clusters standard Unity Catalog et du calcul serverless sera ajoutée prochainement. L'API sera également disponible en open-source avec la version 4.0 d'Apache Spark™.
Nous pensons que toutes les améliorations de fonctionnalités intégrées dans la nouvelle API transformWithState permettront de construire une nouvelle classe de charges de travail opérationnelles fiables, évolutives et critiques, alimentant les cas d'utilisation les plus importants pour nos clients et utilisateurs, le tout dans le confort et la facilité d'utilisation des API DataFrame d'Apache Spark. Il est important de noter que ces changements jettent également les bases des futures améliorations des opérateurs d'état intégrés ainsi que des nouveaux opérateurs d'état dans Apache Spark Structured Streaming. Nous sommes enthousiasmés par les améliorations de la gestion d'état dans Apache Spark™ Structured Streaming au cours des dernières années et attendons avec impatience les développements prévus sur la feuille de route dans ce domaine dans un avenir proche.
Vous pouvez en savoir plus sur le traitement de flux stateful et transformWithState sur Databricks ici.
(Cet article de blog a été traduit à l'aide d'outils basés sur l'intelligence artificielle) Article original
