Java ForkJoinPool - 任务在队列中的顺序

11

我希望了解Java fork-join池中任务处理的顺序。

到目前为止,在文档中我找到的唯一相关信息是一个名为"asyncMode"的参数,如果此池使用先进先出调度模式来处理未联接的分叉任务,则该参数为"true"。

我的理解是每个worker都有自己的任务队列;worker从它们自己队列的前面取出任务,或者从其他worker的队列的后面窃取任务,如果它们自己的队列为空;如果asyncMode为true (resp. false),则worker将新分叉的任务添加到它们自己队列的后面(resp.前面)。

如果我的理解有误,请纠正!

现在,这引发了几个问题:

1) 联接的分叉任务的排序方式是什么?

我猜想,当一个任务被分叉时,它将如上述我的解释添加到worker的队列中。现在,假设任务已经联接...

  • 如果在调用join时,任务尚未开始,则调用join的worker将立即从队列中取出该任务并开始工作。

  • 如果在调用 join 时任务已经被其他工作线程窃取了,那么调用 join 的工作线程将在此期间处理其他任务(按照上述解释中获取任务的顺序),直到窃取该任务的工作线程完成它。

这个推测是基于编写带有打印语句的简单测试代码,并观察更改 join 调用顺序对任务处理顺序的影响。请问是否有人可以告诉我我的推测是否正确?

2)外部提交的任务的顺序是什么?

根据这个问题的答案,fork-join池不使用外部队列。(我正在使用Java 8。)

那么我是否可以理解为,当一个任务被外部提交时,该任务会被添加到随机选定的工作线程队列中?

如果是这样,那么外部提交的任务是添加到队列的后面还是前面?

最后,这是否取决于是通过调用pool.execute(task)还是通过调用pool.invoke(task)来提交任务?以及这是否取决于调用pool.execute(task)或pool.invoke(task)的线程是外部线程还是此fork-join池内的线程?


1
为什么不去看一下,源代码是开放的。链接 - rustyx
对于你问题的第二部分,你可能可以在“实现概述”中找到一些见解。 - vanOekel
1
你为什么需要知道这个?顺序没有被指定,因此可以随时更改。即使您移动到具有不同处理器数量的机器上,它也可能会更改。如果您需要特定的顺序,请通过自己的方式明确提供它。 - Alexei Kaigorodov
1个回答

3
  1. 你的猜测是正确的,你完全正确。正如你可以在“实现概述”中所读到的。
 * Joining Tasks
 * =============
 *
 * Any of several actions may be taken when one worker is waiting
 * to join a task stolen (or always held) by another.  Because we
 * are multiplexing many tasks on to a pool of workers, we can't
 * just let them block (as in Thread.join).  We also cannot just
 * reassign the joiner's run-time stack with another and replace
 * it later, which would be a form of "continuation", that even if
 * possible is not necessarily a good idea since we may need both
 * an unblocked task and its continuation to progress.  Instead we
 * combine two tactics:
 *
 *   Helping: Arranging for the joiner to execute some task that it
 *      would be running if the steal had not occurred.
 *
 *   Compensating: Unless there are already enough live threads,
 *      method tryCompensate() may create or re-activate a spare
 *      thread to compensate for blocked joiners until they unblock.

2. ForkJoinPool.invoke和ForkJoinPool.join在任务提交的方式上完全相同。您可以在代码中看到

    public <T> T invoke(ForkJoinTask<T> task) {
        if (task == null)
            throw new NullPointerException();
        externalPush(task);
        return task.join();
    }
    public void execute(ForkJoinTask<?> task) {
        if (task == null)
            throw new NullPointerException();
        externalPush(task);
    }

在externalPush中,可以看到任务使用ThreadLocalRandom添加到随机选择的工作队列中。此外,它使用push方法进入队列头部。
    final void externalPush(ForkJoinTask<?> task) {
        WorkQueue[] ws; WorkQueue q; int m;
        int r = ThreadLocalRandom.getProbe();
        int rs = runState;
        if ((ws = workQueues) != null && (m = (ws.length - 1)) >= 0 &&
            (q = ws[m & r & SQMASK]) != null && r != 0 && rs > 0 &&
            U.compareAndSwapInt(q, QLOCK, 0, 1)) {
            ForkJoinTask<?>[] a; int am, n, s;
            if ((a = q.array) != null &&
                (am = a.length - 1) > (n = (s = q.top) - q.base)) {
                    int j = ((am & s) << ASHIFT) + ABASE;
                U.putOrderedObject(a, j, task);
                U.putOrderedInt(q, QTOP, s + 1);
                U.putIntVolatile(q, QLOCK, 0);
                if (n <= 1)
                    signalWork(ws, q);
                return;
            }
            U.compareAndSwapInt(q, QLOCK, 1, 0);
        }
        externalSubmit(task);
    }

我不确定您的意思是什么:
并且这是否取决于调用pool.execute(task)或pool.invoke(task)的线程是外部线程还是该 fork-join 池中的线程?

谢谢您的回复!我很感激您确认我的猜测(1)是正确的。我发现“实现概述”文档非常难以理解。 - Kenny Wong
此外,值得知道的是pool.invoke(task)和pool.execute(task)的工作方式相同,并将新任务发送到随机选择队列的前面。对我来说,这种压缩的源代码是难以理解的,所以我很高兴你替我查看了它!那么,对于我的最后一个问题(调用pool.execute(task)或pool.invoke(task)的线程类型是否重要),答案必须是“否”,因为pool.execute(task)或pool.invoke(task)总是将新任务放在随机选择队列的前面。 - Kenny Wong
当然!祝你好运。 - Gal S

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接