Veröffentlicht: 24. September 2019
von Burak Yavuz, Brenner Heintz und Denny Lee
Probieren Sie diese Notebook-Serie in Databricks aus
Daten entwickeln und sammeln sich, wie unsere Erfahrungen, stetig an. Um Schritt zu halten, müssen unsere mentalen Modelle der Welt sich an neue Daten anpassen, von denen einige neue Dimensionen enthalten – neue Arten, Dinge zu sehen, von denen wir vorher keine Vorstellung hatten. Diese mentalen Modelle ähneln dem Schema einer Tabelle, das definiert, wie wir neue Informationen kategorisieren und verarbeiten.
Dies bringt uns zum Thema Schemamanagement. So wie sich Geschäftsprobleme und -anforderungen im Laufe der Zeit weiterentwickeln, so entwickelt sich auch die Struktur Ihrer Daten. Mit Delta Lake, ist die Einbeziehung neuer Dimensionen einfach, wenn sich die Daten ändern. Benutzer haben Zugriff auf einfache Semantik, um das Schema ihrer Tabellen zu steuern. Zu diesen Tools gehören die Schemaerzwingung, die verhindert, dass Benutzer ihre Tabellen versehentlich mit Fehlern oder ungültigen Daten verunreinigen, sowie die Schemaentwicklung, die es ihnen ermöglicht, automatisch neue Spalten mit umfangreichen Daten hinzuzufügen, wenn diese Spalten dazugehören. In diesem Blog werden wir uns mit der Verwendung dieser Tools befassen.
Jeder DataFrame in Apache Spark™ enthält ein Schema, einen Bauplan, der die Form der Daten definiert, wie z. B. Datentypen und Spalten, sowie Metadaten. Mit Delta Lake wird das Schema der Tabelle im JSON-Format innerhalb des Transaktionsprotokolls gespeichert.
Die Schemaerzwingung, auch bekannt als Schemavalidierung, ist eine Schutzmaßnahme in Delta Lake, die die Datenqualität sicherstellt, indem sie Schreibvorgänge in eine Tabelle ablehnt, die nicht mit dem Schema der Tabelle übereinstimmen. Wie der Empfangsmitarbeiter in einem belebten Restaurant, der nur Reservierungen annimmt, prüft er, ob jede Spalte in den in die Tabelle eingefügten Daten auf seiner Liste der erwarteten Spalten steht (mit anderen Worten, ob jede eine "Reservierung" hat), und lehnt alle Schreibvorgänge mit Spalten ab, die nicht auf der Liste stehen.
Delta Lake verwendet die Schemavalidierung beim Schreiben, was bedeutet, dass alle neuen Schreibvorgänge in eine Tabelle zum Zeitpunkt des Schreibens auf Kompatibilität mit dem Schema der Zieltabelle geprüft werden. Wenn das Schema nicht kompatibel ist, bricht Delta Lake die gesamte Transaktion ab (es werden keine Daten geschrieben) und löst eine Ausnahme aus, um den Benutzer über die Nichtübereinstimmung zu informieren.
Um festzustellen, ob ein Schreibvorgang in eine Tabelle kompatibel ist, verwendet Delta Lake die folgenden Regeln. Der zu schreibende DataFrame:
Um dies zu veranschaulichen, sehen Sie sich an, was im folgenden Code passiert, wenn versucht wird, einige neu berechnete Spalten an eine Delta Lake-Tabelle anzuhängen, die noch nicht für die Annahme eingerichtet ist.
Anstatt die neuen Spalten automatisch hinzuzufügen, erzwingt Delta Lake das Schema und verhindert, dass der Schreibvorgang stattfindet. Um zu ermitteln, welche Spalte(n) die Nichtübereinstimmung verursacht hat, gibt Spark beide Schemas im Stack-Trace zum Vergleich aus.
Da es sich um eine so strenge Prüfung handelt, ist die Schemaerzwingung ein hervorragendes Werkzeug, das als Gatekeeper für einen sauberen, vollständig transformierten Datensatz verwendet werden kann, der für die Produktion oder den Verbrauch bereit ist. Sie wird in der Regel für Tabellen erzwungen, die Folgendes direkt speisen:
Um ihre Daten für diese letzte Hürde vorzubereiten, verwenden viele Benutzer eine einfache "Multi-Hop"-Architektur, die ihren Tabellen schrittweise Struktur hinzufügt. Weitere Informationen finden Sie im Beitrag Productionizing Machine Learning With Delta Lake.
Natürlich kann die Schemaerzwingung überall in Ihrer Pipeline verwendet werden, aber seien Sie sich bewusst, dass es etwas frustrierend sein kann, wenn Ihr Streaming-Schreibvorgang in eine Tabelle fehlschlägt, weil Sie beispielsweise vergessen haben, dass Sie der eingehenden Daten eine einzelne Spalte hinzugefügt haben.
An diesem Punkt fragen Sie sich vielleicht, was soll der ganze Aufwand? Schließlich kann ein unerwarteter Fehler "Schema-Nichtübereinstimmung" Ihren Workflow durcheinander bringen, insbesondere wenn Sie neu bei Delta Lake sind. Warum nicht einfach das Schema so ändern lassen, wie es sein muss, damit ich meinen DataFrame schreiben kann, egal was passiert?
Wie das alte Sprichwort sagt: "Eine Unze Vorbeugung ist mehr wert als ein Pfund Heilung." Irgendwann, wenn Sie Ihr Schema nicht erzwingen, werden Probleme mit der Datenkompatibilität ihre hässlichen Köpfe erheben – scheinbar homogene Rohdatensourcen können Edge-Fälle, beschädigte Spalten, falsch formatierte Zuordnungen oder andere beängstigende Dinge enthalten, die in der Nacht passieren. Ein viel besserer Ansatz ist es, diese Feinde an den Toren aufzuhalten – mithilfe der Schemaerzwingung – und sich im Tageslicht mit ihnen auseinanderzusetzen, anstatt später, wenn sie in den schattigen Nischen Ihres Produktionscodes lauern.
Die Schemaerzwingung bietet Ihnen die Gewissheit, dass sich das Schema Ihrer Tabelle nur dann ändert, wenn Sie die positive Entscheidung treffen, es zu ändern. Sie verhindert die "Verwässerung" von Daten, die auftreten kann, wenn neue Spalten so häufig angehängt werden, dass ehemals umfangreiche, prägnante Tabellen aufgrund der Datenflut ihre Bedeutung und Nützlichkeit verlieren. Indem sie Sie ermutigt, absichtlich zu handeln, hohe Standards zu setzen und hohe Qualität zu erwarten, tut die Schemaerzwingung genau das, wofür sie entwickelt wurde – sie hält Sie ehrlich und Ihre Tabellen sauber.
Wenn Sie nach weiterer Überprüfung feststellen, dass Sie diese neue Spalte wirklich hinzufügen wollten, ist dies eine einfache Einzeilenkorrektur, wie unten beschrieben. Die Lösung ist die Schemaentwicklung!
Die Schemaentwicklung ist eine Funktion, mit der Benutzer das aktuelle Schema einer Tabelle einfach ändern können, um Daten aufzunehmen, die sich im Laufe der Zeit ändern. Am häufigsten wird sie bei der Durchführung eines Anfüge- oder Überschreibvorgangs verwendet, um das Schema automatisch an eine oder mehrere neue Spalten anzupassen.
Im Anschluss an das Beispiel aus dem vorherigen Abschnitt können Entwickler die Schemaentwicklung einfach verwenden, um die neuen Spalten hinzuzufügen, die zuvor aufgrund einer Schema-Nichtübereinstimmung abgelehnt wurden. Die Schemaentwicklung wird aktiviert, indem Sie Ihrem .write- oder .writeStream-Spark-Befehl .option('mergeSchema', 'true') hinzufügen.
Um das Diagramm anzuzeigen, führen Sie die folgende Spark SQL-Anweisung aus.
Alternativ können Sie diese Option für die gesamte Spark-Sitzung festlegen, indem Sie spark.databricks.delta.schema.autoMerge = True zu Ihrer Spark-Konfiguration hinzufügen. Seien Sie vorsichtig bei der Verwendung, da die Schemaerzwingung Sie nicht mehr vor unbeabsichtigten Schema-Nichtübereinstimmungen warnt.
Durch die Einbeziehung der Option mergeSchema in Ihre Abfrage werden alle Spalten, die im DataFrame, aber nicht in der Zieltabelle vorhanden sind, automatisch als Teil einer Schreibtransaktion am Ende des Schemas hinzugefügt. Verschachtelte Felder können ebenfalls hinzugefügt werden, und diese Felder werden auch am Ende ihrer jeweiligen Strukturspalten hinzugefügt.
Dateningenieure und -wissenschaftler können diese Option verwenden, um ihren bestehenden Produktions-Tabellen für maschinelles Lernen neue Spalten hinzuzufügen (vielleicht eine neu verfolgte Metrik oder eine Spalte mit den Umsatzzahlen dieses Monats), ohne bestehende Modelle zu beschädigen, die auf den alten Spalten basieren.
Die folgenden Arten von Schemaänderungen sind für die Schemaentwicklung während Tabellenanhängen oder -überschreibungen berechtigt:
Andere Änderungen, die nicht für die Schemaentwicklung berechtigt sind, erfordern, dass das Schema und die Daten durch Hinzufügen von .option("overwriteSchema", "true") überschrieben werden. Wenn beispielsweise die Spalte "Foo" ursprünglich ein integer-Datentyp war und das neue Schema ein String-Datentyp wäre, müssten alle Parquet-Dateien (Daten) neu geschrieben werden. Zu diesen Änderungen gehören:
Schließlich wird mit der kommenden Version von Spark 3.0 explizites DDL (mit ALTER TABLE) vollständig unterstützt, sodass Benutzer die folgenden Aktionen an Tabellenschemas durchführen können:
Die Schemaentwicklung kann immer dann verwendet werden, wenn Sie beabsichtigen, das Schema Ihrer Tabelle zu ändern (im Gegensatz zu dem Fall, in dem Sie versehentlich Spalten zu Ihrem DataFrame hinzugefügt haben, die dort nicht sein sollten). Sie ist der einfachste Weg, Ihr Schema zu migrieren, da sie automatisch die richtigen Spaltennamen und Datentypen hinzufügt, ohne dass Sie diese explizit deklarieren müssen.
Die Schemaerzwingung lehnt alle neuen Spalten oder andere Schemaänderungen ab, die nicht mit Ihrer Tabelle kompatibel sind. Durch das Festlegen und Aufrechterhalten dieser hohen Standards können Analysten und Ingenieure darauf vertrauen, dass ihre Daten ein Höchstmaß an Integrität aufweisen, und sie mit Klarheit begründen, sodass sie bessere Geschäftsentscheidungen treffen können.
Auf der anderen Seite der Medaille ergänzt die Schemaentwicklung die Erzwingung, indem sie es einfach macht, beabsichtigte Schemaänderungen automatisch vorzunehmen. Schließlich sollte es nicht schwer sein, eine Spalte hinzuzufügen.
Die Schemaerzwingung ist das Yin zur Schemaentwicklungs Yang. Wenn sie zusammen verwendet werden, machen diese Funktionen es einfacher denn je, den Lärm auszublenden und sich auf das Signal einzustellen.
Wir möchten uns auch bei Mukul Murthy und Pranav Anand für ihre Beiträge zu diesem Blog bedanken.
Artikel in dieser Serie:
Eintauchen in Delta Lake #1: Auspacken des Transaktionsprotokolls
Eintauchen in Delta Lake #2: Schemaerzwingung und -entwicklung
Eintauchen in Delta Lake #3: DML-Interna (Aktualisieren, Löschen, Zusammenführen)
Productionizing Machine Learning With Delta Lake
Was ist ein Data Lake?
(Dieser Blogbeitrag wurde mit KI-gestützten Tools übersetzt.) Originalbeitrag
