Limit incoming data transfers by amount of data#6975
Conversation
**Unit Test Results**

See test report for an extended history of previous test failures. This is useful for diagnosing flaky tests.

15 files ±0 · 15 suites ±0 · 6h 30m 32s ⏱️ +2m 52s

Results for commit 0a5517d. ± Comparison against base commit bfc5cfe.

♻️ This comment has been updated with latest results.
Note: The current implementation may cross the limit in some instances, i.e., any time we start gathering from a new worker. I'm working on cleaner logic that avoids crossing the limit unless we are not gathering any data at all. This change may also be moved to a separate PR if we want to get this general change merged.
I adjusted the logic; now we never exceed the limit except for the first task to gather, which ensures that we make progress.
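The rule can be sketched as a standalone function (illustrative only; the function name and the `(key, nbytes)` queue representation are stand-ins invented for this example, not the actual worker-state-machine code):

```python
def select_tasks(queue, incoming_bytes, limit):
    """Pick tasks to gather from a priority-ordered list of (key, nbytes)
    pairs without pushing in-flight bytes past ``limit`` -- except that the
    very first task is always taken when nothing else is in flight, so the
    worker keeps making progress even on oversized tasks."""
    to_gather = []
    total = 0
    for key, nbytes in queue:
        # Stop before crossing the limit, unless taking this task is the
        # only way to make any progress at all.
        if (to_gather or incoming_bytes) and incoming_bytes + total + nbytes > limit:
            break
        to_gather.append(key)
        total += nbytes
    return to_gather, total
```

With a 150-byte limit, `select_tasks([("a", 100), ("b", 100)], 0, 150)` gathers only `"a"`, while `select_tasks([("a", 500)], 0, 100)` still gathers the oversized first task because nothing else is in flight.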
```python
assert ws.tasks["a"].state == "fetch"
assert ws.tasks["b"].state == "flight"
assert ws.tasks["c"].state == "flight"
```
This is only deterministic because there is some inherent ordering in Python collections (e.g., dicts are insertion-ordered). From first principles, the worker doesn't prefer either of these tasks over the other. I think we should write the test agnostic to this. Asked differently: would you have known which is scheduled and which is queued before executing the test once?
how about

```python
from collections import defaultdict

tasks_by_state = defaultdict(list)
for ts in ws.tasks.values():
    tasks_by_state[ts.state].append(ts)
assert len(tasks_by_state["flight"]) == 2
assert len(tasks_by_state["fetch"]) == 1
# NOTE: We do not compare instructions since their sorting is random
ws.handle_stimulus(
    GatherDepSuccessEvent(
        worker=ws2,
        data={ts.key: 123 for ts in tasks_by_state["flight"]},
        total_nbytes=200,
        stimulus_id="s2",
    )
)
assert all(ts.state == "memory" for ts in tasks_by_state["flight"])
assert all(ts.state == "flight" for ts in tasks_by_state["fetch"])
```
Fair point, this makes an effort to hide unnecessary details and limit the assertions to what's important.
distributed/worker_state_machine.py
Outdated
```diff
 to_gather: list[TaskState] = []
 total_nbytes = 0

 while available:
     ts = available.peek()
-    # The top-priority task is fetched regardless of its size
-    if (
-        to_gather
-        and total_nbytes + ts.get_nbytes() > self.transfer_message_target_bytes
+    gather_at_least_one_task = self.transfer_incoming_bytes or to_gather
+    exceed_message_target = (
+        total_nbytes + ts.get_nbytes() > self.transfer_message_target_bytes
+    )
+    exceed_bytes_limit = (
+        self.transfer_incoming_bytes_limit is not None
+        and self.transfer_incoming_bytes + total_nbytes + ts.get_nbytes()
+        > self.transfer_incoming_bytes_limit
+    )
+    if gather_at_least_one_task and (
+        exceed_message_target or exceed_bytes_limit
```
How about

```python
if self.transfer_incoming_bytes_limit:
    bytes_left_to_fetch = min(
        self.transfer_incoming_bytes_limit - self.transfer_incoming_bytes,
        self.transfer_message_target_bytes,
    )
else:
    bytes_left_to_fetch = self.transfer_message_target_bytes
while available:
    ts = available.peek()
    if (
        # If there is no other traffic, the top-priority task may be
        # fetched regardless of its size
        to_gather or self.transfer_incoming_bytes
    ) and total_nbytes + ts.get_nbytes() > bytes_left_to_fetch:
        break
    for worker in ts.who_has:
        # This also effectively pops from available
        self.data_needed[worker].remove(ts)
    to_gather.append(ts)
    total_nbytes += ts.get_nbytes()
return to_gather, total_nbytes
```

At least subjectively this seems simpler.
Agreed, this looks simpler.
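The budget computation in the suggestion can be isolated into a standalone sketch (the function name and plain arguments are stand-ins for the worker attributes, invented for illustration):

```python
def bytes_left_to_fetch(incoming_bytes, incoming_limit, message_target):
    """Per-message byte budget: the message target, further clamped by
    whatever headroom remains under the global incoming-transfer limit.
    ``incoming_limit`` may be None, meaning no global limit."""
    if incoming_limit is not None:
        return min(incoming_limit - incoming_bytes, message_target)
    return message_target
```

For example, with 90 bytes already in flight under a 100-byte limit, the budget drops to 10 even if the message target is 50; with no limit configured, the message target applies unchanged.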
```python
assert ws.tasks["a"].state == "flight"
assert ws.tasks["b"].state == "fetch"
```
Similar concern about determinism as above.
```python
assert instructions == [
    GatherDep.match(
        worker=ws2,
        to_gather={"a"},
        stimulus_id="s1",
    ),
]
assert ws.tasks["a"].state == "flight"
```
I think you should either assert the state or the instruction. Asserting both is redundant.
Removed the instruction checks
distributed/worker.py
Outdated
```python
if self.memory_manager.memory_limit is None
else int(
    self.memory_manager.memory_limit
    * dask.config.get("distributed.worker.memory.transfer")
```
According to `distributed-schema.yaml`, `distributed.worker.memory.transfer` is allowed to be `False`. This would raise in that case.
Good catch, thanks!
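A hedged sketch of the resulting guard, using plain stand-in values in place of the worker's `memory_manager` and `dask.config` (the helper name is invented for illustration):

```python
def incoming_bytes_limit(memory_limit, transfer):
    """Compute the incoming-transfer byte limit. Per distributed-schema.yaml,
    the `distributed.worker.memory.transfer` setting may be a fraction or
    False, so the limit is only computed when both a memory limit and a
    fraction are configured."""
    if memory_limit is None or transfer is False:
        return None  # unlimited
    return int(memory_limit * transfer)
```

E.g., a 1000-byte memory limit with `transfer=0.1` yields a 100-byte limit, while `transfer=False` disables the limit entirely.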
hendrikmakait
left a comment
Incorporated review feedback
Co-authored-by: Florian Jetter <fjetter@users.noreply.github.com>
Closes #6208
`pre-commit run --all-files`