CF 04: Build Item-Based Recommendations (Python)

The purpose of this notebook is to build and evaluate item-based collaborative filtering recommendations. This notebook is designed to run on a Databricks 7.1+ cluster.

from pyspark.ml.linalg import Vectors, VectorUDT
 
from pyspark.sql.functions import col, udf, max, collect_list, lit, monotonically_increasing_id, expr, coalesce, pow, sum, count
from pyspark.sql.types import *
from pyspark.sql import DataFrame
 
import pandas as pd
import numpy as np
 
import math
 
import shutil

Step 1: Build Product Comparisons Dataset

When we constructed our user-based collaborative filter, we built a vector for each user representing the implied ratings across the nearly 50,000 products in the product catalog. These vectors served as the basis for calculating similarities between users. With about 200,000 users in the system, this resulted in about 20-billion potential user comparisons, which we short-cutted using Locality-Sensitive Hashing.

But consider that the only way a recommendation could flow between two users is if one customer bought both products A and B and the other customer bought either product A or B. This provides us another way to approach the problem of using user-derived ratings, one that limits the number of comparisons by focusing on the points of overlap between users.
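
A minimal illustration of this idea (hypothetical baskets, separate from the notebook's actual pipeline): only product pairs that co-occur in at least one user's purchase history ever need to be scored.

from itertools import combinations
 
# hypothetical purchase histories
baskets = {
  'user_1': {'A', 'B', 'C'},
  'user_2': {'A', 'B'},
  'user_3': {'D'}
  }
 
# enumerate only those product pairs purchased together by some user
pairs = set()
for products in baskets.values():
  pairs |= set(combinations(sorted(products), 2))
 
print(pairs)  # {('A', 'B'), ('A', 'C'), ('B', 'C')} -- no pair involving D ever arises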

# increase shuffle partitions to handle the large volume of pair-wise comparisons
_ = spark.conf.set('spark.sql.shuffle.partitions', sc.defaultParallelism * 100)

Let's get started by examining the number of product pairs in our dataset:

%sql  CACHE TABLE instacart.user_ratings
%sql
 
SELECT 
  COUNT(*) as comparisons
FROM (
  SELECT
     a.product_id as product_a,
     b.product_id as product_b,
     COUNT(*) as users
  FROM instacart.user_ratings a
  INNER JOIN instacart.user_ratings b
    ON  a.user_id = b.user_id AND 
        a.split = b.split
  WHERE a.product_id < b.product_id AND
        a.split = 'calibration'
  GROUP BY a.product_id, b.product_id
  HAVING COUNT(*) > 1 -- exclude purchase combinations found in association with only one user
  )    
 
comparisons
56862332

While our product catalog consists of nearly 50,000 products which in theory could supply us 1.25-billion unique product pairs, the actual number of co-occurrences observed (where there is more than one customer involved) is closer to 56-million, under 5% of the theoretical number. By focusing on product pairs that actually occur, we limit our calculations to those that have the potential to be relevant and greatly reduce the complexity of our problem. This is the core insight behind item-based collaborative filtering.
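
The arithmetic behind these figures is easy to verify (a quick back-of-the-envelope check, not part of the pipeline):

# nearly 50,000 products yield n-choose-2 unique product pairs
n_products = 50000
theoretical_pairs = n_products * (n_products - 1) // 2
observed_pairs = 56862332  # from the query above
 
print(theoretical_pairs)  # 1249975000, i.e. ~1.25-billion
print(round(observed_pairs / theoretical_pairs, 3))  # 0.045, under 5% of theoretical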

But how exactly should we compare products in this scenario? In our previous collaborative filter, we built a feature vector containing an entry for each of our nearly 50,000 products. If we flip the structure around, should we build a feature vector with an entry for each of our 200,000+ users?

The short answer is, No. The longer answer is that a particular product comparison is performed because a user has purchased both products in a pair. As a result, each user associated with a product pair contributes an implied rating to each side of the evaluation. But most product pairs have a limited number of customers associated with them:

%sql 
 
SELECT
  users,
  COUNT(*) as occurances
FROM (
  SELECT
     a.product_id as product_a,
     b.product_id as product_b,
     COUNT(*) as users
  FROM instacart.user_ratings a
  INNER JOIN instacart.user_ratings b
    ON  a.user_id = b.user_id AND 
        a.split = b.split
  WHERE a.product_id < b.product_id AND
        a.split = 'calibration'
  GROUP BY a.product_id, b.product_id
  HAVING COUNT(*) > 1       -- exclude purchase combinations found in association with only one user
  )
GROUP BY users
[Histogram: density of product pairs by number of associated users; the distribution is heavily skewed toward pairs shared by only a handful of users, with counts extending to just under 30,000]

It is surprising that the highest number of users who have purchased a given product combination is less than 30,000, and that count occurs only once. If we are concerned about extreme cases such as this, we could limit the user ratings considered to a random sampling of the available ratings once the number of users associated with a pair exceeds a certain threshold. (This idea comes directly from the Amazon paper referenced above; a rough sketch of it follows below.) Still, most combinations occur between a very small number of users, so that for each pair we simply need to construct a feature vector of modest size in order to measure product similarities.
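
We don't implement that sampling here, but it might look something like the following (max_users_per_pair is a hypothetical cap, and this query is illustrative rather than part of the notebook's pipeline):

max_users_per_pair = 1000  # hypothetical cap on ratings contributing to a pair
 
sampled_pairs = spark.sql('''
  SELECT product_a, product_b, rating_a, rating_b
  FROM (
    SELECT
      a.product_id as product_a,
      b.product_id as product_b,
      a.normalized_purchases as rating_a,
      b.normalized_purchases as rating_b,
      ROW_NUMBER() OVER (PARTITION BY a.product_id, b.product_id ORDER BY rand()) as rn
    FROM instacart.user_ratings a
    INNER JOIN instacart.user_ratings b
      ON a.user_id = b.user_id AND a.split = b.split
    WHERE a.product_id < b.product_id AND a.split = 'calibration'
    )
  WHERE rn <= {0} -- keep a random subset of users once the cap is exceeded
  '''.format(max_users_per_pair))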

This leads to an interesting question: should we consider product pairs associated with just a few users? If we include combinations purchased by only 2, 3 or some other trivially small number of users, do we start introducing products into the recommendations that might not be commonly considered? Depending on our goals, the inclusion of unusual product combinations may be a good thing or a bad thing. Dealing with groceries, where novelty and surprise are not typically the goal, it seems sensible to exclude product pairs with too few co-occurrences. Later, we'll work to determine exactly what that cutoff should be, but for now, let's construct our product vectors so that we can proceed with the exercise:

def compare_products( data ):
  '''
  the incoming dataset is expected to have the following structure:
     product_a, product_b, size, values_a, values_b
  '''
  
  def normalize_vector(v):
    # scale the vector to unit (L2) length so that similarity reflects
    # the pattern of ratings rather than their magnitude
    norm = Vectors.norm(v, 2)
    ret = v.copy()
    n = v.size
    for i in range(0,n):
      ret[i] /= norm
    return ret
  
  
  # list to hold results
  results = []
  
  # for each entry in this subset of data ...
  for row in data.itertuples(index=False):
    
    # retrieve data from incoming dataset
    # -----------------------------------------------------------
    product_a = row.product_a
    values_a = row.values_a
    
    product_b = row.product_b
    values_b = row.values_b
    
    size = row.size # this value is not used but is simply passed-through
    # -----------------------------------------------------------
    
    # construct data structures for user comparisons
    # -----------------------------------------------------------
    a = Vectors.dense(values_a)
    a_norm = normalize_vector(a)
    
    b = Vectors.dense(values_b)
    b_norm = normalize_vector(b)
    # -----------------------------------------------------------
    
    # calc distance and similarity
    # -----------------------------------------------------------
    # the distance between two L2-normalized, non-negative vectors ranges
    # from 0 to sqrt(2), so similarity = 1/(1+distance) has a known
    # minimum, which we use to rescale similarity onto a 0-to-1 range
    distance = math.sqrt(Vectors.squared_distance(a_norm, b_norm))
    similarity = 1 / (1 + distance)
    similarity_min = 1 / (1 + math.sqrt(2))
    similarity_rescaled = (similarity - similarity_min)/(1 - similarity_min)
    # -----------------------------------------------------------
  
    # assemble results record
    results += [(
      product_a, 
      product_b, 
      size,
      distance,
      similarity_rescaled
      )]
  
  # return results
  return pd.DataFrame(results)
 
# assemble user ratings for each product in comparison
product_comp = (
  spark
    .sql('''
      SELECT
         a.product_id as product_a,
         b.product_id as product_b,
         COUNT(*) as size,
         COLLECT_LIST(a.normalized_purchases) as values_a,
         COLLECT_LIST(b.normalized_purchases) as values_b
      FROM instacart.user_ratings a
      INNER JOIN instacart.user_ratings b
        ON  a.user_id = b.user_id AND 
            a.split = b.split
      WHERE a.product_id < b.product_id AND
            a.split = 'calibration'
      GROUP BY a.product_id, b.product_id
      HAVING COUNT(*) > 1
    ''')
  )
 
# calculate product similarities
product_sim = (
  product_comp
    .withColumn('id', monotonically_increasing_id())
    .withColumn('subset_id', expr('id % ({0})'.format(sc.defaultParallelism * 10)))
    .groupBy('subset_id')
    .applyInPandas(
      compare_products, 
      schema='''
        product_a int,
        product_b int,
        size int,
        distance double,
        similarity double
        '''
      )
  )
 
# drop any old delta lake files that might have been created
shutil.rmtree('/dbfs/mnt/instacart/gold/product_sim', ignore_errors=True)
 
# persist data for future use
(
  product_sim
    .write
    .format('delta')
    .mode('overwrite')
    .save('/mnt/instacart/gold/product_sim')
  )
 
display(
  spark.table('DELTA.`/mnt/instacart/gold/product_sim`')
  )
 
product_a  product_b  size  distance             similarity
816        21711      2     0                    1
2011       38164      2     0.31272035331475595  0.5933276768272439
2982       15820      2     0.44516727234080183  0.47414525368791427
4116       35594      2     0.2816356513610091   0.62486832373014
5089       44234      23    0.5513014994976178   0.3933284222470769
6139       36280      2     0                    1
7361       15241      3     0                    1
8467       15578      3     0.24275848603147165  0.666536449074458
9689       35094      4     0.2818727208061158   0.6246219883538635
10933      22328      4     0.5965469937897496   0.3621425348854838
12218      33791      35    0.8357628286660284   0.22281006565745903
13509      31242      2     0                    1
14872      38635      2     0.30502249018476485  0.6009984767886277
16519      25476      2     1.0857293826602439   0.11136314850740826
18019      44356      2     0.32976277136087784  0.5766611343789456
19591      35395      2     1.056747422635666    0.12289634044077254
21392      36765      2     0                    1
23243      31371      3     0.4296946000635326   0.4869291906291726

Showing the first 1000 rows.

At this point, our work is only about half complete. We've constructed pairs for product A and product B but excluded the product B to product A pairs as well as the product A to product A (self-similarity) pairs. (The a.product_id < b.product_id portion of the queries above is where this occurs.) Let's insert these into our dataset now:

# flip product A & product B
(
  spark
    .table('DELTA.`/mnt/instacart/gold/product_sim`')
    .selectExpr(
      'product_b as product_a',
      'product_a as product_b',
      'size',
      'distance',
      'similarity'
      )
    .write
    .format('delta')
    .mode('append')
    .save('/mnt/instacart/gold/product_sim')
  )
# record entries for product A to product A (sim = 1.0)
(
  spark
    .table('instacart.user_ratings')
    .filter("split='calibration'")
    .groupBy('product_id')
      .agg(count('*').alias('size'))
    .selectExpr(
      'product_id as product_a',
      'product_id as product_b',
      'cast(size as int) as size',
      'cast(0.0 as double) as distance',
      'cast(1.0 as double) as similarity'
      )
    .write
      .format('delta')
      .mode('append')
      .save('/mnt/instacart/gold/product_sim')
  )
%sql  UNCACHE TABLE instacart.user_ratings

By rethinking the problem, we've enabled a more direct approach to our comparison challenge. But how exactly do we make recommendations using this data structure?

Step 2: Build Recommendations

With our user-based collaborative filter, we generated recommendations by calculating a weighted average of the ratings extracted from similar users. Here, we'll retrieve the products purchased by a user in our calibration period. Those products will be used to retrieve all product pairs where product A is one of the products in the purchased set. Implied ratings and similarity scores will again be combined into weighted averages which will serve as recommendation scores, and a percent rank will then be calculated to sequence the recommendations. We'll demonstrate this below for a single user, user_id 148.
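
First, a toy restatement of the scoring calculation in plain Python (made-up ratings and similarities, not part of the pipeline):

# weighted average of the user's implied ratings, weighted by the
# similarity of each rated product to the candidate product
def recommendation_score(ratings, sims):
  numerator = sum(ratings[p] * sims[p] for p in ratings)
  denominator = sum(sims[p] for p in ratings)
  return numerator / denominator
 
print(recommendation_score({'milk': 0.9, 'eggs': 0.4}, {'milk': 0.8, 'eggs': 0.2}))  # 0.8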

%sql  CACHE TABLE instacart.user_ratings
%sql
 
SELECT
  user_id,
  product_id,
  recommendation_score,
  PERCENT_RANK() OVER (PARTITION BY user_id ORDER BY recommendation_score DESC) as rank_ui
FROM (
  SELECT
    x.user_id,
    y.product_b as product_id,
    SUM(x.normalized_purchases * y.similarity) / SUM(y.similarity) as recommendation_score
  FROM instacart.user_ratings x
  INNER JOIN DELTA.`/mnt/instacart/gold/product_sim` y
    ON x.product_id=y.product_a
  WHERE 
    x.split = 'calibration' AND x.user_id=148
  GROUP BY x.user_id, y.product_b
  )
ORDER BY user_id, rank_ui
 
user_id  product_id  recommendation_score  rank_ui
148      18776       0.3255153835084638    0
148      24183       0.28166239837831425   0.000023235820340637126
148      29222       0.27740597388385874   0.00004647164068127425
148      35265       0.24413653763134782   0.00006970746102191138
148      24728       0.24413653763134782   0.00006970746102191138
148      36460       0.24413653763134782   0.00006970746102191138
148      11444       0.2441365376313478    0.00013941492204382275
148      41219       0.215896753633471     0.0001626507423844599
148      47228       0.21224854765714596   0.000185886562725097
148      15586       0.21160107985012191   0.00020912238306573414
148      10601       0.21138583075016673   0.00023235820340637126
148      6331        0.2108894051177961    0.00025559402374700837
148      44685       0.2096933229903948    0.0002788298440876455
148      1451        0.20736449724217754   0.00030206566442828265
148      40160       0.20558726881362638   0.0003253014847689198
148      28235       0.2048520301548491    0.00034853730510955687
148      47702       0.2031260012406499    0.000371773125450194
148      14759       0.20021168683428245   0.00039500894579083115

Showing the first 1000 rows.

As before, we have recommendations, but are they any good? Let's evaluate them against the evaluation data using the same mean percent rank metric employed in the last notebook. For expediency, we'll again limit our evaluation to a 10% sample of all our users:
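
As a quick refresher on the metric (toy numbers, not part of the pipeline): each evaluation-period rating weights the percent rank its product received in the recommendations, so lower is better and ~0.5 corresponds to random ordering.

# mean percent rank over (r_t_ui, rank_ui) pairs
def mean_percent_rank(pairs):
  return sum(r * rank for r, rank in pairs) / sum(r for r, _ in pairs)
 
print(mean_percent_rank([(1.0, 0.0), (0.5, 0.5)]))  # ~0.167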

%sql
 
DROP VIEW IF EXISTS random_users;
 
CREATE TEMP VIEW random_users 
AS
  SELECT user_id
  FROM (
    SELECT DISTINCT 
      user_id
    FROM instacart.user_ratings
    ) 
  WHERE rand() <= 0.10;
  
CACHE TABLE random_users;
 
SELECT * FROM random_users;
 
user_id
156017
103357
202521
108738
40893
108594
10703
29177
97804
94025
102775
170130
145922
82233
140403
150892
178014
202794

Showing the first 1000 rows.

eval_set = (
  spark
    .sql('''
    SELECT 
      m.user_id,
      m.product_id,
      m.r_t_ui,
      n.rank_ui
    FROM (
      SELECT
        user_id,
        product_id,
        normalized_purchases as r_t_ui
      FROM instacart.user_ratings 
      WHERE split = 'evaluation' -- the test period
        ) m
    INNER JOIN (
      SELECT
        user_id,
        product_id,
        recommendation_score,
        PERCENT_RANK() OVER (PARTITION BY user_id ORDER BY recommendation_score DESC) as rank_ui
      FROM (
        SELECT
          x.user_id,
          y.product_b as product_id,
          SUM(x.normalized_purchases * y.similarity) / SUM(y.similarity) as recommendation_score
        FROM instacart.user_ratings x
        INNER JOIN DELTA.`/mnt/instacart/gold/product_sim` y
          ON x.product_id=y.product_a
        INNER JOIN random_users z
          ON x.user_id=z.user_id
        WHERE 
          x.split = 'calibration'
        GROUP BY x.user_id, y.product_b
        )
      ) n
      ON m.user_id=n.user_id AND m.product_id=n.product_id
      ''')
  )
 
display(
  eval_set
    .withColumn('weighted_r', col('r_t_ui') * col('rank_ui') )
    .groupBy()
      .agg(
        sum('weighted_r').alias('numerator'),
        sum('r_t_ui').alias('denominator')
        )
    .withColumn('mean_percent_rank', col('numerator')/col('denominator'))
    .select('mean_percent_rank')
  )
 
mean_percent_rank
0.5273955264323781

Wow! Our evaluation score isn't just bad, it's worse than the roughly 0.5 we'd expect from purely random suggestions. How could this be?!

The most likely reason is that we are leveraging all product combinations with no consideration of how many users actually purchased the two products that form a pair (as discussed earlier in this notebook). If a combination happens to be highly rated by a very small number of users, it might shoot to the top of our rankings while more popular products (and therefore ones exposed to a wider range of ratings) are pushed below it. With this in mind, we might limit the products we recommend to those whose product pairs have a minimum number of associated user ratings.

In addition, we might recognize that even with a user-minimum, we may still have a large number of products to recommend. If we limit our recommendations to some maximum number of top-ranked products, we might further improve our results. It's important to remember that our dataset includes product self-recommendations, i.e. product A is most similar to product A, so when we set this limit we need to keep in mind that one recommendation is already baked in.

Let's see how adjustments to user-minimums and product maximums affect our evaluation metric:

NOTE: This step will take a while to run, depending on the resources allocated to your cluster.

_ = spark.sql("CACHE TABLE instacart.user_ratings")
_ = spark.sql("CACHE TABLE DELTA.`/mnt/instacart/gold/product_sim`")
 
results = []
 
for i in range(1,21,1):       # minimum number of users associated with a product pair
  print('Starting size = {0}'.format(i))
  
  for j in [2,3,5,7,10]:      # maximum number of top-ranked similar products considered
  
    rank = (
        spark
          .sql('''
            SELECT
              SUM(r_t_ui * rank_ui) / SUM(r_t_ui) as mean_percent_rank
            FROM (
              SELECT 
                m.user_id,
                m.product_id,
                m.r_t_ui,
                n.rank_ui
              FROM (
                SELECT
                  user_id,
                  product_id,
                  normalized_purchases as r_t_ui
                FROM instacart.user_ratings 
                WHERE split = 'evaluation' -- the test period
                  ) m
              INNER JOIN (
                SELECT
                  user_id,
                  product_id,
                  recommendation_score,
                  PERCENT_RANK() OVER (PARTITION BY user_id ORDER BY recommendation_score DESC) as rank_ui
                FROM (
                  SELECT
                    user_id,
                    product_id,
                    SUM(normalized_purchases * similarity) / SUM(similarity) as recommendation_score
                  FROM (
                    SELECT
                      x.user_id,
                      y.product_b as product_id,
                      x.normalized_purchases,
                      y.similarity,
                      RANK() OVER (PARTITION BY x.user_id, x.product_id ORDER BY y.similarity DESC) as product_rank
                    FROM instacart.user_ratings x
                    INNER JOIN DELTA.`/mnt/instacart/gold/product_sim` y
                      ON x.product_id=y.product_a
                    LEFT SEMI JOIN random_users z
                      ON x.user_id=z.user_id
                    WHERE 
                      x.split = 'calibration' AND
                      y.size >= {0}
                    )
                  WHERE product_rank <= {1}
                  GROUP BY user_id, product_id
                  )
                ) n
                ON m.user_id=n.user_id AND m.product_id=n.product_id
              )
            '''.format(i,j)
              ).collect()[0]['mean_percent_rank']
        )
 
    results += [(i, j, rank)]
 
  
display(
  spark
    .createDataFrame(results, schema="min_users int, max_products int, mean_percent_rank double")
    .orderBy('min_users','max_products')
  )
[Line chart: mean_percent_rank (roughly 0.14 to 0.26) by min_users (1 to 20), with one line per max_products value (2, 3, 5, 7, 10)]

It appears that constraining our recommendations a bit leads to better results. We might consider moving these constraints into the construction of the product comparisons dataset to limit our ETL cycles and improve query performance. We are not performing at the levels we saw with our user-based collaborative filters, but the results are not bad (and may even be improved with further adjustments to our logic).
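
If we did push the user-minimum into the ETL step as suggested, one simple option would be to trim the Delta table directly. (A sketch only; the cutoff of 6 is illustrative and should be chosen from the chart above. Note this would also remove self-pairs for products rated by fewer than that many users.)

# prune product pairs below the chosen user-minimum (illustrative cutoff)
_ = spark.sql("DELETE FROM DELTA.`/mnt/instacart/gold/product_sim` WHERE size < 6")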

%sql  
UNCACHE TABLE instacart.user_ratings;
UNCACHE TABLE random_users;
UNCACHE TABLE DELTA.`/mnt/instacart/gold/product_sim`;