如何创建一个Future,当任何给定的CompletableFuture完成并返回符合特定谓词条件的结果时,该Future也随之完成?

4

我有一个 CompletableFuture<MyService> 列表:

List<CompletableFuture<MyService>>

其中MyService是不可变的,像以下这样:

public class MyService {
    public boolean isAvailable() {
         return true; // or false
    }
}

我现在希望有一个未来,当以下任意一种情况发生时,该未来就完成:
  • 已经完成;并且
  • 对于由该未来提供的MyService实例:MyService.isAvailable()返回true
在继续之前,我需要可用的MyService实例。换句话说,我想要一个CompletableFuture<MyService>,当满足这两个条件时就会完成。
我该如何做?我知道我可以使用CompletableFuture.anyOf(...)在一个未来完成时进行处理,但我不确定如何集成第二个要求:MyService.isAvailable()必须为真。

3
您的意思是“继续进行”是什么? 您想做什么? 调用 MyService.isAvailable() 有什么问题吗? 您想等待它变为真吗? 此外,由于语法不正确,您的问题标题并没有太多意义。 - Didier L
我想要一个CompletableFuture,当满足两个条件时完成。 - Jade Dezo
您的服务需要提供一个在可用时完成的未来。否则,唯一的方法是定期轮询服务以检查其是否可用... - Didier L
@DidierL 我不确定。请注意,MyService.isAvailable() 根据内部成员返回 truefalse,因此不执行任何异步检查或其他操作。我需要某种形式的链式 CompletableFuture,就像您在流上使用 filter() 一样。 - Jade Dezo
最终,提出的任何答案都有帮助吗? - Didier L
3个回答

3
更新:我现在认为我已经理解了你的问题。你是否可以这样代理你的 futures 会有帮助呢?
List<CompletableFuture<MyService>> givenFutures; // wherever they came from

CompletableFuture<MyService>[] myFutures = givenFutures.stream()
        .map(future -> {
            final CompletableFuture<MyService> futureWithCheck = new CompletableFuture<>();

            future.thenAccept(myService -> {
                if (myService.isAvailable()) {
                    futureWithCheck.complete(myService);
                }
            });

            return futureWithCheck;})
        .toArray(CompletableFuture[]::new);

// blocking
MyService availableService = (MyService) CompletableFuture.anyOf(myFutures).get();

// do what you want to do with the available service

更新2:我想到了你关于thenCompose的问题,也许我的解决方案的中间部分可以这样表达:
CompletableFuture<MyService>[] myFutures = givenFutures.stream()
        .map(future ->
                future.thenCompose(myService ->
                        myService.isAvailable() ? CompletableFuture.completedFuture(myService) : new CompletableFuture<>()))
        .toArray(CompletableFuture[]::new);

旧答案,为了完整性:

(本来想评论的,但是我声望太低了)

看起来你要么必须反复轮询MyService.isAvailable()并在返回true后完成另一个CompletableFuture,要么MyService将(就像Didier L评论的那样)返回一个Future,它会跟踪并完成内部成员改变后的操作。例如,每个涉及的成员的setter将不得不检查isAvailable()是否为true,如果是,就完成之前分配的所有future。

在两种情况下,您都必须将额外的Future与您拥有的连接起来。Guavas Futures有一些有用的方法,但可能CompletableFuture.allOf(…)能够胜任。


如果永远不会改变,你还在等待什么(一旦其他的"Futures"完成)?这意味着你要么永远等待,要么立即继续。在这两种情况下,都不需要一个future,只需在其他"futures"完成后简单调用isAvailable()即可。或者您会等待MyService的实例被替换为一个其中isAvailable()为真的实例吗?那么设置这个实例的地方就是您必须连接到最后的Future的地方。 - Ole
不,我不这么认为。CompletableFuture.anyOf(...)CompletableFuture<MyService> 中的任一项完成时就会停止等待。但是特定的MyService可能会返回isAvailable()为false。因此,只要没有isAvailable() 返回true 的MyService实例,我就需要继续等待。 - Jade Dezo
a) 我认为你在这里混淆了并发性和其他问题。 b) 有 CompletableFuture.anyOf(…)CompletableFuture.allOf(…) - Ole
看起来很有前途。如果它有效,请告诉我,我会相应地更新我的答案。 - Ole
thenCompose()仅在未来完成后才执行,因此基本上与allOf()相同。 - Didier L
显示剩余7条评论

1
您可以尝试以下类似的服务,仅当服务可用时才完成未来的操作:
public static class MyService {
    private String name;

    public MyService(String name) {
        this.name = name;
    }

    public String toString() {
        return name;
    }

    public CompletableFuture<MyService> isAvailable() {
        CompletableFuture<MyService> future = new CompletableFuture<>();
        Executors.newCachedThreadPool().submit(() -> {
            boolean available = checkAvailability();
            if (available)
                future.complete(this);
        });
        return future;
    }

    private boolean checkAvailability() {
        try {
            int ms = new Random().nextInt(1000);
            Thread.sleep(ms);
        } catch (InterruptedException e) {
        }
        return new Random().nextBoolean();
    }
}

然后像这样使用该服务:
MyService s1 = new MyService("one");
MyService s2 = new MyService("two");
CompletableFuture<MyService> f1 = s1.isAvailable();
CompletableFuture<MyService> f2 = s2.isAvailable();

System.out.println("waiting for the first service to be available...");
CompletableFuture<Object> any = CompletableFuture.anyOf(f1, f2);
System.out.println("and the winner is: " + any.get());

// you can now safely cancel all futures
f1.cancel(true);
f2.cancel(true);

请阅读我的其他评论,这不是我真正想要的。我有一个 List<CompletableFuture<MyService>>,而且我无法更改 MyService 的源代码。我需要等待 MyService 实例可用 并且 其中 isAvailable() 返回 true - Jade Dezo

1
考虑到 CompletableFuture 只能被完成一次,您可以创建一个由第一个返回可用服务的 future 完成的 CompletableFuture:
List<CompletableFuture<MyService>> myServiceFutures = …

final CompletableFuture<MyService> availableMyServiceFuture = new CompletableFuture<>();
myServiceFutures.forEach(future -> future.thenAccept(myService -> {
    if (myService.isAvailable()) {
        // call will be ignored if already completed
        availableMyServiceFuture.complete(myService);
    }
}));

此外,如果没有可用服务,您可能希望这个未来完成得非常出色。
CompletableFuture.allOf(myServiceFutures.toArray(new CompletableFuture[myServiceFutures.size()]))
        .whenComplete((e, r) -> {
            // TODO handle failure of individual futures if that can happen
            if (myServiceFutures.stream().map(CompletableFuture::join).noneMatch(MyService::isAvailable)) {
                availableMyServiceFuture.completeExceptionally(new IllegalStateException("No service is available"));
            }
        });

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接