To scale out deep learning training, a popular approach is to use distributed deep learning frameworks to parallelize processing and computation across multiple GPUs/CPUs. Distributed deep learning frameworks work well when input training data elements are independent, allowing parallel processing to start immediately. However, preprocessing and featurization steps, crucial to deep learning development, may involve complex business logic with computations across multiple data elements that the standard distributed frameworks cannot handle efficiently. These preprocessing and featurization steps are where Spark can shine, especially with the upcoming support in version 3.0 for binary data formats commonly found in deep learning applications. The first part of this talk will cover how Pandas UDFs, together with Spark’s support for binary data and TensorFlow’s TFRecord format, can be used to efficiently speed up deep learning’s preprocessing and featurization steps. The second part will focus on techniques to efficiently perform batch scoring on large data volumes with deep learning models, where real-time scoring methods do not suffice. The upcoming Spark 3.0’s new Pandas UDF features helpful for deep learning inference will be covered.
– Hello everyone, my name is James Nguyen. I am a data and cloud solution architect with Microsoft, working in customer success. I work in data and machine learning, and I have the opportunity to work on deep learning and big data engagements with my customers. Today I’d like to share with you some techniques to leverage Spark for scalable data preparation and inference in deep learning. And thank you for attending my session.
So for the agenda of the talk, we’re gonna first talk about how we can scale data preparation and featurization in the deep learning process. In the second part of the talk, we’re gonna talk about how to do batch inference using Spark.
So to deal with the problem of big data in deep learning, we have distributed deep learning frameworks. These frameworks work very well when the input data elements are independent, so that the distributed processing can start right away. But there are scenarios that require complex processing logic involving multiple data elements across the dataset. In those scenarios the distributed deep learning framework may have a challenge, and normally you would need another process to perform the data preparation and featurization before you pass the data on to deep learning. On the other hand, when it comes to inference, we have a lot of support for online inference, but batch inference is still a challenge.
In this talk we’re gonna discuss leveraging different features, especially the new features of Spark 3.0, to address these gaps. As you can see in the diagram, the steps in the pipeline, namely data transformation, featurization, and inference, are the steps that can be offloaded to Spark for efficient and scalable processing.
Right, so let’s first talk about how Spark can be used to accelerate data preparation and featurization. In a typical deep learning pipeline you’re gonna have a first step of performing data acquisition and initial transformation from different sources. Then we’re gonna perform data preparation for the ML task, and then featurization, before we pass the data on to the core deep learning training. In step one, people normally use different kinds of data query and extraction tools to query and pull data from different sources, doing some initial transformation.
Then we normally export that to a flat file and use Python, pandas, or that kind of tooling to perform data preparation. This step is normally done in a single-computer, single-node environment, and it’s not scalable. Then we perform the featurization, which is normally done as part of the deep learning training using a deep learning data API, like the Dataset API in TensorFlow.
So you see that each of these steps is done using different tools, and some of the tools may not be scalable when it comes to a large volume of data. With Spark we can offload these steps; we can do all three steps using Spark, including transformation and featurization. So it is a handy technology and it is very scalable. Next, we’re gonna talk about a few Spark features that can be useful to offload the processing and inference, right? The first feature I’d like to talk about is Pandas UDFs.
Spark Pandas UDFs are a technology that allows Python processing to be done in parallel across multiple nodes. The input can be a Spark dataframe over the big data. Spark is gonna split the Spark dataframe into multiple batches, and each batch is passed on to the Python UDF in the form of a Pandas dataframe or a Pandas series. From here, data scientists have the freedom of using regular pandas, or any other Python logic that we normally write in a single-node environment, to perform any kind of transformation or scoring logic, built in the regular and familiar Python environment. Then we just need to output that in the format of a Pandas dataframe or series, depending on the type of the Pandas UDF we used, and Spark is gonna consolidate the output into a Spark dataframe. The key feature here is that with Pandas UDFs, Spark makes use of the PyArrow library to make the data exchange between Spark data in the JVM and Python data very efficient, and the data is passed on to the UDF in the form of a Pandas series or Pandas dataframe.
So it’s very performant and very good for large-scale data transformation.
So let’s look at different types of the Pandas UDFs.
Since Spark 2.3 we have support for Pandas UDFs. The first type of Pandas UDF is the scalar UDF. A scalar UDF lets us work on individual columns: the columns from the Spark dataframe are split into batches and then passed on to the UDF. The UDF operates on the column values, and it can be one column or multiple columns. This is very good for direct parallel column-value computation.
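As a minimal sketch of the pattern (the column name and the cents-to-dollars conversion are made up for illustration, not from the talk’s slides): the body of a scalar Pandas UDF is just an ordinary pandas function from Series to Series, which Spark applies once per Arrow batch.

```python
import pandas as pd

# Core logic of a scalar Pandas UDF: a plain function from pandas Series to
# pandas Series, applied to one Arrow batch of column values at a time.
def to_dollars(cents: pd.Series) -> pd.Series:
    # ordinary vectorized pandas code
    return cents / 100.0

# In Spark the same function would be registered and applied like this
# (sketch, not executed here):
#
#   from pyspark.sql.functions import pandas_udf
#   to_dollars_udf = pandas_udf(to_dollars, returnType="double")
#   df = df.withColumn("dollars", to_dollars_udf("cents"))
```

Because the function receives a whole batch at once, any vectorized pandas operation gets applied efficiently, which is where the speedup over row-at-a-time Python UDFs comes from.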
The second type of UDF is the grouped map UDF.
Spark is gonna group the data by the values of a particular column and then pass each group to the UDF. It follows a split-and-apply pattern. The thing to be careful about is that all the data of a group is loaded into the memory of the executor, so we need to be aware of that and plan appropriately.
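A grouped map UDF can be sketched like this (the column names and the summary computed are illustrative assumptions): the function receives all rows for one group as a single pandas DataFrame and returns a DataFrame.

```python
import pandas as pd

# Core logic of a grouped map Pandas UDF: one entire group arrives as a
# pandas DataFrame, and a DataFrame is returned. Note the whole group must
# fit in one executor's memory.
def summarize(history: pd.DataFrame) -> pd.DataFrame:
    return pd.DataFrame({
        "customer_id": [history["customer_id"].iloc[0]],
        "total": [history["amount"].sum()],
    })

# In Spark (sketch, not executed here):
#
#   df.groupBy("customer_id").applyInPandas(
#       summarize, schema="customer_id long, total double")
```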
And the upcoming Spark 3.0 is gonna have support for a very important type of Pandas UDF, which is the scalar iterator UDF. This is very good for deep learning because it minimizes the frequency of triggering the UDF function: instead of passing individual Pandas objects to the UDF, it passes an iterator of Pandas objects.
When used in deep learning, especially when you need to load or initialize an expensive resource, this helps a lot. If you look at the chart, this is the performance data published by Databricks. We can see that Pandas UDFs have anywhere between 3x and 100x better performance compared to the one-row-at-a-time regular Python UDFs that we used in older Spark versions. It’s very useful, very good for performance.
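The iterator pattern can be shown in isolation (the "model" here is a trivial placeholder standing in for an expensive initialization such as loading deep learning weights): the function receives an iterator of batches, so the setup cost is paid once per task rather than once per batch.

```python
from typing import Iterator

import pandas as pd

def load_model():
    # placeholder for an expensive step, e.g. restoring model weights
    return lambda s: s * 2

# The scalar iterator pattern: the function receives an iterator of pandas
# Series (one per Arrow batch) and yields one Series per batch.
def predict_batches(batches: Iterator[pd.Series]) -> Iterator[pd.Series]:
    model = load_model()      # runs once per task
    for batch in batches:     # many batches reuse the same loaded model
        yield model(batch)

# In Spark 3.0 (sketch, not executed here):
#
#   from pyspark.sql.functions import pandas_udf
#   predict_udf = pandas_udf(predict_batches, returnType="double")
#   df.withColumn("pred", predict_udf("feature"))
```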
All right, so the second feature of Spark, especially the new version 3.0, that is useful for deep learning is the support for binary data formats and TensorFlow’s TFRecord format.
So in deep learning, we don’t only work with structured data; it’s more common that we need to work with unstructured data like video, image, and audio files. In the new version, Spark has the ability to read and then process binary files.
And then, again with the support of UDFs, we can do the transformation of the binary files. Another thing: it also has support for deep learning data formats, like TFRecords.
So the TFRecord format is very handy, because the output from the Spark computation can directly be used by the deep learning training in a subsequent step.
This is the second important feature that we want to cover. Right, so to understand how these Spark UDF features can be useful in large-scale data preparation, let’s take a look at a couple of examples. The first example is a scenario where we train a time series classification model. The scenario is in the telecom industry, where we want to make use of the history of the customer’s interactions with the provider, so that we can predict the probability that a customer may churn at some point in the future. In order to perform this churn prediction, we have to build the history of the customer, right? We have to look at all the interactions the customer has had with the company: for example, the transactions they made, the types of calls they made, and any kind of event that happened during the customer’s lifetime with the company. The technique for this is that we need to build sliding windows. So you see here, let’s assume the sliding window is 14 days; we need to move this window along the customer’s history with the company, and for each of these windows we need to compute features.
We compute metrics from the events that happened in each 14-day window, plus the label, which is the outcome of whether the customer churned or not.
So look at the volume of data: a company in the telecom industry normally has 40 million customers, I mean, millions and millions of customers, and each customer can have a long history of several years of transactions with the company. With a history like that, we need to generate hundreds of windows of data per customer, so the output data can be a lot, right?
It can be a terabyte of data for the training. If we do this on a single machine with Python, it would take days just to build the training data for the deep learning process. With the help of Spark, we can run this all in parallel, and the entire data preparation took just a couple of hours with a large Spark cluster. So let’s take a look at the detail of how this can be done on the next slide.
The steps are: first, we need to read the input data from different sources to build the complete history for all the customers.
Here, Spark SQL can be used to select and then do certain kinds of initial data transformation or data cleaning. Then, for each customer, we need to build the complete transaction history. Here we can use the dataframe’s groupby command, which is gonna give you the history of each customer.
Then, for each customer history, we apply the Pandas UDF and have the UDF function generate the sliding-window training data from the history data for each customer.
Then we can output the computation of the Pandas UDF in the TFRecord format. Take a look at the detail of the Pandas UDF. You can see here that the input to the UDF, because of the group-by we’ve done, is gonna be the history of each individual customer, right? When we have the individual history of a customer, we compute the start time for that history and the duration, and then we have a for loop to create multiple sliding windows from this historical data, compute features from each window, and append the output to a Python list that we output back to the main Spark process. Once we have the output of the computation, like in line 18 here, we have the dataframe written out in the TFRecord format, which is ready for deep learning, right? So you can see all the computation and featurization can happen like in a regular Python environment; the only difference is that here it can be parallelized to run for millions of customers, so depending on your cluster size, you can run this in thousands and thousands of parallel tasks. This can save a lot of effort and scales very well.
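The per-customer windowing logic can be sketched as plain pandas code of the kind that would run inside the grouped-map UDF. The column names ("date", "value") and the single feature computed per window (a sum) are illustrative assumptions, not the talk’s actual feature set.

```python
import pandas as pd

WINDOW_DAYS = 14

# Runs once per customer group: slide a 14-day window over the customer's
# history, one day at a time, and compute features per window.
def window_features(history: pd.DataFrame) -> pd.DataFrame:
    history = history.sort_values("date")
    start, end = history["date"].min(), history["date"].max()
    rows = []
    t = start
    while t + pd.Timedelta(days=WINDOW_DAYS) <= end:
        win = history[(history["date"] >= t) &
                      (history["date"] < t + pd.Timedelta(days=WINDOW_DAYS))]
        rows.append({"window_start": t, "value_sum": win["value"].sum()})
        t += pd.Timedelta(days=1)  # slide the window by one day
    return pd.DataFrame(rows)

# In Spark (sketch, not executed here):
#
#   out = df.groupBy("customer_id").applyInPandas(
#       window_features, schema="window_start timestamp, value_sum double")
#   out.write.format("tfrecord").save(path)  # via a TFRecord connector
```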
Another example of how Pandas UDFs and the support for binary data can be used to speed up deep learning is a speech recognition problem. In speech recognition, we need hundreds of thousands of labeled sound clips as sample data, and together with data augmentation techniques, like adding noise to the data, the volume of data can be tremendous.
In this example we have a Spark process read the data in binary format; you see here spark.read.format("binaryFile"), which is a new feature in Spark 3.0. With this, we’re able to read the binary data, and then we can apply any featurization technique to turn the binary data into features in numerical format, right? There is also a second, much simpler UDF that just extracts the label from the path of the file. So here in the example you see there are two UDFs. In the first UDF we have logic to read in the binary files, and then we can apply the librosa library to extract the spectrograms and the features; these spectrograms can be output in the form of a Pandas series to be used later on. In the second UDF, we get the path of the file and just extract the name of the file to be used as the label for the data. Then, in the last step, we write the output, again, into TFRecords for ML training. So you see here the ability of Spark not only to process structured data but also to process binary data, and to handily export it into the deep learning data format for the deep learning later on.
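The per-batch logic of the two UDFs can be sketched as follows. To keep the sketch dependency-light, librosa is swapped for a plain numpy magnitude spectrum, and treating the raw bytes as 16-bit PCM is an assumption for illustration; the directory-name labeling convention is also made up.

```python
import os

import numpy as np
import pandas as pd

# UDF 1 body: turn each clip's raw bytes into a numeric feature vector.
# (numpy FFT stands in for librosa's spectrogram extraction.)
def featurize(contents: pd.Series) -> pd.Series:
    def spectrum(raw: bytes) -> list:
        samples = np.frombuffer(raw, dtype=np.int16).astype(np.float32)
        return np.abs(np.fft.rfft(samples)).tolist()
    return contents.apply(spectrum)

# UDF 2 body: extract the label from the file path,
# e.g. ".../yes/clip_001.wav" -> "yes" (hypothetical layout).
def label_from_path(paths: pd.Series) -> pd.Series:
    return paths.apply(lambda p: os.path.basename(os.path.dirname(p)))

# In Spark 3.0 (sketch, not executed here):
#
#   df = spark.read.format("binaryFile").load("/data/audio/*/*.wav")
#   df = df.withColumn("features", featurize_udf("content")) \
#          .withColumn("label", label_udf("path"))
#   df.write.format("tfrecord").save("/data/train")  # via a TFRecord connector
```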
So in the second part of the talk, I’d like to talk about how we can use Spark to accelerate batch inference.
For batch inference, the capability comes from Spark’s ability to perform very well in distributed data processing, and we can use the same kind of properties to scale up the scoring when you have a lot of data to score with deep learning models. Here we’ll talk about two scenarios: one where the deep learning model is hosted as an API external to Spark, and one where we directly load the deep learning model and score within Spark itself. Then we can collect the result dataset and pass it on to the downstream process. So let’s talk first about the scenario where we load the model from within Spark.
So the challenge with doing model loading and scoring within Spark is that deep learning models are normally very large in size and not serializable, so broadcasting won’t work.
We need to either load the model from shared storage, like cloud shared storage, or use a mechanism to cache the model file on the worker machines. If you take a look at the processing model here, we have an input dataframe with the input data, and we can perform any kind of initial transformation. Then, here again, we make use of the Pandas UDF technique to efficiently score the data. The key here is that there are two ways you can do it: you can do this with the regular Pandas scalar UDF, or, the second and recommended method, you can use the Pandas scalar iterator UDF.
The reason the second method is recommended is that in deep learning, if we can minimize the frequency of loading the ML model, which is an expensive operation, it’s gonna be more efficient, right?
So instead of passing individual Pandas series to the UDF, in the Pandas scalar iterator UDF we pass an iterator of series to the UDF.
And then we do the model loading and then we do the scoring. And then we can output the data to the Spark environment.
Let’s take a look at the code examples of how we can do that.
So this example scores the binary wave audio files with the model we trained in the speech recognition process. In the first step, we just need to read the data from the binary files into our dataframe. In the second step, we define a prediction function using the scalar iterator Pandas UDF. In this UDF, you see here, the first part, on line number nine, does the model loading, right? The model can be loaded from a local path on the workers, or from a path to shared storage in the cloud. Then, instead of having direct access to a Pandas series, here we need to loop over the batches of Pandas series: you see in line number 10 we have the for loop over the iterator, which gives us access to the individual Pandas series. Within this loop we perform featurization, extract the spectrograms, and then we score with the model, and then we yield, right? We yield the Pandas series back to the calling program. With this we can minimize the frequency of doing the model loading, and it’s good for the performance.
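The shape of that scoring UDF can be sketched with the expensive parts stubbed out. The model loader and featurizer below are placeholders (a real version might call something like `tf.keras.models.load_model` and a spectrogram extractor), and the `/dbfs/models/speech` path is hypothetical.

```python
from typing import Iterator

import pandas as pd

def load_model(path: str):
    # stand-in for loading real deep learning weights from shared storage;
    # the returned "model" just counts features per clip
    return lambda feats: [len(f) for f in feats]

def bytes_to_features(raw: bytes) -> list:
    # placeholder featurization: one "feature" per byte of audio
    return list(raw)

# Scalar iterator UDF body: load the model once per task, then score every
# batch of clips with the same loaded model.
def predict(batches: Iterator[pd.Series]) -> Iterator[pd.Series]:
    model = load_model("/dbfs/models/speech")  # hypothetical path; loaded once
    for contents in batches:
        feats = [bytes_to_features(b) for b in contents]  # per-clip features
        yield pd.Series(model(feats))

# In Spark (sketch, not executed here):
#
#   from pyspark.sql.functions import pandas_udf
#   predict_udf = pandas_udf(predict, returnType="double")
#   df.withColumn("pred", predict_udf("content"))
```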
Right, so another example of doing batch inference is when we have the model hosted as an external API. For this, there’s no difference in performance between the regular scalar UDF and the scalar iterator UDF, so we just build a regular Pandas UDF, because we don’t have to initialize the ML model within the UDF: the ML model is already hosted and initialized by the external service.
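A sketch of such a UDF might look like this. The endpoint URL and payload schema are assumptions, and the HTTP call itself (commonly done with the `requests` library) is left commented out so only the payload construction runs here.

```python
import json

import pandas as pd

SCORING_URI = "https://example.com/score"  # hypothetical endpoint

# Scalar Pandas UDF body taking multiple feature columns: reassemble the
# batch into one DataFrame, serialize it to JSON, POST it to the scoring
# service, and return the predictions as a Pandas series.
def score(*cols: pd.Series) -> pd.Series:
    features = pd.concat(cols, axis=1)
    payload = json.dumps({"data": features.to_dict(orient="records")})
    # import requests
    # resp = requests.post(SCORING_URI, data=payload,
    #                      headers={"Content-Type": "application/json"})
    # preds = resp.json()["predictions"]   # assumed response shape
    preds = [0.0] * len(features)  # placeholder instead of a live call
    return pd.Series(preds)
```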
This is an example of calling an external API in a Pandas UDF. Here, we just need to make sure that the data we send out for scoring is in a format that can be sent over the network, so we need to convert it. You see here in the Pandas UDF, we pass in multiple columns, which are the features needed, and then we need to convert the data into JSON, right? We use the json.dumps method, in line number 10, to convert the Pandas dataframe into a raw JSON format. Then we can send the data in an HTTP POST request. When we get back the result, we need to convert it back into a Pandas series in the return. We
convert data into the appropriate format at the different steps. With that I conclude my talk. So hopefully you find it useful. So basically, I have covered methods to use Spark to accelerate the training in terms of data preparation and featurization. And the second part was about
performing scalable batch inference with Spark. Thank you very much for your time, thank you very much for attending my session. And this is a list of references and don’t forget to rate and review the sessions. Your feedback is important to us.
James Nguyen is a Principal Cloud Solution Architect at Microsoft’s Azure Customer Success Organization. He has a master’s degree in Data Science from UC Berkeley. He mainly focuses on Big Data and Machine Learning. James has delivered multiple successful large-scale implementations in advanced analytics for Microsoft’s strategic customers. He is passionate about scaling Data Science with the power of Apache Spark.