I had quite a few issues with parallel EC2 setup too when trying to keep the master node local. Using StarCluster to setup the pool helped greatly, but the real improvement came with using StarCluster and having the master node within the EC2 private ip pool.
StarCluster sets up all of the key handling for all the nodes as well as any mounts used. Dynamic node allocation wasn't doable, but unless spot instances are going to be used long term and your bidding strategy doesn't 'keep' your instances then dynamic allocation should be an issue.
Some other lessons learned:
- Create a variable containing the private IPs to pass to createCluster and export it, so when you have need to restart with the same nodes it is easier.
- Have the master node run byobu and set it up for R session logging.
- Running RStudio server on the master can be very helpful at times, but should be a different AMI than the slave nodes. :)
- Have the control script offload data rda files to a path that is remotely monitored for new files and automatically download them.
- Use htop to monitor the slaves so you can easily see the instances and determine script requirements (memory/cpu/scalability).
- Make use of processor hyper-threading enable/disable scripts.
I had quite a bit of an issue with the slave connections and serialize/unserialize and found that one of the things was the connection limit, and that the connection limit needed to be reduced by the number of nodes; and when the control script was stopped the easiest method of cleanup was restarting the master R session, and using a script to kill the slave processes instead of waiting for timeout.
It did take a bit of work to setup, but hopefully these thoughts help...
Although it was 8 months ago and both StarCluster and R have changed here's some of how it was setup... You'll find 90% of this in the StarCluster docs.
- Setup .starcluster/config AWS and key-pair sections based on the seurity info from AWS console.
- Define the [smallcluster]
- key-name
- availability-zone
- Define a cluster template extending [smallcluster]. Using AMIs based on the StarCluster 64bit HVM AMI. Instead of creating new public AMI instances I just saved a configured instance (with all the tools I needed) and used that as the AMI.
Here's an example of one...
[cluster Rnodes2]
EXTENDS=smallcluster
MASTER_INSTANCE_TYPE = cc1.4xlarge
MASTER_IMAGE_ID= ami-7621f91f
NODE_INSTANCE_TYPE = cc2.8xlarge
NODE_IMAGE_ID= ami-7621f91f
CLUSTER_SIZE= 8
VOLUMES= rdata
PLUGINS= pkginstaller
SPOT_BID= 1.00
- Setup the shared volume, this is where the screen/byoubu logs, the main .R script checkpoint output, shared R data, and the source for the production package live. It was monitored for new files in a child path called export so if the cluster or control script died/abended a max number of records would all that would be lost and need to be re-calculated.
After creating the shared volume, the definition was simply:
[volume rdata]
VOLUME_ID = vol-1145497c
MOUNT_PATH = /rdata
The package installer which ensured the latest (and equal) R versions on all nodes.
[plugin pkginstaller]
setup_class = starcluster.plugins.pkginstaller.PackageInstaller
packages = r-base, r-base-dev, r-recommended
Lastly, access permissions for both ssh and RStudio server. Https via proxy would be safer, but since RStudio was only used for the control script setup...
[permission ssh]
# protocol can be: tcp, udp, or icmp
protocol = tcp
from_port = 22
to_port = 22
# [permission http]
protocol = tcp
from_port = 8787
to_port = 8787
Then startup a cluster using the StarCluster interface. It handles all of the access controls, system names, shares, etc... Once the cluster was running I ran an ssh session into each from my local system, and ran a script to stop hyper-threading:
#!/bin/sh
# disable hyperthreading
for cpunum in $(
cat /sys/devices/system/cpu/cpu*/topology/thread_siblings_list |
cut -s -d, -f2- | tr ',' '\n' | sort -un); do
echo 0 > /sys/devices/system/cpu/cpu$cpunum/online
done
then started an htop session on each for monitoring scalability against the exported checkpoint logs.
Then, logged into the master, started a screen session (I've since preferred byobu) and fired up R from within the StarCluster mounted volume. That way when the cluster stopped for some reason I could easily setup again just by starting R. Once in R the first thing was to create a workers.list
variable using the nodeXXX
names, which was simply something along the lines of:
cluster.nodes <- c("localhost", paste("node00", 1:7, sep='' ) )
workers.list <- rep( cluster.nodes, 8 )
Then I loaded up the control script, quit and saved the workspace. The control script handled all of the table output for exporting and checkpoints and par wrapped calls to the production package. The main function of the script also took a cpus
argument which is where the workers list was placed, which was then passed as cores
to the cluster initializer.
initialize.cluster <- function( cores )
{
if( exists( 'cl' ) ) stopCluster( cl )
print("Creating Cluster")
cl <- makePSOCKcluster( cores )
print("Cluster created.")
assign( 'cl', cl, envir=.GlobalEnv )
print( cl )
# All workers need to have the bounds generator functions...
clusterEvalQ( cl, require('scoreTarget') )
# All workers need to have the production script and package.
clusterExport( cl, varlist=list('RScoreTarget', 'scoreTarget'))
return ( cl )
}
Once the R session was restarted (after initially creating the worker.list) the control script was sourced, and the main func called. That was it. With this setup, if the cluster ever stopped, I'd just quit the rsession on the main host; stop the slave processes via htop on each of the slaves and startup again.
Here's an example of it in action::
R
R version 2.15.0 (2012-03-30)
Copyright (C) 2012 The R Foundation for Statistical Computing
ISBN 3-900051-07-0
Platform: x86_64-pc-linux-gnu (64-bit)
R is free software and comes with ABSOLUTELY NO WARRANTY.
You are welcome to redistribute it under certain conditions.
Type 'license()' or 'licence()' for distribution details.
Natural language support but running in an English locale
R is a collaborative project with many contributors.
Type 'contributors()' for more information and
'citation()' on how to cite R or R packages in publications.
Type 'demo()' for some demos, 'help()' for on-line help, or
'help.start()' for an HTML browser interface to help.
Type 'q()' to quit R.
[Previously saved workspace restored]
> source('/rdata/buildSatisfactionRangeTable.R')
Loading required package: data.table
data.table 1.7.7 For help type: help("data.table")
Loading required package: parallel
Loading required package: scoreTarget
Loading required package: Rcpp
> ls()
[1] "build.satisfaction.range.table" "initialize.cluster"
[3] "initialize.table" "parallel.choices.threshold"
[5] "rolled.lower" "rolled.upper"
[7] "RScoreTarget" "satisfaction.range.table"
[9] "satisfaction.search.targets" "search.range.bound.offsets"
[11] "search.range.bounds" "search.range.center"
[13] "Search.Satisfaction.Range" "update.bound.offset"
[15] "workers.list"
> workers.list
[1] "localhost" "localhost" "localhost" "localhost" "localhost" "localhost"
[7] "localhost" "localhost" "node001" "node002" "node003" "node004"
[13] "node005" "node006" "node007" "node001" "node002" "node003"
[19] "node004" "node005" "node006" "node007" "node001" "node002"
[25] "node003" "node004" "node005" "node006" "node007" "node001"
[31] "node002" "node003" "node004" "node005" "node006" "node007"
[37] "node001" "node002" "node003" "node004" "node005" "node006"
[43] "node007" "node001" "node002" "node003" "node004" "node005"
[49] "node006" "node007" "node001" "node002" "node003" "node004"
[55] "node005" "node006" "node007" "node001" "node002" "node003"
[61] "node004" "node005" "node006" "node007" "node001" "node002"
[67] "node003" "node004" "node005" "node006" "node007" "node001"
[73] "node002" "node003" "node004" "node005" "node006" "node007"
[79] "node001" "node002" "node003" "node004" "node005" "node006"
[85] "node007" "node001" "node002" "node003" "node004" "node005"
[91] "node006" "node007" "node001" "node002" "node003" "node004"
[97] "node005" "node006" "node007" "node001" "node002" "node003"
[103] "node004" "node005" "node006" "node007" "node001" "node002"
[109] "node003" "node004" "node005" "node006" "node007" "node001"
[115] "node002" "node003" "node004" "node005" "node006" "node007"
> build.satisfaction.range.table(500000, FALSE, workers.list )
[1] "Creating Cluster"
[1] "Cluster created."
socket cluster with 120 nodes on hosts ‘localhost’, ‘node001’, ‘node002’, ‘node003’, ‘node004’, ‘node005’, ‘node006’, ‘node007’
Parallel threshold set to: 11000
Starting at: 2 running to: 5e+05 :: Sat Apr 14 22:21:05 2012
If you have read down to here then you may be interested to know that I tested each cluster setup I could (including openMPI) and found that there wasn't a speed difference, perhaps that is because my calculations where so CPU bound, perhaps not.
Also, don't give up even though it can be a pain to get going with HPC. It can be totally worth it. I would still be waiting to complete the first 100,000 iterations of the calculations I was running had I stuck with a naive implementation in base-R on a commodity workstation (well, not really as I would never have stuck with R :D ). With the cluster, 384,000 iterations completed in under a week. Totally worth the time (and it took a lot of it) to setup.