Resource-Efficient Deep Learning Model Selection on Apache Spark

Download Slides

Deep neural networks (deep nets) are revolutionizing many machine learning (ML) applications. But there is a major bottleneck to broader adoption: the pain of model selection. This empirical process involves exploring the deep net architecture and hyper-parameters, often requiring hundreds of trials. Alas, most ML systems, including Spark, focus on training one model at a time, reducing throughput and raising costs; some also sacrifice reproducibility. We present Cerebro, a system to raise deep net model selection throughput at scale without raising resource costs and without sacrificing reproducibility or accuracy. Cerebro uses a novel parallel SGD execution strategy from our research which we call model hopper parallelism.* It is also general enough to work on top of Spark. This talk is about Cerebro and its integration into Spark.

First, we will review the state-of-art research in this active field and introduce Cerebro. We will go over the core intuitions, designs, and architectures of it. Then we will demonstrate with experiments that considering resource efficiency, including memory/storage usage, runtime, and communication costs, standalone Cerebro can outperform existing systems, including Horovod and task-parallelism. Finally, we will showcase the integration of Cerebro into Spark by using/extending the Spark APIs. We will describe the challenges and technical efforts required to achieve this goal. We will then show some experiments to exhibit that Cerebro on Spark can be a more resource-efficient choice for Spark users working on deep learning model selection. *Cerebro: Efficient and Reproducible Model Selection on Deep Learning Systems. Supun Nakandala, Yuhao Zhang, and Arun Kumar. ACM SIGMOD 2019 DEEM Workshop

Watch more Spark + AI sessions here
Try Databricks for free

Video Transcript

– Hello, everyone. Welcome to this session. I’m one of the two speakers here, my name is Yuhao. The other speaker is Supun. He will cover the second half of this talk. We will talk about our research project called Cerebro. It’s a data system for resource efficient deep learning model selection. It can support multiple model selection algorithms as well as various backends. Spark is one of the backends we support. The software is open sourced and publicly available, so if you find it interesting, please do try it yourself. A little bit of self introduction.

About us

We are PHD students from University of California, San Diego and advised by Professor Arun Kumar. Our research mission is to democratize data science, making it dramatically easier, faster, and cheaper to build and deploy ML and AI applications. You can find more about us on our websites.

Now, let’s get started. Artificial neural networks, or better known as deep learning, are revolutionizing many, many domains, such as machine translation, autonomous driving, or even critical domains like health and medicine.

So probably, you are already considering using deep learning in you next project. But there’s a problem.

Problem: training deep nets is Painful!

Training a deep net is not trivial. Deep learning models are highly non-linear and still largely remain a black box to the user. As a result, your model performance depends on several factors such as model architectures and hyperparameters in a very complex rate. Model architectures such as CNN, RNN, is how you define your model. Hyperparameters like learning rate, batch size, are knobs that can affect the learning process and the final outcome in a very profound way. Together, these are the model configurations you have to decide before actually training anything and it’s difficult to know which combination of these parameters is the best beforehand. As a result, you often need to run model selection and have a parameter tuning on the trial and error phases. For each model config, you will need to train one model. Later, you can pick the best model. And the number of models you need to train quickly explodes because you have so many choices. That’s one pain of deep learning. You don’t just train one model, usually have to train dozens if not hundreds of models. In this example, we have model architectures, batch size, learning rates, as well as regularizations to train. Although we have only four choices, each of them will still end up with over 200 models to train. And the training could be very slow. It can take days to just train one model, let alone 200. We desperately need speed. We need to increase the throughput of model selection. And the solution to it, quite unfortunately and not surprisingly, is money. Distributed deep learning. You purchase more machines, you scale up your cluster so you can have more computational power. In the meantime, you will need to better utilize your machines. Otherwise, they sit idle and your money is wasted. This is where our system, in coming to help, we can help you best utilize your machines and ultimately save your money. Here is the agenda.

I will first cover some backgrounds and the existing solutions to the problem, then I will present you with our system and our core technique, MOP, it stands for Model Hopper Parallelism. Then Supun will take over and talk about the implementation of MOP on Spark. Before we move on, first thing you need to know is how we actually train a model on just one machine. The most popular choice of training algorithm is mini-batch SGD.

Introduction – mini-batch SGD

It works as follows. Here you have your dataset, nine rows, three columns.

What the algorithm does, is it first batches some rows of your data and updates your model based on the gradients calculated from this mini-batch.

Then it takes another mini-batch, it updates again, and again.

In the end, your model has visited all data, and this is called one epoch. The whole training process usually takes multiple epochs before your model finally converges. This is the core data access pattern of mini-batch SGD. This process, as you can see, is inherently sequential. That’s one of the reasons why running this in a parallel and distributed manner is so interesting.

Now you know locally how model is trained. Let’s get back to our problem, which is training multiple models with mini-batch SGD on the same dataset, on multiple machines.

Task Parallelism – Problem Setting

Well, take a second and think about how would you actually do it. Well, the most naive way to me is to copy your data around to each machine and you have machines with replicated datasets.

(Embarrassing) Task Parallelism

Later, I just send one model to one machine and let the machines train locally in isolation. And I get my trained models in the end. This is of course embarrassingly easy and embarrassingly parallel. Oh, it’s great, huh? But there’s a problem. What if your dataset is so large that it doesn’t fit in a single node’s memory, or even the storage? And remember, the dataset is replicated and that’s wastage, which is not good.

Well, you can always use a shared file system to bypass this issue. But instead of storage, you’re wasting network this time because your workers need to do remote race now. This solves one problem, but introduces another, which is still not good. All right, the question is, can we do something to mitigate this issue?

This leads us to another branch of the research, data parallelism. In this scheme, you have models, but this time you have partitioned data.

Data Parallelism – Problem Setting

Each worker has only a part of the dataset, so you cannot fully train a model on a single node anymore. Here is how data parallelism works.

In this scheme, you train one model at a time. You first send a model to your workers and the workers train the model with their local data.

And the master node collects, updates, and notifies the global model accordingly. This process then repeats. There are several flavors of it. If you collect updates only per epoch, then it’s called model averaging. Unfortunately, model averaging can have serious convergence issues as it’s not equivalent to synchronous SGD anymore. Or, you could update per mini-batch. Which is equivalent to synchronous SGD. This is known as synchronous parameter server. You can make it asynchronous, you can make it decentralized. But all of these approaches suffer from high communication cost. Because communications take place every mini-batch. And there could be tens of thousands of mini-batches per epoch, which is not good either. To recap, so far we have seen task parallelism, it has high throughput, but low data scalability and has wastage. On the other hand, we have data parallelism, which is very scalable, but has low throughput because of the high communication cost. A natural question is, can we combine the advantages of both, but somehow leave the disadvantages behind? The answer is yes. The result is model hopper parallelism and Cerebro, our software system that implements it. It has high throughput, high data scalability, low communication cost, low storage wastage, everything you want in one box. The secret to it is model hopper parallelism, or MOP in short. It’s a new type of parallelism that combines the benefits of task and data parallelism. The problem setting is identical to data parallelism, we have models and partitioned data.

Model Hopper Parallelism Problem Setting

MOP works as follows.

Model Hopper Parallelism

First step, we allocate one model to one worker, just like task parallelism. And we train the models on the full local partitions, just like model averaging.

This is what we call one sub-epoch. Next is the key to everything and the reason why this is called model hopper. We hop the models around, and resume training on the next workers. Models get updated and we hop again.

At the end of the day, you can see each model has visited all data in sequential manner, so that they can converge fast. This whole process is equivalent to one epoch. Besides, we only communicate per sub-epoch, instead of per mini-batch, so that we can keep communication cost low. Data is partitioned and there’s no wastage. Basically everything we have asked before. Here is our system, Cerebro, which implements MOP.

Cerebro — Data System with MOP

It has this narrow waist architecture,

that supports various Model Search and AutoML procedures like PBT and HyperBand, as well as multiple deep learning systems and execution backends. For deep learning, we support TensorFlow, PyTorch. For backends, we have Spark, Greenplum Database, and also a standalone version of Cerebro. In this talk, we will focus on TensorFlow with Spark. Now, please welcome my co-presenter, Supun. He will talk about the implementation and details of Cerebro and MOP on Spark. Thank you. – Okay, so let’s look into some of implementation details of Cerebro on top of Apache Spark. We have implemented Cerebro Scheduler and Cerebro Workers as a Spark job.

Cerebro Scheduler runs inside the Spark Driver

MOP (Cerebro) on Spark

and Cerebro Workers run in Spark workers. As the underlying deep learning framework, we use TensorFlow. Currently, for the data storage layer, we suckle two different flavors, HDFS and NFS. Cerebro takes in input data in the form of Spark DataFrames and coverts them to partitioned Parquet files and locally cached into new partitions inside workers.

Implementation Details

To achieve this, underneath, we use the Petastorm library.

Using TensorFlow, we train models only on the locally available partition of the data.

And in order to achieve model hopping, we use a shared file system, which is either SGFS or NFS.

Now, let’s look into some details about Cerebro’s APIs.

Example: Grid Search on Model Selection # Hyperparameter

As a running example, I used the popular ImageNet dataset and do grid search for performing the model selection plus hyperparameter search workbook. I explored two different model architectures, two learning grids, and two different batch sizes.

The first step is the initialization step.

Initialization from pyspark.sql import SparkSession

As usual, you import your regular Python imports and you also input Cerebro. Then you initialize your SparkSession.

And then create a Cerebro-Spark backend object, providing the Spark context and the number of workers for Cerebro as the input.

Then you create a Cerebro HDFS Store object for HDFS based storage, providing a prefixed spot to a directory, which will be used to store all the intermediate data that will be generated during the model selection process.

Define the Models

After the initialization, you can define the models. The first thing you need to do is to define the parameter search space by defining a dictionary object, which has all the configuration parameters and the partition values for each of those parameters. We also provide APIs for defining more complex search spaces, such as the ones that use random sample.

Next, you need to define this factory method which is called estimator generic function, which takes in a specific instance of the search space and returns a Cerebro-Spark estimate. Cerebro-Spark Estimator has a reference to a keras model, the learning optimizer, the loss function that needs to be used, and also the batch size for mini-batch best training.

Run Grid Search

After defining the models, you can run grid search. First, you need to initialize your input DataFrame, using any method you like, and optionally performing any ETL in order to make the the data amenable for deep learning training. Then you need to create Cerebro grid search object, providing the Spark backend, the data store, the estimator generator function, parameters, space as input. You also need to provide several other parameters like how many epochs you want to train each model for, the fraction of the data that you need to use for model validation, and the names of the feature columns and the label columns. This grid search object is very similar to any other Spark MLlib model and you can involve the model selection process by calling the fit method and providing the input data frame. After successful completion, it will return a summary object, which you can use to access the best model and also all the other models that were explored during your model selection process. During model selection, you can initialize the training focus of all the models. Either using TensorBoard or MLflow.

Next, let’s look into some experimental research.

As the experimental setup, I used a 9-node cluster, which has one master node and eight worker nodes. Each node has an Intel Xeon processor, 192 gigbytes of RAM, and one Nvidia P100 GPU.

Tests – Setups – Workload

As the experimental workload, I used the same ImageNet workload that I explained earlier and used Adam as the optimizer. Furthermore, I explored two different values for L2 regularization, essentially generating 16 different model configurations for this workload.

Tests – Results – Learning Curves

Here, I show the learning curves for the different systems that we experimented. The x-axis shows the epoch number, which ranges from 1 to 10. And the y-axis shows the top-5 validation error, which is the standard metric for evaluating model strain on ImageNet data.

Cerebro-Standalone is our version of Cerebro, which directly runs on data files that are available on the file system.

In terms of convergence, we see TensorFlow Model Averaging converges very poorly, as expected. Horovod converges better than TensorFlow Model Averaging, but slightly worse than other systems. The reason for this is its high effective mini-batch size. All the other systems, TensorFlow Parameter Server Asynchronous, Celery, which is the task parallel system, Cerebro-Standalone, and Cerebro-Spark converge similarly.

Tests – Results – Per Epoch Runtimes

In terms of per epoch runtimes,

we see TensorFlow Parameter Server Asynchronous takes around 19 hours to complete one epoch of training of this workload. Horovod takes around 5.42 hours. The reasons for high runtimes of these systems is their high communication overheads. All the other systems complete this workload in a relatively low runtime, around 1.7 to 1.9 hours.

However, it should be noted that even though Celery has low runtime, it has a storage footprint which has a blown up factor of eight bits. And even though TensorFlow Model Averaging has low runtime, it converges very poorly.

Another important thing to note here is that the requirements of deep learning model training significantly differs from the requirements of a typical data processing workload on Spark. And as a result, when running Cerebro on top of Spark, Spark has to be configured in a way so that it is optimized for deep learning training. How to configure Spark for Cerebro can be found in the documentations of our system.

Tests – Cerebro-Spark Gantt Chart

Here I show the Gantt chart produced for first epoch of training for Cerebro-Spark. Each color uniquely identifies a different model configuration and you can see how models hop between different workers in order to complete one epoch of training. You can also see how the system achieves high system utilization, having very little idle times between model hoppings.

Other Available Hyperparameter

In addition to grid search, we also support several other hyperparameter tuning algorithms. Such as Population Based Tuning, HyperBand, ASHA, and Hyberopt.

More Features to Come

We are also currently working on several other features in Cerebro, such as support for group learning, APIs for transfer learning, and also model parallelism.

If you’re interested in learning more about Cerebro-Spark, you can check our public website. The code is also open sourced under a partial license and can be accessed on GitHub. If you’re more interested about the technical details about Cerebro or MOP, you can check out blog post, or the tech report.

With that, I would like to conclude.

Watch more Spark + AI sessions here
Try Databricks for free
« back
About Yuhao Zhang

University of California, San Diego

Ph.D. student in Computer Science and Engineering Department at the University of California, San Diego. Advised by Prof. Arun Kumar. Research interest focuses on machine learning systems intending to make data science easier and faster. Has been working on data systems for video analytics and deep learning model selection.

About Supun Nakandala

University of California, San Diego

I am currently a third year Ph.D. student in the Computer Science and Engineering Department at University of California, San Diego. My research interest lies broadly in the intersection of Systems and Machine Learning, an emerging area which is increasingly referred to as Systems for ML. In this space, I operate as a data management researcher. Taking inspirations from classical data management techniques, I build new abstractions, algorithms, and systems to improve efficiency, scalability, and usability of machine learning workloads. I am also interested in large-scale applied ML, which opens new systems challenges.