Description
If an input to a task is not actually in `self.data`, this currently manifests as the task failing, as though it were a user error. It in fact indicates a serious worker-state issue, and should probably cause the entire worker to shut down.
See #6125 (comment) for a traceback of how an invalid transition led to a task executing before all of its inputs were in memory. The `df.compute()` in client code failed as though it were a normal error, with the message `KeyError: "('split-shuffle-1-b4961b03aa9e8bec7c581d2dc337f717', 10, (3, 9))"`.
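For illustration, here is a minimal stand-alone sketch of the failure mode (hypothetical names, not distributed's actual code): because the input lookup happens inside the same `try` block as the user function, a key missing from the worker's data dict surfaces to the client as an ordinary task exception.

```python
# Hypothetical, simplified sketch of the current behaviour (not distributed's code).
def execute(key, dependencies, data, function):
    try:
        args = [data[dep] for dep in dependencies]  # input lookup: worker state, not user code
        return ("task-finished", function(*args))   # user code
    except Exception as exc:
        # A KeyError from the lookup above is indistinguishable here from an
        # exception raised by the user's function, so the client just sees a
        # "failed task" with KeyError: '<dependency key>'.
        return ("task-erred", exc)

print(execute("x", ["missing-input"], data={}, function=sum))
# ('task-erred', KeyError('missing-input'))
```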
The problem is this overly generous try/except in `Worker.execute`, which treats any problem in the `try` block as an issue with the task itself, including failures in `_prepare_args_for_execution`, which looks up the input keys:
distributed/distributed/worker.py, lines 3488 to 3601 in 68319f3:
```python
    if self.validate:
        assert not ts.waiting_for_data
        assert ts.state == "executing"
        assert ts.run_spec is not None

    function, args, kwargs = await self._maybe_deserialize_task(  # type: ignore
        ts, stimulus_id=stimulus_id
    )

    args2, kwargs2 = self._prepare_args_for_execution(ts, args, kwargs)

    try:
        executor = ts.annotations["executor"]  # type: ignore
    except (TypeError, KeyError):
        executor = "default"
    try:
        e = self.executors[executor]
    except KeyError:
        raise ValueError(
            f"Invalid executor {executor!r}; "
            f"expected one of: {sorted(self.executors)}"
        )

    self.active_keys.add(key)
    try:
        ts.start_time = time()
        if iscoroutinefunction(function):
            result = await apply_function_async(
                function,
                args2,
                kwargs2,
                self.scheduler_delay,
            )
        elif "ThreadPoolExecutor" in str(type(e)):
            result = await self.loop.run_in_executor(
                e,
                apply_function,
                function,
                args2,
                kwargs2,
                self.execution_state,
                key,
                self.active_threads,
                self.active_threads_lock,
                self.scheduler_delay,
            )
        else:
            result = await self.loop.run_in_executor(
                e,
                apply_function_simple,
                function,
                args2,
                kwargs2,
                self.scheduler_delay,
            )
    finally:
        self.active_keys.discard(key)

    self.threads[key] = result["thread"]

    if result["op"] == "task-finished":
        if self.digests is not None:
            self.digests["task-duration"].add(result["stop"] - result["start"])
        return ExecuteSuccessEvent(
            key=key,
            value=result["result"],
            start=result["start"],
            stop=result["stop"],
            nbytes=result["nbytes"],
            type=result["type"],
            stimulus_id=stimulus_id,
        )

    if isinstance(result["actual-exception"], Reschedule):
        return RescheduleEvent(key=ts.key, stimulus_id=stimulus_id)

    logger.warning(
        "Compute Failed\n"
        "Key: %s\n"
        "Function: %s\n"
        "args: %s\n"
        "kwargs: %s\n"
        "Exception: %r\n",
        key,
        str(funcname(function))[:1000],
        convert_args_to_str(args2, max_len=1000),
        convert_kwargs_to_str(kwargs2, max_len=1000),
        result["exception_text"],
    )
    return ExecuteFailureEvent(
        key=key,
        start=result["start"],
        stop=result["stop"],
        exception=result["exception"],
        traceback=result["traceback"],
        exception_text=result["exception_text"],
        traceback_text=result["traceback_text"],
        stimulus_id=stimulus_id,
    )
except Exception as exc:
    logger.error("Exception during execution of task %s.", key, exc_info=True)
    msg = error_message(exc)
    return ExecuteFailureEvent(
        key=key,
        start=None,
        stop=None,
        exception=msg["exception"],
        traceback=msg["traceback"],
        exception_text=msg["exception_text"],
        traceback_text=msg["traceback_text"],
        stimulus_id=stimulus_id,
    )
```
Most problems that could happen in that block would be user errors, but not all of them. We should probably be more selective about what we catch.
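One possible direction, reworking the small sketch above (a rough, hypothetical illustration; `WorkerStateCorrupted` and the function shape are made up, not a proposed patch): catch only exceptions raised while actually running the user's function, and let a failure during argument preparation escalate as a worker-state error instead of being converted into a task failure.

```python
# Hypothetical sketch of a more selective structure (not a proposed patch).
class WorkerStateCorrupted(Exception):
    """The worker's own bookkeeping is inconsistent; the worker should shut down."""

def execute(key, dependencies, data, function):
    try:
        # A missing key here is the worker's bug, not the user's.
        args = [data[dep] for dep in dependencies]
    except KeyError as exc:
        raise WorkerStateCorrupted(
            f"{key}: dependency {exc} expected in memory but not found"
        ) from exc

    try:
        # Only the user's code runs inside the "task failed" handler.
        return ("task-finished", function(*args))
    except Exception as exc:
        return ("task-erred", exc)
```

Something analogous in `Worker.execute` would mean the existing broad `except Exception` wraps only the actual task invocation, while errors from `_prepare_args_for_execution` (and similar internal steps) are surfaced as fatal worker errors rather than an `ExecuteFailureEvent`.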