The transaction log is key to understanding Delta Lake because it is the common thread that runs through many of its most important features, including ACID transactions, scalable metadata handling, time travel, and more. In this session, we’ll explore what the Delta Lake transaction log is, how it works at the file level, and how it offers an elegant solution to the problem of multiple concurrent reads and writes.
Speakers: Denny Lee and Burak Yavuz
– All right, hello everyone, welcome to our talk, Under the Sediments v2. I’m Burak and I’m here with Denny. I’m a software engineer at Databricks, and that’s basically it. I’m gonna go and grow a beard and change real quick, and Denny can introduce himself.
– Thanks very much, Burak. My name is Denny Lee. I’m a developer advocate, and now I’m gonna have to change my shirt as well.
– All right, let’s get started. So today we’re going to look underneath the sediments of a Delta Lake table. To do that, we’re first going to analyze the Delta log, which is the transaction log and the key component of Delta’s ACID capabilities. Then we’re going to look at time travel, and how Delta’s transaction log provides time travel capabilities. Then we’ll look at how Delta plans and executes batch and streaming queries. And then we’re going to see it all in action in a demo from Denny.
So, first of all, let’s start with the transaction log and see how a Delta table actually looks on disk. You have the directory of your table, and under this directory you’ll find your transaction log in the _delta_log directory. Within this directory, you’re going to find table versions: small JSON files that contain the changes being made to your Delta table. These are called Delta files. You can optionally find partition directories for how your data is stored. Delta doesn’t actually need to store data inside partition directories; it’s just a cosmetic feature for Hive tables. And then we also have our data files stored alongside them as well.
So if you were to open up a JSON file, what would you find inside? You’ll find a set of actions, and these actions represent changes that are being made to your table. The first critical one is update metadata. The metadata contains information such as the name of the table, the schema, the partitioning that you’re using, and more, like the table properties. If you were to change the schema of your table, you’ll see an update metadata action being committed to your table as well.
Then you have your add file actions. Add file actions basically say, “Hey, I’ve written this file and committed it to your log, so that the next time a query comes, it actually reads the information from this file.” We put in information such as the path of the file, the partition values that it’s stored for, and optionally statistics per column, and you’ll see all of that in Denny’s demo.
We can have remove file actions. These essentially say, “Hey, remove this file.” This can be due to a compaction operation, where you’re compacting five files into one, or due to delete operations that you perform on a Delta table.
We have our set transaction action. Set transactions record an idempotent transaction ID. This is used mainly by Structured Streaming: every stream keeps track of how many batches it has already committed to your Delta table. This way, if your stream restarts, it becomes a very cheap operation to see whether data needs to be reprocessed or recommitted to your Delta table.
Then we also have our protocol action. The cool thing about Delta is that it’s just a transaction log on top of an object store, and you’re using a library to read the transaction log and reason about what data should be queried. The protocol information tells you whether your current library, whether it’s your latest Databricks cluster or the Delta Lake JAR that you downloaded from Maven, can actually read the transaction log correctly.
And at the end you get a result: what’s the current metadata of my table, what’s the list of files that I need to query, what’s the version, and so on and so forth.
So one of the cool things about Delta is its ACID transactions. So how does Delta provide atomicity, the first letter in ACID transactions?
The changes to your table are stored as ordered atomic units called commits. As we mentioned, these are the Delta files that are being committed to your table. What happens is you first write out all your data files, and then you commit them to the transaction log. So for example, here we’re committing the addition of files one and two. Once we can find this version in the transaction log, we’ll know that we can read files one and two. Then we can commit version one on top of that, and here we’re saying, “Oh, I’m removing files one and two, just read file three.” This way, if you were to query the Delta table at version one, you would just read file three.
And then, if that’s atomicity, let’s talk about concurrency and serializability. We need to agree on the order of changes that are being made to your Delta table, even when there are multiple writers. So how does that work? For example, user one is going to mutate your Delta table; it might be running a merge operation. It will start by reading from version zero. At that time, someone else has written version one, and user two reads the table at version one and tries to commit version two. What user one doesn’t know is that versions one and two have already happened, so user one will have to reason about conflicts with these two commits.
So how do we reason about these conflicts? Well, with Delta, we solve conflicts optimistically. This was a pretty important design choice: we decided that analytical operations will typically be non-conflicting, meaning that we’re generally going to be appending new data to our table rather than continuously updating certain rows over and over again.
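As a rough illustration of the commit example above (files one, two, and three), here is a minimal Python sketch of replaying add and remove actions in version order. The `commits` dictionary and the `files_at_version` helper are hypothetical stand-ins for the JSON files in the transaction log, not Delta Lake's actual implementation, and real commits carry many more fields.

```python
# Hypothetical, in-memory stand-in for the JSON commit files in the log.
# Only the "add" and "remove" paths are modeled here.
commits = {
    0: [
        {"add": {"path": "file1.parquet"}},
        {"add": {"path": "file2.parquet"}},
    ],
    1: [
        {"remove": {"path": "file1.parquet"}},
        {"remove": {"path": "file2.parquet"}},
        {"add": {"path": "file3.parquet"}},
    ],
}


def files_at_version(commits, version):
    """Replay commits 0..version in order and return the live data files."""
    live = set()
    for v in range(version + 1):
        for action in commits[v]:
            if "add" in action:
                live.add(action["add"]["path"])
            elif "remove" in action:
                live.discard(action["remove"]["path"])
    return sorted(live)


print(files_at_version(commits, 0))
print(files_at_version(commits, 1))
```

A reader only ever sees whole commits, so a query at version zero reads files one and two, and a query at version one reads only file three.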
So what happens when two writers are trying to write to the Delta table? First, each writer records its start version, which is the version the table was at before the transaction starts, and it records all the reads and writes it performs during that transaction. It’ll attempt to commit. If some other writer wins, it’ll have to check whether anything it read has changed, and then it’ll try again. So for example, user one and user two could start at version zero; they’ll both read the schema at version zero. And what they’re doing is just blindly appending data to your table, for example two streaming jobs appending data to the same table. User one will win and commit version one. In that case, what user two will have to do is see what changes have been made in this version one. Since it was a blind append, user two is gonna say, “Hey, I’m also a blind append, I don’t need to worry about it.” So it’ll immediately commit version two. It doesn’t have to rewrite all the data files again, since there was no logical conflict between what these two users were doing. On the other hand, if there were logical conflicts, if user one had deleted a file which user two is also deleting or has to read, then user two will throw an exception saying, “Hey, there’s been a concurrent modification.”
Then you might say, “Well, okay, so Delta is great, it provides ACID, but how do you handle your metadata?” One of the key pieces of Delta is that it handles its metadata internally. It doesn’t depend on the Hive metastore. The problem with the Hive metastore is that you have all the information for all your tables stored in one database, being accessed by many, many clusters and many different execution engines at a given time. So this one database can easily turn out to be the bottleneck in your operations. So instead, Delta handles the metadata itself. How does it do it?
Well, it uses, for example, Spark to actually compute its metadata. And this is beautiful, because each cluster then becomes a stateless replica of your Delta table. Therefore, you don’t have as many concurrency issues, and you don’t have this single point of failure. Then you might say, “Well, okay, you might be using Spark to read all these JSON files, but won’t you have a problem of too many small files?” And you’re right. For that reason, Delta also uses a technique called checkpointing. Every 10 commits, right now for example, we replay all the actions, see what the latest state of the table is, and dump it to a collection of Parquet files, and then Spark can query this checkpoint directly. It doesn’t have to read all the preceding JSON files to compute the state at a given version.
So more about these checkpoints: they contain the latest state of all actions at a given version, and they obviate the need to read all these small JSON files. Why did we choose Parquet for the checkpoints? First of all, there’s no parsing overhead with Parquet like there is with JSON. So if you have a huge table, instead of having a gigabyte-sized JSON file, you’ll have a small megabyte-sized Parquet file. And you also get column pruning capabilities: you don’t need to parse the JSON, and you can read certain columns directly from the Parquet file, which can speed up many metadata operations.
So how does Delta compute its state? We’ll have to list the transaction log directory. We’ll find the JSON files there, and then we’ll use Spark to replay those actions in order to compute the latest state. Once we compute the latest state, we’ll cache that version in memory so that any operations on your Delta table after that can reuse this cached version and not have to do this state computation again.
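The state computation described above can be sketched in a few lines of Python. This is a simplified model under stated assumptions: checkpoints and commits are plain in-memory dictionaries, the `state_at` helper is hypothetical, and the real system does this with Spark over Parquet and JSON files. It only shows the idea of starting from the newest usable checkpoint and replaying the commits after it.

```python
# Hypothetical log: versions 0..12, each commit adds one file.
commits = {v: [{"add": {"path": f"f{v}.parquet"}}] for v in range(13)}

# Hypothetical checkpoint at version 10: the full list of live files at that version.
checkpoints = {10: [f"f{v}.parquet" for v in range(11)]}


def state_at(checkpoints, commits, target_version):
    """Start from the newest checkpoint <= target_version, then replay later commits.

    Returns the live files and the list of commit versions that had to be replayed.
    """
    usable = [v for v in checkpoints if v <= target_version]
    if usable:
        start = max(usable)
        live = set(checkpoints[start])
    else:
        start = -1  # no checkpoint yet: replay from version zero
        live = set()

    replayed = []
    for v in range(start + 1, target_version + 1):
        replayed.append(v)
        for action in commits[v]:
            if "add" in action:
                live.add(action["add"]["path"])
            elif "remove" in action:
                live.discard(action["remove"]["path"])
    return live, replayed


files_12, replayed_12 = state_at(checkpoints, commits, 12)
files_7, replayed_7 = state_at(checkpoints, commits, 7)
print(len(files_12), replayed_12)
print(len(files_7), replayed_7)
```

With the checkpoint in place, computing version 12 touches only commits 11 and 12, while version 7, which predates the checkpoint, still replays everything from version zero.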
If we were to update Delta’s state, first of all, if we’re at version zero or we’re just a new cluster, we’ll list from version zero and get our list of files. So here we’ve got seven changes to our Delta log. We’ll use Spark to compute the state and cache version seven. The next time, we’ll again list from version zero. We have our cached version seven in memory, but here we see a checkpoint file and additional JSON commits that have been made. Once we see a checkpoint file, we’ll use that checkpoint file and replay the JSON files after it for our log replay, and we’ll cache version 12 there. And the next time, we’ll use our latest available checkpoint version. So we’re using version 10 to perform our listing, and then we’ll see that, okay, we’re at version 14; we can compute version 14, cache it, and use that. So that’s essentially how the transaction log works.
Let’s see how time travel works and some of its limitations. Since we have our ordered transaction log, we can go back in time and replay to any given point of the transaction log. With Delta you can time travel by version and by timestamp. To time travel by version, you can specify the version with the VERSION AS OF syntax or the @ syntax after your table identifier. Or within Scala or Python, you can provide the versionAsOf option to say which version you’re reading, or append the version that you wanna read to the end of your directory path. And Delta will figure out, “Okay, you want to read version 1071. I’ll just compute the snapshot at 1071 and give you the answer.” If I wanna time travel by timestamp, the access is very similar. You can use TIMESTAMP AS OF and provide a date column or a date expression.
And if you want to use the @ syntax, you need to provide your timestamp in this year, month, day, hour, minute, second, millisecond format. You can also, again, provide a timestampAsOf option for your Python or Scala readers, or append it to the end of your path. But what happens with timestamps is that we don’t know specifically which version that timestamp corresponds to, so we can’t just ask the Delta log to get the snapshot at a version. So what do we have to do then? We need to figure out which version that timestamp corresponds to. The commit timestamps come from the storage system’s modification timestamps. The storage system could be HDFS, it could be S3, it could be ADLS Gen2 on Azure. Unfortunately, these systems do not guarantee timestamps that increase monotonically with our versions. These are distributed systems; there could be clock skew. Therefore, what Delta has to do first is put these timestamps in order. So we have some timestamps from important moments in Turkish history here, and you can see that the timestamp of the last version is before the timestamp of the previous version. So if timestamps are out of order, we’ll just set the next version’s timestamp to one millisecond after the previous version’s, to put them in temporal order.
And when you want to actually query your data given a timestamp, we play by Price Is Right rules: we pick the closest commit with a timestamp that doesn’t exceed the user’s timestamp. So here, given 1492, we select version 1071, because it was at 1453, and that’s the latest version that’s before the user-provided timestamp. And then we can just compute our snapshot at 1071 accordingly. If you wanna learn more about time travel, we have our time travel deep dive session tomorrow at the exact same time, so check that out.
So, some of the limitations. First of all, we require the transaction log files to exist.
Delta performs log cleanup automatically, according to the log retention duration setting, so you can time travel at most as far back as that duration. If you wanna time travel more than 30 days, which is the default, you would need to change this setting. We also require the data files to exist, and we have a different setting for this called the deleted file retention duration. You might want to set these two to the same value, depending on how far you wanna time travel back. The limitation here is that if you vacuum, you lose data, and that’s why we have this deleted file retention duration, which prevents files that have been removed from Delta’s state from actually being deleted; they’re kept around as tombstones.
Given these two requirements, you can assume that time traveling on the order of months and years may be infeasible. The reason is that if you have a stream appending to your table, changing data every minute, you’re going to have commits happening every minute. This could cause your storage costs to increase, and computing Delta’s state won’t scale when you have millions of JSON files that you have to read at a given time. But what you can do to work around that is use, for example, an operation like clone to archive your data once a day to some archive location. One change per day is not that bad if you want to time travel back 10 years, but one change every minute will be pretty infeasible if you wanna time travel back even a month.
So how do batch and streaming queries work on Delta tables? Well, batch queries are easy.
First of all, we update the state of your table, like we’ve shown before. Then we perform data skipping: given your filters, we filter the add file actions in our transaction log according to the metadata, for example the partitions and stats. And then, given that reduced set of files, we execute your query based on those files.
How do streaming queries work? Again, we update the state of the table. Then, if you have provided partition filters, for example “process all data since the last month,” we perform our partition filter, and then we cache the snapshot. From that snapshot we’ll start processing files, a thousand files at a time by default. This is to prevent a batch being spawned for, like, 10 million files, which will ruin your driver. And within this first set of files, there is no guaranteed order while we’re processing the files in our snapshot. Instead of maxFilesPerTrigger, you can specify maxBytesPerTrigger as an option to better control how much each batch of your stream should process from your Delta table. For example, you might wanna process a hundred gigabytes in every single microbatch.
Once your snapshot is done, we’ll start tailing the JSON files, the Delta files within your transaction log. This becomes very cheap: all we have to do is see what files have appeared within your Delta files. Now you might say, “I do compactions, I rewrite my files all the time. How does my stream interact with that?” Well, if you have compactions, for example if you run OPTIMIZE within Databricks, compactions are committed with a flag called dataChange equals false. And the streaming source is smart enough to skip over any file that has this dataChange equals false setting. So if you compact your data, you won’t actually reprocess duplicate records.
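To illustrate the compaction point above, here is a small Python sketch of how a reader tailing the log might decide which files from a commit to process. The `files_for_stream` helper and the sample commits are hypothetical; the only detail taken from the talk is that compaction commits mark their add actions with dataChange set to false.

```python
def files_for_stream(commit_actions):
    """Return the paths a tailing reader would process from one commit.

    Add actions with dataChange=False (for example from a compaction) only
    rearrange existing rows, so they are skipped to avoid duplicates.
    """
    return [
        action["add"]["path"]
        for action in commit_actions
        if "add" in action and action["add"].get("dataChange", True)
    ]


# Hypothetical append commit: two new files carrying new rows.
append_commit = [
    {"add": {"path": "part-0001.parquet", "dataChange": True}},
    {"add": {"path": "part-0002.parquet", "dataChange": True}},
]

# Hypothetical compaction commit: the same rows rewritten into one file.
compaction_commit = [
    {"remove": {"path": "part-0001.parquet", "dataChange": False}},
    {"remove": {"path": "part-0002.parquet", "dataChange": False}},
    {"add": {"path": "part-compacted.parquet", "dataChange": False}},
]

print(files_for_stream(append_commit))
print(files_for_stream(compaction_commit))
```

The append commit yields both new files, while the compaction commit yields nothing, so the rows it rewrote are not processed a second time.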
If you actually delete data from your source, you might be using merges or you might be deleting data due to GDPR or whatnot. Since Spark streaming does not know how to propagate the deletes downstream, you’ll face an error, and you can set ignoreChanges to process all the files that have been committed or changed as part of that transaction, and ignore the fact that some records may have been deleted. A gotcha here is that vacuum may delete the files referenced in the JSON files. So you’d want to keep your tombstone, or deleted file retention duration, longer than whatever downtime you expect from your streaming workloads.
There’s also the option to start your stream from a given version or timestamp. That works by providing the option startingVersion or startingTimestamp. In that case, we’ll just start tailing your JSON files at the corresponding version, and the same gotchas apply. Starting version and timestamp are inclusive. And the starting timestamp will start processing from the next version if a commit hasn’t been made at the given timestamp, which is unlike time travel. So in time travel, you remember, if you provided 1492, we would actually return the state at 1071. But with your stream, we’re giving you the changes after 1492; therefore we’ll start processing your changes inclusive of 1072 going forward.
So to see all of this in action, I’m gonna pass it over to Denny to give you a demo.
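The two timestamp rules from the talk, time travel picking the latest commit at or before the requested timestamp and a stream starting from the first commit at or after it, can be sketched as follows. This is an illustrative model, not Delta Lake's code: the helper names are hypothetical, years stand in for millisecond timestamps, and only the pairing of version 1071 with 1453 comes from the talk; the other values are made up for the example.

```python
from bisect import bisect_left, bisect_right


def monotonize(timestamps):
    """Make timestamps strictly increasing: an out-of-order one becomes previous + 1."""
    fixed = []
    for ts in timestamps:
        if fixed and ts <= fixed[-1]:
            ts = fixed[-1] + 1
        fixed.append(ts)
    return fixed


def time_travel_version(versions, timestamps, ts):
    """Latest version whose commit timestamp does not exceed ts."""
    i = bisect_right(timestamps, ts) - 1
    if i < 0:
        raise ValueError("timestamp is before the earliest commit")
    return versions[i]


def stream_start_version(versions, timestamps, ts):
    """Earliest version whose commit timestamp is at or after ts (inclusive)."""
    i = bisect_left(timestamps, ts)
    if i == len(timestamps):
        raise ValueError("timestamp is after the latest commit")
    return versions[i]


# Version 1071 at "1453" is from the talk; 1299 and 1923 are hypothetical.
versions = [1070, 1071, 1072]
timestamps = monotonize([1299, 1453, 1923])

print(monotonize([100, 200, 150]))                       # skewed clock example
print(time_travel_version(versions, timestamps, 1492))   # snapshot to read
print(stream_start_version(versions, timestamps, 1492))  # first commit to stream
```

When the requested timestamp matches a commit exactly, both helpers return that same version, which is the inclusive behavior described above.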
– Thanks, Burak. Diving right into the demo, let’s showcase why ACID transactions are so important when it comes to working with your data. For example, let’s go ahead and stream data into our Parquet table. Okay, so again, there was a function before that was called generate and append data stream. We’re generating new data, and we’re doing this in the Parquet format, as indicated here, with the Parquet path. Okay, and if you open this up, you’ll notice in the dashboard that there is an increased input and processing rate, and here’s the average batch duration. Once we verify that, we can run this command here, cell 26, to see the number of rows going in there. So if we wait a few seconds, we can see an increase in the number: before it’s 220, now it’s 320. Simple enough.
How about if I wanna have more than one client do an insertion into that table? Well, what you’ll notice is that when I open this up, there is no input versus processing rate. Sure, there’s a batch duration, but there’s no new data going in. See, here’s the first stream; data is still going in, as you can see here. But the second stream, no data’s going in, all right. And so, yes, the count is increasing steadily, but you can’t have a second, concurrent stream insert data into that table.
So let’s go ahead and stop all streams and redo the exact same test, except this time we’re going to run this with Delta Lake. So again, we’re going to kick this off, all right. Same function; the only difference is that I’m saying the table format is Delta and the table path is the Delta Lake path. I open this up and we’ll see data steadily inserting. Perfect, bam! Just like that. All right, so if I take a look at the counts, same idea, there’s going to be data showing up. But then what’s really cool is that I can run a second stream and a third stream.
And for that matter, I can even run a fourth stream if I want to, but I’m just gonna run these two streams concurrently with the first stream. So now I’ve got three concurrent streams inserting data into the same Delta Lake table. And you notice that here’s stream number two, and the input and processing rates are going up, and same thing for number three. So there’s data going in, okay. So for all three streams, you’ll notice the graphs; there’s color here, and that should tell you the data’s going in. So that’s pretty cool. So now I’ve got three streams inserting data into my single Delta Lake table concurrently, all right.
So let’s stop the streams for now and review what happened. I’m gonna go look at the Parquet table, all right, simply grouping by stream number. Each stream that I inserted had its own unique number, okay. So I’m gonna go ahead and run this here. And so there’s only stream number one for the Parquet table, and about 1,020 rows inside here. But if I run this against my Delta Lake table, you’ll notice that there are actually four streams. The zero stream: the reason zero exists is because that was the initial insertion of the 50 rows of data that I placed in my original table. And then here are the three concurrent streams. But you’ll notice that the Parquet table only has one while the Delta table has four. Okay, this is happening because the insertion of the new data had a slightly different schema than the old data. And because it did, it basically ran over and corrupted the original Parquet table. In the case of Delta Lake, I’ve got all three streams inserting new data in, but I did not lose the original data either. So that tells you how powerful it is.
Because even within the context of schema enforcement or schema evolution, you’re able to ensure that you keep all the data and your data does not become corrupted in the process. So underneath the covers, back to what we originally showed in the demo, there’s that _delta_log folder, right, and what DESCRIBE HISTORY on the table is actually looking at is all the files inside here. Okay, so there are 25 JSON files corresponding to the 25 versions in the history of the table, okay, there we go. So those transactions are all there. All right, so as Burak described before, at every 10th one there’s a checkpoint Parquet file created as well, all right. So that’s where the V2 portion comes in. This is not just a second version of this particular session; we’re actually referring to the V2 checkpoint.
All right, well, let’s just take a look at what the JSON looks like. Right, so we just open this up and it’ll break down the schema information: what files were added, the commit information, and various operational metrics are all stored inside here, okay. And so if you wanna take a look at it, for example, here’s the commit information. Just run this command, open this up, and the same operational metrics that we talked about are stored directly in that JSON file, which is what you saw in DESCRIBE HISTORY, okay. And the various operation parameters as well, for that matter, right: the fact that we’re doing an append, all right. In terms of add information, this’ll tell us what data was changed. This is for the very first one, so nothing was changed, but we actually tell you which file is involved.
Remember how Burak also explained that the CRC file contains the table size in bytes and the number of files, for partition pruning? Well, let’s take a look at the CRC, and here it is: the table size in bytes, the statistics that it needs, right.
And the number of files as well, so it knows how to partition prune. In this particular example we’re not actually using partitioning, so there’s not much to prune here, but nevertheless, that’s the context. Okay, now let’s take a look at the checkpoint file, right, and remember, V2 has some improvements. The main improvement is called out here under add, when we’re adding new files, adding new rows. We have stats, which is a string, which is the way it was originally in the JSON. But we also have stats_parsed, okay: the number of records, the minimum values, max values, null counts, things of that nature. So that way the statistics are actually in columnar format, so it’s easy for Delta Lake to grab this information, set up the statistics better, and run the query plans better.
The key thing here is the add information. Here’s the file that was added. Path, okay: here’s the exact file inside the system. It tells you the size and the modification time. Now, when you read the stats, you could try to read stats by itself, but again, that’s just a string. Or, for optimization purposes, you can read stats_parsed, where you now have the different columns for the number of records, the min values, for example, in this case, Iowa, right, oops, sorry, and the max values and the null counts, okay? So all that information is right there for you to work with.
Well, that’s it for the quick demo on the V2 checkpoint, but again, this notebook will be available for you to download, so you can run this yourself and see how this works when you combine it with Burak’s presentation.
– Thank you everyone for listening and we’re happy to hear your questions. And as you can see I’ve just rolled back to my shaven state again, so bye.
Denny Lee is a Developer Advocate at Databricks. He is a hands-on distributed systems and data sciences engineer with extensive experience developing internet-scale infrastructure, data platforms, and predictive analytics systems for both on-premise and cloud environments. He also has a Masters of Biomedical Informatics from Oregon Health and Sciences University and has architected and implemented powerful data solutions for enterprise Healthcare customers. His current technical focuses include Distributed Systems, Apache Spark, Deep Learning, Machine Learning, and Genomics.
Burak Yavuz is a Software Engineer and Apache Spark committer at Databricks. He has been developing Structured Streaming and Delta Lake to simplify the lives of Data Engineers. Burak received his MS in Management Science & Engineering at Stanford and his BS in Mechanical Engineering at Bogazici University, Istanbul.