Real-time Feature Engineering with Apache Spark Streaming and Hof

Feature Stores for machine learning (ML) are a new class of data platform for the organization, governance, and sharing of features within enterprises. A typical feature store is a dual database architecture, where pre-computed features for training are stored in a scalable SQL platform (Delta Lake, Apache Hudi, Apache Hive), while features served to online applications are stored in a low-latency database or key-value store (MySQL Cluster (NDB), Cassandra, or Redis). Feature Stores, however, do not provide a solution for real-time features (such as user-entered data or machine-generated data) that cannot be pre-computed or cached. If the feature engineering code that transforms the raw data into features is embedded in applications, it may need to be duplicated outside the application in pipelines for generating training data.

In this talk, we introduce Hof (Hopsworks real-time feature engineering), which provides transformation of raw data to features at low latency and scale using Apache Spark Streaming, Pandas UDFs and PyArrow. Applications use Hof by sending raw data to an HTTP or gRPC endpoint and receiving the engineered features, before sending the full feature vector to the model for prediction. Hof enables the real-time feature engineering pipeline to be reused across both real-time and offline use cases (when creating training data for the same features). Hof can also enrich real-time features and build complete feature vectors by joining real-time features with features from the online feature store. We will show how the core feature store principles can be extended to real-time feature engineering: code tracking, feature pipeline reuse, ensuring the consistency of features between training and serving, and automated metadata and statistics for features. Finally, we will show how the Hof architecture enables real-time features to be debugged, audited and saved for re-use in training models.

Speaker: Fabio Buso


– Okay, welcome. I’m Fabio, I work for Logical Clocks, and in this presentation we’re going to discuss real-time feature engineering with Apache Spark and Hof. We’re going to discuss real-time feature engineering in the context of feature stores. Feature stores are a hot topic nowadays in the MLOps space. We have given several previous presentations about the feature store, but to summarize, the feature store essentially serves as a repository for curated machine learning features, which are ready to be used by data scientists, both for training models and for serving them in production when the model is being served. There are a lot of reasons why you want to have a feature store in an enterprise. The main reason, I would say, is that you want to ensure consistency between the data that you use for training and the data that you use for serving, and to enrich inference vectors before sending them to the model for predictions. There are other reasons why you want to use one: for governance, for instance; to speed up the development process when it comes to generating and training new models; but also as a centralized place for storing metadata, storing statistics, tagging, and building up a catalog of features, and so on.
As I was mentioning, there are previous talks that you can look at for more details. But essentially the concept of a feature store builds on a dual database setup. On one hand you have a large data warehouse-like database; it can be Apache Hudi, it can be Delta Lake. That is where you store the bulk of your feature data, which you can use for training new models. On the other hand you have a low-latency real-time database, which in the case of Hopsworks is MySQL NDB Cluster. You can also use Cassandra or Redis, essentially to serve the same features live in production.
There is still one use case that we have encountered several times when working with customers, and that is essentially the real-time feature engineering problem. The previous setup, the dual database setup, requires data to be known in advance, so that it can be featurized and cached in the two databases. In this specific case, the data is not known in advance; it is actually sent by the client at the moment the client needs to make the inference request. So it can’t be pre-computed, it cannot be stored, and it needs to be processed in real time. The reason you want to do this processing is that you have to make sure that the data sent by the client is actually consistent with what the model expects. For instance, you might want to do one-hot encoding of some features, you might want to scale numerical features down so that you achieve numerical stability, or you might want to do windowed aggregates over, for instance, the number of transactions that a user has done in the past X minutes or the past hour. Additionally, the client does not necessarily know the whole feature vector that needs to be sent to the model; other pipelines might be filling in the rest of the vector. So the data coming from the client needs to be joined with data coming from the online feature store.
There are a few more requirements that make the problem more interesting. One is that, from an architectural point of view, you want to hide all the complexity from the clients. Clients may be deployed in different settings, so the simpler they are, the more maintainable they are. Most of these use cases, in our experience, are user facing, so you need to have strict SLAs in terms of latency for the feature engineering computation, so that these features can be sent to the model and the prediction eventually sent back to the client.
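As a rough illustration of two of the transformations mentioned here, a windowed count of recent transactions and numerical scaling, the following is a minimal sketch in plain Python. The class and function names are hypothetical, it assumes events arrive in timestamp order, and it is not how Hof or Spark implement windows; it only shows why such features need state that spans requests.

```python
from collections import deque


class SlidingWindowCounter:
    """Counts events per key inside a trailing time window (hypothetical helper)."""

    def __init__(self, window_seconds):
        self.window_seconds = window_seconds
        self.events = {}  # key -> deque of event timestamps, oldest first

    def add(self, key, timestamp):
        """Record one event and return how many events fall inside the window."""
        timestamps = self.events.setdefault(key, deque())
        timestamps.append(timestamp)
        # Evict timestamps that have fallen out of the trailing window.
        while timestamps and timestamps[0] <= timestamp - self.window_seconds:
            timestamps.popleft()
        return len(timestamps)


def min_max_scale(value, lower, upper):
    """Scale a numeric feature into [0, 1] given bounds seen at training time."""
    if upper == lower:
        return 0.0
    return (value - lower) / (upper - lower)


# Transactions for one card at t = 0s, 600s, 1200s and 4000s, one-hour window.
counter = SlidingWindowCounter(window_seconds=3600)
counts = [counter.add("card-1", t) for t in (0, 600, 1200, 4000)]
scaled_amount = min_max_scale(250.0, lower=0.0, upper=1000.0)
```

The counter has to remember earlier requests for the same key, which is exactly what a per-request pre-processing function cannot do.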
Additionally, still on the topic of hiding complexity, you want to avoid doing any feature engineering in the client itself, for the same reason: you need to be able to ensure consistency between what the client sends to the model and what the model expects. If you have hundreds of clients out there making requests, and they all need to be consistent with the model that you’re currently serving, that becomes an impossible task.
There are different ways in which you can achieve real-time feature engineering already. The first one we’re looking at is the Transform module of the TensorFlow package. Essentially, when you build a model in TensorFlow, you can package it and define what’s called a signature function, which describes what the model expects as input. But you can also define a pre-processing function: when the vector comes in to the model for prediction, it goes through this pre-processing function, and that can be a place where you do some simple feature engineering. There are some drawbacks to this approach. One is that the code is packaged together with the model, so you don’t really have any visibility into what the model is actually doing in terms of pre-processing. You also can’t reuse the same pre-processing function across different models or different versions of the same model, so it becomes tricky to reuse these functions and keep track of them. The other two issues are these. The pre-processing function is executed on every single vector coming in, so there is no concept of state and no concept of windows, and it’s basically impossible to do aggregations over multiple incoming requests. It is also a single-threaded operation, which means that it doesn’t really scale with lots of features. Another solution could be to use a KFServing Transformer.
KFServing is a set of tools that allows you to deploy models on top of Kubernetes. It’s part of the Kubeflow family, and one of its components is the transformer. The idea is that you implement the Python interface you can see on the right side, and you can run Python code to actually do the transformation. Again, there are some downsides here. The main one is that you are rewriting the pipeline that transforms the data, that does the feature engineering. Essentially you are going back to before having a feature store, where you have different code for transforming features for training and for transforming features for serving. It also doesn’t scale with the number of operations or the number of features. Again, the inputs are processed one by one, so there is no concept of aggregations, windows, and so on. It doesn’t come out of the box with support for fetching features from the online feature store and joining them together. And at the same time, the data gets pre-processed, gets sent to the model, and then gets thrown away, so there is no easy way to save the pre-processed data to be able to use it later.
The third approach, which is what we have actually been building, is called Hof. With Hof we try to solve all the issues mentioned before. Essentially, Hof is a backend service that runs and accepts requests from clients. The pre-processing is not done in Hof itself; it’s done in a Spark streaming application, so you can write your own transformations and use the same code both for generating training data and for real-time serving. You can write pretty much plain PySpark, or you can write Java code. Hof comes with first-class support for joining real-time feature engineering data with data coming from the online feature store.
It’s also a pluggable architecture, in the sense that, as we will see later, it allows you to extend the architecture to save data and so on. In detail, Hof is basically a gRPC/HTTP endpoint to which clients submit their requests. The service itself is mostly stateless; the processing, as we were saying, is not done by Hof itself but is offloaded to a Spark application. The stateful part is only there to manage the routing of messages on the message queue, as we will see later. You can have as many Hof instances as you want running in parallel. We run them as Docker containers on Kubernetes, so you can scale them up and down depending on the load. What sits behind Hof is essentially a message queue. In Hopsworks we use Kafka, but it can be extended to support managed services like the ones provided by Amazon. The message is sent to a queue; on the other side there is a Spark streaming application reading from it. It does the feature engineering, then the messages are sent back on another queue, and Hof picks them up and sends them back to the user.
Looking in more detail at how the messages are sent around, in the diagram on the right side the user sends a message to Hof. For each transformation that we have registered in our feature store there is one Kafka topic, which acts as the input topic for the Spark application; in the example it is the middle one. Then there is one topic for each running Hof instance. Hof itself uses the Hopsworks REST API to manage the life cycle of the topics, so it adds and removes them when instances are added to or removed from the pool. When a running Hof instance receives a request from the user, it figures out which transformation the user wants and sends the message to the correct Kafka topic, and when it does that it encodes the instance itself in the key of the message.
So on the other side, the Spark application knows which instance to send the result back to. One point is that, at this stage, Hof does not enforce any schema on the incoming data, and that allows for two things. First, it saves us from doing one additional deserialization, which reduces latency. Second, it allows us to use the same Hof instances across multiple transformations, so we can scale the number of Hof instances independently of the number of transformations that you are currently running on the feature store.
On the other side of the topics, as I was mentioning, there is a Spark streaming application running. Here it really depends on what you’re trying to do. If you are doing self-contained transformations, as in the example before where you process messages one by one, you can have multiple Spark applications running at the same time, pulling data from the same Kafka topic, so that you achieve high availability and improve throughput. If you are doing more complicated things, like windowed processing, then you need state and you need to maintain that state. But at that stage you can still split the transformations over multiple applications and have them consume from the same topic. The only difference is how you set up the consumer group. Finally, once the transformation has been applied, the Spark application knows which Hof instance generated the message, because it’s encoded in the key, and it sends the result back to the correct Kafka topic. The Hof instance picks it up and sends it back to the user. We have a demo later on, where we can see exactly how our API helps out with this message routing.
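The routing scheme described here, one shared input topic plus one reply topic per instance with the instance encoded in the message key, can be sketched with in-memory queues. This is only a toy model: the topic names, function names, and the use of Python deques in place of Kafka are all assumptions made for illustration.

```python
from collections import defaultdict, deque

INPUT_TOPIC = "transformation-input"  # hypothetical topic name
topics = defaultdict(deque)           # topic name -> queued (key, value) messages


def hof_send(instance_id, payload):
    """A Hof-like instance publishes raw data, keyed by its own instance id."""
    topics[INPUT_TOPIC].append((instance_id, payload))


def streaming_app_step(transform):
    """The streaming side drains the input topic and replies per instance."""
    while topics[INPUT_TOPIC]:
        key, payload = topics[INPUT_TOPIC].popleft()
        # The key says which instance is waiting, so reply on that instance's topic.
        topics["reply-" + key].append((key, transform(payload)))


def hof_receive(instance_id):
    """An instance reads only from its own reply topic."""
    return [value for _, value in topics["reply-" + instance_id]]


hof_send("hof-0", {"amount": 10.0})
hof_send("hof-1", {"amount": 99.0})
streaming_app_step(lambda row: {**row, "amount_is_large": row["amount"] > 50.0})
```

Because the payload is passed through untouched until the transformation runs, the sending side never needs to know its schema, which matches the point about not enforcing a schema on incoming data.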
As I was mentioning at the beginning, the Hof architecture is pluggable: it can be expanded and extended with add-ons. We support some add-ons out of the box, and some can be written by the users themselves. One of the things you can achieve is saving the incoming data, so the raw feature data coming in for transformation, to an S3 bucket or to a Hopsworks dataset. The reason you might want to do that is that in the future you might decide to do additional transformations on that data, and it can serve as a source of data for them. You might also want to save the data after transformation. One reason to do that is auditing, so that at any given time you can go back and see exactly what your model was predicting on. But you can also save it for generating new training datasets in the future. What you can do is save the incoming featurized data and leave the label empty, to be filled in later on down the road when the true outcome actually becomes known. In Hopsworks you can save this data in a cached feature group backed by a Hudi table, which allows you later, in a month’s time or whatever, to go back, do an upsert on the table, and fill in all the labels that you have discovered. That becomes a nice source of labeled data that you can use for training new models.
Finally, this is also a good place for looking for data drift. The Spark application has access to the training dataset statistics, so at any given time it can compute whether the distribution of the incoming data is somehow drifting away from the distribution on which the model was trained. At that point you can decide whether to trigger an alert or to trigger training of a new model.
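Two of the ideas in this part of the talk, saving featurized rows with an empty label that is filled in later, and comparing incoming data against training statistics, can be sketched in a few lines of plain Python. The in-memory dictionary stands in for the feature group, and the mean-shift check is just one simple way to flag drift; both are assumptions for illustration, not the platform's actual mechanism.

```python
from statistics import mean

# Hypothetical in-memory stand-in for a feature group keyed by request id.
feature_rows = {}


def save_features(request_id, features):
    """Store the featurized row with an empty label, to be filled in later."""
    feature_rows[request_id] = {**features, "label": None}


def upsert_labels(labels):
    """Fill in labels once the true outcome becomes known."""
    for request_id, label in labels.items():
        if request_id in feature_rows:
            feature_rows[request_id]["label"] = label


def labeled_rows():
    """Rows that are complete enough to be used as training data."""
    return [row for row in feature_rows.values() if row["label"] is not None]


def mean_has_drifted(values, train_mean, train_std, threshold=3.0):
    """Flag a batch whose mean is more than `threshold` training std devs away."""
    if train_std == 0:
        return mean(values) != train_mean
    return abs(mean(values) - train_mean) / train_std > threshold


save_features("req-1", {"amount": 12.0})
save_features("req-2", {"amount": 950.0})
upsert_labels({"req-2": 1})  # the outcome for req-2 is discovered later
drifted = mean_has_drifted([900.0, 950.0, 1000.0], train_mean=50.0, train_std=20.0)
```

A flagged batch could then feed whatever action is preferred, such as raising an alert or scheduling retraining.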
This is what the API looks like from a client perspective. As we said before, there is the possibility of joining online and streaming data together, and we will see the API for that later. This one has only the streaming section. You specify the transformation that you want to apply. Again, Hof is independent of the transformations that you have registered on the feature store, so you have to specify which one you want to apply, so that Hof is able to discover the specific Kafka topic to which to send the data. And then you have the payload itself, which is just JSON. It’s free form, and the Spark application on your side will ensure that the data actually matches the expected input.
From an application perspective, we have two parts. One is the feature group definition. Here, in the top part, we are defining a function and registering it with the feature store, so that later on we are able to retrieve it and execute it. There are parameters you can pass when creating the streaming feature group; these are mainly advanced configuration for the feature group itself. On the other side, we have the processing itself. When you are actually running the code, you get the feature group from the feature store and call apply on it. That triggers and sets up the job and makes sure that the messages are routed correctly on the different topics, and so on. So that hides away all the complexity of setting up the routing. As I was mentioning, not all the data needs to come from the client itself; in most cases it does not.
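For readers without the slide, a streaming-only request of the kind described here might look roughly like the following. The field names (`streaming`, `transformation`, `data`), the transformation name, and the example values are all hypothetical; the talk only establishes that the client names a transformation and sends a free-form JSON payload.

```python
import json


def build_streaming_request(transformation, data):
    """Build a streaming-only request body. All field names are illustrative."""
    if not transformation:
        # The transformation name is what lets the service pick the right topic.
        raise ValueError("a transformation name is required")
    return {"streaming": {"transformation": transformation, "data": data}}


# Raw values the client knows at request time (made-up example values).
raw = {"category": "travel", "cc_num": "1234", "amount": 42.5}
body = json.dumps(build_streaming_request("card_transactions", raw))
```

The `data` object is passed through as-is, so validating it against the expected input is left to the transformation on the other side.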
In most cases the client needs additional data from the online feature store, and the way we do it with Hof is that Hof is responsible for fetching the data from the online feature store on behalf of the user. If you look at the diagram on the right, you can see that we have an additional Spark application reading data from an S3 bucket and writing to MySQL NDB Cluster, which is the online feature store for the Hopsworks platform. If you look at the request that the client now sends, when you want to join features coming from both streaming and online, the client also provides an online section, which allows the user to specify which training dataset to use as a base for the features. In Hopsworks, when you create a training dataset, we save all the features, which feature groups they come from, and in which order they are present in the feature vector, so that we don’t break the model. Here the user specifies that they want data from model version one. You can specify a filter, which lets you select, in this case by customer, the primary key you want the data for. You can also specify a specific model. If you have trained a model in Hopsworks, with provenance we can figure out automatically and implicitly which training dataset was used to train which model. So you can also specify a specific model that you want to use as the online source of data.
In this demo we are going to see how the API presented before can be applied to a specific use case. The use case here is trying to predict fraud in credit card transactions. We have a bunch of Jupyter notebooks that we can use to see the APIs in action.
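The join described here, combining features computed from the request with features looked up by primary key and emitting them in the order recorded for the training dataset, can be sketched as follows. The feature names, the in-memory store, and the function name are hypothetical stand-ins chosen for illustration.

```python
# Hypothetical stand-ins: the feature order recorded with a training dataset,
# and an online feature store keyed by primary key.
TRAINING_DATASET_ORDER = ["amount_scaled", "category_travel", "avg_spend_30d", "num_cards"]
ONLINE_STORE = {"customer-7": {"avg_spend_30d": 120.5, "num_cards": 2}}


def build_feature_vector(streaming_features, primary_key):
    """Merge real-time and online features, ordered as the model was trained."""
    online_features = ONLINE_STORE.get(primary_key)
    if online_features is None:
        raise KeyError(f"no online features for {primary_key!r}")
    merged = {**online_features, **streaming_features}
    missing = [name for name in TRAINING_DATASET_ORDER if name not in merged]
    if missing:
        raise ValueError(f"missing features: {missing}")
    # Emit values in the recorded order so the model sees the layout it expects.
    return [merged[name] for name in TRAINING_DATASET_ORDER]


vector = build_feature_vector({"amount_scaled": 0.25, "category_travel": 1}, "customer-7")
```

Keeping the order in one place, next to the training dataset, is what prevents a client from silently sending features in a layout the model was not trained on.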
We can start from the last one. This notebook represents the client itself, so this is what the client is going to be sending to the backend service. In particular, this is the section with the data itself. This is what the client knows: the category, the date, the credit card, and the amount that has been spent. This is what is going to be sent to the actual Hof endpoint for transformation. The transformation itself is provided in a separate notebook. Here, for the sake of the demo, we are using a Jupyter notebook, but on the actual platform you can provide transformations as a JAR file or a Python file, and transformations can be pushed from an external CI/CD pipeline, for instance. So you can keep your transformation code committed and versioned.
The notebook itself is fairly simple. At the top we are doing a bunch of imports and establishing a connection with the Hopsworks feature store, and this is the transformation, one transformation for the feature engineering step. Here we are one-hot encoding the category feature; the categories are things like personal care, travel, and tech. The result is going to be output and returned to the client for inference. Since there can be multiple of these steps, we wrap them into this transform function. The transform function expects a DataFrame and a batch ID. This is what is going to be called by the HSFS API in the actual streaming application, for every batch that comes in to Spark streaming. Finally, we register this code as a streaming feature group: we give it a name, give it a version, and save it. This save does a bunch of things behind the scenes. One is that it saves the notebook, at a specific version, in the Hopsworks platform itself, and this allows for two things. The first is that you’re always able to see what transformation code is running.
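The notebook code is not reproduced in the transcript, so the following is only a sketch of the shape of such a transformation: a function called once per incoming batch that one-hot encodes the category field. It uses a list of dictionaries as a stand-in for a Spark DataFrame, and the category values, field names, and function signature are assumptions rather than the demo's actual code.

```python
CATEGORIES = ["personal_care", "travel", "tech"]  # illustrative category values


def one_hot_category(row):
    """Replace the 'category' field with one 0/1 column per known category."""
    encoded = {key: value for key, value in row.items() if key != "category"}
    for category in CATEGORIES:
        encoded["category_" + category] = 1 if row.get("category") == category else 0
    return encoded


def transform(batch, batch_id):
    """Called once per micro-batch; `batch` stands in for a DataFrame."""
    return [one_hot_category(row) for row in batch]


result = transform([{"cc_num": "1234", "amount": 42.5, "category": "travel"}], batch_id=0)
```

Because the function depends only on its input batch, the same code can be run over historical data to produce training features and over live requests at serving time.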
The second is that we are able to retrieve this notebook; the notebook can be converted into a Python file, and the Python file is then attached to the application doing the streaming transformation. The second thing the save does is set up the topics. Actually, just one single topic: the topic that Hof is going to push data to and that the Spark streaming application is going to consume data from. I just want to mention that all of this is monitored. We have dashboards and alerting set up for Kafka and also for the Spark applications themselves. Here nothing has been pushed to Kafka for the past five minutes, so there is nothing really to show. But in this way you can always check that your application is consuming data and applying transformations.
We have a final notebook, which is the apply notebook. This is the notebook that actually sets up the Spark streaming application and does the actual transformations. This notebook is fairly simple as well. The top is the same: setting up a connection with the Hopsworks feature store. Here we are fetching both the streaming feature group and the cached feature group. What I did was create the same feature group in both a streaming and a cached version. The reason, as you may remember from before, is that you can extend the Hof architecture to save the transformed data in a cached feature group, so that it can be used later for training new models. That’s what we are doing here. On the streaming feature group we call apply, and we pass the cached feature group as an additional output. We are also passing a dictionary with a bunch of arguments; in this case there is only one. These arguments are going to be provided to the Spark application when setting up the stream itself.
In this case we are passing the group ID. The idea is that, because the transformation is stateless, I can run multiple instances of the notebook, and if I give them the same group ID, then each message coming in will be processed only once, by one of the running instances. So this is how the transformation looks end to end. Okay, thank you. Thank you for listening. You can look at the code for everything shown today, you can follow us on Twitter, and you can visit our website to try out our platform and play around with it. Thank you very much.

About Fabio Buso

Logical Clocks AB

Fabio Buso is the head of engineering at Logical Clocks AB, where he leads the Feature Store development. Fabio holds a master’s degree in cloud computing and services with a focus on data intensive applications, awarded by a joint program between KTH Stockholm and TU Berlin.