我正在努力寻找最佳方法来实现我的处理管道。
我的生产者向BlockingQueue提供工作。在消费者方面,我轮询队列,在获取到的内容上包装一个Runnable任务,并将其提交给ExecutorService。
while (!isStopping())
{
String work = workQueue.poll(1000L, TimeUnit.MILLISECONDS);
if (work == null)
{
break;
}
executorService.execute(new Worker(work)); // needs to block if no threads!
}
这并不是理想的做法;ExecutorService有它自己的队列,所以实际上我总是完全清空我的工作队列并填充任务队列,随着任务完成而缓慢地清空。
我意识到我可以在生产者端排队任务,但我真的不想这样做——我喜欢我的工作队列是愚蠢的字符串的间接性/隔离性;对于生产者来说,发生了什么事情并不重要。强制生产者排队Runnable或Callable会打破一种抽象,IMHO。
但是我确实希望共享的工作队列表示当前的处理状态。如果消费者没有跟上,我希望能够阻塞生产者。
我很想使用Executors,但我觉得我正在与它们的设计作斗争。我能不能部分地饮用这种“魔药”,还是必须要一口气喝完?我抵制排队任务是错误的吗?(我怀疑我可以设置ThreadPoolExecutor使用一个1任务队列,并覆盖它的执行方法以阻塞而不是在队列满时拒绝,但感觉很恶心)
有什么建议吗?