Java中的阻塞队列的异步等效物是什么?

6
我正在寻找一个队列,它应该是java.util.concurrent.BlockingQueue的异步(非阻塞)等价物。其接口应包括以下内容:
public interface AsynchronousBlockingQueue<E> {
    // - if the queue is empty, return a new CompletableFuture,
    //   that will be completed next time `add` is called
    // - if the queue is not empty, return a completed CompletableFuture,
         containing the first element of the list
    public CompletableFuture<E> poll();

    // if polling is in progress, complete the ongoing polling CompletableFuture.
    // otherwise, add the element to the queue
    public synchronized void add(E element);
}

如果有影响的话,应该只有一个轮询线程,并且轮询应该按顺序进行(在进行轮询时不会调用poll)。
我原以为在JVM中已经存在这个功能,但我找不到它,当然我更愿意使用JVM中的东西而不是自己编写。
另一个限制是,我被困在Java 8中(尽管我肯定很想知道在更近期的版本中存在什么)。

那个怎么样?这里 - Vault23
2
可能与 有关。有趣的问题。 - Naman
@Naman 是的,那些是我看到的...但它们并没有真正回答问题 :) - yannick1976
1
@Vault23 ConcurrentLinkedQueue并没有任何异步(或阻塞)的操作,它只是线程安全的。 - yannick1976
1个回答

3

最终我写了自己的类... 欢迎评论 :)

(这段文字涉及IT技术领域,意思是作者自己编写了一个类并且希望得到别人的反馈和建议。)
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;

public class AsynchronousBlockingQueue<E> {
    CompletableFuture<E> incompletePolling = null;
    Queue<E> elementsQueue = new LinkedList<>();

    // if the queue is empty, return a new CompletableFuture, that will be completed next time `add` is called
    // if the queue is not empty, return a completed CompletableFuture containing the first element of the list
    public synchronized CompletableFuture<E> poll() {
        // polling must be done sequentially, so this shouldn't be called if there is a poll ongoing.
        if (incompletePolling != null)
            throw new IllegalStateException("Polling is already ongoing");
        if (elementsQueue.isEmpty()) {
            incompletePolling = new CompletableFuture<>();
            return incompletePolling;
        }
        CompletableFuture<E> result = new CompletableFuture<>();
        result.complete(elementsQueue.poll());
        return result;
    }

    // if polling is in progress, complete the ongoing polling CompletableFuture.
    // otherwise, add the element to the queue
    public synchronized void add(E element) {
        if (incompletePolling != null) {
            CompletableFuture<E> result = incompletePolling;
            // removing must be done first because the completion could trigger code that needs the queue state to be valid
            incompletePolling = null;
            result.complete(element);
            return;
        }
        elementsQueue.add(element);
    }


}

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接