使用并行流返回最快提供的值

15

我有一组供应商,他们提供相同的结果,但速度不同(且变化不定)。

我想找到一种优雅的方法,让所有供应商同时启动,并在其中任意一个产生值时立即返回该值(丢弃其他结果)。

我尝试过使用并行流和 Stream.findAny() 来解决这个问题,但它似乎总是阻塞直到所有结果都产生。

这里是一个单元测试,展示了我的问题:

import org.junit.Test;

import java.util.Collections;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;
import java.util.stream.Stream;

import static org.junit.Assert.*;

public class RaceTest {

    @Test
    public void testRace() {
        // Set up suppliers
        Set<Supplier<String>> suppliers = Collections.newSetFromMap(new ConcurrentHashMap<>());
        suppliers.add(() -> "fast"); // This supplier returns immediately
        suppliers.add(() -> {
            try {
                Thread.sleep(10_000);
                return "slow";
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }); // This supplier takes 10 seconds to produce a value

        Stream<Supplier<String>> stream = suppliers.parallelStream();
        assertTrue(stream.isParallel()); // Stream can work in parallel
        long start = System.currentTimeMillis();
        Optional<String> winner = stream
                .map(Supplier::get)
                .findAny();
        long duration = System.currentTimeMillis() - start;
        assertTrue(winner.isPresent()); // Some value was produced
        assertEquals("fast", winner.get()); // The value is "fast"
        assertTrue(duration < 9_000); // The whole process took less than 9 seconds
    }
}

测试的结果是,由于整个测试需要约10秒钟才能完成,因此最后一个断言失败。

我在这里做错了什么?


2
我刚才测试了你的代码,没有失败(这里用的是JDK 1.8.0_15版本)。但是,“findAny”方法是不确定性的,所以可能只是运气好。 - Tunaki
1
如果您将快速供应商添加4次(设置大小=5),它将按预期工作,我怀疑当流太小时,它会恢复为顺序执行。 - assylias
2
这里不需要源是Concurrent…集合。任何集合都可以,只要在操作期间不进行修改。实际上,由于其内部工作方式,普通的集合在执行此任务时表现更好,即使是并行执行也是如此。但正如其他人已经说过的那样,Stream API在这里不是正确的工具。 - Holger
3个回答

12

在这种情况下,最好使用Callable而不是Supplier(具有相同的函数签名),并使用自从Java 5以来就存在的老式并发API:

Set<Callable<String>> suppliers=new HashSet<>();
suppliers.add(() -> "fast"); // This supplier returns immediately
suppliers.add(() -> {
        Thread.sleep(10_000);
        return "slow";
    }
);

ExecutorService es=Executors.newCachedThreadPool();
try {

    String result = es.invokeAny(suppliers);
    System.out.println(result);

} catch (InterruptedException|ExecutionException ex) {
    Logger.getLogger(MyClass.class.getName()).log(Level.SEVERE, null, ex);
}
es.shutdown();

注意,整个“运行所有并返回最快结果”现在变成了一个方法调用...

它还有一个额外的好处,在一个结果可用时取消/中断所有待处理的操作,因此慢操作不会真正等待完整的十秒钟(大多数情况下是如此,因为时间不确定)。


invokeAny 的 Javadoc 表示:“执行给定的任务,返回已成功完成的任务的结果”。这是否意味着返回的任务始终是第一个完成的任务(而不是其他任务)? - Tunaki
1
@Tunaki:interface 无法指定这样的保证,因为它取决于实际执行者实现的逻辑,是否存在这种情况。例如,在线程池执行程序的情况下,您必须确保它没有限制线程数量或至少有与任务可用数量相同的空闲线程,才能按预期工作。newCachedThreadPool() 返回的执行程序没有限制。但是,虽然返回最快的是 invokeAny 的明显意图,但如果您想尽可能正式地表达它,当涉及并发时从来没有这样的保证... - Holger
这个完美地运行了,并且也像我预期的那样处理了失败的任务(只有在没有成功的任务时才抛出异常)。 - Gustav Karlsson

4
您目前使用的代码是不确定性的。引用 findAny() 的 Javadoc:

此操作的行为明确是不确定性的;它可以在流中选择任何元素。

您可以使用 CompletionService 并将所有任务提交给它。然后,CompletionService.take() 会返回第一个完成的任务的 Future
long start = System.currentTimeMillis();
ExecutorService executor = Executors.newFixedThreadPool(suppliers.size());
CompletionService<String> completionService = new ExecutorCompletionService<>(executor);
suppliers.forEach(s -> completionService.submit(() -> s.get()));
String winner = completionService.take().get();
long duration = System.currentTimeMillis() - start;
assertEquals("fast", winner); // The value is "fast"
assertTrue(duration < 9_000); // The whole process took less than 9 seconds

1
这在我看来比目前被接受的答案更具确定性和惯用性。或者说:这正是CompletionService的用途! - Marco13

2

流API不适合处理此类任务,因为它不能保证何时完成任务。更好的解决方案是使用CompletableFuture

long start = System.currentTimeMillis();
String winner = CompletableFuture
        .anyOf(suppliers.stream().map(CompletableFuture::supplyAsync)
                .toArray(CompletableFuture[]::new)).join().toString();
long duration = System.currentTimeMillis() - start;
assertEquals("fast", winner); // The value is "fast"
assertTrue(duration < 9_000); // The whole process took less than 9 seconds

请注意,如果公共 FJP 的并行级别不足,则仍然可能无法同时启动所有供应商。要解决这个问题,您可以创建自己的池,具有所需的并行级别:
long start = System.currentTimeMillis();
ForkJoinPool fjp = new ForkJoinPool(suppliers.size());
String winner = CompletableFuture
        .anyOf(suppliers.stream().map(s -> CompletableFuture.supplyAsync(s, fjp))
                .toArray(CompletableFuture[]::new)).join().toString();
long duration = System.currentTimeMillis() - start;
assertEquals("fast", winner); // The value is "fast"
assertTrue(duration < 9_000); // The whole process took less than 9 seconds
fjp.shutdownNow();

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接