C#并行扩展Task.Factory.StartNew在错误的对象上调用方法

3

好的,我在使用System.Threading.Tasks中的.Net 4.0并行扩展进行实验。我发现了一些看起来很奇怪的行为,但我认为我只是做错了什么。我有一个接口和几个实现类,它们对于这个例子来说非常简单。

interface IParallelPipe
{
    void Process(ref BlockingCollection<Stream> stream, long stageId);
}

class A:IParallelPipe
{
    public void Process(ref BlockingCollection<Stream> stream, long stageId)
    {
        //do stuff
    }
}

class B:IParallelPipe
{
    public void Process(ref BlockingCollection<Stream> stream, long stageId)
    {
        //do stuff
    }
}

然后我有一个类来开始这些操作。在这里出现了问题。我基本上通过传入的类型获取关于要调用的实现类的信息,然后调用工厂来实例化它,然后使用它创建任务并启动它。如下所示:

BlockingCollection<Stream> bcs = new BlockingCollection<Stream>();                   
foreach (Stage s in pipeline.Stages) 
{
    IParallelPipe p = (IParallelPipe)Factory.GetPipe(s.type);
    Task.Factory.StartNew(() => p.Process(ref bcs, s.id)); 
}

在我的样例中,每次运行时pipeline.Stages包含两个元素,一个被实例化为类A,另一个被实例化为类B。这很好,在调试器中我看到p带有两种不同类型。然而,类B从未被调用,相反我得到了两次调用A.Process(...)方法。两者都包含传入的stageId(即两次调用具有不同的stageIds)。
现在,如果我稍微分离一下内容,仅供测试,我可以通过像这样做来使事情正常工作:
BlockingCollection<Stream> bcs = new BlockingCollection<Stream>();                   
A a = null;
B b = null;
foreach (Stage s in pipeline.Stages) 
{
    IParallelPipe p = (IParallelPipe)Factory.GetPipe(s.type);
    if(p is A)
        a = p;
    else
        b = p;
}
Task.Factory.StartNew(() => a.Process(ref bcs, idThatINeed)); 
Task.Factory.StartNew(() => b.Process(ref bcs, idThatINeed));

这将调用适当的类!

有什么想法吗?

1个回答

4
您所描述的行为对我来说似乎很奇怪-我期望使用正确的实例,但可能是错误的阶段ID-foreach变量捕获问题。变量s被捕获了,当任务工厂评估闭包时,s的值已经改变。
这绝对是您代码中的问题,但它并不能解释为什么您会看到问题。只是为了检查,您确实在循环内部声明了p,而不是在外部声明吗?如果您在循环外部声明了p,那就可以解释一切。
以下是解决捕获问题的方法:
BlockingCollection<Stream> bcs = new BlockingCollection<Stream>();
foreach (Stage s in pipeline.Stages) 
{
    Stage copy = s;
    IParallelPipe p = (IParallelPipe)Factory.GetPipe(s.type);
    Task.Factory.StartNew(() => p.Process(ref bcs, copy.id)); 
}

请注意,我们只是在循环内部取一个副本,并捕获该副本,以便每次都获得不同的变量“实例”。
或者,我们可以只捕获ID,因为这是我们所需要的。
BlockingCollection<Stream> bcs = new BlockingCollection<Stream>();
foreach (Stage s in pipeline.Stages) 
{
    long id = s.id;
    IParallelPipe p = (IParallelPipe)Factory.GetPipe(s.type);
    Task.Factory.StartNew(() => p.Process(ref bcs, id)); 
}

如果这并没有帮助,您能否发布一个简短但完整的程序来展示问题?这将使追踪问题变得更加容易。

我也有同样的想法,“这与p无关”,所以我没有想到foreach问题。然而,我添加了“阶段复制”,问题得以解决。很有趣。将进行更多的调查... - MikeD
是的,我在循环内部声明p。 - MikeD

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接