How to read only n rows of large CSV file on HDFS using spark-csv package?
L

7

22

I have a big distributed file on HDFS and each time I use sqlContext with spark-csv package, it first loads the entire file which takes quite some time.

df = sqlContext.read.format('com.databricks.spark.csv').options(header='true', inferschema='true').load("file_path")

Since I only want to do a quick check at times, all I need is a few (any n) rows of the file.

df_n = sqlContext.read.format('com.databricks.spark.csv').options(header='true', inferschema='true').load("file_path").take(n)
df_n = sqlContext.read.format('com.databricks.spark.csv').options(header='true', inferschema='true').load("file_path").head(n)

But both of these run only after the file load is done. Can't I restrict the number of rows while reading the file itself? I am looking for the spark-csv equivalent of the nrows parameter in pandas, like:

pd_df = pandas.read_csv("file_path", nrows=20)

Or it might be the case that Spark does not actually load the file in the first step. But in that case, why does my file load step take so long?

I want

df.count()

to give me only n and not all rows. Is that possible?

Larva answered 31/5, 2017 at 6:15 Comment(0)
D
13

My understanding is that reading just a few lines is not supported by spark-csv module directly, and as a workaround you could just read the file as a text file, take as many lines as you want and save it to some temporary location. With the lines saved, you could use spark-csv to read the lines, including inferSchema option (that you may want to use given you are in exploration mode).

val numberOfLines = ...
spark.
  read.
  text("myfile.csv").
  limit(numberOfLines).
  write.
  text(s"myfile-$numberOfLines.csv")
val justFewLines = spark.
  read.
  option("inferSchema", true). // <-- you are in exploration mode, aren't you?
  csv(s"myfile-$numberOfLines.csv")
Desiderata answered 31/5, 2017 at 6:26 Comment(5)
+1, although wouldn't .read() be executed by multiple executors, so the order seen by .limit() is not guaranteed? In other words, yes, you get numberOfLines lines, but those aren't necessarily the first lines of the original "myfile.csv", are they?Mcintosh
I tried this but text().limit(2) seems to be reading the whole file first, which in my case is several GBs.Dominickdominie
@Dominickdominie It seems your file is not split, due to a large split size on HDFS, and simply fits into one single split (?) Just guessing. You should ask a separate question and provide more information, e.g. query plan and HDFS configuration.Desiderata
@Mcintosh My understanding is that the order is guaranteed. Upon uploading a CSV file onto HDFS, the splits will be in order and that's what Spark knows about the file.Desiderata
I notice the csv reader has the option samplingRatio – defines fraction of rows used for schema inferring. If None is set, it uses the default value, 1.0. Perhaps that is a performant way to infer the schema whilst in exploration, and help you to explicitly specify schema for performance & repeatability in production.Reichstag
R
23

You can use limit(n).

sqlContext.read.format('com.databricks.spark.csv') \
          .options(header='true', inferschema='true').load("file_path").limit(20)

This will just load 20 rows.

Reconnaissance answered 31/5, 2017 at 6:26 Comment(9)
Would that preclude inferSchema, i.e. what if I used inferSchema enabled?Desiderata
If you inferSchema, all the data will be scanned anyway. Scanned and not loaded.Reconnaissance
How is scanning different from loading?Desiderata
Scanning as in passing over the data, and loading as in putting the data in memory... I'm not sure I get what you don't get about that @JacekLaskowski :-/Reconnaissance
"Scanning as a passing over the data" == "loaded as in putting the data in memory" are the same things on Spark executors.Desiderata
No man it's not. :) and by exploration, the OP is referring to the data exploration process in data analysis...Reconnaissance
I'm sorry but I don't agree with you on this one @JacekLaskowskiReconnaissance
@Reconnaissance The provided solution loads the file first and then limits it. How is this going to help if I have a petabyte file? It will load all the rows anyway and then show the limited n records.Closefitting
The solution is to read only a part of the file, since in a distributed environment it will be split into parts. sqlContext.read.format('com.databricks.spark.csv').options(header='true', inferschema='true').load("file_path/part-00000*")Moralez
V
5

The solution given by Jacek Laskowski works well. Presenting an in-memory variation below.

I recently ran into this problem. I was using Databricks and had a huge CSV directory (200 files of 200MB each).

I originally had

val df = spark.read.format("csv")
.option("header", true)
.option("sep", ",")
.option("inferSchema", true)
.load("dbfs:/huge/csv/files/in/this/directory/")

display(df)

which took a lot of time (10+ minutes), but then I changed it to the version below and it ran almost instantly (2 seconds):

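import spark.implicits._ // needed for .as[String] and createDataset; notebooks and spark-shell import it automatically

val lines = spark.read.text("dbfs:/huge/csv/files/in/this/directory/").as[String].take(1000)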

val df = spark.read
.option("header", true)
.option("sep", ",")
.option("inferSchema", true)
.csv(spark.createDataset(lines))

display(df)

Inferring schema for text formats is hard and it can be done this way for the csv and json (but not if it's a multi-line json) formats.

Veinule answered 17/2, 2021 at 11:53 Comment(1)
This is what I was looking for. Clean straightforward way to just partially read a file in SparkCogitable
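The idea behind this variation (take the first raw lines as text, then run the CSV parser only on that sample) does not depend on Spark. As a rough local illustration only, here is the same pattern in plain Python with the standard library. It is not Spark code: `head_csv` and the throwaway file are hypothetical names made up for this sketch.

```python
import csv
import itertools
import os
import tempfile


def head_csv(path, n):
    """Parse only the header plus the first n data lines of a CSV file."""
    with open(path, newline="") as f:
        # islice stops iterating after n + 1 lines instead of consuming the whole file.
        first_lines = list(itertools.islice(f, n + 1))
    # DictReader accepts any iterable of lines, so it only ever sees the sample.
    return list(csv.DictReader(first_lines))


# Build a small throwaway file so the sketch is self-contained.
tmp = tempfile.NamedTemporaryFile("w", suffix=".csv", delete=False, newline="")
tmp.write("id,value\n")
tmp.writelines(f"{i},{i * 0.5}\n" for i in range(1000))
tmp.close()

rows = head_csv(tmp.name, 3)
os.remove(tmp.name)
print(rows)
```

As with the Spark version, cutting by line assumes one record per line; a quoted field that spans several lines could be split at the cut.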
L
4

Not inferring schema and using limit(n) worked for me, in all aspects.

from pyspark.sql.types import StructType, StructField, LongType, IntegerType, DoubleType

f_schema = StructType([
    StructField("col1", LongType(), True),
    StructField("col2", IntegerType(), True),
    StructField("col3", DoubleType(), True),
    # ...
])

df_n = sqlContext.read.format('com.databricks.spark.csv').options(header='true').schema(f_schema).load(data_path).limit(10)

Note: If we use inferschema='true', it takes just as long as before, presumably because the whole file is scanned again to infer the schema.

But if we have no idea of the schema, Jacek Laskowski's solution works well too. :)

Larva answered 31/5, 2017 at 7:23 Comment(1)
It's always going to be better to explicitly provide the schema, if you have it. You could use inferSchema the first time you read it and then save & use the df.schema for later, but if the files are so huge that doing it even once is too much, then the workaround suggested is right. The correct answer is a combination of this and https://mcmap.net/q/572177/-how-to-read-only-n-rows-of-large-csv-file-on-hdfs-using-spark-csv-packageReichstag
B
3

Since PySpark 2.3 you can simply load data as text, limit, and apply csv reader on the result:

(spark
  .read
  .options(inferSchema="true", header="true")
  .csv(
      spark.read.text("/path/to/file")
          .limit(20)                   # Apply limit
          .rdd.flatMap(lambda x: x)))  # Convert to RDD[str]

Scala counterpart is available since Spark 2.2:

spark
  .read
  .options(Map("inferSchema" -> "true", "header" -> "true"))
  .csv(spark.read.text("/path/to/file").limit(20).as[String])

In Spark 3.0.0 or later one can also apply limit and use from_csv function, but it requires a schema, so it probably won't fit your requirements.

Basswood answered 22/6, 2019 at 21:6 Comment(2)
The original question doesn't preclude knowing the schema. I think it's worthwhile adding the Spark 3.0.0 solution and mentioning that if you know the schema then you should provide it and not use inferSchema. That would complete this as the correct answer.Reichstag
Strangely, that does not show the header. Maybe limit does not preserve the order of rows, since it is an RDD.Uncharitable
L
0

Since I didn't see this solution among the answers: a pure SQL approach works for me:

df = spark.sql("SELECT * FROM csv.`/path/to/file` LIMIT 10000")

If there is no header the columns will be named _c0, _c1, etc. No schema required.

Luba answered 13/7, 2021 at 6:30 Comment(0)
L
0

Maybe this will be helpful for anyone working in Java. Applying limit alone will not help to reduce the time. You have to collect the n rows from the file.

DataFrameReader frameReader = spark
    .read()
    .format("csv")
    .option("inferSchema", "true");
// set frameReader options, delimiters etc.

List<String> dataset = spark.read().textFile(filePath).limit(MAX_FILE_READ_SIZE).collectAsList();
return frameReader.csv(spark.createDataset(dataset, Encoders.STRING()));
Luzon answered 9/11, 2021 at 12:26 Comment(0)

© 2022 - 2024 — McMap. All rights reserved.