如何从标准输入流(stdin)创建一个 rx.py 的Observable对象?

5

我正在尝试理解用于函数响应式编程(FRP)的rxpy库,但已经遇到了障碍。 我正在编写一个小程序,期望通过标准输入(sys.stdin)流传数据。

因此,我的问题很简单:如何创建一个rx.Observable实例,以异步方式从stdin读取数据? 是否有内置机制可以从流中创建Observable实例?

2个回答

4
我从未使用过RxPy,但我对RxJS有一些了解。 RxPy许多内置方法可以用于此目的,但我倾向于创建一个Observable工厂。以ObservableCreation.from_array为指导,现在让我们尝试一下。(注意:我没有运行这段代码,但它应该可以帮助你完成大部分工作)
from rx.observable import Observable, ObservableMeta
from rx.anonymousobservable import AnonymousObservable
from rx.concurrency import current_thread_scheduler

class ObservableFile(Observable, metaclass=ObservableMeta):

    @classmethod
    def from_file(cls, readableFile, scheduler=None):
        scheduler = scheduler or current_thread_scheduler

        def subscribe(observer):
            def action(action1, state=None):
                try:
                    observer.on_next(readableFile.next())
                    action1(action)

                except StopIteration: # EOF
                    observer.on_completed()

            return scheduler.schedule_recursive(action)
        return AnonymousObservable(subscribe)

然后就像这样使用它:
res = rx.Observable.from_file(sys.stdin)

这将创建一个可观察对象,覆盖stdin的每一行直到EOF。它是阻塞的,但有绕过它的方法。也可以使用不同的调度程序进行调整。

3

我今天刚刚尝试过这个,

 d = rx.Observable.from_(sys.stdin).subscribe(print)

看起来是有效的(将行打印到标准输出)。from_from_iterable 的别名。d 是取消订阅的可丢弃对象。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接