feat(data): Add on_error and error_handler to map_batches#52457

Closed
demoncoder-crypto wants to merge 1 commit into ray-project:master from demoncoder-crypto:feat/data-map-batches-error-handling

Conversation

@demoncoder-crypto

Why are these changes needed?

This change introduces error handling options (on_error, error_handler) to Dataset.map_batches. Currently, a single failing batch halts the entire job, problematic for long-running tasks or those with transient errors. This PR allows users to configure pipelines to skip faulty batches (on_error='continue') and optionally log errors via a handler. This improves the robustness of Ray Data pipelines against individual batch failures, preventing complete job loss from isolated issues.

Related issue number

Closes #

Checks

  • I've signed off every commit (by using the -s flag, i.e., git commit -s) in this PR.
  • I've run scripts/format.sh to lint the changes in this PR.
  • I've included any doc changes needed for https://docs.ray.io/en/master/.
    • I've added any new APIs to the API Reference. For example, if I added a
      method in Tune, I've added it in doc/source/tune/api/ under the
      corresponding .rst file.
  • I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/
  • Testing Strategy
    • Unit tests
    • Release tests
    • This PR is not tested :(

Adds parameters `on_error` ('raise' or 'continue') and `error_handler` (a callable) to `Dataset.map_batches`.

When `on_error='continue'`, exceptions raised by the user's function during batch processing are caught: the batch is skipped, a warning is logged, and the optional `error_handler` is called.

This allows pipelines to continue processing despite errors in individual batches, preventing complete job failure for transient or data-specific issues.

Implements the core logic by modifying `MapTransformer._udf_timed_iter`. The error handler currently receives `None` for the batch because the iterator is consumed before the `except` block runs.
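As a rough sketch of the proposed semantics (the `apply_with_on_error` function and the standalone harness below are illustrative stand-ins, not the actual Ray Data internals):

```python
from typing import Any, Callable, Iterable, Optional

def apply_with_on_error(
    batches: Iterable[Any],
    fn: Callable[[Any], Any],
    on_error: str = "raise",
    error_handler: Optional[Callable[[Optional[Any], Exception], None]] = None,
):
    # Illustrative stand-in for the transform loop: apply fn to each batch,
    # skipping failed batches when on_error="continue".
    for batch in batches:
        try:
            yield fn(batch)
        except Exception as exc:
            if on_error == "raise":
                raise
            if error_handler is not None:
                error_handler(batch, exc)

errors = []
out = list(apply_with_on_error(
    [{"x": 1}, {"x": "bad"}, {"x": 3}],
    lambda b: {"x": b["x"] + 1},
    on_error="continue",
    error_handler=lambda b, e: errors.append(type(e).__name__),
))
# out == [{"x": 2}, {"x": 4}]; errors == ["TypeError"]
```

The failing middle batch is dropped rather than halting iteration, which is the behavior the PR description claims for `on_error='continue'`.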
Comment on lines +62 to +63
on_error: Literal["raise", "continue"] = "raise",
error_handler: Optional[Callable[[Optional[Any], Exception], None]] = None,
Contributor


Let's actually combine this into a single `on_error` param that should be a callable with signature `(batch, exception) -> batch`

Can support either of the following scenarios:

  • Recovery: return a default/fallback batch (e.g., None or default values)
  • Failure: capture exception (log, etc) and re-raise
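The suggested single-callable API could look something like this minimal sketch (the `run` driver and the handler names are hypothetical, not Ray Data APIs):

```python
def recover(batch, exc):
    # Recovery path: substitute a default batch instead of failing.
    return {"x": [0 for _ in batch["x"]]}

def fail(batch, exc):
    # Failure path: capture/log the exception, then re-raise.
    print(f"batch failed: {exc!r}")
    raise exc

def run(batches, fn, on_error):
    # Hypothetical driver: on_error is a single callable with
    # signature (batch, exception) -> batch, per the review comment.
    for batch in batches:
        try:
            yield fn(batch)
        except Exception as exc:
            yield on_error(batch, exc)

out = list(run(
    [{"x": [1, 2]}, {"x": ["bad"]}],
    lambda b: {"x": [v + 1 for v in b["x"]]},
    on_error=recover,
))
# out == [{"x": [2, 3]}, {"x": [0]}]
```

Passing `fail` instead of `recover` covers the second scenario: the exception is observed and re-raised, preserving fail-fast behavior.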

@raulchen
Contributor

Hi @demoncoder-crypto, thanks for your contribution.

This is a helpful feature to give users more flexibility for error handling. But regarding the API/implementation, there are a few more things to consider:

  1. We can combine these 2 args into one single on_error parameter that supports either a callback or a literal.
  2. It'd be nice to propagate error stats (e.g., number of errored rows/batches) back to the driver.
  3. We are about to add a per-op OpOptions for advanced configs soon, to avoid exposing too many top-level args. This on_error can go there.
  4. There is already a max_errored_blocks flag in DataContext. It'd be nice to reconcile them.

Given that, could you open a ticket to discuss this feature first?

@github-actions

This pull request has been automatically marked as stale because it has not had
any activity for 14 days. It will be closed in another 14 days if no further activity occurs.
Thank you for your contributions.

You can always ask for help on our discussion forum or Ray's public slack channel.

If you'd like to keep this open, just leave any comment, and the stale label will be removed.

@github-actions github-actions bot added the stale label (The issue is stale. It will be closed within 7 days unless there is further conversation) on Jun 12, 2025
@github-actions

This pull request has been automatically closed because there has been no more activity in the 14 days
since being marked stale.

Please feel free to reopen or open a new pull request if you'd still like this to be addressed.

Again, you can always ask for help on our discussion forum or Ray's public slack channel.

Thanks again for your contribution!

@github-actions github-actions bot closed this Jun 26, 2025
nrghosh added a commit to nrghosh/ray that referenced this pull request Dec 6, 2025
…h inference

Add `continue_on_error` parameter to vLLM batch processor config. When
enabled, inference failures yield error rows instead of crashing the job.

Scoped to Ray Data LLM batch inference only; no changes to Ray Data core.

Addresses: ray-project#52449
Related: ray-project#52457

When running batch inference with Ray Data LLM, a single bad row (e.g.,
prompt exceeding max_model_len) can crash the whole batch.

Add optional `continue_on_error` parameter to processor config. The
parameter defaults to False, preserving existing fail-fast behavior.

When set to True:
- Catch exceptions in vLLMEngineStageUDF
- Failed rows yield with `__inference_error__` column set to error message
- Successful rows have `__inference_error__: None`
- Error rows bypass postprocess (avoids crashes when expected fields missing)
- Job completes with mixed success/failure outputs
- Users filter downstream: ds.filter(lambda r: r["__inference_error__"] is None)

1. **Default behavior unchanged**: `continue_on_error=False` preserves
   existing fail-fast semantics. This is opt-in only.

2. **Error rows bypass postprocess**: User's postprocess function likely
   expects `generated_text` and other output fields. Error rows won't have
   these, so we skip postprocess to avoid secondary crashes.

3. **Error as Optional[str]**: The `__inference_error__` column is None on
   success, or contains the error message (with type) on failure. This
   provides debuggability while keeping schema simple.

4. **LLM operator only**: Per feedback, this is scoped to the LLM processor
   implementation. No changes to Ray Data core primitives.
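Point 2 above might be sketched roughly as follows (`wrap_postprocess` is an illustrative name, not necessarily the actual helper in `stages/base.py`):

```python
def wrap_postprocess(postprocess):
    # Sketch: route error rows around the user's postprocess so that
    # missing output fields (e.g., generated_text) can't crash it.
    def wrapped(row):
        if row.get("__inference_error__") is not None:
            return row  # bypass postprocess for failed rows
        out = postprocess(row)
        out.setdefault("__inference_error__", None)
        return out
    return wrapped

wrapped = wrap_postprocess(lambda r: {"response": r["generated_text"]})
ok = wrapped({"generated_text": "hi", "__inference_error__": None})
bad = wrapped({"__inference_error__": "ValueError: prompt too long"})
# ok == {"response": "hi", "__inference_error__": None}
# bad passes through unchanged, keeping the error visible downstream
```

This keeps the output schema uniform: every row carries `__inference_error__`, so the downstream `filter` calls shown in the usage example work on both success and failure rows.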

- **Silent vs visible failures**: Choose visible failures (error column)
  over silent dropping for observability.

- **Schema addition**: All outputs now include `__inference_error__` column.
  This is necessary for users to distinguish success from failure and debug.

- **No retry mechanism**: Retrying and auto-tuning is outside the scope of
  this PR.

---

- `python/ray/data/llm.py` - Document new parameter in public API
- `python/ray/llm/_internal/batch/processor/base.py` - Add `continue_on_error` to config
- `python/ray/llm/_internal/batch/processor/vllm_engine_proc.py` - Pass config to stage
- `python/ray/llm/_internal/batch/stages/base.py` - Skip postprocess for error rows
- `python/ray/llm/_internal/batch/stages/vllm_engine_stage.py` - Catch errors and yield error rows

```python
from ray.data.llm import vLLMEngineProcessorConfig, build_llm_processor

config = vLLMEngineProcessorConfig(
    model_source="meta-llama/Meta-Llama-3.1-8B-Instruct",
    continue_on_error=True,  # Enable graceful error handling
)

processor = build_llm_processor(
    config,
    preprocess=lambda row: dict(
        messages=[{"role": "user", "content": row["prompt"]}],
        sampling_params=dict(temperature=0.3, max_tokens=100),
    ),
    postprocess=lambda row: dict(
        response=row["generated_text"],
    ),
)

ds = ray.data.read_json("prompts.json")
result = processor(ds)

successful = result.filter(lambda r: r["__inference_error__"] is None)
successful.write_json("outputs/")

failed = result.filter(lambda r: r["__inference_error__"] is not None)
print(f"Failed: {failed.count()} rows")
failed.show(5)
```

`python/ray/llm/tests/batch/cpu/stages/test_stage_base.py`:
- `test_wrap_postprocess_bypasses_error_rows`
- `test_wrap_postprocess_success_rows_run_postprocess`

`python/ray/llm/tests/batch/gpu/stages/test_vllm_engine_stage.py`:
- `test_vllm_udf_default_raises_on_error`
- `test_vllm_udf_continue_on_error_yields_error_row`
- `test_vllm_udf_mixed_success_and_error`

Signed-off-by: Nikhil Ghosh <nikhil@anyscale.com>

Labels

community-contribution (Contributed by the community) · data (Ray Data-related issues) · stale (The issue is stale. It will be closed within 7 days unless there is further conversation)
