map vs flatMap in reactor

60

我已经找到了很多关于RxJava的答案,但我想了解它在Reactor中是如何工作的。

我的当前理解非常模糊,我倾向于认为map是同步的,而flatMap是异步的,但我真的无法理解它。

这里有一个例子:

files.flatMap { it ->
    Mono.just(Paths.get(UPLOAD_ROOT, it.filename()).toFile())
        .map {destFile ->
            destFile.createNewFile()
            destFile    
        }               
        .flatMap(it::transferTo)
}.then()  

我有一些文件(一个Flux<FilePart>),我想将它复制到服务器上的某个UPLOAD_ROOT

这个例子来自一本书。

我可以将所有的.map更改为.flatMap,反之亦然,一切仍然正常。我想知道它们之间的区别是什么。


我在谈论 Reactor 项目。https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html - shredding
我从文档中了解到,两者都是迭代给定Flux的方式,map是同步的,而flatMap则不是。但我也明白,我在map中提供的函数是异步执行的,我不知道该使用哪一个。 - shredding
1
我会提供一个具体的例子进行更新。 - shredding
你提到找到了很多关于RxJava的答案。请注意,这与RxJava中的概念完全相同。 - Simon Baslé
加一个标签 kotlin 吧?因为你的示例是用这种语言而不是 Java。 - Inego
显示剩余3条评论
3个回答

83
  • map 用于同步、非阻塞的一对一转换
  • flatMap 用于异步(非阻塞)的一对多转换

它们在方法签名上的区别见仁见智:

  • map 接受一个 Function<T, U> 并返回一个 Flux<U>
  • flatMap 接受一个 Function<T, Publisher<V>> 并返回一个 Flux<V>

这是最重要的提示:你可以将 Function<T, Publisher<V>> 传递给 map,但是它不知道如何处理 Publishers,结果会得到 Flux<Publisher<V>>,即一系列惰性的 publishers。

另一方面,flatMap 需要每个 TPublisher<V>。它知道该怎么做:订阅并将其元素传播到输出序列中,因此返回类型是 Flux<V>:flatMap 将每个内部的 Publisher<V> 扁平化,输出序列中包含所有的 V

关于 1-N 方面:

对于每个 <T> 输入元素,flatMap 将其映射到一个 Publisher<V>。在某些情况下(例如 HTTP 请求),该 publisher 只会发出一个项目,在这种情况下,我们就接近异步 map

但这是退化的情况。一般情况下,Publisher 可以发出多个元素,而 flatMap 同样适用。

例如,假设你有一个响应式数据库,并且从一系列用户 ID 中进行 flatMap,使用请求返回用户的 Badge 集合。最终得到一个单个的 Flux<Badge>,其中包含了所有这些用户的徽章。

map 真的是同步和非阻塞的吗?

是的:从运算符应用的方式上看,它是同步的(一个简单的方法调用,然后运算符发出结果),但在函数本身不应该阻塞调用它的运算符的意义上,它是非阻塞的。换句话说,它不应该引入延迟。这是因为作为一个整体,Flux 仍然是异步的。如果它在中途阻塞,将会影响到其他 Flux 的处理,甚至是其他的 Flux

如果您的 map 函数具有阻塞性或引入延迟,但无法转换为返回一个 Publisher,则考虑使用 publishOn/subscribeOn 在单独的线程上进行偏移以抵消该阻塞工作。


你的意思是 map 是“阻塞”的,对吧?我也不太明白 1-to-N。你能举个例子说明一下什么情况下使用其中一个比另一个更好吗?我理解当我期望结果是异步的时候,我会使用 flatMap,因为它会在结果到达后将发布者扁平化 - 这样说对吗? - shredding
2
不,map函数应该是非阻塞的(除非你也使用publishOn/subscribeOn在单独的线程上偏移工作)。也就是说,它是同步执行的,但不应该有延迟。flatMap函数是异步的,确实会在结果可用时展开操作符。 - Simon Baslé
1
编辑了答案,以解释这两个方面和 flatMap 1-N 示例。 - Simon Baslé
1
这个答案是否已经过时了?我认为你所说的flatMap现在应该是“flatMapMany”,而flatMap则有不同的作用--https://github.com/reactor/reactor-core/issues/516 - Hazel T
1
现在Mono有了额外的细微差别,它既有flatMap(异步1对1转换),又有flatMapMany(异步1对n)。 - Simon Baslé

11

flatMap方法与map方法类似,但其提供的supplier应返回Mono<T>Flux<T>

使用map方法将导致Mono<Mono<T>>,而使用flatMap则会得到Mono<T>

例如,在需要调用返回Mono类型值的Java API从网络获取数据,然后进行另一个需要第一个调用结果的网络调用时,使用flatMap非常有用。

// Signature of the HttpClient.get method
Mono<JsonObject> get(String url);

// The two urls to call
String firstUserUrl = "my-api/first-user";
String userDetailsUrl = "my-api/users/details/"; // needs the id at the end

// Example with map
Mono<Mono<JsonObject>> result = HttpClient.get(firstUserUrl).
  map(user -> HttpClient.get(userDetailsUrl + user.getId()));
// This results with a Mono<Mono<...>> because HttpClient.get(...)
// returns a Mono

// Same example with flatMap
Mono<JsonObject> bestResult = HttpClient.get(firstUserUrl).
  flatMap(user -> HttpClient.get(userDetailsUrl + user.getId()));
// Now the result has the type we expected


此外,它允许精确处理错误:
public UserApi {
  
  private HttpClient httpClient;
    
  Mono<User> findUser(String username) {
    String queryUrl = "http://my-api-address/users/" + username;
    
    return Mono.fromCallable(() -> httpClient.get(queryUrl)).
      flatMap(response -> {
        if (response.statusCode == 404) return Mono.error(new NotFoundException("User " + username + " not found"));
        else if (response.statusCode == 500) return Mono.error(new InternalServerErrorException());
        else if (response.statusCode != 200) return Mono.error(new Exception("Unknown error calling my-api"));
        return Mono.just(response.data);
      });
  }
                                           
}

7

Reactor 中 Map 的内部工作原理。

Map 内部工作原理

创建一个 Player 类。

@Data
@AllArgsConstructor
public class Player {
        String name;
        String name;
}

现在创建一些 Player 类的实例

Flux<Player> players = Flux.just(
        "Zahid Khan",
        "Arif Khan",
        "Obaid Sheikh")
        .map(fullname -> {
            String[] split = fullname.split("\\s");
            return new Player(split[0], split[1]);
        });

StepVerifier.create(players)
          .expectNext(new Player("Zahid", "Khan"))
          .expectNext(new Player("Arif", "Khan"))
          .expectNext(new Player("Obaid", "Sheikh"))
          .verifyComplete();

需要理解的关键是,map() 操作是同步执行的,在源 Flux 中每个项目发布时进行映射。如果您想要异步执行映射,应该考虑使用 flatMap() 操作。

flatMap 内部的工作原理。

flatMap 内部的工作原理。

Flux<Player> players = Flux.just(
      "Zahid Khan", 
      "Arif Khan", 
      "Obaid Sheikh")
      .flatMap(
            fullname -> 
                  Mono.just(fullname).map(p -> {
                        String[] split = p.split("\\s");
                        return new Player(split[0], split[1]);
        }).subscribeOn(Scheduler.parallel()));

        List<Player> playerList = Arrays.asList(
                  new Player("Zahid", "Khan"),
                  new Player("Arif", "Khan"), 
                  new Player("Obaid", "Sheikh"));

        StepVerifier.create(players).expectNextMatches(player ->         
                playerList.contains(player))    
                        .expectNextMatches(player ->  
                                playerList.contains(player))
                        .expectNextMatches(player -> 
                                playerList.contains(player))
                        .expectNextMatches(player -> 
                                playerList.contains(player))
                        .verifyComplete();


在Flatmap()内部,会执行一个map()操作,将String转换为Player。此外,subcribeOn()表示每个订阅应在并行线程中进行。如果没有subscribeOn(),则flatmap()作为同步操作。
map用于同步、非阻塞、一对一的转换,而flatMap用于异步(非阻塞)、一对多的转换。

根据图片,在您的flatmap示例中,可以有不同数量的输入和输出元素。能否提供一个例子?因为在您的示例中,输入元素和输出元素的数量相等(3个输入元素和3个输出元素)。 - gstackoverflow
好的回答,但是如果你引用了什么,请注明出处。你的引用来自《Spring实战(第五版)》。 - undefined

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接