Parallel version of Files.walkFileTree (java or scala)

Does anyone know of a parallel equivalent of Java's Files.walkFileTree, or something similar? It can be a Java or Scala library.

Ballplayer asked 18/7, 2013 at 19:59 Comment(3)
I do not think it makes sense, because all the parallel threads will share the same bottleneck: the HDD. And disk access can't be parallelized the way network I/O operations can. - Sibie
Why is walking your file tree in parallel a good idea? This is usually I/O-bound, not CPU-bound. - Happily
In my case, file processing is CPU-bound and I/O utilization is around 10-20%. - Ballplayer

Let's assume that executing a callback on each file is enough.

This code will not handle loops in the file system (for example, a symbolic link pointing back up the tree); you'd need a registry of where you've been for that (e.g. a set backed by java.util.concurrent.ConcurrentHashMap). There are all sorts of improvements you could add, like reporting exceptions instead of silently ignoring them.
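Following the suggestion of a ConcurrentHashMap-backed registry, here is a minimal Java sketch (not the answerer's code; the class name VisitedRegistryWalk is illustrative) of the same parallel walk that records each directory's real path, as returned by Path.toRealPath, before entering it. Parallel streams stand in for Scala's .par, and, like the Scala version, unreadable directories are silently skipped.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class VisitedRegistryWalk {
    // Calls `callback` on every non-directory entry below `root`, descending into
    // subdirectories in parallel. `callback` must be thread-safe.
    static void walk(Path root, Consumer<Path> callback) {
        walk(root, callback, ConcurrentHashMap.newKeySet());
    }

    private static void walk(Path dir, Consumer<Path> callback, Set<Path> visited) {
        Path real;
        try {
            real = dir.toRealPath(); // canonical path with symlinks resolved
        } catch (IOException e) {
            return; // errors are silently ignored, as in the Scala answer
        }
        // add() is atomic on a ConcurrentHashMap key set: exactly one thread wins
        // for each real directory, so a symlink loop is entered only once.
        if (!visited.add(real)) return;

        List<Path> entries;
        try (Stream<Path> s = Files.list(dir)) {
            entries = s.collect(Collectors.toList());
        } catch (IOException | UncheckedIOException e) {
            return;
        }
        entries.parallelStream().forEach(p -> {
            if (Files.isDirectory(p)) walk(p, callback, visited); // follows symlinks
            else callback.accept(p);
        });
    }
}
```

Because the callback may run on several threads at once, pass something thread-safe, such as `queue::add` on a ConcurrentLinkedQueue.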

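import java.io.File
import scala.util.Try
// Scala 2.13+: .par lives in the separate scala-parallel-collections module and needs
// import scala.collection.parallel.CollectionConverters._

// Calls `callback` on each file under `f` accepted by `pick`, recursing into
// subdirectories in parallel. Any exception (including the NullPointerException
// from listFiles on an unreadable directory) is silently swallowed by Try.
def walk(f: File, callback: File => Unit, pick: File => Boolean = _ => true): Unit = {
  Try {
    val (dirs, fs) = f.listFiles.partition(_.isDirectory)
    fs.filter(pick).foreach(callback)
    dirs.par.foreach(d => walk(d, callback, pick))
  }
}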

Collecting the files using a fold instead of a foreach is not drastically harder, but I leave that as an exercise to the reader. (A ConcurrentLinkedQueue is probably fast enough to accept them all in a callback anyway, unless you have really slow threads and an awesome filesystem.)
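One way to do the "fold" exercise, sketched in Java rather than Scala (FoldWalk and collect are illustrative names): each directory's entries are processed in a parallel stream, and the per-subtree lists are merged by flatMap and collect, so no collection is shared between threads. Unlike the Scala version, this sketch does not follow symbolic links into directories.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.LinkOption;
import java.nio.file.Path;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class FoldWalk {
    // Returns every non-directory entry below `dir`. Subtrees are listed in
    // parallel and their partial results are combined by the stream collector.
    static List<Path> collect(Path dir) {
        List<Path> entries;
        try (Stream<Path> s = Files.list(dir)) {
            entries = s.collect(Collectors.toList());
        } catch (IOException | UncheckedIOException e) {
            return Collections.emptyList(); // unreadable directory contributes nothing
        }
        return entries.parallelStream()
                // NOFOLLOW_LINKS: a symlinked directory is returned as an entry,
                // not entered, which also rules out symlink loops
                .flatMap(p -> Files.isDirectory(p, LinkOption.NOFOLLOW_LINKS)
                        ? collect(p).stream()
                        : Stream.of(p))
                .collect(Collectors.toList());
    }
}
```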

Happily answered 18/7, 2013 at 21:17 Comment(1)
Actually, I hoped to get a link to a 'mature-ish' library that does this and has some additional features, but your example is enough for my current needs. Thanks! - Ballplayer

As others have pointed out, walking a file tree is almost certainly IO bound instead of CPU bound so the benefits of doing a multithreaded file tree walk are questionable. But if you really wanted to, you could probably roll your own with a ForkJoinPool or similar.

import java.io.IOException;
import java.nio.file.FileVisitResult;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.SimpleFileVisitor;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;

public class MultiThreadedFileTreeWalk {
    private static class RecursiveWalk extends RecursiveAction {
        private static final long serialVersionUID = 6913234076030245489L;
        private final Path dir;

        public RecursiveWalk(Path dir) {
            this.dir = dir;
        }

        @Override
        protected void compute() {
            final List<RecursiveWalk> walks = new ArrayList<>();
            try {
                Files.walkFileTree(dir, new SimpleFileVisitor<Path>() {
                    @Override
                    public FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs) throws IOException {
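                        if (!dir.equals(RecursiveWalk.this.dir)) {
                            // A subdirectory: fork a new task for it and skip it in this walk
                            RecursiveWalk w = new RecursiveWalk(dir);
                            w.fork();
                            walks.add(w);

                            return FileVisitResult.SKIP_SUBTREE;
                        } else {
                            // This task's own directory: visit its entries
                            return FileVisitResult.CONTINUE;
                        }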
                    }

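                    @Override
                    public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
                        System.out.println(file + "\t" + Thread.currentThread());
                        return FileVisitResult.CONTINUE;
                    }

                    // Report an unreadable entry and keep walking, instead of the default
                    // behavior of rethrowing, which would abort this directory's walk
                    @Override
                    public FileVisitResult visitFileFailed(Path file, IOException exc) {
                        System.err.println("Skipping " + file + ": " + exc);
                        return FileVisitResult.CONTINUE;
                    }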
                });
            } catch (IOException e) {
                e.printStackTrace();
            }

            // Wait for every subdirectory task forked above
            for (RecursiveWalk w : walks) {
                w.join();
            }
        }
    }

    public static void main(String[] args) throws IOException {
        RecursiveWalk w = new RecursiveWalk(Paths.get(".").toRealPath());
        ForkJoinPool p = new ForkJoinPool();
        p.invoke(w);
    }
}

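This example walks each directory in a separate fork/join task. The pool spreads those tasks over its worker threads (by default, one per available processor), so it is not literally one thread per directory. For background, see the Fork/Join lesson in Oracle's Java Tutorials, which covers Java 7's fork/join framework.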
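The RecursiveAction above only prints. If you need a result back, a RecursiveTask can return a value per directory and add up its joined subtasks. A minimal sketch (the names ForkJoinFileCount and CountTask are illustrative) that counts non-directory entries, using Files.newDirectoryStream instead of walkFileTree for brevity:

```java
import java.io.IOException;
import java.nio.file.DirectoryIteratorException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.LinkOption;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

class ForkJoinFileCount {
    // One task per directory; returns the number of non-directory entries below it.
    static class CountTask extends RecursiveTask<Long> {
        private final Path dir;

        CountTask(Path dir) { this.dir = dir; }

        @Override
        protected Long compute() {
            long count = 0;
            List<CountTask> subtasks = new ArrayList<>();
            try (DirectoryStream<Path> entries = Files.newDirectoryStream(dir)) {
                for (Path entry : entries) {
                    // NOFOLLOW_LINKS: symlinked directories are counted, not entered
                    if (Files.isDirectory(entry, LinkOption.NOFOLLOW_LINKS)) {
                        CountTask t = new CountTask(entry);
                        t.fork(); // may be picked up by another worker thread
                        subtasks.add(t);
                    } else {
                        count++;
                    }
                }
            } catch (IOException | DirectoryIteratorException e) {
                // unreadable directory: keep whatever was counted so far
            }
            for (CountTask t : subtasks) count += t.join();
            return count;
        }
    }

    static long countFiles(Path root) {
        return ForkJoinPool.commonPool().invoke(new CountTask(root));
    }
}
```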

Periostitis answered 18/7, 2013 at 20:25 Comment(3)
If there is some work to perform on each element, in my experience significant performance can be gained by executing that task at each node concurrently rather than serially while walking the file tree. - Cassiterite
@Cassiterite It depends on the functionality. If it is very CPU-intensive, its cost may outweigh the I/O cost of walking the file tree, and in that case making your code concurrent may be worthwhile. However, this will not always be the case. - Periostitis
Agreed, which is why I qualified the statement. I just wanted to point out that there are cases where performance gains can be achieved, since the answer called them "questionable". - Cassiterite
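For the case discussed in these comments, where the per-file work is CPU-bound, another option is to keep the traversal serial and parallelize only the work. A minimal sketch, assuming the work can be expressed as a function of the file: the CRC32 checksum is a hypothetical stand-in for the real CPU-heavy task, and the class and method names are illustrative. Files.walk does not follow symbolic links by default, and this sketch does not tolerate unreadable directories.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.zip.CRC32;

class SerialWalkParallelWork {
    // Hypothetical stand-in for expensive per-file work.
    static long checksum(Path file) {
        try {
            CRC32 crc = new CRC32();
            crc.update(Files.readAllBytes(file));
            return crc.getValue();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    static Map<Path, Long> checksumAll(Path root) throws IOException {
        List<Path> files;
        // 1. Walk the tree on one thread (the I/O-bound part).
        try (Stream<Path> s = Files.walk(root)) {
            files = s.filter(Files::isRegularFile).collect(Collectors.toList());
        }
        // 2. Fan the CPU-bound part out over the common ForkJoinPool.
        return files.parallelStream()
                .collect(Collectors.toConcurrentMap(p -> p, SerialWalkParallelWork::checksum));
    }
}
```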

This exercise is neither as brief as the Scala answer nor as Java-like as the Java answer.

The idea here was to start parallel walks with something like a thread per device.
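The "thread per device" idea can be sketched in plain Java by grouping start directories by Files.getFileStore and giving each store its own thread, so walks on different devices overlap while walks on the same device run one after another. This is a minimal sketch, not a translation of the Scala code below (which discovers mounts itself); WalkPerDevice is an illustrative name, and it assumes that FileStore objects for the same store compare equal, which the FileStore API itself does not document.

```java
import java.nio.file.FileStore;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.stream.Stream;

class WalkPerDevice {
    // Counts regular files under the given start paths with one thread per FileStore.
    static long countFiles(List<Path> starts) throws Exception {
        Map<FileStore, List<Path>> byStore = new LinkedHashMap<>();
        for (Path p : starts) {
            byStore.computeIfAbsent(Files.getFileStore(p), k -> new ArrayList<>()).add(p);
        }
        if (byStore.isEmpty()) return 0;

        ExecutorService pool = Executors.newFixedThreadPool(byStore.size());
        try {
            List<Future<Long>> perStore = new ArrayList<>();
            for (List<Path> group : byStore.values()) {
                perStore.add(pool.submit(() -> {
                    long n = 0;
                    for (Path start : group) {
                        try (Stream<Path> s = Files.walk(start)) { // does not follow symlinks
                            n += s.filter(Files::isRegularFile).count();
                        }
                    }
                    return n;
                }));
            }
            long total = 0;
            for (Future<Long> f : perStore) total += f.get(); // rethrows walk errors
            return total;
        } finally {
            pool.shutdown();
        }
    }
}
```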

The walkers run on ForkJoinPool threads (the global ExecutionContext), so the future each walker kicks off for a path test is forked as a task on the same pool. The directory test wraps its directory read in `blocking`, i.e. managed blocking, so the pool can compensate while that thread waits on I/O.
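Scala's `blocking` on the global ExecutionContext roughly corresponds, in plain Java, to ForkJoinPool.managedBlock with a ForkJoinPool.ManagedBlocker. A minimal sketch of a managed directory read (ManagedDirectoryRead and Lister are illustrative names): on a ForkJoinPool worker, the pool may activate a spare thread while this one blocks; on any other thread, managedBlock simply calls block() directly.

```java
import java.io.IOException;
import java.nio.file.DirectoryIteratorException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ForkJoinPool;

class ManagedDirectoryRead {
    // Lists `dir` inside ForkJoinPool.managedBlock.
    static List<Path> list(Path dir) throws IOException, InterruptedException {
        class Lister implements ForkJoinPool.ManagedBlocker {
            List<Path> entries;
            IOException error;

            @Override
            public boolean block() {
                List<Path> out = new ArrayList<>();
                try (DirectoryStream<Path> ds = Files.newDirectoryStream(dir)) {
                    for (Path p : ds) out.add(p);
                    entries = out;
                } catch (IOException e) {
                    error = e;
                } catch (DirectoryIteratorException e) {
                    error = e.getCause(); // I/O error while iterating
                }
                return true; // finished: no further blocking needed
            }

            @Override
            public boolean isReleasable() {
                return entries != null || error != null;
            }
        }
        Lister lister = new Lister();
        ForkJoinPool.managedBlock(lister);
        if (lister.error != null) throw lister.error;
        return lister.entries;
    }
}
```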

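The result is delivered by completing a promise when a path test succeeds (or fails with an error); once the promise is completed, the visitors return TERMINATE. (There is no mechanism here for detecting that every walk finished empty-handed; in that case the program just waits for the 20-second timeout.)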
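The same promise pattern, sketched in Java with CompletableFuture, can also detect the empty-handed case: once every test has finished, completing with Optional.empty() is a no-op if a match already won. This is a simplification, not a port: the walk itself is serial (Files.walk), only the path tests run asynchronously, and FirstMatch and find are illustrative names.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class FirstMatch {
    static CompletableFuture<Optional<Path>> find(Path root, Predicate<Path> test) throws IOException {
        CompletableFuture<Optional<Path>> result = new CompletableFuture<>();
        List<Path> paths;
        try (Stream<Path> s = Files.walk(root)) {
            paths = s.collect(Collectors.toList());
        }
        // One asynchronous test per path on the common ForkJoinPool; the first
        // match completes `result`, and later complete() calls are ignored.
        CompletableFuture<?>[] tests = paths.stream()
                .map(p -> CompletableFuture.runAsync(() -> {
                    if (!result.isDone() && test.test(p)) result.complete(Optional.of(p));
                }))
                .toArray(CompletableFuture[]::new);
        // When every test has finished: no-op if a match already won; otherwise
        // report "nothing found" (or a test's failure) instead of timing out.
        CompletableFuture.allOf(tests).whenComplete((ignored, error) -> {
            if (error != null) result.completeExceptionally(error);
            else result.complete(Optional.empty());
        });
        return result;
    }
}
```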

A more interesting test would include reading zip files, since the decompression will eat some CPU.
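A path test that actually decompresses zip contents could look like the following minimal Java sketch (ZipInflate and inflatedBytes are illustrative names). It inflates every entry through java.util.zip.ZipInputStream and returns the total number of uncompressed bytes; the inflating is the CPU work that would make a parallel walk pay off.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.zip.ZipInputStream;

class ZipInflate {
    // Decompresses every entry of the archive at `zip` and returns the total
    // number of uncompressed bytes read.
    static long inflatedBytes(Path zip) throws IOException {
        long total = 0;
        byte[] buf = new byte[8192];
        try (ZipInputStream in = new ZipInputStream(Files.newInputStream(zip))) {
            while (in.getNextEntry() != null) {
                int n;
                while ((n = in.read(buf)) > 0) total += n; // read returns -1 at end of entry
            }
        }
        return total;
    }
}
```

In a walk, such a test would be applied only to paths whose names end in ".zip", for example inside one of the parallel streams shown elsewhere on this page.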

I wonder if paulp will do something clever with deep listing.

import util._
import collection.JavaConverters._
import concurrent.{ TimeoutException => Timeout, _ }
import concurrent.duration._
import ExecutionContext.Implicits._
import java.io.IOException
import java.nio.file.{ FileVisitResult => Result, _ }
import Result.{ CONTINUE => Go, SKIP_SUBTREE => Prune, TERMINATE => Stop }
import java.nio.file.attribute.{ BasicFileAttributes => BFA }

object Test extends App {
  val fileSystem = FileSystems.getDefault
  val starts = (if (args.nonEmpty) args.toList else mounts) map (s => (fileSystem getPath s))
  val p = Promise[(Path, BFA)]

  def pathTest(path: Path, attrs: BFA) =
    if (attrs.isDirectory ) {
      val entries = blocking {
        val res = Files newDirectoryStream path
        try res.asScala.toList finally res.close()
      }
      List("hello","world") forall (n => entries exists (_.getFileName.toString == n))
    } else {
      path.getFileName.toString == "enough"
    }

  def visitor(root: Path) = new SimpleFileVisitor[Path] {
    def stopOrGo = if (p.isCompleted) Stop else Go
    def visiting(path: Path, attrs: BFA) = {
      Future { pathTest(path, attrs) } onComplete {
        case Success(true) => p trySuccess (path, attrs)
        case Failure(e)    => p tryFailure e
        case _             =>
      }
      stopOrGo
    }
    override def preVisitDirectory(dir: Path, attrs: BFA) = (
      if ((starts contains dir) && dir != root) Prune
      else visiting(dir, attrs)
    )
    override def postVisitDirectory(dir: Path, e: IOException) = {
      if (e != null) p tryFailure e
      stopOrGo
    }
    override def visitFile(file: Path, attrs: BFA) = visiting(file, attrs)
  }
  //def walk(root: Path): Path = Files walkFileTree (root, Set().asJava, 10, visitor(root))
  def walk(root: Path): Path = Files walkFileTree (root, visitor(root))

  def show(store: FileStore) = {
    val ttl   = store.getTotalSpace / 1024
    val used  = (store.getTotalSpace - store.getUnallocatedSpace) / 1024
    val avail = store.getUsableSpace / 1024
    Console println f"$store%-40s $ttl%12d $used%12d $avail%12d"
    store
  }
  def mounts = {
    val devs = for {
      store <- fileSystem.getFileStores.asScala
      if store.name startsWith "/dev/"
      if List("ext4","fuseblk") contains store.`type`
    } yield show(store)
    val devstr = """(\S+) \((.*)\)""".r
    (devs.toList map (_.toString match {
      case devstr(name, dev) if devs.toList exists (_.name == dev) => Some(name)
      case s => Console println s"Bad dev str '$s', skipping" ; None
    })).flatten
  }

  starts foreach (f => Future(walk(f)))

  Try (Await result (p.future, 20.seconds)) match {
    case Success((name, attrs)) => Console println s"Result: ${if (attrs.isDirectory) "dir" else "file"} $name"
    case Failure(e: Timeout)    => Console println s"No result: timed out."
    case Failure(t)             => Console println s"No result: $t."
  }
}
Megalo answered 19/7, 2013 at 13:49 Comment(2)
Thank you for taking so much time to write this code. I decided to accept Rex Kerr's solution because it's so brief and therefore easy to debug. - Ballplayer
@lucek Rex is the best. Thanks for the question; it was fun exploring the API. I also upvoted the other answers. - Megalo