Streaming in Production: Collected Best Practices
December 12, 2022 in Platform Blog
Releasing any data pipeline or application into a production state requires planning, testing, monitoring, and maintenance. Streaming pipelines are no different in this regard; in this blog we present some of the most important considerations for deploying streaming pipelines and applications to a production environment.
At Databricks, we offer two different ways of building and running streaming pipelines and applications - Delta Live Tables (DLT) and Databricks Workflows. DLT is our flagship, fully managed ETL product that supports both batch and streaming pipelines. It offers declarative development, automated operations, data quality, advanced observability capabilities, and more. Workflows enable customers to run Apache Spark™ workloads in Databricks' optimized runtime environment (i.e., Photon) with access to unified governance (Unity Catalog) and storage (Delta Lake). Regarding streaming workloads, both DLT and Workflows share the same core streaming engine - Spark Structured Streaming. In the case of DLT, customers program against the DLT API and DLT uses the Structured Streaming engine under the hood. In the case of Workflows, customers program against the Spark API directly.
The recommendations in this blog post are written from the Structured Streaming engine perspective, and most of them apply to both DLT and Workflows (although DLT does take care of some of these automatically, like Triggers and Checkpoints). We group the recommendations under the headings "Before Deployment" and "After Deployment" to highlight when these concepts need to be applied, and this blog series is split along the same lines. There will also be additional deep-dive content for some of the sections. We recommend reading all sections before beginning work to productionize a streaming pipeline or application, and revisiting these recommendations as you promote it from dev to QA and eventually production.
Before deployment
There are many things you need to consider when creating your streaming application to improve the production experience. Some of these topics, like unit testing, checkpoints, triggers, and state management, will determine how your streaming application performs. Others, like naming conventions and how many streams to run on which clusters, have more to do with managing multiple streaming applications in the same environment.
Unit testing
The cost associated with finding and fixing a bug goes up exponentially the farther along you get in the SDLC process, and a Structured Streaming application is no different. When you're turning that prototype into a hardened production pipeline you need a CI/CD process with built-in tests. So how do you create those tests?
At first you might think that unit testing a streaming pipeline requires something special, but that isn't the case. The general guidance for streaming pipelines is no different than guidance you may have heard for Spark batch jobs. It starts by organizing your code so that it can be unit tested effectively:
- Divide your code into testable chunks
- Organize your business logic into functions calling other functions. If you have a lot of logic in a foreachBatch or you've implemented mapGroupsWithState or flatMapGroupsWithState, organize that code into multiple functions that can be individually tested
- Do not code in dependencies on the global state or external systems
- Any function manipulating a DataFrame or Dataset should be organized to take the DataFrame/Dataset/Configuration as input and output the DataFrame/Dataset
Once your code is separated out in a logical manner you can implement unit tests for each of your functions. Spark-agnostic functions can be tested like any other function in that language. For testing UDFs and functions with DataFrames and Datasets, there are multiple Spark testing frameworks available. These frameworks support all of the DataFrame/Dataset APIs so that you can easily create input, and they have specialized assertions that allow you to compare DataFrame content and schemas. Some examples are:
- The built-in Spark test suite, designed to test all parts of Spark
- spark-testing-base, which has support for both Scala and Python
- spark-fast-tests, for testing Scala Spark 2 & 3
- chispa, a Python version of spark-fast-tests
Code examples for each of these libraries can be found here.
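To make the "Spark-agnostic functions" point concrete, here is a minimal sketch - the function name and status mappings are hypothetical - of business logic factored out of a UDF or foreachBatch handler so that a plain unit test can exercise it without a SparkSession:

```python
# Hypothetical business logic, kept free of Spark and global-state
# dependencies so an ordinary unit test can exercise it directly.
def normalize_status(raw_status: str) -> str:
    """Map a free-form status string to a canonical value."""
    canonical = {"ok": "HEALTHY", "up": "HEALTHY", "down": "FAILED", "err": "FAILED"}
    return canonical.get(raw_status.strip().lower(), "UNKNOWN")

# A plain unit test - no streaming (or even batch) DataFrame required.
assert normalize_status(" OK ") == "HEALTHY"
assert normalize_status("err") == "FAILED"
assert normalize_status("rebooting") == "UNKNOWN"
```

DataFrame-level functions follow the same pattern - take the DataFrame and configuration as input and return a DataFrame - so the frameworks listed above can build a small input DataFrame and assert on the output's content and schema.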
But wait! I'm testing a streaming application here - don't I need to make streaming DataFrames for my unit tests? The answer is no; you do not! Even though a streaming DataFrame represents a dataset with no defined ending, when functions are executed on it they are executed on a microbatch - a discrete set of data. You can use the same unit tests that you would use for a batch application, for both stateless and stateful streams. One of the advantages of Structured Streaming over other frameworks is the ability to use the same transformation code for both streaming and batch operations against the same sink. This allows you to simplify some operations, like backfilling data: rather than trying to keep the logic in sync between two different applications, you can just modify the input sources and write to the same destination. If the sink is a Delta table, you can even do these operations concurrently if both processes are append-only operations.
Triggers
Now that you know your code works, you need to determine how often your stream will look for new data. This is where triggers come in. Setting a trigger is one of the options for the writeStream command, and it looks like this:
// Scala/Java
.trigger(Trigger.ProcessingTime("30 seconds"))

# Python
.trigger(processingTime='30 seconds')
In the above example, if a microbatch completes in less than 30 seconds then the engine will wait for the rest of the time before kicking off the next microbatch. If a microbatch takes longer than 30 seconds to complete then the engine will start the next microbatch immediately after the previous one finishes.
The two factors you should consider when setting your trigger interval are how long you expect your stream to process a microbatch and how often you want the system to check for new data. You can lower the overall processing latency by using a shorter trigger interval and increasing the resources available for the streaming query by adding more workers or using compute or memory optimized instances tailored to your application's performance. These increased resources come with increased costs, so if your goal is to minimize costs then a longer trigger interval with less compute can work. Normally you would not set a trigger interval longer than what it would typically take for your stream to process a microbatch in order to maximize resource utilization, but setting the interval longer would make sense if your stream is running on a shared cluster and you don't want it to constantly take the cluster resources.
If you do not need your stream to run continuously, either because data doesn't arrive that often or your SLA is 10 minutes or greater, then you can use the Trigger.Once option. This option will start up the stream, check for anything new since the last time it ran, process it all in one big batch, and then shut down. Just as with a continuously running stream, when using Trigger.Once the checkpoint that guarantees fault tolerance (see below) also guarantees exactly-once processing.
Spark has a newer version of Trigger.Once called Trigger.AvailableNow. While Trigger.Once will process everything in one big batch, which depending on your data size may not be ideal, Trigger.AvailableNow will split up the data based on the maxFilesPerTrigger and maxBytesPerTrigger settings, allowing the data to be processed in multiple batches. Those settings are ignored with Trigger.Once. You can find examples for setting triggers here.
Pop quiz - how do you turn your streaming process into a batch process that automatically keeps track of where it left off with just one line of code? Answer - change your processing time trigger to Trigger.Once/Trigger.AvailableNow! Exact same code, running on a schedule, that will neither miss nor re-process any records.
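As a concrete illustration of that one-line change, here is a hedged PySpark sketch; streaming_df and the paths are placeholder names, not part of the original post:

```python
# Sketch only: streaming_df and the paths below are hypothetical placeholders.
# With availableNow=True the engine processes everything pending, split into
# multiple batches per maxFilesPerTrigger / maxBytesPerTrigger, then shuts down.
(streaming_df.writeStream
    .format("delta")
    .option("checkpointLocation", "/checkpoints/my_stream")  # same checkpoint as before
    .trigger(availableNow=True)  # was: .trigger(processingTime='30 seconds')
    .start())
```

Because the checkpoint is unchanged, each scheduled run picks up exactly where the last one left off.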
Name your stream
You name your children, you name your pets, now it's time to name your streams. There's a writeStream option called .queryName that allows you to provide a friendly name for your stream. Why bother? Well, suppose you don't name it. In that case, all you'll have to go on in the Structured Streaming tab in the Spark UI is the string <no name> and the unintelligible guid that is automatically generated as the stream's unique identifier. If you have more than one stream running on a cluster, and all of them have <no name> and unintelligible strings as identifiers, how do you find the one you want? If you're exporting metrics how do you tell which is which?
Make it easy on yourself, and name your streams. When you're managing them in production you'll be glad you did, and while you're at it, go and name your batch queries in any foreachBatch() code you have.
Checkpoints
How does your stream recover from being shut down? There are a few different cases where this can come into play, like cluster node failures or intentional halts, but the solution is to set up checkpointing. Checkpoints with write-ahead logs provide a degree of protection from your streaming application being interrupted, ensuring it will be able to pick up again where it last left off.
Checkpoints store the current offsets and state values (e.g., aggregate values) for your stream. Checkpoints are stream specific, so each stream should be set to its own location. Doing this will let you recover more gracefully from shutdowns, application code failures, or unexpected cloud provider failures or limitations.
To configure checkpoints, add the checkpointLocation option to your stream definition:
// Scala/Java/Python
streamingDataFrame.writeStream
  .format("delta")
  .option("path", "")
  .queryName("TestStream")
  .option("checkpointLocation", "")
  .start()
To keep it simple - every time you call .writeStream, you must specify the checkpoint option with a unique checkpoint location. Even if you're using foreachBatch and the writeStream itself doesn't specify a path or table option, you must still specify that checkpoint. It's how Spark Structured Streaming gives you hassle-free fault tolerance.
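A hedged sketch of the foreachBatch case - the function, paths, and query name are hypothetical placeholders: even though the writeStream itself has no path or table option, the checkpoint option is still required.

```python
# Sketch only: names and paths below are hypothetical placeholders.
def write_batch(batch_df, batch_id):
    # Inside foreachBatch, batch_df is an ordinary (non-streaming) DataFrame.
    batch_df.write.format("delta").mode("append").save("/output/events")

(streaming_df.writeStream
    .queryName("EventsForeachBatch")  # friendly name for the Spark UI
    .foreachBatch(write_batch)
    .option("checkpointLocation", "/checkpoints/events_foreach_batch")  # unique per stream
    .start())
```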
Efforts to manage the checkpointing in your stream should be of little concern in general. As Tathagata Das has said, "The simplest way to perform streaming analytics is not having to reason about streaming at all." That said, one setting deserves mention as questions around the maintenance of checkpoint files come up occasionally. Though it is an internal setting that doesn't require direct configuration, the setting spark.sql.streaming.minBatchesToRetain (default 100) controls the number of checkpoint files that get created. Basically, the number of files will be roughly this number times two, as there is a file created noting the offsets at the beginning of the batch (offsets, a.k.a write ahead logs) and another on completing the batch (commits). The number of files is checked periodically for cleanup as part of the internal processes. This simplifies at least one aspect of long-term streaming application maintenance for you.
It is also important to note that some changes to your application code can invalidate the checkpoint. Checking for any of these changes during code reviews before deployment is recommended. You can find examples of changes where this can happen in Recovery Semantics after Changes in a Streaming Query. Suppose you want to look at checkpointing in more detail or consider whether asynchronous checkpointing might improve the latency in your streaming application. In that case, these are covered in greater depth in Speed Up Streaming Queries With Asynchronous State Checkpointing.
State Management and RocksDB
Stateful streaming applications are those where current records may depend on previous events, so Spark has to retain data in-between microbatches. The data it retains is called state, and Spark will store it in a state store and read, update and delete it during each microbatch. Typical stateful operations are streaming aggregations, streaming dropDuplicates, stream-stream joins, mapGroupsWithState, and flatMapGroupsWithState. Common examples where you'll need to think about your application state are sessionization, or hourly aggregations using group-by methods to calculate business metrics. Each record in the state store is identified by a key that is used as part of the stateful computation, and the more unique keys that are required the larger the amount of state data that will be stored.
When the amount of state data needed to enable these stateful operations grows large and complex, it can degrade your workloads' performance, leading to increased latency or even failures. A typical indicator of the state store being the culprit of added latency is large amounts of time spent in garbage collection (GC) pauses in the JVM. If you are monitoring the microbatch processing time, this could look like a continual increase or wildly varying processing time across microbatches.
The default configuration for a state store, which is sufficient for most general streaming workloads, is to store the state data in the executors' JVM memory. A large number of keys (typically millions; see the Monitoring & Instrumentation section in part 2 of this blog) can add excessive pressure on the machine memory and increase the frequency of GC pauses as the JVM tries to free up resources.
On the Databricks runtime (now also supported in Apache Spark 3.2+) you can use RocksDB as an alternative state store provider to alleviate this source of memory pressure. RocksDB is an embeddable persistent key-value store for fast storage. It features high performance through a log-structured database engine written entirely in C++ and optimized for fast, low-latency storage.
Leveraging RocksDB as the state store provider still uses machine memory but no longer occupies space in the JVM and makes for a more efficient state management system for large amounts of keys. This doesn't come for free, however, as it introduces an extra step in processing every microbatch. Introducing RocksDB shouldn't be expected to reduce latency except when it is related to memory pressure from state data storage in the JVM. The RocksDB-backed state store still provides the same degree of fault tolerance as the regular state storage as it is included in the stream checkpointing.
RocksDB configuration, like checkpoint configuration, is minimal by design and so you only need to declare it in your overall Spark configuration:
spark.conf.set(
  "spark.sql.streaming.stateStore.providerClass",
  "com.databricks.sql.streaming.state.RocksDBStateStoreProvider")
If you are monitoring your stream using the StreamingQueryListener class, then you will also notice that RocksDB metrics are included in the stateOperators field. For more detailed information on this, see the RocksDB State Store Metrics section of "Structured Streaming in Production."
It's worth noting that large numbers of keys can have other adverse impacts in addition to raising memory consumption, especially with unbounded or non-expiring state keys. With or without RocksDB, the state from the application also gets backed up in checkpoints for fault tolerance. It follows that if you have state records that never expire, files will keep accumulating in the checkpoint, increasing the amount of storage required and potentially the time to write it or to recover from failures. For the data in memory (see the Monitoring & Instrumentation section in part 2 of this blog) this situation can lead to somewhat vague out-of-memory errors, and for the checkpointed data written to cloud storage you might observe unexpected and unreasonable growth. Unless you have a business need to retain streaming state for all the data that has been processed (and that is rare), read the Spark Structured Streaming documentation and make sure to implement your stateful operations so that the system can drop state records that are no longer needed (pay close attention to dropDuplicates and stream-stream joins).
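As one hedged example of letting the system drop state - the DataFrame, column names, and watermark delay are hypothetical - streaming deduplication can be bounded by defining a watermark and including the event-time column in the deduplication key:

```python
# Sketch only: streaming_df and the column names are hypothetical placeholders.
# Without the watermark, every key ever seen is retained in the state store
# (and checkpoint) indefinitely; with it, state older than the delay can be dropped.
deduped = (streaming_df
    .withWatermark("event_time", "1 hour")
    .dropDuplicates(["event_id", "event_time"]))
```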
Running multiple streams on a cluster
Once your streams are fully tested and configured, it's time to figure out how to organize them in production. It's a common pattern to stack multiple streams on the same Spark cluster to maximize resource utilization and save cost. This is fine to a point, but there are limits to how much you can add to one cluster before performance is affected. The driver has to manage all of the streams running on the cluster, and all streams will compete for the same cores across the workers. You need to understand what your streams are doing and plan your capacity appropriately to stack effectively.
Here is what you should take into account when you're planning on stacking multiple streams on the same cluster:
- Make sure your driver is big enough to manage all of your streams. Is your driver struggling with a high CPU utilization and garbage collection? That means it's struggling to manage all of your streams. Either reduce the number of streams or increase the size of your driver.
- Consider the amount of data each stream is processing. The more data you are ingesting and writing to a sink, the more cores you will need in order to maximize your throughput for each stream. You'll need to reduce the number of streams or increase the number of workers depending on how much data is being processed. For sources like Kafka you will need to configure how many cores are being used to ingest with the minPartitions option if you don't have enough cores for all of the partitions across all of your streams.
- Consider the complexity and data volume of your streams. If all of the streams are doing minimal manipulation and just appending to a sink, then each stream will need fewer resources per microbatch and you'll be able to stack more. If the streams are doing stateful processing or computation/memory-intensive operations, that will require more resources for good performance and you'll want to stack fewer streams.
- Consider scheduler pools. When stacking streams they will all be contending for the same workers and cores, and one stream that needs a lot of cores will cause the other streams to wait. Scheduler pools enable you to have different streams execute on different parts of the cluster. This will enable streams to execute in parallel with a subset of the available resources.
- Consider your SLA. If you have mission critical streams, isolate them as a best practice so lower-criticality streams do not affect them.
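The scheduler pools point above can be sketched as follows; the pool names, DataFrames, and paths are hypothetical, and the pools themselves are defined in a fair scheduler allocation file referenced by spark.scheduler.allocation.file:

```python
# Sketch only: pool names, DataFrames, and paths are hypothetical placeholders.
# Each stream started after setLocalProperty runs in the named pool, so a
# resource-hungry stream cannot starve the others.
spark.sparkContext.setLocalProperty("spark.scheduler.pool", "critical")
(critical_df.writeStream
    .queryName("CriticalStream")
    .option("checkpointLocation", "/checkpoints/critical")
    .format("delta")
    .start())

spark.sparkContext.setLocalProperty("spark.scheduler.pool", "best_effort")
(reporting_df.writeStream
    .queryName("ReportingStream")
    .option("checkpointLocation", "/checkpoints/reporting")
    .format("delta")
    .start())
```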
On Databricks we typically see customers stack 10 to 30 streams on a cluster, but this varies depending on the use case. Consider the factors above so that you can have a good experience with performance, cost, and maintainability.
Some of the ideas we've addressed here certainly deserve their own time and special treatment with a more in-depth discussion, which you can look forward to in later deep dives. However, we hope these recommendations are useful as you begin your journey or seek to enhance your production streaming experience. Be sure to continue with the next post, "Streaming in Production: Collected Best Practices, Part 2."
Review Databricks' Structured Streaming Getting Started Guide