RxJS - 从可观察对象中获取最后 n 个元素

15

我想从可观察对象中获取最后三个元素。假设我的时间轴如下所示:

--a---b-c---d---e---f-g-h-i------j->

其中:a、b、c、d、e、f、g、h、i、j是发出的值。

每当有一个新值被发出时,我希望立即获得它,因此它可能会像这样:

[a]
[a, b]
[a, b, c]
[b, c, d]
[c, d, e]
[d, e, f]
[e, f, g]
[f, g, h]
... and so on

我认为这非常有用。想象一下构建一个聊天窗口,你想显示最后10条消息。每当有新消息时,你都希望更新你的视图。

我的尝试:演示

2个回答

30
您可以使用scan来实现这个功能:
from(['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r', 's', 't', 'u'])
  .pipe(
    scan((acc, val) => {
      acc.push(val);
      return acc.slice(-3);
    }, []),
  )
  .subscribe(console.log);

这将会输出:
[ 'a' ]
[ 'a', 'b' ]
[ 'a', 'b', 'c' ]
[ 'b', 'c', 'd' ]
[ 'c', 'd', 'e' ]
...
[ 's', 't', 'u' ]
bufferCount不会产生你想要的效果。它仅在每个缓冲区恰好为=== 3时才发出通知,这意味着您至少要发布3条消息才能收到任何通知。
2019年1月:已更新为RxJS 6。

正确答案请点赞。我有一个小写字母的问题。 - Suren Srapyan
如何将此转换为可重用的运算符? - Simon_Weaver
3
累积的数组不会一直增长吗?看起来像是内存泄漏。 - adam0101
1
这是一个很好的答案,但更简洁和功能性的版本可以使用scan((acc, val) => [...acc, val].slice(-3), []) - jjjjs
1
@adam0101 不会的。累加器数组永远不会超过4个元素。从扫描函数返回的结果是下一次运行时作为 acc 参数传递的内容。所以对于一个已经有3个元素的数组,它将 .push(val) 使其大小变成4,然后再通过 .slice(-3) 缩减到3个元素。这不是泄漏问题。 - Teodor Sandu
显示剩余2条评论

5
您可以查看Observable#bufferCount函数。一个区别是它想要至少三次发射(在此示例中为第一个参数)。

const source = Rx.Observable.interval(1000);
const example = source.bufferCount(3,1)
const subscribe = example.subscribe(val => console.log(val));
<script src="https://unpkg.com/@reactivex/rxjs@5.4.3/dist/global/Rx.js"></script>


谢谢你的回答!就像 @martin 所提到的,bufferCount 仅在每个缓冲区恰好 === 3 时才发出。 - feerlay
1
谢谢。我也提到了它。 - Suren Srapyan
你可以执行 source.next(null); source.next(null); source.next(null); - Simon_Weaver

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接