pyspark udf print row being analyzed

I have a problem inside a PySpark UDF and I want to print the number of the row that generates the problem.

I tried to count the rows using the Python equivalent of a "static variable", so that each time the UDF is called with a new row, a counter is incremented. However, it is not working:

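import pyspark.sql.functions as F
from pyspark.sql.types import StringType

def myF(input):
    myF.lineNumber += 1      # "static" counter stored as a function attribute
    res = str(input)         # placeholder for the real computation
    if input is None:        # placeholder for the real "somethingBad" check
        print(myF.lineNumber)
    return res

myF.lineNumber = 0

myF_udf = F.udf(myF, StringType())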

How can I count the number of times a UDF is called, in order to find the number of the row that generates the problem in PySpark?

Salmonoid asked 18/1, 2019 at 11:4
Comments:
Why the downvote? (Salmonoid)
A UDF is applied to each row, so you cannot count the number of lines within a UDF: the UDF is duplicated and applied to each row separately. (Straggle)
If you had read the question more carefully, you would see that I am not trying to count the number of rows directly, but to have a counter that counts how many times the function is called. (Salmonoid)
My bad. But this is still not possible, because a copy of the function is executed at the worker level and only the result is brought back to the driver. Therefore, you cannot access the final value of lineNumber. (Straggle)
Even if I am executing PySpark locally (I mean only on my computer, with no distribution)? (Salmonoid)
The solution I can offer is to output a struct with two columns, "good result" and "bad result", and then count the bad or good results. (Straggle)
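
Straggle's suggestion can be sketched as a UDF that returns a struct with a result field and an error field, so failures travel back to the driver as data instead of as print output. This is a minimal sketch, assuming a string input column; check_value, add_checked_column, the field names result/error, and the "bad value" rule (an empty or missing string) are all hypothetical placeholders. The Spark imports sit inside add_checked_column only so that the plain-Python row logic can be tried without a Spark installation.

```python
from typing import Optional, Tuple


def check_value(value: Optional[str]) -> Tuple[Optional[str], Optional[str]]:
    """Per-row logic: return (result, error) instead of printing.

    The "bad" condition is a hypothetical placeholder for whatever
    triggers the real problem.
    """
    if value is None or value == "":
        return None, "empty or missing input: {!r}".format(value)
    try:
        return value.upper(), None  # placeholder for the real computation
    except Exception as exc:  # unexpected failures are reported as data too
        return None, "{}: {}".format(type(exc).__name__, exc)


def add_checked_column(df, column: str):
    """Apply check_value as a struct-returning UDF, then split the struct."""
    import pyspark.sql.functions as F
    from pyspark.sql.types import StringType, StructField, StructType

    schema = StructType([
        StructField("result", StringType(), True),
        StructField("error", StringType(), True),
    ])
    # A Python tuple returned by the UDF is mapped onto the struct fields.
    checked_udf = F.udf(check_value, schema)
    return (
        df.withColumn("checked", checked_udf(F.col(column)))
          .withColumn("result", F.col("checked.result"))
          .withColumn("error", F.col("checked.error"))
          .drop("checked")
    )
```

With out = add_checked_column(df, "value"), out.filter(F.col("error").isNotNull()).count() counts the bad rows on the driver, and replacing count() with show() displays them.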

UDFs are executed on the workers, so print statements inside them won't show up in the output you see, which comes from the driver. The best way to handle problems inside a UDF is to change its return type to a struct or a list and pass the error information along with the returned output. In the code below, I simply append the error info to the string res that you were returning originally.

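import pyspark.sql.functions as F
from pyspark.sql.types import StringType

def myF(input):
  myF.lineNumber += 1
  res = str(input)  # placeholder: compute the normal result first
  if input is None:  # placeholder for the real "somethingBad" check
    res += ' Error in line {}'.format(myF.lineNumber)
  return res

myF.lineNumber = 0

myF_udf = F.udf(myF, StringType())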
Responsible answered 18/4, 2019 at 19:18
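
One caveat about the counter in both snippets: each Python worker runs its own deserialized copy of myF, so myF.lineNumber counts calls made to that copy, not positions in the whole DataFrame. A sketch that avoids the counter, assuming it is acceptable to add an extra column, is to tag each row with F.monotonically_increasing_id() before calling the UDF and pass that id in, so the error message names the row. These ids are unique and increasing but not consecutive; if consecutive numbers are needed, F.row_number() over a Window with an explicit ordering is an alternative. describe_row, tag_errors, and the row_id column name are hypothetical.

```python
from typing import Optional


def describe_row(row_id: int, value: Optional[str]) -> str:
    """Per-row logic that reports a real row identifier on failure."""
    if value is None:  # hypothetical placeholder for "somethingBad"
        return "Error in row {}".format(row_id)
    return value  # placeholder for the real result


def tag_errors(df, column: str):
    """Attach a row id, then pass it into the UDF together with the value."""
    import pyspark.sql.functions as F
    from pyspark.sql.types import StringType

    describe_udf = F.udf(describe_row, StringType())
    # Unique, increasing (but not consecutive) 64-bit ids per row.
    with_id = df.withColumn("row_id", F.monotonically_increasing_id())
    return with_id.withColumn(
        "res", describe_udf(F.col("row_id"), F.col(column))
    )
```

The failing rows can then be listed on the driver with tag_errors(df, "value").filter(F.col("res").startswith("Error")).select("row_id").show().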