确保线程池中任务的执行顺序

52

我一直在阅读线程池模式,但似乎找不到以下问题的通常解决方案。

有时我想按顺序执行任务。例如,我从文件中读取文本块,并且由于某些原因,我需要以这种顺序处理这些块。因此,基本上我想要消除 某些任务的并发性

考虑以下场景,需要按照推入顺序处理带有*的任务。其他任务可以以任何顺序处理。

push task1
push task2
push task3   *
push task4   *
push task5
push task6   *
....
and so on

在线程池的上下文中,如果没有这个限制,一个待处理任务的队列就能正常工作,但显然这里不行。

我考虑让一些线程操作特定于线程的队列,而其他线程则操作“全局”队列。然后,为了按顺序执行一些任务,我只需要将它们推到单个线程查看的队列中即可。这听起来有点笨拙。

所以,这个长故事中真正的问题是:你会如何解决? 你会如何确保这些任务有序

编辑

作为一个更普遍的问题,假设上述场景变成

push task1
push task2   **
push task3   *
push task4   *
push task5
push task6   *
push task7   **
push task8   *
push task9
....
and so on

我的意思是,同一组内的任务应该按顺序执行,但是不同组之间的任务可以混合。因此,例如可以有3-2-5-4-7

另外需要注意的一点是,我无法提前访问所有组内的任务(也不能在等待所有任务到达后再开始组内任务)。


1
你需要为每个组设置一个独立的队列,再加上一个主队列。如果任务被分组,检查其组状态是否空闲/繁忙(+-已销毁)(需要CAS或锁),并将其排队到组队列中,然后请求处理该队列。当任务完成时,请检查组队列并将其发布到主执行队列中,以便可以轮询任何任务。我有一个Java类(ExecutorService实现)可以做到这一点。 - bestsss
让我提供一个问题列表,这些问题是我在研究中收集的,供未来的访问者参考:1 - 2 - 3 - 4 + 续篇 - OzgurH
续:[5](https://dev59.com/Cm7Xa4cB1Zd3GeqPudcm) - [6](https://dev59.com/k4rda4cB1Zd3GeqPJC0F) - [7](https ://stackoverflow.com/questions/17810202/routing-tasks-inside-threadpoolexecutor)- 主要主题是拥有一种机制,使特定组的任务串行运行或在池中的某个线程上运行。(如果您无法轻松阅读URL,则问题标题现在可在右侧栏的“链接”部分中找到) - OzgurH
18个回答

17

以下类似的代码可以让串行任务和并行任务排队,其中串行任务将一个接一个地执行,而并行任务将以任何顺序并行执行。这样可以使您能够在必要时序列化任务,同时拥有并行任务,但是这是在接收到任务时完成的,即您不需要提前了解整个序列,执行顺序动态维护。

internal class TaskQueue
{
    private readonly object _syncObj = new object();
    private readonly Queue<QTask> _tasks = new Queue<QTask>();
    private int _runningTaskCount;

    public void Queue(bool isParallel, Action task)
    {
        lock (_syncObj)
        {
            _tasks.Enqueue(new QTask { IsParallel = isParallel, Task = task });
        }

        ProcessTaskQueue();
    }

    public int Count
    {
        get{lock (_syncObj){return _tasks.Count;}}
    }

    private void ProcessTaskQueue()
    {
        lock (_syncObj)
        {
            if (_runningTaskCount != 0) return;

            while (_tasks.Count > 0 && _tasks.Peek().IsParallel)
            {
                QTask parallelTask = _tasks.Dequeue();

                QueueUserWorkItem(parallelTask);
            }

            if (_tasks.Count > 0 && _runningTaskCount == 0)
            {
                QTask serialTask = _tasks.Dequeue();

                QueueUserWorkItem(serialTask);
            }
        }
    }

    private void QueueUserWorkItem(QTask qTask)
    {
        Action completionTask = () =>
        {
            qTask.Task();

            OnTaskCompleted();
        };

        _runningTaskCount++;

        ThreadPool.QueueUserWorkItem(_ => completionTask());
    }

    private void OnTaskCompleted()
    {
        lock (_syncObj)
        {
            if (--_runningTaskCount == 0)
            {
                ProcessTaskQueue();
            }
        }
    }

    private class QTask
    {
        public Action Task { get; set; }
        public bool IsParallel { get; set; }
    }
}

更新

为了处理串行和并行任务混合的任务组,GroupedTaskQueue可以管理每个组的TaskQueue。同样地,您不需要预先知道组的信息,所有这些都是在接收到任务时动态管理的。

internal class GroupedTaskQueue
{
    private readonly object _syncObj = new object();
    private readonly Dictionary<string, TaskQueue> _queues = new Dictionary<string, TaskQueue>();
    private readonly string _defaultGroup = Guid.NewGuid().ToString();

    public void Queue(bool isParallel, Action task)
    {
        Queue(_defaultGroup, isParallel, task);
    }

    public void Queue(string group, bool isParallel, Action task)
    {
        TaskQueue queue;

        lock (_syncObj)
        {
            if (!_queues.TryGetValue(group, out queue))
            {
                queue = new TaskQueue();

                _queues.Add(group, queue);
            }
        }

        Action completionTask = () =>
        {
            task();

            OnTaskCompleted(group, queue);
        };

        queue.Queue(isParallel, completionTask);
    }

    private void OnTaskCompleted(string group, TaskQueue queue)
    {
        lock (_syncObj)
        {
            if (queue.Count == 0)
            {
                _queues.Remove(group);
            }
        }
    }
}

13

线程池适用于不需要考虑任务相对顺序,只要它们都完成就可以了的情况。特别是,它们可以在并行情况下全部完成。

如果您的任务必须按照特定的顺序完成,则它们不适合并行处理,因此线程池不适合。

如果您想将这些串行任务从主线程中移开,则单个后台线程与任务队列适合执行这些任务。您仍然可以使用线程池来处理那些适合进行并行处理的其余任务。

是的,这意味着您必须根据任务是有序任务还是“可能并行化”的任务决定将任务提交到何处,但这并不是什么大问题。

如果您有必须串行化但可以与其他任务并行运行的任务组,则有多种选择:

  1. 为每个组创建一个单一任务,该任务按顺序执行相关的组任务,并将此任务发布到线程池。
  2. 让组中的每个任务明确等待前一个任务完成才能继续执行,并将它们发布到线程池。这要求您的线程池能够处理等待尚未计划的任务而不会发生死锁的情况。
  3. 为每个组分配专门的线程,并将组任务发布到相应的消息队列。

问题在于我事先不知道客户端将推送多少组任务。我也不知道这些组的大小和“普通”任务的数量。因此,我不能为每个组启动新线程。 - nc3b
仅仅因为你不知道有多少组需要处理,并不意味着你不能为每个组开启一个线程 --- 当你有新的组时,你可以随时启动一个。如果你的线程池能够处理,我会选择我的第二个选项,因为它让线程池来处理线程数量。 - Anthony Williams
@AnthonyWilliams - 另一个例子可能是这样的,有多个传入事件,您需要独立检索数据并生成消息,但仍然需要按事件到达的顺序发布消息。如何解决这个问题? - userx
@100pipers 这就是future的作用。你将任务传递给线程池,并将Future<Result>交给另一个线程按顺序进行评估。 - Ralph Tandetzky

9
基本上,有许多任务处于待办状态。其中一些任务只能在一个或多个其他待处理任务执行完毕后才能执行。
待处理任务可以建模为依赖图:
- “任务1->任务2”表示“只有在任务1完成后才能执行任务2”。箭头指向执行顺序的方向。 - 任务的入度(指向它的任务数)确定任务是否准备好执行。如果入度为0,则可以执行它。 - 有时候,一个任务必须等待多个任务完成,此时入度大于1。 - 如果一个任务不再需要等待其他任务完成(其入度为零),它可以提交到具有工作线程的线程池,或者提交到等待被工作线程接收的任务队列中。你知道提交的任务不会引起死锁,因为该任务不会等待任何东西。作为优化,可以使用优先级队列,例如,依赖图上有更多任务依赖的任务将首先执行。这也不会引发死锁,因为线程池中的所有任务都可以执行。但是,这可能会导致饥饿现象。 - 如果一个任务完成了执行,它可以从依赖图中移除,可能会降低其他任务的入度,这些任务反过来可以提交到工作线程池中。 因此,至少有一个线程用于添加/删除待处理任务,并且有一个工作线程的线程池。
当将任务添加到依赖图中时,必须检查:
- 任务在依赖图中的连接方式:它必须等待哪些任务完成,哪些任务必须等待它完成?从新任务绘制连接。 - 连接绘制完成后:新连接是否导致依赖图中出现任何循环?如果是,则存在死锁情况。
性能: - 如果并行执行实际上很少可能,这种模式比顺序执行要慢,因为你需要额外的管理来几乎按顺序执行所有操作。 - 如果在实践中可以同时执行许多任务,则此模式很快。
假设: - 如你所读到的,必须设计任务,使它们不会干扰其他任务。此外,必须有一种方法确定任务的优先级。任务优先级应包括每个任务处理的数据。两个任务不能同时更改同一对象;其中一个任务应该优先于另一个任务,或者对对象执行的操作必须是线程安全的。

7
要使用线程池来实现你想要的功能,可能需要创建一种调度程序。
类似这样的东西:
任务队列 -> 调度程序 -> 队列 -> 线程池
调度程序在其自己的线程中运行,跟踪作业之间的依赖关系。当一个作业准备好时,调度程序只需将其推入线程池的队列中。
线程池可能需要向调度程序发送信号,以指示作业已完成,因此调度程序可以将依赖于该作业的作业放入队列中。
在您的情况下,依赖关系可能存储在链表中。
假设您具有以下依赖关系: 3 -> 4 -> 6 -> 8
作业3正在线程池上运行,您仍然不知道作业8的存在。
作业3结束。您从链表中删除3,将作业4放入线程池的队列中。
作业8到达。您将其放在链表的末尾。
必须完全同步的结构仅是调度程序之前和之后的队列。

5
如果我理解问题正确,jdk执行器没有这个功能,但很容易自己实现。您需要:
- 一组工作线程,每个线程都有一个专用队列。 - 对这些队列的一些抽象来提供工作(例如ExecutorService)。 - 一些确定性算法为每个工作选择特定的队列。 - 然后将每个工作项提供给正确的队列,从而按正确的顺序处理。
与jdk执行器的区别在于它们具有1个带n个线程的队列,但您需要n个队列和m个线程(其中n可能等于m,也可能不等于m)。
* 在阅读每个任务都有一个键之后进行编辑 *
更详细地说:
- 编写一些代码,将关键字转换为给定范围内的索引(int)(0-n,其中n是您想要的线程数),这可以简单地是key.hashCode()%n ,也可以是某些已知关键值到线程的静态映射或任何您想要的内容。 - 在启动时: - 创建n个队列,将它们放入索引结构中(数组、列表等)。 - 启动n个线程,每个线程只需从队列中进行阻塞式接收。 - 当它接收到一些工作时,它知道如何执行特定于该任务/事件的工作(如果您有异构事件,则显然可以将某些任务映射到操作)。 - 将其存储在一些接受工作项的外观后面。 - 当任务到达时,将其交给外观: - 外观基于关键字找到任务的正确队列,将其提供给该队列。
向此方案添加自动重新启动工作线程很容易,您只需要让工作线程向某个管理器注册以声明“我拥有此队列”,然后对此进行一些管理,并检测线程中的错误(这意味着它取消注册该队列的所有权,将队列返回到空闲队列池,这是启动新线程的触发器)。

我喜欢你的想法。然而,情况会变得棘手,因为客户端可能会同时发送“简单任务”和“带有键的任务”。 - nc3b
什么是“简单任务”?如何确定简单任务应该在哪个线程上运行?它们是否需要与键控任务一起串行处理?例如,您可以通过AtomicLong.getAndIncrement() % poolSize 轻松地将它们轮流分配到线程上。 - Matt
简单任务是没有关键性的任务; 它们不依赖于任何人(也没有其他任务依赖于它们)。我知道如何安排这些任务;将它们与有关键的任务混合在一起才是问题。 - nc3b
只需为简单任务制定任意键,无论它是否毫无意义,只要任务能够到达您想要的线程即可。 - Matt
那就看你的了;一个天真的轮询(例如,全局计数器 mod 池大小)是最简单的,稍微复杂一些的是考虑到 key'ed 任务存在的轮询(例如,将作业分配给当前没有忙于带有关键字任务的线程),另一个选项是保持特定线程热来帮助更快地处理作业。它非常灵活,实际上没有必要以任何方式明确地将任务链接在一起(让队列来做)。 - Matt

4
我认为线程池可以在这种情况下有效地使用。思路是为每组相关任务使用单独的“strand”对象。您可以使用或不使用“strand”对象将任务添加到队列中。您可以在相关任务中使用相同的“strand”对象。您的调度程序会检查下一个任务是否有“strand”,以及该“strand”是否已锁定。如果没有,锁定此“strand”并运行此任务。如果“strand”已经被锁定,则将此任务保留在队列中,直到下一个调度事件。任务完成后解锁其“strand”。
结果需要一个单一的队列,不需要任何额外的线程,也不需要复杂的组等。 “strand”对象可以非常简单,只需两个方法“lock”和“unlock”。
我经常遇到相同的设计问题,例如用于处理多个同时会话的异步网络服务器。当会话内部任务依赖时(这将会话内部任务映射到组内的依赖任务),会话是独立的(这将它们映射到独立任务和相关任务组)。使用上述方法,我完全避免了会话内部的显式同步。每个会话都有自己的“strand”对象。
而且更重要的是,我使用现有的(很棒的)实现这个想法的 Boost Asio库(C ++)。我只是使用了他们的术语strand。实现很优雅:在调度异步任务之前,我将它们包装到相应的strand对象中。

3

选项1 - 复杂的方式

由于您有连续的作业,您可以将这些作业收集到一个链中,并在它们完成后让作业自己重新提交到线程池。假设我们有一个作业列表:

 [Task1, ..., Task6]

就像你的例子一样。我们有一个顺序依赖关系,使得[Task3,Task4,Task6]是一个依赖链。现在我们创建一个作业(Erlang 伪代码):

 Task4Job = fun() ->
               Task4(), % Exec the Task4 job
               push_job(Task6Job)
            end.
 Task3Job = fun() ->
               Task3(), % Execute the Task3 Job
               push_job(Task4Job)
            end.
 push_job(Task3Job).

也就是说,我们通过将Task3作业包装成一个工作作业,并将下一个作业推送到线程池中作为后续执行。这里有强烈的类似之处,就像在诸如Node.js或Python的Twisted框架中看到的一般的延续传递风格

一般来说,您可以创建一个系统,其中您可以定义作业链,这些作业链可以defer进一步的工作并重新提交进一步的工作。

选项2-简单的方法

为什么我们要分开这些工作?我的意思是,由于它们是顺序相关的,将它们全部在同一个线程上执行不会比将该链扩展到多个线程上更快或更慢。假设“足够”的工作负载,任何线程都将始终有工作要做,因此只需将作业捆绑在一起可能是最简单的:

  Task = fun() ->
            Task3(),
            Task4(), 
            Task6()  % Just build a new job, executing them in the order desired
         end,
  push_job(Task).

如果您的语言支持将函数作为一等公民并且可以任意构建它们,例如函数式编程语言、Python、Ruby-blocks 等,那么像这样的事情就相当容易实现。

虽然我并不喜欢像“选项1”中那样构建队列或连续堆栈,但我肯定会选择第二个选项。在 Erlang 中,我们甚至有一个名为 jobs 的程序,由 Erlang Solutions 编写并作为开源发布。 jobs 用于执行和调节类似这样的任务执行。如果我要解决这个问题,我可能会将选项2与 jobs 结合使用。


我需要拆分任务,因为我没有提前得到它们,也不能等待它们全部到达。 - nc3b
啊,你可能想把这个加到问题里。只有在你逐步学习新任务时,它才非常重要。 - I GIVE CRAP ANSWERS

3
建议不要使用线程池,这就像硬编码任务依赖和执行顺序的知识。相反,我会创建一个 CompositeTask 来管理两个任务之间的起始/结束依赖关系。通过在任务接口后面封装依赖关系,可以统一处理所有任务并将其添加到线程池中。这样隐藏了执行细节,使得任务依赖关系可以改变而不会影响是否使用线程池。
问题没有指定语言 - 我将使用 Java,希望大多数人都能读懂。
class CompositeTask implements Task
{
    Task firstTask;
    Task secondTask;

    public void run() {
         firstTask.run();
         secondTask.run();
    }
}

这会按顺序并在同一个线程上执行任务。你可以将许多 CompositeTask 链接在一起,创建所需数量的顺序任务序列。

这里的缺点是,在所有任务按顺序执行期间,会占用该线程。您可能有其他任务希望在第一个和第二个任务之间执行。因此,不要直接执行第二个任务,而是让复合任务安排第二个任务的执行:

class CompositeTask implements Runnable
{
    Task firstTask;
    Task secondTask;
    ExecutorService executor;

    public void run() {
         firstTask.run();
         executor.submit(secondTask);
    }
}

这样可以确保第二个任务在第一个任务完成后才运行,并且还允许线程池执行其他(可能更紧急的)任务。请注意,第一个和第二个任务可能在不同的线程上执行,因此尽管它们不会并发执行,但由任务使用的任何共享数据必须对其他线程可见(例如通过将变量设置为volatile)。这是一种简单而强大、灵活的方法,使任务本身能够定义执行约束,而不是使用不同的线程池来实现。

3

使用两个 Active Objects。简而言之,活动对象模式由优先队列和一个或多个工作线程组成,这些工作线程可以从队列中获取任务并处理它们。

因此,可以使用一个带有一个工作线程的活动对象:所有放置到队列中的任务将按顺序进行处理。使用第二个带有多个工作线程的活动对象。在这种情况下,工作线程将以任何顺序从队列中获取和处理任务。

祝好运。


2
这是可以实现的,根据我理解您的情况。基本上,您需要做一些聪明的事情来协调主线程中的任务。Java API 中您需要使用的是 ExecutorCompletionServiceCallable
首先,实现您的可调用任务:
public interface MyAsyncTask extends Callable<MyAsyncTask> {
  // tells if I am a normal or dependent task
  private boolean isDependent;

  public MyAsyncTask call() {
    // do your job here.
    return this;
  }
}

然后在您的主线程中,使用CompletionService协调依赖任务的执行(即等待机制):

ExecutorCompletionService<MyAsyncTask> completionExecutor = new 
  ExecutorCompletionService<MyAsyncTask>(Executors.newFixedThreadPool(5));
Future<MyAsyncTask> dependentFutureTask = null;
for (MyAsyncTask task : tasks) {
  if (task.isNormal()) {
    // if it is a normal task, submit it immediately.
    completionExecutor.submit(task);
  } else {
    if (dependentFutureTask == null) {
      // submit the first dependent task, get a reference 
      // of this dependent task for later use.
      dependentFutureTask = completionExecutor.submit(task);
    } else {
      // wait for last one completed, before submit a new one.
      dependentFutureTask.get();
      dependentFutureTask = completionExecutor.submit(task);
    }
  }
}

通过这样做,您可以使用单个执行器(线程池大小为5)同时执行普通任务和依赖任务,普通任务在提交后立即执行,依赖任务一个接一个地执行(在主线程中通过调用Future的get()方法等待,在提交新的依赖任务之前),所以在任何时候,您总是有一些普通任务和一个单独的依赖任务(如果存在)在单个线程池中运行。
这只是一个开头,通过使用ExecutorCompletionService、FutureTask和Semaphore,您可以实现更复杂的线程协调场景。

+1 如果提到了ExecutorCompletionService。我会为每种任务类型添加队列,而不是使用isDependent布尔值,你的Callable可以引用任务的队列。CompletionService可以从该任务的队列中获取,并且如果有任何等待的任务,则将其提交到线程池中。 - Stephen Denne

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接