Ir al contenido principal

Evolución del esquema en operaciones de fusión y métricas operativas en Delta Lake

Delta Lake 0.6.0 introduce la evolución del esquema y mejoras de rendimiento en las métricas de fusión y operativas en el historial de la tabla

An example of the operational metrics now available for review in the Spark UI through Delta Lake 0.6.0

Publicado: 19 de mayo de 2020

Soluciones8 min de lectura

 <em>Obtén una vista previa anticipada del nuevo ebook de O'Reilly de </em><a href="https://www.databricks.com/resources/ebook/delta-lake-running-oreilly?itm_data=operationsmetricsdl-blog-oreillydlupandrunning" target="_blank"><em>O'Reilly</em></a><em> para obtener la guía paso a paso que necesitas para empezar a usar Delta Lake.</em></p><blockquote><p>Prueba este <a href="https://github.com/databricks/tech-talks/blob/master/samples/Schema%20Evolution%20in%20Merge%20Operations.ipynb" rel="noopener noreferrer" target="_blank">notebook</a> para reproducir los pasos descritos a continuación.</p></blockquote><p>Recientemente anunciamos el lanzamiento de <a href="https://github.com/delta-io/delta/releases/tag/v0.6.0">Delta Lake 0.6.0</a>, que introduce la evolución del esquema y mejoras de rendimiento en las operaciones de fusión (merge) y métricas operativas en el historial de tablas. Las características clave de esta versión son:</p><ul><li>Soporte para la evolución del esquema en operaciones de fusión (<a href="https://github.com/delta-io/delta/issues/170">#170</a>): ahora puedes evolucionar automáticamente el esquema de la tabla con la operación de fusión. Esto es útil en escenarios donde deseas actualizar datos de cambios en una tabla y el esquema de los datos cambia con el tiempo. En lugar de detectar y aplicar cambios de esquema antes de la actualización, la fusión puede evolucionar el esquema y actualizar los cambios simultáneamente.</li><li><strong>Mejora del rendimiento de la fusión con reasignación automática</strong> (<a href="https://github.com/delta-io/delta/issues/349">#349</a>): al fusionar tablas particionadas, puedes optar por reasignar automáticamente los datos por las columnas de partición antes de escribir en la tabla. En casos donde la operación de fusión en una tabla particionada es lenta porque genera demasiados archivos pequeños (<a href="https://github.com/delta-io/delta/issues/345">#345</a>), habilitar la reasignación automática (spark.delta.merge.repartitionBeforeWrite) puede mejorar el rendimiento.</li><li><strong>Mejora del rendimiento cuando no hay cláusula de inserción</strong> (<a href="https://github.com/delta-io/delta/issues/342">#342</a>): ahora puedes obtener un mejor rendimiento en una operación de fusión si no tiene ninguna cláusula de inserción.</li><li><strong>Métricas de operación en DESCRIBE HISTORY</strong> (<a href="https://github.com/delta-io/delta/issues/312">#312</a>): ahora puedes ver métricas de operación (por ejemplo, número de archivos y filas modificadas) para todas las escrituras, actualizaciones y eliminaciones en una tabla Delta en el historial de la tabla.</li><li><strong>Soporte para leer tablas Delta desde cualquier sistema de archivos</strong> (<a href="https://github.com/delta-io/delta/issues/347">#347</a>): ahora puedes leer tablas Delta en cualquier sistema de almacenamiento con una implementación de Hadoop FileSystem. Sin embargo, escribir en tablas Delta todavía requiere configurar una implementación de LogStore que proporcione las garantías necesarias en el sistema de almacenamiento.</li></ul><h2>Evolución del Esquema en Operaciones de Fusión</h2><p>Como se señaló en versiones anteriores de Delta Lake, Delta Lake incluye la capacidad de <a href="https://www.databricks.com/blog/2019/10/03/simple-reliable-upserts-and-deletes-on-delta-lake-tables-using-python-apis.html" rel="noopener noreferrer" target="_blank">ejecutar operaciones de fusión</a> para simplificar tus <a href="https://www.youtube.com/watch?v=7ewmcdrylsA">operaciones de inserción/actualización/eliminación en una sola operación atómica</a>, así como la capacidad de <a href="https://www.databricks.com/blog/2019/09/24/diving-into-delta-lake-schema-enforcement-evolution.html" rel="noopener noreferrer" target="_blank">aplicar y evolucionar tu esquema</a> (más detalles también se pueden encontrar en esta <a href="https://www.youtube.com/watch?v=tjb10n5wVs8">charla técnica</a>). Con el lanzamiento de Delta Lake 0.6.0, ahora puedes evolucionar tu esquema dentro de una operación de fusión.</p><p>Mostremos esto usando un ejemplo oportuno; puedes encontrar la muestra de código original en <a href="https://github.com/databricks/tech-talks/blob/master/samples/Schema%20Evolution%20in%20Merge%20Operations.ipynb">este notebook</a>. Comenzaremos con un pequeño subconjunto del <a href="https://github.com/CSSEGISandData/COVID-19">Repositorio de Datos COVID-19 (2019-nCoV) del 2019 del Laboratorio de Datos de la Universidad Johns Hopkins CSSE</a> que hemos puesto a disposición en <a href="https://docs.databricks.com/data/databricks-datasets.html">/databricks-datasets</a>. Este es un conjunto de datos comúnmente utilizado por investigadores y analistas para obtener información sobre el número de casos de COVID-19 en todo el mundo. Uno de los problemas con los datos es que el esquema cambia con el tiempo.</p><p>Por ejemplo, los archivos que representan los casos de COVID-19 del 1 de marzo al 21 de marzo (a partir del 30 de abril de 2020) tienen el siguiente esquema:</p><pre># Importar old_data old_data = (spark.read.option("inferSchema", True).option("header", True)... .csv(/databricks-datasets/COVID/.../03-21-2020.csv)) old_data.printSchema() root |-- Province/State: string (nullable = true) |-- Country/Region: string (nullable = true) |-- Last Update: timestamp (nullable = true) |-- Confirmed: integer (nullable = true) |-- Deaths: integer (nullable = true) |-- Recovered: integer (nullable = true) |-- Latitude: double (nullable = true) |-- Longitude: double (nullable = true) </pre><p>Pero los archivos del 22 de marzo en adelante (a partir del 30 de abril) tenían columnas adicionales, incluyendo FIPS, Admin2, Active y Combined_Key.</p><pre>new_data = (spark.read.option("inferSchema", True).option("header", True)... .csv(/databricks-datasets/COVID/.../04-21-2020.csv)) new_data.printSchema() root |-- FIPS: integer (nullable = true) |-- Admin2: string (nullable = true) |-- Province_State: string (nullable = true) |-- Country_Region: string (nullable = true) |-- Last_Update: string (nullable = true) |-- Lat: double (nullable = true) |-- Long_: double (nullable = true) |-- Confirmed: integer (nullable = true) |-- Deaths: integer (nullable = true) |-- Recovered: integer (nullable = true) |-- Active: integer (nullable = true) |-- Combined_Key: string (nullable = true) </pre><p><em><i class="text-center">En nuestro código de ejemplo, renombramos algunas de las columnas (por ejemplo, Long_ -&gt; Longitude, Province/State -&gt; Province_State, etc.) ya que son semánticamente iguales. En lugar de evolucionar el esquema de la tabla, simplemente renombramos las columnas.</i></em></p><p>Si la preocupación principal fuera simplemente fusionar los esquemas, podríamos usar la característica de evolución de esquemas de Delta Lake utilizando la opción “mergeSchema” en <code>DataFrame.write()</code>, como se muestra en la siguiente declaración.</p><pre>new_data.write.option("mergeSchema", "true").mode("append").save(path) </pre><p>Pero, ¿qué sucede si necesitas actualizar un valor existente y fusionar el esquema al mismo tiempo? Con Delta Lake 0.6.0, esto se puede lograr con la <em>evolución del esquema para operaciones de fusión</em>. Para visualizar esto, comencemos revisando old_data, que es <em>una fila</em>.</p><pre>old_data.select("process_date", "Province_State", "Country_Region", "Last_Update", "Confirmed").show() +------------+--------------+--------------+-------------------+---------+ |process_date|Province_State|Country_Region| Last_Update|Confirmed| +------------+--------------+--------------+-------------------+---------+ | 2020-03-21| Washington| US|2020-03-21 22:43:04| 1793| +------------+--------------+--------------+-------------------+---------+ </pre><p>A continuación, simularemos una entrada de actualización que sigue el esquema de new_data</p><pre># Simular una entrada actualizada items = [(53, '', 'Washington', 'US', '2020-04-27T19:00:00', 47.4009, -121.4905, 1793, 94, 0, '', '', '2020-03-21', 2)] cols = ['FIPS', 'Admin2', 'Province_State', 'Country_Region', 'Last_Update', 'Latitude', 'Longitude', 'Confirmed', 'Deaths', 'Recovered', 'Active', 'Combined_Key', 'process_date', 'level'] simulated_update = spark.createDataFrame(items, cols) </pre><p>y unimos simulated_update y new_data con un total de <em>40 filas</em>.</p><pre>new_data.select("process_date", "FIPS", "Province_State", "Country_Region", "Last_Update", "Confirmed").sort(col("FIPS")).show(5) +------------+-----+--------------+--------------+-------------------+---------+ |process_date| FIPS|Province_State|Country_Region| Last_Update|Confirmed| +------------+-----+--------------+--------------+-------------------+---------+ | 2020-03-21| 53| Washington| US|2020-04-27T19:00:00| 1793| | 2020-04-11|53001| Washington| US|2020-04-11 22:45:33| 30| | 2020-04-11|53003| Washington| US|2020-04-11 22:45:33| 4| | 2020-04-11|53005| Washington| US|2020-04-11 22:45:33| 244| | 2020-04-11|53007| Washington| US|2020-04-11 22:45:33| 53| +------------+-----+--------------+--------------+-------------------+---------+ </pre><p>Establecemos el siguiente parámetro para configurar tu entorno para la <a href="https://docs.delta.io/latest/delta-update.html#automatic-schema-evolution">evolución automática del esquema</a>:</p><pre># Habilitar la evolución automática del esquema spark.sql("SET spark.databricks.delta.schema.autoMerge.enabled = true") </pre><p>Ahora podemos ejecutar una operación atómica única para actualizar los valores (del 21/03/2020) y fusionar el nuevo esquema con la siguiente declaración.</p><pre>from delta.tables import * deltaTable = DeltaTable.forPath(spark, DELTA_PATH) # Evolución del esquema con una operación de fusión deltaTable.alias("t").merge( new_data.alias("s"), "s.process_date = t.process_date AND s.province_state = t.province_state AND s.country_region = t.country_region AND s.level = t.level" ).whenMatchedUpdateAll( ).whenNotMatchedInsertAll( ).execute() </pre><p>Revisemos la tabla Delta Lake con la siguiente declaración:</p><pre># Cargar los datos spark.read.format("delta").load(DELTA_PATH) .select("process_date", "FIPS", "Province_State", "Country_Region", "Last_Update", "Confirmed", "Admin2") .sort(col("FIPS")) .show() +------------+-----+--------------+--------------+-------------------+---------+ |process_date| FIPS|Province_State|Country_Region| Last_Update|Confirmed|Admin| +------------+-----+--------------+--------------+-------------------+---------+-----+ | 2020-03-21| 53| Washington| US|2020-04-27T19:00:00| 1793| | | 2020-04-11|53001| Washington| US|2020-04-11 22:45:33| 30| Adams | | 2020-04-11|53003| Washington| US|2020-04-11 22:45:33| 4| Asotin | | 2020-04-11|53005| Washington| US|2020-04-11 22:45:33| 244| Benton | | 2020-04-11|53007| Washington| US|2020-04-11 22:45:33| 53| Chelan | +------------+-----+--------------+--------------+-------------------+---------+-----+ </pre><h2>Métricas Operacionales</h2><p>Puedes profundizar en las métricas operacionales revisando el historial de la tabla Delta Lake (columna operationMetrics) en la Spark UI ejecutando la siguiente instrucción:</p><pre>deltaTable.history().show() </pre><p>A continuación, se muestra una salida abreviada del comando anterior.</p><pre>+-------+------+---------+--------------------+ |version|userId|operation| operationMetrics| +-------+------+---------+--------------------+ | 1|100802| MERGE|[numTargetRowsCop...| | 0|100802| WRITE|[numFiles -&gt; 1, n...| +-------+------+---------+--------------------+ </pre><p>Notarás dos versiones de la tabla, una para el esquema antiguo y otra para el esquema nuevo. Al revisar las métricas operacionales a continuación, se indica que se insertaron 39 filas y se actualizó 1 fila.</p><pre>{ "numTargetRowsCopied":"0", "numTargetRowsDeleted":"0",{ "numTargetRowsCopied":"0", "numTargetRowsDeleted":"0", "numTargetFilesAdded":"3", "numTargetRowsInserted":"39", "numTargetRowsUpdated":"1", "numOutputRows":"40", "numSourceRows":"40", "numTargetFilesRemoved":"1" } "numTargetFilesAdded":"3", "numTargetRowsInserted":"39", "numTargetRowsUpdated":"1", "numOutputRows":"40", "numSourceRows":"40", "numTargetFilesRemoved":"1" } </pre><p>Puedes entender más detalles sobre estas métricas operacionales yendo a la pestaña SQL dentro de la Spark UI.</p><p style="max-width:450px;"><a href="https://www.databricks.com/wp-content/uploads/2020/05/blog-delta-lake-6-og3.gif" data-lightbox=" "><img class="aligncenter size-full wp-image-109752" style="width:500px;" src="https://www.databricks.com/wp-content/uploads/2020/05/blog-delta-lake-6-og3.gif" alt="Un ejemplo de las métricas operacionales ahora disponibles para revisar en la Spark UI a través de Delta Lake 0.6.0" height="410"></a></p><p>El GIF animado destaca los componentes principales de la Spark UI para tu revisión.</p><ol><li>39 filas iniciales de un archivo (para el 4/11/2020 con el esquema nuevo) que crearon el DataFrame new_data inicial</li><li>1 fila de actualización simulada generada que se uniría con el DataFrame new_data</li><li>1 fila del único archivo (para el 3/21/2020 con el esquema antiguo) que creó el DataFrame old_data.</li><li>Un SortMergeJoin utilizado para unir los dos DataFrames y persistirlos en nuestra tabla Delta Lake.</li></ol><p>Para profundizar en cómo interpretar estas métricas operacionales, consulta la charla técnica <a href="https://www.youtube.com/watch?v=7ewmcdrylsA">Diving into Delta Lake Part 3: How do DELETE, UPDATE, and MERGE work</a>.</p><p><a class="lightbox-trigger" href="https://www.youtube.com/watch?v=7ewmcdrylsA" data-lightbox=" "><img class="aligncenter size-full wp-image-97884" src="https://www.databricks.com/wp-content/uploads/2020/05/blog-delta-lake-6-2.png" alt="Charla técnica: Diving into Delta Lake Part 3: How do DELETE, UPDATE, and MERGE work." height="630"></a></p>

LÍDER 5X

Gartner®: Databricks, líder en bases de datos en la nube

Comienza con Delta Lake 0.6.0

Prueba Delta Lake con los fragmentos de código anteriores en tu instancia de Apache Spark 2.4.5 (o superior) (en Databricks, prueba esto con DBR 6.6+). Delta Lake hace que tus data lakes sean más confiables (ya sea que crees uno nuevo o migres uno existente). Para obtener más información, consulta https://delta.io/ y únete a la comunidad de Delta Lake a través de Slack y el Grupo de Google. Puedes seguir todos los próximos lanzamientos y funciones planificadas en GitHub milestones. También puedes probar Delta Lake administrado en Databricks con una cuenta gratuita.

Créditos

Queremos agradecer a los siguientes colaboradores por las actualizaciones, cambios en la documentación y contribuciones en Delta Lake 0.6.0: Ali Afroozeh, Andrew Fogarty, Anurag870, Burak Yavuz, Erik LaBianca, Gengliang Wang, IonutBoicuAms, Jakub Orłowski, Jose Torres, KevinKarlBob, Michael Armbrust, Pranav Anand, Rahul Govind, Rahul Mahadev, Shixiong Zhu, Steve Suh, Tathagata Das, Timothy Zhang, Tom van Bussel, Wesley Hoffman, Xiao Li, chet, Eugene Koifman, Herman van Hovell, hongdd, lswyyy, lys0716, Mahmoud Mahdi, Maryann Xue

(Esta entrada del blog ha sido traducida utilizando herramientas basadas en inteligencia artificial) Publicación original

No te pierdas ninguna publicación de Databricks.

Suscríbete a nuestro blog y recibe las últimas publicaciones en tu bandeja de entrada.