Closed
Labels: data, enhancement, llm, triage
Description
Goal: let users control the resources and concurrency of the preprocess and postprocess stages inside `build_llm_processor` (the Ray Data LLM batch processor), so they can scale those stages independently of the main LLM stage without external workarounds. For example, a stage could request a fractional CPU to improve utilization, whereas Ray Data defaults to 1 CPU per task.
Summary
- Today `build_llm_processor` wires `preprocess -> vLLM -> postprocess` using fixed `Dataset.map(...)` calls.
- `map` defaults: ~1 CPU/task; concurrency ~= num blocks (bounded by available CPUs).
- Users cannot tune pre/post stages (e.g., `num_cpus`) independently.
- Feature Request: allow passing through `map` kwargs per stage.
Motivation
- Common pipelines (e.g., image captioning) need a lightweight but CPU-bound preprocess stage and a cheap postprocess stage, both of which should scale differently from the LLM/GPU stage.
- Today the workaround is to set `preprocess=None`/`postprocess=None` and wrap the processor with external Ray Data ops.
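
For reference, the workaround described above can be sketched roughly as follows. This is a minimal illustration, not code from the Ray repository: `run_with_external_stages` and its parameter names are hypothetical, `dataset` is assumed to behave like a Ray Data `Dataset` (only `.map(fn, num_cpus=...)` is used), and `processor` is assumed to have been built with `preprocess=None` and `postprocess=None`.

```python
from typing import Any, Callable, Dict

Row = Dict[str, Any]


def run_with_external_stages(
    dataset,
    processor: Callable,
    preprocess: Callable[[Row], Row],
    postprocess: Callable[[Row], Row],
    preprocess_num_cpus: float = 0.5,
    postprocess_num_cpus: float = 0.25,
):
    """Run pre/post stages outside the processor so each gets its own CPU request.

    Hypothetical helper: `dataset` only needs a Dataset-like `.map(fn, num_cpus=...)`,
    and `processor` is any callable that takes a dataset and returns a dataset.
    """
    # Preprocess as a separate map stage with its own fractional CPU request.
    prepared = dataset.map(preprocess, num_cpus=preprocess_num_cpus)
    # The processor itself runs only the LLM stage.
    generated = processor(prepared)
    # Postprocess as another separate map stage.
    return generated.map(postprocess, num_cpus=postprocess_num_cpus)
```

The cost of this pattern is that every caller has to repeat the wrapping, which is what the proposal below aims to remove.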
Proposal
- Extend API:

      build_llm_processor(
          config,
          preprocess: Optional[Callable] = None,
          postprocess: Optional[Callable] = None,
          preprocess_map_kwargs: Optional[Dict[str, Any]] = None,
          postprocess_map_kwargs: Optional[Dict[str, Any]] = None,
      )

- In `ProcessorBase` (e.g., `python/ray/llm/_internal/batch/processor/base.py`), forward the given kwargs to:

      if self.preprocess:
          dataset = dataset.map(self.preprocess, **(self.preprocess_map_kwargs or {}))
      ...
      if self.postprocess:
          dataset = dataset.map(self.postprocess, **(self.postprocess_map_kwargs or {}))

- Validate keys against supported `Dataset.map` kwargs; warn on unknown keys.
- Defaults `None` → current behavior unchanged.
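
One possible shape for the forwarding and validation steps is sketched below. Everything here is an assumption for illustration: `SketchProcessor` is a hypothetical stand-in for `ProcessorBase`, `check_map_kwargs` is a made-up helper, and `SUPPORTED_MAP_KWARGS` is an illustrative allowlist rather than an authoritative list of `Dataset.map` arguments. An explicit allowlist is used because `Dataset.map` may accept extra keyword arguments as Ray remote args, in which case inspecting its signature would not flag typos.

```python
import warnings
from typing import Any, Callable, Dict, Optional

# ASSUMPTION: illustrative allowlist only; the real set of supported keys
# depends on the installed Ray version.
SUPPORTED_MAP_KWARGS = frozenset({"num_cpus", "num_gpus", "memory", "concurrency"})


def check_map_kwargs(stage: str, map_kwargs: Optional[Dict[str, Any]]) -> Dict[str, Any]:
    """Return a copy of map_kwargs (empty dict for None), warning on unknown keys."""
    kwargs = dict(map_kwargs or {})
    unknown = sorted(set(kwargs) - SUPPORTED_MAP_KWARGS)
    if unknown:
        warnings.warn(
            f"{stage}_map_kwargs has unrecognized keys: {unknown}",
            UserWarning,
            stacklevel=2,
        )
    # Unknown keys are still forwarded; the proposal asks for a warning, not an error.
    return kwargs


class SketchProcessor:
    """Hypothetical stand-in for ProcessorBase, reduced to the forwarding logic."""

    def __init__(
        self,
        llm_stage: Callable,
        preprocess: Optional[Callable] = None,
        postprocess: Optional[Callable] = None,
        preprocess_map_kwargs: Optional[Dict[str, Any]] = None,
        postprocess_map_kwargs: Optional[Dict[str, Any]] = None,
    ):
        self.llm_stage = llm_stage
        self.preprocess = preprocess
        self.postprocess = postprocess
        self.preprocess_map_kwargs = check_map_kwargs("preprocess", preprocess_map_kwargs)
        self.postprocess_map_kwargs = check_map_kwargs("postprocess", postprocess_map_kwargs)

    def __call__(self, dataset):
        if self.preprocess:
            # Empty kwargs reproduce today's plain dataset.map(fn) call.
            dataset = dataset.map(self.preprocess, **self.preprocess_map_kwargs)
        dataset = self.llm_stage(dataset)
        if self.postprocess:
            dataset = dataset.map(self.postprocess, **self.postprocess_map_kwargs)
        return dataset
```

Validating once at construction time surfaces a misspelled key such as `num_cpu` before any data is processed.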
Use case
Example
    proc = build_llm_processor(
        processor_config,
        preprocess=caption_preprocess,    # fn(row) -> row
        postprocess=caption_postprocess,  # fn(row) -> row
        preprocess_map_kwargs={"num_cpus": 0.5},
        postprocess_map_kwargs={"num_cpus": 0.25},
    )
    result = proc(input_ds)

Acceptance Criteria
- Users can set per-stage `num_cpus` via the new kwargs.
- Unit tests confirm kwargs are honored (resource specs attached to map tasks; concurrency respected).
- Docs updated (processor API, example snippet).
- Backward compatibility preserved.
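
The unit-test criterion could start from something like the sketch below, which uses `unittest.mock` in place of a real dataset. `apply_stage` is a hypothetical helper that mirrors the proposed forwarding for a single stage; it is not an existing Ray function. These tests only check that kwargs reach `map` and that the `None` default leaves the call unchanged. Confirming that resource specs are actually attached to map tasks would need a test against a running Ray cluster.

```python
from typing import Any, Callable, Dict, Optional
from unittest import mock


def apply_stage(dataset, fn: Optional[Callable], map_kwargs: Optional[Dict[str, Any]] = None):
    """Hypothetical helper mirroring the proposed forwarding for one stage."""
    if fn is None:
        return dataset
    return dataset.map(fn, **(map_kwargs or {}))


def test_kwargs_are_forwarded():
    dataset = mock.Mock()
    fn = lambda row: row
    apply_stage(dataset, fn, {"num_cpus": 0.5})
    dataset.map.assert_called_once_with(fn, num_cpus=0.5)


def test_default_is_unchanged():
    # Backward compatibility: None must produce a plain map(fn) call.
    dataset = mock.Mock()
    fn = lambda row: row
    apply_stage(dataset, fn, None)
    dataset.map.assert_called_once_with(fn)


def test_missing_stage_is_skipped():
    dataset = mock.Mock()
    assert apply_stage(dataset, None, {"num_cpus": 0.5}) is dataset
    dataset.map.assert_not_called()
```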