How to load a local CSV file in Spark via the --files option
I have a small CSV file with a countries dictionary which I want to use in a large Spark job. I load this file via the --files option of spark-submit:

spark-submit --master yarn <some config stuff> --files /var/lib/data/airflow/dags/other/path/log4j.properties,/var/lib/data/airflow/dags/other/path/countries.csv,/var/lib/data/airflow/dags/other/path/gdpr.csv

Next, I see the following in the logs:

[2024-06-14 19:27:45.980+0300] INFO org.apache.spark.deploy.yarn.Client: Uploading resource file:/var/lib/data/airflow/dags/other/path/countries.csv -> hdfs://hdfs/user/user1/.sparkStaging/application_1718353091108_5460/countries.csv
[2024-06-14 19:27:46,054] {spark_submit.py:502} INFO - Identified spark driver id: application_1718353091108_5460
[2024-06-14 19:27:46,055] {spark_submit.py:526} INFO - [2024-06-14 19:27:46.054+0300] INFO org.apache.spark.deploy.yarn.Client: Uploading resource file:/var/lib/data/airflow/dags/other/path/gdpr.csv -> hdfs://hdfs/user/user1/.sparkStaging/application_1718353091108_5460/gdpr.csv
[2024-06-14 19:27:46,249] {spark_submit.py:502} INFO - Identified spark driver id: application_1718353091108_5460

But when I try to read this file in the Spark job (file = 'countries.csv' or 'gdpr.csv'):

import java.io.File
import org.apache.spark.SparkFiles

val localPath = SparkFiles.get(file)   // resolves to a driver-local path
val localFile = new File(localPath)
spark.read.format(format).load(s"file://$localPath").createTempView(name)

I get the error:

[2024-06-14 19:37:30,411] {spark_submit.py:526} INFO - diagnostics: User class threw exception: org.apache.spark.sql.AnalysisException: Path does not exist: file:/var/lib/hadoop/data4/nodemanager/cache/nm-local-dir/usercache/user1/appcache/application_1718353091108_5460/spark-2f967a42-74b3-421f-9139-43b3e999d5db/userFiles-6319505f-af66-402f-85b5-86c92518817e/countries.csv;

How can I find and load these files?

Consistency answered 17/6 at 7:48 Comment(3)
why do you do val localFile = new File(localPath)? – Fundamental
I would say pass localPath to load() as is, without string interpolation, without any wrapping/calculations (see the sketch after these comments) – Fundamental
Does this answer your question? Read files sent with spark-submit by the driver – Hecatomb
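
A minimal sketch of what the comments propose, assuming the same spark, file, format, and name values from the question:

import org.apache.spark.SparkFiles

// Pass the path exactly as SparkFiles.get returns it:
// no file:// scheme prefix and no string interpolation around it.
val localPath = SparkFiles.get(file)
spark.read.format(format).load(localPath).createTempView(name)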

I did it by uploading the file to HDFS, so the path is visible to every executor. Here is the working example:

import java.io.File
import org.apache.hadoop.fs.{FileSystem, Path}

// tmp: org.apache.hadoop.fs.Path - temporary HDFS folder
val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)

val from = new Path(new File(file).getAbsolutePath)  // local source
val to = new Path(tmp, file)                         // destination under the temp folder
fs.copyFromLocalFile(from, to)                       // upload the local file to HDFS
spark.read.format(format).load(to.toString)

It is also possible to load the local file on the driver and build the dataset via spark.createDataFrame, as sketched below.
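
A minimal sketch of that alternative, assuming a small headerless two-column CSV; the column names country_code and country_name are illustrative, not from the original post:

import scala.io.Source

// Read the small CSV on the driver with plain Scala IO...
val source = Source.fromFile(file)
val rows =
  try source.getLines().map { line =>
    val cols = line.split(",", -1)
    (cols(0), cols(1))                 // illustrative: a two-column file
  }.toList
  finally source.close()

// ...then hand the rows to Spark on the driver.
val df = spark.createDataFrame(rows).toDF("country_code", "country_name")
df.createTempView(name)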

Consistency answered 19/6 at 7:20 Comment(0)
