使用ExecutorService如何等待所有线程完成?

459

我需要每次执行4个任务,就像这样:

ExecutorService taskExecutor = Executors.newFixedThreadPool(4);
while(...) {
    taskExecutor.execute(new MyTask());
}
//...wait for completion somehow

我该如何在所有任务完成后得到通知?目前,我想不出比设置一些全局任务计数器更好的方法,并在每个任务结束时将其减少,然后监视这个计数器是否变为0;或者获取Future列表并在无限循环中监视它们全部都已完成。有没有更好的解决方案,不涉及无限循环?

谢谢。

27个回答

5

您可以将您的任务包装在另一个可运行对象中,该对象将发送通知:

taskExecutor.execute(new Runnable() {
  public void run() {
    taskStartedNotification();
    new MyTask().run();
    taskFinishedNotification();
  }
});

1
花了我一段时间才看出这将如何解决OP的问题。首先,请注意,这个包装是针对每个任务而不是启动所有任务的代码。假设,每次启动都会增加一个计数器,每次完成都会减少该计数器,或者会增加一个completed计数器。因此,在启动它们全部后,在每个通知时,可以确定是否所有任务都已完成。请注意,使用try/finally非常重要,以便即使任务失败,也会给出完成通知(或在catch块中的替代通知)。否则,将永远等待。 - ToolmakerSteve

4

使用ExecutorService实现清洁的方式

 List<Future<Void>> results = null;
 try {
     List<Callable<Void>> tasks = new ArrayList<>();
     ExecutorService executorService = Executors.newFixedThreadPool(4);
     results = executorService.invokeAll(tasks);
 } catch (InterruptedException ex) {
     ...
 } catch (Exception ex) {
     ...
 }

3

除了使用闩锁/屏障的不同选择之外,为了在所有计算完成之前获得部分结果,您还可以使用CompletionService

引用自《Java并发编程实战》: “如果您有一批要提交给执行器的计算,并且希望在它们变得可用时检索其结果,可以保留与每个任务相关联的Future,并通过调用具有超时为零的get来重复轮询完成。这是可能的,但繁琐。幸运的是,有一个更好的方法:完成服务。”

以下是实现代码:

public class TaskSubmiter {
    private final ExecutorService executor;
    TaskSubmiter(ExecutorService executor) { this.executor = executor; }
    void doSomethingLarge(AnySourceClass source) {
        final List<InterestedResult> info = doPartialAsyncProcess(source);
        CompletionService<PartialResult> completionService = new ExecutorCompletionService<PartialResult>(executor);
        for (final InterestedResult interestedResultItem : info)
            completionService.submit(new Callable<PartialResult>() {
                public PartialResult call() {
                    return InterestedResult.doAnOperationToGetPartialResult();
                }
        });

    try {
        for (int t = 0, n = info.size(); t < n; t++) {
            Future<PartialResult> f = completionService.take();
            PartialResult PartialResult = f.get();
            processThisSegment(PartialResult);
            }
        } 
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } 
        catch (ExecutionException e) {
            throw somethinghrowable(e.getCause());
        }
    }
}

3
这是我的解决方案,基于"AdamSkywalker"的建议,它可以正常运行。
package frss.main;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class TestHilos {

    void procesar() {
        ExecutorService es = Executors.newFixedThreadPool(4);
        List<Runnable> tasks = getTasks();
        CompletableFuture<?>[] futures = tasks.stream().map(task -> CompletableFuture.runAsync(task, es)).toArray(CompletableFuture[]::new);
        CompletableFuture.allOf(futures).join();
        es.shutdown();

        System.out.println("FIN DEL PROCESO DE HILOS");
    }

    private List<Runnable> getTasks() {
        List<Runnable> tasks = new ArrayList<Runnable>();

        Hilo01 task1 = new Hilo01();
        tasks.add(task1);

        Hilo02 task2 = new Hilo02();
        tasks.add(task2);
        return tasks;
    }

    private class Hilo01 extends Thread {

        @Override
        public void run() {
            System.out.println("HILO 1");
        }

    }

    private class Hilo02 extends Thread {

        @Override
        public void run() {
            try {
                sleep(2000);
            }
            catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("HILO 2");
        }

    }


    public static void main(String[] args) {
        TestHilos test = new TestHilos();
        test.procesar();
    }
}

3

我刚刚写了一个样例程序来解决你的问题。因为没有给出简洁的实现,所以我会添加一个。虽然你可以使用executor.shutdown()executor.awaitTermination(),但这不是最佳实践,因为不同线程所需的时间是不可预测的。

ExecutorService es = Executors.newCachedThreadPool();
    List<Callable<Integer>> tasks = new ArrayList<>();

    for (int j = 1; j <= 10; j++) {
        tasks.add(new Callable<Integer>() {

            @Override
            public Integer call() throws Exception {
                int sum = 0;
                System.out.println("Starting Thread "
                        + Thread.currentThread().getId());

                for (int i = 0; i < 1000000; i++) {
                    sum += i;
                }

                System.out.println("Stopping Thread "
                        + Thread.currentThread().getId());
                return sum;
            }

        });
    }

    try {
        List<Future<Integer>> futures = es.invokeAll(tasks);
        int flag = 0;

        for (Future<Integer> f : futures) {
            Integer res = f.get();
            System.out.println("Sum: " + res);
            if (!f.isDone()) 
                flag = 1;
        }

        if (flag == 0)
            System.out.println("SUCCESS");
        else
            System.out.println("FAILED");

    } catch (InterruptedException | ExecutionException e) {
        e.printStackTrace();
    }

很好,你展示了future.get的使用--这是一个不错的替代方案。但是,为什么你认为等待永远比设置一些最大可接受的超时时间更好呢?更重要的是,如果你想要等待(基本上是永远)直到所有任务完成,那么只需给awaitTermination一个非常长的时间即可,而无需进行所有这些逻辑处理。 - ToolmakerSteve
这与此处已经呈现的解决方案没有区别。你的解决方案只是和 @sjlee 呈现的一样。 - Manish Kumar Sharma
不确定为什么需要检查完成状态,因为根据Oracle文档,invokeAll将只返回“当所有任务完成或超时到期时,以先发生的为准”。 - Mashrur

3

Project Loom中使用AutoCloseable执行器服务的Try-with-Resources语法

Project Loom旨在为Java的并发能力添加新功能。

其中一个功能是使ExecutorService成为AutoCloseable。这意味着每个ExecutorService实现都将提供一个close方法。这也意味着我们可以使用try-with-resources语法自动关闭ExecutorService对象。

ExecutorService#close 方法会阻塞直到所有提交的任务完成。使用 close 可以替代调用 shutdownawaitTermination

实现 AutoCloseable 是 Project Loom 试图将 “结构化并发” 带入 Java 的一部分。

try (
    ExecutorService executorService = Executors.… ;
) {
    // Submit your `Runnable`/`Callable` tasks to the executor service.
    …
}
// At this point, flow-of-control blocks until all submitted tasks are done/canceled/failed.
// After this point, the executor service will have been automatically shutdown, wia `close` method called by try-with-resources syntax.

想要了解更多关于 Project Loom 的信息,请搜索 Ron Pressler 和其他 Project Loom 团队成员所做的演讲和采访。重点关注最近的内容,因为 Project Loom 已经发展。

Project Loom 技术的实验性构建现在可用, 基于早期访问的 Java 18


2

以下是我从相关问题中发布的答案,以便于有人想要更简单的方法来完成此操作。

ExecutorService executor = Executors.newFixedThreadPool(10);
CompletableFuture[] futures = new CompletableFuture[10];
int i = 0;
while (...) {
    futures[i++] =  CompletableFuture.runAsync(runner, executor);
}

CompletableFuture.allOf(futures).join(); // THis will wait until all future ready.

2
我创建了以下的工作示例。这个想法是有一种方法来处理任务池(我使用一个队列作为例子),用许多线程(由numberOfTasks / threshold在程序中确定)来执行,等待所有线程完成后才能继续进行其他处理。"Original Answer"翻译成"最初的回答"。
import java.util.PriorityQueue;
import java.util.Queue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Testing CountDownLatch and ExecutorService to manage scenario where
 * multiple Threads work together to complete tasks from a single
 * resource provider, so the processing can be faster. */
public class ThreadCountDown {

private CountDownLatch threadsCountdown = null;
private static Queue<Integer> tasks = new PriorityQueue<>();

public static void main(String[] args) {
    // Create a queue with "Tasks"
    int numberOfTasks = 2000;
    while(numberOfTasks-- > 0) {
        tasks.add(numberOfTasks);
    }

    // Initiate Processing of Tasks
    ThreadCountDown main = new ThreadCountDown();
    main.process(tasks);
}

/* Receiving the Tasks to process, and creating multiple Threads
* to process in parallel. */
private void process(Queue<Integer> tasks) {
    int numberOfThreads = getNumberOfThreadsRequired(tasks.size());
    threadsCountdown = new CountDownLatch(numberOfThreads);
    ExecutorService threadExecutor = Executors.newFixedThreadPool(numberOfThreads);

    //Initialize each Thread
    while(numberOfThreads-- > 0) {
        System.out.println("Initializing Thread: "+numberOfThreads);
        threadExecutor.execute(new MyThread("Thread "+numberOfThreads));
    }

    try {
        //Shutdown the Executor, so it cannot receive more Threads.
        threadExecutor.shutdown();
        threadsCountdown.await();
        System.out.println("ALL THREADS COMPLETED!");
        //continue With Some Other Process Here
    } catch (InterruptedException ex) {
        ex.printStackTrace();
    }
}

/* Determine the number of Threads to create */
private int getNumberOfThreadsRequired(int size) {
    int threshold = 100;
    int threads = size / threshold;
    if( size > (threads*threshold) ){
        threads++;
    }
    return threads;
}

/* Task Provider. All Threads will get their task from here */
private synchronized static Integer getTask(){
    return tasks.poll();
}

/* The Threads will get Tasks and process them, while still available.
* When no more tasks available, the thread will complete and reduce the threadsCountdown */
private class MyThread implements Runnable {

    private String threadName;

    protected MyThread(String threadName) {
        super();
        this.threadName = threadName;
    }

    @Override
    public void run() {
        Integer task;
        try{
            //Check in the Task pool if anything pending to process
            while( (task = getTask()) != null ){
                processTask(task);
            }
        }catch (Exception ex){
            ex.printStackTrace();
        }finally {
            /*Reduce count when no more tasks to process. Eventually all
            Threads will end-up here, reducing the count to 0, allowing
            the flow to continue after threadsCountdown.await(); */
            threadsCountdown.countDown();
        }
    }

    private void processTask(Integer task){
        try{
            System.out.println(this.threadName+" is Working on Task: "+ task);
        }catch (Exception ex){
            ex.printStackTrace();
        }
    }
}
}

最初的回答
希望这能对你有所帮助!

2
你可以使用以下代码:
public class MyTask implements Runnable {

    private CountDownLatch countDownLatch;

    public MyTask(CountDownLatch countDownLatch {
         this.countDownLatch = countDownLatch;
    }

    @Override
    public void run() {
         try {
             //Do somethings
             //
             this.countDownLatch.countDown();//important
         } catch (InterruptedException ex) {
              Thread.currentThread().interrupt();
         }
     }
}

CountDownLatch countDownLatch = new CountDownLatch(NUMBER_OF_TASKS);
ExecutorService taskExecutor = Executors.newFixedThreadPool(4);
for (int i = 0; i < NUMBER_OF_TASKS; i++){
     taskExecutor.execute(new MyTask(countDownLatch));
}
countDownLatch.await();
System.out.println("Finish tasks");

1

您应该使用 executorService.shutdown()executorService.awaitTermination 方法。

例如:

public class ScheduledThreadPoolExample {

    public static void main(String[] args) throws InterruptedException {
        ScheduledExecutorService executorService = Executors.newScheduledThreadPool(5);
        executorService.scheduleAtFixedRate(() -> System.out.println("process task."),
                0, 1, TimeUnit.SECONDS);

        TimeUnit.SECONDS.sleep(10);
        executorService.shutdown();
        executorService.awaitTermination(1, TimeUnit.DAYS);
    }

}

在shutdown()之后是否需要使用awaitTermination()呢? - Gaurav

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接