Change Data Feed is a new feature of Delta Lake on Databricks, available in public preview since DBR 8.2. It enables a new class of ETL workloads, such as incremental table/view maintenance and change auditing, that were not possible before. In short, users can now query row-level changes across different versions of a Delta table.
In this talk we will dive into how Change Data Feed works under the hood, how to use it with existing ETL jobs to make them more efficient, and some new workloads it can enable.
Itai Weiss: Hello and welcome to the Delta Change Data Feed talk at Data + AI Summit 2021. In today’s talk, we’re going to cover: what are the change data capture challenges in the Lakehouse? How does Change Data Feed work in the Lakehouse? How do we capture the changes? How do we process them? And Rahul will show you in a notebook how this works.
Jose Torres: Right. Hello everyone. I’m Jose. I’ve been a Software Engineer at Databricks for a couple of years now. I’m a committer for two open source projects, Delta Lake and Apache Spark. And yeah, I’m a professional database enthusiast and amateur cooking enthusiast.
Rahul Mahadev: Hello everyone. I’m Rahul Mahadev, I’m a software engineer at Databricks. I’ve been a Delta Lake committer for the past two years and I got my masters in computer science from the University of Illinois.
Itai Weiss: And my name is Itai Weiss. I’m a Lead Solutions Architect at Databricks. I’ve been working with Apache Spark since version 1.6, and I’ve consulted for numerous firms across financials, insurance, tech, pharma and many more. So, Change Data Feed is part of open source Delta Lake. Open source Delta gives you an open data format that is reliable, based on [inaudible], to guarantee that whenever you commit data, you’ll be able to read back exactly the same data. It improves data warehouse performance on Delta Lake, has advanced security features and is, of course, open source.
In this talk, we’re going to first look at the challenges we face today when we try to track changes in the data across Delta versions. Today, in the big data domain, we have a lot of data, and changes are coming in frequently. Sometimes a lot of them at once, sometimes only a few. But in many, many cases, we just want to process the newest changes: what has changed since the last time we processed the data? It was doable with Delta time travel: we could query the current version and a previous version, do a full outer join, and find out what has been changed. In other words, what has been updated or deleted, and from that derive the inserts, and do all of this in Spark. But this was complicated. So, we would like to offer you a new way.
How do we solve it? So, Jose will show you soon how this works under the hood, but really what we give you now is a very simple option to read only the change data, from a specific version or timestamp, to a version or timestamp, which are optional. You avoid full table scans. You don’t have to worry about the logic: you know that every time you do it, you always get all the changes. And you will, of course, reduce compute and memory. You can use it to improve your ETL pipelines, in batch and streaming. You can use it from your BI tools or dashboards, and you can use it to meet regulatory needs, to identify exactly what has been changed and when.
Delta Change Data Feed works on Delta and only on Delta. There are other change data capture tools, known as CDC tools, that capture changes from external data sources outside of the Lakehouse. But you can use Change Data Feed, or CDF as we call it, between your bronze, silver and gold layers to identify these changes and propagate them forward, or out to your downstream applications.
How does it work? We’re starting with an original table with three records, with primary keys A1, A2 and A3. We then apply a change: A2 is updated, its value B2 is changed to C2; A3 is deleted; and A4 is inserted with value B4. The end-state table will show A1, the updated A2 and the new A4; A3 is gone. But now you can get a new data frame based on the Change Data Feed output. In this case, we get four records. A2 has a pre-image record, where column B holds B2, the old value, and a post-image record, where column B now holds the new value, C2. Both of them carry the commit time and the Delta version. A3 has a delete record: you get the full record with all the columns, along with the change type, the time and the version. And A4 has an insert record, again with the full row columns, the change time and the version.
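To make the shape of that output concrete, here is a toy model in plain Python. This is not the Delta implementation; the metadata column names `_change_type`, `_commit_version` and `_commit_timestamp` match what CDF exposes, but everything else is illustrative:

```python
# Toy model of the CDF rows produced by the change described above:
# A2's value B2 is updated to C2, A3 is deleted, and A4 is inserted with B4.

def cdf_rows(version, timestamp, updates, deletes, inserts):
    """Build Change Data Feed rows for one commit.

    updates: list of (key, old_value, new_value)
    deletes / inserts: lists of (key, value)
    """
    rows = []
    for key, old, new in updates:
        # An update emits two rows: the pre-image and the post-image.
        rows.append({"pk": key, "value": old, "_change_type": "update_preimage",
                     "_commit_version": version, "_commit_timestamp": timestamp})
        rows.append({"pk": key, "value": new, "_change_type": "update_postimage",
                     "_commit_version": version, "_commit_timestamp": timestamp})
    for key, value in deletes:
        rows.append({"pk": key, "value": value, "_change_type": "delete",
                     "_commit_version": version, "_commit_timestamp": timestamp})
    for key, value in inserts:
        rows.append({"pk": key, "value": value, "_change_type": "insert",
                     "_commit_version": version, "_commit_timestamp": timestamp})
    return rows

feed = cdf_rows(2, "12:00", updates=[("A2", "B2", "C2")],
                deletes=[("A3", "B3")], inserts=[("A4", "B4")])
# feed holds four rows: update pre-image, update post-image, delete, insert.
```

Note that a single logical update produces two feed rows, which is why consumers usually filter out the pre-images before applying changes downstream.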
How do you consume it? You consume it either in streaming or in batch. In the upper part, we can look at the streaming consumption. On the left, at 12:00 PM, we have the same change table, and we’re just consuming it, one change after the other. In the next micro-batch, we get a record, A5, which is an insert, and we just pull it and append it to the next layer, as we always do. At 12:10, this is where it gets more complex and interesting. We got a new record, A6, but immediately after, we got an update to it. So, as you can see in the stream, we have three records: one of them is from version four and the last two are from version five. In this case, our streaming solution will have to find the latest commit version for each key and process only the last change, because this is what we’re really interested in, and it will avoid duplication downstream.
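That deduplication step can be sketched in plain Python. This is a simplified stand-in, not Spark code, assuming each change row carries the key plus the CDF metadata columns:

```python
# Keep only the latest change event per key: drop update pre-images, then for
# each key retain the event with the highest _commit_version, so downstream
# tables see each key at most once per micro-batch.

def latest_changes(events):
    """events: list of dicts with pk, _change_type, _commit_version."""
    latest = {}
    for e in events:
        if e["_change_type"] == "update_preimage":
            continue  # pre-images only describe the old value
        prev = latest.get(e["pk"])
        if prev is None or e["_commit_version"] > prev["_commit_version"]:
            latest[e["pk"]] = e
    return list(latest.values())

# The stream described above: A6 inserted in version 4, then updated in version 5.
stream = [
    {"pk": "A6", "_change_type": "insert", "_commit_version": 4},
    {"pk": "A6", "_change_type": "update_preimage", "_commit_version": 5},
    {"pk": "A6", "_change_type": "update_postimage", "_commit_version": 5},
]
deduped = latest_changes(stream)
# Only the version-5 post-image for A6 survives.
```

In real Spark code the same effect is typically achieved with a window or groupBy over the key, ranking by `_commit_version`.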
The batch consumption is even easier. At 12:00, we just process the four records, as we talked about before. At 12:10, we just get two new inserts and push them through, and at 12:20 we get an update; we can process the update, update the record downstream and complete our [inaudible]. And now Jose is going to explain to us how this magic works under the hood.
Jose Torres: All right. So, yeah. Under the hood in Delta, in general, changes are stored as ordered, atomic units called commits, and you can see an example of this on the side of the slide here. Each commit is represented by a JSON file containing what are called add actions and remove actions, and these reference data files whose rows are added to, or removed from, the table in that version. With Change Data Feed, in addition to those files, which represent the current state of the table, we also have change data feed files, and the contents of these files are the rows containing the change events within that version of the table.
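As a rough illustration, a commit file in the `_delta_log` directory holds one JSON action per line; with CDF enabled, an additional action (called `cdc` in the Delta protocol) points at the change files. The field sets below are heavily simplified; real log actions carry many more fields (stats, partition values, timestamps, and so on):

```python
import json

# Simplified sketch of the actions inside one Delta commit file.
commit_lines = [
    '{"add": {"path": "part-00000.snappy.parquet", "dataChange": true}}',
    '{"remove": {"path": "part-00017.snappy.parquet", "dataChange": true}}',
    '{"cdc": {"path": "_change_data/cdc-00000.snappy.parquet", "dataChange": false}}',
]

actions = [json.loads(line) for line in commit_lines]
adds = [a["add"]["path"] for a in actions if "add" in a]
cdc_files = [a["cdc"]["path"] for a in actions if "cdc" in a]
# adds/removes describe the table's new state; cdc files hold row-level change events.
```

The key point is that the change files are written alongside the normal commit, so reading the feed never requires diffing two table snapshots.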
So, there’s a couple of different uses that Change Data Feed can be applied to. It’s useful within a bronze, silver, gold architecture: you can improve the performance of Delta pipelines by processing only the data changes between your initial bronze ingestion layer and your gold business layer. You can use it for materialized views, to keep a view up to date based on the changes as they come in, without having to rewrite the entire contents every time. You can transmit changes to downstream systems efficiently, again without having to output the full state of the Delta table every time. And you can use it for audit trail use cases, both in terms of seeing what has changed in a particular version, and in terms of: if you have some particular row, how has it evolved over time? What changes have people made to it, and when did they make those changes?
As with any software system, there are a lot of great things about Change Data Feed, but there are trade-offs involved, so we do have to talk about where it is and isn’t an appropriate thing to use. For use cases that include a lot of updates and deletes, where a small fraction of the records in the table is updated within each batch, Change Data Feed is a great choice, because it greatly reduces the amount of work you have to do to read the changes from the table, at the cost of a minor increase in writing out this change data. On the other hand, for workloads where the changes are primarily append-only, or where most of the records in the table are updated in each batch, Change Data Feed can start to get more expensive to produce and to read, so in those cases it may not be the appropriate tool.
Similarly, if you’re receiving data from external sources in a CDC format, Change Data Feed is often a natural way to express that, because it maintains the same structure as the events. Whereas if you’re receiving data in the form of destructive loads, a term that refers to when you get a new batch of data that’s meant to completely replace what was there before, Change Data Feed is not a natural way to express such a workload, and you would typically want to use a normal batch scan in that scenario.
So, how do you start using Change Data Feed? The first step is to enable it for your table, and there’s a table-level configuration for that. Table properties is the term for it within Spark SQL; you can turn it on, and after you do, your table will start producing the Change Data Feed that you can scan from. There’s also a Spark cluster-level configuration that you can use to turn on Change Data Feed for all Delta tables that are created within your cluster.
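For reference, the table property and the cluster-level default look like this in Spark SQL (the table name and schema here are just examples):

```sql
-- Enable CDF on an existing table:
ALTER TABLE silverTable SET TBLPROPERTIES (delta.enableChangeDataFeed = true);

-- Or enable it at creation time:
CREATE TABLE silverTable (country STRING, numVaccinated INT, availableDoses INT)
  TBLPROPERTIES (delta.enableChangeDataFeed = true);

-- Cluster-level default: all Delta tables created in this session produce CDF.
SET spark.databricks.delta.properties.defaults.enableChangeDataFeed = true;
```

Note that the feed only exists from the version where the property was enabled onward; you cannot query changes for earlier versions.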
And then, once you’ve configured the table to produce a Change Data Feed, you can query it using the table_changes function, which lets you see the changes in the feed, either in a particular version range or within a particular range of timestamps. The result of the table_changes function is just a normal Spark SQL data frame, so you can transform it, split it, display it, or store it into another table for use in later downstream processing.
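Both forms of the query can be sketched like this (the table name and the version/timestamp values are example placeholders):

```sql
-- By version range (the end version is optional; omit it to read to the latest):
SELECT * FROM table_changes('silverTable', 2, 4);

-- By timestamp range:
SELECT * FROM table_changes('silverTable', '2021-04-21 05:45:46', '2021-05-21 12:00:00');
```

Each returned row carries the table’s own columns plus the `_change_type`, `_commit_version` and `_commit_timestamp` metadata columns.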
And yeah, here you can see a short example of how the table_changes function can be used within a notebook. You can see that the user here has queried the changes for this silver table, from version 2 to version 4, ordered by the timestamp of the commit. The result of that query is just a data frame, showing the row data for each of the change events, plus the metadata indicating what type of change event it is and when it happened, both in commit version and in wall-clock time.
All right. So yeah, next up we have Rahul, who will be showing us a practical demo of how Change Data Feed can be used within Databricks notebooks.
Rahul Mahadev: In this demo, let’s explore the Change Data Feed feature of Delta Lake and see how it can be used to keep derived tables in sync. In this example, we have two tables; let’s call them silver and gold. The silver table tracks the number of people vaccinated and available doses for every country, and the gold table tracks the vaccination rate for each country. First, let’s start off by creating these tables, silver and gold. As you can see here, we have the country along with the number of people vaccinated and how many doses. So, this is our silver table. Let’s quickly create a gold table.
So, we have the same number of rows in the gold table, but we have a different column, called vaccination rate, which is derived from the columns in the silver table. Now, to propagate any future updates, deletes and merges on the silver table to the gold table, let’s enable Change Data Feed on the silver table. It’s as simple as running an ALTER TABLE command to enable Change Data Feed. So, let’s just run it. And now any future updates and merges should generate a Change Data Feed on the silver table. So, let’s try this out by running an update and a merge on the silver table.
So, here we have an update where, for one country, we’re setting the number vaccinated to a new value. And we also ran an upsert, where the new data frame contained two new rows, as well as an existing row with updated values. This completed, so let’s see what the silver table looks like now. We went from five rows to seven rows, so two rows were added and two other rows were updated. The first step of propagating these changes is to actually read the Change Data Feed, and to do that, it’s as simple as using the function called table_changes. As parameters to this function, we pass in the table name, along with the starting and ending commit versions between which we want the Change Data Feed. So here, we’re going to pass the table name, the silver table, and we’re going to pass in the starting and ending versions.
So, we did these two operations starting from version two, so let’s see if that works. As you can see here, we get the data rows of the table, along with some additional metadata columns which indicate what type of change it was. For the pure inserts, we have the change type column indicating that it was an insert. But for updates, we have two rows: the pre-image, containing the older values, and the post-image, containing the new values. Now, let’s use the change data to actually update the gold table. We’re going to use the merge command to do this, and what’s different here is that, for the source table of the merge, we will be using the Change Data Feed. So, we use table_changes and we filter out the pre-image rows for this use case, because we only care about the final value of each row, not what it used to be. And the clauses are: when the rows match, we update the vaccination rate column, and when the rows don’t match, we insert a new row.
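A sketch of what such a merge could look like. The table and column names here are guesses based on the demo’s description, and it assumes at most one post-image per key in the version range; if a key could change more than once, you would first deduplicate on `_commit_version`:

```sql
MERGE INTO goldTable g
USING (
  SELECT country, numVaccinated, availableDoses
  FROM table_changes('silverTable', 2, 3)
  WHERE _change_type != 'update_preimage'
) s
ON g.country = s.country
WHEN MATCHED THEN
  UPDATE SET g.vaccinationRate = s.numVaccinated / s.availableDoses * 100
WHEN NOT MATCHED THEN
  INSERT (country, vaccinationRate)
  VALUES (s.country, s.numVaccinated / s.availableDoses * 100)
```

Using the change feed as the merge source means only the changed rows are scanned, instead of the whole silver table.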
Let me quickly run the merge command and see if that actually updated the gold table. As you can see, we have two new rows in the gold table, and updated values for the other row as well. So, we saw here how, in batch mode, we can use table_changes to keep two tables in sync. Now, to do this in a more automated manner, let’s say we wanted to set up a stream from the silver table and ingest all the incremental changes into the gold table. That is possible as well, so let’s actually run a stream and see how that can be done.
So, here we have the first step, where we actually read the Change Data Feed from the silver table. What’s different here is that you just need to provide an additional option, called readChangeFeed, to the streaming query. And let’s provide the starting version as latest, because we already ingested some changes using the batch APIs; the future changes are what we need to ingest, so let’s provide the starting version. And again, let’s filter on the change type, because we don’t really care about the pre-images here either. The second step is to define a method that can be used for each batch. This method is similar to what we did earlier, doing a merge for each batch: when we see matched rows, we update the existing rows in the gold table, and when we don’t see matched rows, we insert new values into the gold table.
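The per-batch merge logic can be modeled in plain Python. This is a toy stand-in for the real foreachBatch merge, which runs against Delta tables; the column names follow the demo’s description:

```python
# Toy model of the per-batch upsert: each micro-batch of change rows (with
# pre-images already filtered out) is merged into the gold table, modeled here
# as a dict of country -> vaccination rate. Not Spark code.

def upsert_batch(gold, batch):
    """batch: change rows carrying the silver columns and _change_type."""
    for row in batch:
        # Matched key -> update, unmatched key -> insert; either way the
        # derived vaccinationRate column is recomputed from the silver columns.
        gold[row["country"]] = row["numVaccinated"] / row["availableDoses"] * 100

gold = {"India": 10.0}
batch = [
    {"country": "India", "numVaccinated": 30, "availableDoses": 100,
     "_change_type": "update_postimage"},
    {"country": "Brazil", "numVaccinated": 5, "availableDoses": 50,
     "_change_type": "insert"},
]
upsert_batch(gold, batch)
# gold now has the updated rate for India and a new row for Brazil.
```

In the real pipeline, this function body would be a Delta MERGE executed once per micro-batch by Structured Streaming’s foreachBatch.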
Now, let’s actually run the stream to write out these changes into the gold table. I’ve provided that foreachBatch method here, and I’m going to give a processing time of two seconds and start the stream. So, the stream has been started, and now let’s do updates on the silver table. Similar to what we had before, we have an upsert; let’s run it. As you can see, this seems to have been processed by the stream. Let’s see if the silver table and gold table actually changed. The silver table now has nine rows, including the updated rows. And let’s see if these actually made it into the gold table. As you can see, the gold table was updated by our stream as well. In this tutorial, we saw how to use the batch APIs and streaming APIs for the Change Data Feed feature of Delta Lake. Thank you.
Itai Weiss: Thanks, Rahul. Really good demo. Now we all know how to use Change Data Feed in SQL and Python and process changes downstream. Please fill out your feedback and let us know how the session was, what you enjoyed and what we can do better, so we’ll be able to serve you better next year. Thank you for joining this session. Goodbye.
Rahul Mahadev is a Software Engineer at Databricks. He is a developer focusing on building Delta Lake and Structured Streaming. Rahul received his MS in Computer Science from the University of Illinoi...
Information Solution Architect with over 20 years of experience. Extensive background in Data Management, Big Data, Information Systems, Data Governance as well as process and project management. Impl...
Jose is a software engineer working on the Spark execution engine and Delta Lake. He holds a bachelor’s degree in computer science from Caltech.