03 Time Series(Python)

Forecasting using Time Series Analysis

In this notebook, we will develop an hourly forecast for each station using a scale-out pattern described here. To do this, we will use Facebook Prophet, a popular timeseries forecasting library. We will also make use of mlflow, a popular framework for the management of machine learning models, to enable the persistence of the models we generate:

# install required libraries
dbutils.library.installPyPI('FBProphet', version='0.5') # find latest version of fbprophet here: https://pypi.org/project/fbprophet/
dbutils.library.installPyPI('holidays',version='0.9.12') # this line is in response to this issue with fbprophet 0.5: https://github.com/facebook/prophet/issues/1293
dbutils.library.installPyPI('mlflow')
dbutils.library.restartPython()

We will now implement a function for the training of models and generating of forecasts on a per-station basis. Notice this function, unlike the functions defined in the previously reference blog post, accepts the keys on which our data is grouped, i.e. the station id and the hours for which to produce the forecast. Notice too that the incoming dataset consists of historical records on which the model will be trained and future records for which forecasts should be generated. In this specific scenario, this approach is unnecessary, but in later scenarios we will need to structure our data this way so that weather predictions aligned with future periods can more easily be passed into the function. The inclusion of these data and the subsequent filtering for historical data is therefore simply for consistency purposes with future notebooks:

import mlflow
import mlflow.sklearn
import shutil

from pyspark.sql.types import *
from pyspark.sql.functions import pandas_udf, PandasUDFType

import pandas as pd

import logging
logging.getLogger('py4j').setLevel(logging.ERROR)

from fbprophet import Prophet

# structure of the dataset returned by the function
result_schema =StructType([
  StructField('station_id',IntegerType()),
  StructField('ds',TimestampType()),
  StructField('y', FloatType()),
  StructField('yhat', FloatType()),
  StructField('yhat_lower', FloatType()),
  StructField('yhat_upper', FloatType()),
  StructField('trend',FloatType()),
  StructField('trend_lower', FloatType()),
  StructField('trend_upper', FloatType()),
  StructField('multiplicative_terms', FloatType()),
  StructField('multiplicative_terms_lower', FloatType()),
  StructField('multiplicative_terms_upper', FloatType()),
  StructField('daily', FloatType()),
  StructField('daily_lower', FloatType()),
  StructField('daily_upper', FloatType()),
  StructField('weekly', FloatType()),
  StructField('weekly_lower', FloatType()),
  StructField('weekly_upper', FloatType()),
  StructField('yearly', FloatType()),
  StructField('yearly_lower', FloatType()),
  StructField('yearly_upper', FloatType()),
  StructField('additive_terms', FloatType()),
  StructField('additive_terms_lower', FloatType()),
  StructField('additive_terms_upper', FloatType()),
  StructField('holidays', FloatType()),
  StructField('holidays_lower', FloatType()), 
  StructField('holidays_upper', FloatType())
  ])

# forecast function
@pandas_udf( result_schema, PandasUDFType.GROUPED_MAP )
def get_forecast(keys, group_pd):
  
  # DATA PREP
  # ---------------------------------
  # identify station id and hours to forecast
  station_id = keys[0]
  hours_to_forecast=keys[1]
  
  # extract valid historical data
  history_pd = group_pd[group_pd['is_historical']==1].dropna()
  
  # acquire holidays
  holidays_pd=holidays_broadcast.value
  # ---------------------------------  
  
  # TRAIN MODEL
  # --------------------------------- 
  # configure model
  model = Prophet(
      interval_width=0.80,
      growth='linear',
      daily_seasonality=True,
      weekly_seasonality=True,
      yearly_seasonality=True,
      holidays=holidays_pd
      )
      
  # train model
  model.fit( history_pd )

  # save models for potential later use
  model_path = '/dbfs/mnt/citibike/timeseries/{0}'.format(station_id)
  shutil.rmtree(model_path, ignore_errors=True)
  mlflow.sklearn.save_model( model, model_path)
  # ---------------------------------
  
  # FORECAST
  # ---------------------------------  
  # make forecast dataframe
  future_pd = model.make_future_dataframe(
    periods=hours_to_forecast, 
    freq='H'
   )
  
  # retrieve forecast
  forecast_pd = model.predict(future_pd)
  # ---------------------------------
  
  # PREPARE RESULTS
  # ---------------------------------
  # merge forecast with history
  results_pd = forecast_pd.merge(
    history_pd[['ds','y']], 
    how='left', 
    on='ds',
    sort=True,
    suffixes=('_l','_r')
   )
 
  # assign station to results
  results_pd['station_id']=station_id
  # ---------------------------------
  
  return results_pd[
      ['station_id', 'ds', 
       'y', 'yhat', 'yhat_lower', 'yhat_upper',
       'trend', 'trend_lower', 'trend_upper', 
       'multiplicative_terms', 'multiplicative_terms_lower', 'multiplicative_terms_upper', 
       'daily', 'daily_lower', 'daily_upper',
       'weekly', 'weekly_lower', 'weekly_upper', 
       'yearly', 'yearly_lower', 'yearly_upper', 
       'additive_terms', 'additive_terms_lower', 'additive_terms_upper',
       'holidays', 'holidays_lower', 'holidays_upper']
        ]
ERROR:fbprophet:Importing plotly failed. Interactive plots will not work. /databricks/spark/python/pyspark/sql/types.py:1624: DeprecationWarning: Using or importing the ABCs from 'collections' instead of from 'collections.abc' is deprecated, and in 3.8 it will stop working arrow_type = pa.struct(fields)

In the function, we are using Prophet as our timeseries learner. This model type has the ability to accomodate holiday information. When dates are identified as holidays, the values on those dates are not used to train the more general timeseries model and a holiday-specific model is produced to deal with anomolous behavior which may be associated with those points in time.

The holiday dataset passed to Prophet is extracted from a variable called holidays_broadcast which has yet to be defined. We'll define that now as a pandas DataFrame deployed as a Spark broadcast variable which makes a replica of the DataFrame available on each worker node. Managing the variable this way will minimize the amount of data transfer required to send holiday information to our function:

# identify hours that should be treated as aligned with holidays
holidays_pd = spark.sql('''
    SELECT
      b.hour as ds,
      a.holiday as holiday
    FROM citibike.holidays a
    INNER JOIN citibike.periods b
      ON a.date=to_date(b.hour)
    ''').toPandas()

# replicate a copy of the holidays dataset to each node
holidays_broadcast = sc.broadcast(holidays_pd)

With everything in place, we can now define the historical dataset from which we will generate our forecasts.

Please note there are much easier ways to generate the dataset required for this analysis. We are writing our query in this manner in preperation for adding weather data to the dataset in our next notebook:

from pyspark.sql.functions import lit

# define number of hours to forecast
hours_to_forecast = 36

# assemble historical dataset for training
inputs = spark.sql('''
   SELECT
    a.station_id,
    a.hour as ds, 
    COALESCE(b.rentals,0) as y,
    a.is_historical  -- this field will be important when we bring in weather data
  FROM ( -- all rental hours by currently active stations
    SELECT
      y.station_id,
      x.hour,
      CASE WHEN x.hour <= y.end_date THEN 1 ELSE 0 END as is_historical
    FROM citibike.periods x
    INNER JOIN citibike.stations_most_active y
     ON x.hour BETWEEN y.start_date AND (y.end_date + INTERVAL {0} HOURS)
    ) a
  LEFT OUTER JOIN citibike.rentals b
    ON a.station_id=b.station_id AND a.hour=b.hour
  '''.format(hours_to_forecast)
  )

# forecast generation logic
forecast = (
  inputs
    .groupBy(inputs.station_id, lit(hours_to_forecast))
    .apply(get_forecast)
  )
forecast.createOrReplaceTempView('forecast_timeseries')

With everything in place, we will now trigger the execution of our logic and load the resulting forecasts to a table for long-term persistence:

%sql

DROP TABLE IF EXISTS citibike.forecast_timeseries;

CREATE TABLE citibike.forecast_timeseries 
USING DELTA
AS
  SELECT *
  FROM forecast_timeseries;
OK

With our training work completed, we can release our holidays DataFrame, currently pinned in the memory of our worker nodes:

holidays_broadcast.unpersist(blocking=True)

With model training and forecasting completed, let's examine the forecast generated for one of the more popular stations, Station 518 at E 39 St & 2 Ave:

# extract the station's forecast from our persisted dataset
forecast_pd = (
  spark
    .table('citibike.forecast_timeseries')
    .filter('station_id=518')
    ).toPandas()
# retrieve the model for this station 
model = mlflow.sklearn.load_model('/dbfs/mnt/citibike/timeseries/518')

Here, we can examine the model components:

trends_fig = model.plot_components(forecast_pd)
display(trends_fig)

And here we can visualize the forecast:

from datetime import datetime

# construct a visualization of the forecast
predict_fig = model.plot(forecast_pd, xlabel='hour', ylabel='rentals')

# adjust the x-axis to focus on a limited date range
xlim = predict_fig.axes[0].get_xlim()
new_xlim = (datetime.strptime('2020-01-15','%Y-%m-%d'), datetime.strptime('2020-02-03','%Y-%m-%d'))
predict_fig.axes[0].set_xlim(new_xlim)

# display the chart
display(predict_fig)

Of course, a better way to evaluate the accuracy of our models is through the calculation of evaluation metrics. We can do this per station or in aggregate across all stations with a little SQL:

%sql -- per station
SELECT
  e.station_id,
  e.error_sum/n as MAE,
  e.error_sum_abs/n as MAD,
  e.error_sum_sqr/n as MSE,
  POWER(e.error_sum_sqr/n, 0.5) as RMSE,
  e.error_sum_abs_prop_y/n as MAPE
FROM (
  SELECT -- error base values 
    x.station_id,
    COUNT(*) as n,
    SUM(x.yhat-x.y) as error_sum,
    SUM(ABS(x.yhat-x.y)) as error_sum_abs,
    SUM(POWER((x.yhat-x.y),2)) as error_sum_sqr,
    SUM(ABS((x.yhat-x.y)/x.y_corrected)) as error_sum_abs_prop_y,
    SUM(ABS((x.yhat-x.y)/x.yhat)) as error_sum_abs_prop_yhat,
    SUM(x.y) as sum_y,
    SUM(x.yhat) as sum_yhat
  FROM ( -- actuals vs. forecast
    SELECT
      a.station_id,
      a.ds as ds,
      CAST(COALESCE(a.y,0) as float) as y,
      CAST(COALESCE(a.y,1) as float) as y_corrected,
      a.yhat
    FROM citibike.forecast_timeseries a
    INNER JOIN citibike.stations b
      ON a.station_id = b.station_id AND
         a.ds <= b.end_date
     ) x
   GROUP BY x.station_id
  ) e
ORDER BY e.station_id
72-0.000119885611132684222.65433851594426314.829426802909023.85089947972016280.6458016098715942
1160.000031031870306077633.297175860157310520.3691409071179384.513218464368630.5485715092678346
1270.000068163758254454483.918241798154838329.8738891541268365.4657011585090190.7812522701510267
1280.000056250860022319413.755890643351206526.7630342917463545.1733001354789340.7821018344714777
151-0.000027233898534518074.24608396578486436.354771666355586.0294918248850440.821874206890073
161-0.000035160822708314012.931848799998239715.2075224178122983.89968234832175750.6313183410056851
167-0.000087994469409604953.12910798596990319.791730771505664.44878980976913050.667282897594496
168-0.0000445700329589994164.21202691057912136.811245576835086.067227173663030.7256295385013559
173-0.000196960371379366673.78047226641246728.3855895115069825.3278128262455860.7704396143716804
174-8.931353836156645e-72.605921240101957415.2529683577204663.9055048787218880.6684446951372169
1950.000117058522919171472.862097742241873617.502312809560044.1835765571529870.6427450366406864
2120.000116424028238015073.0151283822867817.6824053974193164.2050452313166990.6402957337803526
2230.0000371104529012053442.560450201363282412.0813343137948923.47582138692351260.5191453255303174
229-0.0000101660994629963573.819786917905173826.7306340263708725.170167698089770.7321264472304823
236-0.00012229665618090733.1486095620513319.679113824267334.4361147217207230.7230445401245561
237-0.00035880262850933413.13645006214484818.6108044408693554.3140241585866620.713463373633336
2510.0000423228358677880563.10006662599862817.976306270230874.2398474347823970.6922856033672304
252-0.0000300036209416353362.26712318500434369.472132061309533.07768290460689450.5769428723294143
2570.0000185356804797278752.405464862964007612.5318989385802523.54004222271151070.5905857996879993
2650.000024234783425491642.571764379055051712.8765510440452253.5883911498114620.6403235464404562
2680.00020138413480883282.50948969257662313.0548894955496493.6131550610995990.5926419602357431
280-0.000024772204548171572.07021698091076367.7824693852541622.78970775982972130.5184604946609234
2810.00014050705722674265.38264755777051269.961654688554268.3643083807661151.0288886600627078
2840.000273176025248298274.11004370947536129.941790438286065.4719092132715490.6853226810961941
2850.000136408133616177184.53509407432564538.394032407217436.1962918271509310.7868087696482655
2930.000182989680034678254.98608121628057552.7758644709286467.2646998885658480.8164464849772184
297-0.00013781900317501893.02041868875148216.5314659487917844.0658905480585410.6669420998826642
301-0.000157677601963773242.995227518680870621.5050114106239754.6373496105667920.7604976902251358
3030.000092254663135347192.449470242772642213.5010819025925173.67438183951974650.5591666937868031
3040.0000384840509403916343.384115953071999531.5580954814123985.6176592528750260.7014943959380963
305-0.0000162237422595194032.74490417900468315.1402770034929083.89105088677762060.6146596420352128
3070.000047021857660911042.82775754190270615.7859741482265153.97315669817168350.6932174537670708
3120.00008178754624880373.00267346260672717.039089617933354.12784321624905550.7461236903259593
315-0.000077599344085927623.331148133767674523.797461479475594.8782641871341480.639316197053307
3170.00021624525083844132.698936687763830615.611772285664763.9511735327197110.6865060438694917
319-0.000034064579531618992.61926716407731614.8704241579947343.85621889394193130.522028433264878
3250.000006094910279626812.385183718103673510.9229181498999973.3049838350436750.6226603829235001
326-0.0000138437675680013472.80081222811359414.7481681372689873.8403343783151210.6769176739806178
327-0.0000460419268022677664.75641710805875345.707809843912496.760755123794420.8741973176790019
328-0.000302089865668454252.3847159731209711.3411344027051283.36766007825984830.5725844239198413
3340.0000117663337754568772.954638717106173716.8065512375505224.0995793976395340.6515788503483378
3350.000039613077080752392.46273907740427711.345636544355313.36832844959563140.5755043709833728
336-0.0000156583653924990932.672175520969830612.8486303216042283.58449861509308270.6458094658101508
3460.000050572644751036412.39286896775508410.7288198155784673.27548772178716430.6017224749450666
3470.000153724979866060063.695227998090788329.0703414419410935.3916918904867970.7200529834097599
355-0.00006127078531078312.09761157690830928.1757657441362372.8593295969748290.5435884782771557
358-0.000122401099715452734.92855820319978452.3508286183031767.23538724729389850.9287859215023331
3590.0000589978906608304666.785046747496446147.311485338546912.1371942943394831.2550283666861026
3610.000086575962631100852.6287771901837913.430280284820933.66473468136793330.658928590255346
363-0.000039764880594170442.98070120313113317.759690162533534.2142247403921790.6583831527174238
3680.000195961788775558684.18668529240209231.6754367920017585.62809353085054550.797127996877063
3770.000279571749472450573.129244122026253528.4333850945471025.3322964184811690.7323541374989254
3790.000029873061919354434.95255752750333448.854400922813296.9895923287995331.0215027942299555
3800.000059429201744039323.26007387862668519.252616369824624.3877803465789650.7221493481865116
383-0.000095999811875656432.685993803898890413.2453217607793333.63941228233066960.6248738462024097
3870.000022414615803185764.314963448933843540.1987100666305546.34024526864935640.8754014958276868
3880.0000083187152157614993.308650818449672624.7896474589111234.9789203105604260.7250711243252617
394-0.0000069132645181440112.560071579637862414.5745061308773463.8176571520865180.6659583438359248
401-0.000186911428484219682.904314904695785715.9761980486589653.99702364874902430.6858930099004095
402-0.00048906402350370556.480653252184288123.7017272897153511.1221278220363631.0262462059135715
403-0.000057368875344725262.494420672029349412.097824314649313.4781926793450230.5441449861371275
4050.000099041393458145573.608648420829232424.697745380428514.9696826237123540.7483801894092346
4100.0000031275276508200952.775640388650968415.4413360840215083.92954654941527660.6908139140968061
4120.0000139609602918430872.343631764097658310.4388912168072323.23092729983316570.5964056926345799
417-0.000083689221398409883.78032866128571529.065750291073415.3912661120624920.7992525336225161
4260.000064331541594965866.7753168881575396.481321056649669.8224905729987691.1225774541237745
4320.000204328317799874083.925439283086932535.960479224818475.9967056976992350.8698285602164758
434-0.000031408962268851382.749111145394382415.2235434224941873.9017359498682360.5108097921354059
435-0.000219923606915642325.11136992301805250.253024463696627.08893676538990.7975641096450921
438-0.0000066624610086901193.01333682751273321.1697877020282344.60106375765737850.7361036135330217
4390.00018696832556613032.8523448414231815.6487593399216463.955851278792170.629499494321835
442-0.000071359233551819553.895702206065283529.647588852962745.4449599496197160.7756406555474448
4450.000081092290598177823.98368934417981439.499341305292016.2848501418325020.8767402850314611
4460.000055337917486117893.424767360092983522.8725029612664174.78252056569194650.6707719932208213
4470.000068208526043437022.703469313017026613.8860919299536193.7264046921870440.6615802715155357
4480.00016451369109056962.70048976439095214.0585353627701973.74947134443913250.6165361728032513
450-0.00015318024893633453.188114251190373519.3819179462536484.4024899711701380.7147208864601213
4530.0000133346710175317043.18703703669885319.72512558304484.4412977363654420.7073165182597149
458-0.00000377822583314433073.481151926980373526.231423242412625.1216621562157560.7689060512273471
459-0.000238756374880200225.19797867120401151.9415038441256567.207045430974170.7835979144207996
4610.000104141858922734893.33532411072914723.456014316576584.8431409556791320.7337120519591142
462-0.000127270779096905423.730859641220039426.1660082912651265.1152720642469380.6978342704640198
4650.000169508792142791544.55210005743577943.3663337618352646.5853119714889190.9048975316829947
4660.000123115697503718223.337046669134625421.6540675294555634.6533931200206550.7171597452168547
468-0.000120654995424950263.60024962956504727.093088666299485.2051021763553770.7523125374365757
4690.0000442340967072433662.803851815095320420.3024809007708064.5058274379708340.6569003285939183
470-0.000074196209127778932.697266821559811714.1198444889438553.757638153008330.646415696717036
472-0.000206192261292865343.753155912936447328.6863198089392065.35596114707147650.7964280932577643
474-0.000145396439511755523.215951260472232221.248742989939594.6096358847461690.6628149078445584
476-0.000177383022331899822.783048490499502315.2971671333185383.91115930809760530.6614649362256142
477-0.000306650241762174575.64099368480908171.638684513280478.4639638771252131.1073783217026145
4780.00018768153182756432.821230553413360415.564319590300453.94516407647393660.7420466417320926
4790.000148687361740834333.217387504845976619.8095335125311574.4507902121456090.7376454580806308
4800.000076155767704121892.377065437456520311.0673148034103623.32675740074480950.5765928190810328
4820.00018778428940783653.185027588759161319.4005087407353444.40460086054745050.7002401214351399
483-0.000072487231317168453.12354786675157918.16768456716554.2623566916865960.6319804998926188
4840.000108104589234526743.27378620663795926.384643339566335.1365984210921460.6221645279353542
4850.0000503032905645143462.437112780887047411.8775331127731973.44637971105523940.5582616073622043
486-0.0000110136721787679733.519749185068516523.9483858262635184.8937088007219560.7626324781964144
487-0.0000085104756308890293.106707233307022722.424007632971094.7353994164136880.7560880528226078
%sql -- all stations

SELECT
  e.error_sum/n as MAE,
  e.error_sum_abs/n as MAD,
  e.error_sum_sqr/n as MSE,
  POWER(e.error_sum_sqr/n, 0.5) as RMSE,
  e.error_sum_abs_prop_y/n as MAPE
FROM (
  SELECT -- error base values 
    COUNT(*) as n,
    SUM(x.yhat-x.y) as error_sum,
    SUM(ABS(x.yhat-x.y)) as error_sum_abs,
    SUM(POWER((x.yhat-x.y),2)) as error_sum_sqr,
    SUM(ABS((x.yhat-x.y)/x.y_corrected)) as error_sum_abs_prop_y,
    SUM(ABS((x.yhat-x.y)/x.yhat)) as error_sum_abs_prop_yhat,
    SUM(x.y) as sum_y,
    SUM(x.yhat) as sum_yhat
  FROM ( -- actuals vs. forecast
    SELECT
      a.ds as ds,
      CAST(COALESCE(a.y,0) as float) as y,
      CAST(COALESCE(a.y,1) as float) as y_corrected,
      a.yhat
    FROM citibike.forecast_timeseries a
    INNER JOIN citibike.stations b
      ON a.station_id = b.station_id AND
         a.ds <= b.end_date
     ) x
  ) e
-0.0000054200487372667373.502618154425708229.6146560765872025.4419349570338680.7319380501505133