Engineering blog

Structured Streaming: A Year in Review


As we enter 2022, we want to take a moment to reflect on the great strides made on the streaming front in Databricks and Apache Spark™! In 2021, the engineering team and open source contributors made a number of advancements with three goals in mind:

  1. Lower latency and improve stateful stream processing
  2. Improve observability of Databricks and Spark Structured Streaming workloads
  3. Improve resource allocation and scalability

Ultimately, the motivation behind these goals was to enable more teams to run streaming workloads on Databricks and Spark, to make it easier for customers to operate mission-critical production streaming applications on Databricks, and to optimize for cost effectiveness and resource usage at the same time.

Goal #1: Lower latency & improved stateful processing

There are two new key features that specifically target lowering latencies with stateful operations, as well as improvements to the stateful APIs. The first is asynchronous checkpointing for large stateful operations, which improves upon a historically synchronous, higher-latency design.

Asynchronous Checkpointing

In the synchronous model, state updates are written to a cloud storage checkpoint location before the next microbatch begins. The advantage is that if a stateful streaming query fails, we can easily restart it by using the information from the last successfully completed batch. In the asynchronous model, the next microbatch does not have to wait for state updates to be written, which improves the end-to-end latency of the overall microbatch execution.
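
To make the latency difference concrete, here is a toy timing model in plain Python. It is not Databricks' implementation: the function name is hypothetical, and the asynchronous rule (a batch may start once the previous batch is processed and the checkpoint before that has finished, so at most one checkpoint write is in flight) is an assumption made for illustration.

```python
from typing import List


def batch_start_times(process_ms: List[int], checkpoint_ms: List[int],
                      asynchronous: bool) -> List[int]:
    """Toy model: return the time (ms) at which each microbatch starts.

    Synchronous: a batch starts only after the previous batch has been
    processed AND its state checkpoint has been written.
    Asynchronous (assumed model): a batch starts once the previous batch has
    been processed and the checkpoint before that has finished, so at most
    one checkpoint write runs in the background at a time.
    """
    starts: List[int] = []
    next_start = 0       # earliest time the next batch may begin
    last_ckpt_done = 0   # time the most recently started checkpoint finishes
    for proc, ckpt in zip(process_ms, checkpoint_ms):
        starts.append(next_start)
        processed = next_start + proc
        if asynchronous:
            # Wait only for the previous checkpoint (if still running), then
            # start this batch's checkpoint in the background and move on.
            next_start = max(processed, last_ckpt_done)
            last_ckpt_done = next_start + ckpt
        else:
            # Block until this batch's checkpoint is durably written.
            last_ckpt_done = processed + ckpt
            next_start = last_ckpt_done
    return starts


# Three batches, each taking 100 ms to process and 50 ms to checkpoint.
print(batch_start_times([100] * 3, [50] * 3, asynchronous=False))  # [0, 150, 300]
print(batch_start_times([100] * 3, [50] * 3, asynchronous=True))   # [0, 100, 200]
```

When checkpoint writes are cheap relative to processing, the asynchronous schedule hides them almost entirely; when they are slower than processing, the model still bounds the backlog to one outstanding checkpoint.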

You can learn more about this feature in an upcoming deep-dive blog post, and try it in Databricks Runtime 10.3 and above.

Arbitrary stateful operator improvements

In a much earlier post, we introduced Arbitrary Stateful Processing in Structured Streaming with [flat]MapGroupsWithState. These operators provide a lot of flexibility and enable more advanced stateful operations beyond aggregations. We’ve introduced improvements to these operators that:

  • Allow specifying an initial state, avoiding the need to reprocess all your streaming data.
  • Enable easier logic testing by exposing a new TestGroupState interface, allowing users to create instances of GroupState and access internal values for what has been set, simplifying unit tests for the state transition functions.

Allow Initial State

Let’s start with the signature of the flatMapGroupsWithState operator that accepts an initial state:


def flatMapGroupsWithState[S: Encoder, U: Encoder](
    outputMode: OutputMode,
    timeoutConf: GroupStateTimeout,
    initialState: KeyValueGroupedDataset[K, S])(
    func: (K, Iterator[V], GroupState[S]) => Iterator[U]): Dataset[U]

This custom state function maintains a running count of each fruit encountered so far.


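import org.apache.spark.sql.streaming.GroupState

// Assumed state type: a single running count per key
case class RunningCount(count: Long)

val fruitCountFunc = (key: String, values: Iterator[String], state: GroupState[RunningCount]) => {
  // Add this batch's values for the key to the stored count (0 if no state yet)
  val count = state.getOption.map(_.count).getOrElse(0L) + values.size
  state.update(RunningCount(count))
  Iterator((key, count.toString))
}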

In this example, we specify the initial state for this operator by setting starting values for certain fruit:


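import org.apache.spark.sql.Dataset
import org.apache.spark.sql.streaming.{GroupStateTimeout, OutputMode}
import spark.implicits._  // encoders for toDS() and groupByKey

val fruitCountInitialDS: Dataset[(String, RunningCount)] = Seq(
  ("apple", new RunningCount(1)),
  ("orange", new RunningCount(2)),
  ("mango", new RunningCount(5))
).toDS()

// Key the initial state by fruit name, keeping the RunningCount as the value
val fruitCountInitial = fruitCountInitialDS.groupByKey(x => x._1).mapValues(_._2)

// fruitStream is assumed to be a streaming Dataset[String] of fruit names
fruitStream
  .groupByKey(x => x)
  .flatMapGroupsWithState(OutputMode.Update(), GroupStateTimeout.NoTimeout, fruitCountInitial)(fruitCountFunc)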
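
The effect of the initial state can be seen without a cluster. The plain-Python simulation below mirrors fruitCountFunc: each micro-batch is grouped by key, and the state function is called once per key that has data, starting from the supplied initial counts rather than an empty store. The names fruit_count_func and run_micro_batches are hypothetical, and the model deliberately ignores checkpointing, timeouts, and distribution, so treat it as a sketch of the semantics rather than of how Spark executes the operator.

```python
from collections import defaultdict
from typing import Dict, Iterable, Iterator, List, Optional, Tuple


def fruit_count_func(key: str, values: Iterator[str],
                     state: Optional[int]) -> Tuple[int, List[Tuple[str, str]]]:
    """Mirror of fruitCountFunc: add this batch's values to the stored count."""
    count = (state if state is not None else 0) + sum(1 for _ in values)
    return count, [(key, str(count))]


def run_micro_batches(batches: Iterable[List[str]],
                      initial_state: Dict[str, int]):
    """Group each batch by key and call the state function once per key with
    data, seeding the state store from initial_state."""
    state: Dict[str, int] = dict(initial_state)
    outputs: List[List[Tuple[str, str]]] = []
    for batch in batches:
        groups: Dict[str, List[str]] = defaultdict(list)
        for fruit in batch:
            groups[fruit].append(fruit)  # like groupByKey(x => x)
        batch_out: List[Tuple[str, str]] = []
        for key, values in groups.items():
            new_count, rows = fruit_count_func(key, iter(values), state.get(key))
            state[key] = new_count       # like state.update(...)
            batch_out.extend(rows)
        outputs.append(batch_out)
    return outputs, state


initial = {"apple": 1, "orange": 2, "mango": 5}
outputs, final_state = run_micro_batches([["apple", "apple", "banana"], ["mango"]], initial)
print(outputs)  # [[('apple', '3'), ('banana', '1')], [('mango', '6')]]
```

Because "apple" starts at 1, two new apples yield 3 instead of 2; "banana", which has no initial state, starts from 0.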

Easier Logic Testing

You can also now test state updates using the TestGroupState API.


import org.apache.spark.sql.streaming._
import org.apache.spark.api.java.Optional

test("flatMapGroupsWithState's state update function") {
  // Start from an empty state with event-time timeouts enabled
  var prevState = TestGroupState.create[UserStatus](
    optionalState = Optional.empty[UserStatus],
    timeoutConf = GroupStateTimeout.EventTimeTimeout,
    batchProcessingTimeMs = 1L,
    eventTimeWatermarkMs = Optional.of(1L),
    hasTimedOut = false)

  val userId: String = ...
  val actions: Iterator[UserAction] = ...

  assert(!prevState.hasUpdated)

  // updateState is the state transition function under test (not shown)
  updateState(userId, actions, prevState)

  // TestGroupState records whether the function called update() on the state
  assert(prevState.hasUpdated)
}
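
The testing pattern itself, injecting a state object that records how the function touched it, is not specific to Spark. The Python sketch below uses a hypothetical FakeGroupState test double and a hypothetical update_state function that counts actions per user; it only illustrates the idea behind TestGroupState and is not its API.

```python
from typing import Iterator, List, Optional, Tuple


class FakeGroupState:
    """Hypothetical test double that records how a state function used its state."""

    def __init__(self, initial: Optional[int] = None) -> None:
        self._value = initial
        self.has_updated = False
        self.has_removed = False

    def get_option(self) -> Optional[int]:
        return self._value

    def update(self, value: int) -> None:
        self._value = value
        self.has_updated = True

    def remove(self) -> None:
        self._value = None
        self.has_removed = True


def update_state(user_id: str, actions: Iterator[str],
                 state: FakeGroupState) -> List[Tuple[str, int]]:
    """Hypothetical state transition: keep a running count of actions per user."""
    total = (state.get_option() or 0) + sum(1 for _ in actions)
    state.update(total)
    return [(user_id, total)]


# Unit test in the same shape as the TestGroupState example above.
state = FakeGroupState()
assert not state.has_updated
assert update_state("user-1", iter(["click", "view"]), state) == [("user-1", 2)]
assert state.has_updated and state.get_option() == 2
```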

You can find these and more examples in the Databricks documentation.

Native support for Session Windows

Structured Streaming introduced the ability to do aggregations over event-time-based windows using tumbling or sliding windows, both of which have a fixed length. In Spark 3.2, we introduced session windows, which have dynamic lengths: a session starts with an event and keeps extending as long as further events arrive within the gap duration. Previously, session windows required custom state operators built with flatMapGroupsWithState.

An example of using dynamic gaps:


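from pyspark.sql.functions import session_window, when

# events is assumed to be a streaming DataFrame with timestamp, eventType and userID columns
# Define the session window having dynamic gap duration based on eventType
session_window_expr = session_window(events.timestamp, \
    when(events.eventType == "type1", "5 seconds") \
    .when(events.eventType == "type2", "20 seconds") \
    .otherwise("5 minutes"))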

# Group the data by session window and userId, and compute the count of each group
windowedCountsDF = events \
    .withWatermark("timestamp", "10 minutes") \
    .groupBy(events.userID, session_window_expr) \
    .count()
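
To see how a dynamic gap shapes sessions, the plain-Python sketch below applies the same gap rules to a small list of events. Each event opens a window [timestamp, timestamp + gap), and overlapping windows for the same user merge into one session; the count per session corresponds to the .count() above. The helper names are hypothetical, watermarks and late data are ignored, and exact boundary handling (an event landing precisely on a session's end) may differ from Spark's.

```python
from collections import defaultdict
from typing import Callable, Dict, List, Tuple

Event = Tuple[str, int, str]          # (userID, timestamp in seconds, eventType)
Session = Tuple[str, int, int, int]   # (userID, start, end, count)


def gap_for(event_type: str) -> int:
    """Same rules as session_window_expr: 5 s, 20 s, otherwise 5 minutes."""
    if event_type == "type1":
        return 5
    if event_type == "type2":
        return 20
    return 5 * 60


def sessionize(events: List[Event], gap: Callable[[str], int]) -> List[Session]:
    """Merge overlapping [ts, ts + gap) windows per user into sessions."""
    by_user: Dict[str, List[Tuple[int, int]]] = defaultdict(list)
    for user, ts, event_type in events:
        by_user[user].append((ts, gap(event_type)))

    sessions: List[Session] = []
    for user in sorted(by_user):
        start = end = count = None
        for ts, g in sorted(by_user[user]):
            if end is not None and ts < end:
                # Event falls inside the open session: extend its end if needed.
                end = max(end, ts + g)
                count += 1
            else:
                # Close the previous session (if any) and open a new one.
                if end is not None:
                    sessions.append((user, start, end, count))
                start, end, count = ts, ts + g, 1
        if end is not None:
            sessions.append((user, start, end, count))
    return sessions


events = [("a", 0, "type1"), ("a", 3, "type1"), ("a", 10, "type1"),
          ("b", 0, "type2"), ("b", 15, "type1")]
print(sessionize(events, gap_for))
# [('a', 0, 8, 2), ('a', 10, 15, 1), ('b', 0, 20, 2)]
```

User "a" gets two sessions because the 5-second gap closes before the event at t=10, while user "b" stays in one session thanks to the 20-second gap of its "type2" event.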

Goal #2: Improve observability of streaming workloads

While the StreamingQueryListener API allows you to asynchronously monitor queries within a SparkSession and define custom callback functions for query start, progress, and termination events, understanding back pressure and reasoning about where the bottlenecks are in a microbatch were still challenging. As of Databricks Runtime 8.1, the StreamingQueryProgress object reports data-source-specific back pressure metrics for Kafka, Kinesis, Delta Lake and Auto Loader streaming sources.

An example of the metrics provided for Kafka:


{
  "sources" : [ {
    "description" : "KafkaV2[Subscribe[topic]]",
    "metrics" : {
      "avgOffsetsBehindLatest" : "4.0",
      "maxOffsetsBehindLatest" : "4",
      "minOffsetsBehindLatest" : "4",
      "estimatedTotalBytesBehindLatest" : "80.0"
    }
  } ]
}
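
These metrics lend themselves to simple alerting. In PySpark, StreamingQuery.lastProgress returns the most recent progress report as a Python dict; the sketch below parses the sample JSON instead so it runs on its own. The function name and the threshold are hypothetical choices, and note that the metric values arrive as strings.

```python
import json
from typing import Any, Dict, List, Tuple


def sources_falling_behind(progress: Dict[str, Any],
                           max_offsets_behind: float) -> List[Tuple[str, float]]:
    """Return (description, maxOffsetsBehindLatest) for each source whose lag
    exceeds the threshold. Metric values are strings, so convert them."""
    lagging: List[Tuple[str, float]] = []
    for source in progress.get("sources", []):
        behind = source.get("metrics", {}).get("maxOffsetsBehindLatest")
        if behind is not None and float(behind) > max_offsets_behind:
            lagging.append((source.get("description", "<unknown>"), float(behind)))
    return lagging


progress_json = """
{
  "sources" : [ {
    "description" : "KafkaV2[Subscribe[topic]]",
    "metrics" : {
      "avgOffsetsBehindLatest" : "4.0",
      "maxOffsetsBehindLatest" : "4",
      "minOffsetsBehindLatest" : "4",
      "estimatedTotalBytesBehindLatest" : "80.0"
    }
  } ]
}
"""
print(sources_falling_behind(json.loads(progress_json), max_offsets_behind=1))
# [('KafkaV2[Subscribe[topic]]', 4.0)]
```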

Databricks Runtime 8.3 introduces real-time metrics to help understand the performance of the RocksDB state store and debug the performance of state operations. These can also help identify target workloads for asynchronous checkpointing.

An example of the new state store metrics:


{
  "id" : "6774075e-8869-454b-ad51-513be86cfd43",
  "runId" : "3d08104d-d1d4-4d1a-b21e-0b2e1fb871c5",
  "batchId" : 7,
  "stateOperators" : [ {
    "numRowsTotal" : 20000000,
    "numRowsUpdated" : 20000000,
    "memoryUsedBytes" : 31005397,
    "numRowsDroppedByWatermark" : 0,
    "customMetrics" : {
      "rocksdbBytesCopied" : 141037747,
      "rocksdbCommitCheckpointLatency" : 2,
      "rocksdbCommitCompactLatency" : 22061,
      "rocksdbCommitFileSyncLatencyMs" : 1710,
      "rocksdbCommitFlushLatency" : 19032,
      "rocksdbCommitPauseLatency" : 0,
      "rocksdbCommitWriteBatchLatency" : 56155,
      "rocksdbFilesCopied" : 2,
      "rocksdbFilesReused" : 0,
      "rocksdbGetCount" : 40000000,
      "rocksdbGetLatency" : 21834,
      "rocksdbPutCount" : 1,
      "rocksdbPutLatency" : 56155599000,
      "rocksdbReadBlockCacheHitCount" : 1988,
      "rocksdbReadBlockCacheMissCount" : 40341617,
      "rocksdbSstFileSize" : 141037747,
      "rocksdbTotalBytesReadByCompaction" : 336853375,
      "rocksdbTotalBytesReadByGet" : 680000000,
      "rocksdbTotalBytesReadThroughIterator" : 0,
      "rocksdbTotalBytesWrittenByCompaction" : 141037747,
      "rocksdbTotalBytesWrittenByPut" : 740000012,
      "rocksdbTotalCompactionLatencyMs" : 21949695000,
      "rocksdbWriterStallLatencyMs" : 0,
      "rocksdbZipFileBytesUncompressed" : 7038
    }
  } ],
  "sources" : [ {
  } ],
  "sink" : {
  }
}
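
A couple of derived numbers can make the customMetrics easier to read. The sketch below, using hypothetical helper names, computes the RocksDB block cache hit ratio and ranks the rocksdbCommit* phases by their reported values, using a subset of the sample above. Values are compared as reported, without unit conversion; a hit ratio this low may indicate that most Get calls are served from disk rather than the block cache.

```python
from typing import Dict, List, Tuple


def block_cache_hit_ratio(custom: Dict[str, int]) -> float:
    """Fraction of RocksDB block reads served from the block cache."""
    hits = custom.get("rocksdbReadBlockCacheHitCount", 0)
    misses = custom.get("rocksdbReadBlockCacheMissCount", 0)
    total = hits + misses
    return hits / total if total else 0.0


def commit_phases_by_cost(custom: Dict[str, int]) -> List[Tuple[str, int]]:
    """rocksdbCommit* metrics sorted from largest to smallest reported value."""
    phases = [(k, v) for k, v in custom.items() if k.startswith("rocksdbCommit")]
    return sorted(phases, key=lambda kv: kv[1], reverse=True)


# A subset of the customMetrics from the sample progress report.
custom_metrics = {
    "rocksdbCommitCheckpointLatency": 2,
    "rocksdbCommitCompactLatency": 22061,
    "rocksdbCommitFileSyncLatencyMs": 1710,
    "rocksdbCommitFlushLatency": 19032,
    "rocksdbCommitPauseLatency": 0,
    "rocksdbCommitWriteBatchLatency": 56155,
    "rocksdbReadBlockCacheHitCount": 1988,
    "rocksdbReadBlockCacheMissCount": 40341617,
}
print(f"{block_cache_hit_ratio(custom_metrics):.6f}")  # 0.000049
print(commit_phases_by_cost(custom_metrics)[:2])
# [('rocksdbCommitWriteBatchLatency', 56155), ('rocksdbCommitCompactLatency', 22061)]
```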

Goal #3: Improve resource allocation and scalability

Streaming Autoscaling with Delta Live Tables (DLT)

At Data + AI Summit last year, we announced Delta Live Tables, a framework that allows you to declaratively build and orchestrate data pipelines and largely abstracts away the need to configure clusters and node types. We’re taking this a step further by introducing an intelligent autoscaling solution for streaming pipelines that improves upon the existing Databricks Optimized Autoscaling. Its benefits include:

  • Better Cluster Utilization: The new algorithm takes advantage of the new back pressure metrics to adjust cluster sizes to better handle scenarios in which there are fluctuations in streaming workloads, which ultimately leads to better cluster utilization.

  • Proactive Graceful Worker Shutdown: While the existing autoscaling solution retires nodes only if they are idle, the new DLT Autoscaler will proactively shut down selected nodes when utilization is low, while simultaneously guaranteeing that there will be no failed tasks due to the shutdown.

At the time of writing, this feature is in Private Preview. Please reach out to your account team for more information.

Trigger.AvailableNow

In Structured Streaming, triggers allow a user to define the timing of a streaming query’s data processing. Available trigger types include micro-batch (the default), fixed-interval micro-batch (Trigger.ProcessingTime), one-time micro-batch (Trigger.Once), and continuous (Trigger.Continuous).

Databricks Runtime 10.1 introduces a new type of trigger, Trigger.AvailableNow, which is similar to Trigger.Once but provides better scalability. Like Trigger.Once, it processes all available data before the query stops, but it does so in multiple batches instead of one. This is supported for Delta Lake and Auto Loader streaming sources.

Example:


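import org.apache.spark.sql.streaming.Trigger

// inputDir, outputDir, and checkpointDir are placeholder paths
spark.readStream
  .format("delta")
  .option("maxFilesPerTrigger", "1")  // at most one file per microbatch
  .load(inputDir)
  .writeStream
  .format("delta")
  .trigger(Trigger.AvailableNow)
  .option("checkpointLocation", checkpointDir)
  .start(outputDir)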
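
The difference between the two one-shot triggers comes down to how the backlog is divided. The toy planner below (a hypothetical function, not Spark's scheduler) takes the files present when the query starts and returns the files each microbatch would read: one batch for Trigger.Once, or batches capped at maxFilesPerTrigger for Trigger.AvailableNow, as modeled here.

```python
from typing import List


def plan_batches(files_at_start: List[str], max_files_per_trigger: int,
                 available_now: bool) -> List[List[str]]:
    """Toy planner for a file source: which files each microbatch reads.

    available_now=False models Trigger.Once: everything in a single batch.
    available_now=True models Trigger.AvailableNow: the same files, split into
    batches of at most max_files_per_trigger. In both cases only the files
    present when the query starts are considered, and the query then stops.
    """
    if max_files_per_trigger < 1:
        raise ValueError("max_files_per_trigger must be at least 1")
    snapshot = list(files_at_start)
    if not snapshot:
        return []
    if not available_now:
        return [snapshot]
    step = max_files_per_trigger
    return [snapshot[i:i + step] for i in range(0, len(snapshot), step)]


files = [f"part-{n}.parquet" for n in range(5)]
print(len(plan_batches(files, max_files_per_trigger=2, available_now=False)))  # 1
print(plan_batches(files, max_files_per_trigger=2, available_now=True))
# [['part-0.parquet', 'part-1.parquet'], ['part-2.parquet', 'part-3.parquet'], ['part-4.parquet']]
```

Splitting the same backlog into bounded batches keeps each batch's memory and shuffle footprint predictable, which is where the better scalability comes from.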

Summary

As we head into 2022, we will continue to accelerate innovation in Structured Streaming, further improving performance, decreasing latency and implementing new and exciting features. Stay tuned for more information throughout the year!
