如何在Java中以最高效和优雅的方式使用并行处理

4

我有不同的数据源,想要并行请求(因为每个请求都是一个http调用,可能会非常耗时)。但我只会使用这些请求中的1个响应。所以我会对它们进行优先级排序。如果第一个响应无效,我将检查第二个响应。如果第二个响应也无效,我想使用第三个响应等等。

但是,我希望在收到第一个正确响应后停止处理并返回结果。

为了模拟这个问题,我创建了下面这段代码,在其中我试图使用Java并行流。但是问题是,我只有在处理所有请求之后才会收到最终结果。

public class ParallelExecution {

    private static Supplier<Optional<Integer>> testMethod(String strInt) {
        return () -> {
            Optional<Integer> result = Optional.empty();
            try {
                result = Optional.of(Integer.valueOf(strInt));
                System.out.printf("converted string %s to int %d\n",
                        strInt,
                        result.orElse(null));
            } catch (NumberFormatException ex) {
                System.out.printf("CANNOT CONVERT %s to int\n", strInt);
            }

            try {
                int randomValue = result.orElse(10000);
                TimeUnit.MILLISECONDS.sleep(randomValue);
                System.out.printf("converted string %s to int %d in %d milliseconds\n",
                        strInt,
                        result.orElse(null), randomValue);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return result;
        };
    }

    public static void main(String[] args) {
        Instant start = Instant.now();
        System.out.println("Starting program: " + start.toString());
        List<Supplier<Optional<Integer>>> listOfFunctions = new ArrayList();
        for (String arg: args) {
            listOfFunctions.add(testMethod(arg));
        }
        Integer value = listOfFunctions.parallelStream()
                .map(function -> function.get())
                .filter(optValue -> optValue.isPresent()).map(val-> {
                    System.out.println("************** VAL: " + val);
                    return val;
                }).findFirst().orElse(null).get();
        Instant end = Instant.now();
        Long diff = end.toEpochMilli() - start.toEpochMilli();
        System.out.println("final value:" + value + ", worked during " + diff + "ms");
    }
}

所以当我使用以下命令执行程序时:

$java ParallelExecution dfafj 34 1341 4656 dfad 245df 5767

我希望尽快得到结果“34”(大约34毫秒后),但实际上,我等待了超过10秒钟。您能帮忙找到最有效的解决方案吗?
3个回答

2

ExecutorService#invokeAny 看起来是一个不错的选择。

List<Callable<Optional<Integer>>> tasks = listOfFunctions
    .stream()
    .<Callable<Optional<Integer>>>map(f -> f::get)
    .collect(Collectors.toList());

ExecutorService service = Executors.newCachedThreadPool();
Optional<Integer> value = service.invokeAny(tasks);

service.shutdown();

我将你的 List<Supplier<Optional<Integer>>> 转换成了一个 List<Callable<Optional<Integer>>>,以便在 invokeAny 中传递它。你可以最初构建 Callables。然后,我创建了一个 ExecutorService 并提交了任务。

第一个成功执行的任务的结果将会在从任务返回该结果时立即返回。其他任务将被中断。

你也可以看一下 CompletionService

List<Callable<Optional<Integer>>> tasks = Arrays
    .stream(args)
    .<Callable<Optional<Integer>>>map(arg -> () -> testMethod(arg).get())
    .collect(Collectors.toList());

final ExecutorService underlyingService = Executors.newCachedThreadPool();
final ExecutorCompletionService<Optional<Integer>> service = new ExecutorCompletionService<>(underlyingService);
tasks.forEach(service::submit);

Optional<Integer> value = service.take().get();
underlyingService.shutdownNow();

如果我理解正确,ExecutorCompletionServiceExecutorService 相关联,并且允许我们使用 take 方法在任何顺序下获取结果,只要有一个结束了就可以。我之前使用过 List<Future>(按调用顺序,不好)或回调方法(冗长)来处理这些工作。 - AxelH
@AxelH 是的,使用 take() 方法时顺序并不重要。 - Andrew Tobilko
1
但是invokeAny对于第一个任务返回空的Optional也可以正常工作。最好一开始就创建Callable而不是需要包装的Supplier。然后,异常应该简单地传播给调用者,而不是被转换为空的可选项。 - Holger
@Holger 是的,我考虑过了。最好传播一个 NumberFormatException 而不是捕获它并返回一个空的可选项。 - Andrew Tobilko

0
你可以使用队列来存储你的结果:
private static void testMethod(String strInt, BlockingQueue<Integer> queue) {
    // your code, but instead of returning anything:
    result.ifPresent(queue::add);
}

然后使用以下代码调用

for (String s : args) {
    CompletableFuture.runAsync(() -> testMethod(s, queue));
}
Integer result = queue.take();

请注意,这只处理第一个结果,就像您的示例一样。

0

我已经尝试使用competableFutures和anyOf方法。它将在任何一个future完成时返回。现在,停止其他任务的关键是为completableFuture(s)提供自己的执行器服务,并在需要时关闭它。

  public static void main(String[] args) {
    Instant start = Instant.now();
    System.out.println("Starting program: " + start.toString());
    CompletableFuture<Optional<Integer>> completableFutures[] = new CompletableFuture[args.length];
    ExecutorService es = Executors.newFixedThreadPool(args.length,r -> {
            Thread t = new Thread(r);
            t.setDaemon(false);
            return t;
    });

    for (int i = 0;i < args.length; i++) {
        completableFutures[i] = CompletableFuture.supplyAsync(testMethod(args[i]),es);
    }
    CompletableFuture.anyOf(completableFutures).
            thenAccept(res-> {
                System.out.println("Result - " + res + ", Time Taken : " + (Instant.now().toEpochMilli()-start.toEpochMilli()));
                es.shutdownNow();
            });
}

注意:它会抛出中断异常,你可以在try catch块中忽略它而不打印堆栈跟踪。此外,你的线程池大小理想情况下应该与args数组的长度相同。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接