Java 8并行流中的自定义线程池

483

在Java 8的并行流中,是否有可能指定自定义的线程池?我找不到相关信息。

假设我有一个服务器应用程序,并且我想使用并行流。但是应用程序很大而且多线程,因此我想将其分隔开来。我不希望一个模块中的缓慢运行任务阻塞另一个模块的任务。

如果我不能为不同的模块使用不同的线程池,那么在大多数真实世界的情况下,我就不能安全地使用并行流。

请尝试以下示例。在单独的线程中执行一些CPU密集型任务。这些任务利用并行流。第一个任务出现错误,所以每个步骤都需要1秒钟(通过线程休眠模拟)。问题是其他线程被卡住并等待错误任务完成。这是一个人为的示例,但是可以想象一个servlet应用程序,其中有人向共享的fork join池提交了一个长时间运行的任务。

public class ParallelTest {
    public static void main(String[] args) throws InterruptedException {
        ExecutorService es = Executors.newCachedThreadPool();

        es.execute(() -> runTask(1000)); //incorrect task
        es.execute(() -> runTask(0));
        es.execute(() -> runTask(0));
        es.execute(() -> runTask(0));
        es.execute(() -> runTask(0));
        es.execute(() -> runTask(0));


        es.shutdown();
        es.awaitTermination(60, TimeUnit.SECONDS);
    }

    private static void runTask(int delay) {
        range(1, 1_000_000).parallel().filter(ParallelTest::isPrime).peek(i -> Utils.sleep(delay)).max()
                .ifPresent(max -> System.out.println(Thread.currentThread() + " " + max));
    }

    public static boolean isPrime(long n) {
        return n > 1 && rangeClosed(2, (long) sqrt(n)).noneMatch(divisor -> n % divisor == 0);
    }
}

3
什么是自定义线程池?虽然有一个通用的ForkJoinPool,但你可以创建自己的ForkJoinPool,并向其提交任务请求。 - edharned
9
提示:Java大师Heinz Kabutz检查了同样的问题,但影响甚至更糟:常规Fork-Join池中的线程死锁。请参阅http://www.javaspecialists.eu/archive/Issue223.html。 - Peti
被接受的答案是错误的。至少在Java 8u352和Java 17中,流使用专用和公共fork-join-pool。您可以使用CompletableFuture来实现所需的行为,就像我在我的答案中(远远)概述的那样。您应该考虑删除绿色勾号。 - Dirk Hillbrecht
16个回答

507

实际上有一种技巧可以在特定的fork-join池中执行并行操作。如果将其作为任务在fork-join池中执行,它将留在那里而不使用公共池。

final int parallelism = 4;
ForkJoinPool forkJoinPool = null;
try {
    forkJoinPool = new ForkJoinPool(parallelism);
    final List<Integer> primes = forkJoinPool.submit(() ->
        // Parallel task here, for example
        IntStream.range(1, 1_000_000).parallel()
                .filter(PrimesPrint::isPrime)
                .boxed().collect(Collectors.toList())
    ).get();
    System.out.println(primes);
} catch (InterruptedException | ExecutionException e) {
    throw new RuntimeException(e);
} finally {
    if (forkJoinPool != null) {
        forkJoinPool.shutdown();
    }
}

这个技巧基于ForkJoinTask.fork方法,它指定:"如果适用,则在当前任务所在的池中异步执行此任务,否则使用ForkJoinPool.commonPool()执行任务。如果不在inForkJoinPool()中,则使用ForkJoinPool.commonPool()"


23
这里详细描述了解决方案:http://blog.krecan.net/2014/03/18/how-to-specify-thread-pool-for-java-8-parallel-streams/。 - Lukas
4
文档中是否指定流使用ForkJoinPool?还是这只是实现细节?提供文档链接会更好。 - Nicolai Parlog
7
谢谢你的代码片段,我会将ForkJoinPool实例在不再需要时进行shutdown()以避免线程泄漏。 (示例) - jck
6
请注意,Java 8中存在一个bug,即使任务在自定义的线程池实例上运行,它们仍然与共享线程池耦合:计算的大小仍与公共线程池成比例,而不是自定义线程池。该问题已在Java 10中修复:JDK-8190974 - Terran
4
这个问题在Java 8中也已经得到修复。https://bugs.openjdk.java.net/browse/JDK-8224620 - Cutberto Ocampo
显示剩余12条评论

247

并行流使用默认的ForkJoinPool.commonPool,该池子默认情况下的线程数比您的处理器少一个, 可以通过Runtime.getRuntime().availableProcessors()返回(这意味着并行流为调用线程留出一个处理器)。

对于需要单独或自定义池的应用程序,可以构造一个具有给定目标并行性级别的ForkJoinPool;默认情况下,等于可用处理器的数量。

这也意味着如果您有嵌套并行流或同时启动多个并行流,它们将全部共享同一个线程池。优点:您永远不会使用超过默认值(可用处理器数量)。缺点:您可能无法将“所有处理器”分配给每个启动的并行流(如果恰好有多个)。 (显然,您可以使用ManagedBlocker来规避这一问题。)

要更改并行流的执行方式,您可以选择:

  • 将并行流执行提交到您自己的ForkJoinPool中:yourFJP.submit(() -> stream.parallel().forEach(soSomething)).get(); 或者
  • 您可以使用系统属性更改常规池的大小:System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "20"),以获得目标并行性为20个线程。

以下是我机器上运行的一个例子,该机器有8个处理器。如果我运行以下程序:

long start = System.currentTimeMillis();
IntStream s = IntStream.range(0, 20);
//System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "20");
s.parallel().forEach(i -> {
    try { Thread.sleep(100); } catch (Exception ignore) {}
    System.out.print((System.currentTimeMillis() - start) + " ");
});

输出结果为:

215 216 216 216 216 216 216 216 315 316 316 316 316 316 316 316 415 416 416 416

可以看到,平行流每次处理8个项,即使用8个线程。然而,如果取消注释的代码行,输出结果为:

215 215 215 215 215 216 216 216 216 216 216 216 216 216 216 216 216 216 216 216

这一次,平行流使用了20个线程,并且流中的所有20个元素都被并发地处理了。

46
commonPool 实际上比 availableProcessors 少一个线程,因为调用线程本身也算一个线程,所以总并行度等于 availableProcessors - Marko Topolnik
3
提交返回 ForkJoinTask。要模拟 parallel(),需要使用 get()stream.parallel().forEach(doSomething)).get(); - Grigory Kislin
8
我并不相信 ForkJoinPool.submit(() -> stream.forEach(...)) 可以使用给定的 ForkJoinPool 来运行我的 Stream 操作。我预期整个 Stream 操作作为一个完整的操作在 ForkJoinPool 中执行,但是在内部仍然使用默认的/公共的 ForkJoinPool。你在哪里看到了 ForkJoinPool.submit() 可以做你所说的事情? - Frederic Leitenberger
2
我现在看到了https://dev59.com/r2Ei5IYBdhLWcg3wjs5j#34930831,很好地展示了它实际上按照公告的方式工作。然而,我仍然不明白它是如何工作的。但是我对“它能工作”感到满意。谢谢! - Frederic Leitenberger
2
我建议撤销Tod Casasent的编辑,因为JDK-8190974中没有任何迹象表明System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", …)不再起作用,而且在JDK 18中,它仍然按预期工作。 - Holger
显示剩余5条评论

53

除了在自己的forkJoinPool内触发并行计算的技巧之外,您还可以像以下示例一样将该池传递给CompletableFuture.supplyAsync方法:

ForkJoinPool forkJoinPool = new ForkJoinPool(2);
CompletableFuture<List<Integer>> primes = CompletableFuture.supplyAsync(() ->
    //parallel task here, for example
    range(1, 1_000_000).parallel().filter(PrimesPrint::isPrime).collect(toList()), 
    forkJoinPool
);

24

原先的解决方案(设置ForkJoinPool常见并行性属性)不再起作用。查看原回答中的链接,一个破坏此方案的更新已被后移至Java 8。正如链接线程中提到的那样,这个方案并不能保证永远有效。基于此,该解决方案是forkjoinpool.submit与.get解决方案,即接受答案所讨论的方案。我认为后移修复了此方案的不可靠性。

ForkJoinPool fjpool = new ForkJoinPool(10);
System.out.println("stream.parallel");
IntStream range = IntStream.range(0, 20);
fjpool.submit(() -> range.parallel()
        .forEach((int theInt) ->
        {
            try { Thread.sleep(100); } catch (Exception ignore) {}
            System.out.println(Thread.currentThread().getName() + " -- " + theInt);
        })).get();
System.out.println("list.parallelStream");
int [] array = IntStream.range(0, 20).toArray();
List<Integer> list = new ArrayList<>();
for (int theInt: array)
{
    list.add(theInt);
}
fjpool.submit(() -> list.parallelStream()
        .forEach((theInt) ->
        {
            try { Thread.sleep(100); } catch (Exception ignore) {}
            System.out.println(Thread.currentThread().getName() + " -- " + theInt);
        })).get();

1
在调试模式下,当我执行ForkJoinPool.commonPool().getParallelism()时,我没有看到并行性的变化。 - d-coder
1
谢谢。我进行了一些测试/研究并更新了答案。看起来是由于更新导致的,因为在旧版本中它可以工作。 - Tod Casasent
为什么即使在循环中有所有的catch异常,我仍然会收到这个错误信息:“未报告的异常InterruptedException; 必须被捕获或声明抛出”? - Rocky Li
Rocky,我没有看到任何错误。知道Java版本和确切的代码行将有所帮助。“InterruptedException”表明在你的版本中sleep周围的try/catch未正确关闭。 - Tod Casasent
1
当我执行 System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "10"); System.out.println(ForkJoinPool.commonPool().getParallelism()); 时,无论是从 JDK 8 到 JDK 18 的所有版本,它都会始终打印出 10。我不知道为什么你声称这个常见并行性属性不起作用;你在另一个答案中添加的链接甚至没有提到这个属性,而且它的补丁根本没有涉及到这个功能。 - Holger

18

我们可以使用以下属性更改默认并行度:

-Djava.util.concurrent.ForkJoinPool.common.parallelism=16

可以设置为使用更多的并行性。


虽然它是一个全局设置,但它可以增加parallelStream的并行性。 - meadlai
同上文所述,这在我的openjdk“11.0.6”上无法工作。 - abbas
1
@abbas 在我尝试过的所有版本中都能正常工作,从 Java 8 到 Java 18。 - Holger

10

要测量实际使用的线程数,您可以检查Thread.activeCount()

    Runnable r = () -> IntStream
            .range(-42, +42)
            .parallel()
            .map(i -> Thread.activeCount())
            .max()
            .ifPresent(System.out::println);

    ForkJoinPool.commonPool().submit(r).join();
    new ForkJoinPool(42).submit(r).join();

在一台4核CPU上,此操作可产生以下输出:

5 // common pool
23 // custom pool

没有.parallel(),它会得到:

3 // common pool
4 // custom pool

10
Thread.activeCount() 无法告诉你哪些线程正在处理你的流,可以将其转换为Thread.currentThread().getName(),然后再加上distinct()。这时你会发现并不是池中的每个线程都被使用了...如果在处理中添加延迟,就能够利用池中的所有线程。 - keyoxy

9

到目前为止,我使用了这个问题答案中描述的解决方案。现在,我想出了一个名为Parallel Stream Support的小型库来解决这个问题:

ForkJoinPool pool = new ForkJoinPool(NR_OF_THREADS);
ParallelIntStreamSupport.range(1, 1_000_000, pool)
    .filter(PrimesPrint::isPrime)
    .collect(toList())

但正如 @PabloMatiasGomez 在评论中指出的那样,使用并行流的拆分机制存在缺点,它严重依赖于公共池的大小。请参见Parallel stream from a HashSet doesn't run in parallel

我仅使用此解决方案来为不同类型的工作创建单独的线程池,但即使我不使用公共池,我也无法将其大小设置为1。


6

5

如果您不想依赖实现技巧,总有一种方法可以通过实现自定义收集器来结合mapcollect语义来实现相同的功能...这样您就不会受到ForkJoinPool的限制:

list.stream()
  .collect(parallel(i -> process(i), executor, 4))
  .join()

幸运的是,这已经在此处完成并可在Maven Central上获得:http://github.com/pivovarit/parallel-collectors 免责声明:我写了这个并承担责任。

1

访问abacus-common获取相关IT技术内容。可通过指定线程数量来使用并行流。以下是示例代码:

LongStream.range(4, 1_000_000).parallel(threadNum)...

声明:我是abacus-common的开发者。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接