Forecasting using Time Series Analysis

In this notebook, we will develop an hourly forecast for each station using a scale-out pattern described here. To do this, we will use Facebook Prophet, a popular time series forecasting library. We will also make use of mlflow, a popular framework for managing machine learning models, to persist the models we generate:

# install required libraries
dbutils.library.installPyPI('fbprophet', version='0.5')  # find the latest version of fbprophet here: https://pypi.org/project/fbprophet/
dbutils.library.installPyPI('holidays', version='0.9.12')  # pinned in response to this issue with fbprophet 0.5: https://github.com/facebook/prophet/issues/1293
dbutils.library.installPyPI('mlflow')

dbutils.library.restartPython()
import shutil
import logging
import pandas as pd

import mlflow
import mlflow.sklearn

from pyspark.sql.types import *
from pyspark.sql.functions import pandas_udf, PandasUDFType

# suppress noisy py4j logging
logging.getLogger('py4j').setLevel(logging.ERROR)

from fbprophet import Prophet

# structure of the dataset returned by the function
result_schema = StructType([
  StructField('station_id', IntegerType()),
  StructField('ds', TimestampType()),
  StructField('y', FloatType()),
  StructField('yhat', FloatType()),
  StructField('yhat_lower', FloatType()),
  StructField('yhat_upper', FloatType()),
  StructField('trend', FloatType()),
  StructField('trend_lower', FloatType()),
  StructField('trend_upper', FloatType()),
  StructField('multiplicative_terms', FloatType()),
  StructField('multiplicative_terms_lower', FloatType()),
  StructField('multiplicative_terms_upper', FloatType()),
  StructField('daily', FloatType()),
  StructField('daily_lower', FloatType()),
  StructField('daily_upper', FloatType()),
  StructField('weekly', FloatType()),
  StructField('weekly_lower', FloatType()),
  StructField('weekly_upper', FloatType()),
  StructField('yearly', FloatType()),
  StructField('yearly_lower', FloatType()),
  StructField('yearly_upper', FloatType()),
  StructField('additive_terms', FloatType()),
  StructField('additive_terms_lower', FloatType()),
  StructField('additive_terms_upper', FloatType()),
  StructField('holidays', FloatType()),
  StructField('holidays_lower', FloatType()),
  StructField('holidays_upper', FloatType())
  ])

# forecast function
@pandas_udf(result_schema, PandasUDFType.GROUPED_MAP)
def get_forecast(keys, group_pd):

  # DATA PREP
  # ---------------------------------
  # identify station id and hours to forecast
  station_id = keys[0]
  hours_to_forecast = keys[1]

  # extract valid historical data
  history_pd = group_pd[group_pd['is_historical']==1].dropna()

  # acquire the broadcast holidays dataset
  holidays_pd = holidays_broadcast.value
  # ---------------------------------

  # TRAIN MODEL
  # ---------------------------------
  # configure model
  model = Prophet(
    interval_width=0.80,
    growth='linear',
    daily_seasonality=True,
    weekly_seasonality=True,
    yearly_seasonality=True,
    holidays=holidays_pd
    )

  # train model
  model.fit(history_pd)

  # save model for potential later use
  model_path = '/dbfs/mnt/citibike/timeseries/{0}'.format(station_id)
  shutil.rmtree(model_path, ignore_errors=True)
  mlflow.sklearn.save_model(model, model_path)
  # ---------------------------------

  # FORECAST
  # ---------------------------------
  # make forecast dataframe
  future_pd = model.make_future_dataframe(
    periods=hours_to_forecast,
    freq='H'
    )

  # retrieve forecast
  forecast_pd = model.predict(future_pd)
  # ---------------------------------

  # PREPARE RESULTS
  # ---------------------------------
  # merge forecast with history
  results_pd = forecast_pd.merge(
    history_pd[['ds','y']],
    how='left',
    on='ds',
    sort=True,
    suffixes=('_l','_r')
    )

  # assign station to results
  results_pd['station_id'] = station_id
  # ---------------------------------

  return results_pd[
    ['station_id', 'ds', 'y', 'yhat', 'yhat_lower', 'yhat_upper',
     'trend', 'trend_lower', 'trend_upper',
     'multiplicative_terms', 'multiplicative_terms_lower', 'multiplicative_terms_upper',
     'daily', 'daily_lower', 'daily_upper',
     'weekly', 'weekly_lower', 'weekly_upper',
     'yearly', 'yearly_lower', 'yearly_upper',
     'additive_terms', 'additive_terms_lower', 'additive_terms_upper',
     'holidays', 'holidays_lower', 'holidays_upper']
    ]
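Before scaling this out, it may help to recall the mechanics of the GROUPED_MAP pattern referenced in the introduction: Spark delivers each group to the function as a pandas DataFrame, and the pandas DataFrame the function returns must match the declared result schema. A minimal toy sketch of that contract follows (the names toy_schema, group_mean, and some_df are illustrative only, not part of this pipeline):

# toy GROUPED_MAP example: one summary row per group
toy_schema = StructType([
  StructField('id', IntegerType()),
  StructField('y_mean', FloatType())
  ])

@pandas_udf(toy_schema, PandasUDFType.GROUPED_MAP)
def group_mean(pdf):
  # pdf contains every row belonging to a single group
  return pd.DataFrame({'id': [int(pdf['id'].iloc[0])], 'y_mean': [float(pdf['y'].mean())]})

# some_df.groupBy('id').apply(group_mean) would return one row per id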
# identify hours that should be treated as aligned with holidays
holidays_pd = spark.sql('''
  SELECT
    b.hour as ds,
    a.holiday as holiday
  FROM citibike.holidays a
  INNER JOIN citibike.periods b
    ON a.date=to_date(b.hour)
  ''').toPandas()

# replicate a copy of the holidays dataset to each worker node
holidays_broadcast = sc.broadcast(holidays_pd)
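Prophet expects its holidays argument to be a pandas DataFrame with a ds datetime column and a holiday name column, which is exactly the shape the query above produces. A quick, purely illustrative sanity check:

# confirm the broadcast dataset has the ds/holiday shape Prophet requires
holidays_broadcast.value.head()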
from pyspark.sql.functions import lit

# define number of hours to forecast
hours_to_forecast = 36

# assemble historical dataset for training
inputs = spark.sql('''
  SELECT
    a.station_id,
    a.hour as ds,
    COALESCE(b.rentals,0) as y,
    a.is_historical  -- this field will be important when we bring in weather data
  FROM ( -- all rental hours by currently active stations
    SELECT
      y.station_id,
      x.hour,
      CASE WHEN x.hour <= y.end_date THEN 1 ELSE 0 END as is_historical
    FROM citibike.periods x
    INNER JOIN citibike.stations_most_active y
      ON x.hour BETWEEN y.start_date AND (y.end_date + INTERVAL {0} HOURS)
    ) a
  LEFT OUTER JOIN citibike.rentals b
    ON a.station_id=b.station_id AND a.hour=b.hour
  '''.format(hours_to_forecast)
  )

# forecast generation logic: group by station, passing the horizon as a grouping key
forecast = (
  inputs
    .groupBy(inputs.station_id, lit(hours_to_forecast))
    .apply(get_forecast)
  )

forecast.createOrReplaceTempView('forecast_timeseries')
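The evaluation queries below read from a citibike.forecast_timeseries table rather than from the temporary view, so the forecast output must be persisted first. A minimal sketch of one way to do that (the Delta format and overwrite mode are assumptions, not taken from this notebook):

# materialize the forecast as a table for downstream evaluation; this action
# also triggers the distributed, per-station model training defined above
forecast.write.format('delta').mode('overwrite').saveAsTable('citibike.forecast_timeseries')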
from datetime import datetime

# retrieve the saved model and forecast for a single station to visualize
# (station 518 is a hypothetical id; any forecasted station works here)
model = mlflow.sklearn.load_model('/dbfs/mnt/citibike/timeseries/518')
forecast_pd = model.predict(
  model.make_future_dataframe(periods=hours_to_forecast, freq='H')
  )

# construct a visualization of the forecast
predict_fig = model.plot(forecast_pd, xlabel='hour', ylabel='rentals')

# adjust the x-axis to focus on a limited date range
xlim = predict_fig.axes[0].get_xlim()
new_xlim = (datetime.strptime('2020-01-15','%Y-%m-%d'), datetime.strptime('2020-02-03','%Y-%m-%d'))
predict_fig.axes[0].set_xlim(new_xlim)

# display the chart
display(predict_fig)
%sql -- per station

SELECT
  e.station_id,
  e.error_sum/n as ME,  -- mean (signed) forecast error
  e.error_sum_abs/n as MAD,
  e.error_sum_sqr/n as MSE,
  POWER(e.error_sum_sqr/n, 0.5) as RMSE,
  e.error_sum_abs_prop_y/n as MAPE
FROM ( -- error base values
  SELECT
    x.station_id,
    COUNT(*) as n,
    SUM(x.yhat-x.y) as error_sum,
    SUM(ABS(x.yhat-x.y)) as error_sum_abs,
    SUM(POWER((x.yhat-x.y),2)) as error_sum_sqr,
    SUM(ABS((x.yhat-x.y)/x.y_corrected)) as error_sum_abs_prop_y,
    SUM(ABS((x.yhat-x.y)/x.yhat)) as error_sum_abs_prop_yhat,
    SUM(x.y) as sum_y,
    SUM(x.yhat) as sum_yhat
  FROM ( -- actuals vs. forecast over the historical period only
    SELECT
      a.station_id,
      a.ds as ds,
      CAST(COALESCE(a.y,0) as float) as y,
      CAST(COALESCE(a.y,1) as float) as y_corrected,  -- used as the MAPE denominator
      a.yhat
    FROM citibike.forecast_timeseries a
    INNER JOIN citibike.stations b
      ON a.station_id = b.station_id AND a.ds <= b.end_date
    ) x
  GROUP BY x.station_id
  ) e
ORDER BY e.station_id
%sql -- all stations

SELECT
  e.error_sum/n as ME,  -- mean (signed) forecast error
  e.error_sum_abs/n as MAD,
  e.error_sum_sqr/n as MSE,
  POWER(e.error_sum_sqr/n, 0.5) as RMSE,
  e.error_sum_abs_prop_y/n as MAPE
FROM ( -- error base values
  SELECT
    COUNT(*) as n,
    SUM(x.yhat-x.y) as error_sum,
    SUM(ABS(x.yhat-x.y)) as error_sum_abs,
    SUM(POWER((x.yhat-x.y),2)) as error_sum_sqr,
    SUM(ABS((x.yhat-x.y)/x.y_corrected)) as error_sum_abs_prop_y,
    SUM(ABS((x.yhat-x.y)/x.yhat)) as error_sum_abs_prop_yhat,
    SUM(x.y) as sum_y,
    SUM(x.yhat) as sum_yhat
  FROM ( -- actuals vs. forecast over the historical period only
    SELECT
      a.ds as ds,
      CAST(COALESCE(a.y,0) as float) as y,
      CAST(COALESCE(a.y,1) as float) as y_corrected,  -- used as the MAPE denominator
      a.yhat
    FROM citibike.forecast_timeseries a
    INNER JOIN citibike.stations b
      ON a.station_id = b.station_id AND a.ds <= b.end_date
    ) x
  ) e
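For reference, with \(y_t\) the observed rentals, \(\hat{y}_t\) the forecast, and \(n\) the number of evaluated hours, the measures computed above are:

\[
\mathrm{ME} = \frac{1}{n}\sum_t(\hat{y}_t - y_t), \qquad
\mathrm{MAD} = \frac{1}{n}\sum_t\lvert\hat{y}_t - y_t\rvert, \qquad
\mathrm{MSE} = \frac{1}{n}\sum_t(\hat{y}_t - y_t)^2
\]

\[
\mathrm{RMSE} = \sqrt{\mathrm{MSE}}, \qquad
\mathrm{MAPE} = \frac{1}{n}\sum_t\left\lvert\frac{\hat{y}_t - y_t}{y_t}\right\rvert
\]

Note that the MAPE calculation divides by y_corrected rather than y so that the ratio remains defined when an actual value is absent.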