Photon is a new vectorized execution engine powering Databricks written from scratch in C++. In this deep dive, I will introduce you to the basic building blocks of a vectorized engine by walking you through the evaluation of an example query with code snippets. You will learn about expression evaluation, compute kernels, runtime adaptivity, filter evaluation, and vectorized operations against hash tables.
Learn more about Photon: https://www.databricks.com/product/photon
Speaker: Alex Behm
– Hello everyone, I’m Alex, the tech lead of project Photon at Databricks. Today I’m going to do a technical deep dive and teach you how to think vectorized. We’ll go through an introduction, talk a little about the setting and what vectorization is, and I’ll motivate the rest of the work with a few benchmarks. Then I’m going to teach you how to build a new engine: we’ll talk about the basic building blocks like expressions and implement one operator, the aggregation. Finally, we’ll conclude with some end-to-end performance benchmarks.
Before we go there, let’s talk a little about what motivated Photon in the first place. We’ve been seeing trends in two dimensions. One is on the hardware side: roughly since 2015, storage and networks have been getting faster, but CPUs not so much. So CPUs continue to be the bottleneck on some workloads, and there’s a natural question about how we can achieve the next level of performance and squeeze as much as possible out of the CPU power we have at our disposal. The second trend is in the workloads. Businesses want to move faster and get all their data ready for processing as quickly as possible. That means they spend less time on data modeling and on making sure the data is pristine and clean; instead they just want to get the data in very quickly. So you don’t necessarily have nice schemas that declare whether columns have NULLs or not. Strings are a very convenient data type, so very often the data isn’t perfectly typed. We’ve also seen people wanting to work with the data through its full lifecycle, from ingesting it in raw form to converting it into bronze, silver, and gold tables.
So basically it’s this lifecycle of ingesting the raw data and finally producing downstream tables with varying degrees of quality. It’s not necessarily the case that everyone should go and build perfect schemas upfront. We think there’s a case to be made for allowing this workflow, and maybe we can design an engine that is good at dealing with raw data as well as with data that has a pristine schema and quality. Those are the trends that motivated us to think about a new engine called Photon.
In the bigger picture, where does Photon fit in? At Databricks we have this Delta Engine that consists of three components: the query optimizer, the execution engine, and the I/O optimization layers at the bottom, like caching. At the very top you can submit SQL queries, Spark DataFrames, or Koalas. What I’m trying to show you here is that the Photon execution engine is this part in the middle, and that’s the part we’re going to focus on.
So what is Photon? It’s a new execution engine for Delta Engine, geared towards accelerating the workloads we talked about. It’s built completely from scratch in C++, purpose-built for performance. It’s a vectorized engine, which is a specific technique for making these kinds of workloads fast, and hopefully I’ll teach you more about the specifics of that. We also want to keep in mind that modern workloads are both structured and semi-structured, so we want to support those cases as well.
Let’s talk a little about vectorization. What is the main idea? It’s really taking the query and decomposing it into small compute kernels that process vectors of data. The main idea there is that CPUs are very good at doing this kind of work.
That means running fairly simple loops over a batch of data items: the access patterns are very predictable, it’s very cache-friendly, it’s friendly to the pipelining of CPUs, it may enable the use of SIMD, and so on. Typically these engines use columnar in-memory formats to enable a lot of these improvements. It’s also nice that, since it’s a batch-oriented system, we can have some runtime adaptivity, which we’ll talk about later on. And it’s nice that this idea of compute kernels is modular: if one of these operations tends to be slow, it’s very easy to identify the code and optimize only that specific part.
These ideas all sound great, but what do they actually mean? I completely understand if it sounds a little nebulous, and this is what I’m hoping to teach you today: how to build a mental model for what it means to build a vectorized engine, and what the basic building blocks are.
But before we go into the specifics, you might ask: building a new engine completely from scratch in a different language seems like a pretty big endeavor. Is it worth it? Should we really go down this path? To motivate this, I’m showing you a bunch of micro-benchmarks on a variety of expressions: date and timestamp expressions, string expressions, arithmetic expressions, cast expressions. So a whole family of different expressions, and we can see that there’s still a lot to be gained. This chart shows the speedup you can achieve with Photon on some of these operations, and you can see that we can get improvements in the range of 50 to 60x. I think this should motivate us to understand how we can achieve these speedups. Great, so let’s get to work and start building a new engine from scratch.
We’re going to go through expression evaluation and adaptivity, how to execute filters, and then how to deal with hash tables for aggregations. Let’s start with expressions. To ease us along, here’s a very simple motivating query that can help us understand the different components. On the left-hand side we see a pretty simple SQL query: a grouping aggregation with a sum, running after a filter. This translates into the simplified query plan you see on the right: the scan, followed by the filter and the aggregation. The model in a vectorized engine is that these operators pass batches of columnar data between each other, and our job is to implement these operators. For simplicity, we’re going to focus on the filter and aggregation portions in this talk.
Let’s start with expression evaluation, focusing on the filter expression c1 + c2 < 10. That translates into an expression tree like the one you see on the right, and we’re going to think about how to implement it. At a high level, c1 and c2 are input vectors, the plus operation produces another vector, that intermediate vector is fed into the less-than operator against the literal 10, and finally we produce the output of the filter expression.
How are we going to implement this? I’ll introduce you to the idea of compute kernels and what these vectorized compute kernels look like. Let’s start with the plus operator. It might be implemented by code like what you’re seeing here: we take two inputs, left and right, which both represent vectors of data items, and we produce a vector of output items. It’s a very simple loop over all the items, adding them up and writing them to the output. Seems simple enough, and this should run pretty fast. But what about NULLs? We need to deal with the NULLs somehow.
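The slide code is not part of this transcript. As a minimal sketch of the NULL-free plus kernel described above, assuming 64-bit integer columns stored as plain arrays (the name `PlusKernelNoNulls` and its signature are hypothetical, not Photon's actual API):

```cpp
#include <cstddef>
#include <cstdint>

// Hypothetical kernel: element-wise addition of two input vectors.
// Assumes neither input contains NULLs and that `out` has room for
// `num_rows` values.
void PlusKernelNoNulls(const std::int64_t* left, const std::int64_t* right,
                       std::int64_t* out, std::size_t num_rows) {
  // A tight loop with sequential, predictable memory access. Loops of this
  // shape are easy for compilers to unroll and auto-vectorize.
  for (std::size_t i = 0; i < num_rows; ++i) {
    out[i] = left[i] + right[i];
  }
}
```

The kernel knows nothing about queries or operators; it only processes one batch of one data type, which is what makes it easy to profile and optimize in isolation.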
Let’s take a look at what that means for us. We can add them in: now we have not only the data input and output vectors but also the NULL vectors, and we can modify the code accordingly. It’s not too bad. We just need to check whether either operand is NULL, write that to the output NULLs, and skip the addition if any of the operands is NULL. Unfortunately, adding the NULL checks makes our simple compute kernel over 30% slower. That is unfortunate, because in practice my data may sometimes have NULLs, but most of the time it doesn’t. Wouldn’t it be nice if there was a way to reap the benefits for my mostly non-NULL data while still being able to deal with data that actually has NULLs?
The answer is this idea of runtime adaptivity. It’s pretty simple: for each of the input columns, we keep track of whether it has NULLs or not. Then at runtime we use this property to invoke either the generic kernel that can deal with NULLs, or a specialized kernel that is optimized for no NULLs on both sides. This is one example of runtime adaptivity, and you can imagine generalizing it further, to only the left or only the right side having NULLs, or to other kinds of expressions. Other ideas for runtime adaptivity include keeping track of string encodings or of the min and max values of the data. The main idea is that runtime adaptivity allows you to adapt to the data you’re seeing at runtime, as opposed to relying on information you have at table creation time inside the schema. This is very nice because users don’t need to declare all the properties upfront, and they still reap the benefits because we invoke specialized code at runtime based on these properties.
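To make the dispatch concrete, here is a hedged sketch of a NULL-aware kernel, a NULL-free fast path, and the runtime choice between them. The `Int64Vector` layout, the `has_nulls` flag, and all function names are assumptions made for illustration; the talk does not show how Photon stores this property.

```cpp
#include <algorithm>
#include <cstddef>
#include <cstdint>

// Hypothetical column vector: values, a per-row NULL flag array, and a
// batch-level property that is tracked at runtime.
struct Int64Vector {
  std::int64_t* values;
  bool* nulls;     // nulls[i] == true means row i is NULL
  bool has_nulls;  // true if any row in this batch may be NULL
};

// Generic kernel: handles NULLs on either side.
void PlusWithNulls(const Int64Vector& l, const Int64Vector& r,
                   Int64Vector& out, std::size_t n) {
  bool any_null = false;
  for (std::size_t i = 0; i < n; ++i) {
    const bool is_null = l.nulls[i] || r.nulls[i];
    out.nulls[i] = is_null;
    if (!is_null) out.values[i] = l.values[i] + r.values[i];
    any_null |= is_null;
  }
  out.has_nulls = any_null;
}

// Specialized kernel: no NULL checks inside the hot loop.
void PlusNoNulls(const Int64Vector& l, const Int64Vector& r,
                 Int64Vector& out, std::size_t n) {
  for (std::size_t i = 0; i < n; ++i) {
    out.values[i] = l.values[i] + r.values[i];
  }
  std::fill_n(out.nulls, n, false);
  out.has_nulls = false;
}

// Runtime adaptivity: pick the kernel per batch from the tracked property.
void Plus(const Int64Vector& l, const Int64Vector& r, Int64Vector& out,
          std::size_t n) {
  if (!l.has_nulls && !r.has_nulls) {
    PlusNoNulls(l, r, out, n);
  } else {
    PlusWithNulls(l, r, out, n);
  }
}
```

The branch is taken once per batch rather than once per row, so its cost is amortized over the whole vector.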
For evaluating the next step, the inequality, we can implement similar kernels with similar adaptivity. For this specific case you can imagine yet another optimization: if one of the sides is a literal, we can bake it in as a single value as opposed to passing a vector. This additional optimization gets us about a 25% speed improvement.
Great, so now we’ve gone all the way through and evaluated the expression, but what exactly is the output? We haven’t really talked about that. What does this mean for the input column batch we were processing? And what exactly is it that we’re passing on to the next operator? This leads us to the question of how filters are evaluated and represented. The main idea is that the filter gets evaluated into a new kind of vector, which we call the active rows vector. You can think of it as a lazy representation of the filter, indicating which of the rows have survived it. Here you can see in the chart that the operators pass all of these columns between each other, c1, c2, c3, g1 and g2, and the filter just tacks on the active rows and then passes the original data together with the new active rows to the next operator.
There are several benefits to being lazy here. One of them is that you don’t have to compact the entire input data. There could be many, many columns, and compacting them eagerly would be expensive. We’ll also see later on that this notion of active rows becomes very helpful in more complicated operations like aggregations and joins. So what does it mean for our engine now that we have this lazy representation of filters?
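As an illustration of both ideas above, the following sketch compares a vector against a baked-in literal and emits the surviving rows as a list of row indices. Representing active rows as an index list is an assumption made here (the talk does not specify the layout), NULL handling is omitted, and `LessThanLiteral` is a hypothetical name.

```cpp
#include <cstddef>
#include <cstdint>

// Hypothetical filter kernel for `in[i] < literal`.
// Writes the indices of surviving rows to `active_rows` (which must have
// room for `num_rows` entries) and returns how many rows survived.
// The input columns themselves are left untouched: no compaction happens.
std::size_t LessThanLiteral(const std::int64_t* in, std::int64_t literal,
                            std::size_t num_rows, std::int32_t* active_rows) {
  std::size_t num_active = 0;
  for (std::size_t i = 0; i < num_rows; ++i) {
    // Always write the candidate index, but only advance the output cursor
    // when the row passes. This keeps the loop body free of branches.
    active_rows[num_active] = static_cast<std::int32_t>(i);
    num_active += (in[i] < literal) ? 1 : 0;
  }
  return num_active;
}
```

For the example filter, `in` would be the intermediate vector produced by the plus kernel and `literal` would be 10.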
Basically this idea becomes a concept in the engine, and it needs to be plumbed throughout the engine. It does add complexity to the code, but it’s a really handy way of expressing, in particular, some of the more complex operations like aggregations and joins. The main change to the kernels is this: if we take our plus kernel that doesn’t deal with NULLs, you pass an additional vector, the active rows, and inside the kernel you have an indirection so that you only go through the rows that are active and perform the operation on those.
Alright, that prepares us to move on to aggregation. The basic algorithm for hash aggregation is fairly simple. In our example we were grouping by g1 and g2 and computing a sum, so we maintain a hash table with these entries in it. You can think of g1 and g2 as the combined key and the sum as the value. First we hash the input and find the corresponding bucket. Then we check whether the bucket is empty and, if so, initialize it with the keys and an aggregation buffer; in this case we’d initialize the sum buffer to zero. Then we need to do a key comparison and follow whatever our probing strategy is to resolve collisions. Once we’ve found the right bucket with matching keys, we can evaluate the aggregation function and update the corresponding aggregation buffer.
This is a pretty simple algorithm, and it’s very straightforward to implement in a row-wise system. But how are we going to implement it in a vectorized system? The main idea, again, is to think in a columnar and batch-oriented way, so we’re going to have kernels that are type-specialized, column-oriented, and batch-oriented. So yes, we want to go through all of these steps and kernelize all the things, and I’ll walk you through what that means exactly.
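The change to the plus kernel mentioned above can be sketched as follows, again assuming active rows are a list of row indices and using a hypothetical kernel name:

```cpp
#include <cstddef>
#include <cstdint>

// Hypothetical NULL-free plus kernel that only touches active rows.
// `active_rows` holds the indices of rows that survived earlier filters;
// output slots of inactive rows are left unmodified.
void PlusKernelActiveRows(const std::int64_t* left, const std::int64_t* right,
                          std::int64_t* out, const std::int32_t* active_rows,
                          std::size_t num_active) {
  for (std::size_t i = 0; i < num_active; ++i) {
    const std::int32_t row = active_rows[i];  // the extra indirection
    out[row] = left[row] + right[row];
  }
}
```

Results are written at the original row position, so every column in the batch stays aligned and the same active rows vector can be passed along to the next kernel or operator.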
As a quick motivation for why we would want to do this, here are some more micro-benchmarks. Even on the simple query shown here, a simple grouping aggregation, we see that there are pretty substantial speedups to be achieved with this kind of vectorization approach. For more complicated aggregations we tend to see bigger speedups, but the main point is that even for a very simple query like this, these techniques can help us achieve some pretty nice speedups. So pay attention.
Hash aggregation: let’s take a look. On the left-hand side we have our input column batch; only the interesting columns are shown, g1, g2 and c3. For ease of exposition I’ve omitted the filter and all the active rows stuff, because that makes it easier for us to think about. The first step is to take the g1 and g2 vectors and compute a vector of hashes. This is done by first computing the hashes for g1 and then using those as the seed for hashing g2, which produces the vector of hashes. You’ll see on the right-hand side that I already put a few entries into the hash table, just to make it easier for us to think about some of the nontrivial cases. After we’ve computed the hashes, we take them modulo the size of the hash table to compute the buckets. So this is another vector, one that contains pointers to the buckets we think might be a match based on the hash. The next step is to deal with collisions: now that we’ve identified candidate buckets, we need to compare the input keys with the actual contents of the buckets to identify collisions.
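The hashing and bucket steps above can be sketched as three small kernels. The mixing function, the `Bucket` layout, and all names below are illustrative assumptions; the talk does not say which hash function or bucket layout Photon uses.

```cpp
#include <cstddef>
#include <cstdint>

// Illustrative 64-bit mixing function (an arbitrary choice for this sketch).
inline std::uint64_t Mix(std::uint64_t x) {
  x ^= x >> 33;
  x *= 0xff51afd7ed558ccdULL;
  x ^= x >> 33;
  x *= 0xc4ceb9fe1a85ec53ULL;
  x ^= x >> 33;
  return x;
}

// Hypothetical row layout of a hash table bucket for GROUP BY g1, g2 / SUM.
struct Bucket {
  bool occupied = false;
  std::int64_t g1 = 0;
  std::int64_t g2 = 0;
  std::int64_t sum = 0;
};

// Hashes the first key column into `hashes`.
void HashColumn(const std::int64_t* keys, std::uint64_t* hashes,
                std::size_t n) {
  for (std::size_t i = 0; i < n; ++i) {
    hashes[i] = Mix(static_cast<std::uint64_t>(keys[i]));
  }
}

// Hashes a further key column, using the existing hash as the seed.
void HashColumnWithSeed(const std::int64_t* keys, std::uint64_t* hashes,
                        std::size_t n) {
  for (std::size_t i = 0; i < n; ++i) {
    hashes[i] = Mix(hashes[i] ^ static_cast<std::uint64_t>(keys[i]));
  }
}

// Turns hashes into candidate bucket pointers: hash modulo table size.
void ComputeBuckets(const std::uint64_t* hashes, Bucket* table,
                    std::size_t table_size, Bucket** buckets, std::size_t n) {
  for (std::size_t i = 0; i < n; ++i) {
    buckets[i] = table + (hashes[i] % table_size);
  }
}
```

Each kernel is one pass over one column, so supporting another key column or another key type means adding a kernel rather than changing the per-row logic.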
This, again, is done in a column-by-column way. We start with g1 and compare all the g1 values with the corresponding buckets via this bucket pointer indirection, and we combine that with a similar comparison for the g2 key. That tells us which of the input rows are pointing to buckets with matching keys and which are pointing to buckets that do not have matching keys. In particular, we’re interested in these collision cases, and in the example you’ll see that one of the pointers is a collision: it doesn’t match the key. A convenient way of representing these non-matches is through our concept of active rows. Since all our kernels are aware of active rows, we can essentially use the same kernels to continue our probing strategy. In our example we’re following a simple linear probing strategy, so to resolve that collision we advance the buckets for all the collisions and repeat: compare the keys, identify the non-matches, advance the buckets, compare the keys, until for all of the bucket pointers we’ve identified the places where we want to update the aggregation buffers.
In the next step we take the c3 input vector and update all of the corresponding aggregation buffers via the buckets indirection. Again, another one of those kernels. To give you a better feeling for what one of these kernels might look like, I also have a little code snippet for us to walk through. One interesting thing here is that this is a vectorized engine with a columnar in-memory format, but when dealing with hash tables it’s very convenient to represent them as rows. So what we have here is a mixed column/row kernel. You can see that the input, in green, is just an array of values, so a vector of values, and the buffers in the hash table are represented through a double pointer.
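The probing loop described above (compare keys column by column, keep the non-matches as active rows, advance their buckets, repeat) could be sketched like this. The `Bucket` layout, the index-list form of active rows, and all function names are assumptions for illustration, and the sketch assumes the table always has a free bucket so probing terminates.

```cpp
#include <cstddef>
#include <cstdint>
#include <numeric>
#include <vector>

// Hypothetical row layout of one hash table bucket.
struct Bucket {
  bool occupied = false;
  std::int64_t g1 = 0;
  std::int64_t g2 = 0;
  std::int64_t sum = 0;
};

// Claims empty candidate buckets: copy the keys in, zero the sum buffer.
void InitEmptyBuckets(const std::int64_t* g1, const std::int64_t* g2,
                      Bucket* const* buckets, const std::int32_t* active,
                      std::size_t num_active) {
  for (std::size_t i = 0; i < num_active; ++i) {
    const std::int32_t row = active[i];
    Bucket* b = buckets[row];
    if (!b->occupied) {
      b->occupied = true;
      b->g1 = g1[row];
      b->g2 = g2[row];
      b->sum = 0;
    }
  }
}

// Compares one key column with one bucket field and ANDs into match[row].
void CompareKeyColumn(const std::int64_t* keys, std::int64_t Bucket::*field,
                      Bucket* const* buckets, const std::int32_t* active,
                      std::size_t num_active, char* match) {
  for (std::size_t i = 0; i < num_active; ++i) {
    const std::int32_t row = active[i];
    match[row] = match[row] && (buckets[row]->*field == keys[row]);
  }
}

// Linear probing for a whole batch. On entry buckets[i] is the candidate
// from the hash; on exit it points at the bucket holding (g1[i], g2[i]).
void ProbeBatch(const std::int64_t* g1, const std::int64_t* g2, Bucket* table,
                std::size_t table_size, Bucket** buckets, std::size_t n) {
  std::vector<std::int32_t> active(n);
  std::iota(active.begin(), active.end(), 0);  // all rows start active
  std::vector<char> match(n);
  std::size_t num_active = n;
  while (num_active > 0) {
    InitEmptyBuckets(g1, g2, buckets, active.data(), num_active);
    for (std::size_t i = 0; i < num_active; ++i) match[active[i]] = 1;
    CompareKeyColumn(g1, &Bucket::g1, buckets, active.data(), num_active,
                     match.data());
    CompareKeyColumn(g2, &Bucket::g2, buckets, active.data(), num_active,
                     match.data());
    // Non-matching rows stay active and advance to the next bucket.
    std::size_t next = 0;
    for (std::size_t i = 0; i < num_active; ++i) {
      const std::int32_t row = active[i];
      if (!match[row]) {
        Bucket* b = buckets[row] + 1;
        buckets[row] = (b == table + table_size) ? table : b;
        active[next++] = row;
      }
    }
    num_active = next;
  }
}
```

Every round reuses the same kernels on a shrinking set of active rows, which is why the active rows concept carries over from filters to hash table probing.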
So it’s an array of pointers together with a buffer offset. You can think of these two things together as representing a column whose values are sprayed across rows in the hash table. Implementing the kernel is then pretty straightforward: we resolve the first level of indirection to get the bucket for the row, and then we invoke the aggregation function, pointing it to the corresponding input and to the bucket plus the offset of the aggregation buffer. Writing the loop this way is nice, again, because CPUs are good at dealing with code like this. In particular, for hash tables that become large in memory, the CPU will be able to preload a lot of the data from memory instead of being bound by cache miss latencies.
Alright, now that we’ve seen some of the basic building blocks, let’s take a look at end-to-end performance. We’ve measured Photon on the TPC-DS benchmark. This chart shows queries per hour, so it’s throughput, and we see that with the techniques we’ve just discussed we’re able to achieve a 3.3x improvement in throughput with Delta Engine on this standard benchmark, end to end.
Going even further: the numbers I’ve shown you so far, and the benchmark, tend to focus on read queries, but similar ideas can be applied to writing data. This chart shows some benchmarks on writing data. The first column shows writing 8.6 billion integers, so a single column, and there we can achieve a speedup of two to three x with these techniques. For a wider table, like the unpartitioned TPC-DS store_sales table with 23 columns, we can achieve an even bigger speedup of almost four x in writing data.
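Returning to the mixed column/row kernel from the aggregation walk-through: the snippet shown in the talk is not in the transcript, so the following is only a sketch of the shape described, with an input value array, an array of bucket pointers, and a byte offset to the aggregation buffer. The names `SumUpdate` and `UpdateSumBuffers` are hypothetical.

```cpp
#include <cstddef>
#include <cstdint>
#include <cstring>

// Hypothetical aggregation function: adds one input value into a SUM buffer.
// memcpy is used so the buffer may sit at any byte offset within the row.
inline void SumUpdate(std::int64_t input, char* agg_buffer) {
  std::int64_t sum;
  std::memcpy(&sum, agg_buffer, sizeof(sum));
  sum += input;
  std::memcpy(agg_buffer, &sum, sizeof(sum));
}

// Mixed column/row kernel: `input` is a columnar vector, while
// `buckets` + `buffer_offset` together describe a "column" whose values are
// spread across rows of the hash table.
void UpdateSumBuffers(const std::int64_t* input, char* const* buckets,
                      std::size_t buffer_offset, std::size_t num_rows) {
  for (std::size_t i = 0; i < num_rows; ++i) {
    char* bucket = buckets[i];  // first level of indirection
    SumUpdate(input[i], bucket + buffer_offset);
  }
}
```

Because each iteration is independent of the previous one except when two rows share a bucket, the processor can have several bucket loads in flight at once, which is the effect the talk attributes to writing the loop this way.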
We’ve also given Photon to several interested preview customers from different industries. Clearly they need to have suitable workloads, where Photon can provide sufficient coverage. The encouraging results we’ve typically seen so far are that, across the board, there’s usually a two to three times speedup end-to-end for suitable queries. Mileage in practice can vary, and we’ve also seen some bigger speedups: one interesting data point is that the biggest speedup we’ve seen so far was improving one query from 80 minutes down to five minutes. So we’re very optimistic that we’ll be able to improve a lot of workloads.
Just to recap: I talked to you a little about the basic building blocks of vectorization. The way to think vectorized is that you want to decompose all of the operations into very simple loops over vectors of data, because CPUs are very good at dealing with this kind of code. Vectorization also allows batch-level adaptivity, so you can adapt to the properties of the data you’re seeing at runtime and don’t necessarily need to give all the properties upfront in the schema. We talked about lazy filter evaluation via active rows, which is a useful concept for more complicated operations like joins and aggregations, and we also talked about how to deal with mixed column and row operations for hash tables. Right, and now it’s time for our Q&A, and please do rate the session.
Alex has been building databases for over a decade in academia and industry, and maintains a passion for speed and quality. He is the tech lead for Photon, a new vectorized engine written from scratch in C++, that powers Databricks' Delta Engine. Alex holds a PhD in databases from UC Irvine.