Cheng Su is a software engineer at Facebook. He works on the Spark team in the Data Infrastructure organization at Facebook. Before Spark, he worked on the Hive/Corona (Facebook's in-house Hadoop MapReduce) team. Cheng received a Master's degree specializing in computer systems from the University of Wisconsin-Madison, USA, and a Bachelor's degree from Nanjing University, China.
November 18, 2020 04:00 PM PT
Join is one of the most important and performance-critical SQL operations in most data warehouses, and is essential whenever we want to derive insights from multiple input datasets. Over the last year, we've added a series of join optimizations internally at Facebook, and we recently started contributing them back to upstream open source:
(1) Shuffled hash join improvements (SPARK-32461): added code generation to improve efficiency, a sort-based fallback to improve reliability, full outer join support, a shortcut for the empty build side, etc.
(2) Join with a Bloom filter: for shuffled hash join and sort merge join, optionally add a Bloom filter on the join keys of the large table side to pre-filter rows, saving shuffle and sort cost.
(3) Stream-stream join (SPARK-32862 and SPARK-32863): support for left semi join and full outer join.

In this talk, we'll take a deep dive into the internals of the above join optimizations, and summarize the lessons learned and the future work planned for further improvements.
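To make the Bloom filter optimization concrete, here is a minimal Python sketch of the idea (an illustration only, not Spark's actual implementation; the `BloomFilter` class and the table variables are hypothetical): join keys from the small build side populate the filter, and rows on the large probe side are pre-filtered before any shuffle or sort happens.

```python
import hashlib

class BloomFilter:
    """A tiny Bloom filter: may report false positives, never false negatives."""
    def __init__(self, num_bits=1024, num_hashes=3):
        self.num_bits = num_bits
        self.num_hashes = num_hashes
        self.bits = [False] * num_bits

    def _positions(self, key):
        # Derive several deterministic bit positions per key.
        for seed in range(self.num_hashes):
            digest = hashlib.md5(f"{seed}:{key}".encode()).hexdigest()
            yield int(digest, 16) % self.num_bits

    def add(self, key):
        for pos in self._positions(key):
            self.bits[pos] = True

    def might_contain(self, key):
        return all(self.bits[pos] for pos in self._positions(key))

# Build side: the small table's join keys populate the filter.
small_table_keys = [1, 5, 9]
bloom = BloomFilter()
for k in small_table_keys:
    bloom.add(k)

# Probe side: large table rows are pre-filtered before any shuffle/sort,
# so most non-matching rows never pay the shuffle and sort cost.
large_table = [(k, f"row{k}") for k in range(100)]
surviving = [row for row in large_table if bloom.might_contain(row[0])]
```

Because a Bloom filter never produces false negatives, every genuinely matching row survives the pre-filter; the occasional false positive is simply dropped later by the join itself.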
Speaker: Cheng Su
November 17, 2020 04:00 PM PT
Machine learning feature engineering is one of the most critical workloads on Spark at Facebook and serves as a means of improving the quality of each of the prediction models we have in production. Over the last year, we've added several features in Spark core/SQL to provide first-class support for Feature Injection and Feature Reaping in Spark. Feature Injection is an important prerequisite to (offline) ML training, where the base features are injected/aligned with new/experimental features, with the goal of improving model performance over time. From a query engine's perspective, this can be thought of as a LEFT OUTER join between the base training table and the feature table which, if implemented naively, could become extremely expensive. As part of this work, we added native support for writing indexed/aligned tables in Spark, wherein, if the data in the base table and the injected features can be aligned during writes, the join itself can be performed inexpensively.
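A minimal Python sketch of why alignment makes the injection join cheap (an illustration under the assumption that both tables are guaranteed to be written in the same row order; `inject_aligned` and the row variables are hypothetical names, not Spark APIs): the LEFT OUTER join degenerates into a positional merge.

```python
# Base training table and feature table, written pre-aligned on the same
# row order (e.g., both sorted by a shared example id at write time).
base_rows = [
    {"id": 1, "label": 0, "base_features": [0.1, 0.2]},
    {"id": 2, "label": 1, "base_features": [0.3, 0.4]},
    {"id": 3, "label": 0, "base_features": [0.5, 0.6]},
]
new_feature_rows = [
    {"id": 1, "exp_feature": 7.0},
    {"id": 2, "exp_feature": 8.0},
    {"id": 3, "exp_feature": 9.0},
]

def inject_aligned(base, features):
    """When both tables are written in the same row order, the join needs
    no shuffle or hash table: it is a straight positional merge."""
    out = []
    for b, f in zip(base, features):
        assert b["id"] == f["id"], "alignment invariant violated at write time"
        merged = dict(b)
        merged["exp_feature"] = f["exp_feature"]
        out.append(merged)
    return out

training_rows = inject_aligned(base_rows, new_feature_rows)
```

A naive implementation would instead shuffle both tables on `id` and build hash tables; the alignment invariant, enforced once at write time, removes that cost from every subsequent injection.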
Feature Reaping is a compute-efficient and low-latency solution for deleting historical data at sub-partition granularity (i.e., columns or selected map keys). In order to do this efficiently at our scale, we added a new physical encoding in ORC (called FlatMap) that allows us to selectively reap/delete specific map keys (features) without performing expensive decoding/encoding and decompression/compression. In this talk, we'll take a deep dive into Spark's optimizer, evaluation engine, data layouts and commit protocols, and share how we've implemented these complementary techniques. To this end, we'll discuss several Catalyst optimizations to automatically rewrite feature injection/reaping queries as SQL joins/transforms, describe the new ORC physical encoding for storing feature maps, and discuss the details of how Spark writes/commits indexed feature tables.
Speakers: Cheng Su and Sameer Agarwal
October 15, 2019 05:00 PM PT
Bucketing is a popular data-partitioning technique that pre-shuffles and (optionally) pre-sorts data during writes. It is ideal for the many write-once, read-many datasets at Facebook, where Spark can automatically avoid expensive shuffles/sorts (when the underlying data is joined/aggregated on its bucketed keys), resulting in substantial savings in both CPU and IO.
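A minimal Python sketch of the bucketing idea (hypothetical helper names, not Spark's implementation): because both tables route rows to buckets with the same hash function and bucket count at write time, bucket i of one table only ever needs to be joined with bucket i of the other, and no shuffle is needed at read time.

```python
# Hash-bucketed write: rows are routed to a fixed number of bucket files
# based on the join key.
NUM_BUCKETS = 4

def write_bucketed(rows, key, num_buckets=NUM_BUCKETS):
    buckets = [[] for _ in range(num_buckets)]
    for row in rows:
        buckets[hash(row[key]) % num_buckets].append(row)
    return buckets

orders = [{"user": u, "amount": u * 10} for u in range(8)]
users = [{"user": u, "name": f"u{u}"} for u in range(8)]

orders_buckets = write_bucketed(orders, "user")
users_buckets = write_bucketed(users, "user")

# Shuffle-free join at read time: only matching bucket pairs meet, because
# equal keys always hash to the same bucket id in both tables.
joined = []
for ob, ub in zip(orders_buckets, users_buckets):
    names = {r["user"]: r["name"] for r in ub}
    for o in ob:
        if o["user"] in names:
            joined.append((o["user"], o["amount"], names[o["user"]]))
```

The one-time cost of the pre-shuffle at write time is amortized across every subsequent join or aggregation on the bucketed key, which is exactly why the technique pays off for write-once, read-many datasets.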
Over the last year, we've added a series of optimizations in Apache Spark as a means towards achieving bucketing feature parity between Hive and Spark. These include avoiding shuffles/sorts when joining/aggregating/inserting on tables with mismatched bucket counts, allowing users to skip the shuffle/sort when writing to bucketed tables, and adding data validators before writing bucketed data, among many others. As a direct consequence of these efforts, we've witnessed over 10x growth (spanning 40% of total compute) in queries that read one or more bucketed tables across the entire data warehouse at Facebook.
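One of the mismatched-bucket cases can be illustrated in Python (a sketch under the assumption that one bucket count divides the other; `bucket_id` is a hypothetical helper): each bucket of the finer-grained table maps deterministically onto exactly one bucket of the coarser table, so buckets can be coalesced instead of reshuffled.

```python
def bucket_id(key, num_buckets):
    # Same routing rule used at write time for both tables.
    return hash(key) % num_buckets

# Table A was written with 8 buckets, table B with 4. Since 4 divides 8,
# x % 8 % 4 == x % 4 for any x: fine buckets i and i + 4 of table A both
# line up with coarse bucket i % 4 of table B, so merging those two bucket
# files reads as if table A had been written with 4 buckets -- no shuffle.
for k in range(100):
    coarse = bucket_id(k, 4)
    fine = bucket_id(k, 8)
    assert fine % 4 == coarse
```

When neither bucket count divides the other, this trick does not apply and a shuffle on at least one side is unavoidable.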
In this talk, we'll take a deep dive into the internals of bucketing support in Spark SQL, describe use cases where bucketing is useful, touch upon some of the ongoing work to automatically suggest bucketing for tables based on query column lineage, and summarize the lessons learned from developing bucketing support in Spark at Facebook over the last 2 years.