Code generation is integral to Spark’s physical execution engine. When enabled, the Spark engine creates optimized bytecode at runtime, improving performance compared to interpreted execution. Spark has taken the next step with whole-stage codegen, which collapses an entire query into a single function. However, as the generated function sizes increase, new problems arise. Complex queries can lead to generated functions ranging from thousands to hundreds of thousands of lines of code. This can lead to many problems, such as OOM errors due to compilation costs, exceptions from exceeding the 64KB method limit in Java, and performance regressions when JIT compilation is turned off for a function whose bytecode exceeds 8KB. With whole-stage codegen turned off, Spark is able to split these functions into smaller functions to avoid these problems, but then the improvements of whole-stage codegen are lost. This talk will go over the improvements that Workday has made to code generation to handle whole-stage codegen for various queries. We will begin with the differences between expression codegen and whole-stage codegen. Then we will discuss how to split the collapsed function from whole-stage codegen. We will also present the performance gains that we have seen from these changes in our production workloads.
– Hello everyone.
And thank you for coming to this session of Spark Summit. My name is Michael Chen. I’m a software developer at Workday. Today, I will be talking about understanding and improving code generation in Spark. So first, let’s take a brief look at the agenda for today. I’ll begin by talking about the basics of Spark SQL. Then I’ll talk about the two different models of query execution, the volcano iterator model and whole-stage code generation. Next, I’ll talk about some of the problems of whole-stage code generation, and how splitting the generated code helps mitigate these problems. Finally, I’ll wrap up by looking at the performance of whole-stage code generation after splitting the generated code for a specific query. So first, let’s look at the basics of Spark SQL.
This diagram details all the steps of Spark SQL, starting with a SQL abstract syntax tree (AST) or a DataFrame and finishing with RDDs. So first, we take the DataFrame or SQL abstract syntax tree and create a tree of logical operators that will represent it. We call this the logical plan, and it is created as part of the analysis phase. Then we move into the logical optimization phase. Here, we apply a few rule-based optimizations, such as constant folding or projection pruning, which give us the optimized logical plan. Next, we move into the physical planning phase. Here, we take the optimized logical plan and create one or more physical plans. Then we feed these to a cost-based optimizer and select one final physical plan for execution. We also perform a few more rule-based optimizations, such as predicate pushdown. Finally, we do some code generation on this physical plan to create our RDDs. This talk will mainly focus on the code generation part of Spark SQL. So now that we have a brief view of Spark SQL as a whole, let’s look at the two models of query execution, starting with the volcano iterator model. So what is the volcano iterator model? The volcano iterator model is a classical query evaluation strategy, and the basic idea is that each operator will implement a common interface, which we can think of as an iterator. This interface will have a next method, which is going to return one tuple at a time. This means each operator will be able to evaluate its own logic one row at a time. For a concrete example, let’s look at the bottom picture. Here, we have a plan that has a project operator, a filter operator, and a scan operator. The code generation will give us a result iterator. Next, when we try to collect our results from this result iterator, we start off by calling the next method on the project. Then, the project will need to obtain one row of input.
It does this by setting off a chain of recursive next calls that will propagate until we hit the scan at the end of the plan. Then rows will begin to be passed back upwards, one row at a time. So once the project gets one row back from its child’s next call, it will be able to perform the expression logic of the project upon that row. Once it’s finished, it will return the output row to the result iterator. The main benefit of this volcano iterator model is that it’s very simple to compose arbitrary operators together without having to worry about the data types that they are outputting, since they will all be cast to this common interface. So now, let’s look at some rough pseudocode of how the volcano iterator model works in a filter operator. You can see here that a filter operator has a child, which is also an operator, and a predicate that takes in a row and returns a boolean. We also see that there’s one next function, which returns a row. This is the common interface that all operators will implement. So first, the filter tries to get an input row to operate on, and it does this by calling child.next. Then it evaluates the predicate on this row, and it continues to ask for the next row from its child until the predicate is satisfied. Finally, it returns the row as the output of this next method.
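The operator chain described above can be sketched in a few lines of Java. This is only an illustration under stated assumptions: the names `Operator`, `Scan`, `Filter`, `Project`, and `collect` are made up for this sketch and are not Spark's classes, and a row is simplified to an `int[]` of the form `{key, val}`.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Function;
import java.util.function.Predicate;

// Illustrative names only; this is not Spark's operator API.
class VolcanoSketch {
    // The common interface: each call returns one row, or null when exhausted.
    interface Operator {
        int[] next();
    }

    // Leaf operator that hands out rows of the assumed form {key, val}.
    static final class Scan implements Operator {
        private final Iterator<int[]> rows;
        Scan(List<int[]> data) { this.rows = data.iterator(); }
        @Override public int[] next() { return rows.hasNext() ? rows.next() : null; }
    }

    // Keeps pulling rows from its child until the predicate is satisfied.
    static final class Filter implements Operator {
        private final Operator child;
        private final Predicate<int[]> predicate;
        Filter(Operator child, Predicate<int[]> predicate) {
            this.child = child;
            this.predicate = predicate;
        }
        @Override public int[] next() {
            int[] row = child.next();
            while (row != null && !predicate.test(row)) {
                row = child.next();
            }
            return row;
        }
    }

    // Pulls one row from its child and applies the projection to it.
    static final class Project implements Operator {
        private final Operator child;
        private final Function<int[], int[]> projection;
        Project(Operator child, Function<int[], int[]> projection) {
            this.child = child;
            this.projection = projection;
        }
        @Override public int[] next() {
            int[] row = child.next();
            return row == null ? null : projection.apply(row);
        }
    }

    // Plays the role of the result iterator: drains the plan from the top.
    static List<int[]> collect(Operator root) {
        List<int[]> out = new ArrayList<>();
        for (int[] row = root.next(); row != null; row = root.next()) {
            out.add(row);
        }
        return out;
    }
}
```

Building `new Project(new Filter(new Scan(data), p), f)` and calling `collect` on it triggers the recursive chain of `next` calls: every output row costs one interface call per operator in the plan.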
So, how is the predicate actually evaluated in the volcano iterator model? There are two ways that expressions are evaluated in the volcano iterator model.
The first way is interpreted evaluation. Here, we are going to look at the interpreted evaluation for a filter operator with a predicate of key is greater than one and val is greater than one. So, we start off with the And expression. In order to evaluate the And expression, we first have to go to our left child and then our right child. On our left child, we have a GreaterThan expression. Once again, in order to evaluate this expression, we have to go to our left child and subsequently our right child. First, it goes to the left child, which is a bound reference, and we get the data for that. Then we go to the right child, and we get the value for the literal of one. Now that we have these two inputs, we can evaluate the GreaterThan and return the result back to the And. Then we do the same thing for the right child. And once the And gets the input back from its right child, it will be able to perform the expression logic for the And operator. So, in order to evaluate the predicate, Spark has to traverse this tree of expression nodes for every single row of data that the filter operates on. This is very poor for performance, since it creates a large number of branches and virtual function calls. On the plus side, once again, we don’t have to worry about the data types, and it’s simpler for people to understand this tree of expressions. The next method of expression evaluation in the volcano iterator model is expression code generation.
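To make the tree traversal concrete, here is a minimal Java sketch of interpreted evaluation for the predicate key > 1 AND val > 1. The class names mirror the expression names mentioned in the talk, but the code is a simplified stand-in written for this illustration, not Spark's implementation. It assumes a row is an `Object[]` with key at ordinal 0 and val at ordinal 1, and it ignores nulls.

```java
// Simplified stand-ins for expression nodes; not Spark's implementation.
class InterpretedEvalSketch {
    interface Expression {
        Object eval(Object[] row);
    }

    // Reads the field at a fixed ordinal of the input row.
    static final class BoundReference implements Expression {
        private final int ordinal;
        BoundReference(int ordinal) { this.ordinal = ordinal; }
        @Override public Object eval(Object[] row) { return row[ordinal]; }
    }

    static final class Literal implements Expression {
        private final Object value;
        Literal(Object value) { this.value = value; }
        @Override public Object eval(Object[] row) { return value; }
    }

    static final class GreaterThan implements Expression {
        private final Expression left;
        private final Expression right;
        GreaterThan(Expression left, Expression right) {
            this.left = left;
            this.right = right;
        }
        @Override public Object eval(Object[] row) {
            Object l = left.eval(row);
            Object r = right.eval(row);
            // The operand types are only discovered here, at runtime, for every row.
            if (l instanceof Integer && r instanceof Integer) {
                return ((Integer) l) > ((Integer) r);
            }
            if (l instanceof Long && r instanceof Long) {
                return ((Long) l) > ((Long) r);
            }
            throw new IllegalArgumentException("unsupported operand types");
        }
    }

    static final class And implements Expression {
        private final Expression left;
        private final Expression right;
        And(Expression left, Expression right) {
            this.left = left;
            this.right = right;
        }
        @Override public Object eval(Object[] row) {
            boolean l = (Boolean) left.eval(row);
            if (!l) {
                return false;
            }
            return right.eval(row);
        }
    }

    // Builds the tree for: key > 1 AND val > 1.
    static Expression keyAndValGreaterThanOne() {
        return new And(
            new GreaterThan(new BoundReference(0), new Literal(1)),
            new GreaterThan(new BoundReference(1), new Literal(1)));
    }
}
```

Each call to `eval` on the root walks the whole tree again, so a filter over many rows pays for the virtual calls, the boxing, and the `instanceof` checks on every row.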
And here, instead of traversing the tree of expressions, Spark will directly generate some code that will evaluate the predicate. There are a few main benefits to doing expression code generation. The first benefit is that we don’t have to do that traversal of the expression tree, so it really cuts down on the number of virtual function calls. The next benefit is that in Spark, we know the types of our attribute references at runtime, since we require a schema. So, we can immediately cast the fields of our internal rows to the appropriate data types, use the primitive operators, and bake that into our generated code. This is in comparison to the interpreted model, which would require us to look at the data types at runtime and then use a switch statement to get the correct operators. This is also a great performance improvement. The final benefit of expression code generation is that the compiler can further optimize the code that we created. For example, in this eval function, the compiler may be able to inline functions. By doing this, we further reduce the number of function calls that we have, once again improving performance. So now that we have a basic understanding of the volcano iterator model, let’s look at the next model of query execution, whole-stage code generation.
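As a rough illustration of the idea, the sketch below walks an expression tree once and emits Java source text for the predicate, with the typed accessor chosen up front from the schema. Everything here is an assumption made for the example: the `genCode` method, the node classes, and the shape of the emitted `eval` function are invented, and Spark's real code generator also handles nulls and many other details that are left out.

```java
// A toy expression-to-source generator; names and output shape are assumptions.
class ExprCodegenSketch {
    interface Expression {
        String genCode();
    }

    // The accessor ("getInt", "getLong", ...) is picked once from the schema,
    // so the emitted code needs no runtime type dispatch.
    static final class BoundReference implements Expression {
        private final int ordinal;
        private final String accessor;
        BoundReference(int ordinal, String accessor) {
            this.ordinal = ordinal;
            this.accessor = accessor;
        }
        @Override public String genCode() {
            return "i." + accessor + "(" + ordinal + ")";
        }
    }

    static final class IntLiteral implements Expression {
        private final int value;
        IntLiteral(int value) { this.value = value; }
        @Override public String genCode() { return Integer.toString(value); }
    }

    static final class GreaterThan implements Expression {
        private final Expression left;
        private final Expression right;
        GreaterThan(Expression left, Expression right) {
            this.left = left;
            this.right = right;
        }
        @Override public String genCode() {
            return "(" + left.genCode() + " > " + right.genCode() + ")";
        }
    }

    static final class And implements Expression {
        private final Expression left;
        private final Expression right;
        And(Expression left, Expression right) {
            this.left = left;
            this.right = right;
        }
        @Override public String genCode() {
            return "(" + left.genCode() + " && " + right.genCode() + ")";
        }
    }

    // Wraps the generated expression in a single eval function over one row.
    static String generateEval(Expression predicate) {
        return "boolean eval(InternalRow i) { return " + predicate.genCode() + "; }";
    }
}
```

The tree is traversed once, at generation time. The emitted function uses primitive comparisons directly, which is the kind of code a compiler can then optimize further.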
Whole-stage code generation was introduced in Spark 2.0 as part of the Tungsten engine. It was inspired by Thomas Neumann’s paper, “Efficiently Compiling Efficient Query Plans for Modern Hardware.” The main idea of this paper is that we can try to collapse an entire query into a single operator and then generate one function for the entire query. By doing this, we would be able to avoid all of the next function calls that are inherent in the volcano iterator model. In reality, it’s not possible to collapse an entire query into a single operator. This is for a few reasons. The first reason is that not all expressions implement code generation. So, if an expression does not implement code generation, then its operator cannot take part in whole-stage code generation, and as a result, the collapsing will be cut off. Another possibility is that some operators may have to materialize all of the output of their child operators before executing. This will also cut off the collapsing of whole-stage code generation. Because of this, the whole-stage code generation node must also adhere to the common interface of the volcano iterator model and implement the common interface with next. So here, we can see what whole-stage code generation will actually look like. We have a whole-stage code generation node, and inside of it, we have three operators: the project operator, the filter operator, and a local table scan operator. So now, when we call the next interface on the whole-stage code generation node, it doesn’t have to call next for all of the child operators that it contains. Instead, it will just create a while loop and process the logic of the scan, filter, and project without any of the virtual function calls. So this is obviously a great benefit from a performance point of view, since we cut down on the number of virtual function calls. The next benefit of whole-stage code generation is that we can try to keep the data in the CPU registers for as long as possible.
This is in comparison to the volcano iterator model, where all of the outputs of an operator have to pass through a common interface and be passed up the function call stack. Instead, in whole-stage code generation, we can take the results of an operator and assign them to variables, and then the parent operators can simply refer to those variables. This way, we keep the data in the CPU registers. The final benefit is that the compiler is able to work much better on the tight loop that makes up the whole-stage code generation function than on the function call graph of the volcano iterator model. So now, let’s look at how the code is actually generated in whole-stage code generation. There are really two main paths in whole-stage code generation, the produce path and the consume path. First, when we try to generate the code, we enter the produce path. Here, we call produce on all of our child operators until we hit the producer operator. We know it’s a producer operator because only those operators will implement the produce method. This operator will set up a loop that will drive the data for the whole-stage code generation. Then it begins to call consume on its parents so they can fill in their logic for doing expression evaluation, until we get back to the whole-stage code generation node and we are finished with all of the code generation. So once again, if we look at the diagram at the bottom, we see a whole-stage code generation node that contains a project, a filter, and a local table scan operator. So, once the generate code call comes into the whole-stage code generation node, it follows the produce path by calling produce on its children. The produce will fall through until it hits the local table scan, which will set up the while loop to generate the data. Then it’ll begin calling consume on its parents to generate the code for their logic. First, it calls consume on the filter.
So the filter can fill in the code to do the expression evaluation of key is greater than one and val is greater than one. Once that’s finished, (indistinct).
So it will fill in the code to do the projection logic. Then we call consume on the whole-stage code generation node and we’re done. We’ve finished generating our code. If this is very interesting to you, you can look at our previous Spark Summit talk called A Deep Dive Into Query Execution Of Spark SQL, which goes into this produce and consume path in much more detail, along with other parts of Spark SQL.
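The produce and consume paths can be imitated with a small string-building sketch. This is a toy under stated assumptions, not Spark's code generation API: the `Op` base class, the `ROW` placeholder, and the emitted `input` and `append` names are all invented for the example. The point is only the call order: produce falls down to the scan, and consume climbs back up through the parents.

```java
// Toy model of the produce/consume paths; all names here are hypothetical.
class ProduceConsumeSketch {
    static abstract class Op {
        Op parent; // wired up by the parent's constructor
        abstract String produce();                // walks down to the producer
        abstract String consume(String inputVar); // emits this operator's logic
    }

    // The producer: sets up the loop that drives the data.
    static final class Scan extends Op {
        @Override String produce() {
            return "while (input.hasNext()) {\n"
                 + "  int[] row = input.next();\n"
                 + parent.consume("row")
                 + "}\n";
        }
        @Override String consume(String inputVar) {
            throw new UnsupportedOperationException("a scan has no child to consume from");
        }
    }

    static final class Filter extends Op {
        private final Op child;
        private final String condition; // ROW is a placeholder for the input variable
        Filter(Op child, String condition) {
            this.child = child;
            this.condition = condition;
            child.parent = this;
        }
        @Override String produce() { return child.produce(); }
        @Override String consume(String inputVar) {
            return "  if (!(" + condition.replace("ROW", inputVar) + ")) continue;\n"
                 + parent.consume(inputVar);
        }
    }

    static final class Project extends Op {
        private final Op child;
        private final String expression; // ROW is a placeholder for the input variable
        Project(Op child, String expression) {
            this.child = child;
            this.expression = expression;
            child.parent = this;
        }
        @Override String produce() { return child.produce(); }
        @Override String consume(String inputVar) {
            return "  int projected = " + expression.replace("ROW", inputVar) + ";\n"
                 + parent.consume("projected");
        }
    }

    // The top node: starts the produce path and terminates the consume path.
    static final class WholeStage extends Op {
        private final Op child;
        WholeStage(Op child) {
            this.child = child;
            child.parent = this;
        }
        @Override String produce() { return child.produce(); }
        @Override String consume(String inputVar) {
            return "  append(" + inputVar + ");\n";
        }
    }
}
```

Calling `produce()` on `new WholeStage(new Project(new Filter(new Scan(), cond), expr))` returns the text of a single loop whose body contains the filter check followed by the projection, with no per-operator function calls in the emitted code.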
So to really drive this home, let’s look at how the code would be generated for the previous query.
Like I said, the produce method falls through until we hit the producer operator, which in this case is the local table scan. The local table scan will create a while loop that will drive the data, and it will also do some of the logic for the scan. Next, it will call consume on its parent, which is the filter. The filter has the predicate of key is greater than one and val is greater than one, so we generate code that will do this evaluation. In this case, we create two booleans, and if either boolean is false, we quickly skip over this iteration of the loop. Next, we call consume on the parent once again, which is the project. And that is just assigning the value of the project to whatever the expression evaluation is for that operator, which in this case is key plus one.
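Written out by hand, the fused loop described above might look like the following Java sketch. It is an imitation for illustration, not the code Spark emits. It assumes rows are `int[]` values of the form `{key, val}` and that the projection computes key + 1.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hand-written imitation of a fused scan + filter + project loop.
class FusedLoopSketch {
    static List<Integer> run(Iterator<int[]> input) {
        List<Integer> out = new ArrayList<>();
        while (input.hasNext()) {
            // Scan: drive the loop and expose the fields as local variables.
            int[] row = input.next();
            int key = row[0];
            int val = row[1];

            // Filter: two booleans; skip this iteration if either is false.
            boolean keyGreater = key > 1;
            if (!keyGreater) continue;
            boolean valGreater = val > 1;
            if (!valGreater) continue;

            // Project: refers directly to the scan's local variable.
            int projected = key + 1; // assumed projection expression
            out.add(projected);
        }
        return out;
    }
}
```

All three operators share one loop body, and intermediate values live in local variables instead of being handed across an interface one row at a time.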
So that was just some rough pseudocode of whole-stage code generation. Here, we see the actual code that was generated for this query. We can see that it’s very similar. (indistinct) again, sets up this while loop, the filter does some predicate evaluation and skips over this iteration of the loop if the predicate is false, and then the project does a bit more to actually output the results.
So, now that we understand a little bit about whole-stage code generation, let’s look at some of the problems in whole-stage code generation.
First, how did we run into these problems? So, at Workday, we have a few accounting use cases that really demonstrate the problems of whole-stage code generation. In these use cases, our customers are creating queries that contain case statements with thousands of when branches. One example is that customers may map external data to internal representations using case expressions, or maybe there’s some accounting use case for validating input. An example is filing expenses. You may think that this is very simple from the end user’s point of view, but the rules that govern how expenses are filed can be very complex. For example, your cost center may depend on a variety of inputs, such as where you made the purchase, who your manager is, why you made the purchase, and so on and so forth. For our customers, we see that the queries for filing expenses can be comprised of case expressions with thousands of when branches. And when running a case expression this large, we saw that the generated function would be over a million lines of code.
Now, you may not believe me when I say that case statements can result in code that is this long, but here, we are looking at the code that is generated for a case statement with one branch. And we can already see that it’s too long to fit on the slide. So you can imagine that when we have thousands of branches, this function will quickly explode.
So what are the problems when your function size is this large? There are three main problems. The first problem is that Java limits the size of any method to 64 kilobytes of bytecode. Once you exceed the 64 kilobyte bytecode limit, you will get an exception. So in our case, the whole-stage code generation would fail with an exception because our method is too large. The second problem is that JIT compilation can be disabled when methods exceed eight kilobytes of bytecode. So if you’re able to fit under the 64 kilobyte bytecode limit and avoid the compilation error, you may still run into performance issues, since JIT compilation is turned off. For example, we would have a case statement that has 10 branches, and it would run pretty quickly. But then we would add another branch to this case statement, and the generated method would exceed eight kilobytes, so JIT compilation would be turned off. Then we would see the performance being up to five times slower. The final problem is that the compiler can throw OOM exceptions for extremely large methods. The reason for this is that the memory usage does not scale linearly with the size of the method. It’s actually superlinear. So, Spark tries to mitigate the first two problems in a variety of ways. There are really two ways that Spark tries to keep the method size under 64 kilobytes. The first way is that, if an operator is operating on over 100 fields, then Spark avoids whole-stage code generation completely. The second way is another configuration called the huge method limit. When Spark sees that the compiled method exceeds this limit, it will fall back to the volcano iterator model and do expression code generation.
So, how come expression code generation can get away with these large functions while whole-stage code generation cannot? The main reason is that, in expression code generation, Spark has implemented an optimization where it will split a large function into many smaller functions. Because of this, whole-stage code generation can always fall back to the volcano iterator model and use expression code generation. But by doing this, it loses all the benefits of whole-stage code generation. What we did at Workday was implement splitting for case expressions in whole-stage code generation.
So, let’s look at how we did this, how we can split the code generation functions.
So first, let’s look at splitting in expression code generation and why it’s so much simpler to do than in whole-stage code generation. So once again, let me remind you that in expression code generation, each operator can be thought of as an iterator. The result of every operator will share this common interface of next. And then all of the expression evaluation in the generated function will only rely on the output of next.
So once again, if we look at the pseudocode, we can see that the predicate of filter only relies on one row.
So, if we look at the actual generated code, we can see this: the eval function only takes an internal row. That means, if we wanted to split this eval function into multiple functions, as long as we pass the internal row to all of the split functions, we will be able to retain the rest of the code. For example, if you look at the eval function here, we can see that there are really two distinct things that are happening. First, we’re going to get an int and assign that to value two, and then check whether value two is greater than one. If it’s not, we will quickly exit. If it’s true, we’ll enter the if statement, and we’ll do the same thing to check that value five is greater than two. So, if we decided that this function was too long and we wanted to split it into two, an easy boundary would be at this if statement. If we look at the two halves, everything above the if statement and everything below the if statement, the only thing they rely on is i, which is the internal row. So if we pass that internal row to the split functions, we will be able to retain the rest of the code.
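The reason this split is easy can be shown with a small hand-written Java sketch. It is not the code Spark generates: the `Row` interface is a minimal stand-in for an internal row, the function names are invented, and both thresholds are set to one to match the predicate key > 1 AND val > 1 used earlier in the talk.

```java
// Hand-written imitation of splitting an eval function; names are illustrative.
class SplitEvalSketch {
    // Minimal stand-in for an internal row.
    interface Row {
        int getInt(int ordinal);
    }

    // Unsplit form: both checks live in one function.
    static boolean eval(Row i) {
        int value2 = i.getInt(0);
        if (!(value2 > 1)) {
            return false; // quick exit when the first check fails
        }
        int value5 = i.getInt(1);
        return value5 > 1;
    }

    // Split form: each half becomes its own function, and the row is the
    // only thing that has to be passed along.
    static boolean evalSplit(Row i) {
        return evalPart1(i) && evalPart2(i);
    }

    private static boolean evalPart1(Row i) {
        int value2 = i.getInt(0);
        return value2 > 1;
    }

    private static boolean evalPart2(Row i) {
        int value5 = i.getInt(1);
        return value5 > 1;
    }
}
```

Because every piece reads its inputs from the same row, the split needs no analysis of which values each piece depends on. The `&&` keeps the short-circuit behavior of the original if statement.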
Now, in whole-stage code generation, it’s not that simple. Like I said, one of the main benefits of whole-stage code generation is that we’re trying to keep the data in the CPU registers for as long as possible. In order to do this, we’re assigning the data to variables and then having the parent operators refer to those variables.
This is in comparison to the volcano iterator model, where we know that the only input to our function is going to be an internal row. Now, there is an undetermined number of variables that we could be referring to. So in whole-stage code generation, we need to figure out what these variables are and pass those to our split functions. One thing that we already do in whole-stage code generation is that, before we call our parent’s consume method, we store the output variables. So, let’s look at how that works for this query, and we’ll go through it.
So once again, we start off by calling the produce method until we reach the local table scan, and we set up this while loop. Here in this while loop, we realize that the scan is going to output a variable called key. So let’s save that, because we may have to refer to it later. Next, we call consume on our parent, which is the project. And here, key plus one is the expression. So, we realize that in the generated code for key plus one, we’re probably going to have to rely on that output variable from the scan.
And of course we do. And the reason that we knew our variable was key is because we saved it into the output variables. So once again, the output variable for this operator is this field, so we’re going to save that. Then we’ll call consume on our parent, and in this case, it’s a project operator with a case statement, which will also rely on that field. So now, once we generate the code, we will directly refer to that variable. And that’s how we save a parent’s inputs in whole-stage code generation. But now, let’s say we want to split out the case statement logic into its own function. Can we just pass all the output variables as the function parameters? If we did that, maybe it’s just as simple as the volcano iterator model. Well, it turns out there are a few problems with that approach. The first problem is that Java limits the number of parameters to a function. So if we blindly passed all the parameters, we’d likely hit this limit. Instead, we should try to pass only the parameters that are necessary to the function. The next problem is that some of the output variables will not have their code generated yet. For example, if the third operator was actually a filter, it may not want to evaluate all of the outputs of its child operator before evaluation. This is because in the filter operator, maybe we’d be able to get away with short-circuiting and avoid evaluating some of the variables. Finally, there are also some operators that rely on rows in addition to variables. So we need to keep those in mind as well.
So, how do we track down the whole-stage code generation inputs that we need? It turns out that, once we are in an operator, we can look at our expression tree to determine which variables we need. First, any evaluated variables referred to by the current or child expressions must be inputs to our split function. Next, any rows referred to by the current or child expressions must be inputs. And finally, any subexpressions must be inputs to the split function.
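One way to picture these three rules is a walk over the expression tree that collects every variable, row, and subexpression it refers to. The sketch below is a guess at the general shape, not Workday's implementation: the `Expr` and `Kind` types and the `requiredInputs` function are hypothetical, and it assumes every `VARIABLE` leaf has already been evaluated, which the talk notes is not always the case.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical model of finding the parameters a split function needs.
class SplitInputsSketch {
    enum Kind { VARIABLE, ROW, SUBEXPRESSION, LITERAL, CALL }

    static final class Expr {
        final Kind kind;
        final String name; // variable, row, or subexpression name; operator name for CALL
        final List<Expr> children;
        Expr(Kind kind, String name, Expr... children) {
            this.kind = kind;
            this.name = name;
            this.children = Arrays.asList(children);
        }
    }

    // Returns each required input once, in first-seen order.
    static List<String> requiredInputs(Expr root) {
        Set<String> inputs = new LinkedHashSet<>();
        collect(root, inputs);
        return new ArrayList<>(inputs);
    }

    private static void collect(Expr e, Set<String> inputs) {
        switch (e.kind) {
            case VARIABLE:      // evaluated variable from a child operator
            case ROW:           // row referred to by the expression
            case SUBEXPRESSION: // precomputed subexpression result
                inputs.add(e.name);
                break;
            default:
                break; // literals and calls contribute no inputs themselves
        }
        for (Expr child : e.children) {
            collect(child, inputs);
        }
    }
}
```

Deduplicating the names keeps the parameter list short, which matters given the limit on the number of parameters mentioned above.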
So, we tracked this down and implemented it for case statements, and this is what it looks like when you do that. We can see here that, instead of the logic for the case statement, we just call a function and pass in a few parameters. By doing this, we’re able to greatly reduce the size of our whole-stage code generated function and have it call other functions.
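The effect on the generated loop can be illustrated with a hand-written Java sketch. This is not the generated code from the talk: the names, the three branches, and the string results are made up, and chaining two helper functions that return null when no branch matches is an assumption about how a very long case statement could be chunked.

```java
import java.util.ArrayList;
import java.util.List;

// Hand-written imitation of moving case-statement logic out of the main loop.
class SplitCaseSketch {
    // Inlined form: every branch sits inside the loop body.
    static List<String> runInlined(int[] keys) {
        List<String> out = new ArrayList<>();
        for (int key : keys) {
            int field = key + 1;
            String result;
            if (field == 1) {
                result = "one";
            } else if (field == 2) {
                result = "two";
            } else if (field == 3) {
                result = "three";
            } else {
                result = "other";
            }
            out.add(result);
        }
        return out;
    }

    // Split form: the loop only calls helpers and passes the one variable
    // the case statement actually needs.
    static List<String> runSplit(int[] keys) {
        List<String> out = new ArrayList<>();
        for (int key : keys) {
            int field = key + 1;
            String result = caseWhen_0(field);
            if (result == null) {
                result = caseWhen_1(field);
            }
            if (result == null) {
                result = "other"; // else branch
            }
            out.add(result);
        }
        return out;
    }

    // Each helper holds a chunk of branches and returns null if none matched.
    private static String caseWhen_0(int field) {
        if (field == 1) return "one";
        if (field == 2) return "two";
        return null;
    }

    private static String caseWhen_1(int field) {
        if (field == 3) return "three";
        return null;
    }
}
```

The loop body stays small no matter how many branches there are, and each helper can be kept under the size limits on its own.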
When this happens, we’re able to avoid the exceptions due to exceeding 64 kilobytes of bytecode and avoid the memory cost of compiling a huge function. JIT also will not be turned off, since we won’t hit that eight kilobyte bytecode limit. So, let’s look at the performance of our whole-stage code generation for case statements now that we have implemented this splitting logic.
So first, the performance setup. We had one driver that had 12 gigabytes of memory with one core. Then we had three executors. Each had 120 gigabytes of memory with 28 cores, and our dataset had 50 million input rows.
So, we had a simple project that had one case expression with 3000 branches, and we ran it against three different builds. The first one was whole-stage code generation in default Spark. Next, we turned off whole-stage code generation to see the performance in the volcano iterator model. Finally, we ran whole-stage code generation with our custom splitting logic. The reason we don’t see a result for just vanilla whole-stage code generation is because it ran out of memory in the compiler and the query failed. So that’s just not a good option. We see that expression code generation takes about 740 seconds for this. And finally, whole-stage code generation with our splitting logic only took 430 seconds. So, we saw almost a 2x performance improvement by using whole-stage code generation with splitting. So, to quickly recap what we went through today. We started off with a basic introduction to Spark SQL. Then we talked about two different query execution models, the volcano iterator model and whole-stage code generation.
Next, we looked at some of the problems of whole-stage code generation that we ran into at Workday, and how splitting code generation functions helps to mitigate these problems. We looked at the differences in splitting functions between expression code generation in the volcano iterator model and whole-stage code generation. Finally, we looked at the performance of whole-stage code generation after implementing the splitting. Thank you for coming to this talk.
Michael Chen is a Software Development Engineer at Workday, working on the Prism Analytics product. Michael received his Bachelor's in computer science from the University of Michigan.