Dimensional modeling is a time-tested approach to building analytics-ready data warehouses. While many organizations are shifting to modern platforms like Databricks, these foundational techniques still apply.
In Part 1, we designed our dimensional schema. In Part 2, we built ETL pipelines for dimension tables. Now in Part 3, we implement the ETL logic for fact tables, emphasizing efficiency and integrity.
In the first blog, we defined the fact table, FactInternetSales, as shown below. Compared to our dimension tables, the fact table is relatively narrow in terms of record length, with only foreign key references to our dimension tables, our fact measures, our degenerate dimension fields and a single metadata field present:
NOTE: In the example below, we’ve altered the CREATE TABLE statement from our first post to include the foreign key definitions instead of defining these in separate ALTER TABLE statements. We’ve also included a primary key constraint on the degenerate dimension fields to be more explicit about their role more explicit in this fact table.
The table definition is fairly straightforward, but it’s worth taking a moment to discuss the LastModifiedDateTime metadata field. While fact tables are relatively narrow in terms of field count, they tend to be very deep in terms of row count. Fact tables often house millions, if not billions, of records, often derived from high-volume operational activities. Instead of attempting to reload the table with a full extract on each ETL cycle, we will typically limit our efforts to new records and those that have been changed.
Depending on the source system and its underlying infrastructure, there are many ways to identify which operational records need to be extracted with a given ETL cycle. Change data capture (CDC) capabilities implemented on the operational side are the most reliable mechanisms. But when these are unavailable, we often fall back to timestamps recorded with each transaction record as it is created and modified. The approach is not bulletproof for change detection, but as any experienced ETL developer will attest, it’s often the best we’ve got.
NOTE: The introduction of Lakeflow Connect provides an interesting option for performing change data capture on relational databases. This capability is in preview at the time of the writing of this blog. Still, as the capability matures to expand more and more RDBMSs, we expect this to provide an effective and efficient mechanism for incremental extracts.
In our fact table, the LastModifiedDateTime field captures such a timestamp value recorded in the operational system. Before extracting data from our operational system, we will review the fact table to identify the latest value for this field we’ve recorded. That value will be the starting point for our incremental (aka delta) extract.
The high-level workflow for our fact ETL will proceed as follows:
Um diesen Workflow leichter verständlich zu machen, beschreiben wir seine wichtigsten Phasen in den folgenden Abschnitten. Im Gegensatz zum Beitrag über Dimension-ETL implementieren wir die Logik für diesen Workflow mithilfe einer Kombination aus SQL und Python, je nachdem, welche Sprache die einzelnen Schritte am einfachsten umsetzbar macht. Einer der Stärken der Databricks Platform ist die Unterstützung mehrerer Sprachen. Anstatt dies als eine Alles-oder-Nichts-Entscheidung am Anfang einer Implementierung darzustellen, zeigen wir, wie Data Engineers innerhalb einer einzigen Implementierung schnell zwischen den beiden wechseln können.
Die ersten beiden Schritte unseres Workflows konzentrieren sich auf die Extraktion neuer und neu aktualisierter Informationen aus unserem operativen System. Im ersten Schritt führen wir eine einfache Abfrage des zuletzt aufgezeichneten Werts für LastModifiedDateTime. durch. Wenn die Faktentabelle leer ist, wie es bei der Initialisierung sein sollte, definieren wir einen Standardwert, der weit genug in der Vergangenheit liegt, dass wir glauben, alle relevanten Daten im Quellsystem zu erfassen:
Wir können nun die erforderlichen Daten aus unserem operativen System mithilfe dieses Werts extrahieren. Obwohl diese Abfrage recht detailliert ist, konzentrieren Sie Ihre Aufmerksamkeit auf die WHERE-Klausel, in der wir den zuletzt beobachteten Zeitstempelwert aus dem vorherigen Schritt verwenden, um die einzelnen Zeilenelemente abzurufen, die neu oder geändert sind (oder mit neuen oder geänderten Verkaufsaufträgen verknüpft sind):
Wie zuvor werden die extrahierten Daten in einer Tabelle in unserem Staging-Schema gespeichert, die nur für unsere Data Engineers zugänglich ist, bevor wir zu den nachfolgenden Schritten im Workflow übergehen. Wenn wir zusätzliche Datenbereinigungen durchführen müssen, sollten wir dies jetzt tun.
Die typische Reihenfolge in einem Data Warehouse ETL-Zyklus besteht darin, zuerst unsere Dimension-ETL-Workflows und kurz danach unsere Fakt-Workflows auszuführen. Durch die Organisation unserer Prozesse auf diese Weise können wir besser sicherstellen, dass alle Informationen, die zur Verknüpfung unserer Fakt-Datensätze mit Dimensionsdaten erforderlich sind, vorhanden sind. Es gibt jedoch ein schmales Zeitfenster, in dem neue, dimensionsorientierte Daten eintreffen und von einem faktisch relevanten Transaktionsdatensatz erfasst werden. Dieses Fenster vergrößert sich, wenn es bei einem Fehler im gesamten ETL-Zyklus zu Verzögerungen bei der Faktendatenextraktion kommt. Und natürlich kann es immer zu referenziellen Fehlern in Quellsystemen kommen, die fragwürdige Daten in einem Transaktionsdatensatz zulassen.
Um uns vor diesem Problem zu schützen, fügen wir in eine gegebene Dimensionstabelle alle Business-Schlüsselwerte ein, die in unseren gestagten Faktendaten vorhanden sind, aber nicht in der Menge der aktuellen (nicht abgelaufenen) Datensätze für diese Dimension. Dieser Ansatz erstellt einen Datensatz mit einem Business-Schlüssel (natürlicher Schlüssel) und einem Surrogatschlüssel, auf den unsere Faktentabelle verweisen kann. Diese Datensätze werden als spät ankommend gekennzeichnet, wenn die Ziel-Dimension ein Typ-2 SCD ist, damit wir sie im nächsten ETL-Zyklus entsprechend aktualisieren können.
Um uns den Einstieg zu erleichtern, stellen wir eine Liste der wichtigsten Geschäftsfelder in unseren Staging-Daten zusammen. Hier nutzen wir strenge Namenskonventionen, die es uns ermöglichen, diese Felder dynamisch zu identifizieren:
HINWEIS: Wir wechseln für die folgenden Codebeispiele zu Python. Databricks unterstützt die Verwendung mehrerer Sprachen, auch innerhalb desselben Workflows. In diesem Beispiel bietet Python etwas mehr Flexibilität und orientiert sich dennoch an SQL-Konzepten, wodurch dieser Ansatz für traditionellere SQL-Entwickler zugänglich wird.
Beachten Sie, dass wir unsere Datumsschlüssel von den anderen Business-Schlüsseln getrennt haben. Wir werden uns später wieder damit befassen, aber konzentrieren wir uns zunächst auf die Nicht-Datums-Schlüssel (andere Schlüssel) in dieser Tabelle.
Für jeden Nicht-Datums-Business-Schlüssel können wir unsere Feld- und Tabellennamenkonventionen verwenden, um die Dimensionstabelle zu identifizieren, die diesen Schlüssel enthalten sollte, und dann einen Left-Semi Join (ähnlich einem NOT IN()-Vergleich, aber unterstützt auch Multi-Column-Matching, falls erforderlich) verwenden, um alle Werte für diese Spalte in der Staging-Tabelle zu identifizieren, die nicht in der Dimensionstabelle vorhanden sind. Wenn wir einen nicht übereinstimmenden Wert finden, fügen wir ihn einfach in die Dimensionstabelle ein, mit der entsprechenden Einstellung für das Feld IsLateArriving:
Diese Logik würde für unsere Datumsdimensionsreferenzen gut funktionieren, wenn wir sicherstellen wollten, dass unsere Faktendatensätze mit gültigen Einträgen verknüpft sind. Viele nachgelagerte BI-Systeme implementieren jedoch Logik, die erfordert, dass die Datumdimension eine kontinuierliche, ununterbrochene Reihe von Daten zwischen den frühesten und spätesten aufgezeichneten Werten enthält. Sollten wir auf ein Datum vor oder nach dem Wertebereich in der Tabelle stoßen, müssen wir nicht nur das fehlende Element eingeben, sondern auch die zusätzlichen Werte erstellen, die erforderlich sind, um einen ununterbrochenen Bereich zu erhalten. Aus diesem Grund benötigen wir für spät ankommende Daten eine etwas andere Logik:
Wenn Sie nicht viel mit Databricks oder Spark SQL gearbeitet haben, ist die Abfrage im Kern dieses letzten Schritts wahrscheinlich fremd. Die sequence() Funktion erstellt eine Sequenz von Werten basierend auf einem angegebenen Start- und Endpunkt. Das Ergebnis ist ein Array, das wir dann mit explode() (explodieren) können, sodass jedes Element im Array eine Zeile in einem Ergebnis-Set bildet. Von dort vergleichen wir einfach den erforderlichen Bereich mit dem, was in der Dimensionstabelle vorhanden ist, um zu identifizieren, welche Elemente eingefügt werden müssen. Mit dieser Einfügung stellen wir sicher, dass wir einen Surrogate Key-Wert in dieser Dimension als Smart Key implementiert haben, damit unsere Fakt-Datensätze etwas zum Referenzieren haben.
Jetzt, da wir sicher sein können, dass alle Business Keys in unserer Staging-Tabelle mit Einträgen in ihren entsprechenden Dimensionen übereinstimmen, können wir mit der Veröffentlichung in die Fakt-Tabelle fortfahren.
Der erste Schritt in diesem Prozess besteht darin, die Fremdschlüsselwerte für diese Business Keys nachzuschlagen. Dies kann als Teil eines einzigen Veröffentlichungsschritts erfolgen, aber die große Anzahl von Joins in der Abfrage macht diesen Ansatz oft schwer zu warten. Aus diesem Grund könnten wir den weniger effizienten, aber leichter verständlichen und modifizierbaren Ansatz wählen, Fremdschlüsselwerte einzeln nachzuschlagen und diese Werte an unsere Staging-Tabelle anzuhängen:
Auch hier nutzen wir Namenskonventionen, um diese Logik einfacher zu implementieren. Da unsere Datum-Dimension eine Rollenspiel-Dimension ist und daher einer variableren Namenskonvention folgt, implementieren wir für diese Business Keys eine etwas andere Logik.
An diesem Punkt enthält unsere Staging-Tabelle Business Keys und Surrogate Key-Werte zusammen mit unseren Measures, Degenerate Dimension-Feldern und dem LastModifiedDate Wert, der aus unserem Quellsystem extrahiert wurde. Um die Veröffentlichung besser handhaben zu können, sollten wir die verfügbaren Felder an die des Fakt-Tabellen-Supports anpassen. Dazu müssen wir die Business Keys löschen:
HINWEIS: Der source DataFrame ist im vorherigen Codeblock definiert.
Mit den angepassten Feldern ist der Veröffentlichungsschritt unkompliziert. Wir gleichen unsere eingehenden Datensätze mit denen in der Fakt-Tabelle ab, basierend auf den Degenerate Dimension-Feldern, die als eindeutiger Identifikator für unsere Fakt-Datensätze dienen, und aktualisieren oder fügen dann Werte nach Bedarf ein:
Wir hoffen, dass diese Blog-Serie für diejenigen, die dimensionale Modelle auf der Databricks Plattform erstellen möchten, informativ war. Wir gehen davon aus, dass viele, die mit diesem Datenmodellierungsansatz und den damit verbundenen ETL-Workflows vertraut sind, Databricks vertraut, zugänglich und in der Lage finden werden, etablierte Muster mit minimalen Änderungen im Vergleich zu dem, was auf RDBMS-Plattformen implementiert worden sein mag, zu unterstützen. Wo Änderungen auftreten, wie z. B. die Möglichkeit, Workflow-Logik mithilfe einer Kombination aus Python und SQL zu implementieren, hoffen wir, dass Daten-Ingenieure feststellen werden, dass dies ihre Arbeit im Laufe der Zeit einfacher zu implementieren und zu unterstützen macht.
Um mehr über Databricks SQL zu erfahren, besuchen Sie unsere Website oder lesen Sie die Dokumentation. Sie können auch die Produkt-Tour für Databricks SQL ansehen. Wenn Sie Ihr bestehendes Warehouse in ein Hochleistungs-Data-Warehouse ohne Server mit großartiger Benutzererfahrung und niedrigeren Gesamtkosten migrieren möchten, ist Databricks SQL die Lösung — probieren Sie es kostenlos aus.
(Dieser Blogbeitrag wurde mit KI-gestützten Tools übersetzt.) Originalbeitrag
