哪些Python包提供独立的事件系统?

253

我知道pydispatcher,但是Python肯定还有其他与事件相关的包。

都有哪些库可用呢?

我不感兴趣使用大型框架中的事件管理器,我宁愿使用一个小巧且容易扩展的基础解决方案。

16个回答

295

PyPI软件包

截至2022年10月,以下是可在PyPI上获得的与事件相关的软件包,按最近发布日期排序。

还有更多

有很多库可供选择,使用非常不同的术语(事件、信号、处理程序、方法调度、钩子等)。

我试图对上述包以及此处答案中提到的技术进行概述。

首先,一些术语...

观察者模式

最基本的事件系统风格是“处理程序方法集”,这是Observer pattern的简单实现。

基本上,处理程序方法(可调用对象)存储在数组中,并在事件“触发”时分别调用。

发布-订阅

观察者事件系统的缺点是,您只能在实际事件对象(或处理程序列表)上注册处理程序。 因此,在注册时,事件已经需要存在。

因此,第二种事件系统的样式出现了:发布-订阅模式。在这里,处理程序不会注册在事件对象(或处理程序列表)上,而是在中央调度程序上注册。通知器也只与调度程序交互。要监听什么或发布什么由“信号”确定,它只是一个名称(字符串)。

中介者模式

也可能感兴趣:中介者模式

钩子

“钩子”系统通常用于应用程序插件的上下文中。应用程序包含固定的集成点(钩子),每个插件都可以连接到该钩子并执行某些操作。

其他“事件”

注意:线程.Event不是上述意义上的“事件系统”。它是一种线程同步系统,其中一个线程等待另一个线程“发出”Event对象的信号。
网络消息传递库通常也使用“事件”这个术语;有时候这些概念是相似的,有时候则不是。它们可以跨越线程、进程和计算机边界。例如:pyzmq, pymq, Twisted, Tornado, gevent, eventlet

弱引用

在Python中,保持对方法或对象的引用可以确保它不会被垃圾回收器删除。这可能是可取的,但也可能导致内存泄漏:链接的处理程序永远不会被清理。

一些事件系统使用弱引用而不是常规引用来解决这个问题。

关于各种库的一些话

观察者式事件系统:

  • zope.event展示了这个机制的基本原理(参见Lennart's answer)。注意:这个例子甚至不支持处理程序参数。
  • LongPoke的“callable list”实现表明,通过子类化list可以非常简单地实现这样的事件系统。
  • Felk的变体EventHook还确保了调用方和被调用方的签名。
  • spassig的EventHook(Michael Foord的事件模式)是一个直接的实现。
  • Josip的Valued Lessons Event class基本上是一样的,但使用set而不是list来存储包,并实现了__call__,这两个都是合理的补充。
  • PyNotify在概念上类似,并提供了额外的变量和条件概念(“变量更改事件”)。主页无法正常工作。
  • axel基本上是一个处理程序集合,具有与线程、错误处理等相关的更多功能。
  • python-dispatch要求事件源类派生自pydispatch.Dispatcher
  • buslane是基于类的,支持单个或多个处理程序,并促进广泛的类型提示。
  • Pithikos的Observer/Event是一个轻量级的设计。

发布-订阅库:

  • blinker具有自动断开和基于发送者的过滤等一些不错的功能。
  • PyPubSub是一个稳定的包,承诺提供“促进主题和消息调试和维护的高级功能”。
  • pymitter是Node.js EventEmitter2的Python移植版本,提供命名空间、通配符和TTL。
  • PyDispatcher似乎强调在许多对多发布方面的灵活性。支持弱引用。
  • louie是重新设计的PyDispatcher,应该可以在“各种上下文中”工作。
  • pypydispatcher基于(你猜对了...)PyDispatcher,并且还可以在PyPy中工作。
  • django.dispatch是一个重新编写的PyDispatcher,“具有更有限的界面,但更高的性能”。
  • pyeventdispatcher基于PHP Symfony框架的事件分发器。
  • dispatcher是从django.dispatch中提取出来的,但已经相当老了。
  • Cristian Garcia的EventManger实现非常简短。

其他:

  • pluggy 包含一个钩子系统,被 pytest 插件使用。
  • RxPy3 实现了可观察模式并允许合并事件、重试等。
  • Qt 的信号和槽可以从 PyQtPySide2 中使用。它们在同一线程中使用时作为回调函数,在两个不同的线程之间使用时作为事件(使用事件循环)。信号和槽的限制是它们只能在派生自 QObject 的类的对象中使用。

由于我正在使用Django,我尝试了django.dispatch,它运行良好。谢谢。 - Mahmood Dehghan

118

我一直都是这样做的:

class Event(list):
    """Event subscription.

    A list of callable objects. Calling an instance of this will cause a
    call to each item in the list in ascending order by index.

    Example Usage:
    >>> def f(x):
    ...     print 'f(%s)' % x
    >>> def g(x):
    ...     print 'g(%s)' % x
    >>> e = Event()
    >>> e()
    >>> e.append(f)
    >>> e(123)
    f(123)
    >>> e.remove(f)
    >>> e()
    >>> e += (f, g)
    >>> e(10)
    f(10)
    g(10)
    >>> del e[0]
    >>> e(2)
    g(2)

    """
    def __call__(self, *args, **kwargs):
        for f in self:
            f(*args, **kwargs)

    def __repr__(self):
        return "Event(%s)" % list.__repr__(self)

然而,就像我看到的其他东西一样,这里也没有自动生成的pydoc和签名,这真的很糟糕。


3
我觉得这个风格相当有趣。它非常简洁。我喜欢它可以让我们把事件和订阅者作为自主操作来操纵的事实。我将看看它在一个真正的项目中表现如何。 - Rudy Lattae
2
非常漂亮的极简主义风格!太棒了! - akaRem
2
我无法为此点赞足够多,这真的很简单明了。 - user890167
3
能否有人向我解释一下,以10岁的孩子听得懂的方式,这个类被主类继承了吗?我没有看到__init__方法,所以不会使用super()。出于某种原因,我还是无法理解。 - omgimdrunk
1
@omgimdrunk 简单的事件处理程序会在事件触发时触发一个或多个可调用的函数。为您“管理”这个类需要以下最基本的方法 - add和fire。在该类中,您需要维护要执行的处理程序列表。让我们把它放在实例变量_bag_of_handlers中,它是一个列表。该类的add方法将简单地是self._bag_of_handlers.append(some_callable)。该类的fire方法将循环遍历_bag_of_handlers,将提供的args和kwargs传递给处理程序并按顺序执行每个处理程序。 - Gabe Spradlin
显示剩余2条评论

77

根据Michael Foord在他的事件模式中建议,我们使用EventHook:

只需向您的类添加EventHooks:

class MyBroadcaster()
    def __init__():
        self.onChange = EventHook()

theBroadcaster = MyBroadcaster()

# add a listener to the event
theBroadcaster.onChange += myFunction

# remove listener from the event
theBroadcaster.onChange -= myFunction

# fire event
theBroadcaster.onChange.fire()

我们在Michael的类中添加了一个从对象中删除所有监听器的功能,最终得到了以下代码:
class EventHook(object):

    def __init__(self):
        self.__handlers = []

    def __iadd__(self, handler):
        self.__handlers.append(handler)
        return self

    def __isub__(self, handler):
        self.__handlers.remove(handler)
        return self

    def fire(self, *args, **keywargs):
        for handler in self.__handlers:
            handler(*args, **keywargs)

    def clearObjectHandlers(self, inObject):
        for theHandler in self.__handlers:
            if theHandler.im_self == inObject:
                self -= theHandler

使用此方法的缺点是您需要首先添加事件,然后才能注册为订阅者。如果只有发布者添加其事件(不是必须的,只是一个好习惯),那么您必须在订阅者之前初始化发布者,这在大型项目中很麻烦。 - Jonathan Livni
7
最后一种方法存在缺陷,因为在迭代过程中修改了self.__handlers。修复方法:self.__handlers = [h for h in self.__handlers if h.im_self != obj] - Simon Bergot
1
@Simon是正确的,但是他引入了一个bug,因为self.__handlers中可能会有未绑定的函数。修正方法:self.__handlers = [h for h in self._handlers if getattr(h, 'im_self', False) != obj] - Eric Marcos

23

我使用zope.event,它是您可以想象到的最基本的东西。:-)实际上,这是完整的源代码:

subscribers = []

def notify(event):
    for subscriber in subscribers:
        subscriber(event)

请注意,您不能在进程之间发送消息。这不是一个消息系统,只是一个事件系统,仅此而已。


我仍然想要发送消息。我将使用构建在Tkinter上的应用程序中的事件系统。我没有使用它的事件系统,因为它不支持消息。 - Josip
你可以使用zope.event发送任何你想要的东西。但是我的观点是,它并不是一个合适的消息系统,因为你无法将事件/消息发送到其他进程或其他计算机。你可能应该对你的需求更加具体一些。 - Lennart Regebro

18

我在Valued Lessons上找到了这个小脚本。它似乎具有我所追求的简单和强大的比例。Peter Thatcher是以下代码的作者(没有提及许可证)。

class Event:
    def __init__(self):
        self.handlers = set()

    def handle(self, handler):
        self.handlers.add(handler)
        return self

    def unhandle(self, handler):
        try:
            self.handlers.remove(handler)
        except:
            raise ValueError("Handler is not handling this event, so cannot unhandle it.")
        return self

    def fire(self, *args, **kargs):
        for handler in self.handlers:
            handler(*args, **kargs)

    def getHandlerCount(self):
        return len(self.handlers)

    __iadd__ = handle
    __isub__ = unhandle
    __call__ = fire
    __len__  = getHandlerCount

class MockFileWatcher:
    def __init__(self):
        self.fileChanged = Event()

    def watchFiles(self):
        source_path = "foo"
        self.fileChanged(source_path)

def log_file_change(source_path):
    print "%r changed." % (source_path,)

def log_file_change2(source_path):
    print "%r changed!" % (source_path,)

watcher              = MockFileWatcher()
watcher.fileChanged += log_file_change2
watcher.fileChanged += log_file_change
watcher.fileChanged -= log_file_change2
watcher.watchFiles()

1
使用 set() 而不是 list() 很好,可以避免多次注册处理程序。一个结果是处理程序不会按照注册的顺序被调用。虽然这不一定是坏事... - florisla
1
如果需要的话,@florisla可以将其替换为OrderedSet。 - Robino

12

这是一个简洁的设计,应该能够很好地工作。您需要做的就是在一个类中继承Observer,然后使用observe(event_name, callback_fn)来监听特定事件。每当代码中发生特定事件(例如Event('USB connected'))时,相应的回调函数将触发。

class Observer():
    _observers = []
    def __init__(self):
        self._observers.append(self)
        self._observed_events = []
    def observe(self, event_name, callback_fn):
        self._observed_events.append({'event_name' : event_name, 'callback_fn' : callback_fn})


class Event():
    def __init__(self, event_name, *callback_args):
        for observer in Observer._observers:
            for observable in observer._observed_events:
                if observable['event_name'] == event_name:
                    observable['callback_fn'](*callback_args)

例子:

class Room(Observer):
    def __init__(self):
        print("Room is ready.")
        Observer.__init__(self) # DON'T FORGET THIS
    def someone_arrived(self, who):
        print(who + " has arrived!")

# Observe for specific event
room = Room()
room.observe('someone arrived',  room.someone_arrived)

# Fire some events
Event('someone left',    'John')
Event('someone arrived', 'Lenard') # will output "Lenard has arrived!"
Event('someone Farted',  'Lenard')

我喜欢你的设计,它简约易懂。而且不需要导入一些模块,因此它会更轻量级。 - Atreyagaurav

10
您可以查看 pymitter (pypi)。它是一个小型的单文件 (~250 行代码) 库,提供命名空间、通配符和 TTL。
以下是一个基本示例:
from pymitter import EventEmitter

ee = EventEmitter()

# decorator usage
@ee.on("myevent")
def handler1(arg):
   print "handler1 called with", arg

# callback usage
def handler2(arg):
    print "handler2 called with", arg
ee.on("myotherevent", handler2)

# emit
ee.emit("myevent", "foo")
# -> "handler1 called with foo"

ee.emit("myotherevent", "bar")
# -> "handler2 called with bar"

10

我创建了一个EventManager类(代码在末尾)。语法如下:

#Create an event with no listeners assigned to it
EventManager.addEvent( eventName = [] )

#Create an event with listeners assigned to it
EventManager.addEvent( eventName = [fun1, fun2,...] )

#Create any number event with listeners assigned to them
EventManager.addEvent( eventName1 = [e1fun1, e1fun2,...], eventName2 = [e2fun1, e2fun2,...], ... )

#Add or remove listener to an existing event
EventManager.eventName += extra_fun
EventManager.eventName -= removed_fun

#Delete an event
del EventManager.eventName

#Fire the event
EventManager.eventName()

这是一个例子:

def hello(name):
    print "Hello {}".format(name)
    
def greetings(name):
    print "Greetings {}".format(name)

EventManager.addEvent( salute = [greetings] )
EventManager.salute += hello

print "\nInitial salute"
EventManager.salute('Oscar')

print "\nNow remove greetings"
EventManager.salute -= greetings
EventManager.salute('Oscar')

输出:

最初的问候
你好奥斯卡
哈喽奥斯卡

现在移除问候语
哈喽奥斯卡

EventManger 代码:

class EventManager:
    
    class Event:
        def __init__(self,functions):
            if type(functions) is not list:
                raise ValueError("functions parameter has to be a list")
            self.functions = functions
            
        def __iadd__(self,func):
            self.functions.append(func)
            return self
            
        def __isub__(self,func):
            self.functions.remove(func)
            return self
            
        def __call__(self,*args,**kvargs):
            for func in self.functions : func(*args,**kvargs)
            
    @classmethod
    def addEvent(cls,**kvargs):
        """
        addEvent( event1 = [f1,f2,...], event2 = [g1,g2,...], ... )
        creates events using **kvargs to create any number of events. Each event recieves a list of functions,
        where every function in the list recieves the same parameters.
        
        Example:
        
        def hello(): print "Hello ",
        def world(): print "World"
        
        EventManager.addEvent( salute = [hello] )
        EventManager.salute += world
        
        EventManager.salute()
        
        Output:
        Hello World
        """
        for key in kvargs.keys():
            if type(kvargs[key]) is not list:
                raise ValueError("value has to be a list")
            else:
                kvargs[key] = cls.Event(kvargs[key])
        
        cls.__dict__.update(kvargs)

9
我采用了Longpoke极简主义方法的变体,同时确保调用方和被调用方的签名:
class EventHook(object):
    '''
    A simple implementation of the Observer-Pattern.
    The user can specify an event signature upon inizializazion,
    defined by kwargs in the form of argumentname=class (e.g. id=int).
    The arguments' types are not checked in this implementation though.
    Callables with a fitting signature can be added with += or removed with -=.
    All listeners can be notified by calling the EventHook class with fitting
    arguments.

    >>> event = EventHook(id=int, data=dict)
    >>> event += lambda id, data: print("%d %s" % (id, data))
    >>> event(id=5, data={"foo": "bar"})
    5 {'foo': 'bar'}

    >>> event = EventHook(id=int)
    >>> event += lambda wrong_name: None
    Traceback (most recent call last):
        ...
    ValueError: Listener must have these arguments: (id=int)

    >>> event = EventHook(id=int)
    >>> event += lambda id: None
    >>> event(wrong_name=0)
    Traceback (most recent call last):
        ...
    ValueError: This EventHook must be called with these arguments: (id=int)
    '''
    def __init__(self, **signature):
        self._signature = signature
        self._argnames = set(signature.keys())
        self._handlers = []

    def _kwargs_str(self):
        return ", ".join(k+"="+v.__name__ for k, v in self._signature.items())

    def __iadd__(self, handler):
        params = inspect.signature(handler).parameters
        valid = True
        argnames = set(n for n in params.keys())
        if argnames != self._argnames:
            valid = False
        for p in params.values():
            if p.kind == p.VAR_KEYWORD:
                valid = True
                break
            if p.kind not in (p.POSITIONAL_OR_KEYWORD, p.KEYWORD_ONLY):
                valid = False
                break
        if not valid:
            raise ValueError("Listener must have these arguments: (%s)"
                             % self._kwargs_str())
        self._handlers.append(handler)
        return self

    def __isub__(self, handler):
        self._handlers.remove(handler)
        return self

    def __call__(self, *args, **kwargs):
        if args or set(kwargs.keys()) != self._argnames:
            raise ValueError("This EventHook must be called with these " +
                             "keyword arguments: (%s)" % self._kwargs_str())
        for handler in self._handlers[:]:
            handler(**kwargs)

    def __repr__(self):
        return "EventHook(%s)" % self._kwargs_str()

我确实喜欢这个解决方案,但是要注意如果没有v.__name__属性(在使用typing.Dict时发生过),_kwargs_str(...)可能会崩溃。一个简单的重写函数可能像这样:''' def _kwargs_str(self): return ", ".join(k + "=" + (v.name if hasattr(v, "name") else v.name if hasattr(v, "name") else str(v)) for k, v in self._signature.items()) ''' - Engin

3
如果我使用pyQt编写代码,我会使用QT sockets/signals范例,django也是如此。
如果我在做异步I/O,我会使用本地的select模块。
如果我使用SAX python解析器,我会使用SAX提供的事件API。所以看起来我是底层API的受害者 :-)
也许你应该问问自己从事件框架/模块中期望什么。我个人更喜欢使用QT的Socket/Signal范例。有关更多信息,请参见此处

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接