Definieren Sie performantere UDFs mit Leichtigkeit.
von Ruifeng Zheng und Yicong Huang
Python benutzerdefinierte Funktionen (UDFs) sind ein wesentlicher Erweiterungsmechanismus, litten jedoch traditionell unter hohem Overhead aufgrund der zeilenbasierten Ausführung. In Apache Spark™ lösten Pandas UDFs einen Teil dieses Problems durch die Einführung von Arrow-basierter Serialisierung und Stapelverarbeitung, die den Durchsatz im Vergleich zu skalaren Python UDFs erheblich verbesserten.
Pandas UDFs weisen jedoch immer noch grundlegende Einschränkungen auf:
Durch das Weglassen der Pandas/Arrow-Datenkonvertierung führen die Arrow UDFs schneller aus als Pandas UDFs, verbrauchen weniger Speicher und bieten eine bessere Datentypunterstützung.
Wir freuen uns, die Einführung von Native Arrow UDFs ab Databricks Runtime 18.0 (Versionshinweise) bekannt zu geben, ein aufregender Fortschritt für die performante UDF-Ausführung.
Native Arrow UDFs arbeiten direkt mit Arrow-Daten, ohne Eingaben in Pandas- oder NumPy-Objekte zu konvertieren. Dies bewahrt das spaltenbasierte Layout durchgängig, vermeidet unnötige Datenkopien und ermöglicht UDFs die Nutzung von vektorisierter Verarbeitung durch die Ausnutzung von Arrows nativem Berechnungs- und Speichermodell.
Um eine Arrow UDF zu definieren, können Benutzer einen neuen Python-Decorator @arrow_udf verwenden, mit einem angegebenen Rückgabetyp und einem optionalen Auswertungstyp. Zum Beispiel:
Benutzer können sie auch mit dem bestehenden Decorator @udf und vollständigen Typ-Hinweisen definieren. Zum Beispiel:
Hinweis: Die Funktionsdefinition sollte Typ-Hinweise für alle Argumente und den Rückgabewert enthalten.
Dieses Design stimmt mit den Schnittstellen skalarer Python UDFs überein und bietet eine konsistente und intuitive Erfahrung für Benutzer, die bereits mit skalaren Python UDFs vertraut sind.
Das Folgende zeigt, wie man die Arrow UDF verwendet:
Python-Nutzung:
SQL-Nutzung:
Wir bieten Unterstützung für Varianten von Arrow UDF-Schnittstellen. Dazu gehören Skalarfunktionen, Aggregatfunktionen und Tabellenfunktionen. In der DataFrame API stellen wir auch mapInArrow und applyInArrow zur Verfügung, um Arrow UDFs zu verwenden. Wir werden sie im Folgenden einzeln vorstellen.
Arrow Skalarfunktionen führen zeilenweise Transformationen durch. Sie sind das Arrow-Äquivalent von skalaren Pandas UDFs und können überall dort verwendet werden, wo ein Spaltenausdruck erwartet wird, wie zum Beispiel df.select() oder df.withColumn(). Es werden drei Eingabemodi unterstützt: direkt, Iterator und Iterator mehrerer Arrays. Die Iterator-Varianten sind nützlich, wenn die UDF eine aufwendige einmalige Initialisierung erfordert (z. B. das Laden eines Modells oder das Kompilieren eines Regex-Musters), da die Einrichtungskosten über alle Batches amortisiert werden. In allen Fällen muss die Anzahl der Ausgabezellen mit der Anzahl der Eingabezellen übereinstimmen.
pyarrow.Array und gibt ein pyarrow.Array zurück. Das Eingabe- und Ausgabe-Array muss die gleiche Anzahl von Werten haben.pyarrow.Array und gibt einen Iterator von pyarrow.Array zurück. Dieser Typ ist nützlich, wenn die UDF-Ausführung eine aufwendige Initialisierung erfordert. pyarrow.Array und gibt einen Iterator von pyarrow.Array zurück.Arrow Aggregatfunktionen nehmen eine oder mehrere pyarrow.Array-Eingaben entgegen und geben einen Skalarwert zurück, wobei eine Gruppe von Zeilen zu einem einzigen Ergebnis reduziert wird. Sie sind das Arrow-Äquivalent von gruppierten Aggregat-Pandas UDFs und werden mit groupBy().agg() oder Window-Operationen verwendet. Ähnlich wie Skalarfunktionen unterstützen Aggregatfunktionen ebenfalls drei Eingabemodi.
Arrays zu Skalar: Empfängt pyarrow.Array und gibt einen Skalarwert zurück.
pyarrow.Array und gibt einen Skalarwert zurück. Dies ist nützlich für die Verarbeitung großer Datenmengen in aggregationsartigen Operationen.Iterator mehrerer Arrays zu Skalar: Empfängt einen Iterator eines Tupels mehrerer pyarrow.Array und gibt einen Skalarwert zurück. Komplexere Aggregationen können definiert werden.
Arrow Tabellenfunktionen, auch bekannt als Arrow UDTFs (benutzerdefinierte Tabellenfunktionen), akzeptieren ein pyarrow.RecordBatch oder mehrere pa.Array als Eingabe und erzeugen ein pyarrow.Table als Ausgabe. Dies stellt das vorherrschende Muster für Tabellen-in, Tabellen-out-Transformationen dar, die in Python unter Verwendung von spaltenbasierter Ausführung implementiert werden. Arrow UDTFs bieten die Möglichkeit,:
Folglich sind sie optimal für Operationen wie Filtern, Zeilenerweiterung, Datenrestrukturierung und die Generierung abgeleiteter Spalten geeignet.
Die arrow_udtf Schnittstelle ist auf Einfachheit ausgelegt und verwendet eine Decorator-Syntax, bei der Sie den Rückgabetyp mithilfe eines DDL-formatierten Strings definieren. In diesem Setup akzeptiert die eval Methode PyArrow-Objekte als Eingabe und soll PyArrow Tables oder RecordBatches liefern. Die Schnittstelle unterstützt zwei Eingabemodi. Bei der Verarbeitung von Tabellenargumenten erhält die eval Methode ein pa.RecordBatch Objekt, das alle Spalten der Eingabetabelle kapselt:
Für skalare Argumente empfängt die Methode pa.Array-Objekte, eines für jede skalare Eingabe:
Hier ist ein weiteres Beispiel:
Diese UDTF kann auf zwei verschiedene Arten funktionieren:
Python-Nutzung:
SQL-Nutzung:
Zusätzlich zu benutzerdefinierten Funktionen (UDFs) und benutzerdefinierten Tabellenfunktionen (UDTFs) bietet PySpark Arrow Function APIs, die die direkte Anwendung nativer Python-Funktionen auf Arrow-Daten auf DataFrame-Ebene ermöglichen. Diese APIs funktionieren analog zu ihren Pandas-Pendants (mapInPandas, applyInPandas) verwenden jedoch pyarrow.RecordBatch und pyarrow.Table anstelle von Pandas DataFrames, wodurch der Konvertierungsaufwand zwischen Pandas- und Arrow-Formaten umgangen wird.
DataFrame.mapInArrow transformiert einen Iterator von pyarrow.RecordBatch in einen anderen Iterator von pyarrow.RecordBatch, wodurch zeilenbasierte Operationen wie Filtern, Transformation oder Erweiterung ermöglicht werden.groupBy().applyInArrow() wendet eine angegebene Funktion auf jede Gruppe an, die ein pyarrow.Table akzeptiert und zurückgibt. Diese Funktionalität erweist sich als vorteilhaft für gruppenweise Transformationen, wie z.B. Daten-Normalisierung.cogroup().applyInArrow() ermöglicht das Cogrouping von zwei DataFrames basierend auf einem gemeinsamen Schlüssel und wendet anschließend eine Funktion auf jede Cogroup an. Die Funktion empfängt zwei pyarrow.Table Eingaben und soll eine einzelne pyarrow.Table zurückgeben.Durch das Entfernen der aufwendigen Pandas/Arrow-Datenkonvertierung werden Arrow UDFs im Allgemeinen schneller ausgeführt als Pandas UDFs, mit geringerem Speicherverbrauch. Vergleichen wir die beiden einfachen UDFs:
Die Arrow UDF ist ~10% schneller als die Pandas UDF, und der Speicherprofiler zeigt, dass ~40% Speicher bei der Ausführung eingespart werden.
Databricks Runtime 18.0 führt native Arrow UDFs ein, die eine schnellere, schlankere Alternative zu Pandas UDFs für die performante Ausführung von Python UDFs in PySpark bieten. Durch die direkte Verarbeitung von Arrow-Daten und die Eliminierung des Pandas/Arrow-Konvertierungsaufwands ermöglichen Arrow UDFs eine ~10% schnellere Ausführung, ~40% weniger Speicherverbrauch und eine bessere Unterstützung für komplexe Datentypen – alles mit einer vertrauten, intuitiven Decorator-Syntax.
Bereit, mehr zu entdecken? Probieren Sie Native Arrow UDFs noch heute auf Databricks als Teil von Databricks Runtime 18.0 aus. Um zu beginnen, ersetzen Sie einfach Ihre bestehenden Pandas UDFs durch Arrow UDFs. In den meisten Fällen sind nur wenige Zeilen Codeänderung erforderlich, um sofortige Leistungssteigerungen zu erzielen. Weitere Informationen finden Sie in der Arrow UDF-Dokumentation und der Arrow UDTF-Dokumentation für die vollständige API-Referenz und zusätzliche Beispiele.
(Dieser Blogbeitrag wurde mit KI-gestützten Tools übersetzt.) Originalbeitrag
Abonnieren Sie unseren Blog und erhalten Sie die neuesten Beiträge direkt in Ihren Posteingang.