Ir al contenido principal
Código abierto

Presentamos las UDFs de Arrow en PySpark: Un reemplazo más rápido y eficiente para las UDFs de Pandas

Defina UDFs más eficientes con facilidad.

por Ruifeng Zheng y Yicong Huang

  • Presentamos las UDFs nativas de Arrow, que operan directamente con datos de Arrow, eliminando la sobrecarga de conversión de Pandas/Arrow en las UDFs de Pandas para una ejecución más rápida y un menor uso de memoria.
  • También describimos los tipos de UDFs de Arrow para casos de uso escalar y de agregación, y las UDTFs de Arrow para transformaciones de tabla de entrada y tabla de salida, con ejemplos de código tanto en Python como en SQL.
  • Los puntos de referencia muestran que las UDFs de Arrow son aproximadamente un 10% más rápidas y utilizan aproximadamente un 40% menos de memoria que las UDFs de Pandas, con un mejor soporte para tipos de datos complejos.

Introducción

Las funciones definidas por el usuario (UDFs) de Python son un mecanismo de extensibilidad esencial, pero tradicionalmente han sufrido de una sobrecarga alta debido a la ejecución basada en filas. En Apache Spark™, las UDFs de Pandas abordaron parte de este problema al introducir la serialización basada en Arrow y el procesamiento por lotes, mejorando significativamente el rendimiento en comparación con las UDFs escalares de Python.

Sin embargo, las UDFs de Pandas aún tienen limitaciones fundamentales:

  • La conversión de datos de Pandas/Arrow introduce copias de datos adicionales. Los enfoques de cero copias solo son posibles en ciertos casos específicos. Por ejemplo, las columnas con valores NULL activarán copias profundas.
  • Los tipos de datos complejos no son bien compatibles. Por ejemplo, las instancias anidadas de StructType no son compatibles para el tipo de salida en casos de uso de agregación.
Flujos de datos de la ejecución de UDFs de Pandas en Apache Spark

Al eliminar la conversión de datos de Pandas/Arrow, las UDFs de Arrow se ejecutan más rápido que las UDFs de Pandas, consumen menos memoria y ofrecen un mejor soporte para tipos de datos.

UDFs Nativas de Arrow

Nos complace presentar las UDFs Nativas de Arrow a partir de Databricks Runtime 18.0 (notas de la versión), un emocionante avance para la ejecución de UDFs de alto rendimiento.

Las UDFs Nativas de Arrow operan directamente sobre datos de Arrow sin convertir las entradas en objetos de Pandas o NumPy. Esto preserva el diseño columnar de extremo a extremo, evita copias de datos innecesarias y permite que las UDFs utilicen procesamiento vectorizado aprovechando el modelo nativo de computación y memoria de Arrow.

Para definir una UDF de Arrow, los usuarios pueden usar un nuevo decorador de Python @arrow_udf, con un tipo de retorno especificado y un tipo de evaluación opcional. Por ejemplo:

Los usuarios también pueden definirla con el decorador existente @udf con sugerencias de tipo completas. Por ejemplo:

Nota: La definición de la función debe incluir sugerencias de tipo para todos los argumentos y el valor de retorno.
Este diseño se alinea con las interfaces de las UDFs escalares de Python, proporcionando una experiencia consistente e intuitiva para los usuarios ya familiarizados con las UDFs escalares de Python.

A continuación se demuestra cómo usar la UDF de Arrow:

Uso en Python:

Uso en SQL:

Ofrecemos soporte para variantes de interfaces de UDFs de Arrow. Incluyendo Funciones Escalares, Funciones Agregadas y Funciones de Tabla. En la API de data frame también proporcionamos mapInArrow y applyInArrow para usar UDFs de Arrow. A continuación, las presentaremos una por una.

Funciones Escalares de Arrow

Las Funciones Escalares de Arrow realizan transformaciones fila por fila. Son el equivalente en Arrow de las UDFs escalares de Pandas y se pueden usar en cualquier lugar donde se espere una expresión de columna, como df.select() o df.withColumn(). Se admiten tres modos de entrada: directo, iterador e iterador de múltiples arrays. Las variantes de iterador son útiles cuando la UDF requiere una inicialización costosa única (por ejemplo, cargar un modelo o compilar un patrón de expresión regular), ya que el costo de configuración se amortiza en todos los lotes. En todos los casos, el recuento de filas de salida debe coincidir con el recuento de filas de entrada.

  • Arrays a Array: recibe uno o más pyarrow.Array y devuelve uno pyarrow.Array. El array de entrada y salida debe tener el mismo número de valores.
  • Iterador de Arrays a Iterador de Arrays: recibe un iterador de pyarrow.Array y devuelve un iterador de pyarrow.Array. Este tipo es útil cuando la ejecución de la UDF requiere una inicialización costosa.
  • Iterador de Múltiples Arrays a Iterador de Arrays: recibe un iterador de una tupla de múltiples pyarrow.Array y devuelve un iterador de pyarrow.Array.

Funciones Agregadas de Arrow

Las Funciones Agregadas de Arrow toman una o más entradas pyarrow.Array y devuelven un valor escalar, reduciendo un grupo de filas a un único resultado. Son el equivalente en Arrow de las UDFs de Pandas agregadas agrupadas y se usan con groupBy().agg() o con operaciones de Ventana. Al igual que las funciones escalares, las funciones agregadas también admiten tres modos de entrada.

Arrays a Escalar: recibe pyarrow.Array y devuelve un valor escalar.

  • Iterador de Arrays a Escalar: recibe un iterador de pyarrow.Array y devuelve un valor escalar. Esto es útil para procesar grandes volúmenes de datos en operaciones de estilo agregación.

Iterador de Múltiples Arrays a Escalar: recibe un iterador de una tupla de múltiples pyarrow.Array y devuelve un valor escalar. Se pueden definir agregaciones más complejas.

Funciones de Tabla de Arrow

Las Funciones de Tabla de Arrow, también conocidas como UDTFs de Arrow (Funciones de Tabla Definidas por el Usuario), aceptan un pyarrow.RecordBatch o múltiples pa.Array como entrada y producen un pyarrow.Table como salida. Esto representa el patrón predominante para transformaciones de tabla a tabla implementadas en Python utilizando ejecución columnar. Las UDTFs de Arrow tienen la capacidad de:

  • Devolver múltiples columnas
  • Producir cero, una o múltiples filas
  • Ejecutar transformaciones de tabla vectorizadas empleando los kernels de cómputo de Arrow

En consecuencia, son óptimamente adecuadas para operaciones como el filtrado, la expansión de filas, la reestructuración de datos y la generación de columnas derivadas.

La arrow_udtf interfaz está diseñada para la simplicidad, empleando una sintaxis de decorador donde se define el tipo de retorno usando una cadena con formato DDL. En esta configuración, el método eval toma objetos PyArrow como entrada y se espera que produzca tablas PyArrow o RecordBatches. La interfaz admite dos modos de entrada. Al procesar argumentos de tabla, el método eval recibe un objeto pa.RecordBatch que encapsula todas las columnas de la tabla de entrada:

Para argumentos escalares, el método recibe objetos pa.Array, uno por cada entrada escalar:

Aquí hay otro ejemplo:

Esta UDTF puede funcionar de dos maneras distintas:

Uso en Python:

Uso en SQL:

Soporte para DataFrame mapInArrow y applyInArrow

Además de las Funciones Definidas por el Usuario (UDFs) y las Funciones de Tabla Definidas por el Usuario (UDTFs), PySpark proporciona APIs de funciones Arrow que facilitan la aplicación directa de funciones nativas de Python a datos Arrow a nivel de DataFrame. Estas APIs operan de forma análoga a sus contrapartes de Pandas (mapInPandas, applyInPandas) pero utilizan pyarrow.RecordBatch y pyarrow.Table en lugar de DataFrames de Pandas, evitando así la sobrecarga de conversión entre los formatos Pandas y Arrow.

  • Map. DataFrame.mapInArrow transforma un iterador de pyarrow.RecordBatch en otro iterador de pyarrow.RecordBatch, permitiendo operaciones a nivel de fila como filtrado, transformación o expansión.
  • Grouped Map. groupBy().applyInArrow() aplica una función especificada a cada grupo, aceptando y devolviendo un pyarrow.Table. Esta funcionalidad resulta beneficiosa para transformaciones por grupo, como la normalización de datos.
  • Co-grouped Map. cogroup().applyInArrow() permite la cogrupación de dos DataFrames basándose en una clave compartida, aplicando posteriormente una función a cada cogrupo. La función recibe dos entradas pyarrow.Table y se espera que devuelva una única pyarrow.Table.

Rendimiento

Al eliminar la costosa conversión de datos de Pandas/Arrow, las UDFs de Arrow generalmente se ejecutan más rápido que las UDFs de Pandas, con un menor uso de memoria. Comparemos las dos UDFs simples:

La UDF de Arrow es aproximadamente un 10% más rápida que la UDF de Pandas, y el perfilador de memoria muestra que se ahorra aproximadamente un 40% de memoria en la ejecución.

Conclusión

Databricks Runtime 18.0 introduce las UDFs nativas de Arrow, ofreciendo una alternativa más rápida y eficiente a las UDFs de Pandas para una ejecución de UDFs de Python de alto rendimiento en PySpark. Al operar directamente con datos Arrow y eliminar la sobrecarga de conversión de Pandas/Arrow, las UDFs de Arrow ofrecen una ejecución ~10% más rápida, ~40% menos uso de memoria y un mejor soporte para tipos de datos complejos, todo con una sintaxis de decorador familiar e intuitiva.

¿Listo para explorar más? Pruebe las UDFs nativas de Arrow hoy mismo en Databricks como parte de Databricks Runtime 18.0. Para empezar, simplemente reemplace sus UDFs de Pandas existentes con UDFs de Arrow. En la mayoría de los casos, solo se necesitan unas pocas líneas de cambio para obtener mejoras de rendimiento inmediatas. Consulte la documentación de Arrow UDF y la documentación de Arrow UDTF para la referencia completa de la API y ejemplos adicionales.

(Esta entrada del blog ha sido traducida utilizando herramientas basadas en inteligencia artificial) Publicación original

Recibe las últimas publicaciones en tu bandeja de entrada

Suscríbete a nuestro blog y recibe las últimas publicaciones directamente en tu bandeja de entrada.