Reckitt is a fast-moving consumer goods company with a portfolio of famous brands and over 30k employees worldwide. At that scale, small projects can quickly grow into big datasets, and processing and cleaning all that data becomes a challenge. To solve it, we created a metadata-driven ETL framework that orchestrates data transformations through parametrised SQL scripts. It allows us to create various paths for our data and to version control them easily. Standardising incoming datasets and creating reusable SQL processes has proven to be a winning formula: it has simplified complicated landing/stage/merge processes and made them self-documenting.
But this is only half the battle: we also want to create data products, that is, documented, quality-assured datasets that are intuitive to use. As we move to a CI/CD approach and increase the frequency of deployments, keeping documentation and data quality assessments up to date becomes increasingly challenging. To solve this problem, we have expanded our ETL framework to include SQL processes that automate data quality activities. Using the Hive metastore as a starting point, we have leveraged the framework to automate the maintenance of a data dictionary and to reduce documentation, model refinement, data quality testing and filtering out bad data to a box-filling exercise. In this talk we discuss our approach to maintaining high-quality data products and share examples of how we automate data quality processes.
Karol Sawicz: Hello everyone. My name is Karol Sawicz, and today we'll be discussing automating data quality processes at Reckitt. In this talk we will go over who Reckitt are, the project background and the project architecture. Then we will describe the ETL framework that we use and how we turn datasets into data products. At the end, you will see a demo of the data quality processes that we use.
So to start off, who are Reckitt? We're an FMCG company with a global presence, and we have many, many brands, which you can see on the right. This is actually not the full list; we have many more, especially local brands. At the moment we have more than 40,000 employees, many of whom are sales reps, and they are the main data source for the project we are discussing today.
We have more than 50 sales CRMs worldwide and, as you can see, more than a thousand sales representatives globally. All of that leads to many local and global reporting platforms, which in effect means combining hundreds of data lakes.
A little bit about me: I've been working at RB as a business analyst for four years. In that time I have built end-to-end reporting solutions, both local and global, and I have also rolled out the sales reporting platforms as well as the systems that the sales reps use on a daily basis. Before that I graduated in computer science from the Polish-Japanese Academy, and I build electric bikes in my spare time.
Our project is called sales execution reporting. The challenge was that we had many, many systems locally, and all of the reporting was done locally as well. Nothing was done at scale, so everyone was doing the same things locally: data ingestion, data cleaning, and then visualization. The challenge was to solve that and take this beyond the local level. Our goals were to automate data ingestion, which is where the ETL framework helps, and to make mapping and cleansing the data easy and sustainable. In the end, we get a model that combines all of the data sources with clean data ready for analysis, so that every new project can start from the base that we built instead of redoing the whole thing.
To go over the architecture, we've split it into three slides, each representing the bronze, silver, and gold layers of our architecture, following the guidance that Databricks gives. At the bronze level, we don't really process anything apart from archiving the data. The silver layer is where all the magic happens. This is where the ETL library that we've built first ingests the data. Then the data moves to the local values mapping application, which provides the mapping from local values to global values. This could be local brands mapping to global brands, or any statuses that we get from the source systems. Finally, once everything is mapped and processed, data quality is checked against rules that we predefined for each country, or for each source, separately.
At the end of all of that we have clean data, with the junk data filtered out, and on top of it we define reporting views, which can be used for our BI reporting or for reporting directly in Databricks. That's it for the architecture. Now I'm going to hand over to Richard, who is going to describe how the ETL framework works and what we do with the data quality library, which is what the demo will cover as well. Thank you very much.
Richard Chadwic…: Hi, my name is Richard Chadwick and I'm a data engineering consultant at Cervello. I've been working on the sales execution project for the last 18 months, on archiving, ETL, validation, and deploying that data as a service. First we're going to briefly discuss the metadata-driven ETL framework, which is our solution to the complexity of many small, similar but unique ETL processes. The framework we built is split into three parts: landing data, SQL processes for transforming data, and an ETL framework that allows a user to mix and match any land or SQL process to create an end-to-end ETL plan.
Going through them briefly one at a time: the land framework is implemented as a configuration table of all the unique archived data objects. Those configuration tables hold every variable you would need to pass to a Spark function to land that data as a temporary view. We then created wrapper functions that allow a user to land any unique data source with a single line of code, by calling the unique key associated with that archived data object.
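A minimal sketch of the land idea, assuming the configuration table is held as a Python dict and that landing uses Spark SQL's `CREATE TEMPORARY VIEW ... USING ... OPTIONS` form. The keys `crm_uk_visits` and `crm_pl_orders`, the paths and the `land_sql` helper are all hypothetical. The framework in the talk may use the DataFrame reader instead.

```python
from typing import Optional

# Hypothetical configuration table: one entry per unique archived data object.
LAND_CONFIG = {
    "crm_uk_visits": {
        "format": "csv",
        "path": "/mnt/archive/crm/uk/visits/",
        "options": {"header": "true"},
    },
    "crm_pl_orders": {
        "format": "parquet",
        "path": "/mnt/archive/crm/pl/orders/",
        "options": {},
    },
}


def land_sql(key: str, partition: Optional[str] = None) -> str:
    """Build a Spark SQL statement that lands one archived object as a temporary view."""
    cfg = LAND_CONFIG[key]
    # Optionally narrow the path to a single archived partition folder.
    path = cfg["path"] + (f"{partition}/" if partition else "")
    options = {"path": path, **cfg["options"]}
    rendered = ", ".join(f"{name} '{value}'" for name, value in options.items())
    return (
        f"CREATE OR REPLACE TEMPORARY VIEW {key} "
        f"USING {cfg['format']} OPTIONS ({rendered})"
    )
```

On Databricks, the one-liner would then be something like `spark.sql(land_sql("crm_uk_visits", "2021-05-01"))`. After that, downstream SQL can select from `crm_uk_visits`.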
For our SQL framework, we have a repository of version-controlled SQL scripts. These can be parameterized, and the configurations describe the execution plan of each SQL script. So you might have a many-to-one relationship between configurations and SQL scripts. For example, if you're merging data into a target table and you're reusing that script for multiple processes, you could parametrize the partitions that you're inserting into. This is how we reduce the number of SQL scripts that we have to manage and store.
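This sketch shows the many-to-one relationship between configurations and scripts, assuming placeholders are filled with Python's `string.Template`. The script text, the configuration keys and `render_sql` are hypothetical. The talk does not show its own substitution mechanism.

```python
from string import Template
from typing import Dict

# Hypothetical script repository; in practice these would be version-controlled .sql files.
SQL_SCRIPTS: Dict[str, Template] = {
    "merge_into_target": Template(
        "MERGE INTO ${target} AS t USING ${source} AS s "
        "ON t.${key} = s.${key} AND t.${partition_col} = '${partition}' "
        "WHEN MATCHED THEN UPDATE SET * "
        "WHEN NOT MATCHED THEN INSERT *"
    ),
}

# Two configurations reuse the same script with different parameters.
SQL_CONFIG: Dict[str, dict] = {
    "merge_uk_visits": {
        "script": "merge_into_target",
        "params": {"target": "silver.visits", "source": "crm_uk_visits",
                   "key": "visit_id", "partition_col": "country", "partition": "UK"},
    },
    "merge_pl_visits": {
        "script": "merge_into_target",
        "params": {"target": "silver.visits", "source": "crm_pl_visits",
                   "key": "visit_id", "partition_col": "country", "partition": "PL"},
    },
}


def render_sql(config_key: str, **overrides: str) -> str:
    """Fill a script's placeholders from its configuration, allowing run-time overrides."""
    cfg = SQL_CONFIG[config_key]
    params = {**cfg["params"], **overrides}
    # substitute() raises KeyError when a placeholder has no value, catching config gaps early.
    return SQL_SCRIPTS[cfg["script"]].substitute(params)
```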
Finally, the ETL framework allows the user to mix and match any number of land or SQL processes to create an end-to-end ETL process, which configures the whole flow from our bronze to our silver layer. All of these frameworks can be invoked with just two lines of code. A user calls the unique key associated with a SQL script, a land object or an ETL plan, and passes a few parameters. These describe the branch we're taking scripts from, the partitions of data we're landing, or values we pass into the SQL script. The benefit of this data-driven ETL framework is that configurations double up as documentation. A wide range of stakeholders can easily see what's happening, and because execution is low-code, they can also participate. Data engineers can set up a few processes and lead by example, and then more analysts and engineers can contribute to those ETL processes by seeing how the configurations result in end-to-end transformations.
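Here is a small sketch of an ETL plan as configuration: an ordered list of land and SQL steps. The step runners are injected as plain callables so the plan logic stays independent of Spark. The plan key, step keys and `run_plan` are hypothetical.

```python
from typing import Callable, Dict, List, Tuple

# Hypothetical ETL plan configuration: each plan is an ordered list of (step_type, key).
ETL_PLANS: Dict[str, List[Tuple[str, str]]] = {
    "uk_visits_bronze_to_silver": [
        ("land", "crm_uk_visits"),
        ("sql", "clean_visits"),
        ("sql", "merge_uk_visits"),
    ],
}


def run_plan(plan_key: str, runners: Dict[str, Callable[..., None]],
             **params: str) -> List[str]:
    """Run every configured step in order, passing the shared parameters to each runner."""
    executed = []
    for step_type, key in ETL_PLANS[plan_key]:
        runners[step_type](key, **params)  # e.g. a land helper or a SQL-script helper
        executed.append(f"{step_type}:{key}")
    return executed
```

In a notebook, `runners` might map `"land"` to a function that lands an object and `"sql"` to one that renders a script and runs it with `spark.sql`.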
Another advantage of reducing the complexity is that our ADF pipelines, which orchestrate these end-to-end processes, are quite simple. They're often just a lookup on a configuration table followed by a for-loop around a single Databricks notebook, executed sequentially or concurrently depending on the needs of your data. We want to take these benefits of the data-driven ETL framework and apply them to creating a data product. What do we mean by that? We mean:
- an accurate data dictionary;
- objects with good naming conventions, appropriate data types and an accessible ordering of their columns;
- value mapping;
- data quality tested against expectations;
- a strategy for remediating data quality failures.

All of this contributes to meeting any service level agreements you want to set for the data products you serve to your analysts, your dashboards, or anything else.
So without further ado, we're going to jump into our demo, where we'll go into more detail about how we use configuration-driven processes and configuration-driven SQL to execute the data quality processes that turn a dataset into a data product. You can find all the code here; if you have any questions, you can comment on the GitHub repository or ask us after the demo. So let's skip to the demo now.
Speaker 3: Welcome to the demo section of the presentation. The GitHub repository that we shared contains eight notebooks, available as a DBC archive, so you can upload them into your Databricks workspace. If you execute them in order, you can replicate the demo in your own environment. I'm going to skip past the first and second notebooks. In the first, all we've done is create some sample data using the built-in Databricks datasets, so anyone can execute that notebook and get the same sample data. In the second, we created some empty dictionary tables to populate with our dictionary information. We can see those tables here: we've got our sample data, of which three are Delta tables and one is a view, and we've got our dictionary tables.
One is for column metadata, such as data types, and one is for table metadata, such as create statements and file locations. Now we're in our first automated data quality process, where we look at populating our data dictionary from our metastore. The metastore has all the information we need, but the problem is that we can only access it one data object at a time. We can do things like SHOW TABLES from a schema, DESCRIBE DETAIL on a table, or SHOW CREATE TABLE. What we want is the whole schema in one temporary view, because with that we can execute a SQL process that updates our data dictionary. That's what we're going to create in this notebook. We do this with a function that takes a list of databases as an argument. It identifies all of the tables in those databases and runs a for-loop over them.
Each of those loops picks up one section of metadata: we have one for the column metadata, one for the table metadata, and one for the create statements. That data is conformed using pandas DataFrames and converted into a temporary view. In that way, we land it and make it available to a downstream ETL process. I'm not going to go into too much detail about how these functions work. They're just Spark commands (DESCRIBE commands and Spark catalog commands), plus some pandas and string manipulation. You can check them out in your own time, but once we've defined this function, it allows us to land our metadata from the metastore with a single function call.
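The column-metadata step can be sketched as a parser over `DESCRIBE TABLE` output rows. It infers ordinal positions from row order and flags partition columns. This assumes the classic Hive-style layout with a `# Partition Information` block; some runtimes print a different partition section, so the markers may need adjusting. `conform_describe` and the field names are hypothetical.

```python
from typing import Dict, List, Optional, Tuple

# (col_name, data_type, comment), as returned by DESCRIBE TABLE.
DescribeRow = Tuple[str, str, Optional[str]]


def conform_describe(database: str, table: str, rows: List[DescribeRow]) -> List[Dict]:
    """Turn DESCRIBE TABLE output into one metadata record per column."""
    columns: List[Dict] = []
    partition_cols = set()
    section = "columns"
    for col_name, data_type, comment in rows:
        name = (col_name or "").strip()
        if name == "# Partition Information":
            section = "partitions"
        elif name == "# col_name":
            continue  # header row inside the partition block
        elif not name or name.startswith("#"):
            section = "done"  # blank separator or "# Detailed Table Information"
        elif section == "columns":
            columns.append({
                "column_id": f"{database}.{table}.{name}",
                "column_name": name,
                "data_type": data_type,
                "comment": comment,
                # Ordinal position inferred from the order DESCRIBE lists the columns in.
                "ordinal_position": len(columns) + 1,
            })
        elif section == "partitions":
            partition_cols.add(name)
    for column in columns:
        column["is_partition"] = column["column_name"] in partition_cols
    return columns
```

On Databricks the input could come from `[tuple(r) for r in spark.sql("DESCRIBE TABLE sample_data.people").collect()]`. The records could then go into a pandas DataFrame and a temporary view, as described above.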
We're demoing that here by landing the metadata for the sample data schema. We can see our table metadata: the file location of each table, the number of files it takes up, and its size in bytes. We've also inferred some information where we don't have it. For example, big loans is a view, so it has no file location, and we populate it with the string "None". All of these Nones should really be nulls, but pandas didn't support that. So "None" is a temporary landing string in place of null, and the SQL script that populates our main dictionary replaces it with a null. Next we've got our column metadata: column name, data type, is nullable, and is partition.
We've inferred the ordinal position from the order of the columns in the DESCRIBE TABLE output. We've also got our create metadata, which is just the create statement associated with each table. So with one line of code we're able to land this data, and with a small SQL script we persist it in our dictionary tables. The create statements and the table metadata go in the table dictionary, and the column metadata goes in the column dictionary. The SQL script converts those "None" strings back to nulls. It provides a unique ID for each column by concatenating the database, table and column name together, and merges the result into the target dictionary. It also performs an anti-join against the target to identify which data objects and columns have been deleted, and updates them with an is-deleted flag.
Now if we query our column dictionary, we can see all our columns with their metadata populated. Then we do the same for the table metadata: we join the create metadata with the table metadata so that we can store it all in one table, and again we perform the anti-join to update the is-deleted values. What we have now is a repeatable process. You can execute this notebook daily as a job, and that gives you an automated, up-to-date data dictionary with all the metadata available in your metastore.
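The refresh logic can be modelled in plain Python: placeholder strings become nulls, each column gets a `database.table.column` ID, landed rows are upserted, and an anti-join flags deleted columns. This is a sketch of the described behaviour, not the SQL script from the talk. The dictionary layout and `refresh_dictionary` are hypothetical.

```python
from typing import Dict, List

PLACEHOLDER = "None"  # temporary landing string that stands in for null


def refresh_dictionary(dictionary: Dict[str, Dict], landed: List[Dict]) -> Dict[str, Dict]:
    """Upsert landed column metadata and flag dictionary entries that no longer exist."""
    refreshed = {col_id: dict(entry) for col_id, entry in dictionary.items()}
    landed_ids = set()
    for row in landed:
        # Convert placeholder strings back to real nulls.
        clean = {k: (None if v == PLACEHOLDER else v) for k, v in row.items()}
        col_id = f"{clean['database']}.{clean['table']}.{clean['column']}"
        landed_ids.add(col_id)
        # Merge: metastore fields overwrite, user-maintained fields (e.g. test configs) survive.
        refreshed[col_id] = {**refreshed.get(col_id, {}), **clean, "is_deleted": False}
    # Anti-join: anything in the dictionary that was not landed this run has been deleted.
    for col_id in refreshed.keys() - landed_ids:
        refreshed[col_id]["is_deleted"] = True
    return refreshed
```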
In our next notebook, we look at commenting and reordering columns. This is not so much an automated data quality process as assistance with a data quality process that has a lot of value. In our last notebook we built a daily job that updates our dictionary. As part of that update, it ingests the comments on every column, which helps users reach insight faster when working with any dataset. Arguably, column comments are one of the most important components of a data dictionary. So if we apply comments to our tables, we kill two birds with one stone: the comments are available on our tables for users to see with DESCRIBE commands, and we also have a centralized dictionary with all our comments in one place.
The challenge with applying these comments is the REPLACE COLUMNS statement that lets us reorder columns or add comments to them: it is almost as involved as creating the table. In the example here, you have to specify the data type of each column and apply your comments. You also cannot leave any column out, because that is equivalent to dropping a column, which you can't do. So it becomes quite a challenge to put these commands together, and what we propose in this notebook is something to help with that. We use the dictionary we've just created as a staging area. We provide a function that takes the comments and ordinal positions in that dictionary and generates the commands to apply those changes to the table. Users can then update comments or ordinal positions in the dictionary instead of writing massive commands to reorder a hundred columns or apply comments.
We're going to quickly demo that in this notebook. In the first command we update a couple of comments in our dictionary, and in the second we reorder our columns alphabetically. If we now query the dictionary demo column table, we can see the comments applied, and our alphabetically ordered columns now have incrementing ordinal positions. Next we define the function. It generates what are pretty simple commands if you have all this data available, but ones that are very difficult to write manually. Again, it's just a few Spark commands and some string manipulation, only about 50 lines. The end result is a one-liner that brings the ordinal positions and comments of a table's columns in line with the values in the dictionary.
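One way to generate such a command is to build an `ALTER TABLE ... REPLACE COLUMNS` statement from dictionary rows sorted by ordinal position. Every column is listed with its data type, and a comment is added where one exists. The row field names and `replace_columns_sql` are hypothetical. Comments are escaped with backslashes, following Spark SQL's default string-literal rules.

```python
from typing import Dict, List


def replace_columns_sql(table: str, columns: List[Dict]) -> str:
    """Build a REPLACE COLUMNS statement that reorders and comments every column."""
    parts = []
    for col in sorted(columns, key=lambda c: c["ordinal_position"]):
        # Every column must be listed with its type; omitting one would mean dropping it.
        part = f"`{col['column_name']}` {col['data_type']}"
        if col.get("comment"):
            escaped = col["comment"].replace("\\", "\\\\").replace("'", "\\'")
            part += f" COMMENT '{escaped}'"
        parts.append(part)
    return f"ALTER TABLE {table} REPLACE COLUMNS ({', '.join(parts)})"
```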
This greatly simplifies applying comments and opens it up to a wider set of users: anyone who is able to edit the data dictionary and execute that one line. If you put the investment in up front, it provides significant value to your data products down the line. In a year or two, a whole data model with every column commented is quite a valuable proposition. One thing we can't do is apply comments to views after they've been created, so an easy way to get comments onto views is comment inheritance. In our first notebook, which we skipped past, we created the view big loans, which is just a SELECT * with a WHERE clause. We simply recreate that view in place, and if we describe it again, we can see that the ordinal positions and comments have been inherited. So this is very convenient for documenting views as well.
Moving on to our fifth notebook, we look at configuring data quality tests. The objective is to create configuration tables that contain all the parameters we need to execute data quality tests. We want this to be as close to a box-ticking exercise as possible, so that users can easily iterate on the tests. So we simply add a couple of columns to our column dictionary: one for a null test and one for a length test. We then use these columns to create views that act as the configuration tables. We also add a key column to our table dictionary to record the unique, non-null key of each table, which we need to carry out the tests. In this command we add the null test and length test columns, and for the table dictionary we add the key column and then update the key values.
We can see those changes applied here, with the keys in the key column. Next we create views over the null and length test columns that act as our configuration tables. In the null test view, we concatenate the database, table and column name with the string "is null" to create a test name that describes the failure. We do the same for the length test, and that name includes the integer length we expect the string to be. If we query the null test view now, there are no results, because we haven't configured any tests. We then update those values to configure null tests for all the columns in the people and geo tables, and a length test for the name column in the people table. Where the value is null, no test is executed. Now we have these configuration tables, and they contain all the parameters we need to carry out the tests.
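The configuration views can be modelled as a function over column dictionary rows. A null setting means no test, and test names are built by concatenation as described. In the talk these are SQL views; the field names `null_test` and `length_test` and the function `build_test_configs` are hypothetical.

```python
from typing import Dict, List, Tuple


def build_test_configs(column_dictionary: List[Dict]) -> Tuple[List[Dict], List[Dict]]:
    """Derive null-test and length-test configurations from dictionary rows."""
    null_tests, length_tests = [], []
    for col in column_dictionary:
        table = f"{col['database']}.{col['table']}"
        full_name = f"{table}.{col['column']}"
        # A null (None) configuration value means "no test" for that column.
        if col.get("null_test"):
            null_tests.append({"table": table, "column": col["column"],
                               "test_name": f"{full_name} is null"})
        if col.get("length_test") is not None:
            length = int(col["length_test"])
            length_tests.append({"table": table, "column": col["column"], "length": length,
                                 "test_name": f"{full_name} is not length {length}"})
    return null_tests, length_tests
```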
Next, we look at executing data quality tests. One of the most important features that dictates the user experience of any data quality testing framework is how results are presented. We generally find that when a data quality issue needs to be resolved, one of the first things we do is review a sample of the data affected by it. So we think it is convenient for data quality issues to be expressed as features of the dataset. We propose storing the results of data quality tests as tags on the offending rows, describing the tests that have failed. These features can then be used like any other feature to filter, aggregate, or bucket data as you need to resolve those data quality issues.
To achieve this, we add an array-type column called data quality tag to every Delta table. We then create a parametrized SQL function that, like our previous processes, uses the configuration tables to compose a SQL plan. Executing that plan populates the tag column with our failed data quality tests. I'm not going to go into too much detail about this function; again, it's a lot of string manipulation. An important component is the CASE statement that describes each test. Here is our null test CASE statement: CASE WHEN value IS NULL THEN fail ELSE pass. The SQL query concatenates a large number of these strings to perform a large suite of tests. It collects the results into a single array and uses the unique non-null key of the table to merge those results back into the target.
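The staging query could be composed like this. Each CASE expression yields the test name when a row fails and NULL when it passes, and the outcomes are collected into one array whose NULLs are removed with Spark's `filter` higher-order function. That is one reasonable variant of the pass/fail CASE shown in the talk, not necessarily its exact form. `staging_query` and its parameter shapes are hypothetical.

```python
from typing import List, Tuple


def _sql_string(value: str) -> str:
    """Quote a Python string as a Spark SQL string literal."""
    return "'" + value.replace("\\", "\\\\").replace("'", "\\'") + "'"


def staging_query(table: str, key_columns: List[str],
                  null_tests: List[Tuple[str, str]],
                  length_tests: List[Tuple[str, int, str]]) -> str:
    """Compose one SELECT that evaluates every configured test for each row."""
    checks = []
    for column, test_name in null_tests:
        # Emits the test name when the row fails, NULL when it passes.
        checks.append(f"CASE WHEN `{column}` IS NULL THEN {_sql_string(test_name)} END")
    for column, length, test_name in length_tests:
        # A NULL value makes length() NULL, so only the null test fires for it.
        checks.append(f"CASE WHEN length(`{column}`) <> {int(length)} "
                      f"THEN {_sql_string(test_name)} END")
    keys = ", ".join(f"`{k}`" for k in key_columns)
    if checks:
        # Collect every outcome into one array and drop the NULLs left by passing tests.
        tags = f"filter(array({', '.join(checks)}), x -> x IS NOT NULL)"
    else:
        tags = "CAST(array() AS array<string>)"
    return f"SELECT {keys}, {tags} AS data_quality_tag FROM {table}"
```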
So we define this function, and once again we have a single line of code that executes the SQL plan carrying out the tests on a table, based on the configurations we set. Here we're testing the people table, where we configured null tests for every column and a length-4 test for the name column. Let me quickly walk you through the SQL plan. In the first query, we stage our results, and you can see the concatenation of all the CASE statements, each expressing an individual test. Second, we check that our unique non-null keys are actually unique and non-null; where that's not the case, we can't write the results back to the table.
So we tag those rows with a key not unique tag wherever the keys are not unique. Finally, we merge the results back in. We also update the tag column with key not unique wherever the key columns are null, because we couldn't have merged those rows whether they were duplicates or not. After executing this process, we can see data quality tags on our people table, and one of our records has failed the test "name is not length 4". Again with a one-liner, we run the tests on the geo table, and we can see some results here: we've got "is null" failures, and we've also got key not unique values.
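The tagging rules just described can be modelled in plain Python to make the precedence explicit. A row whose key is null or duplicated gets only the key not unique tag, because its results could not be merged back. Every other row gets the names of the tests it fails. This is a behavioural sketch rather than the talk's SQL; `tag_rows` and the test representation are hypothetical.

```python
from collections import Counter
from typing import Callable, Dict, List, Tuple

KEY_NOT_UNIQUE = "key not unique"
DQTest = Tuple[str, Callable[[Dict], bool]]  # (test_name, returns True when the row fails)


def tag_rows(rows: List[Dict], key_columns: List[str], tests: List[DQTest]) -> List[Dict]:
    """Attach a data_quality_tag list to each row, mirroring the merge-back rule."""
    keys = [tuple(row.get(k) for k in key_columns) for row in rows]
    counts = Counter(keys)
    tagged = []
    for row, key in zip(rows, keys):
        if any(part is None for part in key) or counts[key] > 1:
            # Results can't be merged back on a null or duplicate key,
            # so this tag replaces every other test outcome for the row.
            tags = [KEY_NOT_UNIQUE]
        else:
            tags = [name for name, failed in tests if failed(row)]
        tagged.append({**row, "data_quality_tag": tags})
    return tagged
```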
This key not unique tag effectively blocks any other test from taking place, and non-unique records should be considered the worst of all failures. We can also execute tests on tables for which we have not configured any tests. All this does is check the uniqueness and non-nullness of the keys we've established. In that case, everything passes unless the key is not unique or is null. Looking at the results for the [inaudible] table, we can see that our ID column was not unique for any row, so every row has been tagged key not unique. So even though we haven't configured any tests, there is still this one default test applied to every table, and here it has failed in every instance. So we quickly replace the ID column with incrementing integers, so that we know it's unique.
We set a null test for every column in this table, which has over a hundred columns, and execute the tests again. Here we're generating very big SQL statements that we couldn't compose manually. They run these tests efficiently and persist the results in our data quality tag column. In some cases we've got over a hundred tags on a row, and there's really no limit to the number of tests you can configure. Just make sure the results are actionable: you don't want to create so many tests that users ignore them completely because they can't act on them.
So we can store as many test results as we want in a single data quality tag column. We can view up to 4,000 tags, which is hopefully more than you would ever need to store; otherwise you've got much bigger problems with your data. This style of test covers anything that affects a single record, so [inaudible] a test. We could set up pattern-matching tests with regex, future date tests, valid value tests, range tests: anything that evaluates a single record against a given condition.
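The single-record test types listed above fit the same CASE shape. This sketch maps hypothetical test types to Spark SQL predicates that are true when a record fails. As with the null test, a NULL value makes the predicate NULL, so no tag is emitted for it. The type names, `FAILURE_PREDICATES` and `case_statement` are assumptions. Regex backslashes are escaped for Spark's default string-literal parsing.

```python
from typing import Any, Callable, Dict


def _lit(value: Any) -> str:
    """Render a Python value as a Spark SQL literal (strings quoted, numbers as-is)."""
    if isinstance(value, str):
        return "'" + value.replace("\\", "\\\\").replace("'", "\\'") + "'"
    return str(value)


# Hypothetical test types: each builds a predicate that is TRUE when a record FAILS.
FAILURE_PREDICATES: Dict[str, Callable[[str, Any], str]] = {
    "pattern": lambda col, regex: f"NOT (`{col}` RLIKE {_lit(regex)})",
    "future_date": lambda col, _unused: f"`{col}` > current_date()",
    "valid_values": lambda col, allowed: f"`{col}` NOT IN ({', '.join(_lit(v) for v in allowed)})",
    "range": lambda col, bounds: f"`{col}` NOT BETWEEN {_lit(bounds[0])} AND {_lit(bounds[1])}",
}


def case_statement(test_type: str, column: str, arg: Any, test_name: str) -> str:
    """Wrap a failure predicate in the same CASE shape used for the null and length tests."""
    predicate = FAILURE_PREDICATES[test_type](column, arg)
    return f"CASE WHEN {predicate} THEN {_lit(test_name)} END"
```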
In this notebook, I'm going to look at using the data quality tags we just created to soft block, or soft delete, our data. We have an isBlocked column on all our Delta tables, which works like an is-deleted column. It holds true or false values that we can use to filter for good and bad quality data in our datasets. The benefit of this framework is that it keeps all our data in place while exposing our end users only to good quality data. This is the main reason we created this framework, because the other solutions propose quarantining or deleting bad quality data. So in this notebook, I'm going to show you how to orchestrate this process.
First I create an isBlocked configuration table and put all of my tests into it. It has one column that lets me configure each test as a blocker or not. Then I create a SQL process that updates this table; I should execute it every time I change one of the values in my null or length test columns. It brings the values of my isBlocked configuration table in line with the configurations I've set in my column dictionary, and also includes the key not unique test.
We've populated our isBlocked config table with all our tests, and isBlocked defaults to null, which is equivalent to false. If we set that value to true and that data quality tag is present in a row's data quality tag column, that record will be soft blocked, or soft deleted, however you want to phrase it. So we add the isBlocked column to all our Delta tables. We define a function that generates an UPDATE statement from the values in the isBlocked configuration table, and we set our key not unique test to be a blocker. I would advise making all key not unique tests blockers, because they can mask other test failures that are blockers, so this should be considered the worst test failure of them all.
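A sketch of that UPDATE generator, assuming the configuration for one table is passed as a dict from test name to a true/false/null blocker setting. It uses Spark SQL's `arrays_overlap`, which is true when any tag on the row is a blocker, and `coalesce` so rows with a null tag array end up false. `is_blocked_update_sql` is hypothetical.

```python
from typing import Dict, Optional


def is_blocked_update_sql(table: str, blocker_config: Dict[str, Optional[bool]]) -> str:
    """Build an UPDATE that blocks rows carrying any tag configured as a blocker."""
    # A null (None) setting is treated as false, i.e. not a blocker.
    blockers = sorted(name for name, blocked in blocker_config.items() if blocked)
    if not blockers:
        return f"UPDATE {table} SET isBlocked = false"
    names = ", ".join(
        "'" + n.replace("\\", "\\\\").replace("'", "\\'") + "'" for n in blockers
    )
    return (f"UPDATE {table} SET isBlocked = "
            f"coalesce(arrays_overlap(data_quality_tag, array({names})), false)")
```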
We execute this function on our geo table, where only the key not unique tag is a blocker. Reviewing the isBlocked flags, a record with a null test failure is not blocked, a record whose key is not unique is blocked, and a record with no test failure is not blocked. We then update our isBlocked table to make all the tests on geo blockers and execute the function again. Now every record that has a test failure associated with it has its isBlocked flag set to true.
We could then create views on this table, for example select star from sample data geo where isBlocked equals false, which gives us a view of good quality data that we can hand to an end user. That view also inherits the comments of the underlying table, so if we've documented the table, the view is documented as well.
Moving on to the final notebook of our demo, we look at how we orchestrate mapping local values to global values across our entire data model. This is something we do when translating values to English or conforming categorical values. The best way to explain this is to show you the mapping data model we've created and walk you through how these configuration tables interact with each other.
Here is our table setup. We start with our column mappings table on the far left, which is a Delta table. In it we define the source and target columns for our mappings. From this table we generate a CREATE VIEW statement that is a union of the unique values of all our source columns. This gathers all the values we need to provide mappings for. We create a mappings table, which we populate with source and target values to define our mappings. Then we create a view from the values-to-map view and the mappings table that shows the values we have yet to map. We can use this to serve values to users for mapping through an Excel sheet or an external application. Once the mappings are populated, we can create a parameterized SQL process that applies them to our data model, and I'll quickly walk you through that below.
First we create our column mappings table. We define some source and target columns for mappings, and we add those target columns to our people table, since all these mappings are for the people table. Here we can see our source and target columns. Then, using the create map view function that we define, we create a view of the unique values that we need to map. You can see how this function works: it picks up all the unique values from each source column, unions them all, and includes the source column that each value comes from.
Here we have our nine unique values to provide mappings for. We create the Delta table that we'll populate with mappings, and the view that we'll use to determine unmapped values. If we query that view, we have our nine values, all unmapped. If we define the target values for eight of those nine, we can see that we have one value left to provide a mapping for. Next we define a function, a parameterized SQL process, that applies the mappings to our target table.
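The two views can be generated as SQL strings. One unions the distinct non-null values of each mapped source column, labelled with that column. The other finds values still lacking a target via a `LEFT ANTI JOIN`, treating a mapping row with a null target as unmapped. The column names `source_column`, `source_value` and `target_value` and both functions are hypothetical.

```python
from typing import List, Tuple


def values_to_map_view_sql(view_name: str, table: str,
                           column_mappings: List[Tuple[str, str]]) -> str:
    """Union the distinct values of every mapped source column into one view."""
    selects = [
        f"SELECT DISTINCT '{source}' AS source_column, "
        f"CAST(`{source}` AS STRING) AS source_value "
        f"FROM {table} WHERE `{source}` IS NOT NULL"
        for source, _target in column_mappings
    ]
    # Each branch has a different source_column, so UNION ALL cannot create duplicates.
    return f"CREATE OR REPLACE VIEW {view_name} AS " + " UNION ALL ".join(selects)


def unmapped_values_view_sql(view_name: str, values_view: str, mappings_table: str) -> str:
    """Values still missing a target: an anti-join against the populated mappings."""
    return (
        f"CREATE OR REPLACE VIEW {view_name} AS "
        f"SELECT v.source_column, v.source_value FROM {values_view} v "
        f"LEFT ANTI JOIN {mappings_table} m "
        f"ON v.source_column = m.source_column AND v.source_value = m.source_value "
        f"AND m.target_value IS NOT NULL"
    )
```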
I'm not going to go through it, as we don't have time, but like our other processes, it leaves us with a one-liner that we can execute to apply those mappings to our target table. We have a parameterized SQL process here that stages our mappings and then merges them in using the unique non-null key. If we look at our people table again, we can see that the mappings have now been applied, except for the one value that we didn't provide a mapping for. This approach to mapping scales to all our data volume needs: millions of records and hundreds of different mappings all work with this orchestration. And that concludes the demo part of our presentation.
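The talk applies mappings with a parameterized SQL MERGE on the unique non-null key. This plain-Python model shows only the resulting semantics: each target column is filled from its source column's mapped value, and values without a mapping stay null. `apply_mappings` and the mapping-key layout are hypothetical.

```python
from typing import Dict, List, Tuple

Mappings = Dict[Tuple[str, str], str]  # (source_column, source_value) -> target_value


def apply_mappings(rows: List[Dict], column_mappings: List[Tuple[str, str]],
                   mappings: Mappings) -> List[Dict]:
    """Populate each target column from its source column; unmapped values stay None."""
    result = []
    for row in rows:
        mapped = dict(row)  # leave the input rows untouched
        for source, target in column_mappings:
            value = row.get(source)
            # Values are compared as strings, matching the CAST used when collecting them.
            mapped[target] = None if value is None else mappings.get((source, str(value)))
        result.append(mapped)
    return result
```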
Richard Chadwic…: Thank you very much for watching our presentation. If you have any feedback or questions, we’d be happy to hear them.
Richard Chadwick is a data engineering consultant at Cervello, a Kearney Company, where he works with enterprise clients to develop data products using the latest cloud technologies. Richard holds a M...
Karol Sawicz is an IT Business Analyst at RB, where he is involved in various data projects with responsibilities ranging from building full end to end reporting solutions as well as project managing ...