The number of daily Apache Spark applications at LinkedIn has increased by 3X in the past year. The shuffle process alone, which is one of the most costly operators in batch computation, is processing PBs of data and billions of blocks daily in our clusters. With such a rapid increase of Apache Spark workloads, we quickly realized that the shuffle process can become a severe bottleneck for both infrastructure scalability and workloads efficiency. In our production clusters, we have observed both reliability issues due to shuffle fetch connection failures and efficiency issues due to the random reads of small shuffle blocks on HDDs.
To tackle those challenges and optimize shuffle performance in Apache Spark, we have developed Magnet shuffle service, a push-based shuffle mechanism that works natively with Apache Spark. Our paper on Magnet has been accepted by VLDB 2020. In this talk, we will introduce how push-based shuffle can drastically increase shuffle efficiency when compared with the existing pull-based shuffle. In addition, by combining push-based shuffle and pull-based shuffle, we show how Magnet shuffle service helps to harden shuffle infrastructure at LinkedIn scale by both reducing shuffle related failures and removing scaling bottlenecks. Furthermore, we will share our experiences of productionizing Magnet at LinkedIn to process close to 10 PB of daily shuffle data.
Min: Hi, welcome to this session. I’m Min, a senior staff engineer at LinkedIn and the tech lead for the LinkedIn Spark team. So together was my colleague, Chandni, another member of the Spark team, we’re going to talk about project Magnet, which aims to drastically improve the performance and the scalability of the shuffle operation of Spark. And it will also be an upcoming feature in a future release of Spark itself. I hope you will find this session useful.
So we will start with a brief overview of the Spark [inaudible] system at LinkedIn, and also talk about the major challenges we have faced with operating shuffle at LinkedIn scale. And following that, we’ll also introduce project Magnet and the push-based shuffle mechanism that it brings to project Spark. And also to demonstrate its impact on LinkedIn, we’ll also share the production results of Magnet and some of the key learnings we have gathered. We will conclude this talk with a few interesting areas that we believe Magnet can bring further benefits.
So after evolving over a period of four plus years, [email protected], now represents a pretty massive scale infrastructure across our two largest and business clusters we have more than 11,000 compute nodes. Every day, more than the 40,000 Spark applications are submitted and running on our clusters. These Spark applications contribute to more than 70% of our total cluster compute resource usage, and generally close to 18 petabytes of data to be shuffled on a daily basis.
And this massive scale infrastructure also grows very fast. We have seen more than 3X year over year growth in our Spark compute footprint for the past two years. So when operating such a massive scale infrastructure, many challenges can arise, among these the ones associated with the shuffle operation is perhaps the biggest we have dealt with. Shuffle is the very foundation of the MapReduce compute paradigm and Spark’s shuffle architecture perhaps represents the result of a continuous improvement over the past decade in big data compute engines.
So as illustrated here Spark adopts the so-called [inaudible] based shuffle. That’s each map task inside a Spark computer will produce one shuffle file consisting of multiple shuffle partition blocks for the corresponding reducers. The shuffle file is produced on local disks and managed by the external shuffle service deployed on the same node. When the reduced task start roaming, they would fetch the needed shuffle blocks from the corresponding remote shuffle services. This architecture achieves a reasonable balance between performance, scalability and reliability for shuffle. However, when operating it at [inaudible] scale, we still encountered multiple issues.
The first one is a reliability issue. In our production clusters we have noticed that the shuffle services can become available during peak hours due to the heavy request load. This will result into shuffle fetch failures and leading to expensive stage retries, which can be pretty disruptive causing workflow [inaudible] breakage and even job failures. The second is an efficiency issue. At LinkedIn we store the shuffle files on hard disk drives. Since the reducers block fetch requests arrive in random order, the shuffle service would also access the data in the shuffle files randomly. If the shuffle block size is small, then the small random reads generated by the shuffle services can severely impact the disk throughput extending the shuffle fetch wait time.
The third issue is about scalability. An abusive job shuffling many small shuffle blocks could easily choke a shuffle service leading to performance degradation. This not only impacts this abusive job, but also all the neighboring jobs sharing the same shuffle service. As the total number of Spark apps increase more such abusive jobs will appear. And it’s not always easy to tune these jobs to shuffle larger blocks, but whenever it happens it will lead to unpredictable delays to other jobs, that’s otherwise behaving normal. So we took a look around all the possible solutions and we noticed that there are a few existing optimizations that can improve shuffle performance by either reducing the need of shuffle or probably changing the shuffle partitions.
Techniques such as, broadcast join, caching of the RDD/DF frame and even bucketing are common ways to reduce the need of shuffle. However, they either have limited applicability or require a non-trivial manual setup effort. Thus, they are not generic enough to solve these issues we have encountered at our scale. In Sparks 3.0 the AQE feature, the Adaptive Query Execution, was introduced which enabled Spark to auto-collates many small shuffle partitions into fewer larger ones.
So this feature could actually lead to slightly increased shuffle block size and resulting to better shuffle performance. However, in our evaluations, it is still not sufficient to address all the challenges we have faced. So facing all these challenges with shuffle and the limitations of the existing optimizations, we realized that solving these issues actually require a more fundamental improvement to the shuffle operation itself. For this, we have designed and implemented project Magnet, a novel push-based shuffle architecture.
Magnet aims to merge small shuffle blocks into larger ones thus optimizing the I/O pattern during shuffle. As part of this process, a second replica of the shuffle data gets created, which helps to further improve the reliability of shuffle. And more importantly Magnet also aims to significantly improve the shuffle data locality, which significantly boost the performance of the shuffle operation. So during the time constraint, we won’t be able to go into too much details on Magnet, but our paper describing the detailed design and computation of Magnet was published in last year’s world [inaudible] conference. We will also provide more links towards the end of the presentation, for people who are interested in knowing more.
So now I will hand it over to my colleague, Chandni, to talk about the high level designs for Magnet, as far as its impact for LinkedIn’s offline compute infrastructure.
Chandni: Thanks Min. Hi, I’m Chandni and I’ll be talking about the basic design principles of Magnet and it’s impact at LinkedIn. We have designed Magnet as a complimentary approach to the existing shuffle mechanism in Spark. Since it works along with the existing shuffle operation, it is a best effort approach, where not every block is guaranteed to be pushed and merged. As shown in this diagram on the right the core idea behind Magnet is that instead of having reducers fetch small blocks of shuffle data from the external shuffle services, the mappers, after they generate the shuffle data, will also push the shuffle blocks to the shuffle services.
The server can then merge the blocks, belonging to a shuffle partition into a merged shuffle file for that partition. Since, this push merge approach is best effort, the reducers now fetch a hybrid of blocks. As shown in this diagram here, the reducers will be both merged shuffle blocks as well as unmerged blocks, since not all blocks are guaranteed to be merged. When the reducers read larger merged blocks, then that improves the I/O efficiency because the server cannot perform large sequential reads instead of small random reads from the disk.
Another benefit on the read side is that now the location of these merged files provide a natural locality preference for the reducer tasks. They have higher chances to be placed at the same nodes as the merged files from which they read the shuffle data. And this has a big impact on the cluster efficiency, as it leads to more shuffle data that has been read locally. In addition, the overall reliability is improved as well because there is another replica of shuffle data that is created. In cases where reducers just fail to fetch the merged file they will fall back to fetch the unmerged blocks making up that merged file.
Now we will look at the impact that Magnet has had in our production clusters. We have rolled out Magnet to 100% of all the Spark workloads at LinkedIn. And we did some analysis and measured the data from our two largest production clusters. And currently Magnet is serving 15 to 18 petabyte of shuffle data daily. When we rolled out Magnet to these clusters, we did it in stages with the ramping tool to minimize our risks and we reached to the 100% mark without encountering any major issues. This is a big attesting to the overall reliability and scalability of Magnet.
In addition, Magnet has significantly transformed the landscape of shuffle in our clusters. We have observed 30X increase in the shuffle data that is read locally compared to what it was six months ago. As we can see in this graph here, the y-axis in this graph is the shuffle data locality ratio, which is defined as the percentage of the total shuffle data that is read locally. The x-axis are the days when this metric was recorded. We can see that as Magnet is ramped up during the last six months, the amount of shuffle data that is read locally has increased.
Now since Spark allows reading local shuffle data directly from disk and bypassing any network RPCs the load on the shuffle service is considerably reduced. This helps a lot to improve the cluster efficiency and it also leads to higher stability in the cluster. Specifically, we’ve observed an overall reduction of 84.8% in the shuffle fetch wait time across these two clusters. The shuffle fetch wait time measures the amount of time that reducers spent waiting for remote shuffle data to arrive. In this graph here, the y-axis represents the average shuffle fetch delay percentage, which measures the total shuffle fetch wait time as a percentage of the total task time. The x-axis are again, the days when this was recorded. So we can see from this graph that as Magnet is ramped up again, the average shuffle fetch delay percentage reduces. And this significant reduction shows how much Magnet improves the overall efficiency of the shuffle operation in our clusters.
In addition to improving the cluster efficiency and stability Magnet also directly benefits Spark jobs in our clusters. One of the key benefit of mandate is that it doesn’t require any jobs specific tuning. The users just need to turn on push-based shuffle configuration, and they will see improvements in their job. To quantify the benefits that Magnet brings to the Spark workflows we analyzed around 10,000 workflows that were onboarded to Magnet, and then we measured the compute resource consumption and overall job runtime.
Now to measure the improvements by Magnet to these workflows, we had to compare the Magnet enabled and disabled executions of these workflows, which dealt with similar amount of data. After this analysis, we saw that the overall compute resource consumption went down by 16.3%. About 20% of these flows were previously heavily impacted by shuffle fetch delays. Heavily impacted flows are the ones which had non-Magnet executions where shuffle fetch delay time was over 30% of the total task time. Among these flows the overall resource consumption went down by almost 45%.
So now for the improvements in the job runtime, we observed that 50% of the heavily impacted workflows had seen at least 32% reduction in their job runtime. This is also shown in the graph here, which is a present time, time series chart. I think to note here is that this chart represents thousands of spark workflows. In this chart the y-axis is the app duration reduction percentage, which measures overall job runtime reduction with Magnet as a percentage of the total job runtime of non-Magnet executions. The x-axis again, are the days when this metric is recorded. The middle line is the 50th percentile of the app duration reduction percentage. The bottom line is the 25th percentile and the top one is the 75th percentile. So for April 25th, the data point on these lines, they show that there are 75% apps that saw more than 23% reduction in their job runtime. 50% apps, saw more than 38% reduction, and 25% of these apps saw a reduction of more than 58% in their job runtime.
So just to summarize, Magnet has resulted in much faster and cheaper job executions across our two production clusters. The ramping up of Magnet to 100% adoption in our clusters, we’ve also validated some of our initial theories about it. One of them was that if Magnet is more widely used within a cluster, it will further reduce the load on the shuffle services and therefore the shuffle performance will also increase. This graph here is the same as the previous one, but for a different time period, when we significantly increased the adoption of Magnet in the cluster. And this helped us to validate that theory about Magnet.
From this graph, we can see that across the two highlighted windows, the first one where the adoption was increased from 12% to 60%, and the second one where it was increased from 60% to 100%, the app duration reduction percentage increased across all the percentiles. Just a thing to note here is that, the job runtime in a large-scale multi-tenant cluster can be impacted by various factors, such as delays in allocation of containers [inaudible] or skewness in the data. And it is not possible to exclude these factors in the job runtime measurements. This is the main reason we see regression and fluctuation in the above graph and especially in the 5th and the 95th percentile. Since these are measuring extreme cases, they tend to have more fluctuations than the other percentiles.
Finally, we also compare the performance of Magnet with different disks. We run a benchmark, which shuffles around 800 GB of data on both NVMe and HDD. For this benchmark, we had set up two 10 node clusters where one was using HDDs and the other one was using NVMe disks to store the shuffle data. And we ran both Magnet enabled and disabled benchmark jobs on these two clusters, and the results are shown here in this table. Without Magnet the benchmark on HDD took 16 minutes, and on NVMe it took 10 minutes. With Magnet the benchmark on HDD took 7.3 minutes and on NVMe took 4.2 minutes.
What we learned from this, these numbers, is that even though NVMe disk improved the shuffle efficiency, Magnet, just with HDD, brings an additional improvement of 17 percentage points. Also, Magnet on NVMe brings significant improvement compared to Vanilla Spark on NVMe. So we can conclude from this, that even though the shuffle delay problem can be addressed with a more performing disk to some extent, but the RPC load on the shuffle services also contribute significantly to this delay, and this problem is alleviated by Magnet. So that brings and end to my section. I will hand off to Min from here. Thank you.
Min: Thanks Chandni, for giving me your summary of the design and impact of Magnet. So now that we have achieved this internal success with project Magnet, we also have a few areas that we’re working on to bring the benefits of Magnet to even more scenarios. The first is contributing Magnet back to project Spark. This is an ongoing effort under the Spark [inaudible] ticket, Spark-30602, and we have already contributed around 50% of our internal [inaudible]. So once completed push-based shuffle will be included in a future release or purchase Spark itself.
So in addition, in the past one to two years, we have also seen a few solutions from other big tech companies that aim to improve Spark’s shuffle operation, such as the Costco shuffle service from Facebook and the Zeus shuffle service from Uber, and a few other solutions. So compared with these solutions Magnet has a few unique advantages. The first is Magnet native integration with Spark, meaning that no external storage systems or coordination services are needed to use Magnet. This has helped significantly at LinkedIn to ensure ease of deployment and monitoring, as well as making sure that we do not hit any unexpected scaling bottlenecks from these dependencies.
The second is a much improved shuffle data locality that Magnet brings to the table. Now we have already seen in Chandni’s section that how much this helps to improve the performance, reliability and scalability of shuffle, even if it’s SSD or even NVMe disks. So we hope that once Magnet becomes available as part of Spark itself, other people can also try that out and benefit from the much improved shuffle architecture, and it will also welcome collaborations from the community on this work.
And in addition to the open sourcing efforts, we have also identified a few areas where Magnet can help to bring additional benefits. Specifically, we’re seeing that Magnet can help to further optimize compute and storage disaggregate data infrastructure, this company [inaudible], as well as optimizing Python-centric data processing workloads in AI data pipelines. And we’ll briefly talk about these two areas before concluding this presentation.
So as more companies are embracing cloud to operate their data infrastructures, we have seen an increasing demand for compute storage disaggregate cluster deployments, where the data sets are permanently stored in remote storage clusters, where the compute happens within an elastic cluster on cloud. Such as setup helps to reduce the cost of operating data infrastructure on cloud, as computing instances can be lost as they come and go, based on demand. However, there hasn’t been a very good solution for storing shuffle data in such a disaggregate cluster or set up yet.
So one common approach is to just store the shuffle data on local disks attached to [inaudible] compute instances. However, this hampers the computer elasticity since decommissioning the computer instance would also mean loss of the shuffle data stored on it, which can actually lead to expensive stage retries. This also runs the risk of exhausting the limited local storage capacity, as compute instances on cloud usually do not have ample storage space.
The alternative approach is to just store the shuffle data in remote disaggregate storage in order to decouple the compute instances from shuffle storage altogether. This however also has a non-negligible performance overhead since all reason rights of the shuffle data, now needs to go through this remote storage. So with Magnet we’re actually trying to propose an alternative hybrid approach for storing the shuffle data in such disaggregate cluster setups.
So we will leverage both the local storage available to the compute instances and the remote storage, such that the local storage is acting as a caching layer for the shuffle storage. With this set up the shuffle data will still be produced onto the local storage first. However, the local storage is now treated as an ephemeral storage instead of a permanent storage, meaning that the local storage can evict or even relocate locally stored shuffle data to remote permanent storage if needed, unless it can tolerate running out of local storage space or even compute instance decommissions.
When the shuffle data is fetched, if it is still presenting the local storage, it will be accessed directly. Otherwise, it will be fetched from remote storage first, before serving to the reducers. This approach will still preserve Magnet’s benefit of the much improved shuffle data locality. We’re also enabling that decoupling of shuffle storage from the compute instances. We will provide links to more details on this approach and at the end of the presentation.
So in addition to this, another area that we see Magnet can bring additional benefits, is how it can help to optimize Python-centric data processing workloads. The surge of the usage for AI in recent years across industry has driven the adoption of Python for building AI data pipelines. A different from the [inaudible] use cases, where developers might still largely preferred SQL as the language. The AI developers are increasingly leaning towards Python as their first choice.
Since Magnet’s optimization is at the shuffle layer, which is a very fundamental building block for modern big data computer engines, it is generic enough to bring benefits to both SQL-centric analytics workloads, as well as the Python-centric data processing workloads in AI data pipelines. To fully unlocked those benefits we’re also looking to support a Magnet for Spark and Kubernetes deployment scenarios.
So this concludes our talk. Towards the end we also want to provide a few resources for people who are interested in learning more details. The first is the upstream Spark direct ticket under which we’re currently contributing Magnet to our project Spark. You can follow this ticket to get an idea of when this feature becomes fully available in Spark.
The second is a blog post we published last year, which talks a bit more on the design implementations of Magnet. And this blog post is also a simplified version of our we [inaudible] paper published last year, which contains more insights. To last, we also recently published a second blog post, which contains information about the production results of Magnet at LinkedIn, as well as more detailed information on some of the future works we just talked about earlier. We hope you will find them useful. So towards the end, we would like to also encourage people to rate and review this session and to let us know your feedbacks. We will also be available, if you’re curious, after this presentation.
I have been working on the Data Infrastructure projects in the Hadoop ecosystem since 2013. I am an Apache Hadoop committer and an Apache Apex PMC. Currently, I work on Apache Spark at LinkedIn. I’v...
Min Shen is a tech lead at LinkedIn. His team's focus is to build and scale LinkedIn's general purpose batch compute engine based on Apache Spark. The team empowers multiple use cases at LinkedIn rang...