在Stream.parallel()中,是否可以设置线程的优先级?

12

如果我想在后台任务中并行运行一个流,是否可以以较低的优先级运行它? 如果可以,如何做到?

3个回答

10

是的,这是可能的。

步骤如下:

  1. 创建一个ForkJoinWorkerThreadFactory,使用适当的优先级创建线程。

  2. 使用上述线程工厂创建一个ForkJoinPool

  3. 实例化并行流。

  4. 通过将其提交到ForkJoinPool来运行流。

类似于这样:

public class MyThread extends ForkJoinWorkerThread {
    public MyThread(ForkJoinPool pool, int priority) {
        super(pool);
        setPriority(priority);
    }
}

final int poolSize = ...
final int priority = ...

List<Long> aList = LongStream.rangeClosed(firstNum, lastNum).boxed()
  .collect(Collectors.toList());

ForkJoinWorkerThreadFactory factory = new ForkJoinWorkerThreadFactory() {
    public ForkJoinWorkerThread newThread(ForkJoinPool pool) {
         return new MyThread(pool, priority);
    }
};
/*
ForkJoinWorkerThreadFactory factory = pool -> new MyThread(
  pool,
  priority
);
*/

ForkJoinPool customThreadPool = new ForkJoinPool(
    poolSize, factory, null, false);
long actualTotal = customThreadPool.submit(
    () -> aList.parallelStream().reduce(0L, Long::sum)).get();

(示例代码改编自http://www.baeldung.com/java-8-parallel-streams-custom-threadpool

我可以翻译这段文本。

1
这应该可以:ForkJoinWorkerThreadFactory factory = pool -> new ForkJoinWorkerThread(pool) {{ setPriority(priority); }}; - shmosel
1
用了稍微不同的方法...但是它可以编译和运行。唯一需要注意的是submit会抛出InterruptedException,需要进行处理。 - Stephen C
3
第二个需要注意的问题是(除非你使用的是Java 10),创建的Fork/Join任务数量将与公共池并行度成比例增长。 - Stefan Zobel
1
ForkJoinPool.ForkJoinWorkerThreadFactory是一个(结构上的)函数式接口,因此工厂可以定义为 pool -> new MyThread(pool, priority) - srborlongan
2
应该注意这种行为是一个未记录的实现细节,就像整个Fork/Join池的使用一样。不能保证这在其他实现中也能正常工作。 - Holger
显示剩余2条评论

1

我认为更好的方法是像这里描述的那样:

public class CustomForkJoinWorkerThreadFactory implements ForkJoinWorkerThreadFactory {

    private final int threadPriority;

    public CustomForkJoinWorkerThreadFactory(int threadPriority) {
        this.threadPriority = threadPriority;
    }

    @Override           
    public ForkJoinWorkerThread newThread(ForkJoinPool pool)
    {
        final ForkJoinWorkerThread worker = ForkJoinPool.defaultForkJoinWorkerThreadFactory.newThread(pool);
        worker.setPriority(threadPriority);
        return worker;
    }
}

它允许您仍然使用“默认”ForkJoinWorkerThread,但您可以设置优先级/名称等。使用方法如下:
new ForkJoinPool(poolSize, new CustomForkJoinWorkerThreadFactory(Thread.MIN_PRIORITY), null, false);

0

前一个答案的内联版本

new ForkJoinPool(parallelism, (pool) -> {
    ForkJoinWorkerThread thread = ForkJoinPool.defaultForkJoinWorkerThreadFactory.newThread(pool);
    thread.setPriority(NORM_PRIORITY + 1);
    return thread;
}, null, false);

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接