Java 8:Lambda-Streams,通过带有异常的方法进行过滤

209

我在尝试使用Java 8的Lambda表达式时遇到了问题。通常情况下都能正常工作,但现在我有一些方法会抛出IOException异常。请看下面的代码:

class Bank{
    ....
    public Set<String> getActiveAccountNumbers() throws IOException {
        Stream<Account> s =  accounts.values().stream();
        s = s.filter(a -> a.isActive());
        Stream<String> ss = s.map(a -> a.getNumber());
        return ss.collect(Collectors.toSet());
    }
    ....
}

interface Account{
    ....
    boolean isActive() throws IOException;
    String getNumber() throws IOException;
    ....
}

问题在于,它无法编译,因为我必须捕获isActive和getNumber方法可能出现的异常。但是,即使我明确使用try-catch块如下所示,它仍然无法编译,因为我没有捕获异常。因此,要么JDK存在错误,要么我不知道如何捕获这些异常。
class Bank{
    ....
    //Doesn't compile either
    public Set<String> getActiveAccountNumbers() throws IOException {
        try{
            Stream<Account> s =  accounts.values().stream();
            s = s.filter(a -> a.isActive());
            Stream<String> ss = s.map(a -> a.getNumber());
            return ss.collect(Collectors.toSet());
        }catch(IOException ex){
        }
    }
    ....
}

我该如何让它工作?有人能给我指出正确的解决方案吗?


1
相关:https://dev59.com/1mMl5IYBdhLWcg3wsIuA - Vadzim
1
相关内容:https://dev59.com/-o3da4cB1Zd3GeqPzmpI#31638189 - Marko Topolnik
5
简明扼要的回答:在lambda内部捕获异常。 - Brian Goetz
我不确定,但如果“Account”对象在像“Bank”这样的应用程序中是域对象,那么为什么我们要在域方法中抛出“IoException”异常呢?在理想情况下,我们会有一些服务,在那里我们可能会获取账户,然后该域的活动和其他状态已经存在,我们可以只对其进行过滤或操作。 - Kunal Varpe
15个回答

238

在异常逃离lambda之前,必须捕获异常:

s = s.filter(a -> {
    try {
        return a.isActive();
    } catch (IOException e) {
        throw new UncheckedIOException(e);
    }
});

考虑到Lambda表达式的求值并不是在你编写它的位置,而是在JDK类的某个完全无关的位置。因此,在抛出该已检查异常的位置,该异常并没有被声明。
您可以通过使用一个Lambda表达式的包装器来处理它,将已检查异常转换为未检查异常:
public static <T> T uncheckCall(Callable<T> callable) {
    try {
        return callable.call();
    } catch (RuntimeException e) {
        throw e;
    } catch (Exception e) {
        throw new RuntimeException(e);
    }
}

你的例子应该写成:

您的示例将被写成

return s.filter(a -> uncheckCall(a::isActive))
        .map(Account::getNumber)
        .collect(toSet());

在我的项目中,我处理这个问题时不使用包装;相反,我使用一种方法有效地消除了编译器对异常的检查。不用说,这应该小心处理,项目中的每个人都必须知道一个已检查的异常可能出现在未声明的位置。这是管道代码:

public static <T> T uncheckCall(Callable<T> callable) {
    try {
        return callable.call();
    } catch (Exception e) {
        sneakyThrow(e);
        return null; // Unreachable but needed to satisfy compiler
    }
}

public static void uncheckRun(RunnableExc r) {
    try {
        r.run();
    } catch (Exception e) {
        sneakyThrow(e);
    }
}

public interface RunnableExc {
    void run() throws Exception;
}

@SuppressWarnings("unchecked")
private static <T extends Throwable> void sneakyThrow(Throwable t) throws T {
    throw (T) t;
}

如果你使用collect方法,即使该方法没有声明,你也有可能遇到抛出IOException的情况。在大多数实际情况下,你只需要重新抛出异常并将其视为通用失败。在所有这些情况下,清晰度和正确性都不会受到影响。但要注意那些你实际上想要立即处理异常的情况。编译器不会向开发人员报告需要捕获的IOException,如果你尝试捕获它,编译器实际上会抱怨,因为我们欺骗了它,让它相信不可能抛出这样的异常。


5
我曾经见过NettyIO使用“sneaky throw”,它让我想把椅子扔出窗外。“什么?那个受检异常是从哪里泄漏的?”这是我看到的第一个合法的“sneaky throw”用例。作为程序员,你必须警惕地指示可能会出现“sneaky throw”。也许更好的方法是创建另一个支持受检异常的流接口/实现? - kevinarpe
9
任何明智的API都不应该向客户端传递未声明的已检查异常,这是肯定的。在API内部,虽然可以理解检查异常可能会泄漏。只要它们只是一般失败的另一个迹象,并且即将被整体捕获和处理,它们就不会造成伤害。 - Marko Topolnik
5
这正是为什么偷懒的编程方式是个坏主意的原因。篡改编译器的运作规则必然会使未来的维护者感到困惑。 - Thorbjørn Ravn Andersen
36
即使你不喜欢规定,也不代表采取法律行动是一个好主意。你的建议是不负责任的,因为它把代码编写者的方便放在透明度和可维护性等更重要的考虑之上。 - Brian Goetz
46
@brian,仅仅因为某个规则存在并不意味着这是一个好主意。但我很惊讶你把我的回答第二部分称作“建议”,因为我认为我已经非常明显地表明了我提出的解决方案以及我所提供的有关内容仅供读者参考,并且充分注明了免责声明。 - Marko Topolnik
显示剩余17条评论

34

你也可以使用Lambda表达式传播静态疼痛,这样整个过程看起来更易读:

s.filter(a -> propagate(a::isActive))

propagate在这里接收java.util.concurrent.Callable作为参数,并将调用过程中捕获的任何异常转换为RuntimeException。 Guava中还有一个类似的转换方法Throwables#propagate(Throwable)

这种方法似乎对于lambda方法链接非常重要,因此我希望有一天它会被添加到流行库之一中,或者此传播行为将是默认的。

public class PropagateExceptionsSample {
    // a simplified version of Throwables#propagate
    public static RuntimeException runtime(Throwable e) {
        if (e instanceof RuntimeException) {
            return (RuntimeException)e;
        }

        return new RuntimeException(e);
    }

    // this is a new one, n/a in public libs
    // Callable just suits as a functional interface in JDK throwing Exception 
    public static <V> V propagate(Callable<V> callable){
        try {
            return callable.call();
        } catch (Exception e) {
            throw runtime(e);
        }
    }

    public static void main(String[] args) {
        class Account{
            String name;    
            Account(String name) { this.name = name;}

            public boolean isActive() throws IOException {
                return name.startsWith("a");
            }
        }


        List<Account> accounts = new ArrayList<>(Arrays.asList(new Account("andrey"), new Account("angela"), new Account("pamela")));

        Stream<Account> s = accounts.stream();

        s
          .filter(a -> propagate(a::isActive))
          .map(a -> a.name)
          .forEach(System.out::println);
    }
}

此外,如果您选择使用Guava中的propagate方法,则需要注意它已被弃用。请改为使用throwIfUnchecked方法。 - yaromir
1
Guava废弃propagate的原因是充分考虑了:https://github.com/google/guava/wiki/Why-we-deprecated-Throwables.propagate - kane

26

这个UtilException帮助类允许您在Java流中使用任何已检查的异常,例如:

Stream.of("java.lang.Object", "java.lang.Integer", "java.lang.String")
      .map(rethrowFunction(Class::forName))
      .collect(Collectors.toList());

注意:Class::forName 抛出 ClassNotFoundException,这是一个已检查的异常。流本身也会抛出 ClassNotFoundException,而不是一些包装未经检查的异常。

public final class UtilException {

@FunctionalInterface
public interface Consumer_WithExceptions<T, E extends Exception> {
    void accept(T t) throws E;
    }

@FunctionalInterface
public interface BiConsumer_WithExceptions<T, U, E extends Exception> {
    void accept(T t, U u) throws E;
    }

@FunctionalInterface
public interface Function_WithExceptions<T, R, E extends Exception> {
    R apply(T t) throws E;
    }

@FunctionalInterface
public interface Supplier_WithExceptions<T, E extends Exception> {
    T get() throws E;
    }

@FunctionalInterface
public interface Runnable_WithExceptions<E extends Exception> {
    void run() throws E;
    }

/** .forEach(rethrowConsumer(name -> System.out.println(Class.forName(name)))); or .forEach(rethrowConsumer(ClassNameUtil::println)); */
public static <T, E extends Exception> Consumer<T> rethrowConsumer(Consumer_WithExceptions<T, E> consumer) throws E {
    return t -> {
        try { consumer.accept(t); }
        catch (Exception exception) { throwAsUnchecked(exception); }
        };
    }

public static <T, U, E extends Exception> BiConsumer<T, U> rethrowBiConsumer(BiConsumer_WithExceptions<T, U, E> biConsumer) throws E {
    return (t, u) -> {
        try { biConsumer.accept(t, u); }
        catch (Exception exception) { throwAsUnchecked(exception); }
        };
    }

/** .map(rethrowFunction(name -> Class.forName(name))) or .map(rethrowFunction(Class::forName)) */
public static <T, R, E extends Exception> Function<T, R> rethrowFunction(Function_WithExceptions<T, R, E> function) throws E {
    return t -> {
        try { return function.apply(t); }
        catch (Exception exception) { throwAsUnchecked(exception); return null; }
        };
    }

/** rethrowSupplier(() -> new StringJoiner(new String(new byte[]{77, 97, 114, 107}, "UTF-8"))), */
public static <T, E extends Exception> Supplier<T> rethrowSupplier(Supplier_WithExceptions<T, E> function) throws E {
    return () -> {
        try { return function.get(); }
        catch (Exception exception) { throwAsUnchecked(exception); return null; }
        };
    }

/** uncheck(() -> Class.forName("xxx")); */
public static void uncheck(Runnable_WithExceptions t)
    {
    try { t.run(); }
    catch (Exception exception) { throwAsUnchecked(exception); }
    }

/** uncheck(() -> Class.forName("xxx")); */
public static <R, E extends Exception> R uncheck(Supplier_WithExceptions<R, E> supplier)
    {
    try { return supplier.get(); }
    catch (Exception exception) { throwAsUnchecked(exception); return null; }
    }

/** uncheck(Class::forName, "xxx"); */
public static <T, R, E extends Exception> R uncheck(Function_WithExceptions<T, R, E> function, T t) {
    try { return function.apply(t); }
    catch (Exception exception) { throwAsUnchecked(exception); return null; }
    }

@SuppressWarnings ("unchecked")
private static <E extends Throwable> void throwAsUnchecked(Exception exception) throws E { throw (E)exception; }

}

许多其他使用它的示例(在静态导入UtilException之后):
@Test
public void test_Consumer_with_checked_exceptions() throws IllegalAccessException {
    Stream.of("java.lang.Object", "java.lang.Integer", "java.lang.String")
          .forEach(rethrowConsumer(className -> System.out.println(Class.forName(className))));

    Stream.of("java.lang.Object", "java.lang.Integer", "java.lang.String")
          .forEach(rethrowConsumer(System.out::println));
    }

@Test
public void test_Function_with_checked_exceptions() throws ClassNotFoundException {
    List<Class> classes1
          = Stream.of("Object", "Integer", "String")
                  .map(rethrowFunction(className -> Class.forName("java.lang." + className)))
                  .collect(Collectors.toList());

    List<Class> classes2
          = Stream.of("java.lang.Object", "java.lang.Integer", "java.lang.String")
                  .map(rethrowFunction(Class::forName))
                  .collect(Collectors.toList());
    }

@Test
public void test_Supplier_with_checked_exceptions() throws ClassNotFoundException {
    Collector.of(
          rethrowSupplier(() -> new StringJoiner(new String(new byte[]{77, 97, 114, 107}, "UTF-8"))),
          StringJoiner::add, StringJoiner::merge, StringJoiner::toString);
    }

@Test    
public void test_uncheck_exception_thrown_by_method() {
    Class clazz1 = uncheck(() -> Class.forName("java.lang.String"));

    Class clazz2 = uncheck(Class::forName, "java.lang.String");
    }

@Test (expected = ClassNotFoundException.class)
public void test_if_correct_exception_is_still_thrown_by_method() {
    Class clazz3 = uncheck(Class::forName, "INVALID");
    }

但在使用之前,必须了解以下优点、缺点和限制:
• 如果调用代码要处理已检查的异常,则必须将其添加到包含流的方法的throws子句中。编译器不再强制你添加它,所以很容易忘记它。
• 如果调用代码已经处理了已检查的异常,则编译器将提醒你将throws子句添加到包含流的方法声明中(如果不这样做,它会说:Exception is never thrown in body of corresponding try statement)。
• 无论如何,你都无法在包含流的方法内部捕获已检查的异常(如果尝试这样做,编译器将说:Exception is never thrown in body of corresponding try statement)。
• 如果你调用的方法永远不会抛出它声明的异常,则不应该包括throws子句。例如:new String(byteArr, "UTF-8") throws UnsupportedEncodingException,但是根据Java规范,UTF-8始终存在。在这种情况下,throws声明是一个麻烦,并且任何能够以最少的样板代码使其静音的解决方案都受到欢迎。
• 如果你讨厌已检查的异常并认为它们根本不应该被添加到Java语言中(越来越多的人持有这种观点,而我不是其中之一),那么就不要将已检查的异常添加到包含流的方法的throws子句中。然后,已检查的异常将像未检查的异常一样行事。
• 如果你正在实现一个严格的接口,在这个接口中你没有添加throws声明的选项,但是抛出异常是完全适当的,那么仅为了获得抛出它的特权而包装异常会导致带有虚假异常的堆栈跟踪,这些异常对于实际发生的情况没有任何信息贡献。一个很好的例子是Runnable.run(),它不会抛出任何已检查的异常。在这种情况下,你可以决定不将已检查的异常添加到包含流的方法的throws子句中。
• 无论如何,如果你决定不将(或忘记将)已检查的异常添加到包含流的方法的throws子句中,请注意抛出已检查的异常的这两个后果:
1) 调用代码将无法按名称捕获它(如果尝试这样做,编译器将说:Exception is never thrown in body of corresponding try statement)。它将冒泡并可能被某个“catch Exception”或“catch Throwable”在主程序循环中捕获,这也许正是你想要的。
2) 它违反了最小惊奇原则:仅捕获RuntimeException将不再足以保证捕获所有可能的异常。出于这个原因,我认为这不应该在框架代码中完成,而只能在完全由你控制的业务代码中完成。
总之:我认为这里的限制并不严重,可以毫不担心地使用UtilException类。但是,这取决于你!

13
您可以通过包装 lambda 表达式来抛出未检查的异常,然后在终端操作时解除该未检查的异常 ,从而潜在地滚动自己的 Stream 变量。
@FunctionalInterface
public interface ThrowingPredicate<T, X extends Throwable> {
    public boolean test(T t) throws X;
}

@FunctionalInterface
public interface ThrowingFunction<T, R, X extends Throwable> {
    public R apply(T t) throws X;
}

@FunctionalInterface
public interface ThrowingSupplier<R, X extends Throwable> {
    public R get() throws X;
}

public interface ThrowingStream<T, X extends Throwable> {
    public ThrowingStream<T, X> filter(
            ThrowingPredicate<? super T, ? extends X> predicate);

    public <R> ThrowingStream<T, R> map(
            ThrowingFunction<? super T, ? extends R, ? extends X> mapper);

    public <A, R> R collect(Collector<? super T, A, R> collector) throws X;

    // etc
}

class StreamAdapter<T, X extends Throwable> implements ThrowingStream<T, X> {
    private static class AdapterException extends RuntimeException {
        public AdapterException(Throwable cause) {
            super(cause);
        }
    }

    private final Stream<T> delegate;
    private final Class<X> x;

    StreamAdapter(Stream<T> delegate, Class<X> x) {
        this.delegate = delegate;
        this.x = x;
    }

    private <R> R maskException(ThrowingSupplier<R, X> method) {
        try {
            return method.get();
        } catch (Throwable t) {
            if (x.isInstance(t)) {
                throw new AdapterException(t);
            } else {
                throw t;
            }
        }
    }

    @Override
    public ThrowingStream<T, X> filter(ThrowingPredicate<T, X> predicate) {
        return new StreamAdapter<>(
                delegate.filter(t -> maskException(() -> predicate.test(t))), x);
    }

    @Override
    public <R> ThrowingStream<R, X> map(ThrowingFunction<T, R, X> mapper) {
        return new StreamAdapter<>(
                delegate.map(t -> maskException(() -> mapper.apply(t))), x);
    }

    private <R> R unmaskException(Supplier<R> method) throws X {
        try {
            return method.get();
        } catch (AdapterException e) {
            throw x.cast(e.getCause());
        }
    }

    @Override
    public <A, R> R collect(Collector<T, A, R> collector) throws X {
        return unmaskException(() -> delegate.collect(collector));
    }
}

那么,您可以像使用Stream一样使用它:

Stream<Account> s = accounts.values().stream();
ThrowingStream<Account, IOException> ts = new StreamAdapter<>(s, IOException.class);
return ts.filter(Account::isActive).map(Account::getNumber).collect(toSet());

这个解决方案需要相当多的样板代码,所以我建议您查看我已经制作的,该库完全实现了我在这里描述的Stream类的功能(以及更多!)。


你好...小bug?new StreamBridge<>(ts, IOException.class); -> new StreamBridge<>(s, IOException.class); - kevinarpe
1
@kevinarpe 是的。它也应该是 StreamAdapter - Jeffrey

6

TLDR: 尝试通过重构代码来避免问题:将“易出错”的操作与“安全”的操作分开,并在lambda表达式中只使用安全的操作。


详情:

这并不是直接回答问题的方法(有许多其他答案可以),而是试图在第一时间避免问题:

根据我的经验,需要在Stream(或其他lambda表达式)中处理异常的原因通常是因为声明从不应抛出异常的方法抛出了异常。这通常来自于将业务逻辑与输入输出混合在一起。你的Account接口就是一个完美的例子:

interface Account {
    boolean isActive() throws IOException;
    String getNumber() throws IOException;
}

不要在每个getter上抛出IOException,考虑使用以下设计:

interface AccountReader {
    Account readAccount(…) throws IOException;
}

interface Account {
    boolean isActive();
    String getNumber();
}

方法AccountReader.readAccount(…)可以从数据库或文件中读取账户信息,如果读取不成功会抛出异常。它构造了一个已经包含所有值的Account对象,可以直接使用。由于值已经被readAccount(…)加载,因此getter方法不会抛出异常。这样,在Lambda表达式中就可以自由地使用它们,而不需要包装、屏蔽或隐藏异常。
请注意,您仍然需要处理readAccount(…)抛出的异常。毕竟,这就是异常存在的原因。但假设readAccount(…)在“其他地方”使用,即在Lambda表达式之外,在那里您可以使用Java提供的“正常”异常处理机制,即使用try-catch处理它或者使用throws让它“冒泡”。
当然,并不总是可能按照我描述的方式来做,但通常可以,这会带来更清晰的代码(依我之见)。
  • 更好的关注点分离和遵循单一职责原则
  • 更少的样板代码:您不必为了满足编译器而在代码中添加throws IOException
  • 错误处理:您可以在读取文件或数据库时处理错误,而不是仅因为要获取字段值而在业务逻辑中间处理错误
  • 您可以使Account不可变并从中获得优势(例如线程安全)
  • 您不需要使用“卑鄙的技巧”或解决方法来在lambda(例如在Stream中)中使用Account

那么你的意思是在readAccount()中处理数据库或文件异常并将其包装成IOException?但是IOException仍然是受检异常,如果调用者想要构造一个流,它必须被捕获。 - WesternGun
@WesternGun,如果你需要在“readAccount()”中包装异常或者只是让它“冒泡”而不包装,则取决于你自己的情况。你也可以在那里抛出SQLException或其他异常。重要的是,“只有”readAccount()”可能失败,而 Account中的方法则不会。是的,调用readAccount()的调用者需要以某种方式处理异常。但是,在流中“使用”Account`很容易,因为它从不抛出任何异常并且这正是问题的关键。 TLDR: 将“容易出错的”操作与“安全”的操作分开,并在lambda中仅使用安全操作。 - siegi
@WesternGun,请看一下我对答案的最后一次编辑 :-) - siegi

5

使用#propagate()方法。以下为非Guava实现示例,由Sam Beran的Java 8博客提供:

public class Throwables {
    public interface ExceptionWrapper<E> {
        E wrap(Exception e);
    }

    public static <T> T propagate(Callable<T> callable) throws RuntimeException {
        return propagate(callable, RuntimeException::new);
    }

    public static <T, E extends Throwable> T propagate(Callable<T> callable, ExceptionWrapper<E> wrapper) throws E {
        try {
            return callable.call();
        } catch (RuntimeException e) {
            throw e;
        } catch (Exception e) {
            throw wrapper.wrap(e);
        }
    }
}

Java 8博客链接已失效。 - Spycho

4

扩展 @marcg 的解决方案,您可以在流中正常地抛出和捕获已检查的异常;也就是说,编译器会要求您像在流之外一样捕获/重新抛出异常!

@FunctionalInterface
public interface Predicate_WithExceptions<T, E extends Exception> {
    boolean test(T t) throws E;
}

/**
 * .filter(rethrowPredicate(t -> t.isActive()))
 */
public static <T, E extends Exception> Predicate<T> rethrowPredicate(Predicate_WithExceptions<T, E> predicate) throws E {
    return t -> {
        try {
            return predicate.test(t);
        } catch (Exception exception) {
            return throwActualException(exception);
        }
    };
}

@SuppressWarnings("unchecked")
private static <T, E extends Exception> T throwActualException(Exception exception) throws E {
    throw (E) exception;
}

那么,您的示例将如下编写(添加测试以更清晰地显示它):

@Test
public void testPredicate() throws MyTestException {
    List<String> nonEmptyStrings = Stream.of("ciao", "")
            .filter(rethrowPredicate(s -> notEmpty(s)))
            .collect(toList());
    assertEquals(1, nonEmptyStrings.size());
    assertEquals("ciao", nonEmptyStrings.get(0));
}

private class MyTestException extends Exception { }

private boolean notEmpty(String value) throws MyTestException {
    if(value==null) {
        throw new MyTestException();
    }
    return !value.isEmpty();
}

@Test
public void testPredicateRaisingException() throws MyTestException {
    try {
        Stream.of("ciao", null)
                .filter(rethrowPredicate(s -> notEmpty(s)))
                .collect(toList());
        fail();
    } catch (MyTestException e) {
        //OK
    }
}

这个例子无法编译。 - Roman M
嗨@RomanM,感谢您指出这个问题:我已经纠正了“throwActualException”方法中缺少的返回类型。我们正在生产中使用它,所以我希望它在您那边也能正常工作。 - PaoloC

4
以下简单的代码可以使用 StreamTryabacus-common 中解决:
Stream.of(accounts)
      .filter(a -> Try.call(a::isActive))
      .map(a -> Try.call(a::getNumber))
      .toSet();

或者:

Stream.of(accounts)
      .filterE(Account::isActive)
      .mapE(Account::getNumber)
      .toSet();

声明:我是abacus-common的开发者。


2
为了正确添加IOException(到RuntimeException)处理代码,您的方法将如下所示:
Stream<Account> s =  accounts.values().stream();

s = s.filter(a -> { try { return a.isActive(); } 
  catch (IOException e) { throw new RuntimeException(e); }});

Stream<String> ss = s.map(a -> { try { return a.getNumber() }
  catch (IOException e) { throw new RuntimeException(e); }});

return ss.collect(Collectors.toSet());

现在的问题是,必须将IOException捕获为RuntimeException并转换回IOException - 这将为以上方法添加更多代码。
当可以像这样完成操作且该方法抛出IOException时,为什么要使用Stream,因此也不需要额外的代码:
Set<String> set = new HashSet<>();
for(Account a: accounts.values()){
  if(a.isActive()){
     set.add(a.getNumber());
  } 
}
return set;

1
您的例子可以写成:

您的示例可以编写为:

import utils.stream.Unthrow;

class Bank{
   ....
   public Set<String> getActiveAccountNumbers() {
       return accounts.values().stream()
           .filter(a -> Unthrow.wrap(() -> a.isActive()))
           .map(a -> Unthrow.wrap(() -> a.getNumber()))
           .collect(Collectors.toSet());
   }
   ....
}

Unthrow类可以在这里找到https://github.com/SeregaLBN/StreamUnthrower


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接