Pass additional arguments to foreachBatch in pyspark

I am using foreachBatch in pyspark structured streaming to write each microbatch to SQL Server using JDBC. I need to use the same process for several tables, and I'd like to reuse the same writer function by adding an additional argument for table name, but I'm not sure how to pass the table name argument.

The example here is pretty helpful, but in the Python example the table name is hardcoded, and the Scala example appears to reference a global variable. I would like to pass the name of the table into the function.

The function given in the python example at the link above is:

def writeToSQLWarehose(df, epochId):
  df.write \
    .format("com.databricks.spark.sqldw") \
    .mode('overwrite') \
    .option("url", "jdbc:sqlserver://<the-rest-of-the-connection-string>") \
    .option("forward_spark_azure_storage_credentials", "true") \
    .option("dbtable", "my_table_in_dw_copy") \
    .option("tempdir", "wasbs://<your-container-name>@<your-storage-account-name>.blob.core.windows.net/<your-directory-name>") \
    .save()

I'd like to use something like this:

def writeToSQLWarehose(df, epochId, tableName):
  df.write \
    .format("com.databricks.spark.sqldw") \
    .mode('overwrite') \
    .option("url", "jdbc:sqlserver://<the-rest-of-the-connection-string>") \
    .option("forward_spark_azure_storage_credentials", "true") \
    .option("dbtable", tableName) \
    .option("tempdir", "wasbs://<your-container-name>@<your-storage-account-name>.blob.core.windows.net/<your-directory-name>") \
    .save()

But I'm not sure how to pass the additional argument through foreachBatch.

Chirm answered 3/5, 2019 at 16:12 Comment(1)
Have you found a solution for it? – Unijugate
13

Something like this should work.

streamingDF.writeStream.foreachBatch(lambda df, epochId: writeToSQLWarehose(df, epochId, tableName)).start()
Ptyalism answered 14/8, 2019 at 22:5 Comment(1)
Very nice! Couldn't find such a solution anywhere else in the documentation! – Compensate
4

Samellas' solution (the lambda approach above) does not work reliably if you need to run multiple streams. The foreachBatch function gets serialised and sent to the Spark worker. The parameter still seems to be a shared variable within the worker and may change during execution.
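
Whatever happens inside Spark, one plain-Python effect that can produce this symptom is late binding: a lambda looks up the variables it refers to when it is called, not when it is defined. The sketch below uses no Spark at all; write_batch and the table names are hypothetical stand-ins, and the callbacks are invoked by hand as func(df, epoch_id) to imitate what foreachBatch does.

```python
from functools import partial


def write_batch(df, epoch_id, table_name):
    # Hypothetical stand-in for the real writer: report the target table.
    return f"epoch {epoch_id} -> {table_name}"


tables = ["orders", "customers"]

# Late binding: each lambda looks up `table` when it is called,
# so after the loop every lambda sees the last value.
late_bound = []
for table in tables:
    late_bound.append(lambda df, epoch_id: write_batch(df, epoch_id, table))

# Early binding: partial stores the current value of `table` in the callback.
early_bound = [partial(write_batch, table_name=table) for table in tables]

# Call each callback the way a streaming engine would: func(df, epoch_id).
late_results = [callback(None, 0) for callback in late_bound]
early_results = [callback(None, 0) for callback in early_bound]
```

Here both entries of late_results name "customers", while early_results names "orders" and then "customers". If the table name is reassigned between stream definitions, binding it at creation time (partial, or a default argument such as table=table) avoids the mix-up.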

My solution is to add the parameter as a literal column in the batch DataFrame (here, passing a silver data lake table path to the merge operation):

.withColumn("dl_tablePath", func.lit(silverPath))
.writeStream.format("delta")
.foreachBatch(insertIfNotExisting)

In the batch function insertIfNotExisting, I pick up the parameter and drop the parameter column:

def insertIfNotExisting(batchDf, batchId):
  tablePath = batchDf.select("dl_tablePath").limit(1).collect()[0][0]
  realDf = batchDf.drop("dl_tablePath")
Osithe answered 24/8, 2021 at 9:8 Comment(0)
0

This is the cleanest and most pythonic way I could come up with.

from functools import partial

batch_processor = partial(writeToSQLWarehose, tableName="my_table")

streamingDF.writeStream.foreachBatch(batch_processor).start()

Another alternative is a class.

class BatchProcessor:
    def __init__(self, tableName: str) -> None:
        self.tableName = tableName

    def writeToSQLWarehose(self, df, epochId):
        df.write \
            .format("com.databricks.spark.sqldw") \
            .mode('overwrite') \
            .option("url", "jdbc:sqlserver://<the-rest-of-the-connection-string>") \
            .option("forward_spark_azure_storage_credentials", "true") \
            .option("dbtable", self.tableName) \
            .option("tempdir", "wasbs://<your-container-name>@<your-storage-account-name>.blob.core.windows.net/<your-directory-name>") \
            .save()

batch_processor = BatchProcessor(tableName="my_table")

streamingDF.writeStream.foreachBatch(batch_processor.writeToSQLWarehose).start()
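
For the original use case of several tables, a small factory function is a third option alongside partial and the class. This is only a sketch: make_batch_writer, the written list, and the table names are hypothetical, and the write itself is replaced by a recorder so the example runs without Spark or a database.

```python
from typing import Any, Callable, List, Tuple

# Hypothetical stand-in for the JDBC write: record calls instead of writing.
written: List[Tuple[str, int]] = []


def make_batch_writer(table_name: str) -> Callable[[Any, int], None]:
    """Return a two-argument callback with table_name fixed at creation time."""

    def write_batch(df: Any, epoch_id: int) -> None:
        # A real job would call df.write ... .option("dbtable", table_name) here.
        written.append((table_name, epoch_id))

    return write_batch


# One callback per table; each keeps its own table_name.
writers = {name: make_batch_writer(name) for name in ["orders", "customers"]}

# Imitate the streaming engine calling each callback as func(df, epoch_id).
for epoch_id in range(2):
    for writer in writers.values():
        writer(None, epoch_id)
```

With a real stream, each callback would be passed to its own query, for example streamingDF.writeStream.foreachBatch(make_batch_writer("my_table")).start().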
         
Nahama answered 19/7, 2024 at 3:35 Comment(0)
