[Data] Update document embedding benchmark to use canonical Ray Data API#57977
[Data] Update document embedding benchmark to use canonical Ray Data API#57977bveeramani merged 1 commit intomasterfrom
Conversation
…Data API This change modernizes the document embedding benchmark to follow Ray Data best practices and use the canonical API pattern. Signed-off-by: Balaji Veeramani <bveeramani@berkeley.edu>
There was a problem hiding this comment.
Code Review
This pull request is a great improvement, refactoring the document embedding benchmark to use the idiomatic Ray Data API. The switch from take_all() and read_binary_files to the lazy download() expression is a significant enhancement for performance and memory efficiency, especially for large datasets. The code is now much cleaner and easier to follow with the single chained pipeline.
I have a suggestion to further improve the code's adherence to Ray Data best practices by using a col expression for filtering instead of a lambda function. This can make the code more declarative and potentially unlock further optimizations.
| import pymupdf | ||
| import ray | ||
| import ray.data | ||
| from ray.data.expressions import download |
There was a problem hiding this comment.
To make the code even more idiomatic and potentially more performant through expression optimization, consider importing col here. It can be used to replace the lambda in the filter operation below.
| from ray.data.expressions import download | |
| from ray.data.expressions import col, download |
| file_paths = ( | ||
| ( | ||
| ray.data.read_parquet(INPUT_PATH) | ||
| .filter(lambda row: row["file_name"].endswith(".pdf")) |
There was a problem hiding this comment.
Using a col expression is more idiomatic in Ray Data than using a lambda for simple filtering operations. This declarative style can also allow Ray Data to perform more optimizations on the execution plan.
| .filter(lambda row: row["file_name"].endswith(".pdf")) | |
| .filter(col("file_name").str.endswith(".pdf")) |
…API (#57977) ## Summary This PR updates the document embedding benchmark to use the canonical Ray Data implementation pattern, following best practices for the framework. ## Key Changes ### Use `download()` expression instead of separate materialization **Before:** ```python file_paths = ( ray.data.read_parquet(INPUT_PATH) .filter(lambda row: row["file_name"].endswith(".pdf")) .take_all() ) file_paths = [row["uploaded_pdf_path"] for row in file_paths] ds = ray.data.read_binary_files(file_paths, include_paths=True) ``` **After:** ```python ( ray.data.read_parquet(INPUT_PATH) .filter(lambda row: row["file_name"].endswith(".pdf")) .with_column("bytes", download("uploaded_pdf_path")) ``` This change: - Eliminates the intermediate materialization with `take_all()`, which loads all data into memory - Uses the `download()` expression to lazily fetch file contents as part of the pipeline - Removes the need for a separate `read_binary_files()` call ### Method chaining for cleaner code All operations are now chained in a single pipeline, making the data flow more clear and idiomatic. ### Consistent column naming Updated references from `path` to `uploaded_pdf_path` throughout the code for consistency with the source data schema. Signed-off-by: Balaji Veeramani <bveeramani@berkeley.edu> Signed-off-by: elliot-barn <elliot.barnwell@anyscale.com>
…API (ray-project#57977) ## Summary This PR updates the document embedding benchmark to use the canonical Ray Data implementation pattern, following best practices for the framework. ## Key Changes ### Use `download()` expression instead of separate materialization **Before:** ```python file_paths = ( ray.data.read_parquet(INPUT_PATH) .filter(lambda row: row["file_name"].endswith(".pdf")) .take_all() ) file_paths = [row["uploaded_pdf_path"] for row in file_paths] ds = ray.data.read_binary_files(file_paths, include_paths=True) ``` **After:** ```python ( ray.data.read_parquet(INPUT_PATH) .filter(lambda row: row["file_name"].endswith(".pdf")) .with_column("bytes", download("uploaded_pdf_path")) ``` This change: - Eliminates the intermediate materialization with `take_all()`, which loads all data into memory - Uses the `download()` expression to lazily fetch file contents as part of the pipeline - Removes the need for a separate `read_binary_files()` call ### Method chaining for cleaner code All operations are now chained in a single pipeline, making the data flow more clear and idiomatic. ### Consistent column naming Updated references from `path` to `uploaded_pdf_path` throughout the code for consistency with the source data schema. Signed-off-by: Balaji Veeramani <bveeramani@berkeley.edu>
…API (ray-project#57977) ## Summary This PR updates the document embedding benchmark to use the canonical Ray Data implementation pattern, following best practices for the framework. ## Key Changes ### Use `download()` expression instead of separate materialization **Before:** ```python file_paths = ( ray.data.read_parquet(INPUT_PATH) .filter(lambda row: row["file_name"].endswith(".pdf")) .take_all() ) file_paths = [row["uploaded_pdf_path"] for row in file_paths] ds = ray.data.read_binary_files(file_paths, include_paths=True) ``` **After:** ```python ( ray.data.read_parquet(INPUT_PATH) .filter(lambda row: row["file_name"].endswith(".pdf")) .with_column("bytes", download("uploaded_pdf_path")) ``` This change: - Eliminates the intermediate materialization with `take_all()`, which loads all data into memory - Uses the `download()` expression to lazily fetch file contents as part of the pipeline - Removes the need for a separate `read_binary_files()` call ### Method chaining for cleaner code All operations are now chained in a single pipeline, making the data flow more clear and idiomatic. ### Consistent column naming Updated references from `path` to `uploaded_pdf_path` throughout the code for consistency with the source data schema. Signed-off-by: Balaji Veeramani <bveeramani@berkeley.edu> Signed-off-by: Aydin Abiar <aydin@anyscale.com>
…API (ray-project#57977) ## Summary This PR updates the document embedding benchmark to use the canonical Ray Data implementation pattern, following best practices for the framework. ## Key Changes ### Use `download()` expression instead of separate materialization **Before:** ```python file_paths = ( ray.data.read_parquet(INPUT_PATH) .filter(lambda row: row["file_name"].endswith(".pdf")) .take_all() ) file_paths = [row["uploaded_pdf_path"] for row in file_paths] ds = ray.data.read_binary_files(file_paths, include_paths=True) ``` **After:** ```python ( ray.data.read_parquet(INPUT_PATH) .filter(lambda row: row["file_name"].endswith(".pdf")) .with_column("bytes", download("uploaded_pdf_path")) ``` This change: - Eliminates the intermediate materialization with `take_all()`, which loads all data into memory - Uses the `download()` expression to lazily fetch file contents as part of the pipeline - Removes the need for a separate `read_binary_files()` call ### Method chaining for cleaner code All operations are now chained in a single pipeline, making the data flow more clear and idiomatic. ### Consistent column naming Updated references from `path` to `uploaded_pdf_path` throughout the code for consistency with the source data schema. Signed-off-by: Balaji Veeramani <bveeramani@berkeley.edu> Signed-off-by: Future-Outlier <eric901201@gmail.com>
…API (ray-project#57977) ## Summary This PR updates the document embedding benchmark to use the canonical Ray Data implementation pattern, following best practices for the framework. ## Key Changes ### Use `download()` expression instead of separate materialization **Before:** ```python file_paths = ( ray.data.read_parquet(INPUT_PATH) .filter(lambda row: row["file_name"].endswith(".pdf")) .take_all() ) file_paths = [row["uploaded_pdf_path"] for row in file_paths] ds = ray.data.read_binary_files(file_paths, include_paths=True) ``` **After:** ```python ( ray.data.read_parquet(INPUT_PATH) .filter(lambda row: row["file_name"].endswith(".pdf")) .with_column("bytes", download("uploaded_pdf_path")) ``` This change: - Eliminates the intermediate materialization with `take_all()`, which loads all data into memory - Uses the `download()` expression to lazily fetch file contents as part of the pipeline - Removes the need for a separate `read_binary_files()` call ### Method chaining for cleaner code All operations are now chained in a single pipeline, making the data flow more clear and idiomatic. ### Consistent column naming Updated references from `path` to `uploaded_pdf_path` throughout the code for consistency with the source data schema. Signed-off-by: Balaji Veeramani <bveeramani@berkeley.edu>
…API (ray-project#57977) ## Summary This PR updates the document embedding benchmark to use the canonical Ray Data implementation pattern, following best practices for the framework. ## Key Changes ### Use `download()` expression instead of separate materialization **Before:** ```python file_paths = ( ray.data.read_parquet(INPUT_PATH) .filter(lambda row: row["file_name"].endswith(".pdf")) .take_all() ) file_paths = [row["uploaded_pdf_path"] for row in file_paths] ds = ray.data.read_binary_files(file_paths, include_paths=True) ``` **After:** ```python ( ray.data.read_parquet(INPUT_PATH) .filter(lambda row: row["file_name"].endswith(".pdf")) .with_column("bytes", download("uploaded_pdf_path")) ``` This change: - Eliminates the intermediate materialization with `take_all()`, which loads all data into memory - Uses the `download()` expression to lazily fetch file contents as part of the pipeline - Removes the need for a separate `read_binary_files()` call ### Method chaining for cleaner code All operations are now chained in a single pipeline, making the data flow more clear and idiomatic. ### Consistent column naming Updated references from `path` to `uploaded_pdf_path` throughout the code for consistency with the source data schema. Signed-off-by: Balaji Veeramani <bveeramani@berkeley.edu> Signed-off-by: peterxcli <peterxcli@gmail.com>
Summary
This PR updates the document embedding benchmark to use the canonical Ray Data implementation pattern, following best practices for the framework.
Key Changes
Use
download()expression instead of separate materializationBefore:
After:
( ray.data.read_parquet(INPUT_PATH) .filter(lambda row: row["file_name"].endswith(".pdf")) .with_column("bytes", download("uploaded_pdf_path"))This change:
take_all(), which loads all data into memorydownload()expression to lazily fetch file contents as part of the pipelineread_binary_files()callMethod chaining for cleaner code
All operations are now chained in a single pipeline, making the data flow more clear and idiomatic.
Consistent column naming
Updated references from
pathtouploaded_pdf_paththroughout the code for consistency with the source data schema.