Hyperspace is a recently open-sourced (https://github.com/microsoft/hyperspace) indexing sub-system from Microsoft. The key idea behind Hyperspace is simple: Users specify the indexes they want to build. Hyperspace builds these indexes using Apache Spark, and maintains metadata in its write-ahead log that is stored in the data lake. At runtime, Hyperspace automatically selects the best index to use for a given query without requiring users to rewrite their queries. Since Hyperspace was introduced, one of the most popular asks from the Spark community was indexing support for Delta Lake. In this talk, we present our experiences in designing and implementing Hyperspace support for Delta Lake and how it can be used for accelerating queries over Delta tables. We will cover the necessary foundations behind Delta Lake’s transaction log design and how Hyperspace enables indexing support that seamlessly works with the former’s time travel queries.
Rahul Potharaju: Hello everyone. Hope you’re having a wonderful day so far. Today, we will be talking about Hyperspace for Delta Lake. Let me introduce myself and my team members. My name is Rahul Potharaju and today I’m joined by my colleagues, Terry Kim and Eunjin Song. We work in the Spark team at Microsoft. Our work includes a lot of projects that span the runtime, the service, and anything that has to do with debugging and diagnosing production workloads. All of the work we do, when we offer it as a service to Microsoft customers, includes a variety of runtimes, including the runtime that powers Azure Synapse Analytics, HDInsight, and also Spark on Cosmos. Where possible we also contribute a significant amount of code back to Apache Spark Core. For instance, my co-presenter today, Terry Kim, has been contributing quite significantly to Spark over the last year or so. When it does not make sense for the work to land in Apache Spark Core, we open-source it as an independent project. Some examples of this are Hyperspace, which is what we’re talking about today, and also an older project called .NET for Spark, which we open-sourced roughly two years ago.
Today’s agenda is quite simple. I’ll be talking about the background, giving you some of the technical concepts. And my colleague, Terry, will give you a demonstration of the capability that we have enabled recently in Hyperspace, along with a performance deep-dive into one of the TPC-DS queries. So let’s start with the simplest question. What is Hyperspace? Hyperspace is a new extensible indexing subsystem for Apache Spark, which we open-sourced just a couple of months ago. It is the same technology that powers the indexing engine inside Azure Synapse Analytics, and a couple of months ago we already demonstrated that it offers accelerated performance on key industry workloads like TPC-H and TPC-DS. It allows you to create and maintain your indexes through a very simple set of user APIs, and we have already seen Hyperspace being used in a variety of scenarios. If you never got a chance to try out Hyperspace, we welcome you to check it out and talk to us, or come say hi on our GitHub site.
And like I was saying, the system was open-sourced at Spark Summit last year. I’ve also included a couple of links here in case you want to check out our talk from last year, or just browse what exactly this project is and read through some of the blogs that were published. At this stage some of you might be wondering, “Okay, we’ve already presented at Spark Summit last year. What are we doing coming back again?” So let me explain. When we talked about Hyperspace last year, we received a tremendous amount of positive feedback both from people who attended our talk at Spark Summit and from some of our Microsoft customers. However, one particular question stood out significantly. The top user request that came through was: will Hyperspace work for Delta Lake? Come to think of it, it actually makes sense.
Delta Lake is for a single table, and you would decide on the physical layout of your Delta Lake table depending on the majority query pattern that your production workloads end up seeing. But what will you do if you have more than one majority query pattern? Secondary indexing filled this void in the land of traditional databases, and we hope that Hyperspace fills the same void for Apache Spark. Without further ado, let’s now dive right into understanding how Hyperspace works for Delta Lake. Before we can get into these kinds of technical details, it’s important to understand, first, how index maintenance works inside Hyperspace. Second, if you constantly need to go update your indexes for datasets that are constantly changing, it can become annoying quite soon. So in order to overcome this particular limitation, what we have done over the last few months is introduce a novel hybrid scan operator inside Spark.
If, let us say, Hyperspace encounters an index that is stale because your underlying data has changed, Hyperspace can switch to this mode called the hybrid scan. What it does is leverage the index to the extent possible and then perform a linear scan only on the data that got added since the dataset was indexed in the first place. Once we understand index maintenance, as well as hybrid scan, we will move on to understanding ACID data formats and how exactly Hyperspace works with an ACID data format like Delta Lake.
So let’s briefly look at the index maintenance story inside Hyperspace. For folks who attended last year’s talk, you may remember that we did not have a very good index maintenance story. All we could do was essentially a full refresh. What this means is: imagine that you have a particular dataset and you ended up building an index. In this case, imagine the data consists of five files, 1, 2, 3, 4, and 5, and you ended up building an index which has its own layout. Now, when you issue a query, the query, assuming that Hyperspace is enabled, goes out and leverages the index. Now, when you add new data, 6, 7, 8, 9, and 10, you’d have to do a full refresh to rebuild the entire index so that when you now issue a query, Hyperspace will decide to use the newer version of the index that got built.
This was the only story that we had as of last year. Now, you can perform two more levels of incremental refresh. The first mode is a slow refresh with a fast query. To see what it allows you to do, let’s go back to the same [inaudible] where you have this data along with an index. Imagine I added blocks 6 to 10. What ends up happening at this stage is that if you now perform an incremental refresh, Hyperspace builds an index only on the data that got appended to the original dataset, and when you now issue a query, Hyperspace is smart enough to leverage both of these indexes, which span blocks 1 to 5 and then 6 to 10. The third and final mode of refresh that we support is called quick refresh. To see the basic idea behind quick refresh, let’s go back to the exact same scenario.
We have the data spanning blocks 1 to 5. You have the index. Now, when some newer data comes in and gets appended to the original dataset, you can call an incremental refresh with a metadata-only flag. What that does is update only the metadata, so that when you now issue a query, Hyperspace is smart enough to realize that, “Oh, I only have an index which is partially covering the underlying dataset. I will use this index and in addition, I will go ahead and do a linear scan on top of the newly added data.” The way this happens is through the novel hybrid scan operator that Hyperspace introduced recently. Now let’s understand very briefly how hybrid scan works, because it’s critical that we know some of these details, which will then feed into how Hyperspace works for Delta Lake in the first place.
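The three refresh modes described so far can be sketched with a toy model in plain Python. This is not Hyperspace's API: the class `ToyIndex`, the functions, and the mode labels `"full"`, `"incremental"`, and `"quick"` are illustrative names chosen here, and the model assumes an append-only dataset identified by block numbers.

```python
from dataclasses import dataclass, field


@dataclass
class ToyIndex:
    """Toy model of an index: a list of index versions, each covering some blocks."""
    versions: list = field(default_factory=list)  # list of frozensets of block ids
    known_files: frozenset = frozenset()          # source blocks recorded in metadata

    def indexed_files(self):
        """All blocks covered by at least one index version."""
        covered = set()
        for version in self.versions:
            covered |= version
        return covered


def create_index(source_files):
    files = frozenset(source_files)
    return ToyIndex(versions=[files], known_files=files)


def refresh(index, source_files, mode):
    """Return a refreshed copy of `index` (append-only data assumed)."""
    files = frozenset(source_files)
    if mode == "full":
        # Rebuild one index over the entire dataset.
        return ToyIndex(versions=[files], known_files=files)
    if mode == "incremental":
        # Build an additional index version over the appended blocks only.
        new = files - index.indexed_files()
        versions = index.versions + ([new] if new else [])
        return ToyIndex(versions=versions, known_files=files)
    if mode == "quick":
        # Metadata only: remember the new blocks, index nothing.
        return ToyIndex(versions=list(index.versions), known_files=files)
    raise ValueError(f"unknown refresh mode: {mode}")


def files_needing_scan(index):
    """Blocks a query must still scan linearly (the hybrid scan part)."""
    return sorted(index.known_files - index.indexed_files())


if __name__ == "__main__":
    idx = create_index(range(1, 6))        # index over blocks 1 to 5
    appended = range(1, 11)                # blocks 6 to 10 were appended
    for mode in ("full", "incremental", "quick"):
        refreshed = refresh(idx, appended, mode)
        print(mode, len(refreshed.versions), files_needing_scan(refreshed))
```

In this model a full refresh leaves one index version over all ten blocks, an incremental refresh leaves two versions with nothing left to scan, and a quick refresh leaves the original version plus five blocks that a query would have to scan linearly.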
Now let’s take this particular example. Imagine you have some original dataset, blocks spanning 1-6, and this is the initial data set that got created at V1. Now, the user goes ahead and ends up creating an index. Now imagine that in addition to creating the index, just to complicate the whole scenario, I’ll also delete one of the blocks right here, which is 3. Now, in that case, if you consider this particular index I1 that got created, it will span blocks 1, 2, 4, 5, and 6 because the block 3 never existed in the first place. In the third step, imagine that the dataset got updated to version two wherein, in addition to the block 3 that was already deleted, the user goes ahead and deletes block 4. But, notice that I1 was never updated, which means that it will still cover 1, 2, and then 5, 6, and also this block that got deleted. Now, what will happen if a user comes in and issues a query on the dataset at V2?
Well, this is where hybrid scan comes in. Hybrid scan operates in two steps. In step one, it computes the diff of the underlying dataset since it was indexed. In this particular case, Hyperspace is smart enough to quickly compute that, “Oh, block 4 got deleted and blocks 7 and 8 got added.” Once it computes this particular diff, in step two it will go ahead and rewrite the underlying table scan as what is called a hybrid scan. The way this works is, imagine you’re joining table A with table B. What Hyperspace does is replace the piece of the query plan where you’re scanning table A with this new subtree, wherein it says, “Hey, go ahead and use the index to the extent possible by applying some filters in order to get rid of all the rows that have disappeared because the user has deleted block 4.” Then it goes ahead and shuffles only the small, newly appended data, which in this case is 7 and 8, and finally takes a union of the results, preserving pretty much the entire plan and optimizing just the scan on table A to the extent possible, as I mentioned.
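The two steps of hybrid scan can be illustrated with a small, self-contained Python sketch. It is a toy model, not Hyperspace's implementation: the function names are hypothetical, and each index row is assumed to carry the id of the source block it came from (the lineage information discussed later in the demo).

```python
def compute_diff(indexed_files, current_files):
    """Step one: which blocks were deleted or appended since indexing?"""
    indexed, current = set(indexed_files), set(current_files)
    deleted = sorted(indexed - current)
    appended = sorted(current - indexed)
    return deleted, appended


def hybrid_scan(index_rows, source, indexed_files, current_files):
    """Step two: index rows minus deleted blocks, unioned with appended rows.

    index_rows: list of (block_id, row) pairs stored in the index.
    source:     dict mapping block_id -> list of rows in the current dataset.
    """
    deleted, appended = compute_diff(indexed_files, current_files)
    # Filter out index rows whose source block no longer exists.
    kept = [row for block_id, row in index_rows if block_id not in deleted]
    # Linear scan over the newly appended blocks only.
    fresh = [row for block_id in appended for row in source[block_id]]
    return kept + fresh


if __name__ == "__main__":
    indexed = [1, 2, 4, 5, 6]       # index I1 from the example
    current = [1, 2, 5, 6, 7, 8]    # dataset at V2: 4 deleted, 7 and 8 added
    index_rows = [(b, f"row-from-{b}") for b in indexed]
    source = {b: [f"row-from-{b}"] for b in current}
    print(compute_diff(indexed, current))
    print(hybrid_scan(index_rows, source, indexed, current))
```

The point of the design is that only the appended blocks are read from the source; everything else is served from the index.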
Now that we understand hybrid scan, let’s revisit some of the ACID data formats that are pretty popular in the open-source community. There are three popular ACID data formats. One of them is Delta Lake. The other two are Iceberg and Apache Hudi. At a high level, all of these systems offer three fundamental properties. One is snapshot isolation, which allows readers to use a consistent snapshot of the underlying table, and all table updates are pretty much atomic in this particular model. The second aspect is that they allow the query engine to perform what is called distributed planning and pretty much remove any kind of dependency that they may end up having on older metastores like Hive Metastore, for instance. And finally, the third and most fundamental feature that’s offered by pretty much all of these systems in one way or another is version history, rollback, and time travel, which allows people to jump between different versions of the underlying dataset with ease.
So now you might be able to see that if you want to bring in Hyperspace support for a system like Delta Lake, it’s non-trivial, because Delta Lake is offering some of these huge features. So let’s take one particular feature of Delta Lake, just [inaudible]. To understand how time travel works, imagine you have a Delta Lake table, and in this table the user has started adding a bunch of files, and every time they add a new set of files, a new version is created by Delta Lake. So in this particular example, the user initially started off with one, two and three files. Then they added more files, then they deleted some, and continuously kept adding more and more until they ended up at version seven of the dataset. Now, this is a sample piece of Spark code.
If you were to just read this particular Delta Lake table as you would any other data format inside Apache Spark, then by default, Delta will default to version seven because that’s the latest version. Of course, the power of Delta Lake comes when the user ends up specifying the ‘versionAsOf’ option right here, wherein they can now jump back and forth between versions of the underlying Delta Lake table. So in this particular case, the user appends the option ‘versionAsOf’, 2 in order to seamlessly jump to the older version of the dataset and will only see results from these four files right here. Now, given this time travel, let’s understand how Hyperspace ends up working in this particular case. Let’s go back to the same example. We have Delta Lake with multiple versions, which are recorded inside Delta Lake’s transaction log.
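How a version number maps to a set of files can be sketched as a replay of the transaction log. The Python below is a simplified toy model, not Delta Lake's actual log format or reader: the commit dictionaries, the file names, and the function `snapshot_at` are all hypothetical, and the sample log is not the one shown on the slide.

```python
def snapshot_at(log, version=None):
    """Replay add/remove actions up to `version` and return the live files.

    log: list of commits; commit i produces table version i. Each commit is a
         dict with optional "add" and "remove" lists of file names.
    version: version to read; None means the latest, mirroring a plain read
             without the 'versionAsOf' option.
    """
    if version is None:
        version = len(log) - 1
    if not 0 <= version < len(log):
        raise ValueError(f"version {version} does not exist")
    files = set()
    for commit in log[: version + 1]:
        files |= set(commit.get("add", []))
        files -= set(commit.get("remove", []))
    return sorted(files)


if __name__ == "__main__":
    # Hypothetical log: three initial files, an append, an append that also
    # removes a file, and a final append.
    log = [
        {"add": ["f1", "f2", "f3"]},           # version 0
        {"add": ["f4"]},                       # version 1
        {"add": ["f5"], "remove": ["f2"]},     # version 2
        {"add": ["f6", "f7"]},                 # version 3
    ]
    print(snapshot_at(log))       # latest snapshot
    print(snapshot_at(log, 1))    # time travel back to version 1
```

Because every snapshot is just a set of files, an index only needs to record which files it covers in order to be compared against any version of the table.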
Now, imagine this is the timeline for Hyperspace. The way to understand this diagram is that right after version three of the underlying Delta Lake table was created, the user went ahead and created an index, and the version of that index is V1. Then the user went and changed something else in the underlying Delta Lake table to get it to version five, and then the user did a refresh operation on Hyperspace, and so on and so forth. So currently the index is at version three and the underlying dataset is at version seven. Now let’s understand how Hyperspace works and optimizes this for the underlying Delta Lake tables. Imagine a user is querying the Delta Lake snapshot at V3. This is the simplest use case here.
Hyperspace kicks in and it says, “Hey, I have an index that’s already created here so just go ahead and choose the index at V1.” Life is good. Let’s go into something a little more complicated. Now when the user queries the snapshot at V4, look at the choice that Hyperspace has, right? Well, this comes right after the index was created at V1, so what Hyperspace does is choose to use hybrid scan. How does it use hybrid scan? It basically says, “I will use the index at V1 and then I will go ahead and compute the diff of the files between these two versions of the underlying Delta Lake table and then perform just a linear scan on top of it.” And this is where hybrid scan kicks in. Finally, let’s go into something more complicated. If the user queries the snapshot at version six, Hyperspace faces a couple of issues. There are two versions of the index, and if you imagine the user has already built version three, there are now three versions of the index. So what should Hyperspace do? Well, what the current version of Hyperspace does is compare the cost of issuing a hybrid scan over two potential query paths.
Hyperspace can either choose the index at version two and then perform a linear scan on the diff of the data files between versions five and six, or it can choose to pick the index at version three and then do a linear scan over the data files resulting from the diff of V6 and V7. Whichever path has the minimum cost, Hyperspace ends up picking that particular path and then goes out and executes the query plan. Now with that, I’d also like to call out that Hyperspace, like I said at the beginning of the talk, is the same engine that powers Azure Synapse Analytics. It’s exactly the same engine. All of the code is completely open source on GitHub and we support three languages: Scala, Python and .NET. And now I’ll hand it over to my colleague, Terry, to walk you through the end-to-end experience of Hyperspace for Delta Lake inside a notebook in this particular case, but you can pretty much take the entire code, compile it, and run it on your own machine.
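The choice between index versions can be sketched as a minimum-cost selection. The talk does not spell out the cost model, so the Python below assumes a deliberately simple one: the cost of using an index version for a snapshot is the number of files that differ between what the index covers and what the snapshot contains. The function names and the sample file sets are hypothetical.

```python
def diff_cost(index_files, snapshot_files):
    """Assumed cost: files to scan linearly plus files to filter out."""
    index_files, snapshot_files = set(index_files), set(snapshot_files)
    appended = snapshot_files - index_files   # must be scanned linearly
    deleted = index_files - snapshot_files    # must be filtered from the index
    return len(appended) + len(deleted)


def pick_index(index_versions, snapshot_files):
    """Return (name, cost) of the cheapest index version for this snapshot.

    index_versions: dict mapping index version name -> files it covers.
    Ties go to the version listed first.
    """
    candidates = (
        (name, diff_cost(files, snapshot_files))
        for name, files in index_versions.items()
    )
    return min(candidates, key=lambda pair: pair[1])


if __name__ == "__main__":
    # Hypothetical coverage of three index versions.
    index_versions = {
        "index-v1": {1, 2, 3},
        "index-v2": {1, 2, 3, 4, 5},
        "index-v3": {1, 2, 3, 4, 5, 6, 7, 8},
    }
    snapshot = {1, 2, 3, 4, 5, 6}   # the table version being queried
    print(pick_index(index_versions, snapshot))
```

With these sample sets, the second index version wins because only one file has to be scanned on top of it, whereas the newer version would require filtering out two files that the queried snapshot does not contain.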
Terry Kim: Thank you, Rahul. So what I have here is an Azure Synapse notebook, which comes with Hyperspace out of the box, and in this demo I’m going to show you how a Hyperspace index works on Delta tables and some of the new features of Hyperspace, such as hybrid scan and incremental refresh. Okay, let’s get started. First, I’m going to create two Delta tables, one for departments and another for employees. Next, I’m going to join these two tables on the department ID so we can see which departments employees belong to. For this demo I’m going to disable the broadcast hash join to simulate the scenario where these two tables are pretty big. If you look at the plan from Spark UI, we see that each side of the join has a shuffle stage.
And now, how can we use a Hyperspace index to speed up this join query? One way is to create an index for each table where the index key is the department ID. This will help eliminate the shuffle stages before joining. Let’s create these indexes. First, you need to set this config to register a source builder class that knows how to create an index on top of Delta tables. I’m also enabling the lineage, which keeps track of the source data, because I have some examples where source files are deleted later in this demo. And finally, let’s create two indexes, one for each table, on the department ID column. Now that I’ve created the Hyperspace indexes, I’m going to call ‘enableHyperspace’ to leverage these indexes in subsequent queries. Let’s now run the same join query with Hyperspace enabled. If you look at the output of ‘hyperspace.explain’, we see that Hyperspace indexes are replacing the Delta tables and also eliminating the shuffle exchanges and sorts on both sides of the SortMergeJoin. Let’s double confirm with Spark UI, and we see that the shuffle exchanges and sorts are removed. Now let’s add a few new employees to the employee Delta table and validate that they are actually added.
I’m going to run the same join query with hybrid scan off. In this case, no index was applied because the index is not up to date and does not contain these new employees. As you can see from the physical plan, the shuffle exchanges are required again for both sides of the join. One way to utilize the existing index is to use the hybrid scan. So in this cell, I’m going to turn it on and run the same join query. We now see that both the indexes are applied, but we also see a new physical operator called ‘BucketUnion’, and only one shuffle exchange has been removed instead of two. So let’s take a look at Spark UI to see what happened. If you look at the department table, it’s replaced with the index as expected because there was no change in the table, and this eliminates the shuffle exchange and sort. Now, if you look at the employee table, you see that the new data is read, shuffle-exchanged, and bucket-unioned with the existing index.
The bucket union is a new physical operator that Hyperspace injected so that only data in the same buckets is unioned. So this requires shuffling just the new data to match the hash partitioning of the existing index. Note that the Sort operator still exists because bucket union does not handle sort orders, which is something we can improve later on. Hybrid scan is very useful, but the performance may degrade over time as you add more data because this new data will need to be shuffled. This is where an incremental refresh can help. Incremental refresh will create an index using only the new data that has not been indexed, and this is different from the full refresh, which rebuilds the whole index from scratch. Thus, it will be much quicker to perform an incremental refresh than a full refresh. So now let’s do an incremental refresh on the employee index.
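The idea behind bucket union can be sketched in a few lines of Python. This is a toy model with hypothetical function names, not the ‘BucketUnion’ operator itself: it assumes integer join keys and uses `key % num_buckets` as a stand-in for the hash partitioning of the index.

```python
def bucketize(rows, key_fn, num_buckets):
    """Place each row into the bucket chosen by its key (stand-in for a shuffle)."""
    buckets = [[] for _ in range(num_buckets)]
    for row in rows:
        buckets[key_fn(row) % num_buckets].append(row)
    return buckets


def bucket_union(index_buckets, new_rows, key_fn):
    """Union new rows into the existing index buckets, bucket by bucket.

    Only `new_rows` are repartitioned; the index buckets are reused as they are,
    so the result keeps the same partitioning as the index.
    """
    shuffled = bucketize(new_rows, key_fn, len(index_buckets))
    return [old + new for old, new in zip(index_buckets, shuffled)]


if __name__ == "__main__":
    dept_id = lambda row: row[0]
    # Existing index over (department id, employee name), already bucketed.
    index_buckets = bucketize(
        [(10, "a"), (11, "b"), (12, "c"), (13, "d")], dept_id, num_buckets=4
    )
    # Newly appended employees that are not in the index yet.
    merged = bucket_union(index_buckets, [(10, "e"), (14, "f")], dept_id)
    for i, bucket in enumerate(merged):
        print(i, bucket)
```

Because the merged output is partitioned the same way as the index, the join above it does not need another shuffle on that side; nothing in this sketch sorts the rows within a bucket, which matches the remark that the Sort operator remains.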
If you look at the index log entry, we see that it is referring to two versions of the index, V0 and V1. V0 is the first index we created and V1 is the new index created with just the new data. We can also confirm this by reading just the V1 index. Now let’s run the join query and see how the refreshed index is being utilized. If you look at the output of ‘hyperspace.explain’, under the ‘Indexes used’ section, there are now two versions of the employee index being used, version zero and version one. Let’s also check the Spark UI. As you can see, with the incremental refresh the bucket union and shuffle exchange have been removed. We still have Sort because Spark does not handle sort orders if there is more than one file in a bucket. Another scenario that a Delta table supports is an update.
I’m going to update the name of the new employee to append ‘special’ to the name. If we check the Delta table history, we see that the latest update caused one file to be removed and one file to be added. Let’s see how Hyperspace handles this update scenario. If you rerun the join query, we see that the updated name appears in the result, and the output of ‘hyperspace.explain’ shows that indexes for both versions, zero and one, are being used. How is it handling the deleted file in the Delta table? Let’s check the Spark UI. As I mentioned before, the update caused one new file to be added, and this part is handling the new file using the hybrid scan. For the deleted file, you’ll see that a data file ID column is being pushed down as a filter. This is the file that was removed by Delta, and Hyperspace filters out any records that belong to this file by simply using this filter.
Note that for this to work, you have to enable the lineage, which I did earlier in this demo. One last scenario I’m going to show is time travel. In this example, I’m going to time travel all the way back to the initial version of the Delta table. Remember that we also created the indexes at this version. Let’s check if Hyperspace can handle the time travel by running the same join query. The result doesn’t show the new employees, as expected, and if you look at the output of ‘hyperspace.explain’, we see that the initial version of the Hyperspace indexes is being applied, which is version zero. This shows that Hyperspace is aware of the snapshot of Delta tables and will pick the right index to apply. Now, let’s go back to the presentation.
Rahul Potharaju: Thank you so much, Terry. Hope you see how easy it is to use Hyperspace for Delta Lake. Now one of the things which has been a conundrum in our minds is, how do we benchmark the performance? There is definitely nothing like, “Hey, just go use TPC-DS in order to benchmark”, because the particular nature of Hyperspace support for Delta Lake includes the ability for somebody to append or delete data in their underlying datasets and then see how the index performance actually changes. Because we do not have any standard benchmarks that are already defined, what we’ve done is try one synthetic benchmark based on some of the most popular customer scenarios in which we’ve been helping out some of our users. So let me briefly describe the experiment setting right here. If you’re already familiar with the TPC-DS industry benchmark, there are two tables. One table is called ‘store_sales’.
In this case, what we have done is divide store_sales up into 200 file blocks. The format of each file block is Parquet and each of these file blocks is 1 GB in size. So in total, the size of this particular dataset is about 200 gigabytes. There is a second table that we have taken from TPC-DS, which is the item table, which also contains 200 blocks. Each block is again Parquet, but in this case it’s roughly 200 kilobytes. So in total, the size of this particular table is 40 megabytes. What we’ve done is show you the performance of building one particular index that can accelerate TPC-DS query 44. Once we do that, we simulate a particular workload. The way this works is we append more data to the larger table, which is pretty much the most common customer scenario that we’ve seen.
In this case we append blocks 201-250, each one again of the same size of one gigabyte, and then we repeat the performance measurement of Hyperspace in two cases. One is without refreshing the index, which will show you how Hyperspace can leverage the hybrid scan operator that we’ve introduced in order to still give you reasonably good performance. In the second case, we end up doing an incremental refresh, and then show you how the performance actually changes. With that, I’ll hand it over to my colleague, Terry, to show you the second part of the demo.
Terry Kim: In this demo, I’m going to share some benchmarking numbers when a Hyperspace index is used on Delta tables. I have an Azure Synapse notebook opened and it’s attached to a Spark pool which has 112 cores. For this benchmarking, I’m going to use TPC-DS query 44, which uses the store_sales and item tables. I’m going to create these two as Delta tables. The size of store_sales is about 200 gigabytes and the item table is about 40 megabytes. Next, I’m going to define this function, ‘runQ44’, which I’ll use for the rest of this demo. This function sets up TPC-DS query 44, and it will run the query three times with Hyperspace disabled and three times with Hyperspace enabled, and it’s going to return the minimum latency in seconds for each setting. In this cell, I’m setting a few Hyperspace-related configs to enable hybrid scan and to support Delta tables. I’m also creating Hyperspace indexes for both tables. For the store_sales table, I’m going to create an index on the store ID column and for the item table, the item ID is used as the index column. I chose these configs because there are filters on these columns in TPC-DS query 44.
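The measurement procedure described for ‘runQ44’ (three runs per setting, keep the minimum latency) can be sketched generically in Python. This is not the notebook's code: `min_latency`, `run_benchmark`, and the callback parameters are hypothetical names, and the query and the enable/disable switches are passed in as plain callables so that the sketch runs without Spark.

```python
import time


def min_latency(run_query, repeats=3):
    """Run `run_query` `repeats` times and return the smallest latency in seconds."""
    latencies = []
    for _ in range(repeats):
        start = time.perf_counter()
        run_query()
        latencies.append(time.perf_counter() - start)
    return min(latencies)


def run_benchmark(run_query, enable, disable, repeats=3):
    """Measure the query with the index layer disabled, then enabled.

    enable / disable: callables that switch the index layer on or off
    (stand-ins for whatever the real environment provides).
    """
    disable()
    disabled = min_latency(run_query, repeats)
    enable()
    enabled = min_latency(run_query, repeats)
    return {"disabled": disabled, "enabled": enabled}


if __name__ == "__main__":
    state = {"indexed": False}

    def fake_query():
        # Pretend the indexed path is faster; the durations are arbitrary.
        time.sleep(0.01 if state["indexed"] else 0.03)

    result = run_benchmark(
        fake_query,
        enable=lambda: state.update(indexed=True),
        disable=lambda: state.update(indexed=False),
    )
    print(result)
```

Taking the minimum over a few runs is a common way to reduce the influence of warm-up and transient noise on a latency measurement.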
Now let’s run the benchmark. I pre-ran this notebook and we see about 17 seconds when Hyperspace is enabled and about 45 seconds when Hyperspace is disabled, giving you more than a 2x speedup. One typical use case of a Delta table is appending new data. So let’s simulate this use case by adding new data to the store_sales table. I pre-generated new store_sales data and here I’m appending about 10 gigabytes of Parquet files, which is about 5% of the original sales data. When we run query 44, you will see that using the Hyperspace index helps achieve about a 1.7x speedup thanks to the hybrid scan. I’m going to append another 10 gigabytes of data to the store_sales table and run the query, and we still see a similar speedup when Hyperspace is enabled.
I’m going to repeat the same until about 50 gigabytes of new data is appended, which is about 25% of the original data, and as we can see, we still see a good performance gain. But now that 25% of the data is not indexed, can we do better? Let’s use incremental refresh so that only the new data is indexed. If you run the benchmark after the incremental refresh, we now see a much bigger performance gain of a 5x speedup. Finally, this is the graph plotting all the numbers discussed in this demo. Blue bars represent the query latency when Hyperspace was enabled, whereas orange bars are for the case where Hyperspace was disabled. The x-axis represents the sequence of scenarios in this demo, starting with no data change, adding new data, and finally incrementally refreshing the index. From this graph, the benefit of using a Hyperspace index on a Delta table is pretty clear. Now, let’s go back to the presentation.
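The ratios quoted in the demo follow directly from the stated figures, as this small Python sketch of the arithmetic shows. The helper names are made up for illustration; the inputs are the approximate numbers given in the talk (45 and 17 seconds, 200 GB of original data, 10 GB appended per step, 50 GB appended in total).

```python
def speedup(baseline_seconds, indexed_seconds):
    """How many times faster the indexed run is than the baseline."""
    return baseline_seconds / indexed_seconds


def appended_fraction(appended_gb, original_gb):
    """Appended (not yet indexed) data as a fraction of the original data."""
    return appended_gb / original_gb


if __name__ == "__main__":
    # About 45 s without Hyperspace and about 17 s with it.
    print(f"speedup: {speedup(45, 17):.2f}x")
    # One 10 GB append on a 200 GB table, then five of them.
    print(f"after one append:   {appended_fraction(10, 200):.0%}")
    print(f"after five appends: {appended_fraction(50, 200):.0%}")
```

The first figure is consistent with the "more than 2x" remark, and the two fractions match the 5% and 25% mentioned for the appended data.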
Rahul Potharaju: Thank you so much, Terry. Hopefully that showed you how Hyperspace can be used to accelerate some of your production workloads without really thinking too much, in fact. So again, we welcome you to check out Hyperspace. Whether Hyperspace helps definitely depends on your current workloads, so if you run into any issues, please feel free to open an issue on GitHub and our team can help you fine-tune some of the indexes that you’re building. With that, I’d like to take a moment to conclude. To repeat, Hyperspace is an extensible indexing subsystem. It’s a fairly new system in the world of Apache Spark. It’s less than a year old. It’s the same technology that powers the indexing engine inside Azure Synapse Analytics, and we have taken utmost care to ensure that it works out of the box with open-source Apache Spark.
And it works with a language of your choice, including Scala, Python and .NET, and we’ve shown that Hyperspace works on key industry workloads and gives you pretty reasonable performance. As we discussed in our last talk, Hyperspace already offers you 2x and 1.x speedups on TPC-H and TPC-DS derived industry workloads. This is considering that Hyperspace is being used on commodity hardware without any hardware acceleration. So we’ve truly enjoyed building this. Hope you also enjoy using it in production, and please do let us know if you run into any issues. The team is wide open to any contributions that you’d like to make, and the entire source code is open source. With that, thank you so much for attending this talk and have a wonderful rest of your day. Thanks.
Rahul Potharaju is a Principal Engineering Manager at Microsoft’s Azure Data group working on Azure Synapse Analytics. He has led several open sourcing efforts including Hyperspace and .NET for Spar...
Terry Kim is a Principal Software Engineer at Microsoft’s Azure Data group, focusing on scalability, performance, and query optimization. His current work involves enabling Apache Spark for .NET ...
Eunjin Song is a Senior Software Engineer at Microsoft's Azure Data group working on Azure Synapse Analytics. She is working on Hyperspace, an indexing sub-system for Apache Spark and previously worke...