Netflix personalizes the experience for each member, and this is achieved by several machine learning models. Our team builds the infrastructure that powers these machine learning pipelines, primarily using Spark for feature generation and training. In this talk, we discuss two different Spark-related problems and their solutions.
– Memory optimization: our Spark jobs can consume upwards of ten terabytes of memory. To optimize this memory footprint, we implemented an in-memory cache that shares resources between different tasks in the same executor. We achieve this by using a combination of singletons, broadcast variables and task completion listeners. This implementation reduced the memory footprint by more than 20 percent.
– Reliable metrics: we make use of several metrics that help debug different stages and sub-components of the pipelines. Spark accumulators can be cumbersome to use for metric computations, especially in scenarios where the Spark process fails midway during execution. We will describe solutions using Spark listeners to solve this problem. This metric granularity helps reduce the time to debug complex jobs.
– Good morning, everyone.
This developer is stuck just like you and me, in multiple lockdowns.
The first one is the COVID-19 lockdown that is on everybody’s mind. And the other one is the Spark APIs lockdown. There are some who believe these lockdowns should be ended, and there are others who want to keep these lockdowns.
And irrespective of their beliefs, there are many who have learned how to work with these lockdowns. It might be the essential service workers or it might be you who has to go out to get groceries and the most important thing, the toilet paper.
I don’t know about you, but my team and I are definitely in this category. We have learned to work with the Spark APIs lockdown, and today I’m going to share two such stories of how we hit Spark limitations and how we worked around them.
For me, this acceptance of working with the lockdowns comes from the fact that I love to meditate, I also like to read. I’ve worked at Apple, Sumologic and Amazon before. And right now, I’m working as a Senior Software Engineer at Netflix. Netflix is a video on-demand streaming service. We have more than 180 million members. And in 2019, we added more than 1000 original titles to our catalog. We use machine learning models to generate recommendations. And these machine learning models are data hungry.
Let’s see what sort of data they are looking for. The first data is the video metadata. Let’s think about “Stranger Things.” In “Stranger Things,” there are multiple seasons, and each of those seasons has multiple episodes in it. This information about how many seasons it has, and how many episodes each season has, is contained in the video metadata. Today, we are going to assume that this video metadata, the entire information about the video catalog, is 10 gigabytes, exactly 10 gigabytes. And whenever required, we are going to load this information into the JVM, because it fits. We don’t want to pay the cost of making a network request every time we access this data. The second data is about the members. There are more than 180 million members, and for them, we have their viewing history and their thumbs ratings, that is, whether you gave a thumbs up or a thumbs down to “Stranger Things”. This data is in terabytes; it cannot be loaded into one JVM.
Let’s see how this data gets used to generate the recommendations. Viewing history and video metadata combined are used to generate the features online. Once we have the features, we can compute the recommendations by scoring on those features. Once we have computed the recommendations, we can show those to the user. While we are doing the online feature generation, we take snapshots of the member data and put them into the historical data store.
And from there, we can recompute the features. Why do we do this? Because this helps us improve our exploration speed. We can change a feature, or we can add a new feature. Once we have done that, we can compute new features. And once we have computed the new features, we can train the model and send the updated model to compute fresh recommendations. Today, our focus is going to be this offline feature generation Spark job. To understand the limitations that we hit in this job, we need to know what it is trying to do. It is trying to replicate the online feature generation job, and this replication is what we are trying to make faster. Let’s dig more into the online feature generation job.
Let’s visualize it on a timeline. Let’s assume that this is a 24 hour timeline. On it, we want to compute the recommendations for member one. For member one, we have the viewing history of the member. Then we want to get the video metadata, which is not per member; it is the entire catalog. This is 10 gigabytes. Now we have loaded this 10 gigabytes into memory. And from the viewing history and the video metadata, we can compute the features of member one. Since we have this 10 gigabytes of data sitting in memory, why don’t we compute the recommendations for members two, three and four as well? Now, at some point during the day, we would add a new video to our catalog. On average, we are adding more than two videos every day. That just means at some point in the day, it’s inevitable that something gets added. That means the video metadata would change, and we would have a new version of the video metadata. Let’s assume we download this 10 gigabytes again into memory. And with that, we can compute the recommendations for member five and member six.
The same goes for member seven and member eight with version three. Now, let’s take this data and put it into a dataframe.
We have this data in the dataframe. Notice that H one, the viewing history, is stored in its entirety, but M one, the video metadata, is stored only as a version. This is done because we cannot put 10 gigabytes in each and every row; that’s too much data. H one is much smaller in size, so we can put H one in its entirety, but M one is just a version.
Let’s load this data into the offline feature generation Spark job.
And with that, we now want to look at this problem from the perspective of Spark. We know what we are trying to achieve; we need to know what happens in Spark.
Let’s take it from the top. We have the driver, and the driver is communicating with the executors. Within each executor, we have multiple cores; here there are three cores. Within each core, we have a running task at all times: task one, task two, task three. Each task runs over a partition of the data. From the previous dataframe, we have H four M one, H five M two. Now, we said M one is not there in its entirety. The row has just a version, so we need M one somewhere in the executor. Let’s say we have put M one right there in the executor, so it can be accessed. The same thing applies for M two. So we have M two sitting in the executor’s memory and we can access it. And with that, we can generate the features of member four and member five.
This is a bit different from the online feature generation. In the online feature generation, we had only one of these versions in memory at any one point. Either it was M one, or it was M two, or it was M three. But now we already have two versions in memory. That means instead of consuming 10 gigs, we are already consuming 20 gigs. Our task today is to optimize this memory consumption without having an impact on the performance. With this, let’s dig deeper into the executor and see what’s going on.
At this instance in time, this executor is running task one, task two, task three. And we have ordered the three versions that we need in memory, M one, M two and M three. Let’s assume that both task one and task two need access to M one, and task three needs access to M two.
If we go with closures, we have two copies of M one in memory. That just means that we are wasting 10 gigabytes extra. On the left-hand side, we could have made do with 20 gigs, but we are consuming 30 gigs. So let’s take the second option. Let’s try to broadcast the data, which is the second memory sharing option that Spark provides. We have 30 gigs again, because some row is going to need access to M three, so we broadcast everything: M one, M two and M three. On this executor, at this instance, M three is not required. But with broadcast, we have to broadcast everything. First of all, we cannot really broadcast 30 gigs, but let’s say that with lazy loading, we can get 30 gigs into the user memory. With this, again, we are consuming 30 gigs, while we could have made do with 20 gigs by just having M one and M two. M three is also coming along, and that’s wasting memory.
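The arithmetic behind the closure and broadcast options can be written out as a short plain-Python sketch. It is not Spark code: the function names are made up for illustration, and the only inputs are the numbers used in the talk (10 GB per version; tasks needing M1, M1 and M2; three versions in the job).

```python
VERSION_SIZE_GB = 10  # size of one metadata version, as assumed in the talk


def closure_memory_gb(versions_per_task):
    """Closure: every task carries its own copy, even if two tasks share a version."""
    return VERSION_SIZE_GB * len(versions_per_task)


def broadcast_memory_gb(versions_in_job):
    """Broadcast: every version used by the job ends up on every executor."""
    return VERSION_SIZE_GB * len(set(versions_in_job))


def shared_memory_gb(versions_per_task):
    """Ideal sharing: one copy per distinct version this executor actually needs."""
    return VERSION_SIZE_GB * len(set(versions_per_task))


versions_per_task = ["M1", "M1", "M2"]  # task one, task two, task three
versions_in_job = ["M1", "M2", "M3"]

print(closure_memory_gb(versions_per_task))   # 30
print(broadcast_memory_gb(versions_in_job))   # 30
print(shared_memory_gb(versions_per_task))    # 20
```

Both built-in options land on 30 GB for different reasons: closures duplicate M1, while broadcast drags along M3, which this executor does not need.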
So the Spark limitations that we are hitting are these. The first one is that Spark has limited memory sharing options. And the second one is that Spark does not let us control where a particular task lands. If we could choose that, we could say that all of the tasks that need access to M one go to the first three executors, and all of the tasks that need access to M two go to the next three executors. For M three, they go to executors seven, eight, nine and 10, and that would have worked. But Spark does not let us control where a particular task lands; it’s very random.
Those are the Spark limitations. What we do get from Spark are multiple data wrangling tools: repartitioning, materializing the intermediate dataframes, and sorting within partitions. In today’s talk, we’re going to try all three of them.
The first three approaches that we’re going to talk about don’t work. The fourth one is heading towards a solution, but it does not work fully. Approach five is the one that is going to work. So pay attention to approaches four and five.
On approach one, we have taken the dataframe that we had initially and broken it down into three partitions: partition N one, N two and N three. We are going to put partition N one into an executor, where task one is trying to execute it. As we can see, the rows of partition N one need M three, M three and M one. So in this case, we first download M three into memory. Once we reach row three, where we have H one and M one, we download M one. Now this is 10 gigabytes of data, and downloading it into memory takes a finite amount of time. Let’s assume, for this talk, that whenever we download a new version into memory, it takes one minute. That means if we have more rows in this partition, we’ll be switching from M three to M one to M five to M seven, and then back to M one. That’s inefficient. Let’s see what happens on the other tasks. On task two, we need M two and then M one, and on task three, we need M one and M two. All of this is inefficient. And we are still consuming 30 gigs, while we want to see if we can consume just 20 gigs of executor memory.
So with this approach, we’re consuming 30 gigs and we are switching between versions on the same partition. So let’s try to repartition this data. For this, we are going to assume that we are partitioning by version, and that we have broken down the data for version one into two partitions because it’s too much data. So we have partition R1A and partition R1B for version one. For version two, we have partition R2A, and so on.
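As a rough illustration of the repartition-by-version idea, here is a plain-Python sketch that groups rows by metadata version and splits large groups into chunks named like the partitions in the talk (R1A, R1B, R2A, ...). The function name, the row layout and the chunk size are assumptions for illustration; a real Spark job would do this through the dataframe API instead.

```python
from collections import defaultdict
from string import ascii_uppercase


def repartition_by_version(rows, max_rows_per_partition):
    """Group (history, version) rows by version, then split big groups into chunks."""
    by_version = defaultdict(list)
    for history, version in rows:
        by_version[version].append((history, version))

    partitions = {}
    for version in sorted(by_version):
        group = by_version[version]
        for start in range(0, len(group), max_rows_per_partition):
            # Chunk letters A, B, C, ...; enough for 26 chunks in this sketch.
            suffix = ascii_uppercase[start // max_rows_per_partition]
            name = f"R{version[1:]}{suffix}"  # "M1" -> "R1A", "R1B", ...
            partitions[name] = group[start:start + max_rows_per_partition]
    return partitions


rows = [("H1", "M1"), ("H5", "M2"), ("H2", "M1"), ("H7", "M3"),
        ("H3", "M1"), ("H6", "M2"), ("H4", "M1"), ("H8", "M3")]
partitions = repartition_by_version(rows, max_rows_per_partition=2)

print(list(partitions))  # ['R1A', 'R1B', 'R2A', 'R3A']
```

Every resulting partition refers to exactly one version, which is what removes the version switching, at the price of moving all the rows around first.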
So with the repartitioning, let’s try to use the closure approach, the first approach that we saw. Task one is trying to access M one, and task two is trying to access M one, because they are working on partition R1A and partition R1B, so both are working on data for version one. And task three is working on M three. Again, we are still consuming 30 gigs, while our target is to get to 20 gigs, because M one is replicated twice. On top of that, we are paying an extra penalty for repartitioning the data. Repartitioning is slow, so we don’t want to do that. But with that, we do see that we have stopped switching between versions.
Let’s try this a different way. In this case, we are going to materialize the dataframes, so we are going to split the execution by version. What we will do is take every partition that has version M one, execute those first, and materialize that dataframe. So we have stage one. In that case, we’re broadcasting M one to all of the executors, and then we work through task one and task two, which are partition R1A and partition R1B. In this case, we can see that one third of the executor is empty. We have three cores on this executor, and only two of those cores are getting used; the third one is empty, because we do not have any more work to do right now. And then we have stage two. On stage two, we are broadcasting M two, and then we are working on partition R2A in task one. In this case, two thirds of the executor is empty. While we could have worked on the partitions for version three, we cannot do that because we need to finish off M two. This is highly performance inefficient, while it is highly memory efficient. We are just consuming 10 gigabytes of memory in each and every step; we’re not going above 10 gigabytes. But this resource wastage manifests itself as performance inefficiency. In this case, the entire Spark job takes about 10 hours, while the final solution takes about one hour. So we did not like this approach, and we moved on to a different one.
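The idle-core numbers quoted for the stage-by-version approach follow from simple arithmetic, sketched below in plain Python. The helper name is hypothetical, and the sketch only covers the single-executor picture from the talk, where a stage has no more partitions than cores.

```python
from fractions import Fraction

CORES = 3  # cores on the executor in the talk's example


def idle_fraction(partitions_in_stage, cores=CORES):
    """Fraction of cores with nothing to run while one stage is executing."""
    busy = min(partitions_in_stage, cores)
    return Fraction(cores - busy, cores)


stage_one = idle_fraction(2)  # version one: partitions R1A and R1B
stage_two = idle_fraction(1)  # version two: partition R2A only

print(stage_one, stage_two)  # 1/3 2/3
```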
At this point, we gave up on all of the options that Spark provides us for memory management, and we decided to implement our own. This custom memory management is a layer that sits between the tasks and the user memory. So task one is going to talk to this custom memory management and say, “Can I get access to M one?” The custom memory management sees that M one is not in memory, so it downloads M one and then returns the pointer to task one. Then task two comes and requests the same thing, M one. But M one already exists in memory, so the custom memory management just returns the pointer. For task three, the custom memory management downloads M three and returns that. This is good. In this case, we can actually limit the amount of memory that we are consuming by putting in a semaphore. So this custom memory management can say, “I am never going to have more than two versions in memory,” which caps it at 20 gigabytes. There’s only one problem with this approach. It works in every way, except that we have to repartition the data, and repartitioning the data can be slow. So we would use this custom memory management in the final solution, but we did not want to have repartitioning. This particular custom memory management is implemented using a singleton, so that all of the tasks are talking to the same custom memory management. Let’s look at the working approach.
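A minimal sketch of such a shared cache, in plain Python rather than on the JVM, might look like the following. Everything here is an assumption made for illustration: the class and method names, the `loader` callback, and the use of a condition variable with reference counts in place of a literal semaphore to cap the number of resident versions. It is not the implementation described in the talk.

```python
import threading


class MetadataCache:
    """Hands out one shared copy of each metadata version to all tasks in a process."""

    def __init__(self, loader, max_versions):
        self._loader = loader              # hypothetical: fetches one version's payload
        self._max_versions = max_versions  # cap on versions resident at once
        self._cond = threading.Condition()
        self._payloads = {}                # version -> payload
        self._refs = {}                    # version -> number of tasks using it
        self.load_count = 0

    def _make_room(self):
        """Return True if a slot is free, evicting one unused version if necessary."""
        if len(self._payloads) < self._max_versions:
            return True
        for version in list(self._payloads):
            if self._refs.get(version, 0) == 0:
                del self._payloads[version]
                return True
        return False

    def acquire(self, version):
        """Return the shared payload, loading it only if it is not resident yet."""
        with self._cond:
            while version not in self._payloads and not self._make_room():
                self._cond.wait()          # every resident version is still in use
            if version not in self._payloads:
                # Loading under the lock keeps the sketch short; it blocks other tasks.
                self._payloads[version] = self._loader(version)
                self.load_count += 1
            self._refs[version] = self._refs.get(version, 0) + 1
            return self._payloads[version]

    def release(self, version):
        """Call when a task is done with a version, e.g. from a task completion hook."""
        with self._cond:
            self._refs[version] -= 1
            self._cond.notify_all()


# Module-level instance: the Python analogue of one singleton per executor JVM.
CACHE = MetadataCache(loader=lambda v: {"version": v}, max_versions=2)

m1_for_task_one = CACHE.acquire("M1")    # not resident: loaded
m1_for_task_two = CACHE.acquire("M1")    # resident: same object returned
m3_for_task_three = CACHE.acquire("M3")  # second slot used

print(m1_for_task_one is m1_for_task_two, CACHE.load_count)  # True 2
```

The cap only works if tasks release what they acquired, which is presumably where task completion listeners come in; the talk notes further on that in practice the cap turned out not to be needed.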
In the working approach, we decided to get away from repartitioning the data, we decided to use sort within partitions.
When you’re sorting within partitions, partition N one, which we saw had M three, M three and M one, now becomes M one, M three, M three. So what we are doing is sorting by the metadata version, M one, M two, M three, in that order. And we can use this in conjunction with the custom memory management. Let’s see how that works.
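The effect of sorting within a partition can be checked with a small plain-Python sketch that counts how many times a task has to switch to a different version while walking its rows in order. The helper name is made up; the row sequences are the ones mentioned in the talk. In Spark, the DataFrame method `sortWithinPartitions` performs this kind of per-partition sort without a shuffle, although the talk does not say which call the job uses.

```python
def count_loads(row_versions):
    """Version switches seen by a task that walks its rows in order."""
    loads, current = 0, None
    for version in row_versions:
        if version != current:
            loads += 1
            current = version
    return loads


# Plain string sort is fine for single-digit ids; a real job would sort on a
# numeric or timestamp version column instead.
unsorted_partition = ["M3", "M1", "M5", "M7", "M1"]
sorted_partition = sorted(unsorted_partition)

print(count_loads(unsorted_partition))  # 5
print(count_loads(sorted_partition))    # 4
print(sorted(["M3", "M3", "M1"]))       # ['M1', 'M3', 'M3']
```

After sorting, a partition never returns to a version it has already finished, so the number of switches equals the number of distinct versions in the partition.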
On a timeline, we have partitions S one, S two and S three. At the point in time when we start, all of the partitions, S one, S two and S three, need access to the same version, which is M one. So we can see that task one, task two and task three all need access to M one. Towards the end, as time moves on, task one needs access to M three, and task two and task three need access to M two. Let’s look at task three. Task three moved from version one to version two, so it moved from M one to M two, while task two is still working on version one because it has three rows of it. Now, because task three has already requested M two, task two can use that time to finish working on M one. Let’s see the overall advantages of this. We have task one, task two and task three first accessing M one, and then task one accessing M three, and tasks two and three accessing M two. In this case, we are being memory efficient, because we can restrict the amount of memory we are using by going through the custom memory management. We are not repartitioning, and sorting within partitions is much faster than repartitioning. While we do have switching between versions on the same partition, which we have said is a bad thing, this cost is amortized.
That is because task two and task three are going to need access to the same version. If task three has requested M two, task two is going to be able to use the same copy. The efficiency of this system increases as we increase the number of cores. So if we have 20 cores and just six versions required, our efficiency is very high. If the number of cores is small, let’s say we have five cores and 15 versions, the efficiency may not be that high. For us, the efficiency was so high that we did not even have to implement the semaphore. So, as of now, we do not have the semaphore implemented.
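The amortization argument can be made concrete with a small plain-Python count of metadata loads on one executor, with and without a shared cache. The function names are hypothetical, the per-task version sequences follow the timeline described in the talk, and the sketch assumes nothing is evicted from the cache.

```python
def loads_without_sharing(task_versions):
    """Each task loads every distinct version it needs for itself."""
    return sum(len(set(versions)) for versions in task_versions)


def loads_with_sharing(task_versions):
    """With a shared cache, each distinct version is loaded once per executor."""
    return len(set().union(*task_versions))


task_versions = [
    ["M1", "M3"],              # task one
    ["M1", "M1", "M1", "M2"],  # task two: three rows of version one, then M2
    ["M1", "M2"],              # task three
]

print(loads_without_sharing(task_versions))  # 6
print(loads_with_sharing(task_versions))     # 3
```

The more cores share an executor, the more tasks benefit from each load, which matches the remark that efficiency grows with the number of cores.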
In this case, our problem statement was that Spark has two limitations. The first is that Spark provides us limited memory sharing options. And the second is that Spark does not let us control where a particular task lands.
We fixed this problem by implementing our own custom memory management solution. Let’s move to our second problem statement for the day. We want to get reliable metrics and improve the efficiency of debugging while working with Spark jobs.
First, I want to give a shout-out to my teammate Dhaval Patel, who implemented this.
We’re back to our recommendations architecture. And in this particular section of the talk, we are going to focus on the interaction between the historical data store and the offline feature generation. Let’s see this interaction.
We saw the offline feature generation, and we saw how it interacts with Spark. Let’s summarize it. The first step in this is getting the labels. The second step is to actually get the historical data. The label information says, “Did a member watch ‘Stranger Things’ or not?” And if they did, did they like it or not? That is the label information. From that, we go and fetch the data for the member from the historical data store. With that, we can regenerate the features as required. Now, out of these three steps, there’s only one step that actually ends up materializing the dataframe. That is this Generate Features step.
This helps the query planner to optimize the query. Because we are only materializing at one step, the query planner can reorder the query. Say Generate Features is actually doing a filter based on the label information and not based on the historical data. Then the planner may decide to move that filter up. And if there’s a part of the historical data that is not used by Generate Features but is part of the final output, the planner may move that after Generate Features. So the query planner has a lot of leverage to move things around.
And this query planner reordering is going to cause issues; we’ll see that. In the meantime, I do want to highlight one more thing: we do accept a slight amount of data drop. What does that mean? Let’s say we had 100,000 rows in the first step, in the label information. Now, the historical data store is built on top of a cache, and that cache can sometimes lose tiny amounts of data. Let’s say that 100 rows dropped out of those 100,000. It’s okay. We’ll move on. We’ll continue with the process. But if we lose 20,000 rows, that’s a 20% data drop, and we do not want to continue. We want this job to fail.
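The tolerance rule described here can be sketched as a small plain-Python check. The 1% threshold, the exception class and the function name are all assumptions for illustration; the talk only says that 100 dropped rows out of 100,000 are acceptable and 20,000 are not.

```python
MAX_DROP_RATIO = 0.01  # hypothetical threshold; the talk does not state the real one


class TooMuchDataDropped(Exception):
    """Raised when more rows were lost than the job is willing to tolerate."""


def check_data_drop(rows_in, rows_out, max_drop_ratio=MAX_DROP_RATIO):
    """Return the drop ratio, or fail the job if it exceeds the threshold."""
    dropped = rows_in - rows_out
    ratio = dropped / rows_in if rows_in else 0.0
    if ratio > max_drop_ratio:
        raise TooMuchDataDropped(f"{dropped} of {rows_in} rows dropped ({ratio:.1%})")
    return ratio


print(check_data_drop(100_000, 99_900))  # 0.001, job continues

try:
    check_data_drop(100_000, 80_000)
except TooMuchDataDropped as error:
    print(error)  # 20000 of 100000 rows dropped (20.0%)
```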
And this is a problem in Spark. Why? In Spark it’s zero or one: either all of the 100,000 rows come out or nothing comes out. So you may say, “Why don’t you use flatMap?” Yes, we can use flatMap, and we do. But at the same time, we want to record the sample failures. If 100 rows drop, we want to know which 100 rows dropped. If 20,000 rows drop, we just want to know some samples, let’s say 500 of those.
We do not want to record all of the 20,000, because that would pollute the logging. And Spark does not make logging sample failures easy. Let’s see what the overall Spark job for this offline feature generation looks like. This is one of the sample Spark jobs that we have. In this case, we have more than 70 stages. And remember what we talked about: the query planner may have reordered stuff. We don’t know where each of those steps happened. Did parts of Generate Features happen in the first 20 stages, or was that just the label information? So let’s assume that this Spark job fails and we want to debug why it failed. Due to the query planner’s optimization, this is going to be very hard; we cannot debug easily. On top of that, we do push out accumulators in each of these steps, but Spark does not give us a single place to see them.
It’s very hard to know what was executed, and Spark does not have a consolidated view for the accumulators. At each of those steps, we are pushing out some accumulators, and we want to see them. But with 70 different stages, we have to click through each one to find out where those accumulators showed up; we cannot see them in one view. And that takes forever to debug. On top of that, accumulators are not trustworthy; they can double count.
And we already talked about this one: there is no easy way to record sample failures.
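One simple way to keep a bounded sample of failures while still counting all of them is sketched below in plain Python. The class name, the cap of 500 (taken from the number mentioned in the talk) and the keep-the-first-N policy are assumptions for illustration; a uniform sample would need something like reservoir sampling instead.

```python
class FailureSampler:
    """Counts every failure but keeps details for at most max_samples of them."""

    def __init__(self, max_samples=500):
        self.max_samples = max_samples
        self.samples = []         # (row_id, reason) pairs that were kept
        self.total_failures = 0   # all failures, sampled or not

    def record(self, row_id, reason):
        self.total_failures += 1
        if len(self.samples) < self.max_samples:
            self.samples.append((row_id, reason))


sampler = FailureSampler(max_samples=500)
for row_id in range(20_000):
    sampler.record(row_id, "missing historical data")

print(sampler.total_failures, len(sampler.samples))  # 20000 500
```

With only 100 failures every row is kept, so the same recorder covers both cases described in the talk.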
So we decided to implement our own solution. From the driver, we send the accumulators to Kafka and then to Hive.
In this case, we are only sending the accumulators when a stage actually changes some accumulators. We do not send anything from the stages that do not have any accumulators. From the executors, we record the partial data failures and send those to Hive. With this, we are able to provide a consolidated view. We can just enter the application ID and know what happened. We can see what stages got executed. We can see what steps got executed and get the accumulators (indistinct). And at the same time, if the accumulators show that there was a large number of failures, we can actually get the partial data failures. On top of that, when we talked about the accumulators, we said they can double count. So we do not really trust the accumulators as such; we actually look at the accumulator ratios. Most of the time, when the failures happen, they happen together, and the double counting also happens together. So the ratios are way more reliable than the exact accumulator values. There’s another thing that we do: we send the metrics from the executors to Atlas. Atlas is the Netflix open source time series database.
The advantage of sending these metrics to Atlas shows up when there are multiple Spark jobs. It can be the offline feature generation, and there can be multiple flavors of it for different purposes. In that case, we can actually build metrics and graphs on top of multiple jobs. So having all of these metrics in Atlas provides us more debugging information if the failures are happening across multiple jobs.
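The earlier point about trusting accumulator ratios rather than raw values can be illustrated with a few lines of plain Python. The counter names and numbers are made up, and the sketch covers only the case where a retried piece of work applies all of its accumulator updates twice.

```python
def drop_ratio(counters):
    """Ratio of dropped rows to total rows, from a dict of accumulator values."""
    total = counters["rows_total"]
    return counters["rows_dropped"] / total if total else 0.0


counted_once = {"rows_total": 100_000, "rows_dropped": 100}

# If the same work is retried, both accumulators are applied a second time.
counted_twice = {name: 2 * value for name, value in counted_once.items()}

print(counted_twice["rows_dropped"])  # 200, the raw value is off by a factor of two
print(drop_ratio(counted_once) == drop_ratio(counted_twice))  # True
```

If only some of the counters were double counted, the ratio would shift as well, which is why the talk says ratios are more reliable rather than exact.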
So the implementation of this is done using the Spark stage listeners and task listeners. As I already said, we send information from the driver to Kafka, if there’s a change in the accumulators, so we are deduplicating the logging.
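The deduplication idea, emitting a record only when a stage changed some accumulator, can be sketched in plain Python as follows. The class, its `on_stage_completed` method (named after Spark's `SparkListener.onStageCompleted` callback) and the list standing in for the Kafka producer are assumptions for illustration, not the code from the talk.

```python
class AccumulatorReporter:
    """Forwards accumulator values to a sink only when a stage changed them."""

    def __init__(self, sink):
        self._sink = sink      # stand-in for a Kafka producer; here just a list
        self._last_seen = {}   # accumulator name -> last value that was sent

    def on_stage_completed(self, stage_id, accumulators):
        changed = {
            name: value
            for name, value in accumulators.items()
            if self._last_seen.get(name) != value
        }
        if not changed:
            return  # this stage touched no accumulators: send nothing
        self._last_seen.update(changed)
        self._sink.append({"stage_id": stage_id, "accumulators": changed})


sink = []
reporter = AccumulatorReporter(sink)
reporter.on_stage_completed(1, {"rows_total": 100})
reporter.on_stage_completed(2, {"rows_total": 100})                     # unchanged
reporter.on_stage_completed(3, {"rows_total": 100, "rows_dropped": 5})

print([record["stage_id"] for record in sink])  # [1, 3]
```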
Here’s an interesting gotcha, Spark listeners catch throwable, but that’s a story for some other time.
What are the advantages that we see of this approach?
We can have a consolidated view of the logs and accumulators. It’s easy to detect what pieces of code were executed. And overall, this has reduced our debugging time significantly. So with this, I want to remind you what we talked about. We talked about three different Spark limitations. The first one is that Spark has limited memory sharing options. The second one is that in Spark, we cannot control where a task lands, that is, on which executor a task runs. And the third one is that it can be hard to debug in Spark, because Spark does not have the tools that we needed. So what we did is implement these two solutions that you see on your screen.
What questions can I answer for you?
I work as a senior software engineer in the Personalization Infrastructure team at Netflix. I work on distributed systems and big data, currently focusing on storing and querying petabytes of data. Previously, I have worked at Apple, Sumologic and Amazon in similar roles.