How We Optimize Spark SQL Jobs With Parallel and Asynchronous IO

May 27, 2021 04:25 PM (PT)


Although NVMe has become more and more popular in recent years, large numbers of HDDs are still widely used in super-large-scale big data clusters. In an EB-level data platform, IO (including decompression and decoding) contributes a large proportion of Spark jobs’ cost. In other words, IO operations are worth optimizing.

At ByteDance, we have done a series of IO optimizations to improve performance, including parallel read and asynchronous spill. Firstly, we implemented file-level parallel read to improve performance when there are a lot of small files. Secondly, we designed row-group-level parallel read to accelerate queries in big-file scenarios. Thirdly, we implemented asynchronous spill to improve job performance. Besides, we designed the Parquet column family, which splits a table into a few column families, with different column families stored in different Parquet files. Different column families can be read in parallel, so read performance is much higher than with the existing approach. In our practice, end-to-end performance is improved by 5% to 30%.

In this talk, I will illustrate how we implement these features and how they accelerate Apache Spark jobs.

In this session watch:
Jun Guo, Head of Data Engine Team, Bytedance



Speaker 1: Hello everyone, thank you for attending this session. The topic of this session is how we optimize Spark SQL jobs with parallel and asynchronous IO at ByteDance.
My name is Guo Jun. And my English name is Jason. I’m the lead of the data engine team at ByteDance.
First, let me introduce who we are. We are the data engine team at ByteDance. We build a one-stop platform for OLAP, on which users can analyze PB-level data by writing SQL, without caring about the underlying execution engine.
And what do we do? We manage Spark SQL, Presto, and Hive workloads. We offer an Open API and a self-serve platform. We also optimize the Spark SQL, Presto, and Hive engines. And we design data architecture for most business lines of ByteDance.
Here is the agenda for this session. Firstly, I will introduce Spark SQL at ByteDance. Then, I will explain why I/O matters for Spark SQL. Then I will introduce how we boost Spark SQL jobs with parallel and asynchronous I/O. Lastly, I will introduce the work we plan to do in the near future.
Let’s go to the first part: Spark SQL at ByteDance. We adopted Spark SQL in 2016 for small-scale experiments. Then in 2017, we used Spark SQL for ad-hoc workloads. In 2018, Spark SQL was used for some of the ETL pipelines in production. At that time, Hive was the most commonly used engine for ETL jobs, and only a few ETL pipelines were running on Spark SQL. Then in 2019, most of the newly created ETL pipelines were running on Spark SQL. In 2020, Spark SQL became our main engine in the data warehouse area, both for ETL jobs and ad-hoc queries.
Now, in 2021, Spark SQL has totally replaced Hive for ETL jobs, which means that all the existing Hive ETL jobs have been [inaudible] to Spark SQL. So, at ByteDance, all the ETL jobs are running on Spark SQL. We don’t run ETL jobs on Hive anymore.
Okay, let me explain why I/O matters for Spark SQL. We notice that I/O performance has improved in the past years, but we think I/O is still a bottleneck for big data processing. Let’s go to the first point: I/O performance has improved. We know that NVMe SSDs perform much better than HDDs, maybe by two orders of magnitude. More and more new hardware has been invented in past years, such as AEP. Its performance is much, much better than HDD, even though it’s much more expensive. So, many papers claim that I/O is faster than CPU, and that what we need to optimize is CPU, not I/O. But in our practice, we observe that I/O is still the bottleneck for big data processing in industry.
Firstly, the total cost of ownership is one of the most important factors for huge data storage. Performance is important, but TCO is another important factor. And most servers have a lot of HDD disks, especially in Hadoop clusters. At ByteDance, a large number of Hadoop clusters have a lot of HDDs, and they may contain only a few SSDs, or even no SSDs. And we observed that I/O cost contributes more than 30%, or even more than 40%, of the total latency of Spark SQL ETL jobs.
So this is why we think that I/O is still problematic for big data processing, and why we think it is [inaudible] to optimize the I/O for Spark SQL.
Okay, let’s go to the next part: how we boost Spark SQL jobs with parallel and asynchronous I/O. Before that, I’d like to introduce Parquet. As everyone knows, Parquet is a columnar storage format. At ByteDance, Parquet is the most commonly used file format, especially for the data warehouse. In this picture, the black box is a Parquet file. Each Parquet file starts with a 4-byte header, and the header is the magic number.
After the header, there will be one or more row groups, and each Parquet file will have a footer. Each row group may contain one or more column chunks, and each column chunk will contain all the tuples, all the elements, for that column. For example, there are four columns in this Parquet file: columns A, B, C, and D. So there will be four column chunks in each row group. Column chunk A will contain all the elements for column A, and column chunk B will contain all the tuples, all the elements, for column B. Each column chunk consists of one or more data pages. A data page contains a header, the repetition levels, the definition levels, and the non-null values. Different column chunks may have different numbers of data pages. But in a single row group, the number of tuples in different column chunks is the same, because Parquet first splits the data by row into different row groups.
And then it splits the data by column into different column chunks. Besides the row groups, there is a footer in each Parquet file. The footer contains the file-level and row-group-level metadata. For each row group, the footer stores the column metadata, such as the min and max values, the number of values, the offset of the first data page, and the offset of the first index page.
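The byte-level framing of a Parquet file can be sketched with a toy example. This is not a Parquet reader: the footer here is opaque bytes standing in for the real Thrift-encoded metadata, and only the framing (leading magic, trailing footer length, trailing magic) follows the actual format.

```python
# Sketch of Parquet's file framing: the file begins and ends with the
# 4-byte magic "PAR1", and the last 8 bytes are the footer length
# (little-endian uint32) followed by the magic. The footer content and
# row-group bytes below are placeholders, not real Parquet data.
import struct

MAGIC = b"PAR1"

def frame_parquet(footer_bytes: bytes, data_bytes: bytes = b"") -> bytes:
    """Assemble the outer frame: magic | data | footer | footer-len | magic."""
    return MAGIC + data_bytes + footer_bytes + struct.pack("<I", len(footer_bytes)) + MAGIC

def read_footer(blob: bytes) -> bytes:
    """Locate the footer the way a reader does: from the end of the file."""
    assert blob[:4] == MAGIC and blob[-4:] == MAGIC, "not a Parquet-framed blob"
    (footer_len,) = struct.unpack("<I", blob[-8:-4])
    return blob[-8 - footer_len:-8]

blob = frame_parquet(b"\x15\x00", b"row-group-bytes")
print(read_footer(blob))  # → b'\x15\x00'
```

Because the footer length sits at the very end, a reader can fetch the metadata with two small reads from the tail of the file before touching any row group.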
And it’s worth noticing that the row group is the smallest unit for Spark to read from a Parquet file. In other words, a row group can only be read by a single task. It’s not possible for Spark to read a single row group with different Spark tasks; a row group is read as a whole by one task.
Let me introduce how Spark SQL splits a large Parquet file. Spark SQL will split a large Parquet file into a group of splits, each of which contains one or a few row groups. Each task will read its row groups sequentially. For example, in this picture, this Parquet file contains six row groups: row groups one, two, three, four, five, and six. This Parquet file is large, so Spark decides to read it with two Spark tasks, and each task will read three row groups.
Let’s look at task one. Task one will read three row groups: row groups one, two, and three. And this task reads these row groups one by one, which means that only when task one finishes reading row group one can it read the next row group. Okay.
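The split assignment described above can be sketched as a simple greedy packing: contiguous row groups are grouped into splits up to a size budget, and each split becomes one task. This is an illustrative simplification, not Spark's actual planner; the function name and the byte budget are made up for the example.

```python
# Hypothetical sketch: pack contiguous row groups into splits, where each
# split is read sequentially by one Spark task.
def split_row_groups(row_group_sizes, max_split_bytes):
    """Greedily group contiguous row groups into splits no larger than
    max_split_bytes; a single oversized row group still forms a split,
    because a row group cannot be divided across tasks."""
    splits, current, current_bytes = [], [], 0
    for i, size in enumerate(row_group_sizes):
        if current and current_bytes + size > max_split_bytes:
            splits.append(current)          # close the current split
            current, current_bytes = [], 0
        current.append(i)                   # row-group index joins the split
        current_bytes += size
    if current:
        splits.append(current)
    return splits

# Six row groups of 64 MB each with a 192 MB budget → two tasks,
# three row groups apiece, matching the picture in the talk.
print(split_row_groups([64] * 6, 192))  # → [[0, 1, 2], [3, 4, 5]]
```

The key constraint this encodes is the one from the previous slide: splits are built from whole row groups, never fractions of one.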
If the Parquet files are small, there may be a large number of them. Then, Spark SQL can combine a group of small Parquet files into a single split. Each split will be handled by a single task, and each task will read the small files in its group sequentially. For example, there are four Parquet files here. Each Parquet file contains only one row group, and each is a small file.
So Spark decides to read the four Parquet files with two tasks, and each task will read two small Parquet files. For example, task one will read Parquet file one and Parquet file two. Again, the task can only read these two small files one by one: read file one, and then read file two.
Okay. Let me compare sequential I/O and parallel I/O. By default, Spark can only read the data one by one, in a sequential way. I/O and computation are handled in a single thread, and they are handled sequentially by that same thread. The tuples in a single task are computed sequentially, and I/O for different files or different row groups is handled sequentially. So by default, Spark will read a group of data, finish the computation, then read another group of data, and do the computation again.
What we do is handle this in a parallel way: we separate the I/O and the computation into different threads. We introduce a buffer to separate the I/O and the computation. I/O and computation are handled in separate threads, or thread pools, and I/O for different files or row groups can be done in parallel.
Yeah, these two pictures show the row-group-level parallel I/O and the file-level parallel I/O. On the left side, the picture shows row-group-level parallel I/O. In the middle of task one, there is a buffer; we introduce this buffer to separate the I/O and the computation. On the left of the buffer, there is an I/O thread pool with more than one thread, and on the right of the buffer, there is a computation thread. The thread pool will read the three row groups in parallel: there are multiple threads, so each thread can read a row group, and the row groups can be read concurrently. All the data will be pushed to the buffer, and then a single thread will handle the computation.
Okay, this is how we implement the row-group-level parallel I/O. On the right side, the picture shows how we implement the file-level parallel I/O. For task one, there is also a buffer, and on the left side of the buffer there is an I/O thread pool with two threads. Each of the threads will read a single Parquet file, so the two Parquet files can be handled in parallel.
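The buffer-in-the-middle design from both pictures is a classic producer/consumer pattern, and a minimal sketch of it (not ByteDance's actual implementation; `read_row_group` is a stand-in for real decompression and decoding) looks like this:

```python
# Minimal sketch of separating I/O from computation with a bounded buffer:
# an I/O thread pool reads row groups concurrently and pushes rows into a
# queue, while a single computation thread drains the queue.
import queue
import threading
from concurrent.futures import ThreadPoolExecutor

def read_row_group(rg_id):
    # Stand-in for reading, decompressing, and decoding one row group.
    return [(rg_id, row) for row in range(3)]

def parallel_scan(row_group_ids, io_threads=3, buffer_size=8):
    buf = queue.Queue(maxsize=buffer_size)   # the buffer between I/O and compute
    results = []

    def producer():
        with ThreadPoolExecutor(max_workers=io_threads) as pool:
            # pool.map runs the reads concurrently but yields results in order.
            for rows in pool.map(read_row_group, row_group_ids):
                buf.put(rows)                # blocks if the buffer is full
        buf.put(None)                        # end-of-stream marker

    t = threading.Thread(target=producer)
    t.start()
    while (rows := buf.get()) is not None:   # the single computation thread
        results.extend(rows)                 # stand-in for real computation
    t.join()
    return results

print(len(parallel_scan([0, 1, 2])))  # → 9 rows from 3 row groups
```

The bounded queue is what keeps memory in check: if computation falls behind, the I/O threads block on `put` instead of buffering the whole file. File-level parallel I/O is the same shape with files in place of row groups.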
Okay, let’s go to the next part: column-level parallel I/O. For column-level parallel I/O, we split a logical Parquet file into a group of column families, each of which is a physical Parquet file. Each column family contains a few columns, and Spark SQL will read different column families in parallel. For example, there are six columns in the Parquet file in this picture. By default, all six columns will be in a single Parquet file. But with column families, we can put columns A, B, and C in one column family, one physical Parquet file, and put columns D, E, and F in another column family, another physical Parquet file.
So task one can read these columns in parallel. In the data warehouse area, there are sometimes more than 100 columns in a table. Without column families, the Parquet file may contain more than 100 columns, and each task can only read these columns in a single thread. But after we introduce column families, we can split the 100 columns into, for example, 10 column families, so we can read the 100 columns with 10 threads. In our benchmark tests, parallel I/O boosts Spark SQL jobs by 20%.
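The column-family read can be sketched as reading disjoint column sets from separate "files" in parallel and stitching them back into one logical table. Everything here is illustrative: the family names, the in-memory `FAMILIES` dict standing in for physical Parquet files, and the merge step are all invented for the example.

```python
# Hypothetical sketch of column-family parallel read: each family (a
# physical file holding a disjoint subset of columns) is read by its own
# thread, then the column sets are merged into the full logical table.
from concurrent.futures import ThreadPoolExecutor

# Two made-up column families of a six-column logical table.
FAMILIES = {
    "cf1": {"A": [1, 2], "B": [3, 4], "C": [5, 6]},
    "cf2": {"D": [7, 8], "E": [9, 10], "F": [11, 12]},
}

def read_family(name):
    # Stand-in for reading one physical Parquet file.
    return FAMILIES[name]

def read_logical_table(family_names):
    with ThreadPoolExecutor(max_workers=len(family_names)) as pool:
        parts = pool.map(read_family, family_names)  # families read in parallel
    merged = {}
    for part in parts:
        merged.update(part)   # safe: families hold disjoint column sets
    return merged

table = read_logical_table(["cf1", "cf2"])
print(sorted(table))  # → ['A', 'B', 'C', 'D', 'E', 'F']
```

Because every family stores the same rows in the same order, rematerializing a row is just positional alignment across families, with no join key needed.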
Okay, let’s go to the asynchronous I/O. To be more specific, this is asynchronous spill. The top of this picture shows the vanilla Spark spill. A Spark spill has two steps. The first step is that the Spark task handles the data, finishes the computation, and pushes the data into the buffer, which is in memory. When the buffer is full, Spark SQL will stop the task, and then flush the data from the buffer to the file. After the flush, the buffer is clean, it’s empty, and then new data can be calculated and pushed to the buffer.
We notice that during the data spill, the calculation is stopped. So we implemented asynchronous spill. With asynchronous spill, the spill has three steps, and we split the buffer into two buffers: buffer one and buffer two. Step one is that Spark calculates some data and pushes it to buffer one. When buffer one is full, we go to the next step: Spark flushes the data from buffer one to the spill file, but at the same time, the new data is calculated and pushed to buffer two.
When buffer two is full, Spark will flush the data from buffer two to the spill file, but at the same time, Spark keeps processing new data and pushes it to buffer one. So, with asynchronous spill, Spark never stops processing new data. This is how we boost Spark SQL jobs with asynchronous I/O.
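The three steps above are a double-buffering scheme, and a minimal sketch of it (not Spark's or ByteDance's actual spill code; the class name and the list standing in for the spill file are invented) can be written like this:

```python
# Minimal double-buffering sketch of asynchronous spill: the compute path
# fills one buffer while a background thread flushes the other, so
# computation never pauses for the spill.
import threading

class AsyncSpiller:
    def __init__(self, capacity):
        self.capacity = capacity
        self.active = []          # the buffer currently being filled
        self.spilled = []         # stand-in for the on-disk spill file
        self.flusher = None       # at most one in-flight background flush

    def _flush(self, buf):
        self.spilled.extend(buf)  # stand-in for writing bytes to disk

    def add(self, record):
        self.active.append(record)
        if len(self.active) >= self.capacity:
            if self.flusher:
                self.flusher.join()              # wait only if disk is behind
            full, self.active = self.active, []  # swap: new records go to the
            self.flusher = threading.Thread(     # fresh buffer immediately
                target=self._flush, args=(full,))
            self.flusher.start()                 # flush runs while add() continues

    def close(self):
        if self.flusher:
            self.flusher.join()
        self._flush(self.active)  # drain whatever is left
        self.active = []

sp = AsyncSpiller(capacity=4)
for i in range(10):
    sp.add(i)
sp.close()
print(sp.spilled)  # → [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
```

The only time `add` blocks is when a second buffer fills before the previous flush finishes, which is exactly the case where disk, not the design, is the limit.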
Okay, let’s go to the last part: future work. In the near future, we will keep optimizing Spark SQL. The first part is I/O, and the second part is computation. For I/O optimization, we will implement adaptive column families, which means we may adaptively split the data into different column families according to the query history. Besides that, we will implement a smart cache: we will cache the result, the intermediate data, or even the raw data, according to the query history, to accelerate Spark queries.
The second part is computation. Now, Spark vectorizes the reader, but the computation is done in a tuple-at-a-time way. So, we want to implement vectorized computation, a vectorized execution engine. Another plan is that we’re trying to implement some of the commonly used operators, such as hash join and hash aggregate, [inaudible] to accelerate the computation.
That’s all for my sharing, thank you.

Jun Guo

Jun Guo is in charge of the data engine team at Bytedance. His team focuses on data warehouse architecture development and optimization for an EB-level data platform. Spark SQL is one of the most im...