%pip install pystan==2.19.1.1 # per https://github.com/facebook/prophet/commit/82f3399409b7646c49280688f59e5a3d2c936d39#comments
%pip install fbprophet==0.6
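Before moving on, it can help to confirm that the pinned versions actually resolved; a quick, optional sanity check (this assumes both packages expose __version__, which these releases do):
# optional sanity check: confirm the pinned packages import cleanly
import pystan
import fbprophet
print(pystan.__version__, fbprophet.__version__)  # expect 2.19.1.1 and 0.6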
from pyspark.sql.types import *
# structure of the training data set
train_schema = StructType([
  StructField('date', DateType()),
  StructField('store', IntegerType()),
  StructField('item', IntegerType()),
  StructField('sales', IntegerType())
  ])
# read the training file into a dataframe
train = spark.read.csv(
  'dbfs:/FileStore/demand_forecast/train/train.csv',
  header=True,
  schema=train_schema
  )
# make the dataframe queryable as a temporary view
train.createOrReplaceTempView('train')
# show data
display(train)
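As a quick check on what was loaded, the row count and date coverage of the training set can be summarized with a small illustrative aggregation:
from pyspark.sql import functions as F

# summarize the loaded data: row count and date coverage
display(
  train.agg(
    F.count(F.lit(1)).alias('rows'),
    F.min('date').alias('start_date'),
    F.max('date').alias('end_date')
    )
  )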
%sql
SELECT
  YEAR(date) as year,
  (
    CASE
      WHEN DATE_FORMAT(date, 'E') = 'Sun' THEN 0
      WHEN DATE_FORMAT(date, 'E') = 'Mon' THEN 1
      WHEN DATE_FORMAT(date, 'E') = 'Tue' THEN 2
      WHEN DATE_FORMAT(date, 'E') = 'Wed' THEN 3
      WHEN DATE_FORMAT(date, 'E') = 'Thu' THEN 4
      WHEN DATE_FORMAT(date, 'E') = 'Fri' THEN 5
      WHEN DATE_FORMAT(date, 'E') = 'Sat' THEN 6
    END
  ) % 7 as weekday,
  AVG(sales) as sales
FROM (
  SELECT
    date,
    SUM(sales) as sales
  FROM train
  GROUP BY date
  ) x
GROUP BY year, weekday
ORDER BY year, weekday;
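For reference, the same weekday aggregation can be sketched with the DataFrame API; here dayofweek (which returns 1 for Sunday) is shifted to match the 0-6 encoding produced by the CASE expression above:
from pyspark.sql import functions as F

# average daily sales per weekday per year, DataFrame-API equivalent
daily = train.groupBy('date').agg(F.sum('sales').alias('sales'))
display(
  daily
    .withColumn('year', F.year('date'))
    .withColumn('weekday', F.dayofweek('date') - 1)  # Sunday=0 ... Saturday=6
    .groupBy('year', 'weekday')
    .agg(F.avg('sales').alias('sales'))
    .orderBy('year', 'weekday')
  )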
# query to retrieve the store 1 / item 1 history at the date (ds) level
sql_statement = '''
  SELECT
    CAST(date as date) as ds,
    sales as y
  FROM train
  WHERE store=1 AND item=1
  ORDER BY ds
  '''
# assemble the dataset as a pandas DataFrame
history_pd = spark.sql(sql_statement).toPandas()
# drop any missing records
history_pd = history_pd.dropna()
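With the history assembled, the natural next step is to fit a Prophet model to this ds/y frame; a minimal sketch (the growth, seasonality, and interval settings here are illustrative assumptions, not prescriptions):
from fbprophet import Prophet

# fit a model to the store 1 / item 1 history
model = Prophet(
  interval_width=0.95,
  growth='linear',
  daily_seasonality=False,
  weekly_seasonality=True,
  yearly_seasonality=True
  )
model.fit(history_pd)

# project 90 days past the end of the history and review the forecast
future_pd = model.make_future_dataframe(periods=90, freq='d', include_history=True)
forecast_pd = model.predict(future_pd)
display(forecast_pd[['ds', 'yhat', 'yhat_lower', 'yhat_upper']])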
The objective of this notebook is to illustrate how we might generate a large number of fine-grained forecasts at the store-item level efficiently, leveraging the distributed computational power of Databricks. This is a Spark 3.x update to a previously published notebook originally developed for Spark 2.x. UPDATE marks in this notebook indicate changes in the code intended to reflect new functionality in either Spark 3.x or the Databricks platform.