<em>Obtenha uma prévia antecipada do novo ebook da </em><a href="https://www.databricks.com/resources/ebook/delta-lake-running-oreilly?itm_data=operationsmetricsdl-blog-oreillydlupandrunning" target="_blank"><em>O'Reilly</em></a><em> para obter as orientações passo a passo que você precisa para começar a usar o Delta Lake.</em></p><blockquote><p>Experimente este <a href="https://github.com/databricks/tech-talks/blob/master/samples/Schema%20Evolution%20in%20Merge%20Operations.ipynb" rel="noopener noreferrer" target="_blank">notebook</a> para reproduzir as etapas descritas abaixo.</p></blockquote><p>Anunciamos recentemente o lançamento do <a href="https://github.com/delta-io/delta/releases/tag/v0.6.0">Delta Lake 0.6.0</a>, que introduz a evolução de esquema e melhorias de desempenho em operações de merge, além de métricas operacionais no histórico da tabela. Os principais recursos desta versão são:</p><ul><li>Suporte para evolução de esquema em operações de merge (<a href="https://github.com/delta-io/delta/issues/170">#170</a>) - Agora você pode evoluir automaticamente o esquema da tabela com a operação de merge. Isso é útil em cenários onde você deseja atualizar dados de alteração em uma tabela e o esquema dos dados muda ao longo do tempo. Em vez de detectar e aplicar alterações de esquema antes de atualizar, o merge pode evoluir o esquema e atualizar as alterações simultaneamente.</li><li><strong>Melhor desempenho de merge com repartitioning automático</strong> (<a href="https://github.com/delta-io/delta/issues/349">#349</a>) - Ao fazer merge em tabelas particionadas, você pode optar por repartitionar automaticamente os dados pelas colunas de partição antes de gravar na tabela. Em casos onde a operação de merge em uma tabela particionada é lenta porque gera muitos arquivos pequenos (<a href="https://github.com/delta-io/delta/issues/345">#345</a>), habilitar o repartitioning automático (spark.delta.merge.repartitionBeforeWrite) pode melhorar o desempenho.</li><li><strong>Melhor desempenho quando não há cláusula insert</strong> (<a href="https://github.com/delta-io/delta/issues/342">#342</a>) - Agora você pode obter melhor desempenho em uma operação de merge se ela não tiver nenhuma cláusula insert.</li><li><strong>Métricas de operação em DESCRIBE HISTORY</strong> (<a href="https://github.com/delta-io/delta/issues/312">#312</a>) - Agora você pode ver métricas de operação (por exemplo, número de arquivos e linhas alteradas) para todas as gravações, atualizações e exclusões em uma tabela Delta no histórico da tabela.</li><li><strong>Suporte para leitura de tabelas Delta de qualquer sistema de arquivos</strong> (<a href="https://github.com/delta-io/delta/issues/347">#347</a>) - Agora você pode ler tabelas Delta em qualquer sistema de armazenamento com uma implementação Hadoop FileSystem. No entanto, a gravação em tabelas Delta ainda requer a configuração de uma implementação LogStore que forneça as garantias necessárias no sistema de armazenamento.</li></ul><h2>Evolução de Esquema em Operações de Merge</h2><p>Como observado em versões anteriores do Delta Lake, o Delta Lake inclui a capacidade de <a href="https://www.databricks.com/blog/2019/10/03/simple-reliable-upserts-and-deletes-on-delta-lake-tables-using-python-apis.html" rel="noopener noreferrer" target="_blank">executar operações de merge</a> para simplificar suas <a href="https://www.youtube.com/watch?v=7ewmcdrylsA">operações de insert/update/delete em uma única operação atômica</a>, bem como a capacidade de <a href="https://www.databricks.com/blog/2019/09/24/diving-into-delta-lake-schema-enforcement-evolution.html" rel="noopener noreferrer" target="_blank">impor e evoluir seu esquema</a> (mais detalhes também podem ser encontrados neste <a href="https://www.youtube.com/watch?v=tjb10n5wVs8">tech talk</a>). Com o lançamento do Delta Lake 0.6.0, você agora pode evoluir seu esquema dentro de uma operação de merge.</p><p>Vamos demonstrar isso usando um exemplo oportuno; você pode encontrar o exemplo de código original em <a href="https://github.com/databricks/tech-talks/blob/master/samples/Schema%20Evolution%20in%20Merge%20Operations.ipynb">este notebook</a>. Começaremos com um pequeno subconjunto do <a href="https://github.com/CSSEGISandData/COVID-19">Repositório de Dados da COVID-19 (2019-nCoV) de 2019 do Johns Hopkins CSSE</a>, que disponibilizamos em <a href="https://docs.databricks.com/data/databricks-datasets.html">/databricks-datasets</a>. Este é um conjunto de dados comumente usado por pesquisadores e analistas para obter insights sobre o número de casos de COVID-19 em todo o mundo. Uma das questões com os dados é que o esquema muda ao longo do tempo.</p><p>Por exemplo, os arquivos que representam os casos de COVID-19 de 1º de março a 21 de março (em 30 de abril de 2020) têm o seguinte esquema:</p><pre># Import old_data old_data = (spark.read.option("inferSchema", True).option("header", True)... .csv(/databricks-datasets/COVID/.../03-21-2020.csv)) old_data.printSchema() root |-- Province/State: string (nullable = true) |-- Country/Region: string (nullable = true) |-- Last Update: timestamp (nullable = true) |-- Confirmed: integer (nullable = true) |-- Deaths: integer (nullable = true) |-- Recovered: integer (nullable = true) |-- Latitude: double (nullable = true) |-- Longitude: double (nullable = true) </pre><p>Mas os arquivos de 22 de março em diante (em 30 de abril) tinham colunas adicionais, incluindo FIPS, Admin2, Active e Combined_Key.</p><pre>new_data = (spark.read.option("inferSchema", True).option("header", True)... .csv(/databricks-datasets/COVID/.../04-21-2020.csv)) new_data.printSchema() root |-- FIPS: integer (nullable = true) |-- Admin2: string (nullable = true) |-- Province_State: string (nullable = true) |-- Country_Region: string (nullable = true) |-- Last_Update: string (nullable = true) |-- Lat: double (nullable = true) |-- Long_: double (nullable = true) |-- Confirmed: integer (nullable = true) |-- Deaths: integer (nullable = true) |-- Recovered: integer (nullable = true) |-- Active: integer (nullable = true) |-- Combined_Key: string (nullable = true) </pre><p><em><i class="text-center">Em nosso código de exemplo, renomeamos algumas das colunas (por exemplo, Long_ -> Longitude, Province/State -> Province_State, etc.) pois elas são semanticamente iguais. Em vez de evoluir o esquema da tabela, apenas renomeamos as colunas.</i></em></p><p>Se a principal preocupação fosse apenas mesclar os esquemas, poderíamos usar o recurso de evolução de esquema do Delta Lake com a opção “mergeSchema” em <code>DataFrame.write()</code>, como mostrado na instrução a seguir.</p><pre>new_data.write.option("mergeSchema", "true").mode("append").save(path) </pre><p>Mas o que acontece se você precisar atualizar um valor existente e mesclar o esquema ao mesmo tempo? Com o Delta Lake 0.6.0, isso pode ser alcançado com a <em>evolução de esquema para operações de merge</em>. Para visualizar isso, vamos começar revisando os old_data, que é <em>uma linha</em>.</p><pre>old_data.select("process_date", "Province_State", "Country_Region", "Last_Update", "Confirmed").show() +------------+--------------+--------------+-------------------+---------+ |process_date|Province_State|Country_Region| Last_Update|Confirmed| +------------+--------------+--------------+-------------------+---------+ | 2020-03-21| Washington| US|2020-03-21 22:43:04| 1793| +------------+--------------+--------------+-------------------+---------+ </pre><p>Em seguida, vamos simular uma entrada de atualização que segue o esquema de new_data</p><pre># Simulate an Updated Entry items = [(53, '', 'Washington', 'US', '2020-04-27T19:00:00', 47.4009, -121.4905, 1793, 94, 0, '', '', '2020-03-21', 2)] cols = ['FIPS', 'Admin2', 'Province_State', 'Country_Region', 'Last_Update', 'Latitude', 'Longitude', 'Confirmed', 'Deaths', 'Recovered', 'Active', 'Combined_Key', 'process_date', 'level'] simulated_update = spark.createDataFrame(items, cols) </pre><p>e unimos simulated_update e new_data com um total de <em>40 linhas</em>.</p><pre>new_data.select("process_date", "FIPS", "Province_State", "Country_Region", "Last_Update", "Confirmed").sort(col("FIPS")).show(5) +------------+-----+--------------+--------------+-------------------+---------+ |process_date| FIPS|Province_State|Country_Region| Last_Update|Confirmed| +------------+-----+--------------+--------------+-------------------+---------+ | 2020-03-21| 53| Washington| US|2020-04-27T19:00:00| 1793| | 2020-04-11|53001| Washington| US|2020-04-11 22:45:33| 30| | 2020-04-11|53003| Washington| US|2020-04-11 22:45:33| 4| | 2020-04-11|53005| Washington| US|2020-04-11 22:45:33| 244| | 2020-04-11|53007| Washington| US|2020-04-11 22:45:33| 53| +------------+-----+--------------+--------------+-------------------+---------+ </pre><p>Definimos o seguinte parâmetro para configurar seu ambiente para <a href="https://docs.delta.io/latest/delta-update.html#automatic-schema-evolution">evolução automática de esquema</a>:</p><pre># Enable automatic schema evolution spark.sql("SET spark.databricks.delta.schema.autoMerge.enabled = true") </pre><p>Agora podemos executar uma única operação atômica para atualizar os valores (de 21/03/2020) e mesclar o novo esquema com a seguinte instrução.</p><pre>from delta.tables import * deltaTable = DeltaTable.forPath(spark, DELTA_PATH) # Schema Evolution with a Merge Operation deltaTable.alias("t").merge( new_data.alias("s"), "s.process_date = t.process_date AND s.province_state = t.province_state AND s.country_region = t.country_region AND s.level = t.level" ).whenMatchedUpdateAll( ).whenNotMatchedInsertAll( ).execute() </pre><p>Vamos revisar a tabela Delta Lake com a seguinte instrução:</p><pre># Carregar os dados spark.read.format("delta").load(DELTA_PATH) .select("process_date", "FIPS", "Province_State", "Country_Region", "Last_Update", "Confirmed", "Admin2") .sort(col("FIPS")) .show() +------------+-----+--------------+--------------+-------------------+---------+ |process_date| FIPS|Province_State|Country_Region| Last_Update|Confirmed|Admin| +------------+-----+--------------+--------------+-------------------+---------+-----+ | 2020-03-21| 53| Washington| US|2020-04-27T19:00:00| 1793| | | 2020-04-11|53001| Washington| US|2020-04-11 22:45:33| 30| Adams | | 2020-04-11|53003| Washington| US|2020-04-11 22:45:33| 4| Asotin | | 2020-04-11|53005| Washington| US|2020-04-11 22:45:33| 244| Benton | | 2020-04-11|53007| Washington| US|2020-04-11 22:45:33| 53| Chelan | +------------+-----+--------------+--------------+-------------------+---------+-----+ </pre><h2>Métricas Operacionais</h2><p>Você pode se aprofundar ainda mais nas métricas operacionais examinando o Histórico da Tabela Delta Lake (coluna operationMetrics) na Spark UI executando a seguinte instrução:</p><pre>deltaTable.history().show() </pre><p>Abaixo está uma saída abreviada do comando anterior.</p><pre>+-------+------+---------+--------------------+ |version|userId|operation| operationMetrics| +-------+------+---------+--------------------+ | 1|100802| MERGE|[numTargetRowsCop...| | 0|100802| WRITE|[numFiles -> 1, n...| +-------+------+---------+--------------------+ </pre><p>Você notará duas versões da tabela, uma para o esquema antigo e outra para o novo esquema. Ao revisar as métricas operacionais abaixo, ela indica que houve 39 linhas inseridas e 1 linha atualizada.</p><pre>{ "numTargetRowsCopied":"0", "numTargetRowsDeleted":"0",{ "numTargetRowsCopied":"0", "numTargetRowsDeleted":"0", "numTargetFilesAdded":"3", "numTargetRowsInserted":"39", "numTargetRowsUpdated":"1", "numOutputRows":"40", "numSourceRows":"40", "numTargetFilesRemoved":"1" } "numTargetFilesAdded":"3", "numTargetRowsInserted":"39", "numTargetRowsUpdated":"1", "numOutputRows":"40", "numSourceRows":"40", "numTargetFilesRemoved":"1" } </pre><p>Você pode entender mais sobre os detalhes por trás dessas métricas operacionais acessando a aba SQL dentro da Spark UI.</p><p style="max-width:450px;"><a href="https://www.databricks.com/wp-content/uploads/2020/05/blog-delta-lake-6-og3.gif" data-lightbox=" "><img class="aligncenter size-full wp-image-109752" style="width:500px;" src="https://www.databricks.com/wp-content/uploads/2020/05/blog-delta-lake-6-og3.gif" alt="Um exemplo das métricas operacionais agora disponíveis para revisão na Spark UI através do Delta Lake 0.6.0" height="410"></a></p><p>O GIF animado destaca os principais componentes da Spark UI para sua revisão.</p><ol><li>39 linhas iniciais de um arquivo (para 11/04/2020 com o novo esquema) que criaram o DataFrame new_data inicial</li><li>1 linha de atualização simulada gerada que se uniria ao DataFrame new_data</li><li>1 linha do arquivo (para 21/03/2020 com o esquema antigo) que criou o DataFrame old_data.</li><li>Um SortMergeJoin usado para unir os dois DataFrames para serem persistidos em nossa tabela Delta Lake.</li></ol><p>Para se aprofundar em como interpretar essas métricas operacionais, confira o <a href="https://www.youtube.com/watch?v=7ewmcdrylsA">tech talk Diving into Delta Lake Part 3: How do DELETE, UPDATE, and MERGE work</a>.</p><p><a class="lightbox-trigger" href="https://www.youtube.com/watch?v=7ewmcdrylsA" data-lightbox=" "><img class="aligncenter size-full wp-image-97884" src="https://www.databricks.com/wp-content/uploads/2020/05/blog-delta-lake-6-2.png" alt="Um tech talk sobre Diving into Delta Lake Part 3: How do DELETE, UPDATE, and MERGE work." height="630"></a></p>
Experimente o Delta Lake com os trechos de código anteriores em sua instância Apache Spark 2.4.5 (ou superior) (no Databricks, experimente com DBR 6.6+). O Delta Lake torna seus data lakes mais confiáveis (seja você criando um novo ou migrando um data lake existente). Para saber mais, consulte https://delta.io/ e junte-se à comunidade Delta Lake via Slack e Google Group. Você pode acompanhar todos os próximos lançamentos e recursos planejados em GitHub milestones. Você também pode experimentar o Delta Lake Gerenciado no Databricks com uma conta gratuita.
Queremos agradecer aos seguintes contribuidores por atualizações, alterações de documentação e contribuições no Delta Lake 0.6.0: Ali Afroozeh, Andrew Fogarty, Anurag870, Burak Yavuz, Erik LaBianca, Gengliang Wang, IonutBoicuAms, Jakub Orłowski, Jose Torres, KevinKarlBob, Michael Armbrust, Pranav Anand, Rahul Govind, Rahul Mahadev, Shixiong Zhu, Steve Suh, Tathagata Das, Timothy Zhang, Tom van Bussel, Wesley Hoffman, Xiao Li, chet, Eugene Koifman, Herman van Hovell, hongdd, lswyyy, lys0716, Mahmoud Mahdi, Maryann Xue
(Esta publicação no blog foi traduzida utilizando ferramentas baseadas em inteligência artificial) Publicação original
