RxJava - flatmap和concatMap有什么区别,为什么订阅时顺序相同?

3
根据这个帖子,conCatMap和flatMap的区别仅在于发出项目的顺序不同。 因此,我进行了测试并创建了一个简单的整数流,想要查看它们发出的顺序。 我创建了一个小的observable,它将接受1-5范围内的数字并将它们乘以二。 很容易。
以下是使用flatMap的代码:
myObservable.flatMap(new Func1<Integer, Observable<Integer>>() {
        @Override
        public Observable<Integer> call(Integer integer) {
            return Observable.just(integer * 2);
        }
    }).subscribe(new Observer<Integer>() {
        @Override
        public void onCompleted() {

        }

        @Override
        public void onError(Throwable e) {

        }

        @Override
        public void onNext(Integer integer) {
        Log.v("myapp","from flatMap:"+integer);
        }
    });

使用concatMap的完全相同的代码:

myObservable.concatMap(new Func1<Integer, Observable<Integer>>() {
        @Override
        public Observable<Integer> call(Integer integer) {
            return Observable.just(integer * 2);
        }
    }).subscribe(new Observer<Integer>() {
        @Override
        public void onCompleted() {

        }

        @Override
        public void onError(Throwable e) {

        }

        @Override
        public void onNext(Integer integer) {
        Log.v("myapp","from concatmap:"+integer);
        }
    });

当我在日志中看到打印输出时,它们的顺序是相同的,为什么?我以为只有concatMap会保留顺序?

1个回答

10

你所看到的是一种巧合。每次 flatMap 返回一个值时,它都在与上一个相同的线程上执行。

我修改了你的示例以利用多线程:

Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
        .flatMap(integer -> Observable.just(integer)
                .observeOn(Schedulers.computation())
                .flatMap(i -> {
                    try {
                        Thread.sleep(new Random().nextInt(1000));
                        return Observable.just(2 * i);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                        return Observable.error(e);
                    }
                }))
        .subscribe(System.out::println,
                Throwable::printStackTrace,
                () -> System.out.println("onCompleted"));

我正在通过随机延迟每个2 * i的值,以强制不同的顺序。同时,在此之前,我添加了observeOn(Schedulers.computation()),以使下一个操作符(flatMap)在计算线程池上运行 -- 这就是多线程的魔法。

这是我在Android上的示例输出:

I/System.out: 6
I/System.out: 4
I/System.out: 12
I/System.out: 14
I/System.out: 8
I/System.out: 2
I/System.out: 16
I/System.out: 20
I/System.out: 10
I/System.out: 18
I/System.out: onCompleted

如果我在just后使用concatMap代替flatMap,那么我将得到一个正确排序的输出。

Thomas Nield有一篇很棒的文章对此进行了适当的解释。


请问您能修复一下链接吗?它应该是https://tomstechnicalblog.blogspot.com/2015/11/rxjava-achieving-parallelization.html吗? - Devs love ZenUML

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接