Code Once Use Often with Declarative Data Pipelines

May 26, 2021 03:15 PM (PT)


Did you know 160,000,000,000 pounds of food ends up in North American landfills each year? Flashfood is helping reduce food waste by providing a mobile marketplace where grocers can sell food nearing its best before date. In 2020 alone Flashfood diverted 11.2 million pounds of food waste while saving shoppers 29 million dollars on groceries.

To operate and optimize the marketplace, Flashfood ingests, processes, and surfaces a wide variety of data from the core application, partners, and external sources. As the volume, variety and velocity of sources and sinks proliferate, the complexity of scheduling and maintaining jobs increases in tandem. We noticed this complexity largely stemmed from different implementations of core ETL mechanics, rather than business logic itself.

We’ve implemented declarative data pipelines following a mantra of ‘code once use often’ to solve for this complexity. We started by building a highly configurable Apache Spark application which is initialized with details of the source, file type, transformation, load destination, etc. We then used Airflow to extend the DatabricksSubmitRunOperator, which allowed us to customize the cluster and parameters used in execution. Finally, we used airflow-declarative to generate DAGs in YAML, enabling us to set configurations, instantiate jobs, and orchestrate execution in a human readable file.

The declarative nature means less specialized personnel are able to set up an ETL with confidence, no longer requiring a deep knowledge of Apache Spark intricacies. Additionally, by ensuring that boilerplate logic was only implemented once, we reduced maintenance and increased delivery speed by 80%.


In this session watch:
Anthony Awuley, Machine Learning Lead, Flashfood
Carter Kilgour, Data Engineer, Flashfood



Anthony: Hello everyone. I’m Anthony and I’m here with my colleague, Carter. So today we’ll be doing a presentation on Code Once, Use Often Declarative Data Pipelines.
In the agenda for today, I would first go through Flashfood, what we do as a company, and then go to a problem we faced in our ETL pipeline design, and then a solution we adopted that is the Declarative Pipeline. We will then move on and talk about a couple of examples, follow up with lessons learned and then do a brief talk on Spark YAML.
So, food waste. Food waste is a huge problem in the world. It is estimated that about 160 billion pounds of food in North America ends up in a landfill each year. Food waste makes up at least six percent of all greenhouse gas emissions globally, emitting more than three billion tons of CO2 each year.
So according to a report done by National Geographic, if international food waste were a country, it would be the third leading cause of greenhouse gas emissions, behind the US and China.
According to a report done by US Department of Agriculture, in the US alone, about 30 to 40% of the food supply ends up in a landfill annually. In Canada, about 58% of all food produced goes to waste annually. So in total, the value of all food that is lost or wasted in Canada is a staggering 49 billion dollars. This amount of food is enough to feed every Canadian for five months. Meanwhile, at the same time, 10.5% of US households were food insecure at some point during 2019.
So this is where Flashfood comes in. Flashfood is basically a marketplace for food nearing expiry, where we give grocers the ability to recover costs on shrink. We also provide them with the ability to reduce their carbon footprint while at the same time feeding more families affordably with fresh food. So in 2020 alone, we diverted about 11.2 million pounds of food from the landfill, thereby saving shoppers $29 million on groceries.
So at Flashfood, data helps shape our future and actions. On the product front, we use our data to power our mobile and web platforms, and on the analytics front, data drives our decisions, as well as business intelligence. On the data science front, there are a lot of very interesting projects going on around recommendation systems, fraud detection, dynamic pricing, et cetera.
At the center of data engineering in Flashfood is Spark. For those of you who are not familiar with it, Apache Spark is a unified analytics engine for big data processing with built-in modules for streaming, SQL, machine learning, and graph processing. Spark basically enables us to do data processing at scale, at lightning speed. We were able to achieve about 80% performance improvement in our ETL pipeline when we migrated to Spark.
We are able to provision Spark clusters on demand, perform distributed computing and scale horizontally. Spark also gives us the ability to connect to multiple data lakes and warehouses. It supports reading different varieties of data, structured or unstructured data. And we also are able to use Spark for either batch or real-time data processing. So it also enables us to process relational data like operational databases and data from line of business applications and non-relational data like mobile apps, social media. So at Flashfood, Spark powers our feature engineering, feature selection, ETL, machine learning, and all analytics workload.
So, partners are key to our business at Flashfood. We are flexible on how we integrate and manage their data. Our partners provide us with varying volumes of data. So we have examples of some partners providing us with terabytes of data, others providing us with gigabytes of data and megabytes. So data comes in at different volumes and this data also varies in shape, dimensionality, velocity, and delivery. Some of our partners have cloud provider restrictions. So a partner might not want to work with a specific cloud provider. So in that case, we try not to use any of that cloud provider’s services within the ETL pipeline for that partner. This means we have to be able to use any data lake during our intermediate data processing steps.
So this is what we are aiming to achieve. So the questions we are asking is given that we are processing data of different varieties, of different volumes from different sources and we have restrictions coming from some of our partners regarding the data lake we are able to use. The question we were asking ourselves is how do we ingest data of any volume, real-time or batch at any variety structured or unstructured from any source at all, use any data lake as intermediate data store, load data to any destination? Given that we are ingesting data from hundreds of partners with a possibility of growing into millions as we expand across the globe, we needed a way to quickly deploy an ETL job with minimal effort and Spark domain knowledge. So this is what led us into the proposed solution we will be looking into in this presentation. So to continue, my colleague, Carter, will be taking over from here.

Carter: Thanks, Anthony. So yeah, so kind of a breakdown of our problem statement is how can we quickly create and easily maintain a growing number of pipelines? So, we’ve made a few different attempts at this to kind of get to our proposed solution, which we’re going to come up on. But first off, just to explain what we’re looking at. So in this case, we’re syncing over an operational database into Delta. And you can picture this at a lot bigger scale if you have multiple microservices, multiple external systems going into Delta but also between Delta and out to external sources. So between Delta into a feature store or a silver or gold table and out to an external system for maybe an always-on MPP. In that case, also a data mart where you could really serve out product. But for this case in particular, we’re going to break it down just so we have a clear kind of description and it fits in the time.
So for this case, we’re looking at bringing in an operational database into Delta Lake. And with this example, in our first attempt, we kind of broke it down into not enough automation, which can also be kind of considered as not enough standardization or normalization between pipelines. So almost a bit of a wild west in the code base. So in this case, we have a different job for each table. And with these jobs, they might have some shared libraries as well. And this has some pros in the sense that for instance, maybe one table is really large and it has an updated_at field, so you can do that incrementally, but another table doesn’t have that field, so you have to kind of overwrite the table each time just to make sure it’s up to date. There’s also the case where maybe one of the tables has PII and you want to be able to actually just encrypt those fields.
So each table, although it kind of has the same goal of bringing in an operational database to a Delta Lake, has some differences that kind of lead to this solution. But on the downside, this can kind of quickly get out of control. There are very few standards between each pipeline. It’s hard to actually find out what the dependencies are between them. And it becomes more and more difficult to actually maintain, because you don’t know which wires you can cut and which ones you can keep.
So the second attempt we did was kind of on the flip side of that: doing too much automation or standardization across our pipelines. So in this case, looking at an operational database and syncing into Delta, we have this magic sync-all-tables job, which is going to automatically figure out what needs to be done for each table. So it’s going to figure out, is there an updated_at field so I can upsert and do that incrementally? Are there any fields with the word name in them that I want to encrypt? And this usually works really well the first time you go through it with the set of pipelines that you’ve planned these edge cases for, but it can be pretty hard to reason about and maintain as it goes on, especially when adding new tables, or even if you want to hand this off to someone, it’s pretty hard to figure out what’s going on under the hood.
The next thing that kind of makes it difficult is being able to pass it on to somebody else. And the other thing of that is it can cause issues that you don’t expect. So it’s making decisions that you haven’t explicitly told it to. And if you add a new table and something’s acting weird, it’s actually kind of hard to debug.
So just kind of a summary on that is it turns into a bit of a house of cards where it’s doing things and with the littlest change or the slightest change in the source, it can actually have some bad consequences and blow up. So in this case, we’re seeing that it’s going to crash and it’s going to cause issues down the line.
So just a quick summary on both of these sides. So on the one side, it’s difficult to maintain. You’re spending a lot of time on boiler plate logic, and it requires quite a bit of Spark knowledge, but on the other side, inferred values are going to cause unexpected behaviors. It can be a lazy solution to problems. So for instance, if you’re going into Redshift and it doesn’t like nested fields and you just automatically flatten them, sure that works, but now your RedShift’s going to have columns that you didn’t explicitly know were going to be there. And it can just cause confusion, because you really want to actually think about, do you want those columns in there, which ones you want, rather than kind of taking the easy way out with this solution.
So that’s kind of where we came on our solution and it really is a combination of these two different implementations. So for this, we have our YAML based Airflow DAGs mixed with a configuration based Spark application. So in this case, we’re actually going to pull out different parts of each of the ones we previously talked about. So in the Databricks side, we kind of have that one job that’s going to handle these configurations and then in Airflow, we’re actually going to specify what these configurations would look like. So rather than having the different implementations in Spark, we’re actually just going to pass them in and let the job handle them accordingly.
So what this looks like as you go down the configurations is it will actually just do different things based on what’s passed in and how to handle that. So the configuration will be able to do things like managing incremental tables. It’ll be able to perform transformations like encryption and with Airflow on top of it as well is it doesn’t have to be a linear one configuration runs after another. You could set any dependencies between them and you can also run them all at the same time on different Spark clusters, so it works out well there.
And kind of just a quick summary on why configurations. So configurations, and especially in YAML, they create this nice file that really acts as a contract between the source and sink. So you know exactly where data is coming from, where it’s going, how often it’s running and the transformations that happen in between.
Another thing is it kind of forces DRY principles for similar jobs. So to actually go and make a change to the Spark application, it’s a bit less of a wild west and you kind of have to think about what you’re adding in and then how that’s affecting the system. So it kind of makes you think twice when reusing code. And then finally, it lets you manually or programmatically add new jobs. So as we’ll see in the YAML instance, it makes it really easy to add those new configurations, but when breaking down a YAML, it can really be turned into a [inaudible] pretty quickly. So on the other side, you could only use the Spark application and pass in those configurations programmatically.
And just a quick touch on that is that programmatically, it also really leads into being able to do what Anthony said of even just adding a UI that allows our ETLs to work. So as we scale, that’ll work across it and we can make our own version of a stitch or a [inaudible] that’ll make that happen.
So the actual implementation of this kind of begins with Airflow Declarative. So Airflow Declarative’s an open-source project that lets us write our Airflow DAGs in YAML syntax. So we see in this case, we have the DAG of the simple DAG and we have the task of a hello world and in there we can use any Airflow operators. And the nice thing about this is, again, it’s kind of enforcing those coding standards, where you’re not trying to cheat different things in your Airflow script for generating them and it’s clear to actually go in and look.
So kind of the first example of our solution is this is what our sync table job that we looked at in attempt three would look like in its file. So in this case, we broke out the DAG definition, which is going to do the scheduling. And we’re just focusing on one task in that Airflow DAG. So for this task, the first line we’re looking at is class, and the class is going to be our ETL job operator, which is really just going to extend the Databricks submit run operator and pass in some parameters to it.
The main class name is going to be the one in our Spark function and that one could even be extracted into the operator itself. And then we were able to actually type what type of compute we want, how many nodes and really any other settings, if you wanted to add libraries there, that’s very flexible.
And then kind of the big thing that gets passed down and the core of our application is the configuration based Spark application. So this configuration is going to get turned into a JSON string and passed in, but first we can break down kind of what’s in it. So first when it gets passed into the application, we see the extract piece. So in the extract piece, we have the connection type which can be Mongo, Redshift, SQL, and the required parameters will actually apply accordingly for that.
So in this case, we have a Mongo. We’re looking at a certain database. We’re getting the URI from our secrets and we select a certain table. And then the next ones are actually going to be optional parameters. So the schema, if you don’t enter that, we can infer the schema. And then the incremental, for instance, if there isn’t an incremental field that you can use, you can just leave that out and it’ll take the whole table.
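To make the optional incremental parameter concrete, here is a minimal Python sketch. It is not Flashfood's Spark code: the function name `extract`, the config keys, and the in-memory rows are all assumptions made for illustration, and a plain list of dicts stands in for the source table.

```python
from typing import Any, Dict, List, Optional

Row = Dict[str, Any]


def extract(rows: List[Row], config: Dict[str, Any],
            last_watermark: Optional[str] = None) -> List[Row]:
    """Return the rows to process for one run of the job."""
    # "incremental" is a hypothetical optional key naming the change-tracking column.
    incremental_field = config.get("incremental")
    if incremental_field is None or last_watermark is None:
        # No incremental field configured (or no previous run): take the whole table.
        return list(rows)
    # Keep only rows changed since the previous run.
    # ISO-8601 date strings compare correctly as plain strings.
    return [row for row in rows if row[incremental_field] > last_watermark]


source = [
    {"_id": 1, "updated_at": "2021-05-01"},
    {"_id": 2, "updated_at": "2021-05-20"},
]
full = extract(source, {"table": "stores"})
recent = extract(source, {"table": "stores", "incremental": "updated_at"}, "2021-05-10")
```

Leaving `incremental` out of the config falls back to a full read, which matches the behavior described in the talk.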
In the transformation, we can put in different things and what’s really nice here is that you can use a lot of the same underlying logic, but just change what transformations are done based on the table. So for this case, we’re going to encrypt some PII fields. We’re going to flatten out the location and we’re going to extract the latitude and longitude from the location. But you can picture if there is another Mongo connection that didn’t have any of these restrictions, you could just leave the transformation blank, or you could add other things to handle those cases accordingly. And it really doesn’t require any changes to the Spark, but you’re really just looking at it from a high level.
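One way to picture configuration-driven transformations is a registry that maps a step name to a function. The Python sketch below is an assumption-laden illustration, not the actual application: the step names `mask_fields` and `flatten` are hypothetical, rows are plain dicts instead of a Spark DataFrame, and a SHA-256 digest stands in for real encryption.

```python
import hashlib
from typing import Any, Callable, Dict, List

Row = Dict[str, Any]


def mask_fields(row: Row, fields: List[str]) -> Row:
    """Replace each listed field with a SHA-256 digest (a stand-in for encryption)."""
    out = dict(row)
    for name in fields:
        if name in out:
            out[name] = hashlib.sha256(str(out[name]).encode("utf-8")).hexdigest()
    return out


def flatten(row: Row, field: str) -> Row:
    """Lift the keys of one nested dict to top-level columns named <field>_<key>."""
    out = dict(row)
    nested = out.pop(field, None) or {}
    for key, value in nested.items():
        out[f"{field}_{key}"] = value
    return out


# Hypothetical registry: the "type" key in each configured step selects a function.
TRANSFORMS: Dict[str, Callable[..., Row]] = {
    "mask_fields": mask_fields,
    "flatten": flatten,
}


def apply_transformations(rows: List[Row], steps: List[Dict[str, Any]]) -> List[Row]:
    """Apply the configured steps in order; an empty list leaves rows unchanged."""
    for step in steps:
        func = TRANSFORMS[step["type"]]  # an unknown step type raises KeyError
        kwargs = {k: v for k, v in step.items() if k != "type"}
        rows = [func(row, **kwargs) for row in rows]
    return rows


steps = [
    {"type": "mask_fields", "fields": ["email"]},
    {"type": "flatten", "field": "location"},
]
rows = [{"_id": 1, "email": "a@example.com",
         "location": {"latitude": 43.6, "longitude": -79.4}}]
result = apply_transformations(rows, steps)
```

A table with no PII or nested fields would simply pass an empty `steps` list, with no change to the underlying code.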
And then finally in load, in this case, we’re putting it into Delta, but it could be any other system. And we can add some fields to, in this case, upsert, but you could easily change the mode to a slowly changing dimension and have it work the same way. So you’re keeping a lot of consistency across your code, but you’re also leaving a bit of flexibility to handle the differences that come in different tables.
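The idea of switching the load behavior through a single mode parameter can be sketched in Python as follows. This is only an illustration under stated assumptions: the `mode` and `key` config names are hypothetical, and a dict keyed by primary key stands in for the Delta table.

```python
from typing import Any, Dict, List

Row = Dict[str, Any]


def load(target: Dict[Any, Row], rows: List[Row], config: Dict[str, Any]) -> Dict[Any, Row]:
    """Return the new table contents after loading rows in the configured mode."""
    mode = config["mode"]  # explicit on purpose: no default load mode
    key = config["key"]    # column used to match incoming rows to existing ones
    incoming = {row[key]: row for row in rows}
    if mode == "overwrite":
        # Replace the table with the incoming batch.
        return incoming
    if mode == "upsert":
        # Update matching keys, insert new ones, keep everything else.
        merged = dict(target)
        merged.update(incoming)
        return merged
    raise ValueError(f"unsupported load mode: {mode!r}")


table = {1: {"_id": 1, "name": "old"}, 2: {"_id": 2, "name": "keep"}}
batch = [{"_id": 1, "name": "new"}, {"_id": 3, "name": "added"}]
upserted = load(table, batch, {"mode": "upsert", "key": "_id"})
overwritten = load(table, batch, {"mode": "overwrite", "key": "_id"})
```

Adding another mode, such as a slowly changing dimension, would mean one more branch here while every existing configuration keeps working.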
And we’ll go over this part quickly just in the interest of time, but our ETL job operator, the custom operator that goes into Airflow, is actually going to extend the Databricks submit run operator, and this works really natively out of Airflow. So in this case, we’re able to select our instance pool from Airflow variables based on the type of cluster passed in. And then we’re able to use the min and max workers that are passed, as well as some init scripts that you don’t really want to specify every single time you have to submit this job. And then at the bottom, we can see that our configuration is actually getting passed in as a parameter to that job. And that’s actually going to get parsed out in our Spark application. And at the end, you can see that it’s calling in the Databricks run [inaudible]. So with that, I’ll be passing it off to Anthony to continue on with some more examples.
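The operator code shown on the slide is not reproduced in this transcript. As a rough sketch of the idea, the Python function below builds a request body of the shape the Databricks Runs Submit API accepts (`new_cluster`, `autoscale`, `instance_pool_id`, `spark_jar_task`), which is also the kind of dict the `json` argument of `DatabricksSubmitRunOperator` takes. The pool lookup table, pool IDs, class name, and Spark version string are placeholders invented for this example; in the talk the pool comes from Airflow variables.

```python
import json
from typing import Any, Dict

# Hypothetical lookup; the talk reads pool IDs from Airflow variables instead.
INSTANCE_POOLS = {"small": "pool-small-id", "large": "pool-large-id"}


def build_submit_run_payload(task_id: str, main_class_name: str, cluster_type: str,
                             min_workers: int, max_workers: int, spark_version: str,
                             config: Dict[str, Any]) -> Dict[str, Any]:
    """Build a Runs Submit request body for one configured ETL task."""
    return {
        "run_name": task_id,
        "new_cluster": {
            "spark_version": spark_version,
            # Starting from an instance pool shortens cluster startup.
            "instance_pool_id": INSTANCE_POOLS[cluster_type],
            "autoscale": {"min_workers": min_workers, "max_workers": max_workers},
        },
        "spark_jar_task": {
            "main_class_name": main_class_name,
            # The whole ETL configuration travels as a single JSON string argument.
            "parameters": [json.dumps(config)],
        },
    }


etl_config = {"extract": {"connection_type": "mongo", "table": "stores"},
              "load": {"connection_type": "delta", "mode": "upsert"}}
payload = build_submit_run_payload(
    task_id="sync_stores",
    main_class_name="com.example.etl.Main",  # placeholder class name
    cluster_type="small",
    min_workers=1,
    max_workers=4,
    spark_version="7.3.x-scala2.12",  # placeholder runtime version
    config=etl_config,
)
```

On the Spark side, the application would read its first argument and parse it back into the same structure.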

Anthony: Yeah, thanks Carter. So we are working towards reducing code change when integrating with a new partner or [inaudible] services. So we have declarative specification of job parameters as well as ETL application configuration parameters. So we want to be able to support reading of multiple varieties of data, structured or unstructured, from multiple data lakes or data warehouses. We also want to be able to infer a schema of these files that we are loading. And the main goal here is to be able to represent all of these concepts within a declarative specification.
So for data lakes that are natively supported on Spark, we are able to mount and read data locally from them. In this example, the storage container shown here represents a data lake. So we have other attribute definitions that represent data from other sources like data warehouses, relational databases, et cetera.
In this particular example shown on the right, we are loading data from two separate data lakes and combining them into one data frame. So as you can see, we have two different file types and each of those file types belongs to a separate data lake container. At the transformation stage, we are also able to do the same thing basically, so we can apply any form of transformation to the data that is ingested. So we have some example definitions on the right, like we are able to rename columns, drop columns, export columns, or even apply some custom UDFs to our data frame.
And then finally at the load stage for each stage of our transformation, bronze, silver, gold, we are basically able to determine or apply a configuration parameter for loading data into any destination at all. So in this particular example, we have MongoDB and we have blob storage. So basically at the bronze stage of our transformation, we will be writing data to both MongoDB and blob storage. So to continue with a summary on all the work we did, I’ll be handing it over to my colleague, Carter.

Carter: Thanks, Anthony. Yeah. So just a quick summary on what we put together and the core components. The first aspect is it’s configuration based, and this is really beneficial because it separates business logic from the underlying application logic. The next core piece is Databricks, and this provides native support across multiple clouds, which was one of the big things for us, and scalable processing, so we can handle data both small and big. It also has reliable connectors for external third-party sources that are actually kept up in the Databricks community.
And finally, we have Airflow, which has given us extensible orchestration, even just being able to take an open source package and turn DAGs into YAML, which you wouldn’t be able to do with a lockdown tool. The next thing is it has community operators for other things that don’t fit into this configuration. And it also has compute for small data jobs, so you can throw it on an Airflow Kubernetes operator and hit APIs with it to get it into the data lake and then use the configuration based application to then bring it into Delta and bring it downstream.
So just some quick results on this. We’ve reduced maintenance overhead by kind of bringing all our jobs into one place and having some standards between them. We’ve democratized the ability to create like jobs, so if you want to sync a new Mongo database over, it’s actually pretty intuitive to look at something similar and just tweak the parameters to what you want. And finally, it’s really improved readability and coding standards. So in that YAML DAG, you can really see what’s meant to happen and what’s actually getting passed in for it rather than kind of being a bit abstracted, either in the code itself or only having parts of the information in your orchestrator.
So some lessons learned. The first is favoring parameters over inference, so it’s better to actually specify what it’s going to do. So we talked a bit earlier about maybe inferring the schema automatically when it’s not there, but we’ve actually been pushing lately to not have any default parameters and make it so you have to enter them, so that way you’re not getting any side effects you’re not expecting.
And then the next one is reusing code for extract and load. And this is a common theme across a lot of talks, but really there’s only so many ways you can incrementally process from one table to another, whether you’re doing a full ELT approach, adding incremental partition overwrite, there’s only a handful, so you can really standardize those under the hood and then just reference them.
Finally, instance pools are important. So especially with this implementation, you have a new cluster spinning up for every task for every ETL, and in that case, the four minute startup time can really put a bit of a damper on things, especially when jobs don’t take that long to run. So with instance pools, you can kind of correct for that.
And some challenges we have ahead are how much to generalize this config. So we’ve done it a few different ways, even where one of the first ones was hyper specific to a certain group of pipelines, but we’ve also looked at it the other way, and we’ll touch on that shortly, where it could just be Spark code. So what’s the happy medium of generalizing this config, so it can fit most pipelines without too much modification?
And the next one is programmatically adding new configurations. So if we put a UI on it and it had a few core required fields and a few optional fields, then we could use that configuration to schedule the job. And we’re wondering kind of what that would look like. Do we even take it all the way to YAML and Airflow, or do we have our own kind of service running on top of these JSONs?
The next thing is a grammar parser for simple function definition YAML. So this kind of leads into the last one as well, but being able to just define a simple function in YAML itself without having to go and change the application code. Checking the YAML validity at the source. So we’re a bit unsure about this. So being able to check in Airflow if your YAML’s valid before waiting for your cluster to start up and try it out would be helpful. And also maybe that could look like a front end that just has a different window into entering these. That’s something we still have to figure out.
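Checking a configuration before any cluster starts could look roughly like the Python sketch below. The required keys and the list of connection types are assumptions chosen for illustration (the talk names Mongo, Redshift, and SQL as connection types but does not show a validation schema).

```python
from typing import Any, Dict, List

# Hypothetical required keys per section; the real application may differ.
REQUIRED_KEYS = {
    "extract": {"connection_type", "table"},
    "load": {"connection_type", "mode"},
}
KNOWN_CONNECTIONS = {"mongo", "redshift", "sql"}


def validate_config(config: Dict[str, Any]) -> List[str]:
    """Return a list of problems; an empty list means the config looks usable."""
    errors: List[str] = []
    for section, required in REQUIRED_KEYS.items():
        if section not in config:
            errors.append(f"missing section: {section}")
            continue
        for key in sorted(required - set(config[section])):
            errors.append(f"{section}: missing required key '{key}'")
    connection = config.get("extract", {}).get("connection_type")
    if connection is not None and connection not in KNOWN_CONNECTIONS:
        errors.append(f"extract: unknown connection_type '{connection}'")
    return errors


good = {
    "extract": {"connection_type": "mongo", "table": "stores"},
    "load": {"connection_type": "delta", "mode": "upsert"},
}
bad = {"extract": {"connection_type": "ftp"}}
good_errors = validate_config(good)
bad_errors = validate_config(bad)
```

Running a check like this when the DAG file is parsed would surface a bad configuration in seconds instead of after a cluster has started.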
And the last thing, and this one kind of came out organically as we tried to really nail down what this configuration should look like, is we have SparkR, PySpark and Spark SQL, but could Spark YAML be a thing?
And in terms of Spark YAML, this is just in a concept phase right now, but could it look something like this, where you have your orchestration and your execution logic in the same place, and you can handle simple functions, especially with Spark, you have a lot of parameter heavy functions. So you’re there with the passing in dot option dot option dot option or passing in a dictionary of options, but doing that in your code, it becomes a bit unclear because you’re not doing actually many lines of code, but they’re very parameter heavy. So this is just a nicer way to actually write those parameters.
And the other point is just combine the orchestration with execution. So in one place, you know what’s happening, you know when it’s running and you can really keep an eye on it. So our thoughts around this would be something similar to what PySpark is to Scala, but having it with the YAML interface that kind of writes down to Scala code.
Yeah. So we’d be happy to talk to anybody who has concerns or thoughts on this. And we know everybody has their architectures at home, and actually going in and making big changes to them can be quite time-consuming and it really might not be prioritized. And with that in mind, we just want to leave a few core principles that we’re kind of keeping in mind, whether we write our pipelines here or in another way in the future.
So we’re actually going to look at it in a literary sense. And Garrison Keillor said, “A young writer is easily tempted by the elusive and ethereal and ironic and reflective, but the declarative is at the bottom of most good writing.” So in here he says kind of five key words, four of them being things not to do, and one of them being something to do. So we actually flipped those first four words into their antonym to really build some core principles into what you’re supposed to do in your pipelines.
So we broke those down into Keillor’s Principles, and the first one we’re talking about is explicit. And we’ve touched on this quite a bit in the presentation, but the execution of your pipelines and the settings and variables that they use should be explicitly stated. So if it’s going to be upserting, it should have that. If it’s going to infer schema, you should include that in there. And it really shouldn’t be guessing at what it should be doing while it’s running, because that’s leaving in room for error.
And next is indelicate, so the system should extend without breaking. So especially if you have something like the magic sync all table job, but you want to be able to add on new configurations, new transformations without breaking anything under the hood. And this could just look like having the default of that configuration set to what is already existing, and then having a new thing that you can switch on.
The next one’s logical, so behaviors should do exactly as stated, and this means no side effects in your systems. So if you’re writing it to a different place as a backup in Delta, that should be stated, and it shouldn’t just be assumed that the person that’s going to come in and take over this pipeline knows that that’s there because then your jobs could be running longer and you don’t actually know why, and then suddenly somebody realizes you have a full backup in blob or S3.
The next one is simplistic, so jobs should make limited decisions and fail quickly. So, for instance, if your job gets a string in a column that’s supposed to be an integer, it really shouldn’t just try to automatically change that column. It should either put the record somewhere and let you know or just fail altogether and let you know, because then you’re really saving yourself from downstream errors and you’re getting to the root of the problem a lot earlier.
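The two acceptable outcomes described here, failing outright or setting the bad record aside, can be sketched in Python as below. The function name, the `on_error` parameter, and the `SchemaError` class are hypothetical names for this illustration; rows are plain dicts rather than a Spark DataFrame.

```python
from typing import Any, Dict, List, Tuple

Row = Dict[str, Any]


class SchemaError(ValueError):
    """Raised when a row does not match the declared column types."""


def check_types(rows: List[Row], schema: Dict[str, type],
                on_error: str) -> Tuple[List[Row], List[Row]]:
    """Split rows into (good, rejected) without ever coercing a value."""
    if on_error not in ("fail", "quarantine"):
        raise ValueError(f"on_error must be 'fail' or 'quarantine', got {on_error!r}")
    good: List[Row] = []
    rejected: List[Row] = []
    for row in rows:
        bad_columns = [column for column, expected in schema.items()
                       if not isinstance(row.get(column), expected)]
        if not bad_columns:
            good.append(row)
        elif on_error == "quarantine":
            # Set the record aside so someone can inspect it later.
            rejected.append(row)
        else:
            # Stop at the first bad record instead of guessing a conversion.
            raise SchemaError(f"columns {bad_columns} have unexpected types in {row}")
    return good, rejected


records = [{"qty": 3}, {"qty": "three"}]
good_rows, rejected_rows = check_types(records, {"qty": int}, on_error="quarantine")
```

The `on_error` argument has no default, so the choice between failing and quarantining is always stated where the job is configured.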
And the final one, kind of the focus of the talk, is that pipelines should be declarative. So from the orchestrator, it should be clear in function and execution what those pipelines should be doing. So you should be able to look in one place and see what am I expecting on the other end?
Yeah. Thanks, everybody, for listening to our talk. Special thanks to the [inaudible] Flashfood for giving us the time to put this together and to Tori and Allie on the Databricks side for all the support along the way. Have a nice rest of the conference.

Anthony Awuley

I hold an MSc in computer science from Brock University in Ontario, Canada with a research area in Genetic Algorithms. I co-authored an IEEE CEC publication titled "Feature Selection And Classificatio...

Carter Kilgour

Carter Kilgour is an experienced data engineer with a demonstrated history of architecting and creating data intensive software. He has worked across various verticals developing data platforms to emp...