Skip to content

Commit bc2ad7b

Browse files
alxhubAndrewKushnir
authored andcommitted
feat(core): support streaming resources (#59573)
This commit adds support for creating `resource()`s with streaming response data. A streaming resource is defined by a `stream` option instead of a `loader`, with `stream` being a function returning `Promise<Signal<{value: T}|{error: unknown}>>`. Once the streaming loader resolves to a `Signal`, it can continue to update that signal over time, and the values (or errors) will be delivered to via the resource's state. `rxResource()` is updated to leverage this new functionality to handle multiple responses from the underlying Observable. PR Close #59573
1 parent 67be7d2 commit bc2ad7b

File tree

7 files changed

+302
-79
lines changed

7 files changed

+302
-79
lines changed

goldens/public-api/core/index.api.md

Lines changed: 26 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -184,6 +184,13 @@ export interface AttributeDecorator {
184184
new (name: string): Attribute;
185185
}
186186

187+
// @public
188+
export interface BaseResourceOptions<T, R> {
189+
equal?: ValueEqualityFn<T>;
190+
injector?: Injector;
191+
request?: () => R;
192+
}
193+
187194
// @public
188195
export function booleanAttribute(value: unknown): boolean;
189196

@@ -1436,6 +1443,11 @@ export class PlatformRef {
14361443
// @public
14371444
export type Predicate<T> = (value: T) => boolean;
14381445

1446+
// @public
1447+
export interface PromiseResourceOptions<T, R> extends BaseResourceOptions<T, R> {
1448+
loader: ResourceLoader<T, R>;
1449+
}
1450+
14391451
// @public
14401452
export function provideAppInitializer(initializerFn: () => Observable<unknown> | Promise<unknown> | void): EnvironmentProviders;
14411453

@@ -1610,13 +1622,8 @@ export interface ResourceLoaderParams<R> {
16101622
request: Exclude<NoInfer<R>, undefined>;
16111623
}
16121624

1613-
// @public
1614-
export interface ResourceOptions<T, R> {
1615-
equal?: ValueEqualityFn<T>;
1616-
injector?: Injector;
1617-
loader: ResourceLoader<T, R>;
1618-
request?: () => R;
1619-
}
1625+
// @public (undocumented)
1626+
export type ResourceOptions<T, R> = PromiseResourceOptions<T, R> | StreamingResourceOptions<T, R>;
16201627

16211628
// @public
16221629
export interface ResourceRef<T> extends WritableResource<T> {
@@ -1633,6 +1640,13 @@ export enum ResourceStatus {
16331640
Resolved = 4
16341641
}
16351642

1643+
// @public
1644+
export type ResourceStreamingLoader<T, R> = (param: ResourceLoaderParams<R>) => PromiseLike<Signal<{
1645+
value: T;
1646+
} | {
1647+
error: unknown;
1648+
}>>;
1649+
16361650
// @public
16371651
export const RESPONSE_INIT: InjectionToken<ResponseInit | null>;
16381652

@@ -1747,6 +1761,11 @@ export interface StaticClassSansProvider {
17471761
// @public
17481762
export type StaticProvider = ValueProvider | ExistingProvider | StaticClassProvider | ConstructorProvider | FactoryProvider | any[];
17491763

1764+
// @public
1765+
export interface StreamingResourceOptions<T, R> extends BaseResourceOptions<T, R> {
1766+
stream: ResourceStreamingLoader<T, R>;
1767+
}
1768+
17501769
// @public
17511770
export abstract class TemplateRef<C> {
17521771
abstract createEmbeddedView(context: C, injector?: Injector): EmbeddedViewRef<C>;

goldens/public-api/core/rxjs-interop/index.api.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,14 +4,14 @@
44
55
```ts
66

7+
import { BaseResourceOptions } from '@angular/core';
78
import { DestroyRef } from '@angular/core';
89
import { Injector } from '@angular/core';
910
import { MonoTypeOperatorFunction } from 'rxjs';
1011
import { Observable } from 'rxjs';
1112
import { OutputOptions } from '@angular/core';
1213
import { OutputRef } from '@angular/core';
1314
import { ResourceLoaderParams } from '@angular/core';
14-
import { ResourceOptions } from '@angular/core';
1515
import { ResourceRef } from '@angular/core';
1616
import { Signal } from '@angular/core';
1717
import { Subscribable } from 'rxjs';
@@ -30,7 +30,7 @@ export function pendingUntilEvent<T>(injector?: Injector): MonoTypeOperatorFunct
3030
export function rxResource<T, R>(opts: RxResourceOptions<T, R>): ResourceRef<T | undefined>;
3131

3232
// @public
33-
export interface RxResourceOptions<T, R> extends Omit<ResourceOptions<T, R>, 'loader'> {
33+
export interface RxResourceOptions<T, R> extends BaseResourceOptions<T, R> {
3434
// (undocumented)
3535
loader: (params: ResourceLoaderParams<R>) => Observable<T>;
3636
}

packages/core/rxjs-interop/src/rx_resource.ts

Lines changed: 35 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -8,20 +8,21 @@
88

99
import {
1010
assertInInjectionContext,
11-
ResourceOptions,
1211
resource,
1312
ResourceLoaderParams,
1413
ResourceRef,
14+
Signal,
15+
signal,
16+
BaseResourceOptions,
1517
} from '@angular/core';
16-
import {Observable, Subject} from 'rxjs';
17-
import {take, takeUntil} from 'rxjs/operators';
18+
import {Observable, Subscription} from 'rxjs';
1819

1920
/**
2021
* Like `ResourceOptions` but uses an RxJS-based `loader`.
2122
*
2223
* @experimental
2324
*/
24-
export interface RxResourceOptions<T, R> extends Omit<ResourceOptions<T, R>, 'loader'> {
25+
export interface RxResourceOptions<T, R> extends BaseResourceOptions<T, R> {
2526
loader: (params: ResourceLoaderParams<R>) => Observable<T>;
2627
}
2728

@@ -35,22 +36,37 @@ export function rxResource<T, R>(opts: RxResourceOptions<T, R>): ResourceRef<T |
3536
opts?.injector || assertInInjectionContext(rxResource);
3637
return resource<T, R>({
3738
...opts,
38-
loader: (params) => {
39-
const cancelled = new Subject<void>();
40-
params.abortSignal.addEventListener('abort', () => cancelled.next());
41-
42-
// Note: this is identical to `firstValueFrom` which we can't use,
43-
// because at the time of writing, `core` still supports rxjs 6.x.
44-
return new Promise<T>((resolve, reject) => {
45-
opts
46-
.loader(params)
47-
.pipe(take(1), takeUntil(cancelled))
48-
.subscribe({
49-
next: resolve,
50-
error: reject,
51-
complete: () => reject(new Error('Resource completed before producing a value')),
52-
});
39+
stream: (params) => {
40+
let sub: Subscription;
41+
42+
// Track the abort listener so it can be removed if the Observable completes (as a memory
43+
// optimization).
44+
const onAbort = () => sub.unsubscribe();
45+
params.abortSignal.addEventListener('abort', onAbort);
46+
47+
// Start off stream as undefined.
48+
const stream = signal<{value: T} | {error: unknown}>({value: undefined as T});
49+
let resolve: ((value: Signal<{value: T} | {error: unknown}>) => void) | undefined;
50+
const promise = new Promise<Signal<{value: T} | {error: unknown}>>((r) => (resolve = r));
51+
52+
function send(value: {value: T} | {error: unknown}): void {
53+
stream.set(value);
54+
resolve?.(stream);
55+
resolve = undefined;
56+
}
57+
58+
sub = opts.loader(params).subscribe({
59+
next: (value) => send({value}),
60+
error: (error) => send({error}),
61+
complete: () => {
62+
if (resolve) {
63+
send({error: new Error('Resource completed before producing a value')});
64+
}
65+
params.abortSignal.removeEventListener('abort', onAbort);
66+
},
5367
});
68+
69+
return promise;
5470
},
5571
});
5672
}

packages/core/rxjs-interop/test/rx_resource_spec.ts

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
* found in the LICENSE file at https://angular.dev/license
77
*/
88

9-
import {of, Observable} from 'rxjs';
9+
import {of, Observable, BehaviorSubject} from 'rxjs';
1010
import {TestBed} from '@angular/core/testing';
1111
import {ApplicationRef, Injector, signal} from '@angular/core';
1212
import {rxResource} from '@angular/core/rxjs-interop';
@@ -55,6 +55,27 @@ describe('rxResource()', () => {
5555
await appRef.whenStable();
5656
expect(unsub).toBe(true);
5757
});
58+
59+
it('should stream when the loader returns multiple values', async () => {
60+
const injector = TestBed.inject(Injector);
61+
const appRef = TestBed.inject(ApplicationRef);
62+
const response = new BehaviorSubject(1);
63+
const res = rxResource({
64+
loader: () => response,
65+
injector,
66+
});
67+
await appRef.whenStable();
68+
expect(res.value()).toBe(1);
69+
70+
response.next(2);
71+
expect(res.value()).toBe(2);
72+
73+
response.next(3);
74+
expect(res.value()).toBe(3);
75+
76+
response.error('fail');
77+
expect(res.error()).toBe('fail');
78+
});
5879
});
5980

6081
async function waitFor(fn: () => boolean): Promise<void> {

packages/core/src/resource/api.ts

Lines changed: 40 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -160,12 +160,21 @@ export interface ResourceLoaderParams<R> {
160160
*/
161161
export type ResourceLoader<T, R> = (param: ResourceLoaderParams<R>) => PromiseLike<T>;
162162

163+
/**
164+
* Streaming loader for a `Resource`.
165+
*
166+
* @experimental
167+
*/
168+
export type ResourceStreamingLoader<T, R> = (
169+
param: ResourceLoaderParams<R>,
170+
) => PromiseLike<Signal<{value: T} | {error: unknown}>>;
171+
163172
/**
164173
* Options to the `resource` function, for creating a resource.
165174
*
166175
* @experimental
167176
*/
168-
export interface ResourceOptions<T, R> {
177+
export interface BaseResourceOptions<T, R> {
169178
/**
170179
* A reactive function which determines the request to be made. Whenever the request changes, the
171180
* loader will be triggered to fetch a new value for the resource.
@@ -174,11 +183,6 @@ export interface ResourceOptions<T, R> {
174183
*/
175184
request?: () => R;
176185

177-
/**
178-
* Loading function which returns a `Promise` of the resource's value for a given request.
179-
*/
180-
loader: ResourceLoader<T, R>;
181-
182186
/**
183187
* Equality function used to compare the return value of the loader.
184188
*/
@@ -189,3 +193,33 @@ export interface ResourceOptions<T, R> {
189193
*/
190194
injector?: Injector;
191195
}
196+
197+
/**
198+
* Options to the `resource` function, for creating a resource.
199+
*
200+
* @experimental
201+
*/
202+
export interface PromiseResourceOptions<T, R> extends BaseResourceOptions<T, R> {
203+
/**
204+
* Loading function which returns a `Promise` of the resource's value for a given request.
205+
*/
206+
loader: ResourceLoader<T, R>;
207+
}
208+
209+
/**
210+
* Options to the `resource` function, for creating a resource.
211+
*
212+
* @experimental
213+
*/
214+
export interface StreamingResourceOptions<T, R> extends BaseResourceOptions<T, R> {
215+
/**
216+
* Loading function which returns a `Promise` of a signal of the resource's value for a given
217+
* request, which can change over time as new values are received from a stream.
218+
*/
219+
stream: ResourceStreamingLoader<T, R>;
220+
}
221+
222+
/**
223+
* @experimental
224+
*/
225+
export type ResourceOptions<T, R> = PromiseResourceOptions<T, R> | StreamingResourceOptions<T, R>;

0 commit comments

Comments
 (0)