SparkR DataFrame partitioning issue

In my R script, I have a SparkDataFrame with two columns (time, value) containing data for four different months. Because I need to apply my function to each month separately, I figured I would repartition it into four partitions, each holding the data for a separate month.

I created an additional column called partition, holding an integer value from 0 to 3, and then called the repartition method on that column.

Sadly, as described in Spark SQL - Difference between df.repartition and DataFrameWriter partitionBy?, the repartition method only guarantees that all data with the same key ends up in the same partition; data with different keys can also end up in the same partition.

In my case, executing the code below creates 4 partitions but populates only 2 of them with data.

I guess I should be using the partitionBy method, but with SparkR I have no idea how to do that. The official documentation states that this method is applied to something called a WindowSpec and not to a DataFrame.

I would really appreciate some help with this matter as I have no idea how to incorporate this method into my code.

sparkR.session(
  master = "local[*]",
  sparkConfig = list(spark.sql.shuffle.partitions = "4"))
df <- as.DataFrame(inputDat) # a data frame with the added partition column
repartitionedDf <- repartition(df, col = df$partition)

schema <- structType(
  structField("time", "timestamp"), 
  structField("value", "double"), 
  structField("partition", "string"))

processedDf <- dapply(repartitionedDf, 
  function(x) { data.frame(produceHourlyResults(x), stringsAsFactors = FALSE) },
  schema)
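
One way to see how the rows end up distributed across the physical partitions (a sketch, relying on SparkR's spark_partition_id() column function and the repartitionedDf defined above):

partitionCounts <- count(groupBy(
  withColumn(repartitionedDf, "pid", spark_partition_id()),
  "pid"))
head(partitionCounts) # only 2 of the 4 partitions contain any rows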
Virgenvirgie answered 26/1, 2018 at 15:43 Comment(0)

You are using the wrong method. If you

need to apply my function to each month separately

you should use gapply, which

Groups the SparkDataFrame using the specified columns and applies the R function to each group.

df %>% group_by("month") %>% gapply(fun, schema)

or

df %>% gapply("month", fun, schema)
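
Adapted to the code in the question, a minimal sketch (it reuses the existing partition column as the grouping key and assumes the same produceHourlyResults and schema as above):

df <- as.DataFrame(inputDat)
processedDf <- gapply(
  df,
  "partition",  # one group per month
  function(key, x) {
    # key: the grouping value; x: a local R data.frame with that month's rows
    data.frame(produceHourlyResults(x), stringsAsFactors = FALSE)
  },
  schema)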

In my case, executing the code below creates 4 partitions but populates only 2 of them with data.

This suggests hash collisions. Increasing the number of partitions to a value reasonably above the number of unique keys should resolve the problem:

spark.sql.shuffle.partitions 17
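
With the session setup from the question this could look as follows (17 is just an arbitrary value comfortably above the number of distinct keys):

sparkR.session(
  master = "local[*]",
  sparkConfig = list(spark.sql.shuffle.partitions = "17"))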

I guess I should be using the partitionBy method, however

No. partitionBy is used with window functions (SparkR window function).
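
In SparkR that concept is exposed as windowPartitionBy, which builds a WindowSpec for use with over - a sketch of what it is actually for (not something that helps with the per-month processing here):

ws <- orderBy(windowPartitionBy("partition"), "time")
df$row <- over(row_number(), ws) # row number within each partition key, ordered by time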


To address your comment:

I decided to use dapply with separate partitions in order to be able to easily save each month into a separate CSV file

The hash partitioner doesn't work like that (see How does HashPartitioner work?).

You can try partitionBy in the writer, but I am not sure if it is directly supported in SparkR. It is supported in structured streaming; for batch queries you may have to call Java methods directly or use tables with a metastore:

createDataFrame(iris) %>% createOrReplaceTempView("iris_view")
sql(
    "CREATE TABLE iris 
    USING csv PARTITIONED BY(species)
    LOCATION '/tmp/iris' AS SELECT * FROM iris_view"
)
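
Each distinct species then gets its own subdirectory under /tmp/iris (e.g. species=setosa), and filters on that column can benefit from partition pruning:

head(sql("SELECT * FROM iris WHERE species = 'setosa'"))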
Moramorabito answered 26/1, 2018 at 15:53 Comment(3)
I do have it enabled. I actually noticed that when I issued the CREATE command without the LOCATION clause, it saved the DataFrame to the default location (the spark-warehouse folder). This default location can be changed by setting spark.sql.warehouse.dir to the desired value. – Virgenvirgie
And what version of Spark do you use? I see it doesn't work before 2.2. – Moramorabito
Yeah, then that is the case; I have 2.1.1. I'll try to update to a newer version. – Virgenvirgie
