At Northwestern Mutual, we are using Spark on Databricks to perform Extract Load Transform (ELT) workloads. We built a configuration-driven python framework that lands data from various source systems and transforms it using Databricks Delta SQL. The framework bakes in consistency, performance, and access control while allowing our developers to leverage their existing SQL skillsets. With this framework, our developers spend less time creating and configuring spark jobs with minimal code required.
The framework ingests a list of job items from a JSON configuration file, each with a command that generates a dataframe and a list of any number of destinations to write the dataframe to. These commands and destinations are specified by type in the configuration, accompanied by command-specific attributes and another file if required, like a SQL file. We can also ensure certain best-practices are followed using these configurable commands and destinations, such as ensuring we are securing PII data in our destinations, ensuring data is saved in the correct locations, and connecting to valid sources when we retrieve data for the environment the job is run in.
Our key focus for this session will be:
Fred Kimball: Hello, my name is Fred Kimball. I’m a software engineer at Northwestern Mutual. I’m here today with my colleague Josh.
Josh Reilly: Hi, I’m Josh Reilley. I’m a lead software engineer at Northwestern Mutual.
Fred Kimball: We’re here today to showcase our modern Config Driven ELT Framework that we’ve been using on several of our teams here at NM. If you’re not familiar with Northwestern Mutual, we’re a 160 plus-year-old company, Fortune 102 company that provides financial security to over 4.6 million people. Today, we’ll be discussing why we chose to go this route and designing and creating a Config Driven ELT Framework. We’ll be going through our basic architecture and how the different components interact with each other, along with our existing and new infrastructure. We’ll be discussing our different configuration options that we’ve developed and how they interact with each other and how we can use them together. And lastly, we’ll be talking about our security and consistency and data governance features that we’ve baked into this framework. So as we began designing this framework, there were several things that we had to keep in mind.
One of those things was, we have hundreds of tables and jobs to migrate from our old tooling to this new Spark and Databricks environment. And we have a lot of team members and a lot of business deliverables to hit. So we can’t necessarily all drop everything and learn all the nitty-gritty details of Spark and Databricks and Delta and all these different, new, exciting tools that we have at our disposal. So we really tried to think of a way that we can make this framework something easy to use and easy to learn for the whole team and even other teams eventually given their current skill set, which was mostly SQL and other BI tools.
In addition to speed, we wanted to enforce some of our best practices, both put in place by us and some suggestions from Databricks. And we also wanted to put in place some security and data governance features to make sure that we’re following all of our enterprise standards as we land data in this new environment. Lastly, the configuration files allow us to track changes more easily. These changes, we can commit them to our source control and easily see which changes are being made. Each table essentially has its own config file. So if we see that one file is changing, we know that it’s changing a very specific table. Another benefit to using a config Framework like this is it reduces the testing surface, so we can test the Python Framework by itself. And the development teams really only have to test their SQL queries and the config file.
And that really speeds up our development process. Another benefit is that it’s reusable by many different teams, so different teams across NM have been using this, and it’s not tailored to one specific team and it allows us to scale to the enterprise level. So let’s take a look at our basic architecture diagram. In the upper left of the diagram, you can see the ELT config box. This is our JSON configuration file. It consists of one to many job items. And a job item is basically a package of work for the framework to ingest and understand and execute. Within a job item, there is a single execCommand and that execCommand executes some command. And then lastly, there’s a list of destinations that are optional. So when the framework on the right side of the screen reads in this ELT config file, the job item reads in the config, it sends that config to the execCommand and that execCommand usually reads data from some source. So, that source system can be a relational database. It can be a web API, or it can even be our Delta Lake.
So, once we get that data back from a query we ran or some other command. That data frame gets sent back to the job item and then the job item sends that data frame to a destination object and that destination interprets which format of destination it should be. And any other options that need to be applied on that destination. And it writes the data to the same location, whether that be a Delta table in our Delta Lake or a relational database or something else. And on that note, I’ll send it over to Josh and he’ll walk through some of our configuration options we have in place.
Josh Reilly: Thank you, Fred. As Fred had mentioned, we are going to take a look at the config options now. This framework was designed so that it uses these interfaces for the execCommand and the destinations. So that developers that work on the framework can extend it with whatever commands and destinations that they can think of. That way we can fit all of our possible use cases. For execCommands I’m going to walk us through what some of these commands are. We have SQL execCommands. This is probably our most popular one. SQL execCommands always run a SQL command or SQL statement. And they can do that either in Databricks Delta, or they could run that against a source database and all execCommands needed to return a data frame. So SQL exact match will just return the data that it gets from the query as a data frame. File read execCommand.
So this, this execCommand can read CSV files. It can read JSON files and when it reads them, it’ll turn them into a data frame and return it to the framework. API read execCommands. So this is for web APIs. It’s a programmatic way for the framework to call out to these endpoints, probably get endpoint, pull back some JSON data, turn it into a data frame and return that to the framework. XML read execCommands. These are only for XML, they’re kind of like file read execCommands, but XML needs to have Schema applied so that it reads it into the correct format. Get incremental stats lower bounds command, and the get incremental stats upper bounds commands. So these are, these are helper commands that we created for use with incremental loads. We apply them as kind of like a parameter to SQL execCommands so that the SQL is that command.
If it’s reading data from a source system, it can know what the lower bound limit and the upper bound limit needs to be. And that upper bound and lower bound are then stored in our stats table. So that’s kind of how that is track between loads for incremental loads. Custom Python command. So the custom Python module command enables developers to create their own Python modules, which could be installed via pip or however they want to get those Python modules onto the cluster. And then they can pass along some config as part of our normal job item config file to this custom Python module, have it run. And then it just needs to adhere to the execCommand interface where it needs to return a data frame as well. So all these execCommands, they need to return data frames, and then we pass them to our destinations and we can have zero to many destinations in our job item.
Some of our destinations include Delta destinations. So Delta destinations are writing out to a file location. We mount S3 buckets to our cluster and write to them like a file location. And then the Delta destination, we’ll also save it as table JDBC destination. So if let’s say we have landed some data in our data lake and we’ve manipulated, it created some transformations, then we can use a JDBC destination to read from this manipulated table and send it to another database like RDS, MySQL or something like that. Temporary view destination. So this destination is kind of a way to make a more advanced config file that has multiple job items in it. Where maybe a one job item we’ll read some data and then save it in a temporary view. And then subsequent job items can read from that temporary view and save it in some other destination.
So that’s a way of kind of adding some more programmability to your config and letting the developer break it down into a more simpler problem. Parquet destination. Real similar to Delta destination. This is just writing to a file location in the Parquet format. And CSV destination. This is also writing to a file location, but then using a CSV format. All right, let’s take a look at what a typical landing config looks like. So here on the left-hand side, we have our config file and it is an array of JSON objects. The JSON object is a job item. There’s only one in this config and each job item needs to have an exec and a list of destinations. So this job item has a SQL execCommand. And then this SQL execCommand is running a query my table, raw.SQL, which is a file on the same [inaudible].
I have that on the right-hand side, you can see it’s just normal SQL. There’s also some options that we’ve configured into the SQL execCommand. It has the partition column and the number of partitions options, which means that this SQL execCommand is going to do our partition greed. And I can tell that this is also a SQL execCommand. A SQL execCommand is going to run against a source database rather than Databricks Delta because it doesn’t have the source set to Delta. So this will use the ID column to partition on and it’ll run eight partitions on its query using JDBC. So the SQL execCommand will produce a data frame from that query and the framework will pass it into the list of destinations. We only have one destination. It’s the Delta destination, which means that it’s going to save that data frame as a Delta table.
And we have configured the encrypted columns. So the framework will go through and replace the values for name and email, in this case with encrypted strings. Okay, let’s go ahead and look at what a multiple job item could fix. So when we walked through what the various destinations were, I had mentioned that there was a temporary XE destination. This config is leveraging that it has two job items in it. So the top job item is a SQL execCommand. And this one runs against Delta. It runs the query new records, raw.SQL query, which is on the right-hand side and that’ll produce a data frame. And the framework will save that in a temporary view as one of the destinations. And that temporary view is named my table new records. So this will let us have subsequent job items we’ll be able to read from that temporary view.
The second job item is also a SQL execCommand that runs against Delta and it runs the query. My transform table raw.SQL, which is on the right-hand side. And we can see that it is reading from the temporary view and then it is joining with another table. So the temporary view has given us a way to be… Do some more like programming with just config. Then that that data frame is then passed into a Delta destination, which is saved to Databricks Delta. All right, Fred, back to you to talk about the security and consistency features in this framework.
Fred Kimball: Alright. Lastly, as we are migrating these hundreds of configs and tables and jobs to this new framework, we need to make sure that our sensitive data is secure, and that our tables are created in such a way that it’s consistent. And that’s important because we want to be able to quickly be able to identify where issues are and if everything is standardized and the same, then it’s easy to pinpoint those issues. So on top of the security that we have in place for accessing our environment, our resources and the security that our cloud providers provide, we wanted to add some extra security around our PII and PHI data columns in our tables. And how we do that is our framework is able to retrieve which information on which columns are encrypted from our table metadata in our Delta Lake. And it goes in and encrypts all that data as it pulls it in from our source systems.
So the data is never saved plain text in our environment once we bring it in. And then once we have to a business need to read that data, those users with that business need, or our jobs will have access to decrypt that data using role-based access. If you’re curious on how we do some of that, we do have a Databricks blog article written with another colleague it’s called Enforcing Column-Level Encryption and Avoiding Data Duplication with PII. You can go check out how we do that.
Along with securing individual columns, we are also able to ensure that data is written out to the correct mounted-storage locations using the framework. And we also have our roles on our clusters set up correctly so that whatever cluster you have access to, that cluster only has access to the buckets that you need access to. And these locations are standardized based on which source we’re pulling the data from, or which data, or which database we’re landing the data in if it’s a transformed data table. And that makes it easy to pinpoint issues when we go to check individual files or file locations.
So when the framework makes a connection outside of Databricks, we need to have a credentials and a connection information. So the connection information is set via connection config file, and that’s placed in our source control, which makes it easy to track changes to it. It goes through all of our emerge requests, approvals all of our CIC. So any changes to that connection information is tracked. And the credentials for those data sources are controlled through Databricks Secrets. And those Databricks Secrets only allow access based on your role. And most users will not have access to those directly and they’ll have access via being able to trigger a framework job, but only the framework job running as a generic user will have access to those secrets.
So it reduces the risk that users are bring in data from some external source that you’re not aware of. So with the help of this framework, we’ve been able to meet the needs of our business faster than ever. And it’s in a secure and consistent way. Hopefully this session has given you something to think about if you’re thinking of creating your own or have your own configure based framework. Thank you for joining the session and please give some feedback if you can.
Fred Kimball is a Software Engineer at Northwestern Mutual. His responsibilities include building, maintaining, and securing data infrastructure, creating automated build and deployment pipelines, and...
Josh Reilly is a Lead Software Engineer at Northwestern Mutual. His role is to provide architectural direction as well as enable his teams to be successful through mentoring and the creation of librar...