Do BlockingQueue methods always throw an InterruptedException when the thread is interrupted?

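In my Java 6 application, one thread feeds data to the main thread while also prefetching more records from a database. It uses an ArrayBlockingQueue as a FIFO buffer, and its main loop is something along these lines: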
while (!Thread.interrupted()) {
    if (source.hasNext()) {
        try {
            queue.put(source.next());
        } catch (InterruptedException e) {
            break;
        }
    } else {
        break;
    }
}

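After the loop terminates, the code does some cleanup, such as poisoning the queue and releasing any resources, but that is pretty much all there is to it.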

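So far there has been no direct communication from the main thread to the feeder thread. The feeder is set up with the appropriate options and then left on its own, using the blocking queue to control the data flow.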

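The problem appears when the main thread needs to shut down the feeder while the queue is full. Since there is no direct control channel, the shutdown method uses the Thread interface to interrupt() the feeder thread. Unfortunately, in most cases the feeder thread remains blocked in put() despite being interrupted. No exception is thrown.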

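From a brief look at the interrupt() documentation and the queue implementation source code, it seems that put() quite often blocks without using any of the interruptible facilities of the JVM. More specifically, on my current JVM (OpenJDK 1.6b22), it blocks in the sun.misc.Unsafe.park() native method. Perhaps it uses a spinlock or something similar, but in any case this seems to fall under the following case: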

If none of the previous conditions hold then this thread's interrupt status will be set.

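The status flag is set, but the thread is still blocked in put(). It never gets to the next iteration, where it could check the flag. The result? A zombie thread that just won't die!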
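For reference, here is a small check one could run on a current JDK. It is a sketch, not something from the original post, and the class and method names are made up. LockSupport.park() is the public wrapper around Unsafe.park(), and it returns when the parked thread is interrupted. It neither throws nor clears the interrupt status. Noticing the status and throwing InterruptedException is left to the caller, for example the lock and condition code that ArrayBlockingQueue.put() is built on.

```java
import java.util.concurrent.locks.LockSupport;

class ParkInterruptDemo {
    /**
     * Parks a helper thread, interrupts it, and reports whether it woke up
     * and still saw its interrupt status set afterwards.
     */
    static boolean parkWakesOnInterrupt() throws InterruptedException {
        final boolean[] statusAfterWake = new boolean[1];
        Thread parker = new Thread(new Runnable() {
            public void run() {
                // park() may also return spuriously, so loop until the status is set.
                while (!Thread.currentThread().isInterrupted()) {
                    LockSupport.park();
                }
                // park() does not throw and does not clear the status.
                statusAfterWake[0] = Thread.currentThread().isInterrupted();
            }
        });
        parker.start();
        parker.interrupt();
        parker.join(5000);
        // isAlive() == false means the thread finished, so its write is visible here.
        return !parker.isAlive() && statusAfterWake[0];
    }
}
```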
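  1. Is my understanding of this issue correct, or am I missing something?

  2. What are the possible ways to fix this? Right now I can only think of two solutions:

    a. Calling poll() on the queue a number of times to unblock the feeder thread: from what I have seen this is ugly and not very reliable, but it mostly works.

    b. Using offer() with a timeout instead of put(), so that the thread can check its interrupt status within an acceptable time frame.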
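Option (b) could look roughly like the sketch below. It assumes the source behaves like a java.util.Iterator that never returns null (BlockingQueue rejects null elements anyway). The class name, method name and timeout value are illustrative.

Because offer() gives up after the timeout, the thread never waits on a full queue longer than that before it re-checks its interrupt status. This only helps if the status is still set when it is checked. An interrupt that some other code has already swallowed is gone.

```java
import java.util.Iterator;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

class OfferFeeder {
    /**
     * Copies items from source into queue and returns how many were queued.
     * offer() gives up after timeoutMs, so the loop re-checks the interrupt
     * status at least that often while the queue stays full.
     */
    static <T> int feed(Iterator<T> source, BlockingQueue<T> queue, long timeoutMs) {
        int queued = 0;
        T pending = null; // fetched from source but not yet accepted by the queue
        while (!Thread.currentThread().isInterrupted()) {
            if (pending == null) {
                if (!source.hasNext()) {
                    break; // source exhausted
                }
                pending = source.next();
            }
            try {
                if (queue.offer(pending, timeoutMs, TimeUnit.MILLISECONDS)) {
                    pending = null;
                    queued++;
                }
                // On timeout, keep the pending item and loop to re-check the status.
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the status for the caller
                break;
            }
        }
        return queued;
    }
}
```

Keeping the fetched item in `pending` means a timed-out offer() does not lose a record that was already pulled from the database.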

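Unless I am missing something, this is a relatively under-documented caveat of the BlockingQueue implementations in Java. There seem to be some hints of it where the documentation suggests poisoning the queue to shut down worker threads and so on, but I cannot find any explicit reference.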
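For completeness, here is a minimal sketch of the poison-pill pattern mentioned above. It assumes a single producer, a single consumer and a hypothetical POISON sentinel object. The pill signals end-of-stream from the producer to the consumer. On its own, it does not release a producer that is stuck in put() on a full queue, which is the situation described here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;

class PoisonPill {
    // Hypothetical sentinel, compared by identity so it can never match real data.
    static final Object POISON = new Object();

    /** Producer side: enqueue every item, then the pill to mark end-of-stream. */
    static void produce(List<?> items, BlockingQueue<Object> queue) throws InterruptedException {
        for (Object item : items) {
            queue.put(item);
        }
        queue.put(POISON);
    }

    /** Consumer side: take items until the pill arrives. */
    static List<Object> consume(BlockingQueue<Object> queue) throws InterruptedException {
        List<Object> received = new ArrayList<Object>();
        for (Object item = queue.take(); item != POISON; item = queue.take()) {
            received.add(item);
        }
        return received;
    }
}
```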

EDIT:

OK, there is a more drastic variation of solution (a) above: ArrayBlockingQueue.clear(). I think this should always work, even if it is not exactly the definition of elegance...
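One way to make the clear() variation deterministic is to pair it with a stop flag that the feeder checks before every put(). The sketch assumes a single feeder thread; with more feeders than free slots, some could block again. The names stopRequested, ClearingShutdown, feed and shutdown are hypothetical.

After shutdown(), the feeder completes at most one more put() and then exits. Keep in mind that clear() throws away whatever data was still buffered.

```java
import java.util.Iterator;
import java.util.concurrent.ArrayBlockingQueue;

class ClearingShutdown {
    // Hypothetical stop flag; volatile so the feeder sees the update promptly.
    private volatile boolean stopRequested = false;
    private final ArrayBlockingQueue<Object> queue;

    ClearingShutdown(ArrayBlockingQueue<Object> queue) {
        this.queue = queue;
    }

    /** Feeder loop: checks the flag before every put(). */
    void feed(Iterator<?> source) throws InterruptedException {
        while (!stopRequested && source.hasNext()) {
            queue.put(source.next());
        }
    }

    /** Main thread: raise the flag, then empty the queue so a blocked put() can complete. */
    void shutdown() {
        stopRequested = true;
        queue.clear(); // wakes a producer waiting for free space
    }
}
```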

2 Answers

I think there are two possible causes for your problem.
  1. As described in The Law of the Sabotaged Doorbell you may not be handling the interrupt correctly. There you will find:

    What should we do when we call code that may cause an InterruptedException? Don't immediately yank out the batteries! Typically there are two answers to that question:

    Rethrow the InterruptedException from your method. This is usually the easiest and best approach. It is used by the new java.util.concurrent.* package, which explains why we are now constantly coming into contact with this exception.
    Catch it, set interrupted status, return. If you are running in a loop that calls code which may cause the exception, you should set the status back to being interrupted.

    For example:

    while (!Thread.currentThread().isInterrupted()) {
        // do something
        try {
            TimeUnit.SECONDS.sleep(1000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            break;
        }
    }
    
  2. Either source.hasNext() or source.next() is consuming and discarding the interrupt status. See Added below for how I solved this problem.
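Cause 2 is easy to reproduce in isolation. In the hypothetical sketch below, swallowingCall() stands in for library code that catches InterruptedException and drops it. restoringCall() follows the "catch it, set interrupted status, return" advice quoted above.

Thread.sleep() clears the status when it throws. So only the second variant leaves an interrupt that the caller's loop can still see. A subsequent ArrayBlockingQueue.put() would also see it and throw immediately.

```java
class InterruptSwallowing {
    /** Mimics a library call that catches InterruptedException and drops it. */
    static void swallowingCall() {
        try {
            Thread.sleep(10);
        } catch (InterruptedException e) {
            // Swallowed: the status was cleared when the exception was thrown.
        }
    }

    /** Same call, but restoring the status as recommended above. */
    static void restoringCall() {
        try {
            Thread.sleep(10);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // put the status back for the caller
        }
    }

    /** Interrupts the current thread, runs the call, and reports whether the interrupt survived. */
    static boolean interruptSurvives(Runnable call) {
        Thread.currentThread().interrupt();
        call.run();
        // Thread.interrupted() reads and clears the status, leaving this thread clean.
        return Thread.interrupted();
    }
}
```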

I believe that interrupting a thread blocked in ArrayBlockingQueue.put() is an effective solution.
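A quick way to check this on your own JVM, as a sketch with made-up names: fill an ArrayBlockingQueue of capacity 1, let a second thread block in put(), interrupt it, and see whether it gets InterruptedException. In the OpenJDK sources, put() waits via lockInterruptibly() and a Condition's await(), both of which respond to interrupts, so the check should return true.

```java
import java.util.concurrent.ArrayBlockingQueue;

class PutInterruptDemo {
    /** True if a thread blocked in put() on a full queue gets InterruptedException when interrupted. */
    static boolean putThrowsOnInterrupt() throws InterruptedException {
        final ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<String>(1);
        queue.put("filler"); // queue is now full, so the next put() blocks
        final boolean[] threw = new boolean[1];
        Thread feeder = new Thread(new Runnable() {
            public void run() {
                try {
                    queue.put("blocked");
                } catch (InterruptedException e) {
                    threw[0] = true;
                }
            }
        });
        feeder.start();
        // Wait until the feeder is actually parked inside put().
        while (feeder.getState() != Thread.State.WAITING) {
            Thread.sleep(1);
        }
        feeder.interrupt();
        feeder.join(5000);
        return !feeder.isAlive() && threw[0];
    }
}
```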

Added

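To deal with problem 2, I used a CloseableBlockingQueue that can be closed from the reading end. Once it is closed, all put calls are short-circuited. You can then check the queue's closed flag from the writer.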

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;

// A blocking queue I can close from the pull end.
// Please only use put because offer does not shortcut on close.
class CloseableBlockingQueue<E> extends ArrayBlockingQueue<E> {
  // Flag indicates closed state.
  private volatile boolean closed = false;
  // All blocked threads. Actually this is all threads that are in the process
  // of invoking a put but if put doesn't block then they disappear pretty fast.
  // NB: Container is O(1) for get and almost O(1) (depending on how busy it is) for put.
  private final Container<Thread> blocked;

  // Limited size.
  public CloseableBlockingQueue(int queueLength) {
    super(queueLength);
    blocked = new Container<Thread>(queueLength);
  }

  /**
   * Shortcut to do nothing if closed.
   *
   * Track blocked threads.
   */
  @Override
  public void put(E e) throws InterruptedException {
    if (!closed) {
      Thread t = Thread.currentThread();
      // Hold my node on the stack so removal can be trivial.
      Container.Node<Thread> n = blocked.add(t);
      try {
        super.put(e);
      } finally {
        // Not blocked anymore.
        blocked.remove(n, t);
      }
    }
  }

  /**
   *
   * Shortcut to do nothing if closed.
   */
  @Override
  public E poll() {
    E it = null;
    // Do nothing when closed.
    if (!closed) {
      it = super.poll();
    }
    return it;
  }

  /**
   *
   * Shortcut to do nothing if closed.
   */
  @Override
  public E poll(long l, TimeUnit tu) throws InterruptedException {
    E it = null;
    // Do nothing when closed.
    if (!closed) {
      it = super.poll(l, tu);
    }
    return it;
  }

  /**
   *
   * isClosed
   */
  boolean isClosed() {
    return closed;
  }

  /**
   *
   * Close down everything.
   */
  void close() {
    // Stop all new queue entries.
    closed = true;
    // Must unblock all blocked threads.

    // Walk all blocked threads and interrupt them.
    for (Thread t : blocked) {
      //log("! Interrupting " + t.toString());
      // Interrupt all of them.
      t.interrupt();
    }
  }

  @Override
  public String toString() {
    return blocked.toString();
  }
}

You will also need the Container, which is lock-free and has O(1) put/get (although it is not strictly a collection). It uses a ring under the hood.

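import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

public class Container<T> implements Iterable<T> {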

  // The capacity of the container.
  final int capacity;
  // The list.
  AtomicReference<Node<T>> head = new AtomicReference<Node<T>>();

  // Constructor
  public Container(int capacity) {
    this.capacity = capacity;
    // Construct the list.
    Node<T> h = new Node<T>();
    Node<T> it = h;
    // One created, now add (capacity - 1) more
    for (int i = 0; i < capacity - 1; i++) {
      // Add it.
      it.next = new Node<T>();
      // Step on to it.
      it = it.next;
    }
    // Make it a ring.
    it.next = h;
    // Install it.
    head.set(h);
  }

  // Empty ... NOT thread safe.
  public void clear() {
    Node<T> it = head.get();
    for (int i = 0; i < capacity; i++) {
      // Trash the element
      it.element = null;
      // Mark it free.
      it.free.set(true);
      it = it.next;
    }
  }

  // Add a new one.
  public Node<T> add(T element) {
    // Get a free node and attach the element.
    return getFree().attach(element);
  }

  // Find the next free element and mark it not free.
  private Node<T> getFree() {
    Node<T> freeNode = head.get();
    int skipped = 0;
    // Stop when we hit the end of the list 
    // ... or we successfully transit a node from free to not-free.
    while (skipped < capacity && !freeNode.free.compareAndSet(true, false)) {
      skipped += 1;
      freeNode = freeNode.next;
    }
    if (skipped < capacity) {
      // Put the head as next.
      // Doesn't matter if it fails. That would just mean someone else was doing the same.
      head.set(freeNode.next);
    } else {
      // We hit the end! No more free nodes.
      throw new IllegalStateException("Capacity exhausted.");
    }
    return freeNode;
  }

  // Mark it free.
  public void remove(Node<T> it, T element) {
    // Remove the element first.
    it.detach(element);
    // Mark it as free.
    if (!it.free.compareAndSet(false, true)) {
      throw new IllegalStateException("Freeing a freed node.");
    }
  }

  // The Node class. It is static so needs the <T> repeated.
  public static class Node<T> {

    // The element in the node.
    private T element;
    // Are we free?
    private AtomicBoolean free = new AtomicBoolean(true);
    // The next reference in whatever list I am in.
    private Node<T> next;

    // Construct a node of the list
    private Node() {
      // Start empty.
      element = null;
    }

    // Attach the element.
    public Node<T> attach(T element) {
      // Sanity check.
      if (this.element == null) {
        this.element = element;
      } else {
        throw new IllegalArgumentException("There is already an element attached.");
      }
      // Useful for chaining.
      return this;
    }

    // Detach the element.
    public Node<T> detach(T element) {
      // Sanity check.
      if (this.element == element) {
        this.element = null;
      } else {
        throw new IllegalArgumentException("Removal of wrong element.");
      }
      // Useful for chaining.
      return this;
    }

    @Override
    public String toString() {
      return element != null ? element.toString() : "null";
    }
  }

  // Provides an iterator across all items in the container.
  public Iterator<T> iterator() {
    return new UsedNodesIterator<T>(this);
  }

  // Iterates across used nodes.
  private static class UsedNodesIterator<T> implements Iterator<T> {
    // Where next to look for the next used node.

    Node<T> it;
    int limit = 0;
    T next = null;

    public UsedNodesIterator(Container<T> c) {
      // Snapshot the head node at this time.
      it = c.head.get();
      limit = c.capacity;
    }

    public boolean hasNext() {
      if (next == null) {
        // Scan to the next non-free node.
        while (limit > 0 && it.free.get() == true) {
          it = it.next;
          // Step down 1.
          limit -= 1;
        }
        if (limit != 0) {
          next = it.element;
        }
      }
      return next != null;
    }

    public T next() {
      T n = null;
      if ( hasNext () ) {
        // Give it to them.
        n = next;
        next = null;
        // Step forward.
        it = it.next;
        limit -= 1;
      } else {
        // Not there!!
        throw new NoSuchElementException ();
      }
      return n;
    }

    public void remove() {
      throw new UnsupportedOperationException("Not supported.");
    }
  }

  @Override
  public String toString() {
    StringBuilder s = new StringBuilder();
    // Separator: empty before the first element, a comma afterwards.
    String sep = "";
    // Keep counts too.
    int usedCount = 0;
    int freeCount = 0;
    // I will iterate the list myself as I want to count free nodes too.
    Node<T> it = head.get();
    int count = 0;
    s.append("[");
    // Scan to the end.
    while (count < capacity) {
      // Is it in-use?
      if (it.free.get() == false) {
        // Grab its element.
        T e = it.element;
        // Is it null?
        if (e != null) {
          // Good element.
          s.append(sep).append(e.toString());
          sep = ",";
          // Count them.
          usedCount += 1;
        } else {
          // Probably became free while I was traversing.
          // Because the element is detached before the entry is marked free.
          freeCount += 1;
        }
      } else {
        // Free one.
        freeCount += 1;
      }
      // Next
      it = it.next;
      count += 1;
    }
    // Decorate with counts "]used+free".
    s.append("]").append(usedCount).append("+").append(freeCount);
    if (usedCount + freeCount != capacity) {
      // Perhaps something was added/freed while we were iterating.
      s.append("?");
    }
    return s.toString();
  }
}
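If a lock-free ring is more than you need, the same close-and-interrupt idea can be sketched with a concurrent set from the standard library. This is a swapped-in, simpler alternative rather than the code above, and the class name is hypothetical. It differs from the code above in three ways:

- It wraps the queue instead of extending it.
- put() returns false to report that the element was dropped after close.
- The writer set comes from Collections.newSetFromMap() over a ConcurrentHashMap, which is available in Java 6.

Writers register before checking the closed flag, so a concurrent close() either sees them or they see the flag.

```java
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;

class SimpleCloseableQueue<E> {
    private final ArrayBlockingQueue<E> queue;
    // Threads currently inside put(); a thread-safe set backed by ConcurrentHashMap.
    private final Set<Thread> writers =
            Collections.newSetFromMap(new ConcurrentHashMap<Thread, Boolean>());
    private volatile boolean closed = false;

    SimpleCloseableQueue(int capacity) {
        queue = new ArrayBlockingQueue<E>(capacity);
    }

    /** Blocks like ArrayBlockingQueue.put(), but returns false (dropping e) once closed. */
    boolean put(E e) throws InterruptedException {
        Thread me = Thread.currentThread();
        // Register first, then check the flag, so a concurrent close() cannot miss us.
        writers.add(me);
        try {
            if (closed) {
                return false;
            }
            queue.put(e);
            return true;
        } finally {
            writers.remove(me);
        }
    }

    E take() throws InterruptedException {
        return queue.take();
    }

    boolean isClosed() {
        return closed;
    }

    /** Refuses further puts and interrupts every writer currently registered. */
    void close() {
        closed = true;
        for (Thread t : writers) {
            // A writer may already be leaving put(); it then just keeps a stray interrupt status.
            t.interrupt();
        }
    }
}
```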

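I had casually dismissed this possibility, since the feeder thread spends far more of its time waiting in put(). However, it does sound plausible. The source object comes from a third-party database-related library. With all of its networking code it must be dealing with InterruptedException, yet its top-level methods never throw it... Sigh, I hate digging into third-party code... - thkala
Oh my... whoever wrote this library swallows every InterruptedException! Every single one! Who writes code like that? - thkala
By the way, setting the thread status back to interrupted would not make any difference for this particular thread: once the loop breaks, it is a straight road to termination. And I am not getting any exception that I would need to handle... - thkala
I have added my CloseableBlockingQueue code. As long as you check its closed status in your feeder thread, everything should work. On close, if the thread is blocked in put it will be interrupted. If it is not blocked, all of its puts will be silently swallowed until you notice that the queue is closed. - OldCurmudgeon
I am upvoting and accepting this answer, since it points out a possibility I had dismissed and provides a catch-all solution in code form :-) I am not completely convinced that the third-party library is the only cause of the problem I am seeing, but I do not have time to investigate further. - thkala
The beauty of this solution is that you can still just start the thread, hand it the queue and let it go. - OldCurmudgeon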

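private AtomicBoolean shutdown = new AtomicBoolean();

void shutdown() { shutdown.set(true); }

while (!shutdown.get()) {
    if (source.hasNext()) {
        Object item = source.next();
        // Retry offer() with a timeout so the shutdown flag is re-checked every 100 ms.
        // offer() can throw InterruptedException, so the enclosing method must handle it.
        while (!shutdown.get() && !queue.offer(item, 100, TimeUnit.MILLISECONDS)) {
            continue;
        }
    } else {
        break;
    }
}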

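The Thread.interrupted() call in my code should not matter: by the time it returns, the thread is already well on its way to termination. - thkala
Could the main thread set some kind of shutdown flag in the feeder thread? You say there is no direct communication channel, but could there be one? I would suggest that controlling a thread's lifecycle solely through interruption is poor design and, as you have seen, it can be defeated by third-party code outside your control. - brettw
If the third-party library is swallowing InterruptedException, then there will be times when the thread still does not notice the interrupt, even after the queue has been drained. I will modify my answer above. - brettw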
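You don't need an AtomicBoolean here, since it is only ever set to true. Making it volatile is sufficient. - OldCurmudgeon
I am upvoting this, since I am using something similar along with a few other measures: when dealing with broken third-party code, there is no such thing as too much insurance... - thkala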

Content provided by Stack Overflow.