为什么在迭代GetConsumingEnumerable()时,不能完全清空底层的阻塞集合?

19
我在使用任务并行库、BlockingCollection、ConcurrentQueue和GetConsumingEnumerable时遇到了一个可量化且可重复的问题,尝试创建一个简单的流水线。
简而言之,从一个线程向默认的BlockingCollection(在底层依赖于ConcurrentQueue)添加条目,并不能保证另一个调用GetConsumingEnumerable()方法的线程会将它们从BlockingCollection中弹出。
我创建了一个非常简单的Winforms应用程序来重现/模拟这个问题,它只是将整数打印到屏幕上。
  • Timer1负责排队工作项...它使用一个并发字典_tracker,以便知道已经添加到阻塞集合中的内容。
  • Timer2只是记录BlockingCollection_tracker的计数状态。
  • START按钮启动Paralell.ForEach,它简单地迭代阻塞集合GetConsumingEnumerable()并开始将它们打印到第二个列表框。
  • STOP按钮停止Timer1,防止更多条目被添加到阻塞集合中。
public partial class Form1 : Form
{
    private int Counter = 0;
    private BlockingCollection<int> _entries;
    private ConcurrentDictionary<int, int> _tracker;
    private CancellationTokenSource _tokenSource;
    private TaskFactory _factory;

    public Form1()
    {
        _entries = new BlockingCollection<int>();
        _tracker = new ConcurrentDictionary<int, int>();
        _tokenSource = new CancellationTokenSource();
        _factory = new TaskFactory(); 
        InitializeComponent();
    }

    private void timer1_Tick(object sender, EventArgs e)
    { //ADDING TIMER -> LISTBOX 1
        for(var i = 0; i < 3; i++,Counter++)
        {
            if (_tracker.TryAdd(Counter, Counter))
            _entries.Add(Counter);
            listBox1.Items.Add(string.Format("Adding {0}", Counter));
        }
    }

    private void timer2_Tick_1(object sender, EventArgs e)
    { //LOGGING TIMER -> LIST BOX 3
        listBox3.Items.Add(string.Format("Tracker Count : {0} / Entries Count : {1}", _tracker.Count, _entries.Count));
    }

    private void button1_Click(object sender, EventArgs e)
    { //START BUTTON -> LOGS TO LIST BOX 2

        var options = new ParallelOptions {
                                CancellationToken = _tokenSource.Token,
                                MaxDegreeOfParallelism = 1
                            };

        _factory.StartNew(() => { Parallel.ForEach(_entries.GetConsumingEnumerable(), options, DoWork); });

        timer1.Enabled = timer2.Enabled = true;
        timer1.Start();
        timer2.Start();
    }

    private void DoWork(int entry)
    {
        Thread.Sleep(1000); //Sleep for 1 second to simulate work being done.
        Invoke((MethodInvoker)(() => listBox2.Items.Add(string.Format("Processed {0}", entry))));
        int oldEntry;
        _tracker.TryRemove(entry, out oldEntry);
    }

    private void button2_Click(object sender, EventArgs e)
    { //STOP BUTTON
        timer1.Stop();
        timer1.Enabled = false;
    }

以下是事件的顺序:
- 按下开始按钮 - Timer1滴答一次,ListBox1立即更新为3条消息(添加0、1、2) - ListBox2随后更新为3条消息,间隔1秒
- 处理0 - 处理1 - 处理2
- Timer1滴答一次,ListBox1立即更新为3条消息(添加3、4、5) - ListBox2随后更新为2条消息,间隔1秒
- 处理3 - 处理4 - 处理5没有打印出来...似乎消失了
- 按下停止按钮以防止Timer1继续添加更多消息 - 等待... "处理5"仍然没有出现

Missing Entry

你可以看到并发字典仍然跟踪着一个尚未处理并从_tracker中移除的项目。
如果我再次点击开始,那么timer1将开始添加3个新条目,而并行循环将重新开始打印5、6、7和8。

Entry returned after subsequent items shoved in behind it

我完全不明白为什么会发生这种情况。显然,再次调用start会调用一个新的任务,该任务会调用一个并行循环,并重新执行GetConsumingEnumerable(),这样就可以神奇地找到丢失的条目...
为什么BlockingCollection.GetConsumingEnumerable()不能保证迭代每个添加到集合中的项?
为什么随后添加更多条目会导致它“解除阻塞”并继续处理?

1
谢谢大家。你们都指引了我正确的方向,让我找到了这个答案。@Svick 这可能就是为什么在 .net4.5 beta 中没有这个问题的原因:http://connect.microsoft.com/VisualStudio/feedback/details/674705/blockingcollection-getconsumingenumerable-and-parallel-foreach-hang ,MS 并行团队的 Steven Toup 实际上已经在博客中提到了这个问题。http://blogs.msdn.com/b/pfxteam/archive/2010/04/06/9990420.aspx - Eoin Campbell
4个回答

20

不能在Parallel.ForEach()中使用GetConsumingEnumerable()

请使用GetConsumingPartitioner来代替。

在博客文章中,您还将了解为什么不能使用GetConsumingEnumerable()

默认情况下,由Parallel.ForEach和PLINQ使用的分区算法使用分块以最小化同步成本:它不会为每个元素锁定一次,而是锁定、获取一组元素(一个块),然后释放锁定。

即,在收到工作项组之前,Parallel.ForEach等待。这正是你的实验展示的。


2
TPL附加组件采用MS-LPL许可证,这意味着如果您使用它们,您将把整个派生作品锁定在Windows平台上。它不是OSI批准的许可证... - Daniel Crenna
@Daniel,好的了解。谢谢你的更新。你知道非Windows TPL是否也使用分组分区器作为默认值吗? - adrianm

8

1
我确认自.NET 4.5起,这是首选选项。 - MaYaN
1
被采纳的答案很有信息量。这一个看起来更加更新。 - Ho Ho Ho

2
我无法复制您的行为,但我用一个简单的控制台应用程序进行了基本相同的操作(在运行 .Net 4.5 beta 的情况下,这可能会有所不同)。但我认为发生这种情况的原因是 Parallel.ForEach() 尝试通过将输入集合分成块来优化执行。而对于您的可枚举对象,直到添加更多项到集合中,才能创建一个块。更多信息请参见 MSDN 上的 PLINQ 和 TPL 的自定义分区器
要解决此问题,请不要使用 Parallel.ForEach()。如果仍然想并行处理项目,则可以在每次迭代中启动一个 Task

1
感谢您的查看。Parallel.ForEach提供的一个“好”特性是它允许我限制MaxDegreesOfParallelism(在真实世界中,这相当于调用WCF服务)。如果我只是通过普通的foreach循环在每次迭代中新建任务,您会如何建议我限制最大并发任务数? - Eoin Campbell

0

为了澄清,在你能够在执行Parallel.foreach之前调用BlockingCollection的.CompleteAdding()方法的情况下,上述问题将不会成为问题。我已经多次成功地使用这两个对象。

此外,您可以在调用CompleteAdding()后重新设置BlockingCollection以在需要时添加更多项(_entries = new BlockingCollection();)

如果您多次单击“开始”和“停止”按钮,则将上面的单击事件代码更改如下,可以解决缺少条目的问题并使其按预期工作:

private void button2_Click(object sender, EventArgs e)
{ //STOP BUTTON
    timer1.Stop();
    timer1.Enabled = false;
>>>>_entries.CompleteAdding();
>>>>_entries = new BlockingCollection<int>();
}

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接