John Deere ingests petabytes of precision agriculture data every year from its customers’ farms across the globe. In order to scale our data science efforts globally, our data scientists need to perform geospatial analysis on our data lake in an efficient and scalable manner. In this talk, we will describe some of the methods our data engineering team developed for efficient geospatial queries including:
– Leveraging Quadtree spatial indexing to partition our Delta Lake tables
– Extending the Spark Catalyst Optimizer to perform efficient geospatial joins in our data lake
Christopher Hic…: Hello, and thanks for joining us. Today, we will be presenting on how to manipulate geospatial data at massive scale. My name is Chris Hicks, and I’m a Staff Software Engineer at John Deere. I’ve been with John Deere for almost two years, working in our data science and engineering space.
Charcey Peterso…: And I’m Charcey Peterson. I’m also an engineer at Deere. I haven’t been on the team as long, just a few months, but I’m also working in the data engineering space.
Christopher Hic…: Today, we will start with an introduction to data science and engineering at Deere. Next, Charcey, we’ll talk about how we partition our datasets by space to maximize geospatial performance. And thirdly, I will talk about an application of that partitioning in our scalable point in polygon algorithm for Spark. Charcey and I are a part of the Data Science and Engineering Department at Deere. Our mission is to leverage data and analytics to enable smarter equipment and better decisions. An example of how we do this is shown on this slide. John Deere equipment emits high resolution IOT, sensor data, such as planting data and the picture on the left. You can see each yellow dot represents one unit of planning data that was collected during the operation. We collect this IOT sensor data and use it along with various layers of environmental geospatial data in our analytics engine, which allows our data scientists to drive the next generation of insights for our customers.
Charcey Peterso…: Thanks, Chris. So I’m going to talk about partitioning by space. So when we have these big datasets, it’s very important that we pick partitions. And just as a refresher, a partition is just an identifier that segments your dataset. It’s very important to pick a partition that’s going to be useful and really represent the data, so that you can perform performance queries and repeatable query times and job times. So when you’re picking a partition, critical to know your dataset. But some common types of partitions would be by time, so maybe you partition by year or month, different static types that are common for your specific dataset. In the instance of agricultural data, that could be something like a harvest, or application, or seeding. And then the one that we’re going to focus on is by geographical space. And with partitions, you want to strive for the data within a given partition to be the same size, and that’s going to look like the graph I have here.
So in the graphs that I’m going to go through in these slides, there’s only 10 partitions. You would likely have a lot more in your specific dataset, but the point here is just to show the distribution of data within each partition. The reason that you want it to be even is to minimize skew, which allows you to have repeatable and predictable job or query runs.
So let’s talk about how you can partition by space. There’s a few different algorithms available, but the one that we went with, the one we’re going to talk about today is called a Quad Tree. And so, let’s talk through the photo I have here, so Quad Tree, as you can probably get from the title, it’s just a tree with four children. So at level one of the tree, we project projected earth to be flat, and then we split it up into four quadrants. Now, each of those quadrants splits into its four children, which then itself splits into four children, and so on and so forth. So as you get to higher and higher resolutions, the amount of geographic space within the quad key gets smaller and smaller, and that means it’s really good for projecting 2D data into a single data point.
So you decide, you want to partition your dataset by space. You choose your quad key resolution, so say you choose level six, for example. You break up the world into level six, you put your data into those different keys, effectively being partitions. You look at your partitions and they’re extremely skewed. Well, that is not too good. So you look and you see, “Most of my data is in two partitions or a few partitions, some partitions have no data at all. What is going on here?” Well, turns out that not all data is distributed evenly, especially spatial data. So there’s different characteristics of data that’s going to affect where it is in the world. Think of farmland; that’s concentrated to certain areas. Think of cities, think of transportation networks. All of those, if you put data in them is not going to be evenly distributed across space over the world.
So in this picture, you can see what we did is we split our dataset into just one single resolution, so all of those squares are the same geographic size, but then within that, we had varying levels of data. So some, we might have no data, other ones, we might have a ton of data, billions of records, so we got really skewed and that is not what we want. So next, we’ll talk about what we did to fix that.
We decided to create a custom Quad Tree. So if you want to go about this approach, what you would do is you would choose your men and your max quad key resolutions, and you would also choose what’s called a breakpoint density, and that is the number of records per quad key before you had skipped down to the next resolution.
So if we look at these pictures here, what’s going on is you start at your min resolution. So say you start at level six, and you’re going through, and you’re looking at all the data that goes into that specific key, and say you hit your breakpoint density of a million records. So, that key is going to get split into its four children. And then you’re going to look through each of those, and you’re going to keep splitting as you hit that breakpoint density. When you haven’t hit that breakpoint density, you move on to the next one. And then at the end, if you want to visualize it, this is what it might look like. So those high density areas that we saw are going to have high resolution quad keys that go along with them. So instead of balancing by geographic space, you also balance by data density.
And if you were to graph the partitions, hopefully they look something like this. So most of your data is going to be spread out on most of the partitions. Now, it’s likely you’re still going to have one to maybe 20 partitions that don’t have a lot of data. Just because of the nature of data, maybe you have one data point that’s on an island or something like that. That can definitely happen, but by and large, hopefully you have a pretty level number of data in each partition. If you do this and your data is still heavily skewed. You could consider decreasing the lowest level of quad key resolution or increasing the density breakpoint. And this is kind of what it comes back to knowing your dataset, and knowing how it’s spatially distributed and things like that. And so, you can play around with it until you kind of figure out where those partitions should land. And this can also be something that you’d come back to over time. So you’re going to have to re partition your database over time as maybe you get a new customer, you have a lot of data in an area you didn’t have before. So it’s good to be able to build this custom tree because then you can reapply it to your dataset over time.
Do be mindful of the total number of partitions that you have. You don’t want too few, but you also don’t want too many, but your Databricks Consultant can be super helpful with that for figuring out what’s going to be right for your dataset. And then lastly, you could consider multiple partitions. So maybe you do partition by space, but you’re just not getting the performance that you need, you could consider another partition maybe by time, maybe by, like I said, those static types. By harvest, for example. But do remember that order matter, so what do you want to partition by first, and second, and third? So keep that in mind.
And then here, I just wanted to show how easy it is to apply with Spark and Delta Lake. So in this code snippet, just saving a data frame as a partition Delta Lake table. So you simply enter the format as Delta, and then, “PartitionBy” is going to be what column name you have assigned to that partition. In this case, it would be, “QuadKey.” So that’s what we did to partition our dataset by space, and now I’m going to hand it off to Chris and he’s going to talk about one of the cool queries we wrote.
Christopher Hic…: Thanks, Charcey. Now that we’ve learned how to partition our datasets by space, I’m going to talk about an application of this partitioning in our scalable point in polygon algorithm. So the problem we faced was our data scientists at Deere need to efficiently query geospatial boundaries using PySpark. This is central to a lot of the insights we generate for our farmers. The algorithm we’ll focus on today, ray casting, is a very popular algorithm for point in polygon searches. This algorithm states that from a given point, if you cast a Ray in any direction, that point is within a polygon if it intersects an odd number of times. You can see this on the left with the simple rectangle example. The point in the middle of the rectangle casts a Ray and intersects once on the edges, while the point below the rectangle intersects twice. This algorithm also extends to complex multi polygons like the picture in the middle, the shape of Italy, for instance.
One challenge we had with this algorithm is the overall brute force time complexity. The approximate time complexity is shown on this slide; it’s approximately big-O of the number of points or rows in your database, times the number of edges in the polygon. So this is not easily scalable for polygons with thousands of edges. Like Italy, for instance, has over 3000 edges in this example, or large Spark data frames with billions to trillions of rows, like many of the data frames we use in our work at Deere.
Another problem we faced was finding a good open-source solution that was sufficient for all of our needs. In particular, we struggled to find something that was efficient and would natively take full advantage of our Quad Tree partitioning structure. So our solution to these problems were to write our own point in polygon algorithm, which maximized the use of our Quad Tree partitioning structure and expands the use of quad trees to minimize the computation needed.
To demonstrate the power of this implementation, here’s a hypothetical example, which is similar to the types of datasets we work with. In this example, we have a table consisting of exactly 1 trillion rows. This is a Delta Lake table with 5,000 Quad Tree partitions. We are assuming that there is zero skew in the data, it is perfectly partitioned across the table in these 5,000 Quad Tree keys. So each partition will have 2 billion rows. So the problem we’re trying to solve is we want to find all of the points within the shape of Italy, and the shape of Italy has over 3000 edges. If we look at the brute force runtime, we would need to run the ray casting casting algorithm for all 1 trillion rows and 3000 edges, which would result in approximately three quadrillion point in polygon calculations.
So from this brute force approach, we will attempt to improve this computation in three separate steps. First on the left, we’re going to use the existing Quad Tree partitions that we already have in our table to immediately filter to only the quad keys we care about. Second, and in the picture in the middle, we will construct a Quad Tree within the bounds of the polygon, to capture most of the points within the polygon. Finally, for any points, which may lie either inside or outside of the polygon, but reside along the edges, we’re going to perform a much smaller scale ray casting algorithm.
So in step one, the first thing we want to do, because we have these Quad Tree partitions in our table already, we can use these immediately to filter out the data to only the quad keys that we care about. For this example, we have nine partitions in and around the shape of Italy, so we’re immediately able to reduce our data frame from 1 trillion rows to 18 billion rows, or just under 2% of the original table. And thanks to Delta Lake, we can do this in constant time using partition pruning. A simple metadata lookup as involved with partition pruning, and the time to look up that metadata is constant, no matter how large our table might grow.
Now that we’ve reduced the table from 1 trillion to 18 billion rows. Our next step is to identify most of the points within the polygon by constructing a full Quad Tree within the boundaries of the polygon. So from the polygon geometry that we input, we construct a Quad Tree, which is fully contained within these boundaries. We take the keys from each node in the tree, and put it into a large set. Then, for each of these 18 billion rows, we do a latitude longitude to Quad Tree key conversion at a very high resolution. So from here, we are able to check each of the 18 billion rows, whether or not they belong to that larger quad key set that we constructed from the boundary. When we do this for our provided example, we find that 15 billion points are indeed contained in the Quad Tree, but the question remains: what about the other 3 billion?
Now that we’ve identified most of the points within the polygon, we still need to determine whether the 3 billion remaining rows along the edges are either inside or outside of the polygon. To do this, we need to use the ray casting algorithm, but we’re able to use it at a much smaller scale than before. So to do this, we break down the edges of the polygon into, in this example, 1000 high resolution tiles. Since our polygon is about 3000 edges in total, each tile is going to contain approximately three edges. So from here, we can do the full ray casting algorithm on a reduced number of rows, the 3 billion rows remaining, as well as these 1000 small tiles with three edges each. So if we go back and think about our total time complexity for the Ray casting algorithm, which is big-O the number of points, times the number of edges. The number of points has been reduced from 1 trillion to 3 billion. The number of edges has been reduced from 3000 to an average of three on each tile. So we ended up with only about 9 billion point in polygon calculations, which is several orders of magnitude smaller than our brute force solution.
Now that we understand our optimized algorithm, let’s look at how we’ll translate this to the catalyst optimizer for use in Spark SQL. First, we define a case class that extends rule of logical plan, and this is the first line in the code. And this allows us to build a rule that can be injected into Spark’s tree-based logical plan. Then in the second line, we apply a partial function to the catalyst, transform all expressions method, in which we apply pattern matching for a function name. And so here, our function name is, “Point in polygons.” You can see the case point in polygon statement in the pattern matching that occurs through that case statement.
From there, within our case statement, we just apply our three-step process. Each step is going to have its own catalyst expression. So step one, you can see we filter by our partitions, and the in partitions variable is our resulting expression for step one. For step two, we construct a Quad Tree to find most of the points contained to the polygon, so the, “Contained in Quad Tree” variable is our resulting expression for step two. For step three, we find all of the points along the edge of the polygon, and we use the ray casting algorithm on the edge tiles. So the, “Ray cast success” variable is our resulting expression for step three. So finally, we take our three resulting expressions and create one high level catalyst expression, which states the point is in the polygon if it’s in partitions, and either it’s contained in Quad Tree or ray cast success.
To learn more about the internals of the Spark catalyst optimizer, linked at the bottom of the slide is a very insightful, deep dive document from Databricks. So now I’ll pass it back to Charcey just to discuss the outcomes of these algorithms and implementations.
Charcey Peterso…: Thanks, Chris. So two main outcomes here we want to highlight. The one is simply time; so we can query petabytes of data in seconds to minutes, and this is extremely important for any processes that we have running. For anyone that wants to consume our data, it’s great that they could get it within seconds to minutes instead of any brute force backend that’s going to take hours to possibly complete. And kind of to build off of that, now we’re enabling the generation of features for machine learning, because we can serve up this data in a predictable way, and in a fast way that allows us to build features that can enhance John Deere’s data science. So that’s why, Chris was saying we have a Data Science and Engineering Department, so we’re really going hand-in-hand to enable that data science with the data engineering that we’re doing.
Christopher Hic…: Finally, we’d like to thank you all for joining us for this presentation. We hope you enjoy it, and please leave your feedback.
Charcey is a Software Engineer within John Deere's Intelligent Solutions Group. Her formal training is in Geographic Information Systems and Remote Sensing, but has found an interest in systems engine...
Chris is a Staff Software Engineer within John Deere's Intelligent Solutions Group. He designs and develops large-scale data pipelines and production-grade machine learning tools and infrastructure, a...