This question uses Scala 2.8 Actors. I have a long-running job that can be parallelized. It consists of about 650,000 units of work, which I divide into 2600 separate subtasks; for each subtask I create a new actor:
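```scala
import scala.actors.Actor._
import java.util.concurrent.CountDownLatch

// total, limit and doExpensiveStuff are defined elsewhere in my code
actor {
  val range = (0L to total by limit)
  val latch = new CountDownLatch(range.length)
  range.foreach { offset =>
    // one short-lived actor per batch of `limit` units
    actor {
      try doExpensiveStuff(offset, limit)
      finally latch.countDown // count down even if the batch throws
    }
  }
  latch.await // block this coordinating actor until every batch is done
}
```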
This works fairly well, but it takes more than 2 hours overall. The problem is that in the meantime, any other actors I create for normal tasks seem to be starved by the initial 2600 actors, which are also waiting for a thread to run on but have been queued longer than any new actor that comes along.
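The starvation described above is what any shared executor with a first-in, first-out queue produces once it is saturated. The sketch below reproduces it with a plain java.util.concurrent pool rather than the Scala 2.8 actor scheduler, whose internal ordering may differ; the single worker thread, the task names, and the sleep times are hypothetical, chosen so that the start order is deterministic.

```scala
import java.util.concurrent.{ConcurrentLinkedQueue, CountDownLatch, Executors, TimeUnit}

// One worker thread and an unbounded FIFO queue: a deliberately tiny,
// hypothetical stand-in for a shared pool that is already saturated.
val sharedPool = Executors.newFixedThreadPool(1)

// Records the order in which tasks actually start running.
val startOrder = new ConcurrentLinkedQueue[String]()
val done = new CountDownLatch(6)

def task(name: String, millis: Long): Runnable = new Runnable {
  def run(): Unit = {
    startOrder.add(name)
    try Thread.sleep(millis) // simulated work
    finally done.countDown()
  }
}

// The "bulk" job is queued first...
(1 to 5).foreach(i => sharedPool.execute(task("bulk-" + i, 20)))
// ...so the "normal" task submitted afterwards waits behind all of it.
sharedPool.execute(task("normal", 1))

done.await(10, TimeUnit.SECONDS)
sharedPool.shutdown()

// Prints: bulk-1, bulk-2, bulk-3, bulk-4, bulk-5, normal
println(startOrder.toArray.mkString(", "))
```

With more worker threads the effect is the same in kind: a newly submitted task cannot start until every task queued before it has been picked up.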
How might I go about avoiding this starvation?
Initial thoughts:
- Instead of 2600 actors, use one actor that sequentially plows through the large pile of work. I'm not fond of this because I'd like this job to finish sooner by splitting it up.
- Instead of 2600 actors, use two actors, each processing half of the total work. This might work better, but what if my machine has 8 cores? I'd likely want to use more than two of them.
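A middle ground between the two ideas above is a pool dedicated to this job with one thread per core: all 2600 batches are still submitted, but only as many run at once as there are cores, and they no longer sit in the pool that other actors use. This is a minimal sketch using java.util.concurrent instead of actors; `doExpensiveStuff` is a hypothetical stand-in that only counts units, and `limit = 250` is an assumption chosen so that 650,000 / 250 gives 2600 batches.

```scala
import java.util.concurrent.{Executors, TimeUnit}
import java.util.concurrent.atomic.AtomicLong

val total = 650000L // units of work, from the question
val limit = 250L    // hypothetical batch size: 650,000 / 250 = 2600 batches

// Hypothetical stand-in for the real per-batch work; here it only counts units.
val processed = new AtomicLong(0L)
def doExpensiveStuff(offset: Long, limit: Long): Unit =
  processed.addAndGet(math.min(limit, total - offset))

// One thread per core, private to this job.
val workers = Runtime.getRuntime.availableProcessors
val pool = Executors.newFixedThreadPool(workers)

// `until` yields exactly 2600 offsets: 0, 250, ..., 649750.
(0L until total by limit).foreach { offset =>
  pool.execute(new Runnable {
    def run(): Unit = doExpensiveStuff(offset, limit)
  })
}

pool.shutdown() // accept no new tasks; already-queued batches still run
val finished = pool.awaitTermination(1, TimeUnit.MINUTES)

println(s"processed ${processed.get} units on $workers threads")
```

Because the thread count follows `availableProcessors`, the same code uses 2 threads on a dual-core machine and 8 on an 8-core one.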
UPDATE
Some folks have questioned the use of Actors at all, especially since the workers never used the message-passing capability. I had assumed that an Actor was a very lightweight abstraction around a ThreadPool, performing at or near the level of hand-written ThreadPool-based code. So I wrote a little benchmark:
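```scala
import scala.testing.Benchmark
import java.util.concurrent._
import scala.actors.Futures._

val count = 100000
val poolSize = 4
val numRuns = 100

// Computes `count` squares through actor-based futures, then waits on each result.
val ActorTest = new Benchmark {
  def run = {
    (1 to count).map(i => future {
      i * i
    }).foreach(_())
  }
}

// Computes the same squares on a ThreadPoolExecutor, using a latch to wait for completion.
val ThreadPoolTest = new Benchmark {
  def run = {
    val queue = new LinkedBlockingQueue[Runnable]
    val pool = new ThreadPoolExecutor(
      poolSize, poolSize, 1, TimeUnit.SECONDS, queue)
    val latch = new CountDownLatch(count)
    (1 to count).map(i => pool.execute(new Runnable {
      override def run = {
        i * i
        latch.countDown
      }
    }))
    latch.await
    // note: the pool is never shut down, so each run leaves poolSize idle threads alive
  }
}

// Average milliseconds per run: ActorTest, ThreadPoolTest
List(ActorTest, ThreadPoolTest).map { b =>
  b.runBenchmark(numRuns).sum.toDouble / numRuns
}
// List[Double] = List(545.45, 44.35)
```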
I used the Future abstraction in ActorTest to avoid passing a message back to another actor to signal that the work was done. I was surprised to find that my Actor code was over 10 times slower (545.45 ms vs. 44.35 ms per run, roughly 12×). Note that I also created my ThreadPoolExecutor with the same initial pool size as the default Actor pool.
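For comparison, the future-per-task shape of ActorTest can also be written directly against a java.util.concurrent ExecutorService, whose `submit(Callable)` returns a `java.util.concurrent.Future`. This is a swapped-in technique, not the scala.actors Futures API, and the sketch makes no timing claims; unlike ThreadPoolTest it creates the pool once and shuts it down afterwards.

```scala
import java.util.concurrent.{Callable, Executors}

val count = 100000
val poolSize = 4

// Created once and reused, so pool setup is not repeated per batch of work.
val pool = Executors.newFixedThreadPool(poolSize)

// Same shape as ActorTest: one task per i, keeping a handle to each result.
val futures = (1 to count).map { i =>
  pool.submit(new Callable[Long] {
    def call(): Long = i.toLong * i
  })
}

// Future.get blocks until the corresponding task has finished.
val sumOfSquares = futures.map(_.get).sum
pool.shutdown()

println(sumOfSquares)
```

Keeping the `Future` handles removes the need for a separate latch, at the cost of holding one handle per task in memory.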
Looking back, it seems I may have overused the Actor abstraction. I'm going to look into using separate ThreadPools for these distinct, expensive, long-running tasks.
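The separate-pool idea can be sketched as follows, again with java.util.concurrent rather than actors. The pool sizes, the batch count, and the 50 ms sleep are hypothetical; the point is that a task submitted to its own pool runs while the long job is still in progress instead of queuing behind it.

```scala
import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}
import java.util.concurrent.atomic.AtomicLong

// Hypothetical pools: one reserved for the long-running batch job, one for everything else.
val bulkPool = Executors.newFixedThreadPool(2)
val normalPool = Executors.newFixedThreadPool(2)

val batches = 20
val bulkRemaining = new CountDownLatch(batches)

// Each simulated batch takes ~50 ms, so the whole job needs roughly 500 ms on two threads.
(1 to batches).foreach { _ =>
  bulkPool.execute(new Runnable {
    def run(): Unit =
      try Thread.sleep(50)
      finally bulkRemaining.countDown()
  })
}

// A "normal" task goes to its own pool, so it does not wait behind the batches.
val normalDone = new CountDownLatch(1)
val batchesLeft = new AtomicLong(-1L)
normalPool.execute(new Runnable {
  def run(): Unit = {
    batchesLeft.set(bulkRemaining.getCount) // how much bulk work is still pending
    normalDone.countDown()
  }
})

normalDone.await(5, TimeUnit.SECONDS)
bulkRemaining.await(5, TimeUnit.SECONDS)
bulkPool.shutdown()
normalPool.shutdown()

println(s"batches still pending when the normal task ran: ${batchesLeft.get}")
```

Sizing the bulk pool below the core count leaves headroom for the other pool's threads; how much to reserve depends on the workload.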