주요 컨텐츠로 이동
오픈 소스

PySpark에서 Arrow UDF 소개: Pandas UDF를 대체하는 더 빠르고 가벼운 방법

더 성능이 뛰어난 UDF를 쉽게 정의하세요.

작성자: Ruifeng Zheng , Yicong Huang

  • Pandas UDF의 Pandas/Arrow 변환 오버헤드를 제거하여 더 빠른 실행과 낮은 메모리 사용량을 위해 Arrow 데이터에서 직접 작동하는 네이티브 Arrow UDF를 소개합니다.
  • 또한 스칼라 및 집계 사용 사례에 대한 Arrow UDF 유형과 테이블 입력, 테이블 출력 변환을 위한 Arrow UDTF를 Python 및 SQL 코드 예제와 함께 설명합니다.
  • 벤치마크에 따르면 Arrow UDF는 Pandas UDF보다 약 10% 더 빠르고 약 40% 더 적은 메모리를 사용하며 복잡한 데이터 유형에 대한 지원이 더 좋습니다.

소개

Python 사용자 정의 함수(UDF)는 필수적인 확장성 메커니즘이지만, 행 기반 실행으로 인해 전통적으로 높은 오버헤드를 겪어왔습니다. Apache Spark™에서는 Pandas UDF가 Arrow 기반 직렬화 및 배치 처리를 도입하여 이 문제의 일부를 해결했으며, 스칼라 Python UDF에 비해 처리량이 크게 향상되었습니다.

하지만 Pandas UDF에는 여전히 근본적인 한계가 있습니다:

  • Pandas/Arrow 데이터 변환은 추가 데이터 복사를 유발합니다. 제로 복사 접근 방식은 특정 제한적인 경우에만 가능합니다. 예를 들어, NULL 값이 있는 열은 깊은 복사를 트리거합니다.
  • 복잡한 데이터 유형은 잘 지원되지 않습니다. 예를 들어, 중첩된 StructType 인스턴스는 집계 사용 사례의 출력 유형으로 지원되지 않습니다.
Apache Spark™의 Pandas UDF 실행 데이터 흐름

Pandas/Arrow 데이터 변환을 제거함으로써 Arrow UDF는 Pandas UDF보다 더 빠르게 실행되고, 메모리를 덜 소비하며, 더 나은 데이터 유형 지원을 제공합니다.

네이티브 Arrow UDF

Databricks Runtime 18.0(릴리스 노트)부터 시작되는 네이티브 Arrow UDF를 소개하게 되어 기쁩니다. 이는 성능 좋은 UDF 실행을 위한 흥미로운 도약입니다.

네이티브 Arrow UDF는 입력을 Pandas 또는 NumPy 객체로 변환하지 않고 직접 Arrow 데이터에서 작동합니다. 이를 통해 엔드투엔드에서 열 레이아웃을 유지하고, 불필요한 데이터 복사를 방지하며, UDF가 Arrow의 네이티브 컴퓨팅 및 메모리 모델을 활용하여 벡터화된 처리를 사용하도록 합니다.

Arrow UDF를 정의하기 위해 사용자는 지정된 반환 유형과 선택적 평가 유형을 가진 새로운 Python 데코레이터를 사용할 수 있습니다. 예를 들어:

@arrow_udf

기존 데코레이터 @udf를 사용하여 완전한 유형 힌트와 함께 정의할 수도 있습니다. 예를 들어:

참고: 함수 정의에는 모든 인수와 반환 값에 대한 유형 힌트가 포함되어야 합니다.
이 디자인은 스칼라 Python UDF의 인터페이스와 일치하여 스칼라 Python UDF에 이미 익숙한 사용자에게 일관되고 직관적인 경험을 제공합니다.

다음은 Arrow UDF 사용 방법을 보여줍니다:

Python 사용법:

SQL 사용법:

스칼라 함수, 집계 함수 및 테이블 함수를 포함하여 Arrow UDF 인터페이스의 변형을 지원합니다. 데이터프레임 API에서는 Arrow UDF를 사용하기 위해 mapInArrow 및 applyInArrow도 제공합니다. 다음으로 하나씩 소개하겠습니다.

Arrow 스칼라 함수

Arrow 스칼라 함수는 행별 변환을 수행합니다. 이는 스칼라 Pandas UDF의 Arrow 등가물이며, df.select() 또는 df.withColumn()와 같이 열 표현식이 필요한 모든 곳에서 사용할 수 있습니다. 세 가지 입력 모드가 지원됩니다: 직접, 이터레이터, 여러 배열의 이터레이터. UDF에 비용이 많이 드는 일회성 초기화(예: 모델 로드 또는 정규식 패턴 컴파일)가 필요한 경우 이터레이터 변형이 유용하며, 설정 비용은 모든 배치에 걸쳐 상각됩니다. 모든 경우에 출력 행 수는 입력 행 수와 일치해야 합니다.

  • 배열에서 배열로: 하나 이상의 pyarrow.Array를 받고 하나의 pyarrow.Array를 반환합니다. 입력 및 출력 배열은 동일한 수의 값을 가져야 합니다.
  • 배열 이터레이터에서 배열 이터레이터로: pyarrow.Array의 이터레이터를 받고 pyarrow.Array의 이터레이터를 반환합니다. 이 유형은 UDF 실행에 비용이 많이 드는 초기화가 필요한 경우 유용합니다.
  • 여러 배열 이터레이터에서 배열 이터레이터로: 여러 pyarrow.Array의 튜플 이터레이터를 받고 pyarrow.Array의 이터레이터를 반환합니다.

Arrow 집계 함수

Arrow 집계 함수는 하나 이상의 pyarrow.Array 입력을 받아 스칼라 값을 반환하며, 행 그룹을 단일 결과로 축소합니다. 이는 그룹화된 집계 Pandas UDF의 Arrow 등가물이며 groupBy().agg() 또는 창 작업과 함께 사용됩니다. 스칼라 함수와 마찬가지로 집계 함수도 세 가지 입력 모드를 지원합니다.

배열에서 스칼라로: pyarrow.Array를 받고 스칼라 값을 반환합니다.

  • 배열 이터레이터에서 스칼라로: pyarrow.Array의 이터레이터를 받고 스칼라 값을 반환합니다. 이는 집계 스타일 작업에서 대량의 데이터를 처리하는 데 유용합니다.

여러 배열 이터레이터에서 스칼라로: 여러 pyarrow.Array의 튜플 이터레이터를 받고 스칼라 값을 반환합니다. 더 복잡한 집계를 정의할 수 있습니다.

Arrow 테이블 함수

Arrow 테이블 함수, 즉 Arrow UDTF(사용자 정의 테이블 함수)는 pyarrow.RecordBatch 또는 여러 pa.Array를 입력으로 받아 pyarrow.Table를 출력으로 생성합니다. 이는 열 실행을 활용하여 Python에서 구현된 테이블 입력, 테이블 출력 변환의 주요 패턴을 나타냅니다. Arrow UDTF는 다음을 수행할 수 있습니다:

  • 여러 열 반환
  • 0개, 1개 또는 여러 행 생성
  • Arrow 컴퓨팅 커널을 활용한 벡터화된 테이블 변환 실행

결과적으로 필터링, 행 확장, 데이터 재구성 및 파생 열 생성과 같은 작업에 최적으로 적합합니다.

The arrow_udtf 인터페이스는 데코레이터 구문을 사용하여 반환 유형을 DDL 형식의 문자열로 정의함으로써 단순성을 위해 설계되었습니다. 이 설정에서 eval 메서드는 PyArrow 객체를 입력으로 받고 PyArrow 테이블 또는 레코드 배치를 생성해야 합니다. 인터페이스는 두 가지 입력 모드를 지원합니다. 테이블 인수를 처리할 때 eval 메서드에는 입력 테이블의 모든 열을 캡슐화하는 pa.RecordBatch 객체가 제공됩니다:

스칼라 인수의 경우 메서드는 각 스칼라 입력에 대해 pa.Array 객체를 받습니다:

다른 예는 다음과 같습니다:

이 UDTF는 두 가지 방식으로 작동할 수 있습니다:

Python 사용법:

SQL 사용법:

DataFrame mapInArrow 및 applyInArrow 지원

사용자 정의 함수(UDF) 및 사용자 정의 테이블 함수(UDTF) 외에도 PySpark는 DataFrame 수준에서 Python 네이티브 함수를 Arrow 데이터에 직접 적용할 수 있도록 하는 Arrow 함수 API를 제공합니다. 이러한 API는 Pandas와 유사하게 작동합니다(mapInPandas, applyInPandas) 하지만 Pandas DataFrame 대신 pyarrow.RecordBatchpyarrow.Table를 사용하여 Pandas와 Arrow 형식 간의 변환 오버헤드를 피합니다.

  • 맵. DataFrame.mapInArrowpyarrow.RecordBatch의 이터레이터를 pyarrow.RecordBatch의 다른 이터레이터로 변환하여 필터링, 변환 또는 확장과 같은 행 수준 작업을 가능하게 합니다.
  • 그룹화된 맵. groupBy().applyInArrow()는 각 그룹에 지정된 함수를 적용하며, pyarrow.Table를 받아 반환합니다. 이 기능은 데이터 정규화와 같은 그룹별 변환에 유용합니다.
  • 공동 그룹화된 맵. cogroup().applyInArrow()는 공유 키를 기반으로 두 DataFrame을 공동 그룹화한 다음 각 공동 그룹에 함수를 적용할 수 있도록 합니다. 함수는 두 개의 pyarrow.Table 입력을 받고 단일 pyarrow.Table를 반환해야 합니다.

성능

비용이 많이 드는 Pandas/Arrow 데이터 변환을 제거함으로써 Arrow UDF는 일반적으로 Pandas UDF보다 더 빠르게 실행되며 메모리 사용량도 적습니다. 두 개의 간단한 UDF를 비교해 보겠습니다:

Arrow UDF는 Pandas UDF보다 약 10% 빠르며, 메모리 프로파일러는 실행 시 약 40%의 메모리가 절약됨을 보여줍니다.

결론

Databricks Runtime 18.0은 PySpark에서 성능이 뛰어난 Python UDF 실행을 위한 Pandas UDF의 더 빠르고 가벼운 대안을 제공하는 네이티브 Arrow UDF를 도입합니다. Arrow 데이터에서 직접 작동하고 Pandas/Arrow 변환 오버헤드를 제거함으로써 Arrow UDF는 약 10% 더 빠른 실행, 약 40% 더 적은 메모리 사용량, 복잡한 데이터 유형에 대한 더 나은 지원을 제공합니다. 이 모든 것이 익숙하고 직관적인 데코레이터 구문으로 제공됩니다.

더 많은 것을 탐색할 준비가 되셨습니까? Databricks Runtime 18.0의 일부로 Databricks에서 지금 바로 네이티브 Arrow UDF를 사용해 보세요. 시작하려면 기존 Pandas UDF를 Arrow UDF로 바꾸기만 하면 됩니다. 대부분의 경우 몇 줄만 변경하면 즉각적인 성능 향상을 얻을 수 있습니다. 전체 API 참조 및 추가 예제는 Arrow UDF 설명서 및 Arrow UDTF 설명서를 참조하세요.

(이 글은 AI의 도움을 받아 번역되었습니다. 원문이 궁금하시다면 여기를 클릭해 주세요)

최신 게시물을 이메일로 받아보세요

블로그를 구독하고 최신 게시물을 이메일로 받아보세요.