ASP.NET Web Api 的事件发布器

4
我开始使用微服务,需要创建一个事件发布机制。我计划使用Amazon SQS。这个想法非常简单:在同一事务中将聚合和事件存储在数据库中。例如,如果用户更改了电子邮件,则会在数据库中存储事件“UserChangedEmail”。我还有事件处理程序(如“UserChangedEmailHandler”),它将负责将此事件发布到SQS队列,以便其他服务可以知道用户已更改电子邮件。我的问题是,应该如何实现这一点?是否应该有某种后台定时进程来扫描事件表并将事件发布到SQS?这可以在WebApi应用程序中进行处理吗?还是应该作为单独的进程?其中一个想法是使用Hangfire,但它不支持每分钟以下的cron作业。有什么建议吗?编辑:根据一个答案的建议,我看了NServicebus。NServiceBus页面上的一个示例展示了我的关注点的核心。
在他们的示例中,他们创建了一个记录已下订单的日志。如果日志或数据库条目成功提交,但发布出现问题并且事件从未被发布,会怎样呢?
以下是事件处理程序的代码:
public class PlaceOrderHandler :
    IHandleMessages<PlaceOrder>
{
    static ILog log = LogManager.GetLogger<PlaceOrderHandler>();
    IBus bus;

    public PlaceOrderHandler(IBus bus)
    {
        this.bus = bus;
    }

    public void Handle(PlaceOrder message)
    {
        log.Info($"Order for Product:{message.Product} placed with id: {message.Id}");
        log.Info($"Publishing: OrderPlaced for Order Id: {message.Id}");

        var orderPlaced = new OrderPlaced
        {
            OrderId = message.Id
        };
        bus.Publish(orderPlaced); <!-- my concern
    }
}

我没有使用Amazon SQS的经验。但是你能告诉我你在这里想要实现什么吗?让所有服务都知道这个事件是目的吗? - Swagata Prateek
@SwagataPrateek 是的,那正是我想要实现的。 - Robert
2
根据您的编辑,我已更新了我的答案 - 请查看标题为“事务一致性和出箱模式”的部分。 - Chris Simon
2个回答

10

现成建议

与其自己动手,我建议您考虑现成的产品,因为这里有很多复杂性在一开始不会显而易见,例如:

  • 管理事件订阅者列表 - SQS队列更适合与事件消费者配对,而不是与事件生产者配对,因为当消息被消耗时,它就不再在队列中可用了 - 因此,如果您想支持给定事件的多个订阅者(这是事件驱动架构的重大优势),那么在首次引发事件消息时,如何知道要将其推送到哪些SQS队列?
  • 重试语义,错误转发队列 - 处理由于临时基础设施问题而导致的临时错误与由于业务逻辑语义问题而导致的永久错误
  • 哪些消息在何时引发并发送到哪里的审计跟踪
  • 通过SQS发送的消息的安全性(您的业务案例是否要求对其进行加密?SQS是亚马逊提供的应用程序服务,不提供存储级别的加密
  • 消息的大小 - SQS有一个消息大小限制,因此您最终可能需要处理大型消息的带外传输

这只是我想到的一些问题...

以下是一些现成系统,可以提供帮助:

  • NServiceBus提供了一个命令和事件消息管理框架,并且它有一个插件框架,允许灵活的传输类型——NServiceBus.SQS提供了SQS作为传输方式。
    • 提供全面和灵活的重试、审计和错误处理
    • 在使用命令和事件时有自己的观点(命令消息说“做这个”,并发送到单个服务进行处理,事件消息说“某事发生了”,并发送到任意数量的灵活订阅者)
    • Outbox模式即使使用非事务一致的传输,如SQS也提供了具有事务一致性的消息传递
    • 目前,SQS插件使用默认的NServiceBus订阅者持久性,需要SQL Server来存储事件订阅者列表(有一个选项利用SNS,见下文)
    • 内置对saga的支持,提供了一个框架,以确保通过补偿操作回滚的多个事务最终一致性
    • 支持定时消息处理的超时
    • 商业化产品,因此不是免费的,但许多插件/扩展都是开源的
  • Mass Transit
    • 不支持SQS,但支持Azure Service Bus和RabbitMq,因此如果这是可选项,它可以成为您的替代选择
    • 类似于NServiceBus的产品,但不完全相同——NServiceBus vs MassTransit提供了全面的比较
    • 完全开源/免费的
  • Just Saying
    • 一个专门针对SQS/SNS的轻量级开源消息框架
    • 每个事件使用SNS主题,每个微服务使用SQS队列,使用本地SNS SQS队列订阅实现扇出
    • 开源免费的
我建议您研究现成的解决方案,虽然可能还有其他选择,但我个人最有经验的是NServiceBus。这些解决方案将使您能够开始以业务事件为基础设计系统,而无需担心事件传输的机制。即使您想要自己构建作为学习练习,也可以回顾以上内容,了解可靠的事件驱动消息所需的一些提示。

事务一致性和Outbox模式

问题已被编辑为询问如果某些操作成功,但发布操作失败会发生什么。这通常称为消息的事务一致性,意味着在一个事务内,所有业务副作用都已提交或全部未提交。业务副作用可能包括:
  • 数据库记录更新
  • 另一个数据库记录删除
  • 消息发布到消息队列
  • 发送电子邮件
如果数据库操作失败,则通常不希望发送电子邮件或发布消息,同样,如果消息发布失败,则不希望提交数据库操作。 那么如何确保消息的一致性呢? NServiceBus以以下两种方式处理此问题:
  1. 使用事务一致的消息传输,例如 MSMQ。
    1. MSMQ 能够利用 Microsoft 的 DTC (分布式事务协调器),DTC 可以将消息发布的分布式事务与 SQL 服务器更新一起注册,这意味着如果您的业务交易失败,您的发布操作将被回滚,反之亦然。
  2. Outbox 模式
    1. 使用 Outbox 模式时,消息不会立即发送-它们会作为同一事务的一部分添加到数据库中的 Outbox 表中,最好与您的业务数据相同的数据库中
    2. 在提交事务后,它尝试发送每个消息,并仅在成功发送后从 outbox 中删除该消息
    3. 如果系统在发送但删除之前发生故障,则会再次传输该消息。为了弥补这一点,当启用 Outbox 时,NServiceBus 还将通过维护所有入站消息的记录并丢弃重复项来对入站消息进行去重。
    4. 去重对于 Amazon SQS 尤其有用,因为它本身是最终一致的,同样的消息可能会收到两次。
    5. 这与您问题中的原始概念相差不远,但存在一些差异:
      1. 您正在构想一个后台定时进程,用于扫描事件表(也称为 Outbox 表)并将事件发布到 SQS
      2. NServiceBus 在 管道 中执行处理程序-使用 Outbox 时,将消息发送到传输(即将消息推送到 SQS 队列)只是管道中的最后一步之一。因此,每当处理消息时,在处理期间生成的任何外发消息都将在业务交易提交后立即发送-无需定时扫描事件表。
    6. 注意:仅在存在环境 NServiceBus 处理程序事务时 Outbox 才能成功-i.e. 当您在 NServiceBus 管道内处理消息时。在某些情况下,例如 WebAPI 请求管道,这将不是这种情况。出于这个原因,NServiceBus 建议仅使用 API 请求发送单个命令消息, 然后在后端终端服务中的事务一致命令处理程序中将业务数据操作与进一步的消息组合起来。尽管他们文档中的第三点更与 MSMQ 比 SQS 传输相关。

处理程序语义

关于您的提案,还有一个评论 - 按照惯例,UserChangedEmailHandler 更常与响应电子邮件更改的服务相关,而不仅仅是参与传播电子邮件更改信息。当您的系统发布50个事件时,您是否希望有50个不同的处理程序只是将这些消息推送到不同的队列中?

上述系统使用通用框架通过传输来传播消息,因此您可以将 UserChangedEmailHandler 保留给订阅系统,并在其中包含每当用户更改其电子邮件时应发生的业务逻辑。


Chris,这是非常棒的回复。您能否再详细解释一下“但是,与其使用计时器轮询发件箱表,它将调度挂钩到业务数据事务的成功提交上”? - Robert
发布命令而不是直接调用聚合根的命令有什么好处呢? bus.publish("ChangeEmail") ->handle "ChangeEmail" && publish to SQS VS aggregate.ChangeEmail() && publish to SQS - Robert
编辑后,链接到Particular的页面,介绍如何从Web应用程序发布事件。这是一个复杂的主题,上面的答案已经变得相当冗长了!如果您想更清楚地了解它,可能值得再提一个问题? - Chris Simon
你说得很对。感谢你详细而优秀的回复。 - Robert

2
无论如何,我会选择有状态服务。如果你想更加轻松一点,可以看看Azure Service Fabric。
在我的情况下,我有自己的一组微服务,在这种情况下,我首先对数据库进行了基本的创建操作(更改电子邮件)。我有一个事件实体,并将一个事件推回到该集合中(在这种情况下是mongodb)。一个有状态的服务正在轮询数据库并批处理处理事件。
现在在你的情况下,如果你的Web应用程序进程是持久的,你可以立即选择将消息加入队列,并在事件中保留一个字段,指示它是否被任何服务实际处理后处理。我使用mongodb作为数据库和Azure Service Bus作为消息代理。我认为Amazon SQS会类似。
现在,如果你的Web应用程序是一个普通的asp.net Web api或mvc进程,你只需要在数据库中注册事件,以便你不必每次收到请求时都创建一个消息代理监听器。一个服务可以轮询数据库,使用消息代理让其他服务知道。
如果您想要完全基于事件驱动的范例,您可能需要查看 Event Hubs
我强烈建议从消息总线中保持一个选项卡,以确保它的可靠性,即检查任何资源是否已被处理。
希望对您有所帮助。 :)

一个服务可以轮询数据库,使用消息代理让其他服务知晓。这意味着每个服务应该有一个需要轮询其数据库以获取事件的服务,还是一个服务来轮询所有数据库? 一个服务可以轮询数据库,使用消息代理让其他服务知晓。这意味着每个服务都应该有一个需要轮询其数据库以获取事件的服务。 - Robert
每个需要传播事件的Web服务?如果您有一个易于设置的REST客户端来访问消息代理,那么您也可以摆脱轮询器。但即使如此,这也是不好的设计,因为您的控制器将执行其不应该执行的任务。我保留了一个服务来轮询数据库以获取事件。不只是一个,而是所有事件,并且它仅将消息推送到正确的主题。其他服务订阅该主题,并在消息出现时相应地执行操作。 - Swagata Prateek

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接