from rx import Observable, Observer
def push_five_strings(observer):
observer.on_next("Alpha")
observer.on_next("Beta")
observer.on_next("Gamma")
observer.on_next("Delta")
observer.on_next("Epsilon")
observer.on_completed()
class PrintObserver(Observer):
def on_next(self, value):
print("Received {0}".format(value))
def on_completed(self):
print("Done!")
def on_error(self, error):
print("Error Occurred: {0}".format(error))
source = Observable.create(push_five_strings)
source.subscribe(PrintObserver())
输出:
Received Alpha
Received Beta
Received Gamma
Received Delta
Received Epsilon
Done!
import pymq
# common code
class MyEvent:
pass
# subscribe code
@pymq.subscriber
def on_event(event: MyEvent):
print('event received')
# publisher code
pymq.publish(MyEvent())
# you can also customize channels
pymq.subscribe(on_event, channel='my_channel')
pymq.publish(MyEvent(), channel='my_channel')
from pymq.provider.redis import RedisConfig
# starts a new thread with a Redis event loop
pymq.init(RedisConfig())
# main application control loop
pymq.shutdown()
另一个方便的包是events。它封装了事件订阅和事件触发的核心部分,并且感觉就像语言的“自然”部分一样。它似乎类似于C#语言,提供了一种方便的方式来声明、订阅和触发事件。从技术上讲,事件是一个“插槽”,可以将回调函数(事件处理程序)附加到其中-这个过程被称为订阅事件。
# Define a callback function
def something_changed(reason):
print "something changed because %s" % reason
# Use events module to create an event and register one or more callback functions
from events import Events
events = Events()
events.on_change += something_changed
当事件被触发时,所有附加的事件处理程序按顺序调用。要触发事件,请对插槽进行调用:
events.on_change('it had to happen')
'something changed because it had to happen'
buslane
模块。from dataclasses import dataclass
from buslane.commands import Command, CommandHandler, CommandBus
@dataclass(frozen=True)
class RegisterUserCommand(Command):
email: str
password: str
class RegisterUserCommandHandler(CommandHandler[RegisterUserCommand]):
def handle(self, command: RegisterUserCommand) -> None:
assert command == RegisterUserCommand(
email='john@lennon.com',
password='secret',
)
command_bus = CommandBus()
command_bus.register(handler=RegisterUserCommandHandler())
command_bus.execute(command=RegisterUserCommand(
email='john@lennon.com',
password='secret',
))
$ pip install buslane
之前我写了一个库,可能对你有用。它允许你拥有本地和全局监听器,多种不同的注册方式,执行优先级等等。
from pyeventdispatcher import register
register("foo.bar", lambda event: print("second"))
register("foo.bar", lambda event: print("first "), -100)
dispatch(Event("foo.bar", {"id": 1}))
# first second
这个链接 pyeventdispatcher