alpha_value = 0.8 # exponential-smoothing factor (alpha) used by get_forecast's SimpleExpSmoothing fit; closer to 1.0 weights recent sales more heavily
# function to generate a forecast for a store-sku
def get_forecast(keys, inventory_pd: pd.DataFrame, smoothing_level=None) -> pd.DataFrame:
    """Fit a simple exponential smoothing model to one store-sku's sales history
    and return in-sample predictions over that same date range.

    Designed for Spark's ``groupby(...).applyInPandas``: Spark calls it once per
    (store_id, sku) group with the group keys and that group's pandas DataFrame.

    Args:
        keys: grouping-key tuple supplied by Spark; ``keys[0]`` is store_id,
            ``keys[1]`` is sku.
        inventory_pd: pandas DataFrame for one group; must contain 'date' and
            'total_sales_units' columns.
        smoothing_level: optional smoothing factor in (0, 1]; when None
            (the default, as used by applyInPandas) falls back to the
            module-level ``alpha_value``.

    Returns:
        DataFrame with columns ['date', 'store_id', 'sku', 'predicted_sales_units'].
    """
    # identify store and sku from the grouping keys
    store_id = keys[0]
    sku = keys[1]
    # fall back to the shared module-level smoothing factor
    if smoothing_level is None:
        smoothing_level = alpha_value
    # predictions cover the full observed history for this group
    history_start = inventory_pd['date'].min()
    history_end = inventory_pd['date'].max()
    # index by date, sort chronologically, keep only the sales series
    timeseries = (
        inventory_pd
        .set_index('date', drop=True, append=False)
        .sort_index()
    )['total_sales_units']
    # fit the exponential smoothing model with a fixed smoothing level
    model = SimpleExpSmoothing(timeseries, initialization_method='heuristic').fit(smoothing_level=smoothing_level)
    # predict sales across the historical period
    predictions = model.predict(start=history_start, end=history_end)
    # reshape the prediction series into the output dataframe
    predictions_pd = predictions.to_frame(name='predicted_sales_units').reset_index()
    # reset_index may emit the date column as 'index'; normalize its name
    predictions_pd.rename(columns={'index':'date'}, inplace=True)
    predictions_pd['store_id'] = store_id # assign store id
    predictions_pd['sku'] = sku # assign sku
    return predictions_pd[['date', 'store_id', 'sku', 'predicted_sales_units']]
# structure of forecast function output: one row per store-sku-date prediction
_forecast_columns = [
    ('date', DateType()),
    ('store_id', IntegerType()),
    ('sku', IntegerType()),
    ('predicted_sales_units', FloatType()),
]
forecast_schema = StructType([StructField(col_name, col_type) for col_name, col_type in _forecast_columns])
# get forecasted values for each store-sku combination
store_sku_groups = inventory_flagged.groupby(['store_id','sku'])
# run the pandas forecasting function once per group
raw_forecast = store_sku_groups.applyInPandas(get_forecast, schema=forecast_schema)
# round predicted values to the nearest whole unit
forecast = raw_forecast.withColumn('predicted_sales_units', f.expr('ROUND(predicted_sales_units,0)'))
display(forecast)
inventory_forecast = spark.table('osa.inventory_forecast')

# attach each day's forecast to the flagged inventory records
joined = (
    inventory_flagged.alias('inv')
    .join(inventory_forecast.alias('for'), on=['store_id','sku','date'], how='leftouter')
    .selectExpr(
        'inv.*',
        'for.predicted_sales_units'
    )
)

# gap between forecasted and actual sales units; missing forecasts count as 0
with_difference = (
    joined
    .withColumn('units_difference', f.expr('predicted_sales_units - total_sales_units'))
    .withColumn('units_difference', f.expr('COALESCE(units_difference, 0)'))
)

# flag rows where the gap has grown on each of the past 4 days
with_inc_deviation = (
    with_difference
    .withColumn('osa_alert_inc_deviation', f.expr('''
        CASE
          WHEN units_difference > LAG(units_difference, 1) OVER(PARTITION BY store_id, sku ORDER BY date) AND
               LAG(units_difference, 1) OVER(PARTITION BY store_id, sku ORDER BY date) > LAG(units_difference, 2) OVER(PARTITION BY store_id, sku ORDER BY date) AND
               LAG(units_difference, 2) OVER(PARTITION BY store_id, sku ORDER BY date) > LAG(units_difference, 3) OVER(PARTITION BY store_id, sku ORDER BY date)
          THEN 1
          ELSE 0
        END'''))
    .withColumn('osa_alert_inc_deviation', f.expr('COALESCE(osa_alert_inc_deviation, 0)'))
)

# rolling 4-day averages of actual and forecasted units per store-sku
with_rolling = (
    with_inc_deviation
    .withColumn('sales_4day_avg', f.expr('AVG(total_sales_units) OVER(PARTITION BY store_id, sku ORDER BY date ROWS BETWEEN 3 PRECEDING AND CURRENT ROW)'))
    .withColumn('predictions_4day_avg', f.expr('AVG(predicted_sales_units) OVER(PARTITION BY store_id, sku ORDER BY date ROWS BETWEEN 3 PRECEDING AND CURRENT ROW)'))
)

# relative deviation of the rolling averages; +1 in denominator avoids divide-by-zero
with_deviation = (
    with_rolling
    .withColumn('deviation', f.expr('(predictions_4day_avg - sales_4day_avg) / (predictions_4day_avg+1)'))
    .withColumn('deviation', f.expr('COALESCE(deviation, 0)'))
)

# raise the off-sales alert when deviation exceeds the 20% threshold
# AND the forecast-vs-actual gap has been increasing
osa_flag_output = (
    with_deviation
    .withColumn('off_sales_alert', f.expr('''
        CASE
          WHEN deviation > 0.20 AND osa_alert_inc_deviation = 1 THEN 1
          ELSE 0
        END'''))
    .select('date',
            'store_id',
            'sku',
            'predicted_sales_units',
            'off_sales_alert',
            'oos_alert',
            'zero_sales_flag',
            'phantom_inventory',
            'phantom_inventory_ind')
)
display(osa_flag_output)
# Back to Table of Contents