When I was tasked with improving our predictions of when customers were likely to purchase in a category, I ran into a problem – we had one model that was trying to predict everything from milk and eggs to batteries and tea. I was able to improve our predictions by creating category-specific models, but how could I possibly handle every category we had?
It turns out PandasUDFs were my One Weird Trick for solving this problem and many others. By using them, I was able to take already-written development code, add a function decorator, and scale my analysis to every category with minimal effort. Ten-hour runtimes finished in 30 minutes. You too can use this One Weird Trick to scale from one model to whole ensembles of models.
Paul: Good afternoon, everyone. My name is Paul Anzel, and I am a data engineer and former data scientist with H-E-B, a major grocery retailer here in Texas. A lot of my work over the past year has focused on productionalizing other people’s workflows: taking one-off data science projects and turning them into mature processes. I’m here to show you something I’ve started calling my one weird trick for getting some of these workloads to run quickly and cleanly. A quick note on how we’ll go through this. I’ll start with an introduction, a motivating example of where I discovered PandasUDFs and how they became so helpful to me. Then I’ll go over a number of example use cases. There is a two-gigabyte serialization/deserialization limit you need to be aware of with this process, so I’ll cover that. Finally, I’ll go over some equivalents in R and Koalas, in case you’re not as tied to Python and PySpark as I am.
So, the introduction. About a year and a half ago, I had a problem. We had built a model to predict how people purchase items: would a given customer purchase something in a category within the next four weeks? The model worked, but we found it didn’t work very well, especially for long-cycle, low-velocity items. If you think about it, that makes sense. I buy eggs and milk very frequently, almost every week. Batteries, I’ll buy a few and then I’m good for a while. A frying pan, I might buy once a year at most. So expecting one model to handle all of these different cases was expecting too much.
Instead of trying to have one model do everything, why not have one model for each category and build an ensemble of models? This really improved my metrics, especially for the low-velocity items, where the model could properly learn that if you purchased something recently, you’re not going to buy it again soon. But having done that, I ran into a lot of trouble. Managing and keeping track of all the models we built was a lot harder, and, much worse, a process that had run relatively quickly, in about 20 to 30 minutes, now took many, many hours and was very hard to test. The fundamental problem is this. Taking the naive approach, I start with my big data frame of features, look at all my categories, and loop over them: let’s build my eggs model, my milk model, my tea model, my battery model, and my frozen pizza model. For each model, we filter the data down to that category and then run our machine learning process.
If you think about what actually happens, we start with one big Spark data frame of all our data: all our features, all our labels. We go through it once for the first model, then a second time, a third time, a fourth time, a fifth time. We spend a lot of time seeking through the data. We can partition the data beforehand to save on some of the seeking, but we’re still not getting much parallelization. And this problem is embarrassingly parallel: the model I train for eggs does not affect the model I train for tea, and vice versa. This is exactly the kind of work you can scale out as widely as possible to get as much benefit as you can.
So we looked at some approaches for parallelizing this. The first thing we tried was Python’s multithreading, which gave us some improvement in speed. But what we found was that once you have more than two or three processes running at the same time, they really start competing with each other for resources. If one process finishes before another is complete, its share of the resources sits idle. If you’re on a dynamically scaling cluster, you’re not in a good position to take advantage of more machines being added to the pool. There’s still a lot of redundant searching through the data. In the end, we didn’t go with this solution.
After a lot of searching, a lot of time on Stack Overflow, and a lot of pestering the Databricks team on Slack, I came across PandasUDFs. PandasUDFs let you write user-defined functions in Python and Pandas to do whatever you need. Initially I was very hesitant to use them. I had been warned against UDFs because they’re slow and don’t leverage Spark very well. But what I found is that going from a regular Python UDF to a PandasUDF gives you the same speedup in computation and numerical work that you get when moving from plain Python to Pandas. There are a number of different PandasUDF types available, but I’ll start with the one I’ve found most helpful: the data-frame-in, data-frame-out type known as the grouped map.
From a high-level view, a grouped map PandasUDF scans through your big data frame once and partitions the data by your grouping key. Each of these partitions is then converted into a Pandas data frame, and Spark runs your Pandas function on them in parallel. Your function works through each one and returns a Pandas data frame, which gets converted back into a Spark data frame, and these Spark data frames are concatenated together at the end.
From a practical view, let’s look at some code. This is very prototypical data science code, the kind I would write while testing things. I have my imports at the top. I have a lot of Spark code that builds my features and labels for training, which I’m condensing into one line here. I specify which column is my label, which columns are my features, which column I use to divide up my data (my category), where I want to save my model, and a test category. Let’s start by building a single model on, for example, frozen pizza. I take my data frame, filter it down to that one test category, and turn it into a Pandas data frame.
I extract my features and my label, do a train/test split, and toss it into a scikit-learn random forest classifier, not a Spark random forest classifier. I get predictions for my test set and compute a model metric; in this case, the precision-recall AUC. I save my model to DBFS, and finally I print the PR AUC for my model. Now, the first thing I want you to notice is these lines here. I’m going to put them inside a function. So now, instead of a script, I have a function called fit model that I run on my Pandas data frame, and it returns my PR AUC as a value. This is a very small change, but now I’m very close to being able to run this on every single category.
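To make the step from script to function concrete, here is a minimal sketch of a fit function for one category’s Pandas data frame, using a scikit-learn random forest and the area under the precision-recall curve. The column names, hyperparameters, test split size, and the optional pickle-based `model_path` (on Databricks this could point at DBFS) are illustrative assumptions, not the talk’s actual code.

```python
import pickle
from typing import List, Optional

import pandas as pd
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import auc, precision_recall_curve
from sklearn.model_selection import train_test_split


def fit_model(pdf: pd.DataFrame, feature_cols: List[str], label_col: str = "label",
              model_path: Optional[str] = None) -> float:
    """Fit one category's model and return its precision-recall AUC."""
    X = pdf[feature_cols].to_numpy()
    y = pdf[label_col].to_numpy()
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=0.25, random_state=0, stratify=y)

    model = RandomForestClassifier(n_estimators=100, random_state=0)
    model.fit(X_train, y_train)

    # Probability of the positive class traces out the precision-recall curve
    scores = model.predict_proba(X_test)[:, 1]
    precision, recall, _ = precision_recall_curve(y_test, scores)
    pr_auc = auc(recall, precision)

    # Side effect: persist the fitted model (hypothetical location)
    if model_path is not None:
        with open(model_path, "wb") as f:
            pickle.dump(model, f)
    return float(pr_auc)
```

Called on a filtered Pandas data frame, for example `fit_model(pizza_pdf, feature_cols)` with placeholder names, it behaves like the original script.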
The last thing I do is add a decorator at the top of my function: pandas_udf. I tell it what my output is going to be: a column of categories, which is string type, and a column of PR AUCs, which are double-precision floating-point numbers. I tell it that this is a grouped map PandasUDF, and at the very bottom, instead of returning just my PR AUC, I return a Pandas data frame with one row containing my category and my PR AUC.
Now, instead of calling my function on a Pandas data frame, I take my Spark data frame, call groupBy, give it the grouping key, and apply this fit model function. Lastly, I call collect. That’s it. I turned my data science code from a script into a function, added a decorator, and adjusted the return type a little, and that’s all it took. I’ve gone from fitting one model to fitting all of my models, with Spark handling all the parallelization for me. I’m returning a data frame of model metadata, but probably the more interesting thing is that this function has a side effect: it saves the model to DBFS, or it could register the model with MLflow, or stash it somewhere else for later. This is perfectly fine; PandasUDFs have no trouble with it.
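A sketch of the decorated version in the Spark 2.3+ style described here. The Pandas-level work lives in a plain function, and the Spark import sits inside the wiring function so the Pandas part can be tested without a cluster. The `category` column, the `score_fn` callable (a stand-in for your own fitting code), and the function names are assumptions; `PandasUDFType.GROUPED_MAP` with `groupBy().apply()` still runs on Spark 3.x but is deprecated there.

```python
from typing import Callable, List

import pandas as pd

OUTPUT_SCHEMA = "category string, pr_auc double"


def summarize_category(pdf: pd.DataFrame,
                       score_fn: Callable[[pd.DataFrame], float]) -> pd.DataFrame:
    """Run score_fn on one category's rows; return a one-row metadata frame."""
    category = pdf["category"].iloc[0]  # every row in the group shares this key
    return pd.DataFrame({"category": [category], "pr_auc": [float(score_fn(pdf))]})


def fit_all_categories(features_sdf, score_fn: Callable[[pd.DataFrame], float]) -> List:
    """Fit one model per category on Spark (requires pyspark)."""
    from pyspark.sql.functions import PandasUDFType, pandas_udf

    @pandas_udf(OUTPUT_SCHEMA, PandasUDFType.GROUPED_MAP)
    def fit_model_udf(pdf):
        return summarize_category(pdf, score_fn)

    # groupBy().apply() only builds the plan; collect() actually runs it
    return features_sdf.groupBy("category").apply(fit_model_udf).collect()
```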
I am also bringing in global variables from the script. That’s fine too: you can define these things outside of the function and reference them inside it. With this, code that took 10 hours ran in 30 minutes. There are some important refinements we can make to this technique. First, instead of giving a string description of the schema, you can use the pyspark.sql.types module to define it. The one real advantage of doing this is that you can be stricter about which fields are nullable; that’s what the True/False nullable flags on each field of the fit schema control.
Another refinement: Spark is not great at giving you readable stack traces when you hit an error. If I run fit model on a Pandas data frame and it fails, I get a stack trace that’s fairly easy to diagnose because it’s maybe four or five layers deep. With Spark, if your function raises an exception, you get a very, very long stack trace, which makes diagnosing errors much harder.
A workaround I found is to put your code in a try/except block: run your regular code under the try, and add an extra column called error message or something similar. If the code runs as expected, there’s no error message and everything is good. If there is an exception, we don’t send out the model metadata; instead we provide the error message as a string. After running my PandasUDF, I look at that metadata table: is the error message column all null, or are there rows where we had an error? If there are, I can either filter them out, or I can go back and test my function on the categories that gave me an error and diagnose what went wrong and how to fix it.
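One way to sketch the try/except refinement, assuming the same one-row-per-category output plus an `error_message` column. `fit_fn` is a hypothetical stand-in for the fitting code, and `fit_schema()` shows the `pyspark.sql.types` alternative to a schema string, with nullability spelled out per field.

```python
import traceback
from typing import Callable

import pandas as pd


def fit_category_safely(pdf: pd.DataFrame,
                        fit_fn: Callable[[pd.DataFrame], float]) -> pd.DataFrame:
    """Return one metadata row per category, recording errors instead of raising."""
    category = pdf["category"].iloc[0]
    try:
        pr_auc, error_message = float(fit_fn(pdf)), None
    except Exception:
        # Keep the short Python traceback as a string rather than letting
        # Spark wrap it in a much longer one
        pr_auc, error_message = None, traceback.format_exc()
    return pd.DataFrame({"category": [category], "pr_auc": [pr_auc],
                         "error_message": [error_message]})


def fit_schema():
    """Explicit output schema with nullability per field (requires pyspark)."""
    from pyspark.sql.types import DoubleType, StringType, StructField, StructType

    return StructType([
        StructField("category", StringType(), nullable=False),
        StructField("pr_auc", DoubleType(), nullable=True),         # empty when fitting failed
        StructField("error_message", StringType(), nullable=True),  # empty when fitting succeeded
    ])
```

After the job runs, something like `results.filter(results.error_message.isNotNull())` on the Spark side lists the categories worth rerunning locally.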
Another thing to know: you can have multiple grouping keys. In my initial example, I used only product category as the grouping key, but we could, for example, use both product category and product brand. Instead of providing one key, you provide a list of keys, and Spark groups by all of them and applies the PandasUDF to each combination. The last thing to note is that this is a transformation, not an action. When you call the apply method, Spark simply adds it to the plan of things it needs to do. You’ll need to run an action like collect, count, or saveAsTable to actually run your PandasUDF and see whether things ran as expected or whether you ran into trouble.
So I gave an example of fitting models. Let’s look at using our models to score data. The process is very similar. I take my script for scoring data, put it into a function, and add the PandasUDF decorator. The only two real changes from fitting to scoring are that instead of saving a model, I’m loading one, and instead of returning a data frame with one row of my category and model metrics, I’m returning a data frame with multiple rows: one row per household for each category, with the propensities for each. That’s perfectly valid; PandasUDFs have no trouble with this.
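A sketch of the scoring side under stated assumptions: one pickled scikit-learn-style model per category under a hypothetical `model_dir`, and input columns named `household_id` and `category`. The UDF returns as many rows as it receives, which the grouped map allows.

```python
import pickle
from typing import List

import numpy as np
import pandas as pd

SCORE_SCHEMA = "household_id long, category string, propensity double"


def score_category(pdf: pd.DataFrame, model, feature_cols: List[str]) -> pd.DataFrame:
    """Score every household in one category: one output row per input row."""
    propensity = np.asarray(model.predict_proba(pdf[feature_cols].to_numpy()))[:, 1]
    return pd.DataFrame({
        "household_id": pdf["household_id"].to_numpy(),
        "category": pdf["category"].to_numpy(),
        "propensity": propensity,
    })


def load_model(category: str, model_dir: str):
    """Hypothetical layout: one pickled model per category under model_dir."""
    with open(f"{model_dir}/{category}.pkl", "rb") as f:
        return pickle.load(f)


def score_all_categories(features_sdf, feature_cols: List[str], model_dir: str):
    """Grouped map scoring in the decorator style (requires pyspark)."""
    from pyspark.sql.functions import PandasUDFType, pandas_udf

    @pandas_udf(SCORE_SCHEMA, PandasUDFType.GROUPED_MAP)
    def score_udf(pdf):
        model = load_model(pdf["category"].iloc[0], model_dir)  # load instead of save
        return score_category(pdf, model, feature_cols)

    return features_sdf.groupBy("category").apply(score_udf)
```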
If you are working with PandasUDFs, one thing to know is that as soon as you apply the decorator, the function no longer works on a Pandas data frame. Initially, when I was trying to debug errors, I would comment and uncomment the decorator, but I quickly realized another way around this: use a wrapper function. The fit model function is left alone, so I can still throw Pandas data frames at it, but the wrapper function gets the PandasUDF decorator, and that’s the function that gets run in my groupBy().apply().
And that’s basically it. This has really become my one weird trick for productionalizing workflows. It’s not always the right tool: if you can write your code in pure Spark, I recommend staying with that. But there are a lot of cases where you want to build these ensembles, or you have some for-loop you should get rid of. I’ve gotten this to work with hundreds of thousands of models, and I’ve used it for many things: regressions, classifications, clustering, networks and graphs, factor analyses, and some file I/O problems. In all of these cases, you have the full capability of Python and Pandas available to you.
So I’ve given you an overview of the grouped map UDF, which I think is probably the most helpful. That said, there are other PandasUDFs available, so let’s go through them and look at some of their use cases. The first is the grouped aggregate. Simple aggregations like count, sum, mean, and median are already available in Spark, and if that’s what you need, use them. But say you want a more unusual aggregation, like the Hodges-Lehmann-Sen estimator. This is an estimator of location, much like the mean or median, but it gets some nice benefits from both. It’s more robust than the mean: it doesn’t have as much trouble with outliers here or there. And it’s more efficient than the median.
So when you have small samples of data, it’s better able to estimate the center of your data than the median is. In Python, this is very easy to express. The HLS estimator looks at every pairwise average of your data and takes the median of those. Using itertools, I get all combinations of two elements of my data, take the median of their sums, and divide by two. In Spark, not so much. But since I can express this in Python, I can use a grouped aggregate PandasUDF, and by calling it inside the agg function, I can use the HLS estimator to get the center point of my data.
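The Hodges-Lehmann-Sen estimator as described here: sum every pair of distinct values, take the median, and divide by two. Some definitions also pair each value with itself; this sketch follows the talk. Because it builds all n(n-1)/2 pairs, it is only practical for modest group sizes. The wiring function uses the grouped aggregate type, which needs Spark 2.4 or later; the column names are placeholders.

```python
from itertools import combinations
from typing import Sequence

import numpy as np


def hodges_lehmann(values: Sequence[float]) -> float:
    """Median of all pairwise averages over distinct pairs (i < j)."""
    values = list(values)
    if len(values) < 2:
        return float(values[0]) if values else float("nan")
    pair_sums = [a + b for a, b in combinations(values, 2)]
    return float(np.median(pair_sums)) / 2.0


def hl_by_group(sdf, group_col: str, value_col: str):
    """Grouped aggregate pandas UDF: one double per group (requires pyspark)."""
    from pyspark.sql.functions import PandasUDFType, pandas_udf

    @pandas_udf("double", PandasUDFType.GROUPED_AGG)
    def hl_udf(v):
        return hodges_lehmann(v)  # v is the group's values as a pandas Series

    return sdf.groupBy(group_col).agg(hl_udf(sdf[value_col]).alias("hl_estimate"))
```

With one outlier, `hodges_lehmann([1, 2, 3, 4, 100])` gives 3.25, close to the median of 3 and far from the mean of 22.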
The scalar UDF comes in two variants: scalar and scalar iterator. I’m a bit more familiar with scalar, but they tend to accomplish the same thing. I had a problem where I had a bunch of embeddings for my data. An embedding takes categorical data and turns it into vectors, and I wanted to see which items were closest to other items. To do that, I needed to compute the cosine similarity of all of these vectors. This is easy to do in Python: you take the dot product of vector A and vector B and divide by the product of the two vectors’ norms. Theoretically, you can do this in PySpark using the linear algebra module, but I could not get that to work.
To convert this to a scalar UDF, the first thing I did was create a cosine sim function, which returns the cosine similarity of two vectors. Next, I added the np.vectorize decorator, which lets it work on arrays of arrays. Finally, I created a wrapper for this function that returns a Pandas series with the same index as its input, and then, using withColumn on my Spark data frame, I can compute my cosine similarity.
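A sketch of the scalar version. It assumes each embedding column is an array of doubles, which Spark hands to a pandas UDF as a Series of NumPy arrays. Here `np.vectorize` is called as a function rather than used as a decorator, with `otypes` set so NumPy does not make a trial call to guess the output type; the function and column names are illustrative.

```python
import numpy as np
import pandas as pd


def cosine_sim(a, b) -> float:
    """Cosine similarity of two 1-D vectors."""
    a = np.asarray(a, dtype=float)
    b = np.asarray(b, dtype=float)
    return float(np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b)))


# The input Series have object dtype (one array per row), so each element
# reaches cosine_sim as a whole vector
vectorized_cosine_sim = np.vectorize(cosine_sim, otypes=[float])


def cosine_sim_series(a: pd.Series, b: pd.Series) -> pd.Series:
    """Row-wise cosine similarity, keeping the input index."""
    return pd.Series(vectorized_cosine_sim(a, b), index=a.index)


def add_cosine_similarity(sdf, col_a: str, col_b: str):
    """Attach the scalar pandas UDF as a new column (requires pyspark)."""
    from pyspark.sql.functions import PandasUDFType, pandas_udf

    cos_udf = pandas_udf(cosine_sim_series, "double", PandasUDFType.SCALAR)
    return sdf.withColumn("cosine_sim", cos_udf(sdf[col_a], sdf[col_b]))
```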
That’s the scalar version. If you want to use scalar iterator instead, you keep your cosine sim function, drop the np.vectorize decorator, and replace return with yield, changing the function into a generator that processes batches. Functionally, it works out to be the same. I’m more familiar with np.vectorize, so I’ve tended to stick with scalar rather than scalar iterator, but they get at the same thing. One thing worth noting: in Python, it’s easy to return multiple values from a function. You essentially return x, y, z, which gets packed into a tuple, and you can unpack the tuple when you call the function.
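For comparison, a scalar iterator sketch using Spark 3.0 type hints. The function receives an iterator of batches (here, pairs of Series) and yields one result Series per batch, so any expensive setup can happen once before the loop. Computing the similarity with stacked matrices rather than `np.vectorize` is a choice made for this sketch, not something from the talk.

```python
from typing import Iterator, Tuple

import numpy as np
import pandas as pd


def cosine_sim_batches(
        batches: Iterator[Tuple[pd.Series, pd.Series]]) -> Iterator[pd.Series]:
    """Consume batches of vector pairs; yield one similarity Series per batch."""
    # One-time setup (for example, loading a lookup table) would go here
    for a, b in batches:
        if len(a) == 0:
            yield pd.Series([], dtype=float)
            continue
        A = np.stack(a.to_numpy()).astype(float)  # shape (rows, dims)
        B = np.stack(b.to_numpy()).astype(float)
        sims = (A * B).sum(axis=1) / (np.linalg.norm(A, axis=1) * np.linalg.norm(B, axis=1))
        yield pd.Series(sims, index=a.index)


def add_cosine_similarity_iter(sdf, col_a: str, col_b: str):
    """Spark 3.0+: the Iterator type hints make this a scalar iterator UDF."""
    from pyspark.sql.functions import pandas_udf

    cos_udf = pandas_udf(cosine_sim_batches, "double")
    return sdf.withColumn("cosine_sim", cos_udf(sdf[col_a], sdf[col_b]))
```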
That is not valid with scalar and scalar iterator UDFs. If you need to return multiple values, you either need a separate UDF for each column of data you want, or you return a more complex data type: a dictionary, which becomes a map, or a JSON string packing all of your values, which you unpack later down the line. The cogrouped map is a new UDF type added in Spark 3.0. It is similar to the grouped map, but instead of one data frame as input, you have two. Both get grouped by the same key, so I would use it in cases where a grouped map seems appropriate but you need two inputs rather than one, for example if you are doing some interesting merging of the two data frames inside your UDF.
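A minimal cogrouped map sketch (Spark 3.0+). The two inputs, a purchases table and an item-score table both keyed by `category`, are hypothetical; the Pandas function receives the matching group from each side and merges them.

```python
import pandas as pd

MERGED_SCHEMA = "category string, item_id long, quantity double, score double"


def merge_category(purchases: pd.DataFrame, scores: pd.DataFrame) -> pd.DataFrame:
    """Cogrouped map body: both inputs hold the same category's rows."""
    merged = purchases.merge(scores[["item_id", "score"]], on="item_id", how="left")
    return merged[["category", "item_id", "quantity", "score"]]


def merge_all_categories(purchases_sdf, scores_sdf):
    """Group both Spark data frames by the same key, then cogroup them."""
    return (purchases_sdf.groupBy("category")
            .cogroup(scores_sdf.groupBy("category"))
            .applyInPandas(merge_category, schema=MERGED_SCHEMA))
```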
Now, everything I’ve talked about up to this point is the older Spark style, available from Spark 2.3 onward. It is still valid syntax, and it’s still how I tend to write my code. However, as of last year, Spark 3.0 made some changes to how you work with PandasUDFs. First, instead of declaring a PandasUDF as grouped aggregate, scalar, or scalar iterator, you can use Python type hints on your function, and Spark will infer the type of PandasUDF from them. You still need the decorator, and you still need to give it the schema.
More interesting to me is how the grouped map has changed. In the old approach, you have your decorator, where you give the schema and the grouped map UDF type, and your function, which says it takes a Pandas data frame and returns a Pandas data frame, but you can’t actually run it on a Pandas data frame, so I needed my wrapper function. With the new style, I don’t need a decorator on my UDF function at all. It just takes a Pandas data frame and returns a Pandas data frame, and instead of calling groupBy().apply(), I call groupBy().applyInPandas(), giving it the function and defining the schema there. This works for both the grouped map and the cogrouped map, and I find it helpful; it simplifies my code a bit more.
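A sketch of the Spark 3.0 style under assumed column names (`category`, `label`). The grouped map function has no decorator, so it can be called directly on a Pandas data frame, and the schema moves to `applyInPandas`; the last function shows type hints selecting the scalar UDF type. The positive-rate column is only a placeholder for real model fitting.

```python
import pandas as pd

FIT_SCHEMA = "category string, n_rows long, positive_rate double"


def describe_category(pdf: pd.DataFrame) -> pd.DataFrame:
    """Plain Pandas in, Pandas out: no decorator, so it runs on a Pandas frame too."""
    return pd.DataFrame({
        "category": [pdf["category"].iloc[0]],
        "n_rows": [len(pdf)],
        "positive_rate": [float(pdf["label"].mean())],  # stand-in for a real metric
    })


def describe_all_categories(features_sdf):
    """Spark 3.0+: the schema is passed to applyInPandas instead of a decorator."""
    return features_sdf.groupBy("category").applyInPandas(describe_category,
                                                          schema=FIT_SCHEMA)


def add_doubled_column(sdf, col: str):
    """Spark 3.0+: pd.Series -> pd.Series type hints make this a scalar pandas UDF."""
    from pyspark.sql.functions import pandas_udf

    @pandas_udf("double")
    def times_two(s: pd.Series) -> pd.Series:
        return s * 2.0

    return sdf.withColumn(f"{col}_x2", times_two(sdf[col]))
```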
So, having gone over these, when are PandasUDFs the right tool? As mentioned before, they’re not a replacement for pure Spark data frame operations, which are more optimized. If you can use those, use them. But if your code can’t be expressed in PySpark and you need some Python, where you might otherwise reach for a regular Python UDF, or when you find yourself reaching for a for-loop over your Spark data frame, that’s when you should think PandasUDF and start pulling them out.
Now, as I mentioned in the introduction, there is a serialization/deserialization limit on the amount of data you can move between Spark and these Pandas data frames, or at least there was. Previously, the Pandas data frame going into the function had to be less than two gigabytes, and the Pandas data frame coming out of the function had to be less than two gigabytes. This came from using an older version of Apache Arrow as the intermediate data format, and there really was no way around it. As of Spark 3.1, this has been relaxed: the version of Arrow used has been upgraded. You no longer have to worry about this as your data frame limitation, although you are still limited by the RAM available for data frames inside the function. But I think it’s worth going over some workarounds for this two-gigabyte limit.
They help if you can’t upgrade to Spark 3.1 or later, or if you’re looking at legacy PandasUDF code that had to deal with this limit and want to understand some of the design decisions that were made. Some simple workarounds: you can lower the data intensity. Moving from double-precision to single-precision floating-point numbers, or shortening the strings used to define categories, reduces the amount of data here and there and may squeak you under the two-gigabyte limit. If your data has any symmetries, which is not common but does come up, you can send part of the data and reconstruct the rest from the symmetry. Case in point: we were working on a factor analysis problem, which involved creating a matrix of all the pairwise distances between items.
Now, the distance between product A and product B in the embedding is the same as the distance between product B and product A. So the matrix we send in only needs to be the upper triangle: we only need to send rows where product A’s ID is less than product B’s ID. Then, inside the PandasUDF, I could flip product A and product B, recreate the entire matrix, and do my factor analysis from there. This cut the amount of data we were sending in half and got us under the two-gigabyte limit.
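A sketch of the flip-and-rebuild step, assuming the upper triangle arrives with hypothetical columns `item_a`, `item_b`, and `distance`, already filtered on the Spark side with something like `item_a < item_b` before grouping.

```python
import pandas as pd


def full_distance_matrix(upper: pd.DataFrame) -> pd.DataFrame:
    """Rebuild a symmetric distance matrix from upper-triangle rows.

    `upper` has one row per pair with item_a < item_b and columns
    item_a, item_b, distance.
    """
    # Swapping the two ID columns produces the lower triangle
    flipped = upper.rename(columns={"item_a": "item_b", "item_b": "item_a"})
    both = pd.concat([upper, flipped], ignore_index=True)
    matrix = both.pivot(index="item_a", columns="item_b", values="distance")
    # Assumes every off-diagonal pair was sent, so only the diagonal is missing
    return matrix.fillna(0.0)
```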
If you have any data you can offload, anything inessential, meaning anything that is not your grouping key, your features, or your label, get rid of it. You want to send in as little data as possible. If you’re fitting models, another approach is to subsample your data; you don’t necessarily need all of it for your model. For some of my models, I was training a random forest and knew roughly what my hyperparameters were. So, using Spark machine learning, I trained my random forest with the same hyperparameters on 100 data points, then 300, 1,000, 3,000, 10,000, 30,000, and so on, and looked at how my PR AUC and other model metrics changed as I added more data.
Looking at this learning curve, I found, for example, that I only needed about 100,000 data points before I started getting really marginal returns in my model metrics. That told me I only needed to send about 100,000 data points, or maybe 200,000 if I’m doing a train/test split, to get essentially the same model metrics. If you’re doing a classification problem, this might be an occasion to look at your class balance. You might have a case where your negative class is so much larger than your positive class that you’d want to drop a bunch of negative examples anyway to balance the two classes. So get rid of those extra data points in the larger class before you send the data to the PandasUDF.
This is an approach for fitting, not for model scoring, because for scoring you don’t want to throw away data. But there’s an alternative approach for scoring, which I call salting. As I mentioned before, you don’t need to have just one grouping key; you can have a list of them. The approach here is to add an extra grouping key, which I’m calling a dummy key, that gets assigned a random number. In this example, I’m assigning it a number between one and five.
I then group by my main keys as well as this dummy key. By doing this, I’m sending only about 20% as much data into each UDF call as I did before, which puts me under the two-gigabyte limit. I have five times as many groups to process, but Spark handles that without a problem. If your groups are very unequal in size, that is, your data is very skewed, you can even make the dummy key’s cardinality dynamic, based on the size of each category. That lets you partition your data into more equal-sized groups, which helps Spark run through the process even more efficiently.
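A sketch of both salting variants. The column name `salt`, the seed, and the `max_rows_per_group` threshold are assumptions; the skew-aware version gives each category `ceil(rows / max_rows_per_group)` salt values, and `salt_buckets` is the same arithmetic in plain Python for sanity-checking a threshold. As the talk notes, this only suits scoring, where rows are processed independently.

```python
import math


def salt_buckets(row_count: int, max_rows_per_group: int) -> int:
    """Salt values a group needs so each salted piece stays under max_rows_per_group."""
    return max(1, math.ceil(row_count / max_rows_per_group))


def add_fixed_salt(sdf, n_buckets: int = 5, seed: int = 42):
    """Random integer salt in 1..n_buckets (requires pyspark)."""
    from pyspark.sql import functions as F

    return sdf.withColumn("salt", (F.floor(F.rand(seed) * n_buckets) + 1).cast("int"))


def add_skew_aware_salt(sdf, group_col: str, max_rows_per_group: int, seed: int = 42):
    """Give large groups more salt values than small ones (requires pyspark)."""
    from pyspark.sql import functions as F

    buckets = (sdf.groupBy(group_col).count()
               .withColumn("n_buckets",
                           F.greatest(F.lit(1), F.ceil(F.col("count") / max_rows_per_group)))
               .select(group_col, "n_buckets"))
    return (sdf.join(buckets, on=group_col)
            .withColumn("salt", (F.floor(F.rand(seed) * F.col("n_buckets")) + 1).cast("int"))
            .drop("n_buckets"))
```

The salted data is then grouped on both keys, for example `add_fixed_salt(features_sdf).groupBy("category", "salt").applyInPandas(...)`, with whatever scoring function and schema you already have.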
All right, everything I’ve gone over so far is Python and PySpark. If you’re working in other frameworks like R or Koalas, what do you do? Koalas is not very different; it is fundamentally PySpark underneath, with syntax very similar to Pandas data frames. Just as you can call groupby().apply() on a Pandas data frame, you can call groupby().apply() on a Koalas data frame. I don’t have to give my UDF function a decorator here, but one thing worth noting is that my type hint is a little unusual.
It’s worth remembering that Python type hints are just that: hints. This UDF function is a Pandas-data-frame-to-Pandas-data-frame operation, but I annotate its return type as a Koalas data frame and give the schema there. That is how we tell Koalas what the output schema is. If you don’t give it the Koalas data frame schema, it will run a few operations to try to infer the schema, but I think this is worth being explicit rather than implicit about. And finally, if there’s a two-gigabyte limit for your PySpark PandasUDF, you’re going to have the same two-gigabyte limit in Koalas.
If you’re in R, there are two frameworks you’re generally working with: SparkR, which is closer to the PySpark and Scala Spark APIs, or sparklyr, which is closer to how you do things with the tidyverse. For SparkR, you have gapply as your grouped map and dapply as your scalar. For sparklyr, you have the function spark_apply, which does both. gapply, dapply, and so on are not terribly different from how you handle things with PandasUDFs, but there are a couple of refinements I’d like to point out. First, with PandasUDFs, I did my imports at the top of my script or notebook, whereas here I’m actually doing my library calls inside the function, because SparkR doesn’t pass libraries through as well as PandasUDFs do.
The schema is similar: you can give it a string, or you can use structField and structType. For the R data frame you’re returning, one thing to be aware of is that if you’re returning string values, you need to pass stringsAsFactors = FALSE. Otherwise R’s data.frame will turn strings into factors, assuming they’re categorical data, and Spark doesn’t know what a factor data type is. By setting stringsAsFactors, you tell R, no, this is a string, and Spark will say, you’re giving me a string; I know how to work with that. Finally, there are gapplyCollect and dapplyCollect, which are gapply and dapply with a collect call on top.
In sparklyr, you have the spark_apply function. Much like Koalas, you don’t need to specify the schema; it will run your code a few times to try to infer what the output schema should be. You can give it a group_by value as an optional parameter, and whether or not you provide it is essentially what turns it from a grouped map operation into a scalar-type operation. It’s worth noting that spark_apply is a little slower than gapply and dapply, because it has a final operation that runs at the very end to clean everything up, which dapply and gapply do not have. So if you’re starting with a fresh code base, that might bias you toward using SparkR rather than sparklyr.
So, to summarize: PandasUDFs are a very powerful tool, but they are situational. They give us capabilities in Python or R that we wouldn’t otherwise have in Spark, and they run fairly efficiently compared to a regular UDF. Look through your code. Look for places where you’re using for-loops. Look for places where you have regular Python UDFs, and try PandasUDFs as a replacement. You may find they really speed up your workflow. That’s everything. Thank you very much for your time. I’m happy to answer any questions you may have.
Paul Anzel is a data engineer (and former data scientist) who has focused on taking one-off analyses and turning them into production data products. He has worked on statistical process control for da...