RxJava2的Flowable在订阅者中没有调用onNext方法

3

我是rxjava的新手,遇到了Flowable的问题。当flatMap方法中调用onComplete方法时,订阅者中的onNext方法没有被调用。这里出了什么问题?有什么建议吗?

        ResourceSubscriber<List<Song>> subscriber = new ResourceSubscriber<List<Song>>() {
        @Override
        public void onStart() {
            Log.e(TAG, "onStart");
            mView.showProgress();
        }

        @Override
        public void onNext(List<Song> songs) {
            mView.onLocalMusicLoaded(songs);
            mView.emptyView(songs.isEmpty());
            Log.e(TAG, "onNext");
            //Not call here after onComplete called
        }

        @Override
        public void onError(Throwable t) {
            mView.hideProgress();
            Log.e(TAG, "onError: ", t);
        }

        @Override
        public void onComplete() {
            mView.hideProgress();
            Log.e(TAG, "onComplete");
        }
    };


    Disposable disposable = Flowable.just(cursor)
            .flatMap(new Function<Cursor, Publisher<List<Song>>>() {
                @Override
                public Publisher<List<Song>> apply(@NonNull Cursor cursor) throws Exception {
                    final List<Song> songs = new ArrayList<>();
                    if (cursor != null && cursor.getCount() > 0) {
                        cursor.moveToFirst();
                        do {
                            Song song = cursorToMusic(cursor);
                            songs.add(song);
                        } while (cursor.moveToNext());
                    }
                    return Flowable.create(new FlowableOnSubscribe<List<Song>>() {
                        @Override
                        public void subscribe(@NonNull FlowableEmitter<List<Song>> e) throws Exception {
                            e.onNext(songs);
                            e.onComplete();
                        }
                    }, BackpressureStrategy.LATEST);
                }
            })
            .doOnNext(new Consumer<List<Song>>() {
                @Override
                public void accept(@NonNull List<Song> songs) throws Exception {
                    Log.e(TAG, "onLoadFinished doOnNext: " + songs.size());
                    Collections.sort(songs, new Comparator<Song>() {
                        @Override
                        public int compare(Song left, Song right) {
                            return left.getDisplayName().compareTo(right.getDisplayName());
                        }
                    });
                }
            })
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribeWith(subscriber);

    mCompositeDisposable.add(disposable);
1个回答

2
ResourceSubscriber的Javadoc中得知:
默认情况下,onStart()请求Long.MAX_VALUE。为了请求自定义的正数,请覆盖该方法。使用受保护的request(long)来请求更多的项,使用dispose()onNext实现中取消序列。
    @Override
    public void onStart() {
        Log.e(TAG, "onStart");
        mView.showProgress();
        request(Long.MAX_VALUE);
    }

你成功找到了Flowable.just操作符,为什么要通过Flowable.create复制它来发出标量sogs列表?

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接