在Spring中使用什么样的“事件总线”?是内置的、Reactor还是Akka?

45
我们将在几周内开始一个新的Spring 4应用程序。我们想要使用一些事件驱动的架构。今年我在这里和那里读到了关于“Reactor”的文章,当我在网上寻找它时,我偶然发现了“Akka”。
所以现在我们有三个选择:
- Spring的ApplicationEvent:http://docs.spring.io/spring/docs/4.0.0.RELEASE/javadoc-api/org/springframework/context/ApplicationEvent.html - Reactor:https://github.com/reactor/reactor#reactor - Akka:http://akka.io/ 我找不到真正的比较。
现在我们只需要类似以下的东西:
  • X 注册以侦听 事件 E
  • Y 注册以侦听 事件 E
  • Z 发送一个 事件 E
然后,XY 将接收并处理该事件。
我们很可能会以异步方式使用它,但肯定也会有一些同步场景。而且我们很可能总是发送一个类作为事件。(Reactor示例大多使用字符串和字符串模式,但它也支持对象)。
据我所了解,ApplicationEvent 默认同步工作,而 Reactor 则以异步方式工作。同时,Reactor 还允许使用 await() 方法使其变得类似于同步。Akka 提供了与 Reactor 差不多的功能,但还支持远程操作。
关于 Reactor 的 await() 方法:它是否可以等待多个线程完成?或者甚至是这些线程的部分集合?如果我们采用上面的例子:
  • X 注册以侦听 Event E
  • Y 注册以侦听 Event E
  • Z 发送一个 Event E
是否可以通过说“等待 X Y 完成”来使其同步化?并且是否可以只等待 X 而不等待 Y
也许还有其他选择?比如JMS?有很多问题,但希望你能提供一些答案!谢谢!

编辑:使用案例示例

  1. 当特定事件触发时,我想创建10000封电子邮件。每个电子邮件都必须使用特定用户内容生成。因此,我会创建大量线程(最大=系统CPU核心),这些线程将创建邮件并不会阻塞调用线程,因为这可能需要几分钟。

  2. 当特定事件触发时,我希望从未知数量的服务中收集信息。每个获取需要大约100毫秒。在这里,我可以想象使用Reactor的await,因为我需要这些信息来继续我的主线程工作。

  3. 当特定事件触发时,我希望根据应用程序配置执行一些操作。因此,应用程序必须能够动态(取消)注册消费者/事件处理程序。他们将使用事件进行自己的操作,而我不在乎。因此,我将为每个处理程序创建一个线程,并在主线程中继续执行我的工作。

  4. 简单的解耦:我基本上知道所有接收方,但我只是不想在我的代码中调用每个接收方。这应该大多数情况下同步完成。

听起来我需要一个线程池或环形缓冲区。这些框架是否具有动态环形缓冲区,如果需要可以增长大小?


4
你最终使用了哪个库? - Gopinath
3个回答

32

我不确定在这个小空间里能否充分回答您的问题。但是我会尽力而为!:)

就功能而言,Spring的ApplicationEvent系统和Reactor确实非常不同。ApplicationEvent路由基于ApplicationListener处理的类型。除此之外,如果需要更复杂的逻辑,您将不得不自己实现(但这并不一定是坏事)。然而,Reactor提供了一个全面的路由层,也非常轻巧且完全可扩展。两者之间在订阅和发布事件方面的相似性仅限于任何事件驱动系统的功能。还不要忘记Spring 4中配备的新的spring-messaging模块。它是Spring Integration中可用工具的子集,并提供了围绕事件驱动架构构建的抽象。

Reactor将帮助您解决一些关键问题,否则您将不得不自己管理:

选择器匹配:Reactor进行Selector匹配,这包括一系列的匹配,从简单的.equals(Object other)调用到更复杂的URI模板匹配,允许占位符提取。您还可以使用自定义逻辑扩展内置选择器,以便使用丰富的对象作为通知键(例如域对象)。
流和Promise API:您已经提到了Promise API,引用了.await()方法,它的确适用于现有代码,该代码期望阻塞行为。当使用Reactor编写新代码时,强烈建议使用组合和回调,通过不阻塞线程有效利用系统资源。在依靠少量线程执行大量任务的架构中,阻塞呼叫者几乎从来不是一个好主意。Futures根本无法云扩展,这就是为什么现代应用程序利用替代解决方案的原因。
您的应用程序可以使用流或Promise API进行架构设计,但说实话,我认为您会发现流更灵活。关键的优点是API的可组合性,它允许您将操作连接起来形成依赖链而不会阻塞。以您电子邮件用例为基础的完全即兴的示例如下:
@Autowired
Environment env;
@Autowired
SmtpClient client;

// Using a ThreadPoolDispatcher
Deferred<DomainObject, Stream<DomainObject>> input = Streams.defer(env, THREAD_POOL);

input.compose()
  .map(new Function<DomainObject, EmailTemplate>() {
    public EmailTemplate apply(DomainObject in) {
      // generate the email
      return new EmailTemplate(in);
    }
  })
  .consume(new Consumer<EmailTemplate>() {
    public void accept(EmailTemplate email) {
      // send the email
      client.send(email);
    }
  });

// Publish input into Deferred
DomainObject obj = reader.readNext();
if(null != obj) {
  input.accept(obj);
}

Reactor还提供了Boundary,它基本上是用于阻塞任意消费者的CountDownLatch(因此,如果您只想阻塞Consumer完成,而不必构造一个Promise,则可以在这种情况下使用原始的Reactor并使用on()notify()方法触发服务状态检查。

然而,对于某些事情,似乎您需要从ExecutorService返回一个Future,是吗?为什么不保持简单呢? Reactor仅在吞吐量性能和开销效率很重要的情况下才会真正有益。如果您阻塞调用线程,那么您可能会抹去Reactor将给您带来的效率收益,因此在这种情况下,您最好使用更传统的工具集。

Reactor的开放性使得两者之间没有任何障碍。您可以自由地混合使用FuturesConsumers,而无需静态。在这种情况下,请记住,您的速度只能达到最慢的组件速度。

谢谢 :) 这是一个相当棒的解释!只有一个小问题:您是否也会使用 Reactor 处理那些可以使用 ApplicationEvent(同步和异步)发送/消费的简单事件?更详细地说:假设我想要获得 Spring 的 ApplicationEvent 提供的完全相同的功能。使用 Reactor 处理这些任务是否有意义?或者甚至是适得其反的? - Benjamin M
1
从服务类中发送“ApplicationEvents”非常容易,因为您只需注入“ApplicationContext”,然后就可以开始了。话虽如此,同样容易注入共享的“Reactor”并调用“notify()”。我想这真的取决于您的事件处理程序。您希望它们响应什么以及如何响应。 - Jon Brisbin
另外,我考虑创建一个简单的包装组件,将“ApplicationEvents”发送到配置的“Reactor”中。因此,使用该组件,您可以从任一来源获取事件并在单个位置(Reactor“Consumer<Event<?>>”)中处理它们。 - Jon Brisbin
1
所以基本上在这种简单情况下,只有类和方法名称存在差异。使用 Reactor 只会有性能方面的缺陷吗?或者甚至还有好处吗? - Benjamin M

9
让我们忽略Spring的ApplicationEvent,因为它并不是为你所要求的设计的(它更多地涉及Bean生命周期管理)。
你需要弄清楚的是,如果你想要做到以下两点之一:
1. 面向对象的方式(即Actor、动态消费者、即时注册) 2. 服务方式(静态消费者,在启动时注册)
使用你的X和Y的例子,它们是:
1. 短暂的实例 2. 长期存在的单例/服务对象
如果你需要即时注册消费者,那么Akka是一个不错的选择(我不确定Reactor是否可行,因为我从未使用过)。如果你不想在短暂对象中进行消费,则可以使用JMS或AMQP。
你还需要了解这些库试图解决两个问题:
1. 并发(即在同一台机器上并行执行任务) 2. 分布式(即在多台机器上并行执行任务)

Reactor 和 Akka 主要关注点是 #1。Akka 最近添加了集群支持,使用 Actor 抽象可以更轻松地实现 #2。消息队列(JMS、AMQP)主要关注 #2。

在我的工作中,我使用服务路由和经过大量修改的 Guava EventBus 和 RabbitMQ。我使用类似于 Guava Eventbus 的注释,但也为总线上发送的对象提供注释,但是您可以像我一样只使用异步模式的 Guava EventBus 作为 POC,然后制作自己的总线。

您可能认为需要动态消费者(1),但大多数问题都可以通过简单的发布/订阅来解决。此外,管理动态消费者可能会很棘手(因此选择 Akka 是一个不错的选择,因为演员模型具有各种此类管理功能)。


应用程序将有很多静态消费者,它们通过某些注释在启动时进行注册(我的意思是:事件在默认的Spring @Service类中被触发和消耗)。但对于一些任务,我们可以想象动态消费者...也许这可以通过使用静态消费者来规避,然后查找动态消费者或类似的东西?!**---**我已经使用过Guava的EventBus以及GWT的EventBus,它们有点相同,但这些都没有提供扩展功能。因此,我认为Reactor或Akka最合适?! - Benjamin M
谢谢。我刚看了幻灯片,但我不太明白为什么您建议在我们的电子邮件问题中使用SEDA?!我们没有分布式系统。只有一个应用程序处理所有内容。它接收生成10000封电子邮件的订单,然后将调用外部SMTP服务器发送每个电子邮件。(在我们当前的测试应用程序中,我们编写了自己的队列。因为:如果服务器无法发送邮件或崩溃,则应在重新启动后发送剩余的邮件。我们的自定义队列将所有生成的邮件写入数据库,并在成功发送后删除每个邮件)。 - Benjamin M
因为几个月后你可能会发送30000或100000条消息。使用反应器和在某种程度上使用Akka,你不能像使用消息队列一样启动另一台机器。使用消息队列,你可以获得冗余性并强制实现无状态/隔离(消息是不可变的)。此外,你仍然可以使用同一台机器来发布和消费消息。 - Adam Gent
此外,发送电子邮件并不是短暂的CPU密集型操作,而是长时间运行的IO阻塞操作。Reactor和Akka更适合利用多核机器中的并发性进行实时请求处理(在我看来),而不是你正在进行的批处理。 - Adam Gent
是的。发送任务是IO阻塞的,但是以前那些电子邮件的生成非常消耗CPU资源。因此,首先我需要大量的CPU计算能力,然后我需要尽可能多的网络带宽。但是网络方面是一个不同的话题,如果发送邮件需要10或120分钟也没关系。更重要的是进度可观(可以看到完成了多少百分比),并且不会使服务器崩溃,这样它仍然可以像以前一样快速地提供网站服务。 - Benjamin M
显示剩余2条评论

3
仔细定义您从框架中需要的内容。如果一个框架比您需要的功能更多,那并不总是好的。更多的功能意味着更多的漏洞、更多的代码要学习,以及更低的性能。
一些需要关注的特点包括:
- 演员的性质(线程或轻量级对象) - 能够在机器集群上工作(Akka) - 持久化消息队列(JMS) - 特定的特性,如信号(没有信息的事件)、转换(将来自不同端口的消息组合成复杂事件的对象,参见Petri网)等。
对于像await这样的同步特性要小心 - 它会阻塞整个线程,在演员在线程池上执行时很危险(线程饥饿)。
更多需要关注的框架:
- Fork-Join Pool - 在某些情况下,允许await而不会出现线程饥饿 - 科学工作流系统

Java 数据流框架 - 信号,转换

ADD-ON: 两种类型的执行者。

通常,并行工作系统可以表示为图形,其中活动节点相互发送消息。在 Java 中,就像大多数其他主流语言一样,活动节点(执行者)可以实现为线程或由线程池执行的任务(Runnable 或 Callable)。通常,部分执行者是线程,部分是任务。这两种方法都有其优点和缺点,因此对于系统中的每个执行者选择最适当的实现非常重要。 简而言之,线程可以阻塞(并等待事件),但消耗很多内存用于它们的堆栈。任务可能不会阻塞,但使用共享堆栈(线程池中的线程的堆栈)。

如果任务调用阻塞操作,则将一个池化线程排除在服务外。如果许多任务阻塞,则它们可能会排除所有线程,导致死锁-不能运行可以解除阻止的任务。这种死锁称为线程饥饿。如果为了防止线程饥饿而将线程池配置为无限制,则只需将任务转换为线程,从而失去任务的优势。

为了消除任务中调用阻塞操作的电话,应将任务分为两个(或更多)部分-第一个任务调用阻塞操作并退出,其余部分格式化为异步任务,在阻塞操作完成时启动。当然,阻塞操作必须具有替代异步接口。因此,例如,应使用NIO或NIO2库而不是同步读取套接字。

不幸的是,标准Java库缺乏流行同步设施的异步对应项,如队列和信号量。幸运的是,它们很容易从头开始实现(请参见 Dataflow framework for Java以获取示例)。

因此,仅使用非阻塞任务进行计算是可能的,但会增加代码大小。明显的建议是在可能的情况下使用线程,仅将任务用于简单的大规模计算。


谢谢你的回答。我知道await的(不)优点,但有时它是必要的。所以这是一个必备功能。我们在一台机器上工作,所以不需要Akka的远程功能。持久消息队列只在特殊情况下需要(与事件/消息无关)。结论:我们不需要Akka和JMS的特殊功能。我真正不理解你列表中的是演员的本质。框架如何处理这些?(而且如何在没有单独线程的情况下处理异步事件?!) - Benjamin M
也许你(或其他人)可以评论一下Spring的ApplicationEventReactor之间的区别。据我所知,它们基本上做相同的事情,但是为什么Spring Foundation要创建另一个项目(Reactor)?编辑:我还附加了一些用例。 - Benjamin M
你的插件是一个非常好的解释。幸运的是,我在大学参加了“并行计算”课程。否则,跟随你的文本会很困难。;) 就我所知,Java的标准库包含一些基本的信号量。但是据我记得,我的教授告诉我,那些Java信号量的行为不像真正的信号量,并提供破坏信号量概念的功能... - Benjamin M
@Benjamin 这取决于信号量所代表的内容。通常它代表资源计数器。一些资源管理策略只允许添加和删除资源(表示为锁定和释放),而其他策略则允许设置任意值。 - Alexei Kaigorodov
是的,也许我把一些东西搞混了。已经超过两年了,自从我在大学学习这门课程以来。我只记得它与Java和并发有关。也许这是Java的信号量底层实现,甚至像synchronized之类的其他东西......我记不起来了 :-D - Benjamin M
显示剩余2条评论

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接