The RAPIDS Accelerator for Apache Spark is a plugin that enables the power of GPUs to be leveraged in Spark DataFrame and SQL queries, improving the performance of ETL pipelines. User-defined functions (UDFs) in the query appear as opaque transforms and can prevent the RAPIDS Accelerator from processing some query operations on the GPU.
This presentation discusses how users can leverage the RAPIDS Accelerator UDF Compiler to automatically translate some simple UDFs to equivalent Catalyst operations that are processed on the GPU. The presentation also covers how users can provide a GPU version of Scala, Java, or Hive UDFs for maximum control and performance. Sample UDFs for each case will be shown along with how the query plans are impacted when the UDFs are processed on the GPU.
Jason Lowe: Hi, I’m Jason Lowe. I’m one of the software developers at NVIDIA working on the RAPIDS Accelerator for Apache Spark. And I’m here to talk to you today about speeding up User-Defined Functions with GPUs using the RAPIDS Accelerator. Today’s talk will cover the RAPIDS Accelerator for Apache Spark, what it is and how it works. I will discuss why User-Defined Functions can be a problem when using the RAPIDS Accelerator from a performance standpoint. And then we’ll discuss solutions to that, such as the Scala UDF Compiler plugin, which is part of the RAPIDS Accelerator, and UDFs that provide RAPIDS code. And then we will conclude the talk with a discussion of future work in this area.
So first let’s talk about what the RAPIDS Accelerator for Apache Spark is and how it works. The RAPIDS Accelerator for Apache Spark is a plugin for Spark that can accelerate SQL and data frame operations with GPUs without any changes at all required to the SQL or data frame code. And on the right hand side is an example query that was running on the CPU and didn’t have any changes to it when using the RAPIDS Accelerator. Again, the emphasis I want to drive home is that the RAPIDS Accelerator does not require you to use different operations in your SQL to use GPUs, and it doesn’t require you to use different data types to use GPUs. It’s the same query that you had before. You drop some jars in the class path, set some configuration settings, and it can accelerate your query using the GPU hardware on the nodes. And this can accelerate anything in Spark that’s using SQL or data frame operations, such as the Scala API, Java API, PySpark, Spark SQL of course, and so on, as you see on the left.
So how does the RAPIDS Accelerator perform? To measure that we used the NDS benchmark. This is the NVIDIA Decision Support benchmark. It’s an internal benchmark used to benchmark the RAPIDS Accelerator. It is based on TPC-DS. It’s using the same SQL queries as TPC-DS, but it has different setup scripts and data format, which means you cannot directly compare the NDS results with TPC-DS.
We used approximately three terabytes of raw data, which is one terabyte of compressed parquet, and that is partitioned, using doubles for decimals, and it’s stored in HDFS. And to benchmark this data set, we used an 8-node cluster of EGX hardware. You can see here, each node has 2 AMD EPYCs, 2 Ampere A100s and half a terabyte of RAM on the host. We have NVMes for storage, and that’s where HDFS is also hosted. It has Mellanox networking and, as I mentioned, HDFS from Hadoop 3.1 is the distributed file system, and Spark 3.0.2 in standalone mode was the Spark cluster.
And these are the results from NDS across the 100 plus queries in that benchmark. We see the CPU took over 4,000 seconds to run all those queries. Turning on the RAPIDS Accelerator and running the same queries without modifying them, just setting configs and adding jars, was over three times faster. And considering the additional cost of those A100s, but then also taking into account how much faster the workload ran, the cost savings is almost half. So how does the RAPIDS Accelerator for Apache Spark work? How is it able to accelerate SQL and data frame operations without modifying the code?
First off, what we did is we leveraged RAPIDS. RAPIDS is a suite of software libraries designed to perform data science at scale using GPUs. And one of the flagship libraries from RAPIDS is cuDF, which is built on a C++ library called libcudf that uses the Apache Arrow data layout on the GPU for the data format. There is a Python binding to libcudf, and that’s called cuDF. It provides a Pandas interface and, combined with Dask, provides a very powerful distributed processing framework for those familiar with the Pandas framework. On the right hand side, what we did is we took that same C++ library that cuDF is using, and we provided Java bindings, JNI bindings that allow Java JVMs to call into libcudf and leverage the power of those algorithms on GPUs. And then we tied that into Spark through the data frame and SQL APIs. More specifically, inside the core of Apache Spark, there is a SQL optimizer called Catalyst. It takes data frame operations and SQL operations, and it provides a plugin interface for optimizers.
And that’s how the RAPIDS Accelerator hooks in to Apache Spark as a plugin. It plugs into Catalyst and at a high level, you can see on the left, there’s the pseudocode where we look at the plan, and for every operation in the plan, we check to see if that operation and particular data type combination can be supported on the GPU. And if so, we call out to the RAPIDS library to implement that behavior. If we cannot support it on the GPU, then we will fall back to the CPU implementation. And this is key for compatibility, where you don’t have to change your code. So it’s not required that your query can be completely translated to the GPU. Parts of it can be, parts of it may not be. And it doesn’t mean that if not all of it is translated, the performance won’t be accelerated.
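The per-operation check and fallback described above can be sketched roughly as follows. This is a simplified illustration, not the plugin's actual code: the `Node` type, the `GPU_SUPPORTED` table, and the `Gpu`/`Cpu` name prefixes are all hypothetical stand-ins for the real plan nodes and support checks.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

final class PlanRewriteSketch {
    // Hypothetical table of (operation, data type) pairs with a GPU implementation.
    static final Set<String> GPU_SUPPORTED = new HashSet<>(Arrays.asList(
        "Scan:long", "Scan:string", "HashAggregate:long", "Project:long"));

    // One node of a simplified physical plan (hypothetical type).
    static final class Node {
        final String op;
        final String dataType;

        Node(String op, String dataType) {
            this.op = op;
            this.dataType = dataType;
        }
    }

    // Walk the plan node by node: use the GPU version when the operation and
    // data type combination is supported, otherwise keep the CPU version.
    static List<String> rewrite(List<Node> plan) {
        List<String> result = new ArrayList<>();
        for (Node node : plan) {
            boolean onGpu = GPU_SUPPORTED.contains(node.op + ":" + node.dataType);
            result.add((onGpu ? "Gpu" : "Cpu") + node.op);
        }
        return result;
    }
}
```

In this sketch an unrecognized operation such as a UDF simply stays on the CPU while its neighbors move to the GPU, which is the partial-translation behavior the talk describes.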
It all depends on where the processing bottlenecks are in your query and whether those can be translated to the GPU or not. And again, we leverage those JNI bindings, [inaudible] RAPIDS, and RAPIDS is of course based on CUDA. On the right hand side, the RAPIDS Accelerator provides an optional shuffle plugin. It’s not required, but it’s there if you want to leverage it. That shuffle plugin, a new shuffle manager, can leverage RDMA capable networking, such as RoCE or InfiniBand, and also GPU to GPU interconnects, such as NVLink, to accelerate data transfers when the data’s being processed on the GPU. And that also uses JNI bindings that tie into a library called UCX. I don’t have time in this talk to cover the Spark shuffle plugin for the RAPIDS Accelerator, but I’ll give you pointers to other talks about it at the end of this presentation.
Okay. So a little more detail on how it works. Here’s an example query. Catalyst is going to take that query, the series of either SQL on the left or data frame operations on the right, and form a logical plan from those operations. And then it’s going to take that logical plan, optimize it, and produce a physical plan. And that physical plan is executed on the CPU row by row using RDDs of internal row. The RAPIDS Accelerator hooks in at the physical plan layer. After Catalyst has produced that physical plan, it looks at the physical plan node by node and will swap out parts of the plan with GPU versions of those nodes, which execute using RDDs of columnar batch. Columnar batch processing is more efficient for parallel processing on SIMD, SIMT type architectures. And it’s more cache coherent. It’s easier to accelerate, and the Apache Arrow layout is columnar, for example.
And so that’s why we’re processing in columnar batches on the GPU, and the CPU is of course doing rows as it does today. Let’s get into a more specific example, taking a simple aggregation query. This is a query that loads from parquet, does an aggregation and then writes back out to parquet. And on the left, we see the CPU physical plan, showing what you would see in the SQL web UI in Apache Spark, the web interface for this query. I don’t expect you to read the individual nodes, but just to give you a feeling for what this query looks like on the web UI. And on the right is that same query with the RAPIDS Accelerator translating it. And we’ll go through the query step by step. At the top, we have reading parquet, and of course, there’s a GPU version of that.
cuDF supports reading parquet files, specifically it can take parquet data and decompress it and decode it all using the GPU and the data remains in GPU memory. So we take the data from the distributed file system, send it down to the GPU without trying to decode it at all. And the GPU does all of the decode and does it quite quickly. On the left side, we see that after we read the parquet file on the CPU, the CPU is wanting to execute row by row. Parquet is a columnar format. And so there needs to be a conversion from that columnar format to row format. So the rest of the query can proceed. On the right-hand side that has been optimized out because parquet is a columnar format. The GPU wants to process in a columnar format so it’s already ready for processing. Then after that is a first stage aggregation, again, there’s a GPU version of that.
So the Hash Aggregate on the CPU becomes a GPU Hash Aggregate in the GPU version. After that first stage aggregate is a shuffle, an exchange between stages in the job. The CPU is shuffling rows. And it’s important to note here on the right hand side, the GPU is also shuffling, but shuffling columns. So we do not have to translate from columns back to rows to do the shuffle. It remains columnar. And then on the right hand side, there’s an extra step the GPU does to combine shuffle data. Shuffle partitions tend to be small. Sometimes the data is over partitioned, and the GPU has a non-linear behavior when it scales up. If you give a GPU twice as much data as you did before, often the time it spends is less than twice the amount. So we want to give it as much data as we can.
So we’ll coalesce tiny partitions together into larger partitions to amortize the cost of invoking the GPU. And then there’s a second stage aggregate, which of course again gets translated to a GPU version of the Hash Aggregate. And then finally, on the CPU side, there’s a write of the parquet file, where the CPU will encode the parquet data, compress it, and then write it to the distributed file system. And there’s a GPU version of that as well. The GPU will encode the data in parquet, compress it with snappy, and then we fetch that snappy compressed, parquet encoded buffer out of GPU memory and send it to the distributed file system. And then finally on the right hand side, every query in Spark is expected to produce row data at the end. Of course, the data is not actually sent at this point. It’s written to the distributed file system via the write parquet command, but Spark injects this conversion from columnar to row at the end. No data is actually sent through that, so it has no impact on performance.
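The coalescing step can be pictured with a small sketch. This assumes a simple greedy policy over consecutive batches with a hypothetical `targetRows` limit; the plugin's real policy and sizing (which is not described in the talk) may differ.

```java
import java.util.ArrayList;
import java.util.List;

final class CoalesceSketch {
    // Greedily merge consecutive small batches (given as row counts) so that
    // each merged batch stays at or under targetRows where possible.
    // A single batch larger than targetRows is passed through unchanged.
    static List<Integer> coalesce(List<Integer> batchRows, int targetRows) {
        List<Integer> merged = new ArrayList<>();
        int current = 0;
        for (int rows : batchRows) {
            // Close the current merged batch if adding this one would overflow it.
            if (current > 0 && current + rows > targetRows) {
                merged.add(current);
                current = 0;
            }
            current += rows;
        }
        if (current > 0) {
            merged.add(current);
        }
        return merged;
    }
}
```

Fewer, larger batches mean fewer GPU invocations, so the fixed cost of each invocation is spread over more rows.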
Okay. So now that we understand what the RAPIDS Accelerator is and how it works, why would User-Defined Functions potentially be a performance problem in this kind of setup? The answer comes down to the nature of UDFs. They are opaque functions by design, right? They’re arbitrary logic behind a generic interface. And that becomes a problem for the RAPIDS Accelerator because it’s designed to translate query operations into equivalent GPU operations. And if it cannot detect what an operation is, then it has no choice but to fall back to the CPU, because we want to make sure that the query runs, and UDFs are hiding that logic in a way that makes it difficult to discern what it’s doing. And that could be particularly frustrating in cases where the UDF… We do have the functionality in RAPIDS to do this operation, but we can’t detect that that’s the operation being performed, because it’s behind this opaque interface.
So what are some solutions? So the other problem we have, besides the UDF not being translated to the GPU and the opportunity cost of not being able to speed that function up, is the performance hiccup of columnar and row conversions. Besides the fact that we have to take the data from GPU memory that was being processed on the GPU and feed it back to the CPU, we also have to translate the format. The GPU is operating columnar, the CPU is operating on rows. So we have to do a columnar to row transition when we go from the GPU to the CPU, and then coming back from the CPU to the GPU, we have to do a row to columnar transition. We’re optimizing these transitions all the time. We’re accelerating them for different data types, but it will never be zero cost. So there’ll always be some overhead from transitioning between the GPU and CPU.
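To make the cost of these transitions concrete, here is a minimal sketch of the two conversions on plain Java arrays. It only illustrates the layout change; the real transitions also move data between GPU and host memory, and the method names here are hypothetical.

```java
final class TransitionSketch {
    // Row layout is rows[rowIndex][columnIndex];
    // columnar layout is cols[columnIndex][rowIndex].

    // Row to columnar: every value is visited and copied once.
    static Object[][] rowsToColumns(Object[][] rows, int numCols) {
        Object[][] cols = new Object[numCols][rows.length];
        for (int r = 0; r < rows.length; r++) {
            for (int c = 0; c < numCols; c++) {
                cols[c][r] = rows[r][c];
            }
        }
        return cols;
    }

    // Columnar to row: the inverse copy, again touching every value.
    static Object[][] columnsToRows(Object[][] cols, int numRows) {
        Object[][] rows = new Object[numRows][cols.length];
        for (int c = 0; c < cols.length; c++) {
            for (int r = 0; r < numRows; r++) {
                rows[r][c] = cols[c][r];
            }
        }
        return rows;
    }
}
```

Both directions touch every value in the batch, which is why the overhead can be reduced but not eliminated as long as a UDF forces a round trip through the CPU.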
So what are some solutions to this UDF opaque function problem? One of them is the Scala UDF Compiler. This is a solution that can automatically translate some simple Scala UDFs into equivalent Catalyst expressions. And if you think about what the RAPIDS Accelerator is, it’s a Catalyst plan to GPU translator. So if we can take a User-Defined Function and effectively explode it out into an equivalent Catalyst expression tree, and then feed that back into the original Catalyst tree and replace the UDF with it, then that allows us to translate the UDF automatically. It uses bytecode analysis of the UDF to figure out what it’s doing and attempts to translate it into Catalyst logic. And right now it supports common math operations, typecasts, conditionals, of course, common string operations, and date and time parsing.
Now let’s look at a specific concrete example of this. So this is a translation of a Scala UDF. On the left is this UDF called myudf. It takes two parameters, X and Y, a long and a string. It takes that long and multiplies it by two. And then it concatenates the string cast of that with whatever Y was and a literal string in between, space conical space in this case. And on the left, you see we register it with Spark and then use it in a query, selecting myudf of column C and column S as some new UDF column from some table. On the right hand side, we see what happens with the [inaudible] Scala UDF compiler turned on and the Catalyst tree that it generates from analyzing the bytecode of that UDF. It realizes that, oh, okay, in this specific case, we’re taking column C and we’re multiplying it by two.
So Catalyst has a multiply expression. So it goes and builds that node of the subtree. Then it realizes the result of that multiply is cast to a string, so it adds that. And then it takes the result of that string cast and concatenates it with column S and the literal string. And the result of that concatenation is the result of the UDF. It plugs that back into the Catalyst plan, and the RAPIDS Accelerator is able to translate that. And so we were able to automatically translate an opaque UDF into Catalyst and translate it without any user intervention.
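The shape of that translation can be sketched with a toy expression tree. This is only an illustration of the idea: the `Expr` interface and its builders are hypothetical stand-ins for Catalyst expressions, and the separator literal `SEP` is an assumed value, since the exact literal from the slide is not recoverable from the transcript.

```java
final class ExprTreeSketch {
    // Assumed separator literal; the actual one from the talk's slide is unknown.
    static final String SEP = " - ";

    // Toy stand-in for an expression evaluated against one row (c: long, s: String).
    interface Expr {
        Object eval(long c, String s);
    }

    static Expr colC() { return (c, s) -> c; }
    static Expr colS() { return (c, s) -> s; }
    static Expr literal(Object value) { return (c, s) -> value; }

    static Expr multiply(Expr left, Expr right) {
        return (c, s) -> {
            long l = (Long) left.eval(c, s);
            long r = (Long) right.eval(c, s);
            return l * r;
        };
    }

    static Expr castToString(Expr child) {
        return (c, s) -> String.valueOf(child.eval(c, s));
    }

    static Expr concat(Expr... parts) {
        return (c, s) -> {
            StringBuilder sb = new StringBuilder();
            for (Expr part : parts) {
                sb.append(part.eval(c, s));
            }
            return sb.toString();
        };
    }

    // The opaque UDF: all the engine sees is a function from (long, String) to String.
    static String myUdf(long x, String y) {
        return (x * 2) + SEP + y;
    }

    // The same logic expressed as a tree of known operations:
    // Concat(Cast(Multiply(c, 2) as string), SEP, s)
    static Expr translated() {
        return concat(castToString(multiply(colC(), literal(2L))), literal(SEP), colS());
    }
}
```

Once the logic is a tree of known nodes like multiply, cast, and concat, each node can be checked for GPU support individually, which is not possible for the opaque `myUdf` call.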
And we can see this on the web UI, the SQL web UI in Spark. On the left-hand side is before the UDF compiler was turned on. We see the snippet of the query, where we’re running on the GPU from a parquet load or wherever we wish also [inaudible] performing. And this UDF was invoked, so without the Scala UDF Compiler, we don’t know what it does. So we have to go back to the CPU to process it. We do the columnar to row transition. We do the UDF invocation on the CPU in the project. And then we go back, row to columnar, to continue processing on the GPU after the UDF invocation. On the right-hand side is what we see for this same snippet of the query when the UDF compiler was turned on for this particular case.
And we can see that the transitions have been optimized out. We were on the GPU before, we stay on the GPU for the project, and we keep on the GPU afterward. The transitions were removed. And you can also see, this is a small example, a trivial set, but you can see that the time spent on the project was just the time spent doing the row to columnar transition, not even including what was spent doing the CPU version of the UDF.
Okay. Now that being said, there are obviously limitations to this approach, right? The UDF has to be translatable to Catalyst, and many UDFs are not. So you can’t have any looping constructs, and higher-order functions aren’t supported right now. And there are corner case semantic differences, such as the way division by zero is handled and some other corner case scenarios. And so by default, it’s not turned on, but you can turn it on and see if it can translate some of your Scala lambda UDFs automatically. Okay. So, that’s one solution to this problem. What is another solution? Another one could be a UDF with a RAPIDS implementation, and what do I mean by that? I mean a case where a UDF can provide an implementation for the CPU and for the GPU. The CPU version would obviously execute row by row as it does today. The GPU version, though, could execute in a columnar form using RAPIDS cuDF columnar batches.
And this allows you to do GPU specific algorithms and optimizations that you otherwise normally would not be able to do with just a CPU implementation. You can not only avoid the column to row transitions, but you could also possibly significantly speed up your UDF using the power of GPUs.
This technology is currently supported for the following UDF types. We have Spark Scala UDFs, Java UDFs, Hive Simple and Hive Generic, essentially any UDF where, for the CPU version, you translate a row of inputs to a row of output, and for the GPU version, some number of columns of inputs become a column of output. Row counts are the same. And the way this is done is relatively straightforward. There’s a RAPIDS UDF interface that you can implement with your UDF class. It is a single method.
It’s called evaluateColumnar and it takes some number of column vector arguments, the same number of arguments as the CPU version. So again, using that example before, if you had a CPU UDF where, let’s say, a string and an integer were the two arguments and it produced a string, then you would… In this case, your evaluateColumnar would receive two column vector arguments. The first would be a string column. The second would be an integer column. They come as a batch and they have the same number of rows as input. And then you would produce a column vector of strings as output with the same number of rows as the row counts on the inputs.
Let’s go through a concrete example of this, URLDecode. So this is a URL decode UDF that we encountered, where somebody needed to decode URLs. And this is just a simple implementation of it. I’ve removed most of the error checking to keep it brief for this presentation, but it’s a relatively simple, straightforward translation, right? So it’s a row by row implementation where, every time it’s called with a string, if it’s null, it will return null. If it’s not null, then we’ll just invoke the JVM’s URLDecoder class to decode this URL encoded string. So it’s a relatively straightforward UDF, and it’s pretty useful if you have URLs that you need to decode. And this is the GPU columnar RAPIDS implementation of it. Again, at the top in green, we see we added the RAPIDS UDF interface. We extend that interface and override the method.
And here’s the columnar implementation. Now, in this case, the cuDF Java bindings have functionality for decoding URLs. So that’s very convenient, and it’s why this is a really simple, short example. A lot of the work was already done by cuDF. And in this case, it was almost a straight translation. The one hiccup is that cuDF URL decoding only translates the hexadecimal escape sequences. It doesn’t translate plus to space as the Java version does. So to be completely compatible, the UDF goes ahead and does that as well. So we can see here, we take our initial column vector argument, which is going to be a string argument.
Again, I’ve removed error checking and things like that from here. You could have those in your UDF. We take that first argument, and it’s going to be a string column. And then we construct two string scalars, plus and space. And then we call string replace on that input to replace plus with space in the strings, and get a replaced column as an intermediate result back. And then we can call URLDecode on that cuDF column vector to do the URL decode. And again, you can see that we’re using [inaudible] with resource here on all the intermediate values to make sure they’re freed in a timely manner, because they correspond to GPU resources. And there’s our RAPIDS Accelerated version of this UDF.
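The structure of that UDF, a row-by-row CPU path plus a columnar path that first replaces plus with space and then decodes only the hex escapes, can be sketched in plain Java. This is not the code from the slides and it does not use the cuDF Java bindings: the `ColumnarUdf` interface is a hypothetical stand-in for the RAPIDS UDF interface, `String[]` stands in for a GPU string column, and `percentDecodeOnly` imitates the hex-escape-only decoding behavior described above.

```java
import java.io.ByteArrayOutputStream;
import java.io.UnsupportedEncodingException;
import java.net.URLDecoder;
import java.nio.charset.StandardCharsets;

final class UrlDecodeSketch {
    // Hypothetical stand-in for the columnar UDF interface: one column in, one column out.
    interface ColumnarUdf {
        String[] evaluateColumnar(String[] input);
    }

    // CPU path: row by row, null in gives null out.
    static String decodeRow(String s) {
        if (s == null) {
            return null;
        }
        try {
            return URLDecoder.decode(s, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            throw new IllegalStateException(e); // UTF-8 is always available
        }
    }

    // Decodes only %XX escapes and leaves '+' untouched,
    // imitating a decoder that handles hex escape sequences only.
    static String percentDecodeOnly(String s) {
        byte[] in = s.getBytes(StandardCharsets.UTF_8);
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        for (int i = 0; i < in.length; i++) {
            if (in[i] == '%' && i + 2 < in.length) {
                int hi = Character.digit((char) (in[i + 1] & 0xFF), 16);
                int lo = Character.digit((char) (in[i + 2] & 0xFF), 16);
                if (hi >= 0 && lo >= 0) {
                    out.write((hi << 4) | lo);
                    i += 2;
                    continue;
                }
            }
            out.write(in[i]);
        }
        return new String(out.toByteArray(), StandardCharsets.UTF_8);
    }

    // Columnar path: two whole-column passes, in the same order as in the talk.
    static final ColumnarUdf COLUMNAR = input -> {
        String[] replaced = new String[input.length];
        for (int i = 0; i < input.length; i++) {
            // Pass 1: replace plus with space so the result matches the JVM decoder.
            replaced[i] = input[i] == null ? null : input[i].replace('+', ' ');
        }
        String[] decoded = new String[input.length];
        for (int i = 0; i < input.length; i++) {
            // Pass 2: decode the hex escape sequences.
            decoded[i] = replaced[i] == null ? null : percentDecodeOnly(replaced[i]);
        }
        return decoded;
    };
}
```

The replace has to happen before the decode: an escaped plus such as `%2B` should come out as a literal plus, and it would be turned into a space if the order were reversed.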
Now, how does it perform? So we tested this on that same EGX cluster I mentioned earlier, the 8-node cluster, with 4.4 billion rows, about 4.4 terabytes of data. And it was six times faster. You can see the CPU version is almost 200 seconds. The GPU version was about 33 seconds, 35 seconds. And so it was almost exactly six times faster with not a lot of code. So it was a pretty nice payoff. It’s important to note that in the example I showed, using the cuDF Java bindings made for a really short example for this presentation, but you are not limited to just the Java bindings that are in cuDF. Arbitrary custom native GPU code could be supported with this.
You could use other CUDA libraries. You could use custom CUDA code you write yourself, either using Thrust or just raw CUDA files. And there are examples of these kinds of things in the RAPIDS Accelerator repository. Specifically, I wanted to call out that cosine similarity has been implemented, which shows how you can handle array inputs of floats. So this takes two float array inputs as vectors of floats and computes the cosine similarity between those vectors.
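For reference, the computation such a UDF performs per row can be sketched on the CPU as below. This is not the repository's native GPU implementation, just a plain Java illustration of cosine similarity over two float arrays, with hypothetical class and method names.

```java
final class CosineSimilaritySketch {
    // Cosine similarity of two equal-length float vectors:
    // dot(x, y) / (|x| * |y|). Returns NaN if either vector has zero length norm.
    static double cosine(float[] x, float[] y) {
        if (x.length != y.length) {
            throw new IllegalArgumentException("vectors must have the same length");
        }
        double dot = 0.0;
        double normX = 0.0;
        double normY = 0.0;
        for (int i = 0; i < x.length; i++) {
            dot += (double) x[i] * y[i];
            normX += (double) x[i] * x[i];
            normY += (double) y[i] * y[i];
        }
        return dot / (Math.sqrt(normX) * Math.sqrt(normY));
    }

    // Column-at-a-time shape: one array of floats per row in each input column,
    // one similarity value per row in the output column.
    static double[] evaluate(float[][] left, float[][] right) {
        if (left.length != right.length) {
            throw new IllegalArgumentException("columns must have the same row count");
        }
        double[] result = new double[left.length];
        for (int row = 0; row < left.length; row++) {
            result[row] = cosine(left[row], right[row]);
        }
        return result;
    }
}
```

Each row is independent of the others, which is what makes this kind of function a natural fit for a columnar GPU implementation.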
Okay, now that we’ve seen some solutions, what is some future work that we’re looking at in this area? Obviously, one of the things we’d like to do is expand this to other types of user-defined functions, such as aggregation functions, UDAFs, and also Hive table functions, UDTFs. Another thing that we’re working on is improving the data transfer for Pandas UDFs. And what do I mean by that? If you’re familiar with Pandas UDFs in Spark, Spark is executing on the JVM. Pandas, of course, is executing in a separate process in Python. And every time that Spark needs to talk to the Pandas UDF in the query, it has to transfer the data over to Python. Pandas operates with the Arrow format, a columnar format, and Spark on the CPU is executing row by row.
So every time it talks to Pandas, it has to take the rows and produce columnar Arrow data, and send that over to Pandas. And then Pandas operates on that columnar Arrow data and produces columnar Arrow data for a result. And then that’s sent back to the JVM on the CPU, which has to translate the Arrow data back into rows. So again, the row to columnar conversions are happening here for Pandas. We noticed, of course, that the GPU is already operating on Arrow data in GPU memory. Pandas wants Arrow data. So it’s relatively simple. It’s less work for us to just simply fetch the data from the GPU. It’s already in Arrow format. We put it in CPU memory and send it over to Python. It’s already in the right columnar format. Pandas operates on it and produces columnar format. We just send it down to the GPU, and we don’t have to translate the data format. And we’ve done some early experiments showing this is about 1.75 to 2X faster than the original row to columnar transitions done on the CPU.
So, how can you get more information about this, the RAPIDS Accelerator and other things we’re working on? I highly recommend you check out other RAPIDS Accelerator talks. There was one from last year’s Spark + AI Summit called Deep Dive into GPU Support in Apache Spark 3. There are also two talks from this year’s GTC. One was Running Large-Scale ETL Benchmarks with GPU Accelerated Apache Spark. And the other one is Accelerating Apache Spark Shuffle with UCX. So if you are interested in the Spark shuffle manager, I highly recommend you check out that talk.
And of course the RAPIDS Accelerator is an open source project. So if you’re interested in learning more, there’s documentation there, and the code is there. I highly recommend you just check it out. There are getting started guides. It’s relatively easy to try it on your queries and see how it works. It works in cloud environments pretty well, so you can also just fire up some cloud instances and try it there. And I highly recommend you file issues, feature requests, things like that. So with that, thank you for watching. Thank you for attending, and I hope you have a great Data + AI Summit.
Jason Lowe is a Distinguished Software Engineer at Nvidia. He is a PMC member of Apache Hadoop and Tez. Prior to Nvidia, he worked for Yahoo on the Big Data Platform team on Apache Hadoop and Tez. He ...