Care and Feeding of Catalyst Optimizer


You’ve seen the technical deep dives on Spark’s Catalyst query optimizer. You understand how to fix joins and how to find common traps in a logical query plan. But what happens when you’re alone with the Spark UI and the cluster goes idle for 40 minutes? How can you diagnose what’s gone wrong with your query and fix it? Spark SQL’s ease of use hides a deceptively steep operational curve. Queries can look innocent but cause issues that require a sophisticated understanding of Spark internals to diagnose and solve. A tour through puzzles and edge cases, this talk challenges us to a better practical understanding of Spark’s Catalyst optimizer:

  • Everything about how you – and the optimizer – reason about UDFs is based on the idea that they’re cheap to run. What if they’re not? Betrayed by salt, a surprising source of skew!
  • What do you do when Spark’s codegen stage generates a method that exceeds 64k? What’s really going on, and is it possible to fix it other than just disabling whole stage codegen?
  • How can tuning the JVM code cache improve your Spark application’s performance?


Video Transcript

– Care and Feeding of Spark SQL’s Catalyst Optimizer.

So, about me: I am a software engineer with a long history of being at the wrong end of the data processing pipeline. Eventually I realized that I was trying to fix errors that I could never fix at the wrong end of the pipeline, so I went back and started learning about big data, about Spark, and about data processing in general. I’m currently at the AI Group in Bloomberg, and here are some of my previous talks about running Spark on large clusters in the cloud and serverless Spark. I also have an upcoming talk on integration testing in Spark.

So this is a talk about a very technical topic, what the Catalyst optimizer does in Apache Spark, but it’s a non-technical talk as well, because it’s going to address some practical questions: when you’re running a Spark app, what is the Catalyst optimizer doing? And if you’re having a problem with that app, how would you know if query planning is the problem? What does that look like in production, and what would you do to fix it?

First, some data

So we’re gonna start with just a very simple data set that’s available on NYC Open Data, Forestry Planting Spaces, which covers all of the green spaces in New York City.

And we’re just gonna do a very simple schema inference, read it in, and cache it, see?

Represent, ingest, cache
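As a rough sketch, the ingest step the speaker describes might look something like this in Scala; the file path, options, and the `plantingSites` name are placeholders, not taken from the talk:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("catalyst-demo").getOrCreate()

// Schema inference is convenient for exploration, though (as discussed later)
// you may not want it running in production.
val plantingSites = spark.read
  .option("header", "true")
  .option("inferSchema", "true")
  .csv("/data/forestry_planting_spaces.csv")  // placeholder path
  .cache()                                     // cache it, since we'll query it repeatedly
```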

All right, so the amazing thing about Spark SQL is,

What are the greenest parts of NYC?

how low the initial barrier to entry is, see? If we want to know what are the greenest parts of New York City, this is so easy to do, right? It looks just like collection operations, or it could look almost like SQL, right? I mean, this was very easy to do, but the actual barrier to running the application can prove to be surprisingly high, because when you optimize the query, sometimes unexpected things can happen. The outcome is not always what you expect; it doesn’t always run in the same order that it’s written down in. So the barrier to understanding exactly what happened can be surprisingly high. So let’s take a look into understanding how declarative SQL turns into something that executes in a distributed fashion across a cluster.
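For illustration, such a "greenest parts of NYC" query could be sketched in the DataFrame API like this; the column names (`site_type`, `borough`, `postcode`) and the `plantingSites` name are assumptions, since the talk's actual code isn't reproduced on this page:

```scala
import org.apache.spark.sql.functions.col

// Count street planting sites per borough and postcode, greenest first
val greenest = plantingSites
  .filter(col("site_type") === "Street Site")
  .groupBy(col("borough"), col("postcode"))
  .count()
  .orderBy(col("count").desc)
```

It reads like collection operations, which is exactly the low barrier to entry the speaker is pointing at.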

But the results are the start of your questions

How are those results compiled back together? When you see that your results took a specific amount of time to come back, well, why was that?

So the Catalyst optimizer is what takes you from the “what” (your query) to the “how” (how your query is going to run on the cluster).

Catalyst goes to work: Watch these colored pieces move through the query plans

So Catalyst goes to work. We’re going to take a very simple part of this query that you just saw, the greenest sites in New York City. So we’re looking at street sites, all of the places where trees are planted on the street, and we’re going to watch Catalyst go to work. You’ll notice that this query is color highlighted, and over the next couple of slides you’re gonna be watching these colors move around the query plan, so that you can understand how Catalyst is thinking about this query, how Catalyst is optimizing this query. So the first stage…

Getting the query plans

Well, first you’ve got to understand how you’re gonna get the query plan. If the query already ran, then you can see the executed query plan in the Spark UI, under the SQL tab. If the query didn’t run yet, then you can run explain on the query. And in Spark 3 there are some really exciting options available for explain. Well, I mean, it’s probably not exciting to you yet, but when we get to the end of this presentation, trust me, you’re gonna be very excited. So now the what: parsing and analysis.
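Both routes can be sketched like this, with `greenest` standing in for whatever Dataset you're inspecting:

```scala
// Spark 2.x: print the parsed, analyzed, and optimized logical plans plus the physical plan
greenest.explain(extended = true)

// Spark 3.0+: explain takes a mode string
greenest.explain(mode = "formatted")  // also "simple", "extended", "codegen", "cost"
```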

The what: parsing and analysis

This is the first stage of how something you’ve written down, whether it’s a DataFrame or a SQL query, something declarative, is going to be turned into something that can be executed. So parsing and analysis, these are logical plans. It’s an abstraction of the computations that are gonna run on the cluster. So we’re not saying anything about how, just what.

Parsed Logical Plan

So, do you see this color highlighting here? What you’re gonna notice here is, keep your eye on the project that comes after the three filters, okay? Where that is in this query is gonna be really interesting. The parsed logical plan is not otherwise interesting, so let’s take a look now at the analyzed logical plan.

Analyzed Logical Plan

Okay, this stretch should be a little bit interesting, because what you’re gonna notice here is, right under the top title, you’ll see borough, postcode, and street site count. So analysis is already clever enough to figure out exactly what projection this query is gonna be bringing back, right? So let’s keep going.

The better what: Optimization

Here’s where the exciting thing happens: optimization. So the optimized logical plan is what’s going to be transformed into one or more physical plans. So optimization is gonna check for all the tasks which can be performed together in one stage; it’s gonna fuse the map operations. If there’s more than one join clause, it’s gonna decide how to order the execution of the query, and it’s not going to make a preconceived decision; rather, it’s going to generate a bunch of different candidates and then use cost-based optimization to decide what the cheapest, most efficient way to order these joins is. And finally, probably the single optimization that’s gonna give you the greatest bang: predicate pushdown. It’s gonna evaluate the filter clauses before any kind of projection. So in short, move the least amount of data across the cluster as possible. I mean, obviously there’s loads more to logical optimization than this, but I think that’s a really good start: the idea that you can apply various rules to transform your query, rule after rule, and then evaluate which one wins. So what’s our query gonna look like after logical optimization? And interestingly, it’s gonna be a little bit different, right?

Optimized Logical Plan

Do you see here how the filter has been yanked down underneath the projection? And also something else interesting has happened here. Do you see that little asterisk that’s highlighted red? That is a whole-stage code generation mark, and whole-stage code generation is the mechanism by which multiple operations can be fused together. That is one of the reasons why Catalyst-optimized code is as fast as it is: it’s capable of taking multiple operations and making them run together in one logical unit. So next up, the physical plan.

The how: physical plan to execution

So the physical plan is executable. It is a strategy that specifies how the selected physical plan is gonna execute as a distributed acyclic graph of RDDs across the cluster. And if you’re really interested in the details of this, I’m not gonna talk about Tungsten here, but my references at the end of the presentation contain a lot of links to papers and presentations that go into very great detail. It’s quite fascinating, the optimizations that it’s making. So looking at the physical plan, here’s where things get really interesting, because if you look at the asterisks and the numbers in bold, you’re gonna see that there are gonna be three separate whole-stage codegen phases here, okay?

Physical Plan

And if you look at the first phase, that’s where it’s going to push down those three filter statements, and you’ll notice that it’s added some of its own, checking that the site and the postcode are not null. And you’ll notice also that now there is a partial count that runs on the executors, and then in the third stage a final count that’s gonna run back on the driver. And then finally, the very last thing that it’s gonna do is TakeOrderedAndProject. So, without the highlighting, or maybe even with the highlighting, this looks like a great big wall of text, right? How is that gonna look if you were looking at it visually in the Spark UI? And the answer is, let’s just focus on…

Spark UI SQL tab – first stage codegen details

So you see here in the detail on the side, here are the big blue boxes, these are the three whole-stage codegen stages, but let’s focus on the first one. You can see that this correlates back to the HashAggregate, the Project, and the Filter. So you can clearly see here the filter is now before the project. That’s how you can begin to understand how the printed query plan and the graphical query plan correlate with each other.

So now we’re gonna talk about what this looks like when something goes wrong in production, because probably what it’s going to look like is: your cluster just sits there and nothing happens. And here are some case studies from things that actually happened in production.

Case study 1: Groundhog Day

So the first case study, Groundhog Day, was one where

a job that we were putting out on a large Spark cluster just stopped working for 45 minutes. Like, the whole cluster, just wow. No logs, no activity, the driver just kind of trundling along, and then all of a sudden it springs back to life, falters along, and then dies sometime later. Wow, what just happened here? Okay, so when your Spark cluster is idle, hung, whatever, not seeming to do anything, that’s when you need to take thread dumps. That’s what’s gonna give you an explanation of what’s happened.

Diagnosing Groundhog Day

And you’re gonna have to be persistent here, because getting a thread dump with the smoking gun is going to be a matter of persistence, timing, and luck, and just being patient and analyzing all of them. What is causing the problem is in there, but it might not be in the first thread dump, okay? Keep going, you will find it. So in this case, this is the snippet that led us back to the problem.

So the smoking gun. Here’s the code that caused the problem, a stack-traceable moment. So we had a data set and we were about to do some highly repetitive calculations. I know this comes as a surprise, but sometimes our lives as developers involve a certain element of repetition. So we have a bunch of different column names here, signifying different metrics, and we’re gonna run them for different periods. All right, so we produced this kind of Cartesian product of metric columns, and then we applied that to the data frame with a fold, okay? So the data frame expands out like that and, hey, it should be fine, right? I mean, because Spark is awesome, right?

Groundhog Day: what happened

Well, yes and no, okay? Because when you’re gonna do something with foldLeft, it matters how you do it, and withColumn would be exactly the wrong way to do it. It turns out that the query optimizer cannot optimize chained calls to withColumn the way it optimizes a call to select, because each withColumn call is going to do this kind of repetitive process where it creates a new data set and initiates a new analysis of the query plan. Good morning, Groundhog Day. So these repeated calls to withColumn can result in the query optimization process taking a long, long, long time, and a massively large query plan.

Groundhog Day: practical demonstration

So just as a practical demonstration here, we have street sites. It is just a trivial query, it’s got a filter and a count distinct, okay? So the original query plan before we do anything is 10.5 MB, and you can see this query plan just expanding out. Imagine that we add 26 columns, each one of which is just a literal, single-character string. Okay, at the end of doing that, our query plan is now 134.78 MB, okay? All right, and the data set that was running in production for Groundhog Day was much larger, and its plan was much larger than this to start with. So I think you can imagine, like, wow, it was pretty big.

So how big?

Bonus stacktrace: beware the mega-query

All right, well, the actual query plan, at the time that something trying to walk the query plan blew up because there was an error, that query plan was bigger than David Copperfield. That’s how big.

All right, so what’s the fix? The fix is, when you add a whole bunch of columns to your data set with the Cartesian product of a whole bunch of metrics, use select and do it just once, okay? And if necessary, you might wanna cache that, or truncate the query lineage in some way. Your mileage may vary, but start with .select, and you’re gonna be a lot happier.
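A sketch of the difference, with a hypothetical `metricCols` standing in for the Cartesian product of metrics and periods (the names here are illustrative, not the production code):

```scala
import org.apache.spark.sql.Column
import org.apache.spark.sql.functions.col

// Hypothetical: (columnName, expression) pairs for every metric/period combination
val metricCols: Seq[(String, Column)] = ???

// Anti-pattern: every withColumn creates a new Dataset and re-runs analysis,
// so the plan (and the analysis time) snowballs with each iteration
val slow = metricCols.foldLeft(df) { case (acc, (name, c)) => acc.withColumn(name, c) }

// Better: build all the columns first, then add them in a single select,
// which the optimizer analyzes exactly once
val fast = df.select(
  df.columns.map(col) ++ metricCols.map { case (name, c) => c.as(name) }: _*)
```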

Case study 2: “Salmiak”

All right.

Next case study, Salmiak. Now, if you don’t know what Salmiak is, it’s a kind of salty licorice that is common to a couple of Northern European countries, and if you’ve never tried it and you weren’t expecting it, you’re probably not gonna like it a whole lot, okay? It’s gonna become clear why this is called Salmiak when you see that this was a large data set that had a very heavy head. That means the distribution of the keys is very heavily skewed: you had a very high frequency of key A, a pretty high frequency of key B, a high frequency of key C, and so on, and then you had a very long tail of keys with low frequency. So if you need to partition by this key, then you can immediately see that this workload is horribly skewed. So we’re gonna solve that: to evenly distribute the workload, we’re gonna break up heavy key A into keys A1, A2 through AN, key B into B1 through BN, whatever, and then we’re gonna take the long tail and crunch that together into a synthetic key, so that the workload is evenly distributed across the cluster. Then we needed to add a column, based on that heavy key, that would use a UDF, and it turned out that this UDF was deterministic but also expensive. It was some type of unpleasant text processing, not a regex, whatever. It was expensive, comparatively. And then we’re gonna go on and run repeated calculations on the data set, because that’s what you do in finance. And the first run of the job just paralyzes the Spark cluster and runs out of memory.
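The salting scheme described above might be sketched like this; the `heavyKeys` list, the fan-out factor `N`, and the column names are assumptions for illustration:

```scala
import org.apache.spark.sql.functions._

val N = 8                           // how many pieces to split each heavy key into
val heavyKeys = Seq("A", "B", "C")  // hypothetical: the keys at the head of the distribution

val salted = df.withColumn("salted_key",
  when(col("key").isin(heavyKeys: _*),
       // A -> A_0 .. A_7, spreading the heavy key's rows across partitions
       concat(col("key"), lit("_"), (rand() * N).cast("int")))
    // collapse the low-frequency tail into one synthetic key
    .otherwise(lit("LONG_TAIL")))
```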

But first! Let’s talk about Spark and UDFs

Now, first let’s talk about Spark and UDFs. The Catalyst optimizer expects UDFs to be both deterministic and cheap, and when it optimizes queries, it could call a UDF either more or fewer times than that UDF appears in the original query.
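As a side note, if that re-evaluation behavior is the problem, Spark lets you opt a UDF out of it. A minimal sketch, where `costlyTextProcessing` is a stand-in for the real expensive logic:

```scala
import org.apache.spark.sql.functions.udf

// Stand-in for the genuinely expensive text processing
def costlyTextProcessing(s: String): String = s.reverse

val expensive = udf(costlyTextProcessing _)

// asNondeterministic tells Catalyst not to duplicate, collapse,
// or reorder calls to this UDF during optimization
val expensiveNd = expensive.asNondeterministic()
```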

Now, all right, so our first thought, perhaps thought loosely, was, “Hey, the UDF is expensive, the driver is running out of memory.

Salmiak: first try. Proposal: give each executor a local cache; the expensive UDF will lazily cache its results

“You know what? We’ll give each executor a local cache, and cache the computation inside the expensive UDF, and then everything will be great.” Right? Except, you know, we really hadn’t thought through very well how salting the key and adding the cache were gonna interact. They’re at cross purposes with each other, as you can see here in this table. Yeah, basically, you get a really high cache hit rate on the heavy keys, which just zip right through, and then the long tail just runs forever and eventually dies. Okay, so that didn’t work.

Salmiak: second try

Next up, we try to just play it as it lays: take the keys in whatever distribution, add in a cache, and just go. Except that didn’t work out so hot either. First off, you’ll notice that the cache hit ratio is half or less of what I would expect to make it worth my time to thread in a cache and allocate the memory to it. And secondly, it still bogged down and died. So what happened here?

Salmiak: post mortem

All right, so first off we have two things at cross purposes. Salting is meant to combat skew by distributing the work evenly, while caching is meant to amortize the time for the expensive calculation by lazily populating the results in the cache. However, when they were combined, we got the inverse of what we wanted, namely skewed runtime and an abysmal cache hit rate that ate memory. It turned out that by not caching the data set after adding this column, and then calling transformations like filter, what was happening was that on top of the bad cache hit ratio, as things were shuffled around, the UDF was getting called again and again in places where the cache was not populated. So, yeah, awful, awful. Okay, so in short, it was like none of the benefit of caching, none of the benefit of distribution. And as you can see from this article, which we found in the Databricks Knowledge Base, we’re not the first people to have been here, but man, was that painful. So in conclusion, if your UDF is cheap and deterministic, stand back and admire the majesty of the Catalyst optimizer doing what it does best.

Salmiak: conclusions

If your UDF is cheap or expensive and non-deterministic, mark it as non-deterministic; it acts like you would expect. You’ll miss some of the optimizations, but hey, your problem is solved. If you fall into the space of a UDF that is expensive and deterministic, then you need to cache that data set before you open Pandora’s box, okay? So now, welcome to the pharmacy.

So assuming that you looked back at those query plans and thought, “Wow, I’ve never really tried to read those before.” Good news: you really don’t need to be a genius to read through these query plans and figure out what’s going on in them. There are only a few things that really commonly go wrong with Spark queries, and you can learn how to solve those pretty easily. So here in the query planning pharmacy, we’re gonna talk about slow queries, how to investigate query planning issues, what to do if the code cache is full, what to do if Codegen generated a method of more than 64K, how to go in under the hood and debug Codegen if you need additional information about your query, how to disable Codegen, and then finally how to examine the number of partitions that are participating in each stage of your query.

Slow query –

So, slow queries. Is it a query planning problem? To be clear, there are all kinds of reasons that a query could be slow on Spark, okay? And only some of those, in fact perhaps a minority of them, are really related to query planning. But here’s how to tell if it is related to query planning. First off, when you look at the query plan, a signature problem with query planning is the optimized logical query plan being the size of a Dickens novel. Was your original query this big, and the query plan is like that big? That’s probably a sign that you should investigate the query plan. In general, this can be caused by some type of iterative algorithm, common to graphs or ML, okay? You know who you are. Or, in our case, calling withColumn many, many, many times — withColumn, withColumnRenamed, anything that causes a new data set to be created and a new query plan to be reanalyzed. So you can fix that pretty cheaply.

How to investigate query planning issues. OK, so it’s not something iterative, what next?

So what if you need to dig a bit more deeply? Now you can start running through Spark Jira, because the good news is, it’s pretty unlikely that you’re the first person ever to have had this problem, and Spark Jira is full of really well-documented issues with responses from core developers. So some things to look at are, for instance, coalesce. You should know that the Spark optimizer will try to push coalesce as far up into the query graph as possible. That means if you have a Spark cluster with a thousand nodes and you say coalesce(2) at the end of your query, what’s really gonna happen is coalesce(2) will run at the beginning of your query, and then all the rest of your query will run on those two nodes while 998 nodes sit idle, okay? So there are ways to get around that, but you should be aware of it. That may be why your query is slow. Joins, I mean, that’s like a whole presentation in and of itself, but as far as query planning goes, one interesting gotcha I thought I would highlight is: you did not write a cross join, but the logical evaluation of your query under certain conditions could lead to a cross join. Here’s a Spark issue that illustrates how that could happen. UDFs, again, that’s a pathology all its own, but don’t nest them, stuff like that. Case statements, those could also lead to trouble. Repeated calls to cast and alias. Look, schema inference is beautiful, it’s amazing that it works, but is schema inference what you want running in production on 20 outer joins? Probably not. And then finally, aggregation on decimal columns. If you need precision out to a certain number of places, this could be something that trips you up. So, all things to look at; these are issues that I’ve included as an illustration, not as the only problems that can occur. All right. So the code cache is full.
Now, this is one of those JVM, blah, blah, blah, blinking lights messages that maybe you’ve never looked at before, but you absolutely should. Because if you see this message in your Spark logs, that means that Spark cannot optimize your queries, and if Spark cannot optimize your queries, it’s gonna fall back to running the physical plan using a slower methodology. How much slower? It could really impact a large application. So if you see this, there are two things you need to do: first off, give the Spark driver more memory, and secondly, tune your code cache to ensure that there will be enough room. The way that you do that is, first, these are two diagnostic JVM flags that you should just put into the driver extra Java options on every single Spark driver that you run, ever, because this is gonna give you an idea of what code cache usage looks like.

Code cache: figuring out usage

Am I really getting close to using up all of my available code cache? That would be really bad. And if so, here is how you would tune it. What you wanna do is, first off, bump up the total amount that you allocate for the code cache, that’s kind of important. I mean, obviously too little is bad. But what you can also tune is the reserved code cache size and the minimum free space, to make the code cache flush more aggressively, to ensure that enough of the code cache is free when Spark needs it. There are some additional options that will be available to you in JDK 11 that I haven’t highlighted here, because most of us are still living in JDK 8, but there are things that you can do to make the code cache more efficient.
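As an illustrative sketch (the flag values are examples, not recommendations, and the exact flags shown on the slide aren’t reproduced here), the JDK 8 options in question might be wired up like this in spark-defaults.conf:

```
# -XX:+PrintCodeCache dumps code cache usage on JVM exit (diagnostic)
# -XX:ReservedCodeCacheSize raises the total cache size
# -XX:+UseCodeCacheFlushing and -XX:CodeCacheMinimumFreeSpace make flushing
#   more aggressive so free space is available when Spark needs it
spark.driver.extraJavaOptions=-XX:+PrintCodeCache -XX:ReservedCodeCacheSize=512m -XX:+UseCodeCacheFlushing -XX:CodeCacheMinimumFreeSpace=16m
```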

Codegen failed because generated code > 64K

All right, now this: Codegen failed because the generated code is greater than 64K. This is probably the most common, and one of my least favorite, actual exceptions seen in production. This happens when the generated query grows so large that Codegen just can’t do it anymore, because you’ve hit a hard limit on the JVM. And at first you think, 64K? That’s absolutely massive, that is a lot of code, what happened? But when you start looking at the generated code and see what’s going on, you realize that in certain pathological or iterative cases it could be pretty easy to hit that limit. And the impact is, yuck, Spark just logs this massive stacktrace and then it’s gonna execute a slower plan. So now we’re gonna decide, depending on what the impact on your application is, how to fix it.

Codegen >64K: no quick fix

So the bad news is that there’s no immediate quick fix. First off, if you wanna do the right thing, you’re gonna have to figure out why the large query is so large. In most cases, probably in 90% of cases, either you’re doing something iterative or you’re doing something that’s causing a new data set to be created over and over, and then a query plan to be analyzed over and over again. If you fix that, the large query is gone; otherwise you’re gonna have to really dig in and figure it out. The other suggestion would be to either cache it or checkpoint it to truncate the query lineage. There could be a performance impact from that, okay, but better a performance impact from checkpointing than your entire job, like, falling over. And the other thing that you can do, in the rare case where you have hit some type of a critical Codegen bug, is disable Codegen for everything. That’s probably not what you really want to do, though.
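Truncating the lineage could be sketched like this; the checkpoint directory and the `wideDf` name are placeholders:

```scala
// A reliable (typically HDFS) directory for checkpoint files; placeholder path
spark.sparkContext.setCheckpointDir("/tmp/checkpoints")

// checkpoint() materializes the data and cuts the query lineage,
// so subsequent planning starts from a fresh, small plan
val truncated = wideDf.checkpoint()
```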

Codegen >64K: capturing generated code in logs

So now, capturing the generated code in logs. If Codegen does fall down, you probably want to log some of the code it generated. The default is that a thousand lines of whatever Codegen generated are going to be logged.

I mean, barely any of it. You can set this to negative one to log the entire error, but beware, that can be a massive amount of output. Or instead, if you prefer, you can just disable logging the error. That’s another option; I mean, if you know which query is the problem and you’re working to fix it, you can disable logging the error.
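The knob being described appears to be the internal setting `spark.sql.codegen.logging.maxLines`; treat the name and default as an assumption and verify against your Spark version:

```
# Lines of generated code to log when compilation fails (default 1000).
# -1 logs the entire generated function.
spark.sql.codegen.logging.maxLines=-1
```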

Codegen >64K: stop logging the error

And then finally, if you want to turn off the carbon monoxide detector, what you can do is configure the logging to turn off logging for that specific class. Not recommended, but better than paging the on-call engineer at two o’clock in the morning for an exception that did not halt the application.
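In log4j.properties, that would look something like this (the class name matches Spark’s codegen compiler in 2.x; verify it for your version):

```
# Silence the class that logs generated code on compilation failure.
# Use with care: you are turning off the carbon monoxide detector.
log4j.logger.org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator=OFF
```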

Now, let’s say that you wanna debug what’s going on with Codegen, like you wanna roll up your sleeves and get deeper into that. You can call debugCodegen, or in Spark 3 you can call explain with codegen mode. In this particular query, you’ll notice I ditched the count distinct to reduce it to just two whole-stage codegen subtrees; the original greenest query had seven of them, which is why we’re not looking at that here. They can get big, fast, but let’s just take a look at a really small one, okay? So you’ve got…
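Both entry points, sketched against a hypothetical `streetSites` Dataset:

```scala
// Spark 2.x: dump every whole-stage codegen subtree with its generated Java source
import org.apache.spark.sql.execution.debug._
streetSites.debugCodegen()

// Spark 3.0+: the same information through explain
streetSites.explain(mode = "codegen")
```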

So you’ll see the time for the command, and you’ll see that it’s creating two whole-stage codegen subtrees. So let’s take a look at what’s going on. So, the first subtree. What this slide here is showing you is: the executed query plan that you’re seeing in the Spark UI, how does that correlate back to the output that you’re seeing from debugCodegen, so that you can have a picture in your head when you’re looking at it. And instead of focusing on the individual clauses, like we did back in the Catalyst optimizer section, now the colored arrows are pointing at the different boundaries. So for instance, you’ll see that the in-memory table scan, hey, that was prepared by cache, and you’ll see that whole-stage codegen has fused together a filter, a project, and a hash aggregate. So I’ve kind of artificially captured here what those three different phases are, but you can see that it generated 419 lines of code, right? So now, taking a look at the second stage.

So, exchange. Whenever you see Exchange in the Spark UI, or exchange hash partitioning in a plan, what you’re seeing is a shuffle, something that sends data across the network. The reason why it can’t be fused into a whole-stage codegen is that this is not a math operation; this is something that’s physically moving data. So that’s why it’s a boundary. But you’re seeing here that what the Spark UI is showing you is really almost a combination of the physical plan and some parts of the logical plan, so you can build this up in your head. But look here in subtree two, do you see how I’ve highlighted the numbers? One is blue and two is green. What you’re seeing here is: stage one is executing on individual executors, then this pink highlighted exchange hash partitioning, that’s a shuffle, and then the yellow highlighted stage two, that is everything folding back together, the final merge. That’s how we’re getting the final answer, and then you’ve got a single TakeOrderedAndProject that is handing us back the answer, like, “Hey, ordered by borough and postcode, what’s the count of street sites?” And, like, I didn’t do a count distinct, so that eliminated a stage.

All right, so in certain rare cases, you’re gonna wanna point the finger at a specific rule.

Under the hood: rule based optimization

So logical optimization is rule-based and physical optimization is partially rule-based. So how can you find out what rules are being applied and when? The answer is that RuleExecutor collects metrics, and you can see those metrics. The way to see that is to reset the metrics, and then, to see the logical optimization, you can just do an explain and then dump the time spent, or to see the physical, you can do a show and then dump the time spent, and that’s gonna show you the metrics. Now, I have to tell you that this is not the easiest output to show on a slide, sorry. But what’s going on here is, in this text, you’re gonna see the effect of each rule that applied, and keep in mind, I’ve redacted all the rules that had zero time, that were not applicable here. You’re gonna see the total time that each rule spent, the number of runs, and then also the effective time and the effective runs: how many times did this rule actually apply and do something. And what you’re gonna see here is that, because this query is trivial, the optimizer spent most of its time trying to DRY out the code, like not to have multiple methods with exactly the same content. The reason why that’s important is that when Codegen generates the code, it’s gonna try to produce the smallest code size possible that will run. And if it can produce small methods that can be JITted inline, that’s gonna re-inflate the size of the code, but it’s gonna trade memory for speed. So if the initial code starts out as small as possible, if it does not contain duplicates, then you’ve got some budget to spend later when it’s gonna blow up through trying to JIT it. I mean, there’s a lot of complicated optimization that Tungsten does under the hood, but in short, if you start from the smallest code possible, you’ve got the most budget left to do optimization later.
You can see, in green, I have highlighted a vendor-specific optimization; different distributions of Spark make vendor-specific optimizations. You can also see, highlighted in yellow, the logical optimizations. There are also rules that were applied during analysis. So this was a super simple query, so you don’t see any of the real join-based optimizations or anything like that. But in the event that there were a rule going wrong, you might be able to see something like a rule that had a very long run time and a very small number of applicable runs, or something like that. So you might find something interesting in here if a specific rule is the cause of your bug.

Unlikely, but it’s fun to look at.
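The procedure described above can be sketched as a spark-shell session. `RuleExecutor.resetMetrics()` and `RuleExecutor.dumpTimeSpent()` are the actual API names in `org.apache.spark.sql.catalyst.rules`, but the query here is a hypothetical stand-in, not the one from the talk, and the explain-vs-show split mirrors the speaker's description:

```scala
import org.apache.spark.sql.catalyst.rules.RuleExecutor

// Stand-in query; substitute your own DataFrame.
val df = spark.range(1000).selectExpr("id % 10 as k", "id as v")

// Per the talk: explain() is enough to see time spent in analysis
// and logical optimization, without executing the query.
RuleExecutor.resetMetrics()
df.groupBy("k").sum("v").explain()
println(RuleExecutor.dumpTimeSpent())

// show() actually executes, so physical planning rules run as well.
RuleExecutor.resetMetrics()
df.groupBy("k").sum("v").show()
println(RuleExecutor.dumpTimeSpent())
```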

Disabling codegen

All right, so what would be the impact of disabling codegen? Just for fun, we're gonna take a look at that. We've gone back, once again, to the first stage of codegen for the street types query, and we're just comparing what's in the Spark UI with codegen on one side and no codegen on the other. What you're gonna notice immediately is that without codegen these operations have not been fused together, and if you look at the area highlighted in red, it's less efficient. Without codegen, you're not just giving up fusing the operations together, you're giving up a whole class of optimizations which allowed the code to run faster, to be more memory efficient, to be more CPU efficient. So you'll see here that the time in aggregation without codegen was 715 milliseconds; the time in aggregation with codegen was 489 milliseconds. The peak memory usage without codegen was 32 MB; the peak memory usage with codegen was 256 KB. And this is a very trivial example; you could easily find far more dramatic examples. However, the problem with this query actually turns out to be totally unrelated to codegen.
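Reproducing a comparison like this is one config flag. A minimal spark-shell sketch: the config key `spark.sql.codegen.wholeStage` is real, but the aggregation is a hypothetical stand-in for the query on the slide:

```scala
// Stand-in data set; substitute the query you want to compare.
val df = spark.range(1000000L).selectExpr("id % 100 as k")

// With whole-stage codegen (the default): operators are fused into
// generated methods, visible as WholeStageCodegen nodes in the SQL tab.
spark.conf.set("spark.sql.codegen.wholeStage", "true")
df.groupBy("k").count().show()

// Without it: each operator runs interpreted, a row at a time.
spark.conf.set("spark.sql.codegen.wholeStage", "false")
df.groupBy("k").count().show()
```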

Now look at the final HashAggregate operation

You'll notice that codegen makes this query faster, but the memory usage is still terrible. Why is it 4.1 GB for a data set of this size? And you're gonna notice that query planning cannot fix poor partitioning choices: there are 200 partitions in the RDD this data set is being run on, and it's being brought down to one at the end. The clue to that was back in the physical plan, in the exchange, which showed 200 partitions. That is way too many for a data set of this size. So when we update the shuffle partitions to only two, you'll see that this aggregation now takes place in only three milliseconds and uses only 64.5 MB of memory, as compared to 4.1 GB.

Updating shuffle partitions

So this is thousands of times faster. Just to go back to my original point, there are many reasons why a query could be slow, and looking at the number of partitions involved could very well be the reason that your query is slow. I hope that this talk has given you the ability to better diagnose and understand query planning issues. Now let's take a brief look at some exciting changes coming in Spark 3.
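The fix described above is a single setting; a sketch, assuming the same stand-in aggregation as before (the default value of `spark.sql.shuffle.partitions` is 200):

```scala
// 200 post-shuffle partitions (the default) is far too many for a small
// data set: each partition carries its own scheduling and memory overhead.
spark.conf.set("spark.sql.shuffle.partitions", "2")

val df = spark.range(1000).selectExpr("id % 10 as k")
df.groupBy("k").count().show()
```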

All right. So Scala 2.12 and JDK 11 support will allow Spark 3 to create and compile more efficient query code.

Spark 3 exciting changes


All right. Exciting changes coming to the Catalyst optimizer in Spark 3. Adaptive SQL Query Optimization is the single biggest feature coming in Spark 3, but on top of that, there's gonna be Dynamic Partition Pruning. Remember predicate push down? What if you needed it on the other side of the join? Now you can have that. There are going to be new join hints, and there's going to be a Data Source V2 API that will at last standardize something that's been a thorn in people's side for a while. There are improvements to Pandas UDFs and Python type hints. And also the Spark UI SQL tab is now going to show you your query instead of the whole call site in the code. So I urge you to take a look at everything exciting that's gonna be happening in Spark 3.
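As a hedged sketch of what some of those features look like to turn on (config keys from the Spark 3.0 documentation; note dynamic partition pruning is on by default in 3.0, while AQE only became the default later, in 3.2; `facts` and `dims` are hypothetical DataFrames):

```scala
// Adaptive Query Execution: re-optimizes the plan at runtime from shuffle
// statistics (coalescing partitions, switching join strategies, and so on).
spark.conf.set("spark.sql.adaptive.enabled", "true")

// Dynamic partition pruning (already on by default in Spark 3.0).
spark.conf.set("spark.sql.optimizer.dynamicPartitionPruning.enabled", "true")

// One of the new join hints added in Spark 3:
val joined = facts.join(dims.hint("shuffle_hash"), "key")
```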

That’s the conclusion of the presentation.

About Rose Toomey


Rose Toomey joined Bloomberg as a senior software developer in the AI Group in April 2020. Previously, she worked as a senior software engineer at Coatue Management, Lead API Developer at Gemini Trust, and a Director of Engineering at Novus Partners.