使用BlockingCollection<T>: OperationCanceledException,有更好的方法吗?

30
我正在使用(非常优秀的)BlockingCollection<T> 类型来开发一个高度多线程、高性能的应用程序。每个“批次”都将通过标记取消令牌来结束,这会导致任何等待的 Take 调用都会抛出异常。虽然这很好,但我更希望返回值或输出参数来表示它,因为a) 异常有明显的开销,b) 在调试时,我不想手动关闭特定异常的断点。

实现看起来很强烈,理论上我可以反汇编并重新创建一个没有使用异常的版本,但也许有一种不那么复杂的方法?我可以添加一个 null(或者如果不行,一个占位符)对象到集合中,以表示该过程应该结束,但是还需要一种优雅的中止方式,即唤醒等待的线程,并以某种方式告诉它们发生了什么事情。

所以 - 可以考虑使用其他的集合类型吗?重建自己的集合类型?滥用这个集合类型的一些功能?(一些背景信息:我选择 BlockingCollection<T> 是因为它相对于手动锁定 Queue 来说具有优势。就我所知,线程原语的使用是极好的,在我的情况下,几毫秒的时间和最佳核心使用至关重要。)

编辑:我刚刚为这个问题开了一个赏金。我不认为Anastasiosyal's answer回答了我在评论中提出的问题。我知道这是一个棘手的问题。是否有人能够提供帮助?


CompleteAdding方法对你是否可行? - dtb
3
我想答案很简单:如果这种情况经常发生并影响性能,就不要取消操作。例如,向队列添加一个哨兵对象,并让消费者消耗队列直到该对象,而不是立即中止消费者。 - dtb
你最终解决了这个问题吗? - dragonfly02
@stt106 - 不好意思,很遗憾不是这样的。 - Kieren Johnstone
1
不确定是否相关,但刚刚读到.NET团队已经意识到这一点,并且他们实际上是有意抛出此异常的。有些人要求他们更改此实现,但他们拒绝这样做。https://connect.microsoft.com/VisualStudio/feedback/details/631951/first-chance-operationcanceledexception-in-blockingcollection-trytake - dragonfly02
显示剩余23条评论
5个回答

9
我猜你已经自己查看了BlockingCollection的反射源代码,不幸的是,当CancellationToken传递到BlockingCollection并且被取消时,你会得到OperationCancelledException异常,如下图所示(图像下方有一些解决方法)。
GetConsumingEnumerable在BlockingCollection上调用TryTakeWithNoTimeValidation,从而引发此异常。
解决方法#1
一个潜在的策略是,假设你对生产者和消费者有更多的控制权,而不是将取消标记传递到BlockingCollection中(这将引发此异常),你可以将取消标记传递到你的生产者和消费者中。
如果你的生产者没有生产,你的消费者没有消费,那么你已经有效地取消了操作,而不会引发此异常,并在你的BlockingCollection中传递CancellationToken.None。
特殊情况 当BlockingCollection达到BoundedCapacity或Empty时取消
生产者被阻塞:当BlockingCollection的BoundedCapacity达到时,生产者线程将被阻塞。因此,当尝试取消并且BlockingCollection处于BoundedCapacity时(这意味着你的消费者没有被阻塞,但生产者被阻塞,因为它们无法添加任何其他项目到队列中),那么你需要允许额外的项被消耗(每个生产者线程一个),这将解除生产者的阻塞(因为它们被阻塞在添加到BlockingCollection中),从而允许你的取消逻辑在生产者端起作用。
消费者被阻塞:当你的消费者被阻塞因为队列为空时,你可以在BlockingCollection中插入一个空的工作单元(每个消费者线程一个),以解除消费者线程的阻塞,并允许你的取消逻辑在消费者端起作用。
当队列中有项目且没有达到BoundedCapacity或Empty等限制时,生产者和消费者线程不应该被阻塞。
解决方法#2
使用取消工作单元。
当你的应用程序需要取消时,你的生产者(也许只需要一个生产者就足够了,而其他的则只是取消生产)将生产一个取消工作单元(可以是null,如你所提到的,或者实现标记接口的某个类)。当消费者消耗这个工作单元并检测到它确实是一个取消工作单元时,它们的取消逻辑会启动。要生产的取消工作单元的数量需要等于消费者线程的数量。

需要注意的是,当我们接近BoundedCapacity时需要谨慎,因为这可能意味着一些生产者被阻塞了。根据生产者/消费者的数量,您可以让一个消费者消费直到所有生产者(除了1个)关闭。这确保了没有残留的生产者。当只剩下1个生产者时,您的最后一个消费者可以关闭并且生产者可以停止生成取消单位的工作。


那么是什么会唤醒生产者/消费者线程呢?它们仍然会被阻塞在BlockingCollection调用中吗? - Kieren Johnstone
@KierenJohnstone 很好的评论,我已经编辑了帖子,包括当生产者或消费者被阻塞时处理这些情况。 - Anastasiosyal
如果有人觉得这个回答解决了问题,请告诉我。我不想把悬赏浪费在这上面,我开启悬赏的原因是因为它不起作用。例如,在解决方法中没有描述如何取消添加操作,当队列已满时被阻止。 - Kieren Johnstone

1

您可以通过在最后一个项目上设置标志(添加IsLastItem布尔属性或将其包装)来表示批处理的结束。或者,您可以发送null作为最后一个项目(不确定null是否正确地通过blockingcollection传递)。

如果您可以消除“批处理”概念的需求,则可以创建一个额外的线程来持续从阻塞集合中获取和处理新数据,并且不执行其他任何操作。


当队列因为已满而被阻塞时,它如何处理取消操作?因为它需要... - Kieren Johnstone
我从来没有设置最大容量(冒着内存不足的风险)。但是如果你这样做,我看不到在等待阻塞添加被取消时不出现错误的方法。你可以改用循环方式使用TryAdd(),这样就有了非阻塞添加。 - IvoTops
这是问题的核心,也是为什么这很难解决的原因。循环/轮询=慢。远远比BlockingCollection<>慢得多。我所需要的唯一的东西就是BlockingCollection<>不要使用异常。在添加时它比循环要快得多,但当取消时会使用异常。那就是我不想发生的事情。 - Kieren Johnstone
TryAdd()有一个可选的TimeOut参数。因此,您可以使用一个循环,在其中进行TryAdd()一秒钟(短暂阻塞),并每秒检查取消。您需要重新构造Cancelling,以便在引发Cancel时不进行TryAdding(),否则它可能仍会抛出异常。 - IvoTops
此页面上第二个示例使用了 Timed TryAdd()。http://msdn.microsoft.com/en-us/library/dd997306.aspx - IvoTops

1

我之前做的BlockingQueue怎么样?

http://apichange.codeplex.com/SourceControl/changeset/view/76c98b8c7311#ApiChange.Api%2fsrc%2fInfrastructure%2fBlockingQueue.cs

它应该没有任何异常就可以正常运行。当前队列只是在处理完毕后关闭事件,这可能不是您想要的。您可能希望将一个空值加入队列,并等待所有项目被处理完成。除此之外,它应该能够满足您的需求。

using System.Collections.Generic;
using System.Collections;
using System.Threading;
using System;

namespace ApiChange.Infrastructure
{

    /// <summary>
    /// A blocking queue which supports end markers to signal that no more work is left by inserting
    /// a null reference. This constrains the queue to reference types only. 
    /// </summary>
    /// <typeparam name="T"></typeparam>
    public class BlockingQueue<T> : IEnumerable<T>, IEnumerable, IDisposable where T : class
    {
        /// <summary>
        /// The queue used to store the elements
        /// </summary>
        private Queue<T> myQueue = new Queue<T>();
        bool myAllItemsProcessed = false;
        ManualResetEvent myEmptyEvent = new ManualResetEvent(false);

        /// <summary>
        /// Deques an element from the queue and returns it.
        /// If the queue is empty the thread will block. If the queue is stopped it will immedieately
        /// return with null.
        /// </summary>
        /// <returns>An object of type T</returns>      
        public T Dequeue()
        {
            if (myAllItemsProcessed)
                return null;

            lock (myQueue)
            {
                while (myQueue.Count == 0) 
                {
                    if(!Monitor.Wait(myQueue, 45))
                    {
                        // dispatch any work which is not done yet
                        if( myQueue.Count > 0 )
                            continue;
                    }

                    // finito 
                    if (myAllItemsProcessed)
                    {
                        return null;
                    }
                }

                T result = myQueue.Dequeue();
                if (result == null)
                {
                    myAllItemsProcessed = true;
                    myEmptyEvent.Set();
                }
                return result;
            }
        }

        /// <summary>
        /// Releases the waiters by enqueuing a null reference which causes all waiters to be released. 
        /// The will then get a null reference as queued element to signal that they should terminate.
        /// </summary>
        public void ReleaseWaiters()
        {
            Enqueue(null);
        }

        /// <summary>
        /// Waits the until empty. This does not mean that all items are already process. Only that
        /// the queue contains no more pending work. 
        /// </summary>
        public void WaitUntilEmpty()
        {
            myEmptyEvent.WaitOne();
        }

        /// <summary>
        /// Adds an element of type T to the queue. 
        /// The consumer thread is notified (if waiting)
        /// </summary>
        /// <param name="data_in">An object of type T</param>      
        public void Enqueue(T data_in)
        {
            lock (myQueue)
            {
                myQueue.Enqueue(data_in);
                Monitor.PulseAll(myQueue);
            }
        }

        /// <summary>
        /// Returns an IEnumerator of Type T for this queue
        /// </summary>
        /// <returns></returns>    
        IEnumerator<T> IEnumerable<T>.GetEnumerator()
        {
            while (true)
            {
                T item = Dequeue();
                if (item == null)
                    break;
                else
                    yield return item;
            }
        }

        /// <summary>
        /// Returns a untyped IEnumerator for this queue
        /// </summary>
        /// <returns></returns>     
        IEnumerator IEnumerable.GetEnumerator()
        {
            return ((IEnumerable<T>)this).GetEnumerator();
        }


        #region IDisposable Members

        /// <summary>
        /// Closes the EmptyEvent WaitHandle.
        /// </summary>
        public void Dispose()
        {
            myEmptyEvent.Close();
        }

        #endregion
    }
}

1
抱歉,与我所需的完全不符。Monitor.Wait比BlockingCollection慢得多,如果它必须等待一个项目,甚至会增加最小延迟45毫秒。在这里性能不够好(除非我错过了什么?)。您没有明确说明如何取消(使用布尔标志吗?)并且队列长度不固定!如果队列已满,我需要Enqueue阻塞..但可取消!(这是整个重点!) - Kieren Johnstone
嗨Kiren,你测量了吗?45毫秒不是最小延迟,而恰恰相反。取消操作是通过Enque(null)完成的,这将导致处理所有元素,直到出队空元素。你是对的,它没有界限,但在enque操作期间添加检查很容易。如果你经常进入饱和状态,那么无锁集合和显式锁定集合之间的差异将不再重要,因为你正在等待处理器处理项目。 - Alois Kraus
嗨Alois,如果您的Enqueue因队列已满而阻塞,则无法再加入另一个null。enqueue操作需要被阻止,但也需要可取消。您有什么建议吗? - Kieren Johnstone
我会在我的Enqueue方法中进行null检查,并且无论当前队列计数如何都会将其入队。我知道这听起来很明显,但这就是我会这样做的方式。如果队列中的元素数量比最大计数多一个,那么不会发生任何问题(至少不会尝试一些非常奇怪的东西)。 - Alois Kraus
但是 null 将会在队列的末尾。因此它不会立即取消,而是在取消之前处理整个队列(在这种情况下有 20,000 个项目)。那不是一个“取消”操作,对吧? - Kieren Johnstone
您可以添加一个ClearAndCancel方法,该方法在队列上加锁,清空队列,然后将null引用入队。效果将是您想要的那个。额外的好处是您的对象引用将更快地被GC清理。 - Alois Kraus

0

Kieren,

经过我的检查,我个人不知道有哪些线程安全类型适用于生产者消费者模式,可以完全满足您的要求。我不认为这是一种竞争性的解决方案,但建议您使用几个扩展方法来装饰BlockingCollection<T>,这将使您能够提供任何内置或自定义类型,而不是默认的CancellationToken

阶段1:

以下是默认方法列表,它们使用底层的TryAddWithNoTimeValidation方法将元素添加到队列中。

public void Add(T item){
      this.TryAddWithNoTimeValidation(item, -1, new CancellationToken());
}

public void Add(T item, CancellationToken cancellationToken){
      this.TryAddWithNoTimeValidation(item, -1, cancellationToken);
    }

public bool TryAdd(T item){
      return this.TryAddWithNoTimeValidation(item, 0, new CancellationToken());
    }

public bool TryAdd(T item, TimeSpan timeout){
      BlockingCollection<T>.ValidateTimeout(timeout);
      return this.TryAddWithNoTimeValidation(item, (int) timeout.TotalMilliseconds, new CancellationToken());
    }

public bool TryAdd(T item, int millisecondsTimeout){
      BlockingCollection<T>.ValidateMillisecondsTimeout(millisecondsTimeout);
      return this.TryAddWithNoTimeValidation(item, millisecondsTimeout, new           CancellationToken());
}

public bool TryAdd(T item, int millisecondsTimeout, CancellationToken cancellationToken){
 BlockingCollection<T>.ValidateMillisecondsTimeout(millisecondsTimeout);
 return this.TryAddWithNoTimeValidation(item, millisecondsTimeout, cancellationToken);
}

现在您可以为您感兴趣的任何/所有方法提供扩展。

第二阶段:

现在您可以引用您实现的TryAddWithNoTimeValidation而不是默认值。

我可以给您提供一个替代版本的TryAddWithNoTimeValidation,它可以安全地继续执行而不会抛出OperationCancellation异常。


1
说我可以使用扩展方法有点像是一种编码建议。我想要的是如何在不使用取消令牌的情况下实现高性能。这个答案只是告诉我如何使用扩展方法,而我已经知道如何使用了。 - Kieren Johnstone
2
@Bobb - 嗯,我认为我说得非常实际。你的两个代码片段都会在取消方法内导致.NET抛出异常。抛出异常是一件实际发生的事情,而这正是我想要消除的部分。这很实用,不是吗? - Kieren Johnstone
1
a) 制作自己的基准测试,并认识到异常更快。 b) 理解异常捕获发生一次,而布尔标志检查在每个项迭代中都会发生的事实。 c) 重新设计您的工作程序,使其具有较少的异常,并使用异常与布尔标志节省下来的计算资源,而不是 if(flag) 浪费...。 - Boppity Bop
2
我来说明一下:在内部,BlockingCollection<>使用异常。我无法“承认异常更快”,因为我无法修改BlockingCollection<>的源代码。我无法重新设计我的工作程序以减少异常数量,因为不是我的工作程序抛出异常,而是BlockingCollection<>在内部抛出异常。这就是我的问题所在。你如何建议我关闭BlockingCollection<>的异常,以便我可以在没有异常的情况下进行测量? - Kieren Johnstone
2
@Bob:抛出异常比简单的布尔检查慢得多。没有争议的必要。如果你测量过抛出异常比布尔检查更快,那么我知道你的性能测试和从中得出的结论是有缺陷的。你不需要相信我的话,只需启用反汇编功能,在 if(flag) return else xxxx 和 if(flag) throw new Exception() else xxxx 之间进行调试比较即可。在抛出异常和堆栈展开期间,执行了比布尔检查更多的汇编指令(包括其中一些布尔检查)... - Alois Kraus
显示剩余2条评论

0

我的建议是通过封装异步队列来实现此功能,例如BufferBlock<T>类,该类来自TPL Dataflow库。这个类是一个线程安全的容器,旨在用于生产者-消费者场景,并支持回压(BoundedCapacity),就像BlockingCollection<T>类一样。异步意味着相应的Add/Take方法(SendAsync/ReceiveAsync)返回任务。这些任务将取消事件存储为内部状态,可以使用IsCanceled属性查询,从而避免在内部抛出异常。通过使用异常抑制继续(ContinueWith)等待任务,也可以避免传播此状态。以下是实现:

/// <summary>
/// A thread-safe collection that provides blocking and bounding capabilities.
/// The cancellation is propagated as a false result, and not as an exception.
/// </summary>
public class CancellationFriendlyBlockingCollection<T>
{
    private readonly BufferBlock<T> _bufferBlock;

    public CancellationFriendlyBlockingCollection()
    {
        _bufferBlock = new BufferBlock<T>();
    }

    public CancellationFriendlyBlockingCollection(int boundedCapacity)
    {
        _bufferBlock = new BufferBlock<T>(new() { BoundedCapacity = boundedCapacity });
    }

    public bool TryAdd(T item, CancellationToken cancellationToken = default)
    {
        if (cancellationToken.IsCancellationRequested) return false;
        if (_bufferBlock.Post(item)) return true;
        Task<bool> task = _bufferBlock.SendAsync(item, cancellationToken);
        WaitNoThrow(task);
        if (!task.IsCompletedSuccessfully) return false;
        return task.Result;
    }

    public bool TryTake(out T item, CancellationToken cancellationToken = default)
    {
        if (cancellationToken.IsCancellationRequested) { item = default; return false; }
        if (_bufferBlock.TryReceive(out item)) return true;
        Task<T> task = _bufferBlock.ReceiveAsync(cancellationToken);
        WaitNoThrow(task);
        if (!task.IsCompletedSuccessfully) return false;
        item = task.Result; return true;
    }

    public IEnumerable<T> GetConsumingEnumerable(
        CancellationToken cancellationToken = default)
    {
        while (TryTake(out var item, cancellationToken)) yield return item;
    }

    public void CompleteAdding() => _bufferBlock.Complete();
    public bool IsCompleted => _bufferBlock.Completion.IsCompleted;
    public int Count => _bufferBlock.Count;

    // Wait the task to complete without throwing exceptions
    private static void WaitNoThrow(Task task)
    {
        if (task.IsCompleted) return;
        task.ContinueWith(_ => { }, default,
            TaskContinuationOptions.ExecuteSynchronously |
            TaskContinuationOptions.DenyChildAttach, TaskScheduler.Default).Wait();
        Debug.Assert(task.IsCompleted);
    }
}

性能:在我的PC上(单线程),CancellationFriendlyBlockingCollection.TryTake方法可以在每秒约15,000,000次的频率下循环调用已取消的CancellationToken。相比之下,在相同条件下,BlockingCollection<T>.Take的频率约为每秒20,000次。

您可能会想用更现代的异步队列(如Channel<T>)替换BufferBlock<T>。在这种情况下,请务必先阅读this question,以了解该类在特定条件下的泄漏行为。


请注意,原始实现尝试使用 Stephen Toub 的 SuppressException 结构同步阻塞。这是一个错误,因为此结构只能与 await 一起使用。 - Theodor Zoulias

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接