使用多线程读取大型文本文件?

29

我有一个包含 100000 行文本的大型 txt 文件。 我需要启动 n 个线程,并为每个线程分配唯一的文件行。

最好的方法是什么?我认为我需要逐行读取文件,迭代器必须是全局变量以进行锁定。将文本文件加载到列表中将耗费时间,我可能会遇到 OutofMemory 异常。有什么想法吗?


展示一下你尝试过的东西 - Peter
创建n个唯一的随机数,按升序排序,使用File.ReadLines方法,获取正确位置的行并将它们传递给线程。 - Ilya Ivanov
你不能使用这个吗:http://msdn.microsoft.com/zh-cn/library/dd460720.aspx? - Daan Timmer
6个回答

43
您可以使用File.ReadLines 方法逐行读取文件,而无需一次性将整个文件加载到内存中,并且可以使用Parallel.ForEach 方法在多个线程中并行处理这些行:
Parallel.ForEach(File.ReadLines("file.txt"), (line, _, lineNumber) =>
{
    // your code here
});

如果文件太大,此处可能会抛出OutOfMemory异常。 - sinitram
2
可能会对@SteffenWinkler感兴趣的是https://dotnetfiddle.net/wX7VhA。请注意,第三项是在第一项结束后开始的,而不是在第二项结束后开始的。我不确定您对捆绑问题的担忧是否有效。 - mjwills
2
@mjwills 嗯,在进一步的测试和玩耍后,我不得不同意你的观点。我的最初观察可能是巧合,或者是我没有足够地注意到正在发生的事情。然而,我想指出的一件事是 Parallel.Foreach 似乎会将条目列表分成可用线程的数量,并且每个线程执行一个子列表。因此,线程1获取条目1-20,线程2获取条目21-40,而不是只取下一个可用条目。 - Steffen Winkler
1
是的,它确实像那样进行分区。如果您不想要这种行为,https://dev59.com/sm435IYBdhLWcg3wigux#20929046 可能值得一读。 - mjwills
1
请考虑删除您之前(不正确的)评论。 - mjwills
显示剩余3条评论

24

在将61,277,203行加载到内存中并将值推入字典/ConcurrentDictionary()的基准测试后,结果似乎支持@dtb上面的答案,即使用以下方法是最快的:

Parallel.ForEach(File.ReadLines(catalogPath), line =>
{

}); 

我的测试结果如下:
  1. 在这个文件大小的情况下,File.ReadAllLines() 和 File.ReadAllLines().AsParallel() 的运行速度几乎相同。从我的 CPU 活动情况来看,它们似乎都使用了我的 8 核中的两个?
  2. 使用 File.ReadAllLines() 预先读取所有数据比在 Parallel.ForEach() 循环中使用 File.ReadLines() 要慢得多。
  3. 我还尝试了生产者/消费者或 MapReduce 风格的模式,其中一个线程用于读取数据,另一个线程用于处理数据。但这种方式似乎并没有超过上述简单模式的表现。

为了参考,我在此处提供了该模式的示例代码,因为该页面上未包含:

var inputLines = new BlockingCollection<string>();
ConcurrentDictionary<int, int> catalog = new ConcurrentDictionary<int, int>();

var readLines = Task.Factory.StartNew(() =>
{
    foreach (var line in File.ReadLines(catalogPath)) 
        inputLines.Add(line);

        inputLines.CompleteAdding(); 
});

var processLines = Task.Factory.StartNew(() =>
{
    Parallel.ForEach(inputLines.GetConsumingEnumerable(), line =>
    {
        string[] lineFields = line.Split('\t');
        int genomicId = int.Parse(lineFields[3]);
        int taxId = int.Parse(lineFields[0]);
        catalog.TryAdd(genomicId, taxId);   
    });
});

Task.WaitAll(readLines, processLines);

这是我的基准测试结果:

enter image description here

我怀疑在某些处理条件下,生产者/消费者模式可能会比简单的Parallel.ForEach(File.ReadLines())模式更优。然而,在这种情况下并不是这样。


嗨,Jake,感谢分享基准测试。虽然我同意使用File.ReadLines()来避免大内存消耗的必要性,但是Parallel.ForEach(File.ReadLines())真的比单线程处理更优吗?流是按顺序设计的,并且硬件只支持一次读取一个东西,因此使用多个线程来处理结果可能会增加阻塞和上下文切换的开销;有趣的是,仅使用单个线程处理File.ReadLines()的结果的性能指标将会非常有趣。 - dragonfly02
去试试吧!基准测试足够简单。回答你的问题,我认为这取决于你的foreach循环对每行进行了多少处理。想象一下这样一个场景:流在Parallel.ForEach()中向线程池提供工作,读入的每一行都会被传递给下一个可用的线程进行处理。如果你有一个空的foreach循环,那么使用单个线程读取可能会更快。然而,这通常不是这种情况。 - Jake Drew
虽然流可能是按顺序设计的,但它也被实现为使用Yield Returns的IEnumerable。因此,在单线程或多线程场景中,File.ReadLines()的行为相同,即“yielding”处理直到下一个行请求,而不是来自单个或多个线程。这一切都将归结于您在每个文件行上执行多少工作,与并行处理获得的加速(如果有)有关! - Jake Drew
我注意到BlockingCollection非常慢。也许使用不同的backingstore会更快。 - Walter Verhoeven

7

5

Something like:

public class ParallelReadExample
{
    public static IEnumerable LineGenerator(StreamReader sr)
    {
        while ((line = sr.ReadLine()) != null)
        {
            yield return line;
        }
    }

    static void Main()
    {
        // Display powers of 2 up to the exponent 8:
        StreamReader sr = new StreamReader("yourfile.txt")

        Parallel.ForEach(LineGenerator(sr), currentLine =>
            {
                // Do your thing with currentLine here...
            } //close lambda expression
        );

        sr.Close();
    }
}

我认为这会奏效。(这里没有C#编译器/IDE)


使用 thr = new Thread[j] 重写它怎么样?for (; i < j; i++) { thr[i] = new Thread(new ThreadStart(go)); thr[i].IsBackground = true; thr[i].Start(); }而不是 Parallel.ForEach。 - obdgy
2
@obdgy:你为什么想要这样做? - dtb
1
@obdgy 相比于 Parallel.ForEach,它有什么用途? - Daan Timmer
1
@obdgy 如果你在双核、四核或八核上运行,使用100-300个线程并没有速度优势。甚至可能比在八核上只运行8个线程还要慢。简单来说:运行的线程数超过CPU核心数只会减缓进程速度。 - Daan Timmer

4
如果您想限制线程数为 n,最简单的方法是使用 AsParallel()WithDegreeOfParallelism(n) 来限制线程数量:
string filename = "C:\\TEST\\TEST.DATA";
int n = 5;

foreach (var line in File.ReadLines(filename).AsParallel().WithDegreeOfParallelism(n))
{
    // Process line.
}

如果我正确理解File.ReadLines(),它基本上是一种类似于Python生成器的东西,内部使用Yield实现的? - Daan Timmer
@DaanTimmer 我对Python一无所知,但File.ReadLines()只返回一个IEnumerable<string>,它是通过yield实现的。 - Matthew Watson
在这种情况下,你的答案可以简单地归纳为“是的” :-) - Daan Timmer

2
如@dtb上面提到的,读取文件并处理文件中每一行的最快方法是: 1)使用File.ReadAllLines()将其读入数组中 2)使用Parallel.For循环遍历数组。 您可以在此处阅读更多性能基准。 您需要编写的代码的基本要点是:
string[] AllLines = File.ReadAllLines(fileName);
Parallel.For(0, AllLines.Length, x =>
{
    DoStuff(AllLines[x]);
    //whatever you need to do
});

随着 .Net4 中数组大小的增加,只要您拥有足够的内存,这将不会成为问题。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接