Koalas is an open source project that provides pandas APIs on top of Apache Spark. pandas is a Python package commonly used among data scientists, but it does not scale out in a distributed manner. Koalas fills the gap by providing pandas-equivalent APIs that work on Apache Spark. Koalas is useful not only for pandas users but also for PySpark users. For example, PySpark users can visualize their data directly from their PySpark DataFrame via the Koalas plotting APIs. In addition, Koalas users can leverage PySpark-specific APIs such as higher-order functions and a rich set of SQL APIs. In this talk, we will focus on the PySpark aspect and the interaction between PySpark and Koalas, so that PySpark users can leverage their knowledge of Apache Spark in Koalas.
Speakers: Takuya Ueshin and Haejoon Lee
– Hi everyone, let me start our talk. Our talk is Koalas: Interoperability between Koalas and Apache Spark. We will introduce how PySpark users can work effectively with Koalas. I'm Takuya, a Software Engineer at Databricks, and I'm an Apache Spark committer and a PMC member. My focus is on Spark SQL and PySpark, and now I'm mainly working on the Koalas project. Haejoon is also a Software Engineer at Databricks and a Koalas contributor. He will give a demo later to show you examples related to the topics I present here. In this talk, I introduce Koalas as well as pandas and PySpark. Then I show some important APIs to work with PySpark easily and efficiently, starting with conversion from and to PySpark DataFrame. Next, I'll show some I/O APIs and the Spark accessor. After we roughly go through those APIs, we will see the demo by Haejoon. First, let's look over Koalas, pandas and PySpark. What is Koalas? Koalas was announced on April 24th at the previous Spark + AI Summit. It's a pure Python library, and it provides a drop-in replacement for pandas, enabling efficient scaling out to hundreds of worker nodes for everyday data science and machine learning. It tries to unify the two ecosystems with a familiar API and to help make a seamless transition between small and large data. For pandas users, it helps to scale out pandas code and makes learning PySpark much easier. And for PySpark users, it provides more productivity through its pandas-like functions. Let's review what pandas is. pandas is the standard tool for data manipulation and analysis in Python. pandas is deeply integrated into the Python data science ecosystem, and it can deal with a lot of different situations. Python data scientists tend to start learning how to analyze data with pandas. pandas works very well for small datasets on a single node, but unfortunately it doesn't work well with bigger datasets. Data scientists have to learn PySpark or other libraries if they want to handle bigger datasets.
On the other hand, what is Apache Spark? Spark is a de facto standard unified analytics engine for large-scale data processing. It supports streaming, ETL, machine learning and so on. It provides the PySpark API for Python as well as APIs for Scala, Java, R and SQL. Koalas DataFrame follows the structure of pandas and provides pandas APIs. It also implements the pandas concept of an index, or identifier. PySpark DataFrame, on the other hand, is closer to relations or tables in relational databases, so the PySpark APIs are essentially designed to perform relational operations. For example, Koalas provides Series and Index as well as DataFrame, and it can select rows based on the index values or on their positions. Even if Koalas objects have the same APIs as PySpark's, the behavior will be different, since Koalas follows pandas behavior. Koalas translates pandas APIs into a logical plan of Spark SQL, and the plan is executed by the Spark SQL engine. The execution is done lazily, so that the Spark SQL optimizer can kick in. Then let's look into the APIs for PySpark users to work with Koalas. The first APIs convert from and to PySpark DataFrame, as it's good for PySpark users to know how easily we can go back and forth between Koalas and PySpark DataFrames. You can convert a PySpark DataFrame by just calling the to_koalas function, like spark_df.to_koalas(), which is automatically added to PySpark DataFrame when you import Koalas. The function takes an index_col parameter, which indicates which columns should be used as the index. If you don't specify the parameter, a default index will be attached. I will describe the default index shortly. Here are the examples of the to_koalas function. As you see, just calling to_koalas converts a PySpark DataFrame into a Koalas DataFrame, and it looks like a pandas DataFrame. On the left-hand side, the index_col parameter is not specified, so an extra column appears as the index.
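As a rough mental model of what index_col does in both conversion directions, here is a stdlib-only sketch that does not call Koalas or Spark. With Koalas installed, the real calls look like `spark_df.to_koalas(index_col="x")` and `kdf.to_spark(index_col="index")`; the helpers `to_indexed_frame` and `to_rows` below are hypothetical stand-ins that represent a frame as a list of index tuples plus a list of row dicts. The check that a multi-column index gets the same number of names follows the rule stated in the talk; the exact error Koalas raises is not modeled.

```python
from typing import Dict, List, Optional, Sequence, Tuple, Union

Row = Dict[str, object]
Index = List[Tuple[object, ...]]
Names = Optional[Union[str, Sequence[str]]]


def _as_list(names: Union[str, Sequence[str]]) -> List[str]:
    # A single column name and a list of names are both accepted.
    return [names] if isinstance(names, str) else list(names)


def to_indexed_frame(rows: List[Row], index_col: Names = None) -> Tuple[Index, List[Row]]:
    """Hypothetical model of spark_df.to_koalas(index_col=...)."""
    if index_col is None:
        # No index column given: attach a 0-based default index.
        return [(i,) for i in range(len(rows))], [dict(r) for r in rows]
    names = _as_list(index_col)
    index = [tuple(r[n] for n in names) for r in rows]
    # The chosen columns move into the index and leave the data columns.
    data = [{k: v for k, v in r.items() if k not in names} for r in rows]
    return index, data


def to_rows(index: Index, data: List[Row], index_col: Names = None) -> List[Row]:
    """Hypothetical model of kdf.to_spark(index_col=...)."""
    if index_col is None:
        # Without index_col the index is dropped from the result.
        return [dict(r) for r in data]
    names = _as_list(index_col)
    levels = len(index[0]) if index else len(names)
    if len(names) != levels:
        raise ValueError(f"expected {levels} index name(s), got {len(names)}")
    # Each index level comes back as an ordinary column named by index_col.
    return [{**dict(zip(names, key)), **r} for key, r in zip(index, data)]
```

A round trip with the same index_col gives back the original rows, while omitting index_col on the way back silently drops the index, which is the situation the demo runs into later.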
On the right-hand side, on the other hand, the extra column doesn't appear, and the column X is used as the index instead. The extra index in the left example is the attached default index. Now let's see how to convert a Koalas DataFrame back to a PySpark DataFrame. Koalas DataFrame has a to_spark function, and it also takes an index_col parameter. Without the index_col parameter, the converted PySpark DataFrame doesn't include the index columns. If you specify the parameter, the index is kept as a column and the given string is used as its column name. If there are multiple columns used as the index, you have to specify a list of strings of the same length. Here are the examples: the kdf has three columns X, Y, Z and an index. The left example shows that the converted PySpark DataFrame doesn't include the index column. The right example specifies the index_col parameter, and the column remains. Here, we should consider the index and the default index. As you saw, Koalas manages a group of columns as the index, and they should behave the same as pandas'. When you convert a pandas DataFrame to Koalas, Koalas automatically takes the index from the pandas DataFrame. But other data sources, like a PySpark DataFrame, don't always have an index. What if the index_col parameter is not specified? Koalas will automatically attach a default index to the DataFrame. Koalas provides three types of default index. Each default index type has pros and cons, so we need to decide carefully which one to use. The default index is also used in other situations, for example, when you reset the index so that no index columns remain. So please don't forget to consider the default index. The three index types are sequence, distributed-sequence and distributed. The sequence type is the default value. It guarantees that the index increments continuously, but it uses a window function without partitioning internally. This means all the data needs to be collected into a single node.
The sequence type is basically for testing purposes and should not be used with a large dataset. The second option is distributed-sequence. Its performance penalty is not as significant as that of the sequence type. It computes the index in a distributed manner, but it needs an extra Spark job to generate the global sequence internally. This index type can be used in most cases, but there is still some performance penalty. The third option is distributed. It has almost no performance penalty and always creates different numbers, but the numbers have an indeterministic gap. This means this index type should not be used as the index for operations that combine different DataFrames. Please also see the user guide for more detail. Second, we'll look at the I/O functions. There are a lot of functions to read and write data in pandas, and in Koalas as well. The APIs in this list are equivalent to pandas functions. Koalas provides these functions, but they use Spark I/O under the hood. These APIs take an index_col parameter, the same as to_koalas or to_spark. They also take keyword arguments for when you want to specify more options for the underlying Spark DataFrameReader and DataFrameWriter. Koalas provides more I/O functions. The first pair is for reading and writing existing tables in Spark SQL. The second pair can be used almost the same as the plain DataFrameReader and DataFrameWriter. They take only the essential parameters for ease of use, plus keyword arguments to specify other format-specific options. Last but not least, Koalas can also read and write Delta tables if you have Delta Lake installed. Delta Lake is an open source storage layer that brings reliability to data lakes. Please see the Delta Lake documentation for more detail. These APIs also take the index_col parameter and keyword arguments, the same as the previous ones. Lastly, before the demo, let's see the Spark accessor. The Spark accessor provides functions to leverage the existing PySpark APIs easily.
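The three default index types described above can be modeled in plain Python to see where their costs come from. In Koalas the choice is made with `ks.set_option("compute.default_index_type", "distributed")`. The sketch below is not Koalas' implementation: partitions are just lists of rows, the extra Spark job of distributed-sequence is modeled as a counting pass, and the distributed type reuses the bit layout that Spark documents for `monotonically_increasing_id()` (partition ID in the upper bits, record number within the partition in the lower 33 bits).

```python
from itertools import accumulate
from typing import List

Partitions = List[List[object]]


def sequence_index(partitions: Partitions) -> List[int]:
    """'sequence': gather every row into one place, then number 0..n-1.

    Models row_number() over an unpartitioned window, which forces all
    rows onto a single node.
    """
    all_rows = [row for part in partitions for row in part]  # single-node collect
    return list(range(len(all_rows)))


def distributed_sequence_index(partitions: Partitions) -> List[int]:
    """'distributed-sequence': two passes, still consecutive 0..n-1.

    Pass 1 (the extra job) counts rows per partition; the running sum gives
    each partition its starting offset. Pass 2 numbers rows locally.
    """
    counts = [len(part) for part in partitions]
    offsets = [0] + list(accumulate(counts))[:-1]
    return [off + i for off, part in zip(offsets, partitions) for i in range(len(part))]


def distributed_index(partitions: Partitions) -> List[int]:
    """'distributed': one pass, unique but with gaps between partitions."""
    return [(pid << 33) + i for pid, part in enumerate(partitions) for i in range(len(part))]
```

Because the distributed values depend on how rows happen to be split into partitions, the same data can receive different index values in different DataFrames, which is why that type is a poor choice for lining up different DataFrames.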
These functions let you work with the Spark Column or Spark DataFrame inside Koalas objects. Series.spark.transform takes a function that modifies the column and returns the result as a Series. The argument of the given function is actually a Spark Column instead of a Koalas object, so you can use the Spark Column APIs to modify the column. There are other similar functions, such as Series.spark.apply and DataFrame.spark.apply. We'll see examples in the demo later. The other functions should be more familiar to PySpark users: checking the underlying Spark data type or schema, explaining the execution plan, caching the DataFrame, giving hints, and so on. The Spark accessor provides those APIs. Then let's see the demo. Please welcome Haejoon.
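Series.spark.transform and Series.spark.apply differ mainly in whether the output must line up with the input rows. In Koalas the function receives a `pyspark.sql.Column`, for example `kser.spark.transform(lambda c: F.upper(c))` with `from pyspark.sql import functions as F`. The stdlib model below stands in a Python list for the column; the explicit length check is this sketch's way of stating the rule, not how Koalas enforces it, and the function names are hypothetical.

```python
from typing import Callable, List

Column = List[object]


def spark_transform(values: Column, func: Callable[[Column], Column]) -> Column:
    """Model of Series.spark.transform: the output must line up row by row."""
    result = func(values)
    if len(result) != len(values):
        raise ValueError("transform must keep the column length; use apply instead")
    return result


def spark_apply(values: Column, func: Callable[[Column], Column]) -> Column:
    """Model of Series.spark.apply: the output length may differ, e.g. an aggregate."""
    return func(values)
```

For instance, doubling every salary fits transform, while taking the maximum salary returns a single value and therefore needs apply.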
– Hi everyone, I'm Haejoon from Databricks, and I'm gonna show you a simple demo for the previous slides from Takuya. Let me start. First, of course, we should import the Koalas package. I'm using Koalas 1.3.0 for this demo. Then I set the base path for reading and writing the data, and set the default index type to distributed. Koalas has three types of default index, as Takuya introduced in the previous slides. So let's start the demo with Spark I/O. There are a lot of functions to read and write data in pandas, and in Koalas as well. So we can easily create a DataFrame from a CSV file using read_csv, just like pandas. The result looks good, but I want to use the employee ID as the index rather than attaching the default index here, because that could be quite expensive. So to avoid attaching the default index, we should specify the index_col parameter, just like this. Now there is no default index, and the employee ID is used as the index. We can also create a multi-index by passing a list of column names to the index_col parameter. Now we have a multi-index in our DataFrame. Koalas has not only the pandas I/O functions but also Koalas-specific I/O functions; read_delta and to_delta are among them. Unlike the other data sources, read_delta enables users to specify the version of the table to time travel. So let me quickly show you how to use it. First, set the path for the Delta table and write the data using to_delta. After writing, we can easily create a DataFrame from the Delta table using read_delta. It's exactly the same as what we did for the CSV file. So we can see the DataFrame created from the Delta path. In the next example, I want to increase the employees' salaries by 20%. So first, I'm gonna simply multiply the salary column by 1.2 and cast it to integer type, to keep the type of the salary column. In the result, we can see the increased salaries. After that, I'm gonna write the updated data to the Delta path.
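Delta Lake's time travel can be pictured as a table that keeps every committed version. The stdlib sketch below models only that idea with full in-memory snapshots; real Delta Lake keeps a transaction log over data files. With Koalas and Delta Lake installed, the corresponding calls would be along the lines of `kdf.to_delta(path, mode="overwrite")` and `ks.read_delta(path, version=0)`; the `VersionedTable` class is hypothetical.

```python
import copy
from typing import Dict, List, Optional

Row = Dict[str, object]


class VersionedTable:
    """Toy model of Delta Lake versioning: every overwrite commits a new snapshot."""

    def __init__(self) -> None:
        self._snapshots: List[List[Row]] = []

    def write(self, rows: List[Row]) -> int:
        # Like an overwrite: keep a full copy and return the new version number.
        self._snapshots.append(copy.deepcopy(rows))
        return len(self._snapshots) - 1

    def read(self, version: Optional[int] = None) -> List[Row]:
        # version=None reads the latest snapshot; an int "time travels".
        if not self._snapshots:
            raise LookupError("no versions written yet")
        if version is None:
            version = len(self._snapshots) - 1
        if not 0 <= version < len(self._snapshots):
            raise LookupError(f"version {version} does not exist")
        return copy.deepcopy(self._snapshots[version])
```

Following the demo's steps, writing the original rows gives version 0, writing the rows with salaries multiplied by 1.2 and cast back to int gives version 1, and reading with `version=0` still returns the original salaries.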
And read the data again from the same path. Then of course we can see the latest data, with the updated salaries. But at this point, read_delta also allows users to read a specific version of the data. For example, we can read version 0 of the data, like this. Then we can see the original data, before the salary update. Let's go on to the next part, conversion from and to Spark DataFrame. For PySpark users, it's good to know how easily you can go back and forth between Koalas DataFrame and PySpark DataFrame. So first, let's see how to convert a Koalas DataFrame to a Spark DataFrame. All we have to do is simply call to_spark on the Koalas DataFrame. The result looks good, but the index is lost, because I didn't specify the index_col parameter when I ran to_spark. So there is no employee ID, and that's why this code will raise an AnalysisException: we have no employee ID column in our Spark DataFrame. Again, we should specify index_col to preserve the existing index column. And now we can see the employee ID in the created Spark DataFrame. Next, let's convert a Spark DataFrame to a Koalas DataFrame. When you import the Koalas package, it automatically attaches the to_koalas function to Spark DataFrame. So we can simply use this function to convert the Spark DataFrame to a Koalas DataFrame. And again, we can see the auto-generated default index here, which could be quite expensive. So to avoid attaching the default index, we should specify index_col like this. In the result, there is no default index, and the employee ID is used as the index. The next part is the Spark accessor. Koalas provides the Spark accessor for users to leverage the existing PySpark APIs more easily. So first, let's see the Series Spark accessor. With this Spark accessor, it is possible to apply Spark functions directly to a Koalas Series. In this example, I want to remove the dashes from the phone number series using transform, supported by the Spark accessor.
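The dash removal would typically be expressed with `pyspark.sql.functions.regexp_replace`, for example `kdf["phone"].spark.transform(lambda c: F.regexp_replace(c, "-", ""))`, where the column name `phone` is an assumption. The stdlib sketch below reproduces only the row-wise effect on an indexed series so it can run without Spark: each value is rewritten, the index is left untouched, and nulls stay null, as they do in Spark SQL. The `remove_dashes` name is hypothetical.

```python
import re
from typing import List, Optional, Tuple

IndexedSeries = List[Tuple[object, Optional[str]]]


def remove_dashes(series: IndexedSeries) -> IndexedSeries:
    """Row-wise equivalent of F.regexp_replace(col, '-', '') on an indexed series.

    The index (first element of each pair) is kept as is, so the output lines
    up with the input, which is what makes this a transform.
    """
    return [(idx, None if value is None else re.sub("-", "", value)) for idx, value in series]
```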
We can use any Spark functions for a Spark Column in spark.transform. In the result, there are no dashes in the phone number series anymore. But if the result has a different length from the series, we should use spark.apply rather than spark.transform. So in this code, I just want to get the maximum salary from the salary column using the Spark accessor. We can see the proper result, which has a different length from the original series. We also have a DataFrame Spark accessor. It is also possible to apply Spark functions directly to a Koalas DataFrame with the Spark accessor. In this code, I wanna filter only the Gmail users from the DataFrame using the Spark accessor. Here we got our result, but there is an auto-generated default index again. So we can also specify the index_col parameter here to avoid attaching the default index. And now we can see that the employee ID is used as the index in the result. Let's go on to the next part. We can also check the underlying Spark data type or schema with the Spark accessor, by using the schema or print_schema function. First, let's see how the schema function works. In the result, we can see the columns of the Gmail users DataFrame and their types. Next, let's see how the print_schema function works. Again, we can see the schema of the columns in the result. But we cannot see the employee ID column in either schema or print_schema. This is because I didn't specify the index_col parameter when I ran the functions. So if you want to see the index column in the schema, you can also use the index_col parameter for these functions. The next example is explaining the Spark plan. When you want to know the current Spark plan, we can use the explain function with the Spark accessor, just like PySpark. In the result, we can see the physical plan generated by the underlying Spark planner. Next is caching: the Spark accessor also provides cache-related functions.
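The cache-related functions mentioned here follow a context-manager pattern in Koalas 1.x: `kdf.spark.cache()` returns a cached DataFrame that can be used as `with kdf.spark.cache() as cached_kdf: print(cached_kdf.spark.storage_level)`, and `kdf.spark.persist(...)` takes a `pyspark.StorageLevel`. The stdlib sketch below models only that lifecycle; no data is cached, the class and function names are hypothetical, and the `MEMORY_AND_DISK` default is an assumption mirroring Spark's default for `DataFrame.cache()`.

```python
from typing import Optional


class CachedFrame:
    """Toy stand-in for a cached DataFrame; storage_level None means not cached."""

    def __init__(self, name: str, storage_level: str = "MEMORY_AND_DISK") -> None:
        self.name = name
        self.storage_level: Optional[str] = storage_level

    def unpersist(self) -> None:
        # Release the cached data; safe to call more than once.
        self.storage_level = None

    def __enter__(self) -> "CachedFrame":
        return self

    def __exit__(self, exc_type, exc_value, traceback) -> bool:
        # Leaving the `with` block always unpersists, even on an exception.
        self.unpersist()
        return False  # do not swallow exceptions raised inside the block


def cache(name: str) -> CachedFrame:
    """Like cache(): uses the default storage level."""
    return CachedFrame(name)


def persist(name: str, storage_level: str) -> CachedFrame:
    """Like persist(level): the caller chooses the storage level."""
    return CachedFrame(name, storage_level)
```

The context manager makes cleanup automatic; when `cache()` is called outside a `with` block, calling `unpersist()` is left to the caller.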
We can simply use cache as a context manager like this. In the result, we can see the storage level of the cached DataFrame, and you should know that after exiting the context manager, the cache will automatically be unpersisted. If you want to use another storage level, you can use the persist function and specify the storage level you wanna use. Now we can see the changed storage level in the result. Or we can just cache the DataFrame outside of a context manager and unpersist it later. So first, cache the DataFrame by using the Spark accessor, and then we can easily unpersist the cached DataFrame by using the unpersist function supported by the Spark accessor. Please don't forget to unpersist after using the cache in this case. The last part of this demo is hints. There are some join-like operations in Koalas, such as merge, join and update. Of course, the actual join method depends on the underlying Spark planner under the hood, but we can still specify a hint by using the broadcast or hint function. So here we have two Koalas DataFrames, kdf1 and kdf2, and I wanna merge them. Before I actually run the code, I want to know the internal Spark plan, so I'm gonna use the explain function again. Now we can see the physical plan in the result, and I want to give some hints to adjust the plan. In this case, we can give a hint by using the broadcast or hint function. The first example is broadcast, and now we can see a BroadcastHashJoin in the physical plan, different from the previous Spark plan. The next example is basically the same as above, but I use the hint function supported by the Spark accessor and pass "broadcast" as the parameter. As we can see, the result is also the same as above, but the hint function is especially useful if the underlying Spark is 3.0 or above, because more hints are available in Spark 3.0. For example, MERGE, SHUFFLE_HASH, SHUFFLE_REPLICATE_NL, etc. are available in Spark 3.0.
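To see what the broadcast and SHUFFLE_HASH hints ask Spark to do, here is a stdlib-only sketch of the two join strategies over rows stored as dicts. It illustrates the general technique, not Spark's code: Spark chooses the partitioning, build side and memory handling itself. In Koalas the hints themselves are given with `ks.broadcast(kdf2)` or `kdf2.spark.hint("broadcast")`; the function names below are hypothetical.

```python
from collections import defaultdict
from typing import Dict, Hashable, List

Row = Dict[str, object]


def hash_join(left: List[Row], right: List[Row], key: str) -> List[Row]:
    """Inner equi-join: build a hash table on `right`, then probe it with `left`."""
    table: Dict[Hashable, List[Row]] = defaultdict(list)
    for rrow in right:  # build side
        table[rrow[key]].append(rrow)
    return [{**lrow, **rrow} for lrow in left for rrow in table.get(lrow[key], [])]


def broadcast_hash_join(big_partitions: List[List[Row]], small: List[Row], key: str) -> List[Row]:
    """BROADCAST: each partition of the big side joins against a full copy of `small`.

    The big side is never shuffled; only the small side is copied everywhere.
    """
    result: List[Row] = []
    for part in big_partitions:
        result.extend(hash_join(part, small, key))
    return result


def shuffle_hash_join(left: List[Row], right: List[Row], key: str, num_partitions: int) -> List[Row]:
    """SHUFFLE_HASH: route both sides by hash(key) so equal keys meet, then hash-join locally."""
    left_parts: List[List[Row]] = [[] for _ in range(num_partitions)]
    right_parts: List[List[Row]] = [[] for _ in range(num_partitions)]
    for row in left:
        left_parts[hash(row[key]) % num_partitions].append(row)
    for row in right:
        right_parts[hash(row[key]) % num_partitions].append(row)
    result: List[Row] = []
    for lp, rp in zip(left_parts, right_parts):
        result.extend(hash_join(lp, rp, key))
    return result
```

Both strategies produce the same rows; the difference is what moves across the network. Broadcasting pays off when one side is small, while a shuffle hash join moves both sides but never needs a full copy of either on every node.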
So I'm gonna use SHUFFLE_HASH in this example, and we can see the ShuffledHashJoin in the physical plan. Yep, basically that's all for this demo. Thanks for your interest in Koalas and for watching this demo. Enjoy the Summit, thanks.
– Well, please get started soon. If you are a Databricks user, you can use Koalas without installation on Databricks Runtime 7.1 and higher. Otherwise, you can install it with PyPI or Conda. If you don't have a Spark cluster yet, you can also install PySpark with PyPI or Conda on your local computer. There are other useful resources you can take a look at. If you have any suggestions, questions, or feature requests, please feel free to file an issue on the GitHub issue page or directly submit a PR. Your contributions are always welcome.
Takuya Ueshin is a software engineer at Databricks, and an Apache Spark committer and a PMC member. His main interests are in Spark SQL internals, a.k.a. Catalyst, and also PySpark. He is one of the major contributors to the Koalas project.
Haejoon is a software engineer at Databricks. His main interests are Koalas and PySpark. He is one of the major contributors to the Koalas project.