Encapsulate spill buffer and memory_monitor #5904
Conversation
Force-pushed 40c584f to 8326ac0
    services=None,
    name=None,
    memory_limit="auto",
    memory_terminate_fraction: float | Literal[False] | None = None,
This init parameter is a very recent addition so I think it's safe not to have a deprecation cycle
Agreed, it doesn't look like there have been any releases since it was introduced:
$ git log a86f4bb568b5aeb60f5a2a8e24f86592a407b09d~1..HEAD --oneline
4918d652 Merge monitor-interval for spill/pause and terminate
47b4ea19 Merge branch 'main' into spill_extension
7a69b5e2 Prevent data duplication on unspill (#5936)
1a0548b6 distributed.worker_memory
2fffe74d Worker State Machine refactor: redesign TaskState and scheduler messages (#5922)
85bf1beb absolufy-imports (#5924)
925c6100 Tidying of OpenSSL 1.0.2/Python 3.9 (and earlier) handling (#5854)
60ce8436 Fix `track_features` for distributed pre-releases (#5927)
2d68dfc8 Add key to compute failed message (#5928)
f9d2f914 zict type annotations (#5905)
30f0b601 Support dumping cluster state to URL (#5863)
936fba5a Xfail test_submit_different_names (#5916)
e1e43858 Change default log format to include timestamp (#5897)
de94b408 Unblock event loop while waiting for ThreadpoolExecutor to shut down (#5883)
39c5e885 handle concurrent or failing handshakes in InProcListener (#5903)
b3f50cef add GitHub URL for PyPi (#5886)
8c98ad8c fix progress_stream teardown (#5823)
be4fc7f7 Drop unused `_round_robin` global variable (#5881)
ca235dd6 Mark xfail COMPILED tests skipif instead (#5884)
16931cc8 Improve type annotations in worker.py (#5814)
a86f4bb5 Mock process memory readings in test_worker.py (v2) (#5878)

Force-pushed 0d92476 to 6022a78
    memory_target_fraction: float | Literal[False] | None = None,
    memory_spill_fraction: float | Literal[False] | None = None,
    memory_pause_fraction: float | Literal[False] | None = None,
    max_spill: float | str | Literal[False] | None = None,
max_spill is a very recent addition, so I didn't put it through a deprecation cycle; the same goes for memory_monitor_interval as an init parameter.
        Callable[..., MutableMapping[str, Any]], dict[str, Any]
    ]  # (constructor, kwargs to constructor)
    | None  # create internally
) = None,
This seems to me like a feature for power users that is as powerful as it is obscure, and I would love to hear from somebody who actually uses it to understand their use case!
I wonder if ManualEvictProto could be used in this typed declaration, but I have no strong opinion, as I'm not sure whether it would even be possible.
No, because the type declaration says what the parameter must be; ManualEvictProto is an additional, optional interface to the MutableMapping which unlocks extra features.
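To make the distinction concrete, here is a hedged sketch (class and function names are illustrative, and the protocol's shape is assumed from this thread rather than copied from distributed.spill): the parameter type stays `MutableMapping`, while the protocol is only an opportunistic runtime check for the extra eviction capability.

```python
from typing import Any, MutableMapping, Protocol, runtime_checkable


@runtime_checkable
class ManualEvictProto(Protocol):
    # Assumed shape: an *optional* extra interface on top of MutableMapping.
    def evict(self) -> int:
        """Spill one key; return bytes freed, or -1 if nothing to spill."""
        ...


class PlainData(dict):
    """A MutableMapping-only store: accepted, but never manually evicted."""


class EvictableData(dict):
    """Also implements the optional evict() hook."""

    def evict(self) -> int:
        if not self:
            return -1
        self.pop(next(iter(self)))  # pretend to spill the oldest key
        return 1


def maybe_spill(data: MutableMapping[str, Any]) -> None:
    # The declared type remains MutableMapping; the protocol check only
    # unlocks the extra feature when the object happens to provide it.
    if isinstance(data, ManualEvictProto):
        data.evict()


d1, d2 = PlainData(a=1), EvictableData(a=1)
maybe_spill(d1)  # no-op: PlainData has no evict()
maybe_spill(d2)  # evicts one key
print(len(d1), len(d2))  # 1 0
```

This is why the protocol cannot replace `MutableMapping` in the typed declaration: it describes a capability some stores add, not the contract every store must satisfy.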
Force-pushed 5d18a87 to 60a5dc2
        "distributed.worker.memory.pause": False,
    },
)
async def test_pause_executor_manual(c, s, a):
        "distributed.worker.memory.monitor-interval.spill-pause": "10ms",
    },
)
async def test_pause_executor_with_memory_monitor(c, s, a):
was test_worker.py::test_pause_executor
        "distributed.worker.memory.monitor-interval.spill-pause": "10ms",
    },
)
async def test_override_data_does_not_spill(c, s, a):
    ],
)
@gen_cluster(nthreads=[])
async def test_deprecated_attributes(s, cls, name, value):
    ["memory_target_fraction", "memory_spill_fraction", "memory_pause_fraction"],
)
@gen_cluster(nthreads=[])
async def test_deprecated_params(s, name):
@dask/gpu somebody should have a look at whether this affects anything in RAPIDS

Outstanding:
This is otherwise complete.
Thanks for pinging us here. I was already looking at the PR earlier today, but it's fairly long so I couldn't finish yet; plus @crusaderky informed us that #5909 is being fixed here too. I ran Dask-CUDA tests with it and, due to the obvious API changes, there are more tests failing. Could we hold off on merging it for another day or two, until we can ensure that doesn't break compatibility in a way that can't be fixed in Dask-CUDA? If not, that's ok, but we may need to revisit changes should anything arise.

@pentschev take whatever time you need. Note that I just pushed an additional commit which re-adds the Worker.memory_monitor method. Downstream of it, there shouldn't be any obvious API changes - could you point them out please?
Unit Test Results: 12 files ±0, 12 suites ±0, 5h 54m 17s ⏱️ +7m 47s. For more details on these failures, see this check. Results for commit 5883c96; comparison against base commit 9f7027e.
distributed/worker_memory.py
if not isinstance(self.data, SpillBuffer):
    return
Should this check be removed? Dask CUDA doesn't use a SpillBuffer, as noted here.
Alternately, Dask CUDA should inherit from SpillBuffer - in which case, perhaps it'd be a good idea to define what interface a SpillBuffer should implement? Either as part of the docs, or codified as an ABC?
@shwina to answer the question I would need to understand what Dask CUDA is trying to do and what it was doing before.
Is it supposed to react to the distributed.worker.memory.spill threshold?
If so, how does it do it? Does it replace the memory_monitor method, or is it duck-type compatible with SpillBuffer and offer a data.fast.evict() method? I couldn't find either. If neither is the case, does it mean that, even before #5543 and #5736, Worker.memory_monitor was crashing?
BTW I noticed now that it's not wrapped with log_errors, so the crash would be completely invisible. I'm adding it now.
Can you point us to where the Dask CUDA data mapping is defined?
Dask-CUDA implements two different data mappings to handle GPU->CPU spilling:
- https://github.com/rapidsai/dask-cuda/blob/branch-22.04/dask_cuda/device_host_file.py#L145
- https://github.com/rapidsai/dask-cuda/blob/branch-22.04/dask_cuda/proxify_host_file.py#L433
They implement data.fast.evict().
I see that ProxifyHostFile has been modified to support both before and after #5543 (2022.02.1).
DeviceHostFile won't work with >=2022.02.1 as it misses an evict() method.
See my latest commit (a8ac78d) where I formalise this interface (missing high level docs and unit tests). Could you confirm that ProxifyHostFile works again now?
DeviceHostFile won't work with >=2022.02.1 as it misses an evict() method.
Implemented now in shwina/dask-cuda#2
    worker_kwargs={"memory_limit": "1 GB", "data": UserDict},
    config={"distributed.worker.memory.monitor-interval.spill-pause": "10ms"},
)
async def test_override_data_vs_memory_monitor(c, s, a):
Force-pushed 5fd4754 to d948828
        "distributed.worker.memory.monitor-interval.spill-pause": "10ms",
    },
)
async def test_manual_evict_proto(c, s, a):
Worker Memory Management
========================
For cluster-wide memory-management, see :doc:`memory`.
below this point, this is just a cut-paste from worker.rst
Thanks for your kindness to the reviewer! 😆
Force-pushed bc03fbc to 1a0548b
@shwina can you rerun your dask-cuda PR with the latest from here and confirm the latest changes will work for dask-cuda?

I checked this with changes from rapidsai/dask-cuda#870 and all tests pass. Would be great to get this PR in before the next Dask release.

After offline discussion with @fjetter, I merged the monitor-interval for spill/pause and terminate.

This is now only missing final review and merge.

+1 for merging

Do we know what is up with the Windows CI failures?
sjperkins left a comment:
This PR encapsulates Nanny and Worker Memory Logic in the Nanny and Worker classes into separate NannyMemoryManager and WorkerMemoryManager classes.
My comments are mostly questions and statements for my own understanding, although there are one or two nits that the author may wish to address.
try:
    return self._status
except AttributeError:
    return Status.undefined
Minor nit: Would it not be simpler to define _status upfront in __init__?
def __init__(self, ...):
    self._status = Status.undefined
That's already happening, but the subclasses are not calling super().__init__ straight away and that creates something that's hard to disentangle.
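A minimal sketch of the pattern being discussed (class names here are hypothetical stand-ins; the real hierarchy lives in distributed's server/worker code): the try/except fallback exists because a subclass may read `status` before `super().__init__()` has had a chance to set `_status`.

```python
from enum import Enum


class Status(Enum):
    undefined = "undefined"
    running = "running"


class Server:
    def __init__(self):
        self._status = Status.undefined

    @property
    def status(self):
        try:
            return self._status
        except AttributeError:
            # A subclass asked for status before super().__init__() ran
            return Status.undefined


class Worker(Server):
    def __init__(self):
        # Hypothetical: touches self.status before calling super().__init__(),
        # which is exactly why the AttributeError fallback is needed.
        assert self.status is Status.undefined
        super().__init__()


w = Worker()
print(w.status)  # Status.undefined
```

Defining `_status` "upfront" only helps if every subclass calls `super().__init__()` first, which is the part that is hard to disentangle here.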
    # Deprecated attributes; use Nanny.memory_manager.<name> instead
    memory_limit = DeprecatedMemoryManagerAttribute()
    memory_terminate_fraction = DeprecatedMemoryManagerAttribute()
    memory_monitor = DeprecatedMemoryMonitor()
For my own comprehension: a descriptor protocol is used to warn about and redirect use of the deprecated attribute onto the corresponding attribute of self.memory_manager: NannyMemoryManager.
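That reading matches how such a descriptor typically works. Below is a hedged sketch (the class names `DeprecatedAttribute`, `MemoryManager`, and `Nanny` here are simplified stand-ins for the real `DeprecatedMemoryManagerAttribute` / `NannyMemoryManager`, whose details may differ):

```python
import warnings


class DeprecatedAttribute:
    """Descriptor that forwards a deprecated attribute to obj.memory_manager."""

    name: str

    def __set_name__(self, owner, name):
        # Record which attribute name this descriptor was assigned to
        self.name = name

    def __get__(self, instance, owner=None):
        if instance is None:
            return self  # accessed on the class, not an instance
        warnings.warn(
            f"`{owner.__name__}.{self.name}` has moved to "
            f"`{owner.__name__}.memory_manager.{self.name}`",
            FutureWarning,
        )
        return getattr(instance.memory_manager, self.name)


class MemoryManager:
    memory_limit = 2**30  # 1 GiB, for demonstration


class Nanny:
    memory_manager = MemoryManager()
    memory_limit = DeprecatedAttribute()  # deprecated alias


with warnings.catch_warnings():
    warnings.simplefilter("ignore")
    print(Nanny().memory_limit)  # 1073741824, via memory_manager
```

Reading the old attribute thus warns once per access and transparently returns the value from the new location.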
    """
    ... # pragma: nocover
For my own comprehension, this Protocol checks that the supplied object adheres to a duck-type interface, as opposed to that imposed by an ABC class.
This is primarily used in worker_memory.WorkerMemoryManager._maybe_spill to ensure that the data: MutableMapping member adheres to the appropriate interface.
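Exactly: a Protocol matches on structure, while an ABC matches on inheritance. A small self-contained contrast (all names here are illustrative, not taken from the PR):

```python
from abc import ABC, abstractmethod
from typing import Protocol, runtime_checkable


class EvictABC(ABC):
    """Nominal typing: implementers must explicitly inherit (or register)."""

    @abstractmethod
    def evict(self) -> int: ...


@runtime_checkable
class EvictProto(Protocol):
    """Structural typing: any object with a matching evict() conforms."""

    def evict(self) -> int: ...


class ThirdPartyBuffer(dict):
    # Note: does NOT inherit from EvictABC
    def evict(self) -> int:
        return 0


buf = ThirdPartyBuffer()
print(isinstance(buf, EvictProto))  # True: the shape matches
print(isinstance(buf, EvictABC))    # False: no inheritance relationship
```

This is what lets third-party mappings (like Dask-CUDA's) satisfy the interface without importing or subclassing anything from distributed.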
# It is not sent to the worker.
z = c.submit(inc, 2, key="z")
while "z" not in s.tasks or s.tasks["z"].state != "no-worker":
    await asyncio.sleep(0.01)
I understand that these potentially infinite while loops in test cases are safe because a timeout is enforced on test case runs via @gen_cluster?
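Right: the loops only look unbounded because the timeout lives one level up. A generic sketch of the same pattern (`poll_until` is a hypothetical helper; in the tests the outer bound comes from @gen_cluster's timeout rather than asyncio.wait_for):

```python
import asyncio


async def poll_until(predicate, timeout=5.0, interval=0.01):
    # Spin on a condition while yielding to the event loop; an outer
    # timeout bounds the whole wait, so the inner loop may be "infinite".
    async def _loop():
        while not predicate():
            await asyncio.sleep(interval)

    await asyncio.wait_for(_loop(), timeout)


state = {"done": False}


async def main():
    async def flip():
        await asyncio.sleep(0.05)
        state["done"] = True

    task = asyncio.create_task(flip())
    await poll_until(lambda: state["done"], timeout=2.0)
    await task


asyncio.run(main())
print(state["done"])  # True
```

If the condition never becomes true, asyncio.wait_for (or the test harness) raises a TimeoutError instead of hanging forever.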
assert w.memory_manager.memory_limit == 2e9


@gen_cluster(
These look like they were moved from test_worker.py
everything that I didn't comment with "new test" is moved from test_worker
futures = await c.scatter({"x": None, "y": None, "z": None})
while a.data.evicted != {"x", "y", "z"}:
    await asyncio.sleep(0.01)
For my own comprehension, this test mocks the process memory to above the 70% limit, which forces the "x", "y" and "z" keys to be evicted as per the ManualEvictDict protocol.
Co-authored-by: Simon Perkins <simon.perkins@gmail.com>
Is this good to merge?

it is
This PR updates dask-cuda to work with the new `WorkerMemoryManager` abstraction being introduced in dask/distributed#5904. Once both PRs are merged, and pending the resolution of https://github.com/dask/distributed/pull/5904/files#r822084806, dask-cuda CI should be unblocked.
Authors:
- Ashwin Srinath (https://github.com/shwina)
- Peter Andreas Entschev (https://github.com/pentschev)
Approvers:
- Peter Andreas Entschev (https://github.com/pentschev)
URL: #870
In the config, memory thresholds such as `distributed.worker.memory.terminate` should never be exactly `0.0`. Instead, the config should use `false` to disable memory management.

This one bit me recently. My older dask config files used `0.0` to disable the memory management features. That worked because older versions of `distributed` interpreted the value `0.0` as equivalent to `false` for these fields. But in newer versions, only `false` works. (I suspect the change occurred in #5904.) Nowadays, if the config says `0.0`, then `distributed` interprets that literally -- and no memory can be used at all without incurring the wrath of the memory manager!

An easy "fix" is to disallow `0.0` in the user's config. In JSON Schema, `exclusiveMinimum: 0` ensures that the value `0.0` itself is not permitted by the schema.
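The same rule can be sketched in plain application code rather than JSON Schema (the function name `validate_threshold` is hypothetical; distributed validates via its config schema instead):

```python
def validate_threshold(value):
    """Accept False (disable) or a fraction in (0, 1]; reject 0.0 outright."""
    # `value is False` must be tested first: in Python, False == 0, so a
    # plain numeric range check would conflate "disabled" with "zero".
    if value is False:
        return value
    if (
        isinstance(value, (int, float))
        and not isinstance(value, bool)
        and 0 < value <= 1
    ):
        return value
    raise ValueError(
        f"expected false or a fraction in (0, 1], got {value!r}; "
        "use false (not 0.0) to disable memory management"
    )


print(validate_threshold(0.95))   # 0.95
print(validate_threshold(False))  # False
```

Rejecting `0.0` at validation time surfaces the misconfiguration immediately, instead of silently starving the worker of memory.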
SpillBuffer breaks usage of custom Worker(data=...) types #5909