この4部構成のブログ・シリーズ「Lessons learned building Cybersecurity Lakehouses」では、サイバーセキュリティ・データ用のレイクハウスを構築する際に、組織がデータ・エンジニアリングで直面する多くの課題について議論し、それらを克服するために私たちが現場で使用したソリューション、ヒント、トリック、ベスト・プラクティスを紹介する。パート1では、まず統一されたイベントのタイムスタンプ抽出から始めた。 この第2部では、効果的なセキュリティ運用を維持するために不可欠なログの取り込みの遅延を発見し、対処する方法について見ていく。このブログが終わるころには、直面する問題のいくつかと、データ取り込みの遅れを監視し報告するために使用できるいくつかのテクニックをしっかりと理解していることだろう。なぜデータの取り込みが重要なのか?タイムリーで、正確で、検索可能なログデータは、セキュリティ・オペレーションにおいて非常に重要です。 アナリストは、セキュリティ・イベントやインシデントをほぼリアルタイムで可視化する必要がある。 事故対応者はしばしば、環境へのさらなるダメージを回避または軽減するために回避行動を取る必要がある。 コンプライアンスとアシュアランスの機能には、データの完全性を証明する法的義務があり、セキュリティ・インシデントのタイムリーな報告を義務付ける規制遵守義務がある。データ取り込み遅延の課題データの取り込みは、伝統的なインフラタイプの問題から、最新のデータスタックとそのマルチホップ取り込みルートに起因する遅延まで、さまざまな理由で遅延する可能性がある。従来のインフラ従来の非SaaSタイプの環境では、ログソースは、オンプレミスまたはクラウドホスティングインスタンスのいずれかのシステムによって生成されることが多く、多くの場合、独自のログ転送エージェントがローカルにインストールされている。 以下は、より伝統的な建築において遅延を引き起こす可能性のある問題の例である:ネットワーク停止受信システムのリソース不足フォワーディングおよびミドルウェア層の障害とリソースの枯渇最新のクラウドスタック多くのSaaSプロバイダーは、顧客が他の分析製品に取り込むために、製品からログファイルをスケジュールまたはストリーミングでエクスポートできるようにしています。 ストリーミング製品やSaaSサービスが存在する一方で、多くの組織が、サイバー分析エンジンに取り込む前に、これらのログファイルをクラウドオブジェクトストレージにランディングすることを選択しています。 これにより、マルチホップ、時間遅延、ほとんどバッチに近い取り込みパターンが生まれる。 これは、最近のアーキテクチャがログファイルに関してどのように相互運用することが多いかの副産物である。 以下は、SaaSが生成したログを取り込む際に発生する可能性のある問題の例である;SaaSプロバイダーのログエクスポートの失敗SaaSプロバイダーのログエクスポート遅延クラウドストレージのバケット書き込み失敗受信システムが新しく書き込まれたファイルを認識できないインジェスト問題のモニタリングこのブログシリーズの第1回をお読みになった方は、インジェスト時に2つのメタデータ・フィールドを生成することを推奨していることをご存じだろう。 event_timeと _ingest_time。メダリオンアーキテクチャーのブロンズ層でこれら2つのカラムをキャプチャすることで、ログデータの受信と処理の遅延を監視することができる。主に2つの質問に答えなければならない:各ログソースから、期待される割合でデータが入ってきているか?各ログソースから、予想される頻度でデータが入ってきているか?以下の例は、これらをどのように実現できるかを示している。以下のデータフレームには、インジェスト遅延モニタリングの生成に使用された両方のタイムスタンプが含まれています。データフレーム内の各行について、各レコードが何分遅れたかを計算し、結果を新しい列ingest_lag_minsに書き込む。df = df.withColumn("ingest_lag_mins" 、 round((col("_ingest_time").cast("long") - col("_event_time").cast("long"))/60,0)) display(df)ラグ・カラムが作成されたので、ビジュアライゼーション・エディターを使ってビジュアライゼーションを作成するのはとても簡単です。上の図は、ソースタイプ別の平均ラグ(分)を示している。これがダッシュボードの出発点となる。 しかし、調査が必要な例外を示すレポートを作成すべきだ。 そのために、各ログソースに対して予想される閾値を追加することができる。def add_threshold_column(input_df): # 'threshold' カラムの条件と値を定義する threshold_mins = [ (col("sourcecetype") =="access_combined", 300), (col("sourcetype") =="vpc_flowlogs", 200). ] default_value = 100 # 条件を適用し、新しいカラムに値を代入 output_df = input_df.withColumn("threshold", when(threshold_mins[0][0], threshold_mins[0][1]) .when(threshold_mins[1][0]、 threshold_mins[1][1]) .otherwise(default_value)) return output_df df = add_threshold_column(df)最後に、しきい値の範囲外、またはオプションでしきい値の倍数で動作しているログ・ソースについて報告する。from pyspark.sql.functions import max THRESHOLD_MODIFIER = 5 df2 = (df.groupBy("source" 、 "sourceetype","threshold","ingest_lag_mins").agg(max("_ingest_time").alias("last_seen")) .where(col("ingest_lag_mins") > THRESHOLD_MODIFIER*(col( threshold )) .orderBy( last_seen 、"" ""ascending=True) ) display(df2)上記のコマンドでは、余分なノイズを取り除くために THRESHOLD_MODIFIERを 定義し、PySparkの関数 MAXを使って 新しいカラム last_seenを 作成し、最後にインジェストラグタイムが閾値より大きいレコードのみをフィルタリングしています。予想される頻度のモニタリングログソースまたはレポーティングホストは、半定期的にデータを送信することが期待される。 活動レベルに応じて、頻度は変わる。 予想される頻度でログを記録しないソースを特定するための戦略は数多くある。 この例では、予想される時間枠内にログに記録されないソースをレポートする簡単な方法を示します。 その他の戦略としては、以下のようなものが考えられる。上記を応用し、時間ウィンドウ内で閾値を超えた倍数を探す。ソースが見つからない場合、ログソースごとに専用のしきい値を保存し、例外によってレポートする。基準値または正常な頻度を作成し、標準偏差の倍数に基づいて報告する。from pyspark.sql.functions import current_timestamp df3 = (df.groupBy("source" 、 "sourcetype","threshold").agg(max("_ingest_time").alias("last_seen")、 current_timestamp().alias("t_now")) ) display(df3)上のコマンドでは、2つの新しいカラムlast_seenとt_now を作成し、ソースとソースタイプで集約して、各ログソースで受信した最新のイベントを与えている。from pyspark.sql.types import IntegerType df4 = (df3.withColumn('missing_minutes'、 ((col('t_now').cast('long')) - col('last_seen').cast('long'))/ 60).cast(IntegerType()) .where(col("missing_minutes"). > 240) ) display(df4) あるいは、ソース・カラムとソースタイプ・カラムを連結し、単純なリストをレポートすることもできる;from pyspark.sql.types import IntegerType from pyspark.sql.functions import concat_ws df4 = (df3.withColumn('missing_minutes'、 ((col('t_now').cast('long')) - col('last_seen').cast('long'))/ 60).cast(IntegerType()) .where(col("missing_minutes"). > 240) .select(concat_ws( : 、 """source","sourcetype").alias("missing_log_source"))) display(df4)Databricks SQL(DB SQL)を使用している場合は、ダッシュボードを作成して頻繁に確認し、ログソースの欠落や遅延に対してアラートを出すことをお勧めします。 もう一つの可能性は、Databricksのワークフロー機能を使ってノートブックの実行をスケジュールし、実行結果をメールで送信することです。ヒントとベストプラクティスこのブログでは、ログ・ソースの遅延や欠落を特定し、レポートするためのいくつかのオプションについて説明します。 これをより効果的にする方法は他にもあり、興味のある読者に委ねられている。 しかし、最初に考えたことはいくつかある:イベントをタイムウィンドウにバケットし、ローリング平均遅延を計算し、ログソースごとの通常の遅延を提供する。ログ・ソースごとに「欠落」プロパティを格納し、欠落値を超えるソースのみをレポートする。ダッシュボードを追加し、ドロップダウンで特定のソースを選択することで、すべてのログソースを可視化する。まとめデータの取り込みの遅れを認識することは、セキュリティと保証機能の多くの部分にとって重要であり、したがって、監視し、迅速に解決しなければならない。 適切な管理体制が整備されていないと、組織には死角が生じ、コンプライアンス上の義務を果たせない可能性がある。お問い合わせDatabricksのサイバー・ソリューションがどのようにあなたの組織にサイバー脅威の特定と軽減の力を与えることができるかについて、さらにご興味がおありでしたら、cybersecurity@databricks.comにご連絡いただき、Lakehouse for Cybersecurity Applicationsのウェブページをご覧ください。