RxJS如何完成Observable?

6

为了学习rxjs,我正在尝试使用它。

我的代码:

// RxJS v6+
import { withLatestFrom, map } from 'rxjs/operators';
import { interval } from 'rxjs';

const source = interval(1000);

const example = source.pipe(
  map(value => value +1),
  map(value => {
    if(value === 40) {
      finish();
    }
    else if (value % 5 === 0){
      return 'can devide by 5 we did some magic';
  }else{
     return value;
  } })
);
const subscribe = example.subscribe(
  val => console.log(val), 
  error => console.log("Error handled: " , error), 
  () => console.log('resolved'));

我的想法是运行它40次,然后完成这个observable(也可以是另一个要求,例如查看值是否在10:00时为10(主要目标是使用值进行评估并强制完成))。 我正在寻找替代placeholder finish()的方法,因为finish不存在。如何到达subscribe方法的resolve函数() => console.log('resolved')

我找到了How can I complete Observable in RxJS ,但答案是2015年的,我认为现在有当前rxjs版本的答案。

3个回答

8

实际上还是一样的,你只需要使用管道操作符。你可以在这里查看示例。

import { interval, timer } from 'rxjs';
import { takeUntil } from 'rxjs/operators';

const source = interval(1000);
const timer$ = timer(5000);
const example = source.pipe(takeUntil(timer$));
const subscribe = example.subscribe(val => console.log(val));

谢谢你的回答,假设我想要做一些基于时间的事情,例如在10:00时如果我们有一个值为10的特定if情况,那么我们可以做类似source.pipe(takeUntil(value === 10 && time === 10:00)这样的操作吗?问题的主要目标是看看是否可以在出现特定情况时“强制”结束。 - Sven van den Boogaart
是的,您可以通过在observable中使用takeUntil和takeUntil take作为参数来实现。 - Tony Ngo
你可以尝试使用@malbarmawi的回答。在这种情况下手动取消订阅也可以。 - Tony Ngo
我喜欢 takeUntil 运算符。 - Muhammed Albarmavi

3

两个答案都提到了 takeUntil 和 take,都是正确的方法,但另外一种方法是使用订阅对象进行退订,这只是另一个选项。

const subx= example.subscribe(val =>  { 
   console.log(val); 
   if (val == 40) {
    subx.unsubscribe() 
   }
 });

演示

更新信息

如果你有很多订阅者并且想要添加条件,那么使用complete操作符可以完成任务。

const source = interval(1000).pipe(take(5)); // 

source.pipe(map(res => res * 10)).subscribe(val => {
  console.log("", val);
});

source.subscribe(val => {
  console.log(val);
});

演示


谢谢,这解决了问题,我会接受的。如果有一种方法可以从可观察到的对象中发送完整的内容,那就太好了。假设我现在有1000个订阅者,现在我必须向所有观察者添加一个比较,而不是在可观察对象中添加1次并广播完成。 - Sven van den Boogaart
@SvenvandenBoogaart 在这种情况下,您可以使用take运算符,我会更新我的答案。 - Muhammed Albarmavi
@malbarmawi,这并不能解决我想要根据状态完成的情况,例如,如果在10:00时值为10。第一个解决方案到目前为止是最好的。 - Sven van den Boogaart

2
我的想法是将其运行40次。为此,您可以添加take(40)。一般来说,有几个像take这样的操作符可以完成可观察对象。请查看https://www.learnrxjs.io/operators/filtering/take.html
// RxJS v6+
import { withLatestFrom, map } from 'rxjs/operators';
import { interval } from 'rxjs';

const source = interval(1000);

const example = source.pipe(
  take(40),
  map(value => value +1),
  map(value => {
    if(value === 40) {
      finish();
    }
    else if (value % 5 === 0){
      return 'can devide by 5 we did some magic';
  }else{
     return value;
  } })
);
const subscribe = example.subscribe(
  val => console.log(val), 
  error => console.log("Error handled: " , error), 
  () => console.log('resolved'));

这只是为了使示例简单,选择了40,但可能还有其他要求。例如,如果计数器在10:00时为10,则可以完成。因此,我们需要对该值进行评估。 - Sven van den Boogaart
4
"Finish"一词的起源是哪里? - Tony Ngo
@TonyNgo 我正在寻找如何发送一个完成信号,这个力是伪代码,它并不存在。我想在订阅者的完成方法中结束。 - Sven van den Boogaart
在这种情况下,您可以使用另一个take操作符 - takeUntil。例如:takeUntil(timer(targetDate - new Date())) - MoxxiManagarm

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接