Python中的非阻塞生成器

4

我正在QT应用程序中使用requests模块中的生成器函数,与requests-streaming示例中的函数基本相同:

import json
import requests

def get_stream():
    r = requests.get('http://httpbin.org/stream/20', stream=True)
    for line in r.iter_lines():
        if line:
            yield json.loads(line)

def consume_stream():
    for message in get_stream():
       #do something

然而,当没有任何响应时(例如来自Twitter Streaming API的不规则推文),生成器get_stream将会阻塞consume_stream方法。
这可能发生在任何生成器不立即生成内容并且必须等待传入消息等情况下,因此会阻塞消费者。
在Python中有没有模式可以以非阻塞的方式消耗生成器,即如果生成器生成内容,则处理其结果,否则执行其他操作直到下一次出现结果?

2
你如何确切地知道要继续使用生成器? - Simeon Visser
抱歉回复晚了:consume_stream方法基本上会更新GUI小部件(TreeModel),并提供“中止”按钮。目前,当get_stream没有产生任何内容时(例如等待传入的消息/推文),此按钮会被阻塞。我无法修改iter_lines方法,但我想在没有传入推文时产生类似“正在等待推文”的东西,并在有新推文到达时立即产生推文。因此,问题归结为是否可以控制消费者循环内生成器的消耗,还是必须等待。 - dorvak
4个回答

2

看一下生产者-消费者模式。通常使用 Queue 在Python中进行实现。

生成者通常在线程或另一个进程中运行(Queue支持任何一个),只需将消息放入队列即可。每当消费者感觉需要时,就可以从队列中取消息。此操作支持 timeout 参数。


1
如果您控制生成器函数,一个解决方案是在超时后抛出异常。 可能是这样的:

如果您控制生成器函数,则可以在超时期结束后引发异常。 可能是以下内容:

def get_stream(timeout=None):
    while message=read_message(timeout=timout):
        yield message

如果出现超时情况,那么read_message函数应该抛出TimeoutException或其他异常。当然,你仍然需要处理何时/如何重试/恢复的后勤工作。

1

正如Simeon在评论中提到的,你所描述的示例并不像你所说的那样简单。你需要注意一些细节。有不同的解决方案,根据您的用例,这些方案更或者更少有意义。您没有提供关于您真正想要做什么的详细信息,因此我将向您展示http://twistedmatrix.com/trac/wiki/QTReactor作为一个示例。有不同的解决方案/框架来实现异步消息队列。我认为这就是你要找的。


0

从Python 3.6开始,您可以使用异步生成器https://www.python.org/dev/peps/pep-0525/

import json
import requests

async def get_stream():
    r = requests.get('http://httpbin.org/stream/20', stream=True)
    for line in r.iter_lines():
        if line:
            yield json.loads(line)

async def consume_stream():
   await for message in get_stream():
       #do something

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接