Define more performant UDFs with ease.
by Ruifeng Zheng and Yicong Huang
Python User-Defined Functions (UDFs) are an essential extensibility mechanism but have traditionally suffered from high overhead due to row-based execution. In Apache Spark™, Pandas UDFs addressed part of this problem by introducing Arrow-based serialization and batch processing, significantly improving throughput compared to scalar Python UDFs.
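However, Pandas UDFs still have fundamental limitations: every batch has to be converted from Arrow into Pandas objects and back, which costs time and memory and restricts the set of well-supported datatypes.
By dropping the Pandas/Arrow data conversion, Arrow UDFs execute faster than Pandas UDFs, consume less memory, and provide better datatype support.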
We’re thrilled to introduce Native Arrow UDFs starting with Databricks Runtime 18.0 (release notes), an exciting leap forward for performant UDF execution.
Native Arrow UDFs operate directly on Arrow data without converting inputs into Pandas or NumPy objects. This preserves the columnar layout end-to-end, avoids unnecessary data copies, and lets UDFs use vectorized processing by leveraging Arrow’s native compute and memory model.
To define an Arrow UDF, use the new Python decorator @arrow_udf, specifying the return type and, optionally, the evaluation type. For instance:
Users can also define one with the existing @udf decorator by providing complete type hints. For instance:
Note: The function definition should include type hints for all of the arguments and the return value.
This design mirrors the interface of scalar Python UDFs, giving users who already know them a consistent and intuitive experience.
The following demonstrates how to use the Arrow UDF:
Python Usage:
SQL Usage:
Arrow UDFs come in several variants: Scalar Functions, Aggregate Functions, and Table Functions. The DataFrame API also provides mapInArrow and applyInArrow for applying Arrow-native Python functions. We introduce each of them below.
Arrow Scalar Functions perform row-wise transformations. They are the Arrow equivalent of scalar Pandas UDFs and can be used anywhere a column expression is expected, such as df.select() or df.withColumn(). Three input modes are supported: direct, iterator, and iterator of multiple arrays. The iterator variants are useful when the UDF requires expensive one-time initialization (e.g., loading a model or compiling a regex pattern), as the setup cost is amortized across all batches. In all cases, the output row count must match the input row count.
Arrays to Arrays: receiving one or more pyarrow.Array and returning one pyarrow.Array. The input and output arrays must have the same number of values.
Iterator of Arrays to Iterator of Arrays: receiving an iterator of pyarrow.Array and returning an iterator of pyarrow.Array. This type is useful when the UDF requires expensive initialization.
Iterator of Multiple Arrays to Iterator of Arrays: receiving an iterator of a tuple of multiple pyarrow.Array and returning an iterator of pyarrow.Array.
Arrow Aggregate Functions take one or more pyarrow.Array inputs and return a scalar value, reducing a group of rows into a single result. They are the Arrow equivalent of grouped aggregate Pandas UDFs and are used with groupBy().agg() or Window operations. Like scalar functions, aggregate functions support three input modes:
Arrays to Scalar: receiving pyarrow.Array and returning a scalar value.
Iterator of Arrays to Scalar: receiving an iterator of pyarrow.Array and returning a scalar value. This is useful for processing large volumes of data in aggregation-style operations.
Iterator of Multiple Arrays to Scalar: receiving an iterator of a tuple of multiple pyarrow.Array and returning a scalar value. This allows more complex aggregations across several input columns.
Arrow Table Functions, also known as Arrow UDTFs (User-Defined Table Functions), accept a pyarrow.RecordBatch or one or more pa.Array objects as input and produce a pyarrow.Table as output. This is the main pattern for table-in, table-out transformations written in Python with columnar execution. Arrow UDTFs can return a different number of rows than they receive and can emit columns that differ from their input.
Consequently, they are well suited for operations such as filtering, row expansion, data restructuring, and generating derived columns.
The arrow_udtf interface is designed for simplicity: you apply a decorator and declare the return type as a DDL-formatted string. The eval method takes PyArrow objects as input and yields PyArrow Tables or RecordBatches. The interface supports two input modes. For table arguments, eval receives a pa.RecordBatch that contains all columns of the input table:
For scalar arguments, the method receives pa.Array objects, one for each scalar input:
Here is another example:
This UDTF can work in two distinct ways:
Python Usage:
SQL Usage:
In addition to User-Defined Functions (UDFs) and User-Defined Table Functions (UDTFs), PySpark provides Arrow Function APIs that apply native Python functions directly to Arrow data at the DataFrame level. These APIs work like their Pandas counterparts (mapInPandas, applyInPandas) but use pyarrow.RecordBatch and pyarrow.Table instead of Pandas DataFrames, avoiding the conversion overhead between Pandas and Arrow formats.
DataFrame.mapInArrow transforms an iterator of pyarrow.RecordBatch into another iterator of pyarrow.RecordBatch, enabling row-level operations such as filtering, transformation, or expansion.
groupBy().applyInArrow() applies a function to each group, taking and returning a pyarrow.Table. This is useful for per-group transformations such as data normalization.
cogroup().applyInArrow() cogroups two DataFrames on a shared key and applies a function to each cogroup. The function receives two pyarrow.Table inputs and returns a single pyarrow.Table.
By removing the expensive Pandas/Arrow data conversion, Arrow UDFs generally execute faster than Pandas UDFs and use less memory. Let’s compare two simple UDFs:
The Arrow UDF is ~10% faster than the Pandas UDF, and the memory profiler shows ~40% lower memory usage during execution.
Databricks Runtime 18.0 introduces Native Arrow UDFs, a faster, leaner alternative to Pandas UDFs for Python UDF execution in PySpark. By operating directly on Arrow data and eliminating the Pandas/Arrow conversion overhead, Arrow UDFs delivered ~10% faster execution and ~40% lower memory usage in the comparison above. They also offer better support for complex datatypes and keep a familiar, intuitive decorator syntax.
Ready to explore more? Try out Native Arrow UDFs today on Databricks as part of Databricks Runtime 18.0. To get started, replace your existing Pandas UDFs with Arrow UDFs; in most cases only a few lines need to change to unlock immediate performance gains. See the Arrow UDF documentation and Arrow UDTF documentation for the full API reference and additional examples.