<em>Ottieni un'anteprima di </em><a href="https://www.databricks.com/resources/ebook/delta-lake-running-oreilly?itm_data=operationsmetricsdl-blog-oreillydlupandrunning" target="_blank"><em>l'ebook di O'Reilly</em></a><em> per la guida passo passo necessaria per iniziare a usare Delta Lake.</em></p><blockquote><p>Prova questo <a href="https://github.com/databricks/tech-talks/blob/master/samples/Schema%20Evolution%20in%20Merge%20Operations.ipynb" rel="noopener noreferrer" target="_blank">notebook</a> per riprodurre i passaggi descritti di seguito</p></blockquote><p>Abbiamo recentemente annunciato il rilascio di <a href="https://github.com/delta-io/delta/releases/tag/v0.6.0">Delta Lake 0.6.0</a>, che introduce l'evoluzione dello schema e miglioramenti delle prestazioni nelle operazioni di merge e metriche operative nella cronologia delle tabelle. Le funzionalità principali di questo rilascio sono:</p><ul><li>Supporto per l'evoluzione dello schema nelle operazioni di merge (<a href="https://github.com/delta-io/delta/issues/170">#170</a>) - Ora puoi far evolvere automaticamente lo schema della tabella con l'operazione di merge. Questo è utile in scenari in cui desideri aggiornare i dati di modifica in una tabella e lo schema dei dati cambia nel tempo. Invece di rilevare e applicare le modifiche dello schema prima dell'aggiornamento, il merge può far evolvere contemporaneamente lo schema e aggiornare le modifiche.</li><li><strong>Prestazioni di merge migliorate con ripartizionamento automatico</strong> (<a href="https://github.com/delta-io/delta/issues/349">#349</a>) - Quando esegui il merge in tabelle partizionate, puoi scegliere di ripartizionare automaticamente i dati in base alle colonne di partizione prima di scrivere nella tabella. Nei casi in cui l'operazione di merge su una tabella partizionata è lenta perché genera troppi file piccoli (<a href="https://github.com/delta-io/delta/issues/345">#345</a>), l'abilitazione del ripartizionamento automatico (spark.delta.merge.repartitionBeforeWrite) può migliorare le prestazioni.</li><li><strong>Prestazioni migliorate quando non è presente una clausola insert</strong> (<a href="https://github.com/delta-io/delta/issues/342">#342</a>) - Ora puoi ottenere prestazioni migliori in un'operazione di merge se non ha alcuna clausola insert.</li><li><strong>Metriche operative in DESCRIBE HISTORY</strong> (<a href="https://github.com/delta-io/delta/issues/312">#312</a>) - Ora puoi vedere le metriche operative (ad esempio, numero di file e righe modificate) per tutte le scritture, aggiornamenti ed eliminazioni su una tabella Delta nella cronologia della tabella.</li><li><strong>Supporto per la lettura di tabelle Delta da qualsiasi file system</strong> (<a href="https://github.com/delta-io/delta/issues/347">#347</a>) - Ora puoi leggere tabelle Delta su qualsiasi sistema di archiviazione con un'implementazione Hadoop FileSystem. Tuttavia, la scrittura su tabelle Delta richiede ancora la configurazione di un'implementazione LogStore che fornisca le garanzie necessarie sul sistema di archiviazione.</li></ul><h2>Evoluzione dello schema nelle operazioni di merge</h2><p>Come notato nelle versioni precedenti di Delta Lake, Delta Lake include la capacità di <a href="https://www.databricks.com/blog/2019/10/03/simple-reliable-upserts-and-deletes-on-delta-lake-tables-using-python-apis.html" rel="noopener noreferrer" target="_blank">eseguire operazioni di merge</a> per semplificare le tue <a href="https://www.youtube.com/watch?v=7ewmcdrylsA">operazioni di insert/update/delete in un'unica operazione atomica</a>, nonché la capacità di <a href="https://www.databricks.com/blog/2019/09/24/diving-into-delta-lake-schema-enforcement-evolution.html" rel="noopener noreferrer" target="_blank">applicare ed evolvere il tuo schema</a> (maggiori dettagli sono disponibili anche in questo <a href="https://www.youtube.com/watch?v=tjb10n5wVs8">tech talk</a>). Con il rilascio di Delta Lake 0.6.0, ora puoi far evolvere il tuo schema all'interno di un'operazione di merge.</p><p>Mostriamo questo utilizzando un esempio tempestivo; puoi trovare il campione di codice originale in <a href="https://github.com/databricks/tech-talks/blob/master/samples/Schema%20Evolution%20in%20Merge%20Operations.ipynb">questo notebook</a>. Inizieremo con un piccolo sottoinsieme del <a href="https://github.com/CSSEGISandData/COVID-19">2019 Novel Coronavirus COVID-19 (2019-nCoV) Data Repository by Johns Hopkins CSSE</a> che abbiamo reso disponibile in <a href="https://docs.databricks.com/data/databricks-datasets.html">/databricks-datasets</a>. Questo è un set di dati comunemente utilizzato da ricercatori e analisti per ottenere informazioni sul numero di casi di COVID-19 in tutto il mondo. Uno dei problemi con i dati è che lo schema cambia nel tempo.</p><p>Ad esempio, i file che rappresentano i casi di COVID-19 dal 1 marzo al 21 marzo (al 30 aprile 2020) hanno lo schema seguente:</p><pre># Import old_data old_data = (spark.read.option("inferSchema", True).option("header", True)... .csv(/databricks-datasets/COVID/.../03-21-2020.csv)) old_data.printSchema() root |-- Province/State: string (nullable = true) |-- Country/Region: string (nullable = true) |-- Last Update: timestamp (nullable = true) |-- Confirmed: integer (nullable = true) |-- Deaths: integer (nullable = true) |-- Recovered: integer (nullable = true) |-- Latitude: double (nullable = true) |-- Longitude: double (nullable = true) </pre><p>Ma i file dal 22 marzo in poi (al 30 aprile) avevano colonne aggiuntive tra cui FIPS, Admin2, Active e Combined_Key.</p><pre>new_data = (spark.read.option("inferSchema", True).option("header", True)... .csv(/databricks-datasets/COVID/.../04-21-2020.csv)) new_data.printSchema() root |-- FIPS: integer (nullable = true) |-- Admin2: string (nullable = true) |-- Province_State: string (nullable = true) |-- Country_Region: string (nullable = true) |-- Last_Update: string (nullable = true) |-- Lat: double (nullable = true) |-- Long_: double (nullable = true) |-- Confirmed: integer (nullable = true) |-- Deaths: integer (nullable = true) |-- Recovered: integer (nullable = true) |-- Active: integer (nullable = true) |-- Combined_Key: string (nullable = true) </pre><p><em><i class="text-center">Nel nostro codice di esempio, abbiamo rinominato alcune delle colonne (ad es. Long_ -> Longitude, Province/State -> Province_State, ecc.) poiché sono semanticamente le stesse. Invece di far evolvere lo schema della tabella, abbiamo semplicemente rinominato le colonne.</i></em></p><p>Se la preoccupazione principale fosse solo unire gli schemi, potremmo utilizzare la funzionalità di evoluzione dello schema di Delta Lake utilizzando l'opzione “mergeSchema” in <code>DataFrame.write()</code>, come mostrato nell'istruzione seguente.</p><pre>new_data.write.option("mergeSchema", "true").mode("append").save(path) </pre><p>Ma cosa succede se è necessario aggiornare un valore esistente e unire lo schema allo stesso tempo? Con Delta Lake 0.6.0, questo può essere ottenuto con l'<em>evoluzione dello schema per le operazioni di merge</em>. Per visualizzare questo, iniziamo rivedendo i vecchi dati che sono <em>una riga</em>.</p><pre>old_data.select("process_date", "Province_State", "Country_Region", "Last_Update", "Confirmed").show() +------------+--------------+--------------+-------------------+---------+ |process_date|Province_State|Country_Region| Last_Update|Confirmed| +------------+--------------+--------------+-------------------+---------+ | 2020-03-21| Washington| US|2020-03-21 22:43:04| 1793| +------------+--------------+--------------+-------------------+---------+ </pre><p>Successivamente simuliamo una voce di aggiornamento che segue lo schema di new_data</p><pre># Simulate an Updated Entry items = [(53, '', 'Washington', 'US', '2020-04-27T19:00:00', 47.4009, -121.4905, 1793, 94, 0, '', '', '2020-03-21', 2)] cols = ['FIPS', 'Admin2', 'Province_State', 'Country_Region', 'Last_Update', 'Latitude', 'Longitude', 'Confirmed', 'Deaths', 'Recovered', 'Active', 'Combined_Key', 'process_date', 'level'] simulated_update = spark.createDataFrame(items, cols) </pre><p>e uniamo simulated_update e new_data con un totale di <em>40 righe</em>.</p><pre>new_data.select("process_date", "FIPS", "Province_State", "Country_Region", "Last_Update", "Confirmed").sort(col("FIPS")).show(5) +------------+-----+--------------+--------------+-------------------+---------+ |process_date| FIPS|Province_State|Country_Region| Last_Update|Confirmed| +------------+-----+--------------+--------------+-------------------+---------+ | 2020-03-21| 53| Washington| US|2020-04-27T19:00:00| 1793| | 2020-04-11|53001| Washington| US|2020-04-11 22:45:33| 30| | 2020-04-11|53003| Washington| US|2020-04-11 22:45:33| 4| | 2020-04-11|53005| Washington| US|2020-04-11 22:45:33| 244| | 2020-04-11|53007| Washington| US|2020-04-11 22:45:33| 53| +------------+-----+--------------+--------------+-------------------+---------+ </pre><p>Abbiamo impostato il seguente parametro per configurare il tuo ambiente per l'<a href="https://docs.delta.io/latest/delta-update.html#automatic-schema-evolution">evoluzione automatica dello schema</a>:</p><pre># Enable automatic schema evolution spark.sql("SET spark.databricks.delta.schema.autoMerge.enabled = true") </pre><p>Ora possiamo eseguire una singola operazione atomica per aggiornare i valori (dal 21/03/2020) e unire il nuovo schema con la seguente istruzione.</p><pre>from delta.tables import * deltaTable = DeltaTable.forPath(spark, DELTA_PATH) # Schema Evolution with a Merge Operation deltaTable.alias("t").merge( new_data.alias("s"), "s.process_date = t.process_date AND s.province_state = t.province_state AND s.country_region = t.country_region AND s.level = t.level" ).whenMatchedUpdateAll( ).whenNotMatchedInsertAll( ).execute() </pre><p>Rivediamo la tabella Delta Lake con la seguente istruzione:</p><pre># Load the data spark.read.format("delta").load(DELTA_PATH) .select("process_date", "FIPS", "Province_State", "Country_Region", "Last_Update", "Confirmed", "Admin2") .sort(col("FIPS")) .show() +------------+-----+--------------+--------------+-------------------+---------+ |process_date| FIPS|Province_State|Country_Region| Last_Update|Confirmed|Admin| +------------+-----+--------------+--------------+-------------------+---------+-----+ | 2020-03-21| 53| Washington| US|2020-04-27T19:00:00| 1793| | | 2020-04-11|53001| Washington| US|2020-04-11 22:45:33| 30| Adams | | 2020-04-11|53003| Washington| US|2020-04-11 22:45:33| 4| Asotin | | 2020-04-11|53005| Washington| US|2020-04-11 22:45:33| 244| Benton | | 2020-04-11|53007| Washington| US|2020-04-11 22:45:33| 53| Chelan | +------------+-----+--------------+--------------+-------------------+---------+-----+ </pre><h2>Metriche Operative</h2><p>Puoi approfondire le metriche operative esaminando la Cronologia della Tabella Delta Lake (colonna operationMetrics) nell'interfaccia utente di Spark eseguendo il seguente comando:</p><pre>deltaTable.history().show() </pre><p>Di seguito è riportato un output abbreviato del comando precedente.</p><pre>+-------+------+---------+--------------------+ |version|userId|operation| operationMetrics| +-------+------+---------+--------------------+ | 1|100802| MERGE|[numTargetRowsCop...| | 0|100802| WRITE|[numFiles -> 1, n...| +-------+------+---------+--------------------+ </pre><p>Noterai due versioni della tabella, una per il vecchio schema e un'altra versione per il nuovo schema. Rivedendo le metriche operative di seguito, si nota che sono state inserite 39 righe e aggiornata 1 riga.</p><pre>{ "numTargetRowsCopied":"0", "numTargetRowsDeleted":"0",{ "numTargetRowsCopied":"0", "numTargetRowsDeleted":"0", "numTargetFilesAdded":"3", "numTargetRowsInserted":"39", "numTargetRowsUpdated":"1", "numOutputRows":"40", "numSourceRows":"40", "numTargetFilesRemoved":"1" } "numTargetFilesAdded":"3", "numTargetRowsInserted":"39", "numTargetRowsUpdated":"1", "numOutputRows":"40", "numSourceRows":"40", "numTargetFilesRemoved":"1" } </pre><p>Puoi comprendere meglio i dettagli di queste metriche operative andando alla scheda SQL all'interno dell'interfaccia utente di Spark.</p><p style="max-width:450px;"><a href="https://www.databricks.com/wp-content/uploads/2020/05/blog-delta-lake-6-og3.gif" data-lightbox=" "><img class="aligncenter size-full wp-image-109752" style="width:500px;" src="https://www.databricks.com/wp-content/uploads/2020/05/blog-delta-lake-6-og3.gif" alt="Un esempio delle metriche operative ora disponibili per la revisione nell'interfaccia utente di Spark tramite Delta Lake 0.6.0" height="410"></a></p><p>La GIF animata evidenzia i componenti principali dell'interfaccia utente di Spark per la tua revisione.</p><ol><li>39 righe iniziali da un file (per il 4/11/2020 con il nuovo schema) che hanno creato il DataFrame new_data iniziale</li><li>1 riga di aggiornamento simulata generata che verrebbe unita al DataFrame new_data</li><li>1 riga da un file (per il 3/21/2020 con il vecchio schema) che ha creato il DataFrame old_data.</li><li>Un SortMergeJoin utilizzato per unire i due DataFrame per essere persistiti nella nostra tabella Delta Lake.</li></ol><p>Per approfondire come interpretare queste metriche operative, consulta la tech talk <a href="https://www.youtube.com/watch?v=7ewmcdrylsA">Diving into Delta Lake Part 3: How do DELETE, UPDATE, and MERGE work</a>.</p><p><a class="lightbox-trigger" href="https://www.youtube.com/watch?v=7ewmcdrylsA" data-lightbox=" "><img class="aligncenter size-full wp-image-97884" src="https://www.databricks.com/wp-content/uploads/2020/05/blog-delta-lake-6-2.png" alt="Una tech talk: Diving into Delta Lake Part 3: How do DELETE, UPDATE, and MERGE work." height="630"></a></p>
Prova Delta Lake con gli snippet di codice precedenti sulla tua istanza Apache Spark 2.4.5 (o superiore) (su Databricks, prova con DBR 6.6+). Delta Lake rende i tuoi data lake più affidabili (sia che tu ne crei uno nuovo o migri un data lake esistente). Per saperne di più, consulta https://delta.io/ e unisciti alla community di Delta Lake tramite Slack e Google Group. Puoi monitorare tutte le prossime release e le funzionalità pianificate in GitHub milestones. Puoi anche provare Managed Delta Lake su Databricks con un account gratuito.
Desideriamo ringraziare i seguenti collaboratori per gli aggiornamenti, le modifiche alla documentazione e i contributi in Delta Lake 0.6.0: Ali Afroozeh, Andrew Fogarty, Anurag870, Burak Yavuz, Erik LaBianca, Gengliang Wang, IonutBoicuAms, Jakub Orłowski, Jose Torres, KevinKarlBob, Michael Armbrust, Pranav Anand, Rahul Govind, Rahul Mahadev, Shixiong Zhu, Steve Suh, Tathagata Das, Timothy Zhang, Tom van Bussel, Wesley Hoffman, Xiao Li, chet, Eugene Koifman, Herman van Hovell, hongdd, lswyyy, lys0716, Mahmoud Mahdi, Maryann Xue
(Questo post sul blog è stato tradotto utilizzando strumenti basati sull'intelligenza artificiale) Post originale
