Skip to content

Commit 8997bdc

Browse files
alxhubatscott
authored andcommitted
feat(core): prototype implementation of @angular/core/rxjs-interop (#49154)
This commit adds the basic sketch for the implementation of `fromObservable` and `fromSignal`, the two basic primitives which form the RxJS interop layer with signals. PR Close #49154
1 parent 8b7707d commit 8997bdc

File tree

9 files changed

+479
-1
lines changed

9 files changed

+479
-1
lines changed

goldens/public-api/core/rxjs-interop/index.md

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,24 @@
44
55
```ts
66

7+
import { Injector } from '@angular/core';
8+
import { Observable } from 'rxjs';
9+
import { Signal } from '@angular/core';
10+
11+
// @public
12+
export function fromObservable<T>(source: Observable<T>): Signal<T>;
13+
14+
// @public
15+
export function fromObservable<T, U extends T | null | undefined>(source: Observable<T>, initialValue: U): Signal<T | U>;
16+
17+
// @public
18+
export function fromSignal<T>(source: Signal<T>, options?: FromSignalOptions): Observable<T>;
19+
20+
// @public
21+
export interface FromSignalOptions {
22+
injector?: Injector;
23+
}
24+
725
// (No @packageDocumentation comment for this package)
826

927
```
Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,118 @@
1+
/**
2+
* @license
3+
* Copyright Google LLC All Rights Reserved.
4+
*
5+
* Use of this source code is governed by an MIT-style license that can be
6+
* found in the LICENSE file at https://angular.io/license
7+
*/
8+
9+
import {assertInInjectionContext, computed, DestroyRef, inject, signal, Signal, WritableSignal} from '@angular/core';
10+
import {Observable} from 'rxjs';
11+
12+
/**
13+
* Get the current value of an `Observable` as a reactive `Signal`.
14+
*
15+
* `fromObservable` returns a `Signal` which provides synchronous reactive access to values produced
16+
* by the given `Observable`, by subscribing to that `Observable`. The returned `Signal` will always
17+
* have the most recent value emitted by the subscription, and will throw an error if the
18+
* `Observable` errors.
19+
*
20+
* The subscription will last for the lifetime of the current injection context. That is, if
21+
* `fromObservable` is called from a component context, the subscription will be cleaned up when the
22+
* component is destroyed. When called outside of a component, the current `EnvironmentInjector`'s
23+
* lifetime will be used (which is typically the lifetime of the application itself).
24+
*
25+
* If the `Observable` does not produce a value before the `Signal` is read, the `Signal` will throw
26+
* an error. To avoid this, use a synchronous `Observable` (potentially created with the `startWith`
27+
* operator) or pass an initial value to `fromObservable` as the second argument.
28+
*
29+
* `fromObservable` must be called in an injection context.
30+
*/
31+
export function fromObservable<T>(source: Observable<T>): Signal<T>;
32+
33+
/**
34+
* Get the current value of an `Observable` as a reactive `Signal`.
35+
*
36+
* `fromObservable` returns a `Signal` which provides synchronous reactive access to values produced
37+
* by the given `Observable`, by subscribing to that `Observable`. The returned `Signal` will always
38+
* have the most recent value emitted by the subscription, and will throw an error if the
39+
* `Observable` errors.
40+
*
41+
* The subscription will last for the lifetime of the current injection context. That is, if
42+
* `fromObservable` is called from a component context, the subscription will be cleaned up when the
43+
* component is destroyed. When called outside of a component, the current `EnvironmentInjector`'s
44+
* lifetime will be used (which is typically the lifetime of the application itself).
45+
*
46+
* Before the `Observable` emits its first value, the `Signal` will return the configured
47+
* `initialValue`. If the `Observable` is known to produce a value before the `Signal` will be read,
48+
* `initialValue` does not need to be passed.
49+
*
50+
* `fromObservable` must be called in an injection context.
51+
*
52+
* @developerPreview
53+
*/
54+
export function fromObservable<T, U extends T|null|undefined>(
55+
// fromObservable(Observable<Animal>) -> Signal<Cat>
56+
source: Observable<T>, initialValue: U): Signal<T|U>;
57+
export function fromObservable<T, U = never>(source: Observable<T>, initialValue?: U): Signal<T|U> {
58+
assertInInjectionContext(fromObservable);
59+
60+
// Note: T is the Observable value type, and U is the initial value type. They don't have to be
61+
// the same - the returned signal gives values of type `T`.
62+
let state: WritableSignal<State<T|U>>;
63+
if (initialValue === undefined && arguments.length !== 2) {
64+
// No initial value was passed, so initially the signal is in a `NoValue` state and will throw
65+
// if accessed.
66+
state = signal({kind: StateKind.NoValue});
67+
} else {
68+
// An initial value was passed, so use it.
69+
state = signal<State<T|U>>({kind: StateKind.Value, value: initialValue!});
70+
}
71+
72+
const sub = source.subscribe({
73+
next: value => state.set({kind: StateKind.Value, value}),
74+
error: error => state.set({kind: StateKind.Error, error}),
75+
// Completion of the Observable is meaningless to the signal. Signals don't have a concept of
76+
// "complete".
77+
});
78+
79+
// Unsubscribe when the current context is destroyed.
80+
inject(DestroyRef).onDestroy(sub.unsubscribe.bind(sub));
81+
82+
// The actual returned signal is a `computed` of the `State` signal, which maps the various states
83+
// to either values or errors.
84+
return computed(() => {
85+
const current = state();
86+
switch (current.kind) {
87+
case StateKind.Value:
88+
return current.value;
89+
case StateKind.Error:
90+
throw current.error;
91+
case StateKind.NoValue:
92+
// TODO(alxhub): use a RuntimeError when we finalize the error semantics
93+
throw new Error(`fromObservable() signal read before the Observable emitted`);
94+
}
95+
});
96+
}
97+
98+
const enum StateKind {
99+
NoValue,
100+
Value,
101+
Error,
102+
}
103+
104+
interface NoValueState {
105+
kind: StateKind.NoValue;
106+
}
107+
108+
interface ValueState<T> {
109+
kind: StateKind.Value;
110+
value: T;
111+
}
112+
113+
interface ErrorState {
114+
kind: StateKind.Error;
115+
error: unknown;
116+
}
117+
118+
type State<T> = NoValueState|ValueState<T>|ErrorState;
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
/**
2+
* @license
3+
* Copyright Google LLC All Rights Reserved.
4+
*
5+
* Use of this source code is governed by an MIT-style license that can be
6+
* found in the LICENSE file at https://angular.io/license
7+
*/
8+
9+
import {assertInInjectionContext, effect, inject, Injector, Signal} from '@angular/core';
10+
import {Observable} from 'rxjs';
11+
12+
/**
13+
* Options for `fromSignal`.
14+
*
15+
* @developerPreview
16+
*/
17+
export interface FromSignalOptions {
18+
/**
19+
* The `Injector` to use when creating the effect.
20+
*
21+
* If this isn't specified, the current injection context will be used.
22+
*/
23+
injector?: Injector;
24+
}
25+
26+
/**
27+
* Exposes the value of an Angular `Signal` as an RxJS `Observable`.
28+
*
29+
* The signal's value will be propagated into the `Observable`'s subscribers using an `effect`.
30+
*
31+
* `fromSignal` must be called in an injection context.
32+
*
33+
* @developerPreview
34+
*/
35+
export function fromSignal<T>(
36+
source: Signal<T>,
37+
options?: FromSignalOptions,
38+
): Observable<T> {
39+
!options?.injector && assertInInjectionContext(fromSignal);
40+
const injector = options?.injector ?? inject(Injector);
41+
42+
// Creating a new `Observable` allows the creation of the effect to be lazy. This allows for all
43+
// references to `source` to be dropped if the `Observable` is fully unsubscribed and thrown away.
44+
return new Observable(observer => {
45+
const watcher = effect(() => {
46+
try {
47+
observer.next(source());
48+
} catch (err) {
49+
observer.error(err);
50+
}
51+
}, {injector, manualCleanup: true});
52+
return () => watcher.destroy();
53+
});
54+
}

packages/core/rxjs-interop/src/index.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,4 +6,5 @@
66
* found in the LICENSE file at https://angular.io/license
77
*/
88

9-
export {};
9+
export {fromObservable} from './from_observable';
10+
export {fromSignal, FromSignalOptions} from './from_signal';
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
load("//tools:defaults.bzl", "jasmine_node_test", "karma_web_test_suite", "ts_library")
2+
load("//tools/circular_dependency_test:index.bzl", "circular_dependency_test")
3+
4+
circular_dependency_test(
5+
name = "circular_deps_test",
6+
entry_point = "angular/packages/core/rxjs-interop/index.mjs",
7+
deps = ["//packages/core/rxjs-interop"],
8+
)
9+
10+
ts_library(
11+
name = "test_lib",
12+
testonly = True,
13+
srcs = glob(["**/*.ts"]),
14+
deps = [
15+
"//packages:types",
16+
"//packages/core",
17+
"//packages/core/rxjs-interop",
18+
"//packages/core/src/signals",
19+
"//packages/core/testing",
20+
"//packages/private/testing",
21+
"@npm//rxjs",
22+
],
23+
)
24+
25+
jasmine_node_test(
26+
name = "test",
27+
bootstrap = ["//tools/testing:node"],
28+
deps = [
29+
":test_lib",
30+
],
31+
)
32+
33+
karma_web_test_suite(
34+
name = "test_web",
35+
deps = [
36+
":test_lib",
37+
],
38+
)
Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
/**
2+
* @license
3+
* Copyright Google LLC All Rights Reserved.
4+
*
5+
* Use of this source code is governed by an MIT-style license that can be
6+
* found in the LICENSE file at https://angular.io/license
7+
*/
8+
9+
import {EnvironmentInjector, Injector, runInInjectionContext} from '@angular/core';
10+
import {fromObservable} from '@angular/core/rxjs-interop';
11+
import {BehaviorSubject, Subject} from 'rxjs';
12+
13+
import {test} from './util';
14+
15+
describe('fromObservable()', () => {
16+
it('should reflect the last emitted value of an Observable', test(() => {
17+
const counter$ = new BehaviorSubject(0);
18+
const counter = fromObservable(counter$);
19+
20+
expect(counter()).toBe(0);
21+
counter$.next(1);
22+
expect(counter()).toBe(1);
23+
counter$.next(3);
24+
expect(counter()).toBe(3);
25+
}));
26+
27+
it('should notify when the last emitted value of an Observable changes', test(() => {
28+
let seenValue: number = 0;
29+
const counter$ = new BehaviorSubject(1);
30+
const counter = fromObservable(counter$);
31+
32+
expect(counter()).toBe(1);
33+
34+
counter$.next(2);
35+
expect(counter()).toBe(2);
36+
}));
37+
38+
it('should propagate an error returned by the Observable', test(() => {
39+
const counter$ = new BehaviorSubject(1);
40+
const counter = fromObservable(counter$);
41+
42+
expect(counter()).toBe(1);
43+
44+
counter$.error('fail');
45+
expect(counter).toThrow('fail');
46+
}));
47+
48+
it('should unsubscribe when the current context is destroyed', test(() => {
49+
const counter$ = new BehaviorSubject(0);
50+
const injector = Injector.create({providers: []}) as EnvironmentInjector;
51+
const counter = runInInjectionContext(injector, () => fromObservable(counter$));
52+
53+
expect(counter()).toBe(0);
54+
counter$.next(1);
55+
expect(counter()).toBe(1);
56+
57+
// Destroying the injector should unsubscribe the Observable.
58+
injector.destroy();
59+
60+
// The signal should have the last value observed.
61+
expect(counter()).toBe(1);
62+
63+
// And this value should no longer be updating (unsubscribed).
64+
counter$.next(2);
65+
expect(counter()).toBe(1);
66+
}));
67+
68+
describe('with no initial value', () => {
69+
it('should throw if called before a value is emitted', test(() => {
70+
const counter$ = new Subject<number>();
71+
const counter = fromObservable(counter$);
72+
73+
expect(() => counter()).toThrow();
74+
counter$.next(1);
75+
expect(counter()).toBe(1);
76+
}));
77+
78+
it('should not throw if a value is emitted before called', test(() => {
79+
const counter$ = new Subject<number>();
80+
const counter = fromObservable(counter$);
81+
82+
counter$.next(1);
83+
expect(() => counter()).not.toThrow();
84+
}));
85+
});
86+
87+
describe('with an initial value', () => {
88+
it('should return the initial value if called before a value is emitted', test(() => {
89+
const counter$ = new Subject<number>();
90+
const counter = fromObservable(counter$, null);
91+
92+
expect(counter()).toBeNull();
93+
counter$.next(1);
94+
expect(counter()).toBe(1);
95+
}));
96+
97+
it('should not return the initial value if called after a value is emitted', test(() => {
98+
const counter$ = new Subject<number>();
99+
const counter = fromObservable(counter$, null);
100+
101+
counter$.next(1);
102+
expect(counter()).not.toBeNull();
103+
}));
104+
});
105+
});

0 commit comments

Comments
 (0)