< p >
简短总结:mergeMap
比
map
更强大,理解
mergeMap
是访问Rx完整功能所必需的条件。
相似点
both mergeMap
and map
acts on a single stream (vs. zip
, combineLatest
)
both mergeMap
and map
can transform elements of a stream (vs. filter
, delay
)
区别
map
mergeMap
可以改变源流的大小;对于每个元素,可能会创建/发射任意数量(0、1或多个)的新元素。
它提供了完全的异步控制——无论是何时创建/发射新元素还是应该同时处理源流中多少个元素;例如,假设源流发射了10个元素,但maxConcurrency
设置为2,那么前两个元素将立即被处理,并缓冲剩余的8个;一旦已处理的一个complete
d,源流的下一个元素将被处理,以此类推——这有点棘手,但请看下面的示例。
所有其他操作符都可以使用mergeMap
和Observable
构造函数来实现。
可用于递归异步操作
返回值必须是Observable类型(或Rx必须知道如何从中创建Observable - 例如promise、array)
id
: x => Rx.Observable.of(x)
数组类比
let array = [1,2,3]
fn map mergeMap
x => x*x [1,4,9] error /*expects array as return value*/
x => [x,x*x] [[1,1],[2,4],[3,9]] [1,1,2,4,3,9]
这个比喻并不完全准确,基本上它对应于将 .mergeMap
的 maxConcurrency
设置为1。在这种情况下,元素将按照上面的顺序排序,但通常情况下可能不是这样的。我们唯一保证的是新元素的发射将按其在底层流中的位置排序。例如:[3,1,2,4,9,1]
和 [2,3,1,1,9,4]
是有效的,但 [1,1,4,2,3,9]
不是(因为4
在底层流中在2
之后被发出)。
使用 mergeMap
的一些示例:
Rx.Observable.prototype.mapWithMergeMap = function(mapFn) {
return this.mergeMap(x => Rx.Observable.of(mapFn(x)));
}
Rx.Observable.range(1, 3)
.mapWithMergeMap(x => x * x)
.subscribe(x => console.log('mapWithMergeMap', x))
Rx.Observable.prototype.filterWithMergeMap = function(filterFn) {
return this.mergeMap(x =>
filterFn(x) ?
Rx.Observable.of(x) :
Rx.Observable.empty());
}
Rx.Observable.range(1, 3)
.filterWithMergeMap(x => x === 3)
.subscribe(x => console.log('filterWithMergeMap', x))
Rx.Observable.prototype.delayWithMergeMap = function(delayMs) {
return this.mergeMap(x =>
Rx.Observable.create(obs => {
const token = setTimeout(() => {
obs.next(x);
obs.complete();
}, delayMs)
return () => clearTimeout(token);
}))
}
Rx.Observable.range(1, 3)
.delayWithMergeMap(500)
.take(2)
.subscribe(x => console.log('delayWithMergeMap', x))
const count = (from, to, interval) => {
if (from > to) return Rx.Observable.empty();
return Rx.Observable.timer(interval)
.mergeMap(() =>
count(from + 1, to, interval)
.startWith(from))
}
count(1, 3, 1000).subscribe(x => console.log('count', x))
const countMoreRxWay = (from, to, interval) =>
Rx.Observable.if(
() => from > to,
Rx.Observable.empty(),
Rx.Observable.timer(interval)
.mergeMap(() => countMoreRxWay(from + 1, to, interval)
.startWith(from)))
const maxConcurrencyExample = () =>
Rx.Observable.range(1,7)
.do(x => console.log('emitted', x))
.mergeMap(x => Rx.Observable.timer(1000).mapTo(x), 2)
.do(x => console.log('processed', x))
.subscribe()
setTimeout(maxConcurrencyExample, 3100)
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.1.1/Rx.min.js"></script>
mergeMap
,会对性能产生影响吗?还是会有其他问题?我没有尝试过,只是有些疑惑。 - k11k2