我是一个在RXJS库中的新手用户,正在尝试弄清楚如何正确地使用Observable和Subjects。 我正在尝试将其与模式设计Observer进行对比。 某个时候,我有一个问题,即来自RXJS库的Observable实例是否是Observer模式设计的特殊情况?
我是一个在RXJS库中的新手用户,正在尝试弄清楚如何正确地使用Observable和Subjects。 我正在尝试将其与模式设计Observer进行对比。 某个时候,我有一个问题,即来自RXJS库的Observable实例是否是Observer模式设计的特殊情况?
Observable
是指,根据定义,会随着时间推移而发出数据的实体。这听起来有点模糊,同时也非常有趣。new Observable(subscriber => {})
创建一个 Observable
时,你就定义了一个源,或者说是链表的头节点。此外,你是否曾经想过为什么这个参数被称为 subscriber
或 observer
?我也会尝试分享我的看法。Observable.pipe()
来创建主链表。pipe(...operations: OperatorFunction<any, any>[]): Observable<any> {
return operations.length ? pipeFromArray(operations)(this) : this;
}
export function pipeFromArray<T, R>(fns: Array<UnaryFunction<T, R>>): UnaryFunction<T, R> {
if (fns.length === 0) {
return identity as UnaryFunction<any, any>;
}
if (fns.length === 1) {
return fns[0];
}
return function piped(input: T): R {
return fns.reduce((prev: any, fn: UnaryFunction<T, R>) => fn(prev), input as any);
};
}
protected lift<R>(operator?: Operator<T, R>): Observable<R> {
const observable = new Observable<R>();
observable.source = this;
observable.operator = operator;
return observable;
}
如您所知,在 RxJS 中有许多操作符。一个 operator
是一个函数,它返回另一个函数,该函数的参数是一个类型为 T
的 Observable
,其返回值也是一个类型为 R
的 Observable
。
例如,map()
:
export function map<T, R>(project: (value: T, index: number) => R, thisArg?: any): OperatorFunction<T, R> {
return function mapOperation(source: Observable<T>): Observable<R> {
if (typeof project !== 'function') {
throw new TypeError('argument is not a function. Are you looking for `mapTo()`?');
}
return lift(source, new MapOperator(project, thisArg));
};
}
所以,当你有
const src$ = new Observable(s => /* ... */)
.pipe(
map(/* ... */)
)
几件事情将会发生:
Observable
实例;提供的回调函数(在这种情况下是s => ...
)将被存储在_subscribe
属性中。pipe()
被调用;它将返回fns[0]
,在这种情况下是mapOperation
函数。mapOperation
将使用Observable
实例作为其参数(来自pipeFromArray(operations)(this)
);当调用时,它将调用source.lift(new MapOperator(project, thisArg));
;Observable.lift()
是向此链表添加节点的方法;正如您所看到的,一个节点(除了HEAD
)包含source
和代表它的operator
。src$
时,将基于此列表创建另一个列表。在这个列表中,每个节点都将是一个 Subscriber
。创建这个列表的依据是每个 operator
必须具有一个 call
方法。export interface Operator<T, R> {
call(subscriber: Subscriber<R>, source: any): TeardownLogic;
}
export class MapOperator<T, R> implements Operator<T, R> {
constructor(private project: (value: T, index: number) => R, private thisArg: any) {
}
call(subscriber: Subscriber<R>, source: any): any {
return source.subscribe(new MapSubscriber(subscriber, this.project, this.thisArg));
}
}
Subscriber
节点之间的关系是在Observable.subscribe()
中建立的。
在这种情况下,来自new Observable(s => ...)
(以上示例)的s
参数将是MapSubscriber
。
看起来我偏离了问题,但通过上述解释,我想证明这里并没有太多的Observer
模式。
可以使用Subject
实现此模式,扩展Observable
:
export class Subject<T> extends Observable<T> implements SubscriptionLike { }
这意味着您可以使用Subject.pipe(...)
和Subject.subscribe(subscriber)
。为了实现这种模式,Subject
采用了一种自定义_subscribe
方法:
_subscribe(subscriber: Subscriber<T>): Subscription {
if (this.closed) {
throw new ObjectUnsubscribedError();
} else if (this.hasError) {
subscriber.error(this.thrownError);
return Subscription.EMPTY;
} else if (this.isStopped) {
subscriber.complete();
return Subscription.EMPTY;
} else {
// !!!
this.observers.push(subscriber);
return new SubjectSubscription(this, subscriber);
}
}
Subject
类跟踪其观察者(订阅者),因此当它发出一个值时,使用Subject.next()
,所有观察者都将收到它:next(value: T) {
if (this.closed) {
throw new ObjectUnsubscribedError();
}
if (!this.isStopped) {
const { observers } = this;
const len = observers.length;
const copy = observers.slice();
for (let i = 0; i < len; i++) {
copy[i].next(value!);
}
}
}
src$.pipe(subjectInstance);