Conversation
Unit Test ResultsSee test report for an extended history of previous test failures. This is useful for diagnosing flaky tests. 15 files ± 0 15 suites ±0 10h 6m 8s ⏱️ - 33m 37s For more details on these failures, see this check. Results for commit eb308ba. ± Comparison against base commit dc019ed. |
| return f"<{type(self).__name__}: {len(self)} items>" | ||
|
|
||
| def __reduce__(self) -> tuple[Callable, tuple]: | ||
| heap = [(k, i, v) for k, i, vref in self._heap if (v := vref()) in self._data] |
There was a problem hiding this comment.
is the worker state ever pickled from a different thread than where it's being mutated?
| heap = [(k, i, v) for k, i, vref in self._heap if (v := vref()) in self._data] | |
| heap = [(k, i, v) for k, i, vref in self._heap.copy() if (v := vref()) in self._data] |
There was a problem hiding this comment.
unlikely but not impossible.
Standard pattern (beyond the current benchmarkign stuff in dask/dask-benchmarks#50)
would be something like client.run or a newer version of client.dump_cluster_state which would all be executing on the main htead.
However, we cannot protect from improper usages, e.g.
def task():
return pickle.dumps(get_worker().state)
client.submit(task)is valid dask but would pickle in another thread. I don't think we should protect or guard against this if it is expensive/ _heap is a list here, isn't it? a shallow copy should be fine but I'm not convinced it is necessary.
There was a problem hiding this comment.
None of the class is thread-safe; I don't see why pickling should be an exception.
Also, there's no guarantee that list.copy() will hold the GIL - if it does, it's an implementation detail.
There was a problem hiding this comment.
P.S. the multithreading issue would be prevented with either client.run or an async task.
There was a problem hiding this comment.
list and dict copying is atomic
There was a problem hiding this comment.
We perhaps should explicitly document what non-trivial operations are atomical (for example list and dict copying is atomic) and whether atomacity is the part of the language specification or CPython implementation detail.
For as long as it's not explicitly documented, we must treat it as an implementation detail we can't rely upon
| ) | ||
| ) | ||
| ws.handle_stimulus(UpdateDataEvent(data={"y": 123}, report=False, stimulus_id="s")) | ||
| ws2 = pickle.loads(pickle.dumps(ws)) |
Co-authored-by: Thomas Grainger <tagrain@gmail.com>
Provide initial support for pickling the WorkerState.
This is not (yet) used for dumping; consensus should be reached before we do anything about it.
For the time being, this is necessary because of dask/dask-benchmarks#50.