Encapsulate Worker.batched_stream.send()#6475
Conversation
| and not self.batched_stream.comm.closed() | ||
| ): | ||
| self.batched_stream.send(msg) | ||
|
|
There was a problem hiding this comment.
This method will become an @abc.abstractmethod in the WorkerBase class
| len(instructions), | ||
| ) | ||
| else: | ||
| self._handle_instructions(instructions) |
There was a problem hiding this comment.
This is old cruft from when the only possible instructions were messages
Unit Test Results 15 files + 3 15 suites +3 6h 38m 51s ⏱️ + 1h 57m 38s For more details on these failures and errors, see this check. Results for commit c4ff4c3. ± Comparison against base commit a341432. ♻️ This comment has been updated with latest results. |
|
Failure in test_transition_counter_max_worker is fixed by #6474 |
|
cc @gjoseph92 for review if you have time |
| }, | ||
| ) | ||
| elif self._status != Status.closed: | ||
| self.loop.call_later(0.05, self._send_worker_status_change, stimulus_id) |
There was a problem hiding this comment.
This retry will now be lost. I imagine this case occurs if the worker's status changes before it's connected to the scheduler. I'm not sure if this is important.
As part of #6389, we could pretty easily make BatchedSend support queuing messages before it's been started.
There was a problem hiding this comment.
This used to be necessary when I implemented it - the WorkerState on the scheduler side would never emerge out of initialising state. Now it is not needed anymore.
cf50bd0 to
a258f11
Compare
|
Ready for final review and merge |
@fail_hardcan kill the whole test suite; hide errors #6474Encapsulate calls to
self.batched_stream.sendin a way that