Conversation
|
This shows the basic mechanism working. The following script runs successfully: As coded, this happens whenever an executing key is released for whatever reason, and there is no config option for it. Also, if the task in question catches exceptions, it will keep running. (Please ignore the print statements.) |
|
This is cool to see. I know that this is still just a draft status, and I'm sure that this is in your plan, but I would encourage you to write down tests to demonstrate that some of the concerns raised in the issue are handled by this, for example that finally blocks are respected and such. |
|
@mrocklin : first pass at a pair of tests, but we will still need to be very careful and thorough here. BTW, it took me ages to realise that I would still like some feedback here on whether this is a good idea for the general case, or should be behind a config option, or only exposed via some explicit client call. The latter is hard, since the user may not have a handle to the particular task that is hanging (or just taking longer than expected). |
|
(windows error is in |
|
cc @crusaderky |
|
FWIW, I would be comfortable enough with calling C API directly for this to be merged. The API seems to be stable since py3.7 (https://docs.python.org/3/c-api/init.html#c.PyThreadState_SetAsyncExc) I'm wondering, would this be something we could contribute to CPython directly? I understand that we had the need to vendor the threadpool for the seceding feature but every modification raises maintenance cost (or the risk of it) |
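For context, the C API call under discussion can already be exercised from pure Python via ctypes. A minimal sketch (the names TaskInterrupt and async_raise are illustrative, not part of distributed) which also demonstrates that finally: blocks still run when a thread is interrupted this way:

```python
import ctypes
import threading
import time

class TaskInterrupt(BaseException):
    """Hypothetical exception type used to cancel a running task."""

def async_raise(thread_id, exc_type):
    """Schedule exc_type to be raised in the thread with the given ident.

    Thin wrapper over the CPython C API PyThreadState_SetAsyncExc
    (stable since Python 3.7).  The exception is raised the next time
    the target thread executes Python bytecode, so a thread blocked
    inside a C extension is not interrupted until it returns.
    """
    res = ctypes.pythonapi.PyThreadState_SetAsyncExc(
        ctypes.c_ulong(thread_id), ctypes.py_object(exc_type)
    )
    if res == 0:
        raise ValueError(f"no thread with ident {thread_id}")
    elif res > 1:
        # Ambiguous result: clear the pending exception again.
        ctypes.pythonapi.PyThreadState_SetAsyncExc(ctypes.c_ulong(thread_id), None)
        raise SystemError("PyThreadState_SetAsyncExc affected multiple threads")

events = []

def long_task():
    try:
        while True:
            time.sleep(0.01)  # short sleeps keep the thread interruptible
    except TaskInterrupt:
        events.append("interrupted")
    finally:
        events.append("finally ran")  # finally blocks are still honoured

t = threading.Thread(target=long_task)
t.start()
time.sleep(0.1)
async_raise(t.ident, TaskInterrupt)
t.join(timeout=5)
print(events)  # ['interrupted', 'finally ran']
```

This is the same mechanism the PR uses; vendoring a pool is only needed to route the call to the right worker thread.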
|
Hello 👋 I stumbled upon this issue/PR while working with Prefect. (See PrefectHQ/prefect#5043.) @martindurant : Do you intend to finish this PR and make it part of distributed? |
distributed/worker.py
Outdated

```python
if ts.state == "executing":
    self.executing_count -= 1
    th = [th for th, k in self.active_threads.items() if k == key]
    if th:
        logger.info("Interrupting thread %i for task %s", th[0], key)
        self.executor.interrupt(th[0])
```
This section should nowadays live in transition_executing_released
Tied to config variable distributed.worker.thread_auto_interrupt, assumed False to start (not yet set in config schema).
|
Provisionally ready for review; we need to decide whether this should be the default or not, and how to describe it in the config and docs. |
Added distributed.worker.thread_auto_interrupt to config schema
|
I have made the default False for now (no interrupt) and added the key to the config schema. It is pretty hidden! We could add this to the docs somewhere, or softly launch it by suggesting some users try it when facing long-running released tasks. |
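As a sketch, opting in would look like this in a user's dask config file (the key name is the one added in this PR; the default of false means no interrupt, matching current behaviour):

```yaml
distributed:
  worker:
    thread_auto_interrupt: false   # set to true to interrupt threads of released tasks
```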
```python
if th:
    logger.info("Interrupting thread %i for task %s", th[0], ts.key)
    self.executor.interrupt(th[0])
```
I would argue it is not possible for th to be empty. If that is true, there should probably be an else: raise RuntimeError. Although I'm a bit ambivalent: on the one hand I'd like us to avoid implicit failure cases, but on the other hand we are not raising exceptions anywhere in the transition engine so far.
I imagine it's possible for the task to end at the same time as it is released for another reason. If it turns out there is no longer a thread associated, that doesn't seem like an error to me.
At the very least, we should add a comment here. It's not straightforward to figure out, and it feels like an important detail.
I'm also wondering now what happens if the thread finishes properly while we're trying to interrupt vs the task raised an exception before and we're trying to interrupt. What will be the final task state if either one happens? (I'm mostly concerned about invalid states. If we rerun anything, I don't mind as long as the state machine isn't messed up)
Can we test this?
I don't see a way we can emulate it without horrible race problems.
what happens if the thread finishes properly while we're trying to interrupt
The thread doesn't finish as such; it goes back into the loop to pick up more work, which is what this is for. Having said that, the _WorkItem objects don't have a state beyond what's encoded in the corresponding future.
task raised an exception before and we're trying to interrupt
If the task is already in an except block, you would expect to see "During handling of the above exception, another exception occurred:" in the resulting traceback, but finally: blocks would still run.
If we rerun anything, I don't mind as long as the state machine isn't messed up
Fundamentally, we are cleaning up a task which is set for release, so the output of the task and the state of the corresponding _WorkItem object don't matter.
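The point that an interrupt ends only the current task, not the worker thread, can be sketched with a toy pool loop (names here are illustrative, not distributed's actual _WorkItem machinery):

```python
import queue
import threading

class TaskInterrupt(BaseException):
    """Hypothetical exception delivered to cancel the current task."""

results = []

def worker_loop(work_queue):
    # Sketch of a pool worker thread: each task runs inside its own try
    # block, so an exception (including an async interrupt) ends only the
    # current task; the thread then loops back to pick up more work.
    while True:
        fn = work_queue.get()
        if fn is None:  # shutdown sentinel
            return
        try:
            results.append(fn())
        except BaseException as e:
            # Task failed or was interrupted; a real pool would record
            # this on the task's future rather than a shared list.
            results.append(type(e).__name__)

def bad():
    raise TaskInterrupt

q = queue.Queue()
t = threading.Thread(target=worker_loop, args=(q,))
t.start()
q.put(lambda: "first")
q.put(bad)               # simulates an interrupted task
q.put(lambda: "second")  # the thread survives and runs the next task
q.put(None)
t.join()
print(results)  # ['first', 'TaskInterrupt', 'second']
```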
|
Anything left here? |
|
ping? |
|
My final concern here is that we're not using the stdlib (distributed/worker.py, lines 3433 to 3445 in 96d4fd4). Discussion around this: https://github.com/dask/distributed/pull/5063/files#r670260505 FWIW, we're not consistent with this, and I believe we should require a strict isinstance check, e.g. distributed/worker.py, lines 1610 to 1613 in 96d4fd4. |
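As a sketch of the strict-check idea (can_interrupt is a hypothetical helper, not part of distributed): interruption should only be attempted when the executor both passes an isinstance check and actually exposes the interrupt method that the vendored pool would provide:

```python
from concurrent.futures import ThreadPoolExecutor

def can_interrupt(executor):
    # Hypothetical guard: the stdlib ThreadPoolExecutor has no interrupt()
    # method, so a strict type-plus-capability check avoids calling a method
    # that only a vendored/custom pool provides.
    return isinstance(executor, ThreadPoolExecutor) and hasattr(executor, "interrupt")

pool = ThreadPoolExecutor(max_workers=1)
print(can_interrupt(pool))  # False: the stdlib pool lacks interrupt()
pool.shutdown()
```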