我有一组供应商,他们提供相同的结果,但速度不同(且变化不定)。
我想找到一种优雅的方法,让所有供应商同时启动,并在其中任意一个产生值时立即返回该值(丢弃其他结果)。
我尝试过使用并行流和 Stream.findAny()
来解决这个问题,但它似乎总是阻塞直到所有结果都产生。
这里是一个单元测试,展示了我的问题:
import org.junit.Test;
import java.util.Collections;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;
import java.util.stream.Stream;
import static org.junit.Assert.*;
public class RaceTest {
@Test
public void testRace() {
// Set up suppliers
Set<Supplier<String>> suppliers = Collections.newSetFromMap(new ConcurrentHashMap<>());
suppliers.add(() -> "fast"); // This supplier returns immediately
suppliers.add(() -> {
try {
Thread.sleep(10_000);
return "slow";
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}); // This supplier takes 10 seconds to produce a value
Stream<Supplier<String>> stream = suppliers.parallelStream();
assertTrue(stream.isParallel()); // Stream can work in parallel
long start = System.currentTimeMillis();
Optional<String> winner = stream
.map(Supplier::get)
.findAny();
long duration = System.currentTimeMillis() - start;
assertTrue(winner.isPresent()); // Some value was produced
assertEquals("fast", winner.get()); // The value is "fast"
assertTrue(duration < 9_000); // The whole process took less than 9 seconds
}
}
测试的结果是,由于整个测试需要约10秒钟才能完成,因此最后一个断言失败。
我在这里做错了什么?
Concurrent…
集合。任何集合都可以,只要在操作期间不进行修改。实际上,由于其内部工作方式,普通的集合在执行此任务时表现更好,即使是并行执行也是如此。但正如其他人已经说过的那样,Stream
API在这里不是正确的工具。 - Holger