04 Time Series with Regressors(Python)

Forecasting using Time Series Analysis with Weather Regressors

In this notebook, we will build on the work of the previous one, incorporating hourly temperature and precipitation measurements as regressors into our time series model. As before, we will start by installing the libraries needed for this work:

# load fbprophet library
dbutils.library.installPyPI('FBProphet', version='0.5') # find latest version of fbprophet here: https://pypi.org/project/fbprophet/
dbutils.library.installPyPI('holidays', version='0.9.12') # this line is in response to this issue with fbprophet 0.5: https://github.com/facebook/prophet/issues/1293
dbutils.library.installPyPI('mlflow')
dbutils.library.restartPython()

And now we will define our function. Please note that in order to generate a prediction, future (predicted) values for temperature and precipitation must be provided. The DataFrame expected by this function includes records for both historical and future periods, with the former identified by a value of 1 in the is_historical field:

import mlflow
import mlflow.sklearn
import shutil

from pyspark.sql.types import *
from pyspark.sql.functions import pandas_udf, PandasUDFType

import pandas as pd

import logging
logging.getLogger('py4j').setLevel(logging.ERROR)

from fbprophet import Prophet

# structure of the dataset returned by the function
result_schema =StructType([
  StructField('station_id',IntegerType()),
  StructField('ds',TimestampType()),
  StructField('y', FloatType()),
  StructField('yhat', FloatType()),
  StructField('yhat_lower', FloatType()),
  StructField('yhat_upper', FloatType()),
  StructField('trend',FloatType()),
  StructField('trend_lower', FloatType()),
  StructField('trend_upper', FloatType()),
  StructField('multiplicative_terms', FloatType()),
  StructField('multiplicative_terms_lower', FloatType()),
  StructField('multiplicative_terms_upper', FloatType()),
  StructField('daily', FloatType()),
  StructField('daily_lower', FloatType()),
  StructField('daily_upper', FloatType()),
  StructField('weekly', FloatType()),
  StructField('weekly_lower', FloatType()),
  StructField('weekly_upper', FloatType()),
  StructField('yearly', FloatType()),
  StructField('yearly_lower', FloatType()),
  StructField('yearly_upper', FloatType()),
  StructField('additive_terms', FloatType()),
  StructField('additive_terms_lower', FloatType()),
  StructField('additive_terms_upper', FloatType()),
  StructField('holidays', FloatType()),
  StructField('holidays_lower', FloatType()), 
  StructField('holidays_upper', FloatType())
  ])

# forecast function
@pandas_udf( result_schema, PandasUDFType.GROUPED_MAP )
def get_forecast(keys, group_pd):
  
  # DATA PREP
  # ---------------------------------
  # identify station id and hours to forecast
  station_id = keys[0]
  hours_to_forecast=keys[1]
  
  # extract valid historical data
  history_pd = group_pd[group_pd['is_historical']==1].dropna()
  
  # acquire holidays
  holidays_pd=holidays_broadcast.value
  # ---------------------------------  
  
  # TRAIN MODEL
  # ---------------------------------  
  # configure model
  model = Prophet(
    interval_width=0.80,
    growth='linear',
    daily_seasonality=True,
    weekly_seasonality=True,
    yearly_seasonality=True,
    holidays=holidays_pd
    )
  
  # identify the weather regressors
  model.add_regressor('temp_f', mode='multiplicative')
  model.add_regressor('precip_in', mode='multiplicative')

  # train model
  model.fit( history_pd )

  # save models for potential later use
  model_path = '/dbfs/mnt/citibike/timeseries_regressors/{0}'.format(station_id)
  shutil.rmtree(model_path, ignore_errors=True)
  mlflow.sklearn.save_model( model, model_path)
  # ---------------------------------
  
  # FORECAST
  # ---------------------------------  
  # assemble regressors
  regressors_pd = group_pd[['ds', 'temp_f', 'precip_in']]

  # assemble timeseries
  timeseries_pd = model.make_future_dataframe(
    periods=hours_to_forecast, 
    freq='H'
    )
  
  # merge timeseries with regressors to form forecast dataframe
  future_pd = timeseries_pd.merge(
    regressors_pd,
    how='left',
    on='ds',
    sort=True,
    suffixes=('_l','_r')
    )
  
  # generate forecast
  forecast_pd = model.predict(future_pd)
  # ---------------------------------
  
  # PREPARE RESULTS
  # ---------------------------------
  # merge forecast with history
  results_pd = forecast_pd.merge(
    history_pd[['ds','y']], 
    how='left', 
    on='ds',
    sort=True,
    suffixes=('_l','_r')
   )
 
  # assign station to results
  results_pd['station_id']=station_id
  # ---------------------------------
  
  return results_pd[
      ['station_id', 'ds', 
       'y', 'yhat', 'yhat_lower', 'yhat_upper',
       'trend', 'trend_lower', 'trend_upper', 
       'multiplicative_terms', 'multiplicative_terms_lower', 'multiplicative_terms_upper', 
       'daily', 'daily_lower', 'daily_upper',
       'weekly', 'weekly_lower', 'weekly_upper', 
       'yearly', 'yearly_lower', 'yearly_upper', 
       'additive_terms', 'additive_terms_lower', 'additive_terms_upper',
       'holidays', 'holidays_lower', 'holidays_upper']
        ]
ERROR:fbprophet:Importing plotly failed. Interactive plots will not work. /databricks/spark/python/pyspark/sql/types.py:1624: DeprecationWarning: Using or importing the ABCs from 'collections' instead of from 'collections.abc' is deprecated, and in 3.8 it will stop working arrow_type = pa.struct(fields)

As before, we will define and replicate our holidays DataFrame:

# identify hours that should be treated as aligned with holidays
holidays_pd = spark.sql('''
    SELECT
      b.hour as ds,
      a.holiday as holiday
    FROM citibike.holidays a
    INNER JOIN citibike.periods b
      ON a.date=to_date(b.hour)
    ''').toPandas()

# replicate a copy of the holidays dataset to each node
holidays_broadcast = sc.broadcast(holidays_pd)

And now we can define the dataset and logic for generating the forecasts:

from pyspark.sql.functions import lit

# define number of hours to forecast
hours_to_forecast = 36

# assemble historical dataset for training
inputs = spark.sql('''
  SELECT
    a.station_id,
    a.hour as ds, 
    COALESCE(b.rentals,0) as y,
    c.avg_temp_f as temp_f,
    COALESCE(c.precip_in,0) as precip_in,
    a.is_historical  
  FROM ( -- all rental hours by currently active stations
    SELECT
      y.station_id,
      x.hour,
      CASE WHEN x.hour <= y.end_date THEN 1 ELSE 0 END as is_historical
    FROM citibike.periods x
    INNER JOIN citibike.stations_most_active y
     ON x.hour BETWEEN y.start_date AND (y.end_date + INTERVAL {0} HOURS)
    ) a
  LEFT OUTER JOIN citibike.rentals b
    ON a.station_id=b.station_id AND a.hour=b.hour
  LEFT OUTER JOIN citibike.weather c
    ON a.hour=c.hour
  '''.format(hours_to_forecast)
  )

# generate forecast
forecast = (
   inputs
    .groupBy('station_id', lit(hours_to_forecast))
    .apply(get_forecast)
  )
forecast.createOrReplaceTempView('forecast_timeseries_with_regressors')

We can now trigger the execution of our logic and load the resulting forecasts to a table for long-term persistence:

%sql
DROP TABLE IF EXISTS citibike.forecast_timeseries_with_regressors;

CREATE TABLE citibike.forecast_timeseries_with_regressors 
USING DELTA
AS
  SELECT *
  FROM forecast_timeseries_with_regressors;
OK

And now we can release our holidays DataFrame from memory:

holidays_broadcast.unpersist(blocking=True)

Revisiting the forecast for Station 518, E 39 St & 2 Ave:

# extract the forecast from our persisted dataset
forecast_pd = (
  spark
    .table('citibike.forecast_timeseries_with_regressors')
    .filter('station_id=518')
    ).toPandas()
# retrieve the model for this station 
model = mlflow.sklearn.load_model('/dbfs/mnt/citibike/timeseries_regressors/518')
trends_fig = model.plot_components(forecast_pd)
display(trends_fig)
from datetime import datetime

# construct a visualization of the forecast
predict_fig = model.plot(forecast_pd, xlabel='hour', ylabel='rentals')

# adjust the x-axis to focus on a limited date range
xlim = predict_fig.axes[0].get_xlim()
new_xlim = (datetime.strptime('2020-01-15','%Y-%m-%d'), datetime.strptime('2020-02-03','%Y-%m-%d'))
predict_fig.axes[0].set_xlim(new_xlim)

# display the chart
display(predict_fig)

Again, we generate our per-station evaluation metrics along with a summary metric for comparison with our other modeling techniques:

%sql -- per station
SELECT
  e.station_id,
  e.error_sum/n as MAE,
  e.error_sum_abs/n as MAD,
  e.error_sum_sqr/n as MSE,
  POWER(e.error_sum_sqr/n, 0.5) as RMSE,
  e.error_sum_abs_prop_y/n as MAPE
FROM (
  SELECT -- error base values 
    x.station_id,
    COUNT(*) as n,
    SUM(x.yhat-x.y) as error_sum,
    SUM(ABS(x.yhat-x.y)) as error_sum_abs,
    SUM(POWER((x.yhat-x.y),2)) as error_sum_sqr,
    SUM(ABS((x.yhat-x.y)/x.y_corrected)) as error_sum_abs_prop_y,
    SUM(ABS((x.yhat-x.y)/x.yhat)) as error_sum_abs_prop_yhat,
    SUM(x.y) as sum_y,
    SUM(x.yhat) as sum_yhat
  FROM ( -- actuals vs. forecast
    SELECT
      a.station_id,
      a.ds as ds,
      CAST(COALESCE(a.y,0) as float) as y,
      CAST(COALESCE(a.y,1) as float) as y_corrected,
      a.yhat
    FROM citibike.forecast_timeseries_with_regressors a
    INNER JOIN citibike.stations b
      ON a.station_id = b.station_id AND
         a.ds <= b.end_date
     ) x
   GROUP BY x.station_id
  ) e
ORDER BY e.station_id
72-0.0025313475578456362.61362645597580114.4054917394405323.79545672343138830.6361909529936978
116-0.0302400656313530053.216403502404255719.3598339549019734.3999811311984030.537796958897676
127-0.0023836752290052733.85337908400932928.829541238850755.3693147829914720.7604846070279573
128-0.0111581531954049253.677772342916597425.5772005724650035.0573906881380050.7604348703915806
151-0.0096831723388634084.12817659456724634.416381283550235.8665476460649520.7952986405155138
161-0.0436457747041875262.834061019597722314.061383199775483.74985109034685270.6145931151625206
167-0.0024320187049119063.101270926972384619.4148076676024284.4062237423447340.65850682183287
168-0.017835870402919864.16424199521493335.473277137486075.9559446889209840.7105272388032213
173-0.002327169786782273.734152231779059427.7069209459072785.2637364054355230.7618224926869134
174-0.00165268967840455752.58686594811047415.045138367589693.8788063070472710.6638078041957765
195-0.00091185627289688252.83515454075899817.104387310603384.1357450732127310.6367696730129683
212-0.041362593884519382.94731433655597516.6879924657720484.0850939359789570.624992056413543
223-0.0090741348571296442.495841744802482311.5595677844534883.3999364382960880.5083728779916487
229-0.021421552129489123.73699532358160525.4240668017934445.0422283567678130.7126829412845748
236-0.000137862119916157783.09442910574332819.0959673362960064.36989328660277550.7064860344542223
2370.0014101155771031573.07950299693201818.003043462057434.2429993474024280.6973308372419224
251-0.0054985334216233343.014743639195935517.0402760122071264.1279869200625050.6691440931688556
252-0.0070903597052224982.21234561453970859.0325604176895633.00542183689570660.5624905994743866
257-0.0117189342407374462.370929823186110212.1380103445852523.4839647450261680.5828373735949307
265-0.0068707681506507352.517977877312062712.3482058795552783.51400140574179030.6253770797913696
268-0.00727228620004610052.469925071074406312.5869942830776763.5478154240430370.581789255955278
280-0.005908131873426682.03318975742747557.4836311727782532.7356226298190790.5073707853279796
281-0.0533676899824882055.29732695449005666.972087242115728.1836475511910771.0063533762818244
284-0.019319917594228743.991922373105665728.0269388771176975.2940474947924010.6641442543893434
285-0.0139721867471510024.42882581152648936.3571216275642266.0296866939803950.7655146144444271
293-0.028021053942864844.85665672698893650.32950819038677.0943292981357090.7936256489879947
297-0.0066263061997019212.96233221078380115.8755630610613263.9844150211870910.6522624770830419
301-0.0018668979769344942.955321046797078721.1295612890112284.5966902537598970.7476807651917888
303-0.020821293904476392.412365090756674512.9377973853512053.5969149816684860.5478883318408649
304-0.0258332772633471653.350282196078502730.5421186188462925.5264924336188410.6906712065738211
305-0.00312260057783281172.709853091212654614.687039317332163.83236732547027130.6033485470106906
307-0.0120324513283711632.775663229762628715.24732789938243.90478269553920270.6784578435684812
312-0.0044369968143492782.944173588557081416.464350594335634.0576286910381080.7302458229517977
315-0.0108654748402291263.273679039010922522.7855346717148034.7734195993768240.6275117704847409
317-0.00196854115435642252.671510592765524315.4020532899581183.9245449787151270.6787382587328026
319-0.0299059068087473532.566518407020564714.2243189215375523.77151414176555240.5153515085839753
325-0.00084865470205636512.350181800544263410.6207449407835973.2589484409520190.6131091860456089
326-0.0022942619831880612.754949402830184614.3082665354455833.7826269358007780.6636380376051059
327-0.0089607533566272494.65885872867408143.80189300470236.6182998575693370.848151561331495
328-0.0059143664515962552.341828895364702410.8697390439873763.29692872898207720.5638366035893823
334-0.0050762964515483852.90140568741192216.1359459609500144.0169573013600740.6376599581918272
335-0.00196932783653793042.434583221003509710.9568184394666233.31010852382012160.5692528912866123
336-0.0073842667578694432.601917577322076612.1608434505664283.48724009075463970.6264264740823718
346-0.00275143533766950482.34701112171102310.3177048702401073.21211843963452060.5888388294690834
347-0.0353586831506884843.62929273603732927.836997597423925.2760778611980250.7042469495839689
355-0.0039767475912145962.06124028913995137.85566747929348352.80279636778940540.5325486237169635
358-0.0110920335600114084.82416931569082750.291530516912727.0916521711737050.8988950185010973
359-0.0167475026553013256.792028880520577146.3207275740019212.0963104942789031.2555987067696455
361-0.0023141373248461122.58586478775820813.0318262810924073.60996208859489350.6463443132518922
363-0.0128216780323965592.906990349420662617.0075217124292244.1240176663575560.638044511687129
368-0.005394606798373314.061654145221516529.9128724815320065.4692661739516760.7653904547617517
377-0.0247846564330503653.11891331827819727.8191209182020535.2743834633255520.7314091163511408
379-0.0083859390362105524.90870641747734747.7966787390959956.913514210522461.009219462576605
380-0.00448312861392928363.179204975073689718.3687542785956564.2858784722149620.6965561045509492
383-0.00471194655884622962.62620601811589512.698278364086643.5634643767107650.6053348376128355
387-0.033345780094069394.1859474179984537.9810121301819456.1628736909157840.839384774802301
388-0.0112339808858338743.279566409804840324.2688690729425574.9263443924417790.7144313022780764
3940.00124223348619803832.534158160123975514.3949758148343683.79407113992797560.657976817741848
401-0.00121972073147795662.85220776418957515.4673067935211463.93284970390697050.6705478563958467
402-0.0173658022627323466.442482831087864120.5836790315020310.9810600140196861.0142440741572567
403-0.0117042465078919762.43305561628197111.6315578924984743.41050698467228970.5299114666114819
405-0.059304494029666463.49718668862730322.901633336793634.7855651010924120.7235631800623961
410-0.00057165004041163962.736206397789685315.060501943837913.88078625330459430.6799055417634492
412-0.0100569644128318222.279017994106949.8504957236044593.13854993963844060.5781037631629878
417-0.009673074962322813.74770267189359928.365031423930555.3258831590573360.7897328947653961
426-0.0557555899165195566.56191111571149290.202001682684729.4974734367980581.0683238168638738
432-0.0029470303691566583.870660242577468335.3465770466312965.9452987348518740.8545895919522826
434-0.0212629610337788382.679169575133786714.4646750572037723.8032453322397930.509007950534333
435-0.0061871552510739985.012268243086837547.856004374815966.9178034356879460.7803511072477368
438-0.0101446551491765832.96283259246605720.7001713254497874.5497440945013370.7200854367162124
439-0.0264733024064877922.7806371668023215.0008343808056263.8730910628083130.6136420323503864
442-0.0062135776263207213.852897494868359628.8215627202817545.36857175795217940.7670382704372806
445-0.00118092071988254333.925552443700963238.773748842795166.226857059769010.8611300558078194
446-0.006321004085185513.361164842797106421.877231873444444.6773103246892270.6528604120406366
447-0.00135136961383581732.655322460939548313.3867559412740093.6587915957695660.6489227963910184
448-0.0048029241159685312.664939282008156813.6963541408462393.70085856806852840.6068651936731355
450-0.010408945477388243.143096123430307618.8015262650181654.3360726775525990.7030283292240725
453-0.0087591532667626453.12870988695319519.0026433991842154.35920215167686550.6893744348511887
458-0.0275120615205185743.402855713325433624.9221456965658174.99220849890765450.7515230005561051
459-0.01784854025956175.10177770989639349.680906888086227.0484684072560210.7641649337667126
461-0.00255283829554874343.291805958110977822.9058037604081124.7860008107404360.7196394707013432
462-0.0119406260822238353.64408466657759824.8865289712064564.9886399921427940.6724631906400895
465-0.0025971121100607084.49805507680132542.407122140859246.5120751025198750.893229993767752
466-0.000560885070829823.295519600071010321.033656007793684.586246396323870.7044385402002165
4680.00060329101803235033.553159326187816726.4747742297769835.1453643437347550.7415200978419503
469-0.0064313041053625442.78453914362435919.9985498814166154.4719738238742650.6518297577099936
470-0.0061724998587230582.64827835601910113.6217638487816533.69076737939166980.6355564022192369
472-0.00262330620207503053.721454272416057828.1249549954754565.3032966158301440.7848058841542775
474-0.019887540201942393.1735166730087920.5529329082400134.533534262387350.6563924783029743
476-0.006454545764048872.73549350284063314.7794467455906843.8444046022226490.6465896830331842
477-0.0115107150655271675.59229580855355870.26553123886538.3824537719491961.0961231202777917
478-0.0052156296967878152.7753382189111515.0975344581049353.88555458822867860.7285078789275449
479-0.0057327080783750453.151653267191615518.9596688866975684.3542701899052570.7213536982665689
480-0.000484566891286090262.352304208462888510.8528250149519573.29436261133348160.5698936362246023
482-0.0133535527057756023.116806414771440318.49526082708614.3006116805736020.6807919152964258
483-0.00365907324693904573.06454081384150817.512965119525884.1848494739388040.6150100681397718
484-0.0066832751205292453.259046611436064626.0319332943745045.1021498698464850.6191094501292969
485-0.0058019765016739382.41383132071369811.5986369538749873.4056771652455530.5508172257880294
486-0.0107124281258139943.467368640316971523.130894648498444.80945887273177950.7497880073546925
487-0.00361617309876002173.076739480606170322.0667943017909564.6975306600160640.7486048295663995
%sql -- all stations

SELECT
  e.error_sum/n as MAE,
  e.error_sum_abs/n as MAD,
  e.error_sum_sqr/n as MSE,
  POWER(e.error_sum_sqr/n, 0.5) as RMSE,
  e.error_sum_abs_prop_y/n as MAPE
FROM (
  SELECT -- error base values 
    COUNT(*) as n,
    SUM(x.yhat-x.y) as error_sum,
    SUM(ABS(x.yhat-x.y)) as error_sum_abs,
    SUM(POWER((x.yhat-x.y),2)) as error_sum_sqr,
    SUM(ABS((x.yhat-x.y)/x.y_corrected)) as error_sum_abs_prop_y,
    SUM(ABS((x.yhat-x.y)/x.yhat)) as error_sum_abs_prop_yhat,
    SUM(x.y) as sum_y,
    SUM(x.yhat) as sum_yhat
  FROM ( -- actuals vs. forecast
    SELECT
      a.ds as ds,
      CAST(COALESCE(a.y,0) as float) as y,
      CAST(COALESCE(a.y,1) as float) as y_corrected,
      a.yhat
    FROM citibike.forecast_timeseries_with_regressors a
    INNER JOIN citibike.stations b
      ON a.station_id = b.station_id AND
         a.ds <= b.end_date
     ) x
  ) e
-0.012720451710913913.443772949077439428.627146231170335.350434209591810.7171733147873334