How Databricks Leverages Auto Loader to Ingest Millions of Files an Hour

May 27, 2021 12:10 PM (PT)

Continuously and incrementally ingesting data as it arrives in cloud storage has become a common workflow in our customers’ ETL pipelines. However, managing this workflow is rife with challenges, such as scalable and efficient file discovery, schema inference and evolution, and fault tolerance with exactly-once guarantees. Auto Loader is a new Structured Streaming source in Databricks that serves as our all-in-one solution to these challenges.


In this talk, we’ll discuss how Auto Loader:

  • Can discover files efficiently through file notifications or incremental file listing
  • Can scale to handle billions of files as metadata and still provide exactly-once processing guarantees
  • Can infer the schema of data and detect schema drift over time
  • Can evolve the schema of the data being processed
  • Is used within Databricks to ingest millions of files that are being uploaded every hour efficiently
In this session watch:
Pranav Anand, Software Engineer, Databricks



Pranav: Hi, everyone, I’m Pranav, and today we’re going to talk about how Databricks and many of its customers are using Auto Loader, a new Structured Streaming source, to ingest data at petabyte scale. The first step in any data pipeline, which we can see here from a 10,000-foot view, is data ingestion. What we want to do is take these input files, which can contain unstructured or semi-structured data, put them into cloud storage, say an S3 bucket like our logs bucket here, and finally process that data into some final form. This is where data ingestion comes in.
What a lot of our customers, and we ourselves at Databricks, use for this purpose is Structured Streaming. It allows us to take data as it’s coming in, so we get the freshest data and process it right away. Structured Streaming has many facets that enable easy data ingestion. For example, it helps with restarting from failure and doing complex transforms, but there are actually still a lot of challenges associated with data ingestion. Let’s look at what some of those challenges are. Take the file stream source, an existing Structured Streaming source, and how it could fit into the kind of data pipeline we’ve been talking about. Here we have that pipeline using the file stream source to list files at an input directory.
In Structured Streaming we have this concept of triggers. On each trigger, we look at what files exist at the source and decide which to ingest into our final form. Those triggers can be set by the user at arbitrary granularity, so you can use Trigger.Once to imitate something more like a batch workflow, or anything else you want. Let’s take a periodic trigger, as in this example. At our first trigger, we would list all the files that exist in our logs directory.
We would get back, say, A.JSON and B.JSON, and we would transform that data into our final form. The issue occurs on our next trigger. At T equals five, we list the whole directory again, and although we do find this new file that wasn’t there before, C.JSON, we are also relisting the files we’ve already ingested, A.JSON and B.JSON. You can see how this becomes a problem: once we have a directory with millions of files, relisting that whole directory just to find a couple of new files would be extremely slow and extremely expensive.
What I also want to draw your attention to here is that we have a data structure to deduplicate the files we’ve seen so far, so that we don’t end up with duplicate data in our final table. Even though we’re relisting A.JSON and B.JSON, we wouldn’t re-ingest them, because we’ve recorded them in this seen-file-paths map. But the data structure we’re using here is an in-memory map, and an in-memory map does not scale, especially when it lives in the driver. Once your stream reaches a certain load, you might have to increase the size of the driver, and it just remains a headache.
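To make the relisting cost concrete, here is a minimal Python sketch of how a listing-based source like this behaves. It is not the real file stream source; `list_files`, `ingest`, and `run_trigger` are illustrative names.

```python
# Toy model of a listing-based streaming source: every trigger relists the
# ENTIRE directory, and an in-memory set on the driver tracks what was ingested.
seen_paths = set()

def run_trigger(list_files, ingest):
    """One micro-batch: relist everything, ingest only unseen files."""
    all_paths = list_files()           # O(total files) on EVERY trigger,
    new_paths = [p for p in all_paths  # even if only a handful are new
                 if p not in seen_paths]
    for p in new_paths:
        ingest(p)
        seen_paths.add(p)
    return new_paths

# Trigger 1: directory holds a.json and b.json -> both ingested.
listing = ["a.json", "b.json"]
run_trigger(lambda: listing, lambda p: None)

# Trigger 2: c.json arrives -> a.json and b.json are relisted but skipped.
listing.append("c.json")
assert run_trigger(lambda: listing, lambda p: None) == ["c.json"]
```

Both the full relist and the ever-growing `seen_paths` set are the scaling problems the talk describes: listing cost grows with total files, and the dedup map grows without bound in driver memory.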
Let’s look at another challenge that often plagues data ingestion, and that challenge is schema. The very first place this challenge creeps in is setting the schema in the first place: with the file stream source, you have to go and manually infer the schema of the files that exist in your logs directory. You would take, say, that A.JSON and B.JSON, look inside, and find that they both have an ID and a name. So you would set the schema manually for your stream to be ID as an int and name as a string, and say, now I’m ready to go, ready to kick off my data ingestion. But shortly after, you end up getting new data in your input files, potentially from some change upstream, that isn’t of the form you expected.
And this is extremely common. What we have here is F.JSON, which contains a new column, age, that we weren’t expecting when we manually set our schema. What would happen normally, using the file stream source, is that you would just have data loss; there’s no really good way to deal with this. You could build other pipelines to look at what data has arrived, see if you’re missing anything, and deal with it at a later point, but this is ideally not what you want to do. And it isn’t even a one-time issue; it can happen constantly as your input data sources evolve.
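A tiny sketch (not from the talk) shows why a fixed, manually-set schema loses data silently; the "schema" here is just a list of expected column names for illustration.

```python
import json

# With a fixed schema, columns outside the declared set are simply dropped.
schema = ["id", "name"]

def read_with_fixed_schema(raw_line):
    record = json.loads(raw_line)
    # No error is raised for unexpected columns -- they vanish without a trace.
    return {col: record.get(col) for col in schema}

row = read_with_fixed_schema('{"id": 23, "name": "Alex", "age": 31}')
assert row == {"id": 23, "name": "Alex"}   # "age" is lost silently
```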
So this is where Auto Loader comes in. We took a look at all of these challenges and built this new Structured Streaming source, which solves the biggest of them, mainly scalability and schema management. Let’s take a look at how it does that. Firstly, we have a new mode called file notification mode, and to explain it, I need to introduce a couple of new concepts. One is file notifications, which are generated by cloud storage providers. When a new file comes in, you can set up cloud storage providers, such as AWS with S3, to generate a notification for that file as it arrives.
So in this case, as we’re getting our input files, each time a new input file comes in, it generates a notification. The other concept we have is cloud queues. Cloud queues are also provided by the cloud providers and can store these notifications, something like this. Here we have A.JSON coming in, and as it lands in our S3 bucket for logs, we get a file notification generated for that file. We can put that notification into a cloud queue, which is large, distributed, and relatively fail-safe.
You can see where I’m going with this: we can then use this cloud queue to pull whatever files have arrived into Structured Streaming to finally process. That’s what Auto Loader does; it’s essentially consuming from this queue, and once the data has been processed, it deletes those file events from the queue. This completely obviates the need for listing. Files are ingested as they arrive, which reduces the latency between a file arriving and it showing up in your structured data. And for the other issue we had earlier, where we had a map to track which files had already arrived, leading to a whole bunch of problems with driver sizes just to keep enough memory to contain that map, instead we’re using RocksDB, which is a key-value store built for point lookups and which scales indefinitely.
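Here is a toy model of that flow, assuming nothing about the real implementation: a `queue.Queue` stands in for the cloud queue (SQS and the like), and a plain dict stands in for Auto Loader’s RocksDB store.

```python
from queue import Queue, Empty

# Notification-based discovery: no listing anywhere -- we only see files
# that announce themselves via the queue.
cloud_queue = Queue()
rocksdb = {}  # path -> True once ingested (point lookups, not a full scan)

def on_file_uploaded(path):
    cloud_queue.put(path)   # the storage service generates this notification

def process_available(ingest):
    processed = []
    while True:
        try:
            path = cloud_queue.get_nowait()   # consume AND remove the event
        except Empty:
            break
        if path not in rocksdb:               # exactly-once via the key-value store
            ingest(path)
            rocksdb[path] = True
            processed.append(path)
    return processed

on_file_uploaded("logs/a.json")
on_file_uploaded("logs/a.json")   # duplicate notification delivered twice
assert process_available(lambda p: None) == ["logs/a.json"]
```

The key properties the talk describes fall out directly: events are deleted after processing, duplicates are filtered by point lookups rather than a driver-resident map, and work is proportional to new files, not total files.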
With these concepts of file notification mode and RocksDB for file deduplication, we aren’t really hitting any scalability limits anymore. But you might be wondering: what if I don’t want to ingest just the new files as they arrive? What about my existing files, the ones already sitting in my logs directory? We’ve dealt with that too, and this is the philosophy of Auto Loader as a whole: we try to do the best thing by default, but always provide the knobs for you to adjust Auto Loader to your workload. Here we have what we call backfill. A.JSON is a new file arriving in your bucket, but you already have this capital A.JSON in your cloud storage.
What we can do is include those existing files by listing them, while ingesting new arrivals from the cloud queue we talked about before, and put both sets of files into an internal queue from which we’ll pull into RocksDB and on into your structured table. And this is all done super simply: just a couple of options and a few lines of code set up this whole thing, from RocksDB to the cloud queues to the event notifications. It’s an incredibly simple way to implement an essentially infinitely scalable data ingestion workflow.
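As a rough sketch of what those "few lines of code" might look like, here is an illustrative PySpark fragment (not taken from the talk). The option names follow the public `cloudFiles` API, but the paths and checkpoint locations are placeholders, and this assumes an existing `spark` session on a Databricks cluster.

```python
# Hedged sketch: Auto Loader stream with file notification mode and backfill
# of existing files. All paths below are placeholders.
df = (spark.readStream
      .format("cloudFiles")
      .option("cloudFiles.format", "json")
      .option("cloudFiles.useNotifications", "true")      # file notification mode
      .option("cloudFiles.includeExistingFiles", "true")  # backfill files already present
      .load("s3://my-bucket/logs"))

(df.writeStream
   .option("checkpointLocation", "s3://my-bucket/_checkpoints/logs")
   .start("s3://my-bucket/bronze/logs"))
```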
Let’s talk about another challenge that we saw earlier: schema. The first thing was that we had this annoying step of having to manually infer our schema before we could even get the stream started; I want to start my stream right away. Auto Loader allows you to automatically infer the schema of the files that are there from a sample of the files in your bucket. Here we’re going to automatically infer that ID is an int and name is a string, and now we’re ready to go. This is where we got stuck last time: we got this new column and we didn’t really know how to handle it.
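A simplified sketch of inference over a sample, with made-up logic: union the keys seen across sample records and pick a type per column. (As the demo later shows, the real Auto Loader defaults to inferring everything as string; here we infer int versus string just for illustration.)

```python
import json

def infer_schema(sample_lines):
    """Infer {column: type} from a sample of JSON lines (toy version)."""
    schema = {}
    for line in sample_lines:
        for key, value in json.loads(line).items():
            inferred = "int" if isinstance(value, int) else "string"
            # Conflicting types widen to string, the most permissive choice.
            if schema.get(key, inferred) != inferred:
                inferred = "string"
            schema[key] = inferred
    return schema

sample = ['{"id": 23, "name": "Alex"}', '{"id": 7, "name": "Sam"}']
assert infer_schema(sample) == {"id": "int", "name": "string"}
```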
What you can do is use Auto Loader with Delta to handle it in the best way. And what the best way really is, at least in some theoretical example, is to ingest all of our data, without losing any of it, to some location, say a bronze table. It contains all the data in some structured way, but it’s not necessarily what we’re going to use for BI or ML use cases. It’s just a store of information that future tables can pull from and further refine.
That’s why we have schema evolution. What we can do with Delta and Auto Loader is, once we see this new F.JSON, fail the stream, since we don’t recognize this new age column. On stream restart, our schema will evolve to contain the new column: Delta will realize there’s a new column and add that age column to its schema. And this happens basically automatically. If you allow, for example, Databricks Jobs to restart your stream automatically, this can keep happening: as your input files evolve their own schema, your final structured tables evolve with them, with almost no manual intervention. And again, this is done super simply; you just set one option and the whole thing is handled for you. Here I talked about one specific schema evolution mode, called add new columns, which adds any columns we didn’t recognize initially, but we have other schema evolution modes as well.
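The fail-then-restart flow above can be sketched in a few lines of plain Python; everything here (`UnknownFieldError`, `process_batch`) is an illustrative name, not the real API.

```python
# Toy model of the add-new-columns flow: the stream fails when it sees an
# unknown column, and a restart picks up the widened schema.
class UnknownFieldError(Exception):
    pass

def process_batch(schema, records, allow_evolution=False):
    for record in records:
        new_cols = set(record) - set(schema)
        if new_cols:
            if not allow_evolution:
                raise UnknownFieldError(sorted(new_cols))
            for col in new_cols:          # "restart": evolve schema, then continue
                schema[col] = "string"
    return schema

schema = {"id": "int", "name": "string"}
records = [{"id": 23, "name": "Alex", "age": 31}]
try:
    process_batch(schema, records)        # first attempt fails the stream
except UnknownFieldError:
    process_batch(schema, records, allow_evolution=True)  # restart evolves it
assert "age" in schema
```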
We also have rescued data. Rescued data is similar to add new columns in the sense that it handles unanticipated changes upstream in your input files, but it deals with a whole set of other issues, for example data type mismatches, or case sensitivity issues. All of that data is quarantined into a new column called rescued data. In this example, when F.JSON comes in, we would have the record with ID 23 and name Alex, and the age 31 would be contained in a blob in the rescued data column. This gives you the flexibility of saying: maybe I’m too suspicious to evolve my schema all the time; instead I just want to keep that data. There’s no data loss, but I keep the flexibility to deal with it how I want. Maybe I don’t want to evolve my schema, maybe this is garbage data and I want to throw it out, or maybe I do want to evolve, but only for some set of columns, which you can do later on.
Some of the other schema evolution modes we have are fail on new columns, which, as you’d expect, just fails the stream and doesn’t evolve the schema, though it does still capture data type mismatches and so on; and none, which is similar to the file stream source in that you would have data loss. So if you want to use Auto Loader exactly as you’ve been using your existing streams, that schema evolution mode is for you, while still giving you Auto Loader’s scalability. Let’s take a look at the demo and see all of this in practice.
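Before the demo, here is a deliberately simplistic sketch of the rescued-data idea: values that don’t match the expected type are moved into a `_rescued_data` column instead of failing the stream or being dropped. The type checks and function name are mine, not Auto Loader’s.

```python
import json

def parse_with_rescue(raw_line, schema):
    """Parse one JSON record, quarantining mismatches into _rescued_data."""
    record, row, rescued = json.loads(raw_line), {}, {}
    checks = {"int": lambda v: isinstance(v, int),
              "string": lambda v: isinstance(v, str)}
    for col, value in record.items():
        expected = schema.get(col)
        if expected in checks and checks[expected](value):
            row[col] = value
        else:
            rescued[col] = value      # type mismatches AND unknown columns
    row["_rescued_data"] = json.dumps(rescued) if rescued else None
    return row

schema = {"id": "int", "name": "string"}
row = parse_with_rescue('{"id": 23, "name": "Alex", "age": 31}', schema)
assert row["id"] == 23 and row["name"] == "Alex"
assert json.loads(row["_rescued_data"]) == {"age": 31}
```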
Here we have an Auto Loader stream that’s reading from S3 and writing to Delta, and we can see some of the options we’ve talked about in use. One is the schema evolution mode, where we’re adding new columns. What this means is that as new columns we didn’t anticipate arrive, we will add them to the schema of the Delta table at the end, once they’re ingested. We’re also using file notification mode, which sets up the S3 event notifications as well as the cloud queue that takes those notifications and stores them; Auto Loader reads the notifications from that queue in order to ingest the files, instead of listing the directory each time to find new files. And we also have this interesting option called schema hints, which we’ll come back to later. Let’s see the stream running.
So it’s running, and it has processed some files in the past. Let’s add some new data of the form we’ve already seen. This is going to add a new file, test three; it will generate a new notification, which gets picked up by the cloud queue, which then gets picked up by Auto Loader, put into RocksDB, and ingested. As we can see, this is working as intended. Let’s take a look at what the data looks like. Here we can see data that looks pretty healthy so far; there doesn’t seem to be anything wrong with it. Let’s look at the schema in particular, and here we can see that all of these columns are actually strings.
This is something Auto Loader tries to do: be very permissive and very non-error-prone in the initial version, while providing you the knobs to get the behavior you want if you know what you’re going for. In this case, everything is inferred as a string by default, so we don’t run into any inference issues where some columns may be inferred as longs versus doubles and so forth. We also have this column called rescued data, which we talked about. It captures all the data type issues that can occur: if something is inferred as a long, for example, and then along comes a float, that data would be essentially quarantined into this rescued data column.
Let’s take a look at what happens when new data arrives that isn’t of the form we expected, specifically in that there’s a new field we didn’t anticipate. So we added that new file; let’s go up to our stream. Our stream has stopped with an unknown field exception, because it saw this column, metadata, which wasn’t in our initial schema, as we see here. Normally, if we were using the file stream source, our stream would have continued without any knowledge of this, and we would have lost that data if there weren’t other mechanisms in place to catch it. Here, all we have to do is restart the stream.
Because we set our schema evolution mode to add new columns, the restart takes this metadata column and adds it to the schema of the Delta table we were writing to. If we list the latest data, we can see that there’s a new metadata column, a string column containing the new data from the file we just added. That’s how simple it really is. But let’s take a look at some other examples. Here we had inferred everything as a string, so let’s see what happens if some data arrives with a timestamp that’s not of the correct format. Adding the new data and looking at the latest data after inserting, we get this kind of format, which we don’t really like.
If you recall what our timestamps looked like earlier, they used to be in this nice format; this is more like scientific notation, which we don’t really want. But since we were using strings, there was nothing to catch this from happening. This is where schema hints, which I mentioned at the very beginning, come in. Let’s reset our stream and see how schema hints can help us out here. Schema hints are for cases where you are expecting data of some specific format. Let’s say I know that signal strength is a float, timestamp is a long, and metadata is a map of string to string. I can force my schema to have these columns, and any data coming in will be cast to these types. So let’s restart the stream, and we can now see that our table schema has changed to reflect this: we have signal strength as a float, timestamp as a long, and metadata as a map of string to string.
Now let’s go back and add more data with this weird timestamp value, and take a look at… Here we go: all this new data of the strange format gets put into the rescued data column, since there was a data type mismatch between what we explicitly said we want and what we actually got. This is really powerful, because it lets us really understand what’s happening upstream in our input files, and also gives us the flexibility to deal with it how we want later on. If we deem this to be just garbage data entirely, we can discard the whole row, or we can figure out some kind of logic to insert these timestamps into our later tables, which might be more refined. Let’s also take a look at the metadata column, now that we’ve added the hint for metadata to be a map of string to string, rather than just a string.
Taking a look now, we can see that all of that string data for metadata is now a nice map that we can go into and manipulate much more easily. And all of this was done through just a few lines of code; that’s the idea behind the simplicity of Auto Loader. In practice, Databricks has been using it for our internal logs pipeline to ingest 50-plus terabytes of data per day, and dozens of our customers are using it to ingest tens of petabytes so far. Hopefully, you’re going to be one too. We appreciate all feedback, and thank you.

Pranav Anand

Pranav Anand is a Software Engineer at Databricks. He has been developing Auto Loader and Delta Lake to simplify the lives of Data Engineers. Pranav received his BS in Computer Science at University o...