NB01: Data Prep H3-Geometry Approaches (Python)


Data engineering to compare Geometry-Centric, H3-Centric, and H3-Geometry Hybrid approaches, using various combinations of the Photon-accelerated H3 product functions and Mosaic functions.

Cluster

  • Running on DBR 11.3 LTS + Photon.
  • Parts of this data engineering are compute-heavy, so we use m5d.4xlarge AWS workers (which favor compute over memory) with autoscaling, e.g. from 2 to 10 workers.

Author: Michael Johns mjohns@databricks.com
Last Modified: 29 NOV 2022

Setup

%pip install databricks-mosaic==0.3.4 --quiet
Python interpreter will be restarted.
from pyspark.databricks.sql.functions import *

from pyspark.sql import functions as F
from pyspark.sql.functions import udf, col
from pyspark.sql.types import *
from pyspark.sql import Window

import os
import pathlib
import requests

import mosaic as mos
mos.enable_mosaic(spark, dbutils)

from keplergl import KeplerGl

def display_kepler(kmap:KeplerGl, height=800, width=1200) -> None:
  """
  Convenience function to render map in kepler.gl
  - use this when you cannot directly render or
    want to go beyond the %%mosaic_kepler magic.
  """
  displayHTML(
    kmap
      ._repr_html_()
      .decode("utf-8")
      .replace(".height||400", f".height||{height}")
      .replace(".width||400", f".width||{width}")
  )
username = dbutils.notebook.entry_point.getDbutils().notebook().getContext().userName().get()
db_name = username.split('.')[0].replace("@","_") + "_h3_geometry"
sql(f"CREATE DATABASE IF NOT EXISTS {db_name}")
sql(f"USE {db_name}")
os.environ['username'] = username
os.environ['db_name'] = db_name
print(f"...username: '{username}', db_name: '{db_name}' (create & use)")

raw_path = f"dbfs:/tmp/mosaic/{username}"
print(f"...raw data will be stored in {raw_path}")
...username: 'mjohns@databricks.com', db_name: 'mjohns_databricks_h3_geometry' (create & use) ...raw data will be stored in dbfs:/tmp/mosaic/mjohns@databricks.com

Table trip

Built-in dataset available on the Databricks platform, stored as Delta Lake -- there are 1.6B trips in this data.

%sql 
drop table if exists trip;
create table if not exists trip using DELTA location "/databricks-datasets/nyctaxi/tables/nyctaxi_yellow";
select * from trip limit 10;
 
| vendor_id | pickup_datetime | dropoff_datetime | passenger_count | trip_distance | pickup_longitude | pickup_latitude | rate_code_id |
| --- | --- | --- | --- | --- | --- | --- | --- |
| VTS | 2009-11-07T19:44:00.000+0000 | 2009-11-07T19:49:00.000+0000 | 2 | 0.74 | -73.992127 | 40.734658 | null |
| VTS | 2009-11-08T02:06:00.000+0000 | 2009-11-08T02:12:00.000+0000 | 1 | 1.04 | -74.008553 | 40.719682 | null |
| VTS | 2009-11-08T03:57:00.000+0000 | 2009-11-08T04:09:00.000+0000 | 1 | 4.05 | -74.007742 | 40.717097 | null |
| VTS | 2009-11-05T12:56:00.000+0000 | 2009-11-05T12:58:00.000+0000 | 1 | 0.33 | -73.992995 | 40.71258 | null |
| VTS | 2009-11-05T08:35:00.000+0000 | 2009-11-05T08:41:00.000+0000 | 1 | 0.6 | -74.00921 | 40.712933 | null |
| VTS | 2009-11-05T08:15:00.000+0000 | 2009-11-05T08:24:00.000+0000 | 2 | 1.37 | -74.00004 | 40.719218 | null |
| VTS | 2009-11-08T13:48:00.000+0000 | 2009-11-08T14:02:00.000+0000 | 1 | 3.03 | -73.99134 | 40.730082 | null |
| VTS | 2009-11-08T19:26:00.000+0000 | 2009-11-08T19:38:00.000+0000 | 1 | 2.32 | -73.997913 | 40.716873 | null |
| VTS | 2009-11-05T22:25:00.000+0000 | 2009-11-05T22:40:00.000+0000 | 1 | 4.35 | -74.008048 | 40.723218 | null |
| VTS | 2009-11-05T19:10:00.000+0000 | 2009-11-05T19:18:00.000+0000 | 1 | 1.63 | -74.000868 | 40.733013 | null |

Showing all 10 rows.
%sql select format_number(count(1),0) from trip
df_trip_kepler = (
  spark.table("trip")
    .sample(.00001)
    .withColumn("pickup_wkt", mos.st_point("pickup_longitude", "pickup_latitude"))
)
print(f"count? {df_trip_kepler.count():,}")
count? 15,858

After importing, uncomment the following %%mosaic_kepler cell to render.

# %%mosaic_kepler
# df_trip_kepler "pickup_wkt" "geometry" 20_000

Here is a reduced-size screenshot of what is rendered in the imported notebook.


Table taxi_zone

GeoJSON -> Delta Lake -- there are 263 taxi zones.

Note: for airports "Newark Airport" = 1, "LaGuardia Airport" = 138, "JFK Airport" = 132

raw_taxi_zones_path = f"{raw_path}/taxi_zones" 
# --- UNCOMMENT TO RE-RUN ---
fuse_taxi_zones_path = pathlib.Path(raw_taxi_zones_path.replace('dbfs:/', '/dbfs/'))
fuse_taxi_zones_path.mkdir(parents=True, exist_ok=True)
 
req = requests.get('https://data.cityofnewyork.us/api/geospatial/d3c5-ddgc?method=export&format=GeoJSON')
with open(fuse_taxi_zones_path / 'nyc_taxi_zones.geojson', 'wb') as f:
  f.write(req.content)
  
spark.sql(f"drop table if exists taxi_zone_raw;")
spark.sql(
  f"""
     create table if not exists taxi_zone_raw
     using json
     options (multiline = true)
     location "{raw_taxi_zones_path}";
     """
)

display(dbutils.fs.ls(raw_taxi_zones_path))
%sql 
drop table if exists taxi_zone;
create table if not exists taxi_zone as (
  select
    type,
    feature.properties as properties,
    st_astext(st_geomfromgeojson(to_json(feature.geometry))) as geom_wkt
  from
    (
      select
        type,
        explode(features) as feature
      from
        taxi_zone_raw
    )
);

-- no longer need the raw table
drop table if exists taxi_zone_raw;
select * from taxi_zone limit 10;
%sql select format_number(count(1),0) from taxi_zone

After importing, uncomment the following %%mosaic_kepler cell to render.

# %%mosaic_kepler
# "taxi_zone" "geom_wkt" "geometry"

Here is a reduced-size screenshot of what is rendered in the imported notebook.


Setup H3

Table trip_h3

This is the point table for all H3 approaches, built with h3_longlatash3.

NOTES:

  • We are using resolution 12 for our more fine-grained analysis (each cell is about 60 ft in diameter in NYC), but we also capture resolutions 10 and 8 via the h3_toparent product function for additional analysis later (see the sketch after this list).
  • We Z-Order on (pickup_cell) for the focus of this effort; depending on your analysis, you could combine z-ordering, e.g. zorder by (pickup_cell, dropoff_cell), or build a separate table with zorder by (dropoff_cell).
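
As a quick sanity check on the parent-child relationship used below, here is a minimal sketch (it assumes only the Photon H3 product functions imported above; the coordinates are one example pickup point from the sample output):

# Minimal sketch: derive a resolution-12 cell and its resolution-10/8 parents
# for a single example point, and confirm a parent's resolution with h3_resolution.
spark.sql("""
  select
    h3_longlatash3(-73.992127, 40.734658, 12) as cell_12,
    h3_toparent(h3_longlatash3(-73.992127, 40.734658, 12), 10) as cell_10,
    h3_toparent(h3_longlatash3(-73.992127, 40.734658, 12), 8) as cell_8,
    h3_resolution(h3_toparent(h3_longlatash3(-73.992127, 40.734658, 12), 8)) as res_8
""").show(truncate=False)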
%sql 
-- this can take ~10+ mins to prepare, e.g. with 8 worker nodes
-- comment out the drop & optimize statements after the initial run
drop table if exists trip_h3;
create table if not exists trip_h3 as (
  select
    h3_longlatash3(pickup_longitude, pickup_latitude, 12) as pickup_cell,
    h3_toparent(h3_longlatash3(pickup_longitude, pickup_latitude, 12), 10) as pickup_cell_10,
    h3_toparent(h3_longlatash3(pickup_longitude, pickup_latitude, 12), 8) as pickup_cell_8,
    
    h3_longlatash3(dropoff_longitude, dropoff_latitude, 12) as dropoff_cell,
    h3_toparent(h3_longlatash3(dropoff_longitude, dropoff_latitude, 12), 10) as dropoff_cell_10,
    h3_toparent(h3_longlatash3(dropoff_longitude, dropoff_latitude, 12), 8) as dropoff_cell_8,
    *
  from (select /*+ REPARTITION(4096) */ * from trip)
);
optimize trip_h3 zorder by (pickup_cell);
select * from trip_h3 where isnotnull(pickup_cell) limit 5;
 
| pickup_cell | pickup_cell_10 | pickup_cell_8 | dropoff_cell | dropoff_cell_10 | dropoff_cell_8 | vendor_id | pickup_datetime |
| --- | --- | --- | --- | --- | --- | --- | --- |
| 631243949853819900 | 622236750599094300 | 613229551345991700 | 631243922759820300 | 622236723505102800 | 613229524250787800 | VTS | 2014-02-25T19:31:00.000+0000 |
| 631243949853837800 | 622236750599127000 | 613229551345991700 | 631243935105496600 | 622236735850774500 | 613229536596721700 | 2 | 2015-09-30T15:38:58.000+0000 |
| 631243949853836800 | 622236750599127000 | 613229551345991700 | 631243922718860300 | 622236723464142800 | 613229524210942000 | 2 | 2015-09-23T06:42:32.000+0000 |
| 631243949853837300 | 622236750599127000 | 613229551345991700 | 631243949967534100 | 622236750712799200 | 613229551459237900 | 2 | 2015-09-30T14:25:43.000+0000 |
| 631243949853809200 | 622236750599094300 | 613229551345991700 | 631243922690444800 | 622236723435733000 | 613229524181581800 | CMT | 2014-02-14T15:51:46.000+0000 |

Showing all 5 rows.
df_trip_h3_kepler = (
  spark.table("trip_h3")
  .drop("pickup_latitude", "pickup_longitude", "dropoff_cell", "dropoff_cell_10", "dropoff_cell_8", "dropoff_latitude", "dropoff_longitude")
  .dropna(how="any", subset=["pickup_cell"])
  .sample(.00005)
)
print(f"count? {df_trip_h3_kepler.count():,}")
count? 62,778

After importing, uncomment the following %%mosaic_kepler cell to render.

# %%mosaic_kepler
# df_trip_h3_kepler "pickup_cell" "h3" 100_000

Here is a reduced-size screenshot of what is rendered in the imported notebook.


Table taxi_zone_h3

Using h3_polyfillash3 for the H3-centric approach.

NOTE: Z-Order on (cell)
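
For intuition on what h3_polyfillash3 returns before applying it to all 263 zones, here is a tiny illustration on a toy WKT polygon (not part of the pipeline; coarse resolution 7 keeps the output small):

# Toy illustration only: h3_polyfillash3 returns an array of H3 cells covering
# the polygon (cells whose centers fall inside it); explode yields one row per cell.
spark.sql("""
  select explode(h3_polyfillash3(
    'POLYGON((-74.02 40.70, -73.98 40.70, -73.98 40.74, -74.02 40.74, -74.02 40.70))',
    7)) as cell
""").show()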

%sql 
drop table if exists taxi_zone_h3;
create table if not exists taxi_zone_h3 as (
  select
    explode(h3_polyfillash3(geom_wkt, 12)) as cell,
    properties.borough,
    properties.location_id,
    properties.zone
  from
    taxi_zone
);
optimize taxi_zone_h3 zorder by (cell);
select * from taxi_zone_h3 where location_id > 1 limit 5;
df_taxi_zone_h3_kepler = spark.table("taxi_zone_h3").filter("location_id = 1")
print(f"count? {df_taxi_zone_h3_kepler.count():,}")
count? 23,780

After importing, uncomment the following %%mosaic_kepler cell to render.

# %%mosaic_kepler
# df_taxi_zone_h3_kepler "cell" "h3" 25_000

Here is a reduced-size screenshot of what is rendered in the imported notebook.


Table taxi_zone_h3_chip

Using Mosaic grid_tessellateexplode for the H3-Geometry hybrid approach.

This cuts the original geometry into pieces along the grid-index borders at the specified resolution and returns the set of Mosaic chips covering the input geometry. A Mosaic chip is a struct type composed of:

  • is_core: identifies whether the chip is fully contained within the geometry: Boolean
  • index_id: index ID of the configured spatial indexing (default H3): Integer
  • wkb: geometry in WKB format, equal to the intersection of the index shape and the original geometry: Binary

NOTE: Z-Order on (cell).
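
To make the hybrid approach concrete, here is a minimal, hedged sketch of the downstream join pattern these chips enable (orientation only; the actual comparisons happen in later notebooks). The column names assume the tables built in this notebook, df_hybrid is a hypothetical name, and st_contains / st_geomfromwkb / st_point are Mosaic SQL functions registered by enable_mosaic: a point landing in an is_core chip needs no exact geometry test, while a point in a boundary chip falls back to an exact containment check against the chip's wkb.

# Sketch only: hybrid point-in-zone join against the chip table built below.
# Core chips imply containment; boundary chips get an exact st_contains test.
df_hybrid = (
  spark.table("trip_h3").alias("t")
    .join(spark.table("taxi_zone_h3_chip").alias("z"),
          F.expr("t.pickup_cell = z.cell"))
    .where(F.expr(
      "z.index.is_core or "
      "st_contains(st_geomfromwkb(z.index.wkb), "
      "st_point(t.pickup_longitude, t.pickup_latitude))"))
    .select("t.*", "z.location_id", "z.zone")
)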

%sql 
-- this can take a few mins to prepare, e.g. with multiple worker nodes
-- comment out the drop & optimize statements after the initial run
drop table if exists taxi_zone_h3_chip;
create table if not exists taxi_zone_h3_chip as (
  select
    index.index_id as cell,
    * except(index),
    index
  from (
    select 
      grid_tessellateexplode(geom_wkt, 12),
      properties.borough,
      properties.location_id,
      properties.zone
    from (select /*+ REPARTITION(128) */ * from taxi_zone)
    )
);
optimize taxi_zone_h3_chip zorder by (cell);
select * from taxi_zone_h3_chip limit 5;
# use the following to get location_ids around zip code 10004
# (this is part of the Financial District)
display(
  spark.table("taxi_zone_h3_chip")
    .withColumn("center_geojson", h3_centerasgeojson("cell"))
    .withColumn("center_x", F.split(F.split(F.split('center_geojson','\[')[1],'\]')[0],',')[0].cast("float")) # <- lon
    .withColumn("center_y", F.split(F.split(F.split('center_geojson','\[')[1],'\]')[0],',')[1].cast("float")) # <- lat
    .filter("center_x >= -74.020") # <- lon min
    .filter("center_x <= -73.998") # <- lon max
    .filter("center_y <= 40.712")  # <- lat max
    .filter("center_y >= 40.698")  # <- lat min
    .select("location_id", "zone").distinct().orderBy("location_id")
)
 
| location_id | zone |
| --- | --- |
| 12 | Battery Park |
| 13 | Battery Park City |
| 209 | Seaport |
| 231 | TriBeCa/Civic Center |
| 261 | World Trade Center |
| 33 | Brooklyn Heights |
| 45 | Chinatown |
| 87 | Financial District North |
| 88 | Financial District South |

Showing all 9 rows.
df_taxi_zone_h3_kepler_chip = (
  spark.table("taxi_zone_h3_chip")
    .filter(col("location_id").isin([12,13,209,231,261,33,45,87,88]))
    .select(
      "location_id",
      "zone",
      h3_h3tostring("cell").alias("h3_idx"),
      "index.is_core",
      mos.st_astext("index.wkb").alias("geom_wkt")
    )
)
print(f"count? {df_taxi_zone_h3_kepler_chip.count():,}")
display(df_taxi_zone_h3_kepler_chip.limit(1))
count? 16,027
 
| location_id | zone | h3_idx | is_core | geom_wkt |
| --- | --- | --- | --- | --- |
| 12 | Battery Park | 8c2a1072800abff | true | POLYGON ((-74.01591841528555 40.703001969672805, -74.01600845245785 40.703071410641435, -74.01613850325775 40.703049881947464, -74.01617851645919 40.70295891237668, -74.01608847923931 40.70288947163751, -74.01595842886557 40.70291100023967, -74.01591841528555 40.703001969672805)) |

Showing 1 row.

After importing, uncomment the following display_kepler cell to render.

# display_kepler(
#   KeplerGl(
#     config={ 
#       'version': 'v1', 
#       'mapState': {
#         'latitude': 40.7, 
#         'longitude': -74.0, 
#         'zoom': 14
#       }, 
#       'mapStyle': {'styleType': 'dark'},
#       'options': {'readOnly': False, 'centerMap': True}
#     },
#     data={
#       'hexring_5_10': df_taxi_zone_h3_kepler_chip.toPandas()
#     },
#     show_docs=False,
#   )
# )

Here is a reduced-size screenshot of what is rendered in the imported notebook.


Show Tables

Optional: look at the tables that will be used in the comparisons.

# %sql show tables