Ir para o conteúdo principal
Código aberto

Apresentando UDFs Arrow no PySpark: Uma Substituição Mais Rápida e Eficiente para UDFs Pandas

Defina UDFs mais performáticas com facilidade.

por Ruifeng Zheng e Yicong Huang

  • Introduzimos UDFs nativas do Arrow, que operam diretamente nos dados do Arrow, eliminando a sobrecarga de conversão Pandas/Arrow em UDFs Pandas para uma execução mais rápida e menor uso de memória.
  • Também descrevemos os tipos de UDFs do Arrow para casos de uso escalar e de agregação, e UDTFs do Arrow para transformações de tabela de entrada e saída, com exemplos de código em Python e SQL.
  • Benchmarks mostram que as UDFs do Arrow são ~10% mais rápidas e usam ~40% menos memória do que as UDFs Pandas, com melhor suporte para tipos de dados complexos.

Introdução

As Funções Definidas pelo Usuário (UDFs) em Python são um mecanismo essencial de extensibilidade, mas tradicionalmente sofriam de alta sobrecarga devido à execução baseada em linha. No Apache Spark™, as UDFs Pandas resolveram parte desse problema introduzindo serialização baseada em Arrow e processamento em lote, melhorando significativamente o throughput em comparação com as UDFs Python escalares.

No entanto, as UDFs Pandas ainda apresentam limitações fundamentais:

  • A conversão de dados Pandas/Arrow introduz cópias de dados adicionais. Abordagens de cópia zero são possíveis apenas em certos casos específicos. Por exemplo, colunas com valores NULL acionarão cópias profundas.
  • Tipos de dados complexos não são bem suportados. Por exemplo, instâncias aninhadas de StructType não são suportadas para o tipo de saída em casos de uso de agregação.
Fluxos de dados da execução de UDFs Pandas no Apache Spark

Ao eliminar a conversão de dados Pandas/Arrow, as UDFs Arrow executam mais rápido que as UDFs Pandas, consomem menos memória e oferecem melhor suporte a tipos de dados.

UDFs Arrow Nativas

Temos o prazer de apresentar as UDFs Arrow Nativas a partir do Databricks Runtime 18.0 (notas de lançamento), um avanço empolgante para a execução de UDFs de alto desempenho.

As UDFs Arrow Nativas operam diretamente em dados Arrow sem converter as entradas em objetos Pandas ou NumPy. Isso preserva o layout colunar de ponta a ponta, evita cópias de dados desnecessárias e permite que as UDFs usem processamento vetorizado, aproveitando o modelo nativo de computação e memória do Arrow.

Para definir uma UDF Arrow, os usuários podem usar um novo decorador Python @arrow_udf, com tipo de retorno especificado e tipo de avaliação opcional. Por exemplo:

Os usuários também podem defini-la com o decorador existente @udf com dicas de tipo completas. Por exemplo:

Nota: A definição da função deve incluir dicas de tipo para todos os argumentos e o valor de retorno.
Este design se alinha com as interfaces das UDFs Python escalares, proporcionando uma experiência consistente e intuitiva para usuários já familiarizados com UDFs Python escalares.

O seguinte demonstra como usar a UDF Arrow:

Uso em Python:

Uso em SQL:

Oferecemos suporte para variantes de interfaces de UDFs Arrow. Incluindo Funções Escalares, Funções de Agregação e Funções de Tabela. Na API de data frame, também fornecemos mapInArrow e applyInArrow para usar UDFs Arrow. Em seguida, as apresentaremos uma a uma.

Funções Escalares Arrow

As Funções Escalares Arrow realizam transformações linha a linha. Elas são o equivalente Arrow das UDFs Pandas escalares e podem ser usadas onde uma expressão de coluna é esperada, como df.select() ou df.withColumn(). Três modos de entrada são suportados: direto, iterador e iterador de múltiplos arrays. As variantes de iterador são úteis quando a UDF requer uma inicialização única e cara (por exemplo, carregar um modelo ou compilar um padrão de regex), já que o custo de configuração é amortizado em todos os lotes. Em todos os casos, a contagem de linhas de saída deve corresponder à contagem de linhas de entrada.

  • Arrays para Array: recebendo um ou mais pyarrow.Array e retornando um pyarrow.Array. O array de entrada e saída deve ter o mesmo número de valores.
  • Iterador de Arrays para Iterador de Arrays: recebendo um iterador de pyarrow.Array e retornando um iterador de pyarrow.Array. Este tipo é útil quando a execução da UDF requer uma inicialização cara.
  • Iterador de Múltiplos Arrays para Iterador de Arrays: recebendo um iterador de uma tupla de múltiplos pyarrow.Array e retornando um iterador de pyarrow.Array.

Funções de Agregação Arrow

As Funções de Agregação Arrow recebem uma ou mais entradas pyarrow.Array e retornam um valor escalar, reduzindo um grupo de linhas em um único resultado. Elas são o equivalente Arrow das UDFs Pandas de agregação agrupada e são usadas com groupBy().agg() ou operações de Janela. Semelhante às funções escalares, as funções de agregação também suportam três modos de entrada.

Arrays para Escalar: recebendo pyarrow.Array e retornando um valor escalar.

  • Iterador de Arrays para Escalar: recebendo um iterador de pyarrow.Array e retornando um valor escalar. Isso é útil para processar grandes volumes de dados em operações de estilo de agregação.

Iterador de Múltiplos Arrays para Escalar: recebendo um iterador de uma tupla de múltiplos pyarrow.Array e retornando um valor escalar. Agregações mais complexas podem ser definidas.

Funções de Tabela Arrow

As Funções de Tabela Arrow, também conhecidas como UDTFs Arrow (Funções de Tabela Definidas pelo Usuário), aceitam um pyarrow.RecordBatch ou múltiplos pa.Array como entrada e produzem um pyarrow.Table como saída. Isso representa o padrão predominante para transformações de tabela-entrada, tabela-saída implementadas em Python utilizando execução colunar. As UDTFs Arrow possuem a capacidade de:

  • Retornar múltiplas colunas
  • Produzir zero, uma ou múltiplas linhas
  • Executar transformações de tabela vetorizadas empregando kernels de computação Arrow

Consequentemente, são otimamente adequadas para operações como filtragem, expansão de linhas, reestruturação de dados e geração de colunas derivadas.

A interface arrow_udtf é projetada para simplicidade, empregando uma sintaxe de decorador onde você define o tipo de retorno usando uma string formatada em DDL. Nesta configuração, o método eval recebe objetos PyArrow como entrada e espera-se que produza PyArrow Tables ou RecordBatches. A interface acomoda dois modos de entrada. Ao processar argumentos de tabela, o método eval é fornecido com um objeto pa.RecordBatch que encapsula todas as colunas da tabela de entrada:

Para argumentos escalares, o método recebe objetos pa.Array, um para cada entrada escalar:

Aqui está outro exemplo:

Esta UDTF pode funcionar de duas maneiras distintas:

Uso em Python:

Uso em SQL:

Suporte a mapInArrow e applyInArrow do DataFrame

Além das Funções Definidas pelo Usuário (UDFs) e Funções de Tabela Definidas pelo Usuário (UDTFs), o PySpark oferece APIs de Função Arrow que facilitam a aplicação direta de funções nativas Python aos dados Arrow no nível do DataFrame. Essas APIs operam de forma análoga às suas contrapartes Pandas (mapInPandas, applyInPandas), mas utilizam pyarrow.RecordBatch e pyarrow.Table em vez de Pandas DataFrames, contornando assim a sobrecarga de conversão entre os formatos Pandas e Arrow.

  • Mapear. DataFrame.mapInArrow transforma um iterador de pyarrow.RecordBatch em outro iterador de pyarrow.RecordBatch, permitindo operações no nível da linha, como filtragem, transformação ou expansão.
  • Mapeamento Agrupado. groupBy().applyInArrow() aplica uma função especificada a cada grupo, aceitando e retornando um pyarrow.Table. Esta funcionalidade é benéfica para transformações por grupo, como normalização de dados.
  • Mapeamento Co-agrupado. cogroup().applyInArrow() permite o co-agrupamento de dois DataFrames com base em uma chave compartilhada, aplicando subsequentemente uma função a cada co-grupo. A função recebe duas pyarrow.Table entradas e espera-se que retorne uma única pyarrow.Table.

Desempenho

Ao remover a cara conversão de dados Pandas/Arrow, as UDFs Arrow geralmente são executadas mais rapidamente do que as UDFs Pandas, com menor uso de memória. Vamos comparar as duas UDFs simples:

A UDF Arrow é ~10% mais rápida que a UDF Pandas, e o profiler de memória mostra que ~40% da memória é economizada na execução.

Conclusão

O Databricks Runtime 18.0 introduz as UDFs Arrow Nativas, oferecendo uma alternativa mais rápida e eficiente às UDFs Pandas para a execução de UDFs Python de alto desempenho no PySpark. Ao operar diretamente nos dados Arrow e eliminar a sobrecarga de conversão Pandas/Arrow, as UDFs Arrow proporcionam uma execução ~10% mais rápida, ~40% menos uso de memória e melhor suporte para tipos de dados complexos — tudo com uma sintaxe de decorador familiar e intuitiva.

Pronto para explorar mais? Experimente as UDFs Arrow Nativas hoje no Databricks como parte do Databricks Runtime 18.0. Para começar, basta substituir suas UDFs Pandas existentes por UDFs Arrow. Na maioria dos casos, são necessárias apenas algumas linhas de alteração para obter ganhos de desempenho imediatos. Consulte a documentação das UDFs Arrow e a documentação das UDTFs Arrow para a referência completa da API e exemplos adicionais.

(Esta publicação no blog foi traduzida utilizando ferramentas baseadas em inteligência artificial) Publicação original

Receba os posts mais recentes na sua caixa de entrada

Assine nosso blog e receba os posts mais recentes diretamente na sua caixa de entrada.