feat(api-server): Add support for streaming query results from files.#1636
Conversation
WalkthroughAdds stream-result delivery: new async/stream dependencies, API client returns a pinned enum stream selecting filesystem or Mongo backends based on job config, new stream-related config types and defaults, an rmp decode IsMalformedData impl, and Docker volume mounts for stream directories. Changes
Sequence Diagram(s)sequenceDiagram
participant Caller
participant APIClient as Client
participant DB as JobConfig_DB
participant FS as FileSystem
participant Mongo as MongoDB
Caller->>APIClient: fetch_results(search_job_id)
APIClient->>DB: get_job_config(search_job_id)
DB-->>APIClient: SearchJobConfig
alt write_to_file == true
APIClient->>FS: open/read stream files
FS-->>APIClient: stream of serialized records
APIClient->>APIClient: deserialize/map -> Result<String, ClientError>
APIClient-->>Caller: SearchResultStream::File (stream)
else
APIClient->>Mongo: open cursor / stream docs
Mongo-->>APIClient: cursor stream of docs
APIClient->>APIClient: map docs -> Result<String, ClientError>
APIClient-->>Caller: SearchResultStream::Mongo (stream)
end
Note right of APIClient `#DDEBF7`: SearchResultStream is a pinned enum delegating poll_next and logging errors
Estimated code review effort🎯 4 (Complex) | ⏱️ ~45 minutes
Pre-merge checks and finishing touches✅ Passed checks (3 passed)
✨ Finishing touches
🧪 Generate unit tests (beta)
📜 Recent review detailsConfiguration used: CodeRabbit UI Review profile: ASSERTIVE Plan: Pro 📒 Files selected for processing (1)
🧰 Additional context used🧠 Learnings (2)📚 Learning: 2025-10-22T21:01:31.391ZApplied to files:
📚 Learning: 2024-10-08T15:52:50.753ZApplied to files:
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (4)
🔇 Additional comments (3)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Actionable comments posted: 4
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (3)
components/api-server/src/error.rs (1)
25-37: Remove blanketIsMalformedDataimpl forrmp_serde::decode::Error— IO failures are being misclassifiedThe blanket implementation treats all
rmp_serde::decode::Errorvariants as malformed data, but the error enum explicitly distinguishes IO errors (InvalidMarkerRead, InvalidDataRead) from format/semantic problems (TypeMismatch, Syntax, Utf8Error, LengthMismatch, OutOfRange, Uncategorized, DepthLimitExceeded). Idiomatic Rust practice is to handle these separately—IO failures should not surface asClientError::MalformedData, as this masks real transport/connection problems.Either:
- Remove the blanket impl and match on error variants to classify only format/semantic errors as malformed data, or
- Use
from_slice(which only produces deserialization/format errors) instead offrom_readto guarantee malformed-data classification.components/api-server/src/client.rs (1)
152-195: Fixstream!→try_stream!and replacetodo!()with structured errorTwo critical issues block compilation and safe production deployment:
stream!macro does not support?operatorThe
?operator onread_dir,next_entry, andFile::openinside thestream!block cannot work as intended. Thestream!macro produces a generator that yields items; the?operator would attempt to propagate errors out of the closure, not into the stream. This fails to compile. Usetry_stream!instead, which supports?and yields bareStringvalues while automatically converting errors through existingFromimpls.Suggested change:
- use async_stream::stream;
- use async_stream::try_stream;
…
let stream = stream! {
let stream = try_stream! { let dir_path = dir_path; let mut entries = tokio::fs::read_dir(dir_path).await?; while let Some(entry) = entries.next_entry().await? { let file_path = entry.path(); let reader = std::fs::File::open(file_path)?; let mut deserializer = rmp_serde::Deserializer::new(reader); while let Ok(e) = Deserialize::deserialize(&mut deserializer) { let e: (i64, String, String, String, i64) = e;
yield Ok(e.1);
yield e.1; } } };
todo!()creates a production panic pathIf the api-server is configured with S3 storage and
write_to_file=true, thetodo!()at line 176 will crash the process. Replace it with a structured error using the existingClientError::Iovariant:
let StreamOutputStorage::Fs { directory } = &self.config.stream_output.storage else {todo!("Outputting query results to S3 is not supported for now.");};
let StreamOutputStorage::Fs { directory } = &self.config.stream_output.storage else {return Err(ClientError::Io(std::io::Error::new(std::io::ErrorKind::Other,"S3 stream output is not yet supported; please configure fs storage",)));};components/clp-rust-utils/src/clp_config/package/config.rs (1)
18-31: Fix incomplete documentation comment for StreamOutput structThe doc comment at lines 153–158 in
components/clp-rust-utils/src/clp_config/package/config.rscontains a sentence fragment. Line 156 reads/// deserialization.without context, breaking the documentation clarity.Verification confirms the default values are correctly synchronized with Python: both use
"var/data/streams"for the Fs storage variant, and docker-compose mounts align with this path. However, the doc comment should be completed—likely intended to say something like "* This type is partially defined: unused fields are omitted and discarded through deserialization."
📜 Review details
Configuration used: CodeRabbit UI
Review profile: ASSERTIVE
Plan: Pro
⛔ Files ignored due to path filters (1)
Cargo.lockis excluded by!**/*.lock
📒 Files selected for processing (5)
components/api-server/Cargo.toml(2 hunks)components/api-server/src/client.rs(3 hunks)components/api-server/src/error.rs(1 hunks)components/clp-rust-utils/src/clp_config/package/config.rs(3 hunks)tools/deployment/package/docker-compose-all.yaml(1 hunks)
🧰 Additional context used
🧠 Learnings (3)
📚 Learning: 2025-10-22T21:02:31.113Z
Learnt from: junhaoliao
Repo: y-scope/clp PR: 0
File: :0-0
Timestamp: 2025-10-22T21:02:31.113Z
Learning: Repository y-scope/clp: Maintain deterministic CI/builds for Rust; add a check to verify Cargo.lock is in sync with Cargo.toml without updating dependencies (non-mutating verification in clp-rust-checks workflow).
Applied to files:
components/api-server/Cargo.toml
📚 Learning: 2025-11-03T16:17:40.223Z
Learnt from: hoophalab
Repo: y-scope/clp PR: 1535
File: components/clp-rust-utils/src/clp_config/package/config.rs:47-61
Timestamp: 2025-11-03T16:17:40.223Z
Learning: In the y-scope/clp repository, the `ApiServer` struct in `components/clp-rust-utils/src/clp_config/package/config.rs` is a Rust-native configuration type and does not mirror any Python code, unlike other structs in the same file (Config, Database, ResultsCache, Package) which are mirrors of Python definitions.
Applied to files:
components/clp-rust-utils/src/clp_config/package/config.rs
📚 Learning: 2025-11-10T05:19:56.600Z
Learnt from: junhaoliao
Repo: y-scope/clp PR: 1575
File: components/clp-py-utils/clp_py_utils/clp_config.py:602-607
Timestamp: 2025-11-10T05:19:56.600Z
Learning: In the y-scope/clp repository, the `ApiServer` class in `components/clp-py-utils/clp_py_utils/clp_config.py` does not need a `transform_for_container()` method because no other containerized service depends on the API server - it's only accessed from the host, so no docker-network communication is expected.
Applied to files:
tools/deployment/package/docker-compose-all.yaml
🧬 Code graph analysis (2)
components/clp-rust-utils/src/clp_config/package/config.rs (1)
components/clp-py-utils/clp_py_utils/clp_config.py (1)
StreamOutput(553-561)
components/api-server/src/client.rs (1)
components/job-orchestration/job_orchestration/scheduler/job_config.py (1)
SearchJobConfig(85-104)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (4)
- GitHub Check: package-image
- GitHub Check: rust-checks
- GitHub Check: lint-check (macos-15)
- GitHub Check: lint-check (ubuntu-24.04)
🔇 Additional comments (4)
tools/deployment/package/docker-compose-all.yaml (1)
507-508: Streaming volume wiring for api-server looks consistentMounting
CLP_STAGED_STREAM_OUTPUT_DIR_HOSTandCLP_STREAM_OUTPUT_DIR_HOSTinto the api-server matches how other services (query-worker / webui / garbage-collector) see the same paths, so the filesystem-based result streaming should have access to the same directories. Just ensure the host dirs exist with the right ownership in packaged envs.components/api-server/src/client.rs (2)
18-49: write_to_file is correctly plumbed into SearchJobConfigThe addition of
write_to_filetoQueryConfigand theFrom<QueryConfig> for SearchJobConfigmapping looks clean and keeps the Rust job config aligned with the PythonSearchJobConfigfield. This should make the DB-storedjob_configsufficient to decide between Mongo- and FS-backed result streaming.
241-250: Original review comment is incorrect; the schema concern is invertedThe review suggests Python changes must remain compatible with Rust's deserialization, but verification shows the opposite is true. Python's
SearchJobConfigcontains fewer fields (6) than Rust's struct (11), not more. When Python serializes to MsgPack, Rust deserializes safely via the#[serde(default)]attribute, which fills missing fields (likewrite_to_file,ignore_case,path_filter) with defaults. Theget_job_configcall is read-only and does not write back to the database, so no round-trip risk exists. The design already handles Python having a subset of Rust fields without issue.Likely an incorrect or invalid review comment.
components/api-server/Cargo.toml (1)
16-30: I'll verify the crate versions and their compatibility to ensure there are no known issues:Cargo.lock is already committed; dependencies are compatible.
The async-stream 0.3.6 minimum supported Rust version is 1.65, and Tokio's current MSRV is 1.71. Since tokio-stream 0.1.17 is part of the Tokio ecosystem, it follows the same MSRV policy. The dependencies are compatible with no known issues.
The git status output confirms that Cargo.lock is already committed and in sync with Cargo.toml (working tree clean), so both concerns from the review have been addressed.
| todo!("Outputting query results to S3 is not supported for now."); | ||
| }; | ||
| let mongodb::bson::Bson::String(message) = msg else { | ||
| return Err(ClientError::MalformedData); | ||
| let dir_path = std::path::Path::new("/") | ||
| .join(directory) | ||
| .join(search_job_id.to_string()); | ||
| let stream = stream! { | ||
| let dir_path = dir_path; | ||
| let mut entries = tokio::fs::read_dir(dir_path).await?; | ||
| while let Some(entry) = entries.next_entry().await? { | ||
| let file_path = entry.path(); | ||
| let reader = std::fs::File::open(file_path)?; | ||
| let mut deserializer = rmp_serde::Deserializer::new(reader); | ||
| while let Ok(e) = Deserialize::deserialize(&mut deserializer) { | ||
| let e: (i64, String, String, String, i64) = e; | ||
| yield Ok(e.1); | ||
| } | ||
| } | ||
| }; |
There was a problem hiding this comment.
🧹 Nitpick | 🔵 Trivial
Consider non-blocking I/O and deterministic ordering for FS results
Within the FS-backed stream:
std::fs::File::openand thermp_serde::Deserializerover a blocking reader run on the Tokio worker thread. For large result files, this can block the reactor. Long term, consider usingtokio::fs::Fileplus an async adapter if performance under load becomes an issue.tokio::fs::read_dirdoes not guarantee directory entry order. If result files are named to encode ordering (e.g., numeric suffixes), you may want to collect and sort the paths before reading to keep the stream order deterministic across platforms.
Neither is a correctness bug for small results, but both may affect behaviour at scale.
🤖 Prompt for AI Agents
In components/api-server/src/client.rs around lines 176-193, the FS-backed
stream uses blocking std::fs::File::open and an rmp_serde Deserializer on the
Tokio worker thread and iterates tokio::fs::read_dir without a defined order;
change it to use non-blocking IO and deterministic ordering by collecting all
directory entries into a Vec<PathBuf> (await all entries from
tokio::fs::read_dir), sort that Vec by filename (or numeric suffix) to enforce
order, then for each path open the file asynchronously with
tokio::fs::File::open and convert it to an async reader (e.g.,
tokio_util::io::ReaderStream or tokio::io::AsyncReadExt adapters) or, if
rmp_serde requires a blocking reader, offload decoding to
tokio::task::spawn_blocking so deserialization does not block the reactor;
iterate the sorted paths and yield results from the async-safe reader/decoding
path.
There was a problem hiding this comment.
rmp_serde::Deserializer::new requires a blocking reader. We yield on each message. This shouldn't block the thread too long.
There was a problem hiding this comment.
@hoophalab, understood—if rmp_serde requires a blocking reader and you're yielding between messages, that's a reasonable tradeoff. As long as individual messages stay small, the reactor should get control back frequently enough. Thanks for clarifying!
There was a problem hiding this comment.
Actionable comments posted: 2
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
components/api-server/src/client.rs (1)
127-151: Update doc comment to reflect dual streaming paths.The doc comment still mentions "parsed JSON value" but the function now returns
Stringmessages from either filesystem or MongoDB. Additionally, filesystem-specific errors (I/O errors, deserialization errors) are not documented.Apply this diff to update the documentation:
/// Asynchronously fetches the results of a completed search job. /// /// # Returns /// - /// A stream of the job's results on success. Each item in the stream is a [`Result`] that: + /// A stream of query result messages. Each item in the stream is a [`Result`] that: /// /// ## Returns /// - /// A parsed JSON value representing a search result on success. + /// A String message representing a search result on success. /// /// ## Errors /// /// Returns an error if: /// - /// * [`ClientError::MalformedData`] if a retrieved document does not contain a "message" field, - /// or if the "message" field is not a BSON string. - /// * Forwards [`mongodb::error::Error`] produced by the `MongoDB` cursor item access. - /// * Forwards [`serde_json::from_str`]'s return values on failure. + /// * For filesystem-backed results: I/O errors when reading result files or deserialization + /// errors when decoding MessagePack records. + /// * For MongoDB-backed results: [`ClientError::MalformedData`] if a document is missing + /// the "message" field or it's not a BSON string, or [`mongodb::error::Error`] from cursor access. /// /// # Errors /// /// Returns an error if: /// /// * Forwards [`Client::get_status`]'s return values on failure. - /// * Forwards [`mongodb::Collection::find`]'s return values on failure. + /// * Forwards [`Client::get_job_config`]'s return values on failure.
♻️ Duplicate comments (1)
components/api-server/src/client.rs (1)
183-192: File ordering is non-deterministic.
tokio::fs::read_dirdoes not guarantee directory entry order. If query results are split across multiple files and order matters for result correctness or user experience, results may be returned in unpredictable order across different platforms or filesystems.This was flagged in past review comments. If ordering matters, apply this approach:
let stream = stream! { let dir_path = dir_path; - let mut entries = tokio::fs::read_dir(dir_path).await?; - while let Some(entry) = entries.next_entry().await? { - let file_path = entry.path(); + let mut entries = tokio::fs::read_dir(&dir_path).await?; + let mut paths = Vec::new(); + while let Some(entry) = entries.next_entry().await? { + paths.push(entry.path()); + } + // Sort by filename to ensure deterministic order + paths.sort(); + + for file_path in paths { let reader = std::fs::File::open(file_path)?; let mut deserializer = rmp_serde::Deserializer::new(reader); while let Ok(e) = Deserialize::deserialize(&mut deserializer) { let e: (i64, String, String, String, i64) = e; yield Ok(e.1); } } };
📜 Review details
Configuration used: CodeRabbit UI
Review profile: ASSERTIVE
Plan: Pro
📒 Files selected for processing (1)
components/api-server/src/client.rs(3 hunks)
🧰 Additional context used
🧠 Learnings (1)
📚 Learning: 2025-10-22T21:01:31.391Z
Learnt from: junhaoliao
Repo: y-scope/clp PR: 0
File: :0-0
Timestamp: 2025-10-22T21:01:31.391Z
Learning: In y-scope/clp, for the clp-rust-checks workflow (GitHub Actions) and the Taskfile target deps:lock:check-rust, we should verify Rust Cargo.lock is in sync with Cargo.toml using a non-mutating method (e.g., cargo metadata --locked / cargo check --locked) to keep CI deterministic and avoid updating dependencies during validation.
Applied to files:
components/api-server/src/client.rs
🧬 Code graph analysis (1)
components/api-server/src/client.rs (1)
components/job-orchestration/job_orchestration/scheduler/job_config.py (1)
SearchJobConfig(85-104)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (4)
- GitHub Check: package-image
- GitHub Check: lint-check (ubuntu-24.04)
- GitHub Check: lint-check (macos-15)
- GitHub Check: rust-checks
🔇 Additional comments (2)
components/api-server/src/client.rs (2)
241-250: Erreur conversion is correctly implemented—no issues found.The verification confirms that
rmp_serde::decode::Errorproperly converts toClientErrorthrough the blanketimpl<T: IsMalformedData> From<T> for ClientErrorin error.rs (lines 31–33). Sincermp_serde::decode::ErrorimplementsIsMalformedData(line 29), the?operator inget_job_configat line 249 will correctly propagate deserialization errors asClientError::MalformedData.
178-180: Verify path construction with absolute directory values.The path construction
Path::new("/").join(directory).join(search_job_id.to_string())assumesdirectoryis relative. Ifdirectoryis absolute (starts with/),Path::joinwill replace the base path rather than appending. The default config uses"var/data/streams"(relative), which produces/var/data/streams/{job_id}as expected. However, if users configure an absolute path like/custom/streams, the leading/will be ignored and replaced.Run the following script to verify how directory paths are configured and whether absolute paths are supported:
There was a problem hiding this comment.
Actionable comments posted: 1
♻️ Duplicate comments (3)
components/api-server/src/client.rs (3)
127-151: Update doc comment to reflect actual return type.As noted in previous review comments, the doc comment states the stream returns "parsed JSON values", but both streaming paths (filesystem and MongoDB) actually return
Stringmessages.Update the doc comment to accurately describe both streaming paths:
/// Asynchronously fetches the results of a completed search job. /// /// # Returns /// -/// A stream of the job's results on success. Each item in the stream is a [`Result`] that: +/// A stream of String messages representing the job's results on success. +/// Results are streamed from either the filesystem (when `write_to_file` is true) +/// or MongoDB (when `write_to_file` is false). Each item in the stream is a [`Result`] that: /// /// ## Returns /// -/// A parsed JSON value representing a search result on success. +/// A String message representing a search result on success.
188-191: Hardcoded tuple deserialization remains unaddressed.This concern was previously raised but not yet addressed. The hardcoded tuple
(i64, String, String, String, i64)is fragile and not self-documenting. If the serialisation format changes, this code will fail silently or produce incorrect results.As suggested in the previous review, define a named struct:
#[derive(Deserialize)] struct QueryResultRecord { timestamp: i64, message: String, file_path: String, orig_file_id: String, line_num: i64, }Then update the deserialization:
- while let Ok(e) = Deserialize::deserialize(&mut deserializer) { - let e: (i64, String, String, String, i64) = e; - yield Ok(e.1); + while let Ok(record) = QueryResultRecord::deserialize(&mut deserializer) { + yield Ok(record.message);
269-271: Use safe projection instead ofmap_unchecked_mut.This was previously flagged with a safe alternative, but the
unsafecode remains unaddressed. While the safety reasoning is sound, eliminatingunsafeimproves code maintainability and auditability.Apply the safe alternative suggested in the previous review:
fn poll_next( mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>, ) -> std::task::Poll<Option<Self::Item>> { - // SAFETY: The async runtime guarantees that `poll_next` is called exclusively. We only poll - // `self.0` and never move it out. - let inner = unsafe { self.map_unchecked_mut(|s| &mut s.0) }; - - let poll = inner.poll_next(cx); + let poll = self.0.as_mut().poll_next(cx); if let std::task::Poll::Ready(Some(Err(err))) = &poll { tracing::error!("An error occurred when streaming results: {}", err); }
📜 Review details
Configuration used: CodeRabbit UI
Review profile: ASSERTIVE
Plan: Pro
📒 Files selected for processing (1)
components/api-server/src/client.rs(3 hunks)
🧰 Additional context used
🧠 Learnings (1)
📚 Learning: 2025-10-22T21:01:31.391Z
Learnt from: junhaoliao
Repo: y-scope/clp PR: 0
File: :0-0
Timestamp: 2025-10-22T21:01:31.391Z
Learning: In y-scope/clp, for the clp-rust-checks workflow (GitHub Actions) and the Taskfile target deps:lock:check-rust, we should verify Rust Cargo.lock is in sync with Cargo.toml using a non-mutating method (e.g., cargo metadata --locked / cargo check --locked) to keep CI deterministic and avoid updating dependencies during validation.
Applied to files:
components/api-server/src/client.rs
🧬 Code graph analysis (1)
components/api-server/src/client.rs (1)
components/job-orchestration/job_orchestration/scheduler/job_config.py (1)
SearchJobConfig(85-104)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (4)
- GitHub Check: package-image
- GitHub Check: lint-check (macos-15)
- GitHub Check: lint-check (ubuntu-24.04)
- GitHub Check: rust-checks
🔇 Additional comments (5)
components/api-server/src/client.rs (5)
1-16: Imports look correct.The new imports for
Pin,async_stream, andStreamOutputStorageare appropriate for the filesystem-based streaming functionality.
195-214: MongoDB streaming path implementation is correct.The error handling for missing or malformed
"message"fields is appropriate, and the use ofResultsStream::newis consistent with the filesystem path.
241-251:get_job_configimplementation looks correct.The SQL query is properly parameterised, and the MessagePack deserialisation is appropriate for retrieving the job configuration.
181-193: Les conversions d'erreur pourClientErrorsont correctement implémentées.Vérification complète :
std::io::Errorest convertie via l'attribut#[from]sur la varianteIo(ligne 15 de error.rs)rmp_serde::decode::Errorest convertie via l'implémentation génériqueimpl<T: IsMalformedData> From<T> for ClientError(lignes 30-33), qui mappe versClientError::MalformedDataLes opérations
tokio::fs::read_dir(),entries.next_entry()etstd::fs::File::open()propagent toutesstd::io::Error, qui est correctement gérée. La désérialisation utilisewhile let Ok(e)plutôt que l'opérateur?, donc les erreurs de décodage sont implicitement gérées par l'implémentation générique.
178-180: Path construction relies on undocumented assumption about directory being relative.The code joins
directorywith"/"to create an absolute path, but the Rust config loading (unlike Python) does not normalize paths during deserialization. This means:
- Configuration defaults show relative paths:
"var/data/streams"- The code assumes
directoryremains relative and forces it absolute by prepending"/"- If the configuration is later changed or normalized elsewhere, this logic breaks
Verify that:
directorywill always be a relative path when passed to this code- This is the intended way to construct the path (not use it as-is if already absolute)
- Consider aligning with Python's path normalization approach during config loading instead of at usage time
| let job_config = self.get_job_config(search_job_id).await?; | ||
| if job_config.write_to_file { | ||
| let StreamOutputStorage::Fs { directory } = &self.config.stream_output.storage else { | ||
| todo!("Outputting query results to S3 is not supported for now."); |
There was a problem hiding this comment.
🧩 Analysis chain
todo!() macro will panic if S3 storage is configured.
This is a production blocker. If StreamOutputStorage::S3 is configured, the code will panic at runtime rather than returning a proper error.
Replace the todo!() with a proper error return:
- let StreamOutputStorage::Fs { directory } = &self.config.stream_output.storage else {
- todo!("Outputting query results to S3 is not supported for now.");
+ let StreamOutputStorage::Fs { directory } = &self.config.stream_output.storage else {
+ return Err(ClientError::UnsupportedOperation(
+ "S3-based query result streaming is not yet supported".to_string()
+ ));
};Note: This assumes ClientError has an UnsupportedOperation variant or similar. If not, use an appropriate existing variant or add one.
Run this script to check if ClientError has an appropriate variant for unsupported operations:
🏁 Script executed:
#!/bin/bash
# Check ClientError enum variants
ast-grep --pattern 'pub enum ClientError {
$$$
}'Length of output: 965
I need to inspect the full code context around the todo!() to verify all concerns and determine the appropriate error handling approach.
🏁 Script executed:
#!/bin/bash
# Inspect lines 170-200 of client.rs to see full context
sed -n '170,200p' components/api-server/src/client.rs | cat -nLength of output: 1670
🏁 Script executed:
#!/bin/bash
# Check if ClientError is used in result returns in client.rs
rg -n 'Result.*ClientError' components/api-server/src/client.rs -A2 -B2 | head -40Length of output: 2008
🏁 Script executed:
#!/bin/bash
# Search for other error handling patterns in the file
rg -n 'Err\(' components/api-server/src/client.rs | head -20Length of output: 263
🏁 Script executed:
#!/bin/bash
# Check if there's a pattern of adding new error variants when needed
git log --oneline components/api-server/src/error.rs | head -10Length of output: 129
🏁 Script executed:
#!/bin/bash
# Verify if error.rs is the canonical location for error types
fd error.rs --type fLength of output: 73
🏁 Script executed:
#!/bin/bash
# Quick sanity check: does UnsupportedOperation exist anywhere in codebase?
rg 'UnsupportedOperation' --type rustLength of output: 0
🏁 Script executed:
#!/bin/bash
# Check if there's a generic error construction pattern (String-based, etc.)
rg 'ClientError::' components/api-server/src/ -n | grep -v test | head -15Length of output: 444
Add error handling for unsupported S3 streaming and define appropriate ClientError variant.
The todo!() macro will panic at runtime if StreamOutputStorage::S3 is configured—this is a production blocker. The original suggestion references a non-existent ClientError::UnsupportedOperation variant. You must either add this variant to the error enum, or use an alternative approach.
Recommended fix: Add a new variant to ClientError in components/api-server/src/error.rs:
pub enum ClientError {
#[error("`sqlx::Error`: {0}")]
Sql(#[from] sqlx::Error),
#[error("`mongodb::error::Error`: {0}")]
Mongo(#[from] mongodb::error::Error),
#[error("Query is not succeeded")]
QueryNotSucceeded,
#[error("`std::io::Error`: {0}")]
Io(#[from] std::io::Error),
#[error("Malformed data")]
MalformedData,
+
+ #[error("{0}")]
+ UnsupportedOperation(String),
}Then in components/api-server/src/client.rs (line 176), replace:
- todo!("Outputting query results to S3 is not supported for now.");
+ return Err(ClientError::UnsupportedOperation(
+ "S3-based query result streaming is not yet supported".to_string()
+ ));Committable suggestion skipped: line range outside the PR's diff.
🤖 Prompt for AI Agents
In components/api-server/src/client.rs around line 176 the code uses todo!()
when StreamOutputStorage::S3 is selected which will panic; add a new ClientError
variant (e.g., UnsupportedOutputStorage(String) or UnsupportedOperation { op:
String, detail: String }) to components/api-server/src/error.rs, implement
Display/From as needed, and then replace the todo!() at line 176 with an early
return Err(ClientError::UnsupportedOutputStorage("streaming query results to
S3".into())) (or the equivalent variant you added) so the unsupported path
returns a proper error instead of panicking.
| while let Ok(event) = Deserialize::deserialize(&mut deserializer) { | ||
| let event: (i64, String, String, String, i64) = event; | ||
| let message = event.1; | ||
| yield Ok(message); | ||
| } |
There was a problem hiding this comment.
Tuple is indeed hard to read, but clp-s serializes the message in a tuple to save the bandwidth of transferring key names. The format is inherited from NetworkAddress output's format. We probably cannot do more here unless there is an automatic way to serialize a tuple into a struct.
I did something here. Does this read better?
This comment was marked as resolved.
This comment was marked as resolved.
Sorry, something went wrong.
| if job_config.write_to_file { | ||
| Ok(ResultsStream::File { | ||
| inner: self.fetch_results_from_file(search_job_id), | ||
| }) | ||
| } else { | ||
| self.fetch_results_from_mongo(search_job_id) | ||
| .await | ||
| .map(|s| ResultsStream::Mongo { inner: s }) | ||
| } |
There was a problem hiding this comment.
How about this? Or we should still put everything in one function.
There was a problem hiding this comment.
Yeah I think this is clear. I added docstrings for both methods. Splitting them makes it easier to document tbh
There was a problem hiding this comment.
Actionable comments posted: 1
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
components/api-server/src/client.rs (1)
34-47: write_to_file wiring looks consistent; confirm job-config compatibility.The new
write_to_filefield onQueryConfigand its propagation intoSearchJobConfigviaFrom<QueryConfig>look coherent and default-safe for new requests (serde default →false). Please double-check thatSearchJobConfig’s deserialisation forjob_configrows is backward compatible (e.g.,#[serde(default)]on any new fields) so existing jobs in the DB can still be decoded without errors whenget_job_configis called.
♻️ Duplicate comments (2)
components/api-server/src/client.rs (2)
128-162: fetch_results return type and docs are out of sync with actual behaviour.The new return type
Result<SearchResultStream<impl Stream<Item = Result<String, ClientError>>, _>, ClientError>makes the API return a stream ofStringmessages (from Mongo or FS), but the doc comment still talks about “A parsed JSON value representing a search result on success” and mentionsserde_json::from_str, which is no longer used here. Please update the docblock to describe that this returns aSearchResultStreamof messageStrings and adjust the error bullets to match the current implementation.
192-216: Replacetodo!()for S3 with a proper error and consider FS streaming edge cases.
- The
todo!()whenStreamOutputStorage::S3is selected will still panic in production if that configuration is ever enabled. This should instead return aClientErrorvariant (e.g.,UnsupportedOperation/UnsupportedOutputStorage) so callers get a structured failure rather than a crash.- The FS streaming loop correctly deserialises the tuple and yields
message, buttokio::fs::read_dirdoes not guarantee ordering of entries. If file naming encodes ordering, you may want to collect and sort the paths before reading to keep result order deterministic across platforms.- The
while let Ok(event) = Deserialize::deserialize(&mut deserializer)pattern will treat any decode error as end-of-stream. If corruption of a result file is possible, consider distinguishing expected EOF from malformed data (e.g., via yourIsMalformedDatahelper) so truly malformed files surface asClientError::MalformedDatainstead of silently truncating the stream.The first point is a correctness / robustness blocker; the latter two are more about predictable behaviour and diagnosability.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: ASSERTIVE
Plan: Pro
📒 Files selected for processing (1)
components/api-server/src/client.rs(4 hunks)
🧰 Additional context used
🧠 Learnings (2)
📚 Learning: 2025-10-22T21:01:31.391Z
Learnt from: junhaoliao
Repo: y-scope/clp PR: 0
File: :0-0
Timestamp: 2025-10-22T21:01:31.391Z
Learning: In y-scope/clp, for the clp-rust-checks workflow (GitHub Actions) and the Taskfile target deps:lock:check-rust, we should verify Rust Cargo.lock is in sync with Cargo.toml using a non-mutating method (e.g., cargo metadata --locked / cargo check --locked) to keep CI deterministic and avoid updating dependencies during validation.
Applied to files:
components/api-server/src/client.rs
📚 Learning: 2024-10-08T15:52:50.753Z
Learnt from: AVMatthews
Repo: y-scope/clp PR: 543
File: components/core/src/clp_s/clp-s.cpp:196-265
Timestamp: 2024-10-08T15:52:50.753Z
Learning: In `clp-s.cpp`, the `run_serializer` function interleaves serialization and writing of IR files, making it difficult to restructure it into separate functions.
Applied to files:
components/api-server/src/client.rs
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (4)
- GitHub Check: package-image
- GitHub Check: rust-checks
- GitHub Check: lint-check (macos-15)
- GitHub Check: lint-check (ubuntu-24.04)
🔇 Additional comments (3)
components/api-server/src/client.rs (3)
180-189: Routing on job_config.write_to_file is clear and keeps fetch_results small.Using
get_job_configto decide between FS and Mongo backends and returning the appropriateSearchResultStreamvariant keepsfetch_resultsnicely focused on status polling and dispatch. This also avoids the earlier in-function branching complexity that was mentioned in prior review comments.
218-240: Mongo-backed streaming helper is clean and matches error semantics.
fetch_results_from_mongonicely encapsulates the Mongo path, and the mapping toResult<String, ClientError>withClientError::MalformedDatafor missing/non-string"message"fields aligns with the documented error semantics. This keeps the mainfetch_resultsbody simpler while preserving the existing behaviour.
266-275: Schema evolution is already protected—struct-level#[serde(default)]is in place.The
SearchJobConfigstruct (components/clp-rust-utils/src/job_config/search.rs) already has#[serde(default)]at the struct level, which provides the schema evolution safety you're requesting. When deserializing legacy rows:
- Missing fields automatically use
Default::default()for their type- The
write_to_file: boolfield will safely deserialize tofalseif absent from older serialized dataOption<_>fields will deserialize toNoneThis design already handles backward compatibility without requiring per-field
#[serde(default)]annotations or converting all fields toOption<_>. The deserialization inget_job_configwill not fail on legacy jobs.Likely an incorrect or invalid review comment.
There was a problem hiding this comment.
Actionable comments posted: 1
♻️ Duplicate comments (2)
components/api-server/src/client.rs (2)
128-162: Updatefetch_resultsdoc-comment to reflect String-based, FS-or-Mongo streamingThe doc-comment still refers to “parsed JSON value” results and Mongo/
serde_json::from_strerrors, butfetch_resultsnow returns aSearchResultStream<…>yieldingResult<String, ClientError>from either Mongo or the filesystem. This mismatch will mislead API consumers.Please update the comment to describe:
- that the stream yields raw log/message
Strings, not parsed JSON; and- that results may come from Mongo or FS, with corresponding IO / decode errors rather than
serde_json.
196-201: Replacetodo!()for S3 storage with a proper error pathIf
stream_output.storageis ever configured asStreamOutputStorage::S3, thistodo!()will panic the API server at runtime, which is not acceptable for a configuration-driven feature.Given that this is an unsupported backend rather than a programmer error, it should return a
ClientErrorinstead of panicking. Becausefetch_results_from_filecurrently cannot return aResult, you likely want to change its signature and propagate the error fromfetch_results:- fn fetch_results_from_file( - &self, - search_job_id: u64, - ) -> impl Stream<Item = Result<String, ClientError>> + use<> { - let StreamOutputStorage::Fs { directory } = &self.config.stream_output.storage else { - todo!("Outputting query results to S3 is not supported for now."); - }; + fn fetch_results_from_file( + &self, + search_job_id: u64, + ) -> Result<impl Stream<Item = Result<String, ClientError>> + use<>, ClientError> { + let StreamOutputStorage::Fs { directory } = &self.config.stream_output.storage else { + // Prefer a dedicated variant like `UnsupportedOperation` or `UnsupportedOutputStorage`. + return Err(ClientError::UnsupportedOperation( + "S3-based query result streaming is not yet supported".to_string(), + )); + }; @@ - let stream = /* existing stream! / try_stream! body */; - stream + let stream = /* existing stream! / try_stream! body */; + Ok(stream) }and in
fetch_results:- if job_config.write_to_file { - Ok(SearchResultStream::File { - inner: self.fetch_results_from_file(search_job_id), - }) - } else { - self.fetch_results_from_mongo(search_job_id) - .await - .map(|s| SearchResultStream::Mongo { inner: s }) - } + if job_config.write_to_file { + Ok(SearchResultStream::File { + inner: self.fetch_results_from_file(search_job_id)?, + }) + } else { + self.fetch_results_from_mongo(search_job_id) + .await + .map(|s| SearchResultStream::Mongo { inner: s }) + }Adjust the concrete
ClientErrorvariant name to match your existing enum.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: ASSERTIVE
Plan: Pro
📒 Files selected for processing (1)
components/api-server/src/client.rs(4 hunks)
🧰 Additional context used
🧠 Learnings (2)
📚 Learning: 2025-10-22T21:01:31.391Z
Learnt from: junhaoliao
Repo: y-scope/clp PR: 0
File: :0-0
Timestamp: 2025-10-22T21:01:31.391Z
Learning: In y-scope/clp, for the clp-rust-checks workflow (GitHub Actions) and the Taskfile target deps:lock:check-rust, we should verify Rust Cargo.lock is in sync with Cargo.toml using a non-mutating method (e.g., cargo metadata --locked / cargo check --locked) to keep CI deterministic and avoid updating dependencies during validation.
Applied to files:
components/api-server/src/client.rs
📚 Learning: 2024-10-08T15:52:50.753Z
Learnt from: AVMatthews
Repo: y-scope/clp PR: 543
File: components/core/src/clp_s/clp-s.cpp:196-265
Timestamp: 2024-10-08T15:52:50.753Z
Learning: In `clp-s.cpp`, the `run_serializer` function interleaves serialization and writing of IR files, making it difficult to restructure it into separate functions.
Applied to files:
components/api-server/src/client.rs
🧬 Code graph analysis (1)
components/api-server/src/client.rs (2)
components/job-orchestration/job_orchestration/scheduler/job_config.py (1)
SearchJobConfig(85-104)components/api-server/src/bin/api_server.rs (1)
query(120-127)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (4)
- GitHub Check: package-image
- GitHub Check: rust-checks
- GitHub Check: lint-check (ubuntu-24.04)
- GitHub Check: lint-check (macos-15)
🔇 Additional comments (4)
components/api-server/src/client.rs (4)
21-47: Plumbing ofwrite_to_filefrom API toSearchJobConfiglooks correct
QueryConfig.write_to_fileis serialised from the HTTP request, defaulted via#[serde(default)], and passed throughFrom<QueryConfig> for SearchJobConfig. That matches the Python-sideSearchJobConfig.write_to_filefield, so the job-orchestration layer should see the intended flag.
180-189: Branching onwrite_to_fileafter job completion is a clean separationFetching the stored
SearchJobConfigand then routing to either FS-backed or Mongo-backed streaming based onwrite_to_filekeepsfetch_resultshigh-level and reuses the same polling logic for both backends. This matches the PR’s objective and the PythonSearchJobConfigsemantics.
278-309:SearchResultStreamenum +Streamimpl cleanly encapsulate FS/Mongo backendsThe
SearchResultStreamenum withpin_project!and a manualStreamimpl that delegatespoll_nextto the projected inner variant is a nice way to unify the FS and Mongo result streams without unsafe code or extra locking. Error logging inpoll_nextis also appropriate and keeps concerns local to this adapter type.
266-275: Verification complete—error handling is correct and consistent.All concerns have been confirmed as properly addressed:
SearchJobConfigderivesDeserialize(line 11 ofcomponents/clp-rust-utils/src/job_config/search.rs)ClientErrorimplements proper conversion via theIsMalformedDatatrait pattern:rmp_serde::decode::ErrorimplementsIsMalformedData, and a blanketFrom<T: IsMalformedData> for ClientErrormaps all such errors toClientError::MalformedData- The
?operator inget_job_configat line 273 correctly propagates decode failures as the intendedMalformedDatavariant- Approach mirrors the serialization method (
to_vec_namedfor storage,from_slicefor deserialization)The error handling is sound and follows the established pattern in the codebase.
| let stream = stream! { | ||
| let mut entries = tokio::fs::read_dir(search_job_output_dir).await?; | ||
| while let Some(entry) = entries.next_entry().await? { | ||
| let search_result_path = entry.path(); | ||
| let reader = std::fs::File::open(search_result_path)?; | ||
| let mut deserializer = rmp_serde::Deserializer::new(reader); | ||
| while let Ok(event) = Deserialize::deserialize(&mut deserializer) { | ||
| let event: (i64, String, String, String, i64) = event; | ||
| let message = event.1; | ||
| yield Ok(message); | ||
| } | ||
| } | ||
| }; |
There was a problem hiding this comment.
🧩 Analysis chain
Use try_stream! instead of stream! when using ? and emitting Result items, and surface decode errors
Two issues in the FS-backed stream:
-
async_stream::stream!+?withResultitemsThe body uses
?on async/IO operations and yieldsOk(message), i.e.Item = Result<String, ClientError>, but the macro isstream!. Per theasync-streamdocs,?is only supported intry_stream!and is specifically designed forItem = Result<T, E>. Withstream!, this will either fail to compile or not propagate errors as intended.Recommended change:
-use async_stream::stream;
+use async_stream::try_stream;
@@
-
let stream = stream! {
-
let stream = try_stream! { let mut entries = tokio::fs::read_dir(search_job_output_dir).await?; while let Some(entry) = entries.next_entry().await? { let search_result_path = entry.path(); let reader = std::fs::File::open(search_result_path)?; let mut deserializer = rmp_serde::Deserializer::new(reader);
-
while let Ok(event) = Deserialize::deserialize(&mut deserializer) {
-
while let Ok(event) = Deserialize::deserialize(&mut deserializer) { let event: (i64, String, String, String, i64) = event; let message = event.1; yield Ok(message); } } };This keeps the public return type the same (`impl Stream<Item = Result<String, ClientError>>`) while ensuring IO errors become `Err(ClientError)` items rather than panics or compile errors.
-
Decode errors are silently treated as end-of-stream
while let Ok(event) = Deserialize::deserialize(&mut deserializer)will break the loop on anyrmp_serde::decode::Error, including genuinely malformed MessagePack, and simply stop yielding items. That can truncate results without signalling an error.Since you have a
ClientError::MalformedDataand anIsMalformedDataimplementation forrmp_serde::decode::Error, consider distinguishing EOF from other decode failures, e.g. (pseudocode):loop { match Deserialize::deserialize(&mut deserializer) { Ok(event) => { let event: (i64, String, String, String, i64) = event; let message = event.1; yield Ok(message); } Err(err) if err.is_eof() => break, // or matches EOF-style error Err(err) => break yield Err(err.into()), // surface malformed data } }
This way, truly corrupted files show up as
Err(ClientError::MalformedData)rather than silent truncation.
🌐 Web query:
Check the async-stream crate docs to confirm that `try_stream!` is the macro intended for fallible streams using the `?` operator, and that `stream!` does not support `?` for propagating errors.
💡 Result:
Yes — async-stream provides two macros:
- try_stream! — for fallible streams; it sets Item = Result<T, E> and supports using the ? operator. [1][2]
- stream! — for non-fallible streams; it yields plain values and does not support using ? to propagate errors. [1][2]
Sources: async-stream docs and README. [1] https://docs.rs/async-stream (crate page) [2] https://github.com/tokio-rs/async-stream (README)
Use try_stream! instead of stream! when using ? and emitting Result items, and surface decode errors
Two issues in the FS-backed stream:
-
async_stream::stream!+?withResultitemsThe body uses
?on async/IO operations and yieldsOk(message), i.e.Item = Result<String, ClientError>, but the macro isstream!. Per the async-stream docs,try_stream!is for fallible streams withItem = Result<T, E>and supports the?operator, whilestream!is for non-fallible streams and does not support?to propagate errors. Withstream!, this will either fail to compile or not propagate errors as intended.Recommended change:
-use async_stream::stream;
+use async_stream::try_stream;
@@
-
let stream = stream! {
-
let stream = try_stream! { let mut entries = tokio::fs::read_dir(search_job_output_dir).await?; while let Some(entry) = entries.next_entry().await? { let search_result_path = entry.path(); let reader = std::fs::File::open(search_result_path)?; let mut deserializer = rmp_serde::Deserializer::new(reader);
-
while let Ok(event) = Deserialize::deserialize(&mut deserializer) {
-
while let Ok(event) = Deserialize::deserialize(&mut deserializer) { let event: (i64, String, String, String, i64) = event; let message = event.1; yield Ok(message); } } };This keeps the public return type the same (`impl Stream<Item = Result<String, ClientError>>`) while ensuring IO errors become `Err(ClientError)` items rather than panics or compile errors.
-
Decode errors are silently treated as end-of-stream
while let Ok(event) = Deserialize::deserialize(&mut deserializer)will break the loop on anyrmp_serde::decode::Error, including genuinely malformed MessagePack, and simply stop yielding items. That can truncate results without signalling an error.Since you have a
ClientError::MalformedDataand anIsMalformedDataimplementation forrmp_serde::decode::Error, consider distinguishing EOF from other decode failures, e.g.:loop { match Deserialize::deserialize(&mut deserializer) { Ok(event) => { let event: (i64, String, String, String, i64) = event; let message = event.1; yield Ok(message); } Err(err) if err.is_eof() => break, Err(err) => yield Err(err.into()), } }
This way, truly corrupted files surface as
Err(ClientError::MalformedData)rather than silent truncation.
🤖 Prompt for AI Agents
In components/api-server/src/client.rs around lines 202 to 214, the stream body
currently uses async_stream::stream! with `?` and yields Result items and also
swallows decode errors; change the macro to async_stream::try_stream! so `?`
propagates as Err items, and replace the `while let Ok(event) =
Deserialize::deserialize(&mut deserializer)` loop with a match that
distinguishes EOF from other decode failures (on Ok(event) yield the message, on
Err(err) if err.is_eof() break, else yield Err(err.into())), ensuring IO and
decode errors become Err(ClientError) stream items rather than silently ending
the stream.
LinZhihao-723
left a comment
There was a problem hiding this comment.
The enum implementation is clear now, lgtm.
The comments are mostly about docstrings and naming conventions.
| &self, | ||
| search_job_id: u64, | ||
| ) -> impl Stream<Item = Result<String, ClientError>> + use<> { | ||
| let StreamOutputStorage::Fs { directory } = &self.config.stream_output.storage else { |
There was a problem hiding this comment.
Nit: directory -> stream_storage_dir?
directory seems too general.
| while let Ok(event) = Deserialize::deserialize(&mut deserializer) { | ||
| let event: (i64, String, String, String, i64) = event; | ||
| let message = event.1; | ||
| yield Ok(message); | ||
| } |
This comment was marked as resolved.
This comment was marked as resolved.
Sorry, something went wrong.
Co-authored-by: Lin Zhihao <59785146+LinZhihao-723@users.noreply.github.com>
Co-authored-by: Lin Zhihao <59785146+LinZhihao-723@users.noreply.github.com>
Co-authored-by: Lin Zhihao <59785146+LinZhihao-723@users.noreply.github.com>
Co-authored-by: Lin Zhihao <59785146+LinZhihao-723@users.noreply.github.com>
There was a problem hiding this comment.
Actionable comments posted: 1
♻️ Duplicate comments (5)
components/api-server/src/client.rs (5)
128-162: Updatefetch_resultsdocstring to matchSearchResultStreamand String messages.The docstring still talks about “parsed JSON values” and Mongo-only error paths, but
fetch_resultsnow returns aSearchResultStream<..>ofResult<String, ClientError>that can be backed by either files or Mongo. Please update the docs to describe the two backends (FilevsMongo) and the fact that each item is a log messageString, and move Mongo/FS-specific error details into the respective helper docs.
222-235: Usetry_stream!for fallible FS streams and surface decode errors.This stream body uses
?on fallible ops (read_dir,next_entry,File::open) and yieldsResult<String, ClientError>items, but it is wrapped inasync_stream::stream!. Per async-stream’s design,try_stream!is the macro intended for fallible streams using?andItem = Result<T, E>; usingstream!here is likely a compile-time error or, at best, wrong error propagation. The decode loop also treats anyrmp_serdedecode error as EOF and silently truncates the stream.Consider:
- Switching to
try_stream!so?producesErr(ClientError)items instead of panics/compile failures.- Matching on
Deserialize::deserialize(&mut deserializer)and distinguishing normal EOF from malformed data, using your existingClientError::MalformedData/IsMalformedDataplumbing so corrupted files yield an error item instead of silently ending the stream.Example (partial) change:
- use async_stream::stream; + use async_stream::try_stream; @@ - let stream = stream! { + let stream = try_stream! { let mut entries = tokio::fs::read_dir(search_job_output_dir).await?; while let Some(entry) = entries.next_entry().await? { let search_result_path = entry.path(); let reader = std::fs::File::open(search_result_path)?; let mut deserializer = rmp_serde::Deserializer::new(reader); - while let Ok(event) = Deserialize::deserialize(&mut deserializer) { - let event: (i64, String, String, String, i64) = event; - let message = event.1; - yield Ok(message); - } + // TODO: handle EOF vs malformed data explicitly and map non-EOF errors to ClientError. + while let Ok(event) = Deserialize::deserialize(&mut deserializer) { + let event: (i64, String, String, String, i64) = event; + let message = event.1; + yield Ok(message); + } } };Confirm in the async-stream (v0.3.x) docs that `try_stream!` is the macro intended for fallible streams with `Item = Result<T, E>` and use of the `?` operator, and that `stream!` is for infallible streams that do not propagate `?` as `Err` items.
223-227: Blocking FS I/O and non-deterministic directory ordering are acceptable for now but may bite at scale.This path uses
std::fs::File::open(blocking) on the Tokio worker thread and iteratestokio::fs::read_dirwithout sorting entries, so:
- Large result files could block the reactor if deserialisation is slow.
- Result ordering may differ between platforms/FS implementations.
If you expect large files or care about strict ordering, consider offloading deserialisation to
spawn_blockingand/or collecting and sorting paths before reading. Not a correctness bug for small result sets, just something to keep in mind.
228-231: Tuple-based deserialisation is still brittle; consider a named struct or at least documenting the layout.Deserialising into
(i64, String, String, String, i64)and then usingevent.1is hard to read and tightly couples this code to the exact positional layout of the MessagePack tuple. Even if you must keep the tuple format for on-the-wire efficiency, wrapping it in a smallEventstruct (via#[serde(from)]) or adding a clear comment describing each element would improve maintainability and make future format changes less error-prone.
238-265:fetch_results_from_mongodocstring is out of date (no JSON parsing anymore).The implementation now maps the
"message"BSON field directly to aStringand does not callserde_json::from_str, yet the docstring still claims it returns “parsed JSON values” and forwardsserde_json::from_strerrors. Please update the docs to say that each item is a log messageStringand list only the actual error cases:ClientError::MalformedDatafor missing/non-string"message"and forwardedmongodb::error::Errorfrom the cursor andCollection::find.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: ASSERTIVE
Plan: Pro
📒 Files selected for processing (1)
components/api-server/src/client.rs(4 hunks)
🧰 Additional context used
🧠 Learnings (2)
📚 Learning: 2025-10-22T21:01:31.391Z
Learnt from: junhaoliao
Repo: y-scope/clp PR: 0
File: :0-0
Timestamp: 2025-10-22T21:01:31.391Z
Learning: In y-scope/clp, for the clp-rust-checks workflow (GitHub Actions) and the Taskfile target deps:lock:check-rust, we should verify Rust Cargo.lock is in sync with Cargo.toml using a non-mutating method (e.g., cargo metadata --locked / cargo check --locked) to keep CI deterministic and avoid updating dependencies during validation.
Applied to files:
components/api-server/src/client.rs
📚 Learning: 2024-10-08T15:52:50.753Z
Learnt from: AVMatthews
Repo: y-scope/clp PR: 543
File: components/core/src/clp_s/clp-s.cpp:196-265
Timestamp: 2024-10-08T15:52:50.753Z
Learning: In `clp-s.cpp`, the `run_serializer` function interleaves serialization and writing of IR files, making it difficult to restructure it into separate functions.
Applied to files:
components/api-server/src/client.rs
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (4)
- GitHub Check: package-image
- GitHub Check: lint-check (macos-15)
- GitHub Check: lint-check (ubuntu-24.04)
- GitHub Check: rust-checks
🔇 Additional comments (3)
components/api-server/src/client.rs (3)
180-189: Routing on persistedwrite_to_fileflag looks correct.Using
get_job_configto branch on the storedwrite_to_fileflag and returningSearchResultStream::Filevs::Mongokeeps the API clean and ensures the decision matches the job’s original configuration. This separation into helper methods is clear.
310-319:get_job_confighelper cleanly encapsulates MsgPack deserialisation.Reading the
job_configcolumn and deserialising withrmp_serde::from_sliceintoSearchJobConfigcentralises this logic and keepsfetch_resultsfocused on routing. This looks correct and makes future config schema changes easier to manage.
322-359:SearchResultStreamenum +pin_projectis a solid abstraction over FS and Mongo streams.The enum-based wrapper with explicit
FileStream/MongoStreamtype parameters and aStreamimpl that delegatespoll_nextand logs errors removes the earlier unsafe projection while keeping the SSE-facing type concrete. This is clear and idiomatic for combining multiple backend streams.
| let StreamOutputStorage::Fs { directory } = &self.config.stream_output.storage else { | ||
| todo!("Outputting query results to S3 is not supported for now."); | ||
| }; |
There was a problem hiding this comment.
🧩 Analysis chain
Replace todo!() for S3 output with a proper ClientError instead of panicking.
If StreamOutputStorage::S3 is ever configured, this todo!() will panic the server at runtime. Even if S3-backed streaming is not supported yet, it should return a structured ClientError (e.g., “unsupported output storage for streaming”) so misconfiguration fails gracefully.
🏁 Script executed:
#!/bin/bash
# Inspect where S3 storage is used to assess how reachable this panic is.
rg -n "StreamOutputStorage::S3" -SLength of output: 0
I need to investigate further to determine whether the StreamOutputStorage::S3 variant actually exists and how reachable the panic is.
🏁 Script executed:
#!/bin/bash
# Search for StreamOutputStorage enum definition
rg -n "enum StreamOutputStorage" -A 10Length of output: 936
🏁 Script executed:
#!/bin/bash
# Search more broadly for StreamOutputStorage without case sensitivity constraints
rg -n "StreamOutputStorage" -C 3Length of output: 2816
🏁 Script executed:
#!/bin/bash
# Look for storage config in general
rg -n "\.storage" -C 2Length of output: 40829
🏁 Script executed:
#!/bin/bash
# Check if client.rs file exists and search for relevant code
find . -name "client.rs" -type f | head -5Length of output: 128
🏁 Script executed:
#!/bin/bash
# Search for S3 references with case-insensitive matching
rg -in "s3" | head -20Length of output: 1696
🏁 Script executed:
#!/bin/bash
# Look for the config structure definition
rg -n "struct.*config" -i -C 2 | head -40Length of output: 3074
🏁 Script executed:
#!/bin/bash
# Search for StreamOutputStorage import/definition in related files
fd -e rs | xargs grep -l "StreamOutputStorage" 2>/dev/nullLength of output: 94
🏁 Script executed:
#!/bin/bash
# Look at components/api-server/src/ directory structure
find components/api-server/src -name "*.rs" -type fLength of output: 147
🏁 Script executed:
#!/bin/bash
# Find and examine the config structures
rg -n "stream_output" -C 5 components/api-server/src/Length of output: 966
🏁 Script executed:
#!/bin/bash
# Get the actual lines 216-218 from client.rs to verify the issue
sed -n '210,225p' components/api-server/src/client.rsLength of output: 767
🏁 Script executed:
#!/bin/bash
# Search for ClientError to understand error handling patterns
rg -n "ClientError" -A 2 -B 2 components/api-server/src/client.rs | head -50Length of output: 2470
🏁 Script executed:
#!/bin/bash
# Look for all error types defined
rg -n "enum.*Error\|struct.*Error" components/api-server/src/ -A 3Length of output: 0
🏁 Script executed:
#!/bin/bash
# Check for todo! macros in the codebase to see if there are other examples
rg -n "todo!\(" components/api-server/src/Length of output: 118
🏁 Script executed:
#!/bin/bash
# Look for stream_output storage definition/config
rg -n "storage:" -C 5 components/api-server/src/Length of output: 0
🏁 Script executed:
#!/bin/bash
# Check what enum variants are supported
rg -n "Fs\|S3" components/api-server/src/ | head -20Length of output: 0
🏁 Script executed:
#!/bin/bash
# Look in config or types files for storage enum
find components/api-server -name "*.rs" | xargs grep -l "StreamOutputStorage\|OutputStorage" 2>/dev/nullLength of output: 35
🏁 Script executed:
#!/bin/bash
# Get comprehensive view of the storage enum and match pattern
rg -n "StreamOutputStorage" -B 20 -A 20 components/api-server/src/Length of output: 5043
🏁 Script executed:
#!/bin/bash
# Look at what config sections exist
rg -n "struct.*Config\|pub.*config" -C 3 components/api-server/src/client.rsLength of output: 0
🏁 Script executed:
#!/bin/bash
# Check if there's a config or config module
find components/api-server -name "*config*" -type fLength of output: 0
🏁 Script executed:
#!/bin/bash
# Examine the complete context around line 216
sed -n '200,230p' components/api-server/src/client.rsLength of output: 1437
🏁 Script executed:
#!/bin/bash
# Search across all API server source for the complete enum
find components/api-server/src -name "*.rs" | xargs grep -B 5 -A 15 "enum StreamOutputStorage" 2>/dev/nullLength of output: 0
🏁 Script executed:
#!/bin/bash
# Look for config module or types
ls -la components/api-server/src/Length of output: 343
🏁 Script executed:
#!/bin/bash
# Check if config is in a separate file
find components/api-server -name "*.rs" | xargs grep "StreamOutputStorage::" 2>/dev/nullLength of output: 129
🏁 Script executed:
#!/bin/bash
# Broad search for where stream output is configured
rg -n "stream_output" -C 10Length of output: 80126
🏁 Script executed:
#!/bin/bash
# Final comprehensive search across entire repo
find . -type f -name "*.rs" | xargs grep -l "StreamOutputStorage" 2>/dev/nullLength of output: 98
🏁 Script executed:
#!/bin/bash
# Look for the exact file and context
cat components/api-server/src/client.rs | grep -n "StreamOutputStorage" -C 15 || trueLength of output: 2271
🏁 Script executed:
#!/bin/bash
# Try finding where this enum might be defined (lib.rs, mod.rs, or other)
find components/api-server -name "*.rs" -exec grep -H "enum.*Storage" {} \; | grep -i stream || trueLength of output: 0
🏁 Script executed:
#!/bin/bash
# Get a list of all .rs files under components/api-server/src
find components/api-server/src -type f -name "*.rs" -exec echo "File: {}" \; -exec wc -l {} \;Length of output: 332
🏁 Script executed:
#!/bin/bash
# Search for the full module structure
find components/api-server/src -type f -name "*.rs" | sortLength of output: 147
🏁 Script executed:
#!/bin/bash
# Exhaustively check the relevant files
for file in $(find components/api-server/src -name "*.rs"); do
echo "=== $file ==="
grep -n "StreamOutputStorage\|stream_output\|todo!" "$file" | head -20 || echo "No matches"
doneLength of output: 424
🏁 Script executed:
#!/bin/bash
# Focus on the specific line range mentioned
if [ -f "components/api-server/src/client.rs" ]; then
head -220 components/api-server/src/client.rs | tail -30
fiLength of output: 1111
🏁 Script executed:
#!/bin/bash
# Look at actual file content around the suspect code
sed -n '1,250p' components/api-server/src/client.rs | tail -100Length of output: 3893
🏁 Script executed:
#!/bin/bash
# Find any storage-related enum definitions
rg -n "enum.*Storage\|enum.*Output" --type rsLength of output: 26
🏁 Script executed:
#!/bin/bash
# Look at lines 200-300 of client.rs
tail -n +200 components/api-server/src/client.rs | head -100Length of output: 3825
🏁 Script executed:
#!/bin/bash
# Check if the todo!() is actually present
grep -n "todo!" components/api-server/src/client.rsLength of output: 82
🏁 Script executed:
#!/bin/bash
# Verify file exists and basic structure
head -5 components/api-server/src/client.rs && echo "..." && tail -5 components/api-server/src/client.rsLength of output: 211
🏁 Script executed:
#!/bin/bash
# Examine error.rs to see what ClientError variants exist
cat components/api-server/src/error.rsLength of output: 1310
🏁 Script executed:
#!/bin/bash
# Check if there are any other places where S3 storage validation is done
grep -rn "S3" components/api-server/src/ -C 3Length of output: 649
🏁 Script executed:
#!/bin/bash
# Look for similar error patterns in the codebase
grep -rn "Unsupported\|unsupported\|UnsupportedStorage\|NotSupported" components/api-server/src/Length of output: 0
🏁 Script executed:
#!/bin/bash
# Check how other code handles S3 vs FS config decisions
grep -rn "StorageType::\|StreamOutputStorage::" components/api-server/src/ -A 2 -B 2Length of output: 478
Replace todo!() with proper error handling for unsupported S3 stream output storage.
The StreamOutputStorage::S3 variant exists and is configurable (as evidenced by the enum definition and configuration documentation), but the code at client.rs:216-218 panics instead of returning a structured error when S3 is configured. This should fail gracefully by returning a ClientError. You'll need to add an appropriate error variant to the ClientError enum (e.g., UnsupportedStreamOutputStorage or similar) and return a stream that yields this error.
🤖 Prompt for AI Agents
In components/api-server/src/client.rs around lines 216-218, replace the
panic-inducing todo!() for S3 stream output with structured error handling: add
a new ClientError variant (e.g., UnsupportedStreamOutputStorage) to the
ClientError enum and implement any needed Display/From/serde traits used in the
crate; then change the match branch to return a stream that yields that error
(for example use futures::stream::once(async {
Err(ClientError::UnsupportedStreamOutputStorage) }) or the project's equivalent)
so callers receive a ClientError instead of panicking; also update any error
conversions/tests that assume the new variant where necessary.
There was a problem hiding this comment.
Actionable comments posted: 1
♻️ Duplicate comments (3)
components/api-server/src/client.rs (3)
3-3: Importtry_stream!instead ofstream!for Result-yielding streams.The import of
async_stream::streamshould beasync_stream::try_stream, as the file-based stream at line 222 uses?and yieldsResultitems. Per the async-stream documentation,try_stream!is designed for fallible streams withItem = Result<T, E>and supports the?operator, whilestream!does not.Apply this diff:
-use async_stream::stream; +use async_stream::try_stream;
216-218: Replacetodo!()with proper error handling for S3 storage.The
todo!()macro will cause a panic at runtime ifStreamOutputStorage::S3is configured—this is a production blocker. This issue was previously flagged but remains unaddressed.You must add a new error variant to
ClientError(e.g.,UnsupportedOperation(String)) incomponents/api-server/src/error.rsand return it here.First, add to
components/api-server/src/error.rs:pub enum ClientError { // ... existing variants ... #[error("Malformed data")] MalformedData, + + #[error("{0}")] + UnsupportedOperation(String), }Then in this file:
let StreamOutputStorage::Fs { directory } = &self.config.stream_output.storage else { - todo!("Outputting query results to S3 is not supported for now."); + panic!("S3-based query result streaming is not yet supported"); };Note: Using
panic!with a clear message is acceptable here given the# Panicssection in the docstring at line 209. Alternatively, restructure to returnResultand use the new error variant.
222-234: Usetry_stream!macro and properly handle decode errors.Two critical issues in the stream implementation:
Using
stream!with?andResultitems: The body uses?on async/IO operations and yieldsOk(message), but the macro isstream!. Per async-stream documentation,try_stream!is required for fallible streams withItem = Result<T, E>that use the?operator. Withstream!, errors will not propagate correctly.Decode errors silently truncate results: The
while let Ok(event) = Deserialize::deserialize(&mut deserializer)loop breaks on anyrmp_serde::decode::Error, including genuinely malformed data, without signalling an error. This can silently truncate results.Both issues were previously flagged but remain unaddressed.
Apply this diff:
- let stream = stream! { + let stream = try_stream! { let mut entries = tokio::fs::read_dir(search_job_output_dir).await?; while let Some(entry) = entries.next_entry().await? { let search_result_path = entry.path(); let reader = std::fs::File::open(search_result_path)?; let mut deserializer = rmp_serde::Deserializer::new(reader); - while let Ok(event) = Deserialize::deserialize(&mut deserializer) { - let event: (i64, String, String, String, i64) = event; - let message = event.1; - yield Ok(message); + loop { + match Deserialize::deserialize(&mut deserializer) { + Ok(event) => { + let event: (i64, String, String, String, i64) = event; + let message = event.1; + yield message; + } + Err(err) if err.is_eof() => break, + Err(err) => Err(err)?, + } } } };Note: With
try_stream!, you yield the unwrapped value directly (notOk(message)), and errors are propagated via?.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: ASSERTIVE
Plan: Pro
📒 Files selected for processing (1)
components/api-server/src/client.rs(4 hunks)
🧰 Additional context used
🧠 Learnings (2)
📚 Learning: 2025-10-22T21:01:31.391Z
Learnt from: junhaoliao
Repo: y-scope/clp PR: 0
File: :0-0
Timestamp: 2025-10-22T21:01:31.391Z
Learning: In y-scope/clp, for the clp-rust-checks workflow (GitHub Actions) and the Taskfile target deps:lock:check-rust, we should verify Rust Cargo.lock is in sync with Cargo.toml using a non-mutating method (e.g., cargo metadata --locked / cargo check --locked) to keep CI deterministic and avoid updating dependencies during validation.
Applied to files:
components/api-server/src/client.rs
📚 Learning: 2024-10-08T15:52:50.753Z
Learnt from: AVMatthews
Repo: y-scope/clp PR: 543
File: components/core/src/clp_s/clp-s.cpp:196-265
Timestamp: 2024-10-08T15:52:50.753Z
Learning: In `clp-s.cpp`, the `run_serializer` function interleaves serialization and writing of IR files, making it difficult to restructure it into separate functions.
Applied to files:
components/api-server/src/client.rs
🪛 GitHub Actions: clp-rust-checks
components/api-server/src/client.rs
[error] 238-238: Rust Clippy: doc-markdown error. item in documentation is missing backticks for MongoDB in the doc comment.
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
- GitHub Check: package-image
🔇 Additional comments (3)
components/api-server/src/client.rs (3)
180-189: LGTM: Clean routing logic.The routing logic is clear and well-structured. Using early return for the file-based path keeps indentation manageable, and the separation into
fetch_results_from_fileandfetch_results_from_mongoimproves readability.
310-319: LGTM: Clean helper method.The
get_job_confighelper is straightforward and properly encapsulates the database query and MessagePack deserialization logic.
322-360: LGTM: Well-designed stream abstraction.The
SearchResultStreamenum usingpin_project!is a clean solution that addresses all previous concerns:
- Removes the need for
unsafeprojection- Eliminates
Mutexoverhead- Provides clear variant names (
FileandMongo)- Includes comprehensive documentation
- Properly delegates
poll_nextto inner streams with error loggingThis is a solid implementation.
"directory" is the field name in the yaml file. It might not be ideal to rename it. |
LinZhihao-723
left a comment
There was a problem hiding this comment.
For the PR title, how about:
feat(api-server): Add support for streaming query results from files.
There was a problem hiding this comment.
Actionable comments posted: 1
♻️ Duplicate comments (3)
components/api-server/src/client.rs (3)
244-246: [DUPLICATE] Critical: Replacetodo!()with proper error handling for S3.This issue has been extensively flagged in previous reviews. The
todo!()will panic at runtime ifStreamOutputStorage::S3is configured. You must add an appropriateClientErrorvariant (e.g.,UnsupportedOperationorUnsupportedStreamOutputStorage) and return that error instead of panicking.
250-263: [DUPLICATE] Critical: Usetry_stream!for fallible streams with?operator.This issue has been extensively flagged in previous reviews. The
stream!macro does not support the?operator for error propagation in fallible streams. You must change totry_stream!to properly propagate IO errors asErr(ClientError)stream items.Additionally, the deserialization loop at lines 256-260 silently ends the stream on any decode error, which can truncate results without signaling corruption.
As noted in previous reviews, apply this fix:
-use async_stream::stream; +use async_stream::try_stream;And at line 250:
- let stream = stream! { + let stream = try_stream! { let mut entries = tokio::fs::read_dir(search_job_output_dir).await?; while let Some(entry) = entries.next_entry().await? { let search_result_path = entry.path(); let reader = std::fs::File::open(search_result_path)?; let mut deserializer = rmp_serde::Deserializer::new(reader); - while let Ok(event_tuple) = Deserialize::deserialize(&mut deserializer) { + loop { + match Deserialize::deserialize(&mut deserializer) { + Ok(event_tuple) => { - let event_tuple: EventTuple = event_tuple; - let event: Event = event_tuple.into(); - yield Ok(event.message); + let event_tuple: EventTuple = event_tuple; + let event: Event = event_tuple.into(); + yield event.message; + } + Err(err) if err.is_eof() => break, + Err(err) => yield Err(err.into()), + } } } };
266-266: [DUPLICATE] Minor: Add backticks aroundMongoDBto fix clippy doc-markdown error.This has been flagged in previous reviews and is causing a pipeline failure.
- /// Asynchronously fetches results of a completed search job from MongoDB. + /// Asynchronously fetches results of a completed search job from `MongoDB`.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: ASSERTIVE
Plan: Pro
📒 Files selected for processing (1)
components/api-server/src/client.rs(4 hunks)
🧰 Additional context used
🧠 Learnings (2)
📚 Learning: 2025-10-22T21:01:31.391Z
Learnt from: junhaoliao
Repo: y-scope/clp PR: 0
File: :0-0
Timestamp: 2025-10-22T21:01:31.391Z
Learning: In y-scope/clp, for the clp-rust-checks workflow (GitHub Actions) and the Taskfile target deps:lock:check-rust, we should verify Rust Cargo.lock is in sync with Cargo.toml using a non-mutating method (e.g., cargo metadata --locked / cargo check --locked) to keep CI deterministic and avoid updating dependencies during validation.
Applied to files:
components/api-server/src/client.rs
📚 Learning: 2024-10-08T15:52:50.753Z
Learnt from: AVMatthews
Repo: y-scope/clp PR: 543
File: components/core/src/clp_s/clp-s.cpp:196-265
Timestamp: 2024-10-08T15:52:50.753Z
Learning: In `clp-s.cpp`, the `run_serializer` function interleaves serialization and writing of IR files, making it difficult to restructure it into separate functions.
Applied to files:
components/api-server/src/client.rs
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (4)
- GitHub Check: package-image
- GitHub Check: lint-check (macos-15)
- GitHub Check: rust-checks
- GitHub Check: lint-check (ubuntu-24.04)
🔇 Additional comments (5)
components/api-server/src/client.rs (5)
19-36: LGTM: QueryConfig enhanced with validation and write_to_file field.The addition of
#[serde(deny_unknown_fields)]provides better validation by catching typos or deprecated fields in query configs. Thewrite_to_filefield properly integrates with the dual-backend streaming mechanism.
129-182: LGTM: fetch_results properly routes to File or Mongo backends.The refactored method signature and routing logic cleanly handle the dual-backend streaming. The early return pattern for the File variant improves readability and addresses previous feedback about nested if-else returns.
315-353: LGTM: SearchResultStream enum properly abstracts the dual backends.The
pin_project!macro provides safe pinning projection, addressing previous concerns about unsafe code. The enum design cleanly unifies File and Mongo streams while maintaining type safety and theSendrequirement for SSE. Error logging in theStreamimplementation provides good observability.
355-377: LGTM: EventTuple and Event types improve readability.The type alias and struct with
Fromimplementation provide a good balance between the compact tuple serialization format used by clp-s and code readability. The named fields inEventmake it clear what each tuple position represents, addressing previous concerns about positional access.
247-249: Code path construction is correct; no defects found.The
directoryfield is aStringsourced from the YAML config, with documented example showing relative path format ("var/data/streams"). The RustPath::join()method correctly handles this:Path::new("/").join("var/data/streams")produces/var/data/streams. The pattern is sound and will work correctly with container volume mounts when the config directory value matches the mounted path.
| /// # Returns | ||
| /// | ||
| /// A stream of the job's results on success. Each item in the stream is a [`Result`] that: | ||
| /// | ||
| /// ## Returns | ||
| /// | ||
| /// A parsed JSON value representing a search result on success. | ||
| /// | ||
| /// ## Errors | ||
| /// | ||
| /// Returns an error if: | ||
| /// | ||
| /// * [`ClientError::MalformedData`] if a retrieved document does not contain a "message" field, | ||
| /// or if the "message" field is not a BSON string. | ||
| /// * Forwards [`mongodb::error::Error`] produced by the `MongoDB` cursor item access. | ||
| /// * Forwards [`serde_json::from_str`]'s return values on failure. | ||
| /// | ||
| /// # Errors | ||
| /// | ||
| /// Returns an error if: | ||
| /// | ||
| /// * Forwards [`mongodb::Collection::find`]'s return values on failure. |
There was a problem hiding this comment.
Fix duplicate "# Errors" sections in docstring.
The docstring has two "# Errors" sections (lines 276 and 285), which is incorrect rustdoc formatting. The second one at line 285 should be removed since the error conditions are already documented in the first "## Errors" section under "Returns".
Apply this diff:
/// ## Errors
///
/// Returns an error if:
///
/// * [`ClientError::MalformedData`] if a retrieved document does not contain a "message" field,
/// or if the "message" field is not a BSON string.
/// * Forwards [`mongodb::error::Error`] produced by the `MongoDB` cursor item access.
/// * Forwards [`serde_json::from_str`]'s return values on failure.
- ///
- /// # Errors
- ///
- /// Returns an error if:
- ///
- /// * Forwards [`mongodb::Collection::find`]'s return values on failure.
async fn fetch_results_from_mongo(Note: Consolidate all error conditions under a single "## Errors" section (nested under "# Returns") if they apply to the stream items, or use "# Errors" at the top level for errors returned by the async function itself.
🤖 Prompt for AI Agents
In components/api-server/src/client.rs around lines 268 to 289, remove the
duplicate top-level "# Errors" block (the second one starting at ~line 285) and
consolidate its error bullet (forwards mongodb::Collection::find's return
values) into the existing "## Errors" subsection under "# Returns" if that error
applies to stream items; otherwise move that single error to a single top-level
"# Errors" section. Ensure only one Errors section exists and that all error
conditions are documented in the appropriate location.
Description
Checklist
breaking change.
Validation performed
postgresqlcompressed to datasetpostgresql.{"query_string": "line_num: 4", "dataset": "postgresql", "write_to_file": true}tohttp://localhost:3001/queryResponse:
{"query_results_uri":"/query_results/10"}http://localhost:3001/query_results/10. The result is correct.Summary by CodeRabbit
New Features
Improvements
Chores
✏️ Tip: You can customize this high-level summary in your review settings.