Revenir au contenu principal

Présentation de l'API DLT Sink : Écrire des pipelines vers Kafka et des tables Delta externes

Introducing the DLT Sink API: Write Pipelines to Kafka and External Delta Tables

Publié: 17 février 2025

Produit8 min de lecture

Summary

  • Intégration de l'état des données : les nouveaux récepteurs DLT permettent un flux de données transparent vers des systèmes externes tels que Kafka, Event Hubs et les tables Delta.
  • Configuration facile : l'API create_sink simplifie la configuration des pipelines en temps réel avec des options flexibles pour Kafka et Delta.
  • Cas d'utilisation en temps réel : des exemples montrent comment créer des pipelines pour l'analyse, la détection d'anomalies et les flux de travail pilotés par les événements.

Si vous débutez avec Delta Live Tables, avant de lire cet article, nous vous recommandons de lire Premiers pas avec Delta Live Tables, qui explique comment créer des pipelines évolutifs et fiables en utilisant les définitions et instructions ETL déclaratives de Delta Live Tables (DLT).

Introduction

Delta Live Tables (DLT) offre une plateforme robuste pour construire des pipelines de traitement de données fiables, maintenables et testables au sein de Databricks. En tirant parti de son cadre déclaratif et en provisionnant automatiquement une puissance de calcul serverless optimale, DLT simplifie les complexités du streaming, de la transformation et de la gestion des données, offrant ainsi évolutivité et efficacité pour les flux de travail de données modernes.

Traditionnellement, les pipelines DLT offraient un moyen efficace d'ingérer et de traiter les données sous forme de Tables de streaming ou de Vues matérialisées gérées par Unity Catalog. Bien que cette approche réponde à la plupart des besoins de traitement de données, il existe des cas où les pipelines de données doivent se connecter à des systèmes externes ou utiliser des récepteurs Structured Streaming au lieu d'écrire dans des tables de streaming ou des vues matérialisées.

L'introduction de la nouvelle API Sinks dans DLT répond à ce besoin en permettant aux utilisateurs d'écrire les données traitées vers des flux d'événements externes, tels qu'Apache Kafka, Azure Event Hubs, ainsi que d'écrire dans une table Delta. Cette nouvelle capacité élargit la portée des pipelines DLT, permettant une intégration transparente avec les plateformes externes.

Ces fonctionnalités sont maintenant en préversion publique et nous continuerons à ajouter d'autres sinks de Databricks Runtime à DLT au fil du temps, pour finalement les prendre en charge toutes. La prochaine que nous développons est foreachBatch, qui permet aux clients d'écrire vers des sinks de données arbitraires et d'effectuer des mises à jour personnalisées dans les tables Delta.

L'API Sink est disponible dans le package Python dlt et peut être utilisée avec create_sink() comme indiqué ci-dessous :

L'API accepte trois arguments clés pour définir le sink :

  • Nom du Sink : Une chaîne de caractères qui identifie de manière unique le sink dans votre pipeline. Ce nom vous permet de référencer et de gérer le sink.
  • Spécification du Format : Une chaîne de caractères qui détermine le format de sortie, avec la prise en charge de "kafka" ou "delta".
  • Options du Sink : Un dictionnaire de paires clé-valeur, où les clés et les valeurs sont des chaînes de caractères. Pour les sinks Kafka, toutes les options de configuration disponibles dans Structured Streaming peuvent être utilisées, y compris les paramètres d'authentification, les stratégies de partitionnement, etc. Veuillez vous référer aux docs pour une liste complète des options de configuration prises en charge par Kafka. Les sinks Delta offrent une configuration plus simple en vous permettant soit de définir un chemin de stockage à l'aide de l'attribut path, soit d'écrire directement dans une table de Unity Catalog à l'aide de l'attribut tableName.

Écrire dans un Sink

L'API @append_flow a été améliorée pour permettre l'écriture de données dans des sinks cibles identifiés par leurs noms. Traditionnellement, cette API permettait aux utilisateurs de charger des données de manière transparente à partir de plusieurs sources vers une seule table de streaming. Avec la nouvelle amélioration, les utilisateurs peuvent désormais ajouter des données à des sinks spécifiques. Voici un exemple montrant comment configurer cela :

Construction du pipeline

Construisons maintenant un pipeline DLT qui traite les données de clickstream, empaquetées dans les jeux de données Databricks. Ce pipeline analysera les données pour identifier les événements liés à une page Apache Spark, puis écrira ces données dans des sinks Event Hubs et Delta. Nous structurerons le pipeline en utilisant l'Architecture Medallion, qui organise les données en différentes couches pour améliorer la qualité et l'efficacité du traitement.

Nous commençons par charger les données JSON brutes dans la couche Bronze à l'aide d'Auto Loader. Ensuite, nous nettoyons les données et appliquons des normes de qualité dans la couche Silver pour garantir leur intégrité. Enfin, dans la couche Gold, nous filtrons les entrées dont le titre de page actuel est Apache_Spark et les stockons dans une table nommée spark_referrers, qui servira de source pour nos sinks. Veuillez vous référer à l'Annexe pour le code complet.

Configuration du Sink Azure Event Hubs

Dans cette section, nous utiliserons l'API create_sink pour établir un sink Event Hubs. Cela suppose que vous disposez d'un flux Kafka ou Event Hubs opérationnel. Notre pipeline diffusera des données vers des Event Hubs compatibles Kafka en utilisant une stratégie d'accès partagé, la chaîne de connexion étant stockée en toute sécurité dans Databricks Secrets. Alternativement, vous pouvez utiliser un principal de service pour l'intégration au lieu d'une stratégie SAS. Assurez-vous de mettre à jour les propriétés de connexion et les secrets en conséquence. Voici le code pour configurer le sink Event Hubs :

GUIDE

Votre guide compact de l'analytique moderne

Configuration du Sink Delta

En plus du sink Event Hubs, nous pouvons utiliser l'API create_sink pour configurer un sink Delta. Ce sink écrit les données dans un emplacement spécifié du Databricks File System (DBFS), mais il peut également être configuré pour écrire dans un emplacement de stockage objet tel qu'Amazon S3 ou ADLS.

Voici un exemple de configuration d'un sink Delta :

Création de flux pour alimenter les sinks Kafka et Delta

Les sinks Event Hubs et Delta étant établis, la prochaine étape consiste à les alimenter à l'aide du décorateur append_flow. Ce processus implique la diffusion en continu des données vers les sinks, garantissant qu'ils sont continuellement mis à jour avec les dernières informations.

Pour le sink Event Hubs, le paramètre value est obligatoire, tandis que des paramètres supplémentaires tels que key, partition, headers et topic peuvent être spécifiés de manière facultative. Vous trouverez ci-dessous des exemples de configuration de flux pour les sinks Kafka et Delta :

La fonction applyInPandasWithState est également désormais prise en charge dans DLT, permettant aux utilisateurs de tirer parti de la puissance de Pandas pour le traitement avec état au sein de leurs pipelines DLT. Cette amélioration permet des transformations et des agrégations de données plus complexes en utilisant l'API Pandas familière. Avec l'API DLT Sink, les utilisateurs peuvent facilement diffuser ces données traitées avec état vers des topics Kafka. Cette intégration est particulièrement utile pour l'analyse en temps réel et les architectures pilotées par les événements, garantissant que les pipelines de données peuvent gérer et distribuer efficacement les données en streaming vers des systèmes externes.

Rassembler le tout

L'approche démontrée ci-dessus montre comment construire un pipeline DLT qui transforme efficacement les données tout en utilisant la nouvelle API Sink pour livrer de manière transparente les résultats aux tables Delta externes et aux Event Hubs compatibles Kafka.

Cette fonctionnalité est particulièrement précieuse pour les pipelines d'analyse en temps réel, permettant de diffuser des données dans des flux Kafka pour des applications telles que la détection d'anomalies, la maintenance prédictive et d'autres cas d'utilisation sensibles au temps. Elle permet également les architectures pilotées par les événements, où les processus en aval peuvent être déclenchés instantanément par la diffusion d'événements vers des topics Kafka, permettant un traitement rapide des données nouvellement arrivées.

Appel à l'action

La fonctionnalité DLT Sinks est maintenant disponible en aperçu public pour tous les clients Databricks ! Cette nouvelle capacité puissante vous permet d'étendre de manière transparente vos pipelines DLT à des systèmes externes tels que Kafka et les tables Delta, garantissant un flux de données en temps réel et des intégrations simplifiées. Pour plus d'informations, veuillez consulter les ressources suivantes :

Annexe :

Code du pipeline :

(Cet article de blog a été traduit à l'aide d'outils basés sur l'intelligence artificielle) Article original

Ne manquez jamais un article Databricks

Abonnez-vous à notre blog et recevez les derniers articles dans votre boîte mail.