<em>Erhalten Sie eine frühe Vorschau von </em><a href="https://www.databricks.com/resources/ebook/delta-lake-running-oreilly?itm_data=operationsmetricsdl-blog-oreillydlupandrunning" target="_blank"><em>O'Reillys neuem E-Book</em></a><em> für die Schritt-für-Schritt-Anleitung, die Sie für die Verwendung von Delta Lake benötigen.</em></p><blockquote><p>Probieren Sie dieses <a href="https://github.com/databricks/tech-talks/blob/master/samples/Schema%20Evolution%20in%20Merge%20Operations.ipynb" rel="noopener noreferrer" target="_blank">Notebook</a> aus, um die unten beschriebenen Schritte zu reproduzieren</p></blockquote><p>Wir haben kürzlich die Veröffentlichung von <a href="https://github.com/delta-io/delta/releases/tag/v0.6.0">Delta Lake 0.6.0</a> angekündigt, das Schema-Evolution und Leistungsverbesserungen bei Merge- und operativen Metriken in der Tabellenhistorie einführt. Die Hauptfunktionen dieser Version sind:</p><ul><li>Unterstützung für Schema-Evolution bei Merge-Operationen (<a href="https://github.com/delta-io/delta/issues/170">#170</a>) - Sie können jetzt das Schema der Tabelle mit der Merge-Operation automatisch weiterentwickeln. Dies ist nützlich in Szenarien, in denen Sie Änderungsdaten in eine Tabelle einfügen möchten und sich das Schema der Daten im Laufe der Zeit ändert. Anstatt Schemaänderungen vor dem Einfügen zu erkennen und anzuwenden, kann Merge gleichzeitig das Schema weiterentwickeln und die Änderungen einfügen.</li><li><strong>Verbesserte Merge-Leistung mit automatischer Repartitionierung</strong> (<a href="https://github.com/delta-io/delta/issues/349">#349</a>) - Beim Zusammenführen in partitionierte Tabellen können Sie die Daten vor dem Schreiben in die Tabelle automatisch nach den Partitionierungsspalten neu partitionieren. In Fällen, in denen die Merge-Operation auf einer partitionierten Tabelle langsam ist, da sie zu viele kleine Dateien generiert (<a href="https://github.com/delta-io/delta/issues/345">#345</a>), kann die Aktivierung der automatischen Repartitionierung (spark.delta.merge.repartitionBeforeWrite) die Leistung verbessern.</li><li><strong>Verbesserte Leistung, wenn keine INSERT-Klausel vorhanden ist</strong> (<a href="https://github.com/delta-io/delta/issues/342">#342</a>) - Sie können jetzt eine bessere Leistung bei einer Merge-Operation erzielen, wenn diese keine INSERT-Klausel enthält.</li><li><strong>Operative Metriken in DESCRIBE HISTORY</strong> (<a href="https://github.com/delta-io/delta/issues/312">#312</a>) - Sie können jetzt operative Metriken (z. B. Anzahl der geänderten Dateien und Zeilen) für alle Schreib-, Update- und Löschvorgänge in einer Delta-Tabelle in der Tabellenhistorie sehen.</li><li><strong>Unterstützung für das Lesen von Delta-Tabellen von jedem Dateisystem</strong> (<a href="https://github.com/delta-io/delta/issues/347">#347</a>) - Sie können jetzt Delta-Tabellen auf jedem Speichersystem mit einer Hadoop FileSystem-Implementierung lesen. Das Schreiben in Delta-Tabellen erfordert jedoch weiterhin die Konfiguration einer LogStore-Implementierung, die die erforderlichen Garantien für das Speichersystem bietet.</li></ul><h2>Schema-Evolution bei Merge-Operationen</h2><p>Wie in früheren Versionen von Delta Lake erwähnt, bietet Delta Lake die Möglichkeit, <a href="https://www.databricks.com/blog/2019/10/03/simple-reliable-upserts-and-deletes-on-delta-lake-tables-using-python-apis.html" rel="noopener noreferrer" target="_blank">Merge-Operationen auszuführen</a>, um Ihre <a href="https://www.youtube.com/watch?v=7ewmcdrylsA">Insert/Update/Delete-Operationen in einer einzigen atomaren Operation</a> zu vereinfachen und die Möglichkeit, <a href="https://www.databricks.com/blog/2019/09/24/diving-into-delta-lake-schema-enforcement-evolution.html" rel="noopener noreferrer" target="_blank">Ihr Schema zu erzwingen und weiterzuentwickeln</a> (weitere Details finden Sie auch in diesem <a href="https://www.youtube.com/watch?v=tjb10n5wVs8">Tech Talk</a>). Mit der Veröffentlichung von Delta Lake 0.6.0 können Sie jetzt Ihr Schema innerhalb einer Merge-Operation weiterentwickeln.</p><p>Lassen Sie uns dies anhand eines aktuellen Beispiels veranschaulichen. Den ursprünglichen Codeausschnitt finden Sie in <a href="https://github.com/databricks/tech-talks/blob/master/samples/Schema%20Evolution%20in%20Merge%20Operations.ipynb">diesem Notebook</a>. Wir beginnen mit einer kleinen Teilmenge des <a href="https://github.com/CSSEGISandData/COVID-19">2019 Novel Coronavirus COVID-19 (2019-nCoV) Data Repository von Johns Hopkins CSSE</a> Datensatzes, den wir in <a href="https://docs.databricks.com/data/databricks-datasets.html">/databricks-datasets</a> verfügbar gemacht haben. Dies ist ein Datensatz, der häufig von Forschern und Analysten verwendet wird, um Einblicke in die Anzahl der COVID-19-Fälle weltweit zu gewinnen. Eines der Probleme mit den Daten ist, dass sich das Schema im Laufe der Zeit ändert.</p><p>Zum Beispiel haben die Dateien, die COVID-19-Fälle vom 1. März bis 21. März darstellen (Stand 30. April 2020), folgendes Schema:</p><pre># Import old_data old_data = (spark.read.option("inferSchema", True).option("header", True)... .csv(/databricks-datasets/COVID/.../03-21-2020.csv)) old_data.printSchema() root |-- Province/State: string (nullable = true) |-- Country/Region: string (nullable = true) |-- Last Update: timestamp (nullable = true) |-- Confirmed: integer (nullable = true) |-- Deaths: integer (nullable = true) |-- Recovered: integer (nullable = true) |-- Latitude: double (nullable = true) |-- Longitude: double (nullable = true) </pre><p>Aber die Dateien ab dem 22. März (Stand 30. April) enthielten zusätzliche Spalten, darunter FIPS, Admin2, Active und Combined_Key.</p><pre>new_data = (spark.read.option("inferSchema", True).option("header", True)... .csv(/databricks-datasets/COVID/.../04-21-2020.csv)) new_data.printSchema() root |-- FIPS: integer (nullable = true) |-- Admin2: string (nullable = true) |-- Province_State: string (nullable = true) |-- Country_Region: string (nullable = true) |-- Last_Update: string (nullable = true) |-- Lat: double (nullable = true) |-- Long_: double (nullable = true) |-- Confirmed: integer (nullable = true) |-- Deaths: integer (nullable = true) |-- Recovered: integer (nullable = true) |-- Active: integer (nullable = true) |-- Combined_Key: string (nullable = true) </pre><p><em><i class="text-center">In unserem Beispielcode haben wir einige der Spalten umbenannt (z. B. Long_ -> Longitude, Province/State -> Province_State usw.), da sie semantisch gleich sind. Anstatt das Tabellenschema weiterzuentwickeln, haben wir einfach die Spalten umbenannt.</i></em></p><p>Wenn das Hauptanliegen nur das Zusammenführen der Schemata gewesen wäre, könnten wir die Schema-Evolution-Funktion von Delta Lake mit der Option „mergeSchema“ in <code>DataFrame.write()</code> verwenden, wie in der folgenden Anweisung gezeigt.</p><pre>new_data.write.option("mergeSchema", "true").mode("append").save(path) </pre><p>Aber was passiert, wenn Sie einen vorhandenen Wert aktualisieren und gleichzeitig das Schema zusammenführen müssen? Mit Delta Lake 0.6.0 kann dies durch <em>Schema-Evolution für Merge-Operationen</em> erreicht werden. Um dies zu veranschaulichen, beginnen wir mit der Überprüfung der alten Daten, die <em>eine Zeile</em> darstellen.</p><pre>old_data.select("process_date", "Province_State", "Country_Region", "Last_Update", "Confirmed").show() +------------+--------------+--------------+-------------------+---------+ |process_date|Province_State|Country_Region| Last_Update|Confirmed| +------------+--------------+--------------+-------------------+---------+ | 2020-03-21| Washington| US|2020-03-21 22:43:04| 1793| +------------+--------------+--------------+-------------------+---------+ </pre><p>Als Nächstes simulieren wir einen Update-Eintrag, der dem Schema von new_data folgt</p><pre># Simulate an Updated Entry items = [(53, '', 'Washington', 'US', '2020-04-27T19:00:00', 47.4009, -121.4905, 1793, 94, 0, '', '', '2020-03-21', 2)] cols = ['FIPS', 'Admin2', 'Province_State', 'Country_Region', 'Last_Update', 'Latitude', 'Longitude', 'Confirmed', 'Deaths', 'Recovered', 'Active', 'Combined_Key', 'process_date', 'level'] simulated_update = spark.createDataFrame(items, cols) </pre><p>und vereinigen simulated_update und new_data mit insgesamt <em>40 Zeilen</em>.</p><pre>new_data.select("process_date", "FIPS", "Province_State", "Country_Region", "Last_Update", "Confirmed").sort(col("FIPS")).show(5) +------------+-----+--------------+--------------+-------------------+---------+ |process_date| FIPS|Province_State|Country_Region| Last_Update|Confirmed| +------------+-----+--------------+--------------+-------------------+---------+ | 2020-03-21| 53| Washington| US|2020-04-27T19:00:00| 1793| | 2020-04-11|53001| Washington| US|2020-04-11 22:45:33| 30| | 2020-04-11|53003| Washington| US|2020-04-11 22:45:33| 4| | 2020-04-11|53005| Washington| US|2020-04-11 22:45:33| 244| | 2020-04-11|53007| Washington| US|2020-04-11 22:45:33| 53| +------------+-----+--------------+--------------+-------------------+---------+ </pre><p>Wir setzen den folgenden Parameter, um Ihre Umgebung für die <a href="https://docs.delta.io/latest/delta-update.html#automatic-schema-evolution">automatische Schema-Evolution</a> zu konfigurieren:</p><pre># Enable automatic schema evolution spark.sql("SET spark.databricks.delta.schema.autoMerge.enabled = true") </pre><p>Jetzt können wir eine einzelne atomare Operation ausführen, um die Werte (vom 21.03.2020) zu aktualisieren und das neue Schema mit der folgenden Anweisung zusammenzuführen.</p><pre>from delta.tables import * deltaTable = DeltaTable.forPath(spark, DELTA_PATH) # Schema Evolution with a Merge Operation deltaTable.alias("t").merge( new_data.alias("s"), "s.process_date = t.process_date AND s.province_state = t.province_state AND s.country_region = t.country_region AND s.level = t.level" ).whenMatchedUpdateAll( ).whenNotMatchedInsertAll( ).execute() </pre><p>Lassen Sie uns die Delta Lake-Tabelle mit der folgenden Anweisung überprüfen:</p><pre># Load the data spark.read.format("delta").load(DELTA_PATH) .select("process_date", "FIPS", "Province_State", "Country_Region", "Last_Update", "Confirmed", "Admin2") .sort(col("FIPS")) .show() +------------+-----+--------------+--------------+-------------------+---------+ |process_date| FIPS|Province_State|Country_Region| Last_Update|Confirmed|Admin| +------------+-----+--------------+--------------+-------------------+---------+-----+ | 2020-03-21| 53| Washington| US|2020-04-27T19:00:00| 1793| | | 2020-04-11|53001| Washington| US|2020-04-11 22:45:33| 30| Adams | | 2020-04-11|53003| Washington| US|2020-04-11 22:45:33| 4| Asotin | | 2020-04-11|53005| Washington| US|2020-04-11 22:45:33| 244| Benton | | 2020-04-11|53007| Washington| US|2020-04-11 22:45:33| 53| Chelan | +------------+-----+--------------+--------------+-------------------+---------+-----+ </pre><h2>Operational Metrics</h2><p>Sie können die operativen Metriken weiter untersuchen, indem Sie sich die Delta Lake Table History (Spalte operationMetrics) in der Spark UI ansehen, indem Sie die folgende Anweisung ausführen:</p><pre>deltaTable.history().show() </pre><p>Unten sehen Sie eine gekürzte Ausgabe des vorherigen Befehls.</p><pre>+-------+------+---------+--------------------+ |version|userId|operation| operationMetrics| +-------+------+---------+--------------------+ | 1|100802| MERGE|[numTargetRowsCop...| | 0|100802| WRITE|[numFiles -> 1, n...| +-------+------+---------+--------------------+ </pre><p>Sie werden zwei Versionen der Tabelle bemerken, eine für das alte Schema und eine weitere Version für das neue Schema. Wenn Sie die nachstehenden operativen Metriken überprüfen, wird darauf hingewiesen, dass 39 Zeilen eingefügt und 1 Zeile aktualisiert wurden.</p><pre>{ "numTargetRowsCopied":"0", "numTargetRowsDeleted":"0",{ "numTargetRowsCopied":"0", "numTargetRowsDeleted":"0", "numTargetFilesAdded":"3", "numTargetRowsInserted":"39", "numTargetRowsUpdated":"1", "numOutputRows":"40", "numSourceRows":"40", "numTargetFilesRemoved":"1" } "numTargetFilesAdded":"3", "numTargetRowsInserted":"39", "numTargetRowsUpdated":"1", "numOutputRows":"40", "numSourceRows":"40", "numTargetFilesRemoved":"1" } </pre><p>Sie können mehr über die Details hinter diesen operativen Metriken erfahren, indem Sie zum SQL-Tab in der Spark UI gehen.</p><p style="max-width:450px;"><a href="https://www.databricks.com/wp-content/uploads/2020/05/blog-delta-lake-6-og3.gif" data-lightbox=" "><img class="aligncenter size-full wp-image-109752" style="width:500px;" src="https://www.databricks.com/wp-content/uploads/2020/05/blog-delta-lake-6-og3.gif" alt="Ein Beispiel für die operativen Metriken, die jetzt über Delta Lake 0.6.0 in der Spark UI überprüft werden können" height="410"></a></p><p>Die animierte GIF-Datei hebt die Hauptkomponenten der Spark UI hervor, die Sie überprüfen können.</p><ol><li>39 anfängliche Zeilen aus einer Datei (für den 11.04.2020 mit dem neuen Schema), die den anfänglichen new_data DataFrame erstellten</li><li>1 simulierte Update-Zeile, die mit dem new_data DataFrame vereinigt würde</li><li>1 Zeile aus der einen Datei (für den 21.03.2020 mit dem alten Schema), die den old_data DataFrame erstellte.</li><li>Ein SortMergeJoin, der verwendet wird, um die beiden DataFrames zusammenzuführen und in unserer Delta Lake-Tabelle zu speichern.</li></ol><p>Um tiefer in die Interpretation dieser operativen Metriken einzutauchen, sehen Sie sich das <a href="https://www.youtube.com/watch?v=7ewmcdrylsA">Tech-Talk Diving into Delta Lake Part 3: How do DELETE, UPDATE, and MERGE work</a> an.</p><p><a class="lightbox-trigger" href="https://www.youtube.com/watch?v=7ewmcdrylsA" data-lightbox=" "><img class="aligncenter size-full wp-image-97884" src="https://www.databricks.com/wp-content/uploads/2020/05/blog-delta-lake-6-2.png" alt="Ein Tech-Talk: Wie DELETE, UPDATE und MERGE funktionieren" height="630"></a></p>
Probieren Sie Delta Lake mit den vorangegangenen Code-Snippets auf Ihrer Apache Spark 2.4.5 (oder höher) Instanz aus (auf Databricks versuchen Sie dies mit DBR 6.6+). Delta Lake macht Ihre Data Lakes zuverlässiger (egal, ob Sie einen neuen erstellen oder einen bestehenden Data Lake migrieren). Weitere Informationen finden Sie unter https://delta.io/, und treten Sie der Delta Lake-Community über Slack und die Google Group bei. Sie können alle kommenden Releases und geplanten Features in den GitHub-Meilensteinen verfolgen. Sie können auch Managed Delta Lake auf Databricks mit einem kostenlosen Konto ausprobieren.
Wir möchten den folgenden Mitwirkenden für Updates, Dokumentationsänderungen und Beiträge zu Delta Lake 0.6.0 danken: Ali Afroozeh, Andrew Fogarty, Anurag870, Burak Yavuz, Erik LaBianca, Gengliang Wang, IonutBoicuAms, Jakub Orłowski, Jose Torres, KevinKarlBob, Michael Armbrust, Pranav Anand, Rahul Govind, Rahul Mahadev, Shixiong Zhu, Steve Suh, Tathagata Das, Timothy Zhang, Tom van Bussel, Wesley Hoffman, Xiao Li, chet, Eugene Koifman, Herman van Hovell, hongdd, lswyyy, lys0716, Mahmoud Mahdi, Maryann Xue
(Dieser Blogbeitrag wurde mit KI-gestützten Tools übersetzt.) Originalbeitrag
