Improve differentiation between incoming/outgoing connections and transfers#6933
Conversation
distributed/worker.py
Outdated
| incoming_count: int | ||
| outgoing_count: int | ||
| outgoing_current_count: int | ||
| incoming_transfer_count: int |
There was a problem hiding this comment.
Instead of talking about incoming_transfer/outgoing_transfer, which we already had through the {incoming|outgoing}_transfer_log, we could also talk about send/recv to further avoid confusion.
There was a problem hiding this comment.
This stuff is "public". we should add a deprecation warning (e.g. by using a property). I'm not sure if any downstream project rely on these attributes
There was a problem hiding this comment.
[EDIT] let's keep them but rename with a deprecation. Discussion on #6936 (comment)
distributed/worker_state_machine.py
Outdated
| comm_nbytes: int | ||
|
|
||
| #: The maximum number of concurrent incoming requests for data. | ||
| #: The maximum number of concurrent outgoing requests for data. |
distributed/worker.py
Outdated
| if self.status == Status.paused: | ||
| max_connections = 1 | ||
| throttle_msg = " Throttling outgoing connections because worker is paused." | ||
| throttle_msg = " Throttling incoming connections because worker is paused." |
There was a problem hiding this comment.
Fixing throttle_msg.
There was a problem hiding this comment.
These are inbound connections for outbound data. Maybe change it to "Throttling outgoing data transfers"?
Unit Test ResultsSee test report for an extended history of previous test failures. This is useful for diagnosing flaky tests. 15 files ± 0 15 suites ±0 6h 48m 11s ⏱️ + 4m 58s Results for commit a0faa5f. ± Comparison against base commit c083790. ♻️ This comment has been updated with latest results. |
distributed/worker.py
Outdated
| incoming_count: int | ||
| outgoing_count: int | ||
| outgoing_current_count: int | ||
| incoming_transfer_count: int |
There was a problem hiding this comment.
This stuff is "public". we should add a deprecation warning (e.g. by using a property). I'm not sure if any downstream project rely on these attributes
distributed/worker.py
Outdated
| @property | ||
| def incoming_count(self): | ||
| warnings.warn( | ||
| "The `Worker.incoming_count` attribute has been renamed to " | ||
| "`Worker.incoming_transfer_count`", | ||
| DeprecationWarning, | ||
| stacklevel=2, | ||
| ) | ||
| return self.incoming_transfer_count | ||
|
|
||
| @property | ||
| def outgoing_count(self): | ||
| warnings.warn( | ||
| "The `Worker.outgoing_count` attribute has been renamed to " | ||
| "`Worker.outgoing_transfer_count`", | ||
| DeprecationWarning, | ||
| stacklevel=2, | ||
| ) | ||
| return self.outgoing_transfer_count | ||
|
|
||
| @property | ||
| def outgoing_current_count(self): | ||
| warnings.warn( | ||
| "The `Worker.outgoing_current_count` attribute has been renamed to " | ||
| "`Worker.current_outgoing_transfer_count`", | ||
| DeprecationWarning, | ||
| stacklevel=2, | ||
| ) | ||
| return self.current_outgoing_transfer_count |
There was a problem hiding this comment.
Added DeprecationWarnings for the renamed attributes via properties. Setters have been skipped. IIUC, these attributes should not be modified by others. cc @fjetter
distributed/worker.py
Outdated
| outgoing_count: int | ||
| outgoing_current_count: int | ||
| incoming_transfer_count: int | ||
| outgoing_transfer_count: int |
There was a problem hiding this comment.
While we're at it, would total_outgoing_transfer_count be clearer for this?
There was a problem hiding this comment.
How about
outgoing_transfer_count_currentoutgoing_transfer_count_total
Pros:
- differentiation is very clear
- can be sorted alphabetically and related stuff is grouped
- code autocompletion is straight forward and a developer can search for
outgoing_stuff_*
not a strong opinion
hendrikmakait
left a comment
There was a problem hiding this comment.
Copying @crusaderky's comment for visibility:
To recap our meeting:
- rename
WorkerState.comm_nbytestocomm_incoming_bytes.
No deprecation in WorkerState needed; just change the already existingDeprecatedWorkerStateAttributein Worker- new property
WorkerState.comm_incoming_count: return len(self.in_flight_workers)- rename
Worker.outgoing_current_counttocomm_outgoing_count. Add a deprecated property (read-only).- new attribute
Worker.comm_outgoing_bytes- rename
Worker.incoming_counttocomm_incoming_cumulative_countwith a deprecated accessor- rename
Worker.outgoing_counttocomm_outgoing_cumulative_countwith a deprecated accessor- heartbeat will now return
comm: incoming_bytes: # incoming_count: # incoming_cumulative_count: # outgoing_bytes: # outgoing_count: # outgoing_cumulative_count: #
update #6933 to match. Prefer "comm" prefix to "transfers" suffix everywhere.
In addition to the suggested renamings, I have renamed Worker.{incoming|outgoing}_transfer_log to Worker.comm_{incoming|outgoing}_log for consistency.
I'm wondering if instead of stripping the word transfer out of names, we should explicitly in for anything that's a data transfer, i.e., a connection but going the other way.
distributed/worker_state_machine.py
Outdated
| if ( | ||
| len(self.in_flight_workers) >= self.total_out_connections | ||
| and self.comm_nbytes >= self.comm_threshold_bytes | ||
| self.comm_incoming_count >= self.total_out_connections |
There was a problem hiding this comment.
After implementing those changes, I am not a huge fan of comm_incoming_count. This line illustrates why:
Why do we compare comm_incoming_count to total_out_connections? How are they related?
comm_incoming_count still feels vaguely defined to me. This might be because comm_ feels like a namespacing prefix for everything communications-related, so what are we counting exactly in that namespace that's incoming? I might just need to get more used to the fact that when we're talking about comm_, we tend to mean Comm object. Then again, that Comm object feels equivalent to a connection, so why is the comm incoming while the connection is outgoing? Should we get transfer back into the naming here?
There was a problem hiding this comment.
Happy to rename comm to transfer everywhere.
comm_incoming_count -> transfer_incoming_count_current
total_out_connections -> transfer_incoming_count_max
|
Updated naming on #6936. Please let's keep all naming discussion there from now on. |
8bdf7a0 to
4b073aa
Compare
| "Ready": [len(w.state.ready)], | ||
| "Waiting": [w.state.waiting_for_data_count], | ||
| "Connections": [len(w.state.in_flight_workers)], | ||
| "Connections": [w.state.transfer_incoming_count], |
There was a problem hiding this comment.
For now, I don't want to touch "Connections".
| @gen_cluster() | ||
| async def test_deprecated_worker_attributes(s, a, b): | ||
| n = a.state.comm_threshold_bytes | ||
| n = a.state.target_message_size |
There was a problem hiding this comment.
This tests the use of a deprecated attribute with the same name on the WorkerState. Since we renamed comm_threshold_bytes, I had to pick another variable.
fjetter
left a comment
There was a problem hiding this comment.
The abundance transfer_-prefixed variables might indicate a missing abstraction
This is just about instrumentation. We could introduce a dataclass holding these things but I think that'd be overkill.
I don't want to do this now, it might be worth taking a closer look at this once/if we are diving into things like dynamically limiting comms. Currently, the comms-related logic is spread around |
No, it should not rely on dynamic memory measurements. We're not actually measuring any memory here. We're running off of data provided by the scheduler |
Agreed, as I said, this note was merely meant to be kept in mind for future work such as the dynamic limiting ideas mentioned in #6212 or #6208 (comment). |
|
Thank you! |
The unclear differentiation between incoming/outgoing connections and transfers has made it difficult for me to understand what we are doing. In addition, there were two mistakes in docstrings/messages that we fix.
Following #6936 (comment) while assuming some small liberties, this PR now implements the following changes:
Notes:
currentsuffixtotalsuffix to highlight cumulative sums.cumulativetotal, it's currenttransfer_-prefixed variables might indicate a missing abstractionpre-commit run --all-files