From Python to PySpark and Back Again – Unifying Single-host and Distributed Deep Learning with Maggy


Distributed deep learning offers many benefits – faster training of models using more GPUs, parallelizing hyperparameter tuning over many GPUs, and parallelizing ablation studies to help understand the behaviour and performance of deep neural networks. With Spark 3.0, GPUs are coming to executors in Spark, and distributed deep learning using PySpark is now possible. However, PySpark presents challenges for iterative model development: models are typically started on development machines (laptops), and the code then has to be rewritten to run on cluster-based environments.

In this talk, we will introduce an open-source framework, Maggy, that enables write-once training functions that can be reused in single-host Python programs and cluster-scale PySpark programs. Training functions written with Maggy look like best-practice TensorFlow programs where we factor out dependencies using popular programming idioms (such as functions to generate models and data batches). In a single Jupyter notebook, developers can mix vanilla Python code to develop and test models on their laptop with PySpark-specific cells that can be run when a cluster is available using a PySpark kernel, such as Sparkmagic. This way, iterative development of deep learning models now becomes possible, moving from the laptop to the cluster and back again, with DRY code in the training function – as all phases reuse the same training code.


Video Transcript

– So in this talk, we want to present to you how to conduct machine learning model development, in particular when you’re doing, for example, exploratory model development in a single-host Python environment, and then you want to scale it out on all of your data in a distributed environment without actually having to rewrite your code to move between these steps.

From Python to PySpark and Back Again Unifying Single-host and Distributed Machine Learning with Maggy

So if we take a look at model development

ML Model Development

and the steps involved, in this talk, we’re not gonna cover feature engineering and feature pipelines and model serving, but we can roughly group it into four steps.

And these four steps usually start with some kind of exploration of the data: you build a first draft of your model to find a classifier that you wanna use. Once you’re happy with that, you go into an experimentation phase where you evaluate a lot of different trials on multiple machines until you’ve found a good hyperparameter configuration for your model. Then you would probably do some validation, dropping some components of your model to see how much value each actually adds to the final model. And once you’re happy with it, you would scale this out and train on a lot of resources in a distributed environment. But the problem is that to move between these steps, you actually have to rewrite your code. In particular, different distribution contexts or different setups are involved: you might do the exploration in a single-host environment but the experimentation in a multi-host environment, and that leads to a lot of artifacts and non-DRY code. So you will repeat the code. Depending on which machine learning framework you’re using, you will add config files, which are really hard to track. And the problem gets even worse when you start to iterate on your process and go back and repeat a previous step: basically, the amount of artifacts will double. The open-source community has developed a lot of nice tools to tackle this problem, like MLflow to track your experiments, or TensorBoard to track the results of your training processes.

What It’s Really Like

But in fact they don’t really tackle the problem at its root. And so what it’s really like: it’s a pain.

And Jim is gonna talk about the root cause of this problem. – Okay so this is a software engineering problem primarily. And the issue at play here is that when you develop some code to begin with, you might have a small subset of the data.

Root Cause: Iterative Development of ML Models

You’ll have it on your laptop. You’ll write a notebook or a small Python program. And if you’re running a notebook, you might use a Jupyter kernel, and then you decide: okay, I’m going to move into the experimentation phase. I’m gonna work on a larger amount of data; I have a model design. If we look at the diagram that Moritz presented, we can see there’s this assumption that you’re always going forward. We’re going to try out different hyperparameter combinations. Then we’ll do the ablation study to understand the effect of the different components in the model on the final accuracy of the model. And then I’m gonna train on the huge amount of data in distributed model training. And then I’m gonna be done. But it’s never like that in practice. We always have the need to go back. We may decide that we need to add a new feature because the final accuracy is not good enough. We may decide we need to drop features. We may decide we need to add new regularization components. And this continual process of iterative development basically means you need to move through the four different code bases that Moritz presented. Now, not everyone will have all four code bases. You may skip ablation studies. You will typically have hyperparameter tuning. You will typically also work on a small subset of the data, and if you have a larger problem, you might have distributed training. In this case we mean data-parallel synchronous stochastic gradient descent. But even if you don’t have all of the phases, models are getting bigger, datasets are getting bigger, and distributed deep learning in particular is becoming a very important technology with which to develop models. So we know what the problem is, and we know that there’s a software engineering issue related to how we move between these different phases.

Iterative Development Is a Pain, We Need DRY Code Each step requires different implementations of the training code

Now, currently these different phases have different code. The first one is marked here as EDA, or exploratory data analysis. You’ll have some Python code in your notebook, for example. Maybe you don’t factor your code particularly well, and you have the training loop, the model creation and the dataset generation all scattered around your code. Then you go to hyperparameter tuning and you say, I’m gonna rewrite this. We have different shapes here to represent that it’s really gonna be a different code base. And the same goes for parallel ablation studies, if you decide to include them, and for distributed training. So if you would otherwise have to rewrite your code, and we don’t want you to do that, what are we going to do? We thought about this quite deeply and we came up with a new abstraction that we call an oblivious training function.

The Oblivious Training Function

An oblivious training function is a piece of core training code that we would like to be unchanged between all four of these phases. This core code is what you’re going to use to create your model, to get your dataset, your samples, and to compile and train that model. And we want that to be unchanged across all of the phases. The benefit of this, obviously, is that it will enable (mumbles) of application development. So moving from distributed training back to your laptop to add a new feature should not be a big problem. The oblivious training function comes from this idea of transparency: it should be transparent to the application which phase you’re running this code in. The oblivious training function shouldn’t care whether it’s running on your laptop or on a cluster. It should be transparent to that code, or oblivious. Now, the challenge we have in doing this (because if this was an easy thing to do, somebody would have done it already) is that every machine learning framework introduces different concepts and libraries with which to go through these different phases.

Challenge: Obtrusive Framework Artifacts Example: TensorFlow

So how do you do distribution? How do you parallelize experiments? How do you do, for example, collective all-reduce if you’re doing distributed deep learning? I can give the example of TensorFlow, because the work we’ve done to date has been with TensorFlow and Keras. In TensorFlow, if you’re going to do distributed deep learning, so parallel deep learning, you have to set up an environment variable called TF_CONFIG with the IP addresses of all the workers. You have to decide on the distribution strategy. If it’s (mumbles), for example, that would be one distribution strategy. When we shard the dataset for training, how do we share that amongst the workers? Do we have a distributed file system available? And then if we’re working with Keras or the Estimator framework in TensorFlow, or if we have custom training loops, how do we integrate all this? So what the framework needs to do is abstract out these different components that you have in your existing machine learning frameworks, so that the core oblivious training function remains unchanged between the different phases. – So if we take a bigger picture and look at where deep learning is actually headed and how we can make use of this oblivious training function to keep our code transparent, we can look at the recent adoption of machine learning frameworks like Keras and PyTorch, and why they’re actually so popular among data scientists.
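To make the TF_CONFIG artifact mentioned in the TensorFlow example concrete, here is a minimal sketch of how such a variable could be assembled for one worker. The helper name `build_tf_config` and the worker addresses are hypothetical and not taken from the talk or from Maggy; only the JSON layout (a `cluster` section and a `task` section) follows TensorFlow's multi-worker convention.

```python
import json
import os


def build_tf_config(worker_addresses, task_index):
    """Return the TF_CONFIG JSON string for one worker of a multi-worker job."""
    if not 0 <= task_index < len(worker_addresses):
        raise ValueError("task_index must identify one of the workers")
    return json.dumps({
        "cluster": {"worker": list(worker_addresses)},
        "task": {"type": "worker", "index": task_index},
    })


# Hypothetical addresses; a real job would use the hosts of its own workers.
workers = ["10.0.0.1:2222", "10.0.0.2:2222", "10.0.0.3:2222"]

# Each worker process sets its own copy before the distribution strategy is built.
os.environ["TF_CONFIG"] = build_tf_config(workers, task_index=1)
```

Every worker needs the same cluster list but a different index, which is exactly the kind of per-process bookkeeping that does not belong inside the training function itself.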

Productive High-Level APIs Or why data scientists love Keras and PyTorch

And the main reason for that is that they’re very productive and they’re high level. You can also see that in TensorFlow using Keras as its main API now in TensorFlow 2. These high-level, productive APIs really help us to go from an idea to implementing it quickly and coming to a point where we can start experimenting. From that point, it becomes a bit more blurry: you need to take into account the infrastructure that you’re going to run these experiments on. For example, we use Hopsworks, an open-source platform that we’re developing. You can use Spark on Databricks. You can use pure Apache Spark, or every cloud provider has some kind of product to tackle this. And then once we get the results of our experiments, that’s the point where we should use tools like MLflow and TensorBoard to visualize and track our results, and basically iterate faster on our ideas and be more productive in our iterative development.

How do we keep our high-level APIs

So the question is, how do we keep these high-level APIs, and how do we keep them transparent and productive? As Jim has already said, what is actually transparent code?

What Is Transparent Code?

By transparent code we mean that when you develop the model in just pure, standard Python, you can scale it out on Apache Spark or any other execution engine without having to make any changes to the core code of this model.

So to do this, we need to realize that there are a few building blocks that we need. The first one is that we need to know what distribution context we are actually in. The single-host environment pretty much explains itself: it’s the single-host Python environment that we know. Then when you move to experimentation, such as hyperparameter tuning or ablation studies, it’s actually a parallel multi-host environment where we train a lot of models in parallel, but they are independent of each other, so there’s no communication needed between those models. And then when we move to distributed training, it’s actually distributed multi-host, where we need some kind of coordination or communication between the workers. We typically use a ring all-reduce algorithm to achieve this communication.
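For readers unfamiliar with the ring all-reduce mentioned for the distributed context, the following is a small single-process simulation of the idea, not the implementation used by TensorFlow or Maggy. It assumes each worker's gradient is split into as many chunks as there are workers (one number per chunk here), and the function name `ring_allreduce` is made up for this sketch.

```python
def ring_allreduce(worker_grads):
    """Simulate ring all-reduce (sum) over n workers holding n chunks each."""
    n = len(worker_grads)
    data = [list(grad) for grad in worker_grads]
    if any(len(grad) != n for grad in data):
        raise ValueError("each worker must hold exactly one chunk per worker")

    # Phase 1, scatter-reduce: in every step each worker passes one chunk to
    # its right-hand neighbour, which adds it to its own copy of that chunk.
    for step in range(n - 1):
        sends = [(i, (i - step) % n, data[i][(i - step) % n]) for i in range(n)]
        for src, chunk, value in sends:
            data[(src + 1) % n][chunk] += value

    # Phase 2, all-gather: the fully reduced chunks travel once more around
    # the ring, overwriting the stale partial sums on every worker.
    for step in range(n - 1):
        sends = [(i, (i + 1 - step) % n, data[i][(i + 1 - step) % n]) for i in range(n)]
        for src, chunk, value in sends:
            data[(src + 1) % n][chunk] = value

    return data


reduced = ring_allreduce([[1, 2, 3], [10, 20, 30], [100, 200, 300]])
```

Each worker only ever talks to its neighbour, and after the two phases all workers hold the same summed gradient, which is the coordination the independent trials of the parallel context do not need.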

Model Development Best Practices

And the second part needed to realize the oblivious training function is a few core model development best practices, and we’ve seen the community already starting to adopt these. By that we mean you should encapsulate your model generation code in a class or in a Python function that returns the model, such that you can actually parameterize this function and change the model according to, for example, hyperparameters. The same goes for the dataset: you wanna encapsulate it in a class or function such that you can later on simply switch it out. And then, last but not least, you need some kind of training logic to describe the core training loop, or let’s say the stochastic gradient descent optimization loop. And then we have one other tool available, which is the use of callbacks, which allow us to hook into the training loop at runtime to intercept it, or get more information out of it when we need it.
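The structure described here can be sketched without any deep learning framework. The example below is an illustration under stated assumptions: a one-weight linear model stands in for a Keras model, a list of pairs stands in for a real dataset, and all function names are chosen for this sketch rather than taken from the talk.

```python
import random


def model_function(learning_rate=0.05):
    """Return a fresh, parameterized model (here: a one-weight linear model)."""
    return {"w": 0.0, "lr": learning_rate}


def dataset_function(n=64, seed=0):
    """Return (x, y) samples drawn from y = 3x; stands in for a real dataset."""
    rng = random.Random(seed)
    xs = [rng.uniform(-1.0, 1.0) for _ in range(n)]
    return [(x, 3.0 * x) for x in xs]


def train(model, dataset, epochs=20, callbacks=()):
    """Plain SGD loop on squared error; callbacks hook in after every epoch."""
    loss = None
    for epoch in range(epochs):
        for x, y in dataset:
            error = model["w"] * x - y
            model["w"] -= model["lr"] * 2.0 * error * x
        loss = sum((model["w"] * x - y) ** 2 for x, y in dataset) / len(dataset)
        for callback in callbacks:
            callback(epoch, loss)
    return loss


# A callback collects the loss curve without touching the training loop.
history = []
final_loss = train(model_function(), dataset_function(),
                   callbacks=[lambda epoch, loss: history.append(loss)])
```

Because the model and the data come from functions and the loop exposes a callback hook, a surrounding system can swap any of the three parts or observe training without the loop itself changing.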

Oblivious Training Function as an Abstraction Let the system handle the complexities

If we have these two concepts at hand, then we can actually let the system take care of a lot of the complexities involved with distribution, and make our code transparent. By that I mean: in the single-host environment, that is, for example, just fixing the hyperparameters and launching the function. In hyperparameter tuning, it’s launching the trials, that is, the function with different parameterized instantiations, generating new trials from old results, and collecting and logging the results. And in distributed training, it’s things like setting up the TF_CONFIG, wrapping the model in a distribution strategy and launching the workers in their different roles for the distributed training.


So last year at Spark Summit, we introduced Maggy as a framework for hyperparameter tuning on Spark. And today we wanna make Maggy a bit more general, and allow for these oblivious training functions to be executed in different distribution contexts, to make our model development transparent.

Hopsworks – Award Winning Platform

And we do so with Hopsworks. We won a few prizes with Hopsworks, and it’s all open source, so you can go and get started yourself with Maggy and Hopsworks.

Recap: Maggy – Asynchronous Trials on Spark Spark is bulk-synchronous

Just to recap quickly, Maggy allows us to schedule machine learning trials on Spark asynchronously. That means we circumvent the synchronization barriers at the end of stages in Spark’s bulk-synchronous execution model. Thereby we don’t waste as much compute, and we do so by blocking the executors with very long-running tasks and allowing for some communication between the tasks and the driver.

Recap: The Solution

And thereby we are able to run multiple machine learning trials in the same task. Therefore we have asynchronism, saving resources. And what we’re introducing today is that these tasks not only ask for new trials when they finish training a previous one, but can also register information with the driver, such that the driver can send information and set up the distribution context before launching the training function on these workers, or on these executors.
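The scheduling idea in this recap, long-running tasks that keep asking for the next trial instead of waiting at a stage barrier, can be imitated on one machine with threads. This is only an analogy under assumptions: a shared queue stands in for the communication with the Spark driver, `time.sleep` stands in for training, and `run_trials_async` is a hypothetical name, not Maggy's API.

```python
import queue
import threading
import time


def run_trials_async(trial_durations, num_workers):
    """Run every trial on long-lived workers; return {trial_id: worker_id}."""
    pending = queue.Queue()
    for trial_id, duration in enumerate(trial_durations):
        pending.put((trial_id, duration))

    assignments = {}
    lock = threading.Lock()

    def worker(worker_id):
        # One long-running task per executor: keep asking for the next trial.
        while True:
            try:
                trial_id, duration = pending.get_nowait()
            except queue.Empty:
                return  # no trials left, so the task can finish
            time.sleep(duration)  # stand-in for training one model
            with lock:
                assignments[trial_id] = worker_id

    threads = [threading.Thread(target=worker, args=(i,)) for i in range(num_workers)]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    return assignments


# One slow trial and four quick ones, shared between two workers.
assignments = run_trials_async([0.05, 0.01, 0.01, 0.01, 0.01], num_workers=2)
```

With a barrier after every wave of two trials, the worker that finished its quick trial would sit idle until the slow one completed; here it simply picks up the next trial.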

And so if we look at the API, really what we want to have is this training function, and this training function is just the pure Python training function without any further modification. The training function is gonna be launched through a Python library, through Maggy, in a different context, without doing any further modifications. By doing so you can actually move between single host and multi host really easily, because you can always just take the Python function and execute it in a Python kernel, as you would normally do.

And the demo is going to demonstrate the minimal code changes you need and the best practices you need to follow to achieve this level of transparency. – [Jim] So now I’m going to demo the oblivious training function and show you what all of this looks like in terms of extra code. To do so I started a Hopsworks cluster. If you wanna try it out yourself, you can create your own instance on […]. I created a project for this demo, and we’ll mainly use the Jupyter service and the experiment service in this case. I already started a notebook server with a Spark session in it, with up to six dynamic executors, so we have a parallelism of six. Usually when you wanna do model development, you start out, like I said, on a simple Python kernel. So I have a notebook with a Python kernel. I just took an example from the Keras website for an MNIST model, and luckily it was already refactored according to the best practices that we need in order to be able to run this as an oblivious training function. So we have a model function containing our model logic and returning the Keras model, and a dataset function returning the TF datasets. Then you can do your model debugging or model development by simply getting the model. You can look at the summary, and at the training logic: compile the model, get the datasets and start training. This is nice in a Python (mumbles), because you get immediate feedback on the progress and on possible errors. However, we wanna take this model now and move to hyperparameter optimization once we think it’s ready for that.

So if we go here, and we have our PySpark kernel this time, what changes do we need to make in order to make use of hyperparameter optimization with this function? We need some hyperparameters. I will use the kernel size in this case and the pooling size, and also maybe the dropout rate. And in order to be able to parameterize this model, we also need to add these hyperparameters to our function signature.

In this case the dataset stays the same, so we can just reuse the same function. And now we come to the model logic. All we have to do for this one is to wrap it inside another Python function, so that we can then ship this Python function to the executors to be executed with different hyperparameter combinations. So again we need to add the kernel, the pool and the dropout to make it parameterizable. And in order to be able to do ablation later on, we also need to make the model function, and actually also the dataset function, pluggable. So the same thing here: we wanna be able to replace the model automatically with Maggy, so you need to exchange this for a model function placeholder, basically. And again you need to pass through the hyperparameters so this works, and the same for the dataset.

So now we have this fully pluggable and parameterizable oblivious training function. The only thing that’s left to do, since we want to optimize something, is to simply return the metric that we want to optimize. In this case that’s the sparse categorical accuracy, as you can see.

Additionally, for hyperparameter optimization we need a search space, so we define the hyperparameters and the feasible intervals to draw the samples from. Then we can import the experiment module, set a name for this experiment, set the dataset generator, set the model generator, set the context and pass in an optimizer. In this case we wanna use random search, but we are also working on adding Bayesian optimization. We pass in the search space, and then we can just lagom the experiment, which will launch the training function with different combinations on the executors.
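The shape of the training function and the search described in this part of the demo can be sketched in plain Python. Everything below is a stand-in: the search space format, `sample`, `random_search` and the toy model are hypothetical and are not Maggy's API, and the "accuracy" is a synthetic formula rather than a trained Keras model. The hyperparameter names (kernel, pool, dropout) are the ones used in the demo.

```python
import random


def train_fn(kernel, pool, dropout, model_function, dataset_function):
    """Oblivious training function: everything it needs arrives as arguments."""
    model = model_function(kernel, pool, dropout)
    dataset = dataset_function()
    return model(dataset)  # the metric to optimize


def toy_model_function(kernel, pool, dropout):
    """Stand-in for building and training a model: returns a scoring function."""
    def score(dataset):
        # Synthetic 'accuracy' that peaks at kernel=3, pool=2, dropout=0.2.
        penalty = abs(kernel - 3) * 0.05 + abs(pool - 2) * 0.05 + abs(dropout - 0.2)
        return max(0.0, 1.0 - penalty)
    return score


def toy_dataset_function():
    return []  # the toy scorer ignores the data


# Hyperparameters and the feasible intervals to draw samples from.
searchspace = {
    "kernel": ("integer", [2, 8]),
    "pool": ("integer", [2, 8]),
    "dropout": ("double", [0.01, 0.99]),
}


def sample(space, rng):
    """Draw one hyperparameter combination uniformly from the search space."""
    params = {}
    for name, (kind, (low, high)) in space.items():
        params[name] = rng.randint(low, high) if kind == "integer" else rng.uniform(low, high)
    return params


def random_search(train_fn, space, num_trials, seed=0, **fixed):
    """Run train_fn on random combinations and return (best_metric, params)."""
    rng = random.Random(seed)
    trials = []
    for _ in range(num_trials):
        params = sample(space, rng)
        trials.append((train_fn(**params, **fixed), params))
    return max(trials, key=lambda trial: trial[0])


best_metric, best_params = random_search(
    train_fn, searchspace, num_trials=20,
    model_function=toy_model_function, dataset_function=toy_dataset_function)
```

Going back to a single host is then just a direct call such as `train_fn(3, 2, 0.2, toy_model_function, toy_dataset_function)`, with no change to the function itself.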

So while this is running, I’ll also explain the ablation study. As you can see, we get nice feedback on the progress of the experiment inside the Jupyter notebook, so there’s no need to go to the Spark UI. And as you can see, it also features early stopping, which means we can stop poorly performing trials early in order to save resources and try more combinations in the same time. If you’re interested in that, I recommend you watch my Spark Summit talk from last year.
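The talk does not say which early-stopping rule is applied, so the following is only one simple possibility: a median-style rule that compares a running trial with what finished trials had achieved by the same step. The function name, the `min_finished` threshold and the use of the best metric so far are all assumptions made for this sketch.

```python
from statistics import median


def should_stop(trial_history, finished_histories, min_finished=3):
    """Return True if a running trial looks worse than the median finished trial.

    Histories are lists of a metric per step, where higher is better.
    """
    step = len(trial_history)
    if step == 0 or len(finished_histories) < min_finished:
        return False  # not enough evidence to stop anything yet
    # Best value each finished trial had reached after the same number of steps.
    peers = [max(history[:step]) for history in finished_histories if len(history) >= step]
    if len(peers) < min_finished:
        return False
    return max(trial_history) < median(peers)


finished = [[0.5, 0.7, 0.8], [0.4, 0.6, 0.9], [0.6, 0.8, 0.85]]
stop_weak = should_stop([0.2, 0.3], finished)
stop_strong = should_stop([0.6, 0.75], finished)
```

A rule like this needs per-step metrics from running trials, which is where callbacks into the training loop and communication between tasks and the driver come in.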

In an ablation study we wanna leave out certain components of the model to see what their actual contribution to that model is. In this case I wanna leave out the convolutional layers one by one. We’re making use of a feature of Keras here, which lets you give names to components of your Keras model. These were defined here, and they will be used by Maggy in order to drop these components from the model. Then when we start the ablation study, we need to fix the hyperparameters to some value. You could fix them to a random value, but in this case we wanna use the best hyperparameter combination previously found by the optimization experiment. And then again, we switch the context to ablation, passing an ablator, in this case leave one component out (LOCO), so it will leave one component out at a time, plus the ablation study object and the parameters that we want to fix. And again we do a lagom. So now we’ll do a quick fast forward until both of these experiments have finished.
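Leave-one-component-out ablation over named layers can be illustrated with plain data structures. This sketch assumes a model is described as an ordered list of named layers; the layer names and the helper `loco_trials` are made up for illustration and say nothing about how Maggy rebuilds the Keras model internally.

```python
def base_layers():
    """Named layers of a small CNN, in order (names are illustrative)."""
    return [
        ("conv_1", "Conv2D"),
        ("pool_1", "MaxPooling2D"),
        ("conv_2", "Conv2D"),
        ("pool_2", "MaxPooling2D"),
        ("flatten", "Flatten"),
        ("dense_out", "Dense"),
    ]


def loco_trials(layers, ablatable):
    """Yield (trial_name, layers): the full model, then one variant per ablation."""
    known = {name for name, _ in layers}
    unknown = set(ablatable) - known
    if unknown:
        raise ValueError(f"unknown layer names: {sorted(unknown)}")
    yield "base", list(layers)
    for target in ablatable:
        # Leave exactly one named component out; keep everything else in order.
        yield f"without_{target}", [layer for layer in layers if layer[0] != target]


trials = dict(loco_trials(base_layers(), ["conv_1", "conv_2"]))
```

Each variant would then be trained with the same fixed hyperparameters, so that differences in the metric can be attributed to the missing component.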

So our two experiments have finished, as you can see, and we also got all the Keras prints from the executors inside our notebook, which is sometimes nice for debugging.

The same goes for the ablation experiment. I also already started the distributed training, and in this case I again just set the context to distributed training. I pass in the strategy that I want to use: since I’m using six executors, I want to use the multi-worker strategy. But if, for example, you have multiple GPUs on one machine, you could also use a single executor with multiple GPUs and the mirrored strategy. And again I pass in the hyperparameters in order to fix them for the training.

This is running on all six executors now. And if, for example, you noticed that something is off with your model during ablation studies or during hyperparameter optimization and you wanted to go back to a single-host environment, it’s as easy as taking your oblivious training function, passing in the model function, the dataset function and a fixed set of hyperparameters, and launching it in a Python kernel manually again. So no more changes. That’s how I wanted to show you that you can run the same function in all three different contexts without modifying it. You could also get even more feedback from the model training if you wanted to use TensorBoard. For that we provide the possibility to use a normal TensorBoard callback, and the only thing you have to use is the TensorBoard module from Maggy in order to dynamically generate the log dir for each trial within an experiment directory. And if you do that … so now I’m currently running the distributed training experiment. As you can see here, our experiments service records all of these. Since it’s actually Hopsworks launching the training function for you, we can collect a lot of metadata, and you don’t have to do explicit logging calls to track your metadata.

As we all know, tracking metadata is hard. We call this implicit provenance, compared to, for example, explicit provenance if you were using MLflow, where you have to explicitly call MLflow to log certain parameters. So this has finished already. And these were the two previous experiments: one is the ablation LOCO experiment, and the other the hyperparameter tuning experiment. As you can see, it records all the trials that were trained. You can look at the logs that each produced, and the metric that it produced. And we even take a snapshot of the IPython notebook, so you can check out what your model looked like at that point in time when you ran this experiment.

And as I said, you can use TensorBoard. For example here, I ran an extra experiment earlier and I already launched the TensorBoard. We’re making use of the new HParams plugin for TensorBoard. And again you can look at your single trials. In this case I only trained two epochs for each trial. You can use this parallel coordinates view to check out which hyperparameters lead to a good accuracy, for example, or you can just look at your scalar graphs as normal in the Scalars tab for each trial. If you’re running a lot of trials, this certainly gets a bit messy, but you can just select the specific trials that you wanna look at, for example. So yes, this concludes the demo. I hope it was helpful.

Let’s switch back to the presentation. Thank you. – So what’s next? Now that we have the oblivious training function and the distribution context, we wanna make use of another open-source tool, Papermill by Netflix, which will allow us, even though the code is still in the notebook, to schedule it in different contexts by simply parameterizing the entire notebook with the context. Then we can, for example, simply take a notebook that’s there already and rerun it in another context, for example to retrain if we detected model drift earlier on, without having to entirely rewrite our code again for distributed training.

To summarize: what we’ve shown is that moving between the different distribution contexts actually requires code to be rewritten. What we wanna do is take the obtrusive framework artifacts and factor them out, such that we really keep only the pure Python code of the model definition. Then we’re able to handle all the complexities with our framework, outside of the scope of this Python code, and let the system handle the complexities and the distribution context for you. By doing so we’re able to actually keep these high-level, productive APIs that we’ve been working so hard for, which will hopefully lead to better models and faster iteration in our machine learning model development.

Thank you very much for your attention. Here are the resources where you can get started and how you can reach out to us, on Twitter or through the website. And I especially want to thank our team, Sinha, Robin, Alex and Kai, who’ve also been working on this.

About Moritz Meister

Logical Clocks AB

Moritz Meister is a Software Engineer at Logical Clocks AB, the developers of Hopsworks. Moritz has a background in Econometrics and holds MSc degrees in Computer Science from Politecnico di Milano and Universidad Politecnica de Madrid. He has previously worked as a Data Scientist on projects for Deutsche Telekom and Deutsche Lufthansa in Germany, helping them to productionize machine learning models to improve customer relationship management.

About Jim Dowling

Logical Clocks AB

Jim Dowling is CEO of Logical Clocks and an Associate Professor at KTH Royal Institute of Technology. He is lead architect of the open-source Hopsworks platform, a horizontally scalable data platform for machine learning that includes the industry’s first Feature Store.