Java 8中使用CompletableFuture处理Supplier异常

8
考虑以下代码:
public class TestCompletableFuture {

    BiConsumer<Integer, Throwable> biConsumer = (x,y) -> {
        System.out.println(x);
        System.out.println(y);
    };

    public static void main(String args[]) {
        TestCompletableFuture testF = new TestCompletableFuture();
        testF.start();      
    }

    public void start() {
        Supplier<Integer> numberSupplier = new Supplier<Integer>() {
            @Override
            public Integer get() {
                return SupplyNumbers.sendNumbers();                     
            }
        };
        CompletableFuture<Integer> testFuture = CompletableFuture.supplyAsync(numberSupplier).whenComplete(biConsumer);         
    }       
}

class SupplyNumbers {
    public static Integer sendNumbers(){
        return 25; // just for working sake its not  correct.
    }
}

上述内容运行良好。然而,在我的情况下,sendNumbers 也可能抛出已检查的异常,如下所示:
class SupplyNumbers {
    public static Integer sendNumbers() throws Exception {
        return 25; // just for working sake its not  correct.
    }
}

现在我想把这个异常作为我的biConsumer中的y来处理。这将有助于我在单个函数(biConsumer)中处理结果以及异常(如果有)。 有什么想法吗?我可以在这里使用CompletableFuture.exceptionally(fn)还是其他什么东西?

这里有一个解决方案,可以让您在不降低代码可读性的情况下使用已检查异常:https://dev59.com/oqrka4cB1Zd3GeqPdW7k#49705336 - Gili
5个回答

16

当您想处理受检异常时,使用标准功能接口的工厂方法并没有什么帮助。当您将捕获异常的代码插入lambda表达式中时,会出现一个问题,即catch子句需要CompletableFuture实例来设置异常,而工厂方法需要Supplier,这是个先有鸡还是先有蛋的问题。

您可以使用类的实例字段允许在创建后进行变异,但最终的代码不够简洁,比直接基于Executor的解决方案更加复杂。 《CompletableFuture》文档如下所述:

因此,您知道以下代码将显示CompletableFuture.supplyAsync(Supplier)的标准行为,同时直接处理受检异常:

CompletableFuture<Integer> f=new CompletableFuture<>();
ForkJoinPool.commonPool().submit(()-> {
  try { f.complete(SupplyNumbers.sendNumbers()); }
  catch(Exception ex) { f.completeExceptionally(ex); }
});

文档还说:

……为了简化监控、调试和跟踪,所有生成的异步任务都是 CompletableFuture.AsynchronousCompletionTask 标记接口的实例。

如果你想遵循这个约定,使解决方案更像原始的 supplyAsync 方法,请将代码更改为:

CompletableFuture<Integer> f=new CompletableFuture<>();
ForkJoinPool.commonPool().submit(
  (Runnable&CompletableFuture.AsynchronousCompletionTask)()-> {
    try { f.complete(SupplyNumbers.sendNumbers()); }
    catch(Exception ex) { f.completeExceptionally(ex); }
});

你能解释一下这是什么意思吗?(Runnable&CompletableFuture.AsynchronousCompletionTask) - Jasonw
5
@Jasonw说的是将类型转换为交叉类型。换句话说,对象必须同时实现RunnableCompletableFuture.AsynchronousCompletionTask这两种类型。由于类型转换为Lambda表达式提供了上下文类型,这意味着生成的Lambda实例将同时实现这两个接口。请参见这里 - Holger

7

您已经在 y 中捕获了异常。也许您没有看到它是因为在 CompletableFuture 完成之前,main 已经退出了?

下面的代码如预期地打印了 "null" 和 "Hello":

public static void main(String args[]) throws InterruptedException {
  TestCompletableFuture testF = new TestCompletableFuture();
  testF.start();
  Thread.sleep(1000); //wait for the CompletableFuture to complete
}

public static class TestCompletableFuture {
  BiConsumer<Integer, Throwable> biConsumer = (x, y) -> {
    System.out.println(x);
    System.out.println(y);
  };
  public void start() {
    CompletableFuture.supplyAsync(SupplyNumbers::sendNumbers)
            .whenComplete(biConsumer);
  }
}

static class SupplyNumbers {
  public static Integer sendNumbers() {
    throw new RuntimeException("Hello");
  }
}

3

我不太确定您想要实现什么。如果您的供应商出现异常, 当您调用testFuture .get()时,您将会得到一个java.util.concurrent.ExecutionException,其原因是供应商引发的任何异常,您可以通过在ExecutionException上调用getCause()来检索。

或者,就像您提到的那样,您可以在CompletableFuture中使用exceptionally。这段代码:

public class TestCompletableFuture {

    private static BiConsumer<Integer, Throwable> biConsumer = (x,y) -> {
        System.out.println(x);
        System.out.println(y);
    };

    public static void main(String args[]) throws Exception {
        Supplier<Integer> numberSupplier = () -> {
            throw new RuntimeException(); // or return integer
        };

        CompletableFuture<Integer> testFuture = CompletableFuture.supplyAsync(numberSupplier)
                .whenComplete(biConsumer)
                .exceptionally(exception -> 7);

        System.out.println("result = " + testFuture.get());
    }

}

打印此结果:
null
java.util.concurrent.CompletionException: java.lang.RuntimeException
result = 7

编辑:

如果您有检查异常,可以简单地添加一个try-catch块。

原始代码:

Supplier<Integer> numberSupplier = new Supplier<Integer>() {
    @Override
    public Integer get() {
        return SupplyNumbers.sendNumbers();                     
    }
};

修改后的代码:

Supplier<Integer> numberSupplier = new Supplier<Integer>() {
    @Override
    public Integer get() {
        try {
            return SupplyNumbers.sendNumbers();                     
        } catch (Excetpion e) {
            throw new RuntimeExcetpion(e);
        }
    }
};

2
OP希望能够在调用供应商的代码中处理异常,而不是在供应商本身内部处理。如果您总是以一致的方式处理异常,那么被迫在编写每个供应商时使用try/catch会很笨拙。 - skelly

2
也许你可以使用 new Object 来封装你的整数和错误信息,像这样:
public class Result {

    private Integer   integer;
    private Exception exception;

    // getter setter

}

然后:

public void start(){
    Supplier<Result> numberSupplier = new Supplier<Result>() {
        @Override
        public Result get() {
            Result r = new Result();
            try {
                r.setInteger(SupplyNumbers.sendNumbers());
            } catch (Exception e){
                r.setException(e);
            }
            return r;

        }
    };
    CompletableFuture<Result> testFuture = CompletableFuture.supplyAsync(numberSupplier).whenComplete(biConsumer);
}

2

将已检查异常包装成CompletionException

在使用completeExceptionally()时,需要考虑的另一点是,在CompletableFuture中处理异常时,在handle()whenComplete()中会提供确切的异常,但在调用join()或转发到任何下游阶段时,该异常将被包装在CompletionException中。

因此,应用于下游阶段的handle()exceptionally()将看到一个CompletionException而不是原始异常,并且必须查找其原因以找到原始异常。

此外,任何操作(包括supplyAsync())抛出的RuntimeException也会被包装在CompletionException中,除非它已经是一个CompletionException

考虑到这一点,最好采取安全措施,让您的异常处理程序取消包装CompletionException

如果这样做,那么在 CompletableFuture 上设置确切的(已检查的)异常就没有意义了,直接将已检查的异常包装在 CompletionException 中会更简单
Supplier<Integer> numberSupplier = () -> {
    try {
        return SupplyNumbers.sendNumbers();
    } catch (Exception e) {
        throw new CompletionException(e);
    }
};

Holger的方法进行比较,我使用了两种解决方案来改编您的代码(simpleWrap()是上面的方法,customWrap()是Holger的代码):
public class TestCompletableFuture {

    public static void main(String args[]) {
        TestCompletableFuture testF = new TestCompletableFuture();
        System.out.println("Simple wrap");
        testF.handle(testF.simpleWrap());
        System.out.println("Custom wrap");
        testF.handle(testF.customWrap());
    }

    private void handle(CompletableFuture<Integer> future) {
        future.whenComplete((x1, y) -> {
            System.out.println("Before thenApply(): " + y);
        });
        future.thenApply(x -> x).whenComplete((x1, y) -> {
            System.out.println("After thenApply(): " + y);
        });
        try {
            future.join();
        } catch (Exception e) {
            System.out.println("Join threw " + e);
        }
        try {
            future.get();
        } catch (Exception e) {
            System.out.println("Get threw " + e);
        }
    }

    public CompletableFuture<Integer> simpleWrap() {
        Supplier<Integer> numberSupplier = () -> {
            try {
                return SupplyNumbers.sendNumbers();
            } catch (Exception e) {
                throw new CompletionException(e);
            }
        };
        return CompletableFuture.supplyAsync(numberSupplier);
    }

    public CompletableFuture<Integer> customWrap() {
        CompletableFuture<Integer> f = new CompletableFuture<>();
        ForkJoinPool.commonPool().submit(
                (Runnable & CompletableFuture.AsynchronousCompletionTask) () -> {
                    try {
                        f.complete(SupplyNumbers.sendNumbers());
                    } catch (Exception ex) {
                        f.completeExceptionally(ex);
                    }
                });
        return f;
    }
}

class SupplyNumbers {
    public static Integer sendNumbers() throws Exception {
        throw new Exception("test"); // just for working sake its not  correct.
    }
}

输出:

Simple wrap
After thenApply(): java.util.concurrent.CompletionException: java.lang.Exception: test
Before thenApply(): java.util.concurrent.CompletionException: java.lang.Exception: test
Join threw java.util.concurrent.CompletionException: java.lang.Exception: test
Get threw java.util.concurrent.ExecutionException: java.lang.Exception: test
Custom wrap
After thenApply(): java.util.concurrent.CompletionException: java.lang.Exception: test
Before thenApply(): java.lang.Exception: test
Join threw java.util.concurrent.CompletionException: java.lang.Exception: test
Get threw java.util.concurrent.ExecutionException: java.lang.Exception: test

如您所见,唯一的区别在于,在“customWrap()”情况下,whenComplete()会在thenApply()之前看到原始异常。在thenApply()之后以及所有其他情况中,原始异常都被包装。
最令人惊讶的是,在“Simple wrap”情况下,get()将取消包装CompletionException,并用ExecutionException替换它。

在这里,抛出CompletionException包装是关键,这使得一切更接近“Future<?>精神”... - Matthieu

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接