On August 15th, Data Science Central hosted a live webinar—Parallelize R Code Using Apache Spark—with Databricks’ Hossein Falaki. This webinar introduced SparkR concepts, architecture, and a range of new APIs introduced as part of SparkR in Apache Spark 2.x, providing data scientists and statisticians with new capabilities to distribute their existing computation across a Spark cluster.
With the release of Spark 2.0, and subsequent releases, the R API officially supports executing user code on distributed data. This is done primarily through a family of apply() functions.
If you missed the webinar, you can view it now, as well as peruse the slides here. We also demonstrated two R notebooks: Notebook 1 and Notebook 2.
Toward the end, we held a Q & A, and below are all the questions and their answers.
If I can use R on top of Spark, why do I need a separate ML library which seems limited right now?
Although it is possible to implement a wide range of functionality using apply() functions, there are many algorithms that require distributed optimization implementation. Examples include generalized linear models or tree-based models. For these use-cases, you can use SparkR’s ML functionality.
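For instance, SparkR ships a distributed generalized linear model via `spark.glm()`. A minimal sketch, using the built-in `iris` dataset (note that `createDataFrame()` replaces the `.` in `iris` column names with `_`):

```r
library(SparkR)
sparkR.session()

# Convert a local data.frame into a distributed SparkDataFrame
df <- createDataFrame(iris)

# spark.glm() runs a distributed optimization across the cluster,
# rather than fitting the model inside a single apply() closure
model <- spark.glm(df, Sepal_Length ~ Sepal_Width + Species, family = "gaussian")
summary(model)
```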
Regarding spark.lapply(): Is it necessary to explicitly load libraries on the workers and to “push” shared variables, something like clusterExport in the parallel R package?
Yes, you need to explicitly load libraries on workers. As for variables, you don’t necessarily need to “push” them to workers: if the variables are small, SparkR’s closure capture will easily take care of them. If they are large, it is recommended to push auxiliary data to workers directly (using the data plane).
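A sketch of both points, assuming the MASS package (bundled with R) is installed on every worker. The small variable `n` is captured by the closure automatically, while the library is loaded inside the function so it is attached on each worker:

```r
library(SparkR)
sparkR.session()

seeds <- list(1, 2, 3, 4)
n <- 100  # small variable: captured by the closure and shipped to workers

results <- spark.lapply(seeds, function(seed) {
  library(MASS)  # load the package on the worker, not just the driver
  set.seed(seed)
  mean(mvrnorm(n, mu = 0, Sigma = matrix(1)))
})
```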
Does each worker in spark.lapply() work on a partition of the original list or on the whole original list? What are the main differences between spark.lapply() and dapply(), besides the fact that one works on a list and the other works on a DataFrame?
- When using spark.lapply(), each worker operates on a single value of the input list.
- spark.lapply() ships its arguments to workers over the control plane, whereas dapply() and gapply() rely on Spark’s data plane.
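The contrast can be sketched as follows (column names and values are illustrative):

```r
library(SparkR)
sparkR.session()

# spark.lapply(): the input list travels with the task (control plane);
# each element is handed to one worker invocation
squares <- spark.lapply(list(1, 2, 3), function(x) x^2)

# dapply(): the input is a SparkDataFrame partitioned across the cluster
# (data plane); each invocation receives one partition as a local data.frame
df <- createDataFrame(data.frame(x = 1:6))
schema <- structType(structField("x", "integer"),
                     structField("x2", "double"))
result <- dapply(df, function(p) cbind(p, x2 = p$x^2), schema)
head(result)
```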
How is it determined which worker works on which data? Which part of the data does each worker get with its closure?
With dapply() you cannot control which worker gets to process which part of the data. However, with gapply() you can make sure that each worker processes all data associated with a specific key.
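A sketch of keyed grouping with gapply(), using the built-in `iris` dataset: all rows sharing a `Species` value are guaranteed to land in the same invocation of the user function.

```r
library(SparkR)
sparkR.session()

df <- createDataFrame(iris)
schema <- structType(structField("Species", "string"),
                     structField("mean_width", "double"))

# Every row of a given Species is processed together by one call to the UDF
means <- gapply(df, "Species", function(key, g) {
  data.frame(Species = key[[1]], mean_width = mean(g$Sepal_Width))
}, schema)
head(means)
```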
Can gapply()/dapply() be used for functions or tasks like training a model?
If the training process can be implemented in parallel, or there are ways to combine partial results (from different workers) into a final model, you can use dapply()/gapply() for model training.
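One common pattern is training an independent model per group, which parallelizes trivially with gapply(). A sketch using `iris` (how, or whether, to combine the per-group fits into a single global model is up to the user):

```r
library(SparkR)
sparkR.session()

df <- createDataFrame(iris)
schema <- structType(structField("Species", "string"),
                     structField("intercept", "double"),
                     structField("slope", "double"))

# Fit one local R model per group, in parallel across the cluster
coefs <- gapply(df, "Species", function(key, g) {
  fit <- lm(Sepal_Length ~ Sepal_Width, data = g)
  data.frame(Species = key[[1]],
             intercept = coef(fit)[[1]],
             slope = coef(fit)[[2]])
}, schema)
head(coefs)
```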
Can you give an example of when dapply() would actually be useful?
When using simple transformations that are agnostic to data grouping, you can use dapply().
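For example, a row-wise unit conversion does not care how the rows are grouped or partitioned, so dapply() is a natural fit (column names here are illustrative):

```r
library(SparkR)
sparkR.session()

df <- createDataFrame(data.frame(miles = c(1.5, 3.0, 10.0)))
schema <- structType(structField("miles", "double"),
                     structField("km", "double"))

# Each partition is converted independently; grouping is irrelevant
converted <- dapply(df, function(p) cbind(p, km = p$miles * 1.60934), schema)
head(converted)
```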
Are these R Workers part of Microsoft R Server or part of Spark binaries?
No. SparkR is an open source project which is part of Apache Spark.
With spark.lapply(), can we not point to a network path for .libPaths() so we don’t have to run install.packages() on each node?
When a package is missing on a worker, does it import from an already downloaded package on the driver or from a CRAN mirror?
You need to explicitly install third-party packages on the workers.
Is it possible to share the notebooks you have just shown in this webinar?
Yes, please see the links (Notebook 1 and Notebook 2) above in the post.
Can we only use SparkR on the Databricks platform? Or can we use it in RStudio as well?
Yes, you can use SparkR on other platforms as well, including RStudio.
If we have a large data set, around 15~16 million records, which function do you recommend: lapply, dapply, or gapply?
Do not use spark.lapply() to distribute your data. First, parallelize your data as a SparkDataFrame, and then use dapply() or gapply() depending on your use case.
Can SparkR be used for distributed scoring of records for prediction using a model?
Yes. You can distribute the model object to all workers (for example, by persisting it to disk and reading it back from disk), and then use dapply()/gapply() to score your data against the model in parallel.
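A sketch of this pattern, fitting a local model on the driver and scoring `iris` in parallel. The path `/dbfs/tmp/model.rds` is illustrative; substitute any storage location visible to all workers.

```r
library(SparkR)
sparkR.session()

# Fit a model locally on the driver, then persist it to shared storage
fit <- lm(Sepal.Length ~ Sepal.Width, data = iris)
saveRDS(fit, "/dbfs/tmp/model.rds")  # assumed shared path

df <- createDataFrame(iris)
schema <- structType(structField("width", "double"),
                     structField("prediction", "double"))

# Each worker reads the persisted model and scores its partition
scores <- dapply(df, function(p) {
  model <- readRDS("/dbfs/tmp/model.rds")
  data.frame(width = p$Sepal_Width,
             prediction = predict(model,
                                  newdata = data.frame(Sepal.Width = p$Sepal_Width)))
}, schema)
head(scores)
```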
Where does the result DataFrame reside after parallel processing? Is it distributed over the cluster or held in the master’s memory?
If you use dapplyCollect()/gapplyCollect(), the result is returned as a local data.frame object. Otherwise, when using dapply()/gapply(), the result is a distributed object stored on all the workers.
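The two variants side by side (column names are illustrative):

```r
library(SparkR)
sparkR.session()

df <- createDataFrame(data.frame(x = 1:4))
schema <- structType(structField("x", "integer"),
                     structField("y", "integer"))

# dapply() keeps the result distributed across the workers
dist <- dapply(df, function(p) cbind(p, y = p$x + 1L), schema)

# dapplyCollect() pulls the result back to the driver (no schema needed)
local <- dapplyCollect(df, function(p) cbind(p, y = p$x + 1L))

class(dist)   # "SparkDataFrame" (distributed)
class(local)  # "data.frame" (local to the driver)
```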
Will dapply() (not dapplyCollect()) do lazy execution?
Yes, both dapply() and gapply() are lazy.