Portable UDFs: Write Once, Run Anywhere

May 27, 2021 11:35 AM (PT)

While most query engines come with a rich set of functions, these do not cover all the needs of users. In such cases, user-defined functions (UDFs) allow users to express their business logic and use it in their queries. It is common for users to use more than one compute engine to solve their data problems. At Facebook, we provide multiple systems for this: ad hoc, batch, and streaming / real-time. Users pick a system based on their needs and the problem at hand. Every system typically has its own way of allowing users to create UDFs. If a UDF is defined in one system, sooner or later there will be a need for a similar UDF in the others as well. This leads to users rewriting the same UDF multiple times, once for each system they want to use it in.

In this talk, we’ll take a deep dive into Portable UDF, our way of allowing users to write a function once in an engine-agnostic way and use it across several compute engines. We’ll present the motivation, design and current state of the Portable UDF project.

In this session watch:
Tejas Patil, Software Engineer, Facebook



Tejas Patil: Hi, everyone. This talk is about portable UDFs, a feature that we built at Facebook. Before we get into the technical side of things, I want to spend a few seconds on introductions. So Rongrong is a Presto committer, and she’s a tech lead in the Presto team at Facebook. I’m Tejas, I’m a Spark committer and I’m a tech lead in the Spark team at Facebook.
This is the high level [inaudible] so let’s dive into the first section, about the motivation. So why do we need portable UDFs? To set the context, at Facebook, we provide multiple engines for people to solve their data problems. We have Presto, we have Spark, we have Giraph, which is a graph processing engine, and we have some stream processing engines as well. And we let our users pick any one of these systems based on what problem they want to solve.
Let’s say a user wants to create a report for employees, and in this report, they want to print the first name of the employee in uppercase. If a user wants to write this query, they will use a function called upper, which is already predefined in all these systems. They will refer to this upper function in the query and then they will get their output. Now imagine a case where the same user wants to create a report on the employees and they want to associate a certain score with every employee. And the way they associate a score with the employees is based on some custom logic, which is specific to the business. In the case of the upper function, it was natively provided in the systems because it’s a very basic function. But if you are to associate scores based on your own business logic, the database would not have those functions built in for you. So what happens in this case?
In this case, the user will try to author this function by themselves. So here on this slide, I have an example where a user starts off writing a query in Presto, and they search for that function in Presto and find that it is not available in the engine. And they end up creating a new UDF for Presto, which implements their business logic. And so what does it mean when a user has to implement their UDF in Presto?
On this slide, I have an example of the array contains UDF in Presto. This is just to give you a feel for what it means to implement this. First off, the user would have to learn Java, because Presto functions are written in Java. They also need to learn about the interface for authoring functions in Presto. And they’ll also need to learn about some internals of Presto, for example, what the data types are in Presto and how to represent nulls in Presto. So after learning all these things, they end up creating a Presto UDF for their business use case. Now let’s say, after doing all this, the user wants to run the same UDF in Spark, and when they try it, it fails. Why does it fail? The reason is that Presto UDFs are not the same as Spark UDFs.
So here is the same array contains function. This time on the left-hand side, you can see array contains for Presto. On the right-hand side, you can see the same UDF for Spark, and you can see that the two are different. The first thing that I notice is that the Presto function is in Java and the Spark function is in Scala, so a completely different language.
And the way we express data types in these two engines is different. And even how we express nulls is different. So users will have to learn about this. And power users, who care about performance, might go one step further and try to learn about [inaudible] in Spark, since that’s one of the ways people can get the maximum performance from a Spark function.
After doing all these things, a user ends up creating two versions of this UDF, one for Presto and the other for Spark.
So what are the pain points for users? Number one, a user would have to learn about engine specifics. As we saw earlier, they will have to learn about the interfaces for defining UDFs in both these engines. And if they want to use the same UDFs in, say, a graph processing engine, they have to learn about its internals as well.
The other pain point here is that people will have to rewrite their logic multiple times. Going back to the previous slide, if you look at these functions, one is for Presto, one is for Spark, they are different, and you cannot just copy-paste the code from one into the other. So it’s basically a manual rewrite that one has to do.
The third problem that we see is inconsistencies. Since the rewrites are done manually, a user might unintentionally end up with different semantics for Presto and for Spark. We often see this in corner cases, say, when the semantics for nulls are different. The same UDF might throw an error in Presto, but it might [inaudible] in Spark. And that’s not really good for users, because users expect that if they call the same function, no matter which engine they call it from, they will see the exact same results for this function.
One more problem that we’ve noticed is versions. Let’s say a user starts off and creates version one of a UDF in Presto and in Spark. Then, as time goes on, they want to change their business logic, and that means they want to create a new version of this UDF. Whenever they do that, they have to be very sure that they make this change across both the Presto and the Spark versions. If they create a V2 for Presto and don’t create a V2 for Spark, the two engines would return different outputs for the same UDF, which is bad. So a user has to make sure that both copies of this function are always in sync.
And the last problem that we noticed is around releasing. The way we ship these UDFs to our clusters is that we package all the different functions from different users into a single binary and ship it to the clusters. If a user wants to get their function out quickly, it might happen that a Presto release goes out faster or a Spark release goes out faster. So your UDF might be in production at different points in time, and you’ll have to wait for the release to go out. And at times there might be, say, test failures, which might delay your releases. That means that you’ll have to wait longer for your function to get into production. So these are the problems that we found when users were authoring functions. And in the next section, I’ll have Rongrong talk about how we tried to solve-

Rongrong Zhong: Thanks, Tejas, for the introduction. Now let’s get into it: we have a problem, so how do we solve it? How do we develop portable UDFs to solve these problems? Let’s take a look at what portable UDFs look like. Basically, what we want to do is allow users to write a single version of the function in a way that is independent of the engine, and then, in all of the compute engines we provide as a service at Facebook, we make the engine able to run these functions. This is a Hello World example of what a portable UDF looks like. Our first version of portable UDFs supports Java UDFs. This is basically, as you can see, a pretty vanilla Java implementation. We have a class, and the class name will actually map to the function name.
And then we have a convention that the package is catalog and schema. Here you can see the package is fb.example, so at the end of the day, this function will be fb.example.hello_world. To specify an implementation of a function, we’re using annotations. For example, on line 16 and line 21, you can see the @ScalarFunction annotation. This marks the method as an overload of this function.
There are two versions of this function that you can see in this class. One is a traditional hello world that doesn’t take any input and returns hello world. The other one, which we obviously developed for testing purposes, takes a bunch of different types, and this is also a demonstration of all the types that we support. Basically, how we’re treating types is that we allow users to develop functions using native Java types, and we have a mapping between the Java types and the SQL types. Here you can see we have booleans and the numerical types, then we have string or varchar, and then we have the collection types, arrays and maps.
In addition to these, we also added some annotations for data management. For example, there is ownership and description, so it will be easier to find who is maintaining the function and to give users a better understanding of what the function does. And there are also routine characteristics, which are used by the engine to decide on calling conventions, determinism, et cetera.
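The slide itself is not reproduced in this transcript. As a rough illustration only, a portable UDF along the lines described might look like the sketch below. The annotation declarations, the `@FunctionInfo` name and its fields, the method names, and the use of static methods are all assumptions made so the example compiles on its own; only the `@ScalarFunction` name, the class-name-to-function-name convention, and the use of native Java types come from the talk.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.List;
import java.util.Map;

// Hypothetical declaration: marks a method as one overload of the function.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface ScalarFunction {}

// Hypothetical declaration: ownership, description and routine characteristics.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@interface FunctionInfo {
    String owner();
    String description();
    boolean deterministic() default true;
}

// In the talk, the package (fb.example) supplies the catalog and schema and the
// class name maps to the function name (hello_world). The package statement is
// left out here so the sketch stays a single standalone file.
@FunctionInfo(owner = "example_team", description = "Returns a greeting")
class HelloWorld {
    // Overload 1: no input, returns a constant.
    @ScalarFunction
    static String helloWorld() {
        return "Hello World";
    }

    // Overload 2: exercises several native Java types that would map to SQL types
    // (boolean, numeric types, varchar, array, map).
    @ScalarFunction
    static String helloWorld(boolean flag, long count, double ratio, String text,
                             List<String> tags, Map<String, Long> counts) {
        return "Hello " + text + " (" + tags.size() + " tags, " + counts.size() + " counts)";
    }
}
```

Nothing in the class refers to a Presto or Spark type, which is the property the talk emphasizes: the same source can be handed to any engine that knows how to map Java types to its own.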
So in summary, we have three different types of function metadata that we’re collecting. One is the function management metadata, which includes things that the engines don’t really need but that we, as a warehouse, need for better management and understanding, which are ownership, description, et cetera. Then there is the function resolution metadata, which is primarily used by the planner or analyzer side of the [inaudible]. This includes the function signatures, calling conventions, et cetera. And then there’s the execution side of the metadata. That is: what is the packaging? Where do we find it? What are the versions? Is the implementation boxed or unboxed, et cetera. These are things that we try to extract directly from the Java implementation so users don’t need to specify them.
Here’s an example of all the data that we automatically extract once the user writes the portable Java function. They write the function, the code is reviewed, they are happy with the results, and they commit it to the version control system. Once it’s committed, we run a background job to extract all this information, build the package and then upload both to a function metastore. And the function metastore is what all the query engines contact when they’re trying to execute the function.
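The extraction step can be pictured as plain Java reflection over the committed class. The sketch below is one possible way to do it, not the actual background job: the `@Scalar` marker stands in for the talk’s `@ScalarFunction`, and the `MetadataExtractor` class, the Java-to-SQL type table, and the signature string format are all assumptions for illustration.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;

// Hypothetical marker standing in for the talk's @ScalarFunction annotation.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface Scalar {}

// Sample UDF class used as input to the extractor.
class Greeting {
    @Scalar
    public static String greet(String name, long times) {
        return "hi " + name + " x" + times;
    }

    // Not annotated, so the extractor ignores it.
    public static String helper() {
        return "ignored";
    }
}

class MetadataExtractor {
    // Assumed Java-to-SQL type mapping; the real mapping is not given in the talk.
    private static final Map<Class<?>, String> SQL_TYPES = new HashMap<>();
    static {
        SQL_TYPES.put(boolean.class, "boolean");
        SQL_TYPES.put(long.class, "bigint");
        SQL_TYPES.put(double.class, "double");
        SQL_TYPES.put(String.class, "varchar");
    }

    // Builds one signature string per annotated method,
    // in the form "function_name(arg types) -> return type".
    static List<String> signatures(Class<?> udfClass) {
        List<String> result = new ArrayList<>();
        String functionName = toSnakeCase(udfClass.getSimpleName());
        for (Method m : udfClass.getDeclaredMethods()) {
            if (!m.isAnnotationPresent(Scalar.class)) {
                continue;
            }
            List<String> args = new ArrayList<>();
            for (Class<?> p : m.getParameterTypes()) {
                args.add(sqlType(p));
            }
            result.add(functionName + "(" + String.join(", ", args) + ") -> "
                    + sqlType(m.getReturnType()));
        }
        return result;
    }

    private static String sqlType(Class<?> javaType) {
        String sql = SQL_TYPES.get(javaType);
        if (sql == null) {
            throw new IllegalArgumentException("unsupported type: " + javaType);
        }
        return sql;
    }

    // Class name to function name, e.g. HelloWorld becomes hello_world.
    static String toSnakeCase(String camel) {
        return camel.replaceAll("([a-z0-9])([A-Z])", "$1_$2").toLowerCase(Locale.ROOT);
    }
}
```

Because the signature is derived from the method itself, the author never has to restate argument or return types in a separate registration step, which matches the goal of not asking users to specify what can be extracted.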
So now let’s see whether this works for all the problems that we’re trying to solve. You’ve seen this slide before, when Tejas mentioned it, right? The number one reason we want to develop portable UDFs is that we don’t want to require users to learn all the engine specifics. This is solved because we’re providing an engine-agnostic API, and the API is basic enough that it doesn’t require users to understand the type systems or anything else in the engine. They basically still use the native Java types. The next set of problems is rewriting logic, inconsistencies in the rewrites, and maintaining multiple versions. These all originate from having multiple versions of the code. Because we allow users to write a single version of the code against a single API, these problems are solved as well: all the user needs to worry about now is a single implementation.
And then another problem is the release cycle problem. How we solve this is that when users commit these functions, we build packages, and the code base is maintained independently of the engine code base. The release cycle is also maintained independently of the query engines. The query engines fetch these packages and load the libraries at runtime. In this way, we also solve the problem that users no longer have to worry about different release cycles for different engines, because the functions are released independently. And also, they are, most of the time, made available in the query engines only a few minutes after users commit their code. Now it seems like we have a design that can solve the problems that we mentioned, so let’s move on to talk about how these are actually implemented. I’ll hand it back to Tejas to talk about how it’s implemented in Spark.

Tejas Patil: Thanks, Rongrong. Let’s look at how portable UDFs are executed in Spark. I have an example query over here: it’s a select of fb.example.hello_world, and to this portable UDF I’m passing a text as an argument. When a user writes this query and submits it to Spark, let’s see what happens.
When the Spark driver gets this query, it will parse the query and understand that it refers to a portable UDF. Whenever it sees that, the next thing the Spark driver wants to do is fetch the metadata about this function. To do that, it talks to the metastore and gets some metadata about this function.
This is an example of what metadata it will get, and I know that Rongrong has already covered this in her slides. The first set of metadata that Spark gets is around some basic information about the function. So here we have the function name, the description, and who the owner is. The other set of metadata that we care about is the function resolution metadata. This is needed by Spark for doing the query planning. Over here, we have the argument types, whether the function is [inaudible] or not, that sort of information. And the last piece of information that Spark needs is the function execution information. For example, if it’s a Java function, we want to know: where is the jar that has the code for the UDF? What is the class name? Which method should Spark execute? After the Spark driver gets this metadata, the next thing it does is fetch the logic for the UDF. So it talks to the Maven server and fetches the code for this portable function.
And after it gets it, it will do some basic checks on it. A simple check would be: does the jar even have the class that we want or not? After it does all these checks, it will ship this jar to all the Spark executors and begin query execution. The way Spark currently executes portable UDFs is similar to how it executes Hive UDFs. That is, the Spark executors add the portable UDF jar to their classpath, and at runtime, whenever Spark tries to execute the function, it will just load this class from the classpath and run the required method.
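The two steps just described, a sanity check on the driver and a class load plus method call on the executor, can be sketched with standard Java reflection. This is a simplified illustration and not Spark’s code: `ReflectiveUdfRunner` and `ShoutUdf` are hypothetical names, the UDF is assumed to be a static method, and the class is loaded from the current classpath rather than from a fetched jar (for which something like `java.net.URLClassLoader` could supply the class loader).

```java
import java.lang.reflect.Method;
import java.util.Locale;

// Stand-in for a class that would normally live in the fetched UDF jar.
class ShoutUdf {
    public static String shout(String text) {
        return text.toUpperCase(Locale.ROOT) + "!";
    }
}

class ReflectiveUdfRunner {
    // Mirrors the driver-side check: does the class exist and expose the expected method?
    static boolean hasMethod(ClassLoader loader, String className, String methodName,
                             Class<?>... argTypes) {
        try {
            Class.forName(className, false, loader).getDeclaredMethod(methodName, argTypes);
            return true;
        } catch (ClassNotFoundException | NoSuchMethodException e) {
            return false;
        }
    }

    // Mirrors the executor side: load the class by name and call the method for one row.
    static Object invoke(ClassLoader loader, String className, String methodName,
                         Class<?>[] argTypes, Object... args) {
        try {
            Class<?> udfClass = Class.forName(className, true, loader);
            Method method = udfClass.getDeclaredMethod(methodName, argTypes);
            // Null receiver: this sketch assumes the UDF method is static.
            return method.invoke(null, args);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("could not run " + className + "." + methodName, e);
        }
    }
}
```

The class name, method name, and argument types passed in here correspond to the function execution metadata the driver fetched earlier, so the engine needs no compile-time knowledge of the UDF.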
Since we now have three different ways to implement functions in Spark, we tried doing some performance benchmarking to see which one is better.
In this graph, I have plotted the performance profiling results across these three approaches. In orange, you can see the Spark native version. As we expected, if you implement your function natively in Spark, it will run super fast, because over here we can do whole-stage codegen, and that is extremely fast.
Now the second approach to building functions is through Hive UDFs. As you all know, Spark supports invoking Hive UDFs, and that is, as we expected, slower than the Spark native way.
And the third data point is portable UDFs, which are the blue bars in this graph. Over here, we see that portable UDFs are a little bit faster than Hive UDFs in certain cases. The reason is that the data conversion comes at a cost. To run a Hive UDF, Spark has to convert its internal row into something that Hive UDFs can understand, that is, Writable objects. The cost of doing that is higher compared to when Spark runs portable UDFs, where Spark has to convert the internal row into primitive Java objects, which is faster. Let’s move to the next section and understand how Presto executes portable UDFs, so I’ll hand it over to Rongrong.

Rongrong Zhong: Thanks, Tejas, for introducing how we implement this in Spark. Now let’s move on to see how we do it in Presto. First, we support this in Presto as an external function. External functions are a SQL concept: basically, we can say CREATE FUNCTION with EXTERNAL, with a reference to where to find this external function. For this example, we’re still using hello world. We have the external name, which is basically the URL we’ll need to use to look up this function in our function metastore. As for how we execute these in Presto, we are actually running them as remote functions in a separate cluster. For people who are familiar or not so familiar with Presto: Presto servers don’t provide query isolation, so running arbitrary customer code in Presto cloud clusters can potentially cause reliability issues, because we cannot fully trust the quality of this code. So we decided to run them outside of the worker’s JVM, to provide some isolation, so that if one of these functions fails, it doesn’t fail all the other queries.
Let’s look at how we make that happen. Basically, when we receive a query at the Presto coordinator, we need to do some planner changes. What the planner does is take a projection where these expressions, or these functions, appear, and analyze the expressions to figure out which parts can be run locally and which functions need to be invoked remotely, and we change the plan accordingly. The reason we want to do this is so we can still provide a batch operation when we’re sending the query plan to the Presto workers. The Presto worker, when running remote functions, then needs to talk to the UDF server.
What we’re doing is sending a batch of rows to the UDF server together, rather than invoking the UDF server on every row. Since these are remote calls, invoking the function remotely could be quite expensive, so that’s what we’re trying to avoid.
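To make the saving concrete, the sketch below counts how many requests a worker would issue with and without batching. It is only an illustration: `FakeUdfServer` and `BatchingClient` are hypothetical in-process stand-ins, and no real network call or Presto API is involved.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical stand-in for the UDF server; counts how many requests it receives.
class FakeUdfServer {
    int requests = 0;

    // One request evaluates the function on every row in the batch.
    List<String> invoke(Function<String, String> udf, List<String> batch) {
        requests++;
        List<String> out = new ArrayList<>();
        for (String row : batch) {
            out.add(udf.apply(row));
        }
        return out;
    }
}

class BatchingClient {
    // Splits rows into batches of at most batchSize and sends one request per batch.
    static List<String> evaluate(FakeUdfServer server, Function<String, String> udf,
                                 List<String> rows, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        List<String> results = new ArrayList<>();
        for (int start = 0; start < rows.size(); start += batchSize) {
            int end = Math.min(start + batchSize, rows.size());
            results.addAll(server.invoke(udf, rows.subList(start, end)));
        }
        return results;
    }
}
```

With 10 rows, a batch size of 4 results in 3 requests, while a batch size of 1 (per-row invocation) results in 10. The per-request overhead of a remote call is paid once per batch rather than once per row.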
Let’s take a look at this example. It’s a bit tiny, so it’s not easy to read, but to highlight what is going on here: we actually have three remote functions. Number one is a separate projection of a remote function. Number two… and number three actually depends on number two, so we need to compute two before we can compute three. So what the planner does is basically, instead of a single projection, break this into four projections, with two local stages and two remote stages, highlighted here. For the remote stages, you can see that we can run remote function one and remote function two together in one stage. But then we need to run a local stage, and remote function number three actually needs to depend on the results of all these things. So these are broken into different project operators, so that in each operator we can still process a set of rows through a batching API.
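One way to picture this split is as a stage-numbering pass over the expressions. The sketch below is an assumption about how such a pass could work, not Presto’s planner: `Expr`, `ProjectionSplitter`, and the even/odd numbering are made up for illustration, and the expression names only loosely mirror the slide.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// A named expression that is either local or remote and depends on earlier expressions.
class Expr {
    final String name;
    final boolean remote;
    final List<String> deps;

    Expr(String name, boolean remote, String... deps) {
        this.name = name;
        this.remote = remote;
        this.deps = Arrays.asList(deps);
    }
}

class ProjectionSplitter {
    // Even stage numbers are local projections, odd ones are remote projections.
    // Expressions must be listed after the expressions they depend on.
    static Map<String, Integer> assignStages(List<Expr> exprs) {
        Map<String, Integer> stage = new LinkedHashMap<>();
        for (Expr e : exprs) {
            int maxDep = 0;
            for (String d : e.deps) {
                Integer s = stage.get(d);
                if (s == null) {
                    throw new IllegalArgumentException("unknown dependency: " + d);
                }
                maxDep = Math.max(maxDep, s);
            }
            int s;
            if (e.remote) {
                // Smallest odd stage strictly after every dependency.
                s = maxDep + 1;
                if (s % 2 == 0) {
                    s++;
                }
            } else {
                // Smallest even stage at or after every dependency.
                s = (maxDep % 2 == 0) ? maxDep : maxDep + 1;
            }
            stage.put(e.name, s);
        }
        return stage;
    }

    // Three remote functions, where the third depends (via a local step) on the second.
    static List<Expr> example() {
        return Arrays.asList(
                new Expr("x_plus_1", false),
                new Expr("remote1", true),
                new Expr("remote2", true, "x_plus_1"),
                new Expr("cleaned", false, "remote2"),
                new Expr("remote3", true, "cleaned"));
    }
}
```

For the example list, `remote1` and `remote2` both land in stage 1, the local step between them and `remote3` lands in stage 2, and `remote3` lands in stage 3. Together with the initial local stage 0, that gives two local and two remote projections, each of which can still process its rows as a batch.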
So at runtime, this is what it looks like on a Presto worker. The Presto worker sees that this plan needs to run a local projection, which is business as usual. When it sees a remote projection, it will send a request to the UDF server. This API is basically, logically, just: invoke a function, given a function handle and the input data. The function handle is more or less, you can understand it as the URL we were mentioning previously. When the UDF server sees the function handle, it uses this URL to retrieve the whole function metadata from the metastore. Then, based on that information, the UDF server can figure out: okay, these are the packages I need to load, and these are the methods I need to invoke. All of that can be codegen’d, and we’ll run it on all the inputs and then send all the results back. And yeah, so that’s how it works in Presto. That’s all for the details of how we implement it in Presto. And now let’s open up for questions.

Tejas Patil

Tejas Patil is a Spark Committer and Tech Lead in the Spark team at Facebook. For the past 7 years, he has worked on several projects related to building large scale distributed data processing systems re...