Conversation
```python
def cogroup_objective(self, cogroup: int, ws: WorkerState) -> tuple:
    # Cogroups are not always connected subgraphs but if we assume they
    # were, only the top prio task would need a transfer
    tasks_in_group = self.cogroups[cogroup]
    # TODO: this could be made more efficient / we should remember max if it is required
    ts_top_prio = max(tasks_in_group, key=lambda ts: ts.priority)
    dts: TaskState
    comm_bytes: int = 0
    cotasks_on_worker = 0
    for ts in tasks_in_group:
        if ts in ws.processing or ws in ts.who_has:
            cotasks_on_worker += 1
    for dts in ts_top_prio.dependencies:
        if (
            # This is new compared to worker_objective
            (dts not in tasks_in_group or dts not in ws.processing)
            and ws not in dts.who_has
        ):
            nbytes = dts.get_nbytes()
            comm_bytes += nbytes

    stack_time: float = ws.occupancy / ws.nthreads
    start_time: float = stack_time + comm_bytes / self.bandwidth

    if ts_top_prio.actor:
        raise NotImplementedError("Cogroup assignment for actors not implemented")
    else:
        return (-cotasks_on_worker, start_time, ws.nbytes)
```
This is a very naive way to decide where to put the task. We could also use an approach similar to #7076, but this felt minimally invasive.
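Since Python compares tuples lexicographically, picking a worker then reduces to taking the minimum of these objective tuples over all candidates. A toy sketch of that ranking (worker names and numbers are made up, not from the PR):

```python
# Hypothetical sketch of how the objective tuples rank workers.
# Tuples compare element by element, so a worker that already holds
# more of the cogroup's tasks (more negative first element) wins
# before start_time or stored bytes are even considered.

candidates = {
    # worker: (-cotasks_on_worker, start_time, nbytes_stored)
    "worker-a": (-3, 0.5, 2_000_000),
    "worker-b": (0, 0.1, 0),
    "worker-c": (-3, 0.4, 8_000_000),
}

best = min(candidates, key=candidates.get)
print(best)  # worker-c: ties with worker-a on cotasks, wins on lower start_time
```

Note that `ws.nbytes` only acts as a tie-breaker here, nudging otherwise equal choices toward less-loaded workers.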
```python
if ts.cogroup is not None:
    decider = self.decide_worker_cogroup
else:
    if not (ws := self.decide_worker_non_rootish(ts)):
        return {ts.key: "no-worker"}, {}, {}
    decider = self.decide_worker_non_rootish

if not (ws := decider(ts)):
    return {ts.key: "no-worker"}, {}, {}
```
As already stated, I haven't dealt with queuing, yet. The structure of all the decide functions felt sufficiently confusing that I didn't know where to put the new logic. Should not be too difficult but will require some thought. I mostly wanted to verify the core logic quickly
distributed/scheduler.py (outdated)

```python
cogroups = coassignmnet_groups(sorted_tasks[::-1], start=start)
self.cogroups.update(cogroups)
```
TODO: Somewhere we'd need to handle cleanup of Scheduler.cogroups
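That missing cleanup could look roughly like this (the helper name and the plain-dict bookkeeping below are hypothetical, not the scheduler's actual API):

```python
# Hypothetical sketch of the missing cleanup: when a task is forgotten,
# drop it from its cogroup and delete the group once it becomes empty.

def forget_task_from_cogroups(cogroups, cogroup_of, ts_key):
    group = cogroup_of.pop(ts_key, None)
    if group is None:
        return
    members = cogroups.get(group)
    if members is not None:
        members.discard(ts_key)
        if not members:
            del cogroups[group]

cogroups = {0: {"a", "b"}}
cogroup_of = {"a": 0, "b": 0}
forget_task_from_cogroups(cogroups, cogroup_of, "a")
forget_task_from_cogroups(cogroups, cogroup_of, "b")
print(cogroups)  # {} -- group 0 removed once its last member is forgotten
```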
```python
while len(next.dependents) == 1:
    dep = list(next.dependents)[0]
    if len(dep.dependencies) != 1:
        # This algorithm has the shortcoming that groups may grow too large if the dependent of a group
        group_dependents_seen.add(dep)
        break
    next = dep
```
Two things where this deviates from the original whiteboard implementation:
- I ended up walking linear chains after all. This may not be necessary after 2.) any more, I haven't checked.
- I'm breaking early by excluding any dependents of groups. This is a bit ugly but pragmatic.
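The chain walk in the quoted snippet can be illustrated on a toy graph (the dict-based graph and helper below are illustrative stand-ins, not the scheduler's TaskState API): follow the single dependent as long as it itself has a single dependency, and stop at the fan-in task.

```python
# Toy sketch of walking a linear chain of dependents: keep following
# the sole dependent while it has exactly one dependency; once the
# chain widens, record the fan-in task as seen and stop.

dependents = {
    "a": ["b"],      # a -> b -> c is a linear chain
    "b": ["c"],
    "c": ["d"],
    "d": [],
}
dependencies = {
    "a": [],
    "b": ["a"],
    "c": ["b"],
    "d": ["c", "x"],  # d fans in from elsewhere, so the chain stops at c
}

def walk_chain(start):
    group_dependents_seen = set()
    nxt = start
    while len(dependents[nxt]) == 1:
        dep = dependents[nxt][0]
        if len(dependencies[dep]) != 1:
            group_dependents_seen.add(dep)
            break
        nxt = dep
    return nxt, group_dependents_seen

print(walk_chain("a"))  # ('c', {'d'})
```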
```python
    nthreads=[("", 1)] * 6,
    config={"distributed.scheduler.worker-saturation": 1.0},
)
async def test_utilization_over_co_assignment(c, s, *workers):
```
I copied these over from #7076 but they are not working yet
@fjetter we had the same idea for a fun weekend project; I also put together a prototype of this on the train a couple of days ago. I think you've gotten further than me, but I'll push up my branch since we did some things a little differently and it might be interesting to compare. Overall, I got as far as discovering that it didn't do well with widely shared dependencies, or fan-in tasks in tree reductions. I may have missed something in the implementation, though. I'll show you a couple of tests for those cases that were failing; let's see if they work on your branch.
Unit Test Results

See test report for an extended history of previous test failures. This is useful for diagnosing flaky tests.

14 files - 1, 14 suites - 1, 0s ⏱️ - 6h 17m 3s

For more details on these failures, see this check. Results for commit 1344f9c. ± Comparison against base commit 6002e72.

♻️ This comment has been updated with latest results.
distributed/scheduler.py (outdated)

```python
self.keys.discard(key)
```

```python
def coassignmnet_groups(
```
Suggested change:

```diff
-def coassignmnet_groups(
+def coassignment_groups(
```
distributed/scheduler.py (outdated)

```python
while len(next.dependents) == 1:
    dep = list(next.dependents)[0]
    if len(dep.dependencies) != 1:
        # This algorithm has the shortcoming that groups may grow too large if the dependent of a group
```
Incomplete comment? "If the dependent of a group ..."
So we find a task and then recursively walk its dependents (as long as there is only one dependent), adding them to the group, until we find a task that has more than a single dependency?
I fixed this already but didn't push the commit... 🤦
```python
        break
    next = dep
max_prio = tasks.index(next) + 1
groups[group] = set(tasks[min_prio:max_prio])
```
Suggested change:

```diff
-groups[group] = set(tasks[min_prio:max_prio])
+tasks = set(tasks[min_prio:max_prio])
+for ts in tasks:
+    ts.cogroup = group
+groups[group] = tasks
```
Rationale: this connection between TaskState and cogroup data structures must be maintained, best to do so at construction time, rather than having to remember that things are done later.
I chose not to do this so that coassignment_groups stays a pure function. That is much easier to test and reason about. It might be slightly worse for performance, but I doubt this will be relevant.
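The pure-function split being argued for could look roughly like this (toy stand-ins for TaskState; the chunked grouping rule is a placeholder, not the real algorithm):

```python
# Illustrative only: a pure grouping function returns plain data, and
# the caller applies it to mutable task objects afterwards. Testing the
# grouping logic then needs no scheduler state at all.

def coassignment_groups_pure(task_names, chunk=2, start=0):
    """Toy grouping: split a priority-sorted list into fixed-size chunks."""
    n_groups = (len(task_names) + chunk - 1) // chunk
    return {
        start + i: set(task_names[i * chunk:(i + 1) * chunk])
        for i in range(n_groups)
    }

groups = coassignment_groups_pure(["t0", "t1", "t2", "t3", "t4"])
print({g: sorted(ts) for g, ts in groups.items()})
# {0: ['t0', 't1'], 1: ['t2', 't3'], 2: ['t4']}

# The caller (e.g. the scheduler) then wires up the back-references:
cogroup_of = {}
for gr_ix, tss in groups.items():
    for ts in tss:
        cogroup_of[ts] = gr_ix
```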
```python
for gr_ix, tss in self.cogroups.items():
    for ts in tss:
        ts.cogroup = gr_ix
```
If one wants to go with a plain dict for maintaining cogroups, I think it would make more sense if this invariant were maintained in coassignment_groups (see below).
I haven't decided yet what to use to maintain this. Maintenance of this structure is not implemented yet (e.g. we're not cleaning it up again). For now, I am using a dict for simplicity. I'm also not set on gr_ix being an integer, FWIW.
```python
def coassignmnet_groups(
    tasks: Sequence[TaskState], start: int = 0
```
OK, so tasks is a list of taskstates sorted in increasing priority order.
Yes. I wanted to add a TODO to verify this, but it is guaranteed in update_graph, so for this prototype it works.
@fjetter @wence- this was my implementation: https://github.com/gjoseph92/distributed/pull/6/files. Just in case it's useful for comparison.
This is an implementation of an algorithm we discussed in an offline work session that tries to combine tasks into groups based on whether or not they should be co-located to reduce network traffic and RAM.
I called these groups "co-assignment groups", or in short, cogroups. The idea is basically to lean on dask.order and use "jumps" in priority to detect branches. cc @eriknw, I would be very interested in whether something like this could be returned directly from one of the dask.order functions.
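To make the "jumps" idea concrete, here is a purely illustrative sketch that works on bare priority numbers rather than TaskState objects (the real implementation also walks dependencies and dependents):

```python
# Hypothetical sketch: tasks arrive sorted by dask.order priority; start
# a new cogroup whenever consecutive priorities are not contiguous. A
# jump in priority usually means dask.order switched to another branch.

def group_by_priority_jumps(priorities, start=0):
    groups, group = {}, start
    current = [priorities[0]]
    for prev, cur in zip(priorities, priorities[1:]):
        if cur != prev + 1:          # jump detected -> close the group
            groups[group] = current
            group += 1
            current = []
        current.append(cur)
    groups[group] = current
    return groups

print(group_by_priority_jumps([0, 1, 2, 5, 6, 10]))
# {0: [0, 1, 2], 1: [5, 6], 2: [10]}
```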
This is leaning on an earlier attempt for this in #7076
This implementation is still incomplete. In particular, what's missing is
Raw notes from the offline workshop (I'll open another issue shortly to summarize)