Note: This is a replay of a highly rated session from the June Spark + AI Summit. Enjoy!
At Microsoft, we store datasets (both from internal teams and external customers) ranging from a few GBs to 100s of PBs in our data lake. The scope of analytics on these datasets ranges from traditional batch-style queries (e.g., OLAP) to explorative, ‘finding needle in a haystack’ type of queries (e.g., point-lookups, summarization etc.). Resorting to linear scans of these large datasets with huge clusters for every simple query is prohibitively expensive and not the top choice for many of our customers, who are constantly exploring (and demanding!) ways to reducing their operational costs – incurring unchecked expenses are their worst nightmare. Over the years, we have seen a huge demand for bringing ‘indexing’ capabilities that come de facto in the traditional database systems world into Apache Spark.
Among many ways to improve query performance and lowering resource consumption in database systems, indexes are particularly efficient in providing tremendous acceleration for certain workloads since they could reduce the amount of data scanned for a given query and thus also result in lowering resource costs. In this talk, we present our experiences in designing, implementing and operationalizing Hyperspace, an indexing subsystem for Apache Spark that introduces the ability for users to build, maintain (through a multi-user concurrency model) and leverage indexes (automatically, without any changes to their existing code) on their data (e.g., CSV, JSON, Parquet etc.) for query/workload acceleration. We will cover the necessary foundations behind our indexing infrastructure including the API design, how we leveraged Spark’s Catalyst optimizer to provide a transparent user experience and also discuss our development roadmap. Through presentation, benchmarks, code examples and notebooks, this will be one fun session, so come join us as we get started on this journey. Hyperspace has been recently open-sourced at https://github.com/microsoft/hyperspace
– Hello everyone. I hope you’re having a wonderful day so far. Today we’re gonna be talking about Hyperspace, an Indexing Subsystem for Apache Spark. Before I begin talking about the details about the project, let us introduce ourselves. My name is Rahul Potharaju and I am joined here by my awesome colleague Terry Kim. Both of us are from Microsoft. We are part of the product that Microsoft has launched recently called Azure Synapse Analytics. We are part of the Spark team at Microsoft, and it’s probably obvious by now, but I just say it, we work on everything Spark. We offer Spark-as-a-Service to Microsoft customers. Which includes both internal customers like Office and Bing, and also external customers. Where possible we contribute back to Apache Spark and the open source majority of our work. Today I will be covering the background of the pilot project, the vision, some of the concepts that help you understand what’s happening inside this project. Why is it something that you have to care about. And my colleague here will showcase some real world examples to hopefully convince you by the end of this talk, that Hyperspace is an awesome Spark. So before we dive into anything technical, let’s just start with the most foundational question, which is what is an index? I can give you the most obvious answer from the textbook. In fact, I just lifted this out of a textbook. In databases, an index is a data structure that improves the speed of data retrieval, which in other words, it’s Grady X completion at the cost of additional rights and storage space. But if you’re anything like me, then a real world analogy would be tremendously useful in just understanding what really is an index. So let’s take another crack at it. Right from, since we were kids you might remember pretty much going to the back of your textbook, trying to figure out like where exactly is a particular key phrase appearing in the textbook. And at that point in time you might have come across an index from the back of your textbook. For instance, if I wanted to find where in the textbook, the phrase nested loop join up, I would quickly go to the section, which has every word starting with the word, the letter N and I would look up nested loop and I will immediately see that it’s appearing between 718 and 722 page in the textbook. And that’s an index for you. In short one can imagine an index is a shortcut to accelerating some of the quiz. So let’s begin with the overview of Hyperspace. We have some pretty broad goals in Hyperspace to begin with our first and primary goal here is to be agnostic to data format. Our second goal is to offer a path towards low cost index meta data management. What do we mean by this? Well, we wanna make sure that all of the index contents, as well as the associated meta data is stored on the link, and this does not assume any other service to operate correctly. Our third goal is to enable multi-engine drop. Four, we wanna enable extensible indexing infrastructure. In other words, what you see here today is more of a beginning than an end. And finally, we wanna make sure that we satisfy all of security, privacy, and compliance requirements. At the bottom most, as you can see here, there is a data Lake and on the data Lake, there are datasets that you already have, or potentially structured like parquet or even unstructured CSV TSV. The biggest assumption as I’ve laid out in my earlier slide is that we assume that the index is also living on the data Lake, both the contents and the meta data. Our first area of investment is the indexing infrastructure. This is pretty important. And you can think of this as the layer that has no intelligence, but it is providing you with raw APIs in order to be able to do everything that you can with Hyperspace. To begin with one of the first pieces of work that we’re investigating is index creation and maintenance EPS to name a few things like log management API. So how do you ensure that you’re able to provide concurrency across multiple engines on the same index? Once we build all of the index creation and maintenance APIs, next comes the complimentary deal of the indexing infrastructure, which is the query infrastructure, this whole work deals with, how do you ensure that an optimizer is index aware? Today’s optimizer in Spark is not really aware of what an index is for instance, right? So as part of this work, what we’re trying to enable is a transparent query rewrite in the background, which is to say if there is an index and we figured out that the index will potentially provide any kind of acceleration, then be transferred into the rewrite this query one of the most elegant benefits that you end up getting with this is users do not have to change a single line of code other than setting a configuration through Spark. And finally, do we want all of this in the manual way? Obviously not. Like I think the holy grail of all of the world of indexing is index recommendation, which takes a user squarely workload and provides them with the recommendation of top gain this is that they can build in order to expect some kind of acceleration in their workload. Let me take a moment to just show you how simple using Hyperspace is going to be. Today, we offer Hyperspace in three languages, one the Scala, second Python and third is .NET. Although this slide shows you the APIs in Scala, the first set of APIs we provide is the usage APIs and as expected Usage APIs you’ll be able to create, delete, restore, vacuum, and rebuilding. The second set of APIs that we offer is the Smart. If you’ve used Spark, then you’ll probably already know about df.explain or dataframe.explain what it does is it takes your query it produces the logical plan, the optimized plan, the analyze plan, and the physical plan. This Hyperspace.explain is built with a similar philosophy. What do I mean by that? It can take your query and it’ll give you a very cool diff between how it has figured out what indexes it has used, and it’ll show you what exactly are the index it has used. How did the plan end up changing? The two other APIs that we plan on introducing in a couple of weeks is the Water API, which allows you to hypothetically test whether an index will benefit your query or not. And the final holy grail of the Smart, which is the recommended API, which will take a workload from you and provide you with a list of top gain with recommendations that you can then go ahead and build in the background. One question that might be lingering in your mind at this stage is as I’m using all of these indexes, where exactly are the indexes getting in creative, right? The cool thing about what I mentioned and laid out in the goals and vision of the project is the index live on the Lake. Why is this important? It’s very interesting because now if you start looking at your file system like HDFS, or maybe ADFS, or even AWSS3, for instance, right? Imagine you have your data laid out as I’ve shown in the bottom. It’s basically just some path to data files. Now, the moment you start creating an index, what you’ll observe is the indexing API create index takes a name for the index and once you provide the name of the index, it creates a folder and the folder contains all of the information in a self-describing way to point back into the original dataset that you have created. Why is this important? You need to capture pretty much every detail of what exactly you have considered on the original dataset at the time of indexing, because you wanna be able to detect if the dataset changed from the time you created the index to ensure that you’re not providing any incorrect results. And I’m sure you must have noticed at this stage, the presence of something called a Hyperspace log, obviously, right? It’s in a different color. So it’s probably something of interest. Hyperspace log is a progress log or an operational log, which is the heart of our entire design. When you initially create an index, it captures the operation create if let’s say the underlying data ended up changing, then perhaps you might want to run a refresh. Once you do a refresh. And the refresh completes what ends up happening is the index is moved back into an active state and there is a pointer that gets laid out in this case, index directory two and index directory three. What you’ll notice is… Hey, wait a second. Why are we seeing index directory one, two and three that’s because we maintain different snapshots and different versions of the index to enable multi reader and multi writer concurrency. And one of the very interesting aspects of having the index on the Lake is it provides several benefits just to name a few, one now index scan scales. The second biggest benefit is our index today is laid out in an open format and that open format as parquet. And one of the major advantages you get, if you’ve been lingering around in the Spark community, then you already probably know that there’s probably close to a decade worth of investment in making sure arcade as a format is highly optimized and you don’t need to worry about it anymore. Whatever optimizations are present for parking, you automatically end up kind of leveraging all of those optimizations as part of the index themselves. And finally, as you can tell, we have enabled a server less access protocol. What do I mean by this? It’s simple. Like Hyperspace does not really depend on any external service. So before we move on to my colleague doing a demo, I’d like to introduce Azure Synapse Analytics, which is a product that Microsoft has released recently. And Hyperspace has built out of box inside Azure Synapse Analytics. So now I switch over to my colleague, Terry Kim, who’s going to show you the end-to-end experience of how Hyperspace looks like and how you can use it in production.
– [Terry] Okay. In this demo, I go over Hyperspace APIs for creating and using indexes. I’ll be using Azure Synapse Analytics, which comes with Hyperspace out of the box. And we’ll share this notebook after the session. Okay, let’s get started. First I’m going to create two data frames, one for employees and one for departments. And here are the contents of the data frames. Employees data frame has employed ID, employee name and department ID. Department data frame has department ID, department name and location. And these are the two data frames I’ll be using throughout this demo. Now that we have data frames to work with, let me create a Hyperspace object, which has methods for creating and managing it exist. Next, I’m going to create few indexes. To create an index you need to have a index config object and specify name, index columns, and included columns. Index columns are the columns used for join or filters, included columns are otherwise used for selecting project. Once you have this index configure object, you can just say Hyperspace that create index and pass the data frame, which you want to build index from along with the conflict object. In this example, I’m going to create one index for employee data frame and another one for departments data frame. And the reason I chose department ID as an index column is that I’ll be joining employee data frame with departments data frame on this column. We also have an API for listing indexes we created. To do that you do Hyperspace, that index is that show and this will display all the indexes that are created. These are the two indexes we just created. And this also has more information about the index such as index column, included columns, index location, and so on. The object returned by indexes is a Spark data frame. So if you want you can perform any data frame operations on it. Now let’s start utilizing the indexes we just created. Before doing that, I’m going to disable broadcast join because the indexes we created are applicable to so much on. This is the join query we will be executing. I’m joining employees data frame with departments data frame on department ID and selecting employee name and department name. And I’ll be showing physical plan and then the results of the join. And this is the output. So this is the physical plan of joining two data frames. So you would see sort merge join and sort dept stage. And this is the result of the join. Now finally, let’s enable Hyperspace, how do we do it? We just say Spark that enable Hyperspace. And what it does is it’s going to inject the Hyperspace optimizer rules and start utilizing the indexes available. And we will be executing exactly the same join query. And this is the output. However, I think it’s a little hard to see what really changes after enabling the Hyperspace. And that’s where the explain API comes into play. So you do Hyperspace that explained and pass the data frame, and it will give you an overview of what changes if Hyperspace is enabled. And this is the output. So this is the output from the join query that we just excluded. And this is the physical plan with Hyperspace on, and this is the fiscal plan with Hyperspace off. And the highlighted portion is the difference. And you see that sort and exchange have been removed from the physical plan with Hyperspace on. And this also lists the indexes that are used, and these are the two indexes we created in this demo. And finally, it also shows you the physical operator stats. For example, what is saying is with Hyperspace disabled, their work to show for exchange physical notes, but with Hyperspace enabled the number of this note goes to zero and is the same for the sort physical operator. Hyperspace also works with sequel comments, and this is equivalent to the join using the data frame APIs. So basically this is joining these two tables on the department ID, and this should give you the same result as expected. Okay, let me switch a gear a little bit and talk about some of the APIs for managing indexes versus delete index. If you wanna delete your index, you can do Hyperspace.delete index and pass being the name of the index you have to delete. This will be a soft delete, meaning that it’s not going to remove the index from the file system. If I look at the output here, you’ll see that the state has changed from active to delete. And once the index goes to delete, the state is not going to be picked up by Hyperspace. And this API is useful if you want to disable some of the indexes temporarily. Next is restore index. So this API is used if you want to bring back that deleted index, let me show you the output. So if you perform restore index, it would change the state of this index from deleted to active. And once the index goes to active state, it will be used by Hyperspace once again, next is vacuum. Vacuum is a hard delete operation, meaning that it will physically remove the index from the file system. The prerequisite is that you have to first delete the index. So this is the sequence. So you first need to delete index and your vacuum index. And here is the output. The index goes from active state to deleted state. And when you do vacuum index, it will be removed from the file system. And last but not least, we have a refresh index API, as opposed we have new employees who were added to our original data. Now that the index and data are out of sync, Hyperspace is smart enough not to use the index because they will give you a wrong result. However, you can use the refresh index API to rebuild the index and once the data and index sites in sync Hyperspace will use the index and that’s the end of the demo, and we can switch back to the slide.
– Thank you so much, Terry. Wasn’t that demo exciting? I’m itching to actually try this out. In fact, I’ll provide you with pointers in the later half of my presentation. It shows how you can try this Hyperspace thing on your boxes today. The covering index, the covering index in other words, creates a copy of the original data in a different sort. So there’s no black magic. Like I mentioned at the beginning of my talk an index provides squarely excavation at the cost of additional rights and storage. So let’s take one example just to make sure that we can make the concept here. Imagine a table having three columns. And you’re trying to execute a query. This query is super simple It says select B where A equals red. The only way to execute this query would be to do a full scan on top of this table. And this full scan would end up taking linear time. Obviously I’m not considering optimizations like min-max, but yeah I’ve tried to dumb it down and simplify just to get the concept across. Now, you can technically build a covering index on column A and include column B. Now, when you re-execute this query that I’ve shown you here select B where A equals red. Now you can resort to doing a binary search and this steaks logarithmic time, while the concept is pretty simple, it dates tremendously in doing things like free to acceleration, not convinced that it works. Well, let’s go into something more complex now. Imagine now you’re joining two different tables, table A and table B on column A and assume that you are running this query, select B, C where you’re joining on the column A from both the tables without an index Spark will decide to shuffle both sides of the table into the same number of partitions. And remember that the data is not sorted. So as part of step two, Spark ends up going and sorting both the sides before it decides to go ahead and do a merge in step three, with an index, with the right index is build the Hyperspace extinctions that we have built, enable the Spark optimize it to pick the right index. And remember the index is already pre shuffled and preselected. So Spark can happily move into step two, which is the merge step. And I’m pretty sure you must have noticed by now that the shuffle got eliminated. And since shuffle is the most expensive step potentially in a query now this query can scale to much larger data sets and potentially run even faster. Now, having learned these two particular concepts, let me switch back to my colleague to show you some of the deep dives into real world complex workloads and hopefully convince you that we can really get some nice acceleration.
– [Terry] Okay now I do a deep dive on two quarries, TCP-DS, which is one of the well-known big data benchmarks. And I show you how much Hyperspace can speed up those quarries. First, this is Query 93, which is a fairly simple query. You are joining two fact tables and a dimension table followed by aggregation. Now let’s take a look at how the execution plan looks in the Spark UI. This is the plan executed by a Spark. And as you can see, this is the join of two fact tables followed by a join with the dimension table, followed by an aggregation. For the first join we see that there were about 140 gigabytes of data being shuffled and sorted. This is a good opportunity to utilize the Hyperspace covering indexes to eliminate the shuffle. Now, the window on your right is the plan executed by Spark with the right set of index is created. As you can see the shuffle and sort have been removed, but the rest of the plan remains the same. Let’s compare the execution time. This benchmark was run with scale factor 1000 and the data is stored in parquet. The execution time has significantly gone down from six minutes to 32 seconds giving you about 10X speed up. Next, let’s take a look at Query 64. This is a fairly complex query with multiple joints and aggregations across about 15 different tables. So let’s take a look at the Spark UI for the executed plan. Let me zoom out a bit to show you the overall plan and sure enough, it’s a complex one. If I scroll down a bit, I see shuffle and sort of about 110 gigabytes of data. 50 Gigabytes of shuffle, 110 gigabytes of shuffle, 50 gigabytes of shuffle and so on after creating and applying the indexes, this is the new plan on your right. I’ll make it a full screen. The big shuffle and sort have been removed here, here, here, and here. This is an interesting scenario where we create an index. If you cannot create a cover index on one side of the join, because it involves aggregations joins and so on, you can just have the covering index on one side if applicable, and it can also eliminate the shuffle for that side. Finally, let’s compare the execution time. It went down from 10 minutes to 3.4 minutes to give you about 3X speed up. So we just took a brief look at the speed-ups in two of the TPCs quarries. Now, while we talk about the improvements we observed across all the TPCs quarries.
– Thank you so much, Terry, as you’ve seen Terry has just demonstrated Hyperspace acceleration on a couple of non-trivial queries. I’m pretty sure the moment he actually showed you this whole plan zoomed out at least I zoomed out. I zoned out in fact. Thanks a lot, Terry. And now you’ve made me even more curious to try this out. Now, as he has shown you deep dive into two queries, let me talk about Hyperspace performance in general. The compute configuration that we’ve used to test this out is on the left for anybody who’s interested in recruiting some of our benchmarks. At the top is a graph that shows a workload evaluation derived from PCT benchmark or in short PCT hedge. The scale factor that we’ve used is thousand and the evaluation was performed using a participant 2.4. We will end up getting support for Apache 3.0 Spark 3.0 subsequently. And remember one very interesting thing. All of this base tables of TPC-H are laid out in parquet. And as you already know, parquet is a highly efficient columnar format. And all of the explanation that I’m about to show you in this workload as well as the next one is on top of partly data. The graph here, the X-axis shows the TPC-H query number, TPC-H has 22 queries and that’s what you see here. And the Y-axis shows the duration that it took to execute that query and the dotted line that you see here indicates the game that we achieved by using the indexes in this case. I want you to observe this and just digest how much gain they’re able to get, right? Obviously you can see a spectrum of gains. You see gains of as low as like about 1.1X, but you can also see gains as high as 9X. Now let’s look at the second workload derived from the same TPC benchmark DS, or in short TPC-DS for brevity we just show the top 20 queries, TPC-DS workload consists of a total of 103 queries. And what you can notice here is a similar pattern. Again, there are no regressions. And in fact, up to 11 regressions to be had. The overall benchmark is really key because it conveys one other important aspect, which is we don’t regress in any of the queries, right? So if you look at both TPC-H and TPC-DS one thing that will immediately stand out is we’re getting a pretty high gain of 2X and 1.8X respectively. I’m also pretty thrilled to announce that we are open-sourcing Hyperspace as of today. I agree it’s version 0.1 it’s a start to recap Hyperspace as an extensive indexing subsystem for Apache Spark. It’s a simple add on, we have not changed anything in Spark core and rest assured you can feel thrilled that it’s the same technology that powers the indexing engine inside Azure Synapse Analytics. And it works out of the box with a purchase part 2.4 and the support for this Apache Spark 3.0 is going to come in in the next few weeks. And we offer Hyperspace in three languages, no compromises. You can choose whatever language you feel comfortable with. Give us feedback. What would you like to see in the upcoming versions And we’ll build it for you. And today you can access you can download, you can build, you can also use Hyperspace from one of these two URLs. They both take you to the same place. Hyperspace is currently on GitHub. You can feel free to explore the code base, give us suggestions in terms of what you’d like to see. Having said that I would also like to take a brief moment, thanking everybody who made us reach this point in time. In this slide, I’m going to discuss a couple of areas that we are planning on investing and probably also areas of opportunities where we could collaborate. The first area is meta data management and lifecycle. Do you like what you’re seeing? Did we capture enough meta data for instance? The second area of investment is indexing enhancements. Well, what we demonstrated today is indexing or immutable data sets or static data. How would we do the same thing for incremental indexing? What are the things that will change? What is the performance implications? How would you add support for things updatable instructors like dental Lake or Uber code, for instance, right? The third area of investment is optimizing enhancements. Like I mentioned, this is a very humble beginning in this space we are by no means experts at optimizers. And this is where we could definitely use your help in making sure that we’re taking the right decisions while optimizing queries. And one of the area of active investment that we’re exploring is quality explain ability. Why did you use a particular index? Why did you not use a particular index? The fourth area of investment is more index types, more on this in the next slide. The first one is documentation and tutorials. Well, indexing is not a silver bullet. And in all honesty, we would like to tell you that indexes are always going to work, but I can’t really tell you that you should take care while using the indices, making sure that they really accelerate your workload. There are lots and lots of questions you need to ask yourself before you go ahead and decide to build an index. This is what we want to be able to capture as part of our best practices, gotchas, and probably what we want to be able to run more reproducible experiments for our community. And finally, the holy grail when my awesome colleague Terry was showing you some of these cool demos, and I was showing you some of the performance evaluation across all of these complex queries. I’m sure one thought must have struck across your mind, which is wait a second how did you decide what indexes to even build in the first place, right? That was done through a prototype that we have implemented over the last few months in the form of an index recommendation engine. It’s quite simple today. What it does is it takes your workload and recommends the top key indexes that you can go ahead and use. It’s not perfect, but we plan on open sourcing that over the next few weeks. Let me answer the final question which is what type of Hyperspace is gonna build together right, today in our talk, we covered one type of a Hyperspace, which is the covering index, but in Hyperspace, it is very important to remember that the index term is used extremely broadly to refer to a more generic class called a derived dataset. In short, a derive dataset as the name states was designed from the original data set, but it aids in providing some hints to optimize it, to accelerate the quick, that’s the simple definition. So what are some other types of Hyperspace that can be built together? We only have coding indexes, but we plan on going and implementing more types like the chunk elimination index, which is something you can use to do pointed look-ups or the materialized views, which has an utterly complex area. But it’s also very, very useful, especially when you have canned queries that you’re running and you have multiple users who are doing this, and finally also provide potentially statistics and maintain them so that a cost-based optimizer can utilize them to provide you with higher query acceleration. With that, I’d like to conclude my talk by mentioning a couple of key things. Number one, today, I’m super thrilled to announce the open-sourcing of Hyperspace, a tech that we have built as part of Azure Synapse Analytics it’s LB version 0.1. It’s obviously not ready for you to use in production. It’s just a humble attempt at giving back something to the community. And point number two, the tech was buried without really modern thing, a single line of Spark core, which gives you some tremendous powers in terms of putting it on your own Spark clusters, without having to worry about deploying Spark itself. And because the tech is also deployed as part of Azure Synapse Analytics rest assured you can feel safe that there’s a tremendous amount of product investment that’s going on in it. And point number three, we’ve demonstrated a very preliminary acceleration on some of the key workloads. And of course, I’d like to emphasize this as more of a beginning we might be making some mistakes. There’s definitely scope for improvement. It’s not perfect, but that’s where we really need your guidance in making sure that we will Hyperspace into something that our users would really love and use in their day-to-day workload. And last but not the least, I do have the URL right here on the right side bottom. If it’s the whole Hyperspace project today is on GitHub, you can try it out you can let us know if you run into any issues, please feel free to open an issue on the GitHub thing. So I on behalf of my entire team, look forward to seeing you folks on our GitHub page and look forward to collaborating with all of you. Thank you so much for your talk. Thank you for tuning in and have a great rest of the day.
Rahul Potharaju is a Principal Engineering Manager at Microsoft’s Azure Data group working on Azure Synapse Analytics. He has led several open sourcing efforts including Hyperspace and .NET for Spark. His work is widely published at top conferences, and has won awards at venues such as SIGMM and TOMM. Previously, he worked as a Researcher in the Gray System’s Laboratory (GSL) at Microsoft. He earned his Computer Science PhD degree from Purdue University in a joint industrial collaboration with Microsoft Research and Computer Science Master’s degree from Northwestern University. He is a recipient of the Motorola Engineering Excellence award and the Purdue Diamond Award. Rahul’s work has been adopted by several business groups inside Microsoft and has won the Microsoft Trustworthy Reliability.
Terry Kim is a Principal Software Engineer at Microsoft’s Azure Data group, focusing on scalability, performance, and query optimization. His current work involves enabling Apache Spark for .NET developers.