OSA 03: On-Shelf Availability (Python)


The purpose of this notebook is to forecast sales over a historical period and then use those forecasted values to identify potential on-shelf availability (OSA) concerns. This notebook has been developed by Tredence in partnership with Databricks using a Databricks 8.2 cluster.

from pyspark.sql.types import *
import pyspark.sql.functions as f
 
import pandas as pd
import numpy as np
import math
from datetime import timedelta
    
from statsmodels.tsa.holtwinters import SimpleExpSmoothing

Step 1: Access Data

In the last notebook, we identified inventory problems through the detection of excessive phantom inventory, stocking levels below safety stock thresholds, and unexpected numbers of consecutive days with zero sales. We might label these out-of-stock issues in that they identify scenarios where product is simply not available to be sold.

In this notebook, we want to add a fourth inventory scenario: one where insufficient stock may not fully prevent sales but may still cause us to miss our sales expectations. Misplaced product in the store or displays that give the customer the impression a product is unavailable are both examples of the kinds of issues we might describe as on-shelf availability problems.

To address this fourth scenario, we will generate a sales forecast and identify historical sales values that were depressed relative to what was expected. These periods of lower than expected sales may then be investigated as periods potentially experiencing OSA challenges. To generate this forecast, we must first access our historical sales data:

inventory_flagged = spark.table('osa.inventory_flagged')

Step 2: Generate Forecast

Unlike most forecasting exercises, our goal is not to predict future values but instead to generate expected values for the historical period. To do this, we may make use of a variety of forecasting techniques. Most enterprises already have established preferences for sales forecasting, so instead of wading into the conversation about which techniques are best in which scenarios, we will use simple exponential smoothing as a placeholder technique so that we can focus on the analysis of the forecasted values in later steps.
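For intuition, simple exponential smoothing computes each expected value as a weighted blend of the newest observation and the previous smoothed value, with the smoothing factor alpha controlling how quickly older history is discounted. The short sketch below illustrates the recurrence on toy values; the ses helper and its inputs are illustrative only and not part of this solution:

# illustrative only: the simple exponential smoothing recurrence
#   s[t] = alpha * y[t] + (1 - alpha) * s[t-1]
def ses(values, alpha, s0):
  smoothed = [s0] # seed with an initial level
  for y in values:
    smoothed.append(alpha * y + (1 - alpha) * smoothed[-1])
  return smoothed[1:]
 
ses([4, 1, 3, 2], alpha=0.8, s0=4) # returns [4.0, 1.6, 2.72, 2.144]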

Our challenge now is to generate a forecast for each store-SKU combination in our dataset. Leveraging a forecast scaling technique previously demonstrated, we will write a function capable of generating a forecast for a given store-SKU combination and then apply it to all store-SKU combinations in our dataset in a scalable, distributed manner:

alpha_value = 0.8 # smoothing factor
 
# function to generate a forecast for a store-sku
def get_forecast(keys, inventory_pd: pd.DataFrame) -> pd.DataFrame:
  
  # identify store and sku
  store_id = keys[0]
  sku = keys[1]
  
  # identify date range for predictions
  history_start = inventory_pd['date'].min()
  history_end = inventory_pd['date'].max()
  
  # organize data for model training
  timeseries = (
    inventory_pd
      .set_index('date', drop=True, append=False) # move date to index
      .sort_index() # sort on date-index
    )['total_sales_units'] # just need this one field
  
  # fit model to timeseries
  model = SimpleExpSmoothing(timeseries, initialization_method='heuristic').fit(smoothing_level=alpha_value)
  
  # predict sales across historical period
  predictions = model.predict(start=history_start, end=history_end)
  
  # convert timeseries to dataframe for return
  predictions_pd = predictions.to_frame(name='predicted_sales_units').reset_index() # convert to df
  predictions_pd.rename(columns={'index':'date'}, inplace=True) # rename 'index' column to 'date'
  predictions_pd['store_id'] = store_id # assign store id
  predictions_pd['sku'] = sku # assign sku
  
  return predictions_pd[['date', 'store_id', 'sku', 'predicted_sales_units']]
 
# structure of forecast function output
forecast_schema = StructType([
  StructField('date', DateType()), 
  StructField('store_id', IntegerType()), 
  StructField('sku', IntegerType()), 
  StructField('predicted_sales_units', FloatType())
  ])
# get forecasted values for each store-sku combination
forecast = (
  inventory_flagged
    .groupby(['store_id','sku'])
      .applyInPandas(
        get_forecast, 
        schema=forecast_schema
        )
    .withColumn('predicted_sales_units', f.expr('ROUND(predicted_sales_units,0)')) # round values to nearest integer
    )
 
display(forecast)
 
date       | store_id | sku | predicted_sales_units
2019-01-01 | 37       | 64  | 0
2019-01-02 | 37       | 64  | 0
2019-01-03 | 37       | 64  | 0
2019-01-04 | 37       | 64  | 4
2019-01-05 | 37       | 64  | 1
2019-01-06 | 37       | 64  | 3
2019-01-07 | 37       | 64  | 2
2019-01-08 | 37       | 64  | 2
2019-01-09 | 37       | 64  | 0
2019-01-10 | 37       | 64  | 0
2019-01-11 | 37       | 64  | 0
2019-01-12 | 37       | 64  | 1
2019-01-13 | 37       | 64  | 0
2019-01-14 | 37       | 64  | 0
2019-01-15 | 37       | 64  | 0
2019-01-16 | 37       | 64  | 2
2019-01-17 | 37       | 64  | 0

Truncated results, showing first 1000 rows.

(
  forecast
    .write
    .format('delta')
    .mode('overwrite')
    .option('overwriteSchema', 'true')
    .saveAsTable('osa.inventory_forecast')
  )

Step 3: Identify Off-Sales Issues

With forecasts in hand, we will now look for historical periods where sales are not only lower than expected (relative to our forecasts) but where this shortfall grows over a number of days. Identifying these periods may help us spot on-shelf availability (OSA) concerns we may need to address.

Of course, not every missed sales target is an OSA event. To focus our attention, we will look for periods of sustained misses where the miss is sizeable relative to our expectations. In the code that follows, we require four consecutive days of increasing misses with an average daily miss of 20% or more of the expected sales. Some organizations may wish to increase or decrease these threshold requirements depending on the nature of their business:
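To make these rules concrete before applying them at scale, here is a small sketch of the two conditions using pandas (already imported above) on a toy series; the values are illustrative only and not drawn from our dataset:

# illustrative only: the two alert conditions on a toy store-SKU series
toy = pd.DataFrame({
  'predicted': [5, 5, 5, 5, 5, 5],
  'actual':    [5, 4, 3, 2, 1, 5]
  })
toy['miss'] = toy['predicted'] - toy['actual']
 
# condition 1: the miss has increased on each of the last 4 days
increasing = (
  (toy['miss'] > toy['miss'].shift(1)) &
  (toy['miss'].shift(1) > toy['miss'].shift(2)) &
  (toy['miss'].shift(2) > toy['miss'].shift(3))
  )
 
# condition 2: the 4-day average miss exceeds 20% of the 4-day average forecast
# (the +1 in the denominator mirrors the division-by-zero guard used below)
deviation = (
  (toy['predicted'].rolling(4).mean() - toy['actual'].rolling(4).mean()) /
  (toy['predicted'].rolling(4).mean() + 1)
  )
 
toy['off_sales_alert'] = (increasing & (deviation > 0.20)).astype(int) # rows 3 and 4 (0-indexed) are flagged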

inventory_forecast = spark.table('osa.inventory_forecast')
 
osa_flag_output = (
  
  inventory_flagged.alias('inv')
    .join(inventory_forecast.alias('for'), on=['store_id','sku','date'], how='leftouter')
    .selectExpr(
      'inv.*',
      'for.predicted_sales_units'
      )
             
    # calculating difference between forecasted and actual sales units
    .withColumn('units_difference', f.expr('predicted_sales_units - total_sales_units'))
    .withColumn('units_difference', f.expr('COALESCE(units_difference, 0)'))
 
    # check whether deviation has been increasing over past 4 days
    .withColumn('osa_alert_inc_deviation', f.expr('''
      CASE 
        WHEN units_difference > LAG(units_difference, 1) OVER(PARTITION BY store_id, sku ORDER BY date) AND 
             LAG(units_difference, 1) OVER(PARTITION BY store_id, sku ORDER BY date) > LAG(units_difference, 2) OVER(PARTITION BY store_id, sku ORDER BY date) AND 
             LAG(units_difference, 2) OVER(PARTITION BY store_id, sku ORDER BY date) > LAG(units_difference, 3) OVER(PARTITION BY store_id, sku ORDER BY date)
             THEN 1
        ELSE 0 
        END'''))
    .withColumn('osa_alert_inc_deviation', f.expr('COALESCE(osa_alert_inc_deviation, 0)'))
 
    # rolling 4 day average of sales units
    .withColumn('sales_4day_avg', f.expr('AVG(total_sales_units) OVER(PARTITION BY store_id, sku ORDER BY date ROWS BETWEEN 3 PRECEDING AND CURRENT ROW)'))
 
    # rolling 4 day average of forecasted units
    .withColumn('predictions_4day_avg', f.expr('AVG(predicted_sales_units) OVER(PARTITION BY store_id, sku ORDER BY date ROWS BETWEEN 3 PRECEDING AND CURRENT ROW)'))
 
    # calculating deviation of the 4-day rolling sales average from the rolling forecast average
    # (the +1 in the denominator guards against division by zero when forecasts are zero)
    .withColumn('deviation', f.expr('(predictions_4day_avg - sales_4day_avg) / (predictions_4day_avg+1)'))
    .withColumn('deviation', f.expr('COALESCE(deviation, 0)'))
 
    # Considering 20% deviation as the threshold for OSA flag
    .withColumn('off_sales_alert', f.expr('''
      CASE 
        WHEN deviation > 0.20  AND osa_alert_inc_deviation = 1 THEN 1
        ELSE 0
        END'''))
 
    .select('date', 
            'store_id', 
            'sku', 
            'predicted_sales_units', 
            'off_sales_alert',
            'oos_alert', 
            'zero_sales_flag', 
            'phantom_inventory', 
            'phantom_inventory_ind')
    )
 
display(osa_flag_output)
 
date       | store_id | sku | predicted_sales_units | off_sales_alert | oos_alert | zero_sales_flag | phantom_inventory | phantom_inventory_ind
2019-01-01 | 37       | 64  | 0                     | 0               | 0         | 0               | null              | 0
2019-01-02 | 37       | 64  | 0                     | 0               | 0         | 0               | null              | 0
2019-01-03 | 37       | 64  | 0                     | 0               | 0         | 0               | 0                 | 0
2019-01-04 | 37       | 64  | 4                     | 0               | 0         | 0               | 0                 | 0
2019-01-05 | 37       | 64  | 1                     | 0               | 0         | 0               | -3                | 0
2019-01-06 | 37       | 64  | 3                     | 0               | 0         | 0               | 0                 | 0
2019-01-07 | 37       | 64  | 2                     | 0               | 0         | 0               | -2                | 0
2019-01-08 | 37       | 64  | 2                     | 0               | 0         | 0               | 2                 | 0
2019-01-09 | 37       | 64  | 0                     | 0               | 0         | 0               | 0                 | 0
2019-01-10 | 37       | 64  | 0                     | 0               | 0         | 0               | 0                 | 0
2019-01-11 | 37       | 64  | 0                     | 0               | 0         | 0               | 10                | 1
2019-01-12 | 37       | 64  | 1                     | 0               | 0         | 0               | 1                 | 0
2019-01-13 | 37       | 64  | 0                     | 0               | 0         | 0               | 0                 | 0
2019-01-14 | 37       | 64  | 0                     | 0               | 0         | 0               | 0                 | 0
2019-01-15 | 37       | 64  | 0                     | 0               | 0         | 0               | -6                | 1
2019-01-16 | 37       | 64  | 2                     | 0               | 0         | 0               | 3                 | 0
2019-01-17 | 37       | 64  | 0                     | 0               | 0         | 0               | 0                 | 0

Truncated results, showing first 1000 rows.
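
From here, the flagged records can be pulled for investigation by store and SKU. As one possible next step (this filter is a suggestion, not part of the original notebook), we might surface only the rows where the off-sales alert fired:

# example: surface store-SKU-days flagged for potential OSA investigation
display(
  osa_flag_output
    .filter('off_sales_alert = 1')
    .orderBy('store_id', 'sku', 'date')
  )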

© 2021 Databricks, Inc. All rights reserved. The source in this notebook is provided subject to the Databricks License. All included or referenced third party libraries are subject to the licenses set forth below.