使用parallelStream.forEach(..)时,在本地Java代码中出现了空指针异常。

9
我有以下异常(堆栈跟踪):
java.lang.NullPointerException
at sun.reflect.GeneratedConstructorAccessor171.newInstance(Unknown Source) ~[?:?]
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) ~[?:1.8.0_40]
at java.lang.reflect.Constructor.newInstance(Constructor.java:422) ~[?:1.8.0_40]
at java.util.concurrent.ForkJoinTask.getThrowableException(ForkJoinTask.java:598) ~[?:1.8.0_40]
at java.util.concurrent.ForkJoinTask.reportException(ForkJoinTask.java:677) ~[?:1.8.0_40]
at java.util.concurrent.ForkJoinTask.invoke(ForkJoinTask.java:735) ~[?:1.8.0_40]
at java.util.stream.ForEachOps$ForEachOp.evaluateParallel(ForEachOps.java:160) ~[?:1.8.0_40]
at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateParallel(ForEachOps.java:174) ~[?:1.8.0_40]
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:233) ~[?:1.8.0_40]
at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:418) ~[?:1.8.0_40]
at java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:583) ~[?:1.8.0_40]
at com.tradair.tnet.services.trades.TradeService.updateUnrealizedPNL(TradeService.java:173) ~[tnet.jar:5.1.1.0-SNAPSHOT]

下面是我的TradeService类的代码:

    public void updateUnrealizedPNL(Set<Org> orgsToCaluclate, Set<Org> orgsToSendUpdate) {
    orgsToCaluclate.parallelStream().forEach(o -> {
        pnlService.updateMidPrices(o);
        Collection<SystemTradeOrder> allLiveTradesByOrgId = tradesRepository.getAllLiveTradesByOrgId(o.getId());
        updateUnrealizedPNL(o, allLiveTradesByOrgId);
    });

    // more code ....

看起来异常是在运行forEach(..)方法时在java本地代码中抛出的。

我的意思是,NullPointerException并不是从我的代码 - 不是从作为forEach(..)方法参数的我的消费函数中抛出的。

我仔细检查了当此段代码运行时,orgsToCaluclate集合没有被修改。

这是orgsToCaluclate的初始化:

        Set<Org> orgsToCaluclate = getMarginOrgs();
        orgsToCaluclate = orgsToCaluclate.stream()
                .filter(org -> !isOrgInCloseout(org.getId())).collect(Collectors.toSet());

有什么想法吗?..

你尝试过检查集合是否为空、null或其他吗?此外,如果你没有至少6000个条目,请避免使用parallelStreams。 - Creart
是的,这个集合有11个条目,没有空值。 为什么你建议在小规模情况下不要使用parallelStream?我确实怀疑可能与parallelStream有关,但无法确定具体原因。 - theDima
1
它在(普通的,串行的)流中不会失败吗? - RealSkeptic
1
它并不总是会出现问题。也就是说,在我的机器上没有问题,但在QA机器上偶尔会出现,而不是总是出现。因此,将其移动到常规(串行)流中,问题就不会再出现,但我仍然无法在并行流中确定问题所在。 - theDima
3
如果在串行流中从未发生过这种情况,那么这告诉我您的代码中存在线程不安全的问题。这意味着它不会在单线程情况下失败 - 只有在多个线程运行时才会失败。您应该检查您的代码,并确保您使用的数据结构都是线程安全的,或者受到同步或锁的保护。 - RealSkeptic
当你有很多条目时,使用parallelStreams会变得更加有趣,因为它会创建新的线程等。尽管当你只有几个条目时,创建线程和整个过程相当繁重,会降低性能。 - Creart
2个回答

15

我们习惯于说异常的堆栈跟踪反映了“出现异常的位置”,但这是一个不精确的说法。异常的堆栈跟踪通常反映了其实例被创建的位置。

当我们有如下形式的代码时,

1   String s=null;
2   s.length();

当我们尝试对null进行解引用并调用length()方法时,JRE将创建一个NullPointerException实例,因此其堆栈跟踪将报告第2行。

然而,当我们有如下代码时:

1   String s=null;
2   if(s == null) {
3       RuntimeException rt=new NullPointerException();
4       throw rt;
5   }

堆栈跟踪将不会报告出现错误条件的位置(第2行)和异常抛出的位置(第4行),而是报告了实例创建的位置,即第3行。

对于大多数实际情况来说,这些位置足够接近,没有显著差异,但在这里,我们有一个非同寻常的情况。

正如tonakai指出的那样ForkJoinTask将通过反射创建一个已经遇到的异常的新实例,就像我们可以从源代码中看到的那样,当线程不匹配时。

当成功时,它的堆栈跟踪将精确反映新异常实例的创建位置,即在执行反射实例创建的一些生成代码中。当然,这种成功的创建不能区分当JRE由于执行相同代码时发生错误条件时创建异常的情况。

但是,当我们仔细查看源代码时,我们会发现整个反射创建被包含在一个中。

584             try {
…
604             } catch (Exception ignore) {
605             }

代码块。因此,如果操作确实失败了,就不会显示任何异常。相反,代码已经跌倒返回原始异常。这表明反射代码没有失败,而是我们看到通过getThrowableException()反射成功创建的NullPointerException实例被返回,并在稍后由ForkJoinTask故意抛出以报告在处理期间另一个线程中存在NullPointerException

但是,此代码将新异常的cause初始化为指向原始异常。例如,以下代码:

import java.util.stream.IntStream;

public class Main
{
    public static void main(String[] args) {
        Thread main=Thread.currentThread();
        IntStream.range(0, 1000).parallel().forEach(i -> {
            if(Thread.currentThread()!=main)
                throw new NullPointerException();
        });
    }
}

打印

Exception in thread "main" java.lang.NullPointerException
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
    at java.util.concurrent.ForkJoinTask.getThrowableException(ForkJoinTask.java:598)
    at java.util.concurrent.ForkJoinTask.reportException(ForkJoinTask.java:677)
    at java.util.concurrent.ForkJoinTask.invoke(ForkJoinTask.java:735)
    at java.util.stream.ForEachOps$ForEachOp.evaluateParallel(ForEachOps.java:160)
    at java.util.stream.ForEachOps$ForEachOp$OfInt.evaluateParallel(ForEachOps.java:189)
    at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:233)
    at java.util.stream.IntPipeline.forEach(IntPipeline.java:404)
    at java.util.stream.IntPipeline$Head.forEach(IntPipeline.java:560)
    at Main.main(Main.java:7)
Caused by: java.lang.NullPointerException
    at Main.lambda$main$0(Main.java:9)
    at java.util.stream.ForEachOps$ForEachOp$OfInt.accept(ForEachOps.java:205)
    at java.util.stream.Streams$RangeIntSpliterator.forEachRemaining(Streams.java:110)
    at java.util.Spliterator$OfInt.forEachRemaining(Spliterator.java:693)
    at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
    at java.util.stream.ForEachOps$ForEachTask.compute(ForEachOps.java:291)
    at java.util.concurrent.CountedCompleter.exec(CountedCompleter.java:731)
    at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
    at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
    at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
    at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)

所以您仍然能够识别发生了什么。您只需要注意原因。由于您问题中的堆栈跟踪看起来不像典型的Throwable.printStackTrace()输出,可能是产生此输出的代码忽略了异常的cause属性。


作为补充,我们可以使用自定义异常类型来检查如果那个重现确实失败会发生什么:

import java.util.stream.IntStream;

public class Main
{
    public static class CustomException extends RuntimeException {
        public CustomException() {
            System.err.println("will deliberately fail");
            throw new NullPointerException();
        }
        private CustomException(String message) {
            super(message);
        }
    }
    public static void main(String[] args) {
        Thread main=Thread.currentThread();
        IntStream.range(0, 1000).parallel().forEach(i -> {
            if(Thread.currentThread()!=main)
                throw new CustomException("forced failure");
        });
    }
}

将会打印

will deliberately fail
Exception in thread "main" Main$CustomException: forced failure
    at Main.lambda$main$0(Main.java:18)
    at java.util.stream.ForEachOps$ForEachOp$OfInt.accept(ForEachOps.java:205)
    at java.util.stream.Streams$RangeIntSpliterator.forEachRemaining(Streams.java:110)
    at java.util.Spliterator$OfInt.forEachRemaining(Spliterator.java:693)
    at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
    at java.util.stream.ForEachOps$ForEachTask.compute(ForEachOps.java:291)
    at java.util.concurrent.CountedCompleter.exec(CountedCompleter.java:731)
    at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
    at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
    at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
    at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)

通过默认构造函数反射重建时抛出的NullPointerException未被报告,并且直接抛出来自其他线程的原始异常。


非常感谢,这确实是非常有帮助的。 - theDima

4

从堆栈跟踪看,似乎在您的代码中发生了异常,并且正在尝试获取该异常的新实例,但由于NullPointerException而无法执行此操作,您可以检查调用该foreach循环中的任何代码是否抛出任何异常,并确保所有这些异常的构造函数都是正确的。

    at java.lang.reflect.Constructor.newInstance(Constructor.java:422) ~[?:1.8.0_40]
at java.util.concurrent.ForkJoinTask.getThrowableException(ForkJoinTask.java:598) ~[?:1.8.0_40]
at java.util.concurrent.ForkJoinTask.reportException(ForkJoinTask.java:677) ~[?:1.8.0_40]
at java.util.concurrent.ForkJoinTask.invoke(ForkJoinTask.java:735) ~[?:1.8.0_40]

3
不,Tonakai在这里是正确的。 查看堆栈跟踪(或ForkJoinTask的源代码)。 - Stefan Zobel
3
也许您可以提供更多的代码供我们查看,或者您的一些代码不是线程安全的,就像@RealSkeptic所提到的那样。 - tonakai
2
@DimmaLih 是的,你的 forEach() Consumer 中出现了一个异常。这个异常不一定与你的 Set 中的元素或 Set 的修改有关。你没有看到这个异常是因为 a) 你没有捕获和记录它,b) 你的 Consumer 代码在 ForkJoinTask 中运行。当 ForkJoinTask 确定遇到异常时,它会尝试报告该异常。为了提供准确的堆栈跟踪,如果异常不是由当前线程抛出的,则会构造一个相同类型的新异常。正是构造新异常时出现了 NPE。 - Stefan Zobel
4
@DimmaLih 在 ForkJoinTask.getThrowableException 的第 598 行中,出现了 Throwable wx = (Throwable)(noArgCtor.newInstance()) 这段代码失败的情况。noArgCtor 是一个非空的 java.lang.reflect.Constructor,代表你的 forEach() Consumer 代码抛出的异常的一个公共、无参构造函数。以某种方式实例化这个异常的新实例会导致 NPE,这就是你在堆栈跟踪中看到的情况。 - Stefan Zobel
2
@Stefan Zobel:我们确实看到了主要的异常,至少是它的类型。它是一个“NullPointerException”。关键点在于,这个答案是正确的,因为有一个反射重建异常,但它误解了结果。这个重建是成功的。请参见我的答案 - Holger
显示剩余3条评论

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接