In this presentation we want to share our experience migrating Spark workloads for one of the most critical clusters inside Pinterest. The migration includes two important changes in the software stack. First, the storage layer is changed from HDFS to S3. Second, the resource scheduler is switched from Mesos to YARN. We will share our motivation for the migration and our experience resolving several technical challenges, such as S3 performance, S3 consistency and S3 access control, in order to match the features and performance of HDFS. We made changes in job submission to address the differences between Mesos and YARN. In the meantime, we optimized Spark performance by profiling and selecting the most suitable EC2 instance type. In the end, we achieved good performance results and a smooth migration process.
– Hi, everyone. My name is Xin Yao, and I’m here with Daniel. Today we’d like to take you along on the journey of migrating Pinterest’s Spark cluster.
A brief introduction about ourselves. My name is Xin Yao, and I’m a tech lead at Pinterest on the Ads team. Previously, I spent many great years at Facebook and Hulu. A lot of my work was focused on building large-scale distributed computing systems. – And this is Daniel Dai. I’m a tech lead on the Pinterest data team. I am a PMC member for Apache Hive and Pig, and before that, I worked at Cloudera, Hortonworks and Yahoo.
– Today in this talk, we will cover the following topics. First, we will share a high-level overview of our Big Data Platform, then the performance of Spark jobs on this platform. Next is the S3 eventual consistency model we have been dealing with. Then we will talk about the differences between the primary storage systems, HDFS and S3, and between the scheduling systems, Mesos and YARN. We will conclude our talk with how Spark is being used at Pinterest.
First, let’s take a high-level overview of our Big Data Platform at Pinterest.
From search to ads, machine learning to recommendations, nearly every team within Pinterest relies on the Big Data Platform in some way or another. The Big Data Platform team is responsible for providing a stable, reliable and efficient ecosystem for engineers and analysts to manage and transform their data. Pinterest has been building its own in-house Big Data Platform leveraging open source projects. We use HDFS for storage, Mesos and Aurora for resource management and scheduling, and Spark, Hive and Presto for data processing and cataloging.
The Big Data Platform at Pinterest is constantly evolving in order to fulfill the ever-changing needs of our users and to stay up to date with the latest cutting-edge technologies. Over the last year, our team has been working on migrating to our next generation Big Data Platform. As shown in this chart, at a high level the most significant changes are switching our resource management and scheduling system from Mesos to YARN and switching our primary storage from HDFS to S3. Now the migration has finished with great improvements on many fronts, from performance to efficiency, from reliability to cost savings. We will share our wins and learnings in this talk.
As with any Big Data Platform, performance plays one of the most important roles. We have been striving to provide a platform that can run jobs performantly and efficiently.
As we were building our next generation Big Data Platform, improving performance and efficiency is one of the most important goals. We can’t improve anything if we don’t know what’s the bottleneck of our old cluster. So the first thing we did was to identify performance bottlenecks of our old cluster.
We took a data-driven approach to find the bottleneck of our cluster. By looking through all the dashboards and metrics we had, both internal and external, we identified that the system-wide performance bottleneck for our cluster is local disk IO. As shown in this chart, not having enough local disk IO often slows down Spark jobs due to slow shuffle, which in turn makes the job and the whole workflow slow, which in turn makes IO resources even more scarce. As you can see, this is a downward performance spiral that no one wants to see in their system.
To understand the problem better, we first need to understand why local disk IO is important for Spark performance. Local disks are typically used in several different ways. Spark writes shuffle data to local disk, and Spark also reads local disk to serve shuffle data to reducers. Spark also spills data to local disk when memory is not big enough for some operations, like sorting.
Let’s take a look at this problem through a simple example. This is a simple aggregation query. Essentially, what it does is query a table to get the distinct ids and the max value for each id.
The query execution will have a mapper phase, a shuffle phase and a reducer phase. Let’s say we have 9K mappers and 9K reducers. Each mapper reads one chunk of input, processes it, and stores the shuffle data on local disk. Then there is the shuffle phase. The goal of the shuffle is to move the same ids to the same reducer, so that each reducer can compute the maximum value of each id. In order to do so, each reducer talks to all 9K mappers and asks for the ids this reducer is responsible for. In total, all 9K reducers talk to all 9K mappers, so there will be 81 million connections.
One machine can run multiple mappers at the same time. If we zoom in and focus on only one machine, in this case the machine has 30 mappers, and all those 30 mappers share the same local disk of that machine. One mapper writes the shuffle data once, then reads it 9K times for 9K reducers. So in total, 30 mappers need 270K IO operations. And all those operations happen around the same time, because each mapper processes roughly the same amount of data and finishes around the same time. That’s too many operations for one machine in too short a timeframe.
So how do we optimize this job in our own cluster?
We have done a lot to optimize our jobs. One of the most helpful optimizations was tuning the number of mappers and reducers in order to combine small disk operations into big ones. As an example, we reduced the number of both mappers and reducers from 9K to 3K.
Again, if we zoom in and focus on only one machine, in this case the machine has only 10 mappers, and all those 10 mappers share the same local disk of this machine. One mapper writes the shuffle data once and reads it 3K times for 3K reducers. So in total, Spark will do 30K local disk operations, which is nine times better than before. One thing to notice here is that reducing the number of mappers and reducers also reduces the parallelism of the job. If parallelism is too low, the job won’t be able to fully utilize all the assigned resources, which could hurt performance.
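The arithmetic behind these two configurations can be sketched in a few lines of Python. This is only a back-of-the-envelope model that follows the simplification used in the talk (each mapper’s shuffle output is read once per reducer, and the single write per mapper is ignored); the function names are made up for illustration.

```python
def shuffle_connections(num_mappers: int, num_reducers: int) -> int:
    """Every reducer fetches its share of data from every mapper."""
    return num_mappers * num_reducers


def shuffle_disk_ops_per_machine(mappers_on_machine: int, num_reducers: int) -> int:
    """Approximate local disk reads needed to serve shuffle data on one machine.

    Each mapper's shuffle output is read once per reducer; the one write per
    mapper is ignored, as in the talk.
    """
    return mappers_on_machine * num_reducers


# Before tuning: 9K mappers and 9K reducers, 30 mappers on one machine.
before = shuffle_disk_ops_per_machine(mappers_on_machine=30, num_reducers=9_000)
# After tuning: 3K mappers and 3K reducers, 10 mappers on one machine.
after = shuffle_disk_ops_per_machine(mappers_on_machine=10, num_reducers=3_000)

print(shuffle_connections(9_000, 9_000))  # 81000000
print(before, after, before // after)     # 270000 30000 9
```

The model makes the trade-off visible: cutting both counts by a factor of three reduces per-machine disk operations by a factor of nine, at the cost of lower parallelism.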
Last year, we spent a whole month optimizing one of our most critical jobs. The optimization result was awesome, a four times improvement, but it required a tremendous amount of manual tuning. Those tunings don’t scale very well as data volume increases, and they couldn’t be easily applied to other jobs. So not having enough local disk IO makes Spark jobs really hard to optimize, and having a system-wide performance bottleneck takes a heavy toll on every job running in this cluster.
So when we were building our new cluster, we were determined to remove the cluster-wide performance bottleneck, so that each team could focus more on building impactful products instead of dealing with performance issues.
With the performance bottleneck in mind, we did multiple rounds of testing, trying to find the best hardware configuration for our Spark workload. Eventually, we settled on the EC2 instance type r5d, which is local disk optimized and which brought local disk IO operations per second from 3K to a whopping 40K.
Our jobs really loved the new cluster, especially the new EC2 instance type. Jobs have seen a 25% runtime improvement on average, and that’s without any extra resources or tuning. One shuffle-heavy job even got a 35% improvement, from 90 minutes down to 57 minutes. Out of all the cluster-wide improvements we have made, optimizing local disk IO was definitely the biggest contributor.
If there’s only one thing that I hope you remember from this talk, it is: measure before you optimize. As the saying goes, premature optimization is the root of all evil. In our case, we did investigations and tests with our own workload in our own cluster. We didn’t guess the bottleneck; instead, the numbers told us that local disk IO is the bottleneck of our cluster. So switching to a local disk optimized instance type was a perfect option for our workloads. Do the tests with your own workload in your own cluster, identify the bottlenecks of the workload, and try to optimize those.
In terms of optimizations, there are three levels you can work at: cluster level, Spark level and job level. You can optimize at the cluster level to benefit all the workloads running in the cluster. Our example is improving local disk IO, which gave a 25% performance boost on average for all jobs without any extra resources or tuning. Next is the Spark level, things like tuning the number of mappers and reducers, CPU and memory. Doing so could yield great performance improvements without knowing much about the business logic of the job. Last but not least is the job level, where you go and understand the business logic of the job and try to simplify it, perhaps by removing an unnecessary join or aggregation.
Next, I’d like to talk about the S3 consistency model we have to work with for the next generation cluster.
One big change for our new cluster is switching the primary storage from HDFS to S3.
Those two storage systems have different consistency models. HDFS is a file system that is strongly consistent: changes are immediately visible. On the contrary, S3 is an object storage service that is eventually consistent: there’s no guarantee that a change is immediately visible to the client. This might cause missing-file issues without the user even knowing it.
Here is an example of read-after-write consistency for HDFS and S3. In this chart, from left to right is the timeline; from top to bottom are the clients: the write client, the HDFS reader and the S3 reader. In this example, the write client sets the color to green for key one on both HDFS and S3. For the consistent reader, the HDFS read client, at any given time after the write completes it will always read the color as green. For the eventually consistent reader, the S3 reader might return null at T2, green at T3, and null again at T4. Eventually, it will always return the color green.
What does this mean for Spark? Here’s one example. A Spark job writes out five files in the same folder, but when another Spark job tries to read those files shortly after creation, the listing may return only four files instead of five. In this case, the reader Spark job doesn’t even know that it missed one file.
So the next question is, how often does this happen? In general, the chance of an inconsistent listing of an S3 folder is super, super small. The AWS S3 team conducted a test that showed inconsistency happens less than one in a million times. We also conducted S3 consistency tests internally, and the chance was even lower, less than one in 50 million.
When we were designing our solution to this problem, there was a lot to consider and balance.
The two most important considerations are write consistency and read consistency. Write consistency is whether the job can write its output consistently, without missing, duplicated or corrupted files. Read consistency is whether the job can read exactly the files in a folder, no more and no fewer than it is supposed to read. And the list of considerations goes on.
As you can see here, there’s a lot to consider.
We have compared a lot of different kinds of solutions, ranging from simple ones to really complex ones. I won’t take too much time to cover each one of them. The quick takeaway is that S3 is not HDFS, and there is a lot to consider when you want to switch from HDFS to S3, especially how to deal with the change in consistency model. You can use this table later as a reference for making your own decision when it comes to dealing with the S3 consistency model.
To solve this issue, we took a hybrid solution that combines three different tools. We use S3committer to improve write consistency; Daniel will cover the differences between committers later in this talk. We also use a number-of-files monitor to make sure mission-critical jobs don’t miss any files when reading. In parallel to this, we widely use data quality frameworks to ensure that data meets high quality standards. This helps us prevent data issues introduced by all different kinds of causes, including S3. This approach works well for our current jobs and doesn’t require a ton of effort to implement. However, it needs workflow owners to be aware of the issue and to adopt those tools in their workflows. For the long term, we are looking into implementing a more sophisticated solution to solve this problem systematically, such as Iceberg from Netflix and Delta Lake from Databricks.
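The talk does not describe how the number-of-files monitor is implemented. As a minimal sketch of the idea, assuming the writer publishes the number of files it produced somewhere the reader can see, a reader could re-list the folder until the count matches and fail loudly otherwise. All names here (`wait_for_expected_files`, `MissingFilesError`, the `list_files` callable) are hypothetical.

```python
import time
from typing import Callable, List


class MissingFilesError(RuntimeError):
    """Raised when the listing never matches the expected file count."""


def wait_for_expected_files(
    list_files: Callable[[], List[str]],
    expected_count: int,
    max_attempts: int = 5,
    delay_seconds: float = 1.0,
    sleep: Callable[[float], None] = time.sleep,
) -> List[str]:
    """Re-list a folder until it shows exactly the expected number of files."""
    seen = -1
    for attempt in range(max_attempts):
        files = list_files()
        seen = len(files)
        if seen == expected_count:
            return files
        if attempt < max_attempts - 1:
            sleep(delay_seconds)  # give the listing time to converge
    raise MissingFilesError(f"expected {expected_count} files, last listing had {seen}")


# Simulated eventually consistent listing: four files first, then all five.
listings = iter([
    ["part-0", "part-1", "part-2", "part-3"],
    ["part-0", "part-1", "part-2", "part-3", "part-4"],
])
files = wait_for_expected_files(lambda: next(listings), expected_count=5, sleep=lambda s: None)
print(len(files))  # 5
```

Raising an error instead of silently proceeding is the point of the check: the reader job fails visibly rather than computing on four files out of five.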
With that, Daniel will talk about the differences between our two primary storage systems, HDFS and S3. – Okay, thanks, Xin. Now let’s compare the performance of HDFS and S3. In our case, HDFS and S3 achieve similar throughput.
However, metadata operations are much slower on S3, especially move. An S3 move operation is essentially a copy followed by a delete. Unfortunately, we use a lot of move operations in Spark. This is especially obvious for applications with simple logic that produce a lot of output. For example, one of our Spark streaming applications persists a Kafka stream to storage. On HDFS, the micro batch run time is 30 seconds. On S3, it increased to 55 seconds. Most of the time, the application is doing nothing but moving files around, and this causes micro batches to pile up.
Let’s take a closer look at what causes those move operations. The first two move operations are common to every Spark application. After a task finishes successfully, commitTask moves the files from the task attempt folder to the task folder. Then, after the job finishes successfully, commitJob moves them again to the job output folder. This is necessary because we don’t want the result of a failed job or task to be visible.
In addition, some applications need more move operations. For example, in dynamic partition table insertion, Spark saves the job output to a staging folder before moving the output files to the Hive table location. This is because Spark needs to figure out which partitions are newly added by the Spark job so it can add the partition metadata accordingly.
However, we cannot afford those move operations, so we need to find some optimizations. There are several existing solutions for the first two move operations. FileOutputCommitter algorithm 2 skips the job-level move operation. It moves output files from the task attempt folder directly to the job output in commitTask, so we don’t need to move them again in commitJob. This removes the job-level move operation, but the task-level move operation is still there. However, compared to the job-level move, the task-level moves happen in parallel, so they are less of a problem. Further, inside Pinterest, we already have our direct output committer used in MapReduce, which also removes the task-level move operation. The direct output committer writes the files to the job output directly for every successful task attempt.

However, these first two approaches suffer from some degree of data correctness issues. For FileOutputCommitter algorithm 2, if the job fails and the cleanup is not done properly, successful task output will be left there, so a downstream job will read an incomplete result. For the direct output committer, if the job fails and the cleanup is not done properly, a downstream job will read incomplete or even corrupted output.

A better solution is Netflix S3committer, which uses the S3 multipart upload API to upload files directly to the job output location without leaving incomplete or corrupted output, even if the job fails. We are also aware of more sophisticated solutions. One is Netflix Iceberg, which is the successor of S3committer. However, the focus of Iceberg is to provide transactional support, which we don’t really need in our use cases. Another is the S3A committer from recent Hadoop distributions. However, it is tightly integrated with S3Guard, which adds extra operational complexity. So we decided to adopt S3committer, which meets our needs but is still simple enough.
S3committer makes use of the AWS S3 multipart upload API, which consists of three stages. First, the user needs to invoke initiate multipart upload to start the upload. Then comes the upload part API, which actually uploads the file to S3. Finally, a lightweight complete multipart upload call tells AWS to weave the pieces into a complete file and make it visible. If anything goes wrong, the user invokes abort multipart upload to remove the partial uploads. There are two operational tricks to make the multipart upload API work. First, unfinished part files are saved in a separate staging area of the S3 bucket until complete multipart upload or abort multipart upload is invoked. It is not uncommon for a Spark application to die in the middle, so that neither complete nor abort multipart upload is invoked. Users cannot see partial uploads using the usual S3 commands, but AWS will still charge you for them. So it is important to set up an S3 lifecycle policy to remove partial uploads after a certain time. Second, abort multipart upload requires a separate S3 permission. We need to explicitly grant that permission, which we often miss in practice.
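As a sketch of those two operational points, the Python dictionaries below show roughly what a lifecycle rule and an IAM statement could look like. The `AbortIncompleteMultipartUpload` rule element and the `s3:AbortMultipartUpload` action are AWS names as I understand them; the bucket name, the rule ID and the 7-day window are placeholders, not values from the talk.

```python
import json

BUCKET = "example-spark-output"  # hypothetical bucket name

# Lifecycle rule that removes partial uploads a fixed number of days after
# they were initiated. The 7-day window is an arbitrary example value.
lifecycle_configuration = {
    "Rules": [
        {
            "ID": "abort-stale-multipart-uploads",
            "Status": "Enabled",
            "Filter": {"Prefix": ""},  # empty prefix: apply to the whole bucket
            "AbortIncompleteMultipartUpload": {"DaysAfterInitiation": 7},
        }
    ]
}

# IAM statement that grants the abort permission explicitly, next to the
# write permission used for uploading.
iam_statement = {
    "Effect": "Allow",
    "Action": ["s3:PutObject", "s3:AbortMultipartUpload"],
    "Resource": f"arn:aws:s3:::{BUCKET}/*",
}

# With boto3 installed and credentials configured, the rule could be applied
# roughly like this (not executed here):
#   boto3.client("s3").put_bucket_lifecycle_configuration(
#       Bucket=BUCKET, LifecycleConfiguration=lifecycle_configuration)

print(json.dumps(lifecycle_configuration, indent=2))
```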
In the task, S3committer writes the output to local disk. After the task succeeds, inside commitTask, S3committer invokes the upload part API to upload the output from local disk to S3. However, the output is not visible yet. S3committer invokes complete multipart upload later, in commitJob, after all tasks finish successfully. This two-phase-commit-like approach ensures there are no corrupted or incomplete files if the application fails, and in the meantime, we don’t need to move any files. In addition, S3committer uploads the part files in parallel, so we can achieve better throughput than HDFS.
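The two-phase flow can be illustrated with a small in-memory simulation. This is not the S3committer code and does not call AWS; `FakeMultipartStore` is a made-up stand-in that mimics only the visibility behavior described here, and `commit_task`, `commit_job` and `abort_job` are hypothetical helper names.

```python
from typing import Dict, List, Tuple


class FakeMultipartStore:
    """In-memory stand-in for S3 multipart upload; not the real AWS API."""

    def __init__(self) -> None:
        self.visible: Dict[str, bytes] = {}
        self._pending: Dict[str, Tuple[str, List[bytes]]] = {}
        self._next_id = 0

    def initiate(self, key: str) -> str:
        self._next_id += 1
        upload_id = f"upload-{self._next_id}"
        self._pending[upload_id] = (key, [])
        return upload_id

    def upload_part(self, upload_id: str, data: bytes) -> None:
        self._pending[upload_id][1].append(data)

    def complete(self, upload_id: str) -> None:
        key, parts = self._pending.pop(upload_id)
        self.visible[key] = b"".join(parts)  # the object becomes visible only now

    def abort(self, upload_id: str) -> None:
        self._pending.pop(upload_id, None)


def commit_task(store: FakeMultipartStore, key: str, local_output: bytes, part_size: int = 4) -> str:
    """Phase 1: upload the task's local output as parts, but do not complete."""
    upload_id = store.initiate(key)
    for start in range(0, len(local_output), part_size):
        store.upload_part(upload_id, local_output[start:start + part_size])
    return upload_id


def commit_job(store: FakeMultipartStore, pending: List[str]) -> None:
    """Phase 2: complete every pending upload once all tasks have succeeded."""
    for upload_id in pending:
        store.complete(upload_id)


def abort_job(store: FakeMultipartStore, pending: List[str]) -> None:
    """Failure path: drop all pending uploads so nothing partial is exposed."""
    for upload_id in pending:
        store.abort(upload_id)


store = FakeMultipartStore()
pending = [commit_task(store, f"out/part-{i}", f"rows-from-task-{i}".encode()) for i in range(3)]
print(sorted(store.visible))  # []
commit_job(store, pending)
print(sorted(store.visible))  # ['out/part-0', 'out/part-1', 'out/part-2']
```

Because the data is already uploaded during the task phase, the job-level step only flips visibility, which is why no file needs to be moved.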
Still, we have one more move operation remaining, for dynamic partition insertion. It is required because we need a way to track the new folders added by the Spark application and add partition metadata to the Hive metastore for those folders only. For example, say the Spark application produced a new folder, ds=2020-01-12, in the staging directory. By listing the staging directory, Spark can pick out the new folder, ds=2020-01-12, and add partition metadata to the Hive metastore, along with moving the folder to the table location. To get rid of this move operation, we changed S3committer to write a table-level tracking file that records the partitions generated by the Spark job. After the job finishes, we can read the tracking file to find out the same information. However, if multiple Spark applications are writing to the table at the same time, there might be conflicts on the tracking file. We haven’t found a general solution yet. Instead, we enable it only in certain applications where we know there is only one application writing to the table at any given time.
With the removal of move operations and increased upload throughput, the micro batch run time decreased from 55 seconds to 11 seconds, which is even better than HDFS.
By switching to S3, we also encountered some S3-specific issues. The rate limit 503 error is a common one. AWS support can partition the S3 bucket by prefix. They can even set up a cron job to partition a future prefix which doesn’t exist yet. However, we don’t feel this solution is reliable, so we implemented our own task-level and job-level exponential backoff logic in S3committer. We also made the parallelism of part file uploads tunable, so users can tune their jobs to maximize throughput and, in the meantime, avoid exceeding the S3 limit.
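The talk does not show the backoff logic itself. A minimal sketch of exponential backoff with jitter might look like the following, where `SlowDownError` is a hypothetical stand-in for whatever exception the S3 client raises on a 503 response, and the attempt count and delays are arbitrary example values.

```python
import random
import time
from typing import Callable, TypeVar

T = TypeVar("T")


class SlowDownError(Exception):
    """Hypothetical stand-in for an S3 503 rate limit response."""


def with_backoff(
    operation: Callable[[], T],
    max_attempts: int = 5,
    base_delay: float = 0.5,
    max_delay: float = 30.0,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Retry an operation on rate limit errors, doubling the delay cap each time."""
    for attempt in range(max_attempts):
        try:
            return operation()
        except SlowDownError:
            if attempt == max_attempts - 1:
                raise  # out of attempts: surface the error to the caller
            cap = min(max_delay, base_delay * (2 ** attempt))
            sleep(random.uniform(0, cap))  # jitter spreads out concurrent retries
    raise ValueError("max_attempts must be at least 1")


# Simulated upload that is throttled twice before it succeeds.
calls = {"count": 0}


def flaky_upload() -> str:
    calls["count"] += 1
    if calls["count"] <= 2:
        raise SlowDownError("503 Slow Down")
    return "uploaded"


print(with_backoff(flaky_upload, sleep=lambda s: None), calls["count"])  # uploaded 3
```

The random jitter matters when many tasks are throttled at once: without it, they would all retry at the same moments and hit the limit again together.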
Here is a list of improvements we made on top of Netflix S3committer. Besides the rate limit handling and the parallel part file upload we just mentioned, we also added an ETag check for multipart upload to ensure the integrity of the file, and we fixed a thread pool leak for long-running applications. We also remove the local output generated by the task as soon as the part upload is finished, so we can save local disk usage.
The biggest benefit of switching to S3 is the cost saving: we reduced the cost by 80% compared to HDFS. In the meantime, S3 achieves a better SLO. The claimed SLO for S3 is four nines of availability and eleven nines of durability. In comparison, our HDFS can only achieve three nines of availability at our current support level, and the difference in durability is even bigger.
Besides the storage, the new cluster also differs in its resource scheduler. The old cluster uses Mesos and the new cluster uses YARN.
There are a couple of things we miss from Mesos. First, we can manage every service inside Mesos, but with YARN, long-running services such as ZooKeeper and the Hive metastore are separate entities. Mesos plus the job submission system, Aurora, provides a lot of functionality. It supports simple workflows, long-running jobs and cron jobs very well. It supports rolling restarts and it has built-in application health checks. All of those are missing in YARN, and we have to rely on external workflow systems such as Pinball and Airflow, and sometimes customized scripts, to achieve them.
There are a couple of things we like in YARN. It provides a global view of all running applications. It has better queue management for organization isolation. And more importantly, our other clusters are already using YARN.
With better visibility and queue isolation, we can use resources more aggressively with YARN, so the compute cluster costs less compared to Mesos.
Before we end the talk, I have another two slides to show the current status of Spark adoption inside Pinterest.
We are still at an early stage. Among all the workloads on our Hadoop clusters, Spark represents 12% of total resource usage. We have a few Spark streaming use cases, but mostly it is used for batch processing.
We are actively migrating our Hive workload to Spark SQL. In the meantime, we are looking at ways to migrate our Cascading/Scalding jobs to Spark. We are adopting Dr. Elephant for Spark in our code review process. We integrated Dr. Elephant with our internal metrics system and also included several features from Sparklens. As a side effect, Dr. Elephant increases the load on the Spark history server significantly, and we are in the process of improving the history server’s performance.
Daniel Dai is currently working on the data processing platform at Pinterest. He is a PMC member for Apache Hive and Pig. He has a PhD in Computer Science with specialization in computer security, data mining and distributed computing from the University of Central Florida. He is interested in data processing, distributed systems and cloud computing.
Xin Yao is a tech lead at Pinterest. Xin is passionate about building and scaling distributed systems. Xin was previously on the Facebook Spark team, focusing on improving Spark performance and efficiency. Before that, Xin worked as a Senior Software Engineer at Hulu, where he maintained Spark and built and scaled multiple batch/realtime pipelines. Xin received his master’s degree from Beijing University of Posts and Telecommunications in 2013.