I wanted to try out ForkJoinPool in Java 8 so i wrote a small program for searching all the files whose name contains a specific keyword in a given directory.
Program:
public class DirectoryService {
public static void main(String[] args) {
FileSearchRecursiveTask task = new FileSearchRecursiveTask("./DIR");
ForkJoinPool pool = (ForkJoinPool) Executors.newWorkStealingPool();
List<String> files = pool.invoke(task);
pool.shutdown();
System.out.println("Total no of files with hello" + files.size());
}
}
class FileSearchRecursiveTask extends RecursiveTask<List<String>> {
private String path;
public FileSearchRecursiveTask(String path) {
this.path = path;
}
@Override
protected List<String> compute() {
File mainDirectory = new File(path);
List<String> filetedFileList = new ArrayList<>();
List<FileSearchRecursiveTask> recursiveTasks = new ArrayList<>();
if(mainDirectory.isDirectory()) {
System.out.println(Thread.currentThread() + " - Directory is " + mainDirectory.getName());
if(mainDirectory.canRead()) {
File[] fileList = mainDirectory.listFiles();
for(File file : fileList) {
System.out.println(Thread.currentThread() + "Looking into:" + file.getAbsolutePath());
if(file.isDirectory()) {
FileSearchRecursiveTask task = new FileSearchRecursiveTask(file.getAbsolutePath());
recursiveTasks.add(task);
task.fork();
} else {
if (file.getName().contains("hello")) {
System.out.println(file.getName());
filetedFileList.add(file.getName());
}
}
}
}
for(FileSearchRecursiveTask task : recursiveTasks) {
filetedFileList.addAll(task.join());
}
}
return filetedFileList;
}
}
This program works fine when directory doesn't have too many sub-directories and files but if its really big then it throws OutOfMemoryError.
My understanding is that max number of threads (including compensation threads) are bounded so why their is this error? Am i missing anything in my program?
Caused by: java.lang.OutOfMemoryError: unable to create new native thread
at java.lang.Thread.start0(Native Method)
at java.lang.Thread.start(Thread.java:714)
at java.util.concurrent.ForkJoinPool.createWorker(ForkJoinPool.java:1486)
at java.util.concurrent.ForkJoinPool.tryCompensate(ForkJoinPool.java:2020)
at java.util.concurrent.ForkJoinPool.awaitJoin(ForkJoinPool.java:2057)
at java.util.concurrent.ForkJoinTask.doJoin(ForkJoinTask.java:390)
at java.util.concurrent.ForkJoinTask.join(ForkJoinTask.java:719)
at FileSearchRecursiveTask.compute(DirectoryService.java:51)
at FileSearchRecursiveTask.compute(DirectoryService.java:20)
at java.util.concurrent.RecursiveTask.exec(RecursiveTask.java:94)
at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
at java.util.concurrent.ForkJoinPool$WorkQueue.tryRemoveAndExec(ForkJoinPool.java:1107)
at java.util.concurrent.ForkJoinPool.awaitJoin(ForkJoinPool.java:2046)
at java.util.concurrent.ForkJoinTask.doJoin(ForkJoinTask.java:390)
at java.util.concurrent.ForkJoinTask.join(ForkJoinTask.java:719)
at FileSearchRecursiveTask.compute(DirectoryService.java:51)
at FileSearchRecursiveTask.compute(DirectoryService.java:20)
at java.util.concurrent.RecursiveTask.exec(RecursiveTask.java:94)
at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
newWorkStealingPool()
wanted you to assume that the returned executor is always aForkJoinPool
, they declared the return type as such. Since they didn’t, you should not cast the returned object toForkJoinPool
. After all, there is no sense in using a factory method and then assume that it invariably has a particular, even undocumented behavior. If you want to get a newForkJoinPool
without any doubt, just usenew ForkJoinPool()
… – GorgoneionRecursiveTask
orForkJoinTask
, you need aForkJoinPool
, hence you should use aForkJoinPool
, eitherForkJoinPool.commonPool()
or create a new one explicitly, e.g. vianew ForkJoinPool()
. If a factory method’s declared return type is not suitable for your task, don’t use that factory method for your task. – GorgoneionExecutors.newForkJoinPool()
(if it existed) andnew ForkJoinPool()
, when the contract of the former is to do precisely what the latter does. – Gorgoneion