Skip to content

feat(api-server): Add support for streaming query results from files.#1636

Merged
hoophalab merged 14 commits into
y-scope:mainfrom
hoophalab:fsjob
Nov 25, 2025
Merged

feat(api-server): Add support for streaming query results from files.#1636
hoophalab merged 14 commits into
y-scope:mainfrom
hoophalab:fsjob

Conversation

@hoophalab

@hoophalab hoophalab commented Nov 20, 2025

Copy link
Copy Markdown
Contributor

Description

  1. If the job config sets write_to_disk as true, then fetch_result in client will stream results from the files.

Checklist

  • The PR satisfies the [contribution guidelines][y1. Send cope-contrib-guidelines].
  • This is a breaking change and that has been indicated in the PR title, OR this isn't a
    breaking change.
  • Necessary docs have been updated, OR no docs need to be updated.

Validation performed

  1. Compress postgresql compressed to dataset postgresql.
  2. Post {"query_string": "line_num: 4", "dataset": "postgresql", "write_to_file": true} to http://localhost:3001/query
    Response: {"query_results_uri":"/query_results/10"}
  3. Get http://localhost:3001/query_results/10. The result is correct.

Summary by CodeRabbit

  • New Features

    • Configurable result streaming with per-job selectable storage backends (filesystem or database).
  • Improvements

    • Per-job routing of result streams to the chosen backend for consistent delivery.
    • Broader malformed-data detection for more resilient error handling.
    • New configuration options and sensible defaults for stream output storage.
  • Chores

    • Deployment now mounts host directories for staged and live stream output.

✏️ Tip: You can customize this high-level summary in your review settings.

@hoophalab hoophalab requested a review from a team as a code owner November 20, 2025 02:38
@coderabbitai

coderabbitai Bot commented Nov 20, 2025

Copy link
Copy Markdown
Contributor

Walkthrough

Adds stream-result delivery: new async/stream dependencies, API client returns a pinned enum stream selecting filesystem or Mongo backends based on job config, new stream-related config types and defaults, an rmp decode IsMalformedData impl, and Docker volume mounts for stream directories.

Changes

Cohort / File(s) Summary
Dependencies
components/api-server/Cargo.toml
Added async-stream = "0.3.6", pin-project-lite = "0.2.16", and tokio-stream = { version = "0.1.17", features = ["fs"] }.
API client streaming
components/api-server/src/client.rs
fetch_results now fetches job config and returns a pinned SearchResultStream enum with File and Mongo variants. Added SearchResultStream (pin-project!), get_job_config, fetch_results_from_file, fetch_results_from_mongo, file and Mongo stream implementations, Event/EventTuple helpers, and a Stream impl that delegates poll_next and logs errors.
Error handling
components/api-server/src/error.rs
Added impl IsMalformedData for rmp_serde::decode::Error.
Stream configuration types
components/clp-rust-utils/src/clp_config/package/config.rs
Added stream_output: StreamOutput to Config; introduced StreamOutput and StreamOutputStorage (Fs { directory }, S3 { staging_directory }) with Default -> Fs { directory: "var/data/streams" }.
Deployment volumes
tools/deployment/package/docker-compose-all.yaml
Added two volumes to api_server: ${CLP_STAGED_STREAM_OUTPUT_DIR_HOST:-empty}:/var/data/staged-streams and ${CLP_STREAM_OUTPUT_DIR_HOST:-empty}:/var/data/streams.

Sequence Diagram(s)

sequenceDiagram
    participant Caller
    participant APIClient as Client
    participant DB as JobConfig_DB
    participant FS as FileSystem
    participant Mongo as MongoDB

    Caller->>APIClient: fetch_results(search_job_id)
    APIClient->>DB: get_job_config(search_job_id)
    DB-->>APIClient: SearchJobConfig

    alt write_to_file == true
        APIClient->>FS: open/read stream files
        FS-->>APIClient: stream of serialized records
        APIClient->>APIClient: deserialize/map -> Result<String, ClientError>
        APIClient-->>Caller: SearchResultStream::File (stream)
    else
        APIClient->>Mongo: open cursor / stream docs
        Mongo-->>APIClient: cursor stream of docs
        APIClient->>APIClient: map docs -> Result<String, ClientError>
        APIClient-->>Caller: SearchResultStream::Mongo (stream)
    end

    Note right of APIClient `#DDEBF7`: SearchResultStream is a pinned enum delegating poll_next and logging errors
Loading

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes

  • Inspect components/api-server/src/client.rs for correct pin-project usage, Stream impl soundness, and async lifetimes.
  • Verify MessagePack decoding and IsMalformedData impl in components/api-server/src/error.rs.
  • Confirm serde defaults, visibility, and Default impls in components/clp-rust-utils/src/clp_config/package/config.rs.
  • Validate docker-compose volume syntax and environment-variable substitution in tools/deployment/package/docker-compose-all.yaml.

Pre-merge checks and finishing touches

✅ Passed checks (3 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The title 'feat(api-server): Add support for streaming query results from files.' clearly and accurately summarizes the main change in the pull request.
Docstring Coverage ✅ Passed Docstring coverage is 100.00% which is sufficient. The required threshold is 80.00%.
✨ Finishing touches
  • 📝 Generate docstrings
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Post copyable unit tests in a comment

📜 Recent review details

Configuration used: CodeRabbit UI

Review profile: ASSERTIVE

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 4025869 and 0067f60.

📒 Files selected for processing (1)
  • components/api-server/src/client.rs (4 hunks)
🧰 Additional context used
🧠 Learnings (2)
📚 Learning: 2025-10-22T21:01:31.391Z
Learnt from: junhaoliao
Repo: y-scope/clp PR: 0
File: :0-0
Timestamp: 2025-10-22T21:01:31.391Z
Learning: In y-scope/clp, for the clp-rust-checks workflow (GitHub Actions) and the Taskfile target deps:lock:check-rust, we should verify Rust Cargo.lock is in sync with Cargo.toml using a non-mutating method (e.g., cargo metadata --locked / cargo check --locked) to keep CI deterministic and avoid updating dependencies during validation.

Applied to files:

  • components/api-server/src/client.rs
📚 Learning: 2024-10-08T15:52:50.753Z
Learnt from: AVMatthews
Repo: y-scope/clp PR: 543
File: components/core/src/clp_s/clp-s.cpp:196-265
Timestamp: 2024-10-08T15:52:50.753Z
Learning: In `clp-s.cpp`, the `run_serializer` function interleaves serialization and writing of IR files, making it difficult to restructure it into separate functions.

Applied to files:

  • components/api-server/src/client.rs
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (4)
  • GitHub Check: package-image
  • GitHub Check: lint-check (ubuntu-24.04)
  • GitHub Check: lint-check (macos-15)
  • GitHub Check: rust-checks
🔇 Additional comments (3)
components/api-server/src/client.rs (3)

19-51: QueryConfig defaults and mapping into SearchJobConfig look sound.

serde(deny_unknown_fields) plus #[serde(default)] give strict but forgiving request validation, and the From<QueryConfig> + submit_query normalisation (dataset fallback and max_num_results defaulting) cleanly wires the new write_to_file flag without changing existing behaviour.

Also applies to: 103-114


129-182: Polling + routing to FS vs Mongo reads clearly.

The exponential backoff loop around get_status is robust, and splitting get_job_config out keeps fetch_results focused on control flow. The early-return for file-backed jobs makes the Mongo vs FS routing straightforward and keeps indentation shallow.

Also applies to: 208-217


314-351: SearchResultStream enum with pin_project is a clean unification of backends.

The enum wrapper over the FS and Mongo streams, plus pin_project-based projection in the Stream impl, removes the need for unsafe projection or mutexes while still logging errors centrally. The docstring and explicit FileStream/MongoStream type parameters also make the intent clear.


Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 4

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (3)
components/api-server/src/error.rs (1)

25-37: Remove blanket IsMalformedData impl for rmp_serde::decode::Error — IO failures are being misclassified

The blanket implementation treats all rmp_serde::decode::Error variants as malformed data, but the error enum explicitly distinguishes IO errors (InvalidMarkerRead, InvalidDataRead) from format/semantic problems (TypeMismatch, Syntax, Utf8Error, LengthMismatch, OutOfRange, Uncategorized, DepthLimitExceeded). Idiomatic Rust practice is to handle these separately—IO failures should not surface as ClientError::MalformedData, as this masks real transport/connection problems.

Either:

  • Remove the blanket impl and match on error variants to classify only format/semantic errors as malformed data, or
  • Use from_slice (which only produces deserialization/format errors) instead of from_read to guarantee malformed-data classification.
components/api-server/src/client.rs (1)

152-195: Fix stream!try_stream! and replace todo!() with structured error

Two critical issues block compilation and safe production deployment:

  1. stream! macro does not support ? operator

    The ? operator on read_dir, next_entry, and File::open inside the stream! block cannot work as intended. The stream! macro produces a generator that yields items; the ? operator would attempt to propagate errors out of the closure, not into the stream. This fails to compile. Use try_stream! instead, which supports ? and yields bare String values while automatically converting errors through existing From impls.

    Suggested change:

  • use async_stream::stream;
  • use async_stream::try_stream;
  •        let stream = stream! {
    
  •        let stream = try_stream! {
               let dir_path = dir_path;
               let mut entries = tokio::fs::read_dir(dir_path).await?;
               while let Some(entry) = entries.next_entry().await? {
                   let file_path = entry.path();
                   let reader = std::fs::File::open(file_path)?;
                   let mut deserializer = rmp_serde::Deserializer::new(reader);
                   while let Ok(e) = Deserialize::deserialize(&mut deserializer) {
                       let e: (i64, String, String, String, i64) = e;
    
  •                    yield Ok(e.1);
    
  •                    yield e.1;
                   }
               }
           };
    
    
    
  1. todo!() creates a production panic path

    If the api-server is configured with S3 storage and write_to_file=true, the todo!() at line 176 will crash the process. Replace it with a structured error using the existing ClientError::Io variant:

  •        let StreamOutputStorage::Fs { directory } = &self.config.stream_output.storage else {
    
  •            todo!("Outputting query results to S3 is not supported for now.");
    
  •        };
    
  •        let StreamOutputStorage::Fs { directory } = &self.config.stream_output.storage else {
    
  •            return Err(ClientError::Io(std::io::Error::new(
    
  •                std::io::ErrorKind::Other,
    
  •                "S3 stream output is not yet supported; please configure fs storage",
    
  •            )));
    
  •        };
    
    
    
components/clp-rust-utils/src/clp_config/package/config.rs (1)

18-31: Fix incomplete documentation comment for StreamOutput struct

The doc comment at lines 153–158 in components/clp-rust-utils/src/clp_config/package/config.rs contains a sentence fragment. Line 156 reads /// deserialization. without context, breaking the documentation clarity.

Verification confirms the default values are correctly synchronized with Python: both use "var/data/streams" for the Fs storage variant, and docker-compose mounts align with this path. However, the doc comment should be completed—likely intended to say something like "* This type is partially defined: unused fields are omitted and discarded through deserialization."

📜 Review details

Configuration used: CodeRabbit UI

Review profile: ASSERTIVE

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 42c18c7 and c743212.

⛔ Files ignored due to path filters (1)
  • Cargo.lock is excluded by !**/*.lock
📒 Files selected for processing (5)
  • components/api-server/Cargo.toml (2 hunks)
  • components/api-server/src/client.rs (3 hunks)
  • components/api-server/src/error.rs (1 hunks)
  • components/clp-rust-utils/src/clp_config/package/config.rs (3 hunks)
  • tools/deployment/package/docker-compose-all.yaml (1 hunks)
🧰 Additional context used
🧠 Learnings (3)
📚 Learning: 2025-10-22T21:02:31.113Z
Learnt from: junhaoliao
Repo: y-scope/clp PR: 0
File: :0-0
Timestamp: 2025-10-22T21:02:31.113Z
Learning: Repository y-scope/clp: Maintain deterministic CI/builds for Rust; add a check to verify Cargo.lock is in sync with Cargo.toml without updating dependencies (non-mutating verification in clp-rust-checks workflow).

Applied to files:

  • components/api-server/Cargo.toml
📚 Learning: 2025-11-03T16:17:40.223Z
Learnt from: hoophalab
Repo: y-scope/clp PR: 1535
File: components/clp-rust-utils/src/clp_config/package/config.rs:47-61
Timestamp: 2025-11-03T16:17:40.223Z
Learning: In the y-scope/clp repository, the `ApiServer` struct in `components/clp-rust-utils/src/clp_config/package/config.rs` is a Rust-native configuration type and does not mirror any Python code, unlike other structs in the same file (Config, Database, ResultsCache, Package) which are mirrors of Python definitions.

Applied to files:

  • components/clp-rust-utils/src/clp_config/package/config.rs
📚 Learning: 2025-11-10T05:19:56.600Z
Learnt from: junhaoliao
Repo: y-scope/clp PR: 1575
File: components/clp-py-utils/clp_py_utils/clp_config.py:602-607
Timestamp: 2025-11-10T05:19:56.600Z
Learning: In the y-scope/clp repository, the `ApiServer` class in `components/clp-py-utils/clp_py_utils/clp_config.py` does not need a `transform_for_container()` method because no other containerized service depends on the API server - it's only accessed from the host, so no docker-network communication is expected.

Applied to files:

  • tools/deployment/package/docker-compose-all.yaml
🧬 Code graph analysis (2)
components/clp-rust-utils/src/clp_config/package/config.rs (1)
components/clp-py-utils/clp_py_utils/clp_config.py (1)
  • StreamOutput (553-561)
components/api-server/src/client.rs (1)
components/job-orchestration/job_orchestration/scheduler/job_config.py (1)
  • SearchJobConfig (85-104)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (4)
  • GitHub Check: package-image
  • GitHub Check: rust-checks
  • GitHub Check: lint-check (macos-15)
  • GitHub Check: lint-check (ubuntu-24.04)
🔇 Additional comments (4)
tools/deployment/package/docker-compose-all.yaml (1)

507-508: Streaming volume wiring for api-server looks consistent

Mounting CLP_STAGED_STREAM_OUTPUT_DIR_HOST and CLP_STREAM_OUTPUT_DIR_HOST into the api-server matches how other services (query-worker / webui / garbage-collector) see the same paths, so the filesystem-based result streaming should have access to the same directories. Just ensure the host dirs exist with the right ownership in packaged envs.

components/api-server/src/client.rs (2)

18-49: write_to_file is correctly plumbed into SearchJobConfig

The addition of write_to_file to QueryConfig and the From<QueryConfig> for SearchJobConfig mapping looks clean and keeps the Rust job config aligned with the Python SearchJobConfig field. This should make the DB-stored job_config sufficient to decide between Mongo- and FS-backed result streaming.


241-250: Original review comment is incorrect; the schema concern is inverted

The review suggests Python changes must remain compatible with Rust's deserialization, but verification shows the opposite is true. Python's SearchJobConfig contains fewer fields (6) than Rust's struct (11), not more. When Python serializes to MsgPack, Rust deserializes safely via the #[serde(default)] attribute, which fills missing fields (like write_to_file, ignore_case, path_filter) with defaults. The get_job_config call is read-only and does not write back to the database, so no round-trip risk exists. The design already handles Python having a subset of Rust fields without issue.

Likely an incorrect or invalid review comment.

components/api-server/Cargo.toml (1)

16-30: I'll verify the crate versions and their compatibility to ensure there are no known issues:

Cargo.lock is already committed; dependencies are compatible.

The async-stream 0.3.6 minimum supported Rust version is 1.65, and Tokio's current MSRV is 1.71. Since tokio-stream 0.1.17 is part of the Tokio ecosystem, it follows the same MSRV policy. The dependencies are compatible with no known issues.

The git status output confirms that Cargo.lock is already committed and in sync with Cargo.toml (working tree clean), so both concerns from the review have been addressed.

Comment thread components/api-server/src/client.rs Outdated
Comment on lines 176 to 193
todo!("Outputting query results to S3 is not supported for now.");
};
let mongodb::bson::Bson::String(message) = msg else {
return Err(ClientError::MalformedData);
let dir_path = std::path::Path::new("/")
.join(directory)
.join(search_job_id.to_string());
let stream = stream! {
let dir_path = dir_path;
let mut entries = tokio::fs::read_dir(dir_path).await?;
while let Some(entry) = entries.next_entry().await? {
let file_path = entry.path();
let reader = std::fs::File::open(file_path)?;
let mut deserializer = rmp_serde::Deserializer::new(reader);
while let Ok(e) = Deserialize::deserialize(&mut deserializer) {
let e: (i64, String, String, String, i64) = e;
yield Ok(e.1);
}
}
};

@coderabbitai coderabbitai Bot Nov 20, 2025

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧹 Nitpick | 🔵 Trivial

Consider non-blocking I/O and deterministic ordering for FS results

Within the FS-backed stream:

  • std::fs::File::open and the rmp_serde::Deserializer over a blocking reader run on the Tokio worker thread. For large result files, this can block the reactor. Long term, consider using tokio::fs::File plus an async adapter if performance under load becomes an issue.
  • tokio::fs::read_dir does not guarantee directory entry order. If result files are named to encode ordering (e.g., numeric suffixes), you may want to collect and sort the paths before reading to keep the stream order deterministic across platforms.

Neither is a correctness bug for small results, but both may affect behaviour at scale.

🤖 Prompt for AI Agents
In components/api-server/src/client.rs around lines 176-193, the FS-backed
stream uses blocking std::fs::File::open and an rmp_serde Deserializer on the
Tokio worker thread and iterates tokio::fs::read_dir without a defined order;
change it to use non-blocking IO and deterministic ordering by collecting all
directory entries into a Vec<PathBuf> (await all entries from
tokio::fs::read_dir), sort that Vec by filename (or numeric suffix) to enforce
order, then for each path open the file asynchronously with
tokio::fs::File::open and convert it to an async reader (e.g.,
tokio_util::io::ReaderStream or tokio::io::AsyncReadExt adapters) or, if
rmp_serde requires a blocking reader, offload decoding to
tokio::task::spawn_blocking so deserialization does not block the reactor;
iterate the sorted paths and yield results from the async-safe reader/decoding
path.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

rmp_serde::Deserializer::new requires a blocking reader. We yield on each message. This shouldn't block the thread too long.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@hoophalab, understood—if rmp_serde requires a blocking reader and you're yielding between messages, that's a reasonable tradeoff. As long as individual messages stay small, the reactor should get control back frequently enough. Thanks for clarifying!

Comment thread components/api-server/src/client.rs Outdated
Comment thread components/api-server/src/client.rs Outdated
Comment thread components/clp-rust-utils/src/clp_config/package/config.rs

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
components/api-server/src/client.rs (1)

127-151: Update doc comment to reflect dual streaming paths.

The doc comment still mentions "parsed JSON value" but the function now returns String messages from either filesystem or MongoDB. Additionally, filesystem-specific errors (I/O errors, deserialization errors) are not documented.

Apply this diff to update the documentation:

     /// Asynchronously fetches the results of a completed search job.
     ///
     /// # Returns
     ///
-    /// A stream of the job's results on success. Each item in the stream is a [`Result`] that:
+    /// A stream of query result messages. Each item in the stream is a [`Result`] that:
     ///
     /// ## Returns
     ///
-    /// A parsed JSON value representing a search result on success.
+    /// A String message representing a search result on success.
     ///
     /// ## Errors
     ///
     /// Returns an error if:
     ///
-    /// * [`ClientError::MalformedData`] if a retrieved document does not contain a "message" field,
-    ///   or if the "message" field is not a BSON string.
-    /// * Forwards [`mongodb::error::Error`] produced by the `MongoDB` cursor item access.
-    /// * Forwards [`serde_json::from_str`]'s return values on failure.
+    /// * For filesystem-backed results: I/O errors when reading result files or deserialization
+    ///   errors when decoding MessagePack records.
+    /// * For MongoDB-backed results: [`ClientError::MalformedData`] if a document is missing
+    ///   the "message" field or it's not a BSON string, or [`mongodb::error::Error`] from cursor access.
     ///
     /// # Errors
     ///
     /// Returns an error if:
     ///
     /// * Forwards [`Client::get_status`]'s return values on failure.
-    /// * Forwards [`mongodb::Collection::find`]'s return values on failure.
+    /// * Forwards [`Client::get_job_config`]'s return values on failure.
♻️ Duplicate comments (1)
components/api-server/src/client.rs (1)

183-192: File ordering is non-deterministic.

tokio::fs::read_dir does not guarantee directory entry order. If query results are split across multiple files and order matters for result correctness or user experience, results may be returned in unpredictable order across different platforms or filesystems.

This was flagged in past review comments. If ordering matters, apply this approach:

             let stream = stream! {
                 let dir_path = dir_path;
-                let mut entries = tokio::fs::read_dir(dir_path).await?;
-                while let Some(entry) = entries.next_entry().await? {
-                    let file_path = entry.path();
+                let mut entries = tokio::fs::read_dir(&dir_path).await?;
+                let mut paths = Vec::new();
+                while let Some(entry) = entries.next_entry().await? {
+                    paths.push(entry.path());
+                }
+                // Sort by filename to ensure deterministic order
+                paths.sort();
+                
+                for file_path in paths {
                     let reader = std::fs::File::open(file_path)?;
                     let mut deserializer = rmp_serde::Deserializer::new(reader);
                     while let Ok(e) = Deserialize::deserialize(&mut deserializer) {
                         let e: (i64, String, String, String, i64) = e;
                         yield Ok(e.1);
                     }
                 }
             };
📜 Review details

Configuration used: CodeRabbit UI

Review profile: ASSERTIVE

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between c743212 and 28a43a9.

📒 Files selected for processing (1)
  • components/api-server/src/client.rs (3 hunks)
🧰 Additional context used
🧠 Learnings (1)
📚 Learning: 2025-10-22T21:01:31.391Z
Learnt from: junhaoliao
Repo: y-scope/clp PR: 0
File: :0-0
Timestamp: 2025-10-22T21:01:31.391Z
Learning: In y-scope/clp, for the clp-rust-checks workflow (GitHub Actions) and the Taskfile target deps:lock:check-rust, we should verify Rust Cargo.lock is in sync with Cargo.toml using a non-mutating method (e.g., cargo metadata --locked / cargo check --locked) to keep CI deterministic and avoid updating dependencies during validation.

Applied to files:

  • components/api-server/src/client.rs
🧬 Code graph analysis (1)
components/api-server/src/client.rs (1)
components/job-orchestration/job_orchestration/scheduler/job_config.py (1)
  • SearchJobConfig (85-104)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (4)
  • GitHub Check: package-image
  • GitHub Check: lint-check (ubuntu-24.04)
  • GitHub Check: lint-check (macos-15)
  • GitHub Check: rust-checks
🔇 Additional comments (2)
components/api-server/src/client.rs (2)

241-250: Erreur conversion is correctly implemented—no issues found.

The verification confirms that rmp_serde::decode::Error properly converts to ClientError through the blanket impl<T: IsMalformedData> From<T> for ClientError in error.rs (lines 31–33). Since rmp_serde::decode::Error implements IsMalformedData (line 29), the ? operator in get_job_config at line 249 will correctly propagate deserialization errors as ClientError::MalformedData.


178-180: Verify path construction with absolute directory values.

The path construction Path::new("/").join(directory).join(search_job_id.to_string()) assumes directory is relative. If directory is absolute (starts with /), Path::join will replace the base path rather than appending. The default config uses "var/data/streams" (relative), which produces /var/data/streams/{job_id} as expected. However, if users configure an absolute path like /custom/streams, the leading / will be ignored and replaced.

Run the following script to verify how directory paths are configured and whether absolute paths are supported:

Comment thread components/api-server/src/client.rs Outdated
Comment thread components/api-server/src/client.rs Outdated

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

♻️ Duplicate comments (3)
components/api-server/src/client.rs (3)

127-151: Update doc comment to reflect actual return type.

As noted in previous review comments, the doc comment states the stream returns "parsed JSON values", but both streaming paths (filesystem and MongoDB) actually return String messages.

Update the doc comment to accurately describe both streaming paths:

 /// Asynchronously fetches the results of a completed search job.
 ///
 /// # Returns
 ///
-/// A stream of the job's results on success. Each item in the stream is a [`Result`] that:
+/// A stream of String messages representing the job's results on success.
+/// Results are streamed from either the filesystem (when `write_to_file` is true)
+/// or MongoDB (when `write_to_file` is false). Each item in the stream is a [`Result`] that:
 ///
 /// ## Returns
 ///
-/// A parsed JSON value representing a search result on success.
+/// A String message representing a search result on success.

188-191: Hardcoded tuple deserialization remains unaddressed.

This concern was previously raised but not yet addressed. The hardcoded tuple (i64, String, String, String, i64) is fragile and not self-documenting. If the serialisation format changes, this code will fail silently or produce incorrect results.

As suggested in the previous review, define a named struct:

#[derive(Deserialize)]
struct QueryResultRecord {
    timestamp: i64,
    message: String,
    file_path: String,
    orig_file_id: String,
    line_num: i64,
}

Then update the deserialization:

-                    while let Ok(e) = Deserialize::deserialize(&mut deserializer) {
-                        let e: (i64, String, String, String, i64) = e;
-                        yield Ok(e.1);
+                    while let Ok(record) = QueryResultRecord::deserialize(&mut deserializer) {
+                        yield Ok(record.message);

269-271: Use safe projection instead of map_unchecked_mut.

This was previously flagged with a safe alternative, but the unsafe code remains unaddressed. While the safety reasoning is sound, eliminating unsafe improves code maintainability and auditability.

Apply the safe alternative suggested in the previous review:

     fn poll_next(
         mut self: Pin<&mut Self>,
         cx: &mut std::task::Context<'_>,
     ) -> std::task::Poll<Option<Self::Item>> {
-        // SAFETY: The async runtime guarantees that `poll_next` is called exclusively. We only poll
-        // `self.0` and never move it out.
-        let inner = unsafe { self.map_unchecked_mut(|s| &mut s.0) };
-
-        let poll = inner.poll_next(cx);
+        let poll = self.0.as_mut().poll_next(cx);
         if let std::task::Poll::Ready(Some(Err(err))) = &poll {
             tracing::error!("An error occurred when streaming results: {}", err);
         }
📜 Review details

Configuration used: CodeRabbit UI

Review profile: ASSERTIVE

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 28a43a9 and 7cc2bec.

📒 Files selected for processing (1)
  • components/api-server/src/client.rs (3 hunks)
🧰 Additional context used
🧠 Learnings (1)
📚 Learning: 2025-10-22T21:01:31.391Z
Learnt from: junhaoliao
Repo: y-scope/clp PR: 0
File: :0-0
Timestamp: 2025-10-22T21:01:31.391Z
Learning: In y-scope/clp, for the clp-rust-checks workflow (GitHub Actions) and the Taskfile target deps:lock:check-rust, we should verify Rust Cargo.lock is in sync with Cargo.toml using a non-mutating method (e.g., cargo metadata --locked / cargo check --locked) to keep CI deterministic and avoid updating dependencies during validation.

Applied to files:

  • components/api-server/src/client.rs
🧬 Code graph analysis (1)
components/api-server/src/client.rs (1)
components/job-orchestration/job_orchestration/scheduler/job_config.py (1)
  • SearchJobConfig (85-104)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (4)
  • GitHub Check: package-image
  • GitHub Check: lint-check (macos-15)
  • GitHub Check: lint-check (ubuntu-24.04)
  • GitHub Check: rust-checks
🔇 Additional comments (5)
components/api-server/src/client.rs (5)

1-16: Imports look correct.

The new imports for Pin, async_stream, and StreamOutputStorage are appropriate for the filesystem-based streaming functionality.


195-214: MongoDB streaming path implementation is correct.

The error handling for missing or malformed "message" fields is appropriate, and the use of ResultsStream::new is consistent with the filesystem path.


241-251: get_job_config implementation looks correct.

The SQL query is properly parameterised, and the MessagePack deserialisation is appropriate for retrieving the job configuration.


181-193: Les conversions d'erreur pour ClientError sont correctement implémentées.

Vérification complète :

  • std::io::Error est convertie via l'attribut #[from] sur la variante Io (ligne 15 de error.rs)
  • rmp_serde::decode::Error est convertie via l'implémentation générique impl<T: IsMalformedData> From<T> for ClientError (lignes 30-33), qui mappe vers ClientError::MalformedData

Les opérations tokio::fs::read_dir(), entries.next_entry() et std::fs::File::open() propagent toutes std::io::Error, qui est correctement gérée. La désérialisation utilise while let Ok(e) plutôt que l'opérateur ?, donc les erreurs de décodage sont implicitement gérées par l'implémentation générique.


178-180: Path construction relies on undocumented assumption about directory being relative.

The code joins directory with "/" to create an absolute path, but the Rust config loading (unlike Python) does not normalize paths during deserialization. This means:

  • Configuration defaults show relative paths: "var/data/streams"
  • The code assumes directory remains relative and forces it absolute by prepending "/"
  • If the configuration is later changed or normalized elsewhere, this logic breaks

Verify that:

  1. directory will always be a relative path when passed to this code
  2. This is the intended way to construct the path (not use it as-is if already absolute)
  3. Consider aligning with Python's path normalization approach during config loading instead of at usage time

Comment thread components/api-server/src/client.rs Outdated
let job_config = self.get_job_config(search_job_id).await?;
if job_config.write_to_file {
let StreamOutputStorage::Fs { directory } = &self.config.stream_output.storage else {
todo!("Outputting query results to S3 is not supported for now.");

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

🧩 Analysis chain

todo!() macro will panic if S3 storage is configured.

This is a production blocker. If StreamOutputStorage::S3 is configured, the code will panic at runtime rather than returning a proper error.

Replace the todo!() with a proper error return:

-            let StreamOutputStorage::Fs { directory } = &self.config.stream_output.storage else {
-                todo!("Outputting query results to S3 is not supported for now.");
+            let StreamOutputStorage::Fs { directory } = &self.config.stream_output.storage else {
+                return Err(ClientError::UnsupportedOperation(
+                    "S3-based query result streaming is not yet supported".to_string()
+                ));
             };

Note: This assumes ClientError has an UnsupportedOperation variant or similar. If not, use an appropriate existing variant or add one.

Run this script to check if ClientError has an appropriate variant for unsupported operations:


🏁 Script executed:

#!/bin/bash
# Check ClientError enum variants
ast-grep --pattern 'pub enum ClientError {
  $$$
}'

Length of output: 965


I need to inspect the full code context around the todo!() to verify all concerns and determine the appropriate error handling approach.



🏁 Script executed:

#!/bin/bash
# Inspect lines 170-200 of client.rs to see full context
sed -n '170,200p' components/api-server/src/client.rs | cat -n

Length of output: 1670


🏁 Script executed:

#!/bin/bash
# Check if ClientError is used in result returns in client.rs
rg -n 'Result.*ClientError' components/api-server/src/client.rs -A2 -B2 | head -40

Length of output: 2008


🏁 Script executed:

#!/bin/bash
# Search for other error handling patterns in the file
rg -n 'Err\(' components/api-server/src/client.rs | head -20

Length of output: 263


🏁 Script executed:

#!/bin/bash
# Check if there's a pattern of adding new error variants when needed
git log --oneline components/api-server/src/error.rs | head -10

Length of output: 129


🏁 Script executed:

#!/bin/bash
# Verify if error.rs is the canonical location for error types
fd error.rs --type f

Length of output: 73


🏁 Script executed:

#!/bin/bash
# Quick sanity check: does UnsupportedOperation exist anywhere in codebase?
rg 'UnsupportedOperation' --type rust

Length of output: 0


🏁 Script executed:

#!/bin/bash
# Check if there's a generic error construction pattern (String-based, etc.)
rg 'ClientError::' components/api-server/src/ -n | grep -v test | head -15

Length of output: 444


Add error handling for unsupported S3 streaming and define appropriate ClientError variant.

The todo!() macro will panic at runtime if StreamOutputStorage::S3 is configured—this is a production blocker. The original suggestion references a non-existent ClientError::UnsupportedOperation variant. You must either add this variant to the error enum, or use an alternative approach.

Recommended fix: Add a new variant to ClientError in components/api-server/src/error.rs:

 pub enum ClientError {
     #[error("`sqlx::Error`: {0}")]
     Sql(#[from] sqlx::Error),
     
     #[error("`mongodb::error::Error`: {0}")]
     Mongo(#[from] mongodb::error::Error),
     
     #[error("Query is not succeeded")]
     QueryNotSucceeded,
     
     #[error("`std::io::Error`: {0}")]
     Io(#[from] std::io::Error),
     
     #[error("Malformed data")]
     MalformedData,
+    
+    #[error("{0}")]
+    UnsupportedOperation(String),
 }

Then in components/api-server/src/client.rs (line 176), replace:

-                todo!("Outputting query results to S3 is not supported for now.");
+                return Err(ClientError::UnsupportedOperation(
+                    "S3-based query result streaming is not yet supported".to_string()
+                ));

Committable suggestion skipped: line range outside the PR's diff.

🤖 Prompt for AI Agents
In components/api-server/src/client.rs around line 176 the code uses todo!()
when StreamOutputStorage::S3 is selected which will panic; add a new ClientError
variant (e.g., UnsupportedOutputStorage(String) or UnsupportedOperation { op:
String, detail: String }) to components/api-server/src/error.rs, implement
Display/From as needed, and then replace the todo!() at line 176 with an early
return Err(ClientError::UnsupportedOutputStorage("streaming query results to
S3".into())) (or the equivalent variant you added) so the unsupported path
returns a proper error instead of panicking.

Comment thread components/clp-rust-utils/src/clp_config/package/config.rs Outdated
Comment thread components/api-server/src/client.rs
Comment thread components/api-server/src/client.rs Outdated
Comment on lines +208 to +212
while let Ok(event) = Deserialize::deserialize(&mut deserializer) {
let event: (i64, String, String, String, i64) = event;
let message = event.1;
yield Ok(message);
}

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Tuple is indeed hard to read, but clp-s serializes the message in a tuple to save the bandwidth of transferring key names. The format is inherited from NetworkAddress output's format. We probably cannot do more here unless there is an automatic way to serialize a tuple into a struct.

I did something here. Does this read better?

This comment was marked as resolved.

Comment on lines +181 to +189
if job_config.write_to_file {
Ok(ResultsStream::File {
inner: self.fetch_results_from_file(search_job_id),
})
} else {
self.fetch_results_from_mongo(search_job_id)
.await
.map(|s| ResultsStream::Mongo { inner: s })
}

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about this? Or we should still put everything in one function.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah I think this is clear. I added docstrings for both methods. Splitting them makes it easier to document tbh

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
components/api-server/src/client.rs (1)

34-47: write_to_file wiring looks consistent; confirm job-config compatibility.

The new write_to_file field on QueryConfig and its propagation into SearchJobConfig via From<QueryConfig> look coherent and default-safe for new requests (serde default → false). Please double-check that SearchJobConfig’s deserialisation for job_config rows is backward compatible (e.g., #[serde(default)] on any new fields) so existing jobs in the DB can still be decoded without errors when get_job_config is called.

♻️ Duplicate comments (2)
components/api-server/src/client.rs (2)

128-162: fetch_results return type and docs are out of sync with actual behaviour.

The new return type Result<SearchResultStream<impl Stream<Item = Result<String, ClientError>>, _>, ClientError> makes the API return a stream of String messages (from Mongo or FS), but the doc comment still talks about “A parsed JSON value representing a search result on success” and mentions serde_json::from_str, which is no longer used here. Please update the docblock to describe that this returns a SearchResultStream of message Strings and adjust the error bullets to match the current implementation.


192-216: Replace todo!() for S3 with a proper error and consider FS streaming edge cases.

  • The todo!() when StreamOutputStorage::S3 is selected will still panic in production if that configuration is ever enabled. This should instead return a ClientError variant (e.g., UnsupportedOperation / UnsupportedOutputStorage) so callers get a structured failure rather than a crash.
  • The FS streaming loop correctly deserialises the tuple and yields message, but tokio::fs::read_dir does not guarantee ordering of entries. If file naming encodes ordering, you may want to collect and sort the paths before reading to keep result order deterministic across platforms.
  • The while let Ok(event) = Deserialize::deserialize(&mut deserializer) pattern will treat any decode error as end-of-stream. If corruption of a result file is possible, consider distinguishing expected EOF from malformed data (e.g., via your IsMalformedData helper) so truly malformed files surface as ClientError::MalformedData instead of silently truncating the stream.

The first point is a correctness / robustness blocker; the latter two are more about predictable behaviour and diagnosability.

📜 Review details

Configuration used: CodeRabbit UI

Review profile: ASSERTIVE

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between f5a8966 and b8fe10b.

📒 Files selected for processing (1)
  • components/api-server/src/client.rs (4 hunks)
🧰 Additional context used
🧠 Learnings (2)
📚 Learning: 2025-10-22T21:01:31.391Z
Learnt from: junhaoliao
Repo: y-scope/clp PR: 0
File: :0-0
Timestamp: 2025-10-22T21:01:31.391Z
Learning: In y-scope/clp, for the clp-rust-checks workflow (GitHub Actions) and the Taskfile target deps:lock:check-rust, we should verify Rust Cargo.lock is in sync with Cargo.toml using a non-mutating method (e.g., cargo metadata --locked / cargo check --locked) to keep CI deterministic and avoid updating dependencies during validation.

Applied to files:

  • components/api-server/src/client.rs
📚 Learning: 2024-10-08T15:52:50.753Z
Learnt from: AVMatthews
Repo: y-scope/clp PR: 543
File: components/core/src/clp_s/clp-s.cpp:196-265
Timestamp: 2024-10-08T15:52:50.753Z
Learning: In `clp-s.cpp`, the `run_serializer` function interleaves serialization and writing of IR files, making it difficult to restructure it into separate functions.

Applied to files:

  • components/api-server/src/client.rs
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (4)
  • GitHub Check: package-image
  • GitHub Check: rust-checks
  • GitHub Check: lint-check (macos-15)
  • GitHub Check: lint-check (ubuntu-24.04)
🔇 Additional comments (3)
components/api-server/src/client.rs (3)

180-189: Routing on job_config.write_to_file is clear and keeps fetch_results small.

Using get_job_config to decide between FS and Mongo backends and returning the appropriate SearchResultStream variant keeps fetch_results nicely focused on status polling and dispatch. This also avoids the earlier in-function branching complexity that was mentioned in prior review comments.


218-240: Mongo-backed streaming helper is clean and matches error semantics.

fetch_results_from_mongo nicely encapsulates the Mongo path, and the mapping to Result<String, ClientError> with ClientError::MalformedData for missing/non-string "message" fields aligns with the documented error semantics. This keeps the main fetch_results body simpler while preserving the existing behaviour.


266-275: Schema evolution is already protected—struct-level #[serde(default)] is in place.

The SearchJobConfig struct (components/clp-rust-utils/src/job_config/search.rs) already has #[serde(default)] at the struct level, which provides the schema evolution safety you're requesting. When deserializing legacy rows:

  • Missing fields automatically use Default::default() for their type
  • The write_to_file: bool field will safely deserialize to false if absent from older serialized data
  • Option<_> fields will deserialize to None

This design already handles backward compatibility without requiring per-field #[serde(default)] annotations or converting all fields to Option<_>. The deserialization in get_job_config will not fail on legacy jobs.

Likely an incorrect or invalid review comment.

Comment thread components/api-server/src/client.rs

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

♻️ Duplicate comments (2)
components/api-server/src/client.rs (2)

128-162: Update fetch_results doc-comment to reflect String-based, FS-or-Mongo streaming

The doc-comment still refers to “parsed JSON value” results and Mongo/serde_json::from_str errors, but fetch_results now returns a SearchResultStream<…> yielding Result<String, ClientError> from either Mongo or the filesystem. This mismatch will mislead API consumers.

Please update the comment to describe:

  • that the stream yields raw log/message Strings, not parsed JSON; and
  • that results may come from Mongo or FS, with corresponding IO / decode errors rather than serde_json.

196-201: Replace todo!() for S3 storage with a proper error path

If stream_output.storage is ever configured as StreamOutputStorage::S3, this todo!() will panic the API server at runtime, which is not acceptable for a configuration-driven feature.

Given that this is an unsupported backend rather than a programmer error, it should return a ClientError instead of panicking. Because fetch_results_from_file currently cannot return a Result, you likely want to change its signature and propagate the error from fetch_results:

-    fn fetch_results_from_file(
-        &self,
-        search_job_id: u64,
-    ) -> impl Stream<Item = Result<String, ClientError>> + use<> {
-        let StreamOutputStorage::Fs { directory } = &self.config.stream_output.storage else {
-            todo!("Outputting query results to S3 is not supported for now.");
-        };
+    fn fetch_results_from_file(
+        &self,
+        search_job_id: u64,
+    ) -> Result<impl Stream<Item = Result<String, ClientError>> + use<>, ClientError> {
+        let StreamOutputStorage::Fs { directory } = &self.config.stream_output.storage else {
+            // Prefer a dedicated variant like `UnsupportedOperation` or `UnsupportedOutputStorage`.
+            return Err(ClientError::UnsupportedOperation(
+                "S3-based query result streaming is not yet supported".to_string(),
+            ));
+        };
@@
-        let stream = /* existing stream! / try_stream! body */;
-        stream
+        let stream = /* existing stream! / try_stream! body */;
+        Ok(stream)
     }

and in fetch_results:

-        if job_config.write_to_file {
-            Ok(SearchResultStream::File {
-                inner: self.fetch_results_from_file(search_job_id),
-            })
-        } else {
-            self.fetch_results_from_mongo(search_job_id)
-                .await
-                .map(|s| SearchResultStream::Mongo { inner: s })
-        }
+        if job_config.write_to_file {
+            Ok(SearchResultStream::File {
+                inner: self.fetch_results_from_file(search_job_id)?,
+            })
+        } else {
+            self.fetch_results_from_mongo(search_job_id)
+                .await
+                .map(|s| SearchResultStream::Mongo { inner: s })
+        }

Adjust the concrete ClientError variant name to match your existing enum.

📜 Review details

Configuration used: CodeRabbit UI

Review profile: ASSERTIVE

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between b8fe10b and a59532d.

📒 Files selected for processing (1)
  • components/api-server/src/client.rs (4 hunks)
🧰 Additional context used
🧠 Learnings (2)
📚 Learning: 2025-10-22T21:01:31.391Z
Learnt from: junhaoliao
Repo: y-scope/clp PR: 0
File: :0-0
Timestamp: 2025-10-22T21:01:31.391Z
Learning: In y-scope/clp, for the clp-rust-checks workflow (GitHub Actions) and the Taskfile target deps:lock:check-rust, we should verify Rust Cargo.lock is in sync with Cargo.toml using a non-mutating method (e.g., cargo metadata --locked / cargo check --locked) to keep CI deterministic and avoid updating dependencies during validation.

Applied to files:

  • components/api-server/src/client.rs
📚 Learning: 2024-10-08T15:52:50.753Z
Learnt from: AVMatthews
Repo: y-scope/clp PR: 543
File: components/core/src/clp_s/clp-s.cpp:196-265
Timestamp: 2024-10-08T15:52:50.753Z
Learning: In `clp-s.cpp`, the `run_serializer` function interleaves serialization and writing of IR files, making it difficult to restructure it into separate functions.

Applied to files:

  • components/api-server/src/client.rs
🧬 Code graph analysis (1)
components/api-server/src/client.rs (2)
components/job-orchestration/job_orchestration/scheduler/job_config.py (1)
  • SearchJobConfig (85-104)
components/api-server/src/bin/api_server.rs (1)
  • query (120-127)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (4)
  • GitHub Check: package-image
  • GitHub Check: rust-checks
  • GitHub Check: lint-check (ubuntu-24.04)
  • GitHub Check: lint-check (macos-15)
🔇 Additional comments (4)
components/api-server/src/client.rs (4)

21-47: Plumbing of write_to_file from API to SearchJobConfig looks correct

QueryConfig.write_to_file is serialised from the HTTP request, defaulted via #[serde(default)], and passed through From<QueryConfig> for SearchJobConfig. That matches the Python-side SearchJobConfig.write_to_file field, so the job-orchestration layer should see the intended flag.


180-189: Branching on write_to_file after job completion is a clean separation

Fetching the stored SearchJobConfig and then routing to either FS-backed or Mongo-backed streaming based on write_to_file keeps fetch_results high-level and reuses the same polling logic for both backends. This matches the PR’s objective and the Python SearchJobConfig semantics.


278-309: SearchResultStream enum + Stream impl cleanly encapsulate FS/Mongo backends

The SearchResultStream enum with pin_project! and a manual Stream impl that delegates poll_next to the projected inner variant is a nice way to unify the FS and Mongo result streams without unsafe code or extra locking. Error logging in poll_next is also appropriate and keeps concerns local to this adapter type.


266-275: Verification complete—error handling is correct and consistent.

All concerns have been confirmed as properly addressed:

  • SearchJobConfig derives Deserialize (line 11 of components/clp-rust-utils/src/job_config/search.rs)
  • ClientError implements proper conversion via the IsMalformedData trait pattern: rmp_serde::decode::Error implements IsMalformedData, and a blanket From<T: IsMalformedData> for ClientError maps all such errors to ClientError::MalformedData
  • The ? operator in get_job_config at line 273 correctly propagates decode failures as the intended MalformedData variant
  • Approach mirrors the serialization method (to_vec_named for storage, from_slice for deserialization)

The error handling is sound and follows the established pattern in the codebase.

Comment on lines +202 to +214
let stream = stream! {
let mut entries = tokio::fs::read_dir(search_job_output_dir).await?;
while let Some(entry) = entries.next_entry().await? {
let search_result_path = entry.path();
let reader = std::fs::File::open(search_result_path)?;
let mut deserializer = rmp_serde::Deserializer::new(reader);
while let Ok(event) = Deserialize::deserialize(&mut deserializer) {
let event: (i64, String, String, String, i64) = event;
let message = event.1;
yield Ok(message);
}
}
};

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

🧩 Analysis chain

Use try_stream! instead of stream! when using ? and emitting Result items, and surface decode errors

Two issues in the FS-backed stream:

  1. async_stream::stream! + ? with Result items

    The body uses ? on async/IO operations and yields Ok(message), i.e. Item = Result<String, ClientError>, but the macro is stream!. Per the async-stream docs, ? is only supported in try_stream! and is specifically designed for Item = Result<T, E>. With stream!, this will either fail to compile or not propagate errors as intended.

    Recommended change:

-use async_stream::stream;
+use async_stream::try_stream;
@@

  •    let stream = stream! {
    
  •    let stream = try_stream! {
           let mut entries = tokio::fs::read_dir(search_job_output_dir).await?;
           while let Some(entry) = entries.next_entry().await? {
               let search_result_path = entry.path();
               let reader = std::fs::File::open(search_result_path)?;
               let mut deserializer = rmp_serde::Deserializer::new(reader);
    
  •            while let Ok(event) = Deserialize::deserialize(&mut deserializer) {
    
  •            while let Ok(event) = Deserialize::deserialize(&mut deserializer) {
                   let event: (i64, String, String, String, i64) = event;
                   let message = event.1;
                   yield Ok(message);
               }
           }
       };
    
    
    This keeps the public return type the same (`impl Stream<Item = Result<String, ClientError>>`) while ensuring IO errors become `Err(ClientError)` items rather than panics or compile errors.
    
    
  1. Decode errors are silently treated as end-of-stream

    while let Ok(event) = Deserialize::deserialize(&mut deserializer) will break the loop on any rmp_serde::decode::Error, including genuinely malformed MessagePack, and simply stop yielding items. That can truncate results without signalling an error.

    Since you have a ClientError::MalformedData and an IsMalformedData implementation for rmp_serde::decode::Error, consider distinguishing EOF from other decode failures, e.g. (pseudocode):

    loop {
        match Deserialize::deserialize(&mut deserializer) {
            Ok(event) => {
                let event: (i64, String, String, String, i64) = event;
                let message = event.1;
                yield Ok(message);
            }
            Err(err) if err.is_eof() => break,        // or matches EOF-style error
            Err(err) => break yield Err(err.into()),  // surface malformed data
        }
    }

    This way, truly corrupted files show up as Err(ClientError::MalformedData) rather than silent truncation.


🌐 Web query:

Check the async-stream crate docs to confirm that `try_stream!` is the macro intended for fallible streams using the `?` operator, and that `stream!` does not support `?` for propagating errors.

💡 Result:

Yes — async-stream provides two macros:

  • try_stream! — for fallible streams; it sets Item = Result<T, E> and supports using the ? operator. [1][2]
  • stream! — for non-fallible streams; it yields plain values and does not support using ? to propagate errors. [1][2]

Sources: async-stream docs and README. [1] https://docs.rs/async-stream (crate page) [2] https://github.com/tokio-rs/async-stream (README)


Use try_stream! instead of stream! when using ? and emitting Result items, and surface decode errors

Two issues in the FS-backed stream:

  1. async_stream::stream! + ? with Result items

    The body uses ? on async/IO operations and yields Ok(message), i.e. Item = Result<String, ClientError>, but the macro is stream!. Per the async-stream docs, try_stream! is for fallible streams with Item = Result<T, E> and supports the ? operator, while stream! is for non-fallible streams and does not support ? to propagate errors. With stream!, this will either fail to compile or not propagate errors as intended.

    Recommended change:

-use async_stream::stream;
+use async_stream::try_stream;
@@

  •    let stream = stream! {
    
  •    let stream = try_stream! {
           let mut entries = tokio::fs::read_dir(search_job_output_dir).await?;
           while let Some(entry) = entries.next_entry().await? {
               let search_result_path = entry.path();
               let reader = std::fs::File::open(search_result_path)?;
               let mut deserializer = rmp_serde::Deserializer::new(reader);
    
  •            while let Ok(event) = Deserialize::deserialize(&mut deserializer) {
    
  •            while let Ok(event) = Deserialize::deserialize(&mut deserializer) {
                   let event: (i64, String, String, String, i64) = event;
                   let message = event.1;
                   yield Ok(message);
               }
           }
       };
    
    
    This keeps the public return type the same (`impl Stream<Item = Result<String, ClientError>>`) while ensuring IO errors become `Err(ClientError)` items rather than panics or compile errors.
    
    
  1. Decode errors are silently treated as end-of-stream

    while let Ok(event) = Deserialize::deserialize(&mut deserializer) will break the loop on any rmp_serde::decode::Error, including genuinely malformed MessagePack, and simply stop yielding items. That can truncate results without signalling an error.

    Since you have a ClientError::MalformedData and an IsMalformedData implementation for rmp_serde::decode::Error, consider distinguishing EOF from other decode failures, e.g.:

    loop {
        match Deserialize::deserialize(&mut deserializer) {
            Ok(event) => {
                let event: (i64, String, String, String, i64) = event;
                let message = event.1;
                yield Ok(message);
            }
            Err(err) if err.is_eof() => break,
            Err(err) => yield Err(err.into()),
        }
    }

    This way, truly corrupted files surface as Err(ClientError::MalformedData) rather than silent truncation.

🤖 Prompt for AI Agents
In components/api-server/src/client.rs around lines 202 to 214, the stream body
currently uses async_stream::stream! with `?` and yields Result items and also
swallows decode errors; change the macro to async_stream::try_stream! so `?`
propagates as Err items, and replace the `while let Ok(event) =
Deserialize::deserialize(&mut deserializer)` loop with a match that
distinguishes EOF from other decode failures (on Ok(event) yield the message, on
Err(err) if err.is_eof() break, else yield Err(err.into())), ensuring IO and
decode errors become Err(ClientError) stream items rather than silently ending
the stream.

@LinZhihao-723 LinZhihao-723 left a comment

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The enum implementation is clear now, lgtm.
The comments are mostly about docstrings and naming conventions.

&self,
search_job_id: u64,
) -> impl Stream<Item = Result<String, ClientError>> + use<> {
let StreamOutputStorage::Fs { directory } = &self.config.stream_output.storage else {

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: directory -> stream_storage_dir?
directory seems too general.

Comment on lines +208 to +212
while let Ok(event) = Deserialize::deserialize(&mut deserializer) {
let event: (i64, String, String, String, i64) = event;
let message = event.1;
yield Ok(message);
}

This comment was marked as resolved.

Comment thread components/api-server/src/client.rs
Comment thread components/api-server/src/client.rs
Comment thread components/api-server/src/client.rs
Comment thread components/api-server/src/client.rs
Comment thread components/api-server/src/client.rs
Comment thread components/api-server/src/client.rs
hoophalab and others added 4 commits November 25, 2025 02:09
Co-authored-by: Lin Zhihao <59785146+LinZhihao-723@users.noreply.github.com>
Co-authored-by: Lin Zhihao <59785146+LinZhihao-723@users.noreply.github.com>
Co-authored-by: Lin Zhihao <59785146+LinZhihao-723@users.noreply.github.com>
Co-authored-by: Lin Zhihao <59785146+LinZhihao-723@users.noreply.github.com>

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

♻️ Duplicate comments (5)
components/api-server/src/client.rs (5)

128-162: Update fetch_results docstring to match SearchResultStream and String messages.

The docstring still talks about “parsed JSON values” and Mongo-only error paths, but fetch_results now returns a SearchResultStream<..> of Result<String, ClientError> that can be backed by either files or Mongo. Please update the docs to describe the two backends (File vs Mongo) and the fact that each item is a log message String, and move Mongo/FS-specific error details into the respective helper docs.


222-235: Use try_stream! for fallible FS streams and surface decode errors.

This stream body uses ? on fallible ops (read_dir, next_entry, File::open) and yields Result<String, ClientError> items, but it is wrapped in async_stream::stream!. Per async-stream’s design, try_stream! is the macro intended for fallible streams using ? and Item = Result<T, E>; using stream! here is likely a compile-time error or, at best, wrong error propagation. The decode loop also treats any rmp_serde decode error as EOF and silently truncates the stream.

Consider:

  • Switching to try_stream! so ? produces Err(ClientError) items instead of panics/compile failures.
  • Matching on Deserialize::deserialize(&mut deserializer) and distinguishing normal EOF from malformed data, using your existing ClientError::MalformedData / IsMalformedData plumbing so corrupted files yield an error item instead of silently ending the stream.

Example (partial) change:

- use async_stream::stream;
+ use async_stream::try_stream;
@@
-        let stream = stream! {
+        let stream = try_stream! {
             let mut entries = tokio::fs::read_dir(search_job_output_dir).await?;
             while let Some(entry) = entries.next_entry().await? {
                 let search_result_path = entry.path();
                 let reader = std::fs::File::open(search_result_path)?;
                 let mut deserializer = rmp_serde::Deserializer::new(reader);
-                while let Ok(event) = Deserialize::deserialize(&mut deserializer) {
-                    let event: (i64, String, String, String, i64) = event;
-                    let message = event.1;
-                    yield Ok(message);
-                }
+                // TODO: handle EOF vs malformed data explicitly and map non-EOF errors to ClientError.
+                while let Ok(event) = Deserialize::deserialize(&mut deserializer) {
+                    let event: (i64, String, String, String, i64) = event;
+                    let message = event.1;
+                    yield Ok(message);
+                }
             }
         };
Confirm in the async-stream (v0.3.x) docs that `try_stream!` is the macro intended for fallible streams with `Item = Result<T, E>` and use of the `?` operator, and that `stream!` is for infallible streams that do not propagate `?` as `Err` items.

223-227: Blocking FS I/O and non-deterministic directory ordering are acceptable for now but may bite at scale.

This path uses std::fs::File::open (blocking) on the Tokio worker thread and iterates tokio::fs::read_dir without sorting entries, so:

  • Large result files could block the reactor if deserialisation is slow.
  • Result ordering may differ between platforms/FS implementations.

If you expect large files or care about strict ordering, consider offloading deserialisation to spawn_blocking and/or collecting and sorting paths before reading. Not a correctness bug for small result sets, just something to keep in mind.


228-231: Tuple-based deserialisation is still brittle; consider a named struct or at least documenting the layout.

Deserialising into (i64, String, String, String, i64) and then using event.1 is hard to read and tightly couples this code to the exact positional layout of the MessagePack tuple. Even if you must keep the tuple format for on-the-wire efficiency, wrapping it in a small Event struct (via #[serde(from)]) or adding a clear comment describing each element would improve maintainability and make future format changes less error-prone.


238-265: fetch_results_from_mongo docstring is out of date (no JSON parsing anymore).

The implementation now maps the "message" BSON field directly to a String and does not call serde_json::from_str, yet the docstring still claims it returns “parsed JSON values” and forwards serde_json::from_str errors. Please update the docs to say that each item is a log message String and list only the actual error cases: ClientError::MalformedData for missing/non-string "message" and forwarded mongodb::error::Error from the cursor and Collection::find.

📜 Review details

Configuration used: CodeRabbit UI

Review profile: ASSERTIVE

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between a59532d and 0ae4bf6.

📒 Files selected for processing (1)
  • components/api-server/src/client.rs (4 hunks)
🧰 Additional context used
🧠 Learnings (2)
📚 Learning: 2025-10-22T21:01:31.391Z
Learnt from: junhaoliao
Repo: y-scope/clp PR: 0
File: :0-0
Timestamp: 2025-10-22T21:01:31.391Z
Learning: In y-scope/clp, for the clp-rust-checks workflow (GitHub Actions) and the Taskfile target deps:lock:check-rust, we should verify Rust Cargo.lock is in sync with Cargo.toml using a non-mutating method (e.g., cargo metadata --locked / cargo check --locked) to keep CI deterministic and avoid updating dependencies during validation.

Applied to files:

  • components/api-server/src/client.rs
📚 Learning: 2024-10-08T15:52:50.753Z
Learnt from: AVMatthews
Repo: y-scope/clp PR: 543
File: components/core/src/clp_s/clp-s.cpp:196-265
Timestamp: 2024-10-08T15:52:50.753Z
Learning: In `clp-s.cpp`, the `run_serializer` function interleaves serialization and writing of IR files, making it difficult to restructure it into separate functions.

Applied to files:

  • components/api-server/src/client.rs
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (4)
  • GitHub Check: package-image
  • GitHub Check: lint-check (macos-15)
  • GitHub Check: lint-check (ubuntu-24.04)
  • GitHub Check: rust-checks
🔇 Additional comments (3)
components/api-server/src/client.rs (3)

180-189: Routing on persisted write_to_file flag looks correct.

Using get_job_config to branch on the stored write_to_file flag and returning SearchResultStream::File vs ::Mongo keeps the API clean and ensures the decision matches the job’s original configuration. This separation into helper methods is clear.


310-319: get_job_config helper cleanly encapsulates MsgPack deserialisation.

Reading the job_config column and deserialising with rmp_serde::from_slice into SearchJobConfig centralises this logic and keeps fetch_results focused on routing. This looks correct and makes future config schema changes easier to manage.


322-359: SearchResultStream enum + pin_project is a solid abstraction over FS and Mongo streams.

The enum-based wrapper with explicit FileStream/MongoStream type parameters and a Stream impl that delegates poll_next and logs errors removes the earlier unsafe projection while keeping the SSE-facing type concrete. This is clear and idiomatic for combining multiple backend streams.

Comment on lines +216 to +218
let StreamOutputStorage::Fs { directory } = &self.config.stream_output.storage else {
todo!("Outputting query results to S3 is not supported for now.");
};

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

🧩 Analysis chain

Replace todo!() for S3 output with a proper ClientError instead of panicking.

If StreamOutputStorage::S3 is ever configured, this todo!() will panic the server at runtime. Even if S3-backed streaming is not supported yet, it should return a structured ClientError (e.g., “unsupported output storage for streaming”) so misconfiguration fails gracefully.


🏁 Script executed:

#!/bin/bash
# Inspect where S3 storage is used to assess how reachable this panic is.
rg -n "StreamOutputStorage::S3" -S

Length of output: 0


I need to investigate further to determine whether the StreamOutputStorage::S3 variant actually exists and how reachable the panic is.


🏁 Script executed:

#!/bin/bash
# Search for StreamOutputStorage enum definition
rg -n "enum StreamOutputStorage" -A 10

Length of output: 936


🏁 Script executed:

#!/bin/bash
# Search more broadly for StreamOutputStorage without case sensitivity constraints
rg -n "StreamOutputStorage" -C 3

Length of output: 2816


🏁 Script executed:

#!/bin/bash
# Look for storage config in general
rg -n "\.storage" -C 2

Length of output: 40829


🏁 Script executed:

#!/bin/bash
# Check if client.rs file exists and search for relevant code
find . -name "client.rs" -type f | head -5

Length of output: 128


🏁 Script executed:

#!/bin/bash
# Search for S3 references with case-insensitive matching
rg -in "s3" | head -20

Length of output: 1696


🏁 Script executed:

#!/bin/bash
# Look for the config structure definition
rg -n "struct.*config" -i -C 2 | head -40

Length of output: 3074


🏁 Script executed:

#!/bin/bash
# Search for StreamOutputStorage import/definition in related files
fd -e rs | xargs grep -l "StreamOutputStorage" 2>/dev/null

Length of output: 94


🏁 Script executed:

#!/bin/bash
# Look at components/api-server/src/ directory structure
find components/api-server/src -name "*.rs" -type f

Length of output: 147


🏁 Script executed:

#!/bin/bash
# Find and examine the config structures
rg -n "stream_output" -C 5 components/api-server/src/

Length of output: 966


🏁 Script executed:

#!/bin/bash
# Get the actual lines 216-218 from client.rs to verify the issue
sed -n '210,225p' components/api-server/src/client.rs

Length of output: 767


🏁 Script executed:

#!/bin/bash
# Search for ClientError to understand error handling patterns
rg -n "ClientError" -A 2 -B 2 components/api-server/src/client.rs | head -50

Length of output: 2470


🏁 Script executed:

#!/bin/bash
# Look for all error types defined
rg -n "enum.*Error\|struct.*Error" components/api-server/src/ -A 3

Length of output: 0


🏁 Script executed:

#!/bin/bash
# Check for todo! macros in the codebase to see if there are other examples
rg -n "todo!\(" components/api-server/src/

Length of output: 118


🏁 Script executed:

#!/bin/bash
# Look for stream_output storage definition/config
rg -n "storage:" -C 5 components/api-server/src/

Length of output: 0


🏁 Script executed:

#!/bin/bash
# Check what enum variants are supported
rg -n "Fs\|S3" components/api-server/src/ | head -20

Length of output: 0


🏁 Script executed:

#!/bin/bash
# Look in config or types files for storage enum
find components/api-server -name "*.rs" | xargs grep -l "StreamOutputStorage\|OutputStorage" 2>/dev/null

Length of output: 35


🏁 Script executed:

#!/bin/bash
# Get comprehensive view of the storage enum and match pattern
rg -n "StreamOutputStorage" -B 20 -A 20 components/api-server/src/

Length of output: 5043


🏁 Script executed:

#!/bin/bash
# Look at what config sections exist
rg -n "struct.*Config\|pub.*config" -C 3 components/api-server/src/client.rs

Length of output: 0


🏁 Script executed:

#!/bin/bash
# Check if there's a config or config module
find components/api-server -name "*config*" -type f

Length of output: 0


🏁 Script executed:

#!/bin/bash
# Examine the complete context around line 216
sed -n '200,230p' components/api-server/src/client.rs

Length of output: 1437


🏁 Script executed:

#!/bin/bash
# Search across all API server source for the complete enum
find components/api-server/src -name "*.rs" | xargs grep -B 5 -A 15 "enum StreamOutputStorage" 2>/dev/null

Length of output: 0


🏁 Script executed:

#!/bin/bash
# Look for config module or types
ls -la components/api-server/src/

Length of output: 343


🏁 Script executed:

#!/bin/bash
# Check if config is in a separate file
find components/api-server -name "*.rs" | xargs grep "StreamOutputStorage::" 2>/dev/null

Length of output: 129


🏁 Script executed:

#!/bin/bash
# Broad search for where stream output is configured
rg -n "stream_output" -C 10

Length of output: 80126


🏁 Script executed:

#!/bin/bash
# Final comprehensive search across entire repo
find . -type f -name "*.rs" | xargs grep -l "StreamOutputStorage" 2>/dev/null

Length of output: 98


🏁 Script executed:

#!/bin/bash
# Look for the exact file and context
cat components/api-server/src/client.rs | grep -n "StreamOutputStorage" -C 15 || true

Length of output: 2271


🏁 Script executed:

#!/bin/bash
# Try finding where this enum might be defined (lib.rs, mod.rs, or other)
find components/api-server -name "*.rs" -exec grep -H "enum.*Storage" {} \; | grep -i stream || true

Length of output: 0


🏁 Script executed:

#!/bin/bash
# Get a list of all .rs files under components/api-server/src
find components/api-server/src -type f -name "*.rs" -exec echo "File: {}" \; -exec wc -l {} \;

Length of output: 332


🏁 Script executed:

#!/bin/bash
# Search for the full module structure
find components/api-server/src -type f -name "*.rs" | sort

Length of output: 147


🏁 Script executed:

#!/bin/bash
# Exhaustively check the relevant files
for file in $(find components/api-server/src -name "*.rs"); do
  echo "=== $file ==="
  grep -n "StreamOutputStorage\|stream_output\|todo!" "$file" | head -20 || echo "No matches"
done

Length of output: 424


🏁 Script executed:

#!/bin/bash
# Focus on the specific line range mentioned
if [ -f "components/api-server/src/client.rs" ]; then
  head -220 components/api-server/src/client.rs | tail -30
fi

Length of output: 1111


🏁 Script executed:

#!/bin/bash
# Look at actual file content around the suspect code
sed -n '1,250p' components/api-server/src/client.rs | tail -100

Length of output: 3893


🏁 Script executed:

#!/bin/bash
# Find any storage-related enum definitions
rg -n "enum.*Storage\|enum.*Output" --type rs

Length of output: 26


🏁 Script executed:

#!/bin/bash
# Look at lines 200-300 of client.rs
tail -n +200 components/api-server/src/client.rs | head -100

Length of output: 3825


🏁 Script executed:

#!/bin/bash
# Check if the todo!() is actually present
grep -n "todo!" components/api-server/src/client.rs

Length of output: 82


🏁 Script executed:

#!/bin/bash
# Verify file exists and basic structure
head -5 components/api-server/src/client.rs && echo "..." && tail -5 components/api-server/src/client.rs

Length of output: 211


🏁 Script executed:

#!/bin/bash
# Examine error.rs to see what ClientError variants exist
cat components/api-server/src/error.rs

Length of output: 1310


🏁 Script executed:

#!/bin/bash
# Check if there are any other places where S3 storage validation is done
grep -rn "S3" components/api-server/src/ -C 3

Length of output: 649


🏁 Script executed:

#!/bin/bash
# Look for similar error patterns in the codebase
grep -rn "Unsupported\|unsupported\|UnsupportedStorage\|NotSupported" components/api-server/src/

Length of output: 0


🏁 Script executed:

#!/bin/bash
# Check how other code handles S3 vs FS config decisions
grep -rn "StorageType::\|StreamOutputStorage::" components/api-server/src/ -A 2 -B 2

Length of output: 478


Replace todo!() with proper error handling for unsupported S3 stream output storage.

The StreamOutputStorage::S3 variant exists and is configurable (as evidenced by the enum definition and configuration documentation), but the code at client.rs:216-218 panics instead of returning a structured error when S3 is configured. This should fail gracefully by returning a ClientError. You'll need to add an appropriate error variant to the ClientError enum (e.g., UnsupportedStreamOutputStorage or similar) and return a stream that yields this error.

🤖 Prompt for AI Agents
In components/api-server/src/client.rs around lines 216-218, replace the
panic-inducing todo!() for S3 stream output with structured error handling: add
a new ClientError variant (e.g., UnsupportedStreamOutputStorage) to the
ClientError enum and implement any needed Display/From/serde traits used in the
crate; then change the match branch to return a stream that yields that error
(for example use futures::stream::once(async {
Err(ClientError::UnsupportedStreamOutputStorage) }) or the project's equivalent)
so callers receive a ClientError instead of panicking; also update any error
conversions/tests that assume the new variant where necessary.

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

♻️ Duplicate comments (3)
components/api-server/src/client.rs (3)

3-3: Import try_stream! instead of stream! for Result-yielding streams.

The import of async_stream::stream should be async_stream::try_stream, as the file-based stream at line 222 uses ? and yields Result items. Per the async-stream documentation, try_stream! is designed for fallible streams with Item = Result<T, E> and supports the ? operator, while stream! does not.

Apply this diff:

-use async_stream::stream;
+use async_stream::try_stream;

216-218: Replace todo!() with proper error handling for S3 storage.

The todo!() macro will cause a panic at runtime if StreamOutputStorage::S3 is configured—this is a production blocker. This issue was previously flagged but remains unaddressed.

You must add a new error variant to ClientError (e.g., UnsupportedOperation(String)) in components/api-server/src/error.rs and return it here.

First, add to components/api-server/src/error.rs:

 pub enum ClientError {
     // ... existing variants ...
     #[error("Malformed data")]
     MalformedData,
+    
+    #[error("{0}")]
+    UnsupportedOperation(String),
 }

Then in this file:

         let StreamOutputStorage::Fs { directory } = &self.config.stream_output.storage else {
-            todo!("Outputting query results to S3 is not supported for now.");
+            panic!("S3-based query result streaming is not yet supported");
         };

Note: Using panic! with a clear message is acceptable here given the # Panics section in the docstring at line 209. Alternatively, restructure to return Result and use the new error variant.


222-234: Use try_stream! macro and properly handle decode errors.

Two critical issues in the stream implementation:

  1. Using stream! with ? and Result items: The body uses ? on async/IO operations and yields Ok(message), but the macro is stream!. Per async-stream documentation, try_stream! is required for fallible streams with Item = Result<T, E> that use the ? operator. With stream!, errors will not propagate correctly.

  2. Decode errors silently truncate results: The while let Ok(event) = Deserialize::deserialize(&mut deserializer) loop breaks on any rmp_serde::decode::Error, including genuinely malformed data, without signalling an error. This can silently truncate results.

Both issues were previously flagged but remain unaddressed.

Apply this diff:

-        let stream = stream! {
+        let stream = try_stream! {
             let mut entries = tokio::fs::read_dir(search_job_output_dir).await?;
             while let Some(entry) = entries.next_entry().await? {
                 let search_result_path = entry.path();
                 let reader = std::fs::File::open(search_result_path)?;
                 let mut deserializer = rmp_serde::Deserializer::new(reader);
-                while let Ok(event) = Deserialize::deserialize(&mut deserializer) {
-                    let event: (i64, String, String, String, i64) = event;
-                    let message = event.1;
-                    yield Ok(message);
+                loop {
+                    match Deserialize::deserialize(&mut deserializer) {
+                        Ok(event) => {
+                            let event: (i64, String, String, String, i64) = event;
+                            let message = event.1;
+                            yield message;
+                        }
+                        Err(err) if err.is_eof() => break,
+                        Err(err) => Err(err)?,
+                    }
                 }
             }
         };

Note: With try_stream!, you yield the unwrapped value directly (not Ok(message)), and errors are propagated via ?.

📜 Review details

Configuration used: CodeRabbit UI

Review profile: ASSERTIVE

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between a59532d and 0ae4bf6.

📒 Files selected for processing (1)
  • components/api-server/src/client.rs (4 hunks)
🧰 Additional context used
🧠 Learnings (2)
📚 Learning: 2025-10-22T21:01:31.391Z
Learnt from: junhaoliao
Repo: y-scope/clp PR: 0
File: :0-0
Timestamp: 2025-10-22T21:01:31.391Z
Learning: In y-scope/clp, for the clp-rust-checks workflow (GitHub Actions) and the Taskfile target deps:lock:check-rust, we should verify Rust Cargo.lock is in sync with Cargo.toml using a non-mutating method (e.g., cargo metadata --locked / cargo check --locked) to keep CI deterministic and avoid updating dependencies during validation.

Applied to files:

  • components/api-server/src/client.rs
📚 Learning: 2024-10-08T15:52:50.753Z
Learnt from: AVMatthews
Repo: y-scope/clp PR: 543
File: components/core/src/clp_s/clp-s.cpp:196-265
Timestamp: 2024-10-08T15:52:50.753Z
Learning: In `clp-s.cpp`, the `run_serializer` function interleaves serialization and writing of IR files, making it difficult to restructure it into separate functions.

Applied to files:

  • components/api-server/src/client.rs
🪛 GitHub Actions: clp-rust-checks
components/api-server/src/client.rs

[error] 238-238: Rust Clippy: doc-markdown error. item in documentation is missing backticks for MongoDB in the doc comment.

⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
  • GitHub Check: package-image
🔇 Additional comments (3)
components/api-server/src/client.rs (3)

180-189: LGTM: Clean routing logic.

The routing logic is clear and well-structured. Using early return for the file-based path keeps indentation manageable, and the separation into fetch_results_from_file and fetch_results_from_mongo improves readability.


310-319: LGTM: Clean helper method.

The get_job_config helper is straightforward and properly encapsulates the database query and MessagePack deserialization logic.


322-360: LGTM: Well-designed stream abstraction.

The SearchResultStream enum using pin_project! is a clean solution that addresses all previous concerns:

  • Removes the need for unsafe projection
  • Eliminates Mutex overhead
  • Provides clear variant names (File and Mongo)
  • Includes comprehensive documentation
  • Properly delegates poll_next to inner streams with error logging

This is a solid implementation.

Comment thread components/api-server/src/client.rs Outdated
@hoophalab

Copy link
Copy Markdown
Contributor Author

Nit: directory -> stream_storage_dir?
directory seems too general.

"directory" is the field name in the yaml file. It might not be ideal to rename it.

LinZhihao-723
LinZhihao-723 previously approved these changes Nov 25, 2025

@LinZhihao-723 LinZhihao-723 left a comment

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For the PR title, how about:

feat(api-server): Add support for streaming query results from files.

@hoophalab hoophalab changed the title feat(api): Fetch query results from the file system. feat(api-server): Add support for streaming query results from files. Nov 25, 2025
@hoophalab hoophalab changed the title feat(api-server): Add support for streaming query results from files. feat(api): Add support for streaming query results from files. Nov 25, 2025
@hoophalab hoophalab changed the title feat(api): Add support for streaming query results from files. feat(api-server): Add support for streaming query results from files. Nov 25, 2025

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

♻️ Duplicate comments (3)
components/api-server/src/client.rs (3)

244-246: [DUPLICATE] Critical: Replace todo!() with proper error handling for S3.

This issue has been extensively flagged in previous reviews. The todo!() will panic at runtime if StreamOutputStorage::S3 is configured. You must add an appropriate ClientError variant (e.g., UnsupportedOperation or UnsupportedStreamOutputStorage) and return that error instead of panicking.


250-263: [DUPLICATE] Critical: Use try_stream! for fallible streams with ? operator.

This issue has been extensively flagged in previous reviews. The stream! macro does not support the ? operator for error propagation in fallible streams. You must change to try_stream! to properly propagate IO errors as Err(ClientError) stream items.

Additionally, the deserialization loop at lines 256-260 silently ends the stream on any decode error, which can truncate results without signaling corruption.

As noted in previous reviews, apply this fix:

-use async_stream::stream;
+use async_stream::try_stream;

And at line 250:

-        let stream = stream! {
+        let stream = try_stream! {
             let mut entries = tokio::fs::read_dir(search_job_output_dir).await?;
             while let Some(entry) = entries.next_entry().await? {
                 let search_result_path = entry.path();
                 let reader = std::fs::File::open(search_result_path)?;
                 let mut deserializer = rmp_serde::Deserializer::new(reader);
-                while let Ok(event_tuple) = Deserialize::deserialize(&mut deserializer) {
+                loop {
+                    match Deserialize::deserialize(&mut deserializer) {
+                        Ok(event_tuple) => {
-                    let event_tuple: EventTuple = event_tuple;
-                    let event: Event = event_tuple.into();
-                    yield Ok(event.message);
+                            let event_tuple: EventTuple = event_tuple;
+                            let event: Event = event_tuple.into();
+                            yield event.message;
+                        }
+                        Err(err) if err.is_eof() => break,
+                        Err(err) => yield Err(err.into()),
+                    }
                 }
             }
         };

266-266: [DUPLICATE] Minor: Add backticks around MongoDB to fix clippy doc-markdown error.

This has been flagged in previous reviews and is causing a pipeline failure.

-    /// Asynchronously fetches results of a completed search job from MongoDB.
+    /// Asynchronously fetches results of a completed search job from `MongoDB`.
📜 Review details

Configuration used: CodeRabbit UI

Review profile: ASSERTIVE

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 0ae4bf6 and 4025869.

📒 Files selected for processing (1)
  • components/api-server/src/client.rs (4 hunks)
🧰 Additional context used
🧠 Learnings (2)
📚 Learning: 2025-10-22T21:01:31.391Z
Learnt from: junhaoliao
Repo: y-scope/clp PR: 0
File: :0-0
Timestamp: 2025-10-22T21:01:31.391Z
Learning: In y-scope/clp, for the clp-rust-checks workflow (GitHub Actions) and the Taskfile target deps:lock:check-rust, we should verify Rust Cargo.lock is in sync with Cargo.toml using a non-mutating method (e.g., cargo metadata --locked / cargo check --locked) to keep CI deterministic and avoid updating dependencies during validation.

Applied to files:

  • components/api-server/src/client.rs
📚 Learning: 2024-10-08T15:52:50.753Z
Learnt from: AVMatthews
Repo: y-scope/clp PR: 543
File: components/core/src/clp_s/clp-s.cpp:196-265
Timestamp: 2024-10-08T15:52:50.753Z
Learning: In `clp-s.cpp`, the `run_serializer` function interleaves serialization and writing of IR files, making it difficult to restructure it into separate functions.

Applied to files:

  • components/api-server/src/client.rs
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (4)
  • GitHub Check: package-image
  • GitHub Check: lint-check (macos-15)
  • GitHub Check: rust-checks
  • GitHub Check: lint-check (ubuntu-24.04)
🔇 Additional comments (5)
components/api-server/src/client.rs (5)

19-36: LGTM: QueryConfig enhanced with validation and write_to_file field.

The addition of #[serde(deny_unknown_fields)] provides better validation by catching typos or deprecated fields in query configs. The write_to_file field properly integrates with the dual-backend streaming mechanism.


129-182: LGTM: fetch_results properly routes to File or Mongo backends.

The refactored method signature and routing logic cleanly handle the dual-backend streaming. The early return pattern for the File variant improves readability and addresses previous feedback about nested if-else returns.


315-353: LGTM: SearchResultStream enum properly abstracts the dual backends.

The pin_project! macro provides safe pinning projection, addressing previous concerns about unsafe code. The enum design cleanly unifies File and Mongo streams while maintaining type safety and the Send requirement for SSE. Error logging in the Stream implementation provides good observability.


355-377: LGTM: EventTuple and Event types improve readability.

The type alias and struct with From implementation provide a good balance between the compact tuple serialization format used by clp-s and code readability. The named fields in Event make it clear what each tuple position represents, addressing previous concerns about positional access.


247-249: Code path construction is correct; no defects found.

The directory field is a String sourced from the YAML config, with documented example showing relative path format ("var/data/streams"). The Rust Path::join() method correctly handles this: Path::new("/").join("var/data/streams") produces /var/data/streams. The pattern is sound and will work correctly with container volume mounts when the config directory value matches the mounted path.

Comment on lines +268 to +289
/// # Returns
///
/// A stream of the job's results on success. Each item in the stream is a [`Result`] that:
///
/// ## Returns
///
/// A parsed JSON value representing a search result on success.
///
/// ## Errors
///
/// Returns an error if:
///
/// * [`ClientError::MalformedData`] if a retrieved document does not contain a "message" field,
/// or if the "message" field is not a BSON string.
/// * Forwards [`mongodb::error::Error`] produced by the `MongoDB` cursor item access.
/// * Forwards [`serde_json::from_str`]'s return values on failure.
///
/// # Errors
///
/// Returns an error if:
///
/// * Forwards [`mongodb::Collection::find`]'s return values on failure.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Fix duplicate "# Errors" sections in docstring.

The docstring has two "# Errors" sections (lines 276 and 285), which is incorrect rustdoc formatting. The second one at line 285 should be removed since the error conditions are already documented in the first "## Errors" section under "Returns".

Apply this diff:

     /// ## Errors
     ///
     /// Returns an error if:
     ///
     /// * [`ClientError::MalformedData`] if a retrieved document does not contain a "message" field,
     ///   or if the "message" field is not a BSON string.
     /// * Forwards [`mongodb::error::Error`] produced by the `MongoDB` cursor item access.
     /// * Forwards [`serde_json::from_str`]'s return values on failure.
-    ///
-    /// # Errors
-    ///
-    /// Returns an error if:
-    ///
-    /// * Forwards [`mongodb::Collection::find`]'s return values on failure.
     async fn fetch_results_from_mongo(

Note: Consolidate all error conditions under a single "## Errors" section (nested under "# Returns") if they apply to the stream items, or use "# Errors" at the top level for errors returned by the async function itself.

🤖 Prompt for AI Agents
In components/api-server/src/client.rs around lines 268 to 289, remove the
duplicate top-level "# Errors" block (the second one starting at ~line 285) and
consolidate its error bullet (forwards mongodb::Collection::find's return
values) into the existing "## Errors" subsection under "# Returns" if that error
applies to stream items; otherwise move that single error to a single top-level
"# Errors" section. Ensure only one Errors section exists and that all error
conditions are documented in the appropriate location.

@hoophalab hoophalab merged commit ca3947c into y-scope:main Nov 25, 2025
21 checks passed
@hoophalab hoophalab deleted the fsjob branch December 2, 2025 18:22
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants