Enabling Vectorized Engine in Apache Spark

May 26, 2021 03:15 PM (PT)

This talk explains how to enable a vectorized engine in Apache Spark to accelerate Apache Spark programs. Vectorization is an exciting approach to maximizing performance, one that Delta Lake and other commercial databases use. On the other hand, the current Apache Spark does not use the vectorization technique yet because it is not easy to use vector instructions in the current Java language.

First, this talk reviews the Vector API in Java 16, which makes vector instructions easier to use. Then, this talk discusses three possible approaches to vectorizing the Apache Spark engine by using the Vector API: 1) replace external libraries such as a BLAS library, 2) use a vectorized runtime such as a sort routine, and 3) generate vectorized Java code by Catalyst from a given SQL query. Finally, this talk shares analysis and performance results for these approaches.

Here are the takeaways of this talk:

1. Overview of Vector API to vectorize Java programs
2. Multiple approaches to use a vectorized engine in Apache Spark
3. Analysis and performance results for these vectorization approaches

In this session watch:
Kazuaki Ishizaki, Researcher, Senior Technical Staff Member, IBM



Kazuaki Ishizak…: Good morning, afternoon, and evening, depending on where you are watching. Thank you for joining this session while there are many parallel sessions at this time. My name is Kazuaki Ishizaki.
Today, I would like to talk about how to enable a vectorized engine in Apache Spark. I’m sharing the analysis results for Apache Spark. I will put this presentation on my site soon after this talk.
Let me introduce myself. I’m a researcher at IBM Research Tokyo. My expertise is compiler optimization. I have been a committer of Apache Spark for three years. I have been working on these packages, [inaudible] whole-stage code generation, in conjunction with the Java virtual machine.
I have been working on the IBM Java virtual machine for over 25 years. In particular, I was responsible for research and development of the Java just-in-time compiler.
So here is an outline of this talk. First, I’ll explain two terms, vectorization and SIMD. Along with that, I’ll talk about how SIMD can improve the performance of a program. Next, I will give you an example of the Vector API, which ensures the use of SIMD in the Java virtual machine.
Then I will talk about where and how we can use SIMD in Spark. There are three candidates: external libraries, internal libraries, and Java code generated by Catalyst.
First, what is vectorization? Vectorization is to do multiple jobs in a batch at one time. It can improve the performance of the whole job. On the left-hand side is the scalar case. Each row is read one at a time, four times in total. On the other hand, the right-hand side is vectorization. Four rows are read at once. For example, vectorization is used to read multiple rows in a table, or to compute multiple rows at once.
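The difference between the two sides of the slide can be sketched in plain Java. This is only an illustration under assumptions: RowSource, readRow, and readBatch are hypothetical names, not Spark classes, and the read counter exists only to make the "four reads versus one read" point visible.

```java
// Hypothetical illustration of scalar versus batched reading; not Spark code.
final class BatchReadSketch {
    /** A hypothetical row source that counts how many read calls it serves. */
    static final class RowSource {
        private final int[] rows;
        private int pos = 0;
        int readCalls = 0;

        RowSource(int[] rows) { this.rows = rows; }

        boolean hasNext() { return pos < rows.length; }

        // Scalar style: one call returns one row.
        int readRow() {
            readCalls++;
            return rows[pos++];
        }

        // Vectorized style: one call copies up to batch.length rows and
        // returns how many rows were copied.
        int readBatch(int[] batch) {
            readCalls++;
            int n = Math.min(batch.length, rows.length - pos);
            System.arraycopy(rows, pos, batch, 0, n);
            pos += n;
            return n;
        }
    }

    static long sumScalar(RowSource src) {
        long sum = 0;
        while (src.hasNext()) {
            sum += src.readRow();
        }
        return sum;
    }

    static long sumBatched(RowSource src, int batchSize) {
        int[] batch = new int[batchSize];
        long sum = 0;
        while (src.hasNext()) {
            int n = src.readBatch(batch);
            for (int i = 0; i < n; i++) {
                sum += batch[i];
            }
        }
        return sum;
    }

    public static void main(String[] args) {
        RowSource scalar = new RowSource(new int[] {1, 2, 3, 4});
        RowSource batched = new RowSource(new int[] {1, 2, 3, 4});
        System.out.println(sumScalar(scalar) + " in " + scalar.readCalls + " reads");
        System.out.println(sumBatched(batched, 4) + " in " + batched.readCalls + " reads");
    }
}
```

With four rows and a batch size of four, the scalar path makes four read calls while the batched path makes one, which is the saving the slide describes.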
So Spark already uses vectorization for multiple purposes, which already improves the performance of Apache Spark programs. The vectorized Parquet reader, the vectorized ORC reader, and Pandas UDFs employ it in Spark.
Next, I will talk about what SIMD is: Single Instruction, Multiple Data. With SIMD, one instruction can apply the same operation to multiple primitive-type data, for example, four integers, eight doubles, et cetera.
On the left-hand side, you can see the conventional scalar instruction, which can apply the operation to only one data element. On the right-hand side, one SIMD instruction can apply the same operation to multiple data elements, an add in this example. In this case, SIMD can increase the parallelism by 8x. So vectorization can also use SIMD to do multiple computations at once. Of course, vectorization can be implemented without SIMD too.
So SIMD is already used in many big data software systems to improve performance: commercial and open source databases and SQL engines, for example, the Databricks Runtime [inaudible].
However, Spark does not use SIMD explicitly yet. Why? Spark runs on the Java Virtual Machine. Until Java 16, the Java Virtual Machine could not ensure whether a given program would use SIMD or not.
We rely on the HotSpot compiler in the JVM to decide whether or not to generate SIMD instructions from Java code. Sometimes, unfortunately, slower scalar native code will be used. Sometimes, fortunately, faster SIMD code will be used thanks to automatic vectorization in the HotSpot compiler. So we write Spark as Java code once and run it anywhere.
Java 16 just introduced a new approach, the Vector API, as an incubator module. If we write a Java program using the Vector API explicitly, the Java Virtual Machine can always generate SIMD instructions if the hardware supports them.
On the left-hand side, the two fromArray methods read multiple data from Java arrays, using the SIMD vload instruction shown on the right-hand side. Next, the add method performs multiple operations using the SIMD vadd instruction. Then the intoArray method stores multiple data into a Java array, using the SIMD vstore instruction. So performance will be improved.
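The slide itself is not reproduced in this transcript, so the following is a minimal sketch of the load, add, and store sequence just described, written against the incubating jdk.incubator.vector module. The element type (int), the class name, and the scalar tail loop are assumptions. It needs Java 16 or later and the flag --add-modules jdk.incubator.vector for both javac and java.

```java
import jdk.incubator.vector.IntVector;
import jdk.incubator.vector.VectorSpecies;

// Sketch only: element type and names are assumptions, not the speaker's code.
final class VectorAddSketch {
    // Let the JVM pick the widest vector shape the hardware supports.
    private static final VectorSpecies<Integer> SPECIES = IntVector.SPECIES_PREFERRED;

    static void add(int[] a, int[] b, int[] c) {
        int i = 0;
        int upper = SPECIES.loopBound(a.length);
        for (; i < upper; i += SPECIES.length()) {
            IntVector va = IntVector.fromArray(SPECIES, a, i); // SIMD load
            IntVector vb = IntVector.fromArray(SPECIES, b, i); // SIMD load
            IntVector vc = va.add(vb);                         // SIMD add
            vc.intoArray(c, i);                                // SIMD store
        }
        // Scalar tail for the elements that do not fill a whole vector.
        for (; i < a.length; i++) {
            c[i] = a[i] + b[i];
        }
    }

    public static void main(String[] args) {
        int[] a = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10};
        int[] b = {10, 20, 30, 40, 50, 60, 70, 80, 90, 100};
        int[] c = new int[a.length];
        add(a, b, c);
        System.out.println(java.util.Arrays.toString(c));
    }
}
```

Whether each call maps to exactly one hardware instruction depends on the JIT compiler and the CPU, so the comments name the intended instruction class rather than a guaranteed one.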
Now, we have a question. Where can we use SIMD in Spark? We have three candidates. One is the external library. One example is a BLAS library. This approach is already implemented in SPARK-33882, and it is already merged into master. The second is the internal library, such as sort and join. The third is the code generated at runtime by Catalyst.
Then, how can we use SIMD? For the first two candidates, we have to write Vector API code by hand. For the last candidate, we need to enhance Catalyst to generate Vector API code from a Spark program. Now, let us look at them one by one.
First, let us see the external library. We have three approaches to implementing a matrix library, for operations such as matrix multiplication. One is a JNI (Java Native Interface) library. It calls a highly optimized, handwritten library, for example, one implemented in Fortran or C++. These libraries are called through JNI from a Java program.
The second is to use SIMD code in Java. As we have seen, we can do it by writing the library using the Vector API. The last one is to execute scalar code, using naïve Java code.
Here is an example of a one-dimensional vector multiplication using the Vector API. In the blue part, the operations are executed by using SIMD instructions. So performance will be improved in that part of the application.
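The code on the slide is not part of this transcript. As a hedged sketch of what a BLAS-style routine written with the Vector API can look like, the example below computes a dot product of two double arrays; reading "vector multiplication" as a dot product is an assumption, as are the class and method names. A naive scalar version is included for comparison. It needs Java 16 or later and --add-modules jdk.incubator.vector.

```java
import jdk.incubator.vector.DoubleVector;
import jdk.incubator.vector.VectorOperators;
import jdk.incubator.vector.VectorSpecies;

// Sketch only: a dot product is one possible reading of the slide's example.
final class DotProductSketch {
    private static final VectorSpecies<Double> SPECIES = DoubleVector.SPECIES_PREFERRED;

    // Naive scalar Java code, the third approach in the talk.
    static double dotScalar(double[] x, double[] y) {
        double sum = 0.0;
        for (int i = 0; i < x.length; i++) {
            sum += x[i] * y[i];
        }
        return sum;
    }

    // SIMD code written by hand with the Vector API, the second approach.
    static double dotSimd(double[] x, double[] y) {
        DoubleVector acc = DoubleVector.zero(SPECIES);
        int i = 0;
        int upper = SPECIES.loopBound(x.length);
        for (; i < upper; i += SPECIES.length()) {
            DoubleVector vx = DoubleVector.fromArray(SPECIES, x, i);
            DoubleVector vy = DoubleVector.fromArray(SPECIES, y, i);
            acc = vx.fma(vy, acc); // acc = vx * vy + acc, lane by lane
        }
        // Combine the per-lane partial sums, then finish the tail in scalar code.
        double sum = acc.reduceLanes(VectorOperators.ADD);
        for (; i < x.length; i++) {
            sum += x[i] * y[i];
        }
        return sum;
    }

    public static void main(String[] args) {
        double[] x = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10};
        double[] y = {2, 2, 2, 2, 2, 2, 2, 2, 2, 2};
        System.out.println(dotScalar(x, y));
        System.out.println(dotSimd(x, y));
    }
}
```

Because the SIMD version adds the products in a different order, its result can differ from the scalar one in the last bits for general floating-point inputs.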
I compared the performance of these three approaches by running two benchmarks. The first one is a benchmark with large data. Obviously, the JNI approach is the fastest, by more than 10 times. The next one is a benchmark with small data. Since the JNI approach includes overhead, the Vector API is the fastest in this case.
Here is a summary. In most use cases, the JNI library approach is the best one, although it incurs data copying overhead. We usually use large data in Apache Spark. SIMD code is also good if we cannot use JNI or we have to handle small data, but this approach requires Java 16 or a later version of the Java Virtual Machine.
Next is the internal library, such as sort and join. We know the advantage of SIMD instructions in this area. There are lots of academic papers on sort and join. Also, many databases and SQL engines use SIMD instructions.
Here, let us look at the sort operation. The current Spark uses two sort algorithms. One is radix sort, the other is Tim sort. There are multiple SIMD sort algorithms that researchers have already proposed. Here, I chose comb sort. It is fast for data in the CPU data cache. This is good for a first try in this talk.
I implemented comb sort using the Vector API, and then ran a standalone sort benchmark program. Here is the result. The good news is that SIMD comb sort is 3.5x faster than Tim sort. The bad news is that radix sort is still 1.4x faster than SIMD comb sort.
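For readers unfamiliar with the algorithm, here is a plain scalar comb sort in Java. It is not the speaker's SIMD implementation, which is not shown in the transcript; it only illustrates the structure that such a version builds on. The shrink factor of 1.3 is the commonly used value and is an assumption here.

```java
// Scalar comb sort sketch; the SIMD version from the talk is not reproduced here.
final class CombSortSketch {
    static void combSort(int[] a) {
        int gap = a.length;
        boolean swapped = true;
        // Keep passing over the array until the gap is 1 and a pass makes no swap.
        while (gap > 1 || swapped) {
            gap = Math.max(1, (int) (gap / 1.3)); // shrink the comparison distance
            swapped = false;
            for (int i = 0; i + gap < a.length; i++) {
                // Compare and swap two elements that are 'gap' positions apart.
                if (a[i] > a[i + gap]) {
                    int tmp = a[i];
                    a[i] = a[i + gap];
                    a[i + gap] = tmp;
                    swapped = true;
                }
            }
        }
    }

    public static void main(String[] args) {
        int[] data = {5, 1, 4, 2, 8, 0, -3, 7};
        combSort(data);
        System.out.println(java.util.Arrays.toString(data));
    }
}
```

The inner loop applies the same compare-and-swap at a fixed distance to many independent positions, which is the kind of regular work that lends itself to SIMD lanes.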
So there are two reasons why radix sort is still faster. One is the computational order. The order of radix sort is small, order N. On the other hand, the order of comb sort is N log N. Another reason is that the Vector API cannot fully exploit platform-specific SIMD instructions yet. I will show you an example in the next few pages.
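To make the order-N claim concrete, here is a minimal least-significant-digit radix sort in Java. It is a sketch under assumptions, not Spark's implementation: it handles non-negative int keys only and uses four passes of 8 bits. Each pass touches every element a constant number of times, and the number of passes is fixed by the key width, so the total work grows linearly with N.

```java
// LSD radix sort sketch for non-negative ints; not Spark's radix sort.
final class RadixSortSketch {
    static void radixSort(int[] a) {
        int[] src = a;
        int[] dst = new int[a.length];
        // Four passes, one per byte, from least to most significant.
        for (int shift = 0; shift < 32; shift += 8) {
            int[] count = new int[257];
            // Histogram of the current byte, stored one slot to the right.
            for (int v : src) {
                count[((v >>> shift) & 0xFF) + 1]++;
            }
            // Prefix sums turn the histogram into start offsets per byte value.
            for (int b = 0; b < 256; b++) {
                count[b + 1] += count[b];
            }
            // Stable scatter into the destination buffer.
            for (int v : src) {
                dst[count[(v >>> shift) & 0xFF]++] = v;
            }
            int[] tmp = src;
            src = dst;
            dst = tmp;
        }
        // The number of passes is even, so the sorted data ends up back in 'a'.
    }

    public static void main(String[] args) {
        int[] data = {170, 45, 75, 90, 802, 24, 2, 66};
        radixSort(data);
        System.out.println(java.util.Arrays.toString(data));
    }
}
```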
So here is an example, a simple sort for pairs of key and value. This is frequently executed in sort. First, we compare two keys, one and five. Then we get the pair with the smaller key: one and its value, minus one, in green. Next, we compare two keys, seven and three. Then we get the pair with the smaller key: three and its value, minus three, in blue.
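The two compare-and-select steps above can be sketched as a lane-by-lane loop in plain Java. The array layout (separate key and value arrays for the two sides) and all names are assumptions for illustration; the slide's actual data layout is not visible in the transcript. Each loop iteration stands for one lane, and the boolean plays the role of the comparison mask.

```java
// Scalar emulation of "keep the pair with the smaller key" per lane.
// Layout and names are hypothetical.
final class MinPairSketch {
    static void selectSmaller(int[] keysA, int[] valsA, int[] keysB, int[] valsB,
                              int[] outKeys, int[] outVals) {
        for (int lane = 0; lane < keysA.length; lane++) {
            // Comparison result for this lane: true if side A has the smaller key.
            boolean mask = keysA[lane] <= keysB[lane];
            // Use the same mask to pick the key and its value together.
            outKeys[lane] = mask ? keysA[lane] : keysB[lane];
            outVals[lane] = mask ? valsA[lane] : valsB[lane];
        }
    }

    public static void main(String[] args) {
        // Lane 0 compares keys 1 and 5; lane 1 compares keys 7 and 3.
        int[] keysA = {1, 7};
        int[] valsA = {-1, -7};
        int[] keysB = {5, 3};
        int[] valsB = {-5, -3};
        int[] outKeys = new int[2];
        int[] outVals = new int[2];
        selectSmaller(keysA, valsA, keysB, valsB, outKeys, outVals);
        System.out.println(java.util.Arrays.toString(outKeys));
        System.out.println(java.util.Arrays.toString(outVals));
    }
}
```

With the keys from the talk, lane 0 keeps the pair (1, -1) and lane 1 keeps the pair (3, -3).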
If we can use SIMD, we can do these operations in parallel. If we could use C code instead of Java code, these operations could be written as in this example: compare the two sets of keys and get the mask in the variable maska (small a) as the comparison result. Then create a mask in the variable maskA (large A) by shifting the value in the variable maska.
maskA is a mask vector used to get the pairs of smaller keys and their values. This code does not need a shuffle instruction, which moves data within a SIMD register. In the example, [inaudible] x[0-3], maska, maskA. Reducing shuffle instructions in this way is very important to achieve better performance on x86.
On the other hand, the Vector API version requires a shuffle instruction, basically because this version of the Vector API cannot shift the result of the compare as the code on the previous page does. Thus we have to shift the values to be compared beforehand. Then we create a mask vector by comparison. So this causes a performance problem.
Then here, we ran a different benchmark program, a DataFrame sort program. It takes about 550 or 750 milliseconds, depending on the sort algorithm. If we could use SIMD comb sort instead of Tim sort, we could increase the performance by 20%. So it is good.
On the other hand, in the radix sort case, the sort accounts for only 50% of the whole program. So it is not easy to achieve a drastic performance improvement only by improving the sort implementation.
When we look at the [inaudible] of the sort program, we can see an exchange operation in it. This is a costly operation because it transfers data among the nodes. We have to take care of such operations in the Spark runtime too.
So here are the lessons I learned from this candidate. The third candidate is the code generated by Catalyst. Here is how a DataFrame program is translated into Java code to be executed on the Java Virtual Machine.
So Catalyst performs this transformation, as you can see here on the right-hand side. The current Catalyst in Spark 3.2, the latest master, generates code like this. The blue part reads the data. The red part computes the data.
So reading the data is already vectorized in the current Spark. However, computing the data and putting the data are not vectorized yet, even in the latest master. The red part is executed in whole-stage code generation. So I have prototyped generated code that vectorizes the computation, if possible.
I enhanced code generation in Catalyst, shown in blue, to generate vectorized computation code by using the analysis information in Catalyst. I have prototyped two types of vectorized code generation. One is to use scalar variables. The other is to use the Vector API.
This is the generated code in the first implementation. The blue part performs the computation, a floating-point multiplication, using scalar variables.
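The generated code on the slide is not available in this transcript. As a rough sketch of the idea only, the example below computes a whole batch in one tight loop using scalar variables and then hands the results to a row-by-row output step. It is not Catalyst output: the class, the RowSink interface, and the choice of a float multiplication are all assumptions.

```java
// Hypothetical sketch of batch-at-a-time computation with scalar variables.
// This is not code generated by Catalyst.
final class BatchedScalarSketch {
    /** Hypothetical stand-in for the step that writes one result row. */
    interface RowSink {
        void put(float value);
    }

    static void process(float[] colA, float[] colB, int numRows, RowSink sink) {
        // Compute part: one tight loop over the whole batch, using scalar variables.
        float[] result = new float[numRows];
        for (int i = 0; i < numRows; i++) {
            float x = colA[i];
            float y = colB[i];
            result[i] = x * y;
        }
        // Put part: still one call per row in this sketch.
        for (int i = 0; i < numRows; i++) {
            sink.put(result[i]);
        }
    }

    public static void main(String[] args) {
        java.util.List<Float> out = new java.util.ArrayList<>();
        process(new float[] {1f, 2f, 3f}, new float[] {2f, 2f, 2f}, 3, out::add);
        System.out.println(out);
    }
}
```

Separating the compute loop from the per-row output keeps the loop body simple, which gives a JIT compiler a better chance to optimize it, though that outcome is not guaranteed.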
This is the other implementation and its generated code. The blue part performs the computation, a floating-point multiplication, using the Vector API, such as the float vector, mask, and other methods. It will use SIMD instructions when this method is compiled by the HotSpot compiler in the Java Virtual Machine.
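As with the scalar variant, the slide's code is not in the transcript, so this is only a hedged sketch of a batch multiplication written with FloatVector and VectorMask from jdk.incubator.vector. Using a mask to handle the last partial vector is one common pattern and is an assumption about the prototype, as are the names. It needs Java 16 or later and --add-modules jdk.incubator.vector.

```java
import jdk.incubator.vector.FloatVector;
import jdk.incubator.vector.VectorMask;
import jdk.incubator.vector.VectorSpecies;

// Sketch only: not the prototype's generated code.
final class SimdBatchSketch {
    private static final VectorSpecies<Float> SPECIES = FloatVector.SPECIES_PREFERRED;

    // Multiplies the first numRows elements of colA and colB into out.
    static void multiply(float[] colA, float[] colB, float[] out, int numRows) {
        for (int i = 0; i < numRows; i += SPECIES.length()) {
            // The mask switches off lanes at or beyond numRows in the last iteration.
            VectorMask<Float> m = SPECIES.indexInRange(i, numRows);
            FloatVector va = FloatVector.fromArray(SPECIES, colA, i, m);
            FloatVector vb = FloatVector.fromArray(SPECIES, colB, i, m);
            va.mul(vb).intoArray(out, i, m);
        }
    }

    public static void main(String[] args) {
        float[] a = {1f, 2f, 3f, 4f, 5f, 6f, 7f};
        float[] b = {2f, 2f, 2f, 2f, 2f, 2f, 2f};
        float[] out = new float[a.length];
        multiply(a, b, out, a.length);
        System.out.println(java.util.Arrays.toString(out));
    }
}
```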
I ran micro-benchmark programs against the three implementations. Here are the results. The good news is that the vectorized version is 1.7x faster than the current version. The not-so-good news is the SIMD version. It achieves only a 1.2x improvement compared to the vectorized scalar version.
I ran a nano-benchmark to clarify the advantage of SIMD instructions in this computation. This nano-benchmark executes the same computation as in the previous benchmarks. If we execute only the computation part, the SIMD version is about 3x faster. So we verified that SIMD has an advantage for the computation part.
In the generated code, we have already vectorized the read part and the computation part. So now the put part is the bottleneck, which is still not vectorized. It is shown at the bottom, in red.
Here are the lessons learned from this candidate. We showed that vectorization is very effective. Also, SIMD is useful for the vectorized computation part. But we have room to fully exploit the SIMD capability in other parts.
So this is the last page of this talk. Here is the takeaway of this talk. We learned how to use SIMD instructions in Java by using the Vector API. There are three possible areas in which to use SIMD instructions in Apache Spark by using the Vector API. The first area has been merged into the latest master already.
In the second and third areas, we saw performance improvement, but we still have room to improve the performance and fully exploit the advantage by improving the Spark runtime and the Vector API. Thank you for your attention.

Kazuaki Ishizaki

Dr. Kazuaki Ishizaki is a senior technical staff member at IBM Research - Tokyo. He has over 25 years of experience conducting research and development of dynamic compilers for Java and other language...