foreach %dopar% + RPostgreSQL
I am using RPostgreSQL to connect to a local database. The setup works just fine on my Linux machine. R 2.11.1, Postgres 8.4.

I was playing with 'foreach' using the multicore (doMC) parallel backend to wrap some repetitive queries (a few thousand of them) and append the results into a data structure. Curiously, it works with %do% but fails when I switch to %dopar%, except when there is only one iteration (as shown below).

I wondered whether it had something to do with sharing a single connection object, so I created 10 connection objects and assigned one to each query based on i modulo 10 (shown below with just 2 connection objects). The expression being evaluated, eval(expr.01), contains the query, which depends on what 'i' is.

I can't make sense of these particular error messages. Is there any way to make this work?

Thanks.
Vishal Belsare

R snippet follows:

> id.qed2.foreach <- foreach(i = 1588:1588, .inorder=FALSE) %dopar% { 
+ if (i %% 2 == 0) {con <- con0}; 
+ if (i %% 2 == 1) {con <- con1}; 
+ fetch(dbSendQuery(con,eval(expr.01)),n=-1)$idreuters};
> id.qed2.foreach
[[1]]
  [1]   411   414  2140  2406  4490  4507  4519  4570  4571  4572  4703  4731
[109] 48765 84312 91797

> id.qed2.foreach <- foreach(i = 1588:1589, .inorder=FALSE) %dopar% { 
+ if (i %% 2 == 0) {con <- con0}; 
+ if (i %% 2 == 1) {con <- con1}; 
+ fetch(dbSendQuery(con,eval(expr.01)),n=-1)$idreuters};
Error in stop(paste("expired", class(con))) : 
  no function to return from, jumping to top level
Error in stop(paste("expired", class(con))) : 
  no function to return from, jumping to top level
Error in { : 
  task 1 failed - "error in evaluating the argument 'res' in selecting a method for function 'fetch'"
> 

EDIT: I changed a few things (still unsuccessfully), but it brought some things to light. Connection objects created in the loop and not closed via dbDisconnect lead to hanging connections, as evidenced by the Postgres /var/log. A few new error messages show up when I do this:

> system.time(
+ id.qed2.foreach <- foreach(i = 1588:1590, .inorder=FALSE,
+ .packages=c("DBI", "RPostgreSQL")) %dopar% {drv0 <- dbDriver("PostgreSQL");
+ con0 <- dbConnect(drv0, dbname='nseindia');
+ list(idreuters=fetch(dbSendQuery(con0,eval(expr.01)),n=-1)$idreuters);
+ dbDisconnect(con0)})
Error in postgresqlExecStatement(conn, statement, ...) : 
  no function to return from, jumping to top level
Error in postgresqlExecStatement(conn, statement, ...) : 
  no function to return from, jumping to top level
Error in postgresqlExecStatement(conn, statement, ...) : 
  no function to return from, jumping to top level
Error in { : 
  task 1 failed - "error in evaluating the argument 'res' in selecting a method for function 'fetch'"
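One detail worth noting in the snippet above: the last expression in the loop body is dbDisconnect(con0), so even a successful task would return the disconnect status rather than the fetched IDs. An untested sketch of a reshaped body (it assumes the same expr.01 expression and local 'nseindia' database from the question) that returns the data and still guarantees the disconnect via on.exit:

```r
# Sketch only: assumes `expr.01` and the local "nseindia" database
# from the question are already defined.
id.qed2.foreach <- foreach(i = 1588:1590, .inorder = FALSE,
                           .packages = c("DBI", "RPostgreSQL")) %dopar% {
  drv <- dbDriver("PostgreSQL")
  con <- dbConnect(drv, dbname = "nseindia")
  on.exit(dbDisconnect(con), add = TRUE)  # closes the connection even if the query errors
  res <- dbSendQuery(con, eval(expr.01))
  fetch(res, n = -1)$idreuters            # last expression becomes the task's value
}
```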
Overplus answered 11/10, 2010 at 0:19 Comment(1)
If you solved it, please post your solution as an answer below and mark it accepted. It'll be useful for future reference. – Billings
The following works and gives a ~1.5x speedup over the sequential form. As a next step, I am wondering whether it is possible to attach a connection object to each of the workers spawned by registerDoMC. If so, there would be no need to create and destroy a connection object on every task, which would avoid overwhelming the PostgreSQL server with connections.

pgparquery <- function(i) {
  drv <- dbDriver("PostgreSQL")
  con <- dbConnect(drv, dbname='nsdq')
  lst <- eval(expr.01)  # contains the SQL query, which depends on 'i'
  qry <- dbSendQuery(con, lst)
  tmp <- fetch(qry, n=-1)
  dt <- dates.qed2[i]
  dbDisconnect(con)
  list(date=dt, idreuters=tmp$idreuters)
}

id.qed.foreach <- foreach(i = 1588:3638, .inorder=FALSE, .packages=c("DBI", "RPostgreSQL")) %dopar% {pgparquery(i)}
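For completeness, this assumes a multicore backend was registered beforehand; with doMC that might look like the following (sketch; the core count is arbitrary):

```r
library(doMC)
registerDoMC(cores = 4)  # register the multicore backend before using %dopar%
```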

--
Vishal Belsare

Overplus answered 11/10, 2010 at 15:26 Comment(2)
Vishal, were you ever able to figure out how to attach a persistent connection object to each worker, as you describe above? I've also had this problem. Interestingly, I was able to get dbListTables(conn) to work on the workers, but dbGetQuery(conn, ...)-type commands don't seem to work for an attached connection object. – Congregationalist
@Congregationalist You could try the technique that I describe in my answer. – Anzus
It's more efficient to create the database connection once per worker, rather than once per task. Unfortunately, mclapply doesn't provide a mechanism for initializing the workers before executing tasks, so it's not easy to do this using the doMC backend, but if you use the doParallel backend, you can initialize the workers using clusterEvalQ. Here's an example of how to restructure the code:

library(doParallel)
cl <- makePSOCKcluster(detectCores())
registerDoParallel(cl)

clusterEvalQ(cl, {
  library(DBI)
  library(RPostgreSQL)
  drv <- dbDriver("PostgreSQL")
  con <- dbConnect(drv, dbname="nsdq")
  NULL
})

id.qed.foreach <- foreach(i=1588:3638, .inorder=FALSE,
                          .noexport="con",
                          .packages=c("DBI", "RPostgreSQL")) %dopar% {
  lst <- eval(expr.01)  #contains the SQL query which depends on 'i'
  qry <- dbSendQuery(con, lst)
  tmp <- fetch(qry, n=-1)
  dt <- dates.qed2[i]
  list(date=dt, idreuters=tmp$idreuters)
}

clusterEvalQ(cl, {
  dbDisconnect(con)
})

Since doParallel and clusterEvalQ are using the same cluster object cl, the foreach loop will have access to the database connection object con when executing the tasks.
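A couple of small additions that can help in practice (sketch; dbGetQuery is the standard DBI convenience wrapper, stopCluster comes from the parallel package): after the setup block, verify that every worker really holds a live connection, and once the final clusterEvalQ has disconnected them, release the workers:

```r
# After the setup clusterEvalQ: confirm each worker's connection is alive.
clusterEvalQ(cl, dbGetQuery(con, "SELECT 1"))

# After clusterEvalQ(cl, dbDisconnect(con)): shut down the worker processes.
stopCluster(cl)
```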

Anzus answered 8/7, 2014 at 14:14 Comment(6)
Steve, this is helpful. I wonder how you got to this rather old question after all this time! – Overplus
Is there any way to use foreach to pass the database setup code to each worker? I'd like to use this code with an arbitrary parallel backend. – Ochone
@Ochone There isn't a backend-independent way to initialize the workers, and that's always bugged me. I've experimented with backend-specific mechanisms (in doMPI, for instance), but that never turned into a general feature, partly because an implementation for doMC would probably be very ugly, since mclapply doesn't support worker initialization. – Anzus
@SteveWeston It's been a while since you posted this answer, but I'm hoping you'll still see this, as it's the closest one I've found to the issue I'm having preconfiguring connections (and hopefully prepared statements) in each worker. I tried your code, modified to use the R package "odbc" instead of RPostgreSQL. I get 'Error in { : task 1 failed - "external pointer is not valid"'. All I'm doing is a simple select count(*) from MyTable in the foreach loop. Any idea what's wrong? – Plutonian
@Plutonian If you defined "con" in the master's global environment (which my example does not), it would be auto-exported to the workers and cause errors. I added the ".noexport" option to my example to prevent that, even though it isn't an issue in my case. I suggest you use the foreach .verbose=TRUE option to see whether anything unexpected is auto-exported, since your error message suggests that something is auto-exported and used that shouldn't be. – Anzus
I have a question: in clusterEvalQ, why put NULL at the end? Could we just put invisible() at the end instead? Thanks. – Bayle
