Reproducible Anything: Machine Learning Meets Lakehouse(Python)

Loading...

Reproducible Anything: Machine Learning Meets Lakehouse

You can run this notebook using the following cluster config:

  • Cluster mode: single node
  • Databricks Runtime: 8.1 ML
import mlflow
import mlflow.sklearn
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestRegressor
import pyspark.sql.functions as F
from delta.tables import *
import numpy as np
## Reading in data
# Load the white-wine quality dataset (semicolon-delimited CSV with a header)
# and normalize column names by swapping spaces for underscores so they are
# valid Delta/SQL identifiers.
path = '/databricks-datasets/wine-quality/winequality-white.csv'
csv_reader = (spark.read
              .option('header', 'true')
              .option('inferSchema', 'true')
              .option('sep', ';'))
wine_df = csv_reader.csv(path)
renamed_columns = [F.col(c).alias(c.replace(' ', '_')) for c in wine_df.columns]
wine_df_clean = wine_df.select(renamed_columns)
display(wine_df_clean)
 
fixed_acidity
volatile_acidity
citric_acid
residual_sugar
chlorides
free_sulfur_dioxide
total_sulfur_dioxide
density
pH
sulphates
alcohol
quality
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
7
0.27
0.36
20.7
0.045
45
170
1.001
3
0.45
8.8
6
6.3
0.3
0.34
1.6
0.049
14
132
0.994
3.3
0.49
9.5
6
8.1
0.28
0.4
6.9
0.05
30
97
0.9951
3.26
0.44
10.1
6
7.2
0.23
0.32
8.5
0.058
47
186
0.9956
3.19
0.4
9.9
6
7.2
0.23
0.32
8.5
0.058
47
186
0.9956
3.19
0.4
9.9
6
8.1
0.28
0.4
6.9
0.05
30
97
0.9951
3.26
0.44
10.1
6
6.2
0.32
0.16
7
0.045
30
136
0.9949
3.18
0.47
9.6
6
7
0.27
0.36
20.7
0.045
45
170
1.001
3
0.45
8.8
6
6.3
0.3
0.34
1.6
0.049
14
132
0.994
3.3
0.49
9.5
6
8.1
0.22
0.43
1.5
0.044
28
129
0.9938
3.22
0.45
11
6
8.1
0.27
0.41
1.45
0.033
11
63
0.9908
2.99
0.56
12
5
8.6
0.23
0.4
4.2
0.035
17
109
0.9947
3.14
0.53
9.7
5
7.9
0.18
0.37
1.2
0.04
16
75
0.992
3.18
0.63
10.8
5
6.6
0.16
0.4
1.5
0.044
48
143
0.9912
3.54
0.52
12.4
7
8.3
0.42
0.62
19.25
0.04
41
172
1.0002
2.98
0.67
9.7
5
6.6
0.17
0.38
1.5
0.032
28
112
0.9914
3.25
0.55
11.4
7
6.3
0.48
0.04
1.1
0.046
30
99
0.9928
3.24
0.36
9.6
6

Showing the first 1000 rows.

%fs mkdirs /tmp/reproducible_ml_blog
res0: Boolean = true
## Write it out as a delta table
# Persist the cleaned dataset to DBFS as the initial Delta table version.
write_path = 'dbfs:/tmp/reproducible_ml_blog/wine_quality_white.delta'
(wine_df_clean.write
 .format('delta')
 .mode('overwrite')
 .save(write_path))
## Insert a new row
# Append one extra observation. Spark's union matches columns by position,
# so these values must follow the cleaned dataset's column order.
extra_values = [[7, 0.27, 0.36, 1.6, 0.045, 45, 170, 1.001, 3, 0.45, 8.8, 6]]
new_row = spark.createDataFrame(extra_values)
wine_df_extra_row = wine_df_clean.union(new_row)
display(wine_df_extra_row)
 
fixed_acidity
volatile_acidity
citric_acid
residual_sugar
chlorides
free_sulfur_dioxide
total_sulfur_dioxide
density
pH
sulphates
alcohol
quality
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
7
0.27
0.36
20.7
0.045
45
170
1.001
3
0.45
8.8
6
6.3
0.3
0.34
1.6
0.049
14
132
0.994
3.3
0.49
9.5
6
8.1
0.28
0.4
6.9
0.05
30
97
0.9951
3.26
0.44
10.1
6
7.2
0.23
0.32
8.5
0.058
47
186
0.9956
3.19
0.4
9.9
6
7.2
0.23
0.32
8.5
0.058
47
186
0.9956
3.19
0.4
9.9
6
8.1
0.28
0.4
6.9
0.05
30
97
0.9951
3.26
0.44
10.1
6
6.2
0.32
0.16
7
0.045
30
136
0.9949
3.18
0.47
9.6
6
7
0.27
0.36
20.7
0.045
45
170
1.001
3
0.45
8.8
6
6.3
0.3
0.34
1.6
0.049
14
132
0.994
3.3
0.49
9.5
6
8.1
0.22
0.43
1.5
0.044
28
129
0.9938
3.22
0.45
11
6
8.1
0.27
0.41
1.45
0.033
11
63
0.9908
2.99
0.56
12
5
8.6
0.23
0.4
4.2
0.035
17
109
0.9947
3.14
0.53
9.7
5
7.9
0.18
0.37
1.2
0.04
16
75
0.992
3.18
0.63
10.8
5
6.6
0.16
0.4
1.5
0.044
48
143
0.9912
3.54
0.52
12.4
7
8.3
0.42
0.62
19.25
0.04
41
172
1.0002
2.98
0.67
9.7
5
6.6
0.17
0.38
1.5
0.032
28
112
0.9914
3.25
0.55
11.4
7
6.3
0.48
0.04
1.1
0.046
30
99
0.9928
3.24
0.36
9.6
6

Showing the first 1000 rows.

## Write it out to delta location
# Overwrite the Delta table with the appended data, producing a new table
# version on top of the original write.
(wine_df_extra_row.write
 .format('delta')
 .mode('overwrite')
 .option('overwriteSchema', 'true')
 .save(write_path))
from delta.tables import *

# Inspect the table's transaction log so we can choose which version to
# train the model against.
deltaTable = DeltaTable.forPath(spark, write_path)
fullHistoryDF = deltaTable.history()  # full version history of the table

display(fullHistoryDF)
 
version
timestamp
userId
userName
operation
operationParameters
job
notebook
clusterId
readVersion
isolationLevel
isBlindAppend
operationMetrics
userMetadata
1
2
1
2021-04-21T22:02:54.000+0000
3363424249366131
marygrace.moesta@databricks.com
WRITE
{"mode": "Overwrite", "partitionBy": "[]"}
null
{"notebookId": "3874049492059911"}
0421-213622-doc453
0
WriteSerializable
false
{"numFiles": "2", "numOutputBytes": "74682", "numOutputRows": "4899"}
null
0
2021-04-21T22:02:50.000+0000
3363424249366131
marygrace.moesta@databricks.com
WRITE
{"mode": "Overwrite", "partitionBy": "[]"}
null
{"notebookId": "3874049492059911"}
0421-213622-doc453
null
WriteSerializable
false
{"numFiles": "1", "numOutputBytes": "71205", "numOutputRows": "4898"}
null

Showing all 2 rows.

## Specifying data version to use for model training
# Delta time travel: pin the training data to table version 1 (the snapshot
# that includes the appended row) so the experiment is reproducible, then
# bring it into pandas for sklearn.
version = 1
versioned_reader = spark.read.format('delta').option('versionAsOf', version)
wine_df_delta = versioned_reader.load(write_path).toPandas()
display(wine_df_delta)
 
fixed_acidity
volatile_acidity
citric_acid
residual_sugar
chlorides
free_sulfur_dioxide
total_sulfur_dioxide
density
pH
sulphates
alcohol
quality
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
7
0.27
0.36
20.7
0.045
45
170
1.001
3
0.45
8.8
6
6.3
0.3
0.34
1.6
0.049
14
132
0.994
3.3
0.49
9.5
6
8.1
0.28
0.4
6.9
0.05
30
97
0.9951
3.26
0.44
10.1
6
7.2
0.23
0.32
8.5
0.058
47
186
0.9956
3.19
0.4
9.9
6
7.2
0.23
0.32
8.5
0.058
47
186
0.9956
3.19
0.4
9.9
6
8.1
0.28
0.4
6.9
0.05
30
97
0.9951
3.26
0.44
10.1
6
6.2
0.32
0.16
7
0.045
30
136
0.9949
3.18
0.47
9.6
6
7
0.27
0.36
20.7
0.045
45
170
1.001
3
0.45
8.8
6
6.3
0.3
0.34
1.6
0.049
14
132
0.994
3.3
0.49
9.5
6
8.1
0.22
0.43
1.5
0.044
28
129
0.9938
3.22
0.45
11
6
8.1
0.27
0.41
1.45
0.033
11
63
0.9908
2.99
0.56
12
5
8.6
0.23
0.4
4.2
0.035
17
109
0.9947
3.14
0.53
9.7
5
7.9
0.18
0.37
1.2
0.04
16
75
0.992
3.18
0.63
10.8
5
6.6
0.16
0.4
1.5
0.044
48
143
0.9912
3.54
0.52
12.4
7
8.3
0.42
0.62
19.25
0.04
41
172
1.0002
2.98
0.67
9.7
5
6.6
0.17
0.38
1.5
0.032
28
112
0.9914
3.25
0.55
11.4
7
6.3
0.48
0.04
1.1
0.046
30
99
0.9928
3.24
0.36
9.6
6

Showing the first 1000 rows.

## Split the data into training and test sets. (0.75, 0.25) split.
# Fixed random_state keeps the split reproducible across runs.
seed = 1111
train, test = train_test_split(wine_df_delta, train_size=0.75, random_state=seed)

## The target column is "quality", an integer score in the range [3, 9].
target = 'quality'
X_train = train.drop(columns=[target])
X_test = test.drop(columns=[target])
y_train = train[[target]]
y_test = test[[target]]