Conversation
* code cleanup * code cleanup * Early exit pattern * refactor * cleanup * fix regression * annotations * revert * backend prototype * slightly faster both in CPython and Cython * slightly faster both in CPython and Cython * polish * polish * early exit * polish * polish * backend review * nonfunctional GUI prototype * GUI prototype (unpolished) * tooltip * refactor * GUI * GUI * GUI * refactor * polish * simpler tooltip * Reduce spilled size on delitem * tweak cluster-wide nbytes gauge * workers tab * Self-review * bokeh unit tests * test SpillBuffer * Code review * cython optimizations * test MemoryState * test backend * Remove unnecessary casts uint->sint * Self-review * Test edge cases * fix test failure * redesign test * relax maximums * fix test * lint * fix test * fix test * fix bar on small screens * height in em * larger * fix flaky test
I confirmed locally this fixes the problem for me too, thanks so much for the quick fix @crusaderky!
Stress test outcome:
```python
# ws._nbytes is updated at a different time and sizeof() may not be accurate,
# so size may be (temporarily) negative; floor it to zero.
size = max(0, (metrics["memory"] or 0) - ws._nbytes + metrics["spilled_nbytes"])
```
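The floor-to-zero logic above can be sketched standalone. In this sketch, `metrics` and `ws_nbytes` are stand-ins for the heartbeat payload and `WorkerState._nbytes`; the helper name is hypothetical, not part of distributed's API:

```python
# Hedged sketch of the computation above; `unmanaged_size`, `metrics`
# and `ws_nbytes` are illustrative stand-ins, not the real API.

def unmanaged_size(metrics: dict, ws_nbytes: int) -> int:
    """Process memory minus managed in-RAM bytes, plus spilled bytes.

    ws_nbytes is updated at a different time than the process RSS sample,
    and sizeof() is approximate, so the difference can transiently go
    negative; floor it to zero.
    """
    return max(0, (metrics.get("memory") or 0) - ws_nbytes + metrics["spilled_nbytes"])

# Stale ws_nbytes larger than the sampled RSS would go negative; floored to 0
assert unmanaged_size({"memory": 100, "spilled_nbytes": 0}, 150) == 0
# Normal case
assert unmanaged_size({"memory": 500, "spilled_nbytes": 50}, 200) == 350
# "memory" may be None before the first monitor tick; treated as 0
assert unmanaged_size({"memory": None, "spilled_nbytes": 20}, 0) == 20
```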
I can't see a way to write a unit test for this short of monkey-patching SystemMonitor?
It seems like it's going to be the only way; another test does exactly that: `distributed/distributed/tests/test_worker.py`, lines 1697 to 1717 in e4b534a
There is `distributed.admin.system-monitor.interval`, which controls how often the monitor runs. You could set it to an incredibly high value so that it is never executed during the test run.
Another option, using monkeypatch: you could remove the PC before it is even started. Something like

```python
@pytest.mark.asyncio
async def test_foo():
    s = Scheduler()
    s.periodic_callbacks["monitor"] = None
    w = Worker(s)
    w.periodic_callbacks["monitor"] = None
    await s
    await w
    async with Client(s):
        ...
```
Setting `distributed.admin.system-monitor.interval` to a very high value before I create the Scheduler has no effect (I can see data arriving in the heartbeat from `SystemMonitor.update`).
Setting

```python
s.periodic_callbacks["monitor"] = None
w.periodic_callbacks["monitor"] = None
```

fails with

```
    for pc in self.periodic_callbacks.values():
>       pc.stop()
E   AttributeError: 'NoneType' object has no attribute 'stop'
```

This has no effect:

```python
del s.periodic_callbacks["monitor"]
del w.periodic_callbacks["monitor"]
```

and this has no effect either:

```python
pc = PeriodicCallback(lambda: None, 999999999)
s.periodic_callbacks["monitor"] = pc
w.periodic_callbacks["monitor"] = pc
```
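The `AttributeError` above can be reproduced in isolation. This is a hypothetical sketch, not distributed's actual implementation: a server-like class that keeps periodic callbacks in a dict and calls `stop()` on every value at close time. Replacing a value with `None` breaks close, while deleting the key does not:

```python
# Hypothetical minimal reproduction; FakePC / FakeServer are invented names,
# standing in for tornado's PeriodicCallback and distributed's Server.

class FakePC:
    def __init__(self):
        self.running = False

    def start(self):
        self.running = True

    def stop(self):
        self.running = False

class FakeServer:
    def __init__(self):
        self.periodic_callbacks = {"monitor": FakePC()}

    def close(self):
        # Mirrors the traceback above: stop() is called on every dict value,
        # so a None entry raises AttributeError.
        for pc in self.periodic_callbacks.values():
            pc.stop()

s = FakeServer()
s.periodic_callbacks["monitor"] = None
err = None
try:
    s.close()
except AttributeError as exc:
    err = exc  # 'NoneType' object has no attribute 'stop'
assert err is not None

s2 = FakeServer()
del s2.periodic_callbacks["monitor"]  # removing the key leaves close() safe
s2.close()
```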
Would people be ok with merging this even without a test and leaving a test for a different PR? This would unblock CI in Dask-CUDA and UCX-Py (potentially other RAPIDS projects too).
I'm fine with postponing tests since our CI is also failing hard. @crusaderky, your call
I need an extra 2-3 hours to cook a unit test. I'm ok merging without one.
jrbourbeau left a comment
Thanks for your work here @crusaderky. I'm going to merge this as is to unblock CI, but left a comment below
```python
assert_memory(s, "managed_spilled", 1, 999)
# Wait for the spilling to finish. Note that this does not make the test take
# longer as we're waiting for recent_to_old_time anyway.
sleep(10)
```
Is there something more direct we can probe here instead of sleeping for 10 seconds?
Not really because the unmanaged memory is very volatile, so we don't know how many keys are going to be spilled out exactly. Also, as noted it doesn't slow the test down.
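For similar situations where a fixed sleep does cost test time, the usual alternative is to poll a condition with a deadline. This is a generic sketch (distributed ships its own test helpers; `wait_until` here is an invented name), and as noted above the fixed sleep is free in this particular test anyway:

```python
import time

def wait_until(predicate, timeout=10.0, interval=0.05):
    """Poll `predicate` until it returns True or `timeout` seconds elapse.

    Generic sketch of deadline-based waiting, not distributed's own helper.
    Returns the final value of predicate(), so callers can assert on it.
    """
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        if predicate():
            return True
        time.sleep(interval)
    return bool(predicate())

# A condition that is already true returns immediately
assert wait_until(lambda: True)
# A condition that never becomes true times out and returns False
assert not wait_until(lambda: False, timeout=0.1)
```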
Fix regressions introduced in #4651: