foreach loop becomes inactive for large iterations in R

I have an input csv file with 4500 rows. Each row has a unique ID, and for each row I have to read some data, do some calculations, and write the output to a csv file, so that I end up with 4500 csv files in my output directory. Each individual output csv file contains a single row of data with 8 columns. Since I have to perform the same calculation on each row of my input csv, I thought I could parallelise this task using foreach. The following is the overall structure of the logic:

 library(doSNOW)
 library(foreach)
 library(data.table)
  
 input_csv <- fread('inputFile.csv')

 # to track the progress of the loop
 iterations <- nrow(input_csv)
 pb <- txtProgressBar(max = iterations, style = 3)
 progress <- function(n) setTxtProgressBar(pb, n)
 opts <- list(progress = progress)

 myClusters <- makeCluster(6)
 registerDoSNOW(myClusters)

 results <- 
   foreach(i = 1:nrow(input_csv), 
           .packages = c("myCustomPkg","dplyr","arrow","zoo","data.table","rlist","stringr"),
           .errorhandling = 'remove',
           .options.snow = opts) %dopar% 
   {
      rowRef <- input_csv[i, ]

      # read data for the unique location in `rowRef`
      weather.path <- arrow::read_parquet(paste0(rowRef$locationID, '_weather.parquet'))

      # do some calculations

      # save the results as csv
      fwrite(temp_result, paste0('output_iter_', i, '.csv'))

      return(temp_result)
   }
  

The above code works fine but always gets stuck/inactive/does nothing after finishing 25% or 30% of the rows in input_csv. I keep looking at my output directory: after N% of the iterations, no more files are being written. I suspect the foreach loop goes into some sleep mode? What I find more confounding is that if I kill the job and re-run the above code, it again runs to, say, 16% or 30% and then goes inactive, i.e. with each fresh run it "sleeps" at a different progress level.

I can't figure out how to give a minimal reproducible example in this case, but if anyone knows of a checklist I should go through, or potential issues that could be causing this, that would be really helpful. Thanks.

EDIT I am still struggling with this issue. If there is any more information I can provide, please let me know.

EDIT2
My original inputFile contains 213164 rows. So I split the big file into 46 smaller files so that each file has 4634 rows:

 library(foreach)
 library(data.table)
 library(doParallel)

myLs <- split(mydat, (as.numeric(rownames(mydat)) - 1) %/% 4634)  # 46 chunks of 4634 rows each
 

Then I did this:

for(pr in 1:46){

    input_csv <- myLs[[pr]]

    myClusters <- parallel::makeCluster(6)
    doParallel::registerDoParallel(myClusters)


  results <- 
    foreach(i = 1:nrow(input_csv), 
            .packages = c("myCustomPkg","dplyr","arrow","zoo","data.table","rlist","stringr"),
            .errorhandling = 'remove',
            .verbose = TRUE) %dopar% 
    {
       rowRef <- input_csv[i, ]

       # read data for the unique location in `rowRef`
       weather.path <- arrow::read_parquet(paste0(rowRef$locationID, '_weather.parquet'))

       # do some calculations

       # save the results as csv
       fwrite(temp_result, paste0('output_iter_', i, '_', pr, '.csv'))
       gc()
    }

 parallel::stopCluster(myClusters)
 gc()
 }

This too works until, say, the pr = 7 or pr = 8 iteration and then does not proceed, and also does not generate any error message. I am so confused.

EDIT3 This is what my CPU usage looks like. I only used 4 cores to generate this image. Will anyone be able to explain if there's anything in this image that might address my question?

[CPU usage screenshot]

Intitule answered 28/7, 2020 at 18:23 Comment(12)
Seems like you're returning temp_result. Is it a memory issue? – Courageous
Yes, I am returning temp_result. Is there any way I can check if it is indeed being caused by a memory issue, since no error is generated? The script just stops at 25% or 30% or 10% and does not move. If I kill the job, still no error is generated. – Intitule
You should open some sort of system monitor. – Courageous
A couple of months ago there was someone with issues exporting a ton of files, and they also used fwrite(), but it looks like they deleted the question. If I remember correctly, it was faster for e.g. 50 files but slower for e.g. 500 files. I cannot remember the magnitude of the difference. All that to say, it may be worth trying to swap out fwrite() for readr::write_csv(). One other possibility is that you can try to write the files in another step, considering you save them all to results. – Morita
Okay. Thank you for your comment. I will read around the readr function and check if it helps. – Intitule
I tried using readr but it also did not work. I have also edited my question slightly now to give some more information. – Intitule
In the line before gc(), try rm(temp_result). Did you monitor the memory usage for your edited code? You don't need to do this in R, you can also use the system tools. I've also had it before that the garbage collector didn't really work for parallel processing, so it can be a bit tricky. – Dextroamphetamine
I have added the system tools image to show the memory usage. – Intitule
@Intitule Can you share your machine setup info? sessionInfo() and environment variables for the parent environment and a parallel task? My sense is you want to try running your code outside of RStudio. Why? If the code is run inside RStudio, your system calls will pass through RStudio and be part of its memory footprint. Secondly, remember that you load the packages in each parallel task. Is your package myCustomPkg parallel safe? – Bight
@Intitule Further to the above, my sense is that you may not be exporting a variable into each parallel instance. Does any of your code rely on a global or local environment variable that is not being passed into each parallel task? Hope the above helps. – Bight
@Intitule One final observation: my sense is you may need to look at how you lock() your writes within the parallel tasks. A bit tricky to tell without a reproducible example, but that feels like something that could be causing problems, especially in light of # do some calculations and # save the results as csv. – Bight
You should track the evolution of your RAM. I have had a very similar issue with workers going to sleep as the task progressed. I found out they were actually killed as memory reached 100%, until there were none left. – Mule

You could use the progressr package to follow up on memory usage interactively.
For example, with the furrr package:

library(furrr)
library(pryr)
plan(multisession, workers = 6)

library(progressr)
handlers("progress")

#input_csv <- fread('inputFile.csv')
#filesID <- as.list(1:nrow(input_csv))
filesID <- as.list(1:12)

with_progress({
  p <- progressor(along = filesID)
  result <- future_map(filesID, function(fileID) {
    #rowRef <- input_csv[fileID, ]

    # read data for the unique location in `rowRef`
    #weather.path <- arrow::read_parquet(paste0(rowRef$locationID, '_weather.parquet'))

    # do some calculations : simulate memory increase
    temp_result <- rnorm(2e7)
    # save the results as csv
    #fwrite(temp_result, paste0('output_iter_', fileID, '.csv'))

    Sys.sleep(2)
    p(sprintf("memory used=%g", pryr::mem_used()))

    return(object.size(temp_result))
  }, .options = future_options(packages = c("myCustomPkg","dplyr","arrow","zoo","data.table","rlist","stringr")))
})

[====================================================>-------]  90% memory used=6.75075e+08

The same method applies to foreach.
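
If you prefer to stay with foreach, a minimal sketch of the same progress/memory reporting could look like the following. This assumes the doFuture backend rather than doSNOW, because progressr can relay the workers' progress conditions when a future-based backend is used:

library(doFuture)
library(foreach)
library(progressr)
library(pryr)

registerDoFuture()
plan(multisession, workers = 6)

filesID <- 1:12

with_progress({
  p <- progressor(along = filesID)
  results <- foreach(fileID = filesID, .errorhandling = 'remove') %dopar% {
    # do some calculations : simulate memory increase
    temp_result <- rnorm(2e7)
    # report progress together with the worker's current memory footprint
    p(sprintf("memory used=%g", pryr::mem_used()))
    object.size(temp_result)
  }
})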

Another suggestion is not to return the results to the main process, as you already store them in a file. Instead of return(temp_result) you could output a summary, for example object.size(), knowing that the complete results can be found in the associated file.
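
In the original loop, that could look something like this (just a sketch; out_file reuses the file-name pattern from the question):

# inside the foreach body, after the calculations
out_file <- paste0('output_iter_', i, '.csv')
fwrite(temp_result, out_file)

# return only a lightweight summary; the full result is already on disk
return(list(file = out_file, size = object.size(temp_result)))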

Teratogenic answered 3/8, 2020 at 19:36 Comment(8)
quick question: what is the purpose of Sys.sleep(3) in your code? – Intitule
Just to have enough time to see the progress bar, as my code doesn't process data. – Teratogenic
@89_Simple, did this help to get more information about the reason for the unexpected freeze? Is furrr OK, or do you prefer to stay with foreach/doSNOW? – Teratogenic
I am testing this at the moment. Sorry for the delay. I will get back to you with more information soon. – Intitule
I prefer doSNOW since this is what I am familiar with. I have been trying to run your example since last night for my actual data, which has around ~200000 rows, and I am also writing out each individual temp_result rather than returning them as a list. This seems to take ages now. – Intitule
Also, while running your solution, since I am now not returning any temp_result but rather writing it out, I saw that memory was only 0-1% consumed, but with time it grew to 32%. I could not understand this behaviour. – Intitule
The memory increase is interesting information, and 32% is still encouraging: it could succeed this time, but would never have succeeded without summarizing results. Did you check if the expected number of cores is running? How far is the progress bar? Before launching a 200000-row operation I would have made some statistics on a smaller subset (perhaps 1000 rows) to make sure all workers are up & running and to get the average calculation time for one row in order to estimate the complete calculation time. – Teratogenic
You could also try to run gc() at the end of each calculation to check if garbage collection helps reduce the memory increase. – Teratogenic

From your code it is not entirely possible to see why it should stall. Maybe some parts of your foreach loop are not thread safe (data.table uses multiple threads for subsetting, for example)?

As it stands there's very little to change to help out, and @Waldi's answer is likely good at diagnosing the actual problem. The only thing that seems obvious to change here is to avoid iterating over single rows of your data.frame by utilizing the under-the-hood functionality of foreach.

The way foreach performs parallel programming is by creating an iterator over the object. For parallel programming there will be some overhead between each iteration, as the thread/core will need to request new information. As such it is beneficial to minimize this overhead time by minimizing the number of iterations. We can do this by splitting our dataset up into chunks, or by manually creating an iterator through the iterators package (an iterator-based alternative is sketched after the Foreach block below).
I don't have access to your data, so below is a reproducible example using the mtcars dataset. I've split it into a setup and a foreach block for easier readability. Note that files in my example is a simple vector, so the actual code shown in the question requires some minimal alteration, as files within the foreach loop then becomes a data.frame rather than a vector.

Setup

library(iterators)
library(foreach)
library(data.table)
library(arrow)
library(doParallel)
# Set up reproducible example:
data(mtcars)
files <- replicate(100, tempfile())
lapply(files, function(x)write_parquet(mtcars, x))

# Split the files into chunks for the iterator
nc <- parallel::detectCores()
sfiles <- split(files, seq_len(length(files)) %% nc + 1)
# Set up backend
th <- parallel::makeCluster(nc)
registerDoParallel(th)

Foreach

foreach(files = sfiles, #Note the iterator will name each chunk 'files' within the loop. 
        .packages = c('data.table', 'arrow', 'dplyr'), 
        .combine = c, # Because I return the resulting file names
        .multicombine = TRUE) %dopar% {
  # Iterate over each chunk within foreach
  # Reduces loop overhead
  outF <- character(length(files))
  for(i in seq_along(files)){
    tib <- arrow::read_parquet(files[i])
    # Do some stuff
    tib <- tib %>% select(mpg, hp)
    # Save output
    outF[i] <- tempfile(fileext = '.csv')
    fwrite(tib, outF[i])
  }
  # Return list of output files
  return(outF)
}
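
As a side note, the same chunking could also be expressed with an iterator instead of the split() call in the setup. A minimal sketch, assuming the itertools package (a companion to iterators; its isplitVector() hands each worker one chunk of file names at a time):

library(itertools)

foreach(files = isplitVector(files, chunks = nc),
        .packages = c('data.table', 'arrow', 'dplyr'),
        .combine = c,
        .multicombine = TRUE) %dopar% {
  # 'files' is again a chunk of file paths, processed exactly as above
  vapply(files, function(f) {
    tib <- arrow::read_parquet(f) %>% select(mpg, hp)
    out <- tempfile(fileext = '.csv')
    fwrite(tib, out)
    out
  }, character(1))
}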

Now I don't believe this will fix the issue, but it is something that can reduce your overhead slightly.

Crier answered 9/8, 2020 at 8:19 Comment(0)
L
1

You need to take your focus away from the per-file loop, as that is not the issue. The issue is with the processing of content within a file. When you try to create a file per row, you are not committing the write after each row, and therefore the whole process for one file, row by row, gets stacked up in memory. You need to flush the memory as you write each file and close the connection.

Try to use apply as in the example below, if possible:

For each row in an R dataframe

Try to close the connection to the file as it is written. Reference below:

https://stat.ethz.ch/R-manual/R-devel/library/base/html/connections.html
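
For instance, a minimal sketch of writing one row's result through an explicitly opened, flushed and closed connection (write_one and the file name are only illustrative):

write_one <- function(temp_result, path) {
  con <- file(path, open = "wt")
  on.exit(close(con))            # make sure the connection is always closed
  write.csv(temp_result, con, row.names = FALSE)
  flush(con)                     # commit the write before moving on
}

write_one(data.frame(a = 1, b = 2), "output_iter_1.csv")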

Laveen answered 10/8, 2020 at 13:57 Comment(0)
