Reworking Multicasting: share, connect, and makeConnectable#5634
Reworking Multicasting: share, connect, and makeConnectable#5634benlesh merged 8 commits intoReactiveX:masterfrom
Conversation
|
Related: #3833 |
90a9b23 to
a4c11a3
Compare
|
Rebased |
|
Core team to review... revisit next meeting. |
|
This is blocked on #5729 |
There was a problem hiding this comment.
LGTM. Just some questions, a bunch of nitpicks and some names of which I am not a fan. 😅
Also, after this is rebased, the skipped firehose tests should be unskipped and made to pass. A contributor opened a PR to address the skipped tests, but I blocked the PR, as the changes were based on the old multicast infrastructure - see #5834
src/internal/operators/connect.ts
Outdated
| return lift(source, function (this: Subscriber<R>, source: Observable<T>) { | ||
| const subscriber = this; | ||
| let subject: Subject<T>; | ||
| try { | ||
| subject = connector(); | ||
| } catch (err) { | ||
| subscriber.error(err); | ||
| return; | ||
| } | ||
|
|
||
| let result: Observable<R>; | ||
| try { | ||
| result = from(setup(subject.asObservable())); | ||
| } catch (err) { | ||
| subscriber.error(err); | ||
| return; | ||
| } | ||
|
|
||
| const subscription = result.subscribe(subscriber); | ||
| subscription.add(source.subscribe(subject)); | ||
| return subscription; | ||
| }); |
There was a problem hiding this comment.
nitpick: This PR predates the smallification, so it should be updated to use operate. And the source.subscribe(subject) subscription should be added to the subscriber, etc.
src/internal/operators/connect.ts
Outdated
| setup, | ||
| }: { | ||
| connector?: () => Subject<T>; | ||
| setup: (shared: Observable<T>) => ObservableInput<R>; |
There was a problem hiding this comment.
nitpick: I'm not a fan of the setup property's name. TBH, I preferred selector. My main beef with setup is that, IMO, it doesn't really explain what the parameter does. Elsewhere in the API functions that are used in this manner are called selectors and I am a fan of consistency and precedent.
There was a problem hiding this comment.
selector, IMO, also doesn't explain what the parameter does, either. I mean, this sets up the multicast as much as it selects the output. Happy to bikeshed the name, but overall, I guess I'm just not sure what to name this. project is used elsewhere to mean a synchronous map-type function, and I'm not thrilled with its use in things like mergeMap.
There was a problem hiding this comment.
Maybe cast? or multicast? or??
There was a problem hiding this comment.
(I'm open to moving to back to the selector, though). I just don't like that it doesn't completely imply what's going on here. catchError is a little different, because it's only called when an error happens, and you have to "select" what to observe after that error shuts things down.
This function is required to setup the multicast. It also selects what sort of values are emitted from the result. I guess I'm now unhappy with either setup or select haha. But I do know I want it to be terse.
There was a problem hiding this comment.
This function is required to setup the multicast. It also selects what sort of values are emitted from the result.
Well, the function is "required to setup" the multicast, but it doesn't actually set it up. It receives the shared/multicast observable and then "selects what sort of values are emitted from the result".
That said, cast or multicast are okay, I suppose, but my preference is still for selector.
|
|
||
| result.connect = function () { | ||
| if (!connection) { | ||
| connection = defer(() => source).subscribe(connector); |
There was a problem hiding this comment.
question: Is there a reason for using defer here instead of from? source is a parameter and is never reassigned, so I don't see why this needs to be lazy.
| * @returns A "connectable" observable, that has a `connect()` method, that you must call to | ||
| * connect the source to all consumers through the subject provided as the connector. | ||
| */ | ||
| export function makeConnectable<T>(source: ObservableInput<T>, connector: Subject<T> = new Subject<T>()): ConnectableObservableLike<T> { |
There was a problem hiding this comment.
nitpick: Not a fan of the makeConnectable name. IMO, this seems odd, when looking at the other parts of the API. I mean, it's timer, not makeTimer or createTimer. My preference would be for it to be named connectable.
src/internal/operators/share.ts
Outdated
|
|
||
| function shareSubjectFactory() { | ||
| return new Subject<any>(); | ||
| interface ShareOptions<T, R> { |
There was a problem hiding this comment.
(very minor) nitpick: In other parts of the API, these types/parameters are referred to as 'config'. I think it should be one or the other and should be consistent throughout the API, but - again - this is very minor.
src/internal/operators/share.ts
Outdated
| hasCompleted = hasErrored = false; | ||
| }; | ||
|
|
||
| return lift(source, function (this: Subscriber<T | R>, source: Observable<T>) { |
There was a problem hiding this comment.
nitpick: With the changes made in the smallification, this should now used operate.
src/internal/operators/share.ts
Outdated
| subject = connector!(); | ||
| } | ||
|
|
||
| const castSubscription = subject.subscribe(subscriber); |
There was a problem hiding this comment.
nitpick: This seems like a weird variable name, IMO. Where is the cast?
src/internal/operators/share.ts
Outdated
| let { connector, resetOnComplete = true, resetOnError = true, resetOnUnsubscribe = true } = options; | ||
| if (!connector) { | ||
| connector = () => new Subject<T>(); | ||
| } |
There was a problem hiding this comment.
isnt this more consistent?
const {
connector = () => new Subject<T>(),
resetOnComplete = true,
resetOnError = true,
resetOnUnsubscribe = true,
} = options || {};
also shouldnt this be moved into the share functions block insteadof the operator function block? maybe even:
export function share<T, R>({
connector = () => new Subject<T>(),
resetOnComplete = true,
resetOnError = true,
resetOnUnsubscribe = true,
}: ShareOptions<T, R> = {}): OperatorFunction<T, T | R> {
// ...
}
also the type param R doesnt seem to be used really?
src/internal/operators/share.ts
Outdated
| * If false, when the number of subscribers to the resulting observable reaches zero due to unsubscription, the subject | ||
| * will remain connected to the source, and new subscriptions to the result will be connected through that same subject. | ||
| */ | ||
| resetOnUnsubscribe?: boolean; |
There was a problem hiding this comment.
i was under the assumption that a complete/error will always result in an implicit unsubscribe afterwards. its not reflected in this description that an unsubscribe that follows a complete/error, will not result in a reset.
There was a problem hiding this comment.
This means that the next time the resulting observable is subscribed to, a new subject will be created and the source will be subscribed to again.
This should also be added to the resetOnError/resetOnComplete descriptions. Maybe its worth to describe a reset independantly of these 3 configs in the share operator itself, so that the description of the reset params can be reduced to actual differences between them and therefore causing less mental friction.
src/internal/operators/share.ts
Outdated
|
|
||
| function shareSubjectFactory() { | ||
| return new Subject<any>(); | ||
| interface ShareOptions<T, R> { |
There was a problem hiding this comment.
is it worth to add a resetNotifier$ in order to have more fine grained control over when resets happen?
There was a problem hiding this comment.
You could technically do that by composing takeUntil
source$.pipe(
takeUntil(resetNotifier$),
share(),
repeat(),
)
just throwing in other alternatives: |
a4c11a3 to
b43d28f
Compare
Adds a feature that allows two observables to be tested for equality of output.
…Like`. BREAKING CHANGE: The TypeScript type `Subscribable` now only supports what is a valid return for `[Symbol.observable]()`. BREAKING CHANGE: The TypeScript type `Observer` no longer incorrectly has an optional `closed` property.
b43d28f to
d760a6c
Compare
|
Okay, I've made a few changes to this. Per @cartant's request, OTHER VERY IMPORTANT THINGS:
TODO
|
cartant
left a comment
There was a problem hiding this comment.
Some things that might or might not be issues and some nitpicks and comments/thoughts, etc. No major issues, AFAICT.
src/internal/operators/connect.ts
Outdated
| setup, | ||
| }: { | ||
| connector?: () => Subject<T>; | ||
| setup: (shared: Observable<T>) => ObservableInput<R>; |
There was a problem hiding this comment.
This function is required to setup the multicast. It also selects what sort of values are emitted from the result.
Well, the function is "required to setup" the multicast, but it doesn't actually set it up. It receives the shared/multicast observable and then "selects what sort of values are emitted from the result".
That said, cast or multicast are okay, I suppose, but my preference is still for selector.
|
Requesting rereview. Note that the change to make |
|
🎉 |
I'd like to do this instead of what I was doing over at #5432 ..
Overview
shareconfigurable, such that it can be used to create things likeshareReplayor people can configure the behavior however they like. If they want ashareBehaviororshareLast, they can easily make it work however they like.connectoperator that does the "selector" version ofmulticast,publish, et al. (this also fixes a broken error behavior inpublishReplay).makeConnectablefunction that creates aConnectableObservableLike(instead ofConnectableObservable).What this sets us up for:
We'd have
share(NOTE: I'll leaveshareReplayas just a wrapper around this, as seen in this PR)connectconnectableInstead of
sharemulticastpublishpublishLastpublishReplaypublishBehaviorrefCountshareReplayConnectableObservableAll "operators" at that point would actually only return
Observableand neverConnectableObservable, nor would any requireConnectableObservableas a source (such as withrefCount).TODO:
Other thoughts
It's possible that a better API for
makeConnectablemight be to return a tuple with aconnectfunction. Like so:How this "replaces" certain things
Well, the I find the prevalence of
publishReplay(1), refCount()very disturbing. For one thing, I doubt many folks using this are aware of what this does completely. It's not retryable, it's not replayable, etc."But Ben, that's more code!"... yes, and it's explicit code that tells you exactly what you're doing there. That's not retryable, it's not repeatable... it will never reset internally. It also allows the author more control without having to know implementation details of
publishReplay. Needing to know implementation details of our multicasting operators has been a sore point of this library, IMO.NO WORRIES:
a$.pipe(share())will work exactly as it does now. (Notice those tests didn't change)