Parallelization of Structured Streaming Jobs Using Delta Lake

Download Slides

We’ll tackle the problem of running streaming jobs from another perspective using Databricks Delta Lake, while examining some of the current issues that we faced at Tubi while running regular structured streaming. A quick overview on why we transitioned from parquet data files to delta and the problems it solved for us in running our streaming jobs. After converting our datasets to Delta Lake. We will then explore techniques in which we can maximize the cluster utilization by submitting multiple streaming jobs from the driver to run in parallel using scala parallel collections. We’ll discuss techniques to write and implement idempotent tasks that can be parallelized. In conclusion, we will discuss an advanced topic on running a parallel streaming backfill job and the nuances in handling failure and recovery. Demos using databricks notebooks will be shown throughout the presentation.

Watch more Spark + AI sessions here
Try Databricks for free

Video Transcript

– Good afternoon. This is Oliver Lewis from tubi.

Parallelization of Structured Streaming Jobs

And we’ll be talking about Parallelization of Structured Streaming Jobs using Delta. So the first thing that we wanna talk about is what is tubi? So tubi is an ad supported video on demand service. We’re one of the largest online catalogs of free content. So like Netflix, the bulk of our revenue is attributed to content recommendations that make personalized experiences for our viewers. To power up our recommendation engines. We continuously process a large streaming data set of customer events. And I’ll show you in a little bit what these events look like. But roughly we get about 40,000 requests per second, which aggregates to about 800 million records per day. And in terms of our data size, that equal equals about 500 gigabytes per day. So when you’re thinking about these events, we have like a pretty robust architecture to ingest these events. We get all these client events hitting our backend service. Which then sends it to a Kinesis Queue, which then hits our enrichment service. Where they are joined with a lot of datasets. And then those data sets are finally ingested to from a consumer spark. The way we’ve created the job is it’s a streaming job that reads these events and writes it into our data Lake. The initial problems that we had with that was, that our event out here is an immutable event, right? We have the language Symantec is that we have a person watching content. So this helps us make it very extendable. And the way we wanna have is like an expressive vocabulary that describes a user action. So if you just go to another example, if you see this example out here, if you have a person watching content on a platform, and that helps us design easily immutable events that can be changed and tweaked. That will eventually reach our data Lake. And we can generate insight from them. When we have, the pipeline design like this, we have a spark that’s running as a streaming job that has a trigger timestamp. And the trigger kind of has issues with file sizing. We tend to have too many small file sizes. And this was something that we were always facing a problem with. And we needed to find a better way to resolve that. I’ll be showing a bit how Delta really helped us solve that problem. Another problem was Data Deletion process. So anytime we needed to make changes to that data. And we needed to reprocess the data. We would have issues with the streaming jobs or managing the state. And we needed to kind of rerun the entire script all over again. Multiple Streams writing to the same location. This was also not easily possible with the first version of Structured Streaming. That’s just because we could not have the folders being updated at the same time. We had like a hacker way of using separate checkpoints to maintain these different streaming jobs. But it still wasn’t a very straightforward method. And that is something where we started using Delta. And Delta came with its own set of features also, that really helped us make our decision to using Delta. The first thing that it solved was we were able to optimize the table and vacuum all of SD files, without interrupting the streaming jobs. We also did not need to worry about the transaction consistency of Delta. Because long running deletes or changes to history were able to be done without impacting the streaming data coming in. And also as most of you know, with the GDPR and CCPA changes that are being done, for most of the data deletion right now. We were able to support that using Delta just because of its transaction lock capabilities. Another thing that I’ve mentioned in the previous slide about not being able to run multiple jobs in parallel. Because of the spark metadata issues that we had to deal with and consistency issues. We do not need to worry about that anymore, because we get full fledged transaction log capabilities using Delta log. So that was another really good reason for using for the entire transition into Delta.

Example of a simple structured streaming job

And, this is actually where we start our presentation in the actual detail, because this is an example that we’ll be using for the rest of this presentation. It’s a real time streaming session job, which will trigger every 10 minutes. Let us imagine we have written this kind of standard code and we test it, we write all our unit tests and we deploy it into production. Everything works great. We are content and there are no failures in production, the cluster size is right. We are able to let process records in a timely fashion real time enough. But this job is still lacking something. And that is, the business team is not able to get any kind of historical reports, right? Because there’s no comparative analysis being done for our data set. So now we need to figure out a way to backfill the data. And that is actually the main thing that we are trying to solve in a very structured and supported manner using Delta. Because this was not possible with the first version of the way we had written our code, but it’s kind of easily supported with Delta. So let’s talk about how we manage that with Delta.

Strategies to Backfill

So when we’re trying to strategize to do a backfill job. The first thing that comes to mind is let’s write a batch job that would, to perform the backfill. However, writing a batch job would require a pretty major rewrite of our state management code. As I described earlier, right? Like sessionization job is using state for management. And if you have to rewrite this job into the batch mode operation, it would require us to rewrite all of that state management logic into a separate batch mode, support and logic. And it’ll also requires us to maintain two separate code bases. So this was actually one of our first constraints to keep the job as a streaming job. And not have to rewrite the business logic. So to keep the job as a streaming job. We would still need to figure out a way in which we can gracefully terminate the job on completion. Because as most of you all know, with streaming jobs. The only way to dominate a job is to receive some kind of a termination signal or some kind of some monitoring signal that will say that “Okay, our job is terminated. We need to kill the job now”. But luckily for us spark gives us a useful method for doing this by using trigger at once. Which will load the entire data frame, run the job and then stop the stream on its own. Without us having to write special logic to terminate the job. There is one small Gotcha Moment out here is as most of you all know: Do not replace the readStream with a read and the writeStream with a write. Because implicitly flatMapGroupWithState is gonna convert this mapGroups. It’s gonna convert it into mapGroup function and you’ll lose state management entirely. So that’s just one thing you need to remember.

Issues in backfilling large datasets

There are issues in backfilling large data sets. For example, if we said the start date to 2016, first of Jan and the end date to, let’s say current timestamp. When we’ve started the job. There are a couple of issues, the first issue would be the job probably runs for a long time. Where it’s reading in a lot of the incoming data. And it starts executing the logic and the job fails.

And that is something that we really do not wanna do. Because I mean, its gonna be really annoying for us right? We don’t want a job to fail after five or six hours of running. Like if a job wants to fail, you’d rather the job failed right away than for it to run for a long time and consume resources before failing. So that was one thing that we want to avoid. So you don’t want to have a job that has a really large batch size. And I’m using the word batch size because I wanna elaborate on how we define an ideal batch size in a few minutes. The second thing is you must have also noticed is that I’m using state management in the state for the job that we’re using the sessionization. And state management cannot hold such a large state. In fact, the job would fail much quicker. So with that said, we want to think of finding the right batch size.

Encapsulate the Task.

So we wanna be able to encapsulate the data into something that is more easily sensible and more easily triggered and completed. So when we’re trying to identify a small batch size. We wanna be able to encapsulate a data into a task that can be triggered, executed and completed. The other key word to remember is that we should be able to get completed. Because that will enable us to track the progress of the job. Since our data is partitioned by date, we decided to, just keep the granularity of a task at the date level. It is also important that the job is important, at least on the task level and the date partition level. So that reruns of the same job, would not cause duplicates in the output. Another thing that we also need to keep track of is that in case we set up a loop to execute a job. And if there’s a certain failure within a certain task and we read on it, we should not have like duplicates and or any kind of failures in the final output of the batch run. So let’s think about the next thing that we need to talk about, its performance. To make the backfill go any faster. Our immediate intuition is to increase the size of the cluster. So let’s take this example, right? The example on the presentation right now. The image out there is showing 3,886 tasks. And we have 64 cores. I’m just giving you the cluster and the size. It was a 64 node cluster. So it took 64 core cluster, and it takes about 8.2 minutes to run this job. So if you increase this cluster size to have 3,886 cores, we can actually complete this job in about eight seconds. So our intuition that we had at the beginning to increase the size of the cluster was actually correct. However, there are a few caveats to this approach. If we increase the cluster any further, we start getting diminishing returns. And that’s something that I’m also gonna show in the demo in a little bit. But I also wanna talk about how we need to have a job running environment. But also make sure that our cluster size is at the right. Exactly It’s intuitively correct. And the way I like to think about this is like the pizza problem. So, I know it’s a little funny, but when I was trying to imagine this entire thing, I literally thought of the pizza problem. Where you have a group of friends around you and you have four slices of pizza. What we have is spark is really great at utilization . So all the four slice of the pizza are picked up immediately. But we’re still gonna have four hands that are idle. Because there’s just not enough pizza. But if you look at the number of cores, it’s always greater than the task. And you end up having a large cluster that is not being fully utilized. If you look at this legend of this chart, that I’ve zoomed in a little bit. If you look at the CPU is it’s averaging at about 29.9%. Which is about one third of the entire cluster.

And if you look at the chart on the right, you can also see spikes in the CPU usage. So that should be a good indicator for you to realize that maybe the cluster is being underutilized. And the reason for that could be that they are a lot of IO operations and like in real world jobs. There are a lot of IO operations that happen. And IO operations are not CPU bound. They are IO bound. And IO bound operations might sometimes use less number of cores to run. So that ends up leaving a lot of idle CPU cores. So with that said, I then ran another analysis to calculate for a single date partition. At what size do we start seeing diminishing returns? So this is a very arbitrary line that I drew out here. And you can see that, anything to the left of the red line shows a pretty good decrease in the time taken for the job to complete, right? The rate of decrease is much higher. Whereas everything to the right of the lane, shows a slower slope, a smaller slope in the degrees. And that’s kind of my assumption to show that where there is a diminishing return. So when I was trying to decide a single node, if I could run a backfill on a single partition, I would spin up a cluster of 32 cores. So if you just do the math. If I’m running three dates in parallel. I would want to spin up a cluster size of 96 cores. And we will see later on in the demo also, how this kind of fairs and what are the outputs that we see in brackets.

Okay. So let’s move on to the next slide. And this is another metric and performance. When you’re trying to design a cluster. When you trying to design parallel job you wanna have the number of tasks as they increase. You want them to increase in relation to the cluster size in a linear fashion.


Which means that at a certain threshold, if you increase the class size more. You wanna make sure that more runs are running in parallel. Instead of just having a block where things are not increasing in performance.

And that is, also with the next slide where we’re talking in backfill in parallel.

Backfilling in parallel

So as most of you must have already got the context that I am to run this entire thing parallel. And the thing that I put a little importance on was separating the business logic of the code from the execution logic of the code. So when we do spend time with the execution logic of the job, we can also structure the job to be more performant in backfills. Another question that I frequently hear is why not use a technology like a scheduler to handle submitting these multiple jobs. The quick answer for that, that I could think of is that you can actually set up a scheduler like effort to run these in separate environments. But just be aware that you need to set up monitoring for each of these jobs separately. And that is something that you also need to be tracking separately. Whereas in this approach, you can do it within the same context.

So this was actually a funny quote, but not a funny quote. But it’s a pretty nice quote that I read from Rob Pike. Who has done a lot of work with the Golang programming language. And he States that ” Concurrency is not parallelism.” If we’re using Scala’s parallel collections,

to analyze our work. And we have written a really concurrent job. That does not necessarily mean that the spark is going to run this entire loop. And the reason for that is spark uses a first in first out scheduling strategy by default. So even though parallel collections launch these spark jobs in parallel, but the sparks scheduler may not actually execute these jobs in parallel.

So we want to move to another strategy and that is by using Futures and the Fair Scheduler Pool. By default each pool will get an equal share of the cluster, but inside each pool, the jobs will run in a FIFO order. We can obviously tweak these parameters by changing the scheduler mode, the minShare and the weight. Depending on our strategies. But for that example in the demo, we’re gonna be using the default. In this demo I’ll walk you through an example of structured streaming using flatMapGroupWithState. And we’ll go through the different modes of operation using batch parallel collections and futures with regular mode. So let me show you the case cluster that we’ll be using in this run. The first case class is your input data, which is going to be for the input data. Being sent to the sessionization job. The next one is our session state, which has been used by the flatMapGroup at state to maintain the state object. The next one is your output data, which is basically for the imitate records from your sessionization job. There are a few other case classes out here and variables, which I will explain. They’re more like helper variables, which I’ll explain when we use them. The next case class that I wanna explain is actually being used, specifically only because we’re using a windowing function with triggered at once. And that’s because when you do have a window function there could be some sessions that are still active in the state store. And have not been emitted out yet. So at the end of the trigger at once run, you wanna make sure you use the SQLContext to read the checkpoint location and the state folder. So you can read the open sessions that should have been emitted but have not yet been emitted. Because you wanna make sure that they’re added to the output. Just please make a note of that. That’s pretty interesting and important , and there’s a good reason for doing that also.

The next thing is your sessionization function this a really standard function shown in most books out there. It’s that if I state standout, you want to make sure you remove that state and add it to the arbitrator, else if the state exists. You want to update that state. Or you wanna create a new state if it does not exist. And at the next steps would be to update the existing state with the new state waiver. And update the timeout timestamp. The next function that we’re using is gonna be a run streaming query. The run streaming query is using watermark. Just as I mentioned, right? We’re using watermark of 12 hours. So we need to use the state steward writer to flush out, open records. We also grouping by device ID and we’re using flatMapGroupingState with the group state timeout as an event time timeout. And we’ll be parsing the sessionization function to this flatMapGroup at state function. We are also setting the trigger out here as to trigger at once.

The next function is your run state stored query, which is actually gonna be like a post query to the above function. The real reason for using this is you wanna make sure that you append all the open states that are in the state store to the same output location for each run. Okay, so now let’s actually talk about the different modes of operation, right? And the first mode of operation is your batch mode. You can see that I’m iterating over three dates out here, the first, second, and third of May. And we’ll be initializing the output folder. And this is just because of eventual consistency issue in Delta. You wanna make sure that you update the Delta log before you run the individual runs.

Let’s talk about the encapsulation within the dates out here. So if you notice I’m using dates, start map. So I am gonna be iterating with the entire collection of dates, sequentially, and for each of them. I’ll be running a pre query step, which is gonna delete the existing folder. If it already exists. I’ll be running the streaming query as step two. And then I’ll be running a post query step. I mean, all these three steps are going to be run within for each and every date, right? So no matter which date is thrown at this function. I’m gonna be running each of these three queries. So that also helps you visualize that. Okay, if you notice that there’s a pattern to your steps being run and you wanna encapsulate that into a single function. And then being able to map over it gives you a good amount of control over how to paralyze it. And I’ll show you that in a minute, but let’s check out the output of this run. I’m not gonna run this just for time constraints right now. But if I look at the output of this run. You can see that each run is taking approximately a hundred seconds. And the total run is about 318 seconds. So if you check this in parallel, the first things you’ll notice is that the only change that I made was add .bar to the dates collection. Which changes this collection to a parallel collection. Everything else remains exactly the same. The only thing in the output is that now we can see that each of them are running in the sequential order anymore. They are pretty randomized. And the time taken for each individual run is now much higher than a hundred seconds. It’s about 175 seconds. And that is because we are now looking at sharing the cluster between all the different runs. So it makes sense as to why it’s higher. But if you look at the total time taken for the entire job, its almost more than a hundred seconds faster. And that’s kind of be cruel to the slowest job in the run. So this actually gives you a very quick demo of how you can increase your job performance significantly by running it in parallel. Although there are a few caveats to this approach, which I will discuss in the presentation after the demo. But let’s move on to the next mode of operation. And that is using Futures and Scheduler Pool. So I wanted to quickly give you a example of how we can change the level of parallelism in a concurrent chug. So if you notice out you’re by default, the driver, the amount of parallelism is equal to the number of cores on your driver node. So if my driver node had eight cores, I’m gonna get a parallelism of eight. But if you could change that, right. You could also change and tweak the amount of parallelism that you want in your job. And one way of doing that is by using this new fixed thread pool class or function and setting that to the execution context. So that gives you a good way of changing it. Since at my job, we have about three dates. I just set this to a value of three. Of course, if you’re using much larger clusters, you can change it to a more appropriate parallelism size that you have calculated. Okay. So I’m gonna actually run this cell. So while this is running, I’m going to also give you a few things that I found pretty important. So in this, I’m gonna encapsulate the about task in a Future. And I’m gonna be waiting for the result of the sequence of futures at the end. The other change I made is that I set the scheduler pool to the spark context for each individual run. So each individual run is gonna get its own separate pool. So that helps the run get a good amount of isolation from the other runs. And the cluster is able to split its resources for each of the runs. So while this is running, let’s look at the SparkUI. If you look at the stages right now, you can immediately see that the Fair Scheduler Pool. Has three more new pool names pool zero, pool one and pool two. And the minShare, the pool weight, the scheduling mode, are all defaulted to the default values. I haven’t tweaked them yet, but you can obviously change them.

Let’s click this again. And this is actually why I wanted to show you. If you look at the running tasks out here, they’re all 32 ,32 and 32, which is exactly what I described earlier, right? That we are able to evenly split the resources. They split the cores between each of the runs. And that is something I found pretty useful. If you have some jobs that might be hogging resources. So if you look at the run out here for pool zero, it’s a little ahead. Let’s see if I can show you another part of the demo.

So what I wanted to actually show you, it’s taking a little time. But what I wanted to actually show you was that when pool zero does complete the resources held by pools zero that are 32 cores, again get reshuffled between the remaining jobs that are still running. So that actually helps you accelerate the completion. Once a certain job has completed. It does not hold steady two cores and not take new resources when available. And I think that was something I found pretty nice and useful.

And that actually completes my demo. I hope this was really informative. And helped you get a fair understanding of how we can improve performance by making minimal tweaks and change in the code. Let’s get back to the presentation and we can further our discussion. Great, So as you guys are just seen in the demo, we ran this over 96 node cluster. And we got a pretty good understanding of how we can run the same batch mode, parallel mode and pool mode. Now let’s take a look at the performance charts on different cluster sizes. So if you look at the chart on the left, it shows you that we see performance stabilization at 72 cores in the case of parallel and pool. If we divide 72 by three, we actually get 24, which is much lower than our initial assumption of 32. Which is kind of remember the earlier assumptions that I asked you guys to make with the arbitrary red line with 32. So in a real world scenario, when you’re running this in parallel we actually get a performance improvement on the 32 cores. Which means that we could have actually spun up this job and seen a really good decent performance. Even with just a 24 node cluster for each partition. So if you multiply that by three, that would be about 72 cores, right? The next thing that we wanna notice is the chart on the right. So the chart on the right shows you performance improvements across the board, but we do get much high performance gains at larger clusters of like 96 and 180. If you notice a difference between running this job on a batch mode versus a parallel mode. The parallel mode shows a significant jump in performance.

Let’s go on to the next slide. If you look at this slide, this was something that I brought up very early on in the presentation also. And that is failure and recovery handling. So we should be able to handle failures and retries within the job. So if you look at the three tasks that are shown out here, right, the first task runs it completes. And because we are having these jobs running in parallel, we’re easily able to track the progress of the job. So if a job starts and it fails, we’re not able to easily tracking it. So we should have additional state management setups. So that we’re able to monitor the state and the tracking of which the job is.

With that said, please do look at our blog. We will have a working example with the GitHub links. Do check out our careers page. And if you have any further questions, do reach out to me on my LinkedIn or my email address.

Watch more Spark + AI sessions here
Try Databricks for free
« back
About Oliver Lewis

Got my Masters in Computer Science degree from Indiana University and since then I've worked in the Bay area in tech startups in the fields of finance, retail, and entertainment. Real-time projects excite me the most because its fun to see and interact with systems that respond to a given signal.