PySpark and the zip archive story

Versions: Apache Spark 3.3.0

The topic of this blog post is one of my first big surprises while learning to debug PySpark jobs. Usually I run the code locally in debug mode and the breakpoints I define help me understand what happens. That time, it was different!

That time, adding debug breakpoints wasn't enough. Instead, I had to open the archive under python/lib/ and add some print statements to follow the execution flow. But wait a minute, can Python evaluate code included in a zip file?

PEP 273

Yes, Python can interpret code from zip files, and it's not a new thing. The feature was implemented in Python 2.3 as part of PEP 273 - Import Modules from Zip Archives.

The PEP lets you import Python files from zip archives exactly as if they were laid out in a classical subdirectory tree. The specification states that pretty clearly:

Currently, sys.path is a list of directory names as strings. If this PEP is implemented, an item of sys.path can be a string naming a zip file archive. The zip archive can contain a subdirectory structure to support package imports. The zip archive satisfies imports exactly as a subdirectory would.

The implementation is in C code in the Python core and works on all supported Python platforms.

Any files may be present in the zip archive, but only files *.py and *.py[co] are available for import. Zip import of dynamic modules (*.pyd, *.so) is disallowed.

Just as sys.path currently has default directory names, a default zip archive name is added too. Otherwise there is no way to import all Python library files from an archive.

That explains how Python can work with modules packaged in an archive, but not when PySpark relies on this mechanism. After all, in PySpark jobs we seem to import the files from a regular subdirectory.
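To see the PEP in action, here is a minimal, self-contained sketch; the module name and archive path are made up for the example:

```python
import importlib
import sys
import tempfile
import zipfile
from pathlib import Path

# Build a tiny archive containing a single module, then import it via sys.path.
workdir = Path(tempfile.mkdtemp())
archive = workdir / "bundle.zip"
with zipfile.ZipFile(archive, "w") as zf:
    # greetings.py exists only inside the archive, never as a .py file on disk.
    zf.writestr("greetings.py", "def hello():\n    return 'hello from the zip'\n")

# PEP 273: a zip archive path is a valid sys.path entry.
sys.path.insert(0, str(archive))
greetings = importlib.import_module("greetings")
print(greetings.hello())
```

The import statement itself doesn't change at all; only the sys.path entry tells Python to look inside an archive.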

PYTHONPATH in Apache Spark

As a quick reminder, PYTHONPATH is an environment variable listing the extra directories where Python looks for the modules and packages used in a program. PySpark defines the PYTHONPATH when it creates worker daemons:

private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String, String])
  extends Logging { self =>
  // ...
  private val pythonPath = PythonUtils.mergePythonPaths(
    PythonUtils.sparkPythonPath,
    envVars.getOrElse("PYTHONPATH", ""),
    sys.env.getOrElse("PYTHONPATH", ""))

// ...

private[spark] object PythonUtils {
  val PY4J_ZIP_NAME = ""

  /** Get the PYTHONPATH for PySpark, either from SPARK_HOME, if it is set, or from our JAR */
  def sparkPythonPath: String = {
    val pythonPath = new ArrayBuffer[String]
    for (sparkHome <- sys.env.get("SPARK_HOME")) {
      pythonPath += Seq(sparkHome, "python", "lib", "").mkString(File.separator)
      pythonPath +=
        Seq(sparkHome, "python", "lib", PY4J_ZIP_NAME).mkString(File.separator)
    }
    pythonPath ++= SparkContext.jarOfObject(this)
    pythonPath.mkString(File.pathSeparator)
  }
}
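A rough Python transcription of that Scala logic may make it easier to read; the archive file names below are placeholders, since the real names depend on the Spark and Py4J versions:

```python
import os

def spark_python_path(env):
    """Sketch of PythonUtils.sparkPythonPath: build PYTHONPATH entries
    pointing at the zip archives under $SPARK_HOME/python/lib."""
    entries = []
    spark_home = env.get("SPARK_HOME")
    if spark_home:
        # Placeholder archive names; the real ones are version-dependent.
        entries.append(os.path.join(spark_home, "python", "lib", "pyspark-archive.zip"))
        entries.append(os.path.join(spark_home, "python", "lib", "py4j-archive.zip"))
    return os.pathsep.join(entries)

print(spark_python_path({"SPARK_HOME": "/opt/spark"}))
```

The key point is that the entries added to the PYTHONPATH are zip archives, not directories, and PEP 273 is what makes them importable.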

As a result, when you break the daemon creation as I did in my example, you should see an error message like this:

Bad data in pyspark.daemon's standard output. Invalid port number:
  1181314660 (0x46696e64)
Python command to execute the daemon was:
  python3 -m pyspark.daemon
Check that you don't have any unexpected modules or libraries in
Also, check if you have a module in your python path,
or in your python installation, that is printing to standard output
    at org.apache.spark.api.python.PythonWorkerFactory.startDaemon(PythonWorkerFactory.scala:244)
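There is a fun detail hidden in that message: the "invalid port number" is not a port at all, but the first four bytes of whatever the stray print wrote to standard output, read as a big-endian integer. You can verify it yourself:

```python
import struct

# 1181314660 == 0x46696e64, taken from the error message above.
bogus_port = 1181314660

# Decode the integer back into the four bytes Spark actually read from stdout.
print(struct.pack(">i", bogus_port))  # b'Find'
```

So the daemon handshake read the first word of a debug message where it expected the daemon's port number.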

You can try to break the daemon creation yourself by adding a print statement to the pyspark.daemon module:

# ...
from pyspark.worker import main as worker_main
from pyspark.serializers import read_int, write_int, write_with_length, UTF8Deserializer


def compute_real_exit_code(exit_code):
    # SystemExit's code can be integer or string, but os._exit only accepts integers
    if isinstance(exit_code, numbers.Integral):
        return exit_code
    else:
        return 1
# ...
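As an aside, that helper exists because SystemExit's code really can be a string; a standalone sketch, independent of PySpark:

```python
import numbers

# Standalone copy of the helper above, for illustration only.
def compute_real_exit_code(exit_code):
    if isinstance(exit_code, numbers.Integral):
        return exit_code
    return 1

try:
    raise SystemExit("fatal error")  # what sys.exit("message") raises
except SystemExit as e:
    # os._exit() would reject the string, so it is mapped to 1.
    print(compute_real_exit_code(e.code))  # 1
```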

Wow! That's how much fun I had tracing the origin of this archive in the project! The story dates back to 2014, when the community was still working on Apache Spark 1. At that time the JDK version was upgraded from 6 to 7, and Python users started complaining about runtime errors even though the PySpark JAR was installed on the cluster:

PySpark is not working. It fails because zipimport not able to import assembly jar because that contain more than 65536 files.

The reason for that error was the packaging format used for archives with more than 65,536 entries. JDK 6 produced the classic ZIP format, whereas JDK 7 switched to Zip64, which the Python interpreter couldn't load correctly. A workaround was described in SPARK-6869: compress the PySpark project yourself and pass it via the --py-files parameter of spark-submit.

But that was only a workaround. For the real fix, Lianhui Wang added a build step that packages PySpark as a zip archive, together with the code you saw before to include that archive in the PYTHONPATH.

That's the little story behind the archive shipped with the PySpark module, and the explanation of why, in my debug sessions, I had to add print statements directly to the .py files of the package.