Replacement for parallel plyr with doMC

Consider a standard grouped operation on a data.frame:

library(plyr)
library(doMC)
library(MASS) # for example

nc <- 12
registerDoMC(nc)

d <- data.frame(x = c("data", "more data"), g = c("group1", "group2"))
y <- "some global object"

res <- ddply(d, .(g), function(d_group) {
   # slow, complicated operations on d_group
}, .parallel = FALSE)

It's trivial to take advantage of a multi-core setup by simply writing .parallel = TRUE instead. This is one of my favorite features of plyr.
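
That is, the only change needed is the flag itself:

res <- ddply(d, .(g), function(d_group) {
   # slow, complicated operations on d_group
}, .parallel = TRUE)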

But with plyr being deprecated (I think) and essentially replaced by dplyr, purrr, etc., the solution to parallel processing has become significantly more verbose:

library(dplyr)
library(multidplyr)
library(parallel)
library(MASS) # for example

nc <- 12

d <- tibble(x = c("data", "more data"), g = c("group1", "group2"))
y <- "some global object"

cl <- create_cluster(nc)
set_default_cluster(cl)
cluster_library(cl, packages = c("MASS"))
cluster_copy(cl, obj = y)

d_parts <- d %>% partition(g, cluster = cl)
# slow, complicated operations on the partitions go here, before collect()
res <- d_parts %>% collect() %>% ungroup()

rm(d_parts)
rm(cl)

You can imagine how long this example could get, considering that each package and object needed inside the loop requires its own cluster_* call to copy it onto the nodes. The non-parallelized plyr-to-dplyr translation is just a simple dplyr::group_by construction (sketched after the questions below), and it's unfortunate that there's no equally terse way to enable parallel processing on it. So, my questions are:

  • Is this actually the preferred way to translate my code from plyr to dplyr?
  • What sort of magic is happening behind the scenes in plyr that makes it so easy to turn on parallel processing? Is there a reason this capability would be particularly difficult to add to dplyr and that's why it doesn't exist yet?
  • Are my two examples fundamentally different in terms of how the code is executed?
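
For reference, a minimal sketch of that non-parallel translation, using dplyr's do() idiom of the time (d_group is just an illustrative name carried over from the plyr example):

res <- d %>%
  group_by(g) %>%
  do({
    d_group <- .
    # slow, complicated operations on d_group
    d_group
  }) %>%
  ungroup()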
Coati asked 1/12, 2017 at 16:42 Comment(3)
Re your third question: I'd say yes. Your plyr example uses doMC, which is a multicore backend for foreach, i.e. forking. Your multidplyr example uses create_cluster, which defaults to parallel::makePSOCKcluster, i.e. a parallel socket (PSOCK) cluster. – Thew
Re your second question: the same kind of magic that happens if you just call partition() without setting up a cluster in advance: plyr relies on a previously registered foreach backend (see print(plyr:::setup_parallel)), while multidplyr::partition() without a cluster relies on create_cluster() implicitly, but would probably detect another backend if one is already registered (I haven't checked, though; see print(multidplyr:::cluster_exists)). The first examples of the multidplyr vignette illustrate this capability of simply calling partition() without prior setup. – Thew
Re your first question: as far as I can tell from the docs and from my own experiments, multidplyr doesn't allow forking the way plyr does, only PSOCK. – Thew
  1. I don't think there is one true 'preferred' way to translate {plyr} code to {dplyr}.

  2. In the comments @Aurèle did a better job than I ever could of describing the connection between {plyr} and {doMC}. One thing that happened is that the incentives changed a bit. {doMC} is from Revolution Analytics (since purchased by Microsoft), while Hadley, who developed dplyr, currently works at RStudio. The two companies compete in the IDE space, so it is perhaps natural that their packages aren't designed to play well together. The only form of parallelism I've seen strong support for coming out of RStudio is {sparklyr}, which they've made relatively 'easy' to set up. But I can't really recommend futzing with Spark just to do parallel processing on a single machine.

  3. @Aurèle again did a good job of explaining the execution differences. Your new code uses a PSOCK cluster while the old code used forks. Forks use copy-on-write access to RAM, so the parallel processes start off with access to the same data immediately after the fork. PSOCK clusters are like spawning new copies of R: they have to load libraries and receive an explicit copy of the data. See the sketch just below for the contrast.
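
A minimal sketch of that difference using base {parallel} (toy payload; mclapply's forking is unavailable on Windows):

library(parallel)

y <- "some global object"

# Fork-based (what doMC uses): workers inherit y via copy-on-write
res_fork <- mclapply(1:2, function(i) nchar(y), mc.cores = 2)

# PSOCK (what multidplyr's create_cluster uses): fresh R sessions,
# so y must be copied to the workers explicitly
cl <- makePSOCKcluster(2)
clusterExport(cl, "y")
res_sock <- parLapply(cl, 1:2, function(i) nchar(y))
stopCluster(cl)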

You can use a pattern like...

library(dplyr)
library(purrr)
library(future)

plan(multicore)                      # fork-based futures; falls back to sequential on Windows
options(mc.cores = availableCores())

d <- data.frame(x = 1:8, g = c("group1", "group2", "group3", "group4"))
y <- "some global object"

split(d, d$g) %>%
  map(~ future({ Sys.sleep(5); mean(.x$x) })) %>%   # dispatch one future per group
  map_df(~ data.frame(mean_x = value(.x)))          # block on and bind each result

... with some finesse on the map_df step (each scalar result is wrapped in a data.frame above so the rows can bind) to do the parallel processing. Note that under {purrr} the ~ is anonymous-function syntax, where .x is the value being mapped in.
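
For instance, these two calls are equivalent:

map_dbl(1:3, ~ .x + 1)           # formula shorthand
map_dbl(1:3, function(v) v + 1)  # explicit anonymous function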

If you like to live dangerously, you might be able to create a version of something similar without {future} by calling a private function in {purrr}:

library(parallel)  # for mclapply (fork-based, so not available on Windows)

# Relies on the private purrr:::map_impl, so it may break between purrr versions
mcmap <- function(.x, .f, ...) {
  .f <- as_mapper(.f, ...)
  mclapply(.x, function(.x) {
    force(.f)
    # the same .Call that purrr::map() itself makes, here over a single element
    .Call(purrr:::map_impl, environment(), ".x", ".f", "list")
  }) %>%
    map(~ .x[[1]])  # unwrap the length-one lists returned by each worker
}
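
Hypothetical toy usage; on a fork-capable system the result matches plain purrr::map():

mcmap(1:4, ~ .x^2)
# list(1, 4, 9, 16), computed across forked workers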
Annalee answered 16/12, 2017 at 13:31 Comment(1)
Thanks for the explanation. I haven't tried the code yet, but purrr+future might be a nice solution. – Coati
