Skip to content

Commit f1048a8

Browse files
author
Spencer
authored
[core/utils] add shareWeakReplay() operator (#23333) (#23370)
1 parent 61b67e3 commit f1048a8

3 files changed

Lines changed: 310 additions & 0 deletions

File tree

src/core/public/utils/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,3 +18,4 @@
1818
*/
1919

2020
export { modifyUrl } from './modify_url';
21+
export { shareWeakReplay } from './share_weak_replay';
Lines changed: 243 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,243 @@
1+
/*
2+
* Licensed to Elasticsearch B.V. under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch B.V. licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
import * as Rx from 'rxjs';
21+
import { map, materialize, take, toArray } from 'rxjs/operators';
22+
23+
import { shareWeakReplay } from './share_weak_replay';
24+
25+
let completedCounts = 0;
26+
27+
function counter({ async = true }: { async?: boolean } = {}) {
28+
let subCounter = 0;
29+
30+
function sendCount(subscriber: Rx.Subscriber<string>) {
31+
let notifCounter = 0;
32+
const sub = ++subCounter;
33+
34+
while (!subscriber.closed) {
35+
subscriber.next(`${sub}:${++notifCounter}`);
36+
}
37+
38+
completedCounts += 1;
39+
}
40+
41+
return new Rx.Observable<string>(subscriber => {
42+
if (!async) {
43+
sendCount(subscriber);
44+
return;
45+
}
46+
47+
const id = setTimeout(() => sendCount(subscriber));
48+
return () => clearTimeout(id);
49+
});
50+
}
51+
52+
async function record<T>(observable: Rx.Observable<T>) {
53+
return observable
54+
.pipe(
55+
materialize(),
56+
map(n => (n.kind === 'N' ? `N:${n.value}` : n.kind === 'E' ? `E:${n.error.message}` : 'C')),
57+
toArray()
58+
)
59+
.toPromise();
60+
}
61+
62+
afterEach(() => {
63+
completedCounts = 0;
64+
});
65+
66+
it('multicasts an observable to multiple children, unsubs once all children do, and resubscribes on next subscription', async () => {
67+
const shared = counter().pipe(shareWeakReplay(1));
68+
69+
await expect(Promise.all([record(shared.pipe(take(1))), record(shared.pipe(take(2)))])).resolves
70+
.toMatchInlineSnapshot(`
71+
Array [
72+
Array [
73+
"N:1:1",
74+
"C",
75+
],
76+
Array [
77+
"N:1:1",
78+
"N:1:2",
79+
"C",
80+
],
81+
]
82+
`);
83+
84+
await expect(Promise.all([record(shared.pipe(take(3))), record(shared.pipe(take(4)))])).resolves
85+
.toMatchInlineSnapshot(`
86+
Array [
87+
Array [
88+
"N:2:1",
89+
"N:2:2",
90+
"N:2:3",
91+
"C",
92+
],
93+
Array [
94+
"N:2:1",
95+
"N:2:2",
96+
"N:2:3",
97+
"N:2:4",
98+
"C",
99+
],
100+
]
101+
`);
102+
103+
expect(completedCounts).toBe(2);
104+
});
105+
106+
it('resubscribes if parent errors', async () => {
107+
let errorCounter = 0;
108+
const shared = counter().pipe(
109+
map((v, i) => {
110+
if (i === 3) {
111+
throw new Error(`error ${++errorCounter}`);
112+
}
113+
return v;
114+
}),
115+
shareWeakReplay(2)
116+
);
117+
118+
await expect(Promise.all([record(shared), record(shared)])).resolves.toMatchInlineSnapshot(`
119+
Array [
120+
Array [
121+
"N:1:1",
122+
"N:1:2",
123+
"N:1:3",
124+
"E:error 1",
125+
],
126+
Array [
127+
"N:1:1",
128+
"N:1:2",
129+
"N:1:3",
130+
"E:error 1",
131+
],
132+
]
133+
`);
134+
135+
await expect(Promise.all([record(shared), record(shared)])).resolves.toMatchInlineSnapshot(`
136+
Array [
137+
Array [
138+
"N:2:1",
139+
"N:2:2",
140+
"N:2:3",
141+
"E:error 2",
142+
],
143+
Array [
144+
"N:2:1",
145+
"N:2:2",
146+
"N:2:3",
147+
"E:error 2",
148+
],
149+
]
150+
`);
151+
152+
expect(completedCounts).toBe(2);
153+
});
154+
155+
it('resubscribes if parent completes', async () => {
156+
const shared = counter().pipe(
157+
take(4),
158+
shareWeakReplay(4)
159+
);
160+
161+
await expect(Promise.all([record(shared.pipe(take(1))), record(shared)])).resolves
162+
.toMatchInlineSnapshot(`
163+
Array [
164+
Array [
165+
"N:1:1",
166+
"C",
167+
],
168+
Array [
169+
"N:1:1",
170+
"N:1:2",
171+
"N:1:3",
172+
"N:1:4",
173+
"C",
174+
],
175+
]
176+
`);
177+
178+
await expect(Promise.all([record(shared.pipe(take(2))), record(shared)])).resolves
179+
.toMatchInlineSnapshot(`
180+
Array [
181+
Array [
182+
"N:2:1",
183+
"N:2:2",
184+
"C",
185+
],
186+
Array [
187+
"N:2:1",
188+
"N:2:2",
189+
"N:2:3",
190+
"N:2:4",
191+
"C",
192+
],
193+
]
194+
`);
195+
196+
expect(completedCounts).toBe(2);
197+
});
198+
199+
it('supports parents that complete synchronously', async () => {
200+
const next = jest.fn();
201+
const complete = jest.fn();
202+
const shared = counter({ async: false }).pipe(
203+
take(3),
204+
shareWeakReplay(1)
205+
);
206+
207+
shared.subscribe({ next, complete });
208+
expect(next.mock.calls).toMatchInlineSnapshot(`
209+
Array [
210+
Array [
211+
"1:1",
212+
],
213+
Array [
214+
"1:2",
215+
],
216+
Array [
217+
"1:3",
218+
],
219+
]
220+
`);
221+
expect(complete).toHaveBeenCalledTimes(1);
222+
223+
next.mockClear();
224+
complete.mockClear();
225+
226+
shared.subscribe({ next, complete });
227+
expect(next.mock.calls).toMatchInlineSnapshot(`
228+
Array [
229+
Array [
230+
"2:1",
231+
],
232+
Array [
233+
"2:2",
234+
],
235+
Array [
236+
"2:3",
237+
],
238+
]
239+
`);
240+
expect(complete).toHaveBeenCalledTimes(1);
241+
242+
expect(completedCounts).toBe(2);
243+
});
Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
/*
2+
* Licensed to Elasticsearch B.V. under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch B.V. licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
import * as Rx from 'rxjs';
21+
import { takeUntil } from 'rxjs/operators';
22+
23+
/**
24+
* Just like the [`shareReplay()`](https://rxjs-dev.firebaseapp.com/api/operators/shareReplay) operator from
25+
* RxJS except for a few key differences:
26+
*
27+
* - If all downstream subscribers unsubscribe the source subscription will be unsubscribed.
28+
*
29+
* - Replay-ability is only maintained while the source is active, if it completes or errors
30+
* then complete/error is sent to the current subscribers and the replay buffer is cleared.
31+
*
32+
* - Any subscription after the the source completes or errors will create a new subscription
33+
* to the source observable.
34+
*
35+
* @param bufferSize Optional, default is `Number.POSITIVE_INFINITY`
36+
*/
37+
export function shareWeakReplay<T>(bufferSize?: number): Rx.MonoTypeOperatorFunction<T> {
38+
return (source: Rx.Observable<T>) => {
39+
let subject: Rx.ReplaySubject<T> | undefined;
40+
const stop$ = new Rx.Subject();
41+
42+
return new Rx.Observable(observer => {
43+
if (!subject) {
44+
subject = new Rx.ReplaySubject<T>(bufferSize);
45+
}
46+
47+
subject.subscribe(observer).add(() => {
48+
if (!subject) {
49+
return;
50+
}
51+
52+
if (subject.observers.length === 0) {
53+
stop$.next();
54+
}
55+
56+
if (subject.closed || subject.isStopped) {
57+
subject = undefined;
58+
}
59+
});
60+
61+
if (subject && subject.observers.length === 1) {
62+
source.pipe(takeUntil(stop$)).subscribe(subject);
63+
}
64+
});
65+
};
66+
}

0 commit comments

Comments
 (0)