Passa al contenuto principale

Fornire dati convenienti in tempo reale con dbt e Databricks

Delivering cost-effective data in real-time with dbt and Databricks

Pubblicato: 12 dicembre 2023

Soluzioni7 min di lettura

Man mano che le aziende crescono, i volumi di dati passano da GB a TB (o più) e le richieste di latenza passano da ore a minuti (o meno), diventa sempre più costoso fornire insight aggiornati al business. Storicamente, gli ingegneri di dati Python e Scala si sono rivolti allo streaming per soddisfare queste esigenze, elaborando in modo efficiente nuovi dati in tempo reale, ma gli ingegneri di analisi che necessitavano di scalare pipeline dbt basate su SQL non avevano questa opzione.

Non più! Questo blog cerca di illustrare come possiamo utilizzare le nuove tabelle in streaming e le viste materializzate su Databricks per fornire insight freschi e in tempo reale alle aziende con la semplicità di SQL e dbt.

Background

Al Data + AI Summit 2023, abbiamo introdotto Streaming Tables e Materialized Views in Databricks SQL. Questa fantastica funzionalità ha offerto agli utenti di Databricks SQL un facile accesso a nuove potenti materializzazioni di tabelle introdotte per la prima volta in Delta Live Tables, consentendo loro di incrementare grandi query, eseguire lo streaming direttamente da origini dati di eventi e altro ancora.

Oltre all'utilizzo nativo di Streaming Tables e Materialized Views all'interno di un ambiente Databricks, funzionano anche per gli utenti dbt su Databricks. dbt-databricks è diventato uno dei modi più popolari per costruire modelli di dati su Databricks, sfruttando tutte le potenti capacità di Databricks SQL, incluso il motore di calcolo Photon, i Data Warehouse SQL Serverless che scalano istantaneamente e il modello di governance Unity Catalog, con l'ubiquità del framework di trasformazione di dbt.

Cosa è cambiato in dbt-databricks?

A partire da dbt v1.6+, dbt-databricks si è evoluto in tre aspetti chiave:

  1. Nuove materializzazioni: "streaming_table" e "materialized_view"
  2. Nuova sintassi per leggere direttamente dallo storage dati cloud senza mettere in staging le origini come tabelle
  3. Accesso a concetti di streaming avanzati come aggregazioni con finestra, watermarking e join stream-stream

Nota: Tieni d'occhio l'imminente rilascio di dbt v1.7.3 che affinerà ulteriormente le capacità sopra menzionate!

Diamo un'occhiata a come possiamo utilizzare queste nuove funzionalità con la demo Airline Trips.

La demo Airline Trips

La demo Airline Trips è stata creata per dimostrare come ingerire e trasformare incrementalmente dati di eventi live per insight aziendali aggiornati su Databricks, sia per dashboard che per modelli di AI. Il set di dati rappresenta tutti i viaggi aerei effettuati negli Stati Uniti nel tempo, catturando i ritardi nelle partenze e negli arrivi per ogni viaggio.

Un notebook di supporto incluso stabilisce uno stream simulato da questo set di dati, mentre il progetto dbt mostra un modello di dati che prende questi eventi JSON grezzi e li trasforma tramite ETL in streaming in uno strato di Materialized Views, tabelle di feature e altro ancora.

Il repository è disponibile pubblicamente qui e utilizza dati di esempio inclusi in tutti gli spazi di lavoro Databricks "out-of-the-box". Sentiti libero di seguirci!

Il modello di dati dei viaggi aerei
Il modello di dati dei viaggi aerei
GUIDA

La tua guida compatta all'analitica moderna

Ingestione di dati dallo storage dati cloud

Uno dei modi più semplici per iniziare a utilizzare le tabelle in streaming è per l'ingestione di dati dallo storage dati cloud, come S3 per AWS o ADLS per Azure. Potresti avere una sorgente dati upstream che genera dati di eventi ad alto volume e un processo per farli atterrare come file grezzi in una posizione di storage, tipicamente json, csv, parquet o avro.

Nella nostra demo, immagina di ricevere un feed live di ogni viaggio aereo effettuato negli Stati Uniti da una parte esterna e di volerlo ingerire incrementalmente man mano che arriva.

Invece di mettere in staging i file come tabella esterna, o utilizzare uno strumento di terze parti per materializzare una tabella Delta per la sorgente dati, possiamo semplicemente usare le tabelle in streaming per risolvere questo problema. Prendi il modello seguente per il nostro feed di viaggi aerei bronze:

I due punti chiave da notare sono:

  • La strategia di materializzazione è impostata su 'streaming_table'
    • Questo eseguirà un comando CREATE OR REFRESH STREAMING TABLE in Databricks
  • La sintassi per leggere dallo storage cloud sfrutta Auto Loader sotto il cofano
    • read_files() elencherà i nuovi file json nella cartella specificata e inizierà a elaborarli. Poiché utilizziamo dbt, abbiamo sfruttato la funzione var() in dbt per passare dinamicamente un percorso di cartella s3 (della forma "s3://…")
    • La parola chiave STREAM indica di eseguire lo streaming da questa posizione. In alternativa, senza di essa possiamo ancora usare read_files() con materialized='table' per eseguire una lettura batch direttamente dalla cartella specificata

Per inciso, mentre Auto Loader richiede la minima configurazione, puoi anche eseguire lo streaming direttamente da una piattaforma di streaming di eventi come Kafka, Kinesis o Event Hubs per una latenza ancora inferiore utilizzando una sintassi molto simile. Vedi qui per maggiori dettagli.

Arricchimento incrementale dei dati per lo strato silver

Lo streaming non deve fermarsi allo stadio di ingestione. Se vogliamo eseguire alcune join downstream o aggiungere una chiave surrogata, ma vogliamo limitarlo solo ai nuovi dati per risparmiare sui costi di calcolo, possiamo continuare a utilizzare la materializzazione Streaming Table. Ad esempio, prendi lo snippet dal nostro prossimo modello per lo strato silver, il feed arricchito dei viaggi aerei, dove uniamo tabelle di mappatura per i codici aeroportuali ai dati grezzi:

Ancora una volta, abbiamo utilizzato la materializzazione Streaming Table e siamo stati in grado di sfruttare la funzionalità standard di dbt per tutta la nostra logica. Questo include:

  • Sfruttare il pacchetto dbt_utils per scorciatoie utili come la generazione di una chiave surrogata
  • Utilizzare l'istruzione ref() per mantenere la lineage completa

L'unica vera modifica al nostro SQL è stata l'aggiunta della parola chiave STREAM() attorno all'istruzione ref() per airline_trips_bronze, per indicare che questa tabella viene letta in modo incrementale, mentre la tabella airport_codes che viene unita è una tabella di mappatura letta per intero. Questo è chiamato un join stream-static.

Creazione di uno strato gold efficiente dal punto di vista computazionale con Materialized Views

Con le nostre tabelle silver arricchite pronte, possiamo ora pensare a come vogliamo servire insight aggregati ai nostri utenti aziendali finali. Tipicamente, se utilizzassimo una materializzazione di tabella, dovremmo ricalcolare tutti i risultati storici ogni volta.

Per sfruttare le tabelle in streaming upstream che elaborano solo nuovi dati in ogni esecuzione, ci rivolgiamo invece alle Viste Materializzate per questo compito!

La buona notizia in Databricks è che un modello che crea una Vista Materializzata non è diverso da un modello che crea una tabella! Prendiamo il nostro esempio per una Vista Materializzata dello strato gold per calcolare la percentuale di voli in ritardo ogni giorno:

Abbiamo cambiato solo la configurazione di materializzazione!

Ricorda, le Viste Materializzate possono essere aggiornate incrementalmente quando ci sono modifiche alle tabelle di base. Nello scenario sopra, mentre elaboriamo in streaming nuovi dati, la Vista Materializzata determina quali gruppi richiedono il ricalcolo ed elabora solo questi, lasciando invariate le aggregazioni esistenti e riducendo i costi computazionali complessivi. Questo è più facile da visualizzare nell'esempio poiché aggreghiamo per ArrDate, la data di arrivo dei voli, il che significa che i nuovi giorni di dati rientreranno naturalmente in nuovi gruppi e i gruppi esistenti rimarranno invariati.

Analizzando i log degli eventi della Vista Materializzata (illustrata di seguito) dopo diverse esecuzioni del modello, possiamo vedere la gradualità in azione. La prima esecuzione è un calcolo completo come qualsiasi tabella, ma una seconda esecuzione per aggiornare le aggregazioni con nuovi dati sfrutta un aggiornamento incrementale riga per riga. Un'ultima esecuzione del modello ha riconosciuto che nessun nuovo dato era stato ingerito upstream e semplicemente non ha fatto nulla.

Materialized view event log
Materialized view event log

Cos'altro posso trovare nel repository di demo?

Abbiamo coperto le basi per portare i dati direttamente dalla sorgente di eventi fino a una Materialized View pronta per la BI, ma il repository di demo contiene molto di più.

Nel repository sono inclusi esempi su come monitorare i log per Streaming Tables e Materialized Views per capire come vengono elaborati i dati, oltre a un esempio avanzato non trattato in questo blog su come unire due stream insieme in un join stream-stream usando solo SQL!

Clona il repository nel tuo ambiente Databricks per iniziare, oppure connetti dbt Cloud a Databricks senza costi aggiuntivi tramite partner connect. Puoi anche saperne di più con la documentazione per Materialized Views e Streaming Tables.

(Questo post sul blog è stato tradotto utilizzando strumenti basati sull'intelligenza artificiale) Post originale

Non perdere mai un post di Databricks

Iscriviti al nostro blog e ricevi gli ultimi post direttamente nella tua casella di posta elettronica.