Stateful processing in Apache Spark™ Structured Streaming has evolved significantly to meet the growing demands of complex streaming applications. Initially, the applyInPandasWithState API allowed developers to perform arbitrary stateful operations on streaming data. However, as the complexity and sophistication of streaming applications increased, the need for a more flexible and feature-rich API became apparent. To address these needs, the Spark community introduced the vastly improved transformWithStateInPandas API, available in Apache Spark™ 4.0, which can now fully replace the existing applyInPandasWithState operator. transformWithStateInPandas provides far greater functionality, such as flexible data modeling and composite types for defining state, timers, TTL on state, operator chaining, and schema evolution.
In this blog, we will focus on Python to compare transformWithStateInPandas with the older applyInPandasWithState API and use coding examples to show how transformWithStateInPandas can express everything applyInPandasWithState can and more.
By the end of this blog, you will understand the advantages of using transformWithStateInPandas over applyInPandasWithState, how an applyInPandasWithState pipeline can be rewritten as a transformWithStateInPandas pipeline, and how transformWithStateInPandas can simplify the development of stateful streaming applications in Apache Spark™.
applyInPandasWithState is a powerful API in Apache Spark™ Structured Streaming that allows for arbitrary stateful operations on streaming data. This API is particularly useful for applications that require custom state management logic. applyInPandasWithState enables users to manipulate streaming data grouped by a key and apply stateful operations on each group.
Most of the business logic takes place in the user-supplied function func, which has the following type signature.
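A minimal sketch of that signature (the parameter names here are illustrative):

```python
from typing import Any, Iterator, Tuple

import pandas as pd
from pyspark.sql.streaming.state import GroupState

# The function receives the grouping key, an iterator of pandas DataFrames
# containing the new rows for that key, and the group's state. It yields
# zero or more pandas DataFrames as output.
def func(
    key: Tuple[Any, ...],
    pdf_iter: Iterator[pd.DataFrame],
    state: GroupState,
) -> Iterator[pd.DataFrame]:
    ...
```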
For example, the following function does a running count of the number of values for each key. It’s worth noting that this function breaks the single responsibility principle: it’s responsible for handling when new data arrives, as well as when the state has timed out.
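A sketch of such a function is below; the fruit/count schema and the 30-second processing-time timeout are assumptions for illustration.

```python
def count_fruit(
    key: Tuple[str],
    pdf_iter: Iterator[pd.DataFrame],
    state: GroupState,
) -> Iterator[pd.DataFrame]:
    if state.hasTimedOut:
        # Timeout branch: emit the final count and drop the state.
        (count,) = state.get
        state.remove()
        yield pd.DataFrame({"fruit": [key[0]], "count": [count]})
    else:
        # New-data branch: fold the incoming rows into the running count.
        count = state.get[0] if state.exists else 0
        for pdf in pdf_iter:
            count += len(pdf)
        state.update((count,))
        # Re-arm the processing-time timeout on every invocation.
        state.setTimeoutDuration(30 * 1000)
        yield pd.DataFrame({"fruit": [key[0]], "count": [count]})
```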
A full example implementation is as follows:
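Under the same illustrative assumptions (a rate source mapped to a fruit column and a console sink), the pipeline might look like this:

```python
from pyspark.sql.streaming.state import GroupStateTimeout

# Illustrative source: a stream with a single "fruit" column.
fruit_stream = (
    spark.readStream.format("rate").load()
    .selectExpr("CASE WHEN value % 2 = 0 THEN 'apple' ELSE 'orange' END AS fruit")
)

query = (
    fruit_stream
    .groupBy("fruit")
    .applyInPandasWithState(
        func=count_fruit,
        outputStructType="fruit STRING, count LONG",
        stateStructType="count LONG",
        outputMode="Update",
        timeoutConf=GroupStateTimeout.ProcessingTimeTimeout,
    )
    .writeStream.format("console").outputMode("update").start()
)
```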
transformWithStateInPandas is a new custom stateful processing operator introduced in Apache Spark™ 4.0. Compared to applyInPandasWithState, you’ll notice that its API is more object-oriented, flexible, and feature-rich. Its operations are defined using an object that extends StatefulProcessor, as opposed to a function with a type signature. transformWithStateInPandas guides you by giving you a more concrete definition of what needs to be implemented, thereby making the code much easier to reason about.
The class has five key methods:

- init: This is the setup method where you initialize the variables, etc., for your transformation.
- handleInitialState: This optional step lets you prepopulate your pipeline with initial state data.
- handleInputRows: This is the core processing stage, where you process incoming rows of data.
- handleExpiredTimer: This stage lets you manage timers that have expired. This is crucial for stateful operations that need to track time-based events.
- close: This stage lets you perform any necessary cleanup tasks before the transformation ends.

With this class, an equivalent fruit-counting operator is shown below.
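A sketch of such a processor (the state variable name and schema are illustrative assumptions):

```python
from typing import Iterator, Tuple

import pandas as pd
from pyspark.sql.streaming import StatefulProcessor, StatefulProcessorHandle

class FruitCountProcessor(StatefulProcessor):
    def init(self, handle: StatefulProcessorHandle) -> None:
        # A single ValueState variable holding the running count for each key.
        self._count = handle.getValueState("countState", "count LONG")

    def handleInputRows(
        self, key: Tuple[str], rows: Iterator[pd.DataFrame], timerValues
    ) -> Iterator[pd.DataFrame]:
        existing = self._count.get()
        count = existing[0] if existing is not None else 0
        for pdf in rows:
            count += len(pdf)
        self._count.update((count,))
        yield pd.DataFrame({"fruit": [key[0]], "count": [count]})

    def close(self) -> None:
        pass
```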
And it can be implemented in a streaming pipeline as follows:
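Wiring it into a query might look like the following (fruit_stream is carried over from the earlier example; arguments are passed positionally as processor, output schema, output mode, and time mode):

```python
query = (
    fruit_stream
    .groupBy("fruit")
    .transformWithStateInPandas(
        FruitCountProcessor(),          # statefulProcessor
        "fruit STRING, count LONG",     # outputStructType
        "Update",                       # outputMode
        "ProcessingTime",               # timeMode
    )
    .writeStream.format("console").outputMode("update").start()
)
```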
applyInPandasWithState and transformWithStateInPandas differ in terms of state handling capabilities and flexibility. applyInPandasWithState supports only a single state variable, which is managed as a GroupState. This allows for simple state management but limits the state to a single-valued data structure and type. By contrast, transformWithStateInPandas is more versatile, allowing for multiple state variables of different types. In addition to transformWithStateInPandas's ValueState type (analogous to applyInPandasWithState’s GroupState), it supports ListState and MapState, offering greater flexibility and enabling more complex stateful operations. These additional state types in transformWithStateInPandas also bring performance benefits: ListState and MapState allow for partial updates without requiring the entire state structure to be serialized and deserialized on every read and write operation. This can significantly improve efficiency, especially with large or complex states.
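For example, appending a single list element or updating a single map entry touches only that element. A sketch, with illustrative state names and schemas:

```python
class BasketProcessor(StatefulProcessor):
    def init(self, handle: StatefulProcessorHandle) -> None:
        # Multiple state variables of different composite types on one key.
        self._events = handle.getListState("events", "ts LONG, fruit STRING")
        self._totals = handle.getMapState("totals", "fruit STRING", "count LONG")

    def handleInputRows(self, key, rows, timerValues) -> Iterator[pd.DataFrame]:
        for pdf in rows:
            for row in pdf.itertuples(index=False):
                # Partial updates: append one element / update one map entry
                # without rewriting the whole state structure.
                self._events.appendValue((row.ts, row.fruit))
                if self._totals.containsKey((row.fruit,)):
                    count = self._totals.getValue((row.fruit,))[0] + 1
                else:
                    count = 1
                self._totals.updateValue((row.fruit,), (count,))
        yield pd.DataFrame({"fruit": [key[0]]})
```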
| | applyInPandasWithState | transformWithStateInPandas |
|---|---|---|
| Number of state objects | 1 | Many |
| Types of state objects | GroupState (similar to ValueState) | ValueState, ListState, MapState |
For the sake of comparison, we will only compare applyInPandasWithState’s GroupState to transformWithStateInPandas's ValueState, as ListState and MapState have no equivalents. The biggest difference when working with state is that with applyInPandasWithState, the state is passed into a function, whereas with transformWithStateInPandas, each state variable needs to be declared on the class and instantiated in an init function. This makes creating/setting up the state more verbose, but also more configurable. The other CRUD operations when working with state remain largely unchanged.
| | GroupState (applyInPandasWithState) | ValueState (transformWithStateInPandas) |
|---|---|---|
| create | Creating state is implied; state is passed into the function via the state variable: `def func(key: _, pdf_iter: _, state: GroupState) -> Iterator[pandas.DataFrame]` | `self._state` is an instance variable on the class; it needs to be declared and instantiated: `class MySP(StatefulProcessor): def init(self, handle: StatefulProcessorHandle) -> None: self._state = handle.getValueState("state", schema)` |
| read | `state.get` (or raise PySparkValueError), `state.getOption` (or return None) | `self._state.get()` (or return None) |
| update | `state.update(v)` | `self._state.update(v)` |
| delete | `state.remove()` | `self._state.clear()` |
| exists | `state.exists` | `self._state.exists()` |
Let’s dig a little into some of the features this new API makes possible. For example, it’s now possible to:
| | applyInPandasWithState | transformWithStateInPandas |
|---|---|---|
| Work with multiple state objects | Not possible | `class MySP(StatefulProcessor): def init(self, handle: StatefulProcessorHandle) -> None: self._state1 = handle.getValueState("state1", schema1); self._state2 = handle.getValueState("state2", schema2)` |
| Create state objects with a TTL | Not possible | `class MySP(StatefulProcessor): def init(self, handle: StatefulProcessorHandle) -> None: self._state = handle.getValueState(state_name="state", schema="c LONG", ttl_duration_ms=30 * 60 * 1000)  # 30 min` |
Debugging a stateful operation used to be challenging because it was difficult to inspect a query’s internal state. Both applyInPandasWithState and transformWithStateInPandas make this easy by seamlessly integrating with the state data source reader. This powerful feature makes troubleshooting much simpler by allowing users to query specific state variables, along with a range of other supported options.
Below is an example of how each state type is displayed when queried. Note that every column, except for partition_id, is of type STRUCT. For applyInPandasWithState, the entire state is lumped together as a single row, so it’s up to the user to pull the variables apart and explode them in order to get a nice breakdown. transformWithStateInPandas gives a nicer breakdown of each state variable, and each element is already exploded into its own row for easy data exploration.
| Operator | State Class | Read statestore |
|---|---|---|
| applyInPandasWithState | GroupState | `display(spark.read.format("statestore").load("/Volumes/foo/bar/baz"))` |
| transformWithStateInPandas | ValueState | `display(spark.read.format("statestore").option("stateVarName", "valueState").load("/Volumes/foo/bar/baz"))` |
| | ListState | `display(spark.read.format("statestore").option("stateVarName", "listState").load("/Volumes/foo/bar/baz"))` |
| | MapState | `display(spark.read.format("statestore").option("stateVarName", "mapState").load("/Volumes/foo/bar/baz"))` |
applyInPandasWithState doesn’t provide a way of seeding the pipeline with an initial state. This made pipeline migrations extremely difficult because the new pipeline couldn’t be backfilled. On the other hand, transformWithStateInPandas has a method that makes this easy. The handleInitialState class function lets users customize the initial state setup and more. For example, the user can use handleInitialState to configure timers as well.
| | applyInPandasWithState | transformWithStateInPandas |
|---|---|---|
| Passing in the initial state | Not possible | `.transformWithStateInPandas(MySP(), "fruit STRING, count LONG", "append", "processingtime", grouped_df)` |
| Customizing initial state | Not possible | `class MySP(StatefulProcessor): def init(self, handle: StatefulProcessorHandle) -> None: self._state = handle.getValueState("countState", "count LONG"); self.handle = handle` ... `def handleInitialState(self, key: Tuple[str], initialState: pd.DataFrame, timerValues: TimerValues) -> None: self._state.update((initialState.at[0, "count"],)); self.handle.registerTimer(timerValues.getCurrentProcessingTimeInMs() + 10000)` |
So if you’re interested in migrating your applyInPandasWithState pipeline to use transformWithStateInPandas, you can easily do so by using the state reader to migrate the internal state of the old pipeline into the new one.
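A minimal sketch of that migration; the old checkpoint path, key column, and state schema below are assumptions, and the processor is assumed to also implement handleInitialState (as in the table above) to ingest these rows:

```python
# Read the old pipeline's state with the state data source reader.
old_state = (
    spark.read.format("statestore")
    .load("/Volumes/foo/bar/old_checkpoint")                 # old checkpoint (illustrative path)
    .selectExpr("key.fruit AS fruit", "value.count AS count")
)

# Pass the snapshot, grouped by the same key, as the initial state of the new pipeline.
query = (
    fruit_stream
    .groupBy("fruit")
    .transformWithStateInPandas(
        FruitCountProcessor(),          # assumed to implement handleInitialState
        "fruit STRING, count LONG",
        "Update",
        "ProcessingTime",
        old_state.groupBy("fruit"),     # initial state
    )
    .writeStream.format("console").outputMode("update").start()
)
```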
Schema evolution is a crucial aspect of managing streaming data pipelines, as it allows for the modification of data structures without interrupting data processing.
With applyInPandasWithState, once a query is started, changes to the state schema are not permitted. applyInPandasWithState verifies schema compatibility by checking for equality between the stored schema and the active schema. If a user tries to alter the schema, an exception is thrown, resulting in the query's failure. Consequently, any changes must be managed manually by the user.
Customers usually resort to one of two workarounds: either they start the query from a new checkpoint directory and reprocess the state, or they wrap the state schema using formats like JSON or Avro and manage the schema explicitly. Neither of these approaches is particularly favored in practice.
On the other hand, transformWithStateInPandas provides more robust support for schema evolution. Users simply need to update their pipelines, and as long as the schema change is compatible, Apache Spark™ will automatically detect and migrate the data to the new schema. Queries can continue to run from the same checkpoint directory, eliminating the need to rebuild the state and reprocess all the data from scratch. The API allows for defining new state variables, removing old ones, and updating existing ones with only a code change.
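For instance, a compatible change such as adding a column to an existing state variable can ship as a plain code change; a sketch (the column names are illustrative):

```python
class FruitCountProcessorV2(StatefulProcessor):
    def init(self, handle: StatefulProcessorHandle) -> None:
        # Version 1 declared: handle.getValueState("countState", "count LONG")
        # Version 2 adds a column to the same state variable. Restarting the
        # query from the same checkpoint migrates the stored state to the
        # new schema instead of failing the query.
        self._count = handle.getValueState("countState", "count LONG, last_seen LONG")
```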
In summary, transformWithStateInPandas's support for schema evolution significantly simplifies the maintenance of long-running streaming pipelines.
| Schema change | applyInPandasWithState | transformWithStateInPandas |
|---|---|---|
| Add columns (including nested columns) | Not Supported | Supported |
| Remove columns (including nested columns) | Not Supported | Supported |
| Reorder columns | Not Supported | Supported |
| Type widening (e.g., Int → Long) | Not Supported | Supported |
applyInPandasWithState has a single function that is triggered when either new data arrives or a timeout fires. It's the user's responsibility to determine the reason for the function call. The way to determine that new streaming data arrived is by checking that the state has not timed out. Therefore, it's a best practice to include a separate code branch to handle timeouts; otherwise, there is a risk that your code will not work correctly with timeouts.
In contrast, transformWithStateInPandas uses different functions for different events:

- handleInputRows is called when new streaming data arrives, and
- handleExpiredTimer is called when a timer goes off.

As a result, no additional checks are necessary; the API manages this for you.
| | applyInPandasWithState | transformWithStateInPandas |
|---|---|---|
| Work with new data | `def func(key, rows, state): if not state.hasTimedOut: ...` | `class MySP(StatefulProcessor): def handleInputRows(self, key, rows, timerValues): ...` |
transformWithStateInPandas introduces the concept of timers, which are much easier to configure and reason about than applyInPandasWithState’s timeouts.
Timeouts only trigger if no new data arrives by a certain time. Additionally, each applyInPandasWithState key can only have one timeout, and the timeout is automatically deleted every time the function is executed.
In contrast, timers trigger at a certain time without exception. You can have multiple timers for each transformWithStateInPandas key, and they are only deleted automatically when the designated time is reached.
| | Timeouts (applyInPandasWithState) | Timers (transformWithStateInPandas) |
|---|---|---|
| Number per key | 1 | Many |
| Trigger event | If no new data arrives by time x | At time x |
| Delete event | On every function call | At time x |
These differences might seem subtle, but they make working with time so much simpler. For example, say you wanted to trigger an action at 9 AM and again at 5 PM. With applyInPandasWithState, you would need to create the 9 AM timeout first, save the 5 PM one to state for later, and reset the timeout every time new data arrives. With transformWithState, this is easy: register two timers, and it’s done.
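A sketch of what that could look like; the helper that computes the next 9 AM / 5 PM epoch timestamps is an illustrative assumption:

```python
from datetime import datetime, timedelta

def next_occurrence_ms(hour: int) -> int:
    # Epoch millis of the next occurrence of the given local hour (illustrative helper).
    now = datetime.now()
    target = now.replace(hour=hour, minute=0, second=0, microsecond=0)
    if target <= now:
        target += timedelta(days=1)
    return int(target.timestamp() * 1000)

class TwoAlertsProcessor(StatefulProcessor):
    def init(self, handle: StatefulProcessorHandle) -> None:
        self.handle = handle

    def handleInputRows(self, key, rows, timerValues) -> Iterator[pd.DataFrame]:
        # Register two independent processing-time timers for this key.
        # (A real pipeline would track whether they are already registered.)
        self.handle.registerTimer(next_occurrence_ms(9))   # 9 AM
        self.handle.registerTimer(next_occurrence_ms(17))  # 5 PM
        return iter([])

    def handleExpiredTimer(self, key, timerValues, expiredTimerInfo) -> Iterator[pd.DataFrame]:
        # Called once per expired timer; emit an alert for whichever one fired.
        yield pd.DataFrame({"key": [key[0]], "alert_at_ms": [expiredTimerInfo.getExpiryTimeInMs()]})
```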
In applyInPandasWithState, state and timeouts are unified in the GroupState class, meaning that the two are not treated separately. To determine whether a function invocation is due to a timeout expiring or to new input, the user needs to explicitly check the state.hasTimedOut flag and implement if/else logic accordingly.
With transformWithState, these gymnastics are no longer necessary. Timers are decoupled from the state and treated as distinct from each other. When a timer expires, the system triggers a separate method, handleExpiredTimer, dedicated solely to handling timer events. This removes the need to check state.hasTimedOut; the system does it for you.
| | applyInPandasWithState | transformWithStateInPandas |
|---|---|---|
| Did a timer go off? | `def func(key, rows, state): if state.hasTimedOut: # yes ... else: # no ...` | `class MySP(StatefulProcessor): def handleExpiredTimer(self, key, timerValues, expiredTimerInfo): when = expiredTimerInfo.getExpiryTimeInMs() ...` |
A peculiarity in the applyInPandasWithState API is the existence of distinct methods for setting timeouts based on processing time and event time. When using GroupStateTimeout.ProcessingTimeTimeout, the user sets a timeout with setTimeoutDuration. In contrast, for EventTimeTimeout, the user calls setTimeoutTimestamp instead. When one method works, the other throws an error, and vice versa. Additionally, for both event time and processing time, the only way to delete a timeout is to also delete its state.
In contrast, transformWithStateInPandas offers a more straightforward approach to timer operations. Its API is consistent for both event time and processing time, and it provides methods to create (registerTimer), read (listTimers), and delete (deleteTimer) a timer. With transformWithStateInPandas, it’s possible to create multiple timers for the same key, which greatly simplifies the code needed to emit data at various points in time.
| | applyInPandasWithState | transformWithStateInPandas |
|---|---|---|
| Create one | `state.setTimeoutTimestamp(tsMilli)` | `self.handle.registerTimer(tsMilli)` |
| Create many | Not possible | `self.handle.registerTimer(tsMilli_1)` `self.handle.registerTimer(tsMilli_2)` |
| Read | `state.oldTimeoutTimestamp` | `self.handle.listTimers()` |
| Update | `state.setTimeoutTimestamp(tsMilli)  # for EventTime` `state.setTimeoutDuration(durationMilli)  # for ProcessingTime` | `self.handle.deleteTimer(oldTsMilli)` `self.handle.registerTimer(newTsMilli)` |
| Delete | `state.remove()  # but this deletes the timeout and the state` | `self.handle.deleteTimer(oldTsMilli)` |
Chaining stateful operators in a single pipeline has traditionally posed challenges. The applyInPandasWithState operator does not allow users to specify which output column is associated with the watermark, so stateful operators can’t be placed after an applyInPandasWithState operator. Consequently, users have had to split their stateful computations across multiple pipelines, requiring Kafka or other storage layers as intermediaries. This increases both cost and latency.
In contrast, transformWithStateInPandas can safely be chained with other stateful operators. Users simply need to specify the event time column when adding it to the pipeline, as illustrated below:
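A sketch, assuming a Kafka source and a processor that emits a window_end timestamp column (the source, schemas, and column names are illustrative):

```python
from pyspark.sql.functions import window

events = (
    spark.readStream.format("kafka")
    .option("kafka.bootstrap.servers", "broker:9092")
    .option("subscribe", "fruit_events")
    .load()
    .selectExpr("CAST(value AS STRING) AS fruit", "timestamp AS event_time")
    .withWatermark("event_time", "10 minutes")
)

chained = (
    events
    .groupBy("fruit")
    .transformWithStateInPandas(
        FruitCountProcessor(),                             # assumed to emit window_end
        "fruit STRING, count LONG, window_end TIMESTAMP",
        "Append",
        "EventTime",
        eventTimeColumnName="window_end",  # ties the watermark to this output column
    )
    # Because the watermark is propagated, another stateful operator can follow.
    .groupBy(window("window_end", "1 hour"), "fruit")
    .count()
)
```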
This approach lets the watermark information pass through to downstream operators, enabling late record filtering and state eviction without having to set up a new pipeline and intermediate storage.
The new transformWithStateInPandas operator in Apache Spark™ Structured Streaming offers significant advantages over the older applyInPandasWithState operator. It provides greater flexibility, enhanced state management capabilities, and a more user-friendly API. With features such as multiple state objects, state inspection, and customizable timers, transformWithStateInPandas simplifies the development of complex stateful streaming applications.
While applyInPandasWithState may still be familiar to experienced users, transformWithState's improved functionality and versatility make it the better choice for modern streaming workloads. By adopting transformWithStateInPandas, developers can create more efficient and maintainable streaming pipelines. Try it out for yourself in Apache Spark™ 4.0, and Databricks Runtime 16.2 and above.
Feature | applyInPandasWithState (State v1) | transformWithStateInPandas (State v2) |
---|---|---|
Supported Languages | Scala, Java, and Python | Scala, Java, and Python |
Processing Model | Function-based | Object-oriented |
Input Processing | Processes input rows per grouping key | Processes input rows per grouping key |
Output Processing | Can generate output optionally | Can generate output optionally |
Supported Time Modes | Processing Time & Event Time | Processing Time & Event Time |
Fine-Grained State Modeling | Not supported (only single state object is passed) | Supported (users can create any state variables as needed) |
Composite Types | Not supported | Supported (currently supports Value, List and Map types) |
Timers | Not supported | Supported |
State Cleanup | Manual | Automated with support for state TTL |
State Initialization | Partial Support (only available in Scala) | Supported in all languages |
Chaining Operators in Event Time Mode | Not Supported | Supported |
State Data Source Reader Support | Supported | Supported |
State Model Evolution | Not Supported | Supported |
State Schema Evolution | Not Supported | Supported |