A Case for Isolated Virtual Environments with PySpark

This blog post motivates the use of virtual environments with Python and then shows how they can be a handy tool when deploying PySpark jobs to managed clusters.

When developing and shipping Python applications, one challenge is dependency management. Unlike languages such as C++, Python offers no way to install packages in many different versions in a central location on the host machine and simply link the right ones into each application. Nor does Python have an equivalent of Java's "fat JAR" for shipping a package together with all of its dependencies in a single artifact. Starting with the Python version itself, each application needs a specific Python installation, and that installation has to provide exactly the package dependencies the code expects in order to run. This becomes a real problem at the latest when two Python applications with conflicting requirements are supposed to run on the same system.

Figure: "Python Environment" (xkcd, https://xkcd.com/1987/)

To avoid ending up in a situation like the one above, virtual environments are a great way to encapsulate a Python application. A virtual environment creates an installation directory that contains Python itself, or a symlink to it, as well as all the packages a specific application depends on. Ideally, this virtual environment is completely independent of any global Python installation, so all packages installed inside it are isolated from other Python applications on the host system. This not only allows multiple applications and their specific dependencies to be installed side by side, it also prevents accidental breakage caused by updates to globally installed libraries.
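
As a minimal illustration, this is roughly what such isolation looks like with the standard library's venv module (the package installed here is just an example; the rest of this post uses conda instead):

    # create an isolated environment in the .venv directory
    python3 -m venv .venv
    # activate it for the current shell session
    source .venv/bin/activate
    # packages now land inside .venv instead of the global site-packages
    pip install pandas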

Being able to run multiple Python applications independently of each other is not only important on single-host systems. When it comes to distributed computing on large clusters, the same execution environment has to be made available throughout the cluster in order to run a Python application on every worker node. In the case of Apache Spark, the official Python API, also known as PySpark, has grown immensely in popularity over the last few years. Spark itself is written in Scala, so each executor in the cluster runs a Java Virtual Machine; the illustration below shows the schematic architecture of a Spark cluster, in this case managed by YARN. To run Python code on the executors, a Python subprocess is launched per executor which interprets the PySpark code and translates it into Spark RDD operations. The PySpark API provides a large set of built-in functions, but as more and more complex Spark applications are written in Python, there is a growing need for user-defined functions, also known as UDFs. Moreover, the introduction of vectorized pandas UDFs in Spark 2.3 has mostly closed the performance gap to the Scala API, making it much more attractive to run custom Python code in a distributed manner. This is where the problem begins.

Figure: Schematic architecture of a PySpark application on a YARN-managed cluster (drawn after https://miro.medium.com/max/3868/1*BDfKR9VMg-E6twBBJEhC6g.png)
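
To make the dependency problem concrete, here is a minimal sketch of such a vectorized pandas UDF (the column name and the computation are made up for illustration). It only works if pandas and pyarrow are available in the Python environment of every executor:

    # a sketch; assumes pandas and pyarrow are installed on all executors
    import pandas as pd
    from pyspark.sql import SparkSession
    from pyspark.sql.functions import pandas_udf

    spark = SparkSession.builder.getOrCreate()

    @pandas_udf("double")
    def normalize(v: pd.Series) -> pd.Series:
        # executed by the Python worker process on each executor,
        # operating on whole batches of rows as pandas Series
        return (v - v.mean()) / v.std()

    df = spark.createDataFrame([(1.0,), (2.0,), (3.0,)], ["value"])
    df.select(normalize("value").alias("normalized")).show()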

Since one might want to use a Python UDF that relies on other Python packages such as pandas or numpy, these dependencies have to be available on every worker node in the cluster. As many large clusters are shared between multiple teams and managed by yet another one, it is not always possible to meet every developer's needs with one common installation. Of course the most popular packages could come pre-installed, but this would limit the functionality of the cluster, and we would quickly end up in exactly the situation that motivated us to use virtual environments on our local systems. Another option would be to grant each and every Data Scientist SSH access to the cluster and let them install their own virtual environments. What could possibly go wrong?!

There are multiple solutions to this problem, one of which is introduced in this earlier blog post by Florian Wilhelm. He outlines a pragmatic solution that extracts Python wheel files into an HDFS directory, thereby creating an isolated environment. The files are then individually added to the SparkContext using the built-in sc.addFile function, which causes them to be distributed across the cluster as new nodes are spawned. While this solution solves the problem, it comes with several drawbacks which I want to address here. First, creating such an environment can be quite a hassle, especially when dealing with many different dependencies and, in turn, their transitive dependencies. Some packages do not even offer wheel files via pip, in which case a workaround is needed. For many Data Scientists, who might not be very familiar with the command line, this can be a significant hurdle. Environments tend to be copied from other projects, which might work for a while but then causes trouble down the line. Such HDFS environments are also quite hard to update, since there is no consistency check when packages are replaced manually. The second issue with this approach is that the extracted files are often tiny, which is anything but ideal for HDFS, a file system designed for a small number of large files rather than thousands of small ones. There have been cases where Spark jobs took close to an hour to start because very large environments containing thousands of small files had to be distributed on launch.
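
For reference, a rough sketch of what that approach looks like in code (the HDFS paths are purely illustrative and not taken from the original post):

    # sketch of the wheel-extraction approach; paths are placeholders
    from pyspark import SparkContext, SparkFiles

    sc = SparkContext.getOrCreate()
    # every extracted package directory on HDFS has to be added individually
    sc.addFile("hdfs:///envs/my_project/numpy", recursive=True)
    sc.addFile("hdfs:///envs/my_project/pandas", recursive=True)
    # on the executors the distributed files can then be located via SparkFiles.get()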

I want to present a different approach that does not require the isolated environments to be created on HDFS. The spark-submit command offers an option to include an archive when launching a Spark job. The specified archive gets sent to the driver and executor nodes where it is automatically extracted. In combination with a tool called conda-pack, the same conda environment used locally for development can thus be used on the cluster to run the job. Keep reading to see how it works.

Creating a Virtual Environment with Conda

Conda is an open-source tool that combines extensive virtual environment functionality with package management for all kinds of languages, including Python. This flexibility, and the fact that conda environments come with their own Python installation, makes it my virtual environment framework of choice. The approach proposed here will probably work similarly with virtualenv or pipenv if you create the archive manually.

I recommend installing Miniconda, a lean distribution of conda that does not contain any packages by default. Anaconda, in comparison, already includes the most popular packages but is also multiple gigabytes in size. All of the packages we need can simply be installed using the conda package manager.

Once conda is installed, we can create a conda environment by defining an environment.yaml file.
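
A minimal environment.yaml could look like the following; the environment name matches the path used later in this post, while the channels, packages, and Python version are just an example:

    name: my_venv
    channels:
      - conda-forge
      - defaults
    dependencies:
      - python=3.7
      - pandas
      - numpy
      - pip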

Using this file we can simply run:
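
    # assuming the file sketched above is saved as environment.yaml
    conda env create -f environment.yaml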

and the environment is created. Afterwards, we activate it by running:
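
    # the environment name as defined in environment.yaml
    conda activate my_venv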

We can now go ahead and develop our own Python package, or just write a simple PySpark job containing the custom code that we want to run on the cluster. Once we are finished, and assuming we built a proper Python package, we can install it into the conda environment by running:
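
    # from the project root, assuming a setup.py or pyproject.toml is present
    pip install .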

Creating an Environment Archive with conda-pack

Conda saves each environment in an isolated installation directory, which by default is located in the user's home directory. Under ~/miniconda3/envs/my_venv you can find a folder structure that looks a lot like the root directory of a Linux machine. In order to send the conda environment to the Spark nodes without shipping countless small individual files, we pack the whole environment into an archive. For this we use conda-pack, a small command-line tool for creating relocatable conda environments.
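
conda-pack is not part of a default conda installation; it can be installed from the conda-forge channel (or via pip):

    conda install -c conda-forge conda-pack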

The tool packs the whole conda environment directory into a gzipped tar archive when running this simple command:
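
    # run inside the activated environment; -o sets the output file name
    conda pack -o environment.tar.gz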

This can take a couple of minutes depending on the size of your environment. When it is done, you should see the environment.tar.gz file in your current directory.

Distributing the Environment on the Cluster

Assuming we have a PySpark script ready to go, we can now launch a Spark job and include our archive using spark-submit.
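
A submit command along these lines should do the trick; the script name is a placeholder, and the exact options will depend on your cluster setup:

    # make the packed Python interpreter available to the Spark processes on YARN
    export PYSPARK_PYTHON=./environment/bin/python
    spark-submit \
      --master yarn \
      --deploy-mode cluster \
      --archives environment.tar.gz#environment \
      my_pyspark_job.py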

The interesting part here is the --archives option.

By appending #environment to the archive, we generate an alias under which the content of the archive will be available on the Spark nodes. That means we not only make sure our archive is extracted on the cluster, we can even use our own Python installation on the cluster by pointing the PYSPARK_PYTHON variable at it. Since the Python version on the workers has to be identical to the Python version on the driver, this can prevent errors, especially in client mode.

And that’s it!

Conclusion

As extensive as the PySpark API is, sometimes the built-in functionality is not enough. While the public cloud is becoming more and more popular for Spark development and developers gain the freedom to spin up their own clusters in the spirit of DevOps, many companies still operate large on-premise clusters. Especially in these setups it is important that developers can distribute their custom Python environments. Conda and conda-pack provide a great way to encapsulate PySpark applications and run them on managed Spark clusters without needing direct access to the nodes. The approach also works for interactive Spark sessions, e.g. when developing a proof of concept in Jupyter, as sketched below.
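
For an interactive session, where no spark-submit is involved, the archive can be passed via the Spark configuration instead. A minimal sketch for a YARN client-mode session, assuming environment.tar.gz sits in the current working directory, could look like this:

    # sketch for an interactive (e.g. Jupyter) session on YARN in client mode
    import os
    from pyspark.sql import SparkSession

    # executors use the Python interpreter from the extracted archive
    os.environ["PYSPARK_PYTHON"] = "./environment/bin/python"

    spark = (
        SparkSession.builder
        .master("yarn")
        # ship and extract the packed conda environment on the cluster
        .config("spark.yarn.dist.archives", "environment.tar.gz#environment")
        .getOrCreate()
    )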

8 Comments

  1. Do you know if there is an alternative way to package relocatable dependencies?
    My company does not use conda to host python packages, so I cannot package the environment using conda-pack.
    The reason is that my company only hosts pip repos, not conda channels.

    1. Conda actually also supports pip dependencies, so you could still use a conda environment and install your company’s pip dependencies in there. Another approach would be to use virtualenv and just create a zip archive manually. I haven’t tried that myself, though.

  2. I have opted to use a Docker-based approach to manage dependencies.
    Thanks a lot for writing this up, this has been the best/cleanest approach I have found online after about two weeks of banging my head against the wall.

  3. The reason that I cannot use this approach is that my local machine runs a different OS than the Spark cluster.
    For more context, I am trying to deploy a PySpark application to an EMR Spark cluster.
    I have tried two approaches. The first is the Python-package-based one discussed in this post; I failed at the step of packing the environment with conda.
    The other approach is to use Docker to capture all the dependencies.
    There, the biggest challenge for me was figuring out where to place my source dependency.
    The trick is to copy my source dependency to site-packages/.

  4. Hello, thanks for the great article. However, I am having some problems with this approach, and I’m not sure if my use case is supported with this approach. I am running Spark on a cluster with YARN, and my start.py entry point script attempts to import the libraries I have installed in my environment.tar.gz, but cannot find them. I made sure to put my imports after initializing the Spark context, but it still doesn’t work. Is this because Spark only unpacks the environment.tar.gz on the worker nodes, and not on the master, where I’m running my start.py?
    From looking at my local directory, I can see that my environment.tar.gz archive is not unzipped on the master node, so it would make sense that the driver python cannot locate my packages. I’ve also tried to update my PYSPARK_DRIVER_PYTHON to point to the conda python but that didn’t help either.
    Please let me know if I’m doing anything wrong here.

    1. Hi Amine, from my experience this approach should also work for importing packages on the driver node. I am importing my packages before creating the SparkSession.
      Unfortunately it’s hard for me to say what is going wrong in your case. But since the approach I’m describing is now also part of the official PySpark documentation, perhaps a look there will help you solve your problem. https://spark.apache.org/docs/latest/api/python/user_guide/python_packaging.html
