使用事件存储和ORM的事件溯源

3

我正在使用Django构建一个身份验证服务,用于微服务架构。这更像是一个实验,而不是真正的实现,旨在更好地了解事件溯源。

我知道Django有一个非常耦合框架的ORM,使用Flask可能会更容易,但我正在尝试找到一种解决方法。

用例非常简单。用户注册后,他将收到一封电子邮件以激活帐户。用户再次激活时,我会发一封电子邮件通知他帐户已激活。

据我了解,在事件溯源系统中。事件被触发并存储,我们可以获取最新状态并将其存储在数据库中,例如Postgres使用Django ORM。

我使用Django中的信号(pre_save()信号)将事件发布到kafka,然后模型将保存对象。

在下面还未实现的更新情况下。我将只发布更新字段。并在Django中更新对象。

是否有人看到此方法存在任何问题?或者最好在模型的保存方法中实现它?

我很想听听您的反馈意见。

# app/services.py

class KafkaService:

    def __init__(self):
        try:
            self.producer = KafkaProducer(bootstrap_servers=settings.KAFKA_BROKERS,
                                          value_serializer=lambda m: json.dumps(m).encode('ascii'))
        except KafkaError as ke:
            logger.exception(f"Something went wrong creating a kafka producer: {ke}")
            self.producer = None
        except Exception as ex:
            logger.exception(f"Something went wrong creating a kafka producer: {ex}")
            self.producer = None

    def publish(self, topic, key, data):
        if not self.producer:
            logger.error(f"Kafka Producer could not establish a connection")
            pass
        try:
            self.producer.send(topic, key=bytes(key, encoding='utf-8'), value=data)
            self.producer.flush()
            logger.info("Message has been published to Kafka")
        except Exception as ex:
            logger.info(f"Errored while publishing to Kafka: {ex}")

# app/events.py

class UserEvent:

    def __init__(self, event_store):  # event store here is Kafka
        """ A user event class which is an injectable. I am using here.
        I need a key for kafka to place the correct event in the correct partition.

        :parameters:
            - event_store: a class form example :class:`KafkaService` which publishes data
        """
        self.event_store = event_store

    def user_created(self, email, data):
        """ Publish an event to the event store when a user is created

        :param email: string
        :param data: string - json
        """
        self.event_store.publish('user-created', email, data)

    def user_activated(self, email, data):
        """ Publish an event to the event store when a user is activated """
        self.event_store.publish('user-activated', email, data)

# app/signals.py

kafka_service = KafkaService()

user_event = UserEvent(kafka_service)

def user_added_event(sender, instance, created, **kwargs):
    if created:
        from users.api.v1.serializers import UserSerializer  # Avoid (Apps Not Read)

        value = UserSerializer(instance).data
        user_event.user_created(instance.email, value)

    else:
        logger.info("User Updated")
1个回答

5
你们是否有任何对这种方法的警告或者是最好在模型的保存方法中实现?通常的设计将领域模型(也称业务逻辑)与持久化问题分离。如果事件序列是你的真相来源,那么有两件事需要注意:首先,你要确保在发布之前成功存储事件。其次,你要确保写入语义是“第一个写入者获胜”。如果你做得对,并且每个人都明白数据库中的模型可能落后于事件存储中的事件,那么你就处于良好的状态。最终一致性系统使“读取自己的写入”期望具有挑战性。因此,你可能需要做一些额外的工作。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接