哪些Python包提供独立的事件系统?

253

我知道pydispatcher,但是Python肯定还有其他与事件相关的包。

都有哪些库可用呢?

我不感兴趣使用大型框架中的事件管理器,我宁愿使用一个小巧且容易扩展的基础解决方案。

16个回答

2
如果你想做更复杂的事情,比如合并事件或重试,你可以使用Observable模式和一个成熟的库来实现。Observables在Javascript和Java中非常常见,对于一些异步任务来说非常方便易用。你可以使用https://github.com/ReactiveX/RxPY这个库。
from rx import Observable, Observer


def push_five_strings(observer):
        observer.on_next("Alpha")
        observer.on_next("Beta")
        observer.on_next("Gamma")
        observer.on_next("Delta")
        observer.on_next("Epsilon")
        observer.on_completed()


class PrintObserver(Observer):

    def on_next(self, value):
        print("Received {0}".format(value))

    def on_completed(self):
        print("Done!")

    def on_error(self, error):
        print("Error Occurred: {0}".format(error))

source = Observable.create(push_five_strings)

source.subscribe(PrintObserver())

输出:

Received Alpha
Received Beta
Received Gamma
Received Delta
Received Epsilon
Done!

2
如果您需要跨进程或网络边界工作的事件总线,可以尝试使用 PyMQ。它目前支持发布/订阅、消息队列和同步RPC。默认版本在Redis后端上运行,因此您需要运行一个Redis服务器。还有一个内存后端用于测试。您也可以编写自己的后端。
import pymq

# common code
class MyEvent:
    pass

# subscribe code
@pymq.subscriber
def on_event(event: MyEvent):
    print('event received')

# publisher code
pymq.publish(MyEvent())

# you can also customize channels
pymq.subscribe(on_event, channel='my_channel')
pymq.publish(MyEvent(), channel='my_channel')

初始化系统:
from pymq.provider.redis import RedisConfig

# starts a new thread with a Redis event loop
pymq.init(RedisConfig())

# main application control loop

pymq.shutdown()

免责声明:本人是这个库的作者。

2
这里有另一个需要考虑的模块,它似乎是更为严谨的应用的可选方案。
Py-notify是一个Python包,提供了实现观察者编程模式的工具。这些工具包括信号、条件和变量。
信号是处理程序列表,当信号被发射时调用处理程序。条件基本上是布尔变量,与发出状态变化信号相关联。它们可以使用标准逻辑运算符(not,and等)组合成复合条件。变量与条件不同,可以容纳任何Python对象,而不仅仅是布尔值,但它们不能组合在一起。

1
主页可能已经不再得到支持,因此无法使用。 - David Parks

2

另一个方便的包是events。它封装了事件订阅和事件触发的核心部分,并且感觉就像语言的“自然”部分一样。它似乎类似于C#语言,提供了一种方便的方式来声明、订阅和触发事件。从技术上讲,事件是一个“插槽”,可以将回调函数(事件处理程序)附加到其中-这个过程被称为订阅事件。

# Define a callback function
def something_changed(reason):
    print "something changed because %s" % reason

# Use events module to create an event and register one or more callback functions
from events import Events
events = Events()
events.on_change += something_changed

当事件被触发时,所有附加的事件处理程序按顺序调用。要触发事件,请对插槽进行调用:

events.on_change('it had to happen')

这将输出:
'something changed because it had to happen'

更多的文档可以在github仓库文档中找到。


1
你可以尝试使用 buslane 模块。
该库使得基于消息的系统的实现更加容易。它支持命令(单个处理程序)和事件(0或多个处理程序)的方式。Buslane使用Python类型注释来正确地注册处理程序。
简单示例:
from dataclasses import dataclass

from buslane.commands import Command, CommandHandler, CommandBus


@dataclass(frozen=True)
class RegisterUserCommand(Command):
    email: str
    password: str


class RegisterUserCommandHandler(CommandHandler[RegisterUserCommand]):

    def handle(self, command: RegisterUserCommand) -> None:
        assert command == RegisterUserCommand(
            email='john@lennon.com',
            password='secret',
        )


command_bus = CommandBus()
command_bus.register(handler=RegisterUserCommandHandler())
command_bus.execute(command=RegisterUserCommand(
    email='john@lennon.com',
    password='secret',
))

安装buslane很简单,只需使用pip:
$ pip install buslane

1

之前我写了一个库,可能对你有用。它允许你拥有本地和全局监听器,多种不同的注册方式,执行优先级等等。

from pyeventdispatcher import register

register("foo.bar", lambda event: print("second"))
register("foo.bar", lambda event: print("first "), -100)

dispatch(Event("foo.bar", {"id": 1}))
# first second

请看

这个链接 pyeventdispatcher


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接