At Mars Petcare (in a division known as Kinship Data & Analytics) we are building out the Petcare Data Platform – a cloud based Data Lake solution. Leveraging Microsoft Azure, we were faced with important decisions around tools and design. We chose Delta Lake as a storage layer to build out our platform and bring insight to the science community across Mars Petcare. Migrating away from Azure Data Factory completely, we leveraged Spark and Databricks to build ‘Kyte’, a bespoke pipeline tool which has massively accelerated our ability to ingest, cleanse and process new data sources from across our large and complicated organisation. Building on this we have started to use Delta Lake for our ETL configurations and have built a bespoke UI for monitoring and scheduling our Spark pipelines. Find out more about why we chose a Spark-heavy ETL design and a Delta Lake driven platform, the advantages (and difficulties) of migrating away from Azure Data Factory, and why we are committing to Spark and Delta Lake as the core of our Platform to support our mission: Making a Better World for Pets! Key Takeaways:
– Hi, we’re Mars Petcare. I’m Kirby, here with George, and we’re data engineers for the global petcare data and analytics team within Mars.
So Mars Petcare is made up of about 50 different brands, 85,000 associates, hitting over 55 countries, and really we just cover a wide range of pet care needs. So our talk today, we’re just gonna go a bit through what our data platform looks like. Some of the reasons that we’ve chosen to go for a Databricks and Spark ETL Solution, and Delta Lake as both an internal tool along side that as well as our output for our analysts. So our platform, we’re building out a data lake, which is like a global lake with all of the brands petcare data, all put into this one place. So then you’ve got a consistent format, you’ve got a single location for all of these data assets and that gives you that one source of truth which all of the analytics teams, across all of these different brands can tap into and use together. And that also means you’ve got shared documentation on all of these data assets and collaboration across teams that just wasn’t possible before, when it was all managed individually by the brands.
Some of the brands that we’ve got, we’ve got a series of vets like Banfield, BluePearl, VCA. And they’re (laughs) really fun ’cause we get a lot of medical data, you get medical notes, diagnosis, things like that, so you really get a view on the healths of the pets and the things that come out of checkups. And then we’ve got a lot of nutrition brands, Pedigree, Royal Canin, IAMS. So a lot of the time we can tell kind of what foods they’re eating. You can combine some of the information from them together, to then see how pet food is affecting their health. We’ve got some pet tech companies like Whistle, who do GPS trackers for dogs, and Wisdom Panel who do genetic sampling. And then centers like Waltham, which is a pet science center, and they’re really heavily into the research around pet health and how to improve pet lives.
So the landscape currently is our sources come from all of these different business units who operate as their own businesses, so they’ve got their own source systems, their own data sets, their own models, and that’s all managed by them using whatever tools that they have chosen and set up with. Which means there’s a really varied approach to data collection and the structure that they store that data in. And this massive diversity in data quality, data format, is quite difficult if you want to swap in between them because you have to then learn about all of these systems, and have access to tap into them. So that’s kind of the source that we’re dealing with. So our ETL tool takes all of that and then standardizes certain fields, we have the same data format, we provide the data all in the exact same way for users on the analytics platforms, so they don’t have to deal with all of the desperacy kind of from these source systems. And then on top of that we also, optimize these tables for analytical use or output rather than the kind of more operational format that the data’s in.
– Great, so let’s dive into a little bit about our solution, how we built this framework using Databricks and Spark.
So to start with I’m giving you some context, to talk about our tech stacks. So Mars has chosen to use Azure, so that’s the platform we’re on. This is great, we’ve got access to a lot of tools and obviously this plays very nicely with Databricks. So the typical tool for general ETL on Azure, as some of you probably know, is ADF, Data Factory. This is great, this is a great tool, it’s got a lot of features straight out the box. And this is what we were originally using in our team as well. What we found over time and linking back to the slide that Kirby was just talking about was that, since this data is controlled by the separate business units within Mars, we kind of have to adhere to their schemas and their schema changes, their export capabilities, whether that’s fully up to scratch or not. That we were adding a lot of custom logic in via Databricks notebooks into those ADF pipelines, in order to handle these error cases and these changes in schema evolutions and then everything like that. So we got to a point where we just looked at what our pipe lining system was like and decided that we could just build this from the ground up. And have a much more scalable solution using the powerful tool that we had at our disposal, which was Spark and Databricks. So then moving in to Databricks, using that as our basis we then had this ability to directly connect to all sorts of systems because of the built in JDBC connectors, and libraries that are packaged as part of the runtime. And on top of that, we had access to the entire open source library of Python, using mainly PySpark as our choice of language. We could access any open source Python library to kind of do some more clever stuff around automated passing of unstructured data like CSVs or JSONs.
It’s also infinitely invited commerce scalable, we can scale out with those clusters as wide as we want, depending on how much data we need to push through this system. And then finally we’re really integrating Delta Lake in our framework, both as the metadata config layer for the pipeline itself, but also as our output layer that we give to our analysts for writing models and analyzing, and producing out from the data.
So here’s a sort of high level diagram of what our ETL framework looks like, it’s called Kyte, so if I use that name, that’s what I’m referring to. This is a typical run of a single flow of a single table through Kyte. So at the beginning we have the source picking up the data using these connection templates, the way that we approach this is with, sort of packaged up connectors that we can then reuse, so rather than a separate pipeline or specific code to connect to a specific data source, we have a connector for something like Azure Blob storage, or a connector for database systems. Then we can reuse that connector with the metadata coming from those Delta Lake configs, in order to pick up the data from the right place, Without having a bunch of excess repurposed code. So those Delta Lake as metadata configs are really useful then because we can drive that entire pipeline using those as a source and not have to hard-code any of those values into the actual pipeline itself. And then beyond that connection templates step, the rest of the pipeline is then identical, and it doesn’t matter if we picked up a flat CSV or if we picked up a structure table from a database, that the ETL framework within Spark that we’ve built is then identical from the rest of the pipeline.
So, that’s what it looks like for a single table, but sometimes it may look a bit more like this, right. So, as I was mentioning about scalability, we have the ability to massively parallelize this, and we’re parallelizing this at both the kinda Spark level with the data transformations and the partitioning of the data as we go, using Sparks. But we’re also parallelizing it at another level on the driver in order to run separate sources at the same time, and run separate tables at the same time as well. So, using Spark and Databricks gives us that ability to make it as small or as big as we want.
So now that we’ve got that framework, and we’ve built that sort of framework in Databricks and in Spark, we can then go bigger than that, and we can build on top of that. So using the Databricks API, we’ve managed to kinda make things a lot more user friendly. And the first is this video that’s playing on the screen here, which is our custom dashboard for our ETL framework.
This is called control, it’s kind of a place for monitoring,
creating jobs, checking the status of our data in the lake. And all of this is completely hinged on using the extensive Databricks API, the library, if anyone hasn’t used it before is great. You can do just about anything you can do in the Databricks UI, and on the website, probably even more, I would say. And it can enable things like this where you can hook in to set runs off and receive data and view the conflict tables again. So it can allow us to build this kinda tool. And another really useful thing about, we use the Databricks API for is our testing. So being in Azure house we’re using Azure DevOps for our general source control and deployments. And we can tie in our testing of our Databricks pipelines directly into that because we can use the Databricks API. And so by specifying what size clusters we need for certain tests, because we can know that in advance and know what size they need to run on. We can build these tests into automatic continuous integration pipelines, as well as deployment pipelines to just go off and cool these runs and return results directly into that.
So then moving into a kinda specific part of the pipeline, just to understand how it works a little bit and a little bit so deep Delta Lake itself and parquet which is built on, have a schema evolution capability. But we’ve kinda added on top of that a way where we configure it up, so we have complete understanding of what schema revolutions are happening to the tables over time that we’re dealing with. So this slide is just gonna go through a little bit of detail around how we deal with that. So the day one, we’ve got this initial load, it’s just a set of columns with a set of data types, we detect that as a hitter, and we store it in our Delta Lake config metadata as our ground truth.
So day two rolls around, we get the same table for this next increment, and it’s got another column as we see here. So we compare this and we say, okay, that is the same schema except we’ve got a new column. Now, we know that Delta Lake can handle this and so we say, that’s absolutely fine. We make our ground truth, this new schema with the extra column. Day three comes and you can see that column one has now actually changed to an incompatible datatype. We’ve gone from an integer to date time, and we can’t reconcile that. So what happens here is that we compare this to our ground truth. It says, no, this isn’t gonna work. And we can error out with the correct error at this point in the pipeline so that we know what’s gone wrong. We can then go back to the source and get this fixed, because clearly something’s happened. Day four rolls around and again, we’ve dropped that new column that we have but the schema kinda lines up. What’s important here is that we’re comparing to the ground truth from day two, and not from day three, because we never put that in ’cause it never successfully processed. So we’re allowing the schema evolution, which is typically a real troublesome part. And we’re allowing it to be relatively lenient, and then utilizing the capability of Delta Lake to merge schema. – The way our pipelines are set up, means there’s a lot of dependencies between notebooks and as a team, there’s a lot of collaboration happening on this, as we’ve got kind of one source code for a single pipeline that runs hundreds of tables, have to be a bit careful with dependencies and overlaps. So we’ve kind of set up our own workflow for using Git with Databricks notebooks, and that’s built off some open source code.
That I’ll put a link in the chat to this on. So we manage all of our build and release pipelines and all of our our Git repositories through Azure DevOps, good integration, real easy to spin up. So we actually work in a really traditional Git workflow. We have a master branch, and then you feature branch off, I think how a lot of teams work. But what we do is, it rather than using the built in link between Databricks and Azure DevOps, we have a middle step, which is we manage our Git kinda on our local machine where you pull down the entire repositories, the entire project code, and then you can deploy that into your user workspace and Databricks and work there. So that lets you have a branch of the entire project rather than just a specific notebook, which I think is more works better with Azure Data Factory compared to orchestrating through Databricks. So having this defined way of working on a whole project level means that when you have those dependencies between notebooks, it doesn’t get mixed up, you can test them ’cause you’ve got your own projects going on. And it means checking in, there’s a lot less issues with people having merge conflicts and overwrites, so really, it just opens up the entire team being able to work on the same pipeline at the same time without breaking things in between.
– Sort of these next couple of slides, I’ll just explain a little bit more detail around how we’re working with those Delta Lakes as our config metadata and our approach to doing that. So first thing to talk about the benefits of using Delta Lake we get the asset transaction benefits, which means when we’re doing concurrent writes, this is thread safe. And this is super important, as I mentioned in the previous slide around how we’re parallelizing that driver process in our pipeline, we need the safety of being able to concurrently write to that conflict at the same time across multiple threads. Once we have that safety, what this means is that rather than having to spread out our configs into different locations, maybe per source or per table, we can just have a single master config table, I mean, one per environment, but really, there is a single master set of those configs and everything can interact with that one set. This is great, because it means, we all hope that things don’t go wrong. But if they do, it’s really easy to manually change, we don’t have to go and dig out where that individual config is. We have a whole process built around changing the conflicts. And there’s also initial setup in doing our pipeline, adding a new source or adding tables where we need to manually change and so having that one master set lets us just create a process for that rather than digging around in a complex nest of files. So we’re basically getting that traditional benefit of a kind of database system. We’ve got that ability to concurrently write and read and that ability to have a transaction log, but with zero extra infrastructure. So we don’t have the complexity of spinning up a server, whether that’s cloud or on-prem. It’s just files on a lake with that metadata layer that lets us do this. And then another added benefit on top of that is that since we’re using Databricks, we’ve got that really nice kind of interaction between Delta Lake and Databricks, where we can really visibly see that metadata layer, look at that transaction log directly, view the size of the files and everything like that. The other point then is versioning. Again, we hope that things don’t mess up. But if they ever do, it’s super easy to just use time travel on Delta lake and revert back to an old version if something does go wrong, rather than, again, using the example of many files dotted around and you’re overwriting those files over time because you can’t merge as such. That’s a lot harder to rollback on. So that’s a really useful benefit. And also the benefit of being able to not necessarily roll back but at least see and track that transaction log, who or what has caused the most recent write, merge or append and to be able to track changes like that.
So this diagram is kind of a bit of a visual on our approach. We have this need to keep our config sort of source controlled and the core data that lives inside those conflicts, even if it’s just the schema of that config itself, but it may include the manually inputted data as well, we want that source control and we want that to be able to interact with other humans. So we actually have that in JSON, and that JSON contains the data for the table itself, but it also contains the deployment instructions on how those tables get deployed into the Delta Lake config metadata. So maybe that’s just a simple overwrite take what’s in the JSON and override the Delta Lake tables. But it might be a more complex merge, where you only want to merge on certain columns and update certain other columns. So this JSON file is really easy to interact with as a person and we’re planning to surface that in our UI in the future. That gets Git that gets pushed into Git through a PR, and then Git will go off on a pipeline to go to deploy out into the delta side. So that goes and uses the Databricks API that I mentioned earlier to cool this notebook, this script that is our config deployer. Now that script will run through and on most runs and on the successful runs, it will run the overwrites, run the appends, run the merges into our actual Delta Lake config tables that the pipeline is interacting with. And at that point, we then need to go back to Git and update Git with that version of JSON. So there’s kind of a double master system going on. What the point there is, is that whatever sits in the Delta Lake tables is mirrored by what’s sitting in the JSON. So in cases where this doesn’t work and the deployment fails for whatever reason, maybe a human error in the JSON or maybe having two columns with the same ID or something, it will fail, that deployment will revert those Delta Lake tables, pre deployment as it were, and then also revert back Git to pre deployment. So in this way, whichever happens, whether the success or failure, the point is that we always end up with a mirroring between the JSON and the Delta Lake table.
So this slide, just to give a bit of kind of in depth detail if anyone’s interested. On the left is what the deployment part of that JSON file looks like. So at the very top you can see a very simple override for a key value table. And the kind of second half of that JSON shows what the more complex merge looks like. So it’s defining those merge rules, which columns were merging on and which columns were updating. And all of that is then taken forward and deployed in the right way by the script. On the right hand side, what we have is what a table definition looks like. So you can see that the schema is laid out, the identity columns, and then just a dummy row to show what that would look like, super human readable, really easy to understand, and really easy to add and change as well, without one off updates happening to those Delta Lake tables, we’re controlling it all in a sourced controlled manner by using JSON. – Think George has just spoken about kinda Delta Lake and how that’s good for kind of back end processes, but that’s also what we choose to store the data as for the analysts to access and use.
So that’s our kinda endpoint of the actual data that flows through all of this pipelines. And we tried out a few different things, but found that with our user base, it was definitely the both easiest and quickest. Part of that comes from being able to optimize and the kinda data we use, it’s quite big. Definitely needs Spark to hit it. And by partitioning on kinda common filters like dates, you can then just skip chunks of that. And we Z-order a lot as well by join keys and the performance aid on that is noticeable. (laughs) So what that means is when you’re doing a project using this data, and just a lot quicker to get through, and you don’t spend ages waiting for your queries to go, which is what used to happen on some of the more smaller SQL storage systems. We’ve also got that versioned data which isn’t only good for rolling back when stuff goes wrong our end it’s also good for analysts and researchers to use because you can recreate projects that have already been done on the exact same data set. So where non traditional databases is been updated and you lose that history, or you have to do is say, I wanna look at what it was a couple of months ago, and read in that, it means you can then validate those models, you can do comparisons between what we had before and what we’ve got now. And yeah, being able to track that history, sometimes when you’re looking at specific use cases is really something that you just can’t do on other systems. We’ve also found just being able to freeze a table that you’re working on without writing out your own copy of it, is very good. So saying, I’m starting a project but I don’t want the data to change underneath me as it gets updated. I will always start my project by reading in as of version six, and that is my project version. And then later, you can take that away and run it on a more up to date data set without having counts changed and stuff like that as you going. So we found that that’s definitely an analytical bonus, and then having the metadata accessible and visible is also very good being able to see when a table is last updated, being able to see the partition columns very easily means that the optimizations we’ve applied are accessible and usability, you can say, what’s this partitioned on? Okay, I’m gonna filter by that specific date, because that’s the one that will speed it up and that other people have targeted as a reliable date column. So we surface some of our tables through the HIVE metastore and Databricks using unmanaged tables, which, again, if you just wanna look at the data quick and do the exploration, or you just click on the data tab, and then you’ve got an example, you’ve got the schema, and you’ve got all of that metadata, like the data types and update history there, which means if you’re not sure which table you’re looking at, it’s quite easy to dig in and get a quick view on the wall. So that’s kind of how we surface everything to our users.
Everything we do is quite an action team is kind of for the advantages that come out of the projects that get done.
So I thought we just finish off with a few examples of the stuff that our data goes into. So I mentioned Waltham our Science Institute, really heavy research stuff going on, so just go on analysis, machine learning stuff.
So they’ve actually built an AI tool, which can predict kidney disease in cats up to two years before the like traditional tests that would be done in a vet. So this is called RenalTech, and it’s rolling out at the moment in the US through Banfield Pet Hospitals. And just being able to look at the amount of data that we have as a whole using Spark kinda opens up these kinda models, which can just really revolutionize how we diagnose pets and pick up on early early signals of common diagnosis.
We’ve also got the Pet Insight Project who are looking at pets activity behavior from these GPS trackers and pairing it with health issues which are recorded in the vet medical notes. And that’s another way to look at early detection of things. So the potential for it is you can pair up dogs behavior at home, like how far they’re running, kind of activity like that paired with common diagnosis, again, gives you a much deeper insight into the pet life. Which means the diagnosis on the other end is just the more you can base that on, the more accurate you’re likely to be. And then another one of the pet tech tools that we’ve got
is the Wisdom Panel, which looks at the genetic health of dogs. So if you can take a sample through Banfield for example, on their puppy plans, you can get a genetic sample done as part of just your pet health care routine. It then means that before they have any chance of developing something, you can be aware of what the most likely issues for that exact breeder. So if you have a certain breed type that’s likely to get diabetes, for example, that might be flagged to you to look out for the warning signs a lot earlier nearly catch stuff, the better it is for that pet. So really all of this data, the outcome is really building a better world for pets. And that’s the platform that our team is building. – Thanks, everyone for listening.
George graduated from University of Mancherster, UK with a BSc in Business Management. Working for Mars Petcare he has lead the construction of Kyte: An advanced Databricks & Spark built pipeline tool processing into Delta Lake on the Mars Petcare Data Platform.
Kirby graduated from University of Exeter, UK with a Masters in Physics. She now works for Mars Petcare, and has lead the design and implementation of the first Delta Lake engine on the Mars Petcare Data Platform.