RXJS可观察对象取消订阅内部可观察对象。

4
this.route.params被销毁并且this.sub被取消订阅后,"interval run"仍然在记录日志。这与我的直觉相反,即在取消订阅后,所有操作都应该停止。这里发生了什么,以及处理它的最佳方法是什么?我应该添加一个takeUntil,还是应该使用concatMap之外的其他东西?(我正在轮询新照片)
 constructor(private authService:AuthService,
            private route:ActivatedRoute,
            private router:Router,
            private photosApi:PhotosApi) {
}

ngOnInit() {
    this.sub = this.route.params
        .switchMap(()=>{
            console.log('switchmap Ruun')
            return Observable.interval(2000).startWith(0).concatMap(()=>{
                console.log('interval run')
                return this.photosApi.apiPhotosGet(this.authService.getAuth(), 0, 40)
            })
        })
        .subscribe((data:PagedResultPatientPhotoModel) => {
                this.processPhotos(data)
            }).add(()=>console.log('unsubscribe happened'))

编辑:我尝试重新编写以使其更清晰,使用concatMap仅进行一次调用,但我不确定为什么它在第一次调用后停止工作:

this.intervalSub = Observable.interval(3000).startWith(0).concatMap(()=>{
        console.log('getting photos interval', this.route.params)
        return this.route.params.switchMap(()=>{
            console.log('getting photos')
            return this.photosApi.apiPhotosGet(this.authService.getAuth(), 0, 40)
        })
    }).subscribe((data:PagedResultPatientPhotoModel) => {
                this.processPhotos(data)
            }).add(()=>console.log('unsubscribe happened'))

如果api调用可以在3000毫秒内运行完,那么以下使用switchMap的内容是有效的,否则它们会相互取消:

this.intervalSub = Observable.interval(3000).startWith(0).switchMap(()=>{
        console.log('getting photos interval', this.route.params)
        return this.route.params.switchMap(()=>{
            console.log('getting photos')
            return this.photosApi.apiPhotosGet(this.authService.getAuth(), 0, 40)
        })
    }).subscribe((data:PagedResultPatientPhotoModel) => {
                this.processPhotos(data)
            }).add(()=>console.log('unsubscribe happened'))

我很好奇为什么switchMap(和mergeMap)每次都可以工作,但concatMap只能工作一次,看起来像是RXJS中的一个错误。

我决定使用mergeMap,因为我觉得它不太可能出现请求返回顺序错误的情况,即使出现这种情况,几秒钟后也会自行纠正。


似乎switchMap只能取消相同底层流中的请求,不能向内传播。这是关于Angular2中switchMap无法取消先前HTTP调用的问题的讨论。 - Rusty Rob
1个回答

3

问题不在于 switchMap,它的行为符合预期。在下面的代码片段中,当取消订阅组合的可观察对象时,内部的 interval 可观察对象也会被取消订阅:

const source = new Rx.Subject();

const subscription = source
  .switchMap(() => {
    console.log('> switchmap');
    return Rx.Observable.interval(500);
  })
  .subscribe((value) => console.log(value));

source.next(1);
setTimeout(() => source.next(2), 2000);

setTimeout(() => {
  console.log('> unsubscribe');
  subscription.unsubscribe();
}, 5000);
.as-console-wrapper { max-height: 100% !important; top: 0; }
<script src="https://unpkg.com/rxjs@5/bundles/Rx.min.js"></script>

你的问题在于调用订阅的add方法。这会创建一个额外的订阅,而正是这个订阅被分配给了sub

最初的订阅 - 在其中添加了额外的订阅 - 没有被分配给任何变量,并且从未取消订阅。而正是这个订阅看到了interval继续运行。

以下代码段展示了问题:

const source = new Rx.Subject();

const subscription1 = source
  .switchMap(() => {
    console.log('> switchmap');
    return Rx.Observable.interval(500);
  })
  .subscribe((value) => console.log(value));

const subscription2 = subscription1.add(() => console.log('> unsubscribed'));

source.next(1);
setTimeout(() => source.next(2), 2000);

setTimeout(() => {
  console.log('> unsubscribe 2');
  subscription2.unsubscribe();
}, 5000);
setTimeout(() => {
  console.log('> unsubscribe 1');
  subscription1.unsubscribe();
}, 6000);
.as-console-wrapper { max-height: 100% !important; top: 0; }
<script src="https://unpkg.com/rxjs@5/bundles/Rx.min.js"></script>

请注意,调用subscription1unsubscribe方法将会导致subscription2也被取消订阅,但是反过来不成立。

啊,对了,我忘记添加了 .add,并且没有意识到 .add 返回一个单独的订阅。很令人困惑,谢谢你澄清了这一点。 - Rusty Rob

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接