주요 컨텐츠로 이동

Delta Lake의 병합 작업에서의 스키마 진화 및 운영 메트릭

Delta Lake 0.6.0은 병합 및 테이블 기록의 운영 메트릭에서 스키마 진화와 성능 개선을 도입합니다.

An example of the operational metrics now available for review in the Spark UI through Delta Lake 0.6.0

발행일: 2020년 5월 19일

솔루션4 min read

작성자: Tathagata Das , Denny Lee

 <em>Databricks의 새로운 전자책에서 제공하는 단계별 안내를 통해 Delta Lake 사용을 시작해 보세요.</em> <a href="https://www.databricks.com/resources/ebook/delta-lake-running-oreilly?itm_data=operationsmetricsdl-blog-oreillydlupandrunning" target="_blank"><em>O'Reilly의 새 전자책</em></a><em>을 미리 살펴보세요.</em></p><blockquote><p>아래 설명된 단계를 재현하려면 이 <a href="https://github.com/databricks/tech-talks/blob/master/samples/Schema%20Evolution%20in%20Merge%20Operations.ipynb" rel="noopener noreferrer" target="_blank">노트북</a>을 사용해 보세요.</p></blockquote><p>최근 저희는 스키마 진화 및 merge 작업의 성능 개선과 테이블 기록의 운영 메트릭을 도입한 <a href="https://github.com/delta-io/delta/releases/tag/v0.6.0">Delta Lake 0.6.0</a>을 출시했습니다. 이번 릴리스의 주요 기능은 다음과 같습니다.</p><ul><li>merge 작업에서 스키마 진화 지원(<a href="https://github.com/delta-io/delta/issues/170">#170</a>) - 이제 merge 작업을 통해 테이블의 스키마를 자동으로 진화시킬 수 있습니다. 이는 테이블에 변경 데이터를 upsert하고 데이터 스키마가 시간이 지남에 따라 변경되는 시나리오에서 유용합니다. upsert 전에 스키마 변경을 감지하고 적용하는 대신, merge는 스키마를 동시에 진화시키고 변경 사항을 upsert할 수 있습니다.</li><li><strong>자동 재분할을 통한 merge 성능 향상</strong>(<a href="https://github.com/delta-io/delta/issues/349">#349</a>) - 파티션 테이블에 merge할 때 테이블에 쓰기 전에 파티션 열별로 데이터를 자동으로 재분할하도록 선택할 수 있습니다. 파티션 테이블에서 merge 작업이 너무 많은 작은 파일을 생성하여 느려지는 경우(<a href="https://github.com/delta-io/delta/issues/345">#345</a>), 자동 재분할(spark.delta.merge.repartitionBeforeWrite)을 활성화하면 성능을 개선할 수 있습니다.</li><li><strong>insert 절이 없을 때 성능 향상</strong>(<a href="https://github.com/delta-io/delta/issues/342">#342</a>) - insert 절이 없는 merge 작업에서 이제 더 나은 성능을 얻을 수 있습니다.</li><li><strong>DESCRIBE HISTORY의 작업 메트릭</strong>(<a href="https://github.com/delta-io/delta/issues/312">#312</a>) - 이제 테이블 기록에서 Delta 테이블의 모든 쓰기, 업데이트 및 삭제에 대한 작업 메트릭(예: 변경된 파일 수 및 행 수)을 볼 수 있습니다.</li><li><strong>모든 파일 시스템에서 Delta 테이블 읽기 지원</strong>(<a href="https://github.com/delta-io/delta/issues/347">#347</a>) - 이제 Hadoop FileSystem 구현이 있는 모든 스토리지 시스템의 Delta 테이블을 읽을 수 있습니다. 그러나 Delta 테이블에 쓰려면 여전히 스토리지 시스템에 필요한 보증을 제공하는 LogStore 구현을 구성해야 합니다.</li></ul><h2>Schema Evolution in Merge Operations</h2><p>Delta Lake의 이전 릴리스에서 언급했듯이, Delta Lake는 <a href="https://www.databricks.com/blog/2019/10/03/simple-reliable-upserts-and-deletes-on-delta-lake-tables-using-python-apis.html" rel="noopener noreferrer" target="_blank">merge 작업을 실행</a>하여 <a href="https://www.youtube.com/watch?v=7ewmcdrylsA">insert/update/delete 작업을 단일 원자적 작업으로</a> 단순화하고 <a href="https://www.databricks.com/blog/2019/09/24/diving-into-delta-lake-schema-enforcement-evolution.html" rel="noopener noreferrer" target="_blank">스키마를 적용하고 진화</a>할 수 있는 기능을 포함합니다(이 <a href="https://www.youtube.com/watch?v=tjb10n5wVs8">tech talk</a>에서도 자세한 내용을 확인할 수 있습니다). Delta Lake 0.6.0 릴리스를 통해 merge 작업 내에서 스키마를 진화시킬 수 있습니다.</p><p>이를 보여주기 위해 시기적절한 예제를 사용해 보겠습니다. 원본 코드 샘플은 <a href="https://github.com/databricks/tech-talks/blob/master/samples/Schema%20Evolution%20in%20Merge%20Operations.ipynb">이 노트북</a>에서 찾을 수 있습니다. Johns Hopkins CSSE의 <a href="https://github.com/CSSEGISandData/COVID-19">2019 Novel Coronavirus COVID-19 (2019-nCoV) 데이터 리포지토리</a>의 작은 하위 집합으로 시작하겠습니다. 이 데이터셋은 연구원과 분석가가 전 세계 COVID-19 사례 수에 대한 통찰력을 얻기 위해 일반적으로 사용합니다. 이 데이터의 문제점 중 하나는 스키마가 시간이 지남에 따라 변경된다는 것입니다.</p><p>예를 들어, 3월 1일부터 3월 21일까지의 COVID-19 사례를 나타내는 파일(2020년 4월 30일 기준)은 다음과 같은 스키마를 가집니다.</p><pre># Import old_data old_data = (spark.read.option("inferSchema", True).option("header", True)... .csv(/databricks-datasets/COVID/.../03-21-2020.csv)) old_data.printSchema() root |-- Province/State: string (nullable = true) |-- Country/Region: string (nullable = true) |-- Last Update: timestamp (nullable = true) |-- Confirmed: integer (nullable = true) |-- Deaths: integer (nullable = true) |-- Recovered: integer (nullable = true) |-- Latitude: double (nullable = true) |-- Longitude: double (nullable = true) </pre><p>하지만 3월 22일 이후의 파일(4월 30일 기준)에는 FIPS, Admin2, Active, Combined_Key와 같은 추가 열이 포함되었습니다.</p><pre>new_data = (spark.read.option("inferSchema", True).option("header", True)... .csv(/databricks-datasets/COVID/.../04-21-2020.csv)) new_data.printSchema() root |-- FIPS: integer (nullable = true) |-- Admin2: string (nullable = true) |-- Province_State: string (nullable = true) |-- Country_Region: string (nullable = true) |-- Last_Update: string (nullable = true) |-- Lat: double (nullable = true) |-- Long_: double (nullable = true) |-- Confirmed: integer (nullable = true) |-- Deaths: integer (nullable = true) |-- Recovered: integer (nullable = true) |-- Active: integer (nullable = true) |-- Combined_Key: string (nullable = true) </pre><p><em><i class="text-center">샘플 코드에서는 의미상 동일한 열(예: Long_ -&gt; Longitude, Province/State -&gt; Province_State 등)의 이름을 변경했습니다. 테이블 스키마를 진화하는 대신 단순히 열 이름을 변경했습니다.</i></em></p><p>스키마를 병합하는 것이 주요 관심사였다면, 다음 문에서 볼 수 있듯이 <code>DataFrame.write()</code>의 "mergeSchema" 옵션을 사용하여 Delta Lake의 스키마 진화 기능을 사용할 수 있습니다.</p><pre>new_data.write.option("mergeSchema", "true").mode("append").save(path) </pre><p>하지만 기존 값을 업데이트하고 동시에 스키마를 병합해야 하는 경우에는 어떻게 될까요? Delta Lake 0.6.0을 사용하면 <em>merge 작업을 위한 스키마 진화</em>를 통해 이를 달성할 수 있습니다. 이를 시각화하기 위해 <em>한 행</em>인 old_data를 검토하는 것부터 시작하겠습니다.</p><pre>old_data.select("process_date", "Province_State", "Country_Region", "Last_Update", "Confirmed").show() +------------+--------------+--------------+-------------------+---------+ |process_date|Province_State|Country_Region| Last_Update|Confirmed| +------------+--------------+--------------+-------------------+---------+ | 2020-03-21| Washington| US|2020-03-21 22:43:04| 1793| +------------+--------------+--------------+-------------------+---------+ </pre><p>다음으로 new_data의 스키마를 따르는 업데이트 항목을 시뮬레이션해 보겠습니다.</p><pre># Simulate an Updated Entry items = [(53, '', 'Washington', 'US', '2020-04-27T19:00:00', 47.4009, -121.4905, 1793, 94, 0, '', '', '2020-03-21', 2)] cols = ['FIPS', 'Admin2', 'Province_State', 'Country_Region', 'Last_Update', 'Latitude', 'Longitude', 'Confirmed', 'Deaths', 'Recovered', 'Active', 'Combined_Key', 'process_date', 'level'] simulated_update = spark.createDataFrame(items, cols) </pre><p>simulated_update와 new_data를 합쳐 총 <em>40개 행</em>을 만듭니다.</p><pre>new_data.select("process_date", "FIPS", "Province_State", "Country_Region", "Last_Update", "Confirmed").sort(col("FIPS")).show(5) +------------+-----+--------------+--------------+-------------------+---------+ |process_date| FIPS|Province_State|Country_Region| Last_Update|Confirmed| +------------+-----+--------------+--------------+-------------------+---------+ | 2020-03-21| 53| Washington| US|2020-04-27T19:00:00| 1793| | 2020-04-11|53001| Washington| US|2020-04-11 22:45:33| 30| | 2020-04-11|53003| Washington| US|2020-04-11 22:45:33| 4| | 2020-04-11|53005| Washington| US|2020-04-11 22:45:33| 244| | 2020-04-11|53007| Washington| US|2020-04-11 22:45:33| 53| +------------+-----+--------------+--------------+-------------------+---------+ </pre><p><em>자동 스키마 진화</em>를 위한 환경을 구성하기 위해 다음 매개변수를 설정합니다.</p><pre># Enable automatic schema evolution spark.sql("SET spark.databricks.delta.schema.autoMerge.enabled = true") </pre><p>이제 다음 문을 사용하여 값을 업데이트(2020년 3월 21일 기준)하고 새 스키마를 병합하는 단일 원자적 작업을 실행할 수 있습니다.</p><pre>from delta.tables import * deltaTable = DeltaTable.forPath(spark, DELTA_PATH) # Schema Evolution with a Merge Operation deltaTable.alias("t").merge( new_data.alias("s"), "s.process_date = t.process_date AND s.province_state = t.province_state AND s.country_region = t.country_region AND s.level = t.level" ).whenMatchedUpdateAll( ).whenNotMatchedInsertAll( ).execute() </pre><p>다음 문을 사용하여 Delta Lake 테이블을 검토해 보겠습니다.</p><pre># 데이터 로드 spark.read.format("delta").load(DELTA_PATH) .select("process_date", "FIPS", "Province_State", "Country_Region", "Last_Update", "Confirmed", "Admin2") .sort(col("FIPS")) .show() +------------+-----+--------------+--------------+-------------------+---------+ |process_date| FIPS|Province_State|Country_Region| Last_Update|Confirmed|Admin| +------------+-----+--------------+--------------+-------------------+---------+-----+ | 2020-03-21| 53| Washington| US|2020-04-27T19:00:00| 1793| | | 2020-04-11|53001| Washington| US|2020-04-11 22:45:33| 30| Adams | | 2020-04-11|53003| Washington| US|2020-04-11 22:45:33| 4| Asotin | | 2020-04-11|53005| Washington| US|2020-04-11 22:45:33| 244| Benton | | 2020-04-11|53007| Washington| US|2020-04-11 22:45:33| 53| Chelan | +------------+-----+--------------+--------------+-------------------+---------+-----+ </pre><h2>운영 지표</h2><p>Spark UI에서 Delta Lake 테이블 기록(operationMetrics 열)을 살펴보면 운영 지표를 더 자세히 확인할 수 있습니다. 다음 문을 실행해 보세요:</p><pre>deltaTable.history().show() </pre><p>아래는 이전 명령의 축약된 출력입니다.</p><pre>+-------+------+---------+--------------------+ |version|userId|operation| operationMetrics| +-------+------+---------+--------------------+ | 1|100802| MERGE|[numTargetRowsCop...| | 0|100802| WRITE|[numFiles -&gt; 1, n...| +-------+------+---------+--------------------+ </pre><p>테이블의 두 가지 버전이 표시됩니다. 하나는 이전 스키마용이고 다른 하나는 새 스키마용입니다. 아래 운영 지표를 검토할 때 39개의 행이 삽입되고 1개의 행이 업데이트되었음을 알 수 있습니다.</p><pre>{ "numTargetRowsCopied":"0", "numTargetRowsDeleted":"0",{ "numTargetRowsCopied":"0", "numTargetRowsDeleted":"0", "numTargetFilesAdded":"3", "numTargetRowsInserted":"39", "numTargetRowsUpdated":"1", "numOutputRows":"40", "numSourceRows":"40", "numTargetFilesRemoved":"1" } "numTargetFilesAdded":"3", "numTargetRowsInserted":"39", "numTargetRowsUpdated":"1", "numOutputRows":"40", "numSourceRows":"40", "numTargetFilesRemoved":"1" } </pre><p>Spark UI의 SQL 탭으로 이동하여 이러한 운영 지표에 대한 자세한 내용을 이해할 수 있습니다.</p><p style="max-width:450px;"><a href="https://www.databricks.com/wp-content/uploads/2020/05/blog-delta-lake-6-og3.gif" data-lightbox=" "><img class="aligncenter size-full wp-image-109752" style="width:500px;" src="https://www.databricks.com/wp-content/uploads/2020/05/blog-delta-lake-6-og3.gif" alt="Delta Lake 0.6.0을 통해 Spark UI에서 검토할 수 있는 운영 지표의 예" height="410"></a></p><p>애니메이션 GIF는 검토를 위해 Spark UI의 주요 구성 요소를 강조 표시합니다.</p><ol><li>초기 새_데이터 DataFrame을 생성한 파일 하나의 초기 행 39개(새 스키마가 적용된 2020년 4월 11일자)</li><li>새_데이터 DataFrame과 union될 시뮬레이션된 업데이트 행 1개 생성</li><li>이전 스키마가 적용된 파일 하나의 행 1개(2020년 3월 21일자)로 old_data DataFrame 생성</li><li>Delta Lake 테이블에 영구 저장하기 위해 두 DataFrame을 union하는 데 사용된 SortMergeJoin</li></ol><p>이러한 운영 지표를 해석하는 방법에 대해 자세히 알아보려면 <a href="https://www.youtube.com/watch?v=7ewmcdrylsA">Diving into Delta Lake Part 3: DELETE, UPDATE, MERGE는 어떻게 작동하나요 기술 강연</a>을 확인하세요.</p><p><a class="lightbox-trigger" href="https://www.youtube.com/watch?v=7ewmcdrylsA" data-lightbox=" "><img class="aligncenter size-full wp-image-97884" src="https://www.databricks.com/wp-content/uploads/2020/05/blog-delta-lake-6-2.png" alt="Diving into Delta Lake Part 3: DELETE, UPDATE, MERGE는 어떻게 작동하나요 기술 강연." height="630"></a></p>

5X 리더

Gartner®: Databricks 클라우드 데이터베이스 리더

Delta Lake 0.6.0 시작하기

Apache Spark 2.4.5(이상) 인스턴스에서 앞서 제공된 코드 조각으로 Delta Lake를 사용해 보세요(Databricks에서는 DBR 6.6 이상으로 시도해 보세요). Delta Lake는 데이터 레이크를 더 안정적으로 만듭니다(새로운 데이터 레이크를 생성하든 기존 데이터 레이크를 마이그레이션하든 상관없습니다). 자세한 내용은 https://delta.io/를 참조하고 Slack 및 Google 그룹을 통해 Delta Lake 커뮤니티에 참여하세요. GitHub 마일스톤에서 예정된 모든 릴리스와 계획된 기능을 추적할 수 있습니다. 무료 계정으로 Databricks에서 관리형 Delta Lake를 사용해 볼 수도 있습니다.

감사의 말

Delta Lake 0.6.0의 업데이트, 문서 변경 및 기여에 대해 다음 기여자들에게 감사드립니다: Ali Afroozeh, Andrew Fogarty, Anurag870, Burak Yavuz, Erik LaBianca, Gengliang Wang, IonutBoicuAms, Jakub Orłowski, Jose Torres, KevinKarlBob, Michael Armbrust, Pranav Anand, Rahul Govind, Rahul Mahadev, Shixiong Zhu, Steve Suh, Tathagata Das, Timothy Zhang, Tom van Bussel, Wesley Hoffman, Xiao Li, chet, Eugene Koifman, Herman van Hovell, hongdd, lswyyy, lys0716, Mahmoud Mahdi, Maryann Xue

(이 글은 AI의 도움을 받아 번역되었습니다. 원문이 궁금하시다면 여기를 클릭해 주세요)

게시물을 놓치지 마세요

관심 있는 카테고리를 구독하고 최신 게시물을 받은편지함으로 받아보세요