Ir al contenido principal

Análisis Profundo de Características: Marcas de Agua en Apache Spark Structured Streaming

blog OG

Publicado: 22 de agosto de 2022

Producto13 min de lectura

Puntos clave

  • Las marcas de agua ayudan a Spark a comprender el progreso del procesamiento en función de la hora del evento, cuándo producir agregados de ventanas y cuándo eliminar el estado de las agregaciones.
  • Al unir flujos de datos, Spark, por defecto, utiliza una única marca de agua global que elimina el estado basándose en la hora mínima del evento vista en los flujos de entrada.
  • RocksDB puede aprovecharse para reducir la presión sobre la memoria del clúster y las pausas de GC.
  • Los objetos StreamingQueryProgress y StateOperatorProgress contienen información clave sobre cómo las marcas de agua afectan su flujo.

Introducción

Al crear canalizaciones en tiempo real, una de las realidades con las que los equipos tienen que trabajar es que la ingesta de datos distribuida no está inherentemente ordenada. Además, en el contexto de operaciones de streaming con estado, los equipos necesitan poder rastrear adecuadamente el progreso de la hora del evento en el flujo de datos que están ingiriendo para el cálculo adecuado de agregaciones de ventanas de tiempo y otras operaciones con estado. Podemos resolver todo esto utilizando Structured Streaming.

Por ejemplo, digamos que somos un equipo trabajando en la creación de una canalización para ayudar a nuestra empresa a realizar mantenimiento proactivo en nuestras máquinas mineras que arrendamos a nuestros clientes. Estas máquinas siempre necesitan estar en óptimas condiciones, por lo que las monitoreamos en tiempo real. Necesitaremos realizar agregaciones con estado en los datos de streaming para comprender e identificar problemas en las máquinas.

Aquí es donde necesitamos aprovechar Structured Streaming y Watermarking para producir las agregaciones con estado necesarias que ayudarán a informar decisiones sobre mantenimiento predictivo y más para estas máquinas.

GUÍA

Tu guía compacta para el análisis moderno

¿Qué es Watermarking?

En términos generales, al trabajar con datos de streaming en tiempo real, habrá retrasos entre la hora del evento y la hora de procesamiento debido a cómo se ingieren los datos y si la aplicación general experimenta problemas como tiempo de inactividad. Debido a estos posibles retrasos variables, el motor que utiliza para procesar estos datos necesita tener algún mecanismo para decidir cuándo cerrar las ventanas de agregación y producir el resultado agregado.

Si bien la inclinación natural para remediar estos problemas podría ser usar un retraso fijo basado en la hora del reloj, mostraremos en este próximo ejemplo por qué esta no es la mejor solución.

Para explicar esto visualmente, tomemos un escenario en el que estamos recibiendo datos en varios momentos, aproximadamente entre las 10:50 AM y las 11:20 AM. Estamos creando ventanas de 10 minutos que calculan el promedio de las lecturas de temperatura y presión que llegaron durante el período de la ventana.

En esta primera imagen, tenemos las ventanas de caída que se activan a las 11:00 AM, 11:10 AM y 11:20 AM, lo que lleva a las tablas de resultados que se muestran en los respectivos momentos. Cuando el segundo lote de datos llega alrededor de las 11:10 AM con datos que tienen una hora de evento de las 10:53 AM, esto se incorpora a los promedios de temperatura y presión calculados para la ventana de 11:00 AM a 11:10 AM que se cierra a las 11:10 AM, lo que no da el resultado correcto.

Representación visual de una canalización de Structured Streaming que ingiere lotes de datos de temperatura y presión

Para garantizar que obtengamos los resultados correctos para los agregados que queremos producir, necesitamos definir una marca de agua que permitirá a Spark saber cuándo cerrar la ventana de agregación y producir el resultado agregado correcto.

En las aplicaciones de Structured Streaming, podemos asegurar que se recopilen todos los datos relevantes para las agregaciones que queremos calcular utilizando una función llamada watermarking. En el sentido más básico, al definir una marca de agua, Spark Structured Streaming sabe cuándo ha ingerido todos los datos hasta cierto tiempo, T, (basado en una expectativa de latencia establecida) para que pueda cerrar y producir agregados de ventanas hasta la marca de tiempo T.

Esta segunda imagen muestra el efecto de implementar una marca de agua de 10 minutos y usar el modo Append en Spark Structured Streaming.

Representación visual del efecto que tiene una marca de agua de 10 minutos cuando se aplica a la canalización de Structured Streaming.

A diferencia del primer escenario donde Spark emitirá la agregación de ventana de los diez minutos anteriores cada diez minutos (es decir, emitirá la ventana de 11:00 AM a 11:10 AM a las 11:10 AM), Spark ahora espera para cerrar y emitir la agregación de ventana una vez que la hora máxima del evento vista menos la marca de agua especificada sea mayor que el límite superior de la ventana.

En otras palabras, Spark necesitó esperar hasta ver puntos de datos donde la hora máxima del evento vista menos 10 minutos fuera mayor que las 11:00 AM para emitir la ventana de agregación de 10:50 AM a 11:00 AM. A las 11:00 AM, no ve esto, por lo que solo inicializa el cálculo agregado en la tienda de estado interna de Spark. A las 11:10 AM, esta condición aún no se cumple, pero tenemos un nuevo punto de datos para las 10:53 AM, por lo que el estado interno se actualiza, pero no se emite. Luego, finalmente, a las 11:20 AM, Spark ha visto un punto de datos con una hora de evento de las 11:15 AM y, dado que las 11:15 AM menos 10 minutos son las 11:05 AM, que es posterior a las 11:00 AM, la ventana de 10:50 AM a 11:00 AM puede emitirse a la tabla de resultados.

Esto produce el resultado correcto al incorporar adecuadamente los datos según la latencia esperada definida por la marca de agua. Una vez que se emiten los resultados, el estado correspondiente se elimina de la tienda de estado.

Incorporación de Watermarking a sus Canalizaciones

Para comprender cómo incorporar estas marcas de agua en nuestras canalizaciones de Structured Streaming, exploraremos este escenario recorriendo un ejemplo de código real basado en nuestro caso de uso declarado en la sección de introducción de este blog.

Digamos que estamos ingiriendo todos nuestros datos de sensores desde un clúster de Kafka en la nube y queremos calcular los promedios de temperatura y presión cada diez minutos con una desviación de tiempo esperada de diez minutos. La canalización de Structured Streaming con watermarking se vería así:

PySpark

Aquí simplemente leemos de Kafka, aplicamos nuestras transformaciones y agregaciones, y luego escribimos en tablas de Delta Lake que se visualizarán y monitorearán en Databricks SQL. La salida escrita en la tabla para una muestra particular de datos se vería así:

Salida de la consulta de streaming definida en el ejemplo de código PySpark anterior

Para incorporar el watermarking, primero necesitábamos identificar dos elementos:

  1. La columna que representa la hora del evento de la lectura del sensor
  2. La desviación de tiempo esperada estimada de los datos

Tomado del ejemplo anterior, podemos ver la marca de agua definida por el método .withWatermark() con la columna eventTimestamp utilizada como columna de hora del evento y 10 minutos para representar la desviación de tiempo que esperamos.

PySpark

Ahora que sabemos cómo implementar marcas de agua en nuestra canalización de Structured Streaming, será importante comprender cómo otros elementos como las operaciones de unión de streaming y la gestión de estado se ven afectados por las marcas de agua. Además, a medida que escalamos nuestras canalizaciones, habrá métricas clave que nuestros ingenieros de datos necesitarán conocer y monitorear para evitar problemas de rendimiento. Exploraremos todo esto a medida que profundizamos en el watermarking.

Marcas de agua en diferentes modos de salida

Antes de profundizar, es importante comprender cómo su elección de modo de salida afecta el comportamiento de las marcas de agua que establece.

Las marcas de agua solo se pueden usar cuando ejecuta su aplicación de streaming en modos de salida de append o update. Existe un tercer modo de salida, el modo complete, en el cual se escribe toda la tabla de resultados en el almacenamiento. Este modo no se puede usar porque requiere que se conserven todos los datos agregados y, por lo tanto, no puede usar marcas de agua para eliminar el estado intermedio.

La implicación de estos modos de salida en el contexto de la agregación de ventanas y las marcas de agua es que en el modo 'append' (agregar), una agregación solo se puede producir una vez y no se puede actualizar. Por lo tanto, una vez que se produce la agregación, el motor puede eliminar el estado de la agregación y así mantener el estado general de agregación acotado. Los registros tardíos – aquellos para los que la heurística de marca de agua aproximada no se aplicó (eran anteriores al período de retardo de la marca de agua), por lo tanto, deben descartarse por necesidad – la agregación se ha producido y el estado de agregación se ha eliminado.

Inversamente, para el modo 'update' (actualizar), la agregación se puede producir repetidamente comenzando desde el primer registro y en cada registro recibido, por lo que una marca de agua es opcional. La marca de agua solo es útil para recortar el estado una vez que el motor sabe heurísticamente que no se recibirán más registros para esa agregación. Una vez que se elimina el estado, nuevamente, los registros tardíos deben descartarse ya que el valor de agregación se ha perdido y no se puede actualizar.

Es importante comprender cómo el estado, los registros que llegan tarde y los diferentes modos de salida pueden generar diferentes comportamientos de su aplicación que se ejecuta en Spark. La principal conclusión aquí es que tanto en los modos append como update, una vez que la marca de agua indica que se han recibido todos los datos para una ventana de agregación, el motor puede recortar el estado de la ventana. En el modo append, la agregación se produce solo al cerrar la ventana de tiempo más el retardo de la marca de agua, mientras que en el modo update se produce en cada actualización de la ventana.

Por último, al aumentar la ventana de retardo de su marca de agua, hará que el pipeline espere más tiempo por los datos y potencialmente descarte menos datos – mayor precisión, pero también mayor latencia para producir las agregaciones.

Longitud del Retardo de Ventana Precisión Latencia
Ventana de Retardo Más Larga Mayor Precisión Mayor Latencia
Ventana de Retardo Más Corta Menor Precisión Menor Latencia

Análisis Profundo de las Marcas de Agua

Uniones y Marcas de Agua

Hay un par de consideraciones a tener en cuenta al realizar operaciones de unión en sus aplicaciones de streaming, específicamente al unir dos streams. Digamos que para nuestro caso de uso, queremos unir el conjunto de datos de streaming sobre lecturas de temperatura y presión con valores adicionales capturados por otros sensores en las máquinas.

Hay tres tipos generales de uniones de stream-stream que se pueden implementar en Structured Streaming: uniones internas, externas y semi-uniones. El principal problema al realizar uniones en aplicaciones de streaming es que puede tener una imagen incompleta de un lado de la unión. Darle a Spark una comprensión de cuándo no se esperan coincidencias futuras es similar al problema anterior con las agregaciones, donde Spark necesitaba comprender cuándo no había nuevas filas para incorporar al cálculo de la agregación antes de emitirla.

Para permitir que Spark maneje esto, podemos aprovechar una combinación de marcas de agua y restricciones de tiempo de evento dentro de la condición de unión de la unión de stream-stream. Esta combinación permite a Spark filtrar los registros tardíos y recortar el estado para la operación de unión a través de una condición de rango de tiempo en la unión. Demostramos esto en el siguiente ejemplo:

PySpark

Sin embargo, a diferencia del ejemplo anterior, habrá ocasiones en las que cada stream puede requerir diferentes desfases de tiempo para sus marcas de agua. En este escenario, Spark tiene una política para manejar múltiples definiciones de marca de agua. Spark mantiene una marca de agua global que se basa en el stream más lento para garantizar la mayor seguridad en cuanto a no perder datos.

Los desarrolladores tienen la capacidad de cambiar este comportamiento cambiando spark.sql.streaming.multipleWatermarkPolicy a max;, sin embargo, esto significa que se descartarán datos del stream más lento.

Para ver el rango completo de operaciones de unión que requieren o podrían aprovechar las marcas de agua, consulte esta sección de la documentación de Spark.

Monitoreo y Administración de Streams con Marcas de Agua

Al administrar una consulta de streaming donde Spark puede necesitar administrar millones de claves y mantener el estado para cada una de ellas, el almacén de estado predeterminado que viene con los clústeres de Databricks puede no ser efectivo. Podría comenzar a ver una mayor utilización de memoria y luego pausas de recolección de basura más largas. Ambos afectarán el rendimiento y la escalabilidad de su aplicación de Structured Streaming.

Aquí es donde entra RocksDB. Puede aprovechar RocksDB de forma nativa en Databricks habilitándolo de la siguiente manera en la configuración de Spark:

Esto permitirá que el clúster que ejecuta la aplicación de Structured Streaming aproveche RocksDB, que puede administrar el estado de manera más eficiente en la memoria nativa y aprovechar el disco local/SSD en lugar de mantener todo el estado en memoria.

Además de rastrear el uso de memoria y las métricas de recolección de basura, existen otros indicadores y métricas clave que deben recopilarse y rastrearse al tratar con Watermarking y Structured Streaming. Para acceder a estas métricas, puede consultar los objetos StreamingQueryProgress y StateOperatorProgress. Consulte nuestra documentación para ver ejemplos de cómo usarlos aquí.

En el objeto StreamingQueryProgress, hay un método llamado "eventTime" que se puede llamar y que devolverá las marcas de tiempo max, min, avg y watermark. Los primeros tres son el tiempo de evento máximo, mínimo y promedio visto en ese trigger. El último es la marca de agua utilizada en el trigger.

Ejemplo abreviado de un objeto StreamingQueryProgress

Estas piezas de información se pueden utilizar para conciliar los datos en las tablas de resultados que generan sus consultas de streaming y también para verificar que la marca de agua utilizada sea la marca de tiempo de tiempo de evento prevista. Esto puede ser importante cuando se unen flujos de datos.

Dentro del objeto StateOperatorProgress se encuentra la métrica numRowsDroppedByWatermark. Esta métrica mostrará cuántas filas se consideran demasiado tardías para ser incluidas en la agregación con estado. Tenga en cuenta que esta métrica mide las filas descartadas después de la agregación y no las filas de entrada brutas, por lo que el número no es preciso, pero puede indicar que se están descartando datos tardíos. Esto, junto con la información del objeto StreamingQueryProgress, puede ayudar a los desarrolladores a determinar si las marcas de agua están configuradas correctamente.

Múltiples Agregaciones, Streaming y Marcas de Agua

Una limitación que queda de las consultas de Structured Streaming es encadenar múltiples operadores stateful (por ejemplo, agregaciones, uniones en streaming) en una única consulta de streaming. Esta limitación de una marca de agua global singular para agregaciones stateful es algo para lo que en Databricks estamos trabajando en una solución y publicaremos más información al respecto en los próximos meses. Consulte nuestro blog sobre Project Lightspeed para obtener más información: Project Lightspeed: Faster and Simpler Stream Processing With Apache Spark (databricks.com).

Conclusión

Con Structured Streaming y Watermarking en Databricks, las organizaciones, como la del caso de uso descrito anteriormente, pueden crear aplicaciones resilientes en tiempo real que garantizan que las métricas impulsadas por agregaciones en tiempo real se calculen con precisión, incluso si los datos no están ordenados correctamente o no llegan a tiempo. Para obtener más información sobre cómo puede crear aplicaciones en tiempo real con Databricks, póngase en contacto con su representante de Databricks.

(Esta entrada del blog ha sido traducida utilizando herramientas basadas en inteligencia artificial) Publicación original

No te pierdas ninguna publicación de Databricks.

Suscríbete a nuestro blog y recibe las últimas publicaciones en tu bandeja de entrada.