Spark SQL Beyond Official Documentation

Implementing an efficient Spark application with the goal of maximal performance often requires knowledge that goes beyond the official documentation. Understanding Spark’s internal processes and features may help to design queries in alignment with internal optimizations and thus achieve high efficiency during execution. In this talk we will focus on some internal features of Spark SQL which are not well described in the official documentation, with a strong emphasis on explaining these features on some basic examples while sharing some performance tips along the way.

We want to demystify Spark’s behavior in various situations where the documentation does not provide sufficient explanation. The content is based mostly on the knowledge gathered from studying the Spark source code and on our experience from our daily data processing. We will talk about topics and pitfalls that we encountered and solved either in our real-life queries or when helping the community by answering questions on Stack Overflow. The talk is intended for anyone who wants to learn how Spark SQL works under the hood and how to use that knowledge to achieve better performance of Spark queries.

Speaker: David Vrba


– Hello everybody, and welcome to my presentation, in which I will talk about some features of Spark which I believe are worth understanding and which are also not so well documented. So let me first briefly introduce myself. My name is David and I work at Socialbakers as a machine learning engineer. Here we are building a data lake on a terabyte scale, and for this purpose we are developing ETL pipelines in Apache Spark, and we are also running and deploying models on top of Spark to compute various enrichments for our data. I have also been a Spark trainer for the last couple of years. I have trained several teams, mostly data engineers and data scientists, and I also recently started a blog on Medium, where I now continuously publish articles on various topics about Spark. So let me now briefly explain the purpose and goal of this presentation. The goal is pretty much knowledge sharing. I would like to share with you some details of what we have learned about Spark from experience, and also by studying its source code, when using it for the past couple of years pretty much on a daily basis. And it is kind of a free continuation of my previous talk, which I presented last year at the Spark Summit in Amsterdam. There I talked about physical plans in Spark SQL. And I literally want to go beyond the official documentation, because I feel that some features of Spark, especially when it comes to internal behavior, are not so well documented and not so obvious, and people frequently ask questions about them, which I can see not only from the trainings that I teach but also, for example, from the questions that people ask on Stack Overflow. So today I would like to cover two topics. The first topic is statistics in Spark, and the second topic is saving data in a sorted state to a file format.
So let me go ahead and start with the first topic, which is statistics. Here I would like to explain how you can actually see them, how they are computed under the hood, how Spark is using them during query execution, and also what is good to be careful about. So let’s talk about statistics. If you want to see statistics in Spark for your table, you first need to compute them by running ANALYZE TABLE table_name COMPUTE STATISTICS. If you do that, then you can see your statistics by calling the SQL statement DESCRIBE EXTENDED table_name. Then you will see this little table, which shows some information about your statistics. Here, as you can see, there are two metrics available, sizeInBytes and rowCount. These are the so-called table-level statistics, and then there are also column-level statistics, which you can see if you call DESCRIBE EXTENDED table_name column_name. If you do that, you will see this little table for your column, which shows some statistical information about this column, like the minimum value, maximum value, number of null values and so on. And here, as you can see, in my case all these values are NULL, and this is because I haven’t computed the column statistics in advance, so they are not available and they are NULL. And there is one more option how you can see your statistics, not just for your table but also for the query that you are running. This is a new feature in Spark 3.0, in which you can call explain on a DataFrame and provide the argument called mode. If you specify the mode to be cost, then you will see the physical plan and, apart from the physical plan, also the optimized logical plan, which is going to contain the information about statistics. You will see that for each node in your query plan, and you will see how the statistics get propagated through the plan.
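The statements mentioned so far can be collected into a small PySpark helper. This is only a sketch: the function names and the table and column names in the usage note are made up for illustration, and `spark` is assumed to be an already running SparkSession (version 3.0 or later for the `mode` argument of `explain`).

```python
def statistics_statements(table, column=None):
    """Build the SQL statements for computing and inspecting statistics."""
    statements = [
        # Computes the table-level statistics (sizeInBytes and rowCount).
        f"ANALYZE TABLE {table} COMPUTE STATISTICS",
        # Shows table metadata, including the table-level statistics.
        f"DESCRIBE EXTENDED {table}",
    ]
    if column is not None:
        # Shows column-level statistics; values stay NULL until they are computed.
        statements.append(f"DESCRIBE EXTENDED {table} {column}")
    return statements


def show_statistics(spark, table, column=None):
    """Run the statements on an existing SparkSession and print the cost plan."""
    for statement in statistics_statements(table, column):
        spark.sql(statement).show(truncate=False)
    # Spark 3.0+: prints the optimized logical plan with statistics on each node.
    spark.table(table).explain(mode="cost")
```

With a live session, a call such as `show_statistics(spark, "events", "user_id")` would print the table-level statistics, the column-level statistics and the plan annotated with statistics, where `events` and `user_id` are placeholder names.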
So let me now explain in more detail how this statistics propagation actually works. Here we can see a simple example of a query plan. It starts with Relation, which is a leaf node, and then there are a couple of other operators. Each leaf node is always responsible for computing the statistics somehow, and later on I will show you how this is actually done under the hood. After the statistics are computed, they are propagated through the plan from one operator to another. And there are basically two ways how the statistics can be propagated. The first way is rather simple and basic: Spark will propagate only one piece of statistical information, which is sizeInBytes. And sizeInBytes is propagated in a very basic way, because some operators don’t adjust the value at all. For example, Filter doesn’t adjust the value, as you can see from this simple example. I have a query here in which I filter for all records where the user id is negative. I know that there is no such record, but as you can see, Spark doesn’t know it, because the sizeInBytes on the Relation operator is the same as the sizeInBytes on the Filter operator, namely 150 megabytes in my case. In other words, Spark believes that applying the filter is not going to change the sizeInBytes. Then, apart from this rather simple propagation, there is also a more advanced propagation, which is available since Spark 2.2 and which requires the cost-based optimizer to be enabled. By default it is disabled, and this is still the case in Spark 3.0. So if you want to use it, you have to change the configuration setting spark.sql.cbo.enabled and set it to true, and it also requires your statistics to be computed in the metastore.
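A minimal PySpark sketch of switching to the more advanced propagation could look as follows. The helper name and its parameters are hypothetical, and `spark` is assumed to be an existing SparkSession; the optional `columns` argument covers the column-level variant of ANALYZE TABLE, which the filter estimate relies on.

```python
def explain_filter_cost(spark, table, condition, columns=()):
    """Print the cost plan of a filtered table with the cost-based optimizer on."""
    # The cost-based optimizer is disabled by default, also in Spark 3.0.
    spark.conf.set("spark.sql.cbo.enabled", "true")
    # Table-level statistics have to exist in the metastore.
    spark.sql(f"ANALYZE TABLE {table} COMPUTE STATISTICS")
    if columns:
        # Column-level statistics (min, max, null count, ...) for the given columns.
        spark.sql(
            f"ANALYZE TABLE {table} COMPUTE STATISTICS FOR COLUMNS {', '.join(columns)}"
        )
    # Compare sizeInBytes and rowCount on the Relation and Filter nodes in the output.
    spark.table(table).filter(condition).explain(mode="cost")
```

For the query from the talk, the call would be something like `explain_filter_cost(spark, "events", "user_id < 0", columns=["user_id"])`, with `events` standing in for the real table name.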
And if this is the case, then as you can see, more statistical information can be propagated through the plan. Here, in this example, we can see that Spark propagates sizeInBytes and also rowCount. However, we can also see that these statistics are again not adjusted on the Filter operator, because the sizeInBytes is 22 megabytes on the Relation operator and it is also 22 megabytes on the Filter operator. The reason is that if you want the Filter operator to adjust these statistics, Spark needs to compute the so-called selectivity for the filter. And this requires column-level statistics to be computed, because Spark needs to have some statistical information about the column by which you are filtering the data. So we can simply go ahead and compute the statistics for the column by running ANALYZE TABLE table_name COMPUTE STATISTICS FOR COLUMNS, and then you specify the column name, which is user_id in our case. Then, if you go and see your statistics, you will also see the minimum value and maximum value for your column, and in our case the minimum value is 22. So now Spark has the information available that there are no records with a negative user id, and we can see in the query plan that Spark now actually adjusted the statistics: the rowCount is zero and the sizeInBytes is 1 byte. So now we have seen these two possible ways how Spark can propagate the statistics through the query plan. The first one is the basic way, in which Spark propagates only sizeInBytes in a rather simple way, and the second is the more advanced way, in which Spark can propagate more information and do it in a better way. Let me now show you how Spark actually computes the statistics in the first place, in the Relation operator, before they get propagated through the plan. There are pretty much three ways how this can be done. The first way is that Spark will take the statistics from the metastore.
The second option is that Spark will use the so-called in-memory file index, and under the hood it will use the Hadoop API to get the size in bytes of each file from which the data source is composed, and then it will sum up these sizes to get the total sizeInBytes for the data source. In this case, Spark will compute only the sizeInBytes metric. And finally, there is the last option, in which Spark will use a default value for sizeInBytes. The default value is given by the configuration setting spark.sql.defaultSizeInBytes, and the value of this setting is 8 exabytes. So this is the default situation, in which Spark is going to overestimate the size of your table as much as possible. Which one of these three options takes place can be described by this diagram. So let me now explain how you can read and interpret the information in this diagram. It is a tree, and each node in this tree is a condition; T stands for true and F stands for false, and the leaves of the tree tell you how Spark will compute the statistics in the leaf node of the plan. So for example, in-memory file index means that Spark will use the Hadoop API under the hood to compute only the sizeInBytes. Then statistics from the metastore means that the statistics will be taken from the metastore, and there are actually two options how this can happen. On the left side, there is a leaf in which Spark will take all the statistics from the metastore. And on the right side of the tree, there is a leaf in which Spark will take only sizeInBytes from the metastore. And finally, the default situation is down here in the CatalogFileIndex leaf, in which Spark will overestimate the size of your table and will use the default value. So let me also explain what the conditions in this tree are. The first condition, at the top, is CatalogTable.
So here it depends on whether or not you access your data as a table, that is, whether you call spark.table or not. The second condition is whether the cost-based optimizer is enabled or disabled, the next condition is whether you have run ANALYZE TABLE in advance or not, and finally, the last condition is whether your table is partitioned or not. Now, the best situation is on the left side, so let me show you how you can get there. First you need to access your data by calling spark.table, then you need to have the cost-based optimizer enabled, and finally you also need to have your statistics computed in the metastore in advance by running ANALYZE TABLE. In that case, all your statistics will be taken from the metastore. On the other hand, let me now explain how you can get to the default situation, which is down here on the left side. It is kind of interesting, because there are two paths that lead to this situation, and it actually doesn’t matter whether your cost-based optimizer is enabled or disabled. No matter whether it is on or off, what matters is whether you have computed your statistics in advance by running ANALYZE TABLE. If you haven’t computed your statistics, then you will end up in this last condition, which is whether the table is partitioned. If the table is partitioned, then you will end up in the default situation. So let me say it again: if you didn’t run ANALYZE TABLE in advance and your table is partitioned, Spark will overestimate the sizeInBytes of your table by using the default value of 8 exabytes. On the other hand, if your table is not partitioned, then the situation is actually not as bad, because Spark can still compute the sizeInBytes using the in-memory file index. So I believe that it is not intuitive or obvious at all that table partitioning actually has an impact on how Spark computes the statistics in the leaf node.
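The decision tree described above can be written down as a plain Python function. This is a reading of the spoken description, not Spark code: the function name and the returned labels are made up, and the outcome for data that is not accessed as a table (the in-memory file index) is an assumption, since the talk does not spell out that branch.

```python
def leaf_statistics_source(is_catalog_table, cbo_enabled, analyzed, partitioned):
    """Model of how the leaf node of a query plan gets its statistics."""
    if not is_catalog_table:
        # Assumption: data read directly from files uses the in-memory file index.
        return "in-memory file index: sizeInBytes summed from file sizes"
    if analyzed:
        # ANALYZE TABLE was run, so the metastore holds the statistics.
        if cbo_enabled:
            return "metastore: all statistics"
        return "metastore: sizeInBytes only"
    # No statistics in the metastore: the CBO setting no longer matters.
    if partitioned:
        return "CatalogFileIndex: default value spark.sql.defaultSizeInBytes"
    return "in-memory file index: sizeInBytes summed from file sizes"


# The two paths that lead to the overestimated default size:
for cbo in (True, False):
    print(cbo, leaf_statistics_source(True, cbo, analyzed=False, partitioned=True))
```

Writing the tree this way makes the point of the talk visible in the code: once `analyzed` is false, only `partitioned` decides between a real size and the default value.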
And let me now show this on a particular example. Here on this slide I have two query plans, and both of them correspond to a situation in which we haven’t run ANALYZE TABLE in advance, so the statistics are not computed in the metastore. The first query plan corresponds to a situation in which we read data from a table that is partitioned. And as you can see, the sizeInBytes is 8 exabytes. So this is the situation in which Spark overestimates the size of your table. On the other hand, the second query plan corresponds to a situation in which we read the same data, but now we read it from a table that is not partitioned. And now, as you can see, Spark is able to compute the sizeInBytes to be 150 megabytes. So the point here is that especially if your table is partitioned, it makes sense to run ANALYZE TABLE to avoid these situations in which Spark will overestimate the sizeInBytes of your table. Now we have seen how Spark computes the statistics in the leaf node and how they are possibly propagated through the plan, so let me also explain how Spark is using these statistics under the hood during query execution and how Spark makes decisions based on them. So, there is this JoinSelection strategy, in which Spark has to decide which join algorithm will be used for joining your DataFrames. And there is this logic implemented: if one of your tables is smaller than the configuration setting spark.sql.autoBroadcastJoinThreshold, of which the default value is 10 megabytes, then Spark will broadcast this table. So Spark will use the broadcast hash join instead of the sort-merge join, because the broadcast hash join is much more efficient than the sort-merge join if the table is very small.
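To see why an overestimated size matters for this decision, the size check can be modeled in a few lines of Python. This is a simplified sketch, not the actual JoinSelection code, which also looks at join types and hints; the constant and function names are made up, and the 5 MB table is an invented example value.

```python
# Default of spark.sql.autoBroadcastJoinThreshold: 10 megabytes.
DEFAULT_BROADCAST_THRESHOLD = 10 * 1024 * 1024
# Default of spark.sql.defaultSizeInBytes: the largest 64-bit integer, about 8 exabytes.
DEFAULT_SIZE_IN_BYTES = 2**63 - 1


def can_broadcast(size_in_bytes, threshold=DEFAULT_BROADCAST_THRESHOLD):
    """Simplified size check: broadcast only if the estimate fits the threshold."""
    return 0 <= size_in_bytes <= threshold


small_table = 5 * 1024 * 1024  # hypothetical table with an accurate 5 MB estimate
print(can_broadcast(small_table))            # True
print(can_broadcast(DEFAULT_SIZE_IN_BYTES))  # False
```

The same small table is broadcast when its estimate is accurate and falls back to the more expensive join when Spark only has the default value for it.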
And so now you can see that if you have lots of queries with joins, and some of the tables in these joins are very small and possibly partitioned, then it really makes sense to run ANALYZE TABLE in advance, to avoid the situation in which Spark would overestimate the sizeInBytes of your tables. And there is another situation in which the statistics are used, which is the rule called join reorder, and this rule is disabled by default. So if you want to use it, you need to change the configuration setting spark.sql.cbo.joinReorder.enabled and set it to true, because by default it is false. This rule is going to compute an optimal configuration for your joins if you are joining more than two tables, because the order of the tables in the join actually matters, and Spark will try to find the best configuration for you. So this is what I had prepared for the statistics, and let me now move to the second topic, which is saving data in a sorted state to a file format. Here I am first going to show what kind of functions we have available in Spark SQL for sorting, and then I am going to show you how you can save data in a sorted state, which might sound trivial. However, it is a little bit tricky, as we will see. So when it comes to sorting in Spark SQL, we have three functions available. There is orderBy or, equivalently, sort, then there is sortWithinPartitions, and finally there is sortBy. The sortBy function can be used together with bucketBy when you are doing bucketing. It cannot be used without bucketBy, because sortBy is going to make sure that your buckets will be sorted. So you have to use it together with bucketBy, and then you need to save your data as a table. On the other hand, orderBy and sortWithinPartitions are DataFrame transformations, and they don’t really care how you will save your data, whether you save it as a table or not.
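The three sorting functions can be put side by side in PySpark as follows. The wrapper names and parameters are hypothetical and only serve to show which object each function belongs to: orderBy and sortWithinPartitions are called on the DataFrame, while sortBy is called on the writer together with bucketBy.

```python
def globally_sorted(df, column):
    """DataFrame transformation: global sort across all partitions."""
    return df.orderBy(column)  # df.sort(column) is equivalent


def sorted_per_partition(df, column):
    """DataFrame transformation: sort each partition on its own."""
    return df.sortWithinPartitions(column)


def save_sorted_buckets(df, table, column, num_buckets):
    """Writer option: sortBy only works with bucketBy and saveAsTable."""
    df.write.bucketBy(num_buckets, column).sortBy(column).saveAsTable(table)
```

A call such as `save_sorted_buckets(df, "events_bucketed", "user_id", 8)` would create a bucketed table whose buckets are sorted by `user_id`; the table name and the bucket count are placeholders.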
So they are DataFrame transformations, and orderBy is going to make sure that your data is globally sorted. When you call an action, Spark will run another job in the background in which it will sample the data and try to estimate the distribution of the column by which you are sorting. From this distribution it will compute the boundaries for your partitions, and then your data will be shuffled: a big shuffle will happen and the data will be repartitioned according to the computed partition boundaries. So it is a rather expensive operation. On the other hand, the sortWithinPartitions transformation kind of assumes that your data is already distributed on the Spark cluster in the way you need it, so it will not shuffle the data, it will simply sort each partition. And let me now show you a practical example, so we can see how this can be used in practice. In this example, we want to first partition our data in the file system by the column year, then we want to have each partition sorted by the column user_id, and we also want to have exactly one file per file system partition, so this file should be sorted by user_id. So this is our assignment, and let’s now see how you can compose a query that will do this. In this example, we will probably first want to repartition the DataFrame by the column year, because this is going to allow us to control how many files will be created in the file system. We are kind of trying to achieve the same distribution of the data on the Spark cluster as is required for the file system. And if we do it like this, if we repartition by year, then we will end up with exactly one file per file system partition.
And then we will call sortWithinPartitions by the column user_id, because we want to have each partition sorted by user_id, and then we call write, specify partitionBy year, and finally call save or saveAsTable. And the point is that if we do it like this, it is not going to work. It is going to save the data, of course, but your files will not be sorted by the user_id column. You can go ahead and try it, then check your files, and you will see that they are not sorted by the user_id column. The reason is that Spark actually requires a certain ordering. When saving the data to a file format, Spark requires the data to have this ordering: partition columns, bucketing id expression and sort columns. If this ordering is not satisfied, Spark will forget your sort and will sort the data again when saving it. So let’s now see how this relates to our example. In our example we don’t have bucketing, so the bucketing id expression and sort columns don’t apply, and the required ordering will be only the partition columns, which is the column year. On the other hand, the actual ordering is user_id, because we sorted each partition by user_id. So as you can see, the required ordering is not satisfied, because Spark requires the data to be sorted by year, but it is sorted by user_id. And this is exactly the reason why Spark will forget your sort and will sort the data again by the column year when saving it. So this is not intuitive at all and it is not obvious. Fortunately, we can fix this simply. We can add the column year to the sortWithinPartitions function and sort the data by two columns.
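The fixed version of the query can be sketched in PySpark like this. The function name, the default column names, the output path and the choice of Parquet as the file format are assumptions for illustration; the talk only says that the data is saved to a file format with save or saveAsTable.

```python
def save_partitioned_and_sorted(df, path, partition_col="year", sort_col="user_id"):
    """Write one sorted file per file system partition."""
    (
        # Put all rows of one year into the same Spark partition,
        # so each year directory gets exactly one file.
        df.repartition(partition_col)
        # Sort by the partition column first, then by the column we care about.
        # Sorting only by sort_col would not satisfy the required ordering,
        # and the sort would be discarded when the data is written.
        .sortWithinPartitions(partition_col, sort_col)
        .write
        .format("parquet")  # assumed format; the talk does not name one
        .partitionBy(partition_col)
        .save(path)
    )
```

The only difference from the failing version is the first argument of `sortWithinPartitions`: with just `sort_col` there, the files are still written, but not in sorted order.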
So if we sort within partitions first by the column year and then by the column user_id, what is going to happen now is that the actual ordering will be year and user_id, while the required ordering stays the same: it is the partition column year. And now, as you can see, the required ordering is satisfied, because Spark requires the data to be sorted by year, and it is sorted by year now, plus user_id. But that doesn’t matter; the important thing is that the data is sorted by the required column year. So now the required ordering is satisfied, and Spark will preserve your sort and save the data as it is. So the data will now be saved in a sorted state. So now we are coming to the conclusion of my presentation. I covered two topics today. I was talking first about statistics, and we have seen how Spark computes the statistics under the hood in the relation node and how they are then possibly propagated through the query plan. We have also seen that sometimes it really makes sense to compute the statistics for your tables in advance by running ANALYZE TABLE, because if you don’t do it and your table is partitioned, Spark will overestimate the sizeInBytes of your table. And then I was talking about how you can save data in a sorted state to a file format. Here we have seen that if you want to have the data partitioned in the file system and to have each partition sorted, then it is important to also sort your data by the partition column. So this is what I prepared for today. Thank you very much for your attention. If you find this information useful, feel free to check out my blog, because I have published other articles of this kind. And so thank you very much again, and please don’t forget to provide us with your feedback, because your feedback is very important to us. Thank you very much.

About David Vrba

Socialbakers a.s.

David is a senior machine learning engineer at Socialbakers. He works with Spark on a daily basis, processing data on different scales from a few GBs up to tens of TBs. He also does query optimizations with the goal of achieving maximal performance and helps with productionalizing various ETL pipelines and ML applications. David enjoys preparing and lecturing Spark trainings and workshops and has trained several teams in Spark, such as data engineers, analysts and researchers. David received his Ph.D. from Charles University in Prague in 2015.