In Spark 3.2, a new library called the Pandas API on Spark was integrated into PySpark. We will see why and when it should be used.
Pandas is a very powerful library that all data scientists know, but Pandas code can only run on a single machine, so it does not scale out. Consequently, if you process a large dataset with Pandas, it will be very slow, and you will most likely get an out-of-memory (OOM) error!
Usually, that is when Spark comes into play. Spark is the de facto tool for large-scale data processing. It divides a large dataset into small chunks, called partitions, and distributes them across multiple workers that process them in parallel.
PySpark (the Spark API for Python) has a module called Spark SQL which provides a programming abstraction called DataFrame (similar in some ways to Pandas DataFrames).
Nevertheless, PySpark DataFrames have two main drawbacks:
- Existing Pandas code cannot be reused because Pandas is not compatible with PySpark DataFrames.
- The PySpark syntax is very different from the Pandas syntax, which makes it harder for data scientists to learn.
As a result, PySpark DataFrames are not really appropriate if you need to scale out an existing Pandas codebase.
Scale-out with the Pandas API on Spark
On April 24, 2019, Databricks announced a new open-source project named Koalas, which aimed to provide the Pandas API on top of Spark. This library gathered momentum and was officially merged into PySpark in the Spark 3.2 release (October 2021) under the name Pandas API on Spark.
The Pandas API on Spark has the same syntax as Pandas, but it uses a PySpark DataFrame under the hood. This means that, unlike Pandas code, code written with the Pandas API on Spark can run on a large cluster of nodes where Spark is set up, and can therefore handle large datasets.
Concretely, it allows you to do the following:
# from pandas import read_csv        # before: plain Pandas
from pyspark.pandas import read_csv  # after: only the import changes
psdf = read_csv("data.csv") # pandas-on-Spark DataFrame
psdf.columns = ["x", "y", "z"]
psdf["x2"] = psdf.x * psdf.x
Basically, you can do almost everything you used to do with Pandas (more than 83% of Pandas functions are available in pyspark.pandas). The official documentation lists all the supported functions.
What if I cannot find a Pandas function on the Pandas API on Spark?
Since a pandas-on-Spark dataframe uses a PySpark dataframe under the hood, it can be converted from/to a PySpark dataframe. So if you cannot find the function that you need, you can still do the following:
- Transform the pandas-on-Spark dataframe into a PySpark dataframe
- Perform multiple transformations using PySpark API
- Convert the PySpark dataframe back into a pandas-on-Spark dataframe
>>> import pyspark.pandas as ps
>>> psdf = ps.range(10)
>>> sdf = psdf.to_spark().filter("id > 5")
How does it work?
Internal Frame to bridge the gap between PySpark and Pandas
When a pandas-on-Spark dataframe is created, an “Internal Frame” and a PySpark dataframe are also created.
The “Internal Frame” provides conversions between the pandas-on-Spark and the PySpark dataframes. It keeps metadata such as the index and the column mapping between the two dataframes.
It allows pandas-on-Spark dataframes to support Pandas functionalities that are not supported by PySpark dataframes:
- Mutable syntax: so that you don’t have to create a new dataframe each time you want to modify it
- Sequential index: so that you can use Pandas functionalities based on an index (see below)
- Pandas dtypes
What is important to bear in mind is that the data is distributed across multiple machines (called workers), whereas in Pandas the data stays on a single machine.
Unlike PySpark dataframes, pandas-on-Spark dataframes replicate the indexing functionality of Pandas (thanks to the Internal Frame). As a reminder, index columns are used to access rows with the loc/iloc indexers, to match corresponding rows in operations that combine two DataFrames or Series (for example df1 + df2), and so on.
When you create a pandas-on-Spark dataframe you can set a column of the dataframe as the index column.
If you do not specify an index column, a default index is used. Three types of default index are available:
- sequence: This is the index type used by default (as of Spark 3.2). It implements a sequence that increases one by one. It is very likely to move the whole dataframe onto a single node, which will be slow and will probably raise an OOM error. As a result, you should not use it when the dataset is large.
- distributed-sequence: It implements a sequence that increases one by one in a distributed manner. If your dataset is large and you need a sequential index, you should use this index type. Note that if more data is added to the data source after this index is created, the index is no longer guaranteed to be sequential.
- distributed: It implements a monotonically increasing sequence whose values are neither consecutive (e.g. 1, 8, 12) nor deterministic, so it does not replicate Pandas indexing at all. Performance-wise, this is the best index type, but it cannot be used for operations between two dataframes: such operations require both dataframes to have the same index values, which is not guaranteed when the values are non-deterministic.
In summary: use sequence only for small datasets, distributed-sequence when a large dataset needs a sequential index, and distributed when performance matters most and you do not need to combine dataframes.
To learn more about indexing, you can refer to the official documentation: default-index-type
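To build some intuition for the difference between the two distributed index types, here is a toy sketch in plain Python. It is not Spark's actual implementation: the partitions are simple lists and the two function names are hypothetical. The second function only mirrors the bit layout documented for PySpark's `monotonically_increasing_id` (partition ID in the upper bits, row position within the partition in the lower 33 bits), which is what the distributed index type relies on.

```python
from itertools import accumulate

# Toy "partitions": each inner list stands for the rows held by one worker.
partitions = [["a", "b", "c"], ["d"], ["e", "f"]]


def distributed_sequence_index(parts):
    """Consecutive 0..n-1 index computed without gathering the rows in one place.

    1. Each partition reports only its row count.
    2. A running sum of the counts gives each partition its starting offset.
    3. Each partition numbers its own rows starting from that offset.
    """
    counts = [len(p) for p in parts]
    offsets = [0] + list(accumulate(counts))[:-1]
    return [[offset + i for i in range(len(p))] for offset, p in zip(offsets, parts)]


def distributed_index(parts):
    """Increasing but non-consecutive index that needs no coordination at all.

    Each value is (partition ID << 33) + position of the row in its partition.
    """
    return [[(pid << 33) + i for i in range(len(p))] for pid, p in enumerate(parts)]


seq = distributed_sequence_index(partitions)
dist = distributed_index(partitions)
print(seq)   # [[0, 1, 2], [3], [4, 5]]
print(dist)  # [[0, 1, 2], [8589934592], [17179869184, 17179869185]]
```

The second scheme is cheaper because no partition needs to know anything about the others, but its values depend on how the rows happen to be partitioned, which is why two dataframes are not guaranteed to end up with matching index values.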
The official documentation also has a great page about best practices.
Below are the best practices that I found the most useful when I used the Pandas API on Spark during my project.
Avoid computation on a single partition
If you start using the Pandas API on Spark, you will probably get this warning:
"Moving all data to a single partition, this can cause serious performance degradation"
This happens in several cases:
- Whenever an index is needed but there is no index column and the default index type is set to sequence. A very common example is when a PySpark dataframe is converted into a pandas-on-Spark dataframe using “to_pandas_on_spark()”
- When using a function (e.g. rank) whose implementation uses a PySpark Window without a partition specification. Note that this is clearly mentioned in the documentation.
Do not use the sequential default index
You should set the default index to distributed or distributed-sequence to avoid computation on a single partition.
>>> import pyspark.pandas as ps
>>> ps.set_option('compute.default_index_type', 'distributed-sequence')
Try to specify the index column whenever an index is required
Using an existing column as an index column is always faster than using any of the three default indexes because there is some overhead to create a default index.
When a PySpark dataframe is converted into a pandas-on-Spark dataframe, you should specify the index column through the index_col parameter of the conversion method.
Keep in mind that the data is distributed
Since the data is distributed, some functions like “sort_values” need to move data between worker nodes. This is called a “Shuffle”, and it slows down the execution of your code considerably, so you should use this kind of function as sparingly as possible.
You can use “spark.explain()” to show the Physical Plan and detect whether a “Shuffle” will take place.
An “Exchange” operator in the plan means that a “Shuffle” will occur.
Now, let’s see how we used the Pandas API on Spark in a recent project at Sicara.
First, we built a proof of concept of a feature. We worked on a small dataset and used Pandas.
Once it was validated, we had to scale out the feature to a larger dataset. To do that, we used the Pandas API on Spark (called Koalas at that time) to quickly convert our Pandas codebase.
We were working with the Google Cloud Platform (GCP) so we used Dataproc which provides a cluster with Spark already set up.
Our development workflow was as follows:
- A developer creates a Python script (called a job) using the Pandas API on Spark on their local machine, where Spark is installed. Since the developer's machine is much smaller than the cluster, the job works only on a small subset of the dataset at this step.
- The developer (or an orchestration tool) submits the Python script to the Dataproc cluster.
- The Python script is run in a distributed manner on the cluster. It uses the whole dataset located on the data lake.
To wrap up, here is a summary of pros and cons based on my experience:
Pros:
- The existing Pandas codebase was quickly adapted to run on Spark
- New pipelines were written easily thanks to the Pandas-like syntax
- Good documentation: all functions are well documented with examples (like Pandas)
Cons:
- Don’t forget that you are working on large datasets:
When you are working on a small dataset with Pandas, execution time is not an issue. You can use lots of functions like “drop_duplicates” or “sort_values” without any noticeable impact. The Pandas API on Spark can give you the illusion that everything will work fine at scale, but this is not the case. If your code triggers a lot of “Shuffle”, it will run slowly even though it is distributed.
- Spark complexity is still there:
Even though you can write Pandas-like code, there are various tasks that require a good understanding of Spark, like detecting “Shuffle”, tuning your cluster (number of workers, CPU, memory, ...), setting up Spark, etc.
- Lack of support:
Tutorials are very hard to find, and very few questions have been asked on Stack Overflow so far.
The Pandas API on Spark is a great tool if you need to scale out an existing Pandas codebase. It is also very useful for data analysis at scale since it provides basic statistical functions and plotting. Nevertheless, users will still need to understand the basics of Spark and some PySpark APIs. As a result, it has to be seen as a complementary tool that makes learning PySpark much easier.