Improving Apache Spark for Dynamic Allocation and Spot Instances

May 28, 2021 11:05 AM (PT)

This presentation will explore the new work in Spark 3.1 adding the concept of graceful decommissioning and how we can use this to improve Spark’s performance in both dynamic allocation and spot/preemptable instances. Together we’ll explore how Spark’s dynamic allocation has evolved over time, and why the different changes have been needed. We’ll also look at the multi-company collaboration that resulted in being able to deliver this feature and I’ll end with encouraging pointers on how to get more involved in Spark’s development.

In this session watch:
Holden Karau, Software Engineer, Apple



Holden Karau: Hey, everyone. I’m really excited that you’re here for Improving Spark for Dynamic Allocation and Spot Instances. I’m Holden Karau. You can follow me on Twitter, it’s just my full name. My preferred pronouns are she or her, it’s tattooed on my wrist. I’d show you, but it turns out that I also broke this wrist. It’s kind of a pain to show. I’m on the Spark PMC, which is really awesome. I’m really honored to be a part of the Spark community in this way. And I also contribute to a bunch of other Spark projects.
I’m a co-author of a few books about Spark and some other books as well. I, of course, think these books are wonderful, but I might be a little bit biased. In addition to writing books, I also do a bunch of YouTube videos, mostly around Spark, but I also cover some other tools like Dask, and some Kubernetes stuff. And so if you’re interested in seeing those videos, definitely check out my YouTube.
In addition to the professional-related stuff, I’m trans, queer, Canadian, in America on a green card. Finally, it took a while. And part of the broader leather community. You might be wondering why would I bother bringing this up in a data talk? And it’s because I think it’s really important for us to talk about where we’re all from, because we perhaps have some opportunities to improve the diversity of our communities. And part of that is going to involve talking about where we’re from, seeing what sort of people aren’t being represented in our conferences, or in our PMCs or committer groups. And so I think it’s really important for us to all talk about our backgrounds, and make sure that we create a diverse community so that we can solve problems that are fun and make the world a better place, which is presumably why some of you are here.
With that slight segue, I want to start at the beginning of why this is a problem, and why I decided to work on this. Spark is really wonderful and it achieves resilience by recomputing anything when we have failures, right? This is compared to the classic model that Spark replaced, MapReduce, where we achieved resilience by writing data out multiple times. And this is really great, right? Because writing data to disk three times is slow, compared to just storing it in memory. And if anything goes wrong, you just recompute it.
The problem is when executors are removed that contained data. And in traditional MapReduce world, that wasn’t a thing that mattered because there was no data inside of the executors that mattered. Everything was stored in that persistent storage layer. So we could remove executors all we wanted, either for dynamic allocation or if we were running on spot instances, it was fine.
This is different with Spark, and we solved it for the cluster managers that we used to use or, sorry, still use today: YARN and Mesos. And many, many moons ago, I drank a lot of coffee and I came up with an alternative idea. But it turned out that we didn’t really need this alternative idea that I came up with to solve this problem. It didn’t really go anywhere despite all of the delicious, delicious coffee that I drank. It just kind of fell flat on its face. And I closed the Google Doc and forgot about it. And you might be going like, “Holden, this is a very long story”, but I assure you we’ll get to the code soon enough.
A few years after I’d given up on these changes, the Cloud became really very popular, and then Kubernetes also became really popular. And then everything caught on fire, mostly inside of Spark. What I mean by everything caught on fire is that our traditional model of achieving resilience, while it was really, really fast, had some substantial downsides. We found ourselves spending a lot of time recomputing data. That’s not a pleasant fire.
Thankfully, I had a lot more coffee again, and I remembered that I’m very lazy. So whenever I’m faced with a problem, the first thing that I do is go, “Hey, I bet someone else has had this problem before. And maybe they’ve solved it.” It turns out that I’d had this problem before and I thought about it. I found the search results in my own search history. And I was like, “Oh, okay, cool. I guess I’m going to work on this thing.”
I started writing some code and then I got hit by a car. I want to say, getting hit by a car is not a lot of fun. Do not recommend. I broke both of my wrists and spent a week at SF General, and some more time in a nursing home. Really, really nice people there. I got to say, but still, not a pleasant life experience. The really nice thing though, is people kept writing code even though I was gone. So, that was cool.
And then, we ended up having a vote to see if this was the kind of solution that we wanted to have. If everyone agreed this was what we wanted to do? And thankfully, everyone agreed. Then we wrote a lot more code. And then, in theory, everyone lived happily ever after. But in actual fact, there’s a lot more work to do and I would really, really love your help.
With that being said, for a little bit more context, how did we do dynamic allocation on YARN? Scaling up with a model like Spark is relatively easy, right? Our problem only occurs when executors are going away, whereas adding more resources is A-OK. It’s very much like budgets: adding more money to a problem, mm-hmm [affirmative]; taking the money away, [negative] very sad. So for scale down in YARN, how we handled this is we had a stay-resident program on each node in the YARN cluster that would serve the files left behind by any executor that may have exited or been killed.
This worked out really well because Spark stored its shuffle data as files. It did of course lose data that was stored only in memory. Some types of data were cached in memory and not actually written out to disk. And so, that was still lost, but the shuffle files tended to be the most expensive parts. This was a really good solution, and it didn’t require huge architectural changes. And so, that’s how we did it on YARN. The problem though, is if we deployed YARN on top of the Cloud. Ooo, [affirmative] spot and preemptible instances give you really big cost savings, like around 50%. The exact details, of course, change depending on your vendor, talk to your vendor.
But, if we think back to the solution that we had for this problem in YARN, of leaving a stay-resident program, we can’t leave a stay-resident program if our entire computer is going away. Right? The Cloud had some strong limitations with this. We needed to find another way to make sure that data was still available and we didn’t have to recompute it.
Okay. Let’s say that makes sense for the Cloud, but what about Kubernetes? Surely, if we were deploying Kubernetes on-prem, we wouldn’t have to worry about this. And it turns out that even if we’re deploying Kubernetes on-prem and we’re not using spot instances, we do have to worry about this. That’s because Kubernetes isn’t super down with the idea of a stay-resident program on every node that serves files. And it turns out that in the interim, for a combination of reasons, including general security concerns and quota management, people were much less excited about the idea of setting up shared disks that span jobs and users. That also limited the ability to essentially repurpose the YARN solution for Kubernetes. For one thing, Kubernetes didn’t support it very well architecturally, and for another thing, the security people weren’t super excited about it anymore.
And just for a little bit of context here, I got hit by a car while I was riding a scooter. And actually this is me being very happy and excited in front of a bunch of scooters. I still love scooters, but…and these scooters were at KubeCon and…somewhere, Italy, I think? A lot of fun. I really look forward to the day that Data + AI Summit comes back to in-person events and I can actually see all of you in person. This is nice, but I miss catching up with you all in person. And getting a chance to ride scooters in foreign countries.
Okay. That’s not particularly related, but close enough. How did we solve this problem? We copied the data, and we’re going to look at the code very, very shortly. I promise.
But the first thing that we’re going to do is we’re going to look at this JIRA issue right here. I said we’re going to get to the code really soon. And I promise we are, but I want to take just a super, super brief detour to the JIRA issue. That has a lot of the code inside of it. That’s because the solution to this whole problem is copying the data. Copying the data sounds like a relatively simple thing to do, right? Especially if you weren’t super familiar with the internals of Spark, you might be like, “Oh yeah, that makes sense. I’ll just cp, that’s all I have to do.” But it turns out that copying the data is actually…pretty complicated. And we can see that the simple task of copying the data was broken up into these 23 subtasks. Not all of which are done. Enough of which are done that it works, but it’s pretty involved.
This thing does not particularly like it when I try and do this, but we’re going to hope, fingers crossed, that Emacs isn’t going to crash on us. And that’s just because Emacs on full screen sometimes misbehaves on me.
Most of the magic happens inside of this class called BlockManagerDecommissioner. As we see from the description, it’s a class to handle block manager decommissioning and the retries. And it creates multiple threads to offload both RDD cache and shuffle blocks. As we talked about, the YARN solution was really focused on shuffle blocks because the shuffle blocks are the most expensive part to handle.
I’m going to zoom in a little bit, just because I know that reading text can be a little bit hard.
The main magic here is all inside of this thing called ShuffleMigrationRunnable. What this does is we fire up a bunch of these, one for each potential block manager that we could migrate our data to. If we know that an executor is going to be exiting soon, for whatever reason, be it dynamic allocation or a spot instance that is going to be preempted, we get some notice that essentially, we’re going to die soon.
Like our code, not us.
We try and get as much of the data off that machine as quickly as we can. Here, we have a producer-consumer model with some limited retries, and it does back off if there’s an IOException. For example, we might get IOExceptions if the other host we were trying to migrate to was also shut down, or if the host we were trying to migrate to ran out of disk space. In that case retrying against the same host isn’t going to be effective, and we just want to put the block back on the queue really quickly for another potential candidate block manager to pick up and try.
We can see here that the blocks are actually composed of sub-blocks. That’s because this only supports the indexed shuffle blocks right now. There’s both an index and the underlying block. And so, we migrate all of the blocks. There’s also, and this is really important, this fallback storage option. That’s very important because if we only migrated data between different executors, we couldn’t scale down all the way to zero or even one. We would have to be able to make sure that the minimum number of nodes we were scaling down to still had enough disk space to store all of our data.
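As a rough illustration of that producer-consumer idea, here is a minimal Python sketch. It is not the Spark implementation: the names `MigrationState`, `migrate_worker`, `migrate_all`, the `send` callback, and the `MAX_FAILURES` limit are all made up for this example. Each worker thread serves one candidate peer; if sending fails with an I/O error, the block goes back on the shared queue for a different peer and that worker stops.

```python
import queue
import threading

MAX_FAILURES = 3  # hypothetical per-block retry limit, not a Spark setting


class MigrationState:
    """Shared queue of (block, failure_count) plus bookkeeping."""

    def __init__(self, blocks):
        self.pending = queue.Queue()
        for block in blocks:
            self.pending.put((block, 0))
        self.lock = threading.Lock()
        self.unresolved = len(blocks)  # blocks not yet migrated or abandoned
        self.migrated = {}             # block -> peer that accepted it


def migrate_worker(peer, send, state):
    """Consumer for one candidate peer."""
    while True:
        with state.lock:
            if state.unresolved == 0:
                return  # every block is migrated or abandoned
        try:
            block, failures = state.pending.get(timeout=0.01)
        except queue.Empty:
            continue  # another worker may still put a block back
        try:
            send(peer, block)
        except OSError:
            # The peer is probably shutting down or out of disk space, so
            # retrying against it will not help. Hand the block back quickly.
            with state.lock:
                if failures + 1 < MAX_FAILURES:
                    state.pending.put((block, failures + 1))
                else:
                    state.unresolved -= 1  # give up on this block
            return
        with state.lock:
            state.migrated[block] = peer
            state.unresolved -= 1


def migrate_all(blocks, peers, send):
    """Start one worker per peer and return the block -> peer mapping."""
    state = MigrationState(blocks)
    threads = [
        threading.Thread(target=migrate_worker, args=(peer, send, state))
        for peer in peers
    ]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    return state.migrated
```

In this sketch a failing peer drops out after its first error, so a block it could not take ends up with whichever healthy peer picks it up next.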
And if we’re doing big data, which is probably why we’re using Spark, that might be quite a number of machines. We might actually have a situation where, especially for a notebook user, that’s just really not what we need. And so here, this fallback storage allows us to go a little bit more back to the old MapReduce style model. What we do is we copy the data out onto HDFS or S3 or some other kind of persistent store, where it’s not tied to the compute in the same way. We can scale down in our compute much more than the initial design that I’d come up with five years ago. Shout out to [Dong Jung] for coming up with this design. With the fallback storage as a copy.
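To make the scale-down argument concrete, here is a small Python sketch of the decision only, under stated assumptions. The function `choose_target`, its arguments, and the example path are hypothetical, and Spark's actual peer selection is more involved. The point it shows is that with a fallback location configured, a block always has somewhere to go, even when no remaining executor has room.

```python
def choose_target(block_size, peer_free_bytes, fallback_path=None):
    """Pick where a block should go during decommissioning.

    peer_free_bytes maps a (hypothetical) peer name to its free disk space.
    Returns ("peer", name), ("fallback", path), or None if nothing fits.
    """
    # Prefer another executor that still has room for the block.
    for peer, free in sorted(peer_free_bytes.items()):
        if free >= block_size:
            return ("peer", peer)
    # No executor can take it: use persistent storage if it is configured.
    if fallback_path is not None:
        return ("fallback", fallback_path)
    return None


# With zero remaining executors, only the fallback makes scale-down safe.
target = choose_target(10, {}, "s3a://example-bucket/spark-fallback/")
```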
There’s a bunch of little fiddly bits. Just as a shout out, I got a lot of these little fiddly bits wrong initially. And I think one of the magics of Open Source is that there were so many code reviews, and we’ll actually look at some of the code reviews in a little bit, that we managed to fix all of these little tiny things that I’d gotten wrong. It looked okay in testing, but there were actually many logic bugs in the first implementation of this, where under certain circumstances, we wouldn’t actually migrate all the files, just most of them. And so, that’s really lovely. Shout out to all of the people who do code reviews in Open Source. Thank you for taking the time to do code reviews. You are amazing people.
But we can also see that, in addition to migrating shuffle blocks, we migrate RDD cache or persist blocks. We don’t follow the same model here as we do when we’re migrating shuffle blocks. And that’s because the overhead of recomputing these blocks tends to be not as high. It’s not as worthwhile to take the shotgun approach and copy data out to as many potential peers as possible. So in this case, we really just copy one block at a time with the RDD blocks. That’s pretty rocking.
This is the thread pool that we use for our shuffle migrations. We can see the producer side of the producer-consumer model is inside of this thing called refreshOffloadingShuffleBlocks. And that’s for when shuffle blocks are added while we’re in the process of exiting. We do have some logic that tries to limit the number of times that would happen, but it might happen if there was an existing task running on an executor. That’s where the producer side is. And then, we have this stop function. And that just says like, “Hey, we’re actually done trying to migrate, let’s give up.”
I also had a logic bug in this code, which got caught in code review. Thanks. Or was it caught in code review? It got caught by someone else, at some point. It might’ve been code review, it might’ve been a follow-up PR. But I essentially had wired some of the stuff in backwards. It was succeeding, but it was only succeeding because we were able to migrate things relatively quickly. Anyways, it’s been fixed by now, but it’s a good thing to [inaudible]. Like, if I’d just been working on this by myself, there would be all kinds of bugs here.
As it stands, there are still probably…some bugs, but far fewer. This is the part where we keep track of whether we have migrated what we think of as all of the shuffle files. We don’t just exit at the point where we believe that we’ve migrated all of the shuffle files, because the block manager doesn’t know if there are any running tasks on the executor that the block manager is associated with. What we do is we just update a timestamp that says, “As of this time, all of the blocks that had hit disk or hit memory have been successfully migrated.” And then we have another piece of code somewhere else, which checks that and says, “Hey, when was the last time I had a running task?” And, if we’ve got a situation where the last running task finished earlier than the time that we migrated everything, then it’s safe to exit. But if we have a currently running task, or a running task finished after the last successful migration completed, then we continue to wait until the migration refreshes itself again, and finally migrates everything.
I really like this code. I think it’s really cool, I hope you do too. If you found any bugs in it during this conference talk, I would love a PR fixing those bugs. Or if you find any bugs when you’re using this code, please, PRs are really, really welcome. I would really super love them.
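That exit check boils down to comparing two timestamps. Here is a minimal Python sketch of the comparison, with hypothetical names (`safe_to_exit` and its parameters are invented for this example and are not the names used in Spark):

```python
def safe_to_exit(last_migration_done_at, last_task_finished_at, running_tasks):
    """Decide whether a decommissioning executor can shut down.

    last_migration_done_at: time of the last pass that migrated every known
        block, or None if no pass has fully succeeded yet.
    last_task_finished_at: time the most recent task finished, or None if
        no task has run.
    running_tasks: number of tasks currently running on the executor.
    """
    if running_tasks > 0:
        return False  # a running task may still write new blocks
    if last_migration_done_at is None:
        return False  # nothing has been fully migrated yet
    if last_task_finished_at is None:
        return True   # no task could have produced blocks after the pass
    # Safe only if the last task finished before the last full migration.
    return last_task_finished_at < last_migration_done_at
```

If a task finishes after the last full migration, the check stays false until another migration pass completes and moves the timestamp forward.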
Okay. On that, we are going to go and I’m going to talk a little bit more about the collaboration part. I’m going to jump back to my slides just super, super quickly. I really want to talk about the collaborative aspects some more, in addition to looking at the code and calling out some of the places where I remember the mistakes I’d made. I think one of the things that’s really awesome is we have this, okay…Oh, and we got some ads, but we can see here that I brought up this proposal, it’s called the Decommissioning SPIP. We can see the design document that exists.
We can see comments from different people. We can also see…this is the full design document. We can see that there are a lot of comments, from a lot of different people here. This is really wonderful. I think that these comments had a lot of really great suggestions, and really improved the quality of the eventual code, and that is some of the magic of Open Source. It’s part of why I really love working on Open Source. I get to work with all kinds of smart people and it’s really wonderful. And here, Devesh made a PR for one of the parts. Wonderful, different company, but made a PR for this work here too. I think that’s pretty cool.
Another thing that I want to do really quickly is take a super brief look at…all of the different PRs around decommissioning. And some of these are less related, but we can see that they are from a variety of people, at a variety of companies, right? Like better metrics for tracking dynamic allocation. We’ve got things from Databricks. We’ve got things from Cloudera. We’ve got things from Apple. We have things from all kinds of different community participants. I think that’s really awesome. I don’t know, maybe I’m kind of a sap, but I really think the magic of Open Source is…we don’t talk about it often enough. I like it a lot when features like this are implemented, not just by one company, but by all kinds of different companies collaborating. I think those are the times when we make some of the best features in Spark.
Now, I am of course biased because this is a feature that I was involved with. So [inaudible] I might think it’s the best and you might go, “Holden, no, this isn’t the best.” That’s pretty rocking.
Back to things that are less fun. Getting hit by a car sucks a lot. Here we can see me walking around SF General, with my very wonderful braids, actually from my friend who came down and helped braid my hair. Because I’d broken both of my wrists and I really couldn’t do anything. I just want to take this time to thank everyone: my wife, my girlfriend, partners, my friends, my parents, the hospital staff, the nursing home, the physical therapist, the occupational therapist, the ambulance people, my employer, and also everyone in the Spark community for understanding. I sent out an email that was like, “Hey, what’s up? I got hit by a car. I’m really sorry.”
Well, actually I think I had my wife send the email for me because I couldn’t type. But, everyone was really understanding of the fact that you’re working on a thing, but at the end of the day you got to look after yourself. And I think that’s something that I want to remind other people. It’s been a rough year for everyone. Don’t feel bad about taking time to look after yourself. I love Open Source. This stuff is really cool. We can do some really awesome things, but don’t push yourself, if you’re burnt out, take some time away. If you’re physically injured, it’s okay to not be as productive as you might always be. Shout out to everyone for understanding that.
And, we’re going to go back to a happy note. You can try this stuff out now. And this is so cool. If you run Spark, either on Kubernetes or in the Cloud, even if you run Spark on YARN, you can turn these features on. Although to be fair, it doesn’t really give you as much with Spark on YARN. But I would love it if you were to try these features. They are very new and very experimental, but you can turn them on today and you can start taking advantage of some of the benefits.
You’ll turn these on: all of the ones where it says enabled, you want to set to true. Then you can optionally configure a backend for the fallback storage. If you’ve got S3 or something like that, where it’s got TTLs to automatically clean up the data, I think that’s really the best option for the backend. I would love you all’s feedback and bug reports here. I know we won’t have gotten it perfect. I know that there are going to be mistakes. So, please use this, tell me what’s wrong with it, and we’ll try and make it better. If you want to help us make it better, there are some things that we know we need to do to make this stuff better.
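For reference, here is a small Python sketch that collects the decommissioning settings from the Spark 3.1 configuration documentation and renders them as `spark-submit` arguments. The helper name `spark_submit_args` and the bucket path are assumptions made for this example; check the configuration page for your Spark version before relying on the exact keys.

```python
# Decommissioning switches as listed in the Spark 3.1 configuration docs.
DECOMMISSION_CONF = {
    "spark.decommission.enabled": "true",
    "spark.storage.decommission.enabled": "true",
    "spark.storage.decommission.rddBlocks.enabled": "true",
    "spark.storage.decommission.shuffleBlocks.enabled": "true",
}


def spark_submit_args(fallback_path=None):
    """Render the settings as a list of --conf arguments (hypothetical helper)."""
    conf = dict(DECOMMISSION_CONF)
    if fallback_path is not None:
        # Optional persistent location for blocks that no peer can take.
        # Spark does not clean this up, so a TTL on the store is advisable.
        conf["spark.storage.decommission.fallbackStorage.path"] = fallback_path
    args = []
    for key, value in sorted(conf.items()):
        args += ["--conf", f"{key}={value}"]
    return args


# Example with a made-up bucket; the path should end with a separator.
args = spark_submit_args("s3a://example-bucket/spark-fallback/")
```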
One of these things is that right now, data migration, we just migrate everything. I think there are some basic heuristics inside of Spark around the importance of data. They’re not, ehh [negative] great. But I think that we could have really, much better performance if we migrated data that was more likely to be used. If you’re the kind of person who likes writing heuristics, please write some to help us decide which data to migrate first.
Another one is if you’re like a cloud-native or a container person, I really think that something that we could do that would have a really big impact for the Kubernetes users is improving the container preemption selection. This comes from the idea of, yes, we can set static priorities. We can set the driver at a priority that is less likely to be preempted, and the executors at a higher likelihood of being preempted. But not all of the executors are necessarily equal all of the time. I think improving how we tell the cluster manager, not the Spark cluster manager but the Kubernetes cluster manager, which containers are least impactful for us to lose could be really beneficial. Another one that I noticed is we could really use better heuristics around when to scale containers up and down.
This is not specific to this work. You don’t have to be a cloud-native or a…Kubernetes user. If you’re on YARN, for example, I think better heuristics for scaling containers up and down is a really good place for people running on those systems to contribute, because that will benefit people running on all cluster managers. I think right now our heuristics there are not perfect either, and we have a lot of room for growth. In fact, one of my coworkers did some testing, and we have some configuration knobs that are tunable, and it turns out that you get vastly, vastly different performance based on what you set those numbers to. I think even a non-code change, writing a blog post about how to properly set this stuff, could be an excellent place to contribute for someone who doesn’t want to write code, but is more interested in writing a blog post.
With that being said, I’d really appreciate it if you could review this talk. I would love to come back to the next in-person Spark Summit. One of the ways that you can help make that happen is tell people that you like my talk. If you didn’t, of course there is no need to review this talk. Or, of course, you can. You can also reach out to me with feedback about ways I can improve. I actually do read that stuff, because I do, I like giving talks. And if there’s things that I can be doing better, definitely give me a shout. Big shout out to my employer for giving me the time to work on, not only this talk, but all of the Spark stuff. They are hiring…? And I think there’s a bunch of other wonderful talks from my coworkers as well. If you’re interested in learning more…feel free to reach out to one of us about what it’s like to work at Apple on Spark. Thank you all for your time. Have a lovely day.

Holden Karau


Holden is a transgender Canadian open source developer with a focus on Apache Spark, Airflow, Kubeflow, and related "big data" tools. She is the co-author of Learning Spark, High Performance Spark, an...