Revenir au contenu principal
Open Source

Présentation des UDF Arrow dans PySpark : un remplacement plus rapide et plus léger pour les UDF Pandas

Définissez des UDFs plus performantes en toute simplicité.

par Ruifeng Zheng et Yicong Huang

  • Nous introduisons les UDFs Arrow natives, qui opèrent directement sur les données Arrow, éliminant la surcharge de conversion Pandas/Arrow dans les UDFs Pandas pour une exécution plus rapide et une utilisation réduite de la mémoire.
  • Nous décrivons également les types UDF Arrow pour les cas d'utilisation scalaires et d'agrégation, et les UDTF Arrow pour les transformations table-en, table-out, avec des exemples de code en Python et SQL.
  • Les benchmarks montrent que les UDFs Arrow sont environ 10% plus rapides et utilisent environ 40% moins de mémoire que les UDFs Pandas, avec un meilleur support pour les types de données complexes.

Introduction

Les fonctions définies par l'utilisateur (UDF) Python sont un mécanisme d'extensibilité essentiel, mais ont traditionnellement souffert d'une surcharge élevée due à l'exécution ligne par ligne. Dans Apache Spark™, les UDF Pandas ont résolu une partie de ce problème en introduisant la sérialisation basée sur Arrow et le traitement par lots, améliorant considérablement le débit par rapport aux UDF Python scalaires.

Cependant, les UDF Pandas présentent encore des limitations fondamentales :

  • La conversion de données Pandas/Arrow introduit des copies de données supplémentaires. Les approches sans copie ne sont possibles que dans certains cas spécifiques. Par exemple, les colonnes avec des valeurs NULL déclencheront des copies profondes.
  • Les types de données complexes ne sont pas bien pris en charge. Par exemple, les instances imbriquées de StructType ne sont pas prises en charge pour le type de sortie avec des cas d'utilisation d'agrégation.
Flux de données de l'exécution des UDF Pandas dans Apache Spark

En supprimant la conversion de données Pandas/Arrow, les UDF Arrow s'exécutent plus rapidement que les UDF Pandas, consomment moins de mémoire et offrent une meilleure prise en charge des types de données.

UDF Arrow natives

Nous sommes ravis de présenter les UDF Arrow natives à partir de Databricks Runtime 18.0 (notes de mise à jour), un bond en avant passionnant pour l'exécution performante des UDF.

Les UDF Arrow natives opèrent directement sur les données Arrow sans convertir les entrées en objets Pandas ou NumPy. Cela préserve la disposition en colonnes de bout en bout, évite les copies de données inutiles et permet aux UDF d'utiliser le traitement vectorisé en tirant parti du modèle de calcul et de mémoire natif d'Arrow.

Pour définir une UDF Arrow, les utilisateurs peuvent utiliser un nouveau décorateur Python @arrow_udf, avec le type de retour spécifié et le type d'évaluation facultatif. Par exemple :

Les utilisateurs peuvent également la définir avec le décorateur existant @udf avec des indications de type complètes. Par exemple :

Remarque : La définition de la fonction doit inclure des indications de type pour tous les arguments et la valeur de retour.
Cette conception s'aligne sur les interfaces des UDF Python scalaires, offrant une expérience cohérente et intuitive aux utilisateurs déjà familiarisés avec les UDF Python scalaires.

Ce qui suit montre comment utiliser l'UDF Arrow :

Utilisation Python :

Utilisation SQL :

Nous prenons en charge des variantes d'interfaces UDF Arrow. Y compris les fonctions scalaires, les fonctions d'agrégation et les fonctions de table. Dans l'API de data frame, nous fournissons également mapInArrow et applyInArrow pour utiliser les UDF Arrow. Nous allons ensuite les présenter une par une.

Fonctions scalaires Arrow

Les fonctions scalaires Arrow effectuent des transformations ligne par ligne. Ce sont l'équivalent Arrow des UDF Pandas scalaires et peuvent être utilisées partout où une expression de colonne est attendue, comme df.select() ou df.withColumn(). Trois modes d'entrée sont pris en charge : direct, itérateur et itérateur de plusieurs tableaux. Les variantes itérateurs sont utiles lorsque l'UDF nécessite une initialisation coûteuse unique (par exemple, le chargement d'un modèle ou la compilation d'un modèle regex), car le coût de configuration est amorti sur tous les lots. Dans tous les cas, le nombre de lignes de sortie doit correspondre au nombre de lignes d'entrée.

  • Tableaux vers Tableau : réception d'un ou plusieurs pyarrow.Array et retour d'un pyarrow.Array. Le tableau d'entrée et de sortie doit avoir le même nombre de valeurs.
  • Itérateur de tableaux vers Itérateur de tableaux : réception d'un itérateur de pyarrow.Array et retour d'un itérateur de pyarrow.Array. Ce type est utile lorsque l'exécution de l'UDF nécessite une initialisation coûteuse.
  • Itérateur de tableaux multiples vers Itérateur de tableaux : réception d'un itérateur d'un tuple de plusieurs pyarrow.Array et retour d'un itérateur de pyarrow.Array.

Fonctions d'agrégation Arrow

Les fonctions d'agrégation Arrow prennent une ou plusieurs entrées pyarrow.Array et retournent une valeur scalaire, réduisant un groupe de lignes en un seul résultat. Ce sont l'équivalent Arrow des UDF Pandas agrégées groupées et sont utilisées avec groupBy().agg() ou des opérations Window. Similaires aux fonctions scalaires, les fonctions d'agrégation prennent également en charge trois modes d'entrée.

Tableaux vers Scalaire : réception de pyarrow.Array et retour d'une valeur scalaire.

  • Itérateur de tableaux vers Scalaire : réception d'un itérateur de pyarrow.Array et retour d'une valeur scalaire. Ceci est utile pour traiter de grands volumes de données dans des opérations de style d'agrégation.

Itérateur de tableaux multiples vers Scalaire : réception d'un itérateur d'un tuple de plusieurs pyarrow.Array et retour d'une valeur scalaire. Des agrégations plus complexes peuvent être définies.

Fonctions de table Arrow

Les fonctions de table Arrow, également connues sous le nom de fonctions de table définies par l'utilisateur (UDTF) Arrow, acceptent un pyarrow.RecordBatch ou plusieurs pa.Array en entrée et produisent un pyarrow.Table en sortie. Cela représente le modèle prédominant pour les transformations table-en, table-out implémentées en Python utilisant l'exécution en colonnes. Les UDTF Arrow ont la capacité de :

  • Retourner plusieurs colonnes
  • Produire zéro, une ou plusieurs lignes
  • Exécuter des transformations de table vectorisées en utilisant les noyaux de calcul Arrow

Par conséquent, elles sont idéalement adaptées aux opérations telles que le filtrage, l'expansion de lignes, la restructuration de données et la génération de colonnes dérivées.

L'interface arrow_udtf est conçue pour la simplicité, utilisant une syntaxe de décorateur où vous définissez le type de retour à l'aide d'une chaîne formatée en DDL. Dans cette configuration, la méthode eval prend des objets PyArrow en entrée et est censée générer des PyArrow Tables ou RecordBatches. L'interface prend en charge deux modes d'entrée. Lors du traitement des arguments de table, la méthode eval reçoit un objet pa.RecordBatch qui encapsule toutes les colonnes de la table d'entrée :

Pour les arguments scalaires, la méthode reçoit des objets pa.Array, un pour chaque entrée scalaire :

Voici un autre exemple :

Cette UDTF peut fonctionner de deux manières distinctes :

Utilisation Python :

Utilisation SQL :

Prise en charge de DataFrame mapInArrow et applyInArrow

En plus des fonctions définies par l'utilisateur (UDF) et des fonctions de table définies par l'utilisateur (UDTF), PySpark fournit des API de fonctions Arrow qui facilitent l'application directe de fonctions Python natives aux données Arrow au niveau du DataFrame. Ces API fonctionnent de manière analogue à leurs homologues Pandas (mapInPandas, applyInPandas) mais utilisent pyarrow.RecordBatch et pyarrow.Table au lieu des DataFrames Pandas, évitant ainsi les frais de conversion entre les formats Pandas et Arrow.

  • Map. DataFrame.mapInArrow transforme un itérateur de pyarrow.RecordBatch en un autre itérateur de pyarrow.RecordBatch, permettant des opérations au niveau des lignes telles que le filtrage, la transformation ou l'expansion.
  • Grouped Map. groupBy().applyInArrow() applique une fonction spécifiée à chaque groupe, acceptant et retournant un pyarrow.Table. Cette fonctionnalité est utile pour les transformations par groupe, telles que la normalisation des données.
  • Co-grouped Map. cogroup().applyInArrow() permet le co-groupement de deux DataFrames sur la base d'une clé partagée, puis l'application d'une fonction à chaque co-groupe. La fonction reçoit deux entrées pyarrow.Table et est censée retourner une seule pyarrow.Table.

Performance

En supprimant la coûteuse conversion de données Pandas/Arrow, les UDF Arrow s'exécutent généralement plus rapidement que les UDF Pandas, avec une utilisation de mémoire réduite. Comparons les deux UDF simples :

L'UDF Arrow est environ 10 % plus rapide que l'UDF Pandas, et le profileur de mémoire montre qu'environ 40 % de mémoire est économisée lors de l'exécution.

Conclusion

Databricks Runtime 18.0 introduit les UDF Arrow natives, offrant une alternative plus rapide et plus légère aux UDF Pandas pour une exécution performante des UDF Python dans PySpark. En opérant directement sur les données Arrow et en éliminant les frais de conversion Pandas/Arrow, les UDF Arrow offrent une exécution environ 10 % plus rapide, une utilisation de mémoire environ 40 % inférieure et un meilleur support pour les types de données complexes, le tout avec une syntaxe de décorateur familière et intuitive.

Prêt à en découvrir davantage ? Essayez dès aujourd'hui les UDF Arrow natives sur Databricks dans le cadre de Databricks Runtime 18.0. Pour commencer, remplacez simplement vos UDF Pandas existantes par des UDF Arrow. Dans la plupart des cas, il suffit de quelques lignes de modification pour obtenir des gains de performance immédiats. Consultez la documentation des UDF Arrow et la documentation des UDTF pour la référence API complète et des exemples supplémentaires.

(Cet article de blog a été traduit à l'aide d'outils basés sur l'intelligence artificielle) Article original

Recevez les derniers articles dans votre boîte mail

Abonnez-vous à notre blog et recevez les derniers articles directement dans votre boîte mail.