Skip to content

Commit 57a240b

Browse files
alxhubthePunderWoman
authored andcommitted
refactor(core): support an opt-in sync version of toObservable (#60640)
This commit adds a flag `forceSyncFirstEmit` which opts in to the pending new behavior for `toObservable`, which emits the first value synchronously. This flag is only really meant for use during a short migration period while we update g3, and is not meant for prolonged usage. As a result, it's marked deprecated. PR Close #60640
1 parent d025f37 commit 57a240b

File tree

3 files changed

+154
-0
lines changed

3 files changed

+154
-0
lines changed

goldens/public-api/core/rxjs-interop/index.api.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,8 @@ export function toObservable<T>(source: Signal<T>, options?: ToObservableOptions
3939

4040
// @public
4141
export interface ToObservableOptions {
42+
// @deprecated
43+
forceSyncFirstEmit?: true;
4244
injector?: Injector;
4345
}
4446

packages/core/rxjs-interop/src/to_observable.ts

Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import {
1515
Signal,
1616
untracked,
1717
} from '../../src/core';
18+
import {SIGNAL, ReactiveNode} from '../../primitives/signals';
1819
import {Observable, ReplaySubject} from 'rxjs';
1920

2021
/**
@@ -30,6 +31,16 @@ export interface ToObservableOptions {
3031
* will be used.
3132
*/
3233
injector?: Injector;
34+
35+
/**
36+
* Temporary option for forcing a synchronous emit of the signal's initial value.
37+
*
38+
* This will eventually become the default behavior, but is opt-in to allow a short migration
39+
* period.
40+
*
41+
* @deprecated will become default behavior
42+
*/
43+
forceSyncFirstEmit?: true;
3344
}
3445

3546
/**
@@ -42,6 +53,10 @@ export interface ToObservableOptions {
4253
* @developerPreview
4354
*/
4455
export function toObservable<T>(source: Signal<T>, options?: ToObservableOptions): Observable<T> {
56+
if (options?.forceSyncFirstEmit === true) {
57+
return toObservableNext(source, options);
58+
}
59+
4560
!options?.injector && assertInInjectionContext(toObservable);
4661
const injector = options?.injector ?? inject(Injector);
4762
const subject = new ReplaySubject<T>(1);
@@ -67,3 +82,88 @@ export function toObservable<T>(source: Signal<T>, options?: ToObservableOptions
6782

6883
return subject.asObservable();
6984
}
85+
86+
/**
87+
* New version of `toObservable` with always-synchronous first emit.
88+
*
89+
* This will eventually replace the other implementation.
90+
*/
91+
function toObservableNext<T>(source: Signal<T>, options?: ToObservableOptions): Observable<T> {
92+
!options?.injector && assertInInjectionContext(toObservable);
93+
const injector = options?.injector ?? inject(Injector);
94+
95+
return new Observable<T>((subscriber) => {
96+
let firstVersion: number = -1;
97+
let firstValue: T;
98+
try {
99+
firstValue = untracked(source);
100+
} catch (err) {
101+
// A failure on the first read just errors the observable without
102+
// creating an effect.
103+
subscriber.error(err);
104+
return;
105+
}
106+
// We cache the `version` of the first value. This lets us avoid emitting
107+
// this value a second time during the `effect`.
108+
firstVersion = signalVersion(source);
109+
110+
// Emit the first value synchronously on subscription.
111+
subscriber.next(firstValue);
112+
113+
// Create an effect that will watch the signal for future changes.
114+
let firstEmit = true;
115+
const ref = effect(
116+
() => {
117+
let value: T;
118+
try {
119+
// Read the value (& track it).
120+
value = source();
121+
} catch (err) {
122+
// Errors cause the Observable stream to terminate.
123+
untracked(() => subscriber.error(err));
124+
cleanup(false);
125+
return;
126+
}
127+
128+
// Skip the emit of the value if it hasn't changed since the
129+
// synchronous emit.
130+
if (firstEmit) {
131+
firstEmit = false;
132+
if (signalVersion(source) === firstVersion) {
133+
return;
134+
}
135+
}
136+
137+
untracked(() => subscriber.next(value));
138+
},
139+
{injector, manualCleanup: true},
140+
);
141+
142+
const cleanup = (fromInjector: boolean) => {
143+
ref.destroy();
144+
145+
if (fromInjector) {
146+
// If we're cleaning up because the injector is destroyed, then our
147+
// subscription is still active and we need to complete it.
148+
subscriber.complete();
149+
} else {
150+
// Otherwise, remove the cleanup function. This both prevents the
151+
// complete() event from being produced and allows memory to be
152+
// reclaimed.
153+
removeInjectorCleanupFn();
154+
}
155+
};
156+
157+
const removeInjectorCleanupFn = injector.get(DestroyRef).onDestroy(() => {
158+
// Cleaning up from the `DestroyRef` means the stream is still active, so
159+
// we should emit completion.
160+
cleanup(true);
161+
});
162+
163+
return () => cleanup(false);
164+
});
165+
}
166+
167+
function signalVersion(source: Signal<unknown>): number {
168+
return (source[SIGNAL] as ReactiveNode).version;
169+
}

packages/core/rxjs-interop/test/to_observable_spec.ts

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -176,4 +176,56 @@ describe('toObservable()', () => {
176176
flushEffects();
177177
expect(emits()).toBe(1);
178178
});
179+
180+
describe('synchronous emit', () => {
181+
it('emits synchronously when requested', () => {
182+
const counter = signal(0);
183+
const log: number[] = [];
184+
toObservable(counter, {injector, forceSyncFirstEmit: true}).subscribe((value) =>
185+
log.push(value),
186+
);
187+
188+
expect(log).toEqual([0]);
189+
190+
// Expect no emit after the effect.
191+
flushEffects();
192+
expect(log).toEqual([0]);
193+
194+
counter.set(1);
195+
flushEffects();
196+
expect(log).toEqual([0, 1]);
197+
});
198+
199+
it('emits new values if they happen before the first effect run', () => {
200+
const counter = signal(0);
201+
const log: number[] = [];
202+
toObservable(counter, {injector, forceSyncFirstEmit: true}).subscribe((value) =>
203+
log.push(value),
204+
);
205+
206+
expect(log).toEqual([0]);
207+
counter.set(1);
208+
209+
// Expect new emit after the effect.
210+
flushEffects();
211+
expect(log).toEqual([0, 1]);
212+
});
213+
214+
it('emits new values based on version, not identity', () => {
215+
// Disable value equality checking.
216+
const counter = signal(0, {equal: () => false});
217+
const log: number[] = [];
218+
toObservable(counter, {injector, forceSyncFirstEmit: true}).subscribe((value) =>
219+
log.push(value),
220+
);
221+
222+
expect(log).toEqual([0]);
223+
// Because of the `equal` function, this will be treated as a new value.
224+
counter.set(0);
225+
226+
// Expect new emit after the effect.
227+
flushEffects();
228+
expect(log).toEqual([0, 0]);
229+
});
230+
});
179231
});

0 commit comments

Comments
 (0)