在Python中实现高效队列

17

我一直在尝试使用Python实现队列,但是遇到了问题。

我试图使用列表来实现队列数据结构,但是我无法想出如何使enqueuedequeue的时间复杂度为O(1)。

我看到的每个在线示例都似乎只是将enqueue操作追加到列表末尾,并从列表中删除第一个元素以进行dequeue操作。但这将使dequeue操作的时间复杂度为O(n)(其中n是列表的大小),对吗?

我是否错过了一些基本知识?或者您必须使用链表才能有效地实现队列?

import unittest

class Queue:
    def __init__(self):
        self._queue = []
        self.size = 0
        self.maxSize = 10

    def enqueue(self, item):
        if self.size < self.maxSize:
            self._queue.append(item)

    def dequeue(self):
        '''
        Removes an item from the front of the list. Remove first element of
        the array
        '''
        first = self._queue[0]
        del self._queue[0]
        return first

6
为什么不使用 collections.deque ? - Uri Goren
8个回答

32

正如Uri Goren上面指出的,Python标准库已经为您实现了一个高效的队列:collections.deque

不要做什么

避免通过手动编写自己的代码重复造轮子:

  • 链表实现。尽管这样做将dequeue()enqueue()方法的最坏时间复杂度降至O(1),但collections.deque类型已经可以实现此功能。鉴于其基于C语言实现,它还是线程安全的,可能更加节省空间和时间。
  • Python列表实现。正如我在下面指出的,使用Python列表实现enqueue()方法会将其最坏时间复杂度提高到O(n)。由于从基于C的数组以及Python列表中删除最后一个元素是一个常量时间操作,因此使用Python列表实现dequeue()方法会保持相同的最坏时间复杂度为O(1)。但是谁在乎呢?enqueue()仍然非常慢。

引用官方deque文档

虽然list对象支持类似的操作,但它们针对快速的固定长度操作进行了优化,并且对于pop(0)insert(0, v)操作需要O(n)内存移动成本,这些操作改变了底层数据表示的大小和位置。

更重要的是,deque还通过在初始化时传递maxlen参数来提供了对最大长度的开箱即用的支持,避免了手动尝试限制队列大小(由于if条件隐含的竞争条件而破坏线程安全)。

该怎么做

相反,按照以下方式使用标准的collections.deque类型实现您的Queue类:

from collections import deque

class Queue:
    '''
    Thread-safe, memory-efficient, maximally-sized queue supporting queueing and
    dequeueing in worst-case O(1) time.
    '''


    def __init__(self, max_size = 10):
        '''
        Initialize this queue to the empty queue.

        Parameters
        ----------
        max_size : int
            Maximum number of items contained in this queue. Defaults to 10.
        '''

        self._queue = deque(maxlen=max_size)


    def enqueue(self, item):
        '''
        Queues the passed item (i.e., pushes this item onto the tail of this
        queue).

        If this queue is already full, the item at the head of this queue
        is silently removed from this queue *before* the passed item is
        queued.
        '''

        self._queue.append(item)


    def dequeue(self):
        '''
        Dequeues (i.e., removes) the item at the head of this queue *and*
        returns this item.

        Raises
        ----------
        IndexError
            If this queue is empty.
        '''

        return self._queue.pop()

真相就在可怕的布丁里:

>>> queue = Queue()
>>> queue.enqueue('Maiden in Black')
>>> queue.enqueue('Maneater')
>>> queue.enqueue('Maiden Astraea')
>>> queue.enqueue('Flamelurker')
>>> print(queue.dequeue())
Flamelurker
>>> print(queue.dequeue())
Maiden Astraea
>>> print(queue.dequeue())
Maneater
>>> print(queue.dequeue())
Maiden in Black

一个人走很危险

实际上,也不要那样做。

你最好只是使用原始的 deque 对象,而不是尝试手动将该对象封装在 Queue 包装器中。上面定义的 Queue 类仅作为演示 deque API 的通用实用性的微不足道的示例。

deque 类提供了显著更多的功能,包括:

...迭代、pickling、len(d)reversed(d)copy.copy(d)copy.deepcopy(d)、使用 in 运算符进行成员测试以及诸如 d[-1] 的下标引用。

任何需要单端或双端队列的地方都可以使用 deque。就是这样。


14
在您的示例中,应该首先从队列中取出“黑色少女”吗?我认为您的“dequeue”方法应该使用self._queue.popleft()(使用popleft而不是pop)。话说回来,这只是展示了自己编写代码的另一个问题。:) - lindes
3
和 @lindes 所写的相关,这个回答实现的被称为后进先出队列 (LIFO),但它更常被称为栈。单独使用“队列”这个词通常用来指先进先出队列 (FIFO),因此容易混淆。 - recvfrom
你的代码注释说collections.dequeue是“内存高效”的,但是每个节点包含的指针数量是实际需要的两倍(因为只需要一个单向链表,不需要后向链接)。这就像建议使用二叉树数据结构来实现单向链表(只使用左子指针并忽略右子指针)一样,不是吗? - Don Hatch
不应将重新发明轮子列入“不要做什么”的范畴。尝试实现基本数据结构是有益的。当然,双端队列会快得多,但这并不是 OP 的问题的答案。我已经实现了一个队列,并将其与链表方法和使用 deque 进行了比较。性能差异并不是非常糟糕(慢8倍),它们可用于代码中非关键性能部分。有关详细信息,请参见下面的答案。 - Fırat Kıyak

8
您可以在队列类中使用头尾节点代替队列列表。
class Node:
    def __init__(self, item = None):
        self.item = item
        self.next = None
        self.previous = None


class Queue:
    def __init__(self):
        self.length = 0
        self.head = None
        self.tail = None

    def enqueue(self, value):
        newNode = Node(value)
        if self.head is None:
            self.head = self.tail = newNode
        else:
            self.tail.next = newNode
            newNode.previous = self.tail
            self.tail = newNode
        self.length += 1

    def dequeue(self):
        item = self.head.item
        self.head = self.head.next 
        self.length -= 1
        if self.length == 0:
            self.tail = None
        return item

1
Python自带的队列实现deque直接在CPython中以C代码实现。因此,很难用纯Python代码达到它的性能匹配。但这不应该阻止您尝试(与@Cecil Curry所说的不同),因为这会让您学得更好。
Shaon shaonty将其实现为一个链表。我使用字典实现如下:
class Queue:
    slots = ['_dict','_max_item','_min_item','_min_reshape_countdown_length', '_reshape_countdown']
    
    def __init__(self):
        #-------- Imlementation Constants ---------
        self._min_reshape_countdown_length = 1000
        #-------------------------------------------
        
        self._dict = {}
        self._max_item = None
        self._min_item = None
        self._reshape_countdown = self._min_reshape_countdown_length
    
    def appendleft(self, item): #queue method, named to be consistent with deque
        if not self._dict: #If the queue is empty
            self._dict = {0:item}
            self._max_item = self._min_item = 0
        else:
            self._dict[self._min_item - 1] = item
            self._min_item -= 1
        
        if self._reshape_countdown == 0:
            self._reshape_countdown = max(len(self._dict), self._min_reshape_countdown_length)
            self._reshape()
        else:
            self._reshape_countdown -=1
        
    def pop(self): #dequeue method, named to be consistent with deque
        if not self._dict:
            raise IndexError('Queue is empty!')
            
        item = self._dict[self._max_item]
        del self._dict[self._max_item]
        if not self._dict: #item was the last element
            self._max_item = self._min_item = None
        else:
            self._max_item -= 1
        
        if self._reshape_countdown == 0:
            self._reshape_countdown = max(len(self._dict), self._min_reshape_countdown_length)
            self._reshape()
        else:
            self._reshape_countdown -=1
        
        return item
        
    def _reshape(self):
        if not self._dict: #if empty, no reshaping is needed
            return
        
        positives = max(0, self._max_item)
        negatives = -min(0, self._min_item)
        size = len(self._dict)
        
        #If the data is evenly distributed around 0, then reshaping is not
        #necessary.
        if positives > size/3 and negatives > size/3:
            return
        
        center = (self._min_item + self._max_item)//2
        self._dict = {key-center: value for key,value in self._dict.items()}
        self._max_item = self._max_item - center
        self._min_item = self._min_item - center
    

请注意,这不是用于大型队列的好实现方式,因为创建大整数将会成为一个问题。可以通过将队列分成每个具有某个固定大小字典的块来解决该问题。
我使用以下代码比较了 Shaon shaonty的链表实现、我的字典实现和collections.deque。
import random

operations = []

queue_commands = 0
dequeue_commands = 0
for i in range(1000000):
    if queue_commands > dequeue_commands:
        operations.append(('q',random.random()) if random.random() < 1/2 else ('d',None))
    else:
        operations.append(('q',random.random()))
    
    if operations[-1][0] == 'q':
        queue_commands += 1
    else:
        dequeue_commands += 1

def test_case(queue):
    for command, element in operations:
        if command == 'q':
            queue.appendleft(element)
        else:
            queue.pop()
    

结果是:
%timeit test_case(Linked_Queue())
796 ms ± 2.13 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

%timeit test_case(Queue())
838 ms ± 1.5 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

%timeit test_case(deque())
120 ms ± 566 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)

正如我所说的,用原始的Python代码来击败deque并不容易。
class Dummy:

    def appendleft(self,item):
        pass

    def pop(self):
        pass

%timeit test_case(Dummy())
176 ms ± 327 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)

非常有见地。感谢分享您的性能测试结果。 - dingo

1
使用Python中的列表实现队列,处理入队和出队操作,与内置队列数据结构相同:
class queue:

    def __init__(self, max_size, size=0, front=0, rear=0):
        self.queue = [[] for i in range(5)] #creates a list [0,0,0,0,0]
        self.max_size = max_size
        self.size = size
        self.front = front
        self.rear = rear
    

    def enqueue(self, data):
        if not self.isFull():
            self.queue[self.rear] = data
            self.rear = int((self.rear + 1) % self.max_size)
            self.size += 1
        else:
            print('Queue is full')

    def dequeue(self):
        if not self.isEmpty():
            print(self.queue[self.front], 'is removed')
            self.front = int((self.front + 1) % self.max_size)
            self.size -= 1
        else:
            print('Queue is empty')

    def isEmpty(self):
        return self.size == 0

    def isFull(self):
        return self.size == self.max_size

    def show(self):
        print ('Queue contents are:')
        for i in range(self.size):
            print (self.queue[int((i+self.front)% self.max_size)])

        # driver program
        q = queue(5)
        q.enqueue(1)
        q.enqueue(2)
        q.enqueue(3)
        q.enqueue(4)
        q.enqueue(5)
        q.dequeue()
        q.show()

0

这是我使用数组实现队列的代码,enqueuedequeue都是O(1)操作。该实现基于CLRS。

class Queue:
    def __init__(self, length):
        """a queue of at most n elements using an array of n+1 element size"""
        self.length = length
        self.queue = [None]*(length+1)
        self.head = 0
        self.tail = 0

    def enqueue(self, x):
        if self.is_full():
            return 'Overflow'
        self.queue[self.tail] = x
        if self.tail == self.length:
            self.tail = 0
        else:
            self.tail = self.tail + 1

    def dequeue(self):
        if self.is_empty():
            return 'Underflow'
        x = self.queue[self.head]
        if self.head == self.length:
            self.head = 0
        else:
            self.head = self.head + 1
        return x

    def is_empty(self):
        if self.head == self.tail:
            return True
        return False

    def is_full(self):
        if self.head == self.tail+1 or (self.head == 0 and self.tail == self.length):
            return True
        return False

0
class Queue:
    def __init__(self,no):
        self.no = no 
        self.SQueue = []
        self.front = -1
        self.rear = -1
    def insert(self):
        if self.rear == self.no -1:
            print("Queue is Full.....")
        else:           
            if self.front == -1:
                self.front = 0
                self.rear = 0
            else :
                self.rear += 1
            n = int(input("enter an element :: "))
            self.SQueue.insert(self.rear, n)

    def delete(self):
        if self.front == -1 and self.front == no - 1:
            print("Queue is Empty.....")
        else:
            self.SQueue.pop(self.front)
            self.front +=1
    def disp(self):
        if self.front == -1 and self.front == no - 1:
            print("Queue is Empty.....")
        else:
            print("REAR \tELEMENT")
            for i in range(len(self.SQueue)):
                print(i," \t",self.SQueue[i])

no = int(input("ENTER  Size :: "))
q = Queue(no)
while(True):
    print(" 1: INSERT ")
    print(" 2: DELETE ")
    print(" 3: PRINT ")
    print(" 4: EXIT ")
    option = int(input("enter your choice :: "))

    if option == 1:
        q.insert()
 
    elif option == 2:
        q.delete()

    elif option == 3:
        q.disp()

    elif option == 4:
        print("you are exit!!!!!")
        break
    else:
        print("Incorrect option")
 

0
# Linear Queue Implementation using Array
class Queue:
    """Class with List as Array """
    def __init__(self):
        self.v_list=[]
    """Storing the in FIFO order"""
    def enqueue(self,value):
        self.v_list.append(value)
   """Removing Element from Queue in FIFO order"""
    def dequeue(self):
        if len(self.v_list)==0:
            print('Queue is Empty')
            return
        self.v_list.pop(0)

    def print_queue(self):
        print(self.v_list)

    def size_of_queue(self):
        return print(len(self.v_list))

object_queue=Queue()
object_queue.enqueue(0)
object_queue.enqueue(1)
object_queue.enqueue(2)
object_queue.enqueue(3)
object_queue.enqueue(4)
object_queue.enqueue(5)
object_queue.print_queue()
object_queue.dequeue()
object_queue.print_queue()
object_queue.dequeue()
object_queue.print_queue()
object_queue.size_of_queue()

#Circular Queue Implementation using Array
class CircularQueue():
    def __init__(self):
        """Class to hold the Postions for insert and delete"""
        self.start_pointer=0
        self.end_pointer=-1
        self.queue_list=[]
    """Storing the element in Circular order, with circular we can remove empty Block"""
    def enqueue(self,value):
        if len(self.queue_list)>10:
            print("Circular Queue is Full")
            return
        """Checking for Empty Block in Array and storing data and reseting the stat end point to process the element"""
        if 'None' in self.queue_list:
            self.queue_list[self.end_pointer]=value
            self.end_pointer+=1
        else:
            self.queue_list.append(value)
            self.end_pointer+=1
    """Removing element In FIFO order and reseting start ending point"""
    def dequeue(self):
        #self.queue_list.replace(self.queue_list[self.start_pointer],None)
        self.queue_list = [str(sub).replace(str(self.queue_list[self.start_pointer]),'None') for sub in self.queue_list] 
        self.start_pointer+=1
        for i ,j in enumerate(self.queue_list):
            if j=='None':
                self.end_pointer=i
                break
    """For Printing Queue"""            
    def print_cq(self):
        if len(self.queue_list)>10:
            print("Circular Queue is Full")
            return
        print(self.queue_list,self.start_pointer,self.end_pointer)


cir_object=CircularQueue()
cir_object.enqueue(0)
cir_object.enqueue(1)
cir_object.enqueue(2)
cir_object.enqueue(3)
cir_object.enqueue(4)
cir_object.enqueue(5)
cir_object.enqueue(6)
cir_object.enqueue(7)
cir_object.enqueue(8)
cir_object.enqueue(9)
#cir_object.print_cq()
cir_object.dequeue()
cir_object.dequeue()
cir_object.print_cq()
cir_object.enqueue(15)
cir_object.enqueue(20)
cir_object.print_cq()

-2
在出队方法中没有任何循环。您只需要执行列表操作。因此,出队的时间复杂度也是O(n)(线性)。
class Queue:
    def __init__(self):
       self.items=[]
    def enqueue(self,item):
       self.items.append(item)
    def dequeue(self):
       return self.items.pop(0)
    def isEmpty(self):
       return self.items==[]
    def __len__(self):
       return len(self.items)

1
Python列表在内部实现时是C数组,而不是链表。虽然向Python列表添加元素的平均和摊销最坏情况时间复杂度为O(1),但上述self.items.append(item)调用的非摊销最坏情况时间复杂度为**O(n)*(即线性而不是常数)。因此,您的enqueue实现确实*包含一个隐式循环。虽然该循环可能是通过基于高效连续内存复制的汇编实现的,但已知有一些高效的替代方法,无需进行任何这样的复制。 - Cecil Curry

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接