spark delta overwrite a specific partition

So I have a dataframe which has a column, file_date. For a given run, the dataframe has data for only one unique file_date. For instance, in a run, let us assume that there are, say, about 100 records with a file_date of 2020_01_21.

I am writing this data using the following

(df
 .repartition(1)
 .write
 .format("delta")
 .partitionBy("FILE_DATE")
 .mode("overwrite")
 .option("overwriteSchema", "true")
 .option("replaceWhere","FILE_DATE=" + run_for_file_date)
 .mode("overwrite")
 .save("/mnt/starsdetails/starsGrantedDetails/"))

My requirement is to create a folder/partition for every FILE_DATE, as there is a good chance that data for a specific file_date will be rerun and that file_date's data has to be overwritten. Unfortunately, if I don't include the "replaceWhere" option in the above code, it just overwrites data for the other partitions too; with "replaceWhere" the specific partition seems to be overwritten correctly, but every time the write is done I am getting the following error.

Please note I have also set the following spark config before the write:

spark.conf.set("spark.sql.sources.partitionOverwriteMode","dynamic")

But I am still getting the following error:

AnalysisException: "Data written out does not match replaceWhere 'FILE_DATE=2020-01-19'.\nInvalid data would be written to partitions FILE_DATE=2020-01-20.;"

Can you kindly help, please?

Discontent asked 22/1, 2020 at 0:41 Comment(4)
same issue here - Batch
samuel liew - Not sure why my comment was deleted... please let me know - Silsby
Did you get the solution for this? Please let me know - Silsby
spark.conf.set("spark.sql.sources.partitionOverwriteMode","dynamic") works only for Parquet tables. - Microclimate

There are a couple of things to keep in mind while using replaceWhere to overwrite a Delta partition. Your dataframe must be filtered before writing into the partitions; for example, say we have a dataframe DF:

[screenshot: example dataframe DF with a date partition column]

When we write this dataframe into a Delta table, the dataframe's partition column range must be filtered, which means we should only have partition column values within our replaceWhere condition range.

DF.write.format("delta").mode("overwrite").option("replaceWhere", "date >= '2020-12-14' AND date <= '2020-12-15'").save("Your location")

If we use the condition date < '2020-12-15' instead of date <= '2020-12-15', it will give us an error:

[screenshot: AnalysisException stating that the data written out does not match the replaceWhere condition]

The other thing is that the partition column value needs to be in quotation marks ('2020-12-15'); otherwise, chances are it will give an error.
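
Putting both points together, here is a minimal PySpark sketch (the dates and the save location are just the placeholders used above): filter the dataframe to exactly the range named in replaceWhere, with the dates quoted as string literals.

# Keep only rows whose partition values fall inside the replaceWhere range,
# then overwrite just those partitions.
filtered = DF.filter("date >= '2020-12-14' AND date <= '2020-12-15'")

(filtered.write
    .format("delta")
    .mode("overwrite")
    .option("replaceWhere", "date >= '2020-12-14' AND date <= '2020-12-15'")
    .save("Your location"))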

There is also a pull request open to add dynamic partition overwrite (spark.conf.set("spark.sql.sources.partitionOverwriteMode","dynamic")) support to Delta here: https://github.com/delta-io/delta/pull/371, though I am not sure if they are planning to introduce it.

Microclimate answered 15/12, 2020 at 12:2 Comment(3)
This should be marked as the right answer. Thanks Ali - Subjacent
Any idea why this only works in Scala? In PySpark it doesn't have the same behaviour - Forefoot
I am using PySpark and it works the same way as it does in Scala. - Microclimate

I faced the same issue and realized my dataframe had more values for the column I was trying to replace the partition with. You must filter your dataframe first, then prepare your replaceWhere and write.
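
Applied to the original FILE_DATE scenario, that looks roughly like this (a sketch, assuming run_for_file_date holds a single string value such as '2020-01-19'):

# Keep only the rows belonging to the partition being rewritten,
# and quote the value inside replaceWhere so it matches as a string literal.
single_day = df.filter(df.FILE_DATE == run_for_file_date)

(single_day.write
    .format("delta")
    .partitionBy("FILE_DATE")
    .mode("overwrite")
    .option("replaceWhere", "FILE_DATE = '" + run_for_file_date + "'")
    .save("/mnt/starsdetails/starsGrantedDetails/"))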

Frankpledge answered 10/8, 2020 at 12:2 Comment(0)

What is the specific use of replaceWhere here? What you are trying to achieve can be done simply by setting ("spark.sql.sources.partitionOverwriteMode","dynamic"). It will replace an existing partition if it's there and add a new partition if it's not already there. Correct me if I have the wrong understanding.
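
A minimal sketch of that approach (assuming a table format and version where dynamic partition overwrite is honoured; a comment on the question notes it only worked for Parquet tables at the time):

# With dynamic partition overwrite, only the partitions present in the
# incoming dataframe are replaced; all other partitions are left untouched.
spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")

(df.write
    .format("parquet")   # Delta support for this mode depends on the Delta Lake version
    .partitionBy("FILE_DATE")
    .mode("overwrite")
    .save("/mnt/starsdetails/starsGrantedDetails/"))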

Desultory answered 12/12, 2023 at 13:2 Comment(0)
