[data] Unify bundle queue abstraction#59093
[data] Unify bundle queue abstraction#59093iamjustinhsu wants to merge 58 commits intoray-project:masterfrom
Conversation
Signed-off-by: iamjustinhsu <jhsu@anyscale.com>
There was a problem hiding this comment.
Code Review
This pull request introduces a BaseBundleQueue abstraction to unify the various queue implementations within Ray Data operators. This is a significant and valuable refactoring that simplifies operator design and centralizes queue metrics. The overall approach is solid, and the new queue implementations in fifo.py, hash_link.py, and ordered.py are well-designed.
However, I've found several issues with the implementation that need to be addressed:
- There are a few critical correctness bugs, including a missing import in
bundle_queue/__init__.pyand incorrect buffer handling inOutputSplitter. - The rebundling queues (
RebundleQueueandExactRebundleQueue) have issues: one violates thepeek_nextcontract by having side effects, and the other has a metric leak inget_next_with_original. - The transition to queue-based metrics is incomplete in
MapOperator, leading to redundant and confusing metric tracking. - There are also some minor issues with incorrect type hints and misleading docstrings.
I've left detailed comments on each of these points. Addressing them will ensure the new abstraction is robust and correctly implemented.
python/ray/data/_internal/execution/bundle_queue/bundler_exact.py
Outdated
Show resolved
Hide resolved
python/ray/data/_internal/execution/operators/output_splitter.py
Outdated
Show resolved
Hide resolved
python/ray/data/_internal/execution/operators/actor_pool_map_operator.py
Outdated
Show resolved
Hide resolved
python/ray/data/_internal/execution/bundle_queue/bundler_exact.py
Outdated
Show resolved
Hide resolved
Signed-off-by: iamjustinhsu <jhsu@anyscale.com>
Signed-off-by: iamjustinhsu <jhsu@anyscale.com>
python/ray/data/_internal/execution/bundle_queue/bundler_exact.py
Outdated
Show resolved
Hide resolved
Signed-off-by: iamjustinhsu <jhsu@anyscale.com>
Signed-off-by: iamjustinhsu <jhsu@anyscale.com>
Signed-off-by: iamjustinhsu <jhsu@anyscale.com>
Signed-off-by: iamjustinhsu <jhsu@anyscale.com>
omatthew98
left a comment
There was a problem hiding this comment.
Mostly left comments surrounding the high level method names / APIs. I think solid start but some opportunities for tightening the naming up since we are already doing a big shake up. LMK if you need another review.
| @@ -1,75 +0,0 @@ | |||
| import abc | |||
There was a problem hiding this comment.
Is there a reason to fully scrap BundleQueue in favor of BaseBundleQueue?
There was a problem hiding this comment.
Oh, it's mostly a rename into the base.py (i'm gonna make clear what is renamed and what is not after this comment), the only difference is that the base also contains a method called done_adding_bundles. This method is used for BlockRefBundler, StreamingRepartitionRefBunlder, and OrderedOutputQueue
| """Protocol for storing bundles AND supporting remove(bundle) | ||
| and contains(bundle) operations quickly.""" | ||
|
|
||
| def __contains__(self, bundle: RefBundle) -> bool: |
There was a problem hiding this comment.
Is __contains__ not supported for the other queues? Should we maybe specify in the comment that this should both implement the method and be efficient or there might be degraded performance?
There was a problem hiding this comment.
yea it can be supported for most queues, but I opted to not move it into the base implementation because
- our existing code infrequently uses
__contains__ BlockRefBundlerandStreamingRefBundlerhave weird semantics for__contains__, because one moment the bundle is there, and the next it's gone due to "rebundling".
On that note of bullet # 2, rather than haveSupportsRebundlingbe a protocol, we can have it becomeSupportsRebundling(BaseBundleQueue), which only supports rebundling, and won't contain a__contains__method. Haven't thought too deeply into this approach
There was a problem hiding this comment.
Ok yeah the current approach seems reasonable to me then. I think fine to keep the protocols as is rather than creating a more piecemeal approach where we have very tiny protocols.
Signed-off-by: iamjustinhsu <jhsu@anyscale.com>
Signed-off-by: iamjustinhsu <jhsu@anyscale.com>
Signed-off-by: iamjustinhsu <jhsu@anyscale.com>
2ddb01b to
1a58f57
Compare
Signed-off-by: iamjustinhsu <jhsu@anyscale.com>
Signed-off-by: iamjustinhsu <jhsu@anyscale.com>
Signed-off-by: iamjustinhsu <jhsu@anyscale.com>
| "This is a bug in the Ray Data code." | ||
| ) | ||
| remaining_bundle: Optional[RefBundle] = None | ||
| if rows_needed < last_pending_bundle.num_rows(): |
There was a problem hiding this comment.
TypeError when comparing with None num_rows result
Low Severity
The comparison rows_needed < last_pending_bundle.num_rows() at line 183 and the assertion rows_needed == last_pending_bundle.num_rows() at line 187 directly call num_rows() which can return Optional[int]. If num_rows() returns None, these comparisons would raise a TypeError. While the built-in strategies (EstimateSize, ExactMultipleSize) protect against this via earlier assertions, custom RebundlingStrategy implementations could trigger this error. The comparisons could defensively handle None using or 0 like other parts of the codebase.
Additional Locations (1)
There was a problem hiding this comment.
if it's None, that means we default to 0. The bundle can't have 0 rows because we know it triggered a ready bundle.
Signed-off-by: iamjustinhsu <jhsu@anyscale.com>
Signed-off-by: iamjustinhsu <jhsu@anyscale.com>
## Description Old PR: #59093 ## Related issues ## Additional information --------- Signed-off-by: iamjustinhsu <jhsu@anyscale.com>
## Description Old PR: ray-project#59093 ## Related issues ## Additional information --------- Signed-off-by: iamjustinhsu <jhsu@anyscale.com> Signed-off-by: Limark Dcunha <limarkdcunha@gmail.com>
## Description Old PR: ray-project#59093 ## Related issues ## Additional information --------- Signed-off-by: iamjustinhsu <jhsu@anyscale.com> Signed-off-by: jeffery4011 <jefferyshen1015@gmail.com>
## Description Moves OrderedBundleQueue to `bundle_queue/` directory, and rebases off of the `BaseBundleQueue` class. Creates a `FIFOBundleQueue` for simple ref bundle queues. Old PR: #59093 There are about ~400 lines of tests, so not actually that big ## Related issues ## Additional information --------- Signed-off-by: iamjustinhsu <jhsu@anyscale.com>
## Description Moves OrderedBundleQueue to `bundle_queue/` directory, and rebases off of the `BaseBundleQueue` class. Creates a `FIFOBundleQueue` for simple ref bundle queues. Old PR: ray-project#59093 There are about ~400 lines of tests, so not actually that big ## Related issues ## Additional information --------- Signed-off-by: iamjustinhsu <jhsu@anyscale.com>
## Description Moves OrderedBundleQueue to `bundle_queue/` directory, and rebases off of the `BaseBundleQueue` class. Creates a `FIFOBundleQueue` for simple ref bundle queues. Old PR: ray-project#59093 There are about ~400 lines of tests, so not actually that big ## Related issues ## Additional information --------- Signed-off-by: iamjustinhsu <jhsu@anyscale.com> Signed-off-by: jinbum-kim <jinbum9958@gmail.com>
## Description Moves OrderedBundleQueue to `bundle_queue/` directory, and rebases off of the `BaseBundleQueue` class. Creates a `FIFOBundleQueue` for simple ref bundle queues. Old PR: ray-project#59093 There are about ~400 lines of tests, so not actually that big ## Related issues ## Additional information --------- Signed-off-by: iamjustinhsu <jhsu@anyscale.com> Signed-off-by: 400Ping <jiekaichang@apache.org>
## Description Old PR: ray-project#59093 ## Related issues ## Additional information --------- Signed-off-by: iamjustinhsu <jhsu@anyscale.com>
## Description Old PR: ray-project#59093 ## Related issues ## Additional information --------- Signed-off-by: iamjustinhsu <jhsu@anyscale.com> Signed-off-by: peterxcli <peterxcli@gmail.com>
## Description Moves OrderedBundleQueue to `bundle_queue/` directory, and rebases off of the `BaseBundleQueue` class. Creates a `FIFOBundleQueue` for simple ref bundle queues. Old PR: ray-project#59093 There are about ~400 lines of tests, so not actually that big ## Related issues ## Additional information --------- Signed-off-by: iamjustinhsu <jhsu@anyscale.com> Signed-off-by: peterxcli <peterxcli@gmail.com>
## Description Old PR: ray-project#59093 ## Related issues ## Additional information --------- Signed-off-by: iamjustinhsu <jhsu@anyscale.com> Signed-off-by: peterxcli <peterxcli@gmail.com>
## Description Moves OrderedBundleQueue to `bundle_queue/` directory, and rebases off of the `BaseBundleQueue` class. Creates a `FIFOBundleQueue` for simple ref bundle queues. Old PR: ray-project#59093 There are about ~400 lines of tests, so not actually that big ## Related issues ## Additional information --------- Signed-off-by: iamjustinhsu <jhsu@anyscale.com> Signed-off-by: peterxcli <peterxcli@gmail.com>
Description
Currently we support many types of bundle queues inside operators:
BlockRefBundlerFIFOBundleQueueUnorderedOutputQueueOrderedOutputQueueList[RefBundle]Deque[RefBundle]StreamingRepartitionRefBundlerAll these independent classes makes it very awkward to record queue metrics within operators. In fact, operators themselves have to manage it themselves. This PR introduces a base class called
BaseBundleQueuethat hopes to unify all the independent classes together.NOTE:
I mostly refractored and moved stuff into new files, so that's why the lines are long. However, I did make some changes to all the existing bundle queues, namely their
get_next()method to support apeakmethod in the base class. I will add tests for these once finalized.With this approach, I hope to
InternalQueueOperatorMixin, that will automatically get size of queues, as well as clearing queues for free in the mixinBaseBundleQueuethat storesRefBundles.Future Work
This PR is a stepping stone to completely refractor how metrics are collected, stored, and exported. I'd like to refractor
OpRuntimeMetricsbecause it assumes that all operators areMapOperators, which isn't true. After this PR, I have a few proposals to refractorOpRuntimeMetricsBaseOpMetricsclass, so that individual operators can overrideBaseOpMetrics. For operators with internal queues, we can automatically hook inQueueOpMetrics(BaseOpMetrics)too_StatsActordirectly through a something like aMetricsRegistryinterface. This will require a lot of work, and I will have to repurpose the_StatsActorto read from that Registry._StatsActorand_StatsManager, but I'm not sure if that is very clean.Update on 12/2
Here is a summary of extensions

Update on 12/29
New summary of extensions

Tests
TBD
Related issues
None
Additional information
None