@@ -26,7 +26,6 @@
     Iterable,
     Iterator,
     Mapping,
-    Sequence,
     Set,
 )
 from contextlib import suppress
@@ -1493,8 +1492,7 @@ class SchedulerState: |
     #: Workers that are currently in running state
     running: set[WorkerState]
     #: Workers that are currently in running state and not fully utilized
-    #: (actually a SortedDict, but the sortedcontainers package isn't annotated)
-    idle: dict[str, WorkerState]
+    idle: set[WorkerState]
     #: Workers that are fully utilized. May include non-running workers.
     saturated: set[WorkerState]
     total_nthreads: int
@@ -1595,7 +1593,7 @@ def __init__( |
         self.clients["fire-and-forget"] = ClientState("fire-and-forget")
         self.extensions = {}
         self.host_info = host_info
-        self.idle = SortedDict()
+        self.idle = set()
         self.n_tasks = 0
         self.resources = resources
         self.saturated = set()
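The SortedDict was only ever used with plain-dict semantics here; once the round-robin fastpath is gone (removed further down), nothing relies on sort order, so a set of worker states suffices. A minimal sketch of the before/after usage, with a hypothetical `FakeWorkerState` standing in for the real class:

```python
# Assumed stand-in, not the real distributed WorkerState. eq=False keeps
# the default identity-based hashing, mirroring how WorkerState behaves.
from dataclasses import dataclass, field


@dataclass(eq=False)
class FakeWorkerState:
    address: str
    nthreads: int = 1
    processing: set = field(default_factory=set)


ws = FakeWorkerState("tcp://10.0.0.1:8786")

# Before: a SortedDict keyed by address, used like a plain dict
idle_old: dict[str, FakeWorkerState] = {}
idle_old[ws.address] = ws
idle_old.pop(ws.address, None)

# After: a plain set of worker states
idle_new: set[FakeWorkerState] = set()
idle_new.add(ws)
idle_new.discard(ws)  # idempotent; no KeyError when absent
```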
@@ -2055,7 +2053,7 @@ def decide_worker_rootish_queuing_disabled( |
         # See root-ish-ness note below in `decide_worker_rootish_queuing_enabled`
         assert math.isinf(self.WORKER_SATURATION)
 
-        pool = self.idle.values() if self.idle else self.running
+        pool = self.idle or self.running
         if not pool:
             return None
 
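With `idle` now a set like `running`, the ternary collapses to `or`: an empty set is falsy, so the expression falls back to the running pool, while a non-empty idle set short-circuits. A quick illustration:

```python
idle: set[str] = set()
running = {"tcp://10.0.0.1:8786", "tcp://10.0.0.2:8786"}

assert (idle or running) == running  # empty set is falsy -> fall back

idle.add("tcp://10.0.0.3:8786")
assert (idle or running) == idle     # non-empty set short-circuits
```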
@@ -2126,7 +2124,7 @@ def decide_worker_rootish_queuing_enabled(self) -> WorkerState | None: |
 
         # Just pick the least busy worker.
         # NOTE: this will lead to worst-case scheduling with regards to co-assignment.
-        ws = min(self.idle.values(), key=lambda ws: len(ws.processing) / ws.nthreads)
+        ws = min(self.idle, key=lambda ws: len(ws.processing) / ws.nthreads)
         if self.validate:
             assert not _worker_full(ws, self.WORKER_SATURATION), (
                 ws,
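Iterating the set directly replaces `.values()`. The objective is processing tasks per thread, so a loaded many-threaded worker can still beat a lightly loaded single-threaded one. A sketch reusing the hypothetical `FakeWorkerState` from above:

```python
a = FakeWorkerState("tcp://10.0.0.1:8786", nthreads=4, processing={"t1", "t2"})
b = FakeWorkerState("tcp://10.0.0.2:8786", nthreads=1, processing={"t3"})

ws = min({a, b}, key=lambda ws: len(ws.processing) / ws.nthreads)
assert ws is a  # 2 tasks / 4 threads = 0.5 beats 1 / 1 = 1.0
```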
@@ -2165,43 +2163,12 @@ def decide_worker_non_rootish(self, ts: TaskState) -> WorkerState | None: |
         # If there were no restrictions, `valid_workers()` didn't subset by `running`.
         valid_workers = self.running
 
-        if ts.dependencies or valid_workers is not None:
-            ws = decide_worker(
-                ts,
-                self.running,
-                valid_workers,
-                partial(self.worker_objective, ts),
-            )
-        else:
-            # TODO if `is_rootish` would always return True for tasks without dependencies,
-            # we could remove all this logic. The rootish assignment logic would behave
-            # more or less the same as this, maybe without gauranteed round-robin though?
-            # This path is only reachable when `ts` doesn't have dependencies, but its
-            # group is also smaller than the cluster.
-
-            # Fastpath when there are no related tasks or restrictions
-            worker_pool = self.idle or self.workers
-            # FIXME idle and workers are SortedDict's declared as dicts
-            # because sortedcontainers is not annotated
-            wp_vals = cast("Sequence[WorkerState]", worker_pool.values())
-            n_workers: int = len(wp_vals)
-            if n_workers < 20:  # smart but linear in small case
-                ws = min(wp_vals, key=operator.attrgetter("occupancy"))
-                assert ws
-                if ws.occupancy == 0:
-                    # special case to use round-robin; linear search
-                    # for next worker with zero occupancy (or just
-                    # land back where we started).
-                    wp_i: WorkerState
-                    start: int = self.n_tasks % n_workers
-                    i: int
-                    for i in range(n_workers):
-                        wp_i = wp_vals[(i + start) % n_workers]
-                        if wp_i.occupancy == 0:
-                            ws = wp_i
-                            break
-            else:  # dumb but fast in large case
-                ws = wp_vals[self.n_tasks % n_workers]
+        ws = decide_worker(
+            ts,
+            self.running,
+            valid_workers,
+            partial(self.worker_objective, ts),
+        )
 
         if self.validate and ws is not None:
             assert self.workers.get(ws.address) is ws
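The deleted branch was a fastpath for dependency-free, unrestricted tasks that picked by occupancy (with round-robin on zero-occupancy ties) over the idle pool; every non-rootish task now goes through the generic objective-based `decide_worker`. A hedged sketch of that pattern only (the real function in distributed/scheduler.py also seeds its candidates from the hosts of the task's dependencies):

```python
from collections.abc import Callable


def decide_worker_sketch(
    all_workers: set[FakeWorkerState],
    valid_workers: set[FakeWorkerState] | None,
    objective: Callable[[FakeWorkerState], float],
) -> FakeWorkerState | None:
    # Restrict to whatever restrictions allow, then minimize the objective.
    candidates = all_workers if valid_workers is None else all_workers & valid_workers
    if not candidates:
        return None
    return min(candidates, key=objective)


picked = decide_worker_sketch({a, b}, None, lambda ws: len(ws.processing) / ws.nthreads)
assert picked is a
```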
@@ -3038,10 +3005,10 @@ def check_idle_saturated(self, ws: WorkerState, occ: float = -1.0): |
             else not _worker_full(ws, self.WORKER_SATURATION)
         ):
             if ws.status == Status.running:
-                idle[ws.address] = ws
+                idle.add(ws)
             saturated.discard(ws)
         else:
-            idle.pop(ws.address, None)
+            idle.discard(ws)
 
         if p > nc:
             pending: float = occ * (p - nc) / (p * nc)
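The dict's `idle[ws.address] = ws` / `idle.pop(ws.address, None)` pair becomes `add`/`discard`. Both are idempotent, so repeated status transitions need no membership checks first. For example:

```python
idle: set[FakeWorkerState] = set()
w = FakeWorkerState("tcp://10.0.0.4:8786")

for running_now in (True, True, False, False):  # repeated transitions are safe
    if running_now:
        idle.add(w)      # no-op if already present
    else:
        idle.discard(w)  # no-op if already absent

assert w not in idle
```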
@@ -4719,7 +4686,7 @@ async def remove_worker( |
         self.rpc.remove(address)
         del self.stream_comms[address]
         del self.aliases[ws.name]
-        self.idle.pop(ws.address, None)
+        self.idle.discard(ws)
         self.saturated.discard(ws)
         del self.workers[address]
         ws.status = Status.closed
@@ -4994,22 +4961,19 @@ def validate_state(self, allow_overlap: bool = False) -> None: |
         if not (set(self.workers) == set(self.stream_comms)):
             raise ValueError("Workers not the same in all collections")
 
-        assert self.running.issuperset(self.idle.values()), (
-            self.running,
-            list(self.idle.values()),
-        )
+        assert self.running.issuperset(self.idle), (self.running, self.idle)
         task_group_counts: defaultdict[str, int] = defaultdict(int)
         for w, ws in self.workers.items():
             assert isinstance(w, str), (type(w), w)
             assert isinstance(ws, WorkerState), (type(ws), ws)
             assert ws.address == w
             if ws.status != Status.running:
-                assert ws.address not in self.idle
+                assert ws not in self.idle
             assert ws.long_running.issubset(ws.processing)
             if not ws.processing:
                 assert not ws.occupancy
                 if ws.status == Status.running:
-                    assert ws.address in self.idle
+                    assert ws in self.idle
             assert not ws.needs_what.keys() & ws.has_what
             actual_needs_what: defaultdict[TaskState, int] = defaultdict(int)
             for ts in ws.processing:
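With both collections holding `WorkerState` objects, the invariant checks reduce to set algebra over the states themselves rather than address lookups. A sketch of the superset invariant, reusing the hypothetical workers from above:

```python
running = {a, b}
idle = {a}

assert running.issuperset(idle)  # every idle worker must be running

# A worker leaving the running state must leave `idle` too,
# preserving the invariant.
running.discard(a)
idle.discard(a)
assert running.issuperset(idle)
```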
@@ -5307,7 +5271,7 @@ def handle_worker_status_change( |
             self.send_all(client_msgs, worker_msgs)
         else:
             self.running.discard(ws)
-            self.idle.pop(ws.address, None)
+            self.idle.discard(ws)
 
     async def handle_request_refresh_who_has(
         self, keys: Iterable[str], worker: str, stimulus_id: str