Spark RDD to DataFrame python
Asked Answered
T

3

51

I am trying to convert the Spark RDD to a DataFrame. I have seen the documentation and example where the scheme is passed to sqlContext.CreateDataFrame(rdd,schema) function.

But I have 38 columns or fields and this will increase further. If I manually give the schema specifying each field information, that it going to be so tedious job.

Is there any other way to specify the schema without knowing the information of the columns prior.

Turbidimeter answered 26/9, 2016 at 9:24 Comment(2)
if you have 38 columns, why do you work with RDD in the first place? why won't you start with DataFrame?Promptitude
I am loading data from Neo4j Graph. Where the data is retrieved as RDD and have some dependencies on it.Turbidimeter
L
78

See,

There are two ways to convert an RDD to DF in Spark.

toDF() and createDataFrame(rdd, schema)

I will show you how you can do that dynamically.

toDF()

The toDF() command gives you the way to convert an RDD[Row] to a Dataframe. The point is, the object Row() can receive a **kwargs argument. So, there is an easy way to do that.

from pyspark.sql.types import Row

#here you are going to create a function
def f(x):
    d = {}
    for i in range(len(x)):
        d[str(i)] = x[i]
    return d

#Now populate that
df = rdd.map(lambda x: Row(**f(x))).toDF()

This way you are going to be able to create a dataframe dynamically.

createDataFrame(rdd, schema)

Other way to do that is creating a dynamic schema. How?

This way:

from pyspark.sql.types import StructType
from pyspark.sql.types import StructField
from pyspark.sql.types import StringType

schema = StructType([StructField(str(i), StringType(), True) for i in range(32)])

df = sqlContext.createDataFrame(rdd, schema)

This second way is cleaner to do that...

So this is how you can create dataframes dynamically.

Leathers answered 26/9, 2016 at 14:25 Comment(5)
Can you please mention which one is the least expensive approach in terms of execution time?Solange
In this case will be dataframe option. Due to using PySpark RDD functions will use the pipe between the JVM and Python to run that logic from f(x) and using DataFrame you will not communicate with python to do the schema after the schema is build with the For.Leathers
I'm sorry. You mean the sqlContext.createDataFrame(rdd, schema) option is better, right?Solange
Yes that one, for PySpark the best option for performance is always using DataFrameLeathers
I downvoted because this casts all fields to strings. The OP says he doesn't know what the columns are ahead of time. How would he know they were strings?Redon
A
3

I liked Arun's answer better but there is a tiny problem and I could not comment or edit the answer. sparkContext does not have createDeataFrame, sqlContext does (as Thiago mentioned). So:

from pyspark.sql import SQLContext

# assuming the spark environemnt is set and sc is spark.sparkContext 
sqlContext = SQLContext(sc)
schemaPeople = sqlContext.createDataFrame(RDDName)
schemaPeople.createOrReplaceTempView("RDDName")
Adenosine answered 6/4, 2020 at 17:53 Comment(1)
An alternative: from pyspark.sql import SparkSession, spark = SparkSession.builder.getOrCreate(), spark.createDataFrame(...)Schlicher
E
0

Try if that works

sc = spark.sparkContext

# Infer the schema, and register the DataFrame as a table.
schemaPeople = spark.createDataFrame(RddName)
schemaPeople.createOrReplaceTempView("RddName")
Ezaria answered 26/3, 2018 at 5:2 Comment(0)

© 2022 - 2024 — McMap. All rights reserved.