Which Java synchronisation object should I use to ensure an arbitrarily large number of tasks are completed? The constraints are that:
- Each task takes a non-trivial amount of time to complete and it is appropriate to perform tasks in parallel.
- There are too many tasks to fit into memory (i.e. I cannot put a `Future` for every task into a `Collection` and then call `get` on all the futures).
- I do not know how many tasks there will be (i.e. I cannot use a `CountDownLatch`).
- The `ExecutorService` may be shared, so I cannot use `awaitTermination( long, TimeUnit )`.
For example, with Grand Central Dispatch, I might do something like this:
```swift
let workQueue = dispatch_get_global_queue( QOS_CLASS_BACKGROUND, 0 )
let latch = dispatch_group_create()
let startTime = NSDate()
var itemsProcessed = 0
let countUpdateQueue = dispatch_queue_create( "countUpdateQueue", DISPATCH_QUEUE_SERIAL )
for item in fetchItems() // generator returns too many items to store in memory
{
    dispatch_group_enter( latch )
    dispatch_async( workQueue )
    {
        self.processItem( item ) // method takes a non-trivial amount of time to run
        dispatch_async( countUpdateQueue )
        {
            itemsProcessed++
        }
        dispatch_group_leave( latch )
    }
}
dispatch_group_wait( latch, DISPATCH_TIME_FOREVER )
let endTime = NSDate()
let totalTime = endTime.timeIntervalSinceDate( startTime )
print( "Processed \(itemsProcessed) items in \(totalTime) seconds." )
```
It produces output like this (for 128 items): `Processed 128 items in 1.846794962883 seconds.`
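For comparison, a GCD dispatch group behaves like a counter that `dispatch_group_enter` increments, `dispatch_group_leave` decrements, and `dispatch_group_wait` blocks on until it returns to zero. A minimal Java sketch of that idea, assuming a plain monitor-guarded counter is acceptable: `TaskGroup`, `TaskGroupDemo` and `run` are hypothetical names, the counter increment stands in for `processItem( item )`, and the executor borrows the bounded queue plus `CallerRunsPolicy` from the Java attempt (with a smaller pool) so that submission is throttled rather than buffered without limit.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical helper mirroring dispatch_group_enter / leave / wait with a guarded counter. */
final class TaskGroup {
    private int outstanding = 0; // tasks entered but not yet left (guarded by this)

    /** Like dispatch_group_enter: call before handing a task to the executor. */
    public synchronized void enter() {
        outstanding++;
    }

    /** Like dispatch_group_leave: call once when a task finishes, e.g. in a finally block. */
    public synchronized void leave() {
        if (outstanding == 0) {
            throw new IllegalStateException("leave() without a matching enter()");
        }
        if (--outstanding == 0) {
            notifyAll(); // wake any thread blocked in await()
        }
    }

    /** Like dispatch_group_wait(..., DISPATCH_TIME_FOREVER): block until every task has left. */
    public synchronized void await() throws InterruptedException {
        while (outstanding > 0) {
            wait(); // re-check in a loop to tolerate spurious wakeups
        }
    }
}

public class TaskGroupDemo {
    /** Runs `count` stand-in tasks and returns how many had completed when await() returned. */
    static int run(int count) {
        // Bounded queue + CallerRunsPolicy throttles the submitting loop.
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                8, 8, 1L, TimeUnit.MINUTES,
                new LinkedBlockingQueue<Runnable>(8),
                new ThreadPoolExecutor.CallerRunsPolicy());
        TaskGroup group = new TaskGroup();
        AtomicInteger processed = new AtomicInteger();
        try {
            for (int i = 0; i < count; i++) {
                group.enter(); // count the task before it can possibly finish
                executor.execute(() -> {
                    try {
                        processed.incrementAndGet(); // stand-in for processItem( item )
                    } finally {
                        group.leave();
                    }
                });
            }
            group.await(); // only called after the loop, so a mid-loop zero is harmless
            return processed.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for tasks", e);
        } finally {
            executor.shutdown(); // this demo owns its pool; a shared pool would stay running
        }
    }

    public static void main(String[] args) {
        System.out.println("Processed " + run(128) + " items");
    }
}
```

Because the counter is only waited on after the loop, it can safely drop to zero and climb again while tasks are still being submitted.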
I tried something similar with a `Phaser`:
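```java
import static java.lang.System.currentTimeMillis;
import static java.lang.System.out;
import static java.util.concurrent.TimeUnit.MINUTES;

import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Phaser;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy;
import java.util.concurrent.atomic.AtomicInteger;

final Executor executor = new ThreadPoolExecutor( 64, 64, 1L, MINUTES, new LinkedBlockingQueue<Runnable>( 8 ), new CallerRunsPolicy() );
final Phaser latch = new Phaser( 0 );
final long startTime = currentTimeMillis();
final AtomicInteger itemsProcessed = new AtomicInteger( 0 );
for( final String item : fetchItems() ) // iterator returns too many items to store in memory
{
    latch.register();
    final Runnable task = new Runnable() {
        public void run() {
            processItem( item ); // method takes a non-trivial amount of time to run
            itemsProcessed.incrementAndGet();
            latch.arrive();
        }
    };
    executor.execute( task );
}
latch.awaitAdvance( 0 );
final long endTime = currentTimeMillis();
out.println( "Processed " + itemsProcessed.get() + " items in " + ( endTime - startTime ) / 1000.0 + " seconds." );
```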
The tasks do not always complete before the final print statement, so I sometimes get output like this (for 128 items): `Processed 121 items in 5.296 seconds.`
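A likely cause of the early return: with `new Phaser( 0 )` and `arrive()`, phase 0 ends as soon as every party registered so far has arrived, which can happen mid-loop whenever the workers catch up with the submitting thread. The phaser then moves on to a later phase, and `awaitAdvance( 0 )` returns immediately because the current phase is no longer 0. A sketch of the usual remedy, assuming the submitting thread may itself be a party: register it up front with `new Phaser( 1 )`, have each task call `arriveAndDeregister()`, and finish with `arriveAndAwaitAdvance()`. `PhaserDemo` and `run` are hypothetical names, and the counter increment stands in for `processItem( item )`.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Phaser;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class PhaserDemo {
    /** Runs `count` stand-in tasks and returns how many had completed when the phase advanced. */
    static int run(int count) {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                8, 8, 1L, TimeUnit.MINUTES,
                new LinkedBlockingQueue<Runnable>(8),
                new ThreadPoolExecutor.CallerRunsPolicy());
        // The submitting thread is party #1 and does not arrive until the loop ends,
        // so phase 0 cannot advance while tasks are still being registered.
        Phaser phaser = new Phaser(1);
        AtomicInteger processed = new AtomicInteger();
        try {
            for (int i = 0; i < count; i++) {
                phaser.register(); // one party per task, registered before submission
                executor.execute(() -> {
                    try {
                        processed.incrementAndGet(); // stand-in for processItem( item )
                    } finally {
                        // Deregister instead of plain arrive(), so finished tasks
                        // stop counting toward the registered-party total.
                        phaser.arriveAndDeregister();
                    }
                });
            }
            // The submitter arrives last and blocks until all remaining tasks have arrived.
            phaser.arriveAndAwaitAdvance();
            return processed.get();
        } finally {
            executor.shutdown(); // this demo owns its pool; a shared pool would stay running
        }
    }

    public static void main(String[] args) {
        System.out.println("Processed " + run(128) + " items");
    }
}
```

Since the submitter's own party is always registered, the registered count never drops to zero, which also keeps the phaser from terminating early.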
Is `Phaser` even the right object to use? The documentation indicates that it supports at most 65,535 parties, so I would need to either batch the items or introduce some sort of `Phaser` tiering.
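On the party limit: the 65,535 cap in the `Phaser` Javadoc applies to parties registered at the same time, and deregistering a party frees its slot, so it does not bound how many parties register over the phaser's lifetime. The sketch below (hypothetical class `PartyLimitDemo` and method `peakRegistered`) registers and deregisters 100,000 parties one after another without hitting the limit. Assuming tasks deregister on completion, the registered count should stay near the number of tasks in flight plus the coordinator's own party, which the bounded queue and `CallerRunsPolicy` already keep small.

```java
import java.util.concurrent.Phaser;

public class PartyLimitDemo {
    /** Registers and deregisters `total` parties in sequence; returns the peak registered count. */
    static int peakRegistered(int total) {
        Phaser phaser = new Phaser(1); // the coordinating thread's own party
        int peak = phaser.getRegisteredParties();
        for (int i = 0; i < total; i++) {
            phaser.register(); // a new task's party
            peak = Math.max(peak, phaser.getRegisteredParties());
            phaser.arriveAndDeregister(); // that "task" finishes immediately
        }
        phaser.arriveAndAwaitAdvance(); // only the coordinator remains, so phase 0 ends at once
        return peak;
    }

    public static void main(String[] args) {
        // 100,000 parties in total, but never more than 2 registered at once
        System.out.println("Peak registered parties: " + peakRegistered(100_000));
    }
}
```

Tiering would only become necessary if more than 65,535 tasks could be registered simultaneously.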