<em>Obtenez un aperçu en avant-première du nouveau livre électronique d'O'Reilly, </em><a href="https://www.databricks.com/resources/ebook/delta-lake-running-oreilly?itm_data=operationsmetricsdl-blog-oreillydlupandrunning" target="_blank"><em>O'Reilly's new ebook</em></a><em> pour obtenir les instructions étape par étape dont vous avez besoin pour commencer à utiliser Delta Lake.</em></p><blockquote><p>Essayez ce <a href="https://github.com/databricks/tech-talks/blob/master/samples/Schema%20Evolution%20in%20Merge%20Operations.ipynb" rel="noopener noreferrer" target="_blank">notebook</a> pour reproduire les étapes décrites ci-dessous.</p></blockquote><p>Nous avons récemment annoncé la publication de <a href="https://github.com/delta-io/delta/releases/tag/v0.6.0">Delta Lake 0.6.0</a>, qui introduit l'évolution du schéma et des améliorations de performance dans les métriques de fusion et opérationnelles dans l'historique des tables. Les principales fonctionnalités de cette version sont :</p><ul><li>Prise en charge de l'évolution du schéma dans les opérations de fusion (<a href="https://github.com/delta-io/delta/issues/170">#170</a>) - Vous pouvez désormais faire évoluer automatiquement le schéma de la table avec l'opération de fusion. Ceci est utile dans les scénarios où vous souhaitez mettre à jour des données de modification dans une table et que le schéma des données change avec le temps. Au lieu de détecter et d'appliquer les modifications de schéma avant la mise à jour, la fusion peut faire évoluer simultanément le schéma et mettre à jour les modifications.</li><li><strong>Amélioration des performances de fusion avec repartitionnement automatique</strong> (<a href="https://github.com/delta-io/delta/issues/349">#349</a>) - Lors de la fusion dans des tables partitionnées, vous pouvez choisir de repartitionner automatiquement les données par les colonnes de partition avant d'écrire dans la table. Dans les cas où l'opération de fusion sur une table partitionnée est lente car elle génère trop de petits fichiers (<a href="https://github.com/delta-io/delta/issues/345">#345</a>), l'activation du repartitionnement automatique (spark.delta.merge.repartitionBeforeWrite) peut améliorer les performances.</li><li><strong>Amélioration des performances lorsqu'il n'y a pas de clause d'insertion</strong> (<a href="https://github.com/delta-io/delta/issues/342">#342</a>) - Vous pouvez désormais obtenir de meilleures performances dans une opération de fusion si elle ne contient aucune clause d'insertion.</li><li><strong>Métriques d'opération dans DESCRIBE HISTORY</strong> (<a href="https://github.com/delta-io/delta/issues/312">#312</a>) - Vous pouvez désormais voir les métriques d'opération (par exemple, le nombre de fichiers et de lignes modifiés) pour toutes les écritures, mises à jour et suppressions sur une table Delta dans l'historique de la table.</li><li><strong>Prise en charge de la lecture des tables Delta à partir de n'importe quel système de fichiers</strong> (<a href="https://github.com/delta-io/delta/issues/347">#347</a>) - Vous pouvez désormais lire les tables Delta sur n'importe quel système de stockage avec une implémentation Hadoop FileSystem. Cependant, l'écriture dans les tables Delta nécessite toujours la configuration d'une implémentation LogStore qui offre les garanties nécessaires sur le système de stockage.</li></ul><h2>Évolution du schéma dans les opérations de fusion</h2><p>Comme indiqué dans les versions précédentes de Delta Lake, Delta Lake inclut la capacité d'<a href="https://www.databricks.com/blog/2019/10/03/simple-reliable-upserts-and-deletes-on-delta-lake-tables-using-python-apis.html" rel="noopener noreferrer" target="_blank">exécuter des opérations de fusion</a> pour simplifier vos <a href="https://www.youtube.com/watch?v=7ewmcdrylsA">opérations d'insertion/mise à jour/suppression en une seule opération atomique</a>, ainsi que la capacité d'<a href="https://www.databricks.com/blog/2019/09/24/diving-into-delta-lake-schema-enforcement-evolution.html" rel="noopener noreferrer" target="_blank">appliquer et faire évoluer votre schéma</a> (plus de détails sont également disponibles dans ce <a href="https://www.youtube.com/watch?v=tjb10n5wVs8">tech talk</a>). Avec la publication de Delta Lake 0.6.0, vous pouvez désormais faire évoluer votre schéma au sein d'une opération de fusion.</p><p>Illustrons cela en utilisant un exemple d'actualité ; vous pouvez trouver l'exemple de code original dans <a href="https://github.com/databricks/tech-talks/blob/master/samples/Schema%20Evolution%20in%20Merge%20Operations.ipynb">ce notebook</a>. Nous commencerons avec un petit sous-ensemble du <a href="https://github.com/CSSEGISandData/COVID-19">référentiel de données COVID-19 (2019-nCoV) 2019 de Johns Hopkins CSSE</a> que nous avons rendu disponible dans <a href="https://docs.databricks.com/data/databricks-datasets.html">/databricks-datasets</a>. Il s'agit d'un ensemble de données couramment utilisé par les chercheurs et les analystes pour obtenir un aperçu du nombre de cas de COVID-19 dans le monde. L'un des problèmes avec les données est que le schéma change avec le temps.</p><p>Par exemple, les fichiers représentant les cas de COVID-19 du 1er mars au 21 mars (au 30 avril 2020) ont le schéma suivant :</p><pre># Importer les anciennes données old_data = (spark.read.option("inferSchema", True).option("header", True)... .csv(/databricks-datasets/COVID/.../03-21-2020.csv)) old_data.printSchema() root |-- Province/State: string (nullable = true) |-- Country/Region: string (nullable = true) |-- Last Update: timestamp (nullable = true) |-- Confirmed: integer (nullable = true) |-- Deaths: integer (nullable = true) |-- Recovered: integer (nullable = true) |-- Latitude: double (nullable = true) |-- Longitude: double (nullable = true) </pre><p>Mais les fichiers du 22 mars et suivants (au 30 avril) contenaient des colonnes supplémentaires, notamment FIPS, Admin2, Active et Combined_Key.</p><pre>new_data = (spark.read.option("inferSchema", True).option("header", True)... .csv(/databricks-datasets/COVID/.../04-21-2020.csv)) new_data.printSchema() root |-- FIPS: integer (nullable = true) |-- Admin2: string (nullable = true) |-- Province_State: string (nullable = true) |-- Country_Region: string (nullable = true) |-- Last_Update: string (nullable = true) |-- Lat: double (nullable = true) |-- Long_: double (nullable = true) |-- Confirmed: integer (nullable = true) |-- Deaths: integer (nullable = true) |-- Recovered: integer (nullable = true) |-- Active: integer (nullable = true) |-- Combined_Key: string (nullable = true) </pre><p><em><i class="text-center">Dans notre exemple de code, nous avons renommé certaines des colonnes (par exemple, Long_ -> Longitude, Province/State -> Province_State, etc.) car elles sont sémantiquement identiques. Au lieu de faire évoluer le schéma de la table, nous avons simplement renommé les colonnes.</i></em></p><p>Si la préoccupation principale était simplement de fusionner les schémas, nous pourrions utiliser la fonctionnalité d'évolution de schéma de Delta Lake en utilisant l'option « mergeSchema » dans <code>DataFrame.write()</code>, comme indiqué dans l'instruction suivante.</p><pre>new_data.write.option("mergeSchema", "true").mode("append").save(path) </pre><p>Mais que se passe-t-il si vous devez mettre à jour une valeur existante et fusionner le schéma en même temps ? Avec Delta Lake 0.6.0, cela peut être réalisé avec l'<em>évolution du schéma pour les opérations de fusion</em>. Pour visualiser cela, commençons par examiner les anciennes données qui correspondent à <em>une seule ligne</em>.</p><pre>old_data.select("process_date", "Province_State", "Country_Region", "Last_Update", "Confirmed").show() +------------+--------------+--------------+-------------------+---------+ |process_date|Province_State|Country_Region| Last_Update|Confirmed| +------------+--------------+--------------+-------------------+---------+ | 2020-03-21| Washington| US|2020-03-21 22:43:04| 1793| +------------+--------------+--------------+-------------------+---------+ </pre><p>Ensuite, simulons une entrée de mise à jour qui suit le schéma de new_data</p><pre># Simuler une entrée mise à jour items = [(53, '', 'Washington', 'US', '2020-04-27T19:00:00', 47.4009, -121.4905, 1793, 94, 0, '', '', '2020-03-21', 2)] cols = ['FIPS', 'Admin2', 'Province_State', 'Country_Region', 'Last_Update', 'Latitude', 'Longitude', 'Confirmed', 'Deaths', 'Recovered', 'Active', 'Combined_Key', 'process_date', 'level'] simulated_update = spark.createDataFrame(items, cols) </pre><p>et combinons simulated_update et new_data avec un total de <em>40 lignes</em>.</p><pre>new_data.select("process_date", "FIPS", "Province_State", "Country_Region", "Last_Update", "Confirmed").sort(col("FIPS")).show(5) +------------+-----+--------------+--------------+-------------------+---------+ |process_date| FIPS|Province_State|Country_Region| Last_Update|Confirmed| +------------+-----+--------------+--------------+-------------------+---------+ | 2020-03-21| 53| Washington| US|2020-04-27T19:00:00| 1793| | 2020-04-11|53001| Washington| US|2020-04-11 22:45:33| 30| | 2020-04-11|53003| Washington| US|2020-04-11 22:45:33| 4| | 2020-04-11|53005| Washington| US|2020-04-11 22:45:33| 244| | 2020-04-11|53007| Washington| US|2020-04-11 22:45:33| 53| +------------+-----+--------------+--------------+-------------------+---------+ </pre><p>Nous avons défini le paramètre suivant pour configurer votre environnement pour l'<a href="https://docs.delta.io/latest/delta-update.html#automatic-schema-evolution">évolution automatique du schéma</a> :</p><pre># Activer l'évolution automatique du schéma spark.sql("SET spark.databricks.delta.schema.autoMerge.enabled = true") </pre><p>Nous pouvons maintenant exécuter une opération atomique unique pour mettre à jour les valeurs (à partir du 21/03/2020) ainsi que pour fusionner le nouveau schéma avec l'instruction suivante.</p><pre>from delta.tables import * deltaTable = DeltaTable.forPath(spark, DELTA_PATH) # Évolution du schéma avec une opération de fusion deltaTable.alias("t").merge( new_data.alias("s"), "s.process_date = t.process_date AND s.province_state = t.province_state AND s.country_region = t.country_region AND s.level = t.level" ).whenMatchedUpdateAll( ).whenNotMatchedInsertAll( ).execute() </pre><p>Examinons la table Delta Lake avec l'instruction suivante :</p><pre># Load the data spark.read.format("delta").load(DELTA_PATH) .select("process_date", "FIPS", "Province_State", "Country_Region", "Last_Update", "Confirmed", "Admin2") .sort(col("FIPS")) .show() +------------+-----+--------------+--------------+-------------------+---------+ |process_date| FIPS|Province_State|Country_Region| Last_Update|Confirmed|Admin| +------------+-----+--------------+--------------+-------------------+---------+-----+ | 2020-03-21| 53| Washington| US|2020-04-27T19:00:00| 1793| | | 2020-04-11|53001| Washington| US|2020-04-11 22:45:33| 30| Adams | | 2020-04-11|53003| Washington| US|2020-04-11 22:45:33| 4| Asotin | | 2020-04-11|53005| Washington| US|2020-04-11 22:45:33| 244| Benton | | 2020-04-11|53007| Washington| US|2020-04-11 22:45:33| 53| Chelan | +------------+-----+--------------+--------------+-------------------+---------+-----+ </pre><h2>Operational Metrics</h2><p>You can further dive into the operational metrics by looking at the Delta Lake Table History (operationMetrics column) in the Spark UI by running the following statement:</p><pre>deltaTable.history().show() </pre><p>Below is an abbreviated output from the preceding command.</p><pre>+-------+------+---------+--------------------+ |version|userId|operation| operationMetrics| +-------+------+---------+--------------------+ | 1|100802| MERGE|[numTargetRowsCop...| | 0|100802| WRITE|[numFiles -> 1, n...| +-------+------+---------+--------------------+ </pre><p>You will notice two versions of the table, one for the old schema and another version for the new schema. When reviewing the operational metrics below, it notes that there were 39 rows inserted and 1 row updated.</p><pre>{ "numTargetRowsCopied":"0", "numTargetRowsDeleted":"0",{ "numTargetRowsCopied":"0", "numTargetRowsDeleted":"0", "numTargetFilesAdded":"3", "numTargetRowsInserted":"39", "numTargetRowsUpdated":"1", "numOutputRows":"40", "numSourceRows":"40", "numTargetFilesRemoved":"1" } "numTargetFilesAdded":"3", "numTargetRowsInserted":"39", "numTargetRowsUpdated":"1", "numOutputRows":"40", "numSourceRows":"40", "numTargetFilesRemoved":"1" } </pre><p>You can understand more about the details behind these operational metrics by going to the SQL tab within the Spark UI.</p><p style="max-width:450px;"><a href="https://www.databricks.com/wp-content/uploads/2020/05/blog-delta-lake-6-og3.gif" data-lightbox=" "><img class="aligncenter size-full wp-image-109752" style="width:500px;" src="https://www.databricks.com/wp-content/uploads/2020/05/blog-delta-lake-6-og3.gif" alt="An example of the operational metrics now available for review in the Spark UI through Delta Lake 0.6.0" height="410"></a></p><p>The animated GIF calls out the main components of the Spark UI for your review.</p><ol><li>39 initial rows from one file (for 4/11/2020 with the new schema) that created the initial new_data DataFrame</li><li>1 simulated update row generated that would union with the new_data DataFrame</li><li>1 row from the one file (for 3/21/2020 with the old schema) that created the old_data DataFrame.</li><li>A SortMergeJoin used to join the two DataFrames together to be persisted in our Delta Lake table.</li></ol><p>To dive further into how to interpret these operational metrics, check out the <a href="https://www.youtube.com/watch?v=7ewmcdrylsA">Diving into Delta Lake Part 3: How do DELETE, UPDATE, and MERGE work tech talk</a>.</p><p><a class="lightbox-trigger" href="https://www.youtube.com/watch?v=7ewmcdrylsA" data-lightbox=" "><img class="aligncenter size-full wp-image-97884" src="https://www.databricks.com/wp-content/uploads/2020/05/blog-delta-lake-6-2.png" alt="ADiving into Delta Lake Part 3: How do DELETE, UPDATE, and MERGE work tech talk." height="630"></a></p>
Try out Delta Lake with the preceding code snippets on your Apache Spark 2.4.5 (or greater) instance (on Databricks, try this with DBR 6.6+). Delta Lake makes your data lakes more reliable (whether you create a new one or migrate an existing data lake). To learn more, refer to https://delta.io/, and join the Delta Lake community via Slack and Google Group. You can track all the upcoming releases and planned features in GitHub milestones. You can also try out Managed Delta Lake on Databricks with a free account.
We want to thank the following contributors for updates, doc changes, and contributions in Delta Lake 0.6.0: Ali Afroozeh, Andrew Fogarty, Anurag870, Burak Yavuz, Erik LaBianca, Gengliang Wang, IonutBoicuAms, Jakub Orłowski, Jose Torres, KevinKarlBob, Michael Armbrust, Pranav Anand, Rahul Govind, Rahul Mahadev, Shixiong Zhu, Steve Suh, Tathagata Das, Timothy Zhang, Tom van Bussel, Wesley Hoffman, Xiao Li, chet, Eugene Koifman, Herman van Hovell, hongdd, lswyyy, lys0716, Mahmoud Mahdi, Maryann Xue
(Cet article de blog a été traduit à l'aide d'outils basés sur l'intelligence artificielle) Article original
