[data] New executor [4/n]--- Add bulk executor implementation #31443
ericl merged 7 commits into ray-project:master
Conversation
for i, (k, v) in enumerate(stages.items()):
    if i == 0:
        stage_infos[self.stage_name + "_" + k] = v
if len(stages) > 1:
This change just allows build_multistage() to be used in all scenarios as a simplification.
- stage_uuid = self.dataset_uuid + stage_name
+ # TODO(ekl) deprecate and remove the notion of dataset UUID once we move
+ # fully to streaming execution.
+ stage_uuid = (self.dataset_uuid or "unknown_uuid") + stage_name
We have a hack in the follow-up PR to set this correctly. For now, adding a default so that we don't need to add dataset_uuid to the execute() API if we're removing it in the future.
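For reference, a minimal illustration of the fallback in the changed line: `or` substitutes the default whenever `dataset_uuid` is None or empty (the values below are made up):

```python
# Made-up values illustrating the `or` fallback from the diff above.
dataset_uuid = None
stage_name = "Map"

# Falls back to "unknown_uuid" when dataset_uuid is None or empty.
stage_uuid = (dataset_uuid or "unknown_uuid") + stage_name
assert stage_uuid == "unknown_uuidMap"
```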
Signed-off-by: Eric Liang <ekhliang@gmail.com>
def execute_recursive(op: PhysicalOperator) -> List[RefBundle]:
    # Avoid duplicate executions.
    if op in saved_outputs:
        return saved_outputs[op]
Nit: Could we have test coverage for duplicate execution elimination?
Hmm, this brings up an interesting issue of supporting Union() DAGs properly. I think this is currently forcing materialization. Let me add a TODO to address this more holistically (could be something @jianoaix can help out on next).
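To make the dedup behavior concrete (and as a sketch of the test coverage asked for above), here is a toy memoized depth-first execution over a diamond-shaped DAG; `Op` and the integer "bundles" are illustrative stand-ins, not Ray's actual classes:

```python
# Toy sketch (not the Ray implementation): memoized depth-first execution
# over a diamond DAG, showing how a saved_outputs cache keeps a shared
# upstream operator from running twice.
from typing import Dict, List


class Op:
    def __init__(self, name: str, inputs=()):
        self.name = name
        self.input_dependencies = list(inputs)
        self.run_count = 0

    def run(self, inputs: List[List[int]]) -> List[int]:
        self.run_count += 1
        # Flatten upstream bundles; sources emit a single dummy bundle.
        return [x for bundle in inputs for x in bundle] or [1]


saved_outputs: Dict[Op, List[int]] = {}


def execute_recursive(op: Op) -> List[int]:
    # Avoid duplicate executions of shared upstream operators.
    if op in saved_outputs:
        return saved_outputs[op]
    inputs = [execute_recursive(dep) for dep in op.input_dependencies]
    output = op.run(inputs)
    saved_outputs[op] = output
    return output


# Diamond DAG: source feeds both left and right; sink joins them.
source = Op("source")
left = Op("left", [source])
right = Op("right", [source])
sink = Op("sink", [left, right])

execute_recursive(sink)
# The shared source ran once, not twice, thanks to the cache.
assert source.run_count == 1
```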
if initial_stats:
    self._stats = initial_stats

saved_outputs: Dict[PhysicalOperator, List[RefBundle]] = {}
Just calling out that the default hash function for PhysicalOperator will be based on its memory address, so if it's accidentally copied somewhere, we will be silently performing duplicate executions. Not likely to happen with how the operators will be used, though, and manually defining __hash__ for operators seems more cumbersome than it's worth.
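A minimal sketch of the identity-hash pitfall being called out: Python's default `__hash__`/`__eq__` are based on object identity, so a copied operator is a distinct dict key and would silently miss the execution cache (`FakeOp` is illustrative, not a real Ray class):

```python
# Default object hashing is by identity, so a copy is a different key.
import copy


class FakeOp:
    def __init__(self, name: str):
        self.name = name


op = FakeOp("map")
clone = copy.copy(op)  # same fields, different identity

cache = {op: ["output"]}
assert op in cache          # hit: same object
assert clone not in cache   # miss: the copy hashes differently
```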
def execute_recursive(op: PhysicalOperator) -> List[RefBundle]:
Supernit: If we make the BulkExecutor single-execution, then we could make this a utility method (with the saved_outputs execution cache an instance attribute) rather than a dynamically defined function. Probably not much benefit in this, other than it maybe being more testable (e.g. being able to mock out the recursive utility method, being able to mock out/inject into saved_outputs, etc.).
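A rough sketch of that alternative structure, with all class and method names hypothetical: the recursion becomes a regular method and the cache an instance attribute, so tests could inspect or inject into it:

```python
# Hypothetical restructuring (not Ray's actual API): single-execution
# executor with the cache as an instance attribute and the recursion as a
# plain method. ToyOp stands in for PhysicalOperator.
from typing import Dict, List


class ToyOp:
    def __init__(self, name: str, inputs=()):
        self.name = name
        self.input_dependencies = list(inputs)

    def run(self, inputs: List[List[int]]) -> List[int]:
        return [x + 1 for bundle in inputs for x in bundle] or [0]


class BulkExecutor:
    def __init__(self):
        # Execution cache as an attribute instead of a closure variable,
        # so tests can mock it out or assert on its contents.
        self._saved_outputs: Dict[ToyOp, List[int]] = {}

    def execute(self, dag: ToyOp) -> List[int]:
        return self._execute_recursive(dag)

    def _execute_recursive(self, op: ToyOp) -> List[int]:
        if op in self._saved_outputs:
            return self._saved_outputs[op]
        inputs = [self._execute_recursive(d) for d in op.input_dependencies]
        output = op.run(inputs)
        self._saved_outputs[op] = output
        return output


src = ToyOp("src")
inc = ToyOp("inc", [src])
executor = BulkExecutor()
assert executor.execute(inc) == [1]
# The cache is directly inspectable after execution.
assert src in executor._saved_outputs
```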
# Fully execute this operator.
logger.debug("Executing op %s", op.name)
builder = self._stats.child_builder(op.name)
I was wondering why this builder is created here if it isn't used until line 70 when building the stats, and then remembered that this is needed to initialize the start time before executing the operator.
We could make this dependency more explicit by registering the parent stats with the PhysicalOperator (e.g. op.init_stats(self._stats)), have the operator internally create the builder, and have op.get_stats() return the actual built DatasetStats object, which would have the added advantage of not needing to expose the op.get_metrics() API (adding the metrics to the stats would be an implementation detail of op.get_stats()).
Are there any pros to keeping the stats building logic in the executors? Will the StreamingExecutor need to do anything differently?
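A rough sketch of this proposed alternative, where the operator owns its stats builder; every class and method name here is hypothetical, not Ray's actual API:

```python
# Hypothetical sketch: the operator creates its own stats on
# init_stats() (starting the timer before execution) and folds metrics
# into the built stats, so no separate get_metrics() API is exposed.
import time


class ToyDatasetStats:
    def __init__(self, name: str, parent=None):
        self.name = name
        self.parent = parent
        self.start_time = time.perf_counter()
        self.extra_metrics = {}


class StatsOwningOp:
    def __init__(self, name: str):
        self.name = name
        self._stats = None

    def init_stats(self, parent_stats: ToyDatasetStats) -> None:
        # Replaces the executor-side child_builder() call; the start
        # time is recorded here, before the operator executes.
        self._stats = ToyDatasetStats(self.name, parent=parent_stats)

    def get_stats(self) -> ToyDatasetStats:
        # Attaching metrics is an implementation detail of get_stats().
        self._stats.extra_metrics = {"obj_store_mem_alloc": 0}
        return self._stats


root = ToyDatasetStats("root")
op = StatsOwningOp("map")
op.init_stats(root)
stats = op.get_stats()
assert stats.parent is root
assert "obj_store_mem_alloc" in stats.extra_metrics
```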
Hmm, I think the main thing is avoiding Operators needing to know about DatasetStats. This gives us a bit more loose coupling between components.
You are correct there is a tradeoff though, maybe we can reconsider when implementing StreamingExecutor.
Yeah good point, operators only needing to know about block metadata (no stats objects) is nice.
You are correct there is a tradeoff though, maybe we can reconsider when implementing StreamingExecutor.
That sounds like a perfect time to look at this again, sounds good! 👍
saved_outputs[op] = output
op_stats = op.get_stats()
op_metrics = op.get_metrics()
if op_stats:
A few questions about this:
- So there would primarily be no stats for e.g. InputDataBuffer operators, but what about other operators?
- If we're not setting self._stats in this case, are we making the assumption here that (a) we want to skip this generation (i.e. where the child operator is parented under the grandparent), (b) operators without stats will have no downstream operators (not the case for InputDataBuffer, obviously, but maybe other no-stats operators), or (c) one of the other parents of the child operator will have stats defined?
- With the depth-first traversal of the DAG, we're going to set the parent stats for a given operator to the last parent operator in its op.input_dependencies list; is this always what we want?
- That's right. It's an optional method.
- I think the expectation is that all operators will implement stats, except maybe the input, so maybe this is moot.
- Probably not, but I think this circles back to the broader issue of supporting true DAGs (e.g., Union).
if tasks:
    bar = ProgressBar(op.name, total=op.num_outputs_total())
    while tasks:
        done, _ = ray.wait(
            tasks, num_returns=len(tasks), fetch_local=True, timeout=0.1
        )
        for ready in done:
            op.notify_work_completed(ready)
        tasks = op.get_work_refs()
        while op.has_next():
            bar.update(1)
            output.append(op.get_next())
    bar.close()
# An operator is finished only after it has no remaining work as well as no
# remaining outputs.
while op.has_next():
    output.append(op.get_next())
Nit: Going back to this comment, making this an if-else and adding some comments might make the contract clearer around when the executor can expect an operator to (1) have new outputs, and (2) be done producing outputs, both when the operator has work tasks and when it does not.
Suggested change (wrapping the block above in an explicit if-else):

if tasks:
    # Keep consuming operator outputs until all work is complete.
    bar = ProgressBar(op.name, total=op.num_outputs_total())
    while tasks:
        done, _ = ray.wait(
            tasks, num_returns=len(tasks), fetch_local=True, timeout=0.1
        )
        for ready in done:
            op.notify_work_completed(ready)
        tasks = op.get_work_refs()
        while op.has_next():
            bar.update(1)
            output.append(op.get_next())
    bar.close()
else:
    # Retrieve outputs from operator with no work tasks.
    while op.has_next():
        output.append(op.get_next())
I can see where you're coming from, but I believe it's a little premature to add this assumption. I'd prefer to keep this code as is until we see a concrete problem, since it is written more defensively, with fewer assumptions about operator internals.
since it is more defensively written with less assumptions on operator internals.
I totally see the defensive approach of always draining outputs even after the work-task loop, and we can leave this as is since it isn't that important. But I want to call out where we disagree: I think this is about an important API contract between the executor and operators, not about the executor assuming things about operator internals.
IMO, this is about the executor-operator contract for when the executor can expect the operator to (1) have new outputs, and (2) be done producing outputs. Without a sensible contract (e.g. based on events that the executor can observe, like work tasks finishing), the executor won't know when it should invoke another while op.has_next() consuming loop, and won't know when the operator is done producing.
For example, does the executor support an operator that produces new output every 10 seconds without any internal work tasks? Definitely not!
My contention is that a sensible contract is:
- If an operator has internal work tasks, then the executor can assume that outputs are produced by those work tasks finishing, and the operator therefore only produces more outputs after a task completes (i.e. after op.notify_work_completed() is called).
- If an operator has no internal work tasks, then it has a static set of outputs that can be immediately consumed in a single pass.
And what I was pushing for above is explicitly representing this two-case contract in code. But yeah, definitely a pretty nitpicky change, not a high priority, and I think that the contract is clear enough as it is!
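The two-case contract described above can be sketched with two toy operators; all the classes here are illustrative stand-ins (with plain integers in place of Ray object refs), not Ray's API:

```python
# Toy sketch of the two-case contract: a task-backed operator only yields
# outputs after notify_work_completed(), while a static operator exposes
# all of its outputs up front.
class StaticOp:
    """No internal work tasks: outputs are consumable in one pass."""

    def __init__(self, items):
        self._outputs = list(items)

    def get_work_refs(self):
        return []

    def has_next(self):
        return bool(self._outputs)

    def get_next(self):
        return self._outputs.pop(0)


class TaskBackedOp:
    """Outputs appear only as simulated work tasks complete."""

    def __init__(self, tasks):
        self._pending = list(tasks)
        self._outputs = []

    def get_work_refs(self):
        return list(self._pending)

    def notify_work_completed(self, ref):
        self._pending.remove(ref)
        self._outputs.append(ref * 10)

    def has_next(self):
        return bool(self._outputs)

    def get_next(self):
        return self._outputs.pop(0)


def drain(op):
    output = []
    tasks = op.get_work_refs()
    if tasks:
        # Case 1: outputs are driven by task completions.
        while tasks:
            for ready in list(tasks):
                op.notify_work_completed(ready)
            tasks = op.get_work_refs()
            while op.has_next():
                output.append(op.get_next())
    else:
        # Case 2: static outputs, consumed in a single pass.
        while op.has_next():
            output.append(op.get_next())
    return output


assert drain(StaticOp([1, 2])) == [1, 2]
assert drain(TaskBackedOp([1, 2])) == [10, 20]
```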
op_metrics = op.get_metrics()
if op_stats:
    self._stats = builder.build_multistage(op_stats)
    self._stats.extra_metrics = op_metrics
Instead of mutating the extra_metrics field directly, adding a DatasetStats.set_metrics() setter would make it clearer to readers of the DatasetStats class that the field is settable.
Done.
Edit: actually, we should revisit this later, since it would be inconsistent with the use of other fields of DatasetStats as a dataclass.
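For concreteness, the two styles being weighed here, sketched with a made-up stats class (not Ray's DatasetStats):

```python
# Made-up dataclass contrasting direct field mutation (dataclass style)
# with an explicit setter.
from dataclasses import dataclass, field
from typing import Dict


@dataclass
class ToyStats:
    name: str
    extra_metrics: Dict[str, int] = field(default_factory=dict)

    def set_metrics(self, metrics: Dict[str, int]) -> None:
        # A setter signals mutability to readers, but is inconsistent
        # with how other dataclass fields are assigned directly.
        self.extra_metrics = metrics


stats = ToyStats("map")
stats.extra_metrics = {"tasks": 3}   # dataclass-style field assignment
stats.set_metrics({"tasks": 4})      # explicit setter alternative
assert stats.extra_metrics == {"tasks": 4}
```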
Why are these changes needed?
Add the basic bulk executor.
This is split out from #30903