在单线程应用中,就代码清晰度而言,EventBus/PubSub与响应式扩展(RX)有何区别?

39

目前,我正在使用Scala(和JavaFX)的EventBus/PubSub架构/模式来实现一个简单的笔记整理应用程序(有点像一个带有一些思维导图功能的Evernote客户端),我必须说,我真的很喜欢EventBus而不是观察者模式。

以下是一些EventBus库:

https://code.google.com/p/guava-libraries/wiki/EventBusExplained

http://eventbus.org(目前似乎已经失效),这是我在我的实现中使用的一个。

http://greenrobot.github.io/EventBus/

这里是EventBus库的比较: http://codeblock.engio.net/37/

EventBus与发布-订阅模式有关。

但是!

最近,我参加了Coursera的响应式课程,开始想知道在单线程应用程序中是否使用RXJava代替EventBus会使事件处理代码更加简单?

我想问一下那些使用过两种技术(某种类型的事件总线库某种形式的响应式扩展(RX))进行编程的人的经验:如果没有使用多个线程,使用RX比事件总线架构更容易解决事件处理复杂性吗?

我正在询问这个问题,因为我在Coursera上的响应式讲座中听说RX比使用观察者模式(即没有“回调地狱”)产生更干净的代码,但是我没有找到事件总线架构与RXJava之间的任何比较。所以很明显,事件总线和RXJava都比观察者模式更好,但是在单线程应用程序方面,在代码清晰性和可维护性方面,哪个更好
如果我理解正确,RXJava的主要卖点是如果存在阻塞操作(例如等待服务器响应),它可以用于生成响应式应用程序。
但是我根本不关心异步性,我只关心在单线程应用程序中保持代码清洁,松散并且易于推理。
在这种情况下,仍然最好使用RXJava而不是EventBus吗?
我认为EventBus会是一个更简单、更清晰的解决方案,我不明白为什么在单线程应用程序中,我应该使用RXJava而不是一个简单的EventBus架构。
但是我可能是错的!
如果我错了,请纠正我,并解释为什么在没有阻塞操作的情况下,在单线程应用程序中使用RXJava比简单的EventBus更好。

2
你可能想联系ReactFX的创始人Tomas Mikula。 - jewelsea
1
scalafx(以及javafx)有自己的Observable类(实际上,ObservableValue更接近于RX的observable)。我目前正在研究在两者之间创建适配器。如果这是可能的,你可以简单地使用scalafx的bind指令,这非常好且声明性强! - Luciano
2
我认为事件流(无论是RX还是ReactFX)可以提高代码的清晰度,因为1)事件总线是(或感觉像)全局对象,2)事件流配备了方便的流操作符来过滤/转换/组合事件,当正确使用时,可以封装大量状态,减少应用程序中的可变状态和副作用。如果您发布一些具体问题或代码,我们可能可以提供更多建议。 - Tomas Mikula
2
当您有一个组件A消费由组件B产生的事件时,它们通过事件总线的耦合是间接的,因此不太清晰。a.events().subscribe(b::handle)比在某个地方注册eventBus.register(b)和在另一个地方发布eventBus.post(evt)更加明确。此外,生产者的API没有说明它发布到事件总线的事件类型。另一方面,如果您有一个组件具有返回EventStream<E>和另一个返回EventStream<F>的方法,则清楚该组件生成类型为E和类型为F的事件。 - Tomas Mikula
1
你不一定需要依赖注入,至少在使用ReactFX时不需要。一个产生事件的组件将会创建一个事件流而不是将其作为依赖项获取。一个响应事件的组件可以仅暴露事件处理方法,或者一个事件汇(sink),用于推送事件。生产者和消费者之间的连接是在封装这两个组件的组件中设置的。 - Tomas Mikula
显示剩余10条评论
4个回答

30
以下是我认为在单线程同步应用程序中使用响应式事件流的好处:
1. 更加声明式,副作用更少,可变状态更少。
事件流能够封装逻辑和状态,潜在地使您的代码不受副作用和可变变量的影响。
考虑一个计算按钮点击次数并将点击次数显示为标签的应用程序。
普通Java解决方案:
private int counter = 0; // mutable field!!!

Button incBtn = new Button("Increment");
Label label = new Label("0");

incBtn.addEventHandler(ACTION, a -> {
    label.setText(Integer.toString(++counter)); // side-effect!!!
});

ReactFX解决方案:
Button incBtn = new Button("Increment");
Label label = new Label("0");

EventStreams.eventsOf(incBtn, ACTION)
        .accumulate(0, (n, a) -> n + 1)
        .map(Object::toString)
        .feedTo(label.textProperty());

没有使用可变变量,对 label.textProperty() 的副作用赋值被隐藏在一个抽象之后。
在他的硕士论文中,Eugen Kiss 提出了将 ReactFX 与 Scala 集成的方案。使用他的集成,解决方案可能如下所示:
val incBtn = new Button("Increment")
val label = new Label("0")

label.text |= EventStreams.eventsOf(incBtn, ACTION)
    .accumulate(0, (n, a) => n + 1)
    .map(n => n.toString)

这与之前的相等,额外的好处是消除了控制反转。

2. 消除故障和冗余计算的方法(仅限ReactFX)

故障是observable状态中的临时不一致。ReactFX有一些手段来暂停事件传播,直到对象的所有更新都已处理,从而避免故障和冗余更新。特别是,请查看可暂停事件流, 指示器, InhiBeans我关于InhiBeans的博客文章。这些技术依赖于事件传播是同步的事实,因此不能转换为rxJava。

3. 事件生产者和事件消费者之间的清晰连接。

事件总线是一个全局对象,任何人都可以发布和订阅。事件生产者和事件消费者之间的耦合是间接的,因此不够清晰。在响应式事件流中,生产者和消费者之间的耦合更加明确。比较如下: 事件总线:
class A {
    public void f() {
        eventBus.post(evt);
    }
}

// during initialization
eventBus.register(consumer);
A a = new A();

The relationship between a and consumer is not clear from looking at just the initialization code. 事件流:
class A {
    public EventStream<MyEvent> events() { /* ... */ }
}

// during initialization
A a = new A();
a.events().subscribe(consumer);

The relationship between a and consumer is very clear.
Events published by an object are reflected in its API.
Using the example from the previous section, in the event bus sample, the API of A does not indicate which events are published by instances of A. However, in the event streams sample, the API of A specifies that instances of A publish events of type MyEvent.

9
关于您提到的第三点,我认为“松散的”事件总线的优点在于其灵活性,而不是劣势。从消费者的角度来看,谁发布事件并不重要。反之,在发布者的角度看,他并不关心谁接收它 - 任何想要听取和做出反应的人都可以。这就是发布/订阅架构的美妙之处。至少,这是我的理解。 - Jiho Han
2
@JihoHan 你所说的关于我在第3点中的例子仍然是正确的:A 不关心生产的事件是否有人消费,而 consumer 不关心谁产生了事件。添加的好处是连线代码确切地知道 A 能够产生类型为 MyEvent 的事件,而 consumer 能够消费它们。这给你更多的类型安全性。过多的自由(例如全局事件总线)会减少对代码推理的能力。 - Tomas Mikula
1
@TomasMikula 完全同意你的观点 - 我有时候感觉一个执行不好的总线就像是在你的代码中乱扔全局变量... 一个执行得好的总线并不是问题,但我发现使用 Observables 更容易做到良好的执行。 - SJoshi

5

我认为你需要使用RxJava,因为它提供了更多的灵活性。 如果你需要一个“bus”,你可以像这样使用枚举:

public enum Events {

  public static PublishSubject <Object> myEvent = PublishSubject.create ();
}

//where you want to publish something
Events.myEvent.onNext(myObject);

//where you want to receive an event
Events.myEvent.subscribe (...);

.


为什么要使用枚举而不是类? - cyroxis
1
@cyroxis 你使用“枚举模式”是因为它是Java中实现单例的常用方式。但在这种特殊情况下,枚举是final的并禁止实例化,因此您不需要编写私有构造函数。 - Christian Ullenboom
1
@ChristianUllenboom 这不是一个单例,只是一个可以放在类或接口中的静态字段。此外,我不会说Enum是通常的方法,因为它不支持延迟实例化,这在加载时间很重要时(例如Android)可能很重要。 - cyroxis

1

我自从2年前提出这个问题以来学到了一两件事情,以下是我的现有理解(如Stephen的FRP book所述):

它们都试图帮助描述状态机,即描述程序状态如何响应事件而发生变化。

EventBus和FRP之间的关键区别在于组合性

  • 什么是组合性?

    • 函数式编程是组合性的。我们都可以同意这一点。我们可以将任何纯函数与其他纯函数组合,得到的是一个更复杂的纯函数。
    • 组合性意味着当您声明某些内容时,在声明实体的位置定义了所有行为
  • FRP是描述状态机的组合方式,而事件总线不是。为什么?

    • 它描述了一个状态机。
    • 它是组合性的,因为使用纯函数和不可变值进行描述。
  • 事件总线不是描述状态机的组合方式。为什么不是?

    • 您不能采取任何两个事件总线并以描述组合状态机的方式将它们组合起来得到新的事件总线。为什么不是?
      • 事件总线不是头等公民(与FRP的事件/流相对)。
        • 如果尝试使事件总线成为头等公民会发生什么?
          • 那么你会得到类似于FRP/RX的东西。
      • 由事件总线影响的状态不是
        • 头等公民(即引用透明,纯值与FRP的行为/单元格相对)
        • 以声明式/函数式方式绑定到事件总线,而是通过事件处理触发的命令式修改状态

总之,EventBus不是组合的,因为组合的EventBus的意义和行为(即受该组合的EventBus影响的状态的时间演变)取决于时间(即软件中未明确包含在组合EventBus声明中的部分的状态)。换句话说,如果我尝试声明一个组合的EventBus,那么仅通过查看组合的EventBus声明就无法确定管理受组合的EventBus影响的状态演变的规则,这与FRP相反,后者可以做到这一点。


1
根据我上面的评论,JavaFx有一个名为ObservableValue的类,它有点对应于RX Observable(可能更精确地说是ConnectableObservable,因为它允许多个订阅)。我使用以下隐式类将RX转换为JFX,像这样:
import scala.collection.mutable.Map
import javafx.beans.InvalidationListener
import javafx.beans.value.ChangeListener
import javafx.beans.value.ObservableValue
import rx.lang.scala.Observable
import rx.lang.scala.Subscription

/**
 * Wrapper to allow interoperability bewteen RX observables and JavaFX
 * observables. 
 */
object JfxRxImplicitConversion {
  implicit class JfxRxObservable[T](theObs : Observable[T]) extends ObservableValue[T] { jfxRxObs =>
    val invalListeners : Map[InvalidationListener,Subscription] = Map.empty
    val changeListeners : Map[ChangeListener[_ >: T],Subscription] = Map.empty
    var last : T = _
    theObs.subscribe{last = _}

    override def getValue() : T = last 

    override def addListener(arg0 : InvalidationListener) : Unit = {
      invalListeners += arg0 -> theObs.subscribe { next : T => arg0.invalidated(jfxRxObs) }
    }

    override def removeListener(arg0 : InvalidationListener) : Unit = {
      invalListeners(arg0).unsubscribe
      invalListeners - arg0
    }

    override def addListener(arg0 : ChangeListener[_ >: T]) : Unit = {
      changeListeners += arg0 -> theObs.subscribe { next : T => arg0.changed(jfxRxObs,last,next) }
    }

    override def removeListener(arg0 : ChangeListener[_ >: T]) : Unit = {
      changeListeners(arg0).unsubscribe
      changeListeners - arg0
    }
  }
}

然后你可以像这样使用属性绑定(这是ScalaFX,但在JavaFX中对应Property.bind):

new Label {
    text <== rxObs
}

其中 rxObs 可以是例如:

val rxObs : rx.Observable[String] = Observable.
  interval(1 second).
  map{_.toString}.
  observeOn{rx.lang.scala.schedulers.ExecutorScheduler(JavaFXExecutorService)} 

这只是一个简单的计数器,每秒钟自增一次。请记得导入隐式类。我无法想象会有比这更简洁的了!

由于需要使用与JavaFx兼容的调度程序,上述内容有些复杂。请参考this问题,获取JavaFXExecutorService的实现Gist链接。Scala RX提出了一个增强请求,将其转换为隐式参数,因此在未来您可能不需要.observeOn调用。


谢谢您的回答,但这并不是我真正想问的问题。 - jhegedus

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接