RxJava中flatmap和switchmap有什么区别?

174

RxJava文档中,switchmap的定义相当模糊,并将其链接到与flatmap相同的页面。这两个操作符之间有什么区别?


1
关于it链接到与flatmap相同的页面。 这是真的。 但是向下滚动到特定语言信息部分并打开有趣的操作符。 我认为这应该自动从TOC完成,但是...您还可以在[javadoc](http://reactivex.io/RxJava/javadoc/rx/Observable.html#switchMap(rx.functions.Func1))中看到相同的图片。 - Ruslan Stelmachenko
8个回答

200
根据文档(http://reactivex.io/documentation/operators/flatmap.html),switchMap类似于flatMap,但它只会从新的可观察对象中发出项目,直到从源可观察对象发出新事件。以下是该操作符的示意图:
switchMap的示意图中,第二个原始发射(绿色大理石)没有发出第二个映射发射(绿色方块),因为第三个原始发射(蓝色大理石)已经开始并发出了其第一个映射发射(蓝色钻石)。换句话说,只有第一个映射的绿色发射发生;没有发出绿色方块,因为蓝色钻石超过了它。
而对于flatMap,所有映射的结果都会被发出,即使它们是“过时”的。换句话说,第一个和第二个映射的绿色发射都会发生,如果它们使用一致的映射函数,则会发出绿色方块;由于它们没有使用一致的映射函数,因此你看到第二个绿色钻石,即使它是在第一个蓝色钻石之后发出的。
其中switchMap操作符示意图如下: 在switchMap中,如果原始可观察对象发出新内容,则先前的映射结果将不再产生;这是一个避免过时结果的有效方法flatMap操作符示意图如下:

在 switchMap 中,如果原始可观测对象发出新的值,则先前的发射将不再产生映射的可观测对象;这是一种有效的避免过时结果的方法


4
谢谢,这张图很有帮助。你知道一个使用switchMap的实际例子吗? - Julian Go
2
@JulianGo 这里有一个例子:https://github.com/samuelgruetter/rx-playground/blob/master/RxScalaPaint/src/main/scala/Paint.scala#L36 它使用了 .map(func).switch,但这与 .switchMap(func) 是相同的。 - Samuel Gruetter
2
如果有人仍然需要一个 switchMap 的真实世界示例,他可以跟随此链接 [link](https://angular.io/docs/ts/latest/guide/router.html#!#routing-module),并了解何时使用 swicthMap 而不是 flatMap。 - hermannovich
2
关于使用RxJs5中的SwitchMap,可以参考Ben Lesh在这里25-26分钟的演示 - https://www.youtube.com/watch?v=3LKMwkuK0ZE。对我来说,FlatMap已经理解了... - arcseldon
14
大理石图表展示得好吗?什么?我猜如果你已经了解switchmap,可能会明白。 - Post Impatica
显示剩余10条评论

183

我在实现“即时搜索”时遇到了这个问题 - 即当用户在文本框中输入时,结果会随着每次按键而几乎即时地出现。解决方法似乎是:

  1. 拥有一个主题,例如PublishSubject of String
  2. 在文本框更改回调中,调用.onNext(text)
  3. 应用.debounce过滤器以限制服务器查询速率
  4. 应用.switchMap执行服务器查询-使用搜索术语并返回SearchResponse的Observable
  5. 使用消耗SearchResponse并更新UI的方法应用.subscribe。

使用flatMap时,搜索结果可能会过期,因为搜索响应可能无序返回。为了解决这个问题,应该使用switchMap,因为它确保一旦提供新的可观察对象,旧的可观察对象就会取消订阅。

因此,总之,当所有结果都很重要且不考虑时间时应使用flatMap,并且只有最后一个Observale的结果很重要时才应使用switchMap。


你可以在 GitHub 上查看此示例 - Cabezas
最佳示例。谢谢。 - RamPrakash

103

没有 flatMap 的讨论是不完整的,需要与 switchMap、concatMap 和 concatMapEager 进行比较和对比。

所有这些方法都接受一个 Func1,将流转换为 Observable,并发出它们;区别在于返回的 Observable 何时被订阅和取消订阅,以及这些 Observable 的发射是否由相关的 ____Map 操作符发出。

  • flatMap 订阅尽可能多的发射的 Observable(这是一个平台相关的数字,例如在 Android 上较低),当顺序不重要且您希望尽快发出时使用。
  • concatMap 订阅第一个 Observable,并仅在前一个完成后才订阅下一个 Observable。当顺序很重要且您想节省资源时使用。一个完美的例子是通过首先检查缓存来延迟网络调用。这通常可以跟随一个 .first().takeFirst() 以避免做不必要的工作。

    http://blog.danlew.net/2015/06/22/loading-data-from-multiple-sources-with-rxjava/

  • concatMapEager 的工作方式与之相似,但会订阅尽可能多的 Observable(取决于平台),但只有在前一个 Observable 完成后才会发出。非常适合需要大量并行处理的情况,但是(与 flatMap 不同)您希望保持原始顺序。

  • switchMap 将订阅它遇到的最后一个 Observable,并取消订阅所有先前的 Observable。这非常适合类似搜索建议的情况:一旦用户更改了搜索查询,旧请求就不再有任何兴趣,因此它被取消订阅,并且良好的 API 端点将取消网络请求。
如果你返回的Observable不在另一个线程上进行subscribeOn,那么上述所有方法的行为可能都会非常相似。有趣且有用的行为出现在允许嵌套的Observable在它们自己的线程上操作时。然后,您可以从并行处理中获得很多好处,并智能地取消订阅或不订阅对您的订阅者不感兴趣的Observable。
amb也可能是感兴趣的。给定任意数量的Observable,它发出第一个发出任何内容的Observable发出的相同项。当您有多个可能/应该返回相同内容的源并且想要性能时,这可能非常有用。例如排序,您可以将快速排序与归并排序amb起来,并使用哪个更快。

3
如果你返回的Observable没有在另一个线程上进行subscribeOn操作,那么所有上述方法的行为可能大致相同。之前我遇到的所有“switchMap vs flatMap”的解释都忽略了这个重要方面,现在一切都更清晰了。谢谢。 - Andy Res

61

switchMap 在 RxJS 4 中曾经被称为 flatMapLatest

它基本上只是从 最新 的 Observable 中传递事件并取消订阅先前的 Observable。


@EpicPandaForce 尽管它与 combineLatest 不一致,但只要源可观察对象发出信号(不仅仅是发出一次),它就会发出最新的值。 - Michael Fry
2
部分原因它被称为switchMap是因为你可以通过使用o.map(...).switch()自己实现这个操作符。虽然我想象中它可能会被称为mapSwitch,但这似乎不太容易发音。 - Niall Connaughton

14

Map(映射)、FlatMap(扁平映射)、ConcatMap(连接映射)SwitchMap(切换映射)都会对Observable发出的数据应用一个函数或修改数据。

  • Map 修改源Observable发出的每个项目,并发出修改后的项目。

  • FlatMap、SwitchMapConcatMap也会在每个发出的项目上应用一个函数,但不同于返回已修改的项目,它们返回可再次发出数据的Observable本身。

  • FlatMapConcatMap 的工作基本相同。它们合并多个Observables发出的项目并返回单个Observable。

  • FlatMapConcatMap 的区别在于发出项目的顺序。
  • FlatMap 可以交错发出项目,即发出项目的顺序是不确定的。
  • ConcatMap 保留项目的顺序。但ConcatMap的主要缺点是必须等待每个Observable完成其工作,因此不能维护异步性。
  • SwitchMapFlatMapConcatMap有所不同。SwitchMap会在新项开始发出时取消订阅先前的源Observable,因此始终发出来自当前Observable的项目。

2

这里有一个长达101行的例子,它为我解释了这个事情。

就像所说的那样:它获取最后一个observable(如果你愿意的话,就是最慢的observable),并忽略其余的observable。

因此:

Time | scheduler | state
----------------------------
0    | main      | Starting
84   | main      | Created
103  | main      | Subscribed
118  | Sched-C-0 | Going to emmit: A
119  | Sched-C-1 | Going to emmit: B
119  | Sched-C-0 | Sleep for 1 seconds for A
119  | Sched-C-1 | Sleep for 2 seconds for B
1123 | Sched-C-0 | Emitted (A) in 1000 milliseconds
2122 | Sched-C-1 | Emitted (B) in 2000 milliseconds
2128 | Sched-C-1 | Got B processed
2128 | Sched-C-1 | Completed

你会发现 A 被忽略了。


0

代码示例

import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.schedulers.Schedulers;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.jetbrains.annotations.NotNull;
import org.junit.jupiter.api.Test;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;

import static org.junit.jupiter.api.Assertions.assertTrue;

public class SwitchMapTest {
    Logger logger = LogManager.getLogger();

    @Test
    public void main() throws InterruptedException {
        log("main thread");
        CountDownLatch latch = new CountDownLatch(1);
        var disposable = Observable
                .create(emitter -> {
                    IntStream.range(0, 4)
                            .peek(i -> {
                                log("sleep emit");
                                sleep(TimeUnit.SECONDS, 1);
                            })
                            .forEach(emitter::onNext);
                    emitter.onComplete();
                })
                .subscribeOn(Schedulers.io())
                .switchMap(o ->
                        Observable.create(emitter -> {
                                    IntStream.range(0, 2).forEach(value -> {
                                        log("sleep switch");
                                        sleep(TimeUnit.MILLISECONDS, 900);
                                        emitter.onNext("original " + o + " | switchMap " + value);
                                    });
                                    emitter.onComplete();
                                })
                                .subscribeOn(Schedulers.from(Executors.newSingleThreadExecutor(r -> {
                                    Thread thread = new Thread(r);
                                    thread.setDaemon(true);
                                    return thread;
                                })))
                )
                .observeOn(Schedulers.newThread())
                .subscribe(this::log, throwable -> logger.throwing(throwable), () -> {
                    log("complete");
                    latch.countDown();
                });
        boolean await = latch.await(10, TimeUnit.SECONDS);
        assertTrue(await);
        disposable.dispose();
    }

    private void sleep(@NotNull TimeUnit timeUnit, int timeout) {
        try {
            timeUnit.sleep(timeout);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    void log(Object message) {
        logger.debug(message);
    }
}

log4j2.xml

<Configuration status="WARN">
    <Appenders>
        <Console name="Console" target="SYSTEM_OUT">
            <PatternLayout pattern="%-8r %d{HH:mm:ss.SSS} [%-32t] %-5level %logger{36} - %msg%n"/>
        </Console>
    </Appenders>
    <Loggers>
        <Root level="ALL">
            <AppenderRef ref="Console"/>
        </Root>
    </Loggers>
</Configuration>

控制台

720      21:44:46.566 [Test worker                     ] DEBUG SwitchMapTest - main thread
787      21:44:46.633 [RxCachedThreadScheduler-1       ] DEBUG SwitchMapTest - sleep emit
1789     21:44:47.635 [RxCachedThreadScheduler-1       ] DEBUG SwitchMapTest - sleep emit
1790     21:44:47.636 [Thread-3                        ] DEBUG SwitchMapTest - sleep switch
2695     21:44:48.541 [Thread-3                        ] DEBUG SwitchMapTest - sleep switch
2695     21:44:48.541 [RxNewThreadScheduler-1          ] DEBUG SwitchMapTest - original 0 | switchMap 0
2792     21:44:48.638 [RxCachedThreadScheduler-1       ] DEBUG SwitchMapTest - sleep emit
2792     21:44:48.638 [Thread-4                        ] DEBUG SwitchMapTest - sleep switch
3693     21:44:49.539 [Thread-4                        ] DEBUG SwitchMapTest - sleep switch
3693     21:44:49.539 [RxNewThreadScheduler-1          ] DEBUG SwitchMapTest - original 1 | switchMap 0
3796     21:44:49.642 [RxCachedThreadScheduler-1       ] DEBUG SwitchMapTest - sleep emit
3797     21:44:49.643 [Thread-5                        ] DEBUG SwitchMapTest - sleep switch
4699     21:44:50.545 [Thread-5                        ] DEBUG SwitchMapTest - sleep switch
4699     21:44:50.545 [RxNewThreadScheduler-1          ] DEBUG SwitchMapTest - original 2 | switchMap 0
4802     21:44:50.648 [Thread-6                        ] DEBUG SwitchMapTest - sleep switch
5706     21:44:51.552 [Thread-6                        ] DEBUG SwitchMapTest - sleep switch
5706     21:44:51.552 [RxNewThreadScheduler-1          ] DEBUG SwitchMapTest - original 3 | switchMap 0
6612     21:44:52.458 [RxNewThreadScheduler-1          ] DEBUG SwitchMapTest - original 3 | switchMap 1
6612     21:44:52.458 [RxNewThreadScheduler-1          ] DEBUG SwitchMapTest - complete

0

如果你正在寻找示例代码

/**
 * We switch from original item to a new observable just using switchMap.
 * It´s a way to replace the Observable instead just the item as map does
 * Emitted:Person{name='Pablo', age=0, sex='no_sex'}
 */
@Test
public void testSwitchMap() {
    Observable.just(new Person("Pablo", 34, "male"))
              .switchMap(person -> Observable.just(new Person("Pablo", 0, "no_sex")))
              .subscribe(System.out::println);

}

您可以在这里查看更多示例 https://github.com/politrons/reactive


5
但是你错过了 switchMap 的一个关键特点,它与 flatMap 不同之处在于,只有最近的 Observable 会被考虑,同时会取消订阅之前的 Observable。 - Artem Novikov
4
在这个例子中,当你把 switchMap 替换成 flatMap 时,它会完全相同地工作。 - Piotr Wittchen

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接