你可以使用单个生成器和“订阅者生成器”:
subscribed_generators = []
def my_generator():
while true:
elem = yield
do_something(elem)
def publishing_generator():
for elem in the_infinite_stream():
for generator in subscribed_generators:
generator.send(elem)
subscribed_generators.extend([my_generator(), my_generator()])
for elem in publishing_generator():
pass
除了生成器函数外,您还可以创建一个具有以下方法的类:
__next__
、
__iter__
、
send
和
throw
。这样,您就可以修改
MyGenerator.__init__
方法,将新实例自动添加到
subscribed_generators
中。
这与一种“愚蠢实现”的基于事件的方法有些相似:
-
for elem in the_infinite_stream
类似于触发事件
-
for generator ...: generator.send
类似于向每个订阅方发送事件。
因此,实现“更复杂但结构化的解决方案”的一种方式是使用基于事件的方法:
- 例如,您可以使用
asyncio.Event
- 或者像
aiopubsub这样的第三方解决方案。
- 对于任何这些方法,应该为来自
the_infinite_stream
的每个元素触发事件,并且您的
my_generator
实例应该订阅这些事件。
其他方法也可以使用,最佳选择取决于您的任务细节以及在asyncio中如何使用事件循环。例如:
您可以将
the_infinite_stream
(或其包装器)实现为一些具有“光标”的类(跟踪不同订阅者在流中的当前位置的对象);然后,每个
my_generator
注册新的光标并使用它来获取无限流中的下一个项。在这种方法中,事件循环不会自动重新访问
my_generator
实例,如果这些实例“不相等”(例如具有某些“优先级平衡”),则可能需要重新访问这些实例。
中间生成器调用
my_generator
的所有实例(如前所述)。在这种方法中,
my_generator
的每个实例都会被事件循环自动重新访问。最有可能这种方法是线程安全的。
基于事件的方法:
- 使用
asyncio.Event
。与使用中间生成器类似。不是线程安全的。
- aiopubsub。
- 使用
观察者模式的某些内容。
使
the_infinite_generator
(或其包装器)成为“单例”,并“缓存”最新事件。其他答案中描述了一些方法。还可以使用其他“缓存”解决方案:
- 对于
the_infinite_generator
的每个实例,发出相同的元素一次(使用具有自定义
__new__
方法跟踪实例的类,或者使用具有返回“移位”迭代器的方法的相同类的实例
the_infinite_loop
),直到有人在
the_infinite_generator
的实例(或类)上调用特殊方法:
infinite_gen.next_cycle
。在这种情况下,始终应该有一些“最后的完成生成器/处理器”,在每个事件循环周期结束时将执行
the_infinite_generator().next_cycle()
。
- 与前面的类似,但允许同一事件在同一
my_generator
实例中触发多次(因此它们应该注意这种情况)。在这种方法中,可以使用
loop.call_later或loop.cal_at“定期”调用
the_infinite_generator().next_cycle()
。如果“订阅者”应该能够处理/分析:事件之间的延迟、速率限制、超时等等,则可能需要使用此方法。
还有许多其他解决方案。在不了解您当前的实现和不知道使用
the_infinite_loop
的生成器的期望行为的情况下,很难提出具体的建议。
如果我正确理解您对“共享”流的描述,那么您实际上需要“一个”
the_infinite_stream
生成器和一个相应的“处理器”。下面是一个尝试实现此功能的示例:
class StreamHandler:
def __init__(self):
self.__real_stream = the_infinite_stream()
self.__sub_streams = []
def get_stream(self):
sub_stream = []
self.__sub_streams.append(sub_stream)
while True:
while sub_stream:
yield sub_stream.pop(0)
next(self)
def __next__(self):
next_item = next(self.__real_stream)
for sub_stream in self.__sub_steams:
sub_stream.append(next_item)
some_global_variable = StreamHandler()
def my_generator():
for elem in some_global_variable.get_stream():
yield elem
但是,如果你的所有my_generator
对象都在无限流的同一点初始化,并且在循环内部“平等”迭代,那么这种方法会为每个“子流”(用作队列)引入“不必要”的内存开销。不必要:因为这些队列总是相同的(但可以进行优化:如果存在一些现有的“空”子流,则可以通过对"pop
"逻辑进行一些更改来将其重新用于新的子流)。还可以讨论许多其他实现和细微差别。
stream1
、stream2
等是否会并行使用,还是只能一个接一个地使用? - Quelklefinfinite_stream()
的事件A到达,我希望所有订阅者都能收到它。我不希望重播整个历史记录,而只是最后一个值(假设infinite_stream()
每分钟发出一次,当订阅者加入时,他不想等待1分钟才能接收到第一个值)。 - JonasVautherin