使用.NET进行多线程文件处理

17

有一个文件夹里面包含成千上万个小文本文件。我想在更多文件被添加到文件夹时解析和处理它们。我的意图是将此操作多线程化,因为单线程原型需要六分钟才能处理1000个文件。

我希望有读取线程和写入线程,如下所示。当读取线程正在读取文件时,我希望有写入线程来处理它们。一旦读取器开始读取文件,我希望将其标记为正在处理中,例如通过重命名。读取完后,将其重命名为已完成。

如何处理这样的多线程应用程序?

使用分布式哈希表还是队列更好呢?

哪种数据结构可以避免锁定?

这种方案有更好的方法吗?


1
哪个 .net 版本可供使用?.Net 4 在这方面提供了很多帮助,但不确定是否可选。 - Nick Craver
4
一个主要的限制因素是I/O争用,无论您如何尝试并行化工作,所有内容仍然必须通过同一I/O通道进行传输。 - Chris O
好的,那我想充分利用IO。 - DarthVader
.NET 3.5。我怀疑.NET 4对我来说不是一个选项。 - DarthVader
8
@Nick Craver 我非常希望除了3.5版本之外,还能看到针对.Net 4的建议。但前提是这不会给你带来任何困扰,并且只有其他人也感兴趣(他们可以点赞这条评论)。 - Chris
@Chris - 我在下面留了一个答案,展示了方法、好处、需要注意的一些问题以及一个可以尝试的示例测试,希望你觉得有用 :) - Nick Craver
6个回答

27

鉴于大家对.NET 4如何处理这个问题有兴趣,以下是该方法。抱歉,这对提问者可能不是一个选项。免责声明:这不是一项高度科学的分析,只是表明了明显的性能优势。根据硬件配置,您的结果可能会有很大不同。

这是一个快速测试(如果您在这个简单的测试中看到了一个很大的错误,请评论,我们可以改进它使其更加有用/准确)。为此,我只是在一个目录中放了12,000个大小约为60KB的文件作为样本(启动LINQPad; 您可以免费玩耍! - 请确保获取LINQPad 4):

var files = 
Directory.GetFiles("C:\\temp", "*.*", SearchOption.AllDirectories).ToList();

var sw = Stopwatch.StartNew(); //start timer
files.ForEach(f => File.ReadAllBytes(f).GetHashCode()); //do work - serial
sw.Stop(); //stop
sw.ElapsedMilliseconds.Dump("Run MS - Serial"); //display the duration

sw.Restart();
files.AsParallel().ForAll(f => File.ReadAllBytes(f).GetHashCode()); //parallel
sw.Stop();
sw.ElapsedMilliseconds.Dump("Run MS - Parallel");

稍微改变您的循环以并行查询在大多数简单情况下是所需的。通过“简单”,我主要指的是一个操作的结果不会影响下一个操作。最需要记住的是,有些集合(例如我们方便的List<T>不是线程安全的,因此在并行场景中使用它不是一个好主意 :) 幸运的是,在.NET 4中添加了并发集合,这些集合是线程安全的。还要记住,如果您正在使用锁定集合,则根据情况,这可能也是一个瓶颈。

这里使用了.NET 4.0中提供的.AsParallel<T>(IEnumeable<T>).ForAll<T>(ParallelQuery<T>)扩展方法。.AsParallel()调用将IEnumerable<T>包装在一个ParallelEnumerableWrapper<T>(内部类)中,该类实现了ParallelQuery<T>。这样就可以使用并行扩展方法,在这种情况下,我们使用.ForAll()

.ForAll() 内部创建了一个 ForAllOperator<T>(query, action) 并同步运行它。这处理线程和合并线程的操作,直到它运行结束... 在其中有很多操作,如果你想学习更多内容,包括其他选项,我建议从 这里开始


结果(计算机1 - 物理硬盘):

  • 串行:1288 - 1333毫秒
  • 并行:461 - 503毫秒

计算机规格-供比较:

结果(计算机2 - 固态硬盘):

  • 串行:545 - 601 毫秒
  • 并行:248 - 278 毫秒

计算机规格-供比较:

这次我没有CPU/RAM的链接,它们是预装的。这是一台Dell M6400笔记本电脑 (这里有M6500的链接... Dell的自己的6400链接已经失效)。


这些数字是从10次运行中获取的,取内部8个结果的最小值/最大值(尽可能排除每个结果的原始最小值/最大值作为离群值)。我们在这里遇到了I/O瓶颈,特别是在物理驱动器上,但考虑一下串行方法的工作方式。它读取、处理、读取、处理,反复循环。采用并行方法,您可以同时读取和处理(即使存在I/O瓶颈)。在最坏的瓶颈情况下,您正在处理一个文件,同时读取下一个文件。仅仅这样(在任何当前计算机上!)都应该产生一些性能提升。您可以看到我们可以同时处理多个文件,在上面的结果中得到了一些健康的提升。

另一个免责声明:四核+ .NET 4并行不会给您带来四倍的性能,它不会线性扩展...还有其他的考虑因素和瓶颈。

我希望这个回答对于那些对这种方法和潜在好处感兴趣的人有所帮助。欢迎批评或改进...这个回答存在的唯一目的就是为了那些在评论中表明感兴趣的人 :)


很高兴看到这个具体的例子。谢谢。 - Chris
Nick,如果OP使用一个生产者和多个消费者,而不是为每个文件使用并行任务,那不是更好吗?如果OP创建了太多的并行任务,那么在它们之间切换实际上会降低性能...除此之外,这是一篇很棒的文章! - Kiril
1
@Lirik - 不确定我完全理解了,您在此处并没有真正进行上下文切换,这将创建与核心数相对应的线程数,因此您不会进行上下文切换,除非像往常一样发生中断。在您的情况下,生产者是什么(请澄清一下,例如!)?由于扩展文件读取的能力取决于数据源,无论是一个物理硬盘,光纤通道,固态硬盘,RAM等.... 它读取x个文件的速度更快,将取决于媒介...所以不确定单个生产者是否更快...它实际上可能成为瓶颈 :) - Nick Craver
@Nick,我正在尝试弄清楚你的示例如何创建与核心对应的线程数?这是TPL自动完成的,还是有其他魔法在其中? - Kiril
1
@Lirik - 工作线程的数量默认情况下由PLINQ内部自动缩放,但是如果您想使用MaxDegreeOfParallelism指定限制(以及许多其他选项),则可以这样做。Reed Copsey在这里有一个很好的解释:http://reedcopsey.com/2010/02/11/parallelism-in-net-part-9-configuration-in-plinq-and-tpl/ - Nick Craver
如果我需要处理几个大文件(最大的约为13 GB),那么 PLINQ 是否仍会将所有内容存储在内存中,或者如果我使用 ReadAllBytes,它是否会尝试将 13GB 的数据全部读入内存?对于这些庞然大物,我是否注定要逐行处理? - interesting-name-here

6

设计

这种情况下,生产者/消费者模式可能是最有用的。您应该创建足够的线程来最大化吞吐量。

以下是一些关于生产者/消费者模式的问题,以帮助您了解其工作原理:

您应该使用阻塞队列,生产者应将文件添加到队列中,而消费者则从队列中处理文件。阻塞队列不需要锁定,因此这是解决您问题最高效的方法。

如果您使用的是.NET 4.0,则可以直接使用几个并发集合

线程

单个生产者线程可能是从磁盘中加载文件并将其推送到队列的最有效方法;随后,多个消费者将从队列中弹出项目并处理它们。我建议您尝试每个核心使用2-4个消费者线程,并进行一些性能测量,以确定哪个最优化(即提供最大吞吐量的线程数)。对于这个特定的示例,我不建议使用线程池。

P.S. 我不明白为什么会担心单点故障和分布式哈希表的使用?我知道DHT听起来很酷,但除非您有正在解决的具体问题,否则我建议先尝试传统方法。


我认为生产者也应该将文件读入缓冲区,然后将其传递给消费者。这样,您就不会有多个线程同时尝试从硬盘驱动器中读取数据(这会导致磁头频繁跳动并且大大减慢速度,而不是加快进程)。 - Grant Peters
正确授权,一个生产者和多个消费者。 - Kiril

3
我建议为每个文件排队一个线程,并在字典中跟踪正在运行的线程,在一个最大限制内启动新线程,当一个线程完成时启动一个新线程。当线程可能运行时间较长时,我更喜欢创建自己的线程,并使用回调函数来指示何时完成或遇到异常。在下面的示例中,我使用字典来跟踪正在运行的工作实例。这样,如果我想要提前停止工作,就可以调用到一个实例。回调也可以用于更新UI进度和吞吐量。您还可以动态地限制正在运行的线程限制以获得额外的积分。
示例代码是一个简化的演示程序,但它可以运行。
class Program
{
    static void Main(string[] args)
    {
        Supervisor super = new Supervisor();
        super.LaunchWaitingThreads();

        while (!super.Done) { Thread.Sleep(200); }
        Console.WriteLine("\nDone");
        Console.ReadKey();
    }
}

public delegate void StartCallbackDelegate(int idArg, Worker workerArg);
public delegate void DoneCallbackDelegate(int idArg);

public class Supervisor
{
    Queue<Thread> waitingThreads = new Queue<Thread>();
    Dictionary<int, Worker> runningThreads = new Dictionary<int, Worker>();
    int maxThreads = 20;
    object locker = new object();

    public bool Done { 
        get { 
            lock (locker) {
                return ((waitingThreads.Count == 0) && (runningThreads.Count == 0)); 
            } 
        } 
    }

    public Supervisor()
    {
        // queue up a thread for each file
        Directory.GetFiles("C:\\folder").ToList().ForEach(n => waitingThreads.Enqueue(CreateThread(n)));
    }

    Thread CreateThread(string fileNameArg)
    {
        Thread thread = new Thread(new Worker(fileNameArg, WorkerStart, WorkerDone).ProcessFile);
        thread.IsBackground = true;
        return thread;
    }

    // called when a worker starts
    public void WorkerStart(int threadIdArg, Worker workerArg)
    {
        lock (locker)
        {
            // update with worker instance
            runningThreads[threadIdArg] = workerArg;
        }
    }

    // called when a worker finishes
    public void WorkerDone(int threadIdArg)
    {
        lock (locker)
        {
            runningThreads.Remove(threadIdArg);
        }
        Console.WriteLine(string.Format("  Thread {0} done", threadIdArg.ToString()));
        LaunchWaitingThreads();
    }

    // launches workers until max is reached
    public void LaunchWaitingThreads()
    {
        lock (locker)
        {
            while ((runningThreads.Count < maxThreads) && (waitingThreads.Count > 0))
            {
                Thread thread = waitingThreads.Dequeue();
                runningThreads.Add(thread.ManagedThreadId, null); // place holder so count is accurate
                thread.Start();
            }
        }
    }
}

public class Worker
{
    string fileName;
    StartCallbackDelegate startCallback;
    DoneCallbackDelegate doneCallback;
    public Worker(string fileNameArg, StartCallbackDelegate startCallbackArg, DoneCallbackDelegate doneCallbackArg)
    {
        fileName = fileNameArg;
        startCallback = startCallbackArg;
        doneCallback = doneCallbackArg;
    }

    public void ProcessFile()
    {
        startCallback(Thread.CurrentThread.ManagedThreadId, this);
        Console.WriteLine(string.Format("Reading file {0} on thread {1}", fileName, Thread.CurrentThread.ManagedThreadId.ToString()));
        File.ReadAllBytes(fileName);
        doneCallback(Thread.CurrentThread.ManagedThreadId);
    }
}

1

你可以有一个中央队列,读取线程需要在将内存内容推送到队列期间获得写入访问权限。处理线程需要读取此中央队列以弹出下一个要处理的内存流。这样,您可以最小化在锁定中花费的时间,并且不必处理无锁代码的复杂性。

编辑:理想情况下,您应该优雅地处理所有异常/错误条件(如果有),以便您没有失败点。

作为替代方案,您可以拥有多个线程,每个线程在处理之前通过重命名来“声明”文件,因此文件系统成为实现锁定访问的方式。我不知道这是否比我的原始答案更有效,只有测试才能告诉。


这会引入单点故障。我希望采用去中心化的方法。 - DarthVader
2
可能可以,但是让每个线程都“智能”并与其他线程协同工作可能会很复杂。跨线程问题可能会成为调试的噩梦。一个单一(或中央)队列更简单。 - Pretzel
你是说我没有故障点吗?如果服务器宕机会发生什么?集中式方法总是引入单点故障。 - DarthVader

1

一般来说,处理1000个小文件(顺便问一下,有多小?)不应该需要六分钟的时间。作为一个快速测试,在包含这些文件的目录中执行find "foobar" *命令(引号中的第一个参数无关紧要,可以是任何内容),看看处理每个文件需要多长时间。如果超过了一秒钟,我会感到失望。

假设这个测试证实了我的怀疑,那么这个过程就是CPU密集型的,将读取操作分离到自己的线程中不会有任何改进。你应该:

  1. 找出为什么平均处理一个小输入需要超过350毫秒,并希望改进算法。
  2. 如果没有办法加速算法并且你有一个多核机器(现在几乎每个人都有),使用线程池将1000个任务分配给每个任务读取一个文件。

首先,连续进程没有总运行时间。其次,我很清楚六分钟是总处理时间。测试的整个重点是展示其中有多少时间用于I/O操作;我强烈怀疑几乎没有时间用于I/O操作,那么并行化I/O操作就没有任何收益了。 - Marcelo Cantos
我想我应该更明确地表达我所谓的连续过程。我的意思是,只要文件正在创建,这个过程就会一直运行。 - DarthVader
1
你跑了我建议的测试吗?我很好奇结果如何。 - Marcelo Cantos
我完成后会告诉你。我现在没有文件的访问权限。 - DarthVader

0

你可以考虑使用文件队列来处理。在启动时通过扫描目录填充队列,然后使用FileSystemWatcher更新队列以高效地将新文件添加到队列中,而无需不断重新扫描目录。

如果可能的话,请读写不同的物理磁盘。这将为您提供最大的IO性能。

如果您有一大堆要处理的文件,然后不断以不均匀的速度添加新文件,并且所有这些都发生在同一个磁盘(读/写),则可以考虑将已处理的文件缓冲到内存中,直到满足以下两个条件之一:

  • (暂时)没有新文件
  • 您已经缓冲了太多的文件,不想再使用更多的内存进行缓冲(理想情况下是可配置的阈值)

如果您实际处理文件的过程需要大量CPU时间,则可以考虑每个CPU核心使用一个处理线程。但是,对于“正常”处理,CPU时间与IO时间相比微不足道,复杂性不值得任何小的收益。


写入将被指向数据库服务器。 - DarthVader
1
在这种情况下,使用读线程和写线程将能够提高吞吐量,因为写操作发生在网络上而不是本地驱动器上。 - Eric J.

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接