Can I read a CSV represented as a string into Apache Spark using spark-csv?

I know how to read a CSV file into Apache Spark using spark-csv, but I already have the CSV file represented as a string and would like to convert this string directly to a DataFrame. Is this possible?

Foliage answered 23/8, 2016 at 22:53 Comment(4)
No, it is not possible. In Python you could use Pandas and io, and convert the result to a Spark data frame.Possess
@Possess can you post your comment as an answer instead?Crafty
Should this be tagged with Scala (not a rhetorical question)?Gonzalez
@PeterMortensen, I don't mind adding Scala as a tag. My actual use case and solution are written in Java, but some of the answers below are in Scala. I'm not sure I should add language-specific tags, though, since the question doesn't regard or require a specific language. It's more about spark-csv and Spark, independent of the language.Foliage

Update for Spark 3.x - or really for newer JDKs: String.lines() (Java 11+) returns a java.util.stream.Stream, and Stream.toList() (Java 16+) returns a java.util.List, so the result has to be converted back to a Scala collection before toDS():

import org.apache.spark.sql.{Dataset, SparkSession}
import scala.collection.JavaConverters._

val spark = SparkSession.builder().appName("CsvExample").master("local").getOrCreate()
import spark.implicits._

// .lines here resolves to the JDK's String.lines(), which yields a
// java.util.stream.Stream[String]; toList() (JDK 16+) returns a
// java.util.List, and asScala turns it back into a Scala collection
// that toDS() can encode.
val csvData: Dataset[String] = """
    |id, date, timedump
    |1, "2014/01/01 23:00:01",1499959917383
    |2, "2014/11/31 12:40:32",1198138008843
  """.stripMargin.lines.toList.asScala.toDS()

val frame = spark.read.option("header", true).option("inferSchema", true).csv(csvData)
frame.show()
frame.printSchema()
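
On Scala 2.13 you can sidestep the JDK clash entirely with linesIterator, which stays a plain Scala Iterator (a small variant of the same idea):

// Scala 2.13: StringOps.linesIterator avoids java.util.stream.Stream,
// so no JavaConverters round-trip is needed.
val csvDataAlt: Dataset[String] = """
    |id, date, timedump
    |1, "2014/01/01 23:00:01",1499959917383
  """.stripMargin.linesIterator.toList.toDS()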

Starting from Spark 2.2.x

There is finally a proper way to do it, using a Dataset:

import org.apache.spark.sql.{Dataset, SparkSession}
val spark = SparkSession.builder().appName("CsvExample").master("local").getOrCreate()

import spark.implicits._
// parallelize distributes the local lines as an RDD[String]; toDS() then
// wraps them as a Dataset[String] that spark.read.csv can consume directly.
// (Here .lines is Scala's StringOps.lines, pre-JDK 11.)
val csvData: Dataset[String] = spark.sparkContext.parallelize(
  """
    |id, date, timedump
    |1, "2014/01/01 23:00:01",1499959917383
    |2, "2014/11/31 12:40:32",1198138008843
  """.stripMargin.lines.toList).toDS()

val frame = spark.read.option("header", true).option("inferSchema", true).csv(csvData)
frame.show()
frame.printSchema()
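
Incidentally, since the lines are already in local memory, the parallelize hop is optional; spark.implicits._ also lets a plain Seq[String] become a Dataset:

// Equivalent, without building an RDD first (spark.implicits._ in scope):
val csvLines = Seq(
  "id, date, timedump",
  """1, "2014/01/01 23:00:01",1499959917383""",
  """2, "2014/11/31 12:40:32",1198138008843""")
val csvDataLocal: Dataset[String] = csvLines.toDS()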

Old Apache Spark versions

Actually you can, though it uses library internals and isn't widely advertised: just create and use your own CsvParser instance. An example that works for me on Spark 1.6.0 and spark-csv_2.10-1.4.0 is below:

import com.databricks.spark.csv.CsvParser
import org.apache.spark.sql.DataFrame

val csvData = """
  |userid,organizationid,userfirstname,usermiddlename,userlastname,usertitle
  |1,1,user1,m1,l1,mr
  |2,2,user2,m2,l2,mr
  |3,3,user3,m3,l3,mr
  |""".stripMargin

// Parallelize the lines and hand them to spark-csv's (internal) CsvParser.
val rdd = sc.parallelize(csvData.lines.toList)
val csvParser = new CsvParser()
  .withUseHeader(true)
  .withInferSchema(true)

val csvDataFrame: DataFrame = csvParser.csvRdd(sqlContext, rdd)
Himation answered 23/6, 2017 at 5:52 Comment(0)

You can parse your string into a list of CSV records using, e.g., the scala-csv library:

import com.github.tototoshi.csv.CSVParser // assuming the com.github.tototoshi "scala-csv" library

val myCSVdata: Array[List[String]] =
  myCSVString.split('\n').flatMap(CSVParser.parseLine(_))

Here you can do a bit more processing, data cleaning, verifying that every line parses well and has the same number of fields, etc.
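
For example, a minimal sanity check might keep only rows that parsed to the expected number of fields (expectedFields below is a stand-in for your actual column count):

// Sketch: drop rows with the wrong arity and trim stray whitespace.
val expectedFields = 6 // placeholder; set to your column count
val cleaned: Array[List[String]] =
  myCSVdata.filter(_.length == expectedFields).map(_.map(_.trim))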

You can then make this an RDD of records:

import org.apache.spark.rdd.RDD

val myCSVRDD: RDD[List[String]] = sparkContext.parallelize(myCSVdata)

Here you can massage your lists of strings into a case class, to reflect the fields of your CSV data better. You can take some inspiration from the creation of Persons in this example:

Spark SQL, DataFrames and Datasets Guide

I omit this step.
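
As a rough sketch of that omitted step (Person and its fields are hypothetical here, just to mirror the linked guide):

// Hypothetical record type; adjust the fields to your actual CSV columns.
case class Person(name: String, age: Int)

val myPeopleRDD: RDD[Person] =
  myCSVRDD.map { case List(name, age) => Person(name.trim, age.trim.toInt) }

Calling toDF() on myPeopleRDD would then give properly named columns.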

You can then convert to a DataFrame:

import spark.implicits._
val myCSVDataframe = myCSVRDD.toDF()
Myna answered 24/8, 2016 at 10:0 Comment(1)
The link is half-broken (page anchor).Gonzalez

The accepted answer wasn't working for me in Apache Spark 2.2.0, but it led me to what I needed with csvData.lines.toList:

import java.io.InputStream
import scala.io.Source
import spark.implicits._ // for toDS() and .as[SomeCaseClass]

val fileUrl = getClass.getResource("/file_in_resources.csv")
val stream = fileUrl.getContent.asInstanceOf[InputStream]
val streamString = Source.fromInputStream(stream).mkString

val csvList = streamString.lines.toList

spark.read
  .option("header", "true")
  .option("inferSchema", "true")
  .csv(csvList.toDS())
  .as[SomeCaseClass]
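
Here SomeCaseClass stands for whatever case class matches the file's columns, for instance (hypothetical fields):

// Hypothetical; the field names must line up with the CSV header
// for .as[SomeCaseClass] to resolve an encoder.
case class SomeCaseClass(id: Int, name: String)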
Herd answered 13/12, 2017 at 16:52 Comment(0)

This is a PySpark solution I recently used successfully. Here I take console output (dataframe.show() output) and create a DataFrame using Spark's CSV API.

Since the Scala version is already covered above, this PySpark version is slightly different. I used it to convert Impala/Hive console output to CSV for my unit testing, and it was really useful.

I used a regular expression to remove the +-----+ border rows:

re.sub(r'\n[+-]+\n', '\n', input_data)

The full script:
import os
import re
import sys

from pyspark.sql import SparkSession

os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable
# Initialize Spark session
spark = SparkSession.builder \
    .appName("String to CSV") \
    .getOrCreate()

# Input data as a string
input_data = """
+-----+------------------+-------+
|empid|empname           |salary|
|    1|    Ram Ghadiyaram| 10000|
+-----+-------+----------+--------+
""".replace("|\n","\n").replace("\n|","\n")

# Remove the remaining +-----+ border rows from the string
input_data = re.sub(r'\n[+-]+\n', '\n', input_data)

# Read the cleaned lines as pipe-delimited CSV
df = (spark.read.option("header", "true")
      .option("inferSchema", "true")
      .option("delimiter", "|")
      .csv(spark.sparkContext.parallelize(input_data.split("\n"))))
df.printSchema()
# Show the result CSV data
df.show()

The complete explanation is in my article.

Dearth answered 13/10, 2023 at 3:52 Comment(0)
