When is CompletionService finished delivering results?

7

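I want to use a CompletionService to process the results of a series of threads as they complete. I have the service in a loop that takes the Future objects it provides as they become available, but I don't know the best way to determine when all the threads have completed (and thus when to exit the loop):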

import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;

public class Bar {

    final static int MAX_THREADS = 4;
    final static int TOTAL_THREADS = 20;

    public static void main(String[] args) throws Exception{

        final ThreadPoolExecutor threadPool = (ThreadPoolExecutor) Executors.newFixedThreadPool(MAX_THREADS);
        final CompletionService<Integer> service = new ExecutorCompletionService<Integer>(threadPool);

        for (int i=0; i<TOTAL_THREADS; i++){
            service.submit(new MyCallable(i));
        }

        int finished = 0;
        Future<Integer> future = null;
        do{
            future = service.take();
            int result = future.get();
            System.out.println("  took: " + result);
            finished++;             

        }while(finished < TOTAL_THREADS);

        System.out.println("Shutting down");
        threadPool.shutdown();
    }


    public static class MyCallable implements Callable<Integer>{

        final int id;

        public MyCallable(int id){
            this.id = id;
            System.out.println("Submitting: " + id);
        }

        @Override
        public Integer call() throws Exception {
            Thread.sleep(1000);
            System.out.println("finished: " + id);
            return id;
        }
    }
}

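I tried checking the state of the ThreadPoolExecutor, but I know the getCompletedTaskCount and getTaskCount methods are only approximations and shouldn't be relied upon. Is there a better way to make sure I've retrieved all the Futures from the CompletionService than counting them myself?


Edit: Both the link that nobeh provided and this link suggest counting the number of tasks submitted and then calling take() that many times. I'm just surprised there isn't a way to ask the CompletionService or its Executor what is left to be returned.
4 Answers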

7
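See http://www.javaspecialists.eu/archive/Issue214.html for how to extend ExecutorCompletionService to do what you need. The relevant part of that code is reproduced below for reference. The author also suggests making the service implement Iterable, which I think is a good idea.

By the way, I agree with you that this really should be part of the standard implementation, but unfortunately it is not.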

import java.util.concurrent.*;
import java.util.concurrent.atomic.*;

public class CountingCompletionService<V> extends ExecutorCompletionService<V> {
  private final AtomicLong submittedTasks = new AtomicLong();
  private final AtomicLong completedTasks = new AtomicLong();

  public CountingCompletionService(Executor executor) {
    super(executor);
  }

  public CountingCompletionService(
      Executor executor, BlockingQueue<Future<V>> queue) {
    super(executor, queue);
  }

  public Future<V> submit(Callable<V> task) {
    Future<V> future = super.submit(task);
    submittedTasks.incrementAndGet();
    return future;
  }

  public Future<V> submit(Runnable task, V result) {
    Future<V> future = super.submit(task, result);
    submittedTasks.incrementAndGet();
    return future;
  }

  public Future<V> take() throws InterruptedException {
    Future<V> future = super.take();
    completedTasks.incrementAndGet();
    return future;
  }

  public Future<V> poll() {
    Future<V> future = super.poll();
    if (future != null) completedTasks.incrementAndGet();
    return future;
  }

  public Future<V> poll(long timeout, TimeUnit unit)
      throws InterruptedException {
    Future<V> future = super.poll(timeout, unit);
    if (future != null) completedTasks.incrementAndGet();
    return future;
  }

  public long getNumberOfCompletedTasks() {
    return completedTasks.get();
  }

  public long getNumberOfSubmittedTasks() {
    return submittedTasks.get();
  }

  public boolean hasUncompletedTasks() {
    return completedTasks.get() < submittedTasks.get();
  }
}
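
As a rough illustration of how a counting service like the one above might be used, the sketch below drains results with hasUncompletedTasks() instead of a hand-written counter. It nests a trimmed copy of the class (only submit(Callable) and take() are counted) so that it compiles on its own. The names CountingDemo and runAll are hypothetical, and the sketch assumes all tasks are submitted before draining starts.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicLong;

class CountingDemo {

    // Trimmed copy of the class above: only submit(Callable) and take() are counted.
    static class CountingCompletionService<V> extends ExecutorCompletionService<V> {
        private final AtomicLong submittedTasks = new AtomicLong();
        private final AtomicLong completedTasks = new AtomicLong();

        CountingCompletionService(Executor executor) {
            super(executor);
        }

        @Override
        public Future<V> submit(Callable<V> task) {
            Future<V> future = super.submit(task);
            submittedTasks.incrementAndGet();
            return future;
        }

        @Override
        public Future<V> take() throws InterruptedException {
            Future<V> future = super.take();
            completedTasks.incrementAndGet();
            return future;
        }

        boolean hasUncompletedTasks() {
            return completedTasks.get() < submittedTasks.get();
        }
    }

    // Hypothetical helper: submits taskCount trivial tasks, then drains
    // until every submitted task has been taken.
    static List<Integer> runAll(int taskCount) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            CountingCompletionService<Integer> service = new CountingCompletionService<>(pool);
            for (int i = 0; i < taskCount; i++) {
                final int id = i;
                service.submit(() -> id);
            }
            List<Integer> results = new ArrayList<>();
            while (service.hasUncompletedTasks()) {
                results.add(service.take().get());
            }
            return results;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("Collected " + runAll(20).size() + " results");
    }
}
```

The loop condition is only reliable here because submission finishes before draining begins. If tasks were still being submitted from another thread, hasUncompletedTasks() could briefly return false while more work is on the way.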

4
The code below is inspired by @Mark's answer, but I find it more convenient to use:
package com.example;

import java.util.Iterator;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class CompletionIterator<T> implements Iterator<T>, AutoCloseable {

    private AtomicInteger count = new AtomicInteger(0);
    
    private CompletionService<T> completer;
    
    private ExecutorService executor = Executors.newWorkStealingPool(100);
    
    public CompletionIterator() {
        this.completer = new ExecutorCompletionService<>(executor);
    }

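    public void submit(Callable<T> task) {
        completer.submit(task);
        count.incrementAndGet();
    }

    // Note: hasNext() consumes one unit of the counter, so call it exactly
    // once before each next(); calling it twice in a row leaves a result untaken.
    @Override
    public boolean hasNext() {
        return count.decrementAndGet() >= 0;
    }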

    @Override
    public T next() {
        try {
            return completer.take().get();
        } catch (InterruptedException | ExecutionException e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public void close() {
        try {
            executor.shutdown();
            executor.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
            executor = null;
            completer = null;
            count = null;
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }
    
}

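Here is an example of how to use it:
try (CompletionIterator<Integer> service = new CompletionIterator<>()) {
  service.submit(task1);
  service.submit(task2);
  // all tasks must be submitted before iterating, to avoid race condition
  // CompletionIterator implements Iterator (not Iterable), so loop with
  // hasNext()/next(); next() returns the task's result, not a Future
  while (service.hasNext()) {
    System.out.printf("Job %d is done%n", service.next());
  }
}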

Looks nice. - fedd
There is an off-by-one error in your hasNext() method. It should be: return count.decrementAndGet() >= 0; - CaMiX
Fixed! Thanks. - Alex R

3
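Does answering these questions give you the answer?
  • Do your asynchronous tasks create other tasks that are submitted to the CompletionService?
  • Is service the only object that is supposed to handle the tasks created in your application?
According to the reference documentation, CompletionService follows a consumer/producer approach and makes use of an internal Executor. So, as long as you produce the tasks in one place and consume them in another, CompletionService.take() will indicate whether there are any more results to hand out.
I believe this question will also help you.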

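Thanks, nobeh. It looks like they also just loop on the thread count, in their "for(int tasksHandled=0;tasksHandled<submittedTasks;tasksHandled++)" loop. I was just wondering whether there was a more subtle concurrency API I could use. - Ed Beaty
1
The example in the API uses the same approach, calling take() n times in a row. http://docs.oracle.com/javase/6/docs/api/java/util/concurrent/ExecutorCompletionService.html - Ed Beaty
Is it thread safe to retrieve results from the CompletionService in a different thread from the one that submitted the tasks to the executor? - raffian
Thread safety is, I believe, a direct gain of using the concurrency API via CompletionService/ExecutorService, since they simply _abstract_ and _encapsulate_ how threads are used internally. - nobeh
3
It's worth emphasizing that you need to keep track of the number of jobs submitted to the CompletionService. The javadoc for CompletionService.take() implies that it simply blocks until it receives another Future. That means that unless you terminate your loop using the count of submitted jobs, you will block indefinitely. - Matt Lachman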
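
The blocking behaviour described in the last comment can be sketched as follows. This is only an illustration under stated assumptions: the names DrainDemo and queueEmptyAfterCountedTakes are hypothetical, the tasks are trivial, and the 100 ms timeout is arbitrary. It calls take() once per submitted task and then uses the timed poll(), which returns null when no further Future arrives, in the place where an extra take() would wait forever.

```java
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

class DrainDemo {

    // Returns true if the completion queue is empty once every submitted task was taken.
    static boolean queueEmptyAfterCountedTakes(int submitted) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            CompletionService<Integer> service = new ExecutorCompletionService<>(pool);
            for (int i = 0; i < submitted; i++) {
                final int id = i;
                service.submit(() -> id);
            }
            // One take() per submitted task: each call blocks until a result is ready.
            for (int i = 0; i < submitted; i++) {
                service.take().get();
            }
            // Nothing is left. A further take() would block forever, so use
            // poll with a timeout, which returns null when no Future arrives.
            Future<Integer> extra = service.poll(100, TimeUnit.MILLISECONDS);
            return extra == null;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("queue empty: " + queueEmptyAfterCountedTakes(5));
    }
}
```

A null from poll() only means that nothing completed within the timeout, not that all work is done, so the submitted count remains the reliable exit condition.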

2

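Building on Alex R's variant: my view is that this counter is only ever touched from a single thread, so there is no need for atomic operations; a plain int counter is enough.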

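import java.util.Iterator;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;

public class CompletionIterator<T> implements Iterable<T> {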

    private int _count = 0;
    private final CompletionService<T> _completer;

    public CompletionIterator(ExecutorService executor) {
        this._completer = new ExecutorCompletionService<>(executor);
    }

    public void submit(Callable<T> task) {
        _completer.submit(task);
        _count++;
    }

    @Override
    public Iterator<T> iterator() {
        return new Iterator<T>() {

            @Override
            public boolean hasNext() {
                return _count > 0;
            }

            @Override
            public T next() {
                try {
                    T ret = _completer.take().get();
                    _count--;
                    return ret;
                } catch (InterruptedException | ExecutionException e) {
                    throw new RuntimeException(e);
                }
            }

        };
    }

}
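
Because this variant implements Iterable, it can be used directly in a for-each loop. The sketch below is a hypothetical usage example: IterableDemo and sumOfSquares are made-up names, and the class is nested (with shortened field names) only so that the block compiles on its own. It assumes that submission and iteration happen on the same thread, which is the condition under which a plain int counter is safe.

```java
import java.util.Iterator;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class IterableDemo {

    // Same idea as the class above, nested here so the sketch compiles on its own.
    static class CompletionIterator<T> implements Iterable<T> {
        private int count = 0;
        private final CompletionService<T> completer;

        CompletionIterator(ExecutorService executor) {
            this.completer = new ExecutorCompletionService<>(executor);
        }

        void submit(Callable<T> task) {
            completer.submit(task);
            count++;
        }

        @Override
        public Iterator<T> iterator() {
            return new Iterator<T>() {
                @Override
                public boolean hasNext() {
                    return count > 0;
                }

                @Override
                public T next() {
                    try {
                        T result = completer.take().get();
                        count--;
                        return result;
                    } catch (InterruptedException | ExecutionException e) {
                        throw new RuntimeException(e);
                    }
                }
            };
        }
    }

    // Submits the squares of 1..n and sums them in completion order.
    static int sumOfSquares(int n) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            CompletionIterator<Integer> results = new CompletionIterator<>(pool);
            for (int i = 1; i <= n; i++) {
                final int value = i;
                results.submit(() -> value * value);
            }
            int sum = 0;
            for (int square : results) {
                sum += square;
            }
            return sum;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(sumOfSquares(10)); // 1 + 4 + ... + 100 = 385
    }
}
```

Taking the ExecutorService as a constructor argument leaves shutdown to the caller, which is why the sketch shuts the pool down in a finally block.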
