Parallel.ForEach在处理一个包含大对象的可枚举对象时可能会引发"内存不足"异常。

73
我正在尝试将数据库中存储的图像迁移到指向硬盘上文件的数据库记录。我试图使用Parallel.ForEach来加速这个过程,使用此方法查询数据。但是,我发现我得到了一个OutOfMemory异常。我知道Parallel.ForEach会查询一批可枚举对象以减少开销的成本,如果有一个用于间隔查询的开销(因此,如果您一次执行大量查询而不是将它们分散开来,源代码更可能在内存中缓存下一个记录)。问题是我返回的其中一个记录是一个1-4MB的字节数组,缓存导致整个地址空间被耗尽(程序必须在x86模式下运行,因为目标平台将是32位机器)。是否有任何方法可以禁用TPL的缓存或使其更小?以下是一个示例程序,用于演示此问题。如果需要太长时间或在您的计算机上没有发生此问题,请增加数组大小(我发现1<<20在我的计算机上大约需要30秒,而4<<20几乎是瞬间完成)。
class Program
{

    static void Main(string[] args)
    {
        Parallel.ForEach(CreateData(), (data) =>
            {
                data[0] = 1;
            });
    }

    static IEnumerable<byte[]> CreateData()
    {
        while (true)
        {
            yield return new byte[1 << 20]; //1Mb array
        }
    }
}

这个程序运行时有多少个线程是活跃的?设置ParallelOptions.MaxDegreeOfParallelism值会有帮助吗? - Kevin Pullin
@Kevin Pullin,在异常发生时,有9个任务正在运行(我在四核处理器上运行此代码示例)。将其设置为最大值2和数组大小为4Mb,它稳定在约64Mb的工作集。将此发布为答案,我会点赞。我认为这样做或不使用TPL可能是我的唯一选择。我会让它在这些设置下运行一夜,看看是否仍然会出现异常。 - Scott Chamberlain
4个回答

109
Parallel.ForEach的默认选项只适用于CPU密集型任务并且能够线性扩展。当任务是CPU密集型时,一切都很完美。如果你有一个四核处理器且没有其他进程在运行,则Parallel.ForEach会使用全部四个处理器。但如果你的计算机上有其他进程占用了一个完整的CPU,则Parallel.ForEach仅使用大约三个处理器。
但如果任务不是CPU密集型,则Parallel.ForEach会不断启动任务,尝试保持所有CPU忙碌。然而,无论有多少任务在并行运行,总会有更多未使用的CPU资源,因此它会继续创建任务。
如何判断您的任务是否为CPU密集型?希望只需检查即可得知。如果您正在分解质数,则很明显。但其他情况可能不太明显。判断任务是否为CPU密集型的实验方法是使用ParallelOptions.MaximumDegreeOfParallelism限制最大并行度,并观察程序的行为。如果您的任务是CPU密集型,则在四核系统上应该看到以下模式:
  • ParallelOptions.MaximumDegreeOfParallelism = 1:使用一个完整的CPU或25%的CPU利用率
  • ParallelOptions.MaximumDegreeOfParallelism = 2:使用两个CPU或50%的CPU利用率
  • ParallelOptions.MaximumDegreeOfParallelism = 4:使用全部CPU或100%的CPU利用率
如果程序表现出这样的行为,则可以使用默认的Parallel.ForEach选项并获得良好的结果。线性CPU利用率意味着良好的任务调度。
但如果我在我的Intel i7上运行您的示例应用程序,无论我设置什么最大并行度,CPU利用率都只有约20%。为什么会这样?因为太多的内存被分配了,垃圾收集器正在阻塞线程。该应用程序受到资源限制,而资源是内存。
同样,执行针对数据库服务器的长时间查询的I/O密集型任务也永远无法有效地利用本地计算机上所有可用的CPU资源。在这种情况下,任务调度程序无法"知道何时停止"启动新任务。
如果您的任务不是CPU密集型,或者CPU利用率与最大并行度的线性比例不成立,则应建议Parallel.ForEach不要同时启动太多任务。最简单的方法是指定一个允许一些I/O密集型任务重叠的数量,但不要过多地占用本地计算机的资源需求或过度满负载任何远程服务器。需要通过试错来获得最佳结果:
static void Main(string[] args)
{
    Parallel.ForEach(CreateData(),
        new ParallelOptions { MaxDegreeOfParallelism = 4 },
        (data) =>
            {
                data[0] = 1;
            });
}

24
我认为这与我的问题有关,他说得很准确。我可能会使用Enviorment.ProcessorCount并将其设置为最大并行度的限制。 - Scott Chamberlain
1
不,这确实与问题无关,我尝试过了,但失败了,下面会详细说明。 - evolvedmicrobe

47

因此,虽然Rick提出的建议肯定是一个重要的点,但我认为缺少讨论分区

Parallel::ForEach将使用默认的Partitioner<T>实现,对于没有已知长度的IEnumerable<T>,它将使用块分区策略。这意味着Parallel::ForEach将要用来处理数据集的每个工作线程都将从IEnumerable<T>中读取一些元素,然后只有该线程处理这些元素(暂时忽略工作窃取)。它这样做是为了节省不断返回源并分配一些新工作并安排另一个工作线程处理的费用。因此,通常情况下,这是一件好事。然而,在您特定的场景中,想象一下您在四核上,并且将MaxDegreeOfParallelism设置为4个线程进行工作,现在每个线程从您的IEnumerable<T>中拉取100个元素的块。那么,就在那个特定的工作线程中,就有100-400兆的内存占用,对吧?

所以,你该如何解决这个问题呢?很简单,你需要编写一个自定义的Partitioner<T>实现。现在,在你的情况下,分块仍然很有用,因此你可能不想选择单个元素分区策略,因为那样会引入所有任务协调所必需的开销。相反,我会编写一个可配置的版本,你可以通过appsetting进行调整,直到找到适合你的工作负载的最佳平衡点。好消息是,虽然编写这样的实现非常简单,但你实际上甚至不必自己编写,因为PFX团队已经将其放入了并行编程示例项目中

感谢您提供的额外信息。这个问题正在变得非常有启发性。 - Scott Chamberlain
1
这是一个很好的问题,我希望很多人能看到并从中学习。PLINQ/TPL通常会在很多方面为你屏蔽这些内容,但有时候不可避免地需要进入其中并调整一些参数,以便根据给定的工作负载将其引导到正确的路径上。这恰好是其中之一。 :) - Drew Marsh
1
样例项目的链接不再存在,为什么他们不能在不可用的页面中给出新页面的链接。 - zish
我也使用带有限制的MaxDegreeOfParallelism的Parallel Foreach来进行大型数据库导入,但是几个小时后它仍然耗尽了内存。问题出在分区上。 - stmax
3
自 .NET 4.5 版本起,.NET框架提供了一个单元素分区器,可通过Partitioner.Create(CreateData(), EnumerablePartitionerOptions.NoBuffering)进行使用。 - Scott Chamberlain
当我在生产者和消费者模式中使用GetConsumingEnumerable()时,我是否可以期望OutOfMemory异常?在这里,我有调整队列大小的能力。 - Mahesh kumar Chiliveri

15
这个问题完全与分区器有关,而不是与并行度有关。解决方法是实现自定义数据分区器。
如果数据集很大,似乎TPL的单线程实现保证会耗尽内存。我最近遇到了这个问题(基本上我正在运行上面的循环,并发现内存呈线性增长,直到给出OOM异常)。
追踪问题后,我发现默认情况下,mono将使用EnumerablePartitioner类来划分枚举器。该类具有一种行为,即每次向任务提供数据时,它通过逐渐增加(且不可更改)的2倍因子“分块”数据。所以第一次任务请求数据时,它获取大小为1的块,下一次获取大小为2 * 1 = 2的块,接下来是2 * 2 = 4,然后是2 * 4 = 8等等。结果是任务处理的数据量以及同时存储在内存中的数据量随任务长度的增加而增加,如果处理大量数据,则不可避免地会出现内存不足异常。

大概来说,这种行为的原因是希望避免每个线程多次返回获取数据,但它似乎基于一个假设:所有正在处理的数据都可以适应内存(当从大文件中读取时并非如此)。

如前所述,可以通过自定义分区器来避免这个问题。以下是一个简单的通用示例,它只是将数据逐个返回给每个任务:

https://gist.github.com/evolvedmicrobe/7997971

只需首先实例化该类并将其交给Parallel.For,而不是直接使用可枚举对象。


-1

虽然使用自定义分区器无疑是最“正确”的答案,但更简单的解决方案是让垃圾收集器追上。在我尝试的情况下,我在函数内部进行了重复调用并行.for循环。尽管每次退出函数,程序使用的内存都会线性增加,如此处所述。我添加了:

//Force garbage collection.
GC.Collect();
// Wait for all finalizers to complete before continuing.
GC.WaitForPendingFinalizers();

虽然它不是超级快,但它解决了内存问题。可能在高CPU使用率和内存利用率下,垃圾收集器效率不高。


1
有一些备受认可的作者提出了一个理由,如果你在生产代码中调用GC.Collect(),那么你实际上是在宣布你比GC的作者更懂。这可能是事实。然而,通常情况下并非如此,因此强烈不建议这样做。GC不是开发人员工具包,而是编译器工具包,与之相关的有一些惯例,例如对于非托管资源使用IDisposable。请参考https://dev59.com/13VD5IYBdhLWcg3wAWoO和CLR via C#。 - Bhanu Chhabra
问题在于等待的任务并非垃圾。 - uli78

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接