Update - Feb 20, 2023
You can achieve this using the parabar package. Disclaimer: I am the author of the package. You can use the package in an interactive R session as follows.
# Load the package.
library(parabar)
# Define a task to run in parallel.
task <- function(x) {
    # Sleep a bit.
    Sys.sleep(0.01)
    # Return the result of a computation.
    return(x + 1)
}
# Start a backend that supports progress tracking (i.e., `async`).
backend <- start_backend(cores = 4, cluster_type = "psock", backend_type = "async")
# Configure the bar if necessary, or change the bar engine.
configure_bar(
    format = " > completed :current out of :total tasks [:percent] [:elapsed]"
)
# Run the task.
results <- par_sapply(backend, 1:1000, task)
# Update the progress bar options.
configure_bar(
    format = "[:bar] :percent"
)
# Run the task again.
results <- par_sapply(backend, 1:1000, task)
# Stop the backend.
stop_backend(backend)
If you need more flexibility (e.g., when building an R package), there is also a lower-level API based on R6 classes.
# Create a specification object.
specification <- Specification$new()
# Set the number of cores.
specification$set_cores(cores = 4)
# Set the cluster type.
specification$set_type(type = "psock")
# Create a progress tracking context.
context <- ProgressDecorator$new()
# Get a backend that supports progress-tracking.
backend <- AsyncBackend$new()
# Register the backend with the context.
context$set_backend(backend)
# Start the backend.
context$start(specification)
# Get a modern bar instance.
bar <- ModernBar$new()
# Register the bar with the context.
context$set_bar(bar)
# Configure the bar.
context$configure_bar(
    show_after = 0,
    format = " > completed :current out of :total tasks [:percent] [:elapsed]"
)
# Run a task in parallel (i.e., approx. 3.125 seconds).
context$sapply(x = 1:1000, fun = task)
# Get the task output.
output <- backend$get_output()
# Close the backend.
context$stop()
Here is a proposed workflow, which perhaps fits Steve Weston's characterization as a valiant effort. Yet, with some overhead, it accomplishes what I am primarily interested in: (1) a cross-platform solution, (2) not tampering with low-level parallel implementation details, and (3) being parsimonious concerning the dependencies used.
In a nutshell, the code below does the following:
- The function prepare_file_for_logging creates a temporary file (i.e., at an OS-specific location) that will be used later on to report and track the progress of the parallel task execution.
- The function par_sapply_with_progress starts an R session in the background (i.e., without blocking the main session).
- In this background session, it sets up a cluster (i.e., either PSOCK or FORK) and runs the task in parallel via the function parallel::parSapply.
- During each task run, the workers (i.e., the cluster nodes) report the progress to the temporary file (i.e., strictly in the form of a new line to avoid race conditions).
- Back in the main process, the function track_progress monitors the temporary file, and displays and updates a progress bar based on its contents.
- The main process remains blocked until the progress bar is completed and the background process has been terminated.
The libraries used are parallel and callr, plus some other functions from base and utils. For the sake of clarity, the code below is explicitly commented.
Usage
# Load libraries.
library(parallel)
library(callr)
# How many times to run?
runs <- 40
# Prepare file for logging the progress.
file_name <- prepare_file_for_logging()
# Run the task in parallel without blocking the main process.
process <- par_sapply_with_progress(
    # Cluster specifications.
    cores = 4,
    type = "PSOCK",
    # Where to write the progress.
    file_name = file_name,
    # Task specifications (i.e., just like in `parallel::parSapply`).
    x = 1:runs,
    fun = function(x, y) {
        # Wait a little.
        Sys.sleep(0.5)
        # Do something useful.
        return(x * y)
    },
    args = list(
        y = 10
    )
)
# Monitor the progress (i.e., blocking the main process until completion).
track_progress(
    process = process,
    iterations = runs,
    file_name = file_name,
    cleanup = TRUE
)
# Show the results.
print(process$get_result())
# |=====================================================================| 100%
# [1] 10 20 30 40 50 60 70 80 90 100 110 120 130 140 150 160 170 180
# [19] 190 200 210 220 230 240 250 260 270 280 290 300 310 320 330 340 350 360
# [37] 370 380 390 400
Implementation
The function for preparing a temporary file
# Create and get a temporary file name.
prepare_file_for_logging <- function(file_name) {
    # If the file name is missing.
    if (missing(file_name)) {
        # Get a temporary file name (i.e., OS-specific).
        file_name <- tempfile()
    }
    # Create the actual file to avoid race conditions.
    file_created <- file.create(file_name)
    # Indicate if something went wrong creating the file.
    stopifnot("Failed to create file." = file_created)
    return(file_name)
}
The function for running the task in parallel
# Run task in parallel and log the progress.
par_sapply_with_progress <- function(cores, type, file_name, x, fun, args) {
    # Decorate the task function to enable progress tracking.
    get_decorated_task <- function(task) {
        # Force the evaluation of the task function to capture it in the closure.
        force(task)
        # Create the wrapper.
        return(function(x, file_name, ...) {
            # Update the progress on exit.
            on.exit({
                # Write one new line to the file for each processed element.
                cat("\n", file = file_name, append = TRUE)
            })
            return(task(x, ...))
        })
    }
    # Get the decorated task.
    fun_decorated <- get_decorated_task(fun)
    # Start a background process.
    background_process <- callr::r_bg(function(cores, type, file_name, x, fun_decorated, args) {
        # Make the cluster.
        cluster <- parallel::makeCluster(cores, type = type)
        # Close the cluster on exit.
        on.exit({
            # Stop the cluster.
            parallel::stopCluster(cluster)
        })
        # Run the task in parallel and collect the output.
        output <- do.call(parallel::parSapply, c(
            list(cluster, x, fun_decorated, file_name), args
        ))
        # Return the output (i.e., retrievable in the main process via `$get_result()`).
        return(output)
    }, args = list(cores, type, file_name, x, fun_decorated, args))
    # Return the background process `R6` object.
    return(background_process)
}
The function for tracking the progress
# Track progress and keep the main process busy.
track_progress <- function(process, iterations, file_name, cleanup = TRUE) {
    if (cleanup) {
        on.exit({
            # Remove the file (i.e., just in case).
            unlink(file_name)
        })
    }
    # Create a progress bar.
    bar <- txtProgressBar(min = 0, max = iterations, initial = NA, style = 3)
    # Record the number of processed iterations (i.e., runs).
    n_tasks_processed <- 0
    # While there are still tasks to be processed.
    while (n_tasks_processed < iterations) {
        # Get the number of tasks processed (i.e., one line per task).
        n_tasks_processed <- length(scan(file_name, blank.lines.skip = FALSE, quiet = TRUE))
        # If the process that started the workers has finished.
        if (!process$is_alive()) {
            # Indicate that all tasks have been processed.
            n_tasks_processed <- iterations
        }
        # Update the progress bar.
        setTxtProgressBar(bar, n_tasks_processed)
    }
    # Close the progress bar.
    close(bar)
    # Wait for the background process to exit.
    process$wait()
}
Things to consider
Concerning logging and reading the progress from the temporary file, there are two things I can think of for reducing the overhead (both are sketched after this list):
- First, one can consider reducing the granularity of the reporting (i.e., perhaps it is not necessary to log the progress after each task run, but, say, after every fifth one or so).
- Second, one may also consider reducing the update frequency of the progress bar. Currently, track_progress continuously scans the temporary file and updates the progress bar. However, this is likely not necessary. Perhaps a better approach is to set a timeout between subsequent file scans and progress bar updates.
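For illustration, here is a minimal sketch of both ideas. The function get_decorated_task_batched and its every parameter are hypothetical names, not part of the code above, and the throttled loop is simply the while loop from track_progress with a pause added.
# Hypothetical decorator variant that reports progress in batches of `every`
# tasks. The counter lives in the closure environment, which is copied to
# each worker, so every cluster node counts its own completed tasks.
get_decorated_task_batched <- function(task, every = 5) {
    # Capture the task function and the batch size in the closure.
    force(task)
    force(every)
    # Per-worker counter of completed tasks.
    count <- 0
    return(function(x, file_name, ...) {
        on.exit({
            count <<- count + 1
            # Flush one new line per completed task, `every` tasks at a time.
            # Leftover tasks (i.e., when `count %% every != 0` at the end) are
            # never flushed; the `process$is_alive()` check in `track_progress`
            # covers that final gap.
            if (count %% every == 0) {
                cat(strrep("\n", every), file = file_name, append = TRUE)
            }
        })
        return(task(x, ...))
    })
}
# Throttled version of the `while` loop in `track_progress` (i.e., scan the
# file at most once every 100 ms instead of continuously).
while (n_tasks_processed < iterations) {
    # Get the number of tasks processed.
    n_tasks_processed <- length(scan(file_name, blank.lines.skip = FALSE, quiet = TRUE))
    # If the process that started the workers has finished.
    if (!process$is_alive()) {
        n_tasks_processed <- iterations
    }
    # Update the progress bar.
    setTxtProgressBar(bar, n_tasks_processed)
    # Pause between subsequent file scans and progress bar updates.
    Sys.sleep(0.1)
}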
Finally, I personally prefer opening a cluster once and reusing it across different parts of my code. In this scenario, I would switch from callr::r_bg (i.e., a short-lived background R process) to callr::r_session (i.e., a permanent R session) for more control (i.e., also see this question).
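As a rough sketch of that direction (assuming fun_decorated and file_name as defined above, and double-checking the r_session method semantics against the callr documentation), the idea would be:
# Open a permanent background `R` session once and reuse it across jobs.
session <- callr::r_session$new()
# Dispatch a parallel job asynchronously (i.e., without blocking).
session$call(function(cores, type, file_name, x, fun, args) {
    # Make the cluster inside the session.
    cluster <- parallel::makeCluster(cores, type = type)
    # Stop the cluster on exit.
    on.exit(parallel::stopCluster(cluster))
    # Run the task in parallel.
    do.call(parallel::parSapply, c(list(cluster, x, fun, file_name), args))
}, args = list(4, "PSOCK", file_name, 1:40, fun_decorated, list(y = 10)))
# Monitor the progress file as before; note that the session stays alive
# between jobs, so the tracking loop should poll the session (or compare the
# line count against the expected number of tasks) rather than rely on
# `process$is_alive()`.
session$poll_process(-1)
# Collect the result of the finished call.
results <- session$read()$result
# Close the session once no more jobs are expected.
session$close()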
I hope this helps others who have struggled with this issue as well.