BlockingCollection(T).GetConsumingEnumerable()如何会抛出OperationCanceledException?

11

我正在使用BlockingCollection来实现一个任务调度器,基本上是这样的:

public class DedicatedThreadScheduler : TaskScheduler, IDisposable
{
    readonly BlockingCollection<Task> m_taskQueue = new BlockingCollection<Task>();

    readonly Thread m_thread;


    public DedicatedThreadScheduler()
    {
        m_thread = new Thread(() =>
        {
            foreach (var task in m_taskQueue.GetConsumingEnumerable())
            {
                TryExecuteTask(task);
            }
            m_taskQueue.Dispose();
        });
        m_thread.Start();
    }

    public void Dispose()
    {
        m_taskQueue.CompleteAdding();
    }

    protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
    {
        return Thread.CurrentThread == m_thread && TryExecuteTask(task);
    }

    (...)
}

我只看到了一次,无法重现此问题,但在 foreach(TryTakeWithNoTimeValidation)的某个点上,我收到了一个OperationCanceledException异常。 我不明白,因为我正在使用没有传递CancellationToken参数的重载方法,并且文档中说明它只会抛出ObjectDisposedException异常。这个异常意味着什么?是阻塞集合被终结了吗?还是队列中的任务被取消了?

更新:调用堆栈如下:

mscorlib.dll!System.Threading.SemaphoreSlim.WaitUntilCountOrTimeout(int millisecondsTimeout, uint startTime, System.Threading.CancellationToken cancellationToken) + 0x36 bytes 
mscorlib.dll!System.Threading.SemaphoreSlim.Wait(int millisecondsTimeout, System.Threading.CancellationToken cancellationToken) + 0x178 bytes   
System.dll!System.Collections.Concurrent.BlockingCollection<System.Threading.Tasks.Task>.TryTakeWithNoTimeValidation(out System.Threading.Tasks.Task item, int millisecondsTimeout, System.Threading.CancellationToken cancellationToken, System.Threading.CancellationTokenSource combinedTokenSource) Line 710 + 0x25 bytes   C#
System.dll!System.Collections.Concurrent.BlockingCollection<System.Threading.Tasks.Task>.GetConsumingEnumerable(System.Threading.CancellationToken cancellationToken) Line 1677 + 0x18 bytes    C#

1
你检查过堆栈跟踪了吗?异常可能发生在BlockingCollection调用的方法中。 - Kendall Frey
是的,正如我在描述中添加的那样,它发生在TryTakeWIthNoTimeValidation中。就我所看到的源代码而言,它只应在传递了CancellationToken的情况下抛出OperationCanceledException,但我使用了不带有此参数的重载函数... - Asik
已更新问题并附上调用堆栈。 - Asik
Dispose()方法和foreach循环之间没有互锁。通常情况下都能正常工作,但如果由于某种原因过早调用Dispose()方法,则可能会出现问题。 - Hans Passant
@HansPassant 这应该没问题,因为阻塞集合的Dispose()方法会在同一线程上的foreach之后被调用。该类的Dispose()方法仅调用CompleteAdding(),以防止新任务被排队。 - Asik
显示剩余5条评论
3个回答

16
这是一个旧问题,但我会为任何将来遇到此问题的人添加完整答案。Eugene提供的答案部分正确;当时你一定已经配置了Visual Studio以在处理框架异常时中断调试。
然而,你中断调试的实际原因是BlockingCollection<T>.CompleteAdding()代码看起来像这样:
    public void CompleteAdding()
    {
        int num;
        this.CheckDisposed();
        if (this.IsAddingCompleted)
        {
            return;
        }
        SpinWait wait = new SpinWait();
    Label_0017:
        num = this.m_currentAdders;
        if ((num & -2147483648) != 0)
        {
            wait.Reset();
            while (this.m_currentAdders != -2147483648)
            {
                wait.SpinOnce();
            }
        }
        else if (Interlocked.CompareExchange(ref this.m_currentAdders, num | -2147483648, num) == num)
        {
            wait.Reset();
            while (this.m_currentAdders != -2147483648)
            {
                wait.SpinOnce();
            }
            if (this.Count == 0)
            {
                this.CancelWaitingConsumers();
            }
            this.CancelWaitingProducers();
        }
        else
        {
            wait.SpinOnce();
            goto Label_0017;
        }
    }

请注意这些特别的行:

if (this.Count == 0)
{
    this.CancelWaitingConsumers();
}

调用此方法的代码:

private void CancelWaitingConsumers()
{
    this.m_ConsumersCancellationTokenSource.Cancel();
}

因此,即使您的代码中没有明确使用 CancellationToken,但在调用 CompleteAdding() 时,如果 BlockingCollection 为空,则底层框架代码会抛出 OperationCanceledException。它这样做是为了向 GetConsumingEnumerable() 方法发出退出信号。异常由框架代码处理,如果您没有将调试器配置为拦截它,您可能不会注意到它。

您无法复制它的原因是因为您将对 CompleteAdding() 的调用放置在您的 Dispose() 方法中。因此,它会在 GC 的任意时刻被调用。


1
这是一个很好的答案!我猜下一个显而易见的问题是,我需要处理这个异常吗?如果需要,该怎么处理?当生产者数量众多但消费者数量较少时,这种情况很可能发生。 - dragonfly02
1
不需要处理它。它只是作为内部信号机制使用。只有在调试器配置拦截它时,您才会意识到它的存在。 - 0b101010

2
我只能推测,但我认为你可能正在经历Stephen Toub在他的博客文章“Task.Wait and 'Inlining'”和Jon Skeet在此处描述的任务内联场景。
你的TaskScheduler.TryExecuteTaskInline的实现是什么样子的?为了防止意外的任务内联,请始终返回false
override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
{
    return false;
}

1
添加了TryExecuteTaskInline的实现到问题中。我会尝试看看当调用该方法时是否会出现此问题,感谢您的建议。 - Asik

2

有时候在GetConsumingEnumerable枚举器的MoveNext()方法内部会发生异常,但这是一个已处理的异常,所以通常不应该看到它。

也许你的调试器配置成在已处理的异常上中断(在Visual Studio中,这些选项在Debug/Exceptions菜单中),在这种情况下,调试器甚至可能在.NET框架函数内部发生异常时中断。


这不是全部的故事。我会在另一个回答中说明真正的原因。 - 0b101010

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接