Java并发:用少量线程执行许多“无限”任务

10

我正在构建一个(并发)模拟器,用于一组N个粒子根据牛顿定律在空间中移动。我的想法是将每个粒子建模为一个任务,该任务与其他粒子(任务)交互,以获取它们的位置和质量,以计算其所受的净力。每个粒子任务都是这样的:

while(true){
   force = thisParticle.calculateNetForce(allTheParticles);
   thisParticle.waitForAllTheParticlesToCalculateNetForce(); // synchronization
   thisParticle.updatePosition(force);
   thisParticle.waitForAllTheParticlesToUpdateTheirState(); // synchronization
}

我可以有很多粒子(100个或更多),因此无法创建如此数量的Java线程(这些线程映射到物理线程)。 我的想法是使用 Runtime.getRuntime().availableProcessors()+1 个线程,让许多任务在这些线程上执行。

然而,我不能使用FixedThreadExecutor,因为粒子任务不会结束。我想使用FixedThreadExecutor,它必须能够内部执行某种形式的调度。你知道有什么可以实现这个目的的方法吗?

或者,您能否建议我从并发性的角度来模拟这样一个系统的更好方法(例如,不同的任务分解)?

P.s.:我受限于“经典”的并发机制,不包括演员或类似的体系结构。


1
如果thisParticle.waitForAllTheParticlesToCalculateNetForce()有效地等待某些事情(通过实际等待或CountdownLatch/CyclicBarrier/Phaser等),则运行该方法的线程将返回到池中并可用于其他任务。不确定为什么您的FixedThreadPool方法不起作用。 - assylias
@RobertoCasadei await 是一个非阻塞调用:线程会变为空闲状态,直到所有参与方都调用了 await()。因此,除非在调用 await 时持有锁定,否则其他任务应该能够使用该空闲线程。也许值得发布一些你的代码。 - assylias
@RobertoCasadei,你发布的代码需要至少10个线程(以构造函数参数形式传递)才能正常工作。await将等待/阻塞,直到所有参与方都在等待。在你的情况下,只有两个线程会触发屏障并无限期地等待。一旦所有10个线程调用了await,那么方法的其余部分就会继续执行。 - John Vint
@JohnVint,这正是我想要展示的,即等待屏障不会导致当前任务被挂起并在释放的线程上执行另一个任务。我的问题是,我想要几个物理线程,但有很多长时间运行的任务;具有调度功能的Executor可以胜任此工作。 - metaphori
@RobertoCasadei 我明白你的意思,只是想澄清一下。通过分而治之的方式,使用fork join是Java为您处理这些任务的最佳方式(在我看来)。 - John Vint
显示剩余5条评论
7个回答

5

性能的最大杀手很可能是您执行的线程安全检查,以确保所有粒子以线程安全的方式进行交互。我建议您每个核心使用一个线程,并尽量减少线程之间的交互。这可以通过将空间划分为线程来完成,例如,将空间的一半划分为X轴,一半划分为Y轴,一半划分为Z轴,将空间分成8个部分。您可以同时独立地查看每个空间/线程中的所有交互,并且只有在一个粒子从一个空间/线程传递到另一个空间/线程时才需要担心。


不需要按位置分割粒子,最好是将它们最初分成相等大小的常量集合,每个集合由一个线程处理。事实上,最大的优化是从每个集合中汇总重力,用总质量的一粒子代替每个集合,位于重心处。 - Alexei Kaigorodov
@AlexeiKaigorodov 分裂粒子的原因是如果你想让它们发生碰撞。如果不需要,你可以按照你的建议将它们分开。 - Peter Lawrey
由于验证碰撞通常需要n^2次计算。将空间分成8个部分是一个好主意。然而,本质上你仍然有一个O(n^2)的问题。这个问题对于矩阵方法来说非常不利。 - rdllopes
@rdllopes 这是真的,然而常量要低得多,使它快8 - 80倍。(额外的10倍来自于不需要在访问周围进行锁定,并且在L1缓存中工作而非共享的L3缓存中) - Peter Lawrey
1
@peterlawrey,你的想法(或者使用像BSP这样的东西)特别适用于少量对象(几十个或者最多几百个)。当你有大量对象时,我应该再使用其他一些技巧。 10万个对象的模拟:http://www.youtube.com/watch?v=Qve54Z71VYU - rdllopes
显示剩余2条评论

3
我会假设你将所有粒子存储在一个二维数组中?这是分叉-合并框架的一个很好的应用场景。您可以将数组的计算部分拆分为较小的部分。您不断地进行拆分,直到到达某个特定的大小。最后您计算并返回结果。返回的值然后将与树的另一侧连接并计算。

2
与其为每个粒子创建一个线程,我会创建一个具有适当数量线程的ExecutorService。我将把粒子存储在列表(或其他类型的集合)中。我将为每个粒子的计算和更新步骤创建单独的要执行的工作块(作为Runnable或Callable)。当您向执行器提交工作块时,会返回Future。将这些Futures放入集合中。在您已经提交想要并行运行的所有工作块之后,您可以遍历Future列表,并对每个调用get()以实现同步步骤。
您可能最终会创建一个小POJO来关联一个粒子及其计算出的力(或将计算出的力存储在粒子实例中)。

这是我考虑过的一种方法。然而,从概念上看它似乎不太有效(因为直接将粒子与并发活动(绝非线程)进行映射是相当直接的),而且我注意到它的实现需要创建大量的对象和调用很多方法。 - metaphori
是的,您必须平衡可控线程数量、足够的并发以保持多个核心繁忙、垃圾回收开销等优势。根据我的经验,方法调用次数是性能问题的一个关注点。考虑到您的计算具有n平方的特性,并且“额外”对象的垃圾开销随着粒子数量的增加呈线性增长,这在这种情况下似乎是一个不错的折衷方案。 - Rob

1
为什么不将计算分步进行?
while(true){


for(Particle p : allParticles){
   force = p.calculateNetForce(allParticles);   
   p.setNextPosition(force); //Remembers, but doesn't change the current position
}

for(Particle p : allParticles){
    p.nextState(); //Change the position
}

}

首先计算每个粒子的力,但不改变其当前状态。在计算完每个粒子之后,根据之前的计算更新其内部状态。这样即使只有一个线程也足够了,当然你可以将计算分配到多个线程中,但需要额外的同步。
JAVA 8 更新
使用 Java 8,您可以利用多核系统,而不必担心线程、同步等问题。
 while(true){
       allParticles.parallelStream().forEach(p -> {
           double force = p.calculateNetForce(allParticles);
           p.setNextPosition(force)
       });

       allParticles.parallelStream().forEach(p ->   p.nextState());      
 }

实际上,我的问题源于需要识别可利用的并发性。 - metaphori
然后只需创建N个线程并在它们之间分配工作!将所有的Particle对象放入ConcurrentQueue中,对于每个线程: code while(true){ while(!queue.isEmpty()){ 1.获取一个particle 2.计算力 }3.更新状态//同步或交换队列 }` - Svetlin Zarev
我的想法是使用两个并发队列(CQ)和一个包含所有粒子对象(PO)和N线程的Set。初始时,第一个CQ(CQ1)也包含所有PO。然后每个线程开始从CQ1中取出PO,计算净力,并设置下一个状态(位置),而不修改当前PO的状态,然后将PO推入CQ2。当CQ1变为空时,最后访问它的线程设置一个标志,然后所有线程都从CQ2中取出PO并调用po.nextState(),然后将其推入CQ1。当CQ2变为空时,一切都从头再来... - Svetlin Zarev
@svetlin-zarev,你是对的。主要问题是如何将问题分解为小任务。使用OpenCL、Fork n Join等并不重要,因为实际上你需要一种聪明的方法来分割问题。在这种情况下,代理或者Actor模型是一个可怕的想法。 - rdllopes
自从我回答那个问题以来已经过了很长时间。现在在Java 8中有lambdasstream-API,非常容易利用多核系统。例如,他可以使用allParticles.paralellSream().forEach(p -> p.calculateNetForce(allTheParticles)),这将利用所有CPU,而他不需要手动处理线程、同步等。 - Svetlin Zarev
显示剩余2条评论

1
对于每个粒子,您调用calculateNetForce(allTheParticles),我想这使得您的计算与O(N^2)(所有粒子数量的平方)成比例。这是主要的性能杀手,您最好找到一个复杂度为O(N)的算法,然后再尝试并行化。我可以首先建议计算所有粒子的总质量和重心。然后,对于每个粒子,计算其余粒子的质量和中心。这可以通过将总质量和中心点添加一个带有负质量的“孔”来完成,而不是当前粒子。然后计算粒子与其余粒子之间的力。每个粒子的计算是独立的,并且可以使用其他评论者提出的任何方式进行并行化。

0

粒子本身应该是可运行和可调用的,这将允许您避免创建大量额外的对象并同步不同的步骤。以下是一个SSCCEE:

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Particle implements Callable<Void> {

  private enum ParticleState {
    POSITION_UPDATED, FORCE_CALCULATED
  }

  private int id;
  private int calculatedForce;
  private ParticleState particleState = ParticleState.POSITION_UPDATED;
  private List<Particle> allTheParticles;

  public Particle(int id, List<Particle> allTheParticles) {
    this.id = id;
    this.allTheParticles = allTheParticles;
  }

  private void calculateNetForce() {
    System.out.println("calculation in " + id);
    String someIntenseOperation = "";
    for (int i = 0; i < 10000; i++) {
      someIntenseOperation += allTheParticles.size();
    }
    calculatedForce = 0;
    particleState = ParticleState.FORCE_CALCULATED;
  }

  private void updatePosition() {
    System.out.println("updating position of " + id);
    particleState = ParticleState.POSITION_UPDATED;
  }

  @Override
  public Void call() throws Exception {
    switch (particleState) {
      case FORCE_CALCULATED:
        updatePosition();
        break;
      case POSITION_UPDATED:
        calculateNetForce();
        break;
    }
    return null;
  }

  public static void main(String[] args) throws InterruptedException {
    final ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() + 1);
    final List<Particle> allTheParticles = new ArrayList<>();
    for (int i = 0; i < 20; i++) {
      allTheParticles.add(new Particle(i, allTheParticles));
    }
    while (true) {
      executor.invokeAll(allTheParticles);
      executor.invokeAll(allTheParticles);
    }
  }
}

你如何确保所有粒子在状态上是同步的? - metaphori
executor.invokeAll 会为您完成这个任务。尝试运行这个例子。 - Yurii Shylov

0

由于验证碰撞通常需要n^2次计算,因此划分空间是一个好主意。尽管如此,它本质上是一个O(n^2)问题。

这个问题对于矩阵方法来说非常不利(但是请查看并行计算以了解处理它的最佳方法)您可以使用这里指出的一些技术:模拟许多粒子碰撞的有效方法?

请注意,使用Actor模型可能是一个坏主意,因为线程在某个数量后会有问题。

现在有Java OpenCL库(例如:Aparapi),而Java 9应该会带来原生的OpenCL和Sumatra项目。因此,您可以使用Fork和Join库,JVM将在幕后使用OpenCL。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接