https://github.com/bartosz25/spark-playground/tree/master/spark-3.2.0-features/pandas_on_pyspark
Project Zen is an initiative to make PySpark more Pythonic and facilitate the Python programming experience. Apache Spark 3.2.0 takes the next step in this direction by bringing the Pandas API to PySpark!
Why?
Everything started in 2019 when Databricks open-sourced Koalas, a project integrating the Pandas API into PySpark. Since then, the project's adoption has increased and the community started to think about integrating it directly into PySpark, addressing some well-known PySpark issues at the same time. Below you can find an excerpt of Hyukjin Kwon's message explaining some of the problems PySpark poses to Python users:
Fourthly, PySpark is still not Pythonic enough. For example, I hear complaints such as "why does PySpark follow pascalCase?" or "PySpark APIs are difficult to learn", and APIs are very difficult to change in Spark (as I emphasized above). This set of Koalas APIs will be able to address these concerns in PySpark. Lastly, I really think PySpark needs its native plotting features. As I emphasized before with elaboration, I do think this is an important feature missing in PySpark that users need. I do think Koalas completes what PySpark is currently missing.
Does it mean PySpark will become a data science project because of this Pandas API integration? Not really. The PySpark DataFrame still exists and will probably remain the main choice for data engineering workloads. But thanks to the Pandas API integration, scaling data science pipelines should become easier (we'll even see both flavors interoperate in the code below). Enough talking, let's now see some technical points!
Koalas to PySpark aka Pandas on PySpark
In the umbrella ticket for moving Koalas to PySpark you will find these keywords: port/integrate, mypy, and unit tests. Let me explain them briefly to give a better picture of the whole migration process:
- port/integrate - the idea was to bring the Koalas code to PySpark with operations like copying files and renaming packages (e.g. databricks.koalas to pyspark.pandas). This effort was carried out in different subtasks by Xinrong Meng and Haejoon Lee.
- mypy - mypy is a Python static code analyzer. To facilitate the code migration and avoid introducing too many changes at once that might be difficult to review, the mypy checks on the PySpark Pandas modules were explicitly disabled at the beginning of the process. It's only a few subtasks later that Takuya Ueshin re-enabled the checks and fixed the linter errors.
- unit tests - the unit tests also required some extra effort. Unit tests in Koalas were defined with the pytest framework whereas PySpark's test code base uses the unittest library. They therefore needed a separate and iterative migration process, sketched just after this list.
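To picture that last migration, here is a purely schematic before/after sketch. It is not the actual Koalas or PySpark test code; the test names and the asserted data are made up for the illustration:

import unittest

from pyspark import pandas as pyspark_pandas


# pytest style used in Koalas: a plain function relying on the assert statement
def test_dataframe_has_expected_cities():
    dataframe = pyspark_pandas.DataFrame({"city": ["Paris", "Warsaw"]})
    assert dataframe["city"].to_list() == ["Paris", "Warsaw"]


# unittest style used in PySpark: a method inside a TestCase using the self.assert* helpers
class DataFrameCreationTest(unittest.TestCase):
    def test_dataframe_has_expected_cities(self):
        dataframe = pyspark_pandas.DataFrame({"city": ["Paris", "Warsaw"]})
        self.assertEqual(["Paris", "Warsaw"], dataframe["city"].to_list())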
Besides these 3 points, there were also some documentation updates and some text renaming from Koalas to pandas-on-Spark made by Hyukjin Kwon. Let's see how all this effort translates to the code:
from pyspark import pandas as pyspark_pandas
import pandas

dataframe_data = {"city": ["Paris", "Warsaw", "Berlin"], "country": ["France", "Poland", "Germany"]}

pyspark_dataframe = pyspark_pandas.DataFrame(dataframe_data)
print(pyspark_dataframe)

pandas_dataframe = pandas.DataFrame(dataframe_data)
print(pandas_dataframe)
"""
     city  country
0   Paris   France
1  Warsaw   Poland
2  Berlin  Germany
     city  country
0   Paris   France
1  Warsaw   Poland
2  Berlin  Germany
"""
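The pandas-on-Spark DataFrame is not isolated from the classical one either, which matters for the data engineering vs data science split mentioned earlier. Here is a minimal sketch (the local master, the data and the column names are illustrative assumptions) converting a pyspark.sql.DataFrame into its pandas-on-Spark counterpart with to_pandas_on_spark() and back with to_spark():

from pyspark.sql import SparkSession

# a minimal sketch, assuming Spark 3.2.0 and a local session
spark = SparkSession.builder.master("local[*]").getOrCreate()
cities = spark.createDataFrame([("Paris", "France"), ("Warsaw", "Poland")], ["city", "country"])

# switch to the pandas API for the analysis part...
pandas_on_spark_cities = cities.to_pandas_on_spark()
print(pandas_on_spark_cities["country"].value_counts())

# ...and come back to the classical DataFrame when the data engineering code takes over
spark_cities_again = pandas_on_spark_cities.to_spark()
spark_cities_again.show()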
What else?
To start, there are some compatibility changes related to Pandas 1.3, mainly for the group by and union operations. The new PySpark release also includes some type improvements and new functions for the Pandas categorical type. This type represents a finite set of values that can be ordered, like in the following snippet showing the added functions:
from pyspark import pandas as pyspark_pandas

numbers = pyspark_pandas.CategoricalIndex([6, 2, 1, 4, 3], categories=[2, 4, 6], ordered=False)

# as_ordered won't order the categories! It simply marks them as being ordered!
# these as_* functions are, by the way, 2 new ones added after the Pandas-on-PySpark release
ordered_numbers = numbers.as_ordered()
print(ordered_numbers)
print(numbers)
"""
Both prints are the same, only the `ordered` property changes:
```
CategoricalIndex([6, 2, nan, 4, nan], categories=[2, 4, 6], ordered=True, dtype='category')
CategoricalIndex([6, 2, nan, 4, nan], categories=[2, 4, 6], ordered=False, dtype='category')
```
"""

# But you can change the order with another new function added in PySpark
reordered_numbers = ordered_numbers.reorder_categories([6, 4, 2])
print(reordered_numbers)
"""
This time the values remain the same but the `categories` don't:
```
CategoricalIndex([6, 2, nan, 4, nan], categories=[6, 4, 2], ordered=True, dtype='category')
```
Of course, you can also notice some `nan`s in the `CategoricalIndex`. It's the placeholder for any
value not defined in the `categories`.
"""
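As for the group by part of those compatibility changes, I won't reproduce the Pandas 1.3-specific fixes here, but a minimal, made-up sketch of the pandas-on-Spark group by API they apply to looks like this:

from pyspark import pandas as pyspark_pandas

cities = pyspark_pandas.DataFrame({
    "city": ["Paris", "Warsaw", "Berlin", "Krakow"],
    "country": ["France", "Poland", "Germany", "Poland"]
})

# count the cities per country, exactly like in Pandas
cities_per_country = cities.groupby("country")["city"].count()
print(cities_per_country)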
So, do not be surprised if, while visiting the Koalas repository, you see it in read-only mode. That being said, the existing pipelines will continue to work with the old versions! But if you have any new pipelines to write, Pandas on PySpark seems to be the more appropriate choice. And migrating the old ones should simply be a matter of changing the package imports!
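To make that last point concrete, here is an illustrative before/after sketch; the CSV path and the code around the import are made up, only the package names are the real ones:

# before: a pipeline written against the standalone Koalas package
# import databricks.koalas as ks
# cities = ks.read_csv("/tmp/cities.csv")

# after: the same logic on Pandas on PySpark, only the import changes
from pyspark import pandas as ks

cities = ks.read_csv("/tmp/cities.csv")
print(cities.head())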