遍历文件树的并行版本(Java或Scala)

10

有没有人知道任何Java Files.walkFileTree的并行等效方法或类似的东西?可以是Java或Scala库。

有没有任何Java Files.walkFileTree 的并行等效方法或类似工具呢?可以使用Java或Scala库来实现。

2
我认为这没有意义,因为所有并行线程都会有相同的瓶颈 - 硬盘驱动器。而且它无法像网络IO操作一样并行化。 - aim
为什么并行遍历文件树是个好主意?这通常是IO绑定,而不是CPU绑定。 - Rex Kerr
1
在我的情况下,文件处理是 CPU 绑定的,I/O 利用率大约在 10%-20%。 - matt
3个回答

14

正如其他人指出的那样,遍历文件树几乎肯定是IO绑定而不是CPU绑定,因此进行多线程文件树遍历的好处是可疑的。但如果你真的想要,你可能可以使用ForkJoinPool或类似工具自己编写。

import java.io.IOException;
import java.nio.file.FileVisitResult;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.SimpleFileVisitor;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;

public class MultiThreadedFileTreeWalk {
    private static class RecursiveWalk extends RecursiveAction {
        private static final long serialVersionUID = 6913234076030245489L;
        private final Path dir;

        public RecursiveWalk(Path dir) {
            this.dir = dir;
        }

        @Override
        protected void compute() {
            final List<RecursiveWalk> walks = new ArrayList<>();
            try {
                Files.walkFileTree(dir, new SimpleFileVisitor<Path>() {
                    @Override
                    public FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs) throws IOException {
                        if (!dir.equals(RecursiveWalk.this.dir)) {
                            RecursiveWalk w = new RecursiveWalk(dir);
                            w.fork();
                            walks.add(w);

                            return FileVisitResult.SKIP_SUBTREE;
                        } else {
                            return FileVisitResult.CONTINUE;
                        }
                    }

                    @Override
                    public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
                        System.out.println(file + "\t" + Thread.currentThread());
                        return FileVisitResult.CONTINUE;
                    }
                });
            } catch (IOException e) {
                e.printStackTrace();
            }

            for (RecursiveWalk w : walks) {
                w.join();
            }
        }
    }

    public static void main(String[] args) throws IOException {
        RecursiveWalk w = new RecursiveWalk(Paths.get(".").toRealPath());
        ForkJoinPool p = new ForkJoinPool();
        p.invoke(w);
    }
}

这个示例在单独的线程上遍历每个目录。这是Java 7的fork/join库的教程。


1
如果需要对每个元素执行某些功能,根据过去的经验,在遍历文件树并同时在每个节点上执行任务与串行执行相比,可以获得显著的性能提升。 - Hazok
1
@Hazok 这取决于功能。如果功能非常消耗CPU,那么它可能会超过遍历文件树的IO限制。如果是这种情况,那么让你的代码并发可能是值得的。然而,并不总是这种情况。 - Jeffrey
1
同意,这就是为什么我对该声明进行了限定。我只是想指出,在回答中它被称为“有问题”,但实际上有些情况下可以实现性能提升。 - Hazok

4
这个练习既不像Scala答案那样简短,也不像Java答案那样类似Java。
这里的想法是使用类似每个设备一个线程的方式开始并行步行。
步行者在ForkJoinPool线程上,因此当它们为每个路径测试启动未来时,这些未来是池中的分叉任务。目录测试在读取目录查找文件时使用受控阻塞。
通过完成取决于未来路径测试的承诺来返回结果。(这里没有机制来检测空手完成。)
更有趣的测试将包括读取zip文件,因为解压缩将消耗一些CPU。
我想知道paulp是否会用深度列出做些聪明的事情
import util._
import collection.JavaConverters._
import concurrent.{ TimeoutException => Timeout, _ }
import concurrent.duration._
import ExecutionContext.Implicits._
import java.io.IOException
import java.nio.file.{ FileVisitResult => Result, _ }
import Result.{ CONTINUE => Go, SKIP_SUBTREE => Prune, TERMINATE => Stop }
import java.nio.file.attribute.{ BasicFileAttributes => BFA }

object Test extends App {
  val fileSystem = FileSystems.getDefault
  val starts = (if (args.nonEmpty) args.toList else mounts) map (s => (fileSystem getPath s))
  val p = Promise[(Path, BFA)]

  def pathTest(path: Path, attrs: BFA) =
    if (attrs.isDirectory ) {
      val entries = blocking {
        val res = Files newDirectoryStream path
        try res.asScala.toList finally res.close()
      }
      List("hello","world") forall (n => entries exists (_.getFileName.toString == n))
    } else {
      path.getFileName.toString == "enough"
    }

  def visitor(root: Path) = new SimpleFileVisitor[Path] {
    def stopOrGo = if (p.isCompleted) Stop else Go
    def visiting(path: Path, attrs: BFA) = {
      future { pathTest(path, attrs) } onComplete {
        case Success(true) => p trySuccess (path, attrs)
        case Failure(e)    => p tryFailure e
        case _             =>
      }
      stopOrGo
    }
    override def preVisitDirectory(dir: Path, attrs: BFA) = (
      if ((starts contains dir) && dir != root) Prune
      else visiting(dir, attrs)
    )
    override def postVisitDirectory(dir: Path, e: IOException) = {
      if (e != null) p tryFailure e
      stopOrGo
    }
    override def visitFile(file: Path, attrs: BFA) = visiting(file, attrs)
  }
  //def walk(p: Path): Path = Files walkFileTree (p, Set().asJava, 10, visitor(p))
  def walk(p: Path): Path = Files walkFileTree (p, visitor(p))

  def show(store: FileStore) = {
    val ttl   = store.getTotalSpace / 1024
    val used  = (store.getTotalSpace - store.getUnallocatedSpace) / 1024
    val avail = store.getUsableSpace / 1024
    Console println f"$store%-40s $ttl%12d $used%12d $avail%12d"
    store
  }
  def mounts = {
    val devs = for {
      store <- fileSystem.getFileStores.asScala
      if store.name startsWith "/dev/"
      if List("ext4","fuseblk") contains store.`type`
    } yield show(store)
    val devstr = """(\S+) \((.*)\)""".r
    (devs.toList map (_.toString match {
      case devstr(name, dev) if devs.toList exists (_.name == dev) => Some(name)
      case s => Console println s"Bad dev str '$s', skipping" ; None
    })).flatten
  }

  starts foreach (f => future (walk(f)))

  Try (Await result (p.future, 20.seconds)) match {
    case Success((name, attrs)) => Console println s"Result: ${if (attrs.isDirectory) "dir" else "file"} $name"
    case Failure(e: Timeout)    => Console println s"No result: timed out."
    case Failure(t)             => Console println s"No result: $t."
  }
}

感谢您花费如此多的时间编写这段代码。我决定接受Rex Kerr的解决方案,因为它非常简洁,易于调试。 - matt
@lucek Rex是最棒的。谢谢你的问题,探索API很有趣。我也点赞了其他的答案。 - som-snytt

3

假设在每个文件上执行回调足够了。

这段代码不能处理文件系统中的循环--你需要一个记录已访问位置的注册表(例如java.util.concurrent.ConcurrentHashMap)。你可以添加各种改进,如报告异常而不是默默忽略它们。

import java.io.File
import scala.util._
def walk(f: File, callback: File => Unit, pick: File => Boolean = _ => true) {
  Try {
    val (dirs, fs) = f.listFiles.partition(_.isDirectory)
    fs.filter(pick).foreach(callback)
    dirs.par.foreach(f => walk(f, callback, pick))
  }
}

使用折叠(fold)而不是 foreach 循环收集文件并不怎么难,但我把它留给读者作为练习。(ConcurrentLinkedQueue 可能足够快,能在回调函数中接受所有文件,除非你的线程非常慢,而文件系统则非常好。)


实际上,我希望得到一个“成熟的”库的链接,该库可以完成这项工作并具有一些额外的功能,但您的示例已经满足了我的当前需求。谢谢! - matt

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接