Definisci UDF più performanti con facilità.
Le funzioni definite dall'utente (UDF) Python sono un meccanismo di estensibilità essenziale, ma tradizionalmente hanno sofferto di un elevato overhead dovuto all'esecuzione basata su righe. In Apache Spark™, le UDF Pandas hanno affrontato parte di questo problema introducendo la serializzazione basata su Arrow e l'elaborazione batch, migliorando significativamente il throughput rispetto alle UDF Python scalari.
Tuttavia, le UDF Pandas presentano ancora limitazioni fondamentali:
Eliminando la conversione dei dati Pandas/Arrow, le UDF Arrow vengono eseguite più velocemente delle UDF Pandas, consumano meno memoria e offrono un migliore supporto per i tipi di dati.
Siamo entusiasti di presentare le UDF Arrow Native a partire da Databricks Runtime 18.0 (note di rilascio), un entusiasmante passo avanti per l'esecuzione performante delle UDF.
Le UDF Arrow Native operano direttamente sui dati Arrow senza convertire gli input in oggetti Pandas o NumPy. Ciò preserva il layout colonnare end-to-end, evita copie di dati non necessarie e consente alle UDF di utilizzare l'elaborazione vettorializzata sfruttando il modello di calcolo e memoria nativo di Arrow.
Per definire una UDF Arrow, gli utenti possono utilizzare un nuovo decoratore Python @arrow_udf, con tipo di ritorno specificato e tipo di valutazione opzionale. Ad esempio:
Gli utenti possono anche definirla con il decoratore esistente @udf con suggerimenti di tipo completi. Ad esempio:
Nota: la definizione della funzione dovrebbe includere suggerimenti di tipo per tutti gli argomenti e il valore di ritorno.
Questo design si allinea con le interfacce delle UDF Python scalari, fornendo un'esperienza coerente e intuitiva per gli utenti già familiari con le UDF Python scalari.
Quanto segue dimostra come utilizzare la UDF Arrow:
Utilizzo Python:
Utilizzo SQL:
Forniamo supporto per varianti delle interfacce UDF Arrow. Incluse funzioni scalari, funzioni di aggregazione e funzioni di tabella. Nell'API del dataframe forniamo anche mapInArrow e applyInArrow per utilizzare le UDF Arrow. Le introdurremo una per una.
Le funzioni scalari Arrow eseguono trasformazioni riga per riga. Sono l'equivalente Arrow delle UDF Pandas scalari e possono essere utilizzate ovunque sia prevista un'espressione di colonna, come df.select() o df.withColumn(). Sono supportate tre modalità di input: diretta, iteratore e iteratore di array multipli. Le varianti dell'iteratore sono utili quando la UDF richiede un'inizializzazione costosa una tantum (ad esempio, il caricamento di un modello o la compilazione di un pattern regex), poiché il costo di configurazione viene ammortizzato su tutti i batch. In tutti i casi, il numero di righe di output deve corrispondere al numero di righe di input.
pyarrow.Array e restituisce un pyarrow.Array. L'array di input e di output deve avere lo stesso numero di valori.pyarrow.Array e restituisce un iteratore di pyarrow.Array. Questo tipo è utile quando l'esecuzione della UDF richiede un'inizializzazione costosa. pyarrow.Array e restituisce un iteratore di pyarrow.Array.Le funzioni di aggregazione Arrow accettano uno o più input pyarrow.Array e restituiscono un valore scalare, riducendo un gruppo di righe in un singolo risultato. Sono l'equivalente Arrow delle UDF Pandas aggregate raggruppate e vengono utilizzate con groupBy().agg() o operazioni Window. Similmente alle funzioni scalari, le funzioni di aggregazione supportano anche tre modalità di input.
Da Array a Scalare: riceve pyarrow.Array e restituisce un valore scalare.
pyarrow.Array e restituisce un valore scalare. Questo è utile per elaborare grandi volumi di dati in operazioni di aggregazione.Da Iteratore di Array Multipli a Scalare: riceve un iteratore di una tupla di più pyarrow.Array e restituisce un valore scalare. È possibile definire aggregazioni più complesse.
Le funzioni di tabella Arrow, note anche come UDTF Arrow (User-Defined Table Functions), accettano un pyarrow.RecordBatch o più pa.Array come input e producono un pyarrow.Table come output. Questo rappresenta il modello predominante per le trasformazioni table-in, table-out implementate in Python utilizzando l'esecuzione colonnare. Le UDTF Arrow possiedono la capacità di:
Di conseguenza, sono ottimamente adatte per operazioni come il filtraggio, l'espansione delle righe, la ristrutturazione dei dati e la generazione di colonne derivate.
L'interfaccia arrow_udtf è progettata per la semplicità, impiegando una sintassi a decoratore in cui si definisce il tipo di ritorno usando una stringa formattata DDL. In questa configurazione, il metodo eval accetta oggetti PyArrow come input e si prevede che produca tabelle PyArrow o RecordBatches. L'interfaccia supporta due modalità di input. Quando si elaborano argomenti di tabella, al metodo eval viene fornito un oggetto pa.RecordBatch che incapsula tutte le colonne dalla tabella di input:
Per gli argomenti scalari, il metodo riceve oggetti pa.Array, uno per ogni input scalare:
Ecco un altro esempio:
Questa UDTF può funzionare in due modi distinti:
Utilizzo Python:
Utilizzo SQL:
Oltre alle User-Defined Functions (UDF) e alle User-Defined Table Functions (UDTF), PySpark fornisce API di funzione Arrow che facilitano l'applicazione diretta di funzioni native Python ai dati Arrow a livello di DataFrame. Queste API operano in modo analogo alle loro controparti Pandas (mapInPandas, applyInPandas) ma utilizzano pyarrow.RecordBatch e pyarrow.Table invece dei DataFrame Pandas, aggirando così l'overhead di conversione tra i formati Pandas e Arrow.
DataFrame.mapInArrow trasforma un iteratore di pyarrow.RecordBatch in un altro iteratore di pyarrow.RecordBatch, consentendo operazioni a livello di riga come il filtraggio, la trasformazione o l'espansione.groupBy().applyInArrow() applica una funzione specificata a ogni gruppo, accettando e restituendo un pyarrow.Table. Questa funzionalità si rivela utile per le trasformazioni per gruppo, come la normalizzazione dei dati.cogroup().applyInArrow() consente il co-raggruppamento di due DataFrame basati su una chiave condivisa, applicando successivamente una funzione a ogni co-gruppo. La funzione riceve due pyarrow.Table input e si prevede che restituisca un singolo pyarrow.Table.Eliminando la costosa conversione dei dati Pandas/Arrow, le UDF Arrow generalmente vengono eseguite più velocemente delle UDF Pandas, con un minore utilizzo di memoria. Confrontiamo le due semplici UDF:
L'UDF Arrow è circa il 10% più veloce dell'UDF Pandas, e il profiler di memoria mostra che circa il 40% di memoria viene risparmiato nell'esecuzione.
Databricks Runtime 18.0 introduce le UDF Arrow native, offrendo un'alternativa più veloce e snella alle UDF Pandas per un'esecuzione performante delle UDF Python in PySpark. Operando direttamente sui dati Arrow ed eliminando l'overhead di conversione Pandas/Arrow, le UDF Arrow offrono un'esecuzione circa il 10% più veloce, circa il 40% in meno di utilizzo di memoria e un migliore supporto per i tipi di dati complessi, il tutto con una sintassi a decoratore familiare e intuitiva.
Pronto a esplorare di più? Prova oggi stesso le UDF Arrow native su Databricks come parte di Databricks Runtime 18.0. Per iniziare, sostituisci semplicemente le tue UDF Pandas esistenti con le UDF Arrow. Nella maggior parte dei casi, bastano poche righe di codice per sbloccare guadagni di prestazioni immediati. Consulta la documentazione delle UDF Arrow e la documentazione delle UDTF Arrow per il riferimento API completo e ulteriori esempi.
(Questo post sul blog è stato tradotto utilizzando strumenti basati sull'intelligenza artificiale) Post originale
Iscriviti al nostro blog e ricevi gli ultimi articoli direttamente nella tua casella di posta.