[data][1/n] Base + HashLinkedBundleQueue #60017
alexeykudinkin merged 18 commits into ray-project:master
Signed-off-by: iamjustinhsu <jhsu@anyscale.com>
Code Review
This pull request introduces a new base class for bundle queues, BaseBundleQueue, and a HashLinkedQueue implementation that provides efficient removal and containment checks. This is a solid architectural improvement for better extensibility. The OutputSplitter is updated to leverage this new queue, which simplifies some of its logic. I've identified a critical issue where a method was not updated to use the new queue's API, which could lead to runtime errors. Additionally, I have some suggestions to improve the base class design and code clarity.
Signed-off-by: iamjustinhsu <jhsu@anyscale.com>
Signed-off-by: iamjustinhsu <jhsu@anyscale.com>
Signed-off-by: iamjustinhsu <jhsu@anyscale.com>
/gemini review
Code Review
This pull request introduces a new base class for bundle queues, BaseBundleQueue, and a new implementation, HashLinkedQueue. HashLinkedQueue is a doubly linked list paired with a hash map, which gives O(1) add, get_next, contains, and remove operations. The new queue replaces the list in OutputSplitter for better performance, especially for locality-based bundle removal. In the tests, HashLinkedQueue replaces the old FIFOBundleQueue. The changes are well-structured and improve performance. I have a few minor suggestions for improving docstrings and fixing a bug with an @override decorator.
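For readers who want a concrete picture of the data structure described above, here is a minimal sketch of a doubly linked list paired with a hash map. It is not the code from this PR: `SketchHashLinkedQueue` and its helper `_Node` are hypothetical names, and it assumes each item is queued at most once, whereas the PR keeps a deque of nodes per bundle.

```python
from typing import Dict, Generic, Hashable, Iterator, Optional, TypeVar

T = TypeVar("T", bound=Hashable)


class _Node(Generic[T]):
    """One link in the doubly linked list."""

    def __init__(self, value: T) -> None:
        self.value = value
        self.prev: Optional["_Node[T]"] = None
        self.next: Optional["_Node[T]"] = None


class SketchHashLinkedQueue(Generic[T]):
    """FIFO queue with O(1) add, get_next, contains, and remove.

    Assumption for this sketch: an item is queued at most once.
    """

    def __init__(self) -> None:
        self._head: Optional[_Node[T]] = None
        self._tail: Optional[_Node[T]] = None
        # Maps each queued item to its node so removal needs no scan.
        self._nodes: Dict[T, _Node[T]] = {}

    def add(self, item: T) -> None:
        if item in self._nodes:
            raise ValueError(f"{item!r} is already in the queue.")
        node = _Node(item)
        if self._tail is None:
            self._head = self._tail = node
        else:
            node.prev = self._tail
            self._tail.next = node
            self._tail = node
        self._nodes[item] = node

    def remove(self, item: T) -> T:
        node = self._nodes.pop(item, None)
        if node is None:
            raise ValueError(f"{item!r} is not in the queue.")
        # Unlink the node by rewiring its neighbours (or head/tail).
        if node.prev is None:
            self._head = node.next
        else:
            node.prev.next = node.next
        if node.next is None:
            self._tail = node.prev
        else:
            node.next.prev = node.prev
        node.prev = node.next = None
        return node.value

    def get_next(self) -> T:
        if self._head is None:
            raise IndexError("The queue is empty.")
        return self.remove(self._head.value)

    def __contains__(self, item: object) -> bool:
        return item in self._nodes

    def __len__(self) -> int:
        return len(self._nodes)

    def __iter__(self) -> Iterator[T]:
        node = self._head
        while node is not None:
            yield node.value
            node = node.next
```

Removing from the middle only touches the node's two neighbours, which is what makes locality-based removal cheap compared with deleting from a plain list.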
Signed-off-by: iamjustinhsu <jhsu@anyscale.com>
/gemini review
Code Review
This pull request introduces a significant and well-structured refactoring of the bundle queue implementation. It defines a new base class BaseBundleQueue and a HashLinkedQueue that provides efficient removal of elements. The changes are mostly clean and improve the modularity of the code. However, I've found a critical issue in the implementation of the remove method in HashLinkedQueue which can lead to incorrect behavior in OutputSplitter when dealing with bundles that have the same value but are distinct instances. This needs to be addressed to ensure correctness, especially for locality-aware scheduling.
    def remove(self, bundle: RefBundle) -> RefBundle:
        # Case 1: The queue is empty.
        if bundle not in self._bundle_to_nodes:
            raise ValueError(f"The bundle {bundle} is not in the queue.")

        node = self._bundle_to_nodes[bundle].popleft()
        if not self._bundle_to_nodes[bundle]:
            del self._bundle_to_nodes[bundle]

        node = self._remove_node(node)
        return node.value
This remove implementation is ambiguous and can lead to incorrect behavior. It removes the first-added bundle that is equal to the given bundle by using popleft() on the deque of nodes. However, a caller like OutputSplitter._pop_bundle_to_dispatch iterates through the queue to find a specific bundle instance that matches a condition (e.g., locality), and then calls remove(bundle) to remove it. If there are multiple bundle instances in the queue that are equal, the iterator might find one instance, but remove(bundle) might remove a different (older) instance.
Example of bug:
- Two equal but distinct bundles, bundleA_1 and bundleA_2, are added to the queue. The queue state is [nodeA_1, nodeA_2]. The map is {bundleA: deque([nodeA_1, nodeA_2])}.
- The iterator in OutputSplitter finds nodeA_2's bundle, which matches a locality requirement.
- remove(bundleA) is called.
- This method does popleft() on the deque, which gets nodeA_1.
- nodeA_1 is removed from the queue, but nodeA_2 should have been removed.
This can lead to incorrect bundle dispatching based on locality.
To fix this, the remove operation needs to be unambiguous. Here are some possible solutions:
- Change __iter__ to yield a handle (like the _Node object) along with the bundle. The caller can then pass this handle to a new remove_node method to remove the specific instance. This would require changes in OutputSplitter.
- Add a method like find_and_remove(predicate) to HashLinkedQueue that finds the first bundle matching a predicate and removes it atomically.
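To make the second suggestion concrete, here is a rough sketch of a predicate-based removal. Everything in it is hypothetical: `Item` stands in for a bundle that compares by value, and `IdentityQueue` is a toy queue keyed by `id()` rather than the PR's linked-list implementation. It only illustrates how finding and removing in one call avoids the "which equal instance?" ambiguity.

```python
from dataclasses import dataclass
from typing import Callable, Dict, Optional


@dataclass(frozen=True)
class Item:
    """Hypothetical stand-in for a bundle that compares by value."""

    payload: str


class IdentityQueue:
    """Toy FIFO queue that tracks entries by object identity."""

    def __init__(self) -> None:
        # dicts preserve insertion order, so this also records FIFO order.
        # Queued items stay referenced here, so their ids stay unique.
        self._items: Dict[int, Item] = {}

    def add(self, item: Item) -> None:
        self._items[id(item)] = item

    def find_and_remove(self, predicate: Callable[[Item], bool]) -> Optional[Item]:
        """Remove and return the first queued item matching the predicate."""
        found_key = None
        for key, item in self._items.items():
            if predicate(item):
                found_key = key
                break
        if found_key is None:
            return None
        # Delete after the loop so the dict is never mutated mid-iteration.
        return self._items.pop(found_key)

    def __len__(self) -> int:
        return len(self._items)


a1 = Item("A")
a2 = Item("A")  # equal to a1 by value, but a distinct object
queue = IdentityQueue()
queue.add(a1)
queue.add(a2)

# The instance the predicate selects is exactly the one that gets removed.
removed = queue.find_and_remove(lambda item: item is a2)
```

The scan is still O(n), the same as the iteration the caller already performs; the point is only that the matched instance and the removed instance cannot diverge.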
Two equal but distinct bundles, bundleA_1 and bundleA_2
This is not possible. If two bundles are equal, they cannot be distinct. The hash of a RefBundle is id(self), so if two bundles' ids differ then, per Python's dict implementation, they are still different RefBundles even if their hashes collide.
    class Test:
        def __hash__(self):
            return 42

    a = Test()
    b = Test()
    c = {}
    c[a] = 1
    c[b] = 2
    print(c)

This creates a dictionary with length 2.
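For contrast, a small sketch of the situation the bot's comment would need in order to apply: a class that defines value-based `__eq__` as well as `__hash__`. Both class names are made up for illustration and say nothing about how RefBundle is actually defined beyond what is stated above.

```python
class IdentityKey:
    """Colliding hash, but default (identity-based) equality."""

    def __hash__(self) -> int:
        return 42


class ValueKey:
    """Value-based equality and hash, so equal instances share one dict slot."""

    def __init__(self, value: str) -> None:
        self.value = value

    def __eq__(self, other: object) -> bool:
        return isinstance(other, ValueKey) and self.value == other.value

    def __hash__(self) -> int:
        return hash(self.value)


by_identity = {}
by_identity[IdentityKey()] = 1
by_identity[IdentityKey()] = 2  # same hash, different object: a second key

by_value = {}
by_value[ValueKey("x")] = 1
by_value[ValueKey("x")] = 2  # equal to the first key: overwrites its value
```

A dict only treats two keys as the same when their hashes match and they compare equal, so with identity-based equality a hash collision alone never merges two objects.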
Signed-off-by: iamjustinhsu <jhsu@anyscale.com>
Signed-off-by: iamjustinhsu <jhsu@anyscale.com>
2920cce to e5ccfdb
Signed-off-by: iamjustinhsu <jhsu@anyscale.com>
…/unify-internal-queue-abstraction-add-base
Signed-off-by: iamjustinhsu <jhsu@anyscale.com>
python/ray/data/_internal/execution/operators/output_splitter.py
        """Return the total # of rows across all bundles."""
        return self._num_rows

    def _reset_metrics(self):
Hold on, why do we need this?
We need this when we call queue.clear(), because we keep track of queued and dequeued bundles incrementally. I could just change this to an absolute calculation, but I'd want to wait until all the PRs are merged to do that. The other alternative is to just instantiate a new instance. @alexeykudinkin thoughts?
On second thought, I realize this is used by the clear_internal_*() methods, so those methods can be moved to the base class.
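A minimal sketch of the incremental bookkeeping discussed here, to show why clear() needs a metrics reset. `FakeBundle` and `SketchMetricQueue` are hypothetical stand-ins, not the PR's classes; only the `_reset_metrics`, `num_rows`, and `estimate_size_bytes` names echo the thread.

```python
from collections import deque
from dataclasses import dataclass
from typing import Deque


@dataclass
class FakeBundle:
    """Hypothetical stand-in for a bundle with row and byte counts."""

    num_rows: int
    size_bytes: int


class SketchMetricQueue:
    """FIFO queue whose totals are updated on every add and get_next."""

    def __init__(self) -> None:
        self._items: Deque[FakeBundle] = deque()
        self._reset_metrics()

    def _reset_metrics(self) -> None:
        self._num_rows = 0
        self._nbytes = 0

    def add(self, bundle: FakeBundle) -> None:
        self._items.append(bundle)
        self._num_rows += bundle.num_rows
        self._nbytes += bundle.size_bytes

    def get_next(self) -> FakeBundle:
        bundle = self._items.popleft()
        self._num_rows -= bundle.num_rows
        self._nbytes -= bundle.size_bytes
        return bundle

    def clear(self) -> None:
        # No per-item dequeue happens here, so the running totals would
        # go stale unless they are reset explicitly.
        self._items.clear()
        self._reset_metrics()

    def num_rows(self) -> int:
        return self._num_rows

    def estimate_size_bytes(self) -> int:
        return self._nbytes

    def __len__(self) -> int:
        return len(self._items)
```

The "absolute calculation" alternative would recompute the totals from the queued items on demand, trading O(1) reads for not having to keep counters in sync.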
    )


    class BaseBundleQueue(_QueueMetricRecorder):
Is every subclass going to inherit from Base? If so, why not just merge the mixin into it?
Oh, this was a remnant of an old PR where I was creating more base queue methods. I just split up the PRs and didn't change any behavior. I will move it up into the base class.
    @override
    def finalize(self, key: Optional[int] = None):
        pass
Make it optional in the base
        # Only used when `self._prserve_order` is True.
        self._input_buffers: List[BundleQueue] = [
            FIFOBundleQueue() for _ in range(len(input_ops))
        self._input_buffers: List["BaseBundleQueue"] = [
    - self._input_buffers: List["BaseBundleQueue"] = [
    + self._input_buffers: List[HashLinkedQueue] = [
    from ray.tests.conftest import *  # noqa


    @pytest.mark.parametrize("equal", [False, True])
Why change these?
        self._tail: Optional[_Node] = None
        # We use a dictionary to keep track of the nodes corresponding to each bundle.
        # This allows us to remove a bundle from the queue in O(1) time. We need a list
        # because a bundle can be added to the queue multiple times. Nodes in each list
When would a bundle be added multiple times? If concurrency > 1 or because of retries?
I'm actually not so sure when this can happen, I can take a look later
Signed-off-by: iamjustinhsu <jhsu@anyscale.com>
        # Remove will dequeue the bundle, therefore, we override
        # get_next(), _get_next_inner()
Comment is confusing, this method should be overriding _get_next_inner, right?
lol yea that comment makes no sense. What I meant to say is that remove() and get_next() both use _remove_node(), which dequeues the bundle for you. I just moved the dequeuing logic up the stack to make it clearer, so now we override _get_next_inner().
Signed-off-by: iamjustinhsu <jhsu@anyscale.com>
5b702ff to 8510969
        """Remove all bundles from the queue."""
        ...

    def finalize(self, **kwargs: Any):
Please add return type
## Description Old PR: ray-project#59093 ## Related issues ## Additional information --------- Signed-off-by: iamjustinhsu <jhsu@anyscale.com> Signed-off-by: Limark Dcunha <limarkdcunha@gmail.com>
## Description After #60017 got merged, I forgot to update the `test_bundle_queue` test suite. This PR adds more tests for `num_blocks`, `num_rows`, `estimate_size_bytes`, and `len(queue)` ## Related issues ## Additional information --------- Signed-off-by: iamjustinhsu <jhsu@anyscale.com>
…eue` (#60538) #60017 and #60228 refactored the `FIFOBundleQueue` interface and renamed `FIFOBundleQueue.popleft` with `FIFOBundleQueue.get_next`. However, this name change wasn't reflected in the `UnionOperator` implementation, and as a result the operator can error when it clears its output queue. This change also fixes the flaky `test_union.py`. Signed-off-by: Balaji Veeramani <bveeramani@berkeley.edu>
Description
Old PR: #59093
Related issues
Additional information