如何阻塞直到BlockingQueue为空?

28
我正在寻找一种方法,可以阻塞直到 BlockingQueue 为空。
我知道,在多线程环境中,只要有生产者向 BlockingQueue 中放置项目,就可能出现队列变空后几纳秒之后又被填满的情况。
但是,如果只有一个生产者,则它可能希望等待(并阻塞),直到在停止将项目放入队列后队列为空。
Java/伪代码:
// Producer code
BlockingQueue queue = new BlockingQueue();

while (having some tasks to do) {
    queue.put(task);
}

queue.waitUntilEmpty(); // <-- how to do this?

print("Done");

你有什么想法吗?

编辑:我知道包装BlockingQueue并使用额外的条件会解决问题,我只是想问是否有预制的解决方案和/或更好的替代方案。


显然,您可以调用peek直到它返回null。是什么让这种阻塞成为不可接受的解决方案? - OldCurmudgeon
据我所知,对于你的编辑,答案是否定的。请注意,正如我在我的回答中所写的那样,你的用例非常特殊...确保你真的需要做你所要求的事情。你没有说明为什么你需要这样的行为。 - João Fernandes
@JoãoFernandes:我现在并不是非得需要它,只是出于好奇心。我喜欢阅读关于编程问题的意见。 - gd1
现在这是一个很好的论点 :) 让我们等待更多的答案,看看是否有人知道我们不知道的东西。 - João Fernandes
6个回答

23
使用wait()notify()的简单解决方案:
// Producer:

// `sychronized` is necessary, otherwise `.notify` will not work
synchronized(queue) {
    while (!queue.isEmpty())
        queue.wait(); // wait for the queue to become empty
        // this is not a deadlock, because `.wait` will release the lock
    queue.put();
}

// Consumer:
synchronized(queue) {
    queue.get();
    if (queue.isEmpty())
        queue.notify(); // notify the producer
}

3
不,阅读 Object.wait() 的 Javadoc。它会释放锁直到另一个线程调用 notify。文档甚至明确指出:“当前线程必须拥有此对象的监视器”。 - Timmos
8
这将如何运作?假设消费者先运行,但由于队列中还没有项目,因此在queue.get()上被阻塞。现在生产者无法生产,因为它无法获取锁定。 - hankduan
1
我指的是take,因为在这种情况下poll没有任何效果。 - niculare
@hankduan 或许需要先运行生产者 - undefined
queue.put() should be placed inside the while block? - undefined
显示剩余3条评论

8
我知道你可能已经有很多线程在主动轮询或者从队列中取出任务了,但我还是觉得你的流程/设计不太对。
队列变为空并不意味着之前添加的任务都已经完成,有些项目可能需要很长时间才能处理完毕,所以检查空队列并没有太大用处。
因此你应该忘记 BlockingQueue,可以像使用其他集合一样使用它。将项目转换为 CallableCollections,然后利用 ExecutorService.invokeAll()
    Collection<Item> queue = ...
    Collection<Callable<Result>> tasks = new ArrayList<Callable<Result>>();

    for (Item item : queue) {
        tasks.add(new Callable<Result>() {

            @Override
            public Result call() throws Exception {
                // process the item ...

                return result;
            }
        });
    }

    // look at the results, add timeout for invokeAll if necessary
    List<Future<Result>> results = executorService.invokeAll(tasks);

    // done

这种方法将使您完全控制生产者等待的时间以及适当的异常处理。

4

虽然不完全符合您的要求,但使用 SynchronousQueue 与您的Java / 伪代码具有非常相似的效果,即生产者在某些消费者检索所有数据之前阻塞。

唯一的区别是每次放置时生产者都会阻塞,直到有消费者来检索数据,而不仅仅是在最后一次。不确定这是否会对您的情况造成影响。我期望它只会使生产者执行的任务相对昂贵。


2
您的使用情况应该非常特殊,因为通常您只想在队列已满时阻止生产者,而不是等到队列为空。无论如何,这是可行的。我认为旋转直到isEmpty返回true并不那么低效,因为生产者将局部旋转,即将访问其自己的缓存,而不是访问总线。但是,它会消耗CPU时间,因为线程仍然可以调度。但是本地旋转绝对是更容易的方法。否则,我看到两个选择:
1.像@niculare建议的那样使用wait+ notify 2.以无锁的方式使第一个注意到队列为空的消费者通知生产者;这将更慢,但会更优雅地降级。

1

@niculare给出的答案似乎可以,但请注意@hankduan的评论。

我在寻找解决类似问题的方案时看到了这个问题。
最后,我或多或少地重写了LinkedBlockingQueue
它具有其方法的子集,并且不实现CollectionIterable
它管理多个生产者和消费者。
对我来说,它很有效。

/**********************************************************************************************************************
 * Import specifications
 *********************************************************************************************************************/
import java.util.Date;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.*;

/**********************************************************************************************************************
 * This class implements a completely reentrant FIFO.
 *********************************************************************************************************************/
public class BlockingFIFO<E>
{
  /********************************************************************************************************************
   * The constructor creates an empty FIFO with a capacity of {@link Integer#MAX_VALUE}.
   *******************************************************************************************************************/
  public BlockingFIFO()
  {
    // -----------------
    // Initialize object
    // -----------------
    this(Integer.MAX_VALUE);

  } // constructor

  /********************************************************************************************************************
   * The constructor creates an empty FIFO with the specified capacity.
   *
   * @param capacity_ipar The maximum number of elements the FIFO may contain.
   *******************************************************************************************************************/
  public BlockingFIFO(int capacity_ipar)
  {
    // ---------------------
    // Initialize attributes
    // ---------------------
    lock_attr = new ReentrantLock();
    not_empty_attr = lock_attr.newCondition();
    not_full_attr = lock_attr.newCondition();
    head_attr = null;
    tail_attr = null;
    capacity_attr = capacity_ipar;
    size_attr = 0;

  } // constructor

  /********************************************************************************************************************
   * This method removes all of the elements from the FIFO.
   *
   * @return The number of elements in the FIFO before it was cleared.
   *******************************************************************************************************************/
  public int clear()
  {
    // -----------------
    // Initialize result
    // -----------------
    int result;
    result = 0;

    // ----------
    // Clear FIFO
    // ----------
    lock_attr.lock();
    try
    {
      result = size_attr;
      head_attr = null;
      tail_attr = null;
      size_attr = 0;
      not_full_attr.signalAll();
    }
    finally
    {
      lock_attr.unlock();
    }

    // ----
    // Done
    // ----
    return (result);

  } // clear

  /********************************************************************************************************************
   * This method returns the number of elements in the FIFO.
   *
   * @return The number of elements in the FIFO.
   *******************************************************************************************************************/
  public int size()
  {
    // -----------
    // Return size
    // -----------
    lock_attr.lock();
    try
    {
      return (size_attr);
    }
    finally
    {
      lock_attr.unlock();
    }

  } // size

  /********************************************************************************************************************
   * This method returns the number of additional elements that the FIFO can ideally accept without blocking.
   *
   * @return The remaining capacity the FIFO.
   *******************************************************************************************************************/
  public int remainingCapacity()
  {
    // -------------------------
    // Return remaining capacity
    // -------------------------
    lock_attr.lock();
    try
    {
      return (capacity_attr - size_attr);
    }
    finally
    {
      lock_attr.unlock();
    }

  } // remainingCapacity

  /********************************************************************************************************************
   * This method waits for the FIFO to become empty.
   *
   * @throws InterruptedException Thrown when the current thread got interrupted while waiting for the FIFO to become
   *                              empty.
   *******************************************************************************************************************/
  public void waitEmpty()
    throws InterruptedException
  {
    // -----------------------------
    // Wait for FIFO to become empty
    // -----------------------------
    lock_attr.lock();
    try
    {
      while (size_attr > 0)
        not_full_attr.await();
    }
    finally
    {
      lock_attr.unlock();
    }

  } // waitEmpty

  /********************************************************************************************************************
   * This method waits at most the specified time for the FIFO to become empty.
   * <br>It returns <code>true</code> if the FIFO is empty and <code>false</code> otherwise.
   *
   * @param  timeout_ipar         The maximum number of milliseconds to wait for the FIFO to become empty.
   * @return                      True if and only if the FIFO is empty.
   * @throws InterruptedException Thrown when the current thread got interrupted while waiting for the FIFO to become
   *                              empty.
   *******************************************************************************************************************/
  public boolean waitEmpty(long timeout_ipar)
    throws InterruptedException
  {
    // ------------------
    // Determine deadline
    // ------------------
    Date deadline;
    deadline = new Date(System.currentTimeMillis() + timeout_ipar);

    // -----------------------------
    // Wait for FIFO to become empty
    // -----------------------------
    lock_attr.lock();
    try
    {
      while (size_attr > 0)
      {
        if (!not_full_attr.awaitUntil(deadline))
          return (false);
      }
      return (true);
    }
    finally
    {
      lock_attr.unlock();
    }

  } // waitEmpty

  /********************************************************************************************************************
   * This method waits at most the specified time for the FIFO to become empty.
   * <br>It returns <code>true</code> if the FIFO is empty and <code>false</code> otherwise.
   *
   * @param  timeout_ipar         The maximum time to wait for the FIFO to become empty.
   * @param  unit_ipar            The unit of the specified timeout.
   * @return                      True if and only if the FIFO is empty.
   * @throws InterruptedException Thrown when the current thread got interrupted while waiting for the FIFO to become
   *                              empty.
   *******************************************************************************************************************/
  public boolean waitEmpty(long    timeout_ipar,
                           TimeUnit unit_ipar)
    throws InterruptedException
  {
    // -----------------------------
    // Wait for FIFO to become empty
    // -----------------------------
    return (waitEmpty(unit_ipar.toMillis(timeout_ipar)));

  } // waitEmpty

  /********************************************************************************************************************
   * This method adds the specified element at the end of the FIFO if it is possible to do so immediately without
   * exceeding the queue's capacity.
   * <br>It returns <code>true</code> upon success and <code>false</code> if this queue is full.
   *
   * @param  element_ipar The element to add to the FIFO.
   * @return              True if and only if the element was added to the FIFO.
   *******************************************************************************************************************/
  public boolean offer(E element_ipar)
  {
    // ----------------------
    // Try to add the element
    // ----------------------
    lock_attr.lock();
    try
    {
      if (capacity_attr > size_attr)
      {
        push(element_ipar);
        return (true);
      }
      else
        return (false);
    }
    finally
    {
      lock_attr.unlock();
    }

  } // offer

  /********************************************************************************************************************
   * This method adds the specified element at the end of the FIFO, waiting if necessary up to the specified wait time
   * for space to become available.
   * <br>It returns <code>true</code> upon success and <code>false</code> if this queue is full.
   *
   * @param  element_ipar         The element to add to the FIFO.
   * @param  timeout_ipar         The maximum number of milliseconds to wait for space to become available.
   * @return                      True if and only if the element was added to the FIFO.
   * @throws InterruptedException Thrown when the current thread got interrupted while waiting for space to become
   *                              available.
   *******************************************************************************************************************/
  public boolean offer(E    element_ipar,
                       long timeout_ipar)
    throws InterruptedException
  {
    // ------------------
    // Determine deadline
    // ------------------
    Date deadline;
    deadline = new Date(System.currentTimeMillis() + timeout_ipar);

    // ----------------------
    // Try to add the element
    // ----------------------
    lock_attr.lock();
    try
    {
      while (size_attr == capacity_attr)
      {
        if (!not_full_attr.awaitUntil(deadline))
          return (false);
      }
      push(element_ipar);
      return (true);
    }
    finally
    {
      lock_attr.unlock();
    }

  } // offer

  /********************************************************************************************************************
   * This method adds the specified element at the end of the FIFO, waiting if necessary up to the specified wait time
   * for space to become available.
   * <br>It returns <code>true</code> upon success and <code>false</code> if this queue is full.
   *
   * @param  element_ipar         The element to add to the FIFO.
   * @param  timeout_ipar         The maximum time to wait for space to become available.
   * @param  unit_ipar            The unit of the specified timeout.
   * @return                      True if and only if the element was added to the FIFO.
   * @throws InterruptedException Thrown when the current thread got interrupted while waiting for space to become
   *                              available.
   *******************************************************************************************************************/
  public boolean offer(E        element_ipar,
                       long     timeout_ipar,
                       TimeUnit unit_ipar)
    throws InterruptedException
  {
    // ----------------------------
    // Try to add specified element
    // ----------------------------
    return (offer(element_ipar, unit_ipar.toMillis(timeout_ipar)));

  } // offer

  /********************************************************************************************************************
   * This method adds the specified element at the end of the FIFO, waiting if necessary for space to become available.
   *
   * @throws InterruptedException Thrown when the current thread got interrupted while waiting for space to become
   *                              available.
   *******************************************************************************************************************/
  public void put(E element_ipar)
    throws InterruptedException
  {
    // ----------------------
    // Try to add the element
    // ----------------------
    lock_attr.lock();
    try
    {
      while (size_attr == capacity_attr)
        not_full_attr.await();
      push(element_ipar);
    }
    finally
    {
      lock_attr.unlock();
    }

  } // put

  /********************************************************************************************************************
   * This method retrieves, but does not remove, the head of the FIFO, or returns <code>null</code> if the FIFO is
   * empty.
   *
   * @return The head of the FIFO, or <code>null</code> if the FIFO is empty.
   *******************************************************************************************************************/
  public E peek()
  {
    // --------------------
    // Return first element
    // --------------------
    lock_attr.lock();
    try
    {
      if (size_attr == 0)
        return (null);
      else
        return (head_attr.contents);
    }
    finally
    {
      lock_attr.unlock();
    }

  } // peek

  /********************************************************************************************************************
   * This method retrieves and removes the head of the FIFO, or returns <code>null</code> if the FIFO is
   * empty.
   *
   * @return The head of the FIFO, or <code>null</code> if the FIFO is empty.
   *******************************************************************************************************************/
  public E poll()
  {
    // --------------------
    // Return first element
    // --------------------
    lock_attr.lock();
    try
    {
      if (size_attr == 0)
        return (null);
      else
        return (pop());
    }
    finally
    {
      lock_attr.unlock();
    }

  } // poll

  /********************************************************************************************************************
   * This method retrieves and removes the head of the FIFO, waiting up to the specified wait time if necessary for an
   * element to become available.
   * <br>It returns <code>null</code> if the specified waiting time elapses before an element is available.
   *
   * @param  timeout_ipar         The maximum number of milliseconds to wait for an element to become available.
   * @return                      The head of the FIFO, or <code>null</code> if the specified waiting time elapses
   *                              before an element is available.
   * @throws InterruptedException Thrown when the current thread got interrupted while waiting for an element to become
   *                              available.
   *******************************************************************************************************************/
  public E poll(long timeout_ipar)
    throws InterruptedException
  {
    // ------------------
    // Determine deadline
    // ------------------
    Date deadline;
    deadline = new Date(System.currentTimeMillis() + timeout_ipar);

    // --------------------
    // Return first element
    // --------------------
    lock_attr.lock();
    try
    {
      while (size_attr == 0)
      {
        if (!not_empty_attr.awaitUntil(deadline))
          return (null);
      }
      return (pop());
    }
    finally
    {
      lock_attr.unlock();
    }

  } // poll

  /********************************************************************************************************************
   * This method retrieves and removes the head of the FIFO, waiting up to the specified wait time if necessary for an
   * element to become available.
   * <br>It returns <code>null</code> if the specified waiting time elapses before an element is available.
   *
   * @param  timeout_ipar         The maximum time to wait for an element to become available.
   * @param  unit_ipar            The unit of the specified timeout.
   * @return                      The head of the FIFO, or <code>null</code> if the specified waiting time elapses
   *                              before an element is available.
   * @throws InterruptedException Thrown when the current thread got interrupted while waiting for an element to become
   *                              available.
   *******************************************************************************************************************/
  public E poll(long     timeout_ipar,
                TimeUnit unit_ipar)
    throws InterruptedException
  {
    // ------------------------
    // Try to get first element
    // ------------------------
    return (poll(unit_ipar.toMillis(timeout_ipar)));

  } // poll

  /********************************************************************************************************************
   * This method retrieves and removes the head of the FIFO, waiting if necessary for an element to become available.
   *
   * @return                      The head of the FIFO.
   * @throws InterruptedException Thrown when the current thread got interrupted while waiting for space to become
   *                              available.
   *******************************************************************************************************************/
  public E take()
    throws InterruptedException
  {
    // ---------------------------
    // Try to return first element
    // ---------------------------
    lock_attr.lock();
    try
    {
      while (size_attr == 0)
        not_empty_attr.await();
      return (pop());
    }
    finally
    {
      lock_attr.unlock();
    }

  } // take

  /********************************************************************************************************************
   * This class implements as node within the FIFO.
   *******************************************************************************************************************/
  private class Node
  {
    E    contents;
    Node next;

  } // class Node

  /********************************************************************************************************************
   * This method adds the specified element to the end of the FIFO.
   * <br>It sends a signal to all threads waiting for the FIFO to contain something.
   * <br>The caller should have locked the object and have made sure the list is not full.
   *******************************************************************************************************************/
  private void push(E element_ipar)
  {
    // -----------
    // Create node
    // -----------
    Node node;
    node = new Node();
    node.contents = element_ipar;
    node.next = null;

    // --------------
    // Add to the end
    // --------------
    if (head_attr == null)
      head_attr = node;
    else
      tail_attr.next = node;
    tail_attr = node;

    // ----------------------
    // We got another element
    // ----------------------
    size_attr++;
    not_empty_attr.signalAll();

  } // push

  /********************************************************************************************************************
   * This method removes the first element from the FIFO and returns it.
   * <br>It sends a signal to all threads waiting for the FIFO to have space available.
   * <br>The caller should have locked the object and have made sure the list is not empty.
   *******************************************************************************************************************/
  private E pop()
  {
    // ------------
    // Isolate node
    // ------------
    Node node;
    node = head_attr;
    head_attr = node.next;
    if (head_attr == null)
      tail_attr = null;

    // --------------------------
    // We removed another element
    // --------------------------
    size_attr--;
    not_full_attr.signalAll();

    // ----
    // Done
    // ----
    return (node.contents);

  } // pop

  /********************************************************************************************************************
   * This attribute represents the lock on the FIFO.
   *******************************************************************************************************************/
  private Lock lock_attr;

  /********************************************************************************************************************
   * This attribute represents the condition of the FIFO not being empty.
   *******************************************************************************************************************/
  private Condition not_empty_attr;

  /********************************************************************************************************************
   * This attribute represents the condition of the FIFO not being full.
   *******************************************************************************************************************/
  private Condition not_full_attr;

  /********************************************************************************************************************
   * This attribute represents the first element of the FIFO.
   *******************************************************************************************************************/
  private Node head_attr;

  /********************************************************************************************************************
   * This attribute represents the last element of the FIFO.
   *******************************************************************************************************************/
  private Node tail_attr;

  /********************************************************************************************************************
   * This attribute represents the capacity of the FIFO.
   *******************************************************************************************************************/
  private int capacity_attr;

  /********************************************************************************************************************
   * This attribute represents the size of the FIFO.
   *******************************************************************************************************************/
  private int size_attr;

} // class BlockingFIFO

-1
根据文档,BlockingQueue可以通过方法take()poll(time, unit)而不是poll()在“Remove”时被阻塞。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接