from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.sql.functions import col, udf, max, collect_list, lit, monotonically_increasing_id, expr, coalesce, pow, sum, count
from pyspark.sql.types import *
from pyspark.sql import DataFrame
import pandas as pd
import numpy as np
import math
import shutil
%sql
SELECT
COUNT(*) as comparisons
FROM (
SELECT
a.product_id as product_a,
b.product_id as product_b,
COUNT(*) as users
FROM instacart.user_ratings a
INNER JOIN instacart.user_ratings b
ON a.user_id = b.user_id AND
a.split = b.split
WHERE a.product_id < b.product_id AND
a.split = 'calibration'
GROUP BY a.product_id, b.product_id
HAVING COUNT(*) > 1 -- exclude product pairs purchased by only one user
)
%sql
SELECT
users,
COUNT(*) as occurrences
FROM (
SELECT
a.product_id as product_a,
b.product_id as product_b,
COUNT(*) as users
FROM instacart.user_ratings a
INNER JOIN instacart.user_ratings b
ON a.user_id = b.user_id AND
a.split = b.split
WHERE a.product_id < b.product_id AND
a.split = 'calibration'
GROUP BY a.product_id, b.product_id
HAVING COUNT(*) > 1 -- exclude product pairs purchased by only one user
)
GROUP BY users
def compare_products( data ):
  '''
  the incoming dataset is expected to have the following structure:
    product_a, product_b, size, values_a, values_b
  '''

  def normalize_vector(v):
    norm = Vectors.norm(v, 2)
    ret = v.copy()
    n = v.size
    for i in range(0,n):
      ret[i] /= norm
    return ret

  # list to hold results
  results = []

  # for each entry in this subset of data ...
  for row in data.itertuples(index=False):

    # retrieve data from incoming dataset
    # -----------------------------------------------------------
    product_a = row.product_a
    values_a = row.values_a
    product_b = row.product_b
    values_b = row.values_b
    size = row.size  # this value is not used but is simply passed through
    # -----------------------------------------------------------

    # construct vectors for user-rating comparisons
    # -----------------------------------------------------------
    a = Vectors.dense(values_a)
    a_norm = normalize_vector(a)
    b = Vectors.dense(values_b)
    b_norm = normalize_vector(b)
    # -----------------------------------------------------------

    # calc distance and similarity
    # -----------------------------------------------------------
    distance = math.sqrt(Vectors.squared_distance(a_norm, b_norm))
    similarity = 1 / (1 + distance)
    similarity_min = 1 / (1 + math.sqrt(2))
    similarity_rescaled = (similarity - similarity_min)/(1 - similarity_min)
    # -----------------------------------------------------------

    # assemble results record
    results += [(
      product_a,
      product_b,
      size,
      distance,
      similarity_rescaled
      )]

  # return results
  return pd.DataFrame(results)
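Because the user-rating values are non-negative and each vector is L2-normalized before comparison, the Euclidean distance between any pair of vectors falls in [0, sqrt(2)]; the raw similarity 1/(1 + distance) is therefore bounded below by 1/(1 + sqrt(2)), and the final step rescales it onto [0, 1]. The following sanity check is not part of the original notebook; rescaled_similarity is a hypothetical helper written against plain NumPy purely to illustrate the arithmetic used inside compare_products.

# hypothetical sanity check of the similarity rescaling used in compare_products
import math
import numpy as np

def rescaled_similarity(values_a, values_b):
  a = np.array(values_a, dtype=float)
  b = np.array(values_b, dtype=float)
  a, b = a / np.linalg.norm(a), b / np.linalg.norm(b)   # L2-normalize
  distance = np.linalg.norm(a - b)                      # in [0, sqrt(2)] for non-negative vectors
  similarity = 1 / (1 + distance)
  similarity_min = 1 / (1 + math.sqrt(2))
  return (similarity - similarity_min) / (1 - similarity_min)

print(rescaled_similarity([1.0, 0.0], [1.0, 0.0]))   # identical vectors  -> 1.0
print(rescaled_similarity([1.0, 0.0], [0.0, 1.0]))   # orthogonal vectors -> ~0.0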
# assemble user ratings for each pair of products to be compared
product_comp = (
spark
.sql('''
SELECT
a.product_id as product_a,
b.product_id as product_b,
COUNT(*) as size,
COLLECT_LIST(a.normalized_purchases) as values_a,
COLLECT_LIST(b.normalized_purchases) as values_b
FROM instacart.user_ratings a
INNER JOIN instacart.user_ratings b
ON a.user_id = b.user_id AND
a.split = b.split
WHERE a.product_id < b.product_id AND
a.split = 'calibration'
GROUP BY a.product_id, b.product_id
HAVING COUNT(*) > 1
''')
)
# calculate product similarities
product_sim = (
product_comp
.withColumn('id', monotonically_increasing_id())
.withColumn('subset_id', expr('id % ({0})'.format(sc.defaultParallelism * 10)))
.groupBy('subset_id')
.applyInPandas(
compare_products,
schema='''
product_a int,
product_b int,
size int,
distance double,
similarity double
'''
)
)
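The modulo applied to monotonically_increasing_id simply spreads the product pairs across roughly sc.defaultParallelism * 10 groups, so that applyInPandas hands compare_products many modest-sized pandas DataFrames rather than a few enormous ones. An optional check, not part of the original flow, that those subsets come out reasonably balanced might look like this:

# optional (hypothetical) check: the modulo-assigned subsets should be roughly equal in size
display(
  product_comp
    .withColumn('id', monotonically_increasing_id())
    .withColumn('subset_id', expr('id % ({0})'.format(sc.defaultParallelism * 10)))
    .groupBy('subset_id')
    .count()
    .orderBy('count', ascending=False)
  )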
# drop any old delta lake files that might have been created
shutil.rmtree('/dbfs/mnt/instacart/gold/product_sim', ignore_errors=True)
# persist data for future use
(
product_sim
.write
.format('delta')
.mode('overwrite')
.save('/mnt/instacart/gold/product_sim')
)
display(
spark.table('DELTA.`/mnt/instacart/gold/product_sim`')
)
# record entries for product A to product A (sim = 1.0)
(
spark
.table('instacart.user_ratings')
.filter("split='calibration'")
.groupBy('product_id')
.agg(count('*').alias('size'))
.selectExpr(
'product_id as product_a',
'product_id as product_b',
'cast(size as int) as size',
'cast(0.0 as double) as distance',
'cast(1.0 as double) as similarity'
)
.write
.format('delta')
.mode('append')
.save('/mnt/instacart/gold/product_sim')
)
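A quick spot check, hypothetical and not part of the original notebook, confirms the appended self-pairs look as expected:

# hypothetical spot check: self-pairs should show distance 0.0 and similarity 1.0
display(
  spark
    .table('DELTA.`/mnt/instacart/gold/product_sim`')
    .filter('product_a = product_b')
    .limit(5)
  )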
%sql
SELECT
user_id,
product_id,
recommendation_score,
PERCENT_RANK() OVER (PARTITION BY user_id ORDER BY recommendation_score DESC) as rank_ui
FROM (
SELECT
x.user_id,
y.product_b as product_id,
SUM(x.normalized_purchases * y.similarity) / SUM(y.similarity) as recommendation_score
FROM instacart.user_ratings x
INNER JOIN DELTA.`/mnt/instacart/gold/product_sim` y
ON x.product_id=y.product_a
WHERE
x.split = 'calibration' AND x.user_id=148
GROUP BY x.user_id, y.product_b
)
ORDER BY user_id, rank_ui
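The evaluation queries that follow restrict scoring to a random_users set, which is assumed to have been registered earlier in the notebook as a temporary view holding a sample of user IDs; scoring every user against the full similarity table would be prohibitively expensive. A minimal sketch of how such a view might be built is shown below; the 10% sample fraction and seed are assumptions, not values taken from this notebook.

# hypothetical construction of the random_users view referenced by the evaluation queries below;
# the sample fraction and seed are assumed for illustration only
_ = (
  spark
    .table('instacart.user_ratings')
    .filter("split = 'calibration'")
    .select('user_id')
    .distinct()
    .sample(fraction=0.10, seed=42)
    .createOrReplaceTempView('random_users')
  )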
eval_set = (
spark
.sql('''
SELECT
m.user_id,
m.product_id,
m.r_t_ui,
n.rank_ui
FROM (
SELECT
user_id,
product_id,
normalized_purchases as r_t_ui
FROM instacart.user_ratings
WHERE split = 'evaluation' -- the test period
) m
INNER JOIN (
SELECT
user_id,
product_id,
recommendation_score,
PERCENT_RANK() OVER (PARTITION BY user_id ORDER BY recommendation_score DESC) as rank_ui
FROM (
SELECT
x.user_id,
y.product_b as product_id,
SUM(x.normalized_purchases * y.similarity) / SUM(y.similarity) as recommendation_score
FROM instacart.user_ratings x
INNER JOIN DELTA.`/mnt/instacart/gold/product_sim` y
ON x.product_id=y.product_a
INNER JOIN random_users z
ON x.user_id=z.user_id
WHERE
x.split = 'calibration'
GROUP BY x.user_id, y.product_b
)
) n
ON m.user_id=n.user_id AND m.product_id=n.product_id
''')
)
display(
eval_set
.withColumn('weighted_r', col('r_t_ui') * col('rank_ui') )
.groupBy()
.agg(
sum('weighted_r').alias('numerator'),
sum('r_t_ui').alias('denominator')
)
.withColumn('mean_percent_rank', col('numerator')/col('denominator'))
.select('mean_percent_rank')
)
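The value displayed above is the purchase-weighted mean percent rank: each held-out rating r_t_ui is weighted by where that product landed in the user's ranked recommendations (rank_ui of 0.0 is the top of the list, 1.0 the bottom), so lower is better and a value near 0.5 is roughly what random ordering would produce. A toy, hypothetical illustration of the arithmetic:

# hypothetical toy example of the mean percent rank calculation above:
# three held-out products with ratings r_t_ui and the percent rank (rank_ui)
# each achieved in the user's ranked recommendations
import numpy as np

r_t_ui  = np.array([0.6, 0.3, 0.1])    # held-out ratings
rank_ui = np.array([0.0, 0.1, 0.9])    # 0.0 = top of the recommendation list, 1.0 = bottom
mean_percent_rank = float((r_t_ui * rank_ui).sum() / r_t_ui.sum())
print(mean_percent_rank)   # ~0.12 -> most of the held-out purchase weight sits near the top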
_ = spark.sql("CACHE TABLE instacart.user_ratings")
_ = spark.sql("CACHE TABLE DELTA.`/mnt/instacart/gold/product_sim`")
results = []
for i in range(1,21,1):
  print('Starting size = {0}'.format(i))
  for j in [2,3,5,7,10]:
    rank = (
spark
.sql('''
SELECT
SUM(r_t_ui * rank_ui) / SUM(r_t_ui) as mean_percent_rank
FROM (
SELECT
m.user_id,
m.product_id,
m.r_t_ui,
n.rank_ui
FROM (
SELECT
user_id,
product_id,
normalized_purchases as r_t_ui
FROM instacart.user_ratings
WHERE split = 'evaluation' -- the test period
) m
INNER JOIN (
SELECT
user_id,
product_id,
recommendation_score,
PERCENT_RANK() OVER (PARTITION BY user_id ORDER BY recommendation_score DESC) as rank_ui
FROM (
SELECT
user_id,
product_id,
SUM(normalized_purchases * similarity) / SUM(similarity) as recommendation_score
FROM (
SELECT
x.user_id,
y.product_b as product_id,
x.normalized_purchases,
y.similarity,
RANK() OVER (PARTITION BY x.user_id, x.product_id ORDER BY y.similarity DESC) as product_rank
FROM instacart.user_ratings x
INNER JOIN DELTA.`/mnt/instacart/gold/product_sim` y
ON x.product_id=y.product_a
LEFT SEMI JOIN random_users z
ON x.user_id=z.user_id
WHERE
x.split = 'calibration' AND
y.size >= {0}
)
WHERE product_rank <= {1}
GROUP BY user_id, product_id
)
) n
ON m.user_id=n.user_id AND m.product_id=n.product_id
)
'''.format(i,j)
).collect()[0]['mean_percent_rank']
)
    results += [(i, j, rank)]
display(
spark
.createDataFrame(results, schema="min_users int, max_products int, mean_percent_rank double")
.orderBy('min_users','max_products')
)
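From the grid of results above, it is natural to surface the combination of minimum co-occurrence size and maximum neighbor count that minimizes the metric. A small, hypothetical follow-up on the collected results list:

# hypothetical follow-up: pick the neighborhood settings with the lowest (best) mean percent rank
best_min_users, best_max_products, best_rank = min(results, key=lambda r: r[2])
print('min_users={0}, max_products={1}, mean_percent_rank={2}'.format(best_min_users, best_max_products, best_rank))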
The purpose of this notebook is to build and evaluate item-based collaborative filtering recommendations. This notebook is designed to run on a Databricks 7.1+ cluster.