주요 컨텐츠로 이동

새로운 transformWithState API를 사용한 지속적인 환경 모니터링

Apache Spark™의 새로운 상태 유지 스트리밍 API를 활용한 실용적인 사용 사례 탐색

Continuous Environmental Monitoring Using the New transformWithState API

Published: July 30, 2025

솔루션3분 소요

Summary

  • Spark 4.0의 새로운 transformWithState API를 사용하여 운영 모니터링 파이프라인 구축 배우기
  • ValueState, ListState, 그리고 MapState를 사용하여 도시 전체의 센서 데이터를 추적, 알림, 분석하기
  • TTL, 타이머, 그리고 Pandas 지원과 같은 기능을 활용하여 신뢰성과 디버깅 가능성 향상

Apache Spark 의 스트리밍 기능은 처음 시작할 때부터 간단한 상태 없는 처리에서 시작하여 각 배치가 독립적으로 작동하는 방식에서 크게 발전했습니다. 진정한 변화는 mapGroupsWithState 와 이후의 flatMapGroupsWithState와 같은 API를 통해 상태 유지 처리 기능이 추가되면서 왔습니다. 이를 통해 개발자들은 스트리밍 마이크로 배치 간에 상태를 유지하고 업데이트할 수 있게 되었습니다. 이러한 상태 유지 작업은 연속적인 데이터 스트림에서 복잡한 이벤트 처리, 이상 탐지, 패턴 인식의 가능성을 열었습니다.

Apache Spark 구조화된 스트리밍의 최신 추가 기능인, transformWithState는 상태 유지 스트림 처리의 중요한 발전을 나타내며, 이전의,flatMapGroupsWithStateapplyInPandasWithState,에 비해 임의의 상태 유지 처리를 더 효과적으로 실행하는 여러 가지 이점을 제공합니다. Apache Spark 4.0을 통해 이 프레임워크는 표현력과 성능의 새로운 높이에 도달했습니다. 이 최신 진화는 시간을 걸쳐 컨텍스트를 유지하면서 초당 수백만 개의 이벤트를 처리하는 복잡한 실시간 데이터 애플리케이션을 구축하는 데 필요한 포괄적인 도구 세트를 제공합니다.

시나리오 깊이 분석

우리는 환경 모니터링 시스템을 예로 들어 transformWithStateInPandas의 기능을 보여줄 것입니다. 이 시스템에서는 센서 데이터의 연속적인 스트림을 수집, 처리, 분석합니다. 우리의 예시는 환경 데이터에 초점을 맞추고 있지만, 이와 같은 접근 방식은 장비 텔레메트리, 물류 추적, 산업 자동화 등 많은 운영적인 사용 사례에 적용될 수 있습니다.

기초

시간 동안 한 장소의 온도, 습도, 이산화탄소 농도, 미세먼지를 모니터링하고 있으며, 이러한 측정치의 평균 값이 임계값을 초과하거나 미달하면 경고를 발생시켜야 한다고 상상해 보세요.

이곳이 바로 ValueState API가 활용되는 곳입니다. 이들은 기본형이나 복잡한 구조체로 상태를 저장하는 데 사용할 수 있습니다. 어떻게 작동하는지 살펴봅시다.

ValueState 구현

단일 센서부터 시작해봅시다. 몇 초마다 이 센서는 다음과 같은 읽기를 보냅니다:

각 센서, 위치, 도시에 대해 현재 상태뿐만 아니라 역사적 맥락을 추적하는 상태를 유지해야 합니다. 이것을 센서의 메모리로 생각할 수 있으며, 마지막으로 읽은 타임스탬프부터 생성된 경고의 수까지 모든 것을 추적합니다. 이 전체 사진을 포착하기 위해 우리는 ValueState 스키마를 설계합니다:

환경 데이터를 델타 테이블에 저장

우리의 상태 유지 프로세서를 TemperatureMonitor로 정의한 후, 이 프로세서를 transformWithStateInPandas 연산자에 전달하고 결과를 델타 테이블에 저장합니다. 이렇게 하면 TemperatureMonitor 의 데이터가 외부 서비스와 분석을 위해 사용 가능하게 됩니다.

출력 검사

TemperatureMonitor 에 의해 처리되고 출력 델타 테이블에 저장된 데이터를 살펴봅시다. 이는 다양한 위치(파리, 뉴욕, 런던, 도쿄, 시드니)의 여러 센서로부터의 환경 읽기 값과 함께 트리거된 알림을 포함하고 있습니다.

보시다시피, transformWithState는 우리가 상태를 효과적으로 처리하고 높은 습도, 온도, CO2 수준 등에 대한 다양한 환경 경고를 다른 위치에서 발생시키는 데 도움이 됩니다.

환경 이력 관리

이제 센서가 계속해서 다양한 위치에서 환경 조건을 모니터링하는 도시를 상상해 보세요. 온도가 급격히 상승할 경우, 도시 관리자는 이것이 지역적인 문제인지 아니면 도시 전체의 문제인지 알아야 할 수 있습니다.

ListState API는 상태 관리를 순서가 지정된 컬렉션을 처리할 수 있도록 확장하며, 시계열 데이터 및 역사적 분석에 이상적입니다. 이것은 타임라인이나 우리가 선택한 임의의 경계를 가로질러 패턴과 추세를 추적할 때 중요해집니다.

ListState 구현 - 도시를 위한 스마트한 역사적 저장소

도시에는 지속적으로 데이터를 스트리밍하는 여러 센서가 있다고 가정해 봅시다. 도시 내의 어떤 위치에서든 25°C의 임계값을 초과하는 온도를 보고하면, 우리는 그 데이터를 캡처하고 시간 인식 ListState에 저장합니다:

아래 예에서는 EnvironmentalMonitorListProcessor 클래스와 ListState를 사용하고 내장된 TTL (Time To Live)을 사용하여 센서 데이터의 이력을 1시간 동안 유지합니다:

Time to Live(TTL)을 사용하여 오래된 상태 값 만료

transformWithState에서 사용하는 상태 값은 선택적으로 살아있는 시간(TTL) 값을 지원하며, 이는 값의 처리 시간과 설정된 밀리초 단위의 기간을 기반으로 계산됩니다. TTL이 만료되면, 해당 값은 상태 저장소에서 제거됩니다.

ListState와 함께 TTL은 상태 객체 내에서 관련 데이터만 자동으로 유지하는 데 중요하며, 지정된 시간 기간 후에는 오래된 레코드를 자동으로 제거합니다.

이 예에서, TTL은 도시 전체의 분석이 현재 상태를 유지하고 관련성을 유지하도록 보장합니다. 각 상태 항목은 만료 타임스탬프를 받고, 한번 만료되면 상태는 자동으로 지워져 도시의 최근 역사적 맥락을 유지하면서 무제한 상태 성장을 방지합니다.

도시 전체 패턴 인식

ListState 객체에 저장된 이력을 이용하여 패턴을 찾아내고 다양한 계산을 수행할 수 있습니다. 예를 들어, EnvironmentalMonitorListProcessor 에서는 현재 읽기 값과 가장 최근의 역사적 읽기 값을 비교하여 온도 추세를 파악합니다.

스트리밍 쿼리 설정

이제 EnvironmentalMonitorListProcessor 를 스트리밍 파이프라인에 연결하고, 결과를 델타 테이블에 저장하고, 더 자세히 조사해 봅시다.

출력 검사

위의 스크린샷에서 볼 수 있듯이, 델타 테이블은 이제 위치별 시간 분석을 보여줍니다. ListState의 시간 저장소와 도시 수준 분석을 결합함으로써, 우리는 환경 문제를 감지하는 것뿐만 아니라 전체 도시를 걸쳐 그들의 맥락과 진화를 이해하는 시스템을 만들었습니다. ListState API와 TTL 관리의 결합은 역사적인 환경 데이터를 처리하는 효율적인 방법을 제공하면서 무제한 상태 성장을 방지하므로, 도시 전체의 환경 모니터링 시스템에 이상적입니다.

위치 기반 분석 수행

이제 스마트 도시 계획자들이 바쁜 시내 교차로부터 주거 지역, 산업 단지에 이르기까지 다양한 도시 구역에 환경 센서를 배치하는 시나리오를 상상해 보세요. 각 지역은 시간과 계절에 따라 다양한 환경 기준을 가지고 있습니다.

MapState API를 사용하면, 시스템은 위치별 환경 읽기를 유지하고, 읽기가 허용 가능한 임계값을 초과하는 위치를 식별할 수 있습니다. 이 아키텍처는 도시 위치를 키로 사용하여 여러 환경에서 병렬 모니터링을 수행하고, 중요한 환경 추세를 추적하기 위해 최대 측정값을 보존하면서 무제한 상태 성장을 방지합니다.

EnvironmentalMonitorProcessor 는 MapState의 세련된 키-값 저장 기능을 활용하여 도시 내 위치별로 데이터를 구성합니다. 이를 통해 다양한 도시 환경에서 변화하는 조건에 대한 실시간 분석이 가능해져, 원시 센서 데이터를 도시 환경 관리를 위한 실질적인 정보로 변환합니다.

처리 로직

MapState 구조는 다음과 같이 위치를 키로 초기화됩니다:

우리의 구현에서 상태 업데이트 과정은 각 환경 파라미터에 대한 최대값을 취하여 각 위치에서의 최고 오염 수준을 추적하도록 보장합니다:

스트리밍 쿼리 설정

이제 구현을 다음과 같이 Spark 구조화된 스트리밍 파이프라인에 통합할 수 있습니다:

출력 검사

Delta 테이블 출력은 이제 여러 위치/도시에 걸친 종합적인 환경 모니터링을 보여줍니다.

모두 함께 놓기

위의 섹션에서는 Apache Spark의 새로운 transformWithState API를 사용하여 다양한 환경 모니터링 사용 사례를 쉽게 지원할 수 있음을 보여주었습니다. 요약하면, 위의 구현은 다음 사용 사례를 가능하게 할 수 있습니다:

  • 다중 파라미터 임계값 모니터링: 온도, 습도, 이산화탄소, PM2.5 레벨에 걸친 위반의 실시간 감지
  • 실시간 알림: 환경 조건 변화에 대한 즉시 알림
  • 병렬 도시 모니터링: 여러 도시 지역의 독립적인 추적

향상된 디버깅 및 관찰 가능성

위에서 보여준 파이프라인 코드와 함께, 새로운 transformWithState API의 가장 강력한 기능 중 하나는 Apache Spark의 state reader 와의 원활한 통합입니다. 이 기능은 우리의 환경 모니터링 시스템에서 유지되는 내부 상태에 대한 전례 없는 가시성을 제공하며, 개발, 디버깅, 운영 모니터링을 훨씬 효과적으로 만듭니다.

상태 정보 접근

여러 도시에서 중요한 환경 모니터링 시스템을 관리할 때, 기본 상태를 이해하는 것은 이상 현상을 문제 해결하고, 데이터 무결성을 검증하고, 적절한 시스템 운영을 보장하는 데 필수적입니다. 상태 데이터 소스 리더는 고수준 메타데이터와 상세한 상태 값을 모두 쿼리할 수 있게 해줍니다.

출력 검사

아래 스크린샷에서 보여지는 것처럼, 사용자는 이제 모든 복합 유형에 대한 모든 상태 행에 대한 세밀한 접근 권한을 얻을 수 있어, 이러한 파이프라인의 디버깅 및 관찰 가능성이 크게 향상되었습니다.

결론

Apache Spark™ 4.0의 transformWithState API는 스트리밍 애플리케이션에서 임의의 상태 유지 처리를 위한 중요한 발전을 나타냅니다. 위의 환경 모니터링 사용 사례를 통해, 사용자가 새 API를 사용하여 강력한 운영 작업 부하를 구축하고 실행할 수 있음을 보여주었습니다. 그것의 객체 지향적 접근법과 강력한 기능 세트는 복잡한 요구 사항을 처리할 수 있는 고급 스트리밍 파이프라인의 개발을 가능하게 하면서 신뢰성과 성능을 유지합니다. 우리는 모든 Spark 사용자들이 스트리밍 사용 사례에 대해 새 API를 시도하고 이 새 API가 제공하는 모든 이점을 활용하도록 권장합니다!

위의 코드는 여기에서 다운로드 받을 수 있습니다: https://github.com/databricks-solutions/databricks-blogposts/tree/main/2025-05-transformWithStateInPandas/python/environmentalMonitoring

 

(이 글은 AI의 도움을 받아 번역되었습니다. 원문이 궁금하시다면 여기를 클릭해 주세요)

게시물을 놓치지 마세요

관심 있는 카테고리를 구독하고 최신 게시물을 받은편지함으로 받아보세요