メインコンテンツへジャンプ
Platform blog

dbtとDatabricksを用いてコスパの良いリアルタイムデータ処理を行う

シャビル・カーンバイ
ポール・ラパス
ビラル・アスラム
Share this post

ビジネスが成長するにつれ、データ量はGBからTB(またはそれ以上)に拡大し、レイテンシー要求は数時間から数分(またはそれ以下)になり、ビジネスに新鮮な洞察を提供するためのコストはますます高くなります。これまでPythonやScalaのデータエンジニアは、このような需要に応えるためにストリーミングを利用し、新しいデータをリアルタイムで効率的に処理してきましたが、SQLベースのdbtパイプラインを拡張する必要があるアナリティクスエンジニアには、このような選択肢はありませんでした。

しかし今は違います!このブログでは、Databricks の新しいストリーミングテーブルとマテリアライズドビューを使用して、SQL と dbt のシンプルさで新鮮なリアルタイムのインサイトをビジネスに提供する方法を説明します。

背景

2023 Data + AI Summitでは、Databricks SQLにストリーミングテーブルとマテリアライズドビューを導入しました。この素晴らしい機能により、Databricks SQL ユーザーは、Delta Live Tables で初めて導入された強力な新しいテーブルマテリアライゼーションに簡単にアクセスできるようになり、大規模なクエリのインクリメンタル化や、イベントデータソースからの直接ストリーミングなどが可能になりました。

dbt-databricksは、Databricks上でデータモデルを構築する最も一般的な方法の1つとなっており、Photonコンピュートエンジン、即座にスケーリング可能なServerless SQL Warehouses、Unityカタログガバナンスモデルなど、Databricks SQLの強力な機能を、dbtの変換フレームワークのユビキタス性と共に活用しています。

dbt-Databricks で何が変わったのか?

dbt v1.6+の時点で、dbt-Databricks は3つの重要な面で進化しています:

  1. 新しいマテリアライゼーション:"streaming_table" および"materialized_view"
  2. ソースをテーブルとしてステージングすることなく、クラウドデータストレージから直接読み込むための新しい構文
  3. window aggregations、watermarking、stream-stream joinsなどの高度なストリーミング概念へのアクセス

注:近々リリースされるdbt v1.7.3では、上記の機能がさらに改良される予定です!

Airline Tripsのデモで、これらの新機能の使い方を見てみよう。

エアライン・トリップ・デモ

Airline Tripsのデモは、ダッシュボードやAIモデルなど、Databricks上で最新のビジネスインサイトのためにライブイベントデータを段階的に取り込み、変換する方法を示すために作成されました。このデータセットは、米国で利用されているすべての航空会社の旅行を時系列で表しており、各旅行の出発と到着の遅延をキャプチャしています。

付属のヘルパー・ノートブックでは、このデータセットからシミュレートされたストリームを確立し、dbtプロジェクトでは、これらの生のjsonイベントを受け取り、ストリーミングETLを介してマテリアライズド・ビューやフィーチャー・テーブルなどのレイヤーに変換するデータモデルを紹介しています。

リポジトリはこちらで公開されており、すべてのDatabricksワークスペースにパッケージされたサンプルデータをすぐに利用することができます。お気軽にフォローしてください!

航空会社のトリップ・データ・モデル
The airline trips data model

クラウドデータストレージからのデータ取り込み

ストリーミングテーブルを活用する最も簡単な方法の1つは、AWSのS3やAzureのADLSのようなクラウドデータストレージからのデータ取り込みです。イベントデータを大量に生成するアップストリームデータソースがあり、それを生ファイルとしてストレージ(通常はjson、csv、parquet、avro)に取り込むプロセスがあるとします。

このデモでは、米国で利用されたすべての航空券のライブ・フィードを外部パーティから受け取り、その都度インジェストするとします。

ファイルを外部テーブルとしてステージングしたり、サードパーティツールを使用してデータソースのデルタテーブルを実体化する代わりに、ストリーミングテーブルを使用するだけで解決できます。ブロンズ航空券のフィードのモデルを以下に示します:

{{
    config(
        materialized='streaming_table'
    )
}}

select 
    * 
    ,_metadata.file_modification_time as file_modification_time
from stream read_files('{{var("input_path")}}/airlines', format=>'json')

注意すべき点は2つあります:

  • マテリアライゼーション・ストラテジは'streaming_table'に設定されています。
    • これはDatabricksでCREATE OR REFRESH STREAMING TABLEコマンドを実行します。
  • クラウドストレージから読み込む構文は、Auto Loader を利用しています。
    • read_files()は、指定されたフォルダ内の新しいjsonファイルをリストアップし、処理を開始します。dbtを使用しているので、s3フォルダのパスを動的に渡すためにdbtのvar()関数を利用しています("s3://... "の形式)。
    • STREAMキーワードは、この場所からストリーミングすることを示します。また、このキーワードがなくても、read_files() に materialized='table' を指定することで、指定したフォルダから直接一括読み込みを行うこともできます。

余談ですが、Auto Loaderは最小限のセットアップで利用できますが、Kafka、Kinesis、Event Hubsのようなイベント・ストリーミング・プラットフォームから直接ストリーミングすることも可能で、非常によく似た構文を使ってさらに低レイテンシーを実現できる。 詳しくはこちらをご覧ください。

シルバーレイヤーのインクリメンタル・エンリッチ・データ

ストリーミングはインジェストステップで停止する必要はありません。下流で結合を実行したりサロゲートキーを追加したりしたいが、計算量を節約するために新しいデータだけに限定したい場合は、引き続き Streaming Table のマテリアライズを使用できます。たとえば、次のモデルでは空港コードのマッピングテーブルを生のデータセットに結合しています:

{{
    config(
        materialized='streaming_table'
        )
}}

...

SELECT 
  {{ dbt_utils.generate_surrogate_key([
                'ArrTimestamp'
            ])
        }} as delay_id
  ,...
FROM STREAM({{ref("airline_trips_bronze")}}) raw
INNER JOIN {{ref("airport_codes")}} ac
  ON raw.Origin = ac.iata_code
...

今回もStreaming Tableのマテリアライゼーションを利用し、すべてのロジックに標準的なdbtの機能を利用することができました。これには以下が含まれます:

  • dbt_utilsパッケージの活用によるサロゲートキーの生成などの便利なショートカット機能
  • 完全なリネージを維持するためのref()文の使用

SQLの唯一の実質的な変更は、airline_trips_bronzeのref()文の周りにSTREAM()キーワードを追加したことです。これはstream-static joinと呼ばれます。

マテリアライズド・ビューで計算効率の良いゴールド・レイヤーを作る

エンリッチされたシルバー・テーブルの準備ができたので、集約されたインサイトをエンド・ビジネス・コンシューマに提供する方法を考えることができます。通常、テーブルのマテリアライゼーションを使用する場合、毎回すべての過去の結果を再計算する必要があります。

各実行で新しいデータのみを処理する Streaming Tables のアップストリームを利用するために、このタスクの代わりに Materialized Views を使用します!

Databricksの良い点は、マテリアライズド・ビューを構築するモデルはテーブルを構築するモデルと変わらないということです!例えば、ゴールドレイヤーのマテリアライズド・ビューで、毎日のフライトの遅延率を計算する例を考えてみましょう:

{{
    config(
        materialized='materialized_view'
    )
}}

    SELECT 
        airline_name
        ,ArrDate
        ,COUNT(*) AS no_flights
        ,SUM(IF(IsArrDelayed = TRUE,1,0)) AS tot_delayed
        ,ROUND(tot_delayed*100/no_flights,2) AS perc_delayed
        FROM {{ ref('airline_trips_silver') }}
        WHERE airline_name IS NOT NULL
        GROUP BY 1,2

変更したのはマテリアライゼーションの設定だけです!

マテリアライズド・ビューは、ベース・テーブルが変更されたときにインクリメンタルに更新できることを覚えておいてください。上記のシナリオでは、新しいデータをストリーミングすると、マテリアライズド・ビューは再計算が必要なグループを判断し、そのグループのみを計算します。この例では、フライトの到着日である ArrDate を集計しているため、新しい日のデータは自然に新しいグループに分類され、既存のグループは変更されません。

モデルを数回実行した後のマテリアライズド・ビューのイベント・ログ(下の写真)を分析すると、インクリメンタル化が機能していることがわかります。最初の実行は他のテーブルと同様に完全な計算ですが、新しいデータで集計を更新するための2回目の実行では、行単位のインクリメンタルなリフレッシュを利用しています。モデルの最後の実行は、新しいデータが上流に取り込まれていないことを認識し、そのまま何もしません。

マテリアライズド・ビューのイベントログ
Materialized view event log

デモリポジトリには他に何がありますか?

イベント ソースから BI 対応のマテリアライズド ビューに直接データを取得する基本的な方法を説明しましたが、デモ リポジトリにはさらに多くの情報が含まれています。

このリポジトリには、ストリーミングテーブルとマテリアライズドビューのログを監視して、データがどのように処理されているかを理解する方法の例や、このブログでは取り上げていない、SQLだけで2つのストリームを結合するストリームストリーム結合の高度な例が含まれています!

Databricks環境にレポをクローンして始めるか、dbtクラウドをDatabricksに追加費用なしでパートナーコネクトで接続してください。また、マテリアライズド・ビューストリーミング・テーブルのドキュメントでも詳細をご覧いただけます。

Databricks 無料トライアル

関連記事

Platform blog

Databricks SQLのマテリアライズド・ビューとストリーミング・テーブルの紹介

翻訳:Junichi Maruyama. - Original Blog Link AWSとAzure上の Databricks SQL でマテリアライズド・ビューとストリーミング・テーブルが公開されたことをお知らせできることを嬉しく思います。ストリーミングテーブルは、クラウドストレージやメッセージキューからの増分インジェストを提供します。マテリアライズド・ビューは、新しいデータが到着すると自動的にインクリメンタルに更新されます。これら2つの機能を組み合わせることで、インフラストラクチャを必要としないデータパイプラインが実現し、セットアップが簡単で、新鮮なデータをビジネスに提供することができます。このブログポストでは、アナリストやアナリティクス・エンジニアがデータウェアハウスでデータとアナリティクス・アプリケーションをより効果的に提供するために、これらの新機能がどのように役立つかを探ります。 背景 データウェアハウスとデータエンジニアリングは、データ駆動型の組織にとって極めて重要である。データウェアハウスはアナリ
Platform blog

Best Practices for Super Powering Your dbt Project on Databricks

dbt is a data transformation framework that enables data teams to collaboratively model, test and document data in data warehouses . Getting started...
Platform blog

Databricks、dbt Labs、Fivetranと一緒にレイクハウスでモダンデータスタックを構築する5つの理由

Original : Five Reasons to Build your Modern Data Stack on the Lakehouse with Databricks, dbt Labs and Fivetran translate by junichi.maruyama 数年前、クラウドベースのモダンデータ・プラットフォームによって、アナリティクスとそれを支えるツールが実務者の手に渡るようになり、モダンデータ・スタック(MDS)が登場しました。オンプレミスで慎重にサイズを調整したHadoopクラスタの時代は終わり、瞬時に拡張でき、標準SQLを使用して新世代のETLおよびBIツールに接続できるデータウェアハウスに取って代わられました。レイクハウスパターンは、ここ数年で登場した最新の、そしておそらく最も強力なパターンです。データウェアハウスのシンプルさと拡張性、データレイクのオープン性とコスト面の優位性を一体化させたものです。重要なのは、レイクハウスパターンは厳密に加算型であることです。データ実務家として
プラットフォームブログ一覧へ