Pandas is the de facto standard (single-node) DataFrame implementation in Python. However, as data grows larger, pandas no longer works well due to performance limitations. On the other hand, Spark has become a very popular choice for analyzing large datasets in the past few years. However, there is an API gap between pandas and Spark, and as a result, when users switch from pandas to Spark, they often need to rewrite their programs. Ibis is a library designed to bridge the gap between local execution (pandas) and cluster execution (BigQuery, Impala, etc.). In this talk, we will introduce a Spark backend for Ibis and demonstrate how users can move between pandas and Spark with the same code.
– Hi, everyone, and welcome. Thanks for coming to this talk. Today we will be talking about Ibis and how we can use Ibis to seamlessly transition from pandas to Spark. So, first things first: because I work for a financial firm, I need to clarify that this talk is for educational purposes only and does not provide any financial advice. That should become pretty obvious. Okay, let's get started. First I want to talk about the target audience, who might find this talk useful. Here are a few examples: if you like pandas, but want to analyze larger datasets; if you are interested in using a distributed data frame, but don't know which one to choose; or if you want your analysis code to run faster or scale better without making code changes. If any of these seems to apply to you, stay on, and hopefully this talk will give you some answers.
So, a quick introduction about me. My name is Li Jin. I am a software engineer working for a technology firm called Two Sigma. At Two Sigma, we apply technologies like computer science, statistics, and machine learning to investment management. My team at Two Sigma is called Modeling Tools, and our mission is to provide the best modeling experience for data scientists at Two Sigma. Our team loves to use open source software, and I have contributed to and worked on many open source projects. For example, Apache Spark and pandas, which we should all be pretty familiar with. Apache Arrow is a newer one; it is a cross-language in-memory data format. Flint is a time series analysis library on top of Apache Spark. And of course, there is the topic we're talking about today, Ibis.
So I want to motivate my talk by starting with a pretty common data science task.
So here is some pandas code. We should all be pretty familiar with this, but I'm going to walk through it with you together.
The first thing we're doing here is a single-column operation. We're taking two columns, v1 and v2, averaging them, and assigning the result to a new column called feature.
The second thing we're doing here is a window operation. Basically, we're grouping all the data by another column called key, and for each key, we compute a rolling mean of the column feature and then assign it to a new column called feature2.
Finally, we're doing a group aggregation on the column feature2: grouping by key and computing the max and the mean of that column. This code runs pretty well on small amounts of data. However, if you use pandas on medium to large datasets, you will know that at some point the code is going to be too slow. And there are a couple of reasons for it. First, pandas was not designed for large amounts of data in the first place. Pandas was created about ten years ago, when data sizes were much smaller than those of today, and the architecture of pandas has stayed more or less the same over the past ten years; it hasn't evolved together with data sizes. The second reason is that your machine probably doesn't have enough RAM to hold all the data. Because pandas is an in-memory, single-machine solution, it requires that you have enough memory to hold the data in order to process it.
And the third reason is that you are not using all the CPUs. Modern machines usually have more than one CPU, and pandas is, for the most part, a single-threaded library. So even on your local machine, you're not using all the cores you could. So we want our code to run faster, or to be able to process larger datasets. What do we do? We're going to try a few things to see if they help.
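Before looking at ways to speed this up, here is a minimal pandas sketch of the three steps just described (the sample data, column names, and window size are all assumptions, since the slide code isn't in the transcript):

    import pandas as pd

    df = pd.DataFrame({
        "key": ["a", "a", "a", "b", "b", "b"],
        "v1":  [1.0, 2.0, 3.0, 4.0, 5.0, 6.0],
        "v2":  [6.0, 5.0, 4.0, 3.0, 2.0, 1.0],
    })

    # 1. Single-column operation: average v1 and v2 into a new column.
    df["feature"] = (df["v1"] + df["v2"]) / 2

    # 2. Window operation: per-key rolling mean of "feature".
    df["feature2"] = (
        df.groupby("key")["feature"]
          .rolling(2, min_periods=1)
          .mean()
          .reset_index(level=0, drop=True)
    )

    # 3. Group aggregation: max and mean of "feature2" per key.
    result = df.groupby("key")["feature2"].agg(["max", "mean"])
    print(result)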
The first thing we're going to try is to use a bigger machine. After all, this seems pretty easy to do; getting a bigger machine these days is not all that hard. And it has a very compelling reason: there are no code changes. You're running the same code with just a beefier machine. However, this approach obviously won't get you very far. For example, you still have the same software limits of pandas being single-threaded. So although you might be able to crank through a large dataset given enough time, it is not going to be very fast. The next thing we try, naturally, is to use a distributed system. After all, distributed systems seem to be a good answer to a lot of big data problems. The first approach we're going to try here is to use a generic way to distribute our code. This means that the system that distributes the computation does not know what the computation is trying to do, and treats it as a black box. There are a lot of ways to do it; here I'm using Spark as an example. Assume I'm analyzing 20 years of time series data. Here I'm using SparkContext.parallelize to split my data into one-year pieces and then send them out to the cluster, so each executor can process one year at a time, and therefore I achieve a parallelism of 20.
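As a rough illustration of this black-box approach (the exact slide code isn't in the transcript, so the per-year loader and the analysis function below are hypothetical):

    import pandas as pd
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()
    sc = spark.sparkContext

    def analyze_one_year(year):
        # Process one year of data with plain pandas.
        # read_year() is a hypothetical loader that returns a pandas DataFrame.
        df = read_year(year)
        df["feature"] = (df["v1"] + df["v2"]) / 2
        return df

    years = list(range(2000, 2020))  # 20 years -> parallelism of 20
    # Each executor processes one year; results come back to the driver
    # and are stitched together.
    pieces = (
        sc.parallelize(years, numSlices=len(years))
          .map(analyze_one_year)
          .collect()
    )
    result = pd.concat(pieces)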
The benefit of this approach is that it requires very small code changes. You might need to refactor your code a little bit to deal with one subset of the data at a time and then stitch back the results, but it doesn't require a total rewrite of the pandas code. It's also pretty scalable; by sharding your data into smaller pieces, you can achieve a very high level of parallelism.
However, the disadvantage of this approach is also pretty clear: it only works for embarrassingly parallel problems. And in real life, a lot of pandas code is not embarrassingly parallel. For example, in the case we showed before, we have a window operation and a group aggregation in our example, and neither of those is embarrassingly parallel. In the window case, for example, we need to handle the boundary cases where a window goes across multiple chunks, and in the groupby case, we also need to handle the case where a group aggregation might need data from multiple chunks. And also, because we're now using a distributed solution, we're opening ourselves up to distributed failures. The next approach we're going to try is to use an actual distributed data frame library.
Different from the second approach, here the framework is actually aware of what you're trying to do. There are a couple of distributed data frame libraries available on the market, Spark being a very obvious one. Dask is another one that's quite popular in the Python landscape. Koalas is another one, which is pretty new and interesting. The advantage of all of these is that they are pretty scalable; they're designed to handle large amounts of data and utilize many CPUs and a lot of memory. However, the disadvantage of this approach is also pretty important, which is a very high human cost. The user needs to learn a totally new API that's different from pandas, and probably needs to rewrite a lot of analysis code to use the new API. The second problem is that there's no obvious answer as to which one to use. If you asked me today whether you should use Spark or Dask or Koalas, honestly, I don't have the answer for you, because that really depends on your situation. And combined with the first issue, the high human cost of learning a new API, this actually becomes a worse problem.
And lastly, we are still exposed to distributed failures. So we looked at three approaches. They all have their advantages, and they each solve a subset of the problem, but none of them seems to be a silver bullet here.
So what should we do? In order to get maybe a new perspective, I want us to step back and rethink our problem here.
So let’s take a step back and look at the previous pandas code.
I think we're pretty okay with the code itself. It's not that we think the code is hard to read; it's actually a pretty reasonable way to express what it's trying to do. We just want the code to run faster or handle larger data.
So this brings us to our problem. To rephrase it: the problem is not how we express the computation, but how we execute it.
But two out of the three approaches we looked at involve changing the way we express the computation, which is not the problem we're trying to solve here. So something seems off. Why is it required to change the way we express a computation in order to make it faster? That seems a bit strange.
So next I want to bring up this design principle called separation of concerns. From Wikipedia: in computer science, separation of concerns is a design principle for separating a computer program into distinct sections, such that each section addresses a separate concern. How does this help us? Well, by applying this principle to our problem, we can separate how we express the computation, a.k.a. the expression part, from how we execute it. The reason we want to separate these is that the high cost of changing the code is purely in the expression part, while the part we want to improve is the actual execution.
So by separating these, we can maybe get to a solution that does not require changing the expression part and only improves execution.
This is actually not a new idea. For example, SQL is an excellent example of doing this. SQL is a way of separating expression from execution. It is powerful not because SQL is fast; it's powerful because you can make the same SQL query run faster, for example by switching from Hadoop to Spark, without changing the expression layer, a.k.a. the SQL query.
I think it would be kind of crazy to ask you to stop using pandas and Spark and all the nice data frame APIs and go back to SQL; that doesn't make sense. However, I do want to ask the question: can we come up with something that is similar to SQL, but better suited for Python data science?
So that concludes the first section of the talk, where we talked about concepts like expression, execution, and separation of concerns. Next, I want to switch gears and talk about Ibis. With Ibis, this idea of separation of concerns gets materialized into a software library. Here is the outline for the rest of the talk. I will first talk about Ibis at a high level. Then I will go deeper into the two key components of Ibis, the expression and the execution. And finally, we're going to see a little bit of the implementation of the PySpark backend, which I personally contributed to. So let's get started.
So what is Ibis?
Ibis is an open source library in Python. It was started in 2015 by Wes McKinney, the creator of pandas, and Ibis has been worked on by many top pandas committers. Aside from that, Ibis also has community engagement with other database developers, like the BigQuery, Impala, and Spark communities as well.
So Ibis has two major components, which map to the concepts we talked about before. The first is the Ibis language. This is the API that is used to express the computation using Ibis. The second part is the Ibis backends. These are modules that can translate an Ibis expression into something that can be executed by different backends. Ibis now supports a list of backends; these are some examples. ibis.pandas is a single-node backend that executes Ibis expressions using pandas; it is similar to an in-memory database, if you will. ibis.pyspark is the one that, of course, executes Ibis expressions on PySpark. And there's also a BigQuery one, which lets you use Google Cloud BigQuery with Ibis to run your code on BigQuery. OmniSciDB is a particularly interesting one, in my opinion; it is a GPU database, so it is very different from what we normally use.
And all of these are used with the same API.
So let's go a little bit deeper into the language part. Ibis has a table API, and the Ibis table API is very similar to the data frame APIs we're familiar with. For example, it has projection, filtering, joining, et cetera; these are the very common, standard SQL operations. Ibis also comes with a few extensions to SQL. For example, asof join is a particularly useful function for dealing with time series data; if you use pandas merge_asof, you will know what I'm talking about. UDFs are another one. The second piece of the language is Ibis expressions, and you can think of these as abstract syntax trees. These are intermediate representations of the table API; users don't need to deal with them directly, but they are very useful for the developers who implement the backends. Here's a quick example of the language. Here I'm calling table.mutate, which is similar to withColumn in PySpark, to create a new column. And here I'm just showing a visualized version of the abstract syntax tree, which is pretty straightforward. The backend takes what we had before, the abstract syntax tree, and translates it into backend-specific expressions. For example, in the pandas case it will use df.assign, and in the PySpark case it will use df.withColumn to do that.
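For instance, the mutate example might look roughly like this (a sketch; the table schema and column names are assumptions):

    import ibis

    # Build an expression only; no data is touched yet.
    t = ibis.table([("v1", "double"), ("v2", "double")], name="t")
    expr = t.mutate(v3=t.v1 + t.v2)

    # Inspect the intermediate representation (the expression tree).
    print(expr)

The pandas backend would then render this expression as roughly df.assign(v3=df.v1 + df.v2), and the PySpark backend as df.withColumn("v3", df.v1 + df.v2).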
So that's a very high level overview. Hopefully that gives you some idea of what Ibis is trying to do. Next, let's go a little bit deeper into the specific parts.
This is almost the same example that we showed before. This is the pandas code we had before.
And next I'm going to show a side by side comparison between the pandas code and the Ibis code.
So this is the first part, the create-a-new-column operation. As you can see, it is pretty similar; nothing too surprising here.
Next, let's see the window aggregation. This is the pandas version, where you call the pandas rolling operation. The corresponding Ibis code actually looks a little bit like the PySpark code for that. Here we're creating an Ibis window object corresponding to the rolling and the groupby, and then we're calling the feature column's mean().over() with the window object. Again, it's very similar to a PySpark window operation.
And finally, let's look at the groupby operation. It's actually a relatively straightforward translation. The only difference is that in pandas you pass "max" and "mean" as strings to represent the operators, whereas in Ibis you have explicit function calls.
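Putting the three pieces together, the Ibis side of the comparison might look roughly like this (a sketch; the window size and column names are assumptions, and API details may differ between Ibis versions):

    import ibis

    t = ibis.table(
        [("key", "string"), ("v1", "double"), ("v2", "double")], name="t"
    )

    # 1. New column: average of v1 and v2.
    t = t.mutate(feature=(t.v1 + t.v2) / 2)

    # 2. Window operation: trailing mean of "feature" per key.
    w = ibis.trailing_window(2, group_by=t.key)
    t = t.mutate(feature2=t.feature.mean().over(w))

    # 3. Group aggregation: max and mean of "feature2" per key.
    expr = t.group_by(t.key).aggregate(
        feature2_max=t.feature2.max(),
        feature2_mean=t.feature2.mean(),
    )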
So the code looks pretty similar.
This is the final translation.
Although the code looks really similar, there is a very important concept I wanna emphasize here.
Here the Ibis code is purely an expression, meaning that it is not associated with any data or any executing backend. This is a more abstract idea than a pandas data frame; it is a pure way of expressing the computation.
And by plugging this Ibis expression into a specific backend with real data, we can easily execute it. And that is the implementation of the idea of separation of concerns.
Now let's look a little bit into the backend execution part.
Ibis comes with different backends, and each backend has a client. Here are just a few examples of creating those clients. They're usually pretty lightweight. For example, to use the PySpark client, you just pass in the Spark session. And if you use Impala, for example, you just give it the server host and port, maybe a username or something like that. It's pretty lightweight and straightforward to use.
And to get a table from Ibis, here I'm just showing one way of doing it; there are other ways to do this as well. Here I'm referencing a table by name. This is similar to PySpark, where you can register a data frame with a name in a catalog; in Ibis, there's a similar concept. So I'm just getting a table named foo and assigning it to a variable. Again, if I'm using the PySpark client, I would just use the PySpark client to get the table. And here's what Ibis shows for this table foo, nothing fancy here.
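A rough sketch of what that might look like (the connection details and the table name foo are placeholders, and the exact connect signatures may vary by Ibis version):

    import ibis
    from pyspark.sql import SparkSession

    # PySpark client: just wrap an existing Spark session.
    spark = SparkSession.builder.getOrCreate()
    pyspark_client = ibis.pyspark.connect(spark)

    # Impala client: point it at a server (host/port are placeholders).
    # impala_client = ibis.impala.connect(host="impala.example.com", port=21050)

    # Reference a table by name; this returns an expression, not data.
    table = pyspark_client.table("foo")
    print(table)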
Next, once we have the table and a backend, we can execute it. Again, there are multiple ways of executing an Ibis expression. The simplest way is executing it into a pandas data frame, which is very useful for just playing with it. However, there are other options too; for example, the PySpark backend allows you to compile, or turn, an Ibis expression into a Spark data frame, which can hold much larger data than a pandas data frame, obviously.
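Continuing with the client and table from the sketch above, the two paths might look like this:

    # Materialize the expression as an in-memory pandas DataFrame.
    pandas_df = table.execute()

    # Or, with the PySpark backend, compile the expression into a (lazy)
    # Spark DataFrame that can hold much larger data.
    spark_df = pyspark_client.compile(table)
    spark_df.show()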
So if we recall the table transformation from before, we can wrap it up in a function called transform, which takes an Ibis expression and returns another Ibis expression. This way, we can use it on multiple backends. For example, if my table is associated with the pandas backend, this will run on pandas, and if it's PySpark, it will run on PySpark; pretty straightforward. Again, to emphasize: my_table and the result table are expressions, not data frames, meaning they are flexible enough to be plugged into different backends. And finally, we can execute it and see the result. This is very similar to the toPandas function in PySpark, actually.
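A minimal sketch of that idea, reusing the pyspark_client from the earlier sketch and the column names from the running example (both are assumptions):

    import ibis

    def transform(table):
        # Takes an Ibis table expression, returns another Ibis expression.
        table = table.mutate(feature=(table.v1 + table.v2) / 2)
        w = ibis.trailing_window(2, group_by=table.key)
        table = table.mutate(feature2=table.feature.mean().over(w))
        return table.group_by(table.key).aggregate(
            feature2_max=table.feature2.max(),
            feature2_mean=table.feature2.mean(),
        )

    # The same expression runs on whichever backend the table came from.
    result = transform(pyspark_client.table("foo"))
    print(result.execute())  # similar to toPandas() in PySpark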
Yup, that's just a tiny bit of data.
So in the final part of this talk, I want to talk about the PySpark backend in Ibis. I picked this backend because, first, this is Spark Summit, so it seems the most relevant, and second, this is my contribution to Ibis, so I'm pretty excited to talk about it. But keep in mind that there are many backends in Ibis contributed by various communities.
So just to remind ourselves, the backend's goal is to turn an Ibis expression into a native expression in the backend. In this case, we're trying to take an Ibis expression and turn it into a PySpark data frame.
First, let's take a look at how this works for a simple selection and arithmetic expression.
The expression itself is pretty simple: we're doing a mutate, adding two columns together and then taking the mean of them via division.
This is showing you the Ibis expression tree for this user code.
At the bottom we have our source table. One layer above that, we have two table columns, v1 and v2. And on top of that, we have two binary operations, addition and division. So this is very similar to, I guess, any abstract syntax tree. The way the backends are implemented is that you register a specific method to compile a specific node. For example, here we register a function called compile_divide to translate the division node in the abstract syntax tree. And this works recursively. Let's go to the next slide. Here t is the PySpark translator; this is just an internal class of this backend. What it does is provide a translate method that can evaluate an expression and turn it into a PySpark object. This is what actually invokes the corresponding registered method, dispatching on the input type.
Next, we have expr, which is the Ibis expression object, obviously. And the scope is a dictionary that caches intermediate results, for performance reasons. So the way this works is that we recursively translate this node by translating its left and right children, and after we translate the left and right, we just finish it. Here left and right are the corresponding results of the translation, which are PySpark columns; that's pretty important. Once we have those PySpark columns, it's straightforward to use PySpark column division to finish the translation.
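A simplified sketch of what such a translation rule might look like (this mirrors the mechanism just described, but the registry, decorator, and translator shown here are approximations, not the exact Ibis internals):

    import ibis.expr.operations as ops

    # A minimal, hypothetical dispatch registry, in the spirit of the real
    # PySpark backend's translation table.
    _registry = {}

    def compiles(op_class):
        def decorator(fn):
            _registry[op_class] = fn
            return fn
        return decorator

    @compiles(ops.Divide)
    def compile_divide(t, expr, scope=None, **kwargs):
        # t is the translator, expr the Ibis expression, and scope a cache
        # of already-translated intermediate results.
        op = expr.op()
        left = t.translate(op.left, scope)    # recurse: yields a PySpark Column
        right = t.translate(op.right, scope)  # recurse: yields a PySpark Column
        return left / right                   # finish with PySpark column division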
So let's just walk through this real quick. Once we hit the left node, we're going to translate the addition, the plus operation, here.
And once we've finished that, it works the same way; the only difference between the addition and the division is the final step, where we call PySpark addition instead of division. And if we go down the tree recursively, we now hit a table column node. Here, a table column is basically a named column, so we just get the name from the node and reference the column using that name.
So that will basically finish the translation. Oh yeah, finally we hit the bottom node, which is the table node. Here, again, it is just the name of the table, so we use the PySpark catalog to look up the table, which gives us a PySpark data frame. From there we go bottom up again and finish the entire translation.
Yeah, we use a select to get the table column, obviously.
Let's go to the next slide. So the previous example was pretty simple. Next is how we translate a window. Here I'm not going to show it step by step, because hopefully you already get the idea; I'm just going to show the end result. So, going to the next slide, the PySpark translation result of this is pretty similar to the Ibis code. The window object is translated into a PySpark window object; that's pretty easy, because both sides are pretty similar. And we translate the windowed operation, which in Ibis is mean().over() and in PySpark is also mean().over(), so these are pretty straightforward. Finally, we just translate the mutate.
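On the PySpark side, the translated result is conceptually something like this (a sketch; the table, the ordering column ts, and the window bounds are assumptions):

    from pyspark.sql import SparkSession, functions as F
    from pyspark.sql.window import Window

    spark = SparkSession.builder.getOrCreate()
    df = spark.table("foo")  # assumed: a registered table with key, ts, feature

    # Ibis window -> PySpark window: partition by key, order by an assumed
    # timestamp column, with a trailing row-based frame.
    w = (
        Window.partitionBy("key")
              .orderBy("ts")
              .rowsBetween(-2, Window.currentRow)
    )

    # Ibis: feature.mean().over(window)  ->  PySpark: F.mean(col).over(w)
    df = df.withColumn("feature2", F.mean(F.col("feature")).over(w))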
So yeah, that's the result for the window. Now let's look at the aggregation. Aggregation is, not surprisingly, also pretty straightforward. Here we're translating the mean and max functions into the Spark mean and max functions. The only difference is that in Ibis, mean and max are methods on the column object, while in PySpark they are standalone functions that take a column object. But other than that, they are the same.
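A quick sketch of that difference, continuing with the df from the previous sketch:

    from pyspark.sql import functions as F

    # Ibis: methods on the column expression, e.g.
    #   t.group_by(t.key).aggregate(feature2_max=t.feature2.max(),
    #                               feature2_mean=t.feature2.mean())

    # PySpark: standalone functions that take a column.
    result = df.groupBy("key").agg(
        F.max("feature2").alias("feature2_max"),
        F.mean("feature2").alias("feature2_mean"),
    )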
That's the result of the aggregation, so pretty straightforward. And the point here is that implementing a backend is not that difficult. The PySpark backend in total is about 1,500 lines of code, and compared to the total amount of code in the Ibis library, that's not a lot. And implementing these translations is usually pretty straightforward;
for example, it’s pretty clear how to translate table column and the addition and all that.
And next, I want to show you some more interesting examples in the translation. This is actually the biggest motivation for having this translation layer in the first place. For example, here I'm doing a rank over a window operation. Let me show you the result of the PySpark translation. It's a little bit more complicated than before: we first cast the type to long, and we also need to subtract one, because the starting point of the rank is different between Ibis and PySpark. The point of this is that we, as library authors, can fix these things so users never need to worry about, for example, an off-by-one error or the different result types between the two.
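Conceptually, the translation then has to produce something like this (a sketch, continuing with the df from earlier sketches; the partition and ordering columns are assumptions):

    from pyspark.sql import functions as F
    from pyspark.sql.window import Window

    w = Window.partitionBy("key").orderBy("feature")

    # PySpark rank() starts at 1 and its result type differs from Ibis's,
    # so the backend subtracts 1 and casts on the user's behalf.
    df = df.withColumn("r", (F.rank().over(w) - 1).cast("long"))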
And the final example I want to show here is a boolean column operation, notany in Ibis. When we translate this node, we find there is no notany function in PySpark. So we spent a little bit of time figuring it out, and the trick is to use the max function: if you have a boolean column that contains any true value, the max of that column will be true, because, I guess, true is greater than false. And that is basically how you would do a notany operation in PySpark. Again, as a user, you don't really want to figure this out, but by implementing it in the library, we take care of it for the users. The only thing the user needs to remember is to use notany for this logical operation.
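A rough sketch of that trick (the column names are assumptions, and negating the max is my reading of the approach described above):

    from pyspark.sql import functions as F

    # notany(flag): true iff no value in the group is true.
    # max() over a boolean column is true iff any value is true, so negate it.
    result = df.groupBy("key").agg(
        (~F.max("flag")).alias("not_any_flag")
    )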
Again, there is no direct translation rule for this one. So that brings us pretty close to the end of the talk. Before I go to the conclusion, just a quick recap: so far, we have talked about the concepts of expression and execution, about separation of concerns and how that helps us with the original problem, and about Ibis, which is a library that implements these ideas. So if you leave the talk with a few things to remember, here is what I think is most important.
The first thing I think is very important is the idea of separation of concerns, the idea of separating expression and execution. Again, this is an idea borrowed from SQL, and it's an extremely powerful one. In a lot of the cases we showed, when changing from pandas to Spark, because we don't have a unified expression language, we have to rewrite a lot of code just to make it run faster, which is suboptimal. Ideally, we want to switch to another execution engine the same way we would switch to another SQL backend; that's usually much easier compared to rewriting your entire code. The second thing I want to highlight is that, so far, we've talked a lot about pandas and Spark, but just to remind ourselves, those are not the only options; maybe they are the most popular ones today, but there are very smart people trying to write new versions of pandas or new versions of Spark. Once those come out, or when the next generation of distributed data frames or GPU data frames becomes mature and we want to use it, we don't want to rewrite our code. We want a way to switch our existing analysis code over and take advantage of the newer and faster backend. And that's the power of Ibis: this library allows you to achieve that, so in the future you don't need to worry about rewriting your code again. So that concludes my talk. Thanks again for coming. Hopefully this was helpful to you, and now we are open to questions.
Two Sigma Investments
Li Jin is a software engineer at Two Sigma. Li focuses on building high performance data analysis tools with Python and Spark for financial data. Li is a co-creator of Flint: a time series analysis library on Spark. Previously, Li worked on building a large scale task scheduling system. In his spare time, Li loves hiking, traveling and winter sports.