How do I know when ExecutorService has finished if items on the ES can resubmit to the ES

My Java application works on music files within folders. It is designed to process multiple folders in parallel and independently. To do this, each folder is processed by an ExecutorService that has a maximum pool size matching the number of CPUs on the computer.

For example, if we have an 8-CPU computer then eight folders can (in theory) be processed concurrently; if we have a 16-CPU computer then 16 folders can be processed concurrently. If we only have 1 CPU then we set the pool size to 3, so the CPU can continue doing something if one folder blocks on I/O.

However, we don't actually have just one ExecutorService we have more than one because each folder can go through a number of stages.

Process1 (uses ExecutorService1) → Process2 (ExecutorService2) → Process3 (ExecutorService3)

Process1, Process2, Process3 etc. all implement Callable and each has its own associated ExecutorService. There is a FileLoader process that we kick off; this loads folders, creates a Process1 callable for each folder and submits it to the Process1 executor. Each Process1 callable does its work and then submits to a different callable; this may be Process2, Process3, etcetera, but we never go backwards, e.g. Process3 will never submit to Process1. We actually have 12 processes, but any particular folder is unlikely to go through all 12.
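A simplified sketch of the submission chain described above. Class names and the "work" are illustrative, not the actual application code:

```java
import java.util.concurrent.*;

// Illustrative only: each stage is a Callable that, when done, hands the
// folder on to the next stage's ExecutorService (never backwards).
class Folder {
    final String path;
    Folder(String path) { this.path = path; }
}

class Process2Task implements Callable<Boolean> {
    private final Folder folder;
    Process2Task(Folder folder) { this.folder = folder; }

    @Override
    public Boolean call() {
        // ... stage-2 work on the folder ...
        return true;
    }
}

class Process1Task implements Callable<Boolean> {
    private final Folder folder;
    private final ExecutorService nextStage; // Process2's executor

    Process1Task(Folder folder, ExecutorService nextStage) {
        this.folder = folder;
        this.nextStage = nextStage;
    }

    @Override
    public Boolean call() {
        // ... stage-1 work on the folder ...
        nextStage.submit(new Process2Task(folder)); // hand off to the next stage
        return true;
    }
}
```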

But I realized that this is flawed: on a 16-CPU computer each ES can have a pool size of 16, so we could actually have 48 threads running, and this will just lead to too much contention.

So what I was going to do was have all processes (Process1, Process2…) use the same ExecutorService; that way we only ever have as many worker threads as CPUs.

However, in my current design we have a SongLoader process that has just one task submitted (the loading of all folders) and we then call shutdown(); this won't complete until everything has been submitted to Process0, then shutdown() on Process0 won't succeed until everything has been sent to Process1, and so on.

 //Init Services
 services.add(songLoaderService);
 services.add(Process1.getExecutorService());
 services.add(Process2.getExecutorService());
 services.add(Process3.getExecutorService());

 for (ExecutorService service : services)
 {
     //Request Shutdown
     service.shutdown();

     //Now wait for all submitted tasks to complete
     service.awaitTermination(10, TimeUnit.DAYS);
 }
 //...............
 //Finish Off work

However, if everything was on the same ES and Process1 was submitting to Process2, this would no longer work: at the time shutdown() was called, Process1 would not yet have submitted all folders to Process2, so the service would be shut down prematurely.

So how do I detect when all work has been completed when using a single ExecutorService, given that tasks on that ES can submit further tasks to the same ES?

Or is there a better approach?

Note, you might just think: why doesn't he merge the logic of Process1, 2 & 3 into a single Process? The difficulty is that although I initially group songs by folder, sometimes the songs get split into smaller groups that get allocated to separate processes down the line, and not necessarily the same process; there are actually 12 processes in total.

Attempt based on Shloim's idea

Main Thread

    private static List<Future> futures = Collections.synchronizedList(new ArrayList<Future>());
    private static AnalyserService analyserService = new MainAnalyserService(SongKongThreadGroup.THREAD_WORKER);
    ...
    SongLoader loader = SongLoader.getInstanceOf(parentFolder);
    ExecutorService songLoaderService =  SongLoader.getExecutorService();
    songLoaderService.submit(loader);
    for(Future future : futures)
    {
        try
        {
             future.get();
        }
        catch (InterruptedException ie)
        {
            SongKong.logger.warning(">>>>>> Interrupted - shutting down tasks immediately");
            getAnalyserService().getExecutorService().awaitTermination(30, TimeUnit.SECONDS);
        }
        catch(ExecutionException e)
        {
            SongKong.logger.log(Level.SEVERE, ">>>>>> ExecutionException:"+e.getMessage(), e);
        }
    }
    songLoaderService.shutdown();

With Process code submitting new tasks using this function from MainAnalyserService

public void submit(Callable<Boolean> task) //throws Exception
{
    FixSongsController.getFutures().add(getExecutorService().submit(task));
}

It looked like it was working but it failed with

java.util.ConcurrentModificationException
    at java.base/java.util.ArrayList$Itr.checkForComodification(Unknown Source)
    at java.base/java.util.ArrayList$Itr.next(Unknown Source)
    at com.jthink.songkong.analyse.toplevelanalyzer.FixSongsController.start(FixSongsController.java:220)
    at com.jthink.songkong.ui.swingworker.FixSongs.doInBackground(FixSongs.java:49)
    at com.jthink.songkong.ui.swingworker.FixSongs.doInBackground(FixSongs.java:18)
    at java.desktop/javax.swing.SwingWorker$1.call(Unknown Source)
    at java.base/java.util.concurrent.FutureTask.run(Unknown Source)
    at java.desktop/javax.swing.SwingWorker.run(Unknown Source)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.base/java.lang.Thread.run(Unknown Source)

and I now realize I cannot have one thread calling future.get() (which waits until done) while at the same time other threads are adding to the list.

Octant answered 16/6, 2019 at 7:53 Comment(2)
Just a word of caution: too many concurrent accesses to the file system will negate the benefits of multiple threads (depending on how much I/O is performed of course).Snocat
You might want to check FileVisitor et al. to apply processing on individual filesSnocat

I agree with Shloim that you don't need multiple ExecutorService instances here -- just one (sized to the number of CPUs you have available) is sufficient and in fact optimal. Actually, I think you might not even need an ExecutorService; a plain Executor can do the job if you use an external mechanism to signal completeness.

I would start by building a class to represent the entirety of a larger work item. If you need to consume the results from each child work item, you could use a queue, but if you just want to know if there is work left to do, you only need a counter.

For example, you could do something like this:

public class FolderWork implements Runnable {
    private final Executor executor;
    private final File folder;

    private int pendingItems;  // guarded by monitor lock on this instance

    public FolderWork(Executor executor, File folder) {
        this.executor = executor;
        this.folder = folder;
    }

    @Override
    public void run() {
        for (File file : folder.listFiles()) {
            enqueueMoreWork(file);
        }
    }

    public synchronized void enqueueMoreWork(File file) {
        pendingItems++;
        executor.execute(new FileWork(file, this));
    }

    public synchronized void markWorkItemCompleted() {
        pendingItems--;
        notifyAll();
    }

    public synchronized boolean hasPendingWork() {
        return pendingItems > 0;
    }

    public synchronized void awaitCompletion() throws InterruptedException {
        while (pendingItems > 0) {
            wait();
        }
    }
}

public class FileWork implements Runnable {
    private final File file;
    private final FolderWork parent;

    public FileWork(File file, FolderWork parent) {
        this.file = file;
        this.parent = parent;
    }

    @Override
    public void run() {
        try {
           // do some work with the file

           if (/* found more work to do */) {
               parent.enqueueMoreWork(...);
           }
        } finally {
            parent.markWorkItemCompleted();
        }
    }
}

If you're worried about synchronization overhead for the pendingItems counter, you can use an AtomicInteger for it instead. Then you need a separate mechanism for notifying a waiting thread that we are done; for example, you can use a CountDownLatch. Here's an example implementation:

public class FolderWork implements Runnable {
    private final Executor executor;
    private final File folder;

    private final AtomicInteger pendingItems = new AtomicInteger(0);
    private final CountDownLatch latch = new CountDownLatch(1);

    public FolderWork(Executor executor, File folder) {
        this.executor = executor;
        this.folder = folder;
    }

    @Override
    public void run() {
        for (File file : folder.listFiles()) {
            enqueueMoreWork(file);
        }
    }

    public void enqueueMoreWork(File file) {
        if (latch.getCount() == 0) {
            throw new IllegalStateException(
                "Cannot call enqueueMoreWork() again after awaitCompletion() returns!");
        }
        pendingItems.incrementAndGet();
        executor.execute(new FileWork(file, this));
    }

    public void markWorkItemCompleted() {
        int remainingItems = pendingItems.decrementAndGet();
        if (remainingItems == 0) {
            latch.countDown();
        }
    }

    public boolean hasPendingWork() {
        return pendingItems.get() > 0;
    }

    public void awaitCompletion() throws InterruptedException {
        latch.await();
    }
}

You would call this like so:

Executor executor = Executors.newCachedThreadPool(...);
FolderWork topLevel = new FolderWork(executor, new File(...));
executor.execute(topLevel);
topLevel.awaitCompletion();

This example only shows one level of child work items, but you can use any number of child work items as long as they all use the same pendingItems counter to keep track of how much work is left to do.
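To map this onto a multi-stage pipeline like the asker's Process1 → Process2, every stage can funnel its submissions through one shared coordinator. Here is a minimal sketch of mine (class and method names are illustrative, not from the question), using the same AtomicInteger-plus-CountDownLatch idea; the key invariant is that a task is counted before it is handed to the executor:

```java
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

// One coordinator shared by every pipeline stage.
class WorkTracker {
    private final AtomicInteger pendingItems = new AtomicInteger(0);
    private final CountDownLatch latch = new CountDownLatch(1);
    private final Executor executor;

    WorkTracker(Executor executor) { this.executor = executor; }

    // Register the task BEFORE handing it to the executor, so the counter
    // can never be observed at zero while work is still in flight.
    void submit(Runnable task) {
        pendingItems.incrementAndGet();
        executor.execute(() -> {
            try {
                task.run();
            } finally {
                // The last task to finish releases the waiting thread.
                if (pendingItems.decrementAndGet() == 0) latch.countDown();
            }
        });
    }

    void awaitCompletion() throws InterruptedException { latch.await(); }
}
```

Because a stage calls tracker.submit() for its successor before its own wrapped task returns, the counter cannot hit zero between stages, so awaitCompletion() only unblocks when the whole pipeline has drained.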

Shaylashaylah answered 17/6, 2019 at 13:22 Comment(13)
Not sure how to apply this to my problem, and I don't like all this synchronization; it seems too low-level for my liking. But your suggestion of CompletableFutures sounds more promising.Octant
The synchronization is unlikely to be a problem in practice, since the monitor should not be heavily contended. But if you're worried about it, you can easily use an atomic instead. Then you just need a one-time signal to trigger so that your outer code can know when all the work is done; I used a CountDownLatch for that, but you could use a Semaphore instead if you prefer.Shaylashaylah
@PaulTaylor: Please take a look at my updated answer. Is that not what you're looking for?Shaylashaylah
Actually, after further thinking, I think the lower-level wait()/notifyAll() solution is slightly more correct here, since the monitor lock prevents another enqueueMoreWork() call from sneaking in after pendingItems reaches zero but before we trigger the latch. If there really was too much contention on that monitor lock (again, I think this is exceedingly unlikely given the kind of workload you're describing) you could do something fancier with e.g. AbstractQueuedSynchronizer.Shaylashaylah
If you're looking for an out-of-the-box solution and you're OK with using the ForkJoinPool rather than a custom ExecutorService, you might want to check out java.util.concurrent.RecursiveAction.Shaylashaylah
I don't understand: is Process0 equivalent to FolderWork or FileWork? I don't have this concept of parents and children; I think of it as WorkPackets going through a pipeline of stages in order, but some stages can be missed out and WorkPackets can also be split into smaller WorkPacketsOctant
FolderWork is just the top-level task (the one you want to wait for). It can spawn arbitrarily many work item tasks that can be associated however you like. I'm sorry if the choice of name made things less clear.Shaylashaylah
I don't currently have that concept; there is no top-level task I'm waiting for, rather I'm just waiting for the number of outstanding tasks to finishOctant
I have used ForkJoin but I don't think it would be suitable, since it seems to be for small pieces of work, and I'm also not sure it ensures orderOctant
@PaulTaylor: I don't know what else you want from me. If you want to learn more about concurrency, I highly recommend the book Java Concurrency in Practice by Goetz et al. If you have a specific question, feel free to ask a new question. If you don't like any of the answers you've received, you are welcome to try to find someone who will tell you what you want to hear. I am convinced this approach would solve your problem, but if you're not convinced, that ultimately doesn't matter to me. I hope my answer here will be useful to someone, someday, even if it isn't useful to you today.Shaylashaylah
I do actually have the Goetz book. I suppose I was hoping there would be a fairly simple modification I could make to how I was doing things to get a solution; instead I have a number of potential solutions but it's really not clear to me whether they actually solve my problem. For example, in your solution I don't see how FolderWork is possibly going to know when child workers it doesn't actually call itself are completed. But thank you for your help, and I will study your answer/comments in more detail to see if they could work.Octant
I went to the pub, had a think and finally understood what you were getting at. I have implemented it and it looks like it's working, thanks. Marked your answer as right but added my own just to make things a bit clearer.Octant
Daniel, I'm getting some unreliable behaviour with this solution (Executor terminating even though it has tasks); if you have insight on this question it would be most appreciated - #56937585Octant

Do not shutdown() the ExecutorService. Instead, create Callable objects and keep the Future objects that they create. Now you can wait on the Future objects instead of waiting on the ExecutorService. Note that you will have to wait on every future object separately, but if you only need to know when the last one finishes, then you can just as well iterate over them in any order and call get().

Any task can submit more tasks and needs to make sure to put its future object in a queue that will be monitored by your main thread.

// put these somewhere public
ConcurrentLinkedQueue<Future<Boolean>> futures = new ConcurrentLinkedQueue<Future<Boolean>>();
ExecutorService executor = ...

void submit(Callable<Boolean> c) {
    futures.add(executor.submit(c));
}

Now your main thread can start submitting tasks and wait for all tasks and subtasks:

void mainThread() throws InterruptedException {
    // add some tasks from main thread
    for (int i = 0; i < N; ++i) {
        Callable<Boolean> callable = new Callable<Boolean>() {
            @Override
            public Boolean call() throws Exception {
                ...
            }
        };
        submit(callable);
    }

    Future<Boolean> head = null;
    while ((head = futures.poll()) != null) {
        try {
            head.get();
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
    }
    // At this point, all of your tasks are complete including subtasks.
    executor.shutdown();
    executor.awaitTermination(1, TimeUnit.MINUTES); // should return almost immediately
}
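To make the self-resubmitting part concrete (the part the asker's callables do), here is a sketch of a task that enqueues its subtask's Future into the shared queue from inside call(), before its own Future completes. StageTask and the stage counting are illustrative names of mine, not from the question:

```java
import java.util.concurrent.*;

// Sketch of a self-resubmitting task. The key property: a task enqueues its
// subtask's Future BEFORE it returns, so by the time the main thread's get()
// on this task unblocks, the subtask is already in the queue and will be
// picked up by a later iteration of the polling loop.
class StageTask implements Callable<Boolean> {
    private final int remainingStages;
    private final ConcurrentLinkedQueue<Future<Boolean>> futures;
    private final ExecutorService executor;

    StageTask(int remainingStages,
              ConcurrentLinkedQueue<Future<Boolean>> futures,
              ExecutorService executor) {
        this.remainingStages = remainingStages;
        this.futures = futures;
        this.executor = executor;
    }

    @Override
    public Boolean call() {
        // ... do this stage's work ...
        if (remainingStages > 0) {
            // enqueue the next stage before this Future completes
            futures.add(executor.submit(
                new StageTask(remainingStages - 1, futures, executor)));
        }
        return true;
    }
}
```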
Photima answered 16/6, 2019 at 8:20 Comment(22)
This could work, however two points. Firstly, in your example you show all the callables being submitted at the start from the main thread; that is not how it works, they are submitted over time and from within the callables themselves. I could adapt your method by making futures a concurrent list that could be accessed from within the callables, but that doesn't seem as elegant. Secondly, I worry about looping round all the futures; could anything happen that could prevent this loop completing?Octant
My code is just an example for how to use callables and futures. Of course you can submit another callable at any given time from any given thread and adapt this code accordingly. If you get stuck during the loop on future.get() then you would have also gotten stuck on executor.awaitTermination() so you are not taking more risks than you already had in this method (consider that awaitTermination() is basically looping on multiple thread.join() calls for you)Photima
Okay, I guess it would be better if your solution more closely matched my question, as that would give me confidence you fully understand it. But more importantly, I have never got stuck on awaitTermination(); is there any way I could get stuck on the Future loop? It just seems weird to loop round when not actually doing anything with the result of the futureOctant
Your question did not show how you actually use the executors, so my example was general, and not specific to your case. You can easily extend my example to verify that it matches your needs. Just think of future.get() as a way to call awaitTermination() on a per-task basis. The way to get stuck in all the cases we discussed up until now is to submit a task that will run forever, e.g. void run() { while (true) ; }Photima
On second thought, if all of your "Processes" run the code according to my example, they will be able to do exactly what you wanted using a single ExecutorService. I'll remodel my answer to be more useful to you.Photima
There will always be a race condition between completing work and discovering more work. One way to solve that is to have each Callable itself return a Future of more work; so then you get a Future<Future<Boolean>>. Then when you call future.get() you need to enqueue any Future it returns back into the queue of futures to wait on. (And you probably want a Queue and not an ArrayList; use ArrayDeque if you don't need it to be thread-safe, or LinkedBlockingQueue otherwise.)Shaylashaylah
Now I don't get it: so you have multiple classes that subclass ProcessBase (e.g. Process1, Process2…) and when they have done their work they submit to another Process. But what calls awaitTermination()? How does the overall main thread know when all the processes have completed (i.e. all folders below the starting folder have been fully processed)?Octant
@DanielPryden 'There will always be a race condition between completing work and discovering more work': what do you mean here? There is an end to the work (i.e. all the folders that live under the starting folder); once that work has been completed it should stopOctant
@PaulTaylor: My point is that you know how many top-level work items there are, but if each work item can enqueue more work, you have to make sure that you have a happens-before edge between adding more work (for child items) and marking the current work (for the current folder) as complete. If I understand the problem, your folder work items can add more work as they go along, and you don't know ahead of time exactly how many work items will need to be completed until you're done. So you need to synchronize the adding of new work (as you find it) with removing work (as each one is done).Shaylashaylah
Im not sure yo do understand the problem. In fact I dont know how many work items there will be at start but I just want to let it run until completionOctant
The main thread submits main tasks and waits on them. When they finish, everything is finished, because tasks wait for their subtasks (at least that is what you led me to believe)Photima
@Shloim no, the main thread doesn't really submit tasks; a single task is started and this reads folders and submits tasks to Process1, then for each task Process1 receives it does work and then submits another task (or finishes). So apart from the starting task the main thread doesn't submit tasks, it just waits for the tasks to complete. I will update my question with my attempt based on your original suggestion, which nearly works but doesn't.Octant
@Shloim I have updated my attempt. I didn't understand your second approach because I couldn't see where I could call awaitTermination() for each instance of ProcessBase. Or am I supposed to add each class to a list and call awaitTermination() on each in turn, similar to how I called shutdown on each Executor in turn? But then I run into the same problem as in my code, of calling get() on a future while I may still be adding to the list of futures.Octant
@PaulTaylor: I think I understand your problem exactly. Check my answer below. Does that not answer your question?Shaylashaylah
@DanielPryden I don't get your answer. For one, it talks about Folder and File work items, whereas I just have WorkItems (that may consist of one folder, multiple folders or parts of folders). But also there is a lot of synchronization in your code; every time I have used synchronization in the past it has led to performance problems. I would prefer a solution that modifies my current solution a little so I can use one ExecutorService, rather than a completely new solution.Octant
@PaulTaylor: Feel free to comment about my answer on my answer. But the short version is that your problem is entirely about using an asynchronous work item to enqueue more asynchronous work items. You can do this sort of thing using Future but that forces you to consume the asynchronous outputs synchronously. If you have a recent version of Java, you can use CompletableFuture instead. Or you can use the Guava library's ListenableFuture. Or you can use a simple callback. Or you can build an awaitable data structure that keeps track of how many things you're waiting for.Shaylashaylah
'The problem is entirely about using an asynchronous work item to enqueue more asynchronous work items': yes, I think you have hit the nail on the head. I am using Java 8 so I can use CompletableFuture; that sounds like it may be the right solution, but I don't know what that solution is.Octant
Now I got you. Use a ConcurrentLinkedQueue<Future> instead of List<Future> in your solution.Photima
I've updated my answer with a revised solution where the main thread collects all tasks and subtasks.Photima
Thanks, but the work is submitted by the callables themselves, not the main thread, and I think @DanielPryden is right when he says 'There will always be a race condition between completing work and discovering more work': if futures is empty it may be because all the work is done, or it may be because a task is just about to submit a future (I think)Octant
Please go over my solution again, because there is no race condition between completing work and submitting new work: a task cannot submit a subtask after it has finished. So the main thread cannot wake from waiting for task A until task A has completed submitting and enqueuing its subtasks.Photima
@Shloim maybe you are right, but the solution is no good for me because the tasks are being submitted by the main thread. Thanks anywayOctant

This is essentially @DanielPryden's solution, but I have massaged it a little so it shows more clearly how to solve my particular issue.

Created a new class MainAnalyserService that handles creation of the ExecutorService and provides the ability to count when new Callable tasks are submitted and when they have completed

public class MainAnalyserService 
{
    public static final int MIN_NUMBER_OF_WORKER_THREADS = 3;
    protected static int BOUNDED_QUEUE_SIZE = 100;

    private final AtomicInteger pendingItems = new AtomicInteger(0);
    private final CountDownLatch latch = new CountDownLatch(1);

    private static final int TIMEOUT_PER_TASK = 30;

    protected  ExecutorService      executorService;

    protected String threadGroup;

    public MainAnalyserService(String threadGroup)
    {
       this.threadGroup=threadGroup;
       initExecutorService();
    }

    protected void initExecutorService()
    {
        int workerSize = Runtime.getRuntime().availableProcessors();
        //Even if we only have a single CPU we still multithread, so we don't have a single thread waiting on I/O
        if(workerSize< MIN_NUMBER_OF_WORKER_THREADS)
        {
            workerSize = MIN_NUMBER_OF_WORKER_THREADS;
        }

        executorService = new TimeoutThreadPoolExecutor(workerSize,
                new SongKongThreadFactory(threadGroup),
                new LinkedBlockingQueue<Runnable>(BOUNDED_QUEUE_SIZE),
                TIMEOUT_PER_TASK,
                TimeUnit.MINUTES);
    }

    public void submit(Callable<Boolean> task) //throws Exception
    {
        //Increment BEFORE submitting; otherwise a fast task could call
        //workDone() before the increment and the count could hit zero
        //prematurely, releasing the latch while work is still pending
        pendingItems.incrementAndGet();
        executorService.submit(task);
    }

    public void workDone()
    {
        int remainingItems = pendingItems.decrementAndGet();
        if (remainingItems == 0)
        {
            latch.countDown();
        }
    }

    public void awaitCompletion() throws InterruptedException{
        latch.await();
    }
}

In the FixSongsController thread we have

analyserService = new MainAnalyserService(THREAD_WORKER);

//SongLoader uses CompletionService when calling LoadFolderWorkers so shutdown() won't return until all initial folder submissions have completed
ExecutorService songLoaderService = SongLoader.getExecutorService();
songLoaderService.submit(loader);
songLoaderService.shutdown();

//Wait for all async tasks to complete
analyserService.awaitCompletion();

Then any Callable (such as Process1, Process2 etc.) calls submit() to submit a new Callable on the ExecutorService, and it must call workDone() when it has completed. To ensure I always do this, I call it from a finally block in the call() method of each Process class, e.g.

public Boolean call()
{
    try
    {
        //do stuff
        //Possibly make multiple calls to
        //FixSongsController.getAnalyserService().submit(...)
        return true;
    }
    finally
    {
        FixSongsController.getAnalyserService().workDone();
    }
}
Octant answered 17/6, 2019 at 20:50 Comment(1)
FWIW another approach would be to actually wrap the Executor (or, if you're bold enough, the ExecutorService) API and use your own subclass of FutureTask that adds the finally block to every Runnable or Callable that gets submitted. Up to you whether that helps or not.Shaylashaylah
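For what it's worth, the wrapping idea in the comment above could look roughly like this. Every name here is illustrative, and the counter logic mirrors the answer's MainAnalyserService; the point is that the finally block is applied once, centrally, so individual Callables cannot forget to call workDone():

```java
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

// Sketch: a wrapper that performs the pending-work bookkeeping in a finally
// block for every submitted task, instead of trusting each Callable to do it.
class CountingExecutor {
    private final ExecutorService delegate;
    private final AtomicInteger pendingItems = new AtomicInteger(0);
    private final CountDownLatch latch = new CountDownLatch(1);

    CountingExecutor(ExecutorService delegate) { this.delegate = delegate; }

    <T> Future<T> submit(Callable<T> task) {
        // count before submitting, so the count never under-reports
        pendingItems.incrementAndGet();
        return delegate.submit(() -> {
            try {
                return task.call();
            } finally {
                // centralised equivalent of workDone()
                if (pendingItems.decrementAndGet() == 0) latch.countDown();
            }
        });
    }

    void awaitCompletion() throws InterruptedException { latch.await(); }
}
```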
