メインコンテンツへジャンプ
Engineering blog

PySparkによるパラメータ化クエリ

マシュー・パワーズ
ダニエル・テネドリオ
Hyukjin Kwon
Share this post

PySparkは常にデータを問い合わせるための素晴らしいSQLとPython APIを提供してきました。 Databricks Runtime 12.1とApache Spark 3.4の時点で、パラメータ化されたクエリは、Pythonicプログラミングパラダイムを使用してSQLでデータをクエリする安全で表現力豊かな方法をサポートしています。

この投稿では、PySparkでパラメータ化されたクエリを作成する方法と、それがあなたのコードにとって良いデザインパターンである場合について説明します。

パラメータは、Sparkコードの再利用やテストを容易にするのに役立ちます。 また、良いコーディングの実践も奨励しています。 この記事では、PySparkのクエリをパラメータ化する2つの異なる方法を示します:

  1. PySparkカスタム文字列フォーマット
  2. パラメータマーカー

両方のタイプのPySparkパラメータ付きクエリの使い方を見て、組み込みの機能が他の選択肢よりも優れている理由を探ってみましょう。

パラメータ化されたクエリの利点

パラメータ化されたクエリは、"Don't Repeat yourself" (DRY)パターンを奨励し、ユニットテストを容易にし、SQLを再利用しやすくします。 また、セキュリティの脆弱性をもたらすSQLインジェクション攻撃も防ぐことができます。

似たようなクエリを書くとき、SQLの大きな塊をコピー&ペーストしたくなることがあります。 パラメータ化されたクエリは、パターンを抽象化し、DRYパターンでコードを書くことを推奨します。

パラメータ化されたクエリはテストも簡単です。 クエリをパラメータ化することで、本番データやテストデータセットで簡単に実行できます。

一方、Pythonのf-stringsを使ってSQLクエリを手動でパラメータ化するのは、あまり良い方法ではありません。 以下のデメリットを考えてみてください:

  1. Pythonのf-stringsはSQLインジェクション攻撃への防御を提供しません。
  2. Pythonのf-stringsはDataFramesやカラム、特殊文字などのPythonネイティブオブジェクトを理解しません。

SQLインジェクションの脆弱性からコードを保護し、文字列形式の一般的なPySparkインスタンスの自動型変換をサポートするパラメータマーカーでクエリをパラメータ化する方法を見てみましょう。

PySpark カスタム文字列フォーマットによるパラメータ化クエリ

h20_1e9という9列のデータ・テーブルがあるとします:

+-----+-----+------------+---+---+-----+---+---+---------+
|  id1|  id2|         id3|id4|id5|  id6| v1| v2|       v3|
+-----+-----+------------+---+---+-----+---+---+---------+
|id008|id052|id0000073659| 84| 89|82005|  5| 11|64.785802|
|id079|id037|id0000041462|  4| 35|28153|  1|  1|28.732545|
|id098|id031|id0000027269| 27| 38|13508|  5|  2|59.867875|
+-----+-----+------------+---+---+-----+---+---+---------+

以下の SQL クエリをパラメータ化したいとします:

SELECT id1, SUM(v1) AS v1 
FROM h20_1e9 
WHERE id1 = "id089"
GROUP BY id1

このクエリを、id1 の値を変えて簡単に実行できるようにしたいとします。 ここでは、id1 の値を変えてクエリをパラメータ化して実行する方法を説明します。

query = """SELECT id1, SUM(v1) AS v1 
FROM h20_1e9 
WHERE id1 = {id1_val} 
GROUP BY id1"""

spark.sql(query, id1_val="id016").show()

+-----+------+
|  id1|    v1|
+-----+------+
|id016|298268|
+-----+------+

別の引数を指定してクエリを再実行します:

spark.sql(query, id1_val="id018").show()

+-----+------+
|  id1|    v1|
+-----+------+
|id089|300446|
+-----+------+

PySpark の文字列フォーマッタを使うと、明示的に一時ビューを定義することなく DataFrame に対して直接 SQL クエリを実行することもできます。

次のようなperson_dfというDataFrameがあるとします:

+---------+--------+
|firstname| country|
+---------+--------+
|    frank|     usa|
|   sourav|   india|
|    rahul|   india|
|      sim|buglaria|
+---------+--------+

SQLでDataFrameに問い合わせる方法は以下の通りです。

spark.sql(
    "select country, count(*) as num_ppl from {person_df} group by country",
    person_df=person_df,
).show()

+--------+-------+
| country|num_ppl|
+--------+-------+
|     usa|      1|
|   india|      2|
|bulgaria|      1|
+--------+-------+

一時ビューを手動で登録することなく、SQL構文を使ってDataFrame上でクエリを実行できるのは非常に便利です!

それでは、パラメータマーカーで引数を指定してクエリをパラメータ化する方法を見てみましょう。

パラメータマーカーによるパラメータ化クエリ

引数のディクショナリを使用して、パラメータマーカーでパラメータ化された SQL クエリを作成することもできます。

some_purchasesという名前の以下のビューがあるとします:

+-------+------+-------------+
|   item|amount|purchase_date|
+-------+------+-------------+
|  socks|  7.55|   2022-05-15|
|handbag| 49.99|   2022-05-16|
| shorts|  25.0|   2023-01-05|
+-------+------+-------------+

ここでは、名前付きパラメータマーカーを使用してパラメータ化されたクエリを作成し、指定されたアイテムの合計金額を計算する方法を示します。

query = "SELECT item, sum(amount) from some_purchases group by item having item = :item"	

靴下に費やした総額を計算してください。

spark.sql(
    query,
    args={"item": "socks"},
).show()

+-----+-----------+
| item|sum(amount)|
+-----+-----------+
|socks|      32.55|
+-----+-----------+

無名のパラメータマーカーを使用してクエリをパラメータ化することもできます。

Apache Sparkはパラメータマーカーをサニタイズするので、このパラメータ化手法はSQLインジェクション攻撃からも保護されます。

PySparkがパラメータ付きクエリをサニタイズする方法

以下は、Sparkが名前付きパラメータ化クエリをどのようにサニタイズするかについての高レベルの説明です:

  • SQLクエリは、オプションのキー/値パラメータ・リストとともに送信されます。
  • Apache SparkはSQLクエリを解析し、パラメータ参照を対応する解析ツリーノードに置き換えます。
  • 分析中、Catalystルールが実行され、これらの参照をパラメータから提供されたパラメータ値に置き換えます。
  • この方法はリテラル値のみをサポートするため、SQLインジェクション攻撃から保護されます。 正規文字列補間はSQL文字列に対して置換を適用します。文字列が意図したリテラル値以外のSQL構文を含んでいる場合、この戦略は攻撃に対して脆弱になる可能性があります。

前述したように、PySparkでは2種類のパラメータ付きクエリがサポートされています:

構文 {} はクライアント側でSQLクエリの文字列置換を行い、使いやすさとプログラムしやすさを向上させます。 ただし、クエリテキストは Spark サーバーに送信される前に置換されるため、SQL インジェクション攻撃からは保護されません。

パラメータ化はsql() API のargs引数を使用し、SQL テキストとパラメータを別々にサーバに渡します。 SQLテキストはパラメータのプレースホルダと共に解析され、解析されたクエリツリーのargsで指定されたパラメータの値に置き換えられます。

サーバサイドパラメータ化クエリには、名前付きパラメータマーカーと無名パラメータマーカーの2種類があります。 名前付きパラメータマーカーでは、プレースホルダに :<param_name> 構文を使用します。 無名パラメータマーカーの使用方法の詳細については、ドキュメントを参照してください。

パラメータ化されたクエリーと文字列の補間

通常のPython文字列補間を使用してクエリをパラメータ化することもできますが、それほど便利ではありません。

Pythonのf-stringsを使った先ほどのクエリのパラメータは以下のようになります:

some_df.createOrReplaceTempView("whatever")
the_date ="2021-01-01"
 min_value ="4.0"
table_name ="whatever"

 query = f"" " SELECT * from {table_name}
WHERE the_date> '{the_date}'.AND number> {min_value}"" " 
 spark.sql(query).show()

次のような理由から、これはあまりいいことではありません:

  • 一時的なビューを作成する必要があります。
  • Python の日付ではなく、文字列として日付を表現する必要があります。
  • SQL文字列を適切にフォーマットするために、クエリ内で日付をシングルクォートで囲む必要があります。
  • これはSQLインジェクション攻撃を防御するものではありません。

まとめると、組み込みのクエリ・パラメータ化機能は、文字列補間よりも安全で効果的です。

まとめ

PySpark のパラメータ化されたクエリは、使い慣れた SQL 構文できれいなコードを書くための新しい機能を提供します。 SQLでSpark DataFrameをクエリするときに便利です。 浮動小数点値、文字列、日付、datetimeのような一般的なPythonデータ型を使うことができ、これらは自動的にSQL値に変換されます。 このようにして、一般的なPythonイディオムを活用し、美しいコードを書くことができるようになりました。

今すぐPySparkのパラメータ付きクエリを活用することで、より高品質なコードベースのメリットをすぐに享受できます。

Databricks 無料トライアル

関連記事

Engineering blog

Spark ConnectにおけるPythonの依存関係の管理方法

November 14, 2023 Hyukjin Kwon鄭瑞鳳 による投稿 in エンジニアリングのブログ
分散コンピューティング環境におけるアプリケーションの環境管理は難しい。 すべてのノードがコードを実行するのに必要な環境を持っていることを保証し、ユーザーのコードの実際の場所を決定することは、複雑なタスクである。 Apache Spark™は、Conda、venv、PEXなど様々な方法を提供している。 --jars、--packagesの ようなスクリプトオプションや、 spark.jars.*の ようなSparkコンフィギュレーションをサブミットする方法と 同様に、 PySparkでPythonの依存関係を管理する方法も併せて参照してみてください。これらのオプションにより、ユーザーはクラスタ内の依存関係をシームレスに処理できる。 しかし、Apache Sparkの依存関係を管理するための現在のサポートには限界がある。 依存関係は静的にしか追加できず、実行中に変更することはできない。 つまり、Driverを起動する前に必ず依存関係を設定する必要がある。 この問題に対処するため、Apache Spark 3.5.0か
Engineering blog

SQL関数の名前付き引数

本日は、SQL関数で名前付き引数を利用できるようになったことを紹介します。 この機能を使えば、より柔軟な方法で関数を呼び出すことが可能になります。 このブログでは、まずこの機能がどのようなものかを紹介し、次にSQLユーザー定義関数(UDF)のコンテキストで何ができるかを示し、最後に組み込み関数でどのように機能するかを探ります。 まとめると、名前付き引数はSQLのヘビーユーザーにとってもライトユーザーにとっても、作業を容易にする新しい便利な方法です。 名前付き引数とは何か? 多くのプログラミング言語では、関数定義に1つ以上の引数のデフォルト値を含めることができます。 例えば、Pythonでは次のようなメソッドを定義できます: def botw(x, y = 6, z = 7): return x * y + z ユーザーがこの機能を呼び出したい場合、次のように選択できます: botw(5...
Engineering blog

Python ユーザー定義テーブル関数(UDTFs)の紹介

Apache Spark™ 3.5とDatabricks Runtime 14.0は、エキサイティングな機能をもたらした:Pythonのユーザー定義テーブル関数(UDTFs)です。 このブログでは、UDTFとは何か、なぜUDTFは強力なのか、そしてどのようにUDTFを使うことができるのかについて説明する。 Pythonのユーザー定義テーブル関数(UDTF)とは? Pythonのユーザー定義テーブル関数(UDTF)は、出力として単一のスカラー結果値の代わりにテーブルを返す新しい種類の関数です。 一度登録されると、SQLクエリの FROM 句に登場させることができる。 各Python UDTFは0個以上の引数を受け入れ、各引数は整数や文字列のような定数スカラー値である。 関数本体は、これらの引数の値を調べて、どのデータを返すべきかを決定することができる。 PythonのUDTFを使うべき理由 要するに、複数の行や列を生成する関数が必要で、Pythonの豊富なエコシステムを活用したいのであれば、Python UDTFが
エンジニアリングのブログ一覧へ