Uneven distribution of input (or intermediate) data can often cause skew in joins. In Spark, this leads to very slow join stages where a few straggling tasks may take forever to finish. At Facebook, where Spark jobs shuffle hundreds of petabytes of aggregate data per day, data skew exacerbates runtime latencies further, to the order of multiple hours and even days. Over the course of the last year, we introduced several state-of-the-art skew mitigation techniques from traditional databases that reduced query runtimes by more than 40% and expanded Spark adoption for numerous latency-sensitive pipelines. In this talk, we’ll take a deep dive into Spark’s execution engine and share how we’re gradually solving the data skew problem at scale. To this end, we’ll discuss several Catalyst optimizations around implementing a hybrid skew join in Spark (that broadcasts uncorrelated skewed keys and shuffles non-skewed keys), describe our approach of extending this idea to efficiently identify (and broadcast) skewed keys adaptively at runtime, and discuss CPU vs. IOPS trade-offs around how these techniques interact with Cosco: Facebook’s petabyte-scale shuffle service (https://maxmind-databricks.pantheonsite.io/session/cosco-an-efficient-facebook-scale-shuffle-service).
Speakers: Suganthi Dewakar and Tony Xu
– Hello everyone, welcome to our session. We are going to talk about how we are solving the data skew issue at Facebook for large table joins, while keeping our IO within limits. My name is Suganthi Dewakar. I’m a software engineer on the Spark team at Facebook, and presenting with me is Tony.
– Hi everyone, my name’s Tony. I’m also a software engineer at Facebook, working on Cosco, which is an external shuffle service for Spark workloads.
– Today, we will be talking about our skew join journey. I will describe three solutions that worked for us, and Tony will talk about how we integrated these solutions with Cosco, which is Facebook’s external shuffle service.

First, what is data skew? It is the uneven distribution of partitioned data, and it affects aggregate operations like group-by and joins. For example, if we were to count the number of people in each country, we would read all countries’ data and group rows that belong to the same country into partitions. Each partition is then processed by a single task to obtain the final count. Partitions that contain data of less popular countries would have a smaller size, like partitions one, two and four on your screen. Partitions that contain data of densely populated countries would have a larger size, like partition three. In this example, partition three is skewed. The task that processes a skewed partition has larger latency and can slow down an entire query.

The effect of data skew at Facebook is further exacerbated because of the scale at which we operate. We have petabyte-size table joins and terabyte-size skewed partitions. In terms of task latency, the non-skewed partitions are processed within minutes to a few hours, whereas processing skewed partitions takes anywhere between multiple hours and days. Skewed pipelines are often latency-sensitive daily jobs with complex DAGs. Delay in a single skewed pipeline can cause delays in hundreds of downstream pipelines that depend on it. Even though skew affects both aggregate operations and joins, in this talk we will focus only on joins. A little recap on the join strategies that exist in Spark: ShuffleHashJoin and SortMergeJoin require input data to be partitioned, and hence are more prone to be affected by input data that is skewed. BroadcastHashJoin, on the other hand, does not require input data to be partitioned, but requires one side of the join to be small enough to fit in memory.
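As a toy illustration of the group-by example above, here is a minimal Python sketch (not Spark code, and with made-up data) showing how hash-partitioning rows by key sends every row of a heavily represented key to the same partition, producing one straggler:

```python
from collections import Counter

def partition_sizes(keys, num_partitions):
    """Assign each row to a partition by hashing its key (the country)
    and return the resulting partition sizes."""
    sizes = Counter()
    for key in keys:
        sizes[hash(key) % num_partitions] += 1
    return sizes

# Toy dataset: one country dominates, so whichever partition its key
# hashes to becomes the straggler (like "partition three" on the slide).
rows = ["IS", "NZ", "FJ"] * 10 + ["IN"] * 1000
sizes = partition_sizes(rows, num_partitions=4)
```

Every `"IN"` row lands in the same partition regardless of how many partitions we use, which is why adding reducers alone cannot fix key skew.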
Since broadcast join is a mapper-side join that requires no partitioning, it is skew resistant. Skew join, which is not an existing join strategy per se, is a hybrid join strategy that combines the above strategies in order to mitigate skew: the non-skewed partitions are processed in the usual manner, but the skewed partitions are joined in a way that mimics the broadcast join strategy. The mechanism we use to incorporate the skew-join strategy led to the progressive adoption of three solutions, each deprecating its predecessors. I will describe each solution going forward.

The first solution, skew hint, is an optimizer rule-based solution where the user provides us with skew information in the form of a query hint. The way they would do that is by using the SKEW ON keyword, followed by a list of column name and value pairs. Once we have the skew information, while planning the query we split the input data into two parts: the first part containing the skewed keys and the other containing the non-skewed keys. We then broadcast join the skewed keys, shuffle and sort-merge join the non-skewed keys, and finally union both join outputs to obtain the final result.

Let’s visualize this better with an example of a two-table join, table A and table B, on a skewed column. Without skew join, the physical plan would look something like this: we scan both tables, partition the data, and perform a sort-merge join. Since the data is skewed, the join task processing the skewed partition takes longer to complete. With skew join incorporated, the physical plan changes to something that looks like this: on the left, we filter rows that are not skewed and sort-merge join them, and on the right, we filter rows that are skewed and broadcast join them. Since broadcast join is skew resistant, both joins complete with little latency, and we finally union both outputs. Let’s take a look at the pros and cons of this solution.
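The hybrid plan described above (broadcast join the skewed keys, sort-merge join the rest, then union) can be sketched in plain Python. The function and data below are purely illustrative; dict lookups stand in for both join operators, which of course erases the performance difference the real plan exploits:

```python
def hybrid_skew_join(left, right, key_idx, skewed_keys):
    """Sketch of the skew-hint plan: rows with skewed keys take the
    broadcast-join path, the rest take the sort-merge-join path, and
    the two outputs are unioned. Both paths are simulated with the
    same dict lookup here; in Spark they are distinct operators."""
    # A broadcast join ships the (small) right side to every task.
    right_map = {}
    for row in right:
        right_map.setdefault(row[key_idx], []).append(row)

    def join(rows):
        out = []
        for row in rows:
            for match in right_map.get(row[key_idx], []):
                out.append(row + match)
        return out

    non_skewed = [r for r in left if r[key_idx] not in skewed_keys]  # shuffle path
    skewed = [r for r in left if r[key_idx] in skewed_keys]          # broadcast path
    return join(non_skewed) + join(skewed)                           # union

left = [("IN", 1), ("IN", 2), ("NZ", 3)]
right = [("IN", "Asia"), ("NZ", "Oceania")]
result = hybrid_skew_join(left, right, key_idx=0, skewed_keys={"IN"})
```

Note the two list comprehensions over `left`: this is exactly the double scan of the input data that the talk calls out as a drawback of skew hints.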
We definitely were able to reduce runtime latency using skew hints. However, this solution required prior knowledge of skewed keys even before query execution, and we also had to scan the input data twice to achieve it. This led us to the development of our second solution, runtime skew mitigation based on the Spark 2.0 adaptive framework. With this solution, skew detection and mitigation happen at runtime, while the query is executing. At the beginning of a join stage, we collect map output statistics of the join’s input shuffle data. These statistics contain size information for each partition. We then find whether any partition in this input is skewed by applying three simple conditions: the partition size must be greater than a minimum threshold value, X times the median size of all the partitions, and Y times the 99th-percentile size of all the partitions. Once we detect that a partition is skewed, we mitigate it by splitting the skewed partition into multiple sub-partitions. The corresponding partition on the other side of the join is duplicated to each task processing the split sub-partitions. Since partitions are now evenly sized, the end-to-end latency of this join is greatly reduced.

Looking at the pros and cons of this solution: we reduced runtime latency without requiring prior skew-key knowledge before query execution, and we also avoided the double table scan problem. However, since this solution is based on the Spark 2.0 adaptive framework, we add additional shuffles between joins, and between joins and aggregates, even if a particular join is not skewed. In the third approach, we build and adapt on top of the existing skew mitigation solution in AQE, the new adaptive framework in Spark 3.0 that was introduced this year. A little recap on what AQE is.
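The three detection conditions above can be sketched as a small Python predicate. The threshold values here (1 GB minimum, X = 5, Y = 3) are made-up illustrative defaults, not Facebook's actual settings:

```python
from statistics import median

def is_skewed(size, all_sizes, min_bytes=1 << 30, x=5.0, y=3.0):
    """A partition is flagged as skewed only if it exceeds all three
    thresholds: a minimum absolute size, x times the median partition
    size, and y times the 99th-percentile partition size. The defaults
    (1 GB, x=5, y=3) are illustrative, not production values."""
    ordered = sorted(all_sizes)
    p99 = ordered[int(0.99 * (len(ordered) - 1))]
    return (size > min_bytes
            and size > x * median(ordered)
            and size > y * p99)

# 99 partitions of 100 MB and one 800 GB straggler.
part_sizes = [100 << 20] * 99 + [800 << 30]
```

Requiring all three conditions keeps small stages from triggering mitigation (the absolute floor) while the median and p99 ratios catch genuine outliers rather than uniformly large partitions.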
AQE is the new adaptive framework in open source Spark 3.0, which includes a skew mitigation solution as part of it. Along with other features, it also allows for runtime DAG changes, which was a game changer for us. AQE-based skew mitigation had some limitations because of which it did not mitigate skew in our workloads in its original form. AQE’s mitigation supports mitigating skew in two-table joins only; it does not mitigate skew in joins that are part of other reducer or mapper operations in the same stage. This includes a skewed join that also has an aggregate operation or a union operation in the same stage. Since a lot of our existing pipelines fell under this category, it led to us building our first customization, which adds shuffles after a skew join at runtime if it is followed by an operation that requires a specific distribution.

Before describing how we went about this customization, let’s go over the lifecycle of AQE execution. AQE splits the query at shuffle boundaries into query stages. Each query stage then goes through replanning based on its child stage statistics, followed by an EnsureRequirements step, which adds shuffles between operators if required. Beyond this step, no new shuffles can be added to the query stage. The physical plan then goes through sub-stage creation, in case new shuffles were added, followed by skew join optimization. In case the skew-optimized plan requires the addition of new shuffles, it falls back to the original plan and gets executed. In order to add shuffles after a skew join if required, we add a new step in the lifecycle: the detect-skew step. Detect skew finds whether the current stage contains a join with skewed inputs; it marks the output partitioning of such a join as unknown, thus forcing EnsureRequirements to add shuffles after the skew join if it is followed by an operator that requires a particular distribution.
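A toy sketch of the detect-skew step, using a plain dict in place of Spark's physical plan nodes (the field names are invented for illustration): marking a skewed join's output partitioning as unknown makes the simulated EnsureRequirements pass insert a shuffle whenever a downstream operator needs a distribution the stage no longer guarantees:

```python
def replan_stage(stage):
    """Toy version of the detect-skew customization. If the stage
    contains a join with skewed inputs, its output partitioning is
    downgraded to "unknown", so the (simulated) EnsureRequirements
    pass below adds a shuffle whenever a downstream operator requires
    a distribution the stage no longer guarantees."""
    if stage["has_skewed_join"]:
        stage["output_partitioning"] = "unknown"   # detect-skew step
    required = stage.get("downstream_requires")
    if required and stage["output_partitioning"] != required:
        stage["inserted_shuffle"] = True           # EnsureRequirements
    return stage
```

The point of this design is that no shuffle is added when the join is not skewed: a non-skewed stage keeps its original partitioning, satisfies the downstream requirement, and skips the extra shuffle that the Spark 2.0-based solution always paid for.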
This way, we were able to support most of our skewed pipelines that were previously supported by our older solutions. Beyond this, we also identified further improvements that could be made on top of the above customization. Let’s consider a query shape like this one: a petabyte-sized table joined with multiple tables with a single join type. In this case, if there is skew in the first table, AQE with the first customization would mitigate it by introducing shuffles after each join, so the physical DAG would include additional shuffles to mitigate skew. After each join, we would be shuffling petabyte-sized data, which is very inefficient in terms of CPU and IO. This led us to developing our second customization, suited specifically for multi-level joins with the same join type: we split the skewed partition in the leftmost table and replicate the corresponding partitions in the rest of the tables for each split. This way, we were able to solve skew in multiple joins without adding additional shuffles.

Now looking at the pros and cons: this solution had all the advantages of the previous solution and was more efficient, since it did not add unnecessary shuffles when a join is non-skewed. Now that I have covered the skew mitigation solutions we adopted at Facebook to help with latency, I will hand it over to Tony, who will talk about the challenges we faced integrating the skew solution with Cosco, which helps with IO.
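The second customization can be sketched as a simple task-planning function; the file and partition names below are hypothetical, and lists of file names stand in for the real partition data:

```python
def plan_multi_join_tasks(skewed_files, other_partitions):
    """Toy planner for the multi-level join customization: the skewed
    partition of the leftmost table is split per file, and the matching
    partition of every other table in the join chain is replicated to
    each split, so no shuffle is needed between the joins. Inputs are
    just lists of (hypothetical) file names."""
    return [{"left_split": [f], "replicated": list(other_partitions)}
            for f in skewed_files]

# Partition 3 of table A is skewed and stored as two files; tables B
# and C contribute their matching partition to every split.
tasks = plan_multi_join_tasks(["A-p3-file0", "A-p3-file1"],
                              [["B-p3"], ["C-p3"]])
```

Each task joins its slice of the skewed side against full copies of the corresponding partitions from the other tables, which is why the chain of joins can proceed without re-shuffling petabyte-sized intermediate output.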
– Thank you, Suganthi. Suganthi already talked about the background and journey of skew mitigation at Facebook. I’m going to talk about the challenges we encountered when integrating skew mitigation with Cosco and how we’re planning to overcome them.

So what’s wrong with skew mitigation and Cosco together? Basically, skew mitigation splits a skewed partition into multiple sub-partitions, each consumed by a separate sub-reducer. The key property here is that it splits the partition at mapper boundaries. This works perfectly with vanilla Spark shuffle, as shuffle output data is grouped by mapper, so each sub-reducer only needs to read the output files of its assigned mappers. But at Facebook we use Cosco as an external shuffle service for Spark. It merges all mapper outputs together, so there are no mapper boundaries in the data consumed by a reducer, and this splitting strategy in skew mitigation doesn’t work well here. In the example below, each sub-reducer only needs part of the data in each file of partition zero, but it cannot tell whether a record actually came from its assigned mappers without iterating through every file.

Before we jump into the solution, let me give you a very short recap on Cosco. I believe a lot of you have already heard about Cosco before; if you’re not familiar with it, there was a dedicated talk about it at Spark + AI Summit last year. Basically, Cosco is shuffle-as-a-service, currently serving the Spark workload at Facebook, mainly focusing on efficiency to address two issues: write amplification and small IOs. It can also sometimes improve job latency, but that’s not our focus. In short, Cosco groups shuffle data into files by partition, whereas vanilla Spark shuffle groups shuffle data into files by mapper. Each mapper streams its shuffle data to the responsible shuffle service instance based on the partition ID of the data.
The Cosco shuffle service has a write-ahead buffer for each reducer partition, so it merges the data from different mappers and flushes it to disaggregated storage. Therefore, each reducer only needs to read the files dedicated to its particular partition. Also, data can be duplicated for various reasons, and deduplication is performed on the reducer side. Besides the components I just mentioned, there is also a dedicated metadata service where shuffle file metadata is registered, to be retrieved by reducers later.

As I mentioned earlier, to make Cosco work with mapper splitting boundaries, each sub-reducer has to read the entire data set of the skewed partition, filter out the data from mappers it is not interested in while iterating through the entire dataset, and consume only the data from its assigned mappers. As you can expect, this is very inefficient, since the skewed partition data has to be read multiple times: each sub-reducer has to read all the data. And of course this doesn’t work well. In the worst case at Facebook, our most heavily skewed partition can be up to a few terabytes, which requires thousands of splits for just a single partition, meaning the partition would be read thousands of times. We found that a majority of the time for such jobs would be spent on this kind of scan.

Since Cosco groups shuffle output by partition, and each partition can have a large number of files, it is natural to ask ourselves: why can’t skew mitigation split a skewed partition on file boundaries? This is a very tempting idea, but before we jump into the design, let me explain why it is actually not a trivial problem. First of all, let’s clarify three terms I will use frequently in the rest of the talk: record, package, and chunk.
A record is the unit of data, a single row. Multiple records buffered together make a package, which is the unit sent from a mapper to a Cosco shuffle service instance. Multiple packages buffered by the Cosco shuffle service make up a chunk, which is flushed to disk as a file.

The main restriction on performing a file-boundary split is duplication. The shuffle data from the Cosco shuffle service is likely to contain duplicated records due to packages resent by mappers, which can happen for various reasons. To deduplicate such packages, the Cosco reducer client performs a merge sort or keeps a hash map, but this requires each reducer to read all the data a mapper generated for that partition. Otherwise, if the output of a mapper for the partition is partially read by multiple reducers, none of the reducers can tell whether a row is duplicated in the data read by the others. Because of this, the file set assigned to each sub-reducer must have no mapper overlap with any other, which is just not feasible. In the example below, sub-reducer zero is assigned to read from files zero and one, but since files zero and one have mapper overlap with files three and four respectively, both files three and four have to be read by sub-reducer zero as well. Since Cosco randomly merges output from different mappers, each sub-reducer would likely end up reading all files of the partition again.

So to make Cosco work with file splitting boundaries in skew mitigation, we need to find a new way to deduplicate records. There are basically two ways a duplicated record can be generated. The first is mapper failure and restart. We handle this by assigning unique IDs to different mappers; even a restarted mapper gets a different ID from the original failed one. Each record is then appended with its mapper’s unique ID. When a reducer starts to consume data, it gets the unique IDs of all succeeded mappers.
Any records associated with IDs of failed mappers will be dropped. Therefore, duplicated records caused by mapper failure are not a blocker to splitting on file boundaries.

The second cause is network issues: a mapper fails to receive ack messages from a shuffle service instance before a timeout. Since the mapper has to resend all packages still waiting for acks, all records in those packages are likely to be duplicated in files, since they are sent twice. We call such packages suspected packages: packages that, from the mapper’s perspective, are likely to be duplicated. Note that a suspected package is not necessarily a duplicated package, but a duplicated package is always a suspected package. If we can identify all suspected packages and pass that info along from the mappers to the corresponding reducers, would that help with deduplication? The answer is yes. Each mapper keeps track of the suspected packages it observes, basically by recording all packages resent to a different shuffle service instance due to missing ack messages. Each such package is eventually acked along with a chunk ID, which uniquely identifies the authorized file containing the records of that suspected package. So each mapper keeps a mapping from suspected package IDs to their authorized chunk IDs for each partition. When a mapper is done, it reports this suspected-package info to the Spark driver, and the Spark driver aggregates the suspected-package info from all mappers. On the reducer side, a reducer first talks to the metadata service to fetch the list of files to read for the partition it is assigned; since we are splitting on file boundaries, sub-reducers of the same skewed partition have non-overlapping file sets to read. Meanwhile, each reducer also gets the suspected-package mapping of that partition from the Spark driver.
When iterating through the records of its assigned files, the reducer checks whether each record comes from a suspected package. If not, we are confident it is not a duplicate; otherwise, the reducer accepts the record only if the file’s chunk ID matches the authorized chunk ID in the mapping. In the example below, both chunks contain the same record from mapper three, package four, but since the authorized chunk ID is 15, the record is read only by sub-reducer two and dropped by sub-reducer four.

What if a file chunk is lost? In the original Cosco, each reducer reads the entire partition data, so the data to be read by a reducer is deterministic: we only need to restart mappers to regenerate the partition data for the lost chunk, and then the reducer is restarted to read all the data of the partition again. But with skew mitigation splitting on file boundaries, the data read by each sub-reducer is not deterministic. A restarted sub-reducer would read only a subset of the regenerated files, and the regenerated files are non-deterministic, since the Cosco shuffle service merges mapper outputs randomly. So simply restarting a single sub-reducer would be a mess here.
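The reducer-side accept rule, combining both dedup mechanisms above (dropping records of failed mappers, and checking suspected packages against their authorized chunk), can be sketched in Python. The record layout and field names here are illustrative, not Cosco's actual wire format:

```python
def accept_record(record, succeeded_mapper_ids, suspected, chunk_id):
    """Reducer-side dedup sketch. A record carries (mapper_id,
    package_id, payload); the layout is illustrative, not Cosco's
    format. Records from failed/restarted mappers are dropped
    outright, and records from suspected packages are kept only when
    read from the authorized chunk."""
    mapper_id, package_id, _payload = record
    if mapper_id not in succeeded_mapper_ids:
        return False                       # duplicate from a mapper restart
    authorized_chunk = suspected.get((mapper_id, package_id))
    if authorized_chunk is None:
        return True                        # not a suspected package
    return chunk_id == authorized_chunk    # keep only the authorized copy

# Mapper 3's package 4 was resent; chunk 15 holds the authorized copy,
# mirroring the slide's example of sub-reducers two and four.
suspected = {(3, 4): 15}
```

Because the check only consults a small mapping of suspected packages, sub-reducers can read disjoint file sets without the merge sort or hash map over the whole partition that the original reducer client needed.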
Therefore, when a chunk loss is detected, the Spark driver broadcasts to all sub-reducers of the partition to fail them all and, once the partition files are regenerated, restarts all sub-reducers of the partition again. It is worth mentioning that this is a very rare case, because Cosco uses RS encoding for shuffle chunks.

Putting it all together, the motivation of these changes in Cosco is to make Cosco work with Spark skew mitigation by splitting a skewed partition into multiple non-overlapping file sets, with each sub-reducer assigned exactly one of them. To achieve that, each mapper keeps track of suspected packages along with their authorized chunks and reports the suspected-package info to the Spark driver when it finishes. Once the mapper phase is done, the Spark driver detects skewed partitions based on the mapper statistics and splits those partitions on file boundaries. Each reducer fetches the list of chunks to read, along with the suspected-package info of the partition, and accepts a record of a suspected package only if it is read from the authorized chunk.

To summarize our talk today: we discussed how we improved the runtime latency of pipelines by gradually solving the skew problem of our large joins efficiently, namely with skew hints, runtime skew mitigation, and customized adaptive query execution skew mitigation. We then discussed the importance of Cosco for IO efficiency and the techniques involved in solving the challenges we faced while integrating adaptive skew mitigation with Cosco, by splitting skewed partitions into non-overlapping file sets. We hope you found this session useful, and thank you so much for listening.
Suganthi is a software engineer at Facebook, Menlo Park. She is part of the data infra team, focusing on Spark core and the SQL engine. Before Facebook, she worked at Google, NetApp and Ericsson. She received her Masters in Computer Science from North Carolina State University.
Guanzhong is a software engineer on Facebook’s Big Compute team. At Facebook, he has worked on Cosco for large-scale data shuffling for half a year. Before Facebook, he worked at Google Ads and YouTube on large-scale data processing. He received his master’s degree in computer science from Shanghai Jiao Tong University.