Spark SQL provides a convenient layer of abstraction for users to express their query’s intent while letting Spark handle the more difficult task of query optimization. Since Spark 2.3, pandas UDFs have allowed users to define arbitrary Python functions that are executed in batches, giving them the flexibility required to write queries that suit very niche cases. At Quantcast, we have developed a model training pipeline that collects the training data for tens of thousands of models from petabytes of logs. Due to the scale of data that this pipeline deals with, we spent considerable effort trying to optimize Spark SQL to make our queries as efficient as possible. This resulted in several techniques that use pandas UDFs to run highly specialized batch processing jobs that speed up our data processing pipelines by over an order of magnitude. This talk will go over the learnings we gained from this process, focusing mainly on how we were able to leverage our custom UDFs to provide significant performance gains. The main takeaways of this talk are:
– Hi everybody, my name is Michael Tong. I work as a machine learning engineer at Quantcast, and the title of my talk is Accelerating Data Processing in Spark SQL with Pandas UDFs. So at a very high level, we’re going to start with a review of what Pandas UDFs are, we’re going to go over some development tips, and then we’re going to spend a few minutes talking about modeling at Quantcast. We’re going to discuss how we use Spark SQL in production and discuss some of the modeling problems that Quantcast tries to solve. The bulk of our talk will be dedicated to an example problem. So I pulled a problem from our production model training pipeline, and we’re going to discuss all of the optimization tips and tricks we use to make this data processing step extremely fast with Pandas UDFs. And for the last bit, the optimization tips and tricks, we’re going to iteratively and very aggressively optimize the problem using Pandas UDFs. And just as a high level overview of all the tricks we’ll be using, we’re going to first try to do as many things in memory as possible.
In general, loops are better than generating Spark SQL intermediate rows. So we’re going to look for as many ways to do things in memory as possible. We are also going to try to aggregate keys. Typically, data processing works better if there are fewer unique keys. So we’re going to look at our specific data problem to try to reduce the number of unique keys in our problem. We’re going to use inverted indices. Inverted indices tend to work very well with sparse data, and it just so happens that for this example problem, we have very sparse data. And lastly, we’re going to talk about Python libraries. And this is essentially, now that you have written a fast UDF, you can use some of the basic built-in Python libraries to speed up your Pandas UDFs by, in our case, a factor of about three.
So again, your feedback is very important. Don’t forget to rate and review these sessions. And with that out of the way, we’re going to start with our first section, our review of Pandas UDFs. So what are Pandas UDFs? Well, first of all, a UDF is a user defined function, and Pandas UDFs are part of Spark SQL, which is Apache Spark’s module for working with structured data. And Pandas UDFs are a great way of writing custom data processing logic in a developer friendly environment. Right now, Spark SQL has hundreds of built-in functions, and if it turns out that you need to do some custom data processing that isn’t supported by one of those hundreds of functions, well, you can write your own, and Pandas UDFs are a great way to do all of this. So there are two main types of Pandas UDFs. The first is scalar UDFs, which are one-to-one mapping functions that support simple return types. So if you use map or struct types in Spark SQL, you cannot use those with scalar UDFs. Then there are grouped map UDFs. These do require a group by operation, which usually means you need to sort or shuffle your data. But in return, you can return a variable number of output rows with more complicated return types. So again, you can return map or struct types from a grouped map UDF. And there is a grouped aggregate UDF, although I personally recommend you use the grouped map UDF instead, because anything you can write with the grouped aggregate UDF, you can also write with the grouped map UDF, and it’s easier to just remember two types instead of three.
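As a rough illustration of the two main shapes described above, here is a minimal sketch written as plain pandas functions and exercised on mock data. The column names and the functions `to_upper` and `distinct_regions` are hypothetical and not taken from the talk. In Spark, functions like these would be registered through `pyspark.sql.functions.pandas_udf`; that wiring is left out so the sketch runs without a cluster.

```python
import pandas as pd

# Scalar-style logic: one output value per input value (Series in, Series out).
def to_upper(region: pd.Series) -> pd.Series:
    return region.str.upper()

# Grouped-map-style logic: all rows of one group come in as a DataFrame,
# and any number of rows may come out.
def distinct_regions(pdf: pd.DataFrame) -> pd.DataFrame:
    return pdf[["id", "region"]].drop_duplicates().reset_index(drop=True)

# Mock data standing in for a Spark table (hypothetical columns).
mock = pd.DataFrame({
    "id": ["a", "a", "a", "b"],
    "region": ["us", "us", "sf", "us"],
})

upper = to_upper(mock["region"])

# Emulate a group by on "id" followed by the grouped map function.
per_id = pd.concat(
    [distinct_regions(group) for _, group in mock.groupby("id")],
    ignore_index=True,
)
print(per_id)
```

Because both functions only see pandas objects, they can be developed and profiled locally before being handed to Spark.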
So when developing in this PySpark framework, just some development tips and tricks. If possible, use an interactive development framework. At Quantcast we use Jupyter notebooks. Develop with mock data. Because Pandas UDFs are just Python functions, an interactive development environment makes it very easy to spin up mock data, develop in your local environment and iteratively call your function. It allows faster iteration while developing code. If you are using Jupyter, one thing I typically use is the magic commands such as %time, %timeit and %prun, so you can easily profile your methods. And if you’re trying to squeeze every bit of performance out, having a one line command to do performance tuning is quite handy. And if you really need something more heavyweight, there is a Python debugging tool called pdb. Okay, so now we’re going to go off on a little bit of a digression to modeling at Quantcast.
So the main theme at Quantcast, at least in my head, is scale, scale, and even more scale. Our mission is to try to simplify advertising on the open internet, and because of this, to predict various behaviors of people online, we train tens of thousands of models that refresh on a daily to weekly basis depending on the type of model. The models are trained primarily off of first party data derived from global internet traffic, and it’s huge. If you look at the raw logs, there are about 400 terabytes of uncompressed raw logs written each day. We do have a lot of jobs that clean and compress the data so that it takes up around four to five terabytes a day on disk. And for each model, we typically try to train off of somewhere in the neighborhood of several days to several months of data. And from this, it comes out to on the order of several tens of petabytes of data that we could potentially read to train a single model. And because of this, we’re going to try to write our functions to be as fast and as scalable as possible. And with that in mind, we’re going to go into our example problem. So our example problem is that we have about 10,000 models that we want to train, and we’re going to want to train these in some batch operation, because intuitively, training 10,000 models in a batch should be faster than training 10,000 individual models.
Each of these models will cover different geographic regions. So some of them are over large regions, like we want to train a model on everybody in the US. Some of them are over specific regions, like everybody in San Francisco. And some of them are over large regions but exclude specific regions, so everybody in the US except San Francisco. And for each model, we want to know how many unique IDs, or users, were found in each region over a certain period of time. And the reason we want to know this is because eventually we want to calibrate our models to behave in some way according to how big each region is. So to go into a little bit more detail, each model will have a set of inclusion regions. In the previous examples, these are like the US or San Francisco, where each ID must be found in one of these regions to be considered part of a model. Each model will also have a set of exclusion regions, like San Francisco. This set of exclusion regions may be empty, but each ID must be in none of these regions to be considered part of the model. And for the purposes of this query, each ID only needs to satisfy the geo constraints once to be part of the model. So if I started in the US and moved to Canada during the model training timeline, I’m still considered a valid user for a US model.
And for those of you who prefer looking at tables, here I’ve constructed a set of tables to make things a little bit clearer. So I’ll start with our feature store table. If you look at the feature store table, we’re going to have a set of IDs, timestamps and feature IDs. So essentially, for this user at this time, we see that they have these features associated with them. To see what all of those feature IDs correspond to, we have a feature map, where in this case, if you have feature ID zero or 100, this means we saw you in the US, and if you have feature ID one or 101, we saw you in San Francisco. And lastly, if you look at the bottom left, there is this table of model data. So in the first example, the US model, you need to have a feature store row with feature zero or 100. In other words, we need to have seen you in the US. And for model one, where you’re in the US and not SF, we need to have seen you in the US and you must not have a San Francisco feature associated with you. And so once we have this feature store table, we essentially scan through every row for every model and find out which models you are associated with. And then we group by model, select the unique IDs, and then count.
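To make the membership rule concrete, here is a small pure-Python reference sketch of the problem as described: a row satisfies a model if it has at least one inclusion feature and no exclusion features, and an ID counts once if any of its rows satisfies the model. The rows, model definitions and function names below are made up for illustration and are not the tables from the slides.

```python
from collections import defaultdict

# Mock feature store rows: (id, timestamp, feature_ids). All values are made up.
feature_store = [
    ("u1", 1, {0, 5}),      # seen in the US
    ("u1", 2, {0, 1}),      # seen in the US and in San Francisco
    ("u2", 1, {100, 101}),  # seen in the US and in San Francisco
    ("u3", 1, {7}),         # no matching geo feature
]

# Mock model data: model_id -> (inclusion feature IDs, exclusion feature IDs).
models = {
    0: ({0, 100}, set()),     # US
    1: ({0, 100}, {1, 101}),  # US but not SF
}

def row_matches(features, inclusion, exclusion):
    # At least one inclusion feature and zero exclusion features on this row.
    return bool(features & inclusion) and not (features & exclusion)

def count_ids_per_model(rows, models):
    ids_per_model = defaultdict(set)
    for user_id, _timestamp, features in rows:
        for model_id, (inclusion, exclusion) in models.items():
            if row_matches(features, inclusion, exclusion):
                # An ID only needs to satisfy the constraints once.
                ids_per_model[model_id].add(user_id)
    return {model_id: len(ids) for model_id, ids in ids_per_model.items()}

counts = count_ids_per_model(feature_store, models)
print(counts)
```

Note that `u1` counts toward the "US but not SF" model because one of its rows satisfies the constraints, even though another row does not.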
So I’m going to pause here for a little bit to allow everyone to take in the data from the table and then proceed.
So now that you guys have looked at the example problem and looked at some of the example data, we’re going to discuss how to actually solve this problem in Spark SQL. So now we’re going to go to the optimization tips and tricks. And so I guess the first question that we need to answer is, why do we even need to use Pandas UDFs?
We could just use Spark SQL, and in fact, we can. Spark SQL has built-in functions to do everything we need. We can get all of the row-model pairs using a cross join. There is functions.array_intersect to perform all of the inclusion and exclusion logic. We have group by and aggregate to get all of the counts. And it turns out when I was writing this, it’s really simple. It’s less than 10 lines of code. So what we’re going to do is first test this on a small sample of 100,000 rows and see how it performs. So I went and ran this on my local cluster. This slide contains all of the source code I used, and you’ll notice that I have a sample of 100,000 rows. I cross join it with all of the model data. I wrote a little function to check if the inclusion hashes and the feature IDs intersect. So for your filter, you should have at least one inclusion hash, and you should have zero exclusion hashes. And then I basically apply the filter, select all of the valid modelId, ID pairs, and then group by modelId and count. And so if we look at the query performance, the solution is terrible, and it’s not totally obvious why the solution is terrible, so I’ll go into it. It turns out we only process about 25 rows per computer per second. As a reference point, typically at Quantcast we can get stuff easily running in the thousands to 10,000 range. So this is really concerning. And to see why, the first thing is to look at the graph. You’ll notice that we have 100,000 input rows and about 10,000 models, and the number of intermediate rows we get from this cross join is about 70 million. So in other words, we generate about 700 times as many intermediate rows as input rows to process this. And this is because every row on average may belong to several models. For example, if I have a lot of advertising campaigns in the US and I happen to have a lot of US rows, in general, I will have more than one output row per input row.
And just intuitively, there has to be a better way. I shouldn’t need to generate 700 times the intermediate data in order to solve this. And so the rest of our talk is basically going to be dedicated to how can I make this faster? So the first idea is we’ll just use Pandas UDFs to do all of the loops.
Right now, what Spark SQL has to do is look at every ID, model ID pair and potentially generate a new row for it. So what if we wrote a UDF to just iterate over all of the rows in memory instead of doing a naive Spark cross join? It turns out for this example problem, I wrote some code that speeds things up by almost double, 1.8x. If you look at what the code does, the code in a nutshell will store the model data in a Pandas data frame. I’m going to use a grouped map UDF to process the data for each ID, and I’m going to figure out which models each ID belongs to in a nested for loop. And this is faster because we generate fewer intermediate rows. So if you look at what this is, I’m writing a Pandas UDF that takes in a data frame with all of the data from a single ID, and instead of doing the cross join and filter, I’ve just done it in a nested Python for loop and then returned. One note is that this is also slightly faster because I implicitly do the ID deduplication here.
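The slide code is not reproduced in this transcript, so the following is only a sketch of what a per-ID function with a nested loop might look like, assuming the model data is held in a pandas data frame as described. The frame `model_df`, the column names and the function `models_for_id` are hypothetical. In Spark this function would be applied once per ID by a grouped map UDF; here it is called directly on mock data.

```python
import pandas as pd

# Hypothetical model data kept in memory: one row per model.
model_df = pd.DataFrame({
    "model_id": [0, 1],
    "inclusion": [{0, 100}, {0, 100}],
    "exclusion": [set(), {1, 101}],
})

def models_for_id(pdf: pd.DataFrame) -> pd.DataFrame:
    """Grouped-map-style function: receives every row of a single ID."""
    user_id = pdf["id"].iloc[0]
    matched = set()
    for features in pdf["feature_ids"]:
        features = set(features)
        # Nested loop over models replaces the cross join and filter.
        for model_id, inclusion, exclusion in model_df.itertuples(index=False):
            if (features & inclusion) and not (features & exclusion):
                matched.add(model_id)
    model_ids = sorted(matched)
    # One output row per (model_id, id); the set above deduplicates the ID.
    return pd.DataFrame({"model_id": model_ids, "id": [user_id] * len(model_ids)})

one_id = pd.DataFrame({"id": ["u1", "u1"], "feature_ids": [[0, 5], [0, 1]]})
result = models_for_id(one_id)
print(result)
```

Collecting matches in a set is what gives the implicit ID deduplication mentioned above: however many rows an ID has, it yields at most one output row per model.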
So it’s a little bit faster, but there are better ways to do it.
And the next trick we’re going to try is to aggregate keys. So in model training, there are some commonly used filters, and if you look at the table to the right, I have, out of the 10,000 models, the most common inclusion and exclusion geos. And since Quantcast is primarily a US based company, it should come as no surprise that we have a lot of models that are just trained off of events in the US. And it seems silly that we would have to, in the case of the US models, do 2000 checks to make sure you are in the US when we could do it once. So what we’re going to do instead is work with the unique model filters instead of the unique models. And it turns out in this data set, there are about 473 unique model filters, which is about 20 times fewer than the 10,000 models. So we’re going to write our Pandas function to iterate over the filters instead of the unique models. And it turns out this is really, really powerful. It makes our solution about 10 times faster than the previous solution. So again, to discuss the code in a nutshell, we’re going to write our UDF so that instead of going over each model in the nested for loop, we go over each filter in the nested for loop. And in order to generate the actual model IDs instead of filter IDs, since there are only about 10,000 unique models, we can just store a table with all of the filter ID, model ID pairs in memory and use a broadcast hash join. So for our expensive data processing step, we’re going to iterate over the filter IDs, because it’s very, very fast. At the very end, we’re going to map the filter IDs back to model IDs using a small 10,000 row table and a broadcast hash join. And this already does a lot of work, but we can do better. We can do much better, in fact. So the next optimization trick we’re going to use is to aggregate keys in batches.
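The key aggregation idea can be sketched in plain Python as follows. This is an illustration under assumed data, not the code from the slides: models that share the same inclusion and exclusion sets are collapsed into one filter ID, the expensive loop runs over filters, and a small in-memory mapping (standing in for the broadcast hash join) expands filter results back to model IDs.

```python
from collections import defaultdict

# Hypothetical models: model_id -> (inclusion feature IDs, exclusion feature IDs).
models = {
    0: ({0, 100}, set()),
    1: ({0, 100}, set()),     # same filter as model 0
    2: ({0, 100}, {1, 101}),
}

# Assign one filter ID per distinct (inclusion, exclusion) pair.
filter_ids = {}
filter_to_models = defaultdict(list)
for model_id, (inclusion, exclusion) in models.items():
    key = (frozenset(inclusion), frozenset(exclusion))
    filter_id = filter_ids.setdefault(key, len(filter_ids))
    filter_to_models[filter_id].append(model_id)

# Mock rows: (id, feature IDs).
rows = [("u1", {0}), ("u2", {100, 101}), ("u3", {7})]

# Expensive step: loop over unique filters rather than over every model.
ids_per_filter = defaultdict(set)
for user_id, features in rows:
    for (inclusion, exclusion), filter_id in filter_ids.items():
        if (features & inclusion) and not (features & exclusion):
            ids_per_filter[filter_id].add(user_id)

# Cheap step: expand filter results back to models via the small mapping.
model_counts = {
    model_id: len(ids_per_filter[filter_id])
    for filter_id, model_ids in filter_to_models.items()
    for model_id in model_ids
}
print(model_counts)
```

With three models but only two distinct filters, each row is checked twice instead of three times; at the scale described in the talk the same idea reduces roughly 10,000 checks per row to a few hundred.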
So the idea is that we only care about the number of unique IDs, and we don’t necessarily have to group by ID in our Pandas UDF. What if we grouped by something that’s bigger than an ID? We could generate far fewer intermediate rows, and we could take advantage of a lot of the implicit vectorization that happens whenever you use things like Pandas or NumPy. So what we’re going to do is write a user defined function that, instead of taking in the data from a single ID, takes in data from batches of around 10,000 IDs per UDF call. And what this allows us to do is generate far fewer intermediate rows and actually have our UDF work as a partial batch operation, in which we take in batches of IDs, compute partial counts, and then take the sum of the partial counts to get the final answer. And it turns out this is also a really powerful idea. In our case, it makes our solution about three times faster than the previous one, and for those of you keeping score, we’re now at about 50 times faster than the naive solution. So just to see what this code looks like in a nutshell, one thing you’ll notice in the table on the right is that we use one million rows instead of 100,000 rows, because at this point our code is running too fast to accurately profile on 100,000 rows. So I had to increase the sample size, which is pretty good.
And the way we’re going to group things into something larger than an ID is to take a hash of the ID modulo the number of batches, and basically group by that hash of the ID. If you look in the code, it’s where it says withColumn batch, and then there’s a functions.hash of the ID modulo 100. So we’re going to have the UDF group each batch by ID and count the number of IDs that satisfy each filter. Again, we’re going to return a partial count instead of returning one row for every valid model filter. The other thing you’ll note is that in the group by operation, where previously we were aggregating and counting, we are now aggregating and summing. Again, this is because we’re aggregating over partial counts instead of over individual elements. Okay, so now we have done a lot of key aggregation. We’ve reduced the number of keys, and now we’re processing keys in batches. But we can still go further.
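The batching step can be sketched in plain Python like this. It is a simplified illustration with made-up data: `zlib.crc32` stands in for Spark's `functions.hash`, `NUM_BATCHES` is kept small instead of the 100 mentioned in the talk, and `partial_counts` plays the role of the per-batch UDF. Because every row of a given ID lands in the same batch, unique counts per batch can simply be summed.

```python
import zlib
from collections import Counter, defaultdict

NUM_BATCHES = 4  # the talk's code uses 100; kept small for the mock data

# Hypothetical filters: filter_id -> (inclusion feature IDs, exclusion feature IDs).
filters = {
    0: ({0, 100}, set()),
    1: ({0, 100}, {1, 101}),
}

def batch_of(user_id: str) -> int:
    # Deterministic hash of the ID, modulo the number of batches.
    return zlib.crc32(user_id.encode("utf-8")) % NUM_BATCHES

def partial_counts(batch_rows):
    """Per-batch function: filter_id -> number of unique IDs in this batch."""
    ids_per_filter = defaultdict(set)
    for user_id, features in batch_rows:
        for filter_id, (inclusion, exclusion) in filters.items():
            if (features & inclusion) and not (features & exclusion):
                ids_per_filter[filter_id].add(user_id)
    return Counter({f: len(ids) for f, ids in ids_per_filter.items()})

rows = [
    ("u1", {0}), ("u1", {0, 1}), ("u2", {100, 101}), ("u3", {0}), ("u4", {7}),
]

# Group rows into batches by hashed ID.
batches = defaultdict(list)
for user_id, features in rows:
    batches[batch_of(user_id)].append((user_id, features))

# Final aggregation sums partial counts instead of counting rows.
total = Counter()
for batch_rows in batches.values():
    total += partial_counts(batch_rows)
print(dict(total))
```

The result does not depend on how the hash distributes IDs across batches, only on the guarantee that one ID never spans two batches.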
There’s this concept of inverted indices, and this admittedly is a little bit more specific to our problem, and for the rest of you it may or may not apply as strongly, though since it gave us such positive results, I feel it is worth mentioning here. So the main idea here is that each feature store row has relatively few unique features. In the case of our data, we have about 10 to 20 features per row. But again, there are about 500 unique filters. So it would be really nice if we had some way of iterating through our rows and returning answers in O(number of features per row) time instead of O(number of filters) time. And there is a technique to do this. It’s called inverted indices, and you can look it up, but the high level idea is, again, that we want an algorithm whose running time is roughly proportional to how sparse the data is. And because our data is extremely sparse, using inverted indices happens to be a better way to do things. And so again, we’re going to use an inverted index to iterate over the set of unique features instead of the filters, and it turns out we can apply one more trick of using set operations for the inclusion and exclusion logic. I’ll go into a little bit more detail on the next slide. But again, this is really, really powerful for our data set. It makes things about six and a half times faster than our previous solution. So if we go into the code, the code is fairly complex. This is obviously more than the 10 lines of code I wrote for the Spark SQL solution. But the performance boost is amazing, so this is why we did this. So for the Pandas UDF, there’s this high level idea in which you can store or precompute dictionaries and maps and the like to make your actual UDF run faster. And so we’re going to use this trick to create maps from each feature ID to the inclusion and exclusion filters it’s associated with.
So we’re going to construct our inverted indices to basically say, if you have this feature ID that represents you being in the US, you belong to these inclusion filter IDs and these exclusion filter IDs. It’s not included in the slide, but I have constructed these dictionaries where, for each possible feature you might care about, I know which corresponding inclusion and exclusion filters it belongs to. And so if you notice in the for loop, what we’re going to do is, for each row, iterate through all of the feature IDs and figure out the set of inclusion filter IDs and exclusion filter IDs those features belong to. In the code, it’s where the loop says for feature ID in ID rows, and eventually I generate the inclusion filter IDs and exclusion filter IDs where the comment says flatten the list and take uniques. And so now the set operation part is that you must be included in at least one inclusion filter and zero exclusion filters. So I can take the set of all inclusion filter IDs associated with that row and take the set difference with the exclusion filter IDs to get the set of filter IDs. And this, again, is also kind of specific. It has to do with how I’ve explicitly defined how an ID is in a region, but you could imagine doing something similar with your code. And again, the thing that makes this really, really fast is using the inverted index, rather than the weird set logic I do at the end.
And then finally, you’ll notice the final outer for loop, where it says for each ID in the comments. Again, because I’m taking in batches of unique keys, I need to essentially group all my data for each ID, and then for each ID, compute all of the filter IDs that it belongs to, and then do a little partial aggregate count. So I’m going to pause here for a little bit to allow you guys to read the code, and then we’re going to go on to the last trick we’re going to use.
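Since the slide code is not part of this transcript, here is a small pure-Python sketch of the inverted index idea under assumed data. The dictionaries `include_index` and `exclude_index` and the functions `filters_for_row` and `partial_counts` are hypothetical names. The indices are precomputed once, each row is resolved by looking up only its own features, and a set difference applies the "at least one inclusion, zero exclusions" rule.

```python
from collections import Counter, defaultdict

# Hypothetical filters: filter_id -> (inclusion feature IDs, exclusion feature IDs).
filters = {
    0: ({0, 100}, set()),     # US
    1: ({0, 100}, {1, 101}),  # US but not SF
    2: ({1, 101}, set()),     # SF
}

# Precompute the inverted indices once, outside the per-batch function.
include_index = defaultdict(set)  # feature_id -> filters that include it
exclude_index = defaultdict(set)  # feature_id -> filters that exclude it
for filter_id, (inclusion, exclusion) in filters.items():
    for feature_id in inclusion:
        include_index[feature_id].add(filter_id)
    for feature_id in exclusion:
        exclude_index[feature_id].add(filter_id)

def filters_for_row(features):
    # Work is proportional to the number of features on the row,
    # not to the number of filters.
    included, excluded = set(), set()
    for feature_id in features:
        included |= include_index.get(feature_id, set())
        excluded |= exclude_index.get(feature_id, set())
    return included - excluded

def partial_counts(batch_rows):
    # Outer grouping by ID: union the matching filters over all rows of an ID.
    filters_per_id = defaultdict(set)
    for user_id, features in batch_rows:
        filters_per_id[user_id] |= filters_for_row(features)
    counts = Counter()
    for matched in filters_per_id.values():
        counts.update(matched)  # each ID adds 1 to every filter it matched
    return counts

batch = [("u1", [0, 5]), ("u1", [0, 1]), ("u2", [100, 101]), ("u3", [7])]
counts = partial_counts(batch)
print(dict(counts))
```

A row with 10 to 20 features touches only those index entries, which is where the speedup on sparse data comes from; the set difference at the end is a small constant cost by comparison.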
And now moving on, the last trick we’re going to use is Python libraries.
So Pandas in general was optimized for ease of use and not speed, and there are other Python libraries, such as itertools, that are designed to make Python code run a lot faster.
It turns out for our use case, we only need to use itertools. For other people’s use cases, reduce and NumPy are potentially also good candidates to consider for other UDFs. They just don’t happen to apply to ours. And so by using these Python libraries optimized to do very specific things, we can make it two and a half times faster than our previous solution. And this gives us something that’s 860 times faster than the naive solution. So we’re just going to dig into the code really fast. If you’ve seen the previous code, it’s very, very similar. The only differences are just a couple of changes to facilitate using the default Python libraries as opposed to Pandas. So we’re going to use the Pandas data frame’s .values to extract the columns from a Pandas data frame, and we’re going to use itertools to iterate through our loops faster than default Python for loops. So again, if you look at the top, where the comment says group things with itertools.groupby instead of Pandas, we’re going to extract all the values with .values, and instead of the Pandas data frame’s .groupby, we’re going to use itertools.groupby, and it’s the same thing except for a couple of syntax changes. The last thing is that to get all of the inclusion and exclusion filter IDs, instead of having a list and using a Python for loop to initialize everything, we’re going to use itertools.chain.from_iterable to generate the set instead of having a nested Python for loop. And it turns out these two simple changes are what’s needed to make our example code run two and a half times faster.
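The two itertools changes can be illustrated with a short standard-library sketch. The rows and the `include_index` dictionary are made up, and the exclusion logic is left out to keep the example short. Rows are plain tuples, similar to what a data frame's `.values` would yield; `itertools.groupby` replaces the pandas group by, and `itertools.chain.from_iterable` replaces the nested loop that flattens per-feature filter lists.

```python
from itertools import chain, groupby
from operator import itemgetter

# Rows as plain tuples (id, feature IDs), like the output of DataFrame.values.
rows = [("u2", [100]), ("u1", [0, 5]), ("u1", [0, 1])]

# Hypothetical inverted index: feature_id -> inclusion filter IDs.
include_index = {0: [0, 1], 100: [0, 1], 1: [2], 101: [2]}

def included_filters(features):
    # Flatten the per-feature filter lists and take uniques in one pass.
    return set(chain.from_iterable(include_index.get(f, ()) for f in features))

# itertools.groupby only merges adjacent items, so sort by the key first.
rows_sorted = sorted(rows, key=itemgetter(0))

filters_per_id = {
    user_id: set(
        chain.from_iterable(included_filters(features) for _, features in group)
    )
    for user_id, group in groupby(rows_sorted, key=itemgetter(0))
}
print(filters_per_id)
```

One design point worth noting: unlike the pandas group by, `itertools.groupby` requires sorted (or at least key-contiguous) input, so the sort is part of the cost when comparing the two approaches.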
All right. So as a conclusion, Pandas UDFs they’re great.
They’re extremely flexible. You can write a lot of custom logic with them, and they can be used to speed up Spark SQL. We’ve discussed one of the example problems that we have at Quantcast, which is, for each advertising campaign, how many users are considered in the universe for each model, and applying these optimization tricks allows us to get an almost 1000 times speedup over the naive Spark SQL. And you can apply a lot of these tricks to your own problems and watch things accelerate.
So I guess this doesn’t apply here, but I have a question slide. If you guys would like, I will hang around for a little bit to answer any questions that you may have, and maybe, if you guys are interested, we can go into some of the other potentially interesting tricks we use at Quantcast. I couldn’t fit them all in this example problem. Please don’t forget to send your feedback and rate these talks. Your feedback is very valuable to us, and I appreciate any feedback any of you happen to send. And with that, thank you. Thank you for listening.
Michael Tong is a Machine Learning Engineer at Quantcast. His current projects at Quantcast focus on developing model training pipelines that process petabytes of data to train tens of thousands of models. To accomplish this, he has made heavy use of Spark SQL, writing queries that use specialized UDFs to achieve performance orders of magnitude better than the naive Spark SQL solutions. Michael has a Master's degree from UC Berkeley in Electrical Engineering and Computer Science, where he focused on physics simulators and machine learning.