我希望只有在第一个发出的值时才执行tap()函数。
类似以下操作:
Observable
.pipe(
tap(() => { /* execute only when I get the first emitted value */ })
)
.subscribe(() => {
// .....
})
我希望只有在第一个发出的值时才执行tap()函数。
类似以下操作:
Observable
.pipe(
tap(() => { /* execute only when I get the first emitted value */ })
)
.subscribe(() => {
// .....
})
您可以在map操作符(如concatMap
)中使用索引。与其他方法不同,这种方法在所选择的索引上完全灵活。比如说,如果您想要在第二个发射上进行操作,则可以使用index === 1
,或者使用任何谓词,例如index % 2 === 0
。
// these are because of using rxjs from CDN in code snippet, ignore them
const {of, interval} = rxjs;
const {take, tap, concatMap} = rxjs.operators;
// main code
const stream = interval(250).pipe(take(4))
stream.pipe(
concatMap((value, index) => index === 0
? of(value).pipe(
tap(() => console.log('tap'))
)
: of(value)
)
)
.subscribe(x => console.log(x));
<script src="https://unpkg.com/@reactivex/rxjs@6.x/dist/global/rxjs.umd.js"></script>
如果我正确理解您的想法,您只想在流订阅开始时执行tap()
,而不是其他时间。 这是我的自定义运算符:
import { Observable, of } from 'rxjs';
import { switchMap, tap } from 'rxjs/operators';
export function startWithTap<T>(callback: () => void) {
return (source: Observable<T>) =>
of({}).pipe(tap(callback), switchMap((o) => source));
}
这个运算符的使用示例为:
this.api.getData().pipe(
startWithTap(() => this.loading.start()),
)
这是我的实际代码示例,当有人通过httpClient订阅由api服务创建的Observable时,加载将开始。
更新
使用此代码代替上述实现,因为这个只使用defer
,而不是使用of
、tap
和switchMap
:
export function startWithTap<T>(callback: () => void) {
return (source: Observable<T>) =>
defer(() => {
callback();
return source;
});
}
我喜欢jal's answer的方法,并建议将其封装在自己的运算符中:
export function tapOnce<T>(tapFn: (t: T) => void, tapIndex = 0): OperatorFunction<T, T> {
return source$ => source$.pipe(concatMap((value, index) => {
if (index === tapIndex) {
tapFn(value);
}
return of(value);
}));
}
使用方法如下:
stream.pipe(tapOnce(() => console.log('tapping once'), 1));
export function tapWhen<T>(tapFn: (t: T) => void, evaluateFn: (index: number, t: T) => boolean): OperatorFunction<T, T> {
return source$ => source$.pipe(concatMap((value, index) => {
if (evaluateFn(index, value)) {
tapFn(value);
}
return of(value);
}));
}
如果有人感兴趣,这里是一个超级简单的 tapN 实现方式。所以它会执行指定的回调函数,直到发射次数等于 nEmissions。如果只想对第一个元素执行 tap() 函数,可以使用 tapN(1),但也可以用例如 tapN(3) 来对前三个发射执行 tap。
/* Executes the specified callback function for each emission until the number of emissions is equal to nEmissions*/
export const tapN = <T>(nEmissions, callback: (T) => void) => (source$: Observable<T>): Observable<T> =>
defer(() => {
let counter = 0;
return source$.pipe(tap((item) => {
if (counter < nEmissions) {
callback(item);
counter++;
}
}));
});
Observable
.pipe(
tapN(1, () => { /* this code would be only executed on the first emitted value */ })
)
.subscribe(() => {
// .....
})
多播
。multicast(new Subject(), s => concat(
s.pipe(
take(1),
tap(v => console.log('tap', v)),
),
s
)
() => new Subject()
以使其正常工作。如果只提供一个Subject并多次订阅,将会混淆可观察到发出的值,详情请见:https://stackblitz.com/edit/rxjs-pul1gw?file=index.ts。 - fridoimport { timer, of, BehaviorSubject, interval } from 'rxjs';
import { tap, mapTo, share, shareReplay, } from 'rxjs/operators';
const source$ = timer(1000)
.pipe(
tap((v) => console.log('SIDE EFFECT')),
mapTo('RESULT')
)
const sharedSource$ = source$.pipe(share());
// or shareReplay(1) if you want to ensure every subscriber get the last value event if they will subscribe later;
sharedSource$.subscribe(console.log);
sharedSource$.subscribe(console.log);
sharedSource$.subscribe(console.log);
sharedSource$.subscribe(console.log);
sharedSource$.subscribe(console.log);
sharedSource$.subscribe(console.log);
sharedSource$.subscribe(console.log);
https://stackblitz.com/edit/typescript-qpnbkm?embed=1&file=index.ts
这是一个示例,类似于learn-rxjs
(更新我之前不正确的答案)
根据cartant的评论和提供的链接,他已经完成了创建一个执行此操作的运算符的工作,它在“rxjs-etc”包中。基于他的运算符的解决方案是安装“rxjs-etc”,然后:
import { initial } from 'rxjs-etc/operators';
observable$.pipe(
initial(src$ => src$.pipe(tap(() => {/* execute only on first value */})))
).
subscribe(() => {
// .....
})
这是一个关于编程的例子,可以在StackBlitz上进行操作。
stream$.pipe(first() /* sameas take(1)*/, tap(...)).subscribe(() => {})
- Maksim Romanenkofirst
和take
会在发出一个值后自动订阅该订阅。 - jenson-button-event