Update some rows of a dataframe or create new dataframe in PySpark

I am new to PySpark and my objective is to use PySpark script in AWS Glue for:

  1. reading a dataframe from an input file in Glue => done
  2. changing columns of the rows that satisfy a condition => facing an issue
  3. writing the updated dataframe, with the same schema, to S3 => done

The task seems very simple, but I could not find a way to complete it and keep running into different issues as I change the code.

So far, my code looks like this:

Transform2.printSchema() # input schema after reading 
Transform2 = Transform2.toDF()
def updateRow(row):
    # my logic to update row based on a global condition 
    #if row["primaryKey"]=="knownKey": row["otherAttribute"]= None
    return row

LocalTransform3 = [] # creating new dataframe from Transform2 
for row in Transform2.rdd.collect():
    row = row.asDict()
    row = updateRow(row)
    LocalTransform3.append(row)
print(len(LocalTransform3))

columns = Transform2.columns
Transform3 = spark.createDataFrame(LocalTransform3).toDF(*columns)
print('Transform3 count', Transform3.count())
Transform3.printSchema()
Transform3.show(1,truncate=False)

from awsglue.dynamicframe import DynamicFrame  # needed for fromDF, if not already imported at the top of the job
Transform4 = DynamicFrame.fromDF(Transform3, glueContext, "Transform3")
print('Transform4 count', Transform4.count()) 

I tried multiple approaches, like:

  • using map to update existing rows in a lambda
  • using collect()
  • using createDataFrame() to create new dataframe

But I faced errors at the following steps:

  • not able to create new updated rdd
  • not able to create new dataframe from rdd using existing columns

Some of the errors I got in Glue, at different stages:

  • ValueError: Some of types cannot be determined after inferring
  • ValueError: Some of types cannot be determined by the first 100 rows, please try again with sampling
  • An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob. Traceback (most recent call last):

Any working code snippet or help is appreciated.
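
For context: the two ValueError messages above typically mean that createDataFrame had to infer the column types from the data and could not, because a column (here otherAttribute, once it has been set to None) is null in every sampled row. Passing the existing schema explicitly sidesteps the inference. A minimal sketch of the map-based route, reusing the hypothetical updateRow logic and column names from the code above, and assuming Transform2 is already a DataFrame and spark is the Glue job's SparkSession:

# Sketch only: names (updateRow, primaryKey, otherAttribute) are the hypothetical ones from the question.
def updateRow(d):
    if d["primaryKey"] == "knownKey":
        d["otherAttribute"] = None
    return d

columns = Transform2.columns
# Map on the executors instead of collect()-ing everything to the driver,
# and keep the values in schema order so an explicit schema can be applied.
updated_rdd = Transform2.rdd.map(
    lambda row: tuple(updateRow(row.asDict())[c] for c in columns)
)
# Supplying the original schema avoids "Some of types cannot be determined ...".
Transform3 = spark.createDataFrame(updated_rdd, schema=Transform2.schema)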

Elis answered 27/1, 2022 at 16:21 Comment(6)
What is the transformation you want to do on your rows?Wiedmann
if row["primaryKey"]=="knownKey": row["otherAttribute"]= NoneElis
And which column should hold "knownKey" / None?Wiedmann
my logic is: for some input primary keys, set the other column to None on the rows with those primary keysElis
So everything should be None if a row has a knownKey in the Primary Key column?Wiedmann
No. only a particular attribute ("otherAttribute") will be NoneElis
from pyspark.sql.functions import col, lit, when

Transform2 = Transform2.toDF()
withKeyMapping = Transform2.withColumn('otherAttribute', when(col("primaryKey") == "knownKey", lit(None)).otherwise(col('otherAttribute')))

This should work for your use-case.
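
To plug this back into the rest of the Glue job (step 3 in the question), a minimal sketch, assuming glueContext is the job's GlueContext and withKeyMapping is the DataFrame produced above:

from awsglue.dynamicframe import DynamicFrame

# Convert the updated DataFrame back into a DynamicFrame so the existing
# S3 sink in the Glue job can write it out unchanged.
Transform3 = DynamicFrame.fromDF(withKeyMapping, glueContext, "Transform3")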

Wiedmann answered 27/1, 2022 at 17:39 Comment(3)
Hi Robert, one of my use cases is that 'otherAttribute' is a field inside an array attribute 'arrayAttribute'. Now I want to set 'otherAttribute' to None for all items of 'arrayAttribute'. Is there any way to do it the way you have suggested?Elis
This is a different question. If this helped you, please upvote & accept, and open up another question.Wiedmann
sure. your answer provided me a way to tackle itElis
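
As noted in the comments, the nested-array variant is really a separate question, but for completeness here is a hedged sketch of one way to do it. It assumes Spark 3.1+ (Glue 3.0 or later, for transform() and Column.withField()), that arrayAttribute is an array of structs containing a string field otherAttribute, and that df is the DataFrame from the answer:

from pyspark.sql.functions import col, lit, transform, when

# Assumed column names from the comment thread; adjust types/names to the real schema.
# Null out otherAttribute inside every element of arrayAttribute, but only on
# the rows whose primaryKey matches; all other rows keep the array untouched.
df = df.withColumn(
    "arrayAttribute",
    when(
        col("primaryKey") == "knownKey",
        transform(
            col("arrayAttribute"),
            lambda item: item.withField("otherAttribute", lit(None).cast("string")),
        ),
    ).otherwise(col("arrayAttribute")),
)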
