如何停止和恢复Observable.interval的发射操作?

43

这将每5秒发出一个“tick”信号。

Observable.interval(5, TimeUnit.SECONDS, Schedulers.io())
            .subscribe(tick -> Log.d(TAG, "tick = "+tick));

要阻止它,您可以使用

Schedulers.shutdown();

但后来所有的调度器都停止了,而且无法稍后恢复。我该如何“优雅地”停止和恢复发射?


我认为最好的答案应该是这个:https://dev59.com/oZTfa4cB1Zd3GeqPUrYJ?answertab=votes#tab-top,其中使用了Scan(),因此不需要外部累加值(如最佳答案中所建议的)。 - Pedro Lopes
7个回答

44

这里是一个可能的解决方案:

class TickHandler {

    private AtomicLong lastTick = new AtomicLong(0L);
    private Subscription subscription;

    void resume() {
        System.out.println("resumed");
        subscription = Observable.interval(5, TimeUnit.SECONDS, Schedulers.io())
                                 .map(tick -> lastTick.getAndIncrement())
                                 .subscribe(tick -> System.out.println("tick = " + tick));
    }

    void stop() {
        if (subscription != null && !subscription.isUnsubscribed()) {
            System.out.println("stopped");
            subscription.unsubscribe();
        }
    }
}

如果我正确理解您的问题,Observable.interval 是一个冷的可观察对象,因此取消订阅它会停止其发射。同时,您可能可以依赖于 Subscription 的实现(或 RxJava 2 中的 Disposable)在 unsubscribe() (dispose()) 上也丢弃对可观察对象的引用。或者,您可以将 subscription 引用设置为 null,并使其与其中存储的可观察对象一起成为 GC-eligible。 - AndroidEx
1
这里有一个好奇的问题。将lastTick声明为原子变量很重要吗?我认为所有的volatile和原子操作都是由Rx直接处理的。 - Dan Chaltiel
2
一般来说,当多个线程写入一个依赖于字段先前值的字段值时,volatile 提供的保证不够强,需要通过可用的 Java 并发 API 进行同步或安排访问。更多关于 volatile 的信息请参见 http://tutorials.jenkov.com/java-concurrency/volatile.html#when-is-volatile-enough。 - AndroidEx
实际上,RxJava 中并没有暂停的功能,只需取消订阅,然后再重新订阅即可。 - thecr0w

30

前段时间,我也在寻找类似 RX “计时器” 解决方案,但是没有一个能够满足我的期望。因此,我想和大家分享我的解决方案:

AtomicLong elapsedTime = new AtomicLong();
AtomicBoolean resumed = new AtomicBoolean();
AtomicBoolean stopped = new AtomicBoolean();

public Flowable<Long> startTimer() { //Create and starts timper
    resumed.set(true);
    stopped.set(false);
    return Flowable.interval(1, TimeUnit.SECONDS)
            .takeWhile(tick -> !stopped.get())
            .filter(tick -> resumed.get())
            .map(tick -> elapsedTime.addAndGet(1000));
}

public void pauseTimer() {
    resumed.set(false);
}

public void resumeTimer() {
    resumed.set(true);
}

public void stopTimer() {
    stopped.set(true);
}

public void addToTimer(int seconds) {
    elapsedTime.addAndGet(seconds * 1000);
}

3
我无法相信没有人给这个答案点赞。被接受的答案在每次暂停后创建一个新的Observable,而且不适应lambda中的最终字段(您必须使用类字段)。这是一种更好的解决方案。 - Myles Bennett
2
这是唯一一个可以在生产代码中使用的答案。 - Andrii Kovalchuk
1
我很好奇为什么选择了Flowable而不是Observable。这需要订阅用户明确请求值,对吗? - Robert Lewis
@RobertLewis 不需要请求值。Flowable 默认支持背压,这就是原因。 - Artur Szymański
1
我有点困惑。我的理解是,Flowable在调用request(n)之前不会发出任何东西。为什么不使用Observable呢?请解释一下。 - Robert Lewis

11
val switch = new java.util.concurrent.atomic.AtomicBoolean(true)
val tick = new java.util.concurrent.atomic.AtomicLong(0L)

val suspendableObservable = 
  Observable.
    interval(5 seconds).
    takeWhile(_ => switch.get()).
    repeat.
    map(_ => tick.incrementAndGet())

您可以将switch设置为false来暂停计时,将其设置为true以恢复计时。


这个重复有钥匙。谢谢。 - VictorPurMar

2
抱歉这里使用的是RxJS而不是RxJava,但概念相同。这段内容源自learn-rxjs.io,在codepen上也有演示。
其思路是有两个点击事件流,其中一个是startClick$,另一个是stopClick$。在stopClick$上每次点击都会映射到一个空的observable流,而在startClick$上的每个点击都会映射到interval$流。然后,将这两个结果流合并成一个嵌套的observable流。换言之,每次点击时,merge都会从这两种类型中选择一种新的observable流进行发射。所得到的observable流经过switchMap后,开始监听最新的observable流,并停止监听先前的observable流。而switchmap还会将新的observable流与其现有流中的值合并。
经过switch后,scan只会看到由interval$发出的“increment”值,当单击“停止”时不会看到任何值。
在第一次单击之前,startWith将从$interval开始发出值,以推动事情的发展:
const start = 0;
const increment = 1;
const delay = 1000;
const stopButton = document.getElementById('stop');
const startButton = document.getElementById('start');
const startClick$ = Rx.Observable.fromEvent(startButton, 'click');
const stopClick$ = Rx.Observable.fromEvent(stopButton, 'click');
const interval$ = Rx.Observable.interval(delay).mapTo(increment);
const setCounter = newValue => document.getElementById("counter").innerHTML = newValue;
setCounter(start);

const timer$ = Rx.Observable

    // a "stop" click will emit an empty observable,
    // and a "start" click will emit the interval$ observable.  
    // These two streams are merged into one observable.
    .merge(stopClick$.mapTo(Rx.Observable.empty()), 
           startClick$.mapTo(interval$))

    // until the first click occurs, merge will emit nothing, so 
    // use the interval$ to start the counter in the meantime
    .startWith(interval$)

    // whenever a new observable starts, stop listening to the previous
    // one and start emitting values from the new one
    .switchMap(val => val)

    // add the increment emitted by the interval$ stream to the accumulator
    .scan((acc, curr) => curr + acc, start)

    // start the observable and send results to the DIV
    .subscribe((x) => setCounter(x));

这里是HTML代码

<html>
<body>
  <div id="counter"></div>
  <button id="start">
    Start
  </button>
  <button id="stop">
    Stop
  </button>
</body>
</html>

1
这是另一种方法,我认为。
当您检查源代码时,您会发现interval()使用类OnSubscribeTimerPeriodically。以下是关键代码。
@Override
public void call(final Subscriber<? super Long> child) {
    final Worker worker = scheduler.createWorker();
    child.add(worker);
    worker.schedulePeriodically(new Action0() {
        long counter;
        @Override
        public void call() {
            try {
                child.onNext(counter++);
            } catch (Throwable e) {
                try {
                    worker.unsubscribe();
                } finally {
                    Exceptions.throwOrReport(e, child);
                }
            }
        }

    }, initialDelay, period, unit);
}

因此,如果你想取消循环,那么在onNext()中抛出一个新的异常如何?以下是示例代码。

Observable.interval(1000, TimeUnit.MILLISECONDS)
            .subscribe(new Action1<Long>() {
                @Override
                public void call(Long aLong) {
                    Log.i("abc", "onNext");
                    if (aLong == 5) throw new NullPointerException();
                }
            }, new Action1<Throwable>() {
                @Override
                public void call(Throwable throwable) {
                    Log.i("abc", "onError");
                }
            }, new Action0() {
                @Override
                public void call() {
                    Log.i("abc", "onCompleted");
                }
            });

然后你会看到这个:
08-08 11:10:46.008 28146-28181/net.bingyan.test I/abc: onNext
08-08 11:10:47.008 28146-28181/net.bingyan.test I/abc: onNext
08-08 11:10:48.008 28146-28181/net.bingyan.test I/abc: onNext
08-08 11:10:49.008 28146-28181/net.bingyan.test I/abc: onNext
08-08 11:10:50.008 28146-28181/net.bingyan.test I/abc: onNext
08-08 11:10:51.008 28146-28181/net.bingyan.test I/abc: onNext
08-08 11:10:51.018 28146-28181/net.bingyan.test I/abc: onError             

4
异常应该仅用于处理异常事件,而不是实现逻辑!这会创建不必要的对象,计算负担很重,并且语法上容易混淆。 - Luca S.

1
你可以使用 takeWhile 并循环直到条件为真。
Observable.interval(1, TimeUnit.SECONDS)
        .takeWhile {
            Log.i(TAG, " time " + it)
            it != 30L
        }
        .subscribe(object : Observer<Long> {
            override fun onComplete() {
                Log.i(TAG, "onComplete " + format.format(System.currentTimeMillis()))
            }

            override fun onSubscribe(d: Disposable) {
                Log.i(TAG, "onSubscribe " + format.format(System.currentTimeMillis()))
            }

            override fun onNext(t: Long) {
                Log.i(TAG, "onNext " + format.format(System.currentTimeMillis()))
            }

            override fun onError(e: Throwable) {
                Log.i(TAG, "onError")
                e.printStackTrace()
            }

        });

0

@AndroidEx,这是一个很棒的答案。我用了一种稍微不同的方法:

private fun disposeTask() {
    if (disposeable != null && !disposeable.isDisposed)
      disposeable.dispose()
  }

 private fun runTask() {
    disposeable = Observable.interval(0, 30, TimeUnit.SECONDS)
.flatMap {
        apiCall.runTaskFromServer()
.map{

when(it){
is ResponseClass.Success ->{
keepRunningsaidTasks()
}
is ResponseClass.Failure ->{
disposeTask() //this will stop the task in instance of a network failure.
}
}

}

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接