Accelerating Data Ingestion with Databricks Autoloader

May 27, 2021 11:35 AM (PT)


Tracking which incoming files have been processed has always required thought and design when implementing an ETL framework. The Autoloader feature of Databricks looks to simplify this, taking away the pain of file watching and queue management. However, there can also be a lot of nuance and complexity in setting up Autoloader and managing the process of ingesting data with it. After implementing an automated data loading process at a major US CPG company, Simon has some lessons to share from the experience.

This session will run through the initial setup and configuration of Autoloader in a Microsoft Azure environment, looking at the components used and what is created behind the scenes. We’ll then look at some of the limitations of the feature, before walking through the process of overcoming these limitations. We will build out a practical example that tackles evolving schemas, applying transformations to your stream, extracting telemetry from the process and finally, how to merge the incoming data into a Delta table.

After this session you will be better equipped to use Autoloader in a data ingestion platform, simplifying your production workloads and accelerating the time to realise value in your data!

Simon Whiteley, Director of Engineering, Advancing Analytics



Simon Whiteley: Hello, and welcome to Accelerating Data Ingestion with Databricks Autoloader. My name is Simon Whiteley, and I’m a director of engineering. What that actually means is I build a lot of data lakes. I spend a lot of time with my clients figuring out how to get all their various bits of data, which come in at different times, every minute, every hour, every day, how to work out what data they’ve read and what they haven’t, and how to get that into a lake. Data ingestion is fairly complicated; it’s a bit of a challenge.
Now, Autoloader is a fairly new feature in Databricks that allows us to simplify some of that process. But there are some gotchas, some things you need to know, and a bit of config you need to understand before you tackle it. So I’m going to take you through some of that in this session.
We’ll be looking at why incremental loading is hard, the various patterns we’ve all been using for years, and some of their limitations. We’ll have a look at the components that go into Autoloader, and then some of the patterns we actually use in production, which I’ve used with some major US manufacturing companies to implement this in real life. Then we’ll have a look at evolution: what happens if my data changes? One of the most painful parts of any kind of data ingestion is managing schema drift and schema evolution. And then I’ll share some lessons learned, some config things we missed the first time around that are important to know.
Okay, so let’s first have a look at why incremental loading is hard. There are a few different patterns people use to try and get over this, and they’re all trying to solve the same problems.
So firstly, we’ve got this scenario. We’ve got a couple of CSV files, and I’m loading them into a Delta table, so I’m getting them into my lake via a load process. That’s nice and easy; I can just do it in batch and suck all my data up. But the challenge comes when the next CSV comes along. I then have to kick off the next process, and how do I work out what I have and haven’t read? There are some golden rules we need to follow, whatever pattern of ingestion we’re using, to make sure we’re doing it properly.
So, a couple of golden rules for what we need our ingestion engine to do. One, it should only read new files. I don’t want to have to go back and reread the files each time; if I’ve got an ever-growing pile of files, I don’t want it to get slower and slower as it rereads all my data. I don’t want to miss files; that’s the worst thing in the world. If a file comes in and my process doesn’t notice it, I don’t see those records, and my figures and my reporting are all wrong. And I need it to trigger immediately.
Now, that’s not always a golden rule. We don’t always need that immediate, snappy behavior, but it’s good to have the ability. If a file comes in and it automatically gets processed and streamed into my lake, that is fantastic, so a lot of ingestion engines are trying to do that. Now, speaking of patterns, it needs to be reusable. I don’t want to have to build a new thing each time there’s a new data set: I’ve got a new table, my vendor has added a new table to their database, or I’ve got a new source that might be interesting.
I just want to run the same script with different parameters, without building something new each time I get a new dataset. And I need it to not slow down. A lot of existing approaches read the entire list of files. They might not reread the data, but that growing list of files becomes trickier and trickier to manage. Those are the problems I’m normally facing.
So there are three different ways I’ve built this in the past that I want to talk through, just to say, “Here are some ways you can build ingestion engines.” Number one, which we see all the time, is the metadata-driven ETL approach. Essentially, I’ve got my list of files, some files that are currently in the Blob store, and I’m saying, “I need to keep track of the last one I read.” So I’m managing that manually, keeping track of the data to say, “I’ve read that file, I haven’t read that one.” Then I can pass that directly into my DataFrame and say, “Go and load that data, that specific piece.” That’s great. It’s nice and repeatable, and it’s all parameter driven. But it’s not that reactive: I have to poll for new files, and I have to trigger it. I also have to manage where that metadata lives, which has its own challenges. But it’s the most common approach you see all over the place.
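A minimal sketch of the metadata-driven pattern in plain Python, assuming the “metadata” is simply a JSON file listing processed file names (a real framework would more likely keep this in a control table). The function names are hypothetical, and something outside the sketch still has to list the folder and trigger each run, which is the polling burden just described.

```python
import json
from pathlib import Path


def load_processed(log_path: Path) -> set[str]:
    """Read the set of file names already recorded as processed."""
    if not log_path.exists():
        return set()
    return set(json.loads(log_path.read_text()))


def find_unprocessed(listing: list[str], processed: set[str]) -> list[str]:
    """Compare the current folder listing with the metadata log."""
    return sorted(name for name in listing if name not in processed)


def mark_processed(log_path: Path, processed: set[str], loaded: list[str]) -> None:
    """Record newly loaded files; call this only after the load succeeded."""
    processed.update(loaded)
    log_path.write_text(json.dumps(sorted(processed)))
```

Each run would list the landing folder, pass `find_unprocessed(...)` to the DataFrame reader as the files to load, and call `mark_processed` once the load has committed.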
Number two, I can just use file streaming. Spark can read a directory of files as a stream and read in anything it needs to. It says, “Here’s my list of files on one side, and here’s what I have read,” and it keeps track of the files it’s read so far. The challenge is how it does this for each micro-batch. Each time it finishes writing and starts a new batch, it lists the entire directory structure again, compares that to its entire history of files it’s read, works out what’s missing, and then loads those files.
And that’s challenging. When we’re talking about hundreds of thousands of files, a long-term landing area in Blob with data constantly coming in, it really starts to slow down, because the directory listing grows and grows. So the streaming approach has its own challenges.
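A toy model (not Spark’s actual implementation) of why listing-based file streaming slows down: every micro-batch lists the whole directory and diffs it against the history of files already read, so the work per batch grows with the total number of files even when only one new file has arrived.

```python
def simulate_listing_stream(arrivals_per_batch: list[list[str]]) -> list[int]:
    """Toy model of a listing-based file stream.

    Before each micro-batch some files arrive; the micro-batch then lists the
    whole directory and diffs it against every file it has already read.
    Returns how many directory entries each micro-batch had to scan.
    """
    directory: list[str] = []
    already_read: set[str] = set()
    scanned = []
    for arrivals in arrivals_per_batch:
        directory.extend(arrivals)
        listing = list(directory)                  # full listing, every batch
        scanned.append(len(listing))
        new_files = [f for f in listing if f not in already_read]
        already_read.update(new_files)             # "load" only the new files
    return scanned


print(simulate_listing_stream([["day1.csv"], ["day2.csv"], ["day3.csv"]]))  # [1, 2, 3]
```

With one new file per batch, each run still scans every file that has ever landed just to find the single new one.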
Now number three, we can build something ourselves. We can use the services in Azure or AWS to extend our own architecture. In Azure land, a Blob store can have a file trigger, so it pumps out a message that triggers a Logic App. That Logic App decides what kind of files should be processed and calls a function, which calls the Databricks Jobs API and passes a parameter. And that’s fine, I can build that. But then I get to the scenario of, “What about the next data set? What about the next time we need to spin this up? Am I constantly deploying new Logic Apps, new Azure Functions, new Azure infrastructure each time I want to track a new file?”
So of our three different approaches, we’ve got metadata-driven. It’s nice and repeatable and fairly simple, but I have to poll, I have to figure out how to do it, and I need to build it manually; it’s not out of the box. With file streaming, sure, it’s repeatable, it’s parameterizable and it’s immediate: it can sit there constantly streaming, and as soon as a file lands, it streams through. But it gets slower and slower as my directories get bigger, it has some limitations from having to stream, and it’s not very good if my data changes.
So there are limitations in the existing file streaming approach. And I can build my own DIY solution, stitching some things together. It’s immediate, and it triggers as soon as a file lands, but it’s not that repeatable. So there are challenges across all the different ways we currently work.
Let’s talk about Autoloader. Autoloader is specifically designed to resolve this challenge: we’ve got some approaches, but none of them are perfect, and that’s what it’s trying to fix.
When Databricks came out with Autoloader, this is how they described it: “It’s an optimized cloud file source for Spark. It loads data continuously and efficiently.” Makes sense; that’s what we’re after. That’s from Prakash, in the original announcement blog. But when we actually compare it to those approaches, this is what it’s doing. Autoloader stores metadata about the files we’ve read so far, so it’s doing our metadata-driven thing, except it’s doing it for us. It uses Spark Structured Streaming, so it can be immediate and reactive: as soon as a file lands, it’ll start processing it. And it also uses cloud-native components such as Event Grid and a message queue to do the file watching.
So essentially, Autoloader takes the lineage of those three ways we have historically worked and says, “Do you know what? We’re going to take the best of them and bring them together into one out-of-the-box solution.” That’s the whole point of Autoloader: it works the way we normally work, except it’s done for us, which makes a lot of sense.
So let’s dig into what’s actually inside Autoloader and how it works. There are two main parts. First, there’s the CloudFiles DataReader. That’s just a normal Spark DataReader of the kind we use to read data in Spark. It’s a format: in the same way we might have a JSON DataReader or a CSV DataReader, we’ve got a CloudFiles DataReader, which is specifically for using Autoloader.
Second, as an extension to that, we’ve got the Cloud Notification Services. That’s the part that creates the file watches and the message queue, and we’ll talk about what that actually means.
Okay, so in this case we’re using the CloudFiles reader, and what it expects to see is a queue that already exists. We’ve got our Blob Storage at the bottom, with the list of files we can read in our landing Blob. But we’ve already got a queue set up that holds a list of messages, and each message is a file we haven’t read yet. So we’ve got this list of, “Here are all the files I need to read next time I execute.”
Now, the CloudFiles DataReader expects that to be set up in advance. When we kick off the DataFrame, the first thing it does is look in that queue and ask, “Is there anything I need to read?” And the answer comes back, “Yes. You need to read file four; you’ve not read that yet.” Then the actual DataReader kicks off, reads that JSON file and processes it as normal. So if we’re using Notification Services, the CloudFiles reader always looks for that Blob queue. That’s a switch we can flip to say, “Yes, I want you to use a queue rather than doing directory listing. Rather than working the way Spark Streaming normally works, where it has to read the entire directory structure each time, I want you to look in the queue and just read what you need to read.” And that makes it much easier.
Okay, so for the normal DataFrame, there are a couple of bits of information we need to give it. You can see we’ve got spark.readStream: it’s always a streaming DataFrame, never a batch one. We’ve got a format of cloudFiles, so we’re telling it to use Autoloader and that it’s going to be loading from a queue of messages. And because we’re telling it that, we also need to tell it that the data itself still has a format. So we’ve got that separation, two different bits to keep track of: Spark is told to use Autoloader via the cloudFiles format, and Autoloader is told to expect JSON via cloudFiles.format, the format the CloudFiles DataReader should look for. You’ll see a couple of those cloudFiles configurations, and we’ll step into detail on what they actually look like.
We’re also using cloudFiles.useNotifications. That’s the setting that switches it between directory listing, which gets slower and slower as things go on, and expecting a queue with a list of files it should read. So get used to these; there are a few different configurations, and we’ll look at some of the interesting ones as we get deeper into it.
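The reader setup just described might look like this in PySpark. It is a sketch: `spark`, `path` and `schema` are passed in, JSON is assumed as the file format, and in practice `cloudFiles.useNotifications` also needs the connection or queue settings the talk covers next.

```python
def read_landing_stream(spark, path, schema):
    """Streaming DataFrame over `path`, read through Autoloader."""
    return (
        spark.readStream                       # always a streaming read
        .format("cloudFiles")                  # tells Spark to use Autoloader
        .option("cloudFiles.format", "json")   # tells Autoloader the file format
        .option("cloudFiles.useNotifications", "true")  # queue instead of directory listing
        .schema(schema)
        .load(path)
    )
```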
Okay, so on the other side, we’ve got Cloud Notification Services. If we say, “Yes, please use Notification Services,” what happens? In Azure, we’ve got Blob Storage; that’s where our files come in, landing in Blob as we go.
Now, every Blob store has an Event Grid topic underneath it. That’s an automatic file watcher, something that’s always keeping track of anything that happens in that Blob store: a Blob being created, deleted or updated, someone editing permissions, and so on. Essentially, it’s a whole event tracker, and it fires whenever something happens. As with any other Kafka-style queue, we can have subscribers on that topic. So we say, “I want to subscribe to any changes on this Event Grid topic that pertain to files landing in this folder.” What that subscription then does is put messages into a Blob Storage queue.
And for each data set we’re tracking, say my customers table, my product table and my sales table, I might have a different subscription set up, each with its own queue. So when I kick off my Autoloader job, it looks at the relevant queue and grabs all the messages saying, “Here are the files I need to load.”
Okay, to break that down a little bit more: a CSV lands in our Blob Storage. As soon as it lands, it triggers a message into the Event Grid topic. My subscriber is watching that topic with certain filters set on it, and if the event matches the filter, it puts that message into my queue.
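To make that event flow concrete, here is a toy model in plain Python with no Azure SDK involved. The event type `Microsoft.Storage.BlobCreated` and the subject layout `/blobServices/default/containers/<container>/blobs/<path>` follow the Azure Blob Storage event schema; the `Subscription` class and `publish` function are hypothetical stand-ins for an Event Grid subscription with a subject-prefix filter and the storage queue it feeds.

```python
from dataclasses import dataclass, field


@dataclass
class Subscription:
    """Toy stand-in for an Event Grid subscription feeding one storage queue."""
    subject_begins_with: str
    included_event_types: tuple[str, ...] = ("Microsoft.Storage.BlobCreated",)
    queue: list[str] = field(default_factory=list)

    def offer(self, event: dict) -> None:
        # Only events that pass both filters become queue messages.
        if (event["eventType"] in self.included_event_types
                and event["subject"].startswith(self.subject_begins_with)):
            self.queue.append(event["subject"])


def publish(event: dict, subscriptions: list[Subscription]) -> None:
    """The storage account's topic fans every event out to each subscriber."""
    for sub in subscriptions:
        sub.offer(event)
```

One subscription per data set (for example one filtered on `.../blobs/customers/` and one on `.../blobs/sales/`) gives each Autoloader stream its own queue of files to read.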
So that is all of the Notification Services. You can build that manually yourself: go into Azure and say, “I want to create a new event subscription; here are the filters, and this is my queue.” Or you can let Autoloader build it for you. That’s all down to the config we set, so here’s a bit of config.
We’ve got the cloudFiles settings, and there are different parameters we can pass in. Do you want to use notifications? What’s the queue name? If we’ve already set it up ourselves and produced a queue, and we just want Autoloader to use that existing queue, we can give it the queue name and say, “Go and look at that.” Or we can say, “Here’s a connection string, and here are some details about a service principal.” Essentially, “Here’s how to connect to Azure, and here’s an identity that can create Azure resources for me. I want you to create an Event Grid topic subscription and a queue in a Blob store, and set up that small bit of architecture to start file watching and keep track of those messages.” So you do need to have that information set.
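The two configuration routes can be sketched as a small Python helper. The option keys (`cloudFiles.queueName`, `cloudFiles.connectionString`, `cloudFiles.resourceGroup`, `cloudFiles.subscriptionId`, `cloudFiles.tenantId`, `cloudFiles.clientId`, `cloudFiles.clientSecret`) are believed to match the Azure Autoloader notification options, but verify them against the current Databricks documentation; the helper itself is hypothetical.

```python
def notification_options(file_format, connection_string, *, queue_name=None,
                         resource_group=None, subscription_id=None,
                         tenant_id=None, client_id=None, client_secret=None):
    """Build Autoloader options for file notification mode on Azure."""
    options = {
        "cloudFiles.format": file_format,
        "cloudFiles.useNotifications": "true",
        # Storage connection string used to reach the queue.
        "cloudFiles.connectionString": connection_string,
    }
    if queue_name is not None:
        # Route 1: reuse a queue (and Event Grid subscription) you set up yourself.
        options["cloudFiles.queueName"] = queue_name
        return options
    # Route 2: let Autoloader create the subscription and queue, which needs a
    # service principal that is allowed to create those resources.
    service_principal = {
        "cloudFiles.resourceGroup": resource_group,
        "cloudFiles.subscriptionId": subscription_id,
        "cloudFiles.tenantId": tenant_id,
        "cloudFiles.clientId": client_id,
        "cloudFiles.clientSecret": client_secret,
    }
    missing = [key for key, value in service_principal.items() if value is None]
    if missing:
        raise ValueError(f"missing service principal settings: {missing}")
    options.update(service_principal)
    return options
```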
Okay, so those are the basics, the core parts we expect to see in Autoloader. Now we’re going to hop into a notebook and look at how I set that up, what we expect to see on the Blob Storage side of things, and what it looks like when we start reading some files.
Okay, so here we have a little picture of Azure. So this is my Blob Storage. I’ve got some files landing in here. And we’ll work through a couple of different examples throughout this talk. So starting off with, I’ve got my NYC taxis, the classic New York taxis data set, and I’ve got a single CSV. I’ve got some data living in my Blob store, and I want to start loading it.
I’ve got a notebook here that’s set up to go and read the data in. First things first, I have to tell Autoloader the schema of the data to expect. That has always been the case, but there have been some advancements that mean we no longer have to; we’ll come to that later. For now, we have to give it a struct. I’ve got the struct defined in JSON, and I’m converting it into a normal Spark struct, the same as I would with any other data set.
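A sketch of defining the schema as JSON and converting it to a Spark struct, assuming Spark’s own JSON schema representation (the format `StructType.json()` produces). The column names are illustrative yellow-taxi columns, not necessarily what the demo file contains, and PySpark is only needed when `to_struct` is called.

```python
import json

# Spark's JSON representation of a schema; columns are illustrative only.
TAXI_SCHEMA_JSON = """
{"type": "struct", "fields": [
  {"name": "VendorID", "type": "integer", "nullable": true, "metadata": {}},
  {"name": "tpep_pickup_datetime", "type": "timestamp", "nullable": true, "metadata": {}},
  {"name": "passenger_count", "type": "integer", "nullable": true, "metadata": {}},
  {"name": "fare_amount", "type": "double", "nullable": true, "metadata": {}}
]}
"""


def to_struct(schema_json: str):
    """Convert a JSON schema definition into a Spark StructType."""
    from pyspark.sql.types import StructType  # requires PySpark at call time
    return StructType.fromJson(json.loads(schema_json))
```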
So we’ve got this schema to pass into my DataFrame. I’ve then got a whole load of secrets. Inside my Key Vault, my secret store within Azure, I’ve got things such as my subscription ID, my service principal ID and its secret: all the bits of information needed to connect to Azure and ask it to create an Event Grid subscription and a queue, doing this kind of resource creation on my behalf.
I’ve also got a key that allows me to read that queue, so I know how to connect to the Blob Storage in which it’s going to create the new queue for me. So there’s a little bit of pre-work, a bit of setup you need to have done for this to work. I’m populating those variables from my Key Vault.
Then I’m building up a bit of config. I’ve got my cloudFile object, and inside it I’m piling up a few different options: the various things I’ve just grabbed from my secrets. I’ve got two extra things. One is the format of the file it should expect; I’ve uploaded a CSV, so it’s expecting a CSV. The other tells it, “Yes, please use notifications; I want you to use a queue.” I’m going to run that. And finally, because I’m reading from a Blob store, I give it a key for connecting to that Blob store when it reads. Normal Sparky kind of things.
Okay. Then we can actually build our DataFrame. I’ve got a file path variable telling it to look in my landing Blob, at the NYC taxi folder inside that container. Then I’m spinning up a new DataFrame using spark.readStream (again, always streaming) with a format of cloudFiles, and I’m unpacking that config into my DataFrame using the options method.
That’s the same as writing .option and passing each of these different things in as a separate option; this just packs them all together and sends them over as one set of options. We can go ahead and create that DataFrame. It uses the JSON schema that’s passed in, so it knows what it’s expecting. At this point, it hasn’t actually done anything.
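Putting the secrets, config and reader together, the pattern might be sketched as follows. The secret scope and key names are hypothetical; `dbutils.secrets.get(scope=..., key=...)` is the Databricks secrets utility, passed in as a parameter so the code does not rely on notebook globals. The dotted option names work with `**` unpacking because Python accepts any string keys in an unpacked dictionary.

```python
def build_cloudfile_config(dbutils, scope):
    """Collect Autoloader settings from a (Key Vault-backed) secret scope.

    The key names below are hypothetical; use whatever your scope contains.
    """
    def secret(key):
        return dbutils.secrets.get(scope=scope, key=key)

    return {
        "cloudFiles.subscriptionId": secret("subscription-id"),
        "cloudFiles.tenantId": secret("tenant-id"),
        "cloudFiles.clientId": secret("sp-client-id"),
        "cloudFiles.clientSecret": secret("sp-client-secret"),
        "cloudFiles.resourceGroup": secret("resource-group"),
        "cloudFiles.connectionString": secret("queue-connection-string"),
        "cloudFiles.format": "csv",
        "cloudFiles.useNotifications": "true",
    }


def read_taxis(spark, cloudfile, schema, file_path):
    # .options(**cloudfile) is equivalent to chaining one .option(key, value)
    # call per dictionary entry.
    return (
        spark.readStream
        .format("cloudFiles")
        .options(**cloudfile)
        .schema(schema)
        .load(file_path)
    )
```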
Let’s do a quick test. I want a quick new DataFrame that adds the file each row is coming from and does a quick aggregation: just tell me how many rows you’re getting from each file.
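The row-count test could be sketched like this, using `input_file_name()` from `pyspark.sql.functions` to tag each row with its source file; the column name `source_file` is an assumption. In a notebook you would pass the result to `display()`, which starts the stream.

```python
def rows_per_file(df):
    """Tag each streamed row with its source file and count rows per file."""
    from pyspark.sql.functions import input_file_name  # requires PySpark

    return (
        df.withColumn("source_file", input_file_name())
        .groupBy("source_file")
        .count()
    )
```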
What we’re going to see here is it initializing the stream. Because I’ve got the useNotifications option, it’s going to create some objects for me, on the fly, inside Azure: an Azure Event Grid topic, a subscription to that topic, and a queue to look at. Actually, in this case it’s using an existing system topic, and we’ll go and see what that looks like. So there we go.
So I’ve now got a query, and it ran. It found a load of rows in there, and I can see it doing a normal batch kind of thing. This will just keep running: the second batch had no rows in it, and it’ll keep looking to see what’s happening. For now, keep that query ID in mind, the one starting a343, and we’ll go and look for it. You can see I’ve got one file with its row count.
So I’ll leave that running and step back to our landing area. I’m going to upload a new file, taxi number two. Makes sense? Now upload that, that’s going to go in, and hopefully it’ll get picked up.
While that’s happening, we can think about what’s going to happen. The file gets uploaded, Event Grid notices it and adds a message to the queue, and then that gets picked up by Autoloader. Inside landingspot, we’ve got our events, so we can look at what’s actually happening under here. We’ve got an inbuilt system topic with a couple of subscriptions, one of which has that a343 ID matching our streaming query. So it’s created this whole subscription for us on the fly, on the back of that query.
And on the other side, it’s created the queue. In my Blob queues, I’ve got a queue for each query I’ve run. You can see the one I ran earlier, and there are no messages in there, because my stream is constantly running and consuming them. If I step back over here, you can see it’s already updated: I’ve got my second file in there with a new file count. I’m going to cancel that now.
So in that case, Autoloader is constantly looking at the messages being fed from that drop zone, and it updates as soon as something comes in. Now, in reality we don’t tend to just display the results; that’s a one-off query and it’s not going to help us much. We’ll do something like this instead: “I want dataframe.writeStream, in Delta format, appending to an existing table.” I can give it a query name, which really helps with debugging, as we’ll see. And I can give it a checkpoint.
When you’re dealing with checkpoints for streaming these days, especially with Autoloader, it uses a thing called RocksDB behind the scenes, which is essentially a key-value store. Any time it loads a file, it keeps a log saying, “I’ve loaded this file. I’ve loaded this file.” So if it ever sees that file again, it knows it’s loaded it before. That gives it a nice track of what’s been done and what point it was at. So not only is it clearing down the messages each time and keeping track of what’s still outstanding in that message queue, it’s also got a checkpointed log of what it’s actually processed.
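A sketch of the continuous write: append to a Delta location, name the query, and give it a checkpoint location where progress (including the record of files already ingested) is kept. The query name and paths are hypothetical.

```python
def start_append(df, target_path, checkpoint_path, query_name="nyc_taxi_ingest"):
    """Continuously append newly arrived files to a Delta location."""
    return (
        df.writeStream
        .format("delta")
        .outputMode("append")
        .queryName(query_name)                          # shows up in the streaming UI
        .option("checkpointLocation", checkpoint_path)  # progress and file log live here
        .start(target_path)
    )
```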
So we can use this as a fairly clean ETL stream: “Constantly watch this area for data, and push it down there when you’re done.” That’s a really simple ETL example where I don’t have to care about which files I have or haven’t read. Turn it on, keep it running, and it’ll process files as they arrive.
Okay, so that was a nice, easy example of how we might do some basic ETL. But it doesn’t really show you what we do in reality, what we do in production because it’s rarely as simple as get the data, append it into an existing table. So there’s a couple of scenarios I want to step through just as to how we actually do some more advanced things and a few little tricks that we used to get the most out of Autoloader.
As a reminder, we’re trying to stream data in whenever a new CSV lands. That can be a challenge, because we don’t necessarily know when files are going to arrive. One of the biggest challenges we had when using Autoloader was essentially, what happens if they don’t send files that regularly?
Take this scenario. I’ve got a cluster that’s turned on 24/7. That’s a fairly expensive cluster, a big distributed processing engine sitting there waiting for files to arrive, and they send one file a day. That means for most of the day, my cluster’s sitting there going, “I’ve got nothing to do.”
Now, obviously, I might use that cluster for loads of other things and get tons of value out of it, and that makes entire sense. But would I have a dedicated cluster just for that? I don’t want to leave it turned on. As with any other streaming, we can use a trigger mode. We can say, “On my DataFrame’s writeStream, I only want to trigger once.” That means when I execute that notebook code, it initializes the stream, goes to the message queue, finds any files that have messages, sets up a micro-batch to process them all, and then stops. If I’m using things like auto-terminate on my cluster, that’s really nice. I can have an ETL process that starts that notebook; it reads any files it finds, then the cluster turns itself off, and I don’t have to worry about anything.
So for low-frequency streams, it works really nicely as long as you use that trigger-once process. You’ve got a choice. You can use it as a stream that’s always turned on, always doing things: as soon as a file lands, it goes straight through and you start seeing it in your downstream tables. Or you can say, “You know what? I’m still going to do things in batch. I’ve still got a batch process, but I run it every hour, every day, or every week if I want to,” with some limitations, which we’ll get to. Then it just looks after itself; essentially you’re saying, “I just want to use this for the file watching and file management side of things.” So there are definitely some interesting use cases there.
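A sketch of the trigger-once variant, with hypothetical paths. `trigger(once=True)` processes whatever has arrived in a single micro-batch and then stops; Spark 3.3 and later also offer `trigger(availableNow=True)`, which drains the backlog, possibly in several batches, before stopping.

```python
def run_once(df, target_path, checkpoint_path):
    """Load everything that has arrived since the last run, then stop."""
    query = (
        df.writeStream
        .format("delta")
        .outputMode("append")
        .option("checkpointLocation", checkpoint_path)  # remembers what was loaded
        .trigger(once=True)                             # one micro-batch, then stop
        .start(target_path)
    )
    query.awaitTermination()  # block until that micro-batch has finished
    return query
```

Scheduled as a job on an auto-terminating cluster, this gives the “batch cadence, streaming bookkeeping” behavior described above.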
The other scenario is merge. Streaming and merge don’t really talk to each other. Normally we’ve got the append, update and complete output modes, a few different ways of managing streams, and there is complexity there, because some of that expects a stateful stream. Doing things like dropping duplicates or merging into an existing Delta table is where the real challenges are.
So if we’re using Autoloader to suck data in automatically and land it in a usable state, what happens if we want to update records? What if we want to upsert the data rather than do a flat append? In that case, there’s a normal streaming trick: foreachBatch. I can say dataframe.writeStream.foreachBatch and give it the name of a function. I define a function that expects an input DataFrame, and that’s a micro-batch DataFrame. Each time the streaming query does a micro-batch, it hands that DataFrame over to this function, which runs it as a normal batch. So anything you can do inside a normal batch process, such as writing to multiple different outputs, writing to things that aren’t a streaming sink, or doing a Delta merge, you can include inside that function.
So by building that function and passing it into foreachBatch, you suddenly unlock all the things you can do in a normal batch process; you just have to wrap them in a function. We use that quite a lot. Being able to say, “I want to use Autoloader, and I want to merge into an existing data set,” is pretty cool. It unlocks a huge amount of functionality, not just for getting data into that first landing bronze layer of your lake, but for streaming into your silver and gold curated areas too, because we can do things like merge properly.
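A sketch of `foreachBatch` writing each micro-batch to two Delta locations, something a single streaming sink cannot do directly. The paths are hypothetical. Because `foreachBatch` gives at-least-once guarantees, a batch may be replayed after a failure; `batch_id` is available if the writes need to be made idempotent.

```python
def write_to_two_sinks(batch_df, batch_id):
    """Called once per micro-batch with an ordinary (non-streaming) DataFrame."""
    batch_df.persist()  # avoid recomputing the batch for the second write
    batch_df.write.format("delta").mode("append").save("/mnt/lake/bronze/events")
    batch_df.write.format("delta").mode("append").save("/mnt/lake/audit/events")
    batch_df.unpersist()


def start_foreach(df, checkpoint_path):
    return (
        df.writeStream
        .foreachBatch(write_to_two_sinks)  # hand each micro-batch to the function
        .option("checkpointLocation", checkpoint_path)
        .start()
    )
```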
So we’ll look at that next. How do we turn it into a batch? We’ll look at trigger once. We’ll look at what happens if I’ve got an existing set of data in a Delta table and want to use Autoloader to merge some updates into it: what does that foreachBatch function look like? And then what do we do with logs; how do we see what’s going on inside there? I’ll quickly show you the Spark Streaming UI and how we can see what’s going on inside those queries.
Okay, so we’re back in the Azure portal, in my Blob store, and this time I’ve got some employees: a basic employee CSV that I’ve previously loaded into my Delta table. Let’s have a look at that. I’ve got my second Autoloader notebook, and same again, I’ve got my schema defined at the top. I want to create this and make sure it’s all happy. I’ve bundled all the setup into one cell: the schema, getting the secrets from my Key Vault, building up the cloudFiles config, and the Blob store connection.
Then we run my DataFrame as normal, nice and easy. Because I’m reading from a CSV, I can also include things I’d normally include in a batch CSV DataReader. I can say header is true, so it treats that first row as a header rather than data, and I can handle bad records. So I can play with some of the normal settings in conjunction with the cloudFiles settings.
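Mixing ordinary CSV reader options with the cloudFiles settings might look like this. The DDL schema string and its column names are hypothetical stand-ins for the demo’s employee file; `cloudfile` is assumed to hold the `cloudFiles.*` settings built earlier.

```python
# Hypothetical employee layout as a DDL string (schema() accepts DDL strings).
EMPLOYEE_SCHEMA = "EmployeeId INT, Name STRING, Department STRING, Salary INT"


def read_employees(spark, cloudfile, path):
    """Autoloader stream that also uses ordinary CSV reader options."""
    return (
        spark.readStream
        .format("cloudFiles")
        .options(**cloudfile)      # cloudFiles.* settings: format, notifications, auth
        .option("header", "true")  # first row is a header, as in a batch CSV read
        .schema(EMPLOYEE_SCHEMA)
        .load(path)
    )
```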
So I’ve got that going in. Before we actually kick things off, let’s look at what the table currently looks like. It’s stored as a Delta table that I created in advance, and I’ve run that first file through. You can see some people, their departments and salaries, a variety of people, I think from Pulp Fiction. And I’ve got this extra CSV, another data set of people with some updates to existing people. You can see Jimmy Dimmick at the top is going to get a big whack of salary added, so he’s going to match an existing employee ID, and there are going to be some new people too. It’s a normal update-or-insert, upsert kind of operation. I want to take this CSV, drop it into Autoloader, and have the table updated with those values. I want to do a merge. Makes sense?
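The semantics being asked for can be pinned down with a tiny plain-Python model of an upsert keyed on employee ID. This only illustrates the behavior; it is not how Delta implements MERGE, and the column names and values are made up.

```python
def upsert(table: dict[int, dict], updates: list[dict], key: str = "EmployeeId") -> dict[int, dict]:
    """Return a new table: rows whose key matches are replaced, new keys inserted."""
    merged = {k: dict(row) for k, row in table.items()}
    for row in updates:
        merged[row[key]] = dict(row)   # update all columns on match, insert otherwise
    return merged


employees = {
    1: {"EmployeeId": 1, "Salary": 100},
    2: {"EmployeeId": 2, "Salary": 200},
}
changes = [
    {"EmployeeId": 1, "Salary": 900},  # matches an existing row: update
    {"EmployeeId": 3, "Salary": 300},  # new key: insert
]
result = upsert(employees, changes)
print(sorted(result))       # [1, 2, 3]
print(result[1]["Salary"])  # 900
```

One difference worth knowing: if two rows in the same update batch share a key, this model silently keeps the last one, whereas a Delta `MERGE` fails when multiple source rows match the same target row, so duplicates need removing from the micro-batch first.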
So I’ve created my DataFrame following the normal pattern, with my cloudFiles options and the various settings and schema passed in, all good to go. How do I get this working? I’ve got two bits: my upsertToDelta function and my stream query. Weirdly, let’s look at the stream query first. I’m saying dataframe.writeStream, in Delta format, using append mode. I just want to send all the data through; I don’t want it to try to aggregate or look at anything in the existing stream. It should treat each incoming file as a separate chunk of data to be processed independently.
And then I’m doing foreachBatch. So I’m saying, “For every single chunk of data you try and process, go and run this function,” and that function is upsertToDelta, which we’ll have a look at in a moment. I’m giving it a query name, which is very useful, as we’ll see in the logging. And I’m saying trigger once is true. So in this case, when I kick this off, it’s going to start running and do a single batch. The first one we saw just kept running, so if I dropped a new file in, it would automatically get processed. This one is going to run, and as soon as it’s finished that first micro-batch, it’s going to stop; it turns itself off. That means we’ve got a little bit more control over using it as a batch process, incrementally. Whenever I want to trigger it, I’ll let it go, and it will do anything it needs to do since it last ran.
I’m also again giving it a checkpoint, so it’s got somewhere to store that data. And then we’re saying start. And, weirdly, I don’t need to give it a path, because that’s handled inside the upsertToDelta function. So that’s our stream query. Let’s have a look at that upsertToDelta.
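The writer side described above could look something like this sketch. It uses only standard `DataStreamWriter` calls; the function name and checkpoint path are hypothetical, and `upsert_fn` stands in for the talk’s upsertToDelta. It leaves out a sink format and a path because the batch function is what actually writes the data.

```python
def start_merge_stream(df, upsert_fn, checkpoint_path: str):
    """Start a run-once stream that hands every micro-batch to upsert_fn(batch_df, batch_id)."""
    return (
        df.writeStream
        .outputMode("append")                           # pass new rows through, no aggregation
        .foreachBatch(upsert_fn)                        # the batch function performs the merge
        .queryName("DIASMerge")                         # label shown in the Structured Streaming UI
        .trigger(once=True)                             # process what is waiting, then stop
        .option("checkpointLocation", checkpoint_path)  # tracks progress and batch IDs
        .start()                                        # no path: the merge writes the table
    )
```

Calling `.awaitTermination()` on the returned query blocks until the single run-once batch has finished.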
So I’ve got a function up here. I’m importing DeltaTable because I’m doing a Delta table merge. It’s expecting two inputs. The first input is a DataFrame of some kind; call it what you will. The second is basically an integer, which is the batch ID: which micro-batch is the stream query currently running? That will continue incrementing even if you stop the stream, because it uses the checkpoint to work out where it’s at.
So I’m saying, “I want to go and grab my Delta table from an existing registered table,” and then it’s just a normal, straightforward merge. I’m aliasing the tables so I can refer to them in the condition. They’ve both got employee ID, so I’m giving it that as the match criteria so it knows what to merge on. Then, if a record already exists, update all its columns; if it doesn’t exist, insert it as a new record. The most bog-standard, basic of merges. That’s all I want to have.
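The Delta calls behind that merge need a Spark cluster, so here is a plain-Python toy model of the same “update if it exists, insert if it doesn’t” semantics, with the real Delta call chain shown in the docstring. The table name `employees`, the key `EmployeeID` and the helper name are hypothetical; this is an illustration of the behaviour, not Delta’s implementation.

```python
def upsert(target: list, updates: list, key: str) -> list:
    """Toy model of MERGE ... WHEN MATCHED UPDATE ALL / WHEN NOT MATCHED INSERT ALL.

    Real Delta equivalent inside the batch function (needs Spark and delta-spark):
      (DeltaTable.forName(spark, "employees").alias("t")
          .merge(batch_df.alias("s"), "t.EmployeeID = s.EmployeeID")
          .whenMatchedUpdateAll()
          .whenNotMatchedInsertAll()
          .execute())
    """
    merged = {row[key]: dict(row) for row in target}
    for row in updates:
        # A matching key overwrites every column; a new key becomes a new row.
        # Unlike Delta, this toy does not reject several updates for the same key.
        merged[row[key]] = dict(row)
    return list(merged.values())
```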
Okay, so let’s run that first. We’ve registered that function, and I can now call it. You could have a generic function that you call from many different Autoloader jobs; it’s kind of up to you how that works. We’ll call it in a second, but first I’m going to make sure I drop in my new file. So I’m going to go and grab my data, and I’ve got my employees. I’m going to drop in my employees 2021 May file and upload it, which is much quicker because it’s a much smaller file. And then that should now trickle through.
Now, you saw when I went into the first demo that I already had a queue set up. That was this one, so it’s already got the queue mechanism, the EventGrid subscription, all the stuff it needs to actually do this. So we should be able to kick it off. It should realize it’s got one file with, I think, 10 rows in there, run that through, and then stop. Then we can do another select and see whether it has successfully managed to merge my updates into my table.
Okay, so it ran. It ran a single job. Looking at the raw data, I can see it’s batch ID 1, not zero, so this is the second time this has run. So I know it managed to get the previous history from the checkpoint and knows what batch ID it’s up to. Number of input rows: 10. So it’s managed to put the update rows through. It’s got a load of telemetry, a lot of information about what happened inside there, which is super useful.
And then we can go and rerun the select to have a look at my employees table. We can see that Jimmy Dimmick has been updated: he’s got that extra salary, and I’ve got some new rows for people who didn’t exist previously. Essentially, I’ve managed to merge in a stream of data coming live from a file that’s just been dropped in, without any config to worry about metadata: what I’ve read, what I haven’t read, and all of that kind of stuff. And suddenly, that becomes really, really quite useful: being able to just turn this on, start merging things in, and then manage that data.
As a final thing, I want to switch over to the Spark UI over here, which has the whole Structured Streaming UI. It can be really hard to find things in there, hard to work out which stream I was dealing with when I look at all my display queries. That’s the reason why I named that query. Because I gave it the query name “DIASMerge,” I can go into Structured Streaming and see my nicely labeled query, see the Run ID, and get some stats about it. I can see the previous run of it there, and batch one, so I get a history of it running and running and running. It’s always a fantastic idea to make sure you’ve named your queries, so you can start pulling the logs out from them. I can also inspect that job.
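Naming the query also helps outside the UI. A sketch, assuming PySpark’s `spark.streams.active` (a list of queries with a `.name`) and `StreamingQuery.recentProgress` (a list of progress dictionaries with `batchId` and `numInputRows`); the helper names are hypothetical.

```python
def find_query(active_queries, name: str):
    """Return the first query whose .name matches, or None."""
    return next((q for q in active_queries if q.name == name), None)


def rows_per_batch(progress_reports: list) -> dict:
    """Map batchId -> numInputRows from StreamingQuery progress dictionaries."""
    return {p["batchId"]: p["numInputRows"] for p in progress_reports}


# On a cluster, for a query that is still running:
#   q = find_query(spark.streams.active, "DIASMerge")
#   print(rows_per_batch(q.recentProgress))
# A run-once query has usually stopped already, so use the handle returned by start().
```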
Okay, so that gives us a fairly good idea of some of the more advanced features: how do I merge into things when I’ve got existing data? But that still doesn’t really give us a huge amount over existing Structured Streaming. I can use foreachBatch with a structured stream. I can do that merging approach. I’ve already got all that logging with Structured Streaming. So at the moment, the only real advantage we’ve got is that it’s not doing directory listings; it’s keeping things nice, efficient and quick. That changes when we start talking about evolving schemas.
Now, this is a real big one. If you’ve got a disconnect between yourself and the people sending you data, if you’ve essentially just got a storage layer and you’re at the mercy of whatever people send you, schema drift can be a real problem. So we’ve got things like this, where we’ve got some JSON files. In the first one I had an ID and a ProductName. Then we got ID, ProductName and Size. And then I had an ID, a ProductName, a Size, and then some nested structures with more detail, machine washing and all that kind of stuff. That gradual growth of data complexity can be really hard to look after and really hard to manage.
So that’s what we mean when we say schema evolution: we’re getting extra data, so what do we do with it? We have a choice about how we handle that. We can say, “No, broken. Stop it,” and just fail the stream. Then we have to go and fix it: reload our data, change around the schema we’re passing in so it reflects the new version of things, and then figure out how we get that into our existing data set. Or we can manually intervene and go and update our tables so they look the right way. Or we can automatically evolve. And that’s the dream, right? That is paradise: if I can just keep my stream running, and it will automatically figure out, “Oh, there’s new data, that’s cool. We’ll capture the new data, and we’ll put it somewhere it won’t cause a failure.” That’s really Nirvana for dealing with this ongoing schema drift problem.
So in order to do that, we need a few bits of information. First, what do we expect the schema to be? In the previous demos, we were passing that in at the start: “This is the schema. This is the thing I want you to deal with.” Second, what is the schema now? If that was my schema, and this is my schema now, do they still match or not? And third, what to do in the case that they don’t match. So we can dig into that and work out what’s actually happening.
Now, schema inference is brand spanking new as of the point of recording. Databricks Runtime 8.2, which is currently the most recent Databricks Runtime, lets you leave out the schema you’d normally pass into the DataFrame and do schema inference instead. So when you start that Autoloader stream, it’ll take a look at the file, go, “Oh, that’s what it looks like,” and generate a schema for you. It’s the same as turning on the inferSchema option on a normal DataFrame, but inside a stream. And that’s super, super useful.
And then we can start to manipulate how well it figures out what that schema looks like. By default, it’s going to assume everything’s a string, and there are a few settings attached to that. We’ve got schema location, which is where it stores the metadata about the schema that it inferred. With batch DataFrames, the inferred schema isn’t stored anywhere; it’s just used for the lifetime of that particular session. Here, because we’re dealing with streaming and need a reference for what we thought the schema was and what we think it is now, it goes and writes down some metadata about what it thought the schema was.
And we can say, “I want you to inferColumnTypes.” Should it just go, “Well, here are the columns, I’m going to treat everything as a string, I’m not going to worry too much about it”? Or do we want it to do some data sampling and go, “Well, that looks like an integer. That’s a long. That’s a decimal,” and actually put some intelligence behind it? Or finally, we can have schema hints. So I can say, “Well, just go and infer most of the schema. But if you come across the ProductID, that really has to be a long, because we’ve had problems with that in the past.” So we don’t have to provide the full schema; we can just nudge it in the right direction for certain bits and pieces of it.
So we’ve got some things we can do that really let us get on top of schema inference inside Autoloader. In this case, we’re dealing with that first file. The first time we turn on Autoloader, the first time we run this thing, it’s going to take a look at that initial file and go, “Cool, it’s got two columns.” It’s going to store that as a metadata file inside my schemas folder. We’re going to have a zero file, and it’s going to have a little bit of JSON in there. There’s a bit of metadata wrapping, and then it’ll say, “Well, here’s my struct,” the Spark struct object that it’s going to expect to use for the DataFrame. And that’s an ID, it’s a string, [inaudible] string. Again, as I said, it’s going to assume everything is a string unless we tell it otherwise. So that is new. That whole idea of having a schema metadata store that keeps track of what it currently thinks the schema is, is fairly cool.
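The struct part of that metadata file follows Spark’s usual `StructType` JSON layout. The sketch below, assuming that layout and ignoring the wrapper lines around it (whose exact format isn’t shown here), builds the all-strings schema described above; `struct_json` is a hypothetical helper.

```python
import json


def struct_json(columns: dict) -> str:
    """Serialise column -> type pairs in Spark's StructType JSON layout."""
    return json.dumps({
        "type": "struct",
        "fields": [
            {"name": name, "type": dtype, "nullable": True, "metadata": {}}
            for name, dtype in columns.items()
        ],
    })


# Without column type inference, every column starts out as a string:
print(struct_json({"ID": "string", "ProductName": "string"}))
```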
Now, if we’re not happy with that, if strings aren’t good enough and I want it to do a bit of work to figure out what the data looks like, then we can say, “Well, inferColumnTypes.” We can add in an additional option, the cloudFiles inferColumnTypes option. And then that zero metadata file it generates is going to be a bit more intelligent. So ID: it’s gone, “That’s an int.” ProductName: it knows that’s a string. It’s going to go and do a little bit of sampling of some files, and we’ve got some options about how many files it should sample and how many bytes it should read before it goes, “That’s enough, I’ll work out the data types from there.” And it’ll do that for us. But what if we’re still looking at that going, “No, we know better. That ID should be a long”?
Then we can use schema hints, either as an alternative to letting it infer the schema or on top of it. We can pass in something a little bit like a SQL table definition: ID is a long, comma, ProductName is a string, comma, and so on, just a comma-separated list of column definitions. And it’ll take that and go, “Great, I’m going to use that to improve the metadata I’m generating.” So in this case, I’m saying schema hints are on, and ID should be a long. It’s then going to generate the same schema file, but update the data type wherever it found a column matching one I gave a hint for.
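As a toy model of the behaviour just described, this sketch applies a DDL-style hint string on top of inferred types, only touching columns that were actually found, as the talk describes. The real setting is the `cloudFiles.schemaHints` reader option; `apply_schema_hints` is hypothetical and uses a naive comma split.

```python
def apply_schema_hints(inferred: dict, hints: str) -> dict:
    """Toy model: override inferred column types using a DDL-style hint string.

    Naive comma split, so a type such as decimal(10,2) would need a real DDL parser.
    """
    result = dict(inferred)
    for part in hints.split(","):
        if not part.strip():
            continue
        name, dtype = part.strip().split(None, 1)
        if name in result:  # per the talk: only columns that were found get adjusted
            result[name] = dtype.strip()
    return result


# The real setting is a reader option, for example:
#   .option("cloudFiles.schemaHints", "ID long, ProductName string")
```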
So we’ve got a lot of options about how we do schema inference, and they all just tweak and adjust this metadata object that’s created. And that’s okay, that’s useful. That saves me the effort of creating the schema that I’ve been passing in when I first turn on Autoloader. But that’s not the really cool stuff; that’s not where we get the most value out of this. The interesting stuff is when we talk about schema evolution, and we’ve got some options there. We said at the start that we need to know what we thought the schema was and what the schema is now. Schema inference and the metadata store are doing that for us. What we should do if the schema differs is where these options come in.
So we’ve got a few options for the schema evolution mode. We can say, “Well, addNewColumns.” So if I infer the schema, I’m doing a micro-batch, and I check the schema of the file I’m opening and it’s different from the schema I’m expecting, what it’s going to do, weirdly, is fail. It’s going to fail the job. It’s going to go to that metadata store and create a new object with the updated schema. And then when I next restart that job, it’s going to pass; it’s going to work. So it is stopping the stream. If you’re leaving it in streaming mode, it’s going to break. If you’re running it in trigger-once mode, it’s going to break. But then you can just restart it, and it will recover from where it was with its updated schema information.
So addNewColumns updates the schema for you. failOnNewColumns doesn’t. It just stops and goes, “No. Broken, you need to go and fix it.” That’s your manual intervention: you can go and edit that schema object or create a new one, essentially the DIY option of playing with the metadata yourself.
I can also do this thing called rescue. Now, that is really cool, and I love it. Instead of actually changing the structure and changing the schema, you just have a dumping-ground column. Anything I don’t expect, anything that doesn’t conform to my expected schema, just goes in that column. So don’t evolve my schema, don’t change my schema, but give me this dumping ground where I can go and write some queries and explore what’s been held in there without affecting the rest of my schema. And that is a super useful bit of functionality when dealing with unexpected drift. Or I can say none, and ignore it. If it doesn’t match my schema, throw away those extra columns: I’m only ever going to want these three bits of information from that table, so if it gives me five bits of information, ignore the other two. So it’s up to you how you deal with it.
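To make the four behaviours concrete, here is a plain-Python toy model of what each mode does with a record carrying an unexpected `Size` field. It is not how Autoloader is implemented: the mode strings match the real `cloudFiles.schemaEvolutionMode` values, while `SchemaChanged` and `apply_mode` are hypothetical. The real rescued data column can also capture values whose type doesn’t match the schema, which this sketch ignores.

```python
import json


class SchemaChanged(Exception):
    """Stands in for the stream failing when unexpected columns arrive."""


def apply_mode(record: dict, schema: list, mode: str, rescue_col: str = "_rescued_data"):
    """Toy model of the four schema evolution modes for a single record.

    Returns (row, schema) or raises SchemaChanged carrying the schema to use next time.
    """
    known = {col: record.get(col) for col in schema}
    extra = {k: v for k, v in record.items() if k not in schema}
    if mode == "rescue":
        # Unexpected fields are kept as a JSON string; the schema never changes.
        return {**known, rescue_col: json.dumps(extra) if extra else None}, schema
    if mode == "none":
        return known, schema                      # unexpected fields are silently dropped
    if mode not in ("addNewColumns", "failOnNewColumns"):
        raise ValueError(f"unknown mode: {mode}")
    if not extra:
        return known, schema
    if mode == "addNewColumns":
        raise SchemaChanged(schema + list(extra))  # fail now, evolved schema saved for restart
    raise SchemaChanged(schema)                    # failOnNewColumns: schema left for a manual fix
```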
As a quick reminder of the things we’re looking at in terms of evolution: we’ve got those three files, so we’re expecting ID and ProductName, then we got ID, ProductName and Size, and then we’ve got a load of extra information. If we’re using rescue, saying, “I want a rescued data column,” we’ll end up with something that looks a bit like this. I’ll have my three records in there. For the first, I’ll have ID and Belt, and nothing in _rescued_data because we didn’t rescue anything.
In my second one, I’ll just have Size, XL as a little bit of nested JSON. Basically, it’s a JSON string, but I can then go and query it and pull bits and pieces out. And the same goes for my third one, despite the fact it’s more complicated. It’s got a nested struct in there and some complex things inside, but that complexity has all been hidden away and pushed down into that _rescued_data column. Now, that does have some limitations if you’re trying to get something really optimized. If you’re using Bloom filter indexes, relying on file skipping, using Z-ordering and all that kind of good stuff, you can’t really do that if the attribute is hidden inside the _rescued_data column. But if you’re just trying to create a plain querying layer, while still allowing people to go in and query those bits of information without worrying about schema drift, that is awesome. That’s a really nice way of doing things.
The alternative, addNewColumns, gives us this kind of pattern. When it fails, it’ll go and create a new metadata object that represents the new schema. So next time I start it, it won’t go to that zero file; it will go to the one file and go, “Hey, I’m now expecting Size, I can now see Size as part of my DataFrame,” and it updates and caters for it.
So let’s go and have a look at those things and see, when we’re using Autoloader, and we’ve got a changing evolving schema, what does that look like? And how do we play around with some of the settings to change schema inference to create that schema meta store? And then how do we deal with schema evolution?
Okay, so we are back again in the Azure portal. And I’ve got my product list now: a really basic bit of JSON sitting there as a product list, with ProductID, name, color, ListPrice, some basic information, right? I’ve got my one file in there.
Now, I’ve got my Autoloader number three. And you can see the first thing I can do is play around with some settings. If we’re going to use schema inference, I can say, “I want to use a number of bytes to read, or a number of files.” Now, in this case, I’m using tiny data and a couple of JSON files, so it doesn’t really matter. But each time it goes and infers the schema, it’s going to have to read a certain number of files. So how many files do you want it to read each time, or how big a rolling window, according to the last modified date of the actual blobs? You can have a play with those settings and tailor them to whatever you’re trying to do.
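The sampling limits mentioned above are Spark configuration settings rather than reader options. A sketch, assuming a `spark.conf`-like object with a `set` method; the two configuration keys are the Databricks schema-inference sample-size settings, and the values are illustrative, not recommendations.

```python
SCHEMA_SAMPLE_CONFS = {
    # Upper bounds on how much data schema inference reads (illustrative values)
    "spark.databricks.cloudFiles.schemaInference.sampleSize.numBytes": "10gb",
    "spark.databricks.cloudFiles.schemaInference.sampleSize.numFiles": "100",
}


def apply_confs(conf, confs: dict) -> None:
    """Set each key/value pair on a spark.conf-like object."""
    for key, value in confs.items():
        conf.set(key, value)


# On a cluster: apply_confs(spark.conf, SCHEMA_SAMPLE_CONFS)
```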
And then the normal stuff: I’m going to go and get my secrets, create my CloudFiles config, and connect to my Blob storage. And then this time, I’ve got a whole host of extra options I can use to determine how we’re doing schema inference.
So number one, I’ve got that schema hints option as an example. If I knew something about my data, say that ProductID was an int and ListPrice was a decimal, I can include this option to say, “Here are some hints. These will help you figure out what you’re trying to understand in terms of your data.” Instead, I’m going to let it infer. I’m like, “No, you can sample the data. You can figure things out. It’s all right. I trust you.” And it will go and figure that out for me.
Now, if we’re doing schema inference, we have to give it a schema location. That’s somewhere to go and store that metadata file, a little bit of JSON that describes the schema. So whenever I do a micro-batch, and whenever I stop and restart the stream, it knows what schema to expect.
I’ve got some extra things. I can do partition columns: even for my landing table, if my source data set actually had a folder structure representing some kind of Hive partitioning, I can tell it to infer those columns into my data set. I’m not using that at the moment. And then we can talk about what our different schema evolution modes are: how do you want to deal with different data sets?
So for now, let’s start off with rescued data. I can say my schema evolution mode is going to be rescue. So in this case, if the data ever veers away from what I’m expecting, it just dumps it inside this _rescued_data column. I can change what that column is called, but I’m leaving it as the default, so it’s called _rescued_data. If you want to change the name of that column, you can go and have a play with that. And then we’ll have a look at those two later. So let’s stick with that as our basic setup.
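Pulling the settings from this demo together, the options might look something like this sketch, assuming JSON input. The option names are real Autoloader and reader options; the helper and paths are hypothetical, and `rescuedDataColumn` is only needed to rename the default `_rescued_data` column.

```python
EVOLUTION_MODES = ("addNewColumns", "rescue", "failOnNewColumns", "none")


def drift_options(schema_location: str, mode: str = "rescue", rescued_column=None) -> dict:
    """Reader options for a JSON Autoloader stream with inference and drift handling."""
    if mode not in EVOLUTION_MODES:
        raise ValueError(f"schemaEvolutionMode must be one of {EVOLUTION_MODES}")
    options = {
        "cloudFiles.format": "json",
        "cloudFiles.schemaLocation": schema_location,  # where the inferred schema files live
        "cloudFiles.inferColumnTypes": "true",         # sample data instead of all strings
        "cloudFiles.schemaEvolutionMode": mode,
    }
    if rescued_column:
        options["rescuedDataColumn"] = rescued_column  # otherwise the column is _rescued_data
    # "cloudFiles.partitionColumns" could be added for Hive-style folder partitions.
    return options
```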
So we’re going to infer columns, we’re going to store the schema in our lake, and we’re going to use rescue mode. Let’s go and do that, passing in our CloudFiles and our drift config. You can see it has already gone and inferred our columns. I didn’t give it a structure; it inferred that structure. And we’ve got _rescued_data: it’s automatically appended this extra column onto the end, knowing it’s doing some kind of schema inference. So we can say, what does that look like? And go and kick that off. Now, while it’s running, it’s going and creating a queue for me, doing the messaging, doing all the normal Autoloader stuff.
I can actually go and have a look inside my lake, inside this Tokyo Drift Autoloader three folder, and it has created the schemas folder for me automatically. You can see I’ve got that zero metadata file. It’s created that object because I kicked off that job, so let’s have a look at what it looks like. There’s a little bit of metadata, just saying it’s a data schema and all that stuff, but then it’s just the normal struct. So I can see color, I can see ListPrice. It’s just a normal JSON schema, inferred on my behalf, that represents the same structure we’re seeing in the DataFrame.
Okay, so we can see, shockingly enough, nothing has been rescued. That’s because I inferred my schema based on this file, and, shock horror, the file hasn’t changed schema. So let’s go and upload a new file. I’m going to go in here, and my final one is in drift. I upload this 2020 version and push that in, nice and quick. So what we should see, when this goes through and refreshes, is the extra data being pushed into that _rescued_data column.
So next time it actually runs a micro-batch and we get this update, we’re expecting to see a load more records. We’re just doing a display here, not trying to merge anything in this one, so we should see a load more records go into that DataFrame, and they’ve got a load of _rescued_data. Because that file had extra data, Weight and Size, it had some attributes we weren’t expecting to see. That’s fine, we’ve just rescued them. The rest of our data that conforms to the schema is fine; we can see it. And we just populate _rescued_data with the things we didn’t understand, the things we didn’t expect to see. It’s not going to change my schema at all. I’ve still only got schema file zero; there’s no change to that. It’s just handled the drift quite naturally.
So let’s go and have a look at the other way of doing that. Say I don’t want to rescue. What I want to do is update my schema. So with addNewColumns, I want it to fail, and then tell me that my structure has been updated. I’m going to run this again, so it’s still got that same [inaudible]. It has not re-inferred the structure, because this time it’s got a schema in the metadata. So it’s not working it out fresh; it’s going, “No, this is the structure I’m expecting.” Then we go and run from that. Now we’re expecting this to fail. It’s going to say, “No, no, no, there are columns in here that aren’t in your schema. That’s incorrect.”
So once it’s initialized, once it’s got to the queue and starts reading the data in, we’re going to see a failure: encountered unknown field. It’s got Weight and Size, and it’s not expecting to see those. But because we’re using addNewColumns, it’s gone in here and created a new schema. We’ve got a new JSON file now, and towards the end of that structure, there it is: Weight and Size have now been added. So it breaks the stream immediately. But the next time I run that DataFrame, it picks up the latest schema, which has now got Size and Weight in there, and then that’ll just work.
So you’ve got these two different approaches. Do you want to use _rescued_data and say, “Just put everything in there and don’t worry about it”? Or do you want it to actually evolve the schema, so it’s a moving description of what your newest schema looks like? That comes with a failure: basically, you have to restart the stream each time. Both of them have their pros and cons, depending on how you like working and what you want it to look like. And you can see we’ve now got Size and Weight in there, and nothing’s in our _rescued_data, because we’re now using that updated schema.
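Because addNewColumns deliberately stops the stream, a scheduled job usually needs some way to restart it. One possible pattern, as a sketch: `start_and_wait` and `is_schema_change` are hypothetical callables you’d supply, for example by starting the query and calling `awaitTermination()`, and by checking the error for the unknown-field failure. The exact exception type and message are assumptions to verify on your runtime.

```python
def run_with_schema_restarts(start_and_wait, is_schema_change, max_restarts: int = 3):
    """Re-run a stream that stopped because addNewColumns saved an updated schema.

    start_and_wait: starts the query and blocks until it finishes (hypothetical).
    is_schema_change: decides whether an exception is the expected schema-change stop.
    """
    restarts = 0
    while True:
        try:
            return start_and_wait()
        except Exception as exc:
            if not is_schema_change(exc) or restarts >= max_restarts:
                raise  # a genuine failure, or we keep failing: surface it
            restarts += 1  # the evolved schema is already stored; just start again
```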
Okay. So hopefully that is fairly inspiring. We started off with something fairly similar to approaches we’ve used before, but we’re building on those, adding some more stuff, enhancing it with things like schema drift and schema evolution out of the box, and it suddenly becomes a much more attractive proposition.
Now, there are one or two gotchas I want to make sure you’re aware of when you’re using Autoloader, because of the nature of some of the underlying systems. Some of these recommendations are very much on the Azure side of things, so things will be different if you’re using this in AWS or GCP.
So, some more lessons. One: EventGrid quotas. We didn’t even think about this, but you need to be aware of the limits on the number of different objects you can create. Then there are a few settings and tweaks you can make when you’re doing streaming, and some things you just need to be aware of when you’re working with batching.
Okay, so first things first, some out-of-the-box quotas. If you’re working in Azure, there’s a limit on custom topics per Azure subscription. That’s not something we’ve been too worried about in how we’ve been working, because we’ve just let Autoloader use a system topic: when you create a storage account, it automatically gets its own EventGrid system topic. We can add new custom topics on top of that, but we can only create 100 of them. So if you go down the custom topic route, be a little careful.
But generally, we can use custom topics to get over the next limitation, which is event subscriptions per topic. If I’ve got my storage account and I’m creating multiple different subscriptions, one for this folder, one for that folder, and so on, creating lots of them as I go, I can only have a maximum of 500 subscriptions to that topic.
And that doesn’t seem too bad. But think about a huge enterprise-style data platform: if we can only have 500 different data feeds, 500 different tables, coming into it, you’re going to hit that limit. It’s especially good to know that when you’re doing things like a display, quickly running something to check it, that’s going to go and create its own one-off queue, which is then abandoned and left there. You need to be cleaning up and managing those services. So be very aware of those limitations.
Now, based on that, we can have 500 data feeds from a single storage account, so be a little careful about that. Be careful, too, if we delete the checkpoint folder. If you’re writing out to a Delta table, you’ve got the checkpoint metadata stored inside there, and that has the ID of the streaming query inside it, which links it to the queue and to the topic. If you delete your checkpoint because you want to restart from scratch and reload things, it’s going to ignore the existing queue and subscription and start a new one. And suddenly, that 500 you had just gets chipped away, with orphaned subscriptions being left behind.
Now you do have the CloudNotification Libraries, it’s worth checking those out. And that allows you to say, “Actually, I don’t want to use the system topic, I want to use a custom topic. So I can actually, instead of having 500, I can have up to 100 custom topics across my subscription, each of which can have 500 subscribers.” So actually, that suddenly gives us a much, much, much greater scalability for how many things I can read from a single storage account. And it allows us to tear down ones we’re not using.
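A back-of-the-envelope sketch of the quota arithmetic, using the limits quoted in this talk (500 event subscriptions per topic, 100 custom topics per Azure subscription). Azure quotas change over time, so treat these constants as assumptions to check against current documentation; the helper names are hypothetical.

```python
SUBSCRIPTIONS_PER_TOPIC = 500  # event subscriptions per Event Grid topic (as quoted)
MAX_CUSTOM_TOPICS = 100        # custom topics per Azure subscription (as quoted)


def feed_capacity(topics: int) -> int:
    """Maximum number of data feeds (event subscriptions) across the given topics."""
    if not 1 <= topics <= MAX_CUSTOM_TOPICS:
        raise ValueError(f"topics must be between 1 and {MAX_CUSTOM_TOPICS}")
    return topics * SUBSCRIPTIONS_PER_TOPIC


def headroom(topics: int, active_feeds: int, orphaned: int) -> int:
    """Feeds still available; orphaned subscriptions still count against the quota."""
    return feed_capacity(topics) - active_feeds - orphaned
```

With a single topic, 420 live feeds plus 80 subscriptions orphaned by deleted checkpoints or one-off display runs leave no headroom at all, while 100 topics raise the ceiling to 50,000 feeds.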
So dig into that. That is very, very important. If you’re trying to run this at scale, and you’ve got thousands of files and thousands of different data feeds, you’re going to need to do some custom topic management, and essentially some maintenance and cleanup of orphaned notification services.
Okay, so streaming-wise, there are a few things to do, and it’s all normal streaming stuff. You’ve got things like maxBytesPerTrigger or maxFilesPerTrigger. These apply outside of trigger once: trigger once will override those settings. But if you’re leaving it turned on as a stream, continuously processing, you can say, “Well, how big should each micro-batch be? Should it micro-batch per file, so you can treat each one independently, or should it take bigger chunks?” It depends on how many files you have incoming, how big they are and how often they’re coming in, but you can tweak and manage that. You can combine it with trigger settings: rather than saying trigger once, you can trigger every minute, every 10 minutes, every two seconds. You’ve got a lot of nuance in how you actually set that stuff.
The other thing to be aware of is fetch parallelism: how many threads it’s going to use to read your storage queue. That’s not a big thing if you’re dealing with a handful of files. By default it uses one thread and just goes off and gets a bunch of messages back. But if you’ve got a very, very heavily used feed with thousands and thousands of files going into that queue, that’s going to be a big bottleneck in itself: you’ve got one worker just trying to read back all these messages to tell Autoloader which files to load.
So you can manipulate that, you can have a play with that to turn the dials of how often it micro-batches, how often it triggers, what’s included in each trigger, and then how many threads it’s using to go back and read that queue. So you can go fairly deep into looking at that, and you need to be aware of those settings if you’re leaving it turned on as a stream.
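Those dials map onto reader options plus the writer’s trigger. A sketch using the real Autoloader option names `cloudFiles.maxFilesPerTrigger`, `cloudFiles.maxBytesPerTrigger` and `cloudFiles.fetchParallelism`; the helper is hypothetical and the values are illustrative. As noted above, trigger once overrides the rate limits.

```python
def streaming_tuning(max_files=None, max_bytes=None, fetch_parallelism=None) -> dict:
    """Rate-limit and queue-reading options for a continuously running Autoloader stream."""
    options = {}
    if max_files is not None:
        options["cloudFiles.maxFilesPerTrigger"] = str(max_files)
    if max_bytes is not None:
        options["cloudFiles.maxBytesPerTrigger"] = max_bytes  # byte string, e.g. "10g"
    if fetch_parallelism is not None:
        options["cloudFiles.fetchParallelism"] = str(fetch_parallelism)  # queue-reading threads
    return options


# Reader: .options(**streaming_tuning(max_files=1, fetch_parallelism=4))
# Writer: .trigger(processingTime="10 minutes") instead of .trigger(once=True)
```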
Now, on the batch side of things, there are a couple of real gotchas. On an EventGrid subscription, there are two settings that are very important. The first is the storage queue message time to live. When it has picked up a new file, it puts a message into the queue, and if you don’t touch that message for a week, it expires and disappears. So if you’re running this in batch mode and you pause it, for maintenance or over the Christmas holidays, and then you come back and try to start things again, you’ll have lost those messages if it’s been longer than seven days, with the default settings.
Now, unfortunately, Autoloader does use the default setting for [inaudible]. It doesn’t set a number of days, so the default of seven applies. You can go and update that: via Python, via the Azure CLI, or manually. There are various different ways you can do it, and you can set it to never expire, in which case, once a message goes into your queue, it’s never going to disappear. I’d love it if we could just update that as part of the CloudNotification service, but I don’t think you currently can.
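A small model of why a long pause loses files, assuming the seven-day default and that a time to live of -1 means “never expire”, which is how Azure Storage queues and Event Grid’s storage-queue destination express an unlimited lifetime. The helper is hypothetical.

```python
from datetime import datetime, timedelta

DEFAULT_TTL_SECONDS = 7 * 24 * 3600  # default storage queue message lifetime: seven days


def message_survives(enqueued: datetime, resumed: datetime,
                     ttl_seconds: int = DEFAULT_TTL_SECONDS) -> bool:
    """True if a queued message would still exist when the job resumes (-1 = never expires)."""
    if ttl_seconds == -1:
        return True
    return resumed - enqueued < timedelta(seconds=ttl_seconds)
```

A file landing on 20 December and a job resumed on 4 January is fifteen days apart, so with the default its message is gone; with a TTL of -1 it is still there.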
The other thing is dead-lettering. If for some reason your EventGrid subscription can’t get the message into the queue, it’s going to retry, I think, 30 times by default. But if it keeps failing, if it can’t get there because, for some reason, the queue is not available, it’s just going to give up and go, “Okay.” That message is going nowhere, unless you’ve got dead-lettering turned on.
With dead-lettering on, it essentially stores those events somewhere and says, “Well, here’s a store of all the events we didn’t manage to deliver properly.” Again, that’s not a setting you can get to from the Autoloader settings. But if you’re running it in batch mode, managing this kind of stuff, and using it in production to build a production-grade system, you need to be aware of these settings so that you can put a bit more robustness and a bit of tracking behind it, especially if you’re going to be pausing things.
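A hedged sketch of the delivery settings involved, assuming the Azure Event Grid REST schema: a `retryPolicy` (Event Grid’s defaults are 30 delivery attempts and a 1440-minute, or 24-hour, event time to live, whichever runs out first) plus a `deadLetterDestination` pointing at a blob container. The helper name, resource ID and container name are hypothetical.

```python
import json


def delivery_settings(dead_letter_account_id, dead_letter_container,
                      max_delivery_attempts=30, event_ttl_minutes=1440):
    """Build retry-policy and dead-letter fragments for an EventGrid
    subscription. The defaults mirror Event Grid's own defaults."""
    if not 1 <= max_delivery_attempts <= 30:
        raise ValueError("max_delivery_attempts must be between 1 and 30")
    if not 1 <= event_ttl_minutes <= 1440:
        raise ValueError("event_ttl_minutes must be between 1 and 1440")
    return {
        "retryPolicy": {
            "maxDeliveryAttempts": max_delivery_attempts,
            "eventTimeToLiveInMinutes": event_ttl_minutes,
        },
        # Events that exhaust their retries are written as blobs here
        # instead of being dropped.
        "deadLetterDestination": {
            "endpointType": "StorageBlob",
            "properties": {
                "resourceId": dead_letter_account_id,
                "blobContainerName": dead_letter_container,
            },
        },
    }


print(json.dumps(delivery_settings("<storage-account-resource-id>",
                                   "eventgrid-deadletters"), indent=2))
```

Checking that dead-letter container periodically gives you the tracking mentioned above: any blob that lands there is a file notification that never reached Autoloader.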
Now, it kind of makes sense that there hasn’t been much consideration of that kind of stuff. Autoloader is built from a streaming perspective and expects to be turned on and constantly streaming, so the chance of a message sitting in a queue for seven days is incredibly low. But it’s definitely worth being aware of.
So on the whole, as we’ve seen, Autoloader does reduce the complexity of figuring out which files I have read and which I haven’t. We haven’t even thought about that across all of those examples; we’re just dropping a new file in, and it magically appears in our DataFrame.
But it does add some complexity. Where previously we might have just taken a DataFrame and merged it in, we now have to use foreachBatch. You need to be aware of streaming, and of things like stateful streaming. If we’re going to drop duplicates, we need to do that inside foreachBatch, so that it works on just the new files rather than on the whole stream, including every file that’s been streamed before. There are a few quirks in how we build these patterns, depending on how often we’re actually running it.
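A minimal sketch of that foreachBatch pattern, assuming a Delta target table and the Delta Lake Python API (`DeltaTable.forPath`, `merge`, `whenMatchedUpdateAll`, `whenNotMatchedInsertAll`). The helper names, paths and key columns are hypothetical, and `batch_df.sparkSession` assumes PySpark 3.3 or later. Because `dropDuplicates` runs on the micro-batch DataFrame, no streaming state is kept across batches.

```python
def build_merge_condition(keys):
    """Join condition between the Delta target and the incoming micro-batch."""
    if not keys:
        raise ValueError("at least one key column is required")
    return " AND ".join(f"target.{k} = source.{k}" for k in keys)


def make_upsert(target_path, keys):
    """Return a foreachBatch function that de-duplicates and merges one batch."""
    condition = build_merge_condition(keys)

    def upsert(batch_df, batch_id):
        # Imported here so the helpers can be defined without Delta installed.
        from delta.tables import DeltaTable

        # Only this micro-batch's new files are de-duplicated; which row
        # survives per key is arbitrary.
        deduped = batch_df.dropDuplicates(keys)
        target = DeltaTable.forPath(batch_df.sparkSession, target_path)
        (target.alias("target")
               .merge(deduped.alias("source"), condition)
               .whenMatchedUpdateAll()
               .whenNotMatchedInsertAll()
               .execute())

    return upsert


# Hypothetical usage with an Autoloader stream called stream_df:
# (stream_df.writeStream
#     .foreachBatch(make_upsert("/mnt/lake/silver/orders", ["order_id"]))
#     .option("checkpointLocation", "/mnt/lake/checkpoints/orders")
#     .start())
```

If the latest version of each key matters, ordering within the batch (for example with a window function on a timestamp) would be needed instead of a plain `dropDuplicates`.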
One of the biggest killer features for me is schema evolution and schema drift handling. It’s that ability to say, “I don’t care what’s going on with the data, just always run. No matter how much the data has changed, get it into my initial tables, and capture the changed data in a separate place. Don’t lose the data, don’t throw it away, just put it somewhere I can handle it.” So despite some of the extra complexity from the streaming elements, you get a huge amount of benefit from the schema drift and schema evolution features, and the functionality we have at our fingertips keeps growing.
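A sketch of the reader options behind that behaviour, assuming a Databricks runtime that supports Autoloader schema inference. `cloudFiles.schemaLocation`, `cloudFiles.schemaEvolutionMode` and `rescuedDataColumn` are Autoloader options; the helper name and paths are hypothetical. In `rescue` mode the schema is not evolved and the stream does not fail, with unexpected data kept in the rescued data column, while `addNewColumns` stops the stream and picks up the new columns when it restarts.

```python
SCHEMA_EVOLUTION_MODES = {"addNewColumns", "rescue", "failOnNewColumns", "none"}


def schema_drift_options(file_format, schema_location, mode="rescue",
                         rescued_column="_rescued_data"):
    """Build Autoloader options that keep drifting data instead of losing it."""
    if mode not in SCHEMA_EVOLUTION_MODES:
        raise ValueError(f"unknown schema evolution mode: {mode}")
    return {
        "cloudFiles.format": file_format,
        # Autoloader tracks the inferred schema and its history here.
        "cloudFiles.schemaLocation": schema_location,
        # "rescue": never fail on schema changes;
        # "addNewColumns": fail, then add the new columns on restart.
        "cloudFiles.schemaEvolutionMode": mode,
        # Data that doesn't fit the schema is kept in this column.
        "rescuedDataColumn": rescued_column,
    }


# Hypothetical usage in a notebook:
# df = (spark.readStream.format("cloudFiles")
#       .options(**schema_drift_options("json", "/mnt/lake/_schemas/orders"))
#       .load("/mnt/landing/orders"))
```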
Okay, so those are all the lessons I have for you. My name is Simon Whiteley, and I do this stuff a lot, so feel free to reach out on Twitter, by email and whatnot. I also do a lot of this kind of stuff on YouTube, so feel free to drop by if you’ve got any questions or want to see more demos digging into this kind of stuff.
Now, as always, feedback is super, super important to the conference. All of us speakers really appreciate any feedback we get, so don’t forget to fill it in. I hope you have a fantastic rest of the conference. Cheers.

Simon Whiteley

Simon is a Microsoft Data Platform MVP, awarded in recognition of his contributions to the Microsoft Data Platform Community. Simon is a seasoned Cloud Solution Architect and technical lead with we...