PySpark flatmap should return tuples with typed values
I'm using Jupyter Notebook with PySpark. I have a dataframe with a schema that defines column names and types (integer, ...) for those columns. When I use methods like flatMap, the result is a list of tuples whose values no longer have a fixed type. Is there a way to keep the types?

df.printSchema()
root
 |-- name: string (nullable = true)
 |-- ...
 |-- ...
 |-- ratings: integer (nullable = true)

Then I use flatMap to do some calculations with the rating values (obfuscated here):

y_rate = df.flatMap(lambda row: (row.id, 5 if (row.ratings > 5) else row.ratings))
y_rate.toDF().printSchema()

And now I get an error:

TypeError: Can not infer schema for type:

Is there any way to use map/flatMap/reduce by keeping the schema? or at least returning tuples that have values of a specific type?

Mehala answered 14/5, 2016 at 9:59 Comment(0)

First of all, you're using the wrong function. flatMap will map and then flatten, so assuming your data looks like this:

df = sc.parallelize([("foo", 0), ("bar", 10)]).toDF(["id", "ratings"])

the output of flatMap will be equivalent to:

sc.parallelize(['foo', 0, 'bar', 5])

Hence the error you see. If you really want to make this work, you should use map instead:

df.rdd.map(lambda row: (row.id, 5 if (row.ratings > 5) else row.ratings)).toDF()
## DataFrame[_1: string, _2: bigint]
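The map vs. flatMap distinction above can be sketched in plain Python (the variable names here are made up for illustration; `chain.from_iterable` stands in for Spark's flattening step):

```python
from itertools import chain

# Stand-ins for the two rows in the example DataFrame.
rows = [("foo", 0), ("bar", 10)]

# The lambda passed to map/flatMap: cap ratings at 5.
f = lambda row: (row[0], 5 if row[1] > 5 else row[1])

# map keeps one output element per input row:
mapped = [f(row) for row in rows]

# flatMap additionally flattens each returned tuple into the stream,
# which destroys the (id, rating) pairing:
flat_mapped = list(chain.from_iterable(f(row) for row in rows))

print(mapped)       # [('foo', 0), ('bar', 5)]
print(flat_mapped)  # ['foo', 0, 'bar', 5]
```

The flattened list mixes strings and integers at the top level, which is why Spark can no longer infer a row schema from it.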

Next, mapping over a DataFrame directly is no longer supported in Spark 2.0. You have to extract the rdd first (hence the df.rdd.map above).

Finally, passing data between Python and the JVM is extremely inefficient: it requires serialization / deserialization on both sides plus schema inference (if a schema is not explicitly provided), and it also breaks laziness. It is better to use SQL expressions for things like this:

from pyspark.sql.functions import when

df.select(df.id, when(df.ratings > 5, 5).otherwise(df.ratings))

If for some reason you need plain Python code, a UDF could be a better choice.
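A minimal sketch of what such a UDF could wrap (the function name `cap_rating` is made up here; the commented lines show the assumed PySpark wiring and require an active SparkSession):

```python
# In Spark, this pure function would be registered as a UDF, e.g.:
#
#   from pyspark.sql.functions import udf
#   from pyspark.sql.types import IntegerType
#
#   cap_udf = udf(cap_rating, IntegerType())
#   df.select(df.id, cap_udf(df.ratings))

def cap_rating(rating, cap=5):
    """Limit rating to cap, mirroring the when/otherwise expression above."""
    return cap if rating > cap else rating

print(cap_rating(10))  # 5
print(cap_rating(3))   # 3
```

Note that the UDF still pays the Python-JVM round trip per row, so the when/otherwise expression remains the faster option when it suffices.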

Novick answered 14/5, 2016 at 10:53 Comment(4)
Extremely helpful, thanks for your example code. I just didn't get the part about flatMap vs map. – Mehala
flatMap is a function RDD[T] => (T => Iterable[U]) => RDD[U]. In other words, it expects the function to return an Iterable (a Python tuple is one) and concatenates (flattens) the results. – Novick
Is there a way to give the when/otherwise column a name in that statement? See df.select(df.id, when(df.ratings > 5, 5).otherwise(df.ratings)) @Novick – Mehala
Yes, you can use alias, for example: when(df.ratings > 5, 5).otherwise(df.ratings).alias("foo"). – Novick
