Add thread interrupt logic #4726

Open
martindurant wants to merge 13 commits into dask:main from martindurant:interrupt

Conversation

@martindurant (Member) commented Apr 21, 2021

@martindurant (Member Author)

This shows the basic mechanism working. The following script runs successfully:

import dask.distributed
import time
import threading
client = dask.distributed.Client(processes=False, n_workers=1, threads_per_worker=1)
client.wait_for_workers(1)

def runme(N):
    for i in range(N):
        time.sleep(1)

fut = client.submit(runme, 10000000)
time.sleep(0.01)  # make sure fut is allocated first
fut0 = client.submit(lambda x: x, True)

fut.cancel()
fut0.result()

As coded, this happens whenever an executing key is released for whatever reason, and there is no config for it. Also, if the task in question captures exceptions, it will keep running.

(please ignore print statements)
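For context, the interruption technique under discussion is CPython's PyThreadState_SetAsyncExc, shown here in a standalone ctypes sketch (this is not the PR's code). It also demonstrates the caveat above: a task that captures exceptions will absorb the injected one.

```python
import ctypes
import threading
import time


def async_raise(thread_ident, exc_type):
    """Raise exc_type asynchronously in the thread with the given ident,
    via CPython's PyThreadState_SetAsyncExc."""
    res = ctypes.pythonapi.PyThreadState_SetAsyncExc(
        ctypes.c_ulong(thread_ident), ctypes.py_object(exc_type)
    )
    if res > 1:
        # More than one thread state was modified: revert, per the CPython docs.
        ctypes.pythonapi.PyThreadState_SetAsyncExc(
            ctypes.c_ulong(thread_ident), None
        )
    return res


swallowed = []


def task_that_captures_exceptions():
    try:
        for _ in range(200):
            time.sleep(0.05)
    except BaseException:
        # A broad handler absorbs the injected exception, which is why
        # such tasks can keep running (or at least exit on their own terms).
        swallowed.append(True)


t = threading.Thread(target=task_that_captures_exceptions)
t.start()
time.sleep(0.2)  # let the task get going
async_raise(t.ident, KeyboardInterrupt)
t.join(timeout=5)
print(swallowed)  # → [True]: the task caught the interrupt
```

The injected exception is only delivered when the target thread next executes bytecode, so a thread blocked in C code (e.g. a long `time.sleep`) is interrupted only once the call returns.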

@mrocklin (Member)

This is cool to see. I know that this is still just a draft status, and I'm sure that this is in your plan, but I would encourage you to write down tests to demonstrate that some of the concerns raised in the issue are handled by this, for example that finally blocks are respected and such.
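A sketch of one such test, assuming the interrupt arrives as an asynchronous exception injected via the C API (the ctypes call below is illustrative, not the PR's code): an injected exception unwinds the stack normally, so finally: blocks still run.

```python
import ctypes
import threading
import time

cleanup_ran = []


def task_with_cleanup():
    try:
        for _ in range(200):
            time.sleep(0.05)
    finally:
        cleanup_ran.append(True)  # must run even when interrupted


t = threading.Thread(target=task_with_cleanup)
t.start()
time.sleep(0.2)  # let the task get going
# Inject SystemExit; threading's default excepthook ignores it, so the
# thread exits quietly after running its finally block.
ctypes.pythonapi.PyThreadState_SetAsyncExc(
    ctypes.c_ulong(t.ident), ctypes.py_object(SystemExit)
)
t.join(timeout=5)
assert cleanup_ran == [True]
```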

@martindurant (Member Author)

@mrocklin: first pass at a pair of tests, but we will still need to be very careful and thorough here. BTW: it took me ages to realise that fut.cancel() is a coroutine needing await.

I would still like some feedback here on whether this is a good idea for the general case, or should be a config, or only exposed via some explicit client call. The latter is hard, since the user may not have a handle to the particular task that is hanging (or just taking longer than expected).

@martindurant (Member Author)

(windows error is in test_str, not obviously related)

@jrbourbeau (Member)

cc @crusaderky

@fjetter (Member) commented Apr 28, 2021

FWIW, I would be comfortable enough with calling the C API directly for this to be merged. The API appears stable since Python 3.7 (https://docs.python.org/3/c-api/init.html#c.PyThreadState_SetAsyncExc).
If we add more tests and document this behaviour, I'd be fine with it. In terms of complexity, this is also justified considering the benefits. If this were to cause trouble down the road, we could think about toggles, but for now I'd like to avoid more options.

I'm wondering, would this be something we could contribute to CPython directly? I understand that we needed to vendor the threadpool for the seceding feature, but every modification raises maintenance costs (or the risk of them).

@alexandervaneck

Hello 👋 I stumbled upon this issue/PR while working with Prefect. (See PrefectHQ/prefect#5043).
I've also tested the proposed PR and it seems to fix this issue, great work @martindurant 🙌

@martindurant : Do you have the intention to finish this PR and make it part of distributed?

Comment on lines +2781 to +2786
    if ts.state == "executing":
        self.executing_count -= 1
        th = [th for th, k in self.active_threads.items() if k == key]
        if th:
            logger.info("Interrupting thread %i for task %s", th[0], key)
            self.executor.interrupt(th[0])
Member

This section should nowadays live in transition_executing_released
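The self.executor.interrupt(...) call in the diff implies an executor with a method beyond the stdlib ThreadPoolExecutor interface (a point raised again further down). A hypothetical sketch of such an executor; the class and method names are illustrative, not the PR's actual vendored implementation:

```python
import ctypes
import threading
import time
from concurrent.futures import ThreadPoolExecutor


class InterruptibleThreadPoolExecutor(ThreadPoolExecutor):
    """Hypothetical executor exposing interrupt(); names are illustrative."""

    def interrupt(self, thread_ident, exc_type=KeyboardInterrupt):
        # Ask CPython to raise exc_type asynchronously in the given thread.
        res = ctypes.pythonapi.PyThreadState_SetAsyncExc(
            ctypes.c_ulong(thread_ident), ctypes.py_object(exc_type)
        )
        if res > 1:
            # Modified more than one thread state: revert, per the CPython docs.
            ctypes.pythonapi.PyThreadState_SetAsyncExc(
                ctypes.c_ulong(thread_ident), None
            )
        return res == 1


ex = InterruptibleThreadPoolExecutor(max_workers=1)
idents = []


def work():
    idents.append(threading.get_ident())
    try:
        for _ in range(200):
            time.sleep(0.05)
        return "finished"
    except KeyboardInterrupt:
        return "interrupted"


fut = ex.submit(work)
while not idents:       # wait until the task is actually running
    time.sleep(0.01)
time.sleep(0.1)
ex.interrupt(idents[0])
print(fut.result())     # → interrupted
ex.shutdown()
```

Because the task catches the injected exception itself, the pool thread survives and can pick up further work, which matches the behaviour discussed above.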

Tied to config variable distributed.worker.thread_auto_interrupt,
assumed False to start (not yet set in config schema).
@martindurant martindurant marked this pull request as ready for review October 15, 2021 19:50
@martindurant (Member Author)

Provisionally ready for review. We need to decide whether this should be the default or not, and how to describe it in the config and docs.

Added distributed.worker.thread_auto_interrupt to config schema
@martindurant (Member Author)

I have made the default False for now (no interrupt) and added the key to the config schema. It is pretty hidden! We could add this to the docs somewhere, or softly launch it by suggesting some users try it when facing long-running released tasks.
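As a sketch of where the key would live (assuming the layout implied by the commit messages; the final schema location may differ), the user-facing config would look something like:

```yaml
# distributed.yaml (user config) -- illustrative sketch, not the merged schema
distributed:
  worker:
    thread_auto_interrupt: false  # default: do not interrupt threads of released tasks
```

Equivalently, it could be toggled at runtime with dask.config.set({"distributed.worker.thread_auto_interrupt": True}).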

Comment on lines +2151 to +2153
    if th:
        logger.info("Interrupting thread %i for task %s", th[0], ts.key)
        self.executor.interrupt(th[0])
Member

I would argue it is not possible for th to be empty. If that is true, there should probably be an else: raise RuntimeError. Although I'm a bit ambivalent, since on the one hand I'd like us to avoid implicit fail cases, but on the other hand we are not raising exceptions anywhere in the transition engine so far.

Member Author

I imagine it's possible for the task to end at the same time as it is released for another reason. If it turns out there is no longer a thread associated, that doesn't seem like an error to me.

Member

Good point. Can we test this?

Member

At the very least, we should add a comment here. It's not straightforward to figure out, and it feels like an important detail.

I'm also wondering now what happens if the thread finishes properly while we're trying to interrupt vs the task raised an exception before and we're trying to interrupt. What will be the final task state if either one happens? (I'm mostly concerned about invalid states. If we rerun anything, I don't mind as long as the state machine isn't messed up)

Member Author

Can we test this?

I don't see a way we can emulate it without horrible race problems.

what happens if the thread finishes properly while we're trying to interrupt

The thread doesn't finish as such; it goes into the loop to pick up more work, which is what this is for. Having said that, the _WorkItem objects don't have a state beyond what's encoded in the corresponding future.

task raised an exception before and we're trying to interrupt

If the task is already in an except block, you would expect to see "During handling of the above exception, another exception occurred:" as the return, but finally: blocks would still run.

If we rerun anything, I don't mind as long as the state machine isn't messed up

Fundamentally, we are cleaning up a task which is set for release, so the output of the task and the state of the corresponding _WorkItem object don't matter.

@martindurant (Member Author)

Anything left here?

@martindurant (Member Author)

ping?

@fjetter (Member) commented Dec 16, 2021

My final concern here is that we're no longer using the stdlib ThreadPoolExecutor interface but require an extended one, which breaks some assumptions about our compatibility; see

    elif "ThreadPoolExecutor" in str(type(e)):
        result = await self.loop.run_in_executor(
            e,
            apply_function,
            function,
            args2,
            kwargs2,
            self.execution_state,
            ts.key,
            self.active_threads,
            self.active_threads_lock,
            self.scheduler_delay,
        )

Discussion around this https://github.com/dask/distributed/pull/5063/files#r670260505

FWIW, we're not consistent with this and I believe we should require a strict isinstance check, e.g.

    if isinstance(executor, ThreadPoolExecutor):
        executor._work_queue.queue.clear()
        executor.shutdown(wait=executor_wait, timeout=timeout)
    else:


Development

Successfully merging this pull request may close these issues.

Allow workers to cancel running task?
