Skip to content

Commit 2b8ebf1

Browse files
committed
Scheduler.idle SortedDict -> plain set
1 parent 02b9430 commit 2b8ebf1

2 files changed

Lines changed: 18 additions & 54 deletions

File tree

distributed/scheduler.py

Lines changed: 17 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@
2626
Iterable,
2727
Iterator,
2828
Mapping,
29-
Sequence,
3029
Set,
3130
)
3231
from contextlib import suppress
@@ -1493,8 +1492,7 @@ class SchedulerState:
14931492
#: Workers that are currently in running state
14941493
running: set[WorkerState]
14951494
#: Workers that are currently in running state and not fully utilized
1496-
#: (actually a SortedDict, but the sortedcontainers package isn't annotated)
1497-
idle: dict[str, WorkerState]
1495+
idle: set[WorkerState]
14981496
#: Workers that are fully utilized. May include non-running workers.
14991497
saturated: set[WorkerState]
15001498
total_nthreads: int
@@ -1595,7 +1593,7 @@ def __init__(
15951593
self.clients["fire-and-forget"] = ClientState("fire-and-forget")
15961594
self.extensions = {}
15971595
self.host_info = host_info
1598-
self.idle = SortedDict()
1596+
self.idle = set()
15991597
self.n_tasks = 0
16001598
self.resources = resources
16011599
self.saturated = set()
@@ -2055,7 +2053,7 @@ def decide_worker_rootish_queuing_disabled(
20552053
# See root-ish-ness note below in `decide_worker_rootish_queuing_enabled`
20562054
assert math.isinf(self.WORKER_SATURATION)
20572055

2058-
pool = self.idle.values() if self.idle else self.running
2056+
pool = self.idle or self.running
20592057
if not pool:
20602058
return None
20612059

@@ -2126,7 +2124,7 @@ def decide_worker_rootish_queuing_enabled(self) -> WorkerState | None:
21262124

21272125
# Just pick the least busy worker.
21282126
# NOTE: this will lead to worst-case scheduling with regards to co-assignment.
2129-
ws = min(self.idle.values(), key=lambda ws: len(ws.processing) / ws.nthreads)
2127+
ws = min(self.idle, key=lambda ws: len(ws.processing) / ws.nthreads)
21302128
if self.validate:
21312129
assert not _worker_full(ws, self.WORKER_SATURATION), (
21322130
ws,
@@ -2165,43 +2163,12 @@ def decide_worker_non_rootish(self, ts: TaskState) -> WorkerState | None:
21652163
# If there were no restrictions, `valid_workers()` didn't subset by `running`.
21662164
valid_workers = self.running
21672165

2168-
if ts.dependencies or valid_workers is not None:
2169-
ws = decide_worker(
2170-
ts,
2171-
self.running,
2172-
valid_workers,
2173-
partial(self.worker_objective, ts),
2174-
)
2175-
else:
2176-
# TODO if `is_rootish` would always return True for tasks without dependencies,
2177-
# we could remove all this logic. The rootish assignment logic would behave
2178-
# more or less the same as this, maybe without gauranteed round-robin though?
2179-
# This path is only reachable when `ts` doesn't have dependencies, but its
2180-
# group is also smaller than the cluster.
2181-
2182-
# Fastpath when there are no related tasks or restrictions
2183-
worker_pool = self.idle or self.workers
2184-
# FIXME idle and workers are SortedDict's declared as dicts
2185-
# because sortedcontainers is not annotated
2186-
wp_vals = cast("Sequence[WorkerState]", worker_pool.values())
2187-
n_workers: int = len(wp_vals)
2188-
if n_workers < 20: # smart but linear in small case
2189-
ws = min(wp_vals, key=operator.attrgetter("occupancy"))
2190-
assert ws
2191-
if ws.occupancy == 0:
2192-
# special case to use round-robin; linear search
2193-
# for next worker with zero occupancy (or just
2194-
# land back where we started).
2195-
wp_i: WorkerState
2196-
start: int = self.n_tasks % n_workers
2197-
i: int
2198-
for i in range(n_workers):
2199-
wp_i = wp_vals[(i + start) % n_workers]
2200-
if wp_i.occupancy == 0:
2201-
ws = wp_i
2202-
break
2203-
else: # dumb but fast in large case
2204-
ws = wp_vals[self.n_tasks % n_workers]
2166+
ws = decide_worker(
2167+
ts,
2168+
self.running,
2169+
valid_workers,
2170+
partial(self.worker_objective, ts),
2171+
)
22052172

22062173
if self.validate and ws is not None:
22072174
assert self.workers.get(ws.address) is ws
@@ -3038,10 +3005,10 @@ def check_idle_saturated(self, ws: WorkerState, occ: float = -1.0):
30383005
else not _worker_full(ws, self.WORKER_SATURATION)
30393006
):
30403007
if ws.status == Status.running:
3041-
idle[ws.address] = ws
3008+
idle.add(ws)
30423009
saturated.discard(ws)
30433010
else:
3044-
idle.pop(ws.address, None)
3011+
idle.discard(ws)
30453012

30463013
if p > nc:
30473014
pending: float = occ * (p - nc) / (p * nc)
@@ -4719,7 +4686,7 @@ async def remove_worker(
47194686
self.rpc.remove(address)
47204687
del self.stream_comms[address]
47214688
del self.aliases[ws.name]
4722-
self.idle.pop(ws.address, None)
4689+
self.idle.discard(ws)
47234690
self.saturated.discard(ws)
47244691
del self.workers[address]
47254692
ws.status = Status.closed
@@ -4994,22 +4961,19 @@ def validate_state(self, allow_overlap: bool = False) -> None:
49944961
if not (set(self.workers) == set(self.stream_comms)):
49954962
raise ValueError("Workers not the same in all collections")
49964963

4997-
assert self.running.issuperset(self.idle.values()), (
4998-
self.running,
4999-
list(self.idle.values()),
5000-
)
4964+
assert self.running.issuperset(self.idle), (self.running, self.idle)
50014965
task_group_counts: defaultdict[str, int] = defaultdict(int)
50024966
for w, ws in self.workers.items():
50034967
assert isinstance(w, str), (type(w), w)
50044968
assert isinstance(ws, WorkerState), (type(ws), ws)
50054969
assert ws.address == w
50064970
if ws.status != Status.running:
5007-
assert ws.address not in self.idle
4971+
assert ws not in self.idle
50084972
assert ws.long_running.issubset(ws.processing)
50094973
if not ws.processing:
50104974
assert not ws.occupancy
50114975
if ws.status == Status.running:
5012-
assert ws.address in self.idle
4976+
assert ws in self.idle
50134977
assert not ws.needs_what.keys() & ws.has_what
50144978
actual_needs_what: defaultdict[TaskState, int] = defaultdict(int)
50154979
for ts in ws.processing:
@@ -5307,7 +5271,7 @@ def handle_worker_status_change(
53075271
self.send_all(client_msgs, worker_msgs)
53085272
else:
53095273
self.running.discard(ws)
5310-
self.idle.pop(ws.address, None)
5274+
self.idle.discard(ws)
53115275

53125276
async def handle_request_refresh_who_has(
53135277
self, keys: Iterable[str], worker: str, stimulus_id: str

distributed/stealing.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -400,7 +400,7 @@ def balance(self) -> None:
400400
with log_errors():
401401
i = 0
402402
# Paused and closing workers must never become thieves
403-
potential_thieves = set(s.idle.values())
403+
potential_thieves = s.idle.copy()
404404
if not potential_thieves or len(potential_thieves) == len(s.workers):
405405
return
406406
victim: WorkerState | None

0 commit comments

Comments
 (0)