你好,我有一个 rxjava 的 flatMap,其中我想调用一个协程 use case onStandUseCase,这是一个 api 调用。
最初,该 use case 也是基于 rxjava 的,它返回了 Observable<GenericResponse>
,并且工作正常。
现在我改变了使用方式,变为基于协程的,它只返回GenericResponse
请问如何修改 flatMap 以使其与协程 use case 正常工作?
subscriptions += view.startFuellingObservable
.onBackpressureLatest()
.doOnNext { view.showLoader(false) }
.flatMap {
if (!hasOpenInopIncidents()) {
//THIS IS WHERE THE ERROR IS IT RETURNS GENERICRESPONSE
onStandUseCase(OnStandUseCase.Params("1", "2", TimestampedAction("1", "2", DateTime.now()))) {
}
} else {
val incidentOpenResponse = GenericResponse(false)
incidentOpenResponse.error = OPEN_INCIDENTS
Observable.just(incidentOpenResponse)
}
}
.subscribe(
{ handleStartFuellingClicked(view, it) },
{ onStartFuellingError(view) }
)
OnStandUseCase.kt
class OnStandUseCase @Inject constructor(
private val orderRepository: OrderRepository,
private val serviceOrderTypeProvider: ServiceOrderTypeProvider
) : UseCaseCoroutine<GenericResponse, OnStandUseCase.Params>() {
override suspend fun run(params: Params) = orderRepository.notifyOnStand(
serviceOrderTypeProvider.apiPathFor(params.serviceType),
params.id,
params.action
)
data class Params(val serviceType: String, val id: String, val action: TimestampedAction)
}
使用案例协程
abstract class UseCaseCoroutine<out Type, in Params> where Type : Any {
abstract suspend fun run(params: Params): Type
operator fun invoke(params: Params, onResult: (type: Type) -> Unit = {}) {
val job = GlobalScope.async(Dispatchers.IO) { run(params) }
GlobalScope.launch(Dispatchers.Main) { onResult(job.await()) }
}
}
startFuellingObservable 是
val startFuellingObservable: Observable<Void>
这里是错误的图片
请问有什么建议可以修复这个问题呢?
提前感谢 R
rxSingle { ... }.toObservable()
。由于您不需要观察者,因此可以使用concatMapSingle
代替flatMap
。 - George LeungObservable#concatMapSingle
返回一个Observable
。当我说“不需要 observables”时,我的意思是挂起函数对应的是一个Single
而不是一个Observable
。 - George Leung