使用大量head请求时,gevent/requests会挂起

4

我需要发送100k个head请求,使用的是在requests之上的gevent。我的代码运行一段时间后会卡住。我不确定它为什么会卡住,或者是在requests还是gevent中卡住了。我在requests和gevent中都使用了timeout参数。

请看下面的代码片段,让我知道我应该改变什么。

import gevent
from gevent import monkey, pool
monkey.patch_all()
import requests

def get_head(url, timeout=3):
    try:
        return requests.head(url, allow_redirects=True, timeout=timeout)
    except:
        return None

def expand_short_urls(short_urls, chunk_size=100, timeout=60*5):
    chunk_list = lambda l, n: ( l[i:i+n] for i in range(0, len(l), n) )
    p = pool.Pool(chunk_size)
    print 'Expanding %d short_urls' % len(short_urls)
    results = {}
    for i, _short_urls_chunked in enumerate(chunk_list(short_urls, chunk_size)):
        print '\t%d. processing %d urls @ %s' % (i, chunk_size, str(datetime.datetime.now()))
        jobs = [p.spawn(get_head, _short_url) for _short_url in _short_urls_chunked]
        gevent.joinall(jobs, timeout=timeout)
        results.update({_short_url:job.get().url for _short_url, job in zip(_short_urls_chunked, jobs) if job.get() is not None and job.get().status_code==200})
    return results 

我尝试过使用grequests,但它已被放弃,我也查看了github的pull requests,但它们也存在问题。


gevent是一个硬性要求吗? - That1Guy
看起来你正在使用 gevent 来简单地拥有一个工作池,是这样吗?那么另一种类型的工作池是否足够,或者你需要 geventtornado 中特定的某些东西? - That1Guy
1
https://docs.python.org/dev/library/concurrent.futures.html#concurrent.futures.ThreadPoolExecutor 可以异步执行并发任务。如果您想使用轻量级线程,请使用标准的 asyncio https://docs.python.org/dev/library/asyncio.html?highlight=asyncio - noxdafox
你尝试过减小池的大小吗? - woozyking
是的,将其降低到50。重要的是gevent一直在消耗内存,最终占用了系统内存的约20%,然后就会挂起。我是否正确地重复使用了池工作进程?也就是说,在每次迭代中应该回收工作进程池,这样做是否正确? - vgoklani
显示剩余3条评论
2个回答

10

您所观察到的RAM使用情况主要来自于存储100,000个响应对象时积累的所有数据以及底层开销。我已经重现了您的应用场景,并对排名前15000个Alexa的URL发出了HEAD请求。这并不真的很重要:

  • 无论我使用gevent Pool(即每个连接一个greenlet)还是一组固定的greenlet,都会请求多个URL
  • 我设置池大小有多大

最终,RAM使用情况随着时间的推移而增加,达到相当大的数量。然而,我注意到从requests更改为urllib2已经将RAM使用量减少了约两倍。也就是说,我替换了

result = requests.head(url)

随着

request = urllib2.Request(url)
request.get_method = lambda : 'HEAD'
result = urllib2.urlopen(request)

其他一些建议:不要同时使用两种超时机制。Gevent的超时方法非常稳定,你可以像这样轻松地使用它:

def gethead(url):
    result = None
    try:
        with Timeout(5, False):
            result = requests.head(url)
    except Exception as e:
        result = e
    return result

看起来可能有些棘手,但它将返回None(在5秒钟之后非常精确地表示超时),任何代表通信错误的异常对象,或者响应。效果很好!

虽然这可能不是问题的一部分,在这种情况下,我建议保持工作进程的活动状态并让它们同时处理多个任务!生成greenlets的开销很小,实际上。尽管如此,这将是一组长寿的greenlets的一个非常简单的解决方案:

def qworker(qin, qout):
    while True:
        try:
            qout.put(gethead(qin.get(block=False)))
        except Empty:
            break

qin = Queue()
qout = Queue()

for url in urls:
    qin.put(url)

workers = [spawn(qworker, qin, qout) for i in xrange(POOLSIZE)]
joinall(workers)
returnvalues = [qout.get() for _ in xrange(len(urls))]

此外,你需要真正意识到这是一个大规模的问题,会带来非标准的问题。当我使用20秒的超时时间、100个工作线程和15000个要请求的URL复现了你的场景时,我很容易就得到了大量的套接字:

# netstat -tpn | wc -l
10074

也就是说,操作系统需要管理超过10000个套接字,其中大部分处于TIME_WAIT状态。我还观察到“打开的文件太多”错误,并通过sysctl调整了限制。当您请求100,000个URL时,您可能也会遇到这样的限制,并且需要采取措施防止系统饥饿。

还要注意您使用requests的方式,它会自动从HTTP重定向到HTTPS,并自动验证证书,所有这些肯定会消耗内存。

在我的测量中,当我将请求的URL数量除以程序运行时间时,我几乎从未达到100个响应/秒,这是由于与世界各地的外国服务器的高延迟连接导致的结果。我想你也受到这样的限制影响。将其余的体系结构调整为此限制,并且您可能能够以不太大的RAM使用率从互联网生成数据流到磁盘(或数据库)之间。

我应该具体回答您的两个主要问题:

我认为gevent/您使用的方式并不是问题所在。我认为您只是低估了任务的复杂性。它带来了讨厌的问题,并将您的系统推向极限。

  • 您的RAM使用问题:如果可以,请先使用urllib2。然后,如果仍然累积太高,请努力抵制累积。尝试产生稳定状态:您可能想开始将数据写入磁盘,并通常朝着对象可以成为垃圾收集的情况工作。

  • 您的代码“最终挂起”:这可能是由于您的RAM问题引起的。如果不是,则不要生成太多的greenlets,而是如所示重用它们。此外,进一步减少并发性,监视打开套接字的数量,必要时增加系统限制,并尝试找出程序挂起的确切位置。


顺便说一下,我现在使用期货,更加高效。 - vgoklani

2

我不确定这是否能解决您的问题,但您没有正确使用pool.Pool()。

请尝试以下方式:

def expand_short_urls(short_urls, chunk_size=100):
    # Pool() automatically limits your process to chunk_size greenlets running concurrently
    # thus you don't need to do all that chunking business you were doing in your for loop
    p = pool.Pool(chunk_size)
    print 'Expanding %d short_urls' % len(short_urls)

    # spawn() (both gevent.spawn() and Pool.spawn()) returns a gevent.Greenlet object
    # NOT the value your function, get_head, will return
    threads = [p.spawn(get_head, short_url) for short_url in short_urls]
    p.join()

    # to access the returned value of your function, access the Greenlet.value property
    results = {short_url: thread.value.url for short_url, thread in zip(short_urls, threads) 

如果线程的值不为空且线程的状态码为200,则返回结果。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接