Ir para o conteúdo principal

Aprofundamento de Recursos: Marca d'água no Apache Spark Structured Streaming

blog OG

Publicado: 22 de agosto de 2022

Produto12 min de leitura

Principais pontos

  • As marcas d'água ajudam o Spark a entender o progresso do processamento com base no tempo do evento, quando produzir agregações em janelas e quando remover o estado das agregações.
  • Ao unir fluxos de dados, o Spark, por padrão, usa uma única marca d'água global que remove o estado com base no tempo mínimo do evento visto em todos os fluxos de entrada.
  • O RocksDB pode ser usado para reduzir a pressão na memória do cluster e as pausas de GC.
  • Os objetos StreamingQueryProgress e StateOperatorProgress contêm informações importantes sobre como as marcas d'água afetam seu fluxo.

Introdução

Ao construir pipelines em tempo real, uma das realidades com as quais as equipes precisam lidar é que a ingestão de dados distribuídos é inerentemente desordenada. Além disso, no contexto de operações de streaming com estado, as equipes precisam ser capazes de rastrear adequadamente o progresso do tempo do evento no fluxo de dados que estão ingerindo para o cálculo adequado de agregações em janelas de tempo e outras operações com estado. Podemos resolver tudo isso usando o Structured Streaming.

Por exemplo, digamos que somos uma equipe trabalhando na construção de um pipeline para ajudar nossa empresa a fazer manutenção proativa em nossas máquinas de mineração que alugamos para nossos clientes. Essas máquinas precisam estar sempre em perfeitas condições, então as monitoramos em tempo real. Precisaremos realizar agregações com estado nos dados de streaming para entender e identificar problemas nas máquinas.

É aqui que precisamos usar o Structured Streaming e as Marcas d'água para produzir as agregações com estado necessárias que ajudarão a informar decisões sobre manutenção preditiva e muito mais para essas máquinas.

GUIA

Seu guia compacto para analítica moderna

O que é Marca d'água?

De modo geral, ao trabalhar com dados de streaming em tempo real, haverá atrasos entre o tempo do evento e o tempo de processamento devido à forma como os dados são ingeridos e se a aplicação geral experimenta problemas como tempo de inatividade. Devido a esses potenciais atrasos variáveis, o mecanismo que você usa para processar esses dados precisa ter algum mecanismo para decidir quando fechar as janelas de agregação e produzir o resultado agregado.

Embora a inclinação natural para remediar esses problemas possa ser usar um atraso fixo baseado no tempo do relógio, mostraremos neste exemplo que isso não é a melhor solução.

Para explicar isso visualmente, vamos considerar um cenário em que estamos recebendo dados em vários momentos, de aproximadamente 10:50 às 11:20. Estamos criando janelas de 10 minutos que calculam a média das leituras de temperatura e pressão que chegaram durante o período da janela.

Nesta primeira imagem, temos as janelas de agrupamento disparando às 11:00, 11:10 e 11:20, levando às tabelas de resultados mostradas nos respectivos horários. Quando o segundo lote de dados chega por volta das 11:10 com dados que têm um tempo de evento de 10:53, isso é incorporado às médias de temperatura e pressão calculadas para a janela de 11:00 às 11:10, que fecha às 11:10, o que não dá o resultado correto.

Representação visual de um pipeline de Structured Streaming ingerindo lotes de dados de temperatura e pressão

Para garantir que obtemos os resultados corretos para as agregações que queremos produzir, precisamos definir uma marca d'água que permitirá ao Spark saber quando fechar a janela de agregação e produzir o resultado agregado correto.

Em aplicações de Structured Streaming, podemos garantir que todos os dados relevantes para as agregações que queremos calcular sejam coletados usando um recurso chamado marca d'água. Em termos básicos, ao definir uma marca d'água, o Spark Structured Streaming sabe quando ingeriu todos os dados até um determinado tempo, T, (com base em uma expectativa de latência definida) para que possa fechar e produzir agregações em janelas até o timestamp T.

Este segundo visual mostra o efeito da implementação de uma marca d'água de 10 minutos e o uso do modo Append no Spark Structured Streaming.

Representação visual do efeito que uma marca d'água de 10 minutos tem quando aplicada ao pipeline de Structured Streaming.

Ao contrário do primeiro cenário, onde o Spark emitirá a agregação em janela dos dez minutos anteriores a cada dez minutos (ou seja, emitirá a janela de 11:00 às 11:10 às 11:10), o Spark agora espera para fechar e emitir a agregação em janela assim que o tempo máximo do evento visto menos a marca d'água especificada for maior que o limite superior da janela.

Em outras palavras, o Spark precisou esperar até ver pontos de dados onde o tempo mais recente do evento visto menos 10 minutos fosse maior que 11:00 para emitir a janela de agregação de 10:50 às 11:00. Às 11:00, ele não vê isso, então apenas inicializa o cálculo da agregação na loja de estado interna do Spark. Às 11:10, essa condição ainda não é atendida, mas temos um novo ponto de dados para 10:53, então o estado interno é atualizado, mas não emitido. Então, finalmente, às 11:20, o Spark viu um ponto de dados com um tempo de evento de 11:15 e, como 11:15 menos 10 minutos é 11:05, que é posterior a 11:00, a janela de 10:50 às 11:00 pode ser emitida para a tabela de resultados.

Isso produz o resultado correto ao incorporar adequadamente os dados com base na latência esperada definida pela marca d'água. Assim que os resultados são emitidos, o estado correspondente é removido da loja de estado.

Incorporando Marcas d'água em seus Pipelines

Para entender como incorporar essas marcas d'água em nossos pipelines de Structured Streaming, exploraremos este cenário percorrendo um exemplo de código real baseado em nosso caso de uso declarado na seção de introdução deste blog.

Digamos que estamos ingerindo todos os nossos dados de sensor de um cluster Kafka na nuvem e queremos calcular as médias de temperatura e pressão a cada dez minutos com um desvio de tempo esperado de dez minutos. O pipeline de Structured Streaming com marca d'água ficaria assim:

PySpark

Aqui, simplesmente lemos do Kafka, aplicamos nossas transformações e agregações, e então escrevemos em tabelas Delta Lake que serão visualizadas e monitoradas no Databricks SQL. A saída escrita na tabela para uma determinada amostra de dados seria assim:

Saída da consulta de streaming definida no exemplo de código PySpark acima

Para incorporar a marca d'água, primeiro precisamos identificar dois itens:

  1. A coluna que representa o tempo do evento da leitura do sensor
  2. O desvio de tempo esperado estimado dos dados

Retirado do exemplo anterior, podemos ver a marca d'água definida pelo método .withWatermark() com a coluna eventTimestamp usada como coluna de tempo do evento e 10 minutos para representar o desvio de tempo que esperamos.

PySpark

Agora que sabemos como implementar marcas d'água em nosso pipeline de Structured Streaming, será importante entender como outros itens como operações de junção de streaming e gerenciamento de estado são afetados pelas marcas d'água. Além disso, à medida que escalamos nossos pipelines, haverá métricas-chave que nossos engenheiros de dados precisarão estar cientes e monitorar para evitar problemas de desempenho. Exploraremos tudo isso à medida que nos aprofundamos nas marcas d'água.

Marcas d'água em Diferentes Modos de Saída

Antes de nos aprofundarmos, é importante entender como sua escolha de modo de saída afeta o comportamento das marcas d'água que você define.

As marcas d'água só podem ser usadas quando você está executando sua aplicação de streaming nos modos de saída append ou update. Existe um terceiro modo de saída, o modo complete, no qual toda a tabela de resultados é gravada no armazenamento. Este modo não pode ser usado porque requer que todos os dados agregados sejam preservados e, portanto, não pode usar marca d'água para descartar o estado intermediário.

A implicação desses modos de saída no contexto de agregação de janelas e watermarks é que, no modo ‘append’, um agregado pode ser produzido apenas uma vez e não pode ser atualizado. Portanto, uma vez que o agregado é produzido, o motor pode excluir o estado do agregado e, assim, manter o estado geral da agregação limitado. Registros tardios – aqueles para os quais a heurística aproximada de watermark não se aplicou (eram mais antigos que o período de atraso do watermark), portanto, precisam ser descartados por necessidade – o agregado foi produzido e o estado do agregado foi excluído.

Inversamente, para o modo ‘update’, o agregado pode ser produzido repetidamente a partir do primeiro registro e em cada registro recebido, assim um watermark é opcional. O watermark é útil apenas para aparar o estado quando o motor sabe heuristicamente que nenhum outro registro para aquele agregado pode ser recebido. Uma vez que o estado é excluído, novamente quaisquer registros tardios precisam ser descartados, pois o valor agregado foi perdido e não pode ser atualizado.

É importante entender como o estado, os registros que chegam atrasados e os diferentes modos de saída podem levar a comportamentos diferentes de sua aplicação em execução no Spark. A principal conclusão aqui é que, tanto no modo append quanto no modo update, uma vez que o watermark indica que todos os dados foram recebidos para uma janela de agregação, o motor pode aparar o estado da janela. No modo append, o agregado é produzido apenas no fechamento da janela de tempo mais o atraso do watermark, enquanto no modo update ele é produzido a cada atualização da janela.

Por último, ao aumentar sua janela de atraso de watermark, você fará com que o pipeline espere mais pelos dados e potencialmente descarte menos dados – maior precisão, mas também maior latência para produzir os agregados.

Comprimento do Atraso da Janela Precisão Latência
Janela de Atraso Mais Longa Maior Precisão Maior Latência
Janela de Atraso Mais Curta Menor Precisão Menor Latência

Aprofundando em Watermarking

Joins e Watermarking

Existem algumas considerações a serem observadas ao realizar operações de join em suas aplicações de streaming, especificamente ao juntar dois fluxos. Vamos supor, para nosso caso de uso, que queremos juntar o conjunto de dados de streaming sobre leituras de temperatura e pressão com valores adicionais capturados por outros sensores nas máquinas.

Existem três tipos gerais de junções de fluxo-fluxo que podem ser implementadas no Structured Streaming: inner, outer e semi joins. O principal problema ao fazer joins em aplicações de streaming é que você pode ter uma visão incompleta de um lado do join. Dar ao Spark uma compreensão de quando não há correspondências futuras a serem esperadas é semelhante ao problema anterior com agregações, onde o Spark precisava entender quando não havia novas linhas para incorporar ao cálculo da agregação antes de emiti-la.

Para permitir que o Spark lide com isso, podemos usar uma combinação de watermarks e restrições de tempo de evento dentro da condição de join da junção de fluxo-fluxo. Essa combinação permite que o Spark filtre registros tardios e aparar o estado para a operação de join por meio de uma condição de intervalo de tempo na junção. Demonstramos isso no exemplo abaixo:

PySpark

No entanto, ao contrário do exemplo acima, haverá momentos em que cada fluxo pode exigir diferentes defasagens de tempo para seus watermarks. Neste cenário, o Spark tem uma política para lidar com múltiplas definições de watermark. O Spark mantém um watermark global que é baseado no fluxo mais lento para garantir a maior segurança em relação a não perder dados.

Os desenvolvedores têm a capacidade de alterar esse comportamento mudando spark.sql.streaming.multipleWatermarkPolicy para max;, no entanto, isso significa que os dados do fluxo mais lento serão descartados.

Para ver a gama completa de operações de join que exigem ou podem aproveitar watermarks, confira esta seção da documentação do Spark.

Monitoramento e Gerenciamento de Fluxos com Watermarks

Ao gerenciar uma consulta de streaming onde o Spark pode precisar gerenciar milhões de chaves e manter o estado para cada uma delas, a loja de estado padrão que vem com os clusters Databricks pode não ser eficaz. Você pode começar a ver uma utilização de memória mais alta e, em seguida, pausas mais longas de coleta de lixo. Ambos prejudicarão o desempenho e a escalabilidade de sua aplicação Structured Streaming.

É aqui que entra o RocksDB. Você pode aproveitar o RocksDB nativamente no Databricks habilitando-o da seguinte forma na configuração do Spark:

Isso permitirá que o cluster que executa a aplicação Structured Streaming aproveite o RocksDB, que pode gerenciar o estado de forma mais eficiente na memória nativa e tirar proveito do disco local/SSD em vez de manter todo o estado na memória.

Além de rastrear o uso de memória e métricas de coleta de lixo, existem outros indicadores e métricas importantes que devem ser coletados e rastreados ao lidar com Watermarking e Structured Streaming. Para acessar essas métricas, você pode consultar os objetos StreamingQueryProgress e StateOperatorProgress. Confira nossa documentação para exemplos de como usá-los aqui.

No objeto StreamingQueryProgress, há um método chamado “eventTime” que pode ser chamado e que retornará os timestamps max, min, avg e watermark. Os três primeiros são o tempo máximo, mínimo e médio de evento visto neste gatilho. O último é o watermark usado no gatilho.

Exemplo Abreviado de um objeto StreamingQueryProgress

Essas informações podem ser usadas para reconciliar os dados nas tabelas de resultado que suas consultas de streaming estão produzindo e também podem ser usadas para verificar se o watermark em uso é o timestamp de tempo de evento pretendido. Isso pode se tornar importante quando você está juntando fluxos de dados.

Dentro do objeto StateOperatorProgress, há a métrica numRowsDroppedByWatermark. Essa métrica mostrará quantas linhas estão sendo consideradas atrasadas demais para serem incluídas na agregação com estado. Note que esta métrica está medindo linhas descartadas pós-agregação e não as linhas de entrada brutas, então o número não é preciso, mas pode dar uma indicação de que há dados tardios sendo descartados. Isso, em conjunto com as informações do objeto StreamingQueryProgress, pode ajudar os desenvolvedores a determinar se os watermarks estão configurados corretamente.

Múltiplas Agregações, Streaming e Watermarks

Uma limitação remanescente das consultas do Structured Streaming é encadear múltiplos operadores com estado (por exemplo, agregações, junções de streaming) em uma única consulta de streaming. Essa limitação de um único watermark global para agregações com estado é algo para o qual nós, na Databricks, estamos trabalhando em uma solução e liberaremos mais informações nos próximos meses. Confira nosso blog sobre o Projeto Lightspeed para saber mais: Projeto Lightspeed: Processamento de Streaming Mais Rápido e Simples com Apache Spark (databricks.com).

Conclusão

Com o Structured Streaming e Watermarking no Databricks, organizações, como a do caso de uso descrito acima, podem construir aplicações resilientes em tempo real que garantem que as métricas impulsionadas por agregações em tempo real sejam calculadas com precisão, mesmo que os dados não estejam ordenados corretamente ou cheguem no prazo. Para saber mais sobre como você pode construir aplicações em tempo real com o Databricks, entre em contato com seu representante Databricks.

(Esta publicação no blog foi traduzida utilizando ferramentas baseadas em inteligência artificial) Publicação original

Nunca perca uma postagem da Databricks

Inscreva-se nas categorias de seu interesse e receba as últimas postagens na sua caixa de entrada