Flash for Apache Spark Shuffle with Cosco


Cosco is an efficient and reliable shuffle-as-a-service that powers Spark jobs at Facebook warehouse scale. Cosco is built on in-memory aggregation across a shared pool of distributed memory and provides much more efficient disk usage than Spark’s built-in shuffle. In this talk, we present how adding a little flash to Cosco goes a long way in further improving shuffle efficiency: Flash decreases memory usage, and larger write-ahead (aggregation) buffers further help decrease disk IO. We also demonstrate, via careful experiments and analysis, that dynamically leveraging both memory and flash protects flash endurance even for write-once/read-once workloads like shuffle. Finally, the long time-scale at which flash’s endurance bottleneck applies allows it to gracefully absorb short-term spikes in workload. We discuss how flash fits into Cosco’s architecture and deployment model at Facebook, learnings from deploying at scale in production, and potential future work. We first presented Cosco at Spark+AI Summit 2019.


Video Transcript

– Hi, I’m Aaron. I work on Cosco, a service that is essentially part of the Spark engine at Facebook.

Flash for Spark Shuffle with Cosco

Specifically, it handles shuffle, which is the transfer of data between Spark stages. I’ll talk about how we can use flash hardware to make Cosco more efficient.

I’ll start with some motivation, then I’ll go over the basics of shuffle architecture. I’ll describe the basic idea of using flash to improve efficiency. I’ll go into some more advanced techniques. I’ll talk about future improvements that we can make. And, I’ll talk about some testing and analysis techniques.

Why should you care?

So, some motivation. The reason we use Cosco at Facebook is that it dramatically improves IO efficiency. You can think of disk service time as the amount of time that spinning disks spend serving shuffle data, and this metric goes down by about 3X with Cosco. That said, Cosco does use some compute resources; specifically, it consumes memory. By using flash, we can consume a lot less memory at the cost of a relatively small amount of flash. So, two reasons to care are improving IO efficiency and improving compute efficiency. We’re less focused on query latency because we mostly use Spark for batch workloads. And, I’ll also talk about some techniques for development and analysis, which are generally useful, even outside of Cosco.

So, some basic shuffle architecture.

Spark Shuffle Recap

I’ll call a Spark task a mapper when it is writing map output (shuffle) data, and I’ll call a Spark task a reducer when it’s reading shuffle data. In plain Spark, without Cosco, mappers write their output data to local disk grouped by reducer partition, with one partition for each reducer.

And then, reducers read this data from the local disk of the mappers.

If needed, reducers sort this data before proceeding with their application logic.

So, there are kind of two problems here in terms of IO efficiency. One of them is write amplification.

In practice, we observe a write amplification of about 3X, which means that for every byte that mappers logically shuffle to reducers, we actually end up writing three bytes to local disk. There are two reasons for this. One is on the mapper’s side. If a mapper has so much data that it doesn’t fit in the mapper’s memory, it needs to spill to disk, produce some more data, spill that to disk too, and then read the spills back and merge them by partition to produce its final output file.

That’s called mapper-side merging. Similarly, on the reducer’s side, we have reducer-side merging if sort is needed: if the data is too big to fit in reducer memory, the reducer similarly needs to do an external merge. So, that results in write amplification. The other problem is small IOs. In practice, we see, on average, only about 200 kilobytes per read, which is pretty small for spinning disks. The reason is that the number of reads is basically M times R, where M is the number of mappers and R is the number of reducers. This scales quadratically with the size of the job, whereas the amount of shuffle data only scales linearly. So, that’s why we see small IOs.
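Both IO problems can be put into rough numbers. The sketch below is a back-of-the-envelope model, not Spark or Cosco code: the helper names and the job sizes are hypothetical, it assumes every (mapper, reducer) pair produces exactly one read, and it counts spilled bytes as extra writes on top of the final map output.

```python
def write_amplification(logical_bytes: float, mapper_spill_bytes: float,
                        reducer_spill_bytes: float) -> float:
    """Bytes written to local disk per byte logically shuffled.

    Every byte is written once as final map output; bytes that a mapper spills
    and later merges, or that a reducer writes as external-sort runs, add
    extra writes on top of that.
    """
    written = logical_bytes + mapper_spill_bytes + reducer_spill_bytes
    return written / logical_bytes


def avg_read_size(shuffle_bytes: float, num_mappers: int, num_reducers: int) -> float:
    """Average bytes per read when each reducer reads one block per mapper (M * R reads)."""
    return shuffle_bytes / (num_mappers * num_reducers)


# If all data is spilled once on each side, writes triple (one way to reach ~3X).
print(write_amplification(1.0, 1.0, 1.0))  # 3.0

# Hypothetical jobs: 10x the data and 10x the tasks gives 100x the reads.
small = avg_read_size(2e12, 1_000, 1_000)      # 2 TB shuffled
large = avg_read_size(20e12, 10_000, 10_000)   # 20 TB shuffled
print(f"{small / 1e3:.0f} KB, {large / 1e3:.0f} KB")  # 2000 KB, 200 KB
```

The data grows linearly with the job while the read count grows with M times R, so the average read shrinks as jobs get bigger.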

So, to describe how Cosco improves IO efficiency, I’m first going to just simplify this drawing.

And, now Cosco comes into the picture because mappers will stream their output data directly to Cosco Shuffle Services, rather than writing the output data to local disk.

Cosco Shuffle for Spark

So, we have potentially thousands of Cosco Shuffle Services and each Shuffle service has an in-memory buffer for each reducer partition. Map output data is appended directly to these buffers. When the buffers are full, we write them to a distributed file system, like HDFS.

And then, reducers read this data from the Distributed File System. Each file that a reducer reads is dedicated entirely to that reducer, and these files are tens of megabytes, so that solves the small IO problem. We also solve the write amplification problem, or at least greatly reduce it, because there’s no mapper-side merging: again, the mappers stream their data directly to Shuffle Services. There might be some reducer-side merging if there are too many files for a reducer to open them all at once, but it’s less of a problem.
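The buffering path described above can be modeled in a few lines. This is a toy sketch, not Cosco’s implementation: `ShuffleServiceSketch`, `buffer_limit_bytes`, and the `write_chunk` callback (standing in for a DFS file write, e.g. to HDFS) are hypothetical names, and replication, metadata, and balancing are left out.

```python
from collections import defaultdict
from typing import Callable, DefaultDict, List, Tuple


class ShuffleServiceSketch:
    """Toy Shuffle Service: one append-only in-memory buffer per reducer partition."""

    def __init__(self, buffer_limit_bytes: int,
                 write_chunk: Callable[[int, bytes], None]) -> None:
        self.buffer_limit_bytes = buffer_limit_bytes
        self.write_chunk = write_chunk  # hypothetical stand-in for one DFS file write
        self.buffers: DefaultDict[int, bytearray] = defaultdict(bytearray)

    def append(self, partition: int, package: bytes) -> None:
        """Append a mapper package to its partition's buffer; flush when full."""
        buf = self.buffers[partition]
        buf.extend(package)
        if len(buf) >= self.buffer_limit_bytes:
            self.flush(partition)

    def flush(self, partition: int) -> None:
        """Write the whole buffer as one chunk dedicated to a single reducer."""
        buf = self.buffers.pop(partition, None)
        if buf:
            self.write_chunk(partition, bytes(buf))


chunks: List[Tuple[int, int]] = []  # (partition, chunk size) for each DFS write
svc = ShuffleServiceSketch(100, lambda p, data: chunks.append((p, len(data))))
for _ in range(5):
    svc.append(partition=7, package=b"x" * 30)
print(chunks)  # [(7, 120)]; the fifth package is still buffered
```

Because a whole buffer becomes one file for one reducer, reads from the DFS are large and sequential regardless of how many mappers contributed to it.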

So, this is shuffle with a focus on what’s relevant for flash. There are other parts of Cosco, particularly metadata management. I won’t go into them in this presentation, but, if you like, you can see our presentation from last year, which goes into some of those details. There’s a link at the bottom of this slide. To summarize, Cosco is a reliable, fault-tolerant, scalable distributed system.

So, let’s talk about how we can use flash to make Cosco more efficient.

Buffering Is Appending

Let’s take a closer look at what happens inside these Shuffle Services. Mappers send data in units we call packages, each a few tens of kilobytes, and each package is appended to the in-memory buffer for its partition in the Shuffle Service. The basic idea of using flash is that we can replace this in-memory buffer with an in-flash buffer.

Replace DRAM with Flash for Buffering: Simply buffer to flash instead of memory

Just write directly to flash, rather than to memory. This is a very friendly pattern for using flash. The main challenge of working with flash is limited endurance: if you write too much to the drive, it wears out more quickly. But some write patterns are much friendlier to endurance than others; specifically, lots of small random writes wear the drive more per byte written. This appending pattern, writing a few tens of kilobytes at a time, all appended to the same file, is very friendly for flash, so this is how we minimize wear on the drive.

Once the buffer is full, we read it back into main memory for sorting, if sorting is needed.

I’ll also observe that flash is a bit slower than main memory, but that’s not a problem for this use case, because these reads and writes to flash are generally non-blocking and their latency is much less than the total time the data spends buffering.
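A flash-backed version of the same buffer can be sketched with ordinary file appends. This is a hypothetical illustration rather than Cosco code: `FlashPartitionBuffer` is an invented name, records are assumed to be newline-terminated byte strings so they can be sorted after being read back, and a real service would keep file handles open and issue non-blocking IO.

```python
import os
import tempfile
from typing import List


class FlashPartitionBuffer:
    """Hypothetical per-partition buffer stored as an append-only file on flash."""

    def __init__(self, directory: str, partition: int, limit_bytes: int) -> None:
        self.path = os.path.join(directory, f"partition-{partition}.buf")
        self.limit_bytes = limit_bytes
        self.size = 0

    def append(self, package: bytes) -> bool:
        """Append one package at the end of the file; return True once full."""
        with open(self.path, "ab") as f:  # sequential appends only: flash-friendly
            f.write(package)
        self.size += len(package)
        return self.size >= self.limit_bytes

    def drain(self, sort: bool) -> List[bytes]:
        """Read the buffer back into memory, optionally sort it, and delete the file."""
        with open(self.path, "rb") as f:
            records = f.read().splitlines()
        os.remove(self.path)
        self.size = 0
        return sorted(records) if sort else records


with tempfile.TemporaryDirectory() as d:
    buf = FlashPartitionBuffer(d, partition=3, limit_bytes=12)
    full = False
    for package in [b"c\nb\n", b"a\nd\n", b"f\ne\n"]:
        full = buf.append(package)
    records = buf.drain(sort=True)
print(full, records)  # True [b'a', b'b', b'c', b'd', b'e', b'f']
```

The only writes the drive sees are sequential appends of package-sized pieces; sorting happens in memory after the single read back.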

Example Rule of Thumb

So, let’s evaluate this with a rule of thumb: assume that you’re indifferent between deploying one gigabyte of DRAM in your cluster and deploying an amount of flash that can endure 100 gigabytes of writes per day. The first main bullet point on this slide has some numbers justifying that, but many factors can go into deciding how much DRAM you would prefer to deploy versus how much flash. One of them is that DRAM consumes a lot more power than flash does.

So, to apply this rule of thumb, let’s imagine a hypothetical cluster with 10 nodes, each of them using 100 gigabytes of DRAM for buffering.

Basic Evaluation

Applying this one-to-100 rule, we would be indifferent between using this cluster versus one where each of those nodes has that 100 gigabytes of DRAM replaced with one terabyte of flash.

So, in total, the cluster can endure 100 terabytes of writes per day if it’s using flash, versus having one terabyte of DRAM if it’s using memory.

This picture summarizes it. If the cluster is shuffling 100 terabytes per day, then these two configurations are equivalent. Each cluster has 10 services. On the left, each service has 100 gigabytes of DRAM. On the right, each has one terabyte of flash, so each service can endure 10 terabytes of writes per day and, in total, the cluster can endure 100 terabytes of writes per day.

Yeah, so they’re equal if you’re shuffling 100 terabytes per day, and if you’re shuffling less, then flash is better.
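The rule-of-thumb arithmetic for the example cluster is easy to check in code. The 1 GB to 100 GB/day ratio and the cluster shape come from the talk; the function names are hypothetical, and decimal units (1 TB = 10^12 bytes) are assumed.

```python
GB, TB = 1e9, 1e12
# Rule of thumb from the talk: 1 GB of DRAM is worth as much as flash that can
# endure 100 GB of writes per day.
FLASH_WRITES_PER_DAY_PER_DRAM_BYTE = 100


def equivalent_flash_endurance(dram_bytes: float) -> float:
    """Flash endurance (bytes written per day) judged equivalent to `dram_bytes` of DRAM."""
    return dram_bytes * FLASH_WRITES_PER_DAY_PER_DRAM_BYTE


def prefer_flash(shuffled_bytes_per_day: float, dram_bytes: float) -> bool:
    """Flash wins when the daily shuffle volume is below the equivalent endurance."""
    return shuffled_bytes_per_day < equivalent_flash_endurance(dram_bytes)


nodes, dram_per_node = 10, 100 * GB
per_node = equivalent_flash_endurance(dram_per_node)
print(per_node / TB, nodes * per_node / TB)  # 10.0 100.0
print(prefer_flash(50 * TB, nodes * dram_per_node))  # True
```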

Let’s talk about some hybrid techniques that can improve this efficiency further.

Two Hybrid Techniques: Two ways to use both DRAM and flash for buffering

The first technique will be to buffer in DRAM first and flush to flash only under memory pressure. And, the second technique will be to use DRAM for the fastest-filling partitions and use flash for the slowest-filling partitions because the slower partitions wear less on flash.

Hybrid Technique #1

So, let’s talk about the first technique, which takes advantage of variation in shuffle workload over time. On the y-axis, we have the number of bytes buffered in Shuffle Service memory at any given time. We could handle this workload with our cluster that has one terabyte of DRAM or, again using our rule of thumb, with a cluster whose flash endures 100 terabytes of writes per day. Let’s assume the workload shuffles 100 terabytes per day, so we would just use flash for all of this buffering.

But with the hybrid technique, we could keep just 250 gigabytes of DRAM and send buffered data to DRAM first. Then, if we run out of DRAM, we send it to flash, and maybe this only results in writing about 25 terabytes per day to flash. In summary, we’ve replaced the pure DRAM cluster with a cluster that has 25% as much DRAM plus 25% of the equivalent amount of flash, but we still support the entire workload, so that’s about a 2X efficiency win. I’ll also note that it’s safer to push this system to its limits with flash. If you’re using just memory and you run out of memory, bad things start happening immediately. Whereas, if you’re using flash and you exceed the guideline for how much you should write to flash per day, that’s not really a huge problem as long as you don’t write too much to the drive over its lifetime, so a short spike can be absorbed and made up for later.
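The 2X figure follows from converting the flash writes back into DRAM-equivalent capacity with the same rule of thumb (1 GB of DRAM per 100 GB/day of flash endurance). Writing D for deployed DRAM and W for daily flash writes, and using the talk’s illustrative 250 GB and 25 TB/day figures (an estimate, not a measurement):

```latex
\begin{aligned}
\text{cost}(D, W) &= D + \frac{W}{100\ \text{day}^{-1}} \\
\text{cost}_{\text{DRAM only}} &= 1\ \text{TB} \\
\text{cost}_{\text{hybrid}} &= 250\ \text{GB} + \frac{25\ \text{TB/day}}{100\ \text{day}^{-1}} = 250\ \text{GB} + 250\ \text{GB} = 500\ \text{GB} \\
\frac{\text{cost}_{\text{DRAM only}}}{\text{cost}_{\text{hybrid}}} &= \frac{1\ \text{TB}}{500\ \text{GB}} = 2
\end{aligned}
```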

One implementation detail in this hybrid approach is we need to decide when to flush to flash. In particular, we need to decide when to do that versus these other things that we could do. So, we could redirect the partition to another Shuffle Service. We could flush it immediately to a distributed file system to free up that memory. Or, we could backpressure mappers by telling them to slow down and send data less quickly. So, Cosco already handles all of this balancing decision making in the pure DRAM scenario.

So, how do we modify this to support flash? One way is to plug into this existing balancing logic. We can introduce a threshold on the flash working set of the drive, basically the amount of flash currently taken up by buffer files: if the working set is less than the threshold, we flush to flash; if it’s greater, we apply the same balancing logic that we used before.

We can configure this threshold in production so that we stay under our flash endurance limits. Whatever the threshold is, we can use it to predict cluster performance: the cluster should perform as well as if that threshold amount of flash were just DRAM, because we’re essentially replacing DRAM with flash in a way that stays under the flash endurance limits. This might not be quite the optimal way to make this balancing decision, but it’s really nice because it’s easy to implement and makes performance easy to predict. It just plugs into the existing balancing logic.
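The threshold rule slots in front of the existing decision. In this minimal sketch, `on_memory_pressure`, `legacy_balancing`, and the 250 GB threshold are hypothetical, and the existing balancing logic is reduced to a callback; only the comparison against the flash working-set threshold reflects the approach described in the talk.

```python
from typing import Callable

GB = 10**9


def on_memory_pressure(flash_working_set_bytes: int, flash_threshold_bytes: int,
                       existing_balancing: Callable[[], str]) -> str:
    """Choose what to do with a buffer when a Shuffle Service runs low on DRAM.

    Below the flash threshold, spill the buffer to flash. Otherwise fall back to
    the pre-flash logic (redirect the partition, flush to DFS, or backpressure).
    """
    if flash_working_set_bytes < flash_threshold_bytes:
        return "flush_to_flash"
    return existing_balancing()


def legacy_balancing() -> str:
    # Hypothetical stand-in for Cosco's existing DRAM-only balancing decision.
    return "backpressure_mappers"


print(on_memory_pressure(200 * GB, 250 * GB, legacy_balancing))  # flush_to_flash
print(on_memory_pressure(260 * GB, 250 * GB, legacy_balancing))  # backpressure_mappers
```

Since flash is used only while the working set stays under the threshold, the service behaves roughly as if it had that much extra DRAM, which keeps the effect easy to reason about.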

And so, in summary, we can take advantage of variation in shuffle workload over time using the hybrid technique of using DRAM first and flash only under memory pressure, and we can adapt the existing balancing logic to do it. The second hybrid technique takes advantage of some partitions filling up more slowly than others. Slower partitions wear out flash less quickly, so the idea is to use flash for the slower partitions and DRAM for the faster ones.

Here are some numbers to evaluate this. In the pure DRAM version of our small example cluster, we have one terabyte of DRAM. This can support, I’ll call them, 100,000 streams, each buffering up to 10 megabytes. You can think of a stream as an eternal partition, a partition that just lasts forever; it’s a hypothetical construct that we use to model and analyze. This cluster supports 100,000 of these streams because 100,000 times 10 megabytes equals one terabyte. On the other hand, if we’re using flash, then to support 100,000 streams we want each stream to write about 12 kilobytes per second or less, because that results in writing 100 terabytes per day across the whole cluster, which, again using our rule of thumb, is what makes the DRAM and flash clusters equivalent in terms of what we would like to deploy.

And, if the streams are slower than 12 kilobytes per second, then flash can support more of them. So, these slower streams are better on flash, whereas, if the streams are faster, then they’re better on DRAM.
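The 12 KB/s figure can be reproduced from the rule of thumb. The sketch uses the talk’s numbers (1 TB of DRAM, 10 MB per stream, a 100 TB/day flash write budget) with decimal units; the function names are hypothetical.

```python
SECONDS_PER_DAY = 24 * 60 * 60


def max_stream_rate(flash_writes_per_day: float, num_streams: int) -> float:
    """Per-stream write rate (bytes/s) that keeps total flash writes within budget."""
    return flash_writes_per_day / num_streams / SECONDS_PER_DAY


def streams_supported_on_flash(flash_writes_per_day: float, stream_rate: float) -> float:
    """How many streams of a given rate (bytes/s) fit in the daily write budget."""
    return flash_writes_per_day / (stream_rate * SECONDS_PER_DAY)


dram_streams = int(1e12 // 10e6)  # 1 TB of DRAM / 10 MB per stream
rate = max_stream_rate(100e12, dram_streams)
print(dram_streams, f"{rate / 1e3:.1f} KB/s")  # 100000 11.6 KB/s
# Streams at half that rate: flash can hold twice as many of them.
print(round(streams_supported_on_flash(100e12, rate / 2)))  # 200000
```

The exact break-even is about 11.6 KB/s, which the talk rounds to 12 KB/s.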

So, the technique is to periodically measure the fill rate for each partition. If we find that it’s less than some threshold, then we can send future data for that partition to flash. Otherwise, we can continue buffering the partition in DRAM. And, there’s some math to evaluate this. Depending on your workload, you can calculate how much you expect to save using this technique.
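The periodic fill-rate check might look like the following. It is a sketch under assumptions: `PartitionStats` and `place_partitions` are hypothetical names, rates are measured over a fixed interval, and the 12 KB/s threshold is borrowed from the stream example rather than from any production setting.

```python
from dataclasses import dataclass
from typing import Dict


@dataclass
class PartitionStats:
    bytes_since_last_check: int = 0


def place_partitions(stats: Dict[int, PartitionStats], interval_s: float,
                     rate_threshold_bps: float) -> Dict[int, str]:
    """Decide where *future* data for each partition should be buffered."""
    placement = {}
    for pid, s in stats.items():
        fill_rate = s.bytes_since_last_check / interval_s
        placement[pid] = "flash" if fill_rate < rate_threshold_bps else "dram"
        s.bytes_since_last_check = 0  # start a new measurement window
    return placement


stats = {1: PartitionStats(300_000), 2: PartitionStats(6_000_000)}
placement = place_partitions(stats, interval_s=60.0, rate_threshold_bps=12_000)
print(placement)  # {1: 'flash', 2: 'dram'}
```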

So, this technique, again, it depends on the workload, it depends on slow partitions existing, so let’s look at what some real world workload looks like.

Basically, there are some partitions that are extremely fast, but most of them are quite a bit slower. And, this looks like an exponential curve, so this is good news for using this hybrid technique because it means that there are potentially a lot of partitions that we can efficiently use flash for. I’ll just note that what we really care about, in terms of replacing DRAM, is how much DRAM is replaced by moving a partition to flash. And so, this depends on basically the amount of memory times time that the partition was consuming. So, we can also look at the same graph, but weight each partition by the amount of buffering time that it spends and we see it’s a pretty similar graph, so the same conclusion applies. There are potentially a lot of partitions that we can send to flash.
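Weighting by memory times time can be computed from per-chunk records of size and buffering time. This sketch assumes each buffer fills at a constant rate, so its average occupancy is half its final size; `slow_share_of_dram_time` and the sample numbers are hypothetical.

```python
from typing import Iterable, Tuple


def slow_share_of_dram_time(chunks: Iterable[Tuple[float, float]],
                            rate_threshold: float) -> float:
    """Fraction of DRAM byte-seconds spent on partitions slower than the threshold.

    chunks: (size_bytes, buffering_seconds) per chunk. With a constant fill rate,
    a chunk occupies size/2 bytes on average, so it costs size * seconds / 2.
    """
    total = slow = 0.0
    for size, seconds in chunks:
        cost = size * seconds / 2
        total += cost
        if size / seconds < rate_threshold:
            slow += cost
    return slow / total if total else 0.0


sample = [(10e6, 2000.0), (10e6, 100.0), (5e6, 1000.0)]  # hypothetical chunks
print(round(slow_share_of_dram_time(sample, rate_threshold=12_000), 3))  # 0.962
```

Slow chunks dominate the weighted total because they sit in memory much longer, which is why they are the attractive ones to move to flash.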

Combine both hybrid techniques

And then, maybe we’re interested in combining these two hybrid techniques: we would buffer in DRAM first, and then, once DRAM fills up, send the slowest partitions to flash. One way to evaluate this and estimate the efficiency is a discrete-event simulation, which I’ll discuss later in this presentation. First, let’s talk about some potential future improvements.

Lower-Latency Queries

One class of improvements targets lower-latency queries, which might be particularly useful for interactive workloads. One way to do this is to serve the data directly from flash. In a sense, it’s free to keep this data around on flash until the drive fills up, because we expect to be bottlenecked on flash endurance rather than flash working set size. So, there’s no reason to delete the data from flash until the drive is full, or at least full enough to start wearing out faster.

So, this is one way to improve the latency of interactive queries.
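One possible retention policy for serving reads from flash is sketched below. The talk only says to keep data until the drive is full or nearly full; the high-water mark, the oldest-first deletion order, unique chunk ids, and the `FlashChunkCache` name are assumptions made for illustration.

```python
from collections import OrderedDict


class FlashChunkCache:
    """Keep spilled chunks on flash until usage passes a high-water mark,
    then delete the oldest chunks first. Chunk ids are assumed unique."""

    def __init__(self, capacity_bytes: int, high_water: float = 0.9) -> None:
        self.limit = int(capacity_bytes * high_water)
        self.chunks: "OrderedDict[str, int]" = OrderedDict()  # insertion order = age
        self.used = 0

    def add(self, chunk_id: str, size: int) -> None:
        self.chunks[chunk_id] = size
        self.used += size
        while self.used > self.limit and self.chunks:
            _, old_size = self.chunks.popitem(last=False)  # evict the oldest chunk
            self.used -= old_size

    def can_serve(self, chunk_id: str) -> bool:
        """True if a reducer could read this chunk straight from flash."""
        return chunk_id in self.chunks


cache = FlashChunkCache(capacity_bytes=100)  # limit = 90
for chunk_id, size in [("a", 40), ("b", 40), ("c", 30)]:
    cache.add(chunk_id, size)
print(cache.can_serve("a"), cache.can_serve("c"), cache.used)  # False True 70
```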

Another way is that flash allows us to write bigger chunks, where a chunk is a file that Cosco writes to the Distributed File System. Bigger chunks mean there’s less chance that the reducer needs to do merging to get sorted reducer input, which can also be useful for interactive queries and lowers query latency in general.
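The effect of chunk size on reducer-side merging can be estimated with a simple external-merge model. This is not Spark’s actual merge logic: it assumes a reducer can merge at most `max_open_files` sorted chunks at once, and the 20 GB reducer input and 100-file limit are hypothetical.

```python
import math


def chunks_per_reducer(reducer_input_bytes: float, chunk_bytes: float) -> int:
    return math.ceil(reducer_input_bytes / chunk_bytes)


def intermediate_merge_passes(num_runs: int, max_open_files: int) -> int:
    """Merge passes that write intermediate runs back to disk before the final
    streaming merge; 0 means every chunk can be merged in one go."""
    passes = 0
    while num_runs > max_open_files:
        num_runs = math.ceil(num_runs / max_open_files)
        passes += 1
    return passes


for chunk_mb in (50, 500):
    n = chunks_per_reducer(20e9, chunk_mb * 1e6)
    print(chunk_mb, "MB chunks:", n, "files,",
          intermediate_merge_passes(n, max_open_files=100), "extra passes")
# 50 MB chunks: 400 files, 1 extra passes
# 500 MB chunks: 40 files, 0 extra passes
```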

Further Efficiency Wins

Another class of future improvements that flash could enable is further efficiency wins. One way to get them is to reduce the replication factor. I noted earlier that Cosco is a durable distributed system.

One way we achieve durability is two-way replication: before the data is written to the Distributed File System, it’s replicated on two different Cosco Shuffle Services. But since flash is non-volatile, whereas memory is volatile, we can maybe get away with decreasing this replication factor. In practice, we see that most Shuffle Service crashes are resolved within a few minutes just by the process restarting on the same machine, so we could potentially recover that data and get away with a lower replication factor. And, again, bigger chunks allow us to improve efficiency by using a more efficient Reed-Solomon encoding on the Distributed File System.
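The storage cost of these options is simple arithmetic: r-way replication stores r bytes per logical byte, while a Reed-Solomon code with k data units and m parity units stores (k + m)/k. The talk does not say which codes are used; the RS(6, 3) and RS(10, 4) parameters here are illustrative only.

```python
from fractions import Fraction


def replication_overhead(copies: int) -> Fraction:
    """Bytes stored per logical byte with `copies`-way replication."""
    return Fraction(copies)


def reed_solomon_overhead(data_units: int, parity_units: int) -> Fraction:
    """Bytes stored per logical byte with an RS(data, parity) code."""
    return Fraction(data_units + parity_units, data_units)


print(replication_overhead(2), replication_overhead(1))  # 2 1
print(reed_solomon_overhead(6, 3), reed_solomon_overhead(10, 4))  # 3/2 7/5
```

Going from two in-service replicas to one would halve the bytes buffered per logical byte, provided recovery after a restart is reliable enough to make up for the lost copy.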

So, let’s talk about some practical evaluation techniques. Some of these techniques will help us predict efficiency wins, as I was doing earlier in this presentation. And then, some of them will also help us check that there are no reliability concerns with using flash.

Practical Evaluation Techniques

So, here are four different techniques, ordered from kind of most theoretical to most similar to testing in production. And, I’m going to focus on two of them, discrete-event simulation and special canary on a production cluster.

Discrete Event Simulation

Discrete-event simulation is a kind of well-known technique. There is a Wikipedia page on it.

The basic idea of discrete-event simulation is that each step in your simulation corresponds to an event. Here, the events are package arrivals: each package of a few tens of kilobytes arrives from a mapper at a specific time and is appended to a specific in-memory buffer in the Shuffle Service. And, we can track the metrics we care about, like the total amount of data written to flash.

Another type of event is when a buffer is sorted and spilled to DFS. And, another metric that we’re interested in tracking is the average file size in Distributed File System, so we can track that as well.

We can drive this simulation with real-world data. We have a Cosco chunks data set with one row for every chunk file that we write to DFS, containing information like the chunk size and the amount of time the chunk spent buffering. Those two numbers give us the chunk’s fill rate, and together with the chunk start time, that is, the time the chunk started buffering, they tell us which discrete events to simulate and when.

And, again, we can use this to predict efficiency. In particular, you can imagine setting a memory limit on each simulated Cosco Shuffle Service; once we hit the memory limit, we send the slowest partitions to flash. We can use this to figure out that with such-and-such memory limit we end up writing this much to flash, and how much less we write to flash if we have more memory. Overall, we can predict how much flash we need to replace how much memory.
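A minimal discrete-event simulation along these lines is sketched below. It is an illustration under stated assumptions, not the simulator used for Cosco: there is a single simulated Shuffle Service; each chunk row `(start_s, size_bytes, buffering_s)` is split into equal packages arriving at a constant rate; a buffer is spilled to DFS once its chunk is complete; and under memory pressure the slowest-filling partitions are moved to flash together with their current DRAM contents. `simulate_flash_writes` and the tiny input are hypothetical.

```python
import heapq
from typing import Dict, List, Set, Tuple


def simulate_flash_writes(
    chunks: List[Tuple[float, float, float]],  # (start_s, size_bytes, buffering_s)
    dram_limit_bytes: float,
    package_bytes: float = 64e3,
) -> float:
    """Return total bytes written to flash for one simulated Shuffle Service."""
    events: List[Tuple[float, int, float]] = []  # (time, chunk id, package bytes)
    packages_left: Dict[int, int] = {}
    fill_rate: Dict[int, float] = {}
    for cid, (start, size, duration) in enumerate(chunks):
        n = max(1, int(size // package_bytes))  # split the chunk into equal packages
        packages_left[cid] = n
        fill_rate[cid] = size / duration
        for k in range(1, n + 1):  # packages arrive evenly over the buffering time
            heapq.heappush(events, (start + k * duration / n, cid, size / n))

    in_dram: Dict[int, float] = {}
    on_flash: Set[int] = set()
    dram_used = flash_written = 0.0
    while events:
        _, cid, nbytes = heapq.heappop(events)
        if cid in on_flash:
            flash_written += nbytes  # partition already lives on flash
        else:
            in_dram[cid] = in_dram.get(cid, 0.0) + nbytes
            dram_used += nbytes
        packages_left[cid] -= 1
        if packages_left[cid] == 0:  # buffer complete: sort and spill to DFS
            dram_used -= in_dram.pop(cid, 0.0)
            on_flash.discard(cid)
        while in_dram and dram_used > dram_limit_bytes:  # memory pressure
            slowest = min(in_dram, key=fill_rate.__getitem__)
            moved = in_dram.pop(slowest)
            dram_used -= moved
            flash_written += moved  # buffered DRAM contents are written to flash
            on_flash.add(slowest)
    return flash_written


chunks = [(0.0, 4.0, 4.0), (0.1, 4.0, 1.0)]  # one slow and one fast partition
print(simulate_flash_writes(chunks, dram_limit_bytes=3.0, package_bytes=1.0))   # 4.0
print(simulate_flash_writes(chunks, dram_limit_bytes=10.0, package_bytes=1.0))  # 0.0
```

Sweeping `dram_limit_bytes` over real chunk rows gives the memory-versus-flash-writes curve the talk describes.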

Canary on a Production Cluster

So, that was discrete-event simulation. Another technique is basically to canary in a production cluster, but there’s something interesting here because the most naive thing that you could do is you could replace some pure memory Shuffle Services with some pure flash Shuffle Services. Or, rather, some Shuffle Services that have some flash. But it’s not particularly easy to observe the results of that canary because many of the metrics that we care about are observed on mapper tasks and each mapper talks to potentially many Shuffle Services.

The mapper could even talk to different Shuffle Services over the life of the map task. Because we do this dynamic balancing, we might redirect the mapper to a different Shuffle Service for a particular partition. So, it’s hard to say.

If we see a change in mapper metrics, is it due to this pure flash Shuffle Service, or is it just a change in workload over time? Is it due to the pure memory Shuffle Services? One way to get a better understanding of these metrics that are observed on mappers is to use a feature that we were already using in Cosco, even before using flash. We call the feature subclusters.

Basically, all Shuffle Services are divided into different groups that we call subclusters, and each mapper only talks to Shuffle Services from one subcluster. Before flash, we used this to limit the failure domain and for some other reasons.

But it’s nice for flash evaluation because we can put all the canary machines that have flash into one subcluster, and then compare the aggregate metrics of mappers that used the subcluster with flash machines against mappers that used a pure DRAM subcluster. This insulates us from changes in the workload over time. We can just run this in production and see what the mapper metrics are on the different subclusters.
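Because each mapper talks to only one subcluster, each mapper’s metrics can be attributed to exactly one group. A minimal aggregation might look like this; the row format, the subcluster names, and the metric values (for example, seconds a mapper spent writing shuffle data) are hypothetical.

```python
from collections import defaultdict
from statistics import mean
from typing import Dict, Iterable, List, Set, Tuple


def compare_subclusters(rows: Iterable[Tuple[str, float]],
                        flash_subclusters: Set[str]) -> Dict[str, float]:
    """rows: (subcluster, metric value) for each mapper. Returns the mean metric
    for mappers on flash canary subclusters vs pure-DRAM subclusters."""
    groups: Dict[str, List[float]] = defaultdict(list)
    for subcluster, value in rows:
        group = "flash" if subcluster in flash_subclusters else "dram"
        groups[group].append(value)
    return {group: mean(values) for group, values in groups.items()}


rows = [("sc1", 10.0), ("sc1", 12.0), ("sc2", 11.0), ("sc3", 13.0)]
print(compare_subclusters(rows, flash_subclusters={"sc1"}))  # {'flash': 11.0, 'dram': 12.0}
```

Both groups see the same workload over the same time window, so a difference between them points at the flash machines rather than at workload drift.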

I’d like to thank Chen and Serge for all of their work on this project.

Here are some links to previous talks that we gave about shuffle in previous years.

I’m looking forward to hearing your questions now.

Aaron Feldman
About Aaron Feldman


Aaron joined Facebook's data platform team four years ago where he designed and implemented core components of Cosco and helped drive it to wide internal adoption. Before Facebook, Aaron studied math and computer science as an undergraduate at Caltech.