我在使用任务并行库、BlockingCollection、ConcurrentQueue和GetConsumingEnumerable时遇到了一个可量化且可重复的问题,尝试创建一个简单的流水线。
简而言之,从一个线程向默认的BlockingCollection(在底层依赖于ConcurrentQueue)添加条目,并不能保证另一个调用GetConsumingEnumerable()方法的线程会将它们从BlockingCollection中弹出。
我创建了一个非常简单的Winforms应用程序来重现/模拟这个问题,它只是将整数打印到屏幕上。
以下是事件的顺序:
- 按下开始按钮 - Timer1滴答一次,ListBox1立即更新为3条消息(添加0、1、2) - ListBox2随后更新为3条消息,间隔1秒
- 处理0 - 处理1 - 处理2
- Timer1滴答一次,ListBox1立即更新为3条消息(添加3、4、5) - ListBox2随后更新为2条消息,间隔1秒
- 处理3 - 处理4 - 处理5没有打印出来...似乎消失了
- 按下停止按钮以防止Timer1继续添加更多消息 - 等待... "处理5"仍然没有出现
如果我再次点击开始,那么timer1将开始添加3个新条目,而并行循环将重新开始打印5、6、7和8。
为什么BlockingCollection.GetConsumingEnumerable()不能保证迭代每个添加到集合中的项?
为什么随后添加更多条目会导致它“解除阻塞”并继续处理?
简而言之,从一个线程向默认的BlockingCollection(在底层依赖于ConcurrentQueue)添加条目,并不能保证另一个调用GetConsumingEnumerable()方法的线程会将它们从BlockingCollection中弹出。
我创建了一个非常简单的Winforms应用程序来重现/模拟这个问题,它只是将整数打印到屏幕上。
Timer1
负责排队工作项...它使用一个并发字典_tracker
,以便知道已经添加到阻塞集合中的内容。Timer2
只是记录BlockingCollection
和_tracker
的计数状态。- START按钮启动
Paralell.ForEach
,它简单地迭代阻塞集合GetConsumingEnumerable()
并开始将它们打印到第二个列表框。 - STOP按钮停止
Timer1
,防止更多条目被添加到阻塞集合中。
public partial class Form1 : Form
{
private int Counter = 0;
private BlockingCollection<int> _entries;
private ConcurrentDictionary<int, int> _tracker;
private CancellationTokenSource _tokenSource;
private TaskFactory _factory;
public Form1()
{
_entries = new BlockingCollection<int>();
_tracker = new ConcurrentDictionary<int, int>();
_tokenSource = new CancellationTokenSource();
_factory = new TaskFactory();
InitializeComponent();
}
private void timer1_Tick(object sender, EventArgs e)
{ //ADDING TIMER -> LISTBOX 1
for(var i = 0; i < 3; i++,Counter++)
{
if (_tracker.TryAdd(Counter, Counter))
_entries.Add(Counter);
listBox1.Items.Add(string.Format("Adding {0}", Counter));
}
}
private void timer2_Tick_1(object sender, EventArgs e)
{ //LOGGING TIMER -> LIST BOX 3
listBox3.Items.Add(string.Format("Tracker Count : {0} / Entries Count : {1}", _tracker.Count, _entries.Count));
}
private void button1_Click(object sender, EventArgs e)
{ //START BUTTON -> LOGS TO LIST BOX 2
var options = new ParallelOptions {
CancellationToken = _tokenSource.Token,
MaxDegreeOfParallelism = 1
};
_factory.StartNew(() => { Parallel.ForEach(_entries.GetConsumingEnumerable(), options, DoWork); });
timer1.Enabled = timer2.Enabled = true;
timer1.Start();
timer2.Start();
}
private void DoWork(int entry)
{
Thread.Sleep(1000); //Sleep for 1 second to simulate work being done.
Invoke((MethodInvoker)(() => listBox2.Items.Add(string.Format("Processed {0}", entry))));
int oldEntry;
_tracker.TryRemove(entry, out oldEntry);
}
private void button2_Click(object sender, EventArgs e)
{ //STOP BUTTON
timer1.Stop();
timer1.Enabled = false;
}
以下是事件的顺序:
- 按下开始按钮 - Timer1滴答一次,ListBox1立即更新为3条消息(添加0、1、2) - ListBox2随后更新为3条消息,间隔1秒
- 处理0 - 处理1 - 处理2
- Timer1滴答一次,ListBox1立即更新为3条消息(添加3、4、5) - ListBox2随后更新为2条消息,间隔1秒
- 处理3 - 处理4 - 处理5没有打印出来...似乎消失了
- 按下停止按钮以防止Timer1继续添加更多消息 - 等待... "处理5"仍然没有出现
如果我再次点击开始,那么timer1将开始添加3个新条目,而并行循环将重新开始打印5、6、7和8。
为什么BlockingCollection.GetConsumingEnumerable()不能保证迭代每个添加到集合中的项?
为什么随后添加更多条目会导致它“解除阻塞”并继续处理?