Python中具有依赖关系的懒加载数据流(类似电子表格)属性

13
我的问题如下:我有一些Python类,其中的属性是从其他属性派生而来的;一旦计算出这些属性,它们就应该被缓存,并且每当基本属性发生更改时,缓存的结果应该被无效化。
我可以手动完成这项工作,但如果属性数量增加,维护起来似乎非常困难。因此,我希望在我的对象内部拥有类似于Makefile规则的东西,以自动跟踪需要重新计算的内容。
所需的语法和行为应该类似于以下内容:
# this does dirty magic, like generating the reverse dependency graph,
# and preparing the setters that invalidate the cached values
@dataflow_class
class Test(object):

    def calc_a(self):
        return self.b + self.c

    def calc_c(self):
        return self.d * 2

    a = managed_property(calculate=calc_a, depends_on=('b', 'c'))
    b = managed_property(default=0)
    c = managed_property(calculate=calc_c, depends_on=('d',))
    d = managed_property(default=0)


t = Test()

print t.a
# a has not been initialized, so it calls calc_a
# gets b value
# c has not been initialized, so it calls calc_c
# c value is calculated and stored in t.__c
# a value is calculated and stored in t.__a

t.b = 1
# invalidates the calculated value stored in self.__a

print t.a
# a has been invalidated, so it calls calc_a
# gets b value
# gets c value, from t.__c
# a value is calculated and stored in t.__a

print t.a
# gets value from t.__a

t.d = 2
# invalidates the calculated values stored in t.__a and t.__c

那么,是否已经有类似的东西可用,还是我需要开始实现自己的?如果是后者,欢迎提出建议 :-)

1
要自己编写,类似于Enthought Traits这样的工具可能会有用,可以处理变更通知的低级位和将对象属性作为一等实体来处理。 - millimoose
使用其中一个函数缓存装饰器的配方,该装饰器基于调用参数进行缓存?将property_getter函数设置为调用缓存装饰计算函数? - MattH
对抗Python的严格求值模型很困难。看起来你正在尝试用Python编写Haskell程序。你想用这个解决什么问题? - Simon Bergot
@Simon 基本上我有两个接口,一个用于更改对象状态(作为异步回调),另一个使用派生值。由于计算这些值可能很昂贵,因此需要将它们缓存,并且它们可以在更新时查询多次或根本不查询(因此如果不使用派生值,则计算派生值将是浪费时间)。我认为它更类似于惰性电子表格而不是Haskell,因为它具有可变性。 - fortran
@fortran:将缓存配方调整(或设置)为最大尺寸为1。 - MattH
显示剩余2条评论
3个回答

8

这里,这应该可以解决问题。 描述符机制(通过该机制语言实现“属性”)足以满足您的需求。

如果下面的代码在某些边缘情况下无法正常工作,请写信给我。

class DependentProperty(object):
    def __init__(self, calculate=None, default=None, depends_on=()):
        # "name" and "dependence_tree" properties are attributes
        # set up by the metaclass of the owner class
        if calculate:
            self.calculate = calculate
        else:
            self.default = default
        self.depends_on = set(depends_on)

    def __get__(self, instance, owner):
        if hasattr(self, "default"):
            return self.default
        if not hasattr(instance, "_" + self.name):
            setattr(instance, "_" + self.name,
                self.calculate(instance, getattr(instance, "_" + self.name + "_last_value")))
        return getattr(instance, "_" + self.name)

    def __set__(self, instance, value):
        setattr(instance, "_" + self.name + "_last_value", value)
        setattr(instance, "_" + self.name, self.calculate(instance, value))
        for attr in self.dependence_tree[self.name]:
            delattr(instance, attr)

    def __delete__(self, instance):
        try:
            delattr(instance, "_" + self.name)
        except AttributeError:
            pass


def assemble_tree(name,  dict_, all_deps = None):
    if all_deps is None:
        all_deps = set()
    for dependance in dict_[name].depends_on:
        all_deps.add(dependance)
        assemble_tree(dependance, dict_, all_deps)
    return all_deps

def invert_tree(tree):
    new_tree = {}
    for key, val in tree.items():
        for dependence in val:
            if dependence not in new_tree:
                new_tree[dependence] = set()
            new_tree[dependence].add(key)
    return new_tree

class DependenceMeta(type):
    def __new__(cls, name, bases, dict_):
        dependence_tree = {}
        properties = []
        for key, val in dict_.items():
            if not isinstance(val, DependentProperty):
                continue
            val.name = key
            val.dependence_tree = dependence_tree
            dependence_tree[key] = set()
            properties.append(val)
        inverted_tree = {}
        for property in properties:
            inverted_tree[property.name] = assemble_tree(property.name, dict_)
        dependence_tree.update(invert_tree(inverted_tree))
        return type.__new__(cls, name, bases, dict_)


if __name__ == "__main__":
    # Example and visual test:

    class Bla:
        __metaclass__ = DependenceMeta

        def calc_b(self, x):
            print "Calculating b"
            return x + self.a

        def calc_c(self, x):
            print "Calculating c"
            return x + self.b

        a = DependentProperty(default=10)    
        b = DependentProperty(depends_on=("a",), calculate=calc_b)
        c = DependentProperty(depends_on=("b",), calculate=calc_c)




    bla = Bla()
    bla.b = 5
    bla.c = 10

    print bla.a, bla.b, bla.c
    bla.b = 10
    print bla.b
    print bla.c

看起来不错,基本上就是我正在编写的内容...谢谢 :) - fortran

1
我想要类似于Makefile规则的东西,那就用一个吧!你可以考虑这个模型:
- 一个规则 = 一个Python文件 - 一个结果 = 一个*.data文件 - 管道是通过makefile或其他依赖分析工具(cmake、scons)实现的
我们公司的硬件测试团队使用这样的框架进行深入的探索性测试:
- 你可以轻松地集成其他语言和工具 - 你会得到一个稳定和经过验证的解决方案 - 计算可以在多个CPU/计算机上分布 - 你可以跟踪值和规则的依赖关系 - 中间值的调试很容易
这种方法的(大)缺点是你必须放弃Python的import关键字,因为它会创建一个隐式(且未跟踪的)依赖关系(有解决方法)。

我猜想为每个查询/更新生成进程和创建文件对于我的需求来说太慢了。这应该是一个软实时服务。 - fortran
1
此外,Python的内省能力已经足够满足所需。 - jsbueno

1
import collections

sentinel=object()

class ManagedProperty(object):
    '''
    If deptree = {'a':set('b','c')}, then ManagedProperties `b` and
    `c` will be reset whenever `a` is modified.
    '''
    def __init__(self,property_name,calculate=None,depends_on=tuple(),
                 default=sentinel):
        self.property_name=property_name
        self.private_name='_'+property_name 
        self.calculate=calculate
        self.depends_on=depends_on
        self.default=default
    def __get__(self,obj,objtype):
        if obj is None:
            # Allows getattr(cls,mprop) to return the ManagedProperty instance
            return self
        try:
            return getattr(obj,self.private_name)
        except AttributeError:
            result=(getattr(obj,self.calculate)()
                    if self.default is sentinel else self.default)
            setattr(obj,self.private_name,result)
            return result
    def __set__(self,obj,value):
        # obj._dependencies is defined by @register
        map(obj.__delattr__,getattr(obj,'_dependencies').get(self.property_name,tuple()))
        setattr(obj,self.private_name,value)        
    def __delete__(self,obj):
        if hasattr(obj,self.private_name):
            delattr(obj,self.private_name)

def register(*mproperties):
    def flatten_dependencies(name, deptree, all_deps=None):
        '''
        A deptree such as {'c': set(['a']), 'd': set(['c'])} means
        'a' depends on 'c' and 'c' depends on 'd'.

        Given such a deptree, flatten_dependencies('d', deptree) returns the set
        of all property_names that depend on 'd' (i.e. set(['a','c']) in the
        above case).
        '''
        if all_deps is None:
            all_deps = set()
        for dep in deptree.get(name,tuple()):
            all_deps.add(dep)
            flatten_dependencies(dep, deptree, all_deps)
        return all_deps

    def classdecorator(cls):
        deptree=collections.defaultdict(set)
        for mprop in mproperties:
            setattr(cls,mprop.property_name,mprop)
        # Find all ManagedProperties in dir(cls). Note that some of these may be
        # inherited from bases of cls; they may not be listed in mproperties.
        # Doing it this way allows ManagedProperties to be overridden by subclasses.
        for propname in dir(cls):
            mprop=getattr(cls,propname)
            if not isinstance(mprop,ManagedProperty):
                continue
            for underlying_prop in mprop.depends_on:
                deptree[underlying_prop].add(mprop.property_name)

        # Flatten the dependency tree so no recursion is necessary. If one were
        # to use recursion instead, then a naive algorithm would make duplicate
        # calls to __delete__. By flattening the tree, there are no duplicate
        # calls to __delete__.
        dependencies={key:flatten_dependencies(key,deptree)
                      for key in deptree.keys()}
        setattr(cls,'_dependencies',dependencies)
        return cls
    return classdecorator

这些是我用来验证其行为的单元测试。
if __name__ == "__main__":
    import unittest
    import sys
    def count(meth):
        def wrapper(self,*args):
            countname=meth.func_name+'_count'
            setattr(self,countname,getattr(self,countname,0)+1)
            return meth(self,*args)
        return wrapper

    class Test(unittest.TestCase):
        def setUp(self):
            @register(
                ManagedProperty('d',default=0),
                ManagedProperty('b',default=0),
                ManagedProperty('c',calculate='calc_c',depends_on=('d',)),
                ManagedProperty('a',calculate='calc_a',depends_on=('b','c')))
            class Foo(object):
                @count
                def calc_a(self):
                    return self.b + self.c
                @count
                def calc_c(self):
                    return self.d * 2
            @register(ManagedProperty('c',calculate='calc_c',depends_on=('b',)),
                      ManagedProperty('a',calculate='calc_a',depends_on=('b','c')))
            class Bar(Foo):
                @count
                def calc_c(self):
                    return self.b * 3
            self.Foo=Foo
            self.Bar=Bar
            self.foo=Foo()
            self.foo2=Foo()            
            self.bar=Bar()

        def test_two_instances(self):
            self.foo.b = 1
            self.assertEqual(self.foo.a,1)
            self.assertEqual(self.foo.b,1)
            self.assertEqual(self.foo.c,0)
            self.assertEqual(self.foo.d,0)

            self.assertEqual(self.foo2.a,0)
            self.assertEqual(self.foo2.b,0)
            self.assertEqual(self.foo2.c,0)
            self.assertEqual(self.foo2.d,0)


        def test_initialization(self):
            self.assertEqual(self.foo.a,0)
            self.assertEqual(self.foo.calc_a_count,1)
            self.assertEqual(self.foo.a,0)
            self.assertEqual(self.foo.calc_a_count,1)            
            self.assertEqual(self.foo.b,0)
            self.assertEqual(self.foo.c,0)
            self.assertEqual(self.foo.d,0)
            self.assertEqual(self.bar.a,0)
            self.assertEqual(self.bar.b,0)
            self.assertEqual(self.bar.c,0)
            self.assertEqual(self.bar.d,0)

        def test_dependence(self):
            self.assertEqual(self.Foo._dependencies,
                             {'c': set(['a']), 'b': set(['a']), 'd': set(['a', 'c'])})

            self.assertEqual(self.Bar._dependencies,
                             {'c': set(['a']), 'b': set(['a', 'c'])})

        def test_setting_property_updates_dependent(self):
            self.assertEqual(self.foo.a,0)
            self.assertEqual(self.foo.calc_a_count,1)

            self.foo.b = 1
            # invalidates the calculated value stored in foo.a
            self.assertEqual(self.foo.a,1)
            self.assertEqual(self.foo.calc_a_count,2)
            self.assertEqual(self.foo.b,1)
            self.assertEqual(self.foo.c,0)
            self.assertEqual(self.foo.d,0)

            self.foo.d = 2
            # invalidates the calculated values stored in foo.a and foo.c
            self.assertEqual(self.foo.a,5)
            self.assertEqual(self.foo.calc_a_count,3)
            self.assertEqual(self.foo.b,1)
            self.assertEqual(self.foo.c,4)
            self.assertEqual(self.foo.d,2)

            self.assertEqual(self.bar.a,0)
            self.assertEqual(self.bar.calc_a_count,1)
            self.assertEqual(self.bar.b,0)
            self.assertEqual(self.bar.c,0)
            self.assertEqual(self.bar.calc_c_count,1)
            self.assertEqual(self.bar.d,0)

            self.bar.b = 2
            self.assertEqual(self.bar.a,8)
            self.assertEqual(self.bar.calc_a_count,2)
            self.assertEqual(self.bar.b,2)
            self.assertEqual(self.bar.c,6)
            self.assertEqual(self.bar.calc_c_count,2)
            self.assertEqual(self.bar.d,0)

            self.bar.d = 2
            self.assertEqual(self.bar.a,8)
            self.assertEqual(self.bar.calc_a_count,2)            
            self.assertEqual(self.bar.b,2)
            self.assertEqual(self.bar.c,6)
            self.assertEqual(self.bar.calc_c_count,2)
            self.assertEqual(self.bar.d,2)

    sys.argv.insert(1,'--verbose')
    unittest.main(argv=sys.argv)

你的描述符没有正确实现 - __get__方法必须使用它的obj参数,以便知道我们正在谈论哪个实例的值。你的测试通过了,因为你只有每个受管理类的一个实例 - 如果你创建一个测试,创建两个"Foo"类的实例,并在这些实例之间交替断言,你将会发现这个缺陷。使用描述符的正确方法是将它们需要的任何状态存储在实例(__get__obj参数)本身上或者使用实例的哈希或ID作为键的字典中。 - jsbueno

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接