Missing input data for a task should fail worker, not the task #6142

@gjoseph92

Description

If an input to a task is not actually in self.data, this currently manifests as the task failing (as though it's a user error). This in fact indicates a serious worker state issue, and should probably cause the entire worker to shut down.

See #6125 (comment) for a traceback of how an invalid transition led to a task executing where its inputs weren't all in memory yet. The df.compute() in client code failed as though this was a normal error, with the message KeyError: "('split-shuffle-1-b4961b03aa9e8bec7c581d2dc337f717', 10, (3, 9))".
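The failure mode can be illustrated with a hypothetical, stripped-down sketch (not the actual distributed code): when the lookup of input keys happens inside the same try/except that catches task errors, a missing key in the worker's data store surfaces to the user as an ordinary `KeyError`, indistinguishable from a bug in their own function. The names `data`, `gather_args`, and `execute` below are illustrative stand-ins.

```python
# Hypothetical sketch of the anti-pattern: an overly-broad try/except
# reports an internal state error (missing input) as a task failure.

data = {}  # the worker's in-memory store; the input key "x" is missing


def gather_args(keys):
    # Internal lookup: a missing key here means broken worker state,
    # not a problem with the user's function.
    return [data[k] for k in keys]


def execute(function, keys):
    try:
        args = gather_args(keys)  # raises KeyError("x")
        return ("task-finished", function(*args))
    except Exception as exc:
        # Every exception, internal or not, is reported as a task error.
        return ("task-erred", exc)


op, err = execute(lambda x: x + 1, ["x"])
# The user sees KeyError("x") as if their own code had failed.
```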

The problem is this overly-generous try/except in Worker.execute, which treats any exception raised in the try block as a problem with the task itself, including failures in _prepare_args_for_execution, which looks up the input keys:

```python
try:
    if self.validate:
        assert not ts.waiting_for_data
        assert ts.state == "executing"
        assert ts.run_spec is not None

    function, args, kwargs = await self._maybe_deserialize_task(  # type: ignore
        ts, stimulus_id=stimulus_id
    )

    args2, kwargs2 = self._prepare_args_for_execution(ts, args, kwargs)

    try:
        executor = ts.annotations["executor"]  # type: ignore
    except (TypeError, KeyError):
        executor = "default"
    try:
        e = self.executors[executor]
    except KeyError:
        raise ValueError(
            f"Invalid executor {executor!r}; "
            f"expected one of: {sorted(self.executors)}"
        )

    self.active_keys.add(key)
    try:
        ts.start_time = time()
        if iscoroutinefunction(function):
            result = await apply_function_async(
                function,
                args2,
                kwargs2,
                self.scheduler_delay,
            )
        elif "ThreadPoolExecutor" in str(type(e)):
            result = await self.loop.run_in_executor(
                e,
                apply_function,
                function,
                args2,
                kwargs2,
                self.execution_state,
                key,
                self.active_threads,
                self.active_threads_lock,
                self.scheduler_delay,
            )
        else:
            result = await self.loop.run_in_executor(
                e,
                apply_function_simple,
                function,
                args2,
                kwargs2,
                self.scheduler_delay,
            )
    finally:
        self.active_keys.discard(key)

    self.threads[key] = result["thread"]

    if result["op"] == "task-finished":
        if self.digests is not None:
            self.digests["task-duration"].add(result["stop"] - result["start"])
        return ExecuteSuccessEvent(
            key=key,
            value=result["result"],
            start=result["start"],
            stop=result["stop"],
            nbytes=result["nbytes"],
            type=result["type"],
            stimulus_id=stimulus_id,
        )

    if isinstance(result["actual-exception"], Reschedule):
        return RescheduleEvent(key=ts.key, stimulus_id=stimulus_id)

    logger.warning(
        "Compute Failed\n"
        "Key: %s\n"
        "Function: %s\n"
        "args: %s\n"
        "kwargs: %s\n"
        "Exception: %r\n",
        key,
        str(funcname(function))[:1000],
        convert_args_to_str(args2, max_len=1000),
        convert_kwargs_to_str(kwargs2, max_len=1000),
        result["exception_text"],
    )
    return ExecuteFailureEvent(
        key=key,
        start=result["start"],
        stop=result["stop"],
        exception=result["exception"],
        traceback=result["traceback"],
        exception_text=result["exception_text"],
        traceback_text=result["traceback_text"],
        stimulus_id=stimulus_id,
    )
except Exception as exc:
    logger.error("Exception during execution of task %s.", key, exc_info=True)
    msg = error_message(exc)
    return ExecuteFailureEvent(
        key=key,
        start=None,
        stop=None,
        exception=msg["exception"],
        traceback=msg["traceback"],
        exception_text=msg["exception_text"],
        traceback_text=msg["traceback_text"],
        stimulus_id=stimulus_id,
    )
```

Most exceptions raised in that block would indeed be user errors, but not all of them. We should probably be more selective about which exceptions we report as task failures.
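One way to be more selective, sketched here as a hypothetical pattern rather than a concrete proposal for distributed: move the input lookup outside the task-failure try/except and raise a dedicated internal error type, so corrupt worker state escapes the handler instead of being reported as a failed task. The names `InvalidWorkerStateError`, `data`, `gather_args`, and `execute` are illustrative assumptions.

```python
# Hypothetical sketch: only exceptions from the user's function are
# treated as task failures; internal state errors propagate upward
# (where they could trigger a worker shutdown).


class InvalidWorkerStateError(Exception):
    """Worker state is corrupt; the worker itself should be torn down."""


data = {"x": 41}  # the worker's in-memory store


def gather_args(keys):
    missing = [k for k in keys if k not in data]
    if missing:
        # Raised *outside* the task-failure handler below.
        raise InvalidWorkerStateError(f"inputs not in memory: {missing}")
    return [data[k] for k in keys]


def execute(function, keys):
    args = gather_args(keys)  # internal errors propagate, not caught here
    try:
        return ("task-finished", function(*args))
    except Exception as exc:
        # Only errors raised by the user's function reach this handler.
        return ("task-erred", exc)
```

With this split, a missing input no longer masquerades as a user-level `KeyError`; the caller sees an `InvalidWorkerStateError` and can fail the worker.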

Labels: stability (issue or feature related to cluster stability, e.g. deadlock)
