线程池线程阻塞问题:当有足够的任务提交时,线程池线程会被阻塞。

8
我有一个需要并行计算许多小任务并按照任务的自然顺序处理结果的流程。为此,我有以下设置:
一个简单的ExecutorService,和一个我将用于保存Callable提交到执行程序时返回的Future对象的阻塞队列:
ExecutorService exec = Executors.newFixedThreadPool(15);
LinkedBlockingQueue<Future<MyTask>> futures = new LinkedBlockingQueue<Future<MyTask>>(15 * 64);

以下是一些调试代码,用于定期计算提交的任务数和处理过的任务数,并将它们写出来(注意,processed 是在任务代码本身结束时递增的):

AtomicLong processed = new AtomicLong(0);
AtomicLong submitted = new AtomicLong(0);

Timer statusTimer = new Timer();
statusTimer.schedule(new TimerTask() {
      @Override
      public void run() {
        l.info("Futures: " + futures.size() + "; Submitted: " + submitted.get() + "; Processed: " + processed.get() + "; Diff: " + (submitted.get() - processed.get())));
      }             
}, 60 * 1000, 60 * 1000);

一个线程从队列中获取任务(其实是一个生成器),将它们提交给执行者,并将结果Future放入futures队列中(这样我就能确保不会提交太多任务而导致内存耗尽):

Thread submitThread = new Thread(() ->
{
    MyTask task;
    try {
        while ((task = taskQueue.poll()) != null) {
            futures.put(exec.submit(task));
            submitted.incrementAndGet();
        }
    } catch (Exception e) {l .error("Unexpected Exception", e);}
}, "SubmitTasks");
submitThread.start();

当前线程使用take方法从futures队列中取出已完成的任务并处理结果:
while (!futures.isEmpty() || submitThread.isAlive()) {
    MyTask task = futures.take().get();
    //process result
}

当我在一个拥有8个核心的服务器上运行此代码(注意,代码当前使用了15个线程),CPU利用率仅达到峰值约为60%。我会看到这样的调试输出:

INFO : Futures: 960; Submitted: 1709710114; Processed: 1709709167; Diff: 947
INFO : Futures: 945; Submitted: 1717159751; Processed: 1717158862; Diff: 889
INFO : Futures: 868; Submitted: 1724597808; Processed: 1724596954; Diff: 853
INFO : Futures: 940; Submitted: 1732030120; Processed: 1732029252; Diff: 871
INFO : Futures: 960; Submitted: 1739538576; Processed: 1739537758; Diff: 818
INFO : Futures: 960; Submitted: 1746965761; Processed: 1746964811; Diff: 950

线程转储显示,许多线程池线程像这样阻塞:

"pool-1-thread-14" #30 prio=5 os_prio=0 tid=0x00007f25c802c800 nid=0x10b2 waiting on condition [0x00007f26151d5000]
   java.lang.Thread.State: WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for  <0x00007f2fbb0001b0> (a java.util.concurrent.locks.ReentrantLock$NonfairSync)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireInterruptibly(AbstractQueuedSynchronizer.java:897)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireInterruptibly(AbstractQueuedSynchronizer.java:1222)
        at java.util.concurrent.locks.ReentrantLock.lockInterruptibly(ReentrantLock.java:335)
        at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:439)
        at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1067)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1127)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)

我的理解是,任何时候我都至少有几百个任务已经提交给执行器服务,但没有被处理(通过堆栈跟踪我也可以确认SubmitTasks线程在LinkedBlockingQueue.put上被阻塞)。然而,堆栈跟踪(以及服务器利用率统计)显示执行器服务在LinkedBlockingQueue.take上被阻塞(我假设这是内部任务队列为空的情况)。

我错在哪里了?

2个回答

1
2.5年后,我看到这个问题已经得到了一些浏览量,决定提供一个跟进。经过多次更改和测试,我最终将任务分组为每组10000个(也就是说,每个Future负责一组10000个MyTask任务,而不仅仅是1个)。这样,ExecutorService每秒执行大约10-20个任务(而不是我“要求”它执行的相当高的100000-200000个任务)。这种方法显著提高了速度,并导致CPU利用率达到了100%。事后看来,执行超过100k个任务每秒似乎“不合理”。我的理解是,太多的时间花费在并发管理/锁定开销和上下文切换上(这是一个猜测)。

0
涉及BlockingQueue的线程总是棘手的。仅仅通过查看您的代码而没有运行与您所做的规模,我有一些建议。像Jessica Kerr这样的行业专家建议您永远不要永久阻塞。您可以使用带有超时的方法在LinkedBlockingQueue中进行操作。
Thread submitThread = new Thread(() ->
{
    MyTask task;
    try {
        while ((task = taskQueue.peek()) != null) {
            boolean success = futures.offer(exec.submit(task), 1000, TimeUnit.MILLISECONDS);
            if(success) {
                submitted.incrementAndGet();
                taskQueue.remove(task);
            }
        }
    } catch (Exception e) {l .error("Unexpected Exception", e);}
}, "SubmitTasks");
submitThread.start();

还有这里。

while (!futures.isEmpty() || submitThread.isAlive()) {
    Future<MyTask> f = futures.poll(1000, TimeUnit.MILLISECONDS);
    if(f != null) {
        MyTask task = f.get();
    }
    //process result
}

观看Jessica Kerr关于JVM并发工具的视频


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接