将Python线程结果合并为一个列表

4
我正在尝试修改此处显示的解决方案:Python中发送100,000个HTTP请求的最快方法是什么?,不同的是我正在进行一个API请求,该请求返回一个字典,并且我希望所有这些API请求的最终结果是所有字典的列表。

这是我的代码--请考虑api_calls是一个包含每个要打开的URL以获取JSON请求的列表...

from threading import Thread
from Queue import Queue

concurrent = 200 

def doWork():
    while True:
        url = q.get()
        result = makeRequest(url[0])
        doSomethingWithResult(result, url)
        q.task_done()

def makeRequest(ourl):
    try:
        api_call = urlopen(ourl).read()
        result = json.loads(api_call)
        return result, ourl
    except:
        return "error", ourl

def doSomethingWithResult(result, url):
  print(url,result)

q = Queue(concurrent * 2)
for i in range(concurrent):
    t = Thread(target=doWork)
    t.daemon = True
    t.start()
try:
    for url in api_calls:
        q.put(url)
    q.join()
except KeyboardInterrupt:
    sys.exit(1)

像链接的示例一样,当前每行将成功打印url和结果。相反,我想在每个线程中将(url, result)添加到列表中,然后在最后将它们连接成一个主列表。我无法弄清楚如何拥有这个主列表并在最后加入结果。有人可以帮助我修改doSomethingWithResult吗?如果我正在执行一个大循环,我将只有一个空列表,并且在每个API请求之后将结果附加到列表中,但是现在我不知道如何模仿这个。我预计一个常见的回答将是使用异步I/O,如果这是建议,那么我希望有人提供实际完成与上面链接代码同样多的示例。

@TarnayKálmán,我希望在这里抄送您,因为您是该解决方案的原始作者。 - reese0106
1个回答

9
使用ThreadPool来处理。它会为您进行繁重的工作。这里是一个可以获取一些URL的工作示例。
import multiprocessing.pool
concurrent = 200 

def makeRequest(ourl):
    try:
        api_call = urlopen(ourl).read()
        result = json.loads(api_call)
        return "success", ourl
    except:
        return "error", ourl

def main():
    api_calls = [
        'http:http://jsonplaceholder.typicode.com/posts/{}'.format(i)
        for i in range(1,5)]

    # a thread pool that implements the process pool API.
    pool = multiprocessing.pool.ThreadPool(processes=concurrent)
    return_list = pool.map(makeRequest, api_calls, chunksize=1)
    pool.close()
    for status, data in return_list:
        print(data)

main()

这看起来确实更简单。但是,对我来说似乎并没有起作用。我收到了一个错误“AttributeError:'StdIn'对象没有'close'属性”,所以似乎存在一些问题?你有什么想法这里出了什么问题吗? - reese0106
我不明白这个例子怎么会导致那个错误。你有几个样本URL可以让我测试吗?否则,如果你有更新的代码失败了,请将其添加到你的原始问题中。 - tdelaney
@tdelaney 好的,现在似乎可以工作了!我能够切换“success”以返回结果对象,并且它看起来非常有效。从性能角度来看,如果我要进行10万个以上的API调用,您会采取任何不同的措施来加快速度吗? - reese0106
@tdelaney,当我提供100个URL时,这个程序运行得非常快,但是当我尝试扩展到1000个URL时,它的速度明显变慢了(比10倍还要多),而我希望将其扩展到100k+请求的级别。你能想到任何原因导致它无法扩展到超过100个请求吗? - reese0106
我不确定为什么,但有一些猜测... Python线程可能会崩溃,您可以使用一个进程池的两个层次结构,该进程池的数量等于系统上的CPU数量,其中每个进程工作器运行一个线程池,例如32个线程。对于这些池,请使用imap_unordered而不是map,并且不要在内存中保存列表,而是写入文件(每个进程工作器一个文件),然后稍后再读取它们。 - tdelaney
显示剩余3条评论

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接