In this talk, I will dive into the stage level scheduling feature added to Apache Spark 3.1. Stage level scheduling extends Project Hydrogen by improving big data ETL and AI integration, and it also enables multiple other use cases. It is beneficial any time the user wants to change container resources between stages in a single Apache Spark application, whether those resources are CPU, memory or GPUs. One of the most popular use cases is enabling end-to-end scalable Deep Learning and AI to efficiently use GPU resources. In this type of use case, users read from a distributed file system, do data manipulation and filtering to get the data into a format that the Deep Learning algorithm needs for training or inference, and then send the data into a Deep Learning algorithm. Using stage level scheduling combined with accelerator aware scheduling enables users to seamlessly go from ETL to Deep Learning running on the GPU by adjusting the container requirements for different stages in Spark within the same application. This makes writing these applications easier and can help with hardware utilization and costs.
There are other ETL use cases where users want to change CPU and memory resources between stages, for instance when there is data skew or when the data size is much larger in certain stages of the application. In this talk, I will go over the feature details, cluster requirements, the API and use cases. I will demo how the stage level scheduling API can be used by Horovod to seamlessly go from data preparation to training with the TensorFlow Keras API on GPUs.
The talk will also touch on other new Apache Spark 3.1 functionality, such as pluggable caching, which can be used to enable faster dataframe access when operating from GPUs.
Tom Graves: Hello everyone. I’m Tom Graves. I work at NVIDIA on the Spark team. I’m also a Spark PMC member and committer. I’ve been working on Spark for 10 years. I’m going to go over the new stage level scheduling feature in Spark 3.1, and how it can be used to improve big data and AI integration. So I will first go over resource scheduling in Spark as it is today, and then I’m going to go into the details of the stage level scheduling feature, the implementation details, API, and so forth. I’m going to go over a use case example and then get into a demo.
So resource scheduling on Spark today. So currently, the way that you request resources from Spark is that you specify the driver resources, the executor resources, and then the task requirements that you need for each task. A lot of these, you end up just using the defaults on, so you might not realize you’re requesting these, but you are. So on the driver, you can specify cores, memory, and then any additional resources like accelerators, GPUs, FPGAs, et cetera. On the executor side, you kind of have more options, and it kind of depends on the environment and use case that you’re using. If you’re using PySpark, for instance, you might be asking for memory for your PySpark, or if you’re running on YARN, you have to specify memory overhead, for instance. You can also specify accelerators for executors, and then you also specify your task resource requirements.
And here is where one of the main defaults comes in that people might not realize. By default, you get one CPU per task. And so this means that if you use an executor with four cores, you’re going to get a parallelism of four. So four cores with one CPU per task, you end up with four tasks on that executor. In Spark 3.0, we added accelerator aware scheduling, which adds another task requirement. And so in Spark 3.0, it still requires the CPUs to be the limiting resource. This was due to some scheduling details, but with Spark 3.1.1, the accelerators can now be the limiting resource. The reason that I talk about this is it affects the way that we implemented the stage level scheduling feature.
So a little bit more concrete example here in the configs that you actually use. So for cores, you can specify the driver cores, executor cores, and so forth. And memory, similarly, these are the configs. You might be using these configs, or there’s also command line options, like if you’re using spark-submit, there’s --executor-cores, for instance. Either way, you’re doing the same thing when you’re asking for executors of a certain size, for instance. And then here’s an example of the accelerator aware configs that you specify. So if you’re asking for a GPU, for instance, you can specify spark.executor.resource.gpu.amount equals one. And then here, you have to actually specify the task resource requirements for those GPUs. It doesn’t have a default setting, so you have to specify it. This example here, where it’s 0.25, means that I actually want four tasks to run on one GPU. And then, like I mentioned before, the number of tasks that you get per executor is basically your executor resources divided by the task requirements.
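As a rough sketch of the arithmetic just described, the number of concurrent tasks per executor can be worked out in plain Python. The config keys are standard Spark settings; the helper name `tasks_per_executor` and the chosen values are illustrative assumptions, and this is not Spark's actual scheduler code.

```python
import math

# Example settings as they might be passed with --conf (values are illustrative).
conf = {
    "spark.executor.cores": "4",
    "spark.task.cpus": "1",                     # Spark's default: one CPU per task
    "spark.executor.resource.gpu.amount": "1",
    "spark.task.resource.gpu.amount": "0.25",   # four tasks share one GPU
}


def tasks_per_executor(conf):
    """Executor resources divided by task requirements, limited by
    whichever resource runs out first (hypothetical helper)."""
    cpu_slots = int(conf["spark.executor.cores"]) // int(conf["spark.task.cpus"])
    gpu_slots = math.floor(
        float(conf["spark.executor.resource.gpu.amount"])
        / float(conf["spark.task.resource.gpu.amount"])
    )
    return min(cpu_slots, gpu_slots)


print(tasks_per_executor(conf))  # 4
```

Raising `spark.task.resource.gpu.amount` to 0.5 in this sketch drops the result to two, which is the case where the accelerator rather than the CPU is the limiting resource.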
Okay. So stage level scheduling. So what is stage level scheduling and why do I need it? So many applications in Spark have distinct stages where you potentially want different resources per stage. The typical use case here that we’re targeting is the ETL to ML use case. So in your ETL stage, you might be processing a large amount of data, so maybe you want lots of cores and lots of executors to process that quickly. And then once you’ve preprocessed it, you’re going to send it into your ML application. Well, on the ML side, maybe you’re using TensorFlow where you want a GPU and you don’t need as many cores because the GPU is doing the processing. So at that stage, you want to get different containers that have GPUs, for instance. And so before this feature, Spark didn’t really have any way to do this within a single application, at least. So this feature allows you to specify resource requirements at the stage level to give you that flexibility.
So the stage level scheduling feature, you can find more information in SPARK-27495. The resource requirements are specified at the RDD level. We did the RDD level because it’s more straightforward, it’s a little easier to understand for the user, and the ETL to ML use case kind of hits this. Usually you do your ETL and then you’re going to change to do a mapPartitions and send your data to TensorFlow. So the RDD operation there makes sense. The DataFrame side of things is a bit more complex because of the optimizer and the way it can actually kind of hide stages from users. It’s a little bit less obvious to the user where to apply these. It also requires dynamic allocation, and Spark will handle basically giving you new containers that meet your requirements whenever that RDD is processed, and it handles assigning tasks appropriately.
So the benefits of this are hardware utilization and cost. So again, the only way to do this currently without this feature is to allocate the resources upfront. So if I need a GPU for my ML side, I allocate it for the entire job. So if I had a long ETL job, I might be wasting those ETL resources, or sorry, the GPU resources during that whole ETL phase. And the other way you can do this is split your application in two. So one application does the ETL side and I don’t request a GPU. And then the other application does the ML side, where I request a GPU. That kind of complicates things and makes your pipeline less obvious. So with this feature, you don’t have to do that splitting.
Use cases. So really it’s beneficial anytime that you want to change your container resources between stages. Some examples of that are, like I said, the ETL to ML use case. If you have large data in certain stages, or you have skewed data, perhaps you want to get more memory for your container during those stages so you’re not spilling to disk and you get better performance. Another use case is if you’re doing caching. So perhaps you’re doing a bunch of ETL upfront, and then later, you’re going to cache the data and process that data over and over again, right? Whether it’s ML or graph processing. So when you cache the data, you may want to get executors with larger memory so that you can fit all of that into the cache.
The resources supported with stage level scheduling. So on the executor side, it handles cores, off-heap memory, heap memory, PySpark memory and memory overhead, and then any additional resources like GPUs or FPGAs, and then the task requirements again are the CPUs and then any additional resources like GPUs.
Requirements. So to use this, you need to use Spark 3.1.1 or higher. It requires dynamic allocation and either the external shuffle service or the shuffle tracking feature. That is how it gets new containers, basically, that match the resource profile that you’ve requested. It’s currently supported on YARN and Kubernetes. It’s only supported with the RDD API, like I mentioned, and it has Scala, Java and Python interfaces.
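The prerequisites listed above could be captured as a small settings helper. The config keys are standard Spark settings; the function name `stage_level_scheduling_conf` is a hypothetical helper for illustration, and a real deployment would likely need further cluster-specific options.

```python
def stage_level_scheduling_conf(cluster_manager):
    """Return the dynamic allocation settings stage level scheduling relies on
    (illustrative helper, not part of Spark)."""
    conf = {"spark.dynamicAllocation.enabled": "true"}
    if cluster_manager == "yarn":
        # YARN path: executors can be released because the external
        # shuffle service keeps serving their shuffle files.
        conf["spark.shuffle.service.enabled"] = "true"
    elif cluster_manager == "kubernetes":
        # Kubernetes path: no external shuffle service, so Spark tracks which
        # executors still hold shuffle data before releasing them.
        conf["spark.dynamicAllocation.shuffleTracking.enabled"] = "true"
    else:
        raise ValueError(f"unsupported cluster manager: {cluster_manager}")
    return conf


print(stage_level_scheduling_conf("yarn"))
print(stage_level_scheduling_conf("kubernetes"))
```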
So some implementation details. So like I mentioned, it’s going to get a new container for every new resource profile. It will not try to fit other resource profiles into containers that don’t match it. So let’s say I create one resource profile with the requirements for one of my RDDs, and let’s say it has two cores and two GPUs. Then I create another profile that has one core and one GPU. It’s going to get brand new containers for that second resource profile. It’s not going to try to fit that into the first resource profile’s containers. And that was done just for simplicity, and so that we’re not wasting resources. This is something that we want to look at implementing in the future. Another detail is unused containers. So say you go to a new stage and you use a different resource profile, that’s going to get new containers. The containers from the previous stage will just idle timeout with the dynamic allocation rules. We don’t explicitly kill them.
Currently if you’re doing a join and you’re combining two RDDs together, we only support one of those RDDs having a resource profile associated with it. It will actually throw an exception if there’s two. There is a config that will allow this so that you can put a different resource profile on each RDD. And what we end up doing is we just take the max of each resource in those resource profiles and combine them to make a new resource profile. And so if you want that functionality, you can turn it on, but if it’s something you’re not expecting, then it will throw by default.
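The "take the max of each resource" rule can be illustrated with a short plain-Python sketch. The dictionaries and the `merge_max` helper are illustrative assumptions, not Spark's internal representation; in Spark the behavior is switched on with the `spark.scheduler.resource.profileMergeConflicts` setting, which defaults to false.

```python
def merge_max(profile_a, profile_b):
    """Combine two {resource_name: amount} dicts by keeping the larger
    amount of each resource (illustrative stand-in for the merge rule)."""
    merged = dict(profile_a)
    for name, amount in profile_b.items():
        merged[name] = max(merged.get(name, 0), amount)
    return merged


# Hypothetical executor requirements attached to the two RDDs being joined.
rdd1_executor = {"cores": 2, "memory_mb": 6144, "gpu": 2}
rdd2_executor = {"cores": 4, "memory_mb": 4096}

print(merge_max(rdd1_executor, rdd2_executor))
# {'cores': 4, 'memory_mb': 6144, 'gpu': 2}
```

Note that the merged profile can be larger than either input, which is one reason the merge is opt-in.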
On the YARN side, we use the external shuffle service and dynamic allocation. Another implementation detail here is YARN doesn’t let you request containers with different resources at the same priority. And so the way that we handle this is we use the resource profile ID as the priority on YARN. Lower numbers are higher priority. So that generally means resource profiles you’ve created earlier kind of have a higher priority. Normally within a single application, this isn’t an issue, but if you’re running a job server type scenario, you may want to keep this in mind, because it might affect how YARN hands out containers. On YARN, GPU and FPGA are predefined. For other resources you have to specify additional configs, the spark.yarn.executor.resource configs, for instance, and those other additional resources you specify do not propagate to other resource profiles. The reason is that we don’t have any way to override them in the resource profile and stage level scheduling APIs right now. Also with YARN, the discovery script must be accessible and sent with your job.
On the Kubernetes side, it requires the shuffle tracking feature to be enabled. So the dynamic allocation shuffle tracking enabled config, and this is in order to use dynamic allocation, basically. One thing to keep in mind here is that since it’s shuffle tracking, if your executors from an older resource profile have shuffle data on them, they’re not going to idle timeout. So this could mean that you’re using more resources than you expect. You can kind of control that with the config here, the dynamic allocation shuffle tracking timeout, but you want to be a little careful because you don’t want to time out executors that have shuffle data that you’re actively using, because then you’ll just have to reprocess it. Another implementation detail here is the pod templates in Kubernetes. So you can specify resources in your pod template as well, and that will only be used with your default profile. Those will not be carried over to other resource profiles. And again, similarly, there’s no way to override them, so we don’t carry them along, and you’d have to specify all the resources you need in the resource profile you’re creating.
So here’s how you can find information if you’re using stage level scheduling. So here’s the executors page on the UI for Spark. So here, I’ve created executors with two cores, one GPU, and allow two tasks per GPU. You’ll see here, there’s a “show additional metrics” twisty, and you can select resources and resource profile ID. And then you can see what resource profile is associated with each of your executors. And then you can also go to the environment page, and here it’ll show you details of what resources and requirements are associated with that resource profile. So here we have profile ID zero, which is actually just the default profile. So you’re always going to have this one, and it just matches the configs that you specified when you launched your application. And so you’ll see the executor requirements and the task requirements.
So on the API side, so this is the Scala interface. So we import our ExecutorResourceRequests, our ResourceProfileBuilder and our TaskResourceRequests. The resource profile builder is the main thing that you’re going to start with, and that allows you to build up the executor and task requirements you want for the RDDs, for instance. And this is a mutable thing, so you can change it multiple times if you need to. So in this example, I created a new ResourceProfileBuilder. I created a new ExecutorResourceRequests and a new TaskResourceRequests. Then here, I’m specifying the executor requirements. So I’m saying I want each executor to have four cores, six gigs of memory, and two gigs of overhead, as well as two GPUs each. And then I’m saying each task requires four CPUs and two GPUs. So basically I can run one task on each of these executors.
So then I say the resource profile builder requires the executor requirements and then the task requests, and then I build it. So once I’ve built it, I get a resource profile out, and that resource profile can be associated with the RDD using the withResources API. And at that point, Spark will know that whenever this RDD is executed, it needs to get containers that match that resource profile. So in this example here, I’m doing an ML mapPartitions over the data where I’d feed that data to ML and get the results. So at that point, when the collect is executed is when Spark would go off and get new containers if it needs them.
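The slides show the Scala interface; the same steps might look roughly like this with the Python interface in `pyspark.resource`. This is a sketch under assumptions: the function names `build_ml_profile` and `run_ml_stage` and the `feed_to_ml` callback are hypothetical, and the code needs a Spark 3.1.1+ cluster with dynamic allocation to actually acquire executors.

```python
# Requirements from the walkthrough: 4 cores and 2 GPUs per executor,
# 4 CPUs and 2 GPUs per task.
EXECUTOR_CORES, EXECUTOR_GPUS = 4, 2
TASK_CPUS, TASK_GPUS = 4, 2


def tasks_per_executor():
    """How many of these tasks fit on one executor at once."""
    return min(EXECUTOR_CORES // TASK_CPUS, EXECUTOR_GPUS // TASK_GPUS)


def build_ml_profile():
    """Build the resource profile with PySpark's stage level scheduling API."""
    # Imported here so the numbers above can be checked without pyspark installed.
    from pyspark.resource import (
        ExecutorResourceRequests,
        ResourceProfileBuilder,
        TaskResourceRequests,
    )

    ereqs = (
        ExecutorResourceRequests()
        .cores(EXECUTOR_CORES)
        .memory("6g")
        .memoryOverhead("2g")
        .resource("gpu", EXECUTOR_GPUS)
    )
    treqs = TaskResourceRequests().cpus(TASK_CPUS).resource("gpu", TASK_GPUS)
    # In PySpark, `build` is a property rather than a method call.
    return ResourceProfileBuilder().require(ereqs).require(treqs).build


def run_ml_stage(rdd, feed_to_ml):
    """Attach the profile to `rdd`; new executors are only requested
    once the collect() action triggers the stage."""
    profile = build_ml_profile()
    return rdd.withResources(profile).mapPartitions(feed_to_ml).collect()


print(tasks_per_executor())  # 1
```

Given an existing RDD, a call such as `run_ml_stage(etl_rdd, train_partition)` (both names hypothetical) would run the ML stage on executors matching the profile.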
So here’s a screenshot after I’ve used the resource profile. So if you look at the resource profile ID column there, you’ll see that it got two new executors with resource profile ID one. It has four cores and two GPUs. So it matches what I requested. And then again, if you look at the environment page, you’ll see the mapping of resource profile ID one to the actual resources that I requested. So some further API. So if you just want to see what your builder has or what’s associated with your RDD, there are APIs to do that, right? This top one’s the resource profile builder, and it’s showing you the executor and task resources associated with it. And then I had this ML RDD, and you can use getResourceProfile, and it’ll show you any profile associated with an RDD. So in this case, this is profile ID one, and then it has all the associated resources.
So like I kind of mentioned before, the builder and executor and task resource requests are all mutable. So you can change them as many times as you want. You might start out with one profile and you just want to tweak something. You can use the same builder for that. But as soon as you say “build” on that, the resource profile you get back is immutable. So you can’t change it. We don’t want you to associate that with an RDD and then have it change and weird things happen. So when you say “build,” it creates a concrete resource profile, and you get a unique ID for that, along with whatever executor requirements and task requirements were associated with it. In this example here, we’re asking for two cores per executor, six gigs of memory, two gigs of overhead, and two GPUs each, and we’re saying each task needs one core and one GPU. So I tell it to require that. I build it.
And then you’ll see here, we get resource profile ID two, and then the corresponding resources. So I have two cores per executor. And if you look at the task, it’s one CPU and one GPU. I can then just change the task resource requirements. So here I just say, “Okay, well now I want two CPUs and two GPUs.” I tell the builder I require it and build it. And then I get a new profile with ID three, and you’ll see the task resources have been updated here and it kept the existing executor resources.
Okay. A use case example: an end to end pipeline. I’m going to give you an overview of a couple of components I’m using in this first, and then I’ll go into detail and give a demo of this. So the ETL, using the RAPIDS Accelerator for Spark. So on the ETL side, I’m going to give you a brief introduction to something else at NVIDIA we’ve been working on, which is the RAPIDS Accelerator for Spark. So the RAPIDS Accelerator for Spark allows you to run Spark on a GPU to accelerate the processing. So underneath, this uses a library called RAPIDS cuDF, which is a CUDA DataFrame library. This library implements all your ETL operations, like your joins and your hash aggregates and filters and so forth. And so we combine that with Spark to accelerate processing.
So this is built on the SQL and DataFrame APIs, and the idea is that it’s seamless for the users. So we don’t want the users to have to change their code. They can specify a config to turn this on and send the necessary jars. And then we’ll just run whatever we support on the GPU to accelerate the processing, and if we don’t support it, it just falls back to the CPU like it would normally run. So this does require Spark 3.0 or newer. And it also has a built-in UCX shuffle component, which can be used to accelerate shuffle.
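Turning the accelerator on might look like the following settings, expressed here as a Python dictionary rendered into spark-submit flags. The plugin class and the `spark.rapids.sql.enabled` switch come from the RAPIDS Accelerator documentation; the jar path is a hypothetical placeholder, and the exact jars required depend on the release being used.

```python
# Hypothetical jar location; the real file name depends on the release you download.
RAPIDS_JARS = "/opt/sparkRapidsPlugin/rapids-4-spark.jar"

rapids_conf = {
    "spark.plugins": "com.nvidia.spark.SQLPlugin",  # loads the accelerator plugin
    "spark.rapids.sql.enabled": "true",             # "false" keeps everything on the CPU
    "spark.jars": RAPIDS_JARS,                      # ships the plugin jar with the job
}

# Render the settings as spark-submit style flags.
submit_flags = " ".join(f"--conf {key}={value}" for key, value in rapids_conf.items())
print(submit_flags)
```

No application code changes are involved in this sketch, which matches the goal of keeping the plugin seamless for users.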
So the ETL stack for this, so if we’re looking at the right side here, we have Spark DataFrames, Scala, and PySpark. It’s built upon Java. And then, so we wrote some JNI bindings to let us call into the cuDF C++ library, which is the thing that actually uses CUDA libraries to do the GPU operations. Little bit more detail here. So here we have the Apache Spark core, and then we have our plugin. So this plugs into Spark, and if you’re using the SQL or DataFrame API, we look at the operation you’re doing and say, “Is this a GPU enabled operation?” If so, we can use the JNI layer to call into RAPIDS. And if not, it will just run on the CPU like normal. On the right hand side here, we just have the Spark shuffle component. Again, it has a JNI bindings layer that would call into the UCX libraries.
A little bit more detail on the compilation flow. So if you have your queries up top here, your DataFrame or SQL API, you’re going to get a DataFrame out. And the way Spark internally handles this is it ends up creating some plans and optimizing those plans. So you start out with a logical plan. It does some optimizations on that, and then you can get a physical plan out. The physical plan is what you’re actually going to execute. And this is where our plugin plugs in. And we look at those physical operators and say, “Is this something we can do on the GPU? If so, we can replace it with the corresponding GPU operation.” And then we also handle any transitions. So on the GPU side, we’re dealing with columnar data. So you get an RDD of ColumnarBatch back out. But if you’re going to send that back to the CPU, for instance, we convert that back to InternalRow for you.
So this is an example of some of the performance we’re seeing. This is an adaptation of an industry standard benchmark derived from TPC-DS. So this is query 38. In this case, the entire query is GPU accelerated, so everything’s running on the GPU. This is running on AWS, so we’re comparing an eight node CPU cluster here to an eight node GPU cluster. And you can see our query time is roughly three X improved with the accelerator. So it was 163 seconds on the CPU, and 53 seconds on the GPU. We also save cost here. So the CPU side was about 20 cents. And with the GPU and the runtime, it was only 9 cents. So we’re seeing some pretty good benefits from the plugin.
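For reference, the speedup and cost figures quoted above work out as follows. This is plain arithmetic on the numbers from the talk, not a reproduction of the benchmark.

```python
# Figures quoted in the talk for query 38.
cpu_seconds, gpu_seconds = 163, 53
cpu_cost_cents, gpu_cost_cents = 20, 9

speedup = cpu_seconds / gpu_seconds                # roughly 3x faster
cost_saving = 1 - gpu_cost_cents / cpu_cost_cents  # fraction of cost saved

print(f"speedup: {speedup:.2f}x, cost saving: {cost_saving:.0%}")
# speedup: 3.08x, cost saving: 55%
```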
So on the deep learning side, I’m going to give a brief introduction to Horovod, because I’m using that in my demo. You can find more information about this in the talk we gave at the last summit. So Horovod is a distributed deep learning training framework. You can use it to run TensorFlow, Keras, PyTorch, or Apache MXNet. And once you have it set up, it’s fairly easy to switch between those. It also has some high-performance features like NCCL, GPUDirect, RDMA, and Tensor Fusion. It’s pretty easy to use, just a few lines of Python. It’s open source and it’s easy to install. So I’ll be using this in the demo for the deep learning side example.
I’m going to give a demo of the stage level scheduling API in action. So this is the Horovod Keras Spark Rossmann Estimator example. You can find this in the Horovod repo, but this is a typical use case where you’re reading from a file. You’re going to do some pre-processing on the data before you send it into TensorFlow to do some training. So in this demo, we’re running on Dataproc. I’m running on a Hadoop 3.2 cluster using Spark 3.1.1, and I’m using a Jupyter Notebook here. So these parameters here are what I started the Jupyter Notebook and Spark with. So for each executor, we’re requesting 24 gigs of heap, four gigs of overhead memory, six cores, and one GPU. And then it sets up the task requirements so that it can run six tasks per executor. The reason that I requested a GPU here in the ETL phase is because we’re using the RAPIDS Accelerator for Spark. So that’ll allow any ETL operations we support to run on the GPU to help accelerate the ETL side.
If you weren’t using the RAPIDS Accelerator for Spark, you wouldn’t normally request a GPU here; you would wait and use the stage level scheduling API later to request that GPU so that you’re not wasting it. So if we go over and look at the Spark UI, this is the executors page, and you can see that it’s gotten executors with the default kind of parameters that I requested. So it’s got six cores, one GPU. The resource profile ID is zero here, and that’s just the default profile you get whenever you start Spark, and it just uses the configs.
So going back to our example, we see that this starts the data preparation. We set up some configs for the demo. This is reading from a CSV file, and then it’s going to do some DataFrame manipulation. So it’s going to do your typical joins and filters and adding columns and so forth. If we go back to the Spark UI and look at the SQL page, we can actually see it using the RAPIDS Accelerator as well. So normally in this plan, these would all be scans and filters and so forth. The things with GPU in front of them, so GPU filter, are things that were accelerated by the GPU.
So continuing on with our example, this is setting up both a train DataFrame and a test DataFrame. If we scroll down to where we actually start the training, that’s where the stage level scheduling API comes in. So here we’re going to do the training. So we store the data, we set up the Spark backend, set up the Horovod Keras estimator. And within the Horovod code, I’ve modified it to use the stage level scheduling API. So this is the code, and here, we created a resource profile builder, and then I add executor and task resource requirements. So for each executor, I’m going to request one core, 10 gigs of heap memory, two gigs of overhead, and one GPU. And then each task will require one CPU and one GPU. So basically it allows one task to run on each of these executors.
We build that, and then we’re going to pass that into the RDD withResources API to associate that profile with that RDD. This tells Spark that whenever that RDD is executed, it needs to go and get new executors that match that resource profile. And so basically when this Keras estimator fit is called, Spark will go off, use dynamic allocation, and get new executors.
So if we go over to the Spark UI and refresh it, we will see that here, it’s got new executors. The resource profile ID on those executors is one. And we can see that, yeah, each of these executors has one core and one GPU. And here it’s actually finished the training already. So if we go back to the page, we should see that in the output, and we can kind of see here that it did run on the GPU and it used CUDA 11. If we scroll to the bottom, we can see that the model’s generated and it’s doing some final prediction. So this is a typical use case that we’ve targeted stage level scheduling for. Here, your ETL phase might use quite different resources than your training phase, and so this allows you to switch those and save resources between those stages. So one stage might use a lot of CPUs, for instance, and your training stage might use a lot of GPUs, or you might just be requesting different amounts of memory, depending on what you’re doing.
So on future enhancements we want to look at for stage level scheduling. So the main one is actually, we want to collect feedback from users. So we’d love people to go use it and tell us what they think and what they are wanting from it. And then there are also other things that we know of, like specifying certain configs in the resource profile. So there’s the dynamic allocation configs like the max and mins. That’s a global setting right now, and we want to be able to change that on different stages. We also want to look at the fitting of new resource profiles into existing containers, because there’s definitely some use cases where that would be useful. We want to do some cleanup. And then theoretically, Catalyst internally could use this. So if that optimizer realizes that it needs a container of a certain size, theoretically, it could use this internally. That would be quite a ways down the road, though.
So other performance enhancements in Spark 3.1. So we made the caching format pluggable, so I’ve been wanting this for a while, but we’re also using this specifically on the GPU side. So the format currently in Spark is columnar. It’s fairly costly to write, but then it’s very efficient to read. So if you want to try playing with that format to make other things efficient, you can now do that, and it’s pluggable. Like I said, we’re working on a GPU specific format that will be efficient when you’re doing GPU operations. So any questions, please feel free to ask them. I’ll be online for a while. And thank you for coming. Please give your feedback and rate and review the session.
Thomas Graves is a distributed systems software engineer at NVIDIA, where he concentrates on accelerating Spark. He is a committer and PMC on Apache Spark and Apache Hadoop. Previously worked for Yaho...