有没有类似于异步BlockingCollection<T>的东西?

108
我想要异步地awaitBlockingCollection<T>.Take()的结果,以便不会阻塞线程。寻找类似于这样的任何东西:
var item = await blockingCollection.TakeAsync();

我知道我可以这样做:

var item = await Task.Run(() => blockingCollection.Take());

但这样做会破坏整个想法,因为另一个线程(ThreadPool的线程)会被阻塞。

有没有其他替代方案?


3
如果你使用 await Task.Run(() => blockingCollection.Take()),任务将在其他线程上执行,你的 UI 线程就不会被阻塞。这不就是重点吗? - Selman Genç
11
@Selman22,这不是一个用户界面应用程序,而是一个导出基于“任务(Task)”的API库。例如,可以从ASP.NET中使用它。所涉及的代码在那里无法良好扩展。 - avo
如果在 Run() 之后使用 ConfigureAwait,这仍然会是一个问题吗?[编辑:没关系,我现在明白你的意思了] - MojoFilter
5个回答

133

我知道有四种选择。

第一种是Channels,它提供了一个线程安全的队列,支持异步ReadWrite操作。 Channels高度优化,可以选择在达到阈值时丢弃某些项目。

下一个是TPL Dataflow中的BufferBlock<T>。 如果您只有一个消费者,可以使用OutputAvailableAsyncReceiveAsync,或将其链接到ActionBlock<T>。有关更多信息,请参见我的博客

最后两个是我创建的类型,在我的AsyncEx库中可用。

AsyncCollection<T>BlockingCollection<T>async近似,能够包装并发生产者/消费者集合,例如ConcurrentQueue<T>ConcurrentBag<T>。您可以使用TakeAsync从集合中异步消耗项目。有关更多信息,请参见我的博客

AsyncProducerConsumerQueue<T>是一个更便携的async兼容的生产者/消费者队列。您可以使用DequeueAsync从队列中异步消耗项目。有关更多信息,请查看我的博客

这三个替代方案中的最后三个允许同步和异步放置和取出。


13
当 CodePlex 最终关闭时的 Git Hub 链接:https://github.com/StephenCleary/AsyncEx - Paul
API文档包含方法AsyncCollection.TryTakeAsync,但我在下载的最新版本Nito.AsyncEx.Coordination.dll 5.0.0.0中找不到它。引用的Nito.AsyncEx.Concurrent.dllpackage中不存在。我错过了什么? - Theodor Zoulias
@TheodorZoulias:该方法已在v5中删除。 v5 API文档在此处:http://dotnetapis.com/pkg/Nito.AsyncEx.Coordination/5.0.0/netstandard2.0/doc/Nito.AsyncEx.AsyncCollection'1。 - Stephen Cleary
哦,谢谢。看起来这是枚举集合的最简单和最安全的方法。 while ((result = await collection.TryTakeAsync()).Success) { }。为什么它被移除了? - Theodor Zoulias
2
@TheodorZoulias:因为“尝试”对不同的人意味着不同的事情。我正在考虑重新添加一个“尝试”方法,但它实际上会有不同的语义。同时,我也在考虑在未来版本中支持异步流,这肯定是在支持时最好的消费方法。 - Stephen Cleary

26

或者你可以这样做:

using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public class AsyncQueue<T>
{
    private readonly SemaphoreSlim _sem;
    private readonly ConcurrentQueue<T> _que;

    public AsyncQueue()
    {
        _sem = new SemaphoreSlim(0);
        _que = new ConcurrentQueue<T>();
    }

    public void Enqueue(T item)
    {
        _que.Enqueue(item);
        _sem.Release();
    }

    public void EnqueueRange(IEnumerable<T> source)
    {
        var n = 0;
        foreach (var item in source)
        {
            _que.Enqueue(item);
            n++;
        }
        _sem.Release(n);
    }

    public async Task<T> DequeueAsync(CancellationToken cancellationToken = default(CancellationToken))
    {
        for (; ; )
        {
            await _sem.WaitAsync(cancellationToken);

            T item;
            if (_que.TryDequeue(out item))
            {
                return item;
            }
        }
    }
}

简单、完全功能的异步FIFO队列。

注意: SemaphoreSlim.WaitAsync 是在 .NET 4.5 中添加的,之前这并不是那么直观的。


4
无限循环 for 有何用?如果信号量被释放,队列至少有一个要出列的项,不是吗? - Blendester
4
如果有多个消费者被阻塞,可能会出现竞争条件。我们不能确定是否至少有两个竞争的消费者,并且我们不知道它们是否都在获取要出列的项目之前醒来。如果发生竞争,在一个没有成功获取出列项的情况下,它将重新进入睡眠状态并等待另一个信号。 - John Leidegren
1
如果两个或更多的消费者通过WaitAsync(),那么队列中就有相同数量的项目,因此它们将始终成功出列。我有什么遗漏吗? - mindcruzer
2
这是一个阻塞集合,TryDequeue 的语义是返回一个值或者根本不返回。技术上讲,如果你有多个读取器,同一个读取器在任何其他读取器完全唤醒之前可以消耗两个(或多个)项目。成功的 WaitAsync 只是一个信号,可能有队列中的项目可以消耗,但并不是保证。 - John Leidegren
1
@AshishNegi 这只是TryDequeue API设计的一个后果。这样,如果由于某种原因队列没有成功出列,它的行为仍然是有定义的。我们可以选择抛出异常。我可能已经为使用监视器的代码适应了这个例子,我们实际上可以唤醒多个消费者。我认为这是那个的遗留问题。我看不出它为什么绝对必要。 - John Leidegren
显示剩余4条评论

6
异步(非阻塞)的BlockingCollection<T>的替代方案是Channel<T>类。它提供了几乎相同的功能,以及一些额外的特性。您可以使用Channel的静态工厂方法来实例化一个Channel<T>,如下所示(演示了所有可用选项的默认值)。
Channel<Item> channel = Channel.CreateUnbounded<Item>(new UnboundedChannelOptions()
{
    SingleWriter = false,
    SingleReader = false,
    AllowSynchronousContinuations = false,
});

Channel<Item> channel = Channel.CreateBounded<Item>(new BoundedChannelOptions(capacity)
{
    SingleWriter = false,
    SingleReader = false,
    AllowSynchronousContinuations = false,
    FullMode = BoundedChannelFullMode.Wait,
});

最显著的区别是Channel<T>暴露了一个Writer和一个Reader外观。因此,您可以将Writer外观传递给扮演生产者角色的方法,类似地,将Reader外观传递给扮演消费者角色的方法。Writer只允许在通道中添加项目并标记其完成。Reader只允许从通道中获取项目并等待其完成。这两个外观仅公开非阻塞API。例如,ChannelWriter<T>具有一个返回ValueTaskWriteAsync方法。如果您有某种原因要在这些API上阻塞,例如如果您的生产者/消费者对中的一个工作线程必须是同步的,则可以使用.AsTask().GetAwaiter().GetResult()进行阻塞,但这不如使用BlockingCollection<T>高效。如果您想了解Channel<T>BlockingCollection<T>类之间的相似性和差异,请参阅this answer
在这个答案的第三版中可以找到一个实现了自定义AsyncBlockingCollection<T>类的基本功能的实例。

4
事后看来,我认为AsyncBlockingCollection这个类名毫无意义。一件事情不可能同时是异步和阻塞的,因为这两个概念是完全相反的! - Theodor Zoulias
2
但是,它仍然是BlockingCollection的异步版本 :) - Stephan Møller

-1

这很简单,但它满足了我的需求。

    public static class BlockingCollectionEx
    {
        public async static Task<T> TakeAsync<T>(this BlockingCollection<T> bc, CancellationToken token, int inner_delay = 10)
        {
            while (!token.IsCancellationRequested)
            {
                if (bc.TryTake(out T el))
                    return el;
                else
                    await Task.Delay(inner_delay);
            }

            throw new OperationCanceledException();
        }
    }

1
不要使用 while (!token.IsCancellationRequested)throw new OperationCanceledException();,而是像 Dejisys 的回答中那样,将令牌直接传递给 TryTake 方法:TryTake(out T el, 0, token)。这样更简单、更好。 - Theodor Zoulias
目前你的回答不够清晰,请编辑并添加更多细节,以帮助其他人理解它如何回答问题。你可以在帮助中心找到有关如何编写好答案的更多信息。 - Community
根据 @TheodorZoulias 的评论,我认为可以将其简化为:while (!bc.TryTake(out T el, inner_delay, token)) ; return el; - Aaron
@Aaron,TakeAsync的意图是非阻塞的。TryTake重载的millisecondsTimeout参数是阻塞的,所以它违背了初衷。 - Theodor Zoulias

-4

如果您不介意进行一些小的修改,您可以尝试这些扩展。

public static async Task AddAsync<TEntity>(
    this BlockingCollection<TEntity> Bc, TEntity item, CancellationToken abortCt)
{
    while (true)
    {
        try
        {
            if (Bc.TryAdd(item, 0, abortCt))
                return;
            else
                await Task.Delay(100, abortCt);
        }
        catch (Exception)
        {
            throw;
        }
    }
}

public static async Task<TEntity> TakeAsync<TEntity>(
    this BlockingCollection<TEntity> Bc, CancellationToken abortCt)
{
    while (true)
    {
        try
        {
            TEntity item;

            if (Bc.TryTake(out item, 0, abortCt))
                return item;
            else
                await Task.Delay(100, abortCt);
        }
        catch (Exception)
        {
            throw;
        }
    }
}

4
所以你引入了人为的延迟来实现异步?但它仍然是阻塞的,对吗? - nawfal

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接