阻塞队列 - 需要更多信息

5
这个问题与我之前的一个问题有关... 上一篇文章 在那里提到了阻塞性质作为优点。
我尝试编写一些简单的代码来演示阻塞性质,但是我卡住了。我只是尝试创建一个大小为4的BlockingQueue,并尝试添加5个元素,结果遇到了java.lang.IllegalStateException。有人能给我展示一个BlockingQueue阻塞性质的代码示例吗?
public static void main(String[] args) {
    BlockingQueue<String> bq = new LinkedBlockingQueue<String>(4);

    try {
        bq.offer("A");
        bq.offer("B");
        bq.offer("C");
        bq.offer("D");
        bq.offer("E");

        System.out.println("1 = " + bq.take());
        System.out.println("2 = " + bq.take());
        System.out.println("3 = " + bq.take());
        System.out.println("4 = " + bq.take());
        System.out.println("5 = " + bq.take());
        System.out.println("6 = " + bq.take());
    } catch (Exception e) {
        // TODO: handle exception
        e.printStackTrace();
    }
}

我使用了这段代码。在这种情况下,我试图将5个元素放入大小为4的队列中。在这种情况下,应该向队列中添加4个元素(A、B、C、D)。然后我在打印时调用take()方法。当我调用System.out.println("1 = " + bq.take());时,"E"不应该自动插入队列中吗?因为它会得到一个空闲的槽位?
4个回答

11
你是用addoffer还是put添加元素的?我猜你使用了add,因为只有它可以抛出IllegalStateException。但如果你看表格,你会发现如果你需要阻塞的语义,你应该使用put(和take来移除)。 编辑:你的示例有几个问题。首先,我将回答“为什么在第一次调用take()时E没有被插入?”答案是,在调用take()之前,您已经尝试并失败地插入了E。因此,在空间被释放后,没有东西可插入。
现在如果你将offer()更改为put()put("E")永远不会返回。为什么?因为它正在等待其他线程从队列中移除元素。请记住,BlockingQueues是设计用于多个线程访问的。如果您有单线程应用程序,则阻塞是无用的(实际上比无用还要糟糕)。
这是一个改进后的示例:
public static void main(String[] args) {
    final BlockingQueue<String> bq = new LinkedBlockingQueue<String>(4);

    Runnable producer = new Runnable() {
        public void run() {
            try {
                bq.put("A");
                bq.put("B");
                bq.put("C");
                bq.put("D");
                bq.put("E");
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt(); 
            }
        }
    };
    Runnable consumer = new Runnable() {
        public void run() {
            try {
                System.out.println("1 = " + bq.take());
                System.out.println("2 = " + bq.take());
                System.out.println("3 = " + bq.take());
                System.out.println("4 = " + bq.take());
                System.out.println("5 = " + bq.take());
                System.out.println("6 = " + bq.take());
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
            }
        }
    };
    new Thread(producer).start();
    new Thread(consumer).start();
}
现在,put("E") 的调用实际上将会成功,因为它现在可以等待消费者线程从队列中移除"A"。最后一个 take() 仍然会无限期地阻塞,因为没有第六个元素可供移除。

以前我使用的是Add + Poll。但目前我正在尝试使用put + Take。 - Chathuranga Chandrasekara
我应该如何为PUT()指定超时时间? - Chathuranga Chandrasekara
2
Put 操作没有超时限制;如果您需要超时限制,请使用 offer。 - Adam Jaskiewicz

2
mmyers 抢先一步了 :P (+1) 那应该是你需要的,祝你好运!
注意:在您的示例中put()将失败,因为put()将阻塞直到有空间可用。由于没有空间可用,程序永远不会继续执行。
旧答案:
BlockingQueue是一个接口,您必须使用其中一个实现类。
“阻塞性质”仅表示您可以从队列请求某些内容,如果为空,则线程将阻塞(等待),直到向队列添加了内容,然后继续处理。
ArrayBlockingQueue DelayQueue LinkedBlockingQueue PriorityBlockingQueue SynchronousQueue

http://java.sun.com/j2se/1.5.0/docs/api/java/util/concurrent/BlockingQueue.html

//your main collection
LinkedBlockingQueue<Integer> lbq = new LinkedBlockingQueue<Integer>();

//Add your values
lbq.put(100);
lbq.put(200);

//take() will actually remove the first value from the collection, 
//or block if no value exists yet.
//you will either have to interrupt the blocking, 
//or insert something into the queue for the program execution to continue

int currVal = 0;
try {
    currVal = lbq.take();
} catch (InterruptedException e) {
    e.printStackTrace();
}


谢谢。我使用了“LinkedBlockingQueue”。 - Chathuranga Chandrasekara
最好你把它发布出去。只需一个简短的例子 :) - Chathuranga Chandrasekara
1
这样行不通。如果你想要它阻塞,你需要使用 put,如果你想要在无法添加元素时返回,则需要使用 offer。 - Adam Jaskiewicz
此外,不要仅仅捕获InterruptedException并打印堆栈跟踪。 - Adam Jaskiewicz
@robbotic 这样的逻辑就是为什么这个习语会出现在 DevZone 上的文章中。然后,它最终会出现在生产代码中,三年后当人们决定实际关心应用程序不能干净地关闭时(或者他们只是睡眠三秒钟并执行 System.exit(0)),我必须修复它。 - Adam Jaskiewicz
显示剩余6条评论

1
具体回答你的问题:Offer是一个非阻塞的offer调用,因此在像你发布的单线程方法中,对offer('E')的调用只是返回false而不修改完整队列。如果您使用了阻塞put('E')调用,它将休眠直到有空间可用。在你的简单示例中永远如此。您需要有一个单独的线程从队列中读取以创建空间,以便put完成。

将offer替换为put,但失败了。我需要为PUT()使用单独的线程吗? - Chathuranga Chandrasekara
在我的回答中,我说:你需要有一个单独的线程从队列中读取数据,以便为put操作腾出空间。 - lostlogic

0

嗨,更多关于这个类的信息

 /**
 * Inserts the specified element into this queue if it is possible to do
 * so immediately without violating capacity restrictions, returning
 * {@code true} upon success and throwing an
 * {@code IllegalStateException} if no space is currently available.
 * When using a capacity-restricted queue, it is generally preferable to
 * use {@link #offer(Object) offer}.
 *
 * @param e the element to add
 * @return {@code true} (as specified by {@link Collection#add})
 * @throws IllegalStateException if the element cannot be added at this
 *         time due to capacity restrictions
 * @throws ClassCastException if the class of the specified element
 *         prevents it from being added to this queue
 * @throws NullPointerException if the specified element is null
 * @throws IllegalArgumentException if some property of the specified
 *         element prevents it from being added to this queue
 */
boolean add(E e);

/**
 * Inserts the specified element into this queue if it is possible to do
 * so immediately without violating capacity restrictions, returning
 * {@code true} upon success and {@code false} if no space is currently
 * available.  When using a capacity-restricted queue, this method is
 * generally preferable to {@link #add}, which can fail to insert an
 * element only by throwing an exception.
 *
 * @param e the element to add
 * @return {@code true} if the element was added to this queue, else
 *         {@code false}
 * @throws ClassCastException if the class of the specified element
 *         prevents it from being added to this queue
 * @throws NullPointerException if the specified element is null
 * @throws IllegalArgumentException if some property of the specified
 *         element prevents it from being added to this queue
 */
boolean offer(E e);

/**
 * Inserts the specified element into this queue, waiting if necessary
 * for space to become available.
 *
 * @param e the element to add
 * @throws InterruptedException if interrupted while waiting
 * @throws ClassCastException if the class of the specified element
 *         prevents it from being added to this queue
 * @throws NullPointerException if the specified element is null
 * @throws IllegalArgumentException if some property of the specified
 *         element prevents it from being added to this queue
 */
void put(E e) throws InterruptedException;

/**
 * Inserts the specified element into this queue, waiting up to the
 * specified wait time if necessary for space to become available.
 *
 * @param e the element to add
 * @param timeout how long to wait before giving up, in units of
 *        {@code unit}
 * @param unit a {@code TimeUnit} determining how to interpret the
 *        {@code timeout} parameter
 * @return {@code true} if successful, or {@code false} if
 *         the specified waiting time elapses before space is available
 * @throws InterruptedException if interrupted while waiting
 * @throws ClassCastException if the class of the specified element
 *         prevents it from being added to this queue
 * @throws NullPointerException if the specified element is null
 * @throws IllegalArgumentException if some property of the specified
 *         element prevents it from being added to this queue
 */
boolean offer(E e, long timeout, TimeUnit unit)
    throws InterruptedException;

/**
 * Retrieves and removes the head of this queue, waiting if necessary
 * until an element becomes available.
 *
 * @return the head of this queue
 * @throws InterruptedException if interrupted while waiting
 */
E take() throws InterruptedException;

/**
 * Retrieves and removes the head of this queue, waiting up to the
 * specified wait time if necessary for an element to become available.
 *
 * @param timeout how long to wait before giving up, in units of
 *        {@code unit}
 * @param unit a {@code TimeUnit} determining how to interpret the
 *        {@code timeout} parameter
 * @return the head of this queue, or {@code null} if the
 *         specified waiting time elapses before an element is available
 * @throws InterruptedException if interrupted while waiting
 */
E poll(long timeout, TimeUnit unit)
    throws InterruptedException;

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接