Apache Spark 2.0 introduced a new family of APIs in SparkR, the R interface to Apache Spark, that enables users to parallelize existing R functions. The new dapply, gapply and spark.lapply methods open exciting possibilities for R users. In this post, we present details of one use case developed jointly by Shell Oil Company and Databricks.
Use Case: Stocking Recommendation
At Shell, current stocking practices are often driven by a combination of vendor recommendations, prior operational experience and “gut feeling.” As a result, historical data plays only a limited role in these decisions, which can sometimes lead to excessive or insufficient stock being held at Shell’s locations (e.g., an oil rig).
The prototype tool, the Inventory Optimization Analytics solution, has shown that Shell can use advanced data analysis techniques on SAP inventory data to:
- Optimize warehouse inventory levels
- Forecast safety stock levels
- Rationalize slow moving materials
- Review and re-assign non-stock and stock items on materials list
- Identify material criticality (e.g., via bill of materials linkage, past usage or lead time)
To calculate the recommended stocking level for a material, the Data Science team has implemented a Markov Chain Monte Carlo (MCMC) bootstrapping statistical model in R. The model is applied to every material (typically 3000+) issued across 50+ Shell locations. Each individual material model involves simulating 10,000 MCMC iterations to capture the historical distribution of issues. Cumulatively, the computational task is large but, fortunately, embarrassingly parallel, because the model can be applied independently to each material.
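As a rough illustration of why this work parallelizes so well, the sketch below uses a plain bootstrap as a stand-in for the per-material model. It is not the MCMC model described above: the function name `recommend_stock`, the `lead_time` and `service_level` parameters, and the sample issue history are all assumptions made for this example.

```r
# Simplified stand-in for the per-material model: a plain bootstrap of
# historical issue quantities, NOT the MCMC model used in the real tool.
# All names and parameters here are hypothetical.
recommend_stock <- function(issues, lead_time = 4, n_iter = 10000,
                            service_level = 0.95) {
  # Each iteration resamples `lead_time` periods of historical issues
  # (with replacement) and sums them into one simulated lead-time demand.
  simulated <- replicate(n_iter, {
    idx <- sample.int(length(issues), size = lead_time, replace = TRUE)
    sum(issues[idx])
  })
  # Recommend the level that covers demand in `service_level` of the runs.
  unname(quantile(simulated, probs = service_level))
}

set.seed(42)
history <- c(0, 0, 3, 1, 0, 5, 2, 0, 0, 4, 1, 0)  # made-up monthly issues
level <- recommend_stock(history)
```

The function reads only one material's history, so calls for different materials share no state and can run in any order on any worker.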
The full model is currently executed on a 48-core, 192GB RAM standalone physical offline PC. The MCMC bootstrap model is a custom-built set of functions that use a number of third-party R packages (“fExtremes”, “ismev”, “dplyr”, “tidyr”, “stringr”).
The script iterates through each of the Shell locations and distributes the historical material into roughly equally sized groups of materials across the 48 cores. Each core then iteratively applies the model to each individual material. We group the materials because dispatching each material as its own task would create too much overhead (e.g., starting the R process), given that each calculation takes only 2-5 seconds. The distribution of the material group jobs across the cores is implemented via the R parallel package. When the last of the 48 core jobs completes, the script moves on to the next location and repeats the process. In total, the script takes approximately 48 hours to calculate the recommended inventory levels for all Shell locations.
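The grouping strategy can be sketched with the parallel package roughly as follows. This is a minimal illustration, not the original script: `fit_material` is a hypothetical placeholder for the real model, the material IDs are made up, and two workers are used instead of 48.

```r
library(parallel)

# Hypothetical placeholder for the per-material model.
fit_material <- function(material_id) {
  data.frame(material_id = material_id, level = nchar(material_id),
             stringsAsFactors = FALSE)
}

materials <- sprintf("MAT%04d", 1:100)  # made-up material IDs
n_workers <- 2                          # the setup above uses 48 cores

# Deal the materials into n_workers groups of roughly equal size.
groups <- split(materials, rep_len(seq_len(n_workers), length(materials)))

# One task per group: each worker loops over its own materials, so the
# process start-up cost is paid once per group, not once per material.
cl <- makeCluster(n_workers)
group_results <- parLapply(cl, groups, function(ids, model) {
  do.call(rbind, lapply(ids, model))
}, model = fit_material)
stopCluster(cl)

results <- do.call(rbind, group_results)
```

Because every group must finish before the next location starts, the slowest group determines how long each location takes.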
Using Apache Spark on Databricks
Instead of relying on a single large machine with many cores, Shell decided to use cluster computing to scale out. The new R API in Apache Spark was a good fit for this use case. Two versions of the workload were developed as prototypes to verify scalability and performance of SparkR.
Prototype I: A proof of concept
For the first prototype, we tried to minimize the amount of code change, as the goal was to quickly validate that the new SparkR API can handle the workload. We limited all the changes to the simulation step, as follows:
For each Shell location list element:
- Parallelize the input data as a Spark DataFrame
- Use SparkR::gapply() to perform the parallel simulation for each of the chunks.
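The two steps above might look roughly like the following sketch. It assumes an active SparkR session (for example, in a Databricks notebook); `simulate_chunk`, the `chunk_id` column and the output schema are hypothetical stand-ins for the existing simulation code, not the code Shell used.

```r
# Hypothetical stand-in for the existing simulation applied to one chunk.
# gapply() passes the grouping key and the chunk's rows (a local data.frame).
simulate_chunk <- function(key, chunk) {
  data.frame(chunk_id = key[[1]], n_rows = nrow(chunk))
}

# One Spark job per location, as in the first prototype.
# Requires an active SparkR session; the function is only defined here.
run_prototype_one <- function(location_list) {
  # gapply() needs the schema of the data.frame returned by the function.
  schema <- SparkR::structType(
    SparkR::structField("chunk_id", "integer"),
    SparkR::structField("n_rows", "integer")
  )
  lapply(location_list, function(location_df) {
    sdf <- SparkR::createDataFrame(location_df)  # parallelize input data
    out <- SparkR::gapply(sdf, "chunk_id", simulate_chunk, schema)
    SparkR::collect(out)                         # bring results back to R
  })
}
```

Because `simulate_chunk` receives and returns ordinary R data.frames, existing simulation code can be wrapped with few changes.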
With limited changes to the existing simulation code base, we reduced the total simulation time to 3.97 hours on a 50-node Spark cluster on Databricks.
Prototype II: Improving performance
While the first prototype was quick to implement, it suffered from one obvious performance bottleneck: a Spark job is launched for every iteration of the simulation. The data is highly skewed and, as a result, during each job most executors sit idle waiting for the stragglers to finish before they can take on work from the next job. Further, at the beginning of each job, we spend time parallelizing the data as a Spark DataFrame while most of the CPU cores on the cluster are idle.
To solve these problems, we modified the pre-processing step to produce input and auxiliary data for all locations and material values up front. The input data was parallelized as one large Spark DataFrame. Next, we used a single SparkR::gapply() call with two keys, location ID and material ID, to perform the simulation.
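A single two-key call of this kind could be sketched as below. It assumes an active SparkR session; the column names `location_id`, `material_id` and `issues`, the output schema, and `simulate_material` are hypothetical placeholders rather than the actual implementation.

```r
# Hypothetical stand-in for the per-material simulation. With two grouping
# columns, `key` holds the location ID and the material ID, in that order.
simulate_material <- function(key, rows) {
  data.frame(
    location_id = key[[1]],
    material_id = key[[2]],
    level = as.numeric(max(rows$issues)),  # placeholder for the real model
    stringsAsFactors = FALSE
  )
}

# One Spark job for all locations and materials.
# Requires an active SparkR session; the function is only defined here.
run_prototype_two <- function(all_inputs) {
  schema <- SparkR::structType(
    SparkR::structField("location_id", "string"),
    SparkR::structField("material_id", "string"),
    SparkR::structField("level", "double")
  )
  sdf <- SparkR::createDataFrame(all_inputs)  # parallelize once, up front
  out <- SparkR::gapply(sdf, c("location_id", "material_id"),
                        simulate_material, schema)
  SparkR::collect(out)
}
```

With every (location, material) pair in one job, an executor that finishes a small group can pick up the next task right away rather than waiting at a per-location job boundary.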
With these simple improvements, we reduced the simulation time to 45 minutes on a 50-node Spark cluster on Databricks.
Improvements to SparkR
SparkR is one of the latest additions to Apache Spark, and the apply API family was the latest addition to SparkR at the time of this work. Through this experiment, we identified a number of limitations and bugs in SparkR and fixed them in Apache Spark.
- [SPARK-17790] Support for parallelizing R data.frame larger than 2GB
- [SPARK-17919] Make timeout to RBackend configurable in SparkR
- [SPARK-17811] SparkR cannot parallelize data.frame with NA or NULL in Date columns