Rxjs - 重新订阅已取消订阅的Observable

6

我正在使用一个服务每2.5秒“ping”我的服务器,返回从服务器的响应时间。因此,我正在使用可观察对象。

我还在使用Angular 2和TypeScript。

现在,我想在按钮单击时停止服务(取消订阅)。这个功能可以正常工作!按钮应该是一个切换按钮,所以如果没有订阅,就订阅,反之亦然。但是重新订阅不起作用!

这是我的服务:

export class PingService {
  pingStream: Subject<number> = new Subject<number>();
  ping: number = 0;
  url: string = url.href;

  constructor(private _http: Http) {
    Observable.interval(2500)
      .subscribe((data) => {
        let timeStart: number = performance.now();

        this._http.get(this.url)
          .subscribe((data) => {
            let timeEnd: number = performance.now();

            let ping: number = timeEnd - timeStart;
            this.ping = ping;
            this.pingStream.next(ping);
          });
      });
  }
}

这是我的点击函数:

toggleSubscription() {   
      if (this.pingService.pingStream.isUnsubscribed) {
         this.pingService.pingStream.subscribe(ping => {
         this.ping = ping;
         NTWDATA.datasets[0].data.pop();
         NTWDATA.datasets[0].data.splice(0, 0, this.ping);
      })
      }
      else {
         this.pingService.pingStream.unsubscribe();
      }
   }

我将在我的应用程序组件的构造函数中订阅PingService。数据会显示在图表中。当我第一次点击按钮时,服务停止运行,不再更新数据。当我第二次点击时,什么也不会发生,尽管"this.pingService.pingStream.isUnsubscribed"返回true。

我的构造函数:

    constructor(private location: Location,
       private pingService: PingService) {

          this.pingService.pingStream.subscribe(ping => {
             this.ping = ping;
             NTWDATA.datasets[0].data.pop();
             NTWDATA.datasets[0].data.splice(0, 0, this.ping);
          })
   }

当我第一次点击按钮时,我也遇到了“ObjectUnsubscribedError”错误。

任何帮助都将不胜感激!谢谢!

1个回答

8

如果您正在使用RxJS,则不必订阅/取消订阅。只需考虑使用Rx流的另一种方法。思路是有两个流maintoggle流,所以只有当您的toggle流打开时,它们才会触发事件。

var mainStream = Rx.Observable.interval(100).map(() => '.');

var display = document.getElementById('display');
var toggle = document.getElementById('toggle');

var toggleStream = Rx.Observable
  .fromEvent(toggle, 'change')
  .map(e => e.target.checked);

var resultStream = toggleStream
  .filter(x => x === true)
  .startWith(true)
  .flatMap(() => mainStream.takeUntil(toggleStream));

resultStream.subscribe(x => display.innerText += x);
<!DOCTYPE html>
<html>

  <head>
    <script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/4.1.0/rx.all.min.js"></script>
  </head>

  <body>
    <input type="checkbox" id="toggle" checked> Check/uncheck to start/stop
    <div id="display"></div>
    
    <script src="script.js"></script>
  </body>

</html>


谢谢,这是个好主意!我还没有深入了解rxjs,不然我很快就会注意到这个问题了。 :) 现在我想知道是否可以将不同的值设置到Observable.interval中。我希望用户能够按照他们的意愿设置间隔时间。但由于observable在我的app.component初始化时就开始发出数据(我是对的吗?),我不太确定这是否可能。 - Faigjaz
2
@Faigjaz 请看一下这个问题 https://dev59.com/pJHea4cB1Zd3GeqPsa2l - Max Brodin
嗨@Max,我有一个关于可观察对象的类似问题,这是链接https://stackoverflow.com/questions/62615131/how-to-re-subscribe-after-unsubscribing-an-observable,你能帮我解决吗? - Suraj Gupta

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接