feat(data): Add on_error and error_handler to map_batches#52457
demoncoder-crypto wants to merge 1 commit into ray-project:master
Conversation
Adds an `on_error` parameter (`'raise'` or `'continue'`) and an `error_handler` parameter (a callable) to `map_batches`.
When `on_error='continue'`, exceptions raised by the user's function during batch processing are caught. The batch is skipped, a warning is logged, and the optional `error_handler` is called.
This allows pipelines to continue processing despite errors in individual batches, preventing complete job failure for transient or data-specific issues.
Implements the core logic by modifying MapTransformer._udf_timed_iter. The error handler currently receives None for the batch due to iterator consumption before the except block.
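The catch-and-skip logic can be illustrated with a minimal standalone sketch (this is not Ray's actual `MapTransformer._udf_timed_iter` code; `apply_udf_with_on_error` is a hypothetical helper, and unlike the PR's current implementation it still has the batch in hand when the handler runs):

```python
import logging
from typing import Any, Callable, Iterable, Iterator, Optional

logger = logging.getLogger(__name__)


def apply_udf_with_on_error(
    udf: Callable[[Any], Any],
    batches: Iterable[Any],
    on_error: str = "raise",
    error_handler: Optional[Callable[[Optional[Any], Exception], None]] = None,
) -> Iterator[Any]:
    """Apply `udf` to each batch; skip failing batches when on_error='continue'."""
    for batch in batches:
        try:
            yield udf(batch)
        except Exception as e:
            if on_error == "raise":
                raise
            # on_error == "continue": log, notify the handler, skip the batch.
            logger.warning("Skipping failed batch: %s", e)
            if error_handler is not None:
                error_handler(batch, e)


# The middle batch raises (str + int), is skipped, and processing continues.
results = list(
    apply_udf_with_on_error(
        lambda b: [x + 1 for x in b],
        [[1, 2], ["bad"], [3]],
        on_error="continue",
    )
)
# results == [[2, 3], [4]]
```

In this sketch the batch is still available inside the `except` block, which is what the error handler would need to receive instead of `None`.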
on_error: Literal["raise", "continue"] = "raise",
error_handler: Optional[Callable[[Optional[Any], Exception], None]] = None,
Let's actually combine this into a single on_error param that should be a callable with signature (batch, exception) -> batch
Can support either of the following scenarios:
- Recovery: return a fallback (e.g. `None`, a default value, etc.) and continue
- Failure: capture exception (log, etc) and re-raise
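The reviewer's combined API could look like the following sketch. All names here are hypothetical illustrations of the suggested `(batch, exception) -> batch` signature, not an actual Ray API:

```python
from typing import Any, Callable, Iterator, Optional

# Hypothetical combined signature from the review: on_error(batch, exception) -> batch
OnError = Callable[[Any, Exception], Any]


def recover_with_default(batch: Any, exc: Exception) -> Any:
    # Recovery scenario: substitute a default output and keep the pipeline running.
    return []


def log_and_reraise(batch: Any, exc: Exception) -> Any:
    # Failure scenario: capture the exception (log, metrics, ...) and re-raise.
    print(f"batch failed: {exc!r}")
    raise exc


def apply_with_on_error(udf, batches, on_error: Optional[OnError] = None) -> Iterator[Any]:
    """Apply udf per batch; delegate failures to the single on_error callable."""
    for batch in batches:
        try:
            yield udf(batch)
        except Exception as exc:
            if on_error is None:
                raise
            yield on_error(batch, exc)
```

A single callable covers both scenarios: returning a value recovers the batch, while re-raising inside the handler preserves fail-fast behavior.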
Hi @demoncoder-crypto, thanks for your contribution. This is a helpful feature to give users more flexibility for error handling. But regarding the API/implementation, there are a few more things to consider:
Given that, could you open a ticket to discuss this feature first?
This pull request has been automatically marked as stale because it has not had recent activity. You can always ask for help on our discussion forum or Ray's public Slack channel. If you'd like to keep this open, just leave any comment, and the stale label will be removed.
This pull request has been automatically closed because there has been no more activity in the last 14 days. Please feel free to reopen or open a new pull request if you'd still like this to be addressed. Again, you can always ask for help on our discussion forum or Ray's public Slack channel. Thanks again for your contribution!
…h inference

Add `continue_on_error` parameter to the vLLM batch processor config. When enabled, inference failures yield error rows instead of crashing the job. Scoped to Ray Data LLM batch inference only; no changes to Ray Data core.

Addresses: ray-project#52449
Related: ray-project#52457

When running batch inference with Ray Data LLM, a single bad row (e.g., a prompt exceeding `max_model_len`) can crash the whole batch. Add an optional `continue_on_error` parameter to the processor config. The parameter defaults to False, preserving existing fail-fast behavior. When set to True:

- Catch exceptions in vLLMEngineStageUDF
- Failed rows are yielded with the `__inference_error__` column set to the error message
- Successful rows have `__inference_error__: None`
- Error rows bypass postprocess (avoids crashes when expected fields are missing)
- Job completes with mixed success/failure outputs
- Users filter downstream: `ds.filter(lambda r: r["__inference_error__"] is None)`

Design notes:

1. **Default behavior unchanged**: `continue_on_error=False` preserves existing fail-fast semantics. This is opt-in only.
2. **Error rows bypass postprocess**: The user's postprocess function likely expects `generated_text` and other output fields. Error rows won't have these, so we skip postprocess to avoid secondary crashes.
3. **Error as Optional[str]**: The `__inference_error__` column is None on success, or contains the error message (with type) on failure. This provides debuggability while keeping the schema simple.
4. **LLM operator only**: Per feedback, this is scoped to the LLM processor implementation. No changes to Ray Data core primitives.

Trade-offs:

- **Silent vs. visible failures**: Choose visible failures (error column) over silent dropping for observability.
- **Schema addition**: All outputs now include the `__inference_error__` column. This is necessary for users to distinguish success from failure and to debug.
- **No retry mechanism**: Retrying and auto-tuning are outside the scope of this PR.
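The catch-and-tag behavior described above can be sketched as a plain generator. This is an illustrative stand-in, not the actual `vLLMEngineStageUDF` code; `run_inference_with_continue_on_error` and the `infer` callable are hypothetical names:

```python
from typing import Any, Callable, Dict, Iterator, List


def run_inference_with_continue_on_error(
    rows: List[Dict[str, Any]],
    infer: Callable[[Dict[str, Any]], Dict[str, Any]],
    continue_on_error: bool = False,
) -> Iterator[Dict[str, Any]]:
    """Yield each row's inference output, tagging failures instead of crashing."""
    for row in rows:
        try:
            out = infer(row)
            # Success: merge outputs and mark the error column as None.
            yield {**row, **out, "__inference_error__": None}
        except Exception as exc:
            if not continue_on_error:
                raise  # default fail-fast behavior
            # Tag the failed row with "Type: message" so the job completes.
            yield {**row, "__inference_error__": f"{type(exc).__name__}: {exc}"}
```

With `continue_on_error=True`, a row whose prompt triggers an exception comes back with `__inference_error__` populated while successful rows carry `__inference_error__: None`.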
---

Files changed:

- `python/ray/data/llm.py` - Document new parameter in public API
- `python/ray/llm/_internal/batch/processor/base.py` - Add `continue_on_error` to config
- `python/ray/llm/_internal/batch/processor/vllm_engine_proc.py` - Pass config to stage
- `python/ray/llm/_internal/batch/stages/base.py` - Skip postprocess for error rows
- `python/ray/llm/_internal/batch/stages/vllm_engine_stage.py` - Catch errors and yield error rows

```python
import ray
from ray.data.llm import vLLMEngineProcessorConfig, build_llm_processor

config = vLLMEngineProcessorConfig(
    model_source="meta-llama/Meta-Llama-3.1-8B-Instruct",
    continue_on_error=True,  # Enable graceful error handling
)

processor = build_llm_processor(
    config,
    preprocess=lambda row: dict(
        messages=[{"role": "user", "content": row["prompt"]}],
        sampling_params=dict(temperature=0.3, max_tokens=100),
    ),
    postprocess=lambda row: dict(
        response=row["generated_text"],
    ),
)

ds = ray.data.read_json("prompts.json")
result = processor(ds)

successful = result.filter(lambda r: r["__inference_error__"] is None)
successful.write_json("outputs/")

failed = result.filter(lambda r: r["__inference_error__"] is not None)
print(f"Failed: {failed.count()} rows")
failed.show(5)
```

Tests:

`python/ray/llm/tests/batch/cpu/stages/test_stage_base.py`:
- `test_wrap_postprocess_bypasses_error_rows`
- `test_wrap_postprocess_success_rows_run_postprocess`

`python/ray/llm/tests/batch/gpu/stages/test_vllm_engine_stage.py`:
- `test_vllm_udf_default_raises_on_error`
- `test_vllm_udf_continue_on_error_yields_error_row`
- `test_vllm_udf_mixed_success_and_error`

Signed-off-by: Nikhil Ghosh <nikhil@anyscale.com>
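The "error rows bypass postprocess" rule can be sketched as a small wrapper. This is an illustrative sketch, not the actual `stages/base.py` code; `wrap_postprocess` is a hypothetical name:

```python
from typing import Any, Callable, Dict

Row = Dict[str, Any]


def wrap_postprocess(postprocess: Callable[[Row], Row]) -> Callable[[Row], Row]:
    """Run the user's postprocess only on successful rows.

    Error rows pass through unchanged, so a postprocess that expects output
    fields like `generated_text` never sees a row that lacks them.
    """
    def wrapped(row: Row) -> Row:
        if row.get("__inference_error__") is not None:
            return row  # skip user postprocess for failed rows
        # Preserve the error column so downstream filtering still works.
        return {**postprocess(row), "__inference_error__": None}

    return wrapped
```

This keeps the user-facing contract simple: postprocess only ever receives rows with the full set of inference outputs, while error rows survive to the end of the pipeline for filtering and debugging.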
Why are these changes needed?
This change introduces error handling options (`on_error`, `error_handler`) to `Dataset.map_batches`. Currently, a single failing batch halts the entire job, which is problematic for long-running tasks or those with transient errors. This PR allows users to configure pipelines to skip faulty batches (`on_error='continue'`) and optionally log errors via a handler. This improves the robustness of Ray Data pipelines against individual batch failures, preventing complete job loss from isolated issues.

Related issue number

Closes #

Checks
- I've signed off every commit (`git commit -s`) in this PR.
- I've run `scripts/format.sh` to lint the changes in this PR.
- If I have added a method in Tune, I've added it in `doc/source/tune/api/` under the corresponding `.rst` file.