I have Spark code that saves a DataFrame to an HDFS location (partitioned by date) in JSON format using append mode:
df.write.mode("append").format('json').save(hdfsPath)
Sample HDFS location: /tmp/table1/datepart=20190903
I am consuming data from upstream in a NiFi cluster. Each node in the NiFi cluster creates a flow file for the data it consumes, and my Spark code processes that flow file. Because NiFi is distributed, my Spark code is executed from different NiFi nodes in parallel, and all of these jobs try to save data into the same HDFS location.
I cannot store the output of the Spark jobs in different directories because my data is partitioned by date.
This process has been running once a day for the last 14 days, and my Spark job has failed 4 times with different errors.
First Error:
java.io.IOException: Failed to rename FileStatus{path=hdfs://tmp/table1/datepart=20190824/_temporary/0/task_20190824020604_0000_m_000000/part-00000-101aa2e2-85da-4067-9769-b4f6f6b8f276-c000.json; isDirectory=false; length=0; replication=3; blocksize=268435456; modification_time=1566630365451; access_time=1566630365034; owner=hive; group=hive; permission=rwxrwx--x; isSymlink=false} to hdfs://tmp/table1/datepart=20190824/part-00000-101aa2e2-85da-4067-9769-b4f6f6b8f276-c000.json
Second Error:
java.io.FileNotFoundException: File hdfs://tmp/table1/datepart=20190825/_temporary/0 does not exist.
Third Error:
java.io.FileNotFoundException: File hdfs://tmp/table1/datepart=20190901/_temporary/0/task_20190901020450_0000_m_000000 does not exist.
Fourth Error:
java.io.FileNotFoundException: File hdfs://tmp/table1/datepart=20190903/_temporary/0 does not exist.
My questions are:
- I have not been able to reproduce this scenario. How can I do that?
- On all 4 occasions, the errors are related to the _temporary directory. Is it because 2 or more jobs are trying to save data to the same HDFS location in parallel, and while doing so Job A might have deleted the _temporary directory of Job B? (The jobs share the same location, and all of them use the same folder name, /_temporary/0/.)
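To make the suspected race concrete, here is a minimal sketch of what I think is happening. It uses plain local directories instead of HDFS and Spark, and it assumes (I have not confirmed this) that every job stages its output under the same `<output>/_temporary/0` directory and deletes the whole `_temporary` tree when it commits. The names `start_job`, `commit_job`, and `simulate_race` are hypothetical helpers for illustration only.

```python
import os
import shutil
import tempfile


def start_job(output_dir, task_name):
    """Stage a part file under the shared <output_dir>/_temporary/0 directory."""
    task_dir = os.path.join(output_dir, "_temporary", "0", task_name)
    os.makedirs(task_dir, exist_ok=True)
    part_path = os.path.join(task_dir, "part-00000.json")
    with open(part_path, "w") as handle:
        handle.write("{}\n")
    return part_path


def commit_job(output_dir, task_name, part_path):
    """Move the staged file into place, then delete the shared _temporary tree."""
    final_path = os.path.join(output_dir, task_name + "-part-00000.json")
    os.rename(part_path, final_path)
    # Assumption: cleanup removes _temporary for everyone, not just this job.
    shutil.rmtree(os.path.join(output_dir, "_temporary"))
    return final_path


def simulate_race():
    """Run two overlapping jobs against one output directory and report the result."""
    output_dir = tempfile.mkdtemp(prefix="table1-")
    try:
        part_a = start_job(output_dir, "task_a")
        part_b = start_job(output_dir, "task_b")
        # Job A commits first and wipes _temporary, including job B's staged file.
        commit_job(output_dir, "task_a", part_a)
        try:
            commit_job(output_dir, "task_b", part_b)
        except FileNotFoundError:
            return "FileNotFoundError"
        return "ok"
    finally:
        shutil.rmtree(output_dir, ignore_errors=True)


if __name__ == "__main__":
    print(simulate_race())
```

In this toy model the second job fails with a FileNotFoundError on its staged file, which resembles the errors above. Whether the real output committer behaves the same way is exactly what I am trying to confirm.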
If it is a concurrency problem, I could run all NiFi processors on the primary node only, but then I would lose performance.
Need your expert advice.
Thanks in advance.