如何使ThreadPoolExecutor命令在需要处理大量数据时等待?

14

我正在从队列服务器获取数据,需要处理并发送确认。大致如下:

while (true) {
    queueserver.get.data
    ThreadPoolExecutor //send data to thread
    queueserver.acknowledgement 

我不完全了解线程的运作,但是我认为这个程序获取数据后,将其发送到线程中并立即确认接收。因此,即使每个队列仅能有200个未确认的项目,它也会尽可能快地拉取数据。当我在单台服务器上编写程序时,这很好,但如果我使用多个工作者,则会出现问题,因为线程队列中的项目数量不是它已完成的工作的反映,而是从队列服务器获取数据的速度。

是否有任何方法可以使程序在线程队列中满载工作时等待一段时间?

4个回答

27

如果线程池需要处理的数据太多,我该如何让ThreadPoolExecutor等待呢?

你可以使用一个有限制的 BlockingQueue,而不是一个开放式队列:

BlockingQueue<Date> queue = new ArrayBlockingQueue<Date>(200);

关于提交给ExecutorService的作业,可以创建自己的ExecutorService而不是使用使用无界队列的Executors创建的默认ExecutorService:

return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS,
              new ArrayBlockingQueue<Runnable>(200));

当队列填满时,它将拒绝任何新提交的任务。您需要设置一个RejectedExecutionHandler将其提交到队列。例如:

final BlockingQueue queue = new ArrayBlockingQueue<Runnable>(200);
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(nThreads, nThreads,
           0L, TimeUnit.MILLISECONDS, queue);
// by default (unfortunately) the ThreadPoolExecutor will throw an exception
// when you submit the 201st job, to have it block you do:
threadPool.setRejectedExecutionHandler(new RejectedExecutionHandler() {
   public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
      // this will block if the queue is full
      executor.getQueue().put(r);
      // check afterwards and throw if pool shutdown
      if (executor.isShutdown()) {
         throw new RejectedExecutionException(
              "Task " + r + " rejected from " + e);
      }
   }
});

我认为Java没有ThreadPoolExecutor.CallerBlocksPolicy是一个重大的缺失。


谢谢Gray。只是想澄清一下,这是否意味着如果我将200个项目放入队列中,它不会再允许更多的项目进入,直到该队列中的工作减少?如果是这样的话,那么它会在我用于提交到线程的命令处等待,直到有空间发送到阻塞队列中? - Lostsoul
如果您在 BlockingQueue @learningJava 上使用 put(),那么是的,如果队列已满,则会阻塞,直到队列大小减小后才会继续。 - Gray
另一个选项是让RejectedExecutionHandler运行任务。即 r.run(); - Peter Lawrey
1
同意@Peter,虽然我很少使用“CallerRunPolicy”,因为如果你有100个生产者,你将严重超过并发运行线程的数量。 - Gray

3
如果您想在工作线程开始执行任务时得到确认,可以创建一个自定义的ThreadFactory,在实际工作之前从线程发送确认。或者,您可以重写ThreadPoolExecutorbeforeExecute方法。
如果您想在有新的可用工作线程时得到确认,我认为您可以使用SynchronousQueueThreadPoolExecutor.CallerRunsPolicy初始化一个ThreadPoolExecutor,或者使用自己的策略来阻塞调用方。

0
首先,我认为你的态度是错误的,因为你在伪代码中所做的是忙等待,你应该阅读Java教程http://docs.oracle.com/javase/tutorial/essential/concurrency/中的并发教程。
除此之外,我会提供一个使用忙等待的解决方案(这不是推荐的方法):
     ExecutorService e1 =  Executors.newFixedThreadPool(20);
     while (true) {
          if (!serverq.isEmpty() && !myq.isFull()) myq.enq(serverq.poll());
          if (!myq.isEmpty()) e1.execute(myq.poll());
     }

注意:

1.确保您的myq已同步,如其他答案所述。您可以扩展一些阻塞队列以确保同步正确。

2.您需要实现一个可运行的类,该类在服务的迭代中执行您期望从服务器执行的操作,这些可运行的类必须将myq作为构造函数的参数,并将其保存为全局变量。

3.myq获取可运行的类,在其run方法的末尾,您必须确保可运行的类从myq中删除自身


OP的while(true)并没有忙等待。而你的while(true)正在尝试忙等待,却没有解决问题,即执行器的默认队列是无界的,因此它将继续独立地接受新作业,而不管有多少作业已经完成。 - trutheality
myp 应该是被绑定的人,你没有注意到代码和注释中写的内容。 - Ofek Ron

0

怎么样创建一个blockingPool,它最多只执行200个任务,并在提交201个任务之前等待一个任务完成。我在我的应用程序中使用信号量实现了这一点。您还可以通过将值传递给其构造函数来更改限制。

与@Gray的答案唯一的区别是,在这种情况下很少有任务会被拒绝。信号量将使任何201个任务等待,除非其他任务已经完成。尽管如此,我们有拒绝处理程序,在任何拒绝的情况下重新提交该任务到执行器。

private class BlockingPool extends ThreadPoolExecutor {
    private final Semaphore semaphore;      
    public BlockingPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, int tasksAllowedInThreads){    
        super(corePoolSize,maximumPoolSize,keepAliveTime,unit,workQueue,new RejectedExecutionHandler() {
            @Override
            public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                executor.execute(r);
            }
        });
        semaphore = new Semaphore(tasksAllowedInThreads);
    }

    @Override
    public void execute(Runnable task){
        boolean acquired = false;
        do{
            try{
                semaphore.acquire();
                acquired = true;
            } catch (final InterruptedException e){
                // log
            }
        } while (!acquired); // run in loop to handle InterruptedException
        try{
            super.execute(task);
        } catch (final RejectedExecutionException e){
            System.out.println("Task Rejected");
            semaphore.release();
            throw e;
        }
    }    

    @Override
    protected void afterExecute(Runnable r, Throwable t){
        super.afterExecute(r, t);
        if (t != null){
            t.printStackTrace();
        }
        semaphore.release();
    }
}

这有意义吗!


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接