Defina UDFs más eficientes con facilidad.
por Ruifeng Zheng y Yicong Huang
Las funciones definidas por el usuario (UDFs) de Python son un mecanismo de extensibilidad esencial, pero tradicionalmente han sufrido de una sobrecarga alta debido a la ejecución basada en filas. En Apache Spark™, las UDFs de Pandas abordaron parte de este problema al introducir la serialización basada en Arrow y el procesamiento por lotes, mejorando significativamente el rendimiento en comparación con las UDFs escalares de Python.
Sin embargo, las UDFs de Pandas aún tienen limitaciones fundamentales:
Al eliminar la conversión de datos de Pandas/Arrow, las UDFs de Arrow se ejecutan más rápido que las UDFs de Pandas, consumen menos memoria y ofrecen un mejor soporte para tipos de datos.
Nos complace presentar las UDFs Nativas de Arrow a partir de Databricks Runtime 18.0 (notas de la versión), un emocionante avance para la ejecución de UDFs de alto rendimiento.
Las UDFs Nativas de Arrow operan directamente sobre datos de Arrow sin convertir las entradas en objetos de Pandas o NumPy. Esto preserva el diseño columnar de extremo a extremo, evita copias de datos innecesarias y permite que las UDFs utilicen procesamiento vectorizado aprovechando el modelo nativo de computación y memoria de Arrow.
Para definir una UDF de Arrow, los usuarios pueden usar un nuevo decorador de Python @arrow_udf, con un tipo de retorno especificado y un tipo de evaluación opcional. Por ejemplo:
Los usuarios también pueden definirla con el decorador existente @udf con sugerencias de tipo completas. Por ejemplo:
Nota: La definición de la función debe incluir sugerencias de tipo para todos los argumentos y el valor de retorno.
Este diseño se alinea con las interfaces de las UDFs escalares de Python, proporcionando una experiencia consistente e intuitiva para los usuarios ya familiarizados con las UDFs escalares de Python.
A continuación se demuestra cómo usar la UDF de Arrow:
Uso en Python:
Uso en SQL:
Ofrecemos soporte para variantes de interfaces de UDFs de Arrow. Incluyendo Funciones Escalares, Funciones Agregadas y Funciones de Tabla. En la API de data frame también proporcionamos mapInArrow y applyInArrow para usar UDFs de Arrow. A continuación, las presentaremos una por una.
Las Funciones Escalares de Arrow realizan transformaciones fila por fila. Son el equivalente en Arrow de las UDFs escalares de Pandas y se pueden usar en cualquier lugar donde se espere una expresión de columna, como df.select() o df.withColumn(). Se admiten tres modos de entrada: directo, iterador e iterador de múltiples arrays. Las variantes de iterador son útiles cuando la UDF requiere una inicialización costosa única (por ejemplo, cargar un modelo o compilar un patrón de expresión regular), ya que el costo de configuración se amortiza en todos los lotes. En todos los casos, el recuento de filas de salida debe coincidir con el recuento de filas de entrada.
pyarrow.Array y devuelve uno pyarrow.Array. El array de entrada y salida debe tener el mismo número de valores.pyarrow.Array y devuelve un iterador de pyarrow.Array. Este tipo es útil cuando la ejecución de la UDF requiere una inicialización costosa. pyarrow.Array y devuelve un iterador de pyarrow.Array.Las Funciones Agregadas de Arrow toman una o más entradas pyarrow.Array y devuelven un valor escalar, reduciendo un grupo de filas a un único resultado. Son el equivalente en Arrow de las UDFs de Pandas agregadas agrupadas y se usan con groupBy().agg() o con operaciones de Ventana. Al igual que las funciones escalares, las funciones agregadas también admiten tres modos de entrada.
Arrays a Escalar: recibe pyarrow.Array y devuelve un valor escalar.
pyarrow.Array y devuelve un valor escalar. Esto es útil para procesar grandes volúmenes de datos en operaciones de estilo agregación.Iterador de Múltiples Arrays a Escalar: recibe un iterador de una tupla de múltiples pyarrow.Array y devuelve un valor escalar. Se pueden definir agregaciones más complejas.
Las Funciones de Tabla de Arrow, también conocidas como UDTFs de Arrow (Funciones de Tabla Definidas por el Usuario), aceptan un pyarrow.RecordBatch o múltiples pa.Array como entrada y producen un pyarrow.Table como salida. Esto representa el patrón predominante para transformaciones de tabla a tabla implementadas en Python utilizando ejecución columnar. Las UDTFs de Arrow tienen la capacidad de:
En consecuencia, son óptimamente adecuadas para operaciones como el filtrado, la expansión de filas, la reestructuración de datos y la generación de columnas derivadas.
La arrow_udtf interfaz está diseñada para la simplicidad, empleando una sintaxis de decorador donde se define el tipo de retorno usando una cadena con formato DDL. En esta configuración, el método eval toma objetos PyArrow como entrada y se espera que produzca tablas PyArrow o RecordBatches. La interfaz admite dos modos de entrada. Al procesar argumentos de tabla, el método eval recibe un objeto pa.RecordBatch que encapsula todas las columnas de la tabla de entrada:
Para argumentos escalares, el método recibe objetos pa.Array, uno por cada entrada escalar:
Aquí hay otro ejemplo:
Esta UDTF puede funcionar de dos maneras distintas:
Uso en Python:
Uso en SQL:
Además de las Funciones Definidas por el Usuario (UDFs) y las Funciones de Tabla Definidas por el Usuario (UDTFs), PySpark proporciona APIs de funciones Arrow que facilitan la aplicación directa de funciones nativas de Python a datos Arrow a nivel de DataFrame. Estas APIs operan de forma análoga a sus contrapartes de Pandas (mapInPandas, applyInPandas) pero utilizan pyarrow.RecordBatch y pyarrow.Table en lugar de DataFrames de Pandas, evitando así la sobrecarga de conversión entre los formatos Pandas y Arrow.
DataFrame.mapInArrow transforma un iterador de pyarrow.RecordBatch en otro iterador de pyarrow.RecordBatch, permitiendo operaciones a nivel de fila como filtrado, transformación o expansión.groupBy().applyInArrow() aplica una función especificada a cada grupo, aceptando y devolviendo un pyarrow.Table. Esta funcionalidad resulta beneficiosa para transformaciones por grupo, como la normalización de datos.cogroup().applyInArrow() permite la cogrupación de dos DataFrames basándose en una clave compartida, aplicando posteriormente una función a cada cogrupo. La función recibe dos entradas pyarrow.Table y se espera que devuelva una única pyarrow.Table.Al eliminar la costosa conversión de datos de Pandas/Arrow, las UDFs de Arrow generalmente se ejecutan más rápido que las UDFs de Pandas, con un menor uso de memoria. Comparemos las dos UDFs simples:
La UDF de Arrow es aproximadamente un 10% más rápida que la UDF de Pandas, y el perfilador de memoria muestra que se ahorra aproximadamente un 40% de memoria en la ejecución.
Databricks Runtime 18.0 introduce las UDFs nativas de Arrow, ofreciendo una alternativa más rápida y eficiente a las UDFs de Pandas para una ejecución de UDFs de Python de alto rendimiento en PySpark. Al operar directamente con datos Arrow y eliminar la sobrecarga de conversión de Pandas/Arrow, las UDFs de Arrow ofrecen una ejecución ~10% más rápida, ~40% menos uso de memoria y un mejor soporte para tipos de datos complejos, todo con una sintaxis de decorador familiar e intuitiva.
¿Listo para explorar más? Pruebe las UDFs nativas de Arrow hoy mismo en Databricks como parte de Databricks Runtime 18.0. Para empezar, simplemente reemplace sus UDFs de Pandas existentes con UDFs de Arrow. En la mayoría de los casos, solo se necesitan unas pocas líneas de cambio para obtener mejoras de rendimiento inmediatas. Consulte la documentación de Arrow UDF y la documentación de Arrow UDTF para la referencia completa de la API y ejemplos adicionales.
(Esta entrada del blog ha sido traducida utilizando herramientas basadas en inteligencia artificial) Publicación original
Suscríbete a nuestro blog y recibe las últimas publicaciones directamente en tu bandeja de entrada.