[data] Unify bundle queue abstraction #59093

Open
iamjustinhsu wants to merge 58 commits into ray-project:master from iamjustinhsu:jhsu/unify-internal-queue-abstraction

@iamjustinhsu iamjustinhsu commented Dec 1, 2025

Description

Currently we support many types of bundle queues inside operators:

  • BlockRefBundler
  • FIFOBundleQueue
  • UnorderedOutputQueue
  • OrderedOutputQueue
  • List[RefBundle]
  • Deque[RefBundle]
  • StreamingRepartitionRefBundler

All these independent classes make it awkward to record queue metrics within operators; in fact, each operator has to track them itself. This PR introduces a base class, BaseBundleQueue, that aims to unify them.

class BaseBundleQueue:
    """Base class for storing bundles."""

    @abc.abstractmethod
    def add(self, bundle: RefBundle, **kwargs: Any) -> None:
        ...

    @abc.abstractmethod
    def get_next(self) -> RefBundle:
        ...

    @abc.abstractmethod
    def peek_next(self) -> Optional[RefBundle]:
        ...

    @abc.abstractmethod
    def has_next(self) -> bool:
        ...

    @abc.abstractmethod
    def clear(self):
        ...

    @abc.abstractmethod
    def finalize(self, **kwargs: Any):
        ...
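
As a rough illustration of how a concrete queue might sit on top of this interface, here is a minimal sketch. It is not the PR's code: `RefBundle` is a hypothetical stand-in dataclass, `SketchFIFOQueue` is an invented name, inheriting from `abc.ABC` is an assumption, and raising `IndexError` on an empty queue is a choice made only for this example.

```python
import abc
from collections import deque
from dataclasses import dataclass
from typing import Any, Deque, Optional


@dataclass(frozen=True)
class RefBundle:
    """Hypothetical stand-in for Ray Data's RefBundle (not the real class)."""

    name: str
    num_rows: int


class BaseBundleQueue(abc.ABC):
    """Interface from the PR description; inheriting abc.ABC is an assumption."""

    @abc.abstractmethod
    def add(self, bundle: RefBundle, **kwargs: Any) -> None: ...

    @abc.abstractmethod
    def get_next(self) -> RefBundle: ...

    @abc.abstractmethod
    def peek_next(self) -> Optional[RefBundle]: ...

    @abc.abstractmethod
    def has_next(self) -> bool: ...

    @abc.abstractmethod
    def clear(self) -> None: ...

    @abc.abstractmethod
    def finalize(self, **kwargs: Any) -> None: ...


class SketchFIFOQueue(BaseBundleQueue):
    """Illustrative first-in, first-out queue backed by a deque."""

    def __init__(self) -> None:
        self._bundles: Deque[RefBundle] = deque()

    def add(self, bundle: RefBundle, **kwargs: Any) -> None:
        self._bundles.append(bundle)

    def get_next(self) -> RefBundle:
        # Assumption: an empty queue is a caller error in this sketch.
        if not self._bundles:
            raise IndexError("queue is empty")
        return self._bundles.popleft()

    def peek_next(self) -> Optional[RefBundle]:
        # Returns the head without removing it, or None when empty.
        return self._bundles[0] if self._bundles else None

    def has_next(self) -> bool:
        return bool(self._bundles)

    def clear(self) -> None:
        self._bundles.clear()

    def finalize(self, **kwargs: Any) -> None:
        # A plain FIFO queue has nothing to flush, so this is a no-op here.
        pass
```

In this sketch `peek_next()` never mutates the queue, so calling it repeatedly and then calling `get_next()` yields the same bundle.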

NOTE:

I mostly refactored and moved code into new files, which is why the line count is so high. However, I did change all the existing bundle queues, namely their get_next() methods, to support a peek method in the base class. I will add tests for these once the design is finalized.

With this approach, I hope to

  • Simplify the design of future operators with InternalQueueOperatorMixin, which automatically reports queue sizes and clears queues for free
  • Provide dedicated queue metrics, so that operators don't need to track them themselves
  • Lower the cognitive overhead of working out which type of queue each operator uses. You now know it's always a BaseBundleQueue that stores RefBundles.
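
To make the first bullet concrete, here is a minimal sketch of what such a mixin could look like. Everything in it is hypothetical: the class and method names (`InternalQueueMixinSketch`, `internal_queues`, `internal_queue_size`, `clear_internal_queues`) are invented for illustration, and it assumes each queue exposes `__len__` and `clear()`, which the interface shown above does not guarantee for length.

```python
import abc
from typing import List, Protocol


class QueueLike(Protocol):
    """Assumed minimal surface of a queue for this sketch."""

    def __len__(self) -> int: ...

    def clear(self) -> None: ...


class InternalQueueMixinSketch(abc.ABC):
    """Hypothetical mixin: operators only declare which queues they own."""

    @abc.abstractmethod
    def internal_queues(self) -> List[QueueLike]:
        """Return every internal queue held by the operator."""

    def internal_queue_size(self) -> int:
        # Total number of queued items across all declared queues.
        return sum(len(queue) for queue in self.internal_queues())

    def clear_internal_queues(self) -> None:
        # Clearing comes for free once the queues are declared.
        for queue in self.internal_queues():
            queue.clear()


class DemoOperator(InternalQueueMixinSketch):
    """Toy operator; plain lists stand in for real bundle queues."""

    def __init__(self) -> None:
        self.input_queue: list = []
        self.output_queue: list = []

    def internal_queues(self) -> List[QueueLike]:
        return [self.input_queue, self.output_queue]
```

The point of the design is that an operator author writes only `internal_queues()`; size reporting and cleanup are inherited.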

Future Work

This PR is a stepping stone toward completely refactoring how metrics are collected, stored, and exported. I'd like to refactor OpRuntimeMetrics because it assumes that all operators are MapOperators, which isn't true. After this PR, I have a few proposals for refactoring OpRuntimeMetrics:

  1. Create a BaseOpMetrics class, so that individual operators can override BaseOpMetrics. For operators with internal queues, we can automatically hook in QueueOpMetrics(BaseOpMetrics) too
  2. Allow classes themselves to report to _StatsActor directly through something like a MetricsRegistry interface. This will require a lot of work, and I will have to repurpose the _StatsActor to read from that registry.
  3. I'm also thinking we can just completely remove _StatsActor and _StatsManager, but I'm not sure if that is very clean.
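
Proposal 1 could be sketched roughly as follows. This is only an illustration of the class layering: the field names (`rows_output`, `queued_bundles`, `queued_rows`) and the hook methods are assumptions made for the example, not part of OpRuntimeMetrics or of this PR.

```python
from dataclasses import asdict, dataclass
from typing import Dict


@dataclass
class BaseOpMetricsSketch:
    """Hypothetical base: metrics that every operator could report."""

    rows_output: int = 0

    def as_dict(self) -> Dict[str, int]:
        return asdict(self)


@dataclass
class QueueOpMetricsSketch(BaseOpMetricsSketch):
    """Hypothetical extension for operators that own internal queues."""

    queued_bundles: int = 0
    queued_rows: int = 0

    def on_bundle_added(self, num_rows: int) -> None:
        self.queued_bundles += 1
        self.queued_rows += num_rows

    def on_bundle_removed(self, num_rows: int) -> None:
        self.queued_bundles -= 1
        self.queued_rows -= num_rows
```

With this layering, an operator without queues would use only the base class, while a queue-owning operator gets the extra counters without duplicating bookkeeping.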

Update on 12/2

Here is a summary of extensions
[image: bundle_queue]

Update on 12/29

New summary of extensions
[image]

Tests

TBD

Related issues

None

Additional information

None

Signed-off-by: iamjustinhsu <jhsu@anyscale.com>
@iamjustinhsu iamjustinhsu requested a review from a team as a code owner December 1, 2025 19:30

@gemini-code-assist gemini-code-assist bot left a comment

Code Review

This pull request introduces a BaseBundleQueue abstraction to unify the various queue implementations within Ray Data operators. This is a significant and valuable refactoring that simplifies operator design and centralizes queue metrics. The overall approach is solid, and the new queue implementations in fifo.py, hash_link.py, and ordered.py are well-designed.

However, I've found several issues with the implementation that need to be addressed:

  • There are a few critical correctness bugs, including a missing import in bundle_queue/__init__.py and incorrect buffer handling in OutputSplitter.
  • The rebundling queues (RebundleQueue and ExactRebundleQueue) have issues: one violates the peek_next contract by having side effects, and the other has a metric leak in get_next_with_original.
  • The transition to queue-based metrics is incomplete in MapOperator, leading to redundant and confusing metric tracking.
  • There are also some minor issues with incorrect type hints and misleading docstrings.

I've left detailed comments on each of these points. Addressing them will ensure the new abstraction is robust and correctly implemented.
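
The peek_next contract mentioned above (peeking must not change the queue) can be checked with a small helper. The following is a hedged sketch, not code from the PR: `TinyQueue`, `LeakyPeekQueue`, and `peek_has_no_side_effects` are invented names, and the helper assumes the queue supports `len()`. Note that the helper consumes one item when the queue is non-empty.

```python
from collections import deque
from typing import Any, Iterable, Optional


class TinyQueue:
    """Minimal queue used only to exercise the check below."""

    def __init__(self, items: Iterable[Any]) -> None:
        self._items = deque(items)

    def __len__(self) -> int:
        return len(self._items)

    def has_next(self) -> bool:
        return bool(self._items)

    def peek_next(self) -> Optional[Any]:
        return self._items[0] if self._items else None

    def get_next(self) -> Any:
        return self._items.popleft()


class LeakyPeekQueue(TinyQueue):
    """Deliberately broken: peeking removes the head item."""

    def peek_next(self) -> Optional[Any]:
        return self._items.popleft() if self._items else None


def peek_has_no_side_effects(queue: Any) -> bool:
    """Return True if two peeks agree, size is unchanged, and get_next matches."""
    size_before = len(queue)
    first = queue.peek_next()
    second = queue.peek_next()
    if first is not second or len(queue) != size_before:
        return False
    if first is None:
        return not queue.has_next()
    return queue.get_next() is first
```

A check like this could run against every queue implementation in a parametrized test, which is one way to catch the kind of violation described in the review.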

@ray-gardener ray-gardener bot added the data Ray Data-related issues label Dec 2, 2025

@omatthew98 omatthew98 left a comment

I mostly left comments about the high-level method names and APIs. I think this is a solid start, but there are some opportunities to tighten up the naming since we are already doing a big shake-up. LMK if you need another review.

@@ -1,75 +0,0 @@
import abc
Contributor

Is there a reason to fully scrap BundleQueue in favor of BaseBundleQueue?

Contributor Author

Oh, it's mostly a rename into base.py (I'm going to make clear what is renamed and what is not after this comment). The only difference is that the base class also contains a method called done_adding_bundles, which is used by BlockRefBundler, StreamingRepartitionRefBundler, and OrderedOutputQueue.

"""Protocol for storing bundles AND supporting remove(bundle)
and contains(bundle) operations quickly."""

def __contains__(self, bundle: RefBundle) -> bool:
Contributor

Is __contains__ not supported for the other queues? Should we maybe specify in the comment that this should both implement the method and be efficient or there might be degraded performance?

@iamjustinhsu iamjustinhsu Dec 2, 2025

Yeah, it can be supported for most queues, but I opted not to move it into the base implementation because

  • our existing code rarely uses __contains__
  • BlockRefBundler and StreamingRefBundler have weird semantics for __contains__: one moment the bundle is there, and the next it's gone due to "rebundling".

On bullet 2: rather than keeping SupportsRebundling as a protocol, we could make it a subclass, SupportsRebundling(BaseBundleQueue), that only supports rebundling and has no __contains__ method. I haven't thought too deeply about this approach.

@omatthew98 omatthew98 Dec 3, 2025

OK, yeah, the current approach seems reasonable to me then. I think it's fine to keep the protocols as they are rather than taking a more piecemeal approach with very tiny protocols.
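
For readers wondering what "supporting remove(bundle) and contains(bundle) quickly" can look like in practice, here is a minimal sketch built on `collections.OrderedDict`. It is not the PR's hash_link.py implementation: `RemovableQueueSketch` is an invented name, and the sketch assumes bundles are hashable and that each bundle is added at most once.

```python
from collections import OrderedDict
from typing import Hashable, Optional


class RemovableQueueSketch:
    """FIFO queue with average O(1) membership tests and removals."""

    def __init__(self) -> None:
        # Keys are the bundles; insertion order doubles as queue order.
        self._items: "OrderedDict[Hashable, None]" = OrderedDict()

    def add(self, bundle: Hashable) -> None:
        # Assumption: duplicates are rejected to keep the semantics simple.
        if bundle in self._items:
            raise ValueError("bundle already queued")
        self._items[bundle] = None

    def __contains__(self, bundle: Hashable) -> bool:
        return bundle in self._items

    def remove(self, bundle: Hashable) -> None:
        # Raises KeyError if the bundle is not queued.
        del self._items[bundle]

    def has_next(self) -> bool:
        return bool(self._items)

    def peek_next(self) -> Optional[Hashable]:
        return next(iter(self._items)) if self._items else None

    def get_next(self) -> Hashable:
        # popitem(last=False) pops the oldest entry.
        bundle, _ = self._items.popitem(last=False)
        return bundle
```

A rebundling queue would not fit this shape well, because a bundle added earlier may be merged or split before it is returned, which matches the concern raised in the thread about `__contains__` semantics.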

@iamjustinhsu iamjustinhsu requested review from a team as code owners December 2, 2025 23:11
@iamjustinhsu iamjustinhsu force-pushed the jhsu/unify-internal-queue-abstraction branch from 2ddb01b to 1a58f57 Compare December 2, 2025 23:32
"This is a bug in the Ray Data code."
)
remaining_bundle: Optional[RefBundle] = None
if rows_needed < last_pending_bundle.num_rows():

TypeError when comparing with None num_rows result

Low Severity

The comparison rows_needed < last_pending_bundle.num_rows() at line 183 and the assertion rows_needed == last_pending_bundle.num_rows() at line 187 directly call num_rows() which can return Optional[int]. If num_rows() returns None, these comparisons would raise a TypeError. While the built-in strategies (EstimateSize, ExactMultipleSize) protect against this via earlier assertions, custom RebundlingStrategy implementations could trigger this error. The comparisons could defensively handle None using or 0 like other parts of the codebase.

Contributor Author

If it's None, that means we default to 0. The bundle can't have 0 rows, because we know it triggered a ready bundle.
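
The `or 0` defensive pattern the review comment refers to can be illustrated in isolation. This is a standalone sketch with hypothetical helper names (`rows_or_zero`, `needs_split`); it does not reproduce the rebundling code under review.

```python
from typing import Optional


def rows_or_zero(num_rows: Optional[int]) -> int:
    """Treat an unknown row count (None) as 0."""
    return num_rows or 0


def needs_split(rows_needed: int, bundle_num_rows: Optional[int]) -> bool:
    """True when the bundle holds more rows than are still needed."""
    # Comparing an int with None directly would raise TypeError in Python 3.
    return rows_needed < rows_or_zero(bundle_num_rows)
```

Under this convention a bundle with an unknown row count is never split, which is why the reply argues the case cannot arise for a bundle that has already triggered a ready bundle.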

alexeykudinkin pushed a commit that referenced this pull request Jan 16, 2026
## Description
Old PR: #59093

## Related issues

## Additional information

---------

Signed-off-by: iamjustinhsu <jhsu@anyscale.com>
@iamjustinhsu iamjustinhsu added the unstale A PR that has been marked unstale. It will not get marked stale again if this label is on it. label Jan 21, 2026
alexeykudinkin pushed a commit that referenced this pull request Jan 24, 2026
## Description
Moves OrderedBundleQueue to `bundle_queue/` directory, and rebases off
of the `BaseBundleQueue` class. Creates a `FIFOBundleQueue` for simple
ref bundle queues.

Old PR: #59093

There are about 400 lines of tests, so the change is not actually that big.

## Related issues

## Additional information

---------

Signed-off-by: iamjustinhsu <jhsu@anyscale.com>