メインコンテンツへジャンプ
Engineering blog

DataFrameの等式関数を使ったPySparkテストのシンプル化

PySpark DataFrameの等式テスト関数の紹介、なぜ重要なのか、どのように使うのか。
イ・ヘジュン
アリソン・ワン
アマンダ・リュー
Share this post

DataFrameの等式テスト関数は、PySparkのユニットテストを簡素化するためにApache Spark™ 3.5とDatabricks Runtime 14.2で導入されました。 このブログ記事で説明した機能一式は、次期Apache Spark 4.0とDatabricks Runtime 14.3から利用可能になります。

DataFrameの等式テスト関数を使用して、より信頼性の高いDataFrame変換を記述

PySparkでデータを扱うには、DataFrameに変換、集約、操作を適用します。 変換が蓄積されるにつれて、コードが期待通りに動作することをどうやって確信できるでしょうか? PySparkの等式テストユーティリティ関数は、データを期待される結果と照らし合わせてチェックする効率的で効果的な方法を提供し、予期しない差異を特定して分析プロセスの初期段階でエラーを検出するのに役立ちます。 さらに、デバッグに多くの時間を費やすことなく、即座に対策を講じることができるように、違いを正確に特定する直感的な情報を返します。

DataFrame の等式テスト関数の使用

Apache Spark 3.5では、PySpark DataFrame用の2つの等式テスト関数assertDataFrameEqualと assertSchemaEqualが導入されました。 それぞれの使い方を見てみましょう。

assertDataFrameEqualこの関数を使うと、1行のコードで2つのPySpark DataFrameが等しいかどうかを比較し、データとスキーマが一致するかどうかをチェックできます。 違いがある場合は、説明的な情報を返します。

例を見てみましょう。 まず、2つのDataFrameを作成し、最初の行に意図的に差分を入れます:

df_expected = spark.createDataFrame(data=[("Alfred", 1500), ("Alfred", 2500), ("Anna", 
500), ("Anna", 3000)], schema=["name", "amount"])

df_actual = spark.createDataFrame(data=[("Alfred", 1200), ("Alfred", 2500), ("Anna", 500), 
("Anna", 3000)], schema=["name", "amount"])

そして、2つのDataFrameでassertDataFrameEqualを呼び出します:

from pyspark.testing import assertDataFrameEqual

assertDataFrameEqual(df_actual, df_expected)

この関数は、2つのDataFrameの最初の行が異なることを示す説明的なメッセージを返します。 この例では、この行のAlfredの最初の金額が同じではありません(予想:1500、実際:1200):

DataFrame

この情報があれば、あなたのコードが生成したDataFrameの問題をすぐに知ることができ、それに基づいてデバッグを行うことができます。

また、この関数には、DataFrame の比較の厳密さを制御するためのいくつかのオプションがあります。

assertSchemaEqual行データの比較は行いません。 これにより、2つの異なるDataFramesのカラム名、データ型、Nullableプロパティが同じかどうかを検証することができます。

例を見てみましょう。 まず、異なるスキーマを持つ2つのDataFrameを作成します:

schema_actual = "name STRING, amount DOUBLE"

data_expected = [["Alfred", 1500], ["Alfred", 2500], ["Anna", 500], ["Anna", 3000]]
data_actual = [["Alfred", 1500.0], ["Alfred", 2500.0], ["Anna", 500.0], ["Anna", 3000.0]]

df_expected = spark.createDataFrame(data = data_expected)
df_actual = spark.createDataFrame(data = data_actual, schema = schema_actual)

では、これら2つのDataFrameスキーマを使ってassertSchemaEqualを呼び出してみましょう:

from pyspark.testing import assertSchemaEqual

assertSchemaEqual(df_actual.schema, df_expected.schema)

この関数は2つのDataFramesのスキーマが異なることを判定し、出力がどこで分岐しているかを出力します。

DataFrame

この例では、amount列のデータ型が、実際のDataFrameではLONGですが、期待されるDataFrameではDOUBLEであることと、スキーマを指定せずに期待されるDataFrameを作成したため、列名も異なるという2つの違いがあります。

これらの違いは、ここに示されているように、関数の出力で強調表示されます。

assertPandasOnSparkEqualはApache Spark 3.5.1から非推奨となり、次期Apache Spark 4.0.0で削除される予定なので、このブログ記事では取り上げません。 Spark 上での Pandas API のテストについては、Pandas API on Spark equality test functions を参照してください。

PySpark DataFrames の違いをデバッグするための構造化出力

assertDataFrameEqual関数とassertSchemaEqual関数は主にPySpark関数をテストするために小さいデータセットを使用するユニットテストを目的としていますが、数行と数列以上のDataFrameで使用することもあります。 このような場合、異なる行のデータを簡単に取得することができ、デバッグが容易になります。

その方法を見てみましょう。 先ほどと同じデータを使って、2つのDataFrameを作ります:

df_expected = spark.createDataFrame(data=[("Alfred", 1500), ("Alfred", 2500), 
("Anna", 500), ("Anna", 3000)], schema=["name", "amount"])
df_actual = spark.createDataFrame(data=[("Alfred", 1200), ("Alfred", 2500), ("Anna", 
500), ("Anna", 3000)], schema=["name", "amount"])

そして今度は、assertDataFrameEqual をコールした後に、アサーション・エラー・オブジェクトから 2 つの DataFrame の間で異なるデータを取得します:

from pyspark.testing import assertDataFrameEqual
from pyspark.errors import PySparkAssertionError

try:
    assertDataFrameEqual(df_actual, df_expected, includeDiffRows=True)
except PySparkAssertionError as e:
    # `e.data` here looks like:
    # [(Row(name='Alfred', amount=1200), Row(name='Alfred', amount=1500))]
    spark.createDataFrame(e.data, schema=["Actual", "Expected"]).show() 

この例で行ったように、異なる行に基づいてDataFrameを作成し、それを表示することで、この情報へのアクセスがいかに簡単であるかを示しています:

データフレーム

ご覧のように、異なる行の情報はすぐに分析に利用できます。 デバッグのために、この情報を実際のDataFramesと期待されるDataFramesから抽出するコードを書く必要はもうありません。

この機能は、次期Apache Spark 4.0とDBR 14.3から利用可能になります。

PandasのAPIをSparkの等式テスト関数で実行

PySparkのDataFrameの等質性をテストする関数に加えて、Pandas API on Sparkのユーザは以下のDataFrameの等式テスト関数を利用することができます:

  • assert_frame_equal
  • assert_series_equal
  • assert_index_equal

この関数は、比較の厳密性を制御するオプションを提供し、Spark DataFrames上でのPandas APIのユニットテストに最適です。 これらはpandasのテストユーティリティ関数と全く同じAPIを提供しているため、Spark上でPandas APIを使用して実行したい既存のpandasテストコードを変更することなく使用することができます。

以下は、Pandas APIとSpark DataFramesを比較し、パラメータを変えてassert_frame_equalを使用する例です:

from pyspark.pandas.testing import assert_frame_equal
import pyspark.pandas as ps

# Create two slightly different Pandas API on Spark DataFrames
df1 = ps.DataFrame({"a": [1, 2, 3], "b": [4.0, 5.0, 6.0]})
df2 = ps.DataFrame({"a": [1, 2, 3], "b": [4, 5, 6]})  # 'b' column as integers

# Validate DataFrame equality with strict data type checking
assert_frame_equal(df1, df2, check_dtype=True)

この例では、2つのDataFrameのスキーマが異なっています。 関数の出力は、ここに示すように、差分を一覧表示します:

DataFrame

この例のように、check_dtype引数を使用して、カラムのデータ型が同じでなくてもカラムのデータを比較するように関数を指定することができます:

# DataFrames are equal with check_dtype=False
assert_frame_equal(df1, df2, check_dtype=False)

assert_frame_equalは列のデータ型を無視するように指定したので、2つのDataFrameは等しいとみなされます。

これらの関数は、Sparkオブジェクト上のPandas APIとpandasオブジェクトの比較も可能で、この例で示されているように、異なるDataFrameライブラリ間の互換性チェックを容易にします:

import pandas as pd

# Pandas DataFrame
df_pandas = pd.DataFrame({"a": [1, 2, 3], "b": [4.0, 5.0, 6.0]})

# Comparing Pandas API on Spark DataFrame with the Pandas DataFrame
assert_frame_equal(df1, df_pandas)

# Comparing Pandas API on Spark Series with the Pandas Series
assert_series_equal(df1.a, df_pandas.a)

# Comparing Pandas API on Spark Index with the Pandas Index
assert_index_equal(df1.index, df_pandas.index)

新しい PySpark DataFrame と Pandas API on Spark の等式テスト関数を使用することは、PySpark コードが期待通りに動作することを確認する素晴らしい方法です。 これらの機能は、エラーをキャッチするだけでなく、何が問題なのかを正確に理解し、問題がどこにあるのかを素早く簡単に特定するのに役立ちます。 詳しくはTesting PySparkのページを参照してください。

これらの機能は、次期Apache Spark 4.0から利用可能になります。 DBR14.2はすでにサポートしています。

Databricks 無料トライアル

関連記事

Engineering blog

Apache Spark™ 3.5のご紹介

翻訳:Junichi Maruyama. - Original Blog Link 本日、Databricks Runtime 14.0の一部として、Databricks上でApache Spark™ 3.5が利用可能になったことを発表いたします。Spark 3.5のリリースに多大な貢献をしていただいたApache Sparkコミュニティに深く感謝いたします。 Sparkをこれまで以上にアクセスしやすく、多用途で効率的なものにするという我々のミッションに沿った今回のアップデートには、以下のような新機能と改良が盛り込まれています: The English SDK for Apache Spark enables users to...
エンジニアリングのブログ一覧へ