R parallel: rbind in parallel into separate data.frames

The code below produces different results on Windows and on Ubuntu. I understand this is because the two platforms handle parallel processing differently.

Summarizing:
I cannot insert / rbind data in parallel on Linux (mclapply, mcmapply), while I can on Windows.

Thanks @Hong Ooi for pointing out that mclapply does not run in parallel on Windows; the question below is still valid.

Of course there are no multiple inserts into the same data.frame; each insert is performed into a separate data.frame.

library(R6)
library(parallel)

# storage objects generator
cl <- R6Class(
    classname = "cl",
    public = list(
        data = data.frame(NULL),
        initialize = function() invisible(self),
        insert = function(x) self$data <- rbind(self$data, x)
    )
)

N <- 4L # number of entities
i <- setNames(seq_len(N),paste0("n",seq_len(N)))

# random data.frames
set.seed(1)
ldt <- lapply(i, function(i) data.frame(replicate(sample(3:10,1),sample(letters,1e5,rep=TRUE))))

# entity storage
lcl1 <- lapply(i, function(i) cl$new())
lcl2 <- lapply(i, function(i) cl$new())
lcl3 <- lapply(i, function(i) cl$new())

# insert data
invisible({
    mclapply(names(i), FUN = function(n) lcl1[[n]]$insert(ldt[[n]]))
    mcmapply(FUN = function(dt, cl) cl$insert(dt), ldt, lcl2, SIMPLIFY=FALSE)
    lapply(names(i), FUN = function(n) lcl3[[n]]$insert(ldt[[n]]))
})

### Windows

sapply(lcl1, function(cl) nrow(cl$data)) # mclapply
#     n1     n2     n3     n4
# 100000 100000 100000 100000
sapply(lcl2, function(cl) nrow(cl$data)) # mcmapply
#     n1     n2     n3     n4
# 100000 100000 100000 100000
sapply(lcl3, function(cl) nrow(cl$data)) # lapply
#     n1     n2     n3     n4
# 100000 100000 100000 100000

### Unix

sapply(lcl1, function(cl) nrow(cl$data)) # mclapply
#n1 n2 n3 n4
# 0  0  0  0
sapply(lcl2, function(cl) nrow(cl$data)) # mcmapply
#n1 n2 n3 n4
# 0  0  0  0
sapply(lcl3, function(cl) nrow(cl$data)) # lapply
#     n1     n2     n3     n4
# 100000 100000 100000 100000

And the question:

How can I rbind in parallel into separate data.frames on a Linux platform?

P.S. Off-memory storage such as SQLite cannot be considered a solution in my case.

Stephanotis answered 22/5, 2015 at 12:7 Comment(12)
You do realise that mclapply doesn't actually run in parallel on Windows, right?Nodular
@HongOoi now I know, thanks. I've rephrased the question accordingly.Stephanotis
One tip is to look at the data.table package. I know my tip doesn't answer your question directly, but it still might help performance. The data.table package plays nicer with large data sets (GB range) than base R's data.frames.Prebendary
@RichardErickson It was data.table originally; I translated it to data.frame just for the question ;)Stephanotis
To get the same behaviour on Linux as on Windows, use lapply. It won't run in parallel, but neither does mclapply on Windows.Nodular
@RichardErickson, Jan is one of the main developers of the data.table package and its different extensions...Bureaucratize
@DavidArenburg Surely not one of the main devs, I've just pushed a few small commits; you should try it too :)Stephanotis
@DavidArenburg I learn something new everyday. Thank you guys for developing the package!Prebendary
Ok, I meant currently you are one of the main developersBureaucratize
@HongOoi see the mclapply hack for Windows.Stephanotis
@Stephanotis What about it? It's just using the parallel library, which has been part of base R for 4 years.Nodular
@HongOoi just linked it FYI: it is possible to use mclapply on Windows, though of course my example does not use it.Stephanotis

The problem is that mclapply and mcmapply are not intended to be used with functions that have side effects. Your function is modifying objects in lists, but mclapply doesn't send the modified objects back to the master process: it only returns the values explicitly returned by the function. That means your results are lost when the workers exit as mclapply returns.
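Since only the return value survives, on Linux the most direct fix is to create and fill each object inside the worker and return it. A minimal sketch, reusing cl, i and ldt from the question:

# each worker builds its own storage object, fills it, and returns
# it; mclapply collects the returned objects in the master process
lcl1 <- mclapply(names(i), function(n) {
  obj <- cl$new()
  obj$insert(ldt[[n]])
  obj  # the returned value is sent back; side effects are not
})
names(lcl1) <- names(i)
sapply(lcl1, function(x) nrow(x$data))  # 100000 for each entity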

Normally I would change the code to not depend on side effects, and return the objects that are modified. Here's one way to do that using clusterApply so that it also works in parallel on Windows:

library(R6)
library(parallel)
cl <- R6Class(
    classname = "cl",
    public = list(
        data = data.frame(NULL),
        initialize = function() invisible(self),
        insert = function(x) self$data <- rbind(self$data, x)))

N <- 4L # number of entities
i <- setNames(seq_len(N),paste0("n",seq_len(N)))
set.seed(1)
ldt <- lapply(i, function(i)
  data.frame(replicate(sample(3:10,1),sample(letters,1e5,rep=TRUE))))
nw <- 3  # number of workers
clust <- makePSOCKcluster(nw)
# split the entity names into one chunk per worker
idx <- splitIndices(length(i), nw)
nameslist <- lapply(idx, function(iv) names(i)[iv])

# each worker creates its own storage objects, fills them, and
# returns them; the master concatenates the per-worker lists
lcl4 <- do.call('c', clusterApply(clust, nameslist,
  function(nms, cl, ldt) {
    library(R6)  # load R6 on the worker
    lcl4 <- lapply(nms, function(n) cl$new())
    names(lcl4) <- nms
    lapply(nms, FUN = function(n) lcl4[[n]]$insert(ldt[[n]]))
    lcl4
  }, cl, ldt))

This method doesn't work if you want to create the list of objects once and then modify the objects multiple times in parallel. That is also possible to do, but you must have persistent workers. In that case, you fetch the modified objects from the workers after all of the tasks are complete. Unfortunately, mclapply doesn't use persistent workers, so in this case you must use the cluster-based functions such as clusterApply. Here's one way to do it:

# Initialize the cluster workers
clusterEvalQ(clust, library(R6))
clusterExport(clust, c('cl', 'ldt'))
clusterApply(clust, nameslist, function(nms) {
  x <- lapply(nms, function(n) cl$new())
  names(x) <- nms
  assign('lcl4', x, pos=.GlobalEnv)
  NULL
})

# Insert data into lcl4 on each worker
clusterApply(clust, nameslist, function(nms) {
  lapply(nms, FUN = function(n) lcl4[[n]]$insert(ldt[[n]]))
  NULL
})

# Concatenate lcl4 from each worker
lcl4 <- do.call('c', clusterEvalQ(clust, lcl4))

This is very similar to the previous method, except that it splits the process into three phases: worker initialization, task execution, and result retrieval. I also initialized the workers in a more conventional manner using clusterExport and clusterEvalQ.
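In either variant, once the results have been retrieved the workers can be shut down:

stopCluster(clust)  # terminate the PSOCK workers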

Amby answered 26/5, 2015 at 18:32 Comment(4)
I'm impressed by the parallel R understanding! I wonder if RRO can simplify the API?Stephanotis
Meaning "Revolution R Open"? I've never used it, so I don't know.Amby
@Stephanotis have you tried using data.table::rbindlist?Brause
@RomanLuštrik yes but the goal was to distribute this work over a cluster of machines. I am aware rbindlist is extremely fast.Stephanotis

I think the Windows version of mclapply appears to work because it delegates its job to lapply. Checking timings or CPU core usage can verify this. According to the R source, on Windows mclapply and mcmapply are substituted with sequential versions.
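For example, a quick timing check (a sketch; exact numbers will vary with the machine):

library(parallel)
system.time(lapply(1:4, function(j) Sys.sleep(1)))
# elapsed ~4s everywhere
system.time(mclapply(1:4, function(j) Sys.sleep(1), mc.cores = 4L))
# elapsed ~1s on a 4-core Linux box; on Windows mclapply is
# sequential (and errors if mc.cores > 1), so no speed-up is possible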

It seems something is wrong with how the code is parallelized; I can't see what exactly at the moment.
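The loss of the inserts can be demonstrated directly: on Linux each forked worker operates on a copy-on-write copy of the session, so assignments made inside mclapply never reach the master. A minimal Linux-only sketch:

library(parallel)
x <- 0
# each forked child assigns into its own copy of the global environment
invisible(mclapply(1:2, function(j) x <<- j, mc.cores = 2L))
x  # still 0 in the master: each child modified only its own copy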

Acapulco answered 22/5, 2015 at 12:41 Comment(1)
mclapply on Linux uses the fork system call to make a clone of the current R process. This call doesn't exist on Windows, hence mclapply reverts back to sequential behaviour.Nodular
