As capacidades de streaming do Apache Spark™ evoluíram dramaticamente desde a sua criação, começando com um processamento simples e sem estado, onde cada lote operava de forma independente. A verdadeira transformação veio com a adição de capacidades de processamento com estado através de APIs como mapGroupsWithState e mais tarde flatMapGroupsWithState, permitindo que os desenvolvedores mantenham e atualizem o estado em micro-lotes de streaming. Essas operações com estado abriram possibilidades para processamento de eventos complexos, detecção de anomalias e reconhecimento de padrões em fluxos de dados contínuos.
A última adição do Apache Spark Structured Streaming, transformWithState, representa uma evolução significativa no processamento de fluxo com estado e oferece várias vantagens sobre seus predecessores,flatMapGroupsWithState e applyInPandasWithState,para executar processamento com estado arbitrário de forma mais eficaz. Com o Apache Spark 4.0, este framework alcançou novos patamares de expressividade e desempenho. Esta última evolução oferece o conjunto de ferramentas abrangente necessário para construir aplicações de dados em tempo real sofisticadas que mantêm o contexto ao longo do tempo enquanto processam milhões de eventos por segundo.
Consideraremos os sistemas de monitoramento ambiental como exemplo para demonstrar as capacidades do transformWithStateInPandas, onde coletamos, processamos e analisamos fluxos contínuos de dados de sensores. Embora nosso exemplo se concentre em dados ambientais, a mesma abordagem se aplica a muitos casos de uso operacionais, como telemetria de equipamentos, rastreamento de logística ou automação industrial.
Imagine que você está monitorando a temperatura, umidade, níveis de CO2 e matéria particulada de um local ao longo de um período de tempo, e precisamos acionar um alerta se qualquer uma das médias dessas medições ultrapassar ou ficar abaixo de um limite.
É aqui que as APIs ValueState entram em jogo. Eles podem ser usados para armazenar estado como primitivas ou estruturas complexas. Vamos ver como funciona.
Vamos começar com um único sensor. A cada poucos segundos, este sensor envia uma leitura que se parece com a seguinte:
Para cada sensor, localização e cidade, precisamos manter um estado que acompanhe não apenas as condições atuais, mas também o contexto histórico. Você pode pensar nisso como a memória do sensor, mantendo o controle de tudo, desde a última leitura de timestamp até o número de alertas gerados. Projetamos nosso esquema ValueState para capturar este quadro completo:
Depois de definir nosso processador com estado como TemperatureMonitor, passaremos o processador para o operador transformWithStateInPandas e persistiremos a saída em uma tabela Delta. Isso garante que os dados do TemperatureMonitor estejam disponíveis para serviços externos e análises.
Vamos olhar os dados processados por TemperatureMonitor e armazenados nas tabelas Delta de saída. Ela possui as leituras ambientais de vários sensores em diferentes locais (Paris, Nova York, Londres, Tóquio e Sydney) junto com seus alertas acionados.
Como você pode ver, transformWithState nos ajuda a processar efetivamente o estado e emitir vários alertas ambientais para alta umidade, temperatura, níveis de CO2, etc., em diferentes localizações.
Agora, vamos imaginar uma cidade onde sensores monitoram continuamente as condições ambientais em diferentes localizações. Quando ocorre um pico de temperatura, os administradores da cidade podem precisar saber: Isso é um problema localizado ou um problema em toda a cidade?
As APIs ListState estendem o gerenciamento de estado para lidar com coleções ordenadas, perfeitas para dados de séries temporais e análise histórica. Isso se torna crucial ao rastrear padrões e tendências ao longo de uma linha do tempo ou de um limite arbitrário que escolhemos.
Vamos considerar um cenário onde uma cidade contém vários sensores transmitindo dados constantemente. Quando qualquer localização dentro da cidade relata uma temperatura que excede nosso limite de 25°C, então capturamos os dados e os armazenamos em uma ListState consciente do tempo:
No exemplo abaixo, usamos a classe EnvironmentalMonitorListProcessor e ListState junto com o TTL (Time To Live) integrado para manter este histórico dos dados do sensor com uma atualização de uma hora:
Os valores de estado usados pela transformWithState suportam um valor opcional de tempo de vida (TTL), que é calculado com base no tempo de processamento do valor mais uma duração definida em milissegundos. Quando o TTL expira, o valor correspondente é removido da loja de estado.
O TTL com ListState é crucial para manter automaticamente apenas os dados relevantes dentro de um objeto de estado, pois remove automaticamente os registros desatualizados após um período de tempo especificado.
Neste exemplo, o TTL garante que as análises em toda a cidade permaneçam atuais e relevantes. Cada entrada de estado recebe um carimbo de data/hora de expiração, e uma vez que expira, o estado é limpo automaticamente, prevenindo o crescimento ilimitado do estado enquanto mantém o contexto histórico recente da cidade.
Com o histórico armazenado no objeto ListState, podemos identificar padrões e realizar vários cálculos. Por exemplo, em EnvironmentalMonitorListProcessor determinamos as tendências de temperatura comparando a leitura atual com a leitura histórica mais recente.
Agora vamos conectar o EnvironmentalMonitorListProcessor em um pipeline de streaming, armazenar os resultados em uma tabela Delta e inspecioná-los mais adiante.
Como você pode ver na captura de tela acima, a tabela Delta agora mostra análise temporal em diferentes localizações. Ao combinar o armazenamento temporal do ListState com a análise em nível de cidade, criamos um sistema que não apenas detecta problemas ambientais, mas entende seu contexto e evolução em cidades inteiras. As APIs ListState, juntamente com a gestão de TTL, fornecem uma maneira eficiente de lidar com dados ambientais históricos, evitando o crescimento ilimitado do estado, tornando-o ideal para sistemas de monitoramento ambiental em toda a cidade.
Agora, vamos imaginar um cenário em que planejadores de cidades inteligentes implantam sensores ambientais em diversas zonas urbanas - desde interseções movimentadas do centro da cidade até bairros residenciais e complexos industriais. Cada zona possui padrões ambientais únicos que variam de acordo com a hora do dia e a estação do ano.
Usando as APIs MapState, o sistema pode manter leituras ambientais específicas do local e identificar locais onde as leituras excedem os limites aceitáveis. Esta arquitetura usa localizações da cidade como chaves para monitoramento paralelo em vários ambientes, preservando os valores máximos de medição para rastrear tendências ambientais importantes enquanto evita o crescimento ilimitado do estado.
O EnvironmentalMonitorProcessor aproveita as sofisticadas capacidades de armazenamento de chave-valor do MapState para organizar dados por localização dentro das cidades. Isso permite a análise em tempo real de condições em mudança em diferentes ambientes urbanos, transformando dados brutos de sensores em inteligência acionável para o gerenciamento ambiental urbano.
A estrutura MapState é inicializada com a localização como chave da seguinte forma:
O processo de atualização de estado em nossa implementação leva os valores máximos para cada parâmetro ambiental, garantindo que rastreamos os níveis máximos de poluição em cada local:
A implementação agora pode ser integrada em um pipeline de Streaming Estruturado Spark da seguinte maneira:
A saída da tabela Delta agora mostra um monitoramento ambiental abrangente em vários locais/cidades.
Nas seções acima, mostramos como vários casos de uso de monitoramento ambiental podem ser facilmente suportados usando a nova API transformWithState no Apache Spark. Em resumo, a implementação acima pode habilitar os seguintes casos de uso:
Junto com o código do pipeline mostrado acima, um dos recursos mais poderosos da nova API transformWithState é sua integração perfeita com o leitor de estado no Apache Spark. Esta capacidade fornece uma visibilidade sem precedentes do estado interno mantido pelo nosso sistema de monitoramento ambiental, tornando o desenvolvimento, a depuração e o monitoramento operacional significativamente mais eficazes.
Ao gerenciar um sistema crítico de monitoramento ambiental em várias cidades, entender o estado subjacente é essencial para solucionar anomalias, verificar a integridade dos dados e garantir o funcionamento adequado do sistema. O leitor de fonte de dados de estado nos permite consultar tanto metadados de alto nível quanto valores de estado detalhados.
Como mostrado na captura de tela abaixo, os usuários agora podem obter acesso detalhado a todas as suas linhas de estado para todos os tipos compostos, aumentando assim a capacidade de depuração e observabilidade desses pipelines.
A API transformWithState do Apache Spark™ 4.0 representa um avanço significativo para o processamento de estado arbitrário em aplicações de streaming. Com o caso de uso de monitoramento ambiental acima, mostramos como os usuários podem construir e executar cargas de trabalho operacionais poderosas usando a nova API. Sua abordagem orientada a objetos e conjunto de recursos robustos permitem o desenvolvimento de pipelines de streaming avançados que podem lidar com requisitos complexos enquanto mantêm a confiabilidade e o desempenho. Incentivamos todos os usuários do Spark a experimentar a nova API para seus casos de uso de streaming e aproveitar todos os benefícios que esta nova API tem a oferecer!
Você pode baixar o código acima aqui: https://github.com/databricks-solutions/databricks-blogposts/tree/main/2025-05-transformWithStateInPandas/python/environmentalMonitoring
(This blog post has been translated using AI-powered tools) Original Post
Produto
June 11, 2024/11 min de leitura

