# Whether to train on GPU.
use_gpu = True

# Derive per-user scratch locations from the current workspace user.
current_user_row = spark.sql("select current_user()").first()
username = current_user_row['current_user()']

tmp_dir = f"/tmp/{username}"
print(f"Using tmp_dir: {tmp_dir}")

tmp_experiment_dir = f"/Users/{username}/SDV"
print(f"Using tmp_experiment_dir: {tmp_experiment_dir}")
Using tmp_dir: /tmp/sean.owen@databricks.com
Using tmp_experiment_dir: /Users/sean.owen@databricks.com/SDV
# Stick to reliable data in 2009-2016 years
taxi_source_df = spark.read.format("delta").load("/databricks-datasets/nyctaxi/tables/nyctaxi_yellow")
taxi_sample_df = (
    taxi_source_df
    .filter("YEAR(pickup_datetime) >= 2009 AND YEAR(pickup_datetime) <= 2016")
    .drop("total_amount")
    .sample(0.0005, seed=42)
)
# 90/10 train/test split with a fixed seed
train_df, test_df = taxi_sample_df.randomSplit([0.9, 0.1], seed=42)

# Bring the training split to pandas; random shuffle for good measure
train_pd = train_df.toPandas()
table_nyctaxi = train_pd.sample(frac=1, random_state=42, ignore_index=True)
table_nyctaxi.head(5)
from sdv.metadata.dataset import Metadata
from sdv.lite import TabularPreset

# Register the pandas table so SDV can derive table metadata from it
metadata = Metadata()
metadata.add_table(name="nyctaxi_yellow", data=table_nyctaxi)
table_meta = metadata.get_table_meta("nyctaxi_yellow")

# Fit the 'FAST_ML' preset as a quick baseline synthesizer
model = TabularPreset(name='FAST_ML', metadata=table_meta)
model.fit(table_nyctaxi)

# Show a few synthetic rows
model.sample(num_rows=5, randomize_samples=False)
from sdmetrics.reports.single_table import QualityReport

# Compare 10,000 synthetic rows against the real training table
synthetic_sample = model.sample(num_rows=10000, randomize_samples=False)
table_meta = metadata.get_table_meta("nyctaxi_yellow")

report = QualityReport()
report.generate(table_nyctaxi, synthetic_sample, table_meta)
report.get_visualization("Column Pair Trends")
Creating report: 100%|██████████| 4/4 [00:12<00:00, 3.11s/it]
Overall Quality Score: 75.18%
Properties:
Column Shapes: 66.88%
Column Pair Trends: 83.47%
from pyspark.sql.functions import col, pandas_udf
import numpy as np

df = (
    spark.read.format("delta").load("/databricks-datasets/nyctaxi/tables/nyctaxi_yellow")
    .filter("YEAR(pickup_datetime) >= 2009 AND YEAR(pickup_datetime) <= 2016")
    .drop("total_amount")
)

# Collect the row-level sanity filters as SQL predicates, then apply them in order
row_filters = [f"{c} >= 0" for c in ["fare_amount", "extra", "mta_tax", "tip_amount", "tolls_amount"]]
row_filters += [
    "passenger_count > 0",
    "trip_distance > 0 AND trip_distance < 100",
    "dropoff_datetime > pickup_datetime",
    # Dropoff must be less than 12 hours (in seconds) after pickup
    "CAST(dropoff_datetime AS long) < CAST(pickup_datetime AS long) + 12 * 60 * 60",
]
row_filters += [f"{c} > -76 AND {c} < -72" for c in ["pickup_longitude", "dropoff_longitude"]]
row_filters += [f"{c} > 39 AND {c} < 43" for c in ["pickup_latitude", "dropoff_latitude"]]
for predicate in row_filters:
    df = df.filter(predicate)


# Define this as a standalone function for reuse later
def haversine_dist_miles(from_lat_deg, from_lon_deg, to_lat_deg, to_lon_deg):
    """Return the haversine (great-circle) distance in miles between two points.

    All four arguments are latitudes/longitudes in degrees. The computation
    uses only NumPy ufuncs, so scalars and array-likes are both accepted.
    """
    from_lat, from_lon = np.deg2rad(from_lat_deg), np.deg2rad(from_lon_deg)
    to_lat, to_lon = np.deg2rad(to_lat_deg), np.deg2rad(to_lon_deg)
    lat_term = np.square(np.sin((to_lat - from_lat) / 2))
    lon_term = np.square(np.sin((to_lon - from_lon) / 2))
    hav = lat_term + np.cos(to_lat) * np.cos(from_lat) * lon_term
    # 3958.8 is avg earth radius in miles
    return 3958.8 * 2 * np.arcsin(np.sqrt(hav))


@pandas_udf('double')
def haversine_dist_miles_udf(from_lat_deg, from_lon_deg, to_lat_deg, to_lon_deg):
    """Spark pandas UDF wrapper around haversine_dist_miles."""
    return haversine_dist_miles(from_lat_deg, from_lon_deg, to_lat_deg, to_lon_deg)


# Allow 90% of min theoretical distance to account for rounding, inaccuracy
min_plausible_distance = 0.9 * haversine_dist_miles_udf(
    "pickup_latitude", "pickup_longitude", "dropoff_latitude", "dropoff_longitude")
df = df.filter(col("trip_distance") >= min_plausible_distance)

cleaned_sample_df = df.sample(0.0005, seed=42)
train_df, test_df = cleaned_sample_df.randomSplit([0.9, 0.1], seed=42)
train_df.cache()

# Shuffle the pandas copy of the training split with a fixed seed
table_nyctaxi = train_df.toPandas().sample(frac=1, random_state=42, ignore_index=True)
# Rebuild metadata and refit the baseline preset on the cleaned training table
metadata = Metadata()
metadata.add_table(name="nyctaxi_yellow", data=table_nyctaxi)
table_meta = metadata.get_table_meta("nyctaxi_yellow")

model = TabularPreset(name='FAST_ML', metadata=table_meta)
model.fit(table_nyctaxi)

# Score 10,000 synthetic rows against the cleaned table
synthetic_sample = model.sample(num_rows=10000, randomize_samples=False)
report = QualityReport()
report.generate(table_nyctaxi, synthetic_sample, metadata.get_table_meta("nyctaxi_yellow"))
report.get_visualization("Column Pair Trends")
Creating report: 100%|██████████| 4/4 [00:09<00:00, 2.48s/it]
Overall Quality Score: 82.11%
Properties:
Column Shapes: 78.12%
Column Pair Trends: 86.1%
from sdv.constraints import FixedIncrements, Inequality, Positive, ScalarInequality, ScalarRange, create_custom_constraint
import numpy as np

# Add constraints mirroring the Spark filters used to clean the training data,
# so that sampled rows are held to the same rules as the rows the model was fit on.
constraints = []


# Distance shouldn't be (too) much less than straight line distance
def is_trip_distance_valid(column_names, data):
    """Return a boolean Series: True where trip distance is at least 90% of the haversine distance.

    column_names is expected in the order
    (distance, pickup lat, pickup lon, dropoff lat, dropoff lon).
    """
    dist_col, from_lat, from_lon, to_lat, to_lon = column_names
    min_dist = 0.9 * haversine_dist_miles(data[from_lat], data[from_lon], data[to_lat], data[to_lon])
    return data[dist_col] >= min_dist


TripDistanceValid = create_custom_constraint(is_valid_fn=is_trip_distance_valid)
constraints += [TripDistanceValid(column_names=[
    "trip_distance", "pickup_latitude", "pickup_longitude", "dropoff_latitude", "dropoff_longitude"])]


# Dropoff shouldn't be more than 12 hours after pickup, or before pickup
def is_duration_valid(column_names, data):
    """Return a boolean Series: True where dropoff is less than 12 hours after pickup.

    column_names is expected in the order (pickup, dropoff).
    """
    pickup_col, dropoff_col = column_names
    return (data[dropoff_col] - data[pickup_col]) < np.timedelta64(12, 'h')


DurationValid = create_custom_constraint(is_valid_fn=is_duration_valid)
constraints += [DurationValid(column_names=["pickup_datetime", "dropoff_datetime"])]
# strict_boundaries=True matches the strict "dropoff_datetime > pickup_datetime" filter;
# the default would also accept zero-length trips.
constraints += [Inequality(low_column_name="pickup_datetime", high_column_name="dropoff_datetime",
                           strict_boundaries=True)]

# Monetary amounts should be non-negative (the data filter is ">= 0")
constraints += [ScalarInequality(column_name=c, relation=">=", value=0) for c in
                ["fare_amount", "extra", "mta_tax", "tip_amount", "tolls_amount"]]

# Passengers should be a positive integer
constraints += [FixedIncrements(column_name="passenger_count", increment_value=1)]
# Strict ">" so that zero passengers is rejected, matching the "passenger_count > 0" filter
constraints += [ScalarInequality(column_name="passenger_count", relation=">", value=0)]

# Distance should be positive and not (say) more than 100 miles
constraints += [ScalarRange(column_name="trip_distance", low_value=0, high_value=100)]

# Lat/lon should be in some credible range around New York City
constraints += [ScalarRange(column_name=c, low_value=-76, high_value=-72) for c in
                ["pickup_longitude", "dropoff_longitude"]]
constraints += [ScalarRange(column_name=c, low_value=39, high_value=43) for c in
                ["pickup_latitude", "dropoff_latitude"]]
import mlflow
from mlflow.models import infer_signature
from sdv.metadata.dataset import Metadata
from sdv.tabular import TVAE
from sdmetrics.reports.single_table import QualityReport
import pandas as pd

# Won't log 'internal' sklearn models fit by SDV
mlflow.autolog(disable=True)


# Wrapper convenience model that lets the SDV model "predict" new synthetic data
class SynthesizeModel(mlflow.pyfunc.PythonModel):
    """pyfunc wrapper whose predict() samples as many synthetic rows as the input has rows."""

    def __init__(self, model):
        self.model = model

    def predict(self, context, model_input):
        # Only the length of the input matters; its contents are ignored
        requested_rows = len(model_input)
        return self.model.sample(num_rows=requested_rows)


with mlflow.start_run():
    metadata = Metadata()
    metadata.add_table(name="nyctaxi_yellow", data=table_nyctaxi)
    table_meta = metadata.get_table_meta("nyctaxi_yellow")

    model = TVAE(constraints=constraints, batch_size=1000, epochs=500, cuda=use_gpu)
    model.fit(table_nyctaxi)

    sample = model.sample(num_rows=10000, randomize_samples=False)
    report = QualityReport()
    report.generate(table_nyctaxi, sample, table_meta)

    # Log metrics and plots with MLflow
    mlflow.log_metric("Quality Score", report.get_score())
    property_rows = report.get_properties().to_numpy().tolist()
    for prop, score in property_rows:
        mlflow.log_metric(prop, score)
        detail_records = report.get_details(prop).to_dict(orient='records')
        mlflow.log_dict(detail_records, f"{prop}.json")
        prop_viz = report.get_visualization(prop)
        display(prop_viz)
        mlflow.log_figure(prop_viz, f"{prop}.png")

    # Log wrapper model for synthesis of data, if desired
    # Not strictly necessary; this model's .pkl serialization could have been logged as an artifact,
    # or not at all
    if use_gpu:
        # Assign model to CPU for later inference; GPU not really useful
        model._model.set_device('cpu')
    synthesize_model = SynthesizeModel(model)
    dummy_input = pd.DataFrame([True], columns=["dummy"])  # dummy value
    dummy_output = synthesize_model.predict(None, dummy_input)
    signature = infer_signature(dummy_input, dummy_output)
    mlflow.pyfunc.log_model(
        "model",
        python_model=synthesize_model,
        registered_model_name="sdv_synth_model",
        input_example=dummy_input,
        signature=signature)
Sampling rows: 100%|██████████| 10000/10000 [00:02<00:00, 4218.93it/s]
Creating report: 100%|██████████| 4/4 [00:07<00:00, 1.81s/it]
Overall Quality Score: 83.25%
Properties:
Column Shapes: 78.51%
Column Pair Trends: 88.0%
Sampling rows: 100%|██████████| 1/1 [00:00<00:00, 1.67it/s]
/databricks/python/lib/python3.9/site-packages/mlflow/models/signature.py:131: UserWarning:
Hint: Inferred schema contains integer column(s). Integer columns in Python cannot represent missing values. If your input data contains missing values at inference time, it will be encoded as floats and will cause a schema enforcement error. The best way to avoid this problem is to infer the model schema based on a realistic data sample (training dataset) that includes missing values. Alternatively, you can declare integer columns as doubles (float64) whenever these columns may have missing values. See `Handling Integers With Missing Values <https://www.mlflow.org/docs/latest/models.html#handling-integers-with-missing-values>`_ for more details.
/databricks/python/lib/python3.9/site-packages/_distutils_hack/__init__.py:30: UserWarning:
Setuptools is replacing distutils.
Successfully registered model 'sdv_synth_model'.
2023/01/25 01:47:12 INFO mlflow.tracking._model_registry.client: Waiting up to 300 seconds for model version to finish creation. Model name: sdv_synth_model, version 1
Created version '1' of model 'sdv_synth_model'.
import mlflow

# Pick out the raw SDV model inside the wrapper
sdv_model = mlflow.pyfunc.load_model("models:/sdv_synth_model/Production")._model_impl.python_model.model


# Simple function to generate data from the model. The input could really be anything; here the input
# is assumed to be the number of rows to generate.
def synthesize_data(how_many_dfs):
    """Yield one synthetic pandas DataFrame per non-empty input batch (for use with mapInPandas).

    how_many_dfs is an iterator of single-column pandas DataFrames of row counts;
    each batch's counts are summed and that many rows are sampled from sdv_model.
    """
    for how_many_df in how_many_dfs:
        num_rows = how_many_df.sum().item()
        # Skip empty or zero-count batches instead of asking the model for zero rows
        if num_rows <= 0:
            continue
        # This will generate different data every run, note; can't be seeded, except to make it return
        # the same data every single call!
        # output_file_path='disable' is a workaround (?) for temp file errors
        yield sdv_model.sample(num_rows=num_rows, output_file_path='disable')


# Generate, for example, the same number of rows as in the input
how_many = len(table_nyctaxi)
partitions = 256
# Spread the remainder over the first few partitions so the counts sum to exactly how_many
# (plain integer division would silently drop up to partitions - 1 rows).
rows_per_partition, leftover = divmod(how_many, partitions)
row_counts = [(rows_per_partition + 1,)] * leftover + [(rows_per_partition,)] * (partitions - leftover)
synth_df = spark.createDataFrame(row_counts).\
  repartition(partitions).\
  mapInPandas(synthesize_data, schema=df.schema)

display(synth_df)
2023/01/25 02:12:24 WARNING mlflow.pyfunc: Detected one or more mismatches between the model's dependencies and the current Python environment:
- dill (current: 0.3.6, required: dill==0.3.4)
To fix the mismatches, call `mlflow.pyfunc.get_model_dependencies(model_uri)` to fetch the model's environment and install dependencies using the resulting environment file.
Table
1,000 rows|Truncated data
from pandas_profiling import ProfileReport

# NOTE(review): synth_data_path is not defined in the code visible here; it is presumably
# set where the synthetic data was written out as a Delta table -- confirm.
synth_data_df = spark.read.format("delta").load(synth_data_path).toPandas()

# Profile the real and synthetic tables, then render a side-by-side comparison
original_report = ProfileReport(table_nyctaxi, title='Original Data', minimal=True)
synth_report = ProfileReport(synth_data_df, title='Synthetic Data', minimal=True)
compare_report = original_report.compare(synth_report)

html_settings = compare_report.config.html
html_settings.navbar_show = False
html_settings.full_width = True

displayHTML(compare_report.to_html())
import databricks.automl

# Train AutoML regressors for tip_amount using only the synthetic data
synth_training_df = spark.read.format("delta").load(synth_data_path)
databricks.automl.regress(
    synth_training_df,
    target_col="tip_amount",
    primary_metric="rmse",
    experiment_dir=tmp_experiment_dir,
    experiment_name="Synth models",
    timeout_minutes=120,
)
2023/01/26 14:29:57 INFO databricks.automl.client.manager: AutoML will optimize for root mean squared error metric, which is tracked as val_root_mean_squared_error in the MLflow experiment.
2023/01/26 14:29:58 INFO databricks.automl.client.manager: MLflow Experiment ID: 2408902617877312
2023/01/26 14:29:58 INFO databricks.automl.client.manager: MLflow Experiment: https://e2-demo-field-eng.cloud.databricks.com/?o=1444828305810485#mlflow/experiments/2408902617877312
2023/01/26 14:31:44 INFO databricks.automl.client.manager: Data exploration notebook: https://e2-demo-field-eng.cloud.databricks.com/?o=1444828305810485#notebook/2408902617877330
2023/01/26 15:23:42 INFO databricks.automl.client.manager: AutoML experiment completed successfully.
Out[5]: <databricks.automl.shared.result.AutoMLSummary at 0x7f7bbf3b3a00>
import mlflow
from sklearn.metrics import mean_squared_error, r2_score
from math import sqrt

# Held-out real data, used to evaluate every experiment's best model
test_pd = test_df.toPandas()


def print_metrics(exp_name):
    """Print RMSE and R^2 on test_pd for the best run of the named experiment.

    The best run is the first result when the experiment's runs are ordered by
    metrics.val_root_mean_squared_error; exp_name is resolved under tmp_experiment_dir.
    """
    experiment_path = f"{tmp_experiment_dir}/{exp_name}"
    top_run = mlflow.search_runs(
        experiment_names=[experiment_path],
        order_by=["metrics.val_root_mean_squared_error"],
        max_results=1)
    best_run_id = top_run['run_id'].item()
    best_model = mlflow.pyfunc.load_model(f"runs:/{best_run_id}/model")

    features = test_pd.drop("tip_amount", axis=1)
    predicted = best_model.predict(features)
    actual = test_pd["tip_amount"]

    rmse = sqrt(mean_squared_error(actual, predicted))
    r_squared = r2_score(actual, predicted)
    print(f"RMSE: {rmse}")
    print(f"R^2: {r_squared}")


print_metrics("Synth models")
/databricks/python/lib/python3.9/site-packages/sklearn/preprocessing/_encoders.py:170: UserWarning: Found unknown categories in columns [0] during transform. These unknown categories will be encoded as all zeros
warnings.warn(
RMSE: 1.525253299446487
R^2: 0.48683964004022906
import databricks.automl

# Train AutoML regressors for tip_amount on the real training split, for comparison
actual_training_df = train_df
databricks.automl.regress(
    actual_training_df,
    target_col="tip_amount",
    primary_metric="rmse",
    experiment_dir=tmp_experiment_dir,
    experiment_name="Actual data models",
    timeout_minutes=120,
)
2023/01/26 03:05:24 INFO databricks.automl.client.manager: AutoML will optimize for root mean squared error metric, which is tracked as val_root_mean_squared_error in the MLflow experiment.
2023/01/26 03:05:25 INFO databricks.automl.client.manager: MLflow Experiment ID: 2408902617760708
2023/01/26 03:05:25 INFO databricks.automl.client.manager: MLflow Experiment: https://e2-demo-field-eng.cloud.databricks.com/?o=1444828305810485#mlflow/experiments/2408902617760708
2023/01/26 03:10:28 INFO databricks.automl.client.manager: Data exploration notebook: https://e2-demo-field-eng.cloud.databricks.com/?o=1444828305810485#notebook/2408902617765065
2023/01/26 03:34:07 INFO databricks.automl.client.manager: AutoML experiment completed successfully.
Out[22]: <databricks.automl.shared.result.AutoMLSummary at 0x7f67e7fd0c70>
import mlflow

# Look up the best run of the synthetic-data experiment and wrap its model as a Spark UDF
synth_experiment_path = f"{tmp_experiment_dir}/Synth models"
best_runs = mlflow.search_runs(
    experiment_names=[synth_experiment_path],
    order_by=["metrics.val_root_mean_squared_error"],
    max_results=1)
run_id = best_runs['run_id'].item()
model_udf = mlflow.pyfunc.spark_udf(spark, f"runs:/{run_id}/model")

# Score the synthetic features (label removed) and show only out-of-range predictions
synth_df = spark.read.format("delta").load(synth_data_path).drop("tip_amount")
feature_columns = synth_df.columns
scored_df = synth_df.withColumn("prediction", model_udf(*feature_columns))
display(scored_df.filter("prediction < 0 OR prediction > 100"))
2023/01/26 01:55:03 WARNING mlflow.pyfunc: Detected one or more mismatches between the model's dependencies and the current Python environment:
- cloudpickle (current: 2.2.1, required: cloudpickle==2.0.0)
To fix the mismatches, call `mlflow.pyfunc.get_model_dependencies(model_uri)` to fetch the model's environment and install dependencies using the resulting environment file.
2023/01/26 01:55:03 WARNING mlflow.pyfunc: Calling `spark_udf()` with `env_manager="local"` does not recreate the same environment that was used during training, which may lead to errors or inaccurate predictions. We recommend specifying `env_manager="conda"`, which automatically recreates the environment that was used to train the model and performs inference in the recreated environment.
2023/01/26 01:55:03 INFO mlflow.models.flavor_backend_registry: Selected backend for flavor 'python_function'
Table
1,000 rows|Truncated data
import databricks.automl

# Train AutoML regressors for tip_amount on real and synthetic data combined.
# unionByName matches columns by name; plain union() matches by position and would
# silently misalign values if the two DataFrames ever listed columns in a different order.
synth_training_df = spark.read.format("delta").load(synth_data_path)
hybrid_training_df = train_df.unionByName(synth_training_df)

databricks.automl.regress(
    hybrid_training_df,
    target_col="tip_amount",
    primary_metric="rmse",
    experiment_dir=tmp_experiment_dir,
    experiment_name="Hybrid data models",
    timeout_minutes=120)
2023/01/26 03:35:19 INFO databricks.automl.client.manager: AutoML will optimize for root mean squared error metric, which is tracked as val_root_mean_squared_error in the MLflow experiment.
2023/01/26 03:35:20 INFO databricks.automl.client.manager: MLflow Experiment ID: 2408902617780328
2023/01/26 03:35:20 INFO databricks.automl.client.manager: MLflow Experiment: https://e2-demo-field-eng.cloud.databricks.com/?o=1444828305810485#mlflow/experiments/2408902617780328
2023/01/26 03:41:34 INFO databricks.automl.client.manager: Data exploration notebook: https://e2-demo-field-eng.cloud.databricks.com/?o=1444828305810485#notebook/2408902617780345
2023/01/26 05:37:58 INFO databricks.automl.client.manager: AutoML experiment completed successfully.
Out[25]: <databricks.automl.shared.result.AutoMLSummary at 0x7f67e6936a60>
Synthesizing Data with Generative Models for Better MLOps
Generative models are all the rage, and flashy examples have dominated headlines recently -- DALL-E, ChatGPT, diffusion models. But does your business problem require concocting weird art or off-kilter poetry? Unlikely, unfortunately. Yet this new class of approaches, which generates more data from data, has valuable and more prosaic applications.
Given real business data, GANs (Generative Adversarial Networks) and VAEs (variational autoencoders) can produce synthetic data that resembles real data. Is fake data useful? It could be in cases where source data is sensitive and not readily shareable, yet something like the real data is needed for development or testing of pipelines on that data. Perhaps a third-party data science team will develop a new modeling pipeline, but sharing sensitive data with them is not possible. Develop on synthetic data!
It's even possible that a bit of synthetic data alongside real data improves modeling outcomes.
This example explores the use of the Python library SDV to generate synthetic data resembling a dataset, and then uses AutoML to assess the quality of models built with synthetic data. SDV uses deep learning here -- specifically a variational autoencoder, via its TVAE model.
With this example, you too can exploit generative AI!