RxJava中的Observable转换为Completable,如何避免使用toBlocking()方法

4

我目前在使用Kotlin开发Android应用,使用RxJava技术。但是我遇到了一个问题,如果不使用toBlocking()方法就无法解决。

我在员工服务中有一个返回Observable>类型的方法:

fun all(): Observable<List<Employee>>

这很好,因为这个Observable在每次员工更改时都会发出新的员工列表。但是我想从员工生成PDF文件,显然不需要每次员工更改时都运行。此外,我想从我的PDF生成方法返回一个Completable对象。我想在我的PDF中添加一个标题,然后迭代员工并计算每个员工的工资,这也返回一个Observable,这就是我现在正在使用toBlocking的地方。我的当前方法是这样的:
private fun generatePdf(outputStream: OutputStream): Completable {
    return employeeService.all().map { employees ->
        try {
                addHeaderToPDF()
                for (i in employees) {
                    val calculated = employeeService.calculateWage(i.id).toBlocking().first()
                    // Print calculated to PDF....
                }
                addFooterToPDF()
                return @map Completable.complete()
            }
            catch (e: Exception) {
                return @map Completable.error(e)
            }
        }.first().toCompletable()

有没有使用RxJava让这段代码更加简洁的方法?
提前感谢!

不要使用for循环,也不要使用forEach。基本上所有的东西都必须是可观察的,包括addHeaderToPDFaddFooterToPDFcalculateWage应该返回一个Single,它要么返回一个值,要么出现错误。 - Eugen Pechanec
calculateWage 是一个 Observable,因为它会在员工数据发生变化时发出重新计算后的工资值。你能否给我展示一个正确的示例?我无法理清这个难题。 :( - szantogab
明白了,一开始就是一个可观察对象。我正在设计一个简洁的方案,请保持关注... - Eugen Pechanec
1
阅读此文:http://konmik.com/post/when_to_not_use_rxjava/ - Eugen Pechanec
2个回答

3

免责声明:本答案仍在完善中。


基本前提:如果在流中存在“阻塞”,那么您的操作是错误的。

注意:没有状态可以离开可观察的lambda表达式。

步骤1:将整个数据集作为流

输入是员工的流。对于每个员工,您需要获取一个工资。让我们将其组成一个流。

/**
 * @param employeesObservable
 * Stream of employees we're interested in.
 * @param wageProvider
 * Transformation function which takes an employee and returns a [Single] of their wage.
 * @return
 * Observable stream spitting individual [Pair]s of employees and their wages.
 */
fun getEmployeesAndWagesObservable(
        employeesObservable: Observable<Employee>,
        wageProvider: Function<Employee, Single<Int>>
): Observable<Pair<Employee, Int>>? {
    val employeesAndWagesObservable: Observable<Pair<Employee, Int>>

    // Each Employee from the original stream will be converted
    // to a Single<Pair<Employee, Int>> via flatMapSingle operator.
    // Remember, we need a stream and Single is a stream.
    employeesAndWagesObservable = employeesObservable.flatMapSingle { employee ->
        // We need to get a source of wage value for current employee.
        // That source emits a single Int or errors.
        val wageForEmployeeSingle: Single<Int> = wageProvider.apply(employee)

        // Once the wage from said source is loaded...
        val employeeAndWageSingle: Single<Pair<Employee, Int> = wageForEmployeeSingle.map { wage ->
            // ... construct a Pair<Employee, Int>
            employee to wage
        }

        // This code is not executed now. It will be executed for each Employee
        // after the original Observable<Employee> starts spitting out items.
        // After subscribing to the resulting observable.
        return@flatMapSingle employeeAndWageSingle
    }

    return employeesAndWagesObservable
}

订阅后会发生什么:

  1. 从源中获取一个员工。
  2. 获取员工的工资。
  3. 输出一对员工和他们的工资。

这将重复进行,直到 employeesObservable 发出 onComplete 或出现错误时发出 onError 信号。

使用的操作符:

  • flatMapSingle:将实际值转换为某些转换值的新 Single 流。
  • map:将实际值转换为其他实际值(无嵌套流)。

以下是如何将其连接到您的代码:

fun doStuff() {
    val employeesObservable = employeeService.all()
    val wageProvider = Function<Employee, Single<Int>> { employee ->
        // Don't listen to changes. Take first wage and use that.
        employeeService.calculateWage(employee.id).firstOrError()
    }

    val employeesAndWagesObservable = 
            getEmployeesAndWagesObservable(employeesObservable, wageProvider)

    // Subscribe...
}

使用的运算符:

  • first:从可观察对象中获取第一个项目并将其转换为单个流。
  • timeout:如果您通过网络获取工资,则最好 .timeout 它。

下一步

选项1:结束

不要订阅,调用

val blockingIterable = employeesAndWagesObservable.blockingIterable()
blockingIterable.forEach { ... }

同步处理每个项目。坐下来,想想接下来的步骤,看演示文稿,读例子。

选项2:添加图层

  1. .map每个Pair<Employee, Int>到一些抽象的PDF建筑块。
  2. 通过Observable.fromCallable { ... }将头部和底部打印机转换为Observables,它们也返回PDF构建块。
  3. 通过Observable.concat(headerObs, employeeDataObs, footerObs)以顺序方式合并所有这些内容。
  4. .subscribe此结果并开始将PDF构建块写入PDF编写器。
  5. 待办事项:
    • 找出一种在订阅时懒惰地初始化PDF编写器的方法(不是在构建流之前),
    • 在出现错误时删除输出,
    • 在完成或出现错误时关闭输出流。

不错,但在这种情况下使用可观察对象你能获得什么收益呢? - kar
1
@KarsenGauss 输入是一个可观察对象和每个员工的另一个可观察对象。在经过一周的rx之后,我尽可能地使用rx方式。这就是目标:不混合阻塞和非阻塞代码,尽可能长时间地使用流。 正如提到的另一步,将更多的阻塞代码转换为rx。 - Eugen Pechanec
1
你的回答很好,别误会。我只是想强调有时候退一步考虑非响应式方法可能是值得的。我认为对于@szantogab的问题,响应式解决方案并没有太多好处。 - kar
1
@KarsenGauss 你和我一样 :D 不过有一种方法可以在最后将流式传输到PDF文件中(对于syantogab而言)。并且在此过程中更好地掌握rx(对我而言)。 - Eugen Pechanec

0
我想到了这个:
    return employeeService.all().first()
            .doOnSubscribe { addHeaderToPDF() }
            .flatMapIterable { it }
            .flatMap { employeeService.calculateWage(it.id).first() }
            .doOnNext { printEmployeeWage(it) }
            .doOnCompleted { addFooterToPDF }
            .toCompletable()

这样做是正确的吗?:)


与此同时,我发现这样做行不通,因为在doOnNextdoOnCompleted闭包中,我需要访问在doOnSubscribe闭包中创建的PDF文件。怎么办?:S - szantogab

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接