iFood is the largest food tech company in Latin America. We serve more than 26 million orders each month from more than 150 thousand restaurants. As such, we generate large amounts of data each second: what dishes were requested, by whom, each driver location update, and much more. To provide the best possible user experience and maximize the number of orders, we built several machine learning models to provide accurate answers for questions such as: how long it will take for an order to be completed; what are the best restaurants and dishes to recommend to a consumer; whether the payment being used is fraudulent or not; among others. In order to generate the training datasets for those models, and in order to serve features in real time so the models’ predictions can be made correctly, it is necessary to create efficient, distributed data processing pipelines. In this talk, we will present how iFood built a real-time feature store, using Databricks and Spark Structured Streaming to process event streams, storing them in a historical Delta Lake table and in a low-latency Redis cluster, and how we structured our development processes in order to do it with production-grade, reliable and validated code.
– Hello everyone, my name is Daniel. I’m the machine learning platform technical leader at iFood.
And today I’m gonna talk about how we built a real-time feature store at iFood.
Just a quick agenda for today’s presentation. First, I’ll quickly go over iFood and AI and what’s the iFood mission. Then I’ll give a quick explanation on what is a feature store and what’s its purpose. And finally, I’ll dive deeper into how iFood built its feature store.
So first, a really quick overview of iFood. iFood is the biggest food tech company in Latin America. We’re in Brazil, we’re in Mexico, we’re in Colombia. We have over 30 million orders per month, spanning over 800 cities and all Brazilian states, and we have over 100,000 restaurants in our database.
And being such a big company, we have AI everywhere. Just some examples: we have AI applied to discovery, so how to recommend restaurants, how to recommend dishes. We have AI applied to logistics and operations, so how to optimize driver allocation, how to estimate the delivery time for an order, how to find the most efficient route for a driver. And as another example, we have AI applied to marketing, so how to optimize the marketing ads, how to optimize the allocation of coupons and vouchers for the users.
So, having AI applied in so many different places, it’s necessary to have a machine learning platform. This picture here is from a really famous paper from Google, “Hidden Technical Debt in Machine Learning Systems”. What Google was trying to point out is that most of the time, when a data scientist is working on machine learning code, she also has to worry about lots of infrastructure problems, such as how to collect data, how to analyze data, how to spin up a serving infrastructure, and so on and so forth. So the goal of the machine learning platform should be to handle all of that, represented here in the dark blue boxes, so the data scientists can worry about the machine learning code. The machine learning platform is the team I am the technical lead for, and one of the responsibilities of this team is building the feature store, which I’ll be going over today.
So, what is a feature store? First of all, what are features, right? So, features are any kind of data, any kind of attributes, that you use to train a machine learning model. So, anyone that has played with a machine learning tutorial has probably stumbled upon the Iris dataset, which we are showing here in this picture. And the goal of the problem is to identify what class a flower belongs to, represented by attributes such as the petal length and petal width, and so on and so forth. At iFood we also have features for lots of problems, and they can mostly be classified in three types. So we have state features, for instance: did the user have or didn’t she have a coupon at the time an order was made. Another type is aggregate features, so, for instance, what was the average ticket price for a given user in the last 30 days at the moment an order was made. And the third type is external features, so for instance, was it raining at the time the order was made.
And the feature store, basically, is where you store the features, right? So it’s the central place where you store and serve the features, so anyone in the organization can query for them. They’re mostly used by machine learning algorithms, but they can also be useful for other applications. One simple example would be some non-AI logic in the application that queries the average ticket price for a user to decide whether to show a high-end or low-end list of restaurants.
Just a quick overview of the requirements when you’re building a feature store. We have requirements that are applicable to all use cases, and some requirements that are mostly focused on the machine learning scenario. So for the general use case, you obviously want low latency, and we like to distinguish between access and calculation latency. Access latency is basically how long it takes to query for the latest known value of a feature, and that’s mostly dependent on the database you use to query: Redis has really low latency, while a Hive table, for instance, has a higher latency. And you also have the calculation latency, which is the time you take to update the value of a feature, and that’s mostly defined by the processing infrastructure used, such as Spark.
Also, another requirement is access control. So how do you guarantee some features are readable by some users and some are not? For instance, payment-related features are really useful when you’re building an anti-fraud model, but maybe they’re not so useful for a marketing model. So you should be able to control who can access what feature. Versioning is also very important, so you can update the calculation code of a feature. You should be able to scale your feature store to handle any number of features and any kind of data without compromising the latency, and you should have an easy API for data access, right? In the machine learning scenario, we also have a few additional requirements.
First of all, you should be able to backfill. For the general use case, you’re mostly worried about the latest value of a feature, but for machine learning, when you’re building training datasets, you’re also interested in past values of the feature. So when you create a new feature, you should be able to calculate values in the past. And really related with this, you should also be able to time-travel, right? So you should be able to get the value of a feature in the past. For instance, right now the average ticket price for a given user is some value, but that’s probably different than what it was one month ago, two months ago.
So I’m gonna go over how iFood built this feature store.
So first of all, I’d like to give a quick presentation on the iFood software architecture before we go into the feature store. iFood is going towards a model in which streaming is a first-class citizen. So we have microservices basically for everything at iFood. Every time a user creates an order, makes a payment, every time a driver has her location updated through the GPS sensor in her cell phone, every time a user logs in, and so on and so forth, those microservices publish an event to a central bus; we are using a set of Kafka queues for that. And those events, in turn, are consumed by our real-time data lake infrastructure. And this data lake is the source for our feature store, which aggregates the values, creates the features and stores them into the feature store.
So first, why we chose to implement a data lake architecture such as this. Like I said, we have a Kafka queue receiving the real-time events, and we have the data lake streaming jobs, which read the events and append them to a streaming Delta table. Why do we do that? First of all, Kafka storage is expensive: if we wanted to use a model with infinite retention for Kafka, that would get expensive really fast. Our Kafka queues have limited retention.
But we also want to have the full event history, because that enables us to recalculate and backfill the features. Like I said, that’s a requirement of the feature store for machine learning. And Delta tables provide a cheap storage option, backed by an S3 bucket. Also, Delta tables can double either as a batch or as a streaming source, so that’s a bonus.
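A minimal sketch of what one of those data lake ingestion jobs could look like; the topic name, broker address and paths here are illustrative assumptions, not iFood’s actual ones:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.getOrCreate()

// Read raw events from Kafka and append them to a streaming Delta table on S3.
spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "kafka:9092")   // assumed broker address
  .option("subscribe", "order-events")               // assumed topic name
  .load()
  .selectExpr("CAST(value AS STRING) AS event_json", "timestamp")
  .writeStream
  .format("delta")
  .option("checkpointLocation", "s3://data-lake/checkpoints/order-events")
  .start("s3://data-lake/raw/order-events")          // the streaming Delta table
```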
So, in really broad terms, that’s the iFood feature store architecture. I’m gonna give a really quick overview of how the data flows here, and then I’ll go over each part step by step. So, first of all, we read the data from the data lake, the streaming Delta tables, and they are used as the source for the calculation jobs. The results are published to yet another Kafka bus, and the data from this Kafka bus is read by a set of materialization jobs, which finally write it to the final storages that are used by the end users to query for feature values.
The first part I’m gonna go over is the aggregation jobs, which are shown in this part of the picture. We read data from the streaming Delta tables, and the data is aggregated and published to the Kafka bus. Features are usually combinations of a data source, a window range, a grouping key, a value, a filter, and an aggregation type. For instance, we could have the average ticket price per user during lunch in the last 30 days, and this data can come from an order stream. So features are usually a combination of those six attributes, and we build the aggregation jobs around that.
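To make that concrete, here is one hypothetical way to express those six attributes. This is not iFood’s actual API, just an illustration of the shape of a feature definition:

```scala
// Hypothetical feature definition combining the six attributes mentioned above.
case class FeatureSpec(
  source: String,       // data source, e.g. the orders event stream
  groupingKey: String,  // e.g. user_id
  value: String,        // the column being aggregated, e.g. ticket_price
  filter: String,       // e.g. "order_period = 'lunch'"
  windowRange: String,  // e.g. "30 days"
  aggregation: String   // e.g. "avg"
)

// "Average ticket price per user during lunch in the last 30 days"
val avgTicketLunch30d = FeatureSpec(
  source = "orders", groupingKey = "user_id", value = "ticket_price",
  filter = "order_period = 'lunch'", windowRange = "30 days", aggregation = "avg")
```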
However, with Spark streaming, you can only execute one group-by operation per data frame, per job. And each combination of grouping key and window range has to be expressed as an individual data frame. That means increased costs and operational complexity, because most of the time you only change small parameters in one of those attributes to express a feature. For instance, you could want to calculate the sum of the ticket price per user in the last day, the last three days, the last seven days, and for each of these combinations the code is really similar, we’re just changing the window size. But each of those results in a new data frame, in a new job, and that gets you increased costs and operational complexity.
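As a rough sketch of that naive approach (column names and paths are assumptions for illustration), each window size ends up as its own near-identical aggregation and its own streaming query:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, sum, window}

val spark = SparkSession.builder.getOrCreate()
val orders = spark.readStream.format("delta").load("s3://data-lake/raw/orders")

// One groupBy per window size: three near-identical data frames,
// each of which has to be started as its own streaming query.
val sum1d = orders.groupBy(col("user_id"), window(col("order_time"), "1 day"))
  .agg(sum("ticket_price"))
val sum3d = orders.groupBy(col("user_id"), window(col("order_time"), "3 days"))
  .agg(sum("ticket_price"))
val sum7d = orders.groupBy(col("user_id"), window(col("order_time"), "7 days"))
  .agg(sum("ticket_price"))
```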
So how do we work around that? We created what we call the two-step aggregation logic. What we do is store an intermediate state for several aggregation types for a fixed, smaller window. We call that window the step, and that is actually the name that Spark also uses when defining windows. Then we combine those intermediate results to emit the results for several window sizes at once, in a single job. This also allows us to use the same code and the same job to calculate historical and real-time features, so we lower our costs and our operational complexity.
This picture here is a quick example of how we do that. For this example, we’re calculating the count of orders per user for a set of windows: we want to calculate it for one-day windows, three-day windows and seven-day windows. So in the first row here we have D-6, D-5 and so on, and that’s coming from the streaming source. For instance, we could have one order made by this user on D-6, two orders on D-5, three on D-4, and so on and so forth until D-0, which is today, when the user made two orders, right? So if we want to calculate the number of orders this user made in one-day windows, three-day windows and seven-day windows, it’s a simple matter of combining those intermediate states. The final result for one-day windows is really just a pass-through, right? You don’t have to do anything to combine it. When you’re calculating the three-day and seven-day windows it gets a little bit more interesting, but basically what you have to do is combine sets of three days to get the three-day windows, and the full set of seven days to get the seven-day windows. In this example we’re executing a simple count, but you can use the same idea to calculate an average or a sum, or to execute count distinct operations. So that’s really flexible.
And how do we express that, right?
So Spark streaming has a really powerful function, which is flatMapGroupsWithState. It basically allows you to define a really flexible state, a really flexible input and a really flexible output, to combine the input data and produce output data. And by expressing this logic using flatMapGroupsWithState, we’re able to combine dozens of jobs into one. We have pseudocode here of how we implemented that, but basically what we do is read a source and calculate how much of the state we should actually keep, so we can discard old data and ingest new data.
And if the input falls into one of the intermediate windows, we update that specific intermediate state.
Then we go over each of the combination rules we have, and emit the result rows for them.
This is just an example of the input and output for this function. So, for instance, if we have an order being made by the user of ID one on the first of January of 2020, and we’re calculating the number of orders per one, three and seven days, that one row of input produces three rows of output, here in this example. Those are the number of orders for each of the window sizes.
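A simplified sketch of what that two-step logic could look like with flatMapGroupsWithState. The types, the per-day step and the way the state is trimmed are assumptions for illustration, not iFood’s actual implementation:

```scala
import java.sql.Timestamp
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout, OutputMode}

// Illustrative types: input events, the per-day intermediate state (the step),
// and the feature update rows emitted for each window size.
case class OrderEvent(userId: String, orderTimestamp: Timestamp)
case class DailyCount(day: String, count: Long)
case class FeatureUpdate(userId: String, feature: String, value: Long)

def updateCounts(
    userId: String,
    events: Iterator[OrderEvent],
    state: GroupState[Seq[DailyCount]]): Iterator[FeatureUpdate] = {

  val days = events.map(_.orderTimestamp.toLocalDateTime.toLocalDate.toString).toSeq

  // Merge the new events into the per-day intermediate counts, keeping only the last
  // 7 days of state (simplified: assumes contiguous days with no gaps).
  val updated = (state.getOption.getOrElse(Seq.empty[DailyCount]) ++ days.map(DailyCount(_, 1L)))
    .groupBy(_.day).map { case (d, cs) => DailyCount(d, cs.map(_.count).sum) }
    .toSeq.sortBy(_.day).takeRight(7)
  state.update(updated)

  // Combine the intermediate states and emit one result row per window size.
  Seq(1, 3, 7).map { w =>
    FeatureUpdate(userId, s"orders_count_${w}d", updated.takeRight(w).map(_.count).sum)
  }.iterator
}

// Usage (assuming `events: Dataset[OrderEvent]` read from the streaming Delta table):
// events.groupByKey(_.userId)
//   .flatMapGroupsWithState(OutputMode.Update, GroupStateTimeout.NoTimeout)(updateCounts)
```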
So after we have the values calculated by the aggregation jobs, we move on to the materialization jobs, which are represented by this part of the picture. Data comes in from the Kafka bus, it gets read by the materialization jobs and is finally persisted to a final storage. And the materialization jobs work in the following manner. The results from the aggregation jobs are feature update commands. Think CDC or log tailing; that’s a really common concept, in which you express how an operation, how a transaction, was made. In the case of the feature store, we are usually expressing updates: some feature, for some entity, with some value, right?
So for instance, if we are reading some update commands for the average ticket price in the last 30 days for a given user, that’s expressed here in the first table, the topmost table, and we’re storing that to a Delta table, which would be transformed into the table below, the bottom table. And we use a trick for the Delta table storage, in which we use a single column for all the features.
We call that the feature map, and we combine the MERGE INTO operation of the Delta table with the map_concat function, so we’re able to be really flexible when storing the features, right? So for instance, in this scenario here we have the average ticket price in the last 30 days with the value of 25.8 for user one, and then another update command arrives for the user of ID two. It’s a different feature, the number of orders in the last 30 days, with the value of 17, and it gets a new row, because that row didn’t exist before. And then we receive yet another update command, the number of orders in the last three days, but that’s again for user one. So by using MERGE INTO and map_concat, we’re able to emulate an upsert operation on the Delta tables.
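A minimal sketch of that upsert, assuming the Delta Lake Scala API and an updates data frame with an entity_id column and a feature_map column holding the single updated feature (the names are illustrative):

```scala
import io.delta.tables.DeltaTable
import org.apache.spark.sql.{DataFrame, SparkSession}

def upsertFeatures(spark: SparkSession, updates: DataFrame, tablePath: String): Unit = {
  val target = DeltaTable.forPath(spark, tablePath)
  target.as("t")
    // If the target table is partitioned, also include the partition column in this
    // condition so the MERGE can prune partitions (see the lessons learned later on).
    .merge(updates.as("u"), "t.entity_id = u.entity_id")
    // Existing row: merge the new feature into the feature map column.
    .whenMatched()
    .updateExpr(Map("feature_map" -> "map_concat(t.feature_map, u.feature_map)"))
    // New entity: create the row with the incoming feature map.
    .whenNotMatched()
    .insertAll()
    .execute()
}
```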
Another really cool feature of this choice of architecture is that we’re really flexible about where we store the results of the feature store. So right now, for machine learning, we have a historical materialization job, which uses a Delta table to store the historical feature values. That’s what we went through in the last slide. But we also have a real-time materialization job that uses Redis as a storage for really low-latency, real-time access. So for instance, when the user is creating an order and you want to have an estimate of how long it will take, you must be able to get the features needed by this model really fast, and Redis gets us a really low access latency. But if, for instance, some user in the company wants to consume features from a different storage, say DynamoDB, ScyllaDB, Cassandra, okay, that’s fine. You just plug in a new materialization job reading from the same Kafka bus and then you’re good to go.
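A rough sketch of what such a real-time materialization job could look like, assuming a Jedis client and illustrative column names; it writes the latest value of each feature into one Redis hash per entity:

```scala
import org.apache.spark.sql.{DataFrame, Row}
import redis.clients.jedis.Jedis

def writeBatchToRedis(batch: DataFrame, batchId: Long): Unit = {
  val writePartition: Iterator[Row] => Unit = { rows =>
    val jedis = new Jedis("redis-host", 6379)  // assumed host and port
    rows.foreach { row =>
      // One Redis hash per entity, one field per feature name.
      jedis.hset(
        s"features:${row.getAs[String]("entity_id")}",
        row.getAs[String]("feature_name"),
        row.getAs[Any]("feature_value").toString)
    }
    jedis.close()
  }
  batch.select("entity_id", "feature_name", "feature_value").foreachPartition(writePartition)
}

// Usage (assuming `featureUpdates` is the stream of update commands read from Kafka):
// featureUpdates.writeStream.foreachBatch(writeBatchToRedis _).start()
```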
I’m gonna go over the backfilling jobs right now. So the backfilling jobs are responsible, like I said, for producing past values for the features. The parts of this picture that are pertinent to the backfilling jobs are these ones. So what we do, and I’m gonna go over it at a broader level here because they’re really complicated and we’re kind of short on time, is store the creation time of each feature in a metadata database; we use DynamoDB for that.
And having that in hand, we separate the aggregation jobs into two kinds: the normal aggregation jobs and the backfilling jobs. The backfilling jobs read the source Delta tables from the data lake.
They read over the whole history of events and calculate and emit result rows for the feature, up to the moment of the creation of the feature. The aggregation jobs, in turn, read the creation time of the feature from the metadata database, and they only emit results for values that arrive after the creation. So basically, up to the creation, the backfilling jobs are responsible for the values; after the creation, it’s up to the aggregation jobs.
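A simplified sketch of that split; the metadata lookup and the event_time column are hypothetical stand-ins for iFood’s actual setup:

```scala
import java.sql.Timestamp
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

val spark = SparkSession.builder.getOrCreate()

// Hypothetical lookup; in practice this would come from the DynamoDB metadata database.
def lookupCreationTime(feature: String): Timestamp =
  Timestamp.valueOf("2020-06-01 00:00:00")

val creationTime = lookupCreationTime("orders_count_30d")

// Backfilling job: a batch read over the whole event history, computing the feature
// values only up to the creation time.
val historicalEvents = spark.read.format("delta").load("s3://data-lake/raw/orders")
  .filter(col("event_time") <= creationTime)

// Streaming aggregation job: only emits results for events arriving after the creation.
val liveEvents = spark.readStream.format("delta").load("s3://data-lake/raw/orders")
  .filter(col("event_time") > creationTime)
```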
So that’s the architecture of the iFood feature store.
We’ve learned a few lessons and a few best practices along the way, and we’d like to share them. First of all, Delta tables can double as streaming or batch sources, and that’s really powerful; it gives you a lot of flexibility. Another interesting point is that optimize is a must for streaming jobs. Streaming jobs create really small files, so if you want to have efficient reads from the table, you must optimize the streaming Delta tables, either in auto mode or as a separate process. And this is kind of a corner case, but when you’re starting a brand new job from a streaming Delta table, the file reading order is not guaranteed, and that’s even more noticeable after you run an optimize, which you really should. If the order of the files is compromised, you’re basically introducing an artificial late arrival for your events, and that can really mess up your aggregation jobs if they’re not prepared to deal with that. So if the event processing order is important for your job, you should either use Trigger.Once to process the first historical batch as a whole, or you should create some separate process that reads each partition sequentially and guarantees the reading order.
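A small sketch of those two points, assuming Databricks (OPTIMIZE is a Databricks/Delta Lake command) and illustrative paths:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger

val spark = SparkSession.builder.getOrCreate()

// Compact the small files written by the streaming jobs, here run as a separate process.
spark.sql("OPTIMIZE delta.`s3://data-lake/raw/orders`")

// When starting a brand new job from a streaming Delta table, Trigger.Once processes
// the whole backlog as a single batch, so the file reading order doesn't matter.
spark.readStream.format("delta").load("s3://data-lake/raw/orders")
  .writeStream
  .format("delta")
  .option("checkpointLocation", "s3://data-lake/checkpoints/orders-agg")
  .trigger(Trigger.Once())
  .start("s3://feature-store/orders-agg")
```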
Yeah, yet another point. So flatMapGroupsWithState is really powerful,
but there is a caveat: you have to handle the state management with real care, because if you don’t, you can blow up the memory of your jobs. foreachBatch, which is a kind of sink for Spark streaming, is really powerful too. But one thing we’ve learned along the way is that it can be triggered on an empty data frame, so if you’re expecting a non-empty data frame, you should be careful with that.
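A minimal sketch of that guard, with assumed paths:

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

val spark = SparkSession.builder.getOrCreate()
val updates = spark.readStream.format("delta").load("s3://data-lake/feature-updates")

def writeBatch(batch: DataFrame, batchId: Long): Unit = {
  // foreachBatch can be invoked with an empty micro-batch, so guard before doing work.
  if (!batch.isEmpty) {
    batch.write.format("delta").mode("append").save("s3://feature-store/historical")
  }
}

updates.writeStream
  .option("checkpointLocation", "s3://data-lake/checkpoints/feature-materialization")
  .foreachBatch(writeBatch _)
  .start()
```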
Also, we are heavily dependent on the MERGE INTO Delta operation, but you have to be really careful that you’re actually getting partition pruning, because if you don’t, you’re gonna have a performance drop.
Another thing that’s important to be mindful of is which parameters you can and cannot change between streaming job restarts. And finally, the StreamTest class, which as far as I know is available only for Scala and Java, really helps with the unit tests for streaming jobs; it helps to debug the jobs and really raises the bar of the code.
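A hedged sketch of what a unit test built on StreamTest could look like (StreamTest and MemoryStream live in Spark’s sql test-jar; the suite name and columns here are illustrative):

```scala
import org.apache.spark.sql.execution.streaming.MemoryStream
import org.apache.spark.sql.streaming.{OutputMode, StreamTest}

class OrdersCountSuite extends StreamTest {
  import testImplicits._

  test("counts orders per user") {
    val input = MemoryStream[(String, Int)]
    val counts = input.toDF()
      .toDF("user_id", "ticket_price")
      .groupBy("user_id")
      .count()

    testStream(counts, OutputMode.Complete)(
      // Feed two orders for the same user and check the aggregated result.
      AddData(input, ("user-1", 10), ("user-1", 20)),
      CheckAnswer(("user-1", 2L))
    )
  }
}
```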
Finally, some positive outcomes. So like I said, by using this choice of architecture we have a unified codebase, both for historical and real-time features, and that’s basically 50% less code than what we had before, because we had separate code for each kind of feature. Also, since right now, because of the feature store, we have a single job that calculates both the historical and the real-time features, we could trim down the number of jobs from dozens to around 10.
The huge batch ETL jobs are substituted by much smaller streaming jobs, though they do run 24/7, all the time. And also, Delta tables allow for transaction isolation between the read and write operations, so that’s really helpful too.
And that’s it for the iFood feature store. Thank you guys so much. And if you have any questions, I’ll be glad to answer.
Daniel holds a master's degree in Computer Science from the Federal University of Minas Gerais. He co-founded Hekima, a data science consulting company, which has completed over 100 projects with large Brazilian companies in logistics, sentiment analysis, entity deduplication, candidate screening and schedule optimization, among others. He currently works at iFood, LATAM's largest foodtech company, as the technical leader of the ML Platform team.