KFServing, Model Monitoring with Apache Spark and a Feature Store

May 26, 2021 11:30 AM (PT)


In recent years, MLOps has emerged to bring DevOps processes to the machine learning (ML) development process, aiming at more automation in the execution of repetitive tasks and at smoother interoperability between tools. Among the different stages in the ML lifecycle, model monitoring involves the supervision of model performance over time, involving the combination of techniques in four categories: outlier detection, data drift detection, explainability and adversarial attacks. Most existing model monitoring tools follow a scheduled batch processing approach or analyse model performance using isolated subsets of the inference data. However, for the continuous monitoring of models, stream processing platforms show several advantages, including support for continuous data analytics, scalable processing of large amounts of data and first-class support for window-based aggregations useful for concept drift detection.

In this talk, we present an open-source platform for serving and monitoring models at scale based on Kubeflow’s model serving framework, KFServing, the Hopsworks Online Feature Store for enriching feature vectors with a transformer in KFServing, and Spark and Spark Streaming as general purpose frameworks for monitoring models in production.

We also show how Spark Streaming can use the Hopsworks Feature Store to implement continuous data drift detection, where the Feature Store provides statistics on the distribution of feature values in training, and Spark Streaming computes the statistics on live traffic to the model, alerting if the live traffic differs significantly from the training data. We will include a live demonstration of the platform in action.

In this session watch:
Jim Dowling, CEO, Logical Clocks AB
Javier de la Rúa Martínez, Data Scientist, Logical Clocks



Jim Dowling: Hi. I’m going to talk about KFServing, [inaudible] model serving, monitoring those models with Spark, and why we need a feature store. I’m going to do it with Javier de la Rúa Martínez. So I’m going to start by talking about models that have been a huge success in recent years, deep learning. So we’ve had very information-dense signals coming into the types of problems that we can solve with deep learning: image recognition, natural language processing. Images have huge amounts of information, and natural language has huge amounts of data as well. In fact, the systems that we’ve used to train very complex behavior in this space don’t tend to need any external information, any context, any history. We like to call them Jellyfish AI. They don’t have a brain, but they can be very complex, and with their autonomic behavior, based only on the input signal, they can learn very high complexity in terms of what they can do and achieve.
If we contrast this with the enterprise AI systems that we have today, the types of signals that you get as input are not going to be as complex. They’re going to be what we call information-light, and we’d like to enrich them with history and context. So for example, you’re searching on a webpage and typing in a character, but we’d like to enrich that with your history, your profile, your context and location. The same is true for problems in IoT, where we have edge devices that are trying to communicate with a backbone and we’d like to do things like intrusion detection. We look at IP packets, but we also want to look at that device’s history and traffic flows in the network. We have this context and history that we need to help enrich our models.
And then finally, if we look at problems where we’ve done some work, in the area of money laundering: well, you get a small input signal, just the amount of money being transferred between two customers and two banks, but we need to put in a lot of information and context, features related to the customer’s credit history, what type of transfers they’ve made, and how many transfers they made in a period of time. So this whole space is encapsulated by something called a feature store. What the feature store does is, it enables you to provide context and history to your [inaudible] models from the whole enterprise. If you’re building an AI-enabled product, and you’re building that as a microservice and you only use the data available within that microservice, you’re going to have a very limited amount of data with which to make an intelligent application.
If, however, you use all of the enterprise’s relationship with that user or system and all of the contextual information that’s available outside of that, you can build very rich predictive models by pulling in those features from the feature store. So we can think of a feature store as being a place where we centralize all of the data assets related to AI in the enterprise. We continually feed it updates from the enterprise, as you can see at number zero, with pipelines of [inaudible] features, and then our AI-enabled applications can basically use models that retrieve features from the feature store to build very rich feature vectors with which they can make rich predictions.
So our feature store, Hopsworks, is backed by a database called RonDB. RonDB is a new database; it’s a new distribution of MySQL Cluster. It’s developed by Mikael Ronstrom, the inventor of MySQL Cluster. In fact, on a single server, in experiments it’s been shown to have higher throughput than even Redis, but it also scales to many servers, up to 48 servers. It provides low latency, high availability, high throughput, and scalable storage. These are all important properties for an online database that’s going to provide these features at low latency, with high availability and high throughput, and scale to large volumes of features in an enterprise. So how do we get features into the feature store? Well, we need pipelines, feature pipelines that take data from the enterprise data sources and then transform that data, aggregate that data, and push it into the feature store.
So some features may not even go to the feature store; they come directly from applications and need to be engineered in real time. Other features will come into the system through something like Kafka, where you may have a streaming application or Spark Streaming application to compute your features. Others may come from SQL databases, data warehouses, or data lakes. And all of these features that come in from these different sources may be updated at different cadences. Maybe some will be updated every 15 minutes or hour, others daily or even monthly. Our particular feature store, the Hopsworks feature store, provides a DataFrame API for ingesting features. So you can ingest any Spark DataFrame, so Scala, Java DataFrames are good. [inaudible] DataFrames are good. And then you can also use Python to ingest DataFrames; they’re going to be pandas DataFrames in that case.
And when we ingest them into the feature store, we have an abstraction called a feature group that represents that DataFrame. So if you do many ingestions into a feature group, you’re really just adding new data to that feature group. And historically we can go back in time then to see those different ingestions. That feature group will also be available not just to this offline API, where we create training data… In particular today, we’re going to talk about the online API: low-latency access to those features from the serving infrastructure. We’re going to retrieve those features at low latency.
So one other point that isn’t always obvious about feature stores is that our particular feature store, Hopsworks, has a representation of models, because features themselves are supposed to be reused across many different models. So our features in the feature groups, we can join them together to create what we call training datasets, and a training dataset typically has a one-to-one mapping with a model. For every model you have a different training dataset.
Often these training datasets are materialized on disk, maybe on an object store, in a particular file format that’s suited to the framework that we’re going to use to train the model. So if it’s [inaudible], TFRecord, or [inaudible] or CSV for [inaudible]. One other point there is that we will also store the statistics of that training dataset. And we’ll see later on in model serving that that’s going to be important, because it will help us to identify drift. Okay, so we talked about the feature store; let’s talk a bit about an open-source model serving framework. And our feature store is also open source; everything is open source here. It’s called Kubeflow Model Serving, or KFServing for short. So with Kubeflow Model Serving, when you have an online feature store, what you basically want is an AI-enabled application that uses this model that’s being served by Kubeflow.
So Kubeflow will make the model available over the network. You’ll be able to make secure RPC calls to the model, either over HTTP or over gRPC. But we want to use the online feature store to enrich the feature vector sent to the model serving server. So one way of doing that is, in your application or AI-enabled product, to look up the features that you need. That’s step number one here. You get back these features and then merge them with the feature vector that you’re going to send to the model in Kubeflow. You can see at the top right of the slide here we have the Python API to our online feature store. You basically get the training dataset to reference that object, and then you pass in the IDs for the feature groups, the primary keys for the feature groups in that training dataset, and then it will retrieve them; within a couple of milliseconds, you’ll get back a feature vector that you can then send down to the model. Python [inaudible], send it to the model to make the prediction.
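The client-side lookup described here can be sketched roughly as follows. This is a minimal illustration under stated assumptions, not the Hopsworks API: `ONLINE_STORE`, `get_serving_vector`, `FEATURE_ORDER` and the feature names are all hypothetical, and an in-memory dict stands in for the online feature store.

```python
# Hypothetical in-memory stand-in for an online feature store, keyed by primary key.
ONLINE_STORE = {
    "4444-1111": {"trans_count_10m": 3, "trans_avg_1h": 42.5},
    "4444-2222": {"trans_count_10m": 0, "trans_avg_1h": 0.0},
}

# Order of the features the model was trained on (assumed for illustration).
FEATURE_ORDER = ["amount", "trans_count_10m", "trans_avg_1h"]


def get_serving_vector(primary_key):
    """Look up the stored features for one entity."""
    return ONLINE_STORE[primary_key]


def build_feature_vector(request):
    """Merge request-time values with stored features, in training order."""
    merged = {"amount": request["amount"], **get_serving_vector(request["cc_num"])}
    return [merged[name] for name in FEATURE_ORDER]


vector = build_feature_vector({"cc_num": "4444-1111", "amount": 99.0})
payload = {"instances": [vector]}  # request body that would be sent to the model
```

The drawback of this approach is that every client has to know about the feature store and the feature order, which is what the transformer approach avoids.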
Another way of doing this is to actually utilize a function in KFServing called the transformer. So the transformer is a stage that can be executed before the model makes the actual prediction. The AI-enabled product doesn’t need to talk to the feature store; it can go just to Kubeflow model serving, and KFServing will have this transformer that we can see in the top right-hand corner, with some code for it there, and will execute this pre-process method before it makes a prediction. So at the pre-process stage, we can retrieve the full feature vector from the feature store, and then send it on to the model, make the prediction, and then, at stage four, return our results.
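A transformer of this kind might look roughly like the sketch below. It is a plain Python class that only mirrors the shape of a transformer with a `preprocess` method; it does not import KFServing, and the class name, the dict used as a feature store and the feature names are assumptions made for illustration.

```python
class FeatureStoreTransformer:
    """Sketch of a transformer: enriches requests before the predictor runs."""

    def __init__(self, store, feature_order):
        # A real transformer would open its feature store connection here;
        # this sketch just keeps a dict (hypothetical stand-in).
        self.store = store
        self.feature_order = feature_order

    def preprocess(self, inputs):
        """Replace each primary key in the request with its full feature vector."""
        instances = []
        for key in inputs["instances"]:
            features = self.store[key]
            instances.append([features[name] for name in self.feature_order])
        return {"instances": instances}


store = {"4444-1111": {"trans_count_10m": 3, "trans_avg_1h": 42.5}}
transformer = FeatureStoreTransformer(store, ["trans_count_10m", "trans_avg_1h"])
enriched = transformer.preprocess({"instances": ["4444-1111"]})
```

With this design the client only sends primary keys, and the enrichment logic lives next to the model instead of in every application.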
So internally in Kubeflow what we can see is happening is a little bit more than just a predictor and transformer. You can see that when the request arrives, you can have a canary endpoint or default endpoint to do A/B testing on your models. Then when the request gets passed on to the transformer, it can also be passed on to an explainer, which can help you explain the predictions that were made. And the predictor itself is going to be the model that will make the prediction and return the results.
So typically you would plug in your online feature store at the transformer stage here, and there are many other features in KFServing, such as multi-model serving and support for many different frameworks. So the kind of frameworks that are supported are TensorFlow, PyTorch, scikit-learn through blast for example, XGBoost, ONNX and then TensorRT as well. Kubeflow model serving, or KFServing, is part of the Kubeflow framework. It’s a machine learning framework for Kubernetes, and it’s built on Knative, which is serverless functions in Kubernetes, and then Istio, which gives you secure networking in Kubernetes. And all of this can be deployed on GPUs or CPUs. You can deploy your models on GPUs if they’re deep learning models, or on CPUs if you don’t need [inaudible].
How do we do model monitoring with Kubeflow model serving and Hopsworks? One of the things is that model serving doesn’t exist in a vacuum. What you have is the feature data that we use to train our models, and we also have the training data that are created from those features, but then when we train the models we’re going to get a lot of training artifacts. And as well as the training artifacts, we’re going to get logs for those experiments. So with MLflow, for example, you can take all the artifacts and metadata generated there. And then when we actually produce models, we store them in a model registry.
And then finally, when we’ve got models being served, they’re going to create a lot of data as well. There’s going to be inference request data, statistics on that inference request data, and often new training data. So what can happen is you can take the prediction requests that are made on the model online, and then the outcomes of those predictions, and join them together, and what you’ll get is new training data. So this is all great because this helps us feed the AI data flywheel. And the other thing we need to consider a lot nowadays is also AI regulation. So in the European Union, there are now proposals to regulate AI, and you have to really govern all of this data through the entire lifecycle, which makes it very challenging.
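The join of logged prediction requests with their later outcomes can be sketched as below. The record layout (`request_id`, `features`, `is_fraud`) is an assumption made for this example; the talk does not specify one.

```python
def join_predictions_with_outcomes(requests, outcomes):
    """Inner-join logged requests with observed outcomes on the request id."""
    labels = {o["request_id"]: o["is_fraud"] for o in outcomes}
    rows = []
    for r in requests:
        if r["request_id"] in labels:  # skip requests with no outcome yet
            rows.append({"features": r["features"], "label": labels[r["request_id"]]})
    return rows


logged_requests = [
    {"request_id": "a", "features": [99.0, 3], "prediction": 0},
    {"request_id": "b", "features": [5000.0, 17], "prediction": 1},
    {"request_id": "c", "features": [12.0, 1], "prediction": 0},
]
observed_outcomes = [
    {"request_id": "a", "is_fraud": 0},
    {"request_id": "b", "is_fraud": 1},
]
new_training_rows = join_predictions_with_outcomes(logged_requests, observed_outcomes)
```

Request `c` has no outcome yet, so it is left out until its label arrives.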
So what we’re going to talk about now specifically is how we have support for Spark and Kafka to take the logs from Kubeflow model serving, the inference data, and how we support Spark Streaming and Spark to process the logs, to help close this loop, to help generate this AI data flywheel. And Javier is going to go into some detail about the work we’ve done.

Javier de la Ru…: So I will take it from here. As Jim mentioned, in KFServing you can send the logs, the inference data, to Kafka or Elasticsearch, but these are just events with different pieces of information. For example, the HTTP code, or you can have some [inaudible] IDs or the features themselves. And also, between the request and the response, these events might change. You can see an example in the diagram below, using the credit card number as a request, but then we need to transform this to provide more context to the model, and then we will return the final prediction. In order to analyze these kinds of events we need to know the schema that the features in these events follow. In Hopsworks, we have [inaudible] between training sets and models is one-to-one.
So we can get the schema of the training sets that were used for generating the models. You can get the types of prediction and also that they [inaudible] input at the features and that are used as inputs. So we can transform these events before sending them to Kafka, in order to follow this schema. That way we can enable automated ingestion to the feature store and also facilitate live monitoring of inference data with Spark.
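As a rough illustration of conforming inference events to a known schema before they are logged, the sketch below turns a positional feature vector into a named, typed record. `SCHEMA`, `to_logged_event` and the feature names are hypothetical, and the Kafka producer itself is left out so the example stays self-contained.

```python
import json

# Assumed schema: (feature name, Python type), in training order.
SCHEMA = [("amount", float), ("trans_count_10m", int)]


def to_logged_event(instance, prediction):
    """Turn a positional feature vector and its prediction into a JSON record."""
    if len(instance) != len(SCHEMA):
        raise ValueError("instance does not match schema length")
    record = {name: cast(value) for (name, cast), value in zip(SCHEMA, instance)}
    record["prediction"] = prediction
    return json.dumps(record, sort_keys=True)


# The resulting string is what would be published to the logging topic.
message = to_logged_event([99, 3], 0)
```

Because every event carries the same named fields, a downstream job can apply one schema to the whole topic.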
So this is the overview of online model monitoring with Spark Streaming. On the left-hand side, we have the online feature store, where we are continuously ingesting new data. So the online feature store will contain the most recent data. And then on the right-hand side we have an AI-enabled application which wants to make some request to the model, but the model doesn’t know the context of this request. So it needs to retrieve the most recent data from the online feature store, which is basically the context of the request [inaudible] prediction. And then we will log all this information to Kafka for further processing and analysis. And as we can see in the streaming [inaudible], we can also retrieve the baseline statistics computed in the online feature store to compare this inference data [inaudible].
When it comes to online monitoring, we have to deal with unbounded data streams, and when we analyze this type of data, at some point we need to keep some state. And also we need to compute these metrics and analysis globally. So let’s assume that, with [inaudible] model, we might have some replicas. Each replica sees just a third of the inference data, so you cannot have an overview of the whole inference data in just one of these replicas. So we basically need to centralize this information somehow and compute these statistics globally. And one of the most common ways to do this analysis is, as I mentioned before, to compute descriptive statistics of this inference data and compare them with the baseline statistics that represent the training dataset that was used for training the model.
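One way to see why per-replica statistics have to be combined is the following sketch, which merges partial count/mean/variance summaries into global ones. It uses Welford's online update and the standard pairwise merge formula; this is an illustrative choice, not necessarily what the platform in the talk does, and `PartialStats` is a hypothetical name.

```python
from dataclasses import dataclass


@dataclass
class PartialStats:
    count: int = 0
    mean: float = 0.0
    m2: float = 0.0  # sum of squared deviations from the mean

    def add(self, x):
        """Welford's online update for a single value."""
        self.count += 1
        delta = x - self.mean
        self.mean += delta / self.count
        self.m2 += delta * (x - self.mean)

    def merge(self, other):
        """Combine two partial summaries into a new one."""
        n = self.count + other.count
        if n == 0:
            return PartialStats()
        delta = other.mean - self.mean
        mean = self.mean + delta * other.count / n
        m2 = self.m2 + other.m2 + delta ** 2 * self.count * other.count / n
        return PartialStats(n, mean, m2)

    @property
    def variance(self):
        """Population variance of everything seen so far."""
        return self.m2 / self.count if self.count else 0.0


# Three replicas, each seeing a third of the inference data.
replica_values = [[1.0, 2.0], [3.0, 4.0], [5.0, 6.0]]
total = PartialStats()
for values in replica_values:
    part = PartialStats()
    for v in values:
        part.add(v)
    total = total.merge(part)
```

No single replica's mean (1.5, 3.5 or 5.5) describes the whole stream, while the merged summary matches what a single pass over all six values would give.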
So here we can see an example of an implementation of our monitoring job in Spark Streaming. We see here that we can aggregate this data using windows or [inaudible]. We apply the schema that we can get from the feature store, and then we decide to compute some outliers based on the descriptive statistics, and also to compute data drift using distance metrics, like for example [inaudible].
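The logic of such a monitoring job can be sketched in plain Python, without Spark, to show the three steps: windowing, outlier detection against baseline statistics, and drift detection with a distance metric. The baseline values, the 3-sigma rule, the 0.3 threshold and the use of Jensen-Shannon divergence are all assumptions chosen for illustration; the metrics named in the talk are not recoverable from the transcript.

```python
import math
from collections import defaultdict

# Assumed baseline statistics of one feature in the training dataset.
BASELINE = {
    "mean": 50.0,
    "std": 10.0,
    "bin_edges": [0.0, 40.0, 60.0, float("inf")],
    "bin_freqs": [0.16, 0.68, 0.16],
}


def tumbling_windows(events, width_s):
    """Group (timestamp, value) events into fixed, non-overlapping windows."""
    windows = defaultdict(list)
    for ts, value in events:
        windows[int(ts // width_s) * width_s].append(value)
    return dict(sorted(windows.items()))


def outliers(values, mean, std, k=3.0):
    """Values more than k baseline standard deviations from the baseline mean."""
    return [v for v in values if abs(v - mean) > k * std]


def histogram(values, edges):
    """Relative frequencies of values over the given bin edges."""
    counts = [0] * (len(edges) - 1)
    for v in values:
        for i in range(len(counts)):
            if edges[i] <= v < edges[i + 1]:
                counts[i] += 1
                break
    total = sum(counts)
    return [c / total for c in counts] if total else counts


def js_divergence(p, q):
    """Jensen-Shannon divergence in bits: 0 for identical distributions, at most 1."""
    def kl(a, b):
        return sum(x * math.log2(x / y) for x, y in zip(a, b) if x > 0)
    m = [(x + y) / 2 for x, y in zip(p, q)]
    return 0.5 * kl(p, m) + 0.5 * kl(q, m)


events = [(5.0, 45.0), (12.0, 52.0), (30.0, 58.0), (41.0, 49.0), (55.0, 95.0),
          (61.0, 120.0), (75.0, 130.0), (90.0, 110.0), (110.0, 125.0)]

report = {}
for start, values in tumbling_windows(events, 60).items():
    dist = js_divergence(histogram(values, BASELINE["bin_edges"]), BASELINE["bin_freqs"])
    report[start] = {
        "outliers": outliers(values, BASELINE["mean"], BASELINE["std"]),
        "distance": dist,
        "drift": dist > 0.3,  # assumed alerting threshold
    }
```

In this toy stream the first window contains one outlier but stays close to the baseline distribution, while the second window has shifted entirely into the top bin and is flagged as drift.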
So by sending inference data into Kafka, we can run Spark or Spark Streaming jobs on a scheduled basis or a streaming basis, streaming [inaudible], to generate these reports, but this is not the end of the road. We need to store these reports and provide some additional capabilities around them. So let’s say that, as a machine learning engineer, you want not only to have access to these reports or receive alerts, but also to interact with the system. So you can explore these metrics at different intervals, or you can define more complex queries, like how is my model performing compared with last week, or how is this model performing in this specific context, like a different country or any other more specific data.
And on the other hand, as a data scientist, you also want to understand the reason behind the behavior of your model, because you can get some feedback for preparing better training data, maybe tuning some hyperparameters, or maybe considering a different model architecture.
So far we have mentioned how to use Spark Streaming to compute statistics and analyze inference data, but there is another approach, which is the following. We can leverage the data validation capabilities that are built into the feature store in order to compute [inaudible] statistics of this data, and then compare them with the baseline. So by doing this we can write once the expectations or rules that we want this data to follow, and they will be reused both for our feature pipelines and for ingesting inference data, which in the end can be considered as another feature pipeline. So here we have an example. [inaudible] is a library for interacting with the feature store, and we see here that we can create an expectation, which is composed of different rules.
For example, it has to have a minimum value of zero, or a maximum, and so on, and different levels of alerting. And then we attach these expectations to a feature group, so the feature store can evaluate these expectations at ingestion time. Similarly, we can do the same for inference data. We can create expectations with rules defined using the values of the baseline statistics, and then we can create, let’s say, a feature group called logging that has to validate these expectations. So when we ingest this inference data coming in, we can also detect if these feature values have drifted or there are some outliers among them.
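The idea of rules derived from baseline statistics and checked at ingestion time can be sketched as follows. `Rule`, `rules_from_baseline` and `validate` are hypothetical names for this illustration and are not the feature store's own expectations API.

```python
from dataclasses import dataclass


@dataclass
class Rule:
    feature: str
    kind: str     # "min" or "max"
    bound: float
    level: str    # alerting level, e.g. "WARNING" or "ALERT"


def rules_from_baseline(feature, stats, level="ALERT"):
    """Build min/max rules from the baseline statistics of one feature."""
    return [
        Rule(feature, "min", stats["min"], level),
        Rule(feature, "max", stats["max"], level),
    ]


def validate(rows, rules):
    """Return one (level, feature, kind, observed) tuple per violated rule."""
    violations = []
    for rule in rules:
        values = [row[rule.feature] for row in rows]
        if rule.kind == "min":
            observed, ok = min(values), min(values) >= rule.bound
        else:
            observed, ok = max(values), max(values) <= rule.bound
        if not ok:
            violations.append((rule.level, rule.feature, rule.kind, observed))
    return violations


# Assumed baseline: amounts in the training data ranged from 0 to 1000.
expectation = rules_from_baseline("amount", {"min": 0.0, "max": 1000.0})
inference_batch = [{"amount": 20.0}, {"amount": 1500.0}]
violations = validate(inference_batch, expectation)
```

The same list of rules could be applied to a feature pipeline batch and to a batch of logged inference data, which is the reuse described above.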
So let’s now proceed with a demo. For the demo we are going to see a fraud detection use case using the online feature store. So let’s assume that we have our AI-enabled platform that is tracking the activity of different credit cards of the users and sending these events to Kafka. Then we have some ingestion pipelines that [inaudible] Kafka and ingest these values into the online feature store. So we have the credit card activity in the online feature store, the most recent one. And then let’s assume that, under some suspicious event, the platform wants to check if that credit card number is fraud or not. So it will make a request to the model. The model will get the recent activity from the online feature store and the [inaudible] and so on.
So let’s have a look here. First of all, we are going to create some synthetic data for the transactions and credit cards. But before that we will create four feature groups that will ingest the aggregated values in 10 minutes, one hour… I’m sorry, hours of this credit card activity. So we define first the schema, as we can see here, and then we create four empty DataFrames, and we will use [inaudible] to define the metadata for these feature groups. In this metadata we can specify if we want to serve these values in the online [inaudible], and also if we want to compute the statistics and which are the primary keys, and then we can just save it and create the feature group. So after that, we can see here in the UI that the new feature groups are added, and we can check the schema that they have.
And also we can check some [inaudible] information about when they were used and for which training datasets they were used. And after that, we will generate some synthetic data using [inaudible] here. I don’t want to go too much into the details for this, so we can just create different credit card numbers, transfer amounts and so on. So the transactions have this format here, with the label if it’s fraud or not. Then we will modify these transactions a little bit, and we will send them into the Kafka topic to simulate the ingestion pipelines.
So here, in the next [inaudible] notebook, we have our ingestion pipeline that will read from this Kafka topic and aggregate these values in 10 minutes, one hour, 12 hours. And then we will use HSFS again, as we can see here, to ingest the streaming data that is coming in and that we are aggregating into the corresponding feature groups. So after that, we can also check which values we have right now in the online feature store. [inaudible] queries.
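The windowed aggregations over 10 minutes, one hour and 12 hours can be illustrated with the plain-Python sketch below. The demo itself uses a streaming pipeline reading from Kafka; here the transaction tuples, the feature names and the choice of count and mean as aggregates are assumptions made to keep the example self-contained.

```python
from collections import defaultdict

# Trailing window widths in seconds.
WINDOWS_S = {"10m": 600, "1h": 3600, "12h": 43200}


def aggregate(transactions, now):
    """Per card, count and mean amount of transactions in each trailing window."""
    per_card = defaultdict(list)
    for cc_num, ts, amount in transactions:
        per_card[cc_num].append((ts, amount))

    result = {}
    for cc_num, txs in per_card.items():
        row = {}
        for name, width in WINDOWS_S.items():
            amounts = [a for ts, a in txs if now - width < ts <= now]
            row[f"count_{name}"] = len(amounts)
            row[f"avg_{name}"] = sum(amounts) / len(amounts) if amounts else 0.0
        result[cc_num] = row
    return result


# (card number, unix-style timestamp in seconds, amount)
transactions = [
    ("A", 49900, 10.0),
    ("A", 47000, 30.0),
    ("A", 10000, 50.0),
    ("B", 49000, 5.0),
]
features = aggregate(transactions, now=50000)
```

Each window would map to its own feature group keyed by the card number, so the model can look up recent activity at several time scales.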
And then we can also insert this information into the offline feature store, which [inaudible] in the background. So after that, we can get the feature values from these feature groups. We can define a specific query that joins the features from different feature groups, and use this query to create a new training dataset. And in this training dataset, we can specify how we want to split the data, again, if we want to compute the statistics, and so on. After that, we can see this information here in the UI as well, together with the statistics of these values. And then we can just create a simple model that will predict if it’s fraud or not.
So after creating this model, we can go for example to the UI and just click here on the model that we just emulated and export it. And we can define a transformer script, which is here, together with the Kafka topic where we want to send the inference data. I will not create it right now, because it’s already created here and running. We can see that both transformer and predictor are running here and sending logs to this topic. So the transformer looks like this. We have to implement a transformer class, which can connect to the feature store in the initialization method. And then in the pre-process method, we can just get the keys that we are receiving in order to get the serving vectors and build the whole input that will [inaudible] to the model for making predictions.
So we can test here already in the [inaudible] notebook. Since it will run in a similar environment, we can test it here and make sure that it’s working as we expect. And we can just choose this file in the serving UI and deploy it together with the predictor. So after that, we can make some inference requests. Here we get a sample of the credit card numbers and use them to make predictions. We can see that we are getting back if it’s fraud or not; mostly they are not fraud. Fraudulent, sorry. And then at the same time, I have here a monitoring job with an implementation similar to the one you saw in the previous slide. In this monitoring job we are computing statistics and storing them in [inaudible] files. And also we are detecting outliers and data drift and sending this analysis to Kafka topics.
After this [inaudible] job has finished, whether we stopped it or it is just continuously analyzing the inference data, we can go to this other notebook and read these [inaudible] files and check the statistics that are being computed in a window-based process. And then also we can check the distributions of this inference data, and the correlations and covariance between the different features in the inputs. After that we can also read from Kafka, from the two Kafka topics that I mentioned before, in order to check which outliers have been detected here, and also when this data has drifted and how we have detected it. We can see here that there are the different distance metrics that we specified before, with the corresponding values and the detection time for each feature.
So this is all for the demo and for the talk. If you have any questions, don’t hesitate to contact us here at Logical Clocks or [inaudible] hopsworks.ai, our organization in GitHub. And if you have any other questions, you can also ask them in the question and answer [inaudible].

Jim Dowling

Jim Dowling is CEO of Logical Clocks and an Associate Professor at KTH Royal Institute of Technology. He is lead architect of the open-source Hopsworks platform, a horizontally scalable data platform ...

Javier de la Rúa Martínez

Javier de la Rúa Martínez is a researcher at Logical Clocks. His research centres on the intersection between large-scale distributed systems and machine learning, being his main research intere...