My Java application works on music files within folders, it is designed to process multiple folders in parallel and independently. To do this each folder is processed by an ExecutorService that has a maximum pool size that matches no of CPUs of the computer.
For example, if we have 8-CPU computer then eight folders can (in theory) be processed concurrently, if we have a 16-CPU computer then 16 folders can be processed concurrently. If we only have 1 CPU then we set pool-size to 3, to allow the CPU to continue doing something if one folder blocked on I/O.
However, we don't actually have just one ExecutorService we have more than one because each folder can go through a number of stages.
Process1 (uses ExecutorService1) → Process2 (ExecutorService2) → Process3 (ExecutorService3)
Process 1,2,3 etc all implements Callable and all have their own associated ExecutorService. There is a FileLoader process that we kick off and this loads folders and then create a Process1 callable for each folder and submits to Process1 executor, for each Process1 callable it will do its work and then submit to a different callable, this maybe Process2, Process3 ecetera but we never go backwards, e.g Process3 will never submit to Process1. We actually have 12 processes, but any particular folder is unlikeley to go through all 12 processes
But I realized that this is flawed because in the case of a 16-CPU computer each ES can have pool-size of 16, so we actually have 48 threads running and this will just lead too much contention.
So what I was going to do was have all processes (Process1, Process2…) use the same ExecutorService, that way we only ever worker threads matching CPUs.
However, in my current situation, we have a SongLoader process that has just one task submitted (loading of all folders) and we then call shutdown(), this won't complete until everything has been submitted to Process0, then shutdown() on Process0 won't succeed until everything sent to Process1 and so on.
//Init Services
services.add(songLoaderService);
services.add(Process1.getExecutorService());
services.add(Process2.getExecutorService());
services.add(Process3.getExecutorService());
for (ExecutorService service : services)
//Request Shutdown
service.shutdown();
//Now wait for all submitted tasks to complete
service.awaitTermination(10, TimeUnit.DAYS);
}
//...............
//Finish Off work
However, if everything was on same ES and Process1 was submitting to Process2 this would no longer work because at the time shutdown() was called not all folders that Process1 would have submitted to Process2 so it would be shut down prematurely.
So how do I detect when all work has been completed using a single ExecutorService when tasks on that ES can submit to other tasks on the same ES?
Or is there a better approach?
Note, you might just think why doesnt he just merge the logic of Process1,2 & 3 into a single Process. The difficulty is that although I initially I groups songs by folder, sometimes the songs gets split into smaller groups and they get allocated to seperate processes doiwn the line and not neessarily the same process, there are actually 12 processes in total.
Attempt based on Sholms idea
Main Thread
private static List<Future> futures = Collections.synchronizedList(new ArrayList<Future>());
private static AnalyserService analyserService = new MainAnalyserService(SongKongThreadGroup.THREAD_WORKER);
...
SongLoader loader = SongLoader.getInstanceOf(parentFolder);
ExecutorService songLoaderService = SongLoader.getExecutorService();
songLoaderService.submit(loader);
for(Future future : futures)
{
try
{
future.get();
}
catch (InterruptedException ie)
{
SongKong.logger.warning(">>>>>> Interrupted - shutting down tasks immediately");
getAnalyserService().getExecutorService().awaitTermination(30, TimeUnit.SECONDS);
}
catch(ExecutionException e)
{
SongKong.logger.log(Level.SEVERE, ">>>>>> ExecutionException:"+e.getMessage(), e);
}
}
songLoaderService.shutdown();
With Process code submitting new tasks using this function from MainAnalyserService
public void submit(Callable<Boolean> task) //throws Exception
{
FixSongsController.getFutures().add(getExecutorService().submit(task));
}
It looked like it was working but it failed with
java.util.ConcurrentModificationException
at java.base/java.util.ArrayList$Itr.checkForComodification(Unknown Source)
at java.base/java.util.ArrayList$Itr.next(Unknown Source)
at com.jthink.songkong.analyse.toplevelanalyzer.FixSongsController.start(FixSongsController.java:220)
at com.jthink.songkong.ui.swingworker.FixSongs.doInBackground(FixSongs.java:49)
at com.jthink.songkong.ui.swingworker.FixSongs.doInBackground(FixSongs.java:18)
at java.desktop/javax.swing.SwingWorker$1.call(Unknown Source)
at java.base/java.util.concurrent.FutureTask.run(Unknown Source)
at java.desktop/javax.swing.SwingWorker.run(Unknown Source)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.base/java.lang.Thread.run(Unknown Source)
and I now releize I cannot hyave one thread calling future.get() (which waits until done), whilst at the same time other threads are adding to the list.
FileVisitor
et al. to apply processing on individual files – Snocat