如何中断在take()操作上阻塞的BlockingQueue?

98

我有一个类,它从BlockingQueue获取对象并通过连续调用take()方法处理它们。在某个时刻,我知道不会再往队列中添加更多的对象了。那么,如何中断take()方法以停止阻塞呢?

这是处理对象的类:

public class MyObjHandler implements Runnable {

  private final BlockingQueue<MyObj> queue;

  public class MyObjHandler(BlockingQueue queue) {
    this.queue = queue;
  }

  public void run() {
    try {
      while (true) {
        MyObj obj = queue.take();
        // process obj here
        // ...
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }
}

这里是使用这个类来处理对象的方法:

public void testHandler() {

  BlockingQueue<MyObj> queue = new ArrayBlockingQueue<MyObj>(100);  

  MyObjectHandler  handler = new MyObjectHandler(queue);
  new Thread(handler).start();

  // get objects for handler to process
  for (Iterator<MyObj> i = getMyObjIterator(); i.hasNext(); ) {
    queue.put(i.next());
  }

  // what code should go here to tell the handler
  // to stop waiting for more objects?
}
5个回答

85

如果不能中断线程,则另一种方法是将“标记”或“命令”对象放置在队列上,MyObjHandler会将其识别为此类对象并跳出循环。


46
这也被称为“毒丸关闭”方法,详细讨论在《Java并发编程实践》的第155-156页。 - Brandon Yarbrough
1
如果有多个消费者,这种方法是否可行? - Rajat Aggarwal

16
BlockingQueue<MyObj> queue = new ArrayBlockingQueue<MyObj>(100);
MyObjectHandler handler = new MyObjectHandler(queue);
Thread thread = new Thread(handler);
thread.start();
for (Iterator<MyObj> i = getMyObjIterator(); i.hasNext(); ) {
  queue.put(i.next());
}
thread.interrupt();

然而,如果你这样做,当队列中仍然有等待被处理的项目时,线程可能会被中断。 你可以考虑使用 poll 而不是 take,它将允许处理线程在等待一段时间没有新输入后超时终止。


是的,如果线程在队列中仍有项目时被中断,那么这将是一个问题。为了解决这个问题,我添加了代码以确保在线程中断之前队列为空:<code>while (queue.size()>0) Thread.currentThread().sleep(5000);</code> - MCS
4
注意:@MCS - 提供的方法是一个技巧,不应在生产代码中复制,原因有三。首选总是找到实际的方法来挂钩关闭。使用 Thread.sleep() 代替正确的挂钩永远是不可接受的。在其他实现中,其他线程可能会将东西放入队列中,而 while 循环可能永远不会结束。 - Erick Robertson
幸运的是,不需要依赖这样的技巧,因为如果必要的话,可以在中断后很容易地处理它。例如,“详尽”的take()实现可能如下所示:try { return take(); } catch (InterruptedException e) { E o = poll(); if (o == null) throw e; Thread.currentThread().interrupt(); return o; }然而,在这个层面上实现它没有必要,稍微高一点的实现将会导致更有效率的代码(例如通过避免每个元素的InterruptedException和/或使用BlockingQueue.drainTo())。 - antak

15

虽然有些晚了,但希望这也可以帮助其他人,因为我遇到了类似的问题,并使用了erickson上面建议的poll方法,并进行了一些小的更改。

class MyObjHandler implements Runnable 
{
    private final BlockingQueue<MyObj> queue;
    public volatile boolean Finished;  //VOLATILE GUARANTEES UPDATED VALUE VISIBLE TO ALL
    public MyObjHandler(BlockingQueue queue) 
    {
        this.queue = queue;
        Finished = false;
    }
    @Override
    public void run() 
    {        
        while (true) 
        {
            try 
            {
                MyObj obj = queue.poll(100, TimeUnit.MILLISECONDS);
                if(obj!= null)//Checking if job is to be processed then processing it first and then checking for return
                {
                    // process obj here
                    // ...
                }
                if(Finished && queue.isEmpty())
                    return;

            } 
            catch (InterruptedException e) 
            {                   
                return;
            }
        }
    }
}

public void testHandler() 
{
    BlockingQueue<MyObj> queue = new ArrayBlockingQueue<MyObj>(100); 

    MyObjHandler  handler = new MyObjHandler(queue);
    new Thread(handler).start();

    // get objects for handler to process
    for (Iterator<MyObj> i = getMyObjIterator(); i.hasNext(); )
    {
        queue.put(i.next());
    }

    // what code should go here to tell the handler to stop waiting for more objects?
    handler.Finished = true; //THIS TELLS HIM
    //If you need you can wait for the termination otherwise remove join
    myThread.join();
}

这解决了两个问题:

  1. 标记了BlockingQueue,使其知道无需再等待元素
  2. 没有在处理块之间中断,因此只有当队列中所有项目都被处理并且没有剩余项目需要添加时,处理块才会终止。

2
Finished变量设置为volatile以确保线程之间的可见性。请参阅https://dev59.com/C3VD5IYBdhLWcg3wDXF3#106787。 - lukk
2
如果我没记错的话,队列中的最后一个元素不会被处理。当你从队列中取出最后一个元素时,finished为true并且队列为空,因此在处理完最后一个元素之前就会返回。添加第三个条件 if (Finished && queue.isEmpty() && obj == null) - Matt R
@MattR 谢谢,你的建议很正确。我会编辑已发布的答案。 - Deepak Bhatia

4

中断线程:

thread.interrupt()

1

或者不要打断,这很讨厌。

    public class MyQueue<T> extends ArrayBlockingQueue<T> {

        private static final long serialVersionUID = 1L;
        private boolean done = false;

        public ParserQueue(int capacity) {  super(capacity); }

        public void done() { done = true; }

        public boolean isDone() { return done; }

        /**
         * May return null if producer ends the production after consumer 
         * has entered the element-await state.
         */
        public T take() throws InterruptedException {
            T el;
            while ((el = super.poll()) == null && !done) {
                synchronized (this) {
                    wait();
                }
            }

            return el;
        }
    }
  1. 当生产者将对象放入队列时,调用 queue.notify(),如果结束了,则调用 queue.done()
  2. 循环直到 (!queue.isDone() || !queue.isEmpty())
  3. 测试 take() 返回值是否为 null

1
我认为之前的解决方案比这个更简洁和简单。 - sakthisundar
1
完成标志和毒药丸一样,只是实施方式不同 :) - David Mann
2
更简洁?我怀疑。你不知道线程被中断的确切时间。或者让我问一下,这有什么更简洁的呢?它代码更少,这是事实。 - tomasb
如果您打算从多个线程使用该类,则可能希望将done布尔值设置为volatile - Robert

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接