RxJava线程安全

9

这段代码是否线程安全?

Observable<String> observable = ... // some observable that calls
                                    // onNext from a background thread

observable
  .scan(new ArrayList<String>(), (List<String> acc, String next) -> {
    acc.add(next);
    return acc;
  })
  .subscribe( list -> {
    // do somethind with sequence of lists
    ...
  });

我很好奇为什么ArrayList不是线程安全的数据结构。


Rx 设计指南非常有帮助:http://go.microsoft.com/fwlink/?LinkID=205219。 - James Moore
可能是Is SerializedSubject necessary for thread-safety in RxJava的重复问题。 - James Moore
2个回答

6
作为快速回答,在.NET(原始Rx实现)中,可以假定来自可观察序列的所有值都是顺序的。这并不排除它是多线程的。但是,如果您以多线程方式生成值,则可能需要查找与.NET Synchronize() Rx运算符相当的函数,以强制执行顺序性。
另一个选项是检查RxJava源代码中的Scan实现,以验证它是否强制执行你想要/期望的顺序性,以在累加器函数中提供安全性。

在RxJava中,scan的功能与.Net原始版本完全相同。 - allprog
干杯。我主要是这么说的,因为随着时间的推移,我不能确定.NET和Java代码库是否会相同。 - Lee Campbell

4
如果这段代码不是线程安全的,那么要么RxJava有问题,要么你的Observable源有问题 - 操作符不可重入是Rx协议的一部分。

如果RxJava实现了,这可以通过Rx的Checked()观察者运算符来强制执行。拉请求,有人吗? - Mike Strobel
在我看来,块的线程安全性并不那么简单。在帖子中展示的代码并没有包含太多细节。我猜 @orionll 没有注意到的是订阅默认是同步传递的。但如果它被转换为异步订阅,那么访问数组列表将不再安全。 - allprog
1
@allprog 你说得对,这并不简单,但这是 Rx 的优势之一,它在幕后完成工作以确保安全。 - Ana Betts
我想说的是,一旦程序员能够在可以操作全局状态的块中插入任意函数,那么不知道完整上下文情况下,线程安全就不再是一个可判定的问题。scan的源代码表明,只要初始列表支持并发或者不在全局上下文中访问,它就是安全的。但是这个语句仅仅指向单个对象。如果该块包含全局应用程序状态的管理,则仅仅查看块的代码就很难确定线程安全性。 - allprog

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接