Since the actual DataFrame has on the order of 30 to 100 columns, let's add a few more columns to the toy DataFrame so that the solution generalizes well.
# Loading the requisite packages
from pyspark.sql.functions import col, when

# sc is the SparkContext available by default in the PySpark shell
df = sc.parallelize([(1, "foo", "val", "baz", "gun", "can", "baz", "buz", "oof"),
                     (2, "bar", "baz", "baz", "baz", "got", "pet", "stu", "got"),
                     (3, "baz", "buz", "pun", "iam", "you", "omg", "sic", "baz")]
                   ).toDF(["x", "y", "z", "a", "b", "c", "d", "e", "f"])
df.show()
+---+---+---+---+---+---+---+---+---+
| x| y| z| a| b| c| d| e| f|
+---+---+---+---+---+---+---+---+---+
| 1|foo|val|baz|gun|can|baz|buz|oof|
| 2|bar|baz|baz|baz|got|pet|stu|got|
| 3|baz|buz|pun|iam|you|omg|sic|baz|
+---+---+---+---+---+---+---+---+---+
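(Side note: sc.parallelize(...).toDF(...) assumes the SparkContext sc from the PySpark shell. If you are on Spark 2.x or later with a SparkSession named spark, the same DataFrame can be built directly; a minimal equivalent sketch:)

# Equivalent construction via the SparkSession entry point (assumes a
# session named `spark`, as in the PySpark shell)
df = spark.createDataFrame(
    [(1, "foo", "val", "baz", "gun", "can", "baz", "buz", "oof"),
     (2, "bar", "baz", "baz", "baz", "got", "pet", "stu", "got"),
     (3, "baz", "buz", "pun", "iam", "you", "omg", "sic", "baz")],
    ["x", "y", "z", "a", "b", "c", "d", "e", "f"])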
Let's say we want to replace baz with null in all the columns except columns x and a. Use a list comprehension to choose the columns where the replacement has to be done.
# This contains the list of columns where we apply replace() function
all_column_names = df.columns
print(all_column_names)
['x', 'y', 'z', 'a', 'b', 'c', 'd', 'e', 'f']
columns_to_remove = ['x','a']
columns_for_replacement = [i for i in all_column_names if i not in columns_to_remove]
print(columns_for_replacement)
['y', 'z', 'b', 'c', 'd', 'e', 'f']
Finally, do the replacement using when(), which is effectively an if clause (the column-expression equivalent of SQL's CASE WHEN).
# Doing the replacement on all the requisite columns
for i in columns_for_replacement:
    df = df.withColumn(i, when(col(i) == 'baz', None).otherwise(col(i)))
df.show()
+---+----+----+---+----+---+----+---+----+
| x| y| z| a| b| c| d| e| f|
+---+----+----+---+----+---+----+---+----+
| 1| foo| val|baz| gun|can|null|buz| oof|
| 2| bar|null|baz|null|got| pet|stu| got|
| 3|null| buz|pun| iam|you| omg|sic|null|
+---+----+----+---+----+---+----+---+----+
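One caveat: each withColumn() call adds a projection to the query plan, which can slow things down when you have many columns. If that becomes a problem, the same replacement can be done in a single pass with select(); a sketch using the names defined above:

# Single-pass alternative: build every replacement expression up front
# and apply them all in one select() instead of looping withColumn()
df = df.select(*[
    when(col(c) == 'baz', None).otherwise(col(c)).alias(c)
    if c in columns_for_replacement else col(c)
    for c in all_column_names
])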
There is no need to create a UDF and define a function to do the replacement when it can be done with an ordinary if-else column expression like this. UDFs are in general a costly operation and should be avoided whenever possible.
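For comparison, the UDF version being avoided would look roughly like the sketch below (replace_baz is a hypothetical name; the cost comes from shipping every value through a Python worker instead of evaluating a native expression):

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

# Hypothetical UDF doing the same replacement -- each value is serialized
# to a Python worker and back, unlike the native when()/otherwise() above
replace_baz = udf(lambda v: None if v == 'baz' else v, StringType())
for i in columns_for_replacement:
    df = df.withColumn(i, replace_baz(col(i)))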