Generalized Linear Models in SparkR and R Formula Support in MLlib

Apache Spark 1.5 adds initial support for distributed machine learning over SparkR DataFrames. To provide an intuitive interface for R users, SparkR extends R’s native methods for fitting and evaluating models to use MLlib for large-scale machine learning. In this blog post, we cover how to work with generalized linear models in SparkR, and how to use the new R formula support in MLlib to simplify machine learning pipelines. This work was contributed by Databricks in Spark 1.5. We’d also like to thank Alteryx for providing input on early designs.

Generalized Linear Models

Generalized linear models unify various statistical models such as linear and logistic regression through the specification of a model family and link function. In R, such models can be fitted by passing an R model formula, family, and training dataset to the glm() function. Spark 1.5 extends glm() to operate over Spark DataFrames, which are distributed data collections managed by Spark. We also support elastic-net regularization for these models, the same as in R’s glmnet package.
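To make the role of the family and link function concrete, here is a minimal pure-Python sketch (not SparkR or MLlib code). It assumes the canonical link for each family: identity for Gaussian and logit for binomial. The function names and the `INVERSE_LINKS` table are hypothetical and exist only for illustration.

```python
import math


def identity_inverse(eta):
    """Inverse of the identity link: the mean equals the linear predictor."""
    return eta


def logit_link(mu):
    """Logit link: maps a mean in (0, 1) to the real line."""
    return math.log(mu / (1.0 - mu))


def logit_inverse(eta):
    """Inverse logit (sigmoid): maps the linear predictor back to (0, 1)."""
    return 1.0 / (1.0 + math.exp(-eta))


# Assumed canonical pairing of family name to inverse link.
INVERSE_LINKS = {"gaussian": identity_inverse, "binomial": logit_inverse}


def predict_mean(family, intercept, coefs, features):
    """Compute the linear predictor, then map it through the inverse link."""
    eta = intercept + sum(c * x for c, x in zip(coefs, features))
    return INVERSE_LINKS[family](eta)
```

With the same coefficients, `predict_mean("gaussian", ...)` behaves like linear regression and `predict_mean("binomial", ...)` like logistic regression; only the link changes.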

Fitting Models

Because we extend R’s native methods for model fitting, the interface is very similar. R lets you specify the modeling of a response variable in a compact symbolic form. For example, the formula y ~ f0 + f1 indicates that the response y is modeled linearly by the variables f0 and f1. Spark 1.5 supports a subset of the available R formula operators: + (inclusion), - (exclusion), . (include all), and the intercept operators. To demonstrate glm() in SparkR, the example below walks through fitting a model over a 12 GB dataset with over 120 million records, which is difficult to train on with a single machine.
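The sketch below shows how the supported operators could resolve to a response and a list of feature columns. It is a toy parser written for this post's examples, not the parser MLlib uses; `resolve_formula` is a hypothetical helper, and it assumes column names contain no `+`, `-`, or `~` characters.

```python
def resolve_formula(formula, columns):
    """Return (response, features, has_intercept) for a simple R-style formula.

    Handles only '+', '-', '.', and the intercept markers '1' and '0'.
    """
    lhs, rhs = formula.split("~")
    response = lhs.strip()
    # Rewrite 'a - b' as 'a +- b' so every term can be split on '+'.
    tokens = rhs.replace("-", "+-").split("+")
    features = []
    has_intercept = True  # R includes an intercept unless told otherwise
    for token in tokens:
        token = token.strip()
        if not token:
            continue
        exclude = token.startswith("-")
        name = token.lstrip("-").strip()
        if name == "1":
            has_intercept = not exclude
        elif name == "0":
            has_intercept = False
        elif name == "." and not exclude:
            # '.' expands to every column except the response.
            for col in columns:
                if col != response and col not in features:
                    features.append(col)
        elif exclude:
            if name in features:
                features.remove(name)
        elif name not in features:
            features.append(name)
    return response, features, has_intercept
```

For instance, with columns `y`, `f0`, `f1`, and `f2`, the formula `y ~ . - f1` resolves to the features `f0` and `f2`, and `y ~ f0 - 1` drops the intercept.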

Preprocessing

The dataset we will operate on is the publicly available airlines dataset, which contains twenty years of flight records (from 1987 to 2008). We are interested in predicting airline arrival delay based on the flight departure delay, aircraft type, and distance traveled.

First, we read the data from the CSV format using the spark-csv package and join it with an auxiliary planes table with details on individual aircraft.

> airlines <- read.df(sqlContext, path="/home/ekl/airlines",
    source="com.databricks.spark.csv", header="true", inferSchema="true")
> planes <- read.df(sqlContext, "/home/ekl/plane_info",
    source="com.databricks.spark.csv", header="true", inferSchema="true")
> joined <- join(airlines, planes, airlines$TailNum == planes$tailnum)

We use functionality from the DataFrame API to apply some preprocessing to the input. As part of the preprocessing, we decide to drop rows containing null values by applying dropna() to the DataFrame.

> training <- dropna(joined)
> showDF(select(training,
    "aircraft_type", "Distance", "DepDelay", "ArrDelay"))

 aircraft_type              | Distance | DepDelay  | ArrDelay
----------------------------|----------|-----------|----------
 "Balloon"                  | 23       | 18        | 20
 "Fixed Wing Multi-Engine"  | 815      | 2         | -2
 "Fixed Wing Single-Engine" | 174      | 0         | 1

Training

The next step is to use MLlib by calling glm() with a formula specifying the model variables. We specify the Gaussian family here to indicate that we want to perform linear regression. MLlib caches the input DataFrame and launches a series of Spark jobs to fit our model over the distributed dataset.

> model <- glm(ArrDelay ~ DepDelay + Distance + aircraft_type,
    family = "gaussian", data = training)

Note that glm() also accepts a “lambda” parameter to add regularization and an “alpha” parameter to set the elastic-net mixing constant.
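As a rough illustration of what these two parameters control, the sketch below computes the elastic-net penalty in the glmnet convention, where lambda scales the overall penalty and alpha mixes the L1 and L2 terms. The function `elastic_net_penalty` is a hypothetical helper, and the sketch assumes the intercept is left out of `coefficients`, since it is normally not penalized.

```python
def elastic_net_penalty(coefficients, lam, alpha):
    """Penalty lam * (alpha * ||b||_1 + (1 - alpha) / 2 * ||b||_2^2).

    alpha = 1.0 gives a pure L1 (lasso) penalty, alpha = 0.0 a pure L2
    (ridge) penalty, and values in between blend the two.
    """
    l1 = sum(abs(b) for b in coefficients)
    l2_squared = sum(b * b for b in coefficients)
    return lam * (alpha * l1 + (1.0 - alpha) / 2.0 * l2_squared)
```

This penalty is added to the model's loss during fitting, so a larger lambda shrinks coefficients more strongly, and a larger alpha pushes more of them to exactly zero.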

Evaluation

As with R’s native models, coefficients can be retrieved using the summary() function.

> summary(model)
$coefficients
                                             Estimate
(Intercept)                             -0.5155037863
DepDelay                                 0.9776640253
Distance                                -0.0009826032
aircraft_type__Fixed Wing Multi-Engine   0.3348238914
aircraft_type__Fixed Wing Single-Engine  0.2296622061
aircraft_type__Balloon                   0.5374569269

Note that the aircraft_type feature is categorical. Under the hood, SparkR automatically performs one-hot encoding of such features so that it does not need to be done manually. Beyond String and Double type features, it is also possible to fit over MLlib Vector features, for compatibility with other MLlib components.

To evaluate our model we can also use predict() just like in R. We can pass in the training data or another DataFrame that contains test data.

> preds <- predict(model, training)
> errors <- select(
    preds, preds$label, preds$prediction, preds$aircraft_type,
    alias(preds$label - preds$prediction, "error"))

Since the returned DataFrame contains the original columns in addition to the label, features, and predicted value, it is easy to inspect the result. Here we take advantage of the built-in visualizations from Databricks to examine the error distribution with respect to the aircraft type.

> display(sample(errors, FALSE, 0.0001))

[Figure: distribution of prediction error by aircraft type]

In summary, SparkR now provides seamless integration of DataFrames with common R modeling functions, making it simple for R users to take advantage of MLlib’s distributed machine learning algorithms.

To learn more about SparkR and its integration with MLlib, see the latest SparkR documentation.

R formula support in other languages

SparkR implements the interpretation of R model formulas as an MLlib feature transformer, for integration with the ML Pipelines API. The RFormula transformer provides a convenient way to specify feature transformations like in R.

To see how the RFormula transformer can be used, let’s start with the same airlines dataset from before. In Python, we create an RFormula transformer with the same formula used in the previous section.

>>> from pyspark.ml.feature import RFormula
>>> formula = RFormula(
        formula="ArrDelay ~ DepDelay + Distance + aircraft_type")

The transformation returns a DataFrame with features and label columns appended. Note that we have to call fit() on a dataset before we can call transform(). The fit() step determines the mapping of categorical feature values to vector indices in the output, so that the fitted RFormula can be used across different datasets.

>>> formula.fit(training).transform(training).show()
+--------------+---------+---------+---------+--------------------+------+
| aircraft_type| Distance| DepDelay| ArrDelay|            features| label|
+--------------+---------+---------+---------+--------------------+------+
|       Balloon|       23|       18|       20| [0.0,0.0,23.0,18.0]|  20.0|
|  Multi-Engine|      815|        2|       -2| [0.0,1.0,815.0,2.0]|  -2.0|
| Single-Engine|      174|        0|        1| [1.0,0.0,174.0,0.0]|   1.0|
+--------------+---------+---------+---------+--------------------+------+
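The split between fit() and transform() described above can be illustrated with a toy encoder in plain Python. This is only a sketch of the idea, not MLlib's implementation: `CategoryEncoder` is a hypothetical class, it assigns indices in sorted order (an arbitrary choice made here for determinism), and it emits one column per category rather than matching the exact vector layout shown in the output above.

```python
class CategoryEncoder:
    """Toy one-hot encoder whose category-to-index mapping is fixed by fit()."""

    def __init__(self):
        self.index = None

    def fit(self, values):
        # Learn the mapping once, from the data passed to fit().
        self.index = {v: i for i, v in enumerate(sorted(set(values)))}
        return self

    def transform(self, values):
        if self.index is None:
            raise RuntimeError("call fit() before transform()")
        rows = []
        for value in values:
            row = [0.0] * len(self.index)
            # A category not seen during fit() raises KeyError in this sketch.
            row[self.index[value]] = 1.0
            rows.append(row)
        return rows


# Illustrative category values only, not rows from the airlines dataset.
train = ["Balloon", "Multi-Engine", "Single-Engine", "Balloon"]
test = ["Single-Engine", "Balloon"]

encoder = CategoryEncoder().fit(train)
train_vectors = encoder.transform(train)
test_vectors = encoder.transform(test)  # same indices as the training data
```

Because the mapping is learned once, "Balloon" lands in the same vector position in both `train_vectors` and `test_vectors`, which is what lets a model trained on one dataset score another.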

Any ML pipeline can include the RFormula transformer as a pipeline stage, which is in fact how SparkR implements glm(). After we have created an appropriate RFormula transformer and an estimator for the desired model family, fitting a GLM model takes only one step:

>>> from pyspark.ml import Pipeline
>>> from pyspark.ml.regression import LinearRegression

>>> estimator = LinearRegression()
>>> model = Pipeline(stages=[formula, estimator]).fit(training)

When the pipeline executes, the features referenced by the formula will be encoded into an output feature vector for use by the linear regression stage.

We hope that RFormula will simplify the creation of ML pipelines by providing a concise way of expressing complex feature transformations. Starting in Spark 1.5 the RFormula transformer is available for use in Python, Java, and Scala.

What’s next?

In Spark 1.6 we are adding support for more advanced features of R model formulas, including feature interactions, more model families, link functions, and better summary support.

As part of this blog post, we would like to thank Dan Putler and Chris Freeman from Alteryx for useful discussions during the implementation of this functionality in SparkR, and Hossein Falaki for input on content.
