如何使用Java进行多线程解压大型文件夹 - 最好使用Java8?

11

参考网址: http://www.pixeldonor.com/2013/oct/12/concurrent-zip-compression-java-nio/

我正在尝试解压缩一个大小为 5GB 的压缩文件,平均需要花费约 30 分钟的时间,对于我们的应用程序来说这是很多的,我试图缩短时间。

我已经尝试了很多组合方式,改变了缓冲区大小(默认情况下我的写入块为 4096 字节),改变了 NIO 方法、库,但所有的结果都差不多。

仍未尝试的一件事是将压缩文件分成块,因此使用多线程块读取它。

代码片段如下:

  private static ExecutorService e = Executors.newFixedThreadPool(20);
  public static void main(String argv[]) {
        try {
            String selectedZipFile = "/Users/xx/Documents/test123/large.zip";
            String selectedDirectory = "/Users/xx/Documents/test2";
            long st = System.currentTimeMillis();

            unzip(selectedDirectory, selectedZipFile);

            System.out.println(System.currentTimeMillis() - st);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }


public static void unzip(String targetDir, String zipFilename) {
    ZipInputStream archive;
            try {
                List<ZipEntry> list = new ArrayList<>();
                archive = new ZipInputStream(new BufferedInputStream(new FileInputStream(zipFilename)));
                ZipEntry entry;
                while ((entry = archive.getNextEntry()) != null) {
                    list.add(entry);
                }

                for (List<ZipEntry> partition : Lists.partition(list, 1000)) {
                    e.submit(new Multi(targetDir, partition, archive));
                }
            } catch (Exception e){
                e.printStackTrace();
            }
}

可执行的内容为:

  static class Multi implements Runnable {

    private List<ZipEntry> partition;
    private ZipInputStream zipInputStream;
    private String targetDir;

    public Multi(String targetDir, List<ZipEntry> partition, ZipInputStream zipInputStream) {
        this.partition = partition;
        this.zipInputStream = zipInputStream;
        this.targetDir = targetDir;
    }

    @Override
    public void run() {
        for (ZipEntry entry : partition) {
            File entryDestination = new File(targetDir, entry.getName());
            if (entry.isDirectory()) {
                entryDestination.mkdirs();
            } else {
                entryDestination.getParentFile().mkdirs();

                BufferedOutputStream output = null;
                try {
                    int n;
                    byte buf[] = new byte[BUFSIZE];
                    output = new BufferedOutputStream(new FileOutputStream(entryDestination), BUFSIZE);
                    while ((n = zipInputStream.read(buf, 0, BUFSIZE)) != -1) {
                        output.write(buf, 0, n);
                    }
                    output.flush();


                } catch (FileNotFoundException e1) {
                    e1.printStackTrace();
                } catch (IOException e1) {
                    e1.printStackTrace();
                } finally {

                    try {
                        output.close();
                    } catch (IOException e1) {
                        e1.printStackTrace();
                    }

                }
            }
        }
    }
}

但出于某种原因,它仅存储目录而不包含文件内容...

我的问题是:根据上述“压缩”文章的方式,如何以多线程的方式在大型zip文件中制作块?

2个回答

8

ZipInputStream是一条数据流,无法分割。

如果您想要多线程解压缩,需要使用ZipFile。在Java 8中,您甚至可以免费获得多线程功能。

public static void unzip(String targetDir, String zipFilename) {
    Path targetDirPath = Paths.get(targetDir);
    try (ZipFile zipFile = new ZipFile(zipFilename)) {
        zipFile.stream()
               .parallel() // enable multi-threading
               .forEach(e -> unzipEntry(zipFile, e, targetDirPath));
    } catch (IOException e) {
        throw new RuntimeException("Error opening zip file '" + zipFilename + "': " + e, e);
    }
}

private static void unzipEntry(ZipFile zipFile, ZipEntry entry, Path targetDir) {
    try {
        Path targetPath = targetDir.resolve(Paths.get(entry.getName()));
        if (Files.isDirectory(targetPath)) {
            Files.createDirectories(targetPath);
        } else {
            Files.createDirectories(targetPath.getParent());
            try (InputStream in = zipFile.getInputStream(entry)) {
                Files.copy(in, targetPath, StandardCopyOption.REPLACE_EXISTING);
            }
        }
    } catch (IOException e) {
        throw new RuntimeException("Error processing zip entry '" + entry.getName() + "': " + e, e);
    }
}

您可能还想查看这个答案,它使用FileSystem访问zip文件内容,以获得真正的Java 8体验。

检查你的评论 :) 顺便问一下,哪个可以给出最低的处理时间?是“步行”还是你的答案? - VitalyT
@VitalyT 运行时将严重依赖目标系统 - 主要是IO速度,CPU核心数量和CPU速度 - 时间将在不同机器之间有很大变化。 - Hulk
谢谢,我知道这个。我正在尝试在同一台机器上使用不同的场景进行测量,以找出理想的参数...我需要从概念上理解最佳的 unzip 多线程实践。目前它减少了 10%...但这还不够... - VitalyT
1
@VitalyT 多线程可能不会有太大帮助,除非CPU是性能瓶颈。更有可能的是你的硬盘跟不上,因此多线程只意味着更多的线程在等待磁盘。 - Andreas

0
这里是一个利用FileSystem的并行版本。你应该稍微调整一下它(例如实际使用流式处理,添加错误处理)。但它应该是一个不错的起点。
import java.io.IOException;
import java.net.URI;
import java.nio.file.FileSystem;
import java.nio.file.FileSystems;
import java.nio.file.FileVisitResult;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.SimpleFileVisitor;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.HashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

public class ParallelUnzip {

    static class UnzipVisitor extends SimpleFileVisitor<Path> {
        private Consumer<Path> unzipper;

        public UnzipVisitor(Consumer<Path> unzipper) {
            this.unzipper = unzipper;
        }
        @Override
        public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
            if (Files.isRegularFile(file)) {
                unzipper.accept(file);
            }
            return FileVisitResult.CONTINUE;
        }
    }

    // I would not risk creating directories in parallel, so adding synchronized here
    synchronized static void createDirectories(Path path) throws IOException {
        if (!Files.exists(path.getParent())) {
            Files.createDirectories(path.getParent());
        }
    }

    public static void main(String[] args) throws IOException, InterruptedException {

        FileSystem fs = FileSystems.newFileSystem(URI.create("jar:file:/tests.zip"), new HashMap<>());
        Path root = fs.getRootDirectories().iterator().next();
        Path target = Paths.get("target");

        ExecutorService executor = Executors.newFixedThreadPool(2);

        Files.walkFileTree(root, new UnzipVisitor((path) -> {
            System.out.println(Thread.currentThread().getName() + " " + path.toAbsolutePath().toString());

            executor.submit(() -> {
                try {
                    Path t = target.resolve(path.toString().substring(1));

                    createDirectories(t);

                    System.out.println("Extracting with thread " + Thread.currentThread().getName() + " File: "
                            + path.toAbsolutePath().toString() + " -> " + t.toAbsolutePath().toString());
                    // Should be using streaming here
                    byte[] bytes = Files.readAllBytes(path);
                    Files.write(t, bytes);
                } catch (Exception ioe) {
                    ioe.printStackTrace();
                    throw new RuntimeException(ioe);
                }
            });

        }));

        executor.shutdown();
        executor.awaitTermination(1000, TimeUnit.SECONDS);
    }
}

@k5_ 谢谢,正在尝试您的解决方案...希望它能将解压时间减少超过10%... :) - VitalyT
@k5_,顺便说一下,我看到“FileSystem fs = FileSystems.newFileSystem(URI.create("jar:file:/tests.zip"), new HashMap<>());”需要大约5分钟才能加载...你为什么需要这个?不像之前的评论那样使用Files或Path更容易吗? - VitalyT

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接