The purpose of this notebook is to prepare the dataset we will use to explore content-based filtering recommenders. This notebook should be run on a Databricks ML 7.3+ cluster.

import gzip
import shutil
import requests

from pyspark.sql.types import StructType, StructField, StringType, ArrayType, DoubleType, LongType
from pyspark.sql.functions import monotonically_increasing_id, min, max  # note: min & max here are the Spark SQL functions, not the Python builtins

file_urls_to_download = [
'http://deepyeti.ucsd.edu/jianmo/amazon/categoryFiles/AMAZON_FASHION.json.gz',
'http://deepyeti.ucsd.edu/jianmo/amazon/metaFiles/meta_AMAZON_FASHION.json.gz',
'http://deepyeti.ucsd.edu/jianmo/amazon/categoryFiles/All_Beauty.json.gz',
'http://deepyeti.ucsd.edu/jianmo/amazon/metaFiles/meta_All_Beauty.json.gz',
'http://deepyeti.ucsd.edu/jianmo/amazon/categoryFiles/Appliances.json.gz',
'http://deepyeti.ucsd.edu/jianmo/amazon/metaFiles/meta_Appliances.json.gz',
'http://deepyeti.ucsd.edu/jianmo/amazon/categoryFiles/Arts_Crafts_and_Sewing.json.gz',
'http://deepyeti.ucsd.edu/jianmo/amazon/metaFiles/meta_Arts_Crafts_and_Sewing.json.gz',
'http://deepyeti.ucsd.edu/jianmo/amazon/categoryFiles/Automotive.json.gz',
'http://deepyeti.ucsd.edu/jianmo/amazon/metaFiles/meta_Automotive.json.gz',
#'http://deepyeti.ucsd.edu/jianmo/amazon/categoryFiles/Books.json.gz', # skip these files to avoid overtaxing the data provider
#'http://deepyeti.ucsd.edu/jianmo/amazon/metaFiles/meta_Books.json.gz', # skip these files to avoid overtaxing the data provider
'http://deepyeti.ucsd.edu/jianmo/amazon/categoryFiles/CDs_and_Vinyl.json.gz',
'http://deepyeti.ucsd.edu/jianmo/amazon/metaFiles/meta_CDs_and_Vinyl.json.gz',
'http://deepyeti.ucsd.edu/jianmo/amazon/categoryFiles/Cell_Phones_and_Accessories.json.gz',
'http://deepyeti.ucsd.edu/jianmo/amazon/metaFiles/meta_Cell_Phones_and_Accessories.json.gz',
'http://deepyeti.ucsd.edu/jianmo/amazon/categoryFiles/Clothing_Shoes_and_Jewelry.json.gz',
'http://deepyeti.ucsd.edu/jianmo/amazon/metaFiles/meta_Clothing_Shoes_and_Jewelry.json.gz',
'http://deepyeti.ucsd.edu/jianmo/amazon/categoryFiles/Digital_Music.json.gz',
'http://deepyeti.ucsd.edu/jianmo/amazon/metaFiles/meta_Digital_Music.json.gz',
'http://deepyeti.ucsd.edu/jianmo/amazon/categoryFiles/Electronics.json.gz',
'http://deepyeti.ucsd.edu/jianmo/amazon/metaFiles/meta_Electronics.json.gz',
'http://deepyeti.ucsd.edu/jianmo/amazon/categoryFiles/Gift_Cards.json.gz',
'http://deepyeti.ucsd.edu/jianmo/amazon/metaFiles/meta_Gift_Cards.json.gz',
'http://deepyeti.ucsd.edu/jianmo/amazon/categoryFiles/Grocery_and_Gourmet_Food.json.gz',
'http://deepyeti.ucsd.edu/jianmo/amazon/metaFiles/meta_Grocery_and_Gourmet_Food.json.gz',
'http://deepyeti.ucsd.edu/jianmo/amazon/categoryFiles/Home_and_Kitchen.json.gz',
'http://deepyeti.ucsd.edu/jianmo/amazon/metaFiles/meta_Home_and_Kitchen.json.gz',
'http://deepyeti.ucsd.edu/jianmo/amazon/categoryFiles/Industrial_and_Scientific.json.gz',
'http://deepyeti.ucsd.edu/jianmo/amazon/metaFiles/meta_Industrial_and_Scientific.json.gz',
'http://deepyeti.ucsd.edu/jianmo/amazon/categoryFiles/Kindle_Store.json.gz',
'http://deepyeti.ucsd.edu/jianmo/amazon/metaFiles/meta_Kindle_Store.json.gz',
'http://deepyeti.ucsd.edu/jianmo/amazon/categoryFiles/Luxury_Beauty.json.gz',
'http://deepyeti.ucsd.edu/jianmo/amazon/metaFiles/meta_Luxury_Beauty.json.gz',
'http://deepyeti.ucsd.edu/jianmo/amazon/categoryFiles/Magazine_Subscriptions.json.gz',
'http://deepyeti.ucsd.edu/jianmo/amazon/metaFiles/meta_Magazine_Subscriptions.json.gz',
'http://deepyeti.ucsd.edu/jianmo/amazon/categoryFiles/Movies_and_TV.json.gz',
'http://deepyeti.ucsd.edu/jianmo/amazon/metaFiles/meta_Movies_and_TV.json.gz',
'http://deepyeti.ucsd.edu/jianmo/amazon/categoryFiles/Musical_Instruments.json.gz',
'http://deepyeti.ucsd.edu/jianmo/amazon/metaFiles/meta_Musical_Instruments.json.gz',
'http://deepyeti.ucsd.edu/jianmo/amazon/categoryFiles/Office_Products.json.gz',
'http://deepyeti.ucsd.edu/jianmo/amazon/metaFiles/meta_Office_Products.json.gz',
'http://deepyeti.ucsd.edu/jianmo/amazon/categoryFiles/Patio_Lawn_and_Garden.json.gz',
'http://deepyeti.ucsd.edu/jianmo/amazon/metaFiles/meta_Patio_Lawn_and_Garden.json.gz',
'http://deepyeti.ucsd.edu/jianmo/amazon/categoryFiles/Pet_Supplies.json.gz',
'http://deepyeti.ucsd.edu/jianmo/amazon/metaFiles/meta_Pet_Supplies.json.gz',
'http://deepyeti.ucsd.edu/jianmo/amazon/categoryFiles/Prime_Pantry.json.gz',
'http://deepyeti.ucsd.edu/jianmo/amazon/metaFiles/meta_Prime_Pantry.json.gz',
'http://deepyeti.ucsd.edu/jianmo/amazon/categoryFiles/Software.json.gz',
'http://deepyeti.ucsd.edu/jianmo/amazon/metaFiles/meta_Software.json.gz',
'http://deepyeti.ucsd.edu/jianmo/amazon/categoryFiles/Sports_and_Outdoors.json.gz',
'http://deepyeti.ucsd.edu/jianmo/amazon/metaFiles/meta_Sports_and_Outdoors.json.gz',
'http://deepyeti.ucsd.edu/jianmo/amazon/categoryFiles/Tools_and_Home_Improvement.json.gz',
'http://deepyeti.ucsd.edu/jianmo/amazon/metaFiles/meta_Tools_and_Home_Improvement.json.gz',
'http://deepyeti.ucsd.edu/jianmo/amazon/categoryFiles/Toys_and_Games.json.gz',
'http://deepyeti.ucsd.edu/jianmo/amazon/metaFiles/meta_Toys_and_Games.json.gz',
'http://deepyeti.ucsd.edu/jianmo/amazon/categoryFiles/Video_Games.json.gz',
'http://deepyeti.ucsd.edu/jianmo/amazon/metaFiles/meta_Video_Games.json.gz'
]
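
The flag and path variables referenced below (perform_download, download_path, metadata_path and reviews_path) are assumed to be defined earlier in the notebook. A minimal sketch of what that configuration might look like follows; the mount point and folder names here are assumptions, not values taken from the original notebook:

# hypothetical configuration cell -- adjust the mount point and folders to your environment
perform_download = True  # set to False to skip the lengthy download step

download_path = '/mnt/reviews/bronze/downloads'  # gzipped files land here (assumed location)
metadata_path = '/mnt/reviews/bronze/metadata'   # unzipped metadata json (assumed location)
reviews_path  = '/mnt/reviews/bronze/reviews'    # unzipped review json (assumed location)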
if perform_download:
  # clean up directories from prior runs and recreate them
  for path in [download_path, metadata_path, reviews_path]:
    try:
      dbutils.fs.rm(path, recurse=True)
    except:
      pass
    dbutils.fs.mkdirs(path)
if perform_download:
  # for each file to download:
  for file_url in file_urls_to_download:
    print(file_url)

    # extract file names from the url
    gz_file_name = file_url.split('/')[-1]
    json_file_name = gz_file_name[:-3]  # drop the .gz extension

    # determine where to place the unzipped json
    if 'meta_' in json_file_name:
      json_path = metadata_path
    else:
      json_path = reviews_path

    # download the gzipped file
    request = requests.get(file_url)
    with open('/dbfs' + download_path + '/' + gz_file_name, 'wb') as f:
      f.write(request.content)

    # decompress the file
    with gzip.open('/dbfs' + download_path + '/' + gz_file_name, 'rb') as f_in:
      with open('/dbfs' + json_path + '/' + json_file_name, 'wb') as f_out:
        shutil.copyfileobj(f_in, f_out)
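
Before moving on, it can help to verify the files landed where expected. A quick check using dbutils.fs.ls; this cell is an addition for illustration and was not part of the original flow:

# optional sanity check: list the unzipped json files and their combined size
for path in [metadata_path, reviews_path]:
  files = dbutils.fs.ls(path)
  print('{0}: {1} files, {2:,} total bytes'.format(path, len(files), sum(f.size for f in files)))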
# common elements of interest from the json docs (capture only the fields actually used later)
metadata_common_schema = StructType([
  StructField('asin', StringType()),
  StructField('category', ArrayType(StringType())),
  StructField('description', ArrayType(StringType())),
  StructField('title', StringType())
  ])

# read json to dataframe
raw_metadata = (
  spark
    .read
    .json(
      metadata_path,
      schema=metadata_common_schema
      )
  )
display(raw_metadata)
# add an id to enable de-duplication and more efficient lookups; cache to fix the id
# values in place, as monotonically_increasing_id may otherwise be recalculated
metadata_with_dupes = (
  raw_metadata
    .withColumn('id', monotonically_increasing_id())
  ).cache()
# locate first entry for each asin
first_asin = (
  metadata_with_dupes
    .groupBy('asin')
    .agg(min('id').alias('id'))
  )

# semi-join to keep only the first entry for each asin, eliminating duplicates
deduped_metadata = (
  metadata_with_dupes
    .join(first_asin, on='id', how='leftsemi')
  )
deduped_metadata.count()  # 11543411 records after de-duplication
# delete the old table if needed
_ = spark.sql('DROP TABLE IF EXISTS reviews.metadata')
# drop any old delta lake files that might have been created
shutil.rmtree('/dbfs/mnt/reviews/silver/metadata', ignore_errors=True)
# persist as delta table
(
  deduped_metadata
    .repartition(sc.defaultParallelism * 4)
    .write
    .format('delta')
    .mode('overwrite')
    .save('/mnt/reviews/silver/metadata')
  )
# make table queryable
_ = spark.sql('''
  CREATE TABLE IF NOT EXISTS reviews.metadata
  USING DELTA
  LOCATION '/mnt/reviews/silver/metadata'
  ''')
# show data
display(
  spark.table('reviews.metadata')
  )
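
The merge statements below rely on an unescape_html function to convert escaped entities such as &amp; back into literal characters. This is not a Spark SQL built-in, so it must have been registered as a UDF in an earlier cell. A minimal sketch of such a registration, built on Python's standard-library html.unescape; the registration shown here is an assumption, not the notebook's original code:

import html

# assumed registration of the unescape_html UDF referenced in the SQL cells below;
# the lambda guards against null inputs before applying html.unescape
spark.udf.register('unescape_html', lambda s: html.unescape(s) if s is not None else None, StringType())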
%sql

MERGE INTO reviews.metadata x
USING (
  SELECT -- remove html tags & escaped chars from title
    a.id,
    unescape_html(
      REGEXP_REPLACE(a.title, '<.+?>', '')
      ) as title
  FROM reviews.metadata a
  WHERE a.title RLIKE '<.+?>|&\\w+;' -- contains html tags or escaped chars
  ) y
ON x.id = y.id
WHEN MATCHED THEN
  UPDATE SET x.title = y.title
%sql

MERGE INTO reviews.metadata x
USING (
  SELECT -- remove html from description array elements
    a.id,
    COLLECT_LIST(
      unescape_html(
        REGEXP_REPLACE(b.d, '<.+?>', '')
        )
      ) as description
  FROM reviews.metadata a
  LATERAL VIEW explode(a.description) b as d
  WHERE b.d RLIKE '<.+?>|&\\w+;' -- contains html tags or escaped chars
  GROUP BY a.id
  ) y
ON x.id = y.id
WHEN MATCHED THEN
  UPDATE SET x.description = y.description
%sql

MERGE INTO reviews.metadata x
USING (
  SELECT -- only keep category elements prior to the first occurrence of html
    m.id,
    COLLECT_LIST( unescape_html(o.c) ) as category
  FROM reviews.metadata m
  INNER JOIN (
    SELECT -- find first occurrence of html in each category array
      a.id,
      MIN(b.index) as first_bad_index
    FROM reviews.metadata a
    LATERAL VIEW posexplode(a.category) b as index, c
    WHERE b.c RLIKE '<.+?>' -- contains html tags
    GROUP BY a.id
    ) n
    ON m.id = n.id
  LATERAL VIEW posexplode(m.category) o as index, c
  WHERE o.index < n.first_bad_index
  GROUP BY m.id
  ) y
ON x.id = y.id
WHEN MATCHED THEN
  UPDATE SET x.category = y.category
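
As a quick check that the title cleanup behaved as expected, we can count records that still contain html tags; this verification cell is an addition for illustration:

# optional check: count titles still containing html tags (ideally 0 after the merge)
print(
  spark.sql("SELECT COUNT(*) as remaining FROM reviews.metadata WHERE title RLIKE '<.+?>'").first()['remaining']
  )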
# common elements of interest from json docs
reviews_common_schema = StructType([
  StructField('asin', StringType()),
  StructField('overall', DoubleType()),
  StructField('reviewerID', StringType()),
  StructField('unixReviewTime', LongType())
  ])

# read json to dataframe
reviews = (
  spark
    .read
    .json(
      reviews_path,
      schema=reviews_common_schema
      )
  )

# present data for review
display(
  reviews
  )
# tack on a sequential id (cache to fix the id values in place)
reviews_with_duplicates = (
  reviews
    .withColumn('rid', monotonically_increasing_id())
  ).cache()
# locate the date of the last review of each product by each reviewer
last_review_date = (
  reviews_with_duplicates
    .groupBy('asin', 'reviewerID')
    .agg(max('unixReviewTime').alias('unixReviewTime'))
  )

# deal with possible multiple entries on a given date by keeping the lowest id
last_review = (
  reviews_with_duplicates
    .join(last_review_date, on=['asin', 'reviewerID', 'unixReviewTime'], how='leftsemi')
    .groupBy('asin', 'reviewerID')
    .agg(min('rid').alias('rid'))
  )

# keep only each reviewer's last review of a given product
deduped_reviews = (
  reviews_with_duplicates
    .join(last_review, on=['asin', 'reviewerID', 'rid'], how='leftsemi')
    .drop('rid')
  )

display(deduped_reviews)
# delete the old table if needed
_ = spark.sql('DROP TABLE IF EXISTS reviews.reviews')
# drop any old delta lake files that might have been created
shutil.rmtree('/dbfs/mnt/reviews/silver/reviews', ignore_errors=True)
# persist reviews as delta table
(
  deduped_reviews
    .repartition(sc.defaultParallelism * 4)
    .write
    .format('delta')
    .mode('overwrite')
    .save('/mnt/reviews/silver/reviews')
  )
# make table queryable
_ = spark.sql('''
  CREATE TABLE IF NOT EXISTS reviews.reviews
  USING DELTA
  LOCATION '/mnt/reviews/silver/reviews'
  ''')
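
With both silver tables in place, a closing sanity check can confirm they are registered and queryable; this cell is an addition for illustration:

# optional: confirm both tables resolve and report their row counts
for table_name in ['reviews.metadata', 'reviews.reviews']:
  print(table_name, spark.table(table_name).count())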