在.NET中,管理单独线程的任务队列的最佳方法是什么?

47

我知道异步编程多年来已经发生了很多变化。我有点尴尬,我才34岁就让自己变得这么生疏,但我指望 StackOverflow 帮助我跟上潮流。

我想做的是在一个单独的线程上管理“工作”队列,但只处理一个项目。我想在此线程上发布工作,而且不需要将任何东西传回给调用者。当然,我可以简单地启动一个新的 Thread 对象并让它循环使用共享的 Queue 对象,使用睡眠、中断、等待句柄等等。但我知道事情已经变得更好了。我们有 BlockingCollectionTaskasync/await,更不用说可能抽象了很多的 NuGet 包。

我知道“最好的...”这样的问题通常会被人所反感,所以我会重新表述一下,“当前推荐的...”用内置 .NET 机制进行此类操作的方法。但是如果第三方的 NuGet 包能够简化很多事情,那也一样好。

我考虑过使用固定最大并发数为1的 TaskScheduler 实例,但现在可能有更简洁的方法。

背景

具体来说,在这种情况下我想要做的是在 Web 请求期间排队 IP 地理位置任务。相同的 IP 可能会多次被排队进行地理定位,但是任务将会知道如何检测并且如果已经解决了就尽早跳过它。但请求处理程序只会把这些 () => LocateAddress(context.Request.UserHostAddress) 调用扔进队列中,然后让 LocateAddress 方法来处理重复工作的检测。我使用的地理位置 API 不喜欢遭受大量请求,这就是为什么我想将它限制在同时进行单个任务。但是,如果可以通过简单的参数更改轻松地扩展到更多并发任务,则很好。


1
我会使用 BlockingCollection。听起来像是一个简单的生产者/消费者问题。 - Zer0
@Zer0 这对于同步的生产者/消费者模型很有用,但不适用于异步模型。 - Servy
@Servy 不行 - BlockingCollection 对于异步的情况非常有效。让我写一个例子。 - Zer0
同意这是一个简单的生产者/消费者模型。而且.NET中的并发集合可能会使“启动一个新线程(...)”的情况更加简单。我猜想.NET(或流行的库)现在可能已经添加了一些可配置的线程池概念,这也是我所期望的。 - Josh
2
@Josh:我认为Servy的回答给了你所需的——即限制(异步)并发的方法。没有专用线程,也没有FIFO排序的保证,但听起来它会做你需要的事情。话虽如此,如果你真的想要在线程上运行队列,你可以使用我的AsyncEx库中的AsyncContext/AsyncContextThread - Stephen Cleary
显示剩余2条评论
7个回答

70

要创建一个异步的单一并行度工作队列,您只需创建一个初始化为1的SemaphoreSlim,然后让入队方法在开始所请求的工作之前await该信号量的获取。

public class TaskQueue
{
    private SemaphoreSlim semaphore;
    public TaskQueue()
    {
        semaphore = new SemaphoreSlim(1);
    }

    public async Task<T> Enqueue<T>(Func<Task<T>> taskGenerator)
    {
        await semaphore.WaitAsync();
        try
        {
            return await taskGenerator();
        }
        finally
        {
            semaphore.Release();
        }
    }
    public async Task Enqueue(Func<Task> taskGenerator)
    {
        await semaphore.WaitAsync();
        try
        {
            await taskGenerator();
        }
        finally
        {
            semaphore.Release();
        }
    }
}

当然,如果想要固定的并行度而不是一个,只需将信号量初始化为其他数字即可。


21
请注意,这段代码不会在单独的专用线程上执行任务(与标题中要求的不同),而是确保任务按照OP实际需要逐一运行。 - Alexei Levenkov
4
@ThibaultD. 不,它没有这样做。 - Servy
1
@Thibault D. 我下面的回答: - Alexander Danilov
1
将运行任务计数器实现为以下方式是否可以:Interlocked.Increment(ref counter); await semaphore.WaitAsync(); [...] Interlocked.Decrement(ref counter); semaphore.Release(); 或者还有更好的方法吗? - Profet
我的计数器跟踪等待(+运行)的任务,而不仅仅是“正在运行”的任务!我在这里提交了你的类的改进版本:http://codereview.stackexchange.com/questions/148459/wpf-async-observabletaskqueue-class 请随意评论... :) - Profet
显示剩余4条评论

23

据我看来,您最好的选择是使用TPL DataflowActionBlock

var actionBlock = new ActionBlock<string>(address =>
{
    if (!IsDuplicate(address))
    {
        LocateAddress(address);
    }
});

actionBlock.Post(context.Request.UserHostAddress);

TPL Dataflow 是一个强大的、线程安全的、async-ready 且高度可配置的基于 actor 的框架(作为 nuget 可用)。

以下是一个更复杂情况下的简单示例。假设您想要:

  • 启用并发(限制为可用核心数)。
  • 限制队列大小(以防止内存耗尽)。
  • 同时使 LocateAddress 和队列插入都是 async 的。
  • 一小时后取消所有操作。

var actionBlock = new ActionBlock<string>(async address =>
{
    if (!IsDuplicate(address))
    {
        await LocateAddressAsync(address);
    }
}, new ExecutionDataflowBlockOptions
{
    BoundedCapacity = 10000,
    MaxDegreeOfParallelism = Environment.ProcessorCount,
    CancellationToken = new CancellationTokenSource(TimeSpan.FromHours(1)).Token
});

await actionBlock.SendAsync(context.Request.UserHostAddress);

5
非常好!我一定会查看这个。我从未深入研究过TPL Dataflow,但听起来它比专用线程具有更多的优势。 - Josh

22
实际上,你不需要在一个线程中运行任务,而是需要让它们串行(一个接一个地运行)并且先进先出。TPL没有这样的类,但是这里有我的非常轻量级且非阻塞实现,并带有测试。https://github.com/Gentlee/SerialQueue 同时,在那里也有@Servy的实现,测试显示它比我的慢两倍,并且不能保证先进先出。
示例:
private readonly SerialQueue queue = new SerialQueue();

async Task SomeAsyncMethod()
{
    var result = await queue.Enqueue(DoSomething);
}

这个实施对我来说非常顺利,谢谢你。 - undefined

13

使用 BlockingCollection<Action> 创建生产者/消费者模式,其中只有一个消费者(一次只运行一个任务,就像你想要的那样),但可以有一个或多个生产者。

首先在某个地方定义共享队列:

BlockingCollection<Action> queue = new BlockingCollection<Action>();

在您的消费者ThreadTask中,您从其中取出:

//This will block until there's an item available
Action itemToRun = queue.Take()

然后从任何其他线程的生产者中,只需将内容添加到队列中:

queue.Add(() => LocateAddress(context.Request.UserHostAddress));

2
这需要消费者同步处理任务。必须有一个线程坐在那里,当没有工作时什么也不做,而不是异步地消耗操作,这样当没有工作要做时就没有线程了。 - Servy
2
@Servy 如果没有工作要做,线程将处于等待状态。它不会浪费任何CPU时间,所以我认为这没有任何问题。虽然我仍然不理解你对“同步”或“异步”的评论。这是一种异步设计。 - Zer0
3
它是一个异步的“生产者”,但完全是同步的“消费者”。线程仍然很昂贵,为了让它们无所事事地坐在那里而大量启动它们仍然很昂贵,应该避免这样做。 - Servy
1
@Servy 那个限制很容易解决。将定义更改为 BlockingCollection<Task<T>>... - Zer0
2
@Zer0,这就是它的全部内容* CPU根本不需要考虑并行处理*。 并行处理可以在没有任何线程的情况下发生。 例如,您可以有多个挂起的IO请求,每个请求都在工作,而根本不使用线程。 有很多方法可以“工作”,而不使用CPU。 - Servy
显示剩余20条评论

5

我在这里提供一个不同的解决方案,但老实说我不确定这是否是一个好的解决方案。

我习惯使用 BlockingCollection 来实现生产者/消费者模式,使用一个专用线程来消费这些项目。如果总是有数据进来,且消费者线程不会闲置不动,那么这种方法很好用。

我遇到了一种情况,其中一个应用程序需要在不同的线程上发送电子邮件,但总数量并不是很大。我的最初解决方案是创建一个专用的消费者线程(通过 Task.Run() 创建),但它经常会闲置不动。

旧的解决方案:

private readonly BlockingCollection<EmailData> _Emails =
    new BlockingCollection<EmailData>(new ConcurrentQueue<EmailData>());

// producer can add data here
public void Add(EmailData emailData)
{
    _Emails.Add(emailData);
}

public void Run()
{
    // create a consumer thread
    Task.Run(() => 
    {
        foreach (var emailData in _Emails.GetConsumingEnumerable())
        {
            SendEmail(emailData);
        }
    });
}

// sending email implementation
private void SendEmail(EmailData emailData)
{
    throw new NotImplementedException();
}

正如您所看到的,如果没有足够的电子邮件需要发送(这也是我的情况),消费者线程将大部分时间都坐在那里什么也不做。

我改变了我的实现方式:

// create an empty task
private Task _SendEmailTask = Task.Run(() => {});

// caller will dispatch the email to here
// continuewith will use a thread pool thread (different to
// _SendEmailTask thread) to send this email
private void Add(EmailData emailData)
{
    _SendEmailTask = _SendEmailTask.ContinueWith((t) =>
    {
        SendEmail(emailData);
    });
}

// actual implementation
private void SendEmail(EmailData emailData)
{
    throw new NotImplementedException();
}

它不再是生产者/消费者模式,但它不会有一个线程坐在那里无所事事,相反,每次发送电子邮件时,它将使用线程池线程进行操作。


你的 Add 方法会出现竞态条件吗?例如,有两个线程同时调用这个方法。但无论如何,这是一个非常简洁但有效的解决方案。 - Juniver Hazoic

1
我的库可以做到以下几点:
  1. 在队列列表中随机运行
  2. 多个队列
  3. 优先运行优先级高的任务
  4. 重新排队
  5. 事件通知所有队列已完成
  6. 取消正在运行或等待运行的任务
  7. 将事件分派到UI线程

public interface IQueue
  {
    bool IsPrioritize { get; }
    bool ReQueue { get; }
    /// <summary>
    /// Dont use async
    /// </summary>
    /// <returns></returns>
    Task DoWork();
    bool CheckEquals(IQueue queue);
    void Cancel();
  }

  public delegate void QueueComplete<T>(T queue) where T : IQueue;
  public delegate void RunComplete();
  public class TaskQueue<T> where T : IQueue
  {
    readonly List<T> Queues = new List<T>();
    readonly List<T> Runnings = new List<T>();

    [Browsable(false), DefaultValue((string)null)]
    public Dispatcher Dispatcher { get; set; }
    public event RunComplete OnRunComplete;
    public event QueueComplete<T> OnQueueComplete;
    int _MaxRun = 1;
    public int MaxRun
    {
      get { return _MaxRun; }
      set
      {
        bool flag = value > _MaxRun;
        _MaxRun = value;
        if (flag && Queues.Count != 0) RunNewQueue();
      }
    }
    public int RunningCount
    {
      get { return Runnings.Count; }
    }
    public int QueueCount
    {
      get { return Queues.Count; }
    }

    public bool RunRandom { get; set; } = false;

    //need lock Queues first
    void StartQueue(T queue)
    {
      if (null != queue)
      {
        Queues.Remove(queue);
        lock (Runnings) Runnings.Add(queue);
        queue.DoWork().ContinueWith(ContinueTaskResult, queue);
      }
    }

    void RunNewQueue()
    {
      lock (Queues)//Prioritize
      {
        foreach (var q in Queues.Where(x => x.IsPrioritize)) StartQueue(q);
      }

      if (Runnings.Count >= MaxRun) return;//other
      else if (Queues.Count == 0)
      {
        if (Runnings.Count == 0 && OnRunComplete != null)
        {
          if (Dispatcher != null && !Dispatcher.CheckAccess()) Dispatcher.Invoke(OnRunComplete);
          else OnRunComplete.Invoke();//on completed
        }
        else return;
      }
      else
      {
        lock (Queues)
        {
          T queue;
          if (RunRandom) queue = Queues.OrderBy(x => Guid.NewGuid()).FirstOrDefault();
          else queue = Queues.FirstOrDefault();
          StartQueue(queue);
        }
        if (Queues.Count > 0 && Runnings.Count < MaxRun) RunNewQueue();
      }
    }

    void ContinueTaskResult(Task Result, object queue_obj) => QueueCompleted((T)queue_obj);

    void QueueCompleted(T queue)
    {
      lock (Runnings) Runnings.Remove(queue);
      if (queue.ReQueue) lock (Queues) Queues.Add(queue);
      if (OnQueueComplete != null)
      {
        if (Dispatcher != null && !Dispatcher.CheckAccess()) Dispatcher.Invoke(OnQueueComplete, queue);
        else OnQueueComplete.Invoke(queue);
      }
      RunNewQueue();
    }

    public void Add(T queue)
    {
      if (null == queue) throw new ArgumentNullException(nameof(queue));
      lock (Queues) Queues.Add(queue);
      RunNewQueue();
    }

    public void Cancel(T queue)
    {
      if (null == queue) throw new ArgumentNullException(nameof(queue));
      lock (Queues) Queues.RemoveAll(o => o.CheckEquals(queue));
      lock (Runnings) Runnings.ForEach(o => { if (o.CheckEquals(queue)) o.Cancel(); });
    }

    public void Reset(T queue)
    {
      if (null == queue) throw new ArgumentNullException(nameof(queue));
      Cancel(queue);
      Add(queue);
    }

    public void ShutDown()
    {
      MaxRun = 0;
      lock (Queues) Queues.Clear();
      lock (Runnings) Runnings.ForEach(o => o.Cancel());
    }
  }

-1

我知道这个帖子很旧了,但似乎现有的所有解决方案都非常繁琐。我能找到的最简单的方法是使用Linq Aggregate函数创建一个任务链表。

var arr = new int[] { 1, 2, 3, 4, 5};
var queue = arr.Aggregate(Task.CompletedTask, 
    (prev, item) => prev.ContinueWith(antecedent => PerformWorkHere(item)));

这个想法是将你的数据放入一个 IEnumerable 中(我使用的是 int 数组),然后将该可枚举对象缩减为一系列任务,从一个默认的、已完成的任务开始。


几天后,由于垃圾回收无法触及你庞大的链条,你最终会遇到OOM(内存溢出)问题,因为每个新任务都会堆积在旧任务之上。 - greenoldman

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接