Semaphore: Ensure semaphore picks correct IOLoop for threadpool workers (#4060)
Conversation
```python
with dask.config.set(
    {
        "distributed.scheduler.locks.lease-validation-interval": 0.01,
        "distributed.scheduler.locks.lease-timeout": 0.05,
```
One comment here: during some quick testing, it seems like there are some delays with these timeouts. I think the semaphore released after 10 seconds when I set the timeout to 200 ms, and when setting the refresh interval very low it didn't refresh quickly enough (I think that was < 20 ms, though).
Can you reproduce this? If so, another ticket would be appreciated.
I'm not sure. The settings might be a bit confusing. You can set the timeout much lower than the validation interval, which might have caused it. Although the default is 1 second?
Testing it locally: it looks like the test is actually fine; it seems like a timeout can occur at the latest after validation-interval + lease-timeout. With the values above (interval 0.01 s, timeout 0.05 s), that is at most 0.06 s.
If the timeout is lower than the validation interval, this guarantees that every lease times out. We do not check for this, though.

- interval: how often the scheduler checks the leases
- timeout: the maximum time between refreshes before we consider the lease stale

I just realised that the description in the config is very misleading. I'll adjust this. Thanks
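To make the relationship concrete, here is a minimal sketch (the values here are illustrative, not defaults) of a configuration in which the timeout comfortably exceeds the validation interval:

```python
import dask

with dask.config.set(
    {
        # How often the scheduler checks the leases.
        "distributed.scheduler.locks.lease-validation-interval": 0.2,
        # Maximum time between refreshes before a lease is considered
        # stale. Keep this well above the validation interval; a timeout
        # below the interval guarantees that every lease expires.
        "distributed.scheduler.locks.lease-timeout": 1.0,
    }
):
    ...  # code acquiring Semaphore leases runs under these settings
```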
distributed/semaphore.py (outdated)
```python
# Ensure that the PC below takes the correct IOLoop
if not self.client.io_loop.is_current:
    self.client.io_loop.make_current()
```
Instead I recommend that we start (and maybe create) the periodic callback in the `__await__` method. Changing default event loops in different threads seems like it might cause trouble later on.
Setting this in `__await__` is not sufficient since the await is usually only called on the client side. The PC must be registered on the worker side, though, since the worker acquires the lease and needs to take care of the refresh. Even if it was awaited on the worker, we would still have the same problem, wouldn't we?
I could reset the default once the PC is created and started. Not really nice, but I'm not sure how else to deal with this as long as the PC doesn't allow setting the event loop (I think this was possible pre-5.0, but I do not know why it was removed).
> Setting this in `__await__` is not sufficient since the await is usually only called on the client side
Hrm, I see. My expectation was that we would create things in `__init__` and then either await, or call `sync(await)` when creating these objects. I see that that isn't always the case, though.
In that case I would use `client.loop.add_callback` to make sure that this gets run in the event loop.
I think that it should work to create the `PeriodicCallback` here and then start it in the event loop:

```python
self.refresh_callback = PeriodicCallback(...)
self.client.loop.add_callback(self.refresh_callback.start)
```
I noticed that in a previous comment @pvanderlinden said that this might not work. I would be a little bit surprised by this.
> but I do not know why it was removed
I think that the compromise that we worked out with the Tornado team was that the event loop would be set in `pc.start()`, so we just needed to make sure that that gets run in the right place.
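As an illustration, here is a minimal, self-contained sketch (assuming Tornado 5.x/6.x semantics, where `PeriodicCallback.start()` captures `IOLoop.current()`) showing that the callback binds to whichever loop is current when `start()` is called, not when the callback is constructed:

```python
import threading

from tornado.ioloop import IOLoop, PeriodicCallback

def run_in_thread():
    # A fresh IOLoop owned by this thread; making it current means
    # IOLoop.current() inside pc.start() resolves to this loop.
    loop = IOLoop()
    loop.make_current()
    pc = PeriodicCallback(lambda: print("refresh"), callback_time=100)
    pc.start()  # binds to IOLoop.current(), i.e. `loop`
    loop.call_later(0.35, loop.stop)  # stop after a few ticks
    loop.start()

t = threading.Thread(target=run_in_thread)
t.start()
t.join()
```

Conversely, calling `pc.start()` from a thread where the client's loop is not current attaches the callback to the wrong loop, which is the failure mode discussed in this thread.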
@mrocklin That's in the issue, not in the PR, but I was using `IOLoop.current()`, so that's why it didn't work.
Should the `refresh_callback` be started on `acquire`, which is already a coroutine?
Indeed, the `add_callback` approach seems to work. I'm wondering how other PCs are affected and if we should replace all `pc.start()` calls with this.

> Should the `refresh_callback` be started on `acquire`, which is already a coroutine?

Theoretically, that should work, but for whatever reason a few of the tests break when trying this. Instead of debugging this, I'd prefer to use the `add_callback`.
> Indeed, the `add_callback` approach seems to work. I'm wondering how other PCs are affected and if we should replace all `pc.start()` calls with this.
I think that we're usually pretty careful with these, but I wouldn't be surprised if something else has slipped.
This looks good to me. Merging. Thanks @fjetter for the work and @pvanderlinden for raising the issue and helping with review.
When running workers in a threadpool, the Tornado PC doesn't seem to pick the correct IOLoop instance. The PC is effectively never scheduled, causing all semaphore leases to time out.
Closes #4057
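For context, a hypothetical minimal reproduction of the failure mode described above (assuming `processes=False`, which runs the workers in threads inside the local process rather than in separate processes):

```python
from dask.distributed import Client, Semaphore

# Workers share the local process when processes=False; this is the
# thread-based setup in which the PC previously never ran.
client = Client(processes=False)
sem = Semaphore(max_leases=1, name="resource")

def critical(x):
    # Acquiring the lease registers a PeriodicCallback that must keep
    # refreshing it; if that callback is never scheduled, the lease
    # times out even while the task still holds it.
    with sem:
        return x + 1

futures = client.map(critical, range(10))
print(client.gather(futures))
```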