주요 컨텐츠로 이동
Engineering blog

Apache Spark™ 3.5의 Arrow에 최적화된 Python UDF

Xinrong Meng
Hyukjin Kwon
Takuya Ueshin
Allan Folting
이 포스트 공유하기

(번역: Youngkyong Ko) Original Blog Post

Apache Spark™에서 Python 사용자 정의 함수(UDF)는 가장 인기 있는 기능 중 하나입니다. 이 기능을 통해 사용자는 고유한 데이터 처리 요구사항에 맞는 사용자 정의 코드를 작성할 수 있습니다. 그러나 직렬화와 역직렬화(serialization and deserialization)를 위해 cloudpickle에 의존하는 현재의 Python UDF는 특히 대용량 데이터 입출력을 처리할 때 성능 병목 현상이 발생합니다.

Apache Spark 3.5와 Databricks Runtime 14.0에서는 성능을 크게 개선하기 위해 Arrow에 최적화된 Python UDF를 도입했습니다. 이 최적화의 핵심은 Apache Arrow로, 여러 언어에서 표준화된 방식의 컬럼형 인메모리 데이터 표현입니다. 이러한 UDF는 Arrow를 활용함으로써 기존의 느린 데이터 (역)직렬화 방식을 우회하여 JVM과 Python 프로세스 간에 신속한 데이터 교환을 가능하게 합니다. Apache Arrow의 풍부한 타입 시스템을 통해 이러한 최적화된 UDF는 형 변환(type coercion)을 처리하는 보다 일관되고 표준화된 방법을 제공합니다.

Python UDF에 대한 Arrow 최적화는 선택 사항이며, 사용자는 "functions.udf""useArrow" 부울 매개변수를 사용하여 개별 UDF에 대한 Arrow 최적화를 활성화할지 여부를 제어할 수 있습니다. 아래 예를 참조하세요:

>>> @udf(returnType='int', useArrow=True)  # An Arrow Python UDF
... def arrow_slen(s):
...   return len(s)
... 

또한 아래와 같이 "spark.sql.execution.pythonUDF.arrow.enabled" Spark 설정을 통해, 전체 SparkSession의 모든 UDF에 대해 Arrow 최적화를 활성화할 수 있습니다: 

>>> spark.conf.set("spark.sql.execution.pythonUDF.arrow.enabled", True)
>>> 
>>> @udf(returnType='int')  # An Arrow Python UDF
... def arrow_slen(s):
...   return len(s)
...

더 빠른 (역)직렬화

Apache Arrow는 서로 다른 시스템과 프로그래밍 언어 간에 효율적인 데이터 교환을 제공하는 컬럼형 인메모리 데이터 형식입니다. 전체 행을 객체로 직렬화하는 Pickle과 달리, Arrow는 컬럼 기반 형식으로 데이터를 저장하기 때문에 압축과 메모리 로컬리티가 향상되어 분석 워크로드에 더 적합합니다.

아래 차트는 Arrow에 최적화된 Python UDF가 다양한 크기의 입력 데이터셋에 대해 단일 변환을 수행하는 성능을 보여줍니다. 클러스터는 3개의 워커와 1개의 드라이버로 구성되며, 클러스터의 각 머신에는 16개의 vCPU와 122 GiB 메모리가 있습니다. Arrow에 최적화된 Python UDF는 피클된 Python UDF보다 1.6배까지 빠릅니다.

Arrow-optimized Python UDF

Arrow에 최적화된 Python UDF는 UDF를 체인으로 연결할 때 상당한 이점이 있습니다. 아래에서 볼 수 있듯이, 동일한 클러스터에서 Arrow에 최적화된 Python UDF는 32 GB 데이터 세트에서 피클된 Python UDF보다 1.9배까지 더 빠르게 실행할 수 있습니다.

Arrow-optimized Python UDF

전체 벤치마크 및 결과는 여기를 참조하세요.

표준화된 형 변환(type coercion)

UDF 형 변환은 UDF가 반환하는 Python 값이 사용자가 지정한 리턴값 유형과 일치하지 않을 때 문제를 일으킵니다. 기본으로 제공되는 피클된 Python UDF의 형 변환은 이러한 형 불일치를 처리하는 방법으로 None에 의존하여 잠재적인 모호성 및 데이터 손실을 초래하는 등의 제약이 있습니다. 또한 날짜, 날짜/시간, 튜플을 문자열로 변환하면 모호한 결과가 나올 수 있습니다. Arrow에 최적화된 Python UDF는 형 변환을 위한 Arrow의 잘 정의된 규칙 세트를 활용하여 이러한 문제를 해결합니다.

아래와 같이, Arrow에 최적화된 Python UDF(useArrow=True)는 문자열로 저장된 정수를 지정된 대로 "int"로 성공적으로 형 변환하지만, 피클된 Python UDF(useArrow=False)는 "NULL"을 반환합니다.

>>> df = spark.createDataFrame(['1', '2'], schema='string')
>>> df.select(udf(lambda x: x, 'int', useArrow=True)('value').alias('str_to_int')).show()
+----------+                                                                    
|str_to_int|
+----------+
|         1|
|         2|
+----------+
>>> df.select(udf(lambda x: x, 'int', useArrow=False)('value').alias('str_to_int')).show()
+----------+
|str_to_int|
+----------+
|      NULL|
|      NULL|
+----------+

아래의 또 다른 예에서, Arrow에 최적화된 Python UDF(useArrow=True)는 날짜를 문자열로 올바르게 형 변환하는 반면, 피클된 Python UDF(useArrow=False)는 기본 Java 객체를 노출하여 모호한 결과를 반환합니다.

>>> df = spark.createDataFrame([datetime.date(1970, 1, 1), datetime.date(1970, 1, 2)], schema='date')
>>> df.select(udf(lambda x: x, 'string', useArrow=True)('value').alias('date_in_string')).show()
+--------------+
|date_in_string|
+--------------+
|    1970-01-01|
|    1970-01-02|
+--------------+
>>> df.select(udf(lambda x: x, 'string', useArrow=False)('value').alias('date_in_string')).show()
+-----------------------------------------------------------------------+
|date_in_string                                                         |
+-----------------------------------------------------------------------+
|java.util.GregorianCalendar[time=?,areFieldsSet=false,areAllFieldsSet..|
|java.util.GregorianCalendar[time=?,areFieldsSet=false,areAllFieldsSet..|
+-----------------------------------------------------------------------+

피클과 비교했을 때, Arrow의 형 변환은 변환 과정에서 가능한 한 많은 정보와 정밀도를 유지하는 것을 목표로 합니다.

형 변환과 관련하여 피클 Python UDF와 Arrow에 최적화된 Python UDF를 종합적으로 비교하려면 여기를 참조하세요.

결론

Arrow에 최적화된 Python UDF는 UDF 입력과 출력의 (역)직렬화를 위해 Apache Arrow를 활용하므로 기본 피클된 Python UDF에 비해 (역)직렬화가 훨씬 빠릅니다. 또한 Apache Arrow 사양에 따라 표준화된 형 변환 규칙을 적용합니다. Arrow에 최적화된 Python UDF는 Spark 3.5부터 사용할 수 있으며, 자세한 내용은 SPARK-40307을 참조하세요.

Databricks 무료로 시작하기

관련 포스트

Engineering blog

Introducing Pandas UDF for PySpark

October 30, 2017 작성자: Li Jin in 엔지니어링 블로그
NOTE: Spark 3.0 introduced a new pandas UDF. You can find more details in the following blog post: New Pandas UDFs and Python...
Engineering blog

Memory Profiling in PySpark

There are many factors in a PySpark program's performance. PySpark supports various profiling tools to expose tight loops of your program and allow...
Engineering blog

Apache Spark™ 3.5 소개

(번역: Sangbae Lim) Original Blog Post 오늘, 데이터브릭스 런타임 14.0에서 Apache Spark™ 3.5를 사용할 수 있다는 소식을 발표하게 되어 기쁘게 생각합니다. Spark 3.5...
모든 엔지니어링 블로그 포스트 보기