Python中实现可观察集合的推荐方法是什么?

6

我希望在Python中有一些可观察的集合/序列,使我能够监听更改事件,如添加新项或更新项:

list = ObservableList(['a','b','c'])
list.addChangeListener(lambda new_value: print(new_value))
list.append('a') # => should trigger the attached change listener

data_frame = ObservableDataFrame({'x': [1,2,3], 'y':[10,20,30]})
data_frame.addChangeListener(update_dependent_table_cells) # => allows to only update dependent cells instead of a whole table

A. 我发现了一个提供可观察集合实现的项目,看起来非常有前途:

https://github.com/dimsf/Python-observable-collections

它可以满足我的需求:

from observablelist import ObservableList

def listHandler(event):
    if event.action == 'itemsUpdated':
        print event.action + ', old items: ' + str(event.oldItems) + ' new items: ' + str(event.newItems) + ' at index: ' + str(event.index)
    elif event.action == 'itemsAdded' or event.action == 'itemsRemoved':
        print(event.action + ', items: ' + str(event.items) + ' at index: ' + str(event.index))

myList = ObservableList()
myList.attach(listHandler)

#Do some mutation actions, just like normal lists.
myList.append(10)
myList.insert(3, 0)

然而,最后一次更改是在6年前,我想知道是否有更加现代或内置于Python的替代方案?

B. 我还发现了RxPy:https://github.com/ReactiveX/RxPY

import rx
list = ["Alpha", "Beta", "Gamma"]
source = rx.from_(list)
source.subscribe(
   lambda value: print(value),
   on_error = lambda e: print("Error : {0}".format(e)),
   on_completed = lambda: print("Job Done!")
) 

有没有一种方法可以保持订阅开放,以便我能够在订阅之后追加新值到列表中? 伪代码:

source.subscribe(..., keep_open = True)
source.append("Delta")  # <= does not work; there is no append method
source.close()

换句话说:我可以/应该将RxPy源用作可观察集合吗?

C. 在Python中处理事件和实现观察者模式似乎存在许多不同的可能性:

Python中的事件系统

Python观察者模式:示例,提示?

Python实现观察者模式的替代方法

使用装饰器在Python3中实现观察者模式

=> 在Python中实现可观察集合的推荐/Pythonic方式是什么?我应该使用过时的A还是改编自B的形式(似乎用于不同目的?)或甚至来自C的另一种策略?

=> 是否有计划以某种方式标准化这些可能性并直接在Python中包含默认可观察的集合?

相关问题,专门针对数据框:

如何使表格/电子表格(例如Pandas DataFrame)可观察,使用触发器或更改事件?

2个回答

3

我从未使用过RxPy,但它似乎是非常接近js / ts的rx模式的实现。

首先,您需要一个可观察对象,该对象用于将数据推入其中并进行观察。这是一个subject,可能是行为主题或重放主题。创建此主题,然后使用on_next()运算符向其中推送新值。

对于您的第二个问题,似乎您想要将多个可观察对象合并为一个可观察对象。有多种方法可以做到这一点,但最有可能的是,您正在寻找CombineLatest或Concat。查看operators

如果我采取您的第二个示例,则代码将如下所示:

from rx.subject.subject import Subject

list = ["Alpha", "Beta", "Gamma"]
# assuming that you want each item to be emitted one after the other
subject = Subject()
subject.subscribe(
    lambda value: print(value),
    on_error = lambda e: print("Error : {0}".format(e)),
    on_completed = lambda: print("Job Done!")
)
subject.on_next('Alpha')
subject.on_next('Beta')
subject.on_next('Gamma')
subject.on_next('Delta')

如果您使用BehaviorSubject,您将能够提供一个初始值,当新的观察者订阅时,它将接收到最后发出的值。 如果您使用ReplaySubject,可以提供值,然后订阅,观察者将收到主题发出的到这一点为止的所有值。

0

刚刚发现了一个基于RxPy的实现。 最后一次更改是在2018年,似乎还没有准备好支持RxPY 3.x。

https://github.com/shyam-s00/ObservableCollections

https://github.com/shyam-s00/ObservableCollections/issues/1

from reactive.ObservableList import ObservableList

ol = ObservableList([1, 2, 3, 4])
ol.when_collection_changes() \
    .map(lambda x: x.Items) \
    .subscribe(print, print)

ol.append(5)

它提供了

  • ObservableList
  • ObservableDict
  • ObservableSet

还可以参考https://github.com/ReactiveX/RxPY/issues/553


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接