Bucketing is commonly used in Hive and Spark SQL to improve performance by eliminating shuffle in join or group-by-aggregate scenarios. This makes it ideal for the many write-once, read-many datasets at ByteDance.
However, Spark SQL bucketing has various limitations. In this talk, we present how we designed and implemented a new bucketing mechanism that solves these limitations and improves join and group-by-aggregate performance significantly. As a direct consequence of these efforts, we have seen over 90% growth in queries that leverage bucketing across the entire data warehouse at ByteDance.
– Hello everyone, thanks for attending this session. The topic is “The Next Generation of Bucketing: Improving Spark SQL Performance by Removing Shuffle.” I’m Guo Jun, and my English name is Jason. I’m the lead of the Data Engine team at ByteDance. Let me introduce who we are and what we do. We are the Data Engine team at ByteDance. We build a one-stop platform for OLAP, on which users can build a PB-level data warehouse and analyze PB-level data by writing SQL, without caring about the underlying execution engines. We offer an open API and a self-serve platform. Besides, we optimize the Spark SQL, Presto and Hive engines.
At the same time, we design data architectures for many business lines at ByteDance. My presentation consists of four parts: the first is Spark SQL at ByteDance; next, what is bucketing; then I will introduce Spark bucketing limitations; and last, I will illustrate how we optimize Spark bucketing. Let’s go to the first part: how Spark SQL works at ByteDance. We introduced Spark into ByteDance in 2015.
Before 2016, we just ran Spark SQL for small-scale experiments. At the end of 2017, Spark SQL supported part of the ad-hoc query workload at ByteDance. Then at the end of 2018, Spark SQL supported most ad-hoc queries and a few ETL pipelines in production, and at the end of 2019 it supported the majority of ad-hoc queries and most ETL pipelines in production. Now Spark SQL is the main engine in the data warehouse area at ByteDance.
Let me briefly introduce what bucketing is. First, we can create a bucketed table in two ways in Spark SQL. On the left side we create a bucketed table in the Spark-native way: CREATE TABLE order USING parquet CLUSTERED BY (user_id) SORTED BY (user_id) INTO 1024 BUCKETS.
In this example, we specify the bucketing key set with the CLUSTERED BY clause, and we also specify the sort key set with the SORTED BY clause. What we need to know is that the SORTED BY key set can be different from the CLUSTERED BY key set.
And on the right side we can create a Hive-compatible bucketed table: CREATE TABLE order CLUSTERED BY (user_id) SORTED BY (user_id) INTO 1024 BUCKETS STORED AS parquet.
It is STORED AS parquet, not USING parquet; with this method we create a table which is compatible with the Hive engine. Once we have a bucketed table, we can insert a dataset into it like this: INSERT INTO order SELECT order_id, user_id, product, amount FROM order_staging. Note that we insert data into the bucketed table exactly as we would into a non-bucketed table. In other words, when users insert a dataset into a bucketed table, they do not need to know whether the table is bucketed or not. Once we have a bucketed table and have populated it, how can we benefit from the bucketing?
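As background, here is a minimal sketch of how a row is assigned to a bucket. `bucket_id` is a hypothetical helper, and Python's built-in `hash` stands in for the engine's actual hash function (Spark uses Murmur3Hash here, Hive uses HiveHash):

```python
# Hypothetical sketch: Python's hash() stands in for the engine's hash
# function (Murmur3Hash in Spark, HiveHash in Hive).
def bucket_id(user_id: int, num_buckets: int = 1024) -> int:
    return hash(user_id) % num_buckets

# All rows with the same user_id land in the same bucket, so joins and
# aggregations on user_id need no shuffle at query time.
assert bucket_id(42) == bucket_id(42)
assert 0 <= bucket_id(987654321) < 1024
```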
Let’s take ShuffledHashJoin as an example. ShuffledHashJoin is one of the commonly used join mechanisms in Spark SQL. When ShuffledHashJoin is selected, Spark SQL needs to ensure that both tables are co-partitioned. That is to say, if we want to join the order table and the user table on user_id, Spark SQL needs to ensure that both of them are co-partitioned on user_id. If they are not co-partitioned, Spark SQL will add a shuffle. In Spark SQL we use the Exchange operator for shuffle, so in this picture we can see two Exchange nodes. Both of them are on user_id, so that after the exchange both tables are co-partitioned on user_id and ShuffledHashJoin can be used. But if both of them are bucketed tables, meaning we made them bucketed before we populated them, then we do not need to add Exchange nodes before the join, because both of them are pre-shuffled on user_id. This is why we want to use a bucketed table. Let’s go to SortMergeJoin, another commonly used join mechanism.
SortMergeJoin requires that its children are co-partitioned and also sorted on the join key set. For example, if we want to join the order table and the user table on user_id, Spark SQL will ensure that both of them are co-partitioned on user_id and each partition is sorted on user_id. Otherwise, Spark SQL will add Exchange and Sort operators to ensure this. But if both of them are bucketed tables that are also sorted on user_id before we populate them, then we do not need…
We do not need to add Exchange and Sort nodes again, because both of them are pre-shuffled and pre-sorted. This is why we want to make tables bucketed. Now let me introduce the limitations of the current Spark SQL bucketing mechanism.
One of the most important limitations is small files. Take the SQL below as an example: we populate the table with INSERT INTO order SELECT order_id, user_id, product, amount FROM order_staging. While the SQL was running, we took a snapshot of how many files there were in a single task folder. We counted them with hdfs dfs -ls, and we found 988 files under a single task folder. Each task will generate up to 1024 small files; 1024 is the bucket number. Remember that we created the order table with 1024 buckets, so there will be up to 1024 × M small files in total, where M is the task number. When M is 1024 there will be up to 1 million small files, and in practice M may be much larger than 1024, for example 10,000, in which case there will be up to 10 million small files.
We know that for the Hadoop ecosystem, small files can be a disaster: when there are too many small files, Spark SQL needs to communicate with the HDFS NameNode frequently, and the HDFS NameNode, being a single point, becomes the bottleneck. When that happens, the SQL runs very slowly.
Even after we finish populating the data, the downstream SQLs will also run very slowly, because they need to open too many small files, which is very slow. So I think this is one of the biggest limitations. We can use other mechanisms to mitigate this problem. For example, we can add a DISTRIBUTE BY clause and also set the spark.sql.shuffle.partitions configuration to some value. There will be up to 1024 files when 1024 is a multiple of M, and up to M files when M is a multiple of 1024.
Otherwise, there will still be up to 1024 × M small files, where M equals the value of spark.sql.shuffle.partitions.
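The worst-case file-count rule above can be sketched as follows. This is an illustrative model, not Spark's actual planner logic; `max_output_files` is a hypothetical helper:

```python
# Worst-case output file count for a table with `num_buckets` buckets
# written by `num_tasks` reduce tasks after DISTRIBUTE BY on the bucketing
# keys (illustrative model, not Spark's actual planner logic).
def max_output_files(num_buckets: int, num_tasks: int) -> int:
    if num_tasks % num_buckets == 0:
        return num_tasks            # each task covers exactly one bucket
    if num_buckets % num_tasks == 0:
        return num_buckets          # each task covers a disjoint set of buckets
    return num_buckets * num_tasks  # worst case: every task may hit every bucket

assert max_output_files(1024, 512) == 1024       # 1024 is a multiple of M
assert max_output_files(1024, 2048) == 2048      # M is a multiple of 1024
assert max_output_files(1024, 1000) == 1024000   # neither: up to 1024 × M
```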
Yes, we can reduce the file count with this configuration and the DISTRIBUTE BY clause, but it is difficult for ordinary users to set the configuration, so we need a mechanism that reduces the files automatically. The next limitation is that Spark SQL bucketing is incompatible across SQL engines. Spark SQL bucketing is different from Hive bucketing, and it is also different from Presto bucketing. Presto and Hive bucketing are compatible with each other.
There are two reasons why they are incompatible. Firstly, when Hive writes data into a Hive bucketed table, an extra shuffle is introduced to ensure that each reduce task writes exactly one bucket file. But for Spark SQL there is no extra shuffle, so each task writes up to N bucket files, where N is the bucket number, and each of these files can be very small. Secondly, they use different hash mechanisms: Hive uses HiveHash, but Spark SQL uses Murmur3, so the data distributions are very different. That's why they are not compatible, and because they are not compatible, Exchange and Sort are required when joining tables with Hive bucketing in Spark SQL, or joining tables with Spark bucketing in Hive.
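To see why the two hash mechanisms diverge, here is a rough sketch. `hive_hash_int` reflects that Hive's hash of an int is the value itself, and `murmur3_hash_int` follows the standard Murmur3 x86 32-bit mixing steps for a single int with Spark's seed of 42; both are simplified stand-ins covering ints only, not the full production implementations:

```python
def hive_hash_int(v: int) -> int:
    # Hive's hash of an int value is the value itself (sketch, ints only).
    return v

def murmur3_hash_int(v: int, seed: int = 42) -> int:
    # Standard Murmur3 x86 32-bit mixing for a single int (sketch, ints
    # only; Spark's Murmur3Hash covers many types and uses seed 42).
    mask = 0xFFFFFFFF
    k1 = (v * 0xCC9E2D51) & mask
    k1 = ((k1 << 15) | (k1 >> 17)) & mask
    k1 = (k1 * 0x1B873593) & mask
    h1 = (seed ^ k1) & mask
    h1 = ((h1 << 13) | (h1 >> 19)) & mask
    h1 = (h1 * 5 + 0xE6546B64) & mask
    h1 ^= 4  # input length in bytes
    h1 ^= h1 >> 16
    h1 = (h1 * 0x85EBCA6B) & mask
    h1 ^= h1 >> 13
    h1 = (h1 * 0xC2B2AE35) & mask
    h1 ^= h1 >> 16
    return h1

# The two hashes route the same user_id to different buckets, which is why
# Hive-bucketed and Spark-bucketed tables are mutually incompatible.
diverging = [u for u in range(100)
             if hive_hash_int(u) % 1024 != murmur3_hash_int(u) % 1024]
assert len(diverging) > 0
```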
In 2019, we migrated tens of thousands of Hive SQLs to Spark SQL, and many of them used bucketed tables.
So we had to make the two bucketing mechanisms compatible so that we could migrate those SQLs automatically.
Besides, in Spark SQL an extra sort is needed, since a bucket in Spark SQL may consist of more than one file, so Spark has to sort the files within a single bucket to ensure that the whole bucket is sorted on the join keys, so that SortMergeJoin can be used.
Another limitation is that if users want to join two tables with a bucket join, Spark SQL requires that both of them are clustered on the same keys and that the bucket numbers are the same. For example, if the order table has 4096 buckets and the user table has 1024 buckets, then Spark will exchange the user table into 4096 buckets so that it can be joined with the order table. So in this picture we can see that even though both of them are bucketed tables, an Exchange is introduced because they have different bucket numbers. Another limitation is that Spark SQL requires both tables to be bucketed on the same key set as the join key set. For example, if both tables are bucketed on user_id but we want to join them on user_id and location_id, then an Exchange will be introduced for both of them. The next one is that when we use the UNION ALL clause, an Exchange is needed. For example, the order data may come from both web and mobile, and we may want to union order_web and order_mobile and then join the result with the user table on user_id.
In this example, an Exchange is introduced because after the Union, the outputPartitioning and the outputOrdering are set to unknown, so Spark SQL cannot know that the underlying tables are bucketed, and the Exchange is introduced. Now let me introduce how we optimize bucketing at ByteDance.
Firstly, we align Spark bucketing with Hive.
Previously I introduced the two differences. The first is the file number: for Hive there is exactly one file per bucket, but for Spark there may be more than one file per bucket, so we need to ensure that in Spark each bucket consists of exactly one file. The other is that Spark uses Murmur3Hash while Hive uses HiveHash, so we changed Spark SQL to use HiveHash when bucketing, both when we write to and when we read from the table.
We need to do the following things to ensure that Spark writes data to bucketed tables in the same way as Hive. Firstly, we change the required distribution of the InsertIntoHiveTable plan, setting it to HashClusteredDistribution with HiveHash on the bucketing keys, and we also override the required ordering to SortOrder on the bucketing keys, ascending. In this way, an additional shuffle is introduced to ensure that each task writes exactly one bucket file, and the task number is the same as the bucket number, so there are exactly N bucket files, each belonging to a single bucket.
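The effect of that extra shuffle can be modeled with a small sketch. `shuffled_write` is a hypothetical simulation, and the identity function stands in for HiveHash; the point is only that when tasks equal buckets and rows are routed by bucket id, each task writes exactly one sorted file:

```python
from collections import defaultdict

# Sketch of the write path after our change: an extra shuffle routes each
# row to the reduce task matching its bucket id, so task i writes exactly
# one file, for bucket i. `bucket_hash` is a stand-in for HiveHash.
def shuffled_write(rows, key_fn, num_buckets, bucket_hash=lambda k: k):
    tasks = defaultdict(list)
    for row in rows:
        tasks[bucket_hash(key_fn(row)) % num_buckets].append(row)
    # Each task sorts its rows on the bucketing key and writes one file.
    return {task_id: sorted(rs, key=key_fn) for task_id, rs in tasks.items()}

# Hypothetical (order_id, user_id) rows, bucketed on user_id into 8 buckets.
files = shuffled_write([(i, i * 7 % 50) for i in range(200)],
                       key_fn=lambda r: r[1], num_buckets=8)
assert set(files) == set(range(8))  # exactly one file per bucket
```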
So how can Spark read this data in the same way as Hive?
The next step is that Spark needs to recognize the Hive bucketed table, so we override the outputPartitioning of HiveTableScanExec to HashPartitioning with HiveHash on the bucketing keys, and we override the outputOrdering to SortOrder on the bucketing keys, ascending. Let me illustrate with this picture. On the left side we see that without our change, the outputPartitioning of HiveTableScan is UnknownPartitioning and the outputOrdering is Nil. This is why Spark still needs to add Exchange and Sort nodes for SortMergeJoin even though the tables are bucketed: the requiredChildDistribution of SortMergeJoin is HashClusteredDistribution and the requiredChildOrdering is SortOrder.
And the outputPartitioning of HiveTableScan is UnknownPartitioning, which does not satisfy the requiredChildDistribution of SortMergeJoin. After the change, shown on the right side, we set the outputPartitioning to HashPartitioning, which satisfies the requiredChildDistribution of SortMergeJoin (HashClusteredDistribution). Also, the outputOrdering of HiveTableScan is changed to SortOrder, which satisfies the requiredChildOrdering of SortMergeJoin. So Exchange and Sort are not needed anymore.
In this way Spark SQL can read data from a Hive bucketed table, and it can join two such tables without shuffle or sort.
Now Spark SQL and Hive bucketed tables are compatible.
The next thing we did is support one-to-many bucket joins. Take this as an example: Table A has three buckets and Table B has six buckets. If we join them in Spark, an extra Exchange is needed because they have different bucket numbers, but in our company we need to support this case without an Exchange.

The first method is to combine bucket 0 and bucket 3 of Table B, combine bucket 1 and bucket 4 of Table B, and combine bucket 2 and bucket 5 of Table B. After the combination, Table B has three buckets, so it can be joined with Table A without a shuffle. On the right side we can see that we combine them with a sort: the physical plan scans Table B and sorts it, and then it can be joined with Table A without a shuffle.

Besides, we provide another mechanism to support the one-to-many bucket join, because the previous method has a limitation: the parallelism will be three, which may be too small. With the other mechanism we can use six as the parallelism, so there will be six tasks performing the bucket join. How can we do that? We can clone Table A so that it has six buckets, and we clone it with a new mechanism named
Bucket Union. We know that Spark supports a mechanism named Union, but if we simply use the Union operator, the outputPartitioning will be unknown. So we provide a new operator named Bucket Union, in which the outputPartitioning and the outputOrdering are kept. After the Bucket Union, the result can be joined with Table B without a shuffle or a sort.
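The reason combining buckets works is a simple modular-arithmetic property: when one bucket number divides the other, bucket b of the 3-bucket layout holds exactly the keys of buckets b and b+3 of the 6-bucket layout. A minimal sketch (`combined_bucket` is a hypothetical helper; the property holds for any hash value h):

```python
# Bucket b of a 3-bucket table corresponds exactly to buckets b and b+3 of
# a 6-bucket table, for any hash value h (sketch; holds for any hash
# function, since h % 6 determines h % 3 when 3 divides 6).
def combined_bucket(h: int, small: int = 3, large: int = 6) -> int:
    return h % large % small

for h in range(1000):
    assert combined_bucket(h) == h % 3            # combining is lossless
    assert (h % 6) in {h % 3, h % 3 + 3}          # each small bucket = 2 large ones
```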
Even though we remove the shuffle and the sort, there are some other problems. This mechanism works well for B LEFT JOIN A, B LEFT SEMI JOIN A, B ANTI JOIN A, or B INNER JOIN A. But if we join B and A with B RIGHT JOIN A, B FULL OUTER JOIN A, or B CROSS JOIN A, there will be duplicated records, because we scanned Table A twice.
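To remove those duplicates, we add a dynamic filter: each cloned task keeps a row only when the hash of the join key, modulo the larger bucket number, equals the task's bucket id. A minimal sketch (`keep` is a hypothetical helper, and the hash of an id is assumed equal to the id itself, as in the walkthrough that follows):

```python
# Sketch of the dynamically added filter: when Table A (3 buckets) is
# cloned to align with Table B (6 buckets), the task for bucket id `bid`
# keeps only rows whose hash lands on `bid` under the larger bucket count.
def keep(row_hash: int, bid: int, num_buckets: int = 6) -> bool:
    return row_hash % num_buckets == bid

# Records of bucket 0 in the 3-bucket layout, assuming hash(id) == id:
bucket0 = [0, 3, 6, 9, 12, 15]
kept = [r for r in bucket0 if keep(r, bid=0)]         # [0, 6, 12]
dropped = [r for r in bucket0 if not keep(r, bid=0)]  # [3, 9, 15]
```

The kept and dropped sets are disjoint across the cloned tasks, so each original row of Table A survives in exactly one of them.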
We dynamically add a filter to resolve this problem. The filter hashes the join key and takes it modulo the number of buckets, and a row is kept only when the result equals the bucket id. Take bucket 0 as an example: there are six records with ids 0, 3, 6, 9, 12 and 15 in bucket 0, and what we need is 0, 6 and 12. We hash each id and take it modulo 6: for 3, 9 and 15 the result is 3, which differs from the bucket id 0, so 3, 9 and 15 are filtered out. With this mechanism there are no duplicated records anymore.

The next thing we support is joining on more than the bucketing keys. On the left side we can see that Table X is bucketed on column A, and on the right side Table Y is also bucketed on column A. But the SQL query requires joining Table X and Table Y on both A and B, so an Exchange and a Sort are required.
We exchange on A and B and sort on A and B.
This limitation requires users to design the table bucketing very carefully, and sometimes it is not easy to pick the best bucketing keys. For example, suppose our frequent queries are like the following: SELECT … FROM Table 1 GROUP BY A, B, C, D; SELECT … FROM Table 2 GROUP BY B, C, D; and SELECT … FROM Table 3 GROUP BY B, D, E. Users may also want to join Table 1 and Table 2 on A and B, join Table 2 and Table 3 on B and C, and join Table 1 with Table 3 on B and D. How should we set up the bucketing keys? If we bucket Table 1 on A, B, C and D, we cannot use a bucket join when we join Table 1 with Table 2 on A and B.

To resolve this problem, we support a mechanism that allows joining on more than the bucketing keys. Let's go back to the example: Table X is bucketed on A and Table Y is also bucketed on A. We do not need to introduce an Exchange; all we need to do is sort Table X and Table Y on A and B. Then we can join Table X and Table Y with SortMergeJoin, without Exchange nodes. This way, the bucketing key sets are much easier to design. Going back to the previous page: in that example we can just set column B as the bucketing key.
With B as the bucketing key set, these queries can all benefit from the bucketing mechanism: we can join Table 1 and Table 2 with the new bucket join, because B is the bucketing key set and the join key set is a superset of B, and we can also join Table 2 and Table 3 with a bucket join, because the join key set is B and C, which is a superset of the bucketing key set. The last thing we support is bucketing evolution. Take these two cases as an example. Case 1: a non-bucketed table is partitioned by date, and a user wants to convert it into a bucketed table without overhead.
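The reason joining on a superset of the bucketing keys needs no shuffle can be shown with a tiny sketch. `bucket_on_b` is a hypothetical helper using Python's `hash` as a stand-in; the point is that the bucket id depends only on B, so rows that agree on the full join key set (B, C) are already co-partitioned, and only an in-bucket sort on (B, C) is needed:

```python
# The bucket id depends only on the bucketing key b, so two rows that agree
# on the full join keys (b, c) necessarily share a bucket already (sketch
# with a hypothetical hash; Spark would use its own hash function here).
def bucket_on_b(row, num_buckets: int = 16) -> int:
    b, c = row
    return hash(b) % num_buckets

# Rows that differ on c but agree on b share a bucket:
assert bucket_on_b((7, "web")) == bucket_on_b((7, "mobile"))

# Within each bucket, an extra sort on (b, c) is all SortMergeJoin needs:
bucket_rows = [(7, "web"), (3, "app"), (7, "mobile")]
sorted_rows = sorted(bucket_rows)  # sort on (b, c)
```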
By default, if we want to change a non-bucketed table into a bucketed table, we either have to convert all of the existing data into the bucketed distribution, or queries over the existing data will fail, because the existing data is not bucketed yet. Case 2 is that the bucket number may be X, and the user needs to enlarge it to 2X because the data volume has increased. To resolve these problems, we provide a mechanism named bucketing evolution.
To achieve this, we put the bucketing information into the partition parameters. In Spark SQL the bucketing information normally lives in the table properties, not in the partition parameters. By putting the bucketing information into the partition parameters, we know the bucketing information for each partition, and if and only if all target partitions have the same bucketing information will the table be read as a bucketed table; otherwise, it is read as a non-bucketed table.

For example, suppose the table is non-bucketed and today we convert it into a bucketed table, so the partition for today's data is distributed by and sorted by the bucketing key set. Tomorrow, when we query this table and the only targeted partition is today's, Spark knows it is a bucketed table and a bucket join can be used. But if the query needs to read both today's and yesterday's data, Spark finds that the targeted partitions have different bucketing information, so Spark reads the table as a non-bucketed table. Reading a bucketed table as a non-bucketed table only impacts performance, not correctness, so this works well in our practice.
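The per-partition decision can be sketched as follows. `effective_bucket_spec` and the partition dictionaries are hypothetical stand-ins for the partition parameters described above:

```python
# Sketch of bucketing evolution: the table is read as bucketed only when
# every target partition carries the same bucketing information.
def effective_bucket_spec(target_partitions):
    """Return the common bucket spec, or None to read as non-bucketed."""
    specs = {p["bucket_spec"] for p in target_partitions}
    return specs.pop() if len(specs) == 1 else None

# Hypothetical partitions: yesterday unconverted, today bucketed.
parts = [{"date": "2020-06-01", "bucket_spec": None},
         {"date": "2020-06-02", "bucket_spec": ("user_id", 1024)}]

assert effective_bucket_spec(parts) is None                      # mixed: non-bucketed
assert effective_bucket_spec(parts[1:]) == ("user_id", 1024)     # bucket join possible
```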
Jun Guo is in charge of the data engine team at ByteDance. His team focuses on data warehouse architecture development and optimization for an EB-level data platform. Spark SQL is one of the most important engines for this team, and it processes hundreds of PB of data each day. Prior to ByteDance, he worked for Cisco and eBay, where he focused on data platform and data warehouse infrastructure optimization.