[data] New executor [7/n] --- bare bones implementation of StreamingExecutor #31579
ericl merged 24 commits into ray-project:master
Conversation
stephanie-wang left a comment
Might be missing something, but actually I wonder if we really need the inputs_done interface? I think this is needed for all-to-all operations and to flush at the end of the job, right?
For all-to-all, I think we could move the logic inside the bulk_fn that's passed to the operator. For example, we can call the bulk_fn on each input added, and bulk_fn would only run the shuffle once it's received the expected number of inputs. This also better matches what the operator would look like in the windowed shuffle case.
For the flush case, we can track the done inputs in the op state as we are right now, but then only call op.flush() once per op instead of once per op input.
Fine leaving this cleanup for later, but it occurred to me that it may simplify the current code.
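A rough sketch of the accumulate-then-shuffle idea (illustrative only; the callable name, its signature, and the fixed expected-input count are assumptions, not the actual operator API):

```python
from typing import Any, List, Optional


class AccumulatingShuffleFn:
    """Illustrative bulk_fn: buffers inputs and only runs the shuffle once the
    expected number of inputs has arrived, removing the need for a separate
    inputs_done() signal."""

    def __init__(self, expected_num_inputs: int):
        self._expected_num_inputs = expected_num_inputs
        self._buffer: List[Any] = []

    def __call__(self, bundle: Any) -> Optional[List[Any]]:
        self._buffer.append(bundle)
        if len(self._buffer) < self._expected_num_inputs:
            # Still waiting for more inputs; emit nothing for this call.
            return None
        # All expected inputs received: run the all-to-all step over the buffer.
        return self._shuffle(self._buffer)

    def _shuffle(self, bundles: List[Any]) -> List[Any]:
        # Placeholder for the real shuffle logic.
        return list(bundles)
```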
    return selected


def dispatch_next_task(op_state: OpState) -> None:
Should this just be a method of OpState?
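For example, something along these lines (a sketch only; the real OpState has more fields, and the add_input signature used here is an assumption):

```python
from typing import Any, List


class OpState:
    """Sketch: per-operator runtime state with dispatch as a method."""

    def __init__(self, op: Any, num_inputs: int = 1):
        self.op = op
        # One input queue per upstream dependency (structure assumed).
        self.inqueues: List[List[Any]] = [[] for _ in range(num_inputs)]

    def dispatch_next_task(self) -> None:
        # Feed the first available input bundle to the operator, mirroring the
        # standalone dispatch_next_task() above.
        for i, inqueue in enumerate(self.inqueues):
            if inqueue:
                self.op.add_input(inqueue.pop(0), input_index=i)
                return
```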
def inputs_done(self, input_index: int) -> None:
    self._execution_state.inputs_done(input_index)
    self._inputs_done = True
Is there an assumption here that there is only one input (add a note?)?
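If so, a note or even an assertion would make it explicit; a sketch of what I mean (class and field names are placeholders):

```python
class SingleInputOperatorSketch:
    """Sketch only: documents the single-input assumption in inputs_done()."""

    def __init__(self, execution_state):
        self._execution_state = execution_state
        self._inputs_done = False

    def inputs_done(self, input_index: int) -> None:
        # NOTE: this operator assumes exactly one upstream input, so a single
        # inputs_done() call marks *all* inputs as finished.
        assert input_index == 0, "only a single upstream input is supported"
        self._execution_state.inputs_done(input_index)
        self._inputs_done = True
```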
    self._completed = True


def completed(self) -> bool:
    return self._completed
Why don't we need to check whether self.has_next() as we do in the map operator?
Done (moved to physical op class).
def completed(self) -> bool:
    return (
        self._inputs_done and len(self.get_work_refs()) == 0 and not self.has_next()
    )
Seems like this definition could be shared across the different operators?
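For instance, a shared definition could live on the physical operator base class (a sketch; the method names follow the snippet quoted above, everything else is assumed):

```python
from typing import Any, List


class PhysicalOperatorBaseSketch:
    """Sketch of a completed() definition shared by all operators."""

    def __init__(self):
        self._inputs_done = False

    def inputs_done(self, input_index: int) -> None:
        self._inputs_done = True

    def get_work_refs(self) -> List[Any]:
        # Outstanding task/actor futures; none by default.
        return []

    def has_next(self) -> bool:
        # Whether any output is buffered and ready to be taken.
        return False

    def completed(self) -> bool:
        # Shared definition: all inputs done, no in-flight work, no buffered output.
        return (
            self._inputs_done
            and len(self.get_work_refs()) == 0
            and not self.has_next()
        )
```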
op_state = OpState(o2)
o2.add_input = MagicMock()
op_state.inqueues[0].append("dummy1")
dispatch_next_task(op_state)
Suggest running this multiple times so we can test indices other than 0.
Added a test for multiple inputs, and a TODO for multiple indices.
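For reference, the multi-index version could look roughly like this (sketch only; it reuses the hypothetical OpState-with-a-dispatch-method form from the earlier sketch, so the constructor argument and call style are assumptions):

```python
from unittest.mock import MagicMock


def test_dispatch_multiple_input_indices():
    # Sketch: a two-input operator; check each bundle is forwarded with the
    # matching input index.
    op = MagicMock()
    op_state = OpState(op, num_inputs=2)
    for i in (0, 1):
        op_state.inqueues[i].append(f"dummy{i}")
        op_state.dispatch_next_task()
    op.add_input.assert_any_call("dummy0", input_index=0)
    op.add_input.assert_any_call("dummy1", input_index=1)
```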
Updated. Regarding removing the completion method, I don't think that's possible if we want to support operators with unknown output size in general. Currently, we have a method that returns an estimate of the number of outputs for the progress bar, but this is allowed to return None for operators that don't know their number of outputs at planning time.
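In other words, the estimate is roughly of this shape (sketch; the method and class names here are descriptive placeholders, not necessarily what the code uses):

```python
from typing import Optional


class OperatorProgressSketch:
    """Sketch: operators report an output-count estimate for the progress bar,
    which may be None when the count is unknown at planning time."""

    def num_outputs_estimate(self) -> Optional[int]:
        # Unknown output size (e.g. a filter-like operator): the executor
        # cannot rely on output counts to decide completion, which is why an
        # explicit completed() method is still needed.
        return None
```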
Hmm I see, yeah I was thinking we would just flush once enough outputs have been accumulated, but I guess it ends up being the same thing. It's more minor, but maybe we could at least change the signature from inputs_done(input_index) to inputs_done()?
Yup good point, I can't think of where an operator would need the index for the done signal. Removed.
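So the interface collapses to something like this (sketch):

```python
class PhysicalOperatorInterfaceSketch:
    """Sketch of the simplified interface after dropping the input index."""

    # Before: def inputs_done(self, input_index: int) -> None
    def inputs_done(self) -> None:
        # Called once, when all upstream inputs have finished sending data.
        ...
```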
clarkzinzow left a comment
LGTM overall, mostly questions and impl/comment/test nits.
ericl left a comment
All comments addressed, ptal.
Why are these changes needed?
Initial implementation of ray-project/enhancements#18, dependent on #30903
Streaming execution can be toggled with the following env var:
RAY_DATASET_USE_STREAMING_EXECUTOR=0|1 (see the usage sketch below).

Initial PR TODOs:
Future TODOs:
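For reference, a minimal way to flip the toggle mentioned above (sketch; the pipeline itself is illustrative, only the env var name comes from this PR):

```python
import os

# Opt in to the streaming executor before running any Dataset operations.
# (Setting RAY_DATASET_USE_STREAMING_EXECUTOR=1 in the shell before launching
# the script works as well.)
os.environ["RAY_DATASET_USE_STREAMING_EXECUTOR"] = "1"

import ray

# Any Dataset workload would do; this is just an illustrative pipeline.
ds = ray.data.range(1000).map_batches(lambda batch: batch)
print(ds.take(5))
```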