
Making Apache Spark™
Better with Delta Lake
Want to watch instead of read? Check out the video here.
The promise of a data lake
Michael Armbrust:
I'm super excited to be here today to talk about how you can make Apache Spark better by using Delta Lake. However, before I jump into that, I want to start by talking about this concept of a data lake, why so many people are excited about it, and also why there are a lot of challenges when they try to set these things up.
So, first of all, what is a data lake, and what does it mean to me? The promise of a data lake is basically this: organizations have a lot of data. It might be carefully curated customer data in your OLTP system. It might be raw clickstream data coming from your web server, or it might be unstructured data coming from a bunch of sensors.
And the promise of a data lake is that you can take all of that and just dump it in the data lake. And this is actually really powerful when you compare it to a traditional database. Because in a traditional database, you have to start by coming up with a schema and doing a lot of cleaning. This is often called a schema-on-write.
And what a data lake allows you to do is it allows you to forego that process and just start by collecting everything. Because sometimes you don’t know why data is valuable until much later. And if you haven’t stored it, then you’ve already lost it. And so, the data lake is just a bunch of files out in a file system. It could be S3 or HDFS, or Azure Blob Storage. And you can just dump everything there and then come back and look at it later.
And the idea is when you’re done, and once you’ve collected it all, then you can actually get insights from it. You can do data science and machine learning. You can build powerful tools for your business, like recommendation engines or fraud detection algorithms. You can even do crazy things like cure cancer using genomics and DNA sequencing.
However, I’ve seen this story many, many times. And typically, what happens is, unfortunately, the data at the beginning is garbage. And so, the data that you store in your data lake is garbage. And as a result, you get garbage out from these more advanced processes that you try to do at the end.
Challenges with a data lake
And why does that happen? Why is it so difficult to get quality and reliability out of these data lakes and what does this typical project look like? I want to walk you through a story that I’ve seen happen over and over again, at many organizations, when they sit down and try to extract insights from their data. And it typically goes something like this. And this is what is considered cutting edge today, but before Delta Lake.
So, a pretty common pattern is you’ve got a stream of events that are coming into some system like Apache Kafka. And your mission is to do two things, you need to do streaming analytics, so you can know what’s going on in real time in your business. And you also want to do AI and reporting where you can look at a longer period of time and do a longitudinal analysis, and actually look at the history and trends and make predictions about the future.
So, how are we going to do this? Step one, I sit down at my computer. And I know that Spark has really good APIs for reading from Apache Kafka. You can use DataFrames, Datasets, and SQL in Spark SQL to process the data and do aggregations, time windows, and all kinds of other things that come up in your streaming analytics.
And so, we start with that and, off the bat, it's working pretty well. But this brings us to challenge number one, which is historical queries. Kafka is great for real-time analysis, but it typically only stores a day's or a week's worth of data; you don't want to be storing years and years' worth of data in Kafka. Real time is really good for what's happening at this moment, but it's not so good for looking at trends historically. And so, we need to solve this problem as well.
So, I’ve been reading a lot of blog posts. And a pretty common pattern that happens here is there’s this thing called the lambda architecture, which, as far as I can tell, is basically you just do everything twice. You have one real time thing that is doing an approximation and giving you exactly what’s happening at this very moment. And you have another pipeline, which is maybe a little more curated, and runs a little bit more slowly. But it’s archiving all of that data into your data lake. And so, that’s step number one. So, if we want to solve this historical query problem, we’re going to also set up the lambda architecture on top of vanilla Apache Spark.
Then once I've got all that data in the data lake, the idea is that now I can run Spark SQL queries over that as well, and I can do AI and reporting. That was a bit of extra work and some extra coordination, but fortunately, Spark has a unified API for batch and streaming, and so it's possible to set this up.
But that brings us to challenge number two. Like I said before, data in the real world is often messy. Some team upstream from you changes the schema without telling you and now you have problems. And so, a pattern that I see here is you need to add validations. So, you need to actually write extra Spark SQL programs that are just checking to make sure that your assumptions about the data are correct. And if they’re wrong, it sends off an email so that you can correct it.
Now, of course, because we’ve done the lambda architecture, we have to do validations in two different places. But again, that’s something that we can do. We can use Spark to do it. And so, now we set up validations, they handle the messy data. That unfortunately brings us to challenge number three, which is mistakes and failures.
Those validations are great, but sometimes you forget to put one in place, or there's a bug in your code, or, even harder, your code just crashes in the middle because you're running on EC2 and your spot instances die. And now you have to worry about cleaning that up. The real problem with using these kinds of distributed systems and distributed file systems is, if a job crashes in the middle, it leaves garbage results out there that need to be cleaned up. And so, you're forced to do all of the reasoning about correctness yourself. The system isn't giving you a lot of help here.
And so, a pretty common pattern is, rather than working on an entire table at a time, because if something goes wrong you'd need to recompute the entire table, people will instead break it up into partitions. So, you have a different folder, and each folder stores a day, an hour, or a week, whatever granularity makes sense for your use case. And you can build a lot of scripting around it so that it's easy to do recomputation.
So, if one of those partitions gets corrupted for any reason, whether it was a mistake in my code or just a job failure, I can just delete that entire directory and reprocess that data for that one partition from scratch. And so, by building this partitioning and reprocessing engine, now I can handle these mistakes and failures. There was a little bit of extra code to write but now I can sleep safe and sound knowing that this is going to work.
That brings us to challenge number four, though: updates. It's very easy to add data, but it's very difficult to do point updates, to change data in a data lake and do it correctly. And you may need to do this for GDPR reasons. You may need to do retention. You might have to do anonymization or other things, or you might just have errors in the data. And so, now you end up writing a whole other class of Spark jobs that do updates and merges, and this can be very difficult.
And typically, because it's so difficult, what I see people do is, rather than do individual updates, which would be very cheap, whenever they get a set of DSRs, say once a month, they will copy the entire table, removing anybody who has asked to be forgotten under GDPR. They can do it, but it's another Spark job to run, and it's very costly.
And there's a subtlety here that makes it extra difficult, which is, if you modify a table while somebody is reading it and generating a report, they're going to see inconsistent results and that report will be wrong. So, you'll need to be very careful to schedule this to avoid any conflicts when you're performing those modifications. But these are all problems that people can solve: you do the modifications at night and run your reports during the day, or something like that. And so, now we've got a mechanism for doing updates.
However, the problem here is that this has become really complicated. And what that means is you're wasting a lot of time and money solving systems problems, rather than doing what you really want to be doing, which is extracting value from your data. The way I look at this is that these are all distractions of the data lake that prevent you from actually accomplishing the job at hand.
And to summarize what I think these are, a big one here is no atomicity. When you run a distributed computation, if the job fails in the middle, you still have some partial results out there. It’s not all or nothing. And so, atomicity means that when a job runs, it either completely finishes correctly or if anything goes wrong, it completely rolls back and nothing happens. So, you no longer leave your data in a corrupt state requiring you to tediously build these tools to do manual recovery.
Another key problem is there’s no quality enforcement. It’s up to you in every job to manually check the quality of the data that’s coming in. Again, it’s all of your assumptions. There’s no help from the system like invariants in a traditional database where you can say, “No, this column is required or this must be this type of schema.” All of that stuff is left up to you as the programmer to handle.
And then finally, there's no control for consistency or isolation. This means you can really only do one write operation to any data lake table at a time, and it makes it very difficult to mix streaming and batch, or to do write operations while people are reading from the table. These are all things that you would expect from your data storage system: you want to be able to do these things, and people should always be able to see a consistent snapshot automatically.
Solving these challenges with Delta Lake
So, let's take a step back now and look at what this process looks like with Delta Lake instead. The idea of Delta Lake is that we take this relatively complicated architecture, where a lot of these correctness and other concerns were left up to you, manually writing Spark programs, and we change it to something like this, where you're thinking only about data flow: you bring in all of the data from your organization and flow it through, continually improving the quality until it's ready for consumption.
And the hallmarks of this architecture are, first of all, that Delta Lake brings full ACID transactions to Apache Spark. What this means is every Spark job that runs will now complete either the entire job or nothing at all. People who are reading and writing at the same time are guaranteed to see consistent snapshots. And when something is written out, it's definitely written out and it will not be lost. Those are the hallmarks of ACID. And this allows you to focus on your actual data flow, rather than thinking about all of these extra systems problems and solving the same thing over and over again.
Another key aspect of Delta Lake is that it is based on open standards, and it's open source. It's a full Apache license, no silly Commons Clause or anything like that. You can take it and use it for whatever application you want, completely for free. And personally, that would be really important to me if I was storing petabytes of data. Data has a lot of gravity; there's a lot of inertia when you collect a lot of data, and I would not want to put it in some black box where it's very difficult for me to extract it.
And this means that you can store mass amounts of data without worrying about lock-in. So, it's both open source and based on open standards. I'll talk about this in more detail later in the talk, but underneath the covers, Delta is actually storing your data in Parquet, so you can read it with other engines, and there's a growing community around Delta Lake building that native support in there.
But, worst case scenario, if you decide you want to leave Delta Lake, all you need to do is delete the transaction log and it just becomes a normal Parquet table. And then finally, Delta Lake is deeply powered by Apache Spark. What this means is, if you've got existing Spark jobs, whether they're streaming or batch, you can easily convert them to get all the benefits of Delta without having to rewrite those programs from scratch. And I'm going to talk about exactly what that looks like later in the talk.
Delta Lake architectures: bronze, silver, and gold tables
But now, I want to take this picture and simplify it a little to talk about some of the other hallmarks I see of the Delta Lake architecture and where I’ve seen people be very successful.
So, first of all, I want to zero in on this idea of data quality levels. These are not features that are fundamental to Delta Lake; they're patterns that people use with a variety of systems. But I've seen people be very successful with this pattern alongside the features of Delta. These are just general classes of data quality, and the idea here is that as you bring data into the Delta Lake, rather than trying to make it perfect all at once, you're going to incrementally improve the quality of your data until it's ready for consumption. And I'll talk about why I think that's actually a really powerful pattern that can help you be more productive.
So, starting at the beginning is your bronze level data. This is a dumping ground for raw data. It’s still on fire. And I actually think that’s a good thing. Because the core idea here is, if you capture everything without doing a lot of munging or parsing on it, there’s no way that you can have bugs in your parsing and munging code. You’re keeping everything from the beginning. And you can often actually keep a year’s worth of retention here. And I’ll talk in a little bit about why I think that’s actually really important.
But this means you can collect everything. You don’t have to spend a bunch of time ahead of time deciding what data is going to be valuable and what data is not. You can figure that out as you go, as you do your analysis.
Moving on from bronze, we move on to silver-level data. This is data that is not yet ready for consumption. It's not a report that you're going to give to your CEO. But I've already done some cleanup: I've filtered out one particular event type, I'm parsing JSON and giving it a better schema, or maybe I've joined and augmented different datasets so they have all the information I want in one place.
And, you might ask, if this data isn't ready for consumption, why am I creating a table and taking the time to materialize it? There are actually a couple of different reasons for that. One is, oftentimes, these intermediate results are useful to multiple people in your organization. And so, by creating these silver-level tables, where you take your domain knowledge and clean the data up, you're allowing them to benefit from that automatically without having to do that work themselves.
But a more interesting and more subtle point here is that it can also really help with debugging. When there's a bug in my final report, being able to query those intermediate results is very powerful, because I can actually see what data produced those bad results and see where in the pipeline the problem was introduced. And this is a good reason to have multiple hops in your pipeline.
And then finally, we move on to the gold class of data. This is clean data. It’s ready for consumption. It’s these business level aggregates that actually talk about how things are running and how things are working and this is almost ready for a report.
And here, you start using a variety of different engines. So, like I said, Delta Lake already worked very well with Spark. And there’s also a lot of interest in adding support for Presto and others. And so, you can do your streaming analytics and AI and reporting on it as well.
Streaming and batch processing on Delta Lake
So, now I want to talk about how people actually move data through the Delta Lake, through these different quality classes. And one of the patterns that I see over and over again is that streaming is actually a really powerful concept here. And before I go too deep into streaming, I want to correct some misconceptions that I often hear.
So, one thing that people usually think when they hear streaming is that it's got to be super fast, and it's got to be really complicated because you want it to be really fast. And Spark actually does support that mode, if that's an application that you have: there's continuous processing, where you continuously poll the server for new data, holding on to those cores, and that supports millisecond latencies.
But that's actually not the only application where streaming can make sense. Streaming to me is really about incremental computation. It's about a query that I want to run continuously as new data arrives. So, rather than thinking about this as a bunch of discrete jobs and putting all of the management of those discrete jobs on me or some workflow engine, streaming takes that away.
You write a query once, you say, "I want to read from the bronze table, I want to do these operations, and I'm going to write to the silver table," and you just run it continuously. And you don't have to think about the complicated bits of what data is new, what data has already been processed, how do I process that data and commit it downstream transactionally, how do I checkpoint my state so that if the job crashes and restarts, I don't lose my place in the stream. Structured Streaming takes care of all of these concerns for you.
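To make that concrete, here's a minimal PySpark sketch of a continuously running bronze-to-silver query. It assumes an existing SparkSession named spark; the paths, the event_type filter, and the column names are hypothetical.

```python
from pyspark.sql import functions as F

bronze = spark.readStream.format("delta").load("/delta/bronze/events")

silver_query = (
    bronze
    .where(F.col("event_type") == "click")            # keep one event type
    .withColumn("ts", F.to_timestamp("event_time"))   # give it a better schema
    .writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", "/delta/checkpoints/silver_clicks")  # stream state lives here
    .start("/delta/silver/clicks")
)
```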
And so, rather than being more complicated, I think it can actually simplify your data architecture. And streaming in Apache Spark actually has this really nice cost latency tradeoff that you can tune. So, at the far end, you can use continuous processing mode. You can hold on to those cores for streaming persistently and you can get millisecond latency.
In the middle zone, you can use micro-batches. And the nice thing about micro-batches is now you can have many streams on the cluster, and they're time-multiplexing those cores. So, you run a really quick job, give up that core, and then someone else comes in and runs theirs. With this, you can get seconds-to-minutes latency. This is a sweet spot for many people, because it's pretty rare that you care whether something is up to date within the last minute, but you do care that it's up to date within the last hour.
And then finally, there’s also this thing called trigger-once mode in Structured Streaming. So, if you have a job where data only arrives once a day or once a week or once a month, it doesn’t make any sense to have that cluster up and running all the time, especially if you’re running in the cloud where you can give it up and stop paying for it.
And Structured Streaming actually has a feature for this use case as well. It's called trigger-once, where basically, rather than running the job continuously, anytime new data arrives, you boot it up, you say trigger once, it reads any new data that has arrived, processes it, commits it downstream in a transaction, and shuts down. And so, this can give you the benefits of streaming, the ease of coordination, without any of the costs that are traditionally associated with an always-running cluster.
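For example, the same kind of query can be run in trigger-once mode; a sketch, with hypothetical paths:

```python
(spark.readStream
    .format("delta")
    .load("/delta/bronze/events")
    .writeStream
    .format("delta")
    .option("checkpointLocation", "/delta/checkpoints/silver_daily")
    .trigger(once=True)   # one incremental pass: read what's new, commit it, shut down
    .start("/delta/silver/daily"))
```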
Now, of course, streams are not the only way to move data through a Delta Lake. Batch jobs are very important as well. Like I mentioned before, you may have GDPR requests or corrections that you need to make. You may have change data capture coming from some other system, where you've got a set of updates coming from your operational store and you just want to reflect that within your Delta Lake. And for this, we have UPSERTS, and of course, we also support standard INSERT and DELETE and those kinds of commands as well.
And so, the really nice thing about Delta Lake is it supports both of these paradigms and you can use the right tool for the right job. And you can seamlessly mix streaming and batch without worrying about correctness or coordination.
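To make the upsert idea concrete, here's a hedged sketch of applying a change data capture feed with MERGE. At the time of this talk, SQL DML was the path available (inside Databricks), with Scala and Python APIs on the roadmap; the table and column names here are hypothetical, and the target is assumed to be a Delta table registered in the metastore.

```python
# updates_df is a hypothetical DataFrame of changed rows from the operational store.
updates_df.createOrReplaceTempView("updates")

spark.sql("""
  MERGE INTO customers AS t
  USING updates AS s
  ON t.customer_id = s.customer_id
  WHEN MATCHED THEN
    UPDATE SET t.email = s.email, t.updated_at = s.updated_at
  WHEN NOT MATCHED THEN
    INSERT (customer_id, email, updated_at)
    VALUES (s.customer_id, s.email, s.updated_at)
""")
```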
And one final pattern here that I want to talk about is this idea of recomputation. When you have this early bronze table that keeps all of your raw data with very long retention, so years' worth of the original data, and when you use streaming between the different nodes of your Delta Lake data graph, it's very easy for you to do recomputation. You might want to do recomputation because there was a bug in your code, or because there's some new thing that you've decided you want to extract. And the really nice thing here, because of the way that streaming works, is that this is very simple.
So, just to give you a mental model for how Structured Streaming works in Apache Spark, we basically have the model that a streaming query should always return the same result as a batch query over the same data. So, what that means is, when you start a new stream against a Delta table, it starts by taking a snapshot of that table at the moment the stream is started.
And it does this backfill operation, where it processes all of the data in that snapshot, breaking it up into nice little chunks, checkpointing its state along the way, and committing it downstream. And when it gets to the end of that snapshot, we switch to tailing the transaction log and only processing new data that has arrived since the query started. What this means is that you get the same result as though you had run the query at the end anyway, but with significantly less work than running it over and over again from scratch.
So, if you want to do recomputation under this model, all you need to do is clear out the downstream table, create a new checkpoint, and start over. And it will automatically process from the beginning of time and catch up to where we are today. So, that’s actually a pretty powerful pattern for correcting mistakes and doing other things.
Delta Lake customer use cases
So, now that we've gone over the high level, I want to talk about some specific use cases where Delta Lake has been instrumental in both reducing cost and easing the management of Apache Spark on top of these data lakes.
So, Delta Lake, I want to give a little bit of a history here. Delta Lake is actually a few years old. We've had it inside of Databricks for the last two years as a proprietary solution, and we've got some of our largest customers using it. So, I'm going to talk in particular about Comcast, but also Riot Games, Jam City, and NVIDIA, a bunch of big names that you know. They've been using it for many years.
And about two months ago, at the Spark Summit, we decided to open source it so that everybody or even people running on-prem or in these other locations could get access to the power of Delta Lake.
Comcast
So, I want to talk about one particular use case that I thought was really cool. This is Comcast. So, their problem here is they have set top boxes around the world. And in order to understand how people are interacting with their programming, they need to sessionize this information. So, you watch this TV show, you change the channel, you go over here, you go back to this other TV show. And with this, they can create better content by understanding how people consume it.
And as you can imagine, Comcast has many subscribers. So, there’s petabytes of data. And before Delta Lake, they were running this on top of Apache Spark. And the problem was the Spark job to do this sessionization was so big that the Spark scheduler would just tip over.
And so, rather than run one job, what they actually had to do was take this one job and partition it by user ID. So, they take the user ID, they hash it, they mod it by, I think, 10, and they break it into 10 different jobs. And then they run each of those jobs independently.
And that means there's 10x the overhead in terms of coordination. You need to make sure those are all running, you need to pay for all of those instances, and you need to handle failures in 10 times as many jobs. That's pretty complicated. And the really cool story about switching this to Delta was that they were able to switch a bunch of these kinds of manual processes to streaming, and they were able to dramatically reduce their costs by bringing this down to one job running on 1/10 of the hardware. So, now they're computing the same thing, but with 10x less overhead and 10x less cost.
And so, that’s a pretty powerful thing here that Delta’s scalable metadata can really bring to Apache Spark. And I’m going to talk later in the talk exactly how that all works. But before I get into that, I want to show you exactly how easy it is to get started if you’re already using Apache Spark with Delta Lake.
So, getting started is trivial. We've published it on Spark Packages, so all you need to do to install Delta Lake on your Spark cluster is use Spark Packages. If you're using PySpark, you can just do --packages and then the Delta coordinate; if you're using the Spark shell, it's the same thing. If you're building a Java or Scala JAR and you want to depend on Delta, all you need to do is add a Maven dependency.
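As a rough sketch (the coordinate below is the one current around the time of this talk; match it to the Scala version of your Spark build and the latest Delta release):

```python
# From the command line:
#   pyspark --packages io.delta:delta-core_2.11:0.2.0
#   spark-shell --packages io.delta:delta-core_2.11:0.2.0
#
# Or when building a SparkSession programmatically in Python:
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("delta-quickstart")
    .config("spark.jars.packages", "io.delta:delta-core_2.11:0.2.0")
    .getOrCreate()
)
```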
And then changing your code is equally simple. If you're using the DataFrame reader and writer in Spark SQL, all you need to do is change the data source from parquet, or JSON, or CSV, or whatever you're using today, to delta. Everything else should work the same. The only difference is that now everything will be scalable and transactional, which, as we saw before, can be very powerful.
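For example, assuming an existing SparkSession named spark and hypothetical paths, the change looks like this:

```python
df = spark.read.format("delta").load("/data/events")   # was .format("parquet")

(df.write
   .format("delta")                                     # was .format("parquet")
   .mode("append")
   .save("/data/events_out"))
```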
Everything I've talked about so far has been mostly about these kinds of systems problems of correctness. If my job crashes, I don't want it to corrupt the table. If two people write to the table at the same time, I want them both to see consistent snapshots. But data quality is actually more than that. Your code can run without failing, and there can still be a bug in it that gives you the wrong answer.
This is why we're expanding the notion of data quality to allow you to declaratively talk about your quality constraints. This is work that's coming in the next quarter or so, but the idea here is that we allow you to specify, in a single place, the layout and constraints of your Delta Lake. First, you can specify important things like where the data is stored, and you can optionally turn on strict schema checking.
Delta Lake has two different modes here, and I often see people use both of them as they move through their data quality journey. In the earlier tables, you will use schema inference, where maybe you just read a bunch of JSON and put it exactly as it is into the Delta Lake. We have nice tools here where we will automatically perform safe schema migrations.
So, if you're writing data into Delta Lake, you can flip on the mergeSchema flag, and it will just automatically add new columns that appear in the data to the table, so that you can capture everything without spending a bunch of time writing DDL.
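A minimal sketch of that flag on a write, with a hypothetical DataFrame and path:

```python
(raw_json_df.write
    .format("delta")
    .mode("append")
    .option("mergeSchema", "true")   # new columns in the data are added to the table schema
    .save("/delta/bronze/events"))
```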
We of course also support standard strict schema checking, where you say: create a table with this schema, reject any data that doesn't match that schema, and use ALTER TABLE to change the schema of the table. And often, I see this used further down the road, in the gold-level tables, where you really want strict enforcement of what's going in there.
And then finally, you'll be able to register tables in the Hive metastore (that support is coming soon) and also add human-readable descriptions, so people coming to this table can see things like: this data comes from this source, it's parsed in this way, and it's owned by this team. That's the extra human information you can use to figure out which data will get you the answers you want.
And then finally, the feature that I’m most excited about is this notion of expectations. An expectation allows you to take your notion of data quality and actually encode it into the system. So, you can say things like, for example, here, I said I expect that this table is going to have a valid timestamp. And I can say what it means to be a valid timestamp for me and for my organization.
So, I expect that the timestamp is there, and I expect that it happened after 2012, because my organization started in 2012. And so, if you see data from, say, 1970 due to a date parsing error, we know that's incorrect and we want to reject it. For those of you who are familiar with traditional databases, this sounds a lot like an invariant, where you can put NOT NULL or other constraints on a table.
But there’s a subtle difference here. So, the idea of invariants is that you can say things about tables. And if one of those invariants is violated, the transaction will be aborted and will automatically fail.
And I think the problem with big data, and why invariants alone are not enough, is that if you stop processing every single time you see something unexpected, especially in those earlier bronze tables, you're never going to process anything. And that can really hurt your agility.
And so, the cool thing about expectations is we actually have a notion of tunable severity. So, we do support this fail stop, which you might want to use on a table that your finance department is consuming because you don’t want them to ever see anything that is incorrect. But we also have these kinds of weaker things where you can just monitor how many records are valid and how many are failing to parse and alert at some threshold.
Or, even more powerful, we have this notion of data quarantining, where you can say: any record that doesn't meet my expectations doesn't fail the pipeline, but we also don't let it go through. We just quarantine it over here in another table, so I can come look at it later and decide what I need to do to remediate the situation. This allows you to continue processing without corrupting downstream results with invalid records.
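Since the declarative expectations described here were still on the roadmap at the time of this talk, here is a hand-rolled sketch of the same quarantine idea using plain Spark; the paths and the timestamp rule are hypothetical.

```python
from pyspark.sql import functions as F

batch = spark.read.format("delta").load("/delta/bronze/events")

# The "expectation": a timestamp must exist and fall after the organization's founding date.
is_valid = F.when(
    F.col("timestamp").isNotNull() & (F.col("timestamp") > F.lit("2012-01-01")), True
).otherwise(False)

flagged = batch.withColumn("_valid", is_valid)

# Valid records continue down the pipeline; invalid ones are quarantined for later review.
(flagged.where("_valid").drop("_valid")
    .write.format("delta").mode("append").save("/delta/silver/events"))
(flagged.where("NOT _valid").drop("_valid")
    .write.format("delta").mode("append").save("/delta/quarantine/events"))
```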
So, like I said, this is a feature that we’re actively working on now. Stay tuned to GitHub for more work on it. I think this fundamentally changes the way that you think about data quality with Apache Spark and with your Delta Lake.
Now that we've been over the high level of what Delta is and why you should care about it, I want to go into the nitty-gritty details of how Delta actually works. Because it sounds almost too good to be true that we can bring these full ACID transactions into a distributed system like Apache Spark and still maintain good performance.
First of all, let's start by looking at what a Delta table looks like when it's actually stored out on disk. To those of you that have a data lake already, this should look really familiar: it's just a directory stored in your file system, S3, HDFS, Azure Blob Storage, or ADLS, with a bunch of Parquet files in it.
And there's one extra bit that is very important, and that is that we also store a transaction log. Inside of the transaction log, there are different table versions, and I'll talk about those table versions in a moment. We still store the data in partition directories; however, that's actually mostly for debugging. There are also modes of Delta where we can work directly with storage systems in the most optimal way.
For example, on S3, they recommend that if you're going to be writing a lot of data out regularly, rather than creating date partitions, which create hotspots of temporal locality, you instead partition by a random hash, and because of the power of Delta's metadata, we can do that as well. And then finally, there are the standard data files, which are just normal encoded Parquet files that can be read by any system out there.
So, what is actually in those table versions? How do we reason about what the current state of a table is? So, each one of those table versions has a set of actions that apply to the table and change it in some way. And the current state of a table at this moment is the result of the sum of all of those actions. So, what kinds of actions am I talking about?
Well, for one example, we can change the metadata. We can say: this is the name of the table, this is the schema of the table; you can add a column to the table, or you can set the partitioning of the table. So, one action you can take is to change the metadata.
The other actions are add a file and remove a file. So, we write out a parquet file. And then to actually make it visible in the table, it needs to also be added to the transaction log. And I’ll talk about why that extra level of indirection is a really powerful trick in a moment. And another detail here is when we add files into Delta, we can keep a lot of optional statistics about them.
In some versions, we can actually keep the min and max value for every column, which we can use to do data skipping or quickly compute aggregate values over the table. And then finally, you can also remove data from the table by removing the file. And again, this is a lazy operation. This level of indirection is really powerful. When we remove a file from the table, we don’t necessarily delete that data immediately, allowing us to do other cool things like time travel.
And so, the result here of taking all these things is you end up with the current metadata, a list of files, and then also some details like a list of transactions that have been committed and the protocol version we’re at.
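Because the log is just JSON files (plus Parquet checkpoints), you can peek at it directly with Spark. A sketch, with a hypothetical table path; which columns show up depends on which action types (metaData, add, remove, commitInfo, and so on) actually appear in the log.

```python
log = spark.read.json("/delta/events/_delta_log/*.json")
log.printSchema()

# For example, list the data files that have been added, along with their sizes.
log.where("add IS NOT NULL").select("add.path", "add.size").show(truncate=False)
```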
So, how does this allow us to get ACID, to actually get these nice properties of transactional databases? One detail here is that when we're creating these table versions, we store them as ordered, atomic units called commits. Like I said before, we create version zero of the table by creating the file 0.json. And the idea here is that when Delta constructs that file in the file system, we will use the underlying atomic primitives.
On S3, in order to guarantee atomicity, all you need to do is upload to the system. The way they do this is you start your upload by saying, "I expect to upload this many bytes," and unless you actually successfully upload that many bytes, S3 won't accept the write. So, you're guaranteed to get either the whole file or none of the file.
On other systems like Azure or HDFS, what we'll do is create a temporary file with the whole contents and then do an atomic rename, so that the entire file is created or not. And then you can have successive versions. So, in version zero, we added these two files; in version one, we removed them and put in a third. For example, you could be doing compaction here, where you atomically take those two files and compact them into one larger file.
Now, another important detail here is that we want atomicity for each of these commits, but we also want serializability. We want everybody to agree on the order of changes to this table, so that we can correctly do things like MERGE INTO for change data capture and other operations that require this property.
And so, in order to agree on these changes even when there’s multiple writers, we need this property called mutual exclusion. If two people try to create the same version of a Delta table, only one of them can succeed. So, just to make this a little bit more clear, user one could write version zero of the table. User two could write version one. But if they both try to write version two, then one of them can succeed but the other one must get an error message saying, “Sorry, your transaction didn’t go through.”
And now you're probably saying, "Wait a second. If two people do something at once, it fails? That sounds like I'm wasting a lot of time and a lot of work, and that sounds like a lot of complexity for me." Fortunately, this is where we use a third cool trick called optimistic concurrency. The idea of optimistic concurrency is that when you perform an operation on the table, you just optimistically assume that it's going to work. And if you do have a conflict, you check to see whether that conflict matters to you, and if it doesn't, you're allowed to optimistically try again.
And then in most cases, it actually turns out that the transactions are not overlapping and you’re allowed to automatically remediate this. So, to give you a concrete example here, let’s say we have two users, and both of these users are streaming into the same table.
So, when both of them begin their streaming write, they start by reading the version of the table at that moment. They both read in version zero, and they read in the schema of the table, so they make sure that the data they're appending has the correct format. Then they write some data files out for the contents of the stream that are going to be recorded in this batch, and they record what was read and what was written from the table.
Now, they both try to commit. And in this case, user one wins the race and user two loses. But what user two will do is they’ll check to see if anything has changed. And because the only thing they read about the table was the schema and the schema has not changed, they’re allowed to automatically try again. And this is all hidden from you as the developer. This all happens automatically under the covers. So, they’ll both try to commit and they’ll both succeed.
Now, the final trick that we have here is that tables can have massive amounts of metadata. Those of you who have tried to put millions of partitions into the Hive metastore are probably familiar with this problem: once you get to those data sizes, the metadata itself can actually be the thing that brings the system down.
And so, we have a trick for this, which is: we've already got a distributed processing system capable of handling massive amounts of data, so we'll just use Spark. We take the transaction log with its set of actions, we read it in with Spark, and we can actually encode it as a checkpoint in Parquet. A checkpoint is basically the entire state of the table at some version.
So, when you’re reading the transaction log, rather than have to read the entire transaction log, you can just start with a checkpoint, and then any subsequent changes that happened after that. And then this itself can be processed with Spark.
So, when you come to a massive table that has millions of files and you ask a question like, "How many records were added yesterday?", what we'll do is run two different Spark jobs. The first one queries the metadata and figures out which files are relevant to yesterday. It gets back that list of files, and then you run another Spark job that actually processes them and does the count.
And by doing this in two phases, we can drastically reduce the amount of data that needs to be processed. We’ll only look at the files that are relevant to the query. And we’ll use Spark to do that filtering.
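As a sketch, the "records added yesterday" question looks like an ordinary query; the table path, the date partition column, and the literal date are hypothetical. The partition filter is answered from Delta's metadata first, so only the relevant files are ever scanned.

```python
yesterday_count = (
    spark.read.format("delta")
    .load("/delta/events")
    .where("date = '2019-07-01'")   # metadata query prunes down to just these files
    .count()
)
print(yesterday_count)
```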
So, before we even go to questions, I want to talk a little bit about the roadmap. Like I said, while this project has been out there for a couple of years now, it’s just recently been open source. We have a pretty exciting roadmap for the rest of the year. Basically, our goal is for the open source Delta Lake project to be fully API compatible with what’s available inside of Databricks. And so, the roadmap for the rest of the quarter is basically open sourcing a lot of cool features that we have.
So, a couple of weeks ago we released version 0.2.0, which added support for reading from S3 and also from Azure Blob Storage and Azure Data Lake Storage. And then this month, we are planning to do a 0.3.0 release that is going to add Scala APIs for update, delete, merge, and vacuum, with Python APIs following shortly. And then for the rest of this quarter, we have a couple of things on our plan. We want to add full DDL support, so that's CREATE TABLE and ALTER TABLE.
And we also want to give you the ability to store Delta tables in the Hive metastore, which I think is very important for data discovery in different organizations. And we want to take those DML commands from before, update, delete, and merge, and actually hook them into the Spark SQL parser, so you can use standard SQL to do those operations as well.
And then moving forward, let us know what you want. If you're interested in doing more, I recommend you check out our website at delta.io. It has a high-level overview of the project, there's a quick start guide on how you can get started, and it also has links to GitHub, where you can watch the progress, see what our roadmap is, and submit your own issues on where you think the project should be going.
So, I definitely encourage you to do that. But with that, I think we’ll move over to questions. So, let me just pull those up and see what we got.
Q&A
Michael Armbrust:
Does Delta Lake add any performance overhead?
I want to break that down. First of all, Delta Lake is designed to be a high-throughput system, so there is a little bit of overhead in performing each individual operation. Basically, rather than just writing out the files, we need to write out the files and also write out the transaction log. That adds a couple of seconds to your Spark job.
Now, the important thing here is that we designed Delta to be massively parallel and very high throughput. So, you get a couple of seconds added to your Spark job, but that is mostly independent of the size of your Spark job. What Delta Lake is really, really good at is ingesting trillions of records of data, or petabytes of data, or gigabytes of data. What Delta is not good at is inserting individual records. If you run one record per Spark job, there will be a lot of overhead.
So, the trick here is you want to use Delta in the places where Spark makes the most sense, which are relatively large jobs spread out across lots of machines. And in those cases, the overhead is negligible.
Since it has ACID properties, will my system be highly available as well?
So, Delta, again, it’s designed specifically to take advantage of the cloud and to take advantage of these nice properties.
So, to me, there’s a couple of nice properties of the cloud. One is that the cloud is very scalable. You can put tons of data into S3 and it just handles it arbitrarily. It’s generally pretty highly available. So, you can always read data from S3, no matter where you are. If you really, really care, there’s even things like replication, where you can replicate your data to multiple regions. And Delta plays very nicely with that. So, reading from a Delta table should be very highly available, because it’s really just the availability of that underlying storage system.
Now, those of you who are familiar with the CAP theorem might be saying, "But wait a second," so for writes, when we think about consistency, availability, and partition tolerance, Delta chooses consistency. So, if you cannot talk to the central coordinator (depending on the storage system, on S3 that might be your own service that you're running, while on Azure they've taken the consistency approach and we use an atomic operation there), the system will pause.
But the nice thing here is that, because of that optimistic concurrency mechanism, that doesn't necessarily mean you lose the whole job that you might have been running for hours. It just means you have to wait until you're able to talk to that service. So, I would say reads are very highly available; for writes, we choose consistency, but in general, that still works out pretty well.
The next question was about keeping all levels of data. Well, I want to clarify the idea behind bronze, silver, and gold. Not everybody keeps the raw data around, and not everybody keeps all of the data. You might have retention requirements that say you're only allowed to keep two years of data.
So, really, I think it's up to you to decide what data makes sense to hold on to. The only thing I would say is, the nice thing about data lakes, and how Delta applies to them in general, is that you are empowered to hold on to the raw data, and as much of it as you want.
And so, there are no technical limitations preventing you from keeping all of the data. As a result, many organizations that I work with do actually keep everything that they are legally allowed to keep for a very long time, and only remove it when they have to get rid of it.
What do you write that logic in? Are we able to write logic in Scala?
Delta Lake plugs into all of the existing APIs of Apache Spark, and that means you can use any of those. If you're a Scala programmer, you can use Scala. If you're a Java programmer, that works as well. We also have bindings in Python. And if you're an analyst and you don't want to program at all, we also support pure SQL.
So, really our idea here is the underlying engine is written in Scala and Delta is also written in Scala. But your logic can be written in whatever language you’re comfortable with. It’s another case where I think you need the right tool for the right job.
Personally, I do a lot of my stuff in Scala, but when I needed to make graphs, I switched over to Python. But still, Delta gives me the ability to filter through massive amounts of data, shrink it down to something that will fit into pandas, and then I do some graphing with it.
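That workflow, as a rough sketch (hypothetical path and columns; plotting assumes matplotlib is installed):

```python
pdf = (
    spark.read.format("delta")
    .load("/delta/gold/daily_metrics")
    .where("region = 'US'")
    .select("date", "active_users")
    .toPandas()                      # small enough to fit on one machine now
)
pdf.plot(x="date", y="active_users")
```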
Is Presto part of Delta Lake or is it all only Spark?
That’s actually something that’s evolving pretty quickly right now. So, there’s a couple of different answers to this. So, I’ll tell you both where we’re at and where we’re going.
So, right now, there's a feature inside of Databricks that we're working on open sourcing, which allows writers for Delta to write out these things called manifest files, which let you query a Delta table in a consistent way from Presto, or Athena, or any of these other Presto-based systems.
However, we're working deeply with Starburst, one of the companies behind Presto, to build a native connector for Presto. We've also got active interest from the Hive community and the Scalding community. So, there's a bunch of interest in building connectors. Today, the core of Delta is built on Spark, but I think the really powerful thing about open source and open standards is that anybody can integrate with it. And as the project, we're committed to growing that ecosystem and working with anybody.
So, if you’re a committer on one of those projects, please join our mailing list, join our Slack channel, check it out and let us know how we can help you build these additional connectors.
Can we experiment with Delta Lake in the community edition of Databricks?
Yes, you can. Delta Lake is available in community edition. Check it out. Everything should be there. Let us know what you think.
Can a Delta table be created with Hive?
Yeah. So, basically the same answer to Presto. There’s active interest in the community to build support. It’s not available today, but that’s definitely something that we’d like to build.
How does Delta Lake handle slowly changing dimensions going from raw to gold?
There's actually a blog post on databricks.com; if you Google "slowly changing dimensions Delta," it walks you through all of the details. But really, the right answer here is that with the MERGE operator, plus the power of Spark, it's actually pretty easy to build all of the different types of slowly changing dimensions.
And the magic thing that Delta is adding on top of Spark that enables this is those transactions. Modifying a table in place would be incredibly dangerous without transactions and Delta makes that possible. And therefore, it enables this type of use case.
We usually deal with Azure. We’d like to know whether Delta Lake has any different behavior when it’s running on Azure Event Hub instead of Kafka?
I’m going to answer this question a little bit more generally. So, I think I talked about one of the powerful things about Delta, being its integration with Spark. And one of the big reasons is I view Spark as the skinny waist of the big data ecosystem. There are Spark connectors for almost every big data system in the world.
And so, if Spark can read from it, it works with Delta Lake. Event Hub, in particular, has both a native connector that plugs into Spark data sources and a Kafka API that works with Spark's Kafka source. So, you can very easily read from Event Hub and do all the stuff I talked about today using Event Hub instead of Kafka. And really, that applies to any system that Spark can read from.
And just to answer the Azure question a little more generally: Delta is fully supported on Azure, including ADLS, and we've just recently improved our support for ADLS Gen2. So, it's available for you to download, and it's also part of Azure Databricks out of the box.
What exactly is the Scala API for DML commands, like update?
The question was whether it looks like spark.sql, where you pass in a string that does the update. And the answer is, we're actually going to support both. If you go to the GitHub repository, I believe this code has already been merged, so you can see the Scala API; if not, there's a design that talks about the details on the ticket for adding update.
But the idea here is that there will be both a Scala function called update that you can use programmatically without having to do string interpolation, and also a SQL way to do it, so you'll be able to construct a SQL string and pass that in. Again, you use the language that you're most comfortable with and that is already part of your toolkit, and Delta should work with that automatically.
Does Delta Lake work with HDFS?
Yes, it fully works with HDFS. HDFS has all of the primitives that we need, so you don’t need any extra details. And what I’m talking about here is HDFS has support for an atomic rename that fails if the destination already exists.
So, as long as you’re running a new enough version of HDFS, which is not even that new, that should work automatically. And if you check out the getting started guide, in the Delta docs at Delta.io, it has all different storage systems that we support and details for what you need to do to set that up.
Are update and delete at the single-row or record level?
There are two answers to this. Yes, Delta does allow you to do fine-grained, individual-row updates, so you don't necessarily have to do your updates or deletes at the partition level. That said, if you do them at the partition level (deletes, for example), they're significantly more efficient, because we can just drop the metadata; we don't actually have to do any manual rewriting.
But if they're not at the partition level, if you're doing a fine-grained single-row update or delete, what we'll do is find the relevant Parquet files, rewrite them, and commit the adds and removes as one transaction to make that operation happen. So, it does support it, but it does involve rewriting individual files.
So, what I'll say here is that Delta is definitely not designed to be an OLTP system. You should not use it if you have lots of individual row updates. But we do support that fine-grained use case.
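A sketch of the two cases, using the SQL DML that was available in Databricks at the time (the open source Scala, Python, and SQL APIs were on the roadmap); the table and column names are hypothetical.

```python
# Partition-level delete: cheap, handled largely in metadata.
spark.sql("DELETE FROM events WHERE date = '2019-07-01'")

# Row-level delete: Delta finds the affected Parquet files, rewrites them without the
# matching rows, and commits the adds and removes as a single transaction.
spark.sql("DELETE FROM events WHERE user_id = 'u-123'")
```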
Do you know exactly when the Scala APIs for Delta Lake will be available?
Well, there are a couple of answers to that. Reading and writing with Delta Lake, in both streaming and batch, already work in Scala today. If you're talking specifically about update, delete, and merge, I believe most of that code has already been put into the repository, so if you download it and build it yourself, it's there. We are hoping to make the release in July, so hopefully this month there'll be the next release that contains the extra Scala APIs.
Can we have any other field for validation purposes apart from timestamp?
Yes. The expectations that we talked about before are just general SQL expressions, so any expectation that you can encode in SQL is allowed.
In that example, it was a very simple comparison against a specific date, but it can be anything you want; it could even be a UDF that checks the quality of the data. The important thing here is that we allow you to put those in as properties of your data flow, rather than as manual validations that you need to remember to do on your own. That enforces them globally across anybody that is using the system.
Does Delta Lake support merging from a data frame instead of a temporary table?
Yes. Once the Scala and Python APIs are available, you'll be able to pass in a DataFrame. Today, inside of Databricks, the only thing that is available is SQL DML, and in that case, you do need to register it as a temporary table. But like I said, stay tuned for the end of the month; we will have a release with Scala APIs, and then you'll be able to pass in a DataFrame yourself.
And I've seen this question a couple of times, so I'll just answer it one more time: we support both ADLS Gen1 and Gen2, although Gen2 is going to be faster, because we have some extra optimizations there.
In the checkpointing example, is the Spark job computing the Delta Lake checkpoint internally required to be handwritten?
So, when you’re using streaming to read from or write to a Delta table or both, if you’re just using it in between two different Delta tables, the checkpointing is handled by structured streaming.
So, you don't need to do any extra work to construct that checkpoint; that's built into the engine. The way Structured Streaming works in Spark is that for every source and sink, there's a contract that allows us to do that checkpointing automatically. The source needs to be able to say, "I'm processing the data from here to here," and those notions of where you are in the stream, we call them offsets, need to be serializable. We just store those in the checkpoint, and we basically use the checkpoint as a write-ahead log.
So, we say batch number 10 is going to be this data. Then we attempt to process batch number 10, and then we write it to the sink. The guarantee here is that the sink must be idempotent: it must only accept batch number 10 once, and if we try to write it twice due to a failure, it must reject it and just skip over it. By putting all of these constraints together, you actually get exactly-once processing with automatic checkpointing, without you needing to do any extra work.
Why not use polyglot persistence and use an RDBMS for storing ACID transactions?
We actually tried this. In fact, one of the early versions of Delta used MySQL. And the problem there is that MySQL is a single machine, and so just getting the list of files out for a large table can actually become the bottleneck. Whereas when you store this metadata in a form that Spark itself can natively process, you can leverage Spark to do that processing.
So, there's nothing stopping you from implementing the Delta transaction protocol on top of another storage system. In fact, there's a pretty long conversation on the GitHub repository right now, going back and forth about what it would take to build a FoundationDB version of Delta. So, that's certainly possible. But in our initial scalability testing, we found that Spark was the fastest way to do this, at least out of the systems we tested, and that's why we decided to do it that way.
Does that mean we don’t need data frames and can do all transformations on Delta Lake instead?
And I would say no. Well, you can do update, delete, and merge without using any actual DataFrame code; you can use pure SQL. But really, I think it's about the right tool for the right job.
Delta Lake does integrate deeply with Spark data frames. And personally, I find that to be a very powerful tool for doing transformations. It’s like SQL++ because you have all these relational concepts, but embedded in a full programming language. And that actually I think can be a very productive way to write your data pipeline.
How does Delta Lake manage newer versions of Spark?
Delta Lake requires Spark 2.4.3, which is a pretty recent release, and that's because there were actually bugs in earlier versions of Spark that prevented data sources from correctly plugging into it. But in general, we're working on Spark compatibility. That's actually one of our core projects for this quarter: making sure that everything in Delta plugs into nice, public, stable APIs of Spark, so that we can work with multiple versions in the future.
Does Delta Lake support ORC?
Again, there’s a discussion on GitHub about adding the support. So, I encourage you to go check that out and vote on that issue if this is something that is important to you.
And there are two answers to this. One is that the Delta Lake transaction protocol, what actually goes into the transaction log, does support specifying the format of the data that is stored. So, it can in principle be used with different file formats, like text, JSON, or CSV; that is built into the protocol already. Today, though, we do not expose that as a choice when you're creating a Delta table; we only do Parquet.
And the reason for that is pretty simple: I just think fewer tuning knobs is generally better. But for something like ORC, if there's a good reason why your organization can't switch, I think that support would be really, really easy to add. That's something that we're discussing in the community, so please go over to GitHub, find that issue, and weigh in.
What is the difference between the Delta Lake that’s included with Databricks versus the open source version?
And that’s a question I get a lot. And I think the way to think about this is I’d like to talk about what my philosophy is behind open source. And that is that I think APIs in general need to be open.
So, any program you can run correctly inside of Databricks should also work in open source. Now, that's not entirely true today, because the open source version of Delta Lake is only two months old. So, what we're doing is working hard to open source all of the different APIs that exist. Update, delete, merge, history, all those kinds of things that you can do inside of Databricks will also be available in the open source version.
Managed Delta Lake is the version that we provide. It's going to be easier to set up, and it's going to integrate with all of the other pieces of Databricks. So, we do caching, we have a significantly faster version of Spark, and so it runs much faster. But in terms of capabilities, our goal is for there to be complete feature parity here, because we're committed to making this open source project successful, and I think open APIs are the correct way to do that.
So, with that, I think we’ll end it. Thank you very much for joining me today.