-
Notifications
You must be signed in to change notification settings - Fork 7.6k
2.x: Fix cancel/dispose upon upstream switch for some operators #6258
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
Codecov Report
@@ Coverage Diff @@
## 2.x #6258 +/- ##
============================================
+ Coverage 98.22% 98.28% +0.05%
- Complexity 6203 6210 +7
============================================
Files 667 667
Lines 44889 44898 +9
Branches 6216 6219 +3
============================================
+ Hits 44093 44126 +33
+ Misses 254 242 -12
+ Partials 542 530 -12
Continue to review full report at Codecov.
|
|
David, can you please allow 2-3 days window for review on this one? |
|
Sure Artem. |
| child.onNext(t); | ||
| public void onNext(Object t) { | ||
| Subscription s = get(); | ||
| if (s != SubscriptionHelper.CANCELLED) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
offtopic: I'm wondering if we should drop SubscriptionHelper.isCancelled()
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If you want to inline all of them.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This PR extends the
SubscriptionArbiterto optionally allow or disallow cancelling the currentSubscriptionif it is replaced by a new one. Some operators do not need to cancel the currentSubscription:concat,concatMap,repeat,repeatWhen,retryandretryWhen.In addition
repeatWhenandretryWhenwere cancelling when the handler sequence itself terminated. The code has been updated to disconnect the upstream upon the completion/failure but before signaling the handler.The Reactive Streams specification also disallows synchronous cancellation after the terminal event anyway.
Others may actually need to cancel, such as
Timeout.Observables don't have a specific arbiter, they use theDisposableHelpermethods and the relevant ones were changed toreplace()instead of the disposingsetcall.Some tests actually checking if the dispose/cancel happens and had to be updated.
The
Flowable.delaySubscription(Publisher)also usedSubscriptionArbiterbut it was unnecessary. The code has been replaced with a more apt deferred requesting scheme as the downstream requests need to be delayed until the main subscription happens, the other publisher is always consumed unbounded.Resolves: #6259