Table-Valued Functions (TVFs) have long been a powerful tool for processing structured data. They allow functions to return multiple rows and columns instead of just a single value. Previously, using TVFs in Apache Spark™ required SQL, making them less flexible for users who prefer the DataFrame API.
We are pleased to announce the new DataFrame API for Table-Valued Functions. Users can now invoke TVFs directly within DataFrame operations, making transformations simpler, more composable, and fully integrated with Spark’s DataFrame workflow. This is available in Databricks Runtime (DBR) 16.1 and above.
In this blog, we’ll explore what TVFs are and how to use them, both with scalar and table arguments. Consider these three benefits of using TVFs:
Key Benefits
- Invoke TVFs directly with spark.tvf.<function_name>, without needing SQL.
- Chain TVF output with .filter(), .select(), and more.
- Get results back as regular DataFrames, fully integrated with the rest of your DataFrame workflow.

We'll start with a simple example using a built-in TVF. Spark comes with handy TVFs like variant_explode, which expands JSON structures into multiple rows.
Here is the SQL approach:
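A minimal sketch, run here through spark.sql (the sample JSON array is our own):

```python
# variant_explode expands a parsed JSON value into one row per element,
# returning pos, key, and value columns.
spark.sql("""
    SELECT pos, key, value
    FROM variant_explode(parse_json('["hello", "world"]'))
""").show()
```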
And here is the equivalent DataFrame API approach:
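The same sketch using spark.tvf; the result is a regular DataFrame you can keep chaining:

```python
from pyspark.sql import functions as F

# Same sample input as the SQL version above.
spark.tvf.variant_explode(F.parse_json(F.lit('["hello", "world"]'))).show()
```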
As you can see above, it’s straightforward to use TVFs either way, through SQL or the DataFrame API. Both approaches take scalar arguments and give you the same result.
What if you want to use a table as an input argument? This is useful when you want to operate on rows of data. Let's look at an example where we want to compute the duration and costs of travel by car and air.
Let’s imagine a simple DataFrame:
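For illustration, here is a hypothetical DataFrame of city pairs and the distance between them (the cities and mileages are made up):

```python
df = spark.createDataFrame(
    [
        ("New York", "Los Angeles", 2448.0),
        ("San Francisco", "Seattle", 679.0),
    ],
    ["from_city", "to_city", "distance_miles"],
)
```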
We need our class to handle a table row as an argument. Note that the eval method takes a Row argument from a table instead of a scalar argument.
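Here is a sketch of such a UDTF. The assumed speeds (60 mph by car, 500 mph by plane) and per-mile costs ($0.65 to drive, $0.12 airfare) are ours, purely for illustration:

```python
from pyspark.sql import Row
from pyspark.sql.functions import udtf

@udtf(returnType="from_city: string, to_city: string, car_hours: double, "
                 "plane_hours: double, car_cost: double, airfare: double")
class TravelEstimates:
    def eval(self, row: Row):
        # Pull the distance from the incoming table row.
        distance = row["distance_miles"]
        yield (
            row["from_city"],
            row["to_city"],
            round(distance / 60.0, 1),   # hours by car at 60 mph
            round(distance / 500.0, 1),  # hours by plane at 500 mph
            round(distance * 0.65, 2),   # driving cost per mile
            round(distance * 0.12, 2),   # airfare per mile
        )
```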
With this definition of handling a Row from a table, we can compute the desired result by passing our DataFrame as a table argument.
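A sketch of the call; asTable() turns the DataFrame into a table argument:

```python
# Each row of df becomes one Row argument to eval.
TravelEstimates(df.asTable()).show()
```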
Or you can create a table, register the UDTF, and use it in a SQL statement as follows:
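A sketch of the SQL route; the function and view names (travel_estimates, city_distances) are our own:

```python
spark.udtf.register("travel_estimates", TravelEstimates)
df.createOrReplaceTempView("city_distances")

# TABLE(...) passes the whole view as a table argument.
spark.sql("SELECT * FROM travel_estimates(TABLE(city_distances))").show()
```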
Alternatively, you can achieve the same result by calling the TVF with a lateral join, which is useful with scalar arguments (read below for an example).
You can also use lateral joins to call a TVF with an entire DataFrame, row by row. Support for both lateral joins and table arguments is available in DBR 17.0.
A lateral join lets you call a TVF over each row of a DataFrame, dynamically expanding the data based on the values in that row. Let’s explore a couple of examples with more than a single row.
Let's say we have a DataFrame where each row contains an array of numbers. As before, we can use variant_explode to explode each array into individual rows.
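A hypothetical input DataFrame, with each array stored as a JSON string:

```python
json_df = spark.createDataFrame(
    [(1, '[1, 2, 3]'), (2, '[4, 5]')],
    ["id", "json_array"],
)
```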
Here is the SQL approach:
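A sketch using a lateral call in the FROM clause (the view name is ours):

```python
json_df.createOrReplaceTempView("json_arrays")

# The lateral reference lets variant_explode see each row's json_array.
spark.sql("""
    SELECT id, pos, value
    FROM json_arrays,
         LATERAL variant_explode(parse_json(json_array))
""").show()
```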
And here is the equivalent DataFrame approach:
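A matching sketch with lateralJoin; outer() marks json_array as a correlated reference to the left-hand DataFrame:

```python
from pyspark.sql import functions as F

json_df.lateralJoin(
    spark.tvf.variant_explode(F.parse_json(F.col("json_array").outer()))
).show()
```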
Sometimes, the built-in TVFs just aren't enough. You may need custom logic to transform your data in a specific way. That's where User-Defined Table Functions (UDTFs) come to the rescue! Python UDTFs allow you to write your own TVFs in Python, giving you complete control over the row expansion process.
Here's a simple Python UDTF that generates a sequence of numbers from a starting value to an ending value, and returns both the number and its square:
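A sketch of such a UDTF (the class name and output schema are our own):

```python
from pyspark.sql.functions import udtf

@udtf(returnType="num: int, squared: int")
class SquareNumbers:
    def eval(self, start: int, end: int):
        # Emit one row per number in the inclusive range.
        for num in range(start, end + 1):
            yield (num, num * num)
```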
Now, let's use this UDTF in a lateral join. Imagine we have a DataFrame with start and end columns, and we want to generate the number sequences for each row.
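A sketch over a hypothetical DataFrame, again using outer() to correlate the start and end columns:

```python
from pyspark.sql import functions as F

range_df = spark.createDataFrame([(1, 3), (5, 7)], ["start", "end"])

range_df.lateralJoin(
    SquareNumbers(F.col("start").outer(), F.col("end").outer())
).show()
```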
Here is another illustrative example of using a UDTF with a lateralJoin [see documentation] on a DataFrame of cities and the distances between them. We want to generate an augmented table with additional information, such as the time to travel between cities by car and air, along with the airfare cost.
Let’s use our airline distances DataFrame from above:
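As a reminder, here is the hypothetical data we created earlier:

```python
df.show()
# +-------------+-----------+--------------+
# |    from_city|    to_city|distance_miles|
# +-------------+-----------+--------------+
# |     New York|Los Angeles|        2448.0|
# |San Francisco|    Seattle|         679.0|
# +-------------+-----------+--------------+
```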
We can modify our previous Python UDTF, which computes the duration and cost of travel between two cities, by making the eval method accept scalar arguments:
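A sketch with the same assumed speeds and per-mile costs as before. The lateral join carries the city columns through, so eval only needs the distance:

```python
from pyspark.sql.functions import udtf

@udtf(returnType="car_hours: double, plane_hours: double, "
                 "car_cost: double, airfare: double")
class TravelEstimatesScalar:
    def eval(self, distance_miles: float):
        yield (
            round(distance_miles / 60.0, 1),   # hours by car at 60 mph
            round(distance_miles / 500.0, 1),  # hours by plane at 500 mph
            round(distance_miles * 0.65, 2),   # driving cost per mile
            round(distance_miles * 0.12, 2),   # airfare per mile
        )
```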
Finally, let’s call our UDTF with a lateralJoin, giving us the desired output. Unlike our previous airline example, this UDTF’s eval method accepts scalar arguments.
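A sketch of the call, correlating the distance column with outer():

```python
from pyspark.sql import functions as F

# The output keeps df's columns and appends the UDTF's estimates.
df.lateralJoin(
    TravelEstimatesScalar(F.col("distance_miles").outer())
).show()
```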
The DataFrame API for Table-Valued Functions provides a more cohesive and intuitive approach to data transformation within Spark. We demonstrated three approaches to employ TVFs: SQL, DataFrame, and Python UDTF. By combining TVFs with the DataFrame API, you can process multiple rows of data and achieve bulk transformations.
Furthermore, by passing table arguments or using lateral joins with Python UDTFs, you can implement custom business logic for your specific data processing needs. We showed two examples of transforming and augmenting data to produce the desired output, using both scalar and table arguments.
We encourage you to explore the capabilities of this new API to optimize your data transformations and workflows. This new functionality is available in the Apache Spark™ 4.0.0 release. If you are a Databricks customer, you can use it in DBR 16.1 and above.