Predicting Optimal Parallelism for Data Analytics

May 26, 2021 03:50 PM (PT)

A key benefit of serverless computing is that resources can be allocated on demand, but the quantity of resources to request, and allocate, for a job can profoundly impact its running time and cost. For a job that has not yet run, how can we provide users with an estimate of how the job’s performance changes with provisioned resources, so that users can make an informed choice upfront about cost-performance tradeoffs? 

This talk will describe several related research efforts at Microsoft to address this question. We focus on optimizing the amount of computational resources that control a data analytics query’s achieved intra-parallelism. These use machine learning models on query characteristics to predict the run time or Performance Characteristic Curve (PCC) as a function of the maximum parallelism that the query will be allowed to exploit.

The AutoToken project uses models to predict the peak number of tokens (resource units) that is determined by the maximum parallelism that the recurring SCOPE job can ever exploit while running in Cosmos, an Exascale Big Data analytics platform at Microsoft. AutoToken_vNext, or TASQ, predicts the PCC as a function of the number of allocated tokens (limited parallelism). The AutoExecutor project uses models to predict the PCC for Apache Spark SQL queries as a function of the number of executors. The AutoDOP project uses models to predict the run time for SQL Server analytics queries, running on a single machine, as a function of their maximum allowed Degree Of Parallelism (DOP). 

We will present our approaches and prediction results for these scenarios, discuss some common challenges that we handled, and outline some open research questions in this space.

In this session watch:
Rathijit Sen, Microsoft
Vishal Rohra, Machine Learning Engineer, Microsoft



Rathijit Sen: Hello everyone. Thank you for attending this talk. I’m Rathijit from Microsoft, and together with my colleague, Vishal, I will be presenting on predicting optimal parallelism for data analytics. Here’s the structure of our talk. First, we’ll present a general overview of the problem, then we’ll give a brief overview of four case studies: AutoDOP, AutoToken, TASQ, and AutoExecutor. Finally, we will conclude with a couple of open research questions in this area. Cloud computing platforms give users the opportunity to provision resources dynamically, in a fine-grained manner, as much as is needed. Cloud service providers also provision cluster capacities according to resource demand and utilization. Our question is: how many resources does a job actually need to meet its cost and performance objectives?
In this talk, we focus on automatically predicting the optimal parallelism for jobs and the computational resources that they need. We also want to allow flexibility in choosing the optimal operating point depending on the cost and performance objectives, and to provision resources accordingly. Our approach is to predict the job’s peak parallelism as a function of its characteristics, or a job’s run time as a function of its characteristics and amount of parallelism. In this talk, by a job, we mean an analytics query, and we’ll be using the terms job and query interchangeably. We are interested in peak parallelism because, ideally, at that point the job would have the lowest run time, and any further resource allocation would be wasteful. But we may be able to achieve the same run time at a lower amount of parallelism, and in some cases, if there are overheads associated with high parallelism, we might actually be able to improve the run time at lower parallelism.
For query characteristics, we are interested in compile-time and optimization-time properties and estimates. This is because we want to do the prediction for the queries before they have run, and at that time the run-time properties are not available. We learn this function f using machine learning models on past executions. So this is the general framework, and we will be seeing this over and over again in the case studies that we discuss next.
So here is a quick summary of the case studies. They differ according to the platform, the number of nodes involved in the operations, and the type of predictions. We will be explaining these concepts as we go along. One of the concepts that will appear is that of the performance characteristic curve, or PCC, which plots the relationship between the run time and the parallelism. We want to provide such a view to the user, so that they can select the optimal operating point to meet cost and performance objectives. So now, I will start with the first case study, which is AutoDOP on SQL Server.
This is joint work with my collaborators at the University of Wisconsin-Madison, and more information is available in these papers. In the AutoDOP project, we focus on SQL Server running on a single node. For every query, we can specify the degree of parallelism, or DOP, which is the maximum number of threads that can be active at any time for executing the query. Depending on this choice of DOP, the query run time could be affected, and hence also its cost. This can in turn affect resource utilization and provisioning in cloud computing platforms.
The choice of the DOP value affects different queries in different ways. Here we show a few example queries from the TPC-DS benchmark suite. The x-axis shows the degree of parallelism, or DOP, and the y-axis shows query run time normalized to the run time at DOP [inaudible]. Some queries are well-parallelizable, that is, their run times decrease with increasing DOP. Other queries are not so parallelizable, particularly at high DOPs, and we see that their run time actually increases at high DOPs. So from these examples, we see that a single choice of DOP is not likely to be optimal for all queries, and we need to take into account query characteristics when selecting the optimal DOP. The choice of the optimal DOP also depends on the amount of data that the query processes. Here, we show six workloads from TPC-DS and TPC-H at different scale factors.
On the x-axis, DS means TPC-DS and H means TPC-H, and the numbers indicate the scale factor. So DS 100 is TPC-DS at scale factor 100, whereas DS 1000 is TPC-DS at scale factor 1000, which processes about 10 times as much data as DS 100. Within each bar, the colors represent the percentage of queries that have that value of the optimal DOP, and the legend shows those optimal DOP values: 1, 8, 16, 20, 32, 40, 64, and 80. As we have discussed previously, for any workload, there isn’t a single DOP value that’s optimal for all queries. Rather, there is a mix of optimal [inaudible]. Moreover, we see that as the scale factor increases, that is, as the data set size increases, the higher DOP values become optimal for a larger fraction of the queries. For example, at DS 1000, more queries have DOP 80 as optimal than at DS 100.
Similarly, for TPC-H 1000, there’s a higher percentage of queries with 80 as the optimal DOP than for TPC-H 100. So we need to take into account the data size as well when selecting the optimal DOP. Our approach is to use query characteristics, such as the query plan operators and estimates of the data sizes, [inaudible] of tuples processed, and so on. Then, given a target DOP, we use machine learning models such as random forest to predict the run time at that DOP. Once we have the run time at different DOPs, we can select the optimal DOP. For the machine learning models, we use a regression model to first predict the run time and then select the DOP, instead of using a classification model to directly select the DOP, because with the regression approach we can have a richer selection of optimization objectives and select the operating point accordingly.
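The regress-then-select idea described here can be sketched as follows. This is a minimal illustration under stated assumptions, not the AutoDOP implementation: `toy_model` is a hypothetical stand-in for a trained regressor such as a random forest, the feature names are invented, and the tolerance-based rule is only one example of an objective that the regression approach makes possible.

```python
from typing import Callable, Dict, Iterable

# Candidate DOP values, taken from the legend described in the talk.
CANDIDATE_DOPS = [1, 8, 16, 20, 32, 40, 64, 80]


def predict_runtimes(model: Callable[[dict, int], float],
                     features: dict,
                     dops: Iterable[int]) -> Dict[int, float]:
    """Query the regression model once per candidate DOP."""
    return {dop: model(features, dop) for dop in dops}


def select_dop(runtimes: Dict[int, float], tolerance: float = 0.0) -> int:
    """Return the smallest DOP whose predicted run time is within
    (1 + tolerance) of the best predicted run time."""
    best = min(runtimes.values())
    return min(d for d, t in runtimes.items() if t <= best * (1.0 + tolerance))


def toy_model(features: dict, dop: int) -> float:
    """Hypothetical stand-in for a trained regressor: the parallel part
    shrinks with DOP while a per-thread overhead grows with it."""
    return (features["serial_s"]
            + features["parallel_s"] / dop
            + features["overhead_s_per_thread"] * dop)


features = {"serial_s": 2.0, "parallel_s": 64.0, "overhead_s_per_thread": 0.04}
runtimes = predict_runtimes(toy_model, features, CANDIDATE_DOPS)
fastest_dop = select_dop(runtimes)                  # pure run-time objective
frugal_dop = select_dop(runtimes, tolerance=0.10)   # accept up to 10% slower
```

With a classifier that outputs a single DOP, the second objective would require training a separate model; with per-DOP run-time estimates it is only a different selection rule.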
Here are some example results from subsets of queries taken from the TPC-DS benchmark suite. We show two test suites: test 1 and test 2. For each test, we consider the sum of query run times over all the queries in that test. For the baseline, we consider all queries executing at DOP 64, which is the default value for our system. A speed-up of one means no performance change, greater than one means performance improvement, and less than one means performance loss. For each test, the third, fourth, and fifth series are policies where all the queries run with the same DOP value. Workload optimal means all queries run with the DOP that’s optimal for the entire test suite, and 40 and 80 mean all queries run at DOP [inaudible]. Query optimal means each query has been given its own optimal value of DOP, and it shows a speed-up over workload optimal.
[inaudible] query optimal and workload optimal assume the existence of an oracle, which accurately selects the correct DOP, but this may not be realizable in practice. The first series, labeled ML, is from the actual AutoDOP predictions, and we can see that with those values we get speed-ups that are closer to the query optimal policy than with the static selection policies. We’ll now move over to the next two case studies: AutoToken and TASQ. While AutoDOP predicted the run time for each DOP value, AutoToken will predict the peak parallelism, and then Vishal will be talking about the TASQ [inaudible] run time and [inaudible]. So I’ll start with AutoToken. This is joint work with my colleagues at Microsoft, and more information is available in this paper.
In this project, we looked at the execution of SCOPE queries on Cosmos, which is our exabyte-scale big data analytics platform at Microsoft. On this platform, the unit of resource allocation is a token, and a token consists of a number of cores and an amount of memory. Tokens can be of two types: guaranteed and spare. Spare tokens are allocated dynamically as the job runs, depending on available cluster capacity. In this work, we focused on provisioning guaranteed tokens. The number of tokens allocated to a job may affect its run time, and hence its cost, as well as resource utilization and provisioning decisions.
The AutoToken project focuses on predicting the token count at which the job can achieve its peak parallelism. So it looks at peak resource provisioning. Any more tokens would be wasteful, because the job performance would not improve, and the job would also need to wait unnecessarily to get those tokens. On the other hand, with fewer tokens, there could be a loss of performance, or the performance could be unpredictable, particularly if the job is relying on the availability of spare tokens for performance. Now, given a job, how would a user select this token count?
They could use a guess, or they could use a default value, or a default percentage for the VC [inaudible], but we have seen on our clusters that this policy is quite suboptimal. This chart shows the distribution of the ratio of the requested token count divided by the peak token count on our production clusters. A value of one means perfect allocation, greater than one means the job is over-allocated, and less than one means it is under-allocated. As you can see, on our clusters 40 to 60% of jobs are over-allocated, and this is the space that AutoToken focuses on: reducing this over-allocation for the jobs.
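The ratio plotted in that chart is simple to compute. A small sketch follows; the (requested, peak) pairs are made up for illustration and are not production data.

```python
from typing import List, Tuple


def allocation_ratio(requested_tokens: int, peak_tokens: int) -> float:
    """Requested token count divided by the peak token count the job used.
    1.0 is a perfect allocation, above 1.0 is over-allocated,
    below 1.0 is under-allocated."""
    if peak_tokens <= 0:
        raise ValueError("peak_tokens must be positive")
    return requested_tokens / peak_tokens


def over_allocated_fraction(jobs: List[Tuple[int, int]]) -> float:
    """Fraction of jobs whose request exceeds their observed peak."""
    over = sum(1 for requested, peak in jobs
               if allocation_ratio(requested, peak) > 1.0)
    return over / len(jobs)


# Hypothetical (requested, peak) token pairs, invented for this example.
jobs = [(100, 40), (50, 50), (200, 180), (30, 60), (80, 80)]
ratios = [allocation_ratio(requested, peak) for requested, peak in jobs]
fraction_over = over_allocated_fraction(jobs)
```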
This figure shows the skyline of an example job, where the x-axis shows run time and the y-axis shows resources, or the token [inaudible]. Here, as we can see, the default allocation is much more than the peak requirement, so this job is over-allocated and we want to go towards this ideal allocation. That’s what AutoToken aims to do: it tries to eliminate this over-allocation, and ideally there would be no performance impact, because these tokens are not going to be used. Our approach is to use machine learning models to learn the peak token counts from past behavior. We group jobs by their characteristics, using signatures for the grouping, and then we build ML models for each job group. When a new job arrives, we compute its signature and see if a model has been built for the corresponding group, and if so, we use that model to predict the peak token count for that job.
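The group-by-signature lookup can be sketched as below. Everything here is an assumption made for illustration: the signature function, the `min_history` threshold, and the use of the maximum past peak as a placeholder where the real system would use a trained ML model per group.

```python
import hashlib
from typing import Dict, List, Optional


def job_signature(operators: List[str], input_name: str) -> str:
    """Hypothetical signature: a hash over the plan operators and a
    normalized input name, so recurring jobs map to the same group."""
    key = "|".join(operators) + "#" + input_name
    return hashlib.sha256(key.encode("utf-8")).hexdigest()[:16]


class PeakTokenPredictor:
    """Keeps one stand-in model per job group, keyed by signature."""

    def __init__(self, min_history: int = 3) -> None:
        # Being conservative: only predict for groups with enough history.
        self.min_history = min_history
        self.history: Dict[str, List[int]] = {}

    def record(self, signature: str, peak_tokens: int) -> None:
        self.history.setdefault(signature, []).append(peak_tokens)

    def predict(self, signature: str) -> Optional[int]:
        """Return a peak-token prediction, or None when no model exists
        for this group and the caller should keep the default allocation."""
        peaks = self.history.get(signature, [])
        if len(peaks) < self.min_history:
            return None
        return max(peaks)  # placeholder for a per-group ML model


predictor = PeakTokenPredictor()
recurring = job_signature(["Extract", "HashJoin", "Aggregate"], "daily_clicks")
for observed_peak in (40, 42, 38):
    predictor.record(recurring, observed_peak)

unseen = job_signature(["Extract", "Sort"], "adhoc_export")
known_prediction = predictor.predict(recurring)
unknown_prediction = predictor.predict(unseen)
```

Returning `None` for groups without a model mirrors the limited coverage discussed in the talk: jobs outside the covered groups simply keep their existing allocation.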
Here are some example results. As we can see on the chart, with AutoToken predictions, the distribution of the requested token count divided by the actual peak token count is closer to the ideal [inaudible], which is shown by the blue line. This is much closer than what we get with the default allocation. We tested AutoToken over hundreds and thousands of jobs in our production clusters, and overall the accuracy was quite good. The median error was zero and the 90th percentile error was 50% or less than the default token counts. The coverage ranged from about 11% to 28%, because AutoToken focused on recurring jobs only, and we were also conservative in selecting which jobs we want to predict for, because we want to minimize the cases where a job’s performance could suffer if the predictions were not accurate. In TASQ, which Vishal will be talking about, we look at higher coverage.
So to recap, AutoToken focuses only on recurring jobs, and it tries to eliminate the over allocations from those jobs. That is, it goes from default allocation to peak allocation, but perhaps we can do even better by reducing the allocation even further and going towards a tight allocation, so that is the focus of the TASQ project that Vishal will be discussing.

Vishal Rohra: Thanks for that, Rathijit. My name is Vishal Rohra and I’m a software engineer at Microsoft. Let me walk you through our project TASQ, which is short for Token Allocation for Serverless Queries. The previous project, AutoToken, focused on eliminating over-allocation for recurring jobs. This project aims for a more aggressive resource allocation strategy, which we call tight allocation. So why do we need tight allocation? The first obvious answer is cost savings. To measure the scale of savings, we looked at the cumulative distribution of the potential reduction in tokens requested by SCOPE jobs. Let’s look at the figure here. The red curve is when the user does not want any change in performance, which in our case is the job run time. As we can see, 50% of the jobs can request fewer tokens and still maintain the same performance. And the real value is for 20% of those jobs, which can cut their requested tokens by at least half and see negligible change in performance.
And if the user is willing to take a 5% performance loss, the savings are even more significant. Apart from savings, for the system as a whole, tight allocation collectively decreases job wait times and increases resource availability. Now that we have validated the need for tight allocation, how do we help our users achieve it? At its core, given any job’s compile-time features, we want to predict the number of tokens that would lead to tight allocation for that job. But through domain knowledge and upon interacting with our users, we learned that optimal token allocation means different things to different people. Users prioritize cost savings or faster run time based on the task at hand. So instead of recommending a single token count for a job, it would be more useful to predict the relationship between the number of tokens allocated and the job run time. This should facilitate a more transparent process and empower users to make an informed decision.
And finally, from domain knowledge, we understand that the token versus run time relationship is usually an exponentially decaying curve, which we refer to as the PCC, that is, the performance characteristic curve. So instead of predicting tight allocation, we want to predict the parameters A and B for the PCC. We’ll explain what these parameters are in later slides. So let’s look at the first crucial challenge in this approach. Our data, like the job graph and metadata characteristics, is available for only a single token count for most of the historical workloads. Users rarely experiment with different token counts for the same job. But to predict the token versus run time relationship, we need data for multiple token counts, so that we can learn the trend for different types of job graphs. To tackle this, we introduce an efficient data augmentation technique, which we call AREPAS, short for Area Preserving Allocation Simulator. We take the past skyline of a job at a single token count and feed it to a simulator that generates multiple skylines at different token counts. Based on system-level intuition, the simulator assumes a couple of things.
First, the total amount of computation is constant, regardless of the number of tokens allocated. Then, if we discretize the skyline at one-second [inaudible], a one-by-one square in the plot represents one token-second, and we assume the total number of token-seconds is also constant. So, finally, the total area under the resource consumption skyline remains constant. Given these assumptions, this is how the simulator creates multiple skylines. The blue curve is the original workload skyline. If you decrease the number of tokens, the orange curve represents the skyline under that constraint. The simulator is essentially flattening out the peaks, and thereby proportionally increasing the run time, since the area under the curve has to be preserved. All of this significantly enriches our sparse training data.
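To make the area-preserving idea concrete, here is a simplified sketch. It is one reading of the description above, not the AREPAS code: it assumes a skyline sampled once per second, leaves sections at or below the new token limit unchanged, and replaces each contiguous section above the limit with a flat section of equal area. The function name and the sample skyline are invented.

```python
from typing import List


def simulate_skyline(skyline: List[float], token_limit: float) -> List[float]:
    """Flatten every contiguous run that exceeds token_limit into a run
    capped at token_limit with the same number of token-seconds, so the
    total area under the skyline is preserved and the run time grows."""
    if token_limit <= 0:
        raise ValueError("token_limit must be positive")
    simulated: List[float] = []
    i, n = 0, len(skyline)
    while i < n:
        if skyline[i] <= token_limit:
            # Section already fits under the limit: copy it unchanged.
            simulated.append(skyline[i])
            i += 1
            continue
        # Accumulate the token-seconds of the whole over-limit run.
        area = 0.0
        while i < n and skyline[i] > token_limit:
            area += skyline[i]
            i += 1
        full_seconds, remainder = divmod(area, token_limit)
        simulated.extend([token_limit] * int(full_seconds))
        if remainder > 0:
            simulated.append(remainder)
    return simulated


# Hypothetical skyline: tokens in use during each second of a past run.
original = [2, 4, 8, 8, 4, 2]
tight = simulate_skyline(original, token_limit=4)
```

Running the simulator for several token limits yields one (token count, run time) point per limit, where the run time is the length of the simulated skyline.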
Now that we have enough training data, let’s understand what the run time versus token relationship looks like. Generally, there’s a user expectation that providing more tokens leads to lower run time. While that isn’t completely true, we have to acknowledge that the elbow region of the performance characteristic curve usually emerges before any sort of parallelism overhead kicks in. Since we mostly care about the [inaudible], it’s fair to enforce a monotonically non-increasing curve.
We enforce this by characterizing the inverse relationship between the run time and tokens as a power law, in which the signs of the scalar parameters A and B are different. The goal then is to predict these parameters. This approach simplifies the problem and makes it easier for users to understand the trade-off. Given that constraint, we tried a few different modeling techniques, namely XGBoost, feedforward neural networks, and graph neural networks, and we evaluated each model by three criteria. First, whether the predicted curve maintains a monotonically non-increasing pattern. Second, the mean absolute error for the curve parameters. Third, the median absolute error for the job run time. As you can see, the first two methods, XGBoost [inaudible] and XGBoost [inaudible], do not always predict a non-increasing curve. On the other hand, in feedforward neural networks and graph neural networks, you can enforce that pattern by design. Both neural network models have comparable performance for curve parameters and run time.
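As an illustration of the power-law form, the sketch below assumes the curve is run time = b * tokens^a, with a and b standing in for the talk's A and B. It fits the two parameters to a handful of (tokens, run time) points by least squares in log-log space. This is only one plausible way to obtain target parameters from simulated points; the models in the talk predict the parameters from compile-time features instead. The sample points are synthetic.

```python
import math
from typing import Sequence, Tuple


def fit_power_law(tokens: Sequence[float],
                  runtimes: Sequence[float]) -> Tuple[float, float]:
    """Least-squares fit of runtime = b * tokens**a in log-log space,
    where the relationship becomes the line log(t) = log(b) + a*log(n)."""
    xs = [math.log(n) for n in tokens]
    ys = [math.log(t) for t in runtimes]
    count = len(xs)
    mean_x = sum(xs) / count
    mean_y = sum(ys) / count
    sxx = sum((x - mean_x) ** 2 for x in xs)
    sxy = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    a = sxy / sxx
    b = math.exp(mean_y - a * mean_x)
    return a, b


def is_non_increasing(a: float, b: float) -> bool:
    """With b > 0, the curve never increases with tokens when a <= 0."""
    return b > 0 and a <= 0


def pcc_runtime(tokens: float, a: float, b: float) -> float:
    return b * tokens ** a


# Synthetic points generated from a = -0.5, b = 1200 (illustrative only).
a, b = fit_power_law([4, 16, 64], [600.0, 300.0, 150.0])
```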
GNNs, however, are particularly suited for this problem, because the input data consists of a computational job graph. Also, XGBoost and feedforward neural networks require the input feature dimensions to be equal, while a GNN is flexible and can accept variable feature dimensions based on the number of operators in the job. This can be seen in the results, where GNNs outperform the other models. To feedforward neural networks’ advantage, they take significantly less training time, which is more favorable for production. Now, let’s see how all of this ties together for the user. We piloted this project in an internal Python client for SCOPE, and here’s the expected user workflow. First, the user submits the job with the SCOPE script and parameters, such as whether they want to visualize the PCC and whether they have constraints on the number of tokens they can allocate. Then the job is compiled and a computational job graph is generated at the backend.
We use that as input for our model and predict the curve parameters. Those parameters generate this visualization. As you can see, the user is able to inspect a given job’s run time at different token counts. And if they aren’t interested in inspecting this manually for every job, they can put this on autopilot and get an optimal token count, which in this case is 32, right around the point where run time gains start to diminish. Ultimately, all this effort helps users make informed resource allocation decisions for all their jobs before they are executed.
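The talk does not spell out how the autopilot picks the point where gains start to diminish. One hypothetical rule, sketched below, walks up the predicted power-law curve and stops at the first token count where one more token would improve run time by less than a chosen fraction. The `min_gain` threshold, the parameter values, and the function name are all assumptions.

```python
def pick_token_count(a: float, b: float, max_tokens: int,
                     min_gain: float = 0.02) -> int:
    """Smallest token count at which adding one more token improves the
    predicted run time (b * tokens**a) by less than min_gain, relative to
    the current run time. Falls back to max_tokens, the user's limit."""
    for tokens in range(1, max_tokens):
        current = b * tokens ** a
        with_one_more = b * (tokens + 1) ** a
        if (current - with_one_more) / current < min_gain:
            return tokens
    return max_tokens


# Illustrative curve parameters, not taken from the talk.
unconstrained = pick_token_count(a=-0.5, b=1200.0, max_tokens=100)
constrained = pick_token_count(a=-0.5, b=1200.0, max_tokens=10)
```

A lower `min_gain` favors run time and a higher one favors cost, which is the kind of trade-off the PCC view is meant to expose to the user.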
And this is the high-level system architecture of how this feature was integrated. On the top, we have the storage for job graphs, job characteristics, and skylines. That data is cleaned and ingested. In Azure Machine Learning, features are created, the model is trained and registered, and an endpoint is generated to consume the model in any environment. Our first environment is our Python submission system, which we showed earlier, wherein the query is submitted, compile-time features are ingested in the required format, and the endpoint is called for the results. Now, I’ll pass this back to Rathijit, who will delve into another project, AutoExecutor, which extends the work of TASQ. Thank you.

Rathijit Sen: Thanks, Vishal. Now, we will talk about our last case study, which is AutoExecutor. This is joint work with my colleagues at Microsoft, and this is one that is currently under submission. In this project, we look at predicting the optimal number of executors for Spark SQL queries for Spark in Azure Synapse platforms. Executors are processes on worker nodes, and each executor has a certain number of cores and memory that it can use. And by changing the number of executors that a query can use, we can affect its run time and cost, and hence it will also affect the resource utilization and provisioning decisions.
In this project, we extended the PCC model from TASQ that Vishal just described. Here on the right, we see the run time for a Spark SQL query from TPC-DS. The x-axis shows the number of executors and the y-axis shows run time, and we model the PCC in two parts. The first part is the decreasing part, which we model as a power-law function, similar to TASQ. But then there is a portion where the run time flattens out, and that is the portion that we model with a constant function. So our PCC model is a combination of these two functions: we take the maximum of the values obtained from the two functions. In these functions, A, B, and M are scalar parameters that depend on the query, and our approach is to use machine learning models to predict these PCC parameters. We use compile-time and optimization-time properties, such as the count of operators, input cardinalities, average row length, et cetera, as inputs to the ML models.
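The two-part curve can be written down directly. The sketch below assumes the power-law part has the form b * executors^a and the flat part is the constant m, with a, b, and m standing in for the talk's A, B, and M. The parameter values are invented, and `plateau_start` is a hypothetical helper for locating where the flat part begins.

```python
def pcc_runtime(executors: int, a: float, b: float, m: float) -> float:
    """Two-part PCC: a decreasing power law that flattens at the floor m."""
    return max(b * executors ** a, m)


def plateau_start(a: float, b: float, m: float, max_executors: int) -> int:
    """Smallest executor count at which the power-law part has dropped to
    the constant floor m, so extra executors no longer reduce run time.
    Returns max_executors if the floor is not reached within the limit."""
    for executors in range(1, max_executors + 1):
        if b * executors ** a <= m:
            return executors
    return max_executors


# Illustrative parameters: run time halves as executors double, down to 50 s.
a, b, m = -1.0, 400.0, 50.0
curve = {n: pcc_runtime(n, a, b, m) for n in (1, 4, 8, 16)}
knee = plateau_start(a, b, m, max_executors=32)
```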
Here are some example predictions. This one shows a TPC-DS query. The x-axis shows the number of executors and the y-axis shows time. The green line shows the actual time obtained by running the query at different executor counts, the blue line shows the values predicted by AutoExecutor, and for reference, we also show the estimates from the Sparklens tool. Sparklens is an open source tool that can estimate the run time of a query at different executor counts. However, Sparklens can do its prediction only after one execution of the query, whereas AutoExecutor is able to do the prediction before we execute the query. So that’s the difference. As you can see from this example, the prediction was quite good. Now here are the summary results over the full workload.
Here we take TPC-DS and split it into 80% for training and 20% for test, and we do a tenfold cross-validation. F1 to F10 show the ten folds. On the y-axis, we show the relative error, and we show the errors for different executor counts: 1, 3, 8, 16, 20, and 25. For both Sparklens and AutoExecutor, we see that the relative error is about 20% or less at higher executor counts, whereas the errors are larger at the smaller executor counts. However, from the PCC, we see that the elbow region, which is the interesting region of operation, is usually at medium to higher executor counts. So overall, this looks quite promising.
So here’s the overall system architecture. When a query executes, we collect execution events, plans, and metrics in our framework. Then there is a workload analysis phase, which extracts the query characteristics and puts them into a workload table. Then there is a feature extraction phase that reads in query characteristics from the workload table and sends them to the model training phase. The model training phase trains the model using these features and the PCC parameter values on the training data, and then stores the model in a database. When a new query arrives, we look up the model in this database, score the model according to the query’s characteristics, predict its PCC, select the optimal operating point, and then request the optimal number of executors from the Spark engine.
So this concludes our case studies, and we’ll now summarize our talk. Our overall theme was the automatic selection of optimal parallelism for data analytics. The reason we do this is that we want to provide the capability of selecting the optimal operating point depending on the cost and performance objectives. Our approach is to use machine learning models that can predict a query’s peak parallelism or run time based on its characteristics. We’ll now briefly talk about a couple of challenges and open research questions in this area. The first challenge is about modeling the PCC. In the AutoDOP case study, we predicted the run time for each DOP, and using these individual estimates, we can build up the PCC. In TASQ, we looked at both point-wise prediction and directly predicting the PCC, using a power-law function as an approximation. In the AutoExecutor case study, we used a power-law function as well as a constant function to approximate the PCC. So the question is, are there other ways to model the PCC, and what is the best way to model it?
The second challenge is about collecting training data. For training the machine learning models, we need, for every query, its run times at different amounts of parallelism, but this could be expensive to collect. Our approach in the TASQ project was to use the AREPAS simulator that Vishal described, which uses a skyline simulator to estimate the run times at different parallelisms. In the AutoExecutor project, we used the open source Sparklens tool, which uses a scheduler simulator to estimate the run times at different parallelisms. However, parallelism may not be the only factor that affects a job’s performance. There could be other influencing factors, for example, memory capacity, types of [inaudible], et cetera. How would a simulator handle changes to those parameters? Having such capabilities in the simulator, to handle changes to those parameter values as well, would be very useful. Thank you for your attention. We look forward to receiving your feedback, and we are happy to answer any further questions that you may have.

Rathijit Sen

Rathijit Sen is a Senior Scientist at the Microsoft Gray Systems Lab (GSL), which is an applied research lab under Azure Data in Microsoft. His work is centered on automatic configuration selection te...

Vishal Rohra

Vishal is a software engineer at Microsoft's AI Development Acceleration Program (MAIDAP). He works with teams across the company to build and productize machine learning solutions. Currently, he is w...