
This is a guest blog from Eugene Zhulenev on his experiences with Engineering Machine Learning and Audience Modeling at Collective.


At Collective, we rely heavily on machine learning and predictive modeling to run our digital advertising business. All decisions about which ad to show at a particular time to a particular user are made by machine learning models (some of them run in real time, while others run offline).

We have a lot of projects that use machine learning; a common name for all of them is Audience Modeling, as they all try to predict audience conversion (CTR, viewability rate, etc.) based on browsing history, behavioral segments, and other types of predictors.

For most of our new development, we use Apache Spark and MLlib. However, while it is an awesome project, we found that some widely used tools and libraries are missing from Spark. To add those missing features that we would really like to have in Spark, we created Spark Ext (Spark Extensions Library).

I’m going to show a simple example of combining Spark Ext with Spark ML pipelines for predicting user conversions based on geo and browsing history data.

Spark ML pipeline example: SparkMlExtExample.scala

Predictors Data

I’m using a dataset with two classes that will be used to solve a classification problem (whether a user converted or not). It’s created with a dummy data generator so that the two classes can be easily separated, and it’s pretty similar to the real data that is usually available in digital advertising.

Browsing History Log

History of websites that were visited by a user.

Cookie          | Site          | Impressions  
--------------- |-------------- | -------------
wKgQaV0lHZanDrp | live.com      | 24
wKgQaV0lHZanDrp | pinterest.com | 21
rfTZLbQDwbu5mXV | wikipedia.org | 14
rfTZLbQDwbu5mXV | live.com      | 1
rfTZLbQDwbu5mXV | amazon.com    | 1
r1CSY234HTYdvE3 | youtube.com   | 10
 

Geo Location Log

Latitude/Longitude impression history.

Cookie          | Lat     | Lng       | Impressions
--------------- |---------| --------- | ------------
wKgQaV0lHZanDrp | 34.8454 | 77.009742 | 13
wKgQaV0lHZanDrp | 31.8657 | 114.66142 | 1
rfTZLbQDwbu5mXV | 41.1428 | 74.039600 | 20
rfTZLbQDwbu5mXV | 36.6151 | 119.22396 | 4
r1CSY234HTYdvE3 | 42.6732 | 73.454185 | 4
r1CSY234HTYdvE3 | 35.6317 | 120.55839 | 5
20ep6ddsVckCmFy | 42.3448 | 70.730607 | 21
20ep6ddsVckCmFy | 29.8979 | 117.51683 | 1
 

Transforming Predictors Data

As you can see, the predictors data (sites and geo) is in long format: each cookie has multiple rows associated with it, which in general is not a good fit for machine learning. We’d like cookie to be a primary key, while all the other data forms the feature vector.
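For reference, here is a minimal sketch of how the sample inputs above could be constructed as DataFrames. The names siteLog and geoDf match the variables used in the code below; the SparkSession setup and the lat/lng column names are my assumptions (the original example predates SparkSession and used a Spark 1.x SQLContext):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("SparkMlExtExample").getOrCreate()
import spark.implicits._

// Browsing history log in long format: one row per (cookie, site) pair
val siteLog = Seq(
  ("wKgQaV0lHZanDrp", "live.com", 24),
  ("wKgQaV0lHZanDrp", "pinterest.com", 21),
  ("rfTZLbQDwbu5mXV", "wikipedia.org", 14)
).toDF("cookie", "site", "impressions")

// Geo location log in long format: one row per (cookie, lat, lng) triple
val geoDf = Seq(
  ("wKgQaV0lHZanDrp", 34.8454, 77.009742, 13),
  ("rfTZLbQDwbu5mXV", 41.1428, 74.039600, 20)
).toDF("cookie", "lat", "lng", "impressions")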

Gather Transformer

Inspired by the R tidyr and reshape2 packages, the Gather transformer converts a long DataFrame with values for each key into a wide DataFrame, applying an aggregation function if a single key has multiple values.

val gather = new Gather()
  .setPrimaryKeyCols("cookie")
  .setKeyCol("site")
  .setValueCol("impressions")
  .setValueAgg("sum")         // sum impressions by key
  .setOutputCol("sites")

val gatheredSites = gather.transform(siteLog)

Cookie           | Sites
-----------------|----------------------------------------------
wKgQaV0lHZanDrp  | [
                 |  { site: live.com, impressions: 24.0 }, 
                 |  { site: pinterest.com, impressions: 21.0 }
                 | ]
rfTZLbQDwbu5mXV  | [
                 |  { site: wikipedia.org, impressions: 14.0 }, 
                 |  { site: live.com, impressions: 1.0 },
                 |  { site: amazon.com, impressions: 1.0 }
                 | ]
 

Google S2 Geometry Cell Id Transformer

The S2 Geometry Library is a spherical geometry library, very useful for manipulating regions on the sphere (commonly on Earth) and indexing geographic data. Basically, it assigns a unique cell id for each region on the earth.

For example, you can combine the S2 transformer with Gather to convert lat/lon into K-V pairs, where the key is the S2 cell id. Depending on the level, you can assign all people in the Greater New York area to one cell (level = 4), or you can index them block by block (level = 12).
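To make the cell id idea concrete, here is a minimal sketch using the underlying S2 Java library directly, assuming the com.google.common.geometry classes are on the classpath; the transformer below wraps logic like this:

import com.google.common.geometry.{S2CellId, S2LatLng}

// Map a lat/lon to its S2 cell id at level 5
val latLng = S2LatLng.fromDegrees(34.8454, 77.009742)
val cell = S2CellId.fromLatLng(latLng).parent(5)
println(cell.toToken) // compact string id of the level-5 cell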

// Transform lat/lon into S2 Cell Id
val s2Transformer = new S2CellTransformer()
  .setLevel(5)
  .setCellCol("s2_cell")

// Gather S2 Cell Id log
val gatherS2Cells = new Gather()
  .setPrimaryKeyCols("cookie")
  .setKeyCol("s2_cell")
  .setValueCol("impressions")
  .setOutputCol("s2_cells")

val gatheredCells = gatherS2Cells.transform(s2Transformer.transform(geoDf))

Cookie           | S2 Cells
-----------------|----------------------------------------------
wKgQaV0lHZanDrp  | [
                 |  { s2_cell: d5dgds, impressions: 5.0 }, 
                 |  { s2_cell: b8dsgd, impressions: 1.0 }
                 | ]
rfTZLbQDwbu5mXV  | [
                 |  { s2_cell: d5dgds, impressions: 12.0 }, 
                 |  { s2_cell: b8dsgd, impressions: 3.0 },
                 |  { s2_cell: g7aeg3, impressions: 5.0 }
                 | ]
 

Assembling Feature Vector

K-V pairs from the result of Gather are cool, and they group all the information about a cookie into a single row. However, they cannot be used as input for machine learning. To be able to train a model, the predictors data needs to be represented as a vector of doubles. This is easy if all the features are continuous and numeric, but if some of them are categorical or in gathered shape, it is not a trivial task.

Gather Encoder

Encodes categorical key-value pairs using dummy variables.

// Encode S2 Cell data
val encodeS2Cells = new GatherEncoder()
  .setInputCol("s2_cells")
  .setOutputCol("s2_cells_f")
  .setKeyCol("s2_cell")
  .setValueCol("impressions")
  .setCover(0.95) // dimensionality reduction

Cookie           | S2 Cells
-----------------|----------------------------------------------
wKgQaV0lHZanDrp  | [
                 |  { s2_cell: d5dgds, impressions: 5.0 }, 
                 |  { s2_cell: b8dsgd, impressions: 1.0 }
                 | ]
rfTZLbQDwbu5mXV  | [
                 |  { s2_cell: d5dgds, impressions: 12.0 }, 
                 |  { s2_cell: g7aeg3, impressions: 5.0 }
                 | ]
 

Transformed into

Cookie           | S2 Cells Features
-----------------|------------------------
wKgQaV0lHZanDrp  | [ 5.0  ,  1.0 , 0   ]
rfTZLbQDwbu5mXV  | [ 12.0 ,  0   , 5.0 ]
 

Note that there are 3 unique cell id values, which gives 3 columns in the final feature vector.
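Conceptually, the encoder fixes an index per categorical key and emits one column per key. A hand-rolled sketch of the resulting vectors, using Spark’s Vectors factory (the index assignment here is my assumption, and org.apache.spark.ml.linalg is the Spark 2.x+ package; in 1.x it was mllib.linalg):

import org.apache.spark.ml.linalg.Vectors

// Assumed index assignment: d5dgds -> 0, b8dsgd -> 1, g7aeg3 -> 2
val row1 = Vectors.sparse(3, Seq((0, 5.0), (1, 1.0)))   // wKgQaV0lHZanDrp
val row2 = Vectors.sparse(3, Seq((0, 12.0), (2, 5.0)))  // rfTZLbQDwbu5mXV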

Optionally, apply dimensionality reduction using a top transformation (a sketch follows the list):

  • Top coverage: select categorical values by computing the count of distinct users for each value, sorting the values in descending order by user count, and choosing the top values from the resulting list such that the sum of their distinct user counts covers c percent of all users (e.g., selecting the top sites that cover 99% of users).
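Here is a hypothetical sketch of that top-coverage selection on plain DataFrames, not Spark Ext’s actual implementation; siteLog and the 0.95 cover threshold are carried over from the examples above:

import org.apache.spark.sql.functions._

val cover = 0.95

// Distinct user count per site, sorted descending
val counts = siteLog
  .groupBy("site")
  .agg(countDistinct("cookie").as("users"))
  .orderBy(desc("users"))
  .collect()

// Keep top sites until their cumulative user count covers `cover` of the total
val total = counts.map(_.getLong(1)).sum.toDouble
var cumulative = 0L
val topSites = counts.takeWhile { row =>
  val keep = cumulative < cover * total
  cumulative += row.getLong(1)
  keep
}.map(_.getString(0))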

Spark ML Pipelines

Spark ML Pipeline is a new high-level API for Spark MLlib.

A practical ML pipeline often involves a sequence of data pre-processing, feature extraction, model fitting, and validation stages. For example, classifying text documents might involve text segmentation and cleaning, extracting features, and training a classification model with cross-validation.

In Spark ML, it’s possible to split an ML pipeline into multiple independent stages, group them together in a single pipeline, and run it with cross-validation and a parameter grid to find the best set of parameters.

Put It All Together with Spark ML Pipelines

Gather Encoder is a natural fit for the Spark ML Pipeline API.

// Encode site data
val encodeSites = new GatherEncoder()
  .setInputCol("sites")
  .setOutputCol("sites_f")
  .setKeyCol("site")
  .setValueCol("impressions")

// Encode S2 Cell data
val encodeS2Cells = new GatherEncoder()
  .setInputCol("s2_cells")
  .setOutputCol("s2_cells_f")
  .setKeyCol("s2_cell")
  .setValueCol("impressions")
  .setCover(0.95)

// Assemble feature vectors together
val assemble = new VectorAssembler()
  .setInputCols(Array("sites_f", "s2_cells_f"))
  .setOutputCol("features")

// Build logistic regression
val lr = new LogisticRegression()
  .setFeaturesCol("features")
  .setLabelCol("response")
  .setProbabilityCol("probability")

// Define pipeline with 4 stages
val pipeline = new Pipeline()
  .setStages(Array(encodeSites, encodeS2Cells, assemble, lr))

val evaluator = new BinaryClassificationEvaluator()
  .setLabelCol("response")

val crossValidator = new CrossValidator()
  .setEstimator(pipeline)
  .setEvaluator(evaluator)

val paramGrid = new ParamGridBuilder()
  .addGrid(lr.elasticNetParam, Array(0.1, 0.5))
  .build()

crossValidator.setEstimatorParamMaps(paramGrid)
crossValidator.setNumFolds(2)

println("Train model on train set")
val cvModel = crossValidator.fit(trainSet)
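Once cross-validation has picked the best parameters, the fitted model can score new data. A short usage sketch, assuming a held-out testSet DataFrame with the same schema as trainSet:

// Score the held-out test set with the best model found by cross-validation
val predictions = cvModel.transform(testSet)
println(s"Test AUC: ${evaluator.evaluate(predictions)}")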

Conclusion

The Spark ML API makes machine learning much easier. Spark Ext is a good example of how it is possible to create custom transformers/estimators that can later be used as part of a bigger pipeline and easily shared/reused by multiple projects.

The full code for the example application is available on GitHub.