Koalas is an open source project that provides pandas APIs on top of Apache Spark. pandas is the standard tool for data science and is typically the first step in exploring and manipulating a data set, but it does not scale well to big data. Koalas fills the gap by providing pandas-equivalent APIs that work on Apache Spark.
Many other libraries also try to scale pandas APIs, such as Vaex and Modin. Dask is one of them and is very popular among pandas users; it runs on its own cluster, much as Koalas runs on top of a Spark cluster. In this talk, we will introduce Koalas and its current status, and compare Koalas with Dask, including benchmarks.
Takuya Ueshin: Hi, everyone. Let me start our talk. Our talk is “Koalas: Does Koalas work well or not?” We ran some Koalas benchmarks, so we will share how we ran them and their results here.
I’m Takuya, a software engineer at Databricks and an Apache Spark committer and PMC member. My focus is on Spark SQL internals and PySpark, and now I’m mainly working on the Koalas project. Xinrong is also a software engineer at Databricks and a Koalas maintainer. She will show the benchmark and its results later.
In this talk, I will introduce Koalas and also pandas and PySpark. Then I will show some of the internal structure of Koalas so you can see how Koalas works with PySpark. After that, I will hand over to Xinrong, and she will show the benchmark. We compared the benchmark results with Dask, so she will briefly introduce Dask and show the results.
Firstly, let’s look over Koalas, pandas and PySpark. What is Koalas? Koalas was announced on April 24th, 2019 at the previous Spark AI Summit. It is a pure Python library that provides a drop-in replacement for pandas, enabling efficient scaling out to hundreds of worker nodes for everyday data science and machine learning. It tries to unify the two ecosystems with a familiar API and to help make a seamless transition between small and large data. For pandas users, it will help to scale out their pandas code and make learning PySpark much easier. And for PySpark users, it will provide more productivity through its pandas-like functions.
Next, let’s review what pandas is. pandas is the standard tool for data manipulation and analysis in Python. It is deeply integrated into the Python data science ecosystem, and it can deal with a lot of different situations. As you can see in the Stack Overflow Trends, pandas is still growing rapidly. Python data scientists tend to start learning how to analyze data with pandas. It works really well for small datasets on a single node, but unfortunately it doesn’t work well with bigger datasets. Data scientists have to learn PySpark or other libraries if they want to handle bigger datasets.
Okay then, what about Apache Spark? Spark is the de facto standard unified analytics engine for large-scale data processing. It supports streaming, ETL, machine learning and so on. It provides PySpark as an API for Python, as well as APIs for Scala, Java, R and SQL.
A Koalas DataFrame follows the structure of pandas and provides pandas APIs. It also implements the concept of the pandas index, or identifier. For example, Koalas provides Series and Index as well as DataFrame, and it can pick up rows based on the index values or their positional index number. On the other hand, a PySpark DataFrame is more compliant with relations or tables in relational databases, so the PySpark APIs are designed mainly to perform relational operations. Even where Koalas objects have the same APIs as PySpark’s, the behavior will be different, since Koalas follows the pandas behavior.
Koalas translates pandas APIs into a logical plan of Spark SQL, and the plan is then executed by the Spark SQL engine. Execution is lazy so that Spark SQL’s optimizer can kick in. When a user tries to show, collect or store the data, the whole plan is passed to the Spark SQL engine, which then optimizes, compiles and executes it.
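To make the lazy execution described above concrete, here is a minimal sketch in plain Python. It is a toy model, not Koalas or Spark code: the `LazyFrame` class and its `filter`, `select`, `explain` and `collect` methods are hypothetical names chosen for illustration. Each call only records a step in a plan, and nothing runs until `collect` is called.

```python
# Toy model of lazy evaluation. LazyFrame and its methods are hypothetical
# names for illustration; this is not how Koalas or Spark is implemented.

class LazyFrame:
    def __init__(self, rows, plan=()):
        self._rows = rows    # source data: a list of dicts
        self._plan = plan    # pending steps as (operation name, argument) pairs

    def filter(self, predicate):
        # Only record the step; no rows are touched yet.
        return LazyFrame(self._rows, self._plan + (("filter", predicate),))

    def select(self, *names):
        # Only record the step; no rows are touched yet.
        return LazyFrame(self._rows, self._plan + (("select", names),))

    def explain(self):
        # Show the recorded plan without executing it.
        return [name for name, _ in self._plan]

    def collect(self):
        # The whole plan is executed only when the result is requested.
        rows = self._rows
        for name, arg in self._plan:
            if name == "filter":
                rows = [r for r in rows if arg(r)]
            elif name == "select":
                rows = [{k: r[k] for k in arg} for r in rows]
        return rows


trips = LazyFrame([{"fare": 5.0, "tip": 1.0}, {"fare": 12.0, "tip": 2.5}])
query = trips.filter(lambda r: r["fare"] > 10).select("tip")
print(query.explain())   # ['filter', 'select']
print(query.collect())   # [{'tip': 2.5}]
```

Because the full plan is available before anything runs, an engine has the chance to rearrange or simplify it first, which is the opening an optimizer needs.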
Then let’s dive into the Koalas internal structure. Here, I’ll talk about the internal structure named InternalFrame and how it works between the Koalas DataFrame and the PySpark DataFrame. InternalFrame is an internal immutable structure, and it holds mainly four types of data. The first one is the current PySpark DataFrame. All operations are based on this PySpark DataFrame. Secondly, it manages PySpark Column objects for both index and data columns, separately from the PySpark DataFrame. These are used for maximizing map-side operations without shuffling, as well as for the mapping between Koalas index or data columns and PySpark columns.
The Column objects are derived from the PySpark DataFrame and contain the updates to it. For example, df + 1 doesn’t need to create a new PySpark DataFrame; it only updates the columns to col + 1 for each column. We can combine the column updates and the underlying PySpark DataFrame to compute the result later. Also, it manages index names and data column names, because Koalas index names and data column names are not always equal to the PySpark DataFrame’s column names. They can be int, float, et cetera, as well as string, and they can also be multi-level values like tuples, whereas PySpark column names must be strings. Dtypes are the other metadata. Dtypes in this case are equivalent to pandas dtypes, not Spark’s data types. This is because there are some pandas dtypes that can’t be represented by Spark’s data types alone.
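The idea that an operation like df + 1 only rewrites column expressions and leaves the underlying data alone can be sketched in plain Python. This is a toy model under stated assumptions, not the actual InternalFrame: `ToyInternalFrame`, `add_scalar` and `evaluate` are hypothetical names, a tuple of dicts stands in for the PySpark DataFrame, and plain functions stand in for Spark Column expressions.

```python
from dataclasses import dataclass, replace
from typing import Callable, Dict, Tuple

Row = Dict[str, float]


@dataclass(frozen=True)  # frozen: instances cannot be mutated after creation
class ToyInternalFrame:
    base_rows: Tuple[Row, ...]                   # stands in for the PySpark DataFrame
    columns: Dict[str, Callable[[Row], float]]   # stands in for Spark Column expressions

    def add_scalar(self, value):
        # Wrap every column expression; the base rows are shared, not copied.
        new_columns = {
            name: (lambda row, expr=expr: expr(row) + value)
            for name, expr in self.columns.items()
        }
        # Return a copy with new expressions instead of mutating this frame.
        return replace(self, columns=new_columns)

    def evaluate(self):
        # Combine the base rows with the column expressions only when asked.
        return [
            {name: expr(row) for name, expr in self.columns.items()}
            for row in self.base_rows
        ]


rows = ({"a": 1.0, "b": 10.0}, {"a": 2.0, "b": 20.0})
frame = ToyInternalFrame(rows, {"a": lambda r: r["a"], "b": lambda r: r["b"]})
plus_one = frame.add_scalar(1)

print(plus_one.base_rows is frame.base_rows)  # True: same underlying data
print(plus_one.evaluate())  # [{'a': 2.0, 'b': 11.0}, {'a': 3.0, 'b': 21.0}]
```

The original `frame` is unchanged after `add_scalar`, which mirrors the immutability the talk attributes to InternalFrame.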
For example, pandas has the categorical dtype and the period dtype, which are not supported in Spark SQL, and there are [inaudible] int, float, string and boolean types in addition to the normal corresponding types. Last but not least, it provides the way to convert from a pandas DataFrame to a PySpark DataFrame and extract the metadata. Conversely, the conversion from a Koalas DataFrame to a pandas DataFrame is provided here too. It combines the PySpark DataFrame and columns, then converts the result to a pandas DataFrame. The directly converted pandas DataFrame could have different column names or dtypes, so it fixes them using the other metadata. The result must be the expected pandas DataFrame.
The basic structure is like this: a Koalas DataFrame has an InternalFrame in it, and the InternalFrame holds a PySpark DataFrame and some metadata like Spark columns, index names, column names, dtypes and so on, as I mentioned. When users call a Koalas API, the Koalas DataFrame updates the PySpark DataFrame and the metadata in the InternalFrame to behave like the corresponding pandas API, creates or copies the current InternalFrame with the new state, and then returns a new Koalas DataFrame.
Sometimes an update of the PySpark DataFrame is not needed, and only an update of the metadata is needed. The new structure will be like this. For example, renaming column names or index names doesn’t need a new PySpark DataFrame, and the df + 1 example I mentioned before doesn’t need a new one either; it can be done by updating only the Spark columns. Koalas tries to avoid creating a new PySpark DataFrame if possible, so that Koalas can easily calculate something like df1 + df2.
If the underlying PySpark DataFrames of df1 and df2 are the same, it will be just a Spark column operation. Otherwise, we need joins to combine the two PySpark DataFrames. In any case, a Koalas DataFrame will never mutate its InternalFrame, but creates or copies the InternalFrame to keep it immutable.
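The two cases for df1 + df2 can be illustrated with a small plain-Python sketch. It is only a toy: `add_columns` is a hypothetical helper, dicts keyed by index stand in for PySpark DataFrames, and the join shown is an inner join purely for brevity, which is an assumption rather than a statement about how Koalas aligns rows.

```python
def add_columns(left, right):
    """Add two columns given as (base_table, column_name) pairs.

    base_table maps an index value to a row dict. This is a hypothetical
    stand-in for a PySpark DataFrame, used only for illustration.
    """
    left_base, left_col = left
    right_base, right_col = right

    if left_base is right_base:
        # Same underlying table: a row-local column operation, no join needed.
        result = {idx: row[left_col] + row[right_col] for idx, row in left_base.items()}
        return result, "column operation"

    # Different tables: rows must first be aligned by index, i.e. joined.
    # An inner join is used here for brevity (an assumption of this sketch).
    shared = sorted(left_base.keys() & right_base.keys())
    result = {idx: left_base[idx][left_col] + right_base[idx][right_col] for idx in shared}
    return result, "join"


t1 = {0: {"x": 1, "y": 2}, 1: {"x": 3, "y": 4}}
t2 = {1: {"z": 10}, 2: {"z": 20}}

print(add_columns((t1, "x"), (t1, "y")))  # ({0: 3, 1: 7}, 'column operation')
print(add_columns((t1, "x"), (t2, "z")))  # ({1: 13}, 'join')
```

The first call never has to match rows across tables, which is why keeping operations on the same underlying DataFrame is the cheaper path.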
This is a rough overview of Koalas DataFrame. I hope this helps you understand what’s happening in Koalas. Now let me hand it over to my colleague, Xinrong, who will walk through the benchmark.
Xinrong Meng: Thanks, Takuya, for the great introduction to Koalas and its internals. Then, does Koalas work well? We’ve conducted a benchmark against Dask to answer that question. What makes Dask special? Dask is a parallel computing framework. It’s written in pure Python, so it can leverage the excellent Python ecosystem. It parallelizes code bases like NumPy using blocked algorithms and task scheduling. Blocked algorithms can evaluate a large problem by solving many small problems. Then what does Dask task scheduling mean? A task represents an atomic unit of work to be run by a single worker. A task graph is built from parallel collections, and a scheduler executes the task graph on parallel hardware. There are single-machine schedulers and a distributed scheduler. Dask is great.
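The ideas of a blocked algorithm and a task graph can be sketched without Dask itself. In the toy below, a large sum is split into per-chunk sums, and the graph is a dict whose values are either data or tuples of a function followed by its arguments. The layout is loosely modeled on the style of graph Dask documents, but the `run` executor is a hypothetical, single-threaded stand-in for a real scheduler.

```python
from operator import add


def run(graph, key, cache=None):
    """Evaluate one key of a toy task graph, computing dependencies first."""
    cache = {} if cache is None else cache
    if key in cache:
        return cache[key]

    task = graph[key]
    if isinstance(task, tuple) and callable(task[0]):
        func, *args = task
        # Arguments that name other keys in the graph are dependencies.
        resolved = [
            run(graph, arg, cache) if isinstance(arg, str) and arg in graph else arg
            for arg in args
        ]
        result = func(*resolved)
    else:
        result = task  # plain data, not a task

    cache[key] = result
    return result


# Blocked algorithm: one large sum expressed as many small sums.
graph = {
    "chunk-0": [1, 2, 3],
    "chunk-1": [4, 5, 6],
    "sum-0": (sum, "chunk-0"),
    "sum-1": (sum, "chunk-1"),
    "total": (add, "sum-0", "sum-1"),
}

print(run(graph, "total"))  # 21
```

Here `sum-0` and `sum-1` do not depend on each other, so a real scheduler would be free to run them on different workers.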
How is Dask different from Koalas? First, they have different execution engines. Koalas is built on top of Apache Spark, which is a unified analytics engine for big data. Dask itself is a graph execution engine. Second, Koalas aims to provide a single code base that works with both pandas and Spark. If you are already familiar with pandas, you can be productive with Spark immediately. Dask aims to scale pandas workflows. Third, in Koalas the query plan is an entry point to understanding query execution, whereas Dask has the task graph and task scheduler, as we mentioned at the beginning. Last, Koalas employs the pandas DataFrame APIs, while Dask has parallel collections such as Array, DataFrame and Bag.
Clearly Dask and Koalas have many differences, but they can both scale pandas. Wouldn’t a performance comparison be interesting? So we conducted a benchmark. How did we set up the benchmark? The dataset contains about 150GB of taxi trip records. We benchmarked common operations, such as basic statistical calculations, joins and grouping.
These operations were applied to the whole dataset, then to the filtered data, and finally to the cached filtered data. The filtered data is 36% of the whole data. In this way, we can understand the impact of lazy evaluation, caching and related optimizations in both systems. The scenario used in this benchmark was inspired by [inaudible]; they have benchmarks. Then where did we run the benchmark? The benchmark was run on top of Databricks Runtime. We ran it on a single machine and then in a cluster. The cluster has four nodes and the same total memory as the single machine. We ensured that both Dask and Koalas ran in distributed mode.
Here’s an overview of the benchmark results. Koalas outperforms Dask in the benchmark. Taking a simple average, Koalas is four times faster in local mode and around eight times faster in cluster mode. Koalas APIs are written on top of PySpark, so the results apply similarly to PySpark.
Are you interested in detailed results? Let’s look at some plots. Each bar in the plots shows the execution time for an operation. The ratio of Dask’s execution time to Koalas’s execution time is annotated per operation. For example, 2X means Koalas is twice as fast as Dask. Plots on the left-hand side are for local mode, whereas plots on the right are for distributed mode.
First let’s look at the operations that apply to the whole dataset. In local mode, Koalas was on average 1.2 times faster. The join count operation was 17 times faster. In distributed mode, Koalas was on average twice as fast as Dask. The count index operation was 25 times faster. Then we applied the same operations to the filtered data. Filtering is computed lazily, together with the operations. In local mode, Koalas was on average six times faster. The count operation was 11 times faster. In cluster mode, Koalas was on average nine times faster. The count index operation was 16 times faster.
Finally, we cached the filtered data and then applied the same operations. Here, the data was fully cached before measuring any operations. In local execution, Koalas was on average 1.4 times faster. The join count operation was six times faster. In cluster mode, Koalas was on average five times faster. The count index operation was 28 times faster.
Are you wondering why Koalas is fast? As we mentioned, Koalas is based on Apache Spark. At the core of Spark SQL is the Catalyst optimizer. On top of Catalyst, we have code generation to generate optimized bytecode at runtime. They both accelerate Koalas significantly. To understand the influence of Catalyst, we look at the mean calculation on the filtered data. We filtered down the original data at [inaudible] and then calculated the average. The query plan before the Catalyst optimization uses brute force to read all columns.
It then performs projection multiple times, with the filter in the middle. This is inefficient because it needs to read more data, spends more time on I/O and performs the same projections multiple times. The query plan becomes much simpler after Catalyst optimization. Now it reads only the columns needed for the computation and filters the data at the source level, which saves memory.
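The effect of reading only the needed columns can be shown with a toy comparison in plain Python. This is not Catalyst: `naive_mean` and `pruned_mean` are hypothetical functions, the column names are made up, and the "cells read" counter only imitates how much data a columnar reader would have to touch for a filtered mean.

```python
def naive_mean(table, filter_col, threshold, value_col):
    # Reads every column of every row, then filters, then keeps one column.
    cells_read = 0
    kept = []
    for row in table:
        cells_read += len(row)
        if row[filter_col] > threshold:
            kept.append(row[value_col])
    return sum(kept) / len(kept), cells_read


def pruned_mean(table, filter_col, threshold, value_col):
    # Touches only the columns the query needs and filters while scanning.
    needed = {filter_col, value_col}
    cells_read = 0
    total = 0.0
    count = 0
    for row in table:
        cells_read += len(needed)
        if row[filter_col] > threshold:
            total += row[value_col]
            count += 1
    return total / count, cells_read


# Hypothetical trip records; the column names are assumptions of this sketch.
table = [
    {"distance": 1.0, "fare": 5.0, "tip": 0.5, "passengers": 1},
    {"distance": 4.0, "fare": 15.0, "tip": 3.0, "passengers": 2},
    {"distance": 6.0, "fare": 25.0, "tip": 5.0, "passengers": 1},
]

print(naive_mean(table, "distance", 2.0, "fare"))   # (20.0, 12)
print(pruned_mean(table, "distance", 2.0, "fare"))  # (20.0, 6)
```

Both versions return the same mean, but the pruned one touches half as many cells in this example, and the gap grows with the number of unused columns.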
Let’s move on to code generation. We take two operations on the filtered data as examples. Without code generation, the mean calculation takes eight seconds and the join count operation takes 27 seconds. With code generation, calculating the mean takes one second and the join count operation takes two seconds. It’s a huge improvement.
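As a quick worked check of those timings, the speedup is the time without code generation divided by the time with it, the same kind of ratio used to annotate the benchmark plots. The `speedup` helper below is a hypothetical name; the input numbers are the ones quoted in the talk.

```python
def speedup(baseline_seconds, improved_seconds):
    # Ratio of the slower time to the faster time.
    return baseline_seconds / improved_seconds


mean_speedup = speedup(8, 1)         # mean: 8 s without codegen, 1 s with
join_count_speedup = speedup(27, 2)  # join count: 27 s without, 2 s with

print(mean_speedup)        # 8.0
print(join_count_speedup)  # 13.5
```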
The benchmark results demonstrate that, first, SQL optimizers such as the one in Spark SQL can improve the performance of DataFrame APIs. Second, caching accelerates both Koalas and Dask significantly. Third, in the benchmark, Koalas outperforms Dask in most of the use cases.
I want to add that different projects have different focuses. Even though the benchmark we ran is reproducible and we tried our best to be fair throughout the process, there might be unconscious bias. The main goal of the benchmark is to establish the influence of SQL optimizers on DataFrame APIs. If you’d like to know more about the benchmark, please check out the referenced blog post. All right, that’s the end of our benchmark section.
Then what’s new with Koalas? Since Spark AI Summit 2020, Koalas has released nine versions. Let’s take a look at the main changes. Plotly support is improved by implementing pie, histogram and box plots with Plotly. Plotly is our default plotting backend now. Basic extension dtypes are supported, such as the categorical dtype and the Int8 dtype. More types of index are introduced, such as DatetimeIndex and Int64Index. API extensions are supported, so you can register your custom accessors to DataFrame, Series and Index. Python type hint support is enhanced by allowing pandas instances and column names in the return type.
There’s more exciting news. We are going to port Koalas into PySpark. With the pandas API layer on PySpark, users can leverage their existing Spark cluster to scale their pandas workloads. They can also easily switch between pandas APIs and PySpark APIs. If you’d like to know more, please check out the JIRA ticket. That’s the end of the Koalas updates. Thank you so much for having us. Takuya and I are happy to answer any questions you may have.
Takuya Ueshin is a software engineer at Databricks, and an Apache Spark committer and a PMC member. His main interests are in Spark SQL internal, a.k.a. Catalyst, and also PySpark. He is one of the...
Xinrong is a software engineer at Databricks. Her main interests are in Koalas and PySpark. She is one of the major contributors of the Koalas project.