メインコンテンツへジャンプ
Platform blog

この4部構成のブログ・シリーズ「Lessons learned building Cybersecurity Lakehouses」では、サイバーセキュリティ・データ用のレイクハウスを構築する際に、組織がデータ・エンジニアリングで直面する多くの課題について議論し、それらを克服するために私たちが現場で使用したソリューション、ヒント、トリック、ベスト・プラクティスを紹介する。

パート1では、まず統一されたイベントのタイムスタンプ抽出から始めた。 この第2部では、効果的なセキュリティ運用を維持するために不可欠なログの取り込みの遅延を発見し、対処する方法について見ていく。

このブログが終わるころには、直面する問題のいくつかと、データ取り込みの遅れを監視し報告するために使用できるいくつかのテクニックをしっかりと理解していることだろう。

なぜデータの取り込みが重要なのか?

タイムリーで、正確で、検索可能なログデータは、セキュリティ・オペレーションにおいて非常に重要です。 アナリストは、セキュリティ・イベントやインシデントをほぼリアルタイムで可視化する必要がある。 事故対応者はしばしば、環境へのさらなるダメージを回避または軽減するために回避行動を取る必要がある。 コンプライアンスとアシュアランスの機能には、データの完全性を証明する法的義務があり、セキュリティ・インシデントのタイムリーな報告を義務付ける規制遵守義務がある。

データ取り込み遅延の課題

データの取り込みは、伝統的なインフラタイプの問題から、最新のデータスタックとそのマルチホップ取り込みルートに起因する遅延まで、さまざまな理由で遅延する可能性がある。

従来のインフラ

従来の非SaaSタイプの環境では、ログソースは、オンプレミスまたはクラウドホスティングインスタンスのいずれかのシステムによって生成されることが多く、多くの場合、独自のログ転送エージェントがローカルにインストールされている。 以下は、より伝統的な建築において遅延を引き起こす可能性のある問題の例である:

  • ネットワーク停止
  • 受信システムのリソース不足
  • フォワーディングおよびミドルウェア層の障害とリソースの枯渇

最新のクラウドスタック

多くのSaaSプロバイダーは、顧客が他の分析製品に取り込むために、製品からログファイルをスケジュールまたはストリーミングでエクスポートできるようにしています。 ストリーミング製品やSaaSサービスが存在する一方で、多くの組織が、サイバー分析エンジンに取り込む前に、これらのログファイルをクラウドオブジェクトストレージにランディングすることを選択しています。 これにより、マルチホップ、時間遅延、ほとんどバッチに近い取り込みパターンが生まれる。 これは、最近のアーキテクチャがログファイルに関してどのように相互運用することが多いかの副産物である。 以下は、SaaSが生成したログを取り込む際に発生する可能性のある問題の例である;

  • SaaSプロバイダーのログエクスポートの失敗
  • SaaSプロバイダーのログエクスポート遅延
  • クラウドストレージのバケット書き込み失敗
  • 受信システムが新しく書き込まれたファイルを認識できない

インジェスト問題のモニタリング

このブログシリーズの第1回をお読みになった方は、インジェスト時に2つのメタデータ・フィールドを生成することを推奨していることをご存じだろう。 event_timeと _ingest_time

メダリオンアーキテクチャーのブロンズ層でこれら2つのカラムをキャプチャすることで、ログデータの受信と処理の遅延を監視することができる。

主に2つの質問に答えなければならない:

  1. 各ログソースから、期待される割合でデータが入ってきているか?
  2. 各ログソースから、予想される頻度でデータが入ってきているか?

以下の例は、これらをどのように実現できるかを示している。

以下のデータフレームには、インジェスト遅延モニタリングの生成に使用された両方のタイムスタンプが含まれています。

サイバーセキュリティのレイクハウス パート2:取り込み遅延への対応

データフレーム内の各行について、各レコードが何分遅れたかを計算し、結果を新しい列ingest_lag_minsに書き込む。

df = df.withColumn("ingest_lag_mins" 、 round((col("_ingest_time").cast("long") - col("_event_time").cast("long"))/60,0))
display(df)

サイバーセキュリティのレイクハウス パート2:取り込み遅延への対応

ラグ・カラムが作成されたので、ビジュアライゼーション・エディターを使ってビジュアライゼーションを作成するのはとても簡単です。

サイバーセキュリティのレイクハウス パート2:取り込み遅延への対応

サイバーセキュリティのレイクハウス パート2:取り込み遅延への対応

上の図は、ソースタイプ別の平均ラグ(分)を示している。

これがダッシュボードの出発点となる。 しかし、調査が必要な例外を示すレポートを作成すべきだ。 そのために、各ログソースに対して予想される閾値を追加することができる。

def add_threshold_column(input_df):
   # 'threshold' カラムの条件と値を定義する
 threshold_mins = [
 (col("sourcecetype") =="access_combined", 300),
 (col("sourcetype") =="vpc_flowlogs", 200).
   ]
 default_value = 100
 
 # 条件を適用し、新しいカラムに値を代入
 output_df = input_df.withColumn("threshold",
 when(threshold_mins[0][0], threshold_mins[0][1])
.when(threshold_mins[1][0]、 threshold_mins[1][1])
.otherwise(default_value))
 
 return output_df

df = add_threshold_column(df)

サイバーセキュリティのレイクハウス パート2:取り込み遅延への対応

最後に、しきい値の範囲外、またはオプションでしきい値の倍数で動作しているログ・ソースについて報告する。

from pyspark.sql.functions import max

THRESHOLD_MODIFIER = 5
df2 = (df.groupBy("source""sourceetype","threshold","ingest_lag_mins").agg(max("_ingest_time").alias("last_seen"))
.where(col("ingest_lag_mins") > THRESHOLD_MODIFIER*(col( threshold )) .orderBy( last_seen 、""
""ascending=True)
)
display(df2)

上記のコマンドでは、余分なノイズを取り除くために THRESHOLD_MODIFIERを 定義し、PySparkの関数 MAXを使って 新しいカラム last_seenを 作成し、最後にインジェストラグタイムが閾値より大きいレコードのみをフィルタリングしています。

サイバーセキュリティのレイクハウス パート2:取り込み遅延への対応

予想される頻度のモニタリング

ログソースまたはレポーティングホストは、半定期的にデータを送信することが期待される。 活動レベルに応じて、頻度は変わる。 予想される頻度でログを記録しないソースを特定するための戦略は数多くある。 この例では、予想される時間枠内にログに記録されないソースをレポートする簡単な方法を示します。 その他の戦略としては、以下のようなものが考えられる。

  • 上記を応用し、時間ウィンドウ内で閾値を超えた倍数を探す。
  • ソースが見つからない場合、ログソースごとに専用のしきい値を保存し、例外によってレポートする。
  • 基準値または正常な頻度を作成し、標準偏差の倍数に基づいて報告する。
from pyspark.sql.functions import current_timestamp


df3 = (df.groupBy("source" 、 "sourcetype","threshold").agg(max("_ingest_time").alias("last_seen")、 current_timestamp().alias("t_now"))
)
display(df3)

サイバーセキュリティのレイクハウス パート2:取り込み遅延への対応

上のコマンドでは、2つの新しいカラムlast_seent_now を作成し、ソースとソースタイプで集約して、各ログソースで受信した最新のイベントを与えている。

from pyspark.sql.types import IntegerType

df4 = (df3.withColumn('missing_minutes'、 ((col('t_now').cast('long')) - col('last_seen').cast('long'))/ 60).cast(IntegerType())
.where(col("missing_minutes"). > 240) ) display(df4)

サイバーセキュリティのレイクハウス パート2:取り込み遅延への対応

あるいは、ソース・カラムとソースタイプ・カラムを連結し、単純なリストをレポートすることもできる;

from pyspark.sql.types import IntegerType
from pyspark.sql.functions import concat_ws

df4 = (df3.withColumn('missing_minutes'、 ((col('t_now').cast('long')) - col('last_seen').cast('long'))/ 60).cast(IntegerType())
.where(col("missing_minutes"). > 240) .select(concat_ws( : 、
"""source","sourcetype").alias("missing_log_source")))


display(df4)

サイバーセキュリティのレイクハウス パート2:取り込み遅延への対応

Databricks SQL(DB SQL)を使用している場合は、ダッシュボードを作成して頻繁に確認し、ログソースの欠落や遅延に対してアラートを出すことをお勧めします。 もう一つの可能性は、Databricksのワークフロー機能を使ってノートブックの実行をスケジュールし、実行結果をメールで送信することです。

ヒントとベストプラクティス

このブログでは、ログ・ソースの遅延や欠落を特定し、レポートするためのいくつかのオプションについて説明します。 これをより効果的にする方法は他にもあり、興味のある読者に委ねられている。 しかし、最初に考えたことはいくつかある:

  • イベントをタイムウィンドウにバケットし、ローリング平均遅延を計算し、ログソースごとの通常の遅延を提供する。
  • ログ・ソースごとに「欠落」プロパティを格納し、欠落値を超えるソースのみをレポートする。
  • ダッシュボードを追加し、ドロップダウンで特定のソースを選択することで、すべてのログソースを可視化する。

まとめ

データの取り込みの遅れを認識することは、セキュリティと保証機能の多くの部分にとって重要であり、したがって、監視し、迅速に解決しなければならない。 適切な管理体制が整備されていないと、組織には死角が生じ、コンプライアンス上の義務を果たせない可能性がある。

お問い合わせ

Databricksのサイバー・ソリューションがどのようにあなたの組織にサイバー脅威の特定と軽減の力を与えることができるかについて、さらにご興味がおありでしたら、[email protected]ご連絡いただき、Lakehouse for Cybersecurity Applicationsのウェブページをご覧ください。

Databricks 無料トライアル

関連記事

Platform blog

サイバーセキュリティ・レイクハウス Part 1: イベントのタイムスタンプ抽出

November 3, 2023 デレク・キング による投稿 in プラットフォームブログ
この4回にわたるブログ・シリーズ "Lessons learned from building Cybersecurity Lakehouses," では、サイバーセキュリティ・データ用のレイクハウスを構築する際に、組織がデータ・エンジニアリングで直面する多くの課題について説明し、それを克服するために私たちが現場で使用したソリューション、ヒント、コツ、ベスト・プラクティスを紹介する。 このシリーズでは、サイバーセキュリティのレイクハウスを作りたいとお考えの方に、課題を学び、進むべき道を提案します。 Databricksは、サイバーログを効率的に処理し、標準化するための実用的なローコード・コンフィギュレーション・ソリューションを構築した。 当社のLakehouseプラットフォームは、データエンジニアリングを簡素化し、検索、分析、ストリーム型脅威検知への迅速な移行を促進します。 既存のSIEMやSOARシステムを補完し、不必要に複雑化することなくサイバーセキュリティ運用を強化します。 第1部では、サイバー分析エンジン
Platform blog

Cybersecurity in the Era of Multiple Clouds and Regions

August 30, 2022 Zafer BilalogluLipyeow Lim による投稿 in 製品
In 2021, more than three quarters of all enterprises have infrastructure in multiple clouds . This trend shows no signs of slowdown with...
Industries category icon 1

サイバーセキュリティアプリケーション向けDatabricks Lakehouseプラットフォーム

翻訳: Masahiko Kitamura 具体的なコードはIOCマッチングのソリューションアクセラレータの GitHub reo を参照ください。また、本ソリューションのPOC・トライアルについては [email protected] までご連絡ください。 金融機関、医療機関、政府機関がデータをクラウドに移行し、IoTセンサーや相互接続されたデバイスが増加しているため、サイバーセキュリティは依然として重要なデータ課題となっています。地政学的な脅威が続く中、企業は、大量のデータの処理、複雑なデータ処理タスク(人工知能や機械学習などの高度な分析機能を含む)のサポート、費用対効果の高い拡張が可能なDatabricks Lakehouseプラットフォームをサイバー業務に採用しています。Databricks Lakehouseプラットフォームは、データ、アナリティクス、AIを単一のプラットフォームで統合した、サイバーセキュリティ業界の隠れた標準基盤になっています。 企業やサイバーセキュリティベンダー
プラットフォームブログ一覧へ