メインコンテンツへジャンプ

Apache Spark 3.0 概要|Python API の強化・PySpark API の拡充など新機能搭載

Databricks Runtime 7.0 での利用が可能に
Share this post

Apache SparkTM 3.0.0 が Databricks Runtime 7.0 で利用できるようになりました。Spark 3.0.0 はオープンソースコミュニティでの多くのコントリビュートが結実したものです。3,400 以上のパッチが含まれ、Python API および ANSI SQL の機能拡充に加え、開発や調査が行いやすくなるような工夫が施されています。オープンソースプロジェクトとして 10 年目を迎え、多くの参加者の意見と多様なユースケースに応え続けてきた結果が反映されています。

Apache Spark 3.0 の主な新機能:

Apache Spark 3.0.0の導入に際しては、大きなコード変更は不要です。詳細については、移行ガイドを参照してください。

Spark 開発のこれまでの 10 年間

Spark は、カリフォルニア大学バークレー校の研究ラボ AMPlab でのデータ集約型コンピューティングに関する研究から生まれました。AMPlab の研究者は当初、大手インターネット企業との共同研究を通じてデータと AI の問題に取り組んでいましたが、それらの問題が、増え続けるデータ量に対応しようとするあらゆる企業において、いずれは共通の課題となることに気づきました。そこで、これらのワークロードのための新たなエンジンと、ビッグデータを利用しやすくするための API の開発に着手したのです。

コミュニティでの開発を通じて Spark はさまざまな分野で活用されるようになりました。ストリーミングや Python と SQL に関する新しい機能が追加され、今ではそれらが Spark の主要なユースケースになっています。そして、この継続的な取り組みを通じて、Spark はデータ処理、データサイエンス、機械学習、データ分析ワークロードにおける事実上の標準となるに至りました。Apache Spark 3.0 は、こうした方向性に沿って開発され、広く使用されている Python と SQL の両言語のサポートを大幅に改善し、全般的なパフォーマンスと運用性の最適化を図っています。

Spark SQL エンジンの改善

Spark SQL は多くの Spark アプリケーションを支えるエンジンです。たとえば、Databricks では、Spark API 呼び出しの 90% 以上で、DataFrame、Dataset、SQL の各 API と SQL オプティマイザによって最適化されたライブラリを使用しています。つまり、Python や Scala で開発を行う場合であっても、作業の大半に Spark SQL エンジンが関わっていることになります。Spark 3.0 でコントリビュートされたパッチの 46%は、SQL のパフォーマンスと ANSI との互換性に関するもので、下の図に示すように、Apache Spark 3.0 は Spark 2.4 に比べ、合計実行時間が約半分にまで改善されています。次に、Spark SQL エンジンの 4 つの新機能について説明します。

Spark SQL エンジンには、4 つの新機能が導入されています。新しい適応型クエリ実行(AQE: Adaptive Query Execution)フレームワークでは、より効率的な実行計画を生成することでパフォーマンスの向上とチューニングの簡略化を行います。正確な統計情報の不足やコスト推定の誤りにより最初の計画が十分に最適化されていない場合でも有効です。Spark では、ストレージとコンピューティングが分離されており、データの到着が予測できないことから、従来のシステムに比べて実行時の適応性が重要になります。そこで、Apache Spark 3.0 では適応型の最適化機能が新たに 3 つ導入されました。

  • シャッフルパーティションの動的統合により、シャッフルパーティション数のチューニングを簡略化、もしくはチューニング自体を不要にします。ユーザーは、事前に多めのシャッフルパーティション数を設定しておくことができ、AQE は実行時に隣接する小さなパーティションを大きなパーティションに結合できます。
  • 結合戦略の動的切り替えにより、統計情報の不足やサイズ推定の誤りのために十分に最適化されていない計画が実行されることを、一定の割合で回避します。ソートマージ結合が自動的にブロードキャストハッシュ結合に変換されるため、チューニングの簡略化とパフォーマンスの向上が期待できます。
  • スキューデータ結合時の動的な最適化により、スキューデータ結合時の処理の偏りに起因する極端なパフォーマンスの低下を防ぎます。AQE がシャッフルファイル統計からスキューを検出した場合、スキューが発生しているパーティションを小さなパーティションに分割したうえで、対応するパーティションと結合させます。その結果、スキューが並行処理されることとなり、全体的なパフォーマンスが向上します。

3TB のTPC-DS ベンチマークでは、AQEが機能していない場合と比べて AQEが機能しているSpark のパフォーマンスは 2 つのクエリで 1.5 倍高速になり、別の 37 のクエリでも 1.1 倍に向上することが確認されています。

TPC-DS 3TB Parquet での AQE ありとなしの比較。
TPC-DS 1TB Parquet での動的パーティションプルーニングありなしの比較

動的パーティションプルーニングは、オプティマイザがコンパイル時にパーティションのスキップが可能か判別できない場合に適用されます。ただし、スタースキーマにおいては、1 つないし複数のファクトテーブルが複数のディメンションテーブルを参照する形になることから、通常はスキップの可否が判別できません。そこで、動的パーティションプルーニングにより、ディメンションテーブルのフィルタ結果からスキップ可能なパーティションを特定し、結合時にファクトテーブルから読み込むパーティションに対してプルーニングを行えるようにしました。TPC-DS ベンチマークでは、102 件のクエリのうち 60 件で 2 倍から18 倍へと大きく速度が向上しています。

ANSI SQL に準拠することで、ワークロードを他の SQL エンジンから Spark SQL に移行しやすくしました。Spark 3.0 では、ANSI SQL に合わせて先発グレゴリオ歴に切り替えています。また、ANSI SQL の予約キーワードを識別子として使用することをユーザーが禁止できるようにしました。さらに、算術演算実行時のオーバーフローチェックや、定義済みスキーマでテーブルにデータを挿入する場合のコンパイル時型適用も導入しています。新たにこれらの検証機能を用意したことで、データの品質が改善されています。 

結合ヒント:コンパイラの改善は継続的に行われていますが、どのような場合においても常に最適な決定が行われる確証はありません。結合時のアルゴリズムは、統計と経験則に基づいて設計されています。コンパイラにおいて最適な決定が行われない場合、ユーザーは結合ヒントを使用してオプティマイザの計画選定に関与することができます。Spark 3.0 では、従来の結合ヒントに加え、SHUFFLE_MERGE、SHUFFLE_HASH、SHUFFLE_REPLICATE_NL の 3 つの結合ヒントを新たに追加しました。

Python API の強化:PySpark と Koalas 
Python は Spark で最もよく使用される言語であり、Apache Spark 3.0 の開発においても重点項目に挙げられていました。Databricks の Notebook コマンドの 68% は Python で記述されており、Apache Spark の Python API である PySpark は、PyPI(Python Package Index)で毎月 500 万回以上ダウンロードされています。

多くの Python開発者がデータの構造化と分析に pandas API を使用していますが、pandas API はシングルノードでの処理しか行えません。そこで、Databricks では、pandas API を Apache Spark 上に実装した Koalas の開発も行っています。データサイエンティストが分散環境でビッグデータを扱う際の生産性の向上を目的としたものです。Koalas では、プロットサポートなどのために PySpark で多くの関数を作成することなく、クラスタ全体を対象として効率的な処理が行えます。

1 年以上の開発を経て、Koalas は pandas API の 80% 程度をカバーするに至りました。2 週間に 1 回の頻度で更新が続けられており、PyPI での毎月のダウンロード件数も急激に増加して 85 万件に達しています。Koalas は、シングルノードで動作する pandas API のコードを容易に移行できる API です。一方、同様に多くのユーザーから支持を集めている Python APIとして、PySpark があります。

PySpark と Koalas のための週次PyPIのダウンロード

Apache Spark 3.0 では、次の複数の項目で PySpark API を拡充しています。

  • 型ヒントを備えた新しい pandas API: pandas UDF は、PySpark のユーザー定義関数のスケーリングや pandas API の PySpark アプリケーションへの統合を目的に、Spark 2.3 以降から導入されています。しかし、従来のインターフェイスでは、UDF で新たに型が追加された場合に、その把握が難しいという問題がありました。Spark 3.0 では、pandas UDF での型の増加に対処するため、Python の型ヒントを活用する新しい pandas UDF インターフェイスを導入しています。新しいインターフェイスは、より Python に近くなり、わかりやすくなっています。
  • pandas UDF の新しい型と pandas 関数 API:Apache Spark 3.0 では、pandas UDF の新しい型が 2 つ追加されています。「シリーズのイテレータからシリーズのイテレータ」と「複数シリーズのイテレータからシリーズのイテレータ」です。これらは、データのプリフェッチや処理負荷の高い初期化を行う場合に有用です。また、新しい pandas 関数 API として、map とco-grouped map が追加されています。詳細については、こちらのブログ記事を参照してください。
  • エラー処理の改善:PySpark のエラー処理は、Python ユーザーにとって少しわかりにくいものになっていました。Spark 3.0 では、PySpark の例外を簡略化し、不要な JVM スタックトレースを表示させないようにして、より Python に近づけました。

Python のサポートの改善と Spark のユーザビリティ向上は、今後も最優先の開発項目です。

Hydrogen プロジェクトにおけるストリーミング機能と拡張性の改善 

Spark 3.0 では、Hydrogen プロジェクトにおける主要コンポーネントが完了し、ストリーミングと拡張性を改善する新機能が導入されています。

  • アクセラレータ対応スケジューリング:Hydrogen プロジェクトは、深層学習と Sparkでのデータ処理をより適切に統合するための取り組みです。GPU とその他のアクセラレータは、深層学習ワークロードのアクセラレーションに広く使用されています。ハードウェアアクセラレーションの効果をターゲットプラットフォームに反映させるため、Spark 3.0 では既存のスケジューラを強化し、クラスタマネージャをアクセラレータに対応させました。ユーザーは、検出スクリプトを使用して設定を行い、構成を介してアクセラレータを指定することができ、新しい RDD API を呼び出してそれらのアクセラレータを利用できます。
  • 構造化ストリーミング用の新しい Spark UI:構造化ストリーミングは、Spark 2.0 で初めて導入されました。データブリックスでの構造化ストリーミングの使用は対前年比で 4 倍に伸び、1 日で 5 兆件以上が処理されています。Spark 3.0 では、これらのストリーミングジョブの詳細を把握するために、専用の新しい Spark UI が追加されました。また、1) 完了したクエリジョブのストリーミングに関する集約情報と、2) ストリーミングクエリに関する詳細な統計情報の 2 つの統計セットが提供されています。

Databricks 上の構造化ストリーミングによって処理されたレコード数の傾向

  • モニタリング指標:データパイプラインを管理するうえで、データの品質に影響する変更を継続的にモニタリングすることが必要です。Apache Spark 3.0 では、バッチ処理アプリケーションとストリーミングアプリケーションのモニタリング機能であるモニタリング指標を導入しました。モニタリング指標は、DataFrame のクエリで自由に定義できる集約関数です。バッチクエリが終了、またはストリーミング期限に到達するなどして DataFrame の実行が完了するとすぐに、前回完了時以降のデータ処理について、各指標が名前付きイベントに格納されて送信されます。
  • 新しいカタログプラグイン API:既存のデータソース API では、外部データソースのメタデータへのアクセスや操作が行えません。Apache Spark 3.0 では、機能を強化したデータソース APIのバージョン 2 を用意し、新しいカタログプラグイン API を導入しました。外部データソースにカタログプラグイン API とデータソース API バージョン 2 を実装すると、対応する外部カタログの登録後に、ユーザーがマルチパート識別子を通じて外部テーブルのデータとメタデータの両方を直接操作できるようになります。

Apache Spark 3.0 の最新情報をさらに詳しく

Spark 3.0 は、オープンソースコミュニティにおいて 3,400 件以上の Jira チケットが解決されたメジャーリリースです。個人だけでなく、データブリックス、Google、Microsoft、Intel、IBM、Alibaba、Facebook、Nvidia、Netflix、Adobe などの企業も加えた 440 以上のコントリビュータが開発に貢献しました。このブログ記事では、Spark 3.0 における Python、API、ANSI SQL 準拠、ストリーミング、パーティション・プルーニングなどについて説明しましたが、他にも紹介できていない多くの機能があります。詳細については、リリースノートをご覧ください。データソース、エコシステム、モニタリングなどの Spark の他の改善事項をご覧ください。

Spark 3.0 および Databricks ランタイム 7.0 の主な機能

Apache Spark 3.0 を使ってみる

Databricks Runtime 7.0 で Apache Spark 3.0 をお試しいただけます。無料トライアルアカウントを取得してください。すぐに使用を開始できます。クラスタを起動させる際に、バージョン 7.0 を選択するだけで Spark 3.0 が使用できます。

Databricks Runtime 7.0 で Spark 3.0を使用するには、クラスタ起動時にドロップダウンメニューから Spark 3.0 を選択するだけです。

機能とリリースの詳細についてはこちらをご覧ください。

Databricks 無料トライアル

関連記事

Apache Spark 3.0 概要|Python API の強化・PySpark API の拡充など新機能搭載

Apache Spark TM 3.0.0 が Databricks Runtime 7.0 で利用できるようになりました。Spark 3.0.0 はオープンソースコミュニティでの多くのコントリビュートが結実したものです。3,400 以上のパッチが含まれ、Python API および ANSI SQL の機能拡充に加え、開発や調査が行いやすくなるような工夫が施されています。オープンソースプロジェクトとして 10 年目を迎え、多くの参加者の意見と多様なユースケースに応え続けてきた結果が反映されています。 Apache Spark 3.0 の主な新機能...
Databricks ブログ一覧へ