如何正确地并行处理大量依赖I/O的任务

23

我正在构建一个控制台应用程序,需要处理大量的数据。

基本上,该应用程序从数据库中获取引用。对于每个引用,解析文件内容并进行一些更改。这些文件是HTML文件,并且该过程使用正则表达式替换来做繁重的工作(查找引用并将其转换为链接)。然后将结果存储在文件系统中并发送到外部系统。

如果我按顺序总结这个过程:

var refs = GetReferencesFromDB(); // ~5000 Datarow returned
foreach(var ref in refs)
{
    var filePath = GetFilePath(ref); // This method looks up in a previously loaded file list
    var html = File.ReadAllText(filePath); // Read html locally, or from a network drive
    var convertedHtml = ParseHtml(html);
    File.WriteAllText(destinationFilePath); // Copy the result locally, or a network drive
    SendToWs(ref, convertedHtml);
}

我的程序可以正确运行,但速度很慢。因此我想并行化处理过程。

到目前为止,我添加了一个简单的AsParallel并行处理:

var refs = GetReferencesFromDB().AsParallel(); 
refs.ForAll(ref=>
{
    var filePath = GetFilePath(ref); 
    var html = File.ReadAllText(filePath); 
    var convertedHtml = ParseHtml(html);
    File.WriteAllText(destinationFilePath); 
    SendToWs(ref, convertedHtml);
});
这个简单的更改减少了处理时间(时间减少了25%)。但是,根据我的理解,并行化对于依赖I/O资源的并行化不会带来太多好处(或者更糟,会减少好处),因为I/O不会神奇地增加。

因此,我认为我应该改变我的方法,不是并行化整个过程,而是创建依赖链式排队任务。
例如,我应该创建以下流程:

排队读取文件。完成后,排队分析HTML。完成后,将两者发送到WS并本地写入。完成后,记录结果。

但是,我不知道如何实现这样的想法。

我觉得这将最终成为一组消费者/生产者队列,但是我没有找到正确的示例。

另外,我不确定是否会有好处。

谢谢建议。

[编辑] 实际上,我是使用c# 4.5的完美候选人......如果它已经发布了:)

[编辑2] 另一件让我认为它没有正确并行化的事情是,在资源监视器中,我看到CPU、网络I/O和磁盘I/O的图表不稳定。当其中一个高时,其他则低至中等水平。

5
你的问题中有一个令人担忧的短语:“这些文件是HTML文件,并且该进程正在使用正则表达式进行大量替换。”这两个词:“HTML”和“正则表达式”是死对头,永远不会一起使用。如果你试图将它们放在一起,它们最终会相互抵消。 - Darin Dimitrov
1
@DarinDimitrov: 你是绝对正确的。如果你知道前任开发人员用正则表达式解析单个文件所花费的时间,你会笑的。现在,虽然仍然“长”,但由于小于几秒钟,它是可以接受的(与之前的“太长了,我必须杀掉进程”相比)。然而,我不知道如何在我的情况下避免使用正则表达式。 - Steve B
4
您可以使用HTML解析器(如HTML Agility Pack或SgmlReader)来避免使用正则表达式。 - Darin Dimitrov
@DarinDimitrov: 即使使用这些库,我最终也会使用正则表达式测试文本的格式为 XXX-YYY(年份)-ZZZ-lg.ext(不是所有部分都是必需的)。不过我可以想象,与整行 HTML 标记相比,针对单个节点进行测试会更有效率,不是吗?非常感谢您的建议。 - Steve B
@DarinDimitrov:你让我转换到了HtmlAgility包。`private static string ConvertLinks(string html) {var hDoc = new HtmlAgilityPack.HtmlDocument(); hDoc.LoadHtml(html); foreach (var node in hDoc.DocumentNode.DescendantNodes()) { if (node.NodeType == HtmlAgilityPack.HtmlNodeType.Text && node.ParentNode.Name != "a") { var converted = documentRefRegEx.Replace( node.InnerHtml, new MatchEvaluator( m => BuildLink ) ); node.InnerHtml = converted; } } return hDoc.DocumentNode.OuterHtml; }`。感谢您的分享。 - Steve B
5个回答

17
您的代码中没有利用任何异步I/O API。您正在进行的所有操作都是CPU绑定的,而所有的I/O操作都会浪费CPU资源阻塞。如果您想利用异步I/O,那么AsParallel只适用于计算绑定任务。今天在<= v4.0版本中,您需要利用基于异步编程模型(APM)的API。这可以通过查找您使用的基于I/O的类上的BeginXXX/EndXXX方法,并在可用时利用这些方法来完成。
首先,阅读此文章:TPL TaskFactory.FromAsync vs Tasks with blocking methods 接下来,在这种情况下,您不需要使用AsParallelAsParallel启用流式处理,将导致立即为每个项目安排一个新任务,但在这里您不需要/不希望这样做。您最好使用Parallel::ForEach来分区工作。
让我们看看如何利用这些知识来实现您特定情况下的最大并发性:
var refs = GetReferencesFromDB();

// Using Parallel::ForEach here will partition and process your data on separate worker threads
Parallel.ForEach(
    refs,
    ref =>
{ 
    string filePath = GetFilePath(ref);

    byte[] fileDataBuffer = new byte[1048576];

    // Need to use FileStream API directly so we can enable async I/O
    FileStream sourceFileStream = new FileStream(
                                      filePath, 
                                      FileMode.Open,
                                      FileAccess.Read,
                                      FileShare.Read,
                                      8192,
                                      true);

    // Use FromAsync to read the data from the file
    Task<int> readSourceFileStreamTask = Task.Factory.FromAsync(
                                             sourceFileStream.BeginRead
                                             sourceFileStream.EndRead
                                             fileDataBuffer,
                                             fileDataBuffer.Length,
                                             null);

    // Add a continuation that will fire when the async read is completed
    readSourceFileStreamTask.ContinueWith(readSourceFileStreamAntecedent =>
    {
        int soureFileStreamBytesRead;

        try
        {
            // Determine exactly how many bytes were read 
            // NOTE: this will propagate any potential exception that may have occurred in EndRead
            sourceFileStreamBytesRead = readSourceFileStreamAntecedent.Result;
        }
        finally
        {
            // Always clean up the source stream
            sourceFileStream.Close();
            sourceFileStream = null;
        }

        // This is here to make sure you don't end up trying to read files larger than this sample code can handle
        if(sourceFileStreamBytesRead == fileDataBuffer.Length)
        {
            throw new NotSupportedException("You need to implement reading files larger than 1MB. :P");
        }

        // Convert the file data to a string
        string html = Encoding.UTF8.GetString(fileDataBuffer, 0, sourceFileStreamBytesRead);

        // Parse the HTML
        string convertedHtml = ParseHtml(html);

        // This is here to make sure you don't end up trying to write files larger than this sample code can handle
        if(Encoding.UTF8.GetByteCount > fileDataBuffer.Length)
        {
            throw new NotSupportedException("You need to implement writing files larger than 1MB. :P");
        }

        // Convert the file data back to bytes for writing
        Encoding.UTF8.GetBytes(convertedHtml, 0, convertedHtml.Length, fileDataBuffer, 0);

        // Need to use FileStream API directly so we can enable async I/O
        FileStream destinationFileStream = new FileStream(
                                               destinationFilePath,
                                               FileMode.OpenOrCreate,
                                               FileAccess.Write,
                                               FileShare.None,
                                               8192,
                                               true);

        // Use FromAsync to read the data from the file
        Task destinationFileStreamWriteTask = Task.Factory.FromAsync(
                                                  destinationFileStream.BeginWrite,
                                                  destinationFileStream.EndWrite,
                                                  fileDataBuffer,
                                                  0,
                                                  fileDataBuffer.Length,
                                                  null);

        // Add a continuation that will fire when the async write is completed
        destinationFileStreamWriteTask.ContinueWith(destinationFileStreamWriteAntecedent =>
        {
            try
            {
                // NOTE: we call wait here to observe any potential exceptions that might have occurred in EndWrite
                destinationFileStreamWriteAntecedent.Wait();
            }
            finally
            {
                // Always close the destination file stream
                destinationFileStream.Close();
                destinationFileStream = null;
            }
        },
        TaskContinuationOptions.AttachedToParent);

        // Send to external system **concurrent** to writing to destination file system above
        SendToWs(ref, convertedHtml);
    },
    TaskContinuationOptions.AttachedToParent);
});

现在,这里有几点说明:

  1. 这些是示例代码,因此我使用了1MB缓冲区来读/写文件。这对于HTML文件来说过度,浪费系统资源。您可以将其降低以适应您的最大需求,或者将链式读/写实现到StringBuilder中,这是一个练习,由于我需要编写约500行代码才能进行异步链式读/写,所以我将其留给您。 :P
  2. 您会注意到,在读/写任务的继续执行中,我使用了TaskContinuationOptions.AttachedToParent。这非常重要,因为它将防止启动工作的Parallel::ForEach worker线程在所有底层异步调用完成之前完成。如果没有这个选项,您将同时启动所有5000个项目的工作,这将污染TPL子系统,并且无法正确扩展。
  3. 我在这里同时调用了SendToWs和将文件写入文件共享。我不知道SendToWs的实现情况,但它似乎也是使其异步化的一个很好的候选项。现在假设它是纯计算工作,并且在执行时将烧掉CPU线程。我将其留给您的练习,以找出如何最好地利用我向您展示的内容来提高吞吐量。
  4. 这是所有打字自由格式的,我的大脑是唯一的编译器,并且我只使用了SO的语法突出显示来确保语法正确。因此,请原谅任何语法错误,并让我知道如果我搞砸了任何东西,以至于您无法看懂,我会跟进。

当我进一步阅读您的提案时,我在想它是否适用于我的情况。在我的工作“流水线”中,有些步骤是I/O限制(读取文件、写入文件、将文件发送到WS)。您的代码优雅地管道化操作,但我认为这不会限制针对同一I/O通道的并发作业。我的意思是,您的代码将避免阻塞线程,但由于可以对同一I/O通道进行多次调用,因此我将没有受益(甚至会稍微降低速度)。实际上,我认为我必须为每个I/O通道创建队列,每个队列最多只能有一个工作人员。我是正确的吗? - Steve B
2
我们正在进入一个领域,在原始问题中没有足够的信息来提供您所寻找的绝对最佳方法。提供的代码将受到限制,因为Parallel::ForEach只会并发执行有限数量的工作任务(基于maxdegreeofparallelism和启发式算法),而且由于I/O工作流步骤被链接到这些任务上,这也将限制正在进行的I/O操作数量,同时仍然释放CPU资源(这是关键)。即使您使用p/c模式,仍然要使用异步I/O来最大化吞吐量。 - Drew Marsh
1
除了手动生产者/消费者之外,还有一种选择是研究[TPL DataFlow][1],这是.NET 4.0中的一种宠物项目库,但在.NET 4.5中已经内置。这确实是将步骤“管道化”并在流程中为每个特定步骤提供精细的并发控制的最佳方法。[1] http://www.microsoft.com/download/en/details.aspx?id=14610 - Drew Marsh

5
好消息是,你的逻辑可以很容易地分成一系列步骤,形成一个生产者-消费者管道。
步骤1:读取文件
步骤2:解析文件
步骤3:写入文件
步骤4:发送到Ws
如果你正在使用.NET 4.0,你可以使用BlockingCollection数据结构作为每个步骤的生产者-消费者队列的支撑。主线程将每个工作项入队到第1步的队列中,在那里它将被获取并处理,然后转发到第2步的队列等等。
如果你愿意转移到 Async CTP, 你也可以利用新的TPL Dataflow结构。其中有BufferBlock<T>数据结构等,与BlockingCollection行为类似,并且与新的asyncawait关键字很好地集成在一起。
因为你的算法是IO绑定的,生产者消费者策略可能不会给你带来你所期望的性能提升,但至少你将拥有一个非常优雅的解决方案,如果你可以增加IO吞吐量,这个方案将会很好地扩展。我担心第1步和第3步将成为瓶颈,管道将无法平衡,但值得尝试。

+1 for the BlockingCollection,看起来很有前途。我需要仔细阅读一下,但它似乎是我这种情况的一个好方法。 - Steve B

3

我有一个建议,你是否了解消费者/生产者模式?一定数量的线程会从磁盘上读取文件并将内容提供给队列。然后另一组线程,称为消费者,会在队列填充时“消耗”队列。 http://zone.ni.com/devzone/cda/tut/p/id/3023


1
同时从同一磁盘读取实际上会使性能变差。在这种情况下,只建议使用一个生产者。 - Tudor
1
在所有情况下都会更慢吗?我记得在我的一个特定项目上进行过测试,性能实际上随着一定数量的线程增加而增加。但是超过那个特定数量的线程确实会降低性能。因此,创建10个线程是不明智的。尽管如此,我认真考虑你的意见,并在时机成熟时重新进行性能分析。 - Hussein Khalil
1
我之所以这么说是因为磁盘必须寻找下一个读取位置。因此,如果多个线程尝试读取,它们只会在寻道之间反弹磁盘,甚至降低性能。 - Tudor

2

在这种情况下,您最好采用生产者消费者模型。一个线程负责获取数据,一组工作者处理数据。由于I/O问题不容易解决,所以最好专注于优化计算本身。

现在我将尝试勾勒出一个模型:

// producer thread
var refs = GetReferencesFromDB(); // ~5000 Datarow returned

foreach(var ref in refs)
{
    lock(queue)
    {   
       queue.Enqueue(ref);
       event.Set();
    }

    // if the queue is limited, test if the queue is full and wait.
}

// consumer threads
while(true)
{
    value = null;
    lock(queue)
    {
       if(queue.Count > 0)
       {
           value = queue.Dequeue();
       }
    }        

    if(value != null) 
       // process value
    else        
       event.WaitOne(); // event to signal that an item was placed in the queue.           
}

您可以在C#中的多线程的第4部分中找到有关生产者/消费者更多的细节:http://www.albahari.com/threading/part4.aspx


如果您可以使用.NET 4或更高版本,则可以使用ConcurrentQueue(或任何其他实现IProducerConsumerCollection的类)来完成此操作,而无需锁定。 - Scott Chamberlain
@Scott Chamberlain:你说得对。我还没有亲自升级,所以我还是老派的。:)) - Tudor

0

我认为你分割文件列表并批量处理每个文件的方法是可行的。 我的感觉是,如果你调整并行度,可能会获得更多的性能提升。 例如:var refs = GetReferencesFromDB().AsParallel().WithDegreeOfParallelism(16); 这将同时启动处理 16 个文件。目前,你可能正在处理 2 或 4 个文件,具体取决于你拥有的核心数。这只在没有 IO 的计算中才有效率。对于 IO 密集型任务,调整可以带来令人难以置信的性能提升,减少处理器空闲时间。

如果你要使用生产者-消费者模式拆分和合并任务,请查看此示例:使用并行 Linq 扩展合并两个序列,如何尽快产生最快的结果?


我的瓶颈主要在于磁盘输入/输出和网络输入/输出... 我认为在我的情况下增加更多的工作线程是没有帮助的。 - Steve B
如果三者的吞吐量完全相同,那么并行性不会带来显著的性能提升。并行性可以很好地利用某些资源的空闲时间,并与另一个任务同时运行。将处理链分解为特定于资源的块不会改变情况,因此您具有有限的缓冲区(RAM),不能在解析1个文件之前读取1000个文件,反之亦然。 - George Mamaladze

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接