我有不同的数据源,想要并行请求(因为每个请求都是一个http调用,可能会非常耗时)。但我只会使用这些请求中的1个响应。所以我会对它们进行优先级排序。如果第一个响应无效,我将检查第二个响应。如果第二个响应也无效,我想使用第三个响应等等。
但是,我希望在收到第一个正确响应后停止处理并返回结果。
为了模拟这个问题,我创建了下面这段代码,在其中我试图使用Java并行流。但是问题是,我只有在处理所有请求之后才会收到最终结果。
public class ParallelExecution {
private static Supplier<Optional<Integer>> testMethod(String strInt) {
return () -> {
Optional<Integer> result = Optional.empty();
try {
result = Optional.of(Integer.valueOf(strInt));
System.out.printf("converted string %s to int %d\n",
strInt,
result.orElse(null));
} catch (NumberFormatException ex) {
System.out.printf("CANNOT CONVERT %s to int\n", strInt);
}
try {
int randomValue = result.orElse(10000);
TimeUnit.MILLISECONDS.sleep(randomValue);
System.out.printf("converted string %s to int %d in %d milliseconds\n",
strInt,
result.orElse(null), randomValue);
} catch (InterruptedException e) {
e.printStackTrace();
}
return result;
};
}
public static void main(String[] args) {
Instant start = Instant.now();
System.out.println("Starting program: " + start.toString());
List<Supplier<Optional<Integer>>> listOfFunctions = new ArrayList();
for (String arg: args) {
listOfFunctions.add(testMethod(arg));
}
Integer value = listOfFunctions.parallelStream()
.map(function -> function.get())
.filter(optValue -> optValue.isPresent()).map(val-> {
System.out.println("************** VAL: " + val);
return val;
}).findFirst().orElse(null).get();
Instant end = Instant.now();
Long diff = end.toEpochMilli() - start.toEpochMilli();
System.out.println("final value:" + value + ", worked during " + diff + "ms");
}
}
所以当我使用以下命令执行程序时:
$java ParallelExecution dfafj 34 1341 4656 dfad 245df 5767
我希望尽快得到结果“34”(大约34毫秒后),但实际上,我等待了超过10秒钟。您能帮忙找到最有效的解决方案吗?
ExecutorCompletionService
与ExecutorService
相关联,并且允许我们使用take
方法在任何顺序下获取结果,只要有一个结束了就可以。我之前使用过List<Future>
(按调用顺序,不好)或回调方法(冗长)来处理这些工作。 - AxelHtake()
方法时顺序并不重要。 - Andrew TobilkoinvokeAny
对于第一个任务返回空的Optional
也可以正常工作。最好一开始就创建Callable
而不是需要包装的Supplier
。然后,异常应该简单地传播给调用者,而不是被转换为空的可选项。 - HolgerNumberFormatException
而不是捕获它并返回一个空的可选项。 - Andrew Tobilko