Sawtooth Windows for Feature Aggregations

May 28, 2021 11:05 AM (PT)


In this talk about Zipline, we will introduce a new type of windowing construct called a sawtooth window. We will describe various properties of sawtooth windows that we utilize to achieve online-offline consistency, while still maintaining high throughput, low read latency and tunable write latency for serving machine learning features. We will also talk about a simple deployment strategy for correcting feature drift, due to operations that are not “abelian groups”, that operate over change data.

In this session watch:
Nikhil Simha, Senior Engineer, Airbnb



Nikhil: Hello, everyone. I’m Nikhil. I work on Zipline at Airbnb. Zipline is a feature engineering framework. And today, we are going to talk about Sawtooth Windows. So before we dive deeper into why you need Sawtooth Windows for generating features at all, or why you even need windows, let’s talk about the larger context around this problem. So, we are specifically interested in Supervised Machine Learning. And even in that, we are only going to deal with structured data in this talk. Structured data is the data that’s present in databases and in event streams. And to give the counterexample, unstructured data is media like images, video, audio, and freeform text.
So with that little bit of context, I want to establish a few rules of thumb, maybe these are things that you guys are already familiar with. So just to visualize the context here, your data scientist starts with the Machine Learning Problem. And then, he begins by exploring data that is available across the company or available externally in public for free. And then, he goes about transforming that raw data into features that could be useful for doing predictions. So, he would train a model by using the labels, until the model is satisfactorily predicting the labels. If not, the loop continues. But once the data scientist is happy, he would go about serving this model and features. And this will all finally be wired up to the application, that is actually using Machine Learning to predict something.
So, Zipline deals with just Feature Creation and Feature Serving. So it is this red box that we highlight here. So, with this context in mind, there are a few rules of thumb. The first one is that complex models are better than simple models. When I say better, it means we can identify more complex relationships within data. So to give you an example, let’s look at predicting the output of XOR. A linear model will fail to predict the output of XOR, but it will be sufficient to predict the output of OR. So, the labels here are basically green and red, and we are trying to separate the green labels from the red labels. So, a line can do that for OR. But for the XOR, a line cannot do that. So you cannot draw a line that can separate the red labels from green. That’s what I mean by a complex model. So, it’s a higher degree polynomial. So, the deeper you get with your neural networks, the higher the complexity of the model becomes. And linear models are less complex than some things like SVM or decision trees.
So the more relevant rules of thumb here are, good data is much better than bad data. When I say that, what I mean is, the labels that you have are mostly true and balanced, which means you have the same number of positive and negative examples as your problem needs. And then there are Features. So, this is where the crux of the talk is going to be. We want the features to be consistent. What I mean by that is, what you use for training should be the same thing that you use for serving. It sounds almost trivial, but this is extremely hard to achieve in production. And there are no tools out there today that allow you to get that consistency. And the features should be real-time in the sense that, you don’t want to use stale feature values.
You want to use real-time values and update your features immediately, as events happen across your product. And the third thing is stability of features. What I mean by that is, you want to choose features that don’t grow continuously over time. So, I’ll give two examples. So, the first is the total amount of time that a user spends on a particular product. This thing keeps growing throughout the life cycle of your product. And if you use this as a feature, you would need to retrain your model to be accustomed to the new values of total usage time. Instead, you could replace the sum or the total with the average time that a user spends in a day on your product. And that value wouldn’t change drastically. It will only change when the user’s behavior really changes, which is the signal that we are trying to extract.
So, another interesting rule of thumb based on the last two slides is that simple models with good data beat, by far, complex models with bad data. And another rule of thumb is that the effort required to produce better data is much greater than the effort required to use a more complex model. If you think about it, using a more complex model is just swapping out a library and wiring the features and labeled data through a new library. So why is that? The first reason is that Realtime Features are hard. The next slide just talks about why Realtime Features are hard.
The other reason is that Windowed Aggregations are unsupported. So even in the SQL language, proper support for Aggregation Windows is only just about becoming a standard. And as a result, most tools are pretty inefficient when it comes to computing windowed aggregations. And the third reason is that, usually people write two different pipelines, one to produce Training data and one to produce Serving data. And as a result, there is no guarantee of consistency between training and serving.
So, why are Realtime features hard? So most data sources like Kafka and Kinesis, the way they’re set up, usually only store about a couple of days’ worth of data. So event sources don’t go back in history much. So, if you wanted to do longer windows, you’re basically left to writing your own software that stitches old data that is in a Hive table, or in some other data store, with event data that is in Kafka or Kinesis. And, if you’re trying to build them off of a database, you would need to do range scans, and databases struggle enormously with range scans. So an example is, let’s say you want to compute average time spent by a user. You would need to look at all the rows for a session and do an average every time a request is issued. So, that breaks the scalability of the database sources.
So, technically there is no real solution to do this, except for writing your own service and creating a new database that indexes data completely differently. So if you see these two problems, the gaps really are not in machine learning at all. They’re in systems engineering. So, if you are a data scientist who took machine learning courses and joined the industry, the problems you would face day-to-day are completely different from the education you received.
And one last thing is that real-time features are hard to backfill. So what I mean by that is, first you need training data, right? You don’t need serving data. But what happens in practice is, you set up the infrastructure to serve and then issue shadow requests, or when real requests come in, you compute a new set of Realtime Features, log them, accumulate enough data to train, and then train a model that is going to predict using these new features. So in that case, you would need to wait for a couple of months in most cases, to gather enough data to train your model. So, what’s the goal of this talk? The goal is pretty simple. We have talked about several issues. We said features should be real time, and features are all aggregations, even the ones that you usually are not used to thinking of as aggregations. We will see some examples soon. And most aggregations should be windowed.
This is for the reason of stability, right? These are the three things that we more or less mentioned. We are going to see how Sawtooth Windows can help us do this. So, we’ll define what Sawtooth Windows are later in the slides. So, this is the goal of the talk, to show you how to build and manage Sawtooth Windows. So, let’s start with an example. You’re trying to recommend a restaurant. So given a user and restaurant pair, you’re trying to see how likely it is that this user is going to like this restaurant. So, you come up with three features. So you want to say, if the restaurant is highly rated, that’s a signal. I can recommend this restaurant to this user. If the check-ins of this user for a particular cuisine are aligned with the restaurant, then you’ll be more inclined to recommend it. Meaning that, if the user already has an interest in Greek food and this is a Greek restaurant, you would probably want to rank that higher.
Now the third one is the latest cuisine check-in by the user. So, this is a completely made-up set of features. So for this case, what I mean by the latest cuisine is that, if you had Greek food yesterday, you probably don’t want to eat it tomorrow.
You’ll try to go for something new, or you’ll try to go for the same thing every day. In either case, knowing this helps your model recognize what kind of a user you are and whether this restaurant is relevant. So, just to visualize what’s going on here with these features, let’s say there is a point in time at which you want to predict how likely the user is to like a restaurant. And at a later point in time, you’ll get a label saying that this user went to this restaurant and gave it a rating of four, five, three, two, whatever, right? So, you have a stream of check-ins of this user, each denoted as a one, by cuisine, and you also have a set of ratings that this restaurant received through time. And you are trying to create a training data set first, which looks like this.
Given that label, you want to know the total number of check-ins for this cuisine for this user, and the average rating of the restaurant. And at a later point in time, if you’re trying to predict again, then you would want to update this data in real time. So, the average rating goes to 2.5 and the total number of check-ins goes to four.
So, the contract simply is this. When you’re serving, given any user and a restaurant, you want to reply with features. And when training, there is a user, a restaurant, and a timestamp at which that label was received or the recommendation was given. And you want to enrich this labeled data with features. Before we go further, let’s talk about data sources. There are essentially two kinds of data sources. The first kind is events. So, events are all timestamped.
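The contract above can be sketched in a few lines. This is a minimal illustration, not Zipline's API: the `CheckIn` and `Rating` types, the `features_at` function, the integer timestamps and the sample data are all assumptions made up for the example. The point it tries to show is that training and serving call the same function, and only the timestamp differs.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class CheckIn:
    user: str
    cuisine: str
    ts: int  # event time in arbitrary integer units (assumption)


@dataclass(frozen=True)
class Rating:
    restaurant: str
    stars: float
    ts: int


def features_at(user, cuisine, restaurant, ts, check_ins, ratings):
    """Return the features as they would have looked just before time `ts`."""
    count = sum(
        1 for c in check_ins
        if c.user == user and c.cuisine == cuisine and c.ts < ts
    )
    stars = [r.stars for r in ratings if r.restaurant == restaurant and r.ts < ts]
    avg: Optional[float] = sum(stars) / len(stars) if stars else None
    return {"check_in_count": count, "avg_rating": avg}


# Made-up data, for illustration only.
CHECK_INS = [CheckIn("u1", "greek", t) for t in (1, 3, 5, 8)]
RATINGS = [Rating("r1", 2.0, 2), Rating("r1", 4.0, 4), Rating("r1", 1.5, 7)]

# Training: enrich a label observed at ts=6 with the features as of ts=6.
training_row = features_at("u1", "greek", "r1", 6, CHECK_INS, RATINGS)
# Serving: the same function, evaluated at the current time (here ts=10).
serving_row = features_at("u1", "greek", "r1", 10, CHECK_INS, RATINGS)
```

Because only events strictly before the timestamp are used, the training row cannot leak information from after the label was produced.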
An example is a user transaction stream. These are events that happened across time that a bank decided to log. And the other kind of data is entities. Usually, you would think of this kind of data as something that is served by a microservice, or the data that exists in a dim table. Events exist in fact tables, for example. And the entity example that is related to the previous user transaction stream is the user balance table. So as transactions happen, the user balance gets updated, but it’s still showing the current snapshot of the balance at any given point in time. It’s not the delta, it’s the full value of the balance, right? So, that is roughly the intuitive difference between events and entities. And there are mirrors of this also for non-realtime data. In the warehouse, you would call them dim and fact tables.
So, just to visualize what I mentioned, let’s say you have a business that has a service fleet and a production database that is backing this service fleet. And you would have a Message Bus, and the service fleet would continuously write events like click streams, transactions, et cetera. And this would get replicated in a data lake, which can store it as daily partitions. And the other sort of data, that we call the entities, is the Change Capture Stream and the Change Capture Log. And the caveat here is that most companies don’t capture change data in a data lake. They just put it in a file somewhere, back it up, but no one really uses this data to do anything. But Zipline has a fundamental reliance on this kind of data. And what you instead get in the warehouse is a midnight or a daily snapshot of the production database.
So what Zipline does is look at these groups of sources that have equivalent data. There is also Derived Data and Media, which we will come to after the groups. The first group is Event Sources. It’s the stream and the equivalent Hive table, if you will, in the data lake. And the second group is entities, which is the Change Capture Stream, the Change Capture Log and the Database Snapshot, right? And then there is Derived Data. This cannot be real time, essentially, but Zipline can still understand it and do aggregations on top of Derived Data. And the last thing, which we said initially we won’t talk about, is media. Zipline can help you with data that is derived from media, like, let’s say, embeddings of an image that happen to be a vector of doubles that is stored as Derived Data somewhere. Zipline can help you merge that, but media itself is something Zipline doesn’t help deal with.
So, here is a full API example. I’m just throwing this at you, so take your time, pause the video and try to understand what this example means. So in this case, we are computing the average rating of a restaurant in the last year. So, there is a window, there is an operation and then there is a group of three data sources like we talked about before. There is a snapshot table, the change data or mutations table and the mutations topic. And similarly, for the other features that we mentioned before, we are trying to compute the user cuisine preference. So, this is an event source. It’s not an entity source, so you’d have a single table and a topic. So, it might help to go back and forth between what I visualized before and this example, to get a sense of how this configuration actually works.
So in this case, we are computing the check-in count for a user and cuisine key, and we are computing it over a window of one year, right? And then there is a filter. So, we are looking at the user activity table and we’re filtering only the check-in events from that. And similarly, we are trying to get the latest cuisine that the user consumed, and we are saying the window is seven days. So, anything older than that is out of context. So, if the user ate Greek 10 days ago, it’s roughly okay to recommend that again to the user.
So the philosophy behind this API, which helps understand the API is that, it looks a bit like SQL, but it’s not entirely SQL. The difference is that, we keep the expression language. So, all the strings that you saw there, that look a bit like expressions in select clauses and where clauses are, what’s called the Expression Language of the SQL. And then, there is Control Structural Language, which is the GROUPBY, the JOIN, the HAVING, SELECT, WHERE, FROM, et cetera. So, the Structural Language is what the API takes over. So, we control how you specify GROUPBY, and we control how you specify JOIN et cetera.
So, another interesting thing about that configuration is that we enrich the SQL specification or the SQL fragments with windows. And we allow you to specify multiple sources of data as equivalent data. So, the topic, the table and the mutations are all containing the same data or roughly equivalent data. And another thing here is that we differentiate very strictly between entity data and event data. And internally what happens is that the Python file gets converted to a Thrift blob and it’s consumed by a Spark job, which is basically Scala code. These are versioned, and these are all submitted to Airflow and driven by Airflow. So, that’s just to give you an idea of the tech stack behind the scenes. So before we try to understand windows, we need to understand what we are windowing over. We are windowing over aggregations, and there are basically different kinds of aggregations.
When I say kinds, let’s start with an example, SUM. So, sum is commutative. So, you can sum things in various orders and still get the same result. So if there is a list of numbers, you can sum them in any order that you want and you’ll still get the same result. And associative means that you can break up this list into smaller lists, sum them independently and then sum the partial results again. So, the benefit of these two properties is that you can look at a stream of data, or at data that lives in a large number of files, and use many machines to independently execute smaller sums and aggregate them together. This is really powerful because this allows us to aggregate data much faster.
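To make the two properties concrete, here is a small sketch that stands in for "many machines" with plain list chunks. The helper names `chunks` and `distributed_sum` are made up for this illustration, and integers are used so that the order of addition cannot introduce floating-point rounding differences.

```python
import random


def chunks(values, size):
    """Split a list into consecutive pieces of at most `size` elements."""
    for i in range(0, len(values), size):
        yield values[i:i + size]


def distributed_sum(values, chunk_size):
    # Each chunk could be summed on a different machine (associativity) ...
    partials = [sum(chunk) for chunk in chunks(values, chunk_size)]
    # ... and the partial results are then summed again.
    return sum(partials)


values = list(range(1, 101))
shuffled = values[:]
random.Random(0).shuffle(shuffled)  # a different order (commutativity)

total_in_order = distributed_sum(values, 7)
total_shuffled = distributed_sum(shuffled, 13)
```

Both totals come out the same regardless of ordering or chunk size, which is what lets the work be spread across machines.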
So, average is also similar. Take a minute to think about how averages can be broken down. The solution to do this is pretty simple actually. You break down the average into a sum and a count, and then operate on the sum and count separately. We call that sum and count an intermediate representation. So, one implicit condition here is that the intermediate representation needs to be constant memory. No matter how many events we are looking at, we only want to keep constant memory to hold the sum and the count, or to hold just the sum, et cetera.
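A minimal sketch of that intermediate representation for average, assuming a plain `(sum, count)` tuple. The function names `empty_ir`, `update`, `merge` and `finalize` are chosen for this example and are not taken from Zipline.

```python
def empty_ir():
    """The intermediate representation (IR) for average: (sum, count)."""
    return (0.0, 0)


def update(ir, value):
    """Fold one event into the IR. Memory stays constant."""
    total, count = ir
    return (total + value, count + 1)


def merge(a, b):
    """Combine two IRs that were computed independently."""
    return (a[0] + b[0], a[1] + b[1])


def finalize(ir):
    """Turn the IR into the final average (None when there are no events)."""
    total, count = ir
    return total / count if count else None


def aggregate(values):
    ir = empty_ir()
    for v in values:
        ir = update(ir, v)
    return ir


# Two partitions aggregated separately, then merged.
left = aggregate([4.0, 5.0])
right = aggregate([3.0, 2.0, 1.0])
combined = merge(left, right)
average = finalize(combined)
```

Note that averaging the two partial averages (4.5 and 2.0) would give the wrong answer; merging the sums and counts first is what keeps the result exact.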
So, there are two classes of aggregations. The first class is sum, average and count. The second class is min/max, Approx Unique, percentiles, topK. So, pause the video here and think about how they’re different. They’re different because the second class does not support deletion. So, if there is an update or a delete, you can update the sum by subtracting the old value and adding the new value, right? You cannot do that with min and max. You cannot subtract the old value from a min and add the new value. That concept doesn’t exist for min and max, unless you store all the events, which we don’t want to do.
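The difference between the two classes can be illustrated with a tiny example. The helper names and the numbers are made up; the sketch only shows that a sum can absorb a delete or an update using its constant-size state, while a min has to go back to the raw events.

```python
def sum_delete(ir, old_value):
    """Remove an event from a running sum using only the running sum."""
    return ir - old_value


def sum_update(ir, old_value, new_value):
    """Apply a mutation: subtract the old value, add the new value."""
    return ir - old_value + new_value


events = [5, 2, 9]
sum_ir = sum(events)  # constant-size state for sum
min_ir = min(events)  # constant-size state for min

# Delete the event with value 2.
remaining = [5, 9]
sum_after_delete = sum_delete(sum_ir, 2)

# The min state only remembers 2. Once 2 is deleted, the next smallest
# value is not recoverable from that state, so the events must be rescanned.
min_after_delete = min(remaining)

# An update (5 becomes 7) is likewise easy for sum.
sum_after_update = sum_update(sum_ir, 5, 7)
```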
So, going to windows. We have a few kinds of windows. The first one in the literature is what we call hopping. Let’s say you want to look at events that happen in a one-hour window. You would break it down into 10-minute hops. And let’s say a query comes in as shown in the poorly hand-drawn diagram. You would sum all the events from e1 to e6 and say, this is the result for my query. If you see, we are dropping the event e7, which happened after two o’clock. So, obviously the problem with the hopping window is that it loses events that are after the most recent hop. So, it’s a bit stale, and it can be as stale as the hop size.
So if you were to graph it, it looks like staleness keeps going up as time goes on, until it hits the hop size, then it drops, and then it begins again. The advantage of hopping windows is that they are memory efficient. Meaning, you need to store only the aggregate values that exist in each hop. Now, the next kind of window is sliding. So, just to contrast with the hopping window, the sliding window is exact. So, it goes exactly one hour back and includes all the events that are most recent. So it wouldn’t drop e7. Instead, it would drop e1 in this example.
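The hopping and sliding examples can be reproduced with a short sketch. The event times are assumptions chosen to match the description (minutes since midnight, e1 shortly after 1:00, e7 after 2:00, query at minute 129), and the boundary conventions, half-open for hopping and open at the start for sliding, are a choice made for this illustration.

```python
# (name, minute since midnight); made-up times matching the example.
EVENTS = [("e1", 65), ("e2", 75), ("e3", 85), ("e4", 95),
          ("e5", 105), ("e6", 115), ("e7", 125)]
WINDOW = 60  # one hour
HOP = 10     # ten minutes


def hopping_events(events, query_ts, window=WINDOW, hop=HOP):
    """Events in the last `window` minutes, ending at the last completed hop."""
    end = (query_ts // hop) * hop  # round the query time down to a hop edge
    start = end - window
    return [name for name, ts in events if start <= ts < end]


def sliding_events(events, query_ts, window=WINDOW):
    """Events in exactly the last `window` minutes before the query."""
    start = query_ts - window
    return [name for name, ts in events if start < ts <= query_ts]


hopping_result = hopping_events(EVENTS, 129)  # covers [60, 120): misses e7
sliding_result = sliding_events(EVENTS, 129)  # covers (69, 129]: misses e1
```

The hopping version could be served from six pre-aggregated hop values, whereas the sliding version as written needs every raw event in the window to be available.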
So, the benefit of the sliding window is obvious. It’s fresh, it doesn’t drop the most recent data, but it’s memory intensive. You cannot slide windows without storing all the events that are in the window. So, you need to hold all these events in memory. And if you think about it, if you want to do large windows, or one-year windows, you would need to use a huge amount of memory to serve your features. So, what Zipline uses is called a Sawtooth Window. So, we want to retain the good properties of both the hopping window and the sliding window. So, we want to keep the data fresh, like we do in sliding windows, and we want to use as little memory as possible. So hopping windows use very little memory. So we want to have that property as well. But, how do we do that?
The idea is to relax the window size. So, what I mean by that is, instead of sticking to an exact one-hour window, what if we could vary the window a little bit, and only keep the hops every time you update values? So, a Sawtooth Window is basically a union of the hopping window and sliding window events. So if you see, the query happens at, let’s say, 2:09, and it goes back all the way to 1:00, right? So, this has the property that it includes e7, which is the most recent event. So it’s fresh. And it also has the property that you don’t need to keep all the events that happened in the middle. You only need to keep as many partial aggregates as there are hops. So, you would only need to keep the hops that happen in those 10-minute sub-intervals. So, if you were to plot this thing again on a graph, the effective window size against time, it looks like the edge of a sawtooth blade, which is why we call these Sawtooth Windows.
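A minimal sketch of a sawtooth sum, under stated assumptions: the class name `SawtoothSum` and its methods are hypothetical, time is in integer minutes, queries are issued at the current time (so no event is newer than the query), and eviction of old hops is left out for brevity. It keeps one partial sum per hop and rounds the start of the window down to a hop boundary.

```python
from collections import defaultdict


class SawtoothSum:
    """Windowed sum that keeps one partial aggregate per hop (a sketch)."""

    def __init__(self, window, hop):
        self.window = window
        self.hop = hop
        self.partials = defaultdict(float)  # hop start time -> partial sum

    def update(self, ts, value):
        # Fold the event into the hop it falls in; the raw event is not kept.
        self.partials[(ts // self.hop) * self.hop] += value

    def window_start(self, query_ts):
        # Exact window start, relaxed backwards to the nearest hop boundary.
        return ((query_ts - self.window) // self.hop) * self.hop

    def query(self, query_ts):
        start = self.window_start(query_ts)
        return sum(partial for hop_start, partial in self.partials.items()
                   if start <= hop_start <= query_ts)

    def effective_window(self, query_ts):
        return query_ts - self.window_start(query_ts)


agg = SawtoothSum(window=60, hop=10)
for minute in (65, 75, 85, 95, 105, 115, 125):  # e1..e7, made-up times
    agg.update(minute, 1.0)

result = agg.query(129)  # covers [60, 129]: includes both e1 and e7
sizes = [agg.effective_window(t) for t in (129, 130, 139, 140)]
```

The effective window grows from the nominal size up to the nominal size plus one hop and then snaps back, which is the sawtooth shape.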
The catch here is that sum and count will show this flicker, but average and other kinds of metrics won’t show the flicker in window size. But as long as this is what you do during training and this is also what you do during serving, there is no problem for models to deal with this kind of flicker.
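The flicker can be seen with a synthetic stream. This sketch assumes one event per minute with a constant value of 5.0, which is an idealized case chosen so that the average is perfectly stable; with real data the average would only be approximately stable. The `sawtooth_values` helper is made up for the example.

```python
def sawtooth_values(events, query_ts, window=60, hop=10):
    """Values inside the sawtooth window ending at `query_ts`."""
    start = ((query_ts - window) // hop) * hop
    return [value for ts, value in events if start <= ts <= query_ts]


# Synthetic stream: one event per minute, every value is 5.0.
events = [(minute, 5.0) for minute in range(0, 201)]

at_129 = sawtooth_values(events, 129)  # window is [60, 129]
at_130 = sawtooth_values(events, 130)  # window snaps to [70, 130]

count_129, count_130 = len(at_129), len(at_130)
sum_129, sum_130 = sum(at_129), sum(at_130)
avg_129 = sum_129 / count_129
avg_130 = sum_130 / count_130
```

Count and sum jump when the window snaps back by one hop, while the average is unchanged.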
So, the architecture is pretty straightforward. We take the feature definition that we showed before and we create the Batch Aggregate, and then we also create a Streaming Aggregate. So, if you remember, Kafka only stores the last seven days’ worth of data. If you wanted to look at older data, you need to look at the Hive table, and that’s what Zipline does when I say batch aggregates. It looks at older data from the warehouse, from Hive, and it also looks at newer data that exists in Kafka and Kinesis, and then updates the feature store. And then, there is a client library that you can embed and serve with the model. At the end, this is all consumed by applications. So green is what the user essentially touches. And this dotted box is what is within Zipline.
So, the Lambda looks like this. So, if you were to visualize what data gets stored where between batch and streaming, you would see this kind of data if we were to look at the databases themselves. So, each row in the batch data would be the tail of the window, and then there would be the head of the window. So, a seven-day window is broken down into three sections: the middle part, which doesn’t change very often, the tail part, which is accurate to the hop, and the front part, which is fully accurate. And if you were to look at releasing this every day, day after day, it would look like so. So, we would keep a two-day buffer, and every time you release, we would stitch together the extreme ends of the tails of the window, the middle part, and the front part, as you see in the diagram.
And, when the release is happening, you see the dotted pencil line, that marks how we compute windows when the release is happening. This is important because, you don’t want any downtime when a release is happening. So, every day we release the batch part, essentially. So take a minute to think about what that set of boxes inside that pencil line means. So, that is the high level overview of how Sawtooth Windows work and how we split them across batch and streaming.
And we technically, automatically choose these hops for users at Airbnb. And we choose them with certain guarantees that the hop size is less than a certain percent of the window size, so the flicker is controlled. And we only choose hops that are daily, hourly, and five minutes, so that we can reuse them. So, a 30-day window and a 90-day window can use the daily hops. And this is also used across queries. So if you ask me for a 90-day window tomorrow, instead of throwing away yesterday’s data completely, we will simply look at the daily hops and re-aggregate them. So, that’s it. Thanks for taking the time to watch this video. Definitely spend some time at the places where I mentioned to pause, and think about how this thing works. But otherwise, I’ll take questions now. Thank you.
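A sketch of how such a hop choice could look. The three hop sizes come from the talk, but the 10% threshold, the fallback to the smallest hop, and the names `choose_hop` and `window_sum` are assumptions for illustration; the talk does not state the actual percentage or selection logic.

```python
FIVE_MINUTES = 5 * 60
HOUR = 60 * 60
DAY = 24 * HOUR

HOPS = [DAY, HOUR, FIVE_MINUTES]  # candidate hop sizes in seconds, largest first
MAX_HOP_FRACTION = 0.10           # hypothetical bound on hop size / window size


def choose_hop(window_seconds, max_fraction=MAX_HOP_FRACTION):
    """Pick the largest standard hop that stays within the flicker bound."""
    for hop in HOPS:
        if hop <= max_fraction * window_seconds:
            return hop
    return HOPS[-1]  # assumption: fall back to the smallest hop


def window_sum(daily_partials, days):
    """Re-aggregate the most recent `days` daily partial sums."""
    return sum(daily_partials[-days:])


# Made-up daily partial sums for the last 90 days (oldest first).
daily_partials = list(range(1, 91))

sum_30d = window_sum(daily_partials, 30)  # reuses the same daily hops ...
sum_90d = window_sum(daily_partials, 90)  # ... as the 90-day window
```

Restricting hops to a small fixed set is what allows the 30-day and 90-day windows to share the same stored daily partials.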

Nikhil Simha

Nikhil is a Software Engineer on the Machine Learning infrastructure team at Airbnb. He is currently working on Bighead, an end-to-end machine learning platform. Prior to Airbnb, he built self healing...