RxJS Observable配合Subject,通过定时器轮询和combineLatest不触发

4
我写了一个函数来轮询一个API,该API还能够进行分页。这里使用Subject Observable进行分页,使用timer方法进行轮询(我也尝试过使用相同结果的interval)。
下面是我的代码:
  getItems(pagination: Subject<Pagination>): Observable<ListResult<Item>> {
    let params: URLSearchParams = new URLSearchParams();

    return Observable
      .timer(0, 5000)
      .combineLatest(
        pagination,
        (timer, pagination) => pagination
      )
      .startWith({offset: 0, limit: 3})
      .switchMap(pagination => {
        params.set('skip', pagination.offset.toString());
        params.set('limit', pagination.limit.toString());
        return this.authHttp.get(`${environment.apiBase}/items`, {search: params})
      })
      .map(response => response.json() as ListResult<Item>)
      .catch(this.handleError);
  }

预期的行为应该是: 每5秒发送一个HTTP请求,当用户更改页面时也会发送请求。
实际情况是: 首先发送第一个HTTP请求,但在使用分页之前不会向服务器发送其他请求。 在第一次使用分页后,轮询开始工作。
这是我第一次使用Observables,所以我很确定我错过了什么,但我看不出可能是什么。
我还尝试了这种方法(也许在startWith中缺少计时器计数器),但它没有改变任何东西。
[...]
  .combineLatest(
    pagination
  )
  .startWith([0, {offset: 0, limit: 3}])
[...]

combineLatest 只有在两个 observables 都至少发出一个值后才会起作用,可以使用 RxJS 的 marble diagram 进行实验。我认为你需要先将 startsWith 应用于分页,然后再与计时器组合。 - Adam
1
这就是马丁在他的解决方案中建议的(你们两个都是正确的)。 我知道combineLatest确实会等待所有的Observables,但我不知道startWith会自动启动两个Observables。我错了 ;) - Gesh
2个回答

2
combineLatest() 运算符要求所有源 Observable 至少发出一项。
因为您使用了 .startWith(),所以您的演示只会发出一个请求。由于 pagination 是一个 Subject 并且可能永远不会发出任何项,combineLatest() 永远不会发出任何东西。
因此,一种选择是移动 .startWith()
.combineLatest(
  pagination.startWith({offset: 0, limit: 3}),
  (timer, pagination) => pagination
)

但是也许这并没有什么帮助,因为你忽略了从 timer() 返回的所有项目,只使用了 pagination。 所以也许你可以只使用 merge() 来刷新其中一个来源的列表。然后 timer() 独立地增加偏移量。

Observable
  .timer(0, 1000)
  .map(i => { return {offset: i*3, limit: 3}})
  .merge(pagination.startWith({offset: 0, limit: 3}))
  .switchMap(pagination => {
    return Observable.of(pagination).delay(200)
  })
  .subscribe(val => console.log(val));

你的第一个答案可行! 我刚把startWith移到了分页Subject Observable中,它起作用了。当我在我的解决方案中使用startWith时,计时器实际上起作用了,但是分页Observable从未被触发,而combineLatest一直等待。聪明而简单的解决方案。 谢谢。 - Gesh

-1

请确保您按照以下步骤操作

创建可观察数据

您有一个Observable变量,并通过observable.next(value);推送数据

将数据放入Subject或Behavior Subject中

在您想要获取数据的位置,声明一个Subject类型的变量,并通过订阅来访问它。

private val: Subject = YourObservable;
private uiValue: string;

val.subscribe((val) => {
   uiVal = val;
});

现在你可以在任何地方使用uiVal,它将在每个间隔中更新。
你可以通过仅使用console.log来测试这两个片段,以确保一切正常运行。

正如我在描述中所写的那样,我使用了两者并得到了相同的结果。 - Gesh
我刚才试过了。可悲的是没有改变任何东西。 - Gesh
@Gesh 编辑了答案,请确保在 observable.next 和 subscribe 侧面都正确执行。 - Aniruddha Das

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接