Passa al contenuto principale

Implementazione di un data warehouse dimensionale con Databricks SQL, parte 3

Costruzione dei Flussi di Lavoro ETL dei Fatti

dimensional data modeling pt 3

Pubblicato: 27 maggio 2025

Soluzioni12 min di lettura

Summary

  • Estratti Delta: Implementazione di estratti dati incrementali dai sistemi operativi utilizzando timestamp per identificare record nuovi o aggiornati.
  • Membri in Arrivo Tardivo: Gestione dei dati dimensionali in arrivo tardivo inserendo i record mancanti per garantire l'integrità referenziale con le tabelle dei fatti.
  • Pubblicazione Tabelle dei Fatti: Pubblicazione dei dati nelle tabelle dei fatti abbinando chiavi di business con chiavi surrogate dalle tabelle delle dimensioni.

La modellazione dimensionale è un approccio collaudato per la creazione di data warehouse pronti per l'analisi. Sebbene molte organizzazioni si stiano spostando verso piattaforme moderne come Databricks, queste tecniche fondamentali sono ancora valide.

Nella Parte 1, abbiamo progettato il nostro schema dimensionale. Nella Parte 2, abbiamo creato pipeline ETL per le tabelle delle dimensioni. Ora, nella Parte 3, implementiamo la logica ETL per le tabelle dei fatti, enfatizzando efficienza e integrità.

Tabelle dei fatti ed estratti delta

Nel primo articolo, abbiamo definito la tabella dei fatti, FactInternetSales, come mostrato di seguito. Rispetto alle nostre tabelle delle dimensioni, la tabella dei fatti è relativamente stretta in termini di lunghezza dei record, con solo riferimenti di chiavi esterne alle nostre tabelle delle dimensioni, le nostre misure dei fatti, i campi delle dimensioni degenerate e un singolo campo di metadati presente:

NOTA: Nell'esempio seguente, abbiamo modificato l'istruzione CREATE TABLE dal nostro primo articolo per includere le definizioni delle chiavi esterne invece di definirle in istruzioni ALTER TABLE separate. Abbiamo anche incluso un vincolo di chiave primaria sui campi delle dimensioni degenerate per essere più espliciti sul loro ruolo in questa tabella dei fatti.

La definizione della tabella è abbastanza semplice, ma vale la pena soffermarsi un attimo sul campo di metadati LastModifiedDateTime. Mentre le tabelle dei fatti sono relativamente strette in termini di numero di campi, tendono ad essere molto profonde in termini di numero di righe. Le tabelle dei fatti spesso contengono milioni, se non miliardi, di record, spesso derivati da attività operative ad alto volume. Invece di tentare di ricaricare la tabella con un estratto completo ad ogni ciclo ETL, di solito limiteremo i nostri sforzi ai nuovi record e a quelli che sono stati modificati.

A seconda del sistema sorgente e della sua infrastruttura sottostante, ci sono molti modi per identificare quali record operativi devono essere estratti con un dato ciclo ETL. Le capacità di change data capture (CDC) implementate sul lato operativo sono i meccanismi più affidabili. Ma quando questi non sono disponibili, spesso ci affidiamo ai timestamp registrati con ogni record di transazione al momento della sua creazione e modifica. Questo approccio non è infallibile per il rilevamento delle modifiche, ma come ogni sviluppatore ETL esperto confermerà, è spesso il meglio che abbiamo.

NOTA: L'introduzione di Lakeflow Connect offre un'opzione interessante per eseguire il change data capture su database relazionali. Questa funzionalità è in anteprima al momento della stesura di questo articolo. Tuttavia, man mano che la funzionalità matura per espandersi a sempre più RDBMS, ci aspettiamo che fornisca un meccanismo efficace ed efficiente per gli estratti incrementali.

Nella nostra tabella dei fatti, il campo LastModifiedDateTime cattura tale valore di timestamp registrato nel sistema operativo. Prima di estrarre i dati dal nostro sistema operativo, esamineremo la tabella dei fatti per identificare il valore più recente di questo campo che abbiamo registrato. Quel valore sarà il punto di partenza per il nostro estratto incrementale (noto anche come delta).

GUIDA

La tua guida compatta all'analitica moderna

Il flusso di lavoro ETL dei fatti

Il flusso di lavoro di alto livello per il nostro ETL dei fatti procederà come segue:

  1. Recupera il valore più recente di LastModifiedDateTime dalla nostra tabella dei fatti.
  2. Estrai i dati transazionali pertinenti dal sistema sorgente con timestamp uguali o successivi al valore più recente di LastModifiedDateTime.
  3. Esegui eventuali passaggi aggiuntivi di pulizia dei dati richiesti sui dati estratti.
  4. Pubblica eventuali valori di membri arrivati in ritardo nelle dimensioni associate.
  5. Esegui il lookup dei valori delle chiavi esterne dalle dimensioni associate.
  6. Pubblica i dati nella tabella dei fatti.

Per rendere questo flusso di lavoro più facile da capire, descriveremo le sue fasi chiave nelle sezioni seguenti. A differenza del post sull'ETL dimensionale, implementeremo la nostra logica per questo flusso di lavoro utilizzando una combinazione di SQL e Python, in base a quale linguaggio rende ogni passaggio più semplice da implementare. Ancora una volta, uno dei punti di forza della Databricks Platform è il suo supporto per più linguaggi. Invece di presentarlo come una scelta obbligata presa all'inizio di un'implementazione, mostreremo come gli ingegneri dei dati possono passare rapidamente da uno all'altro all'interno di una singola implementazione.

Passaggi 1-3: Fase di estrazione Delta

I primi due passaggi del nostro flusso di lavoro si concentrano sull'estrazione di informazioni nuove e recentemente aggiornate dal nostro sistema operativo. Nel primo passaggio, eseguiamo una semplice ricerca del valore registrato più recente per LastModifiedDateTime. Se la tabella dei fatti è vuota, come dovrebbe essere all'inizializzazione, definiamo un valore predefinito sufficientemente lontano nel tempo da catturare tutti i dati pertinenti nel sistema di origine:

Ora possiamo estrarre i dati richiesti dal nostro sistema operativo utilizzando quel valore. Sebbene questa query includa molti dettagli, concentra la tua attenzione sulla clausola WHERE, dove utilizziamo il valore del timestamp osservato più di recente dal passaggio precedente per recuperare gli elementi di riga individuali che sono nuovi o modificati (o associati a ordini di vendita nuovi o modificati):

Come prima, i dati estratti vengono persistiti in una tabella nel nostro schema di staging, accessibile solo ai nostri ingegneri dei dati, prima di procedere ai passaggi successivi del flusso di lavoro. Se dobbiamo eseguire ulteriori pulizie dei dati, dovremmo farlo ora.

Fase 4: Fase dei membri in arrivo tardivo

La sequenza tipica in un ciclo ETL di data warehouse consiste nell'eseguire i nostri flussi di lavoro ETL dimensionali e poi i nostri flussi di lavoro di fatto poco dopo. Organizzando i nostri processi in questo modo, possiamo garantire meglio che tutte le informazioni necessarie per collegare i nostri record di fatto ai dati dimensionali siano disponibili. Tuttavia, esiste una stretta finestra temporale entro la quale arrivano nuovi dati orientati alla dimensione e vengono acquisiti da un record transazionale rilevante per il fatto. Tale finestra si allarga se si verifica un errore nel ciclo ETL complessivo che ritarda l'estrazione dei dati di fatto. E, naturalmente, ci possono sempre essere errori referenziali nei sistemi di origine che consentono a dati discutibili di apparire in un record transazionale.

Per proteggerci da questo problema, inseriremo in una data tabella dimensionale tutti i valori delle chiavi di business trovati nei nostri dati di fatto in staging ma non nell'insieme dei record correnti (non scaduti) per quella dimensione. Questo approccio creerà un record con una chiave di business (naturale) e una chiave surrogata a cui la tabella dei fatti può fare riferimento. Questi record verranno contrassegnati come in arrivo tardivo se la dimensione di destinazione è una SCD di Tipo 2, in modo da poterli aggiornare in modo appropriato nel ciclo ETL successivo.

Per iniziare, compileremo un elenco dei campi chiave di business nei nostri dati di staging. Qui, stiamo sfruttando rigide convenzioni di denominazione che ci consentono di identificare questi campi dinamicamente:

NOTA: Stiamo passando a Python per i seguenti esempi di codice. Databricks supporta l'uso di più linguaggi, anche all'interno dello stesso flusso di lavoro. In questo esempio, Python ci offre un po' più di flessibilità pur rimanendo allineato ai concetti SQL, rendendo questo approccio accessibile a sviluppatori SQL più tradizionali.

Notare che abbiamo separato le nostre chiavi di data dalle altre chiavi di business. Ci torneremo tra un po', ma per ora concentriamoci sulle chiavi non di data (altre) in questa tabella.

Per ogni chiave di business non di data, possiamo utilizzare le nostre convenzioni di denominazione dei campi e delle tabelle per identificare la tabella dimensionale che dovrebbe contenere tale chiave ed eseguire quindi un left-semi join (simile a un confronto NOT IN() ma supportando il confronto multi-colonna se necessario) per identificare eventuali valori per quella colonna nella tabella di staging ma non nella tabella dimensionale. Quando troviamo un valore non corrispondente, lo inseriamo semplicemente nella tabella dimensionale con l'impostazione appropriata per il campo IsLateArriving:

Questa logica funzionerebbe bene per i nostri riferimenti dimensionali di data se volessimo garantire che i nostri record di fatto si collegassero a voci valide. Tuttavia, molti sistemi BI downstream implementano logiche che richiedono che la dimensione data contenga una serie continua e ininterrotta di date tra i valori più bassi e più alti registrati. Qualora incontrassimo una data prima o dopo l'intervallo di valori nella tabella, non solo dovremmo inserire il membro mancante, ma creare i valori aggiuntivi necessari per preservare un intervallo ininterrotto. Per questo motivo, abbiamo bisogno di una logica leggermente diversa per eventuali date in arrivo tardivo:

Se non hai molta esperienza con Databricks o Spark SQL, la query al centro di quest'ultimo passaggio è probabilmente estranea.  La funzione sequence() crea una sequenza di valori basata su un inizio e una fine specificati.  Il risultato è un array che possiamo poi espandere (usando la funzione explode()) in modo che ogni elemento dell'array formi una riga in un set di risultati.  Da lì, confrontiamo semplicemente l'intervallo richiesto con ciò che è presente nella tabella delle dimensioni per identificare quali elementi devono essere inseriti. Con quell'inserimento, ci assicuriamo di avere una chiave surrogata implementata in questa dimensione come smart key in modo che i nostri record di fatto abbiano qualcosa a cui fare riferimento. 

Passaggi 5 - 6: Fase di pubblicazione dei dati

Ora che possiamo essere sicuri che tutte le chiavi di business nella nostra tabella di staging possano essere associate a record nelle loro dimensioni corrispondenti, possiamo procedere alla pubblicazione nella tabella dei fatti.

Il primo passo in questo processo è cercare i valori delle chiavi esterne per queste chiavi di business.  Questo può essere fatto come parte di un unico passaggio di pubblicazione, ma l'elevato numero di join nella query spesso rende questo approccio difficile da mantenere. Per questo motivo, potremmo adottare l'approccio meno efficiente ma più facile da comprendere e modificare di cercare i valori delle chiavi esterne una chiave di business alla volta e aggiungere tali valori alla nostra tabella di staging:

Ancora una volta, stiamo sfruttando le convenzioni di denominazione per rendere questa logica più semplice da implementare.  Poiché la nostra dimensione data è una dimensione role-playing e quindi segue una convenzione di denominazione più variabile, implementiamo una logica leggermente diversa per quelle chiavi di business.

A questo punto, la nostra tabella di staging contiene chiavi di business e valori di chiavi surrogate insieme alle nostre misure, campi di dimensione degeneri e il valore LastModifiedDate estratto dal nostro sistema sorgente. Per rendere la pubblicazione più gestibile, dovremmo allineare i campi disponibili con quelli supportati dalla tabella dei fatti.  Per fare ciò, dobbiamo eliminare le chiavi di business:

NOTA: Il dataframe source è definito nel blocco di codice precedente.

Con i campi allineati, il passaggio di pubblicazione è semplice. Abbiniamo i nostri record in arrivo a quelli nella tabella dei fatti in base ai campi di dimensione degeneri, che fungono da identificatore univoco per i nostri record di fatti, e quindi aggiorniamo o inseriamo i valori secondo necessità:

Prossimi passi

Speriamo che questa serie di blog sia stata informativa per coloro che cercano di costruire modelli dimensionali sulla Databricks Platform.  Ci aspettiamo che molti esperti di questo approccio di modellazione dei dati e dei flussi di lavoro ETL associati troveranno Databricks familiare, accessibile e in grado di supportare pattern consolidati con modifiche minime rispetto a quanto potrebbe essere stato implementato su piattaforme RDBMS. Laddove emergono modifiche, come la capacità di implementare la logica del flusso di lavoro utilizzando una combinazione di Python e SQL, speriamo che gli ingegneri dei dati trovino che ciò renda il loro lavoro più semplice da implementare e supportare nel tempo.

Per saperne di più su Databricks SQL, visita il nostro sito web o leggi la documentazione. Puoi anche dare un'occhiata al tour del prodotto per Databricks SQL. Supponiamo che tu voglia migrare il tuo data warehouse esistente a un data warehouse serverless ad alte prestazioni con un'ottima esperienza utente e un costo totale inferiore. In tal caso, Databricks SQL è la soluzione — provalo gratuitamente.

(Questo post sul blog è stato tradotto utilizzando strumenti basati sull'intelligenza artificiale) Post originale

Non perdere mai un post di Databricks

Iscriviti al nostro blog e ricevi gli ultimi post direttamente nella tua casella di posta elettronica.