以小的内存占用执行数百万个可运行对象

3
我有N个长整型ID,需要为每个ID执行一个Runnable(即我不关心返回值),并等待所有任务完成。每个Runnable可能需要几秒钟到几分钟的时间,并且可以安全地并行运行约100个线程。
在我们目前的解决方案中,我们使用Executors.newFixedThreadPool(),对每个ID调用submit(),然后在每个返回的Future上调用get()。
这段代码工作得很好,而且非常简单,因为我不必处理线程、复杂的等待逻辑等。它的缺点是内存占用。
所有仍在排队的Runnable占用内存(比long需要的8个字节多得多:这些是我的Java类,具有一些内部状态),所有N个Future实例也会占用内存(这些也是带有状态的Java类,我只用于等待但不需要实际结果)。我查看了堆转储,估计10百万时占用了略大于1 GiB的内存。如果将10百万个long存储在数组中,只需要76 MiB的内存。
是否有一种方法可以仅保留内存中的ID来解决此问题,最好不要使用低级并发编程?

你需要生成例如10k个可运行对象并将它们提交给执行器,一旦其中一半完成后再生成另外5k个,以此类推。 - ciamej
有没有一种方法,可以仅提交一个封装了您的Long参数的Runnable,并且让您的Runnable将该长整型转换为具有内部状态的自定义Java类?这将降低可运行队列的占用空间,并通过使它们中没有任何一个保留您的“内部状态”,从而降低Future的占用空间。一句话:将重量级状态的创建推迟到可运行内部。 - GPI
@GPI 这会有所帮助,但空对象的开销为16字节,仍然很大。 - fejesjoco
“我有N个长整型” - 是以文件的形式还是在服务器上? - Alexei Kaigorodov
4个回答

1

是的:您可以拥有一个共享的长整型队列。您可以向执行器提交 n 个Runnable,其中 n 是执行器中线程的数量,在run方法结束时,您从队列中获取下一个长整型,并重新提交一个新的Runnable


不错的开始,但是我仍然需要手动实现错误处理并且等待完成的代码仍然很棘手。 - fejesjoco
@fejesjoco 一点也不。 - Maurice Perry

1

不要创建数百万个可运行对象,而是创建一个特定的线程池,将长时间运行的任务作为其任务。

不要使用Future.get()等待任务完成,而是使用CountdownLatch。

可以像这样实现该线程池:

int N = 1000000;// number of tasks;
int T = 100; // number of threads;
CountdownLatch latch = new CountdownLatch(N);
ArrayBlockingQueue<Long> queue = new ArrayBlockingQueue<>();

for (int k=0; k<N; k++) {
   queue.put(createNumber(k));
}
for (int k=0; k<T; k++) {
  new WorkingThread().start();
}
CountdownLatch.await();

class WorkingThread extends Thread {
  public void run() {
      while (latch.getCount() != 0) {
           processNumber(queue.take());
           latch.countDown();
      }
  }
}

1
我认为你在获取工作单元时存在竞态条件。场景:N=1,T=1000,processNumber需要1分钟。1000个线程将看到latch.getCount()为1,但只会有1个任务可用,然后999个线程将无限期地等待在queue.take()内部。虽然我认为消费者/生产者是解决此问题的好方法。 - GPI
1
我们可以用检查队列是否为空然后退出来替换它。我认为我会采用这个解决方案,但是我会改变一下,仍然使用执行器来处理这些线程的错误。 - fejesjoco
@fejesjoco在评论中提到,堆内的每个“16字节空对象”所占用的10M个ID仍然存在相当大的开销。而在这个解决方案中,你仍然需要在等待队列中拥有这10M个对象... - GPI
@fejesjoco,您需要哪种错误处理方式?是在第一个任务出错时结束整个过程,统计错误数量,打印错误信息还是重试失败的任务? - Alexei Kaigorodov

1

使用ExecutorCompletionService怎么样?类似下面这样(可能包含错误,我没有测试):

import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorCompletionService;
import java.util.function.LongFunction;

public class Foo {

  private final ExecutorCompletionService<Void> completionService;
  private final LongFunction<Runnable> taskCreator;
  private final long maxRunning; // max tasks running or queued

  public Foo(Executor executor, LongFunction<Runnable> taskCreator, long maxRunning) {
    this.completionService = new ExecutorCompletionService<>(executor);
    this.taskCreator = taskCreator;
    this.maxRunning = maxRunning;
  }

  public synchronized void processIds(long[] ids) throws InterruptedException {
    int completed = 0;

    int running = 0;
    for (long id : ids) {
      if (running < maxRunning) {
        completionService.submit(taskCreator.apply(id), null);
        running++;
      } else {
        completionService.take();
        running--;
        completed++;
      }
    }

    while (completed < ids.length) {
      completionService.take();
      completed++;
    }

  }

}

以上的另一种版本可以使用SemaphoreCountDownLatch,而不是CompletionService

public static void processIds(long[] ids, Executor executor,
                              int max, LongFunction<Runnable> taskSup) throws InterruptedException {
  CountDownLatch latch = new CountDownLatch(ids.length);
  Semaphore semaphore = new Semaphore(max);

  for (long id : ids) {
    semaphore.acquire();

    Runnable task = taskSup.apply(id);
    executor.execute(() -> {
      try {
        task.run();
      } finally {
        semaphore.release();
        latch.countDown();
      }
    });

  }

  latch.await();
}

这与普通的ExecutorService解决方案非常相似,只是该类收集Future的缓冲区而不是我们。因此,仅使用计数逻辑即可采用,而无需使用ExecutorCompletionService。但是,这个解决方案的两个部分都很有趣。 - fejesjoco
编辑答案以添加略微修改的选项。它基本上是相同的东西,但消除了阻塞队列(即ExecutorCompletionService)的需要。不确定您是否认为SemaphoreCountDownLatch是“低级”的,但我认为这并不过于复杂。 - Slaw

1
这通常可以使用生产者/消费者模式和一个BlockingQueue来协调,或者如果在项目中有的话,可以使用Akka actors。
但我想建议一些完全不同的东西,依赖于Java的Stream行为。
直觉是利用流的惰性执行来限制工作单元、futures及其结果的创建。
public static void main(String[] args) {
    // So we have a list of ids, I stream it
    // (note : if we have an iterator, you could group it by a batch of, say 100,
    // and then flat map each batch)
    LongStream ids = LongStream.range(0, 10_000_000L);
    // This is were the actual tasks will be dispatched
    ExecutorService executor = Executors.newFixedThreadPool(4);

    // For each id to compute, create a runnable, which I call "WorkUnit"
    Optional<Exception> error = ids.mapToObj(WorkUnit::new)
             // create a parralel stream
             // this allows the stream engine to launch the next instructions concurrently
            .parallel()
            // We dispatch ("parallely") the work units to a thread and have them execute
            .map(workUnit -> CompletableFuture.runAsync(workUnit, executor))
            // And then we wait for the unit of work to complete
            .map(future -> {
                try {
                    future.get();
                } catch (Exception e) {
                    // we do care about exceptions
                    return e;
                } finally {
                    System.out.println("Done with a work unit ");
                }
                // we do not care for the result
                return null;
            })
            // Keep exceptions on the stream
            .filter(Objects::nonNull)
            // Stop as soon as one is found
            .findFirst();


    executor.shutdown();
    System.out.println(error.isPresent());
}

说实话,我不确定这种行为是否被规范保证,但从我的经验来看它是有效的。每个并行“块”都会获取一些ID,并将其馈送到管道中(映射到工作单元,分派到线程池,等待结果,过滤异常),这意味着很快就会达到平衡,将活动工作单元的数量平衡到执行器的数量。
如果需要微调并行“块”的数量,应该在此处进行跟进:Java 8并行流中的自定义线程池

流来拯救!这似乎是最简单的解决方案。吞吐量只有一个问题。我使用100个线程创建执行器,但仍受到流使用的常见ForkJoinPool的限制。在12个CPU上,它只能达到64个线程的并行性。将默认池大小覆盖为100可以解决此问题。请参见https://dev59.com/fF4b5IYBdhLWcg3wiSPV#29272776。 - fejesjoco

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接