我对之前的解决方案有一些问题和顾虑。以下是我的解决方案:
- 适用于 Promises 和 Observables
- 提供了选项,可以确定何时完成任务的 Observables(例如完成/错误、首次发出、其他)
- 提供了一个选项,可以在任务完成时间过长时发出警告
- Angular UDK 似乎不会考虑在组件外部启动的任务(例如,由NGXS启动)。这提供了一个 awaitMacroTasks() 方法,可以从组件中调用以解决此问题。
Gist
import { Inject, Injectable, InjectionToken, OnDestroy, Optional } from "@angular/core";
import { BehaviorSubject, Observable, of, Subject, Subscription } from "rxjs";
import { finalize, switchMap, takeUntil, takeWhile, tap } from "rxjs/operators";
export const MACRO_TASK_WRAPPER_OPTIONS = new InjectionToken<MacroTaskWrapperOptions>("MacroTaskWrapperOptions");
export interface MacroTaskWrapperOptions {
wrapMacroTaskTooLongWarningThreshold?: number;
}
@Injectable({ providedIn: "root" })
export class MacroTaskWrapperService implements OnDestroy {
wrapMacroTaskTooLongWarningThreshold: number;
constructor(@Inject(MACRO_TASK_WRAPPER_OPTIONS) @Optional() options?: MacroTaskWrapperOptions) {
this.wrapMacroTaskTooLongWarningThreshold =
options && options.wrapMacroTaskTooLongWarningThreshold != null ? options.wrapMacroTaskTooLongWarningThreshold : 10000;
}
ngOnDestroy() {
this.macroTaskCount.next(0);
this.macroTaskCount.complete();
}
awaitMacroTasks$(label: string, stackTrace?: string): Observable<number> {
return this._wrapMacroTaskObservable(
"__awaitMacroTasks__" + label,
of(null)
.pipe(switchMap(() => this.macroTaskCount))
.pipe(takeWhile(v => v > 0)),
null,
"complete",
false,
stackTrace,
);
}
awaitMacroTasks(label: string, stackTrace?: string): Subscription {
return this.awaitMacroTasks$(label, stackTrace).subscribe();
}
awaitMacroTasksLogged(label: string, stackTrace?: string): Subscription {
console.error("MACRO START");
return this.awaitMacroTasks$(label, stackTrace).subscribe(() => {}, () => {}, () => console.error("MACRO DONE"));
}
wrapMacroTask<T>(
label: string,
request: Promise<T>,
warnIfTakingTooLongThreshold?: number | null,
isDoneOn?: IWaitForObservableIsDoneOn<T> | null,
stackTrace?: string | null,
): Promise<T>;
wrapMacroTask<T>(
label: string,
request: Observable<T>,
warnIfTakingTooLongThreshold?: number | null,
isDoneOn?: IWaitForObservableIsDoneOn<T> | null,
stackTrace?: string | null,
): Observable<T>;
wrapMacroTask<T>(
label: string,
request: Promise<T> | Observable<T>,
warnIfTakingTooLongThreshold?: number | null,
isDoneOn?: IWaitForObservableIsDoneOn<T> | null,
stackTrace?: string | null,
): Promise<T> | Observable<T> {
if (request instanceof Promise) {
return this.wrapMacroTaskPromise(label, request, warnIfTakingTooLongThreshold, stackTrace);
} else if (request instanceof Observable) {
return this.wrapMacroTaskObservable(label, request, warnIfTakingTooLongThreshold, isDoneOn, stackTrace);
}
if ("then" in request && typeof (request as any).then === "function") {
return this.wrapMacroTaskPromise(label, request, warnIfTakingTooLongThreshold, stackTrace);
} else {
return this.wrapMacroTaskObservable(label, request as Observable<T>, warnIfTakingTooLongThreshold, isDoneOn, stackTrace);
}
}
async wrapMacroTaskPromise<T>(
label: string,
request: Promise<T>,
warnIfTakingTooLongThreshold?: number | null,
stackTrace?: string | null,
): Promise<T> {
if (typeof warnIfTakingTooLongThreshold !== "number") {
warnIfTakingTooLongThreshold = this.wrapMacroTaskTooLongWarningThreshold;
}
let hasTakenTooLong = false;
let takingTooLongTimeout: any = null;
if (warnIfTakingTooLongThreshold! > 0 && takingTooLongTimeout == null) {
takingTooLongTimeout = setTimeout(() => {
hasTakenTooLong = true;
clearTimeout(takingTooLongTimeout);
takingTooLongTimeout = null;
console.warn(
`wrapMacroTaskPromise: Promise is taking too long to complete. Longer than ${warnIfTakingTooLongThreshold}ms.`,
);
console.warn("Task Label: ", label);
if (stackTrace) {
console.warn("Task Stack Trace: ", stackTrace);
}
}, warnIfTakingTooLongThreshold!);
}
const task: MacroTask = Zone.current.scheduleMacroTask("wrapMacroTaskPromise", () => {}, {}, () => {}, () => {});
this.macroTaskStarted();
const endTask = () => {
task.invoke();
this.macroTaskEnded();
if (takingTooLongTimeout != null) {
clearTimeout(takingTooLongTimeout);
takingTooLongTimeout = null;
}
if (hasTakenTooLong) {
console.warn("Long Running Macro Task is Finally Complete: ", label);
}
};
try {
const result = await request;
endTask();
return result;
} catch (ex) {
endTask();
throw ex;
}
}
wrapMacroTaskObservable<T>(
label: string,
request: Observable<T>,
warnIfTakingTooLongThreshold?: number | null,
isDoneOn?: IWaitForObservableIsDoneOn<T> | null,
stackTrace?: string | null,
): Observable<T> {
return this._wrapMacroTaskObservable(label, request, warnIfTakingTooLongThreshold, isDoneOn, true, stackTrace);
}
protected _wrapMacroTaskObservable<T>(
label: string,
request: Observable<T>,
warnIfTakingTooLongThreshold?: number | null,
isDoneOn?: IWaitForObservableIsDoneOn<T> | null,
isCounted: boolean = true,
stackTrace?: string | null,
): Observable<T> {
return of(null).pipe(
switchMap(() => {
let counts = 0;
let emitPredicate: (d: T) => boolean;
if (isDoneOn == null || isDoneOn === "complete") {
emitPredicate = alwaysFalse;
} else if (isDoneOn === "first-emit") {
emitPredicate = makeEmitCountPredicate(1);
} else if ("emitCount" in isDoneOn) {
emitPredicate = makeEmitCountPredicate(isDoneOn.emitCount);
} else if ("emitPredicate" in isDoneOn) {
emitPredicate = isDoneOn.emitPredicate;
} else {
console.warn("wrapMacroTaskObservable: Invalid isDoneOn value given. Defaulting to 'complete'.", isDoneOn);
emitPredicate = alwaysFalse;
}
if (typeof warnIfTakingTooLongThreshold !== "number") {
warnIfTakingTooLongThreshold = this.wrapMacroTaskTooLongWarningThreshold;
}
let task: MacroTask | null = null;
let takingTooLongTimeout: any = null;
let hasTakenTooLong = false;
const endTask = () => {
if (task != null) {
task.invoke();
task = null;
if (hasTakenTooLong) {
console.warn("Long Running Macro Task is Finally Complete: ", label);
}
}
this.macroTaskEnded(counts);
counts = 0;
if (takingTooLongTimeout != null) {
clearTimeout(takingTooLongTimeout);
takingTooLongTimeout = null;
}
};
const unsubSubject = new Subject();
function unsub() {
unsubSubject.next();
unsubSubject.complete();
}
return of(null)
.pipe(
tap(() => {
if (task == null) {
task = Zone.current.scheduleMacroTask("wrapMacroTaskObservable", () => {}, {}, () => {}, unsub);
}
if (isCounted) {
this.macroTaskStarted();
counts++;
}
if (warnIfTakingTooLongThreshold! > 0 && takingTooLongTimeout == null) {
takingTooLongTimeout = setTimeout(() => {
hasTakenTooLong = true;
clearTimeout(takingTooLongTimeout);
takingTooLongTimeout = null;
console.warn(
`wrapMacroTaskObservable: Observable is taking too long to complete. Longer than ${warnIfTakingTooLongThreshold}ms.`,
);
console.warn("Task Label: ", label);
if (stackTrace) {
console.warn("Task Stack Trace: ", stackTrace);
}
}, warnIfTakingTooLongThreshold!);
}
}),
)
.pipe(switchMap(() => request.pipe(takeUntil(unsubSubject))))
.pipe(
tap(v => {
if (task != null) {
if (emitPredicate(v)) {
endTask();
}
}
}),
)
.pipe(
finalize(() => {
endTask();
unsubSubject.complete();
}),
);
}),
);
}
protected macroTaskCount = new BehaviorSubject(0);
protected macroTaskStarted(counts: number = 1) {
const nextTaskCount = this.macroTaskCount.value + counts;
this.macroTaskCount.next(nextTaskCount);
}
protected macroTaskEnded(counts: number = 1) {
const nextTaskCount = this.macroTaskCount.value - counts;
this.macroTaskCount.next(nextTaskCount);
}
}
export type IWaitForObservableIsDoneOn<T = any> =
| "complete"
| "first-emit"
| { emitCount: number }
| { emitPredicate: (d: T) => boolean };
function makeEmitCountPredicate(emitCount: number) {
let count = 0;
return () => {
count++;
return count >= emitCount;
};
}
function alwaysFalse() {
return false;
}