Machine Learning feature engineering is one of the most critical workloads on Spark at Facebook and serves as a means of improving the quality of each of the prediction models we have in production. Over the last year, we’ve added several features in Spark core/SQL to add first class support for Feature Injection and Feature Reaping in Spark. Feature Injection is an important prerequisite to (offline) ML training where the base features are injected/aligned with new/experimental features, with the goal to improve model performance over time. From a query engine’s perspective, this can be thought of as a LEFT OUTER join between the base training table and the feature table which, if implemented naively, could get extremely expensive. As part of this work, we added native support for writing indexed/aligned tables in Spark, wherein IF the data in the base table and the injected feature can be aligned during writes, the join itself can be performed inexpensively.
Feature Reaping is a compute-efficient and low-latency solution for deleting historical data at sub-partition granularity (i.e., columns or selected map keys). To do it efficiently at our scale, we added a new physical encoding in ORC (called FlatMap) that allows us to selectively reap/delete specific map keys (features) without performing expensive decoding/encoding and decompression/compression. In this talk, we’ll take a deep dive into Spark’s optimizer, evaluation engine, data layouts and commit protocols and share how we’ve implemented these complementary techniques. To this end, we’ll discuss several Catalyst optimizations to automatically rewrite feature injection/reaping queries as SQL joins/transforms, describe new ORC physical encodings for storing feature maps, and discuss details of how Spark writes/commits indexed feature tables.
Speakers: Cheng Su and Sameer Agarwal
– So, hello everyone. My name is Sameer and along with Cheng, today we are going to talk about the journey of scaling machine learning with Spark at Facebook. More specifically, this talk is going to focus on feature engineering, which is the art of extracting and transforming features before they are used to train an ML model. The purpose of this exercise is to make the model more accurate and stable. So before we start the talk, just a very brief introduction about us. My name is Sameer Agarwal. I’m a software engineer at Facebook in the Data Platform team. I’m an Apache Spark committer, and most of my work has been focused on Spark Core and Spark SQL. Previously I spent many wonderful years at Databricks and at UC Berkeley working on Spark.
– I’m Cheng Su, a software engineer at Facebook, same as Sameer. I’m working on the Data Platform team. I’m an Apache Spark contributor, mostly focused on Spark SQL. Previously, I worked on the Hadoop and Hive teams at Facebook.
– So here is the agenda for this talk. We’ll first start off with a brief overview of a typical end-to-end machine learning workflow at Facebook. Then we are going to talk about the logical and physical data layouts that influence many of our architectural design decisions. After that, we are going to take a deep dive into two important aspects of feature engineering, namely feature reaping and feature injection. And finally, we are going to conclude by sharing some of the problems that we are working on in this space.
It probably shouldn’t come as a surprise that most Facebook products and services leverage machine learning. Our News Feed ranking algorithms help people see the stories that matter to them most. Our ads teams leverage machine learning to determine which ads to display to a user. And machine learning is really at the focal point of several of our search, language translation, and speech recognition algorithms.
This slide illustrates what a typical end-to-end machine learning flow looks like at Facebook. An ML engineer starts by first identifying the data source that the model will consume. This is typically spread across numerous Hive tables that are often filtered, aggregated, or joined with other tables. Once the data source has been identified, the next step is to add and extract relevant features from the storage. This is followed by an offline training phase to build the model. Once the model has been trained, there is a final inference phase to run the trained model in production, and this production model is then used for making a set of batch and real-time predictions. The responses to these predictions, for example likes, comments, and shares on a particular post, can then once again be logged into the raw data, which is often leveraged to continuously and iteratively improve the quality of our models in production.
So in this talk, we are going to focus on the two left boxes in this flow and specifically talk about, one, how this data is laid out logically and physically and, two, how we leverage these smart data layouts to efficiently add, delete, and extract relevant features from raw data.
So talking of data, the Facebook data warehouse currently has exabytes of data that are spread across millions of tables. Specifically, for the feature engineering use cases, we typically have a number of use-case-specific training tables that are used to store the training data. These tables are typically huge, and it’s not uncommon for them to span multiple petabytes of data. In terms of schema, we are looking at tens or hundreds of columns spanning user IDs, ad IDs, and more importantly a set of features that are currently used in training. In this particular example, let’s say that we are training on the age, the state, and the country of a user to display relevant ads to them. As far as the storage is concerned, we usually store these features as maps, with the feature ID as the key and the actual feature as the value. These feature values themselves can be simple data types, for example ints, doubles, or booleans, or they can even be complex types. For example, they can be maps of maps, or structs, or arrays.
Now in addition to the training tables, we also have a large number of feature tables that store all possible attributes or features that we could potentially train on. In terms of schema we are looking at something that’s very similar to the training table, but they’re much smaller. So in some sense, these feature tables can have tens of columns that contain these additional feature records. As an example, these additional feature records could be the gender of the user or the set of things that the user likes or dislikes, and all of this can potentially be used for training. So now we know what training and feature tables are.
Let’s talk about two common sets of operations that can be done on this data. First, feature injection is the process of extending base features with newer or experimental features to improve model performance. In this example, let’s say we have an ML engineer who decides to experiment with the impact of using the gender of the user for ad placement. In order to do this, we need to effectively join the gender table with the training table and update the feature map to include this gender information. So, one way to intuitively think about this operation is that you are adding new keys to this map in the training table.
Second, feature reaping is the complementary process of removing unnecessary features from the training data in order to achieve better training results or even just to save storage space. For example, an ML engineer may decide that using the state or the user’s location to train a model no longer adds value to the quality of the results. As a result, they may choose to delete or reap these features from the underlying table to save storage. Intuitively, you can think of this operation, once again, as a means of deleting existing keys from a map.
So now that we have talked about how this data is laid out in these training tables and feature tables, let’s briefly talk about how this data is physically encoded. At Facebook, we use a fork of Apache ORC as our internal data format, but many of you might also be using Parquet, which behaves very similarly. Now in order to better understand how we do these operations efficiently, it will be useful to have a quick recap of what an ORC file actually looks like. An ORC file consists of groups of rows, called row data, which are referred to as stripes, along with auxiliary information that is stored in the file footer. Within each stripe, we have several streams for each column, and each of these streams can have a different encoding and compression strategy.
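Setting the physical format aside for a moment, the two logical operations described above (injection as adding map keys, reaping as deleting map keys) can be sketched on plain Python dictionaries. This is a minimal illustration only: the rows, the `user_id` column name, and the `inject` and `reap` helpers are hypothetical and are not the Spark implementation from the talk.

```python
from typing import Any, Dict, Iterable, List

Row = Dict[str, Any]

# Hypothetical training rows: an ID plus a feature map (feature ID -> value).
training: List[Row] = [
    {"user_id": 1, "features": {"age": 25, "state": "CA", "country": "US"}},
    {"user_id": 2, "features": {"age": 40, "state": "NY", "country": "US"}},
    {"user_id": 3, "features": {"age": 31, "state": "ON", "country": "CA"}},
]

# Hypothetical feature table keyed by user ID; user 2 has no gender record.
gender_features: Dict[int, Dict[str, Any]] = {1: {"gender": "F"}, 3: {"gender": "M"}}


def inject(rows: List[Row], feature_table: Dict[int, Dict[str, Any]]) -> List[Row]:
    """LEFT OUTER join semantics: keep every training row, add keys where a match exists."""
    out = []
    for row in rows:
        extra = feature_table.get(row["user_id"], {})
        out.append({"user_id": row["user_id"], "features": {**row["features"], **extra}})
    return out


def reap(rows: List[Row], feature_ids: Iterable[str]) -> List[Row]:
    """Delete the given feature IDs from every row's feature map."""
    doomed = set(feature_ids)
    return [
        {
            "user_id": row["user_id"],
            "features": {k: v for k, v in row["features"].items() if k not in doomed},
        }
        for row in rows
    ]


injected = inject(training, gender_features)
reaped = reap(injected, ["state"])
```

Both helpers return new rows rather than mutating their input, which mirrors the idea of writing a new version of the data instead of editing it in place.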
For instance, integers can be run-length encoded, strings could be dictionary encoded, and so on. So now let’s see how a feature map, or a map in general, is actually stored in ORC. By default, the keys and the values in this map are stored in separate streams. For instance, let’s consider that we have three rows in this table. We have keys K1 and K2, and then we have values V1 through V5 that are spread across these rows. Now, in this case, each key stream and value stream is individually encoded and compressed. This means that when we are trying to read or delete specific keys from the table, we need to decompress and decode all the keys and values, which is quite inefficient.
To solve this problem, we added a new flattened map encoding to the ORC spec. In a flat map, all the values that correspond to each key are stored as a separate stream. Therefore, for the example that we discussed in the last slide, we now have two streams: one for K1, which has all its values, and the other for K2, which has all its corresponding values. For those who might be familiar with how structs are encoded, this is actually a very similar concept. Each struct field has its own stream, and in the same way each key in this map has its own stream. Since each value stream is now individually encoded and compressed, reading or deleting specific keys from this table becomes very, very efficient.
So let’s see how this makes the whole concept of feature reaping efficient. Just to recap, feature reaping was this process of removing unnecessary features from the training data. At Facebook, we have a number of high-level feature reaping frameworks that generate the SQL queries, which specify the tables, partitions, and feature IDs involved in the process.
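To make the two map layouts described above concrete, here is a small sketch that lays out the same three rows both ways. It is an assumption-laden toy: the assignment of V1 through V5 to particular rows is invented for illustration, streams are plain Python lists, and `None` stands in for the notion of a key being absent in a row. It does not reproduce the actual ORC or FlatMap byte format.

```python
from typing import Any, Dict, List, Optional

# Three rows with keys k1/k2 and values v1..v5 (row placement is assumed).
rows: List[Dict[str, str]] = [
    {"k1": "v1", "k2": "v2"},
    {"k1": "v3"},
    {"k1": "v4", "k2": "v5"},
]


def encode_default(rows: List[Dict[str, str]]) -> Dict[str, List[Any]]:
    """Default layout: one stream of per-row lengths, one of all keys, one of all values."""
    lengths, keys, values = [], [], []
    for row in rows:
        lengths.append(len(row))
        for k, v in row.items():
            keys.append(k)
            values.append(v)
    return {"lengths": lengths, "keys": keys, "values": values}


def encode_flat(rows: List[Dict[str, str]]) -> Dict[str, List[Optional[str]]]:
    """Flat layout: one value stream per key; None marks rows where the key is absent."""
    all_keys = sorted({k for row in rows for k in row})
    return {k: [row.get(k) for row in rows] for k in all_keys}


def read_key_default(encoded: Dict[str, List[Any]], key: str) -> List[Optional[str]]:
    """Reading one key has to walk the entire key and value streams."""
    out: List[Optional[str]] = []
    pos = 0
    for n in encoded["lengths"]:
        found = None
        for i in range(pos, pos + n):
            if encoded["keys"][i] == key:
                found = encoded["values"][i]
        out.append(found)
        pos += n
    return out


def read_key_flat(encoded: Dict[str, List[Optional[str]]], key: str) -> List[Optional[str]]:
    """Reading one key touches only that key's stream."""
    return encoded[key]


default_layout = encode_default(rows)
flat_layout = encode_flat(rows)
```

In the default layout, `read_key_default` scans every key and value to answer a question about a single key, while `read_key_flat` only touches one stream. That difference is what the flat encoding exploits once each stream is compressed on its own.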
For each reaping query, the Spark executor launches a SQL transform process that uses a native C++ binary to do these efficient flat map operations. This is the data path. On the control path, the Spark executor is responsible for atomically committing the data to the partition. This allows us to avoid unnecessarily decoding and encoding columns or map keys that aren’t being touched, which makes it extremely efficient.
So talking about performance, there are two cases that we evaluated. For case one, we took a training table of 20 petabytes in size that consisted of a thousand features and reaped 200 features out of it. The solution was 14 times more efficient than the baseline of reading and rewriting all the data. For case two, we took a training table that was much larger. This was 300 petabytes in size and consisted of 10,000 features, or an order of magnitude more features, and we still reaped 200 features out of it. The solution was 89 times better than the current baseline. The intuition, once again, is that the complexity of reaping here depends on the number of features that are being reaped and not the total amount of data in this table. With this, I’ll hand it over to Cheng, who is next going to talk about feature injection.
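The reason reaping avoids touching the surviving features can be sketched with individually compressed per-key streams. This is a toy model under stated assumptions: a "stripe" is a Python dict from feature ID to a `zlib`-compressed JSON blob, and `write_flat_stripe`, `reap`, and `read_feature` are hypothetical helpers. The production path in the talk uses a native C++ binary over ORC files, which this does not attempt to reproduce.

```python
import json
import zlib
from typing import Any, Dict, Iterable, List

# Hypothetical rows of a training table's feature map column.
rows: List[Dict[str, Any]] = [
    {"age": 25, "state": "CA", "country": "US"},
    {"age": 40, "state": "NY", "country": "US"},
    {"age": 31, "state": "ON", "country": "CA"},
]


def write_flat_stripe(rows: List[Dict[str, Any]]) -> Dict[str, bytes]:
    """Encode and compress each feature's value stream independently."""
    keys = sorted({k for row in rows for k in row})
    return {
        k: zlib.compress(json.dumps([row.get(k) for row in rows]).encode("utf-8"))
        for k in keys
    }


def reap(stripe: Dict[str, bytes], feature_ids: Iterable[str]) -> Dict[str, bytes]:
    """Drop whole streams; surviving streams are carried over as opaque bytes."""
    doomed = set(feature_ids)
    return {k: blob for k, blob in stripe.items() if k not in doomed}


def read_feature(stripe: Dict[str, bytes], key: str) -> List[Any]:
    """Decompress and decode only the requested feature's stream."""
    return json.loads(zlib.decompress(stripe[key]).decode("utf-8"))


stripe = write_flat_stripe(rows)
reaped = reap(stripe, ["state"])
```

Note that `reap` never calls `zlib.decompress`: the streams that are kept are passed through without being decoded, which is the property the talk credits for the efficiency gains.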
– Feature injection is another important operation for feature engineering, besides the feature reaping we just described. In previous slides, we discussed feature reaping as an efficient solution to delete features. And here we discuss feature injection as an efficient solution to add features.
Okay, let’s go back to our previous slide on data layouts. On the left, we have a training data table which contains all the features we use in production, such as age, state, country, et cetera. The table is pretty big, and it can contain thousands or even tens of thousands of features per row. On the other hand, we have multiple feature tables on the right, which collect all different kinds of features every day. Most of them are not included in the training table. For example, a machine learning engineer wants to run a model training experiment to figure out whether gender in the feature table is a good feature to be used in production. One way is to join the training table and the gender feature together and then use the join output for training. However, this is not acceptable, as the training table has a very large volume, and it takes too much compute and storage resources to do this for every feature. To make this work in production, we have basically two requirements for the solution. The first one is to allow machine learning training experimentation to be as fast as possible, and the second is to save as much storage space as possible.
So here we are introducing our solution: aligned tables. Okay, what is an aligned table and how can it be efficient? Intuitively, we are storing the output of the join between the training table and the feature table in a separate, row-by-row aligned table. An aligned table is a table that has the same layout as the original training table, in the sense that it has the same number of files, the same file names, the same number of rows, and the same row order in each file. Here’s a simple example. Suppose we have a training table on the left side, and it has two files:
file_1.orc and file_2.orc. Each file stores, for example, the user ID and the corresponding production features. The features can be stored in [indistinct], which is pretty large. On the bottom, we have a feature table and it has one file, for example, file_1.orc. It stores the features we want to experiment with, F1, F2, F4, et cetera, for each row. We join the training table ID and the feature table ID together, and this generates the aligned table on the right side, which, as you can see, has the same number of files as the training table and the same row order in each file. For example, the first file, file_1.orc, has the features for IDs 1, 2, 5, et cetera.
So why is this solution better than joining the training table and the feature table together? Note that we only read the ID column from the training table for the purpose of the join, which is pretty fast. We do not read or write the production features column from the training table at all. Avoiding reads and writes of the production features column is where the major compute and storage savings come from.
And here is the secret sauce for how we generate aligned tables. We added dedicated support in the Spark planner, and an example Spark physical plan is on the right side. Note that some of the query plan nodes, such as a shuffle before the join, are skipped here for simplicity, and the light green query nodes are added specially for aligned tables. Let’s go over that. After the scan of the training table, a Project operator is added to annotate each row with its file name and row order. Then, after the join, an extra shuffle is added to shuffle on file name. This is to make sure all the rows with the same file name go to one shuffle partition. And after the shuffle, a local sort on the file name and row order is added. When writing the table, the rows are therefore sorted in their original row order, and all the rows of the same file are written together. We have talked about how to write an aligned table. Here is how we read an aligned table efficiently.
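Before turning to the read path, the write steps just described (annotate with file name and row order, join on ID, shuffle on file name, local sort on row order) can be sketched in plain Python, together with a minimal row-by-row merge of the kind the talk describes next. Everything here is illustrative: files are modeled as in-memory lists, the table contents are invented, and `write_aligned` and `merge_read` are hypothetical helpers rather than Spark operators.

```python
from collections import defaultdict
from typing import Any, Dict, Iterator, List

Row = Dict[str, Any]

# Hypothetical training table: file name -> rows in on-disk order.
training_files: Dict[str, List[Row]] = {
    "file_1.orc": [
        {"id": 1, "features": {"age": 25}},
        {"id": 2, "features": {"age": 40}},
        {"id": 5, "features": {"age": 31}},
    ],
    "file_2.orc": [
        {"id": 3, "features": {"age": 52}},
        {"id": 4, "features": {"age": 19}},
    ],
}

# Hypothetical feature table keyed by ID; IDs 2 and 3 have no new feature.
feature_table: Dict[int, Dict[str, float]] = {1: {"f1": 0.1}, 5: {"f5": 0.5}, 4: {"f4": 0.4}}


def write_aligned(files: Dict[str, List[Row]], features: Dict[int, Dict[str, float]]):
    # 1. Scan only the ID column, annotating each row with (file name, row order).
    annotated = [
        (name, order, row["id"])
        for name, rows in files.items()
        for order, row in enumerate(rows)
    ]
    # 2. LEFT OUTER join on ID; rows without a match get an empty feature map.
    joined = [(name, order, features.get(id_, {})) for name, order, id_ in annotated]
    joined.reverse()  # a join gives no ordering guarantee; simulate that here
    # 3. "Shuffle" on file name so all rows of one file land in one partition.
    partitions = defaultdict(list)
    for name, order, feats in joined:
        partitions[name].append((order, feats))
    # 4. Local sort on row order, then write one aligned file per training file.
    return {
        name: [feats for _, feats in sorted(part, key=lambda p: p[0])]
        for name, part in partitions.items()
    }


def merge_read(files: Dict[str, List[Row]], aligned: Dict[str, List[Dict[str, float]]]) -> Iterator[Row]:
    """Pair each training file with its aligned file and merge row by row."""
    for name, rows in files.items():
        for row, extra in zip(rows, aligned[name]):
            yield {"id": row["id"], "features": {**row["features"], **extra}}


aligned_files = write_aligned(training_files, feature_table)
merged = list(merge_read(training_files, aligned_files))
```

Because the aligned files preserve file names and row order, `merge_read` needs no join key lookup at read time: position alone identifies the matching row.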
At the ORC file level, an aligned table row-by-row merge reader is implemented. The reader reads each aligned table file together with the corresponding original training table file in one task. Each task reads row by row according to the row order in the file, and for each row, the corresponding columns are merged together. As an example, here on the left side, reader task one reads file one from the training table and the aligned table, and for each row, for example the rows with IDs 1, 2, and 5, the new features F1, F2 are merged into the training table’s production features column.
Here, we give a performance comparison between our aligned table solution and two other baseline solutions. The first baseline solution is what we described before, which is a left outer join between the training table and the feature tables, where the output is a materialized training table with the new features. The cons of this solution are pretty obvious. For every training experiment, we need to read and overwrite all the columns of the training table. We compared the aligned table solution with the left outer join solution and observed quite huge wins. The compute savings are 15X and the storage savings are 30X.
The second baseline solution we compare with is called lookup hash join. It works by loading the feature tables into a distributed hash table. Here we use Laser, which is an internal Facebook distributed hash table service built on top of RocksDB. There is a paper link at the bottom where you can learn about Laser. When reading each row of the training table, a lookup hash join is performed, for example on the join key ID column, to merge the new features from the distributed hash map and the production features together. The con of this solution is that it needs an extra dependency in the system, which is the distributed hash table service. It needs several steps, such as loading the hash table and joining against the hash table.
This impacts latency, reliability, and efficiency from time to time. In addition, the lookup hash join itself is not free. It is costly, as we need to do the join every time we read the training table. We compared the aligned table solution with the lookup hash join solution and observed 1.5X compute savings and 2X storage savings.
To recap, we talked about four major points in this session. First, we gave an overview of the machine learning workflow at Facebook. Then we talked about the smart data layouts and encodings we built for feature engineering at Facebook. Then we covered the feature reaping solution to efficiently remove features and the feature injection solution to efficiently add features.
For future work in this area, there are two directions we are working on. The first one is to provide a better Spark SQL interface for machine learning primitives. To recall, for feature reaping our current SQL interface is SQL transform, and for feature injection our current SQL interface is a specialized join and insert into an aligned table. But we can provide a better interface for our ML engineers, such as an UPSERT primitive. Here is an example that merges certain features into a training table, which is much cleaner and more intuitive. The second direction is that we are onboarding more and more ML use cases to Spark. Currently we are focusing on feature engineering, which is data preparation before ML training and inference, and we are working to onboard batch inference and training use cases to our Spark platform. That’s all, and this concludes our talk. Feel free to ask any questions; your feedback is very important to us. Thank you.
Cheng Su is a software engineer at Facebook. He works on the Spark team in the Data Infrastructure organization at Facebook. Before Spark, he worked on the Hive/Corona (Facebook in-house Hadoop MapReduce) team. Cheng received a Master's Degree specializing in computer systems from the University of Wisconsin-Madison, USA, and a Bachelor's Degree from Nanjing University, China.
Sameer Agarwal is a Spark Committer and Tech Lead in the Data Platform team at Facebook where he works on building distributed systems and databases that scale across geo-distributed clusters of tens of thousands of machines. Before Facebook, Sameer led the open-source Apache Spark team at Databricks. He received his PhD in Databases from UC Berkeley AMPLab where he worked on BlinkDB, an approximate query engine for Spark.