用Python编写的事件监控系统

8
我正在使用Python创建一个项目,并想添加一个监控系统,该系统使用事件和事件处理程序。我希望这个系统在整个项目中都可用。我有以下操作的考虑:
定义事件。事件可以将一些数据作为参数。 定义监视器。监视器注册某个事件。多个监视器可以注册相同的事件。我想创建不同种类的监视器,例如一个用于打印数据,一个用于创建包含数据的图表等。因此,监视器应该是一个类,能够保存它收集的所有数据,直到调用某个方法(例如print, create-log等)。 为监视器事件对定义一个事件处理程序。这定义了给定监视器将如何响应给定事件。此操作通常会将此数据添加到某个监视器类的实例的数据列表中。 一个通知函数,可以在事件发生时通知。这将触发已注册该事件的所有监视器的事件处理程序。理想情况下,notify函数应该可以从项目的任何地方调用。
我该如何创建这样的系统?有没有库可以帮助我实现这个目标?我特别想知道如何使该系统透明地在整个项目中可用。

1
这个链接是否符合您的需求?https://dev59.com/vXNA5IYBdhLWcg3wGJwV - Ludovic Guerra
@LudovicGuerra 这确实很有帮助,例如Blinker看起来不错。但我正在寻找处理程序是一个类而不是一个简单的函数。在这个类中,我将收集所有数据,然后调用一个函数,例如导出到CSV或创建记录数据的绘图。 - JNevens
4个回答

2

2
您可以使用40行Python代码完成大部分操作。这是我自己设计并经常使用的。函数名称被选择为Qt的“信号”和“槽”的替代品。
使用起来很简单。您创建一个PSignal。通过调用connect方法注册处理程序。处理程序可以是任何可调用对象。当事件发生时,通过调用emit函数来发出信号(即通知事件)。每个已注册的可调用对象都会在该点运行。调用emit的对象不知道或不关心是否有人在听或者他们在听到后会发生什么。
您也可以断开处理程序的连接。
由于我发现否则某些错误可能难以追踪,因此存在大量调试代码。
在您的问题中,您希望每个处理程序都是监视器,在我的设计中处理程序只是功能。但是,对我来说,您的“监视器”概念与事件/处理程序机制无关。您将不得不编写功能使应用程序正常运行,并且使这些功能调用您的监视器应该非常容易。
该代码已经在Python 3.3上进行了广泛测试。
#! python3
import traceback

class PSignal:
    def __init__(self, debug=False):
        self.debug = debug
        self.__handlers = []

    def clear(self):
        """Deletes all the handlers."""
        self.__handlers.clear()

    def connect(self, f):
        """f is a python function."""
        if not callable(f):
            raise ValueError("Object {!r} is not callable".format(f))
        self.__handlers.append(f)
        if self.debug:
            print("PSIGNAL: Connecting", f, self.__handlers)

    def disconnect(self, f):
        for f1 in self.__handlers:
            if f == f1:
                self.__handlers.remove(f)
                return

    def emit(self, *x, **y):
        self._emit(*x, **y)

    def check_debug(self):
        if self.debug and self.__handlers:
            print("PSIGNAL: Signal emitted")
            traceback.print_stack()

    def _emit(self, *x, **y):
        self.check_debug()
        for f in self.__handlers:
            try:
                if self.debug:
                    print("PSIGNAL: emit", f, len(x), x, y)
                f(*x, **y)
            except Exception:
                print("PSIGNAL: Error in signal", f)
                traceback.print_exc()

0

你可以使用像zmq和“发布者-订阅者”模式这样的分布式消息系统来创建自己的系统。

我已经构建了一个可定制的工作流引擎(Flows, https://github.com/mastro35/flows)。

再见 D.


0

我用这个来进行健康监控,允许用户指定回调,并且支持线程化、主动监视和被动监视:

https://gist.github.com/earonesty/4ccf8fc9bde6feac30e5c155e54dfa5f

我粘贴了下面的代码,没有测试(比代码多):

class MonitorInstance:
    def __init__(self, parent, label, func, threshold, active, metric):
        self.parent = parent
        self.label = label
        self.func = func
        self.threshold = threshold
        self.active = active
        self.metric = metric
        self.__errors = None

    def ok(self):
        if self.__errors is None or self.__errors:
            self.parent._ok(self)
        self.__errors = 0
        if self.metric:
            self.metric.set(0)

    def error(self):
        if not self.__errors:
            self.parent._error(self)

        if self.__errors is None:
            self.__errors = 0

        self.__errors += 1

        if self.metric:
            self.metric.inc()

    def check(self):
        try:
            self.func()
            self.ok()
        except Exception as e:
            log.error("%s error: %s", self.label, e)
            self.error()

    @property
    def healthy(self):
        return self.__errors < self.threshold

DEFAULT_THRESHOLD = 1           # errors to cause fault
DEFAULT_CHECKSECS = 5           # time in secs between checks

class Monitor:
    def __init__(self, health_callback=None, check_secs=DEFAULT_CHECKSECS, use_thread=False):
        self.active = []        # active moniors
        self.alerts = set()     # thresholds currently triggered (not healthy)
        self.health_callback = health_callback
        self.healthy = False    # default: not healthy unless a monitor is added!
        self.check_secs = check_secs
        self.last_check = 0

        if use_thread:
            assert self.check_secs > 0, "threads need to sleep"
            threading.Thread(target=self._thread_loop, daemon=True).start()

    def add(self, label, check, threshold=DEFAULT_THRESHOLD, active=False, metric=None):
        inst = MonitorInstance(self, label, check, threshold, active, metric)
        if active:
            self.active.append(inst)
        inst.check()
        return inst

    def _error(self, inst):
        self.alerts.add(inst)
        if self.healthy:
            self._callback(False)
        self.healthy = False

    def _thread_loop(self):
        while True:
            self.check()
            time.sleep(self.check_secs)

    def _callback(self, value):
        if not self.health_callback is None:
            try:
                self.health_callback(value)
            except:
                # health callback should always succeed!
                log.exception("deadlyexes: error calling %s", self.health_callback)

    def _ok(self, inst):
        self.alerts.discard(inst)
        if not self.healthy and not self.alerts:
            self._callback(True)
            self.healthy = True

    def check(self, force=False):
        if not force and (time.time() < (self.last_check + self.check_secs)):
            return False

        # returns true if check was done
        checked=False
        # convert to list prevents modifying iterators
        for inst in list(self.alerts) + self.active:
            try:
                checked=True
                inst.check()
            except:
                pass
        return checked

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接