取消 BlockingCollection.GetConsumingEnumerable() 并处理剩余内容

7
我有一个进程生成工作,第二个进程使用 BlockingCollection<> 消费这些工作。当我关闭程序时,我需要让消费者停止消费工作,但我仍然需要快速记录尚未被消费的挂起工作。
目前,我的消费者生成一个线程,该线程具有一个 foreach (<object> in BlockingCollection.GetConsumingEnumerable()) 循环。当我停止程序时,我的生产者调用 Consumer.BlockingCollection.CompleteAdding()。结果我发现我的消费者继续处理队列中的所有内容。
搜索了这个问题告诉我,我需要使用一个 CancellationToken。所以我试着用它:
private void Process () { // This method runs in a separate thread
    try {
        foreach (*work* in BlockingCollection.GetConsumingEnumerable(CancellationToken)) {
            // Consume
        }
    }
    catch (OperationCancelledException) {
        foreach (*work* in BlockingCollection.GetConsumingEnumerable()) {
            // quickly log
        }
    }
}

我的制作人有:

private CancellationTokenSource StopFlag = new CancellationTokenSource ();
MyConsumer.CancellationToken = StopFlag.Token;
// Make the consumer spawn it's consuming thread...
StopFlag.Cancel ();
MyConsumer.BlockingCollection.CompleteAdding ();

当我尝试这样做时,没有迹象表明OperationCancelledException曾经发生过。 这个问题试图解释取消令牌的使用,但似乎没有正确使用。(论据:如果它起作用,则“足够正确”。)而这个问题似乎是我的问题的完全重复,但没有示例。(同这里。)
因此,再次强调:如何在需要使用不同方法处理取消后队列中剩余项的情况下,正确使用CancellationToken和BlockingCollection.GetConsumingEnumerable()?
(我认为我的问题集中在正确使用CancellationToken上。我的所有测试都没有表明进程实际上被取消了。(StopFlag.IsCancellationRequested始终等于false。))

GetConsumingEnumerable会等待更多的项被添加。在catch中使用集合本身的foreach - Dustin Kingen
我很想尝试一下,Romoku!我该怎么做呢?我使用了简单的ctor:public BlockingCollection<cOrder> OrderQueue = new BlockingCollection<cOrder> (); - Jason
你需要检查取消标记。如果CancellationToken.IsCancellationRequested为true,则仍需处理但记录而非处理。http://msdn.microsoft.com/en-us/library/dd997289.aspx - paparazzo
3个回答

5
当您将CancellationToken传递到GetConsumingEnumerable时,如果请求取消,则不会抛出异常,它只会停止输出项目。而不是捕获异常,只需检查令牌即可:
foreach (var item in BlockingCollection.
    GetConsumingEnumerable(CancellationToken))
{
    //consume item
}
if (CancellationToken.IsCancellationRequested)
    foreach (var item in BlockingCollection)
    {
        //log item
    }

请注意,如果请求取消并且有可能尚未调用CompletedAdding,那么您应该只迭代集合,而不是调用GetConsumingEnumerable。如果您知道在操作被取消时生产者将完成添加,则这不是问题。


我尝试了在循环外(就像你所做的那样)和循环内使用if()语句,但两者都没有起作用。我甚至没有收到取消标志正在发挥作用的通知。 - Jason
@Jason 那么你的令牌没有被取消,这就是你的问题。确保在处理完项目之前实际取消了令牌。 - Servy
@Jason 根据你的使用代码,你是在完成添加之前才取消。很有可能在消费代码再次运行时,两个操作都已经完成了,如果你的消费者比生产者更快(因此没有剩余物品),它很容易在取消令牌指示应该被取消之前完成消费所有物品。如果你想测试这种情况,你应该让你的示例工作得不可能让消费者在正常消费之前被取消。 - Servy
我有模拟工作的Sleep()调用。每个“处理”步骤都会记录已处理的项目和队列中剩余项目的数量(BlockingCollection.Count)。我的原始代码显示计数器逐渐减少到零。但是后续测试显示计数器仅停止,但我从未收到“操作已取消”的消息。 - Jason
@Jason,所以您已经验证了集合不为空,但是在foreach循环结束后,IsCancellationRequested仍然为false?我觉得这非常奇怪。如果是这种情况,您能否提供一个完整的代码示例来复制问题? - Servy
3
当请求取消时,GetConsumingEnumerable将抛出异常。 - Darragh

3
我的问题在于我尝试取消操作的方式。我把 CancellationTokenSource 放在消费者中,而不是让生产者拥有它。
public class cProducer {
    private cConsumer myConsumer = new cConsumer ();

    public void onStart () {
        myConsumer.OnStart ();
    }

    public void onStop () {
        myConsumer.OnStop ();
    }

    public void OnOrderReceived (cOrder newOrder) {
        myConsumer.orderQueue.Add (cOrder);
    }
}

public class cConsumer {
    private CancellationTokenSource stopFlag;
    public BlockingCollection<cOrder> orderQueue = new BlockingCollection<cOrder> ();
    private Task processingTask;

    public void OnStart () {
        stopFlag = new CancellationTokenSource ();
        processingTask = Task.Factory.StartNew (() => Process ());
    }

    public void OnStop () {
        stopFlag.Cancel ();
        orderQueue.CompleteAdding ();
        processingTask.Wait ();
    }

    private void Process () {
        try {
            foreach (cOrder newOrder in orderQueue.GetConsumingEnumerable (stopFlag.Token)) {
                // process
            }
        }
        catch (OperationCanceledException) {
            foreach (cOrder cancelledOrder in orderQueue.GetConsumingEnumerable ()) {
                // log it
            }
        }
    }
}

你的OperationCanceledException处理程序中为什么有一个GetConsumingEnumerable调用?此外,为什么这不会阻止processingTask.Wait()调用完成。 - priehl
https://msdn.microsoft.com/en-us/library/dd287086(v=vs.110).aspx说:“在将集合标记为完成添加之后,不允许向集合添加内容,并且在集合为空时尝试移除元素将不会等待。”所以我想现在这就有意义了,但您也可以像在catch块中枚举任何IEnumerable一样枚举它。 - priehl

0
我曾经遇到过完全相同的问题。当我取消程序时,BlockingCollection似乎被死锁了。OperationCanceledException没有传播到调用方法。我发现我的生产者没有考虑取消令牌,因此正在等待队列被消耗。我所要做的就是在Add()方法中提供取消令牌。为了将其转换为上面Jason的解决方案,我所做的就是这样:
public void OnOrderReceived (cOrder newOrder, CancellationToken cancellationToken) 
{
    myConsumer.orderQueue.Add (cOrder, cancellationToken);
}

Process() 方法不需要任何 try-catch 子句。但是,如果进程被取消,您需要抛出异常:

private void Process () 
{
    foreach (cOrder newOrder in orderQueue.GetConsumingEnumerable (stopFlag.Token)) 
    {
        // process
        stopFlag.Token.ThrowIfCancellationRequested();
    }
}

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接