有没有一个固定容量且具有自定义比较器的优先队列实现?

46

相关问题:

我有一个非常大的数据集(超过500万个项目),我需要从中获取N个最大的项目。最自然的方法是使用堆/优先队列,仅存储前N个项目。JVM(Scala/Java)有几个很好的优先队列实现,即:

前两个很好,但它们存储所有项目,在我的情况下会产生关键的内存开销。第三个(Lucene实现)没有这样的缺点,但是从文档中可以看出,它也不支持自定义比较器,这使得它对我毫无用处。

所以,我的问题是:是否有一个具有固定容量和自定义比较器的PriorityQueue实现?

更新。最终,我基于Peter的答案创建了自己的实现:

public class FixedSizePriorityQueue<E> extends TreeSet<E> {

    private int elementsLeft;

    public FixedSizePriorityQueue(int maxSize) {
        super(new NaturalComparator());
        this.elementsLeft = maxSize;
    }

    public FixedSizePriorityQueue(int maxSize, Comparator<E> comparator) {
        super(comparator);
        this.elementsLeft = maxSize;
    }


    /**
     * @return true if element was added, false otherwise
     * */
    @Override
    public boolean add(E e) {
        if (elementsLeft == 0 && size() == 0) {
            // max size was initiated to zero => just return false
            return false;
        } else if (elementsLeft > 0) {
            // queue isn't full => add element and decrement elementsLeft
            boolean added = super.add(e);
            if (added) {
                elementsLeft--;
            }
            return added;
        } else {
            // there is already 1 or more elements => compare to the least
            int compared = super.comparator().compare(e, this.first());
            if (compared == 1) {
                // new element is larger than the least in queue => pull the least and add new one to queue
                pollFirst();
                super.add(e);
                return true;
            } else {
                // new element is less than the least in queue => return false
                return false;
            }
        }
    }
}

(其中NaturalComparator取自这个问题)
请注意,此处的“自然比较器”是从上述问题中获取的。

3
对你的实现我有一些谦虚的意见: 1)你真的需要扩展TreeSet吗?“FixedSizePriorityQueue是一个TreeSet”听起来不太好,我会将set作为成员变量而不是继承。 2)通过使elementsLeft成为非final变量来为你的类添加状态并不是必要的。 3)你确定add方法总是返回正确的值吗? 4)防范null或非法参数是一个好习惯。 - Murat Derya Özen
@Murat:感谢您的建议。请随意在此处发布您改进后的实现作为答案。 - ffriend
不用谢,谢谢你 :) 我已经把它作为一个答案发布了。 - Murat Derya Özen
如果队列已满,并且您添加的元素已经存在于其中(并且不是最后一个将被轮询的元素),则您的解决方案将错误地使elementsLeft == 0,而在这种情况下它必须变为1。 - Inego
实际上,当队列已满时,您应该先添加元素并检查结果。如果结果为false,则无需轮询。 - Inego
10个回答

18

您如何判断Lucene不支持自定义比较器?

实际上它是抽象的,您需要实现抽象方法 lessThan(T a, T b)


1
哦,我没有注意到,谢谢!点赞,但由于它需要额外的库,所以我认为基于标准 API 的另一种实现更可取(请看我的更新)。 - ffriend

15

您可以使用SortedSet,例如TreeSet,配合自定义比较器,当大小达到N时删除最小项。


1
在这种情况下,TreeSet 的性能比 PriorityQueue 差。 - Tomasz Elendt
或者简单地说,使用优先队列做同样的事情?编辑:我下面添加了一个答案来展示我的意思。 - Ayush

14

虽然这是一个老问题,但对其他人可能仍有帮助。 你可以使用 Google 的 Java 库 guava 的 minMaxPriorityQueue


仍然标记为@Beta,但自8.0以来就存在,因此我认为它非常稳定。 - Dave Moten
这实际上并不是 MinMaxPriorityQueue 的预期使用方式,而且对于该用例性能表现不佳。 - Louis Wasserman
@LouisWasserman为什么呢?这不应该只是O(n*log(k))吗(其中n是数据集大小,k是队列的最大大小)? - Kranach
3
@Kranach,与正常的“PriorityQueue”相比,这个算法的恒定因子会明显更差。使用正常的“PriorityQueue”效果会好得多,或者更好的办法是,“Ordering.greatestOf”使用O(n)时间,O(k)内存的算法。(我们正在考虑废弃“MinMaxPriorityQueue”,因为它往往被误用于此种情况。) - Louis Wasserman

4

以下是我之前使用的实现方法。符合Peter的建议。

 public @interface NonThreadSafe {
 }

/**
 * A priority queue implementation with a fixed size based on a {@link TreeMap}.
 * The number of elements in the queue will be at most {@code maxSize}.
 * Once the number of elements in the queue reaches {@code maxSize}, trying to add a new element
 * will remove the greatest element in the queue if the new element is less than or equal to
 * the current greatest element. The queue will not be modified otherwise.
 */
@NonThreadSafe
public static class FixedSizePriorityQueue<E> {
    private final TreeSet<E> treeSet; /* backing data structure */
    private final Comparator<? super E> comparator;
    private final int maxSize;

    /**
     * Constructs a {@link FixedSizePriorityQueue} with the specified {@code maxSize}
     * and {@code comparator}.
     *
     * @param maxSize    - The maximum size the queue can reach, must be a positive integer.
     * @param comparator - The comparator to be used to compare the elements in the queue, must be non-null.
     */
    public FixedSizePriorityQueue(final int maxSize, final Comparator<? super E> comparator) {
        super();
        if (maxSize <= 0) {
            throw new IllegalArgumentException("maxSize = " + maxSize + "; expected a positive integer.");
        }
        if (comparator == null) {
            throw new NullPointerException("Comparator is null.");
        }
        this.treeSet = new TreeSet<E>(comparator);
        this.comparator = treeSet.comparator();
        this.maxSize = maxSize;
    }

    /**
     * Adds an element to the queue. If the queue contains {@code maxSize} elements, {@code e} will
     * be compared to the greatest element in the queue using {@code comparator}.
     * If {@code e} is less than or equal to the greatest element, that element will be removed and
     * {@code e} will be added instead. Otherwise, the queue will not be modified
     * and {@code e} will not be added.
     *
     * @param e - Element to be added, must be non-null.
     */
    public void add(final E e) {
        if (e == null) {
            throw new NullPointerException("e is null.");
        }
        if (maxSize <= treeSet.size()) {
            final E firstElm = treeSet.first();
            if (comparator.compare(e, firstElm) < 1) {
                return;
            } else {
                treeSet.pollFirst();
            }
        }
        treeSet.add(e);
    }

    /**
     * @return Returns a sorted view of the queue as a {@link Collections#unmodifiableList(java.util.List)}
     *         unmodifiableList.
     */
    public List<E> asList() {
        return Collections.unmodifiableList(new ArrayList<E>(treeSet));
    }
}

顺便说一下,我会欣赏任何反馈。

编辑:事实上,使用 TreeSet 似乎并不是很有效,因为对 first() 的调用似乎需要花费次线性时间。我将 TreeSet 更改为 PriorityQueue。修改后的 add() 方法如下:

   /**
     * Adds an element to the queue. If the queue contains {@code maxSize} elements, {@code e} will
     * be compared to the lowest element in the queue using {@code comparator}.
     * If {@code e} is greater than or equal to the lowest element, that element will be removed and
     * {@code e} will be added instead. Otherwise, the queue will not be modified
     * and {@code e} will not be added.
     *
     * @param e - Element to be added, must be non-null.
     */
    public void add(final E e) {
        if (e == null) {
            throw new NullPointerException("e is null.");
        }
        if (maxSize <= priorityQueue.size()) {
            final E firstElm = priorityQueue.peek();
            if (comparator.compare(e, firstElm) < 1) {
                return;
            } else {
                priorityQueue.poll();
            }
        }
        priorityQueue.add(e);
    }

1
谢谢!在我看来,对于基于PriorityQueue实现的代码,asList()方法应该是这样的:List<E> mutableList = new ArrayList<E>(priorityQueue); Collections.sort(mutableList, comparator); return Collections.unmodifiableList( mutableList ); - Abdull
2
@Abdull是正确的。你的javadoc说它返回一个排序视图,但PriorityQueue的迭代器不能保证按顺序返回元素。 - Dave Moten

4

我没有现成的可用的,但你可以查看我的实现,它具有类似要求的集合。

区别在于比较器,但如果你从PriorityQueue扩展,你就会拥有它。在每次添加时检查是否已达到限制,如果达到了-则删除最后一个项目。


不幸的是,标准的PriorityQueue没有提供轻松(且快速)删除最小元素的方法(这在堆结构方面是可以理解的)。因此,我决定在TreeSet之上实现固定大小的优先级队列。无论如何,谢谢。 - ffriend
@Robert Muir:poll()会移除队列头部,即最大的元素,而不是最小的。 - ffriend
@Robert Muir:哎呀,你又是对的!我想象了另一种堆实现(类似树形结构),甚至没有想到获取最小元素是很容易的,因此我确信head是顶部元素,并在javadocs中错过了这一点。现在我明白了。再次感谢! - ffriend
你的 topN 的惯用语大概是这样的:1. 如果 pq.size == N && item < pq.peek(),则返回(不具有竞争力);2. pq.offer(item);3. 如果 (pq.size > N),则 pq.poll()。Lucene 的方法在这里有两个优点:1. 如果 N 很小,你可以使用哨兵来填充以避免大小检查;2. 如果 item 是可变的,你只需更改头部并调用 updateTop(),而不是 offer + poll。 - Robert Muir

2

正是我正在寻找的内容。但是,实现中存在一个bug:

具体来说:如果elementsLeft > 0并且e已经包含在TreeSet中。在这种情况下,elementsLeft会减少,但TreeSet中元素的数量保持不变。

我建议将add()方法中相应的行替换为:

        } else if (elementsLeft > 0) {
        // queue isn't full => add element and decrement elementsLeft
        boolean added = super.add(e);
        if (added) {
            elementsLeft--;
        }
        return added;

1

Try this code:

public class BoundedPQueue<E extends Comparable<E>> {
/**
 * Lock used for all public operations
 */
private final ReentrantLock lock;

PriorityBlockingQueue<E> queue ;
int size = 0;

public BoundedPQueue(int capacity){
    queue = new PriorityBlockingQueue<E>(capacity, new CustomComparator<E>());
    size = capacity;
    this.lock = new ReentrantLock();

}

public boolean offer(E e) {


    final ReentrantLock lock = this.lock;
    lock.lock();
    E vl = null;
    if(queue.size()>= size)  {
        vl= queue.poll();
        if(vl.compareTo(e)<0)
            e=vl;
    }

    try {
        return queue.offer(e);
    } finally {
        lock.unlock();
    }


}

public E poll()  {

    return queue.poll();
}

public static class CustomComparator<E extends Comparable<E>> implements Comparator<E> {


    @Override
    public int compare(E o1, E o2) {
        //give me a max heap
         return o1.compareTo(o2) *-1;

    }
}

}

1

嗯,这是一个很老的问题,但我很困惑为什么还没有提出更简单的解决方案。

除非我漏掉了什么,否则可以使用 小根堆(Java默认的PriorityQueue实现) 来轻松解决,只需稍微改变一下,即当PriorityQueue的大小大于k时(如果我们要存储前k个元素),就弹出头部。

下面是我的一个例子:

public void storeKLargest(int[] nums, int k) {
    PriorityQueue<Integer> pq = new PriorityQueue<>(k+1);
    for(int num: nums){
        if(pq.size() < k || pq.peek() < num)
            pq.offer(num);
        if(pq.size() == k+1)
            pq.poll();
    }
}

我使用了一个整数优先队列PriorityQueue,但是用自定义对象替换并提供自定义比较器也很简单。除非我忽略了什么明显的问题,我认为这就是原帖作者所寻找的东西。

1
谢谢!这确实是一个简单的解决方案。然而,如果我理解正确的话,它可能不是最优的:真正的固定大小优先队列会很快开始拒绝新提供的元素,因为它们中的大多数都比队列中的元素低。如果您还跟踪最低元素,则检查新元素将像一次比较那样容易。然而,在您的实现中,添加新元素将始终更改集合,这是昂贵的。对您的实现的明显优化是将新元素与 pq.peek() 进行比较(因为它是最低的),仅在它更大时提供它。 - ffriend
2
@ffriend 是的,我已经编辑了答案以反映这一点。 - Ayush

1

如果你有guava,这里是我整理的一个相关内容。我认为它非常完整。如果我漏掉了什么,请告诉我。

你可以使用gauva ForwardingBlockingQueue,这样你就不必映射所有其他方法。

import com.google.common.util.concurrent.ForwardingBlockingQueue;

public class PriorityBlockingQueueDecorator<E> extends
        ForwardingBlockingQueue<E> {

    public static final class QueueFullException extends IllegalStateException {

        private static final long serialVersionUID = -9218216017510478441L;

    }

    private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;

    private int maxSize;

    private PriorityBlockingQueue<E> delegate;

    public PriorityBlockingQueueDecorator(PriorityBlockingQueue<E> delegate) {
        this(MAX_ARRAY_SIZE, delegate);
    }

    public PriorityBlockingQueueDecorator(int maxSize,
            PriorityBlockingQueue<E> delegate) {
        this.maxSize = maxSize;
        this.delegate = delegate;
    }

    @Override
    protected BlockingQueue<E> delegate() {
        return delegate;
    }

    @Override
    public boolean add(E element) {
        return offer(element);
    }

    @Override
    public boolean addAll(Collection<? extends E> collection) {
        boolean modified = false;
        for (E e : collection)
            if (add(e))
                modified = true;
        return modified;
    }

    @Override
    public boolean offer(E e, long timeout, TimeUnit unit)
            throws InterruptedException {
        return offer(e);
    }

    @Override
    public boolean offer(E o) {
        if (maxSize > size()) {
            throw new QueueFullException();
        }
        return super.offer(o);
    }
}

0
创建一个有大小限制的优先队列,它可以存储 N 个最大的数字。
import java.util.*;

class Demo
{
    public static <E extends Comparable<E>> PriorityQueue<E> getPq(final int n, Comparator<E> comparator)
    {
        return new PriorityQueue<E>(comparator)
        {
            boolean full()
            {
                return size() >= n;
            }

            @Override 
            public boolean add(E e)
            {
                if (!full())
                {
                    return super.add(e);
                }
                else if (peek().compareTo(e) < 0)
                {
                    poll();
                    return super.add(e);
                }
                return false;
            }

            @Override
            public boolean offer(E e)
            {
                if (!full())
                {
                    return super.offer(e);
                }
                else if (peek().compareTo(e) < 0)
                {
                    poll();
                    return super.offer(e);
                }
                return false;
            }
        };
    }

    public static void printq(PriorityQueue pq)
    {
        Object o = null;
        while ((o = pq.poll()) != null)
        {
            System.out.println(o);
        }
    }

    public static void main (String[] args)
    {
        PriorityQueue<Integer> pq = getPq(2, new Comparator<Integer>(){
        @Override
        public int compare(Integer i1, Integer i2)
        {
            return i1.compareTo(i2);
        }
        });
        pq.add(4);
        pq.add(1);
        pq.add(5);
        pq.add(2);
        printq(pq);
    }
}

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接