[data] New executor [4/n] --- Add bulk executor implementation #31443

Merged: ericl merged 7 commits into ray-project:master from ericl:part-4 on Jan 5, 2023
Conversation

@ericl (Contributor) commented on Jan 4, 2023:

Why are these changes needed?

Add the basic bulk executor.

This is split out from #30903

ericl added 4 commits on January 4, 2023

Signed-off-by: Eric Liang <ekhliang@gmail.com>
```python
for i, (k, v) in enumerate(stages.items()):
    if i == 0:
        stage_infos[self.stage_name + "_" + k] = v
    if len(stages) > 1:
```
@ericl (author) commented:

This change just allows build_multistage() to be used in all scenarios as a simplification.

```diff
-stage_uuid = self.dataset_uuid + stage_name
+# TODO(ekl) deprecate and remove the notion of dataset UUID once we move
+# fully to streaming execution.
+stage_uuid = (self.dataset_uuid or "unknown_uuid") + stage_name
```
@ericl (author) commented:

We have a hack in the follow-up PR to set this correctly. For now, I'm adding a default so that we don't need to add dataset_uuid to the execute() API if we're going to remove it in the future.

@clarkzinzow (Contributor) left a review comment:

LGTM overall; I have a few nits and a few questions about the stats, but I wouldn't block this PR on anything since the bulk executor is a stopgap.

```python
def execute_recursive(op: PhysicalOperator) -> List[RefBundle]:
    # Avoid duplicate executions.
    if op in saved_outputs:
        return saved_outputs[op]
```
@clarkzinzow (Contributor) commented on Jan 5, 2023:

Nit: Could we have test coverage for duplicate execution elimination?

@ericl (author) replied:

Hmm, this brings up an interesting issue of supporting Union() DAGs properly. I think this is currently forcing materialization. Let me add a TODO to address this more holistically (could be something @jianoaix can help out on next).
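The memoization pattern under discussion can be sketched roughly as follows; `Op` here is a hypothetical stand-in for `PhysicalOperator`, and the diamond DAG (A feeding both B and C, which both feed D) is exactly the shape where duplicate-execution elimination matters:

```python
from typing import Dict, List

class Op:
    """Hypothetical stand-in for PhysicalOperator."""
    def __init__(self, name, input_dependencies=()):
        self.name = name
        self.input_dependencies = list(input_dependencies)
        self.run_count = 0  # track how many times this op actually executes

    def execute(self, inputs: List[List[str]]) -> List[str]:
        self.run_count += 1
        return [self.name]

saved_outputs: Dict[Op, List[str]] = {}

def execute_recursive(op: Op) -> List[str]:
    # Avoid duplicate executions: an op shared by multiple downstream
    # operators runs once, and its cached outputs are reused.
    if op in saved_outputs:
        return saved_outputs[op]
    inputs = [execute_recursive(dep) for dep in op.input_dependencies]
    output = op.execute(inputs)
    saved_outputs[op] = output
    return output

# Diamond DAG: A -> B, A -> C, {B, C} -> D.
a = Op("A")
b = Op("B", [a])
c = Op("C", [a])
d = Op("D", [b, c])
execute_recursive(d)
assert a.run_count == 1  # A executed once despite having two consumers
```

Without the cache, A would execute twice (once via B, once via C); with it, the second traversal is a dictionary hit.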

```python
if initial_stats:
    self._stats = initial_stats

saved_outputs: Dict[PhysicalOperator, List[RefBundle]] = {}
```
@clarkzinzow (Contributor) commented:

Just calling out that the default hash function for PhysicalOperator will be based on its memory address, so if it's accidentally copied somewhere, we will be silently performing duplicate executions. Not likely to happen with how the operators will be used, though, and manually defining __hash__ for operators seems more cumbersome than it's worth.
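A quick illustration of the pitfall, using a hypothetical FakeOperator class: the default object `__hash__` and `__eq__` are identity-based, so a copied operator is a distinct dictionary key and silently misses the execution cache.

```python
import copy

class FakeOperator:
    """Hypothetical operator with no custom __hash__/__eq__,
    so dict membership falls back to object identity."""
    def __init__(self, name):
        self.name = name

saved_outputs = {}
op = FakeOperator("map")
saved_outputs[op] = ["bundle-0"]

op_copy = copy.copy(op)  # same fields, but a different object identity
assert op in saved_outputs        # cache hit: same object
assert op_copy not in saved_outputs  # cache miss: would re-execute silently
```

As noted above, this is unlikely with how the operators are actually used; the sketch just shows what "accidentally copied" would mean for the cache.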


```python
saved_outputs: Dict[PhysicalOperator, List[RefBundle]] = {}

def execute_recursive(op: PhysicalOperator) -> List[RefBundle]:
```
@clarkzinzow (Contributor) commented:

Supernit: If we make the BulkExecutor single-execution, then we could make this a utility method (with the saved_outputs execution cache an instance attribute) rather than a dynamically defined function. Probably not much benefit in this, other than it maybe being more testable (e.g. being able to mock out the recursive utility method, being able to mock out/inject into saved_outputs, etc.).
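A rough sketch of the suggested refactor, assuming a single-execution executor; BulkExecutorSketch and its method names are hypothetical illustrations, not the actual API:

```python
from typing import Dict, List

class BulkExecutorSketch:
    """Single-execution executor sketch: the output cache lives on the
    instance, and the recursive driver is a regular method that tests can
    mock or inspect (e.g. by pre-populating _saved_outputs)."""

    def __init__(self):
        self._saved_outputs: Dict[object, List[str]] = {}

    def _execute_recursive(self, op) -> List[str]:
        if op in self._saved_outputs:  # execution cache hit
            return self._saved_outputs[op]
        # Depth-first: execute upstream dependencies before this op.
        for dep in getattr(op, "input_dependencies", []):
            self._execute_recursive(dep)
        output = [op.name]  # stand-in for actually running the operator
        self._saved_outputs[op] = output
        return output
```

The behavior is the same as the nested-closure version; the difference is purely about testability and where the cache state lives.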


```python
# Fully execute this operator.
logger.debug("Executing op %s", op.name)
builder = self._stats.child_builder(op.name)
```
@clarkzinzow (Contributor) commented:

I was wondering why this builder is created here if it isn't used until line 70 when building the stats, and then remembered that this is needed to initialize the start time before executing the operator.

We could make this dependency more explicit by registering the parent stats with the PhysicalOperator (e.g. op.init_stats(self._stats)), have the operator internally create the builder, and have op.get_stats() return the actual built DatasetStats object, which would have the added advantage of not needing to expose the op.get_metrics() API (adding the metrics to the stats would be an implementation detail of op.get_stats()).

Are there any pros to keeping the stats building logic in the executors? Will the StreamingExecutor need to do anything differently?

@ericl (author) replied:

Hmm, I think the main thing is avoiding Operators needing to know about DatasetStats. This gives us a bit more loose coupling between components.

@ericl (author) added:

You are correct that there is a tradeoff, though; maybe we can reconsider when implementing the StreamingExecutor.

@clarkzinzow (Contributor) replied:

Yeah good point, operators only needing to know about block metadata (no stats objects) is nice.

> You are correct there is a tradeoff though, maybe we can reconsider when implementing StreamingExecutor.

That sounds like a perfect time to look at this again, sounds good! 👍

```python
saved_outputs[op] = output
op_stats = op.get_stats()
op_metrics = op.get_metrics()
if op_stats:
```
@clarkzinzow (Contributor) commented:

A few questions about this:

  1. So there would primarily be no stats for e.g. InputDataBuffer operators, but what about other operators?
  2. If we're not setting self._stats in this case, are we making the assumption here that (a) we want to skip this generation (i.e. where the child operator is parented under the grandparent), (b) operators without stats will have no downstream operators (not the case for InputDataBuffer, obviously, but maybe other no-stats operators), or (c) one of the other parents of the child operator will have stats defined?
  3. With the depth-first traversal of the DAG, we're going to set the parent stats for a given operator to the last parent operator in its op.input_dependencies list; is this always what we want?

@ericl (author) replied:

  1. That's right. It's an optional method.
  2. I think the expectation is that all operators will implement stats, except maybe the input, so maybe this is moot.
  3. Probably not, but I think this circles back to the broader issue of supporting true DAGs (e.g., Union).

Comment on lines +91 to +107
```python
if tasks:
    bar = ProgressBar(op.name, total=op.num_outputs_total())
    while tasks:
        done, _ = ray.wait(
            tasks, num_returns=len(tasks), fetch_local=True, timeout=0.1
        )
        for ready in done:
            op.notify_work_completed(ready)
        tasks = op.get_work_refs()
        while op.has_next():
            bar.update(1)
            output.append(op.get_next())
    bar.close()
# An operator is finished only after it has no remaining work as well as no
# remaining outputs.
while op.has_next():
    output.append(op.get_next())
```
@clarkzinzow (Contributor) commented:

Nit: Going back to this comment: by making this an if-else and adding some comments, we might be able to make the contract clearer around when the executor can expect an operator to (1) have new outputs and (2) be done producing outputs, both when the operator has work tasks and when it does not.

Suggested change:

```diff
 if tasks:
+    # Keep consuming operator outputs until all work is complete.
     bar = ProgressBar(op.name, total=op.num_outputs_total())
     while tasks:
         done, _ = ray.wait(
             tasks, num_returns=len(tasks), fetch_local=True, timeout=0.1
         )
         for ready in done:
             op.notify_work_completed(ready)
         tasks = op.get_work_refs()
         while op.has_next():
             bar.update(1)
             output.append(op.get_next())
     bar.close()
-# An operator is finished only after it has no remaining work as well as no
-# remaining outputs.
-while op.has_next():
-    output.append(op.get_next())
+else:
+    # Retrieve outputs from operator with no work tasks.
+    while op.has_next():
+        output.append(op.get_next())
```
@ericl (author) replied:

I can see where you're coming from, but I believe it's a little premature to add this assumption. I'd prefer to keep this code as is until we see a concrete problem, since it's written more defensively, with fewer assumptions about operator internals.

@clarkzinzow (Contributor) replied on Jan 5, 2023:

> since it is more defensively written with fewer assumptions on operator internals.

I totally see the defensive approach of always draining outputs even after the work-task loop, and we can leave this as is since it isn't that important. But I want to call out that where we disagree is this: I think this is about an important API contract between the executor and operators, not about the executor assuming things about operator internals.

IMO, this is about the executor + operator contract for when the executor can expect the operator to (1) have new outputs, and (2) be done producing outputs. Without a sensible contract (e.g. based on events that the executor can observe, like work tasks finishing), the executor won't know when it should invoke another while op.has_next() consuming loop, and won't know when the operator is done producing.

For example, does the executor support an operator that produces new output every 10 seconds without any internal work tasks? Definitely not!

My contention is that a sensible contract is:

  1. If an operator has internal work tasks, then the executor can assume that outputs are produced by those work tasks finishing, and the operator therefore only produces more outputs after a task completes (i.e. after op.notify_work_completed() is called).
  2. If an operator has no internal work tasks, then it has a static set of outputs that can be immediately consumed in a single pass.

And what I was pushing for above is explicitly representing this two-case contract in code. But yeah, definitely a pretty nitpicky change, not a high priority, and I think that the contract is clear enough as it is!
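The two-case contract could be sketched like this; the method names (`get_work_refs`, `has_next`, `get_next`) mirror the names used in this PR, but the `StaticOp` class and `drain()` driver are hypothetical illustrations, with the work-task branch elided:

```python
from typing import List

class StaticOp:
    """Case 2 of the contract: no internal work tasks, so outputs are a
    static set that can be consumed immediately in a single pass."""
    def __init__(self, outputs: List[int]):
        self._outputs = list(outputs)

    def get_work_refs(self) -> List[int]:
        return []  # no in-flight work tasks

    def has_next(self) -> bool:
        return bool(self._outputs)

    def get_next(self) -> int:
        return self._outputs.pop(0)

def drain(op) -> List[int]:
    output = []
    tasks = op.get_work_refs()
    if tasks:
        # Case 1: outputs only appear after op.notify_work_completed()
        # is called for a finished task (loop elided in this sketch).
        ...
    else:
        # Case 2: static outputs, one consuming pass, then the op is done.
        while op.has_next():
            output.append(op.get_next())
    return output

assert drain(StaticOp([1, 2, 3])) == [1, 2, 3]
```

Under this contract the executor never has to poll an idle operator: either a task completion event signals new outputs, or there are no tasks and one pass drains everything.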

```python
op_metrics = op.get_metrics()
if op_stats:
    self._stats = builder.build_multistage(op_stats)
    self._stats.extra_metrics = op_metrics
```
@clarkzinzow (Contributor) commented:

Instead of mutating the extra_metrics field directly, adding a DatasetStats.set_metrics() setter would make it clearer to readers of the DatasetStats class that this field is settable.

@ericl (author) replied on Jan 5, 2023:

Done.

Edit: actually, we should revisit this later, since it would be inconsistent with the use of other fields of DatasetStats as a dataclass.
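To illustrate the tradeoff being weighed here (with a hypothetical Stats class standing in for DatasetStats): an explicit setter documents that the field is meant to be mutated, but direct assignment is the idiomatic way to use dataclass fields, and mixing the two styles is exactly the inconsistency noted above.

```python
from dataclasses import dataclass, field
from typing import Dict

@dataclass
class Stats:
    """Hypothetical stand-in for DatasetStats."""
    extra_metrics: Dict[str, int] = field(default_factory=dict)

    def set_metrics(self, metrics: Dict[str, int]) -> None:
        # Explicit setter: signals mutability to readers, but is
        # inconsistent with assigning other dataclass fields directly.
        self.extra_metrics = metrics

s = Stats()
s.set_metrics({"obj_store_mem_alloc": 128})
assert s.extra_metrics == {"obj_store_mem_alloc": 128}
s.extra_metrics = {}  # direct assignment works just the same
```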

@ericl ericl merged commit 11089fa into ray-project:master Jan 5, 2023
AmeerHajAli pushed a commit that referenced this pull request Jan 12, 2023
Add the basic bulk executor.

This is split out from #30903
tamohannes pushed a commit to ju2ez/ray that referenced this pull request Jan 25, 2023

Add the basic bulk executor.

This is split out from ray-project#30903

Signed-off-by: tmynn <hovhannes.tamoyan@gmail.com>
5 participants