我该如何创建一个保留FIFO行为的Java PriorityBlockingQueue?

6
我正在尝试创建一个Java中的“优先级阻塞队列”,以维护具有相同优先级的元素的FIFO顺序。Oracle文档提供了一些帮助,但我仍然非常混乱。
我应该注意到以下主题对我来说都很新:泛型、接口作为类型和静态嵌套类。所有这些都在下面的类定义中发挥作用。特别是泛型很令人困惑,我肯定在这里完全搞砸了。
我已经包含了注释来标识我目前正在遇到的编译器错误。
几个具体问题:
1. 用类表示排队事件对象,并将实际队列作为静态类成员,这样做是否可以? 2. 将Oracle的FIFO事件“包装器”作为静态嵌套类包含在内是否合理? 3. 我至少在正确的轨道上吗?只使用一个外部类来完成所有操作?
以下是我写的类:
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;

public class FIFOPBQEvent {

/**
 * First we define a static nested class which, when instantiated,
 * encapsulates the "guts" of the event - a FIFOPBQEvent - along with
 * a sequence number that assures FIFO behavior of events of like priority.
 *  
 * The following is lifted ALMOST verbatim (I added "static" and some 
 * comments) from Oracle documentation on adding FIFO-ness to a Priority
 * Blocking Queue: 
 * http://download.oracle.com/javase/6/docs/api/java/util/concurrent/PriorityBlockingQueue.html
 * As the Oracle doc points out:
 * 
 * "A static nested class interacts with the instance members of its outer 
 * class (and other classes) just like any other top-level class. In 
 * effect, a static nested class is behaviorally a top-level class that 
 * has been nested in another top-level class for packaging convenience."
 *
 */
static class FIFOEntry<E extends Comparable<? super E>> implements
        Comparable<FIFOEntry<E>> {
    final static AtomicLong seq = new AtomicLong();
    final long seqNum;  // instance
    final E entry;

    public FIFOEntry(E entry) {
        seqNum = seq.getAndIncrement();
        this.entry = entry;
    }

    public E getEntry() {
        return entry;
    }
    /** Here is implementation of Comparable */
    public int compareTo(FIFOEntry<E> other) {
        int res = entry.compareTo(other.entry);
        if (res == 0 && other.entry != this.entry)
            res = (seqNum < other.seqNum ? -1 : 1);
        return res;
    }
}

/**
 * Now we declare a single (static) PBQ of FIFO entries into which 
 * PBQFIFOEvents will be added and removed.
 */

/** FLAGGED AS ERROR BY COMPILER */
// Bound mismatch: The type FIFOPBQEvent is not a valid substitute for the
// bounded parameter <E extends Comparable<? super E>> of the type 
// FIFOPBQEvent.FIFOEntry<E>

private static PriorityBlockingQueue<FIFOEntry<FIFOPBQEvent>> theQueue =
    PriorityBlockingQueue<FIFOEntry<FIFOPBQEvent>>();

/** 
 * And here are the "guts" of our event: the i.d. and state of the GUI widget 
 */
private ConsoleObject obj = ConsoleObject.UNDEFINED_OBJ; // widget that was affected
private ObjectState state = ObjectState.UNDEFINED_STATE; // the widget's new state

/** 
 * Constructor specifying the class variables 
 */
public FIFOPBQEvent(ConsoleObject theObj, ObjectState theState) {
    obj = theObj;
    state = theState;
}

/**
 * Event queuing ("sending") and dequeuing ("receiving")
 */
public void sendEvent() {

    /** FLAGGED AS ERROR BY COMPILER */
    // The method put(FIFOPBQEvent.FIFOEntry<FIFOPBQEvent>) in the type 
    // PriorityBlockingQueue<FIFOPBQEvent.FIFOEntry<FIFOPBQEvent>> is not 
    // applicable for the arguments (FIFOPBQEvent)

    theQueue.put(this);
}

public static FIFOPBQEvent receiveEvent() {

    /** FLAGGED AS ERROR BY COMPILER */
    // Type mismatch: cannot convert from FIFOPBQEvent.FIFOEntry<FIFOPBQEvent> 
    // to FIFOPBQEvent

    FIFOPBQEvent event = theQueue.take();
    return event;
}

/**
 * ConsoleEvent accessors
 */
public ConsoleObject getObj() {
    return this.obj;
}

public ObjectState getState() {
    return this.state;
}

/**
 * And for the first time, enums instead of public static final ints.
 */
public enum ConsoleObject {
    UNDEFINED_OBJ,
    RESERVED,

    /** Console keys */
    RESET,
    DISPLAY_MAR,
    SAVE,
    INSERT,
    RELEASE,
    START,
    SIE,
    SCE,

    /** Console toggle switches */
    POWER,
    PARITY_CHECK,
    IO_CHECK,
    OVERFLOW_CHECK,
    SENSE_SWITCH_1,
    SENSE_SWITCH_2,
    SENSE_SWITCH_3,
    SENSE_SWITCH_4
}

public enum ObjectState {
    UNDEFINED_STATE,

    /** Toggle switches */
    OFF,
    ON,

    /** Console keys */
    PRESSED,
}
}

FIFOPBQEvent没有实现Comparable<FIFOPBQEvent>,这导致了一些错误。你的FIFOEntry泛型声明需要一个实现Comparable接口的对象。 - user545680
4个回答

3
第一个错误是更重要的错误。它发生在FIFOPBQEvent类没有实现Comparable,而它必须作为FIFOEntry嵌套类的通用类型才能被考虑。这是因为您限制了E并说它extends Comparable<...>。基本上,您的FIFOPBQEvent类必须是可比较的,以便为队列提供优先级(可能基于事件类型)。
要修复错误,您需要:
  1. Change the header of your class to:

    public class FIFOPBQEvent implements Comparable<FIFOPBQEvent> {
    
  2. add a compareTo method in the FIFOPBQEvent class; something like:

    public int compareTo (FIFOPBQEvent other) {
        // TODO - compare this and other for priority
        return 0;
    }
    
然后您需要在sendEvent方法中包装您的输入:
public void sendEvent () {
    theQueue.put(new FIFOEntry<FIFOPBQEvent> (this));
}

最后一个小错误就是你没有解包 FIFOEntry 对象。要修复这个问题,请将 receiveEvent 改为:

public static FIFOPBQEvent receiveEvent () {
    FIFOPBQEvent event = theQueue.take ().getEntry ();
    return event;
}

谢谢 - 我已经按照您的指示进行了更改,现在只有一个语句被标记:private static PriorityBlockingQueue> theQueue = PriorityBlockingQueue>();编译器诊断要求在()之间添加表达式。不知道为什么,因为PriorityBlockingQueue文档中记录了默认构造函数。 - Chap

2
让我们逐步了解你的代码。
static class FIFOEntry<E extends Comparable<? super E>> implements 
  Comparable<FIFOEntry<E>> {

这定义了一个带有泛型参数的类FIFOEntry。您已将泛型参数的类型限制为“实现自身可比较接口的任何对象”。

private static PriorityBlockingQueue<FIFOEntry<FIFOPBQEvent>> theQueue =
  PriorityBlockingQueue<FIFOEntry<FIFOPBQEvent>>();

您在此处对PriorityBlockingQueue进行的声明是正确的,但是您对FIFOEntry<FIFOPBQEvent>的定义是不正确的。这是由于上述点 - 您已将FIFOEntry的类型限制为实现Comparable自身的任何内容,即它应该是

class FIFOPBQEvent implements Comparable<FIFOPBQEvent> {

您的下一个问题是 -
public void sendEvent() {
  theQueue.put(this);
}

this 的类型为 FIFOPBQEvent,但队列仅接受 FIFOEntry 对象。要匹配您的队列签名,应将其更改为:

public void sendEvent() {
  // Requires FIFOPBQEvent to be Comparable
  theQueue.put(new FIFOEntry<FIFOPBQEvent>(this));     
}

您在receiveEvent()中也遇到了同样的问题——您的队列签名表示队列包含FIFOEntry对象,而您正在尝试提取FIFOPBQEvents。请注意调整队列签名以匹配所需的对象类型。

不想让我的回答变得混乱,泛型类型 <E extends Comparable<? super E>> 匹配“任何实现了自身或其超类的 Comparable 接口的对象”。也就是说,class Cat implements Comparable<Animal>class Cat implements Comparable<Cat> 一样符合这个签名。 - user545680
非常感谢您在这里的帮助。为什么这个类既要扩展又要实现接口Comparable?我以为只有_实现_接口。 - Chap
一个想法:如果我将FIFOEntry作为外部类,让它包含静态PBQ,并使FIFOPBQEntry成为静态嵌套(或内部?)类,那么这个代码会更容易理解吗?至少这样可以正确地反映FIFOEntry和PBQEntry之间的实际包含关系。现在看起来有点里外颠倒。 - Chap
1
@Chap 我会实际上有两个公共类。第一个将是像 PriorityFIFOEventQueue 这样的东西,用于保存两个静态方法 receiveEventsendEvent 以及包装器类 FIFOEntry(我会将其设置为私有)。第二个类将是事件本身,带有其信息和嵌套的 enum。这将至少在一个方向上将事件与队列解耦(事件不需要知道队列的存在)。 - 101100
@Chap 你问为什么这个类既“extends”又“implements”接口 - 实际上并不是这样的。在Java中,“extends”是用于“协变”的关键字,它的意义类似于“extends/implements”关键字,但并不完全相同。协变和逆变是仅限于泛型的概念,如果你想完全理解编译器的思维过程,我建议你查阅相关资料。 - user545680

1
根据@101100的建议,我重新设计了系统,将队列与事件解耦。这似乎使得它更简单易懂(和可重用),但遗憾的是我仍然不清楚一些概念。以下是PriorityFIFOEventQueue(为简洁起见,我省略了Event类)。我已经标注了我仍需要帮助的地方:
package ibm1620;

import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;

/**
 * This class represents a Priority Blocking Queue of FIFO entries.  Instantiating
 * it creates a queue.  It provides instance methods for sending and receiving
 * entries, a.k.a. events of type E, on the queue.
 */

以下标记为诊断: "类型PriorityFIFOEventQueue必须实现继承的抽象方法Comparable.compareTo(PriorityFIFOEventQueue)"
我相当确定我不想比较队列! 仍然不确定我在这里需要什么。
public class PriorityFIFOEventQueue<E extends Comparable<? super E>> implements Comparable<PriorityFIFOEventQueue<E>> {

/**
 * FIFOEntry is a static nested class that wraps an Entry and provides support for
 * keeping the Entries in FIFO sequence.
 */
private static class FIFOEntry<E> implements Comparable<FIFOEntry<E>> {

    /**
     * There's going to be one atomic seq per ... queue?  runtime?
     */

    final static AtomicLong seq = new AtomicLong();

    /**
     * FIFOEntry is simply an entry of type E, and a sequence number
     */
    final long seqNum; 
    final E    entry;

    /**
     * FIFOEntry() constructor
     */
    FIFOEntry(E entry) {
        seqNum = seq.getAndIncrement();
        this.entry = entry;
    }

    /**
     * Accessor method for entry
     */
    E getEntry() {
        return entry;
    }

    /**
     * Here is implementation of Comparable that is called directly by PBQ.
     * We first invoke E's comparator which compares based on priority. 
     * If equal priority, order by sequence number (i.e. maintain FIFO).
     * */

在下面的代码中,包含 "compareTo" 的行被标记,并出现了错误提示信息 "The method compareTo(E) is undefined for the type E"。显然,我没有告诉编译器 "other" FIFOEntry 实现了 Comparable 接口。
    public int compareTo(FIFOEntry<E> other) {
        int res = entry.compareTo(other.entry); // priority-based compare
        if (res == 0 && other.entry != this.entry)
            res = (seqNum < other.seqNum ? -1 : 1); // FIFO compare
        return res;
    }
}

/**
 * Here is the PriorityBlockingQueue of FIFOEntries
 */

private PriorityBlockingQueue<FIFOEntry<E>> theQueue = 
    new PriorityBlockingQueue<FIFOEntry<E>>();

/**
 * Event queuing ("sending") and dequeuing ("receiving")
 */
public void sendEvent(E event) {
    theQueue.put(new FIFOEntry<E>(event));
}

/**
 * pollEvent() 
 * Will remove and return a ConsoleEvent from the queue, or return
 * null if queue is empty.
 */
public E pollEvent() {
    E event = null;
    FIFOEntry<E> aFIFOEvent = theQueue.poll();
    if (aFIFOEvent != null) {
        event = aFIFOEvent.getEntry();
        say("pollEvent() -> "+event);
    }
    else {
    }
    return event;
}

/**
 * receiveEvent() 
 * Will block if queue is empty.  Takes a FIFOEvent off the queue,
 * unwraps it and returns the 'E' payload.
 * 
 * @return
 */
public E receiveEvent() {
    E event = null;
    try {
        FIFOEntry<E> aFIFOEvent = theQueue.take();
        if (aFIFOEvent != null) {
            event = aFIFOEvent.getEntry();
            say("receiveEvent() -> "+event);
        }

    } catch (InterruptedException e) {
        say("InterruptedException in receiveEvent()");
        // TODO Auto-generated catch block
        e.printStackTrace();
    }
    return event;
}

// for console debugging
static void say(String s) {
    System.out.println("ConsoleEvent: " + s);
}

}

好的 - 在我停止敲打我的头之后,我意识到FIFOEvent的类定义应该是:**private static class FIFOEntry<E extends Comparable<? super E>> implements Comparable<FIFOEntry<E>> {**。(嗯 - 正如Oracle文档所描述的那样!)然而,对于PriorityFIFOEventQueue的定义仍然不确定。 - Chap
1
为了理解“Comparable”和其他接口,我发现把它看作一个描述符(像易燃品一样)会有所帮助。因此,你应该能够逻辑地说,任何实现它的类都是可比较的。现在,你正在说“FIFOEntry”是可比较的,这是有道理的。但你也在说“PriorityFIFOEventQueue”是可比较的,这对我来说不合理;你为什么要将它们进行比较呢? - 101100
@101100 - 你说得对,我不想比较队列,所以我已经删除了“implements”子句。但是我确实想要比较“E” - 这让我觉得我应该写**public class PriorityFIFOEventQueue<E extends Comparable<E>>。(对我来说,“扩展”接口听起来很奇怪,但编译器似乎并不介意。)@Bringer128曾试图向我解释上面的<? super E>**部分,但我不确定我是否需要它。目前这个代码可以编译和正确运行,尽管在我感到舒适之前,我肯定需要复习有界类型参数。 - Chap
2
是的,需要使用 extends Comparable<E>,因为在 FIFOEntry 中你会在 E 上调用 compareTo<? super E> 部分并不是必需的,因为你的事件可能实现了 Comparable<EventClassName>。如果你曾经使用过队列来保存 Time,它们实现的不是完全相同的类型,而是一个超类型(Date),那么 ? super 将允许这样做,而仅使用 <E> 则不行。 - 101100
@101100:回过头来看,我被“Priority”这个词在Priority Blocking Queue中稍微误导了一下:我最好把它想象成一个“自然排序的阻塞队列”。PBQ的元素必须能够告诉它们应该在另一个元素的前面还是后面出现,但是决定的细节在它们的compareTo()方法中。所以……,对于FIFO维护,不严格需要使用包装类——我可以将该逻辑作为其compareTo()逻辑的一部分放入E中。(但是这种设计确实很方便。) - Chap

1

这里提供了一个实际替代PriorityBlockingQueue的解决方案,它可以维护具有相等优先级的项目的FIFO顺序。它会自动为用户进行所有包装/解包操作。

此代码是针对1.4 JVM编写的,并使用j.u.c.后移。在较新的JVM中使用并添加泛型应该很简单。

import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.ConcurrentModificationException;
import java.util.Iterator;
import java.util.NoSuchElementException;

import edu.emory.mathcs.backport.java.util.concurrent.BlockingQueue;
import edu.emory.mathcs.backport.java.util.concurrent.PriorityBlockingQueue;
import edu.emory.mathcs.backport.java.util.concurrent.TimeUnit;
import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicLong;

/**
 * A queue with all properties of a {@link PriorityBlockingQueue} but which additionally 
 * returns items with equal priority 
 * in a FIFO order.
 * In this respect, {@link PriorityBlockingQueue} explicitly gives no return order guarantees for equal priority elements.
 * 
 * This queue only accepts {@link Comparable} items. A custom {@link Comparator} cannot
 * be specified at construction time.
 * 
 *
 */
public final class FIFOPriorityBlockingQueue implements BlockingQueue {
    private final PriorityBlockingQueue q;

    public FIFOPriorityBlockingQueue() {
        q = new PriorityBlockingQueue();
    }

    public FIFOPriorityBlockingQueue(int initialCapacity) {
        q = new PriorityBlockingQueue(initialCapacity);
    }

    public boolean isEmpty() {
        return q.isEmpty();
    }

    public boolean addAll(Collection c) {
    if (c == null)
        throw new NullPointerException();
    if (c == this)
        throw new IllegalArgumentException();
    boolean modified = false;
    Iterator e = c.iterator();
    while (e.hasNext()) {
        if (add(e.next()))
        modified = true;
    }
    return modified;
    }

    /**
     * Always returns <code>null</code> as this {@link BlockingQueue} only accepts
     * {@link Comparable} objects and doesn't allow setting an own {@link Comparator} 
     * @return
     */
    public Comparator comparator() {
        return null;
    }

    public boolean containsAll(Collection c) {
        Iterator e = c.iterator();
        while (e.hasNext())
            if(!contains(e.next()))
            return false;

        return true;
    }

    public int size() {
        return q.size();
    }

    public int remainingCapacity() {
        return q.remainingCapacity();
    }

    public boolean removeAll(Collection c) {
        boolean modified = false;
        Iterator e = iterator();
        while (e.hasNext()) {
            if(c.contains(e.next())) {
            e.remove();
            modified = true;
            }
        }
        return modified;
    }

    public boolean retainAll(Collection c) {
        boolean modified = false;
        Iterator e = iterator();
        while (e.hasNext()) {
            if(!c.contains(e.next())) {
            e.remove();
            modified = true;
            }
        }
        return modified;
    }

    public Object remove() {
        return ((FIFOEntry)q.remove()).entry;
    }

    public Object element() {
        return ((FIFOEntry)q.element()).entry;
    }

    public boolean add(Object e) {
        return q.add(new FIFOEntry((Comparable)e));
    }

    public boolean offer(Object e) {
        return q.offer(new FIFOEntry((Comparable)e));
    }

    public void put(Object e) {
        q.put(new FIFOEntry((Comparable)e));
    }

    public boolean offer(Object e, long timeout, TimeUnit unit) {
        return q.offer(new FIFOEntry((Comparable)e), timeout, unit);
    }

    public Object poll() {
        return ((FIFOEntry)q.poll()).entry;
    }

    public Object take() throws InterruptedException {
        return ((FIFOEntry)q.take()).entry;
    }

    public Object poll(long timeout, TimeUnit unit) throws InterruptedException {
        return ((FIFOEntry)q.poll(timeout, unit)).entry;
    }

    public Object peek() {
        return ((FIFOEntry)q.peek()).entry;
    }

    /**
     * If more than one equal objects are held by the queue, remove() will
     * remove any one of them, not necessarily the first or last inserted.
     */
    public boolean remove(Object o) {
        return q.remove(new FIFOEntry((Comparable)o));
    }

    public boolean contains(Object o) {
        return q.contains(new FIFOEntry((Comparable)o));
    }

    public Object[] toArray() {
        Object[] a = q.toArray();
        for (int i = 0; i < a.length; i++) { // unpacking
            a[i] = ((FIFOEntry)a[i]).entry;
        }
        return a;
    }

    public String toString() {
        return q.toString(); // ok, because each FIFOEntry.toString returns the toString of the entry 
    }

    public int drainTo(Collection c) {
        ArrayList tmp = new ArrayList(size());
        int n = q.drainTo(tmp);
        for (Iterator it = tmp.iterator(); it.hasNext();) {
            FIFOEntry en = (FIFOEntry) it.next();
            c.add(en.entry);
        }
        return n;
    }



    public int drainTo(Collection c, int maxElements) {
        ArrayList tmp = new ArrayList(size());
        int n = q.drainTo(tmp, maxElements);
        for (Iterator it = tmp.iterator(); it.hasNext();) {
            FIFOEntry en = (FIFOEntry) it.next();
            c.add(en.entry);
        }
        return n;
    }

    public void clear() {
        q.clear();
    }



    public Object[] toArray(Object[] a) {
        Object[] res = q.toArray(a);
        for (int i = 0; i < res.length; i++) { // unpacking
            res[i] = ((FIFOEntry)res[i]).entry;
        }
        return res;
    }



    public Iterator iterator() {
        final Iterator it = q.iterator();
        return new Iterator() {
            public void remove() throws UnsupportedOperationException, IllegalStateException, ConcurrentModificationException {
                it.remove();
            }

            public Object next() throws NoSuchElementException, ConcurrentModificationException {
                return ((FIFOEntry)it.next()).entry;
            }

            public boolean hasNext() {
                return it.hasNext();
            }
        };
    }

    public int hashCode() {
        return q.hashCode();
    }

    public boolean equals(Object obj) {
        return q.equals(obj);
    }


    /**
     * Wrapping class which adds creation ordering to a {@link Comparable} object.
     * Based on the code in the javadoc for {@link PriorityBlockingQueue}
     * 
     *
     */
    private static class FIFOEntry implements Comparable {
        private final static AtomicLong seq = new AtomicLong();
        private final long seqNum;
        private final Comparable entry;
        public FIFOEntry(Comparable entry) {
            seqNum = seq.getAndIncrement();
            this.entry = entry;
        }

        public int compareTo(Object other) {
            FIFOEntry o = (FIFOEntry) other;
            int res = entry.compareTo(o.entry);
            if (res == 0 && o.entry != this.entry)
                res = (seqNum < o.seqNum ? -1 : 1);
            return res;
        }
        public int hashCode() {
            final int prime = 31;
            int result = 1;
            result = prime * result + ((entry == null) ? 0 : entry.hashCode());
            return result;
        }
        public boolean equals(Object obj) {
            if (this == obj)
                return true;
            if (obj == null)
                return false;
            if (getClass() != obj.getClass())
                return false;
            FIFOEntry other = (FIFOEntry) obj;
            if (entry == null) {
                if (other.entry != null)
                    return false;
            } else if (!entry.equals(other.entry))
                return false;
            return true;
        }

        public String toString() {
            return entry.toString();
        }
    }

}

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接