Data Distribution and Ordering for Efficient Data Source V2

May 26, 2021 11:30 AM (PT)

More and more companies adopt Spark 3 to benefit from various enhancements and performance optimizations like adaptive query execution and dynamic partition pruning. During this process, organizations consider migrating their data sources to the newly added Catalog API (aka Data Source V2), which provides a better way to develop reliable and efficient connectors. Unfortunately, there are a few limitations that prevent unleashing the full potential of the Catalog API. One of them is the inability to control the distribution and ordering of incoming data that has a profound impact on the performance of data sources.

This talk is going to be useful for developers and data engineers that either develop their own or work with existing data sources in Spark. The presentation will start with an overview of the Catalog API introduced in Spark 3, followed by its benefits and current limitations compared to the old Data Source API. The main focus will be on an extension to the Catalog API developed in SPARK-23889, which lets implementations control how Spark distributes and orders incoming records before passing them to the sink.

The extension not only allows data sources to reduce the memory footprint during writes but also to co-locate data for faster queries and better compression. Apart from that, the introduced API paves the way for more advanced features like partitioned joins.

In this session watch:
Anton Okolnychyi, Software Engineer, Apple



Anton Okolnychy…: Welcome everyone, my name is Anton and today I’d like to present a new addition to the Data Source V2 API that allows connectors to control the distribution and ordering of incoming data. While the [inaudible] may sound specific to folks who build and provide their own connectors, it is actually very important to data engineers who leverage those connectors as well. Distributing and ordering data correctly will save you a lot of resources and time.
A few words about me. I am an Apache Iceberg PMC member and an Apache Spark contributor at Apple. My current focus is on making database reliable and efficient at scale, which requires not only a refining [inaudible] but also a table [inaudible] coordinates. Apart from that, I am a big fan of open source. I truly believe that’s the only way to build industry standards. So if you happen to have a topic or a question that is related to the work that I do, feel free to reach out to me. I will be glad to discuss it with you.
Today I’ll start with the motivation behind the new Data Source V2 API. Then I’ll move on to the main part of the presentation, where we will talk about why data distribution and ordering are so important and how connectors can control them. And at the end, I’ll [inaudible] some remaining [inaudible].
All right, so what’s actually wrong with the V1 API? Why do we need another data source [inaudible]?
First of all, there are reliability problems. Specifically, the behavior of the DataFrameWriter is not properly defined. Spark delegates to connectors a lot of the decisions that it should make itself and, unfortunately, different connectors tend to behave differently under the same circumstances. For example, what is the overwrite save mode supposed to mean if there is an existing table? Some connectors perform partition overwrites, maybe dynamic, maybe static. Some truncate the table and add new data. Some actually drop the table and create a new one, which potentially [inaudible] schema and another partition spec. Learning and memorizing those differences is really frustrating for the user. To make things worse, Spark internally has a number of logical plans for more or less the same commands, which makes maintaining the write path pretty hard.
The last point also contributes to another problem: the validation rules are not always consistent, because there are so many logical plans and different rules capture different plans. Also, there are no schema checks at all when writing to path-based tables. This means you can write whatever you want and Spark will never complain, unless you try to read the data back. Of course, in this case, Spark will try to make sense of the data that’s available at that location and will fail if it doesn’t make any sense.
Apart from reliability concerns, connectors have to interact with [inaudible] internal APIs like SQLContext or RDDs. This makes the evolution of Spark internals a bit hard or pretty inconvenient.
It is also pretty tough to add new features to [inaudible] API. If, for example, you want to add limit or aggregate pushdown support, which is something that’s been discussed for years in the community, you will probably have to do a lot of [inaudible] PrunedFilteredScan. The same applies to exposing the [inaudible] API because existing code depends a lot on [inaudible] rows.
Finally, some of the features are simply not available. You have neither Structured Streaming nor multi-catalog support, and support for bucketed tables is extremely limited because you can use only the built-in [inaudible] function, and you cannot benefit from join optimizations if you have any other type of bucketing. So if you have Hive bucketing or Iceberg bucketing, you cannot really [inaudible]. To summarize, the V1 API is really popular. It’s stable and works really well in simple engineering use cases. But if you want some advanced functionality, that’s where you want to consider migrating to the V2 API.
Due to all of the issues that I’ve just described, the community has decided to offer a new V2 API, which we’ll cover next. First of all, Spark ensures that all V2 connectors have predictable and reliable behavior. For example, instead of passing the save mode and letting connectors [inaudible] it, Spark will now use the new catalog API to check that the given table exists and [inaudible] the same validation rules independently of the underlying connector implementation. That way, you always have consistent behavior and validation for all V2 connectors. Also, a lot of community effort went into designing and stabilizing logical [inaudible] plans, so hopefully it should be a lot easier to evolve and support the new write path. Plus, if you’re doing [inaudible] commands to see what actually got executed, you should see more detailed and more accurate logical and physical plans, which should make more sense to you.
There are also proper abstractions in place, so you no longer interact with any internal APIs like SQLContext or RDD. You just interact with InternalRow, column [inaudible], and a few connector-specific interfaces. Also, instead of relying on overloading, there are mix-in traits that you can implement to indicate to Spark that you support specific optional functionality.
The new API also supports multiple catalogs per session, essentially enabling catalog federation. So you can finally [inaudible] to multiple [inaudible] in a single job, which is a highly anticipated feature. You also have full Structured Streaming support, so you can provide custom sinks and sources. The new API also allows you to leverage vectorization, so you can speed up your [inaudible] by exposing columnar batches instead of rows. There’s also a very interesting [inaudible] that I would really love to see done around partitioned joins, which should enable join optimizations for Hive bucketed tables, [inaudible] bucketed tables, or actually any type of bucketing that you might have.
Okay. So hopefully I have convinced you to consider migrating to the new API by now. Let’s move on and talk about data distribution and ordering. I’ll start by clarifying what those terms actually mean.
As we all know, Spark has multiple tasks in [inaudible] job, and distribution defines how the data is split between those tasks. Let’s consider an example where two tasks each contain records for three years: 2019, 2020, and 2021. If I ask Spark to cluster this data, which is one way to distribute it, Spark will shuffle the data around so that all records for a given year end up in the same task. In the provided example, both original tasks contain records for 2019, and after clustering, all those records ended up in task one. It is really important to emphasize that distribution gives no guarantees about the order of the data within tasks. So, for example, the records for 2021 are not next to each other; they are in random order. Distribution ensures that they are in the same task, but it does not guarantee that they will be next to each other [inaudible] in that task.
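To illustrate the clustering described above, here is a minimal Python sketch in which plain lists stand in for Spark tasks. The function name `cluster_by`, the sample records, and the modulo-based task assignment are assumptions made for illustration; this is not how Spark implements its shuffle.

```python
from collections import defaultdict


def cluster_by(tasks, key, num_tasks):
    """Redistribute records so that records with equal keys share a task.

    The target task depends only on the key, as in a hash-style shuffle.
    Records are appended in arrival order, so nothing is sorted.
    """
    out = [[] for _ in range(num_tasks)]
    for task in tasks:
        for record in task:
            out[hash(key(record)) % num_tasks].append(record)
    return out


# Two input tasks, each holding (year, payload) records for several years.
tasks = [
    [(2019, "a"), (2021, "b"), (2020, "c"), (2021, "d")],
    [(2020, "e"), (2019, "f"), (2021, "g"), (2019, "h")],
]

clustered = cluster_by(tasks, key=lambda r: r[0], num_tasks=2)

# Record which tasks each year ended up in.
year_to_tasks = defaultdict(set)
for index, task in enumerate(clustered):
    for year, _ in task:
        year_to_tasks[year].add(index)

for index, task in enumerate(clustered):
    print(index, [year for year, _ in task])
```

After clustering, every year lives in exactly one task, but inside a task the years are still interleaved, which is the point the talk makes about distribution giving no ordering guarantee.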
Ordering, on the other hand, controls the order of the data within tasks. If I ask Spark to order the data from the previous example, it will sort the records within each task, but it will not sort them across tasks. Essentially, ordering does not change the set of records in a task, but it can reorder them. In the provided example, both original tasks contain records for 2019, and that still applies after the ordering. It’s pretty much the same, except that the records are now ordered by year. Okay. So you see that distribution and ordering [inaudible 00:09:51]. That’s why you would probably need both if you are trying to optimize your [inaudible 00:09:55].
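Ordering within tasks can be sketched the same way. In this minimal Python illustration the function name `sort_within_tasks` and the sample records are hypothetical, and lists again stand in for Spark tasks.

```python
def sort_within_tasks(tasks, key):
    """Sort every task independently; no record moves to another task."""
    return [sorted(task, key=key) for task in tasks]


# Two input tasks, each holding (year, payload) records for several years.
tasks = [
    [(2019, "a"), (2021, "b"), (2020, "c"), (2021, "d")],
    [(2020, "e"), (2019, "f"), (2021, "g"), (2019, "h")],
]

ordered = sort_within_tasks(tasks, key=lambda r: r[0])

for before, after in zip(tasks, ordered):
    # Same set of records per task, only their order changed.
    print(sorted(before) == sorted(after), [year for year, _ in after])
```

Both tasks still contain records for 2019 after the sort; only the position of the records inside each task has changed.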
[inaudible] question would be: why would anybody actually care about these two? As it turns out, these two aspects have a profound impact on the overall table performance. If you’re working with the built-in file sources, or if you’re working with Iceberg, then data distribution and ordering will impact the number of generated files. If you use column [inaudible] like [inaudible] then [inaudible] even in worse situation as [inaudible], for example, has to buffer the incoming records before they are written to [inaudible]. And buffering requires more memory. So if you don’t order and distribute the data correctly, your clusters will consume more memory. That’s why I believe that getting data distribution and ordering right is really essential for write performance. In many cases, doing a bit of extra work before starting the actual write can be very beneficial. Let me give you a couple of examples.
Let’s say we have a job that has a thousand tasks, each task contains records for day one to day one thousand, and our destination table is partitioned by date. Without proper distribution, each Spark task may have data for every single partition. This is essentially what you see on the slide right now. This means that each task will have [inaudible] a thousand partitions, producing one thousand files with one record each. In total, this will be one million files per batch, and this will put a lot of pressure on [inaudible] storage. And if you’re working with Iceberg, this will be a lot of pressure on the Iceberg table [inaudible 00:11:58]. If you run this for a month straight or for a year, this will be a lot of [inaudible 00:12:03]. That’s why you would need to aggressively compact these tables and [inaudible 00:12:09] your own files. And this will end up consuming way more resources and time. And by doing something [inaudible 00:12:16]. [inaudible 00:12:18] we cannot really immediately discard the old files, as some older readers may still depend on them, so you will still need to keep them around for some time, essentially doubling the storage cost.
At the same time, if you had distributed your records correctly, you would have prevented this issue altogether. In the example here, we distributed the data perfectly so that each task has records for just a single day, meaning that each task will write one new file. Overall, this will be one thousand files per batch, which is substantially fewer than one million files.
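The file counts in this example follow from simple arithmetic. The sketch below assumes that a task writes exactly one file for every partition it holds data for; the function name `files_per_batch` is hypothetical.

```python
def files_per_batch(num_tasks, partitions_per_task):
    """Files written in one batch if each task writes one file per partition it sees."""
    return num_tasks * partitions_per_task


# Without distribution: each of the 1,000 tasks holds data for all 1,000 days.
without_distribution = files_per_batch(num_tasks=1000, partitions_per_task=1000)

# With perfect clustering by day: each task holds data for exactly one day.
with_clustering = files_per_batch(num_tasks=1000, partitions_per_task=1)

print(without_distribution, with_clustering)
```

Under that assumption, clustering by the partition column reduces the number of files per batch by a factor of one thousand.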
Similarly, the order of the data is quite important. Let’s consider Iceberg and the file source as examples. In Iceberg, whenever a task is writing data, it will keep only a single open file at a time. And whenever Iceberg sees records for the next partition, it will close the current file and open a new one.
In the example that you see, the data within the task is in random order. So you have the record for day one followed by the record for day two, and then a record for day one again. Under this default behavior, in the worst case, Iceberg would produce four million files where each file contains just a single record. Iceberg is smart enough to prevent this, and if you try to do this, it will actually [inaudible] exception, but I just wanted to convey the point and show this extreme use case. You can also enable the fanout writer, which will keep all files open until the task is done. Do that at your own risk, because this will potentially consume a lot of memory: for every single file, the memory consumption will be [inaudible]; in the worst case, it’s proportional to your row group size.
That’s why, if you spend a bit of time and order the data within each task, you will prevent this issue from happening. Ordering the data within a task is extremely cheap because you don’t shuffle the data. That’s why I always advise people to do at least this optimization so that you keep only a single file open and also write the minimum number of files.
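The effect of record order on a writer that keeps a single file open can be sketched as follows. This is a toy model, not Iceberg code: the function name `count_files_single_open_writer` is hypothetical, and unlike Iceberg (which, per the talk, rejects such input) the model simply opens a new file every time the partition changes.

```python
def count_files_single_open_writer(partition_keys):
    """Count files written by a writer that keeps only one file open.

    A new file is started whenever the partition of the incoming record
    differs from the partition of the currently open file.
    """
    files = 0
    current = None
    for key in partition_keys:
        if files == 0 or key != current:
            files += 1
            current = key
    return files


# Partition values of the records inside one task, in arrival order.
task_records = ["day1", "day2", "day1", "day2", "day1", "day2"]

unsorted_files = count_files_single_open_writer(task_records)
sorted_files = count_files_single_open_writer(sorted(task_records))

print(unsorted_files, sorted_files)
```

With interleaved input every record lands in its own file, while the locally sorted input produces one file per partition present in the task.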
[inaudible] from writes, data distribution and ordering will also impact your reads. For example, the built-in file source supports skipping row groups in Parquet using min-max statistics. By doing a bit of extra work when writing, you make sure that the min-max statistics are actually usable. [inaudible] you can skip those [inaudible]. In Iceberg it’s even more critical because Iceberg supports file filtering using min-max indexing. Therefore, by doing that global [inaudible] write [inaudible] make sure you can skip files and limit the scope of your operations to a small subset of files. That’s why I highly advise all Iceberg users to make sure they configure a proper [inaudible] in their table. Iceberg will persist that sort key, and this will highly impact not only your reads, but also operations like [inaudible].
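Why sorting makes min-max statistics usable can be shown with a small model. The sketch below is an illustration only: the function names, the group size of ten, and the synthetic values are assumptions, and real Parquet or Iceberg statistics carry more detail than a single (min, max) pair.

```python
def row_group_stats(values, group_size):
    """Split values into consecutive groups and keep (min, max) for each group."""
    groups = [values[i:i + group_size] for i in range(0, len(values), group_size)]
    return [(min(group), max(group)) for group in groups]


def groups_to_read(stats, target):
    """A group can be skipped only when target lies outside its [min, max] range."""
    return sum(1 for low, high in stats if low <= target <= high)


# A scrambled permutation of 0..99 (37 and 100 are coprime).
values = [(i * 37) % 100 for i in range(100)]

unsorted_reads = groups_to_read(row_group_stats(values, 10), target=42)
sorted_reads = groups_to_read(row_group_stats(sorted(values), 10), target=42)

print(unsorted_reads, sorted_reads)
```

On the scrambled input every group spans almost the full value range, so all ten groups must be read to find the value 42. After sorting, the ranges no longer overlap and a single group qualifies.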
We’ve seen examples where petabyte-scale tables were queried in five or six seconds overall because distribution and indexing allowed us to do that. Also, data distribution and ordering will impact your storage footprint. In particular, column [inaudible] perform way better on sorted data. So if you sort your data globally, you increase the chances of using dictionary or [inaudible] encoding, which will reduce storage size. This [inaudible]. Also, dictionary encoding is extremely beneficial for your queries, because if you’re looking for a given set of keys, you will be able to check the dictionary and immediately know whether you need to read that row group or not. This will allow you to skip irrelevant row groups.
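The dictionary check mentioned above can be modeled with sets. This is a simplified sketch under stated assumptions: each row group's dictionary is represented as the set of its distinct values, the function names and sample keys are hypothetical, and real columnar formats apply additional conditions before a dictionary can be used for filtering.

```python
def build_dictionaries(values, group_size):
    """Return the set of distinct values (a toy dictionary) for each row group."""
    return [set(values[i:i + group_size]) for i in range(0, len(values), group_size)]


def groups_possibly_containing(dictionaries, wanted):
    """Indexes of row groups whose dictionary contains at least one wanted key."""
    return [i for i, dictionary in enumerate(dictionaries) if dictionary & wanted]


keys = ["us", "de", "fr", "jp"] * 3  # twelve values, interleaved

unsorted_dicts = build_dictionaries(keys, group_size=4)
sorted_dicts = build_dictionaries(sorted(keys), group_size=4)

unsorted_hits = groups_possibly_containing(unsorted_dicts, {"de"})
sorted_hits = groups_possibly_containing(sorted_dicts, {"de"})

print([len(d) for d in unsorted_dicts], unsorted_hits)
print([len(d) for d in sorted_dicts], sorted_hits)
```

Sorting shrinks every toy dictionary from four entries to two and leaves only one row group that can contain the key "de", so the other groups can be skipped.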
Okay, so we’ve covered why data distribution and ordering are so important. Now the question is, how do [inaudible] control this? Since [inaudible] connectors [inaudible] data frame [inaudible] write, they can apply arbitrary transformations. Built-in connectors, for example, sort data within tasks using partitioning and bucketing columns to avoid the issue that I just described.
V2 connectors [inaudible] have no way to control it as it stands today, which leads to severe performance issues unless it’s explicitly handled by the user, which is rare. But that’s kind of expected, so connectors themselves have to be smart enough to figure that out for the user. I know a lot of developers who are blocked in their migration to the V2 API because of the lack of this functionality, but I’m also happy to announce that it’s fixed in master. It should be available in [inaudible] and in Spark 3.2. Internally at Apple, we’ve been running this in production for quite some time, and we’re extremely happy with it.
Let me spend a couple of minutes and talk about the solution we came up with. [inaudible] solution we considered a few use cases that are really important to address. Specifically, if your use case permits, connectors should be able to [inaudible] the global sort on write to properly cluster the data within the incoming batch. While doing a global sort on write speeds up [inaudible] and also ensures better compression, it also takes more time and needs more resources [inaudible] to perform a range-based shuffle. Range-based shuffles are expensive as they require a skew estimation step, which is a separate Spark job that allows Spark to determine distribution ranges and also avoid straggler tasks. Range-based shuffles, and skew estimation in particular, are extremely important, but [inaudible]. They also introduce a bit of a performance penalty.
If the cost of the skew estimation is too high, connectors can cluster the data by some expressions instead and then sort the data within those tasks. This option will trigger [inaudible] shuffle, which is considerably cheaper than the range-based one, but it is also not as scalable, as it can introduce hotspots if the data is not distributed evenly. So be really careful, and if you can afford the global sort, I would definitely recommend doing that.
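The hotspot risk can be made concrete with a small simulation. The sketch below rests on several assumptions that are not from the talk: the function names are hypothetical, the range boundaries are computed from the full data set rather than from a sample, and the range-based variant sorts by a composite (key, sequence) value so that a hot key can be split across tasks.

```python
import bisect


def hash_cluster_sizes(records, num_tasks):
    """Task sizes when the target task depends only on the record key."""
    sizes = [0] * num_tasks
    for key, _ in records:
        sizes[hash(key) % num_tasks] += 1
    return sizes


def range_shuffle_sizes(records, num_tasks):
    """Task sizes when records are split at evenly spaced range boundaries."""
    ordered = sorted(records)
    step = len(ordered) / num_tasks
    bounds = [ordered[int(step * i)] for i in range(1, num_tasks)]
    sizes = [0] * num_tasks
    for record in records:
        sizes[bisect.bisect_right(bounds, record)] += 1
    return sizes


# 900 records share the hot key 7; keys 0..3 have 25 records each.
records = [(7, i) for i in range(900)]
records += [(key, i) for key in range(4) for i in range(25)]

hash_sizes = hash_cluster_sizes(records, num_tasks=4)
range_sizes = range_shuffle_sizes(records, num_tasks=4)

print(hash_sizes, range_sizes)
```

In this toy data set, clustering by key sends 925 of the 1,000 records to one task, while the range-based split with estimated boundaries gives each task 250 records.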
And finally, if your use case requires writing data as quickly as possible, then we can do a local sort. Usually that performance [inaudible] is negligible, but if you definitely need to write data as is, you should be able to do that as well. In order to support this functionality, we had to modify the WriteBuilder interface. Previously, it used to report [inaudible] that constructed batch and streaming writes. Now it builds a logical write, and that logical write will [inaudible] into a batch [inaudible] streaming write [inaudible].
And if you need a specific distribution or ordering of the data, you can implement the RequiresDistributionAndOrdering interface, where you can instruct Spark which distribution you want for your dataset and which expressions you want to order the data by.
With respect to supported distributions, [inaudible] basically support the same distributions Spark has internally. You can request an ordered distribution; that’s the most expensive option, as it will do the skew estimation to determine the ranges. You have the clustered distribution, which will trigger the [inaudible] shuffle [inaudible] skew estimation. So again, be careful with it and do not introduce any hotspots. And you also have the unspecified distribution, which is the one where you say, “I don’t really care how [inaudible] distribute it.”
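The three distribution options, combined with a sort inside each task, can be compared in a toy Python model. None of the names below are Spark classes: `ToyWriteRequirements` and `prepare_for_write` are hypothetical, lists stand in for tasks, one key serves as both the distribution and the ordering expression, and the range boundaries are taken from the full data instead of an estimation job.

```python
import bisect
from dataclasses import dataclass
from typing import Callable, List, Tuple

Record = Tuple[int, str]


@dataclass
class ToyWriteRequirements:
    """Hypothetical stand-in for what a connector declares for a write."""
    distribution: str             # "ordered", "clustered" or "unspecified"
    key: Callable[[Record], int]  # expression used to distribute and order


def prepare_for_write(tasks: List[List[Record]], req: ToyWriteRequirements,
                      num_tasks: int) -> List[List[Record]]:
    """Distribute records as requested, then sort inside every task."""
    records = [record for task in tasks for record in task]
    if req.distribution == "unspecified":
        # Leave records where they are; only the local sort below applies.
        out = [list(task) for task in tasks]
    elif req.distribution == "clustered":
        # Hash-style shuffle: equal keys meet in one task, key ranges may interleave.
        out = [[] for _ in range(num_tasks)]
        for record in records:
            out[hash(req.key(record)) % num_tasks].append(record)
    elif req.distribution == "ordered":
        # Range-style shuffle: keys in task i never exceed keys in task i + 1.
        keys = sorted(req.key(record) for record in records)
        bounds = [keys[len(keys) * i // num_tasks] for i in range(1, num_tasks)]
        out = [[] for _ in range(num_tasks)]
        for record in records:
            out[bisect.bisect_right(bounds, req.key(record))].append(record)
    else:
        raise ValueError(f"unknown distribution: {req.distribution}")
    return [sorted(task, key=req.key) for task in out]


def year(record: Record) -> int:
    return record[0]


tasks = [
    [(2019, "a"), (2021, "b"), (2020, "c"), (2021, "d")],
    [(2020, "e"), (2019, "f"), (2021, "g"), (2019, "h")],
]

for name in ("ordered", "clustered", "unspecified"):
    result = prepare_for_write(tasks, ToyWriteRequirements(name, year), num_tasks=2)
    print(name, [[year(r) for r in task] for task in result])
```

In this model only the "ordered" option yields tasks whose key ranges do not overlap, the "clustered" option keeps each year in one task without ordering the tasks relative to each other, and the "unspecified" option leaves records in place and relies on the sort within each task.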
And in order to indicate how you want data to be ordered, you will return an array of sort order expressions. You don’t have to implement these expressions yourself. [inaudible] that allow you to construct objects of [inaudible] where you basically say, “Hey, I would like my data to be ordered by this expression. I would like it to have this direction. Now order it.”
Let’s talk a bit about the current state. As it stands today, this functionality is fully implemented and functional in master for batch queries. Structured Streaming support is still in progress. [inaudible] and results in open PR. Structured Streaming is a bit broader because it hasn’t moved properly [inaudible 00:22:52] to API internally. So we don’t have all the logic [inaudible 00:22:57] we need at this point. So it is a bit harder, but I’m still hoping we can have this done by Spark 3.2.
There are also a few interesting remaining features we would like to have. If you’re interested in collaborating, feel free to reach out to me. Internally at Apple, we allow our customers to indicate what distribution and ordering they want to have in the CREATE TABLE statement. In Iceberg, for example, we’ll be able to support that by persisting the sort key in the metadata. We found this extremely useful, and it is supported by a lot of [inaudible] data here [inaudible]. It would be really nice to support this feature in Spark. Also, there is an ongoing effort to control the number of shuffle partitions. In some use cases, it’s not enough to say how you want data to be distributed; you actually want to control the number of [inaudible] partitions it has. So there is an ongoing effort to support that as well. Apart from that, there is an effort to leverage [inaudible] to coalesce the partitions [inaudible] writes. This will require a bit more effort because the [inaudible] doesn’t actually keep the same distribution, but this will also be very helpful so that you can not only distribute the data correctly but also write reasonably sized files.
Okay. So I’d like to finish with a few key takeaways I want you to remember. First of all, consider migrating to Data Source V2 if you need more advanced features or if you want more reliability. I think it is mature enough so that you can [inaudible]. Also, never underestimate the importance of data distribution and ordering. This is [inaudible] scale and this will have a profound impact on your writes, reads, and storage footprint. Don’t forget to review and rate sessions. The feedback is important to us. And thank you very much for your time today.

Anton Okolnychyi

Anton is a Spark contributor and a Software Engineer at Apple. He has been dealing with the internals of Spark for the last 3 years. At Apple, Anton is working on an elastic, on-demand, secure, and fu...