如何从RxJava flatMap中调用协程用例

7

你好,我有一个 rxjava 的 flatMap,其中我想调用一个协程 use case onStandUseCase,这是一个 api 调用。

最初,该 use case 也是基于 rxjava 的,它返回了 Observable<GenericResponse>,并且工作正常。 现在我改变了使用方式,变为基于协程的,它只返回GenericResponse

请问如何修改 flatMap 以使其与协程 use case 正常工作?

        subscriptions += view.startFuellingObservable
        .onBackpressureLatest()
        .doOnNext { view.showLoader(false) }
        .flatMap {
            if (!hasOpenInopIncidents()) {
                //THIS IS WHERE THE ERROR IS IT RETURNS GENERICRESPONSE
                onStandUseCase(OnStandUseCase.Params("1", "2", TimestampedAction("1", "2", DateTime.now()))) {
                    
                }
               
            } else {
                val incidentOpenResponse = GenericResponse(false)
                incidentOpenResponse.error = OPEN_INCIDENTS
                Observable.just(incidentOpenResponse)
            }
        }
        .subscribe(
            { handleStartFuellingClicked(view, it) },
            { onStartFuellingError(view) }
        )

OnStandUseCase.kt

class OnStandUseCase @Inject constructor(
    private val orderRepository: OrderRepository,
    private val serviceOrderTypeProvider: ServiceOrderTypeProvider
) : UseCaseCoroutine<GenericResponse, OnStandUseCase.Params>() {

    override suspend fun run(params: Params) = orderRepository.notifyOnStand(
        serviceOrderTypeProvider.apiPathFor(params.serviceType),
        params.id,
        params.action
    )

    data class Params(val serviceType: String, val id: String, val action: TimestampedAction)
}

使用案例协程

abstract class UseCaseCoroutine<out Type, in Params> where Type : Any {

    abstract suspend fun run(params: Params): Type

    operator fun invoke(params: Params, onResult: (type: Type) -> Unit = {}) {
        val job = GlobalScope.async(Dispatchers.IO) { run(params) }
        GlobalScope.launch(Dispatchers.Main) { onResult(job.await()) }
    }
}

startFuellingObservable 是


val startFuellingObservable: Observable<Void>

这里是错误的图片

输入图像描述这里

请问有什么建议可以修复这个问题呢?

提前感谢 R


使用集成库,然后编写rxSingle { ... }.toObservable()。由于您不需要观察者,因此可以使用concatMapSingle代替flatMap - George Leung
你好 @GeorgeLeung,startFuellingObservable 仍然是一个 observable,我不想改变那一面。 - BRDroid
Observable#concatMapSingle 返回一个 Observable。当我说“不需要 observables”时,我的意思是挂起函数对应的是一个 Single 而不是一个 Observable - George Leung
请问您能否建议一下我需要如何修改我的代码呢?我对此还比较陌生。 - BRDroid
1个回答

6

这是一个链接RxJava和Kotlin协程的集成库

rxSingle可以用来将挂起函数转换为Single。 OP希望得到一个Observable,因此我们可以调用toObservable()进行转换。

.flatMap {
    if (!hasOpenInopIncidents()) {
        rxSingle {
            callYourSuspendFunction()
        }.toObservable()
    } else {
        val incidentOpenResponse = GenericResponse(false)
        incidentOpenResponse.error = OPEN_INCIDENTS
        Observable.just(incidentOpenResponse)
    }
}

注意,两个分支中的Observable仅包含一个元素。我们可以使用Observable#concatMapSingle使这一事实更加明显。
.concatMapSingle {
    if (!hasOpenInopIncidents()) {
        rxSingle { callYourSuspendFunction() }
    } else {
        val incidentOpenResponse = GenericResponse(false)
        incidentOpenResponse.error = OPEN_INCIDENTS
        Single.just(incidentOpenResponse)
    }
}

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接