How to export data from Spark SQL to CSV
Asked Answered

7

52

This command works with HiveQL:

insert overwrite directory '/data/home.csv' select * from testtable;

But with Spark SQL I'm getting an error with an org.apache.spark.sql.hive.HiveQl stack trace:

java.lang.RuntimeException: Unsupported language features in query:
    insert overwrite directory '/data/home.csv' select * from testtable

Please guide me on how to export data to CSV with Spark SQL.

Fowl answered 11/8, 2015 at 9:24 Comment(1)
This question/answer does not solve the problem for Spark 2.x... the real problem is exporting to a standard CSV format. Please answer here.Cortez
88

You can use the statement below to write the contents of a DataFrame in CSV format: df.write.csv("/data/home/csv")

If you need to write the whole DataFrame into a single CSV file, then use df.coalesce(1).write.csv("/data/home/sample.csv")
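
For example, here is a minimal Spark 2.x sketch (not part of the original answer; the SparkSession name spark and the paths are assumptions):

// a minimal Spark 2.x sketch; spark is an existing SparkSession and the paths are examples
val df = spark.sql("SELECT * FROM testtable")

// writes a directory /data/home/csv containing part-* files and a _SUCCESS marker
df.write
  .option("header", "true")
  .mode("overwrite")
  .csv("/data/home/csv")

// coalesce(1) yields a single part-* file, but the target path is still a directory
df.coalesce(1)
  .write
  .option("header", "true")
  .csv("/data/home/sample.csv")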

For Spark 1.x, you can use spark-csv to write the results to CSV files.

The Scala snippet below should help:

import org.apache.spark.sql.hive.HiveContext
// sc - existing spark context
val sqlContext = new HiveContext(sc)
val df = sqlContext.sql("SELECT * FROM testtable")
df.write.format("com.databricks.spark.csv").save("/data/home/csv")

To write the contents into a single file:

import org.apache.spark.sql.hive.HiveContext
// sc - existing spark context
val sqlContext = new HiveContext(sc)
val df = sqlContext.sql("SELECT * FROM testtable")
df.coalesce(1).write.format("com.databricks.spark.csv").save("/data/home/sample.csv")
Darrelldarrelle answered 11/8, 2015 at 11:1 Comment(7)
I tried the coalesce thing you mentioned. It creates a directory at the specified path with a "part" file and a file called "_SUCCESS". Do you know of a way to actually only get the one file?Schacker
No, I think there is no way to do it.Darrelldarrelle
it will not be a local file but an HDFS fileGraiae
I found a bug in this code: my original directory of CSV partitions has 1 extra column when compared to the single CSV generated by this code. I know the code works for trivial cases, but my last 2 columns were of the format concat('"', concat_ws(",", collect_list(some_column)), '"'), which worked fine on insert overwrite but not when I selected all the columns and wrote to this format. Even though the header was correct, it incorrectly used the second-to-last column's values to fill both and ignored the restPopularity
This is how my CSV partitions looked before: "USR",0,0,""css","shell","html","python","javascript"","381534,3960,1683,229869,1569090" and this is how they look now: "\"USR\"",0,0,"\"\"css\"","\"shell\""Popularity
I fixed it following this #44395863Popularity
repartition(1) might be faster than coalesce(1), especially if there's heavy computation beforehand.Offside
50

Since Spark 2.x, spark-csv is integrated as a native data source. Therefore, the necessary statement simplifies to the following (Windows):

df.write
  .option("header", "true")
  .csv("file:///C:/out.csv")

or (UNIX):

df.write
  .option("header", "true")
  .csv("/var/out.csv")

Notice: as the comments say, it creates a directory with that name containing the partitions, not a standard CSV file. This, however, is most likely what you want, since otherwise you are either crashing your driver (out of RAM) or you are working with a non-distributed environment.
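
To illustrate that point, here is a hedged sketch (not from the original answer; the path and the spark session are assumptions):

// the target path becomes a directory of part-* files, not a single CSV file
df.write.option("header", "true").mode("overwrite").csv("/var/out.csv")

// Spark reads the whole directory back as one logical CSV dataset
val back = spark.read.option("header", "true").csv("/var/out.csv")
back.show(5)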

Fontaine answered 29/11, 2016 at 9:57 Comment(6)
Hi all, is there a way to replace the file? It fails when it tries to rewrite the file.Callison
Sure ! .mode("overwrite").csv("/var/out.csv")Fontaine
In Spark 2.x it is creating the directory by that name. Any help?Irretentive
My guess is that your partitions are inside that directory.Fontaine
But it is not a standard CSV file, it is producing a folder with strange files (!). See https://mcmap.net/q/353673/-how-to-write-standard-csv/287948Cortez
If you're using Spark because you're working with "big" datasets, you probably don't want to do anything like coalesce(1) or toPandas() since that will most likely crash your driver (the whole dataset has to fit in the driver's RAM). On the other hand: if your data does fit into the RAM of a single machine, why are you torturing yourself with distributed computing?Fontaine
32

The answer above with spark-csv is correct, but there is an issue: the library creates several files based on the DataFrame's partitioning, and this is usually not what we need. So you can combine all partitions into one:

df.coalesce(1).
    write.
    format("com.databricks.spark.csv").
    option("header", "true").
    save("myfile.csv")

and rename the library's output (named "part-00000") to the desired filename.
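
If you want to do that rename programmatically, here is a hedged sketch (not from the original answer): it uses the Hadoop FileSystem API, assumes an existing SparkContext sc, that exactly one part file was produced, and that the target file name is just an example.

import org.apache.hadoop.fs.{FileSystem, Path}

// sc - existing SparkContext; file names here are examples
val fs = FileSystem.get(sc.hadoopConfiguration)
val outDir = new Path("myfile.csv")                               // directory written by Spark
val partFile = fs.globStatus(new Path(outDir, "part-*"))(0).getPath
fs.rename(partFile, new Path("myfile_renamed.csv"))               // move the single part file out
fs.delete(outDir, true)                                           // remove the leftover directory and _SUCCESS marker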

This blog post provides more details: https://fullstackml.com/2015/12/21/how-to-export-data-frame-from-apache-spark/

Smolensk answered 12/7, 2016 at 8:1 Comment(5)
One can add mode as well, if one wishes to keep writing to an existing file. resultDF.repartition(1).write.mode("append").format("com.databricks.spark.csv").option("header", "true").save("s3://...")Seay
coalesce(1) requires the dataset to fit into the heap of a single machine and will most likely cause issues when working with large datasetsFontaine
@DmitryPetrov Do we need to mention write.format("com...") option when including coalesce option ?Clarance
@Clarance Yes, coalesce(1) just repartitions to a single partition (file).Smolensk
@DmitryPetrov I understand that coalesce(1) just repartitions it to a single partition file but do we need to explicitly mention the write.format option in Spark 2.x when we use coalesce option ?Clarance
11

The simplest way is to map over the DataFrame's RDD and use mkString:

  df.rdd.map(x=>x.mkString(","))

As of Spark 1.5 (or even before that), df.map(r=>r.mkString(",")) would do the same. If you want CSV escaping, you can use Apache Commons Lang for that. For example, here's the code we're using:

 import org.apache.commons.lang3.StringEscapeUtils // escapeCsv (commons-lang 2.x has the same method in org.apache.commons.lang)
 import org.apache.hadoop.io.compress.GzipCodec
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{DataFrame, Row}

 // sc - existing spark context
 def DfToTextFile(path: String,
                   df: DataFrame,
                   delimiter: String = ",",
                   csvEscape: Boolean = true,
                   partitions: Int = 1,
                   compress: Boolean = true,
                   header: Option[String] = None,
                   maxColumnLength: Option[Int] = None) = {

    def trimColumnLength(c: String) = {
      val col = maxColumnLength match {
        case None => c
        case Some(len: Int) => c.take(len)
      }
      if (csvEscape) StringEscapeUtils.escapeCsv(col) else col
    }
    def rowToString(r: Row) = {
      val st = r.mkString("~-~").replaceAll("[\\p{C}|\\uFFFD]", "") //remove control characters
      st.split("~-~").map(trimColumnLength).mkString(delimiter)
    }

    def addHeader(r: RDD[String]) = {
      val rdd = for (h <- header;
                     if partitions == 1; //headers only supported for single partitions
                     tmpRdd = sc.parallelize(Array(h))) yield tmpRdd.union(r).coalesce(1)
      rdd.getOrElse(r)
    }

    val rdd = df.map(rowToString).repartition(partitions)
    val headerRdd = addHeader(rdd)

    if (compress)
      headerRdd.saveAsTextFile(path, classOf[GzipCodec])
    else
      headerRdd.saveAsTextFile(path)
  }
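
A hypothetical call to the helper above (the output path, table name, and header string are made-up examples; sqlContext and sc are assumed to exist as in the earlier snippets):

val df = sqlContext.sql("SELECT * FROM testtable")
DfToTextFile(
  path = "/data/home/csv_export",
  df = df,
  compress = false,
  header = Some("col1,col2,col3"))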
Biggs answered 12/8, 2015 at 4:41 Comment(2)
While this is the simplest answer (and a good one), if your text has double-quotes, you'll have to account for them.Charmain
I'm simply getting this error after creating the RDD for the table: scala> df.rdd.map(x=>x.mkString(",")); <console>:18: error: value rdd is not a member of org.apache.spark.sql.SchemaRDD df.rdd.map(x=>x.mkString(","));Fowl
2

With the help of spark-csv we can write to a CSV file.

val dfsql = sqlContext.sql("select * from tablename")
dfsql.write.format("com.databricks.spark.csv").option("header","true").save("output.csv")
Subterfuge answered 15/1, 2018 at 15:41 Comment(1)
No, it is not a real CSV file, the result output.csv is a folder.Cortez
1

The error message suggests this is not a supported feature in the query language. But you can save a DataFrame in any format as usual through the RDD interface (df.rdd.saveAsTextFile). Or you can check out https://github.com/databricks/spark-csv.
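
For instance, a minimal sketch of that RDD route (note that it does not handle quoting or escaping; the output path is an example):

// writes one plain-text part file per partition under the given directory
df.rdd
  .map(row => row.mkString(","))
  .saveAsTextFile("/data/home/csv_export")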

Greig answered 11/8, 2015 at 10:45 Comment(2)
scala> df.write.format("com.databricks.spark.csv").save("/data/home.csv") <console>:18: error: value write is not a member of org.apache.spark.sql.SchemaRDD Do I need to build current jar with databricks package again?Fowl
DataFrame.write was added in Apache Spark 1.4.0.Greig
-3

To read a CSV file into a DataFrame:

val p = spark.read.format("csv").options(Map("header" -> "true", "delimiter" -> "^")).load("filename.csv")
Moulmein answered 23/11, 2018 at 10:31 Comment(0)
