Spark concurrent writes on same HDFS location
I have Spark code which saves a DataFrame to an HDFS location (a date-partitioned path) in JSON format using append mode.

df.write.mode("append").format("json").save(hdfsPath)
Sample HDFS location: /tmp/table1/datepart=20190903

I am consuming data from upstream in a NiFi cluster. Each node in the NiFi cluster creates a flow file for the data it consumes, and my Spark code processes that flow file. Because NiFi is distributed, my Spark code is executed from different NiFi nodes in parallel, each trying to save data into the same HDFS location.

I cannot store the output of the Spark job in different directories because my data is partitioned on date.

This process has been running once daily for the last 14 days, and my Spark job has failed 4 times with different errors. First error:

java.io.IOException: Failed to rename FileStatus{path=hdfs://tmp/table1/datepart=20190824/_temporary/0/task_20190824020604_0000_m_000000/part-00000-101aa2e2-85da-4067-9769-b4f6f6b8f276-c000.json; isDirectory=false; length=0; replication=3; blocksize=268435456; modification_time=1566630365451; access_time=1566630365034; owner=hive; group=hive; permission=rwxrwx--x; isSymlink=false} to hdfs://tmp/table1/datepart=20190824/part-00000-101aa2e2-85da-4067-9769-b4f6f6b8f276-c000.json

Second Error:

java.io.FileNotFoundException: File hdfs://tmp/table1/datepart=20190825/_temporary/0 does not exist.

Third Error:

java.io.FileNotFoundException: File hdfs://tmp/table1/datepart=20190901/_temporary/0/task_20190901020450_0000_m_000000 does not exist.

Fourth Error:

java.io.FileNotFoundException: File hdfs://tmp/table1/datepart=20190903/_temporary/0 does not exist.

Here are my questions/issues:

  1. I am not able to recreate this scenario. How can I reproduce it?
  2. On all 4 occasions the errors relate to the _temporary directory. Is it because 2 or more jobs were trying to save data to the same HDFS location in parallel, and while doing so Job A deleted the _temporary directory of Job B? (Because the location is the same, all jobs share the common path _temporary/0/.)
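The suspected race in point 2 can be sketched with plain local filesystem operations (a simulation only, not Spark's actual committer code; the base directory and task names are made up for illustration):

```python
# Sketch of the suspected race: two "jobs" share the same _temporary/0
# working directory; when the faster job commits and deletes it, the
# slower job's task files vanish out from under it.
import os
import shutil
import tempfile

base = tempfile.mkdtemp()                    # stands in for /tmp/table1/datepart=20190903
tmp = os.path.join(base, "_temporary", "0")  # shared working dir, as in the errors above

# Job A and Job B both write task output under the shared _temporary/0
os.makedirs(os.path.join(tmp, "task_A"))
os.makedirs(os.path.join(tmp, "task_B"))

# Job A finishes first and cleans up the whole _temporary tree
shutil.rmtree(os.path.join(base, "_temporary"))

# Job B now tries to commit its task and fails, mirroring the
# java.io.FileNotFoundException: .../_temporary/0/... does not exist
try:
    os.listdir(os.path.join(tmp, "task_B"))
except FileNotFoundError as e:
    print("Job B failed:", e)
```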

If it is a concurrency problem, I could run all NiFi processors from the primary node only, but then I would lose performance.

Need your expert advice.

Thanks in advance.

Detrimental answered 3/9, 2019 at 18:15 Comment(6)
Point 2 is the issue. Well-known fact. – Plumbum
@Plumbum Can you please share some information or links on this issue? I tried searching for Spark concurrency but didn't find any useful information. Thanks again for the quick reply. – Detrimental
Will do later ... – Plumbum
This is not S3, I assume from the above – Plumbum
Cannot find the information. Will look again tomorrow – Plumbum
I cannot find the stuff I once read, but you state it works most of the time. So, check with Hworks. – Plumbum

It seems the problem is that two Spark nodes are independently trying to write to the same place, causing conflicts when the faster one cleans up the working directory before the slower one is done with it.

The most straightforward solution may be to avoid this.

As I understand your use of NiFi and Spark, the node where NiFi runs also determines the node where Spark runs (a 1-1 relationship?).

If that is the case, you should be able to solve this by routing the work in NiFi to nodes that do not interfere with each other. Check out the load-balancing strategy (a property of the queue) that partitions by attribute. You would need to define the right attribute, but something like the directory or table name should go a long way.
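The idea behind attribute-based partitioning can be pictured with a small sketch (plain Python, not NiFi's actual implementation; the node names are made up): every flow file with the same attribute value lands on the same node, so writes to the same HDFS path never run on two nodes at once.

```python
# Sketch of "partition by attribute" routing: a stable hash of the chosen
# attribute (here, the target directory) always maps to the same node.
import zlib

nodes = ["nifi-node-1", "nifi-node-2", "nifi-node-3"]  # hypothetical cluster

def route(attribute_value: str) -> str:
    # zlib.crc32 is stable across runs, unlike Python's built-in hash()
    return nodes[zlib.crc32(attribute_value.encode()) % len(nodes)]

# All flow files for the same date partition go to one node:
assert route("/tmp/table1/datepart=20190903") == route("/tmp/table1/datepart=20190903")
```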

Evante answered 2/7, 2020 at 18:49 Comment(0)

Try enabling output committer v2:

spark.conf.set("spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version", "2")

It doesn't use a shared temporary directory for files, but creates independent .sparkStaging-<...> temporary directories for each write.
It also speeds up writes, but allows some rare, hypothetical cases of partially written data.
See this doc for more info:
https://spark.apache.org/docs/3.0.0-preview/cloud-integration.html#recommended-settings-for-writing-to-object-stores
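A hedged sketch of where this setting goes (assuming a standard PySpark session; v2 here refers to the Hadoop FileOutputCommitter algorithm version, not a Spark-specific committer):

```python
from pyspark.sql import SparkSession

# Set the commit algorithm version before any writes. The same setting can
# also be passed on the command line:
#   spark-submit --conf spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version=2 ...
spark = (
    SparkSession.builder
    .config("spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version", "2")
    .getOrCreate()
)
```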

Spiritualism answered 19/6, 2022 at 17:47 Comment(0)