How to manage options in PySpark more efficiently
Asked Answered
B

4

6

Let us consider following pySpark code

my_df = (spark.read.format("csv")
                     .option("header","true")
                     .option("inferSchema", "true")
                     .load(my_data_path))

This is a relatively small code, but sometimes we have codes with many options, where passing string options causes typos frequently. Also we don't get any suggestions from our code editors. As a workaround I am thinking to create a named tuple (or a custom class) to have all the options I need. For example,

from collections import namedtuple
allOptions = namedtuple("allOptions", "csvFormat header inferSchema")
sparkOptions = allOptions("csv", "header", "inferSchema")
my_df = (spark.read.format(sparkOptions.csvFormat)
                     .option(sparkOptions.header,"true")
                     .option(sparkOptions.inferSchema, "true")
                     .load(my_data_path))

I am wondering if there is downsides of this approach or if there is any better and standard approach used by the other pySpark developers.

Burnie answered 28/3, 2022 at 4:27 Comment(0)
H
0

If you use .csv function to read the file, options are named arguments, thus it throws the TypeError. Also, on VS Code with Python plugin, the options would autocomplete.

df = spark.read.csv(my_data_path,
                    header=True,
                    inferSchema=True)

If I run with a typo, it throws the error.

---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
/var/folders/tv/32xjg80x6pb_9t4909z8_hh00000gn/T/ipykernel_3636/4060466279.py in <module>
----> 1 df = spark.read.csv('test.csv', inferSchemaa=True, header=True)

TypeError: csv() got an unexpected keyword argument 'inferSchemaa'

On VS Code, options are suggested in autocomplete.

enter image description here

Hearthside answered 2/4, 2022 at 17:16 Comment(2)
Thanks @Emma. You are correct when using using a specific reader like csv as you mentioned in your example. However, my question is for more generic spark.read.format, where we pass the reader and then other options. I have to use this (as I used in my example) API to read and write as my program will decide the format to read/write at runtime.Burnie
I see. sorry, it wasn't clear that you have to use the load function. option function is not fail proof, it takes any string key, so namedtuple would make it better but it is not completely giving you the correct error. I think you can either write your own wrapper with named arg or you could try using specific readers by having if-else/match on format you receive from the caller.Hearthside
B
0

For that and many other reasons, in production level projects, we used to write a project to wrap spark.

So developers not allowed to deal with spark directly.

In such project we can :

  • Abstract options using enumerations and inheritance to avoid typos and incompatibles options.
  • Set default options for each data format and developers can overwrite them if needed, to reduce the amount of code written by the developer
  • Set and defines any repetitive code like frequently used data sources, default output data format, etc.
Bosky answered 5/4, 2022 at 11:13 Comment(0)
S
0

I think the best approach is to make a wrapper(s) with some default values and kwargs like this

def csv(path, inferSchema=True, header=True, options={}):
    return hdfs(path, 'csv', {'inferSchema': inferSchema, 'header': header, **options})

def parquet(path, options={}):
    return hdfs(path, 'parquet', {**options})

def hdfs(path, format, options={}):
    return (spark
        .read
        .format(format)
        .options(**options)
        .load(f'hdfs://.../{path}')
    )
Sieve answered 7/4, 2022 at 4:40 Comment(0)
N
0

A simple, albeit a bit less powerful alternative, is to:

  • Define repeated options in a dictionary
  • Pass the unpacked options to spark.read.options() or spark.write.options() whenever you need to reuse them.

For example:

common_options = {
    'user': 'my_db_user',
    'password': 'my_db_password'
    # whatever other option you require.
}

metrics_df = spark.read.format("csv") \
        .options(**common_options) \
        .load()
Nationalize answered 27/6 at 19:22 Comment(0)

© 2022 - 2024 — McMap. All rights reserved.