内存流写入时出现OutOfMemoryException异常

3
我有一个小的样例应用程序,我正在努力尝试使用一些新的.Net 4.0并行扩展(它们非常好)。我遇到了一个(可能非常愚蠢的)OutOfMemoryException问题。我想要将这个示例插入到我的主应用程序中,该应用程序读取一些数据和大量文件,对其进行一些处理,然后将其写出到某个地方。我遇到了一些文件变得越来越大的问题(可能是GB级别),并且担心内存问题,所以我想并行化处理,这就导致我走上了这条路。
现在下面的代码会在较小的文件上引发OOME异常,我认为我只是漏掉了一些东西。它会很好地并行读取10-15个文件并将它们写出,但是在下一个文件上卡住了。看起来它已经读取并写入了约650MB的数据。需要另一双眼睛来帮忙解决。
我从FileStream中读取到MemorySteam,因为这是主应用程序所需的内容,我只是在某种程度上尝试复制它。它从各种地方读取数据和文件,并在MemoryStreams上工作。
这是使用.Net 4.0 Beta 2、VS 2010。
namespace ParellelJob
{
class Program
{
    BlockingCollection<FileHolder> serviceToSolutionShare;
    static void Main(string[] args)
    {
        Program p = new Program();
        p.serviceToSolutionShare = new BlockingCollection<FileHolder>();
        ServiceStage svc = new ServiceStage(ref p.serviceToSolutionShare);
        SolutionStage sol = new SolutionStage(ref p.serviceToSolutionShare);

        var svcTask = Task.Factory.StartNew(() => svc.Execute());
        var solTask = Task.Factory.StartNew(() => sol.Execute());

        while (!solTask.IsCompleted)
        {

        }

    }
}

class ServiceStage
{
    BlockingCollection<FileHolder> outputCollection;
    public ServiceStage(ref BlockingCollection<FileHolder> output)
    {
        outputCollection = output;
    }

    public void Execute()
    {
        var di = new DirectoryInfo(@"C:\temp\testfiles");
        var files = di.GetFiles();
        foreach (FileInfo fi in files)
        {
            using (var fs = new FileStream(fi.FullName, FileMode.Open, FileAccess.Read))
            {
                int b;
                var ms = new MemoryStream();
                while ((b = fs.ReadByte()) != -1)
                {
                    ms.WriteByte((byte)b); //OutOfMemoryException Occurs Here
                }
                var f = new FileHolder();
                f.filename = fi.Name;
                f.contents = ms;

                outputCollection.TryAdd(f);
            }
        }
        outputCollection.CompleteAdding();

    }
}

class SolutionStage
{
    BlockingCollection<FileHolder> inputCollection;
    public SolutionStage(ref BlockingCollection<FileHolder> input)
    {
        inputCollection = input;
    }
    public void Execute()
    {
        FileHolder current;
        while (!inputCollection.IsCompleted)
        {
            if (inputCollection.TryTake(out current))
            {
                using (var fs = new FileStream(String.Format(@"c:\temp\parellel\{0}", current.filename), FileMode.OpenOrCreate, FileAccess.Write))
                {
                    using (MemoryStream ms = (MemoryStream)current.contents)
                    {
                        ms.WriteTo(fs);
                        current.contents.Close();
                    }
                }
            }
        }
    }
}

class FileHolder
{
    public string filename { get; set; }
    public Stream contents { get; set; }
}
}
2个回答

4
主要逻辑似乎没问题,但如果主函数中的空while循环是字面意思,那么你正在浪费不必要的CPU周期。最好使用solTask.Wait()。

但是,如果单个文件可以运行几千兆字节,您仍然需要至少在内存中完全保存1个文件,通常是2个(一个被读取,一个被处理/写入)。

PS1:我刚意识到您没有预分配MemStream。那很糟糕,对于大文件,它将经常重新调整大小,并且这会消耗大量内存。最好使用类似以下内容的东西:

var ms = new MemoryStream(fs.Length);

针对大文件,您需要考虑大对象堆(LOH)。您确定不能将文件分成段并处理吗?

PS2:构造函数参数不需要ref,但这不是问题。


预先分配内存流解决了我的问题,适用于除最大文件之外的所有文件。我将把较大的文件分成块。只是在这个示例应用程序中没有那个功能。谢谢。 - MikeD

0

只是快速查看一下,在你的ServiceStage.Execute方法中,你有

var ms = new MemoryStream();

我没有看到你关闭ms或将其放在using中。你在另一个类中有using。这是需要检查的一件事。


1
我正在尝试通过将MemoryStream放入共享的BlockingCollection中,在类之间共享它。在SolutionStage完成后,该流会被关闭。 - MikeD
不,MemStreams 用于传输数据,并在接收端关闭。由于完全受管理,您实际上不需要 Dispose() 它们。 - H H

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接