RX、线程和执行器在多线程性能方面的表现差异

8

我正在使用 Kotlin 编写后端应用程序。

为了提高速度,我目前在服务器上依赖 RxKotlin 来执行诸如数据库调用和 API 调用等 IO 任务的并行执行。代码通常是这样的。

val singleResult1 = Single.fromCallable{
  database.get(....)
}.io()

val singleResult2 = Single.fromCallable{
  database.update(....)
}.io()

Single.zip(singleResult1, singleResult2){ result1: Result1, result2: Result2 ->
    ....
}
.flatMap{
  //other RX calls
}
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.computation())
.blockingGet()

然而,由于Rx仅适用于单个事件(而非多个事件),因此它会使代码变得混乱,并且只会添加大量样板代码(如果我想返回空值,它还会导致复杂性,并有时会破坏堆栈跟踪)。

我正在考虑移除Rx并改用Executors(或线程)来实现并行。这里有任何性能问题需要考虑吗?

下面是我的思路示例:

fun <T> waitAll(tasks: List<Callable<T>>, threadCount: Int = -1): List<T> {
    val threads = if (threadCount == -1) tasks.size else threadCount
    val executor = Executors.newFixedThreadPool(threads)
    val results = executor.invokeAll(tasks).map {
        it.get()
    }
    executor.shutdown()
    return results
}

并且可以这样使用:

waitAll(listOf(callable1, callable2))

或者使用普通线程并将它们连接起来?

threads.forEach{
   it.start()
}
threads.forEach{
   it.join()
}

或者为什么不使用流(streams)呢?

listOf(callable1,callable2)
.parallelStream()
.map{it.call()}
.collect(Collectors.toList())

1
你的意思是:_Rx 感觉有点混乱,只是添加了一堆样板代码(如果我想返回空值,它也会引起复杂性,并且有时可能会弄乱堆栈跟踪)_? - bubbles
1
求求你,不要走线程路线。你需要有一个合适的任务抽象,而线程很快就会变得非常混乱。如果你没有遇到任何限制,流路线看起来更有前途。如果事情变得更加复杂,我可能会用协程来替换 RxKotlin:它具有与 RxKotlin 相同的范围,并且不仅限于 UI,正如你在其他地方所写的那样,但它感觉更符合惯例。 - Arvid Heise
2个回答

5
Java Executor服务使用线程,RxKotlin使用ExecutorServices,因此这些在后台都是相同的。区别在于软件架构。因此,如果你选择最好的架构与你的代码集成,它将发挥最佳作用并正确完成工作。简单就是最好的。
如果您拥有基于事件或可观察模式的架构,并且正在尝试实现一个新库以处理基于事件的操作或作业,则可能会编写有关作业分离或定时的错误步骤。使用RxKotlin,不要重新发明轮子。
如果你的工作与事件或可观察者模式无关,只需要执行并行作业,那么就使用Executor服务。使用RxKotlin就过度设计了。当你在这种情况下使用RxKotlin时,你需要做更多的事情,而不是你需要的。
因此,在这种情况下,我认为问题不是速度,而是架构。

4

KotlinRx本身使用执行器,但它有两个预创建的线程池:Schedulers.io()(无界)和Schedulers.computation()(受核心数限制),并且不像您建议的代码那样每次都会启动一个新线程池。当然,您也可以自己手动创建:

private val executor = Executors.newCachedThreadPool() // equivalent to io()

fun <T> waitAll(tasks: List<Callable<T>>): List<T> {
    return executor.invokeAll(tasks).map {
        it.get()
    }
}

通常情况下,相较于为每个任务创建一个线程,使用协程可以更好地重用现有线程,但这取决于您的使用情况。

在我看来,协程更适合处理后台/UI线程通信(例如Android等)

协程是否有用非常取决于任务中包含的内容。阻塞式API(例如JDBC)?它们是不适合的。异步API(例如Retrofit)?它们是适合的。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接