Ir al contenido principal

Implementación de un almacén de datos dimensional con Databricks SQL, parte 3

Construcción de los flujos de trabajo ETL de hechos

dimensional data modeling pt 3

Publicado: 27 de mayo de 2025

Soluciones13 min de lectura

Summary

  • Extracciones de Delta: Implementación de extracciones de datos incrementales de sistemas operativos usando marcas de tiempo para identificar registros nuevos o actualizados.
  • Miembros de llegada tardía: Manejo de datos de dimensión de llegada tardía insertando registros faltantes para asegurar la integridad referencial con las tablas de hechos.
  • Publicación de tablas de hechos: Publicación de datos en tablas de hechos emparejando claves de negocio con claves sustitutas de tablas de dimensión.

El modelado dimensional es un enfoque probado en el tiempo para construir almacenes de datos listos para análisis. Si bien muchas organizaciones están migrando a plataformas modernas como Databricks, estas técnicas fundamentales aún se aplican.

En la Parte 1, diseñamos nuestro esquema dimensional. En la Parte 2, creamos pipelines ETL para tablas de dimensión. Ahora, en la Parte 3, implementamos la lógica ETL para las tablas de hechos, enfatizando la eficiencia y la integridad.

Tablas de hechos y extracciones delta

En el primer blog, definimos la tabla de hechos, FactInternetSales, como se muestra a continuación. En comparación con nuestras tablas de dimensión, la tabla de hechos es relativamente estrecha en términos de longitud de registro, con solo referencias de clave externa a nuestras tablas de dimensión, nuestras medidas de hechos, nuestros campos de dimensión degenerados y un solo campo de metadatos presente:

NOTA: En el ejemplo a continuación, hemos modificado la declaración CREATE TABLE de nuestra primera publicación para incluir las definiciones de clave externa en lugar de definirlas en declaraciones ALTER TABLE separadas. También hemos incluido una restricción de clave primaria en los campos de dimensión degenerados para ser más explícitos sobre su función en esta tabla de hechos.

La definición de la tabla es bastante sencilla, pero vale la pena tomarse un momento para discutir el campo de metadatos LastModifiedDateTime. Si bien las tablas de hechos son relativamente estrechas en cuanto al número de campos, tienden a ser muy profundas en cuanto al número de filas. Las tablas de hechos a menudo albergan millones, si no miles de millones, de registros, a menudo derivados de actividades operativas de alto volumen. En lugar de intentar recargar la tabla con una extracción completa en cada ciclo ETL, generalmente limitaremos nuestros esfuerzos a los registros nuevos y a aquellos que han sido modificados.

Dependiendo del sistema de origen y su infraestructura subyacente, existen muchas formas de identificar qué registros operativos deben extraerse en un ciclo ETL determinado. Las capacidades de captura de datos de cambios (CDC) implementadas en el lado operativo son los mecanismos más confiables. Pero cuando estos no están disponibles, a menudo recurrimos a las marcas de tiempo registradas con cada registro de transacción a medida que se crea y modifica. El enfoque no es infalible para la detección de cambios, pero como cualquier desarrollador ETL experimentado atestiguará, a menudo es lo mejor que tenemos.

NOTA: La introducción de Lakeflow Connect proporciona una opción interesante para realizar la captura de datos de cambios en bases de datos relacionales. Esta capacidad está en vista previa en el momento de la redacción de este blog. Sin embargo, a medida que la capacidad madura para expandirse a más y más RDBMS, esperamos que esto proporcione un mecanismo efectivo y eficiente para extracciones incrementales.

En nuestra tabla de hechos, el campo LastModifiedDateTime captura dicho valor de marca de tiempo registrado en el sistema operativo. Antes de extraer datos de nuestro sistema operativo, revisaremos la tabla de hechos para identificar el último valor de este campo que hemos registrado. Ese valor será el punto de partida para nuestra extracción incremental (también conocida como delta).

GUÍA

Tu guía compacta para el análisis moderno

El flujo de trabajo ETL de hechos

El flujo de trabajo de alto nivel para nuestro ETL de hechos procederá de la siguiente manera:

  1. Recuperar el último valor de LastModifiedDateTime de nuestra tabla de hechos.
  2. Extraer datos transaccionales relevantes del sistema de origen con marcas de tiempo iguales o posteriores al último valor de LastModifiedDateTime.
  3. Realizar cualquier paso adicional de limpieza de datos requerido en los datos extraídos.
  4. Publicar cualquier valor de miembro que llegue tarde en las dimensiones asociadas.
  5. Buscar valores de clave externa de las dimensiones asociadas.
  6. Publicar datos en la tabla de hechos.

Para que este flujo de trabajo sea más fácil de entender, describiremos sus fases clave en las siguientes secciones. A diferencia de la publicación sobre ETL de dimensiones, implementaremos la lógica de este flujo de trabajo utilizando una combinación de SQL y Python, basándonos en qué lenguaje hace que cada paso sea más sencillo de implementar. Una vez más, una de las fortalezas de la Plataforma Databricks es su soporte para múltiples lenguajes. En lugar de presentarlo como una elección de todo o nada tomada al principio de una implementación, mostraremos cómo los ingenieros de datos pueden alternar rápidamente entre los dos dentro de una sola implementación.

Pasos 1-3: Fase de extracción de Delta

Los dos primeros pasos de nuestro flujo de trabajo se centran en extraer información nueva y recientemente actualizada de nuestro sistema operativo. En el primer paso, realizamos una simple búsqueda del último valor registrado para LastModifiedDateTime. Si la tabla de hechos está vacía, como debería estar al inicializar, definimos un valor predeterminado que se remonta lo suficiente en el tiempo como para capturar todos los datos relevantes en el sistema de origen:

Ahora podemos extraer los datos requeridos de nuestro sistema operativo utilizando ese valor. Si bien esta consulta incluye bastantes detalles, centre su atención en la cláusula WHERE, donde empleamos el último valor de marca de tiempo observado del paso anterior para recuperar los elementos de línea individuales que son nuevos o modificados (o asociados con pedidos de ventas que son nuevos o modificados):

Como antes, los datos extraídos se conservan en una tabla en nuestro esquema de staging, solo accesible para nuestros ingenieros de datos, antes de proceder a los pasos posteriores del flujo de trabajo. Si tenemos alguna limpieza de datos adicional que realizar, deberíamos hacerlo ahora.

Paso 4: Fase de miembros que llegan tarde

La secuencia típica en un ciclo ETL de data warehouse es ejecutar nuestros flujos de trabajo ETL de dimensiones y luego nuestros flujos de trabajo de hechos poco después. Al organizar nuestros procesos de esta manera, podemos asegurar mejor que toda la información necesaria para conectar nuestros registros de hechos con los datos de dimensiones esté disponible. Sin embargo, hay una pequeña ventana dentro de la cual llegan nuevos datos orientados a dimensiones y son capturados por un registro transaccional relevante para hechos. Esa ventana aumenta si tenemos un fallo en el ciclo ETL general que retrasa la extracción de datos de hechos. Y, por supuesto, siempre puede haber fallos de referencia en los sistemas de origen que permitan que aparezcan datos cuestionables en un registro transaccional.

Para protegernos de este problema, insertaremos en una tabla de dimensiones dada cualquier valor de clave de negocio que se encuentre en nuestros datos de hechos preparados pero no en el conjunto de registros actuales (no caducados) para esa dimensión. Este enfoque creará un registro con una clave de negocio (natural) y una clave sustituta que nuestra tabla de hechos puede referenciar. Estos registros se marcarán como que llegan tarde si la dimensión de destino es una SCD de Tipo 2, para que podamos actualizarlos adecuadamente en el próximo ciclo ETL.

Para empezar, compilaremos una lista de campos clave de negocio en nuestros datos preparados. Aquí, estamos aprovechando las estrictas convenciones de nomenclatura que nos permiten identificar estos campos dinámicamente:

NOTA: Cambiamos a Python para los siguientes ejemplos de código. Databricks admite el uso de múltiples lenguajes, incluso dentro del mismo flujo de trabajo. En este ejemplo, Python nos da un poco más de flexibilidad y al mismo tiempo se alinea con los conceptos de SQL, lo que hace que este enfoque sea accesible para los desarrolladores de SQL más tradicionales.

Observe que hemos separado nuestras claves de fecha de las otras claves de negocio. Volveremos a ellas en un momento, pero por ahora, centrémonos en las claves no de fecha (otras) en esta tabla.

Para cada clave de negocio no de fecha, podemos usar nuestras convenciones de nomenclatura de campos y tablas para identificar la tabla de dimensiones que debería contener esa clave y luego realizar un left-semi join (similar a una comparación NOT IN() pero admitiendo coincidencias de varias columnas si es necesario) para identificar cualquier valor para esa columna en la tabla preparada pero no en la tabla de dimensiones. Cuando encontramos un valor no coincidente, simplemente lo insertamos en la tabla de dimensiones con la configuración apropiada para el campo IsLateArriving:

Esta lógica funcionaría bien para nuestras referencias de dimensiones de fecha si quisiéramos asegurar que nuestros registros de hechos se enlazaran a entradas válidas. Sin embargo, muchos sistemas de BI posteriores implementan lógica que requiere que la dimensión de fecha albergue una serie continua e ininterrumpida de fechas entre los valores más tempranos y más tardíos registrados. Si encontramos una fecha antes o después del rango de valores en la tabla, necesitamos no solo ingresar el miembro faltante, sino crear los valores adicionales requeridos para preservar un rango ininterrumpido. Por esa razón, necesitamos una lógica ligeramente diferente para las fechas que llegan tarde:

Si no has trabajado mucho con Databricks o Spark SQL, la consulta en el corazón de este último paso es probablemente desconocida. La función sequence() crea una secuencia de valores basada en un inicio y un fin especificados. El resultado es un array que luego podemos expandir (usando la función explode()) para que cada elemento del array forme una fila en un conjunto de resultados. A partir de ahí, simplemente comparamos el rango requerido con lo que hay en la tabla de dimensión para identificar qué elementos necesitan ser insertados. Con esa inserción, nos aseguramos de tener un valor de clave sustituta implementado en esta dimensión como una clave inteligente para que nuestros registros de hechos tengan algo a lo que referenciar.

Pasos 5 - 6: Fase de publicación de datos

Ahora que podemos estar seguros de que todas las claves de negocio en nuestra tabla provisional se pueden emparejar con registros en sus dimensiones correspondientes, podemos proceder con la publicación a la tabla de hechos.

El primer paso en este proceso es buscar los valores de clave externa para estas claves de negocio. Esto se puede hacer como parte de un único paso de publicación, pero el gran número de uniones en la consulta a menudo hace que este enfoque sea difícil de mantener. Por esta razón, podríamos tomar el enfoque menos eficiente pero más fácil de comprender y modificar de buscar los valores de clave externa una clave de negocio a la vez y añadir esos valores a nuestra tabla provisional:

Nuevamente, estamos explotando las convenciones de nomenclatura para hacer esta lógica más sencilla de implementar. Debido a que nuestra dimensión de fecha es una dimensión de rol y, por lo tanto, sigue una convención de nomenclatura más variable, implementamos una lógica ligeramente diferente para esas claves de negocio.

En este punto, nuestra tabla provisional contiene claves de negocio y valores de clave sustituta junto con nuestras medidas, campos de dimensión degenerados y el valor LastModifiedDate extraído de nuestro sistema de origen. Para hacer la publicación más manejable, debemos alinear los campos disponibles con los admitidos por la tabla de hechos. Para hacer eso, necesitamos eliminar las claves de negocio:

NOTA: El dataframe source se define en el bloque de código anterior.

Con los campos alineados, el paso de publicación es sencillo. Emparejamos nuestros registros entrantes con los de la tabla de hechos basándonos en los campos de dimensión degenerados, que sirven como identificador único para nuestros registros de hechos, y luego actualizamos o insertamos los valores según sea necesario:

Próximos pasos

Esperamos que esta serie de blogs haya sido informativa para aquellos que buscan construir modelos dimensionales en la Plataforma Databricks. Esperamos que muchos experimentados con este enfoque de modelado de datos y los flujos de trabajo ETL asociados lo encuentren familiar, accesible y capaz de soportar patrones establecidos con cambios mínimos en comparación con lo que se puede haber implementado en plataformas RDBMS. Donde surjan cambios, como la capacidad de implementar lógica de flujo de trabajo utilizando una combinación de Python y SQL, esperamos que los ingenieros de datos encuentren que esto hace que su trabajo sea más sencillo de implementar y mantener con el tiempo.

Para obtener más información sobre Databricks SQL, visite nuestro sitio web o lea la documentación. También puede consultar el recorrido del producto para Databricks SQL. Si desea migrar su almacén existente a un almacén de datos sin servidor de alto rendimiento con una excelente experiencia de usuario y un menor costo total, Databricks SQL es la solución: pruébelo gratis.

(Esta entrada del blog ha sido traducida utilizando herramientas basadas en inteligencia artificial) Publicación original

No te pierdas ninguna publicación de Databricks.

Suscríbete a nuestro blog y recibe las últimas publicaciones en tu bandeja de entrada.