Java并行流关闭线程。

7
我写了一个使用Java流的方法,它简单地遍历对象列表并在满足某些条件时返回true/false。
Java方法:
 boolean method(SampleObj sampleObj) {

   List testList = invokeSomeMethod();
   int result = testList
            .parallelStream()
            .filter(listObj -> (listObj.getAttr() = 1))
            .count(listObj -> listObj.isAttr4());

   return (result > 10);

 }

我还为此编写了一个模拟测试用例。当我执行测试用例时,测试成功,但是我收到项目自定义错误提示,指出所有创建的线程都没有关闭。
我甚至尝试使用带资源的try-with-resources流,但这也没有帮助。
模拟测试:
@Test
public void testSomeMethod() {
    SampleObj sampleObj1 = new SampleObj(10, 20, 30, true);
    SampleObj sampleObj2 = new SampleObj(10, 20, 30, true);
    SampleObj sampleObj3 = new SampleObj(10, 20, 30, false);
    SampleObj sampleObjTest = new SampleObj(10, 20, 30, true);

    List<SampleObj> testList = new ArrayList<SampleObj>();
    testList.add(sampleObj1);
    testList.add(sampleObj2);
    testList.add(sampleObj3);

    when(mockedAttribute.invokeSomeMethod()).thenReturn(nodeList);

    ClassToBeTested classTest = createGenericMockRules();
    Assert.assertTrue(classTest.method(sampleObjTest));
}

附言:我已经进行了调试,确认在调用invokeSomeMethod()时,我的模拟测试列表被返回。
据我所知,Java流内部会关闭它创建的线程。我这样实现是错误的吗?

你在模拟什么?你能展示一下你的测试用例吗? - buræquete
添加了代码片段。 - alwaysAStudent
你尝试过显式关闭你创建的线程吗? - Jay Harris
2
@learningMyWayThru,你能展示一下你实际收到的错误吗? - Eugene
2
  1. 您正在使用原始类型“List”。没有元素类型,无法找到“getAttr()”或“isAttr4()”。
  2. listObj.getAttr() = 1 是一次无效的尝试赋值
  3. Stream.count() 没有参数。为什么您要发布那段代码,当它显然与真实代码没有关系?
- Holger
显示剩余4条评论
1个回答

8
Java流不创建线程,因此不会释放线程。它们在内部使用线程池;虽然未指定但众所周知,这是Fork/Join框架的公共池
使用线程池的整个意图是允许池管理线程,而不是为每个作业创建和释放线程。创建和销毁线程与成本相关,应在后续排队多个作业时避免。特别是如果没有现有线程来接受它,则线程的创建时间会增加作业的执行时间。换句话说,线程比作业活得更久是正常且预期的。它们正在等待可能到达的新作业。 ForkJoinPool类文档说明:

一个静态的 commonPool() 可用于大多数应用程序。通常情况下,未明确提交到指定池中的任何 ForkJoinTask 都将使用公共池。使用公共池通常会减少资源使用(其线程在非使用期间缓慢回收,并在随后的使用中重新启动)。

它没有指定线程必须闲置多长时间才能被回收,除了“缓慢”之外,因此甚至可能因实现而异。对于当前实现,甚至不可能以超时的方式来描述它,因为该池将缩小线程数而不是在超时后终止所有空闲线程,因此剩余线程将再次等待,直到池再次缩小大小,直到没有空闲线程为止。换句话说,池中的线程越多,最后一个线程被回收所需的时间就越长,当所有线程都处于闲置状态时。

您可以通过以下方式强制测试等待所有线程结束:

while(ForkJoinPool.commonPool().getPoolSize()>0)
    LockSupport.parkNanos(1000);

但是这样做会显著增加测试的执行时间,例如在具有八个核心/线程的情况下可能需要一分钟左右。更好的解决方案是重新考虑您的“项目自定义错误”检查,不应该将您的代码视为内部使用的池创建的线程的责任。
否则,在使用异步I/O等时也可能出现类似的错误。

绝对棒极了的答案! - Eugene

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接