如何使用Parallel.ForEach正确地向文件写入数据?

8

我有一个任务需要逐行读取一个大文件,进行一些逻辑处理,并返回一个字符串,我需要将其写入一个文件中。输出的顺序并不重要。然而,当我尝试使用下面的代码时,在读取我的文件的15-20k行后,它会变得非常慢或停止。

public static Object FileLock = new Object();
...
Parallel.ForEach(System.IO.File.ReadLines(inputFile), (line, _, lineNumber) =>
{
    var output = MyComplexMethodReturnsAString(line);
    lock (FileLock)
    {
        using (var file = System.IO.File.AppendText(outputFile))
        {
            file.WriteLine(output);
        }
    }
});

为什么我的程序运行一段时间后变慢了?有没有更正确的方法来执行这个任务?

1
我不确定,但感觉这样使用并行方式会产生/加剧IO瓶颈,而不是避免它。除非你在这些行上执行非常昂贵的操作。 - TaW
这是一个相当昂贵的操作。至少前5k行速度明显增加。但很快之后就开始变慢了。 - justindao
2
变量file = System.IO.File.AppendText(outputFile)可以放在foreach循环之外,因为您正在锁定它。检查一下是否会提高性能。 - Fábio Junqueira
1
你使用了“锁”来进行同步,以确保只有一个线程可以写入文件。这肯定会减慢速度,因为它限制了顺序写操作。多个线程将一直等待,直到第一个线程写入文件。 - Abhinav Galodha
1
你的代码相当于用一把剪刀修剪草坪,但是你不想独自完成(因为那需要很长时间),于是你找了100个朋友来帮忙,但是你却让他们共用一把剪刀。 - Enigmativity
显示剩余3条评论
2个回答

12

你本质上通过让所有线程尝试写入文件来序列化了你的查询。相反,你应该计算需要写什么,然后将它们写在最后到来的位置。

var processedLines = File.ReadLines(inputFile).AsParallel()
    .Select(l => MyComplexMethodReturnsAString(l));
File.AppendAllLines(outputFile, processedLines);
如果你需要实时刷新数据,可以打开流并启用自动刷新(或手动刷新):
var processedLines = File.ReadLines(inputFile).AsParallel()
    .Select(l => MyComplexMethodReturnsAString(l));
using (var output = File.AppendText(outputFile))
{
    output.AutoFlush = true;
    foreach (var processedLine in processedLines)
        output.WriteLine(processedLine);
}

如果这个文件确实是一个文件,我不确定这种方法是否足够适用,因为它需要作为第一步读取整个文件。 - Souhaieb Besbes
2
不过当使用 File.ReadLines() 时,它会给你一个可枚举对象,以便在读取每一行文件时进行枚举。这与使用 File.ReadAllLines() 不同,后者返回包含文件所有行的数组。而 File.ReadAllLines() 会将整个文件读入内存中。 - Jeff Mercado

6
这与Parallel.ForEach内部负载均衡器的工作原理有关。当它发现您的线程花费了大量时间进行阻塞时,它会推断出通过向问题投入更多线程可以加速解决问题,导致更高的并行开销、FileLock争用和整体性能下降。
为什么会这样?因为Parallel.ForEach不适用于IO工作。
如何解决?仅将Parallel.ForEach用于CPU工作,并在并行循环外执行所有IO操作。
一个快速的解决方法是限制Parallel.ForEach可以招募的线程数,使用接受ParallelOptions的重载方法,如下所示:
Parallel.ForEach(
    System.IO.File.ReadLines(inputFile),
    new ParallelOptions { MaxDegreeOfParallelism = Environment.ProcessorCount },
    (line, _, lineNumber) =>
    {
        ...
    }

我非常喜欢你的回答,直到“一个快速解决方法...”。它似乎是你之前所说的一切的倒退。也许如果你详细说明代码,我会更明白。 - Enigmativity
好奇:我一直以为 Environment.ProcessorCount 是 MaxDegreeOfParallelism 的自然限制。这是错误的吗? - TaW
1
@TaW,不,它将远远超出Environment.ProcessorCount。这里有一个演示,每秒钟添加约1个线程,直到您杀死进程(我在100后放弃了):https://dotnetfiddle.net/dT1eBM(不用说,您可能不应该在生产服务器上运行此代码)。 - Kirill Shlenskiy

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接