주요 컨텐츠로 이동

Databricks Delta Live Tables를 사용하여 변경 데이터 캡처 간소화

db-129-cdc-og

발행일: 2022년 4월 25일

데이터 사이언스 및 ML4 min read

작성자: Mojgan Mazouchi

이 가이드에서는 Delta Live Tables 파이프라인에서 변경 데이터 캡처(Change Data Capture)를 활용하여 새 레코드를 식별하고 데이터 레이크의 데이터셋에 대한 변경 사항을 캡처하는 방법을 보여줍니다. Delta Live Tables 파이프라인을 사용하면 확장 가능하고 안정적이며 낮은 지연 시간의 데이터 파이프라인을 개발할 수 있으며, 최소한의 컴퓨팅 리소스와 원활한 순서 없는 데이터 처리를 통해 데이터 레이크에서 변경 데이터 캡처를 수행할 수 있습니다.

참고: Delta Live Tables(DLT)를 사용하여 확장 가능하고 안정적인 파이프라인을 생성하고 선언적 ETL 정의를 사용하는 방법을 설명하는 Delta Live Tables 시작하기를 따르는 것이 좋습니다.

변경 데이터 캡처 배경

변경 데이터 캡처(Change Data Capture, CDC)는 데이터베이스에서 증분 변경(데이터 삭제, 삽입 및 업데이트)을 식별하고 캡처하는 프로세스로, 거의 실시간 데이터 애플리케이션을 위해 고객, 주문 또는 제품 상태를 추적하는 것과 같습니다. CDC는 새로운 이벤트가 발생함에 따라 데이터를 지속적으로 증분 방식으로 처리하여 실시간 데이터 진화를 제공합니다.
2025년까지 80% 이상의 조직이 멀티 클라우드 전략을 구현할 계획이므로, 여러 환경에 걸쳐 ETL 파이프라인에서 모든 데이터 변경 사항을 실시간으로 원활하게 중앙 집중화할 수 있는 비즈니스에 적합한 접근 방식을 선택하는 것이 중요합니다.

CDC 이벤트를 캡처함으로써 Databricks 사용자는 Lakehouse의 Delta Table로 소스 테이블을 다시 만들고 그 위에서 분석을 실행할 수 있으며, 외부 시스템과 데이터를 결합할 수 있습니다. Databricks의 Delta Lake에 있는 MERGE INTO 명령을 사용하면 데이터 레이크에서 레코드를 효율적으로 upsert하고 삭제할 수 있습니다. 이 주제에 대한 이전 심층 분석은 여기에서 확인할 수 있습니다. 이는 많은 Databricks 고객이 Delta Lakes를 사용하여 수행하고 데이터 레이크를 실시간 비즈니스 데이터로 최신 상태로 유지하는 일반적인 사용 사례입니다.

Delta Lake는 데이터 레이크에서 실시간 CDC 동기화를 위한 완전한 솔루션을 제공하지만, 이제 Delta Live Tables의 변경 데이터 캡처 기능을 발표하게 되어 아키텍처를 더욱 단순하고 효율적이며 확장 가능하게 만들 수 있습니다. DLT를 사용하면 SQL 및 Python을 사용하여 CDC 데이터를 원활하게 수집할 수 있습니다.

이전의 델타 테이블을 사용한 CDC 솔루션은 MERGE INTO 작업을 사용했으며, 이는 대상 Delta 테이블의 동일한 행을 업데이트하려고 할 때 소스 데이터셋의 여러 행이 일치하는 경우 오류를 방지하기 위해 데이터를 수동으로 정렬해야 했습니다. 순서 없는 데이터를 처리하기 위해 foreachBatch 구현을 사용하여 소스 테이블을 사전 처리하여 여러 일치가 발생할 가능성을 제거하고 각 키에 대한 최신 변경 사항만 유지하는 추가 단계가 필요했습니다( 변경 데이터 캡처 예제 참조). DLT 파이프라인의 새로운 APPLY CHANGES INTO 작업은 데이터 엔지니어링의 수동 개입 없이도 순서 없는 데이터를 자동으로 원활하게 처리합니다.

Databricks Delta Live Tables를 사용한 CDC

이 블로그에서는 외부 시스템에서 CDC 데이터가 오는 일반적인 CDC 사용 사례에 대해 Delta Live Tables 파이프라인에서 APPLY CHANGES INTO 명령을 사용하는 방법을 보여줍니다. Debezium, Fivetran, Qlik Replicate, Talend, StreamSets와 같은 다양한 CDC 도구를 사용할 수 있습니다. 특정 구현은 다르지만, 이러한 도구는 일반적으로 데이터 변경 기록을 로그에 캡처하고 기록하며, 다운스트림 애플리케이션은 이러한 CDC 로그를 소비합니다. 저희 예제에서는 Debezium, Fivetran 등과 같은 CDC 도구에서 클라우드 객체 스토리지에 데이터가 저장됩니다.

다양한 CDC 도구에서 클라우드 객체 스토리지 또는 Apache Kafka와 같은 메시지 큐에 데이터를 저장합니다. 일반적으로 CDC는 우리가 메달리온 아키텍처라고 부르는 것으로 수집하는 데 사용됩니다. 메달리온 아키텍처는 Lakehouse에서 데이터를 논리적으로 구성하는 데 사용되는 데이터 설계 패턴으로, 아키텍처의 각 계층을 통과하는 데이터의 구조와 품질을 점진적으로 개선하는 것을 목표로 합니다. Delta Live Tables를 사용하면 CDC 피드에서 Lakehouse의 테이블로 변경 사항을 원활하게 적용할 수 있습니다. 이 기능을 메달리온 아키텍처와 결합하면 점진적인 변경 사항이 분석 워크로드에 대규모로 쉽게 흐를 수 있습니다. 메달리온 아키텍처와 함께 CDC를 사용하면 변경되거나 추가된 데이터만 처리하면 되므로 사용자에게 여러 가지 이점을 제공합니다. 따라서 비용 효율적으로 골드 테이블을 최신 비즈니스 데이터로 최신 상태로 유지할 수 있습니다.

참고: 여기의 예제는 CDC의 SQL 및 Python 버전 모두에 적용되며, 작업 사용 방법에 대한 특정 방법도 적용됩니다. 변형을 평가하려면 공식 문서 여기를 참조하십시오.

전제 조건

이 가이드를 최대한 활용하려면 다음 사항에 대한 기본적인 숙지가 필요합니다.

  • SQL 또는 Python
  • Delta Live Tables
  • ETL 파이프라인 개발 및/또는 빅 데이터 시스템 작업
  • Databricks 대화형 노트북 및 클러스터
  • 새 클러스터를 생성하고, 작업을 실행하고, 외부 클라우드 객체 스토리지 또는 DBFS의 위치에 데이터를 저장할 수 있는 권한이 있는 Databricks 작업 공간에 액세스할 수 있어야 합니다.
  • 이 블로그에서 만드는 파이프라인의 경우, 데이터 품질 제약 조건 적용을 지원하는 "Advanced" 제품 에디션을 선택해야 합니다.
     

데이터셋

여기서는 외부 데이터베이스에서 실제와 유사한 CDC 데이터를 사용합니다. 이 파이프라인에서는 Faker 라이브러리를 사용하여 Debezium과 같은 CDC 도구가 생성하여 Databricks로 초기 수집을 위해 클라우드 스토리지로 가져올 수 있는 데이터셋을 생성합니다. Auto Loader를 사용하여 클라우드 객체 스토리지에서 메시지를 점진적으로 로드하고 원시 메시지를 저장하는 Bronze 테이블에 저장합니다. Bronze 테이블은 데이터 수집을 위한 것이며, 단일 진실 공급원에 빠르게 액세스할 수 있도록 합니다. 다음으로, 정리된 Bronze 계층 테이블에서 APPLY CHANGES INTO를 수행하여 Silver 테이블로 변경 사항을 전파합니다. 데이터가 Silver 테이블로 흐르면 일반적으로 기업이 모든 주요 비즈니스 엔터티를 볼 수 있도록 더 정제되고 최적화됩니다("just-enough"). 아래 다이어그램을 참조하십시오.

샘플 CDC 흐름 (CDC 도구, Autoloader 및 Delta Live Table 파이프라인)

이 블로그는 고객 이름, 이메일, 주소 및 ID의 네 가지 필드와 작업(operation)(operation 코드(DELETE, APPEND, UPDATE, CREATE)를 저장함) 및 operation_date(각 작업 작업에 대한 레코드가 온 날짜 및 타임스탬프를 저장함)를 설명하는 두 가지 필드를 포함하는 JSON 메시지가 필요한 간단한 예제에 중점을 둡니다.

위 필드를 사용하여 샘플 데이터셋을 생성하기 위해 가짜 데이터를 생성하는 Python 패키지인 Faker를 사용합니다. 이 데이터 생성 섹션과 관련된 노트북은 여기에서 찾을 수 있습니다. 이 노트북에서는 이름과 생성된 데이터를 쓸 저장 위치를 제공합니다. Databricks의 DBFS 기능을 사용하며, 작동 방식에 대해 자세히 알아보려면 DBFS 설명서를 참조하십시오. 그런 다음 PySpark 사용자 정의 함수를 사용하여 각 필드에 대한 합성 데이터셋을 생성하고 데이터를 정의된 저장 위치에 다시 작성하며, 이 위치는 합성 데이터셋에 액세스하기 위해 다른 노트북에서 참조할 것입니다.

Auto Loader를 사용한 원시 데이터셋 수집

메달리온 아키텍처 패러다임에 따르면, 브론즈 계층은 가장 원시적인 데이터 품질을 보유합니다. 이 단계에서는 클라우드 스토리지의 위치에서 Autoloader를 사용하여 새 데이터를 점진적으로 읽을 수 있습니다. 여기서는 파이프라인 설정의 구성 섹션에 생성된 데이터셋의 경로를 추가하여 소스 경로를 변수로 로드할 수 있습니다. 따라서 파이프라인 설정의 구성은 다음과 같습니다.

그런 다음 노트북에서 이 구성 속성을 로드합니다.

수집할 Bronze 테이블을 살펴보겠습니다. a. SQL 및 b. Python 사용

a. SQL

b. Python

위의 문들은 Auto Loader를 사용하여 JSON 파일로부터 customer_bronze라는 스트리밍 라이브 테이블을 생성합니다. Delta Live Tables에서 Autoloader를 사용할 때 스키마나 체크포인트에 대한 위치를 제공할 필요가 없습니다. 해당 위치는 DLT 파이프라인에서 자동으로 관리되기 때문입니다.

Auto Loader는 SQL에서는 cloud_files, Python에서는 cloudFiles라는 이름의 Structured Streaming 소스를 제공하며, 이 소스는 클라우드 스토리지 경로와 형식을 매개변수로 받습니다.
컴퓨팅 비용을 줄이기 위해, 매우 낮은 지연 시간 요구 사항이 없다면 마이크로 배치로 Triggered 모드에서 DLT 파이프라인을 실행하는 것을 권장합니다.

기대값과 고품질 데이터

고품질의 다양하고 접근 가능한 데이터셋을 생성하는 다음 단계로, Constraints를 사용하여 품질 검사 기대 기준을 적용합니다. 현재 제약 조건은 retain, drop, 또는 fail이 될 수 있습니다. 더 자세한 내용은 여기를 참조하세요. 모든 제약 조건은 간소화된 품질 모니터링을 위해 기록됩니다.

a. SQL

b. Python

APPLY CHANGES INTO 문을 사용하여 다운스트림 대상 테이블로 변경 사항 전파

APPLY CHANGES INTO 쿼리를 실행하기 전에, 최신 데이터를 보유할 대상 스트리밍 테이블이 있는지 확인해야 합니다. 만약 존재하지 않는다면 생성해야 합니다. 아래 셀들은 대상 스트리밍 테이블을 생성하는 예시입니다. 이 블로그 게시 시점에는 대상 스트리밍 테이블 생성 문과 APPLY CHANGES INTO 쿼리가 함께 필요하며, 둘 다 파이프라인에 존재해야 합니다. 그렇지 않으면 테이블 생성 쿼리가 실패합니다.

a. SQL

b. Python

이제 대상 스트리밍 테이블이 준비되었으므로, APPLY CHANGES INTO 쿼리를 사용하여 다운스트림 대상 테이블로 변경 사항을 전파할 수 있습니다. CDC 피드는 INSERT, UPDATE, DELETE 이벤트를 포함하지만, DLT의 기본 동작은 소스 데이터셋의 모든 레코드에서 INSERT 및 UPDATE 이벤트를 기본 키로 일치시켜 적용하고, 이벤트 순서를 식별하는 필드로 정렬하는 것입니다. 더 구체적으로는, 대상 테이블에서 기본 키(들)와 일치하는 행을 업데이트하거나, 대상 스트리밍 테이블에 일치하는 레코드가 없을 때 새 행을 삽입합니다. DELETE 이벤트를 처리하기 위해 SQL에서는 APPLY AS DELETE WHEN을 사용하거나, Python에서는 해당 기능인 apply_as_deletes 인수를 사용할 수 있습니다.

이 예시에서는 고객을 고유하게 식별하고 CDC 이벤트가 대상 스트리밍 테이블의 해당 고객 레코드에 적용되도록 하는 기본 키로 "id"를 사용했습니다. "operation_date"는 소스 데이터셋에서 CDC 이벤트의 논리적 순서를 유지하므로, SQL에서는 SEQUENCE BY operation_date를 사용하거나, Python에서는 해당 기능인 sequence_by = col("operation_date")를 사용하여 순서가 잘못 도착한 변경 이벤트를 처리합니다. SEQUENCE BY (또는 sequence_by)와 함께 사용하는 필드 값은 동일한 키에 대한 모든 업데이트 중에서 고유해야 한다는 점을 명심하세요. 대부분의 경우, sequence by 컬럼은 타임스탬프 정보가 있는 컬럼이 될 것입니다.

마지막으로, SQL에서는 "COLUMNS * EXCEPT (operation, operation_date, _rescued_data)"를 사용하거나, Python에서는 해당 기능인 "except_column_list"= ["operation", "operation_date", "_rescued_data"]를 사용하여 "operation", "operation_date", "_rescued_data"의 세 컬럼을 대상 스트리밍 테이블에서 제외했습니다. "COLUMNS" 절을 지정하지 않으면 기본적으로 모든 컬럼이 대상 스트리밍 테이블에 포함됩니다.

a. SQL

b. Python

사용 가능한 절의 전체 목록을 보려면 여기를 참조하세요.
이 블로그 게시 시점에는 APPLY CHANGES INTO 쿼리 또는 apply_changes 함수의 대상에서 읽는 테이블은 라이브 테이블이어야 하며, 스트리밍 라이브 테이블은 될 수 없다는 점에 유의하세요.

참고용으로 SQLPython 노트북을 사용할 수 있습니다. 이제 모든 셀이 준비되었으므로, 클라우드 객체 스토리지에서 데이터를 수집하기 위한 파이프라인을 만들어 보겠습니다. 작업 공간에서 새 탭 또는 창으로 Jobs를 열고 "Delta Live Tables"를 선택하세요.

이 블로그와 관련된 파이프라인은 다음과 같은 DLT 파이프라인 설정을 가지고 있습니다:

  1. 새 파이프라인을 생성하려면 "Create Pipeline"을 선택합니다.
  2. "Retail CDC Pipeline"과 같은 이름을 지정합니다.
  3. 이전에 생성한 노트북 경로를 지정합니다. 하나는 Faker 패키지를 사용하여 생성된 데이터셋용이고, 다른 하나는 DLT에서 생성된 데이터를 수집하기 위한 경로입니다. 두 번째 노트북 경로는 선택한 언어에 따라 SQL 또는 Python으로 작성된 노트북을 참조할 수 있습니다.
  4. 첫 번째 노트북에서 생성된 데이터에 액세스하려면 구성에 데이터 세트 경로를 추가하세요. 여기서는 데이터를 "/tmp/demo/cdc_raw/customers"에 저장했으므로, 두 번째 노트북에서 "source"를 참조하기 위해 "source"를 "/tmp/demo/cdc_raw/"로 설정했습니다.
  5. 결과 테이블을 쿼할 수 있는 대상(선택 사항이며 대상 데이터베이스를 참조함)을 지정하세요.
  6. DLT에서 생성된 데이터 세트와 파이프라인의 메타데이터 로그에 액세스하기 위해 객체 스토리지에 스토리지 위치를 지정하세요(선택 사항).
  7. 파이프라인 모드를 Triggered로 설정하세요. Triggered 모드에서는 DLT 파이프라인이 소스의 새 데이터를 한 번에 모두 소비하고, 처리가 완료되면 컴퓨팅 리소스를 자동으로 종료합니다. 파이프라인 설정 편집 시 Triggered 모드와 Continuous 모드 간에 전환할 수 있습니다. JSON에서 "continuous": false를 설정하는 것은 파이프라인을 Triggered 모드로 설정하는 것과 동일합니다.
  8. 이 워크로드의 경우 Autopilot 옵션에서 자동 확장을 비활성화하고 워커 클러스터 1개만 사용할 수 있습니다. 프로덕션 워크로드의 경우 자동 확장을 활성화하고 클러스터 크기에 필요한 최대 워커 수를 설정하는 것이 좋습니다.
  9. "Start"를 선택하세요.
  10. 파이프라인이 생성되어 현재 실행 중입니다!

샘플 Delta Live Table 파이프라인이 변경 사항을 다운스트림 테이블로 전파하는 모습

기술 가이드 eBook

MLOps의 Big Book

DLT 파이프라인 계보 가시성 및 데이터 품질 모니터링

모든 DLT 파이프라인 로그는 파이프라인의 스토리지 위치에 저장됩니다. 파이프라인을 생성할 때만 스토리지 위치를 지정할 수 있습니다. 파이프라인이 생성되면 스토리지 위치를 더 이상 수정할 수 없습니다.

이 주제에 대한 이전 심층 분석은 여기에서 확인할 수 있습니다. 이 블로그와 관련된 예제 DLT 파이프라인에서 파이프라인 가시성 및 데이터 품질 모니터링을 보려면 이 노트북을 사용해 보세요.

결론

이 블로그에서는 사용자가 Delta Live Tables(DLT)를 사용하여 Lakehouse 플랫폼에 변경 데이터 캡처(CDC)를 효율적으로 구현할 수 있도록 얼마나 간편하게 만들었는지 보여주었습니다. DLT는 파이프라인 작업을 깊이 있게 파악하고, 파이프라인 계보를 관찰하며, 스키마를 모니터링하고, 파이프라인의 각 단계에서 품질 검사를 수행하는 등 내장된 품질 제어를 제공합니다. DLT는 스트리밍 워크로드를 위한 자동 오류 처리와 최고의 자동 확장 기능을 지원하여 사용자가 워크로드에 필요한 최적의 리소스로 품질 데이터를 확보할 수 있도록 합니다.

데이터 엔지니어는 이제 DLT에서 SQL 또는 Python으로 새로운 선언적 APPLY CHANGES INTO API를 사용하여 CDC를 쉽게 구현할 수 있습니다. 이 새로운 기능을 통해 ETL 파이프라인은 수만 개의 테이블에 걸쳐 변경 사항을 쉽게 식별하고 저지연 지원으로 해당 변경 사항을 적용할 수 있습니다.

직접 Delta Live Tables에서 CDC를 시작하고 사용해 볼 준비가 되셨나요?
Delta Live Tables가 데이터 변환 및 ETL의 복잡성을 어떻게 단순화하는지 알아보려면 이 웨비나를 시청하고, Delta Live Tables를 사용한 변경 데이터 캡처에 대한 문서, 공식 github를 확인하고 이 동영상의 단계를 따라 파이프라인을 생성하세요!

(이 글은 AI의 도움을 받아 번역되었습니다. 원문이 궁금하시다면 여기를 클릭해 주세요)

게시물을 놓치지 마세요

관심 있는 카테고리를 구독하고 최신 게시물을 받은편지함으로 받아보세요