我有一个小的样例应用程序,我正在努力尝试使用一些新的.Net 4.0并行扩展(它们非常好)。我遇到了一个(可能非常愚蠢的)OutOfMemoryException问题。我想要将这个示例插入到我的主应用程序中,该应用程序读取一些数据和大量文件,对其进行一些处理,然后将其写出到某个地方。我遇到了一些文件变得越来越大的问题(可能是GB级别),并且担心内存问题,所以我想并行化处理,这就导致我走上了这条路。
现在下面的代码会在较小的文件上引发OOME异常,我认为我只是漏掉了一些东西。它会很好地并行读取10-15个文件并将它们写出,但是在下一个文件上卡住了。看起来它已经读取并写入了约650MB的数据。需要另一双眼睛来帮忙解决。
我从FileStream中读取到MemorySteam,因为这是主应用程序所需的内容,我只是在某种程度上尝试复制它。它从各种地方读取数据和文件,并在MemoryStreams上工作。
这是使用.Net 4.0 Beta 2、VS 2010。
现在下面的代码会在较小的文件上引发OOME异常,我认为我只是漏掉了一些东西。它会很好地并行读取10-15个文件并将它们写出,但是在下一个文件上卡住了。看起来它已经读取并写入了约650MB的数据。需要另一双眼睛来帮忙解决。
我从FileStream中读取到MemorySteam,因为这是主应用程序所需的内容,我只是在某种程度上尝试复制它。它从各种地方读取数据和文件,并在MemoryStreams上工作。
这是使用.Net 4.0 Beta 2、VS 2010。
namespace ParellelJob
{
class Program
{
BlockingCollection<FileHolder> serviceToSolutionShare;
static void Main(string[] args)
{
Program p = new Program();
p.serviceToSolutionShare = new BlockingCollection<FileHolder>();
ServiceStage svc = new ServiceStage(ref p.serviceToSolutionShare);
SolutionStage sol = new SolutionStage(ref p.serviceToSolutionShare);
var svcTask = Task.Factory.StartNew(() => svc.Execute());
var solTask = Task.Factory.StartNew(() => sol.Execute());
while (!solTask.IsCompleted)
{
}
}
}
class ServiceStage
{
BlockingCollection<FileHolder> outputCollection;
public ServiceStage(ref BlockingCollection<FileHolder> output)
{
outputCollection = output;
}
public void Execute()
{
var di = new DirectoryInfo(@"C:\temp\testfiles");
var files = di.GetFiles();
foreach (FileInfo fi in files)
{
using (var fs = new FileStream(fi.FullName, FileMode.Open, FileAccess.Read))
{
int b;
var ms = new MemoryStream();
while ((b = fs.ReadByte()) != -1)
{
ms.WriteByte((byte)b); //OutOfMemoryException Occurs Here
}
var f = new FileHolder();
f.filename = fi.Name;
f.contents = ms;
outputCollection.TryAdd(f);
}
}
outputCollection.CompleteAdding();
}
}
class SolutionStage
{
BlockingCollection<FileHolder> inputCollection;
public SolutionStage(ref BlockingCollection<FileHolder> input)
{
inputCollection = input;
}
public void Execute()
{
FileHolder current;
while (!inputCollection.IsCompleted)
{
if (inputCollection.TryTake(out current))
{
using (var fs = new FileStream(String.Format(@"c:\temp\parellel\{0}", current.filename), FileMode.OpenOrCreate, FileAccess.Write))
{
using (MemoryStream ms = (MemoryStream)current.contents)
{
ms.WriteTo(fs);
current.contents.Close();
}
}
}
}
}
}
class FileHolder
{
public string filename { get; set; }
public Stream contents { get; set; }
}
}