주요 컨텐츠로 이동

심층 분석: Apache Spark Structured Streaming의 워터마킹

blog OG

발행일: 2022년 8월 22일

제품2 min read

작성자: Max Fisher

주요 내용

  • 워터마크는 이벤트 시간 기준으로 처리 진행 상황, 윈도우 집계 생성 시점, 집계 상태 트리밍 시점을 Spark가 이해하도록 돕습니다.
  • 데이터 스트림을 조인할 때 Spark는 기본적으로 입력 스트림 전체에서 관찰된 최소 이벤트 시간을 기준으로 상태를 제거하는 단일 전역 워터마크를 사용합니다.
  • RocksDB를 활용하면 클러스터 메모리 및 GC 일시 중단에 대한 부담을 줄일 수 있습니다.
  • StreamingQueryProgress 및 StateOperatorProgress 객체에는 워터마크가 스트림에 미치는 영향에 대한 핵심 정보가 포함되어 있습니다.

소개

실시간 파이프라인을 구축할 때 팀이 고려해야 할 현실 중 하나는 분산 데이터 수집이 본질적으로 순서가 없다는 것입니다. 또한, 상태 기반 스트리밍 작업의 맥락에서 팀은 시간 기반 집계 및 기타 상태 기반 작업을 올바르게 계산하기 위해 수집하는 데이터 스트림의 이벤트 시간 진행 상황을 제대로 추적할 수 있어야 합니다. 이 모든 문제를 Structured Streaming을 사용하여 해결할 수 있습니다.

예를 들어, 우리가 임대한 채굴 장비의 선제적 유지보수를 지원하는 파이프라인을 구축하는 팀이라고 가정해 보겠습니다. 이 장비는 항상 최상의 상태를 유지해야 하므로 실시간으로 모니터링해야 합니다. 장비 문제를 이해하고 식별하는 데 필요한 상태 기반 집계를 스트리밍 데이터에 대해 수행해야 합니다.

이러한 장비에 대한 예측 유지보수 및 기타 의사 결정을 지원하는 데 필요한 상태 기반 집계를 생성하기 위해 Structured Streaming과 워터마킹을 활용해야 합니다.

워터마킹이란 무엇인가요?

일반적으로 실시간 스트리밍 데이터를 다룰 때 데이터 수집 방식과 전체 애플리케이션의 다운타임과 같은 문제 발생 여부에 따라 이벤트 시간과 처리 시간 사이에 지연이 발생합니다. 이러한 잠재적인 가변 지연으로 인해 데이터를 처리하는 데 사용하는 엔진에는 집계 창을 언제 닫고 집계 결과를 생성할지 결정하는 메커니즘이 필요합니다.

이러한 문제를 해결하기 위해 고정된 지연 시간을 사용하려는 자연스러운 경향이 있을 수 있지만, 이 예시에서는 왜 이것이 최선의 해결책이 아닌지 보여드리겠습니다.

시각적으로 설명하기 위해 오전 10:50부터 오전 11:20 사이에 다양한 시간에 데이터를 수신하는 시나리오를 살펴보겠습니다. 10분마다 텀블링 창을 생성하여 해당 기간 동안 수신된 온도 및 압력 판독값의 평균을 계산합니다.

첫 번째 그림에서는 오전 11:00, 11:10, 11:20에 텀블링 창이 트리거되어 해당 시점의 결과 테이블이 표시됩니다. 오전 11:10경에 이벤트 시간이 오전 10:53인 데이터가 포함된 두 번째 배치 데이터가 도착하면, 이는 오전 11:10에 닫히는 오전 11:00 ~ 11:10 창의 온도 및 압력 평균에 포함됩니다. 이는 올바른 결과를 제공하지 못합니다.

온도 및 압력 데이터 배치를 수집하는 Structured Streaming 파이프라인의 시각적 표현

우리가 원하는 집계에 대한 올바른 결과를 얻으려면, Spark가 집계 창을 언제 닫고 올바른 집계 결과를 생성해야 하는지 이해하도록 하는 워터마크를 정의해야 합니다.

Structured Streaming 애플리케이션에서는 워터마킹이라는 기능을 사용하여 계산하려는 집계에 대한 모든 관련 데이터가 수집되도록 할 수 있습니다. 가장 기본적인 의미에서 워터마크를 정의함으로써 Spark Structured Streaming은 (설정된 지연 시간 기대치를 기반으로) 특정 시간 T까지 모든 데이터를 수집했음을 알게 되어 해당 타임스탬프 T까지의 윈도우 집계를 닫고 생성할 수 있습니다.

두 번째 시각 자료는 10분 워터마크를 구현하고 Spark Structured Streaming에서 Append 모드를 사용한 효과를 보여줍니다.

Structured Streaming 파이프라인에 10분 워터마크를 적용했을 때의 효과를 시각적으로 표현

첫 번째 시나리오에서는 Spark가 10분마다 이전 10분 동안의 윈도우 집계를 내보내지만(예: 오전 11:10에 오전 11:00 ~ 11:10 창을 내보냄), Spark는 이제 관찰된 최대 이벤트 시간에서 지정된 워터마크를 뺀 값이 창의 상한보다 클 때까지 기다렸다가 윈도우 집계를 닫고 출력합니다.

다시 말해, Spark는 오전 11:00 ~ 오전 11:10 창을 내보내기 위해 최신 이벤트 시간에서 10분을 뺀 값이 오전 11:00보다 큰 데이터 포인트를 볼 때까지 기다려야 했습니다. 오전 11:00에는 이 조건이 충족되지 않으므로 Spark의 내부 상태 저장소에서 집계 계산만 초기화합니다. 오전 11:10에도 이 조건은 여전히 충족되지 않지만, 오전 10:53에 대한 새로운 데이터 포인트가 있으므로 내부 상태가 업데이트되지만, 내보내지지는 않습니다. 그런 다음 오전 11:20이 되면 Spark는 이벤트 시간이 오전 11:15인 데이터 포인트를 보게 되고, 오전 11:15에서 10분을 빼면 오전 11:05가 되어 오전 11:00보다 늦으므로 오전 10:50 ~ 오전 11:00 창을 결과 테이블로 내보낼 수 있습니다.

이는 워터마크로 정의된 예상 지연 시간을 기반으로 데이터를 올바르게 통합하여 올바른 결과를 생성합니다. 결과가 내보내지면 해당 상태는 상태 저장소에서 제거됩니다.

파이프라인에 워터마킹 통합하기

Structured Streaming 파이프라인에 이러한 워터마크를 통합하는 방법을 이해하기 위해 이 블로그의 소개 섹션에 명시된 사용 사례를 기반으로 실제 코드 예제를 살펴보겠습니다.

클라우드의 Kafka 클러스터에서 모든 센서 데이터를 수집하고 10분마다 온도 및 압력 평균을 계산하며 예상 시간 편차는 10분이라고 가정해 보겠습니다. 워터마킹을 사용한 Structured Streaming 파이프라인은 다음과 같습니다.

PySpark

여기서는 Kafka에서 데이터를 읽고, 변환 및 집계를 적용한 다음, Delta Lake 테이블에 씁니다. 이 테이블은 Databricks SQL에서 시각화 및 모니터링됩니다. 특정 데이터 샘플에 대해 테이블에 기록된 출력은 다음과 같습니다.

위의 PySpark 코드 샘플에 정의된 스트리밍 쿼리의 출력

워터마킹을 통합하기 위해 먼저 두 가지 항목을 식별해야 했습니다.

  1. 센서 판독값의 이벤트 시간을 나타내는 열
  2. 데이터의 예상 시간 편차 추정치

이전 예제에서 가져온 워터마크는 .withWatermark() 메서드로 정의되며, eventTimestamp 열은 이벤트 시간 열로 사용되고 10분은 예상되는 시간 편차를 나타냅니다.

PySpark

이제 Structured Streaming 파이프라인에서 워터마크를 구현하는 방법을 알았으므로, 스트리밍 조인 작업 및 상태 관리와 같은 다른 항목이 워터마크에 의해 어떻게 영향을 받는지 이해하는 것이 중요합니다. 또한, 파이프라인을 확장함에 따라 데이터 엔지니어가 성능 문제를 피하기 위해 알아야 할 핵심 지표가 있을 것입니다. 워터마킹에 대해 더 깊이 파고들면서 이 모든 것을 탐구할 것입니다.

다양한 출력 모드에서의 워터마크

더 깊이 들어가기 전에, 출력 모드 선택이 설정한 워터마크의 동작에 어떻게 영향을 미치는지 이해하는 것이 중요합니다.

워터마크는 스트리밍 애플리케이션을 append 또는 update 출력 모드에서 실행할 때만 사용할 수 있습니다. 세 번째 출력 모드인 complete 모드에서는 전체 결과 테이블이 스토리지에 기록됩니다. 이 모드는 모든 집계 데이터를 보존해야 하므로 사용할 수 없으며, 따라서 중간 상태를 삭제하기 위해 워터마킹을 사용할 수 없습니다.

이러한 출력 모드가 윈도우 집계 및 워터마크의 맥락에서 갖는 의미는, ‘append’ 모드에서는 집계가 한 번만 생성될 수 있고 업데이트될 수 없다는 것입니다. 따라서 집계가 생성되면 엔진은 집계의 상태를 삭제하여 전체 집계 상태를 제한된 상태로 유지할 수 있습니다. 늦게 도착하는 레코드(근사 워터마크 휴리스틱이 적용되지 않은, 즉 워터마크 지연 기간보다 오래된 레코드)는 집계가 생성되고 집계 상태가 삭제되었기 때문에 필연적으로 삭제되어야 합니다.

반대로, ‘update’ 모드에서는 첫 번째 레코드부터 시작하여 수신되는 각 레코드마다 집계를 반복적으로 생성할 수 있으므로 워터마크는 선택 사항입니다. 워터마크는 엔진이 해당 집계에 대한 더 이상 레코드가 수신되지 않을 것이라고 휴리스틱하게 알 때 상태를 트리밍하는 데만 유용합니다. 상태가 삭제되면 집계 값이 손실되어 업데이트할 수 없으므로 늦게 도착하는 레코드는 다시 삭제되어야 합니다.

상태, 늦게 도착하는 레코드, 그리고 다양한 출력 모드가 Spark에서 실행되는 애플리케이션의 동작에 따라 다른 결과를 초래할 수 있다는 점을 이해하는 것이 중요합니다. 여기서 핵심은 append 모드와 update 모드 모두에서 워터마크가 집계 시간 창에 대한 모든 데이터가 수신되었음을 나타내면 엔진이 창 상태를 트리밍할 수 있다는 것입니다. append 모드에서는 시간 창이 닫히고 워터마크 지연이 발생한 후에만 집계가 생성되는 반면, update 모드에서는 창의 각 업데이트 시마다 생성됩니다.

마지막으로, 워터마크 지연 창을 늘리면 데이터에 대해 파이프라인이 더 오래 기다리게 되어 잠재적으로 데이터 손실이 줄어들 수 있습니다. 즉, 정밀도는 높아지지만 집계를 생성하는 데 걸리는 지연 시간도 늘어납니다. 반대로, 워터마크 지연이 짧으면 정밀도는 낮아지지만 집계를 생성하는 데 걸리는 지연 시간도 줄어듭니다.

창 지연 길이 정밀도 지연 시간
더 긴 지연 창 더 높은 정밀도 더 높은 지연 시간
더 짧은 지연 창 더 낮은 정밀도 더 낮은 지연 시간

워터마킹에 대한 심층 분석

조인 및 워터마킹

스트리밍 애플리케이션에서 조인 작업을 수행할 때, 특히 두 개의 스트림을 조인할 때 알아야 할 몇 가지 고려 사항이 있습니다. 사용 사례를 위해, 기계 전반의 다른 센서에서 캡처한 추가 값과 온도 및 압력 판독값에 대한 스트리밍 데이터셋을 조인한다고 가정해 보겠습니다.

Structured Streaming에서 구현할 수 있는 스트림-스트림 조인에는 내부 조인, 외부 조인, 세미 조인의 세 가지 유형이 있습니다. 스트리밍 애플리케이션에서 조인을 수행할 때 주요 문제는 조인의 한쪽 측면에 대한 불완전한 그림을 가질 수 있다는 것입니다. Spark에 미래에 일치하는 항목이 없을 것이라는 이해를 제공하는 것은 집계를 방출하기 전에 집계에 통합할 새로운 행이 없을 때 Spark가 이해해야 했던 이전 집계 문제와 유사합니다.

Spark가 이를 처리할 수 있도록 하려면 워터마크와 이벤트 시간 제약 조건을 스트림-스트림 조인의 조인 조건 내에서 조합하여 활용할 수 있습니다. 이 조합을 통해 Spark는 늦게 도착하는 레코드를 필터링하고 시간 범위 조건을 통해 조인 작업을 위한 상태를 트리밍할 수 있습니다. 아래 예시에서 이를 보여줍니다.

PySpark

그러나 위의 예시와 달리 각 스트림이 워터마크에 대해 서로 다른 시간 왜곡을 필요로 하는 경우가 있을 수 있습니다. 이 시나리오에서 Spark는 여러 워터마크 정의를 처리하는 정책을 가지고 있습니다. Spark는 데이터 손실을 방지하는 데 있어 최대한의 안전을 보장하기 위해 가장 느린 스트림을 기반으로 하는 단일 전역 워터마크를 유지합니다.

개발자는 spark.sql.streaming.multipleWatermarkPolicymax;로 변경하여 이 동작을 변경할 수 있지만, 이는 느린 스트림의 데이터가 삭제됨을 의미합니다.

워터마크를 사용하거나 활용할 수 있는 전체 조인 작업 범위를 확인하려면 Spark 설명서의 이 섹션을 확인하세요.

워터마크를 사용한 스트림 모니터링 및 관리

Spark가 수백만 개의 키를 관리하고 각 키에 대한 상태를 유지해야 하는 스트리밍 쿼리를 관리할 때, Databricks 클러스터에 기본 제공되는 기본 상태 저장소가 효과적이지 않을 수 있습니다. 메모리 사용량이 증가하고 가비지 컬렉션 일시 중지 시간이 길어지는 것을 볼 수 있습니다. 이 두 가지 모두 Structured Streaming 애플리케이션의 성능과 확장성을 저해합니다.

여기서 RocksDB가 사용됩니다. Spark 구성을 다음과 같이 활성화하여 Databricks에서 RocksDB를 기본적으로 활용할 수 있습니다.

이를 통해 Structured Streaming 애플리케이션을 실행하는 클러스터는 RocksDB를 활용할 수 있으며, 이는 모든 상태를 메모리에 유지하는 대신 기본 메모리에서 상태를 더 효율적으로 관리하고 로컬 디스크/SSD를 활용할 수 있습니다.

메모리 사용량 및 가비지 컬렉션 메트릭 추적 외에도, 워터마킹 및 Structured Streaming을 다룰 때 수집하고 추적해야 하는 다른 주요 지표 및 메트릭이 있습니다. 이러한 메트릭에 액세스하려면 StreamingQueryProgressStateOperatorProgress 객체를 확인할 수 있습니다. 이러한 사용 방법에 대한 예시는 여기의 설명서를 참조하세요.

StreamingQueryProgress 객체에는 호출 시 해당 트리거의 최대, 최소, 평균 이벤트 시간 및 사용된 워터마크 타임스탬프를 반환하는 "eventTime" 메서드가 있습니다. 처음 세 가지는 해당 트리거의 최대, 최소 및 평균 이벤트 시간이며, 마지막 하나는 해당 트리거에 사용된 워터마크입니다.

StreamingQueryProgress 객체의 축약된 예

이러한 정보는 스트리밍 쿼리가 출력하는 결과 테이블의 데이터를 조정하는 데 사용될 수 있으며, 사용되는 워터마크가 의도된 이벤트 시간 타임스탬프인지 확인하는 데에도 사용될 수 있습니다. 이는 데이터를 스트림으로 조인할 때 중요할 수 있습니다.

StateOperatorProgress 객체 내에는 numRowsDroppedByWatermark 메트릭이 있습니다. 이 메트릭은 상태 저장 집계에 포함되기에는 너무 늦은 것으로 간주되는 행 수를 보여줍니다. 이 메트릭은 원시 입력 행이 아닌 집계 후 삭제된 행을 측정하므로 숫자가 정확하지는 않지만 삭제되는 늦은 데이터가 있음을 나타낼 수 있습니다. StreamingQueryProgress 객체의 정보와 함께 이 메트릭을 사용하면 개발자는 워터마크가 올바르게 구성되었는지 확인할 수 있습니다.

여러 집계, 스트리밍 및 워터마크

Structured Streaming 쿼리의 한 가지 남은 제약 사항은 단일 스트리밍 쿼리에서 여러 상태 저장 연산자(예: 집계, 스트리밍 조인)를 연결하는 것입니다. 상태 저장 집계에 대한 단일 전역 워터마크의 이러한 제약 사항은 Databricks에서 해결책을 마련하고 있으며 향후 몇 달 안에 더 많은 정보를 공개할 예정입니다. 자세한 내용은 Project Lightspeed 블로그를 참조하세요: Project Lightspeed: Apache Spark를 이용한 더 빠르고 간편한 스트림 처리 (databricks.com).

결론

Databricks의 Structured Streaming 및 워터마킹을 사용하면 위에 설명된 사용 사례와 같은 조직에서 데이터가 제대로 정렬되지 않거나 제시간에 도착하지 않더라도 실시간 집계로 인한 메트릭이 정확하게 계산되도록 보장하는 복원력 있는 실시간 애플리케이션을 구축할 수 있습니다. Databricks를 사용하여 실시간 애플리케이션을 구축하는 방법에 대해 자세히 알아보려면 Databricks 담당자에게 문의하세요.

(이 글은 AI의 도움을 받아 번역되었습니다. 원문이 궁금하시다면 여기를 클릭해 주세요)

게시물을 놓치지 마세요

관심 있는 카테고리를 구독하고 최신 게시물을 받은편지함으로 받아보세요