Koalas: Pandas on Apache Spark NA

Download Slides

In this hands on tutorial we will present Koalas, a new open source project. Koalas is an open source Python package that implements the pandas API on top of Apache Spark, to make the pandas API scalable to big data. Using Koalas, data scientists can make the transition from a single machine to a distributed environment without needing to learn a new framework.

We will demonstrate Koalas’ new functionalities since its initial release, including Apache Spark 3.0, discuss its roadmaps, and how we think Koalas could become the standard API for large scale data science.

What you will learn:

  • How to get started with Koalas
  • Show easy transition from Pandas to Koalas on Apache Spark
  • Demonstrate similarities between Pandas and Koalas APIs for DataFrame transformation and feature engineering
  • Use it for single machine Pandas vs distributed environment of Koalas

Watch more Spark + AI sessions here
Try Databricks for free

Video Transcript

– Hi, everyone. Welcome and thanks for joining this talk where we will be exploring Koalas, an open source Python package that implements the pandas API on top of Apache Spark. And before we kick off, just a little bit about myself, my name is Niall Turbit, and I am a Data Scientist on the Professional Services and Training Team at Databricks. With this, my time is split pretty evenly between working with our customers to build and deploy scalable machine learning solutions, as well as deliver training classes which focus on data science and machine learning with Spark. Prior to Databricks, my background has involved, building scalable data-driven and machine learning solutions across a range of domains, such as supply chain and forecasting, logistics, optimization, and also recommender systems. Just to set the scene with regards to what we will cover today. So we’re going to kick off with a brief comparison of pandas and Spark, discussing their origins, having a brief look at some code, then we’re going to explore some of the justifications for the Koalas project. So why was the project conceived? What are the issues that the project seeks to address? We’re also going to discuss high Koalas is implemented. And in terms of how we can recreate the pandas API on top of Spark, and for the meat of the talk, we’re gonna dig into an in depth walkthrough of using Koalas to do some end-to-end data analysis and modeling. To finish things off then, we’re also going to set out the roadmap for future, what’s gonna come from Koalas going forth?

Typical Journey of a Data Scientist

So let’s take a step back and think of the typical journey of a data scientist. And if you look at when a data scientist is going to do their education via online courses, books, universities, what we see is that pandas is the standard tool when it comes to working with smaller datasets due to its ease of use for beginners, and it’s heavily promoted via these mediums. Only when working with larger datasets does Spark actually then come into play.

l pandas

So in terms of pandas origins, so it’s almost as old as Spark. It’s super successful due to its range of functionality and ease of use, we see how it’s deeply integrated into the Python data science ecosystem, built on top of numpy, we’ve got matplotlib, scikit-learn, all of which it ties together nicely. And we see how it can deal with a lot of situations varying from just basic statistical analysis all the way to actual modeling and how you can prepare your datasets prior to fitting machine learning models. So with regards to Spark, Spark is pretty much the de facto standard tool for doing data science and data engineering at scale. And especially, whenever it comes to large scale data processing, we see how Spark is this integrated toolbox that can be employed to a wide range of use cases, all under the same roof. A large factor for its rise to popularity and can be attributed to the fact that it caters to different communities across a range of API’s ever since its inception, when it was created by the founders of Databricks at UC Berkeley.

A lot of the extra subtleties and differences come from the underlying implementation of Spark versus pandas.

A short example

Just to illustrate these we can look at a short example. So when looking at these differences between pandas syntax and PySpark syntax, it’s evident that coming from a panda’s context and wanting to implement the same operation in PySpark is not inherently intuitive for a pandas user. PySpark isn’t this intuitive for those users coming from pandas ecosystem, and in many instances, it can be more cumbersome or verbose in comparison to pandas. If we take, for example, reading in a CSV file, whereby we want to rename the columns and create a new column that multiplies this column x by itsef, we see in pandas simply pd.readcsv after we have imported pandas; however, with PySpark, we have to do a bit more verbose syntax. And as I said, it’s not as intuitive as one coming from pandas would think.

And this can often be seen as the buyer to entry for those coming from a pandas background but wanting to utilize the distributed nature of Spark.


And this is where Koalas comes into play. Maybe it’s the key motivate key motivation for the Koalas project, which was the ninth last year April. Koalas is a pure Python library, so it can be installed like any other Python library via PIP, Conda, or from source. The core aim for the project is to provide all the pandas API on top of Spark, thus giving users the scalability that you would get inherently from Spark but with the familiarity of pandas.

In terms of growth, pandas has gained significant traction since its release last year, and having recently passed over 30,000 daily downloads, we see that the GitHub project actually has over 2000 GitHub stars right now.

short example

So the reason for this popularity is that clause allows you to take existing pandas code, and with very minimal change, you can convert it to something that runs on top of Apache Spark. Just to demonstrate this, we’re going to take the same code example that we had previously, we’re gonna read the CSV in, we’re going to rename the columns and then create this new column x2 that multiplies X by itself. If we want to do the exact same in Koalas, all we simply have to do is import databricks.koalas as ks, using ks as our alias for the Koalas library. And then instead of pd, we can instead use ks.read.csv. So with very minimal code change, we can do the exact same as pandas albeit utilising Spark under the hood.

With regards to the current status of the Koalas project, there’s a really active community backing Koalas with releases coming out every two weeks. As of the most recent release, which is Koalas 0.33, a significant majority of the pandas API has actually been implemented in Koalas. We see that 70% of the series API is there, nearly three quarters of the data from API is there, over 50% of the index, multiindex API is there, we see nearly 70% of the DataFrameGroupBy and SeriesGroupBy API is there with a significant majority of the plotting capabilities that you would have in pandas, also available on Koalas.

Spark vs pandas – Key Differences

So whilst the aim of Koalas is to replicate the pandas API on Spark, there are some significant differences in the underlying design of Koalas that users should understand. When changing from pandas to Spark, how do these differences translate? First, whilst both Spark and pandas have this concept of a DataFrame, with pandas a DataFrame is a mutable object, you can update rows or columns of a DataFrame inplace. In the case of Spark, we’re chaining together operations against an immutable DataFrame. Spark DataFrame syntax has been heavily influenced by SQL syntax, and as such, it lends itself to these large differences that we see between the syntax of PySpark, for example, against pandas. Most of the differences that you see between pandas and Spark arise from the fact that most of the operations in Spark do not get evaluated immediately. As such, it is lazy by nature. So operations that are only executed when an action is called, and be that evaluating some values when calling an action or writing it an external data storage. Another big difference arises from the fact that Spark is distributed by nature. So Spark does not maintain order of rows, whereas pandas gives us an array of rows, where pandas has an inherent associated index, Spark can be thought of as a bag of rows without any inherent order. Lastly, the differences that we see between Spark and pandas come from disability of Spark to scale horizontally by adding additional nodes to a given Spark cluster. This gives us huge advantages in being able to scale performance. With pandas, however, by design were restricted to the memory capacity of a single machine because pandas inherently is going to be the single node library.

In terms of the Koalas architecture,, before we can get under the hood of Koalas, we must briefly cover where koala sits in relation to Spark and pandas. You can think of Koalas as a Lightweight API layer sitting on top of both Spark and pandas, where we can reap the benefits of having the scalable advantages of Spark through its core engine, through its data source connectors, through optimizations that we get via catalyst and tungsten, as well as the Sparks SQL engine, and the underlying DataFrames API that Koalas then plugs into. We also then get a seamless transition that we’re able to go from Koalas to pandas that utilizes Spark under the hood and also PyArrow to facilitate that conversion. To understand how Koalas translates the pandas-like API to Spark, we must first understand some of the internal structures being used. InternalFrame is the internal immutable structure which holds the current Spark DataFrame along with the immutable metadata Koalas requires. The InternalFrame manages the mappings from Koalas column names to Spark column names. Further, in order to replicate the index or multiindex of pandas, Koalas requires that we map the columns of a Spark DataFrame representing the index to the appropriate Spark column names. The InternalFrame under the hood of Koalas then can be thought of as this bridge between Spark and Koalas, and cannot optionally then enable the conversion between a Spark DataFrame on a panda’s DataFrame. The combination then off the underlying Spark DataFrame with the InternalFrame is then what allows us to mimic that pandas API with Koalas.

So let’s demonstrate a few ways in which Koalas API then calls work. So if a user we’re to make an API call via Koalas the Koalas, the Koalas DataFrame updates the Spark DataFrame and metadata in the InternalFrame, this then creates or copies the current InternalFrame with the new state, which then returns a new Koalas DataFrame.

InternalFrame – Metadata update only

Sometimes, an update of the Spark DataFrame is not actually necessary, and we only require to update the metadata of the InternalFrame. Examples of this would be renaming a column or setting the index by calling .setindex on the Koalas DataFrame itself. On the other hand, sometimes an API call will require Koalas to update the internal state instead of returning a new DataFrame. For example, if we provide the argument inplace=true, the underlying sequence of operations using the InternalFrame would look like such. One thing to note is that a Koalas DataFrame never mutates the InternalFrame, but creates copies of the InternalFrame itself to keep it immutable.

So one significant difference between Koalas and pandas arises from the fact that Koalas is seeking to replicate the indexing functionality present in pandas. To do so Koalas via the underlying InternalFrame, just discussed, manages a group of columns as an index. With these columns in an underlying Spark DataFrame acting as an index, Koalas can then thus behave in the same manner as pandas. If we do not specify a specific column when creating our Koalas DataFrame, what Koalas will do is automatically attach what is known as a default index. Now, there are several different types of default index which a Koalas usually can configure. However, each of these index types has its pros and cons. Choosing an appropriate default index type is thus hugely important as users can often run into slow performance if their choice of default index type is not well-suited to the use case at hand.

Default Index Type

So let’s dig into the three different types of default index that we have available to us. The first of these is the sequence index. Sequence is used by default in Koalas, implementing a sequence that increases one by one using PySpark window function without specifying a partition. As this uses a no partition Spark window function internally, what it means is that all the data needs to be collected on a single node. If this node then subsequently doesn’t have enough memory, a user can encounter an out of memory error. As such, it is advised to avoid the use of the sequence index when the dataset is large. When working with a larger dataset, what we can do is make the index distributed by configuring the default index to either distributed sequence or distributed. Just to run through these then, distributed sequence implements a sequence that increases one by one using a group-by and group-map approach in a distributed manner. Subsequently, when attaching this index, a Spark job is run to compute the indices of the DataFrame, and the resulting row order could actually be different from the original order when the data was loaded. This still results in a sequential index globally, but will not result in all of the data being brought onto a single node. Once this index is attached, there’s actually not a significant penalty against using this index type over sequence. As such, if you necessarily do require a continuously increasing index globally on a large dataset, distributed sequence is the recommended index type. One thing to note here is that if more data are added to the data source after creating this index, a continuously increasing sequential index is not guaranteed. Lastly, we have the option of distributed distributed index type. Distributed index implements a monotonically increasing index type using PySparks monotonically increasing ID function in a fully distributed manner. The values of this index are actually non-deterministic in that the underlying function will always create different numbers and the numbers will have an unexpected gap. What this means is that this index type can never be used as indices for operations on different DataFrames such as joins. However, if we do not require a strictly continuous increasing sequence, this index should be used. Performancewise the index has little bounty compared to other index types. With all these considerations in mind then, one can configure their option by using compute.defaultindextype in Koalas. So with this knowledge in hand, let’s switch over to a demo where we will see Koalas in action and we’ll conduct an end-to-end analysis using Koalas and exploring how we can reap the benefits of the distributed nature of Koalas and compare against pandas. Actually, can I just rerecord that? It was a bit wordy.

– Yep. – Thanks. So with this knowledge in hand, let’s switch over to a demo, where we will see Koalas in action. With this demo, we look to build on the knowledge of what’s going on under the hood when using Koalas and go on a bit of a deeper dive of the Koalas API itself. The following is a Databricks notebook which can be downloaded at the following bitly link, and once downloaded, you’ll be able to import this notebook into any Databricks environment, be that the free community edition or pay the current environment. For this demo in particular, we’ll be utilizing a dataset which contains the occurrence of plant and animal species in Australia, with each observation being a recording of the number of a given flora or fauna from a five by five kilometre square area in the country.

In particular, we’ve obtained this dataset from the global biodiversity information facility, which is a fantastic resource for biodiversity data collected by volunteers all across the world. If you would like to follow along with this notebook, and in particular, use the same dataset that we’re using here. If you follow our data citation, and in particular the occurrence dataset link that we have provided, you’ll get the exact same dataset that we’re using here. Note that we have already filtered this to be occurrences of species in Australia from the start of 2010 to May 2020.

In terms of the outline for our demo, what we’re gonna kick off with is loading our data. So firstly, examining if we could potentially load the dataset using pandas, but then go on to explore how we could do so with Koalas, we’ll then embark on some exploratory data analysis, looking at null values, value kinds for categorical columns, and also utilizing the plotting functionality of Koalas to get a deeper understanding of our full dataset. We’ll then move on to doing an analysis of Koalas numbers in Australia using Koalas, and in particular, what we want to do is investigate if our data reflects news in recent years of declining population numbers of Koalas in Australia. We’re then going to finish off with a demonstration of how we can use Koalas to parallelize model training. In particular, we will use the popular open source time series forecasting library, Prophet, to show how we can utilize the underlying distributed nature of Spark which Koalas employs to train independent models in parallel and forecast population numbers of animals in Australia. In addition to this, we’ll log these plots and forecasts to MLflow, which is an open source platform for managing the end-to-end machine learning lifecycle. A couple of things to call out here before we dive into things are the Databricks Runtime version and library dependencies that we have here. So we’ve tested this on the most recent Databricks Runtime version, in particular, the ML Runtime version, which is 7.0 ML Beta.

And on top of that have installed Koalas, the most recent package on FbProphet, also the most recent package version to our cluster.

The overall aim that we seek to accomplish with this notebook is demonstrating the ability of Koalas to succeed with large data where pandas at times can fall short. With this, we wanna showcase the similarities of the Koalas API to that of pandas and how it enables a seamless transition, especially for those coming from a pandas background but want to harness the power of the distributed nature of Spark. So with that, let’s dig into loading our data.

So as mentioned, we’re going to be using data detailing the occurrence of flora and fauna in Australia from the first of January 2010 to the first of May 2020. Let’s first have a look at the underlying CSV file that we want to read in here. Note that we have stored the CSV file in the Databricks file system. The Databricks file system is a distributed file system which is mounted into a Databricks workspace and is available on Databricks clusters. We can use the %fs magic command in Databricks and further things calling ls to have a look at the sais2020csv directory that we have created. We see that this holds the gbif csv file that we would like to load in. In particular, one thing to note is the size of this file, we see that it’s nearly 18 gigabytes in size, which could potentially pose some problems if we want to load with pandas.

So due to its size, we’re going to be inherently limited by the amount of memory that we have on a single machine to load this dataset. As pandas is an inherently single machine library, when we go to load this dataset onto our cluster that we’ve set up here, we’re going to be loading it onto the driver itself, so the driver off that cluster. If we did want to load this dataset using pandas, we could follow standard protocol importing pandas as pd, calling pd.readcvs, passing in our CSV file path, and then also our separator note here that this is a tab separated CSV file.

If we were to try to do this, what we’re gonna run into is an out of memory error, whereby the driver memory is less than 18 gigabytes, but we’re trying to load all of that data on to the driver. You would experience this on your own machine, if you had one, I guess, if you had a a small, if you didn’t have memory of at least 18 gigabytes.

So how can we go about actually loading the CSV file? Well, one way to approach this is using Koalas. If we wanted to use Koalas, one thing to note is, again, as we’ve discussed, Koalas is recreating the pandas API on top of Spark, and with that, what we’re able to do is, load the CSV file from above, using Koalas equivalent readcsv function. Before we do this, let’s import our Koalas package, but also set the default index type that we’ve just previously discussed.

To import Koalas, what we do is call importdatabase.koalas as ks, using ks, which is the kind of standardised alias that Koalas is referred to. And what we’re gonna do is set our default index type to distributed. Rationale for selecting this is that our dataset is relatively large and we also don’t explicitly require the index to be monotonically increasing one by one. For a further discussion on this, I have provided a link to some more information on these index types. But for now, let’s set this default index type by calling ks.set option.

Before I actually go on to calling readcsv, what I want to call out is that we could do this, but we could do so in a much more performant way by using Delta as our underlying data source. So one other advantage that we get with Koalas versus pandas is the fact that we can load data from a Delta table.

Delta Lake in particular is a highly performant way of storing your data. So it’s an open source storage there that provides ACID transactions, it provides scalable metadata handling, as well as a way to unify both streaming and batch data processing. With Koalas, what we can do is read from a given Delta table and do so in a parallel manner.

To do so, what we can do is call ks.readdelta, and provide the sais 2020 Delta table that we have.

We can check the size of our Koalas DataFrame by calling .shape on this. Note that the actual DataFrame itself through the Koalas DataFrame that we have just loaded as kdf, the significant size, so it’s 35 million rows with 50 columns.

One other thing that we get with Koalas is the fact that we can utilize SQL queries against DataFrame. So because clause is effectively a wrapper on top of Spark, we can effectively run the SQL query, utilizing under the hood, the Spark SQL engine. So to start off with, let’s issue a SQL query, and we’re going to query this genus Phascolarctos. It seems particularly inappropriate for those that haven’t come across it before. Phascolarctos is the genus name for Koalas.

So providing our SQL query, what we can do is call SELECT COUNT, pass it in as ks.sql, and what we see is that we have 44,000, or well, just over 44,000 instances of observations with Koalas.

Furthermore, we can grab our column names by simply doing kdf.columns as you would with a panda’s DataFrame. And with that, let’s get started on some exploratory data analysis. Let’s start digging into actual data itself using Koalas.

So we’ve loaded our dataset and we wanna conduct some basic analysis. Similar to pandas what we get with Koalas is the exact same functionality with regards to calling .describe on a Koalas DataFrame. So something to note here is that we’re getting the exact same output in terms of the number of nonmissing values, the mean, standard deviation, min max, as well as the median, and the upper and lower quartiles.

To get null values, we can similarly use pandas-like syntax to call kdf.isna.sum. For those coming from a PySpark background, this is significantly not as verbose as you would do so with PySpark.

One thing that we wanna call out is that a lot of the following analysis that we’re gonna do is going to be based off counts of given plant and animal species. So for this, what we want to do is keep individual count where it’s not null and also where we actually have recordings for both month and year. So we’re passing in these conditions and filtering down our kdf DataFrame, our Koalas DataFrame.

A popular functionality in pandas that you don’t get with PySpark is a value count functionality.

So while you can do this in PySpark, it’s a lot more verbose.

And so what we can do is grab our kingdom column from our Koalas DataFrame and call .value counts to get a count of the instances for our given categorical column. In particular, we’re going to normalize this to get a true indication of what percentage. We see that Animalia make up a significant proportion with plants forming the next most number of counts.

Something which Koalas gives us, which is superpowerful, is the fact that we can create data visualization using pandas-like syntax, but being able to do so over the entire dataset, which in this case is it’s 18 gigabytes in size. So let’s call a value count of our kingdom but get a bar plot. And what we can see here is, again, it confirms what we’ve seen above with regards to the Animalia Kingdom from a significant proportion of the observations that we have in our dataset.

Let’s dig into some analysis of our columns. So in particular, what we’re gonna be doing is looking at our class column. So class, just to call out, is a biological classification. So class is a taxonomic rank, which then falls under phylum. Within biology, we have different types of classification.

We see that we have 131 unique classes.

And what if we wanted to get the total count, and the number of distinct recordings, and the average count per recording for each of these classes.

So for those coming from a pandas background, we can kind of quickly do that this with our Koalas syntax as it is exactly the same as pandas, albeit we’re able to do these aggregations over the entire 18 gigabyte dataset.

In particular, what we do here is group by class column, we’re going to sum up the individual lines for each group, and then get a count of the instances in each group. Note here that we are providing as index=false to reset the index. We can rename columns by using classic pandas functionality that you would do for .columns and passing in a list of the new column names.

We then create a column which calculates the average number of animal or plant counts per each distinct recording of each class. So what this is giving us is, again, aggregated over the entire dataset, we can get the number of distinct recordings, the average count per recording. So what we see here is that we have, well, birds in particular, so Aves makeover, well, nearly 15 million, of the distinct recordings that we have. And then on average, we see that each of these recordings or observations has around nine birds per recording. We see that flowering plants are the next most frequently occurring off the observations within our dataset. What if we want to get a distribution of the average counts per recording? What we can do is utilize the plotting functionality of Koalas. So here what we’ll be doing is using the .hist function, calling this on our column from above.

And what we see is the vast majority of classes have average count much below 1000. So we see this spike, but we also see these huge outliers as well, whereby you have a huge number of counts.

So the counts of the actual plants or animals per recording. So something is is heavily skewing our recordings here. So let’s have a deeper look into that.

What we can do is, again, sort our values by average count per recording, and we can use this style background gradient, again, to highlight where we have particularly high counts, or particularly high and distinct recordings. One thing to note is that we have this Cyanophyceae, which is a group of photosynthetic bacteria. And we see that we have nearly seven million instances per recording. I guess this is kind of understandable given the amount of microscopic algae that you could have in a given area.

So let’s move on to doing a bit of an analysis of Koalas numbers in Australia. So the rationale behind this is that in Australia in recent years, Koalas populations have unfortunately been declining due to effects from habitat destruction, bushfires, very notably within the last, well, till the end of last year, domestic dog attacks, road accidents. So what we wanna do is analyze our data itself to see if our dataset confirms such reports.

So what we’re gonna do initially is filter down to the genius that we’ve explored from above, so Phascolarctos, and again calling a head on this. So we see that we have our Koalas DataFrame with our kingdom, phylum, class, order, family, genus, species. We see that we have individual counts. So for this recording, how many Koalas were present? So there’s one in this area, three, and we’re also provided with the latitude and longitude, where that was, so the precise location, the event day, the day, the month, the year.

And what we see when we do this is that we have 21,919 instances of individual Koalas recordings.

Notably, that this is after we have removed any null values, so individual counts. So specifically, what we want is where we have at least an input for this individual count, and also for our month and day, so natural recording the date.

So let’s check our date range, and what we can do with Koalas is, again, grab the event date time on the event date, get the min and max, and this confirms that we have from the first of January 2010 to the first of May 2020.

How many observations then do we have for each province? So what we can do is do a value counts for the recordings on StateProvince, and we see that New South Wales has the majority of observations of Koalas, with Queensland and South Victoria second and third. We see that the Capital Territory and the Northern Territory have a very minimal instances of Koalas recordings.

So how many Koalas are typically in each observation? So one quick way of doing this is to, again, use our histogram functionality. So let’s look at individual count and call .hist. And again, what we see is that there’s a huge spike indicating that in general, our observations, we have fairly low numbers of Koalas per observation, but the fact that we have observations over 50,000 is indicating that there’s something a bit unusual here. Could we have 50,000 or over 50,000 Koalas in a given area?

One would doubt so.

So there are a tiny number of instances where there are a huge number of Koalas recorded. We’re going to treat these as outliers and remove any observations over 100 Koalas for a given single recording. To do so, we’ll filter our individual count to be less than 100, and again, calling .hist to see if things look right. Again, we still have a pretty skewed distribution, but for the most part, we see that for most recordings, we have less than 20 Koalas per observation.

To get an idea of the overall population of Koalas in Australia since 2010, let’s compute the number of occurrences for Koalas recorded, along with the total number of Koalas counted. And what we wanna do is get the average number of Koalas per recorded sighting. So for each observation, how many Koalas do we have in a given observation per month? To do so, we’re going to follow the same kind of pattern of aggregations that we did above, so getting a group-by year and month and we’re going to sum up the individual counts.

So getting the sum and how many observations we have. We’re then going to join these two Koalas DataFrames together using our .merge merge function. We’re gonna join on year and month. We’re going to rename our columns to have total count and number of recordings, and then what we want to do is create an additional column, which is average count for. So for each month, what we want to see is the total count divided by the number of recordings to give us the average kind of per given month.

Note that we’re calling sort values here. One thing to call out is that because under the hood we are utilizing Spark here, sort values is going to trigger a Spark job, you are going to be shuffling your data around your cluster. So do be conscious of calling .sort values.

Sorting ascending false for the most recent 12 months, we see that we have from May of this year to June of last year, we have the number of recordings in each month, the total count of Koalas and then the average count per each month. But let’s actually visualize this to see if Koalas numbers are indeed declining over time.

What we do is plot Koalas numbers over time. And to do so, what we’re gonna do is, firstly, use some SQL to create an aggregation, so concatenating the year and month columns to create a year month column.

And then what we’re gonna do is convert our year month column to a date time format. So we can use Koalas to date time functionality to grab our year month column and convert it to a date time.

We actually need to do this, so we can plot this, and so it corresponds to time.

So sorting our values by year month, again, we’re going to do this inplace.

And what we now do for plotting is that we would like to pass these into a matplotlib.

To do so, what we’re gonna do is call .to numpy. So we would like to create numpy arrays off the actual columns itself. Something to call out here is that we should only employ this if we know the resulting numpy array will be small. In this instance, we’ll be collecting all the data to the drivers memory of our cluster. So do be conscious of the size of this data. So we’re gonna grab our x axis values, our y values, whereby we have total kind and number of recordings, and what we want to do is plot monthly total counts and the number of monthly recordings for Koalas from 2010 until May 2020.

We do so utilizing matplotlib and also employing seaborn, which is a wrapper on top of matplotlib to insert some style preferences here. So we plot two axes, one, which is the total count of Koalas over time and the other being the number of recordings. So what we can see, something that notably sticks out, is that we have these huge spikes towards the end of 2014, towards the end of 2015, and towards the end of 2016. In particular, we see that the total count is huge for these and the number of recordings is also huge. One thing to also note is that we do have something that looks like a sloping of Koalas numbers over time towards the tail end of 2019 going into 2020. A couple of factors this could point towards, especially the awful bushfires that happened towards the end of last year.

One thing that we can do to get something off a truer picture of Koalas numbers over time, is to basically get an average number of Koalas per recording for each month between 2010 and May of 2020.

So what we’re gonna do is plot our monthly average Koala count over time. So what we’re doing here is plotting the average number of Koalas per occurrence and seeing that we have this huge spike towards the end of 2015, whereby there was a huge number of recordings but a smaller total count.

And in terms of the downslope that we see here, we see that there is, a bit of an uptake actually, so it doesn’t look like the numbers are severely declining. The key thing to point out here is the ability that we’re able to utilize our full dataset to explore the underlying data, but doing so using a pandas-like syntax. So if you’re coming from a pandas background, being able to seamlessly transition to working with larger data, albeit using a syntax that you’re very comfortable and familiar with. For our last section, let’s look into parallelizing and model training with Koalas. So in particular, for this what we’re gonna be utilizing is Facebook’s open source time series forecasting library called Prophet. It’s superpowerful and popular package, which is pretty intuitive to use. One thing to call out here is that our aim is not necessarily to to fit a highly accurate forecasting model, but more so demonstrate how we have the functionality and the flexibility to train many models in parallel utilizing Koalas. If we wanted to do this with pandas, we’re going to be inherently limited to doing so in a sequential nature.

So what we do is first filter to the classes that we would like to forecast for. So we’re gonna take some popular and well known animals that are prevalent in Australia. So we’re going to take Koalas, which is Phascolarctos, Kangaroos, Macropus, Cockatoos, which are birds, Cacatua, and emus, also a larger bird called Dromaius. So these are their genius names. We’re going to then apply a filter to our original Koalas DataFrame. We’re going to apply some aggregations to get the counts and total number of observations for each of these classes, renaming our columns as we did previously, and then concatenating the year and month columns, just as we had before, being able to issue and query and create that using our SQL functionality. We then convert our year month column to date time such that we’ll be able to plot and then also pass in to Prophet to be able to then forecast on.

And before we actually come to that, so what I want to discuss is how this is actually working under the hood. So what we’ll be employing is the groupby.apply method that Koalas brings to the table. So the kind of canonical groupby-apply that you have with pandas. However, what we’re going to be doing is using this with Koalas. What we wanna demonstrate is that we can apply an arbitrate Python function to a group of our data.

In this instance, what we’ll be using is our forecast function, which we’ll define below. And which will fit a Prophet model, log parameters, and plots to MLflow. And then what we want to do is we’re actually, we’re passing in a Koalas DataFrame, we have a Koalas DataFrame returned. However, note that what we can do is return a DataFrame which can have a different number of rows and columns compared to the input. Under the hood, what’s happening is that we’re utilizing Sparks pandas UDF functionality when implementing this groupby-apply method. With this, what we get is the ability to parallelize the actual use of this function. So as such, whenever we use the pandas UDF internally, each group is working on a panda’s DataFrame. So we can utilise pandas API’s within our given function that we’re applying.

One thing to call out is that when calling groupby-apply, Koalas executes the function once for a small sample to infer the types. This can potentially be expensive. So for example, where a dataset is created after aggregations or sorting. To avoid this, what we want to do is actually specify our return type. So what we can do is provide return type hint in the function being applied, which we’ll see below.

When we’re actually forecasting for each of the individual genus, so each of our individual animals, we want to produce a number of different plots. So we want to be able to see forecasts for each of these different animals. For this, what we’ll do is utilize MLflow and log the actual plot as artifacts.

So Mlflow, if we do navigate to the MLflow documentation, is an open source platform for managing end-to-end machine learning lifecycle. We can track experiments, and in Databricks, it’s basically baked into our workspace, whereby we can log to a given experiment location, set up an experiment location and log different runs.

In order to firstly set this up, we’re going to import Mlflow, and we’re then going to import the tracking MLflow client. We’re going to specify our MLflow experiment.

So this is just under a given path that I have in dbfs.

I’m providing my experiment name, which is going to be koalassais20exp, so experiment. I’m going to set this as my MLflow experiment location, and I’m going to then grab the experiment ID to then track.

I now define the function that I want to apply to a given group of my Koalas DataFrame. And under the hood, what’s going to happen is that this function is being applied to a given group in parallel. So importing Prophet, importing NumPy, which we’ll also use, and then also importing matplotlib.

Note that we’re providing this type int, a return type int, I should say, in which we have specified the types of the Koalas DataFrame that should come out as a result. Note that what we’re working on internally here is a panda’s DataFrame, though. And this is underpinned by the fact that this is a pandas UDF that we’re applying to each individual group. So within this, you can think of working on a given panda’s DataFrame, whereby we’re grabbing our genus name. We’re going to grab our year month as our date column, we’re going to grab our total count as our y count. So the thing that we want to predict. So going forth, what is the total count off of these animals per month? And what we then do is open up this MLflow code block. So you want to log each run, so a run being fitting a model per genius type, and then feeding that to Mlflow. And what we can do is pass in our experiment ID from above, we’re gonna give this a run name. So we’re gonna pass in our genus name, call this Prophet forecast, and we’re going to define some parameters that we want to, number one, record to MLflow, but then also what we’re putting in as arguments to our Prophet function. So what we’re going to do, track our daily seasonality and Boolean, our weekly seasonality and our yearly seasonality, pass these in as parameters that we would like to log to MLflow. So we’re going to log genus, we’re going to log, like I mentioned, daily seasonality, weekly seasonality, yearly seasonality, we’re going to define our Prophet model, we’re gonna fit our Prophet model to our DataFrame. So our panda’s DataFrame that we have above.

We’re then going to make future forecasts. So we want to forecast for three years into the future at of frequency of months. And we will have this forecast, which will contain our forecasts along with upper and lower bounds. And one other thing that we would like to get with this is a plot of both our forecasts along with our components. So basically, looking at the seasonalities or any trends that we might have there? To do so and capture these plots with MLflow, what we can do is save these figures out to a specified path, and we then log artifacts, given a forecast path. So we’re logging our forecast path and also our components path that we do in a similar manner. We then have a return Dataframe that we would like to have. So this, in essence, again, we’re working on this as if it’s a panda’s DataFrame. Returning this panda’s DataFrame, which when it’s aggregated across all of our data, the return from the entire groupby-apply functionality will be a Koalas DataFrame. To actually then employ this, what we can do is call our grouped Koalas DataFrame from above. So what we have from here, we have our genus, our year, month, number of recordings, total count, year month. We’re gonna group by genus.

We’re going to then apply our forecast function.

When we run this, note that we do not kick off any Spark jobs under the hood. This will only happen when we call an action on our full dataset. So to do this, what we can do is compute the length of the resulting DataFrame.

Once we run this, what we will get is a resulting Koalas DataFrame, which will be this forecast df.

And with this, what we will have are our runs that have been logged to MLflow. We can query these runs programmatically by calling search runs. So MLflow .search runs. Let’s have a look at Cockatoos, so Cockatoos, and what we see is we have a given run ID. We have passed in our, well, we can query it using our search functionality passing in our test genius, and we’re gonna sort by start time to get the most recent run.

And what if we want to call back in what we’ve previously logged though, we can pass in this given artifact url. So each individual run will have a specific artifact url.

We can call this in to grab our forecast plot for that given run. So that’s calling and reading our forecast for cockatoos, and what we can see is that, I mean, it’s not a great performing forecast. We haven’t measured in terms of of metrics, but the thing that we want to demonstrate here is the ability to create these models in parallel. As mentioned, to do this in pandas, it would be implemented in a sequential nature. With this, what we’re able to do is parallelize out the actual model training off each individual model using our groupby-apply functionality and do so and using Koalas on the underlying power of the distributed nature of Spark.

Koalas – On the roadmap

So in terms of the upcoming roadmap for Koalas, in terms of upcoming releases, in for June/July 2020 is to see a 1.0 release for Koalas, which will support the new Spark 3.0. And from Databricks perspective, coming up in July/August 2020, we’ll see a pre-install Koalas 1.x in Databricks Runtime and ML Runtime version 7.0. So how can you get started with Koalas?

Koalas – Getting started

So it’s super simple in terms of either PIP installing Koalas or conda installing. And some really useful resources come in the docs. There’s some really good examples of 10 minutes to Koalas, which will take you through all the basics of Koalas along with some gotchas, as well as the GitHub, where you can see at github.com/databricks/koalas.

So thanks very much. Hopefully, you can see the benefits that Koalas can bring, especially in terms of bringing over existing pandas code and being able to reap the benefits that you can get.

Watch more Spark + AI sessions here
Try Databricks for free
« back
About Niall Turbitt


Niall Turbitt is a Senior Data Scientist on the Machine Learning Practice team at Databricks. Working with Databricks customers, he builds and deploys machine learning solutions, as well as delivers training classes focused on machine learning with Spark. He received his MS in Statistics from University College Dublin and has previous experience building scalable data science solutions across a range of domains, from e-commerce to supply chain and logistics.