Using apply functions in SparkR

I am trying to implement some functions in SparkR 1.5.1. I have seen older examples (version 1.3) where people called the apply function on DataFrames, but this no longer seems to be directly available. Example:

x = c(1,2)
xDF_R = data.frame(x)
colnames(xDF_R) = c("number")
xDF_S = createDataFrame(sqlContext,xDF_R)

Now, I can use the function sapply on the data.frame object

xDF_R$result = sapply(xDF_R$number, ppois, q=10)

When I use a similar logic on the DataFrame

xDF_S$result = sapply(xDF_S$number, ppois, q=10)

I get the error message "Error in as.list.default(X) : no method for coercing this S4 class to a vector"

Is there a way to apply an R function like this to a SparkR DataFrame?

Fleisig answered 22/10, 2015 at 16:32

This is possible with the user-defined functions added to SparkR in Spark 2.0 (dapply, dapplyCollect and gapply).

> wrapper = function(df){
+     out = df
+     out$result = sapply(df$number, ppois, q=10)
+     return(out)
+ }
> xDF_S2 = dapplyCollect(xDF_S, wrapper)
> identical(xDF_S2, xDF_R)
[1] TRUE

Note that you need a wrapper function like this because the function given to dapplyCollect receives only the partition's data.frame, so extra arguments such as q=10 cannot be passed in directly. That may change in the future.
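If several functions need the same treatment, the wrapper can be generated by a small closure instead of being written by hand each time. This is only a sketch: make_wrapper is a hypothetical helper name, not part of SparkR, and it assumes the column is called number as in the question.

```r
# Hypothetical helper, not part of SparkR: binds extra arguments to `fun` and
# returns a function of one data.frame, the shape dapplyCollect() calls.
make_wrapper <- function(fun, ...) {
  extra <- list(...)  # captured extra arguments, e.g. q = 10
  function(df) {
    out <- df
    # Equivalent to sapply(df$number, fun, <extra arguments>)
    out$result <- sapply(df$number, function(v) do.call(fun, c(list(v), extra)))
    out
  }
}

wrapper <- make_wrapper(ppois, q = 10)

# Local check on a plain data.frame (no Spark involved)
local_df <- data.frame(number = c(1, 2))
res <- wrapper(local_df)
stopifnot(isTRUE(all.equal(res$result, sapply(local_df$number, ppois, q = 10))))
```

In a Spark 2.0 session the resulting wrapper would be passed as dapplyCollect(xDF_S, wrapper). Only the local call is exercised here, so whether the captured arguments serialize cleanly to the executors should be checked on a real cluster.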

Vinna answered 24/7, 2016 at 14:32

Native R functions do not accept Spark DataFrames. Instead, we can use user-defined functions in SparkR to run native R code on the data. These functions run on the executors, so any libraries they load must be installed on every executor.

For example, suppose we have a custom function holt_forecast which takes in a data.table as an argument.

Sample R code

sales_R_df %>%
  group_by(product_id) %>%
  do(holt_forecast(data.table(.))) %>%
  data.table(.) -> dt_holt

To use UDFs, we need to specify the schema of the data.frame that the native R function returns. Spark uses this schema to turn the results back into a Spark DataFrame.

Equivalent SparkR code

  1. Define the schema

    dt_holt_schema <- structType(
      structField("product_id", "integer"),
      structField("audit_date", "date"),
      structField("holt_unit_forecast", "double"),
      structField("holt_unit_forecast_std", "double")
    )
    
    
  2. Execute the method

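    # sales_spark_df is an assumed name for the Spark DataFrame holding the sales data
    dt_holt <- gapply(sales_spark_df, "product_id", function(key, x) {
      # libraries are loaded inside the function because it runs on the executors
      library(data.table)
      library(lubridate)
      library(dplyr)
      library(forecast)
      sales <- data.table(x)
      # the returned data.frame must match dt_holt_schema
      data.frame(key, holt_forecast(sales))
    }, dt_holt_schema)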
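Because the function runs on the executors, a mismatch between its output and the declared schema only shows up once the job runs. One way to catch it earlier is to call the per-group function on a single group locally and compare the column classes with the schema types (integer, date, double correspond to the R classes integer, Date, numeric). The sketch below does this in plain R. toy_forecast is a hypothetical stand-in for holt_forecast, the sample data are made up, and the key is passed as a named list purely so that the local column names line up.

```r
# Hypothetical stand-in for holt_forecast(): returns one row per group
toy_forecast <- function(sales) {
  data.frame(
    audit_date = max(sales$audit_date),
    holt_unit_forecast = mean(sales$units),
    holt_unit_forecast_std = sd(sales$units)
  )
}

# Same shape as the function passed to gapply: (key, x) -> data.frame
per_group <- function(key, x) {
  data.frame(key, toy_forecast(x))
}

# Made-up sample rows for a single product
one_group <- data.frame(
  product_id = c(1L, 1L, 1L),
  audit_date = as.Date(c("2020-01-01", "2020-01-02", "2020-01-03")),
  units = c(10, 12, 14)
)

out <- per_group(list(product_id = 1L), one_group)

# R classes corresponding to the schema types integer, date, double, double
expected <- c(product_id = "integer", audit_date = "Date",
              holt_unit_forecast = "numeric", holt_unit_forecast_std = "numeric")
actual <- sapply(out, function(col) class(col)[1])
stopifnot(identical(actual, expected))
```

If the check fails, the difference between actual and expected shows which column needs an explicit conversion (for example as.integer or as.Date) before the function is handed to gapply.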
    
    

Reference: https://shbhmrzd.medium.com/stl-and-holt-from-r-to-sparkr-1815bacfe1cc

Fleda answered 28/11, 2020 at 11:54
