Skip to content

Commit 1c7b356

Browse files
arturovtthePunderWoman
authored andcommitted
fix(core): release hasPendingTasks observers (#59723)
In this commit, we unsubscribe the `hasPendingTasks` subject to remove all active observers and enable granular garbage collection, as users may forget to unsubscribe manually when subscribing to `isStable`. PR Close #59723
1 parent a07ee60 commit 1c7b356

File tree

10 files changed

+87
-47
lines changed

10 files changed

+87
-47
lines changed

goldens/public-api/core/index.api.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -143,7 +143,7 @@ export class ApplicationRef {
143143
get destroyed(): boolean;
144144
detachView(viewRef: ViewRef): void;
145145
get injector(): EnvironmentInjector;
146-
readonly isStable: Observable<boolean>;
146+
get isStable(): Observable<boolean>;
147147
onDestroy(callback: () => void): VoidFunction;
148148
tick(): void;
149149
get viewCount(): number;

packages/common/http/test/transfer_cache_spec.ts

Lines changed: 18 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,9 @@ describe('TransferCache', () => {
103103

104104
@Injectable()
105105
class ApplicationRefPatched extends ApplicationRef {
106-
override isStable = new BehaviorSubject<boolean>(false);
106+
override get isStable() {
107+
return isStable;
108+
}
107109
}
108110

109111
TestBed.configureTestingModule({
@@ -120,7 +122,6 @@ describe('TransferCache', () => {
120122

121123
const appRef = TestBed.inject(ApplicationRef);
122124
appRef.bootstrap(SomeComponent);
123-
isStable = appRef.isStable as BehaviorSubject<boolean>;
124125
}),
125126
);
126127

@@ -346,7 +347,9 @@ describe('TransferCache', () => {
346347

347348
@Injectable()
348349
class ApplicationRefPatched extends ApplicationRef {
349-
override isStable = new BehaviorSubject<boolean>(false);
350+
override get isStable() {
351+
return new BehaviorSubject<boolean>(false);
352+
}
350353
}
351354

352355
TestBed.configureTestingModule({
@@ -381,7 +384,9 @@ describe('TransferCache', () => {
381384

382385
@Injectable()
383386
class ApplicationRefPatched extends ApplicationRef {
384-
override isStable = new BehaviorSubject<boolean>(false);
387+
override get isStable() {
388+
return new BehaviorSubject<boolean>(false);
389+
}
385390
}
386391

387392
TestBed.configureTestingModule({
@@ -479,7 +484,9 @@ describe('TransferCache', () => {
479484

480485
@Injectable()
481486
class ApplicationRefPatched extends ApplicationRef {
482-
override isStable = new BehaviorSubject<boolean>(false);
487+
override get isStable() {
488+
return new BehaviorSubject<boolean>(false);
489+
}
483490
}
484491

485492
TestBed.configureTestingModule({
@@ -527,7 +534,9 @@ describe('TransferCache', () => {
527534

528535
@Injectable()
529536
class ApplicationRefPatched extends ApplicationRef {
530-
override isStable = new BehaviorSubject<boolean>(false);
537+
override get isStable() {
538+
return new BehaviorSubject<boolean>(false);
539+
}
531540
}
532541

533542
TestBed.configureTestingModule({
@@ -577,7 +586,9 @@ describe('TransferCache', () => {
577586

578587
@Injectable()
579588
class ApplicationRefPatched extends ApplicationRef {
580-
override isStable = new BehaviorSubject<boolean>(false);
589+
override get isStable() {
590+
return new BehaviorSubject<boolean>(false);
591+
}
581592
}
582593

583594
TestBed.configureTestingModule({

packages/core/rxjs-interop/test/pending_until_event_spec.ts

Lines changed: 17 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -40,10 +40,10 @@ describe('pendingUntilEvent', () => {
4040
it('should not block stability until subscription', async () => {
4141
const originalSource = new BehaviorSubject(0);
4242
const delayedSource = originalSource.pipe(delay(5), pendingUntilEvent(injector));
43-
expect(taskService.hasPendingTasks.value).toEqual(false);
43+
expect(taskService.hasPendingTasks).toEqual(false);
4444

4545
const emitPromise = firstValueFrom(delayedSource);
46-
expect(taskService.hasPendingTasks.value).toEqual(true);
46+
expect(taskService.hasPendingTasks).toEqual(true);
4747

4848
await expectAsync(emitPromise).toBeResolvedTo(0);
4949
await expectAsync(appRef.whenStable()).toBeResolved();
@@ -53,23 +53,23 @@ describe('pendingUntilEvent', () => {
5353
const source = of(1).pipe(pendingUntilEvent(injector));
5454

5555
// stable before subscription
56-
expect(taskService.hasPendingTasks.value).toEqual(false);
56+
expect(taskService.hasPendingTasks).toEqual(false);
5757
source.subscribe(() => {
5858
// unstable within synchronous subscription body
59-
expect(taskService.hasPendingTasks.value).toBe(true);
59+
expect(taskService.hasPendingTasks).toBe(true);
6060
});
6161
// stable after above synchronous subscription execution
6262
await expectAsync(appRef.whenStable()).toBeResolved();
6363
});
6464

6565
it('only blocks stability until first emit', async () => {
6666
const intervalSource = interval(5).pipe(pendingUntilEvent(injector));
67-
expect(taskService.hasPendingTasks.value).toEqual(false);
67+
expect(taskService.hasPendingTasks).toEqual(false);
6868

6969
await new Promise<void>(async (resolve) => {
7070
const subscription = intervalSource.subscribe(async (v) => {
7171
if (v === 0) {
72-
expect(taskService.hasPendingTasks.value).toBe(true);
72+
expect(taskService.hasPendingTasks).toBe(true);
7373
} else {
7474
await expectAsync(appRef.whenStable()).toBeResolved();
7575
}
@@ -78,22 +78,22 @@ describe('pendingUntilEvent', () => {
7878
resolve();
7979
}
8080
});
81-
expect(taskService.hasPendingTasks.value).toBe(true);
81+
expect(taskService.hasPendingTasks).toBe(true);
8282
});
8383
});
8484

8585
it('should unblock stability on complete (but no emit)', async () => {
8686
const sub = new Subject();
8787
sub.asObservable().pipe(pendingUntilEvent(injector)).subscribe();
88-
expect(taskService.hasPendingTasks.value).toBe(true);
88+
expect(taskService.hasPendingTasks).toBe(true);
8989
sub.complete();
9090
await expectAsync(appRef.whenStable()).toBeResolved();
9191
});
9292

9393
it('should unblock stability on unsubscribe before emit', async () => {
9494
const sub = new Subject();
9595
const subscription = sub.asObservable().pipe(pendingUntilEvent(injector)).subscribe();
96-
expect(taskService.hasPendingTasks.value).toBe(true);
96+
expect(taskService.hasPendingTasks).toBe(true);
9797
subscription.unsubscribe();
9898
await expectAsync(appRef.whenStable()).toBeResolved();
9999
});
@@ -111,12 +111,12 @@ describe('pendingUntilEvent', () => {
111111
.pipe(
112112
finalize(() => {
113113
finalizeExecuted = true;
114-
expect(taskService.hasPendingTasks.value).toBe(true);
114+
expect(taskService.hasPendingTasks).toBe(true);
115115
}),
116116
pendingUntilEvent(injector),
117117
)
118118
.subscribe();
119-
expect(taskService.hasPendingTasks.value).toBe(true);
119+
expect(taskService.hasPendingTasks).toBe(true);
120120
subscription.unsubscribe();
121121
await expectAsync(appRef.whenStable()).toBeResolved();
122122
expect(finalizeExecuted).toBe(true);
@@ -125,7 +125,7 @@ describe('pendingUntilEvent', () => {
125125
it('should not throw if application is destroyed before emit', async () => {
126126
const sub = new Subject<void>();
127127
sub.asObservable().pipe(pendingUntilEvent(injector)).subscribe();
128-
expect(taskService.hasPendingTasks.value).toBe(true);
128+
expect(taskService.hasPendingTasks).toBe(true);
129129
TestBed.resetTestingModule();
130130
await expectAsync(appRef.whenStable()).toBeResolved();
131131
sub.next();
@@ -141,7 +141,7 @@ describe('pendingUntilEvent', () => {
141141
catchError(() => EMPTY),
142142
)
143143
.subscribe();
144-
expect(taskService.hasPendingTasks.value).toBe(true);
144+
expect(taskService.hasPendingTasks).toBe(true);
145145
sub.error(new Error('error in pipe'));
146146
await expectAsync(appRef.whenStable()).toBeResolved();
147147
sub.next();
@@ -166,7 +166,7 @@ describe('pendingUntilEvent', () => {
166166
throw new Error('oh noes');
167167
},
168168
});
169-
expect(taskService.hasPendingTasks.value).toBe(true);
169+
expect(taskService.hasPendingTasks).toBe(true);
170170
const errorPromise = nextUncaughtError();
171171
sub.next();
172172
await expectAsync(errorPromise).toBeResolved();
@@ -229,13 +229,13 @@ describe('pendingUntilEvent', () => {
229229
const observable = sub.asObservable().pipe(delay(5), pendingUntilEvent(injector));
230230

231231
observable.subscribe();
232-
expect(taskService.hasPendingTasks.value).toBe(true);
232+
expect(taskService.hasPendingTasks).toBe(true);
233233
sub.next();
234234
observable.subscribe();
235235
// first subscription unblocks
236236
await new Promise((r) => setTimeout(r, 5));
237237
// still pending because the other subscribed after the emit
238-
expect(taskService.hasPendingTasks.value).toBe(true);
238+
expect(taskService.hasPendingTasks).toBe(true);
239239

240240
sub.next();
241241
await new Promise((r) => setTimeout(r, 3));
@@ -244,7 +244,7 @@ describe('pendingUntilEvent', () => {
244244
// second subscription unblocks
245245
await new Promise((r) => setTimeout(r, 2));
246246
// still pending because third subscription delay not finished
247-
expect(taskService.hasPendingTasks.value).toBe(true);
247+
expect(taskService.hasPendingTasks).toBe(true);
248248

249249
// finishes third subscription
250250
await new Promise((r) => setTimeout(r, 3));

packages/core/src/application/application_ref.ts

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ import '../util/ng_jit_mode';
1111
import '../util/ng_server_mode';
1212

1313
import {setActiveConsumer, setThrowInvalidWriteToSignalError} from '../../primitives/signals';
14-
import {Observable, Subject, Subscription} from 'rxjs';
14+
import {type Observable, Subject, type Subscription} from 'rxjs';
1515
import {map} from 'rxjs/operators';
1616

1717
import {ZONELESS_ENABLED} from '../change_detection/scheduling/zoneless_scheduling';
@@ -22,7 +22,7 @@ import {InjectionToken} from '../di/injection_token';
2222
import {Injector} from '../di/injector';
2323
import {EnvironmentInjector, type R3Injector} from '../di/r3_injector';
2424
import {formatRuntimeError, RuntimeError, RuntimeErrorCode} from '../errors';
25-
import {ErrorHandler, INTERNAL_APPLICATION_ERROR_HANDLER} from '../error_handler';
25+
import {INTERNAL_APPLICATION_ERROR_HANDLER} from '../error_handler';
2626
import {Type} from '../interface/type';
2727
import {ComponentFactory, ComponentRef} from '../linker/component_factory';
2828
import {ComponentFactoryResolver} from '../linker/component_factory_resolver';
@@ -334,12 +334,15 @@ export class ApplicationRef {
334334
*/
335335
public readonly components: ComponentRef<any>[] = [];
336336

337+
private internalPendingTask = inject(PendingTasksInternal);
338+
337339
/**
338340
* Returns an Observable that indicates when the application is stable or unstable.
339341
*/
340-
public readonly isStable: Observable<boolean> = inject(PendingTasksInternal).hasPendingTasks.pipe(
341-
map((pending) => !pending),
342-
);
342+
public get isStable(): Observable<boolean> {
343+
// This is a getter because it might be invoked after the application has been destroyed.
344+
return this.internalPendingTask.hasPendingTasksObservable.pipe(map((pending) => !pending));
345+
}
343346

344347
constructor() {
345348
// Inject the tracing service to initialize it.

packages/core/src/pending_tasks.ts

Lines changed: 38 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
* found in the LICENSE file at https://angular.dev/license
77
*/
88

9-
import {BehaviorSubject} from 'rxjs';
9+
import {BehaviorSubject, Observable} from 'rxjs';
1010

1111
import {inject} from './di/injector_compatibility';
1212
import {ɵɵdefineInjectable} from './di/interface/defs';
@@ -23,14 +23,35 @@ import {INTERNAL_APPLICATION_ERROR_HANDLER} from './error_handler';
2323
export class PendingTasksInternal implements OnDestroy {
2424
private taskId = 0;
2525
private pendingTasks = new Set<number>();
26-
private get _hasPendingTasks() {
27-
return this.hasPendingTasks.value;
26+
private destroyed = false;
27+
28+
private pendingTask = new BehaviorSubject<boolean>(false);
29+
30+
get hasPendingTasks(): boolean {
31+
// Accessing the value of a closed `BehaviorSubject` throws an error.
32+
return this.destroyed ? false : this.pendingTask.value;
33+
}
34+
35+
/**
36+
* In case the service is about to be destroyed, return a self-completing observable.
37+
* Otherwise, return the observable that emits the current state of pending tasks.
38+
*/
39+
get hasPendingTasksObservable(): Observable<boolean> {
40+
if (this.destroyed) {
41+
// Manually creating the observable pulls less symbols from RxJS than `of(false)`.
42+
return new Observable<boolean>((subscriber) => {
43+
subscriber.next(false);
44+
subscriber.complete();
45+
});
46+
}
47+
48+
return this.pendingTask;
2849
}
29-
hasPendingTasks = new BehaviorSubject<boolean>(false);
3050

3151
add(): number {
32-
if (!this._hasPendingTasks) {
33-
this.hasPendingTasks.next(true);
52+
// Emitting a value to a closed subject throws an error.
53+
if (!this.hasPendingTasks && !this.destroyed) {
54+
this.pendingTask.next(true);
3455
}
3556
const taskId = this.taskId++;
3657
this.pendingTasks.add(taskId);
@@ -43,16 +64,23 @@ export class PendingTasksInternal implements OnDestroy {
4364

4465
remove(taskId: number): void {
4566
this.pendingTasks.delete(taskId);
46-
if (this.pendingTasks.size === 0 && this._hasPendingTasks) {
47-
this.hasPendingTasks.next(false);
67+
if (this.pendingTasks.size === 0 && this.hasPendingTasks) {
68+
this.pendingTask.next(false);
4869
}
4970
}
5071

5172
ngOnDestroy(): void {
5273
this.pendingTasks.clear();
53-
if (this._hasPendingTasks) {
54-
this.hasPendingTasks.next(false);
74+
if (this.hasPendingTasks) {
75+
this.pendingTask.next(false);
5576
}
77+
// We call `unsubscribe()` to release observers, as users may forget to
78+
// unsubscribe manually when subscribing to `isStable`. We do not call
79+
// `complete()` because it is unsafe; if someone subscribes using the `first`
80+
// operator and the observable completes before emitting a value,
81+
// RxJS will throw an error.
82+
this.destroyed = true;
83+
this.pendingTask.unsubscribe();
5684
}
5785

5886
/** @nocollapse */

packages/core/test/acceptance/pending_tasks_spec.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,7 @@ function applicationRefIsStable(applicationRef: ApplicationRef) {
122122
function hasPendingTasks(pendingTasks: PendingTasksInternal): Promise<boolean> {
123123
return of(EMPTY)
124124
.pipe(
125-
withLatestFrom(pendingTasks.hasPendingTasks),
125+
withLatestFrom(pendingTasks.hasPendingTasksObservable),
126126
map(([_, hasPendingTasks]) => hasPendingTasks),
127127
)
128128
.toPromise() as Promise<boolean>;

packages/core/testing/src/component_fixture.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -208,7 +208,7 @@ export class ComponentFixture<T> {
208208
* yet.
209209
*/
210210
isStable(): boolean {
211-
return !this.pendingTasks.hasPendingTasks.value;
211+
return !this.pendingTasks.hasPendingTasks;
212212
}
213213

214214
/**

packages/platform-browser/test/hydration_spec.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,9 @@ describe('provideClientHydration', () => {
5050

5151
@Injectable()
5252
class ApplicationRefPatched extends ApplicationRef {
53-
override isStable = new BehaviorSubject<boolean>(false);
53+
override get isStable() {
54+
return new BehaviorSubject(false);
55+
}
5456
}
5557

5658
beforeEach(() => {

packages/platform-server/test/event_replay_spec.ts

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -419,8 +419,6 @@ describe('event replay', () => {
419419
await Promise.resolve();
420420

421421
expect(logs).toEqual([
422-
'isStable=false',
423-
'isStable=true',
424422
'isStable=false',
425423
// In the end, the application became stable while being destroyed.
426424
'isStable=true',

packages/platform-server/test/full_app_hydration_spec.ts

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7070,8 +7070,6 @@ describe('platform-server full application hydration integration', () => {
70707070
appRef.destroy();
70717071

70727072
expect(logs).toEqual([
7073-
'isStable=false',
7074-
'isStable=true',
70757073
'isStable=false',
70767074
// In the end, the application became stable while being destroyed.
70777075
'isStable=true',

0 commit comments

Comments
 (0)