如何正确关闭一个BlockingQueue?

4
我有一个BlockingQueue,它在单个后台线程上处理工作事件。各种线程调用add将一些工作添加到队列中,而单个后台线程调用take获取工作并逐个处理它们。最终可能是停止处理工作的时间,我想确保请求工作的调用者要么获得他们的结果,要么获得null,表示他们的工作未完成,因为BlockingQueue正在关闭。
如何干净地停止接受新工作,我能想到的最好方法是将BlockingQueue字段设置为null,然后在调用add时捕获NullPointerException。在将字段设置为null之前,我将保留指针的本地副本,以便在停止接受工作后将其排空。我认为这会起作用,但似乎有点hacky,有没有适当的方法来做到这一点?
以下是代码的样子:
ArrayBlockingQueue<Command> commandQueue = 
    new ArrayBlockingQueue<Command>(100, true);

public boolean addToQueue(Command command) {
  try {
    return commandQueue.add(command);
  } catch (IllegalStateException e) {
    return false;
  }
}

@Override
public void run() {
  try {
    while (!Thread.currentThread().isInterrupted()) {
      Command command = commandQueue.take();
      // ... work happens here
      // result is sent back to caller
      command.provideResponseData(response);
    }
  } catch (InterruptedException e) {
    // Break out of the loop and stop
  }

  // TODO: stop accepting any new work, drain the queue of existing work 
  // and provide null responses
}

3
阻塞队列的Javadoc说明:“阻塞队列本身不支持任何类型的“关闭”或“停止”操作来表示不再添加任何项。这些特性的需求和使用往往取决于具体的实现。例如,常见的策略是由生产者插入特殊的末尾流或毒药对象,在被消费者取走时进行相应的解释。”我认为毒药对象会是一个不错的解决方案。 - undefined
1
此外,请查看https://dev59.com/Vmox5IYBdhLWcg3wyHPc - undefined
2个回答

1

如前所述,使用ExecutorServiceThreadPool,但是提交Callable而不是简单的Runnable。让您的工作线程观察某个停止信号(可能是对所有线程可见的AtomicBoolean)。如果标志已设置,则使Callable返回一个特殊值以指示未执行任何操作。调用者必须保留submit返回的Future来获取Callable的结果。

也许我应该再详细解释一下。如果您当前正在使用Runnable,可以将其包装在Callable中,并在call中检查停止标志。如果在调用ExecutorService.shutdown之前设置了停止标志,它将正常完成当前作业,但有效地取消所有剩余作业,从而快速清空剩余队列。如果不关闭,甚至可以在重置停止标志后重新使用ExecutorService。

static enum EResult {
    Cancelled, Completed
}

static abstract class MyCallable implements Callable<EResult> {
    Runnable runner;

    public MyCallable( Runnable runner) {
        super();
        this.runner = runner;
    }
}

static AtomicBoolean cancelled = new AtomicBoolean( false);

static void main( String[] argv) {

    Runnable runnable = new Runnable() {
        @Override
        public void run() {
            System.out.println( "Done");
        }
    };

    Callable<EResult> callable = new MyCallable( runnable) {
        @Override
        public EResult call() throws Exception {
            if ( cancelled.get()) {
                return EResult.Cancelled;
            }
            runner.run();
            return EResult.Completed;
        }
    };

    ExecutorService executorService = Executors.newFixedThreadPool( 1);
    // while submitting jobs, change cancelled at some point
    Future<EResult> future = executorService.submit( callable);

    try {
        EResult completeOrNot = future.get();
        System.out.println( "result: " + completeOrNot);
    } catch ( InterruptedException e) {
        e.printStackTrace();
    } catch ( ExecutionException e) {
        e.printStackTrace();
    }
}

0

不要使用BlockingQueue和工作线程的方式,考虑使用单线程ThreadPoolExecutor。就像这样:

private class CommandRunner implements Runnable {
    public CommandRunner(Command command) {
        this.command = command;
    }

    public void run() {
        // ... work happens here
        // result is sent back to caller
        command.provideResponseData(response);
    }
}

private ExecutorService commandExecutor = Executors.newSingleThreadExecutor();

public boolean addToQueue(Command command) {
    commandExecutor.submit(new CommandRunner(command));
}

然后,您的关闭方法可以委托给执行器。

单线程执行器似乎是最佳解决方案,因为我只有一个后台线程。shutdownNow() 函数看起来几乎满足我的需求,奇怪的是它返回等待中的所有任务的 Runnable 对象,我需要将它们强制转换为 FutureTask 并取消它们。 - undefined
1
那就不要使用shutdownNow。不确定为什么你想取消从shutdownNow获取的任何任务,因为很明显这些任务还没有运行。 - undefined

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接