我正在尝试理解用于函数响应式编程(FRP)的rxpy
库,但已经遇到了障碍。 我正在编写一个小程序,期望通过标准输入(sys.stdin
)流传数据。
因此,我的问题很简单:如何创建一个rx.Observable
实例,以异步方式从stdin读取数据? 是否有内置机制可以从流中创建Observable
实例?
我正在尝试理解用于函数响应式编程(FRP)的rxpy
库,但已经遇到了障碍。 我正在编写一个小程序,期望通过标准输入(sys.stdin
)流传数据。
因此,我的问题很简单:如何创建一个rx.Observable
实例,以异步方式从stdin读取数据? 是否有内置机制可以从流中创建Observable
实例?
RxPy
,但我对RxJS
有一些了解。
RxPy
有许多内置方法可以用于此目的,但我倾向于创建一个Observable工厂。以ObservableCreation.from_array
为指导,现在让我们尝试一下。(注意:我没有运行这段代码,但它应该可以帮助你完成大部分工作)from rx.observable import Observable, ObservableMeta
from rx.anonymousobservable import AnonymousObservable
from rx.concurrency import current_thread_scheduler
class ObservableFile(Observable, metaclass=ObservableMeta):
@classmethod
def from_file(cls, readableFile, scheduler=None):
scheduler = scheduler or current_thread_scheduler
def subscribe(observer):
def action(action1, state=None):
try:
observer.on_next(readableFile.next())
action1(action)
except StopIteration: # EOF
observer.on_completed()
return scheduler.schedule_recursive(action)
return AnonymousObservable(subscribe)
res = rx.Observable.from_file(sys.stdin)
我今天刚刚尝试过这个,
d = rx.Observable.from_(sys.stdin).subscribe(print)
看起来是有效的(将行打印到标准输出)。from_
是 from_iterable
的别名。d
是取消订阅的可丢弃对象。