[data] New executor backend [3/n] --- Add basic operators impl #31305
ericl merged 21 commits into ray-project:master
Conversation
Signed-off-by: Eric Liang <ekhliang@gmail.com>
Signed-off-by: Eric Liang <ekhliang@gmail.com>
    assert _take_outputs(op) == [[i] for i in range(10)]


def test_map_operator_ray_args(shutdown_only):
Debating whether it's worth it to mock out the Ray API here to speed up these tests a bit. Maybe it's not that important since the bulk of the testing will be for StreamingExecutor, which we can write separate mocks for.
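As a rough illustration of the mocking idea above, the sketch below swaps a remote function for a `unittest.mock.Mock` that runs inline. Both `submit_all` and `make_fake_remote` are hypothetical helpers invented for this example; they are not part of Ray or of this PR, and the only assumption about the remote API is that it exposes a `.remote(...)` method.

```python
from unittest import mock


def submit_all(remote_fn, inputs):
    """Hypothetical helper: submit one task per input via `.remote(...)`."""
    return [remote_fn.remote(x) for x in inputs]


def make_fake_remote(fn):
    """Build a stand-in whose `.remote(...)` runs `fn` inline (no cluster)."""
    fake = mock.Mock()
    fake.remote.side_effect = fn
    return fake


# The fake runs synchronously, so the "handles" are the results themselves.
fake_remote = make_fake_remote(lambda x: x * 2)
results = submit_all(fake_remote, range(3))
```

The trade-off is the one raised in the comment: such tests run faster but no longer exercise real task scheduling.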
python/ray/data/_internal/execution/operators/input_data_buffer.py
Outdated
        Supported strategies: {TaskPoolStrategy, ActorPoolStrategy}.
        """
        return self._strategy
Hmm, I wonder if we can keep the implementation details of the compute strategy, Ray remote args, etc. outside of the operators? It could be cleaner to pass in the ray.remote Callable instead of the worker's Callable as the transform_fn, but I'm not sure this will work, so I'll leave it up to you.
Yeah, I have a TODO on line 78 to clean this up in the future. I'm hoping the ComputeStrategy can turn into a simple dataclass once we migrate fully to the new backend. Right now, I avoided doing this refactoring to keep the changes self-contained.
About the callable, I think that's possible but it's probably also easier to do once we have the logical optimization layer in place (the optimizer could generate the ray.remote callable).
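A minimal sketch of the direction discussed above, assuming the compute strategy becomes plain data and the operator only receives an already-wrapped callable. `ComputeConfig`, `MapOp`, and `build_submit_fn` are hypothetical names made up for this example; the wrapper runs inline, where a real backend would presumably return a ray.remote-based callable.

```python
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, List


@dataclass
class ComputeConfig:
    """Hypothetical plain-data stand-in for a compute strategy."""
    kind: str = "tasks"  # "tasks" or "actors"
    remote_args: Dict[str, Any] = field(default_factory=dict)


def build_submit_fn(transform_fn: Callable[[Any], Any],
                    config: ComputeConfig) -> Callable[[Any], Any]:
    """Hypothetical planner step: turn (transform_fn, config) into a submit callable.

    This sketch validates the config and runs the transform inline; applying
    config.remote_args to a remote wrapper is left out on purpose.
    """
    if config.kind not in ("tasks", "actors"):
        raise ValueError(f"unsupported compute kind: {config.kind}")

    def submit(item: Any) -> Any:
        return transform_fn(item)

    return submit


class MapOp:
    """Hypothetical operator that knows nothing about strategies or remote args."""

    def __init__(self, submit_fn: Callable[[Any], Any]):
        self._submit_fn = submit_fn
        self._outputs: List[Any] = []

    def add_input(self, item: Any) -> None:
        self._outputs.append(self._submit_fn(item))

    def outputs(self) -> List[Any]:
        return list(self._outputs)


op = MapOp(build_submit_fn(lambda x: x + 1, ComputeConfig()))
for item in [1, 2, 3]:
    op.add_input(item)
```

With this split, the operator depends only on a callable, so the strategy details stay in whatever layer builds that callable.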
I'll hold this open until EOD for more comments.
python/ray/data/_internal/execution/operators/map_operator_tasks_impl.py
Outdated
    input_op: Operator generating input data for this op.
    name: The name of this operator.
    compute_strategy: Customize the compute strategy for this op.
    min_rows_per_batch: The number of rows to gather per batch passed to the
Should we name it min_rows_per_fn_call? "batch" is kind of confusing here, as this is neither the user-facing batch in map_batches nor the zero-copy batch execution we will introduce later.
Batch seems clearer to me: it is basically the same as the user-facing batch size.
I wonder if we should keep using "target_row_per_batch", since there is no guarantee of a "min" here. We should also clarify that the target may not be met when there are not enough rows.
I don't think so, the previous naming was very confusing for me. The new one is clear in intent.
@ericl But that intent is incorrect: this is a target to get near to, not a minimum/floor. We add blocks to a bundle up to this target size, but we purposefully do not exceed it, so this is definitely not a minimum.
Alright, let me rename this to min_rows_per_bundle then. I don't think it's possible to be precisely unambiguous, and would prefer we keep the "min" intent which is the big picture.
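To make the naming disagreement concrete, here is a small sketch of the bundling rule as the reviewer describes it: blocks are added to a bundle up to the target, and a bundle is closed before it would exceed that target. This is an interpretation of the comment, not the code in this PR; `bundle_blocks` is a hypothetical function, and blocks are represented only by their row counts.

```python
from typing import Iterable, List


def bundle_blocks(block_rows: Iterable[int], target_rows: int) -> List[List[int]]:
    """Greedily group block row counts without pushing a bundle past target_rows."""
    bundles: List[List[int]] = []
    current: List[int] = []
    current_rows = 0
    for rows in block_rows:
        # Close the current bundle if this block would take it over the target.
        if current and current_rows + rows > target_rows:
            bundles.append(current)
            current, current_rows = [], 0
        current.append(rows)
        current_rows += rows
    if current:
        # The last bundle is emitted even if it is far below the target.
        bundles.append(current)
    return bundles


under_target = bundle_blocks([3, 3, 3, 3], target_rows=7)
oversized = bundle_blocks([10, 1], target_rows=4)
```

Under this rule every bundle in `under_target` holds 6 rows, below the value of 7, which is why the reviewer argues the parameter behaves like a target rather than a floor. A single block larger than the target still forms its own bundle, as in `oversized`.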
Signed-off-by: Eric Liang <ekhliang@gmail.com>
Co-authored-by: Clark Zinzow <clarkzinzow@gmail.com> Signed-off-by: Eric Liang <ekhliang@gmail.com>
Signed-off-by: Eric Liang <ekhliang@gmail.com>
Signed-off-by: Eric Liang <ekhliang@gmail.com>
Signed-off-by: Eric Liang <ekhliang@gmail.com>
        self._obj_store_mem_peak: int = 0

    def add_input(self, bundle: RefBundle) -> None:
        if self._min_rows_per_bundle is None:
I ended up putting this back, in order to enable empty block propagation.
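One plausible reading of this comment, sketched under a stated assumption: when no bundle size is set, each input is forwarded unchanged (including zero-row inputs), while the bundling path skips zero-row inputs. That second behavior is an assumption made for illustration, not something the thread confirms. `BundlingBuffer` is a hypothetical class, and inputs are represented only by their row counts.

```python
from typing import List, Optional


class BundlingBuffer:
    """Hypothetical model of add_input with and without a bundle size."""

    def __init__(self, min_rows_per_bundle: Optional[int]):
        self._min_rows_per_bundle = min_rows_per_bundle
        self._pending: List[int] = []
        self._pending_rows = 0
        self.dispatched: List[List[int]] = []

    def add_input(self, num_rows: int) -> None:
        if self._min_rows_per_bundle is None:
            # Fast path: forward every input as-is, so empty inputs propagate.
            self.dispatched.append([num_rows])
            return
        if num_rows == 0:
            # ASSUMPTION: the bundling path drops empty inputs.
            return
        if self._pending and self._pending_rows + num_rows > self._min_rows_per_bundle:
            self.flush()
        self._pending.append(num_rows)
        self._pending_rows += num_rows

    def flush(self) -> None:
        """Dispatch whatever is buffered, e.g. once all inputs are done."""
        if self._pending:
            self.dispatched.append(self._pending)
            self._pending, self._pending_rows = [], 0


passthrough = BundlingBuffer(min_rows_per_bundle=None)
for n in [0, 5]:
    passthrough.add_input(n)

bundled = BundlingBuffer(min_rows_per_bundle=4)
for n in [0, 3, 3]:
    bundled.add_input(n)
bundled.flush()
```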
ericl left a comment
Updated; the main change is that I removed the circular dependency between the operator impl and the wrapper operator.
…oject#31305) Add the initial operator implementations. This is split out from ray-project#30903 Signed-off-by: tmynn <hovhannes.tamoyan@gmail.com>
Why are these changes needed?
Add the initial operator implementations.
This is split out from #30903
TODO: