如何尽可能高效地处理大量并发磁盘写入请求

9

假设以下方法在 .net 4 应用程序中被不同的线程调用了数千次。如何处理这种情况最好?请注意,瓶颈在于磁盘,但我希望 WriteFile() 方法能够快速返回。

数据可能高达几 MB。我们需要考虑线程池、TPL 或类似的方式吗?

public void WriteFile(string FileName, MemoryStream Data)
{
   try
   {
      using (FileStream DiskFile = File.OpenWrite(FileName))
      {
         Data.WriteTo(DiskFile);
         DiskFile.Flush();
         DiskFile.Close();
      }
   }
   catch (Exception e)
   {
      Console.WriteLine(e.Message);
   }
}

文件必须立即写入吗?另外,使用数据库是否可以满足您的需求? - Cameron
不是立即的,也不是按顺序的。不能使用数据库。 - Canacourse
打开文件非常耗费资源。只要机器有足够的内存,写入文件就非常便宜。几兆字节不是问题。 - Hans Passant
4个回答

6
如果您想快速返回并不太在意操作是否同步,可以创建一种内存中的“队列”,在其中放入写入请求,而在队列未填满时即可快速从方法中返回。另一个线程将负责调度队列并编写文件。如果调用您的“WriteFile”时队列已满,则必须等待直到可以排队,并且执行将再次变为同步,但这样可以拥有更大的缓冲区,因此如果处理文件写入请求不是线性的,而是更加峰状(具有暂停在写文件调用之间的间歇期),则可以将此更改视为性能的提高。
更新: 为您制作了一张小图片。请注意,瓶颈始终存在,您可能唯一能做的就是通过使用队列来优化请求。请注意队列有限制,因此当其填满时,您无法立即将文件排队,必须等待空闲空间。但对于图中呈现的情况(3个桶请求),很明显您可以迅速将桶放入队列中并返回,而在第一种情况下,您必须逐个完成并阻止执行。
请注意,您永远不需要同时执行多个IO线程,因为它们都将使用相同的瓶颈,并且如果您尝试强制并行执行,您将浪费内存。我相信最多只需使用2-10个线程就可以轻松占用所有可用的IO带宽,并限制应用程序内存使用量。

尝试过类似的方法。在WriteFile前放置了一个ConcurrentBag,但问题是文件是通过第三方回调传递给我的,因此ConcurrentBag最终会积累大量文件,并且在更多文件到达之前永远没有机会清空。 - Canacourse
2
这就是为什么在返回之前需要检查队列是否已满。设置队列的最大大小,并检查它是否填充少于该大小。如果您的写入速度为10mb /秒,而传入请求非常高以至于您需要1GB /秒来存储它们,那么除非进行严重的硬件更改,否则无法解决此问题。 - Valentin Kuzub

3

您说文件无需按顺序或立即写入,最简单的方法是使用 Task

private void WriteFileAsynchronously(string FileName, MemoryStream Data)
{
    Task.Factory.StartNew(() => WriteFileSynchronously(FileName, Data));
}

private void WriteFileSynchronously(string FileName, MemoryStream Data)
{
    try
    {
        using (FileStream DiskFile = File.OpenWrite(FileName))
        {
            Data.WriteTo(DiskFile);
            DiskFile.Flush();
            DiskFile.Close();
        }
    }

    catch (Exception e)
    {
        Console.WriteLine(e.Message);
    }
}

TPL 在内部使用线程池,即使对于大量的任务,也应该相当高效。

@Canacourse:不是的!线程池限制了实际执行工作的线程数量。请参阅此博客文章,了解.NET 4.0中任务如何实现的高级解释,其中包括有关工作窃取队列的信息。 - Cameron
这种方法有其局限性,因为您并没有尝试以任何方式控制流程。想象一下,磁盘写入速度为每秒1kb(为了更清楚地看到问题)。如果您从Web获取每秒兆字节的写入请求,您的应用程序将很快崩溃。 - Valentin Kuzub
1
@Valentin:说得好。如果这是OP的情况,那么我的解决方案就很糟糕了!如果队列一直比它们被清空的速度更快地填满,那么在某个时候,我们必须停止填充队列并等待它们稍微排空一下。在这种情况下,你的答案要好得多(我的只是简单)。顺便说一句,漂亮的图表 :-) - Cameron
糟糕,刚发现我没有接受答案。有趣的是,.net 4.5将内置异步文件I/O。 - Canacourse
我想你可能想用另一个名字来命名第一个函数。也许叫 WriteFileASynchronous - 2i3r
看起来是这样! - Cameron

2
如果数据进来的速度比你记录它的速度快,那么你就有一个真正的问题。将 WriteFile 直接扔到 ConcurrentQueue 或类似结构中的生产者/消费者设计和单独服务于该队列的线程一起工作得很好...直到队列被填满。如果你要打开 50,000 个不同的文件,事情会很快堵塞。更不用说你的数据可能每个文件都有几兆字节,这会进一步限制队列的大小。
我曾经遇到过类似的问题,通过让 WriteFile 方法附加到一个单独的文件上解决了问题。它写入的记录包括记录号、文件名、长度和数据。正如 Hans 在对你原始问题的评论中指出的那样,写入文件是快速的;打开文件是慢的。
我的程序中第二个线程开始读取 WriteFile 正在写入的文件。该线程读取每个记录头(编号、文件名、长度),打开一个新文件,然后将数据从日志文件复制到最终文件。
如果日志文件和最终文件位于不同的磁盘上,则此方法效果更佳,但即使只有一个盘也可以很好地工作。不过它确实会让你的硬盘运转起来。
它的缺点是需要两倍的磁盘空间,但是随着 2TB 硬盘价格低于 150 美元,我不认为这是个大问题。它总体上也不如直接写入数据高效(因为你必须处理两次数据),但它的好处是不会导致主处理线程停止。

如果IO访问是瓶颈,那么建议同时使用两个写入者,这样写入的瓶颈就会变成原来的1/2。一个线程不停地向磁盘写入,另一个线程在其他地方读取和写入。如果以前队列在X时间内填满,现在它会更快地填满,而不是X/2。想象一下你正在以等于磁盘写入速度的速度接收1GB文件。现在你的解决方案根本行不通,而我的解决方案会利用整个可用的IO磁盘写入速度,而不是只有一半甚至更少的速度。 - Valentin Kuzub
针对这个示例情况,为了使第一个线程能够执行其写入原始数据的任务(该任务无法让我们接近最终结果,即实际文件已写入磁盘),您将不得不完全禁用第二个线程,否则会出现内存快速爆满的情况。 - Valentin Kuzub
@Valentin:你说得对,我的方法会降低系统的整体性能,需要更多时间。但从OP的问题来看,问题是如何防止工作线程停滞不前。我的解决方案可以做到这一点,因为“WriteFile”方法只是将内容追加到文件中,这是非常快速的操作。瓶颈在于创建新文件,这由单独的线程处理。确实会有一些IO争用,但工作线程不会被阻塞。我已经编写并使用了与我所描述的非常相似的东西。它确实像广告中所说的那样运行。 - Jim Mischel
如果我们谈论抽象数字,打开文件的确更昂贵,但这与最大IO带宽无关。最大IO带宽是固定的,如果我们的进程已经优化为以最大速度写入(通过任何方法,为简单起见,我们可以说通常传入文件很大,所以大多数情况下都在写入),引入一个读写两用的第二个线程可能会导致性能下降。由于我们有X(速度)和Y(磁盘输出速度),如果Y-X>0,那么一切都还好,但是一旦Y-X变成每秒钟少1个字节,我们就注定要使应用程序崩溃。 - Valentin Kuzub
@Valentin:毫无疑问,这种方法会有失败的极限。但是对于OP的应用程序来说,这应该可以很好地工作。只要操作系统的写缓存足够大,可以在另一个线程读写单个文件时容纳进来的数据,一切都可以正常运行。写缓存将进行缓冲,然后操作系统将以一个大写入方式刷新到文件中。当然,假设有一个点停止输入数据,或者至少减慢到写后线程可以赶上的速度。但是这种假设已经内置在任何缓存方案中了。 - Jim Mischel

0
将完整的方法实现封装在一个新的Thread()中。然后,您可以“fire-and-forget”这些线程并返回到主调用线程。
    foreach (file in filesArray)
    {
        try
        {
            System.Threading.Thread updateThread = new System.Threading.Thread(delegate()
                {
                    WriteFileSynchronous(fileName, data);
                });
            updateThread.Start();
        }
        catch (Exception ex)
        {
            string errMsg = ex.Message;
            Exception innerEx = ex.InnerException;
            while (innerEx != null)
            {
                errMsg += "\n" + innerEx.Message;
                innerEx = innerEx.InnerException;
            }
            errorMessages.Add(errMsg);
        }
    }

2
因为您无论如何都无法控制这些并发线程的数量,所以应用程序启动后即执行并崩溃。 - Valentin Kuzub
我指出了你的方法中一个明显的错误,而你却说它很美妙。我没有什么可补充的。 - Valentin Kuzub
哪里出了问题?如果你预计会有大量负载,你可以限制创建的线程数量。 - Leon
Canacourse:你可以“分批处理”它。每个批次将是同步的,但在每个批次内部将是异步的。 - Leon
此外,Leon的初始方法不会向您呈现文件数组。您将获得许多对单个文件的方法调用,因此,如果您计划首先将它们分组成块,以便您的方法可以工作,我们就会遇到新问题。假设您计划编写50个文件的块,并且您有30个请求。您仍然什么都没有写吗?如果用户希望在发送请求后的某个时间点将其文件保存到文件系统中,该怎么办?在这种情况下,如果没有更多请求进来,他们将永远看不到自己的文件刷新到磁盘上。 - Valentin Kuzub
显示剩余6条评论

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接