Am I doing something wrong, or is it not possible to extract a zip file in parallel?

I created this to test parallel extraction:

    public static async Task ExtractToDirectoryAsync(this FileInfo file, DirectoryInfo folder)
    {

        ActionBlock<ZipArchiveEntry> block = new ActionBlock<ZipArchiveEntry>((entry) =>
        {
            var path = Path.Combine(folder.FullName, entry.FullName);

            Directory.CreateDirectory(Path.GetDirectoryName(path));
            entry.ExtractToFile(path);

        }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 2 });

        using (var archive = ZipFile.OpenRead(file.FullName))
        {
            foreach (var entry in archive.Entries.Where(e => e.Name != string.Empty))
            {
                block.Post(entry);
            }
            block.Complete();
            await block.Completion;
        }

    }

Here is the unit test I use to exercise it:

    [TestMethod]
    public async Task ExtractTestAsync()
    {
        if (Resources.LocalExtractFolder.Exists)
            Resources.LocalExtractFolder.Delete(true);
        //  Resources.LocalExtractFolder.Create();
        await Resources.WebsiteZip.ExtractToDirectoryAsync(Resources.LocalExtractFolder);
    }

With MaxDegreeOfParallelism = 1 everything works, but with 2 it fails.

Test Name:  ExtractTestAsync
Test FullName:  Composite.Azure.Tests.ZipFileTests.ExtractTestAsync
Test Source:    c:\Development\C1\local\CompositeC1\Composite.Azure.Tests\ZipFileTests.cs : line 21
Test Outcome:   Failed
Test Duration:  0:00:02.4138753

Result Message: 
Test method Composite.Azure.Tests.ZipFileTests.ExtractTestAsync threw exception: 
System.IO.InvalidDataException: Unknown block type. Stream might be corrupted.
Result StackTrace:  
at System.IO.Compression.Inflater.Decode()
   at System.IO.Compression.Inflater.Inflate(Byte[] bytes, Int32 offset, Int32 length)
   at System.IO.Compression.DeflateStream.Read(Byte[] array, Int32 offset, Int32 count)
   at System.IO.Stream.InternalCopyTo(Stream destination, Int32 bufferSize)
   at System.IO.Stream.CopyTo(Stream destination)
   at System.IO.Compression.ZipFileExtensions.ExtractToFile(ZipArchiveEntry source, String destinationFileName, Boolean overwrite)
   at System.IO.Compression.ZipFileExtensions.ExtractToFile(ZipArchiveEntry source, String destinationFileName)
   at Composite.Azure.Storage.Compression.ZipArchiveExtensions.<>c__DisplayClass6.<ExtractToDirectoryAsync>b__3(ZipArchiveEntry entry) in c:\Development\C1\local\CompositeC1\Composite.Azure.Storage\Compression\ZipArchiveExtensions.cs:line 37
   at System.Threading.Tasks.Dataflow.ActionBlock`1.ProcessMessage(Action`1 action, KeyValuePair`2 messageWithId)
   at System.Threading.Tasks.Dataflow.ActionBlock`1.<>c__DisplayClass5.<.ctor>b__0(KeyValuePair`2 messageWithId)
   at System.Threading.Tasks.Dataflow.Internal.TargetCore`1.ProcessMessagesLoopCore()
--- End of stack trace from previous location where exception was thrown ---
   at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
   at System.Runtime.CompilerServices.TaskAwaiter.GetResult()
   at Composite.Azure.Storage.Compression.ZipArchiveExtensions.<ExtractToDirectoryAsync>d__8.MoveNext() in c:\Development\C1\local\CompositeC1\Composite.Azure.Storage\Compression\ZipArchiveExtensions.cs:line 48
--- End of stack trace from previous location where exception was thrown ---
   at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
   at System.Runtime.CompilerServices.TaskAwaiter.GetResult()
   at Composite.Azure.Tests.ZipFileTests.<ExtractTestAsync>d__2.MoveNext() in c:\Development\C1\local\CompositeC1\Composite.Azure.Tests\ZipFileTests.cs:line 25
--- End of stack trace from previous location where exception was thrown ---
   at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
   at System.Runtime.CompilerServices.TaskAwaiter.GetResult()

Update 2

Here is my own attempt at doing the parallel work by hand; it doesn't work either :) Remember to handle exceptions in the ContinueWith.

        public static void ExtractToDirectorySemaphore(this FileInfo file, DirectoryInfo folder)
        {

            int MaxDegreeOfParallelism = 2;
            using (var archive = ZipFile.OpenRead(file.FullName))
            {

                var semaphore = new Semaphore(MaxDegreeOfParallelism, MaxDegreeOfParallelism);

                foreach (var entry in archive.Entries.Where(e => e.Name != string.Empty))
                {
                    semaphore.WaitOne();

                    var task = Task.Run(() =>
                    {
                        var path = Path.Combine(folder.FullName, entry.FullName);

                        Directory.CreateDirectory(Path.GetDirectoryName(path));
                        entry.ExtractToFile(path);
                    });
                    task.ContinueWith(handle =>
                    {
                        try
                        {
                            //do any cleanup/post processing
                        }
                        finally
                        {
                            // Release the semaphore so the next thing can be processed
                            semaphore.Release();
                        }
                    });
                }
                while(MaxDegreeOfParallelism-->0)
                    semaphore.WaitOne(); //Wait here until the last task completes.


            }

        }

Here is the async version:

        public static Task ExtractToDirectorySemaphoreAsync(this FileInfo file, DirectoryInfo folder)
        {
            return Task.Factory.StartNew(() =>
            {
                int MaxDegreeOfParallelism = 50;
                using (var archive = ZipFile.OpenRead(file.FullName))
                {

                    var semaphore = new Semaphore(MaxDegreeOfParallelism, MaxDegreeOfParallelism);

                    foreach (var entry in archive.Entries.Where(e => e.Name != string.Empty))
                    {
                        semaphore.WaitOne();

                        var task = Task.Run(() =>
                        {
                            var path = Path.Combine(folder.FullName, entry.FullName);

                            Directory.CreateDirectory(Path.GetDirectoryName(path));
                            entry.ExtractToFile(path);
                        });
                        task.ContinueWith(handle =>
                        {
                            try
                            {
                                //do any cleanup/post processing
                            }
                            finally
                            {
                                // Release the semaphore so the next thing can be processed
                                semaphore.Release();
                            }
                        }, TaskContinuationOptions.AttachedToParent); // the outer task will wait for all of them.
                    }

                }
            });
        }

Update 3

The following exceptions show up in handle.Exception.

{"Block length does not match with its complement."}  
[0] = {"A local file header is corrupt."}

I need to determine whether ZipFile is thread-safe.


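I suspect a race condition. It would be interesting to see whether your parallel (non-async) version works if the files in the zip are very large, say a megabyte or more. Could you test that? Create a zip file containing several 1-megabyte files? - Jim Mischel
So it is not possible to extract in parallel? - Poul K. Sørensen
Interesting. The documentation for the ZipFile class (and the extension methods) says these things should be thread-safe. I would think that means you can extract multiple files concurrently. - Jim Mischel
ZipArchiveEntry holds a reference to the ZipFile (so that it can extract or delete), so when you call ExtractToFile you end up using the original ZipFile on multiple threads simultaneously, which is not allowed. - Raymond Chen
What RaymondChen said. As a workaround, you could copy the input stream (i.e. some byte[]), create separate ZipFiles, and extract from them in parallel (obviously you need to partition the file entries manually) :) - leppie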
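
leppie's workaround could look roughly like the sketch below. It is an illustration under stated assumptions, not code from the thread: `InMemoryZipPartitioner`, `Extract`, and the `workers` parameter are hypothetical names, and it assumes the whole archive fits in a single byte[] (so well under 2 GB). Instead of copying the bytes once per worker, it shares one read-only buffer and gives every worker its own MemoryStream and ZipArchive, so no reader state is shared between threads.

```csharp
using System.IO;
using System.IO.Compression;
using System.Threading.Tasks;

// Hypothetical helper, not part of the original thread.
public static class InMemoryZipPartitioner
{
    public static void Extract(string zipPath, string targetDir, int workers)
    {
        // Assumption: the archive is small enough to hold in memory.
        byte[] zipBytes = File.ReadAllBytes(zipPath);
        var options = new ParallelOptions { MaxDegreeOfParallelism = workers };

        Parallel.For(0, workers, options, worker =>
        {
            // Each worker wraps the shared, read-only buffer in its own stream
            // and its own ZipArchive, so stream positions never interfere.
            using (var stream = new MemoryStream(zipBytes, writable: false))
            using (var archive = new ZipArchive(stream, ZipArchiveMode.Read))
            {
                // Manual partitioning: worker k takes entries k, k + workers, ...
                for (int i = worker; i < archive.Entries.Count; i += workers)
                {
                    ZipArchiveEntry entry = archive.Entries[i];
                    if (entry.Name.Length == 0)
                        continue; // directory entry, nothing to extract

                    string path = Path.Combine(targetDir, entry.FullName);
                    Directory.CreateDirectory(Path.GetDirectoryName(path));
                    entry.ExtractToFile(path, overwrite: true);
                }
            }
        });
    }
}
```

The round-robin split is the simplest possible partitioning and ignores entry sizes. The sketch also does not check `entry.FullName` for path traversal, so it should only be pointed at trusted archives. On .NET Framework, `ExtractToFile` needs a reference to System.IO.Compression.FileSystem.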
5 Answers

Disclaimer: this is only a proof of concept.

After replacing ZipFile.OpenRead with ParallelZipFile.OpenRead in the code samples, all 4 unit tests pass.

    public class ParallelZipFile
    {
        public static ParallelZipArchive OpenRead(string path)
        {

            return new ParallelZipArchive(ZipFile.OpenRead(path),path);
        }
    }
    public class ParallelZipArchive : IDisposable
    {
        internal ZipArchive _archive;
        internal string _path;
        internal ConcurrentQueue<ZipArchive> FreeReaders = new ConcurrentQueue<ZipArchive>();

        public ParallelZipArchive(ZipArchive zip,string path)
        {
            _path = path;
            _archive = zip;
            FreeReaders.Enqueue(zip);
        }

        public ReadOnlyCollection<ParallelZipArchiveEntry> Entries
        {
            get
            {
                var list = new List<ParallelZipArchiveEntry>(_archive.Entries.Count);
                int i = 0;
                foreach (var entry in _archive.Entries)
                    list.Add(new ParallelZipArchiveEntry(i++, entry, this));

                return  new ReadOnlyCollection<ParallelZipArchiveEntry>(list);
            }
        }


        public void Dispose()
        {
            foreach (var archive in FreeReaders)
                archive.Dispose();
        }
    }
    public class ParallelZipArchiveEntry
    {
        private ParallelZipArchive _parent;
        private int _entry;
        public string Name { get; set; }
        public string FullName { get; set; }

        public ParallelZipArchiveEntry(int entryNr, ZipArchiveEntry entry, ParallelZipArchive parent)
        {
            _entry = entryNr;
            _parent = parent;
            Name = entry.Name;
            FullName = entry.FullName;
        }

        public void ExtractToFile(string path)
        {
            ZipArchive value;
            Trace.TraceInformation(string.Format("Number of readers: {0}", _parent.FreeReaders.Count));

            if (!_parent.FreeReaders.TryDequeue(out value))
                value = ZipFile.OpenRead(_parent._path);

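            try
            {
                // Entries is a ReadOnlyCollection, so it can be indexed directly.
                value.Entries[_entry].ExtractToFile(path);
            }
            finally
            {
                // Return the reader to the pool even if extraction throws.
                _parent.FreeReaders.Enqueue(value);
            }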
        }
    }

Unit tests

    [TestClass]
    public class ZipFileTests
    {
        [ClassInitialize()]
        public static void PreInitialize(TestContext context)
        {
            if (Resources.LocalExtractFolderTruth.Exists)
                Resources.LocalExtractFolderTruth.Delete(true);

            ZipFile.ExtractToDirectory(Resources.WebsiteZip.FullName, Resources.LocalExtractFolderTruth.FullName);
        }

        [TestInitialize()]
        public void InitializeTests()
        {
            if (Resources.LocalExtractFolder.Exists)
                Resources.LocalExtractFolder.Delete(true);

        }

        [TestMethod]
        public void ExtractTest()
        {

            Resources.WebsiteZip.ExtractToDirectory(Resources.LocalExtractFolder);

            Assert.IsTrue(Helpers.DirectoryTools.CompareDirectories(
                Resources.LocalExtractFolderTruth, Resources.LocalExtractFolder));

        }
        [TestMethod]
        public async Task ExtractAsyncTest()
        {

            await Resources.WebsiteZip.ExtractToDirectoryAsync(Resources.LocalExtractFolder);

            Assert.IsTrue(Helpers.DirectoryTools.CompareDirectories(
               Resources.LocalExtractFolderTruth, Resources.LocalExtractFolder));
        }
        [TestMethod]
        public void ExtractSemaphoreTest()
        {

            Resources.WebsiteZip.ExtractToDirectorySemaphore(Resources.LocalExtractFolder);
            Assert.IsTrue(Helpers.DirectoryTools.CompareDirectories(
               Resources.LocalExtractFolderTruth, Resources.LocalExtractFolder));
        }
        [TestMethod]
        public async Task ExtractSemaphoreAsyncTest()
        {

            await Resources.WebsiteZip.ExtractToDirectorySemaphoreAsync(Resources.LocalExtractFolder);
            Assert.IsTrue(Helpers.DirectoryTools.CompareDirectories(
               Resources.LocalExtractFolderTruth, Resources.LocalExtractFolder));
        }

    }

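Is there a similar thread-safe approach for the ZipArchive type built into the .NET Framework? - SuperJMN
This solution does not benefit from the entry list that ZipArchive caches. If the zip file contains many small files, this solution is slower than reading sequentially. - dube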

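I recently did the same task, and here are my results:

I used DotNetZip.reduced (Ionic.Zip.Reduced.dll v1.9.1.8) on a Xeon machine (1 socket, 8 cores, 16 logical CPUs) with 32 GB RAM and an SSD drive.

Zip file | Packed size | Extracted files | Extracted size
--- | --- | --- | ---
SmallFile1 | 778 MB | 4,926 | 1.4 GB
LargeFile2 | 6 GB | 29,557 | 10.0 GB

I have five methods: the first does all the work on a single thread, and the other four use PLINQ and the TPL Parallel class.

The winners are V4 and V5, which are up to about six times faster than V1. Detailed results and code are below.

  • V1 uses ExtractAll
  • V2 extracts entries in parallel (not thread-safe)
  • V3 extracts entries in parallel by opening a new file handle for each entry
  • V4 extracts entries in parallel using only N+1 file handles
  • V5 is the final version

Performance results, in seconds:

Zip file | V1 | V2 | V3 | V4 | V5
--- | --- | --- | --- | --- | ---
SmallFile1 | 32 | Exception | 8 | 8 | 5
LargeFile2 | 200 | Exception | 2000 | 35 | 30

[Figures not reproduced: small file processing; large file processing by V1, V4, and V5]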

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Linq;
using System.Reflection;
using System.Threading;
using System.Threading.Tasks;
using Ionic.Zip;
using Ionic.Zlib;

namespace A1
{
    public static class Program
    {
        static void Main(string[] args)
        {
            Console.ReadKey();

            CancellationToken cancellationToken = CancellationToken.None;

            string path = @"e:\1\";
            string zf1 = Path.Combine(path, "1.zip");
            string zf2 = Path.Combine(path, "2.zip");
            Stopwatch sw = new Stopwatch();

            List<string> zipFiles = new List<string>
            {
                zf1,
                zf2,
            };
            List<Action<string, string, CancellationToken>> methods = new List<Action<string, string, CancellationToken>>
            {
                ExtractAllFilesFromZipFileV1,
                //ExtractAllFilesFromZipFileV2,
                //ExtractAllFilesFromZipFileV3,
                ExtractAllFilesFromZipFileV4,
                ExtractAllFilesFromZipFileV5,
                ExtractAllFilesFromZipFileV4,
                ExtractAllFilesFromZipFileV5,
                ExtractAllFilesFromZipFileV4,
                ExtractAllFilesFromZipFileV5,
            };

            zipFiles.Reverse();
            methods.Reverse();

            zipFiles.ForEach(f => methods.ForEach(m =>
            {
                string fileName = Path.GetFileName(f);
                string targetDirectory = path + Guid.NewGuid().ToString("N");
                sw.Restart();
                // Unzip
                try
                {
                    m(f, targetDirectory, cancellationToken);
                }
                catch (Exception ex)
                {
                    Console.WriteLine(ex.Message);
                }
                sw.Stop();
                Console.WriteLine("{0} processed by {1} in {2} seconds", fileName, m.GetMethodInfo().Name, sw.Elapsed.TotalSeconds.ToString("F3"));
                Thread.Sleep(5 * 1000);
                Directory.Delete(targetDirectory, true);
                Thread.Sleep(5 * 1000);
            }));
        }

        private static void ExtractAllFilesFromZipFileV1(string zipFileName, string targetDirectory, CancellationToken cancellationToken)
        {
            using (ZipFile zipFile = new ZipFile(zipFileName))
            {
                zipFile.ExtractAll(targetDirectory);
            }
        }

        private static void ExtractAllFilesFromZipFileV2(string zipFileName, string targetDirectory, CancellationToken cancellationToken)
        {
            using (ZipFile zipFile = new ZipFile(zipFileName))
            {
                zipFile.Entries
                    .AsParallel()
                    .ForAll(v =>
                    {
                        v.Extract(targetDirectory);
                    });
            }
        }


        private static void ExtractAllFilesFromZipFileV3(string zipFileName, string targetDirectory, CancellationToken cancellationToken)
        {
            using (ZipFile zipFile = new ZipFile(zipFileName))
            {
                int count = zipFile.Entries.Count;

                Enumerable.Range(0, count)
                    .AsParallel()
                    .ForAll(v =>
                    {
                        cancellationToken.ThrowIfCancellationRequested();

                        using (ZipFile zf = new ZipFile(zipFileName))
                        {
                            // Get the right entry to extract
                            zf.Entries
                                .Skip(v)
                                .First()
                                .Extract(targetDirectory);
                        }
                    });
            }
        }

        private static void ExtractAllFilesFromZipFileV4(string zipFileName, string targetDirectory, CancellationToken cancellationToken)
        {
            using (ZipFile zipFile = new ZipFile(zipFileName))
            {
                // Get the entry count; the outer ZipFile keeps the file locked while we work
                int count = zipFile.Entries.Count();

                // Cache instances of ZipFile used by threads
                // Make sure the zip file is open no more than N times, where N is maxDop.
                ConcurrentDictionary<int, ZipFile> dictionary = new ConcurrentDictionary<int, ZipFile>();

                try
                {
                    Parallel.For(0, count,
                        () =>
                        {
                            // GetOrAdd. Use existing open ZipFile or open a new one for this thread.
                            return dictionary.GetOrAdd(Thread.CurrentThread.ManagedThreadId, v =>
                            {
                                return new ZipFile(zipFileName);
                            });
                        },
                        (int i, ParallelLoopState loopState, ZipFile zf) =>
                        {
                            cancellationToken.ThrowIfCancellationRequested();

                            // Get the right entry to extract
                            ZipEntry entry = zf.Entries
                                .Skip(i)
                                .First();

                            // Extract to a file
                            entry.Extract(targetDirectory);

                            return zf;
                        },
                        zf =>
                        {
                        });
                }
                finally
                {
                    // Dispose cached ZipFiles
                    foreach (ZipFile zf in dictionary.Values)
                    {
                        zf.Dispose();
                    }
                }
            } // using
        }

        private static void ExtractAllFilesFromZipFileV5(string zipFileName, string targetDirectory, CancellationToken cancellationToken)
        {
            using (ZipFile zipFile = new ZipFile(zipFileName))
            {
                // Get the entries and their count; the outer ZipFile keeps the file locked while we work
                ICollection<ZipEntry> zipEntries = zipFile.Entries;
                int count = zipEntries.Where(v => !v.IsDirectory).Count();

                // Calculate max DOP. The cast must wrap the whole product:
                // (int)1.5 * x truncates 1.5 to 1 before multiplying.
                int maxDop = (int)(1.5 * Math.Min(count, Environment.ProcessorCount));


                List<Tuple<int, long>> entries = zipEntries
                    .Select((v, i) => Tuple.Create(i, v))
                    .Where(v => !v.Item2.IsDirectory)
                    .Select(v => Tuple.Create(v.Item1, v.Item2.UncompressedSize))
                    .ToList();

                // Load balance between threads
                List<List<Tuple<int, long>>> groupedItems = entries.ToBuckets(maxDop, v => v.Item2 + 10 * 1024 * 1024).ToList();

                // Ensure seq reading from zip file.
                for (int i = 0; i < groupedItems.Count; ++i)
                {
                    groupedItems[i] = groupedItems[i].OrderBy(v => v.Item1).ToList();
                }

                // Cache instances of ZipFile used by threads
                // Make sure the zip file is open no more than N times, where N is maxDop.
                ConcurrentDictionary<int, Tuple<ZipFile, List<ZipEntry>>> dictionary = new ConcurrentDictionary<int, Tuple<ZipFile, List<ZipEntry>>>(maxDop, maxDop);
                ParallelOptions parallelOptions = new ParallelOptions { MaxDegreeOfParallelism = maxDop, };

                try
                {
                    Parallel.For(0, maxDop, parallelOptions,
                        () =>
                        {
                            // GetOrAdd. Re-use existing open ZipFile or open a new one for this thread.
                            return dictionary.GetOrAdd(Thread.CurrentThread.ManagedThreadId, v =>
                            {
                                ZipFile zf = new ZipFile(zipFileName) { ExtractExistingFile = ExtractExistingFileAction.Throw, FlattenFoldersOnExtract = false, ZipErrorAction = ZipErrorAction.Throw, };
                                zf.ExtractProgress += (sender, e) =>
                                {
                                    cancellationToken.ThrowIfCancellationRequested();
                                };
                                return Tuple.Create(zf, zf.Entries.ToList());
                            });
                        },
                        (int j, ParallelLoopState loopState, Tuple<ZipFile, List<ZipEntry>> zf) =>
                        {

                            List<Tuple<int, long>> list = groupedItems[j];
                            for (int n = 0; n < list.Count; ++n)
                            {
                                cancellationToken.ThrowIfCancellationRequested();

                                int i = list[n].Item1;

                                // Get the right entry to extract
                                ZipEntry entry = zf.Item2[i];
                                Debug.Assert(entry.UncompressedSize == list[n].Item2);

                                // Extract to a file
                                entry.Extract(targetDirectory);
                            }

                            return zf;
                        },
                        zf =>
                        {
                        });
                }
                finally
                {
                    // Dispose cached ZipFiles
                    foreach (Tuple<ZipFile, List<ZipEntry>> zf in dictionary.Values)
                    {
                        try
                        {
                            zf.Item2.Clear();
                            zf.Item1.Dispose();
                        }
                        catch (ZlibException)
                        {
                            // There is a well known defect in Ionic.ZLib
                            // This exception may happen when you read only part of file (not entire file)
                            // and close its handle.
                            // Ionic.Zlib.ZlibException: Bad CRC32 in GZIP trailer. (actual(D202EF8D)!=expected(A39D1010))
                        }
                    }
                }
            }
        }


        private static IEnumerable<List<T>> ToBuckets<T>(this IEnumerable<T> list, int bucketCount, Func<T, long> getWeight)
        {
            List<T> sortedList = list.OrderByDescending(v => getWeight(v)).ToList();

            List<long> runningTotals = Enumerable.Repeat(0L, bucketCount).ToList();
            List<List<T>> buckets = Enumerable.Range(0, bucketCount)
                .Select(v => new List<T>(sortedList.Count / bucketCount))
                .ToList();

            foreach (T item in sortedList)
            {
                // MinBy runningTotal
                int i = runningTotals.IndexOfMin();
                // Add to bucket
                runningTotals[i] += getWeight(item);
                buckets[i].Add(item);
            }

            return buckets;
        }

        public static int IndexOfMin<T>(this IEnumerable<T> source, IComparer<T> comparer = null)
        {
            if (source == null)
            {
                throw new ArgumentNullException(nameof(source));
            }

            if (comparer == null)
            {
                comparer = Comparer<T>.Default;
            }

            using (IEnumerator<T> enumerator = source.GetEnumerator())
            {
                if (!enumerator.MoveNext())
                {
                    return -1; // or maybe throw InvalidOperationException
                }

                int minIndex = 0;
                T minValue = enumerator.Current;

                int index = 0;
                while (enumerator.MoveNext())
                {
                    ++index;
                    if (comparer.Compare(enumerator.Current, minValue) < 0)
                    {
                        minIndex = index;
                        minValue = enumerator.Current;
                    }
                }

                return minIndex;
            }
        }

        public static int IndexOfMinBy<TSource, TKey>(this IEnumerable<TSource> source, Func<TSource, TKey> selector, IComparer<TKey> comparer = null)
        {
            if (source == null)
            {
                throw new ArgumentNullException(nameof(source));
            }

            if (comparer == null)
            {
                comparer = Comparer<TKey>.Default;
            }

            using (IEnumerator<TSource> enumerator = source.GetEnumerator())
            {
                if (!enumerator.MoveNext())
                {
                    return -1; // or maybe throw InvalidOperationException
                }

                int minIndex = 0;
                TKey minValue = selector(enumerator.Current);

                int index = 0;
                while (enumerator.MoveNext())
                {
                    ++index;
                    TKey value = selector(enumerator.Current);
                    if (comparer.Compare(value, minValue) < 0)
                    {
                        minIndex = index;
                        minValue = value;
                    }
                }

                return minIndex;
            }
        }
    }
}

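The problem is that you open the file with only one handle. A handle has only one read position, so parallel reads on the same handle scramble that position. Open the file several times, with one handle per reader, and the problem goes away.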
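
One way to act on this with the built-in System.IO.Compression types is to let every worker open its own ZipArchive. The following is a minimal sketch, not the answerer's code: `MultiHandleZip`, `ExtractParallel`, and `maxDop` are hypothetical names, and it assumes that each open of the same file lists its entries in the same order, which should hold because every reader parses the same central directory.

```csharp
using System.IO;
using System.IO.Compression;
using System.Threading.Tasks;

// Hypothetical helper, not part of the original answer.
public static class MultiHandleZip
{
    public static void ExtractParallel(string zipPath, string targetDir, int maxDop)
    {
        int entryCount;
        using (ZipArchive probe = ZipFile.OpenRead(zipPath))
        {
            entryCount = probe.Entries.Count;
        }

        var options = new ParallelOptions { MaxDegreeOfParallelism = maxDop };

        Parallel.For<ZipArchive>(0, entryCount, options,
            // localInit: every worker opens its own handle on the file,
            // so each one has a private read position.
            () => ZipFile.OpenRead(zipPath),
            (int i, ParallelLoopState state, ZipArchive archive) =>
            {
                ZipArchiveEntry entry = archive.Entries[i];
                if (entry.Name.Length != 0) // skip directory entries
                {
                    string path = Path.Combine(targetDir, entry.FullName);
                    Directory.CreateDirectory(Path.GetDirectoryName(path));
                    entry.ExtractToFile(path, overwrite: true);
                }
                return archive;
            },
            // localFinally: close this worker's handle.
            archive => archive.Dispose());
    }
}
```

Each worker pays the cost of reading the central directory once, which is the overhead dube's comment above warns about for archives with many small files. As written, the sketch does not guard against path traversal in entry names.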

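I agree with Floste's answer, but I would suggest a different approach. If the files inside your zip are around 50k each:

1) Create a queue of byte arrays, one per file.
2) Each member of the queue is one extracted zip entry.
3) Extract each file from the zip into a byte array and add it to the queue once extraction is complete.
4) The extraction itself should run on a single thread, with no parallelism.
5) While the extraction thread is working, start other threads/tasks to drain the queue. They take the extracted data off the queue and write it to disk. Because these are different files, there is no race condition or contended resource.

You may need a mutex or lock around the queue.
This may not be the best approach, but I believe you will gain some speed.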
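
The steps above can be sketched as a single producer feeding several writers. This is only an illustration under stated assumptions: `QueuedZipExtractor`, `Extract`, `writerCount`, and `queueCapacity` are hypothetical names, and a `BlockingCollection` stands in for the manually locked queue because it does its own synchronization. It assumes individual entries are small enough to buffer in memory, as the answer suggests.

```csharp
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.IO;
using System.IO.Compression;
using System.Linq;
using System.Threading.Tasks;

// Hypothetical helper, not part of the original answer.
public static class QueuedZipExtractor
{
    public static void Extract(string zipPath, string targetDir, int writerCount, int queueCapacity = 64)
    {
        // The bound keeps the reader from buffering the whole archive in memory.
        using (var queue = new BlockingCollection<KeyValuePair<string, byte[]>>(queueCapacity))
        {
            // Consumers: drain the queue and write each file to disk.
            Task[] writers = Enumerable.Range(0, writerCount).Select(_ => Task.Run(() =>
            {
                foreach (KeyValuePair<string, byte[]> item in queue.GetConsumingEnumerable())
                {
                    string path = Path.Combine(targetDir, item.Key);
                    Directory.CreateDirectory(Path.GetDirectoryName(path));
                    File.WriteAllBytes(path, item.Value);
                }
            })).ToArray();

            try
            {
                // Producer: only this thread ever touches the ZipArchive.
                using (ZipArchive archive = ZipFile.OpenRead(zipPath))
                {
                    foreach (ZipArchiveEntry entry in archive.Entries)
                    {
                        if (entry.Name.Length == 0)
                            continue; // directory entry

                        using (Stream source = entry.Open())
                        using (var buffer = new MemoryStream())
                        {
                            source.CopyTo(buffer);
                            queue.Add(new KeyValuePair<string, byte[]>(entry.FullName, buffer.ToArray()));
                        }
                    }
                }
            }
            finally
            {
                // Let the writers finish what is queued, then surface their errors.
                queue.CompleteAdding();
                Task.WaitAll(writers);
            }
        }
    }
}
```

Decompression stays single-threaded here, so this only helps when disk writes are the bottleneck. Error handling is deliberately minimal: if every writer fails while the queue is full, the producer would block, so a fuller version would want a cancellation token.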

I needed to decompress a large archive in parallel (~30 GB, ~45k entries of varying size) and came up with this solution using DotNetZip:

    public static void ParallelExtract(
        string archivePath,
        string destinationPath,
        string password,
        CancellationToken token,
        ProgressReportDelegate progress // Could also be Progress<T> or whatever you prefer.
        )
    {
        if (String.IsNullOrEmpty(archivePath))
            throw new ArgumentNullException("archivePath");

        if (String.IsNullOrEmpty(destinationPath))
            throw new ArgumentNullException("destinationPath");

        Stopwatch elapsed = new Stopwatch();
        Stopwatch progressReportingTimer = new Stopwatch();

        elapsed.Start();
        progressReportingTimer.Start();

        object obj = new object();

        int count = -1;
        long bytesExtracted = 0;
        long bytesTotal = -1;

        List<Task> decompressors = new List<Task>();

        for (int i = 0; i < Environment.ProcessorCount; i++)
        {
            decompressors.Add(Task.Run(() =>
            {
                using (ZipFile zipFile = new ZipFile(archivePath))
                {
                    if (!String.IsNullOrEmpty(password))
                        zipFile.Password = password;

                    zipFile.ExtractProgress += delegate (object zipSender, ExtractProgressEventArgs zipArgs)
                    {
                        // Report progress after each EntryBytesWritten event, as long as it's been at least 250ms since the last report, so as to not overwhelm listeners like a progress bar. 
                        // Fire regardless upon completion (bytesExtracted == bytesTotal) to provide a final update before finishing.
                        if ((zipArgs.EventType == ZipProgressEventType.Extracting_EntryBytesWritten && progressReportingTimer.ElapsedMilliseconds >= 250) || bytesExtracted == bytesTotal)
                        {
                            int percentage = Percentage(bytesExtracted, bytesTotal);

                            lock (obj)
                            {
                                progress?.Invoke(); // <-- Handle your progress updates here.

                                progressReportingTimer.Restart();
                            }
                        }
                    };

                    // Block all threads until we sum the total size of all entries so that when we begin processing on the threadpool we 
                    // can report progress relative to the total.
                    // Assign the sum rather than adding to the -1 sentinel,
                    // which would leave the total one byte short.
                    lock (obj)
                        if (bytesTotal == -1)
                            bytesTotal = zipFile.Entries.Sum(entry => entry.CompressedSize);

                    var array = zipFile.Entries.ToArray();

                    int index;
                    ZipEntry zipEntry;

                    // Iterate through the archive's entries sequentially despite being on multiple threads.
                    while (count < zipFile.Entries.Count && !token.IsCancellationRequested)
                    {
                        index = Interlocked.Increment(ref count);

                        if (index >= zipFile.Entries.Count)
                            return;

                        zipEntry = array[index];

                        Interlocked.Add(ref bytesExtracted, zipEntry.CompressedSize);

                        zipEntry.Extract(destinationPath, ExtractExistingFileAction.OverwriteSilently);
                    }
                }
            },
            token
            ));
        }

        Task.WaitAll(decompressors.ToArray());
    }

Results:

Hardware: Intel Core i7-4710HQ @ 3.50 GHz (4 cores, 8 threads), 16 GB RAM, (SATA) SSD, Win10 x64 1903:

Archive: 44.5k entries, ~30 GB DEFLATE (Store)

Threads:    Time:
-----------------
1           35:20 (NOTE: This is virtually identical to ZipArchive.ExtractAll())
2           22:14
3           18:40
4           16:49
8           14:42

@maytham-ɯɐɥʇʎɐɯ Thanks; that was the result of me accidentally breaking it while editing the comment. - Justin Shidell
