Skip to content

Commit f5970bf

Browse files
mpodlasincartant
authored andcommitted
fix(mergeAll): add source subscription to composite before actually subscribing
Add subscriptions for source Observables to mergeAll composite subscription before actually subscribing to any of these Observables, so that if source Observable emits synchronously and consumer of mergeAll unsubscribes at that moment (for example `take` operator), subscription to source is unsubscribed as well and Observable stops emitting. Closes #2476
1 parent f9318d8 commit f5970bf

3 files changed

Lines changed: 52 additions & 13 deletions

File tree

spec/operators/mergeAll-spec.ts

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import { expect } from 'chai';
2-
import { mergeAll, mergeMap } from 'rxjs/operators';
2+
import { mergeAll, mergeMap, take } from 'rxjs/operators';
33
import { hot, cold, expectObservable, expectSubscriptions } from '../helpers/marble-testing';
44
import { throwError, from, of, Observable } from 'rxjs';
55

@@ -413,6 +413,36 @@ describe('mergeAll oeprator', () => {
413413
() => { done(new Error('should not be called')); });
414414
});
415415

416+
it('should finalize generators when merged if the subscription ends', () => {
417+
const iterable = {
418+
finalized: false,
419+
next() {
420+
return {value: 'duck', done: false};
421+
},
422+
return() {
423+
this.finalized = true;
424+
},
425+
[Symbol.iterator]() {
426+
return this;
427+
}
428+
};
429+
430+
const results: string[] = [];
431+
432+
const iterableObservable = from<string>(iterable as any);
433+
of(iterableObservable).pipe(
434+
mergeAll(),
435+
take(3)
436+
).subscribe(
437+
x => results.push(x),
438+
null,
439+
() => results.push('GOOSE!')
440+
);
441+
442+
expect(results).to.deep.equal(['duck', 'duck', 'duck', 'GOOSE!']);
443+
expect(iterable.finalized).to.be.true;
444+
});
445+
416446
type(() => {
417447
/* tslint:disable:no-unused-variable */
418448
const source1 = of(1, 2, 3);

src/internal/operators/mergeMap.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -139,7 +139,9 @@ export class MergeMapSubscriber<T, R> extends OuterSubscriber<T, R> {
139139
}
140140

141141
private _innerSub(ish: ObservableInput<R>, value: T, index: number): void {
142-
this.add(subscribeToResult<T, R>(this, ish, value, index));
142+
const innerSubscriber = new InnerSubscriber(this, undefined, undefined);
143+
this.add(innerSubscriber);
144+
subscribeToResult<T, R>(this, ish, value, index, innerSubscriber);
143145
}
144146

145147
protected _complete(): void {
Lines changed: 18 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,26 @@
1-
21
import { ObservableInput } from '../types';
32
import { Subscription } from '../Subscription';
43
import { InnerSubscriber } from '../InnerSubscriber';
54
import { OuterSubscriber } from '../OuterSubscriber';
5+
import { Subscriber } from '../Subscriber';
66
import { subscribeTo } from './subscribeTo';
77

8-
export function subscribeToResult<T, R>(outerSubscriber: OuterSubscriber<T, R>,
9-
result: any,
10-
outerValue?: T,
11-
outerIndex?: number): Subscription;
12-
export function subscribeToResult<T>(outerSubscriber: OuterSubscriber<any, any>,
13-
result: ObservableInput<T>,
14-
outerValue?: T,
15-
outerIndex?: number): Subscription | void {
16-
const destination = new InnerSubscriber(outerSubscriber, outerValue, outerIndex);
17-
8+
export function subscribeToResult<T, R>(
9+
outerSubscriber: OuterSubscriber<T, R>,
10+
result: any,
11+
outerValue?: T,
12+
outerIndex?: number,
13+
destination?: Subscriber<any>
14+
): Subscription;
15+
export function subscribeToResult<T, R>(
16+
outerSubscriber: OuterSubscriber<T, R>,
17+
result: any,
18+
outerValue?: T,
19+
outerIndex?: number,
20+
destination: Subscriber<any> = new InnerSubscriber(outerSubscriber, outerValue, outerIndex)
21+
): Subscription | void {
22+
if (destination.closed) {
23+
return;
24+
}
1825
return subscribeTo(result)(destination);
1926
}

0 commit comments

Comments
 (0)