生产者/消费者工作队列

11

我正在努力寻找最佳方法来实现我的处理管道。

我的生产者向BlockingQueue提供工作。在消费者方面,我轮询队列,在获取到的内容上包装一个Runnable任务,并将其提交给ExecutorService。

while (!isStopping())
{
    String work = workQueue.poll(1000L, TimeUnit.MILLISECONDS);
    if (work == null)
    {
        break;
    }
    executorService.execute(new Worker(work));   // needs to block if no threads!
}

这并不是理想的做法;ExecutorService有它自己的队列,所以实际上我总是完全清空我的工作队列并填充任务队列,随着任务完成而缓慢地清空。

我意识到我可以在生产者端排队任务,但我真的不想这样做——我喜欢我的工作队列是愚蠢的字符串的间接性/隔离性;对于生产者来说,发生了什么事情并不重要。强制生产者排队Runnable或Callable会打破一种抽象,IMHO。

但是我确实希望共享的工作队列表示当前的处理状态。如果消费者没有跟上,我希望能够阻塞生产者。

我很想使用Executors,但我觉得我正在与它们的设计作斗争。我能不能部分地饮用这种“魔药”,还是必须要一口气喝完?我抵制排队任务是错误的吗?(我怀疑我可以设置ThreadPoolExecutor使用一个1任务队列,并覆盖它的执行方法以阻塞而不是在队列满时拒绝,但感觉很恶心)

有什么建议吗?


完全同意你的想法。感谢你把它写下来。有趣的是,没有一个库可以做到这一点。 - AlikElzin-kilaka
3个回答

22
我希望共享工作队列表示当前的处理状态。
尝试使用共享的 BlockingQueue 并拥有一个 Worker 线程池从队列中获取工作项。
如果消费者无法跟上,我希望能够阻止生产者。 ArrayBlockingQueueLinkedBlockingQueue 都支持有界队列,当队列满时会在 put 上阻塞。使用阻塞的 put() 方法可以确保生产者在队列已满时被阻塞。
这是一个初步的开始。您可以调整工作线程数和队列大小:
public class WorkerTest<T> {

    private final BlockingQueue<T> workQueue;
    private final ExecutorService service;

    public WorkerTest(int numWorkers, int workQueueSize) {
        workQueue = new LinkedBlockingQueue<T>(workQueueSize);
        service = Executors.newFixedThreadPool(numWorkers);

        for (int i=0; i < numWorkers; i++) {
            service.submit(new Worker<T>(workQueue));
        }
    }

    public void produce(T item) {
        try {
            workQueue.put(item);
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
        }
    }


    private static class Worker<T> implements Runnable {
        private final BlockingQueue<T> workQueue;

        public Worker(BlockingQueue<T> workQueue) {
            this.workQueue = workQueue;
        }

        @Override
        public void run() {
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    T item = workQueue.take();
                    // Process item
                } catch (InterruptedException ex) {
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        }
    }
}

1
谢谢;我的以前实现很像这个,尽管它只使用了一个ThreadFactory——一旦你将它减少到一组固定的线程,所有线程都尝试排空工作队列,再也没有使用ExecutorService 的必要性了。我切换到ExecutorService 是为了利用更可调整的线程池,并具有“查找现有可用工作线程(如果存在),必要时创建并在空闲时杀死它们”的语义。 - Jolly Roger
Executors.newCachedThreadPool()会执行类似的操作。您还可以在ThreadPoolExecutor本身上真正调整池策略。您想要什么? - Kevin
1
这就是我的想法...如果我愿意使用它的任务工作队列,它可以被调整得完全符合我的喜好。但我真正想要的是从执行器中剥离出线程池智能,并实现自己的线程池客户端,但它并没有为此做好准备。 - Jolly Roger

1
"如果存在可用的工作线程,则查找一个,如果需要则创建一个,如果它们处于空闲状态则杀死它们。"
管理所有这些工作状态既不必要又危险。我会创建一个监视器线程,在后台不断运行,其唯一任务是填充队列并生成消费者...为什么不将工作线程设置为守护进程,以便它们在完成后立即死亡?如果将它们全部附加到一个ThreadGroup中,则可以动态调整池的大小...例如:"
  **for(int i=0; i<queue.size()&&ThreadGroup.activeCount()<UPPER_LIMIT;i++ { 
         spawnDaemonWorkers(queue.poll());
  }**

0
你可以让消费者直接执行Runnable::run而不是启动一个新线程。再使用一个有最大大小的阻塞队列,我认为你将得到想要的结果。你的消费者变成了一个工作者,在基于队列上的工作项中执行任务。他们只会按照处理速度出队元素,因此当消费者停止消费时,生产者也会停止生产。

这只会提供一个工作线程,我想调整它以最大化给定系统配置的处理能力。处理作业将持续从几分之一秒到十秒钟不等,并且是I/O绑定和CPU绑定工作的混合。 - Jolly Roger
我假设有多个与一个或多个生产者相关联的消费者。如果您只有一个单独的消费者,那么为什么不让生产者直接将工作转储到执行器中呢? - D.Shawley

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接