We can leverage Delta Lake and Structured Streaming for write-heavy use cases. This talk will go through a use case at Intuit where we built a merge-on-read (MOR) architecture to meet a very tight SLA. With MOR there are different ways to view the fresh data, so we will also go over the performance tests we ran on those different methods to arrive at the best one for the given use case.
Speakers: Justin Breese and Nick Karpov
– Hello everybody. Today we’re gonna be doing Building Merge on Read on Delta Lake. My name is Nick. And joining me is…
– Justin. Tell us a little bit about yourself.
– Glad you asked. I’m a Senior Strategic Solutions Architect in the Los Angeles area. And I love pestering Nick with questions and getting him to do a presentation like this with me.
– Strategic? When did that happen? I’m Nick Karpov. I’m a Senior Resident Solutions Architect at Databricks, and my interests are history and music. We’re excited today to be talking to you about Merge on Read on Delta Lake. So there are gonna be four general sections to our talk today. First, we’re gonna cover a little bit of theoretical background on Copy on Write and Merge on Read, specifically in the big data world, and in Delta, Databricks, and Spark. Then we’re gonna jump into describing the use case, some of its characteristics, and the challenges that we ran into. And then, some Merge on Read strategies that we explored to address those challenges. Those first two things I’m gonna cover, and then I’m gonna shoot it over to Justin; he’s gonna talk a little bit about testing and performance, and how to really choose the right Merge on Read strategy. And then finally, the rematerialization, which will become more clear as we actually talk about the Merge on Read strategies. Anything else I’m missing, Justin?
– That was awesome.
– Let’s do it. Alright, so Copy on Read… no, Copy on Write and Merge on Read.
– That’s right.
– So the short story is: the merge is done on the write. What that really means is that when we do a merge operation on Delta — since Delta is based on Parquet, an immutable file format — what really happens on merge is we’re taking a bunch of files, reading them, rewriting them with some changes, and then marking those new files as active. And at that point, they’re available to the reader. That entire end-to-end operation is roughly following the Copy on Write pattern, which is fairly common across many different paradigms. It’s ultimately great for write once, read many. So as long as you have a lot of readers, and they’re reading more often than you’re writing, it’s a fairly good strategy. One of the challenges, though, is that if you have many writes — many, many changesets, many things that you need to merge into your target dataset — then you run into some problems. And that’s what we’re gonna be talking about. So under the hood, what does this Delta Lake merge strategy look like? There are three phases, and there are really two elements. The first element is the source — that is, the new data coming in; that’s the terminology we’re using. The other element is the target — the existing data, the actual Delta table. The three phases are essentially a join in the first phase, another join in the second phase, and then the third phase — well, it’s written here sequentially, but really it just completes the Delta commit protocol. The commit protocol is all about identifying what files you touched, changing them in whatever way — whether you’re doing appends, deletes, any operation really — and then finally committing those changes, if you did change anything.
So in that first phase, what we’re gonna do is narrow down the number of candidate files that we’re working with. What that really means: imagine you have, let’s say, 100 files in your target, your existing table — so 100 files that make up that dataset. And let’s say you have some changeset coming in, some number of rows — let’s call it 100 rows. So 100 rows coming in, and 100 files that exist in your target dataset. Now, if we wanted to do the merge operation without narrowing down that list of 100 candidate files, then we’d have to use the entire dataset. So that first phase is a cheaper inner join. And really, its function is first to make sure that we don’t have any duplicate matches, because we don’t have — at least today — any way to deal with duplicate matches; that would be incorrect. And the second thing that inner join allows us to do is narrow down the candidate files. So if the matching condition only matches three files, then we can move on to phase two and only use three files instead of the entire dataset. That is really what phase two does: it takes the results of this first phase, actually reads in the candidate files that we’ve narrowed down, merges them together, applies whatever logic is specified by the user, and writes the new files out. And then in phase three, we commit those files as the new active files and move on. So that phase two — that’s what we’re gonna take a look at. In phase two, where we read those touched files — that is, those candidate files — and write the new files based on the logic, it can take one of three paths. And that depends on what the user has decided to write in their merge condition.
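As a rough illustration of that first phase, here is a minimal pure-Python sketch — not the actual Delta implementation, which does this with an inner join against file-level statistics — of narrowing candidate files down using per-file min/max key ranges. The file names and key ranges are made up for the example:

```python
# Sketch of merge phase 1: prune candidate files using per-file
# min/max key statistics, so phase 2 only rewrites files that
# could actually contain a match.

def candidate_files(files, changeset_keys):
    """Return only the files whose [min_key, max_key] range could
    contain at least one key from the incoming changeset."""
    return [
        f for f in files
        if any(f["min_key"] <= k <= f["max_key"] for k in changeset_keys)
    ]

files = [
    {"path": "part-000.parquet", "min_key": 0,   "max_key": 99},
    {"path": "part-001.parquet", "min_key": 100, "max_key": 199},
    {"path": "part-002.parquet", "min_key": 200, "max_key": 299},
]

# Only two of the three files can possibly match these keys,
# so only those two become candidates for the rewrite.
touched = candidate_files(files, changeset_keys=[42, 250])
print([f["path"] for f in touched])
```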
So in the case that your merge condition accounts for insert only, we can actually detect that and decide, hey, we only need a left anti join in that second phase. The reason you only need a left anti join is that we only need the data you brought in from the source. We don’t need the target, the data that already exists; based on the matching condition, we just need the rows that exist only on the left side — the source — and insert those. In the case where you have matched-only clauses, we only need the right side, so it’s a right join. And in all other cases — where you have clauses that are updating, deleting, and inserting — we actually need to do a full outer join, since we need all the fields from both the source and the target, and we need to be able to understand, row by row, whether we need to update or delete that row, or do an insert. Actually, let’s go back here for a second. That full outer join at the end — that’s kind of the crux of the problem. You can see that since you’re getting basically all the data, there will be cases where you’re going to be rewriting — pulling in — essentially the entire table, given the use case characteristics, which we’ll dig into in a bit. And rewriting the entire table, at a fairly fast pace, is not really desirable. It’s not fast, and your resource utilization isn’t very effective — especially if just a few changed rows end up rewriting the entire dataset. So that kind of motivates the need for a Merge on Read strategy. The crux of the Merge on Read strategy is that instead of doing that merge logic up front, before committing, we defer it until the reader really needs it. And to do that, we essentially just create a change table.
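The mapping just described — insert-only gets a left anti join, matched-only gets a right join, everything else falls back to the expensive full outer join — can be sketched as a tiny selector function. This is an illustration of the decision logic from the talk, not Delta’s internal code:

```python
# Sketch of how the mix of merge clauses determines the phase-2 join.

def phase2_join(clauses):
    """clauses: set of merge clause kinds present in the statement,
    e.g. {"insert"}, {"update", "delete"}, {"update", "insert"}."""
    if clauses == {"insert"}:
        # Insert-only: we only need source rows with no match in the target.
        return "left_anti"
    if "insert" not in clauses:
        # Matched-only clauses (update/delete): only the right side is needed.
        return "right_outer"
    # Mixed clauses: need every field from both sides, decided row by row.
    return "full_outer"

print(phase2_join({"insert"}))                      # cheapest path
print(phase2_join({"update", "delete"}))
print(phase2_join({"update", "delete", "insert"}))  # the expensive crux
```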
So instead of having one table represent the dataset, we have two tables that represent the dataset: we have the target data, and we have the change data. And then a view — the reader — will dynamically compute the result it needs, rolling up the changes as it goes. It’s a fairly common strategy, and there are a few different ways to implement it, but the way I’ve just described is the path that we chose. Which one do you choose? Well, it depends, as most things do. It’s really an empirical approach. Since Delta comes out of the box with a Copy on Write strategy, it behooves you to take the easy path first and try that out. And then if the performance — whether it’s the merge characteristics, or the time it takes between an update being applied and being available to the readers — if the defaults don’t match the requirements, that’s when you can start taking a look at the Merge on Read strategy, which we’ll outline. So, our use case. In our use case, where we’re taking this changeset and applying it, we have anywhere between 6,000 and 12,000 changes coming in every single minute, so it’s fairly quick. The change data is coming in from Kafka, and it’s partial updates: only a few columns, only a few fields, may be changing or even available in the changeset. So we’re not getting full rows here. Each row happens to have a unique identifier in the target dataset and in the changeset. That’s not strictly a requirement, but in this use case it is the joining condition: there is a unique identifier for each row. We applied this strategy to many tables, but the one specific table we’ll use here is around 200 gigabytes in size — so kind of a medium-sized table — and growing at a fairly small rate. The characteristic we’re looking for is essentially five minutes end to end before a change is available to the readers.
And before — actually, ooh, I mean, we got fairly lucky: we were taking over a use case that was a daily rewrite of an entire table. So getting it down to under five minutes was definitely an excellent first step. Some initial observations of what we encountered — and this is probably the key slide, the key point, for this entire presentation. Most of the events that we got were actually changes, updates, instead of inserts and deletes. So what that means is we’re mostly changing existing data. And the second point: the matching condition, which decides whether or not we change a row, is fairly uniformly distributed across the target. Recall that I gave this example of 100 rows coming in and 100 files in the existing dataset. Well, in the worst-case scenario, each incoming row will have a match — in our case on the unique id — in each one of those hundred files. And that means you’re rewriting the entire dataset. You want to avoid that as much as possible, but sometimes you just get a dataset, and change data, with that characteristic. What makes it further difficult is that there are no natural partitioning keys, at least for this dataset. Usually you’ll have something like date, and date has a certain access pattern — most usually, only recent data is being changed, and only recent data is being required — so you can exploit those characteristics. Well, unfortunately, in our case, a sample of 50,000 events had 2,000 different days of updates. That’s five and a half years’ worth of data that can be changing at any given moment. So even if we partition our dataset by date, we don’t really get anything out of that. And once you’re in that world, there are really no configurations that can help you too much: changing the file sizes, or various thresholds within Delta, unfortunately doesn’t give you very much.
So, the Merge on Read architecture — just a visual representation of what we’ve described so far. It consists of this changeset table, the append-only change data table. There’s a structured streaming job to the left of it, which is simply sinking all the change data into that append-only table. Then you have the actual snapshot, the materialized table, below it, which represents the last sync of the data — the final representation of the table. And to the right of that you have the view. Now, the view is gonna take those two tables — the changeset and the materialized table — and perform certain kinds of joins, which Justin will cover, and those joins will represent the current state of the data. And then once in a while you have a periodic rematerialization job — that’s the Spark job below — which will update the materialized table. That’s something you can tune based on your needs. Specifically, the snapshot, this base table, in our case contains the primary key — the id again; not strictly required, it’s just a characteristic of our use case. You don’t need an id; you can have matching conditions that exploit other fields. We have a frag number, which in our case is versioning information specific to the use case. Partitioning is included there, but it’s not strictly necessary — we just don’t wanna exclude it. So if you have a partitioned dataset that can be exploited, please do. And on the right side is the changeset, which basically matches, with a slight difference. We again have our primary key; we have a version — some form of versioning of the incoming data, so row-versioning information. And the partitioning here on the changeset is actually by batch id. The batch id is something that the structured streaming job writing into this changeset exposes. So just a quick review: structured streaming is essentially a bunch of sequential batches.
They’re being executed one at a time, and for each execution in that sequence, a batch id is exposed — a unique batch id, and that’s guaranteed to always be the case.
– Thanks Nick, great stuff. All right, now we’re gonna talk about the changeset, and then once we have the changeset, we can incorporate it into the view. For the changeset, what I need to do is get the latest and greatest value for a given id. The other caveat here is that the CDC data is partial updates. So if I get a null in a partial update, that doesn’t mean I actually want the value to be null — it just means I don’t have anything for that field. So you can see here I do some coalescing between the changes and the baseline. Also within this code, I need to understand: can I actually broadcast? For this data, I looked at it, and it looks like each row is about 364 bytes. So if we can assume that I can safely broadcast up to one gigabyte worth of data, that means I can handle about 2.8 million rows. So I do a quick check down here in line 12 and say, hey, as long as the changeset is less than or equal to 2.8 million rows, then broadcast it; otherwise don’t broadcast it, it’s not safe. So now we have this ranked-over changeset — the latest and greatest by id — and we need to figure out how to incorporate it into the baseline. The easiest way of doing that is to just do a double rankOver: take that changeset, union it into the target, and do a rankOver on that to get the latest by id. The second method is to just do a fullOuterJoin: we take the changeset that has already been ranked, and we do an outer join to get the latest by id. The third method is a leftJoin plus antiJoin: I do a leftJoin to give me the matches, then I flip the tables around and do an antiJoin to figure out what the inserts are going to be. Now that I have those, I union the inserts back into the entire dataset, and that gives me the latest and greatest by id. And then there’s the preferred method — at least, the one we could get away with.
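All of these view methods compute the same logical result: the latest change per id, coalesced over the baseline so partial updates keep existing values, with never-seen ids coming through as inserts. Here is a pure-Python sketch of that logic (the real implementation is Spark DataFrames; the column names and the 364-bytes-per-row broadcast arithmetic follow the talk, everything else is illustrative):

```python
# The talk's ~2.8M-row broadcast cutoff falls out of ~1 GB / ~364 bytes per row.
MAX_BROADCAST_ROWS = 1_000_000_000 // 364  # ~2.75 million rows

def latest_per_id(changes):
    """Rank-over equivalent: keep only the highest-version change per id."""
    best = {}
    for c in changes:
        if c["id"] not in best or c["version"] > best[c["id"]]["version"]:
            best[c["id"]] = c
    return list(best.values())

def rolled_up_view(base, changes):
    """Roll the deduped changeset up over the baseline."""
    ranked = {c["id"]: c for c in latest_per_id(changes)}
    out = []
    for row in base:
        c = ranked.pop(row["id"], None)
        if c:
            # Coalesce: a partial update carries None for untouched
            # fields, so keep the baseline value in that case.
            row = {k: (c[k] if c.get(k) is not None else v)
                   for k, v in row.items()}
        out.append(row)
    # Whatever never matched the baseline is an insert.
    out.extend({k: v for k, v in c.items() if k != "version"}
               for c in ranked.values())
    return out

base = [{"id": 1, "name": "a", "qty": 10}, {"id": 2, "name": "b", "qty": 20}]
changes = [
    {"id": 2, "version": 1, "name": "old", "qty": 1},   # superseded
    {"id": 2, "version": 2, "name": None,  "qty": 25},  # latest, partial
    {"id": 3, "version": 1, "name": "c",   "qty": 30},  # insert
]
print(rolled_up_view(base, changes))
```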
For this use case, what we found out is that in the CDC data, an insert is guaranteed to be an insert — I don’t have to check and see if it’s going to be an update. So I take that rankedChangeset, I filter out the insert events, and I hold those off to the side. Then I do my leftJoin, then I take those inserts and I union them in. And now I have a full and complete — and for us, broadcastable and super fast — view. This lets us get, at any time, the latest and greatest data by a given id. LeftJoin union inserts: it just rolls right off the tongue. All right. So now that we have these four different methods — and I told you, hey, here’s the one that we preferred — we had to come to some realization and rationale for why it’s the preferred way. And to do that, we do some perf testing. Performance testing usually takes a long time, but it really doesn’t have to. Some things you wanna consider: how many tests are sufficient? How can I make them as even as possible? What do you actually want to test? And why is this so hard and so manual? To do this efficiently, what we did is leverage the Databricks runs-submit API. The runs-submit API starts a fresh new cluster for each run, so you don’t have to go shut down a cluster and restart it, you don’t have to worry about what has or hasn’t been cached, or whether you’re running with the same runtime — there’s a bunch of things you don’t have to manually configure. Also, Databricks notebooks have widgets, and widgets can act as params, so we can incorporate those into our code. So if we were to say, hey, let’s do three tests for each view method and each operation — read and write — that means we can do a total of 24 tests right here in a relatively parameterized way. The widgets in the notebooks are super simple: a series of dropdowns and text inputs, and then we call those as variables within the code.
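That 24-test matrix is just the cross product of the view methods, the operations, and the run count — the kind of expansion those widget parameters feed. A small sketch (the method names follow the talk; the exact parameter names in the real harness may differ):

```python
# Expand the test matrix: 4 view methods x 2 operations x 3 runs = 24
# parameterized runs, each of which would become one submitted job with
# the method and operation passed in as notebook widget params.
import itertools

methods = ["doubleRankOver", "fullOuterJoin", "leftJoinAntiJoin",
           "leftJoinUnionInserts"]
operations = ["read", "write"]
runs_each = 3

test_matrix = [
    {"method": m, "operation": op, "run": r}
    for m, op, r in itertools.product(methods, operations,
                                      range(1, runs_each + 1))
]
print(len(test_matrix))  # 24
```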
The next thing we wanna do is create a payload of results, because eventually we’re going to have a timer table where we can put the metrics of what we’re actually testing. That way we can pull the data and put it into pretty, pretty graphs. So we create our payload, and then we have a timer function. The timer function is going to look at which method we’re testing and, if we’re doing a read, what we’re gonna read: we read based on a given predicate, and we do a .show() to make it happen. Finally, we save this data, along with the payload I was talking about, to the timer table path. The next thing we do within the code is use a nice, elegant case statement: based on which method is being passed in — is it the rank? Is it the antiJoin leftJoin union? — we call the correct DataFrame method. And then this is actually running the test, here in line 10, which is the stopwatch function: here’s the DataFrame we’re gonna test on, here’s the operation, here’s the query we’re gonna do, here’s the payload that I’m passing, and then done. So the next phase is configuring the API to understand what to do, and for that we just use some simple JSON. I can say: I wanna do three runs each; the operations can be reads, they can be writes; here are the different view methods that we wanna test; here’s the actual read query that we want to test; and then the cluster information. So each time this is kicked off, for all these runs, I want to use the same exact cluster, same IAM role, etc. And in the background, here’s what it’s actually doing: all the different permutations. I wanna do three runs of each, so I’m doing a read of this method, the leftJoin union inserts; I have the second method, third method, etc. I’ve got my reads, I’ve got my runs.
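The stopwatch-plus-payload pattern can be sketched in a few lines. This is an illustration of the idea — time an operation, stamp the payload with the method, operation, and elapsed seconds, and append it to a results store — where a plain list stands in for the Delta timer table, and all names are hypothetical:

```python
# Minimal sketch of the perf-test stopwatch described above.
import time

timer_table = []  # stand-in for the Delta "timer table"

def stopwatch(method, operation, fn, payload=None):
    """Run fn(), record elapsed time plus context as one result row."""
    payload = dict(payload or {})
    start = time.perf_counter()
    fn()  # e.g. df.show() for a read, or a write to a scratch path
    payload.update({
        "method": method,
        "operation": operation,
        "seconds": time.perf_counter() - start,
    })
    timer_table.append(payload)
    return payload

# A trivial computation standing in for a DataFrame action:
result = stopwatch("leftJoinUnionInserts", "read",
                   lambda: sum(range(1000)),
                   payload={"run": 1})
print(result["method"], result["operation"])
```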
And each one of those is gonna operate by itself, and then the results are appended to that timer table. Also, you can check it out — everyone’s got a GitHub repo, and so do I. So if you wanna take a look at this code, it’s super simple, and it’s right there for you. The next phase is actually calling the API. This is also in the repo — really simple, it just goes through everything that we wanna do. It’s a simple command to call: point it towards the JSON file, provide your user access token within Databricks, hit go, and it just runs. And then we go through and read the timer table: let’s look at the reads, let’s look at the runs, and let’s see the data. What you’re looking at here is a graph where, for each of these view methods, we’re reading the same exact data; it’s just the different methods of reading it. Lowest time equals fastest; highest time means not as fast. And we can jump down and look: the outer join method, that doubleOuterJoin, is pretty slow, whereas — like I was saying before — the leftJoin union inserts is definitely the fastest method. And we can look at it in more of a tabular form as well, and just look at the different times that things took. So this is the naked-eye test: letting the data truly decide what is best. So, a recap thus far. Nick talked about the beginning part: we have this append table, we smash it into the view, and I just talked about the view. But now we’re gonna focus on the bottom part, which is the periodic rematerialization. Because over time, if we have more and more updates, this Merge on Read architecture is going to slow down, or we won’t even be able to broadcast the changesets anymore, because they’re too large. So we wanna kind of hit reset, and merge everything in. And like anything else, there are many methods to do this.
So first of all, part of it is figuring out how often you want to do this periodic job. For your use case, you have to look at — I mean, it’s a job that has to run: how often do you want to run it, and for what benefit? You have to do that analysis on your own; performance testing is key there. As far as resetting the baseline itself, there are a few different methods. You can do a merge, which is very helpful and easy if you have many larger partitions and only a smaller subset of those partitions needs to be changed; merge is also built into Delta Lake, so super easy semantics. An overwrite is great if you do not — or cannot — partition, or if most, if not all, partitions need to be changed. And replaceWhere is a Delta API, super elegant, but you need to have partitions; I would say it’s a little bit more difficult than merge, but not crazy by any means. So for this one, we ended up going with the overwrite function. Nick was talking at first about how, if we were to partition by date, everything was going to get touched — or at least the vast majority of partitions. And what we found is that just overwriting was super easy, and it got us what we needed. The key here, too, as we talked about, is the batch id. Because after we rematerialize, we don’t want a subsequent read to treat those previous batches as if they were new data. So what we have to do now is delete those batch ids. Within that append-only table, we partitioned by batch id. So if we check line 68, we’re actually gonna use the Delta APIs to go and delete where the batch id is less than or equal to the latest batch — you can see we pulled the latest batch up here by looking at the max batch id. What this means is it’s a super easy, metadata-only operation to delete that data out of this Delta table. So the normal stuff that you see here is we’re going to be doing our overwrite.
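The shape of that rematerialization step — overwrite the baseline with the current rolled-up view, then delete the consumed batches from the append-only changeset so readers don’t re-apply them — can be sketched in pure Python. In the real job these are a DataFrame overwrite and a Delta DELETE on the batch-id partition column; here plain lists stand in for the tables, and all names are illustrative:

```python
# Sketch of the periodic rematerialization described above.

def rematerialize(base_table, changeset_table, rolled_up):
    """Overwrite the baseline and purge the changeset batches it absorbed."""
    # Capture the high-water mark first, so batches that arrive while
    # we rewrite are preserved for the next cycle.
    latest_batch = max(c["batch_id"] for c in changeset_table)
    # "Overwrite": replace the baseline with the fully merged view.
    base_table[:] = rolled_up
    # DELETE WHERE batch_id <= latest_batch on the append-only changeset.
    changeset_table[:] = [c for c in changeset_table
                          if c["batch_id"] > latest_batch]
    return latest_batch

base = [{"id": 1, "v": 1}]
changeset = [{"id": 1, "v": 2, "batch_id": 7}]
view = [{"id": 1, "v": 2}]  # the rolled-up result of base + changeset

rematerialize(base, changeset, view)
print(base, changeset)  # baseline refreshed, consumed batches gone
```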
But then we have to make sure we clean up that append-only table within the rematerialization. All right, so again, like I said, we went ahead and did the overwrite, and we did that based on data. We saw that the overwrite was just gonna be faster, more efficient, and use fewer workers for this type of use case. And the reason why is what Nick was talking about: we get a ton of updates, and the dataset is somewhere in between — it’s not small data, but it’s definitely not big data, only 200 gigabytes. So using 80 cores, we’re able to rewrite this thing in less than 10 minutes. We need to do this periodic rematerialization. Final recap: we talked about the use case; we introduced the Merge on Read architecture — MORs and COWs, don’t forget — and we talked about the two tables, meaning how you’re gonna derive a final result. We went through the different views and an understanding of their differences. We talked about how to test against these different view methods, and the periodic rematerialization. So with that said, I always like to say thank you and give credit where it’s due: Chris Fish, Daniel Tomes, TD — the man, the myth, the legend — Burak, Joe, Denny, Paul Roome, and of course our moms. Our moms are always super helpful. Nick, is there anyone you wanna thank? Or no, you’re thankless?
– All right. Cool.
– And especially my mother. All right, cool. Well, thanks, guys. Feedback is super important — give it to us. We are online and typing, and we will stay here until you’re done asking us questions. So thank you very much, and have a wonderful rest of your Spark + AI Summit.
Justin Breese is a Senior Solutions Architect at Databricks where he works with some of the most strategic customers. When he isn't working you'll find him on the soccer field, working on his old Porsche, or playing the drums and guitar. He lives in Topanga, CA.