R - How to replicate rows in a spark dataframe using sparklyr

Is there a way to replicate the rows of a Spark dataframe using sparklyr/dplyr functions?

sc <- spark_connect(master = "spark://####:7077")

df_tbl <- copy_to(sc, data.frame(row1 = 1:3, row2 = LETTERS[1:3]), "df")

This is the desired output, saved into a new spark tbl:

> df2_tbl
   row1  row2
  <int> <chr>
1     1     A
2     1     A
3     1     A
4     2     B
5     2     B
6     2     B
7     3     C
8     3     C
9     3     C
Turku answered 13/6, 2017 at 20:5
6

With sparklyr you can use array and explode as suggested by @Oli:

df_tbl %>% 
  mutate(arr = explode(array(1, 1, 1))) %>% 
  select(-arr)

# # Source:   lazy query [?? x 2]
# # Database: spark_connection
#    row1 row2 
#   <int> <chr>
# 1     1 A    
# 2     1 A    
# 3     1 A    
# 4     2 B    
# 5     2 B    
# 6     2 B    
# 7     3 C    
# 8     3 C    
# 9     3 C    

This can be generalized to any number of copies:

library(rlang)

df_tbl %>%  
  mutate(arr = !!rlang::parse_quo(
    paste("explode(array(", paste(rep(1, 3), collapse = ","), "))")
  )) %>% select(-arr)

# # Source:   lazy query [?? x 2]
# # Database: spark_connection
#    row1 row2 
#   <int> <chr>
# 1     1 A    
# 2     1 A    
# 3     1 A    
# 4     2 B    
# 5     2 B    
# 6     2 B    
# 7     3 C    
# 8     3 C    
# 9     3 C   

The second argument to rep() sets how many copies of each row are produced.
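
A sketch of the same idea wrapped in a reusable helper. The names explode_expr and replicate_rows are hypothetical, not part of sparklyr. The call is assembled with base R instead of being parsed from a string, which sidesteps the question of which environment to pass to parse_quo. It assumes, as in the code above, that sparklyr hands explode() and array() through to Spark SQL untranslated.

```r
# Build the unevaluated call explode(array(1, 1, ..., 1)) containing n ones.
explode_expr <- function(n) {
  stopifnot(is.numeric(n), length(n) == 1, n >= 1)
  array_call <- as.call(c(list(as.name("array")), as.list(rep(1, n))))
  call("explode", array_call)
}

# Hypothetical helper: repeat every row of a Spark tbl n times.
# The call is injected into mutate() with !!, then the helper column is dropped.
replicate_rows <- function(tbl, n) {
  expanded <- dplyr::mutate(tbl, rep_idx = !!explode_expr(n))
  dplyr::select(expanded, -rep_idx)
}

# Inspect the generated expression without touching Spark:
deparse(explode_expr(3))
```

With the df_tbl from the question, this would be called as replicate_rows(df_tbl, 3).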

Trauma answered 21/1, 2018 at 18:0 Comment(1)
BTW, newer versions of rlang require an environment to be passed to parse_quo, so you'd write parse_quo(paste(...), env = sc), where sc is the Spark connection :) - Randers
1

The first idea that comes to mind is to use the explode function (this is exactly what it is meant for in Spark). However, arrays do not seem to be supported in SparkR (to the best of my knowledge).

> structField("a", "array")
Error in checkType(type) : Unsupported type for SparkDataframe: array

I can however propose two other methods:

  1. A straightforward but not very elegant one:

    head(rbind(df, df, df), n=30)
    #    row1 row2
    # 1    1    A
    # 2    2    B
    # 3    3    C
    # 4    1    A
    # 5    2    B
    # 6    3    C
    # 7    1    A
    # 8    2    B
    # 9    3    C
    

    Or, more generally, with a for loop:

    df2 <- df
    for (i in 1:2) df2 <- rbind(df, df2)
    

    Note that this would also work with union.

  2. The second, more elegant method (because it involves only one Spark operation) is based on a cross join (Cartesian product) with a dataframe of size 3 (or any other number):

    j <- as.DataFrame(data.frame(s=1:3))
    head(drop(crossJoin(df, j), "s"), n=100)
    #    row1 row2
    # 1    1    A
    # 2    1    A
    # 3    1    A
    # 4    2    B
    # 5    2    B
    # 6    2    B
    # 7    3    C
    # 8    3    C
    # 9    3    C
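
For the sparklyr setup in the question, the repeated-union idea from method 1 can be sketched as below. union_all_n is a hypothetical helper name; dplyr::union_all is an existing dplyr verb that also works on local data frames, which is how the example is exercised here. Copies are appended block by block, so the rows come out as 1, 2, 3, 1, 2, 3, ... rather than grouped, and Spark does not guarantee row order in any case.

```r
# Hypothetical helper: stack n copies of a table with UNION ALL.
union_all_n <- function(tbl, n) {
  stopifnot(n >= 1)
  Reduce(dplyr::union_all, rep(list(tbl), n))
}

# Local check with a plain data frame; a Spark tbl such as df_tbl
# from the question would be passed in the same way.
local_df <- data.frame(row1 = 1:3, row2 = LETTERS[1:3])
nrow(union_all_n(local_df, 3))
```

Each extra copy adds one more union to the query plan, so for large n the cross join in method 2 is likely the better fit.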
    
Hypostasis answered 20/1, 2018 at 23:18 Comment(1)
It should be array<type>, not array; for example, structField("a", "array<string>"). - Trauma
0

I am not aware of a cluster-side version of R's rep function. We can, however, use a join to emulate it on the cluster.

df_tbl <- copy_to(sc, data.frame(row1 = 1:3, row2 = LETTERS[1:3]), "df")

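replyr <- function(data, n, sc){
  # Temporary one-column frame with n rows, all sharing the key 1
  joiner_frame <- copy_to(sc, data.frame(joiner_index = rep(1,n)), "tmp_joining_frame", overwrite = TRUE)

  # Every row of data matches all n joiner rows, so each row is repeated n times
  data %>%
    mutate(joiner_index = 1) %>%
    left_join(joiner_frame, by = "joiner_index") %>%
    select(-joiner_index)

}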

df_tbl2 <- replyr(df_tbl, 3, sc)
#    row1 row2 
#    <int> <chr>
# 1     1 A    
# 2     1 A    
# 3     1 A    
# 4     2 B    
# 5     2 B    
# 6     2 B    
# 7     3 C    
# 8     3 C    
# 9     3 C  

It gets the job done, but it is a bit dirty since the tmp_joining_frame will persist. I'm not sure how well this will work given lazy evaluation on multiple calls to the function.
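
One way to avoid copying a frame from R under a fixed table name is to generate the joiner on the cluster. The sketch below assumes sparklyr's sdf_len(), which creates a Spark DataFrame with a single id column of the given length. The names replicate_by_join and replicate_spark are hypothetical. The join logic is kept in its own function so it can also be tried on local data frames.

```r
# Join every row of `data` to every row of `joiner` through a constant key.
replicate_by_join <- function(data, joiner) {
  keyed_data   <- dplyr::mutate(data, joiner_index = 1L)
  keyed_joiner <- dplyr::transmute(joiner, joiner_index = 1L)
  joined <- dplyr::inner_join(keyed_data, keyed_joiner, by = "joiner_index")
  dplyr::select(joined, -joiner_index)
}

# Spark variant (assumes an open connection `sc`): the joiner is
# generated on the cluster instead of being copied from R.
replicate_spark <- function(data, n, sc) {
  replicate_by_join(data, sparklyr::sdf_len(sc, n))
}
```

With the objects from above this would be called as replicate_spark(df_tbl, 3, sc). On local data frames, recent dplyr versions may warn about a many-to-many join, which is the intended behaviour here.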

Invar answered 20/1, 2018 at 23:27
