Image Processing on Delta Lake

May 26, 2021 03:15 PM (PT)


Deep learning has come a long way over the past few years. With advances in cloud computing, frameworks, and open source tooling, working with images has gotten simpler over time. Delta Lake has been amazing at creating a structured, tabular, transactional layer on object storage, but what about images? Would you like to know how to gain a 45x improvement in your image processing pipeline? Join Jason and Rohit to find out how!

In this session watch:
Jason Robey, Architect, Databricks
Rohit Gopal, Solutions Architect, Databricks



Jason Robey: Hi. Thank you for joining our session today on image processing in Delta Lake. My name’s Jason Robey. I am the platform lead for enterprise platform features at Databricks, and I’m joined here by my esteemed colleague. Rohit, do you want to introduce yourself?

Rohit Gopal: Thank you, Jason. Hi, all. I’m Rohit Gopal, a solutions architect at Databricks, based out of Seattle. My background is in data science and machine learning, and I’m excited to walk you all through what we’re going to show here. Back to you, Jason.

Jason Robey: All right. To kick things off, just a couple of things. As we’ve all seen, deep learning has come a long way over the past few years. Working with different types of data, and images specifically, which is the subject of this talk, has improved too. We also want to share how to use images on Delta Lake in a way that’s super efficient. In use cases I’ve worked through with various customers and departments, I’ve easily seen a 45x improvement. So Rohit and I want to share this with you today.
So here’s what we’ll go through. First we’ll start off with an architecture for an image processing pipeline and walk through some challenges. Then we’ll look at various techniques for operating on images as first-class citizens on Delta Lake. Then Rohit is going to cover training distributed models and look at a couple of techniques there. After that he’ll show off an inference pipeline where your data scientists can bring in whichever model framework they need without an operational code change in order to use those new models.
So we’ll show you those, then we’ll go over some limitations and sum up. So hopefully this will be useful for you and for your image processing pipelines.
Starting off, the architecture of an image processing pipeline, with some sample use cases where we’ve seen it. If you’re already coming to us knowing you have image processing needs, then you probably already have this. But we see it across the board for defect and anomaly detection, sometimes even augmenting SCADA processes with low-cost video sensors and doing some processing on that in parallel to what’s already in place. Identifying health issues. We see object recognition and face recognition, which are all over the place these days, as well as auto grouping and OCR. There are lots of different use cases where you need to deal with images, and not just one or two at a time, but at scale, which is what we’re all about.
So the pipeline that we’re going to walk through today looks like this. We’re going to pretend that we have a batch or streaming source; we’ll just use a pool of images, and I’ll show you that during the demo. Typically your images come in, either in batch or in a streaming use case. You’ll likely store them and then maybe have a queuing process that runs inference based on your existing model. It classifies them: the image has an object, or it’s this letter, or it should be grouped with these people, or it has some other feature that you’ve extracted from the image. Then you typically store those classified images and optionally set up notifications that say, hey, we really want to look for this thing, notify us immediately. Or you just make them available for processing with those classified features.
In a separate cycle, usually, you’ll have the actual training for your classification model. We’ve got that modeled here using MLflow. You’ll normally have a piece of training code, which may have different variants; Rohit will show off two different ones that we use for this demo. You’ll want to serialize your model. We’re doing this in Databricks, so the model registry is built in, and we can immediately mark whether a model version is in production or not. That way the inference pipeline running in production can pick up the next version without any other changes. That’s what we’ll be demoing and walking through today.
So, to start off, here’s the same architecture of an image processing pipeline in more detail. We’ve got the training and the inference. You’ll often run your training much less frequently than you infer on images. We have that set up here using MLflow again, and in our version trained models can be deployed immediately to production. We’ve got a special class of model that bakes in some of the pipeline features that Rohit will show, and those models can be dockerized and taken wherever you need them, even if your actual need for classification is not in the cloud. We’ll show you a little bit of how to do that today.
We’ll also show you a switch between deep learning frameworks. Often, when you bring in a new framework because you need a bit more lift or a new technique has come along, you run the risk of needing to change production code, which makes it a larger change. With the technique we’re showing you, the switch from one framework to another is seamless, provided you follow the MLflow model practices we show here.
So why is image processing difficult, we might ask? Here are a few challenges I’ve seen. On-prem GPU capacity is limited, and it’s sometimes super expensive to bring in more. It’s a commitment: once you develop a solution, scaling out means buying even more GPUs, and when newer GPUs come out you’re stuck with the same hardware. GPUs are also sometimes hard to architect and manage securely in your deployment processes. Oftentimes you end up with large image repositories where the sheer number of image files is a challenge in itself. By pulling those into Delta Lake with this technique, we’ll show you how to reduce that small file problem. It’s also hard to create a structured table from unstructured image data alongside features, so it often gets overly complex to build and run with images because you have to assemble everything from files. With this technique, it’ll all be one nice, tidy table, with a lot less confusion and fewer potential bugs, while you can still process and treat the images as images when you need to.
So, using Delta Lake to store images. Here’s a look at what we’re going to do. I’m going to start off on the data engineering side and get these images ready for processing, and to do that, as I’ve mentioned a few times, we’ll be using Delta Lake. Delta Lake does a few things for us. First, the small files problem: if you’re dealing with lots and lots of tiny files, your cloud storage and distributed file systems get overwhelmed, and directory listings may not even work if you have too many files in the same folder. Storing the image binaries in Delta Lake enables fast inference queries and keeps the file count down. It immediately gives you stream processing capability with virtually the same code as your batch. It gives you ACID transactions, so you know you have consistent datasets. And you can use configurable compression, so if you’re dealing with images that are already compressed, as we will today, there’s no need to spend extra time compressing and decompressing them on write and read. I’ll show these off in the demo as well.
Then there’s exploration versus storage. With Spark on Databricks we basically support two formats for reading images. If you’ve read through the Spark docs, there’s a special image format that’s nice because, as you can see down here on the left, it shows the images: it generates thumbnails as it processes them. That’s really nice for looking at a folder full of images, or a folder with subfolders full of images, and getting a sense of what’s going on. As it turns out, in our experience this image format is not the best for processing images at scale, so we like to use the binary format instead. We’ll use the image format for exploration and the binary format to keep each image as a set of bytes alongside some other features. That’s what we’ll go through today. We’ll then use the binary content in pandas functions directly, or in UDFs, to work with the image.
And then there’s further image processing. If you need to do things with the images, potentially together with the features, we’ve found that applyInPandas is a very convenient way to get it done. You get the luxury of innovating with image processing libraries you know, such as Pillow or OpenCV (cv2), while still being able to work at scale. The applyInPandas function, which we’ll show you, avoids some of the technical syntax that can itself become a barrier to doing anything beyond a scalar UDF. And, like I said, it lets you use the APIs you’re comfortable with.
The basics, which you’ll see in the demo shortly, are these. First, shape your schema to what you want it to be, using Spark for that. Then write your pandas function. applyInPandas operates on a grouped dataset, so you group by the sets of images you want to process together, or maybe by sets of rows that you’ll turn into an image that you’ll later turn into something else. There are lots of different use cases there. But the short of it is: once you define your schema, you call applyInPandas, pass in the return schema, and then you can join the result back efficiently. This process is how we’ve achieved some of the big speedups versus some on-prem setups and other techniques that folks use.
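The applyInPandas contract is simple: Spark hands each group to your function as a pandas DataFrame, and the frame the function returns must match the return schema (string column labels are matched by name). The sketch below is a hypothetical local stand-in for df.groupBy(key).applyInPandas(func, schema) that mimics that contract with plain pandas, which is handy for unit-testing a function before running it on a cluster. The helper name and the column-list form of the schema are assumptions for illustration, not part of PySpark.

```python
from typing import Callable, List

import pandas as pd


def apply_in_pandas_locally(
    pdf: pd.DataFrame,
    key: str,
    func: Callable[[pd.DataFrame], pd.DataFrame],
    schema_cols: List[str],
) -> pd.DataFrame:
    """Hypothetical local stand-in for groupBy(key).applyInPandas(func, schema).

    Each group is passed to ``func`` as its own pandas DataFrame. The returned
    frame must contain exactly the columns in ``schema_cols``; as Spark does
    with string labels, columns are matched by name and reordered to the schema.
    """
    outputs = []
    for _, group in pdf.groupby(key, sort=False):
        out = func(group.reset_index(drop=True))
        if set(out.columns) != set(schema_cols):
            raise ValueError(
                f"returned columns {sorted(out.columns)} do not match schema {schema_cols}"
            )
        outputs.append(out[schema_cols])
    if not outputs:
        return pd.DataFrame(columns=schema_cols)
    return pd.concat(outputs, ignore_index=True)


# Usage: count images per label, the way a grouped pandas function would see them.
def add_group_size(group: pd.DataFrame) -> pd.DataFrame:
    return group.assign(n_in_group=len(group))


images = pd.DataFrame({"label": ["daisy", "daisy", "roses"], "path": ["a", "b", "c"]})
result = apply_in_pandas_locally(images, "label", add_group_size, ["label", "path", "n_in_group"])
print(result)
```

On a cluster, the equivalent call would be images_df.groupBy("label").applyInPandas(add_group_size, schema=...), with the schema built as described above.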
So with that, I’m going to jump into the demo and just show you what this looks like in Databricks.
All right, so here is the Databricks workspace with our demo set up. I’m in the DAIS folder, and I’ve set up a few different notebooks here. I like to number them like this to keep them in order in my mind for whatever process I’m working on. In this case I’ve pulled up the explorer notebook, with the pipeline diagram at the top. The nice thing about these notebooks, of course, is that you can make them really rich, add documentation and so on, while still being able to run them directly as production pipelines. It’s some pretty cool functionality. But this is what we just covered on the slides, so let’s jump into the dataset.
For this notebook, I’m just going to show you some features of the image file type. You can see we’re importing Image from PIL as well as some of the Spark functions. For today’s demo we’re going to use the flowers dataset. It contains photos of flowers in subfolders for the different types of flowers: daisies, dandelions, roses, sunflowers, and tulips. To show off some of the functionality, I use the image type that we saw on the slide and just point it at a directory. In this case I’m using Databricks and I have a cluster running, so the cluster has access to that folder. What it shows us is this nice visualization: it generates the thumbnails for us and gives us the ability to scan through and see what’s in there. You can also target a narrower folder.
In this first one we read in just the top-level folder. The next folder down is sunflowers, so if I point at that, it picks up only sunflowers. And what you’ll notice here is that when I read in a folder with subfolders, the image reader automatically treats them as labels and labels each image with its folder name. Pushing through this… there we go. Now we’re going to push this over into Delta Lake. If I want to save it directly so that other folks can use it, I can save it out as Delta. We’re just going to save it into a dataset, DAIS 2021 flowers train, so that other people can process on it. So in this case, I’ve taken that set of images directly from disk and stored it in a Delta table, so that other folks can explore it and get these convenient thumbnails.
If I look at that, we’ve got 3,670 images in this one, so it’s fairly small. You can imagine scrolling through and looking at the images, and that’s fine, but at scale you probably don’t need all of that functionality; you’ll probably do that only on a subset. Because the data was loaded through the image method, you basically get a chunk per image. We’ve run Delta’s OPTIMIZE here to compact those into a more reasonable number of files for this much data, which is not a lot.
So as I go through this, I’m going to use the image format to explore things and get a sense of the data. But when I’m actually building my pipeline, what might that look like? If I switch over to notebook 01 ingest, I’ve got an ETL of images into a Delta table. Here I have the same set of imports, and in my use case I’m just going to use the Image library from PIL to process images. We now read the same directory, but this time as binary files. In this case we want to recurse, go down, and look for JPEGs, which is the format of my source images. If I display that, you can see we get a nice, convenient data frame with the image file itself loaded in as binary. As opposed to reading the files in as images, this avoids all the processing that happens just so we can display them nicely. So for large streaming use cases this is probably what you’ll want to do after you’ve explored the data.
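For reference, in PySpark the binary read described above is typically spark.read.format("binaryFile") with the pathGlobFilter option (for example "*.jpg") and recursiveFileLookup set to "true"; the resulting data frame has path, modificationTime, length, and content columns. The sketch below is a hypothetical local equivalent built on pathlib and pandas that produces the same four columns, so downstream image functions can be prototyped without a cluster. It illustrates the data shape only; it is not how Spark reads files.

```python
from datetime import datetime, timezone
from pathlib import Path

import pandas as pd

BINARY_FILE_COLUMNS = ["path", "modificationTime", "length", "content"]


def read_binary_files(root: str, pattern: str = "*.jpg") -> pd.DataFrame:
    """Recursively collect files matching ``pattern`` under ``root``.

    Produces one row per file with the same four columns that Spark's
    binaryFile source returns. Matching is case-sensitive on POSIX systems.
    """
    rows = []
    for file_path in sorted(Path(root).rglob(pattern)):
        if not file_path.is_file():
            continue
        stat = file_path.stat()
        rows.append(
            {
                "path": file_path.as_posix(),
                "modificationTime": datetime.fromtimestamp(stat.st_mtime, tz=timezone.utc),
                "length": stat.st_size,
                "content": file_path.read_bytes(),
            }
        )
    return pd.DataFrame(rows, columns=BINARY_FILE_COLUMNS)
```

Calling read_binary_files on a local copy of the flower folders would give one row per JPEG, with the raw bytes in content, ready for the feature extraction that follows.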
Processing further, we’ll extract some features. What I’d really like to get is the class, what kind of flower this is, and the size, and maybe you’ll want to do some other things. In this case we’ve just chosen a couple of features to extract, using a simple scalar UDF that gives back a struct with these two values. So when I process, I take images, which is what we loaded before, and select: we take the path that was there, apply those scalar UDFs, and then grab the content, which is the image.
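The two features described here can be expressed as plain Python functions and then wrapped as scalar UDFs. The sketch below is an assumption about what such functions might look like: the label is taken from the parent folder name, and the size is read from the encoded image with Pillow, which only needs the image header to report width and height. The function names and the example path are hypothetical.

```python
import io
from pathlib import PurePosixPath
from typing import Tuple

from PIL import Image


def extract_label(path: str) -> str:
    """Class label = name of the folder that holds the file."""
    return PurePosixPath(path).parent.name


def extract_size(content: bytes) -> Tuple[int, int]:
    """(width, height) of an encoded image; Pillow reads this from the header."""
    with Image.open(io.BytesIO(content)) as img:
        return img.size


# Usage with a hypothetical path and an in-memory JPEG.
sample_path = "dbfs:/tmp/flowers/daisy/0001.jpg"
buffer = io.BytesIO()
Image.new("RGB", (40, 30), "red").save(buffer, format="JPEG")
print(extract_label(sample_path))       # daisy
print(extract_size(buffer.getvalue()))  # (40, 30)
```

On a cluster these could be registered with pyspark.sql.functions.udf, for example udf(extract_label, StringType()); a single UDF returning both values as a struct, as in the talk, would need a StructType return type instead.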
We do one more step here. Since we’re on the data engineering side, we change the label into an index so that we can work with numbers for the classes. At the end of this process we still have the same number of images, but now in a data frame that we can use efficiently with that binary object type. Now let’s pretend that I have other image processing I want to do. In this case I’m going to do a simple conversion to grayscale. You wouldn’t necessarily need that, but say you wanted to pull out the red channel, or do edge analysis on the different color bands, or whatever you need. I’m just going to use grayscale to demonstrate the applyInPandas function.
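Turning the folder label into a class index can be as simple as a fixed mapping. The sketch below assumes an alphabetical ordering, which keeps indices stable across runs; the talk does not say which method was used. If you use Spark ML’s StringIndexer instead, note that it orders labels by frequency by default (stringOrderType "frequencyDesc"), and "alphabetAsc" gives the alphabetical behavior shown here. The function names are hypothetical.

```python
from typing import Dict, Iterable, List


def build_label_index(labels: Iterable[str]) -> Dict[str, int]:
    """Assign each distinct label a stable integer, in alphabetical order."""
    return {label: i for i, label in enumerate(sorted(set(labels)))}


def encode_labels(labels: Iterable[str], index: Dict[str, int]) -> List[int]:
    """Replace each label with its integer index (KeyError on unseen labels)."""
    return [index[label] for label in labels]


folders = ["tulips", "daisy", "roses", "daisy", "sunflowers", "dandelion"]
label_index = build_label_index(folders)
print(label_index)  # {'daisy': 0, 'dandelion': 1, 'roses': 2, 'sunflowers': 3, 'tulips': 4}
print(encode_labels(folders, label_index))  # [4, 0, 2, 0, 3, 1]
```

Raising on an unseen label is a deliberate choice here: a new folder name showing up in production is usually worth failing loudly on rather than silently mapping.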
So if grayscale is what I chose to do, I’ve defined two functions. Get image bytes takes an image and returns its bytes so that they can be saved in a column. And add grayscale image takes in a pandas data frame and adds a couple of columns: grayscale image and grayscale format. The format is just going to be PNG, and we apply a function that takes the original image and turns it into grayscale. If you look at this, all we’re doing is writing plain old Python and using the pandas API, with the PIL Image library and io.BytesIO to convert between the binary string and a file-like object for image processing.
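Here is a minimal sketch of the two functions described above, assuming the raw image bytes live in a content column. The column names grayscale_image and grayscale_format are assumptions (the talk only names them informally), and PNG is used for the output as described. Because the function takes and returns a pandas DataFrame, it can be called directly on a small sample on the driver before being handed to applyInPandas.

```python
import io

import pandas as pd
from PIL import Image


def get_image_bytes(image: Image.Image, fmt: str = "PNG") -> bytes:
    """Encode a PIL image into bytes so it can be stored in a binary column."""
    buffer = io.BytesIO()
    image.save(buffer, format=fmt)
    return buffer.getvalue()


def to_grayscale_bytes(content: bytes) -> bytes:
    """Decode image bytes, convert to 8-bit grayscale ("L" mode), re-encode as PNG."""
    with Image.open(io.BytesIO(content)) as img:
        return get_image_bytes(img.convert("L"), fmt="PNG")


def add_grayscale_image(pdf: pd.DataFrame) -> pd.DataFrame:
    """Grouped-map function: returns the input rows plus two new columns."""
    out = pdf.copy()  # leave the caller's frame untouched
    out["grayscale_image"] = out["content"].apply(to_grayscale_bytes)
    out["grayscale_format"] = "png"
    return out


# Usage: sanity-check on a tiny sample on the driver, no Spark involved.
buf = io.BytesIO()
Image.new("RGB", (8, 6), (200, 30, 30)).save(buf, format="JPEG")
sample = pd.DataFrame({"path": ["p0"], "label": [0], "content": [buf.getvalue()]})
result = add_grayscale_image(sample)
print(result.columns.tolist())
# ['path', 'label', 'content', 'grayscale_image', 'grayscale_format']
```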
Once I have those defined, I need to prepare my dataset to actually call the function and augment my data frame. To do that, you set up a return schema. Building the return schema doesn’t actually do anything; there’s no action in here. All I’m doing is using select and withColumn to define the schema itself, so that I don’t have to type it out manually and it can stay dynamic with my processing. This return schema needs to match what the function returns. Here it says input data frame rather than output data frame because I was doing a one-to-one mapping and just returned the input data frame, but whatever the function returns needs to match this return schema. The Apache Arrow integration in applyInPandas then makes it all line up for you. Next, I want to reduce my data frame.
If I have a data frame with lots and lots of columns, but I don’t need all of them for my function, the most efficient way to process it is to limit the data frame to just the columns you need. In this case I’m doing grayscale, so I really only need content. But I’m going to group by label, so I do all the daisies together and all the roses together. That’s just a choice, but whatever you group by needs to be in there. You’ll also need a key, in this case the path, to join the result back to the original dataset, since presumably you want to bring the new columns back in.
The reason I don’t just cheat and send the entire data frame over is that if there are a lot of columns and a lot of data, maybe other images in the same row depending on your application, that generates a lot of data serialization you don’t need. Trimming it down keeps the functions super fast. And using the power of Spark for both the schema definition and the rejoin, this all happens really quickly.
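The trim-then-rejoin pattern can be sketched locally in pandas: keep only the grouping column, the join key (path), and the columns the function needs, run the function per group, and merge the new columns back onto the full frame. On a cluster the rejoin would be a Spark join on path; here a pandas merge stands in for it. The helper and its parameter names are hypothetical.

```python
from typing import Callable, Sequence

import pandas as pd


def process_slim_and_rejoin(
    full: pd.DataFrame,
    func: Callable[[pd.DataFrame], pd.DataFrame],
    key: str = "path",
    group_col: str = "label",
    needed: Sequence[str] = ("content",),
) -> pd.DataFrame:
    """Run ``func`` per group on a trimmed frame, then join new columns back.

    Assumes ``full`` is non-empty and ``key`` uniquely identifies rows.
    """
    # Only the grouping column, the join key, and the function's inputs are
    # handed to func; wide or heavy columns stay behind.
    slim = full[[group_col, key, *needed]]
    parts = [func(group) for _, group in slim.groupby(group_col, sort=False)]
    processed = pd.concat(parts, ignore_index=True)
    # Keep only the newly created columns plus the key, then left-join back.
    new_cols = [c for c in processed.columns if c not in slim.columns]
    return full.merge(processed[[key, *new_cols]], on=key, how="left")
```

The left join keeps every original row and column, so anything the function never saw (other images, metadata) survives untouched.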
Okay, so I’ve got my function, my return schema, and my cut-down data frame. Now we just call it: take the cut-down data frame, group it by the label, and apply my function. You’ll notice that the two things I provide are the function and the schema. Spark then processes one group at a time; each group shows up in the function as a pandas data frame, without you having to annotate anything. So it’s pretty convenient. And you can see here that I’ve got my schema called out, and ultimately I’m generating a data frame.
To finish that up, when you’re dealing with your full dataset you sometimes need to debug this function without waiting on the whole job each time and then figuring out how to extract the problem. So I often keep a testing section, which I may comment out, and that’s what I’m doing here. I’ve dropped down to 10 rows and brought them in as a pandas data frame, and you can see my cut-down schema here. Then I test the function directly: I have my data frame and call the function on it. It’s just pandas, so there’s no Spark involved; I’m doing all this on the driver to sanity-check my function. At the end I iterate and plot a couple of subplots to compare the images. As a developer I can now see that my grayscale conversion is coming across all right, and I feel good about the mapping on the 10 images I looked at. So now I’m ready to let loose the hounds and apply it across my whole dataset.
I’d normally comment that out, but now we can say, hey, let’s run this thing and take an action: display the output df. When I do that, I get my grayscale image and grayscale format columns. Because I’ve tested it, I’m confident that’s what I need later on in the pipeline, and now I can go ahead and commit it to disk. I’m showing all of this as a batch example, but all of it can be streamed simply by changing to readStream and writeStream; the rest of the code stays the same.
One thing to call out here: if your images are already compressed, such as JPEG, PNG, or others, you’ll probably want to save them uncompressed, because the compression is already in your binary object. If most of the data you’ll be storing is images already in a compressed format, it’s a good idea to save off your current compression setting, set it to uncompressed, write out to Delta Lake just like we saw before, and then restore your compression setting in case there are other things in your job.
And just like we did with the original files, we can optimize our write here. Since we pulled the data in and processed it in nicely sized tasks, OPTIMIZE didn’t do as much this time, because the data was already written in a near-optimal layout.
So hopefully that helps you with the functions and features you’ll need when working with images. It’s really freeing to be able to treat images as first-class citizens and still use your familiar APIs. With that, I’m going to turn it over to our data scientist, Rohit, to take you through what somebody might do with this.
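A minimal sketch of the save, set, restore sequence described above, written as a context manager so the previous setting comes back even if the write fails. It assumes the codec is governed by the Spark setting spark.sql.parquet.compression.codec and that conf behaves like spark.conf (PySpark’s RuntimeConfig), exposing get, set, and unset. The context manager name and the table name in the usage comment are hypothetical.

```python
from contextlib import contextmanager
from typing import Iterator

CODEC_KEY = "spark.sql.parquet.compression.codec"


@contextmanager
def parquet_codec(conf, codec: str = "uncompressed") -> Iterator[None]:
    """Temporarily set the Parquet codec on a spark.conf-like object.

    ``conf`` is assumed to expose get(key, default), set(key, value) and
    unset(key), as pyspark's RuntimeConfig does.
    """
    previous = conf.get(CODEC_KEY, None)  # None if never explicitly set
    conf.set(CODEC_KEY, codec)
    try:
        yield
    finally:
        if previous is None:
            conf.unset(CODEC_KEY)  # fall back to Spark's own default
        else:
            conf.set(CODEC_KEY, previous)


# Usage on a cluster (sketch; the table name is hypothetical):
# with parquet_codec(spark.conf):
#     output_df.write.format("delta").mode("overwrite").saveAsTable("flowers_binary")
```

Using try/finally rather than a plain set-then-reset means a failed write does not leave the whole session writing uncompressed files.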

Rohit Gopal: All right. Thanks a lot, Jason. So you’ve seen how we can ETL and ingest image data into Delta Lake, and how we can use the binary file format and optimize to avoid the small file problem. Now that you have a Delta Lake with images, the next question is how do you build models? Let’s say you want to build a deep learning model in one of the more popular libraries, TensorFlow or PyTorch. We’ll take a look at how to do that, not just on a single machine, but in a distributed manner.
With Databricks, one thing you can do is spin up clusters with multiple nodes: multiple computers all tied together, each with multiple GPUs. This allows you to do distributed training of deep learning models, so rather than waiting a day for one machine to complete its training, you can do it in parallel, at the same cost, much faster.
This also allows you to take your single-node deep learning model into a distributed training framework. You can take most of what you have, put it into the framework I’m going to show you, and have that training run in parallel. And Databricks clusters autoscale, so you only pay for what you’re using. Now that you’ve seen how Databricks lets you create clusters that support distributed training of deep learning models, the next question is how to do it. We have frameworks that support distributed training. A couple of the open source ones supported here are HorovodRunner, for distributed training of TensorFlow, Keras, and PyTorch models, and Spark TensorFlow Distributor, in case your focus is really just on TensorFlow.
The last piece is that you can load your data using Petastorm. Let’s say you have your dataset, as Jason showed you, in a Delta table. You can take that Spark data frame and load it into either a TensorFlow dataset or a PyTorch DataLoader, so you can feed the data into your deep learning framework without worrying about the conversion from Spark or Parquet into a TensorFlow or PyTorch dataset.
Petastorm caches the dataset in Apache Parquet format and then loads it into either a TensorFlow dataset or a PyTorch DataLoader. So again, you can feed your Delta table directly into these distributed training frameworks.
One practical tip, which I’ll cover in the demo shortly: when loading the data with Petastorm, specifically with the make_spark_converter function, you feed in a Spark data frame and create a Petastorm converter object that you then consume. In that step you want to set a low value for the parquet_row_group_size_bytes parameter. In our project we found that the default of 32 megabytes can be too high; using one megabyte, as in the example below, is a better option so that your cluster does not run out of memory. That’s a nice lesson we learned.
And here you see in the screenshot below how we use HorovodRunner that I covered in the previous slide to do distributed training over two worker nodes.
Once you’ve done distributed training and have a deep learning model ready to use, the next question is how to actually use it in an inference pipeline. As Jason alluded to earlier, we’ll see how MLflow supports that, in a way that lets you, as a data scientist, build your models in whichever framework you like. MLflow allows that model to be used in a generic format, so the inference pipeline is not affected if the underlying model type changes as the data scientist trains and tries out different models.
MLflow’s model registry handles this. When we track a model from TensorFlow, PyTorch, or many of the other open source libraries out there, apart from registering it in its original format and flavor, MLflow also supports what is called the pyfunc model flavor. It’s a generic Python function-based flavor that allows the inference pipeline to be completely isolated from the training pipeline and whatever model is used. So as a data scientist I can switch from TensorFlow to PyTorch and nothing changes in the inference pipeline. I’ll show you that in just a second.
The other thing to understand is that when we run inference using MLflow, Databricks, and Delta, you can do distributed inference, with multiple nodes of your cluster applying the model in parallel. You can do that either in batch mode or in real-time streaming mode, and we’ll show both in a second. And as Jason mentioned, you can also deploy these models to an edge device. Say you want to train using a set of GPUs within Databricks, but your end use case is elsewhere: you can use MLflow to deploy the model to edge devices too.
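The decoupling described here comes from the inference code depending only on a generic predict interface. In MLflow that role is played by the pyfunc flavor: a logged model can be loaded with mlflow.pyfunc.load_model, or wrapped for Spark with mlflow.pyfunc.spark_udf, and called without knowing which framework produced it. The framework-free sketch below only illustrates the idea and does not use MLflow: two hypothetical wrappers with different internals expose the same predict(DataFrame) method, so run_inference never changes when the model behind it does. All class names and the toy scoring functions are assumptions.

```python
from typing import Callable, List, Protocol, Sequence

import pandas as pd

CLASSES: List[str] = ["daisy", "dandelion", "roses", "sunflowers", "tulips"]


class GenericModel(Protocol):
    """The only contract the inference pipeline relies on."""

    def predict(self, model_input: pd.DataFrame) -> pd.Series: ...


class ScoreModelWrapper:
    """Hypothetical wrapper for a model that outputs one score per class."""

    def __init__(self, score_fn: Callable[[bytes], Sequence[float]], classes: List[str]):
        self.score_fn = score_fn
        self.classes = classes

    def predict(self, model_input: pd.DataFrame) -> pd.Series:
        def top_class(content: bytes) -> str:
            scores = list(self.score_fn(content))
            return self.classes[scores.index(max(scores))]  # argmax

        return model_input["content"].apply(top_class)


class LabelModelWrapper:
    """Hypothetical wrapper for a model whose native output is already a class name."""

    def __init__(self, label_fn: Callable[[bytes], str]):
        self.label_fn = label_fn

    def predict(self, model_input: pd.DataFrame) -> pd.Series:
        return model_input["content"].apply(self.label_fn)


def run_inference(model: GenericModel, images: pd.DataFrame) -> pd.DataFrame:
    """Inference code: depends only on .predict, never on the training framework."""
    return images.assign(prediction=model.predict(images))
```

Swapping a ScoreModelWrapper for a LabelModelWrapper changes nothing in run_inference, which is the property the pyfunc flavor gives the real pipeline.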
With that I’m going to switch over to the demo where I’ll walk you through both the training pipeline as well as the inference pipeline.
So here we are in the Databricks UI. Before jumping into the training and inference pipelines, I want to quickly show you what the cluster we’re using looks like, walking through the key pieces that matter. I’ve selected the 8.2 ML runtime, an optimized runtime that Databricks provides with a lot of the machine learning packages pre-installed. You can see it has GPU support, which allows you to pick GPU instances. For the driver and the workers, in this example I have Standard_NC24, which has four GPUs, and its corresponding memory. We’ve enabled autoscaling and auto-termination, so the cluster can sense its workload and increase the number of workers from two to a maximum of eight [inaudible].
So the key idea here being this is available on multiple clouds. So depending on the cloud that you’re in you can set your clusters with GPU very easily using just a couple of clicks within Databricks. And now that we have this cluster ready and running, I’m going to switch over to the very first pipeline, which is the TensorFlow training that I wanted to walk you all through.
So as you can see here we’ll take a look at some of the steps. The very first is how do you load the data into spark. And then once you have a spark data frame, you want to convert that either to TensorFlow dataset or some other format. And for that we’ll use Petastorm. We’ll use spark converter. Once we do that, we’ll then take a look at the example where we train a single node TensorFlow model. So that’s something you may be already familiar with. The kind of stuff you might be doing on a single work flow machine. Once we do that, we’ll then use that to then train that TensorFlow model in a distributed manner across this cluster we just showed you, across the multiple GPUs in the cluster.
So scrolling down here we’ll start off by setting where we want to track MLFlow experiments. So as you build and you train on different types of models and parameters and so on, you can track all of them automatically. You can see here I’ve also set a couple of other parameters for MLFlow. One is TensorFlow auto log and spark auto log. So what this will do is as you train experiments it’s automatically going to track what is it that you’re running, what are the parameters, and what are the metrics, validation metrics and so on. So it makes it really easy for you as a data scientist to then go back and say, okay, this is what I ran a couple of days ago. These are the parameters. It’ll also give you the version of the notebook so it makes it very easy to go back and reproduce experiments. So it gives you a good way to automatically track your experiments. We’ve set a few parameters here including shape, by size, and number of epics. So these are your standard TensorFlow parameters. We’ll see how that’s used.
So going to the first step we’re going to be looking at how we ingest the data that Jason has created and curated for us. As you see here, I’m just reading it in Delta format. The flower stream binary dataset. So since we are actually in the production pipeline here we want to use the binary format of the image data set. And I’m picking some columns that I want and we are actually re-partitioning it. We want to make sure that there are at least as many partitions as there are workers, if not more. Once we do that, once we’ve ingested the dataset that Jason created for us, the next step is then to make a spark converter Petastorm object.
So what this really does is for each for your training and validation set that we created, you can see here we used this particular function, make spark motor. We feed in the spark data frame and the particular parameter, Parquet reduced sized bytes that I was referring to early. You want reduce that down to about one megabyte. And once you do that you can then use this object within your framework to then get a TensorFlow dataset and build your model. So that’s how you use Petastorm to convert the spark data frame into something you TensorFlow or PyTorch can understand.
What Petastorm also does is it creates a cache. As you can see here, we’ve listed the cache on [BBFS]. And you just specify the particular folder and it’ll save that particular whatever is being gestured in a Parquet format so then it can easily be accessed pretty quick.
Once we do that, the next step is to check how many records we have. This is just a sample, so I have 71 training images and 29 validation images. One idea we also wanted to get across is that for deep learning models you don't necessarily need terabytes of data. If you have that much, great; you can probably get a lot of value out of it. But even with small datasets you can use pre-trained models, as you can see here. This is transfer learning. TensorFlow provides the MobileNetV2 model with weights learned on ImageNet. We'll pull that model, take out its last layer, and retrain it on our specific examples. That avoids having to gather terabytes of data if you don't have it: you can use one of these models that has been trained on publicly available datasets and is tried and tested. So that's what we'll do here. We'll pull in the MobileNetV2 model from TensorFlow and provide the shape of the input data, which is 224 by 224 pixels by three color channels. Once we do that, you can see the summary of the pre-trained model we get from TensorFlow.
Once that model is ready, the next step is to add the last layer. We add one dense layer at the end and train it on our specific classes and data, so it's applicable to what we're trying to do. Once you build a model, the next step is to compile it with the parameters you choose. In this case we're using stochastic gradient descent with momentum, and specifying the metrics we want to report. For the loss we're using categorical cross-entropy, as is quite common. Once we do that, you can look at the summary again. It shows MobileNetV2, followed by a pooling layer, followed by a dense layer, with the number of parameters for each.
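The parameter count in that summary can be checked by hand. This sketch assumes the base model is built without its top layer at 224x224x3, so it emits a 7x7x1280 feature map, that the pooling layer is global average pooling (giving 1280 features), and that the flowers data has five classes; the class count is our assumption, not stated in the talk. If the MobileNetV2 base is frozen, these would be the only trained weights.

```python
# Hand-check of the trainable parameters in the added classification head.
# ASSUMPTIONS: 1280 pooled features from MobileNetV2 and 5 flower classes.


def dense_params(in_features: int, units: int) -> int:
    """A Dense layer has one weight per (input, unit) pair plus one bias per unit."""
    return in_features * units + units


POOLING_PARAMS = 0  # global average pooling has no trainable weights
FEATURES = 1280
NUM_CLASSES = 5

head_params = POOLING_PARAMS + dense_params(FEATURES, NUM_CLASSES)
print(head_params)  # 6405
```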
Once that's ready, the next step is to preprocess the images. Images are stored either in binary format or, in some cases, as just a path to the image. Starting with the binary format Jason provided, the first thing we want is a transformation so the data conforms to what the ImageNet-trained model expects. As you can see, we first use a BytesIO object to wrap the binary data and then open it with PIL, the Python Imaging Library. Once it's open, we resize it and use Keras preprocessing to convert it to an array. We then feed that into the next step.
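The last numeric step of that preprocessing is easy to show without TensorFlow installed. For MobileNetV2, tf.keras.applications.mobilenet_v2.preprocess_input scales pixel values from [0, 255] to [-1, 1]. The sketch below reproduces only that scaling in plain Python on a flat list of pixels; the decode (PIL Image.open on a BytesIO) and resize steps described above are left out.

```python
# Pure-Python sketch of the per-pixel scaling MobileNetV2 preprocessing applies:
# values in [0, 255] are mapped linearly onto [-1, 1].
from typing import List


def mobilenet_v2_scale(pixels: List[float]) -> List[float]:
    """Scale raw 0-255 pixel values to the [-1, 1] range MobileNetV2 expects."""
    return [p / 127.5 - 1.0 for p in pixels]


print(mobilenet_v2_scale([0, 127.5, 255]))  # [-1.0, 0.0, 1.0]
```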
In the next function, we transform each row. This is all regular Python code: we use map to apply the function we just defined to each row, which transforms the content column where the binary data is stored. As noted here, our recommendation is not to do this directly on a Spark DataFrame, because it can generate very large intermediate files. Instead, we recommend creating a transform specification (TransformSpec) that Petastorm can use. As Petastorm loads the data from the Spark DataFrame, it applies these transformations and then feeds the result to TensorFlow or PyTorch, so the transformation happens as the data is loaded. That's what we're doing here: we provide this function, the fields we've changed, and the set of fields to select, which in this case are the features and labels.
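A quick calculation shows how large those intermediate files can get. A decoded, preprocessed 224x224x3 float32 image has a fixed size, while the stored JPEG is compressed; the 50 KB JPEG size below is a hypothetical figure for illustration only.

```python
# Back-of-the-envelope: decoded float32 tensors vs. the encoded images they came from.
# ASSUMPTION: a 50 KB average JPEG is hypothetical.

HEIGHT, WIDTH, CHANNELS = 224, 224, 3
FLOAT32_BYTES = 4

decoded_bytes = HEIGHT * WIDTH * CHANNELS * FLOAT32_BYTES  # one preprocessed image
jpeg_bytes = 50 * 1024                                      # hypothetical encoded size

print(decoded_bytes)  # 602112
print(f"{decoded_bytes / jpeg_bytes:.1f}x larger than the encoded file")
```

Applying the transform inside Petastorm's TransformSpec means only the compact encoded bytes are written to the cache, and the roughly 600 KB tensors exist only transiently in memory while batches are produced.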
Once we do that, the next step is to create a custom wrapper. This is where MLflow lets you isolate your inference pipeline from your training pipeline. Even though I'm developing a TensorFlow model, what I ideally want is a model wrapped in the generic Python function (pyfunc) flavor, so the inference pipeline doesn't need to know what's going on underneath: whether it's TensorFlow, what the parameters are, and so on. So we create a custom wrapper class that extends MLflow's PythonModel class. The one thing we need to do here is define a predict method that does the transformation. We assume that when this trained model is used in the inference pipeline, it receives the data in raw format, meaning either the binary content or the path to the file.
In our inference pipeline, just to mix it up, we show how you can pass the path to a file into the predict method, and the predict method defined below takes care of all the transformations the specific framework needs. Right now we're using TensorFlow, so we make sure the input can be used as a tensor by the TensorFlow models we've created. That means some preprocessing: for example, mapping the class names from text to numeric indices, making sure the path to the file uses the correct DBFS prefix (what we call the FUSE mount), and setting up the structure of what I want to run through my predict method. Then I iterate through the input being passed in, which in this case is a set of paths. For each path, I open the file, do the required transformations, use TensorFlow's reshape function to reshape it, and feed that tensor into the model so it can make the prediction.
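The FUSE-mount step amounts to a small path rewrite so that ordinary file APIs such as open() or PIL can read the file. This is a minimal sketch assuming DBFS is mounted at /dbfs, as on Databricks clusters; the helper name and example path are hypothetical.

```python
# Sketch: map a DBFS URI to its FUSE-mount path so local file APIs can read it.
# ASSUMPTION: DBFS is mounted at /dbfs; to_fuse_path is a hypothetical helper.


def to_fuse_path(path: str) -> str:
    """Rewrite dbfs:/... to /dbfs/...; leave any other path untouched."""
    prefix = "dbfs:"
    if path.startswith(prefix):
        return "/dbfs/" + path[len(prefix):].lstrip("/")
    return path


print(to_fuse_path("dbfs:/tmp/flowers/daisy/1.jpg"))  # /dbfs/tmp/flowers/daisy/1.jpg
```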
What all of this is really doing is making sure the data scientist, rather than the inference pipeline, owns the transformations that TensorFlow expects. As you can see here, I'm returning two things: the prediction, and the probabilities for each class we predict. Once I've defined this wrapper, I use it in the training pipeline below, where I wrap the model I've trained and log it to MLflow.
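The shape of that wrapper can be sketched without MLflow or TensorFlow installed. In the real notebook the class would subclass mlflow.pyfunc.PythonModel and be logged with mlflow.pyfunc.log_model(..., python_model=...); here, stub callables stand in for the trained model and the file loader, and the class list is our assumption about the flowers labels. The point is the contract: raw paths in, (label, probabilities) out.

```python
# Plain-Python sketch of the pyfunc wrapper pattern (no MLflow dependency).
# ASSUMPTIONS: CLASSES, the stub model, and the stub loader are hypothetical.
from typing import Callable, List, Sequence, Tuple

CLASSES = ["daisy", "dandelion", "roses", "sunflowers", "tulips"]


class ImageClassifierWrapper:
    def __init__(self, model: Callable[[bytes], List[float]],
                 load: Callable[[str], bytes]) -> None:
        self.model = model  # framework-specific model (TensorFlow, PyTorch, ...)
        self.load = load    # reads raw image bytes from a file path

    def predict(self, context, model_input: Sequence[str]) -> List[Tuple[str, List[float]]]:
        """Accept raw file paths; return (predicted class, per-class probabilities)."""
        results = []
        for path in model_input:
            # Real code would decode, resize, and reshape here before calling the model.
            probs = self.model(self.load(path))
            best = max(range(len(probs)), key=probs.__getitem__)
            results.append((CLASSES[best], probs))
        return results


# Usage with stubs: the caller only ever passes paths and gets labels back.
stub_model = lambda raw: [0.1, 0.1, 0.6, 0.1, 0.1]
wrapper = ImageClassifierWrapper(stub_model, load=lambda path: b"")
predictions = wrapper.predict(None, ["/dbfs/tmp/flowers/roses/1.jpg"])
print(predictions[0][0])  # roses
```

Because the framework-specific work lives inside predict, swapping the stub for a PyTorch model would not change anything the caller sees.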
Let's take a look at that. This function runs the training on a single node. We'll start there: it's an easy way to develop before stepping up to a distributed framework, so always start with single-node training. First, we consume the training and validation converter objects that Petastorm provided and make TensorFlow datasets from them with the specified batch size. This applies the transformations we saw earlier.
Once those two pieces are ready, the rest is regular TensorFlow code: a .map from the input to the features and the label index, and making sure we have the correct batch size, which determines the steps per epoch as well as the number of validation steps. Then, as you've seen earlier, you call fit on the model with the training dataset, the validation dataset, and these other parameters, and it trains the model.
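Those step counts matter because, as we understand Petastorm, make_tf_dataset repeats the data indefinitely by default (num_epochs=None), so Keras has to be told how many batches make up an epoch. The talk doesn't state the batch size, so BATCH_SIZE=32 below is hypothetical. The num_workers argument anticipates distributed training, where each worker consumes its share.

```python
# Sketch: derive Keras steps_per_epoch / validation_steps from dataset size.
# ASSUMPTION: BATCH_SIZE = 32 is hypothetical; the talk does not give the value.


def steps(num_rows: int, batch_size: int, num_workers: int = 1) -> int:
    """Batches each worker should consume per epoch; never fewer than one."""
    return max(1, num_rows // (batch_size * num_workers))


BATCH_SIZE = 32
steps_per_epoch = steps(71, BATCH_SIZE)   # 71 training records in the sample
validation_steps = steps(29, BATCH_SIZE)  # 29 validation records in the sample
print(steps_per_epoch, validation_steps)  # 2 1
```

The max(1, ...) guard matters for small samples: 29 validation rows with a batch size of 32 would otherwise produce zero validation steps.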
Now that we have a trained model, the next step is to log it in MLflow. As you can see here, I'm pulling in the run ID and experiment ID from MLflow and wrapping the model I just created with the custom wrapper class from above. Using the run ID and experiment ID, I point to the model. Once I have this wrapped model, I log it as a generic Python function: you can see I pass python_model equal to the wrapped model. Now I have something stored in a way that any inference pipeline that supports MLflow can use without knowing what's going on under the hood.
Now let's run it. As you can see, we start the run with mlflow.start_run and call this function. This is a single-node process and just a simple example; it isn't meant to be the most accurate model. We're not trying to find the best architecture; that really depends on what you're trying to do and what your use case is. Now that we've trained on a single node, I can come here to Experiments, which tracks all the experiments I've been running and tells me which run corresponds to what. I know this is the run from this TensorFlow example, so I'll open it up, and it shows everything for this model run.
Let's take a look. Within MLflow tracking, it tells me the source notebook, including the version of the notebook that was used, so you can easily reproduce your run. It shows all the parameters: because we called MLflow's TensorFlow autolog, it captures all of them, including the default ones. It has the metrics from the training run as well. And it has tags, some of which are pretty useful: one lets you go back to the exact version of the Delta table used for training, using time travel. In this case it was version 5. So even if your pipeline keeps running and the table keeps changing, you can say, give me version 5, and recreate your experiment. That's pretty helpful: you have the code, the parameters, and the data source you need to recreate it.
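Going from that tag to a time-travel query can be scripted. This sketch assumes the Spark autologging tag (which we believe is named sparkDatasourceInfo) has a value of the form path=...,version=...,format=delta; check the exact text in your own run before relying on it. The VERSION AS OF clause is standard Delta Lake SQL; the table path below is hypothetical.

```python
# Sketch: turn an autologged data-source tag into a Delta time-travel query.
# ASSUMPTIONS: the tag format "path=<p>,version=<n>,format=delta" and the path
# below are assumptions; this naive parser also assumes the path has no commas.


def parse_datasource_tag(tag: str) -> dict:
    """Split 'key=value,key=value' into a dict."""
    fields = {}
    for part in tag.split(","):
        key, _, value = part.partition("=")
        fields[key.strip()] = value.strip()
    return fields


def time_travel_sql(tag: str) -> str:
    info = parse_datasource_tag(tag)
    if info.get("format") != "delta":
        raise ValueError("time travel requires a Delta table")
    return f"SELECT * FROM delta.`{info['path']}` VERSION AS OF {int(info['version'])}"


tag = "path=dbfs:/tmp/flowers_train_binary,version=5,format=delta"  # hypothetical
print(time_travel_sql(tag))
```

The same snapshot can be read from PySpark with spark.read.format("delta").option("versionAsOf", 5).load(path).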
Coming down to the stored artifacts, we can look at a few things. There's the model summary, showing MobileNetV2 and the dense layer we added. There's the original TensorFlow model we created, without any wrapper around it. As you can see, by default it's saved in two flavors: Keras and generic Python function. More interesting for our inference pipeline, though: I'll close this one, and we can look at the version where we wrapped it with the custom class that has the predict method. You can see it's saved only in the generic Python function flavor.
So we've saved everything corresponding to this training run in TensorFlow. Before we move on, I want to quickly walk you through the example where we train the same model across a distributed cluster like this one, using all the worker nodes and their GPUs. For that we use HorovodRunner, which distributes the entire computation. To do so, you set up the callbacks that Horovod supports; once you enable them, the rest of the functions are practically the same as before. I'm reusing most of my functions from earlier, so I don't have to redefine my model or my framework; I'm just doing the training again, this time in a distributed manner. So when I call fit on the model, in addition to the training and validation datasets, I also pass in the callbacks.
Once you do that, as we saw previously, you call mlflow.start_run, create a HorovodRunner, and pass it the parameter np=2. In this case it assumes a cluster of two worker nodes, each with its own GPUs. Then you call hr.run with the function we just defined above, and it runs the distributed training of the model we saw.
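Each Horovod process trains on its own slice of the data. With Petastorm this is typically done by passing cur_shard=hvd.rank() and shard_count=hvd.size() to make_tf_dataset. The sketch below simplifies that: real Petastorm shards by Parquet row group, while this version shards individual rows so the split is easy to see.

```python
# Simplified sketch of splitting data across Horovod processes (by row, not by
# row group as Petastorm actually does).
from typing import List, Sequence


def shard(rows: Sequence[int], rank: int, size: int) -> List[int]:
    """Rows assigned to process `rank` out of `size` processes."""
    return [row for i, row in enumerate(rows) if i % size == rank]


NP = 2                  # matches the np=2 used with HorovodRunner in the demo
rows = list(range(71))  # stand-ins for the 71 sample training records
shards = [shard(rows, rank, NP) for rank in range(NP)]
print([len(s) for s in shards])  # [36, 35]
```

Because each process sees only part of the data per epoch, the per-worker step count is usually divided by the number of processes as well.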
All of this was an example where the raw data was in binary format. But say you have very large datasets with very large images, and you'd rather not store them in binary format. Then you can create a Delta table that contains the paths to those files rather than the binary data itself. We've linked a couple of blogs and notebooks you can access later to see how that works with these frameworks.
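Building such a path table starts with a listing of (path, label) pairs. This sketch assumes images are organized as root/label/file, as in the flower photos sample, and uses only the standard library; in a notebook the resulting rows could then be turned into a DataFrame (for example with spark.createDataFrame) and written as Delta. The helper name and demo files are hypothetical.

```python
# Sketch: build (path, label) rows instead of storing image bytes.
# ASSUMPTION: images live at <root>/<label>/<file>; path_label_rows is hypothetical.
import tempfile
from pathlib import Path
from typing import List, Tuple

IMAGE_SUFFIXES = {".jpg", ".jpeg", ".png"}


def path_label_rows(root: str) -> List[Tuple[str, str]]:
    """One row per image file; the label is the name of the parent folder."""
    rows = []
    for p in sorted(Path(root).rglob("*")):
        if p.is_file() and p.suffix.lower() in IMAGE_SUFFIXES:
            rows.append((str(p), p.parent.name))
    return rows


# Demo on a throwaway directory tree; the non-image file is skipped.
with tempfile.TemporaryDirectory() as root:
    for rel in ["daisy/a.jpg", "roses/b.JPG", "README.txt"]:
        f = Path(root, rel)
        f.parent.mkdir(parents=True, exist_ok=True)
        f.write_bytes(b"")
    demo_rows = path_label_rows(root)
    demo_labels = [label for _, label in demo_rows]

print(demo_labels)  # ['daisy', 'roses']
```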
Now that we've seen this, let's switch over to the model itself. I've already registered it in the MLflow Model Registry, which tracks the different versions of a model. In this case you create one logical model for the project; I've called it Data + AI Summit image classification. Version one, the TensorFlow version, is currently in production. You can apply governance so that only an admin can move a model into Production or Staging, while as a data scientist I can request a transition to Production, Staging, and so on. All of this matters because governance keeps a data scientist from running a science experiment, deploying it straight into production, and breaking things.
We'll see in a second how this matters for the inference pipeline. Let me switch over to the inference pipeline and show you what it looks like. On the inference side, I'm reading a table. In this case it's still in the image format, but that doesn't really matter: if you expand the image column, you'll see it's really storing an origin field with the path to the file, and that's what we feed into the predict method. At the end of the day, if you have the locations of the files, you can call this TensorFlow model without worrying about the format, decoding the data, and so on.
Because this is a dummy example, I'm reusing the training dataset, which is why you see the label; in a real pipeline you wouldn't have it. So we'll ignore the label and use the path to the file to score the images. This is the inference pipeline where I want to use the model saved in the MLflow Model Registry. To reference a registered model, you use models:/ followed by the model name, which is the one we just saw: Data + AI Summit image classification, as I called it. Then you just ask for the production version. I don't care whether it's version one or version ten, or how many iterations there have been; just give me the latest production version.
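The registry URI follows the pattern models:/<name>/<stage or version>, which both mlflow.pyfunc.load_model and mlflow.pyfunc.spark_udf accept as model_uri. The small helper below is a hypothetical convenience for building and validating that string; the model name in the example is also hypothetical, since the exact registered name is not visible in the transcript.

```python
# Sketch: build a Model Registry URI of the form models:/<name>/<stage or version>.
# ASSUMPTION: registry_uri and the example model name are hypothetical.

VALID_STAGES = {"None", "Staging", "Production", "Archived"}


def registry_uri(name: str, stage_or_version: str = "Production") -> str:
    """Reference a registered model by stage (e.g. Production) or version number."""
    if stage_or_version not in VALID_STAGES and not stage_or_version.isdigit():
        raise ValueError(f"unknown stage or version: {stage_or_version!r}")
    return f"models:/{name}/{stage_or_version}"


print(registry_uri("image_classification"))       # models:/image_classification/Production
print(registry_uri("image_classification", "1"))  # models:/image_classification/1
```

Asking for the stage rather than a version number is what lets the inference code stay unchanged as new versions are promoted.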
Right now, as you can see, this one is the production version. I then load it as a Spark UDF, using mlflow.pyfunc, the generic Python function module, so I can apply it to a Spark DataFrame. You also pass in the file path column. The last step is to take the test dataset, call this Spark UDF on it, and save the result in the predictions column. To visualize that, you can just display it, and you see I now have the predictions from my TensorFlow model, with the two values I need: the prediction, and the array containing the probabilities for each of the classes.
Now that we've seen how to create a model using TensorFlow, train it in a distributed manner, and run it through the inference pipeline, let's take a quick look at a PyTorch example. It covers most of what we did in TensorFlow: you load the data, use Petastorm to create a PyTorch data loader, train on a single node, and then use that to do distributed training. In the interest of time we'll go through this quickly without the details, but the steps are the same. Load the data and feed it into Petastorm. Use MobileNetV2 and add a last layer to it. Define the train and evaluate functions. Then preprocess the images as in the earlier example and feed that into a transform spec for Petastorm. You can also see we define a custom pyfunc MLflow wrapper class that handles all the PyTorch-specific transformations, so the inference pipeline doesn't have to worry about what those transformations are.
Again, this is critical for keeping your inference pipeline isolated from the training framework. Once you have that, you start with single-node training. Most of this is PyTorch code, so I won't go through it; it depends on the framework you use. The key piece is that you grab the trained model and log it to MLflow. You can see here I'm also transitioning it to Staging. Below that there's also an example using HorovodRunner, the same framework, to do distributed training of PyTorch over the GPUs on the worker nodes of this cluster.
Now that we have the PyTorch example, let's look at what that means for us in the model registry. I manually transitioned this version into production: TensorFlow was the one in production earlier, and I've now moved version 2, the PyTorch model, into production. When I go back to my inference pipeline, the inference logic doesn't change at all. It picks up the latest production version and applies that model to the test dataset. So it's nice to see that the transition from TensorFlow to PyTorch doesn't affect your inference pipeline at all. You'll also see a streaming example, which looks very much the same as the other example, just with readStream and writeStream.
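The reason nothing changes downstream can be shown with a toy, in-memory stand-in for the registry: the scoring code asks for whatever is in Production at call time, so promoting a different model changes the result without touching the caller. The dict-based registry and the two stub models below are hypothetical and only illustrate the idea, not MLflow's implementation.

```python
# Toy sketch: inference resolves the "Production" stage at call time, so swapping
# the TensorFlow model for the PyTorch one needs no change to score().
# ASSUMPTION: this in-memory registry is a hypothetical stand-in for MLflow's.
from typing import Callable, Dict, List

registry: Dict[str, Callable[[str], str]] = {}


def transition(stage: str, model: Callable[[str], str]) -> None:
    """Point a stage at a model, like promoting a version in the registry."""
    registry[stage] = model


def score(paths: List[str]) -> List[str]:
    model = registry["Production"]  # looked up on every call
    return [model(p) for p in paths]


transition("Production", lambda p: "tensorflow-v1")
before = score(["/dbfs/tmp/a.jpg"])
transition("Production", lambda p: "pytorch-v2")
after = score(["/dbfs/tmp/a.jpg"])
print(before, after)  # ['tensorflow-v1'] ['pytorch-v2']
```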
With that, I’ll pass it back to Jason for wrapping our talk up.

Jason Robey: All right. Thanks, Rohit. I'll take us home here by going over a couple of limitations. Today we've seen Delta Lake used directly for the image binaries, or for file paths pulled up through the file system. With the binary approach, if you have very large images, you're going to blow past Delta Lake's default block limits, which are about 2 GB. So we recommend this technique for images of 512 MB or less. That's a pretty big image, so if you have something bigger than that, it may make sense to chunk it up so you can still use the binary format, or just switch to the file-path format, as you saw in the model example.
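Chunking can be as simple as storing one row per fixed-size slice with an index for reassembly. This sketch uses the 512 MB guideline from the talk as the default chunk size; the (image_id, chunk_index, bytes) row layout is a hypothetical choice, and the demo uses a tiny chunk size so it runs instantly.

```python
# Sketch: split an oversized image into fixed-size chunks and reassemble it.
# ASSUMPTION: the (image_id, chunk_index, data) row layout is hypothetical.
from typing import Iterator, List, Tuple

CHUNK_BYTES = 512 * 1024 * 1024  # the talk's 512 MB guideline


def chunk(image_id: str, data: bytes, size: int = CHUNK_BYTES) -> Iterator[Tuple[str, int, bytes]]:
    """Yield (image_id, chunk_index, chunk_bytes) rows covering all of `data`."""
    for index, start in enumerate(range(0, len(data), size)):
        yield image_id, index, data[start:start + size]


def reassemble(rows: List[Tuple[str, int, bytes]]) -> bytes:
    """Concatenate chunks in index order, regardless of the order rows arrive in."""
    return b"".join(part for _, _, part in sorted(rows, key=lambda r: r[1]))


data = bytes(range(10))
rows = list(chunk("img-1", data, size=4))  # tiny chunks for the demo
print([len(part) for _, _, part in rows])  # [4, 4, 2]
print(reassemble(list(reversed(rows))) == data)  # True
```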
So in summary: Delta Lake is a very good place to store images. It helps with the small-files problem and gives you faster inference queries, particularly for images of 512 MB or less. MLflow, together with a custom MLflow wrapper class, lets us separate the inference pipeline so that you as a data scientist can bring in whatever model or framework works best. And if you're running this on Databricks, we can efficiently support distributed training and inference on matching hardware in whichever cloud you need, giving you a consistent pipeline and consistent runs wherever your data is.
I hope you've enjoyed the presentation and found it helpful. We look forward to your feedback on the session.

Jason Robey

Jason Robey is a Senior Solutions Architect at Databricks leading the Enterprise Platform SME group at Databricks. Jason has been leading and supporting technology teams for over 20 years in software...

Rohit Gopal

Rohit Gopal is a Solutions Architect at Databricks, where he helps customers build data science, machine learning and data engineering applications. Previously, Rohit worked as a Data Scientist at IBM...