Python - 如何更简洁地使用生成器?

4

(Python 3)

我正在使用 Python 生成器从队列中读取消息。

消费者读取队列消息后,需要能够告诉生成器删除队列消息,如果消息已经成功处理。

为了向 Python 生成器发送 .send(),似乎我必须先向生成器发送 .send(None)。这使得我的代码比我认为的要臃肿。

有人能建议一种方法,让 qconsumer.py 用更少的代码驱动生成器吗?我已经确定了我希望消除的哪些行。

简而言之,如何使下面的代码更紧凑,有什么建议可以删除行?

以下是 qconsumer.py 的代码:

from qserver import Qserver

myqserver = Qserver()

myproducer = myqserver.producer() # trying to eliminate this line
# first send to a generator must be None
myproducer.send(None) # trying to eliminate this line
for msg in myproducer:
    # do something with message
    print(msg)
    if messageprocessok:
        myproducer.send('delete')

以下是qserver.py的代码:

# -*- coding: utf-8 -*-
import boto
from boto.sqs.connection import SQSConnection
from boto.sqs.message import Message

QNAME = 'qinbound'
SQSREGION = 'us-west-1'

class Qserver():
    """A simple Q server."""

    def __init__(self, qname=None, sqsregion=None):
        self.qname = qname or QNAME
        self.sqsregion = sqsregion or SQSREGION
        self.sqsconn = boto.sqs.connect_to_region(self.sqsregion)
        self.q_in = self.sqsconn.get_queue(self.qname)

    def producer(self):
        while True:
            qmessage = self.q_in.read(wait_time_seconds=20)
            if qmessage is None:
                continue
            action = (yield qmessage.get_body())
            if action == 'delete':
                # if processing completed ok, clear message from this queue
                self.q_in.delete_message(qmessage)

3
你确定需要它吗?None是你可以发送给未启动生成器的唯一内容,但你并不一定需要发送。使用一个较简单的虚拟生成器对你的 qconsumer.py 代码进行简单测试时,即使没有调用 myproducer.send(None),也可以正常运行。然而,generator.send() 返回生成器的下一个值;你正在将迭代生成器与显式地使用 myproducer.send 获取下一个值混合在 for 循环中——你应该只使用其中之一。 - lanzz
@lanzz 嗯...听起来我的方法是错误的。 - Duke Dougal
2个回答

2

您当前的消费者正在丢弃消息,因为每个send调用都会返回一个。您应该改为执行以下操作:

myqserver = Qserver()
myproducer = myqserver.producer() 
messageprocessok = False
while True:
    msg = myproducer.send('delete' if messageprocessok else None)
    # do something with message
    print(msg)

或者,另一种选择是:
myqserver = Qserver()
myproducer = myqserver.producer() 
msg = next(myproducer)
while True:
    # do something with message
    print(msg)
    msg = myproducer.send('delete' if messageprocessok else None)

你需要分别调用Qserver()myqserver.producer()是因为你将producer作为一个类的方法。你也可以使用独立函数,或创建一个简单地返回Qserver().producer()的包装器函数。以下是独立函数版本:

def producer(qname=None, sqsregion=None):
    qname = qname or QNAME
    sqsregion = sqsregion or SQSREGION
    sqsconn = boto.sqs.connect_to_region(sqsregion)
    q_in = sqsconn.get_queue(qname)
    while True:
        qmessage = q_in.read(wait_time_seconds=20)
        if qmessage is None:
            continue
        action = (yield qmessage.get_body())
        if action == 'delete':
            # if processing completed ok, clear message from this queue
            q_in.delete_message(qmessage)

嗨,@Janne。在消息被处理之前,我们不知道消息是否已经成功处理。这是否符合您的解决方案? - Duke Dougal
@DukeDougal 第一次调用需要是 myproducer.send(None) 或者 next(myproducer)。我初始化 messageprocessok = False 来达到这个目的。在第一次调用时,生成器会从头开始运行,直到它产生第一条消息为止。 - Janne Karila
有价值的信息。我将另一个标记为解决方案,因为那是我要使用的,但你的方法似乎也很不错。StackOverflow说只能有一个解决方案...... - Duke Dougal

1

了解你想要做的事情后,我认为我会避免将send与迭代混合使用。将myqserver类本身作为迭代器似乎更有意义:

# -*- coding: utf-8 -*-
import boto
from boto.sqs.connection import SQSConnection
from boto.sqs.message import Message

QNAME = 'qinbound'
SQSREGION = 'us-west-1'

class Qserver():
    """A simple Q server."""
    _current_message = None

    def __init__(self, qname=None, sqsregion=None):
        self.qname = qname or QNAME
        self.sqsregion = sqsregion or SQSREGION
        self.sqsconn = boto.sqs.connect_to_region(self.sqsregion)
        self.q_in = self.sqsconn.get_queue(self.qname)

    def __iter__(self):
        return self

    def __next__(self):
        while True:
            qmessage = self.q_in.read(wait_time_seconds=20)
            if qmessage is not None:
                self._current_message = qmessage
                return qmessage

    next = __next__

    def delete_current(self):
        if self._current_message is not None:
            self.q_in.delete_message(self._current_message)

"使用方式将类似于:

"
from qserver import Qserver

myqserver = Qserver()
for msg in myqserver:
    # do something with message
    print(msg)
    if messageprocessok:
        myqserver.delete_current()

看起来很有趣。你可以解释一下你建议避免在迭代中混合发送的想法背后的思考过程吗?这样做有什么问题? - Duke Dougal
主要是因为它使事情更简单。仅消耗数据(通过 send)或仅生成数据(通过 yield)的生成器更容易理解和调试。如果两者都使用,则需要小心,因为正如上面提到的,send 会导致生成器恢复执行,直到遇到下一个 yield。如果下一个 yield 是语句而不是表达式,则结果将随着对 send 的调用返回。如果您对此不小心处理,就会丢失返回的值。 - Sahand
如果我想要能够向队列中写入消息,那么只需在您建议的Qserver实现中添加一个“writemessage”方法是否有意义? - Duke Dougal
抱歉,我之前(现已删除)的评论可能误解了您的问题。是的,像write_messageadd_message这样的方法可以将消息添加到队列中,这将是解决方案。 - Sahand

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接