how to pass python package to spark job and invoke main file from package with arguments
Asked Answered
Q

6

14

I have my python code with a structure like,

Project1
--src
----util.py
----job1.py
----job2.py
--config
----config1.json
----config2.json

I want to run this job1 in spark but these I just cannot invoke job1.py because its dependent on other files like util.py and job2.py and config files and thus I need to pass complete package as an input to spark.

I tried running spark-submit job1.py but it fails with dependencies like job2.py and util.py because they are not available to executors.

Based on spark documentation, I see --files is an option to do this but it works by passing all filenames to spark-submit which looks difficult if number of files in codebase in future.

Another option I see is passing code zip file with --archive option but still it fails because not able to reference files in zip.

So Can anyone suggest any other way to run such codebase in spark?

Quarrier answered 20/12, 2017 at 11:57 Comment(1)
Did you able submit this app to spark cluster ?Dianetics
T
10

Specific to your question, you need to use --py-files to include python files that should be made available on the PYTHONPATH.

I just ran into a similar problem where I want to run a modules main function from a module inside an egg file.

The wrapper code below can be used to run main for any module via spark-submit. For this to work you need to drop it into a python file using the package and module name as the filename. The filename is then used inside the wrapper to identify which module to run. This makes for a more natural means of executing packaged modules without needing to add extra arguments (which can get messy).

Here's the script:

"""
Wrapper script to use when running Python packages via egg file through spark-submit.

Rename this script to the fully qualified package and module name you want to run.
The module should provide a ``main`` function.

Pass any additional arguments to the script.

Usage:

  spark-submit --py-files <LIST-OF-EGGS> <PACKAGE>.<MODULE>.py <MODULE_ARGS>
"""
import os
import importlib


def main():
    filename = os.path.basename(__file__)
    module = os.path.splitext(filename)[0]
    module = importlib.import_module(module)
    module.main()


if __name__ == '__main__':
    main()

You won't need to modify any of this code. It's all dynamic and driven from the filename.

As an example, if you drop this into mypackage.mymodule.py and use spark-submit to run it, then the wrapper will import mypackage.mymodule and run main() on that module. All command line arguments are left intact, and will be naturally picked up by the module being executed.

You will need to include any egg files and other supporting files in the command. Here's an example:

spark-submit --py-files mypackage.egg mypackage.mymodule.py --module-arg1 value1
Torbert answered 28/8, 2018 at 22:12 Comment(0)
V
7

There a few basic steps:

  • Create a Python package.
  • Either build egg file or create a simple zip archive.
  • Add package as a dependency using --py-files / pyFiles.
  • Create a thin main.py which invokes functions from the package and submit it to Spark cluster.
Varicella answered 20/12, 2017 at 13:25 Comment(1)
how to refer to a function from a zip? I understand that it has to be imported in the progra to be used later.Ellingson
A
4

A convenient solution in general IMO is using setuptools. It will come into handy when your project has too many dependent packages (Python file).

  • First, put an empty __init__ file on each directory which has the code file to be imported. This will help the package to be visible to the import statement.
  • After installing setuptools, create a simple set_up.py file (sample here). Remember to place it at the outermost directory which is your src folder.

    from setuptools import setup, find_packages
    setup(
        name = "name_of_lib",
        version = "0.0.1",
        author = "your name",
        packages = find_packages()
    )
    

The find_packages config will collect all your packages within that folder recursively.

  • Run python set_up.py bdist_egg, and finally check out the .egg file in the dist folder.
  • Submit your job using either --py-files oraddPyFile of SparkContext.

And it works like a charm!

Akerboom answered 9/2, 2020 at 15:32 Comment(1)
__init__.py to the importable directories was very important for this to work. I just added that empty file in every directory and subdirectory and it worked like a charmGittens
T
1

Add this to your PYTHONPATH environment variable: /path-to-your-spark-directory/python. Also your path variable should have location of spark/bin

Telegu answered 20/12, 2017 at 12:42 Comment(0)
S
1

If you want to have a bit more flexibility, i.e run files, modules or even a script specified on the command line, you can use something like the following launcher script:

launcher.py

import runpy
import sys
from argparse import ArgumentParser


def split_passthrough_args():
    args = sys.argv[1:]
    try:
        sep = args.index('--')
        return args[:sep], args[sep + 1:]
    except ValueError:
        return args, []


def main():
    parser = ArgumentParser(description='Launch a python module, file or script')
    source = parser.add_mutually_exclusive_group(required=True)
    source.add_argument('-m', type=str, help='Module to run', dest='module')
    source.add_argument('-f', type=str, help="File to run", dest='file')
    source.add_argument('-c', type=str, help='Script to run', dest='script')
    parser.add_argument('--', nargs='*', help='Arguments', dest="arg")
    self_args, child_args = split_passthrough_args()
    args = parser.parse_args(self_args)
    sys.argv = [sys.argv[0], *child_args]

    if args.file:
        runpy.run_path(args.file, {}, "__main__")
    elif args.module:
        runpy.run_module(f'{args.module}.__main__', {}, "__main__")
    else:
        runpy._run_code(args.script, {}, {}, "__main__")


if __name__ == "__main__":
    main()

It tries to emulate the Python interpreter's behavior, so when you have a package with the following module hierarchy

mypackage
  mymodule
    __init__.py
    __main__.py

where __main__.py contains the following:

import sys
if __name__ == "__main__":
   print(f"Hello {sys.argv[1]}!")

which you built and packaged as mypackage.whl; you can run it with

spark-submit --py-files mypackage.whl launcher.py -m mypackage.mymodule -- World

Supposing the package is preinstalled and available on /my/path/mypackage on the driver:

spark-submit launcher.py -f /my/path/mypackage/mymodule/__main__.py -- World

You could even submit a script:

spark-submit launcher.py -c "import sys; print(f'Hello {sys.argv[1]}')" -- World
Slalom answered 18/8, 2021 at 21:16 Comment(2)
Thanks for the answer! Question: would it be possible to have this launcher script packaged within the wheel/egg file itself and then to access it from within the wheel/package file. The reason I would prefer to have the launcher script within the package is because I'm not sure how a standalone script like this could be stored in a PyPI repository like Artifactory, but maybe there's a different way?Salacious
I don't thinks it can be packaged as Spark requires a top level py file as its positional argument AFAIK. However this script is general enough so it can be shipped with your Spark distribution if it is feasible for your use case. E.g if you build Spark as Docker image, you could add this file there.Slalom
R
0

I also had same issue while working on PySpark.

Here is how you can solve this issue: Assuming src is the python package, ZIP the complete src package into in a zip file let's say code.zip before that restructure job1.py.


Project1
--src
----util.py
----job1.py
----job2.py
---- __init__.py
--config
----config1.json
----config2.json

Since you want to run the job1.py and has dependencies from job2.py and utils.py let's structure job1.py.


from . import job1 as j1
from . import util as ut


def compute(spark: SparkSession, job_args: T.List[str]) -> None:
    # core data processing logic
    .....

    

Now let's write a driver.py which calls this compute function.

import argparse
import importlib
from pyspark.sql import SparkSession

if __name__ == "__main__":
    parser = argparse.ArgumentParser(description='Spark application to execute '
                                                 'other Spark apps.')

    parser.add_argument(
        '--module', action='store', dest='module',
        required=True,
        help='Name of the Spark module with a compute entry-point'
    )

    (args, job_args) = parser.parse_known_args()

    app_name = args.module.split('.')[-1]
    spark = SparkSession.builder \
        .appName(app_name) \
        .enableHiveSupport() \
        .getOrCreate()

    spark.conf.set('spark.sql.session.timeZone', 'UTC')

    job_module_name = args.module
    job_module = importlib.import_module(job_module_name)

    job_module.compute(spark, job_args)

Use following command to run the app:

   spark-submit --master yarn --py-files ./code.zip driver.py --module src.job1

So now you are all set to run your spark app.

Riobard answered 11/7, 2022 at 14:53 Comment(0)

© 2022 - 2024 — McMap. All rights reserved.