Jun Guo leads the data engine team at Bytedance. His team focuses on data warehouse architecture development and optimization for an EB-scale data platform. Spark SQL is one of the most important engines for the team, processing hundreds of PB of data each day. Prior to Bytedance, he worked at Cisco and eBay, where he focused on data platform and data warehouse infrastructure optimization.
May 27, 2021 04:25 PM PT
Although NVMe has become increasingly popular in recent years, large numbers of HDDs are still widely used in super-large-scale big data clusters. In an EB-level data platform, IO cost (including decompression and decoding) accounts for a large proportion of Spark jobs' cost. In other words, IO operations are worth optimizing.
At Bytedance, we implemented a series of IO optimizations to improve performance, including parallel read and asynchronous shuffle. First, we implemented file-level parallel read to improve performance when there are many small files. Second, we designed row-group-level parallel read to accelerate queries in big-file scenarios. Third, we implemented asynchronous spill to improve job performance. In addition, we designed the Parquet column family, which splits a table into a few column families, with different column families stored in different Parquet files. Different column families can be read in parallel, so read performance is much higher than with the existing approach. In our practice, end-to-end performance improved by 5% to 30%.
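The split-planning idea behind file-level and row-group-level parallel read can be sketched as follows. This is a minimal illustration, not Bytedance's implementation: the metadata, function names, and split-size threshold are all hypothetical, and in a real reader the row-group layout would come from the Parquet file footer.

```python
# Hypothetical row-group metadata: each file is (file_name, [rows_per_row_group]).
# In a real reader this comes from the Parquet footer, not hard-coded lists.
def plan_read_splits(files, max_rows_per_split):
    """Plan read splits at row-group granularity.

    Many small files can be packed into one split (file-level parallel
    read), while one big file is broken across several splits so its row
    groups can be read by multiple tasks in parallel (row-group-level
    parallel read).
    """
    splits, current, current_rows = [], [], 0
    for file_name, row_groups in files:
        for rg_index, num_rows in enumerate(row_groups):
            current.append((file_name, rg_index))
            current_rows += num_rows
            if current_rows >= max_rows_per_split:
                splits.append(current)
                current, current_rows = [], 0
    if current:
        splits.append(current)
    return splits

files = [
    ("small_a.parquet", [100]),        # small file: a single row group
    ("small_b.parquet", [100]),        # packed together with small_a
    ("big.parquet", [500, 500, 500]),  # big file: split across tasks
]
splits = plan_read_splits(files, max_rows_per_split=500)
# Two small files share one split; the big file yields one split per row group.
```

Each resulting split can then be handed to a separate Spark task, so a single large file no longer serializes onto one reader.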
In this talk, I will illustrate how we implement these features and how they accelerate Apache Spark jobs.
November 18, 2020 04:00 PM PT
In the data warehouse area, it is common to use one or more columns of complex type, such as map, and put many subfields into them. This may impact query performance dramatically because: 1) It wastes IO: the whole map column, which may contain tens of subfields, must be read, and Spark then traverses the whole map to get the value of the target key. 2) Vectorized reading cannot be exploited when a nested-type column is read. 3) Filter pushdown cannot be utilized when nested columns are read. Over the last year, we have added a series of optimizations in Apache Spark to solve the above problems for Parquet.
These include supporting vectorized reading of complex data types in Parquet and allowing subfield pruning on struct columns in Parquet, among many others. Besides, we designed a new feature, named materialized column, to solve all of the above problems transparently for arbitrary columnar storage (not only Parquet). Materialized columns have worked well in the Bytedance data warehouse over the past year. Take a typical table as an example: its daily incremental data volume is about 200 TB. Creating 15 materialized columns on it improved query performance by more than 110% with less than 7% storage overhead. In this talk, we will take a deep dive into the internals of materialized columns in Spark SQL and describe use cases where materialized columns are useful.
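The core idea of a materialized column can be sketched in a few lines. This is an illustrative simulation only, assuming a table with a wide map column `props`; the column names and the `query_city` helper are hypothetical, and in the real feature the extraction happens at write time and the query rewrite is done transparently by the engine.

```python
# A table with a wide map column `props` holding many subfields.
rows = [
    {"id": 1, "props": {"city": "SF", "os": "ios", "ver": "1.2"}},
    {"id": 2, "props": {"city": "LA", "os": "android", "ver": "1.3"}},
]

# Materialization (conceptually done once, at write time): extract
# props['city'] into a plain top-level column that a columnar format can
# encode, vectorize, and apply pushdown filters on independently.
for row in rows:
    row["props_city"] = row["props"].get("city")

def query_city(rows):
    """Query rewrite (conceptually done by the engine): a read of
    props['city'] is redirected to the cheap materialized column, so the
    whole map never has to be read and decoded."""
    return [r["props_city"] for r in rows]
```

Because the materialized column is an ordinary scalar column, all three problems above disappear for queries that touch only the materialized subfield: no whole-map IO, vectorized reads apply, and min-max filter pushdown applies.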
Speaker: Jun Guo
November 18, 2020 04:00 PM PT
Parquet is a very popular column-based format. Spark can automatically filter out useless data using Parquet file statistics, such as min-max statistics, via pushdown filters. In addition, Spark users can enable the Parquet vectorized reader to read Parquet files in batches. These features improve Spark performance greatly and save both CPU and IO. Parquet is the default data format of the data warehouse at Bytedance. In practice, we find that Parquet pushdown filters work poorly, resulting in reading too much unnecessary data, because the statistics have no discrimination across Parquet row groups (column data is out of order when written to Parquet files by ETL jobs).
Over the last year, we've added a series of optimizations in Spark to improve Parquet pushdown performance. We developed a feature named LocalSort, which adds a sort step on selected columns when writing Parquet files, resulting in clear discrimination of statistics across Parquet row groups and a higher compression ratio (the sort columns are chosen automatically according to historical queries, with no need to modify ETL jobs). Furthermore, we developed a feature named Prewhere. The Prewhere Parquet reader selects low-overhead columns from the pushdown filters, reads batches of those columns first, filters the data using the pushdown filters, and skips the corresponding batches of the other projection columns. As a direct consequence of these efforts, we've achieved a 30% average query improvement and a 40% storage improvement on some tables, with only 5% overhead.
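Why LocalSort makes min-max pruning effective can be shown with a toy simulation. This is a sketch under illustrative assumptions, not the Spark implementation: row-group size, data values, and function names are all made up for the example.

```python
def write_row_groups(values, rows_per_group):
    """Chunk values into 'row groups' and record min/max stats per group,
    as a Parquet writer records column-chunk statistics in the footer."""
    groups = []
    for i in range(0, len(values), rows_per_group):
        chunk = values[i:i + rows_per_group]
        groups.append({"min": min(chunk), "max": max(chunk), "rows": chunk})
    return groups

def groups_to_read(groups, target):
    """Min-max pruning for an equality predicate: keep only the row
    groups whose [min, max] range can contain the target value."""
    return [g for g in groups if g["min"] <= target <= g["max"]]

data = [7, 1, 9, 3, 8, 2, 6, 4]  # out-of-order, as typical ETL output

# Without LocalSort, nearly every row group's [min, max] range overlaps
# the predicate, so pruning skips almost nothing.
unsorted_groups = write_row_groups(data, rows_per_group=2)

# With LocalSort, ranges become disjoint and only one group must be read.
sorted_groups = write_row_groups(sorted(data), rows_per_group=2)
```

Sorting also clusters similar values together, which is why LocalSort additionally improves the compression ratio.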
In this talk, we'll take a deep dive into the internals of LocalSort and Prewhere, describe use cases where LocalSort and Prewhere are useful, and touch upon some work to automatically suggest sort columns based on historical queries.
Speakers: Ke Sun and Jun Guo
June 24, 2020 05:00 PM PT
Bucketing is commonly used in Hive and Spark SQL to improve performance by eliminating shuffle in join or group-by-aggregate scenarios. This is ideal for the many write-once, read-many datasets at Bytedance.
However, Spark SQL bucketing has various limitations.
As a direct consequence of these efforts, we have witnessed over 90% growth in queries that leverage bucketing across the entire data warehouse at Bytedance. In this talk, we present how we designed and implemented a new bucketing mechanism that solves all of the above limitations and improves join and group-by-aggregate performance significantly.
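The reason bucketing eliminates shuffle in joins can be sketched as follows. This is an illustrative simulation only: the bucket count, hash function, and helper names are made up, and it does not reproduce Spark's internal partitioner, but it shows the invariant that both tables, written with the same key hashing and bucket count, keep equal join keys in the same bucket, so each bucket pair can be joined locally with no repartitioning.

```python
NUM_BUCKETS = 4  # illustrative; both tables must agree on this

def bucket_of(key):
    """Assign a row to a bucket by hashing its join key (integer keys
    keep this deterministic across runs)."""
    return hash(key) % NUM_BUCKETS

def write_bucketed(rows, key):
    """Write time: route each row into its bucket, as a bucketed table
    writer routes rows into bucket files."""
    buckets = {b: [] for b in range(NUM_BUCKETS)}
    for row in rows:
        buckets[bucket_of(row[key])].append(row)
    return buckets

def bucketed_join(left, right, key):
    """Read time: join bucket-by-bucket. Equal keys are guaranteed to be
    co-located, so no shuffle (repartitioning) is needed."""
    out = []
    for b in range(NUM_BUCKETS):
        for l in left[b]:
            for r in right[b]:
                if l[key] == r[key]:
                    out.append({**l, **r})
    return out

orders = write_bucketed([{"uid": 1, "amt": 10}, {"uid": 2, "amt": 20}], "uid")
users = write_bucketed([{"uid": 1, "name": "a"}, {"uid": 2, "name": "b"}], "uid")
joined = bucketed_join(orders, users, "uid")
```

In Spark terms, this is why a join between two tables bucketed identically on the join key can use per-bucket sort-merge joins without an exchange step in the physical plan.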