주요 컨텐츠로 이동
Engineering blog

DataFrame 동일성 테스트 함수를 이용하여 PySpark 테스트 간소화

PySpark DataFrame의 동일성 테스트 함수와 그 중요성, 사용 방법을 소개합니다.
Haejoon Lee
Allison Wang
Amanda Liu
이 포스트 공유하기

(번역: Youngkyong Ko) Original Post

DataFrame 동일성 테스트 함수는 PySpark 단위 테스트를 간소화하기 위해 Apache Spark 3.5와 Databricks Runtime 14.2에 도입되었습니다. 이 블로그 게시물에 설명된 전체 기능셋은 곧 출시될 Apache Spark 4.0 및 Databricks Runtime 14.3부터 제공될 예정입니다.

DataFrame 동일성 테스트 함수를 이용하여 더 자신감 있게 데이터프레임 변환 코드 작성

PySpark에서 데이터로 작업하려면 DataFrame에 변환, 집계 및 조작을 적용해야 합니다. 변환이 누적될 때 코드가 예상대로 작동하는지 어떻게 확신할 수 있을까요? PySpark 동일성 테스트 유틸리티 함수는 데이터를 예상 결과와 비교하여 효율적, 효과적으로 확인할 수 있는 방법을 제공함으로써, 예상치 못한 차이를 식별하고 분석 프로세스 초기에 오류를 포착할 수 있도록 도와줍니다. 또한 차이점을 정확히 짚어주는 직관적인 정보를 반환하므로 디버깅에 많은 시간을 들이지 않고도 즉시 조치를 취할 수 있습니다.

DataFrame 동일성 테스트 함수 사용

Apache Spark 3.5에는 PySpark DataFrame에 대한 두 가지 동일성 테스트 함수인 assertDataFrameEqualassertSchemaEqual이 도입되었습니다. 각각의 사용법을 살펴보겠습니다.

assertDataFrameEqual: 이 함수를 사용하면 한 줄의 코드로 두 PySpark DataFrame이 동일한지 비교하여 데이터와 스키마가 모두 일치하는지 확인할 수 있습니다. 차이가 있는 경우 설명 정보를 반환합니다.

예제를 살펴보겠습니다. 먼저 두 개의 DataFrame을 생성하고 첫 번째 행에 의도적으로 차이점를 만듭니다:

df_expected = spark.createDataFrame(data=[("Alfred", 1500), ("Alfred", 2500), ("Anna", 
500), ("Anna", 3000)], schema=["name", "amount"])

df_actual = spark.createDataFrame(data=[("Alfred", 1200), ("Alfred", 2500), ("Anna", 500), 
("Anna", 3000)], schema=["name", "amount"])

그런 다음 두 개의 DataFrame에 대해 assertDataFrameEqual을 호출합니다:

from pyspark.testing import assertDataFrameEqual

assertDataFrameEqual(df_actual, df_expected)

이 함수는 두 DataFrame의 첫 번째 행이 다르다는 것을 나타내는 설명 메시지를 반환합니다. 이 예제에서는 이 행의 Alfred에 대해 나열된 첫 번째 금액이 동일하지 않습니다 (예상: 1500, 실제: 1200):

DataFrames

이 정보를 통해 코드에서 생성된 DataFrame의 문제를 즉시 파악하고 이를 기반으로 디버깅 대상을 지정할 수 있습니다.

이 함수에는 DataFrame 비교의 엄격성을 제어하는 몇 가지 옵션도 있으므로 특정 사용 사례에 따라 조정할 수 있습니다.

assertSchemaEqual: 이 함수는 두 DataFrame의 스키마만 비교하며 행 데이터는 비교하지 않습니다. 이 함수를 사용하면 서로 다른 두 DataFrame의 열 이름, 데이터 유형 및 null 가능 속성이 동일한지 확인할 수 있습니다.

예제를 살펴보겠습니다. 먼저 서로 다른 스키마를 가진 두 개의 데이터 프레임을 만들어 보겠습니다:

schema_actual = "name STRING, amount DOUBLE"

data_expected = [["Alfred", 1500], ["Alfred", 2500], ["Anna", 500], ["Anna", 3000]]
data_actual = [["Alfred", 1500.0], ["Alfred", 2500.0], ["Anna", 500.0], ["Anna", 3000.0]]

df_expected = spark.createDataFrame(data = data_expected)
df_actual = spark.createDataFrame(data = data_actual, schema = schema_actual)

이제 이 두 DataFrame 스키마로 assertSchemaEqual을 호출해 보겠습니다:

from pyspark.testing import assertSchemaEqual

assertSchemaEqual(df_actual.schema, df_expected.schema)

이 함수는 두 DataFrame의 스키마가 서로 다른지 확인하고, 출력에 스키마가 갈라지는 지점을 표시합니다:

DataFrames

이 예에서는 두 가지 차이점이 있습니다. 실제 DataFrame에서는 금액 열의 데이터 유형이 LONG이지만 예상 데이터 프레임에서는 DOUBLE이고, 스키마를 지정하지 않고 예상 데이터 프레임을 만들었기 때문에 열 이름도 다릅니다.

이 두 가지 차이점은 여기 그림과 같이 함수 출력에서 강조 표시되어 있습니다.

assertPandasOnSparkEqual은 Apache Spark 3.5.1에서 더 이상 사용되지 않으며 곧 출시될 Apache Spark 4.0.0에서 제거될 예정이므로 이 블로그 게시물에서는 다루지 않습니다. Spark에서 Pandas API를 테스트하려면 Pandas API on Spark equality test functions를 참조하세요.

PySpark DataFrame의 차이 디버깅을 위한 구조화된 출력

assertDataFrameEqual 과 assertSchemaEqual 함수는 주로 단위 테스트 용도이고, 따라서 일반적으로 작은 데이터셋을 사용하여 PySpark 함수를 테스트하는 것이 보통이지만, 적지 않은 행과 열을 가진 DataFrame과 함께 사용할 수도 있습니다. 이러한 시나리오에서는 서로 다른 행에 대한 행 데이터를 쉽게 검색하여 추가 디버깅을 더 쉽게 수행할 수 있습니다.

그 방법을 살펴보겠습니다. 앞서 사용한 것과 동일한 데이터를 사용하여 두 개의 데이터 프레임을 만들어 보겠습니다:

df_expected = spark.createDataFrame(data=[("Alfred", 1500), ("Alfred", 2500), 
("Anna", 500), ("Anna", 3000)], schema=["name", "amount"])
df_actual = spark.createDataFrame(data=[("Alfred", 1200), ("Alfred", 2500), ("Anna", 
500), ("Anna", 3000)], schema=["name", "amount"])

이제  assertDataFrameEqual을 호출한 후 assertion 오류 객체에서 두 DataFrame 간에 서로 다른 데이터를 찾을 수 있습니다:

from pyspark.testing import assertDataFrameEqual
from pyspark.errors import PySparkAssertionError

try:
    assertDataFrameEqual(df_actual, df_expected, includeDiffRows=True)
except PySparkAssertionError as e:
    # `e.data` here looks like:
    # [(Row(name='Alfred', amount=1200), Row(name='Alfred', amount=1500))]
    spark.createDataFrame(e.data, schema=["Actual", "Expected"]).show() 

이 예제에서와 같이 서로 다른 행들을 DataFrame으로 만들어 표시하면 차이를 찾는 것이 매우 쉬워 집니다:

DataFrame

보시다시피, 서로 다른 행에 대한 정보를 즉시 추가 분석에 사용할 수 있습니다. 더 이상 디버깅 목적으로 실제 DataFrame과 예상 DataFrame에서 이 정보를 추출하는 코드를 작성할 필요가 없습니다.

이 기능은 곧 출시될 Apache Spark 4.0 및 DBR 14.3에서 사용할 수 있습니다.

Pandas API on Spark 동일성 테스트 함수 

PySpark DataFrame의 동일성 테스트 함수 외에도, Pandas API on Spark 사용자는 다음과 같은 DataFrame 동일성 테스트 함수에 액세스할 수 있습니다:

  • assert_frame_equal
  • assert_series_equal
  • assert_index_equal

이 함수들는 비교의 엄격성을 제어하기 위한 옵션들을 제공하며, Spark DataFrame에서 Pandas API를 단위 테스트하는 데 유용합니다. 이 함수는 pandas 테스트 유틸리티 함수와 완전히 동일한 API를 제공하므로, Pandas API on Spark을 사용하여 실행하려는 기존 pandas 테스트 코드를 변경하지 않고도 사용할 수 있습니다.

다음은 다양한 매개변수와 함께 assert_frame_equal을 사용하는 것을 보여주는 몇 가지 예제로, Pandas API on Spark DataFrame을 비교합니다:

from pyspark.pandas.testing import assert_frame_equal
import pyspark.pandas as ps

# Create two slightly different Pandas API on Spark DataFrames
df1 = ps.DataFrame({"a": [1, 2, 3], "b": [4.0, 5.0, 6.0]})
df2 = ps.DataFrame({"a": [1, 2, 3], "b": [4, 5, 6]})  # 'b' column as integers

# Validate DataFrame equality with strict data type checking
assert_frame_equal(df1, df2, check_dtype=True)

이 예제에서는 두 DataFrame의 스키마가 다릅니다. 함수 출력에는 다음과 같이 차이점이 나열되어 있습니다:

DataFrames

이 예제에서와 같이 check_dtype 인수를 사용하여 열의 데이터 유형이 같지 않은 경우에도 열 데이터를 비교하도록 함수를 지정할 수 있습니다:

# DataFrames are equal with check_dtype=False
assert_frame_equal(df1, df2, check_dtype=False)

 assert_frame_equal이 열 데이터 유형을 무시하도록 지정했기 때문에 이제 두 DataFrame이 동일한 것으로 간주합니다.

또한 이 함수를 사용하면 이 예제에서와 같이 Pandas API on Spark 객체와 pandas 객체를 비교할 수 있으므로 서로 다른 DataFrame 라이브러리 간의 호환성을 쉽게 확인할 수 있습니다.

import pandas as pd

# Pandas DataFrame
df_pandas = pd.DataFrame({"a": [1, 2, 3], "b": [4.0, 5.0, 6.0]})

# Comparing Pandas API on Spark DataFrame with the Pandas DataFrame
assert_frame_equal(df1, df_pandas)

# Comparing Pandas API on Spark Series with the Pandas Series
assert_series_equal(df1.a, df_pandas.a)

# Comparing Pandas API on Spark Index with the Pandas Index
assert_index_equal(df1.index, df_pandas.index)

새로운 PySpark DataFrame 및 Pandas API on Spark의 동일성 테스트 함수들을 사용하면 PySpark 코드가 예상대로 작동하는지 확인할 수 있습니다. 이러한 함수를 사용하면 오류를 포착할 뿐만 아니라 무엇이 잘못되었는지 정확히 파악할 수 있어 문제가 어디에 있는지 빠르고 쉽게 파악할 수 있습니다. 자세한 내용은 PySpark 테스팅 페이지를 참조하세요.

이러한 기능은 곧 출시될 Apache Spark 4.0에서 사용할 수 있습니다. DBR 14.2에서는 이미 지원되고 있습니다.

Databricks 무료로 시작하기

관련 포스트

Engineering blog

Apache Spark™ 3.5 소개

(번역: Sangbae Lim) Original Blog Post 오늘, 데이터브릭스 런타임 14.0에서 Apache Spark™ 3.5를 사용할 수 있다는 소식을 발표하게 되어 기쁘게 생각합니다. Spark 3.5...
모든 엔지니어링 블로그 포스트 보기