[data][1/n] Base + HashLinkedBundleQueue #60017

Merged
alexeykudinkin merged 18 commits into ray-project:master from
iamjustinhsu:jhsu/unify-internal-queue-abstraction-add-base
Jan 16, 2026

Conversation

@iamjustinhsu
Contributor

Description

Old PR: #59093

Related issues

Additional information

Signed-off-by: iamjustinhsu <jhsu@anyscale.com>
@iamjustinhsu iamjustinhsu requested a review from a team as a code owner January 9, 2026 22:34
Signed-off-by: iamjustinhsu <jhsu@anyscale.com>
Contributor

@gemini-code-assist gemini-code-assist bot left a comment

Code Review

This pull request introduces a new base class for bundle queues, BaseBundleQueue, and a HashLinkedQueue implementation that provides efficient removal and containment checks. This is a solid architectural improvement for better extensibility. The OutputSplitter is updated to leverage this new queue, which simplifies some of its logic. I've identified a critical issue where a method was not updated to use the new queue's API, which could lead to runtime errors. Additionally, I have some suggestions to improve the base class design and code clarity.

Signed-off-by: iamjustinhsu <jhsu@anyscale.com>
Signed-off-by: iamjustinhsu <jhsu@anyscale.com>
Signed-off-by: iamjustinhsu <jhsu@anyscale.com>
Signed-off-by: iamjustinhsu <jhsu@anyscale.com>
Signed-off-by: iamjustinhsu <jhsu@anyscale.com>
@iamjustinhsu
Contributor Author

/gemini review

Contributor

@gemini-code-assist gemini-code-assist bot left a comment

Code Review

This pull request introduces a new base class BaseBundleQueue for bundle queues and a new implementation HashLinkedQueue. HashLinkedQueue is a doubly linked list with a hash map, which provides efficient O(1) for add, get_next, contains, and remove operations. This new queue is used to replace list in OutputSplitter for better performance, especially for bundle removal based on locality. The old FIFOBundleQueue is replaced by HashLinkedQueue in tests. The changes are well-structured and improve performance. I have a few minor suggestions for improving docstrings and fixing a bug with an @override decorator.
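The structure described above (a doubly linked list plus a hash map) can be sketched roughly as follows. This is only an illustration of the technique, not the class added in this PR: `SketchHashLinkedQueue`, `_unlink`, and the use of plain strings instead of bundles are assumptions made for the example.

```python
from collections import deque
from typing import Deque, Dict, Hashable, Optional


class _Node:
    """A doubly linked list node holding one queued item."""

    def __init__(self, value: Hashable) -> None:
        self.value = value
        self.prev: Optional["_Node"] = None
        self.next: Optional["_Node"] = None


class SketchHashLinkedQueue:
    """Hypothetical sketch: FIFO order from the list, O(1) lookup from the dict."""

    def __init__(self) -> None:
        self._head: Optional[_Node] = None
        self._tail: Optional[_Node] = None
        # Each item maps to its nodes in insertion order, so the same
        # item can be queued more than once.
        self._nodes: Dict[Hashable, Deque[_Node]] = {}
        self._size = 0

    def __len__(self) -> int:
        return self._size

    def __contains__(self, item: Hashable) -> bool:
        return item in self._nodes

    def add(self, item: Hashable) -> None:
        """Append to the tail and index the new node."""
        node = _Node(item)
        if self._tail is None:
            self._head = self._tail = node
        else:
            node.prev = self._tail
            self._tail.next = node
            self._tail = node
        self._nodes.setdefault(item, deque()).append(node)
        self._size += 1

    def _unlink(self, node: _Node) -> None:
        """Detach a node from the list by rewiring its neighbours."""
        if node.prev is None:
            self._head = node.next
        else:
            node.prev.next = node.next
        if node.next is None:
            self._tail = node.prev
        else:
            node.next.prev = node.prev
        node.prev = node.next = None
        self._size -= 1

    def remove(self, item: Hashable) -> Hashable:
        """Remove the oldest queued occurrence of `item`."""
        if item not in self._nodes:
            raise ValueError(f"{item!r} is not in the queue")
        node = self._nodes[item].popleft()
        if not self._nodes[item]:
            del self._nodes[item]
        self._unlink(node)
        return node.value

    def get_next(self) -> Hashable:
        """Pop the head; its value's oldest node is always the head itself."""
        if self._head is None:
            raise IndexError("queue is empty")
        return self.remove(self._head.value)


q = SketchHashLinkedQueue()
for name in ["a", "b", "c"]:
    q.add(name)
q.remove("b")  # removal from the middle without scanning the list
print(q.get_next(), q.get_next(), len(q))  # a c 0
```

Removal from the middle only touches the removed node's two neighbours and one dictionary entry, which is where the constant-time claim comes from.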

Signed-off-by: iamjustinhsu <jhsu@anyscale.com>
@iamjustinhsu
Contributor Author

/gemini review

Contributor

@gemini-code-assist gemini-code-assist bot left a comment

Code Review

This pull request introduces a significant and well-structured refactoring of the bundle queue implementation. It defines a new base class BaseBundleQueue and a HashLinkedQueue that provides efficient removal of elements. The changes are mostly clean and improve the modularity of the code. However, I've found a critical issue in the implementation of the remove method in HashLinkedQueue which can lead to incorrect behavior in OutputSplitter when dealing with bundles that have the same value but are distinct instances. This needs to be addressed to ensure correctness, especially for locality-aware scheduling.

Comment on lines +98 to +108
def remove(self, bundle: RefBundle) -> RefBundle:
    # Case 1: The queue is empty.
    if bundle not in self._bundle_to_nodes:
        raise ValueError(f"The bundle {bundle} is not in the queue.")

    node = self._bundle_to_nodes[bundle].popleft()
    if not self._bundle_to_nodes[bundle]:
        del self._bundle_to_nodes[bundle]

    node = self._remove_node(node)
    return node.value
Contributor

critical

This remove implementation is ambiguous and can lead to incorrect behavior. It removes the first-added bundle that is equal to the given bundle by using popleft() on the deque of nodes. However, a caller like OutputSplitter._pop_bundle_to_dispatch iterates through the queue to find a specific bundle instance that matches a condition (e.g., locality), and then calls remove(bundle) to remove it. If there are multiple bundle instances in the queue that are equal, the iterator might find one instance, but remove(bundle) might remove a different (older) instance.

Example of bug:

  1. Two equal but distinct bundles, bundleA_1 and bundleA_2, are added to the queue. The queue state is [nodeA_1, nodeA_2]. The map is {bundleA: deque([nodeA_1, nodeA_2])}.
  2. The iterator in OutputSplitter finds nodeA_2's bundle, which matches a locality requirement.
  3. remove(bundleA) is called.
  4. This method does popleft() on the deque, which gets nodeA_1.
  5. nodeA_1 is removed from the queue, but nodeA_2 should have been removed.

This can lead to incorrect bundle dispatching based on locality.

To fix this, the remove operation needs to be unambiguous. Here are some possible solutions:

  • Change __iter__ to yield a handle (like the _Node object) along with the bundle. The caller can then pass this handle to a new remove_node method to remove the specific instance. This would require changes in OutputSplitter.
  • Add a method like find_and_remove(predicate) to HashLinkedQueue that finds the first bundle matching a predicate and removes it atomically.
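As a rough illustration of the second option, a predicate-based removal could look like the sketch below. Everything here is hypothetical: `SketchLinkedQueue`, `Item`, and the exact shape of `find_and_remove` are assumptions for the example, and the search itself is a linear scan rather than a constant-time lookup.

```python
from dataclasses import dataclass
from typing import Callable, Generic, Iterator, Optional, TypeVar

T = TypeVar("T")


class _Node(Generic[T]):
    def __init__(self, value: T) -> None:
        self.value = value
        self.prev: Optional["_Node[T]"] = None
        self.next: Optional["_Node[T]"] = None


class SketchLinkedQueue(Generic[T]):
    """Hypothetical queue that removes by predicate instead of by equality."""

    def __init__(self) -> None:
        self._head: Optional[_Node[T]] = None
        self._tail: Optional[_Node[T]] = None

    def add(self, value: T) -> None:
        node = _Node(value)
        if self._tail is None:
            self._head = self._tail = node
        else:
            node.prev = self._tail
            self._tail.next = node
            self._tail = node

    def _iter_nodes(self) -> Iterator[_Node[T]]:
        node = self._head
        while node is not None:
            yield node
            node = node.next

    def _remove_node(self, node: _Node[T]) -> T:
        # Unlink exactly this node, so there is no ambiguity about
        # which of several equal values gets removed.
        if node.prev is None:
            self._head = node.next
        else:
            node.prev.next = node.next
        if node.next is None:
            self._tail = node.prev
        else:
            node.next.prev = node.prev
        node.prev = node.next = None
        return node.value

    def find_and_remove(self, predicate: Callable[[T], bool]) -> Optional[T]:
        """Remove and return the first value matching `predicate`, else None."""
        for node in self._iter_nodes():
            if predicate(node.value):
                return self._remove_node(node)
        return None


@dataclass
class Item:
    name: str


# Two equal-by-value but distinct instances, as in the scenario above.
first, second = Item("A"), Item("A")
queue: SketchLinkedQueue[Item] = SketchLinkedQueue()
queue.add(first)
queue.add(second)
removed = queue.find_and_remove(lambda item: item is second)
print(removed is second)  # True
```

Because the node found by the scan is the node that gets unlinked, the caller never has to hand a value back to the queue for a second lookup.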

Contributor Author

@iamjustinhsu iamjustinhsu Jan 9, 2026

Two equal but distinct bundles, bundleA_1 and bundleA_2

This is not possible. If two bundles are equal, they cannot be distinct. The hash of a RefBundle is id(self), so if two bundles have different ids, they are different RefBundles: the Python dict implementation also compares keys for equality, so even if their hashes collide they are stored as separate keys.

Contributor Author

class Test:

    def __hash__(self):
        return 42


a = Test()
b = Test()

c = {}
c[a] = 1
c[b] = 2
print(c)

This creates a dictionary of length 2.
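To make the contrast explicit, here is a small sketch comparing identity-based keys with value-based keys. `IdentityKey` and `ValueKey` are hypothetical stand-ins, and the assumption that RefBundle also compares by identity goes slightly beyond the comment above, which only states that its hash is id(self).

```python
class IdentityKey:
    """Hypothetical stand-in: hashes by id() and keeps the default identity equality."""

    def __init__(self, payload: str) -> None:
        self.payload = payload

    def __hash__(self) -> int:
        return id(self)


class ValueKey:
    """Hypothetical counter-example: equality and hash both depend on the payload."""

    def __init__(self, payload: str) -> None:
        self.payload = payload

    def __eq__(self, other: object) -> bool:
        return isinstance(other, ValueKey) and self.payload == other.payload

    def __hash__(self) -> int:
        return hash(self.payload)


# Same payload, two instances each.
identity_keys = {IdentityKey("x"): 1, IdentityKey("x"): 2}
value_keys = {ValueKey("x"): 1, ValueKey("x"): 2}

# Identity-based keys never merge; value-based keys do.
print(len(identity_keys), len(value_keys))  # 2 1
```

Only the value-based class can produce the "equal but distinct" situation, which is why the scenario depends on how the key type defines equality rather than on hash collisions.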

Signed-off-by: iamjustinhsu <jhsu@anyscale.com>
Signed-off-by: iamjustinhsu <jhsu@anyscale.com>
@iamjustinhsu iamjustinhsu force-pushed the jhsu/unify-internal-queue-abstraction-add-base branch from 2920cce to e5ccfdb Compare January 9, 2026 23:31
Signed-off-by: iamjustinhsu <jhsu@anyscale.com>
Signed-off-by: iamjustinhsu <jhsu@anyscale.com>
Signed-off-by: iamjustinhsu <jhsu@anyscale.com>
…/unify-internal-queue-abstraction-add-base
Signed-off-by: iamjustinhsu <jhsu@anyscale.com>
@ray-gardener ray-gardener bot added the python (Pull requests that update Python code) and data (Ray Data-related issues) labels Jan 10, 2026
Signed-off-by: iamjustinhsu <jhsu@anyscale.com>
        """Return the total # of rows across all bundles."""
        return self._num_rows

    def _reset_metrics(self):
Contributor

Hold on, why do we need this?

Contributor Author

@iamjustinhsu iamjustinhsu Jan 13, 2026

We need this when we call queue.clear(), because we incrementally keep track of queued and dequeued bundles. I could just change this to an absolute calculation, but I would want to wait until all the PRs are merged to do that. The other alternative is to just instantiate a new instance. @alexeykudinkin thoughts?

Contributor Author

On second thought, I realize this is used by the clear_internal_*() methods, so those methods can be moved to the base class.
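The reason a reset hook is needed with incremental bookkeeping can be sketched like this. The names `SketchMetricQueue` and `FakeBundle` are hypothetical, and the counters only loosely mirror the `num_rows` and `estimate_size_bytes` accessors mentioned elsewhere in this thread; the real base class may track different fields.

```python
from collections import deque
from dataclasses import dataclass
from typing import Deque


@dataclass(frozen=True)
class FakeBundle:
    """Hypothetical stand-in for a bundle with a row count and a byte size."""

    num_rows: int
    size_bytes: int


class SketchMetricQueue:
    """Hypothetical queue that updates its totals on every add and get_next."""

    def __init__(self) -> None:
        self._items: Deque[FakeBundle] = deque()
        self._reset_metrics()

    def _reset_metrics(self) -> None:
        # Counters are maintained incrementally, so clearing the queue
        # must also zero them or they would drift from the contents.
        self._num_rows = 0
        self._nbytes = 0

    def add(self, bundle: FakeBundle) -> None:
        self._items.append(bundle)
        self._num_rows += bundle.num_rows
        self._nbytes += bundle.size_bytes

    def get_next(self) -> FakeBundle:
        bundle = self._items.popleft()
        self._num_rows -= bundle.num_rows
        self._nbytes -= bundle.size_bytes
        return bundle

    def clear(self) -> None:
        self._items.clear()
        self._reset_metrics()

    def num_rows(self) -> int:
        return self._num_rows

    def estimate_size_bytes(self) -> int:
        return self._nbytes

    def __len__(self) -> int:
        return len(self._items)


queue = SketchMetricQueue()
queue.add(FakeBundle(num_rows=10, size_bytes=100))
queue.add(FakeBundle(num_rows=5, size_bytes=40))
queue.get_next()
print(queue.num_rows(), queue.estimate_size_bytes())  # 5 40
queue.clear()
print(queue.num_rows(), queue.estimate_size_bytes(), len(queue))  # 0 0 0
```

The "absolute calculation" alternative mentioned above would instead recompute the totals from the queued items on demand, trading the reset hook for a linear scan.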

)


class BaseBundleQueue(_QueueMetricRecorder):
Contributor

Is every subclass going to inherit from Base? If so, why not just merge the mixin into it?

Contributor Author

Oh, this was a remnant of an old PR where I was creating more base queue methods. I just split up the PRs and didn't change any behavior. I will move it up into the base class.

Comment on lines +123 to +125
@override
def finalize(self, key: Optional[int] = None):
    pass
Contributor

Make it optional in the base class.
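One way to read this suggestion is a base class whose `finalize` is a concrete no-op, so subclasses override it only when they need to. The sketch below assumes that reading; `SketchBaseQueue` and `SketchFIFOQueue` are hypothetical names, and the keyword-argument signature loosely follows the `finalize(self, **kwargs: Any)` that appears later in this thread, with an explicit return type added.

```python
from abc import ABC, abstractmethod
from typing import Any, List


class SketchBaseQueue(ABC):
    """Hypothetical base class: required methods are abstract, hooks are not."""

    @abstractmethod
    def add(self, item: Any) -> None:
        ...

    @abstractmethod
    def get_next(self) -> Any:
        ...

    def finalize(self, **kwargs: Any) -> None:
        """Optional hook. The default does nothing, so subclasses may skip it."""
        return None


class SketchFIFOQueue(SketchBaseQueue):
    """Implements only the required methods and inherits the no-op finalize."""

    def __init__(self) -> None:
        self._items: List[Any] = []

    def add(self, item: Any) -> None:
        self._items.append(item)

    def get_next(self) -> Any:
        return self._items.pop(0)


queue = SketchFIFOQueue()
queue.add("bundle-0")
queue.finalize(key=0)  # accepted and ignored by the default implementation
print(queue.get_next())  # bundle-0
```

With this shape, a subclass with nothing to finalize no longer needs an overriding method whose body is just `pass`.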

  # Only used when `self._preserve_order` is True.
- self._input_buffers: List[BundleQueue] = [
-     FIFOBundleQueue() for _ in range(len(input_ops))
+ self._input_buffers: List["BaseBundleQueue"] = [
Contributor

Suggested change
- self._input_buffers: List["BaseBundleQueue"] = [
+ self._input_buffers: List[HashLinkedQueue] = [

from ray.tests.conftest import * # noqa


@pytest.mark.parametrize("equal", [False, True])
Contributor

Why are these being changed?

Contributor Author

oops

self._tail: Optional[_Node] = None
# We use a dictionary to keep track of the nodes corresponding to each bundle.
# This allows us to remove a bundle from the queue in O(1) time. We need a list
# because a bundle can be added to the queue multiple times. Nodes in each list
Contributor

When would a bundle be added multiple times? If concurrency > 1 or because of retries?

Contributor Author

I'm actually not sure when this can happen. I can take a look later.

Signed-off-by: iamjustinhsu <jhsu@anyscale.com>
Comment on lines +66 to +67
# Remove will dequeue the bundle, therefore, we override
# get_next(), _get_next_inner()
Contributor

Comment is confusing, this method should be overriding _get_next_inner, right?

Contributor Author

Yeah, that comment makes no sense. What I meant to say is that remove() and get_next() both use _remove_node(), which dequeues the bundle for you. I just moved the dequeuing logic up the stack to make it clearer, so now we override _get_next_inner().
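The split between a public `get_next()` and an overridable `_get_next_inner()` resembles the template-method pattern. The sketch below illustrates that pattern only; `SketchBase`, `SketchFIFO`, and the `_num_dequeued` counter are assumptions and are not taken from the PR.

```python
from abc import ABC, abstractmethod
from collections import deque
from typing import Deque


class SketchBase(ABC):
    """Hypothetical base: the public method owns the shared bookkeeping."""

    def __init__(self) -> None:
        self._num_dequeued = 0

    def get_next(self) -> str:
        # Subclasses decide which item comes out; the base class records
        # the dequeue in one place so no subclass can forget to do it.
        item = self._get_next_inner()
        self._num_dequeued += 1
        return item

    @abstractmethod
    def _get_next_inner(self) -> str:
        ...


class SketchFIFO(SketchBase):
    """Overrides only the inner hook, never the public method."""

    def __init__(self) -> None:
        super().__init__()
        self._items: Deque[str] = deque()

    def add(self, item: str) -> None:
        self._items.append(item)

    def _get_next_inner(self) -> str:
        return self._items.popleft()


queue = SketchFIFO()
queue.add("first")
queue.add("second")
print(queue.get_next(), queue._num_dequeued)  # first 1
```

Keeping the bookkeeping in the public method is what makes it safe for each subclass to implement only the inner step.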

Signed-off-by: iamjustinhsu <jhsu@anyscale.com>
@iamjustinhsu iamjustinhsu force-pushed the jhsu/unify-internal-queue-abstraction-add-base branch from 5b702ff to 8510969 Compare January 15, 2026 08:03
@iamjustinhsu iamjustinhsu added the go (add ONLY when ready to merge, run all tests) label Jan 15, 2026
"""Remove all bundles from the queue."""
...

def finalize(self, **kwargs: Any):
Contributor

Please add a return type.

@alexeykudinkin alexeykudinkin merged commit f6d9b28 into ray-project:master Jan 16, 2026
6 checks passed
limarkdcunha pushed a commit to limarkdcunha/ray that referenced this pull request Jan 18, 2026
## Description
Old PR: ray-project#59093

## Related issues

## Additional information

---------

Signed-off-by: iamjustinhsu <jhsu@anyscale.com>
Signed-off-by: Limark Dcunha <limarkdcunha@gmail.com>
alexeykudinkin pushed a commit that referenced this pull request Jan 21, 2026
## Description
After #60017 got merged, I forgot to update the `test_bundle_queue` test
suite. This PR adds more tests for `num_blocks`, `num_rows`,
`estimate_size_bytes`, and `len(queue)`
## Related issues

## Additional information

---------

Signed-off-by: iamjustinhsu <jhsu@anyscale.com>
bveeramani added a commit that referenced this pull request Jan 27, 2026
…eue` (#60538)

#60017 and
#60228 refactored the
`FIFOBundleQueue` interface and renamed `FIFOBundleQueue.popleft` to
`FIFOBundleQueue.get_next`. However, this name change wasn't reflected
in the `UnionOperator` implementation, and as a result the operator can
error when it clears its output queue.

This change also fixes the flaky `test_union.py`.

Signed-off-by: Balaji Veeramani <bveeramani@berkeley.edu>
Labels

data (Ray Data-related issues), go (add ONLY when ready to merge, run all tests), python (Pull requests that update Python code)

3 participants