parSapply and progress bar
Asked Answered
S

2

3

I am using the function parSapply to run a simulation on the parallel environment. Here is my code:

runpar <- function(i) MonteCarloKfun(i=i)

# Detect number of cores available
ncores <- detectCores(logical=TRUE)

# Set up parallel environment
cl <- makeCluster(ncores, methods=FALSE)

# Export objects to parallel environment
clusterSetRNGStream(cl,1234567) # not necessary since we do not sample
clusterExport(cl, c("kfunctions","frq","dvec","case","control","polygon", "MonteCarloKfun", "khat", 
                    "as.points", "secal"))

# For 1 parameter use parSapply
outpar <- parSapply(cl,i,runpar)

# close parallel environment
stopCluster(cl)

Does anyone know if there is a possibility to add a progress bar to the parSapply function. Ideally I would like something similar to pbapply of the pbapply library.

Sain answered 5/7, 2016 at 9:20 Comment(2)
finzi.psych.upenn.edu/library/surveillance/html/plapply.htmlAshleyashli
See the parabar package. It is vaguely based on my answer below. Disclaimer: I am the author of the package.Tafilelt
H
2

The parSapply function doesn't support a progress bar, and I don't think there is any really good way to implement one by adding extra code to the task function, although people have made valiant efforts to do that.

The doSNOW package supports progress bars, so you could either use that directly or write a wrapper function that works like the parSapply function. Here's one way to write such a wrapper function:

# This function is similar to "parSapply", but doesn't preschedule
# tasks and doesn't support "simplify" and "USE.NAMES" options
pbSapply <- function(cl, X, FUN, ...) {
  registerDoSNOW(cl)
  pb <- txtProgressBar(max=length(X))
  on.exit(close(pb))
  progress <- function(n) setTxtProgressBar(pb, n)
  opts <- list(progress=progress)
  foreach(i=X, .combine='c', .options.snow=opts) %dopar% {
    FUN(i, ...)
  }
}

You can easily modify this function to use either the tkProgressBar or winProgressBar function.

Here's an example use of pbSapply:

library(doSNOW)
cl <- makeSOCKcluster(3)
x <- pbSapply(cl, 1:100, function(i, j) {Sys.sleep(1); i + j}, 100)

Note that this doesn't use prescheduling, so the performance won't be as good as parSapply if you have small tasks.

Hardspun answered 5/7, 2016 at 17:49 Comment(0)
T
2

Update - Feb 20, 2023

You can achieve this using the parabar package. Disclaimer: I am the author of the package.

You can use the package in an interactive R session as follows.

# Load the package.
library(parabar)

# Define a task to run in parallel.
task <- function(x) {
    # Sleep a bit.
    Sys.sleep(0.01)

    # Return the result of a computation.
    return(x + 1)
}

# Start a backend that supports progress tracking (i.e., `async`).
backend <- start_backend(cores = 4, cluster_type = "psock", backend_type = "async")

# Configure the bar if necessary, or change the bar engine.
configure_bar(
    format = " > completed :current out of :total tasks [:percent] [:elapsed]"
)

# Run the task.
results <- par_sapply(backend, 1:1000, task)

# Update the progress bar options.
configure_bar(
    format = "[:bar] :percent"
)

# Run the task again.
results <- par_sapply(backend, 1:1000, task)

# Stop the backend.
stop_backend(backend)

If you need more flexibility (e.g., when building an R package). There is also a lower-level API based on R6 classes.

# Create a specification object.
specification <- Specification$new()

# Set the number of cores.
specification$set_cores(cores = 4)

# Set the cluster type.
specification$set_type(type = "psock")

# Create a progress tracking context.
context <- ProgressDecorator$new()

# Get a backend that supports progress-tracking.
backend <- AsyncBackend$new()

# Register the backend with the context.
context$set_backend(backend)

# Start the backend.
context$start(specification)

# Get a modern bar instance.
bar <- ModernBar$new()

# Register the bar with the context.
context$set_bar(bar)

# Configure the bar.
context$configure_bar(
    show_after = 0,
    format = " > completed :current out of :total tasks [:percent] [:elapsed]"
)

# Run a task in parallel (i.e., approx. 3.125 seconds).
context$sapply(x = 1:1000, fun = task)

# Get the task output.
output <- backend$get_output()

# Close the backend.
context$stop()

Here is a proposed workflow, which perhaps fits Steve Weston's characterization as a valiant effort. Yet, with some overhead, it accomplishes what I am primarily interested in, i.e., (1) a cross-platform solution, (2) not tempering with low-level parallel implementation details, and (3) being parsimonious concerning the dependencies used.

In a nutshell, the code below does the following:

  • The function prepare_file_for_logging creates a temporary file (i.e., OS-specific location) that will be used later on to report and track the progress of the parallel task execution.
  • The function par_sapply_with_progress starts an R session in the background (i.e., without blocking the main session).
    • In this background session, it sets up a cluster (i.e., it can be either PSOCK or FORK) and runs the task in parallel via the function parallel::parSapply.
    • During each task run, the workers (i.e., cluster nodes) report the progress to the temporary file (i.e., strictly in the form of a new line to avoid race conditions).
  • Back in the main process, the function track_progress monitors the temporary file and displays and updates a progress bar based on its contents.
    • The main process remains blocked until the progress bar is completed and the background process has been terminated.

The libraries used are parallel and callr, and some other functions from base and utils. For the sake of clarity, the code below is explicitly commented.

Usage

# Load libraries.
library(parallel)
library(callr)

# How many times to run?
runs <- 40

# Prepare file for logging the progress.
file_name <- prepare_file_for_logging()

# Run the task in parallel without blocking the main process.
process <- par_sapply_with_progress(
    # Cluster specifications.
    cores = 4,
    type = "PSOCK",

    # Where to write the progress.
    file_name = file_name,

    # Task specifications (i.e., just like in `parallel::parSapply`).
    x = 1:runs,
    fun = function(x, y) {
        # Wait a little.
        Sys.sleep(0.5)

        # Do something useful.
        return(x * y)
    },
    args = list(
        y = 10
    )
)

# Monitor the progress (i.e., blocking the main process until completion).
track_progress(
    process = process,
    iterations = runs,
    file_name = file_name,
    cleanup = TRUE
)

# Show the results.
print(process$get_result())

# |=====================================================================| 100%
#  [1]  10  20  30  40  50  60  70  80  90 100 110 120 130 140 150 160 170 180
# [19] 190 200 210 220 230 240 250 260 270 280 290 300 310 320 330 340 350 360
# [37] 370 380 390 400

Implementation

The function for preparing a temporary file

# Create and get temporary file name.
prepare_file_for_logging <- function(file_name) {
    # If the file name is missing.
    if(missing(file_name)) {
        # Get a temporary file name (i.e., OS specific).
        file_name <- tempfile()
    }

    # Create the actual file to avoid race conditions.
    file_created <- file.create(file_name)

    # Indicate if something went wrong creating the file.
    stopifnot("Failed to create file." = file_created)

    return(file_name)
}

The function for running the task in parallel

# Run task in parallel and log the progress.
par_sapply_with_progress <- function(cores, type, file_name, x, fun, args) {
    # Decorate the task function to enable progress tracking.
    get_decorated_task <- function(task) {
        # Evaluate symbol.
        force(task)

        # Create wrapper.
        return(function(x, file_name, ...) {
            # Update the progress on exit.
            on.exit({
                # Write processed element to file.
                cat("\n", file = file_name, append = TRUE)
            })

            return(task(x, ...))
        })
    }

    # Get the decorated task.
    fun_decorated <- get_decorated_task(fun)

    # Start a background process.
    background_process <- callr::r_bg(function(cores, type, file_name, x, fun_decorated, args) {
        # Make cluster.
        cluster <- parallel::makeCluster(cores, type = type)

        # Close the cluster on exit.
        on.exit({
            # Stop the cluster.
            parallel::stopCluster(cluster)
        })

        # Output.
        output <- do.call(parallel::parSapply, c(
            list(cluster, x, fun_decorated, file_name), args
        ))

        # Return the results to the background process.
        return(output)
    }, args = list(cores, type, file_name, x, fun_decorated, args))

    # Return the background process `R6` object.
    return(background_process)
}

The function for tracking the progress

# Track progress and keep the main process busy.
track_progress <- function(process, iterations, file_name, cleanup = TRUE) {
    if (cleanup) {
        on.exit({
            # Remove the file (i.e., just in case).
            unlink(file_name)
       })
    }

    # Create a progress bar.
    bar <- txtProgressBar(min = 0, max = iterations, initial = NA, style = 3)

    # Record the number of processed iterations (i.e., runs).
    n_tasks_processed <- 0

    # While the process is alive.
    while(n_tasks_processed < iterations) {
        # Get the number of tasks processed.
        n_tasks_processed <- length(scan(file_name, blank.lines.skip = FALSE, quiet = TRUE))

        # If the process that started the workers is finished.
        if(!process$is_alive()) {
            # Indicate that all tasks have been processed.
            n_tasks_processed <- iterations
        }

        # Update the progress bar.
        setTxtProgressBar(bar, n_tasks_processed)
    }

    # Close the progress bar.
    close(bar)

    # Wait for the process to close.
    process$wait()
}

Things to consider

Concerning logging and reading the progress from the temporary file, there are two things I can think of for reducing the overhead:

  • First, one can consider reducing the granularity of the reporting (i.e., perhaps it is not necessary to log the progress after each task run, but, say, every fifth or so).
  • Second, one may also consider reducing the update frequency of the progress bar. Currently, the track_progress continuously scans the temporary file and updates the progress bar. However, this is likely not necessary. Perhaps a better way is to set a timeout between subsequent file scans and progress bar updates.

Finally, I personally prefer opening a cluster once and reusing it across different parts of my code. In this scenario, I would switch from callr::r_bg (i.e., short-lived background R process) to callr::r_session (i.e., permanent R session) for more control (i.e., also see this question).

I hope this helps others that have struggled with this issue as well.

Tafilelt answered 3/10, 2022 at 20:44 Comment(0)

© 2022 - 2024 — McMap. All rights reserved.