[RFC] Add PrepareVideoStage to Ray LLM Batch #56767

@pandalee99

Description

RFC: Add PrepareVideoStage to Ray LLM Batch

1. Summary

This RFC proposes a new PrepareVideoStage for the Ray Data LLM batch pipeline. It aligns with the existing PrepareImageStage (same Stage/UDF shape), extracts video URLs from OpenAI Chat-format messages, performs download/decoding/sampling/preprocessing, and returns a sequence of image frames with metadata for downstream inference.

Minimal viable functionality (MVP):

  • Input: users only provide a video URL in messages (http/https/file/data URI).
  • Configuration: users only specify how to sample frames; default is FPS sampling (e.g., 3 frames/sec).
  • Output: a list of frames (PIL.Image by default), plus video size and frame timestamps.
  • All other parameters (caching, concurrency, output format) have sensible defaults and are optional.

PyAV (FFmpeg bindings) is chosen as the decoding backend because:

  • Broad support: FFmpeg covers the widest range of containers/codecs/network protocols, suitable for production batch pipelines.
  • Accurate timeline: exposes time_base/pts semantics, enabling a well-defined sampling timeline and testable behavior.
  • Streaming and local file support: enables a fast “stream-first” startup and a stable “cache-then-seek” path when random access is needed.

decord requires no additional dependencies, but the project is no longer actively maintained; we could offer it as a fallback backend.

Why support memory/disk caching options and default to streaming:

  • Streaming (default) reduces time-to-first-frame and avoids disk usage; ideal for light sampling (few frames).
  • Disk cache is better for heavy/random access sampling, reducing network jitter and timeouts.
  • In-memory cache (BytesIO) fits smaller videos or restricted environments; disk cache is optimal for stable reuse.

This stage matches PrepareImageStage: only sets fn=PrepareVideoUDF and get_required_input_keys() at the Stage level; parameters are injected into the UDF via the Processor’s fn_constructor_kwargs. This preserves consistent usage and backward compatibility.

2. Motivation

Modern VLMs (e.g., Qwen2.5-VL, LLaVA-OneVision) support video inputs. With only PrepareImageStage today, users must manually download and extract frames, which is error-prone and inconsistent. This stage standardizes “video → frame sequences” within the Ray Data LLM pipeline, improving usability, reliability, and alignment with vLLM multimodal inputs.

3. Design and Implementation

3.1 Alignment with Existing Stage

  • Stage shell:
    • Only declare fn = PrepareVideoUDF.
    • get_required_input_keys() mirrors PrepareImageStage and still requires "messages".
  • UDF parameters are injected via fn_constructor_kwargs from the Processor, avoiding a custom Stage.__init__.

Minimal shell:

class PrepareVideoStage(StatefulStage):
    """A stage to prepare videos from OpenAI chat template messages."""
    fn: StatefulStageUDF = PrepareVideoUDF

    def get_required_input_keys(self) -> Dict[str, str]:
        return {
            "messages": "A list of messages in OpenAI chat format. See https://platform.openai.com/docs/api-reference/chat/create"
        }

3.2 Functionality and Minimal API (MVP)

  • Input:
    • Accepts in messages:
      • {"type": "video", "video": "<url_or_path_or_data_uri>"}
      • {"type": "video_url", "video_url": {"url": "<http/https/...>"}}
  • Required user-facing parameter (most important):
    • sampling: intuitive and minimal; choose one
      • {"fps": 3} (default, 3 frames per second)
      • {"num_frames": 8} (uniformly sample N frames)
  • Optional parameters (sane defaults; not required):
    • cache_dir=None, cache_mode="auto"|"disk"|"memory" (default "auto")
    • output_format="pil"|"numpy" (default "pil")
    • channels_first=False (only for numpy output)
    • timeout_s=30, max_concurrency=8, retries=2
    • pack_for_model=False (if True, directly pack multi_modal_data for downstream engines)
  • Output:
    • "video": List[List[Frame]] (per video in the request; Frame is PIL.Image or numpy.ndarray)
    • "video_meta": List[Dict], one per input video:
      • "video_size": [width, height]
      • "video_num_frames": int (number of frames returned)
      • "frame_timestamps": List[float] (seconds; computed via pts*time_base)
      • "source": str (canonicalized source, e.g., cached file path or original URL)
      • "failed": bool; "error": Optional[str]

Note: PrepareImageStage outputs "image"/"image_sizes". For videos we use "video"/"video_meta" with a consistent naming style that remains extensible.
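For concreteness, one "video_meta" entry under this schema might look like the following (all values are illustrative, not from a real decode; the cached path is hypothetical):

```python
# Illustrative video_meta entry for one successfully processed video,
# sampled at 3 fps from a ~2.7 s clip.
meta = {
    "video_size": [1280, 720],  # [width, height]
    "video_num_frames": 9,      # number of frames actually returned
    "frame_timestamps": [0.0, 0.333, 0.667, 1.0, 1.333, 1.667, 2.0, 2.333, 2.667],
    "source": "/tmp/ray-video-cache/3a7bd3e2360a3d29.mp4",  # or the original URL when streaming
    "failed": False,
    "error": None,
}
```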

3.3 Data Flow and Degradation Paths

  • Source resolution (http/https/file/data URI):
    • http/https: default to streaming (fast). When the sampling plan needs random seeks or many frames, "auto" switches to writing to cache_dir and decoding from disk (stable).
    • data URI: decode into BytesIO (in-memory).
    • file: open directly.
  • Caching (optional):
    • Filename: sha256(url)[:16] + original extension (if detectable); write to a temp file then atomic rename to avoid corruption under concurrency.
    • Concurrent hits: concurrent requests for the same URL result in a single successful write; others reuse the file after the rename.
  • Degradation:
    • If keyframe-accurate seek is not stable, fall back to the nearest frame and record time deviation; if FPS cannot be estimated, fall back to uniform sampling.
    • Prefer zero-copy conversion to rgb24; if unusual pixel formats are encountered, fall back to PIL conversion for correctness.
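The caching scheme above (hashed filename, temp-file write, atomic rename) can be sketched as follows; the helper names are illustrative:

```python
import hashlib
import os
import tempfile
from urllib.parse import urlparse


def cache_path_for(url: str, cache_dir: str) -> str:
    """Stable cache filename: sha256(url)[:16] plus the original extension, if any."""
    digest = hashlib.sha256(url.encode()).hexdigest()[:16]
    ext = os.path.splitext(urlparse(url).path)[1]  # "" when no extension is detectable
    return os.path.join(cache_dir, digest + ext)


def write_atomically(data: bytes, dest: str) -> None:
    """Write to a temp file in the destination directory, then atomically rename.

    Each concurrent writer produces its own complete temp file; os.replace is
    atomic, so readers never observe a partially written cache entry, and the
    last rename simply wins.
    """
    fd, tmp = tempfile.mkstemp(dir=os.path.dirname(dest))
    try:
        with os.fdopen(fd, "wb") as f:
            f.write(data)
        os.replace(tmp, dest)
    except BaseException:
        os.unlink(tmp)
        raise
```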

Why “stream-first + cache-on-demand”:

  • Fast first request with low overhead; cache only when sampling requires stable random access. This balances time-to-first-frame and stability/resource usage.
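A minimal sketch of the source-resolution step under the scheme rules above (function name and return convention are hypothetical):

```python
import base64
import io
from urllib.parse import urlparse


def resolve_source(url: str):
    """Map a URL to a (kind, handle) pair per the scheme rules above (sketch)."""
    scheme = urlparse(url).scheme
    if scheme in ("http", "https"):
        return ("stream", url)              # stream-first; may upgrade to disk cache
    if scheme == "data":
        header, _, payload = url.partition(",")
        raw = base64.b64decode(payload) if header.endswith(";base64") else payload.encode()
        return ("memory", io.BytesIO(raw))  # decode the data URI into BytesIO
    if scheme == "file":
        return ("file", urlparse(url).path)  # open the local file directly
    if scheme == "":
        return ("file", url)                 # bare local path
    raise ValueError(f"unsupported scheme: {scheme}")
```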

3.4 Concurrency and Failure Isolation

  • Concurrency: use asyncio.Semaphore(max_concurrency) for per-video concurrency; combine async network I/O with thread-pool decoding (await asyncio.to_thread(...)) to avoid blocking the event loop.
  • Failure isolation: per-video try/except; failed items return failed=True and error, without affecting other items in the batch (best-effort default).
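The concurrency and failure-isolation pattern above can be sketched as follows; decode_and_sample is a stand-in for the real PyAV download/decode/sampling path:

```python
import asyncio
from typing import Any, Dict, List


def decode_and_sample(url: str) -> List[str]:
    # Stand-in for the PyAV decode + sampling path (hypothetical).
    if url.startswith("bad://"):
        raise IOError(f"cannot open {url}")
    return [f"frame-0@{url}"]


async def process_videos(urls: List[str], max_concurrency: int = 8) -> List[Dict[str, Any]]:
    """Decode up to max_concurrency videos at once; a failure only marks its own item."""
    sem = asyncio.Semaphore(max_concurrency)

    async def one(url: str) -> Dict[str, Any]:
        async with sem:
            try:
                # Decoding is blocking, so run it in a worker thread to keep
                # the event loop free for network I/O.
                frames = await asyncio.to_thread(decode_and_sample, url)
                return {"frames": frames, "meta": {"failed": False, "error": None}}
            except Exception as e:  # best-effort: record, don't poison the batch
                return {"frames": [], "meta": {"failed": True, "error": str(e)}}

    return list(await asyncio.gather(*(one(u) for u in urls)))
```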

3.5 Sampling Timeline (Testability)

  • Timeline: use FFmpeg time_base/pts; timestamp per frame t = pts * time_base (seconds), returned as frame_timestamps.
  • Strategies:
    • fps: fixed frames per second; truncate at video end. For VFR (variable frame rate), pick the nearest frame on the timeline.
    • num_frames: uniformly distribute N target time points across the duration; align to the nearest frame; if insufficient, allow repeating the last frame or a fallback policy.
  • Optional de-duplication: for VFR, optionally drop duplicate timestamps (off by default to keep the API simple).
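The two sampling strategies and nearest-frame alignment can be sketched as pure functions over the timeline (function names are illustrative):

```python
from typing import Dict, List


def plan_timestamps(duration_s: float, sampling: Dict[str, int]) -> List[float]:
    """Turn a sampling spec into target timestamps on the [0, duration] timeline."""
    if "fps" in sampling:
        step = 1.0 / sampling["fps"]
        n = int(duration_s * sampling["fps"]) + 1
        # Fixed frames per second, truncated at the video end.
        return [i * step for i in range(n) if i * step <= duration_s]
    n = sampling["num_frames"]
    if n == 1:
        return [0.0]
    # Uniformly distribute n target points across the duration, endpoints included.
    return [i * duration_s / (n - 1) for i in range(n)]


def nearest_frames(targets: List[float], frame_times: List[float]) -> List[int]:
    """Align each target time to the index of the nearest decoded frame (handles VFR)."""
    return [min(range(len(frame_times)), key=lambda j: abs(frame_times[j] - t))
            for t in targets]
```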

3.6 Preprocessing and Zero-Copy

  • Default output is PIL.Image (familiar and easy to visualize/debug).
  • Array output: use frame.to_ndarray(format="rgb24") to minimize copies; with channels_first=True, return (T, C, H, W).
  • Optional preprocessing: resize/crop/convert. Use PIL for lightweight needs; higher-performance paths can be introduced in future iterations when needed.
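For the numpy output path, stacking per-frame rgb24 arrays and honoring channels_first might look like this (helper name is illustrative):

```python
from typing import List

import numpy as np


def stack_frames(frames: List[np.ndarray], channels_first: bool = False) -> np.ndarray:
    """Stack per-frame rgb24 arrays of shape (H, W, C) into a clip.

    Returns (T, H, W, C) by default, or (T, C, H, W) when channels_first is set,
    matching the channels_first option described above.
    """
    clip = np.stack(frames)                # (T, H, W, C)
    if channels_first:
        clip = clip.transpose(0, 3, 1, 2)  # (T, C, H, W)
    return clip
```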

3.7 Interop with Ray Data

  • Frame passthrough: if the upstream dataset already provides frames (e.g., "video" or "video_frames"), expose bypass_if_frames_present=True to reuse frames and only re-sample/preprocess to avoid redundant decoding.
  • Column naming aligned with PrepareImageStage, enabling composition (image + video) and consistent user expectations.

3.8 Minimal Pseudocode (core outline only)

class PrepareVideoUDF(StatefulStageUDF):
    def __init__(
        self,
        data_column: str,
        expected_input_keys: List[str],
        *,
        sampling: Optional[Dict[str, int]] = None,  # {"fps": 3} or {"num_frames": 8}
        cache_dir: Optional[str] = None,
        cache_mode: str = "auto",             # "auto"|"disk"|"memory"
        output_format: str = "pil",           # "pil"|"numpy"
        channels_first: bool = False,
        timeout_s: float = 30.0,
        max_concurrency: int = 8,
        retries: int = 2,
        bypass_if_frames_present: bool = False,
        pack_for_model: bool = False,
    ):
        super().__init__(data_column, expected_input_keys)
        self.video = VideoProcessor(
            sampling=sampling or {"fps": 3},
            cache_dir=cache_dir,
            cache_mode=cache_mode,
            output_format=output_format,
            channels_first=channels_first,
            timeout_s=timeout_s,
            max_concurrency=max_concurrency,
            retries=retries,
            bypass_if_frames_present=bypass_if_frames_present,
            pack_for_model=pack_for_model,
        )

    def extract_video_info(self, messages: List[Dict]) -> List[str]:
        # Parse {"type": "video"} and {"type": "video_url"} entries and
        # return a list of URLs/paths.
        urls = []
        for msg in messages:
            content = msg.get("content")
            if not isinstance(content, list):
                continue
            for part in content:
                if part.get("type") == "video":
                    urls.append(part["video"])
                elif part.get("type") == "video_url":
                    urls.append(part["video_url"]["url"])
        return urls

    async def udf(self, batch: List[Dict[str, Any]]):
        msgs = [row["messages"] for row in batch]
        all_video_info = [self.extract_video_info(m) for m in msgs]
        flat = [v for vs in all_video_info for v in vs]
        flat_processed = await self.video.process(flat)

        start = 0
        for i, per_req in enumerate(all_video_info):
            n = len(per_req)
            ret = {self.IDX_IN_BATCH_COLUMN: i}
            if n > 0:
                processed = flat_processed[start:start + n]
                ret.update({
                    "video": [p["frames"] for p in processed],
                    "video_meta": [p["meta"] for p in processed],
                })
                start += n
            yield ret

4. Usage and Documentation (Documentation & Examples)

Goal: the minimal workflow is “URL + simple sampling (e.g., 3 fps) → frames in the output”. The stage extracts video URLs from messages and returns frames automatically. For vLLM integration, analogous to has_image, provide an example with has_video=True. Users who want frames passed directly to multi_modal_data can enable pack_for_model=True.

Minimal example:

import ray
from ray.data.llm import build_llm_processor, vLLMEngineProcessorConfig

# 1) Configure processor (aligned with image usage)
processor = build_llm_processor(
    vLLMEngineProcessorConfig(
        model_source="Qwen/Qwen2.5-VL-3B-Instruct",
        engine_kwargs=dict(limit_mm_per_prompt={"video": 1}),
        has_video=True,  # analogous to has_image
    ),
    preprocess=lambda row: dict(
        # 2) Provide only a URL + intuitive sampling config (3 fps)
        messages=[{
            "role": "user",
            "content": [
                {"type": "text", "text": "Describe this video."},
                {"type": "video", "video": row["video_url"]},
            ],
        }],
        # Pass minimal parameters into PrepareVideoUDF
        prepare_video_kwargs=dict(
            sampling={"fps": 3},       # or sampling={"num_frames": 8}
            # Optional: cache_dir="/tmp/ray-video-cache", cache_mode="auto"
        ),
        sampling_params=dict(max_tokens=128),
    ),
)

# 3) Build dataset and run
ds = ray.data.from_items([
    {"video_url": "https://.../sample.mp4"},
    {"video_url": "file:///.../local.mp4"},
])

# 4) Result contains "video"/"video_meta" columns (PIL.Image frames by default)
out = processor(ds).materialize()
out.show(1)

Documentation should clarify:

  • Minimal required fields: URL + sampling (fps or num_frames).
  • Defaults: streaming download and PIL output; enabling cache_dir is recommended for stable random access.
  • Output structure: "video" (list of frames) and "video_meta" (size, frame count, timestamps, etc.).

5. Testing Strategy (Unit Tests)

Unit tests only, focused on minimal functionality and robustness:

  • Source resolution: parse http/https/file/data URI and validate illegal inputs.
  • Sampling correctness: on a tiny mp4, validate fps and num_frames strategies (frame counts and monotonic timestamps).
  • Defaults: when sampling is omitted, {"fps": 3} is applied.
  • Failure isolation: unreachable URLs / corrupted files set failed=True without impacting other samples.
  • Caching: with cache_dir, concurrent requests for the same URL avoid corrupted files (atomic write and reuse on hit).
  • Output shape: default PIL output; numpy + channels_first shape checks.

This RFC adds simple, minimal, and intuitive video preparation to Ray LLM Batch without changing existing PrepareImageStage semantics. The default user experience is “URL + fps/num_frames → frames,” while streaming-first and on-demand caching balance fast startup and stability. PyAV is chosen for broad format support and precise timelines to ensure predictable sampling and testability.

Related Issue

#56424 [Data][LLM] Extend Ray’s Batch Processing to Support Video Inputs for Multimodal Models

cc @GuyStone @richardliaw @nrghosh
