Modularized ETL Writing with Apache Spark

May 27, 2021 03:50 PM (PT)


Apache Spark has been an integral part of Stitch Fix’s compute infrastructure. Over the past five years, it has become our de facto standard for most ETL and heavy data processing needs and expanded our capabilities in the Data Warehouse.

Since all our writes to the Data Warehouse go through Apache Spark, we took advantage of that to add more modules that supplement ETL writing. Config-driven and purposeful, these modules perform tasks on a Spark DataFrame destined for a Hive table.

These are organized as a sequence of transformations on the Apache Spark DataFrame before it is written to the table. One of them is journalizing, a process that helps maintain a non-duplicated historical record of mutable data associated with different parts of our business.

Data quality, another such module, is enabled on the fly using Apache Spark. We calculate metrics with Spark and use an adjacent service to run quality tests for a table on the incoming data.

Finally, we cleanse data based on provided configurations, then validate it and write it into the warehouse. We have an internal versioning strategy in the Data Warehouse that lets us tell new data apart from old data for a table.

Having these modules run at write time allows data to be cleaned, validated, and tested before it enters the Data Warehouse. This programmatically relieves us of most data problems. This talk focuses on ETL writing at Stitch Fix and describes the modules that help our Data Scientists on a daily basis.

In this session watch:
Neelesh Salian, Developer, Stitch Fix



Neelesh: Hi, this is Neelesh, and I am a software engineer at Stitch Fix. I want to talk to you about ETL writing and the modularized version of it we use with Spark at Stitch Fix. A little bit about myself: I'm a software engineer on the data platform team at Stitch Fix, where I work with things like Hive and Apache Spark. I came from Cloudera, where I used to work on Spark and MapReduce, and I've been a contributor to the Apache Software Foundation for a few years now. Here's what we're going to cover during this talk. First, what Stitch Fix is, with a little bit about the company. Second, what Apache Spark looks like at Stitch Fix and how we use it. Third, the writer modules that give us additional functionality on top of writing to the data warehouse. Finally, we'll end with the learnings and the future work planned in this area.
So a little bit about Stitch Fix. It is a personalized styling service with two avenues of business. One is getting five hand-picked items sent to you: you keep what you like and send back what you don't like. The other is a personalized, curated store made for you, where you check out whatever you like. Behind the scenes, data science drives everything we do. The Algorithms org that I'm a part of consists of 145 data scientists and platform engineers. They're split into three main verticals (merch, client, and styling), with the algorithms platform sitting horizontally, common to all of them. Check out the Algorithms Tour to understand how data science works at Stitch Fix.
So, jumping right to Apache Spark and how we use it, here's a bit of history and the current state. When we started in 2016, Spark was introduced to enhance and scale our ETL capabilities, with 1.2.x as the starting version. Spark SQL was the dominant use case back then, and we read and wrote data in the warehouse as Hive tables, which continues to be the abstraction we use. Shifting to the current state, we use 2.4.x as the main version. We're also prototyping 3.1.x builds so that we can try out the new versions. Spark is now used for all ETL reads and writes, in production and test. It serves regular PySpark, Spark SQL, and Scala jobs, notebooks, and Pandas-based readers and writers, forming a sort of backend for all of these. It also controls all writing into the data warehouse, with the additional functionality I'm going to talk about during this talk.
Here's some of the tooling we built out for data scientists to read and write data. I mentioned Spark SQL briefly. There are also PySpark and Scala jobs, which are essentially containerized Spark drivers using EMR for compute. These are production and staging ETLs used by data scientists. There are notebooks, essentially a JupyterHub setup with Stitch Fix libraries and Python packages pre-installed, which data scientists use to test and prototype. And we have Pandas-based readers and writers, which let you read and write data using the Pandas abstraction. In this case there's no bootstrap time for Spark jobs, because we use Apache Livy for execution, so it's pretty quick to run jobs on this platform. And of course it's used for tests and production.
Pictorially, this is what it looks like. You have three avenues for using Spark, and what they have in common is the writer library used to write to the data warehouse. That library handles validation and writing, and it also includes the three modules I'm going to speak about. The data warehouse is a combination of Amazon S3 for the data and the Hive Metastore for metadata. Back then, and even today, a write goes through three steps. The first is validation: it checks that the data frame matches the table's schema and checks for types that would overflow within the Hive table. Once that passes, it writes the data as files into S3, as Parquet or text depending on the Hive table's configuration. Finally, we update the Hive Metastore with the versioning scheme that we use internally for data.
So this is how it looks: the writer library validates, then writes to S3. If that succeeds, we update the Metastore to say, hey, these are the new updates to the table. Data versioning is a bit unique in our world. We use an epoch timestamp to indicate the writes made to a table, whether it is partitioned or non-partitioned. Take a table partitioned by a date column. For a date such as May 27th, that date's partition holds a sub-directory named with a batch ID timestamp that indicates the freshness of that data. Non-partitioned tables work the same way, just without the date part of the path. We also add a timestamp in the metadata to indicate when the last write was done.
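To make the versioning scheme concrete, here is a minimal plain-Python sketch. It assumes the batch ID is an epoch timestamp in seconds and that each write lands in a date=.../batch_id=... sub-directory. The path layout and the helper names (new_batch_id, write_path, latest_batch) are hypothetical illustrations, not Stitch Fix's actual layout or code.

```python
import time
from typing import List, Optional


def new_batch_id(now: Optional[float] = None) -> int:
    """Epoch-seconds timestamp that identifies one write (batch)."""
    return int(time.time() if now is None else now)


def write_path(table_root: str, batch_id: int, date: Optional[str] = None) -> str:
    """Directory for a new write (hypothetical layout).

    Partitioned tables get a date=... sub-directory; non-partitioned tables
    skip it. Either way, the batch_id=... suffix marks the data's freshness.
    """
    parts = [table_root.rstrip("/")]
    if date is not None:
        parts.append(f"date={date}")
    parts.append(f"batch_id={batch_id}")
    return "/".join(parts)


def latest_batch(paths: List[str]) -> str:
    """The freshest write is the one with the largest epoch timestamp."""
    return max(paths, key=lambda p: int(p.rsplit("batch_id=", 1)[1]))
```

Because epoch timestamps only grow, picking the newest version is a simple numeric comparison. That is one reason a timestamp makes a convenient batch ID.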
We've had this single write path since the beginning. So we asked ourselves what functionality we could make common to data scientists, adding value to their work by building it into the write pipeline. We came up with modules that are configurable transformations. Each one transforms the data frame and performs actions on it prior to writing to the data warehouse. How did we go about adding them? The priority was to make them configurable, so you could switch them on and off via Spark properties. Each module had to behave the same way in every write pipeline. Each also had to be configurable to either block the write or not in the event of failure. And we had to add documentation for each module to help steer data scientists toward using it correctly.
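The configuration requirements above (on/off switches via Spark properties, plus a choice to block the write on failure) can be sketched as follows. This is a hedged illustration only. The property names (spark.writer.<module>.enabled and spark.writer.<module>.blockOnFailure) and the ModuleConfig type are hypothetical, not the real Stitch Fix keys. A plain dict stands in for the Spark configuration.

```python
from dataclasses import dataclass
from typing import Dict


@dataclass(frozen=True)
class ModuleConfig:
    enabled: bool           # run the module at all?
    block_on_failure: bool  # stop the write if the module fails?


def _flag(props: Dict[str, str], key: str, default: bool) -> bool:
    """Spark properties are strings, so parse 'true'/'false' case-insensitively."""
    value = props.get(key)
    if value is None:
        return default
    return value.strip().lower() == "true"


def module_config(props: Dict[str, str], module: str) -> ModuleConfig:
    # Hypothetical naming scheme: spark.writer.<module>.<option>
    prefix = f"spark.writer.{module}"
    return ModuleConfig(
        enabled=_flag(props, f"{prefix}.enabled", default=False),
        block_on_failure=_flag(props, f"{prefix}.blockOnFailure", default=True),
    )
```

In this sketch, modules are opt-in (off by default) but block the write by default once enabled. That choice errs on the side of keeping bad data out of the warehouse.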
So we built three modules, and I'll talk about each of them individually: the Journalizer, the data cleanser, and the data quality checker. Here's how they fit in the flow coming from the writer library. When you input a Spark data frame, it goes into the Journalizer, then the data cleanser, then data quality, and is finally written into the data warehouse.
So let's jump first into the Journalizer. At this point we've validated the data frame, and the Journalizer is the first module. So what is journalizing? Let me give you a bit of background. We noticed that data can change. Take the example of a client's favorite color. If a client changes it over time, how do we track these changes and preserve the old values? These are slowly changing dimensions. So we had to come up with a solution that tracked each change and also kept a current view of what the favorite color is today.
There were two ways of capturing this historical information, using two types of Hive tables: history and journal. Let's start with history. We recorded all of the data daily, partitioned by date. But these tables contained every record duplicated across partitions, even when the record hadn't changed. That made it difficult to track changes like the favorite color example I pointed out, and the size made the table harder to access. So we came up with the notion of journal tables, which are compressed and de-duplicated. We kept it simple with two partitions: one for current values and one for old values. We track changes by adding timestamps. Think of a start and end timestamp showing that a favorite color was valid for a certain period of time. And we sorted the table by its primary key for easy access.
Here's how the two compare. In the history table, all the information is repeated and stored consistently, so it's hard to track the nuances of change. On the journal side, which I'll walk through piece by piece, you have a compressed view with dates, so you know a value was valid for a particular period of time. For the latest record, the end date is set to a sort of infinity to signify that this is the current value, and the partition is marked appropriately.
Given the compressed nature of journalized tables, we figured moving historical data into them made more sense. A journal table is meant to be a ledger of changes in values and a pointer to the current values. So let's look at how they're created. When we went through the process of creating a journal table, we started by asking ourselves some questions. How could we get easy access to the latest information about a client, or a particular key in this case? How can the table be compressed and de-duplicated compared to the history table? Can we determine how long a particular color or value was set in the table, and how do we order this? How do we update the table each time and maintain this ordering? And finally, where do we do this conversion, and how do we make sure the table is consistent each time we write into it?
To explain the concept of compression, here's an example. Blue has been the consistent value for a few days, and then purple comes up on the 23rd. On the journal side, assuming blue was the first recorded value, its start date is marked as the day it first appeared, and its end date as the day it stopped being valid. Purple is now the valid favorite color value.
For purple, you mark the start date from when it became valid, and set the end timestamp to the infinite value I showed before. The same goes for the current pointer. In the upper half of the diagram, blue is marked as current, since there's no other value, so that's the current value you're tracking. In the lower half, when this kind of change occurs, you push blue into the is_current=0 partition on your right. Then you mark purple with is_current=1 to indicate that purple is the current value. So anybody ingesting the is_current=1 partition gets purple as the value, since it has been marked as the latest.
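The compression described above can be sketched in plain Python. Each client's daily snapshots are collapsed into one row per run of unchanged values, and only the last run is flagged as current. This is an illustration under stated assumptions, not the Journalizer's actual code. The row types and the 9999-12-31 "infinite" end date are hypothetical. In this sketch, a value's end date equals the start date of the value that replaced it.

```python
from itertools import groupby
from typing import List, NamedTuple

OPEN_END = "9999-12-31"  # hypothetical stand-in for the "infinite" end date


class HistoryRow(NamedTuple):
    key: str    # e.g. client ID
    date: str   # ISO date of the daily snapshot
    value: str  # e.g. favorite color


class JournalRow(NamedTuple):
    key: str
    value: str
    start_date: str
    end_date: str
    is_current: int  # 1 = current value, 0 = historical


def journalize(history: List[HistoryRow]) -> List[JournalRow]:
    """Collapse daily snapshots into one row per run of identical values."""
    journal = []
    rows = sorted(history, key=lambda r: (r.key, r.date))
    for key, group in groupby(rows, key=lambda r: r.key):
        runs = []  # [value, start_date] for each run of unchanged values
        for row in group:
            if not runs or runs[-1][0] != row.value:
                runs.append([row.value, row.date])
        for i, (value, start) in enumerate(runs):
            is_last = i == len(runs) - 1
            # A run ends when the next run starts; the last run stays open.
            end = OPEN_END if is_last else runs[i + 1][1]
            journal.append(JournalRow(key, value, start, end, 1 if is_last else 0))
    return journal
```

Three days of "blue" followed by "purple" on the 23rd become two rows. Blue gets is_current 0 and purple gets is_current 1 with the open end date, which mirrors the lower half of the diagram.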
Now for the process of journalizing: how it faces the user and how it's provided on the backend. The user creates the journal table and sets the field to track in the metadata, say client ID in this case. When data is written to this table, we need to maintain consistency with the existing table. So we reload the table in its entirety, perform dedupe and compression, and update the current values if anything has changed. Then we sort the table by date and rewrite the new data frame back into the table. That way, any new ETL or downstream consumer sees the latest journalized table.
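Here is a hedged sketch of that update step: reload the full journal, compare each key's current value with the incoming snapshot, close changed records, open new current ones, sort, and return the whole table for rewriting. The names merge_snapshot, JournalRow, and OPEN_END are hypothetical. As a simplifying assumption, a key missing from the snapshot simply stays current, and same-day double changes are not handled.

```python
from typing import Dict, List, NamedTuple

OPEN_END = "9999-12-31"  # hypothetical "infinite" end date


class JournalRow(NamedTuple):
    key: str
    value: str
    start_date: str
    end_date: str
    is_current: int


def merge_snapshot(journal: List[JournalRow], snapshot: Dict[str, str], date: str) -> List[JournalRow]:
    """Fold one day's snapshot into the full journal; return the table to rewrite."""
    current = {r.key: r for r in journal if r.is_current == 1}
    result = [r for r in journal if r.is_current == 0]  # closed history is untouched
    for key, row in current.items():
        if key in snapshot and snapshot[key] != row.value:
            # Value changed: close the old record and open a new current one.
            result.append(row._replace(end_date=date, is_current=0))
            result.append(JournalRow(key, snapshot[key], date, OPEN_END, 1))
        else:
            result.append(row)  # unchanged (or absent): keep it as current
    for key, value in snapshot.items():
        if key not in current:
            result.append(JournalRow(key, value, date, OPEN_END, 1))  # new key
    # Sort by key, then start date, so each key's history is contiguous.
    return sorted(result, key=lambda r: (r.key, r.start_date))
```

Re-applying an unchanged snapshot returns the same table. That property is what keeps the full rewrite safe to run on every write.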
Looking at the workflow, a journalized table goes to the Journalizer module. The module reads the original table from the data warehouse and performs the processes I mentioned: detecting changes, compressing, deduping, setting the current values, and sorting by date. Then you proceed either to the next module or to actually writing into the data warehouse, depending on where you are in the workflow. As for what it gave us on the pro side, it gave us deduped data, and the two partitions made querying data easy.
If you want all the latest information about a client, you can easily get it from the is_current=1 partition. The old data is stored, timestamped and ranged, in is_current=0. A data pipeline only needs to access one partition. The table is compressed, with timestamps indicating the lifespan of values, and it's sorted. On the con side, we'll have to admit it is a complicated process. There are multiple steps to writing it, and a full rewrite is required to maintain the rules of compression. That is something we plan to improve in the future.
Coming to the second module, the data cleanser: after the Journalizer, data comes into the data cleanser. A little bit of motivation for why we started thinking about this: how can we clean up old or unreferenced data that's meant to be excluded? How do we make sure those records don't continue to persist, and how do we delete or nullify them consistently throughout the warehouse? Can this be configured by data scientists and applied to their tables?
Say a data scientist doesn't want a couple of columns to be part of their table, but doesn't want to rewrite the whole thing. For that, we built this mechanism to cleanse. So what does cleansing mean? Say we want to nullify or delete some columns in a table, column A and column B. For nullification, shown on the bottom left, all of their values are set to null. Deletion is similar, except the values are set to empty or some other value indicating there's no value there.
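The two treatments can be illustrated with a small plain-Python sketch, where lists of dicts stand in for a Spark DataFrame. It assumes deletion writes an empty string. The talk only says "empty or some other value", so the EMPTY marker and the function names nullify_columns and delete_columns are hypothetical.

```python
from typing import Any, Dict, Iterable, List

Row = Dict[str, Any]
EMPTY = ""  # hypothetical "no value" marker used by the delete treatment


def nullify_columns(rows: Iterable[Row], columns: List[str]) -> List[Row]:
    """Nullification: every targeted column becomes None (SQL NULL)."""
    return [{**row, **{c: None for c in columns if c in row}} for row in rows]


def delete_columns(rows: Iterable[Row], columns: List[str], empty: Any = EMPTY) -> List[Row]:
    """Deletion: targeted columns get an explicit empty marker instead of NULL."""
    return [{**row, **{c: empty for c in columns if c in row}} for row in rows]
```

In both cases the columns stay in the schema and only their values change. Downstream readers of the table therefore keep working without a schema migration.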
In building this, we had to think about the criteria again, as we did with the Journalizer. It had to be configurable. The user had to be able to specify a key to monitor and the columns that need cleansing, and provide that configuration to the module. To begin with, we provided two treatments, nullification and deletion. They had to apply both when data is being written to the data warehouse and when it's at rest. That way, the data already in the warehouse stays consistent with the data coming in.
How did we go about the actual cleansing? We perform cleansing at write time, since that was easier, and it ensures all future records are clean even if the source keeps sending them. So you know your downstream tables won't have them, even though the source provides them repeatedly. Separately, we had to cleanse the entire Hive table at rest, to make sure older partitions don't contain this unreferenced data. Otherwise, somebody ingesting an older partition would still see it, which is not something we wanted. So what did we actually need? We needed a mechanism that lets data scientists configure, per table, what needs to be cleansed and how (nullify or delete). It also had to be accessible both at write time and at rest.
For the implementation, we had to think about two things: the configuration and the cleansing. We have an extensive metadata infrastructure (you should see a blog post about it at some point) that allows users to add metadata to their own tables. These Hive tables have explicit metadata fields into which you can add any kind of information. So it seemed like the perfect place to store each table's configuration for the cleanser to access. For the cleansing itself, the cleanser looks at the table's configuration. It figures out that a couple of columns, say column A and column B, need a particular treatment, nullify or delete. It then reacts to that metadata by cleansing them the same way every time it writes. The same module can do this whether the data is being written or at rest.
So how does the workflow come together? The user already has the table and specifies a metadata configuration for cleansing. In this case, say the metadata specifies that the key to target is ID, the treatment is nullify, and the target columns are column A and column B. When writing, the cleanser reads the table's metadata and checks for columns matching this configuration. It then performs the actual nullification or deletion and proceeds either to the next transformation or to writing the cleansed data frame into the data warehouse.
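A metadata-driven cleanser along these lines might look like the plain-Python sketch below. Several details are assumptions made for illustration. The metadata shape (a "cleanse" entry holding key, treatment, and columns) is hypothetical. So is the target_keys set, a hypothetical input naming which key values (for example, unreferenced IDs) should be cleansed. The function name cleanse is hypothetical too. The same function can run on an incoming batch or on a reloaded table at rest.

```python
from typing import Any, Dict, List, Optional, Set

Row = Dict[str, Any]
TREATMENTS = {"nullify": None, "delete": ""}  # replacement value per treatment (assumed)


def cleanse(rows: List[Row], metadata: Optional[Dict[str, Any]], target_keys: Set[Any]) -> List[Row]:
    """Apply the table's cleansing config; pass rows through untouched if none exists."""
    config = (metadata or {}).get("cleanse")
    if config is None:
        return rows  # nothing configured: go straight on to the write
    treatment = config["treatment"]
    if treatment not in TREATMENTS:
        raise ValueError(f"unknown treatment: {treatment!r}")
    fill = TREATMENTS[treatment]
    key, columns = config["key"], config["columns"]
    cleaned = []
    for row in rows:
        # Only rows whose key is targeted get the treatment.
        if row.get(key) in target_keys:
            row = {**row, **{c: fill for c in columns if c in row}}
        cleaned.append(row)
    return cleaned
```

Failing loudly on an unknown treatment, rather than skipping it, keeps a typo in the metadata from silently letting unreferenced data through.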
Pictorially, this is how it looks. Data at rest (below) and data being written both come to the same module. The module asks the Hive Metastore for the table metadata and checks whether a configuration is provided. If there's nothing, it just goes forward and writes to the data warehouse. Otherwise, it looks at the metadata provided and the targeted columns and performs nullification or deletion. Finally, it writes the cleansed data into the data warehouse.
Last but not least, the data quality checker is the final module. Here we've come out of the data cleanser, and now we perform data quality checks prior to writing to the data warehouse. A little bit of background: how do we detect errors or skews in data? How do we check for data problems? How do they creep into our data, and how can we be more vigilant and pragmatic about it? And how do we give data scientists the power to set up data quality checks themselves for their tables when they're writing?
That gave us the motivation to work out what we needed to check and what we actually needed to build. We needed three things. First, a service that helps initialize and run tests on Hive tables. Second, a mechanism that calculates metrics for the configured tests on the data prior to writing to the data warehouse. Third, an interface that allows users to autonomously set up data quality checks and tests on their pipelines. So what would a test look like? Here are some of the examples we started with:
- Null count: is the null count on a column suddenly higher than a threshold?
- Average: is it below or above what is expected?
- Max value: is a column's max suddenly higher? For example, somebody entering 10 feet as a human's height, which is an anomaly.
- Row count: suddenly writing more rows than anticipated might indicate a skew or some other problem.
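The metrics named above (row count, null count, average, max) can be sketched as a small function over rows. In Spark itself these would more likely be computed in a single aggregation, for example with count, avg, and max from pyspark.sql.functions. This plain-Python version, with the hypothetical name column_metrics, only shows what each metric means.

```python
from typing import Any, Dict, List


def column_metrics(rows: List[Dict[str, Any]], column: str) -> Dict[str, Any]:
    """Metrics a data quality test might need for one column of incoming data."""
    values = [r.get(column) for r in rows]        # a missing key counts as null
    present = [v for v in values if v is not None]
    return {
        "row_count": len(rows),
        "null_count": len(values) - len(present),
        "avg": sum(present) / len(present) if present else None,
        "max": max(present) if present else None,
    }
```

A height of 120 inches (10 feet) shows up directly in max, which is exactly the kind of anomaly a max-value test is meant to catch.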
How did we put these things together? As I mentioned, we needed an operational service that gives data scientists the ability to initialize, read, and delete tests, essentially CRUD operations on tests. It also had to run those tests when metrics are provided. In conjunction with this, we built the ability to calculate metrics, since we have Spark readily available in the writer module. We made sure this module interacted with the data quality service to find out which metrics needed to be calculated. Say a test needed the average of a column. Spark would run that aggregation on the input data frame, or others like count or null count that we customized and wrote. It would calculate the metrics and pass them to the data quality service, which would run the tests. The Spark job would then indicate whether the tests failed or succeeded, and it was configurable to proceed either way.
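The service side of that exchange (computed metrics in, pass/fail per test out) can be sketched as a threshold comparison. The test format (name, metric, op, threshold) and the names CheckResult and evaluate are hypothetical. The talk does not describe the real service's schema or API, and in the real system this logic runs in the separate data quality service rather than in the Spark job.

```python
import operator
from typing import Any, Dict, List, NamedTuple

# Comparison operators a test may use (hypothetical test format).
OPS = {"<": operator.lt, "<=": operator.le, ">": operator.gt, ">=": operator.ge, "==": operator.eq}


class CheckResult(NamedTuple):
    name: str
    passed: bool
    observed: Any


def evaluate(tests: List[Dict[str, Any]], metrics: Dict[str, Any]) -> List[CheckResult]:
    """Compare each computed metric with its test's threshold."""
    results = []
    for t in tests:
        observed = metrics.get(t["metric"])
        # A missing metric counts as a failure rather than a silent pass.
        passed = observed is not None and OPS[t["op"]](observed, t["threshold"])
        results.append(CheckResult(t["name"], passed, observed))
    return results
```

Keeping the tests declarative lets the writer compute only the metrics the tests ask for and hand them back for evaluation.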
So how do we surface this to users? We're heavily in the Python ecosystem, so we built a Python client for the data quality service that helps run CRUD operations. There's also, of course, a REST API available. In the writer module, the checks can be configured to run at write time. You set a Spark property to true, and that runs the tests if any are initialized on your table. Separately, we created an offline mode. With it you can test already-written data, compare results, or just try it out on your own. So if a user doesn't want to block writes, or doesn't want to run tests at write time, they can always use this offline mode.
Putting this all together in the data quality realm: data comes to the writer library, and if the table has tests specified, it goes to the module. The module calculates the metrics needed, such as average, null count, and row count. At step five, it sends these metrics to the data quality service to run the tests. In step six, the service reports the test results. The data quality module then prints them and continues to write if the tests are successful. We also added a configuration to fail or succeed based on the test results, so you could choose whether to proceed to writing.
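The last step (print the results, then either continue or stop the write based on configuration) might be sketched as below. DataQualityError and gate_write are hypothetical names, and results are plain (test name, passed) pairs for simplicity.

```python
from typing import List, Tuple


class DataQualityError(RuntimeError):
    """Raised to stop the write when blocking checks fail."""


def gate_write(results: List[Tuple[str, bool]], block_on_failure: bool) -> bool:
    """Print each result; raise if any failed and blocking is on.

    Returns True when every check passed, so a caller can still log a
    non-blocking failure before going ahead with the write.
    """
    for name, passed in results:
        print(f"[data quality] {name}: {'PASS' if passed else 'FAIL'}")
    failed = [name for name, passed in results if not passed]
    if failed and block_on_failure:
        raise DataQualityError("failed checks: " + ", ".join(failed))
    return not failed
```

Raising an exception, rather than returning a flag alone, makes a blocking failure impossible to ignore by accident in the write path.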
Putting all these modules together in code, it looks like this. A write-dataframe function goes through the steps in sequence. First, the data frame goes through the validation step. Then, if it's a journalized table, it goes through journalizing. Each step returns the original data frame if it doesn't apply. The transformations therefore run serially, and you'd expect the right data to be passed on. Journalizing and data cleansing happen only if configured. Finally, data quality is checked independently on the data frame. That step fails the write only if tests are configured, those tests fail, and the write is configured to stop on failure. If all of these pass, the data goes to the data warehouse and the Metastore is updated.
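The serial composition described above can be sketched in plain Python, with lists of dicts standing in for the Spark DataFrame. Each module is a pair: a predicate that says whether it applies to the table, and a transformation. A module that doesn't apply leaves the rows unchanged, and a data quality module can stop the write by raising. All names here (write_dataframe, validate, Module, the sink list) are hypothetical, and the sink stands in for writing files to S3 and updating the Metastore.

```python
from typing import Any, Callable, Dict, List, Tuple

Rows = List[Dict[str, Any]]
Table = Dict[str, Any]
# A module = (does it apply to this table?, transformation to run if so)
Module = Tuple[Callable[[Table], bool], Callable[[Rows, Table], Rows]]


def validate(rows: Rows, table: Table) -> Rows:
    """Reject rows whose columns don't match the table schema."""
    expected = set(table["schema"])
    for row in rows:
        if set(row) != expected:
            raise ValueError(f"row columns {sorted(row)} != schema {sorted(expected)}")
    return rows


def write_dataframe(rows: Rows, table: Table, modules: List[Module], sink: list) -> Rows:
    rows = validate(rows, table)
    for applies, transform in modules:
        # Modules run in order; an unconfigured module is skipped.
        if applies(table):
            rows = transform(rows, table)
    # Stand-in for writing files to S3 and then updating the Metastore.
    sink.append((table["name"], rows))
    return rows
```

Because the write happens only after every module returns, any exception raised by validation or a module leaves the warehouse untouched.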
I want to end with the learnings and the future work we have planned. On learnings, we understood that adding modules meant more complexity in the writer module. But each step was doing a really valuable transformation, so it was key to put them in place. Making sure each transformation was performant and efficient was a top priority, because we didn't want any of these transformations to slow down the writer module.
So we made sure performance was one of the key factors before rolling out. Testing, with unit and integration tests, was also key to the rollout. We had very few mishaps because of it, so it was important to get that solid first. And as I mentioned earlier, documentation and communication were key to rolling it out to data scientists. We let them know that each module was available and configurable, communicated all the knobs and buttons available to them, and added more documentation.
The final point I want to make is that making the data quality checks efficient was a challenge from day one. We had to programmatically work out which Hive partitions the data frame's rows would map to. Then we ran checks on the individual partitions that would eventually be written. It took some effort to get this running smoothly and performing as well as we expected. Those are some of the lessons we learned.
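Working out which Hive partition each incoming row will land in, so that checks can run per partition, can be sketched as a group-by on the partition columns. In Spark this would more likely be a groupBy on those columns. The plain-Python helpers partition_spec and split_by_partition are hypothetical names that only illustrate the idea, using Hive-style col=value partition specs.

```python
from collections import defaultdict
from typing import Any, Dict, List

Row = Dict[str, Any]


def partition_spec(row: Row, partition_cols: List[str]) -> str:
    """Hive-style partition spec for one row, e.g. 'date=2021-05-27'."""
    return "/".join(f"{c}={row[c]}" for c in partition_cols)


def split_by_partition(rows: List[Row], partition_cols: List[str]) -> Dict[str, List[Row]]:
    """Group incoming rows by the Hive partition they will be written to."""
    groups: Dict[str, List[Row]] = defaultdict(list)
    for row in rows:
        groups[partition_spec(row, partition_cols)].append(row)
    return dict(groups)
```

Each group can then have its metrics computed and tested on its own, so a skew confined to one partition is not diluted by the others.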
What's planned for the future? Looking ahead, we now know that additional modules can be added easily in a similar fashion, as transformations or configurable entities. Data quality is an initiative that's still being worked on, and support for customized testing is coming to this module. Our final goal is to have data quality ingrained in the ETL process of our data science workflows, so that people have the advantage of data quality at every step. The Journalizer and data cleanser are mostly static. However, we're exploring alternative solutions to help update and delete records more efficiently, since having to rewrite data repeatedly is one of their downsides.
To summarize: at Stitch Fix, we write data with Spark. We have a single write path into the data warehouse using Apache Spark. On top of it, we added three modules that perform config-driven transformations available to data scientists at write time. Journalizing writes a non-duplicated historical record of data for quick access and compression. The data cleanser deletes or nullifies unreferenced old data based on a configuration that users provide. And the data quality module calculates metrics and runs tests on incoming data, so that we can perform data quality checks prior to writing to our data warehouse. That's all from me. Thank you. Happy to take any questions.

Neelesh Salian

Neelesh Salian is a Software Engineer on the Data Platform team at Stitch Fix, where he works on the compute infrastructure used by the company's data scientists. He is helping build out the services ...