共享的Python生成器

19

我试图使用Python生成器来复制响应式扩展中的“共享”可观察概念。

假设我有一个API,它提供给我一个无限流,我可以像这样使用它:

def my_generator():
    for elem in the_infinite_stream():
        yield elem

我可以多次使用这个生成器,就像这样:

stream1 = my_generator()
stream2 = my_generator()

而且 the_infinite_stream() 将会被调用两次(每个生成器调用一次)。

现在假设 the_infinite_stream() 是一个昂贵的操作。是否有一种方法可以在多个客户端之间“共享”生成器?看起来 tee 可以做到这一点,但我必须事先知道我需要多少独立的生成器。

这个想法是,在其他语言中(Java、Swift),使用响应式扩展(RxJava、RxSwift) "共享" 流,我可以方便地在客户端上复制流。我想知道如何在 Python 中实现这一点。

注意:我正在使用 asyncio。


“分享”是指缓存值以在下一个生成器中使用吗? - Quelklef
此外,这些流是“真实”的流(具有.seek、.close等功能),还是只是一般的可迭代对象?stream1stream2等是否会并行使用,还是只能一个接一个地使用? - Quelklef
你需要存储第一个和最后一个消费者之间的整个流 - 如果有一个消费者一直轮询无限流,而另一个消费者滞后,则会变得明显。如果您希望任何新的消费者从开头开始,您需要缓存所有内容。如果这对您来说没问题,那么实现起来很容易。 - liborm
如果您不想将整个历史记录重放给新的消费者,那么您更倾向于寻找一些发布-订阅方案。 - liborm
通过“分享”,我指的是liborm建议的发布-订阅机制:如果来自infinite_stream()的事件A到达,我希望所有订阅者都能收到它。我不希望重播整个历史记录,而只是最后一个值(假设infinite_stream()每分钟发出一次,当订阅者加入时,他不想等待1分钟才能接收到第一个值)。 - JonasVautherin
这确实是“内置”的响应式扩展,我想知道在 Python 中是否存在这样的机制(可能在 asyncio 中)。自己实现听起来并不是非常简单,容易出错,因此我宁愿使用一些标准的东西(就像我使用 RxJava 或 RxSwift 一样)=)。 - JonasVautherin
5个回答

8
我使用了tee实现并对其进行修改,使其可以从infinite_stream中生成多个生成器。
import collections

def generators_factory(iterable):
    it = iter(iterable)
    deques = []
    already_gone = []

    def new_generator():
        new_deque = collections.deque()
        new_deque.extend(already_gone)
        deques.append(new_deque)

        def gen(mydeque):
            while True:
                if not mydeque:             # when the local deque is empty
                    newval = next(it)       # fetch a new value and
                    already_gone.append(newval)
                    for d in deques:        # load it to all the deques
                        d.append(newval)
                yield mydeque.popleft()

        return gen(new_deque)

    return new_generator

# test it:
infinite_stream = [1, 2, 3, 4, 5]
factory = generators_factory(infinite_stream)
gen1 = factory()
gen2 = factory()
print(next(gen1)) # 1
print(next(gen2)) # 1 even after it was produced by gen1
print(list(gen1)) # [2, 3, 4, 5] # the rest after 1

为了仅缓存一些值,您可以将 already_gone = [] 更改为 already_gone = collections.deque(maxlen=size),并在 generators_factory 中添加 size=None 参数。

很有趣。并且它可以被扩展,使得already_gone具有max_size(比如说1),这样它只缓存一个值。正确吗? - JonasVautherin
这是否意味着如果gen1gen2并行使用,顺序可能不被尊重?主要是出于好奇...我正在尝试(目前失败)将其移植到asyncio中,那里可能会有所不同。 - JonasVautherin
生成元素的顺序?对于tee和我的实现都不适用。 - sanyassh
也许更好的做法是提出一个新问题,涉及到您的asyncio尝试失败,并参考这个问题。 - sanyassh
对于Tee来说很有用!我已经使用asyncio实现了一些功能,但我仍然需要弄清楚如何删除取消订阅的消费者。一旦我让它正常工作,可能会在codereview上发布。 - JonasVautherin

3
你可以重复调用 "tee" 来根据需要创建多个迭代器。
it  = iter([ random.random() for i in range(100)])
base, it_cp = itertools.tee(it)
_, it_cp2 = itertools.tee(base)
_, it_cp3 = itertools.tee(base)

Sample: http://tpcg.io/ZGc6l5.


实际上,您不需要重新生成基础。在第一次创建后,“_, it_cp2 = itertools.tee(base)”同样有效。 - tarkmeper
1
我应该已经编辑过了。昨晚我无法弄清楚为什么它能工作,以为可能需要删除注释。今天早上我看了一下C代码 - 当你使用N>1调用tee时,我认为这就是它有效地所做的事情。 - tarkmeper
有趣的是,我没有意识到我可以重复调用 tee。然而,这是否会缓存无限流的所有事件呢?我还没有找到在 tee 中设置 max_size 的方法(这可能是为什么 @Sanyash 重新实现它的原因?)。 - JonasVautherin
1
根据C代码,我认为它只缓存至少有1个迭代器仍需要数据的项目。所以是的,在上面的答案中,它将缓存所有这些项目。另一方面,如果不是基于全部的“base”,而是从已经移动的迭代器开始,则会从该点开始缓存,但不具备@Sanyash解决方案向后查看X(或无限)元素缓存的灵活性。 - tarkmeper

3
你可以使用单个生成器和“订阅者生成器”:
subscribed_generators = []


def my_generator():
    while true:
        elem = yield
        do_something(elem) # or yield do_something(elem) depending on your actual use

def publishing_generator():
    for elem in the_infinite_stream():
        for generator in subscribed_generators:
            generator.send(elem)

subscribed_generators.extend([my_generator(), my_generator()])

# Next is just ane example that forces iteration over `the_infinite_stream`
for elem in publishing_generator():
    pass

除了生成器函数外,您还可以创建一个具有以下方法的类:__next____iter__sendthrow。这样,您就可以修改MyGenerator.__init__方法,将新实例自动添加到subscribed_generators中。
这与一种“愚蠢实现”的基于事件的方法有些相似:
- for elem in the_infinite_stream 类似于触发事件 - for generator ...: generator.send 类似于向每个订阅方发送事件。
因此,实现“更复杂但结构化的解决方案”的一种方式是使用基于事件的方法:
- 例如,您可以使用asyncio.Event - 或者像aiopubsub这样的第三方解决方案。 - 对于任何这些方法,应该为来自the_infinite_stream的每个元素触发事件,并且您的my_generator实例应该订阅这些事件。
其他方法也可以使用,最佳选择取决于您的任务细节以及在asyncio中如何使用事件循环。例如:
您可以将the_infinite_stream(或其包装器)实现为一些具有“光标”的类(跟踪不同订阅者在流中的当前位置的对象);然后,每个my_generator注册新的光标并使用它来获取无限流中的下一个项。在这种方法中,事件循环不会自动重新访问my_generator实例,如果这些实例“不相等”(例如具有某些“优先级平衡”),则可能需要重新访问这些实例。
中间生成器调用my_generator的所有实例(如前所述)。在这种方法中,my_generator的每个实例都会被事件循环自动重新访问。最有可能这种方法是线程安全的。
基于事件的方法: - 使用asyncio.Event。与使用中间生成器类似。不是线程安全的。 - aiopubsub。 - 使用观察者模式的某些内容。
使the_infinite_generator(或其包装器)成为“单例”,并“缓存”最新事件。其他答案中描述了一些方法。还可以使用其他“缓存”解决方案: - 对于the_infinite_generator的每个实例,发出相同的元素一次(使用具有自定义__new__方法跟踪实例的类,或者使用具有返回“移位”迭代器的方法的相同类的实例the_infinite_loop),直到有人在the_infinite_generator的实例(或类)上调用特殊方法:infinite_gen.next_cycle。在这种情况下,始终应该有一些“最后的完成生成器/处理器”,在每个事件循环周期结束时将执行the_infinite_generator().next_cycle()。 - 与前面的类似,但允许同一事件在同一my_generator实例中触发多次(因此它们应该注意这种情况)。在这种方法中,可以使用loop.call_later或loop.cal_at“定期”调用the_infinite_generator().next_cycle()。如果“订阅者”应该能够处理/分析:事件之间的延迟、速率限制、超时等等,则可能需要使用此方法。
还有许多其他解决方案。在不了解您当前的实现和不知道使用the_infinite_loop的生成器的期望行为的情况下,很难提出具体的建议。
如果我正确理解您对“共享”流的描述,那么您实际上需要“一个”the_infinite_stream生成器和一个相应的“处理器”。下面是一个尝试实现此功能的示例:

class StreamHandler:
    def __init__(self):
        self.__real_stream = the_infinite_stream()
        self.__sub_streams = []

    def get_stream(self):
        sub_stream = []  # or better use some Queue/deque object. Using list just to show base principle
        self.__sub_streams.append(sub_stream)
        while True:
            while sub_stream:
                yield sub_stream.pop(0)
            next(self)

    def __next__(self):
        next_item = next(self.__real_stream)
        for sub_stream in self.__sub_steams:
            sub_stream.append(next_item)

some_global_variable = StreamHandler()
# Or you can change StreamHandler.__new__ to make it singleton, or you can create an instance at the point of creation of event-loop

def my_generator():
    for elem in some_global_variable.get_stream():
        yield elem

但是,如果你的所有my_generator对象都在无限流的同一点初始化,并且在循环内部“平等”迭代,那么这种方法会为每个“子流”(用作队列)引入“不必要”的内存开销。不必要:因为这些队列总是相同的(但可以进行优化:如果存在一些现有的“空”子流,则可以通过对"pop"逻辑进行一些更改来将其重新用于新的子流)。还可以讨论许多其他实现和细微差别。


我明白了。那么,该流的使用者可以实现自己的 my_generator() 并手动将其附加到 subscribed_generators 中,是吗?但是我不明白最后一个 for 循环的作用 :/ - JonasVautherin
1
@JonesV:你总是有一些“上层代码”来创建那些my_generator()调用:你可以在那里添加它。你不受限于subscribed_generators的全局变量,代码只是“解释一个概念”,许多其他方法也是可能的。请看我的答案:我已经更新了更多细节,并列出了一些其他潜在的方法。 - imposeren
非常感谢您的详细解释!我在我的代码中选择了Sanyash实现,我可以很容易地适应asyncio,因此我验证了那个答案。然而,这里的想法非常好,我不知道aiopubsub,它看起来非常不错! - JonasVautherin

3
考虑简单的class attributes

给定

def infinite_stream():
    """Yield a number from a (semi-)infinite iterator."""
    # Alternatively, `yield from itertools.count()`
    yield from iter(range(100000000))


# Helper
def get_data(iterable):
    """Print the state of `data` per stream."""
    return ", ".join([f"{x.__name__}: {x.data}" for x in iterable])

代码

class SharedIterator:
    """Share the state of an iterator with subclasses."""
    _gen = infinite_stream()
    data = None

    @staticmethod
    def modify():
        """Advance the shared iterator + assign new data."""
        cls = SharedIterator
        cls.data = next(cls._gen)

演示

给定客户端流的元组 (A, BC),

# Streams
class A(SharedIterator): pass
class B(SharedIterator): pass
class C(SharedIterator): pass


streams = A, B, C

让我们修改并打印一个迭代器的状态,它们之间共享同一个迭代器。
# Observe changed state in subclasses    
A.modify()
print("1st access:", get_data(streams))
B.modify()
print("2nd access:", get_data(streams))
C.modify()
print("3rd access:", get_data(streams))

输出

1st access: A: 0, B: 0, C: 0
2nd access: A: 1, B: 1, C: 1
3rd access: A: 2, B: 2, C: 2

尽管任何流都可以修改迭代器,但类属性在子类之间是共享的。

参见

  • Docs 关于 asyncio.Queue - 一个异步共享容器的替代品
  • Post 关于观察者模式 + asyncio

但是我觉得当 A 消费(modify())一个事件时,B 和 C 同时也会消费它。所以如果我执行 next(A); next(B); next(C);,他们不会同时获得相同的事件,对吗? - JonasVautherin
我相信Sanyash的答案正好符合我的要求。但是你的解决方案没有以同样的方式运作,是吗?也许我在那里错过了什么 =/。 - JonasVautherin
1
根据您的评论,“如果从infinite_stream()到达事件A,我希望所有订阅者都能收到它”,这是正确的。这可能是您实际想要的另一种解释。 - pylang

0
如果您有一个单一的生成器,您可以为每个“订阅者”使用一个队列,并将事件路由到每个订阅者,因为主要生成器产生结果。
这样做的好处是允许订阅者按照自己的节奏移动,并且可以在现有代码中很少更改原始源代码的情况下进行操作。
例如:
def my_gen():
  ...

m1 = Muxer(my_gen)
m2 = Muxer(my_gen)

consumer1(m1).start()
consumer2(m2).start()

当项目从主生成器中提取时,它们将被插入到每个侦听器的队列中。侦听器可以随时通过构造一个新的Muxer()来订阅:

import queue
from threading import Lock
from collections import namedtuple

class Muxer():
    Entry = namedtuple('Entry', 'genref listeners, lock')

    already = {}
    top_lock = Lock()

    def __init__(self, func, restart=False):
        self.restart = restart
        self.func = func
        self.queue = queue.Queue()

        with self.top_lock:
            if func not in self.already:
                self.already[func] = self.Entry([func()], [], Lock())
            ent = self.already[func]

        self.genref = ent.genref
        self.lock = ent.lock
        self.listeners = ent.listeners

        self.listeners.append(self)

    def __iter__(self):
        return self

    def __next__(self):
        try:
            e = self.queue.get_nowait()
        except queue.Empty:
            with self.lock:
                try:
                    e = self.queue.get_nowait()
                except queue.Empty:
                    try:
                        e = next(self.genref[0])
                        for other in self.listeners:
                            if not other is self:
                                other.queue.put(e)
                    except StopIteration:
                        if self.restart:
                            self.genref[0] = self.func()
                        raise
        return e

原始源代码,包括测试套件:

https://gist.github.com/earonesty/cafa4626a2def6766acf5098331157b3

单元测试同时运行多个线程,按顺序处理相同的生成事件。代码是有序的,在单个生成器访问期间获取锁。

注意事项:此版本使用单例来控制访问,否则可能会意外逃避其对包含的生成器的控制。它还允许包含的生成器是“可重启”的,这对我来说是一个有用的功能。没有“close()”功能,只是因为我不需要它。然而,这是__del__的适当用例,因为最后一个对侦听器的引用是清理的正确时间。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接