如何检查一个任务是否已经在Python队列中?

21

我正在使用Python的线程和队列模块编写一个简单的网络爬虫。我获取页面,检查链接并将它们放入队列中。当某个线程完成对页面的处理后,它从队列中获取下一个页面。为了过滤掉已经访问过的页面,我使用一个数组来保存这些页面,并将要添加到队列中的链接与该数组进行比较。但是,如果有多个线程同时在不同的页面上获取相同的链接,那么它们会向队列中添加重复的链接。那么,如何判断某个URL是否已经在队列中,以避免将其再次放入队列中呢?


“array”?在Python中?您是指“list”、“tuple”或“dictionary”吗?如果您是指“array”,那么您使用的是哪种数组实现?numpy吗? - S.Lott
13个回答

22

如果您不关心处理项目的顺序,我建议使用Queue的一个子类,在内部使用set

class SetQueue(Queue):

    def _init(self, maxsize):
        self.maxsize = maxsize
        self.queue = set()

    def _put(self, item):
        self.queue.add(item)

    def _get(self):
        return self.queue.pop()

正如Paul McGuire所指出的那样,这将允许在从“待处理”集合中删除并尚未添加到“已处理”集合中之后添加重复项。为了解决这个问题,您可以在Queue实例中存储两个集合,但由于您正在使用更大的集合来检查项目是否已被处理,因此您可以同样回到正确排序请求的queue

class SetQueue(Queue):

    def _init(self, maxsize):
        Queue._init(self, maxsize) 
        self.all_items = set()

    def _put(self, item):
        if item not in self.all_items:
            Queue._put(self, item) 
            self.all_items.add(item)

与单独使用集合相比,这样做的优点在于,Queue的方法是线程安全的,因此您无需额外锁定即可检查其他集合。


1
这太优雅了。非常好,即使有第一个版本的缺点。 - Bite code
请注意此答案,关于覆盖put方法的内容。 - Yoel
请注意,它对于数字或字符串等简单内容可以正常工作,但是 SetQueue().put(["foo", "bar"]) 会失败。 - azmeuk
嗨,我完全是Python的新手,当我尝试子类化Queue时,我得到了“TypeError:调用元类基类函数时的错误()参数1必须是代码,而不是字符串”。我错过了什么吗? - Alexandre
不用在意了 - 我遇到的问题与我使用的是 Python 2.x 而这个示例使用的是 Python 3.x 有关。只是说一下,以防有人遇到相同的问题 :-) - Alexandre
显示剩余3条评论

4

如果不重写put方法,join调用将会永远阻塞。请参考https://github.com/python/cpython/blob/master/Lib/queue.py#L147

class UniqueQueue(Queue):

    def put(self, item, block=True, timeout=None):
        if item not in self.queue: # fix join bug
            Queue.put(self, item, block, timeout)

    def _init(self, maxsize):
        self.queue = set()

    def _put(self, item):
        self.queue.add(item)

    def _get(self):
        return self.queue.pop()

3
以下是对Lukáš Lalinský的后续solution进行改进。 重要的区别在于覆盖了put以确保unfinished_tasks准确,并且join按预期工作。
from queue import Queue

class UniqueQueue(Queue):

    def _init(self, maxsize):
        self.all_items = set()
        Queue._init(self, maxsize)

    def put(self, item, block=True, timeout=None):
        if item not in self.all_items:
            self.all_items.add(item)
            Queue.put(self, item, block, timeout)

1
这是完整版本的SetQueue
import Queue

class SetQueue(Queue.Queue):
    def _init(self, maxsize):
        Queue.Queue._init(self, maxsize)
        self.all_items = set()

    def _put(self, item):
        if item not in self.all_items:
            Queue.Queue._put(self, item)
            self.all_items.add(item)

    def _get(self):
        item = Queue.Queue._get(self)
        self.all_items.remove(item)
        return item

我认为更好的方式是这样调用父函数:super(SetQueue, self)._init()此外,_init 函数没有 maxsize 参数 - __init__ 接收该参数。 - lucidyan

1

使用:

url in q.queue

如果url在队列中,则返回True


如果已经被出队并处理,这就没有帮助了。 - S.Lott

1

我解决这个问题的方法(实际上我是用Scala而不是Python)是同时使用Set和Queue,只有当它们在Set中不存在时才将链接添加到Queue(和Set)中。

Set和Queue都封装在单个线程中,仅向消费者线程公开类似于队列的接口。

编辑:其他人建议使用SQLite,如果访问的URL集合需要增长,那么我也正在考虑这一点。(目前每次爬行只有几百页,因此轻松适合内存。)但是数据库也可以封装在Set本身中,因此消费者线程无需知道它。


1

SQLite非常简单易用,而且非常适合...只是一个建议。


2
如果您选择使用磁盘数据库,它还具有持久性的优势。如果出现未处理的异常,您可以修复错误并继续之前的进程。 - John La Rooy
2
这就好比说“使用if条件将完美契合”……与问题相关的N个上下文……使用SQLite会总体减缓整个过程。 - Angry 84

1
为什么只使用数组(最好是字典)来过滤已经访问过的内容?在将它们排队时,立即将它们添加到数组/字典中,仅当它们不在数组/字典中时才将它们添加到队列中。然后您有三个简单的独立事项:
  1. 尚未查看的链接(既不在队列中也不在数组/字典中)
  2. 计划访问的链接(在队列和数组/字典中)
  3. 已经访问过的链接(在数组/字典中,不在队列中)

保持所有先前排队的条目列表非常重要(我会使用一个集合而不是一个列表,不确定@sam对集合有什么问题)。如果您只在队列中搜索重复项,则可能会重新处理先前排队并已经处理过且已从队列中删除的条目。 - PaulMcG
是的,我的答案假设队列之外还有第二个数据结构(因此会出现“在队列和数组/字典中都存在”和“在数组/字典中存在但不在队列中”的情况)。您需要在将项目加入队列之前将其添加到“seen”数据结构中。您不会搜索队列,而是搜索您的“seen”数组。根据定义,“seen”数组中的任何内容都是在队列中或已经访问过了;这两种情况都不需要再次排队。主要的技巧是确保检查-如果未找到,则将其添加到“seen”并排队的操作是原子的。 - Amber

0

这是我的代码。希望它能帮助到某个人。

我将 set 添加为队列类的缓存。此缓存用于任务唯一性检查。此外,它还用于在队列类中实现 __contains__ 魔法方法。

唯一性可以有两种定义。 首先,任务在整个队列的生命周期内是唯一的。换句话说,即使任务完成并从队列中删除,队列也会拒绝接受重复任务。我将其实现为“be_unique_in_all_items”。其次,任务仅在队列中现有任务中是唯一的。这意味着任务可以在完成后被接受。我将其实现为“be_unique_in_existing_items”。

from queue import Queue
from traceback import print_exc


class MQueue(Queue):

    def __init__(self,
                 **kwargs):
        super().__init__(maxsize=kwargs.get("maxsize", 0))

        self._be_unique_in_existing_items = kwargs.get("be_unique_in_existing_items", False)
        self._be_unique_in_all_items = kwargs.get("be_unique_in_all_items", False)

        if self._be_unique_in_existing_items and self._be_unique_in_all_items:
            raise ValueError("Choose one criteria")

        self.cache = set()

    def get(self, *args, **kwargs):
        result = super().get(*args, **kwargs)

        if result:
            if self._be_unique_in_existing_items:
                self.cache.remove(result)

        return result

    def put(self, item, *args, **kwargs):
        if self._be_unique_in_existing_items or self._be_unique_in_all_items:
            if item in self.cache:
                raise ValueError("The given item exists in cache.")
            self.cache.add(item)

        return super().put(item, *args, **kwargs)

    def __contains__(self, item):
        if self._be_unique_in_existing_items or self._be_unique_in_all_items:
            return self.cache.__contains__(item)
        else:
            return Queue.__contains__(item)  # will raise you error


if __name__ == "__main__":
    # ordinary queue
    ordinary_queue_obj = MQueue(maxsize=0)

    ordinary_queue_obj.put(1)
    ordinary_queue_obj.put(1)

    try:
        print(1 in ordinary_queue_obj)
    except Exception:
        print_exc()

    # be unique in existing queue
    unique_in_existing_queue_obj = MQueue(maxsize=0,
                                          be_unique_in_existing_items=True)

    unique_in_existing_queue_obj.put(1)

    print(1 in unique_in_existing_queue_obj)

    try:
        unique_in_existing_queue_obj.put(1)
    except ValueError:
        print_exc()

    task = unique_in_existing_queue_obj.get()
    unique_in_existing_queue_obj.task_done()
    unique_in_existing_queue_obj.put(task)

    # be unique in all queue
    unique_in_all_queue_obj = MQueue(maxsize=0,
                                     be_unique_in_all_items=True)

    unique_in_all_queue_obj.put(1)
    print(1 in unique_in_all_queue_obj)

    try:
        unique_in_all_queue_obj.put(1)
    except ValueError:
        print_exc()

    task = unique_in_all_queue_obj.get()
    unique_in_all_queue_obj.task_done()
    try:
        print(task in unique_in_all_queue_obj)
        unique_in_all_queue_obj.put(task)
    except ValueError:
        print_exc()

注意:set 只能包含可哈希对象。对于不可哈希的对象,请使用 list。


0

遗憾的是,我没有足够的评分来评论Lukáš Lalinský最好的答案。

为了支持Lukáš Lalinský的SetQueue的第二个变体的SetQueue.task_done()SetQueue.join(),请在if语句中添加else分支:

def _put(self, item):
    if item not in self.all_items:
        Queue._put(self, item);
        self.all_items.add(item);
    else:
        self.unfinished_tasks -= 1;

已测试并可与Python 3.4一起使用。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接