如何在Java中等待多个任务的完成?

12

在Java应用程序中实现并发的适当方式是什么?我当然知道线程等相关内容,我已经使用Java编程有10年了,但对并发的经验不是很多。

例如,我需要异步加载一些资源,只有在所有资源都被加载完后,才能继续做更多工作。不必说,它们完成的顺序是没有规律的。我该怎么做?

在JavaScript中,我喜欢使用jQuery.deferred基础设施来表述

$.when(deferred1,deferred2,deferred3...)
 .done(
   function(){//here everything is done
    ...
   });

那我在Java中应该做什么呢?


Java的哪个版本? - Matthew Herbst
看一下 RxJava。 - tddmonkey
在谷歌上搜索Java线程。在Java中实现并发非常容易。 - Felipe Sulser
1
@FelipeSulser 很容易做错,这在 SO 上的问题经常可以看到。 - Kayaman
2
CountdownLatch或CyclicBarrier可以帮助。 - rkosegi
Android Java,Java 7。我知道线程,就像我说的,我处理 Java 已经有10年了。 - f.khantsis
8个回答

9
我会使用并行流。
Stream.of(runnable1, runnable2, runnable3).parallel().forEach(r -> r.run());
// do something after all these are done.

如果您需要异步操作,那么您可以使用线程池或者Thread。
我必须异步加载一些资源,
您可以像这样收集这些资源。
List<String> urls = ....

Map<String, String> map = urls.parallelStream()
                              .collect(Collectors.toMap(u -> u, u -> download(u)));

一旦下载并发完成,这将为您提供所有资源的映射。默认情况下,并发数将是您拥有的CPU数量。


@RobinJonsson forEach在所有任务完成之前不会返回。没有支持异步运行流代码,因此如果需要此功能,则需要使用其他方法。 - Peter Lawrey
1
你并不总是能够运行程序,并且你也不能总是控制异步。例如,我正在调用一个名为 “getMapAsync(OnMapReadyCallback)” 的外部方法,它会立即返回并启动自己的线程。还有其他一些类似的东西需要以类似的方式加载,并在所有事情都完成后执行代码。我认为我找到的最好的解决方案是使用“CountDownLatch”,这是正确的解决方案。 - f.khantsis

8
你可以通过多种方式实现。
1.ExecutorService invokeAll() API

执行给定的任务,当所有任务完成时返回一个持有它们状态和结果的 Future 列表。

2.CountDownLatch

一个同步辅助工具,允许一个或多个线程等待其他线程中正在执行的一组操作完成。 CountDownLatch 使用给定的计数进行初始化。await 方法会阻塞,直到由于调用countDown()方法而导致当前计数达到零,此后所有等待的线程都将被释放,并且任何随后的 await 调用都将立即返回。这是一种一次性现象——计数无法重置。如果您需要一个可以重置计数的版本,请考虑使用 CyclicBarrier。 3.ForkJoinPoolExecutors 中的 newWorkStealingPool() 是另一种方法。 请查看相关的 SE 问题: 如何等待生成自己的线程的线程?

执行者:如果任务是递归创建的,如何同步等待所有任务完成?


很好的答案。我还要补充一下CyclicBarrier - 它有一个构造函数,接受一个Runnable参数,在所有任务完成时将被执行:https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/CyclicBarrier.html#CyclicBarrier-int-java.lang.Runnable- - Miha_x64
哪种方法是正确的,但不会阻塞任何线程? - Miha_x64

4
如果我没有使用并行流或Spring MVC的TaskExecutor,我通常会使用CountDownLatch。使用任务数进行实例化,每个完成任务的线程减少一次。 CountDownLatch.await() 等待直到计数器为0。非常有用。
阅读更多信息: JavaDocs

1
这和信号量有什么区别? - f.khantsis
1
CountdownLatch 用于启动一系列线程,然后等待它们全部完成(或者直到它们调用 countDown() 指定次数)。Semaphore 用于控制正在使用资源的并发线程数。该资源可以是像文件这样的东西,或者通过限制执行线程的数量来使用 CPU。Semaphore 上的计数器可以因为不同的线程调用 acquire() 和 release() 而上下波动。 - Robin Jonsson
1
是的,但 CountdownLatch 不就是一个不能增加的信号量吗? - f.khantsis
1
它的使用不仅限于此。基本上,它可以作为一个简单的执行池。仅仅将其用于计数(无论是向上还是向下)似乎有点毫无意义。 - Robin Jonsson

3

个人而言,如果我使用Java 8或更高版本,我会这样做。

// Retrieving instagram followers
CompletableFuture<Integer> instagramFollowers = CompletableFuture.supplyAsync(() -> {
    // getInstaFollowers(userId);

    return 0; // default value
});

// Retrieving twitter followers
CompletableFuture<Integer> twitterFollowers = CompletableFuture.supplyAsync(() -> {
    // getTwFollowers(userId);

    return 0; // default value
});

System.out.println("Calculating Total Followers...");
CompletableFuture<Integer> totalFollowers = instagramFollowers
    .thenCombine(twitterFollowers, (instaFollowers, twFollowers) -> {
         return instaFollowers + twFollowers; // can be replaced with method reference
});

System.out.println("Total followers: " + totalFollowers.get()); // blocks until both the above tasks are complete

我在这里使用了 supplyAsync(),因为我需要从任务中返回一些值(在这种情况下是关注者数量),否则我可以使用runAsync()。这两个方法都会在单独的线程中运行任务。
最后,我使用了thenCombine()来连接两个CompletableFuture。如果一个future取决于另一个future,则可以使用thenCompose()来连接两个CompletableFuture。但在这种情况下,由于两个任务都可以并行执行,所以我使用了thenCombine()
方法getInstaFollowers(userId)getTwFollowers(userId)是简单的HTTP调用或其他什么东西。

1

1

在Java 8中,您可以使用名为CompletableFuture的类来实现与jQuery$.Deferred相同的行为。该类提供了用于处理Promises的API。要创建异步代码,您可以使用其static创建方法之一,例如#runAsync#supplyAsync。然后使用#thenApply应用一些结果计算。


1

这是一个我使用线程的示例。它是一个具有50个固定大小线程的静态执行服务。

public class ThreadPoolExecutor {

private static final ExecutorService executorService = Executors.newFixedThreadPool(50,
        new ThreadFactoryBuilder().setNameFormat("thread-%d").build());

private static ThreadPoolExecutor instance = new ThreadPoolExecutor();

public static ThreadPoolExecutor getInstance() {
    return instance;
}

public <T> Future<? extends T> queueJob(Callable<? extends T> task) {
    return executorService.submit(task);
}

public void shutdown() {
    executorService.shutdown();
}
}

执行器的业务逻辑使用方式如下:(您可以使用Callable或Runnable。Callable可以返回结果,而Runnable不行)
public class MultipleExecutor implements Callable<ReturnType> {//your code}

执行者的呼叫:

ThreadPoolExecutor threadPoolExecutor = ThreadPoolExecutor.getInstance();

List<Future<? extends ReturnType>> results = new LinkedList<>();

for (Type Type : typeList) {
            Future<? extends ReturnType> future = threadPoolExecutor.queueJob(
                    new MultipleExecutor(needed parameters));
            results.add(future);
        }

        for (Future<? extends ReturnType> result : results) {
            try {
                if (result.get() != null) {
                    result.get(); // here you get the return of one thread
                }
            } catch (InterruptedException | ExecutionException e) {
                logger.error(e, e);
            }
        }

0

我通常选择一种异步的通知-开始,通知-进展,通知-结束的方法:

class Task extends Thread {
    private ThreadLauncher parent;

    public Task(ThreadLauncher parent) {
        super();
        this.parent = parent;
    }

    public void run() {
        doStuff();

        parent.notifyEnd(this);
    }

    public /*abstract*/ void doStuff() {
        // ...
    }
}


class ThreadLauncher {

    public void stuff() {
        for (int i=0; i<10; i++)
            new Task(this).start(); 
    }

    public void notifyEnd(Task who) {
        // ...
    }
}

1
是的,如果您愿意,可以实现Runnable。 - Exceptyon

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接