如何通过Runnable创建Observable?

3
有时,我想在我的Observable序列中触发Runnable,但是Runnable不报告进度。 我编写了一个简单的工厂将Runnable对象包装成Observable:
public static <T> Observable<T> fromRunnable(final Runnable action) {
    if (action == null) {
        throw new NullPointerException("action");
    }
    return Observable.fromPublisher(subscriber -> {
        try {
            action.run();
            subscriber.onComplete();
        } catch (final Throwable throwable) {
            subscriber.onError(throwable);
        }
    });
}

使用方法:

Observable.concat(
    someTask, 
    MoreObservables.fromRunnable(() -> {
        System.out.println("Done. ");
    }));

但是RxJava 2已经提供了这个功能吗?
1个回答

6

没有Observable的工厂方法,但是可以从Runnable创建Completable。所以你可以先创建一个Completable,然后将其转换为Observable

Observable.concat(
    someTask, 
    Completable.fromRunnable(() -> {
        System.out.println("Done");
    }).toObservable()
);

更新: 处理异常

Completable.fromRunnable 内部会捕获其 Runnable 中的异常,并将其作为 onError 发射到流中。然而,如果您正在使用 Java,则必须自己处理 run() 方法中的受检异常。为避免这种情况,您可以使用 Callable 来替代 Runnable,因为它的 call() 方法声明可以抛出异常。Completable.fromCallable() 同样将异常封装为 onError 发射:

Observable.concat(
    someTask, 
    Completable.fromCallable(() -> {
        System.out.println("Done");
        return null;
    }).toObservable()
);

此外,Callable 可以用来创建只发出单个项目的 ObservableSingle

P.S. 查看源代码,这些方法非常简单明了。

P.P.S. Kotlin 没有受检异常 ;)。


更新 2

还有一个用于创建 CompletablefromAction 工厂方法。它接受 Action 对象。

类似于 Runnable 的函数接口,但允许抛出已检查的异常。

因此,代码可以简化为:

Observable.concat(
    someTask, 
    Completable.fromAction(() -> {
        System.out.println("Done");
    }).toObservable()
);

有没有其他选项可以捕获Runnable可能抛出的任何Throwable - sdgfsdh

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接