Patterns and Operational Insights from the First Users of Delta Lake


Cyber threat detection and response requires demanding workloads over large volumes of log and telemetry data. A few years ago I came to Apple after building such a system at another FAANG company, and my boss asked me to do it again. From my prior experience using Apache Spark and AWS S3 at massive scale, I learned some good patterns, but also some bad patterns and pieces of technology that I wanted to avoid. That year I ran into Michael Armbrust at Spark+AI Summit and described what I wanted to do and a plan to test Databricks as a foundation for the new system. A few months later, while we were in the middle of our proof of concept build-out on Databricks, Michael gave me some code they were calling Tahoe. It was the early alpha of what became Delta Lake, and it was exactly what we wanted. We have been running our entire system, writing out hundreds of TB of data a day, on Delta Lake since the very beginning.

This presentation will cover some of the issues we encountered and things we have learned about operating very large workloads on Databricks and Delta Lake.

  • Effective Delta Lake patterns for streaming ETL, data enrichments, analytic workloads, large dataset queries, and Large Materialized Aggregates for fast answers
  • Z-ordering and the 32 column default limit. Oops. Optimizing your schema to ensure z-ordering is effective
  • Date partitioning and the implications of event times with long-tail distributions or from unsynchronized clocks
  • Optimize, optimize, optimize, and when Auto Optimize is your only option
  • Upsert patterns that have simplified important jobs
  • Tuning Delta Lake for very large tables and low-latency access


Video Transcript

– Hi, my name is Dominique Brezenski. I’m a Distinguished Engineer in Apple Information Security, and I’m gonna talk about Delta Lake patterns and insights. Necessity is the mother of invention, and in building big data systems, there are a lot of challenges.

Sometimes the greatest innovations come from solving

About three years ago, I was at Spark Summit and I ran into Michael Armbrust and talked about a system that I had built at a prior employer that was pretty large-scale, doing hundreds of terabytes a day of data. And I had to build a similar system at Apple, and I described some of the stuff that we had gone through and done, and challenges that we’d had with certain components, and I saw a flash of, you know, inspiration, terror, brilliance in Michael’s eyes, and he said, “Yeah, I don’t think our architecture is gonna hold up to that now, but I have this thing in my back pocket,” and as we discussed it a bit more, it really resonated with where I wanted to be and what I wanted to do for this project. So, a couple months into a POC, Michael reached out and he said, “Hey, I have some code for you to try,” and that turned out to be basically the alpha of Delta Lake. It was codenamed Tahoe at the time, and after running a few tests and verifications, we just went all-in. And within a short period of time, we were doing tens of terabytes a day into Delta Lake, and now we’re much bigger, hundreds of terabytes, approaching a petabyte a day of data that we ingest through our ETL pipeline, and then we process much higher volumes of data in our daily work on top of that.

So, we learned a couple things along the way and had some insights. I’m not really gonna talk too much about the sort of standard meat and potatoes of having data in a table-like format, but I’m going to go into a little bit more about some of the more novel or interesting things that Delta Lake has allowed us to do, and then some just kind of operational insights and gotchas that we’ve experienced, hopefully a little bit of tips and tricks kind of mixed in with this.

So, first I’m gonna talk about patterns, and first up, since I said we run a big ETL pipeline, is an inversion of ETL, extract, transform, load: in the big data ecosystem, we usually do something more like extract, load, and then transform. And that’s exactly what our system looks like. We actually have an extract system that dumps a bunch of data into S3, then we pick up that data, we load it into a staging table, and then we have a bunch of streams off the staging table that are doing transformations that result in unique data tables that have well-defined schemas and present a stable interface for our downstream consumers of that data.


But before we get too far into these patterns, I want to stop for a second and talk a little bit about the parsing framework that we use, because I’ll show some sample data down the road, or some sample code, and it’ll use this, so it’s easier for you to just pause for a second and take a look at it. So we have an abstract class called Parser, and one of the unique things about it is that we broke it up into some base steps that have really given us flexibility. So we have a prepare step, and our parsers are basically DataFrame transforms, right? They take a DataFrame in and they pass a DataFrame out. And so prepare, given the source DataFrame, is meant to do simple things like filtering or simple extractions that would give us additional information that we might need to use in parsing. Fundamentally, prepare possibly drops some columns, if necessary, coming in, but other than that, not too much processing happens in there. Parse is where we do the majority of our extraction and transformation of the data, and then complete is an opportunity, once parsing is done and validation has occurred, for us to do any fix-up of the data before it actually gets committed to the table or output from the stream. And an interesting thing is the apply function on this (this is all Scala, if you didn’t notice): it runs these steps in a fixed sequence, with some work in between the steps. So for instance, we run prepare, but then what it actually does is create a struct of all the columns in the incoming DataFrame, and that goes into an origin column.
So we basically just transposed all the columns into a struct and put them in one column that we start with, and then we run the parse method, and then this set-parse-details step is automatically run. The interesting thing it does is it actually creates a column with a struct in it, and it uses this validConditionsExpression, as well as the additionalDetailsExpression, in order to validate the row and then possibly capture additional detail about it that could be useful downstream in either dealing with a parsing error or tracking lineage of the data. It also puts a timestamp on it so we have a marker for when the data was actually parsed. And once the parse details have been set, finally the complete method is called. So, this is the layout for what we do through our ETL pipeline. When we get into the extract part, we actually have a fairly complicated system upstream, it’s actually a couple of systems, that is responsible for collecting log data or telemetry or extracting data from upstream systems, databases, and APIs. Fundamentally what it does is take the raw data and wrap it in a JSON metadata layer that includes, for instance, the event type and the source of the data, and we get timestamps from that system so we can track latencies. But fundamentally, it’s metadata, and then there’s a raw field in that which includes the actual log event or piece of telemetry or data. And the extract system plops that into S3.
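Pulling the pieces above together, here is a minimal sketch of what such a Parser base class might look like. The step names and the two expression members come from the talk, but every signature and implementation detail here is an assumption:

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._

// Sketch of the abstract Parser described above. Parsers are DataFrame
// transforms run in a fixed sequence by apply(); details are assumptions.
abstract class Parser {
  // SQL expression that must evaluate true for a row to be considered valid.
  def validConditionsExpression: String
  // SQL expression capturing extra detail (e.g. input_file_name()) for lineage.
  def additionalDetailsExpression: String

  // Light filtering / column drops before parsing.
  def prepare(df: DataFrame): DataFrame = df
  // The bulk of extraction and transformation.
  def parse(df: DataFrame): DataFrame
  // Post-validation fix-up before the result is committed.
  def complete(df: DataFrame): DataFrame = df

  final def apply(source: DataFrame): DataFrame = {
    val prepared = prepare(source)
    // Transpose all incoming columns into a single `origin` struct for lineage.
    val withOrigin = prepared.select(struct(prepared.columns.map(col): _*).as("origin"))
    val parsed = parse(withOrigin)
    // Record validity, extra details, and the parse time alongside each row.
    val withDetails = parsed.withColumn("parsedDetails", struct(
      expr(validConditionsExpression).as("valid"),
      expr(additionalDetailsExpression).as("details"),
      current_timestamp().as("parsedAt")))
    complete(withDetails)
  }
}
```

A concrete parser then only implements parse (and optionally prepare and complete), and the fixed apply sequence handles the origin and parse-details bookkeeping.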

From there, we actually use the s3-sqs source in Spark, and that’s tied to notifications on the S3 bucket, and that’s how we consume high volumes of data without having to do expensive, slow list operations on the S3 bucket. So that’s the source of our data. And remember when I said it’s in a JSON-object-per-line format? A note here is that we don’t actually use the JSON input format for Spark streams. We actually use the text file format to capture each record as just a single text line, and I’ll talk a little bit more about why we do that and the advantage. So, as we’re loading the data out of S3, here’s our staging parser, and you can see our validation condition is just that it has a valid timestamp. And in the additionalDetails, one of the things we capture is actually the input file name. So this is giving us lineage from this row back to the file in S3 that it came from. This is just to help with debugging and break-fix type information, but it establishes at least a baseline history for the data. And then our parser’s really simple. The text file input creates a single column called value, and that’s been transposed into the origin struct, so we’re basically just extracting the JSON from it using a from_json. As you notice here, we’ve actually extended from_json to include some fancy timestamp parsing that we do, and to add some semantics that are consistent for our use case. So, we know the schema for this input data because it’s the standard metadata wrapper, so we’re just extracting those fields, and then we’re basically transposing them again out of the resulting struct into top-level columns, and then we’re preserving that origin column. And so, we have the raw data as it came in in origin.value, and then we have the actual extracted data in those columns.

Then, internally the parser is doing a validation and checking whether or not there’s a valid timestamp. And so in our complete method, we actually come along and look to see if the timestamp is null. If it is, we substitute in the parsing time for it, and if not, we use the actual extracted timestamp, and then we capture all of those standard columns and the parse details, as well as that origin, which gives us lineage. So the important things here are that we always end up with a valid record even if we fail to extract the JSON, we have the original string as lineage that comes through, and we have parse details so we’ll know whether or not it was correctly parsed, and we have a lot of information to help us if we have a problem loading. And the reason why we use the text file input at this stage is because if you just try to use the JSON reader and it comes across a bad line, or worse, it comes across a line that doesn’t match your extract schema, you’ll just get all nulls, and you won’t know exactly what happened. So, what we do is capture the whole string in case that fails, so we have a copy of it and we have the lineage of it, and this has really helped a lot in dealing with data shape changes upstream coming into our table, and it actually allows us to reprocess against the same table and override if we need to. It’s had real advantages from a break-fix perspective.
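A hedged sketch of the staging load just described; the column and path names here are assumptions, and the talk's extended from_json is approximated with the stock one:

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.functions._

// Read each landed S3 object as plain text lines, so a malformed line can
// never silently null out neighboring rows the way the JSON reader can:
//   val raw = spark.readStream.format("text").load("s3://bucket/landing/")

// Parse step: extract the standard metadata wrapper from the raw line that
// was transposed into origin.value, then lift its fields to top level.
def parseStaging(df: DataFrame, wrapperSchema: StructType): DataFrame =
  df.withColumn("meta", from_json(col("origin.value"), wrapperSchema))
    .select(col("meta.*"), col("origin"))

// Complete step: if the timestamp failed to extract, substitute the parse
// time, so every row lands in staging with a valid timestamp.
def completeStaging(df: DataFrame): DataFrame =
  df.withColumn("timestamp",
    coalesce(col("timestamp"), col("parsedDetails.parsedAt")))
```

Because origin.value always carries the raw line, a bad record still lands as a valid row with its lineage intact.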

Then we get to the transform phase. So, the staging table that the last parser loaded into is partitioned by date and event type. Mostly, then, our actual data parsers have a stream reading off of the staging table with a predicate on it looking for a particular event type, or maybe poking into the raw log itself looking for some kind of fixed tokens, and then that’s driven into the stream. So in our prepare phase, we actually drop the origin struct from the prior table, because we’re gonna create a new one with the incoming data from this stream. So we drop origin and add all the other fields, and it’ll create a new origin. As for parsing, a lot of our data sets are actually JSON themselves, so I’m just giving an example of doing an extraction from JSON using the schema again and transposing that back out. Some of ours are more complicated, and the parse method would have a bunch of code there that has to deal with ugly, raw, semi-structured events. That really depends on the source type. The output from this will then go to an actual clean data table. Somebody might call that silver or gold, kind of depending. For our users, that data is first-class and super important. There are a lot of derivative data sets from those, but the events themselves actually have a lot of value in our environment.

Merge Logic Debug

So that’s extract, load, transform. Once we have these tables, we want to start doing stuff, and some of the unique features of Delta Lake are being able to do things like ACID upserts at very large scale. In an upsert, you have a table and you have new data coming in, and based off of essentially join criteria, whether or not certain columns match or have a relationship with one another, you may choose to update an existing row, insert a new row, or delete a row. And that’s super, super powerful. We’ll talk a little bit more about that in some of the use cases. But it turns out, sometimes your data doesn’t look like you think it does, or your merge conditions, essentially the on clause, aren’t as specific as they need to be. And so what happens is multiple rows from your new input data will actually match a single row in the existing table, and that violates a constraint for merge, and it’ll throw an exception, and you’re left going, “Hmm, I wonder if it was my on criteria, or I wonder if my input data actually had duplicates in it?” and you’re sort of stuck. So, merge logic debug. Oftentimes, if you’re doing merge in a stream, you’re doing it in a foreachBatch method. And a little trick that you can do is take the input data, this microbatch DataFrame you get, and just write it out to some other Delta table, overwriting the data in it. Then if your actual merge throws an exception, you’ll be left with the actual input data that came in and violated the constraints, and you can just go manually run a join with the same on clause conditions and validate whether you had duplicate rows in your input, or whether your on conditions are actually too wide and matching multiple rows that aren’t duplicates. It makes it super easy to debug when you hit that case.
I wouldn’t recommend leaving that intermediate write in, as it’ll add latency in production. If you’re not latency-sensitive, it’s not a terrible idea; it’s just that after each batch, that table will represent the last batch that was processed. But in development, it’s super good. And if you have a production stream and it breaks, you can just go and wedge that little bit of code in there and run it against the existing checkpoint. It’ll try to do that last batch, it’ll fail again, and you’ll have the data that you need to debug. So, a super helpful thing.
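In code, the trick might look something like this; the paths, table names, and merge keys here are assumptions, not the actual production job:

```scala
import org.apache.spark.sql.DataFrame
import io.delta.tables.DeltaTable

// Sketch of the debug trick: persist each micro-batch before merging, so if
// the merge throws the multiple-match error, the offending input survives
// in the debug table for a manual join against the same ON conditions.
def upsertBatch(microBatch: DataFrame, batchId: Long): Unit = {
  // Overwrite the debug table with exactly the batch about to be merged.
  microBatch.write.format("delta").mode("overwrite")
    .save("s3://bucket/debug/last_batch")

  DeltaTable.forPath(microBatch.sparkSession, "s3://bucket/tables/sessions")
    .as("t")
    .merge(microBatch.as("s"), "t.sessionId = s.sessionId")
    .whenMatched().updateAll()
    .whenNotMatched().insertAll()
    .execute()
}

// Wired into the stream:
//   df.writeStream.foreachBatch(upsertBatch _).start()
```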

Stateful Merge

Another thing that we do with merge, and this really marries a powerful feature of Spark Structured Streaming, which is essentially stateful processing, with merge. Where we use this a lot is creating things like DHCP sessions and VPN sessions, so any sessionization makes a lot of sense, or when you have a stateful change or update to something that you want to do. These two marry together really well because you can run basically a mapGroupsWithState or a flatMapGroupsWithState. So in our case, take DHCP: when a machine gets a new session and IP binding, we want to emit that as soon as possible so that any other event data we have coming in, we can map by IP to the correct machine identity. So we want to make that open record available super fast, but eventually that session will close, and it will have an end time on it, which becomes really important for processing old data and making sure that you’re only assigning the host to the IP within the right timeframe with regard to the time series data that you’re looking at. So, what we do is we emit an open, and that comes in and, through a foreachBatch writer, gets appended to the table, but then we emit the close when we see the end of the session, and through merge, we actually find the corresponding open session and just update that row to be the closed session, with the addition of the end time and some other state and attribute information. That keeps the session table really small: we either have an open or a close for a given session, not both. It keeps the processing logic simpler, i.e. when we’re doing a join in order to enrich other data with it, and it’s easy to reason about the data that you see in the table, as well. So, mapGroupsWithState or flatMapGroupsWithState married up with Delta Lake merge, they’re really peanut butter and jelly.
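A hedged sketch of the close-over-open upsert half of this pattern; the session schema and names are assumptions:

```scala
import org.apache.spark.sql.DataFrame
import io.delta.tables.DeltaTable

// Opens emitted by [flat]mapGroupsWithState are inserted immediately; when
// the matching close arrives, merge updates the open row in place, so the
// table only ever holds one row per session.
def mergeSessions(sessionEvents: DataFrame, batchId: Long): Unit = {
  DeltaTable.forPath(sessionEvents.sparkSession, "s3://bucket/tables/dhcp_sessions")
    .as("t")
    .merge(sessionEvents.as("s"), "t.sessionId = s.sessionId")
    .whenMatched("s.state = 'closed'").updateAll() // open row becomes the close (adds endTime)
    .whenNotMatched().insertAll()                  // brand-new open session
    .execute()
}
```

Downstream joins then only ever see one row per session, open or closed, which is what keeps the enrichment logic simple.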

Aggregation Funnel

Another big one that we’ve done, and this is sort of a crazy thing, is that we have a few dozen tables with columns containing either an IP, a comma-delimited list of IPs, or an array of IPs, and it is really common for us to have to find all the data or telemetry that we have around a given IP, a set of IPs, or IPs in some CIDR range. This is a very expensive operation even with things like dynamic file skipping and z-ordering on those IP columns. Since some of them aren’t just a single IP and are actually a list of IPs, you have to do a contains or regex check, which is really terrible: you’re doing full table scans. So in the past, when we’ve had a set of IPs and we’ve had to look over our long retention window across all those data sets, that could take up to 48 hours on a very large cluster. We decided we needed an index, but we have 38 tables. And by the way, these tables are crazy big: we’re writing hundreds of terabytes a day across them, so it’s a lot of data. So, the first naive thing was: create a stream off of each one of those tables pulling out IP addresses, the date partition, and the table, union those streams together, do an aggregation on it, and try to write it out to a table. You can imagine how well that went. Even on extremely large clusters, that wasn’t gonna work; we went beyond the computational resources that we had. So we had to start decomposing the problem a bit, and the really interesting thing is that Delta Lake semantics and the operations you can do on it in some ways really complement Structured Streaming, or can be used instead of certain features of Structured Streaming. So what we do is hang one or more streams off of each source data set, where we’re doing a foreachBatch group-by on source IP, dst IP, and the date partition. We also include the source table name and column information in that.
So foreachBatch means we’re running this aggregation just on each microbatch on the stream, but not in totality on the stream itself. And we take that and append it to the step-one table. So, we can have a bunch of source tables, they each have streams coming off of them, and we’ll just have all those streams append to one table, within some reasonable bound. That creates the step-one table, and then off the step-one table, we’ll do another foreachBatch, actually two operations, one where we’re doing a group-by on source IP and the date, and another on dst IP and the date, because ultimately our index is gonna map one IP to the tables, columns, and date partitions it showed up in.

So we do the two group-bys, and we union those together, and then we append to the step-two table. From the step-two table, we basically do a foreachBatch, and here instead of doing a group-by, we actually do a window aggregation. It turns out to be more performant; we haven’t really dug into why.

Then we take the output of that aggregation and we actually do an upsert into the index table. In the final index table, we have the original value from the column, an individual extracted IP from that value, the date partition, and then an array of the data set names and columns where that IP showed up in that original value on that date partition. And then we also keep a sum total row count for that, which gives us a good approximation of how much activity there was for that IP on that day.
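The final upsert might look roughly like this; the index schema here is an assumption based on the description:

```scala
import org.apache.spark.sql.DataFrame
import io.delta.tables.DeltaTable

// Sketch of the index upsert: one row per (original value, extracted IP,
// date). Merging unions in newly seen dataset/column locations and adds to
// the running row count instead of appending duplicate rows.
def upsertIndex(agg: DataFrame, batchId: Long): Unit = {
  DeltaTable.forPath(agg.sparkSession, "s3://bucket/tables/ip_index")
    .as("t")
    .merge(agg.as("s"), "t.ip = s.ip AND t.date = s.date AND t.value = s.value")
    .whenMatched().updateExpr(Map(
      "locations" -> "array_union(t.locations, s.locations)", // dataset/column pairs
      "rowCount"  -> "t.rowCount + s.rowCount"))
    .whenNotMatched().insertAll()
    .execute()
}
```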

So over the day, as new data comes in, that array may grow and that total count will grow, but we won’t be creating a bunch of other rows. This gives us a much smaller, more compact table that we can z-order by the extracted IP value, and we can very rapidly search for IPs.

Then from that, we get some metadata about which data sets they were in, the date partitions, and some row counts, and we can do some interesting stuff with that; or we then dynamically compute queries against the source data sets using that information, so now we have a date partition predicate as well as the original values, and we’re doing an equality search on that column value in that table. The search that took 48 hours before now, using this setup to get the real data from the source tables, takes about a minute. That’s a radical improvement. So we’re spending some time upfront to compute this, but every time we go to do that type of search, we’re saving massive amounts of cluster resources, and we’re giving our users much faster response for the data that they need.

So, an interesting thing about tables that are merged into: again, we built these index tables and session tables, and we’re doing these upserts into them. The problem is, what if you want to actually take that data and get, say, a slowly changing data feed of it? So you see the new sessions, or you see the updated state of the session, or you see the new index value that’s coming out of it.

The problem is, if you just try

to hang a stream off that table, as soon as there is an update or a delete, the stream will throw an exception, because normally that violates the standard semantics. So now you have to throw the ignoreChanges option onto the stream, and when you’re doing just updates and inserts, essentially what happens is, if an underlying Parquet file is read because one row in it needs to be updated, that row gets updated and a new Parquet file is written out and replaces the old one in the table. With ignoreChanges set, the entire contents of that Parquet file will get played down the stream. So you can imagine you have tremendous amounts of duplicates coming down. You’re not just seeing new inserts, and you’re not just seeing the rows updated, you’re seeing all the other unchanged rows from the same Parquet files. So, a little trick that we do to get a clean feed of just inserted and updated rows is to hang a stream off the table that’s being upserted into, have that stream use ignoreChanges, and then have it do another upsert, but using the de-dupe pattern. The de-dupe pattern is essentially: you just check whether or not all the columns match, and if they do, you ignore the row; if they don’t, then it’s a new value that you haven’t seen in the table, i.e. it’s an inserted or updated record, and then we insert that into a clean table.
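A sketch of that de-dupe merge (the table path and column details are assumptions); the ON clause is built from all columns with null-safe equality, so replayed unchanged rows match and are dropped, while genuinely new or updated rows fail to match and get inserted:

```scala
import org.apache.spark.sql.DataFrame
import io.delta.tables.DeltaTable

// foreachBatch body on the ignoreChanges stream off the upserted table.
def dedupeToClean(microBatch: DataFrame, batchId: Long): Unit = {
  // Match on every column, null-safely, so exact duplicates are ignored.
  val allColsMatch = microBatch.columns
    .map(c => s"t.`$c` <=> s.`$c`")
    .mkString(" AND ")

  DeltaTable.forPath(microBatch.sparkSession, "s3://bucket/tables/sessions_clean")
    .as("t")
    .merge(microBatch.as("s"), allColsMatch)
    .whenNotMatched().insertAll() // only inserts; matched duplicates are dropped
    .execute()
}
```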

So now that clean table can be a nice source of a stream

that just gives you an SCD update-type feed. So this is one extra step, and you add a little bit of latency, but we’ve now solved an interesting problem just by understanding the semantics of Delta, marrying that up with streaming in an interesting way, and chaining merges together using different semantics.

So, that’s a really, really, interesting and powerful pattern that we use in some places, too.

So that’s it for patterns. Now I’m gonna get into some of the insights that we’ve had.

Storage isolation for high-scale tables

So, obviously we run some very large infrastructure. I said we take in about a petabyte a day. We have a long retention window as well, so we actually have tables that are on the order of two petabytes, and we have a ton of half-petabyte tables sitting around that we operate against regularly.

From our prior experience we knew that you can have performance problems with S3 if you don’t have enough entropy at the top of the path space so that the S3 bucket can shard well and distribute your workload. So, when we started this, we said, hey, we know how to do that, so we can get really high IO on a bucket, so let’s just put all our tables in one bucket, kind of easier. Turns out, it’s not easier, and it turns out that under certain conditions, when S3 is having a bad day, we might get throttled, and sometimes that throttle can impact other tables in the same bucket, if they happen to share the same shard or something like that. So, we found the best practice is, for most tables, unless they’re incredibly small and bounded, to put each table and its corresponding checkpoint in its own bucket, and to enable random prefixes on that Delta table. This means that the data written to that table is spread nicely across a well-hashed, partitionable path space, and then we get really high IO for S3 and we don’t get throttled. It also has some other nice advantages: the bucket then becomes a proxy for table access, so we can just use standard IAM ACLs on it, and so we can actually share individual tables with non-Spark consumers. For instance, if you wanted to use Hive with the Delta Lake connector, or Starburst Presto with the Delta Lake connector, you could individually expose tables just by using ACLs on the bucket, even to things potentially running in other accounts, which is a little easier than doing ACLs on prefixes and trying to track the whole thing. It’s just a little bit more insurance. And also, if you want to replicate a table to another region, you can just use bucket replication, and largely it’s gonna replicate the data in the table and also the metadata and the checkpoint data.
So, under most conditions, the other bucket in another region will, within a reasonably short time, have a corresponding version of the table and even a checkpoint you can restart from. Some nice properties there.

The next insight we had, and this shows up in the patterns above, is that we really realized Delta Lake and Structured Streaming in Spark are really composable, if you understand the semantics that you can get through the various configuration options, using a normal stream or using foreachBatch in the stream, and then the semantics around Delta Lake. We found that we were able to overcome scale hurdles that we might have. So for instance, say we wanted to keep a running aggregation on a huge stream, and in our case, oftentimes the event timestamps on our streams actually have a long-tail distribution, so trying to set up a window on a watermark is either not possible, or might be possible but the resource consumption is kind of sketchy and might not be super stable, and we’re not well protected against huge spikes in data. What we found is we can just do foreachBatch aggregations on the stream, which don’t have to keep inter-batch state, and then use the merge operations on Delta Lake to keep updating the aggregation output there, so we have a running total on the stream. So now we’re able to use Delta Lake semantics to overcome scaling issues that we might run into on Spark Structured Streaming. And when you use foreachBatch, you give up exactly-once semantics and get at-least-once, but if you include a batch ID column in the Delta Lake table you’re merging into, you can actually include that in the on conditions to make sure that you won’t merge in results from a batch that you’ve already seen, and now you’re back to exactly-once. These types of interplay between the capabilities in Delta Lake and Structured Streaming give us super composable building blocks, and we’ve found that we’ve tackled many problems, including building a giant streaming inverted index, without having to introduce any other technologies into our stack, just using our expertise on those two.
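A sketch of that batch-ID guard (names and schema are assumptions): foreachBatch delivers at-least-once, but storing the last merged batchId per key and checking it in the WHEN MATCHED condition makes replayed batches no-ops, restoring effectively exactly-once results.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._
import io.delta.tables.DeltaTable

// Per-batch running-total merge. A replayed batch carries the same batchId,
// so the "s.batchId > t.batchId" guard keeps it from double-counting.
def mergeRunningTotals(batch: DataFrame, batchId: Long): Unit = {
  val perKey = batch.groupBy("key").agg(sum("bytes").as("bytes"))
    .withColumn("batchId", lit(batchId))

  DeltaTable.forPath(batch.sparkSession, "s3://bucket/tables/totals")
    .as("t")
    .merge(perKey.as("s"), "t.key = s.key")
    .whenMatched("s.batchId > t.batchId").updateExpr(Map(
      "total"   -> "t.total + s.bytes",
      "batchId" -> "s.batchId"))
    .whenNotMatched().insertExpr(Map(
      "key" -> "s.key", "total" -> "s.bytes", "batchId" -> "s.batchId"))
    .execute()
}
```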

Schema ordering

So, a little thing to note, and this is actually covered well in the Delta Lake docs, but schema ordering and its interaction with stats is something that has bitten us before, even though it was documented, so I’m just calling it out again. Essentially, stats collection in Delta means the first 32 fields, and I’ll clarify fields versus columns in a second, have stats like min and max generated on them, and that’s kept on a per-Parquet-file basis within the metadata. When I say fields and not columns, it’s bad terminology, but Delta Lake tries not to have exceptions around types. So for instance, in a struct it considers each member of the struct to be a column, essentially. So, it’s not the first 32 top-level columns that have stats collection. If you have a deep struct at an early column index, then you may eat up those first 32 with the first couple columns while that big struct sits there. So you gotta make sure that you count over and then down, as well, if you’re trying to figure out whether you’re within the limit. You can actually modify the 32 using the dataSkippingNumIndexedCols setting, and you can set it to minus one if you want stats on everything, or any value that you want. So, dynamic file pruning uses the min/max column stats: when you have a predicate on a column value, if it falls outside the min and max for a file, then the answer can’t be there, and it can choose to not read that file, and reduced IO means increased performance. Z-ordering, also in the Databricks product, essentially maximizes the utility of min/max by doing a clustered sort across one or more columns. This gives you tighter min/max ranges without inter-file overlaps, and so it really maximizes the number of files that you can skip based off of a predicate like that. So, the key is, make sure if you’re z-ordering or sorting columns, that those columns have stats collection on, i.e. they’re within the first dataSkippingNumIndexedCols (by default, 32) fields.
Otherwise, z-order will happily go on and do nothing for you. It might catch that these days, but originally it didn’t, and it would happily optimize on a field that didn’t have stats collection. Another little hint is that it’s expensive to collect stats on long strings. I don’t have an exact number for what constitutes long, but I’d say when you’re talking about hundreds of bytes to a kilobyte plus, you’re crossing a threshold. If you’re under 100 bytes, you’re definitely fine.
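To see how the "count over and then down" rule plays out, here is a small pure-Scala sketch; the mini schema model is hypothetical, standing in for Spark's StructType, but the counting rule is the one described above:

```scala
// Hypothetical mini-model of a schema (standing in for Spark's StructType),
// used only to illustrate how Delta counts *leaf fields*, not top-level
// columns, toward the stats budget (32 by default).
sealed trait FieldType
case object Leaf extends FieldType // e.g. string, long, timestamp
final case class Struct(fields: List[(String, FieldType)]) extends FieldType

// Count leaf fields in schema order; Delta collects min/max stats on the
// first dataSkippingNumIndexedCols leaves, counting over and then down.
def countLeaves(t: FieldType): Int = t match {
  case Leaf       => 1
  case Struct(fs) => fs.map { case (_, ft) => countLeaves(ft) }.sum
}

// A deep struct early in the schema eats the budget before later columns:
val schema = Struct(List(
  "origin" -> Struct(List(
    "raw"  -> Leaf,
    "meta" -> Struct(List("source" -> Leaf, "eventType" -> Leaf)))),
  "timestamp" -> Leaf,
  "ip"        -> Leaf
))
// countLeaves(schema) == 5: the origin struct alone contributes 3 of them.
```

So a single wide or deeply nested struct in an early position can push later, z-order-relevant columns past the stats budget.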

But it’s expensive to do, so if you can move your long strings to the end of your schema, beyond the dataSkippingNumIndexedCols value, and if your column width is less than 32, you can tune down that value for your table and put those long string columns outside of it, and that’ll just make your writes faster. If you really need stats on them (though they’re less helpful on strings), then you pay the price on writes. So it depends on what you’re really tuning and optimizing for.
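For example, a hypothetical table whose first dozen fields carry all the z-order and filter columns could drop the budget so trailing long-string columns skip stats collection (the table name is made up):

```scala
// Sketch: tune the per-table stats budget down from the default of 32.
// Columns past this index (e.g. long raw-log strings) get no min/max
// stats, which makes writes cheaper.
spark.sql("""
  ALTER TABLE events
  SET TBLPROPERTIES ('delta.dataSkippingNumIndexedCols' = '12')
""")
```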

Don’t over partition your Delta Tables (laughs).

So this is worth talking through a little bit. When you add partitions to your table, you generally increase the object or file count underneath the table on the storage system. That is because if you have no partitions, you can optimize the table to have essentially the fewest number of one-gig (or whatever you tune optimize for) objects. So you minimize the number of objects and get nice big ones that give you great throughput from the underlying storage. As soon as you introduce partitioning, you’re probably going to a larger number of objects, possibly smaller and uneven in size, going into those partitions. Even if you optimize, the partitions force a break in how the data is commingled, so you’re increasing object count. When you increase object count, it generally has a performance impact on full table scans, on large aggregations, on joins, all those things, unless you’re using the partition values within your predicates liberally and efficiently.

Basically, if most of your queries are join or aggregation operations and can specify partition values as a predicate in the operation, then you're probably picking something reasonably good to partition on. I think Michael has suggested that a partition should have at least a gig of data in it in the general case, so that can give you a rule of thumb. But we often see cases of 10- or even 100-gigabyte tables where, given the way they're commonly interacted with, partitioning doesn't actually make a ton of sense, and it's actually faster if they're just un-partitioned. The caveat is if you have a retention window or have to delete certain data over time: if you can set it up so that you delete by the partition value, say a date or a month, then that makes deletes cleaner and disjoint from other operations on the table. But again, that's often paying the cost of higher object count, which can reduce the performance of other operations. So the one case where over-partitioning makes sense is if you're trying to make a table that purely services point lookups on partition values, and usually a small number of partition values, and you're only appending data to that table and only deleting by partition values. In that case, if a partition ends up with a very small amount of data, but you're intentionally only looking for the data in that partition, it's okay; you're not really paying a throughput performance penalty. So that's the narrow case where over-partitioning can be advantageous: to make high-performance point queries. But as soon as you have to mix a point query load with aggregation or join workloads, it's probably better to right-size partitions, or even under-partition a little bit, and then fall back on something like z-ordering or sorting on your primary key column to maximize dynamic file pruning off the min/max stats.
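For the retention case, the benefit of partitioning by a date column is that the delete predicate lines up exactly with partition boundaries, so the delete stays disjoint from appends landing in newer partitions. A hedged sketch, assuming a hypothetical `events` table partitioned by an `event_date` column and a 90-day window:

```python
from datetime import date, timedelta

# Sketch of a partition-aligned retention delete. Because the predicate is
# exactly the partition column, Delta can drop whole partitions without
# touching files in other partitions, keeping the delete disjoint from
# concurrent appends. Table and column names are illustrative assumptions.

def retention_delete_sql(table_name: str, cutoff: date) -> str:
    """Build a DELETE whose predicate is the partition column only."""
    return f"DELETE FROM {table_name} WHERE event_date < '{cutoff.isoformat()}'"

cutoff = date.today() - timedelta(days=90)
# On a real cluster: spark.sql(retention_delete_sql("events", cutoff))
print(retention_delete_sql("events", cutoff))
```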
So one thing that you’ll run into with Delta Lake is sometimes having conflicting transactions, and this really comes about when you’re doing things like deletes and updates on a table.

Handling conflicting transactions

If you're only doing append-type operations, then you don't really have conflicts. Where we really run into this is on tables that we're doing upserts or merges into, because the table is changing all the time. Then, if we have to do something like a z-ordered optimize on the table, and the optimize takes a little while, it can stumble across the table changing out from underneath itself. Our way of handling that, when we have two related operations that conflict, is either to make everything partition-disjoint, meaning the operations are not touching the same partitions at the same time, or, if you have something that really operates on the full table, like an optimize z-order, we just inline it in the same method that's doing the upsert. For instance, in a for-each batch, we do a mod on the batch ID, and every n batches we execute the optimize. This way they're serialized and they don't conflict. We take a little more latency on that batch, but we're ultimately keeping latency down because the table stays well-ordered and nice and compact.
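The mod-on-batch-ID pattern described above can be sketched as follows. `upsert_batch` and `optimize_table` are hypothetical stand-ins for the real MERGE and OPTIMIZE ZORDER calls you would issue against Delta; the cadence of 50 is an illustrative assumption:

```python
# Sketch: serialize an expensive OPTIMIZE with streaming upserts inside one
# foreachBatch function, so the two operations can never race each other.

OPTIMIZE_EVERY_N_BATCHES = 50  # tune for your write volume (assumption)

def should_optimize(batch_id: int, every_n: int = OPTIMIZE_EVERY_N_BATCHES) -> bool:
    """Run the expensive optimize only every n micro-batches."""
    return batch_id % every_n == 0

def process_batch(batch_df, batch_id, upsert_batch, optimize_table):
    # The upsert (MERGE) always runs first...
    upsert_batch(batch_df)
    # ...and the optimize runs inline afterwards, serialized with the merge,
    # so the optimize never sees the table change out from under it.
    if should_optimize(batch_id):
        optimize_table()

# With PySpark this would be wired up roughly as:
#   stream.writeStream.foreachBatch(
#       lambda df, bid: process_batch(df, bid, do_merge, do_optimize)
#   ).start()
```

Because foreachBatch invocations for one query are sequential, anything executed inside the function is naturally ordered with respect to the merge, which is the whole point of the pattern.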

And one last thing is we have some very large tables.

Large table metadata optimization

I mentioned one and a half petabytes, two petabytes, with millions of objects in them, and that means the metadata is larger. So one thing we've found: you can use the snapshot partitions setting to use more partitions, or tasks, to read that metadata. If you have a larger cluster, boosting that up into the hundreds, or even 1024, can reduce the latency of reading the metadata, at least until we see some hoped-for optimizations on metadata operations. And then sometimes you might want to tune down your log retention. If you're adding tons of data to a table with lots of metadata, you're going to build up more underlying metadata files and changes, so you might want to shorten the interval those are kept for. That can impact time travel and other features, so dig into the details, but it can help keep your metadata size down a bit. So really it's adding more parallelism to reading the metadata, and then keeping the metadata a reasonable size through settings. For most of our tables we don't touch anything except snapshot partitions, which we set on a cluster-wide basis, sized off how many cluster resources we want there to read these large tables' metadata. So that's all for the patterns and insights I have. Thank you, and get in contact with me if you have any questions. I'm also on the Delta Lake Slack often and happy to answer questions or work through issues with you. Thank you very much, and take care.
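As a hedged sketch of the two knobs discussed above: `spark.databricks.delta.snapshotPartitions` is the cluster-side setting, and `delta.logRetentionDuration` is the per-table property. The table name and the specific values here are illustrative assumptions, not recommended defaults:

```python
# Sketch of tuning Delta metadata handling for a very large table. `spark`
# is any object exposing conf.set and sql (a SparkSession on a real
# cluster); names and values are illustrative assumptions.

def tune_large_table_metadata(spark, table_name: str,
                              snapshot_partitions: int = 1024,
                              log_retention: str = "interval 7 days") -> None:
    # Use more tasks to read the Delta log of very large tables.
    spark.conf.set("spark.databricks.delta.snapshotPartitions",
                   str(snapshot_partitions))
    # Shorter log retention keeps metadata small, at the cost of a
    # shorter time-travel window.
    spark.sql(
        f"ALTER TABLE {table_name} SET TBLPROPERTIES "
        f"('delta.logRetentionDuration' = '{log_retention}')"
    )
```

On a real cluster you would call this once per large table, or just set the snapshot partitions value cluster-wide as described in the talk.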

About Dominique Brezinski


Dominique Brezinski is a member of Apple's Information Security leadership team and a principal engineer working with the Threat Response org. He has twenty-five years of experience in security engineering, with a focus on intrusion detection and incident response systems design and development. Dom has been working with Apache Spark in production since the 0.8 release.