Join is one of the most important and critical SQL operations in most data warehouses. It is essential when we want to get insights from multiple input datasets. Over the last year, we’ve added a series of join optimizations internally at Facebook, and we recently started to contribute them back to upstream open source:
(1) Shuffled hash join improvement (SPARK-32461): add code generation to improve efficiency, add a sort-based fallback to improve reliability, add full outer join support, add a shortcut for an empty build side, etc.
(2) Join with bloom filter: for shuffled hash join and sort merge join, optionally add a bloom filter on the join keys to pre-filter rows on the large table side, saving shuffle and sort cost.
(3) Stream-stream join (SPARK-32862 and SPARK-32863): support left semi join and full outer join.
In this talk, we’ll take a deep dive into the internals of the above join optimizations, and summarize the lessons learned and the future work planned for further improvements.
Speaker: Cheng Su
– Hello everyone. My name is Cheng Su. Today, we would like to talk about Spark SQL join improvements at Facebook, more specifically, several join improvements we did internally on the Spark platform at Facebook. Before we start, a brief introduction about myself: I’m a software engineer working on the data platform team at Facebook. I’m a Spark contributor, mostly focusing on Spark SQL. Previously, I worked on the Hive and Hadoop team at Facebook. Here is the agenda for this talk. We will start with a brief overview of Spark SQL joins, and then talk about our improvements around shuffled hash join. After that, we will briefly discuss ongoing work to leverage bloom filters for joins. Then, we will discuss the improvements around stream-stream join, and finally conclude by showing some of the problems we are working on in this space.
Let’s take a brief overview of Spark SQL joins: what they are, how they work, and when to use them. Spark SQL has three different join operators: broadcast hash join, shuffled hash join, and sort merge join. Let’s go over them one by one. As an example, we want to join two data sources, A and B. Broadcast hash join is used to join a very small table with another table. Suppose data source A is pretty small: the Spark driver builds a hash relation on top of source A and broadcasts it to every task. Each task reads a portion of table B and does a hash table lookup to join it with table A. The requirement for broadcast hash join is that the data size of one table should be smaller than the config spark.sql.autoBroadcastJoinThreshold, which is 10 megabytes by default. The pros of broadcast hash join are that no shuffle and no sort are needed on either side, and it doesn’t have any skew issues. The con is that if source A is not small enough, it can cause out of memory (OOM) on the driver side when building the hash relation. Next is sort merge join.
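The broadcast hash join flow described above can be sketched in plain Python. This is only an illustration under simplifying assumptions, not Spark’s implementation: rows are dicts, "tasks" are just a loop over pre-split partitions of B, and the function names are hypothetical.

```python
from collections import defaultdict


def build_hash_relation(rows, key):
    """Driver side: build a hash relation (join key -> rows) from the small side A."""
    relation = defaultdict(list)
    for row in rows:
        if row[key] is not None:  # NULL keys never match in an equi-join
            relation[row[key]].append(row)
    return dict(relation)


def probe_partition(partition, relation, key):
    """Task side: stream one partition of B and look each row up in the relation."""
    joined = []
    for b_row in partition:
        for a_row in relation.get(b_row[key], []):
            joined.append((a_row, b_row))
    return joined


def broadcast_hash_join(table_a, partitions_b, key_a, key_b):
    # Built once and shared by every task; B is never shuffled or sorted.
    relation = build_hash_relation(table_a, key_a)
    result = []
    for partition in partitions_b:  # in Spark these would run as parallel tasks
        result.extend(probe_partition(partition, relation, key_b))
    return result
```

The whole relation for A has to fit in memory, first on the driver and then in every task, which is why the size threshold matters.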
It first shuffles both sides, A and B, on the join key. After the shuffle, each task does a local sort on both sides, and then the join is performed in a sort-merge way. The pros of sort merge join are that it doesn’t have any OOM issues and it handles large data sizes well on both sides. So if you have two large tables to join, sort merge join is a better choice. The con is that it needs to shuffle and sort before the join, which can be inefficient and costly for large tables. For example, the local sort can be expensive if the table is big and needs to spill to disk when it runs out of memory. In addition, if the join keys are skewed, for example when there are a lot of nulls as the join key, sort merge join can have data skew problems.
The third one is shuffled hash join. It first shuffles both sides, A and B, on the join key. After the shuffle, each task builds a hash table on the smaller side and streams the other side to do a hash table lookup join, similar to broadcast hash join. One thing to note here is that shuffled hash join is not used by default, because the config spark.sql.join.preferSortMergeJoin is true by default. In addition, the smaller side should be smaller than spark.sql.autoBroadcastJoinThreshold times spark.sql.shuffle.partitions, which is around 2 gigabytes by default, and the smaller side should be 3x smaller than the larger side. The pros of shuffled hash join are that it can handle large data on one side as well, and, compared to sort merge join, it does not need a sort, so in some cases shuffled hash join is more efficient than sort merge join. The cons are that it needs to shuffle before the join, and it has similar skew problems as sort merge join. In addition, shuffled hash join has a unique challenge: it can OOM on the task side when building the hash table. We will talk more about how we tackle this problem shortly. By now, I hope everyone has an idea of how Spark SQL joins work. Next, we talk about the improvements we did around shuffled hash join. The first one is to add code-gen support for shuffled hash join.
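To make the shuffled hash join size rules above concrete, here is a simplified Python model of the eligibility check. It is a sketch, not Spark’s planner code: it ignores join hints, broadcast eligibility and adaptive execution, and the helper name `can_use_shuffled_hash_join` is hypothetical. The default arguments mirror spark.sql.join.preferSortMergeJoin (true), spark.sql.autoBroadcastJoinThreshold (10 MB) and spark.sql.shuffle.partitions (200), so the limit on the smaller side works out to 10 MB × 200 = 2000 MB, roughly 2 GB.

```python
MB = 1024 * 1024


def can_use_shuffled_hash_join(build_side_bytes, other_side_bytes,
                               prefer_sort_merge_join=True,
                               auto_broadcast_join_threshold=10 * MB,
                               shuffle_partitions=200):
    """Simplified model of the size rules for picking shuffled hash join (hypothetical helper)."""
    if prefer_sort_merge_join:
        # Default behavior: the planner prefers sort merge join.
        return False
    # Each task should receive a build-side slice of roughly threshold size or less.
    fits_in_tasks = build_side_bytes < auto_broadcast_join_threshold * shuffle_partitions
    # The build side should be at least 3x smaller than the other side.
    much_smaller = build_side_bytes * 3 <= other_side_bytes
    return fits_in_tasks and much_smaller
```

For example, a 1 GB build side against a 10 GB stream side qualifies only once `prefer_sort_merge_join` is turned off.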
The motivation is to improve CPU efficiency by leveraging Spark SQL whole-stage code-gen. Previously, only broadcast hash join and sort merge join had code-gen support; here, we add code-gen support for shuffled hash join as well. How do we do that? For the implementation, we refactored the broadcast hash join code-gen logic into the common parent class HashJoin (HashJoin.scala), as broadcast hash join and shuffled hash join share similar logic for hash table lookup join. For the benchmark query, we are seeing a 30% run-time improvement compared to the non-code-gen code path. The PR is merged upstream, and it will be available in Spark 3.1.
The second one is to add full outer join support. The motivation is to improve CPU and IO. Previously, only sort merge join supported full outer join, and the sort can be very expensive when the table is large and needs to spill to disk. Shuffled hash join, on the other hand, does not need a sort. Here is how we do that. Full outer join not only needs to output all the matched rows from both sides, but also all the non-matched rows. For the stream side, this is a trivial problem: for each row, we do a hash table lookup, so we know immediately whether the row is matched. But it’s non-trivial for the build side. For each row in the build side hash table, we only know whether it has a match after exhausting every row from the stream side. So we need an extra data structure to record this information for the build side, and here we use a hash set to record whether rows have been matched or not. Here is a figure of how full outer shuffled hash join works. First, we shuffle both sides on the join keys. Then each task builds a hash table on the smaller side. When joining each row from the stream side, we maintain a separate hash set data structure to record the matched rows from the build side.
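The full outer shuffled hash join procedure for a single task, including the final pass that emits build-side rows that never matched, can be sketched as follows. This is a minimal illustration assuming both inputs are already shuffled on the key and rows are dicts; it uses a Python set of build-row positions as the "hash set" of matched rows, and the function name is hypothetical. Spark’s actual bookkeeping may differ.

```python
def full_outer_shuffled_hash_join(build_rows, stream_rows, key):
    """One task of a full outer shuffled hash join (both inputs already shuffled on key)."""
    # Build phase: hash table on the smaller side, storing row positions.
    table = {}
    for pos, row in enumerate(build_rows):
        if row[key] is not None:  # NULL keys never match
            table.setdefault(row[key], []).append(pos)

    matched = set()  # positions of build rows that found at least one match
    output = []
    for s_row in stream_rows:
        positions = table.get(s_row[key], [])
        if not positions:
            # Stream side: a non-match is known immediately.
            output.append((None, s_row))
        for pos in positions:
            matched.add(pos)
            output.append((build_rows[pos], s_row))

    # Final pass: build rows are only known to be unmatched after the stream is exhausted.
    for pos, b_row in enumerate(build_rows):
        if pos not in matched:
            output.append((b_row, None))
    return output
```

No sort is involved: the only extra cost over an inner hash join is the matched set and one scan of the build side at the end.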
After reading all rows from the stream side, we go over all rows in the hash table again and output all rows that were not matched. For the benchmark query, we are seeing a 30% run-time improvement compared to full outer sort merge join. The PR is merged upstream and will be available in Spark 3.1.
The third one is to add a sort-based fallback mechanism for shuffled hash join. The major failure point for shuffled hash join is OOM on the task side when building the hash table. Right now there is no fallback and no spill logic for the hash table in shuffled hash join. Once there is not enough memory for building the hash table, the task fails immediately, and the query fails as well. With this OOM limitation, shuffled hash join is hard to enable by default. How can we tackle this problem? Here we introduce fallback logic when building the hash table: whenever we fail to get memory to insert the current row into the hash table, we stop there, sort both the stream side and the build side, and do a sort merge join instead of a shuffled hash join. The fallback is dynamic at run time, with no config tuning needed. We have enabled this feature by default at Facebook for several years, and we haven’t seen OOM problems for shuffled hash join since adding this fallback. The PR is a work in progress, and hopefully it can be available in Spark 3.1.
Next topic: leveraging bloom filters for join operators. A bloom filter is a data structure to test membership: for a given element, it answers whether the element is in a set of elements or not, with possible false positives but no false negatives. The motivation is to use the bloom filter to filter out rows as early as possible to save CPU. It applies to both shuffled hash join and sort merge join. First, a bloom filter is built on the join keys of the smaller side. Then the bloom filter is broadcast to every task when scanning the larger side.
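The bloom filter pre-filtering idea above can be illustrated with a toy filter. This is not the design under discussion: the bit-array size, the number of hash functions and the salted SHA-256 hashing are arbitrary assumptions, and `prefilter_larger_side` is a hypothetical helper. Dropping larger-side rows this way is only safe when those rows do not have to survive unmatched, as in an inner join.

```python
import hashlib


class BloomFilter:
    """Minimal bloom filter: false positives are possible, false negatives are not."""

    def __init__(self, num_bits=1024, num_hashes=3):
        self.num_bits = num_bits
        self.num_hashes = num_hashes
        self.bits = bytearray((num_bits + 7) // 8)

    def _positions(self, item):
        data = repr(item).encode("utf-8")
        for seed in range(self.num_hashes):
            # Derive several hash functions by salting SHA-256 with a seed.
            digest = hashlib.sha256(seed.to_bytes(4, "little") + data).digest()
            yield int.from_bytes(digest[:8], "little") % self.num_bits

    def add(self, item):
        for pos in self._positions(item):
            self.bits[pos // 8] |= 1 << (pos % 8)

    def might_contain(self, item):
        return all(self.bits[pos // 8] & (1 << (pos % 8)) for pos in self._positions(item))


def prefilter_larger_side(smaller_side_keys, larger_side_rows, key):
    """Build the filter on the smaller side's join keys, then drop larger-side rows early."""
    bloom = BloomFilter()
    for k in smaller_side_keys:
        if k is not None:
            bloom.add(k)
    # Surviving rows still go through the normal shuffle and join;
    # dropped rows could never have matched.
    return [row for row in larger_side_rows
            if row[key] is not None and bloom.might_contain(row[key])]
```

Because the filter has no false negatives, every row that would have joined is kept; the occasional false positive only costs some extra shuffle and sort work later.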
By pre-filtering the larger side this way, it reduces the amount of data to process in the following shuffle and sort stages. The design is under discussion with several community members, and we will submit the JIRA with the design later.
The last topic we want to discuss is the stream-stream join improvements we worked on in Spark Structured Streaming. Here is a quick refresher on stream-stream join. There is one join operator for stream-stream join, called StreamingSymmetricHashJoinExec, and here is how it works, as an illustration. In Structured Streaming micro-batch execution mode, in each micro-batch, sources A and B are shuffled on the join keys, and each micro-batch joins with the state store of the other side, using hash table lookup as the join mechanism. The state store basically contains rows from previous batches of the other side. After the join is done, the current micro-batch of data is stored into its own side’s state store, so that future micro-batches can join with it. Normally, a watermark is used in stream-stream join queries to remove stale rows from the state store, to prevent the state store from growing indefinitely.
Our first work is to add left semi join support to stream-stream join. The motivation is that for some Facebook streaming workloads, we found left semi join is even more popular than left outer join. Here’s an example. Suppose we have a stream of ad impressions, where an impression means an ad was shown to a user, and another stream of ad clicks, where a click means the ad was clicked by the user. We want to get all the ad impressions that were clicked by the user, and we don’t care about the details of the clicks. So we only need to output the ad impressions that have matching clicks. This is exactly left semi join semantics; if we did it with left outer join, it would be very verbose, as we would output all the ad impression rows. So how do we do the left semi join?
Let’s dive into the logic. For each left side input row, we check if there’s a match in the right side state store. If there is a match, we output the left side row immediately, because that satisfies left semi join semantics, and we do not need to put the matched left side row into the left side state store, because it has already been output and must not be output again. If there is no match, we do not output anything and put the row into the left side state store, with its matched field set to false. For each right side input row, we check if there’s a match in the left side state store. For all matched left rows in the state store, we output only the rows whose matched field is false. This means we only output left side rows that are matched for the first time, to guarantee left semi join semantics. After that, we set the matched field of those left rows to true, so they will not be joined and output again, and the right side row is put into the right side state store for future left rows. The state store eviction logic is to evict rows below the watermark from both the left and right side state stores. There is no custom logic around that; in this way, it’s the same as the inner join watermark logic. The PR is merged upstream already, and it will be available in Spark 3.1.
The second one is to add full outer join support. Stream-stream join already supports left outer and right outer join, and full outer join is relatively straightforward in this case, because it is basically a combination of left outer and right outer join. How do we achieve that? For each left side input row, we check if there’s a match in the right side state store. If there’s a match, we output all the matched rows immediately, and we put the row into the left side state store. For each right side input row, we check if there’s a match in the left side state store. If there’s a match, we output all matched rows and update the state of the matched left side rows, setting their matched field to true.
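The left semi stream-stream join logic described above can be modeled for a single task as follows. This is a toy sketch, not StreamingSymmetricHashJoinExec: plain dicts stand in for the state stores, join keys are assumed non-null, the caller supplies the watermark, and the class and method names are hypothetical. Within a micro-batch it processes left input first (against right state from earlier batches) and then right input (against left state, which now includes this batch’s unmatched left rows), so each left row is emitted at most once.

```python
from collections import defaultdict


class LeftSemiStreamStreamJoin:
    """Toy, single-task model of left semi stream-stream join (hypothetical class)."""

    def __init__(self, key, event_time):
        self.key = key
        self.event_time = event_time
        self.left_state = defaultdict(list)   # join key -> [[left_row, matched], ...]
        self.right_state = defaultdict(list)  # join key -> [right_row, ...]

    def process_batch(self, left_rows, right_rows):
        output = []
        for row in left_rows:
            k = row[self.key]
            if self.right_state.get(k):
                output.append(row)  # matched: emit now, no need to store the row
            else:
                self.left_state[k].append([row, False])  # wait for a future right row
        for row in right_rows:
            k = row[self.key]
            for entry in self.left_state.get(k, []):
                if not entry[1]:  # emit a left row only on its first match
                    output.append(entry[0])
                    entry[1] = True
            self.right_state[k].append(row)  # future left rows may match it
        return output

    def evict_below(self, watermark):
        """Drop rows older than the watermark from both sides; semi join emits nothing here."""
        for state, row_of in ((self.left_state, lambda e: e[0]),
                              (self.right_state, lambda r: r)):
            for k in list(state):
                kept = [e for e in state[k] if row_of(e)[self.event_time] >= watermark]
                if kept:
                    state[k] = kept
                else:
                    del state[k]
```

With ad impressions as the left stream and clicks as the right stream, an impression is emitted once, on its first click, no matter how many clicks arrive later.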
For the full outer join, each right side row is then also put into the right side state store. When left side rows are evicted from the state store, we only output those that are non-matched, because all the matched rows have been output already. The same eviction logic applies to the right side. The PR is a work in progress, and hopefully it can be available in Spark 3.1.
To recap, in this session we covered three major points. First, we went over Spark SQL join implementations. Then, we talked about the shuffled hash join improvements we did internally. Next, we briefly discussed leveraging bloom filters for shuffled hash join and sort merge join. In the end, we saw some of the stream-stream join improvements we are working on in Structured Streaming. For future work in this area, we are working on history-based optimization to select the best join strategy at run time. Our observation is that in today’s data warehouse, data engineers and software engineers normally run their data pipelines daily, so most SQL queries are repeated every day, just with different input partitions. So with this optimization, we can predict today’s query runtime metrics, such as join data size, from historical query runtime metrics, and based on the join data size, we can decide which join to use. And that’s all; this concludes my talk. Feel free to ask any questions; your feedback is very important to us. Thank you.
Cheng Su is a software engineer at Facebook. He works on the Spark team in the Data Infrastructure organization at Facebook. Before Spark, he worked on the Hive/Corona (Facebook's in-house Hadoop MapReduce) team. Cheng received a Master's degree specializing in computer systems from the University of Wisconsin-Madison, USA, and a Bachelor's degree from Nanjing University, China.