ForkJoinPool - Why program is throwing OutOfMemoryError?

I wanted to try out ForkJoinPool in Java 8, so I wrote a small program for searching all the files whose names contain a specific keyword in a given directory.

Program:

import java.io.File;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

public class DirectoryService {

    public static void main(String[] args) {
        FileSearchRecursiveTask task = new FileSearchRecursiveTask("./DIR");
        ForkJoinPool pool = (ForkJoinPool) Executors.newWorkStealingPool();
        List<String> files = pool.invoke(task);
        pool.shutdown();
        System.out.println("Total no of files with hello: " + files.size());
    }

}

class FileSearchRecursiveTask extends RecursiveTask<List<String>> {
    private String path;

    public FileSearchRecursiveTask(String path) {
        this.path = path;
    }

    @Override
    protected List<String> compute() {
        File mainDirectory = new File(path);
        List<String> filteredFileList = new ArrayList<>();
        List<FileSearchRecursiveTask> recursiveTasks = new ArrayList<>();
        if(mainDirectory.isDirectory()) {
            System.out.println(Thread.currentThread() + " - Directory is " + mainDirectory.getName());
            if(mainDirectory.canRead()) {
                File[] fileList = mainDirectory.listFiles();
                for(File file : fileList) {
                    System.out.println(Thread.currentThread() + " - Looking into: " + file.getAbsolutePath());
                    if(file.isDirectory()) {
                        FileSearchRecursiveTask task = new FileSearchRecursiveTask(file.getAbsolutePath());
                        recursiveTasks.add(task);
                        task.fork();
                    } else {
                        if (file.getName().contains("hello")) {
                            System.out.println(file.getName());
                            filteredFileList.add(file.getName());
                        }
                    }
                }
            }

            for(FileSearchRecursiveTask task : recursiveTasks) {
                filteredFileList.addAll(task.join());
            }
        }
        return filteredFileList;
    }
}

This program works fine when the directory doesn't have too many sub-directories and files, but if it is really big then it throws an OutOfMemoryError.

My understanding is that the maximum number of threads (including compensation threads) is bounded, so why is there this error? Am I missing anything in my program?

Caused by: java.lang.OutOfMemoryError: unable to create new native thread
at java.lang.Thread.start0(Native Method)
at java.lang.Thread.start(Thread.java:714)
at java.util.concurrent.ForkJoinPool.createWorker(ForkJoinPool.java:1486)
at java.util.concurrent.ForkJoinPool.tryCompensate(ForkJoinPool.java:2020)
at java.util.concurrent.ForkJoinPool.awaitJoin(ForkJoinPool.java:2057)
at java.util.concurrent.ForkJoinTask.doJoin(ForkJoinTask.java:390)
at java.util.concurrent.ForkJoinTask.join(ForkJoinTask.java:719)
at FileSearchRecursiveTask.compute(DirectoryService.java:51)
at FileSearchRecursiveTask.compute(DirectoryService.java:20)
at java.util.concurrent.RecursiveTask.exec(RecursiveTask.java:94)
at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
at java.util.concurrent.ForkJoinPool$WorkQueue.tryRemoveAndExec(ForkJoinPool.java:1107)
at java.util.concurrent.ForkJoinPool.awaitJoin(ForkJoinPool.java:2046)
at java.util.concurrent.ForkJoinTask.doJoin(ForkJoinTask.java:390)
at java.util.concurrent.ForkJoinTask.join(ForkJoinTask.java:719)
at FileSearchRecursiveTask.compute(DirectoryService.java:51)
at FileSearchRecursiveTask.compute(DirectoryService.java:20)
at java.util.concurrent.RecursiveTask.exec(RecursiveTask.java:94)
at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)   
Lover answered 2/8, 2018 at 10:1 Comment(6)
If the authors of newWorkStealingPool() wanted you to assume that the returned executor is always a ForkJoinPool, they would have declared the return type as such. Since they didn't, you should not cast the returned object to ForkJoinPool. After all, there is no sense in using a factory method and then assuming that it invariably has a particular, even undocumented, behavior. If you want to get a new ForkJoinPool without any doubt, just use new ForkJoinPool()Gorgoneion
The main reason for this is that neither RecursiveTask nor ForkJoinTask is of type Callable or Runnable, so I can't pass them to the ExecutorService methods. Is there anything that I am missing here?Lover
Yes. For using RecursiveTask or ForkJoinTask, you need a ForkJoinPool, hence you should use a ForkJoinPool: either use ForkJoinPool.commonPool() or create a new one explicitly, e.g. via new ForkJoinPool(). If a factory method's declared return type is not suitable for your task, don't use that factory method for your task.Gorgoneion
Thanks, I will keep this in mind, but IMHO the authors should have considered support for ForkJoinTask as well.Lover
For symmetry, yes. But in the end, there would be no advantage of an Executors.newForkJoinPool() (if it existed) over new ForkJoinPool(), when the contract of the former would be to do precisely what the latter does.Gorgoneion
Well, if you think of it from the object-creation perspective, it makes sense, but if we look at it from a use-case coverage perspective there is definitely a gap.Lover
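
For reference, here is a minimal sketch of the approach suggested in the comments: create the ForkJoinPool explicitly instead of casting the result of the Executors factory. It reuses the FileSearchRecursiveTask from the question; the class name and the "./DIR" path are only placeholders for illustration.

import java.util.List;
import java.util.concurrent.ForkJoinPool;

public class DirectoryServiceWithExplicitPool {

    public static void main(String[] args) {
        // Create the pool directly; the no-arg constructor sizes it to the number of available processors.
        ForkJoinPool pool = new ForkJoinPool();
        FileSearchRecursiveTask task = new FileSearchRecursiveTask("./DIR");
        // ForkJoinPool accepts a ForkJoinTask/RecursiveTask directly, so no cast is needed.
        List<String> files = pool.invoke(task);
        pool.shutdown();
        System.out.println("Total no of files with hello: " + files.size());
    }

}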

You should not fork new tasks beyond all recognition. Basically, you should fork only as long as there is a chance that another worker thread can pick up the forked job, and evaluate locally otherwise. Then, once you have forked a task, don't call join() right afterwards. While the underlying framework will start compensation threads to ensure that your jobs proceed instead of just having all threads blocked while waiting for a sub-task, this will create a large number of threads that may exceed the system's capabilities.

Here is a revised version of your code:

import java.io.File;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.RecursiveTask;

public class DirectoryService {

    public static void main(String[] args) {
        FileSearchRecursiveTask task = new FileSearchRecursiveTask(new File("./DIR"));
        List<String> files = task.invoke();
        System.out.println("Total no of files with hello " + files.size());
    }

}

class FileSearchRecursiveTask extends RecursiveTask<List<String>> {
    private static final int TARGET_SURPLUS = 3;
    private File path;
    public FileSearchRecursiveTask(File file) {
        this.path = file;
    }

    @Override
    protected List<String> compute() {
        File directory = path;
        if(directory.isDirectory() && directory.canRead()) {
            System.out.println(Thread.currentThread() + " - Directory is " + directory.getName());
            return scan(directory);
        }
        return Collections.emptyList();
    }

    private List<String> scan(File directory)
    {
        File[] fileList = directory.listFiles();
        if(fileList == null || fileList.length == 0) return Collections.emptyList();
        List<FileSearchRecursiveTask> recursiveTasks = new ArrayList<>();
        List<String> filteredFileList = new ArrayList<>();
        for(File file: fileList) {
            System.out.println(Thread.currentThread() + "Looking into:" + file.getAbsolutePath());
            if(file.isDirectory())
            {
                if(getSurplusQueuedTaskCount() < TARGET_SURPLUS)
                {
                    FileSearchRecursiveTask task = new FileSearchRecursiveTask(file);
                    recursiveTasks.add(task);
                    task.fork();
                }
                else filteredFileList.addAll(scan(file));
            }
            else if(file.getName().contains("hello")) {
                filteredFileList.add(file.getAbsolutePath());
            }
        }

        for(int ix = recursiveTasks.size() - 1; ix >= 0; ix--) {
            FileSearchRecursiveTask task = recursiveTasks.get(ix);
            if(task.tryUnfork()) task.complete(scan(task.path));
        }

        for(FileSearchRecursiveTask task: recursiveTasks) {
            filteredFileList.addAll(task.join());
        }
        return filteredFileList;
    }
}

The method doing the processing has been factored out into a method receiving the directory as a parameter, so we are able to use it locally for arbitrary directories that are not necessarily associated with a FileSearchRecursiveTask instance.

Then, the method uses getSurplusQueuedTaskCount() to determine the number of locally enqueued tasks which have not been picked up by other worker threads. Ensuring that there are some helps with work balancing. But if this number exceeds the threshold, the processing is done locally without forking more jobs.

After the local processing, it iterates over the tasks and uses tryUnfork() to identify jobs which have not been stolen by other worker threads and processes them locally. Iterating backwards, so as to start with the youngest jobs, raises the chances of finding some.

Only afterwards does it join() all sub-jobs, which by now are either completed or currently being processed by another worker thread.

Note that I changed the initiating code to use the default (common) pool. This uses "number of CPU cores minus one" worker threads, plus the initiating thread, i.e. the main thread in this example.
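
As a quick way to check this sizing on a particular machine, a small sketch like the following (the class name is just for illustration, and it assumes default JVM settings without a java.util.concurrent.ForkJoinPool.common.parallelism override) prints the core count and the common pool's parallelism:

import java.util.concurrent.ForkJoinPool;

public class CommonPoolInfo {

    public static void main(String[] args) {
        // On an N-core machine this typically prints N and N-1 (with a minimum parallelism of 1).
        System.out.println("Available processors:    " + Runtime.getRuntime().availableProcessors());
        System.out.println("Common pool parallelism: " + ForkJoinPool.commonPool().getParallelism());
    }

}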

Gorgoneion answered 2/8, 2018 at 12:44 Comment(0)

Just a minor change is required. You need to specify the parallelism for newWorkStealingPool as follows:

ForkJoinPool pool = (ForkJoinPool) Executors.newWorkStealingPool(5);

As per its documentation:

newWorkStealingPool(int parallelism) -> Creates a thread pool that maintains enough threads to support the given parallelism level, and may use multiple queues to reduce contention. The parallelism level corresponds to the maximum number of threads actively engaged in, or available to engage in, task processing. The actual number of threads may grow and shrink dynamically. A work-stealing pool makes no guarantees about the order in which submitted tasks are executed.

As per the attached Java VisualVM screenshot, this parallelism setting allows the program to work within the specified memory and never run out of memory.

And, one more thing (not sure if it will have any effect):

Change the order in which fork is called and the task is added to the list. That is, change

FileSearchRecursiveTask task = new FileSearchRecursiveTask(file.getAbsolutePath());
recursiveTasks.add(task);
task.fork();

to

FileSearchRecursiveTask task = new FileSearchRecursiveTask(file.getAbsolutePath());
task.fork();
recursiveTasks.add(task);
Scoter answered 2/8, 2018 at 10:23 Comment(4)
By default, Runtime.getRuntime().availableProcessors() is used as the parallelism parameter. By setting 5 you might have actually increased the parallelism if the code is running on a VM with a limited number of CPUs.Irtysh
@KarolDowbecki right, I think the only hint I found at this is a comment inside ForkJoinPool: "Unless there are already enough live threads, method tryCompensate() may create or re-activate a spare thread to compensate for blocked joiners until they unblock"Wommera
But using the parallelism parameter helped me to never run out of memory. Please have a look at the screenshot.Scoter
@Scoter - I think you may have a smaller number of files in the <PATH>. I tried with that parameter and got the same error.Lover
