Revenir au contenu principal

Plongée dans les fonctionnalités : Watermarking dans Apache Spark Structured Streaming

blog OG

Publié: 22 août 2022

Produit13 min de lecture

Points clés à retenir

  • Les watermarks aident Spark à comprendre la progression du traitement en fonction du temps d'événement, quand produire des agrégats fenêtrés et quand supprimer l'état des agrégations
  • Lors de la jointure de flux de données, Spark utilise par défaut un watermark global unique qui supprime l'état en fonction du temps d'événement minimum observé sur les flux d'entrée
  • RocksDB peut être utilisé pour réduire la pression sur la mémoire du cluster et les pauses GC
  • Les objets StreamingQueryProgress et StateOperatorProgress contiennent des informations clés sur la façon dont les watermarks affectent votre flux

Introduction

Lors de la construction de pipelines en temps réel, une des réalités avec lesquelles les équipes doivent composer est que l'ingestion de données distribuées est intrinsèquement désordonnée. De plus, dans le contexte des opérations de streaming avec état, les équipes doivent être capables de suivre correctement la progression du temps d'événement dans le flux de données qu'elles ingèrent pour le calcul approprié des agrégations par fenêtre de temps et d'autres opérations avec état. Nous pouvons résoudre tout cela en utilisant Structured Streaming.

Par exemple, supposons que nous soyons une équipe travaillant à la construction d'un pipeline pour aider notre entreprise à effectuer la maintenance proactive de nos machines minières que nous louons à nos clients. Ces machines doivent toujours fonctionner dans des conditions optimales, nous les surveillons donc en temps réel. Nous devrons effectuer des agrégations avec état sur les données de streaming pour comprendre et identifier les problèmes des machines.

C'est là que nous devons exploiter Structured Streaming et le Watermarking pour produire les agrégations avec état nécessaires qui aideront à éclairer les décisions concernant la maintenance prédictive et plus encore pour ces machines.

GUIDE

Votre guide compact de l'analytique moderne

Qu'est-ce que le Watermarking ?

D'une manière générale, lorsque l'on travaille avec des données de streaming en temps réel, il y aura des retards entre le temps d'événement et le temps de traitement en raison de la manière dont les données sont ingérées et si l'application globale rencontre des problèmes tels que des temps d'arrêt. En raison de ces retards variables potentiels, le moteur que vous utilisez pour traiter ces données doit disposer d'un mécanisme pour décider quand fermer les fenêtres d'agrégation et produire le résultat agrégé.

Bien que l'inclinaison naturelle pour remédier à ces problèmes puisse être d'utiliser un délai fixe basé sur l'heure de l'horloge murale, nous montrerons dans cet exemple à venir pourquoi ce n'est pas la meilleure solution.

Pour expliquer cela visuellement, prenons un scénario où nous recevons des données à différents moments, d'environ 10h50 à 11h20. Nous créons des fenêtres de 10 minutes qui calculent la moyenne des lectures de température et de pression qui sont arrivées pendant la période fenêtrée.

Dans cette première image, nous avons les fenêtres de déclenchement à 11h00, 11h10 et 11h20, ce qui conduit aux tables de résultats montrées aux heures respectives. Lorsque le deuxième lot de données arrive vers 11h10 avec des données dont le temps d'événement est de 10h53, cela est incorporé dans les moyennes de température et de pression calculées pour la fenêtre de 11h00 à 11h10 qui se ferme à 11h10, ce qui ne donne pas le bon résultat.

Représentation visuelle d'un pipeline Structured Streaming ingérant des lots de données de température et de pression

Pour garantir que nous obtenons les bons résultats pour les agrégats que nous souhaitons produire, nous devons définir un watermark qui permettra à Spark de comprendre quand fermer la fenêtre d'agrégation et produire le résultat agrégé correct.

Dans les applications Structured Streaming, nous pouvons nous assurer que toutes les données pertinentes pour les agrégations que nous voulons calculer sont collectées en utilisant une fonctionnalité appelée watermarking. Au sens le plus basique, en définissant un watermark, Spark Structured Streaming sait quand il a ingéré toutes les données jusqu'à un certain temps, T, (en fonction d'une attente de latence définie) afin qu'il puisse fermer et produire les agrégats fenêtrés jusqu'au timestamp T.

Cette deuxième image montre l'effet de la mise en œuvre d'un watermark de 10 minutes et de l'utilisation du mode Append dans Spark Structured Streaming.

Représentation visuelle de l'effet d'un watermark de 10 minutes appliqué au pipeline Structured Streaming.

Contrairement au premier scénario où Spark émet l'agrégation fenêtrée des dix minutes précédentes toutes les dix minutes (c'est-à-dire qu'il émet la fenêtre de 11h00 à 11h10 à 11h10), Spark attend maintenant pour fermer et sortir l'agrégation fenêtrée une fois que le temps d'événement maximum vu moins le watermark spécifié est supérieur à la borne supérieure de la fenêtre.

En d'autres termes, Spark a dû attendre de voir des points de données où le temps d'événement maximum vu moins 10 minutes était supérieur à 11h00 pour émettre la fenêtre d'agrégation de 10h50 à 11h00. À 11h00, il ne le voit pas, il initialise donc seulement le calcul agrégé dans le magasin d'état interne de Spark. À 11h10, cette condition n'est toujours pas remplie, mais nous avons un nouveau point de données pour 10h53, donc l'état interne est mis à jour, mais pas émis. Ensuite, finalement, à 11h20, Spark a vu un point de données avec un temps d'événement de 11h15 et comme 11h15 moins 10 minutes est 11h05, ce qui est postérieur à 11h00, la fenêtre de 10h50 à 11h00 peut être émise dans la table de résultats.

Cela produit le résultat correct en incorporant correctement les données en fonction de la latence attendue définie par le watermark. Une fois les résultats émis, l'état correspondant est supprimé du magasin d'état.

Intégration du Watermarking dans vos Pipelines

Pour comprendre comment intégrer ces watermarks dans nos pipelines Structured Streaming, nous allons explorer ce scénario en parcourant un exemple de code réel basé sur notre cas d'utilisation énoncé dans la section d'introduction de ce blog.

Disons que nous ingérons toutes nos données de capteurs à partir d'un cluster Kafka dans le cloud et que nous voulons calculer les moyennes de température et de pression toutes les dix minutes avec un décalage temporel attendu de dix minutes. Le pipeline Structured Streaming avec watermarking ressemblerait à ceci :

PySpark

Ici, nous lisons simplement depuis Kafka, appliquons nos transformations et agrégations, puis écrivons dans des tables Delta Lake qui seront visualisées et surveillées dans Databricks SQL. La sortie écrite dans la table pour un échantillon de données particulier ressemblerait à ceci :

Sortie de la requête de streaming définie dans l'exemple de code PySpark ci-dessus

Pour intégrer le watermarking, nous avons d'abord dû identifier deux éléments :

  1. La colonne qui représente le temps d'événement de la lecture du capteur
  2. Le décalage temporel attendu estimé des données

Tiré de l'exemple précédent, nous pouvons voir le watermark défini par la méthode .withWatermark() avec la colonne eventTimestamp utilisée comme colonne de temps d'événement et 10 minutes pour représenter le décalage temporel attendu.

PySpark

Maintenant que nous savons comment implémenter les watermarks dans notre pipeline Structured Streaming, il sera important de comprendre comment d'autres éléments comme les opérations de jointure de streaming et la gestion de l'état sont affectés par les watermarks. De plus, à mesure que nous faisons évoluer nos pipelines, il y aura des métriques clés dont nos ingénieurs de données devront être conscients et qu'ils devront surveiller pour éviter les problèmes de performance. Nous explorerons tout cela en approfondissant le watermarking.

Watermarks dans différents modes de sortie

Avant d'approfondir, il est important de comprendre comment votre choix de mode de sortie affecte le comportement des watermarks que vous définissez.

Les watermarks ne peuvent être utilisés que lorsque vous exécutez votre application de streaming en modes de sortie append ou update. Il existe un troisième mode de sortie, le mode complete, dans lequel toute la table de résultats est écrite dans le stockage. Ce mode ne peut pas être utilisé car il nécessite que toutes les données agrégées soient conservées, et ne peut donc pas utiliser le watermarking pour supprimer l'état intermédiaire.

Dans le contexte de l'agrégation par fenêtre et des watermarks, ces modes de sortie impliquent que dans le mode « append », un agrégat ne peut être produit qu'une seule fois et ne peut pas être mis à jour. Par conséquent, une fois l'agrégat produit, le moteur peut supprimer son état et ainsi maintenir l'état d'agrégation global borné. Les enregistrements tardifs – ceux pour lesquels l'heuristique approximative du watermark ne s'est pas appliquée (ils étaient plus anciens que le délai du watermark) – doivent donc être abandonnés par nécessité – l'agrégat a été produit et l'état de l'agrégat supprimé.

Inversement, pour le mode « update », l'agrégat peut être produit de manière répétée à partir du premier enregistrement et à chaque enregistrement reçu, donc un watermark est facultatif. Le watermark est seulement utile pour élaguer l'état une fois que le moteur sait de manière heuristique qu'aucun autre enregistrement pour cet agrégat ne sera reçu. Une fois l'état supprimé, là encore, les enregistrements tardifs doivent être abandonnés car la valeur agrégée a été perdue et ne peut pas être mise à jour.

Il est important de comprendre comment l'état, les enregistrements arrivant tardivement et les différents modes de sortie peuvent entraîner des comportements différents de votre application fonctionnant sur Spark. Le principal point à retenir ici est que dans les modes append et update, une fois que le watermark indique que toutes les données ont été reçues pour une fenêtre de temps d'agrégation, le moteur peut élaguer l'état de la fenêtre. En mode append, l'agrégat est produit uniquement à la fermeture de la fenêtre de temps plus le délai du watermark, tandis qu'en mode update, il est produit à chaque mise à jour de la fenêtre.

Enfin, en augmentant votre fenêtre de délai de watermark, vous ferez attendre le pipeline plus longtemps pour les données et potentiellement abandonnerez moins de données – une précision plus élevée, mais aussi une latence plus élevée pour produire les agrégats. À l'inverse, un délai de watermark plus court entraîne une précision plus faible, mais aussi une latence plus faible pour produire les agrégats.

Délai de fenêtre Précision Latence
Fenêtre de délai plus longue Précision plus élevée Latence plus élevée
Fenêtre de délai plus courte Précision plus faible Latence plus faible

Plongée plus profonde dans les Watermarks

Jointures et Watermarks

Il y a quelques considérations à connaître lors de l'exécution d'opérations de jointure dans vos applications de streaming, en particulier lors de la jointure de deux flux. Disons, pour notre cas d'utilisation, que nous voulons joindre le jeu de données en flux de relevés de température et de pression avec des valeurs supplémentaires capturées par d'autres capteurs sur les machines.

Il existe trois types généraux de jointures de flux à flux qui peuvent être implémentés dans Structured Streaming : les jointures internes, externes et semi-jointures. Le principal problème avec les jointures dans les applications de streaming est que vous pouvez avoir une image incomplète d'un côté de la jointure. Donner à Spark une compréhension du moment où il n'y aura plus de correspondances futures est similaire au problème précédent avec les agrégations où Spark devait comprendre quand il n'y avait plus de nouvelles lignes à incorporer dans le calcul de l'agrégation avant de l'émettre.

Pour permettre à Spark de gérer cela, nous pouvons exploiter une combinaison de watermarks et de contraintes de temps d'événement dans la condition de jointure de la jointure de flux à flux. Cette combinaison permet à Spark de filtrer les enregistrements tardifs et d'élaguer l'état pour l'opération de jointure via une condition de plage de temps sur la jointure. Nous démontrons cela dans l'exemple ci-dessous :

PySpark

Cependant, contrairement à l'exemple ci-dessus, il arrivera que chaque flux nécessite des décalages temporels différents pour ses watermarks. Dans ce scénario, Spark a une politique pour gérer plusieurs définitions de watermark. Spark maintient un watermark global basé sur le flux le plus lent pour garantir une sécurité maximale afin de ne pas manquer de données.

Les développeurs ont la possibilité de modifier ce comportement en changeant spark.sql.streaming.multipleWatermarkPolicy en max; cependant, cela signifie que les données du flux le plus lent seront abandonnées.

Pour voir la gamme complète des opérations de jointure qui nécessitent ou pourraient bénéficier des watermarks, consultez cette section de la documentation de Spark.

Surveillance et gestion des flux avec les Watermarks

Lors de la gestion d'une requête de streaming où Spark peut avoir besoin de gérer des millions de clés et de conserver l'état pour chacune d'elles, le magasin d'état par défaut fourni avec les clusters Databricks peut ne pas être efficace. Vous pourriez commencer à observer une utilisation accrue de la mémoire, puis des pauses de garbage collection plus longues. Ces deux éléments entraveront les performances et la scalabilité de votre application Structured Streaming.

C'est là qu'intervient RocksDB. Vous pouvez exploiter RocksDB nativement dans Databricks en l'activant comme suit dans la configuration Spark :

Cela permettra au cluster exécutant l'application Structured Streaming d'utiliser RocksDB, qui peut gérer plus efficacement l'état dans la mémoire native et tirer parti du disque local/SSD au lieu de conserver tout l'état en mémoire.

Au-delà du suivi de l'utilisation de la mémoire et des métriques de garbage collection, il existe d'autres indicateurs et métriques clés qui doivent être collectés et suivis lors de la gestion des watermarks et de Structured Streaming. Pour accéder à ces métriques, vous pouvez consulter les objets StreamingQueryProgress et StateOperatorProgress. Consultez notre documentation pour des exemples sur la façon de les utiliser ici.

Dans l'objet StreamingQueryProgress, il existe une méthode appelée « eventTime » qui peut être appelée et qui renverra les horodatages max, min, avg et watermark. Les trois premiers sont les temps d'événement max, min et moyen vus dans ce déclencheur. Le dernier est le watermark utilisé dans le déclencheur.

Exemple abrégé d'un objet StreamingQueryProgress

Ces informations peuvent être utilisées pour rapprocher les données dans les tables de résultats que vos requêtes de streaming génèrent et également pour vérifier que le watermark utilisé est bien l'horodatage eventTime prévu. Cela peut devenir important lorsque vous joignez des flux de données.

Dans l'objet StateOperatorProgress, il y a la métrique numRowsDroppedByWatermark. Cette métrique indique combien de lignes sont considérées comme trop tardives pour être incluses dans l'agrégation avec état. Notez que cette métrique mesure les lignes abandonnées *après agrégation* et non les lignes d'entrée brutes, donc le nombre n'est pas précis mais peut indiquer que des données tardives sont abandonnées. Ceci, en conjonction avec les informations de l'objet StreamingQueryProgress, peut aider les développeurs à déterminer si les watermarks sont correctement configurés.

Agrégations multiples, streaming et watermarks

Une limitation restante des requêtes Structured Streaming est l'enchaînement de plusieurs opérateurs stateful (par exemple, agrégations, jointures en continu) dans une seule requête en continu. Cette limitation d'un filigrane global singulier pour les agrégations stateful est quelque chose pour lequel nous, chez Databricks, travaillons sur une solution et publierons plus d'informations dans les mois à venir. Consultez notre blog sur Project Lightspeed pour en savoir plus : Project Lightspeed : Traitement de flux plus rapide et plus simple avec Apache Spark (databricks.com).

Conclusion

Avec Structured Streaming et Watermarking sur Databricks, les organisations, comme celle dont le cas d'utilisation est décrit ci-dessus, peuvent créer des applications résilientes en temps réel qui garantissent que les métriques pilotées par des agrégations en temps réel sont calculées avec précision, même si les données ne sont pas correctement ordonnées ou ne sont pas à l'heure. Pour en savoir plus sur la façon dont vous pouvez créer des applications en temps réel avec Databricks, contactez votre représentant Databricks.

(Cet article de blog a été traduit à l'aide d'outils basés sur l'intelligence artificielle) Article original

Ne manquez jamais un article Databricks

Abonnez-vous à notre blog et recevez les derniers articles dans votre boîte mail.