mergeMap操作符的使用场景是什么?

54

我完全不明白 mergeMap 的目的。我听到了两种解释:

  1. 它就像 .NET LINQ 中的 SelectAll() - 不对。
  2. 它是 RxJS 中 mergemap 的结合 - 不对(或者我无法复制这个)。

考虑以下代码:

link text

    var obs1 = new Rx.Observable.interval(1000);
    var obs2 = new Rx.Observable.interval(1000);
    
    //Just a merge and a map, works fine
    obs1.merge(obs2).map(x=> x+'a').subscribe(
      next => console.log(next)
    )
    
    //Who know what - seems to do the same thing as a plain map on 1 observable
    obs1.mergeMap(val => Rx.Observable.of(val + `B`))
        .subscribe(
          next => console.log(next)
        )

最后一个标记为“谁知道呢”的部分对obs1上的地图除了展示以外没有任何作用 - 这有什么意义呢?

mergeMap到底是做什么的?它有哪些有效的使用案例?(最好附带一些代码实例)

这些文章并没有帮助我解决问题(上面提到的mergeMap代码来自于以下链接之一):12

2个回答

122
< p >简短总结:mergeMapmap更强大,理解mergeMap是访问Rx完整功能所必需的条件。


相似点

  • both mergeMap and map acts on a single stream (vs. zip, combineLatest)

  • both mergeMap and map can transform elements of a stream (vs. filter, delay)

区别

map

  • 不能改变源流的大小(假设:map本身不会throw);对于源中的每个元素,只能发射一个mapped元素;map不能忽略元素(例如filter);

  • 在默认调度程序的情况下,转换同步发生;要保持100%的清晰度:源流可以异步地传递其元素,但是每个下一个元素都会立即被映射并重新发射;map不能像delay一样在时间上移动元素。

  • 返回值没有任何限制

  • id: x => x

mergeMap

  • 可以改变源流的大小;对于每个元素,可能会创建/发射任意数量(0、1或多个)的新元素。

  • 它提供了完全的异步控制——无论是何时创建/发射新元素还是应该同时处理源流中多少个元素;例如,假设源流发射了10个元素,但maxConcurrency设置为2,那么前两个元素将立即被处理,并缓冲剩余的8个;一旦已处理的一个completed,源流的下一个元素将被处理,以此类推——这有点棘手,但请看下面的示例。

  • 所有其他操作符都可以使用mergeMapObservable构造函数来实现。

  • 可用于递归异步操作

  • 返回值必须是Observable类型(或Rx必须知道如何从中创建Observable - 例如promise、array)

  • id: x => Rx.Observable.of(x)

数组类比

let array = [1,2,3]
fn             map                    mergeMap
x => x*x       [1,4,9]                error /*expects array as return value*/
x => [x,x*x]   [[1,1],[2,4],[3,9]]    [1,1,2,4,3,9]

这个比喻并不完全准确,基本上它对应于将 .mergeMapmaxConcurrency 设置为1。在这种情况下,元素将按照上面的顺序排序,但通常情况下可能不是这样的。我们唯一保证的是新元素的发射将按其在底层流中的位置排序。例如:[3,1,2,4,9,1][2,3,1,1,9,4] 是有效的,但 [1,1,4,2,3,9] 不是(因为4在底层流中在2之后被发出)。

使用 mergeMap 的一些示例:

// implement .map with .mergeMap
Rx.Observable.prototype.mapWithMergeMap = function(mapFn) {
  return this.mergeMap(x => Rx.Observable.of(mapFn(x)));
}

Rx.Observable.range(1, 3)
  .mapWithMergeMap(x => x * x)
  .subscribe(x => console.log('mapWithMergeMap', x))

// implement .filter with .mergeMap
Rx.Observable.prototype.filterWithMergeMap = function(filterFn) {
  return this.mergeMap(x =>
    filterFn(x) ?
    Rx.Observable.of(x) :
    Rx.Observable.empty()); // return no element
}

Rx.Observable.range(1, 3)
  .filterWithMergeMap(x => x === 3)
  .subscribe(x => console.log('filterWithMergeMap', x))

// implement .delay with .mergeMap 
Rx.Observable.prototype.delayWithMergeMap = function(delayMs) {
  return this.mergeMap(x =>
    Rx.Observable.create(obs => {
      // setTimeout is naive - one should use scheduler instead
      const token = setTimeout(() => {
        obs.next(x);
        obs.complete();
      }, delayMs)
      return () => clearTimeout(token);
    }))
}

Rx.Observable.range(1, 3)
  .delayWithMergeMap(500)
  .take(2)
  .subscribe(x => console.log('delayWithMergeMap', x))

// recursive count
const count = (from, to, interval) => {
  if (from > to) return Rx.Observable.empty();
  return Rx.Observable.timer(interval)
    .mergeMap(() =>
      count(from + 1, to, interval)
      .startWith(from))
}

count(1, 3, 1000).subscribe(x => console.log('count', x))

// just an example of bit different implementation with no returns
const countMoreRxWay = (from, to, interval) =>
  Rx.Observable.if(
    () => from > to,
    Rx.Observable.empty(),
    Rx.Observable.timer(interval)
    .mergeMap(() => countMoreRxWay(from + 1, to, interval)
      .startWith(from)))

const maxConcurrencyExample = () =>
  Rx.Observable.range(1,7)
    .do(x => console.log('emitted', x))
    .mergeMap(x => Rx.Observable.timer(1000).mapTo(x), 2)
    .do(x => console.log('processed', x))
    .subscribe()

setTimeout(maxConcurrencyExample, 3100)
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.1.1/Rx.min.js"></script>


如果我对每个基本的CRUD操作都使用mergeMap,会对性能产生影响吗?还是会有其他问题?我没有尝试过,只是有些疑惑。 - k11k2
@k11k2 抱歉回复晚了 - 大约3年前我也有类似的疑虑,从那时起,我一直将整个应用程序构建为单个可观察对象,并且Rx性能从未成为问题。 - artur grzesiak
14
这就是它的工作原理。RxJs文档中完全无法阅读的解释应该用这个替代。http://reactivex.io/rxjs/class/es6/Observable.js~Observable.html#instance-method-mergeMap什么情况啊? - Michahell

24

.mergeMap() 可以将高阶 Observable 扁平化成单一的流。例如:

Rx.Observable.from([1,2,3,4])
  .map(i => getFreshApiData())
  .subscribe(val => console.log('regular map result: ' + val));

//vs

Rx.Observable.from([1,2,3,4])
  .mergeMap(i => getFreshApiData())
  .subscribe(val => console.log('mergeMap result: ' + val));

function getFreshApiData() {
  return Rx.Observable.of('retrieved new data')
    .delay(1000);
}
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.1.0/Rx.js"></script>

查看我在另一个问题中的答案,详细解释了.xxxMap()运算符:Rxjs - 如何从数组中提取多个值并将它们同步地反馈到可观测流中


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接