Does anyone know of a parallel equivalent of Java's Files.walkFileTree, or something similar? It can be a Java or Scala library.
Let's assume that executing a callback on each file is enough.
This code will not handle loops in the file system; you'd need a registry of where you've been for that (e.g. java.util.concurrent.ConcurrentHashMap). There are all sorts of improvements you could add, like reporting exceptions instead of silently ignoring them.
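import java.io.File
import scala.util._

// On Scala 2.13+, .par needs the scala-parallel-collections module and
// `import scala.collection.parallel.CollectionConverters._`.
def walk(f: File, callback: File => Unit, pick: File => Boolean = _ => true): Unit = {
  // listFiles returns null if f is not a readable directory; Try swallows the resulting error.
  Try {
    val (dirs, fs) = f.listFiles.partition(_.isDirectory)
    fs.filter(pick).foreach(callback)
    dirs.par.foreach(d => walk(d, callback, pick))
  }
}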
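The loop caveat noted for this snippet could be handled roughly as follows. This is only a sketch in Java, not the answer's own code: the class name LoopSafeWalk and its methods are made up for illustration, and it assumes that resolving each directory with toRealPath is an acceptable way to recognise a directory that has already been entered.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
import java.util.stream.Stream;

// Hypothetical helper: a parallel walk that refuses to enter a directory twice.
final class LoopSafeWalk {
    // Registry of directories already entered, keyed by resolved real path.
    private final Set<Path> visited = ConcurrentHashMap.newKeySet();
    private final Consumer<Path> callback;

    LoopSafeWalk(Consumer<Path> callback) {
        this.callback = callback;
    }

    void walk(Path dir) {
        List<Path> subdirs = new ArrayList<>();
        try {
            // toRealPath resolves symbolic links, so a link back to an ancestor
            // maps to a path that is already in the registry.
            if (!visited.add(dir.toRealPath())) {
                return;
            }
            try (Stream<Path> entries = Files.list(dir)) {
                entries.forEach(p -> {
                    if (Files.isDirectory(p)) subdirs.add(p);
                    else callback.accept(p);
                });
            }
        } catch (IOException e) {
            // Report the failure instead of silently ignoring it.
            System.err.println("Skipping " + dir + ": " + e);
            return;
        }
        // Recurse into the subdirectories in parallel.
        subdirs.parallelStream().forEach(this::walk);
    }
}
```

The callback is invoked from several threads, so it has to be thread-safe, just as with the Scala version.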
Collecting the files using a fold instead of a foreach is not drastically harder, but I leave that as an exercise to the reader. (A ConcurrentLinkedQueue is probably fast enough to accept them all in a callback anyway, unless you have really slow threads and an awesome filesystem.)
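For what it's worth, the queue-in-a-callback idea might look like this in Java. It is a rough sketch rather than a translation of the Scala code: CollectWithQueue, walk and collect are names invented here, and parallel streams stand in for Scala's .par.

```java
import java.io.File;
import java.util.Arrays;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Consumer;

// Hypothetical helper illustrating collection through a thread-safe queue.
final class CollectWithQueue {
    // Parallel walk: run the callback on files, recurse into directories.
    static void walk(File dir, Consumer<File> callback) {
        File[] children = dir.listFiles();
        if (children == null) return; // not a directory, or not readable
        Arrays.stream(children).parallel().forEach(child -> {
            if (child.isDirectory()) walk(child, callback);
            else callback.accept(child);
        });
    }

    // Gather every file by passing the queue's add method as the callback.
    static Queue<File> collect(File root) {
        Queue<File> found = new ConcurrentLinkedQueue<>();
        walk(root, found::add);
        return found;
    }
}
```

The order of the collected files is not deterministic, since it depends on how the parallel tasks interleave.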
As others have pointed out, walking a file tree is almost certainly IO bound instead of CPU bound, so the benefits of doing a multithreaded file tree walk are questionable. But if you really wanted to, you could probably roll your own with a ForkJoinPool or similar.
import java.io.IOException;
import java.nio.file.FileVisitResult;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.SimpleFileVisitor;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;

public class MultiThreadedFileTreeWalk {

    private static class RecursiveWalk extends RecursiveAction {
        private static final long serialVersionUID = 6913234076030245489L;

        private final Path dir;

        public RecursiveWalk(Path dir) {
            this.dir = dir;
        }

        @Override
        protected void compute() {
            final List<RecursiveWalk> walks = new ArrayList<>();
            try {
                Files.walkFileTree(dir, new SimpleFileVisitor<Path>() {
                    @Override
                    public FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs) throws IOException {
                        if (!dir.equals(RecursiveWalk.this.dir)) {
                            // Hand each subdirectory to its own task and skip it in this walk.
                            RecursiveWalk w = new RecursiveWalk(dir);
                            w.fork();
                            walks.add(w);
                            return FileVisitResult.SKIP_SUBTREE;
                        } else {
                            return FileVisitResult.CONTINUE;
                        }
                    }

                    @Override
                    public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
                        System.out.println(file + "\t" + Thread.currentThread());
                        return FileVisitResult.CONTINUE;
                    }
                });
            } catch (IOException e) {
                e.printStackTrace();
            }

            // Wait for all forked subdirectory walks to finish.
            for (RecursiveWalk w : walks) {
                w.join();
            }
        }
    }

    public static void main(String[] args) throws IOException {
        RecursiveWalk w = new RecursiveWalk(Paths.get(".").toRealPath());
        ForkJoinPool p = new ForkJoinPool();
        p.invoke(w);
    }
}
This example walks each directory as a separate fork/join task, so directories can be processed on different pool threads. Here's the tutorial for Java 7's fork/join library.
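If you want the files back rather than printed, one possible variation is to return them from each task. The sketch below swaps RecursiveAction for RecursiveTask and reads each directory directly instead of using walkFileTree; FileCollectTask and collect are names invented for this illustration, and it assumes that not descending into symlinked directories is acceptable.

```java
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.LinkOption;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Hypothetical variation: each task returns the files found beneath its directory.
final class FileCollectTask extends RecursiveTask<List<Path>> {
    private static final long serialVersionUID = 1L;

    private final Path dir;

    FileCollectTask(Path dir) {
        this.dir = dir;
    }

    @Override
    protected List<Path> compute() {
        List<Path> files = new ArrayList<>();
        List<FileCollectTask> subtasks = new ArrayList<>();
        try (DirectoryStream<Path> entries = Files.newDirectoryStream(dir)) {
            for (Path entry : entries) {
                // NOFOLLOW_LINKS: symlinked directories are treated as plain entries.
                if (Files.isDirectory(entry, LinkOption.NOFOLLOW_LINKS)) {
                    FileCollectTask task = new FileCollectTask(entry);
                    task.fork();
                    subtasks.add(task);
                } else {
                    files.add(entry);
                }
            }
        } catch (IOException e) {
            System.err.println("Skipping " + dir + ": " + e);
        }
        // Merge the results of the forked subdirectory tasks.
        for (FileCollectTask task : subtasks) {
            files.addAll(task.join());
        }
        return files;
    }

    static List<Path> collect(Path root) {
        return ForkJoinPool.commonPool().invoke(new FileCollectTask(root));
    }
}
```

Because every task builds its own list and the parent merges on join, no shared mutable collection is needed.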
This exercise is neither as brief as the Scala answer nor as Java-like as the Java answer.
The idea here was to start parallel walks with something like a thread per device.
The walkers are on ForkJoinPool threads, so when they kick off a future for each path test, those are forked tasks on the pool. The directory test uses managed blocking when it reads the directory, looking for files.
The result is returned by completing a promise depending on the future path test. (There is no mechanism here for detecting that every walk finished empty-handed; that case just runs into the timeout.)
A more interesting test would include reading zip files, since the decompression will eat some CPU.
I wonder if paulp will do something clever with deep listing.
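For readers more at home in Java, the managed blocking mentioned above corresponds roughly to ForkJoinPool.ManagedBlocker. The following is a loose sketch of that idea only, separate from the Scala listing; BlockingDirectoryRead and list are hypothetical names.

```java
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ForkJoinPool;

// Hypothetical helper: reads a directory while telling the pool the thread may block.
final class BlockingDirectoryRead implements ForkJoinPool.ManagedBlocker {
    private final Path dir;
    private List<Path> entries;   // stays null until the read has run
    private IOException failure;

    BlockingDirectoryRead(Path dir) {
        this.dir = dir;
    }

    @Override
    public boolean block() {
        List<Path> result = new ArrayList<>();
        try (DirectoryStream<Path> stream = Files.newDirectoryStream(dir)) {
            for (Path p : stream) result.add(p);
        } catch (IOException e) {
            failure = e;
        }
        entries = result;
        return true; // no further blocking is necessary
    }

    @Override
    public boolean isReleasable() {
        return entries != null;
    }

    static List<Path> list(Path dir) throws IOException, InterruptedException {
        BlockingDirectoryRead read = new BlockingDirectoryRead(dir);
        // When called on a fork/join worker, the pool may add a spare thread meanwhile.
        ForkJoinPool.managedBlock(read);
        if (read.failure != null) throw read.failure;
        return read.entries;
    }
}
```

Called from a thread that is not a fork/join worker, managedBlock simply runs the blocker, so the helper also works outside a pool.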
import util._
import collection.JavaConverters._
import concurrent.{ TimeoutException => Timeout, _ }
import concurrent.duration._
import ExecutionContext.Implicits._
import java.io.IOException
import java.nio.file.{ FileVisitResult => Result, _ }
import Result.{ CONTINUE => Go, SKIP_SUBTREE => Prune, TERMINATE => Stop }
import java.nio.file.attribute.{ BasicFileAttributes => BFA }

object Test extends App {
  val fileSystem = FileSystems.getDefault
  // Start from the command-line arguments if given, otherwise from the mounted devices.
  val starts = (if (args.nonEmpty) args.toList else mounts) map (s => (fileSystem getPath s))
  // Completed by the first path that passes pathTest.
  val p = Promise[(Path, BFA)]

  def pathTest(path: Path, attrs: BFA) =
    if (attrs.isDirectory) {
      val entries = blocking {
        val res = Files newDirectoryStream path
        try res.asScala.toList finally res.close()
      }
      List("hello", "world") forall (n => entries exists (_.getFileName.toString == n))
    } else {
      path.getFileName.toString == "enough"
    }

  def visitor(root: Path) = new SimpleFileVisitor[Path] {
    def stopOrGo = if (p.isCompleted) Stop else Go
    def visiting(path: Path, attrs: BFA) = {
      // Future.apply replaces the `future` method, which was removed in Scala 2.13.
      Future { pathTest(path, attrs) } onComplete {
        case Success(true) => p trySuccess (path, attrs)
        case Failure(e)    => p tryFailure e
        case _             =>
      }
      stopOrGo
    }
    override def preVisitDirectory(dir: Path, attrs: BFA) = (
      // Do not descend into another start point; its own walker covers it.
      if ((starts contains dir) && dir != root) Prune
      else visiting(dir, attrs)
    )
    override def postVisitDirectory(dir: Path, e: IOException) = {
      if (e != null) p tryFailure e
      stopOrGo
    }
    override def visitFile(file: Path, attrs: BFA) = visiting(file, attrs)
  }

  //def walk(p: Path): Path = Files walkFileTree (p, Set().asJava, 10, visitor(p))
  def walk(p: Path): Path = Files walkFileTree (p, visitor(p))

  def show(store: FileStore) = {
    val ttl   = store.getTotalSpace / 1024
    val used  = (store.getTotalSpace - store.getUnallocatedSpace) / 1024
    val avail = store.getUsableSpace / 1024
    Console println f"$store%-40s $ttl%12d $used%12d $avail%12d"
    store
  }

  def mounts = {
    val devs = for {
      store <- fileSystem.getFileStores.asScala
      if store.name startsWith "/dev/"
      if List("ext4", "fuseblk") contains store.`type`
    } yield show(store)
    val devstr = """(\S+) \((.*)\)""".r
    (devs.toList map (_.toString match {
      case devstr(name, dev) if devs.toList exists (_.name == dev) => Some(name)
      case s => Console println s"Bad dev str '$s', skipping" ; None
    })).flatten
  }

  starts foreach (f => Future (walk(f)))

  Try (Await result (p.future, 20.seconds)) match {
    case Success((name, attrs)) => Console println s"Result: ${if (attrs.isDirectory) "dir" else "file"} $name"
    case Failure(e: Timeout)    => Console println s"No result: timed out."
    case Failure(t)             => Console println s"No result: $t."
  }
}
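The promise hand-off in this answer can be approximated in Java with CompletableFuture. The sketch below is not a port of the Scala code: FirstMatch, find and timeoutSeconds are invented names, it uses Files.walk instead of a visitor, and it runs the path test inline instead of forking a future per path. Like the original, it cannot tell "nothing matched" apart from a timeout.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Predicate;
import java.util.stream.Stream;

// Hypothetical helper: the first path that passes the test completes a shared promise.
final class FirstMatch {
    static Optional<Path> find(List<Path> starts, Predicate<Path> test, long timeoutSeconds)
            throws InterruptedException, ExecutionException {
        CompletableFuture<Path> promise = new CompletableFuture<>();
        for (Path start : starts) {
            // One asynchronous walk per start path.
            CompletableFuture.runAsync(() -> {
                try (Stream<Path> paths = Files.walk(start)) {
                    // anyMatch short-circuits, so this walk stops once any walk
                    // has completed the promise.
                    paths.anyMatch(p -> {
                        if (test.test(p)) promise.complete(p); // no-op if already completed
                        return promise.isDone();
                    });
                } catch (IOException | RuntimeException e) {
                    promise.completeExceptionally(e);
                }
            });
        }
        try {
            return Optional.of(promise.get(timeoutSeconds, TimeUnit.SECONDS));
        } catch (TimeoutException e) {
            return Optional.empty(); // timed out, or nothing matched
        }
    }
}
```

complete and completeExceptionally only take effect on the first call, which plays the same role as trySuccess and tryFailure on the Scala promise.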