Passa al contenuto principale
Open source

Introduzione alle Arrow UDFs in PySpark: Un sostituto più veloce e leggero per le Pandas UDFs

Definisci UDF più performanti con facilità.

di Ruifeng Zheng e Yicong Huang

  • Introduciamo le UDF Arrow native, che operano direttamente sui dati Arrow, eliminando l'overhead di conversione Pandas/Arrow nelle UDF Pandas per un'esecuzione più rapida e un minore utilizzo di memoria.
  • Descriviamo anche i tipi di UDF Arrow per casi d'uso scalari e di aggregazione, e le UDTF Arrow per trasformazioni table-in, table-out, con esempi di codice sia in Python che in SQL.
  • I benchmark mostrano che le UDF Arrow sono circa il 10% più veloci e utilizzano circa il 40% in meno di memoria rispetto alle UDF Pandas, con un migliore supporto per i tipi di dati complessi.

Introduzione

Le funzioni definite dall'utente (UDF) Python sono un meccanismo di estensibilità essenziale, ma tradizionalmente hanno sofferto di un elevato overhead dovuto all'esecuzione basata su righe. In Apache Spark™, le UDF Pandas hanno affrontato parte di questo problema introducendo la serializzazione basata su Arrow e l'elaborazione batch, migliorando significativamente il throughput rispetto alle UDF Python scalari.

Tuttavia, le UDF Pandas presentano ancora limitazioni fondamentali:

  • La conversione dei dati Pandas/Arrow introduce copie di dati aggiuntive. Gli approcci a copia zero sono possibili solo in alcuni casi specifici. Ad esempio, le colonne con valori NULL attiveranno copie profonde.
  • I tipi di dati complessi non sono ben supportati. Ad esempio, le istanze di StructType nidificate non sono supportate per il tipo di output con casi d'uso di aggregazione.
Flussi di dati dell'esecuzione delle UDF Pandas in Apache Spark

Eliminando la conversione dei dati Pandas/Arrow, le UDF Arrow vengono eseguite più velocemente delle UDF Pandas, consumano meno memoria e offrono un migliore supporto per i tipi di dati.

UDF Arrow Native

Siamo entusiasti di presentare le UDF Arrow Native a partire da Databricks Runtime 18.0 (note di rilascio), un entusiasmante passo avanti per l'esecuzione performante delle UDF.

Le UDF Arrow Native operano direttamente sui dati Arrow senza convertire gli input in oggetti Pandas o NumPy. Ciò preserva il layout colonnare end-to-end, evita copie di dati non necessarie e consente alle UDF di utilizzare l'elaborazione vettorializzata sfruttando il modello di calcolo e memoria nativo di Arrow.

Per definire una UDF Arrow, gli utenti possono utilizzare un nuovo decoratore Python @arrow_udf, con tipo di ritorno specificato e tipo di valutazione opzionale. Ad esempio:

Gli utenti possono anche definirla con il decoratore esistente @udf con suggerimenti di tipo completi. Ad esempio:

Nota: la definizione della funzione dovrebbe includere suggerimenti di tipo per tutti gli argomenti e il valore di ritorno.
Questo design si allinea con le interfacce delle UDF Python scalari, fornendo un'esperienza coerente e intuitiva per gli utenti già familiari con le UDF Python scalari.

Quanto segue dimostra come utilizzare la UDF Arrow:

Utilizzo Python:

Utilizzo SQL:

Forniamo supporto per varianti delle interfacce UDF Arrow. Incluse funzioni scalari, funzioni di aggregazione e funzioni di tabella. Nell'API del dataframe forniamo anche mapInArrow e applyInArrow per utilizzare le UDF Arrow. Le introdurremo una per una.

Funzioni Scalari Arrow

Le funzioni scalari Arrow eseguono trasformazioni riga per riga. Sono l'equivalente Arrow delle UDF Pandas scalari e possono essere utilizzate ovunque sia prevista un'espressione di colonna, come df.select() o df.withColumn(). Sono supportate tre modalità di input: diretta, iteratore e iteratore di array multipli. Le varianti dell'iteratore sono utili quando la UDF richiede un'inizializzazione costosa una tantum (ad esempio, il caricamento di un modello o la compilazione di un pattern regex), poiché il costo di configurazione viene ammortizzato su tutti i batch. In tutti i casi, il numero di righe di output deve corrispondere al numero di righe di input.

  • Da Array ad Array: riceve uno o più pyarrow.Array e restituisce un pyarrow.Array. L'array di input e di output deve avere lo stesso numero di valori.
  • Da Iteratore di Array a Iteratore di Array: riceve un iteratore di pyarrow.Array e restituisce un iteratore di pyarrow.Array. Questo tipo è utile quando l'esecuzione della UDF richiede un'inizializzazione costosa.
  • Da Iteratore di Array Multipli a Iteratore di Array: riceve un iteratore di una tupla di più pyarrow.Array e restituisce un iteratore di pyarrow.Array.

Funzioni di Aggregazione Arrow

Le funzioni di aggregazione Arrow accettano uno o più input pyarrow.Array e restituiscono un valore scalare, riducendo un gruppo di righe in un singolo risultato. Sono l'equivalente Arrow delle UDF Pandas aggregate raggruppate e vengono utilizzate con groupBy().agg() o operazioni Window. Similmente alle funzioni scalari, le funzioni di aggregazione supportano anche tre modalità di input.

Da Array a Scalare: riceve pyarrow.Array e restituisce un valore scalare.

  • Da Iteratore di Array a Scalare: riceve un iteratore di pyarrow.Array e restituisce un valore scalare. Questo è utile per elaborare grandi volumi di dati in operazioni di aggregazione.

Da Iteratore di Array Multipli a Scalare: riceve un iteratore di una tupla di più pyarrow.Array e restituisce un valore scalare. È possibile definire aggregazioni più complesse.

Funzioni di Tabella Arrow

Le funzioni di tabella Arrow, note anche come UDTF Arrow (User-Defined Table Functions), accettano un pyarrow.RecordBatch o più pa.Array come input e producono un pyarrow.Table come output. Questo rappresenta il modello predominante per le trasformazioni table-in, table-out implementate in Python utilizzando l'esecuzione colonnare. Le UDTF Arrow possiedono la capacità di:

  • Restituire più colonne
  • Produrre zero, una o più righe
  • Eseguire trasformazioni di tabella vettorializzate impiegando kernel di calcolo Arrow

Di conseguenza, sono ottimamente adatte per operazioni come il filtraggio, l'espansione delle righe, la ristrutturazione dei dati e la generazione di colonne derivate.

L'interfaccia arrow_udtf è progettata per la semplicità, impiegando una sintassi a decoratore in cui si definisce il tipo di ritorno usando una stringa formattata DDL. In questa configurazione, il metodo eval accetta oggetti PyArrow come input e si prevede che produca tabelle PyArrow o RecordBatches. L'interfaccia supporta due modalità di input. Quando si elaborano argomenti di tabella, al metodo eval viene fornito un oggetto pa.RecordBatch che incapsula tutte le colonne dalla tabella di input:

Per gli argomenti scalari, il metodo riceve oggetti pa.Array, uno per ogni input scalare:

Ecco un altro esempio:

Questa UDTF può funzionare in due modi distinti:

Utilizzo Python:

Utilizzo SQL:

Supporto per DataFrame mapInArrow e applyInArrow

Oltre alle User-Defined Functions (UDF) e alle User-Defined Table Functions (UDTF), PySpark fornisce API di funzione Arrow che facilitano l'applicazione diretta di funzioni native Python ai dati Arrow a livello di DataFrame. Queste API operano in modo analogo alle loro controparti Pandas (mapInPandas, applyInPandas) ma utilizzano pyarrow.RecordBatch e pyarrow.Table invece dei DataFrame Pandas, aggirando così l'overhead di conversione tra i formati Pandas e Arrow.

  • Mappa. DataFrame.mapInArrow trasforma un iteratore di pyarrow.RecordBatch in un altro iteratore di pyarrow.RecordBatch, consentendo operazioni a livello di riga come il filtraggio, la trasformazione o l'espansione.
  • Mappa raggruppata. groupBy().applyInArrow() applica una funzione specificata a ogni gruppo, accettando e restituendo un pyarrow.Table. Questa funzionalità si rivela utile per le trasformazioni per gruppo, come la normalizzazione dei dati.
  • Mappa co-raggruppata. cogroup().applyInArrow() consente il co-raggruppamento di due DataFrame basati su una chiave condivisa, applicando successivamente una funzione a ogni co-gruppo. La funzione riceve due pyarrow.Table input e si prevede che restituisca un singolo pyarrow.Table.

Prestazioni

Eliminando la costosa conversione dei dati Pandas/Arrow, le UDF Arrow generalmente vengono eseguite più velocemente delle UDF Pandas, con un minore utilizzo di memoria. Confrontiamo le due semplici UDF:

L'UDF Arrow è circa il 10% più veloce dell'UDF Pandas, e il profiler di memoria mostra che circa il 40% di memoria viene risparmiato nell'esecuzione.

Conclusione

Databricks Runtime 18.0 introduce le UDF Arrow native, offrendo un'alternativa più veloce e snella alle UDF Pandas per un'esecuzione performante delle UDF Python in PySpark. Operando direttamente sui dati Arrow ed eliminando l'overhead di conversione Pandas/Arrow, le UDF Arrow offrono un'esecuzione circa il 10% più veloce, circa il 40% in meno di utilizzo di memoria e un migliore supporto per i tipi di dati complessi, il tutto con una sintassi a decoratore familiare e intuitiva.

Pronto a esplorare di più? Prova oggi stesso le UDF Arrow native su Databricks come parte di Databricks Runtime 18.0. Per iniziare, sostituisci semplicemente le tue UDF Pandas esistenti con le UDF Arrow. Nella maggior parte dei casi, bastano poche righe di codice per sbloccare guadagni di prestazioni immediati. Consulta la documentazione delle UDF Arrow e la documentazione delle UDTF Arrow per il riferimento API completo e ulteriori esempi.

(Questo post sul blog è stato tradotto utilizzando strumenti basati sull'intelligenza artificiale) Post originale

Ricevi gli ultimi articoli nella tua casella di posta

Iscriviti al nostro blog e ricevi gli ultimi articoli direttamente nella tua casella di posta.