截至2022年10月,以下是可在PyPI上获得的与事件相关的软件包,按最近发布日期排序。
2.0.6
:2022年8月1.5
:2022年6月0.4.0
:2022年6月0.2.0
:2022年4月1.0.0
:2021年8月0.4
:2020年10月4.5.0
:2020年9月1.0.1
:2020年6月2.0
:2019年9月4.0.3
:2019年1月0.2.3a0
:2018年0.0.5
:2018年2.1.2
:2017年0.0.7
:2016年1.0
:2012年0.3.1
:2008年有很多库可供选择,使用非常不同的术语(事件、信号、处理程序、方法调度、钩子等)。
我试图对上述包以及此处答案中提到的技术进行概述。
首先,一些术语...
最基本的事件系统风格是“处理程序方法集”,这是Observer pattern的简单实现。
基本上,处理程序方法(可调用对象)存储在数组中,并在事件“触发”时分别调用。
观察者事件系统的缺点是,您只能在实际事件对象(或处理程序列表)上注册处理程序。 因此,在注册时,事件已经需要存在。
因此,第二种事件系统的样式出现了:发布-订阅模式。在这里,处理程序不会注册在事件对象(或处理程序列表)上,而是在中央调度程序上注册。通知器也只与调度程序交互。要监听什么或发布什么由“信号”确定,它只是一个名称(字符串)。在Python中,保持对方法或对象的引用可以确保它不会被垃圾回收器删除。这可能是可取的,但也可能导致内存泄漏:链接的处理程序永远不会被清理。
一些事件系统使用弱引用而不是常规引用来解决这个问题。
观察者式事件系统:
list
可以非常简单地实现这样的事件系统。set
而不是list
来存储包,并实现了__call__
,这两个都是合理的补充。pydispatch.Dispatcher
。发布-订阅库:
其他:
我一直都是这样做的:
class Event(list):
"""Event subscription.
A list of callable objects. Calling an instance of this will cause a
call to each item in the list in ascending order by index.
Example Usage:
>>> def f(x):
... print 'f(%s)' % x
>>> def g(x):
... print 'g(%s)' % x
>>> e = Event()
>>> e()
>>> e.append(f)
>>> e(123)
f(123)
>>> e.remove(f)
>>> e()
>>> e += (f, g)
>>> e(10)
f(10)
g(10)
>>> del e[0]
>>> e(2)
g(2)
"""
def __call__(self, *args, **kwargs):
for f in self:
f(*args, **kwargs)
def __repr__(self):
return "Event(%s)" % list.__repr__(self)
然而,就像我看到的其他东西一样,这里也没有自动生成的pydoc和签名,这真的很糟糕。
_bag_of_handlers
中,它是一个列表。该类的add方法将简单地是self._bag_of_handlers.append(some_callable)
。该类的fire方法将循环遍历_bag_of_handlers
,将提供的args和kwargs传递给处理程序并按顺序执行每个处理程序。 - Gabe Spradlin根据Michael Foord在他的事件模式中建议,我们使用EventHook:
只需向您的类添加EventHooks:
class MyBroadcaster()
def __init__():
self.onChange = EventHook()
theBroadcaster = MyBroadcaster()
# add a listener to the event
theBroadcaster.onChange += myFunction
# remove listener from the event
theBroadcaster.onChange -= myFunction
# fire event
theBroadcaster.onChange.fire()
class EventHook(object):
def __init__(self):
self.__handlers = []
def __iadd__(self, handler):
self.__handlers.append(handler)
return self
def __isub__(self, handler):
self.__handlers.remove(handler)
return self
def fire(self, *args, **keywargs):
for handler in self.__handlers:
handler(*args, **keywargs)
def clearObjectHandlers(self, inObject):
for theHandler in self.__handlers:
if theHandler.im_self == inObject:
self -= theHandler
self.__handlers = [h for h in self.__handlers if h.im_self != obj]
。 - Simon Bergotself.__handlers = [h for h in self._handlers if getattr(h, 'im_self', False) != obj]
- Eric Marcos我使用zope.event,它是您可以想象到的最基本的东西。:-)实际上,这是完整的源代码:
subscribers = []
def notify(event):
for subscriber in subscribers:
subscriber(event)
请注意,您不能在进程之间发送消息。这不是一个消息系统,只是一个事件系统,仅此而已。
我在Valued Lessons上找到了这个小脚本。它似乎具有我所追求的简单和强大的比例。Peter Thatcher是以下代码的作者(没有提及许可证)。
class Event:
def __init__(self):
self.handlers = set()
def handle(self, handler):
self.handlers.add(handler)
return self
def unhandle(self, handler):
try:
self.handlers.remove(handler)
except:
raise ValueError("Handler is not handling this event, so cannot unhandle it.")
return self
def fire(self, *args, **kargs):
for handler in self.handlers:
handler(*args, **kargs)
def getHandlerCount(self):
return len(self.handlers)
__iadd__ = handle
__isub__ = unhandle
__call__ = fire
__len__ = getHandlerCount
class MockFileWatcher:
def __init__(self):
self.fileChanged = Event()
def watchFiles(self):
source_path = "foo"
self.fileChanged(source_path)
def log_file_change(source_path):
print "%r changed." % (source_path,)
def log_file_change2(source_path):
print "%r changed!" % (source_path,)
watcher = MockFileWatcher()
watcher.fileChanged += log_file_change2
watcher.fileChanged += log_file_change
watcher.fileChanged -= log_file_change2
watcher.watchFiles()
这是一个简洁的设计,应该能够很好地工作。您需要做的就是在一个类中继承Observer
,然后使用observe(event_name, callback_fn)
来监听特定事件。每当代码中发生特定事件(例如Event('USB connected')
)时,相应的回调函数将触发。
class Observer():
_observers = []
def __init__(self):
self._observers.append(self)
self._observed_events = []
def observe(self, event_name, callback_fn):
self._observed_events.append({'event_name' : event_name, 'callback_fn' : callback_fn})
class Event():
def __init__(self, event_name, *callback_args):
for observer in Observer._observers:
for observable in observer._observed_events:
if observable['event_name'] == event_name:
observable['callback_fn'](*callback_args)
例子:
class Room(Observer):
def __init__(self):
print("Room is ready.")
Observer.__init__(self) # DON'T FORGET THIS
def someone_arrived(self, who):
print(who + " has arrived!")
# Observe for specific event
room = Room()
room.observe('someone arrived', room.someone_arrived)
# Fire some events
Event('someone left', 'John')
Event('someone arrived', 'Lenard') # will output "Lenard has arrived!"
Event('someone Farted', 'Lenard')
from pymitter import EventEmitter
ee = EventEmitter()
# decorator usage
@ee.on("myevent")
def handler1(arg):
print "handler1 called with", arg
# callback usage
def handler2(arg):
print "handler2 called with", arg
ee.on("myotherevent", handler2)
# emit
ee.emit("myevent", "foo")
# -> "handler1 called with foo"
ee.emit("myotherevent", "bar")
# -> "handler2 called with bar"
我创建了一个EventManager
类(代码在末尾)。语法如下:
#Create an event with no listeners assigned to it
EventManager.addEvent( eventName = [] )
#Create an event with listeners assigned to it
EventManager.addEvent( eventName = [fun1, fun2,...] )
#Create any number event with listeners assigned to them
EventManager.addEvent( eventName1 = [e1fun1, e1fun2,...], eventName2 = [e2fun1, e2fun2,...], ... )
#Add or remove listener to an existing event
EventManager.eventName += extra_fun
EventManager.eventName -= removed_fun
#Delete an event
del EventManager.eventName
#Fire the event
EventManager.eventName()
这是一个例子:
def hello(name):
print "Hello {}".format(name)
def greetings(name):
print "Greetings {}".format(name)
EventManager.addEvent( salute = [greetings] )
EventManager.salute += hello
print "\nInitial salute"
EventManager.salute('Oscar')
print "\nNow remove greetings"
EventManager.salute -= greetings
EventManager.salute('Oscar')
输出:
最初的问候
你好奥斯卡
哈喽奥斯卡现在移除问候语
哈喽奥斯卡
EventManger 代码:
class EventManager:
class Event:
def __init__(self,functions):
if type(functions) is not list:
raise ValueError("functions parameter has to be a list")
self.functions = functions
def __iadd__(self,func):
self.functions.append(func)
return self
def __isub__(self,func):
self.functions.remove(func)
return self
def __call__(self,*args,**kvargs):
for func in self.functions : func(*args,**kvargs)
@classmethod
def addEvent(cls,**kvargs):
"""
addEvent( event1 = [f1,f2,...], event2 = [g1,g2,...], ... )
creates events using **kvargs to create any number of events. Each event recieves a list of functions,
where every function in the list recieves the same parameters.
Example:
def hello(): print "Hello ",
def world(): print "World"
EventManager.addEvent( salute = [hello] )
EventManager.salute += world
EventManager.salute()
Output:
Hello World
"""
for key in kvargs.keys():
if type(kvargs[key]) is not list:
raise ValueError("value has to be a list")
else:
kvargs[key] = cls.Event(kvargs[key])
cls.__dict__.update(kvargs)
class EventHook(object):
'''
A simple implementation of the Observer-Pattern.
The user can specify an event signature upon inizializazion,
defined by kwargs in the form of argumentname=class (e.g. id=int).
The arguments' types are not checked in this implementation though.
Callables with a fitting signature can be added with += or removed with -=.
All listeners can be notified by calling the EventHook class with fitting
arguments.
>>> event = EventHook(id=int, data=dict)
>>> event += lambda id, data: print("%d %s" % (id, data))
>>> event(id=5, data={"foo": "bar"})
5 {'foo': 'bar'}
>>> event = EventHook(id=int)
>>> event += lambda wrong_name: None
Traceback (most recent call last):
...
ValueError: Listener must have these arguments: (id=int)
>>> event = EventHook(id=int)
>>> event += lambda id: None
>>> event(wrong_name=0)
Traceback (most recent call last):
...
ValueError: This EventHook must be called with these arguments: (id=int)
'''
def __init__(self, **signature):
self._signature = signature
self._argnames = set(signature.keys())
self._handlers = []
def _kwargs_str(self):
return ", ".join(k+"="+v.__name__ for k, v in self._signature.items())
def __iadd__(self, handler):
params = inspect.signature(handler).parameters
valid = True
argnames = set(n for n in params.keys())
if argnames != self._argnames:
valid = False
for p in params.values():
if p.kind == p.VAR_KEYWORD:
valid = True
break
if p.kind not in (p.POSITIONAL_OR_KEYWORD, p.KEYWORD_ONLY):
valid = False
break
if not valid:
raise ValueError("Listener must have these arguments: (%s)"
% self._kwargs_str())
self._handlers.append(handler)
return self
def __isub__(self, handler):
self._handlers.remove(handler)
return self
def __call__(self, *args, **kwargs):
if args or set(kwargs.keys()) != self._argnames:
raise ValueError("This EventHook must be called with these " +
"keyword arguments: (%s)" % self._kwargs_str())
for handler in self._handlers[:]:
handler(**kwargs)
def __repr__(self):
return "EventHook(%s)" % self._kwargs_str()
django.dispatch
,它运行良好。谢谢。 - Mahmood Dehghan