如何正确停止rxjava Flowable?

4

I have the following code structure

Service

public Flowable entryFlow()
{
    return Flowable.fromIterable(this::getEntries)
}

消费者

void start()
{
    disposable = service
        .entryFlow()
        .observeOn(Schedulers.computation())
        .subscribeOn(Schedulers.computation())
        .subscribe(
            entry -> ...,
            this::onError,
            this::subscriptionFinished);
}

void stop()
{
    disposable.dispose();
}

private void onError(Throwable e)
{
    subscriptionFinished();
}

private void subscriptionFinished()
{
    //
}

当调用stop方法时,我需要一种停止flowable获取和发射数据的方法。

通过以下操作,我注意到doOnCancel lambda并不总是被调用。

void start()
{
    disposable = service
        .entryFlow()
        .observeOn(Schedulers.computation())
        .subscribeOn(Schedulers.computation())
        .doOnCancel(this::snapshotFinished)
        .subscribe(
            entry -> ...,
            this::onError,
            this::subscriptionFinished);
}

void stop()
{
    disposable.dispose();
}

替代方案可以是:
volatile stopped;

void start()
{
    disposable = service
        .entryFlow()
        .observeOn(Schedulers.computation())
        .subscribeOn(Schedulers.computation())
        .takeUntil(x -> stopped)
        .subscribe(
            entry -> ...,
            this::onError,
            this::subscriptionFinished);
}

void stop()
{
    stopped = true;
}

什么样的开始和结束实现方式被推荐,以便让Flowable停止发出事件并调用onComplete或类似方法(例如doOnCancel操作)?
稍后编辑:
为了让我的用户案例更简短,
只需要调用disposable.dispose来停止从iterable获取数据并将数据发送到source吗?我只有一个订阅者,并且需要在Flowable结束时无论原因如何都调用onComplete/onError/其他回调。
其他回调指的是doOnCancel/doFinally等之一。
谢谢

doOnCancel 不被调用的唯一原因是流已经在那个时候完成了。为什么你只需要对取消做出反应?你可以考虑使用 doFinally 来对终止和取消做出反应。 - akarnokd
doFinally听起来很有趣,但从javadoc中我理解它将会在onComplete / orError之外被调用。我不需要真正知道流结束的原因(完成,错误,取消),但我希望最终动作只执行一次。 - user4132657
在后续编辑部分中添加了更紧凑的问题描述。 - user4132657
1个回答

2
我建议使用dispose()方法。然后只需添加doOnDispose以触发您的副作用代码。

有更好的方法吗? - Omer
2
@Appyx,Flowable没有doOnDipose()方法。 - Hassan Alizadeh

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接