天真的解决方案是使用 ThreadPoolExecutor 和同步(或锁定)。然而,线程会相互阻塞,吞吐量不会最大化。
有更好的想法吗?或者是否存在第三方库满足要求?
一个简单的方法是将所有分组任务“连接”成一个超级任务,从而使子任务串行运行。但这可能会导致其他组的延迟,除非某个组完全完成并在线程池中腾出一些空间,否则它们不会开始。
作为替代方案,请考虑链接组的任务。以下代码说明了这一点:
public class MultiSerialExecutor {
private final ExecutorService executor;
public MultiSerialExecutor(int maxNumThreads) {
executor = Executors.newFixedThreadPool(maxNumThreads);
}
public void addTaskSequence(List<Runnable> tasks) {
executor.execute(new TaskChain(tasks));
}
private void shutdown() {
executor.shutdown();
}
private class TaskChain implements Runnable {
private List<Runnable> seq;
private int ind;
public TaskChain(List<Runnable> seq) {
this.seq = seq;
}
@Override
public void run() {
seq.get(ind++).run(); //NOTE: No special error handling
if (ind < seq.size())
executor.execute(this);
}
}
我建议使用任务队列:
快速搜索结果表明Java API本身没有任务/线程队列。但是,如果您知道一些好的教程/实现,则可以自由列出它们。
我基本上同意Dave的答案,但是如果你需要将CPU时间分配给所有“组”,即所有任务组应该并行进行,你可能会发现这种构造很有用(使用删除作为“锁定”)。虽然我想象它倾向于使用更多的内存,但在我的情况下,这个方法运行得很好:
class TaskAllocator {
private final ConcurrentLinkedQueue<Queue<Runnable>> entireWork
= childQueuePerTaskGroup();
public Queue<Runnable> lockTaskGroup(){
return entireWork.poll();
}
public void release(Queue<Runnable> taskGroup){
entireWork.offer(taskGroup);
}
}
和
class DoWork implmements Runnable {
private final TaskAllocator allocator;
public DoWork(TaskAllocator allocator){
this.allocator = allocator;
}
pubic void run(){
for(;;){
Queue<Runnable> taskGroup = allocator.lockTaskGroup();
if(task==null){
//No more work
return;
}
Runnable work = taskGroup.poll();
if(work == null){
//This group is done
continue;
}
//Do work, but never forget to release the group to
// the allocator.
try {
work.run();
} finally {
allocator.release(taskGroup);
}
}//for
}
}
然后,您可以使用最佳线程数来运行DoWork
任务。这有点像循环负载平衡..
您甚至可以做得更复杂,通过在TaskAllocator
中使用此方法而不是简单的队列(具有更多任务剩余的任务组往往会被执行)
ConcurrentSkipListSet<MyQueue<Runnable>> sophisticatedQueue =
new ConcurrentSkipListSet(new SophisticatedComparator());
其中SophisticatedComparator
是
class SophisticatedComparator implements Comparator<MyQueue<Runnable>> {
public int compare(MyQueue<Runnable> o1, MyQueue<Runnable> o2){
int diff = o2.size() - o1.size();
if(diff==0){
//This is crucial. You must assign unique ids to your
//Subqueue and break the equality if they happen to have same size.
//Otherwise your queues will disappear...
return o1.id - o2.id;
}
return diff;
}
}
BlockingQueue
可能会有一定的限制,但现在我理解了你的观点。 - Enno ShiojiActor也是解决这种特定类型问题的另一种方案。 Scala有actors,Java也提供了AKKA。
我曾经遇到过类似的问题,我使用了一个与Executor
一起工作的ExecutorCompletionService
来完成任务集合。以下是自Java7以来java.util.concurrent API的摘录:
假设您有一组解决某个问题的求解器,每个求解器返回某种类型Result的值,并且希望并发运行它们,在每个返回非空值的求解器中处理其结果,使用某些方法use(Result r)。您可以编写如下代码:
void solve(Executor e, Collection<Callable<Result>> solvers)
throws InterruptedException, ExecutionException {
CompletionService<Result> ecs = new ExecutorCompletionService<Result>(e);
for (Callable<Result> s : solvers)
ecs.submit(s);
int n = solvers.size();
for (int i = 0; i < n; ++i) {
Result r = ecs.take().get();
if (r != null)
use(r);
}
}
所以,在您的情况下,每个任务将是一个单独的Callable<Result>
,并且任务将分组在一个Collection<Callable<Result>>
中。
Reference: http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ExecutorCompletionService.html