我应该使用Java中的哪个线程池?

8
有大量的任务需要完成,每个任务属于一个单独的组。要求是每个任务组应该像在单个线程中执行一样按顺序执行,并且在多核(或多 CPU)环境中应该最大化吞吐量。注意:任务组的数量也与任务数成比例。
天真的解决方案是使用 ThreadPoolExecutor 和同步(或锁定)。然而,线程会相互阻塞,吞吐量不会最大化。
有更好的想法吗?或者是否存在第三方库满足要求?

2
然而,线程会相互阻塞,从而降低吞吐量。你的意思是说个别任务正在访问共享的数据结构或资源,这是争用的原因吗? - Adamski
你是否提前知道一个组的所有任务?在选择解决方案(队列 vs 无队列)时,这一点非常重要。 - Eyal Schneider
5个回答

3

一个简单的方法是将所有分组任务“连接”成一个超级任务,从而使子任务串行运行。但这可能会导致其他组的延迟,除非某个组完全完成并在线程池中腾出一些空间,否则它们不会开始。

作为替代方案,请考虑链接组的任务。以下代码说明了这一点:

public class MultiSerialExecutor {
    private final ExecutorService executor;

    public MultiSerialExecutor(int maxNumThreads) {
        executor = Executors.newFixedThreadPool(maxNumThreads);
    }

    public void addTaskSequence(List<Runnable> tasks) {
        executor.execute(new TaskChain(tasks));
    }

    private void shutdown() {
        executor.shutdown();
    }

    private class TaskChain implements Runnable {
        private List<Runnable> seq;
        private int ind;

        public TaskChain(List<Runnable> seq) {
            this.seq = seq;
        }

        @Override
        public void run() {
            seq.get(ind++).run(); //NOTE: No special error handling
            if (ind < seq.size())
                executor.execute(this);
        }       
    }

优点是不需要使用额外的资源(线程/队列),任务的粒度比朴素方法更好。缺点是必须提前知道所有组的任务。
--编辑--
为了使这个解决方案更加通用完整,您可能需要决定错误处理(即是否在出现错误时继续运行链)。另外,实现ExecutorService并将所有调用委托给底层执行者是一个好主意。

也许我们还应该添加一个地图,以便找到特定任务的TaskChain,并将其添加到其TaskChain中。 - James
@James:你说得对。通过一些简单的同步,这些链条将能够即时接收新任务(实际上它们将作为调用者的队列)。我想我会让解决方案更加通用和有用,并在我的博客中写一篇文章 :) - Eyal Schneider

2

我建议使用任务队列:

  • 对于每组任务,您需要创建一个队列,并将该组中的所有任务插入到该队列中。
  • 现在,所有您的队列都可以并行执行,而队列内的任务则按顺序依次执行。

快速搜索结果表明Java API本身没有任务/线程队列。但是,如果您知道一些好的教程/实现,则可以自由列出它们。


谢谢 Dave。 如果有大量的组,则线程数将达到限制。 - James
@James 不一定。仅仅因为您有n个组并不意味着您需要创建n个线程来执行它们。只需创建适当数量的线程,它们将以轮询或串行方式处理队列。 - Dave O.

1

我基本上同意Dave的答案,但是如果你需要将CPU时间分配给所有“组”,即所有任务组应该并行进行,你可能会发现这种构造很有用(使用删除作为“锁定”)。虽然我想象它倾向于使用更多的内存,但在我的情况下,这个方法运行得很好:

class TaskAllocator {
    private final ConcurrentLinkedQueue<Queue<Runnable>> entireWork
         = childQueuePerTaskGroup();

    public Queue<Runnable> lockTaskGroup(){
        return entireWork.poll();
    }

    public void release(Queue<Runnable> taskGroup){
        entireWork.offer(taskGroup);
    }
 }

 class DoWork implmements Runnable {
     private final TaskAllocator allocator;

     public DoWork(TaskAllocator allocator){
         this.allocator = allocator;
     }

     pubic void run(){
        for(;;){
            Queue<Runnable> taskGroup = allocator.lockTaskGroup();
            if(task==null){
                //No more work
                return;
            }
            Runnable work = taskGroup.poll();
            if(work == null){
                //This group is done
                continue;
            }

            //Do work, but never forget to release the group to 
            // the allocator.
            try {
                work.run();
            } finally {
                allocator.release(taskGroup);
            }
        }//for
     }
 }

然后,您可以使用最佳线程数来运行DoWork任务。这有点像循环负载平衡..

您甚至可以做得更复杂,通过在TaskAllocator中使用此方法而不是简单的队列(具有更多任务剩余的任务组往往会被执行)

ConcurrentSkipListSet<MyQueue<Runnable>> sophisticatedQueue = 
    new ConcurrentSkipListSet(new SophisticatedComparator());

其中SophisticatedComparator

class SophisticatedComparator implements Comparator<MyQueue<Runnable>> {
    public int compare(MyQueue<Runnable> o1, MyQueue<Runnable> o2){
        int diff = o2.size() - o1.size();
        if(diff==0){
             //This is crucial. You must assign unique ids to your 
             //Subqueue and break the equality if they happen to have same size.
             //Otherwise your queues will disappear...
             return o1.id - o2.id;
        }
        return diff;
    }
 }

1
+1 任务队列允许您使用适合您需求的任何调度算法。 - Dave O.
看起来你正在重新实现线程池。为什么不像我的解决方案一样使用标准的ThreadPoolExecutor加上一些额外的功能呢?我的解决方案不需要队列和同步。 - Eyal Schneider
@Eyal:如果可以按顺序使用任务组,我同意你的观点。不过,如果必须并行使用,这是必要的。 - Enno Shioji
在我的解决方案中,组是并行执行的,每个组都是串行执行的,就像你的解决方案一样。我们解决方案之间的最大区别(如果我理解正确的话)是你的解决方案允许在现有组中随时添加新任务,而我的解决方案更简单,因为它假定每当一个组开始执行时,所有的任务都已预先知道。 - Eyal Schneider
哦,好的,这就是你重新提交的原因吗?聪明 :) 我想TPE只允许使用BlockingQueue可能会有一定的限制,但现在我理解了你的观点。 - Enno Shioji

0

Actor也是解决这种特定类型问题的另一种方案。 Scala有actors,Java也提供了AKKA。


-2

我曾经遇到过类似的问题,我使用了一个与Executor一起工作的ExecutorCompletionService来完成任务集合。以下是自Java7以来java.util.concurrent API的摘录:

假设您有一组解决某个问题的求解器,每个求解器返回某种类型Result的值,并且希望并发运行它们,在每个返回非空值的求解器中处理其结果,使用某些方法use(Result r)。您可以编写如下代码:

void solve(Executor e, Collection<Callable<Result>> solvers)
        throws InterruptedException, ExecutionException {
    CompletionService<Result> ecs = new ExecutorCompletionService<Result>(e);
    for (Callable<Result> s : solvers)
        ecs.submit(s);
    int n = solvers.size();
    for (int i = 0; i < n; ++i) {
        Result r = ecs.take().get();
        if (r != null)
            use(r);
    }
}

所以,在您的情况下,每个任务将是一个单独的Callable<Result>,并且任务将分组在一个Collection<Callable<Result>>中。

Reference: http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ExecutorCompletionService.html


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接