Conversation
|
This PR is based on top of #5954 and should be rebased when #5954 is merged. TODO:
|
Unit Test Results 12 files ± 0 12 suites ±0 6h 19m 36s ⏱️ - 16m 38s For more details on these failures, see this check. Results for commit 2888b26. ± Comparison against base commit 6dd928b. ♻️ This comment has been updated with latest results. |
|
From #5872
and #5987 (comment)
|
distributed/tests/test_client.py
Outdated
|
|
||
| async def get_story(self, *args, **kw): | ||
| await self.unblock_worker.wait() | ||
| return super().get_story(*args, **kw) |
There was a problem hiding this comment.
| return super().get_story(*args, **kw) | |
| return await super().get_story(*args, **kw) |
distributed/client.py
Outdated
| return await task | ||
| except Exception: | ||
| if on_error == "raise": | ||
| task.cancel() |
There was a problem hiding this comment.
the task will always be done at this point - because of the return await task
distributed/scheduler.py
Outdated
| bits = [ | ||
| (ws, await self.rpc.connect(ws.address)) for ws in self.workers.values() | ||
| ] | ||
| tasks = [] | ||
|
|
||
| for _, worker_comm in bits: | ||
| coro = send_recv(comm=worker_comm, reply=True, op="get_story", keys=keys) | ||
| tasks.append(asyncio.ensure_future(coro)) | ||
|
|
||
| try: | ||
| worker_stories = await asyncio.gather( | ||
| *tasks, return_exceptions=on_error == "ignore" | ||
| ) | ||
| except Exception: | ||
| for task in tasks: | ||
| task.cancel() | ||
|
|
||
| raise | ||
| finally: | ||
| for worker_state, worker_comm in bits: | ||
| self.rpc.reuse(worker_state.address, worker_comm) |
There was a problem hiding this comment.
I think reproducing the pattern from
distributed/distributed/scheduler.py
Lines 6155 to 6187 in 5c7d555
|
Closed by #6161 |
Client.story- Support collecting cluster-wide story for a key or stimulus ID #5872pre-commit run --all-files