[RFC] Add PrepareVideoStage to Ray LLM Batch #56767
1. Summary
This RFC proposes a new PrepareVideoStage for the Ray Data LLM batch pipeline. It aligns with the existing PrepareImageStage (same Stage/UDF shape), extracts video URLs from OpenAI Chat-format messages, performs download/decoding/sampling/preprocessing, and returns a sequence of image frames with metadata for downstream inference.
Minimal viable functionality (MVP):
- Input: users only provide a video URL in messages (http/https/file/data URI).
- Configuration: users only specify how to sample frames; default is FPS sampling (e.g., 3 frames/sec).
- Output: a list of frames (PIL.Image by default), plus video size and frame timestamps.
- All other parameters (caching, concurrency, output format) have sensible defaults and are optional.
PyAV (FFmpeg bindings) is chosen as the decoding backend because:
- Broad support: FFmpeg covers the widest range of containers/codecs/network protocols, suitable for production batch pipelines.
- Accurate timeline: exposes time_base/pts semantics, enabling a well-defined sampling timeline and testable behavior.
- Streaming and local file support: enables a fast “stream-first” startup and a stable “cache-then-seek” path when random access is needed.
decord would require no additional system dependencies (no separate FFmpeg install), but the project is no longer actively maintained. We could offer it as a fallback backend when PyAV is unavailable.
Why support memory/disk caching options and default to streaming:
- Streaming (default) reduces time-to-first-frame and avoids disk usage; ideal for light sampling (few frames).
- Disk cache is better for heavy/random access sampling, reducing network jitter and timeouts.
- In-memory cache (BytesIO) fits smaller videos or restricted environments; disk cache is optimal for stable reuse.
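One possible shape of the "auto" policy implied above (a sketch only; `SamplingPlan`, `choose_cache_mode`, and the thresholds are illustrative, not proposed API):

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class SamplingPlan:
    num_target_frames: int
    needs_random_seek: bool


def choose_cache_mode(
    scheme: str,
    plan: SamplingPlan,
    content_length: Optional[int] = None,
    memory_limit_bytes: int = 64 * 1024 * 1024,
) -> str:
    """Pick "stream", "memory", or "disk" for one video source."""
    if scheme == "file":
        return "stream"   # local files already support seeking
    if scheme == "data":
        return "memory"   # data URIs are decoded into BytesIO
    # http/https: stream for light sampling; cache when the plan needs
    # random seeks or many frames (stability over time-to-first-frame).
    if plan.needs_random_seek or plan.num_target_frames > 16:
        if content_length is not None and content_length <= memory_limit_bytes:
            return "memory"
        return "disk"
    return "stream"
```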
This stage matches PrepareImageStage: only sets fn=PrepareVideoUDF and get_required_input_keys() at the Stage level; parameters are injected into the UDF via the Processor’s fn_constructor_kwargs. This preserves consistent usage and backward compatibility.
2. Motivation
Modern VLMs (e.g., Qwen2.5-VL, LLaVA-OneVision) support video inputs. With only PrepareImageStage today, users must manually download and extract frames, which is error-prone and inconsistent. This stage standardizes “video → frame sequences” within the Ray Data LLM pipeline, improving usability, reliability, and alignment with vLLM multimodal inputs.
3. Design and Implementation
3.1 Alignment with Existing Stage
- Stage shell:
- Only declare fn = PrepareVideoUDF.
- get_required_input_keys() mirrors PrepareImageStage and still requires "messages".
- UDF parameters are injected via fn_constructor_kwargs from the Processor, avoiding custom Stage.init.
Minimal shell:
```python
class PrepareVideoStage(StatefulStage):
    """A stage to prepare videos from OpenAI chat template messages."""

    fn: StatefulStageUDF = PrepareVideoUDF

    def get_required_input_keys(self) -> Dict[str, str]:
        return {
            "messages": "A list of messages in OpenAI chat format. "
            "See https://platform.openai.com/docs/api-reference/chat/create"
        }
```
3.2 Functionality and Minimal API (MVP)
- Input: accepted entries in messages:
  - {"type": "video", "video": "<url_or_path_or_data_uri>"}
  - {"type": "video_url", "video_url": {"url": "<http/https/...>"}}
- Required user-facing parameter (most important):
  - sampling: intuitive and minimal; choose one:
    - {"fps": 3} (default, 3 frames per second)
    - {"num_frames": 8} (uniformly sample N frames)
- Optional parameters (sane defaults; not required):
- cache_dir=None, cache_mode="auto"|"disk"|"memory" (default "auto")
- output_format="pil"|"numpy" (default "pil")
- channels_first=False (only for numpy output)
- timeout_s=30, max_concurrency=8, retries=2
- pack_for_model=False (if True, directly pack multi_modal_data for downstream engines)
- Output:
- "video": List[List[Frame]] (per video in the request; Frame is PIL.Image or numpy.ndarray)
- "video_meta": List[Dict], one per input video:
- "video_size": [width, height]
- "video_num_frames": int (number of frames returned)
- "frame_timestamps": List[float] (seconds; computed via pts*time_base)
- "source": str (canonicalized source, e.g., cached file path or original URL)
- "failed": bool; "error": Optional[str]
Note: PrepareImageStage outputs "image"/"image_sizes". For videos we use "video"/"video_meta" with a consistent naming style that remains extensible.
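For concreteness, one output row for a request containing a single 640x360 video sampled at 3 fps over 2 seconds might look like the following (all values illustrative; `frame` stands in for a PIL.Image object):

```python
frame = ...  # placeholder for a PIL.Image.Image (or numpy.ndarray with output_format="numpy")

row = {
    "video": [[frame, frame, frame, frame, frame, frame]],  # one inner list per input video
    "video_meta": [{
        "video_size": [640, 360],
        "video_num_frames": 6,
        "frame_timestamps": [0.0, 0.333, 0.667, 1.0, 1.333, 1.667],  # pts * time_base
        "source": "/tmp/ray-video-cache/3a7bd3e2360a3d29.mp4",  # illustrative cached path
        "failed": False,
        "error": None,
    }],
}
```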
3.3 Data Flow and Degradation Paths
- Source resolution (http/https/file/data URI):
- http/https: default to streaming (fast). When the sampling plan needs random seeks or many frames, "auto" switches to writing to cache_dir and decoding from disk (stable).
- data URI: decode into BytesIO (in-memory).
- file: open directly.
- Caching (optional):
- Filename: sha256(url)[:16] + original extension (if detectable); write to a temp file then atomic rename to avoid corruption under concurrency.
- Concurrent hits: concurrent requests for the same URL result in a single successful write; others reuse the file after the rename.
- Degradation:
- If keyframe-accurate seek is not stable, fall back to the nearest frame and record time deviation; if FPS cannot be estimated, fall back to uniform sampling.
- Prefer zero-copy conversion to rgb24; if unusual pixel formats are encountered, fall back to PIL conversion for correctness.
Why “stream-first + cache-on-demand”:
- Fast first request with low overhead; cache only when sampling requires stable random access. This balances time-to-first-frame and stability/resource usage.
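The cache filename and atomic-write scheme described above can be sketched as follows (helper names are illustrative, not proposed API):

```python
import hashlib
import os
import tempfile


def cache_path_for(url: str, cache_dir: str, ext: str = ".mp4") -> str:
    # sha256(url)[:16] + extension, as proposed above
    digest = hashlib.sha256(url.encode("utf-8")).hexdigest()[:16]
    return os.path.join(cache_dir, digest + ext)


def write_atomically(data: bytes, dest: str) -> str:
    """Write to a temp file in the same directory, then atomically rename.

    Concurrent writers may race, but os.replace guarantees readers only
    ever see a complete file; duplicate writers produce identical bytes,
    so the last rename winning is harmless.
    """
    if os.path.exists(dest):            # cache hit: reuse the finished file
        return dest
    fd, tmp = tempfile.mkstemp(dir=os.path.dirname(dest))
    try:
        with os.fdopen(fd, "wb") as f:
            f.write(data)
        os.replace(tmp, dest)           # atomic on POSIX and Windows
    finally:
        if os.path.exists(tmp):         # clean up only if the rename failed
            os.remove(tmp)
    return dest
```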
3.4 Concurrency and Failure Isolation
- Concurrency: use asyncio.Semaphore(max_concurrency) for per-video concurrency; combine async network I/O with thread-pool decoding (await asyncio.to_thread(...)) to avoid blocking the event loop.
- Failure isolation: per-video try/except; failed items return failed=True and error, without affecting other items in the batch (best-effort default).
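A minimal sketch of that pattern (`decode_one` is a stand-in for the blocking PyAV decode path; names are illustrative):

```python
import asyncio
from typing import Any, Dict, List


async def process_all(urls: List[str], max_concurrency: int = 8) -> List[Dict[str, Any]]:
    """Cap per-video concurrency and isolate per-item failures."""
    sem = asyncio.Semaphore(max_concurrency)

    def decode_one(url: str) -> Dict[str, Any]:
        # stand-in for download + PyAV decode; raises on bad sources
        if url.startswith("bad://"):
            raise ValueError(f"unreachable source: {url}")
        return {"frames": [], "failed": False, "error": None}

    async def guarded(url: str) -> Dict[str, Any]:
        async with sem:
            try:
                # run the blocking decode in a worker thread, keep the loop free
                return await asyncio.to_thread(decode_one, url)
            except Exception as e:
                # best-effort: one bad video never fails the whole batch
                return {"frames": [], "failed": True, "error": str(e)}

    return await asyncio.gather(*(guarded(u) for u in urls))
```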
3.5 Sampling Timeline (Testability)
- Timeline: use FFmpeg time_base/pts; timestamp per frame t = pts * time_base (seconds), returned as frame_timestamps.
- Strategies:
- fps: fixed frames per second; truncate at video end. For VFR (variable frame rate), pick the nearest frame on the timeline.
- num_frames: uniformly distribute N target time points across the duration; align to the nearest frame; if insufficient, allow repeating the last frame or a fallback policy.
- Optional de-duplication: for VFR, optionally drop duplicate timestamps (off by default to keep the API simple).
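The two strategies and nearest-frame alignment can be sketched as follows (`plan_timestamps` and `nearest_frames` are illustrative names; in the real stage, frame timestamps come from pts * time_base):

```python
import bisect
from typing import List, Optional


def plan_timestamps(duration_s: float, *, fps: Optional[float] = None,
                    num_frames: Optional[int] = None) -> List[float]:
    """Target time points in seconds for the fps / num_frames strategies."""
    if fps is not None:
        # fixed frames per second, truncated at the video end
        out, i = [], 0
        while i / fps < duration_s:
            out.append(i / fps)
            i += 1
        return out
    # num_frames: N points spread uniformly across [0, duration)
    return [i * duration_s / num_frames for i in range(num_frames)]


def nearest_frames(targets: List[float], frame_ts: List[float]) -> List[float]:
    """Snap each target to the nearest decoded-frame timestamp (VFR-safe).

    frame_ts is the sorted list of pts * time_base values for decoded frames.
    """
    out = []
    for t in targets:
        i = bisect.bisect_left(frame_ts, t)
        candidates = frame_ts[max(0, i - 1):i + 1]
        out.append(min(candidates, key=lambda ft: abs(ft - t)))
    return out
```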
3.6 Preprocessing and Zero-Copy
- Default output is PIL.Image (familiar and easy to visualize/debug).
- Array output: use frame.to_ndarray(format="rgb24") to minimize copies; with channels_first=True, return (T, C, H, W).
- Optional preprocessing: resize/crop/convert. Use PIL for lightweight needs; higher-performance paths can be introduced in future iterations when needed.
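The numpy path above amounts to stacking the per-frame rgb24 arrays and optionally transposing (a sketch; in the stage, each frame array would come from frame.to_ndarray(format="rgb24")):

```python
import numpy as np


def stack_frames(frames: list, channels_first: bool = False) -> np.ndarray:
    """Stack per-frame rgb24 arrays of shape (H, W, 3) into one video tensor.

    Returns (T, H, W, C) by default, or (T, C, H, W) with channels_first=True.
    """
    video = np.stack(frames, axis=0)               # one contiguous copy
    if channels_first:
        video = np.transpose(video, (0, 3, 1, 2))  # a view, no extra copy
    return video
```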
3.7 Interop with Ray Data
- Frame passthrough: if the upstream dataset already provides frames (e.g., "video" or "video_frames"), expose bypass_if_frames_present=True to reuse frames and only re-sample/preprocess to avoid redundant decoding.
- Column naming aligned with PrepareImageStage, enabling composition (image + video) and consistent user expectations.
3.8 Minimal Pseudocode (core outline only)
```python
class PrepareVideoUDF(StatefulStageUDF):
    def __init__(
        self,
        data_column: str,
        expected_input_keys: List[str],
        *,
        sampling: Optional[Dict[str, int]] = None,  # {"fps": 3} or {"num_frames": 8}
        cache_dir: Optional[str] = None,
        cache_mode: str = "auto",       # "auto" | "disk" | "memory"
        output_format: str = "pil",     # "pil" | "numpy"
        channels_first: bool = False,
        timeout_s: float = 30.0,
        max_concurrency: int = 8,
        retries: int = 2,
        bypass_if_frames_present: bool = False,
        pack_for_model: bool = False,
    ):
        super().__init__(data_column, expected_input_keys)
        self.video = VideoProcessor(
            sampling=sampling or {"fps": 3},
            cache_dir=cache_dir,
            cache_mode=cache_mode,
            output_format=output_format,
            channels_first=channels_first,
            timeout_s=timeout_s,
            max_concurrency=max_concurrency,
            retries=retries,
            bypass_if_frames_present=bypass_if_frames_present,
            pack_for_model=pack_for_model,
        )

    def extract_video_info(self, messages: List[Dict]) -> List[str]:
        # Parse {"type": "video"} and {"type": "video_url"} entries and
        # return a list of URLs/paths.
        return []

    async def udf(self, batch: List[Dict[str, Any]]):
        msgs = [row["messages"] for row in batch]
        all_video_info = [self.extract_video_info(m) for m in msgs]
        flat = [v for vs in all_video_info for v in vs]
        flat_processed = await self.video.process(flat)
        start = 0
        for i, per_req in enumerate(all_video_info):
            n = len(per_req)
            ret = {self.IDX_IN_BATCH_COLUMN: i}
            if n > 0:
                processed = flat_processed[start:start + n]
                ret.update({
                    "video": [p["frames"] for p in processed],
                    "video_meta": [p["meta"] for p in processed],
                })
                start += n
            yield ret
```
4. Usage and Documentation
Goal: the minimal workflow is “URL + simple sampling (e.g., 3 fps) → frames in output”. The stage returns frames automatically from messages. For vLLM integration, similar to has_image, provide an example with has_video=True. If users want to pass frames directly to multi_modal_data, enable pack_for_model=True.
Minimal example:
```python
import ray
from ray.data.llm import build_llm_processor, vLLMEngineProcessorConfig

# 1) Configure processor (aligned with image usage)
processor = build_llm_processor(
    vLLMEngineProcessorConfig(
        model_source="Qwen/Qwen2.5-VL-3B-Instruct",
        engine_kwargs=dict(limit_mm_per_prompt={"video": 1}),
        has_video=True,  # analogous to has_image
    ),
    preprocess=lambda row: dict(
        # 2) Provide only a URL + intuitive sampling config (3 fps)
        messages=[{
            "role": "user",
            "content": [
                {"type": "text", "text": "Describe this video."},
                {"type": "video", "video": row["video_url"]},
            ],
        }],
        # Pass minimal parameters into PrepareVideoUDF
        prepare_video_kwargs=dict(
            sampling={"fps": 3},  # or sampling={"num_frames": 8}
            # Optional: cache_dir="/tmp/ray-video-cache", cache_mode="auto"
        ),
        sampling_params=dict(max_tokens=128),
    ),
)

# 3) Build dataset and run
ds = ray.data.from_items([
    {"video_url": "https://.../sample.mp4"},
    {"video_url": "file:///.../local.mp4"},
])

# 4) Result contains "video"/"video_meta" columns (PIL.Image frames by default)
out = processor(ds).materialize()
out.show(1)
```
Documentation should clarify:
- Minimal required fields: URL + sampling (fps or num_frames).
- Defaults: streaming download and PIL output; enabling cache_dir is recommended for stable random access.
- Output structure: "video" (list of frames) and "video_meta" (size, frame count, timestamps, etc.).
5. Testing Strategy (Unit Tests)
Unit tests only, focused on minimal functionality and robustness:
- Source resolution: parse http/https/file/data URI and validate illegal inputs.
- Sampling correctness: on a tiny mp4, validate fps and num_frames strategies (frame counts and monotonic timestamps).
- Defaults: when sampling is omitted, {"fps": 3} is applied.
- Failure isolation: unreachable URLs / corrupted files set failed=True without impacting other samples.
- Caching: with cache_dir, concurrent requests for the same URL avoid corrupted files (atomic write and reuse on hit).
- Output shape: default PIL output; numpy + channels_first shape checks.
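The defaults check, for example, could be pinned down against a small validation helper (`normalize_sampling` is a hypothetical name matching the pseudocode's `sampling or {"fps": 3}` default):

```python
def normalize_sampling(sampling=None):
    """Apply the documented default and reject malformed configs."""
    sampling = sampling or {"fps": 3}
    if len(sampling) != 1 or set(sampling) - {"fps", "num_frames"}:
        raise ValueError('sampling must be {"fps": n} or {"num_frames": n}')
    return sampling


# pytest-style checks
def test_default_sampling():
    assert normalize_sampling() == {"fps": 3}
    assert normalize_sampling({"num_frames": 8}) == {"num_frames": 8}


def test_rejects_ambiguous_sampling():
    try:
        normalize_sampling({"fps": 3, "num_frames": 8})
    except ValueError:
        pass
    else:
        raise AssertionError("expected ValueError")
```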
6. Conclusion
This RFC adds simple, minimal, and intuitive video preparation to Ray LLM Batch without changing existing PrepareImageStage semantics. The default user experience is “URL + fps/num_frames → frames,” while streaming-first and on-demand caching balance fast startup and stability. PyAV is chosen for broad format support and precise timelines to ensure predictable sampling and testability.
Related Issue
#56424 [Data][LLM] Extend Ray’s Batch Processing to Support Video Inputs for Multimodal Models