rxjs执行tap只在第一次时执行

26

我希望只有在第一个发出的值时才执行tap()函数。

类似以下操作:

Observable
  .pipe(
     tap(() => { /* execute only when I get the first emitted value */ })
  )
  .subscribe(() => {
     // .....
  })

在其他操作符中,有一个 .first() 操作符,你可以在 .pipe() 中使用它。 - Dasha Ermolova
请查看 https://github.com/cartant/rxjs-etc/blob/master/source/operators/initial.ts 和它的对应项 https://github.com/cartant/rxjs-etc/blob/master/source/operators/subsequent.ts。 - cartant
1
stream$.pipe(first() /* sameas take(1)*/, tap(...)).subscribe(() => {}) - Maksim Romanenko
1
firsttake 会在发出一个值后自动订阅该订阅。 - jenson-button-event
7个回答

33

您可以在map操作符(如concatMap)中使用索引。与其他方法不同,这种方法在所选择的索引上完全灵活。比如说,如果您想要在第二个发射上进行操作,则可以使用index === 1,或者使用任何谓词,例如index % 2 === 0

// these are because of using rxjs from CDN in code snippet, ignore them
const {of, interval} = rxjs;
const {take, tap, concatMap} = rxjs.operators;


// main code
const stream = interval(250).pipe(take(4))

stream.pipe(
  concatMap((value, index) => index === 0
    ? of(value).pipe(
        tap(() => console.log('tap'))
      )
    : of(value)
  )
)
.subscribe(x => console.log(x));
<script src="https://unpkg.com/@reactivex/rxjs@6.x/dist/global/rxjs.umd.js"></script>


17

如果我正确理解您的想法,您只想在流订阅开始时执行tap(),而不是其他时间。 这是我的自定义运算符:

import { Observable, of } from 'rxjs';
import { switchMap, tap } from 'rxjs/operators';

export function startWithTap<T>(callback: () => void) {
  return (source: Observable<T>) =>
    of({}).pipe(tap(callback), switchMap((o) => source));
}

这个运算符的使用示例为:

this.api.getData().pipe(
  startWithTap(() => this.loading.start()),
)

这是我的实际代码示例,当有人通过httpClient订阅由api服务创建的Observable时,加载将开始。


更新

使用此代码代替上述实现,因为这个只使用defer,而不是使用oftapswitchMap

export function startWithTap<T>(callback: () => void) {
  return (source: Observable<T>) =>
    defer(() => {
      callback();
      return source;
    });
}

为什么更新后的解决方案更好? - enno.void
2
@enno.void 更新的解决方案只使用了一个rxjs操作符,与之前使用三个操作符的方式相同,因此更简洁。 - Goga Koreli
1
这个解决方案实际上与 OP 的要求不同。它在有订阅时调用回调函数,而不是在第一次发射时调用。 - Denis Loh
这其实是真的,但有些人想要这种行为,并最终在这个SO问题上得到了解答。 - Goga Koreli
在这里补充一个要注意的地方:这两种解决方案都不能延迟原始的可观察对象。然而,jBuchholz的答案可以实现延迟,所以如果延迟是你需要的功能,我建议选择那个答案。 - undefined

8

我喜欢jal's answer的方法,并建议将其封装在自己的运算符中:

export function tapOnce<T>(tapFn: (t: T) => void, tapIndex = 0): OperatorFunction<T, T> {
  return source$ => source$.pipe(concatMap((value, index) => {
    if (index === tapIndex) {
      tapFn(value);
    }
    return of(value);
  }));
}

使用方法如下:

stream.pipe(tapOnce(() => console.log('tapping once'), 1));

这甚至可以被更进一步地抽象为一个运算符,该运算符接受一个函数来确定它是否应该根据给定的值/索引进行轻敲操作。
export function tapWhen<T>(tapFn: (t: T) => void, evaluateFn: (index: number, t: T) => boolean): OperatorFunction<T, T> {
  return source$ => source$.pipe(concatMap((value, index) => {
    if (evaluateFn(index, value)) {
      tapFn(value);
    }
    return of(value);
  }));
}

4

如果有人感兴趣,这里是一个超级简单的 tapN 实现方式。所以它会执行指定的回调函数,直到发射次数等于 nEmissions。如果只想对第一个元素执行 tap() 函数,可以使用 tapN(1),但也可以用例如 tapN(3) 来对前三个发射执行 tap。

/* Executes the specified callback function for each emission until the number of emissions is equal to nEmissions*/
export const tapN = <T>(nEmissions, callback: (T) => void) => (source$: Observable<T>): Observable<T> =>
    defer(() => {
        let counter = 0;
        return source$.pipe(tap((item) => {
            if (counter < nEmissions) {
                callback(item);
                counter++;
            }
        }));
    });

您的代码中:
Observable
  .pipe(
     tapN(1, () => { /* this code would be only executed on the first emitted value */ })
  )
  .subscribe(() => {
     // .....
  })

3
除了已经提到的选项,您还可以使用多播
multicast(new Subject(), s => concat(
  s.pipe(
    take(1),
    tap(v => console.log('tap', v)),
  ),
  s
)

实时演示:https://stackblitz.com/edit/rxjs-shvuxm


你需要提供一个主题工厂函数 () => new Subject() 以使其正常工作。如果只提供一个Subject并多次订阅,将会混淆可观察到发出的值,详情请见:https://stackblitz.com/edit/rxjs-pul1gw?file=index.ts。 - frido

0
您可以像下面这样共享您的主要Observable:
import { timer, of, BehaviorSubject, interval } from 'rxjs';
import { tap, mapTo, share, shareReplay, } from 'rxjs/operators';

const source$ = timer(1000)
.pipe(
  tap((v) => console.log('SIDE EFFECT')),
  mapTo('RESULT')
)
const sharedSource$ = source$.pipe(share());
// or shareReplay(1) if you want to ensure every subscriber get the last value event if they will subscribe later;

sharedSource$.subscribe(console.log);
sharedSource$.subscribe(console.log);
sharedSource$.subscribe(console.log);
sharedSource$.subscribe(console.log);
sharedSource$.subscribe(console.log);
sharedSource$.subscribe(console.log);
sharedSource$.subscribe(console.log);

https://stackblitz.com/edit/typescript-qpnbkm?embed=1&file=index.ts

这是一个示例,类似于learn-rxjs


-1

(更新我之前不正确的答案)

根据cartant的评论和提供的链接,他已经完成了创建一个执行此操作的运算符的工作,它在“rxjs-etc”包中。基于他的运算符的解决方案是安装“rxjs-etc”,然后:

import { initial } from 'rxjs-etc/operators';

observable$.pipe(
    initial(src$ => src$.pipe(tap(() => {/* execute only on first value */})))
).
subscribe(() => {
    // ..... 
})

这是一个关于编程的例子,可以在StackBlitz上进行操作。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接