A modelagem dimensional é uma abordagem comprovada para a construção de data warehouses prontos para análises. Embora muitas organizações estejam migrando para plataformas modernas como Databricks, essas técnicas fundamentais ainda se aplicam.
Na Parte 1, nós projetamos nosso esquema dimensional. Na Parte 2, nós construímos pipelines ETL para tabelas de dimensão. Agora na Parte 3, implementamos a lógica ETL para tabelas de fatos, enfatizando eficiência e integridade.
Em o primeiro blog, definimos a tabela de fatos, FactInternetSales, conforme mostrado abaixo. Em comparação com nossas tabelas de dimensões, a tabela de fatos é relativamente estreita em termos de comprimento do registro, com apenas referências de chave estrangeira para nossas tabelas de dimensões, nossas medidas de fatos, nossos campos de dimensão degenerada e um único campo de metadados presente:
NOTA: No exemplo abaixo, alteramos a declaração CREATE TABLE do nosso primeiro post para incluir as definições de chave estrangeira em vez de defini-las em declarações ALTER TABLE separadas. Também incluímos uma restrição de chave primária nos campos de dimensão degenerada para ser mais explícito sobre o papel deles nesta tabela de fatos.
A definição da tabela é bastante direta, mas vale a pena discutir o LastModifiedDateTime campo de metadados. Embora as tabelas de fatos sejam relativamente estreitas em termos de contagem de campos, elas tendem a ser muito profundas em termos de contagem de linhas. As tabelas de fatos geralmente abrigam milhões, se não bilhões, de registros, muitas vezes derivados de atividades operacionais de alto volume. Em vez de tentar recarregar a tabela com uma extração completa em cada ciclo ETL, normalmente limitamos nossos esforços a novos registros e aqueles que foram alterados.
Dependendo do sistema de origem e de sua infraestrutura subjacente, existem muitas maneiras de identificar quais registros operacionais precisam ser extraídos em um determinado ciclo ETL. As capacidades de captura de dados de alteração (CDC) implementadas no lado operacional são os mecanismos mais confiáveis. Mas quando esses não estão disponíveis, geralmente recorremos a timestamps registrados com cada registro de transação à medida que é criado e modificado. A abordagem não é à prova de falhas para detecção de alterações, mas como qualquer desenvolvedor ETL experiente atestará, muitas vezes é o melhor que temos.
NOTA: A introdução do Lakeflow Connect oferece uma opção interessante para realizar a captura de dados de alteração em bancos de dados relacionais. Esta capacidade está em pré-visualização no momento da escrita deste blog. No entanto, à medida que a capacidade amadurece para expandir mais e mais RDBMSs, esperamos que isso forneça um mecanismo eficaz e eficiente para extrações incrementais.
Em nossa tabela de fatos, o LastModifiedDateTime campo captura um valor de timestamp registrado no sistema operacional. Antes de extrair dados de nosso sistema operacional, revisaremos a tabela de fatos para identificar o valor mais recente para este campo que registramos. Esse valor será o ponto de partida para nossa extração incremental (também conhecida como delta).
O fluxo de trabalho de alto nível para nosso ETL de fatos procederá da seguinte maneira:
Para tornar esse fluxo de trabalho mais fácil de compreender, descreveremos suas fases-chave nas seções a seguir. Ao contrário do post sobre ETL de dimensão, implementaremos nossa lógica para este fluxo de trabalho usando uma combinação de SQL e Python, dependendo de qual linguagem torna cada etapa mais fácil de implementar. Novamente, uma das forças da Plataforma Databricks é seu suporte para múltiplos idiomas. Em vez de apresentá-lo como uma escolha de tudo ou nada feita no topo de uma implementação, mostraremos como os engenheiros de dados podem alternar rapidamente entre os dois dentro de uma única implementação.
Os dois primeiros passos do nosso fluxo de trabalho se concentram em extrair novas informações e informações recentemente atualizadas do nosso sistema operacional. No primeiro passo, fazemos uma simples busca do último valor registrado para LastModifiedDateTime. Se a tabela de fatos estiver vazia, como deveria estar na inicialização, definimos um valor padrão que está suficientemente no passado para acreditarmos que capturará todos os dados relevantes no sistema de origem:
Agora podemos extrair os dados necessários do nosso sistema operacional usando esse valor. Embora esta consulta inclua bastante detalhe, concentre sua atenção na cláusula WHERE, onde empregamos o último valor de timestamp observado do passo anterior para recuperar os itens de linha que são novos ou modificados (ou associados a pedidos de venda que são novos ou modificados):
Como antes, os dados extraídos são persistidos em uma tabela em nosso esquema de staging, acessível apenas aos nossos engenheiros de dados, antes de prosseguir para as etapas subsequentes no fluxo de trabalho. Se tivermos qualquer limpeza de dados adicional a realizar, devemos fazê-lo agora.
A sequência típica em um ciclo ETL de data warehouse é executar nossos fluxos de trabalho ETL de dimensão e depois nossos fluxos de trabalho de fatos logo em seguida. Ao organizar nossos processos desta maneira, podemos garantir melhor que todas as informações necessárias para conectar nossos registros de fatos aos dados de dimensão estarão disponíveis. No entanto, existe uma janela estreita dentro da qual novos dados orientados à dimensão chegam e são capturados por um registro transacional relevante para fatos. Essa janela aumenta se tivermos uma falha no ciclo ETL geral que atrasa a extração de dados de fatos. E, claro, sempre podem haver falhas referenciais nos sistemas de origem que permitem que dados questionáveis apareçam em um registro transacional.
Para nos protegermos deste problema, inseriremos em uma determinada tabela de dimensão quaisquer valores de chave de negócio encontrados em nossos dados de fatos em estágio, mas não no conjunto de registros atuais (não expirados) para essa dimensão. Esta abordagem criará um registro com uma chave de negócio (natural) e uma chave substituta que nossa tabela de fatos pode referenciar. Esses registros serão marcados como chegando tarde se a dimensão alvo for um SCD do Tipo-2, para que possamos atualizar adequadamente no próximo ciclo ETL.
Para começar, vamos compilar uma lista de campos de negócios chave em nossos dados de staging. Aqui, estamos explorando convenções de nomenclatura estritas que nos permitem identificar esses campos dinamicamente:
NOTA: Estamos mudando para Python nos próximos exemplos de código. Databricks suporta o uso de várias linguagens, mesmo dentro do mesmo fluxo de trabalho. Neste exemplo, Python nos dá um pouco mais de flexibilidade enquanto ainda se alinha com os conceitos de SQL, tornando essa abordagem acessível para mais desenvolvedores SQL tradicionais.
Note que separamos nossas chaves de data das outras chaves de negócios. Voltaremos a essas em um momento, mas por agora, vamos nos concentrar nas chaves não-datadas (outras) nesta tabela.
Para cada chave de negócio que não seja data, podemos usar nossas convenções de nomenclatura de campo e tabela para identificar a tabela de dimensão que deve conter essa chave e, em seguida, realizar um left-semi join (semelhante a uma comparação NOT IN() mas suportando correspondência de várias colunas, se necessário) para identificar quaisquer valores para essa coluna na tabela de staging, mas não na tabela de dimensão. Quando encontramos um valor não correspondido, simplesmente o inserimos na tabela de dimensão com a configuração apropriada para o IsLateArriving campo:
Esta lógica funcionaria bem para nossas referências de dimensão de data se quiséssemos garantir que nossos registros de fatos estejam vinculados a entradas válidas. No entanto, muitos sistemas de BI downstream implementam lógica que requer que a dimensão de data abrigue uma série contínua e ininterrupta de datas entre os valores mais antigos e mais recentes registrados. Caso encontremos uma data antes ou depois do intervalo de valores na tabela, precisamos não apenas inserir o membro ausente, mas criar os valores adicionais necessários para preservar um intervalo ininterrupto. Por esse motivo, precisamos de uma lógica ligeiramente diferente para quaisquer datas de chegada tardia:
Se você não trabalhou muito com Databricks ou Spark SQL, a consulta no coração deste último passo provavelmente é estranha. A sequence() função cria uma sequência de valores com base em um início e fim especificados. O resultado é uma matriz que podemos então explodir (usando a explode() função) para que cada elemento na matriz forme uma linha em um conjunto de resultados. A partir daí, simplesmente comparamos o intervalo necessário com o que está na tabela de dimensão para identificar quais elementos precisam ser inseridos. Com essa inserção, garantimos que temos um valor de chave substituta implementado nesta dimensão como uma smart key para que nossos registros de fatos tenham algo para referenciar.
Agora que podemos ter certeza de que todas as chaves de negócio em nossa tabela de staging podem ser correspondidas a registros em suas dimensões correspondentes, podemos prosseguir com a publicação na tabela de fatos.
O primeiro passo neste processo é procurar os valores de chave estrangeira para essas chaves de negócio. Isso pode ser feito como parte de um único passo de publicação, mas o grande número de junções na consulta muitas vezes torna essa abordagem desafiadora para manter. Por esse motivo, podemos adotar a abordagem menos eficiente, mas mais fácil de compreender e modificar, de procurar valores de chave estrangeira um negócio de cada vez e anexar esses valores à nossa tabela de staging:
Novamente, estamos explorando convenções de nomenclatura para tornar essa lógica mais fácil de implementar. Como nossa dimensão de data é uma dimensão de papel e, portanto, segue uma convenção de nomenclatura mais variável, implementamos uma lógica ligeiramente diferente para essas chaves de negócios.
Neste ponto, nossa tabela de staging abriga chaves de negócio e valores de chave substituta, juntamente com nossas medidas, campos de dimensão degenerada e o valor LastModifiedDate extraído do nosso sistema de origem. Para tornar a publicação mais gerenciável, devemos alinhar os campos disponíveis com os suportados pela tabela de fatos. Para fazer isso, precisamos descartar as chaves de negócio:
NOTA: O source dataframe é definido no bloco de código anterior.
Com os campos alinhados, a etapa de publicação é simples. Comparamos nossos registros de entrada com os da tabela de fatos com base nos campos de dimensão degenerada, que servem como um identificador único para nossos registros de fatos, e então atualizamos ou inserimos valores conforme necessário:
Esperamos que esta série de blog tenha sido informativa para aqueles que buscam construir modelos dimensionais na Plataforma Databricks. Esperamos que muitos experientes com essa abordagem de modelagem de dados e os fluxos de trabalho ETL associados a ela achem o Databricks familiar, acessível e capaz de suportar padrões estabelecidos há muito tempo com mínimas alterações em comparação com o que pode ter sido implementado em plataformas RDBMS. Onde surgem mudanças, como a capacidade de implementar lógica de fluxo de trabalho usando uma combinação de Python e SQL, esperamos que os engenheiros de dados achem que isso torna seu trabalho mais direto para implementar e suportar ao longo do tempo.
Para saber mais sobre Databricks SQL, visite nosso website ou leia a documentação. Você também pode conferir o tour do produto para Databricks SQL. Suponha que você queira migrar seu armazém existente para um data warehouse sem servidor de alto desempenho, com uma ótima experiência do usuário e custo total mais baixo. Nesse caso, Databricks SQL é a solução — experimente gratuitamente.
(This blog post has been translated using AI-powered tools) Original Post